StreamCQL源码阅读(3) 拆分组合算子

StreamCQL的算子组成了Application

前戏: buildApplication

上篇在解析Schema的时候分析了CQL中一些常用的Statement syntax和对应的语法/语义解析器结果,
现在继续ApplicationBuilder.buildApplication中parseSchemas的下一步splitOperators.

1
2
3
4
5
6
7
8
9
10
private void buildApplication() {
app = new Application(applicationName);
parseSchemas();
List<SplitContext> splitContexts = splitOperators(); //拆分算子 ⬅️
SplitContext splitContext = combineOperators(splitContexts); //组合算子, 将拆分算子列表转换为只有一个SplitContext ⬅️
changeUnionOperators(splitContext);
changeSchemaAfterAggregate(splitContext);
app.setOperators(splitContext.getOperators()); //拆分结果包含了operatots和transitions
app.setOpTransition(splitContext.getTransitions());
}

combineOperators会创建OperatorCombiner, 并调用combine方法将splitContexts合并起来,终于打印了日志中看到的:combine all split contexts(解析submit之后).
构建Application的主要工作就是Split和Combine,最后将SplitContext的operators和transitions设置到Application对象中,完成应用程序的构建,在这基础上再进行物理优化.

为什么要先拆分, 后面又要再组合?😖 以下面的CQL为例:

1
2
3
4
5
insert into stream s1           INSERT  
select type,count(id) count AGGREGATE
from s0 (id>10)[ROWS 10] FROM: FilterBeforeWindow
having count(id)>10 HAVING: Expression
group by type GROUPBY

首先需要通过之前分析的AnalyzeContext解析出系统中所有的算子.
① INSERT INTO SELECT, 即INSERT语句中包含了SELECT子句. 对于INSERT而言只要确定输出流名称
② SELECT语句则包含比较多的算子, FilterBeforeWindow,Having,GroupBy,COUNT聚合

只要将CQL中的算子都拆分出来, 才能进一步进行整理. 所以拆分算子的工作类似于为每个关键词进行归类.
试想一下: 要整理很多杂乱的东西, 首先对每件物品进行归类, 最后再进行总的汇总😊.

正文: Split and Combine Operators

第一步SplitOperators拆分算子: 创建对应的Splitter,调用其split方法.

1
2
3
4
5
6
7
8
9
10
private List<SplitContext> splitOperators() {
List<SplitContext> splitContexts = Lists.newArrayList();
for (AnalyzeContext pContext : parseContexts) { //parseContexts是语义解析器结果列表
parseAutoCreatePipeStream(splitContexts, pContext); //由于schema推断的存在,中间的流schema没有通过create input语句定义,所以显示的创建一个create input的解析内容
parseSubQueryOperators(splitContexts, pContext); //解析子查询
SplitContext context = OperatorSplitter.split(buildUtils, pContext); //算子拆分 ⬅️
splitContexts.add(context); //每个AnalyzeContext拆分后都对应一个SplitContext
}
return splitContexts;
}

AnalyzeContext是CQL语句的语义解析结果, CQL的上下文信息都封装在语义解析结果里面. 由于语义解析是个比较大的切面,
需要把语义解析结果分成更细粒度, 即算子. 可以认为AnalyzeContext -> SplitContext的转换是将任务更加具体化.

OperatorSplitter的splitters采用static块提前添加了系统中也有的算子拆分类. 结合上面的splitOperators就是一个双层循环了:
针对Application的每一个AnalyzeContext, 判断哪个Splitter可以解析这个AnalyzeContext(每个AnalyzeContext只会对应一个Splitter).

1
2
3
4
5
6
7
8
public static SplitContext split(BuilderUtils buildUtils, AnalyzeContext parseContext) {
for (Splitter splitter : splitters) {
if (splitter.validate(parseContext)) {
return createSplitter(splitter.getClass(), buildUtils).split(parseContext);
}
}
return null;
}

每个具体的Splitter都实现了validate方法根据传入的AnalyzeContext实现类(pContext)用来验证能否进行解析

