ransformations:转换算子
Actions:操作算子

所谓的转换算子,其实就是通过调用RDD对象的方法,将旧的RDD转换为新的RDD,通过转换,将多个功能组合在一起;如果一个算子的返回值是一个新的rdd,那么这个算子就是转换算子

所有会产生shuffle的算子都可以指定分区数

构建RDD的方法:

方法一:通过读取文件(比较麻烦)

方法二:基Scala集合构建RDD(多用于测试)

方法三:基Scala集合构建RDD

一、转换算子
map
map算子:将RDD中的数据一条一条传递给后面的函数,将函数的返回值构建成一个新的RDD。
map不会产生shuffle,map之后RDD的分区数等于map之前RDD的分区数。
map----懒执行,需要操作算子(Action算子)触发才能执行
package com.shujia.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo3Map {
def main(args: Array[String]): Unit = {

//创建spark上下文对象
val conf: SparkConf = new SparkConf()
  .setAppName("map")
  .setMaster("local")
val sc = new SparkContext(conf)

//基于集合构建RDD
val listRDD: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9))

/**
 * map算子:将RDD中的数据一条一条传递给后面的函数,
 *          将函数的返回值构建成一个新的RDD。
 * map不会产生shuffle,map之后RDD的分区数等于map之前RDD的分区数.
 *
 * map----懒执行,需要操作算子触发才能执行
 * 如果结尾没有foreach,该代码不会执行,
 * println("map算子")也不会执行
 */
  
  //操作数据,将RDD的元素值乘以2
  //使用map算子
  val mapRDD: RDD[Int] = listRDD.map(i =>{
    println("map算子")
    i * 2
  })

   mapRDD.foreach(println)

}
}

执行结果:
        map算子
        2
        map算子
        4
        map算子
        6
        map算子
        8
        map算子
        10
        map算子
        ...

mapPartitions
mapPartitions :

将每一个分区的数据传递给后面的函数,函数需要返回一个迭代器,再构建一个新的rdd

mapPartitions----懒执行,转换算子
package com.shujia.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo3Map {
def main(args: Array[String]): Unit = {

//创建spark上下文对象
val conf: SparkConf = new SparkConf()
  .setAppName("map")
  .setMaster("local")
val sc = new SparkContext(conf)

//基于集合构建RDD
val listRDD: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9))

/**
 * mapPartitions : 将每一个分区的数据传递给后面的函数,
 *                  函数需要返回一个迭代器,再构建一个新的rdd
 *
 * mapPartitions: 懒执行,转换算子
 */
val mapPartitionsRDD: RDD[Int] = listRDD.mapPartitions(
  (iter: Iterator[Int]) => {
    val iterator: Iterator[Int] = iter.map(i => i * 2)
    iterator  //返回一个迭代器
  }
)
mapPartitionsRDD.foreach(println)

}
}

    执行结果:
            2
            4
            6
            8
            10
            12
            14
            16
            18

mapPartitionsWithIndex
mapPartitionsWithIndex: 一次遍历一个分区,会多一个分区的下标
需要传两个参数:下标、迭代器
package com.shujia.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo3Map {
def main(args: Array[String]): Unit = {

//创建spark上下文对象
val conf: SparkConf = new SparkConf()
  .setAppName("map")
  .setMaster("local")
val sc = new SparkContext(conf)

//基于集合构建RDD
val listRDD: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9))

/**
 * mapPartitionsWithIndex: 一次遍历一个分区,会多一个分区的下标
 * 需要传两个参数:下标、迭代器
 */
  
//数据不做处理,直接返回
val mapPartitionsWithIndexRDD: RDD[Int] = listRDD.mapPartitionsWithIndex((index: Int, iter: Iterator[Int]) => {
  println(s"当前分区的编号:$index")
  iter
})
mapPartitionsWithIndexRDD.foreach(println)

}
}

    执行结果:
            当前分区的编号:0
            1
            2
            3
            4
            5
            6
            7
            8
            9

filter:过滤
filter: 对rdd中的数据进行过滤,函数返回true保留数据,函数返回false过滤数据
filter: 转换算子,懒执行
package com.shujia.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo4Filter {
def main(args: Array[String]): Unit = {

val conf: SparkConf = new SparkConf()
  .setAppName("Demo4Filter")
  .setMaster("local")
val sc = new SparkContext(conf)

//使用Scala集合构建RDD
val listRDD: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8))
/**
 * filter: 对rdd中的数据进行过滤,函数返回true保留数据,函数返回false过滤数据
 *
 * filter: 转换算子,懒执行
 */
