Getting Started with Apache Spark

A beginner's tutorial for Apache Spark.

Spark Local Mode: A Quick (Ten-Second) Start

Reference: http://spark.apache.org/docs/latest/quick-start.html

Unpack the Spark distribution and try it locally. No Hadoop installation is required; just start spark-shell:

➜  spark-1.4.0-bin-hadoop2.6  bin/spark-shell
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_25)
15/06/28 10:36:07 INFO ui.SparkUI: Started SparkUI at http://127.0.0.1:4040
15/06/28 10:36:07 INFO repl.SparkILoop: Created spark context..
Spark context available as sc.
15/06/28 10:36:08 INFO hive.HiveContext: Initializing execution hive, version 0.13.1
15/06/28 10:36:23 INFO repl.SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext.
scala>

By default spark-shell creates a SparkContext (sc) and a SQLContext (sqlContext). Let's try some RDD operations.

First example: read a text file and run some simple counts over it. Calling sc.textFile(fileName) produces a MapPartitionsRDD.

scala> val textFile = sc.textFile("README.md")
15/06/28 10:36:45 INFO storage.MemoryStore: ensureFreeSpace(63424) called with curMem=0, maxMem=278019440
15/06/28 10:36:45 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 61.9 KB, free 265.1 MB)
15/06/28 10:36:45 INFO storage.MemoryStore: ensureFreeSpace(20061) called with curMem=63424, maxMem=278019440
15/06/28 10:36:45 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 19.6 KB, free 265.1 MB)
15/06/28 10:36:45 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:58638 (size: 19.6 KB, free: 265.1 MB)
15/06/28 10:36:45 INFO spark.SparkContext: Created broadcast 0 from textFile at <console>:21
textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21

Calling count() on the textFile RDD created above triggers an action.

Note: Hadoop is already installed on this machine in pseudo-distributed mode, so Spark picks up the Hadoop configuration.
Since Hadoop is not started here and we run in local mode, the file has to be read with an explicit file:/// prefix and an absolute path.
Rebuild the textFile RDD against the local file and call count() again.

scala> textFile.count()
java.net.ConnectException: Call From hadoop/127.0.0.1 to localhost:9000 failed on
connection exception: java.net.ConnectException: 拒绝连接;
For more details see: http://wiki.apache.org/hadoop/ConnectionRefused
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
...
Caused by: java.net.ConnectException: 拒绝连接
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
...
scala> textFile
res1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21

scala> val textFile = sc.textFile("file:///home/hadoop/soft/spark-1.4.0-bin-hadoop2.6/README.md")
textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at textFile at <console>:21

scala> textFile.count()
15/06/28 10:44:07 INFO scheduler.DAGScheduler: Job 0 finished: count at <console>:24, took 0.275609 s
res2: Long = 98

Another action: return the first line of the file.

scala> textFile.first()
15/06/28 10:44:27 INFO scheduler.DAGScheduler: Job 1 finished: first at <console>:24, took 0.017917 s
res3: String = # Apache Spark

Count how many lines contain the word "Spark":

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at filter at <console>:23
scala> textFile.filter(line => line.contains("Spark")).count()

Find how many words the longest line of the file contains:

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
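
The same maximum can be written with Math.max, as the quick-start guide referenced above suggests. A small sketch for the same spark-shell session, where textFile is already defined:

import java.lang.Math
// Equivalent to the if/else reduce above, but easier to read.
textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))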

Word count (WordCount)

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at <console>:23

scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((package,1), (this,1), (Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version),1), (Because,1), (Python,2), (cluster.,1), (its,1), ([run,1), (general,2), (have,1), (pre-built,1), (locally.,1), (locally,2), (changed,1), (sc.parallelize(1,1), (only,1), (several,1), (This,2), (basic,1), (Configuration,1), (learning,,1), (documentation,3), (YARN,,1), (graph,1), (Hive,2), (first,1), (["Specifying,1), ("yarn-client",1), (page](http://spark.apache.org/documentation.html),1), ([params]`.,1), (application,1), ([project,2), (prefer,1), (SparkPi,2), (<http://spark.apache.org/>,1), (engine,1), (version,1), (file,1), (documentation,,1), (MASTER,1), (example,3), (distribution.,1), (are,1), (params,1), (scala>,1), (systems.,1...
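
As a small illustrative extension (not part of the original session), the pairs can be sorted by count to list the most frequent words. This sketch assumes the wordCounts RDD defined above:

// Swap (word, count) to (count, word), sort by count in descending order, take the 10 most frequent words.
val top10 = wordCounts.map(_.swap).sortByKey(ascending = false).take(10)
top10.foreach(println)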

RDD caching (cache): the first count takes about 0.05 s, while the second takes only about 0.02 s.

scala> linesWithSpark.cache()
res7: linesWithSpark.type = MapPartitionsRDD[4] at filter at <console>:23

scala> linesWithSpark.count()
15/06/28 10:47:11 INFO scheduler.DAGScheduler: Job 5 finished: count at <console>:26, took 0.054036 s
res8: Long = 19

scala> linesWithSpark.count()
15/06/28 10:47:14 INFO scheduler.DAGScheduler: Job 6 finished: count at <console>:26, took 0.016638 s
res9: Long = 19

The experiment steps above are summarized in the table below. Note that only actions get a number; operations without an action are unnumbered. cache(), for example, is not an action.

Step | Action
0 | val textFile = sc.textFile("file:///home/hadoop/soft/spark-1.4.0-bin-hadoop2.6/README.md"); textFile.count()
1 | textFile.first()
2 | val linesWithSpark = textFile.filter(line => line.contains("Spark")); linesWithSpark.count()
3 | textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b); val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
4 | wordCounts.collect()
5 | linesWithSpark.cache(); linesWithSpark.count()
6 | linesWithSpark.count()
7 | linesWithSpark.count()

When spark-shell starts it prints: INFO ui.SparkUI: Started SparkUI at http://127.0.0.1:4040
Open http://127.0.0.1:4040 to reach the Spark web UI. The three most important tabs are Jobs, Stages, and Storage.

Jobs & Stages

Jobs: each numbered action above corresponds to a Job Id on the Jobs page.

Stages: there are 8 jobs above, but one extra stage (9 stages in total). The reason is that the collect() in step 4 consists of two stages.

On the Jobs page, click the collect job with Job Id=4 (the one that outputs the WordCount result). The list below shows that it has 2 stages.
Look closely at the last two columns of that list, Shuffle Read and Shuffle Write:
the map stage performs the shuffle write, and the collect stage performs the shuffle read.

Click the map stage with Stage Id=4. Its DAG visualization is the same as the left-hand part of the job's DAG overview.

The Spark web UI also provides an Event Timeline, which shows clearly how much time each phase takes.

Go back and click the collect stage with Stage Id=5.

Storage: entries appear here only after cache() has been called and at least one action has been executed.
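
As a quick sketch of how the Storage tab relates to the code (assuming the linesWithSpark RDD cached above), the storage level can be inspected and the cached blocks released from the shell:

linesWithSpark.getStorageLevel   // the level set by cache(), i.e. MEMORY_ONLY
linesWithSpark.unpersist()       // releases the cached blocks; the entry disappears from the Storage tab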


Spark Standalone Cluster Installation (30 Minutes to 1 Hour)

"Standalone" sounds like a single machine, but it does not mean the cluster has only one node; the standalone deployment is itself a distributed/cluster mode. It has two main components:

  • Master
  • Worker


1. Preparation:

Install a distributed Hadoop cluster and start HDFS and YARN
Set up passwordless SSH from the master to all slaves (append the master's public key to each slave's authorized_keys)
Disable the firewall on every node (chkconfig iptables off)
Install Scala 2.10 and set it up in ~/.bashrc

2. Edit the spark-env.sh configuration file

$ vi $SPARK_HOME/conf/spark-env.sh
# Environment variables
export JAVA_HOME=/usr/java/jdk1.7.0_51
export SCALA_HOME=/usr/install/scala-2.10.5

# Minimal configuration
export HADOOP_HOME=/usr/install/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export SPARK_MASTER_IP=dp0652
export MASTER=spark://dp0652:7077
#export SPARK_LOCAL_IP=dp0652
export SPARK_LOCAL_DIRS=/usr/install/spark-1.4.0-bin-hadoop2.6

# Other settings
export SPARK_MASTER_WEBUI_PORT=8082
export SPARK_MASTER_PORT=7077
export SPARK_WORKER_CORES=1
export SPARK_WORKER_INSTANCES=1
export SPARK_WORKER_MEMORY=8g

In newer versions, SPARK_HOME is set to the Spark installation directory, and SPARK_LOCAL_DIRS is a scratch directory that holds Spark's runtime job data.

A few ports to note:

  • 8082: Master web UI
  • 7077: Master port
  • 4040 (from the previous section) is the application UI port; each additional application takes the next port up from 4040 (see the sketch below)
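
If a fixed UI port is preferred instead of the 4040, 4041, ... progression, an application can pin it via spark.ui.port. A minimal sketch, with 4050 as an arbitrary example value:

import org.apache.spark.SparkConf
// Must be set before the SparkContext is created; otherwise Spark keeps probing upward from 4040.
val conf = new SparkConf().setAppName("fixed-ui-port").set("spark.ui.port", "4050")
// new SparkContext(conf) would then serve its UI on 4050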

3. Create the slaves file listing all the Workers

vi conf/slaves
dp0652
dp0653
dp0655
dp0656
dp0657

4. Distribute the Spark directory to the other nodes in the cluster

cd ..
scp -r $SPARK_HOME dp0653:/usr/install
scp -r $SPARK_HOME dp0655:/usr/install
scp -r $SPARK_HOME dp0656:/usr/install
scp -r $SPARK_HOME dp0657:/usr/install

Because dp0652 and dp0653 have more memory than the other nodes, we modified spark-env.sh on these two nodes and started two Worker instances on each.

export SPARK_WORKER_INSTANCES=2
export SPARK_WORKER_MEMORY=20g

5. Start the cluster. Running start-all.sh on the master starts the Spark Master and all the Workers.

[qihuang.zheng@dp0652 spark-1.4.0-bin-hadoop2.6]$ sbin/start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.master.Master-1-dp0652.out
dp0656: starting org.apache.spark.deploy.worker.Worker, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.worker.Worker-1-dp0656.out
dp0655: starting org.apache.spark.deploy.worker.Worker, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.worker.Worker-1-dp0655.out
dp0657: starting org.apache.spark.deploy.worker.Worker, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.worker.Worker-1-dp0657.out
dp0652: starting org.apache.spark.deploy.worker.Worker, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.worker.Worker-1-dp0652.out
dp0653: starting org.apache.spark.deploy.worker.Worker, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.worker.Worker-1-dp0653.out
dp0652: starting org.apache.spark.deploy.worker.Worker, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.worker.Worker-2-dp0652.out
dp0653: starting org.apache.spark.deploy.worker.Worker, logging to /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.deploy.worker.Worker-2-dp0653.out

6. Check the Spark processes on the master (0652) and on the slaves (0653, 0655).
Master 0652 runs one Spark Master and two Spark Workers;
0653 runs two Spark Workers, and 0655 runs one Spark Worker.

[qihuang.zheng@dp0652 ~]$ jps -lm
40708 org.apache.spark.deploy.master.Master --ip dp0652 --port 7077 --webui-port 8082
41095 org.apache.spark.deploy.worker.Worker --webui-port 8082 spark://dp0652:7077
40926 org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://dp0652:7077

[qihuang.zheng@dp0653 ~]$ jps -lm
27153 org.apache.spark.deploy.worker.Worker --webui-port 8082 spark://dp0652:7077
27029 org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://dp0652:7077

[qihuang.zheng@dp0655 ~]$ jps -lm
8766 org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://dp0652:7077

7. Check the web UI: http://dp0652:8082/

8. spark-shell runs Spark code interactively, while spark-submit is used to submit packaged jars (a minimal application sketch follows the commands below).

bin/spark-shell --master spark://dp0652:7077 --executor-memory 4g

bin/spark-submit --master spark://dp0652:7077 \
--class org.apache.spark.examples.SparkPi \
--executor-memory 4g --total-executor-cores 2 \
lib/spark-examples-1.4.0-hadoop2.6.0.jar 1000
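
For reference, here is a minimal sketch of an application that could be packaged into a jar and submitted the same way; the object name and logic are illustrative, not taken from the article:

import org.apache.spark.{SparkConf, SparkContext}

object LineCount {
  def main(args: Array[String]): Unit = {
    // The master URL and executor memory are normally supplied by spark-submit, not hard-coded here.
    val conf = new SparkConf().setAppName("LineCount")
    val sc = new SparkContext(conf)
    val path = if (args.nonEmpty) args(0) else "file:///usr/install/spark-1.4.0-bin-hadoop2.6/README.md"
    println("lines: " + sc.textFile(path).count())
    sc.stop()
  }
}

After packaging it (for example with sbt package), it would be submitted with the same bin/spark-submit form shown above, passing --class LineCount and the jar path.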

Problems

1. If SPARK_LOCAL_IP is configured but not changed to each slave's own IP on the slave nodes, the following error occurs:

15/07/02 09:04:08 ERROR netty.NettyTransport: failed to bind to /192.168.6.52:0, shutting down Netty transport
Exception in thread "main" java.net.BindException: Failed to bind to: /192.168.6.52:0: Service 'sparkWorker' failed after 16 retries!
at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
at akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:393)
at akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:389)
at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
at scala.util.Try$.apply(Try.scala:161)
at scala.util.Success.map(Try.scala:206)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/07/02 09:04:09 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/07/02 09:04:09 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
15/07/02 09:04:09 INFO util.Utils: Shutdown hook called

Analysis: SPARK_LOCAL_IP means the IP address of the local machine, so once the configuration is distributed across the cluster, every node has to edit it to its own IP.
With many nodes this is tedious; in the spark-env.sh above SPARK_LOCAL_IP is therefore commented out and SPARK_LOCAL_DIRS is used instead.

2. If export MASTER is not configured, the Workers report this error:

15/07/02 08:40:51 INFO worker.Worker: Retrying connection to master (attempt # 12)
15/07/02 08:40:51 INFO worker.Worker: Connecting to master akka.tcp://sparkMaster@dp0652:7077/user/Master...
15/07/02 08:40:51 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkMaster@dp0652:7077].
Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters.
Reason: 拒绝连接: dp0652/192.168.6.52:7077
15/07/02 08:41:23 ERROR worker.Worker: RECEIVED SIGNAL 15: SIGTERM
15/07/02 08:41:23 INFO util.Utils: Shutdown hook called

The consequence is that although every slave starts a Worker process (visible with jps), no workers show up on the Master, so the Master's log should be checked.
A successful startup on the master logs spark@dp0652:7077, whereas the log above shows sparkMaster@dp0652:7077, so MASTER should be exported manually.

3. After the cluster finally starts successfully, the log on the Master:

Spark Command: /usr/java/jdk1.7.0_51/bin/java -cp /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../conf/:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/spark-assembly-1.4.0-hadoop2.6.0.jar:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/usr/install/hadoop/etc/hadoop/:/usr/install/hadoop/etc/hadoop/ -Xms512m -Xmx512m -XX:MaxPermSize=128m org.apache.spark.deploy.master.Master --ip dp0652 --port 7077 --webui-port 8082
========================================
15/07/02 09:27:49 INFO master.Master: Registered signal handlers for [TERM, HUP, INT]
15/07/02 09:27:50 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/07/02 09:27:50 INFO spark.SecurityManager: Changing view acls to: qihuang.zheng
15/07/02 09:27:50 INFO spark.SecurityManager: Changing modify acls to: qihuang.zheng
15/07/02 09:27:50 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(qihuang.zheng); users with modify permissions: Set(qihuang.zheng)
15/07/02 09:27:51 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/07/02 09:27:51 INFO Remoting: Starting remoting
15/07/02 09:27:51 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkMaster@dp0652:7077]
15/07/02 09:27:51 INFO util.Utils: Successfully started service 'sparkMaster' on port 7077.
15/07/02 09:27:51 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/07/02 09:27:51 INFO server.AbstractConnector: Started SelectChannelConnector@dp0652:6066
15/07/02 09:27:51 INFO util.Utils: Successfully started service on port 6066.
15/07/02 09:27:51 INFO rest.StandaloneRestServer: Started REST server for submitting applications on port 6066
15/07/02 09:27:51 INFO master.Master: Starting Spark master at spark://dp0652:7077
15/07/02 09:27:51 INFO master.Master: Running Spark version 1.4.0
15/07/02 09:27:51 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/07/02 09:27:51 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:8082
15/07/02 09:27:51 INFO util.Utils: Successfully started service 'MasterUI' on port 8082.
15/07/02 09:27:51 INFO ui.MasterWebUI: Started MasterWebUI at http://192.168.6.52:8082
15/07/02 09:27:52 INFO master.Master: I have been elected leader! New state: ALIVE
15/07/02 09:27:54 INFO master.Master: Registering worker 192.168.6.52:35398 with 1 cores, 20.0 GB RAM
15/07/02 09:27:54 INFO master.Master: Registering worker 192.168.6.56:60106 with 1 cores, 8.0 GB RAM
15/07/02 09:27:54 INFO master.Master: Registering worker 192.168.6.55:50995 with 1 cores, 8.0 GB RAM
15/07/02 09:27:54 INFO master.Master: Registering worker 192.168.6.53:55994 with 1 cores, 20.0 GB RAM
15/07/02 09:27:54 INFO master.Master: Registering worker 192.168.6.57:34020 with 1 cores, 8.0 GB RAM
15/07/02 09:27:56 INFO master.Master: Registering worker 192.168.6.52:55912 with 1 cores, 20.0 GB RAM
15/07/02 09:27:56 INFO master.Master: Registering worker 192.168.6.53:35846 with 1 cores, 20.0 GB RAM

The log from one of the Workers on node 53:

Spark Command: /usr/java/jdk1.7.0_51/bin/java -cp /usr/install/spark-1.4.0-bin-hadoop2.6/sbin/../conf/:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/spark-assembly-1.4.0-hadoop2.6.0.jar:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/usr/install/spark-1.4.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/usr/install/hadoop/etc/hadoop/:/usr/install/hadoop/etc/hadoop/ -Xms512m -Xmx512m -XX:MaxPermSize=128m org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://dp0652:7077
========================================
15/07/02 09:27:52 INFO worker.Worker: Registered signal handlers for [TERM, HUP, INT]
15/07/02 09:27:52 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/07/02 09:27:52 INFO spark.SecurityManager: Changing view acls to: qihuang.zheng
15/07/02 09:27:52 INFO spark.SecurityManager: Changing modify acls to: qihuang.zheng
15/07/02 09:27:52 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(qihuang.zheng); users with modify permissions: Set(qihuang.zheng)
15/07/02 09:27:53 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/07/02 09:27:53 INFO Remoting: Starting remoting
15/07/02 09:27:54 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkWorker@192.168.6.53:55994]
15/07/02 09:27:54 INFO util.Utils: Successfully started service 'sparkWorker' on port 55994.
15/07/02 09:27:54 INFO worker.Worker: Starting Spark worker 192.168.6.53:55994 with 1 cores, 20.0 GB RAM
15/07/02 09:27:54 INFO worker.Worker: Running Spark version 1.4.0
15/07/02 09:27:54 INFO worker.Worker: Spark home: /usr/install/spark-1.4.0-bin-hadoop2.6
15/07/02 09:27:54 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/07/02 09:27:54 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:8081
15/07/02 09:27:54 INFO util.Utils: Successfully started service 'WorkerUI' on port 8081.
15/07/02 09:27:54 INFO ui.WorkerWebUI: Started WorkerWebUI at http://192.168.6.53:8081
15/07/02 09:27:54 INFO worker.Worker: Connecting to master akka.tcp://sparkMaster@dp0652:7077/user/Master...
15/07/02 09:27:54 INFO worker.Worker: Successfully registered with master spark://dp0652:7077

Spark Standalone HA (30 Minutes)

Reference documentation:

Quick Q&A:

  • Why is HA needed? Because there is only one Master node, it is a single point of failure. If the Master dies, the Spark cluster becomes unusable.
  • How is HA implemented? Use ZooKeeper and start two Masters; only one Master is active at a time, and the other is a standby.

Using the five machines from the previous section, the HA deployment layout is:

node | host
masters | dp0652, dp0653
slaves | dp0655, dp0656, dp0657

1. Edit the configuration file:

$ vi spark-env.sh
export SPARK_MASTER_IP=dp0652
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=dp0655:2181,dp0656:2181,dp0657:2181 -Dspark.deploy.zookeeper.dir=/spark"

2. Edit the slaves file

$ vi slaves
dp0655
dp0656
dp0657

3. Distribute the configuration files to every node in the cluster

scp spark-env.sh dp0653:/usr/install/spark-1.4.0-bin-hadoop2.6/conf
scp spark-env.sh dp0655:/usr/install/spark-1.4.0-bin-hadoop2.6/conf
scp spark-env.sh dp0656:/usr/install/spark-1.4.0-bin-hadoop2.6/conf
scp spark-env.sh dp0657:/usr/install/spark-1.4.0-bin-hadoop2.6/conf
scp slaves dp0653:/usr/install/spark-1.4.0-bin-hadoop2.6/conf
scp slaves dp0655:/usr/install/spark-1.4.0-bin-hadoop2.6/conf
scp slaves dp0656:/usr/install/spark-1.4.0-bin-hadoop2.6/conf
scp slaves dp0657:/usr/install/spark-1.4.0-bin-hadoop2.6/conf

4. On dp0653, edit spark-env.sh:

export SPARK_MASTER_IP=dp0653

5. Start the cluster on dp0652: sbin/start-all.sh

6. Start a Master on dp0653: sbin/start-master.sh

7. You can see that dp0652 is active and dp0653 is standby.

8. Stop the Master on dp0652: sbin/stop-master.sh

9. Watch whether dp0653 takes over as the Master.

10. Run an application. Note that --master now lists multiple Masters (a programmatic sketch follows the command).

bin/spark-submit --master spark://dp0652:7077,dp0653:7077 \
--class org.apache.spark.examples.SparkPi \
--executor-memory 4g --total-executor-cores 2 \
lib/spark-examples-1.4.0-hadoop2.6.0.jar 10
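
An application can also list both Masters programmatically, using the same comma-separated spark:// URL. A minimal sketch:

import org.apache.spark.{SparkConf, SparkContext}
// The driver registers with whichever Master is currently active and fails over if that one dies.
val conf = new SparkConf().setAppName("ha-demo").setMaster("spark://dp0652:7077,dp0653:7077")
val sc = new SparkContext(conf)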

11. Watch the Master log on dp0652:

15/07/07 14:41:50 INFO Master: Registering worker 192.168.6.55:58543 with 1 cores, 8.0 GB RAM
15/07/07 14:41:50 INFO Master: Registering worker 192.168.6.56:37859 with 1 cores, 8.0 GB RAM
15/07/07 14:41:50 INFO Master: Registering worker 192.168.6.57:34379 with 1 cores, 8.0 GB RAM

15/07/07 14:45:01 ERROR Master: RECEIVED SIGNAL 15: SIGTERM
15/07/07 14:45:01 INFO Utils: Shutdown hook called

12. Watch the Master log on dp0653:

15/07/07 14:42:03 INFO ConnectionStateManager: State change: CONNECTED

15/07/07 14:45:35 INFO ZooKeeperLeaderElectionAgent: We have gained leadership
15/07/07 14:45:36 INFO Master: I have been elected leader! New state: RECOVERING
15/07/07 14:45:36 INFO Master: Trying to recover worker: worker-20150707144149-192.168.6.56-37859
15/07/07 14:45:36 INFO Master: Trying to recover worker: worker-20150707144149-192.168.6.55-58543
15/07/07 14:45:36 INFO Master: Trying to recover worker: worker-20150707144149-192.168.6.57-34379
15/07/07 14:45:36 INFO Master: Worker has been re-registered: worker-20150707144149-192.168.6.55-58543
15/07/07 14:45:36 INFO Master: Worker has been re-registered: worker-20150707144149-192.168.6.56-37859
15/07/07 14:45:36 INFO Master: Worker has been re-registered: worker-20150707144149-192.168.6.57-34379
15/07/07 14:45:36 INFO Master: Recovery complete - resuming operations!

Spark SQL

Hive on Spark

Building Spark with Hive support

mvn -Pyarn -Dyarn.version=2.6.0 -Phadoop-2.6 -Dhadoop.version=2.6.0 \
-Phive -Phive-0.13.1 -Phive-thriftserver \
-DskipTests clean package

If Spark was not built with Hive support and hive-site.xml is simply copied into the conf directory on the Spark cluster, starting spark-sql fails:

[qihuang.zheng@dp0652 spark-1.4.0-bin-hadoop2.6]$ bin/spark-sql
Exception in thread "main" java.lang.RuntimeException: java.io.IOException: 权限不够
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:330)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:109)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.IOException: 权限不够
at java.io.UnixFileSystem.createFileExclusively(Native Method)
at java.io.File.createNewFile(File.java:1006)
at java.io.File.createTempFile(File.java:1989)
at org.apache.hadoop.hive.ql.session.SessionState.createTempFile(SessionState.java:432)
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:328)
... 11 more
15/07/03 08:42:33 INFO util.Utils: Shutdown hook called
15/07/03 08:42:33 INFO util.Utils: Deleting directory /tmp/spark-831ff199-cf80-4d49-a22f-824736065289

This is because every Worker in the Spark cluster needs Hive support, yet Hive is not installed on all Worker nodes, and Spark itself must be built with the Hive packages.
Rebuilding Spark with Hive support takes a long time, though. Can the Hive installation that already exists in the cluster be reused directly? Yes:
http://lxw1234.com/archives/2015/06/294.htm
http://shiyanjun.cn/archives/1113.html
http://www.cnblogs.com/hseagle/p/3758922.html

1. Add to spark-env.sh:

export HIVE_HOME=/usr/install/apache-hive-0.13.1-bin
export SPARK_CLASSPATH=$HIVE_HOME/lib/mysql-connector-java-5.1.34.jar:$SPARK_CLASSPATH

2. Distribute apache-hive-0.13.1-bin to every node in the cluster (every node that runs a Spark Worker)

cd install
scp -r apache-hive-0.13.1-bin dp0653:/usr/install/

3. Copy apache-hive-0.13.1-bin/conf/hive-site.xml into $SPARK_HOME/conf

scp apache-hive-0.13.1-bin/conf/hive-site.xml dp0653:/usr/install/spark-1.4.0-bin-hadoop2.6/conf

4. Restart the Spark cluster (standalone mode)

sbin/stop-all.sh
sbin/start-all.sh

5. Test spark-sql

15/07/03 10:00:56 WARN spark.SparkConf: Setting 'spark.executor.extraClassPath' to '/usr/install/apache-hive-0.13.1-bin/lib/mysql-connector-java-5.1.34.jar:' as a work-around.
15/07/03 10:00:56 WARN spark.SparkConf: Setting 'spark.driver.extraClassPath' to '/usr/install/apache-hive-0.13.1-bin/lib/mysql-connector-java-5.1.34.jar:' as a work-around.

15/07/03 10:01:00 INFO hive.metastore: Trying to connect to metastore with URI thrift://192.168.6.53:9083
15/07/03 10:01:00 INFO hive.metastore: Connected to metastore.
15/07/03 10:01:00 INFO session.SessionState: No Tez session required at this point. hive.execution.engine=mr.
SET spark.sql.hive.version=0.13.1
SET spark.sql.hive.version=0.13.1

spark-sql> show databases;
default
test
spark-sql> use test;
spark-sql> show tables;
koudai false

spark-sql> select count(*) from koudai;
311839
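
The same query can also be run from spark-shell; since the sqlContext created at startup has Hive support (see the log at the top of this article), Hive tables are visible once hive-site.xml is in place. A sketch:

// Returns a DataFrame; collect() brings the single count row back to the driver.
sqlContext.sql("select count(*) from test.koudai").collect().foreach(println)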

SparkSQL & thrift

Started directly with bin/spark-sql:

[qihuang.zheng@dp0653 ~]$ jps -lm
35146 org.apache.spark.deploy.SparkSubmit --master spark://192.168.6.52:7078 --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver spark-internal
35668 sun.tools.jps.Jps -lm
[qihuang.zheng@dp0653 ~]$ ps -ef | grep SparkSQL
506 35146 35011 26 09:16 pts/15 00:00:19 /usr/java/jdk1.7.0_51/bin/java ... org.apache.spark.deploy.SparkSubmit --master spark://192.168.6.52:7078 --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver spark-internal

Hive runs on node 53; check its Thrift service:

[qihuang.zheng@dp0653 ~]$ ps -ef | grep thrift
admin 28541 1 0 Aug19 ? 00:02:47 /usr/java/jdk1.7.0_51/bin/java ... org.apache.hadoop.util.RunJar /usr/install/apache-hive-1.2.0-bin/lib/hive-service-1.2.0.jar org.apache.hive.service.server.HiveServer2 --hiveconf hive.metastore.uris=thrift://192.168.6.53:9083 --hiveconf hive.metastore.local=false --hiveconf hive.server2.thrift.bind.host=192.168.6.53 --hiveconf hive.server2.thrift.port=10001

Start Spark's Thrift server:

sudo sbin/start-thriftserver.sh --master spark://192.168.6.52:7078
starting org.apache.spark.sql.hive.thriftserver.HiveThriftServer2, logging to /usr/install/spark-1.4.1-bin-hadoop2.6/sbin/../logs/spark-qihuang.zheng-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-dp0652.out

When the Thrift server is started with a master specified, the application shows up in the Master's web UI, but it exits as soon as startup completes.
According to the log, it cannot connect because the port was not specified correctly:

15/08/21 09:48:39 INFO hive.metastore: Trying to connect to metastore with URI thrift://192.168.6.53:9083
15/08/21 09:48:39 INFO hive.metastore: Connected to metastore.
15/08/21 09:48:39 INFO service.AbstractService: Service:ThriftBinaryCLIService is started.
15/08/21 09:48:39 INFO service.AbstractService: Service:HiveServer2 is started.
15/08/21 09:48:39 INFO thriftserver.HiveThriftServer2: HiveThriftServer2 started
15/08/21 09:48:39 WARN conf.HiveConf: DEPRECATED: hive.metastore.ds.retry.* no longer has any effect. Use hive.hmshandler.retry.* instead
15/08/21 09:48:39 ERROR thrift.ThriftCLIService: Error:
org.apache.thrift.transport.TTransportException: Could not create ServerSocket on address /192.168.47.213:10000.
at org.apache.thrift.transport.TServerSocket.<init>(TServerSocket.java:93)
at org.apache.thrift.transport.TServerSocket.<init>(TServerSocket.java:79)
at org.apache.hive.service.auth.HiveAuthFactory.getServerSocket(HiveAuthFactory.java:236)
at org.apache.hive.service.cli.thrift.ThriftBinaryCLIService.run(ThriftBinaryCLIService.java:69)
at java.lang.Thread.run(Thread.java:744)

Specify Hive's Thrift port explicitly:

sudo sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=10001 \
--hiveconf hive.server2.thrift.bind.host=192.168.6.53 \
--master spark://192.168.6.52:7078

Check the Thrift process:

[qihuang.zheng@dp0652 spark-1.4.1-bin-hadoop2.6]$ ps -ef | grep thrift
root 24997 1 99 09:55 pts/0 00:00:12 /usr/java/jdk1.7.0_51/bin/java .. org.apache.spark.deploy.SparkSubmit --master spark://192.168.6.52:7078 --class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 spark-internal --hiveconf hive.server2.thrift.port=10001 --hiveconf hive.server2.thrift.bind.host=192.168.6.53

But the log still reports an error:

15/08/21 09:55:42 ERROR thrift.ThriftCLIService: Error:
org.apache.thrift.transport.TTransportException: Could not create ServerSocket on address /192.168.6.53:10001.

A few seconds later, checking the Thrift processes again, HiveThriftServer2 is gone! A beeline connection to port 10001 looks like this:

qihuang.zheng@dp0652 spark-1.4.1-bin-hadoop2.6]$ bin/beeline -u jdbc:hive2://192.168.6.53:10001
scan complete in 3ms
Connecting to jdbc:hive2://192.168.6.53:10001
Connected to: Apache Hive (version 1.2.0)
Driver: Spark Project Core (version 1.4.1)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 1.4.1 by Apache Hive
0: jdbc:hive2://192.168.6.53:10001> show tables;
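
The same endpoint that beeline connects to can also be used over plain JDBC from application code. A sketch, assuming the Hive JDBC driver (org.apache.hive:hive-jdbc) and its dependencies are on the classpath:

import java.sql.DriverManager

Class.forName("org.apache.hive.jdbc.HiveDriver")          // register the HiveServer2 JDBC driver
val conn = DriverManager.getConnection("jdbc:hive2://192.168.6.53:10001", "", "")
val stmt = conn.createStatement()
val rs = stmt.executeQuery("show tables")
while (rs.next()) println(rs.getString(1))                // print each table name
rs.close(); stmt.close(); conn.close()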

http://lxw1234.com/archives/2015/12/593.htm

Spark-SQL Configuration Parameters

/usr/install/spark-1.4.1-bin-hadoop2.4/bin/spark-shell --conf spark.driver.maxResultSize=2g --conf spark.executor.memory=512m                              

/usr/install/spark-1.4.1-bin-hadoop2.4/bin/spark-shell --driver-java-options -Dspark.driver.maxResultSize=2g
scala> sc.getConf.get("spark.driver.maxResultSize")
res1: String = 2g

/usr/install/spark-1.4.1-bin-hadoop2.4/bin/spark-sql --driver-java-options -Dspark.driver.maxResultSize=2g
/usr/install/spark-1.4.1-bin-hadoop2.4/bin/spark-sql --conf spark.driver.maxResultSize=2g

/usr/install/spark-1.4.1-bin-hadoop2.4/bin/spark-sql -h
Usage: ./bin/spark-sql [options] [cli option]
--driver-java-options Extra Java options to pass to the driver.
--conf PROP=VALUE Arbitrary Spark configuration property.
--properties-file FILE Path to a file from which to load extra properties. If not
specified, this will look for conf/spark-defaults.conf.
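
The same properties can also be set programmatically before the SparkContext is created. A sketch mirroring the --conf examples above:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("conf-demo")
  .set("spark.driver.maxResultSize", "2g")   // same property as --conf / -Dspark.driver.maxResultSize
  .set("spark.executor.memory", "512m")
val sc = new SparkContext(conf)
println(sc.getConf.get("spark.driver.maxResultSize"))      // prints 2g, matching the spark-shell check above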

Table of Contents
  1. Spark Local Mode: A Quick (Ten-Second) Start
    1.1. Jobs & Stages
  2. Spark Standalone Cluster Installation (30 Minutes to 1 Hour)
    2.1. Problems
  3. Spark Standalone HA (30 Minutes)
  4. Spark SQL
    4.1. Hive on Spark
    4.2. SparkSQL & thrift
    4.3. Spark-SQL Configuration Parameters