# Apache Apex: Unified Stream and Batch Processing

Apache Apex: Industry’s only open source, enterprise-grade unified stream and batch processing engine. http://apex.incubator.apache.org/

## Hello Apex

### Key Concepts

Key set

A key set is a set of fields in the incoming tuple that is used to combine data for aggregation.

Value set

A value set is the set of fields in the incoming tuple on which Aggregator(s) are applied.

Aggregator

An aggregator is a mathematical function that is applied on value fields in an incoming tuple. Examples of aggregators are SUM, COUNT, MAX, MIN, and AVERAGE.

Aggregates

Aggregates are objects containing the aggregated values for a configured value set and key combination.

Time buckets

Time buckets indicate the duration for which the floor time is calculated. For example, for a time bucket of 1 minute, the floor time-value for both 12:01:34 and 12:01:59 will be 12:01:00. Similarly, for an hourly time bucket, the floor time-value for both 15:02:34 and 15:34:00 will be 15:00:00.

After calculating the floor value for a duration, the time-value becomes a key. Currently supported time buckets are: 1 second, 1 minute, 1 hour, and 1 day.
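The floor computation described above can be sketched in plain Java (an illustration only, not Apex code):

```java
// Illustration (not Apex code): computing the floor time-value for a time
// bucket. A timestamp in epoch milliseconds is truncated to the bucket size.
class TimeBucketFloor {
    static long floor(long epochMillis, long bucketMillis) {
        return epochMillis - (epochMillis % bucketMillis);
    }

    public static void main(String[] args) {
        long oneMinute = 60_000L;
        long t1 = 12 * 3_600_000L + 60_000L + 34_000L; // 12:01:34 as millis-of-day
        long t2 = 12 * 3_600_000L + 60_000L + 59_000L; // 12:01:59 as millis-of-day
        // both fall into the same 1-minute bucket, 12:01:00
        System.out.println(floor(t1, oneMinute) == floor(t2, oneMinute)); // true
    }
}
```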

Combinations

Combinations indicate a group of keys that are used for aggregate computations.

Incremental Aggregators

Incremental aggregators are aggregate functions that can be computed using only the previous aggregate value and the new value. For example, SUM is incremental: the new sum is the previous sum plus the incoming value. COUNT, MAX, and MIN work the same way.

On-the-fly Aggregators (OTF Aggregators)

On-the-fly aggregators are aggregate functions that use the results of multiple incremental aggregators and can be calculated on the fly when needed. For example, AVERAGE is derived from the incremental SUM and COUNT (average = sum / count).
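The distinction can be sketched in plain Java (an illustration only, not Apex code):

```java
// Illustration (not Apex code): SUM and COUNT are incremental aggregators --
// each new value updates the previous aggregate directly. AVERAGE is an
// on-the-fly (OTF) aggregator: it is derived from SUM and COUNT when needed.
class Aggregators {
    long count = 0;
    double sum = 0.0;

    void add(double value) {      // incremental update per incoming value
        sum += value;
        count++;
    }

    double average() {            // OTF: computed from incremental aggregates
        return count == 0 ? 0.0 : sum / count;
    }

    public static void main(String[] args) {
        Aggregators a = new Aggregators();
        a.add(10); a.add(20); a.add(30);
        System.out.println(a.sum);       // 60.0
        System.out.println(a.average()); // 20.0
    }
}
```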

DimensionsComputation operator

The DimensionsComputation operator is an Operator that performs intermediate aggregations using incremental aggregators.

The DimensionsComputation operator applies incremental aggregators on the value set of the tuple data within the boundary of an application window. At the end of the application window, the aggregate value is reset to calculate the new aggregates. Thus, the DimensionsComputation operator generates discrete aggregates for every application window. This output is used by the DimensionsStore operator (labeled as Store in the DAG) to calculate cumulative aggregates.

DimensionsStore operator

The DimensionsStore operator is an operator that performs transient and final aggregations on the data generated by the DimensionsComputation operator. It maintains the historical data for aggregates to ensure a meaningful historical view.

The DimensionsStore operator uses the discrete aggregates generated by the DimensionsComputation operator to generate cumulative aggregates. Because those per-window aggregates come from incremental aggregators, summing multiple such aggregates yields the cumulative aggregates.
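The summation of discrete per-window aggregates into cumulative aggregates can be sketched in plain Java (an illustration only, not the operator's actual code):

```java
import java.util.Arrays;

// Illustration (not Apex code): the store sums the discrete per-window
// aggregates emitted by DimensionsComputation to obtain cumulative aggregates.
class CumulativeAggregates {
    // input: one discrete SUM aggregate per application window
    static long[] cumulative(long[] perWindowSums) {
        long[] out = new long[perWindowSums.length];
        long running = 0;
        for (int i = 0; i < perWindowSums.length; i++) {
            running += perWindowSums[i];
            out[i] = running;
        }
        return out;
    }

    public static void main(String[] args) {
        long[] discrete = {5, 3, 7};   // e.g. impressions per window
        System.out.println(Arrays.toString(cumulative(discrete))); // [5, 8, 15]
    }
}
```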

The DimensionsStore operator also stores transient aggregates in a persistent proprietary store called HDHT. The DimensionsStore operator uses HDHT to present the requested historical data.

Query

Queries are parsed by the Query operator and passed on to the DimensionsStore operator to fetch data from the HDHT store.

DAG

Partitioning

The Dimensions Computation operator can be statically partitioned for higher processing throughput.

This attribute creates a StatelessPartitioner for the DimensionsComputation operator; a parameter of 8 partitions the operator into 8 instances. The StatelessPartitioner ensures that the partitions are clones of each other. Which clone receives a given tuple is decided by the tuple's hashCode.
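The hashCode-based routing can be sketched in plain Java (an illustration only; the modulo mapping below is a simplification of Apex's partition-mask mechanism):

```java
// Illustration (not Apex code): with a StatelessPartitioner, the tuple's
// hashCode decides which of the N operator clones receives it. This sketch
// uses a simple modulo; Apex itself uses a partition mask.
class HashPartition {
    static int partitionFor(Object tuple, int partitionCount) {
        // mask off the sign bit so the result is a valid index
        return (tuple.hashCode() & Integer.MAX_VALUE) % partitionCount;
    }

    public static void main(String[] args) {
        // the same key always lands on the same partition
        int p1 = partitionFor("google.com", 8);
        int p2 = partitionFor("google.com", 8);
        System.out.println(p1 == p2); // true
    }
}
```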

Unifier

A unifier combines all the intermediate results from the individual DimensionsComputation partitions into a single stream. This stream is then passed to the DimensionsStore operator. The unifier is added to the DAG in populateDAG.

Dimensions Computation Configuration Example

| publisher  | advertiser | location | impressions | clicks | cost | revenue |
|------------|------------|----------|-------------|--------|------|---------|
| google.com | tongdun    | hangzhou | 10          | 99     | 0.5  | 10      |

dimensions: defines the combinations of keys that are used for grouping data for aggregate calculations; each entry is specified with the combination JSON key.

The keys only declare which dimension fields exist; not every dimension field has to participate in every GROUP BY combination (otherwise only the single, full combination would exist).

additionalValues can be used to specify additional aggregators for any value. For example, impressions:MIN indicates that, for the given combination, MIN should also be calculated for the value impressions. By default, the tuple time rounded down according to the time bucket is always included as one of the keys.
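As an illustration, a dimensions configuration for the sample row above might look like the following JSON. This is a hypothetical sketch in the style of the Malhar dimensions schema; key names such as timeBuckets and combination are assumptions, not verbatim from this document:

```json
{
  "keys": [
    {"name": "publisher",  "type": "string"},
    {"name": "advertiser", "type": "string"},
    {"name": "location",   "type": "string"}
  ],
  "timeBuckets": ["1m", "1h", "1d"],
  "values": [
    {"name": "impressions", "type": "long",   "aggregators": ["SUM"]},
    {"name": "clicks",      "type": "long",   "aggregators": ["SUM"]},
    {"name": "cost",        "type": "double", "aggregators": ["SUM"]},
    {"name": "revenue",     "type": "double", "aggregators": ["SUM"]}
  ],
  "dimensions": [
    {"combination": ["publisher"], "additionalValues": ["impressions:MIN"]},
    {"combination": ["publisher", "advertiser"]},
    {"combination": ["publisher", "advertiser", "location"]}
  ]
}
```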

### Application Development

1. Aggregation requirements

• Quote: the latest trade price, the latest trade time, and the total traded volume for the day
• Per-minute chart: the highest trade price, the lowest trade price, and the traded volume of that minute
• Simple moving average: the average trade price over the past 5 minutes

• StockTickerInput is the input operator that reads live data. Its output fields are: stock symbol, price, incremental volume, and last trade time.
• The stock symbol is used as the key of each KeyValuePair. The input operator exposes three output ports, through which it passes data to the downstream operators.
• Because operators are serialized, all input and output ports should be declared transient: ports are stateless and must not be serialized (only stateful fields need serialization).

2. Input operator

emitTuples() emits the data to the output ports of the operator.
emitTuples() is called one or more times within one application window, as long as time remains in the window.

3. Aggregation operators

• Total traded volume for the day: an incremental sum operator. Setting Cumulative=true makes the sum accumulate across windows; the type Long indicates that the volume is a Long.
• Traded volume per minute: set the application window accordingly, and emit the sum for the minute only on the last event of that minute.
• Per-minute high/low: with the same window setting, use the RangeKeyVal operator to compute the maximum and minimum; the type Double indicates that the high and low prices are Doubles.
• A quote is composed of: the stock symbol (String), the latest trade price (Double), the traded volume for the day (Long), and the latest trade time (String).
• A chart entry is composed of: the stock symbol (String), the high/low prices (HighLow), and the traded volume of the current minute (Long).
• Average trade price over the past five minutes: set the sliding window size with setWindowSize.

SimpleMovingAverage keeps track of the data of the previous N application windows in a sliding manner.
At each end-window event, it provides the average of the data across those N application windows.

OperatorContext.APPLICATION_WINDOW_COUNT can be thought of as defining a tumbling window: consecutive windows do not overlap. WindowSize, in contrast, sets the size of a sliding window, whose windows do overlap.
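A sliding average over the last N windows can be sketched in plain Java, in the spirit of SimpleMovingAverage (an illustration only, not the library's code):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustration (not Apex code): a sliding average over the last N application
// windows. One value is recorded per window; once N windows are buffered,
// the oldest window drops out as each new one arrives.
class SlidingAverage {
    final int windowCount;
    final Deque<Double> windows = new ArrayDeque<>();
    double sum = 0.0;

    SlidingAverage(int windowCount) { this.windowCount = windowCount; }

    // called once per application window with that window's value
    double endWindow(double windowValue) {
        windows.addLast(windowValue);
        sum += windowValue;
        if (windows.size() > windowCount) {
            sum -= windows.removeFirst();   // oldest window slides out
        }
        return sum / windows.size();
    }

    public static void main(String[] args) {
        SlidingAverage sma = new SlidingAverage(2);
        System.out.println(sma.endWindow(1.0)); // 1.0
        System.out.println(sma.endWindow(3.0)); // 2.0
        System.out.println(sma.endWindow(5.0)); // 4.0 (the 1.0 window dropped)
    }
}
```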

4. Constructing the DAG

Connecting the operators together and constructing the DAG: Now that we know the operators used, we will create the DAG, set the streaming window size, instantiate the operators, and connect the operators together by adding streams that connect the output ports with the input ports among those operators.
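As a sketch of what this wiring can look like: the API calls follow Apex's DAG interface (addOperator, addStream) and the operator and port names are taken from the tutorial's description above, but the snippet is not compiled against the library and exact signatures may differ.

```java
// Sketch only -- assumes Apex's DAG API and the tutorial's operator names;
// not verified against the library here.
@Override
public void populateDAG(DAG dag, Configuration conf) {
  StockTickerInput tick = dag.addOperator("tick", StockTickerInput.class);
  SumKeyVal<String, Long> dailyVolume =
      dag.addOperator("dailyVolume", new SumKeyVal<String, Long>());
  dailyVolume.setCumulative(true); // accumulate the day's volume across windows

  // a stream connects an output port to the downstream input port(s)
  dag.addStream("volume", tick.volume, dailyVolume.data);
}
```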

## Apache Apex Platform Overview

### Streaming Computational Model

• An application runs on the Apex platform and is expressed as a DAG of operators and streams.
• Input data is processed in memory, with the option to persist output to HDFS in a non-blocking manner.
• The data flowing between operators is atomic and immutable; each data element together with its type definition (schema) forms a tuple.
• An application is designed so that these tuples flow through the appropriate computation units and produce the final results.
• A message queue (the buffer server) manages the tuple streams between computation units running in different processes.

Streaming Windows & Application Window

A fundamental building block of the streaming platform is breaking a stream up into equal, finite time slices called streaming windows.

An application window spans the application's aggregation interval. For example, with a 5-minute sliding window and 1-second streaming windows, one application window covers 5 * 60 = 300 streaming windows.

A streaming window is an abstraction of many tuples into a higher atomic event for easier management.
An application window is a group of consecutive streaming windows used for data aggregation on a per operator level.
Within an application window, an operator aggregates the tuples of the constituent streaming windows batch by batch.

The begin_window and end_window events mark the boundaries of a streaming window. An operator's current window can close only after end_window has been received from all of its upstream operators.

### Streaming Application Manager (STRAM)

STRAM is the ApplicationMaster in YARN. YARN can run MapReduce, Spark, Storm, and other frameworks; Apex, likewise, is a stream processing framework that runs on top of YARN.

## References

http://docs.datatorrent.com/application_development/