数据结构

www.alibaba.com    222.38.194.133    440887    [09/20/2019 14:33:52 +0800]    河北    秦皇岛 铁通    2019    09    20
www.baidu.com    171.13.146.53    654665    [09/20/2019 14:33:52 +0800]    河南    焦作    电信    2019    09    20
www.baidu.com    139.207.106.29    722052    [09/20/2019 14:33:52 +0800]    四川        电信    2019    09    20
www.alibaba.com    182.91.131.22    22103    [09/20/2019 14:33:52 +0800]    广西    桂林    联通    2019    09    20
www.huawei.com    171.12.12.85    155927    [09/20/2019 14:33:52 +0800]    河南    郑州    电信    2019    09    20
//"""多目录输出:依据不同的key输出不同的数据
//  www.alibaba.com: ip,流量,省,地址
        182.86.190.207  2271 江西  景德镇
//  www.baidu.com:ip,流量,省,时间
        182.86.190.207  2271 江西 [09/20/2019 14:33:52 +0800]
//  www.huawei.com:ip,流量,省,地址,所属营业厅
        182.86.190.207  2271 江西  景德镇 电信    
//  
//"""
代码实现 
val dataRdd = sc.textFile("data/etl_access.log")
      .filter(x=>x.split("\t").length==10)
      .map(x=> {
        val splits = x.split("\t")
          val domain = splits(0)
          val ip = splits(1)
          val flow = splits(2)
          val pro = splits(4)
          val city = splits(5)
          val isp = splits(6)
          val year = splits(7)
          val month = splits(8)
          val day = splits(9)
        (domain,(domain,ip,flow,pro,city,isp,year,month,day))
      }).coalesce(1,true)
      .saveAsHadoopFile(outpath,classOf[String],classOf[String],classOf[RDDMultipleTextOutputFormat] )
 //注:saveAsHadoopFile 必须是key-value类型 ;saveAsTextFiled底层也是调用saveAsHadoopFile方法的
 
// 重写 MultipleTextOutputFormat 方法:

class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any,Any]{
    override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String ={
      //已key作为分目录
      val fliename = key+"/"+name
      fliename
    }

    override def generateActualKey(key: Any, value: Any): Any = {
      //输出数据 不需要key
      NullWritable.get()
    }

    override def generateActualValue(key: Any, value: Any): Any = {
      //根据不同的key ,输出不同的value 待优化--->可以从数据库获取想要输出的value
      val splits = value.toString.split(",")
      val sb = new StringBuffer
      key match {
        case "www.ruozedata.com" =>{
          sb.append(splits(0)).append("\t")
            .append(splits(1)).append("\t")
            .append(splits(3))
          sb
        }
        case "www.baidu.com" =>{
          sb.append(splits(0)).append("\t")
            .append(splits(1)).append("\t")
            .append(splits(2))
          sb
        }
        case "www.alibaba.com" =>{
          sb.append(splits(0)).append("\t")
            .append(splits(2)).append("\t")
            .append(splits(3)).append("\t")
            .append(splits(4))
          sb
        }
        case _ =>NullWritable.get()
      }
    }

  }