1
2
3
4
5
6
7
8
9
Splitter                                AnalyzeContext
|-- SelectSplitter |-- SelectAnalyzeContext
|-- DataSourceSplitter |
|-- AggregateSplitter |
|-- JoinSplitter |
|-- InsertSplitter |-- InsertAnalyzeContext >> InsertOnlyAnalyzeContext
|-- SourceOperatorSplitter |-- CreateStreamAnalyzeContext
|-- MultiInsertSplitter |-- MultiInsertStatementAnalyzeContext
|-- UserOperatorSplitter |-- InsertUserOperatorStatementAnalyzeContext

比如AggregateSplitter的validate方法会验证是不是SelectAnalyzeContext.

1
2
3
4
5
6
7
8
public boolean validate(AnalyzeContext parseContext) {
if (!(parseContext instanceof SelectAnalyzeContext)) return false;
SelectAnalyzeContext selectAnalyzeContext = (SelectAnalyzeContext)parseContext;
FromClauseAnalyzeContext clauseContext = selectAnalyzeContext.getFromClauseContext();
if (clauseContext.getJoinexpression() != null) return false;
if (clauseContext.getCombineConditions().size() != 0) return false;
return true; //属于select,然后既不是combine,又不是join,那么就是aggregate
}

普通的select可以看做是aggregate,比如select count(id) from a where就是一种聚合. 因为select子句是count(id)
但是如果是select id from a join b on a.id=b.id因为有join操作就不是aggregate了.
所以可以看到SelectSplitter针对这两种语句分成了AggregateSplitter和JoinSplitter.

只有验证成功,才可以在此Splitter上调用split: 根据AnalyzeContext创建Operator算子, 即根据语义分析结果拆分内容

SourceOperatorSplitter -> Input/Output Operator

SourceOperatorSplitter的split会创建Input和Output算子和临时的pipe算子.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class SourceOperatorSplitter implements Splitter {   //源算子拆分,包括输入算子和输出算子
private SplitContext result = new SplitContext();
private CreateStreamAnalyzeContext context; //由validate保证,所以下面的split方法可以强转

public SplitContext split(AnalyzeContext parseContext) {
context = (CreateStreamAnalyzeContext)parseContext; //语义解析结果
setParallelNumber();
addToInput(); //创建输入算子并加入到result中: InputStreamOperator
addToOutput(); //创建输出算子并加入到result中: OutputStreamOperator
addToPipe(); //连接算子: FilterOperator
result.setParseContext(context); //最后都要将AnalyzeContext设置到SplitContext中,说不定后面还是需要它的父亲(context)站出来撑腰呢.
return result;
}
}

在上篇语法结构的C部分,AnalyzeContext.analyze会将语法解析的上下文结果设置到语义解析结果中. 这里创建的算子则进一步依赖于语义解析结果.
所谓任何事物都是可以追朔到源头的: StatementContext->AnalyzeContext->Operator->SplitContext->Application, Operator并不是一步登天,与生俱来的.
可以看到context中的RecordReaderClass,DeserializerClass,ReadWriterProperties,SerDeProperties依次登场并进入到Operator的戏局里.
所以Operator沿袭了祖先的上下文数据, 现在新的世界格局将是以Operator为主角的了, 祖先们就可以隐退江湖了.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void addToInput(){
if (context.getDeserializerClassName() != null && context.getSerializerClassName() == null) {
Operator inop = createInputSourceOperator(); //创建算子
if (inputConverter.validate(inop)) {
inop = inputConverter.convert(inop);
}
result.addOperators(inop); //这个很重要,将新创建的算子添加到SplitContext中, 后面才会在设置到Application中 ⬅️
}
}

