译:使用Kafka Connect构建一个可扩展的ETL管道

How to Build a Scalable ETL Pipeline with Kafka Connect
http://www.confluent.io/blog/how-to-build-a-scalable-etl-pipeline-with-kafka-connect

Apache Kafka是一个高性能的分布式消息系统,现在已经被很多公司用来管理他们的实时数据.
他们使用Kafka用在很多地方,实时流处理,数据同步,消息处理等等.其中最重要的应用之一是ETL管道.
Kafka是构建数据流管道的理想工具,它是可靠的,可扩展的,并且是高效的.

直到最近,使用kafka构建管道都需要一些额外的工作,每个你想要连接到kafka的系统要么需要自定义的代码
或者针对不同的接入系统分别使用不同的工具,每种新的工具都使用不同的配置,比如不同的数据格式,还要用
不同的方式来管理和监控.用这些大杂烩构建出来的管道是很脆弱的,并且很难进行统一管理.

Kafka Connect被设计用来简化构建大规模,实时数据管道的构建.它标准化了数据和kafka之间输入输入的方式.
你可以使用kafka connectors从外部系统中读取数据或者写入外部系统,管理数据流,扩展系统,这一切都不需要写新的代码.
Kafka Connect管理了和其他系统连接的所有共同问题(扩展性,故障容忍,配置,管理),这样每个connector就可以专注于
怎么以最好的方式在目标系统和kafka之间拷贝数据. Kafka Connect可以获取整个数据库(的数据);
或者从你所有的应用服务器收集指标到kafka的topics中,使得数据可以被用于低延迟的流处理系统;
一个导出的connector会将数据从kafka的topics中发送到二级索引(ElasticSearch)或者批处理系统(Hadoop)用于离线分析.

Confluent平台(译注:开发kafka的背后公司)现在已经整合了Kafka Connect,包含了三种connectors:
移动文件的connector, 针对SQL数据库的JDBC connector, 针对Hadoop(包括Hive)的HDFS connector.
JDBC和HDFS connector都提供了有用的功能让你能够很容易地构建ETL管道.

JDBC connector允许你使用JDBC Driver就可以从任何的关系型数据库中导出数据到kafka.
通过使用JDBC,这个connector可以支持很多类型的数据库,而且不需要为每一种数据库都编写自定义的代码.

数据被加载的方式是定时地执行一个SQL查询,并在结果集中为每一行创建一个输出记录.
默认情况下,数据库中所有的表都会被拷贝,每张表都对应自己的输出topic,这样很容易获取整个数据库的数据到kafka中.
数据库会监控新创建和删除的表,并且自动地调整. 当从一张表中拷贝数据时,connector可以只加载新建或修改的行.
你只需要指定哪些列需要被用来检测改变(即发生改变的时候才拷贝数据到kafka中).

HDFS connector允许你将kafka topics的数据导入到HDFS文件中,文件格式可以有很多种,
并且可以和Hive集成, 数据注入到HDFS后,就可以使用HiveQL立即查询到.

HDFS connector持续地消费kafka的数据然后写到HDFS中. 每个Kafka topic的数据可以用不同的方式分区并且切分成块.
每一块数据代表了一个HDFS文件,对应了topic, kafka partition, 数据块在文件名中的开始和结束offsets.
如果没有指定分区方式,默认的分区器只会用kafka topic和partition简单地组织数据.
每个数据块的大小可以通过记录数量,写文件耗费的时间,schema的适配性等因素来决定.

HDFS connector可以选择和Hive集成,一旦启用这个功能,connector会为每个kafka topic
自动地创建一张Hive的外部分区表,并且会根据HDFS中可用的数据更新表.

这篇文章中我们模拟了怎么使用Kafka Connect,并结合使用JDBC和HDFS connectors来构建一个可扩展的数据管道.
我们还会模拟JDBC和HDFS connectors的一些有用的功能,比如数据库变更的捕获,数据结构的迁移,自定义的分区.

在30分钟内构建一个可扩展的ETL管道

为了模拟Kafka Connect,我们构建了一个简单的数据管道,将一些常用的系统都连接在一起:
MySQL→Kafka→HDFS→Hive.这个管道会捕获数据库的数据变更,加载变更历史到数据仓库,即Hive.

在MySQL数据库中,有一张users表,保存了用户概要信息的当前状态,以及唯一的id列和modified列表示最近变更的时间撮.
我们通过更新users表中对应的条目来模拟用户概要的变更. JDBC connector会自动捕获这些用户信息的变更事件,
每一条变更记录都会作为一个事件写入到kafka中,然后HDFS connector会将这些事件写入到HDFS中.
最终存在于HDFS中的数据会是用户概要信息的编辑历史,你可以使用Hive或者Spark来分析这些数据.

为了你能够顺序启动所有需要的环境来完成这个例子,我们创建了一个虚拟机,以及源码仓库.包括了Confluent平台,Hadoop,Hive.
你可以使用预先构建好的虚拟机或者Vagrant来运行这个示例.

预编译虚拟机

在Virtualbox中,使用File->Import Appliance来导入预先构建好的虚拟机环境.
一旦虚拟机启动后,使用vagrant用户名和密码登陆到虚拟机中.

Vagrant

使用Vagrant可以简化虚拟机环境的搭建(因为Vagrant帮你把一些必要的环境都装好了)

1
2
3
4
$ git clone https://github.com/confluentinc/kafka-connect-blog
$ cd kafka-connect-blog
$ vagrant up
$ vagrant ssh

环境设置

在虚拟机中运行./setup.sh,会完成下面的操作:

1.下载和安装MySQL服务器,并启动MySQL服务器
2.下载MySQL的JDBC驱动. Hive的Metastore会使用它,Kafka的JDBC connector也会使用它
3.配置Hive的Metastore使用MySQL数据库

然后启动必要的服务./start.sh,完成下面的操作:

1.启动Kafka Connect的必要服务.因为Kafka Connect使用Kafka来传输数据,所以需要启动Kafka.
由于写入到HDFS的数据格式是Avro,所以需要运行Schema Registry来为数据保存成Avro的schemas.
2.启动Hadoop,我们有一个单节点的HDFS集群: hdfs://localhost:9000
3.启动Hive的metastore. 使用MySQL作为Hive的元数据存储, thrift://localhost:9083

数据准备

