Getting Started with Spark Streaming

A beginner's tutorial for Apache Spark Streaming.

NetworkWordCount

1. For a local run, edit spark-env.sh (this step is optional):

export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.7.0_79.jdk/Contents/Home
export SCALA_HOME=/Users/zhengqh/Soft/scala-2.10.5
export HADOOP_HOME=/Users/zhengqh/Soft/cdh542/hadoop-2.6.0-cdh5.4.2
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export SPARK_MASTER_IP=localhost
#export MASTER=spark://localhost:7077
export SPARK_LOCAL_IP=localhost

2. Start a netcat data server:

➜  ~  nc -lk 9999
hello world hello spark hello spark hello world

3. Run the Spark Streaming example. When you type text in the nc terminal, Spark Streaming computes a word count over the previous second in real time:

➜  spark-1.4.0-bin-hadoop2.6  bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999
15/07/06 08:38:36 INFO dstream.SocketReceiver: Connected to localhost:9999
15/07/06 08:38:37 INFO scheduler.ReceiverTracker: Registered receiver for stream 0 from 127.0.0.1:50018
15/07/06 08:38:37 INFO scheduler.JobScheduler: Added jobs for time 1436143117000 ms
...
15/07/06 08:38:37 INFO scheduler.DAGScheduler: Job 2 finished: print at NetworkWordCount.scala:55, took 0.066998 s
-------------------------------------------
Time: 1436143116000 ms
-------------------------------------------

15/07/06 08:38:37 INFO scheduler.JobScheduler: Finished job streaming job 1436143116000 ms.0 from job set of time 1436143116000 ms
15/07/06 08:38:37 INFO scheduler.JobScheduler: Total delay: 1.204 s for time 1436143116000 ms (execution: 1.118 s)
15/07/06 08:38:40 INFO scheduler.DAGScheduler: Job 10 finished: print at NetworkWordCount.scala:55, took 0.048230 s
-------------------------------------------
Time: 1436143120000 ms
-------------------------------------------
(spark,2)
(hello,4)
(world,2)

15/07/06 08:38:40 INFO scheduler.JobScheduler: Finished job streaming job 1436143120000 ms.0 from job set of time 1436143120000 ms
15/07/06 08:38:40 INFO rdd.ShuffledRDD: Removing RDD 16 from persistence list
15/07/06 08:38:40 INFO scheduler.JobScheduler: Total delay: 0.356 s for time 1436143120000 ms (execution: 0.335 s)
15/07/06 08:38:40 INFO rdd.MapPartitionsRDD: Removing RDD 15 from persistence list
15/07/06 08:38:40 INFO storage.BlockManager: Removing RDD 16

4. The streaming statistics can be viewed at localhost:4040/streaming; the upward spikes in the Input Rate chart correspond to the rate at which data is being produced.

5. Running locally in IDEA

Add .setMaster() to the SparkConf, set Program Arguments to localhost 9999 local[2], then run.

Otherwise you get the error: org.apache.spark.SparkException: A master URL must be set in your configuration

Example code: Spark Streaming reads the socket stream in real time and computes a word count over the past 5 seconds.

// Create the context with a 5 second batch size
val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster(args(2))
val ssc = new StreamingContext(sparkConf, Seconds(5))

// Create a socket stream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
// Note that no duplication in storage level only for running locally.
// Replication necessary in distributed scenario for fault tolerance.
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()

6. Package and run locally:

➜  spark-1.4.0-bin-hadoop2.6  bin/spark-submit --class com.tongdun.bigdata.spark.intro.NetworkWordCount --jars /Users/zhengqh/IdeaProjects/bigdata/out/artifacts/spark_intro/spark-intro.jar /Users/zhengqh/IdeaProjects/bigdata/out/artifacts/spark_intro/spark-intro.jar localhost 9999 "local[2]"

PS: the --jars xx.jar option can be omitted here.

7. Package and run on the cluster. The last program argument can be either local[2] or spark://dp0652:7077:

➜  spark-1.4.0-bin-hadoop2.6  bin/spark-submit --class com.tongdun.bigdata.spark.intro.NetworkWordCount /home/qihuang.zheng/spark-intro.jar localhost 9999 "local[2]"

➜ spark-1.4.0-bin-hadoop2.6 bin/spark-submit --class com.tongdun.bigdata.spark.intro.NetworkWordCount /home/qihuang.zheng/spark-intro.jar localhost 9999 spark://dp0652:7077

8. When running on a cluster, we normally pass --master to spark-submit instead of calling setMaster() in the code, so remove setMaster from the code and repackage.
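
A minimal sketch of the change (assuming the same NetworkWordCount code shown above, just without the master argument):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Before: the master URL came from the third program argument
// val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster(args(2))

// After: no setMaster() in the code; the master is supplied by spark-submit --master
val sparkConf = new SparkConf().setAppName("NetworkWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(5))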

First, experiment locally. 👌 👉 It runs successfully without specifying --master:

bin/spark-submit --class com.tongdun.bigdata.spark.intro.NetworkWordCount /Users/zhengqh/spark-intro.jar localhost 9999

Then experiment on the cluster. 👌 👉 It runs successfully whether or not --master is specified:

bin/spark-submit --class com.tongdun.bigdata.spark.intro.NetworkWordCount /home/qihuang.zheng/spark-intro.jar 192.168.6.52 9999

bin/spark-submit --master spark://dp0652:7077 --class com.tongdun.bigdata.spark.intro.NetworkWordCount /home/qihuang.zheng/spark-intro.jar 192.168.6.52 9999

Local vs. cluster run modes

Below are the results of running NetworkWordCount in several ways, locally and on the cluster. ✅ means the counts are shown correctly, 🙅 means no results are displayed.

localhost (without modifying spark-env.sh):

✅ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999

🙅 bin/spark-submit --class org.apache.spark.examples.streaming.NetworkWordCount --master "local[2]" --jars lib/spark-examples-1.4.0-hadoop2.6.0.jar lib/spark-examples-1.4.0-hadoop2.6.0.jar localhost 9999

🙅 bin/spark-submit --class org.apache.spark.examples.streaming.NetworkWordCount --master local\[2\] --jars lib/spark-examples-1.4.0-hadoop2.6.0.jar lib/spark-examples-1.4.0-hadoop2.6.0.jar localhost 9999

✅ bin/spark-submit --class org.apache.spark.examples.streaming.NetworkWordCount --master "local[*]" --jars lib/spark-examples-1.4.0-hadoop2.6.0.jar lib/spark-examples-1.4.0-hadoop2.6.0.jar localhost 9999

cluster:

🙅 bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999

✅ bin/spark-submit --class org.apache.spark.examples.streaming.NetworkWordCount --master "local[2]" lib/spark-examples-1.4.0-hadoop2.6.0.jar localhost 9999

✅ bin/spark-submit --class org.apache.spark.examples.streaming.NetworkWordCount --master local\[2\] lib/spark-examples-1.4.0-hadoop2.6.0.jar localhost 9999

✅ bin/spark-submit --class org.apache.spark.examples.streaming.NetworkWordCount --master "local[*]" lib/spark-examples-1.4.0-hadoop2.6.0.jar localhost 9999

🙅 bin/spark-submit --class org.apache.spark.examples.streaming.NetworkWordCount --master spark://dp0652:7077 lib/spark-examples-1.4.0-hadoop2.6.0.jar localhost 9999

Kafka-SparkStreaming

To run locally in IDEA, start zookeeper and kafka, then run KafkaWordCountProducer and KafkaWordCount.
Below are the steps for running on the cluster:

1. First, create a topic in Kafka:

[qihuang.zheng@dp0656 kafka]$ bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic kafka-spark-test --replication-factor 2 --partitions 2
Created topic "kafka-spark-test".
[qihuang.zheng@dp0656 kafka]$ bin/kafka-topics.sh --zookeeper localhost:2181 --list
kafka-spark-test
[qihuang.zheng@dp0656 kafka]$ bin/kafka-topics.sh --zookeeper localhost:2181 --describe kafka-spark-test
Topic:kafka-spark-test PartitionCount:2 ReplicationFactor:2 Configs:
Topic: kafka-spark-test Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: kafka-spark-test Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2

2. Run the Kafka producer simulator. The command below sends 2000 messages per second to the kafka-spark-test topic, each message 70 digits long.

[qihuang.zheng@dp0652 spark-1.4.0-bin-hadoop2.6]$ bin/spark-submit --class org.apache.spark.examples.streaming.KafkaWordCountProducer /home/qihuang.zheng/spark-intro.jar 192.168.6.55:9092,192.168.6.56:9092,192.168.6.57:9092 kafka-spark-test 2000 70
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/kafka/clients/producer/KafkaProducer
...

This is because our packaging did not include the dependencies. With the jar-with-dependencies build, the simulated Kafka messages are produced and printed to the console as expected:

[qihuang.zheng@dp0652 spark-1.4.0-bin-hadoop2.6]$ bin/spark-submit --class org.apache.spark.examples.streaming.KafkaWordCountProducer /home/qihuang.zheng/spark-intro-1.0-SNAPSHOT-jar-with-dependencies.jar 192.168.6.55:9092,192.168.6.56:9092,192.168.6.57:9092 kafka-spark-test 2000 70
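
For context, the core of such a producer loop looks roughly like this. This is a hedged sketch, not the exact KafkaWordCountProducer source; it uses the new Kafka producer API (org.apache.kafka.clients.producer.KafkaProducer) that the NoClassDefFoundError above refers to, with the broker list, topic, rate and message length taken from the command-line arguments above:

import java.util.Properties
import scala.util.Random
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

val brokers = "192.168.6.55:9092,192.168.6.56:9092,192.168.6.57:9092"
val topic = "kafka-spark-test"
val messagesPerSec = 2000
val digitsPerMessage = 70

val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)

// Every second, send messagesPerSec messages, each a string of random digits separated by spaces
while (true) {
  (1 to messagesPerSec).foreach { _ =>
    val message = (1 to digitsPerMessage).map(_ => Random.nextInt(10).toString).mkString(" ")
    producer.send(new ProducerRecord[String, String](topic, message))
  }
  Thread.sleep(1000)
}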

3. Run the KafkaWordCount real-time streaming analysis program:

[qihuang.zheng@dp0652 spark-1.4.0-bin-hadoop2.6]$ bin/spark-submit --class org.apache.spark.examples.streaming.KafkaWordCount /home/qihuang.zheng/spark-intro-1.0-SNAPSHOT-jar-with-dependencies.jar 192.168.6.55:2181,192.168.6.56:2181,192.168.6.57:2181 my-consumer-group kafka-spark-test 5

This fails: the Kafka integration jar for Spark Streaming is not found, because we also set spark-streaming-kafka_2.10-1.4.0 to provided in pom.xml:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$
at org.apache.spark.examples.streaming.KafkaWordCount$.main(KafkaWordCount.scala:59)
at org.apache.spark.examples.streaming.KafkaWordCount.main(KafkaWordCount.scala)

You could, of course, remove the provided scope from spark-streaming-kafka_2.10-1.4.0 and rebuild. But there is another option:
download spark-streaming-kafka_2.10-1.4.0.jar and add it via the --jars option of spark-submit:

[qihuang.zheng@dp0652 spark-1.4.0-bin-hadoop2.6]$ bin/spark-submit --master spark://dp0652:7077 --class org.apache.spark.examples.streaming.KafkaWordCount --jars /home/qihuang.zheng/spark-streaming-kafka_2.10-1.4.0.jar /home/qihuang.zheng/spark-intro-1.0-SNAPSHOT-jar-with-dependencies.jar 192.168.6.55:2181,192.168.6.56:2181,192.168.6.57:2181 my-consumer-group kafka-spark-test 5
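
For reference, the core of a KafkaWordCount-style job is roughly as follows. This is a sketch assuming the receiver-based KafkaUtils.createStream API of Spark 1.4; the example actually bundled with Spark may differ in details (for instance it uses a windowed count), and the object name KafkaWordCountSketch is just for illustration. The arguments mirror the spark-submit command above:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaWordCountSketch {
  def main(args: Array[String]): Unit = {
    // args: <zkQuorum> <group> <topics> <numThreads>
    val Array(zkQuorum, group, topics, numThreads) = args

    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Map each topic to the number of receiver threads that consume it
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

    // Count the words (here: the digits produced by the simulator) in each batch
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}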

4. Output log of KafkaWordCount:

15/07/07 09:40:11 WARN BlockManager: Block input-0-1436233210800 replicated to only 0 peer(s) instead of 1 peers
15/07/07 09:40:11 WARN BlockManager: Block input-0-1436233211000 replicated to only 0 peer(s) instead of 1 peers
15/07/07 09:40:12 WARN BlockManager: Block input-0-1436233212000 replicated to only 0 peer(s) instead of 1 peers
-------------------------------------------
Time: 1436233212000 ms
-------------------------------------------
(4,139354)
(8,140225)
(6,140124)
(0,140515)
(2,139827)
(7,140151)
(5,139425)
(9,140219)
(3,140248)
(1,139912)

5. Streaming Web UI

Since the producer sends 2000 messages per second and the job counts every 2 seconds, the Input Size of each job is roughly 2000 * 2 = 4000.

Packaging

Packaging with IDEA

Project Structure | Artifacts | + | Jar | Empty | enter the jar name: spark-intro |
In Available Elements, select the 'spark-intro' compile output under the project name | double-click | it is moved under the jar on the left
Build | Build Artifacts | select the artifact you just created: spark-intro | Rebuild

Packaging with Maven

Since this is a Maven project, you can simply run mvn package in the project directory. Note, however, that without configuring the right Maven plugin, the dependencies are not bundled into the jar.
To include the dependencies, add the maven-assembly-plugin:

<plugin>
  <artifactId>maven-assembly-plugin</artifactId>
  <configuration>
    <descriptorRefs>
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
    <archive>
      <manifest>
        <mainClass></mainClass>
      </manifest>
    </archive>
  </configuration>
  <executions>
    <execution>
      <id>make-assembly</id>
      <phase>package</phase>
      <goals>
        <goal>single</goal>
      </goals>
    </execution>
  </executions>
</plugin>

Note: set scope=provided for the Spark-related jar dependencies:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_${scala.bin.version}</artifactId>
  <version>${spark.version}</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_${scala.bin.version}</artifactId>
  <version>${spark.version}</version>
  <scope>provided</scope>
</dependency>
...

Then run mvn package in the project root. After a successful build, two files are generated under target: one with the dependencies bundled in and one without any dependencies.

➜  spark-intro git:(master) ✗ ll target
-rw-r--r-- 1 zhengqh staff 16M 7 7 09:22 spark-intro-1.0-SNAPSHOT-jar-with-dependencies.jar
-rw-r--r-- 1 zhengqh staff 466K 7 7 09:22 spark-intro-1.0-SNAPSHOT.jar

Copy the jar with dependencies to the Spark cluster to run it:

➜  target git:(master) ✗ scp spark-intro-1.0-SNAPSHOT-jar-with-dependencies.jar qihuang.zheng@192.168.6.52:~/

Kafka-SparkStreaming-Redis

Start Redis on 192.168.6.52. Note that it must be started in the background and as the admin user, otherwise shutdown will fail with an error:

cd /usr/install/redis-3.0.2
sudo -u admin src/redis-server &

Start the Kafka producer simulator:

bin/spark-submit --class com.tongdun.bigdata.spark.intro.KafkaEventProducer /home/qihuang.zheng/spark-intro-1.0-SNAPSHOT-jar-with-dependencies.jar 192.168.6.55:9092,192.168.6.56:9092,192.168.6.57:9092

Start the real-time user click-count streaming job, which writes its results to Redis:

bin/spark-submit --master spark://dp0652:7077 --class com.tongdun.bigdata.spark.intro.UserClickCountAnalytics --jars /home/qihuang.zheng/spark-streaming-kafka_2.10-1.4.0.jar /home/qihuang.zheng/spark-intro-1.0-SNAPSHOT-jar-with-dependencies.jar spark://dp0652:7077 192.168.6.55:9092,192.168.6.56:9092,192.168.6.57:9092

bin/spark-submit --master spark://dp0652:7077 --class com.tongdun.bigdata.spark.intro.UserClickCountAnalytics2 --jars /home/qihuang.zheng/spark-streaming-kafka_2.10-1.4.0.jar /home/qihuang.zheng/spark-intro-1.0-SNAPSHOT-jar-with-dependencies.jar spark://dp0652:7077 user_events 192.168.6.55:9092,192.168.6.56:9092,192.168.6.57:9092 192.168.6.52
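
A minimal sketch of how such a job might write per-user click counts into a Redis hash. Assumptions: a Jedis client, Redis database 1, and the hash key app::users::click that the verification below queries; this is not the exact UserClickCountAnalytics source:

import org.apache.spark.streaming.dstream.DStream
import redis.clients.jedis.Jedis

// clickCounts: a DStream of (userId, clicksInThisBatch) pairs produced by the streaming job
def saveToRedis(clickCounts: DStream[(String, Long)], redisHost: String): Unit = {
  clickCounts.foreachRDD { rdd =>
    rdd.foreachPartition { partition =>
      // Open one connection per partition, on the executor side
      val jedis = new Jedis(redisHost)
      jedis.select(1) // use database 1, matching the "select 1" in the redis-cli verification below
      partition.foreach { case (userId, count) =>
        // Accumulate each user's click count in the hash app::users::click
        jedis.hincrBy("app::users::click", userId, count)
      }
      jedis.close()
    }
  }
}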

Verify the data from the local machine with the Redis client:

src/redis-cli -h 192.168.6.52
192.168.6.52:6379> select 1
192.168.6.52:6379[1]> HGETALL app::users::click
1) "d7f141563005d1b5d0d3dd30138f3f62"
2) "1891"
3) "97edfc08311c70143401745a03a50706"
4) "1814"
5) "4A4D769EB9679C054DE81B973ED5D768"
6) "1845"
7) "a95f22eabc4fd4b580c011a3161a9d9d"
8) "1835"
9) "8dfeb5aaafc027d89349ac9a20b3930f"
10) "1821"
11) "6b67c8c700427dee7552f81f3228c927"
12) "1842"
13) "011BBF43B89BFBF266C865DF0397AA71"
14) "1819"
15) "f2a8474bf7bd94f0aabbd4cdd2c06dcf"
16) "1909"
17) "c8ee90aade1671a21336c721512b817a"
18) "1843"
19) "068b746ed4620d25e26055a9f804385f"
20) "1955"

If we start two UserClickCountAnalytics processes at the same time, the second one goes into a waiting state. Only when the first process is killed does the second one start running.

The figure below shows the waiting process: only the Input Rate chart has data, the other three are empty.

You can see that its status is queued.

The terminal of the waiting process also shows output like the following:

[Stage 0:>                                                          (0 + 0) / 2]15/07/07 12:05:25 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
15/07/07 12:05:40 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
15/07/07 12:05:55 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

The first (currently running) process:

When we kill the running process, the waiting one takes over and starts executing.

SparkHA-Streaming

If a Spark Streaming program is running continuously in production and the Master fails over, we want to verify whether Spark Streaming keeps running normally.
This experiment builds on the Spark Streaming section above.

Again, start the KafkaProducer first (on node 52):

[qihuang.zheng@dp0652 spark-1.4.0-bin-hadoop2.6]$ bin/spark-submit --class org.apache.spark.examples.streaming.KafkaWordCountProducer /home/qihuang.zheng/spark-intro-1.0-SNAPSHOT-jar-with-dependencies.jar 192.168.6.55:9092,192.168.6.56:9092,192.168.6.57:9092 kafka-spark-test 2000 70

Then start the Spark Streaming program (also on node 52); note the value of --master:

[qihuang.zheng@dp0652 spark-1.4.0-bin-hadoop2.6]$ bin/spark-submit --master spark://dp0652:7077,dp0653:7077 --class org.apache.spark.examples.streaming.KafkaWordCount --jars /home/qihuang.zheng/spark-streaming-kafka_2.10-1.4.0.jar /home/qihuang.zheng/spark-intro-1.0-SNAPSHOT-jar-with-dependencies.jar 192.168.6.55:2181,192.168.6.56:2181,192.168.6.57:2181 my-consumer-group kafka-spark-test 5

The current master is node 53; we stop it with sbin/stop-master.sh.

We find that the Spark Streaming program is not affected at all. In other words, a Master change does not affect the driver program,
as long as your driver specifies --master as the HA list rather than a single node!

What if the driver, i.e. the Spark Streaming program itself, goes down (for example, when the application needs to be upgraded)? Start another streaming program: it stays blocked
until the old program is shut down, and then the new streaming program takes over immediately. This was already demonstrated in http://zqhxuyuan.github.io/2015/06/28/Spark-Streamming/.

Note: neither dp0652 nor dp0653 shows the running Spark Streaming program in its Master UI.
A Spark Streaming program behaves like a driver: whichever machine you launch it from, you view it at port 4040 on that machine.
In the experiment above we ran the Spark Streaming program on dp0652, so the Streaming UI is at http://192.168.6.52:4040/streaming/.
As for the Master's port 8082, only the active master shows the workers, but neither shows any Running Applications.

[Figure: spark-master]

[Figure: spark-stream]

