StreamCQL源码阅读(1) 提交任务

大数据上的流式SQL引擎—StreamCQL: http://www.csdn.net/article/2015-11-13/2826204
http://www.csdn.net/article/2015-12-22/2826541

Introduce

CQL(Continuous Query Language),持续查询语言,用于数据流上的查询语言。相对于传统的SQL,CQL加入了窗口的概念,使得数据可以一直保存在内存中,由此可以快速进行大量内存计算,CQL的输出结果为数据流在某一时刻的计算结果。

CQL是建立在Storm基础上的类SQL查询语言,它解决了Storm原生API使用复杂,上手难度高,很多基本功能缺失的问题,提升了流处理产品的易用性。

在CQL设计语法之初,通过参考市面上现有的CEP产品的语法,发现这些产品都不算是全部的SQL语句,即仅仅使用SQL语句还不能运行,还必须依靠一些客户端的代码。 这样就给使用带来了一些不便, 用户必须学习客户端API,比较繁琐,上手难度也比较大。

所以,CQL设计目标就是,用纯粹的SQL语句再加上一些命令,就可以完成所有的任务发布以及执行,这样,就可以通过SQL接口,直接进行任务的下发,统一了客户端接口。对于有一定SQL基础的用户,只需要掌握一些CQL比较特殊的语法,比如窗口或者流定义的语法,就可以写出可运行的CQL程序,大大降低了上手难度。

关键概念

Stream(流):流是一组(无穷)元素的集合,流上的每个元素都属于同一个schema;每个元素都和逻辑时间有关;即流包含了元组和时间的双重属性。留上的任何一个元素,都可以用Element<tuple,Time>的方式来表示,tuple是元组,包含了数据结构和数据内容,Time就是该数据的逻辑时间。

Window(窗口):窗口是流处理中解决事件的无边界(unbounded)及流动性的一种重要手段,把事件流在某一时刻变成静态的视图,以便进行类似数据库表的各种查询操作。在stream上可以定义window,窗口有两种类型,时间窗口(time-based)和记录窗口(row-based)。两种窗口都支持两种模式,滑动(slide)和跳动(tumble)。

Expression(表达式):符号和运算符的一种组合,CQL解析引擎处理该组合以获取单个值。简单表达式可以是常量、变量或者函数,可以用运算符将两个或者多个简单表达式联合起来构成更复杂的表达式。

QuickStart

1.startup zk and storm local

1
2
3
4
5
zkServer.sh start

nohup bin/storm nimbus &
nohup bin/storm ui &
nohup bin/storm supervisor &

2.build and run cql client

1
2
3
4
5
6
cd StreamCQL
mvn clean install
cd cql-binary/target
tar xvf stream-cql-bianry-1.0.tar.gz
cd stream-cql-bianry-1.0
bin/cql

3.create first topology:

1
2
3
4
5
6
7
8
9
10
11
CREATE INPUT STREAM s(id INT, name STRING, type INT)
SOURCE randomgen PROPERTIES ( timeUnit = "SECONDS", period = "1", eventNumPerperiod = "1", isSchedule = "true" );

CREATE OUTPUT STREAM rs(type INT, cc INT)
SINK consoleOutput;

INSERT INTO STREAM rs SELECT type, COUNT(id) as cc
FROM s[RANGE 20 SECONDS BATCH]
WHERE id > 5 GROUP BY type;

SUBMIT APPLICATION example;

输入流: 随机数,每秒生成一个事件
输出流: 控制台, 每隔20秒输出一次, 只统计id>5,根据type分组,求和
提交应用程序, 相当于创建了一个Storm的Topology.

4.A complicate topology:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
CREATE INPUT STREAM s1(name STRING)
SOURCE RANDOMGEN PROPERTIES ( timeUnit = "SECONDS", period = "1", eventNumPerperiod = "1", isSchedule = "true" );

CREATE OUTPUT STREAM s2(c1 STRING)
SINK kafakOutput PROPERTIES ( topic = "cqlOut", zookeepers = "127.0.0.1:2181", brokers = "127.0.0.1:9092" );

CREATE INPUT STREAM s3( c1 STRING)
SOURCE KafkaInput PROPERTIES (groupid = "cqlClient", topic = "cqlInput", zookeepers = "127.0.0.1:2181", brokers = "127.0.0.1:9092" )

