-->

Apache Spark RDD编程指南

2020-09-24 00:50发布

 RDD 是Apache Spark编程非常重要的一个特性。Spark使用Scala语言编写并支持Java和Python。

 

目录

总览

与Spark链接

Scala语言

Java语言

Python语言

 初始化Spark

Scala语言

Java语言

Python语言

使用Shell

Scala语言

Python语言

弹性分布式数据集(RDD)

并行集合

Scala语言

Java语言

Python语言

外部数据集

Scala语言

Java语言

Python语言

RDD操作

基本

Scala语言

Java语言

Python语言

将函数传递给Spark

Scala语言

Java语言

Python语言

了解闭包 

Scala语言

Java语言

Python语言

本地与集群模式

RDD的打印元素

使用键值对

Scala语言

Java语言

Python语言

转变

动作

随机操作

背景

绩效影响

RDD持久性

选择哪个存储级别?

删除资料

共享变量

广播变量

Scala语言

Java语言

Python语言

蓄能器

Scala语言

Java语言

Python语言

部署到集群

从Java / Scala启动Spark作业

单元测试

从这可去


总览

在较高级别上,每个Spark应用程序都包含一个驱动程序,该程序运行用户的main功能并在集群上执行各种并行操作。Spark提供的主要抽象是弹性分布式数据集(RDD),它是跨集群节点划分的元素的集合,可以并行操作。通过从Hadoop文件系统(或任何其他Hadoop支持的文件系统)中的文件或驱动程序中现有的Scala集合开始并进行转换来创建RDD。用户还可以要求Spark将RDD 保留在内存中,以使其能够在并行操作中有效地重用。最后,RDD会自动从节点故障中恢复。

Spark中的第二个抽象是可以在并行操作中使用的共享变量。默认情况下,当Spark作为一组任务在不同节点上并行运行一个函数时,它会将函数中使用的每个变量的副本传送给每个任务。有时,需要在任务之间或任务与驱动程序之间共享变量。Spark支持两种类型的共享变量:广播变量(可用于在所有节点上的内存中缓存值)和累加器(accumulator),这些变量仅被“添加”到其上,例如计数器和总和。

本指南以Spark的每种受支持语言显示了所有这些功能。如果启动Spark的交互式shell,无论bin/spark-shell是Scala Shell还是bin/pyspark Python Shell, 最简单的方法就是跟随它。

与Spark链接

Scala语言

默认情况下,Spark 3.0.0已构建并分发为与Scala 2.12一起使用。(可以将Spark构建为与其他版本的Scala一起使用。)要在Scala中编写应用程序,您将需要使用兼容的Scala版本(例如2.12.X)。

要编写Spark应用程序,您需要在Spark上添加Maven依赖项。可通过Maven Central在以下位置获得Spark:

groupId = org.apache.spark
artifactId = spark-core_2.12
version = 3.0.0

另外,如果您想访问HDFS群集,则需要hadoop-client为您的HDFS版本添加依赖项 。

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>

最后,您需要将一些Spark类导入程序。添加以下行:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

(在Spark 1.3.0之前,您需要显式import org.apache.spark.SparkContext._启用必要的隐式转换。)

Java语言

Spark 3.0.0支持 lambda表达式 以简洁地编写函数,否则,您可以使用org.apache.spark.api.java.function包中的类 。

请注意,在Spark 2.2.0中已删除了对Java 7的支持。

要使用Java编写Spark应用程序,您需要添加对Spark的依赖。可通过Maven Central在以下位置获得Spark:

groupId = org.apache.spark
artifactId = spark-core_2.12
version = 3.0.0

另外,如果您想访问HDFS群集,则需要hadoop-client为您的HDFS版本添加依赖项 。

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>

最后,您需要将一些Spark类导入程序。添加以下行:

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;

Python语言

Spark 3.0.0适用于Python 2.7+或Python 3.4+。它可以使用标准的CPython解释器,因此可以使用像NumPy这样的C库。它还适用于PyPy 2.3+。

请注意,自Spark 3.0.0起不推荐使用Python 2。

Python中的Spark应用程序既可以在运行时使用bin/spark-submit包含Spark 的脚本运行,也可以通过在setup.py中包含以下脚本来运行:

    install_requires=[
        'pyspark=={site.SPARK_VERSION}'
    ]

要在Python中运行Spark应用程序而无需pip安装PySpark,请使用bin/spark-submitSpark目录中的脚本。该脚本将加载Spark的Java / Scala库,并允许您将应用程序提交到集群。您还可以bin/pyspark用来启动交互式Python Shell。

