Druid: Getting Started with Distributed OLAP

Druid is a fast column-oriented distributed data store. http://druid.io/

References

http://druid.io/docs/latest/tutorials/tutorial-a-first-look-at-druid.html
An introduction to Druid.io, a real-time OLAP analytics storage system: http://lxw1234.com/archives/2015/11/563.htm
http://www.slideshare.net/RostislavPashuto/aggregated-queries-with-druid-on-terrabytes-and-petabytes-of-data

Data(meta)

The data source is Wikipedia's edit log. Whenever an edit happens, an event is pushed to an IRC channel. Our example reads that IRC channel in real time and writes the events into a Druid realtime node.

Each event has a timestamp indicating the time of the edit (in UTC time), a list of dimensions indicating various metadata about the event (such as information about the user editing the page and whether the user is a bot), and a list of metrics associated with the event (such as the number of characters added and deleted).

An event is therefore defined by: a timestamp, the event's metadata (dimensions), and the metrics associated with the event.
Dimensions (things to filter on)
Metrics (things to aggregate over)
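A sketch of a single event under this schema. The field names are the wikipedia dimensions and metrics used throughout this walkthrough (they also show up in the index-task log later in this article); the values are made up for illustration:

{
  "timestamp": "2013-08-31T01:02:33Z",
  "page": "Gypsy_Danger",
  "language": "en",
  "user": "nuclear",
  "robot": "false",
  "country": "United States",
  "added": 57,
  "deleted": 200,
  "delta": -143
}

Here timestamp is the time column; page, language, user, robot and country are dimensions; added, deleted and delta are metrics.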

Hello Druid

1. Start ZooKeeper: zkServer.sh start

2. Start the example server:

➜  druid-0.8.2  ./run_example_server.sh
This will run a stand-alone version of Druid
+ java -Xmx512m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Ddruid.realtime.specFile=/Users/zhengqh/Soft/druid-0.8.2/examples/wikipedia/wikipedia_realtime.spec
-Ddruid.extensions.localRepository=./extensions-repo '-Ddruid.extensions.remoteRepositories=[]' -Ddruid.publish.type=noop
-classpath '/Users/zhengqh/Soft/druid-0.8.2/../config/realtime:/Users/zhengqh/Soft/druid-0.8.2/examples/wikipedia:/Users/zhengqh/Soft/druid-0.8.2/config/_common:/Users/zhengqh/Soft/druid-0.8.2/config/realtime:/Users/zhengqh/Soft/druid-0.8.2/lib/*'
io.druid.cli.Main example realtime ⬅️ starts a realtime node

3. Start the example client:

➜  druid-0.8.2  ./run_example_client.sh
This will run a query against a stand-alone version of Druid.

{
"queryType":"timeBoundary",
"dataSource":"wikipedia"
}
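The same query can also be POSTed by hand to the realtime node's query endpoint (the endpoint used by the later examples in this walkthrough):

curl -X POST 'http://localhost:8084/druid/v2/?pretty' -H 'content-type: application/json' -d '{"queryType":"timeBoundary","dataSource":"wikipedia"}'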

timeBoundary returns the time boundary (min and max timestamps) over all ingested events. Below are the results of two consecutive queries:

[ {
"timestamp" : "2015-12-02T01:40:47.555Z",
"result" : {
"minTime" : "2015-12-02T01:40:47.555Z",
"maxTime" : "2015-12-02T01:42:21.869Z"
}
} ]

[ {
"timestamp" : "2015-12-02T01:40:47.555Z",
"result" : {
"minTime" : "2015-12-02T01:40:47.555Z",
"maxTime" : "2015-12-02T01:42:52.597Z"
}
} ]

Each query returns a single time bucket, because by default all records fall into one bucket.

4. Set queryType to timeseries with granularity set to minute, so that each minute of data becomes its own time bucket:

➜  druid-0.8.2  cat timeseries.json
{
"queryType": "timeseries",
"dataSource": "wikipedia",
"intervals": [ "2010-01-01/2020-01-01" ],
"granularity": "minute",
"aggregations": [
{"type": "longSum", "fieldName": "count", "name": "edit_count"},
{"type": "doubleSum", "fieldName": "added", "name": "chars_added"}
]
}

➜ druid-0.8.2 curl -X POST 'http://localhost:8084/druid/v2/?pretty' -H 'content-type: application/json' -d @timeseries.json
[ {
"timestamp" : "2015-12-02T01:40:00.000Z",
"result" : {
"chars_added" : 18400.0,
"edit_count" : 43
}
}, {
"timestamp" : "2015-12-02T01:41:00.000Z",
"result" : {
"chars_added" : 45920.0,
"edit_count" : 153
}
}, {
"timestamp" : "2015-12-02T01:42:00.000Z",
"result" : {
"chars_added" : 104842.0,
"edit_count" : 150
}
}, {...

5. topN query, equivalent to: select page, sum(count) as edit_count from x where country='US' group by page order by edit_count desc limit 10

➜  druid-0.8.2  cat topn.json
{
"queryType": "topN",
"dataSource": "wikipedia",
"granularity": "all",
"dimension": "page",
"metric": "edit_count",
"threshold" : 10,
"aggregations": [
{"type": "longSum", "fieldName": "count", "name": "edit_count"}
],
"filter": { "type": "selector", "dimension": "country", "value": "United States" },
"intervals": ["2012-10-01T00:00/2020-01-01T00"]
}

➜ druid-0.8.2 curl -X POST 'http://localhost:8084/druid/v2/?pretty' -H 'content-type: application/json' -d @topn.json
[ {
"timestamp" : "2015-12-02T01:40:47.555Z",
"result" : [ {
"page" : "Hillel_at_the_University_of_Illinois_at_Urbana-Champaign",
"edit_count" : 4
}, {
"page" : "All_Together_Now",
"edit_count" : 2
}, {
... ] // 10 records in total
} ]
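For comparison, the same question can be phrased as a groupBy query with a limitSpec; a sketch (groupBy is the general form, while topN is the faster special case optimized for ranking a single dimension):

{
  "queryType": "groupBy",
  "dataSource": "wikipedia",
  "granularity": "all",
  "dimensions": ["page"],
  "aggregations": [
    {"type": "longSum", "fieldName": "count", "name": "edit_count"}
  ],
  "filter": { "type": "selector", "dimension": "country", "value": "United States" },
  "intervals": ["2012-10-01T00:00/2020-01-01T00"],
  "limitSpec": { "type": "default", "limit": 10, "columns": [ {"dimension": "edit_count", "direction": "descending"} ] }
}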

6. The realtime node's server log for the queries above:

③ timeBoundary
2015-12-02T01:42:22,513 INFO [timeBoundary_wikipedia_[2015-12-02T01:00:00.000Z/2015-12-02T02:00:00.000Z]] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-12-02T01:42:22.512Z","service":"realtime","host":"localhost:8084","metric":"query/partial/time","value":63,"dataSource":"wikipedia","id":"b9474f46-fa0f-44a9-b4e2-5a1aba27c85c","type":"timeBoundary"}]
2015-12-02T01:42:22,514 INFO [timeBoundary_wikipedia_[2015-12-02T01:00:00.000Z/2015-12-02T02:00:00.000Z]] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-12-02T01:42:22.513Z","service":"realtime","host":"localhost:8084","metric":"query/wait/time","value":18,"dataSource":"wikipedia","id":"b9474f46-fa0f-44a9-b4e2-5a1aba27c85c","type":"timeBoundary"}]
2015-12-02T01:42:22,622 INFO [qtp940621403-23] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-12-02T01:42:22.620Z","service":"realtime","host":"localhost:8084","metric":"query/time","value":352,"context":"{\"queryId\":\"b9474f46-fa0f-44a9-b4e2-5a1aba27c85c\",\"timeout\":300000}","dataSource":"wikipedia","duration":"PT94670899200S","hasFilters":"false","id":"b9474f46-fa0f-44a9-b4e2-5a1aba27c85c","interval":["0000-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"],"remoteAddress":"127.0.0.1","type":"timeBoundary"}]

④ timeseries
2015-12-02T01:45:40,470 INFO [timeseries_wikipedia_[2015-12-02T01:00:00.000Z/2015-12-02T02:00:00.000Z]] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-12-02T01:45:40.470Z","service":"realtime","host":"localhost:8084","metric":"query/partial/time","value":51,"dataSource":"wikipedia","duration":"PT315532800S","hasFilters":"false","id":"ce112856-ce91-4ff4-ae47-06894843e2a6","interval":["2010-01-01T00:00:00.000Z/2020-01-01T00:00:00.000Z"],"numComplexMetrics":"0","numMetrics":"2","type":"timeseries"}]
2015-12-02T01:45:40,471 INFO [timeseries_wikipedia_[2015-12-02T01:00:00.000Z/2015-12-02T02:00:00.000Z]] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-12-02T01:45:40.470Z","service":"realtime","host":"localhost:8084","metric":"query/wait/time","value":1,"dataSource":"wikipedia","duration":"PT315532800S","hasFilters":"false","id":"ce112856-ce91-4ff4-ae47-06894843e2a6","interval":["2010-01-01T00:00:00.000Z/2020-01-01T00:00:00.000Z"],"numComplexMetrics":"0","numMetrics":"2","type":"timeseries"}]
2015-12-02T01:45:40,476 INFO [qtp940621403-22] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-12-02T01:45:40.475Z","service":"realtime","host":"localhost:8084","metric":"query/time","value":151,"context":"{\"queryId\":\"ce112856-ce91-4ff4-ae47-06894843e2a6\",\"timeout\":300000}","dataSource":"wikipedia","duration":"PT315532800S","hasFilters":"false","id":"ce112856-ce91-4ff4-ae47-06894843e2a6","interval":["2010-01-01T00:00:00.000Z/2020-01-01T00:00:00.000Z"],"remoteAddress":"127.0.0.1","type":"timeseries"}]

Persisting in-memory data to segment files:
2015-12-02T01:49:52,551 INFO [chief-wikipedia[0]] io.druid.segment.realtime.plumber.RealtimePlumber - Submitting persist runnable for dataSource[wikipedia]
2015-12-02T01:49:52,581 INFO [wikipedia-incremental-persist] io.druid.segment.realtime.plumber.RealtimePlumber - DataSource[wikipedia], Interval[2015-12-02T01:00:00.000Z/2015-12-02T02:00:00.000Z], Metadata [null] persisting Hydrant[FireHydrant{index=io.druid.segment.incremental.OnheapIncrementalIndex@749297a4, queryable=io.druid.segment.ReferenceCountingSegment@4421cbc6, count=0}]
2015-12-02T01:49:52,621 INFO [wikipedia-incremental-persist] io.druid.guice.PropertiesModule - Loading properties from common.runtime.properties
2015-12-02T01:49:52,623 INFO [wikipedia-incremental-persist] io.druid.guice.PropertiesModule - Loading properties from runtime.properties
2015-12-02T01:49:52,666 INFO [wikipedia-incremental-persist] io.druid.segment.IndexMerger - Starting persist for interval[2015-12-02T01:00:00.000Z/2015-12-02T01:49:52.507Z], rows[1,374]
2015-12-02T01:49:52,877 INFO [wikipedia-incremental-persist] io.druid.segment.IndexMerger - outDir[/tmp/realtime/basePersist/wikipedia/2015-12-02T01:00:00.000Z_2015-12-02T02:00:00.000Z/0/v8-tmp] completed index.drd in 21 millis.
2015-12-02T01:49:53,157 INFO [wikipedia-incremental-persist] io.druid.segment.IndexMerger - outDir[/tmp/realtime/basePersist/wikipedia/2015-12-02T01:00:00.000Z_2015-12-02T02:00:00.000Z/0/v8-tmp] completed dim conversions in 277 millis.
2015-12-02T01:49:53,833 INFO [wikipedia-incremental-persist] io.druid.segment.IndexMerger - outDir[/tmp/realtime/basePersist/wikipedia/2015-12-02T01:00:00.000Z_2015-12-02T02:00:00.000Z/0/v8-tmp] completed walk through of 1,374 rows in 675 millis.
...
2015-12-02T01:49:54,099 INFO [wikipedia-incremental-persist] io.druid.segment.IndexMerger - Starting dimension[user] with cardinality[468]
2015-12-02T01:49:54,146 INFO [wikipedia-incremental-persist] io.druid.segment.IndexMerger - Completed dimension[user] in 47 millis.
2015-12-02T01:49:54,147 INFO [wikipedia-incremental-persist] io.druid.segment.IndexMerger - outDir[/tmp/realtime/basePersist/wikipedia/2015-12-02T01:00:00.000Z_2015-12-02T02:00:00.000Z/0/v8-tmp] completed inverted.drd in 313 millis.
2015-12-02T01:49:54,188 INFO [wikipedia-incremental-persist] io.druid.segment.IndexIO$DefaultIndexIOHandler - Converting v8[/tmp/realtime/basePersist/wikipedia/2015-12-02T01:00:00.000Z_2015-12-02T02:00:00.000Z/0/v8-tmp] to v9[/tmp/realtime/basePersist/wikipedia/2015-12-02T01:00:00.000Z_2015-12-02T02:00:00.000Z/0]

⑤ topN
2015-12-02T01:55:04,188 INFO [topN_wikipedia_[2015-12-02T01:00:00.000Z/2015-12-02T02:00:00.000Z]] io.druid.offheap.OffheapBufferPool - Allocating new intermediate processing buffer[0] of size[100,000,000]
2015-12-02T01:55:04,319 INFO [topN_wikipedia_[2015-12-02T01:00:00.000Z/2015-12-02T02:00:00.000Z]] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-12-02T01:55:04.319Z","service":"realtime","host":"localhost:8084","metric":"query/partial/time","value":191,"dataSource":"wikipedia","dimension":"page","duration":"PT228787200S","hasFilters":"true","id":"837752d5-7fa8-447f-bf21-b843827577db","interval":["2012-10-01T00:00:00.000Z/2020-01-01T00:00:00.000Z"],"numComplexMetrics":"0","numMetrics":"1","threshold":"10","type":"topN"}]
2015-12-02T01:55:04,320 INFO [topN_wikipedia_[2015-12-02T01:00:00.000Z/2015-12-02T02:00:00.000Z]] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-12-02T01:55:04.319Z","service":"realtime","host":"localhost:8084","metric":"query/wait/time","value":1,"dataSource":"wikipedia","dimension":"page","duration":"PT228787200S","hasFilters":"true","id":"837752d5-7fa8-447f-bf21-b843827577db","interval":["2012-10-01T00:00:00.000Z/2020-01-01T00:00:00.000Z"],"numComplexMetrics":"0","numMetrics":"1","threshold":"10","type":"topN"}]
2015-12-02T01:55:04,337 INFO [qtp940621403-26] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-12-02T01:55:04.336Z","service":"realtime","host":"localhost:8084","metric":"query/time","value":289,"context":"{\"queryId\":\"837752d5-7fa8-447f-bf21-b843827577db\",\"timeout\":300000}","dataSource":"wikipedia","duration":"PT228787200S","hasFilters":"true","id":"837752d5-7fa8-447f-bf21-b843827577db","interval":["2012-10-01T00:00:00.000Z/2020-01-01T00:00:00.000Z"],"remoteAddress":"127.0.0.1","type":"topN"}]

7. Summary:

The most basic Druid setup: a single realtime node. We streamed in some data and queried it. Realtime nodes collect very recent data and periodically hand that data off to the rest of the Druid cluster.

Druid collects each individual event and packages them together in a container known as a segment. Segments contain data over some span of time.


Druid Cluster (single-machine pseudo-distributed)

Dependencies and configuration files

The config/_common/common.runtime.properties file defines Druid's three external dependencies: ZooKeeper, MySQL (metadata storage), and deep storage (e.g. HDFS). On a single machine these can be a local ZK, Derby, and the local filesystem.
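A minimal sketch of those single-machine settings (property names as in Druid 0.8; the Derby URI is illustrative, and the local-storage path matches the one that appears in the logs later in this article):

druid.zk.service.host=localhost
druid.metadata.storage.type=derby
druid.metadata.storage.connector.connectURI=jdbc:derby://localhost:1527/var/druid/metadata.db;create=true
druid.storage.type=local
druid.storage.storageDirectory=/tmp/druid/localStorage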

ConfigFile                              Node                     Port
config/coordinator/runtime.properties   Coordinator              8081
config/historical/runtime.properties    Historical               8083
config/broker/runtime.properties        Broker                   8082
config/realtime/runtime.properties      Realtime                 8084
config/overlord/runtime.properties      Overlord (IndexService)  8090

Historical and realtime nodes are analogous to a Worker/DataNode; a cluster can run many of each. In a cluster deployment, set druid.host in each node's config file to that machine's IP address.

Starting the coordinator node

Coordinator nodes are in charge of load assignment and distribution: they monitor the status of the cluster and command historical nodes to assign and drop segments.

java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath config/_common:config/coordinator:lib/* io.druid.cli.Main server coordinator

Starting the historical node

Historical nodes are the workhorses of a cluster; they are in charge of loading historical segments and making them available for queries. Realtime nodes hand their segments off to historical nodes.

java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath config/_common:config/historical:lib/* io.druid.cli.Main server historical

Starting the broker

Broker nodes are responsible for figuring out which historical and/or realtime nodes correspond to which queries (much like the coordinator role in Cassandra). They merge the partial results from those nodes in a scatter/gather fashion and return the combined result to the client.

java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath config/_common:config/broker:lib/* io.druid.cli.Main server broker

Starting the realtime node

Since this is a test environment, we shorten the intervals. The settings below mean: each segment covers five minutes of data (segmentGranularity), events arriving up to one minute outside the current segment's interval are still accepted (windowPeriod), and in-memory data is persisted to disk every three minutes (intermediatePersistPeriod):

"segmentGranularity": "FIVE_MINUTE",
"intermediatePersistPeriod": "PT3m",
"windowPeriod": "PT1m",

Start the RealTime node with the spec modified above. Note that only the realtime node is given a specFile, because the realtime node is the one that reads from the data source.

java -Xmx512m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Ddruid.realtime.specFile=examples/wikipedia/wikipedia_realtime.spec -classpath config/_common:config/realtime:lib/* io.druid.cli.Main server realtime

All Things Up

Once the real-time node starts up, it should begin ingesting data and handing that data off to the rest of the Druid cluster.
You can use a web UI located at coordinator_ip:port to view the status of data being loaded.
Once data is handed off from the real-time nodes to historical nodes, the historical nodes should begin serving segments.
While data still sits on the realtime node it is held in that node's memory, and is just as queryable there; it is handed off to historical nodes simply because it is no longer realtime data but historical data.

[Figure: druid_arch2 — cluster architecture and the segment flow described below]

  • ① Realtime data is written to the realtime node, which builds indexed segments
  • ② After a while, the realtime node's segments are handed off to deep storage
  • ③ Metadata is written to MySQL; the handed-off segment also gets a new entry in ZooKeeper
  • ④ The coordinator reads metadata from MySQL, e.g. schema information (dimension and metric columns)
  • ⑤ The coordinator watches ZK for segments to assign or drop, and writes instructions into ZooKeeper telling historical nodes which segments to load or drop
  • ⑥ Historical nodes watch ZK and pick up the segments they have been asked to act on
  • ⑦ A historical node downloads the segment from deep storage and loads it into memory, or drops a segment it already holds
  • ⑧ Segments on historical nodes serve the queries routed through the broker

Querying from a client

Send queries to the broker (8082); the broker fans each request out to both the realtime and the historical nodes.

curl -X POST 'http://localhost:8082/druid/v2/?pretty' -H 'content-type: application/json' -d@examples/wikipedia/query.body

If you query the historical node (8083) right at the start, the realtime node has not yet run long enough to hand anything off, so the query below initially returns no data.
Since segments cover five minutes, the realtime node hands data off to the historical node only after at least five minutes (the intermediate persist period is three minutes, but the first hand-off still has to wait for the five-minute window to fill).

curl -X POST 'http://localhost:8083/druid/v2/?pretty' -H 'content-type: application/json' -d@examples/wikipedia/query.body

The realtime node (8084) keeps the most recent (already indexed) data in memory; this is the data that has not yet been handed off to a historical node. Hand-off involves two steps: A. the realtime node hands the segment over to the historical tier; B. the historical node loads the segment into memory. Once the historical node reports that it has fully loaded the segment, the realtime node drops its copy, since only one serving copy of the data is needed.

curl -X POST 'http://localhost:8084/druid/v2/?pretty' -H 'content-type: application/json' -d@examples/wikipedia/query.body

If all you need is to query data, just send requests to the broker: it queries the realtime and historical nodes and merges their results for the client.


Loading Stream Data

http://druid.io/docs/0.8.2/tutorials/tutorial-loading-streaming-data.html

The Hello Druid section was already loading realtime data, just sourced from Wikipedia's IRC feed. For actual business data you would typically switch the source to Kafka.

1. Start ZK and Kafka, create a test topic, and start a console producer, so that test data can be pasted straight into the terminal to simulate production traffic:

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wikipedia
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wikipedia

2. examples/indexing/wikipedia.spec already configures the Kafka firehose and the feed (topic):

"ioConfig" : {
"type" : "realtime",
"firehose": {
"type": "kafka-0.8",
"consumerProps": {
"zookeeper.connect": "localhost:2181",
"zookeeper.connection.timeout.ms" : "15000",
"zookeeper.session.timeout.ms" : "15000",
"zookeeper.sync.time.ms" : "5000",
"group.id": "druid-example",
"fetch.message.max.bytes" : "1048586",
"auto.offset.reset": "largest",
"auto.commit.enable": "false"
},
"feed": "wikipedia"
},
"plumber": {
"type": "realtime"
}
},

3. Start the realtime node:

java -Xmx512m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Ddruid.realtime.specFile=examples/indexing/wikipedia.spec -classpath "config/_common:config/realtime:lib/*" io.druid.cli.Main server realtime

4. Write data to Kafka: paste the test data (5 events, in the same format as the sample event sketched in the Data (meta) section) into the kafka-console-producer.sh terminal.

5. Watch the realtime terminal: it receives the data and announces the resulting DataSegment:

2015-12-03T07:27:14,378 INFO [chief-wikipedia[0]] io.druid.server.coordination.BatchDataSegmentAnnouncer - 
Announcing segment[wikipedia_2013-08-31T00:00:00.000Z_2013-09-01T00:00:00.000Z_2013-08-31T00:00:00.000Z] ⬅️
at path[/druid/segments/192.168.6.113:8084/192.168.6.113:8084_realtime__default_tier_2015-12-03T07:27:14.375Z_fa529bb4ba6f4877ad95d835afdb01500]

2015-12-03T07:27:19,173 INFO [MonitorScheduler-0] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-12-03T07:27:19.152Z","service":"realtime","host":"192.168.6.113:8084","metric":"ingest/events/thrownAway","value":0,"dataSource":"wikipedia"}]
2015-12-03T07:27:19,174 INFO [MonitorScheduler-0] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-12-03T07:27:19.173Z","service":"realtime","host":"192.168.6.113:8084","metric":"ingest/events/unparseable","value":0,"dataSource":"wikipedia"}]
2015-12-03T07:27:19,175 INFO [MonitorScheduler-0] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-12-03T07:27:19.174Z","service":"realtime","host":"192.168.6.113:8084","metric":"ingest/events/processed","value":5,"dataSource":"wikipedia"}] ⬅️ 5 events processed
2015-12-03T07:27:19,175 INFO [MonitorScheduler-0] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-12-03T07:27:19.175Z","service":"realtime","host":"192.168.6.113:8084","metric":"ingest/rows/output","value":0,"dataSource":"wikipedia"}]
2015-12-03T07:27:19,176 INFO [MonitorScheduler-0] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-12-03T07:27:19.176Z","service":"realtime","host":"192.168.6.113:8084","metric":"ingest/persists/count","value":0,"dataSource":"wikipedia"}]
2015-12-03T07:27:19,176 INFO [MonitorScheduler-0] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-12-03T07:27:19.176Z","service":"realtime","host":"192.168.6.113:8084","metric":"ingest/persists/time","value":0,"dataSource":"wikipedia"}]
2015-12-03T07:27:19,177 INFO [MonitorScheduler-0] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-12-03T07:27:19.177Z","service":"realtime","host":"192.168.6.113:8084","metric":"ingest/persists/backPressure","value":0,"dataSource":"wikipedia"}]
2015-12-03T07:27:19,177 INFO [MonitorScheduler-0] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-12-03T07:27:19.177Z","service":"realtime","host":"192.168.6.113:8084","metric":"ingest/persists/failed","value":0,"dataSource":"wikipedia"}]
2015-12-03T07:27:19,178 INFO [MonitorScheduler-0] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-12-03T07:27:19.177Z","service":"realtime","host":"192.168.6.113:8084","metric":"ingest/handoff/failed","value":0,"dataSource":"wikipedia"}]

6. Query the realtime node on port 8084 (only the realtime node has been started here; the other node types are not running):

curl -XPOST -H'Content-type: application/json' "http://localhost:8084/druid/v2/?pretty" -d'{"queryType":"timeBoundary","dataSource":"wikipedia"}'

The realtime node serves the timeBoundary query, going through segment/scan/pending -> query/partial/time -> query/wait/time -> query/time before returning the result:

2015-12-03T07:27:34,419 INFO [qtp91831175-22] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-12-03T07:27:34.415Z","service":"realtime","host":"192.168.6.113:8084","metric":"segment/scan/pending","value":0}]
2015-12-03T07:27:34,489 INFO [timeBoundary_wikipedia_[2013-08-31T00:00:00.000Z/2013-09-01T00:00:00.000Z]] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-12-03T07:27:34.488Z","service":"realtime","host":"192.168.6.113:8084","metric":"query/partial/time","value":54,"dataSource":"wikipedia","id":"6dddbc48-2583-4fff-984c-52043fbcb03b","type":"timeBoundary"}]
2015-12-03T07:27:34,489 INFO [timeBoundary_wikipedia_[2013-08-31T00:00:00.000Z/2013-09-01T00:00:00.000Z]] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-12-03T07:27:34.489Z","service":"realtime","host":"192.168.6.113:8084","metric":"query/wait/time","value":20,"dataSource":"wikipedia","id":"6dddbc48-2583-4fff-984c-52043fbcb03b","type":"timeBoundary"}]
2015-12-03T07:27:34,534 INFO [qtp91831175-22] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2015-12-03T07:27:34.532Z","service":"realtime","host":"192.168.6.113:8084","metric":"query/time","value":229,"context":"{\"queryId\":\"6dddbc48-2583-4fff-984c-52043fbcb03b\",\"timeout\":300000}","dataSource":"wikipedia","duration":"PT94670899200S","hasFilters":"false","id":"6dddbc48-2583-4fff-984c-52043fbcb03b","interval":["0000-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"],"remoteAddress":"0:0:0:0:0:0:0:1","type":"timeBoundary"}]

[Figure: druid_real]

7. Advanced streaming ingestion

Druid solved the availability problem by switching from a pull-based model to a push-based model: rather than Druid indexers pulling data from Kafka, another process pulls the data and pushes it into Druid. Since with the push-based model we can ensure that the same data makes it into the same shard, we can replicate data. The indexing service encapsulates this functionality, where a task-and-resources model replaces the standalone-machine model. In addition to simplifying machine configuration, this model also allows nodes to run in the cloud with an elastic number of machines. If you are interested in this form of real-time ingestion, check out the client library Tranquility.

Loading Batch Data

The architecture diagram earlier shows two kinds of input: realtime data and batch data. Realtime data is written to realtime nodes, whose segments are handed off to deep storage. Batch data can be loaded into deep storage (e.g. HDFS) directly; from there the process mirrors the realtime hand-off, and the data ultimately serves broker queries.

[Figure: druid_index — batch indexing data flow]

Batch data goes through the indexing service, which accepts tasks via POST and writes segments straight to deep storage; segments in deep storage are only ever served by historical nodes. So the services to start here are: the indexing service (overlord), a historical node, and a coordinator (which tells the historical node to download segments). As with the realtime experiment above, no broker is started for convenience: since the data ends up on the historical node, you can query the historical node directly.

zkServer.sh start
java -Xmx1g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath "config/_common:config/overlord:lib/*" io.druid.cli.Main server overlord
java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath "config/_common:config/coordinator:lib/*" io.druid.cli.Main server coordinator
java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath "config/_common:config/historical:lib/*" io.druid.cli.Main server historical

Note: if the shell reports zsh: no matches found, wrap the classpath in double quotes so that the * is not expanded by the shell.

Submit an index task to the overlord node on port 8090; after a short while, query the historical node on 8083:

➜  druid-0.8.2  curl -X 'POST' -H 'Content-Type:application/json' -d @examples/indexing/wikipedia_index_task.json localhost:8090/druid/indexer/v1/task
{"task":"index_wikipedia_2015-12-03T08:20:00.127Z"}

➜ druid-0.8.2 curl -XPOST -H'Content-type: application/json' "http://localhost:8083/druid/v2/?pretty" -d'{"queryType":"timeBoundary","dataSource":"wikipedia"}'
[ {
"timestamp" : "2013-08-31T01:02:33.000Z",
"result" : {
"maxTime" : "2013-08-31T12:41:27.000Z",
"minTime" : "2013-08-31T01:02:33.000Z"
}
} ]

1. Indexing-service (overlord) log: the task is started (run as an internal peon), and the segment is published: its metadata goes to the DB while the segment file itself lands in deep storage:

2015-12-03T08:20:00,130 INFO [qtp1380593659-21] io.druid.indexing.overlord.HeapMemoryTaskStorage - Inserting task index_wikipedia_2015-12-03T08:20:00.127Z with status: TaskStatus{id=index_wikipedia_2015-12-03T08:20:00.127Z, status=RUNNING, duration=-1}
2015-12-03T08:20:00,131 INFO [qtp1380593659-21] io.druid.indexing.overlord.TaskLockbox - Adding task[index_wikipedia_2015-12-03T08:20:00.127Z] to activeTasks
2015-12-03T08:20:00,138 INFO [TaskQueue-Manager] io.druid.indexing.common.actions.LocalTaskActionClient - Performing action for task[index_wikipedia_2015-12-03T08:20:00.127Z]: LockTryAcquireAction{interval=2013-08-31T00:00:00.000Z/2013-09-01T00:00:00.000Z}
2015-12-03T08:20:00,140 INFO [TaskQueue-Manager] io.druid.indexing.overlord.TaskLockbox - Created new TaskLockPosse: TaskLockPosse{taskLock=TaskLock{groupId=index_wikipedia_2015-12-03T08:20:00.127Z, dataSource=wikipedia, interval=2013-08-31T00:00:00.000Z/2013-09-01T00:00:00.000Z, version=2015-12-03T08:20:00.139Z}, taskIds=[]}
2015-12-03T08:20:00,140 INFO [TaskQueue-Manager] io.druid.indexing.overlord.TaskLockbox - Added task[index_wikipedia_2015-12-03T08:20:00.127Z] to TaskLock[index_wikipedia_2015-12-03T08:20:00.127Z]
2015-12-03T08:20:00,140 INFO [TaskQueue-Manager] io.druid.indexing.overlord.TaskQueue - Asking taskRunner to run: index_wikipedia_2015-12-03T08:20:00.127Z
2015-12-03T08:20:00,266 INFO [pool-7-thread-1] io.druid.indexing.overlord.ForkingTaskRunner - Running command: java -cp ....-Ddruid.port=8100 io.druid.cli.Main internal peon /tmp/persistent/task/index_wikipedia_2015-12-03T08:20:00.127Z/0aab109c-d8f4-4ff7-b3e4-4c89c6fae716/task.json /tmp/persistent/task/index_wikipedia_2015-12-03T08:20:00.127Z/0aab109c-d8f4-4ff7-b3e4-4c89c6fae716/status.json

2015-12-03T08:20:00,320 INFO [pool-7-thread-1] io.druid.indexing.overlord.ForkingTaskRunner - Logging task index_wikipedia_2015-12-03T08:20:00.127Z output to: /tmp/persistent/task/index_wikipedia_2015-12-03T08:20:00.127Z/0aab109c-d8f4-4ff7-b3e4-4c89c6fae716/log
2015-12-03T08:20:11,621 INFO [qtp1380593659-22] io.druid.indexing.common.actions.LocalTaskActionClient - Performing action for task[index_wikipedia_2015-12-03T08:20:00.127Z]: LockTryAcquireAction{interval=2013-08-31T00:00:00.000Z/2013-09-01T00:00:00.000Z}
2015-12-03T08:20:11,633 INFO [qtp1380593659-22] io.druid.indexing.overlord.TaskLockbox - Task[index_wikipedia_2015-12-03T08:20:00.127Z] already present in TaskLock[index_wikipedia_2015-12-03T08:20:00.127Z]
2015-12-03T08:20:11,714 INFO [qtp1380593659-23] io.druid.indexing.common.actions.LocalTaskActionClient - Performing action for task[index_wikipedia_2015-12-03T08:20:00.127Z]: LockListAction{}
2015-12-03T08:20:12,907 INFO [qtp1380593659-24] io.druid.indexing.common.actions.LocalTaskActionClient - Performing action for task[index_wikipedia_2015-12-03T08:20:00.127Z]: SegmentInsertAction
{segments=[DataSegment{size=9852, shardSpec=NoneShardSpec, metrics=[count, added, deleted, delta],
dimensions=[anonymous, city, continent, country, language, namespace, newPage, page, region, robot, unpatrolled, user],
version='2015-12-03T08:20:00.139Z', loadSpec={type=local, path=/tmp/druid/localStorage/wikipedia/2013-08-31T00:00:00.000Z_2013-09-01T00:00:00.000Z/2015-12-03T08:20:00.139Z/0/index.zip},
interval=2013-08-31T00:00:00.000Z/2013-09-01T00:00:00.000Z, dataSource='wikipedia', binaryVersion='9'}]}
2015-12-03T08:20:12,956 INFO [qtp1380593659-24] io.druid.metadata.IndexerSQLMetadataStorageCoordinator - Published segment [wikipedia_2013-08-31T00:00:00.000Z_2013-09-01T00:00:00.000Z_2015-12-03T08:20:00.139Z] to DB

2015-12-03T08:20:13,501 INFO [pool-7-thread-1] io.druid.indexing.overlord.TaskQueue - Received SUCCESS status for task: index_wikipedia_2015-12-03T08:20:00.127Z
2015-12-03T08:20:13,502 INFO [pool-7-thread-1] io.druid.indexing.overlord.ForkingTaskRunner - Ignoring request to cancel unknown task: index_wikipedia_2015-12-03T08:20:00.127Z
2015-12-03T08:20:13,502 INFO [pool-7-thread-1] io.druid.indexing.overlord.TaskLockbox - Removing task[index_wikipedia_2015-12-03T08:20:00.127Z] from activeTasks
2015-12-03T08:20:13,502 INFO [pool-7-thread-1] io.druid.indexing.overlord.TaskLockbox - Removing task[index_wikipedia_2015-12-03T08:20:00.127Z] from TaskLock[index_wikipedia_2015-12-03T08:20:00.127Z]
2015-12-03T08:20:13,503 INFO [pool-7-thread-1] io.druid.indexing.overlord.TaskLockbox - TaskLock is now empty: TaskLock{groupId=index_wikipedia_2015-12-03T08:20:00.127Z, dataSource=wikipedia, interval=2013-08-31T00:00:00.000Z/2013-09-01T00:00:00.000Z, version=2015-12-03T08:20:00.139Z}
2015-12-03T08:20:13,504 INFO [pool-7-thread-1] io.druid.indexing.overlord.HeapMemoryTaskStorage - Updating task index_wikipedia_2015-12-03T08:20:00.127Z to status: TaskStatus{id=index_wikipedia_2015-12-03T08:20:00.127Z, status=SUCCESS, duration=1270}
2015-12-03T08:20:13,504 INFO [pool-7-thread-1] io.druid.indexing.overlord.TaskQueue - Task done: IndexTask{id=index_wikipedia_2015-12-03T08:20:00.127Z, type=index, dataSource=wikipedia}
2015-12-03T08:20:13,504 INFO [pool-7-thread-1] io.druid.indexing.overlord.TaskQueue - Task SUCCESS: IndexTask{id=index_wikipedia_2015-12-03T08:20:00.127Z, type=index, dataSource=wikipedia} (1270 run duration)

2. Coordinator log: when the coordinator polls the metadata DB and finds a new segment, it assigns the segment to a historical node:

2015-12-03T08:20:36,903 INFO [DatabaseSegmentManager-Exec--0] io.druid.metadata.SQLMetadataSegmentManager - Polled and found 1 segments in the database
2015-12-03T08:20:37,808 INFO [DatabaseRuleManager-Exec--0] io.druid.metadata.SQLMetadataRuleManager - Polled and found rules for 1 datasource(s)
2015-12-03T08:20:49,223 INFO [Coordinator-Exec--0] io.druid.server.coordinator.helper.DruidCoordinatorSegmentInfoLoader - Starting coordination. Getting available segments.
2015-12-03T08:20:49,239 INFO [Coordinator-Exec--0] io.druid.server.coordinator.helper.DruidCoordinatorSegmentInfoLoader - Found [1] available segments.
2015-12-03T08:20:49,253 INFO [Coordinator-Exec--0] io.druid.server.coordinator.DruidCoordinator - Creating LoadQueuePeon for server[192.168.6.113:8083] at path[/druid/loadQueue/192.168.6.113:8083]

2015-12-03T08:20:49,269 INFO [Coordinator-Exec--0] io.druid.server.coordinator.ReplicationThrottler - [_default_tier]: Replicant create queue is empty.
2015-12-03T08:20:49,269 INFO [Coordinator-Exec--0] io.druid.server.coordinator.ReplicationThrottler - [_default_tier]: Replicant terminate queue is empty.
①2015-12-03T08:20:49,294 INFO [Coordinator-Exec--0] io.druid.server.coordinator.LoadQueuePeon - 👉Asking server peon[/druid/loadQueue/192.168.6.113:8083] to load segment[wikipedia_2013-08-31T00:00:00.000Z_2013-09-01T00:00:00.000Z_2015-12-03T08:20:00.139Z]
2015-12-03T08:20:49,299 INFO [Coordinator-Exec--0] io.druid.server.coordinator.LoadQueuePeon - Server[/druid/loadQueue/192.168.6.113:8083] loading [wikipedia_2013-08-31T00:00:00.000Z_2013-09-01T00:00:00.000Z_2015-12-03T08:20:00.139Z]
2015-12-03T08:20:49,303 INFO [Master-PeonExec--0] io.druid.server.coordinator.LoadQueuePeon - Server[/druid/loadQueue/192.168.6.113:8083] processing segment[wikipedia_2013-08-31T00:00:00.000Z_2013-09-01T00:00:00.000Z_2015-12-03T08:20:00.139Z]
2015-12-03T08:20:49,304 WARN [Coordinator-Exec--0] io.druid.server.coordinator.rules.LoadRule - Not enough [_default_tier] servers or node capacity to assign segment[wikipedia_2013-08-31T00:00:00.000Z_2013-09-01T00:00:00.000Z_2015-12-03T08:20:00.139Z]! Expected Replicants[2]
2015-12-03T08:20:49,304 INFO [Coordinator-Exec--0] io.druid.server.coordinator.helper.DruidCoordinatorBalancer - [_default_tier]: One or fewer servers found. Cannot balance.
2015-12-03T08:20:49,305 INFO [Coordinator-Exec--0] io.druid.server.coordinator.helper.DruidCoordinatorLogger - [_default_tier] : Assigned 1 segments among 1 servers
2015-12-03T08:20:49,305 INFO [Coordinator-Exec--0] io.druid.server.coordinator.helper.DruidCoordinatorLogger - Load Queues:
2015-12-03T08:20:49,306 INFO [Coordinator-Exec--0] io.druid.server.coordinator.helper.DruidCoordinatorLogger - Server[192.168.6.113:8083, historical, _default_tier] has 1 left to load, 0 left to drop, 9,852 bytes queued, 0 bytes served.
2015-12-03T08:20:49,898 INFO [main-EventThread] io.druid.server.coordinator.LoadQueuePeon - Server[/druid/loadQueue/192.168.6.113:8083] done processing [/druid/loadQueue/192.168.6.113:8083/wikipedia_2013-08-31T00:00:00.000Z_2013-09-01T00:00:00.000Z_2015-12-03T08:20:00.139Z]

3. Historical-node log: following the coordinator's instruction, it downloads the segment from deep storage, loads it, and announces that the segment is available for queries.
Since the load request is a zNode that the coordinator registered in ZK (① above; the coordinator dispatches all such work this way), the historical node deletes that zNode once it has finished loading, marking the task complete (②).

2015-12-03T08:19:59,672 INFO [main] io.druid.server.coordination.AbstractDataSegmentAnnouncer - Announcing self[DruidServerMetadata{name='192.168.6.113:8083', host='192.168.6.113:8083', maxSize=10000000000, tier='_default_tier', type='historical', priority='0'}] at [/druid/announcements/192.168.6.113:8083]
2015-12-03T08:20:49,512 INFO [ZkCoordinator-0] io.druid.server.coordination.ZkCoordinator - New request[LOAD: wikipedia_2013-08-31T00:00:00.000Z_2013-09-01T00:00:00.000Z_2015-12-03T08:20:00.139Z] with zNode[/druid/loadQueue/192.168.6.113:8083/wikipedia_2013-08-31T00:00:00.000Z_2013-09-01T00:00:00.000Z_2015-12-03T08:20:00.139Z].
2015-12-03T08:20:49,513 INFO [ZkCoordinator-0] io.druid.server.coordination.ZkCoordinator - Loading segment wikipedia_2013-08-31T00:00:00.000Z_2013-09-01T00:00:00.000Z_2015-12-03T08:20:00.139Z
2015-12-03T08:20:49,630 INFO [ZkCoordinator-0] io.druid.segment.loading.LocalDataSegmentPuller - Unzipped 9852 bytes from [/tmp/druid/localStorage/wikipedia/2013-08-31T00:00:00.000Z_2013-09-01T00:00:00.000Z/2015-12-03T08:20:00.139Z/0/index.zip] to [/tmp/druid/indexCache/wikipedia/2013-08-31T00:00:00.000Z_2013-09-01T00:00:00.000Z/2015-12-03T08:20:00.139Z/0]
2015-12-03T08:20:49,671 INFO [ZkCoordinator-0] org.skife.config.ConfigurationObjectFactory - Using method itself for [${base_path}.columnCache.sizeBytes] on [io.druid.query.DruidProcessingConfig#columnCacheSizeBytes()]
2015-12-03T08:20:49,671 INFO [ZkCoordinator-0] org.skife.config.ConfigurationObjectFactory - Assigning value [100000000] for [druid.processing.buffer.sizeBytes] on [io.druid.query.DruidProcessingConfig#intermediateComputeSizeBytes()]
2015-12-03T08:20:49,672 INFO [ZkCoordinator-0] org.skife.config.ConfigurationObjectFactory - Assigning value [1] for [druid.processing.numThreads] on [io.druid.query.DruidProcessingConfig#getNumThreads()]
2015-12-03T08:20:49,672 INFO [ZkCoordinator-0] org.skife.config.ConfigurationObjectFactory - Assigning default value [processing-%s] for [${base_path}.formatString] on [com.metamx.common.concurrent.ExecutorServiceConfig#getFormatString()]
2015-12-03T08:20:49,702 INFO [ZkCoordinator-0] io.druid.guice.JsonConfigurator - Loaded class[interface io.druid.segment.data.BitmapSerdeFactory] from props[druid.processing.bitmap.] as [ConciseBitmapSerdeFactory{}]
2015-12-03T08:20:49,871 INFO [ZkCoordinator-0] io.druid.server.coordination.BatchDataSegmentAnnouncer - Announcing segment[wikipedia_2013-08-31T00:00:00.000Z_2013-09-01T00:00:00.000Z_2015-12-03T08:20:00.139Z] at path[/druid/segments/192.168.6.113:8083/192.168.6.113:8083_historical__default_tier_2015-12-03T08:20:49.870Z_c03ee9d8695946888b162868e614bbf61]
2015-12-03T08:20:49,896 INFO [ZkCoordinator-0] io.druid.server.coordination.ZkCoordinator - Completed request [LOAD: wikipedia_2013-08-31T00:00:00.000Z_2013-09-01T00:00:00.000Z_2015-12-03T08:20:00.139Z]
②2015-12-03T08:20:49,897 INFO [ZkCoordinator-0] io.druid.server.coordination.ZkCoordinator - 👉zNode[/druid/loadQueue/192.168.6.113:8083/wikipedia_2013-08-31T00:00:00.000Z_2013-09-01T00:00:00.000Z_2015-12-03T08:20:00.139Z] was removed

4. View the index tasks in the web console at http://localhost:8090/console.html. A task can also be checked from the command line, as sketched below.
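A sketch using the overlord's task-status endpoint, with the task id returned by the submission above:

curl http://localhost:8090/druid/indexer/v1/task/index_wikipedia_2015-12-03T08:20:00.127Z/status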

[Figure: druid_lord — overlord console]

EOF.