CREATE OUTPUT STREAM s4(c1 STRING)
SINK consoleOutput;

INSERT INTO STREAM s2 SELECT * FROM s1;
INSERT INTO STREAM s4 SELECT * FROM s3;

SUBMIT APPLICATION cql_kafka_example;

输入流s1 发射数据 到kafka输出流s2
kafka输入流 发射数据 到控制台输出流s4

5.查看拓扑: http://localhost:8080

Architecture

StreamCQL的代码由三部分组成: cql,streaming,adapter分别对应下面的三个组件.

stream-arch

客户端提交的CQL语句会由执行计划生成器ExecutorPlanGenerator生成可运行的任务,最终由Storm适配器组装Topology提交执行.

StreamCQL对应的Storm拓扑:

stream-storm

至少有一个输入和输出. Component之间可以组合比如Select,Join等.

Window example:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
--按照type对窗口内数据进行分组,每组容量为10
SELECT * FROM transformEvent[ROWS 10 SLIDE PARTITION BY TYPE];

--时间排序窗,一般用来解决数据乱序问题
SELECT * FROM transformEvent[RANGE 1000 MILLISECONDS SORT BY dte];

--事件驱动时间滑动窗
INSERT INTO STREAM rs sum(OrderPrice),avg(OrderPrice),count(OrderPrice)
FROM transformEvent[RANGE 10 SECONDS SLIDE TRIGGER by TS EXCLUDE now];

--保存周期为一个自然天的分组窗
INSERT INTO STREAM rs select id,name,count(id)
FROM transformEvent[RANGE TODAY ts PARTITION BY TYPE]
WHERE id > 5 GROUP BY TYPE HAVING id > 10;

Split example:

1
2
3
4
5
FROM teststream
INSERT INTO STREAM s1 SELECT *
INSERT INTO STREAM s2 SELECT a
INSERT INTO STREAM s3 SELECT id, name WHERE id > 10
PRARLLEL 4;

stream-split

原始的spout输入流会分成三个输出流. 所以中间用一个SplitBolt来作为中间介质.

From log to see StreamCQL

熟悉一个开源框架的流程, 可以先跑一个测试例子, 查看打印的日志信息, 通过日志的顺序, 可以大致熟悉整体的流程.
当然要求框架本身的日志信息足够明了, StreamCQL做的不错. 这种方式的优点是不至于不知道要从哪里看起来.

CQLClient to Driver

bin/cql会开启一个CQLClient客户端, 当输入;表示一个语句的终结时,就会触发一次CQL语句的编译执行等.

Driver.run是CQL的运行起点

  • 1、编译
  • 2、语义分析
  • 3、命令执行
  • 4、返回结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void run(String cql) {
ParseContext parseContext = parser.parse(cql); //CQL解析 ⬅️
saveAllChangeableCQLs(cql, parseContext);
preDriverRun(context, parseContext);
executeTask(parseContext); //执行任务
postDriverRun(context, parseContext);
}

private void executeTask(ParseContext parseContext) {
mergeConfs();
Task task = TaskFactory.createTask(context, parseContext, config, analyzeHooks);
task.execute(parseContext); //执行任务 ⬅️
context.setQueryResult(task.getResult()); //返回结果(查询性的命令比如show,get会有结果,其他没有结果)
}

初始的语法解析类是ApplicationParser. parse方法采用visitor访问者模式遍历CQL语句. 感兴趣的可以进入CQLParser查看具体的解析过程.

1
2
3
4
5
6
IParser (com.huawei.streaming.cql.semanticanalyzer.parser)
|-- OrderbyClauseParser
|-- GroupbyClauseParser
|-- ApplicationParser ⬅️
|-- SelectClauseParser
|-- DataSourceArgumentsParser

ParseContext的实现类很多,基本上CQL语法的每一部分都会对应一个语法解析器.

1
2
3
4
5
6
7
8
ParseContext (com.huawei.streaming.cql.semanticanalyzer.parser.context)
|-- CreateStreamStatementContext 语句
|-- CreatePipeStatementContext
|-- CreateInputStatementContext
|-- CreateOutputStatementContext
|-- FromClauseContext 子句
|-- RangeWindowContext 窗口
|-- ...

Input,Output,Insert