如果您想访问HDFS数据,则需要使用PySpark的构建链接到您的HDFS版本。 对于常用的HDFS版本,Spark主页上也提供了预构建的软件包

最后,您需要将一些Spark类导入程序。添加以下行:

from pyspark import SparkContext, SparkConf

PySpark在驱动程序和工作程序中都需要相同的次要版本的Python。它在PATH中使用默认的python版本,您可以通过指定要使用的Python版本PYSPARK_PYTHON,例如:

$ PYSPARK_PYTHON=python3.4 bin/pyspark
$ PYSPARK_PYTHON=/opt/pypy-2.5/bin/pypy bin/spark-submit examples/src/main/python/pi.py

 初始化Spark

Scala语言

Spark程序必须做的第一件事是创建一个SparkContext对象,该对象告诉Spark如何访问集群。要创建一个,SparkContext您首先需要构建一个SparkConf对象,其中包含有关您的应用程序的信息。

每个JVM仅应激活一个SparkContext。stop()在创建新的SparkContext之前,您必须先激活它。

val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)

appName参数是您的应用程序显示在集群UI上的名称。 masterSpark,Mesos或YARN群集URL或特殊的“本地”字符串,以本地模式运行。实际上,当在集群上运行时,您将不希望master在程序中进行硬编码,而是在其中启动应用程序spark-submit并在其中接收。但是,对于本地测试和单元测试,您可以传递“ local”以在内部运行Spark。

Java语言

Spark程序必须做的第一件事是创建一个JavaSparkContext对象,该对象告诉Spark如何访问集群。要创建一个,SparkContext您首先需要构建一个SparkConf对象,其中包含有关您的应用程序的信息。

SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaSparkContext sc = new JavaSparkContext(conf);

appName参数是您的应用程序显示在集群UI上的名称。 masterSpark,Mesos或YARN群集URL或特殊的“本地”字符串,以本地模式运行。实际上,当在集群上运行时,您将不希望master在程序中进行硬编码,而是在其中启动应用程序spark-submit并在其中接收。但是,对于本地测试和单元测试,您可以传递“ local”以在内部运行Spark。

Python语言

Spark程序必须做的第一件事是创建一个SparkContext对象,该对象告诉Spark如何访问集群。要创建一个,SparkContext您首先需要构建一个SparkConf对象,其中包含有关您的应用程序的信息。

conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)

appName参数是您的应用程序显示在集群UI上的名称。 masterSpark,Mesos或YARN群集URL或特殊的“本地”字符串,以本地模式运行。实际上,当在集群上运行时,您将不希望master在程序中进行硬编码,而是在其中启动应用程序spark-submit并在其中接收。但是,对于本地测试和单元测试,您可以传递“ local”以在内部运行Spark。

使用Shell

Scala语言

在Spark Shell中,已经在名为的变量中为您创建了一个特殊的可识别解释器的SparkContext sc。制作自己的SparkContext无效。您可以使用--master参数设置上下文连接到哪个主机,还可以通过将逗号分隔的列表传递给参数来将JAR添加到类路径--jars。您还可以通过在--packages参数中提供逗号分隔的Maven坐标列表,从而将依赖项(例如Spark Packages)添加到Shell会话中。可能存在依赖关系的任何其他存储库(例如Sonatype)都可以传递给--repositories参数。例如,要bin/spark-shell在四个核心上运行,请使用:

$ ./bin/spark-shell --master local[4]

或者,也可以添加code.jar到其类路径中,使用:

$ ./bin/spark-shell --master local[4] --jars code.jar

要使用Maven坐标包含依赖项,请执行以下操作:

$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"

有关选项的完整列表,请运行spark-shell --help。在幕后, spark-shell调用更通用的spark-submit脚本

Python语言

在PySpark shell中,已经在名为的变量中为您创建了一个特殊的可识别解释器的SparkContext sc。制作自己的SparkContext无效。您可以使用--master参数设置上下文连接的主机,也可以通过将逗号分隔的列表传递到来将Python .zip,.egg或.py文件添加到运行时路径--py-files。您还可以通过在--packages参数中提供逗号分隔的Maven坐标列表,从而将依赖项(例如Spark Packages)添加到Shell会话中。可能存在依赖关系的任何其他存储库(例如Sonatype)都可以传递给--repositories参数。Spark软件包具有的所有Python依赖项(在该软件包的requirements.txt中列出)都必须pip在必要时使用手动安装。例如运行bin/pyspark 在四个核心上使用:

$ ./bin/pyspark --master local[4]

或者,也可以添加code.py到搜索路径中(以便以后import code使用),请使用:

$ ./bin/pyspark --master local[4] --py-files code.py

有关选项的完整列表,请运行pyspark --help。在幕后, pyspark调用更通用的spark-submit脚本

也可以在增强的Python解释器IPython中启动PySpark shell 。PySpark可与IPython 1.0.0及更高版本一起使用。要使用IPython,请在运行时将PYSPARK_DRIVER_PYTHON变量设置为:ipythonbin/pyspark

$ PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark

要使用Jupyter笔记本(以前称为IPython笔记本),

$ PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS=notebook ./bin/pyspark

您可以通过设置自定义ipythonjupyter命令PYSPARK_DRIVER_PYTHON_OPTS

启动Jupyter Notebook服务器后,您可以从“文件”选项卡创建一个新的“ Python 2”笔记本。在笔记本内部,您可以在%pylab inline从Jupyter笔记本开始尝试Spark之前输入命令作为笔记本的一部分。

弹性分布式数据集(RDD)

Spark围绕弹性分布式数据集(RDD)的概念展开,RDD是可并行操作的元素的容错集合。创建RDD的方法有两种:并行化 驱动程序中的现有集合,或引用外部存储系统(例如共享文件系统,HDFS,HBase或提供Hadoop InputFormat的任何数据源)中的数据集。

并行集合

Scala语言

通过在驱动程序(Scala )中的现有集合上调用SparkContextparallelize方法来创建并行集合Seq。复制集合的元素以形成可以并行操作的分布式数据集。例如,以下是创建包含数字1到5的并行化集合的方法:

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)

创建后,分布式数据集(distData)可以并行操作。例如,我们可能会调用distData.reduce((a, b) => a + b)以添加数组的元素。我们稍后将描述对分布式数据集的操作。

并行集合的一个重要参数是将数据集切入的分区数。Spark将为集群的每个分区运行一个任务。通常,群集中的每个CPU都需要2-4个分区。通常,Spark会尝试根据您的集群自动设置分区数。但是,您也可以通过将其作为第二个参数传递给parallelize(例如sc.parallelize(data, 10))来手动设置它。注意:代码中的某些位置使用术语片(分区的同义词)来保持向后兼容性。

Java语言

通过在驱动程序中现有的上调用JavaSparkContextparallelize方法来创建并行集合Collection。复制集合的元素以形成可以并行操作的分布式数据集。例如,以下是创建包含数字1到5的并行化集合的方法:

List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);

创建后,分布式数据集(distData)可以并行操作。例如,我们可能会调用distData.reduce((a, b) -> a + b)以添加列表中的元素。我们稍后将描述对分布式数据集的操作。

并行集合的一个重要参数是将数据集切入的分区数。Spark将为集群的每个分区运行一个任务。通常,群集中的每个CPU都需要2-4个分区。通常,Spark会尝试根据您的集群自动设置分区数。但是,您也可以通过将其作为第二个参数传递给parallelize(例如sc.parallelize(data, 10))来手动设置它。注意:代码中的某些位置使用术语片(分区的同义词)来保持向后兼容性。

Python语言

通过在驱动程序中现有的可迭代对象或集合上调用SparkContextparallelize方法来创建并行集合。复制集合的元素以形成可以并行操作的分布式数据集。例如,以下是创建包含数字1到5的并行化集合的方法:

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

创建后,分布式数据集(distData)可以并行操作。例如,我们可以调用distData.reduce(lambda a, b: a + b)添加列表中的元素。我们稍后将描述对分布式数据集的操作。

并行集合的一个重要参数是将数据集切入的分区数。Spark将为集群的每个分区运行一个任务。通常,群集中的每个CPU都需要2-4个分区。通常,Spark会尝试根据您的集群自动设置分区数。但是,您也可以通过将其作为第二个参数传递给parallelize(例如sc.parallelize(data, 10))来手动设置它。注意:代码中的某些位置使用术语片(分区的同义词)来保持向后兼容性。

外部数据集

Scala语言

Spark可以从Hadoop支持的任何存储源创建分布式数据集,包括您的本地文件系统,HDFS,Cassandra,HBase,Amazon S3等。Spark支持文本文件,SequenceFiles和任何其他Hadoop InputFormat