//过滤出奇数
val filterRDD: RDD[Int] = listRDD.filter(i => {
  i % 2 == 1
})
filterRDD.foreach(println)

}
}

    执行结果:
            1
            3
            5
            7

flatMap:拆分
flatMap:

 将rdd的数据一条一条传递给后面的函数,
 函数的返回值是一个scala集合,可以是数组、list、set、map..
 最后会将这个集合拆分出来构建成一个新的RDD

package com.shujia.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo5FlatMap {
def main(args: Array[String]): Unit = {

val conf: SparkConf = new SparkConf()
  .setAppName("Demo5FlatMap")
  .setMaster("local")
val sc = new SparkContext(conf)

val listRDD: RDD[String] = sc.parallelize(List("java,spark,java", "spark,scala,hadoop"))
/**
 * flatMap:
 *      将rdd的数据一条一条传递给后面的函数,
 *      函数的返回值是一个scala集合,可以是数组、list、set、map..
 *      最后会将这个集合拆分出来构建成一个新的rdd
 */

val wordsRDD: RDD[String] = listRDD.flatMap(lines => {
  val arr: Array[String] = lines.split(",")//切分
  arr.toList//返回值是一个数组(为了观看元素值,进行toList)
})
wordsRDD.foreach(println)

}
}

    执行结果:
            java
            spark
            java
            spark
            scala
            hadoop

sample:抽样
sample: 抽样(不是精确抽样,100个数据抽10%,结果不一定就是10个)

    第一个参数:withReplacement:----是否放回,填false/true
    第二个参数:fraction----抽样比例

package com.shujia.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo6Sample {
def main(args: Array[String]): Unit = {

val conf: SparkConf = new SparkConf()
  .setAppName("Demo6Sample")
  .setMaster("local")
val sc = new SparkContext(conf)

//抽样必须在数据样本足够的情况下才能抽样,这里选择学生表抽样
//读取文件构建RDD
val studentsRDD: RDD[String] = sc.textFile("data/students.txt")
/**
 * sample: 抽样(不是精确抽样,100个数据抽10%,结果不一定就是10个)
 * 第一个参数:withReplacement:----是否放回,填false/true
 * 第二个参数:fraction----抽样比例
 */
val sampleRDD: RDD[String] = studentsRDD.sample(false, 0.01)
sampleRDD.foreach(println)

}
}

    执行结果:
            1500100072,段怀梦,23,女,理科二班
            1500100267,应浩波,22,男,理科二班
            1500100276,庾运鹏,24,男,文科一班
            1500100363,郝海荣,21,男,理科一班
            1500100499,陶千凡,22,女,文科二班
            1500100559,能千凡,21,女,理科六班
            1500100575,张恨桃,24,女,理科五班
            1500100624,乜昊苍,24,男,文科六班
            1500100969,毛昆鹏,24,男,文科三班
            1500100996,厉运凡,24,男,文科三班

groupBy:指定一个列进行分组
groupByKey:对k-v格式的RDD进行分组
groupByKey: 按照key进行分组,必须是kv格式的rdd, 将同一个key的value放到迭代器中

        比groupBy的性能高级一点,通常groupByKey比groupBy用得较多一些

groupBy: 指定一个分组的列,返回的rdd的value包含所有的列

     shuffle过程中需要传输的数据量groupBykKy要多,性能差一点

