Apache Spark Streamming小白入门教程
NetworkWordCount
1.本机运行时,修改spark-env.sh(这一步不是必须的)
1 | export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.7.0_79.jdk/Contents/Home |
2.开启netcat数据服务器
1 | ➜ ~ nc -lk 9999 |
3.运行spark-streamming示例,当在nc终端输入text时,spark-streamming会实时统计过去一秒的wordcount
1 | ➜ spark-1.4.0-bin-hadoop2.6 bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999 |
4.在localhost:4040/streaming可以观察streaming的统计信息, 其中在InputRate向上凸出的是产生数据的速度
5.IDEA本地运行
给SparkConf添加.setMaster(), 在Program Arguments
中添加localhost 9999 local[2]
, 然后运行
否则会报错:org.apache.spark.SparkException: A master URL must be set in your configuration
example-code: spark-streamming实时读取socker流,统计过去5秒的wordcount
1 | // Create the context with a 1 second batch size |
6.打包本地运行
1 | ➜ spark-1.4.0-bin-hadoop2.6 bin/spark-submit --class com.tongdun.bigdata.spark.intro.NetworkWordCount --jars /Users/zhengqh/IdeaProjects/bigdata/out/artifacts/spark_intro/spark-intro.jar /Users/zhengqh/IdeaProjects/bigdata/out/artifacts/spark_intro/spark-intro.jar localhost 9999 "local[2]" |
PS: –jars xx.jar可以省略
7.打包集群运行. 程序的最后一个参数可以指定为local[2]或者spark://dp0652:7077
1 | ➜ spark-1.4.0-bin-hadoop2.6 bin/spark-submit --class com.tongdun.bigdata.spark.intro.NetworkWordCount /home/qihuang.zheng/spark-intro.jar localhost 9999 "local[2]" |
8.一般在集群中运行,我们使用–master而不是在代码中设置setMaster(),所以把代码中的setMaster去掉重新打包
先在本地实验, 👌 👉 不指定–master可以成功运行
1 | bin/spark-submit --class com.tongdun.bigdata.spark.intro.NetworkWordCount /Users/zhengqh/spark-intro.jar localhost 9999 |
然后集群实验, 👌 👉 指定–master或不指定都可以成功运行
1 | bin/spark-submit --class com.tongdun.bigdata.spark.intro.NetworkWordCount /home/qihuang.zheng/spark-intro.jar 192.168.6.52 9999 |
关于本地和集群的运行方式
下面是在本地和集群中运行NetworkWordCount几种方式的结果, ✅表示能正常统计,🙅表示没显示结果.
localhost(没有更改spark-env.sh)
1 | ✅ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999 |
cluster:
1 | 🙅 bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999 |
Kafka-SparkStreamming
如果是本地IDEA运行, 分别启动zookeeper,kafka,然后运行KafkaWordCountProducer,KafkaWordCount.
下面是在集群中的运行步骤:
1.首先在kafka中创建一个topic:
1 | [qihuang.zheng@dp0656 kafka]$ bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic kafka-spark-test --replication-factor 2 --partitions 2 |
2.运行kafka生产者模拟程序: 下面模拟了往kafka-spark-test队列中每秒发送2000个消息,每条消息的长度是70个数字.
1 | [qihuang.zheng@dp0652 spark-1.4.0-bin-hadoop2.6]$ bin/spark-submit --class org.apache.spark.examples.streaming.KafkaWordCountProducer /home/qihuang.zheng/spark-intro.jar 192.168.6.55:9092,192.168.6.56:9092,192.168.6.57:9092 kafka-spark-test 2000 70 |
这是因为我们打包的方式没有把依赖包放进来. 用依赖包的方式就可以正常地在控制台输出kafka的模拟消息:
1 | [qihuang.zheng@dp0652 spark-1.4.0-bin-hadoop2.6]$ bin/spark-submit --class org.apache.spark.examples.streaming.KafkaWordCountProducer /home/qihuang.zheng/spark-intro-1.0-SNAPSHOT-jar-with-dependencies.jar 192.168.6.55:9092,192.168.6.56:9092,192.168.6.57:9092 kafka-spark-test 2000 70 |
3.运行KafkaWordCount实时流分析程序
1 | [qihuang.zheng@dp0652 spark-1.4.0-bin-hadoop2.6]$ bin/spark-submit --class org.apache.spark.examples.streaming.KafkaWordCount /home/qihuang.zheng/spark-intro-1.0-SNAPSHOT-jar-with-dependencies.jar 192.168.6.55:2181,192.168.6.56:2181,192.168.6.57:2181 my-consumer-group kafka-spark-test 5 |
报错: 没有找到Kafka-SparkStreamming的相关jar包,因为我们在pom.xml中把spark-streaming-kafka_2.10-1.4.0
也设置为了provided
1 | Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$ |
当然也可以把spark-streaming-kafka_2.10-1.4.0
的scope去掉,然后重新编译. 不过还有一种办法:
下载spark-streaming-kafka_2.10-1.4.0.jar
,在spark-submit中添加–jars选项把spark-streaming-kafka_2.10-1.4.0.jar
加进来
1 | [qihuang.zheng@dp0652 spark-1.4.0-bin-hadoop2.6]$ bin/spark-submit --master spark://dp0652:7077 --class org.apache.spark.examples.streaming.KafkaWordCount --jars /home/qihuang.zheng/spark-streaming-kafka_2.10-1.4.0.jar /home/qihuang.zheng/spark-intro-1.0-SNAPSHOT-jar-with-dependencies.jar 192.168.6.55:2181,192.168.6.56:2181,192.168.6.57:2181 my-consumer-group kafka-spark-test 5 |
4.KafkaWordCount的输出日志:
1 | 15/07/07 09:40:11 WARN BlockManager: Block input-0-1436233210800 replicated to only 0 peer(s) instead of 1 peers |
5.Streaming-WebUI
因为每秒2000条消息,每隔2秒统计一次,所以每次job的输入大小InputSize大概为2000*2=4000
关于打包
IDEA打包
Project Structures | Artifacts | + | Jar | Empty | 填写jar包名称: spark-intro |
在Available Elements中选择项目名称下的’spark-intro’ compile output | 双击 | 就会到左侧的jar包下
Build | Build Artifacts | 选择刚刚填写的Artifact: spark-intro | Rebuild
Maven打包
因为是maven工程, 所以可以在工程下直接mvn package. 但是注意如果不配置maven的插件.则不会把依赖包打进去.
如果要把依赖包添加进去, 则要添加maven-assembly-plugin
插件:
1 | <plugin> |
注意: 对于spark的相关jar依赖设置scope=provided
1 | <dependency> |
然后在工程的根目录执行mvn package
, 编译成功后会在target下生成2个文件.一个是有依赖的,一个是没有任何依赖的.
1 | ➜ spark-intro git:(master) ✗ ll target |
我们把有依赖的jar包拷贝到spark集群中运行
1 | ➜ target git:(master) ✗ scp spark-intro-1.0-SNAPSHOT-jar-with-dependencies.jar qihuang.zheng@192.168.6.52:~/ |
Kafka-SparkStreamming-Redis
在192.168.6.52上启动redis, 注意要以后台方式启动, 并且用admin用户, 否则shutdown时会报错:
1 | cd /usr/install/redis-3.0.2 |
启动kafka生产者模拟程序
1 | bin/spark-submit --class com.tongdun.bigdata.spark.intro.KafkaEventProducer /home/qihuang.zheng/spark-intro-1.0-SNAPSHOT-jar-with-dependencies.jar 192.168.6.55:9092,192.168.6.56:9092,192.168.6.57:9092 |
启动用户点击次数实时流统计,最终写到Redis中
1 | bin/spark-submit --master spark://dp0652:7077 --class com.tongdun.bigdata.spark.intro.UserClickCountAnalytics --jars /home/qihuang.zheng/spark-streaming-kafka_2.10-1.4.0.jar /home/qihuang.zheng/spark-intro-1.0-SNAPSHOT-jar-with-dependencies.jar spark://dp0652:7077 192.168.6.55:9092,192.168.6.56:9092,192.168.6.57:9092 |
使用客户端在本机中验证数据:
1 | src/redis-cli -h 192.168.6.52 |
假设我们同时启动了两个UserClickCountAnalytics进程, 则第二个会进入等待状态. 只有当第一个进程杀掉后, 第二个进程才会开始运行.
下图是处于等待的进程,除了InputRate有图像,其他三个都没有
可以看到status为queued
在等待进程的那个终端也可以看到如下输出
1 | [Stage 0:> (0 + 0) / 2]15/07/07 12:05:25 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources |
第一个正在运行的进程
当我们把正在运行的杀掉, 这时候等待运行的就会接着执行
SparkHA-Streamming
如果线上的SparkStreamming程序一直在运行, 而Master发生了切换, 验证下SparkStreamming还能不能正常运行.
这个实验的基础是在Spark-Streamming这一节的基础上.
还是先启动KafkaProducer[在52上]
1 | [qihuang.zheng@dp0652 spark-1.4.0-bin-hadoop2.6]$ bin/spark-submit --class org.apache.spark.examples.streaming.KafkaWordCountProducer /home/qihuang.zheng/spark-intro-1.0-SNAPSHOT-jar-with-dependencies.jar 192.168.6.55:9092,192.168.6.56:9092,192.168.6.57:9092 kafka-spark-test 2000 70 |
然后启动Spark-Streamming程序[也在52上], 注意–master的值
1 | [qihuang.zheng@dp0652 spark-1.4.0-bin-hadoop2.6]$ bin/spark-submit --master spark://dp0652:7077,dp0653:7077 --class org.apache.spark.examples.streaming.KafkaWordCount --jars /home/qihuang.zheng/spark-streaming-kafka_2.10-1.4.0.jar /home/qihuang.zheng/spark-intro-1.0-SNAPSHOT-jar-with-dependencies.jar 192.168.6.55:2181,192.168.6.56:2181,192.168.6.57:2181 my-consumer-group kafka-spark-test 5 |
当前的master是53,我们把53的停掉: sbin/stop-master.sh
可以发现Spark-Streamming的程序并没有受到影响. 也就是说master的变化并不会影响driver程序.
只要你的driver指定–master为HA,而不是单点!
如果Driver程序比如spark-streamming程序挂掉了怎么办(比如要升级应用程序),那么再起一个streamming程序,这个程序会一直被阻塞住
直到旧的程序关掉了, 新的streamming程序就会立马接管过来. 这在http://zqhxuyuan.github.io/2015/06/28/Spark-Streamming/中已经做过了
注意: 不管是dp0652还是dp0653,都无法看到正在运行中的SparkStreamming程序.
SparkStreamming程序类似于Driver, 你在那台机器执行,就再这台机器的4040端口查看.
比如上面的实验,我们在dp0652上运行了Spark-Streamming程序, 所以在http://192.168.6.52:4040/streaming/查看StreamingUI
至于Master的8082端口,则只有主master才可以看到worker, 但是都看不到Running Applications