Druid OLAP架构设计

Druid is a fast column-oriented distributed data store. http://druid.io/

Concepts

The Data

Druid将数据集分成三种类型: Timestamp column, Dimension columns(过滤数据), Metric columns(聚合和计算).

以Ad中的点击数据产生的事件为例,Druid会将publisher,advertiser,gender,country当做维度列,click和price当做指标列.

1
2
3
4
5
6
7
timestamp             publisher          advertiser  gender  country  click  price
2011-01-01T01:01:35Z bieberfever.com google.com Male USA 0 0.65
2011-01-01T01:03:63Z bieberfever.com google.com Male USA 0 0.62
2011-01-01T01:04:51Z bieberfever.com google.com Male USA 1 0.45
2011-01-01T01:00:00Z ultratrimfast.com google.com Female UK 0 0.87
2011-01-01T02:00:00Z ultratrimfast.com google.com Female UK 0 0.99
2011-01-01T02:00:00Z ultratrimfast.com google.com Female UK 1 1.53

Roll up

Druid读取数据的入口并不会直接存储原始数据, 而是使用Roll-up这种first-level聚合操作压缩原始数据:

1
2
3
4
5
timestamp             publisher          advertiser  gender country impressions clicks revenue
2011-01-01T01:00:00Z ultratrimfast.com google.com Male USA 1800 25 15.70
2011-01-01T01:00:00Z bieberfever.com google.com Male USA 2912 42 29.18
2011-01-01T02:00:00Z ultratrimfast.com google.com Male UK 1953 17 17.31
2011-01-01T02:00:00Z bieberfever.com google.com Male UK 3194 170 34.01

用SQL表示类似于对时间撮和所有维度列进行分组,并以原始的指标列做常用的聚合操作.

1
2
GROUP BY timestamp, publisher, advertiser, gender, country
:: impressions = COUNT(1), clicks = SUM(click), revenue = SUM(price)

为什么不存原始数据? 因为原始数据量可能非常大,对于广告的场景,一秒钟的点击数是以千万计数.
如果能够在读取数据的同时就进行一点聚合运算,就可以大大减少数据量的存储.
这种方式的缺点是不能查询单条事件,也就是你无法查到每条事件具体的click和price值了.
由于后面的查询都将以上面的查询为基础,所以Roll-up的结果一定要能满足查询的需求.通常count和sum就足够了.

The rollup granularity is the minimum granularity you will be able to explore data at and events are floored to this granularity.
因此Rollup的粒度是你能查询的数据的最小时间单位. 假设每隔1秒Rollup一次,后面的查询你最小只能以一秒为单位,不能查询一毫秒的事件.默认的粒度单位是ms.

思考这种存储方式的适用场景? 1).统计过去一小时或一天所有维度的统计数据. 2).根据某一个维度分别统计过去一小时,过去一天的数据.
第一种查询只需要读取出时间段内的Segment,进行聚合操作. 而第二种操作在读取出时间段内Segment后,还要进行过滤后再聚合(过滤不是很好吗).
但是对于同一个维度的数据,是分散在不同的Segment中的. 与之相反,Cassandra中相同的分区键的数据都是在同一个SSTable里.

Sharding the Data

Druid的分片是Segment文件. Druid首先总是以时间撮进行分片, 因为事件数据总是有时间撮. 假设以小时为粒度创建下面的两个Segment文件.

在几乎所有的NoSQL中都有数据分片的概念,比如ES的分片,Cassadnra的SSTable,HBase的Region,都表示的是数据的存储介质.
为什么要进行分片,因为数据大了,不能都存成一个大文件吧,所以要拆分成小文件以便于快速查询. 伴随拆分通常都有合并小文件.

Segment sampleData_2011-01-01T01:00:00:00Z_2011-01-01T02:00:00:00Z_v1_0 contains

1
2
2011-01-01T01:00:00Z  ultratrimfast.com  google.com  Male   USA     1800        25     15.70
2011-01-01T01:00:00Z bieberfever.com google.com Male USA 2912 42 29.18

Segment sampleData_2011-01-01T02:00:00:00Z_2011-01-01T03:00:00:00Z_v1_0 contains

