InfluxDB: A Time Series Database

InfluxDB is an open source distributed time series database.

There are two major types of time series data:

  • Regular – data sampled at regular intervals, for example every second or every hour.
  • Irregular – data recorded only when an event occurs; events may happen frequently, but at unpredictable times.

In short: regular data is sampled on a fixed schedule (every second, every hour); irregular data is recorded only when an event occurs.


Schema

  • Measurements (e.g. cpu, temperature, event, memory)
  • Tags (e.g. region=uswest, host=serverA, sensor=23)
  • Fields (e.g. value=23.2, info='this is some extra stuff', present=true)
  • Timestamp (nano-second epoch)

All data is indexed by measurement, tag set, and time.

  • Database -> a MySQL/PostgreSQL database
  • Time series -> a table
  • Points or events -> rows

Data in InfluxDB is organized by time series, which contain a measured value, like “cpu_load” or “temperature”.
Time series have zero to many points, one for each discrete sample of the metric.
Points consist of time (a timestamp), a measurement (“cpu_load”),
at least one key-value field (the measured value itself, e.g. “value=0.64” or “15min=0.78”),
and zero to many key-value tags containing metadata (e.g. “host=server01”, “region=EMEA”, “dc=Frankfurt”).
A point consists of a timestamp, a measurement name, at least one key-value field, and zero or more key-value tags (tags are optional).

<measurement>[,<tag-key>=<tag-value>...] <field-key>=<field-value>[,<field2-key>=<field2-value>...] [unix-nano-timestamp]
[key] [point] [timestamp]

key: <measurement>,<tag-set>
point: <field-set>

cpu,host=serverA,region=us_west value=0.64
temperature,machine=unit42,type=assembly external=25,internal=37 1434067467000000000
payment,device=mobile,product=Notepad,method=credit billed=33,licenses=3i 1434067467100293230
stock,symbol=AAPL bid=127.46,ask=127.48

The first line above is a point with measurement name cpu, tags host and region, and measured value 0.64.
If a metric has a single value, such as CPU utilization, the field is conventionally named value: value=0.64.
If a metric has several values, such as a machine's temperature, multiple fields are used: external=25,internal=37 (external and internal temperatures).
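
Points in this format can be written straight to the HTTP write endpoint of InfluxDB 1.x (POST /write?db=<db>). A minimal Scala sketch, assuming a local instance and a hypothetical database named mydb:

import java.net.{HttpURLConnection, URL}

object LineProtocolWrite {
  def main(args: Array[String]): Unit = {
    // One point in line protocol: measurement,tags fields [timestamp]
    val line = "cpu,host=serverA,region=us_west value=0.64"

    // InfluxDB 1.x write endpoint; the database name "mydb" is an assumption
    val url = new URL("http://localhost:8086/write?db=mydb")
    val conn = url.openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")
    conn.setDoOutput(true)
    conn.getOutputStream.write(line.getBytes("UTF-8"))

    // InfluxDB answers 204 No Content on a successful write
    println(s"HTTP ${conn.getResponseCode}")
    conn.disconnect()
  }
}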

Write and Query

Conceptually you can think of a measurement as an SQL table, with rows where the primary index is always time.
Tags and fields are effectively columns in the table. Tags are indexed; fields are not.
In short: the measurement becomes the table, tags and fields become its columns, the primary key is always time, and only tags are indexed.

> INSERT cpu,host=serverA,region=us_west value=0.64
> INSERT temperature,machine=unit42,type=assembly external=25,internal=37

> SELECT * FROM cpu
name: cpu
---------
time                  host     region   value
1451023636310055056   serverA  us_west  0.64

> SELECT * FROM temperature
name: temperature
-----------------
time                  external  internal  machine  type
1451023737743269282   25        37        unit42   assembly

As the inserts and queries above show, the measurement name acts as the table, all tags and fields become its columns, and the primary key (the first column) is always time.

Inserts and queries can also be performed from a browser, via the admin UI at http://localhost:8083/:

(Screenshot: the InfluxDB admin web UI)

Scala Client

// Assumes the com.paulgoldbaum "scala-influxdb-client" library
import com.paulgoldbaum.influxdbclient._
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

class InfluxDBClient(url: String, db: String, port: Int = 8086) {

  val influxdb = InfluxDB.connect(url, port)
  val database = influxdb.selectDatabase(db)

  // Create the database on first use
  val exists = Await.result(database.exists(), 5.seconds)
  if (exists) println("already exists") else database.create()

  def testBulkWrite(): Unit = {
    val points = List(
      Point("cpu", System.currentTimeMillis()).addField("value", 456L),
      Point("host", System.currentTimeMillis()).addField("value", 456L),
      Point("net", System.currentTimeMillis()).addField("value", 456L)
    )
    // Timestamps above are in milliseconds, so write with millisecond precision
    database.bulkWrite(points, precision = Precision.MILLISECONDS)
  }

  def testWriteMetrics(): Unit = {
    // Numeric fields (not strings), so they can be aggregated in queries
    var points = List(
      Point("appId_1").addTag("taskId", "1").addField("bytes", 10L).addField("records", 1000L),
      Point("appId_1").addTag("taskId", "2").addField("bytes", 15L).addField("records", 1100L),
      Point("appId_1").addTag("taskId", "3").addField("bytes", 18L).addField("records", 1300L),
      Point("appId_1").addTag("taskId", "4").addField("bytes", 28L).addField("records", 1500L),
      Point("appId_1").addTag("taskId", "5").addField("bytes", 50L).addField("records", 1900L)
    )
    database.bulkWrite(points, precision = Precision.MILLISECONDS)

    // A second batch one second later, so each (measurement, taskId) series gets two points
    Thread.sleep(1000)
    points = List(
      Point("appId_1").addTag("taskId", "1").addField("bytes", 55L).addField("records", 2000L),
      Point("appId_1").addTag("taskId", "2").addField("bytes", 60L).addField("records", 2200L),
      Point("appId_1").addTag("taskId", "3").addField("bytes", 74L).addField("records", 2500L),
      Point("appId_1").addTag("taskId", "4").addField("bytes", 88L).addField("records", 2600L),
      Point("appId_1").addTag("taskId", "5").addField("bytes", 100L).addField("records", 3000L)
    )
    database.bulkWrite(points, precision = Precision.MILLISECONDS)
  }

}
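
Reading the data back uses the same library's database.query, which returns a Future[QueryResult]; the series/records accessors below follow my reading of the client's README and should be treated as assumptions. A method that could be added to the class above:

  def testQueryMetrics(): Unit = {
    // InfluxQL query over the points written by testWriteMetrics, newest first
    val future = database.query("SELECT bytes, records FROM appId_1 ORDER BY time DESC")
    val result = Await.result(future, 5.seconds)
    for {
      series <- result.series
      record <- series.records
    } println(s"""${record("time")} bytes=${record("bytes")} records=${record("records")}""")
  }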

Action

$ influx -host 10.57.xx.xx -port 8086
> connect influxdb.hz.xxx.cn:80
> show databases;
> use pontus;
> show measurements;
name: measurements
------------------
name
kafka
mysqlhdfs
> precision rfc3339   # timestamps are shown in UTC, 8 hours behind local time (CST)
> select time,lag,logSize from kafka where jobId='215' and eventType='lagStatistics' and time > now() - 33h order by time desc;
name: kafka
-----------
time                      lag     logSize
2017-05-19T09:19:16.39Z   0       181593
2017-05-19T06:49:16.506Z  0       181593
2017-05-19T06:46:16.467Z  0       181593      <- streaming job restarted at 2017-05-19T14:46 local time
2017-05-19T06:43:16.479Z  175952  181593      <- logSize - lag = 5641
2017-05-19T06:40:16.65Z   175952  181593
2017-05-18T10:16:16.386Z  130008  135649
2017-05-18T10:13:16.452Z  130008  135649
2017-05-18T10:10:16.31Z   129546  135187
2017-05-18T02:28:06.335Z  10109   15734
2017-05-18T02:25:05.741Z  8297    13922
2017-05-18T02:22:05.511Z  6421    12046
2017-05-18T02:19:05.795Z  4513    10138
2017-05-18T02:16:05.704Z  2335    7960        <- logSize - lag = 5625
2017-05-18T02:13:05.47Z   245     5870
2017-05-18T02:10:06.101Z  0       5625        <- logSize - lag = 5625
2017-05-18T02:07:05.471Z  0       5625

2017-11-16T19:40:32.907Z  32456   1798688004
2017-11-16T19:37:32.922Z  33382   1798509135
2017-11-16T19:34:32.893Z  34429   1798313729
2017-11-16T19:31:32.981Z  25833   1798137612
2017-11-16T19:28:32.943Z  24161   1798000413
2017-11-16T19:25:32.967Z  26833   1797849966
2017-11-16T19:22:33.48Z   24147   1797702880

select * from kafka where eventType='batchComplete' and jobId='3663' and inputSize>0 order by time desc limit 300;
(empty columns appName, groupId, lag, logSize, topic omitted)
time                  eventType      inputSize  jobId  processTime  scheduleTime  totalTime
2017-11-16T19:33:00Z  batchComplete  60354      3663   3760         0             3760
2017-11-16T19:32:00Z  batchComplete  61599      3663   3900         1             3901
2017-11-16T19:31:00Z  batchComplete  66603      3663   3969         0             3969
2017-11-16T19:30:00Z  batchComplete  65360      3663   4088         0             4088
2017-11-16T19:29:00Z  batchComplete  64490      3663   3903         0             3903
2017-11-16T19:28:00Z  batchComplete  63074      3663   4084         1             4085
2017-11-16T19:27:00Z  batchComplete  55798      3663   3501         1             3502
2017-11-16T19:26:00Z  batchComplete  48649      3663   3131         1             3132
2017-11-16T19:25:00Z  batchComplete  47084      3663   3073         0             3073
2017-11-16T19:24:00Z  batchComplete  44500      3663   3000         1             3001
2017-11-16T19:23:00Z  batchComplete  43943      3663   2979         0             2979
2017-11-16T19:22:00Z  batchComplete  51673      3663   3310         1             3311
2017-11-16T19:21:00Z  batchComplete  51044      3663   3365         1             3366
2017-11-16T19:20:00Z  batchComplete  50402      3663   3790         1             3791
2017-11-16T19:19:00Z  batchComplete  49716      3663   2943         0             2943
2017-11-16T19:18:00Z  batchComplete  48627      3663   2918         1             2919
2017-11-16T19:17:00Z  batchComplete  46057      3663   3140         0             3140
2017-11-16T19:16:00Z  batchComplete  45991      3663   2920         1             2921
2017-11-16T19:15:00Z  batchComplete  43326      3663   2741         0             2741
2017-11-16T19:14:00Z  batchComplete  13953      3663   1275         1             1276
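
The same queries can also be issued over the HTTP query endpoint of InfluxDB 1.x (GET /query?db=<db>&q=<query>), which returns JSON. A minimal Scala sketch against the host and database from the session above:

import java.net.{URL, URLEncoder}
import scala.io.Source

object InfluxQuery {
  def main(args: Array[String]): Unit = {
    // InfluxQL from the session above, URL-encoded for the query string
    val q = URLEncoder.encode(
      "select time,lag,logSize from kafka where jobId='215' and time > now() - 33h", "UTF-8")
    val url = new URL(s"http://10.57.xx.xx:8086/query?db=pontus&q=$q")
    // The endpoint answers with a JSON document of series/columns/values
    println(Source.fromURL(url).mkString)
  }
}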