ApplicationParser.parse返回的ParseContext具体针对特定的CQL语句返回的是什么类型? 这个类型对于创建什么类型的任务非常重要.
因为这个是创建一个新的Stream, 所以ParseContext是CreateInputStatementContext.

1
2
3
4
5
6
7
2015-11-25 02:32:19 | INFO  | [main] | start to parse cql : CREATE INPUT STREAM s
(id INT, name STRING, type INT)
SOURCE randomgen
PROPERTIES ( timeUnit = "SECONDS", period = "1",
eventNumPerperiod = "1", isSchedule = "true" ) | com.huawei.streaming.cql.semanticanalyzer.parser.ApplicationParser (ApplicationParser.java:44)
2015-11-25 02:32:19 | INFO | [main] | Parse Completed | com.huawei.streaming.cql.semanticanalyzer.parser.ApplicationParser (ApplicationParser.java:69)
2015-11-25 02:32:19 | INFO | [main] | start to execute CREATE INPUT STREAM s (id INT, name STRING, type INT) SOURCE randomgen PROPERTIES ( 'timeUnit' = 'SECONDS', 'period' = '1', 'eventNumPerperiod' = '1', 'isSchedule' = 'true' ) | com.huawei.streaming.cql.tasks.LazyTask (LazyTask.java:62)

CreateStreamStatementContext.createTask创建的是LazyTask. 它的execute方法只是把当前ParseContext加入到DriverContext的parseContexts中.

1
2
3
public void execute(ParseContext parseContext) {
context.addParseContext(parseContext);
}

同样输出流经过ApplicationParser.parse返回的是CreateOutputStatementContext,它也继承了CreateStreamStatementContext.

1
2
3
4
5
2015-11-25 02:32:20 | INFO  | [main] | start to parse cql : CREATE OUTPUT STREAM rs
(type INT, cc INT)
SINK consoleOutput | com.huawei.streaming.cql.semanticanalyzer.parser.ApplicationParser (ApplicationParser.java:44)
2015-11-25 02:32:20 | INFO | [main] | Parse Completed | com.huawei.streaming.cql.semanticanalyzer.parser.ApplicationParser (ApplicationParser.java:69)
2015-11-25 02:32:20 | INFO | [main] | start to execute CREATE OUTPUT STREAM rs (type INT, cc INT) SINK consoleOutput | com.huawei.streaming.cql.tasks.LazyTask (LazyTask.java:62)

insert语句是InsertStatementContext, 输入输出插入这些都是延迟执行的任务,并不需要立即执行,因为需要根据上下文构造一个完整的DAG拓扑图.

1
2
3
4
5
2015-11-25 02:32:20 | INFO  | [main] | start to parse cql : INSERT INTO STREAM rs SELECT type, COUNT(id) as cc
FROM s[RANGE 20 SECONDS BATCH]
WHERE id > 5 GROUP BY type | com.huawei.streaming.cql.semanticanalyzer.parser.ApplicationParser (ApplicationParser.java:44)
2015-11-25 02:32:20 | INFO | [main] | Parse Completed | com.huawei.streaming.cql.semanticanalyzer.parser.ApplicationParser (ApplicationParser.java:69)
2015-11-25 02:32:20 | INFO | [main] | start to execute INSERT INTO STREAM rs SELECT type, count(id) AS cc FROM s[RANGE 20 SECONDS BATCH] WHERE id > 5 GROUP BY type | com.huawei.streaming.cql.tasks.LazyTask (LazyTask.java:62)

Submit

提交应用程序,经过parse返回的是SubmitApplicationContext,创建的Task是SubmitTask.

1
2
3
2015-11-25 02:32:23 | INFO  | [main] | start to parse cql : SUBMIT APPLICATION example | com.huawei.streaming.cql.semanticanalyzer.parser.ApplicationParser (ApplicationParser.java:44)
2015-11-25 02:32:23 | INFO | [main] | Parse Completed | com.huawei.streaming.cql.semanticanalyzer.parser.ApplicationParser (ApplicationParser.java:69)
2015-11-25 02:32:24 | INFO | [main] | combine all split contexts | com.huawei.streaming.cql.builder.operatorsplitter.OperatorSplitter (OperatorCombiner.java:101)

Task & SemanticAnalyzer

stream-createTask

可以看到对于前面的输入流,输出流,insert语句,并没有对应的Task实现类,所以它们都使用LazyTask.

