利用Spark统一日期格式

1) 请编写 Spark 程序，分析“发布时间”（发布日期）字段的数据格式

2) 将日期格式统一为 yyyy-MM-dd 的形式（四位年-两位月-两位日，如 2019-01-05；注意在日期格式串中小写 mm 通常表示分钟）

3) 程序打包并在Spark平台运行,结果输出至hdfs://master:9000/recruitspark3

查看数据源文件可以发现，每条记录中的各字段以 | 分隔。

每个字段的含义依次为：职位名称、公司名称、工作城市、工作要求、招聘人数、薪资、工作技术、发布时间、招聘性别、公司描述、学历要求。其中“发布时间”是第 8 个字段，按 | 拆分后的数组下标为 7。

(1) 将数据源文件夹中的文件使用 FTP 工具上传到 Linux 系统的 /home/hadoop/data 目录下，再上传到 HDFS 的 /recruitspark2 目录，并查看 HDFS 文件系统中 /recruitspark2 目录下的文件内容。

package blog.bian.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Spark job that normalizes the publish-date field of `|`-separated recruitment
 * records to `yyyy-MM-dd` and writes the records back out.
 *
 * Usage: `RecruitTask3 [inputPath] [outputPath]`. The paths default to
 * `hdfs:///recruitspark2` and `hdfs:///recruitspark3` respectively.
 * Records that are too short, or whose date cannot be recognized, are written unchanged.
 */
object RecruitTask3 {

  /** Zero-based index of the publish-date field within a record. */
  private val DateField = 7

  /** Already-normalized `yyyy-MM-dd`. */
  private val IsoDate = """\d{4}-\d{2}-\d{2}""".r

  /** `MM-dd-yyyy`, captured as (month, day, year). */
  private val MonthDayYear = """(\d{2})-(\d{2})-(\d{4})""".r

  /** Four-digit year. */
  private val Year = """\d{4}""".r

  /** Day of month, optionally followed by an English ordinal suffix (1st, 2nd, 3rd, 5th). */
  private val Day = """(?i)(\d{1,2})(?:st|nd|rd|th)?""".r

  /** Lower-case three-letter English month abbreviation -> two-digit month number. */
  private val MonthNumbers: Map[String, String] =
    Seq("jan", "feb", "mar", "apr", "may", "jun", "jul", "aug", "sep", "oct", "nov", "dec")
      .zipWithIndex
      .map { case (name, i) => name -> f"${i + 1}%02d" }
      .toMap

  def main(args: Array[String]): Unit = {
    val inputPath = args.headOption.getOrElse("hdfs:///recruitspark2")
    // NOTE: saveAsTextFile fails if this directory already exists.
    val outputPath = args.drop(1).headOption.getOrElse("hdfs:///recruitspark3")

    // Fall back to local mode only when no master was supplied (e.g. by spark-submit);
    // a hard-coded setMaster("local") would override the cluster's master.
    val conf = new SparkConf().setAppName("RT").setIfMissing("spark.master", "local")
    val sc = new SparkContext(conf)
    try {
      val modified: LongAccumulator = sc.longAccumulator("count")
      val unparsed: LongAccumulator = sc.longAccumulator("unparsed")

      val lines: RDD[String] = sc.textFile(inputPath)
      val newLines: RDD[String] = lines.map { line =>
        // limit -1 keeps trailing empty fields, so re-joining preserves the column count.
        val fields = line.split("\\|", -1)
        if (fields.length <= DateField) line
        else {
          val raw = fields(DateField)
          normalizeDate(raw) match {
            case Some(date) if date != raw.trim =>
              modified.add(1)
              fields.updated(DateField, date).mkString("|")
            case Some(_) => line
            case None =>
              unparsed.add(1)
              line
          }
        }
      }

      newLines.saveAsTextFile(outputPath)
      // These accumulators are updated inside a transformation, so a retried or
      // speculative task can make the reported counts too high.
      println("修改了" + modified.value + "行")
      println("未能识别日期格式的行: " + unparsed.value)
    } finally {
      sc.stop()
    }
  }

  /**
   * Normalizes a raw publish-date string to `yyyy-MM-dd`. Surrounding whitespace is ignored.
   *
   * Recognized forms:
   *  - `yyyy-MM-dd`: returned trimmed, otherwise unchanged.
   *  - `MM-dd-yyyy`.
   *  - `day,month,year` or `month,day,year`.
   *    - The day may carry an ordinal suffix (`5th`, `1st`, `22nd`, `3rd`); single-digit days
   *      are zero-padded.
   *    - The month is an English name or abbreviation (`Jan`, `January`, `Sept`), matched
   *      case-insensitively on its first three letters.
   *
   * @param raw the publish-date field as read from the record
   * @return the normalized date, or `None` if `raw` is in none of the forms above
   */
  def normalizeDate(raw: String): Option[String] = raw.trim match {
    case iso @ IsoDate()                => Some(iso)
    case MonthDayYear(month, day, year) => Some(s"$year-$month-$day")
    case other =>
      other.split(",").map(_.trim) match {
        case Array(first, second, year @ Year()) =>
          val dayFirst = for (d <- parseDay(first); m <- parseMonth(second)) yield (m, d)
          dayFirst
            .orElse(for (m <- parseMonth(first); d <- parseDay(second)) yield (m, d))
            .map { case (m, d) => s"$year-$m-$d" }
        case _ => None
      }
  }

  /** Parses a day of month such as `5`, `05` or `5th` into a two-digit, zero-padded string. */
  private def parseDay(s: String): Option[String] = s match {
    case Day(d) if d.toInt >= 1 && d.toInt <= 31 => Some(if (d.length == 1) "0" + d else d)
    case _                                       => None
  }

  /** Maps an English month name or abbreviation (at least three letters) to its two-digit number. */
  private def parseMonth(s: String): Option[String] =
    if (s.length >= 3 && s.forall(_.isLetter))
      MonthNumbers.get(s.take(3).toLowerCase(java.util.Locale.ROOT))
    else None
}

results matching ""

    No results matching ""