private InputStreamOperator createInputSourceOperator() {
String operatorName = getOperatorName(context.getRecordReaderClassName(),"Input");
//创建输入流算子
InputStreamOperator op = new InputStreamOperator(buildUtils.getNextOperatorName(operatorName), parallelNumber);
//设置输入算子的属性: 反序列化类, 读取记录类
op.setName(context.getStreamAlias());
op.setDeserializerClassName(context.getDeserializerClassName());
op.setRecordReaderClassName(context.getRecordReaderClassName());
//反序列化类的属性和读取记录的属性, 对应CQL最原始的properties.
op.setArgs(new TreeMap<String, String>());
op.getArgs().putAll(context.getReadWriterProperties());
op.getArgs().putAll(context.getSerDeProperties());
return op;
}

pipe stream算子属于中间算子,本来是不会对应任何算子,只要解析出schema即可的. 但是为了和CQL的整体规则一致,便于后面创建算子之间的连线,
所以这里创建一个空的filter算子,不带任何过滤。 这样,就可以在优化器阶段将这个filter算子优化掉

每个operator的参数用一个Map args来保存, 比如上面输入算子的参数包括了ReadWriterProperties和SerDeProperties.
因为执行器不知道每个operator中到底需要哪些参数,所以只能都放在map中, 由上层客户端进行填充,并在底层运行时检测

InsertSplitter -> Insert Operator

insert into语句的AnalyzeContext包含了outputStreamName和select子句, 输出流可以直接设置到SplitContext结果中. 而select子句需要
再次调用查询相关的Splitter(返回值也是SplitContext), 并将其结果产生的operators和transitions添加到insert的SplitContext中.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class InsertSplitter implements Splitter {
private InsertAnalyzeContext context;

public SplitContext split(AnalyzeContext parseContext) {
context = (InsertAnalyzeContext)parseContext;
result.setOutputStreamName(context.getOutputStreamName());
//insert中包含了select, 所以要先创建select算子, 调用SelectSplitter.split
SplitContext selectResult = OperatorSplitter.split(buildUtils, context.getSelectContext());
//将select的结果算子和连接都加入到insert算子中
result.getOperators().addAll(selectResult.getOperators());
result.getTransitions().addAll(selectResult.getTransitions());
result.setParseContext(context);
return result;
}
}

SourceOperatorSplitter和InsertSplitter设置到SplitContext result中的内容并不是一样的. 这是由于不同的AnalyzeContext所存储的
数据也是不同的, 数据源只需要序列化类,读取类等. 而插入语句则有输出流和输入源(select). 所以你吃的是什么草, 挤出来的奶也是不一样的.

SelectSplitter

SelectSplitter包含了DataSource,①Aggregate,②JoinSplitter.针对select语句的拆分以及Schema分为:

1、最一般的select子句。
只有一个schema, 输入和输出都是(同)一个schema, 不论有没有窗口,都必须放在聚合算子(AggregateSplitter)中。
只要有where,就都放在functor算子(表达式)中,在优化器中,再进行调整,可以改为filter或者继承再聚合算子中。

单单一个select为什么要添加聚合算子?
答: 聚合不一定就是group by, 可能是filter过滤,limit限制条数等.
而select后面是可以跟上filter或者limit等. 聚合还可以是count,sum等.

1
2
3
4
                     |SELECT子句
insert into stream s0 select id,name from s0 where id>10
--------------- -----------
Aggregate Functor(expression)

2、Join: 多个Join的schema,一个outputschema. 先查询出多个表所有的列,再在functor算子中进行列过滤。

1
2
3
4
5
6
select s0.id,s1.name from s0 join s1 on s0.id = s1.id
------- -------
s0元数据 s1元数据
⬇️
-----------------------------
Functor列过滤

3、Groupby: 聚合算子
4、orderby: 同一般select子句
5、Join语句中不支持聚合和groupby,至少目前不支持
6、三种过滤
A.窗口之后的过滤:where,放在聚合算子中
B.窗口之前的过滤:filter,前面加一个filter算子, 但是这样就牵扯到schema的变化,这个就麻烦一些了。先解析出所有的列,再进行过滤。
C.聚合之后的过滤:having,放在聚合算子中

1
2
3
4
SELECT type,count(id) FROM s0 where code='test' (id>10)[ROWS 10] HAVING count(id)>100 
---------------- ------- --------------------
A⬇ B⬇️ C⬇️
Aggregate Filter+Agg Aggregate