Driver.executeTask会根据ParseContext具体的实现类由TaskFactory创建对应的Task. ParseContext抽象类除了创建Task,还会创建SemanticAnalyzer

1
2
3
4
5
//创建对应语句的执行task
public abstract Task createTask(DriverContext driverContext, List<SemanticAnalyzeHook> analyzeHooks) throws CQLException;

//创建语义分析执行解析器
public abstract SemanticAnalyzer createAnalyzer() throws SemanticAnalyzerException;

比如SubmitApplicationContext创建的分析器是SubmitApplicationAnalyzer. CreateStreamStatementContext也是个抽象类,
有三个子类CreateInputStatementContext,CreateOutputStatementContext,CreatePipeStatementContext,它们创建的分析器分别是:
CreateInputStreamAnalyzer, CreateOutputStreamAnalyzer, CreatePipeStreamAnalyzer 它们都继承了CreateStreamAnalyzer.

1
2
3
4
5
6
7
8
9
10
11
ParseContext ------|--SubmitApplicationContext
|--CreateStreamStatementContext--|
|---CreateInputStatementContext
|---CreateOutputStatementContext
|---CreatePipeStatementContext

SemanticAnalyzer --|--SubmitApplicationAnalyzer
|--CreateStreamAnalyzer ---------|
|---CreateInputStreamAnalyzer
|---CreateOutputStreamAnalyzer
|---CreatePipeStreamAnalyzer

SemanticAnalyzer的创建方式和创建Task一样都是使用工厂类SemanticAnalyzerFactory. 在创建完之后都调用了init初始化.

1
2
3
4
5
6
7
8
9
10
public static Task createTask(DriverContext driverContext, ParseContext parseContext, StreamingConfig config, List<SemanticAnalyzeHook> analyzeHooks) {
Task task = parseContext.createTask(driverContext, analyzeHooks); // 创建Task ⬅️
task.init(driverContext, config, analyzeHooks);
return task;
}
public static SemanticAnalyzer createAnalyzer(ParseContext parseContext, List<Schema> schemas) {
SemanticAnalyzer analyzer = parseContext.createAnalyzer(); // 创建语义解析器 ⬅️
analyzer.init(schemas);
return analyzer;
}

SubmitTask

parseSubmit

SubmitTask执行应用程序提交的execute方法和前面的LazyTask有点复杂, 因为它要把前面创建的LazyTask都组合起来,组成一个完整的应用程序.

1
2
3
4
5
6
7
8
9
10
11
public void execute(ParseContext parseContext) {   //这里的parseContext是SubmitApplicationContext
parseSubmit(parseContext); //解析 ⬅️
createApplication(); //创建应用程序
dropApplicationIfAllow(); //如果允许的话先删除应用程序
submitApplication(); //提交应用程序
}
//解析Submit,创建对应的语义解析器:SubmitApplicationAnalyzer
private void parseSubmit(ParseContext parseContext) {
SemanticAnalyzer analyzer = SemanticAnalyzerFactory.createAnalyzer(parseContext, EMPTY_SCHEMAS);
submitContext = (SubmitApplicationAnalyzeContext)analyzer.analyze(); // 创建完语义解析器, 就要进行语义解析 ⬅️
}

这里面几个对象的创建关系是(Parser语法解析器->Context->Analyzer语义分析器->AnalyzerContext):

1
2
3
4
ApplicationParser.parse() --> SubmitApplicationContext : ParseContext
|--createTask: SubmitTask
|--createAnalyzer: SubmitApplicationAnalyzer
|--createAnalyzeContext: SubmitApplicationAnalyzeContext

createApplication

创建Application,如果有路径的话,直接加载物理执行计划,否则创建一个API用的Application并设置到DriverContext中.

1
2
3
4
5
6
7
8
9
10
11
private Application createAPIApplication(String appName) {
Application app = null;
if (context.getApp() == null) { //创建Application
semanticAnalyzerLazyContexts(); //准备analyzeContexts ⬅️
app = new ApplicationBuilder().build(appName, analyzeContexts, context);
} else {
app = context.getApp();
}
app.setApplicationId(appName);
return app;
}

创建APIApplication, 记得前面的那些LazyTask吗, 都要用语义分析分析一遍,对应的AnalyzeContext会被ApplicationBuilder用到.
因为LazyTask的execute方法只是简单地把当前的ParseContext实现类加入到DriverContext中,所以下面的for循环能从DriverContext获取出所有ParseContext.