1
2
2011-01-01T02:00:00Z  ultratrimfast.com  google.com  Male   UK      1953        17     17.31
2011-01-01T02:00:00Z bieberfever.com google.com Male UK 3194 170 34.01

从Segment文件的名称可以看出它包含的数据一定是在文件名称对应的起始和结束时间间隔之内的.

Segment文件名称的格式:dataSource_interval_version_partitionNumber.最后一个分区号是当同一个时间撮下数据量超过阈值要分成多个分区了.

分片和分区都表示将数据进行切分. 分片是将不同时间撮分布在不同的文件中, 而分区是相同时间撮放不下了,分成多个分区.
巧合的是Kafka中也有Segment和Partition的概念.Kafka的Partition是topic物理上的分组,一个topic可以分为多个partition.
它的partition物理上由多个segment组成.即Partition包含Segment,而Druid是Segment包含Partition.

Design Overview

The Druid Cluster

历史节点:是Druid集群的支柱,它会从DeepStorage下载不可变的Segments文件到本地,并加载Segment服务于查询请求.
历史节点是ShareNothing的架构(当掉任何一个节点都没有关系),它会负责加载Segments,删除Segments,并在Segment上查询数据.

历史节点负责存储数据,并且查询历史的数据.历史节点会从DeepStorage下载Segments,对Broker的查询进行响应,返回结果给Broker.
历史节点会写入ZooKeeper通知自己(的节点信息)以及自己服务了/保存了哪些Segments.(正如HDFS的DN上保存了哪些Block会向NN通知)
历史节点还会从ZooKeeper中得到任务:加载新的Segments(从实时节点转存过来的)或者删除自己已经保存的旧的Segments.

协调节点:管理历史节点上的Segments.协调节点告诉历史节点加载新的Segments,删除旧的Segments,或者移动Segments进行负载均衡.

协调节点监测历史节点的分组,确保数据是可用的,副本充足的. 它会
1) 从元数据存储中读取Segment的元数据,并决定哪些Segments需要被加载到集群中.
2) 使用ZooKeeper查看已经存在的历史节点都有哪些.
3) 创建一个ZK的条目告诉历史节点加载加载或删除新的Segments.

实时节点:实时处理可以使用标准的实时节点或者IndexService(两者的逻辑处理是一样的). 实时处理会读取数据,索引数据,创建Segments.
并且将Segments转存到历史节点. 数据一旦被实时节点接收就可用于查询. 在实时节点转存到历史节点这段时间内,查询仍然是可用的.
只有历史节点通知说已经成功转存,可以用于查询时, 实时节点之前保存的Segments才会被删除. 这份Segment数据现在存在于历史节点中了.

实时节点负责监听输入流数据并使得数据在Druid系统中可以立即被使用. 实时节点响应Broker的查询请求并返回结果给Broker节点.
过期的数据会被推送/转存到DeepStorage, 并通知ZooKeeper. 历史节点会接收到这个通知发现有新的Segments需要被加载/或被删除.

Broker节点:负责将查询请求分发到历史节点和实时节点,并聚合这些节点的结果数据.Broker节点知道Segment都存放在哪些节点上.

Broker如何知道Segment的存放路径? 首先Broker查询实时节点的Segment,这些Segment都在实时节点的内存中,并不会注册到ZK中.
只有历史的数据(非实时的数据)在转存到历史节点之后, 历史节点就会通知ZK说自己已经成功地加载了某一个Segment,
这样Broker只需要查询ZK就知道Segment都属于哪些节点,从而将请求转发到对应的历史节点,由历史节点去查询需要的数据.
Update: 上面以为实时节点不会写入Segment到ZK是错误的, 实际上实时节点和历史节点都会写Segment到ZK中!!!

druid_flow1

上图的查询路径为红色箭头:①客户端向Broker发起请求,Broker会将请求路由到②实时节点和③历史节点.
黑色的箭头是数据在Druid中怎么流转:数据源包括实时流和批量数据. ④实时流经过索引直接写到实时节点.
⑤批量数据通过IndexService存储到DeepStorage,⑥再由历史节点加载. ⑦实时节点也可以将数据转存到DeepStorage.

