利用Spark统一日期格式
1) 请编写Spark程序,分析“发布日期”数据格式
2) 将日期格式统一为yyyy-mm-dd的形式
3) 程序打包并在Spark平台运行,结果输出至hdfs://master:9000/recruitspark3
查看数据数据源文件,可以发现数据中,每个字段用 | 分割
每个字段的含义为:职位名称、公司名称、工作城市、工作要求、招聘人数、薪资、工作技术、发布时间、招聘性别、公司描述、学历要求
(1)将数据源文件夹中的文件使用ftp工具上传到linux系统中的/home/hadoop/data目录下,并上传到hdfs的recruitspark2目录,查看hdfs文件系统/recruitspark2目录下文件内容。
package blog.bian.spark
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}
object RecruitTask3 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("RT")
val sc = new SparkContext(conf)
val lines: RDD[String] = sc.textFile("hdfs:///recruitspark2")
val words: RDD[Array[String]] = lines.map(_.split("\\|"))
var count: LongAccumulator = sc.longAccumulator("count")
val dateReg="\\d{4}-\\d{2}-\\d{2}"
val dateReg1="\\d{2}-\\d{2}-\\d{4}"
val word: RDD[Array[String]] = words.map(x => {
val datetime:String = x(7).trim
var newdate=""
var y,m,d=""
if(!datetime.matches(dateReg)){
if(datetime.matches(dateReg1)) {
val dateSplit: Array[String] = datetime.split("-")
y=dateSplit(2)
m=dateSplit(0)
d=dateSplit(1)
} else {
val dateSplit: Array[String] = datetime.split(",")
if(dateSplit(0).takeRight(2).equals("th")){
m=dateSplit(1)
d=dateSplit(0).replaceAll("th","")
} else {
m=dateSplit(0)
d=dateSplit(1).replaceAll("th","")
}
y=dateSplit(2)
}
m match {
case "Jan" => m="01"
case "Feb" => m="02"
case "Mar" => m="03"
case "Apr" => m="04"
case "May" => m="05"
case "Jun" => m="06"
case "Jul" => m="07"
case "Aug" => m="08"
case "Sep" => m="09"
case "Oct" => m="10"
case "Nov" => m="11"
case "Dec" => m="12"
case _ => m=m
}
newdate=y+"-"+m+"-"+d
x(7)=newdate
count.add(1)
}
x
})
// val filterWord: RDD[Array[String]] = word.filter(x => {
// !x(7).matches(dateReg)
// })
// filterWord.collect().foreach(println)
//val res: Array[Array[String]] = word.collect()
// res.foreach(println)
val newLines: RDD[String] = word.map(x => x.mkString("|"))
newLines.saveAsTextFile("hdfs:///recruitspark3")
println("修改了"+count.value+"行")
sc.stop()
}
}