HBase BulkLoad

HBase BulkLoad is split into two phases: a MapReduce job that generates HFiles, and loading those HFiles into the HBase cluster.
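
At a high level the flow is just the two commands below (a condensed sketch using the table name data.md5_id2 and the input/output paths that appear later in this post; the full scripts follow in the sections below):

# Phase 1: a MapReduce job (importtsv with -Dimporttsv.bulk.output) writes HFiles to HDFS
HADOOP_CLASSPATH=`hbase classpath` hadoop jar ${HBASE_HOME}/lib/hbase-server-1.0.2.jar importtsv \
  -Dimporttsv.columns=id:id,HBASE_ROW_KEY,id:c1,id:c2 -Dimporttsv.separator=, \
  -Dimporttsv.bulk.output=/user/tongdun/id_hbase1 data.md5_id2 /user/tongdun/id_mdf_tmp1

# Phase 2: completebulkload moves those HFiles into the running cluster (no MapReduce job)
HADOOP_CLASSPATH=`hbase classpath` hadoop jar ${HBASE_HOME}/lib/hbase-server-1.0.2.jar completebulkload \
  /user/tongdun/id_hbase1 data.md5_id2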


HBase & MapReduce

[qihuang.zheng@spark047213 ~]$ hadoop jar hbase-1.0.2/lib/hbase-server-1.0.2.jar
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/filter/Filter

When running a MapReduce job against HBase, the HBase dependencies must be added to HADOOP_CLASSPATH: http://hbase.apache.org/book.html#hbase.mapreduce.classpath

export HBASE_HOME=/home/qihuang.zheng/hbase-1.0.2
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` hadoop jar ${HBASE_HOME}/lib/hbase-server-1.0.2.jar rowcounter data.md5_id

[qihuang.zheng@spark047213 ~]$ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` hadoop jar ${HBASE_HOME}/lib/hbase-server-1.0.2.jar
An example program must be given as the first argument.
Valid program names are:
CellCounter: Count cells in HBase table
completebulkload: Complete a bulk data load.
copytable: Export a table from local cluster to peer cluster
export: Write table data to HDFS.
import: Import data written by Export.
importtsv: Import data in TSV format.
rowcounter: Count rows in HBase table
verifyrep: Compare the data from tables in two different clusters.

Generating HFiles with a MapReduce Job

You must specify exactly one column to be the row key, and you must specify a column name for every column that exists in the input data.
When importing a file on HDFS with importtsv, exactly one column has to be designated as the row key, and every column present in the input data has to be given a column name, so the last two columns get stored whether we want them or not.
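
As an illustration (the sample line below is made up, but its shape matches the get output shown later in this post), with -Dimporttsv.separator=, the columns spec is purely positional, one entry per input field:

# Hypothetical source line; field 2 (the md5) becomes the row key, fields 1/3/4 must all be named:
echo '110100195503206224,000045d573fb248697bb9c3f8536c1b5,00,00' > sample_line.csv
# Matching positional mapping:
#   -Dimporttsv.columns=id:id,HBASE_ROW_KEY,id:c1,id:c2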

In order to function efficiently, HFileOutputFormat2 must be configured such that each output HFile fits within a single region. In order to do this, jobs whose output will be bulk loaded into HBase use Hadoop’s TotalOrderPartitioner class to partition the map output into disjoint ranges of the key space, corresponding to the key ranges of the regions in the table.

Each output HFile must fall entirely within a single region, so that loading into the HBase cluster only requires moving the HFile.
If an HFile spanned more than one region, a split would still have to happen after the import,
whereas the goal is for the HFiles to be usable immediately after being moved, with no extra work. And since regions are sorted, the output has to be sorted as well.
Jobs whose output will be bulk loaded into HBase therefore use Hadoop's TotalOrderPartitioner to partition the map output into disjoint key ranges,
and those ranges correspond to the key ranges of the regions created when the table was set up.

Each HFile emitted by the MR job maps exactly onto the key range of one HBase region, so the files can simply be copied into place and used.
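
One way to see the key ranges the HFiles have to line up with is to look at the table's region boundaries (startKey/endKey) in hbase:meta. A hedged example, using the table name from the create statements below:

echo "scan 'hbase:meta', {STARTROW => 'data.md5_id2', LIMIT => 20, COLUMNS => 'info:regioninfo'}" | hbase shell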

Pre-split the table at creation time: http://grokbase.com/t/hbase/user/145bkvaq0x/how-to-pre-split-a-table-whose-row-key-is-md5-url

create 'data.md5_mob2', 'mob', {NUMREGIONS => 16, SPLITALGO => 'HexStringSplit'}
create 'data.md5_id2', 'id', {NUMREGIONS => 16, SPLITALGO => 'HexStringSplit'}
create 'data.md5_id2', 'id', {SPLITS_FILE => 'split32.txt'}
create 'data.md5_id4', 'id', {SPLITS_FILE => 'split32.txt', REGION_REPLICATION => 1, CONFIGURATION => {'hbase.regionserver.region.split.policy' => 'KeyPrefixRegionSplitPolicy', 'prefix_split_key_policy.prefix_length' => '3'}}

If the split were done on the first 3 hex characters (0-f), the number of regions would be 16^3 = 4096, which caused problems at table creation, so it was changed to 16*16*8 = 2048 regions (in practice there ends up being one extra region).
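
The contents of split32.txt are not shown here, but a splits file for the hbase shell is just one split key per line, so a layout like the 16*16*8 = 2048 regions described above could be produced with a sketch along these lines (hypothetical helper, writing a differently named file):

#!/bin/sh
# Hypothetical generator: 2047 three-hex-char split keys (002, 004, ..., ffe),
# i.e. the last hex digit only takes even values -> 16*16*8 = 2048 regions.
> splits_2048.txt
i=1
while [ $i -lt 2048 ]; do
  printf '%03x\n' $((i * 2)) >> splits_2048.txt
  i=$((i + 1))
done

The resulting file would then be passed with SPLITS_FILE => 'splits_2048.txt' at create time, the same way split32.txt is used above.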

sh hfile.sh id_mdf_tmp1 id_hbase1

#!/bin/sh
#Usage: importtsv -Dimporttsv.columns=a,b,c <tablename> <inputdir>
input=$1
output=$2
HADOOP_CLASSPATH=`hbase classpath` hadoop jar ${HBASE_HOME}/lib/hbase-server-1.0.2.jar importtsv \
-Dimporttsv.columns=id:id,HBASE_ROW_KEY,id:c1,id:c2 \
-Dimporttsv.separator=, \
-Dimporttsv.bulk.output=/user/tongdun/$output \
-Dcreate.table=no \
-Dno.strict=true \
-Dmapreduce.map.speculative=false \
-Dmapreduce.reduce.speculative=false \
data.md5_id2 /user/tongdun/$input

If the data were pure key-value pairs, the column name could be omitted and only a column family used. But because the source data contains four fields, every one of them must map to a column in the table.
If importtsv.bulk.output is not specified, the data is written straight into the HBase cluster; if it is specified, HFiles are first generated on HDFS and nothing is written into HBase at this stage.
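
In other words, the two modes differ only in that one flag (a condensed sketch; the full scripts appear elsewhere in this post):

# With -Dimporttsv.bulk.output: HFiles land in HDFS, nothing is written to the table yet,
# and a completebulkload step must follow.
HADOOP_CLASSPATH=`hbase classpath` hadoop jar ${HBASE_HOME}/lib/hbase-server-1.0.2.jar importtsv \
  -Dimporttsv.columns=id:id,HBASE_ROW_KEY,id:c1,id:c2 -Dimporttsv.separator=, \
  -Dimporttsv.bulk.output=/user/tongdun/$output data.md5_id2 /user/tongdun/$input

# Without it: importtsv writes Puts directly into data.md5_id2, so there is no second step.
HADOOP_CLASSPATH=`hbase classpath` hadoop jar ${HBASE_HOME}/lib/hbase-server-1.0.2.jar importtsv \
  -Dimporttsv.columns=id:id,HBASE_ROW_KEY,id:c1,id:c2 -Dimporttsv.separator=, \
  data.md5_id2 /user/tongdun/$input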

MapReduce Task Running Out of Memory

The resources available for MapReduce are limited, so reading files that are too large exhausts memory. Even a single 26G file ran the container out of memory, never mind the files of 100+ GB.

Container [pid=48778,containerID=container_1449135806348_0034_01_000162] is running beyond physical memory limits. Current usage: 4.0 GB of 4 GB physical memory used; 4.7 GB of 8.4 GB virtual memory used. Killing container.
Dump of the process-tree for container_1449135806348_0034_01_000162 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 48795 48778 48778 48778 (java) 3961 5369 5007175680 1054447 /usr/install/java/bin/java -Djava.net.preferIPv4Stack=true -Dhadoop.metrics.log.level=WARN -Xmx4096m -Djava.io.tmpdir=/home/admin/hadoop/data/yarn-tmp/nmdir/usercache/qihuang.zheng/appcache/application_1449135806348_0034/container_1449135806348_0034_01_000162/tmp -Dlog4j.configuration=container-log4j.properties -Dyarn.app.container.log.dir=/home/admin/output/hadoop/logs/application_1449135806348_0034/container_1449135806348_0034_01_000162 -Dyarn.app.container.log.filesize=0 -Dhadoop.root.logger=INFO,CLA org.apache.hadoop.mapred.YarnChild 192.168.47.242 40792 attempt_1449135806348_0034_r_000015_0 162
|- 48778 11861 48778 48778 (bash) 0 1 9424896 308 /bin/bash -c /usr/install/java/bin/java -Djava.net.preferIPv4Stack=true -Dhadoop.metrics.log.level=WARN -Xmx4096m -Djava.io.tmpdir=/home/admin/hadoop/data/yarn-tmp/nmdir/usercache/qihuang.zheng/appcache/application_1449135806348_0034/container_1449135806348_0034_01_000162/tmp -Dlog4j.configuration=container-log4j.properties -Dyarn.app.container.log.dir=/home/admin/output/hadoop/logs/application_1449135806348_0034/container_1449135806348_0034_01_000162 -Dyarn.app.container.log.filesize=0 -Dhadoop.root.logger=INFO,CLA org.apache.hadoop.mapred.YarnChild 192.168.47.242 40792 attempt_1449135806348_0034_r_000015_0 162 1>/home/admin/output/hadoop/logs/application_1449135806348_0034/container_1449135806348_0034_01_000162/stdout 2>/home/admin/output/hadoop/logs/application_1449135806348_0034/container_1449135806348_0034_01_000162/stderr

Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143

Reference: http://stackoverflow.com/questions/21005643/container-is-running-beyond-memory-limits

YARN's ApplicationMaster also occupies one container on a NodeManager. Right after the AM starts, before any MR tasks have been launched:

[figure: mr_am]

Add parameters to raise the map/reduce memory to 5G:

-Dmapreduce.map.memory.mb=5120 -Dmapreduce.map.java.opts=-Xmx4096m \
-Dmapreduce.reduce.memory.mb=5120 -Dmapreduce.reduce.java.opts=-Xmx4096m \

[figure: mr-5g]

Tuning MapReduce Task Memory

Each node has only 24G of usable memory, and there are 9 NodeManagers in total. Dividing 24G into 5G containers means a node can run at most 4 containers, leaving 4G over, and that leftover 4G can be handed to the reduces.

The 26G file produces 209 map tasks and 16 reduces. Number of maps = 26*1024/128 (the block size) ≈ 209.
The number of reduces is fixed by the number of regions in the HBase table, because it was created with NUMREGIONS => 16, SPLITALGO => 'HexStringSplit'.

So with 9 nodes and 16 reduce tasks, each node runs about 2 reduces, and the leftover 4G can give each reduce an extra 2G.
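
The container math from the two paragraphs above, spelled out (all sizes in GB):

echo $(( 24 / 5 ))            # 4 map containers of 5G fit on a 24G node
echo $(( 24 - 4 * 5 ))        # 4G left over per node
echo $(( (16 + 9 - 1) / 9 ))  # ~2 reduce tasks per node (16 reduces spread over 9 nodes)
echo $(( 5 + 4 / 2 ))         # 7G per reduce container, which is where the 7189 MB below comes from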

-Dmapreduce.map.memory.mb=5120 -Dmapreduce.map.java.opts=-Xmx4096m \
-Dmapreduce.reduce.memory.mb=7189 -Dmapreduce.reduce.java.opts=-Xmx6144m \

For the 127G file, MapTasks = 127*1024/128 = 127*8 = 1016.

[figure: mr_127g]

Before any reduce task has started, each map takes at most 5G, so every node still has 4G left.
Once a reduce task starts it takes 7G. A node initially runs 4 maps; when one map finishes it frees 5G, leaving 9G.
After one reduce is launched the remainder should be 9-7 = 2G, yet the figure below shows only 1G left. What is taking the extra 1G?

[figure: mr_reduce]

Map tasks split the input file into 128M chunks, while the number of reduces is fixed at 16, so with very large input files each reduce has to handle a lot of data. Will the memory hold up?

The Generated HFiles

The original file is 27G. With 16 reduces the job produced 16 files of about 5.5G each, and you can see that the file name already effectively identifies the region.
Generating HFiles for the 26G file took 20 minutes, so 6T would take roughly 6*1024*20/26/60 ≈ 3 days, which still looks much faster than processing with scripts.

[qihuang.zheng@spark047213 ~]$ hadoop fs -du -h /user/tongdun/id_hbase1_1/id
5.5 G /user/tongdun/id_hbase1_1/id/00acc7bdb9444ede81bd938c772e420f
5.5 G /user/tongdun/id_hbase1_1/id/0671fcdbd0884eecaed2876060a203fe
5.5 G /user/tongdun/id_hbase1_1/id/069cafcd20d947ec80fc2063415886ca
5.5 G /user/tongdun/id_hbase1_1/id/12144090330941c3a6236fa899c3b8f5
5.5 G /user/tongdun/id_hbase1_1/id/3870800164104b92a64db7f80757d750
5.5 G /user/tongdun/id_hbase1_1/id/41d52b83667a49f8ba4ea68336c2c66c
5.5 G /user/tongdun/id_hbase1_1/id/4de99c7e03804ed085bf63c1c828d700
5.5 G /user/tongdun/id_hbase1_1/id/541c7dfa1fa74b139d93a5ef10049934
5.5 G /user/tongdun/id_hbase1_1/id/5a6819f2d0864cde8450c1d5676bab07
5.5 G /user/tongdun/id_hbase1_1/id/5f8fe4436d6443e18c2e120d2f5d07f0
5.5 G /user/tongdun/id_hbase1_1/id/77f018a00d3543abb54de65bba26419d
5.5 G /user/tongdun/id_hbase1_1/id/b35a2c68790c4b31bf0b59511b4baf7b
5.5 G /user/tongdun/id_hbase1_1/id/bab7d211bf614bc48d9661df9ba68d81
5.5 G /user/tongdun/id_hbase1_1/id/bd1cc8041aaf47ec980d7855d42f34c6
5.5 G /user/tongdun/id_hbase1_1/id/cbd41ef63b984d45ba49be5c3f0e5466
5.5 G /user/tongdun/id_hbase1_1/id/f01fbad0952b496bab2e4ab199c196cd

Question: why isn't there exactly one file for each leading hex character 0-f? Above, three files start with 0 and none start with 2. Do the files really line up with the key ranges?
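
One way to check is to print each HFile's metadata, which includes its first and last key, and compare those against the region boundaries. A hedged example using HBase's HFile pretty-printer, invoked through its class name (the path may need the full hdfs:// URI depending on configuration):

# Print the meta block (firstKey/lastKey among other fields) of one generated HFile
hbase org.apache.hadoop.hbase.io.hfile.HFile -m -f /user/tongdun/id_hbase1_1/id/00acc7bdb9444ede81bd938c772e420f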

Loading into the HBase Cluster and Verifying the Data

completebulkload tool is used to import the data into the running cluster: iterates through the prepared data files, and for each one determines the region the file belongs to. It then contacts the appropriate RegionServer which adopts the HFile, moving it into its storage directory and making the data available to clients.
The tool iterates over the HFiles, works out which region each file belongs to, and contacts the corresponding RegionServer, which adopts the HFile by moving it into its storage directory; clients can then query the data.

A. The generated HFiles can be loaded into the HBase cluster with HBase's incremental load tool: hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles $output data.md5_id2
B. Or keep using the hadoop command. Unlike the HFile-generation step, this is essentially an HDFS move and does not launch a MapReduce job: HADOOP_CLASSPATH=`hbase classpath` hadoop jar ${HBASE_HOME}/lib/hbase-server-1.0.2.jar completebulkload $output data.md5_id2

HADOOP_CLASSPATH=`hbase classpath` hadoop jar ${HBASE_HOME}/lib/hbase-server-1.0.2.jar completebulkload /user/tongdun/id_hbase/id_hbase1_1 data.md5_id2

hbase(main):001:0> list
TABLE
data.md5_id
data.md5_id2
2 row(s) in 0.9380 seconds

=> ["data.md5_id", "data.md5_id2"]
hbase(main):002:0> get "data.md5_id2","000045d573fb248697bb9c3f8536c1b5"
COLUMN CELL
id:c1 timestamp=1450359965709, value=00
id:c2 timestamp=1450359965709, value=00
id:id timestamp=1450359965709, value=110100195503206224
3 row(s) in 1.0650 seconds

hbase(main):004:0> get "data.md5_id2","ddd5df06f27f9e127eb5816b338970dd"
COLUMN CELL
id:id timestamp=1450359965709, value=110100195512089920
3 row(s) in 0.1520 seconds

Because the load moves files, /user/tongdun/id_hbase/id_hbase1_1 is moved into HBase's storage path on HDFS, though the file names apparently do not stay the same:

[figure: hbase-path]
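
The same thing can be checked from the command line: after the load the HFiles should sit under the table's region and column-family directories, even if the names have changed (a hedged check using the /hbase/data/default root that also appears in the setrep section below):

hadoop fs -ls /hbase/data/default/data.md5_id2
hadoop fs -du -h /hbase/data/default/data.md5_id2/*/id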

Because the directory handed to completebulkload must have the column family (id) as its immediate subdirectory, the batch import script is:

#!/bin/sh
for f in `hadoop fs -ls /user/tongdun/id_hbase | awk '{print $8}'`
do
HADOOP_CLASSPATH=`hbase classpath` hadoop jar ${HBASE_HOME}/lib/hbase-server-1.0.2.jar completebulkload $f data.md5_id2
done

Workflow for Adding New (Mobile Number) Data

1. Upload the file to HDFS: hadoop fs -put mobile_173.txt /user/tongdun/mob_173
2. Generate HFiles and load them into HBase: sh hfile2.sh mob_173 mob_173_out

input=$1
output=$2

HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` hadoop jar ${HBASE_HOME}/lib/hbase-server-1.0.2.jar importtsv \
-Dimporttsv.columns=mob:mob,HBASE_ROW_KEY \
-Dimporttsv.separator=, \
-Dimporttsv.bulk.output=/user/tongdun/$output \
-Dcreate.table=no \
-Dno.strict=true \
-Dmapreduce.map.speculative=false \
-Dmapreduce.reduce.speculative=false \
-Dmapreduce.map.memory.mb=5120 -Dmapreduce.map.java.opts=-Xmx4096m \
-Dmapreduce.reduce.memory.mb=7189 -Dmapreduce.reduce.java.opts=-Xmx6144m \
data.md5_mob2 /user/tongdun/$input

HADOOP_CLASSPATH=`hbase classpath` hadoop jar ${HBASE_HOME}/lib/hbase-server-1.0.2.jar completebulkload /user/tongdun/$output data.md5_mob2

Note: this depends on the format of the source file. mobile_173.txt, for example, has only two columns: the mobile number and its MD5.
For mobile numbers, the table was created with column family mob, and the column name is also mob.

The first command generates files under the output directory, but the second command moves them into HBase, so the output directory ends up holding no files.
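
A quick way to confirm this (assuming the mob_173_out output path from step 2 above) is to list the column-family subdirectory after the load; it should come back empty, because the HFiles were moved rather than copied:

hadoop fs -ls /user/tongdun/mob_173_out/mob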

Alternatively, import the text file into HBase directly: nohup sh hfile3.sh &

HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` hadoop jar ${HBASE_HOME}/lib/hbase-server-1.0.2.jar importtsv \
-Dimporttsv.columns=mob:mob,HBASE_ROW_KEY \
-Dimporttsv.separator=, \
-Dcreate.table=no \
-Dno.strict=true \
-Dmapreduce.map.speculative=false \
-Dmapreduce.reduce.speculative=false \
-Dmapreduce.map.memory.mb=5120 -Dmapreduce.map.java.opts=-Xmx4096m \
-Dmapreduce.reduce.memory.mb=7189 -Dmapreduce.reduce.java.opts=-Xmx6144m \
data.md5_mob2 /user/tongdun/mob_173/mobile_173.txt

Check on the MR job: yarn application -list

Verifying the New Data

hdfs:

[qihuang.zheng@spark047213 ~]$ hadoop fs -cat /user/tongdun/mob_173/*.txt | head
17300000000,04b8c36de88db9ad1aa488b3beb960ea
17300000001,0d0a267f885048645083110b9dcfc770
17300000002,5a0c8c8e5682476d2e5a6446995f116a

hbase:

hbase(main):006:0> get 'data.md5_mob2','04b8c36de88db9ad1aa488b3beb960ea'
COLUMN CELL
mob:mob timestamp=1454381066042, value=17300000000
1 row(s) in 0.0280 seconds

Changing the Replication Factor

Input     Output    Time     Multiplier (output/input)
26G       87.3G     20min    3.3
806.5G    2.6T      10h      3.3
6T        18T       -        3

The data volume is fairly large, so the replication factor of these data sets is set to 1 to reduce the space they occupy.

To set replication of an individual file to 4: hadoop fs -setrep -w 4 /path/to/file
You can also do this recursively. To change replication of entire HDFS to 1: hadoop fs -setrep -R -w 1 /

nohup hadoop fs -setrep -R -w 1 /hbase/data/default &
nohup hadoop fs -setrep -R -w 1 /user/tongdun/id_mdf* &
nohup hadoop fs -setrep -R -w 1 /user/tongdun/id_hbase* &
nohup hadoop fs -setrep -R -w 1 /user/tongdun/mob_mdf_tmp* &
nohup hadoop fs -setrep -R -w 1 /user/tongdun/mob_hbase &

The -R option means recursive, so the last argument can be a directory.
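
To confirm that setrep took effect, note that the second column of hadoop fs -ls output is the replication factor, so spot-checking a few of the affected files is enough:

# Replication factor is the second column of the -ls output (should read 1 after setrep)
hadoop fs -ls /hbase/data/default/data.md5_id2/*/id | head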

Ref