Druid的集群依赖了ZooKeeper来维护数据拓扑. 每个组件都会与ZooKeeper交互:
1) 实时节点在转存Segment到DeepStorage, 会写入自己转存了什么Segment
2) 协调节点管理历史节点,它负责从ZooKeeper中获取要同步/下载的Segment,并指派任务给具体的历史节点去完成
3) 历史节点从ZooKeeper中领取任务,任务完成后要将ZooKeeper条目删除表示完成了任务
4) Broker节点根据ZooKeeper中的Segment所在的历史节点, 将查询请求路由到指定的历史节点(实时节点会写到ZK中吗?)

Druid依赖了三个外部组件: ZooKeeper, Metadata Storage, Deep Storage.

当新创建一个Segments会写入一条数据到元数据存储中(MySQL), 协调节点会监测元数据存储得知哪些数据需要被加载或者旧数据需要被删除.
创建Segment的服务(实时节点转存或者IndexService)会将Segment上传到DeepStorage,历史节点会从DeepStorage下载Segments并加载它.
对于一个查询路由路径,Broker只会将请求分发到实时节点和历史节点, 因此元数据存储和DeepStorage都不会参与查询中(看做是后台的进程).

MetaStore和ZooKeeper中保存的信息是不一样的. ZooKeeper中保存的是Segment属于哪些节点. 而MetaStore则是保存Segment的元数据等.
为了使得一个Segment存在于集群中,MetaStore存储的记录是关于Segment的自描述元数据: Segment的元数据,大小,所在的DeepStorage.
元数据存储的数据会被协调节点用来知道集群中可用的数据应该有哪些(所在的Segment可以通过实时节点转存或者批量数据直接写入).

druid-dataflow-2

Fault Tolerance

1.历史节点挂掉,就不会服务这个节点上的Segments. 但是只要这些Segments仍然存在于DeepStorage,其他节点就会下载它们并服务这些Segments.
这意味着1)实际上可以从集群中移除所有的历史节点,并且重新发布它们,也不会有任何的数据损失(因为数据最终都保存到DeepStorage中).
2)如果DeepStorage不可用,历史节点上已经加载了DeepStorage的Segments,仍然可用于查询. 但是新进来的数据无法进入到集群中(DS挂掉).

2.协调节点挂掉,数据的拓扑(data topology)停止服务,就不会有新的数据以及数据的负载均衡. 因为协调节点会通知历史节点下载新数据.
如果实时节点将Segment转存到DeepStorage,而没有历史节点去下载这些数据,会导致实时节点最终会丢弃这份过期的数据.虽然因为历史节点没有
加载Segment,这份数据会在实时节点中保留一段时间,但是最终还是会丢弃的,因为还有新的实时数据要存,否则都保存的话实时节点内存不撑爆了.

3.Broker节点负责分发查询请求,可以有多个节点. 一个Broker挂掉没有关系,还会有其他的Broker接管请求,但是要至少保证有一个Broker.
当然如果不向Broker发送请求,而只关心最新的实时数据,可以直接访问实时节点. 不过这种情况是很少见的.

4.实时节点根据发送流的语义,可以有多个实时节点同时运行,处理同一个输入流. 每个实时节点分担输入流的一部分数据.

5.元数据存储挂掉,协调节点就无法找到集群中新的Segments(因为新的Segment一定会写入记录到元数据存储中).但仍然可以提供当前集群的数据视图.

6.ZooKeeper挂掉,数据拓扑不会被更新(同协调节点挂掉),但是Broker仍然可以维护最近的数据拓扑,并继续提供查询的服务.

Architecture

正是由于各个节点和其他节点都是最小化解耦的, 所以下面两张图分别表示实时节点和批量数据的流程:

druid_real

图一: 数据从Kafka导入到实时节点, 客户端直接查询实时节点的数据.

druid_index

图二: IndexService(实时流和批量处理都会索引尝试Segment文件),最终用于查询

druid_arch2

