Apache Apex 流处理和批处理统一框架

Apache Apex: Industry’s only open source, enterprise-grade unified stream and batch processing engine. http://apex.incubator.apache.org/

Hello Apex

Key Concepts

Key set

A key set is a set of fields in the incoming tuple that is used to combine data for aggregation.
聚合计算时如何组合数据, 比如不同记录但是相同key,则会被组合起来一起计算,比如SQL中要进行group by的字段.

Value set

A value set is the set of fields in the incoming tuple on which Aggregator(s) are applied.
聚合计算的计算字段.上面的key类似于dimension维度,而value则类似于metrics指标,要进行聚合计算的字段,比如count(id)中的id.

Aggregator

An aggregator is a mathematical function that is applied on value fields in an incoming tuple. Examples of aggregators are SUM, COUNT, MAX, MIN, and AVERAGE.
聚合方式,常见的有求和,计数,最值,平均数. 比如count(id)中的count.

Aggregates

Aggregates are objects containing the aggregated values for a configured value set and key combination.
聚合操作包括了配置的value set的聚合结果,以及key的组合. 比如select type,count(id) from tbl group by type中的type,count(id).

Time buckets

Time buckets indicate the duration for which the floor time is calculated. For example, for a time bucket of 1 minute, the floor time-value for both 12:01:34 and 12:01:59 will be 12:01:00. Similarly, for an hourly time bucket, floor time-value for 15:02:34 and 15:34:00 will be 15:00:00.
时间桶是对原始的时间按照桶的方式进行划分. 如果桶的大小是一分钟,则在同一分钟的记录都会被分到同一个桶里面.

After calculating the floor value for a duration, the time-value becomes a key. Currently supported time buckets are: 1 second, 1 minute, 1 hour, and 1 day.
对时间进行floor操作后, time-value会成为一个key, 即Key set中要进行group by的字段.

疑问: 如果计算过去一分钟,假设当前时间是12:01:15,则过去一分钟的范围应该是:12:00:15开始的. 按照上面的分桶方式,就会存在12:00:00和12:01:00两个桶!

Combinations

Combinations indicate a group of the keys that are used for aggregate computations.

Incremental Aggregators

Incremental aggregators are aggregate functions for which computations are possible only by using previous aggregate value and the new value. For example,
增量聚合也是一种聚合计算的方式(类似于COUNT等): 直接使用上一次的聚合结果,再结合本次的值进行增量计算.

1
2
3
SUM = {Previous_SUM} + {Current_Value}
COUNT = {Previous_COUNT} + 1
MIN = ( {Current_Value} < {Previous_MIN} ) ? {Current_Value} : {Previous_MIN}

On-the-fly Aggregators (OTF Aggregators)

On-the-fly aggregators are the aggregate functions that use the result of multiple incremental aggregators, and can be calculated on-the-fly if necessary. For example,
即时聚合会使用多个增量聚合算子的结果,比如求平均数利用了上面两个增量算子:Current_SUM和Current_COUNT再进一步计算.

1
2
3
4
Current_SUM = {Previous_SUM} + {Current_Value}      ⬅️incremental aggregator
Current_COUNT = {Previous_COUNT} + 1

AVERAGE = {Current_SUM} / {Current_COUNT} ⬅️use the result of multiple incremental aggregators

DimensionsComputation operator

The DimensionsComputation operator is an Operator that performs intermediate aggregations using incremental aggregators.
维度计算算子只会使用增量聚合算子进行中间的聚合操作. 为什么是中间的? 因为它是不连续的. 最终要通过DimensionStore计算出累积的结果.

中间,不连续. 实际上因为维度计算算子的作用范围只是在一个应用程序窗口内的. 而这个窗口本身是一段一段的(每一段之间没有重叠).

the DimensionsComputation operator applies incremental aggregators on the value set of the tuple data within a boundary of an application window. At the end of the application window, the aggregate value is reset to calculate the new aggregates. Thus, discrete aggregates are generated by DimensionsComputation operator for every application window. This output is used by the DimensionStore operator (labeled as Store in DAG) for calculating cumulative aggregates.
DimensionsComputation算子会在一个应用程序窗口的范围内对tupe data的value set运用增量聚合操作. 窗口结束后,聚合结果被重置用来计算新的聚合操作.
因此在每一次的应用程序窗口中通过DimensionsComputation都会生成不连续的聚合结果. 它的输出会被DimensionStore使用,被用来计算累积的聚合结果.

DimensionsStore operator