可以使用SparkContexttextFile方法创建文本文件RDD 。此方法需要一个URI的文件(本地路径的机器上,或一个hdfs://s3a://等URI),并读取其作为行的集合。这是一个示例调用:

scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26

一旦创建,distFile就可以通过数据集操作对其进行操作。例如,我们可以使用mapreduce操作将所有行的大小相加,如下所示:distFile.map(s => s.length).reduce((a, b) => a + b)

关于使用Spark读取文件的一些注意事项:

  • 如果在本地文件系统上使用路径,则还必须在工作节点上的相同路径上访问该文件。将文件复制给所有工作人员,或者使用网络安装的共享文件系统。

  • Spark的所有基于文件的输入法(包括textFile)都支持在目录,压缩文件和通配符上运行。例如,你可以使用textFile("/my/directory")textFile("/my/directory/*.txt")textFile("/my/directory/*.gz")

  • textFile方法还采用可选的第二个参数来控制文件的分区数。默认情况下,Spark为文件的每个块创建一个分区(HDFS中的块默认为128MB),但是您也可以通过传递更大的值来请求更大数量的分区。请注意,分区不能少于块。

除了文本文件,Spark的Scala API还支持其他几种数据格式:

  • SparkContext.wholeTextFiles使您可以读取包含多个小文本文件的目录,并将每个小文本文件作为(文件名,内容)对返回。与相比textFile,会在每个文件的每一行返回一条记录。分区由数据局部性决定,在某些情况下,数据局部性可能导致分区太少。对于那些情况,wholeTextFiles提供一个可选的第二个参数来控制最小数量的分区。

  • 对于SequenceFiles,请使用SparkContext的sequenceFile[K, V]方法,其中KV是文件中键和值的类型。这些应该是Hadoop的Writable接口的子类,例如IntWritableText。此外,Spark允许您为一些常见的可写对象指定本机类型。例如,sequenceFile[Int, String]将自动读取IntWritables和Texts。

  • 对于其他Hadoop InputFormat,可以使用该SparkContext.hadoopRDD方法,该方法采用任意JobConf输入格式类,键类和值类。以与使用输入源进行Hadoop作业相同的方式设置这些内容。您还可以SparkContext.newAPIHadoopRDD基于“新” MapReduce API(org.apache.hadoop.mapreduce)将其用于InputFormats 。

  • RDD.saveAsObjectFileSparkContext.objectFile支持以包含序列化Java对象的简单格式保存RDD。尽管它不如Avro这样的专用格式有效,但它提供了一种保存任何RDD的简便方法。

Java语言

Spark可以从Hadoop支持的任何存储源创建分布式数据集,包括您的本地文件系统,HDFS,Cassandra,HBase,Amazon S3等。Spark支持文本文件,SequenceFiles和任何其他Hadoop InputFormat

可以使用SparkContexttextFile方法创建文本文件RDD 。此方法需要一个URI的文件(本地路径的机器上,或一个hdfs://s3a://等URI),并读取其作为行的集合。这是一个示例调用:

JavaRDD<String> distFile = sc.textFile("data.txt");

一旦创建,distFile就可以通过数据集操作对其进行操作。例如,我们可以使用mapreduce操作将所有行的大小相加,如下所示:distFile.map(s -> s.length()).reduce((a, b) -> a + b)

关于使用Spark读取文件的一些注意事项:

  • 如果在本地文件系统上使用路径,则还必须在工作节点上的相同路径上访问该文件。将文件复制给所有工作人员,或者使用网络安装的共享文件系统。

  • Spark的所有基于文件的输入法(包括textFile)都支持在目录,压缩文件和通配符上运行。例如,你可以使用textFile("/my/directory")textFile("/my/directory/*.txt")textFile("/my/directory/*.gz")

  • textFile方法还采用可选的第二个参数来控制文件的分区数。默认情况下,Spark为文件的每个块创建一个分区(HDFS中的块默认为128MB),但是您也可以通过传递更大的值来请求更大数量的分区。请注意,分区不能少于块。

除了文本文件,Spark的Java API还支持其他几种数据格式:

  • JavaSparkContext.wholeTextFiles使您可以读取包含多个小文本文件的目录,并将每个小文本文件作为(文件名,内容)对返回。与相比textFile,会在每个文件的每一行返回一条记录。

  • 对于SequenceFiles,请使用SparkContext的sequenceFile[K, V]方法,其中KV是文件中键和值的类型。这些应该是Hadoop的Writable接口的子类,例如IntWritableText

  • 对于其他Hadoop InputFormat,可以使用该JavaSparkContext.hadoopRDD方法,该方法采用任意JobConf输入格式类,键类和值类。以与使用输入源进行Hadoop作业相同的方式设置这些内容。您还可以JavaSparkContext.newAPIHadoopRDD基于“新” MapReduce API(org.apache.hadoop.mapreduce)将其用于InputFormats 。

  • JavaRDD.saveAsObjectFileJavaSparkContext.objectFile支持以包含序列化Java对象的简单格式保存RDD。尽管它不如Avro这样的专用格式有效,但它提供了一种保存任何RDD的简便方法。

Python语言

PySpark可以从Hadoop支持的任何存储源创建分布式数据集,包括您的本地文件系统,HDFS,Cassandra,HBase,Amazon S3等。Spark支持文本文件,SequenceFiles和任何其他Hadoop InputFormat

可以使用SparkContexttextFile方法创建文本文件RDD 。此方法需要一个URI的文件(本地路径的机器上,或一个hdfs://s3a://等URI),并读取其作为行的集合。这是一个示例调用:

>>> distFile = sc.textFile("data.txt")

一旦创建,distFile就可以通过数据集操作对其进行操作。例如,我们可以使用mapreduce操作将所有行的大小相加,如下所示:distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)

关于使用Spark读取文件的一些注意事项:

  • 如果在本地文件系统上使用路径,则还必须在工作节点上的相同路径上访问该文件。将文件复制给所有工作人员,或者使用网络安装的共享文件系统。

  • Spark的所有基于文件的输入法(包括textFile)都支持在目录,压缩文件和通配符上运行。例如,你可以使用textFile("/my/directory")textFile("/my/directory/*.txt")textFile("/my/directory/*.gz")

  • textFile方法还采用可选的第二个参数来控制文件的分区数。默认情况下,Spark为文件的每个块创建一个分区(HDFS中的块默认为128MB),但是您也可以通过传递更大的值来请求更大数量的分区。请注意,分区不能少于块。

除文本文件外,Spark的Python API还支持其他几种数据格式:

  • SparkContext.wholeTextFiles使您可以读取包含多个小文本文件的目录,并将每个小文本文件作为(文件名,内容)对返回。与相比textFile,会在每个文件的每一行返回一条记录。

  • RDD.saveAsPickleFileSparkContext.pickleFile支持将RDD保存为由腌制的Python对象组成的简单格式。批处理用于咸菜序列化,默认批处理大小为10。

  • SequenceFile和Hadoop输入/输出格式

请注意,此功能当前已标记Experimental,仅供高级用户使用。将来可能会替换为基于Spark SQL的读/写支持,在这种情况下,Spark SQL是首选方法。

可写支持

PySpark SequenceFile支持在Java内加载键-值对的RDD,将Writables转换为基本Java类型,并使用Pyrolite腌制所得的Java对象。将键/值对的RDD保存到SequenceFile时,PySpark会执行相反的操作。它将Python对象分解为Java对象,然后将它们转换为Writables。以下可写对象将自动转换:

可写类型 Python类型
文本 unicode str
可写 整型
浮动可写 浮动
双写 浮动
布尔可写 布尔
字节可写 字节数组
空可写 没有
MapWritable 字典

数组不是开箱即用的。用户ArrayWritable在读取或写入时需要指定自定义子类型。编写时,用户还需要指定将数组转换为自定义ArrayWritable子类型的自定义转换器。阅读时,默认转换器会将自定义ArrayWritable子类型转换为Java Object[],然后将其腌制为Python元组。要获取array.array用于基本类型数组的Python ,用户需要指定自定义转换器。

保存和加载SequenceFile

与文本文件类似,可以通过指定路径来保存和加载SequenceFiles。可以指定键和值类,但是对于标准可写对象则不需要。

>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
>>> rdd.saveAsSequenceFile("path/to/file")
>>> sorted(sc.sequenceFile("path/to/file").collect())
[(1, u'a'), (2, u'aa'), (3, u'aaa')]

保存和加载其他Hadoop输入/输出格式

对于“新”和“旧” Hadoop MapReduce API,PySpark还可以读取任何Hadoop InputFormat或编写任何Hadoop OutputFormat。如果需要,可以将Hadoop配置作为Python字典传递。这是使用Elasticsearch ESInputFormat的示例:

$ ./bin/pyspark --jars /path/to/elasticsearch-hadoop.jar
>>> conf = {"es.resource" : "index/type"}  # assume Elasticsearch is running on localhost defaults
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",
                             "org.apache.hadoop.io.NullWritable",
                             "org.elasticsearch.hadoop.mr.LinkedMapWritable",
                             conf=conf)
>>> rdd.first()  # the result is a MapWritable that is converted to a Python dict
(u'Elasticsearch ID',
 {u'field1': True,
  u'field2': u'Some Text',
  u'field3': 12345})

请注意,如果InputFormat仅取决于Hadoop配置和/或输入路径,并且可以根据上表轻松地转换键和值类,则这种方法在这种情况下应该能很好地工作。

如果您具有自定义的序列化二进制数据(例如从Cassandra / HBase加载数据),则首先需要将Scala / Java端上的数据转换为可由Pyrolite的picker处理的数据。一个转换器特性提供了这一点。只需扩展此特征并在该convert 方法中实现您的转换代码即可。请记住,确保将此类以及访问所需的任何依赖项InputFormat打包到Spark作业jar中,并包含在PySpark类路径中。

有关 使用Cassandra / HBase 和自定义转换器的示例,请参见Python示例Converter示例InputFormatOutputFormat

RDD操作

RDD支持两种类型的操作:转换(从现有操作创建新的数据集)和动作(操作),在对数据集执行计算后,将值返回给驱动程序。例如,map是一个转换,该转换将每个数据集元素都传递给一个函数,并返回代表结果的新RDD。另一方面,这reduce是一个使用某些函数聚合RDD的所有元素并将最终结果返回给驱动程序的操作(尽管也有并行操作reduceByKey返回了分布式数据集)。

Spark中的所有转换都是惰性的,因为它们不会立即计算出结果。相反,他们只记得应用于某些基本数据集(例如文件)的转换。仅当动作要求将结果返回给驱动程序时才计算转换。这种设计使Spark可以更高效地运行。例如,我们可以意识到,通过创建的数据集map将用于中,reduce并且仅将结果返回reduce给驱动程序,而不是将较大的映射数据集返回给驱动程序。

默认情况下,每次在其上执行操作时,都可能会重新计算每个转换后的RDD。但是,您也可以使用(或)方法将RDD 保留在内存中,在这种情况下,Spark会将元素保留在群集中,以便下次查询时可以更快地进行访问。还支持将RDD持久保存在磁盘上,或在多个节点之间复制。persistcache

基本

Scala语言

为了说明RDD基础知识,请考虑以下简单程序:

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

第一行从外部文件定义基本RDD。该数据集未加载到内存中或没有采取其他行动:lines仅是文件的指针。第二行定义lineLengthsmap转换的结果。再次,lineLengths 是不是马上计算,由于懒惰。最后,我们运行reduce,这是一个动作。此时,Spark将计算分解为任务,以在不同的机器上运行,每台机器既运行其映射的一部分,又运行本地还原,仅将其答案返回给驱动程序。

如果我们以后还要使用lineLengths,可以添加:

lineLengths.persist()

在之前reduce,这将导致lineLengths在第一次计算后将其保存在内存中。

Java语言

为了说明RDD基础知识,请考虑以下简单程序:

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);