图三: 论文http://static.druid.io/docs/druid.pdf中的图例分别表示实时流/批量数据和查询流的交互图

  • ① 实时数据写入到实时节点,会创建索引结构的Segment
  • ② 实时节点的Segment经过一段时间会转存到DeepStorage
  • ③ 元数据写入MySQL; 实时节点转存的Segment会在ZooKeeper中新增一条记录
  • ④ 协调节点从MySQL获取元数据,比如schema信息(维度列和指标列)
  • ⑤ 协调节点监测ZK中有新分配/要删除的Segment,写入ZooKeeper信息:历史节点需要加载/删除Segment
  • ⑥ 历史节点监测ZK, 从ZooKeeper中得到要执行任务的Segment
  • ⑦ 历史节点从DeepStorage下载Segment并加载到内存/或者将已经保存的Segment删除掉
  • ⑧ 历史节点的Segment可以用于Broker的查询路由

Segments

数据进入到Druid首先会进行索引, 这给予了Druid一个机会可以进行分析数据,添加索引结构,压缩,为查询优化调整存储结构.

  • 转换为列式结构
  • 使用BitMap索引
  • 使用不同的压缩算法

索引的结果是生成Segment文件,Segment中除了保存不同的维度和指标,还保存了这些列的索引信息.

Druid将索引数据保存到Segment文件中,Segment文件根据时间进行分片. 最基本的设置中, 每一个时间间隔都会创建一个Segment文件.
这个时间间隔的长度配置在granularitySpec的segmentGranularity参数.为了Druid工作良好,通常Segment文件大小为300-700M.

前面Roll-up时也有一个时间粒度:queryGranularity指的是在读取时就进行聚合.segmentGranularity则是用于分片进来之后的数据.

Sharding Data to Create Segments

如果指定的列有很多行不同的列值,并且总的行数超过配置的阈值,为了存储这一个相同数据源相同时间段的所有数据,就需要创建多个Segments.
每个Segment都会有一个partition number来标识. 这种策略叫做sharding分片, 即将一份大的数据分成很多份小份的.

那么问题来了: 对于相同的列值, 会不会被分到多个分片里去呢?

什么情况会出现这种分片策略: high cardinality dimensions
因为维度的基数很高,表示值很少有重复的, 即大部分数据都是唯一的. 只有小部分数据是重复.

SQL的distinct count经常会遇到cardinality这个词. 对于去重就有hyperloglog这种基数统计方法来快速求值.

DataStructure

Segment文件的内部结构可以看做是列式存储. 每一列的数据都是以不同的数据结果存储. 通过列式存储,查询时只查询需要的列可以减少延迟.
有三种类型的数据结构: timestamp列,维度列,指标列. 时间撮和指标在底层都是int数组或long数组. 指标值是int或long,而时间撮为long.

druid_columns

为什么是数组,因为列式存储,要保存的是某一列的所有行. 所以数组的每一个元素表示的是每一行的这一列的值. 列式存储还有一个好处是压缩.

当查询确定要查询哪些行之后, 首先解压缩(因为存储的时候会压缩),然后直接取出相关的行(定位到数组的指定位置),并运用聚合算子计算指标.

维度列因为要支持过滤和分组,每一个维度列的数据结构包含了三部分:

  • 1.A dictionary that maps values (which are always treated as strings) to integer IDs, 值到ID的Map映射
  • 2.A list of the column’s values, encoded using the dictionary in 1, and 列的值列表, 存储的是上一步对应的ID
  • 3.For each distinct value in the column, a bitmap that indicates which rows contain that value. 倒排索引

为什么要以这种结果来存储? 在第一步中将值映射到ID,这样第二步和第三步中就可以以更紧凑地格式来表示.
第三步中的BitMap允许快速过滤并且BitMap对于AND和OR的支持不仅操作快速而且也很方便. 第二步中的列表值对于分组和TopN是必须的.
实际上如果仅仅是对指标基于过滤器之上进行聚合操作, 根本不会触碰到第二步中的数据.

为什么过滤操作不会去查询第二步的数据,或者说并不会真正去查询一条记录,比如查询每条记录,然后判断是否满足过滤条件,并不会发生这样的操作!
因为每个Segment内部都有索引结构(即BitMap),这就允许历史节点可以直接找出哪些行匹配了过滤条件,而不需要任何一行的数据.
基于多个过滤器条件还可以在BitMap上完成任何的布尔逻辑运算,这些操作都不需要查询原始数据记录(never look directory at a row of data)

