ETL Tools


Sqoop

Sqoop1

  1. Single table, all tables, free-form query condition, Direct mode
  2. Import from an RDBMS into HDFS, and export from HDFS back into an RDBMS (a hedged export/eval sketch follows this list)
  3. Incremental imports (incremental mode, check column, last value)
  4. Jobs, used to support incremental and scheduled (repeated) execution
  5. Evaluation (DDL/DML): Sqoop acting as nothing more than an RDBMS client
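For reference, a minimal sketch of the export and eval directions listed above, reusing the same local MySQL connection as the import example below; the table name emp_export, the HDFS export path, and the query are illustrative assumptions, not taken from the original post:

# export: push data already sitting on HDFS back into an RDBMS table (assumed table and path)
$ sqoop export \
--connect jdbc:mysql://localhost/userdb \
--username root \
--table emp_export \
--export-dir /user/hadoop/emp \
--num-mappers 1

# eval: run ad-hoc SQL against the RDBMS and print the result, i.e. Sqoop as a plain DB client
$ sqoop eval \
--connect jdbc:mysql://localhost/userdb \
--username root \
--query "SELECT COUNT(*) FROM emp"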

Incremental import example (how to identify new data):

$ sqoop import \
--connect jdbc:mysql://localhost/userdb \
--username root \
--table emp \
--m 1 \
--incremental append \
--check-column id \
--last-value 1205

If existing rows are never updated: use append.
If existing rows do get updated: use lastmodified (here check-column is a timestamp column that records when each row was last updated).
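A hedged sketch of the lastmodified mode, assuming a timestamp column named updated_at and an illustrative previous watermark (neither appears in the original post); --merge-key tells Sqoop how to fold the re-imported, updated rows into the data already on HDFS:

$ sqoop import \
--connect jdbc:mysql://localhost/userdb \
--username root \
--table emp \
--incremental lastmodified \
--check-column updated_at \
--last-value "2016-10-01 00:00:00" \
--merge-key id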

To run the incremental import periodically, the recommended approach is a Sqoop job (it automatically stores the last value between runs). Create the job, then execute it (a cron sketch for scheduling follows this example):

$ sqoop job --create myjob \
-- import \
--connect jdbc:mysql://localhost/db \
--username root \
--table employee --m 1

$ sqoop job --show myjob
Job: myjob
Tool: import Options:
----------------------------
direct.import = true
codegen.input.delimiters.record = 0
hdfs.append.dir = false
db.table = employee
...
incremental.last.value = 1206
...

$ sqoop job --exec myjob
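To actually run the job on a schedule, any external scheduler works; a minimal cron sketch, assuming sqoop is on the crontab PATH (the schedule and log path are illustrative):

# run the incremental job every day at 02:00; Sqoop saves the new last value after each run
0 2 * * * sqoop job --exec myjob >> /var/log/sqoop-myjob.log 2>&1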

Sqoop2

DataX

Quick-start example:

[qihuang.zheng@dp0653 datax]$ bin/datax.py job/job.json
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2016, Alibaba Group. All Rights Reserved.
2016-10-27 14:46:29.162 [main] INFO Engine -
{
"content":[
{
"reader":{
"name":"streamreader",
"parameter":{
"column":[
{
"type":"string",
"value":"DataX"
},
{
"type":"long",
"value":19890604
},
{
"type":"date",
"value":"1989-06-04 00:00:00"
},
{
"type":"bool",
"value":true
},
{
"type":"bytes",
"value":"test"
}
],
"sliceRecordCount":100000
}
},
"writer":{
"name":"streamwriter",
"parameter":{
"encoding":"UTF-8",
"print":false
}
}
}
],
"setting":{
"errorLimit":{
"percentage":0.02,
"record":0
},
"speed":{
"byte":10485760
}
}
}

2016-10-27 14:46:29.194 [main] WARN Engine - prioriy set to 0, because NumberFormatException, the value is: null
2016-10-27 14:46:29.196 [main] INFO PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2016-10-27 14:46:29.196 [main] INFO JobContainer - DataX jobContainer starts job.
2016-10-27 14:46:29.198 [main] INFO JobContainer - Set jobId = 0
2016-10-27 14:46:29.215 [job-0] INFO JobContainer - jobContainer starts to do prepare ...
2016-10-27 14:46:29.215 [job-0] INFO JobContainer - DataX Reader.Job [streamreader] do prepare work .
2016-10-27 14:46:29.216 [job-0] INFO JobContainer - DataX Writer.Job [streamwriter] do prepare work .
2016-10-27 14:46:29.216 [job-0] INFO JobContainer - jobContainer starts to do split ...
2016-10-27 14:46:29.217 [job-0] INFO JobContainer - Job set Max-Byte-Speed to 10485760 bytes.
2016-10-27 14:46:29.217 [job-0] INFO JobContainer - DataX Reader.Job [streamreader] splits to [1] tasks.
2016-10-27 14:46:29.218 [job-0] INFO JobContainer - DataX Writer.Job [streamwriter] splits to [1] tasks.
2016-10-27 14:46:29.238 [job-0] INFO JobContainer - jobContainer starts to do schedule ...
2016-10-27 14:46:29.242 [job-0] INFO JobContainer - Scheduler starts [1] taskGroups.
2016-10-27 14:46:29.244 [job-0] INFO JobContainer - Running by standalone Mode.
2016-10-27 14:46:29.252 [taskGroup-0] INFO TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
2016-10-27 14:46:29.256 [taskGroup-0] INFO Channel - Channel set byte_speed_limit to -1, No bps activated.
2016-10-27 14:46:29.256 [taskGroup-0] INFO Channel - Channel set record_speed_limit to -1, No tps activated.
2016-10-27 14:46:29.267 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started
2016-10-27 14:46:29.568 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[302]ms
2016-10-27 14:46:29.569 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] completed it's tasks.
2016-10-27 14:46:39.263 [job-0] INFO StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.016s | All Task WaitReaderTime 0.029s | Percentage 100.00%
2016-10-27 14:46:39.264 [job-0] INFO AbstractScheduler - Scheduler accomplished all tasks.
2016-10-27 14:46:39.265 [job-0] INFO JobContainer - DataX Writer.Job [streamwriter] do post work.
2016-10-27 14:46:39.265 [job-0] INFO JobContainer - DataX Reader.Job [streamreader] do post work.
2016-10-27 14:46:39.265 [job-0] INFO JobContainer - DataX jobId [0] completed successfully.
2016-10-27 14:46:39.267 [job-0] INFO HookInvoker - No hook invoked, because base dir not exists or is a file: /home/qihuang.zheng/datax/hook
2016-10-27 14:46:39.270 [job-0] INFO JobContainer -
2016-10-27 14:46:39.270 [job-0] INFO JobContainer - PerfTrace not enable!
2016-10-27 14:46:39.271 [job-0] INFO StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.016s | All Task WaitReaderTime 0.029s | Percentage 100.00%
2016-10-27 14:46:39.273 [job-0] INFO JobContainer -
任务启动时刻 : 2016-10-27 14:46:29
任务结束时刻 : 2016-10-27 14:46:39
任务总计耗时 : 10s
任务平均流量 : 253.91KB/s
记录写入速度 : 10000rec/s
读出记录总数 : 100000
读写失败总数 : 0

Experiment: MySQL -> HDFS

In the experiment below, DataX is deployed on the local machine, MySQL is also local, and HDFS is the test-environment cluster.

  1. View the generated template:
$ bin/datax.py -r mysqlreader -w hdfswriter
Please refer to the mysqlreader document: https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md
Please refer to the hdfswriter document: https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md
Please save the following configuration as a json file and use python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json to run the job.

{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [],
"connection": [
{
"jdbcUrl": [],
"table": []
}
],
"password": "",
"username": "",
"where": ""
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [],
"compress": "",
"defaultFS": "",
"fieldDelimiter": "",
"fileName": "",
"fileType": "",
"path": "",
"writeMode": ""
}
}
}
],
"setting": {
"speed": {
"channel": ""
}
}
}
}
  2. Prepare the MySQL table and the Hive table
#mysql
CREATE TABLE `task` (
`id` bigint(20) NOT NULL,
`name` varchar(50) NOT NULL,
`done` tinyint(1) DEFAULT NULL,
PRIMARY KEY (`id`)
);
# insert some sample MySQL data; note that the DataX job below reads table `a` (id, price), shown by the query output that follows, not the `task` table above
mysql> select * from a;
+------+-------+
| id | price |
+------+-------+
| 1 | 15 |
| 2 | 25 |
| 3 | 10 |
| 4 | 45 |
| 5 | 10 |
| 6 | 10 |
+------+-------+
6 rows in set (0.00 sec)

#hive
create EXTERNAL table text_table(
id INT,
price INT
)
row format delimited
fields terminated by "\t"
STORED AS TEXTFILE
location '/user/qihuang.zheng/text_table';
  3. A few notes about the test environment:

1) Fix permissions:

/usr/install/hadoop/bin/hadoop fs -mkdir /user/qihuang.zheng/text_table
/usr/install/hadoop/bin/hadoop fs -chmod 777 /user/qihuang.zheng/

2) Replace hdfs://tdfs with the actual host and port, because DataX runs on the local machine and cannot resolve tdfs (see the quick port check after these notes).

3) Every field in the generated template must be filled in; empty strings are not allowed, e.g. the MySQL username/password and the channel count.
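A quick sanity check that the local DataX host can actually reach the test cluster's NameNode; 192.168.6.52:9000 is the address used in the job config below, and nc is just one convenient way to probe the port:

$ nc -vz 192.168.6.52 9000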

  4. job/mysql2hdfs.json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["id","price"],
"connection": [
{
"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/test"],
"table": ["a"]
}
],
"password": "root",
"username": "root",
"where": ""
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "id",
"type": "INT"
},
{
"name": "price",
"type": "INT"
}
],
"compress": "GZIP",
"defaultFS": "hdfs://192.168.6.52:9000",
"fieldDelimiter": "\t",
"fileName": "text",
"fileType": "text",
"path": "/user/qihuang.zheng/text_table",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": "2"
}
}
}
}
  5. Run the job
➜  datax bin/datax.py job/mysql2hdfs.json
2016-10-27 16:26:07.454 [main] WARN Engine - prioriy set to 0, because NumberFormatException, the value is: null
2016-10-27 16:26:07.461 [main] INFO PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2016-10-27 16:26:07.462 [main] INFO JobContainer - DataX jobContainer starts job.
2016-10-27 16:26:07.465 [main] INFO JobContainer - Set jobId = 0
2016-10-27 16:26:08.274 [job-0] INFO OriginalConfPretreatmentUtil - Available jdbcUrl:jdbc:mysql://127.0.0.1:3306/test?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true.
2016-10-27 16:26:08.312 [job-0] INFO OriginalConfPretreatmentUtil - table:[a] has columns:[id,price].
十月 27, 2016 4:26:10 下午 org.apache.hadoop.util.NativeCodeLoader <clinit>
2016-10-27 16:26:12.665 [job-0] INFO JobContainer - jobContainer starts to do prepare ...
2016-10-27 16:26:12.666 [job-0] INFO JobContainer - DataX Reader.Job [mysqlreader] do prepare work .
2016-10-27 16:26:12.667 [job-0] INFO JobContainer - DataX Writer.Job [hdfswriter] do prepare work .
2016-10-27 16:26:13.008 [job-0] INFO HdfsWriter$Job - 由于您配置了writeMode append, 写入前不做清理工作, [/user/qihuang.zheng/text_table] 目录下写入相应文件名前缀 [text] 的文件
2016-10-27 16:26:13.008 [job-0] INFO JobContainer - jobContainer starts to do split ...
2016-10-27 16:26:13.009 [job-0] INFO JobContainer - Job set Channel-Number to 2 channels.
2016-10-27 16:26:13.021 [job-0] INFO JobContainer - DataX Reader.Job [mysqlreader] splits to [1] tasks.
2016-10-27 16:26:13.022 [job-0] INFO HdfsWriter$Job - begin do split...
2016-10-27 16:26:13.031 [job-0] INFO HdfsWriter$Job - splited write file name:[hdfs://192.168.6.52:9000/user/qihuang.zheng/text_table__8a0c1ab7_af4b_4228_8ff0_36e138f2aa31/text__ec1afe8c_5286_4372_afb8_91d99223c61d]
2016-10-27 16:26:13.031 [job-0] INFO HdfsWriter$Job - end do split.
2016-10-27 16:26:13.031 [job-0] INFO JobContainer - DataX Writer.Job [hdfswriter] splits to [1] tasks.
2016-10-27 16:26:13.070 [job-0] INFO JobContainer - jobContainer starts to do schedule ...
2016-10-27 16:26:13.081 [job-0] INFO JobContainer - Scheduler starts [1] taskGroups.
2016-10-27 16:26:13.093 [job-0] INFO JobContainer - Running by standalone Mode.
2016-10-27 16:26:13.133 [taskGroup-0] INFO TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
2016-10-27 16:26:13.157 [taskGroup-0] INFO Channel - Channel set byte_speed_limit to -1, No bps activated.
2016-10-27 16:26:13.157 [taskGroup-0] INFO Channel - Channel set record_speed_limit to -1, No tps activated.
2016-10-27 16:26:13.197 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started
2016-10-27 16:26:13.219 [0-0-0-reader] INFO CommonRdbmsReader$Task - Begin to read record by Sql: [select id,price from a
] jdbcUrl:[jdbc:mysql://127.0.0.1:3306/test?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true].
2016-10-27 16:26:13.304 [0-0-0-writer] INFO HdfsWriter$Task - begin do write...
2016-10-27 16:26:13.305 [0-0-0-writer] INFO HdfsWriter$Task - write to file : [hdfs://192.168.6.52:9000/user/qihuang.zheng/text_table__8a0c1ab7_af4b_4228_8ff0_36e138f2aa31/text__ec1afe8c_5286_4372_afb8_91d99223c61d]
2016-10-27 16:26:13.314 [0-0-0-reader] INFO CommonRdbmsReader$Task - Finished read record by Sql: [select id,price from a
] jdbcUrl:[jdbc:mysql://127.0.0.1:3306/test?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true].
2016-10-27 16:26:13.916 [0-0-0-writer] INFO HdfsWriter$Task - end do write
2016-10-27 16:26:14.013 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[823]ms
2016-10-27 16:26:14.014 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] completed it's tasks.
2016-10-27 16:26:23.185 [job-0] INFO StandAloneJobContainerCommunicator - Total 6 records, 18 bytes | Speed 1B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2016-10-27 16:26:23.185 [job-0] INFO AbstractScheduler - Scheduler accomplished all tasks.
2016-10-27 16:26:23.186 [job-0] INFO JobContainer - DataX Writer.Job [hdfswriter] do post work.
2016-10-27 16:26:23.187 [job-0] INFO HdfsWriter$Job - start rename file [hdfs://192.168.6.52:9000/user/qihuang.zheng/text_table__8a0c1ab7_af4b_4228_8ff0_36e138f2aa31/text__ec1afe8c_5286_4372_afb8_91d99223c61d.gz] to file [hdfs://192.168.6.52:9000/user/qihuang.zheng/text_table/text__ec1afe8c_5286_4372_afb8_91d99223c61d.gz].
2016-10-27 16:26:23.213 [job-0] INFO HdfsWriter$Job - finish rename file [hdfs://192.168.6.52:9000/user/qihuang.zheng/text_table__8a0c1ab7_af4b_4228_8ff0_36e138f2aa31/text__ec1afe8c_5286_4372_afb8_91d99223c61d.gz] to file [hdfs://192.168.6.52:9000/user/qihuang.zheng/text_table/text__ec1afe8c_5286_4372_afb8_91d99223c61d.gz].
2016-10-27 16:26:23.214 [job-0] INFO HdfsWriter$Job - start delete tmp dir [hdfs://192.168.6.52:9000/user/qihuang.zheng/text_table__8a0c1ab7_af4b_4228_8ff0_36e138f2aa31] .
2016-10-27 16:26:23.239 [job-0] INFO HdfsWriter$Job - finish delete tmp dir [hdfs://192.168.6.52:9000/user/qihuang.zheng/text_table__8a0c1ab7_af4b_4228_8ff0_36e138f2aa31] .
2016-10-27 16:26:23.240 [job-0] INFO JobContainer - DataX Reader.Job [mysqlreader] do post work.
2016-10-27 16:26:23.241 [job-0] INFO JobContainer - DataX jobId [0] completed successfully.
2016-10-27 16:26:23.244 [job-0] INFO HookInvoker - No hook invoked, because base dir not exists or is a file: /Users/zhengqh/Soft/datax/hook
2016-10-27 16:26:23.353 [job-0] INFO JobContainer -
2016-10-27 16:26:23.353 [job-0] INFO JobContainer - PerfTrace not enable!
2016-10-27 16:26:23.355 [job-0] INFO StandAloneJobContainerCommunicator - Total 6 records, 18 bytes | Speed 1B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2016-10-27 16:26:23.357 [job-0] INFO JobContainer -
任务启动时刻 : 2016-10-27 16:26:07
任务结束时刻 : 2016-10-27 16:26:23
任务总计耗时 : 15s
任务平均流量 : 1B/s
记录写入速度 : 0rec/s
读出记录总数 : 6
读写失败总数 : 0
  6. Verify the data

Query the Hive table

hive> select * from text_table;
OK
1 15
2 25
3 10
4 45
5 10
6 10
Time taken: 0.305 seconds, Fetched: 6 row(s)

Query HDFS

[qihuang.zheng@dp0653 ~]$ hadoop fs -ls /user/qihuang.zheng/text_table
Found 1 items
2016-10-27 16:26 /user/qihuang.zheng/text_table/text__ec1afe8c_5286_4372_afb8_91d99223c61d.gz
[qihuang.zheng@dp0653 ~]$ hadoop fs -text /user/qihuang.zheng/text_table/text*
1 15
2 25
3 10
4 45
5 10
6 10
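As an extra check, the row counts on both sides can be compared; a sketch that reuses the credentials and path from the job config above:

$ mysql -uroot -proot -e 'select count(*) from test.a'
$ hadoop fs -text /user/qihuang.zheng/text_table/text* | wc -l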