第一行从外部文件定义基本RDD。该数据集未加载到内存中或没有采取其他行动:lines仅是文件的指针。第二行定义lineLengthsmap转换的结果。再次,lineLengths 是不是马上计算,由于懒惰。最后,我们运行reduce,这是一个动作。此时,Spark将计算分解为任务,以在不同的机器上运行,每台机器既运行其映射的一部分,又运行本地还原,仅将其答案返回给驱动程序。

如果我们以后还要使用lineLengths,可以添加:

lineLengths.persist(StorageLevel.MEMORY_ONLY());

在之前reduce,这将导致lineLengths在第一次计算后将其保存在内存中

Python语言

为了说明RDD基础知识,请考虑以下简单程序:

lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)

第一行从外部文件定义基本RDD。该数据集未加载到内存中或没有采取其他行动:lines仅是文件的指针。第二行定义lineLengthsmap转换的结果。再次,lineLengths 是不是马上计算,由于懒惰。最后,我们运行reduce,这是一个动作。此时,Spark将计算分解为任务,以在不同的机器上运行,每台机器既运行其映射的一部分,又运行本地还原,仅将其答案返回给驱动程序。

如果我们以后还要使用lineLengths,可以添加:

lineLengths.persist()

在之前reduce,这将导致lineLengths在第一次计算后将其保存在内存中。