以上面示例中的page列为例, 代表这一维度列的数据结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
1: Dictionary that encodes column values
{
"Justin Bieber": 0,
"Ke$ha": 1
}

2: Column data
[0,
0,
1,
1]

3: Bitmaps - one for each unique value of the column
value="Justin Bieber": [1,1,0,0]
value="Ke$ha": [0,0,1,1]

注意: 在最坏情况下前面两种会随着数据量的大小而线性增长. 而BitMap的大小则等于数据量大小 * 列的个数.

现在来解释下上面的数据结构是如何形成的:

1.字典表的key都是唯一的, 所以Map的key是unique的column value, Map的value从0开始不断增加.
因为示例数据的page列只有两个不同的值. 所以为比伯编号0, Ke$ha编号为1.

1
2
3
4
Key            |Value
---------------|-----
Justin Bieber |0
Ke$ha |1

2.列的数据, 要保存的是每一行中这一列的值, 值是ID而不是原始的值. 因为有了上面的Map字典, 所以有下面的对应关系:

1
2
3
4
5
rowNum  page                ID
1 Justin Bieber 0
2 Justin Bieber 0
3 Ke$ha 1
4 Ke$ha 1

这样列的值列表直接取最后一列: [0,0,1,1]

3.倒排BitMap. 一般在搜索引擎中的倒排是从doc->termsterm->doc的转换.

1
2
3
4
doc     terms                   term        doc
1 Hello,World hello 1,2
2 Hello,druid ==> World 1,3
3 Druid World druid 2,3

而BitMap跟搜索引擎中的倒排还是有点区别的. 它的value也是一个数组,数组的个数等于这一列的行数(和第二步一样).
BitMap的key是第一步Map的key(列的原始值). value数组的每个元素表示指定列的某一行是否包含/存在/等于当前key.
以key=Justin Bieber, 前面两行的列原始值都等于Justin Bieber, 所以值为1, 而后面两行的列值不等,所以为0.

注意: BitMap保存的value数组只有两个值: 1和0, 1表示这一行包含或等于BitMap的key, 0表示不存在/不包含/不等于.
value数组的值跟第一步字典表的ID没有任何关系, 否则Justin Bieber的编号为0, 为什么BitMap的前两个都是1呢!

因为Map字典的key是unique的并且一列只有一个值, 所以对于这一列的每一行数据, 在BitMap中都只有一个非零的条目.
for each row in ‘column data’, there will only be a single bitmap that has non-zero entry

什么意思呢? 第一行记录的page值是Justin Bieber,对应BitMap中的value数组第一个元素=1表示第一行的page列值等于JB.
对于其他value的数组的第一行记录都不能是1了. 否则如果Ke$ha的第一行值也等于1,表示第一行的page值是Ke$ha,就矛盾了.

1
2
3
4
5
6
7
8
                        第一行的page列值为Justin Bieber/列值为Justin Bieber的在第一行里
^
|
value="Justin Bieber": [1,1,0,0]
value="Ke$ha": [0,0,1,1]
^
|
第一行的page列值不是Ke$ha

其实就是给出所有的unique列值,对每一个唯一的列值,循环所有的行,看看是不是等于当前列值,是的话打1标签,不是打0标签.

1
2
3
4
5
6
7
                            BitMap raw value
---------------------
rowNum page Justin Bieber Ke$ha
1 Justin Bieber 1 0
2 Justin Bieber --> 1 0
3 Ke$ha 0 1
4 Ke$ha 0 1

这种存储方式, 如果unique重复的列很少,比如page列的每一个值都是不同的. BitMap就会是一个稀疏矩阵.

想象一下value数组只有一个元素为1,其他元素都是0. 比如下面的矩阵就是一个稀疏矩阵
A: [1,0,0,0,0,0,0,0,0,0,0]
B: [0,1,0,0,0,0,0,0,0,0,0]
C: [0,0,1,0,0,0,0,0,0,0,0]
D: [0,0,0,1,0,0,0,0,0,0,0]
E: [0,0,0,0,1,0,0,0,0,0,0]

