数据结构
www.alibaba.com 222.38.194.133 440887 [09/20/2019 14:33:52 +0800] 河北 秦皇岛 铁通 2019 09 20
www.baidu.com 171.13.146.53 654665 [09/20/2019 14:33:52 +0800] 河南 焦作 电信 2019 09 20
www.baidu.com 139.207.106.29 722052 [09/20/2019 14:33:52 +0800] 四川 电信 2019 09 20
www.alibaba.com 182.91.131.22 22103 [09/20/2019 14:33:52 +0800] 广西 桂林 联通 2019 09 20
www.huawei.com 171.12.12.85 155927 [09/20/2019 14:33:52 +0800] 河南 郑州 电信 2019 09 20
//"""多目录输出:依据不同的key输出不同的数据
// www.alibaba.com: ip,流量,省,地址
182.86.190.207 2271 江西 景德镇
// www.baidu.com:ip,流量,省,时间
182.86.190.207 2271 江西 [09/20/2019 14:33:52 +0800]
// www.huawei.com:ip,流量,省,地址,所属营业厅
182.86.190.207 2271 江西 景德镇 电信
//
//"""
代码实现
val dataRdd = sc.textFile("data/etl_access.log")
.filter(x=>x.split("\t").length==10)
.map(x=> {
val splits = x.split("\t")
val domain = splits(0)
val ip = splits(1)
val flow = splits(2)
val pro = splits(4)
val city = splits(5)
val isp = splits(6)
val year = splits(7)
val month = splits(8)
val day = splits(9)
(domain,(domain,ip,flow,pro,city,isp,year,month,day))
}).coalesce(1,true)
.saveAsHadoopFile(outpath,classOf[String],classOf[String],classOf[RDDMultipleTextOutputFormat] )
//注:saveAsHadoopFile 必须是key-value类型 ;saveAsTextFiled底层也是调用saveAsHadoopFile方法的
// 重写 MultipleTextOutputFormat 方法:
class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any,Any]{
override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String ={
//已key作为分目录
val fliename = key+"/"+name
fliename
}
override def generateActualKey(key: Any, value: Any): Any = {
//输出数据 不需要key
NullWritable.get()
}
override def generateActualValue(key: Any, value: Any): Any = {
//根据不同的key ,输出不同的value 待优化--->可以从数据库获取想要输出的value
val splits = value.toString.split(",")
val sb = new StringBuffer
key match {
case "www.ruozedata.com" =>{
sb.append(splits(0)).append("\t")
.append(splits(1)).append("\t")
.append(splits(3))
sb
}
case "www.baidu.com" =>{
sb.append(splits(0)).append("\t")
.append(splits(1)).append("\t")
.append(splits(2))
sb
}
case "www.alibaba.com" =>{
sb.append(splits(0)).append("\t")
.append(splits(2)).append("\t")
.append(splits(3)).append("\t")
.append(splits(4))
sb
}
case _ =>NullWritable.get()
}
}
}
没有评论