The DimensionsStore operator is an Operator that performs transient and final aggregations on the data generated by the Dimensions Computations operator. It maintains the historical data for aggregates to ensure a meaningful historical view.
维度存储算子会使用维度计算算子生成的数据进行临时的或者最终的聚合. 它维护了历史数据的聚合,确保输出有意义的历史视图.

因为维度计算算子输出的是一段一段时间的结果, 所以维度存储算子需要对每一段的结果进行合并,确保结果是连续的.

The DimensionsStore operator uses the discrete aggregates generated by the DimensionsComputation operator to generate cumulative aggregates in turn. Because the aggregates generated by the DimensionsComputation operator are incremental aggregates, the sum of multiple such aggregates provides cumulative aggregates as follows:
因为维度计算生成的是不连续的,而且是增量的聚合. 维度存储算子做的工作就是对多个这样的聚合结果进行累积操作.

1
2
3
4
SUM1 = SUM(Value11, Value12, ...)
SUM2 = SUM(Value21, Value22, ...)

{Cumulative_SUM} = SUM1 + SUM2

The DimensionsStore operator also stores transient aggregates in a persistent proprietary store called HDHT. The DimensionsStore operator uses HDHT to present the requested historical data.
对于临时的聚合,维度存储算子会保存在一个持久化的专有存储系统HDHT中. 当请求历史数据时,维度存储算子会呈现HDHT中的数据. final聚合的结果存储在哪里?

存储算子,需要存储最终结果. 而计算算子,则只负责计算每一部分的数据, 最后汇总到存储算子做最终的聚合.

Query

Queries are parsed by the Query operator and passed onto DimensionsStore to fetch data from HDHT Store.
查询请求会被Query算子解析, 并传送给DimensionStore算子去从HDHT存储系统中获取数据. 这是不是说临时的和final的都存储在HDHT中?

DAG

apex-dag

Partitioning

The Dimensions Computation operator can be statically partitioned for higher processing throughput.

1
2
3
4
<property>
<name>dt.operator.DimensionsComputations.attr.PARTITIONER</name>
<value>com.datatorrent.common.partitioner.StatelessPartitioner:8</value>
</property>

This attribute creates a StatelessPartitioner for the DimensionsComputation operator. The parameter of 8 is going to partition the operator 8 times. The StatelessPartitioner ensures that the operators are clones of each other. The tuples passed to individual clones of operators are decided based on the hashCode of the tuple.
使用Partitioner可以增加处理能力. StatelessPartitioner可以确保复制的每个operator都是一样的. tupe转发到哪个operator处理通过hash.

Unifier

A unifier which combines all the intermediate results from individual DimensionsComputation operators into a single stream. This stream is then passed to Dimensions Store. Following code needs to be added in populateDAG for adding an Unifier:
unifier(统一者)会合并所有DimensionsComputation的中间结果成一个单独的流. 这个流会传送到DimensionsStore

1
2
DimensionsComputationFlexibleSingleSchemaPOJO dimensions = dag.addOperator("DimensionsComputation", DimensionsComputationFlexibleSingleSchemaPOJO.class);
dimensions.setUnifier(new DimensionsComputationUnifierImpl<InputEvent, Aggregate>());

Dimensions Computation Configuration Example

假设input tuple有如下的字段, 则publisher,advertiser,location当做维度字段,其他数值类型的当做指标字段.

publisher advertiser location impressions clicks cost revenue
google.com tongdun hangzhou 10 99 0.5 10

维度计算算子的配置示例, 分别使用到了上面的keys(分组/维度字段),values(计算/指标字段),时间分桶.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
{"keys":[{"name":"publisher","type":"string","enumValues":["twitter","facebook","yahoo"]},
{"name":"advertiser","type":"string","enumValues":["starbucks","safeway","mcdonalds"]},
{"name":"location","type":"string","enumValues":["N","LREC","SKY","AL","AK"]}],
"timeBuckets":["1m","1h","1d"],
"values":
[{"name":"impressions","type":"long","aggregators":["SUM","COUNT","AVG"]},
{"name":"clicks","type":"long","aggregators":["SUM","COUNT","AVG"]},
{"name":"cost","type":"double","aggregators":["SUM","COUNT","AVG"]},
{"name":"revenue","type":"double","aggregators":["SUM","COUNT","AVG"]}],
"dimensions":
[{"combination":[]},
{"combination":["location"]},
{"combination":["advertiser"], "additionalValues":["impressions:MIN", "clicks:MIN", "cost:MIN", "revenue:MIN", "impressions:MAX", "clicks:MAX", "cost:MAX", "revenue:MAX"]},
{"combination":["publisher"], "additionalValues":["impressions:MIN", "clicks:MIN", "cost:MIN", "revenue:MIN", "impressions:MAX", "clicks:MAX", "cost:MAX", "revenue:MAX"]},
{"combination":["advertiser","location"]},
{"combination":["publisher","location"]},
{"combination":["publisher","advertiser"]},
{"combination":["publisher","advertiser","location"]}]
}