总结下:
1、聚合算子是必须有的。
2、Orderby必须放在独立sort算子中
3、limit放在output中作为限制,但是目前还不支持
4、一个select语句,无论如何拆分,都只有一个输出schema. (至少目前是这样,后面在优化器中会进行调整,将where中的一些列加入到select中,进行一些列变换)
5、Join时候,先查询该流所有列的Join结果,之后再进行列过滤
6、Sort、Join、Aggregate算子都是按照字段进行分发,其他都是随机分发

抽象类SelectSplitter的splitFromClause交给子类(Join,DataSource,AggregatePlitter)自己去实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public abstract class SelectSplitter implements Splitter {
private SplitContext result = new SplitContext(); //算子的拆分结果, 数据主要由AnalyzeContext而来
private SelectAnalyzeContext selectAnalyzeContext; //AnalyzeContext语义解析结果

public SplitContext split(AnalyzeContext parseContext) {
initParameters(parseContext); //初始化, 由于Select语句包含了很多子句,因此还要在这里定义其他子句的AnalyzeContext.
setParallelNumber();
splitFromClause(); //抽象方法
result.setParseContext(selectAnalyzeContext); //设置到SplitContext结果中
return result;
}

private SelectClauseAnalyzeContext selectClauseContext; //既然要解析Slect语句,就要解析它包含的所有子句! 正如前面的insert也要先获得select!
private FromClauseAnalyzeContext fromClauseContext;
...
private void initParameters(AnalyzeContext parseContext) {
selectAnalyzeContext = (SelectAnalyzeContext)parseContext;
selectClauseContext = selectAnalyzeContext.getSelectClauseContext();
fromClauseContext = selectAnalyzeContext.getFromClauseContext();
...
}
}

关于层级嵌套: 比如insert中嵌套了select. 所以解析insert时要先解析select,再把select设置到insert中.
同样select语句包含了更多的子句,比如select子句,from子句, 所以也要把旗下包含的所有子句都解析完了,自己才是完整的可用的.

AggregateSplitter -> FilterOp + AggregateOp + Transition

AggregateSplitter的父类是SelectSplitter, 而Select包含From子句. From中可以有①FilterOperator: filter before window
流前的过滤: FROM transform (evnetid>10)[range UNBOUNDED]. 其中[]表示window, 而[]前面的()则是filter过滤.
②拆分AggregateOperator, 因为聚合算子可能包括多种聚合操作, 如果存在则都设置到AggregateOperator对应的字段中.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
protected void splitFromClause() {
//获取Select中From子句的语义解析结果
FromClauseAnalyzeContext clauseContext = getFromClauseContext(); //定义在父类SelectSplitter中,初始化时由Select获取
String streamName = clauseContext.getInputStreams().get(0);
//① 在Window操作前可以有Filter操作, 拆分出Filter算子, 在父类SelectSplitter中实现
FilterOperator fop = splitFiterBeforeWindow(streamName);
//② 聚合算子
AggregateOperator aggregateOperator = splitAggregateOperator(clauseContext, streamName);
//③ 创建算子之间的连接
OperatorTransition transition = createTransition(fop, aggregateOperator, streamName);

getResult().addOperators(fop); //过滤算子
getResult().addOperators(aggregateOperator); //聚合算子
getResult().addTransitions(transition); //从过滤算子到聚合算子的连线
}
protected FilterOperator splitFiterBeforeWindow(String streamName) {
FromClauseAnalyzeContext clauseContext = getFromClauseContext();
//新创建一个Filter过滤算子, from stream(id>1)[RANGE 10s] 可以在流之后,窗口之前的中间存在Filter过滤: 在进入窗口前过滤
FilterOperator fop = new FilterOperator(buildUtils.getNextOperatorName("Filter"), parallelNumber);
//从From语义解析结果中获取filterBeforeWindow对应当前stream的filter表达式,比如上面的id>1
ExpressionDescribe expression = clauseContext.getFilterBeForeWindow().get(streamName);
fop.setFilterExpression(expression.toString()); //过滤条件表达式,比如id>1
fop.setOutputExpression(createFilterOutputExpression(streamName)); //输出列
return fop;
}
private AggregateOperator splitAggregateOperator(FromClauseAnalyzeContext clauseContext, String streamName) {
//创建新的聚合算子
AggregateOperator aggop = new AggregateOperator(getBuildUtils().getNextOperatorName("Aggregator"), getParallelNumber());
parseWindow(clauseContext, streamName, aggop); //解析窗口. Window其实也是From的一部分,所以需要从From子句中获取Windows设置到aggop里
parseWhere(aggop); //解析过滤: setFilterBeforeAggregate,在聚合之前的过滤.
aggop.setFilterAfterAggregate(parseHaving()); //解析having
aggop.setGroupbyExpression(parseGroupby()); //解析分组
aggop.setOrderBy(parseOrderBy()); //解析排序
aggop.setLimit(parseLimit()); //解析限制
aggop.setOutputExpression(getSelectClauseContext().toString()); //输出表达式: select关键字后面的都是输出
return aggop;
}