1
2
3
4
5
6
7
8
9
private void semanticAnalyzerLazyContexts() {
for (ParseContext parseContext : context.getParseContexts()) {
preAnalyze(context, parseContext);
SemanticAnalyzer analyzer = SemanticAnalyzerFactory.createAnalyzer(parseContext, context.getSchemas());
AnalyzeContext analyzeContext = analyzer.analyze();
postAnalyze(context, analyzeContext, parseContext);
analyzeContexts.add(analyzeContext); //将各自的分析器分析的结果AnalyzeContext实现类加入到analyzeContexts
}
}

像SubmitTask一样都要先创建SemanticAnalyzer,然后调用analyze方法, 这些没有调用的方法都要调用. 验证了那句话:人在江湖漂,哪有不挨刀.该来的总是会来的.

1
2
3
4
ApplicationParser.parse() --> CreateInputStatementContext|CreateOutputStatementContext : ParseContext
|--createTask: LazyTask
|--createAnalyzer: CreateInputStreamAnalyzer|CreateOutputStreamAnalyzer
|--createAnalyzeContext: CreateStreamAnalyzeContext

注意Input,Output的StatementContext,SemanticAnalyzer都有各自的实现类,并都继承了Stream的相关父类,但是AnalyzeContext没有各自的实现类,都是一样的了.

ApplicationBuilder

ApplicationBuilder的构建需要每个分析器分析的结果AnalyzeContext(parseContexts): 专门用来完成从多个解析内容到应用程序的转换
buildApplication()将整个应用程序的构建分成: 1、各个算子的构建; 2、将完成拆分的应用程序解析成为Application

1
2
3
4
5
6
7
8
9
10
public Application build(String appName, List<AnalyzeContext> parContexts, DriverContext driverContext) {
this.applicationName = appName; //应用程序名称
this.parseContexts = parContexts; //一系列CQL语句的解析结果

executeLogicOptimizer(); //在构建应用程序之前,要先执行逻辑优化器. 目前貌似还没实现.
buildApplication(); //构建应用程序
executePhysicOptimizer(); //在构建应用程序之后,要先执行物理优化器
parseDriverContext(driverContext); //将DriverContext的的值设置到Application中,比如UserConf,UserFile,UDF
return app; //构建完成的应用程序
}

逻辑计划包含的功能:
1、SQL语句的重写,比如将where中的聚合filter调整到having中等等
2、count(a+b),count(*),count(a) 的优化,全部改成count(1)
3、Join的调整,将不等值Join改为Innerjoin
4、将where条件中的等值表达式提升到On上面去。

物理优化器的优化内容:
1、OrderBy优化,实现sorted-merge排序。
2、limit优化,上一个算子中加入limit。
3、算子替换,将功能比较简单的算子,替换为Filter算子或者functor算子
4、移除无意义的filter算子

逻辑计划和物理计划中间的步骤是构建Application. 在这里才开始new一个Application.

1
2
3
4
5
6
7
8
9
10
private void buildApplication() {
app = new Application(applicationName); //创建Application
parseSchemas(); //设置所有AnalyzeContext的CreatedSchemas到app的schemas
List<SplitContext> splitContexts = splitOperators(); //拆分算子
SplitContext splitContext = combineOperators(splitContexts); //合并算子
changeUnionOperators(splitContext);
changeSchemaAfterAggregate(splitContext);
app.setOperators(splitContext.getOperators()); //将拆分|合并算子的operators和transitions设置到Application里
app.setOpTransition(splitContext.getTransitions());
}

预告: SemanticAnalyzer.analyze()和具体的CQL语法相关,下一篇我们就来看看CQL的语法和语义解析是怎么工作的.


文章目录
  1. 1. Introduce
  2. 2. QuickStart
  3. 3. Architecture
  4. 4. From log to see StreamCQL
    1. 4.1. CQLClient to Driver
    2. 4.2. Input,Output,Insert
    3. 4.3. Submit
      1. 4.3.1. Task & SemanticAnalyzer
      2. 4.3.2. SubmitTask
        1. 4.3.2.1. parseSubmit
        2. 4.3.2.2. createApplication
      3. 4.3.3. ApplicationBuilder