package com.shujia.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo8GroupByKey {
def main(args: Array[String]): Unit = {

val conf: SparkConf = new SparkConf()
  .setAppName("Demo8GroupByKey")
  .setMaster("local")
val sc = new SparkContext(conf)

//读取文件构建RDD
val linesRDD: RDD[String] = sc.textFile("data/words.txt")
//拆分
val wordsRDD: RDD[String] = linesRDD.flatMap(lines => lines.split(","))
//将RDD转成kv格式(value设置成1,让其和单词凑成一个kv格式)
val kvRDD: RDD[(String, Int)] = wordsRDD.map(word => (word, 1))
/**
 * groupByKey: 按照key进行分组,必须是kv格式的rdd, 将同一个key的value放到迭代器中
 *             比groupBy的性能高级一点,通常groupByKey比groupBy用得较多一些
 *
 * groupBy: 指定一个分组的列,返回的rdd的value包含所有的列
 * shuffle过程中需要传输的数据量groupBykKy要多,性能差一点
 */
//groupByKey按照key分组
val groupByKeyRDD: RDD[(String, Iterable[Int])] = kvRDD.groupByKey()
groupByKeyRDD.foreach(println)
//groupBy指定key进行分组
val groupByRDD: RDD[(String, Iterable[(String, Int)])] = kvRDD.groupBy(word => word._1)
groupByRDD.foreach(println)

  //统计单词的数量
val countRDD: RDD[(String, Int)] = groupByKeyRDD.map {
  case (word: String, ints: Iterable[Int]) =>
    val count: Int = ints.sum
    (word, count)
}
countRDD.foreach(println)

}
}
//groupByKey进行分组的结果
(spark,CompactBuffer(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1))
(hive,CompactBuffer(1, 1, 1, 1, 1, 1))
(hadoop,CompactBuffer(1, 1, 1, 1, 1, 1))
(java,CompactBuffer(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1))

//groupBy进行分组的结果
(spark,CompactBuffer((spark,1), (spark,1), (spark,1), (spark,1), (spark,1),..))
(hive,CompactBuffer((hive,1), (hive,1), (hive,1), (hive,1), (hive,1),...))
(hadoop,CompactBuffer((hadoop,1), (hadoop,1), (hadoop,1), (hadoop,1),...))
(java,CompactBuffer((java,1), (java,1), (java,1), (java,1), (java,1),...))

//groupByKey统计单词的数量
(spark,12)
(hive,6)
(hadoop,6)
(java,12)
reduceByKey:预聚合,按照key进行聚合计算(只适合简单的聚合计算)
reduceByKey: 按照key进行聚合计算,会在Map端进行预聚合

         只能做简单的聚合计算(如果逻辑比较复杂的时候,就不能使用reduceByKey了)
         

(f:v,v=>v)-----相当于v1,v2=>v1+v2
(lyw,1),(lyw,1)转换成(lyw,2)

groupByKey只能分组,然后再使用count聚合
reduceByKey是分组加聚合
package com.shujia.spark

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo9ReduceByKey {
def main(args: Array[String]): Unit = {

val conf: SparkConf = new SparkConf()
  .setAppName("Demo9ReduceByKey")
  .setMaster("local")
val sc = new SparkContext(conf)

//读取文件构建RDD
val linesRDD: RDD[String] = sc.textFile("data/words.txt")
//拆分
val wordsRDD: RDD[String] = linesRDD.flatMap(lines => lines.split(","))
//将RDD转成kv格式
val kvRDD: RDD[(String, Int)] = wordsRDD.map(word => (word, 1))

/**
 * reduceByKey: 按照key进行聚合计算,会在Map端进行预聚合
 *              只能做简单的聚合计算
 * (f:v,v=>v)   相当于v1,v2=>v1+v2
 * (lyw,1),(lyw,1)转换成(lyw,2)
 */
val reduceByKeyRDD: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y)
reduceByKeyRDD.foreach(println)

}
}
reduceByKey和groupByKey的底层原理比较图

aggregateByKey:预聚合,按照key进行聚合计算
reduceByKey:只适合简单的聚合计算

    会在map进行预聚合,聚合函数会应用在map端和reduce端(聚合函数会应用在分区内的聚合和分区间的聚合)
    

aggregateByKey:

    aggregateByKey(初始值:U)(分区内:(U,V)=>U,分区间:(U1,U2)=>U)

简化-----aggregateByKey(U)((U,V)=>U,(U1,U2)=>U)

    按照key进行聚合,需要两个聚合函数,一个是map端的聚合,一个是reduce端的聚合
    

在大数据计算组shuffle是最耗时间的,shuffle过程是数据是需要落地到磁盘的
使用aggregateByKey性能更好,不适合使用groupByKey再count

aggregateByKey: 会再map端做预聚合,性能高, 可以减少shuffle过程中的数据量

            1、初始值,初始值可以有多个
            2、map端的聚合函数
            3、reduce端的聚合函数

//统计单词的数量
package com.shujia.spark

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo14Agg {
def main(args: Array[String]): Unit = {

val conf: SparkConf = new SparkConf()
  .setAppName("Demo14Agg")
  .setMaster("local")
val sc = new SparkContext(conf)

//读取文件构建RDD
val linesRDD: RDD[String] = sc.textFile("data/words.txt")
//拆分
val wordsRDD: RDD[String] = linesRDD.flatMap(lines => lines.split(","))
//将RDD转成kv格式
val kvRDD: RDD[(String, Int)] = wordsRDD.map(word => (word, 1))

//统计单词的数量,reduceByKey方法
val reduceRDD: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y)
reduceRDD.foreach(println)
//(spark,12)
//(hive,6)
//(hadoop,6)
//(java,12)

//统计单词的数量,reduceByKey方法
val aggRDD: RDD[(String, Int)] = kvRDD.aggregateByKey(0)(
  (u: Int, v: Int) => u + v,
  (u1: Int, u2: Int) => u1 + u2
)
aggRDD.foreach(println)
//(spark,12)
//(hive,6)
//(hadoop,6)
//(java,12)

}
}
//计算每个班级学生的平均年龄
package com.shujia.spark

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo15AggAvgAge {
def main(args: Array[String]): Unit = {

val conf: SparkConf = new SparkConf()
  .setAppName("Demo15AggAvgAge")
  .setMaster("local")
val sc = new SparkContext(conf)

//读取文件构建RDD
val studentsRDD: RDD[String] = sc.textFile("data/students.txt")

//取出班级和年龄
val clazzAndAge: RDD[(String, Double)] = studentsRDD.map(stu => {
  val split: Array[String] = stu.split(",")
  (split(4),split(2).toDouble)
})
clazzAndAge.foreach(println)
//(文科六班,22.0)
//(文科六班,24.0)
//(理科六班,22.0)
//(理科三班,24.0)
//...

// //使用groupByKey先进行分组
// val groupByKeyRDD: RDD[(String, Iterable[Double])] = clazzAndAge.groupByKey()
// groupByKeyRDD.foreach(println)
// //(理科二班,CompactBuffer(21.0, 23.0, 21.0, 23.0, 21.0, 21.0, ...))
// //(文科三班,CompactBuffer(22.0, 21.0, 23.0, 24.0, 24.0, 23.0, ...))
// //...
//
// //再进行平均年龄
// val clazzAvgAgeRDD: RDD[(String, Double)] = groupByKeyRDD.map{
// case (clazz:String, iter:Iterable[Double]) =>
// val avgAge: Double = iter.sum / iter.size //平均年龄=班级总年龄/人数
// (clazz,avgAge)
// }
// clazzAvgAgeRDD.foreach(println)
// //(理科二班,22.556962025316455)
// //(文科三班,22.680851063829788)
// //....

/**
 * 再大数据计算组shuffle是最耗时间的,shuffle过程是数据是需要落地到磁盘的
 * 使用aggregateByKey性能更好,不适合使用groupByKey再count
 * aggregateByKey: 会再map端做预聚合,性能高, 可以减少shuffle过程中的数据量
 * 1、初始值,初始值可以有多个
 * 2、map端的聚合函数
 * 3、reduce端的聚合函数
 */
//初始值是一个---泛型,初始值是两个---元组
//初始值并不是班级和年龄,是年龄和人数
val aggRDD: RDD[(String, (Double, Int))] = clazzAndAge.aggregateByKey((0.0, 0))(
  (u: (Double, Int), age: Double) => (u._1 + age, u._2 + 1), //map端的聚合函数:年龄相加一次,人数加1
  (u1: (Double, Int), u2: (Double, Int)) => (u1._1 + u2._1, u1._2 + u2._2) //reduce端的聚合函数
)
aggRDD.foreach(println)
//(理科二班,(1782.0,79))
//(文科三班,(2132.0,94))
//(理科四班,(2060.0,91))
//...

//计算平均年龄
val avgAgeRDD2: RDD[(String, Double)] = aggRDD.map {
  case (clazz: String, (sumAge: Double, num: Int)) =>
    (clazz, sumAge / num)
}

avgAgeRDD2.foreach(println)
//(理科二班,22.556962025316455)
//(文科三班,22.680851063829788)
//(理科四班,22.63736263736264)
//(理科一班,22.333333333333332)
//...

}
}
union:合并
distinct:去重
intersection:取交集
union: 合并两个rdd, 两个rdd的数据类型要一致
union:只是代码逻辑层面的合并,在物理层面并没有合并
union:不会产生shuffle

distinct:去重
distinct: 会产生shuffle
distinct;会先在map端局部去重,再到reduce全局去重

