第 9 章 Spark技术
Apache Spark 是一个新兴的大数据处理通用引擎,提供了分布式的内存抽象。Spark最大的特点就是快(Lightning-Fast),可比 Hadoop MapReduce 的处理速度快 100 倍。此外,Spark 提供了简单易用的 API,几行代码就能实现 WordCount。本章介绍Spark 的框架,Spark Shell 、RDD、Spark SQL、Spark Streaming 等的基本使用。
9.1 Spark框架
Spark作为新一代大数据快速处理平台,集成了大数据相关的各种能力。Hadoop的中间数据需要存储在硬盘上,这产生了较高的延迟。而Spark基于内存计算,解决了这个延迟的速度问题。Spark本身可以直接读写Hadoop上任何格式数据,这使得批处理更加快速。
图9-1是以Spark为核心的大数据处理框架。最底层为大数据存储系统,如:HDFS、HBase等。在存储系统上面是Spark集群模式(也可以认为是资源管理层),这包括Spark自带的独立部署模式、YARN和Mesos集群资源管理模式,也可以是Amazon EC2。Spark内核之上是为应用提供各类服务的组件。Spark内核API支持Java、Python、Scala等编程语言。Spark Streaming提供高可靠性、高吞吐量的实时流式处理服务,能够满足实时系统要求;MLib提供机器学习服务,Spark SQL提供了性能比Hive快了很多倍的SQL查询服务,GraphX提供图计算服务。
图9-1 Spark 框架
从上图看出,Spark有效集成了Hadoop组件,可以基于Hadoop YARN作为资源管理框架,并从HDFS和HBase数据源上读取数据。YARN是Spark目前主要使用的资源管理器。Hadoop能做的,Spark基本都能做,而且做的比Hadoop好。Spark依然是Hadoop生态圈的一员,它替换的主要是MR的计算模型而已。资源调度依赖于YARN,存储则依赖于HDFS。
Spark的大数据处理平台是建立在统一抽象的RDD之上。RDD是弹性分布式数据集(Resilient Distributed Dataset)的英文简称,它是一种特殊数据集合,支持多种来源,有容错机制,可以被缓存,支持并行操作。Spark的一切都是基于RDD的。RDD就是Spark输入的数据。
Spark应用程序在集群上以独立进程集合的形式运行。如图9-2所示,主程序(叫做Driver程序)中的SparkContext对象协调Spark应用程序。SparkContext对象首先连接到多种集群管理器(如:YARN),然后在集群节点上获得Executor。SparkContext把应用代码发给Executor,Executor负责应用程序的计算和数据存储。
图9-2 集群模式
每个应用程序都拥有自己的Executor。Executor为应用程序提供了一个隔离的运行环境,以Task的形式执行作业。对于Spark Shell来说,这个Driver就是与用户交互的进程。
9.1.1 安装Spark
最新的Spark版本是1.6.1。它可以运行在Windows或Linux机器上。运行 Spark 需要 Java JDK 1.7,CentOS 6.x 系统默认只安装了 Java JRE,还需要安装 Java JDK,并确保配置好 JAVA_HOME、PATH和CLASSPATH变量。此外,Spark 会用到 HDFS 与 YARN,因此读者要先安装好 Hadoop。我们可以从Spark官方网站http://spark.apache.org/downloads.html上下载Spark,如图9-3所示。
图9-3 下载安装包
有几种Package type,分别为:
l Source code:Spark 源码,需要编译才能使用。
l Pre-build with user-provided Hadoop:“Hadoop free”版,可应用到任意 Hadoop 版本。
l Pre-build for Hadoop 2.6 and later:基于 Hadoop 2.6 的预编译版,需要与本机安装的 Hadoop 版本对应。可选的还有 Hadoop 2.4 and later、Hadoop 2.3、Hadoop 1.x,以及 CDH 4。
本书选择的是 Pre-build with user-provided Hadoop,简单配置后可应用到任意 Hadoop 版本。下载后,执行如下命令进行安装:
sudo tar -zxf spark-1.6.1-bin-without-hadoop.tgz -C /usr/local/
cd /usr/local
sudo mv ./spark-1.6.1-bin-without-hadoop/ ./spark
sudo chown -R hadoop:hadoop ./spark
9.1.2 配置Spark
安装后,进入conf目录,以spark-env.sh.template文件为模块创建spark-env.sh文件,然后修改其配置信息,命令如下:
cd /usr/local/spark
cp ./conf/spark-env.sh.template ./conf/spark-env.sh
编辑 ./conf/spark-env.sh(vim ./conf/spark-env.sh),在文件的最后加上如下一行:
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)
保存后,Spark 就可以启动和运行了。在 ./examples/src/main 目录下有一些 Spark 的示例程序,有 Scala、Java、Python、R 等语言的版本。我们可以先运行一个示例程序 SparkPi(即计算 π 的近似值),执行如下命令:
cd /usr/local/spark
./bin/run-example SparkPi
执行时会输出非常多的运行信息,输出结果不容易找到,可以通过 grep 命令进行过滤(命令中的 2>&1 可以将所有的信息都输出到 stdout 中):
./bin/run-example SparkPi 2>&1 | grep "Pi is roughly"
过滤后的运行结果为 π 的 5 位小数近似值 。
9.2 Spark Shell
以前的统计和机器学习依赖于数据抽样。从统计的角度来看,抽样如果足够随机,其实可以很精准地反应全集的结果,但事实上往往很难做到随机,所以通常做出来也会不准。现在大数据解决了这个问题,它不是通过优化抽样的随机来解决,而是通过全量数据来解决。要解决全量的数据就需要有强大的处理能力,Spark首先具备强大的处理能力,其次Spark Shell带来了即席查询。做算法的工程师,以前经常是在小数据集上跑个单机,然后看效果不错,一到全量上,就可能和单机效果很不一样。有了Spark后就不一样了,尤其是有了Spark Shell。可以边写代码,边运行,边看结果。Spark提供了很多的算法,最常用的是贝叶斯、word2vec、线性回归等。作为算法工程师,或者大数据分析师,一定要学会用Spark Shell。
Spark Shell 提供了简单的方式来学习 Spark API,也提供了交互的方式来分析数据。Spark Shell 支持 Scala 和 Python,本书选择使用 Scala 来进行介绍。Scala集成了面向对象和函数语言的特性,并运行于Java 虚拟机之上,兼容现有的 Java 程序。Scala 是 Spark 的主要编程语言,如果仅仅是写 Spark 应用,并非一定要用 Scala,用Java和Python都是可以的。使用 Scala 的优势是开发效率更高,代码更精简,并且可以通过 Spark Shell 进行交互式实时查询,方便排查问题。执行如下命令启动 Spark Shell:
./bin/spark-shell
启动成功后会有“scala >”的命令提示符。这表明已经成功启动了Spark Shell。在 Spark Shell 启动时,输出日志的最后有这么几条信息:
16/04/16 17:25:47 INFO repl.SparkILoop: Created spark context..
Spark context available as sc.
这些信息表明 SparkContext已经初始化好了,可通过对应的sc变量直接进行访问。Spark 的主要抽象是分布式的数据集合RDD,它可被分发到集群各个节点上,进行并行操作。一个RDD可以通过 Hadoop InputFormats 创建(如 HDFS),或者从其他 RDDs转化而来。下面我们从 ./README 文件新建一个 RDD,代码如下:
scala>val textFile = sc.textFile("file:///usr/local/spark/README.md")
上述的sc是Spark创建的SparkContext,我们使用SparkContext对象加载本地文件README.md来创建RDD。输出结果如下:
textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at :27
上述返回结果为一个MapPartitionsRDD文件。需要说明的是,加载HDFS文件和本地文件都是使用textFile ,区别在于前缀“hdfs://”为HDFS文件,而“file:// ”为本地文件。上述代码中通过“file://”前缀指定读取本地文件,直接返回MapPartitionsRDD。Spark Shell默认方式是读取HDFS中的文件。从HDFS读取的文件先转换为HadoopRDD,然后隐式转换成MapPartitionsRDD。
上面的例子使用Spark中的文本文件README.md创建一个RDD textFile,文件中包含了若干文本行。将该文本文件读入RDD textFile时,其中的文本行将被分区,以便能够分发到集群中并行化操作。我们可以想象,RDD有多个分区,每个分区上有多行的文本内容。RDDs 支持两种类型的操作:
l actions:在数据集上运行计算后返回结果值。
l transformations:转换。从现有RDD创建一个新的RDD。
下面我们演示count()和first()操作:
scala>textFile.count() // RDD 中的 item 数量,对于文本文件,就是总行数
输出结果为:
res0: Long = 95
scala>textFile.first() // RDD 中的第一个 item,对于文本文件,就是第一行内容
输出结果为:
res1: String = # Apache Spark
上面这两个例子都是action的例子。接着演示 transformation,通过 filter transformation来筛选出包含 Spark 的行,返回一个新的RDD,代码如下:
scala>val linesWithSpark = textFile.filter(line => line.contains("Spark"))
scala>linesWithSpark.count() // 统计行数
上面的linesWithSpark RDD有多个分区,每个分区上只有包含了Spark的若干文本行。输出结果为:
res4: Long = 17
上述结果表明一共有17行内容包含“Spark”,这与通过 Linux 命令 cat ./README.md | grep "Spark" -c 得到的结果一致,说明是正确的。action 和 transformation 可以用链式操作的方式结合使用,使代码更为简洁:
scala>textFile.filter(line => line.contains("Spark")).count() // 统计包含 Spark 的行数
RDD的actions和transformations可用在更复杂的计算中。例如,通过如下代码可以找到包含单词最多的那一行内容共有几个单词:
scala>textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
输出结果为:
res5: Int = 14
上述代码将每一行文本内容使用split进行分词,并统计分词后的单词数。将每一行内容map为一个整数,这将创建一个新的 RDD,并在这个 RDD 中执行reduce操作,找到最大的数。map()、reduce()中的参数是Scala的函数字面量(function literals),并且可以使用Scala/Java的库。例如,通过使用 Math.max() 函数(需要导入Java的Math库),可以使上述代码更容易理解:
scala>import java.lang.Math
scala>textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
词频统计(WordCount)是Hadoop MapReduce的入门程序,Spark可以更容易地实现。首先结合flatMap、map和reduceKey来计算文件中每个单词的词频:
scala>val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
输出结果为(string,int)类型的键值对ShuffledRDD。这是因为reduceByKey操作需要进行Shuffle操作,返回的是一个Shuffle形式的ShuffleRDD:
wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at :29
然后使用collect聚合单词计算结果:
scala>wordCounts.collect()
输出结果为:
res7: Array[(String, Int)] = Array((package,1), (For,2), (Programs,1), (processing,1), (Because,1), (The,1)...)
Spark 支持将数据缓存在集群的内存缓存中,当数据需要反复访问时这个特征非常有用。调用 cache(),就可以将数据集进行缓存:
scala>textFilter.cache()
9.3 Spark编程
无论Windows或Linux操作系统,都是基于Eclipse或Idea构建开发环境,通过Java、Scala或Python语言进行开发。根据开发语言的不同,我们需要预先准备好JDK、Scala或Python环境,然后在Eclipse中下载安装Scala或Python插件。
下面我们通过一个简单的应用程序 SimpleApp 来演示如何通过 Spark API 编写一个独立应用程序。不同于使用Spark Shell自动初始化的SparkContext,独立应用程序需要自己初始化一个SparkContext,将一个包含应用程序信息的SparkConf对象传递给SparkContext构造函数。对于独立应用程序,使用 Scala 编写的程序需要使用 sbt 进行编译打包,相应地,Java 程序使用 Maven 编译打包,而 Python 程序通过 spark-submit 直接提交。
在终端中执行如下命令,创建一个文件夹 sparkapp 作为应用程序根目录:
cd ~ # 进入用户主文件夹
mkdir ./sparkapp # 创建应用程序根目录
mkdir -p ./sparkapp/src/main/scala # 创建所需的文件夹结构
9.3.1 编写Spark API程序
在./sparkapp/src/main/scala下建立一个名为SimpleApp.scala 的文件(vim ./sparkapp/src/main/scala/SimpleApp.scala),添加代码如下:
/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object SimpleApp {
//使用关键字def声明函数,必须为函数指定参数类型
def main(args: Array[String]) {
val logFile = "file:///usr/local/spark/README.md" // 一个本地文件
//创建SparkConf对象,该对象包含应用程序的信息
val conf = new SparkConf().setAppName("Simple Application")
//创建SparkContext对象,该对象可以访问Spark集群
val sc = new SparkContext(conf)
val logData = sc.textFile(logFile, 2).cache()
//line=>line.contains(..)是匿名函数的定义,line是参数
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
}
}
上述程序计算 /usr/local/spark/README 文件中包含 “a” 的行数和包含 “b” 的行数。不同于 Spark Shell,独立应用程序需要通过“val sc = new SparkContext(conf)”初始化 SparkContext,SparkContext 的参数 SparkConf 包含了应用程序的信息。
9.3.2 使用sbt编译并打成jar包
该程序依赖 Spark API,因此我们需要通过sbt(或mvn)进行编译打包。我们以sbt为例,创建一个包含应用程序代码的jar包。在 ./sparkapp 中新建文件 simple.sbt(vim ./sparkapp/simple.sbt),添加如下内容,声明该独立应用程序的信息以及与 Spark 的依赖关系:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1"
文件 simple.sbt 需要指明Spark和Scala的版本。上述版本信息可以从Spark Shell获得。我们启动Spark Shell的过程中,当输出到 Spark 的符号图形时,可以看到相关的版本信息。
Spark中没有自带sbt,需要手动安装sbt,我们选择安装在/usr/local/sbt中:
……
展开