dimensions: - This defines the combinations of keys that are used for grouping data for aggregate calculations. This can be mentioned in combination with the JSON key.
维度集合, 是key的组合(key就是维度哦), 用来分组数组. 因为一条原始数据, 可能要根据不同的维度字段进行不同聚合操作.
比如上面的广告数据, 聚合计算可能是: 按照地区分组, 按照广告商分组, 按照发布者分组, 按照发布者和广告商分组等等.

keys只是定义了都有哪些维度字段,但是并不是所有的维度字段都必须参与到group by分组中(那样就只有最后一个combination).虽然给定这些维度字段,
但是实际的聚合操作可能只需要其中的一两个, 或者像第一个combination根本就没有分组字段. 所以用dimensions来表示用户的自定义聚合字段.

additionalValues can be used for mentioning additional aggregators for any value. For example, impressions:MIN indicates that for a given combination, calculate “MIN” for value “impression” as well. By default, the down time rounded off to the next value as per time bucket is always considered as one of the keys.

Application Development

1.聚合需求

  • 报价Quote: 包括了最近的交易价格, 最近的交易时间, 当天的交易量
  • 每分钟图表: 最高的交易价格, 最低的交易价格, 这一分钟的交易量
  • 简单的滑动平均数: 过去5分钟的平均交易价格

yahoo-fin

  • StockTickerInput读取实时数据的输入算子.输出字段:股票编号,价格,增量交易量,最近交易时间.
  • 股票编号symbol是作为KeyValuePair的key, 输入算子输出三个OutputPort. 输入算子通过输出Port将数据传递到下一个算子中.
  • 由于Operators会被序列化,所有的input和output ports都应该声明为transient,因为ports是无状态的,不应该被序列化(有状态的才需要序列化).

2.输入算子

emitTuples() emits the data to the output ports of the operator.
emitTuples() will be called one or more times within one application window as long as time is allowed within the window.
emitTuples方法将数据发送到输入算子的output ports中. 在一个应用程序的窗口内会调用多次emitTuples,只要时间在这个窗口内是允许的.
因为假设应用程序的窗口是1分钟,而emitTuples读取数据源是每隔5秒钟读取一次,则在一分钟内,显然会调用60/5=12次emitTuples.

1
2
3
4
5
6
7
8
9
10
11
public class StockTickInput implements InputOperator {
@OutputPortFieldAnnotation(optional = true) public final transient DefaultOutputPort<KeyValPair<String, Double>> price = new DefaultOutputPort<KeyValPair<String, Double>>();
@OutputPortFieldAnnotation(optional = true) public final transient DefaultOutputPort<KeyValPair<String, Long>> volume = new DefaultOutputPort<KeyValPair<String, Long>>();
@OutputPortFieldAnnotation(optional = true) public final transient DefaultOutputPort<KeyValPair<String, String>> time = new DefaultOutputPort<KeyValPair<String, String>>();

public void emitTuples(){
price.emit(new KeyValPair<String, Double>(symbol, currentPrice));
volume.emit(new KeyValPair<String, Long>(symbol, vol));
time.emit(new KeyValPair<String, String>(symbol, timeStamp));
}
}

计算当天总的交易量, 由于每次模拟的输入数据是当前的交易量,所以在第一次交易量之后,都要保存上一次的交易量,计算增量时以当前值-上次交易量.
因为当前的交易量数据包含了上一次的交易量,所以如果获取的这个交易量数据没有时间窗口的概念,则必然下一次的交易量比上一次的要大(或者相等).

3.聚合算子

  • 当天的交易总量, 增量求和算子, 设置Cumulative=true, 表示求和是以累积的方式. 设置类型为Long,表示交易量是Long类型.
  • 每一分钟的交易量. 设置应用程序的时间窗口, 只在当前这一分钟的最后一个事件才发送这一分钟的求和结果
  • 每一分钟的最值. 同样设置时间窗口, 由于是求最大值和最小值, 使用RangeKeyVal算子. 设置类型为Double,表示的是最小值和最大值都是Double的价格.
  • 报价由三个来源组成: 首先要表示哪支股票(String), 最近交易价格(Double), 当天的交易量(Long), 最近交易时间(String).
  • 图表由两个来源组成: 首先也是哪支股票(String), 最值交易价格(HighLow), 当前一分钟的交易量(Long)
  • 过去五分钟的平均交易价格. 设置滑动窗口的大小使用setWindowSize.