将函数传递给Spark

Scala语言

Spark的API在很大程度上依赖于在驱动程序中传递函数以在群集上运行。有两种推荐的方法可以做到这一点:

  • 匿名函数语法,可用于简短的代码段。
  • 全局单例对象中的静态方法。例如,您可以如下定义object MyFunctions并传递MyFunctions.func1
object MyFunctions {
  def func1(s: String): String = { ... }
}

myRdd.map(MyFunctions.func1)

请注意,虽然也可以在类实例中传递对方法的引用(与单例对象相对),但这需要将包含该类的对象与方法一起发送。例如,考虑:

class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}

在这里,如果我们创建一个新的MyClass实例,并调用doStuff就可以了,map里面有引用的 func1方法是的MyClass实例,所以整个对象需要被发送到群集。它类似于写作rdd.map(x => this.func1(x))

以类似的方式,访问外部对象的字段将引用整个对象:

class MyClass {
  val field = "Hello"
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}

等价于写作rdd.map(x => this.field + x),它引用了所有内容this。为避免此问题,最简单的方法是将其复制field到局部变量中,而不是从外部访问它:

def doStuff(rdd: RDD[String]): RDD[String] = {
  val field_ = this.field
  rdd.map(x => field_ + x)
}

Java语言

Spark的API在很大程度上依赖于在驱动程序中传递函数以在群集上运行。在Java中,功能由实现org.apache.spark.api.java.function包中的接口的类表示 。有两种创建此类功能的方法:

  • 在您自己的类中(作为匿名内部类或命名类)实现Function接口,并将其实例传递给Spark。
  • 使用lambda表达式 来简洁地定义一个实现。

