本书其它笔记Fast Data Processing with Spark 2(Third Edition)》读书笔记目

Spark概览

Apache Spark is a fast and general-purpose cluster computing system

本文基于目前最新的spark版本2.2.1整理,以Python3.6为例,在Mac单机情况下使用。

安装

部署环境需求

  • Window, Linux, Mac均可
  • Java8+(需要设置JAVA_HOME环境变量)

下面三个根据自己习惯使用的语言选择安装

  • Python2.7+/3.4+
  • R 3.1+
  • 为了使用Scala API, 需要安装完整的Scala 2.11.x版本

解压安装

Spark官网选择合适的版本下载。

1
2
3
$ wget https://www.apache.org/dyn/closer.lua/spark/spark-2.2.1/spark-2.2.1-bin-hadoop2.7.tgz

$ tar -zxvf spark-2.2.1-bin-hadoop2.7.tgz

解压后即可使用, 不过在使用之前,让我们先配置几个常用的环境变量:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# SPARK_HOME根据本机实际情况修改路径
export SPARK_HOME="/path/to/spark-2.2.1-bin-hadoop2.7"

# 当用pyspark时默认使用ipyton
export PYSPARK_DRIVER_PYTHON="ipython"

# 如果习惯使用Jupyter, 可以添加下面的配置,那么在用pyspark时,默认会调用Jupyter notebook,否则为默认的ipython console
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"

# 添加到PATH,方便随时调用pyspark命令
export PATH="$SPARK_HOME/bin:$PATH"

关于Spark集群的搭建,练习阶段暂时不会使用到,等以后需要再补充。

快速开始

运行例子和交互式shell

Spark内置了一些Scala, Java列子,同时也有一些Python和R语言的例子在example/src/main目录里。

为了运行Scala, Java 例子,在$SPARK_HOME目录下, 用bin/run-example <class> [params]形式运行。如

1
./bin/run-example SparkPi 10

也可以启动一个Scala语言的交互式的Shell

1
./bin/spark-shell --master local[4]

--master选项设置在本地以两个线程运行。 local[N]中的N一般设置为本机的CPU核数。更多关于--master的选项可以参考Master URLs

启动Python语言的交互式Shell

1
./bin/pyspark --master local[4]

运行Python例子

1
./bin/spark-submit examples/src/main/python/pi.py 10

启动R语言的交互式Shell

./bin/sparkR --master local[4]

R语言的例子

./bin/spark-submit examples/src/main/r/dataframe.R

使用Spark shell做交互式分析

进入到$SPARK_HOME目录下, 启动pyspark

1
$ ./bin/pyspark
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# 加载README.md文件
>>> textFile = spark.read.text("README.md")

# 统计文件行数
>>> textFile.count()
103

# 获取第一行
>>> textFile.first()
Row(value='# Apache Spark')

# 过滤出包含"Spark"的行
>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))

# 统计包含"Spark"的行的行数
>>> textFile.filter(textFile.value.contains("Spark")).count()

>>> from pyspark.sql.functions import *

# 获取包含单词最多行的单词个数
>>> textFile.select(size(split(textFile.value, "\s+")).name("numWords")).agg(max
... (col("numWords"))).collect()
[Row(max(numWords)=22)]

