Spark RDD API详解(一) Map和Reduce
原始链接:https://www.zybuluo.com/jewes/note/35032
RDD是什么?A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable(不可改变的), partitioned collection of elements that can be operated on in parallel. This class contains the basic operations available on all RDDs, such as map, filter, and persist. In addition, org.apache.spark.rdd.PairRDDFunctions contains operations available only on RDDs of key-value pairs, such as groupByKey and join;org.apache.spark.rdd.DoubleRDDFunctions contains operations available only on RDDs of Doubles; and org.apache.spark.rdd.SequenceFileRDDFunctions contains operations available on RDDs that can be saved as SequenceFiles. All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)] through implicit.
Internally, each RDD is characterized by five main properties:
A list of partitions
A function for computing each split
A list of dependencies on other RDDs
Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
All of the scheduling and execution in Spark is done based on these methods, allowing each RDD to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for reading data from a new storage system) by overriding these functions. Please refer to the Spark paper for more details on RDD internals.
RDD是Spark中的抽象数据结构类型,任何数据在Spark中都被表示为RDD。从编程的角度来看,RDD可以简单看成是一个数组。和普通数组的区别是,RDD中的数据是分区存储的,这样不同分区的数据就可以分布在不同的机器上,同时可以被并行处理。因此,Spark应用程序所做的无非是把需要处理的数据转换为RDD,然后对RDD进行一系列的变换和操作从而得到结果。本文为第一部分,将介绍Spark RDD中与Map和Reduce相关的API。
如何创建RDD?RDD可以从普通数组创建出来,也可以从文件系统或者HDFS中的文件创建出来。
举例:从普通数组创建RDD,里面包含了1到9这9个数字,它们分别在3个分区中。
scala> val a = sc.parallelize(1 to 9, 3) a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:12
举例:读取文件README.md来创建RDD,文件中的每一行就是RDD中的一个元素
scala> val b = sc.textFile("README.md") b: org.apache.spark.rdd.RDD[String] = MappedRDD[3] at textFile at <console>:12
虽然还有别的方式可以创建RDD,但在本文中我们主要使用上述两种方式来创建RDD以说明RDD的API。
mapmap是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。
举例:
scala> val a = sc.parallelize(1 to 9, 3) scala> val b = a.map(x => x*2) scala> a.collect res10: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9) scala> b.collect res11: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)
上述例子中把原RDD中每个元素都乘以2来产生一个新的RDD。
mapPartitionsmapPartitions是map的一个变种。map的输入函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区,也就是把每个分区中的内容作为整体来处理的。
它的函数定义为:
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
f即为输入函数,它处理每个分区里面的内容。每个分区中的内容将以Iterator[T]传递给输入函数f,f的输出结果是Iterator[U]。最终的RDD由所有分区经过输入函数处理后的结果合并起来的。
举例:
scala> val a = sc.parallelize(1 to 9, 3) scala> def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = { var res = List[(T, T)]() var pre = iter.next while (iter.hasNext) { val cur = iter.next; res .::= (pre, cur) pre = cur; } res.iterator } scala> a.mapPartitions(myfunc).collect res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
上述例子中的函数myfunc是把分区中一个元素和它的下一个元素组成一个Tuple。因为分区中最后一个元素没有下一个元素了,所以(3,4)和(6,7)不在结果中。
mapPartitions还有些变种,比如mapPartitionsWithContext,它能把处理过程中的一些状态信息传递给用户指定的输入函数。还有mapPartitionsWithIndex,它能把分区的index传递给用户指定的输入函数。
温馨提示: 本文由Jm博客推荐,转载请保留链接: https://www.jmwww.net/file/68681.html