尽管本指南中的大部分内容都使用lambda语法来简化,但以长格式使用所有相同的API还是很容易的。例如,我们可以将上面的代码编写如下:

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
  public Integer call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
  public Integer call(Integer a, Integer b) { return a + b; }
});

或者,如果内联编写函数很麻烦:

class GetLength implements Function<String, Integer> {
  public Integer call(String s) { return s.length(); }
}
class Sum implements Function2<Integer, Integer, Integer> {
  public Integer call(Integer a, Integer b) { return a + b; }
}

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new GetLength());
int totalLength = lineLengths.reduce(new Sum());

请注意,Java中的匿名内部类也可以访问封闭范围内的变量,只要它们被标记即可final。与其他语言一样,Spark会将这些变量的副本发送到每个工作程序节点。

Python语言

Spark的API在很大程度上依赖于在驱动程序中传递函数以在群集上运行。建议使用三种方法来执行此操作:

  • Lambda表达式,用于可以作为表达式编写的简单函数。(Lambda不支持多语句函数或不返回值的语句。)
  • def函数内部的Local 调用Spark,以获取更长的代码。
  • 模块中的顶级功能。

例如,要传递比使用所支持的函数更长的函数lambda,请考虑以下代码:

"""MyScript.py"""
if __name__ == "__main__":
    def myFunc(s):
        words = s.split(" ")
        return len(words)

    sc = SparkContext(...)
    sc.textFile("file.txt").map(myFunc)

请注意,虽然也可以在类实例中传递对方法的引用(与单例对象相对),但这需要将包含该类的对象与方法一起发送。例如,考虑:

class MyClass(object):
    def func(self, s):
        return s
    def doStuff(self, rdd):
        return rdd.map(self.func)

在这里,如果我们创建了一个new MyClass和调用doStuff就可以了,map里面有引用的 func方法是的MyClass实例,所以整个对象需要被发送到群集。

以类似的方式,访问外部对象的字段将引用整个对象:

class MyClass(object):
    def __init__(self):
        self.field = "Hello"
    def doStuff(self, rdd):
        return rdd.map(lambda s: self.field + s)

为避免此问题,最简单的方法是将其复制field到局部变量中,而不是从外部访问它:

def doStuff(self, rdd):
    field = self.field
    return rdd.map(lambda s: field + s)

了解闭包 

关于Spark的难点之一是在跨集群执行代码时了解变量和方法的范围和生命周期。修改超出其范围的变量的RDD操作可能经常引起混乱。在下面的示例中,我们将查看foreach()用于增加计数器的代码,但是其他操作也会发生类似的问题。

考虑下面的朴素的RDD元素总和,其行为可能会有所不同,具体取决于执行是否在同一JVM中进行。一个常见的示例是在local模式(--master = local[n])中运行Spark 而不是将Spark应用程序部署到集群(例如,通过将spark-submit提交给YARN)时

Scala语言

var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)

Java语言

int counter = 0;
JavaRDD<Integer> rdd = sc.parallelize(data);

// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);

println("Counter value: " + counter);

Python语言