unique的重复数量很少也叫做high cardinality,表示基数很高,不同列的数量很多,列值相同的记录数很少.
稀疏矩阵对于BitMap而言却是有优点的,因为越是稀疏,它可以被压缩的比例越大,最后存储的空间越少(相对原始数据).

当然如果一个列可以包含多个值. 数据结构就会稍微有点不同. 假设第二行的page列值为Justin Bieber和Ke$ha两个人:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
1: Dictionary that encodes column values
{
"Justin Bieber": 0,
"Ke$ha": 1
}

2: Column data
[0,
[0,1], <--Row value of multi-value column can have array of values
1,
1]

3: Bitmaps - one for each unique value
value="Justin Bieber": [1,1,0,0]
value="Ke$ha": [0,1,1,1]
^
|
Multi-value column has multiple non-zero entries

If a row has more than one value for a column, its entry in the ‘column data’ is an array of values.
Additionally, a row with n values in ‘column data’ will have n non-zero valued entries in bitmaps.

因为第二行的page列有两个值, 所以column data数组的第二个元素也是一个数组, 包含了这两个元素的在字典中的ID.
在BitMap中, 现在第二行数据(箭头指向的)因为page列有两个值, 所以就会有两个非0的元素,即有两个1.

上面只是针对page列的BitMap, 对于其他的维度列, 都有自己的BitMap! 即每一个维度列都有一个BitMap.

Deep into Druid

Prepare Work

Druid使用guice进行对象依赖注入.

Segment

Segment的数据结构代码在processing子目录的segment包下.

Real-time Node

实时节点提供了实时的索引.数据在这些实时节点上被索引后可以立即被查询使用.
实时节点会隔段时间构建Segments,代表在这段之间之内收集到的数据.
创建完Segments后会将他们转存到历史节点.它们使用ZK来监听传输任务,使用元数据存储保存要传输的Segment的元数据.
一旦传输完毕,原先在实时节点的segments就会被忘记/即被清除掉,腾出的空间用来接受新的实时流数据.

Historical Node

每个历史节点维护和ZooKeeper的持续连接,新到来的Segment信息会写到ZK的节点中,历史节点会监听这个节点的配置.
协调节点会负责分配新的Segment给历史节点: 在ZK中为分配到任务的历史节点的load queue path创建一个临时节点.
历史节点之间没有相互联系,也不会和协调节点直接联系,而是通过ZK来完成任务的分发和任务的领取.

druid_historical

Coordinator Node

1.协调节点主要负责Segment的管理和分布. 更具体地来说协调节点通知历史节点加载或删除Segment,都是基于配置完成的(ZooKeeper).
2.协调节点负责加载新的Segments,删除过期的Segments,管理Segment的副本分布,对Segment进行负载均衡(都是发送命令让历史节点去工作).
3.协调节点维护到ZooKeeper的连接以便获取当前的Druid集群信息(比如它都管理了哪些历史节点,当前集群还有什么任务需要运行,有哪些segments).
4.协调节点还连接MetaStore获取当前可用的Segments(保存在segment表,存储集群应该要加载的segments)和规则(segments文件如何处理).

协调节点类似于Master,Master要负责管理,并指派任务给Worker执行.

协调节点对Segment的负载均衡(如果某个节点数据量太大不应该分配任务,而把任务分配给数据量小的节点)是如何实现的.
当新创建了一个Segment,协调节点还没有分配给历史节点时,它要选择一个适合的历史节点,而不是随便选择一个历史节点.
协调节点会将系统中可用的历史节点列表首先按照容量排序,最小容量的服务器将获得最高的优先级(不会让穷人越穷富人越富)

注意到: MetaStore中保存了集群应该加载的Segments[4], 而协调节点本身也有集群的当前Segments信息[3].
在协调节点每次运行的时候会比较这两者数据, 当发现Segment不在元数据存储,但仍然在集群中被服务. 那么这个Segment就会添加到删除列表.
因为元数据存储中保存的才是集群应该加载的Segments,如果不在里面,说明这个Segment被更新过.(集群中服务的Segment会比元数据存储要延迟)

