[原创] RDD transformations and actions can only be invoked by the driver
wjw870907
2017-12-07
新手刚学习spark
这是我的代码,就是根据kafka里的json数据取value判断日期写到不同hdfs目录 这个应该怎么写 val dateformat = new SimpleDateFormat("yyyyMMdd") val cal = Calendar.getInstance() cal.add(Calendar.DATE, -1) val today = dateformat.format(new Date()) val yesterday = dateformat.format(cal.getTime()) val lines = rdd.map(_._2) val words = lines.flatMap(_.split("\n")) for (word <- words) { val jsonstr = JSON.parseObject(word) val SHEET = jsonstr.getJSONArray("SHEET").getJSONObject(0).getJSONObject("HEADER").getLong("ENDTIME").toString val endtime = SHEET.substring(0, 8) if (endtime != today && endtime != yesterday) { words.saveAsTextFile("hdfs://192.168.1.1:8082/tmp/spark/ticket/" + endtime + "/error") } else { words.saveAsTextFile("hdfs://192.168.1.1:8082/tmp/spark/ticket/" + endtime) } } |
|
wjw870907
2017-12-07
有大神解答么
|
相关讨论
相关资源推荐
- RDD方法
- 初学spark碰到的一些问题
- rdd 操作的回调函数中不允许有其他rdd的操作
- Spark中RDD内部进行值转换遇到的问题
- Spark Streaming 避坑的注意事项
- Spark报错It appears that you are attempting to broadcast an RDD or reference an RDD from an action
- Spark RDD类源码阅读
- Spark源码-Core RDD部分代码解析(一)
- Caused by: org.apache.spark.SparkException: This RDD lacks a SparkContext. It could happen in the
- java程序作业常见问题_spark 2.0.1使用过程中常见问题汇总