intersection:取两个rdd的交集
package com.shujia.spark

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo10Union {
def main(args: Array[String]): Unit = {

val conf: SparkConf = new SparkConf()
  .setAppName("Demo10Union")
  .setMaster("local")
val sc = new SparkContext(conf)

//构建两个RDD
val listRDD1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8))
val listRDD2: RDD[Int] = sc.makeRDD(List(4, 5, 6, 7, 8,9,10))

val unionRDD: RDD[Int] = listRDD1.union(listRDD2)
unionRDD.foreach(println)
/**
 * union: 合并两个rdd, 两个rdd的数据类型要一致
 * union:只是代码逻辑层面的合并,在物理层面并没有合并
 * union:不会产生shuffle
 */
val distinctRDD: RDD[Int] = unionRDD.distinct()
distinctRDD.foreach(println)
/**
 * distinct:去重
 * distinct: 会产生shuffle
 * distinct;会先在map端局部去重,再到reduce全局去重
 */

val intersectionRDD: RDD[Int] = listRDD1.intersection(listRDD2)
intersectionRDD.foreach(println)

/**
 * intersection:取两个rdd的交集
 *
 */

}
}
join:内关联
left join:左关联
full join:全外关联
必须是kv格式的RDD!

inner join:内关联,通过RDD的key进行关联,
left join : 以左为主,如果右没有,补None。所用函数----leftOuterJoin
full join : 全外关联,两边都可能关联不上,关联不上补None。所用函数----fullOuterJoin
package com.shujia.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo11Join {
def main(args: Array[String]): Unit = {

val conf: SparkConf = new SparkConf()
  .setAppName("Demo11Join")
  .setMaster("local")
val sc = new SparkContext(conf)

//构建RDD
val namesRDD: RDD[(String, String)] = sc.makeRDD(List(("001", "张三"), ("002", "李四")))
val agesRDD: RDD[(String, Int)] = sc.makeRDD(List(("002", 24), ("003", 25)))

/**
 * inner join:通过rRDD的key进行关联,必须是kv格式的RDD
 */
val innerJoinRDD: RDD[(String, (String, Int))] = namesRDD.join(agesRDD)
innerJoinRDD.foreach(println) //(002,(李四,24))
//关联之后操作一下数据(便于观看)

// val RDD=innerJoinRDD.map(kv=>{
// val id: String = kv._1
// val name: String = kv._2._1
// val age: Int = kv._2._2
// (id,name,age)
// })
// RDD.foreach(println)//(002,李四,24)

//当参数类型较多的时候,不适合使用上面的方法
//使用case更简洁
val RDD: RDD[(String, String, Int)] =innerJoinRDD.map{
  case (id:String,(name:String,age:Int))=>
    (id,name,age)
}
RDD.foreach(println)//(002,李四,24)

/**
 * left join : 以左为主,如果右没有,补None
 * leftOuterJoin
 */
val leftJoinRDD: RDD[(String, (String, Option[Int]))] = namesRDD.leftOuterJoin(agesRDD)
leftJoinRDD.foreach(println)
//(002,(李四,Some(24)))
//(001,(张三,None))

//整理数据
//结果会有两种:关联上的、未关联上的。需要使用两次case
val RDD2: RDD[(String, String, Int)] =leftJoinRDD.map{
  case (id:String, (name:String, Some(age)))=>
  (id,name,age)//关联上的返回值
  case (id:String,(name:String,None))=>
    (id,name,0)//未关联上的返回值,未关联上的,Int类型的我们给它设置个默认值--0
}
RDD2.foreach(println)
//(002,李四,24)
//(001,张三,0)

/**
 * full join: 全外关联,两边都可能关联不上,补None
 * fullOuterJoin
 */
val fullJoinRDD: RDD[(String, (Option[String], Option[Int]))] = namesRDD.fullOuterJoin(agesRDD)
fullJoinRDD.foreach(println)
//(003,(None,Some(25)))
//(002,(Some(李四),Some(24)))
//(001,(Some(张三),None))

//整理数据
val RDD3: RDD[(String, Any, Int)] =fullJoinRDD.map{
  case (id:String, (Some(name),Some(age)))=>  //匹配的数据都有
    (id,name,age)
  case (id:String,(Some(name),None))=>  //没有匹配到age
    (id,name,0)
  case  (id:String,(None,Some(age)))=>  //没有匹配到name
    (id,"无名氏",age)
}
RDD3.foreach(println)
//(003,无名氏,25)
//(002,李四,24)
//(001,张三,0)

}
}
mapValues:只处理kv格式的v
mapValues: kv格式,只对value做处理,key不变
package com.shujia.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo12MapValues {
def main(args: Array[String]): Unit = {

val conf: SparkConf = new SparkConf()
  .setAppName("Demo12MapValues")
  .setMaster("local")
val sc = new SparkContext(conf)

//构建RDD
val agesRDD: RDD[(String, Int)] = sc.makeRDD(List(("002", 24), ("003", 25)))

//年龄加1
//使用map方法
val mapRDD: RDD[(String, Int)] = agesRDD.map{
  case (id:String, age:Int)=>
    (id,age+1)
}
mapRDD.foreach(println)
//(002,25)
//(003,26)

/**
 * mapValues: 只对value做处理,key不变
 */
//使用mapValues方法
val mapValuesRDD: RDD[(String, Int)] = agesRDD.mapValues(v => v + 1)
mapValuesRDD.foreach(println)
//(002,25)
//(003,26)

}
}
sortBy:指定一个列进行排序(默认升序)
sortBy: 指定数据中的一个列进行排序,默认是升序

ascending----控制排序方式
ascending=true-----升序(默认不写)
ascending=false----降序
package com.shujia.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo13Sort {
def main(args: Array[String]): Unit = {

val conf: SparkConf = new SparkConf()
  .setAppName("Demo13Sort")
  .setMaster("local")
val sc = new SparkContext(conf)

//读取文件创建RDD
val studentsRDD: RDD[String] = sc.textFile("data/students.txt")
/**
 * sortBy: 指定数据中的一个列进行排序,默认是升序
 * ascending 控制排序方式
 */
//按照学生的年龄进行排序
val sortRDD: RDD[String] = studentsRDD.sortBy(stu => {
  val age: Int = stu.split(",")(2).toInt
  age
})
sortRDD.foreach(println)

}
}
//执行结果
...
1500100985,申飞珍,21,女,文科一班
1500100993,衡从蕾,21,女,理科二班
1500100997,陶敬曦,21,男,理科六班
1500100001,施笑槐,22,女,文科六班
1500100003,单乐蕊,22,女,理科六班
1500100005,宣谷芹,22,女,理科五班
1500100008,符半双,22,女,理科六班
...

//在排序的时候,加上ascending = false,就是降序
val sortRDD: RDD[String] = studentsRDD.sortBy(stu => {

  val age: Int = stu.split(",")(2).toInt
  age
},ascending = false)
sortRDD.foreach(println)

//执行结果
...
1500100996,厉运凡,24,男,文科三班
1500100007,尚孤风,23,女,文科六班
1500100010,羿彦昌,23,男,理科六班
1500100014,羿旭炎,23,男,理科五班
1500100016,潘访烟,23,女,文科一班
1500100020,杭振凯,23,男,理科四班
...
sortByKey:kv格式----通过key进行排序,默认升序
sortByKey: kv格式,通过key进行排序,默认升序。不需要指定参数
package com.shujia.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo13Sort {
def main(args: Array[String]): Unit = {

val conf: SparkConf = new SparkConf()
  .setAppName("Demo13Sort")
  .setMaster("local")
val sc = new SparkContext(conf)
  
/**
 * sortByKey: 通过key进行排序,默认升序
 */
  
//构建RDD
val agesRDD: RDD[(Int, String)] = sc.makeRDD(List((25, "002"), (23, "003")))

val sortByKeyRDD: RDD[(Int, String)] = agesRDD.sortByKey()

sortByKeyRDD.foreach(println)
//(23,003)
//(25,002)

}
}
cartesian:笛卡尔积
cartesian:笛卡尔积
性能很差,需求很少,使用率很低
package com.shujia.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo16cartesian {
def main(args: Array[String]): Unit = {

val conf: SparkConf = new SparkConf()
  .setAppName("Demo16cartesian")
  .setMaster("local")
val sc = new SparkContext(conf)

val namesRDD: RDD[(String, String)] = sc.makeRDD(List(("001", "张三"), ("002", "李四")))
val agesRDD: RDD[(String, Int)] = sc.makeRDD(List(("002", 24), ("003", 25)))

/**
 * cartesian:笛卡尔积
 * 性能很差,需求很少,使用率很低
 */
val cartesianRDD: RDD[((String, String), (String, Int))] = namesRDD.cartesian(agesRDD)
cartesianRDD.foreach(println)
//((001,张三),(002,24))
//((001,张三),(003,25))
//((002,李四),(002,24))
//((002,李四),(003,25))

}
}
二、操作算子
foreach:一次遍历一条数据
foreachPartition:一次遍历一个分区(返回迭代器)
saveAsTextFile: 将数据保存到hdfs
count: 统计行数
collect: 将rdd的数据拉取到内存中
foreach: 一次遍历一条数据
foreachPartition:一次遍历一个分区,一个分区返回一个迭代器,参数是一个迭代器类型
saveAsTextFile: 将数据保存到hdfs,执行之前输出目录不能存在,rdd一个分区对应一个文件
count: 统计行数
collect: 将rdd的数据拉取到内存中,返回一个数组;如果数据量很大,会出现内存溢出

spark:任务的层级关系
application(应用程序) --->job(触发的任务)-->stage(map端/reduce端)--> task(运行任务)
package com.shujia.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo7Action1 {
def main(args: Array[String]): Unit = {

val conf: SparkConf = new SparkConf()
  .setAppName("Demo7Action1")
  .setMaster("local")
val sc = new SparkContext(conf)

/**
 * action算子   -- 触发任务执行,每一个action算子都会触发一个job任务
 * 1、foreach: 遍历rdd
 * 2、saveAsTextFile : 保存数据
 * 3、count : 统计rdd的行数
 * 4、collect: 将rdd转换成scala的集合
 */

//读取文件构建RDD
val studentsRDD: RDD[String] = sc.textFile("data/students.txt")

/**
 * foreach: 一次遍历一条数据
 * foreachPartition:一次遍历一个分区,一个分区返回一个迭代器
 *                    参数是一个迭代器类型
 */
studentsRDD.foreach(println)

studentsRDD.foreachPartition((iter:Iterator[String])=>println(iter.toList))

//foreach执行结果:
1500100001,施笑槐,22,女,文科六班
1500100002,吕金鹏,24,男,文科六班
1500100003,单乐蕊,22,女,理科六班
1500100004,葛德曜,24,男,理科三班
1500100005,宣谷芹,22,女,理科五班
1500100006,边昂雄,21,男,理科二班
...

//foreachPartition执行结果:
List(1500100001,施笑槐,22,女,文科六班, 1500100002,吕金鹏,24,男,文科六班,...)

/**
 * saveAsTextFile: 将数据保存到hdfs,
 * 1、执行之前输出目录不能存在
 * 2、rdd一个分区对应一个文件
 */
studentsRDD.saveAsTextFile("data/temp2")

/**
 *count: 统计行数
 */
val count: Long = studentsRDD.count()
println(s"studentsRDD的行数:$count")


/**
 * collect: 将rdd的数据拉取到内存中,返回一个数组
 * 如果数据量很大,会出现内存溢出
 */
val studentArr: Array[String] = studentsRDD.collect()
println(studentArr)

}
}
sum:求和
reduce: 全局聚合
package com.shujia.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Deno17Reduce {
def main(args: Array[String]): Unit = {

val conf: SparkConf = new SparkConf()
  .setAppName("Deno17Reduce")
  .setMaster("local")
val sc = new SparkContext(conf)

val listRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9))

/**
 * sum:求和,只能用于int或者double,long类型的求和
 * sum:action算子
 */
val sum: Double = listRDD.sum()
println(sum)//45.0

/**
 * reduce: 全局聚合
 * reduce:action算子(操作算子)
 *
 * reduceBykYe:通过key进行聚合
 * reduceBykYe:转换算子
 */
val reduce: Int = listRDD.reduce((x, y) => x + y)
println(reduce)//45

}
}
take:取top
package com.shujia.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo18Take {
def main(args: Array[String]): Unit = {

val conf: SparkConf = new SparkConf()
  .setAppName("Demo18Take")
  .setMaster("local")
val sc = new SparkContext(conf)

val studentsRDD: RDD[String] = sc.textFile("data/students.txt")

/**
 * take: 取top,是一个action算子
 */
//获取前10条数据
val top10: Array[String] = studentsRDD.take(10)
top10.foreach(println)

//获取第一条数据
val first: String = studentsRDD.first()
println(first)

}
}