当历史节点重启或不可用时,协调节点会注意到节点丢失并认为这个节点服务的所有Segments都是dropped.
如果给予一段充足的时间,这些segments也许会被重新分配给集群中的其他历史节点. 然后被丢弃的每一个Segment并不会被立即遗忘掉.
实际上存在一种transitional的数据结构用来存储所有丢弃的segments,并且关联了一个使用期.
这个使用期表示一段时间内协调节点不会重新分配丢弃的segments. 因此如果一个历史节点下线后在很短的时间内又上线了,
历史节点会直接从它(历史节点还是transitional数据结构?)的缓存中服务这些(刚刚被丢弃)segments(这些segments都没有在跨越集群重新分配).

通常在ShareNothing的架构中,如果一个节点变得不可用了,会有一个服务将下线的这个节点的数据搬迁到其他节点.
但是如果这个节点下线后又立即重启,而如果服务在一下线的时候就开始搬迁数据,是会产生跨集群的数据传输,实际上是没有必要的.
因为分布式文件系统对同一份数据会有多个副本,搬迁数据实际上是为了满足副本数.而下线又重启的节点上的数据不会有什么丢失的.
因此短期的副本不足并不会影响整体的数据健康状况.何况跨机器搬迁数据也需要一定的时间,何不如给定一段时间如果它真的死了,才开始搬迁.

Broker

在Broker启动的时候, 实时节点会将自己,以及它自己服务了哪些Segments的信息都发布到ZK中.
Broker可以从发布到ZK的元数据了解到哪些Segments存在于哪个节点上,并且会路由查询请求,命中到正确的节点.

一般的Druid查询会指定包含的时间间隔,表示要请求的数据是在这个时间段之内的. 与此同时,Druid的Segments也是以指定的时间间隔粒度分区存储的.
并且这些Segments分布在集群的各个节点. 如果查询的时间跨度比存储的时间粒度要大,则查询会命中不止一个Segment,因此会命中多个节点.

为了决定请求分发到哪些节点上,Broker会首先构建一个ZooKeeper的视图信息.在ZK中维护了历史节点和实时节点以及他们服务的Segments.
对于ZK中的每个数据源,Broker会构建Segment的时间线,以及服务这些Segment的节点. 当查询指定数据库和时间段,Broker会寻找时间线中
符合数据源和时间段的Segment,并获取包含这次查询数据的节点, 最后Broker会将查询分发到这些节点上去.

代码路径: CliBroker-BrokerServerView

Broker节点对于缓存使用了LRU失效策略. Broker的缓存会保存每个Segment的结果. 缓存可以是本地的(针对Broker节点)或者分布式比如MemCached.
Broker接收到每一次查询,首先将本次查询映射到一系列的Segments. 这些Segments集合的子集可能已经存在于缓存中,它们的结果可以直接从缓存中获取.
对于没有存在于缓存中的Segments结果,Broker节点会将查询分发到历史节点.当历史节点返回结果,Broker就会结果也缓存到Cache中.
注意实时节点的Segments永远不会被缓存,所以实时数据的查询总是会被分发到实时节点.因为实时数据总是在变化,缓存实时数据是不可靠的.

代码路径: CliBroker-CacheModule-CacheProvider-LocalCacheProvider-MapCache(ByteCountingLRUMap)
具体如何使用Cache在CachingClusteredClient

More

LinkedIn Pinot初探: http://blog.talkingdata.net/?p=3169
Kafka文件存储机制那些事: http://tech.meituan.com/kafka-fs-design-theory.html


文章目录
  1. 1. Concepts
    1. 1.1. The Data
    2. 1.2. Roll up
    3. 1.3. Sharding the Data
  2. 2. Design Overview
    1. 2.1. The Druid Cluster
    2. 2.2. Fault Tolerance
    3. 2.3. Architecture
  3. 3. Segments
    1. 3.1. Sharding Data to Create Segments
    2. 3.2. DataStructure
  4. 4. Deep into Druid
    1. 4.1. Prepare Work
    2. 4.2. Segment
    3. 4.3. Real-time Node
    4. 4.4. Historical Node
    5. 4.5. Coordinator Node
    6. 4.6. Broker
  5. 5. More