1
2
3
4
5
6
7
8
9
10
11
12
13
14
$ mysql -u root --password="mypassword"
mysql> CREATE DATABASE demo;
mysql> USE demo;
mysql> CREATE TABLE users (
-> id serial NOT NULL PRIMARY KEY,
-> name varchar(100),
-> email varchar(200),
-> department varchar(200),
-> modified timestamp default CURRENT_TIMESTAMP NOT NULL,
-> INDEX `modified_index` (`modified`)
-> );
mysql> INSERT INTO users (name, email, department) VALUES ('alice', 'alice@abc.com', 'engineering');
mysql> INSERT INTO users (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
mysql> exit;

使用kafka connect注入数据

现在我们可以启动Kafka Connect将数据从MySQL注入到Kafka中,然后再到HDFS. 注意MySQL中的数据会保存在Kafka中
直到被注入到HDFS中,因此可以导入到其他的系统比如Cassandra,ElasticSearch.

1
2
$ connect-standalone /mnt/etc/connect-avro-standalone.properties \
/mnt/etc/mysql.properties /mnt/etc/hdfs.properties &

上面的命令启动了带有两个connectors的kafka connect,一个是JDBC connect用来导入MySQL数据到kafka中,
一个是HDFS connector用来从kafka中导出数据到HDFS. JDBC connector创建了一个test_jdbc_users topic,
HDFS connector从test_jdbc_users中读取数据,写入到/topics/test_jdbc_users,并且创建了一张外部的Hive表.

一旦connector完成数据注入到HDFS, 使用Hive检查数据存在于HDFS中:

1
2
3
4
5
6
7
8
$ hive
hive> SHOW TABLES;
OK
test_jdbc_users
hive> SELECT * FROM test_jdbc_users;
OK
1 alice alice@abc.com engineering 1450305345000
2 bob bob@abc.com sales 1450305346000

上面的示例中,我们使用了FieldPartitioner来配置HDFS connector.相关的配置项如下:

partitioner.class: 指定写入数据到HDFS的分区器.在本例中,使用FieldPartitioner,
将数据根据partition.field.name的值分区到不同的目录中
partition.field.name: 指定分区字段

在本例中,我们使用department作为分区字段,检查HDFS的文件真的是以department分区的:

1
2
3
4
$ hadoop fs -ls /topics/test_jdbc_users
Found 2 items
drwxr-xr-x - vagrant supergroup 0 2015-12-16 22:45 /topics/test_jdbc_users/department=engineering
drwxr-xr-x - vagrant supergroup 0 2015-12-16 22:45 /topics/test_jdbc_users/department=sales

在/topics/test_jdbc_users目录下,有两个目录,每个目录的名字格式都是department=value. value部分根据users表中的department列决定.

捕获数据库更新

下面我们模拟JDBC connector怎么捕获数据库的更新. 配置信息如下:

mode: 指定了怎么捕获数据库的变更.在本例中,我们结合了增量的列和时间撮列.这是最健壮和最精确的模式. 结合这两者,
只要时间撮是足够细粒度的,每个(id,timestamp)元组都能唯一表示一行记录的更新.即使一个更新在局部完成后失败了,
未处理的更新仍然会被准确地检测到,当系统恢复的时候,也会被发送出去.
incrementing.column.name: 指定增量的列,这里用id列
timestamp.column.name: 指定时间撮列,被JDBC connect用来捕获表中已经存在的行的变更

1
2
mysql>UPDATE users SET email = 'alice@def.com', modified = CURRENT_TIMESTAMP WHERE name='alice';
mysql>UPDATE users SET email = 'bob@ghi.com', modified = CURRENT_TIMESTAMP WHERE name='bob';

JDBC connector会检测到users表中的数据发生了变化,因为modified列有更新的值. 所以它会从数据库中抓取修改的数据,
拷贝到Kafka,最终进入到HDFS. 为了验证修改后的数据存在HDFS中:

1
2
3
4
5
6
hive> SELECT * FROM test_jdbc_users;
OK
1 alice alice@abc.com engineering 1450305345000
1 alice alice@def.com engineering 1450306396000
2 bob bob@abc.com sales 1450305346000
2 bob bob@ghi.com sales 1450306397000

注意hive外部表中新增了两条记录.新的两条记录对应了mysql中users标的最新的内容.

现在我们已经模拟了一个简单的从数据库到数据仓库的ETL数据管道. Hive中的数据是用户概要信息的全部编辑历史数据.

表结构迁移

最后我们来模拟下HDFS connector如何处理表结构的迁移. connector支持表结构的进化以及数据的表结构变化的响应.
schema.compatibility可以设置为NONE,BACKWARD,FORWARD,FULL. 这里我们介绍最常用的BACKWARD.

如果表结构以backward的兼容性演化,我们可以总是使用最近的schema来统一查询数据. 比如移除字段是向后兼容的改变,
因为当我们遇到使用旧的schema的记录,对于包含删除的字段,我们可以直接忽略这些字段. 添加具有默认值的字段也是向后兼容.


文章目录
  1. 1. 在30分钟内构建一个可扩展的ETL管道
    1. 1.1. 预编译虚拟机
    2. 1.2. Vagrant
    3. 1.3. 环境设置
    4. 1.4. 数据准备
    5. 1.5. 使用kafka connect注入数据
    6. 1.6. 捕获数据库更新
    7. 1.7. 表结构迁移