KairosDB: A Time Series Database

A time series is a sequence of data points, typically consisting of successive measurements made over a time interval.

Storage

Data can be ingested either over telnet on port 4242 or via the REST API on port 8080; ingesting data means writing the metric data points into the store.
Telnet format: put <metric name> <time stamp> <value> <tag> <tag>... \n

For example, to monitor a host's load:

now=$(($(date +%s%N)/1000000))
metric=load_value_test
value=42
echo "put $metric $now $value host=A" | nc -w 30 localhost 4242

This says: on the machine whose host tag is A, the metric load_value_test has the value 42 at a given point in time (usually the current time at ingestion).
The same metric name is collected from many machines, and a single machine reports more than one metric: not only load but also CPU and so on.

put load $now 42 host=192.168.0.10
put load $now 55 host=192.168.0.11
put load $now 60 host=192.168.0.12

put cpu $now 10 host=192.168.0.10
put cpu $now 29 host=192.168.0.11
put cpu $now 99 host=192.168.0.12

The following compares the KairosDB and InfluxDB line formats (for a metric with a single value):

KairosDB: cpu 1434067467000 0.64 host=serverA,region=us_west
InfluxDB: cpu,host=serverA,region=us_west value=0.64 1434067467000

The corresponding schema:

| metric_name | timestamp     | tags                        | fields                                  |
|-------------|---------------|-----------------------------|-----------------------------------------|
| cpu         | 1434067467000 | host=serverA,region=us_west | 0.64 (KairosDB) / value=0.64 (InfluxDB) |

KairosDB can also store data via REST (HTTP POST):

[{
    "name": "load",
    "timestamp": 1349109376,
    "type": "long",
    "value": 42,
    "tags": {"host": "192.168.0.10"}
},
{
    "name": "cpu",
    "timestamp": 1349109376,
    "type": "long",
    "value": 10,
    "tags": {"host": "192.168.0.10"}
}]

DataPoints

Data points have a metric name, a value, a timestamp, and a list of one or more tags.
Tags are named properties that identify the data, such as its type and where it comes from.
You can either use “timestamp” with “value” for a single data point or you can use “datapoints” to post multiple data points.

datapoints: An array of data points. Each data point consists of a timestamp and value.

The tags are used when querying metrics to narrow down the search. For example,
if multiple metrics are measured on server1, you could add the "host":"server1" tag to each of the metrics,
and queries could then return all metrics tagged with host = server1.

A data point set is usually a list of timestamp/value pairs, each pair giving the metric's value at one moment; several tags can be attached as well.
The following records the load of host server1 in data center DC1 at three points in time.

{
    "name": "load",
    "datapoints": [[1359788400000, 123], [1359788300000, 13.2], [1359788410000, 23.1]],
    "tags": {
        "host": "server1",
        "data_center": "DC1"
    }
}

If a metric's value is made up of several components, the value inside datapoints can itself be structured.
Below, the metric is named impedance and its value is a complex number (real and imaginary parts), so the value at each timestamp is itself a JSON object.

{
    "name": "impedance",
    "type": "complex-number",
    "datapoints": [
        [1359788400000, {"real": 2.3, "imaginary": 3.4}],
        [1359788300000, {"real": 1.1, "imaginary": 5.2}]
    ],
    "tags": {
        "host": "server1",
        "data_center": "DC1"
    }
}

Expressed in InfluxDB's schema:

impedance,host=server1,data_center=DC1 real=2.3,imaginary=3.4 1359788400000
impedance,host=server1,data_center=DC1 real=1.1,imaginary=5.2 1359788300000

Query

Assume every host monitored above has stored the load metric. The query below asks for the metric named load over the past 20 minutes and therefore returns the load of every host.

{
    "start_relative": {"value": 20, "unit": "minutes"},
    "metrics": [
        {
            "name": "load"
        }
    ]
}

The time range can be relative or absolute. A start time is required; if no end time is given, it defaults to the current time.

A query returns a list of metric values based on a set of criteria.
It also returns the set of all tag names and values found across the returned data points.

  • Grouping [group_by]: query results can be grouped by tag, by time range, or by value.
  • Aggregation [aggregators]: aggregators perform an operation on the data points and downsample them.
  • Filtering [tags]: results are filtered by specifying tags. For example, a query on the load metric returns the load of every node; if only server1 is of interest, setting the tag host=server1 filters the rest out.

Aggregators can be combined. They run in order: the output of one aggregator becomes the input of the next.
For example, you could sum all data points in 5-minute periods and then average those sums over a week.
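
As an illustration, here is a minimal sketch of such a pipeline using the kairosdb-client library shown at the end of this article. It assumes that addAggregator calls can be chained to build the aggregator array, that AggregatorFactory offers createSumAggregator alongside the createAverageAggregator call shown later, and that QueryBuilder.build() returns the generated query JSON.

import org.kairosdb.client.builder.AggregatorFactory;
import org.kairosdb.client.builder.QueryBuilder;
import org.kairosdb.client.builder.TimeUnit;

public class ChainedAggregatorsExample {
    public static void main(String[] args) {
        // sum into 5-minute buckets first, then average those sums over 7-day buckets
        QueryBuilder builder = QueryBuilder.getInstance();
        builder.setStart(1, TimeUnit.WEEKS)
               .addMetric("load")
               .addAggregator(AggregatorFactory.createSumAggregator(5, TimeUnit.MINUTES))
               .addAggregator(AggregatorFactory.createAverageAggregator(7, TimeUnit.DAYS));
        System.out.println(builder.build()); // the "aggregators" array lists sum first, then avg
    }
}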

{
    "start_absolute": 1357023600000,
    "end_relative": {
        "value": "5",
        "unit": "days"
    },
    "metrics": [
        {
            "tags": {
                "host": ["foo", "foo2"],
                "customer": ["bar"]
            },
            "name": "abc.123",
            "limit": 10000,
            "aggregators": [
                {
                    "name": "sum",
                    "sampling": {
                        "value": 10,
                        "unit": "minutes"
                    }
                }
            ]
        }
    ]
}

start_relative = 5 minutes: the query returns all matching data points from the last 5 minutes, i.e. the window [now - 5 min, now], because the start time is taken relative to the current time.

start_relative = 30 minutes, end_relative = 10 minutes: the query returns the matching data points that occurred between 30 minutes ago and 10 minutes ago (inclusive).

      30min ago              10min ago          now
---------|----------------------|----------------|---->
          |<-------------------->|
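
The same window expressed with the kairosdb-client introduced at the end of this article (the package names and the local server URL are assumptions of this sketch):

import org.kairosdb.client.HttpClient;
import org.kairosdb.client.builder.QueryBuilder;
import org.kairosdb.client.builder.TimeUnit;
import org.kairosdb.client.response.QueryResponse;

public class RelativeWindowExample {
    public static void main(String[] args) throws Exception {
        QueryBuilder builder = QueryBuilder.getInstance();
        builder.setStart(30, TimeUnit.MINUTES)  // start_relative: 30 minutes ago
               .setEnd(10, TimeUnit.MINUTES)    // end_relative: 10 minutes ago
               .addMetric("load");

        HttpClient client = new HttpClient("http://localhost:8080");
        QueryResponse response = client.query(builder);  // data points between 30 and 10 minutes ago
        client.shutdown();
    }
}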

Most aggregators support downsampling. Downsampling allows you to reduce the sampling rate of the data points
and aggregate these values over a longer period of time.
For example, you could average all daily values over the last week.
Rather than getting 7 values you would get one value which is the average for the week.

Aggregation

"aggregators": [
{
"name": "sum",
"align_sampling": true,
"align_start_time": true,
"sampling": {
"value": 1,
"unit": "minutes"
}
}]

Sampling is the length of the interval on which to aggregate data.

Since data points are produced continuously, sampling buckets them into ranges. Suppose the sampling interval is one minute: every data point has to be assigned to the one-minute range it falls in.
Take monitoring as an example: a dashboard shows per-minute CPU, load and IO, but a data point is produced at every instant (say every second), so aggregation has to fold all points belonging to the same minute together, as sketched below.
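
A minimal sketch of that bucketing (not KairosDB source; it mirrors the getStartRange/getEndRange logic shown further down, assuming a plain millisecond-based sampling interval):

public class SamplingRangeSketch {
    public static void main(String[] args) {
        long samplingMillis = 60_000L;        // sampling: 1 minute
        long queryStart = 1359788400000L;     // start of the queried time range
        long timestamp = 1359788452345L;      // a raw data point somewhere inside the range

        long rangeIndex = (timestamp - queryStart) / samplingMillis;
        long rangeStart = queryStart + rangeIndex * samplingMillis;  // 1359788400000
        long rangeEnd = rangeStart + samplingMillis;                 // exclusive upper bound

        // every point with rangeStart <= ts < rangeEnd is folded into one aggregated value
        System.out.println(rangeStart + " <= " + timestamp + " < " + rangeEnd);
    }
}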

align_start_time

When set to true the time for the aggregated data point for each range will
fall on the start of the range instead of being the value for the first
data point within that range.

align_sampling

Setting this to true will cause the aggregation range to be aligned based on
the sampling size. For example if your sample size is either milliseconds,
seconds, minutes or hours then the start of the range will always be at the top
of the hour. The effect of setting this to true is that your data will
take the same shape when graphed as you refresh the data.

Cassandra Schema

Three tables (column families) are used to store the time series data:

  • data_points - where the data is kept.
  • row_key_index - index to lookup what rows to get during a query.
  • string_index - used to answer the query of what tags and metrics are in the system.

data_points holds the raw data; row_key_index is an index over row keys, effectively pointing at the rows of data_points; string_index only answers which metrics and tags exist and plays no further role in the paths discussed here.

The row key of data_points is made up of:

  • Metric name (UTF-8)
  • Row time stamp. The time stamp for the row to begin at.
  • Datastore type.
  • Concatenated string of tags (tag1=val1:tag2=val2…)

The column name is the time offset relative to the timestamp in the row key, and the column value is the metric value.

| row-key                       | time_0 | time_1000 | ... |
|-------------------------------|--------|-----------|-----|
| load:12345678000:host=server1 | 20     | 45        | ... |
| load:12345678000:host=server2 | 10     | 43        | ... |
| cpu:12345678000:host=server1  | 33     | 57        | ... |

The timestamp in the row key is the start time of the row, so the column at offset 0 holds the value at 12345678000 (load=20), and the column at offset 1000 holds the value at 12345678000 + 1000 = 12345679000, where the load of server1 is 45.
With this layout, a single row covers a fixed three-week window for a given metric and tag combination even at one data point per millisecond (three weeks is roughly 1.8 billion milliseconds, close to Cassandra's limit of about 2 billion columns per row; at one point per second the same column budget could in principle span about 3000 weeks, but KairosDB fixes the row width at three weeks).
Note that the column name is the millisecond offset from the row timestamp: within that window, the same metric and tag set is spread across the columns of one row rather than across many rows.
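
A sketch of how a data point is placed into this layout (not the actual CassandraDatastore code; the exact three-week ROW_WIDTH constant is an assumption of this illustration, and the row timestamps in the table above are illustrative rather than aligned values):

public class RowKeyLayoutSketch {
    static final long ROW_WIDTH = 1814400000L; // assumed row width: 3 weeks in milliseconds

    public static void main(String[] args) {
        long timestamp = 3 * ROW_WIDTH + 1000;               // example data point timestamp
        long rowTime = timestamp - (timestamp % ROW_WIDTH);  // row-key part: start of the 3-week row (= 3 * ROW_WIDTH)
        long columnName = timestamp - rowTime;               // column name: offset inside the row (= 1000 ms)

        // row key = metricName : rowTime : datastoreType : tag1=val1:tag2=val2...
        // column  = columnName (offset), column value = the metric value
        System.out.println("rowTime=" + rowTime + ", column offset=" + columnName);
    }
}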

In the row_key_index table, the row key is the metric_name and the column names are the row keys of data_points.

| row-key | columns                                                     |
|---------|-------------------------------------------------------------|
| load    | load:12345678000:host=server1, load:12345678000:host=server2 |
| cpu     | cpu:12345678000:host=server1                                  |

Because the row key here is the metric_name, the three data_points rows above produce only two row keys: load and cpu.
Different tag combinations of the same metric_name become additional columns under that row key. Since data_points has relatively few row keys, the number of columns here stays small as well:
the columns of data_points grow with time, whereas the columns here grow only with the metrics (more precisely, with their tag combinations).

When a query comes in, a column slice of the row key index is done for the particular metric;
this returns the rows that will contain the data. The row keys are then filtered based on whether any tags were specified.
A multiget Hector call is made to fetch the data from the various rows.
If any row has more data, the remainder is fetched individually using a larger buffer.

A query normally supplies the metric_name, so the columns for that metric_name are looked up in row_key_index first.
If tags were specified, only the row keys carrying those tags are kept. With the row keys in hand, the records in data_points are read concurrently.
Looking at the table structures, the columns of row_key_index act like foreign keys; the name "index" fits too, since the table indexes the row keys.

             row_key_index            data_points
metric_name ============> row_key ===========> query time-range metric value

Compared with InfluxDB: InfluxDB treats each metric_name roughly as its own table, similar to the row_key_index table here (whose primary key is the metric_name).
The difference is in data_points. In KairosDB the row key contains the metric_name, a timestamp and the tags; the column name is a timestamp offset and the column value is the metric value.
In InfluxDB the row key is the timestamp, with tags and fields stored as columns.
So for the same metric and timestamp, several tag combinations become several rows in KairosDB,
whereas in InfluxDB the same row key (the same timestamp) keeps the tags as columns of a single row.

Data write flow

KairosDatastore.putDataPoint delegates to the underlying datastore; writes first go to a buffer, which is flushed to the storage layer once it fills up.
Adding a data point means supplying the metric name, the tag set and the data point itself (its data type and concrete value).

public void putDataPoint(String metricName, ImmutableSortedMap<String, String> tags, DataPoint dataPoint, int ttl) throws DatastoreException {
    m_datastore.putDataPoint(metricName, tags, dataPoint, ttl);
    for (DataPointListener dataPointListener : m_dataPointListeners) {
        dataPointListener.dataPoint(metricName, tags, dataPoint);
    }
}

The underlying store is Cassandra, implemented by CassandraDatastore. Because one record is written to three tables,
three write buffers are used: m_rowKeyWriteBuffer, m_stringIndexWriteBuffer and m_dataPointWriteBuffer.

DataPointsRowKey represents the row key of the data_points table and is constructed as:

rowKey = new DataPointsRowKey(metricName, rowTime, dataPoint.getDataStoreDataType(), tags);

For row_key_index, the row key is the metricName and the column name is the data_points rowKey (the column value is empty):

m_rowKeyWriteBuffer.addData(metricName, rowKey, "", now, rowKeyTtl);

The first argument is the row key; the next two are the column name and the column value.

public void addData(RowKeyType rowKey, ColumnKeyType columnKey, ValueType value, long timestamp, int ttl) {
    waitOnBufferFull();
    m_bufferCount++;
    if (columnKey.toString().length() > 0) {
        m_buffer.add(new Triple<RowKeyType, ColumnKeyType, ValueType>(rowKey, columnKey, value, timestamp, ttl));
    }
}

Before data is added, the buffer is checked for fullness; if it is not full, the buffer counter is incremented (so it will eventually fill up) and the entry is appended to the buffer.
Once the counter reaches the threshold, or the reentrant lock's reference count drops to 0, a WriteDataJob is submitted to flush the buffer.

private void submitJob() {
    Mutator<RowKeyType> pendingMutations = null;
    List<Triple<RowKeyType, ColumnKeyType, ValueType>> buffer = null;

    // Capture the current mutator and buffer in local variables, reset the fields,
    // and perform all further work on the captured references.
    pendingMutations = m_mutator;
    buffer = m_buffer;
    m_mutator = new MutatorImpl<RowKeyType>(m_keyspace, m_rowKeySerializer);
    m_buffer = new ArrayList<Triple<RowKeyType, ColumnKeyType, ValueType>>();
    m_bufferCount = 0;

    WriteDataJob writeDataJob = new WriteDataJob(pendingMutations, buffer);
    m_executorService.submit(writeDataJob);
    writeDataJob.waitTillStarted();
}

Writes go to Cassandra through the Hector client: much like with the official driver, the write is built up first and then executed against Cassandra.
Because the buffer holds many records, they are inserted as a batch: each entry is added to the Mutator (m_pendingMutations), which is then executed once.

if (m_pendingMutations != null) {
    for (Triple<RowKeyType, ColumnKeyType, ValueType> data : m_buffer) {
        HColumnImpl<ColumnKeyType, ValueType> col = new HColumnImpl(data.getSecond(), data.getThird(), data.getTime(), m_columnKeySerializer, m_valueSerializer);
        m_pendingMutations.addInsertion(data.getFirst(), m_cfName, col);
    }
    m_pendingMutations.execute();
}
m_pendingMutations = null;

Each buffer entry carries the row, the column name and the column value. The HColumn built here is the column (name plus value);
addInsertion's first and third arguments are the row key and that column, while the second argument is the column family name, i.e. the table.

Data query flow

queryDatabase takes a DatastoreMetricQuery describing the query; it has two implementations, DatastoreMetricQueryImpl and QueryMetric.
Both map directly onto the example query JSON shown earlier: group by, aggregators, start and end time, tags, and so on.

First, the row keys are derived from the metric_name and the time range: getKeysForQueryIterator returns a FilteredRowKeyIterator over the filtered row keys.
The SliceQuery reads the columns of row_key_index for the given metric_name, since the primary key of row_key_index is the metricName and its columns are exactly the DataPointsRowKey values.

public FilteredRowKeyIterator(String metricName, long startTime, long endTime, SetMultimap<String, String> filterTags) {
    // Serializers for the key (row key), column name and column value of row_key_index
    SliceQuery<String, DataPointsRowKey, String> sliceQuery = HFactory.createSliceQuery(m_keyspace, StringSerializer.get(), new DataPointsRowKeySerializer(true), StringSerializer.get());
    // Set the row key of row_key_index to metricName, roughly: select * from row_key_index where row_key = $metric_name
    sliceQuery.setColumnFamily(CF_ROW_KEY_INDEX).setKey(metricName);
    m_sliceIterator = createSliceIterator(sliceQuery, metricName, startTime, endTime);
}

private ColumnSliceIterator<String, DataPointsRowKey, String> createSliceIterator(SliceQuery<String, DataPointsRowKey, String> sliceQuery, String metricName, long startTime, long endTime) {
    // Build the start and end data_points row keys. Besides metricName and row time the row key also contains the tags;
    // with no tags given, every tag combination is matched.
    DataPointsRowKey startKey = new DataPointsRowKey(metricName, calculateRowTime(startTime), "");
    DataPointsRowKey endKey = new DataPointsRowKey(metricName, calculateRowTime(endTime), "");
    endKey.setEndSearchKey(true);
    // A slice over the columns: we are still reading row_key_index, whose row holds many DataPointsRowKey columns.
    // The start and end keys bound which data_points row keys will eventually be queried.
    ColumnSliceIterator<String, DataPointsRowKey, String> iterator = new ColumnSliceIterator(sliceQuery, startKey, endKey, false, m_singleRowReadSize);
    return (iterator);
}

The iterator implements hasNext and next; m_filterTags comes from the tags specified in the query.
Each column read from row_key_index (a DataPointsRowKey) carries its own tag set.

private DataPointsRowKey nextKeyFromIterator(ColumnSliceIterator<String, DataPointsRowKey, String> iterator) {
    DataPointsRowKey next = null;
    outer:
    while (iterator.hasNext()) {
        DataPointsRowKey rowKey = iterator.next().getName();
        Map<String, String> keyTags = rowKey.getTags();  // the tag set of this DataPointsRowKey (e.g. host=A1, server=B1)
        for (String tag : m_filterTags.keySet()) {       // only the requested tags matter (e.g. host in {A1, A2})
            String value = keyTags.get(tag);             // a DataPointsRowKey has one value per tag; the filter may list several values per tag
            if (value == null || !m_filterTags.get(tag).contains(value)) // so check whether this row key's value is in the filter list
                continue outer; // Don't want this key
        }
        next = rowKey;
        break;
    }
    return (next);
}

The row keys are then queried in batches of size m_multiRowSize; while fewer than m_multiRowSize keys have accumulated, they are appended to the temporary list queryKeys.

private void queryWithRowKeys(DatastoreMetricQuery query, QueryCallback queryCallback, Iterator<DataPointsRowKey> rowKeys) {
    long currentTimeTier = 0L;
    String currentType = null;
    List<QueryRunner> runners = new ArrayList<QueryRunner>();
    List<DataPointsRowKey> queryKeys = new ArrayList<DataPointsRowKey>(); // temporary list holding the row keys of the current batch
    while (rowKeys.hasNext()) {
        DataPointsRowKey rowKey = rowKeys.next();
        if (currentTimeTier == 0L) currentTimeTier = rowKey.getTimestamp();
        if (currentType == null) currentType = rowKey.getDataType();
        if ((rowKey.getTimestamp() == currentTimeTier) && (queryKeys.size() < m_multiRowSize) && (currentType.equals(rowKey.getDataType()))) {
            queryKeys.add(rowKey);
        } else {
            // each batch is wrapped in a QueryRunner; all runners are executed at the end
            runners.add(new QueryRunner(m_keyspace, CF_DATA_POINTS, m_kairosDataPointFactory, queryKeys, query.getStartTime(), query.getEndTime(), queryCallback, m_singleRowReadSize, m_multiRowReadSize, query.getLimit(), query.getOrder()));
            queryKeys = new ArrayList<DataPointsRowKey>(); // reset the batch state
            queryKeys.add(rowKey);
            currentTimeTier = rowKey.getTimestamp();
        }
    }
    // handle the final batch; unlike the else branch above nothing needs to be reset, since there is no next iteration.
    // Note that the row keys are used to query the data_points table.
    if (!queryKeys.isEmpty()) {
        runners.add(new QueryRunner(m_keyspace, CF_DATA_POINTS, m_kairosDataPointFactory, queryKeys, query.getStartTime(), query.getEndTime(), queryCallback, m_singleRowReadSize, m_multiRowReadSize, query.getLimit(), query.getOrder()));
    }
    for (QueryRunner runner : runners) {
        runner.runQuery();
    }
    queryCallback.endDataPoints(); // query callback, e.g. putting the results into the cache
}

With the DataPointsRowKey iterator obtained from the metric_name and the time range, the raw data points can now be read from the data_points table.

QueryRunner

According to the schema, the key, column name and column value of data_points are DataPointsRowKey, Integer and byte[] respectively.
Because several row keys are queried at once, the slice query built here is a MultigetSliceQuery, roughly as sketched below.
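
A rough sketch of such a batched read (not the actual QueryRunner source): it builds Hector's multiget slice query with the key/column/value types above. The serializer choices, the helper signature, the literal column family name and the KairosDB package paths are assumptions of this sketch.

import java.util.List;

import me.prettyprint.cassandra.serializers.BytesArraySerializer;
import me.prettyprint.cassandra.serializers.IntegerSerializer;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.Rows;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.query.MultigetSliceQuery;
import org.kairosdb.datastore.cassandra.DataPointsRowKey;
import org.kairosdb.datastore.cassandra.DataPointsRowKeySerializer;

public class DataPointsMultigetSketch {
    static Rows<DataPointsRowKey, Integer, byte[]> readBatch(Keyspace keyspace,
                                                             List<DataPointsRowKey> queryKeys,
                                                             int startOffset, int endOffset, int readSize) {
        MultigetSliceQuery<DataPointsRowKey, Integer, byte[]> query =
                HFactory.createMultigetSliceQuery(
                        keyspace,
                        new DataPointsRowKeySerializer(),   // key: DataPointsRowKey
                        IntegerSerializer.get(),            // column name: time offset within the row
                        BytesArraySerializer.get());        // column value: the serialized data point

        query.setColumnFamily("data_points");
        query.setKeys(queryKeys.toArray(new DataPointsRowKey[0]));  // the batch of row keys collected above
        query.setRange(startOffset, endOffset, false, readSize);    // column slice over the queried time offsets

        return query.execute().get();
    }
}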

Group-by and aggregation query flow

QueryMetric.getAggregators is called from KairosDatastore, which is in turn driven from MetricsResource:

public Response get(String json) throws Exception {
    // build QueryMetric objects from the query JSON string
    List<QueryMetric> queries = queryParser.parseQueryMetric(json);
    for (QueryMetric query : queries) {
        DatastoreQuery dq = datastore.createQuery(query); // a DatastoreQueryImpl serving this QueryMetric
        List<DataPointGroup> results = dq.execute();      // runs the query against the underlying CassandraDatastore
        jsonResponse.formatQuery(results, query.isExcludeTags(), dq.getSampleSize());
    }
    // ... (building and returning the Response is elided)
}

The overall flow of a query is now clear:

query JSON string --> build QueryMetric objects --> create DatastoreQueryImpl --> query the underlying CassandraDatastore --> write the results into the response

Queries support caching. On a cache miss, the QueryMetric to execute is passed down together with a callback object, CachedSearchResult; on a hit, the cache is simply opened.
The cache is kept in local files, an index file and a data file, which mirror the row_key_index and data_points tables.
Like Cassandra's key cache, it is a per-node cache rather than a distributed, cluster-wide one.

public List<DataPointGroup> execute() throws DatastoreException {
    CachedSearchResult cachedResults = null;
    List<DataPointRow> returnedRows = null;
    String tempFile = m_cacheDir + m_cacheFilename;
    if (m_metric.getCacheTime() > 0) {
        // the cache exists, open it directly; since the cache is a local file, this loads the file contents into memory
        cachedResults = CachedSearchResult.openCachedSearchResult(m_metric.getName(), tempFile, m_metric.getCacheTime(), m_dataPointFactory);
        if (cachedResults != null) {
            returnedRows = cachedResults.getRows();
        }
    }
    // cache miss: query the underlying store (m_datastore) once and populate the cache
    if (cachedResults == null) {
        // only the cache object is created here; the data is put into the cache by the callback used in queryDatabase
        cachedResults = CachedSearchResult.createCachedSearchResult(m_metric.getName(), tempFile, m_dataPointFactory);
        m_datastore.queryDatabase(m_metric, cachedResults);
        returnedRows = cachedResults.getRows(); // after the query the cache must contain data, i.e. the callback is guaranteed to have run
    }
    //....
}

If grouping was requested, the List<DataPointRow> is first grouped by tag into a List<DataPointGroup>.
Grouping usually targets a tag shared by many series of the same metricName, for example host, whose values identify the machines of the cluster: grouping by tag corresponds to something like select host, sum(value) from cpu group by host.
Because different tags of the same metricName are different row keys in KairosDB,
grouping by tag puts the rows that share a tag, i.e. share a row key, into the same group, so the physical layout and the logical view line up.

// first group by tag
List<DataPointGroup> queryResults = groupByTypeAndTag(m_metric.getName(), returnedRows, getTagGroupBy(m_metric.getGroupBys()), m_metric.getOrder());
// Now group for all other types of group bys.
Grouper grouper = new Grouper(m_dataPointFactory);
queryResults = grouper.group(removeTagGroupBy(m_metric.getGroupBys()), queryResults);

Grouping by tag

groupByTypeAndTag groups by type and then by the grouping tags; see KairosDatastoreTest for test cases. If the data points carry no tags at all,
there is only a single DataPointGroup: the tests test_query_sumAggregator and test_query_noAggregator both end up with one group.

In KairosDatastore.groupByTypeAndTag, all rows normally share the same groupType, so typeGroups has a single key
and every data point row lands in its value list (typeGroups.get(type)); the loop over sortedTypes then runs only once.
Note that DataPointGroupRowWrapper wraps a single DataPointRow: although it implements DataPointGroup, each wrapper holds just one row.

protected List<DataPointGroup> groupByTypeAndTag(String metricName, List<DataPointRow> rows, TagGroupBy tagGroupBy, Order order) {
    List<DataPointGroup> ret = new ArrayList<DataPointGroup>();
    // key is the groupType, value is a list that can hold several data point groups
    ListMultimap<String, DataPointGroup> typeGroups = ArrayListMultimap.create();
    // Go through each row grouping them by type
    for (DataPointRow row : rows) {
        String groupType = m_dataPointFactory.getGroupType(row.getDatastoreType());
        typeGroups.put(groupType, new DataPointGroupRowWrapper(row));
    }
    // Sort the types for predictable results
    TreeSet<String> sortedTypes = new TreeSet<String>(typeGroups.keySet());
    // Now go through each type group and group by tag if needed.
    for (String type : sortedTypes) {
        if (tagGroupBy != null) {
            //...
        } else {
            ret.add(new SortingDataPointGroup(typeGroups.get(type), new TypeGroupByResult(type), order));
        }
    }
    return ret;
}

If the query specifies a TagGroupBy (possibly over several tag names), the key of groups is the combination of tag values (tagsKey) and the value is the set of data groups carrying those values.
Like typeGroups above, groups is a multimap, so each key can hold several groups: the same tag names naturally occur with many different values.

ListMultimap<String, DataPointGroup> groups = ArrayListMultimap.create();
Map<String, TagGroupByResult> groupByResults = new HashMap<String, TagGroupByResult>();

// Grouping by type may well produce a single group (e.g. everything has groupType long),
// so the data is grouped again by the tags the client asked for.
// Remember that each DataPointGroup in typeGroups wraps a single Row; a DataPointGroup does not hold multiple rows here.
for (DataPointGroup dataPointGroup : typeGroups.get(type)) {
    // tagGroupBy lists the tags the client wants to group by; dataPointGroup is the data, from which the tag values are extracted
    LinkedHashMap<String, String> matchingTags = getMatchingTags(dataPointGroup, tagGroupBy.getTagNames());
    String tagsKey = getTagsKey(matchingTags);
    // Two maps keyed by the same tagsKey hold different data; both feed into the SortingDataPointGroup built below.
    // groups is a multimap, so a tagsKey maps to a collection; groupByResults is a plain Map, so one object per tagsKey.
    groups.put(tagsKey, dataPointGroup);
    groupByResults.put(tagsKey, new TagGroupByResult(tagGroupBy, matchingTags));
}

// Sort groups by tags
TreeSet<String> sortedGroups = new TreeSet<String>(groups.keySet());
for (String key : sortedGroups) {
    // for each tagsKey, pull the data from both maps: the DataPointGroups (the grouped data points) and the group-by result
    SortingDataPointGroup sdpGroup = new SortingDataPointGroup(groups.get(key), groupByResults.get(key), order);
    sdpGroup.addGroupByResult(new TypeGroupByResult(type));
    ret.add(sdpGroup);
}

The tag names given in the query are ordered, so the returned matchingTags must preserve that order (hence the LinkedHashMap); the second code block above then groups by those tags.

private static LinkedHashMap<String, String> getMatchingTags(DataPointGroup datapointGroup, List<String> tagNames) {
    LinkedHashMap<String, String> matchingTags = new LinkedHashMap<String, String>();
    // iterate over tagNames so the result preserves their order
    for (String tagName : tagNames) {
        Set<String> tagValues = datapointGroup.getTagValues(tagName);
        if (tagValues != null) {
            // getTagValues returns a set because a tag may in general have several values,
            // but this DataPointGroup wraps a single Row, so each tag has exactly one value here
            // (a row can of course carry several different tags, which is not the same thing).
            String tagValue = tagValues.iterator().next();
            matchingTags.put(tagName, tagValue != null ? tagValue : "");
        }
    }
    return matchingTags;
}

An example of grouping by tag (a single metric name grouped by several tags, like SQL's group by dc, host):

metric_name dc host
metric1 dc1 server1
metric1 dc2 server1
metric1 dc1 server2
metric1 dc2 server2

DataPointGroup

Besides the metric name (name) and the list of group-by results (group_by), a DataPointGroup carries tags (a TagSet) and values (an Iterator over DataPoint).

public interface DataPointGroup extends Iterator<DataPoint>, TagSet {
    public String getName();                       // Returns the metric name for this group
    public List<GroupByResult> getGroupByResult(); // Returns the list of group by results or an empty list if the results are not grouped.
}

Taking a tag-grouped response as an example, its fields map one-to-one onto DataPointGroup:

{"queries": [{"results": [
{
"name": "metric1", //指标名称
"group_by": [ //List<GroupByResult>
{
"name": "tag", //不同的分组方式,这里名称不一样,比如还有value,time等
"tags": ["data_center", "host"],
"group": {
"data_center": "dc1",
"host": "server1"
}
}
],
"tags": { //TagSet
"data_center": ["dc1"],
"host": ["server1"]
},
"values": [ //Iterator<DataPoint>
[1353222000000, 31],
[1364796000000, 723]
]
}
]}]}

Other groupings

Besides grouping by tag, results can be grouped by value, by time, or by bin; the implementation lives in Grouper, with tests in GrouperTest.
Grouping by value buckets the data point values into ranges of a given size; grouping by time requires a time unit, e.g. months, so points from the same month land in the same group.
Below are value- and time-based grouping examples; range_size is the extent of each group and group_count the number of groups.

"group_by": [
{
"name": "value",
"range_size": 1000
}
]

"group_by": [
{
"name": "time",
"group_count": "168",
"range_size": {
"value": "1",
"unit": "hours"
}
}
]

Grouping by time is typically used to compare the same time slot across days or weeks, e.g. aggregate behaviour today versus yesterday or last week. The example above groups by hour with 168 groups in total;
since a week is exactly 168 hours, the same hour of every week falls into the same group: hour 0 on the 21st and hour 0 on the 28th share a group (see the table and the sketch below).
With unit=days and group_count=7, all Mondays end up in one group, all Tuesdays in another, and so on.

| group  | 1          | 2          | 3          | 4          | ...  | 168        |
|--------|------------|------------|------------|------------|------|------------|
| week 1 | 2015122101 | 2015122102 | 2015122103 | 2015122104 | ...  | 2015122723 |
| week 2 | 2015122801 | 2015122802 | 2015122803 | 2015122804 | ...  | 2016010323 |
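
A rough illustration of the group assignment (not the exact TimeGroupBy implementation; here the hour index is anchored at the Unix epoch rather than at the query start time):

public class HourOfWeekGroupSketch {
    public static void main(String[] args) {
        long t1 = 1450656000000L;          // 2015-12-21 00:00 UTC
        long t2 = t1 + 168L * 3600_000L;   // exactly one week later: 2015-12-28 00:00 UTC

        System.out.println(groupNumber(t1)); // same value ...
        System.out.println(groupNumber(t2)); // ... so both timestamps fall into the same group
    }

    static int groupNumber(long timestampMillis) {
        long hoursSinceEpoch = timestampMillis / 3600_000L; // unit = hours, range_size = 1
        return (int) (hoursSinceEpoch % 168);               // group_count = 168: an hour-of-week bucket
    }
}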

Take a one-week query grouped by hour as an example (only one group is shown below; every group has the same shape).
The metric metric1 is reported by the machine dc1:server1, which produces a data point every second of every day.
If metric1 were grouped only by tag, all of its data for one tag combination would fall into a single group.
Grouping by time adds a finer dimension: for the same metric and tags, it is easy to compare how the same hour behaved this week and last week.

{"queries": [{"results": [
{
"name": "metric1", //① 每一组的指标名称
"group_by": [ //② List<GroupByResult>
{
"name": "time",
"range_size": {"value": 1,"unit": "HOURS"},
"group_count": 168,
"group": {"group_number": 60}
}
],
"tags": { //③ TagSet
"data_center": ["dc1"],
"host": ["server1"]
},
"values": [ //④ Iterator<DataPoint>
[1353222000000, 146],
[1353826800000, 241]
]
}
]}]}

Grouping is essentially a redistribution of the original data points into groups according to the grouping criteria.
The dataPointGroupList below is the output of the previous type-and-tag grouping step, because grouping by tag has to happen first:
subdividing by time is only meaningful for data that shares the same tags; comparing the same metric across different tags would not be.
That is why the example result above contains tags alongside group_by, and values lists the data points of that tag combination within one group.

public List<DataPointGroup> group(List<GroupBy> groupBys, List<DataPointGroup> dataPointGroupList) throws IOException {
    List<DataPointGroup> dataPointGroups = new ArrayList<DataPointGroup>();
    for (DataPointGroup dataPointGroup : dataPointGroupList) {
        Map<List<Integer>, Group> groupIdsToGroup = new LinkedHashMap<List<Integer>, Group>();
        Map<String, String> tags = getTags(dataPointGroup);
        while (dataPointGroup.hasNext()) {
            DataPoint dataPoint = dataPointGroup.next();
            List<Integer> groupIds = new ArrayList<Integer>();
            List<GroupByResult> results = new ArrayList<GroupByResult>();
            for (GroupBy groupBy : groupBys) { // several group-bys may apply at once; the examples above each used a single one
                int groupId = groupBy.getGroupId(dataPoint, tags); // which group this data point belongs to: group_number
                groupIds.add(groupId);
                results.add(groupBy.getGroupByResult(groupId));
            }
            // add to group: if the group already exists, it is reused (e.g. the group created on the 21st is reused on the 28th)
            Group group = getGroup(groupIdsToGroup, dataPointGroup, groupIds, results);
            group.addDataPoint(dataPoint); // ④ the data points
        }

        for (Group group : groupIdsToGroup.values()) {
            if (!dataPointGroup.getGroupByResult().isEmpty()) {
                group.addGroupByResults(dataPointGroup.getGroupByResult()); // ② add the GroupByResult
            }
            dataPointGroups.add(group.getDataPointGroup()); // the DataPointGroup inside Group is now complete: ①②③④
        }
        dataPointGroup.close();
    }
    return dataPointGroups;
}

Aggregation

The queryResults returned by the previous step are the grouped DataPointGroups, and aggregation operates on that grouped result set,
for example computing the average or maximum of the same metric within each group. The aggregation is applied to every group in the same way; in SQL terms,
a group by is normally followed by an aggregate function: select host, server, sum(value) from cpu group by host, server

m_results = new ArrayList<DataPointGroup>();
for (DataPointGroup queryResult : queryResults) {
    DataPointGroup aggregatedGroup = queryResult;
    List<Aggregator> aggregators = m_metric.getAggregators();
    if (m_metric.getLimit() != 0) {
        aggregatedGroup = new LimitAggregator(m_metric.getLimit()).aggregate(aggregatedGroup);
    }
    // This pipes the aggregators together: several aggregators can be chained, running one after another rather than in parallel.
    for (Aggregator aggregator : aggregators) {
        if (aggregator.canAggregate()) // Make sure the aggregator can handle this type of data.
            aggregatedGroup = aggregator.aggregate(aggregatedGroup);
    }
    m_results.add(aggregatedGroup);
}
return (m_results);

RangeAggregator is an abstract class; the concrete work is delegated to a RangeSubAggregator.

public DataPointGroup aggregate(DataPointGroup dataPointGroup) {
    if (m_alignSampling)
        m_startTime = alignRangeBoundary(m_startTime);
    if (m_exhaustive)
        return (new ExhaustiveRangeDataPointAggregator(dataPointGroup, getSubAggregator()));
    else
        return (new RangeDataPointAggregator(dataPointGroup, getSubAggregator()));
}

/**
 * Return a RangeSubAggregator that will be used to aggregate data over a
 * discrete range of data points. This is called once per grouped data series.
 *
 * For example, if one metric is queried and no grouping is done this method is
 * called once and the resulting object is called over and over for each range within the results.
 *
 * If the query were grouping by the host tag and host has values of 'A' and 'B'
 * this method will be called twice, once to aggregate results for 'A' and once to aggregate results for 'B'.
 * In other words, aggregation is performed per group, once for each group; the calls are driven by the queryResults loop above.
 */
protected abstract RangeSubAggregator getSubAggregator();

RangeDataPointAggregator delegates to the RangeSubAggregator: its next() obtains an iterator from the sub-aggregator and pulls the data points out of it.

private class RangeDataPointAggregator extends AggregatedDataPointGroupWrapper {
    protected RangeSubAggregator m_subAggregator;
    protected Iterator<DataPoint> m_dpIterator;

    public DataPoint next() {
        if (!m_dpIterator.hasNext()) {
            // We calculate start and end ranges as the ranges may not be consecutive if data does not show up in each range.
            // currentDataPoint is defined in the parent class and is the current data point of the iteration.
            long startRange = getStartRange(currentDataPoint.getTimestamp());
            long endRange = getEndRange(currentDataPoint.getTimestamp());
            SubRangeIterator subIterator = new SubRangeIterator(endRange);

            long dataPointTime = currentDataPoint.getTimestamp();
            if (m_alignStartTime) dataPointTime = startRange;
            m_dpIterator = m_subAggregator.getNextDataPoints(dataPointTime, subIterator).iterator();
        }
        // While the current inner iterator still has elements, each next() call simply delegates to m_dpIterator.next();
        // when it runs out, a fresh iterator is requested. One inner iterator thus serves many outer next() calls,
        // so the data is processed batch by batch.
        return (m_dpIterator.next());
    }
}

RangeDataPointAggregator is itself a DataPointGroup and therefore has to implement the Iterator interface, even though it also wraps an inner DataPointGroup.
Its hasNext and next are backed by the RangeSubAggregator, which keeps producing the next batch of data points as an Iterator.

RangeDataPointAggregator --> AggregatedDataPointGroupWrapper --> DataPointGroup --> Iterator<DataPoint>
        |                                |                                           (hasNext, next)
        |                                +-- wraps the (inner) DataPointGroup
        +-- holds a RangeSubAggregator: next() --> m_subAggregator.getNextDataPoints(...).iterator()

Take two RangeSubAggregator implementations as examples, the count (CountAggregator's CountDataPointAggregator) and the average (AvgDataPointAggregator):

private class CountDataPointAggregator implements RangeSubAggregator {
    public Iterable<DataPoint> getNextDataPoints(long returnTime, Iterator<DataPoint> dataPointRange) {
        long count = 0;
        while (dataPointRange.hasNext()) {
            count++;
            dataPointRange.next();
        }
        return Collections.singletonList(m_dataPointFactory.createDataPoint(returnTime, count));
    }
}

private class AvgDataPointAggregator implements RangeSubAggregator {
    public Iterable<DataPoint> getNextDataPoints(long returnTime, Iterator<DataPoint> dataPointRange) {
        int count = 0;
        double sum = 0;
        while (dataPointRange.hasNext()) {
            DataPoint dp = dataPointRange.next();
            sum += dp.getDoubleValue();
            count++;
        }
        return Collections.singletonList(m_dataPointFactory.createDataPoint(returnTime, sum / count));
    }
}

Note that the second parameter is itself an iterator, so the aggregation (count, average, and so on) runs over the multiple data points in dataPointRange
and returns a single data point, wrapped in a singleton list, because aggregating a range yields one value.
The range of data points comes from the SubRangeIterator, which is built from the startRange and endRange of currentDataPoint.
SubRangeIterator is an inner class of RangeDataPointAggregator, so ultimately it still iterates over the DataPointGroup that was originally passed in.

The start and end of a range are derived from the samplingValue. Why sample at all, and does it affect precision? As described in the downsampling section, it does reduce resolution: each sampling interval is collapsed into a single aggregated value.

protected long getStartRange(long timestamp) {
    long samplingValue = m_sampling.getValue();
    long numberOfPastPeriods = m_unitField.getDifferenceAsLong(timestamp, m_startTime) / samplingValue;
    return m_unitField.add(m_startTime, numberOfPastPeriods * samplingValue);
}

protected long getEndRange(long timestamp) {
    long samplingValue = m_sampling.getValue();
    long numberOfPastPeriods = m_unitField.getDifferenceAsLong(timestamp, m_startTime) / samplingValue;
    return m_unitField.add(m_startTime, (numberOfPastPeriods + 1) * samplingValue);
}

protected class SubRangeIterator implements Iterator<DataPoint> {
    public boolean hasNext() {
        return ((currentDataPoint != null) && (currentDataPoint.getTimestamp() < m_endRange));
    }
    public DataPoint next() {
        DataPoint ret = currentDataPoint;
        if (hasNextInternal())
            currentDataPoint = nextInternal(); // goes through the outer class's parent AggregatedDataPointGroupWrapper and ultimately calls innerDataPointGroup.next()
        return (ret);
    }
}

It is therefore worth looking at the concrete innerDataPointGroup, since it ultimately decides how each data point of a group is fetched.

EOF AGG QUERY

In the aggregate step of KairosDatastore.execute, only the RangeDataPointAggregator is created and a DataPointGroup is returned.
The Iterator work is delegated to the sub-aggregator, as analysed above, but what actually drives the iteration and produces the DataPoints, i.e. the values list of the result?
That happens in MetricsResource, when jsonResponse.formatQuery writes out the results after execute; with that, the whole aggregation query flow is complete.

//This loop must call close on each group at the end.
for (DataPointGroup group : queryResults) {
    m_jsonWriter.key("name").value(metric);
    //...
    // name, group_by and tags have been written above; now the actual DataPoint list is emitted
    m_jsonWriter.key("values").array();
    while (group.hasNext()) {
        DataPoint dataPoint = group.next();
        m_jsonWriter.array().value(dataPoint.getTimestamp());
        dataPoint.writeValueToJson(m_jsonWriter);
        m_jsonWriter.endArray();
    }
}

kairosdb-client

Storing data:

MetricBuilder builder = MetricBuilder.getInstance();
builder.addMetric("metric1")
       .addTag("host", "server1")
       .addTag("customer", "Acme")
       .addDataPoint(System.currentTimeMillis(), 10)
       .addDataPoint(System.currentTimeMillis(), 30L);
HttpClient client = new HttpClient("http://localhost:8080");
Response response = client.pushMetrics(builder);
client.shutdown();

Querying:

QueryBuilder builder = QueryBuilder.getInstance();
builder.setStart(2, TimeUnit.MONTHS)
       .setEnd(1, TimeUnit.MONTHS)
       .addMetric("metric1")
       .addAggregator(AggregatorFactory.createAverageAggregator(5, TimeUnit.MINUTES));
HttpClient client = new HttpClient("http://localhost:8080");
QueryResponse response = client.query(builder);
client.shutdown();

Custom types:

You must first understand the two custom type values you specify on the client: the group type and the registered type.
The group type is used for JSON serialization/deserialization.
The registered type is used by the server to identify which DataPointFactory it will use
to serialize/deserialize the data to the data store.

When storing, the client first registers the group type with KairosDB; when adding metrics, the registered type is specified.

HttpClient client = new HttpClient("http://localhost:8080");
client.registerCustomDataType("complex", ComplexNumber.class); // "complex" is the group type

MetricBuilder metricBuilder = MetricBuilder.getInstance();
Metric metric = metricBuilder.addMetric("metric1", "complex-number"); // "complex-number" is the registered type
metric.addTag("host", "myHost");
metric.addDataPoint(System.currentTimeMillis(), new ComplexNumber(2.3, 3.4));
metric.addDataPoint(System.currentTimeMillis(), new ComplexNumber(1.1, 5));

client.pushMetrics(metricBuilder);

When querying, the data point values are cast back to the custom type:

QueryBuilder queryBuilder = QueryBuilder.getInstance();
queryBuilder.addMetric("metric1");
queryBuilder.setStart(1, TimeUnit.HOURS);

QueryResponse response = client.query(queryBuilder);
List<DataPoint> dataPoints = response.getQueries().get(0).getResults().get(0).getDataPoints();

for (DataPoint dataPoint : dataPoints) {
    ComplexNumber complex = (ComplexNumber) dataPoint.getValue();
    System.out.println(complex.real + " + " + complex.imaginary + "i");
}