counter = 0
rdd = sc.parallelize(data)

# Wrong: Don't do this!!
def increment_counter(x):
    global counter
    counter += x
rdd.foreach(increment_counter)

print("Counter value: ", counter)

本地与集群模式

上面的代码的行为是未定义的,可能无法按预期工作。为了执行作业,Spark将RDD操作的处理分解为任务,每个任务都由执行程序执行。在执行之前,Spark计算任务的闭包。闭包是执行者在RDD上执行其计算时必须可见的那些变量和方法(在本例中为foreach())。此闭包被序列化并发送给每个执行器。

发送给每个执行程序的闭包中的变量现在是副本,因此,在函数中引用计数器foreach,它不再是驱动程序节点上的计数器。驱动程序节点的内存中仍然存在一个计数器,但是执行者将不再看到该计数器!执行者仅从序列化闭包中看到副本。因此,由于对计数器的所有操作都引用了序列化闭包中的值,所以计数器的最终值仍将为零。

在本地模式下,在某些情况下,该foreach函数实际上将在与驱动程序相同的JVM中执行,并且将引用相同的原始计数器,并且实际上可能会对其进行更新。

为确保在此类情况下行为明确,应使用Accumulator。Spark中的累加器专门用于提供一种机制,用于在集群中的各个工作节点之间拆分执行时安全地更新变量。本指南的“累加器”部分将详细讨论这些内容。

通常,闭包-像循环或局部定义的方法之类的构造,不应用于突变某些全局状态。Spark不定义或保证从闭包外部引用的对象的突变行为。某些执行此操作的代码可能会在本地模式下工作,但这只是偶然的情况,此类代码在分布式模式下将无法正常运行。如果需要某些全局聚合,请使用累加器。

RDD的打印元素

另一个常见用法是尝试使用rdd.foreach(println)或打印RDD的元素rdd.map(println)。在单台机器上,这将生成预期的输出并打印所有RDD的元素。但是,在cluster模式下,stdout执行程序要调用的输出现在正在写入执行程序的输出,stdout而不是驱动程序上的那个,因此stdout驱动程序不会显示这些信息!要打印在驱动器的所有元素,可以使用的collect()方法,首先使RDD到驱动器节点从而:rdd.collect().foreach(println)。但是,这可能会导致驱动程序用尽内存,因为collect()会将整个RDD提取到一台计算机上。如果您只需要打印RDD的一些元素,则更安全的方法是使用take()rdd.take(100).foreach(println)

使用键值对

Scala语言

虽然大多数Spark操作可在包含任何类型的对象的RDD上运行,但一些特殊操作仅可用于键-值对的RDD。最常见的是分布式“混洗”操作,例如通过键对元素进行分组或聚合。

在Scala中,这些操作在包含Tuple2对象(该语言的内置元组,只需编写即可创建(a, b))的RDD上自动可用 。PairRDDFunctions类中提供键值对操作, 该类会自动包装RDD元组。

例如,以下代码reduceByKey对键值对使用运算来计算文件中每一行文本出现的次数:

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

counts.sortByKey()例如,我们还可以使用按字母顺序对,最后 counts.collect()将它们作为对象数组带回到驱动程序中。

注意:在键-值对操作中使用自定义对象作为键时,必须确保自定义equals()方法与匹配hashCode()方法一起使用。有关完整的详细信息,请参见Object.hashCode()文档中概述的合同。

Java语言

虽然大多数Spark操作可在包含任何类型的对象的RDD上运行,但一些特殊操作仅可用于键-值对的RDD。最常见的是分布式“混洗”操作,例如通过键对元素进行分组或聚合。

在Java中,键值对使用Scala标准库中的scala.Tuple2类表示 。您可以简单地调用new Tuple2(a, b)创建一个元组,稍后使用tuple._1()和来访问其字段tuple._2()

键值对的RDDJavaPairRDD类表示 。您可以使用特殊的map操作版本(例如 mapToPair和)从JavaRDD构造JavaPairRDD flatMapToPair。JavaPairRDD将同时具有标准的RDD功能和特殊的键值功能。

例如,以下代码reduceByKey对键值对使用运算来计算文件中每一行文本出现的次数:

JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);

counts.sortByKey()例如,我们还可以使用按字母顺序对,最后 counts.collect()将它们作为对象数组带回到驱动程序中。

注意:在键-值对操作中使用自定义对象作为键时,必须确保自定义equals()方法与匹配hashCode()方法一起使用。有关完整的详细信息,请参见Object.ha

标签: