StreamCQL源码阅读(4) 应用程序执行

StreamCQL应用程序执行流程

前戏: CQL代码结构

之前我们并没有梳理CQL部分的代码结构, 在分析了差不多的代码之后, 来看看每个部分都一一对应:

cql

还没有涉及的包括: PhysicalPlan,物理计划/逻辑计划优化器,executors执行器.

正文: submitApplication

历经千辛万苦, 终于回到SubmitTask的submitApplication, 创建物理计划Executor,并执行Application.

1
2
3
private void submitApplication() {
new PhysicalPlanExecutor().execute(context.getApp());
}

api.Application -> application.Application

api的Application是流处理执行计划应用程序, 封装的是CQL语句构建而成的应用程序:

1
2
3
4
5
6
7
8
9
10
public class Application {                      
private String applicationId = null; //应用id
private String applicationName = null; //应用名称
private TreeMap<String, String> confs; //整个应用程序中用到的配置属性,也包含用户自定义的配置属性
private String[] userFiles; //用户自定义添加的一些文件
private List<UserFunction> userFunctions; //用户自定义的函数,udf和udaf都在这个里面
private List<Schema> schemas = new ArrayList<Schema>(); //执行计划中的所有的schema
private List<Operator> operators = null; //执行计划中所有的操作,包含输入、输出和计算操作
private List<OperatorTransition> opTransition = null; //整个执行计划中所有的连接线,定义了operator之间的连接关系
}

还记得上一篇中在buildApplication时,由SplitContext拆分算子,组合算子会把operators和transitions都设置到Application里吗?

application.Application针对Schema和Operator采用Manager管理类(实际上底层的存储结构都是Map)来操作:

1
2
3
4
5
6
public abstract class Application {
private String appName; //应用程序名称
private EventTypeMng streamSchema; //所有Schema集合
private OperatorMng operatorManager; //算子集合
private StreamingConfig conf; //系统级别的配置属性
}

Application在API物理计划是不同的对象, 同样API阶段的Operator在物理计划中对应的是IRichOperator.

IRichOperator

OperatorMng管理的算子包括输入算子(addInputStream),输出算子(addOutputStream),功能算子(addFunctionStream).

虽然两个Operator处于不同的阶段, 但是总的来说都可以分为输入,输出和Function.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
① application ⬇️                                            ② api ⬇️
IRichOperator (com.huawei.streaming.operator) Operator (com.huawei.streaming.api.opereators)
|-- AbsOperator |-- FunctionStreamOperator
|-- FunctionOperator |-- InnerOutputSourceOperator
|-- JoinFunctionOp |-- SplitterOperator
|-- DataSourceFunctionOp |-- OutputStreamOperator
|-- AggFunctionOp |-- InputStreamOperator
|-- SplitOp |-- InnerInputSourceOperator
|-- UnionFunctionOp |-- InnerFunctionOperator
|-- SelfJoinFunctionOp |-- FunctorOperator
|-- FunctorOp |-- UnionOperator
|-- FilterFunctionOp |-- FilterOperator
|-- OutputOperator |-- BasicAggFunctionOperator
|-- FunctionStreamOperator |-- JoinFunctionOperator
|-- InputOperator |-- AggregateOperator
|-- BaseDataSourceOperator

IRichOperator流处理算子基本接口: 所有的流处理相关的算子实现,都来源于这个算子, 所有的外部Storm实现均依赖于这个接口

正常的CQL insert语句只有一个输出,所以在前面的SplitContext中会有一个outputStreamName和一系列的operators和transitions.
但是这里对于IRichOperator流算子, 它是构成Storm的Topology组件,就必须考虑算子之间数据的流动,一个Bolt可能有多个输入和输出.

stream_inout

对于一个算子而言,输入数据可以有多个.但是输出是只有一个! 这就好比最终的select只会有一个输出schema: select输出的数据作为算子的输出.
所以IRichOperator的输入getInputStream和getInputSchema都是集合, 而输出的getOutputStream和getOutputSchema都是单一的.
Update: 输入输出这里其实看流,反正是一个流一个名称。都是允许多输入多输出的。一个算子就是一个流,一个算子的多个实例都属于一个流。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public interface IRichOperator extends IOperator, Configurable{     //① 流处理算子基本接口
String getOperatorId(); //获取算子id
int getParallelNumber(); //获取算子并发度
List<String> getInputStream(); //获取输入流名称, 多个输入流
String getOutputStream(); //获取输出流名称
Map<String, IEventType> getInputSchema(); //获取输入schema ⬅️ <key是输入流名称,IEventType是输入流的Schema>
IEventType getOutputSchema(); //获取输出schema ⬅️
Map<String, GroupInfo> getGroupInfo(); //获取分组信息
}

public class Operator { // ② 每个计算单元成为一个Operator,定义了各类操作.分为Source和InnerFunction
private String id; //算子ID
private String name; //算子名称
private int parallelNumber; //并行度
private TreeMap<String, String> args; //每个operator的参数
}

public class OperatorTransition { // ③ 各种Operator的连接关系,定义了从一个operator到另外一个operator的连接
private String id; //当前连接的id
private String streamName; //流名称
private String fromOperatorId; //发起连接的Operator id
private String toOperatorId; //接收连接的Operator id
private DistributeType distributedType; //数据获取类型,仅仅在非sourceOperator中存在
private String distributedFields; //数据分发字段, 仅在distributedType为field的时候生效
private String schemaName; //流上进行数据传输的时候的schema名称
}

还记得上一章在创建Transition时会指定分组字段和分组策略吗? 和这里的GroupBy应该是会有点血缘关系的. 当然算子还少不了输入输出Schema.

PhysicalPlanExecutor -> ExecutorPlanGenerator

PhysicalPlanExecutor.execute传入的是API的Application, 而submit(app)的app是application.Application.
怎么转换: 通过ExecutorPlanGenerator物理计划生成器生成可执行的计划. 可执行指的是可以运行在Storm引擎.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void execute(Application apiApplication) {
LOG.info("start to execute application {}", apiApplication.getApplicationId());
parseUserDefineds(apiApplication, isStartFromDriver); //① 准备工作
com.huawei.streaming.application.Application app = generatorPlan(apiApplication); // ⬅️ API.App->application.App
submit(app); //④ 提交Application
}
//由物理执行计划api.Application生成可执行计划application.Application(最终生成的,可以提交的应用程序)
private com.huawei.streaming.application.Application generatorPlan(Application apiApplication) {
preExecute(apiApplication); //执行器执行之前的钩子
new PhysicPlanChecker().check(apiApplication);
//② 用户自定义的处理: 执行计划的组装, 构建application, 表达式的解析被延迟到这里来实现
com.huawei.streaming.application.Application app = generator.generate(apiApplication);
preSubmit(app); //提交执行计划之前的钩子
executorChecker.check(app); //③ 执行计划检查
return app;
}

①准备工作parseUserDefineds: 注册jar包,注册函数,打包等(类似storm中需要上传jar包).

1
2
3
4
5
6
7
8
9
2015-11-25 02:32:24 | INFO  | [main] | 👉 start to execute application example | com.huawei.streaming.cql.executor.PhysicalPlanExecutor (PhysicalPlanExecutor.java:127)
2015-11-25 02:32:25 | INFO | [main] | start to unzip jar stream-storm-1.0-jar-with-dependencies.jar | com.huawei.streaming.cql.executor.mergeuserdefinds.JarExpander (JarExpander.java:79)
2015-11-25 02:32:25 | INFO | [main] | unzip jar /private/var/folders/xc/x0b8crk9667ddh1zhfs29_zr0000gn/T/da6d53114b1f49458c0e6329553b1ff9/stream-storm-1.0-jar-with-dependencies.jar to /private/var/folders/xc/x0b8crk9667ddh1zhfs29_zr0000gn/T/da6d53114b1f49458c0e6329553b1ff9/jartmp | com.huawei.streaming.cql.executor.mergeuserdefinds.JarExpander (JarExpander.java:91)
2015-11-25 02:32:30 | INFO | [main] | finished to unzip jar to dir | com.huawei.streaming.cql.executor.mergeuserdefinds.JarExpander (JarExpander.java:84)
2015-11-25 02:32:30 | INFO | [main] | start to copy ch | com.huawei.streaming.cql.executor.mergeuserdefinds.JarFilesMerger (JarFilesMerger.java:82)
...
2015-11-25 02:32:38 | INFO | [main] | finished to package jar | com.huawei.streaming.cql.executor.mergeuserdefinds.JarPacker (JarPacker.java:68)

2015-11-25 02:32:39 | INFO | [main] | 👉 start to generator executor application for app example | com.huawei.streaming.cql.executor.ExecutorPlanGenerator (ExecutorPlanGenerator.java:102)

生成的可执行计划会: ①设置系统配置参数, 解析Application中的②Schema和③Operators, 最终返回的是可执行的Application.

1
2
3
4
5
6
7
8
9
public com.huawei.streaming.application.Application generate(Application vap) {
LOG.info("start to generator executor application for app " + vap.getApplicationId());
apiApplication = vap; //这个是API的Application
createEmptyApplication(vap.getApplicationId());
parseUserDefineds(vap); //① 用户自定义的处理(系统配置参数)
parseSchemas(); //② 解析所有的Schema,构建schema信息
parseOperators(); //③ 解析所有的Operator,构建OperatorInfo. 整理Operator中的上下级关系
return executorApp; //返回的是可执行的Application
}

parseSchemas: Schema -> TupleEventType

在第一篇中Topology的CQL语句:

1
2
3
4
5
6
7
CREATE INPUT STREAM s(id INT, name STRING, type INT) SOURCE randomgen PROPERTIES ( timeUnit = "SECONDS", period = "1", eventNumPerperiod = "1", isSchedule = "true" );
CREATE OUTPUT STREAM rs(type INT, cc INT) SINK consoleOutput;

INSERT INTO STREAM rs #INSERT STATEMENT
SELECT type, COUNT(id) as cc #SELECT STATEMENT
FROM s[RANGE 20 SECONDS BATCH] #FROM CLAUSE
WHERE id > 5 GROUP BY type; #WHERE CLAUSE/GROUPBY CLAUSE

上面有两个流,分别是输入流s, 输出流rs. 因此会将这两个Schema添加到可执行Application中.

对于输入输出而言,最重要的就是Schema了, 在输入和输出这一对双胞胎眼中,数据进来和出去的格式非常重要.
因为外部数据进来,和暴露数据给外部,最重要的是格式. 你中间不管怎么处理,它们都不管. Only Schema! Give Me the Data!

1
2
2015-11-25 02:32:39 | INFO  | [main] | AddEventType enter, the eventtypeName is:s. | com.huawei.streaming.event.EventTypeMng (EventTypeMng.java:73)
2015-11-25 02:32:39 | INFO | [main] | AddEventType enter, the eventtypeName is:rs. | com.huawei.streaming.event.EventTypeMng (EventTypeMng.java:73)

parseSchemaToIEvent会将API的Application中所有的Schema都转换为IEvent事件:TupleEventType.
Schema中的Column对应TupleEventType的Attribute. Schema的streamName/id对应了下面的name/事件类型.

1
2
3
4
5
6
7
public class TupleEventType implements IEventType {
private String name; //schemaName,表名
private Attribute[] schema; //schemas, 所有列
private String[] attNames; //所有列的列名
private Class< ? >[] attTypes; //所有列的列类型
private HashMap<String, Integer> attid;
}

Schema的管理类用Map结构保存schemaName/eventTypeName和对应的Schema/TupleEventType: 表名->表结构.

1
2
3
4
5
6
7
public class EventTypeMng implements Serializable {
private Map<String, IEventType> schemas; //MAP: 数据类型名称 => 具体数据类型

public void addEventType(IEventType schema) { //⬅️ executorApp.addEventSchema(tupleEventType)
schemas.put(schema.getEventTypeName(), schema); //数据类型|事件类型|表名schemaName|streamName流名称
}
}

parseOperators: 算子解析

前面第一篇的时候submit之前每条CQL语句都有start to parse cql过一次了,这里为什么还会再次parse?
答: 前面只是LazyTask懒解析,其实还是没有开始的. 那为什么要在这里才开始? 因为解析完schema后, 就该轮到operator的解析了.
不过有一点不同的是, 最开始的parse是对整个CQL语句, 这里只解析了部分, 比如表达式,groupby,聚合. WHYYY?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
解析算子: 1)解析二元表达式(属性值表达式) id > 5
2015-11-25 02:32:39 | INFO | [main] | start to parse cql : (s.id > 5) | com.huawei.streaming.cql.semanticanalyzer.parser.ApplicationParser (ApplicationParser.java:44)
2015-11-25 02:32:39 | INFO | [main] | Parse Completed | com.huawei.streaming.cql.semanticanalyzer.parser.ApplicationParser (ApplicationParser.java:69)
2015-11-25 02:32:39 | INFO | [main] | start to create binary Expressions. | com.huawei.streaming.cql.executor.expressioncreater.PropertyValueExpressionCreator (BinaryExpressionCreator.java:54) ⬅️
2015-11-25 02:32:39 | INFO | [main] | Parse Completed, cql : s.type, count( s.id ) | com.huawei.streaming.cql.semanticanalyzer.parser.SelectClauseParser (SelectClauseParser.java:68)

2)解析 group by
2015-11-25 02:32:39 | INFO | [main] | start to parse cql : s.type | com.huawei.streaming.cql.semanticanalyzer.parser.GroupbyClauseParser (GroupbyClauseParser.java:45) ⬅️
2015-11-25 02:32:39 | INFO | [main] | Parse Completed | com.huawei.streaming.cql.semanticanalyzer.parser.GroupbyClauseParser (GroupbyClauseParser.java:68)
2015-11-25 02:32:39 | INFO | [main] | start to parse cql : s.type | com.huawei.streaming.cql.semanticanalyzer.parser.GroupbyClauseParser (GroupbyClauseParser.java:45)
2015-11-25 02:32:39 | INFO | [main] | Parse Completed | com.huawei.streaming.cql.semanticanalyzer.parser.GroupbyClauseParser (GroupbyClauseParser.java:68)

3)聚合算子 count(id)
2015-11-25 02:32:39 | INFO | [main] | start to create aggregate service | com.huawei.streaming.cql.executor.operatorviewscreater.AggregateServiceViewCreator (AggregateServiceViewCreator.java:89) ⬅️

不仅仅是Driver.run调用了ApplicationParser.parse. 从调用树还能看到有XXXInfoCreator,ViewCreator.(还记得聚合也是一种查询吗)

stream-opinfo

算子解析: 这里的解析是为了使得输入和输出算子统一,避免用户自定义和系统内置的算子对外表现不一致处理起来的麻烦
由于输入和输出算子中存在特例,即针对文件,tcp,kafka等编写了特例, 所以需要①首先将他们抽象化,之后再来处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void parseOperators(){
Map<String, Operator> opts = formatOperators(); //① 输入输出算子抽象化
Map<String, AbsOperator> opMappings = createOperatorInfos(opts); //② 算子解析 ⬅️ Operator -> AbsOperator
combineOperators(opMappings); //③ 整理算子顺序

for (Entry<String, AbsOperator> et : opMappings.entrySet()){ //④ 添加算子到Application
IRichOperator operator = et.getValue();
//如果没有输入,也算是input
if (operator instanceof InputOperator){
executorApp.addInputStream(operator);
continue;
}
//如果没有输出,也算是output
if (operator instanceof OutputOperator){
executorApp.addOutputStream(operator);
continue;
}
//不是输入输出,就是功能算子
executorApp.addFunctionStream(operator);
}
}

② Operator是算子, AbsOperator则是流处理算子(继承IRichOperator).它们的转换由createOperatorInfos->OperatorInfoCreatorFactory完成.
④ 和Schema的管理一样,算子的管理类OperatorMng用三个Map分别管理输入,输出,功能算子. Map的key是operatorId,value是Operator算子本身.

1
2
3
4
5
6
public class OperatorMng{    
private List<IRichOperator> sortedFunctions; //DFG排序后的功能算子列表,作为创建Storm拓扑顺序的基础(输出和功能算子组成-->Bolt)
private Map<String, IRichOperator> inputs; //输入算子 --> Spout
private Map<String, IRichOperator> functions; //功能算子 --> Bolt
private Map<String, IRichOperator> outputs; //输出算子 --> Bolt
}

上面parseOperators的两个Map:opts和opMapping以及OperatorMng的三个Map的Key,Value都是operatorID->AbsOperator实例.

createOperatorInfos

operators是API中的算子, 要转换为可执行计划对应的算子, 通过OperatorInfoCreatorFactory工厂类根据operator上的注解
先取得OperatorInfoCreator的具体实现类,再调用createInstance, 在具体的OperatorInfoCreator实现类中才完成转换.

1
2
3
4
5
6
7
8
9
10
11
private Map<String, AbsOperator> createOperatorInfos(Map<String, Operator> operators) {
Map<String, AbsOperator> opMappings = Maps.newHashMap();
for (Operator op : operators.values()) {
AbsOperator opinfo = createOperatorInfo(op);
opMappings.put(opinfo.getOperatorId(), opinfo);
}
return opMappings;
}
private AbsOperator createOperatorInfo(Operator operator) {
return OperatorInfoCreatorFactory.createOperatorInfo(apiApplication, operator, executorApp.getStreamSchema(), this.systemConfig);
}

假设Operator是AggregateOperator,它的注解是AggregaterInfoCreator.所以最终调用的是AggregaterInfoCreator.createInstance创建AggFunctionOp.

1
2
@OperatorInfoCreatorAnnotation(AggregaterInfoCreator.class)
public class AggregateOperator extends BasicAggFunctionOperator

接口OperatorInfoCreator创建每个算子的实例, createInstance参数分别是: 执行计划信息,xml执行计划中的算子信息(哪里的xml?),schema信息,系统配置信息.
下图是OperatorInfoCreator的实现类, 创建的算子实例除了FunctionOperator,还有InputOperator,OutputOperator,FunctionStreamOperator.

1
2
3
4
5
6
7
8
9
10
11
OperatorInfoCreator (c.h.s.c.e.operatorinfocreater)     创建的算子实例               父类
|-- AggregaterInfoCreator AggFunctionOp >> FunctionOperator
|-- DataSourceInfoOperatorCreator FunctionOperator -- FunctionOperator
|-- FunctorInfoCreator FunctorOp >> FunctionOperator
|-- FilterInfoCreator FilterFunctionOp >> FunctionOperator
|-- InputInfoCreator InputOperator
|-- UnionInfoCreator UnionFunctionOp >> FunctionOperator
|-- OutputInfoCreator OutputOperator
|-- SplitterInfoCreator SplitOp >> FunctionOperator
|-- JoinInfoOperatorCreator JoinFunctionOp >> FunctionOperator
|-- FunctionStreamInfoCreator FunctionStreamOperator

以AggregaterInfoCreator将AggregateOperator转换为AggFunctionOp为例: 由于AggregateOperator本身包含了Window对象
以及filterBeforeAggregate,filterAfterAggregate等字符串, 所以根据这些数据构造相应的对象,并最终构造出AggFunctionOp.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public AbsOperator createInstance(Application vapp, Operator operator, EventTypeMng streamschema, Map<String, String> systemConfig) {
LOG.debug("start to create aggregate operator"); //LOG
prepare(vapp, operator, systemConfig);
//窗口,过滤,表达式
WindowViewCreator creater = new WindowViewCreator();
IWindow window = creater.create(inputSchemas, aggOperator.getWindow(), this.applicationConfig);
FilterView filterView = createFilterView();
IExpression bexpr = null;
if (filterView != null) bexpr = filterView.getBoolexpr();
//聚合结果合并
AggResultSetParameters pars = createResultSetMergeParmeters(streamschema, window, bexpr);
IAggResultSetMerge resultSetMerge = new AggResultSetMergeViewCreator(pars).create(); // ⬅️
//聚合算子
AggFunctionOp aggFunctionOp = new AggFunctionOp(window, filterView, resultSetMerge, OutputTypeAnalyzer.createOutputType(aggOperator.getWindow()));
//系统参数
StreamingConfig config = new StreamingConfig();
if (operator.getArgs() != null) config.putAll(operator.getArgs());
config.putAll(this.applicationConfig);
aggFunctionOp.setConfig(config);
//设置并行度和ID,完成流算子的构建
return OperatorInfoCreatorFactory.buildStreamOperator(operator, aggFunctionOp);
}

上面生成AggFunctionOp动用了WindowViewCreator创建Window,FilterView,AggResultSetMergeViewCreator创建IAggResultSetMerge. 最后组装成AggFunctionOp.

上一章AggregateSplitter拆分算子的时候解析From子句时就创建了FilterBeforeWindow的FilterOperator和AggregateOperator,
这里也有Window和Filter,不过是FilterBeforeAggregate. 这两个对象分别对应了AggregateOperator的Window对象和filterBeforeAggregate字符串.
这是因为拆分算子的时候创建的FilterOperator是FilterBeforeWindow, 创建的AggregateOperator本身包含了FilterBeforeAggregate和Window.

IAggResultSetMerge表示聚合结果合并. 将IAggResultSetMerge设置给AggFunctionOp, 后面初始化Op的时候会用到这个对象来创建处理视图.
A.结合前面的代码阅读体验, 比如第一章创建完Task和SemanticAnalyzer都会初始化.这里创建完Op也会初始化流计算算子.
B.实际上对象创建完不就是被使用嘛! 如果有依赖的对象,可以设置到构造函数中,在实际使用对象的时候获取依赖对象进行必要计算,充分体现JAVA的面向对象思想.
C.数据的传递:从语法解析结果到语义解析结果到拆分的算子再到这里的流计算算子. 数据都是层层传递并不断更新或添加新的数据结构满足不同阶段的对象构建.

那么话说AbsOperator比如AggFunctionOp是在什么时候初始化的? 答案是: 在StormSpout/StormBolt的prepare中初始化.

combineOperators

上一篇重点介绍了拆分算子SplitContext(Operator)和组合算子OperatorCombiner(连线). 这里也不例外,在创建完具体的AbsOperator算子后,也需要组合.
这里的组合不再需要创建Transition了. 因为OperatorTransition是针对Operator. 而AbsOperator是没有Transition的.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private void combineOperators(Map<String, AbsOperator> operatorInfos) {
//算子之间的连接, 获取算子通过apiApplication.getOperators(), 获取连线也是通过apiApplication
for (OperatorTransition ot : apiApplication.getOpTransition()) {
//获取连接的入口和出口算子
String fromOpId = ot.getFromOperatorId();
String toOpId = ot.getToOperatorId();
String streamName = ot.getStreamName();
DistributeType distributedType = ot.getDistributedType();
String distributedFields = ot.getDistributedFields();
//连接的Schema, 对于From和To都是使用相同的Schema
String outputSchemaName = ot.getSchemaName();
distributedFields = ExecutorUtils.removeStreamName(distributedFields);
TupleEventType outputSchema = (TupleEventType)(executorApp.getEventType(outputSchemaName));

//operatorInfos是所有的算子集合, 根据传入的fromOpId或者toOpId,从集合中找出对应的算子
combineFromTransition(operatorInfos, fromOpId, streamName, outputSchema);
combineToTransition(operatorInfos, toOpId, streamName, distributedType, distributedFields, outputSchema);
}
}

FromTransition: 连线的from算子的输出是outputSchema
ToTransition: 连线的to算子的输入是outputSchema. 这里outputSchema命名为schema似乎更好

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private void combineFromTransition(Map<String, AbsOperator> operatorInfos, String fromOpId, String streamName, TupleEventType outputSchema){
AbsOperator sopInfo = operatorInfos.get(fromOpId);
StreamingConfig sConfig = sopInfo.getConfig();
sConfig = (sConfig == null ? new StreamingConfig() : sConfig);
sConfig.put(StreamingConfig.STREAMING_INNER_OUTPUT_SCHEMA, outputSchema);
sConfig.put(StreamingConfig.STREAMING_INNER_OUTPUT_STREAM_NAME, streamName);
sopInfo.setConfig(sConfig);
}
private void combineToTransition(Map<String, AbsOperator> operatorInfos, String toOpId, String streamName,
DistributeType distributedType, String distributedFields, TupleEventType outputSchema) {
AbsOperator fopInfo = operatorInfos.get(toOpId);
if (!StringUtils.isEmpty(distributedFields)) {
fopInfo.setGroupInfo(streamName, distributedType, distributedFields.split(","));
} else {
fopInfo.setGroupInfo(streamName, distributedType, null);
}
StreamingConfig sConfig = fopInfo.getConfig();
sConfig = (sConfig == null ? new StreamingConfig() : sConfig);
sConfig.put(StreamingConfig.STREAMING_INNER_INPUT_STREAM_NAME, streamName);
sConfig.put(StreamingConfig.STREAMING_INNER_INPUT_SCHEMA, outputSchema);
fopInfo.setConfig(sConfig);
}

类似于Graph中的顶点A -> 边 -> 顶点B. Transition就类似于边, 连接着左右两边的算子, 分别是From算子和To算子.

StormApplication

SubmitTask.submitApplication -> PhysicalPlanExecutor.execute -> PhysicalPlanExecutor.submit(application.Application) ->
StormApplication.launch -> createTopology 创建拓扑, 对于Storm的程序而言, 构成拓扑的组件包括Spouts和Bolts.
这些数据都来自于Application的输入,输出和功能算子. 由于Storm只有两种组件Spout和Bolt, 所以输入算子归于Spout,输出和功能算子都属于Bolt.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private void createSpouts() {
List< ? extends IRichOperator> sources = getInputStreams(); //获得所有源算子信息: OperatorMng.inputs ⬅️
checkInputStreams(sources);
for (IRichOperator input : sources) {
StormSpout spout = new StormSpout(); //将算子设置到为StormSpout中
spout.setOperator(input); //Spout接收数据时,将使用设置的算子开始处理
builder.setSpout(input.getOperatorId(), spout, input.getParallelNumber());
}
}
private void createBolts() {
List<IRichOperator> orderedFunOp = genFunctionOpsOrder(); //获取已经排好序的功能算子,包含output算子 ⬅️
for (IRichOperator operator : orderedFunOp) {
setOperatorGrouping(operator); //设置Bolt的分组策略 ⬅️
}
}

哪些Operator算子会作为Bolt, OutpoutBolt, Spout都是由OperatorMng管理的比如getInputStreams,genFunctionOpsOrder
这样创建的Bolt会直接依赖于对应的Operator, 在处理Bolt时,就不需要再判断是哪一种类型的Operator了.
所以正是由于对算子的种类进行了分离(输入,输出,功能)才使得处理Storm的component时变得容易.
一旦根据算子创建并组织完构成Topology的Spouts和Bolts, 就可以提交拓扑给Storm集群执行了. DONE😄

Bolt Grouping

在开发Storm应用程序时, 一般是在Storm的Topology代码中创建Bolt并直接设置Bolt的分组策略.
假设有这样的Topology, Bolt1输出到Bolt3和Bolt4, Bolt2输出到Bolt3(一个Bolt可以有多个输出,也可以由多个输入).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
             |------ |Bolt4|
|Bolt1| -----|
|------ |Bolt3|
|Bolt2| -----|