AggregateOperator对Select子句的解析最后都是字符串.

aggregation setXXX AnalyzerContext parseXXX
window setWindow FromClauseAnalyzeContext.windows.get(streamName) parseWindow
where setFilterBeforeAggregate FilterClauseAnalzyeContext whereClauseContext parseWhere
having setFilterAfterAggregate FilterClauseAnalzyeContext havingClauseContext parseHaving
group by setGroupbyExpression SelectClauseAnalyzeContext groupbyClauseContext parseGroupby
order by setOrderBy OrderByClauseAnalyzeContext parseOrderBy
limit setLimit LimitClauseAnalzyeContext parseLimit
output exp setOutputExpression SelectClauseAnalyzeContext selectClauseContext getSelectClauseContext

通过上面的AggregateSplitter.split方法,我们知道了聚合算子由FilterBeforeWindow,Window,FilterBeforeAggregate,Having,GroupBy,OrderBy,Limit组成.

语法结构中并没有Aggregate这种类型, 但是我们发现Aggregate用到的这些和Select语句中包含的子句都差不多. 其中两个Window可以认为是From子句.
FilterBeforeAggregate是Where子句, 这样Select语句的所有部分就和Aggregate都吻合了! 所以说Select也是一种Aggregate! (^_^这不是巧合吧)

AggregateOperator

stream-operators

AggregateOperator聚合算子:包含了window,以及window前后的filter操作. 当然还少不了count,sum之类的UDAF函数计算和UDF函数计算(BasicAggFunctionOperator)
AggregateOperator >> BasicAggFunctionOperator >> InnerFunctionOperator 这些类的字段正好对应了聚合算子的所有组成部分.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class AggregateOperator extends BasicAggFunctionOperator {
private Window window; //窗口 ==> From子句
private String filterBeforeAggregate; //filter的过滤条件 ==> Where子句
}

public class BasicAggFunctionOperator extends InnerFunctionOperator {
private String filterAfterAggregate; //聚合类的过滤条件, 这里都是udaf函数, 过滤一定发生在数据聚合之后, 这里的表达式一定使用的是outputSchema中的列名称
private String groupbyExpression; //分组的表达式
private String orderBy; //排序: 允许有多个字段,之间按照逗号分割, 允许出现udf和udaf函数
private Integer limit; //窗口的输出限制
}

public class InnerFunctionOperator extends Operator {
private String outputExpression; //输出的列定义,不光有单纯的列,还有udf以及udaf函数
}

过滤条件是一个字符串形式的逻辑表达式, 允许有and,or以及大括号和udf函数, 但是绝对不允许出现udaf函数,因为这没有聚合操作
过滤发生在数据进入窗口之后,聚合之前. 比如 (a>1 and a <100) or (b is not null) 就是where的过滤,