每个算子的第一个元素都是Key, 都要表示当前处理的是哪条记录,比如示例中是哪一只股票,类型都是String.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
//This sends total daily volume by adding volumes from each ticks.
public SumKeyVal<String, Long> getDailyVolumeOperator(String name, DAG dag) {
SumKeyVal<String, Long> oper = dag.addOperator(name, new SumKeyVal<String, Long>());
oper.setType(Long.class); //每支股票的当天交易量,值为交易量
oper.setCumulative(true); //累积,因为Input的输出是增量,所以要累积求和
return oper;
}

//Get aggregated volume of 1 minute and send at the end window of 1 minute.
public SumKeyVal<String, Long> getMinuteVolumeOperator(String name, DAG dag, int appWindowCount) {
SumKeyVal<String, Long> oper = dag.addOperator(name, new SumKeyVal<String, Long>());
oper.setType(Long.class);
oper.setEmitOnlyWhenChanged(true);
dag.getOperatorMeta(name).getAttributes().put(OperatorContext.APPLICATION_WINDOW_COUNT,appWindowCount); //一分钟的时间窗口
return oper;
}
//Get High-low range for 1 minute.
public RangeKeyVal<String, Double> getHighLowOperator(String name, DAG dag, int appWindowCount) {
RangeKeyVal<String, Double> oper = dag.addOperator(name, new RangeKeyVal<String, Double>());
dag.getOperatorMeta(name).getAttributes().put(OperatorContext.APPLICATION_WINDOW_COUNT,appWindowCount);
oper.setType(Double.class);
return oper;
}

//1.Quote (Merge price, daily volume, time)
public ConsolidatorKeyVal<String,Double,Long,String,?,?> getQuoteOperator(String name, DAG dag){
ConsolidatorKeyVal<String,Double,Long,String,?,?> oper = dag.addOperator(name, new ConsolidatorKeyVal<String,Double,Long,String,Object,Object>());
return oper;
}

//2.Chart (Merge minute volume and minute high-low)
public ConsolidatorKeyVal<String,HighLow,Long,?,?,?> getChartOperator(String name, DAG dag) {
ConsolidatorKeyVal<String,HighLow,Long,?,?,?> oper = dag.addOperator(name, new ConsolidatorKeyVal<String,HighLow,Long,Object,Object,Object>());
return oper;
}

//3.Get simple moving average of price.
public SimpleMovingAverage<String, Double> getPriceSimpleMovingAverageOperator(String name, DAG dag, int appWindowCount) {
SimpleMovingAverage<String, Double> oper = dag.addOperator(name, new SimpleMovingAverage<String, Double>());
oper.setWindowSize(appWindowCount);
oper.setType(Double.class);
return oper;
}

SimpleMovingAverage keeps track of the data of the previous N application windows in a sliding manner.
For each end window event, it provides the average of the data in those application windows.
Moving算子会以滑动的行为跟踪前面N个应用程序窗口的数据. 在每个窗口的最后一个事件(这个事件标记了窗口结束),它提供了所有这N个窗口的平均值.

OperatorContext.APPLICATION_WINDOW_COUNT可以理解为跳动窗口,窗口之间不重叠. WindowSize表示滑动窗口的大小,窗口之间会重叠

1
2
3
4
5
跳动窗口(WINDOW_COUNT)                    滑动窗口(WindowSize)
--------------------------> ---------------------------> Tuple
|-------| |-------|
|-------| |-------|
|-------| |-------|

4.构造DAG

Connecting the operators together and constructing the DAG: Now that we know the operators used, we will create the DAG, set the streaming window size, instantiate the operators, and connect the operators together by adding streams that connect the output ports with the input ports among those operators.
将所有的算子连接起来构造DAG: 设置流窗口大小,实例化算子,为算子之间添加stream,连接算子之间的输入ports和输出ports.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void populateDAG(DAG dag, Configuration conf){
//实例化各个算子,然后将算子使用stream进行连接
dag.addStream("price", tick.price, quoteOperator.in1, highlow.data, priceSMA.data);
dag.addStream("vol", tick.volume, dailyVolume.data, minuteVolume.data);
dag.addStream("time", tick.time, quoteOperator.in3);
dag.addStream("daily_vol", dailyVolume.sum, quoteOperator.in2);

dag.addStream("quote_data", quoteOperator.out, getConsole("quoteConsole", dag, "QUOTE"));

dag.addStream("high_low", highlow.range, chartOperator.in1);
dag.addStream("vol_1min", minuteVolume.sum, chartOperator.in2);
dag.addStream("chart_data", chartOperator.out, getConsole("chartConsole", dag, "CHART"));

dag.addStream("sma_price", priceSMA.doubleSMA, getConsole("priceSMAConsole", dag, "Price SMA"));
}