//Bolt1有两个输出流, 输出字段都是一样的, 两个输出流的名称stream-id不一样
builder.setBolt("bolt1", new Bolt1(), 2)
declarer.declareStream("streamA", new Fields("f1","f2"))
declarer.declareStream("streamB", new Fields("f1","f2"))
//Bolt2只有一个输出流
builder.setBolt("bolt2", new Bolt2(), 2)
declarer.declareStream("streamC", new Fields("f1"))

//Bolt3接收Bolt1的streamA流使用字段分组, 接收Bolt2的streamC流使用shuffle分组
builder.setBolt("bolt3", new Bolt3(), 4)
.fieldsGrouping("bolt1", "streamA", new Field("f1"))
.shuffleGrouping("bolt2", "streamC")

//Bolt4接收Bolt1的streamB流使用字段分组, 分组字段是Bolt1产生的f2字段.
builder.setBolt("bolt4", new Bolt4(), 3)
.fieldsGrouping("bolt1", "streamB", , new Field("f2"))

setBolt时设置的componentId/operatorId都是自己Bolt, 分组时的componentId则是输入的componentId/operatorId.

通过解析CQL的分组以及算子/组件之间的连接, 现在就不需要在Topology写死了. 因此需要框架能够动态地构建Topology.

为什么IRichOperator的getInputStream()和getOutputStream()表示的是输入流和输出流的名称, 而不是输入流对象和输出流对象(比如算子本身).
这是因为Operator算子会用于Topology的Spout/Bolt, 创建完Spout/Bolt之后, 用于构建Topology其他必要的信息除了分组外,
还有Storm的component-id对应算子的id 和 Storm的stream-id对应算子的输入/输出流名称

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private void setOperatorGrouping(IRichOperator operator) {
BoltDeclarer bolt = createBoltDeclarer(operator);
//一个Bolt可能有多个输入即多个InputStream, 同时输出也可能有多个: 设置不同的Grouping策略
//注意: Bolt设置分组时的componentId是其输入源的ComponentId,而不是自己的componentId, 自己是在builder.setBolt时设置的
for (String strname : operator.getInputStream()) { //strname是当前算子的输入流名称
GroupInfo groupInfo = operator.getGroupInfo().get(strname); //算子的分组信息
setBoltGrouping(bolt, strname, groupInfo);
}
}
private void setBoltGrouping(BoltDeclarer bolt, String strname, GroupInfo groupInfo) {
DistributeType distribute = groupInfo.getDitributeType();
switch (distribute) {
case FIELDS:
Fields fields = new Fields(groupInfo.getFields());
//根据输入流的名称, 获取这个输入流是个什么算子, 为的是获得这个输入算子的operatorId,作为分组策略的第一个参数
IRichOperator operator = getOperatorByOutputStreamName(strname);
//字段分组三个参数分别表示: componentId, streamId, fields. 这里的componentId表示从哪个数据源接入数据,而不是当前算子的operatorId
bolt.fieldsGrouping(operator.getOperatorId(), strname, fields);
break;
//... 其他分组类型 ...
default:
setDefaultBoltGrouping(bolt, strname);
}
}
private void setDefaultBoltGrouping(BoltDeclarer bolt, String strname) {
IRichOperator operator = getOperatorByOutputStreamName(strname);
//shuffle分组两个参数分别表示: 输入流的operatorId/componentId, streamId
bolt.shuffleGrouping(operator.getOperatorId(), strname);
}

Operator算子的id会作为Storm中Spout/Bolt的component-id, 而Operator的输入流/输出流名称是作为Spout/Bolt的stream-id.
component-id只是用于区别不同的组件,或者用于从哪个输入组件获取数据. 而stream-id则可以作为分流/多流/合并流等.

Bolt Creation

createBolts设置Operator的分组策略, 首先创建IRichBolt,并返回Bolt的声明BoltDeclarer,以便后续操作可以在BoltDeclarer继续进行(比如上面的分组策略).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private BoltDeclarer createBoltDeclarer(IRichOperator operator){
IRichBolt bolt;
if ((operator instanceof FunctionOperator) || (operator instanceof FunctionStreamOperator)) {
bolt = createStormBolt(operator);
}else{
bolt = createOutputStormBolt(operator);
}
return builder.setBolt(operator.getOperatorId(), bolt, operator.getParallelNumber());
}
private IRichBolt createOutputStormBolt(IRichOperator f){
StormOutputBolt outputbolt = new StormOutputBolt();
outputbolt.setOperator(f); //类似于StormSpout将算子赋值,真正执行时会使用算子进行操作
return outputbolt;
}

StormSpout,StormBolt,StormOutputBolt都是对Storm的组件的封装. 除了继承各自的IRichSpout和IRichBolt外,还要实现StreamAdapter接口的setOperator方法.
流处理算子适配接口: 依靠这个接口,将流处理的算子注入到具体的Storm的Spout/Bolt中. 创建Bolt为啥不用构造函数一句话的事儿: new StormBolt(operator)