InnerFunctionOperator功能性算子,主要为系统提供window,join,order by,group by等聚合操作。
这里的名称和operator包中的不一样. 这里定义的这些operator,主要是进行执行计划的序列化和反序列化的。
所有的数据类型全部是字符串类型,之后还要经过语法的解析,物理执行计划的优化之后,才会在application中提交。

Transition

别忘了,还有createTransition创建连线哦: 在AggregateSplitter.splitFromClause中fromOp是FilterBeforeWindow FilterOperator, toOp是AggregateOperator.

算子之间进行连接, 涉及到分组策略, Storm中Bolt可以指定怎么根据输入源进行分组, 即输入源将数据怎么分流到当前Bolt. 分组是有一定依据的,不能胡乱连接.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
protected OperatorTransition createTransition(Operator fromOp, Operator toOp, String streamName) {
FromClauseAnalyzeContext clauseContext = getFromClauseContext();
DistributeType distype = DistributeType.SHUFFLE;
String disFields = null;
Schema schema = clauseContext.getInputSchemas().get(0);
//如果有GroupBy,分组策略就是字段分组(distype=FIELDS), 比如group by type, 则按照type字段分组(disFields=type).
if (getGroupbyClauseContext() != null) {
disFields = removeDataSourceColumnsFromGroupbyExpression(schema, getGroupbyClauseContext().toString());
distype = DistributeType.FIELDS;
}
//现在是在SelectSpiltter中, 所以可能是Aggregate,Join,DataSource中任意一种.
if (toOp instanceof JoinFunctionOperator) {
List<Schema> inputSchemas = getFromClauseContext().getInputSchemas();
schema = BaseAnalyzer.getSchemaByName(streamName, inputSchemas);
if (((JoinFunctionOperator)toOp).getJoinType() != JoinType.CROSS_JOIN) {
disFields = getJoinExpression(schema);
distype = DistributeType.FIELDS;
}
}
//创建起始算子到结束算子之间的连线, 并确定分组策略,分组字段, 发起连接的算子的schema信息
return new OperatorTransition(buildUtils.getNextStreamName(), fromOp, toOp, distype, disFields, schema);
}

现在似乎要逐步用Storm的例子来理解了,否则算子之间为什么要进行连接? 连接之后就确定了上游算子怎么发送数据到下游算子. 假设Storm有两个Bolt: Bolt1和Bolt2.
Bolt1过滤数据,输出(word,count)两个字段. 为了能使得对Bolt1过滤后的数据进行分流,Bolt2使用Bolt1的word字段进行FieldGrouping. 这样Bolt1相同的word字段
只会到相同的Bolt任务中, 不同的word字段会分发到不同的Bolt2任务. 所以可以把Bolt1看做过滤算子, Bolt2看做是聚合算子, 中间存在分组连接对Bolt1的数据分流.

那么具体下游算子要怎么接收上游算子的数据呢? 不用担心! 这里先只是创建连接,只要有连接,就都好办了,找对关系找对门路是最重要的!

SplitContext

现在不难看出SplitContext的作用是各类语义分析结果拆分内容. 和前面几个XXXContext一样都只是保存数据的介质(还记得StatementContext,AnalyzeContext吗)
常见的CQL语句格式insert into stream s0 select中insert语句确定了outputStreamName, select语句确定算子拆分的结果:operators和transitions.

1
2
3
4
5
6
public class SplitContext {
private List<OperatorTransition> transitions = new ArrayList<OperatorTransition>();
private List<Operator> operators = new ArrayList<Operator>();
private String outputStreamName; //输出的流名称: 指的是在CQL中显示指定输出流名称的。例如insert into 之类的语句
private AnalyzeContext parseContext; //CQL解析结果: 通过这个结果,可以进行多个CQL之间的连接
}

每一条CQL语句都对应一个AnalyzeContext, 每个AnalyzeContext都有一个Splitter用来创建算子. 即使上面我们创建了Transition连接, 但也是在同一个CQL语句内的!
而一个完整的Topology是要求能把多个CQL语句的上下文都串联起来组成一个DAG图的. 所以这就是在SplitContext中保留AnalyzeContext的含义: It’ time to Combine!