# 统计单词出现的频次
>>> wordCounts = textFile.select(explode(split(textFile.value, "\s+")).name("wor
... d")).groupBy("word").count()

>>> wordCounts.collect()
[Row(word='online', count=1),
 Row(word='graphs', count=1),
 ...
]

# 缓存
>>> linesWithSpark.cache()
DataFrame[value: string]

>>> linesWithSpark.count()
20

在使用Spark Shell时,也会开启一个Spark monitor UI,默认在本地4040端口运行。

Spark Monitor UI

创建一个简单的应用

一个简单的应用统计文件中包含字符"a"和"b"的行数的脚本SimpleApp.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
$ cat SimpleApp.py
"""SimpleApp.py"""
from pyspark.sql import SparkSession

appName='Simple Application'
logFile = "README.md"  # Should be some file on your system
spark = SparkSession.builder.appName(appName).getOrCreate()
logData = spark.read.text(logFile).cache()

numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

spark.stop()

运行应用

1
$ ./bin/spark-submit --master "local[4]" SimpleApp.py

创建SparkSession对象

一个SparkSession对象表示与本地(或远程)的Spark集群连接,是与Spark交互的主要入口。

为了建立到Spark的连接,SparkSession需要配置如下信息:

  • Master URL: spark服务器的连接地址, 具体可参考Master URLs
  • Application Name: 便于识别的应用名
  • Spark Home: spark的安装路径
  • JARs: 任务依赖的JAR包路径

SparkSession vs SparkContext

SparkSessionSparkContext两者之间有什么关系呢? 让我们回顾一下spark的发展历史,看能不能找到一点线索:

在Spark 2.0.0 之前。主要有三个连接对象:

  • SparkContext: 主要连接Spark执行环境和完成进行创建RDDs等其它功能
  • SqlContext: 和SparkSQL一起在SparkContext背后工作
  • HiveContext: 和Hive仓库交互

Saprk 2.0.0之后,引入了Datasets/DataFrames作为主要的分数式数据抽象接口。SparkSession对象成为了Spark操作的主要入口。有几点需要注意:

  1. 在Scala和Java中Datasets是主要的数据抽象类型。而在R和Python语言中,DataFrame是主要的抽象类型。它们在API操作上基本没有区别(可以认为是一个东西,只是在不同语言里叫法不一样)

  2. 尽管在2.0.0之后,Datasets/DataFrames作为新的接口形式,但是RDDs依然在被使用,所以当操作RDDs,主要使用SparkContext

  3. SparkSession对象包含了SparkContext对象。在Spark 2.0.0版本之后,SparkContext仍然是作为连接Spark集群的管道,因此SparkContext主要执行环境操作,比如: 累加器(accumulators),addFile, addJars等。而SparkSession则用于读取和创建Datasets/DataFrames等操作。

创建一个SparkSession对象

我们可以使用下面的语句创建一个SparkSession对象

1
2
3
4
>>> from pyspark.sql import SparkSession

>>> sparkSession = SparkSession.builder.appName('app name').\
                        master('local').getOrCreate()

其实当我们运行pyspark时,其实就已经自动创建了一个SparkSession并且分配给了spark变量。

pyspark脚本里面,我们可以看到下面一句话。

1
export PYTHONSTARTUP="${SPARK_HOME}/python/pyspark/shell.py"

它设置了环境变量PYTHONSTARTUP, 那么在我们运行pyspark时,会先自动运行脚本

1
${SPARK_HOME}/python/pyspark/shell.py

我们再来看看这个脚本里有些什么,下图是脚本的部分截图:

从红色框里的代码,我们可以看到创建了SparkSession对象并赋值给sparkSparkSession包含了SparkContext`(spark.sparkContext), 并且使用sc指向SparkContext`。

简要的说,我们一般采用如下规则:

  • 创建一个SparkSession对象
  • 使用SparkSession读取,创建SQL语句的视图,创建Datasets/DataFrames
  • SparkSession得到sparkContext, 用于完成累加器(accumulators),分发缓存文件和RDDs的相关操作。

sparkContext元数据

sparkContext包含了一些实用的元数据信息。

获取spark版本信息

1
2
3
4
5
>>> spark.version
u'2.2.1'

>>> sc.version
u'2.2.1'

获取部署的应用名

1
2
>>> sc.appName
u'PySparkShell'

获取内存信息(Python中暂时没有实现)

1
2
3
4
5
6
7
>>> sc.getExecutorMemoryStatus
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-4-755a4391f6a7> in <module>()
----> 1 sc.getExecutorMemoryStatus

AttributeError: 'SparkContext' object has no attribute 'getExecutorMemoryStatus'

但是Scala语言中有

1
2
scala> sc.getExecutorMemoryStatus
res0: scala.collection.Map[String,(Long, Long)] = Map(169.254.93.254:55267 -> (384093388,384093388))

169.254.93.254:55267代办主机信息。 (384093388,384093388)分别代表当前分配的最大内存和剩余内存

获取主机信息

1
2
>>> sc.master
u'local[*]'

获取配置信息

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
>>> sc.getConf().toDebugString().split('\n')
[u'spark.app.id=local-1515662568196',
 u'spark.app.name=PySparkShell',
 u'spark.driver.host=169.254.93.254',
 u'spark.driver.port=55105',
 u'spark.executor.id=driver',
 u'spark.master=local[*]',
 u'spark.rdd.compress=True',
 u'spark.serializer.objectStreamReset=100',
 u'spark.sql.catalogImplementation=hive',
 u'spark.submit.deployMode=client']