1
2
3
4
5
6
7
8
9
10
11
12
13
public class StormSpout implements IRichSpout, StreamAdapter {
private IRichOperator input;
public void setOperator(IRichOperator operator){
input = operator;
}
}
public class StormOutputBolt implements IRichBolt, StreamAdapter {
private OutputCollector outputCollector;
private OutputOperator output;
public void setOperator(IRichOperator operator) {
this.output = (OutputOperator)operator;
}
}

StormBolt的execute方法

1
2
3
4
5
6
7
8
9
public void execute(Tuple input) {
String sourceStreamName = input.getSourceStreamId(); //获取Tuple的输入流stream-id
List<String> inStreams = functionStream.getInputStream(); //输入流名称列表,因为一个Bolt可以有多个输入流
for (String streamName : inStreams) {
if (!sourceStreamName.equals(streamName)) continue; //只有Tuple的输入流stream-id和IRichOperator的输入流名称相同时,才处理这个Tuple
TupleEvent event = TupleTransform.tupeToEvent(input, functionStream.getInputSchema().get(streamName));
functionStream.execute(streamName, event);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
2015-11-25 02:32:39 | INFO  | [main] | start to submit application example | com.huawei.streaming.cql.executor.PhysicalPlanExecutor (PhysicalPlanExecutor.java:201)
2015-11-25 02:32:39 | INFO | [main] | reset submit jar to /private/var/folders/xc/x0b8crk9667ddh1zhfs29_zr0000gn/T/example.5f8ed0baaeb243a49308fd75144cf715.jar | com.huawei.streaming.storm.StormApplication (StormApplication.java:314)
2015-11-25 02:32:39 | INFO | [main] | Using defaults.yaml from resources | backtype.storm.utils.Utils (Utils.java:253)
2015-11-25 02:32:39 | INFO | [main] | The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5] | backtype.storm.utils.StormBoundedExponentialBackoffRetry (StormBoundedExponentialBackoffRetry.java:47)
2015-11-25 02:32:39 | INFO | [main] | Using defaults.yaml from resources | backtype.storm.utils.Utils (Utils.java:253)
2015-11-25 02:32:39 | INFO | [main] | GenFunctionOpsOrder enter. | com.huawei.streaming.application.OperatorMng (OperatorMng.java:205)
2015-11-25 02:32:39 | INFO | [main] | Using defaults.yaml from resources | backtype.storm.utils.Utils (Utils.java:253)
2015-11-25 02:32:39 | INFO | [main] | Generated ZooKeeper secret payload for MD5-digest: -5668598407594625313:-5703359794945963404 | backtype.storm.StormSubmitter (StormSubmitter.java:82)
2015-11-25 02:32:39 | INFO | [main] | Uploading topology jar /private/var/folders/xc/x0b8crk9667ddh1zhfs29_zr0000gn/T/example.5f8ed0baaeb243a49308fd75144cf715.jar to assigned location: storm-local/nimbus/inbox/stormjar-a5b95134-e3f2-431d-b675-924d8c468cf3.jar | backtype.storm.StormSubmitter (StormSubmitter.java:371)
2015-11-25 02:32:40 | INFO | [main] | Successfully uploaded topology jar to assigned location: storm-local/nimbus/inbox/stormjar-a5b95134-e3f2-431d-b675-924d8c468cf3.jar | backtype.storm.StormSubmitter (StormSubmitter.java:396)
2015-11-25 02:32:40 | INFO | [main] | Finished submitting topology: example | backtype.storm.StormSubmitter (StormSubmitter.java:248)
2015-11-25 02:32:40 | INFO | [main] | delete user packed jar after submit | com.huawei.streaming.cql.executor.PhysicalPlanExecutor (PhysicalPlanExecutor.java:156)
2015-11-25 02:32:40 | INFO | [main] | unRegister jars from class loader. | com.huawei.streaming.cql.DriverContext (DriverContext.java:427)

文章目录
  1. 1. 前戏: CQL代码结构
  2. 2. 正文: submitApplication
    1. 2.1. api.Application -> application.Application
      1. 2.1.1. IRichOperator
      2. 2.1.2. PhysicalPlanExecutor -> ExecutorPlanGenerator
    2. 2.2. parseSchemas: Schema -> TupleEventType
    3. 2.3. parseOperators: 算子解析
      1. 2.3.1. createOperatorInfos
      2. 2.3.2. combineOperators
  3. 3. StormApplication
    1. 3.1. Bolt Grouping
    2. 3.2. Bolt Creation