OperatorCombiner

现在我们知道为什么要进行合并了,因为如果仅仅是拆分每一条CQL语句, 这样最后都是一段一段的,我们需要把这些一段一段拼接成完整的图.
目前只有两种CQL语句: 一种是流定义create stream语句, 一种是insert into语句. 对于流定义没有算子之间的连接,但是schema还是需要的.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public SplitContext combine(List<SplitContext> splitContexts) {
for (int i = 0; i < splitContexts.size(); i++) {
combineSplitContext(splitContexts.get(i));
}
return result;
}
private void combineSplitContext(SplitContext context) {
//原始每一条CQL的算子和连线不能被破坏的! 要重新加入最后的返回结果中.
result.getOperators().addAll(context.getOperators());
result.getTransitions().addAll(context.getTransitions());
//如果是SourceOperatorSplitter对应的CreateStreamAnalyzeContext, 是没有transition的.
if (context.getParseContext() instanceof CreateStreamAnalyzeContext){
addSchemasFromCreateStream(context); //添加到inputStreams,outputStreams,pipeStreams
return;
}
//合并时新添加的只是CQL与CQL之间的连线! 对于算子已经都是完整的了,不会缺胳膊断腿的,不需要再添加. ⬅️
createTransition(context);
}

private void createTransition(SplitContext context) {
//MultiInsertStatementAnalyzeContext
//InsertUserOperatorStatementAnalyzeContext
InsertAnalyzeContext insertContext = (InsertAnalyzeContext)context.getParseContext();
createFromTransition(context, insertContext);
createToTransition(context, insertContext);
}

Splitter中SourceOperatorSplitter是没有operators和transitions, InsertSplitter的算子和连线则依赖于SelectSpitter.
所以如果是CreateStreamAnalyzeContext则不会调用createTransition, 只有三种insert才会调用:Insert,MultiInsert,UDFInsert

将多个算子组合起来, 组建算子之间的上下级关系. 算子之间的连线,有两种来源:
1、算子是由一条CQL语句拆分出多个算子组成,这样,连线就可以在拆分的时候确定。
2、算子是由多条CQL语句组合而来,通过使用insert into select from这样的语句,就可以实现多个算子之间的级联。
甚至可以改变算子之间的连接关系。比如在aggregate算子之前加入union算子, 在aggregate算子之后加入split算子。

为每个insert into select语句解析出来的结果加入上下文连线。
CQL语句之间的连线,必然从inputStream或者PipeStream发起,连接到outputStream或者PipeStream.

input-pipe-output

PipeStream

splitOperators在拆分之前, 会首先解析是否需要创建之前没有声明过的流. 如果不存在,则创建PipeStream.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void parseAutoCreatePipeStream(List<SplitContext> splitContexts, AnalyzeContext pcontext) {
parseAutoCreatePipeStreamForInsert(splitContexts, pcontext);
parseAutoCreatePipeStreamForMultiInsert(splitContexts, pcontext);
parseAutoCreatePipeStreamForUserOperator(splitContexts, pcontext);
}
private void parseAutoCreatePipeStreamForInsert(List<SplitContext> splitContexts, AnalyzeContext pcontext) {
if (pcontext instanceof InsertAnalyzeContext) {
InsertAnalyzeContext ipc = (InsertAnalyzeContext)pcontext;
if (!ipc.isPipeStreamNotCreated()) return; //允许类型不是输入/输出流的流不存在
SplitContext sc = createPipeStreamSplitContext(ipc.getOutputSchema());
splitContexts.add(sc);
}
}
private SplitContext createPipeStreamSplitContext(Schema schema) {
LOG.info("create pipe Stream while stream is not created!");
CreateStreamAnalyzeContext pipe = new CreateStreamAnalyzeContext();
pipe.setSchema(schema);
pipe.setStreamName(schema.getId());
return OperatorSplitter.split(buildUtils, pipe); //输入输出流使用SourceOperatorSplitter拆分算子
}

