[Original] RDD transformations and actions can only be invoked by the driver

wjw870907 2017-12-07
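I'm a beginner who has just started learning Spark.
This is my code. It reads JSON data from Kafka, extracts a value to determine the date, and writes each record to a different HDFS directory depending on that date.
How should this be written?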
    val dateformat = new SimpleDateFormat("yyyyMMdd")
    val cal = Calendar.getInstance()
    cal.add(Calendar.DATE, -1)
    val today = dateformat.format(new Date())
    val yesterday = dateformat.format(cal.getTime())
    val lines = rdd.map(_._2)
    val words = lines.flatMap(_.split("\n"))
    for (word <- words) {
      val jsonstr = JSON.parseObject(word)
      val SHEET = jsonstr.getJSONArray("SHEET").getJSONObject(0).getJSONObject("HEADER").getLong("ENDTIME").toString
      val endtime = SHEET.substring(0, 8)
      if (endtime != today && endtime != yesterday) {
        words.saveAsTextFile("hdfs://192.168.1.1:8082/tmp/spark/ticket/" + endtime + "/error")
      }
      else {
        words.saveAsTextFile("hdfs://192.168.1.1:8082/tmp/spark/ticket/" + endtime)
      }
    }
wjw870907 2017-12-07
Can any expert help answer this?
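
The error in the title most likely comes from the loop itself: `for (word <- words)` is compiled to `words.foreach(...)`, so the loop body runs on the executors, and calling `words.saveAsTextFile(...)` there invokes an RDD action from inside another RDD operation. Even if it were allowed, each call would write the whole RDD rather than the single matching record. One possible restructuring is sketched below: the executors only compute a (directory, line) pair per record, and the driver then issues one save per distinct directory. This is a sketch under assumptions, not a tested solution: `JSON` is assumed to be `com.alibaba.fastjson.JSON`, `rdd` is assumed to be an `RDD[(String, String)]` of Kafka key/value pairs, and the names `TicketRouter`, `targetDir`, `endtimeOf` and `saveByDate` are hypothetical.

```scala
import java.text.SimpleDateFormat
import java.util.{Calendar, Date}

import com.alibaba.fastjson.JSON // assumption: the JSON class in the question is fastjson
import org.apache.spark.rdd.RDD

object TicketRouter {
  // Base path copied from the question.
  val BasePath = "hdfs://192.168.1.1:8082/tmp/spark/ticket/"

  // Hypothetical helper: pick the output directory for one record.
  // Records dated today or yesterday go to <date>, everything else to <date>/error.
  def targetDir(endtime: String, today: String, yesterday: String): String =
    if (endtime == today || endtime == yesterday) BasePath + endtime
    else BasePath + endtime + "/error"

  // Hypothetical helper: yyyyMMdd prefix of ENDTIME, same field path as in the question.
  def endtimeOf(line: String): String =
    JSON.parseObject(line)
      .getJSONArray("SHEET").getJSONObject(0)
      .getJSONObject("HEADER").getLong("ENDTIME")
      .toString.substring(0, 8)

  // Intended to be called on the driver, for example from inside foreachRDD.
  def saveByDate(rdd: RDD[(String, String)]): Unit = {
    val fmt = new SimpleDateFormat("yyyyMMdd")
    val cal = Calendar.getInstance()
    cal.add(Calendar.DATE, -1)
    val today = fmt.format(new Date())
    val yesterday = fmt.format(cal.getTime)

    // Executors only tag each line with its directory; no RDD is used inside the closures.
    val routed = rdd.map(_._2)
      .flatMap(_.split("\n"))
      .filter(_.nonEmpty)
      .map(line => (targetDir(endtimeOf(line), today, yesterday), line))
      .cache()

    // Back on the driver: one filter + save per distinct directory.
    routed.keys.distinct().collect().foreach { dir =>
      routed.filter(_._1 == dir).values.saveAsTextFile(dir)
    }
    routed.unpersist()
  }
}
```

Note that `saveAsTextFile` fails if the target directory already exists, so in a streaming job each batch would probably need its own subdirectory (for example one that includes the batch time). The sketch also launches one Spark job per distinct directory, which is reasonable only while the number of distinct dates per batch stays small.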