HBase-RDD

HBASE-RDD批量导入RDD数据到HBase中

HBase-RDD

IDE本地运行(√)

  • 本机不需要启动HBase,为了能连接到测试的HBase集群,拷贝测试HBase集群的hbase-site.xml,测试Hadoop集群的core-site.xml, hdfs-site.xml到classpath下.
  • 设置SparkApplication的.setMaster(“local[2]”),本机不需要启动Spark,因为使用的是本地模式

在本地运行时,没有添加Master,会报错需要设置master:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
15/12/18 16:43:06 ERROR SparkContext: Error initializing SparkContext.
org.apache.spark.SparkException: A master URL must be set in your configuration
at org.apache.spark.SparkContext.<init>(SparkContext.scala:394)
at cn.fraudmetrix.hbase.HBaseRDD$.<init>(HBaseRDD.scala:21)
at cn.fraudmetrix.hbase.HBaseRDD$.<clinit>(HBaseRDD.scala)
at cn.fraudmetrix.hbase.HBaseRDD.main(HBaseRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
15/12/18 16:43:06 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" java.lang.ExceptionInInitializerError
at cn.fraudmetrix.hbase.HBaseRDD.main(HBaseRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: org.apache.spark.SparkException: A master URL must be set in your configuration
at org.apache.spark.SparkContext.<init>(SparkContext.scala:394)
at cn.fraudmetrix.hbase.HBaseRDD$.<init>(HBaseRDD.scala:21)
at cn.fraudmetrix.hbase.HBaseRDD$.<clinit>(HBaseRDD.scala)
... 6 more

远程Spark-Shell提交运行(√)

应用程序打包时需要把测试环境的hbase-site.xml等删除掉,使用远程hbase集群的配置.

hbase_res

上传依赖包和应用程序jar包到远程机器:

1
2
3
4
5
6
7
8
9
cd hbase-rdd
sbt package
scp target/scala-2.10/hbase-rdd_2.10-0.7.0.jar qihuang.zheng@fort.tongdun.cn:192.168.47.211/admin/tmp

cd tongdun-app
mvn clean package -DskipTests && scp target/spark-app-1.1-SNAPSHOT.jar qihuang.zheng@fort.tongdun.cn:192.168.47.211/admin/tmp

sudo mv /tmp/hbase-rdd_2.10-0.7.0.jar ~/ && sudo chown qihuang.zheng:users hbase-rdd_2.10-0.7.0.jar
sudo mv /tmp/spark-app-1.1-SNAPSHOT.jar ~/ && sudo chown qihuang.zheng:users spark-app-1.1-SNAPSHOT.jar

关于依赖包的打包,CLASSPATH:
在Spark-ES中,因为依赖包比较少,直接加到–jars中. 而HBase的依赖包比较多需要把lib下的都加入到SPARK_CLASSPATH下.
在Spark-Cassandra中,打的包是assembly,所以–jars只需要一个spark-cassandra-connector-assembly-1.4.0-SNAPSHOT.jar.
hbase-rdd依赖的包除了spark,json4s外就是hbase了. 使用sbt package并不会将hbase打成assembly.需要外部自己提供.

通常应用程序代码不会打assembly包,因为一个应用会用到很多组件,比如spark-es, spark-cassandra, hbase-rdd.
而这些组件本身自己也会依赖第三方包.如果应用依赖的组件打成了assembly包,则–jars只要一个assembly包就可以比如spark-cassandra.
如果组件没有打assembly包,则–jars除了这个组件外,还需要添加组件依赖的第三方包,比如spark-es依赖了json4s.
这里的hbase-rdd也一样,没有对hbase-rdd打assembly包, 而他依赖了hbase,所以需要添加hbase到–jars中.

由于spark-shell的–jars不能指定目录,可以在提交任务的节点添加目录到spark-env.sh的SPARK_CLASSPATH.
提交任务的节点并不会作为Spark集群的节点,所以并不需要重启Spark集群.
不幸的是shell都启动不起来.估计hbase的版本和spark的一些包有冲突(比如thrift).

解决依赖包冲突

由于hbase的依赖包比较多,所以考虑直接用文件夹

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#添加到SPARK_CLASSPATH,并不会添加到Driver中
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/home/qihuang.zheng/hbase-1.0.2/lib/*
#依赖包最好通过--jars手动添加
#export SPARK_CLASSPATH=$SPARK_CLASSPATH:/home/qihuang.zheng/json4s-ast_2.10-3.2.11.jar:/home/qihuang.zheng/json4s-core_2.10-3.2.11.jar:/home/qihuang.zheng/json4s-jackson_2.10-3.2.11.jar:/home/qihuang.zheng/json4s-native_2.10-3.2.11.jar
#配置文件在这里不起作用的
#export SPARK_CLASSPATH=$SPARK_CLASSPATH:/home/qihuang.zheng/hbase-1.0.2/conf/core-site.xml:/home/qihuang.zheng/hbase-1.0.2/conf/hdfs-site.xml:/home/qihuang.zheng/hbase-1.0.2/conf/hbase-site.xml

#hbase-rdd依赖了hbase的client,common,server.但是hbase还依赖了其他包.
#/home/qihuang.zheng/hbase-1.0.2/lib/hbase-client-1.0.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib/hbase-common-1.0.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib/hbase-server-1.0.2.jar

#在spark-env.sh中配置了hbase以及依赖包,就不需要添加--jars了. 但是启动出错:
/usr/install/spark-1.5.2-bin-hadoop2.4/bin/spark-shell \
--jars /home/qihuang.zheng/hbase-rdd_2.10-0.7.0.jar,/home/qihuang.zheng/json4s-ast_2.10-3.2.11.jar,/home/qihuang.zheng/json4s-core_2.10-3.2.11.jar,/home/qihuang.zheng/json4s-jackson_2.10-3.2.11.jar,/home/qihuang.zheng/json4s-native_2.10-3.2.11.jar \
--master spark://192.168.47.213:7077 \
--executor-memory 4g --total-executor-cores 30 --driver-memory 8g

但是启动spark-shell就报错,通常是hbase的jar包与系统已有的jar包冲突(比如hive)

  • 拷贝hive的libthrift到hbase/lib下, 删除hbase的libthrift
  • 删除hbase/lib下的netty-3.2.4.Final.jar

上面的spark-shell可以先不用跟上任何–jars,直接使用SPARK_CLASSPATH设置hbase/lib/*,验证能够成功启动.

spark-shell测试读写HBase集群

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.filter.PrefixFilter
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import unicredit.spark.hbase._

implicit val config = HBaseConfig(
"hbase.rootdir" -> "hdfs://tdhdfs/hbase",
"hbase.zookeeper.quorum" -> "192.168.47.83,192.168.47.84,192.168.47.86",
"hbase.master" -> "192.168.47.213:60000",
"hbase.cluster.distributed" -> "true",
"hbase.zookeeper.property.dataDir" -> "/home/qihuang.zheng/data"
)

val table = "data.md5_id"
val cf = "id"
val column = "id"
val families = Set("id")

val r1 = ("T9f4b08fd176e54151708c15aa625ff3", Map(column -> "052900195501010003"))
val r2 = ("T9f4b08fd176e54151708c15aa625ff4", Map(column -> "052900195501010004"))
val rdd: RDD[(String, Map[String, String])] = sc.parallelize(Array(r1, r2))
rdd.toHBase(table, cf)

注意: HBaseConfig必须命名为config, 否则会报错找不到隐式参数config.

1
2
3
<console>:42: error: could not find implicit value for parameter config: unicredit.spark.hbase.HBaseConfig
rdd.toHBase(table, cf)
^

修改HBaseConfig的变量名称为config后, 执行任务时还是报错:

1
15/12/18 14:40:42 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, spark047243): java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/client/Put

添加hbase的client,common,server包后:

1
15/12/18 14:45:12 WARN scheduler.TaskSetManager: Lost task 4.0 in stage 0.0 (TID 4, spark047216): java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/protobuf/generated/MasterProtos$MasterService$BlockingInterface

必须使用hbase全部的jar包,生成逗号分隔的jar包方式(MD,之前用过ls直接拼接,可是一下子又搜不到了):

1
2
3
4
5
#!/bin/sh
for loop in `ls *.jar`;do
result="/home/qihuang.zheng/hbase-1.0.2/lib2/$loop,$result"
done
echo $result

既然手动添加到–jars中,就不再需要往spark-env.sh中添加hbase/lib/*了.

现在把所有的依赖包都加进来了就没有问题了:

1
2
3
4
/usr/install/spark-1.5.2-bin-hadoop2.4/bin/spark-shell \
--master spark://192.168.47.213:7077 \
--executor-memory 6g --total-executor-cores 40 --driver-memory 8g \
--jars /home/qihuang.zheng/hbase-rdd_2.10-0.7.0.jar,/home/qihuang.zheng/json4s-ast_2.10-3.2.11.jar,/home/qihuang.zheng/json4s-core_2.10-3.2.11.jar,/home/qihuang.zheng/json4s-jackson_2.10-3.2.11.jar,/home/qihuang.zheng/json4s-native_2.10-3.2.11.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/zookeeper-3.4.6.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/xz-1.0.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/xmlenc-0.52.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/snappy-java-1.0.4.1.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/protobuf-java-2.5.0.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/paranamer-2.3.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/netty-all-4.0.23.Final.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/metrics-core-2.2.0.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/log4j-1.2.17.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/libthrift-0.9.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/leveldbjni-all-1.8.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/jsr305-1.3.9.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/jsch-0.1.42.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/joni-2.1.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/jettison-1.3.3.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/jets3t-0.9.0.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/jcodings-1.0.8.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/jaxb-impl-2.2.3-1.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/jaxb-api-2.2.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/java-xmlbuilder-0.4.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/javax.inject-1.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/jasper-runtime-5.5.23.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/jasper-compiler-5.5.23.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/jamon-runtime-2.3.1.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/jackson-xc-1.8.8.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/jackson-mapper-asl-1.8.8.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/jackson-jaxrs-1.8.8.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/jackson-core-asl-1.8.8.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/httpcore-4.1.3.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/httpclient-4.2.5.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/htrace-core-3.1.0-incubating.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/hbase-thrift-1.0.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/hbase-testing-util-1.0.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/hbase-shell-1.0.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/hbase-server-1.0.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/hbase-rest-1.0.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/hbase-resource-bundle-1.0.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/hbase-protocol-1.0.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/hbase-prefix-tree-1.0.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/hbase-it-1.0.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/hbase-hadoop-compat-1.0.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/hbase-hadoop2-compat-1.0.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/hbase-common-1.0.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/hbase-client-1.0.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/hbase-checkstyle-1.0.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/hbase-annotations-1.0.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/guice-servlet-3.0.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/guice-3.0.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/guava-12.0.1.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/findbugs-annotations-1.3.9-1.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/disruptor-3.3.0.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/commons-net-3.1.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/commons-math3-3.1.1.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/commons-math-2.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/commons-logging-1.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/commons-lang-2.6.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/commons-io-2.4.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/commons-httpclient-3.1.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/commons-el-1.0.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/commons-digester-1.8.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/commons-daemon-1.0.13.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/commons-configuration-1.6.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/commons-compress-1.4.1.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/commons-collections-3.2.1.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/commons-codec-1.9.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/commons-cli-1.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/commons-beanutils-core-1.8.0.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/commons-beanutils-1.7.0.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/avro-1.7.4.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/asm-3.1.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/api-util-1.0.0-M20.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/api-asn1-api-1.0.0-M20.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/apacheds-kerberos-codec-2.0.0-M15.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/apacheds-i18n-2.0.0-M15.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/aopalliance-1.0.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/activation-1.1.jar

验证记录写入到了HBase中:

1
2
3
4
5
hbase(main):002:0> scan "data.md5_id"
ROW COLUMN+CELL
T9f4b08fd176e54151708c15aa625ff3 column=id:id, timestamp=1450425601427, value=052900195501010003
T9f4b08fd176e54151708c15aa625ff4 column=id:id, timestamp=1450425601427, value=052900195501010004
2 row(s) in 0.1110 seconds

远程Spark-Submit提交执行(×)

无法连接到HBase的ZK集群

上面的spark-shell的HBaseConfig使用的是直接在代码中写死. 通常配置信息是由hbase-site.xml提供.
如果应用程序没法找到hbase-site.xml, 会默认连接本地的ZK.显然应该连接的是hbase-site.xml中配置的ZK集群.
尝试了把hbase-site.xml加入到SPARK_CLASSPATH中也不行.
虽然打包的应用程序中也有hbase-site.xml,但是貌似应用程序没有读到!

解决方式: 利用Hadoop Configuration的添加资源方法手动添加hbase-site.xml

1
2
3
4
5
6
7
8
9
10
11
//1.配置文件放到classpath下(本地可以直接运行)
val sparkConf = new SparkConf().setAppName("HBaseTest")//.setMaster(Constant.localMaster)
val sc = new SparkContext(sparkConf)

//确保hbase-site.xml在classpath下
//implicit val config = HBaseConfig()

//2.手动添加资源(否则远程执行时无法找到配置文件,而且hbase-site.xml也是在classpath下)
val hadoopConf : Configuration = new Configuration()
hadoopConf.addResource("hbase-site.xml")
implicit val config = HBaseConfig(hadoopConf)

运行任务

和spark-shell一样,必须要添加所有的hbase依赖包到--jars中. 否则在运行时会报错找不到HBase的相关类.
--jars指定的会被加入到Driver中. 但是SPARK_CLASSPATH配置的hbase/lib/*下的jar包并没有被加到Driver中.
而Driver将应用分发给Worker节点执行时, 由于Worker没有HBase的相关jar包,导致任务出错.

1
2
3
4
5
6
7
8
9
10
11
15/12/18 13:26:11 INFO spark.SparkContext: Added JAR file:/home/qihuang.zheng/hbase-rdd_2.10-0.7.0.jar at http://192.168.47.211:43695/jars/hbase-rdd_2.10-0.7.0.jar with timestamp 1450416371356
15/12/18 13:26:11 INFO spark.SparkContext: Added JAR file:/home/qihuang.zheng/json4s-ast_2.10-3.2.11.jar at http://192.168.47.211:43695/jars/json4s-ast_2.10-3.2.11.jar with timestamp 1450416371359
15/12/18 13:26:11 INFO spark.SparkContext: Added JAR file:/home/qihuang.zheng/json4s-core_2.10-3.2.11.jar at http://192.168.47.211:43695/jars/json4s-core_2.10-3.2.11.jar with timestamp 1450416371362
15/12/18 13:26:11 INFO spark.SparkContext: Added JAR file:/home/qihuang.zheng/json4s-jackson_2.10-3.2.11.jar at http://192.168.47.211:43695/jars/json4s-jackson_2.10-3.2.11.jar with timestamp 1450416371362
15/12/18 13:26:11 INFO spark.SparkContext: Added JAR file:/home/qihuang.zheng/json4s-native_2.10-3.2.11.jar at http://192.168.47.211:43695/jars/json4s-native_2.10-3.2.11.jar with timestamp 1450416371363
15/12/18 13:26:11 INFO spark.SparkContext: Added JAR file:/home/qihuang.zheng/spark-app-1.1-SNAPSHOT.jar at http://192.168.47.211:43695/jars/spark-app-1.1-SNAPSHOT.jar with timestamp 1450416371370

15/12/18 14:17:09 WARN spark.SparkConf: Setting 'spark.executor.extraClassPath' to '/usr/install/spark/lib/mysql-connector-java-5.1.34.jar:/home/qihuang.zheng/hbase-1.0.2/lib/*' as a work-around

15/12/18 13:26:18 WARN scheduler.TaskSetManager: Lost task 5.0 in stage 0.0 (TID 6, spark047217): java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/client/Table
15/12/18 13:26:18 WARN scheduler.TaskSetManager: Lost task 7.0 in stage 0.0 (TID 11, spark047218): java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/client/RegionLocator

spark-submit提交任务:

1
2
3
4
/usr/install/spark-1.5.2-bin-hadoop2.4/bin/spark-submit --master spark://192.168.47.213:7077,192.168.47.214:7077 \
--executor-memory 6g --total-executor-cores 40 --driver-memory 8g \
--jars /home/qihuang.zheng/hbase-rdd_2.10-0.7.0.jar,/home/qihuang.zheng/json4s-ast_2.10-3.2.11.jar,/home/qihuang.zheng/json4s-core_2.10-3.2.11.jar,/home/qihuang.zheng/json4s-jackson_2.10-3.2.11.jar,/home/qihuang.zheng/json4s-native_2.10-3.2.11.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/zookeeper-3.4.6.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/xz-1.0.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/xmlenc-0.52.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/snappy-java-1.0.4.1.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/protobuf-java-2.5.0.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/paranamer-2.3.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/netty-all-4.0.23.Final.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/metrics-core-2.2.0.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/log4j-1.2.17.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/libthrift-0.9.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/leveldbjni-all-1.8.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/jsr305-1.3.9.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/jsch-0.1.42.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/joni-2.1.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/jettison-1.3.3.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/jets3t-0.9.0.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/jcodings-1.0.8.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/jaxb-impl-2.2.3-1.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/jaxb-api-2.2.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/java-xmlbuilder-0.4.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/javax.inject-1.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/jasper-runtime-5.5.23.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/jasper-compiler-5.5.23.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/jamon-runtime-2.3.1.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/jackson-xc-1.8.8.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/jackson-mapper-asl-1.8.8.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/jackson-jaxrs-1.8.8.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/jackson-core-asl-1.8.8.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/httpcore-4.1.3.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/httpclient-4.2.5.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/htrace-core-3.1.0-incubating.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/hbase-thrift-1.0.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/hbase-testing-util-1.0.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/hbase-shell-1.0.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/hbase-server-1.0.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/hbase-rest-1.0.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/hbase-resource-bundle-1.0.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/hbase-protocol-1.0.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/hbase-prefix-tree-1.0.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/hbase-it-1.0.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/hbase-hadoop-compat-1.0.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/hbase-hadoop2-compat-1.0.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/hbase-common-1.0.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/hbase-client-1.0.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/hbase-checkstyle-1.0.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/hbase-annotations-1.0.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/guice-servlet-3.0.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/guice-3.0.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/guava-12.0.1.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/findbugs-annotations-1.3.9-1.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/disruptor-3.3.0.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/commons-net-3.1.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/commons-math3-3.1.1.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/commons-math-2.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/commons-logging-1.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/commons-lang-2.6.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/commons-io-2.4.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/commons-httpclient-3.1.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/commons-el-1.0.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/commons-digester-1.8.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/commons-daemon-1.0.13.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/commons-configuration-1.6.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/commons-compress-1.4.1.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/commons-collections-3.2.1.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/commons-codec-1.9.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/commons-cli-1.2.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/commons-beanutils-core-1.8.0.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/commons-beanutils-1.7.0.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/avro-1.7.4.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/asm-3.1.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/api-util-1.0.0-M20.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/api-asn1-api-1.0.0-M20.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/apacheds-kerberos-codec-2.0.0-M15.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/apacheds-i18n-2.0.0-M15.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/aopalliance-1.0.jar,/home/qihuang.zheng/hbase-1.0.2/lib2/activation-1.1.jar \
--class cn.fraudmetrix.hbase.HBaseRDD spark-app-1.1-SNAPSHOT.jar /user/tongdun/id_mdf_tmp1/1

明明指定了–master,为什么还会报错A master URL must be set in your configuration. ES2HDFS中也遇到这个错误.

1
2
3
4
5
6
15/12/18 16:20:39 WARN scheduler.TaskSetManager: Lost task 20.0 in stage 0.0 (TID 30, spark047244): java.lang.ExceptionInInitializerError
at cn.fraudmetrix.hbase.HBaseRDD$$anonfun$1.apply(HBaseRDD.scala:85)
at cn.fraudmetrix.hbase.HBaseRDD$$anonfun$1.apply(HBaseRDD.scala:83)
Caused by: org.apache.spark.SparkException: A master URL must be set in your configuration
..
15/12/18 16:20:39 WARN scheduler.TaskSetManager: Lost task 10.0 in stage 0.0 (TID 11, spark047244): java.lang.NoClassDefFoundError: Could not initialize class cn.fraudmetrix.hbase.HBaseRDD$

这个错误和在本地运行时没有指定master的错误是一样的. 难道要在代码中写死远程Spark集群的master?

代码中手动指定master后,出现了一个更离奇的现象,虽然启动一个任务,但是后台突然间又会启动多个相同的任务.

master2

这个问题在ES2HDFS时也碰到过.而且同样会以admin用户开启了多个任务.

而且后台还报错:

1
2
3
4
5
6
7
8
9
10
15/12/18 17:21:49 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on spark047223:39784 (size: 2.7 KB, free: 1589.8 MB)
15/12/18 17:21:49 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on spark047245:48421 (size: 2.7 KB, free: 1589.8 MB)
15/12/18 17:21:50 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on spark047223:58559 (size: 18.4 KB, free: 1589.7 MB)
15/12/18 17:21:50 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on spark047223:39784 (size: 18.4 KB, free: 1589.7 MB)
15/12/18 17:21:51 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on spark047245:48421 (size: 18.4 KB, free: 1589.7 MB)
15/12/18 17:21:55 INFO scheduler.TaskSetManager: Starting task 39.0 in stage 0.0 (TID 40, spark047216, NODE_LOCAL, 7511 bytes)
15/12/18 17:21:55 INFO scheduler.TaskSetManager: Starting task 41.0 in stage 0.0 (TID 41, spark047216, NODE_LOCAL, 7511 bytes)
15/12/18 17:21:55 INFO scheduler.TaskSetManager: Finished task 23.0 in stage 0.0 (TID 21) in 20735 ms on spark047216 (1/209)
15/12/18 17:21:55 INFO scheduler.TaskSetManager: Starting task 48.0 in stage 0.0 (TID 42, spark047216, NODE_LOCAL, 7511 bytes)
15/12/18 17:21:55 WARN scheduler.TaskSetManager: Lost task 41.0 in stage 0.0 (TID 41, spark047216): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1

如果在代码中去掉指定master, 就只会有一个任务!

master1

sbt assembly fat jar

实际上可以使用maven工程的mvn assembly plugin同样可以打fat jar(而且也可以是scala代码,比如tongdun-app).

https://github.com/unicredit/hbase-rdd-examples是hbase-rdd的示例工程. 可以将业务代码写在这个工程里.
设置spark为provided, 其他依赖包都会被打进assembly中. 这样就避免了上面的传入很长一串hbase依赖包的问题.
设置hbase-rdd打进assembly,这样连hbase-rdd.jar都不需要了.直接一个包含了hbase和hbase-rdd的fat jar就可以.

1
2
3
4
5
6
7
8
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "1.5.0" % "provided",
"org.apache.spark" %% "spark-streaming" % "1.5.0" % "provided",
"org.apache.hbase" % "hbase-common" % "1.0.2",
"org.apache.hbase" % "hbase-client" % "1.0.2",
"org.apache.hbase" % "hbase-server" % "1.0.2",
"eu.unicredit" %% "hbase-rdd" % "0.7.0"
)

打包出错,两个jar包的配置文件有冲突:

1
2
3
4
5
➜  hbase-rdd-examples git:(master) ✗ sbt clean assembly
[error] (*:assembly) deduplicate: different file contents found in the following:
[error] /Users/zhengqh/.ivy2/cache/commons-logging/commons-logging/jars/commons-logging-1.2.jar:META-INF/maven/commons-logging/commons-logging/pom.properties
[error] /Users/zhengqh/.ivy2/cache/org.apache.htrace/htrace-core/jars/htrace-core-3.1.0-incubating.jar:META-INF/maven/commons-logging/commons-logging/pom.properties
[error] Total time: 295 s, completed 2015-12-18 20:43:29

http://blog.csdn.net/oopsoom/article/details/41318599
https://github.com/luohuazju/sillycat-graph/blob/master/project/Build.scala

参考了第二篇,添加了很复杂的mergeStrategy合并策略. 实际中冲突的会有pom.properties,pom.xml,servlet,jsp等.最后成功编译.

1
2
3
4
[info] SHA-1: d5f9eda520e927bd52d2f9d5f9b29d3e117a3f4f
[info] Packaging /Users/zhengqh/Github/_example/hbase-rdd-examples/target/scala-2.10/hbase-rdd-examples-assembly-0.7.0.jar ...
[info] Done packaging.
[success] Total time: 259 s, completed 2015-12-18 21:24:54

Spark手动操作HBase

在每个Partition中创建HBase连接,最后关闭HTable,看起来没有问题。但是注意:这里没有关闭Connection和Admin,后面会看到这是有问题的。

你可能会问什么不把HBaseConfiguration的创建放在外面,这是因为会存在序列化的异常,需要特殊处理才可以使用。

1
2
3
4
5
6
7
8
9
10
df.foreachPartition(partition=>{
val config = HBaseConfiguration.create()
if(!zk.equals("")) config.set("hbase.zookeeper.quorum", zk)
config.set("hbase.zookeeper.property.clientPort", zkPort)
val conn = ConnectionFactory.createConnection(config)
val admin = conn.getAdmin
val userTable = TableName.valueOf(tableName)
val myTable = conn.getTable(userTable)
myTable.close()
})

项目的基本流程是:SparkStreaming读取Kafka数据,写入HBase,HBase的逻辑如上。但是经过一段时间后,消息堆积报警,任务不消费,停在了某个时间段。

排查步骤:去mesos页面找到这个任务,这里我们的Kafka有3个分区,流程序设置了两个Executor,即两台机器如何分配3个分区的任务。
那么其中一台负责一个分区,另外一台负责两个分区。但并不是说Task只有两个。真正的Spark Task数量还是3个。
验证方式是df.foreachPartition在一次streaming batch中会输出三条信息(在一台机器上输出一条,另外一台机器会输出两条)

1

然后去对应的机器执行jstack查看能否发现可疑的线程

2

发现ZooKeeper的很多线程都在RUNNABLE状态下。正常来说,我们建立ZooKeeper的连接只有每个Partition才有的。
但是我们的Kafka只有3个分区,也就是说最多只会有3个ZK的连接(如果三个任务都分配到一台机器的话,实际上3个任务分两台机器执行,所以不会超过2个ZK连接)
那么是不是ZK的连接没有释放呢!!!

3

为了验证我们的结论,重启流程序,然后观察每次Batch执行完成后的ZK连接。下面的高亮点是重启的时刻,可以看到在这之前,流程序没有消费,停在了1:50分

6

下面以最近一次的输出为例,可以看到以前的ZK连接还在,并且多了两个连接

4

如果去另外一台机器上看,会发现多了一个连接

5

至此,可以验证SparkStreaming在每次Batch处理完成后,并没有释放掉ZK的连接。导致在运行一段时间后,
ZK的连接数会一直上升,比如到200~300的时候,程序就hang住不消费了。这是因为ZK的连接数太多了。

换成官方的hbase写,连接ZK交给内部处理,多次运行netstat,会发现连接ZK的端口号都没有变化。
下面的实验换了一个流处理程序,不过对我们的观察结果没有影响。

5

这里还会发现不管streaming的batch多少次,连接ZK的端口号一直没有变化,它没有释放的过程,
因为这个连接是全局的,在流处理程序开始的时候就创建了ZK连接,在这之后,所以的batch都使用同一个ZK连接。

总结下排查步骤:

  1. 根据任务找到执行的机器
  2. 找到进程执行jstack
  3. 观察jstack中的RUNNABLE线程
  4. 找出异常或有规律的线程,比如很多ZK都在运行
  5. 看代码中相关的处理逻辑,比如ZK创建和连接是怎么做的
  6. 如果是连接问题,利用工具netstat查看ZK的端口
  7. 重启程序,观察每次运行的结果,并找出本次结果与上一次结果的对比和差异
  8. 比如ZK连接没有释放的话,每次运行完之后,旧的连接还在
  9. 那么基本可以断定出来ZK连接没有释放造成ZK连接数最终耗尽,导致问题出现

修复:

关闭HBase连接仅仅是关闭HTable是不够的,还要关闭Connection和Admin。
这就好比数据库中关闭ResultSet是不够的,也要关闭Statement和Connection。


文章目录
  1. 1. HBase-RDD
    1. 1.1. IDE本地运行(√)
    2. 1.2. 远程Spark-Shell提交运行(√)
      1. 1.2.1. 解决依赖包冲突
      2. 1.2.2. spark-shell测试读写HBase集群
    3. 1.3. 远程Spark-Submit提交执行(×)
      1. 1.3.1. 无法连接到HBase的ZK集群
      2. 1.3.2. 运行任务
    4. 1.4. sbt assembly fat jar
  2. 2. Spark手动操作HBase