比如下面的multi insert语句, 其中teststream是事先创建好的, 而s1,s2,s3都没有创建过, 在解析的时候判断这些insert对应的stream没有创建过,
就会创建这些pipe-stream. 因为insert的schema取决于select, 所以这些pipe-stream的schema等于InsertAnalyzeContext的outputSchema.

1
2
3
4
5
FROM teststream
INSERT INTO STREAM s1 SELECT *
INSERT INTO STREAM s2 SELECT a
INSERT INTO STREAM s3 SELECT id, name WHERE id > 10
PRARLLEL 4;

判断PipeStream是否创建过在InsertStatementAnalyzer.analyze中. 只要是insert语句, 如果在系统已有的schemas中不存在, 就设置setPipeStreamNotCreated=true
这样parseAutoCreatePipeStreamForInsert在判断到isPipeStreamNotCreated才会创建createPipeStreamSplitContext. 否则schema已经存在就不再需要创建了.

1
2
3
4
5
if (checkSchemaExists(streamName)) {
context.setOutputSchema(getSchemaByName(streamName));
} else {
context.setPipeStreamNotCreated(true);
}

From & To Transition

首先找到insert into语句中计算出来的连线的起点。找到对应的算子. 然后根据起点的schema名称,找到对应的流名称,创建连线

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private void createFromTransition(SplitContext context, InsertAnalyzeContext insertContext) {
List<OperatorTransition> startTransitions = context.getFirstTransitons();
for (OperatorTransition transition : startTransitions) {
Operator op = context.getOperatorById(transition.getFromOperatorId());
String startStreamName = transition.getSchemaName();
SplitContext fromContext = getFromSplitContext(startStreamName);
Schema schema = getInputSchema(startStreamName, insertContext);
String nextStreamName = buildUtils.getNextStreamName();

Operator fromOp = fromContext.getLastOperator();
OperatorTransition fromtransition = new OperatorTransition(nextStreamName, fromOp, op, DistributeType.SHUFFLE, null, schema);
result.addTransitions(fromtransition);
}
}
private void createToTransition(SplitContext context, InsertAnalyzeContext insertContext) {
Set<Operator> ops = getLastOperator(context);
for (Operator op : ops) {
String startStreamName = insertContext.getOutputStreamName();
SplitContext toContext = getToSplitContext(startStreamName);
Schema schema = insertContext.getOutputSchema();
String nextStreamName = buildUtils.getNextStreamName();

Operator toOp = toContext.getFirstOperator();
OperatorTransition totransition = new OperatorTransition(nextStreamName, op, toOp, DistributeType.SHUFFLE, null, schema);
result.addTransitions(totransition);
}
}

物理优化

拆分并组合算子后,因为每个算子之间都通过transition来连接(没有通过连线连接的算子是一个孤岛,是不会起作用的),所以可以进一步优化.
changeUnionOperators:union算子替换和changeSchemaAfterAggregate: 替换所有的having和orderby这些在聚合之后的表达式.

1
2
3
4
5
6
7
8
9
public class PhysicOptimizer implements Optimizer {
public Application optimize(Application app) {
new AggregateConverter().optimize(app);
new FilterPruner().optimize(app);
new SameStreamCombiner().optimize(app);
new SameTransitionPruner().optimize(app);
return app;
}
}

文章目录
  1. 1. 前戏: buildApplication
  2. 2. 正文: Split and Combine Operators
    1. 2.1. SourceOperatorSplitter -> Input/Output Operator
    2. 2.2. InsertSplitter -> Insert Operator
    3. 2.3. SelectSplitter
    4. 2.4. AggregateSplitter -> FilterOp + AggregateOp + Transition
      1. 2.4.1. AggregateOperator
      2. 2.4.2. Transition
      3. 2.4.3. SplitContext
    5. 2.5. OperatorCombiner
      1. 2.5.1. PipeStream
      2. 2.5.2. From & To Transition
    6. 2.6. 物理优化