Apache Apex Platform Overview

Streaming Computational Model 流计算模型

  • 应用程序运行在Apex平台中, 由operators算子和streams流组成的DAG表示.
  • 输入数据都会在内存中被计算, 并且可以选择非阻塞地将输出保存到HDFS
  • 在operators之间流动的数据是原子性的/不可变的, 每个data element和它的类型定义/schema组成一个Tuple
  • 应用程序设计成这些tuples流经合适的计算单元,并得到最终的计算结果
  • 消息队列(buffer server)会在不同的流程中管理计算单元之间的tuples streaming

Streaming Windows & Application Window

streaming platform的一个基础构件是将一个stream流分成(breaking up into)等价的有限的时间切片: 叫做streaming windows.
每个streaming window是一小批数据: micro-batch. 一般一个窗口的大小是500ms. 即每隔500ms形成一个micro-batch-window.
虽然Apex平台是以tuple级别来处理计算的, 只有在window的边界才会记录. –> 以每个window为边界处理这个窗口中一批的tuples.
以窗口为边界处理一批一批的tuples,所以一整个窗口内的计算对于platform是一个原子事件.

计算单元针对的是tuple元组, 但是并不是每一个元组过来了就记录一次, 而是等一个窗口满了之后,一起处理这个窗口内的所有有序tuples.
假设一秒中收到了1000个tuples,如果一个tuple就记录一次,需要记录1000次. 而以一秒的窗口为时间单位,则只需要记录一次.

micro-batch

Application Window是应用程序的窗口,比如滑动窗口为5min, 则对于每秒钟的streaming window而言, 会有5*60=300个streaming windows.

A streaming window is an abstraction of many tuples into a higher atomic event for easier management.
An application window is a group of consecutive streaming windows used for data aggregation on a per operator level.
StreamingWindow是将多个tuples抽象成一个更高级的原子事件,为了便于管理. ApplicationWindow是一组连续的StreamingWindow.
ApplicationWindow是在每个operator级别上的数据聚合操作. 一个operator的聚合会在多个StreamingWindow上的tuples一批批地计算.

上图的每个Window都是以一个begin_window事件开始,以一个end_window事件结束. 由于计算是并行执行的为了流程序最快完成, 每个operator无法
精确地预测它发送出去的tuples什么时候会被下游operator处理. 能保证的是: 上游Operator在处理当前或往后的window, 下游Operator处理的则是
当前或往前一个Window. 当任意Operator的一个StreamingWindow处理完成时, 所有它的上游算子都必须已经处理完这个Window了(begin并没有这个限制).

begin和end_window事件用来表示一个StreamingWindow的边界触发条件. 如果当前算子的这个窗口结束了,则当前算子的所有上游算子
都已经完全处理完这个窗口的数据,否则如果还有上游算子在这个窗口上的tuple没有处理完, 当前算子是没办法结束这个窗口的.
对于同一个StreamingWindow, 多个算子之间是存在关联关系的. 即上游算子在某个窗口的处理结果会发送给下游算子继续在这个窗口处理.

Streaming Application Manager (STRAM) 流应用程序管理

STRAM是YARN中的ApplicationMaster. 在YARN上可以运行MapReduce,Spark,Storm等.所以Apex也是运行在YARN之上的一个流处理框架.
应用程序用DAG构成, 最终要被YARN框架调度, 都要经过逻辑计划-物理计划(partition)-执行计划(fragments), 最后在YARN的Container上运行任务.

下图是以Container的角色对应到DAG拓扑图的每个算子:

stram_container

下图是应用程序的Quote的执行计划的生成过程:

stram_plan

各个计划的组成部分都是构成DAG的算子, 每个算子在DAG中都被唯一编号,如果算子分区,会以运行的Container编号再次区分.

参考文档

http://docs.datatorrent.com/application_development/


文章目录
  1. 1. Hello Apex
    1. 1.1. Key Concepts
    2. 1.2. Application Development
  2. 2. Apache Apex Platform Overview
    1. 2.1. Streaming Computational Model 流计算模型
    2. 2.2. Streaming Application Manager (STRAM) 流应用程序管理
  3. 3. 参考文档