Druid Source Code Analysis (1): Guice and the Realtime Flow

Druid is a fast column-oriented distributed data store. http://druid.io/

Starting a Druid service launches a Java process. For example, run_example_server.sh starts io.druid.cli.Main with the arguments example realtime.

Guice Inject

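The builder in Main registers several command groups. The server group, for instance, covers most Druid components: Coordinator, Historical, Broker, Realtime, Overlord, and so on.

injectMembers and toInstance: injecting already-instantiated objects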

final Injector injector = GuiceInjectors.makeStartupInjector();
final Cli<Runnable> cli = builder.build();
final Runnable command = cli.parse(args);
injector.injectMembers(command); // command is already an instantiated Runnable, so its members are injected directly
command.run();

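Guice is a DI framework. A client uses an object in three steps: create an Injector, obtain an instance from the Injector, and call the instance's methods. The command parsed from the arguments is a Runnable.
CliRealtime extends ServerRunnable (which in turn extends GuiceRunnable). The Initialization invoked from makeInjector adds many Modules.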
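
The three client steps can be imitated in plain Java. The sketch below is a hand-rolled stand-in, not Guice: InjectorSketch, TinyInjector, bind, and getInstance are hypothetical names chosen for illustration, and a java.util.function.Supplier plays the role of a binding.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

class InjectorSketch {
    // Hypothetical miniature injector; real Guice also resolves constructors reflectively.
    static class TinyInjector {
        private final Map<Class<?>, Supplier<?>> bindings = new HashMap<>();

        // Register how to obtain an instance of the given type.
        <T> TinyInjector bind(Class<T> type, Supplier<? extends T> supplier) {
            bindings.put(type, supplier);
            return this;
        }

        // Look up the binding and ask it for the instance.
        <T> T getInstance(Class<T> type) {
            Supplier<?> supplier = bindings.get(type);
            if (supplier == null) {
                throw new IllegalStateException("No binding for " + type.getName());
            }
            return type.cast(supplier.get());
        }
    }

    static String runCommand() {
        StringBuilder log = new StringBuilder();
        Runnable realtime = () -> log.append("realtime started");
        // Step 1: create the injector and register the bindings.
        TinyInjector injector = new TinyInjector().bind(Runnable.class, () -> realtime);
        // Step 2: obtain an instance from the injector.
        Runnable command = injector.getInstance(Runnable.class);
        // Step 3: call the instance's method.
        command.run();
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(runCommand());
    }
}
```

Binding an existing object through a constant Supplier is the closest analogue here to toInstance: the container hands out what it was given instead of constructing anything.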

[Figure: guice_client]

CliRealtime's getModules() mainly supplies RealtimeModule. Each node type registers the Modules that fall within its own responsibility.
RealtimeModule binds SegmentPublisher, ChatHandlerProvider, RealtimeManager, and others.

JsonConfigProvider.bind(binder, "druid.realtime", RealtimeManagerConfig.class);
binder.bind(new TypeLiteral<List<FireDepartment>>(){}).toProvider(FireDepartmentsProvider.class).in(LazySingleton.class); //①

JsonConfigProvider.bind(binder, "druid.realtime.cache", CacheConfig.class);
binder.install(new CacheModule());

binder.bind(QuerySegmentWalker.class).to(RealtimeManager.class).in(ManageLifecycle.class);
binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig("realtime"));

toInstance likewise binds an already-instantiated object, with no interface involved. NodeTypeConfig, for example, is not an interface but an ordinary class.
https://github.com/google/guice/wiki/Injections#on-demand-injection

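Provider: the return value of get() becomes the bound instance

The important class is RealtimeManager. It has three fields: List<FireDepartment>, QueryRunnerFactoryConglomerate, and chiefs. The last one, chiefs, is initialized directly in the constructor.
The first two are injected through the @Inject constructor. Because ① List<FireDepartment> is a generic type, it is bound with the TypeLiteral shown above and supplied by FireDepartmentsProvider.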

public class RealtimeManager implements QuerySegmentWalker {
    private final List<FireDepartment> fireDepartments; // ①
    private final QueryRunnerFactoryConglomerate conglomerate; // ②
    // key = data source name, value = FireChiefs of all partitions of that data source
    private final Map<String, List<FireChief>> chiefs;

    @Inject
    public RealtimeManager(List<FireDepartment> fireDepartments, QueryRunnerFactoryConglomerate conglomerate) {
        this.fireDepartments = fireDepartments;
        this.conglomerate = conglomerate;
        this.chiefs = Maps.newHashMap();
    }
}

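The list is injected through a Provider, FireDepartmentsProvider: whatever its get() method returns serves as the List<FireDepartment> instance.
The constructor of FireDepartmentsProvider in turn needs an ObjectMapper and a RealtimeManagerConfig injected. The RealtimeManagerConfig has already been bound before the Provider is bound.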

public class FireDepartmentsProvider implements Provider<List<FireDepartment>> {
    private final List<FireDepartment> fireDepartments = Lists.newArrayList();

    // The injected constructor, omitted in this excerpt, fills fireDepartments from the spec file.

    public List<FireDepartment> get() { return fireDepartments; }
}
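
The Provider pattern can be sketched without Guice. In the plain-Java example below every name (ProviderSketch, Config, DepartmentsProvider, Manager, wire) is hypothetical, a Supplier stands in for Provider, and strings stand in for FireDepartment objects. It only illustrates the shape: the provider does its work in the constructor, and the consumer sees nothing but the list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

class ProviderSketch {
    // Stand-in for RealtimeManagerConfig: carries the entries a spec file would hold.
    static class Config {
        final List<String> specEntries;
        Config(List<String> specEntries) { this.specEntries = specEntries; }
    }

    // Stand-in for FireDepartmentsProvider: builds the list once, get() hands it out.
    static class DepartmentsProvider implements Supplier<List<String>> {
        private final List<String> departments = new ArrayList<>();

        DepartmentsProvider(Config config) {
            for (String entry : config.specEntries) {
                departments.add("FireDepartment(" + entry + ")");
            }
        }

        @Override
        public List<String> get() { return Collections.unmodifiableList(departments); }
    }

    // Stand-in for RealtimeManager: depends on the List, not on the provider.
    static class Manager {
        final List<String> fireDepartments;
        Manager(List<String> fireDepartments) { this.fireDepartments = fireDepartments; }
    }

    // What the container does on the consumer's behalf: call get() and pass the result in.
    static Manager wire(Config config) {
        Supplier<List<String>> provider = new DepartmentsProvider(config);
        return new Manager(provider.get());
    }

    public static void main(String[] args) {
        System.out.println(wire(new Config(Collections.singletonList("wikipedia"))).fireDepartments);
    }
}
```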

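Annotations and @Provides: binding an implementation

ObjectMapper is a Jackson class; Druid's implementation is DefaultObjectMapper. ObjectMapper is also bound during initialization, in JacksonModule.
The to(...) call here targets an annotated key. Because the annotation is @Json, the binding resolves to the DefaultObjectMapper created by jsonMapper() (a @Provides method, which works much like a Provider).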

public class JacksonModule implements Module {
    public void configure(Binder binder) {
        binder.bind(ObjectMapper.class).to(Key.get(ObjectMapper.class, Json.class));
    }

    @Provides @LazySingleton @Json
    public ObjectMapper jsonMapper() {
        return new DefaultObjectMapper();
    }
}

JSON Property

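The druid.realtime prefix bound earlier through JsonConfigProvider maps to RealtimeManagerConfig, which has a single property: @JsonProperty private File specFile.
In its constructor, FireDepartmentsProvider uses the DefaultObjectMapper to read the JSON file named by druid.realtime.specFile when the process was started.

FireDepartment's three fields, dataSchema, ioConfig, and tuningConfig, correspond exactly to the JSON properties in the spec file. So the whole flow is:
specify the specFile, create the DefaultObjectMapper (JacksonModule), let the DefaultObjectMapper read the JSON file, construct FireDepartmentsProvider, and return the List<FireDepartment>.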

@JsonCreator
public FireDepartment(
    @JsonProperty("dataSchema") DataSchema dataSchema,
    @JsonProperty("ioConfig") RealtimeIOConfig ioConfig,
    @JsonProperty("tuningConfig") RealtimeTuningConfig tuningConfig
)

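Bindings from other Modules are available too

So far we have not seen where ② QueryRunnerFactoryConglomerate is bound. What now? Open the interface and look at its main implementation, DefaultQueryRunnerFactoryConglomerate.
Then Cmd+click it to view its usages: the only one is the configure method of StorageNodeModule, which is also a Module and is installed during Initialization.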

public class StorageNodeModule implements Module {
    public void configure(Binder binder) {
        JsonConfigProvider.bind(binder, "druid.server", DruidServerConfig.class);
        JsonConfigProvider.bind(binder, "druid.segmentCache", SegmentLoaderConfig.class);

        binder.bind(NodeTypeConfig.class).toProvider(Providers.<NodeTypeConfig>of(null));
        binder.bind(QueryableIndexFactory.class).to(MMappedQueryableIndexFactory.class).in(LazySingleton.class);
        binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class);

        binder.bind(QueryRunnerFactoryConglomerate.class).to(DefaultQueryRunnerFactoryConglomerate.class).in(LazySingleton.class);
    }
}

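The constructor of DefaultQueryRunnerFactoryConglomerate also needs an injected argument: Map<Class<? extends Query>, QueryRunnerFactory> factories.
Following usages again: open QueryRunnerFactory, pick one implementation, TimeBoundaryQueryRunnerFactory, and its usage leads to QueryRunnerFactoryModule.
In other words, whenever an interface has to be bound to an implementation, the binding is ultimately made in a Guice Module.

MapBinder: injecting a Map

The constructor of TimeBoundaryQueryRunnerFactory also depends on QueryWatcher, which QueryRunnerFactoryModule binds as well.
The mappings table lists the query classes Druid supports and the factory class for each. MapBinder is Guice's mechanism for injecting Map objects (it also relies on TypeLiteral).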

public class QueryRunnerFactoryModule extends QueryToolChestModule {
    private static final Map<Class<? extends Query>, Class<? extends QueryRunnerFactory>> mappings =
        ImmutableMap.<Class<? extends Query>, Class<? extends QueryRunnerFactory>>builder()
            .put(TimeseriesQuery.class, TimeseriesQueryRunnerFactory.class)
            .put(TimeBoundaryQuery.class, TimeBoundaryQueryRunnerFactory.class)
            .....build();

    public void configure(Binder binder) {
        super.configure(binder);
        binder.bind(QueryWatcher.class).to(QueryManager.class).in(LazySingleton.class);
        binder.bind(QueryManager.class).in(LazySingleton.class);

        final MapBinder<Class<? extends Query>, QueryRunnerFactory> queryFactoryBinder = DruidBinders.queryRunnerFactoryBinder(binder);
        for (Map.Entry<Class<? extends Query>, Class<? extends QueryRunnerFactory>> entry : mappings.entrySet()) {
            queryFactoryBinder.addBinding(entry.getKey()).to(entry.getValue()); // add the key/value entry to the injected Map (the factories argument)
            binder.bind(entry.getValue()).in(LazySingleton.class); // also bind the value class itself
        }
        binder.bind(GroupByQueryEngine.class).in(LazySingleton.class);
    }
}

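The concrete QueryRunnerFactory implementations hold the actual query logic; each query type comes with an XXXQuery, XXXQueryRunner, XXXQueryToolChest, XXXResultValue, and so on.
Taking run_example_client.sh as an example, its query type is timeBoundary, which maps to TimeBoundaryQuery.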
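
To see what the injected Map buys, here is a plain-Java sketch with no Guice involved. All types are hypothetical stand-ins declared inside the example, and the lookup by query.getClass() is an assumption about how a conglomerate would consume the map, not a quotation of Druid's code.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

class FactoryMapSketch {
    interface Query {}
    interface QueryRunnerFactory { String describe(); }

    static class TimeseriesQuery implements Query {}
    static class TimeBoundaryQuery implements Query {}

    static class TimeseriesFactory implements QueryRunnerFactory {
        public String describe() { return "timeseries factory"; }
    }
    static class TimeBoundaryFactory implements QueryRunnerFactory {
        public String describe() { return "timeBoundary factory"; }
    }

    // Plays the role of the mappings table plus the MapBinder entries.
    static Map<Class<? extends Query>, Supplier<QueryRunnerFactory>> buildFactories() {
        Map<Class<? extends Query>, Supplier<QueryRunnerFactory>> factories = new LinkedHashMap<>();
        factories.put(TimeseriesQuery.class, TimeseriesFactory::new);
        factories.put(TimeBoundaryQuery.class, TimeBoundaryFactory::new);
        return factories;
    }

    // Assumed consumer: pick the factory by the concrete class of the query.
    static QueryRunnerFactory findFactory(
            Map<Class<? extends Query>, Supplier<QueryRunnerFactory>> factories, Query query) {
        Supplier<QueryRunnerFactory> supplier = factories.get(query.getClass());
        if (supplier == null) {
            throw new IllegalArgumentException("Unsupported query: " + query.getClass().getName());
        }
        return supplier.get();
    }

    public static void main(String[] args) {
        System.out.println(findFactory(buildFactories(), new TimeBoundaryQuery()).describe());
    }
}
```

Adding support for a new query type then means adding one entry to the map; the consumer code does not change.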

RealtimeManager

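Once the List<FireDepartment> and the QueryRunnerFactoryConglomerate required by the RealtimeManager constructor have been injected, the start method can get to work.
Each FireDepartment in fireDepartments is wrapped in a FireChief, registered under the data source name from the FireDepartment's DataSchema.
A FireChief holds a FireDepartment (the data source), a Firehose (how to read: an iterator, the source), and a Plumber (the sink).
The FireChief thread calls initPlumber to initialize the Plumber and lets the Plumber start the job, calls initFirehose to connect the Firehose to the data source, and finally calls runFirehose to read the data.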

public void run() {
    plumber = initPlumber();    // fireDepartment.findPlumber(): locate the hydrant first
    plumber.startJob();         // preparation: attach the pipe
    firehose = initFirehose();  // fireDepartment.connect(): request a fire hose (from the fire department) and attach it to the hydrant
    runFirehose(firehose);      // start firefighting: water keeps flowing from the data source
    plumber.finishJob();        // done: detach the hose and shut off the hydrant
}
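
The call order matters more than the metaphors. The following plain-Java sketch records the order in which a plumber is driven; FireChiefSketch, Plumber, and RecordingPlumber are hypothetical stand-ins, and a java.util.Iterator takes the place of the Firehose.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class FireChiefSketch {
    interface Plumber {
        void startJob();
        void add(String row);
        void finishJob();
    }

    // A Plumber that records every call so the order can be inspected afterwards.
    static class RecordingPlumber implements Plumber {
        final List<String> calls = new ArrayList<>();
        public void startJob() { calls.add("startJob"); }
        public void add(String row) { calls.add("add:" + row); }
        public void finishJob() { calls.add("finishJob"); }
    }

    // Same order as the run() method above: start the job, drain the source, finish the job.
    static void run(Plumber plumber, Iterator<String> firehose) {
        plumber.startJob();              // the sink is ready before any data arrives
        while (firehose.hasNext()) {     // runFirehose: read until the source is exhausted
            plumber.add(firehose.next());
        }
        plumber.finishJob();             // close out only after the last row
    }

    public static void main(String[] args) {
        RecordingPlumber plumber = new RecordingPlumber();
        run(plumber, Arrays.asList("row1", "row2").iterator());
        System.out.println(plumber.calls);
    }
}
```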

[Figure: fire_flow]

Firehose

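The Firehose is connected to the water source. As data keeps flowing into the source (Kafka, for example), the hose keeps spraying water, and what comes out is InputRow objects. A Firehose behaves like an iterator.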

private void runFirehose(Firehose firehose) {
    final Supplier<Committer> committerSupplier = Committers.supplierFromFirehose(firehose);
    while (firehose.hasMore()) {
        final InputRow inputRow = firehose.nextRow();
        lateEvent = plumber.add(inputRow, committerSupplier) == -1;
        if (indexLimitExceeded || lateEvent) {
            plumber.persist(committerSupplier.get());
        }
    }
}

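The Firehose is created from the firehose property ① of ioConfig in the spec file: that property yields a FirehoseFactory, which is then given the parser ② from dataSchema (the firehoseParser) to create the Firehose.
Why does the Firehose need dataSchema? Because the rows it emits depend on the format of the input data, and the parser describes how to parse the source data. parseSpec specifies the input format, the timestamp, and the dimension fields.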

public Firehose connect() throws IOException {
    return ioConfig.getFirehoseFactory().connect(dataSchema.getParser());
}

Taking Kafka as the data source, the type of firehose ① resolves to KafkaEightFirehoseFactory. It has three properties: consumerProps, feed, and the type from FirehoseFactory.

"ioConfig" : {
  "type" : "realtime",
  "firehose": { // ①
    "type": "kafka-0.8", // maps to KafkaEightFirehoseFactory
    "consumerProps": {
      "zookeeper.connect": "localhost:2181",
      ...
    },
    "feed": "wikipedia"
  },
  "plumber": {
    "type": "realtime"
  }
},

DataSchema

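DataSchema has four JSON properties; its ObjectMapper constructor argument is supplied by dependency injection. dataSchema.getParser() returns the InputRowParser.
Because parser itself contains several nested properties, its JSON is first read into a Map when the spec file is loaded.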

@JsonCreator
public DataSchema(
    @JsonProperty("dataSource") String dataSource,
    @JsonProperty("parser") Map<String, Object> parser,
    @JsonProperty("metricsSpec") AggregatorFactory[] aggregators,
    @JsonProperty("granularitySpec") GranularitySpec granularitySpec,
    @JacksonInject ObjectMapper jsonMapper
)

Below is the dataSchema section of the wikipedia spec file.

"dataSchema" : {
  "dataSource" : "wikipedia",
  "parser" : { // ②
    "type" : "string", // maps to StringInputRowParser
    "parseSpec" : {
      "format" : "json",
      "timestampSpec" : {
        "column" : "timestamp",
        "format" : "auto"
      },
      "dimensionsSpec" : {
        "dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
        "dimensionExclusions" : [],
        "spatialDimensions" : []
      }
    }
  },
  "metricsSpec" : [{
    "type" : "count",
    "name" : "count"
  }],
  "granularitySpec" : {
    "type" : "uniform",
    "segmentGranularity" : "DAY",
    "queryGranularity" : "NONE"
  }
}

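How is an InputRowParser obtained from the Map that came out of the JSON? Because the format is fixed, once the parser has been converted, timestampSpec and dimensionsSpec are read from it. A spec is an instruction sheet: take the medicine as the leaflet says and you cannot go wrong.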

public InputRowParser getParser() {
    final InputRowParser inputRowParser = jsonMapper.convertValue(this.parser, InputRowParser.class);
    final DimensionsSpec dimensionsSpec = inputRowParser.getParseSpec().getDimensionsSpec();
    final TimestampSpec timestampSpec = inputRowParser.getParseSpec().getTimestampSpec();
    // This excerpt does not show how dimensionExclusions and dimSet are built.
    return inputRowParser.withParseSpec(        // replace the parseSpec
        inputRowParser.getParseSpec()           // start from the current parseSpec
            .withDimensionsSpec(                // replace its dimensionsSpec
                dimensionsSpec                  // start from the current dimensionsSpec
                    .withDimensionExclusions(   // set the filtered dimensionExclusions
                        Sets.difference(dimensionExclusions, dimSet)
                    )
            )
    );
}

KafkaEightFirehoseFactory

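The connect method of KafkaEightFirehoseFactory returns an anonymous Firehose whose nextRow method parses the Kafka input with the parser.
The configured consumerProps and feed determine which ZooKeeper to connect to and which topic to read. A consumer is then created, and the data is exposed through a ConsumerIterator.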

final ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps));
final Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(ImmutableMap.of(feed,1));
final List<KafkaStream<byte[], byte[]>> streamList = streams.get(feed);
final KafkaStream<byte[], byte[]> stream = streamList.get(0);
final ConsumerIterator<byte[], byte[]> iter = stream.iterator();

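stream is the Kafka message stream. Each message iterated from it is parsed by the InputRowParser, and the result is an InputRow.
Combined with runFirehose in RealtimeManager, which calls the Firehose's nextRow to read data, the whole flow is complete: DataSchema definition, InputRowParser parsing, InputRow.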

return new Firehose() {
    public boolean hasMore() {
        return iter.hasNext();
    }
    public InputRow nextRow() {
        final byte[] message = iter.next().message();
        return theParser.parse(ByteBuffer.wrap(message));
    }
};
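
The same adapter shape works over any iterator of byte arrays, which makes it easy to try without Kafka. This is a minimal plain-Java sketch: the generic Firehose interface, connect, and the comma-splitting parseCsv parser are hypothetical and only mimic the hasMore/nextRow contract shown above.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Iterator;
import java.util.function.Function;

class IteratorFirehoseSketch {
    interface Firehose<R> {
        boolean hasMore();
        R nextRow();
    }

    // Wraps a message iterator and applies the parser to each message as it is read.
    static <R> Firehose<R> connect(Iterator<byte[]> messages, Function<ByteBuffer, R> parser) {
        return new Firehose<R>() {
            @Override
            public boolean hasMore() { return messages.hasNext(); }

            @Override
            public R nextRow() { return parser.apply(ByteBuffer.wrap(messages.next())); }
        };
    }

    // Hypothetical parser: decodes UTF-8 text and splits "timestamp,page" on the comma.
    static String[] parseCsv(ByteBuffer buffer) {
        return StandardCharsets.UTF_8.decode(buffer).toString().split(",");
    }

    public static void main(String[] args) {
        byte[] message = "2015-10-11T10:00:00Z,Druid".getBytes(StandardCharsets.UTF_8);
        Firehose<String[]> firehose =
                connect(Collections.singletonList(message).iterator(), IteratorFirehoseSketch::parseCsv);
        while (firehose.hasMore()) {
            System.out.println(String.join(" | ", firehose.nextRow()));
        }
    }
}
```

Keeping the parser as a parameter mirrors the way connect(...) receives dataSchema.getParser(): the transport and the row format stay independent.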

[Figure: druid_row]

RealtimePlumber

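Every InputRow read from the Firehose is added to the Plumber. Each row carries a timestamp; truncatedTime is that timestamp truncated by segmentGranularity.
Every record eventually lives in a Segment, and a Segment stores the data of the time span given by its Interval. For example, a Segment with a one-hour interval: 20151011-100000~20151011-110000.
sinks maps each truncated timestamp to a Sink, and a Sink holds all events in that time span. Once the Sink has been found, the row is added to it. Under the hood a Sink uses an IncrementalIndex.
When the Sink cannot accept another row (for example, the Segment size has reached its threshold) or the flush interval has elapsed (IntermediatePersistPeriod, 10 minutes by default), the data in the Sink is persisted.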

public int add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException {
    final Sink sink = getSink(row.getTimestampFromEpoch());
    final int numRows = sink.add(row);
    if (!sink.canAppendRow() || System.currentTimeMillis() > nextFlush) {
        persist(committerSupplier.get());
    }
    return numRows;
}

private Sink getSink(long timestamp) {
    final Granularity segmentGranularity = schema.getGranularitySpec().getSegmentGranularity();
    final VersioningPolicy versioningPolicy = config.getVersioningPolicy();
    final long truncatedTime = segmentGranularity.truncate(new DateTime(timestamp)).getMillis();
    Sink retVal = sinks.get(truncatedTime);
    if (retVal == null) {
        final Interval sinkInterval = new Interval(new DateTime(truncatedTime), segmentGranularity.increment(new DateTime(truncatedTime)));
        retVal = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval));
        segmentAnnouncer.announceSegment(retVal.getSegment());
        sinks.put(truncatedTime, retVal);
        sinkTimeline.add(retVal.getInterval(), retVal.getVersion(), new SingleElementPartitionChunk<Sink>(retVal));
    }
    return retVal;
}

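When a Sink is first created, segmentAnnouncer announces the new Segment, which in practice means creating the corresponding ephemeral node in ZooKeeper.
The mapping from truncated timestamp to Sink is then put into sinks; later events with the same truncated timestamp (for example, all within the same hour) reuse the Sink that already exists.
sinkTimeline is the timeline of Sinks; besides the Interval it carries version information and a partition number. For instance, if a Segment receives too much data within one hour, it is split into several partitions.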
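
The find-or-create logic keyed by a truncated timestamp can be tried in isolation. The sketch below is plain Java under two stated assumptions: the granularity is fixed at one UTC hour, and a list of strings stands in for a Sink. SinkBucketSketch and its members are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class SinkBucketSketch {
    static final long HOUR_MILLIS = 60L * 60L * 1000L;

    // Keyed by the truncated timestamp, like the sinks map in the plumber.
    private final Map<Long, List<String>> sinks = new TreeMap<>();

    // Truncate an epoch-millis timestamp to the start of its UTC hour.
    static long truncateToHour(long timestampMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, HOUR_MILLIS);
    }

    // Find or create the bucket for this event, append the event,
    // and return the number of events now in that bucket.
    int add(long timestampMillis, String event) {
        List<String> sink = sinks.computeIfAbsent(truncateToHour(timestampMillis), k -> new ArrayList<>());
        sink.add(event);
        return sink.size();
    }

    int sinkCount() { return sinks.size(); }

    public static void main(String[] args) {
        SinkBucketSketch plumber = new SinkBucketSketch();
        long tenOClock = 10 * HOUR_MILLIS;
        plumber.add(tenOClock + 1, "first event of the hour");
        plumber.add(tenOClock + HOUR_MILLIS, "first event of the next hour");
        System.out.println(plumber.sinkCount()); // two distinct hourly buckets
    }
}
```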

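A Sink uses a FireHydrant and an IncrementalIndex. Druid does not store the raw data but the result after roll-up.
When getSink creates a Sink for the first time, it also creates the FireHydrant, the OnheapIncrementalIndex (an incremental index on the heap), and the DataSegment.
Data written to a realtime node must be queryable as soon as it has been indexed, so the rolled-up data is kept in the realtime node's memory.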

[Figure: druid_index]

IncrementalIndex

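Adding an InputRow fetches the OnheapIncrementalIndex from the FireHydrant and adds one record to the incremental index.

  • dimensions: the names of all columns in a row, taken from dimensionsSpec/dimensions in dataSchema
  • dimensionValues: the values of one dimension; may be an array with multiple values
  • dimensionOrder: <dimension, order>, the position of each dimension name
  • dimValues: DimensionHolder<dimension, DimDim>; its canonical lookup quickly tells whether a value already exists
  • dims[][]: the index in dims[index] is the order from dimensionOrder; the value is getDimVals(DimDim, dimensionValues)
  • TimeAndDims: the timestamp together with dims
  • metrics AggregatorFactory[]: factories for the aggregators; calling factorize on a factory builds an aggregator
  • aggs AggregatorType[]: the Aggregator for each metric column

With the data above prepared, IncrementalIndex calls addToFacts to add facts; the OnheapIncrementalIndex implementation then uses the constructed aggregators to start aggregating.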

public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator> {
    private final ConcurrentHashMap<Integer, Aggregator[]> aggregators = new ConcurrentHashMap<>();
    private final ConcurrentNavigableMap<TimeAndDims, Integer> facts = new ConcurrentSkipListMap<>();

    protected Integer addToFacts(AggregatorFactory[] metrics, boolean deserializeComplexMetrics, InputRow row,
            AtomicInteger numEntries, TimeAndDims key, ThreadLocal<InputRow> rowContainer, Supplier<InputRow> rowSupplier) {
        Aggregator[] aggs = new Aggregator[metrics.length];
        for (int i = 0; i < metrics.length; i++) {
            final AggregatorFactory agg = metrics[i];
            aggs[i] = agg.factorize(makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics));
        }

        rowContainer.set(row); // thread-local: point it at the current row before aggregating
        for (Aggregator agg : aggs) {
            synchronized (agg) {
                agg.aggregate(); // calling aggregate() on the aggregator performs the aggregation
            }
        }
        rowContainer.set(null); // clear it once this row has been aggregated
        return numEntries.get();
    }
}

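An InputRow added to the IncrementalIndex ends up in the index's facts. The TimeAndDims key of facts contains the timestamp and the dimension values.
The roll-up aggregators perform the aggregation, and the aggregated results can be read through the getXXXValue methods of IncrementalIndex.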
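
Roll-up itself is easy to demonstrate on a small scale. The sketch below is plain Java, not Druid code: it assumes a one-minute query granularity, uses a string key in place of TimeAndDims, and keeps only a count aggregator. RollupSketch and its methods are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.LongAdder;

class RollupSketch {
    // Assumed query granularity for this example: one minute.
    static final long MINUTE_MILLIS = 60_000L;

    // One entry per distinct (truncated time, dimension values) key.
    // The key is a plain string here, so entries sort lexicographically, not by time.
    private final ConcurrentNavigableMap<String, LongAdder> facts = new ConcurrentSkipListMap<>();

    void add(long timestampMillis, List<String> dimensionValues) {
        long truncated = timestampMillis - Math.floorMod(timestampMillis, MINUTE_MILLIS);
        String key = truncated + "|" + String.join("|", dimensionValues);
        // Rows with the same key are merged into one fact; the "count" aggregator is incremented.
        facts.computeIfAbsent(key, k -> new LongAdder()).increment();
    }

    int rowCount() { return facts.size(); }

    long count(String key) {
        LongAdder adder = facts.get(key);
        return adder == null ? 0L : adder.sum();
    }

    public static void main(String[] args) {
        RollupSketch index = new RollupSketch();
        index.add(60_001L, Arrays.asList("Druid", "en"));
        index.add(60_500L, Arrays.asList("Druid", "en")); // same minute, same dimensions: merged
        index.add(60_002L, Arrays.asList("Kafka", "en"));
        System.out.println(index.rowCount() + " stored rows for 3 input rows");
    }
}
```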

persistHydrant

At the very start of persist, a swap takes place: a new FireHydrant is created and the old one is returned.

public void persist(final Committer committer) {
    final List<Pair<FireHydrant, Interval>> indexesToPersist = Lists.newArrayList();
    for (Sink sink : sinks.values()) { // swap every Sink: the old hydrant is persisted, the new one takes the latest realtime data
        if (sink.swappable()) {
            indexesToPersist.add(Pair.of(sink.swap(), sink.getInterval()));
        }
    }

    for (Pair<FireHydrant, Interval> pair : indexesToPersist) {
        persistHydrant(pair.lhs, schema, pair.rhs, metadata);
    }
    committer.run();
    resetNextFlush();
}

protected int persistHydrant(FireHydrant indexToPersist, DataSchema schema, Interval interval, Map<String, Object> metaData) {
    int numRows = indexToPersist.getIndex().size(); // number of rows in the incremental index
    final IndexSpec indexSpec = config.getIndexSpec(); // index settings from tuningConfig, e.g. bitmap type and column compression
    final File persistedFile = indexMerger.persist( // persist (merge) the index
        indexToPersist.getIndex(),
        new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount())),
        metaData,
        indexSpec
    );
    indexToPersist.swapSegment( // replace the incremental index with a queryable index segment
        new QueryableIndexSegment(indexToPersist.getSegment().getIdentifier(), indexIO.loadIndex(persistedFile))
    );
    return numRows;
}
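
The swap step can be sketched on its own. In the plain-Java example below, SwapSketch and Hydrant are hypothetical stand-ins, and the swappable rule (the current buffer is non-empty) is an assumption made for illustration rather than Druid's actual condition.

```java
import java.util.ArrayList;
import java.util.List;

class SwapSketch {
    // Stand-in for a FireHydrant: an in-memory buffer plus a sequence number.
    static class Hydrant {
        final int count;
        final List<String> rows = new ArrayList<>();
        Hydrant(int count) { this.count = count; }
    }

    private Hydrant current = new Hydrant(0);

    synchronized void add(String row) { current.rows.add(row); }

    // Assumed rule: swapping is only worthwhile when there is something to persist.
    synchronized boolean swappable() { return !current.rows.isEmpty(); }

    // Install a fresh hydrant for new writes and hand back the old one for persisting.
    synchronized Hydrant swap() {
        Hydrant old = current;
        current = new Hydrant(old.count + 1);
        return old;
    }

    synchronized int currentCount() { return current.count; }

    public static void main(String[] args) {
        SwapSketch sink = new SwapSketch();
        sink.add("row1");
        sink.add("row2");
        if (sink.swappable()) {
            Hydrant toPersist = sink.swap();
            System.out.println("persisting hydrant " + toPersist.count + " with " + toPersist.rows.size() + " rows");
        }
    }
}
```

Because writers only ever touch the current hydrant, the returned one can be written to disk without blocking new rows, which is the point of swapping before persisting.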

To persist the incremental index, IndexMerger.persist goes on to call merge and creates an IncrementalIndexAdapter, because indexToPersist is an OnheapIncrementalIndex.

return merge(
    Arrays.<IndexableAdapter>asList(new IncrementalIndexAdapter(dataInterval, index, indexSpec.getBitmapSerdeFactory().getBitmapFactory())),
    index.getMetricAggs(), outDir, segmentMetadata, indexSpec, progress
);

IndexMerger merges the dimensions and metrics into mergedDimensions and mergedMetrics, together with a per-row merge function, rowMergerFn. Finally, makeIndexFiles creates the index files.

