方法
基于键的缩减操作聚合每个 RDD 键的值。这种类型的操作只能在类型为 的 RDD 上执行JavaPairRDD,该类型的 RDD 中的元素是键值元组。请注意,与 Java Map 对象不同,键没有唯一约束。RDD 中可能存在多个具有相同键的元组。

缩减操作将对同一键的值执行,直到每个键只有一个值。生成的 RDD 将是一个真正的键值映射,具有唯一的键。

让我们以下面的RDD为例(键,值):

(A, 3)
(A, 5)
(B, 2)
(A, 4)
(B, 7)
在应用键归约、求和值后,我们得到:

(A, 12)
(B, 9)
RDD 包含与原始 RDD 中的不同键一样多的元组。

样本数据
我们将使用维基百科流量指标作为数据集。这些文件是公开且免费的。每小时生成一个文件。每个文件解压缩后大约 300 MB。

这是一个示例文件提取:

fr Louvoy%C3%A8rent 1 7766
fr Louvoyer 2 24276
fr Louvre-Lens 1 39497
fr Louvres 1 36541
fr Louxor 2 33183
每行代表一个包含 4 个空格分隔字段的记录:

维基百科项目的名称:国家代码,有时后跟一个后缀,表明该站点是维基百科、维基教科书、维基词典等。
URL 编码的页面标题
收到的请求数
页面的大小(以字节为单位)
让我们来练习
从维基百科的统计数据中,我们可以计算出每个项目的访问量。与全局聚合不同,我们要获取的是一个key-value列表:项目代码-总访问次数。

让我们首先按行读取文件并拆分每条记录:

sc.textFile(“data/wikipedia-pagecounts/pagecounts-20141101-000000″)
.map(line -> line.split(” “))
我们获得一个JavaRDD<String[]>.

我们可以通过运算将其转化JavaRDD为一个。然后我们必须返回类型的对象:JavaPairRDDmapToPair()Tuple2

.mapToPair(s -> new Tuple2(s[0], Long.parseLong(s[2])))
JavaPairRDD 提供了转换,使我们能够在本机上处理此键值集合:reduceByKey(), sortByKey(),以及可以组合两个JavaPairRDDs 的函数:join(),intersect()等。

例如,我们将使用reduceByKey()带有求和运算的函数。返回值将属于同一个键,而我们不知道该键:

.reduceByKey((x, y) -> x + y)
最后,我们可以将所有元组输出到控制台。元组键由 field 表示_1,而值由 field 表示_2。

.foreach(t -> System.out.println(t._1 + ” -> ” + t._2))
整个代码是:

SparkConf conf = new SparkConf().setAppName(“wikipedia-mapreduce-by-key”)
.setMaster(“local”);
JavaSparkContext sc = new JavaSparkContext(conf);
sc.textFile(“data/wikipedia-pagecounts/pagecounts-20141101-000000″)
.map(line -> line.split(” “))
.mapToPair(s -> new Tuple2(s[0], Long.parseLong(s[2])))
.reduceByKey((x, y) -> x + y)
.foreach(t -> System.out.println(t._1 + ” -> ” + t._2));
执行此操作时,我们会得到类似于以下的结果:

got.mw -> 14
mo.d -> 1
eo.q -> 38
fr -> 602583
ja.n -> 167
mus -> 21
xal -> 214
Wikipedia France 的值 3269849 是文件中标记为“fr”的访问总和。

按键排序结果
我们注意到结果没有排序。事实上,出于性能原因,Apache Spark 不保证 RDD 中的顺序:元组是独立处理的。

我们可以使用 sortByKey() 方法按键对元组进行排序,该方法将布尔值作为输入来反转排序:

.sortByKey()
结果是:

AR -> 195
De -> 115
EN -> 4
En -> 10
En.d -> 8
FR -> 1
It -> 2
SQ.mw -> 11
Simple -> 1
aa -> 27
aa.b -> 6
aa.d -> 1
aa.mw -> 11

排序区分大小写。要执行不区分大小写的排序,我们可以使用比较器。不幸的是,我们不能使用来自 Comparator.comparing()(Java 8 中的新功能)的比较器,因为返回的比较器不可序列化。

// produces an exception:
// Task not serializable: java.io.NotSerializableException
.sortByKey(Comparator.comparing(String::toLowerCase))
然后我们必须使用一个实现 Serializable 接口的比较器:

class LowerCaseStringComparator implements Comparator, Serializable {
@Override
public int compare(String s1, String s2) {
return s1.toLowerCase().compareTo(s2.toLowerCase());
}
}
然后我们以这种方式使用比较器:

Comparator c = new LowerCaseStringComparator(); … .sortByKey(c)

我们得到了我们想要的:
...
ang.q -> 15
ang.s -> 9
AR -> 195
ar -> 108324
ar.b -> 293
...

按值排序结果
JavaPairRDD 类有一个 sortByKey() 方法,但是没有 sortByValue() 方法。要按值排序,我们必须反转我们的元组,以便值成为键。

由于 JavaPairRDD 不强加唯一键,我们可以有冗余值。

我们使用 mapToPair() 反转元组:

.mapToPair(t -> new Tuple2(t._2, t._1))
然后我们可以按降序对 RDD 进行排序(最大值在前)并使用方法 take() 保存前 10 个元素:

.sortByKey(false).take(10)
请注意,take() 返回一个 Java 集合 (java.util.List),而不是 RDD。我们使用的 forEach() 方法是来自集合 API 的方法,而不是 RDD 的 forEach()。

.forEach(t -> System.out.println(t._2 + ” -> ” + t._1));
排序代码为:

.mapToPair(t -> new Tuple2(t._2, t._1))
.sortByKey(false) .take(10)
.forEach(t -> System.out.println(t._2 + ” -> ” + t._1));
然后,我们获得该小时内访问量最大的维基百科网站的前 10 名:

meta.m -> 15393394
meta.mw -> 12390990
en -> 7209054
en.mw -> 4405366
es -> 1210216
de -> 692501
ja.mw -> 674700
es.mw -> 666607
ru -> 664970
ja -> 637371
处理多个文件
由于 textFile() 方法参数中的通配符支持,可以处理多个文件。
我们可以通过 setMaster(“local[16]”) 告诉 Apache Spark 使用 16 个线程。

因此,11 月 1 日(24 个文件)的前 10 个计算变为:

SparkConf conf = new SparkConf()
.setAppName(“wikipedia-mapreduce-by-key”)
.setMaster(“local[16]”);
JavaSparkContext sc = new JavaSparkContext(conf);
sc.textFile(“data/wikipedia-pagecounts/pagecounts-20141101-*”)
.map(line -> line.split(” “))
.mapToPair(s -> new Tuple2(s[0], Long.parseLong(s[2])))
.reduceByKey((x, y) -> x + y)
.mapToPair(t -> new Tuple2(t._2, t._1))
.sortByKey(false) .take(10)
.forEach(t -> System.out.println(t._2 + ” -> ” + t._1));
我们得到这个结果:

meta.m -> 430508482
meta.mw -> 313310187
en -> 171157735
en.mw -> 103491223
ru -> 29955421
ja -> 22201812
ja.mw -> 22186094
es -> 21424067
de -> 21098513
fr -> 17967662
在 Macbook pro(SSD,Core i7 2GHz)上,处理 9 GB 数据需要 40 秒。不错!

结论

我们对 Apache Spark API 及其提供的内容有了很好的了解。键值元组的操作是框架中最常用的部分之一。
在接下来的文章中,我们将部署一个 Spark 集群并研究流式 API。

此代码可在Github上获得。