读取多个文件为一个RDD

读取多个文件为RDD时,我们可以使用textFile()或者wholeTextFiles函数,这两个函数之间的主要的区别在于:返回内容的不同,wholeTextFiles会返回文件名和文件内容,而textFile()只返回文件内容。

textFile()

读取单个或多个文本、csv 文件并返回单个 Spark RDD [String]

Read single or multiple text, csv files and returns a single Spark RDD [String]

wholeTextFiles()

读取单个或多个文件并返回单个 RDD[Tuple2[String, String]],其中元组中的第一个值 (1) 是 文件名,第二个值 (2) 是文件的内容。

Reads single or multiple files and returns a single RDD[Tuple2[String, String]], where first value (1) in a tuple is a file name and second value (2) is content of the file.

1 准备数据

在开始之前,我们假设在文件夹“c:/tmp/files”中有以下文件名和文件内容,我使用这些文件来演示示例。

文件名称文件内容
text01.txtOne,1
text02.txtTwo,2
text03.txtThree,3
text04.txtFour,4
text05.txtInvalid,I

2 Spark 将目录中的所有文本文件读入形成单个 RDD

读取某个目录下的所有文件操作很简单,只需要指定文件路径即可,具体案例如下:

object TextfileDemo {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("textFile")
      .master("local")
      .getOrCreate()
    //第一种方法 textfile
    val rdd: RDD[String] = spark.sparkContext.textFile("InData/SparkScalaExampleData/files/*")
    //遍历内容
    rdd.foreach(f =>
    println(f)
    )
    /*
    * Invalid,I
      One,1
      Two,2
      Three,3
      Four,4
    * */

    //第二种方法:wholeTextFiles
    val WholeRDD: RDD[(String, String)] = spark.sparkContext.wholeTextFiles("InData/SparkScalaExampleData/files/*")
    println("wholeTextFiles内容:")
    WholeRDD.foreach(println)
    /*
    * (file:/D:/workSpace/Ideaworkspace/SparkHadoop/InData/SparkScalaExampleData/files/invalid.txt,Invalid,I)
      (file:/D:/workSpace/Ideaworkspace/SparkHadoop/InData/SparkScalaExampleData/files/text01.txt,One,1)
      (file:/D:/workSpace/Ideaworkspace/SparkHadoop/InData/SparkScalaExampleData/files/text02.txt,Two,2)
      (file:/D:/workSpace/Ideaworkspace/SparkHadoop/InData/SparkScalaExampleData/files/text03.txt,Three,3)
      (file:/D:/workSpace/Ideaworkspace/SparkHadoop/InData/SparkScalaExampleData/files/text04.txt,Four,4)
    * 
    * */
  }

} 

3 读取目录下部分文件形成一个RDD

当您知道要读取的多个文件的名称时,只需使用逗号分隔符输入所有文件名即可创建单个 RDD。

When you know the names of the multiple files you would like to read, just input all file names with comma separator in order to create a single RDD.

object TextfileDemo {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("textFile")
      .master("local")
      .getOrCreate()

    val RDD: RDD[String] = spark.sparkContext.textFile("InData/SparkScalaExampleData/files/text01.txt,InData/SparkScalaExampleData/files/text02.txt")
    RDD.foreach(println)
    /*
    * One,1
      Two,2
    * */

  }

}

4 读取与文件名匹配的所有文本文件到单个 RDD

textFile() 方法还接受模式匹配和通配符。 例如,下面的代码片段读取所有以文本开头并带有扩展名“.txt”的文件并创建单个 RDD。

textFile() method also accepts pattern matching and wild characters. For example below snippet read all files start with text and with the extension “.txt” and creates single RDD.

object TextfileDemo {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("textFile")
      .master("local")
      .getOrCreate()

    val RDD: RDD[String] = spark.sparkContext.textFile("InData/SparkScalaExampleData/files/text*.txt")
    RDD.foreach(println)
    
    /*
    * One,1
      Two,2
      Three,3
      Four,4
    * */
    

  }

}

5 将多个目录中的文件读入单个RDD

textFile还支持读取文件和多个目录组合。

It also supports reading files and multiple directories combination.

object TextfileDemo {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("textFile")
      .master("local")
      .getOrCreate()

    val RDD: RDD[String] = spark.sparkContext.textFile("InData/SparkScalaExampleData/files/text01.txt," +
      "InData/SparkScalaExampleData/files/dir1/*,InData/SparkScalaExampleData/files01/*")
    RDD.foreach(println)

    /*
    * One,1
      java,1
      Four,1
    * */


  }

}

6 将嵌套目录中的文本文件读入单个 RDD

textFile() 和 wholeTextFile() 在找到嵌套文件夹时返回错误,因此首先使用 scala、Java、Python 语言通过遍历所有嵌套文件夹并使用逗号分隔符传递所有文件名来创建文件路径列表,以便创建单个 RDD。

textFile() and wholeTextFile() returns an error when it finds a nested folder hence, first using scala, Java, Python languages create a file path list by traversing all nested folders and pass all file names with comma separator in order to create a single RDD.

7 分别读取所有文本文件并合并创建单个 RDD

您还可以将所有文本文件读入单独的 RDD,然后将所有这些使用union函数合并为单个 RDD。

You can also read all text files into a separate RDD’s and union all these to create a single RDD.

8 将多个CSV文件读入RDD

Spark RDD 没有读取 csv 文件格式的方法,因此我们将使用 textFile() 方法像任何其他文本文件一样将 csv 文件读取到 RDD 中,并根据逗号、管道或任何其他分隔符分割记录。

Spark RDD’s doesn’t have a method to read csv file formats hence we will use textFile() method to read csv file like any other text file into RDD and split the record based on comma, pipe or any other delimiter.

object TextfileDemo {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("textFile")
      .master("local")
      .getOrCreate()

    val RDD: RDD[String] = spark.sparkContext.textFile("InData/SparkScalaExampleData/CSVData/*")
    RDD.foreach(println)
    val RDDCSV: RDD[Array[String]] = RDD.map( f => {f.split(",")})

    RDDCSV.foreach( f => {
      println("col1:" + f(0) + ",Col2:"+f(1))
    })

    /*
    * java,90
      python,79
      C++,67
      Scala,23
      col1:java,Col2:90
      col1:python,Col2:79
      col1:C++,Col2:67
      col1:Scala,Col2:23
    * */


  }

}

Here, we read all csv files in a directory into RDD, we apply map transformation to split the record on comma delimiter and a map returns another RDD “rdd6” after transformation. finally, we iterate rdd6, reads the column based on an index.
在这里,我们将一个目录中的所有 csv 文件读入 RDD,我们应用 map 转换来分割逗号分隔符上的记录,并且 map 转换后返回另一个 RDD “rdd6”。 最后,我们迭代rdd6,根据索引读取列。