StreamCQL源码阅读(2) 语法和语义解析

StreamCQL的Schema以及语法和语义解析

前戏: SemanticAnalyzer

客户端提交的CQL语句经过Application.parse返回StatementContext(语法解析结果),再进入语义解析.
如果是SubmitTask会触发之前的LazyTask链条依次进行语义解析. 语义解析结果AnalyzeContext最后用于构建Application.
语义解析跟CQL语法相关, 不过首先我们来看下构建Application的第一步是解析schemas.

正文一: Schema

parseSchemas解析Schema: 循环每个AnalyzeContext的getCreatedSchemas,最后设置为Application的schemas.

1
2
3
4
5
6
7
private void parseSchemas() {
List<Schema> schemas = Lists.newArrayList();
for (AnalyzeContext context : parseContexts) {
schemas.addAll(context.getCreatedSchemas());
}
app.setSchemas(schemas); //设置到Application里 ⬅️
}

create stream schema

那么每个AnalyzeContext的CreatedSchemas是在什么时候设置进来的? 以AnalyzeContext的一个实现类CreateStreamAnalyzeContext为例:

1
2
3
4
5
6
CreateStreamAnalyzeContext.setSchema(Schema)  (com.huawei.streaming.cql.semanticanalyzer.analyzecontext)
|--CreatePipeStreamAnalyzer.analyze()
|--ApplicationBuilder.createPipeStreamSplitContext(Schema) (com.huawei.streaming.cql.builder)
|--CreateOutputStreamAnalyzer.analyze()
|--FromClauseAnalyzer.createNewStreamContext(String, Schema)
|--CreateInputStreamAnalyzer.analyze() ⬅️

AnalyzeContext的setSchema方法会被对应的SemanticAnalyzer实现类的analyze调用,比如CreateInputStreamAnalyzer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public AnalyzeContext analyze() {
//createInputStreamParseContext这个是ParseContext语法解析上下文对应的是CreateInputStatementContext
String streamName = createInputStreamParseContext.getStreamName();
ColumnNameTypeListContext columns = createInputStreamParseContext.getColumns(); //列的信息在语法内容中

//下面的AnalyzeContext就是CreateStreamAnalyzeContext
getAnalyzeContext().setStreamName(streamName);
getAnalyzeContext().setSchema(createSchema(streamName, columns)); //根据streamName和columns创建Schema

setSerDeDefine(); //序列化反序列化定义, 包括设置类setSerDeClass和属性setSerDeProperties
setSourceDefine(); //数据源定义
setParallelNumber(); //并行度
return getAnalyzeContext(); //返回语义分析的结果,所以上面几个set动作其实都是往AnalyzeContext设置内容的.
}

从RDBMS的角度来理解Schema: 如果把streamName看做table, schema就表示表元数据: 表是由列组成的.
streamName和columns信息最初都是存放在语法内容的上下文中ParseContext, 而这里的语义分析上下文AnalyzeContext需要根据语法创建对应的表结构.
创建表元数据createSchema很简单了,就是new一个Schema,并将columns的每一列创建一个Column对象,加入到Schema中.
现在我们知道了Schema的来龙去脉了. 其实语义分析的基础是语法内容, 根据语法产生的数据,构建语义需要的数据结构.

以设置序列化反序列化方式为例:setSerDeDefine()->setSerDeClass+setSerDeProperties, 只不过把语法解析的内容取出来设置到语义解析结果中.

1
2
3
4
5
6
7
8
9
10
private void setSerDeClass() {
//1.首先从语法解析结果中获取出反序列化类
ClassNameContext deserClassName = createInputStreamParseContext.getDeserClassName();
setSerDeByCQL(deserClassName);
}
private void setSerDeByCQL(ClassNameContext deserClassName) {
String newDeserClassName = deserClassName.getClassName();
//2.然后设置到语义解析结果中
getAnalyzeContext().setDeserializerClassName(newDeserClassName);
}

能不能把语法和语义的这两个上下文结果数据合并在一起? 作者在注释中也提到了这个问题. 如果修改了语法,对应的语义也要一起修改(SemanticAnalyzerFactory).

insert & select schema

对于insert的Schemas, 来自于select, 而select又会再次调用from的getCreatedSchemas,下面是三个AnalyzeContext获取创建的Schemas:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public List<Schema> getCreatedSchemas() {                   // InsertAnalyzeContext
List<Schema> schemas = Lists.newArrayList();
schemas.addAll(selectContext.getCreatedSchemas()); // 1.Insert使用了Select ⬅️
schemas.addAll(super.getCreatedSchemas());
return schemas;
}
public List<Schema> getCreatedSchemas() { // SelectAnalyzeContext
List<Schema> schemas = Lists.newArrayList();
schemas.addAll(fromClauseContext.getCreatedSchemas()); // 2.Select使用了From的 ⬅️
schemas.addAll(super.getCreatedSchemas());
return schemas;
}
public List<Schema> getCreatedSchemas() { // FromClauseAnalyzeContext
List<Schema> schemas = new ArrayList<Schema>();
for (Entry<String, InsertAnalyzeContext> et : subQueryForStream.entrySet()) {
String streamName = et.getKey();
for (int i = 0; i < inputSchemas.size(); i++) { // 4.最终都是来源于这里的
if (streamName.equals(inputSchemas.get(i).getStreamName())) {
schemas.add(inputSchemas.get(i));
}
}
InsertAnalyzeContext sp = et.getValue();
schemas.addAll(sp.getCreatedSchemas()); // 3.我勒个去,又回到了Insert ⬅️
}
return schemas;
}

对于其他的AnalyzeContext, getCreatedSchemas一般是没有的. 表结构实际上只有输入输出流和insert/select语句才有.

正文二: CQL Statement syntax

下面结合官方的CQL语法文档配合上对应的StatementContext,AnalyzeContext一起分析常用的几种语法结构:

Create Input Stream Statement syntax

1
2
3
4
5
CREATE INPUT STREAM streamName 
columnList [streamComment] ①数据列
[serdeClause] ②反序列化
sourceClause ③数据读取/数据源
[parallelClause]; ④可选的算子并行度

Create Input Stream 语句定义了输入流的①数据列名称,③数据读取方式和②数据反序列化方式
SERDE 定义了数据的反序列化方式,即如何将从inputStream中读取的数据解析成对应的完整的流的Schema。
系统配置了默认的序列化和反序列化类,当使用系统默认反序列化类时,SERDE子句可以省略,同时,SERDE 的相关配置属性也可以省略。
SOURCE 定义了数据从什么地方读取,比如从MQ消息队列中读取或者文件等其他方式读取。SOURCE 语句一定不能省略。
ParallelClause 语句指定了该输入算子的并发数量。

使用系统内置反序列化类读取算子示例

1
2
3
4
5
CREATE INPUT STREAM example
(eventId INT, eventDesc STRING)
SERDE simpleSerDe PROPERTIES (separator = "|")
SOURCE TCPClientInput PROPERTIES (server = "127.0.0.1", port = "9999")
Parallel 2;

输入流语句的Context即输入流的语法解析内容对应了语法结构的组成部分.

1
2
3
4
5
6
7
public class CreateInputStatementContext extends CreateStreamStatementContext {
private ClassNameContext deserClassName; //② 反序列化类: SERDE
private StreamPropertiesContext deserProperties; // 反序列化属性
private ClassNameContext sourceClassName; //③ 数据源类: SOURCE
private StreamPropertiesContext sourceProperties; // 数据源属性
private ParallelClauseContext parallelNumber; //④ 并行度: PARALLEL
}

streamName和columns是在父类CreateStreamStatementContext中定义的.
CreateInputStreamAnalyzer语义分析的结果已经在上面分析过了. 接下来我们看看insert和select语法,
分别按照A.语法结构定义(statement syntax), B.语法解析结果(statement context), C.语义解析(analyze) D.语义解析结果(analyze context).
最终结果都是为了得到语义解析结果. 一般都是获取语法解析的结果(StatementContext)经过计算最后设置到语义解析结果中(AnalyzeContext).

Insert Statement syntax

A.Insert包含三种语法结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
--1.Single Insert
INSERT INTO STREAM transformTemp SELECT * FROM transform;

--2.不包含子查询的 MultiInsert
FROM teststream
INSERT INTO STREAM s1 SELECT *
INSERT INTO STREAM s2 SELECT a
INSERT INTO STREAM s3 SELECT id, name WHERE id > 10
Parallel 4;

--3.包含子查询的 MultiInsert
FROM
(
SELECT count(id) as id, 'sss' as name
FROM testStream(id >5 )[RANGE 1 SECONDS SLIDE] GROUP BY ss
)
INSERT INTO STREAM s1 SELECT *
INSERT INTO STREAM s2 SELECT a
INSERT INTO STREAM s3 SELECT id,name WHERE id > 10;

可以将数据导入一个未定义的流中,但是如果有要将多个流的数据导入一个新流,这么几个导入语句生成的schema列名称和列类型必须相同。
允许将select结果导入到一个不存在的流中,只要这个流不是输入和输出流,系统就会自动创建该流。

MultiInsert 语句一般用来进行单流数据的分割,它可以将一个流的数据,按照不同的处理规则,在处理完毕之后,发送至指定流。从而达到单流变多流的目的。
MultiInsert 语句只有一个From子句,该子句中,只允许进行简单流的定义,不允许出现窗口等复杂语法。

B.InsertStatementContext对应第一种的语法解析结果(没有并行度), MultiInsertStatementContext对应2.3两种的多级插入语法解析结果(有并行度).

1
2
3
4
5
6
7
8
9
10
public class InsertStatementContext extends ParseContext {
private String streamName; //insert into stream s1的streamName是s1
private SelectStatementContext select; //select * from ...
}

public class MultiInsertStatementContext extends ParseContext {
private FromClauseContext from; //FROM ...
private List<MultiInsertContext> inserts; //insert into ... insert into ...
private ParallelClauseContext parallel;
}

C.insert语句的语义解析(multi insert的类似:解析from以及多条insert语句):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class InsertStatementAnalyzer extends BaseAnalyzer {
private InsertAnalyzeContext context = null;
private InsertStatementContext insertContext;

public AnalyzeContext analyze() {
String streamName = insertContext.getStreamName(); //insertContext: InsertStatementContext
context.setOutputStreamName(streamName);
if (checkSchemaExists(streamName)) {
context.setOutputSchema(getSchemaByName(streamName));
} else {
context.setPipeStreamNotCreated(true);
}
analyzeSelectStatement(); //分析Select语句
createOutputStream(); //创建输出流
return context; //返回的是语义分析的结果AnalyzeContext: InsertAnalyzeContext
}
}

D.语义解析结果AnalyzeContext: 对于第一种的insert语句(单条insert),包含了select子句.
InsertStatementContext 语法解析结果中含有 SelectStatementContext select.
InsertAnalyzeContext 语义解析结果中也含有 SelectAnalyzeContext selectContext.

1
2
3
4
5
6
7
8
9
10
11
12
private void analyzeSelectStatement() {
//insertContext.getSelect()返回的是InsertStatementContext中的SelectStatementContext, 对应分析器就是SelectStatementAnalyzer
SemanticAnalyzer selectAnalzyer = SemanticAnalyzerFactory.createAnalyzer(insertContext.getSelect(), getAllSchemas());
if (selectAnalzyer instanceof SelectStatementAnalyzer) {
//创建查询语句的语义分析器, 设置查询的输出流名称为当前insert语句的输出流名称.
//比如insert into stream s1 select ... 表示查询语句的输出是s1.
((SelectStatementAnalyzer)selectAnalzyer).setOutputStreamName(context.getOutputStreamName());
}
//InsertAnalyzeContext context是insert语句的语义解析结果.
//还是要对查询分析器SelectStatementAnalyzer经过分析analyze得到查询语义解析结果SelectAnalyzeContext
context.setSelectContext((SelectAnalyzeContext)selectAnalzyer.analyze());
}

insert中包含了select语句, 所以在语义解析insert语句时, 要首先对其包含的select进行语义解析.

1
2
3
4
5
6
7
8
9
    SelectStatementContext select -----------------------------
| ⬇️
InsertStatementContext insertContext ⬇️ insertContext.getSelect() = SelectStatementContext
| ⬇️ 根据ParaseContext创建对应的SemanticAnalyzer
InsertStatementAnalyzer.analyze() ⬇️ 再调用语义解析器的analyze方法进行语义解析,返回值为语义解析结果AnalyzeContext
|--analyzeSelectStatement() -- SelectStatementAnalyzer.analyze()
|--createOutputStream() |
|--InsertAnalyzeContext <-----------------------------|--SelectAnalyzeContext
context context.setSelectContext

创建输出流, select的schema也就是insert的schema.

1
2
3
4
5
6
7
8
private void createOutputStream() {
//select的Schema中应该已经包含了完整的元数据,比如columns, 这里创建的输出流只是更改了输出流的名称.
Schema outputSchema = context.getSelectContext().getSelectClauseContext().getOutputSchema();
outputSchema.setId(context.getOutputStreamName());
outputSchema.setName(context.getOutputStreamName());
outputSchema.setStreamName(context.getOutputStreamName());
context.setOutputSchema(outputSchema);
}

Select Statement syntax

A.查询语句的语法结构:

1
2
3
4
5
6
7
8
SelectClause 
FromClause
[WhereClause]
[GroupByClause]
[HavingClause]
[OrderbyClause]
[LimitClause]
[ParallelClause];

Select语句(Statement)中的每个子句(Clause)都有自己的语法结构,因此都有自己的语法解析和语义解析.

B.Select语句包含了多个子句, 比如select from table. 则`select 对应select子句,from table`对应from子句.

1
2
3
4
5
6
7
8
9
10
public class SelectStatementContext extends ParseContext {  //②
private SelectClauseContext select;
private FromClauseContext from; //③
private WhereClauseContext where;
private GroupbyClauseContext groupby;
private HavingClauseContext having;
private OrderbyClauseContext orderby;
private LimitClauseContext limit;
private ParallelClauseContext parallel;
}

C.查询语句的语义分析(from子句的解析要优先于select子句).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class SelectStatementAnalyzer extends BaseAnalyzer {
private SelectAnalyzeContext context; //① StatementAnalyzer的analyze结果为AnalyzerContext
private SelectStatementContext selectContext; //② 语义解析StatementAnalyzer依赖于语法解析StatementContext

public SelectAnalyzeContext analyze() {
fromAnalyzer(); //③
selectAnalyzer();
resetOutputColumnTypes();
whereAnalyzer();
groupbyAnalyzer();
havingAnalyzer();
orderByAnalyzer();
limitAnalyzer();
parallelAnalyzer();
dataSourceQueryArgumentsAnalyzer();
return context; //①
}
}

D.查询的语义解析结果SelectAnalyzeContext(和SelectWithOutFromAnalyzeContext)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
一次查询: 带有from的比如select * from table
SelectAnalyzeContext ---------------|
| |--FromClauseAnalyzeContext fromClauseContext
| |--ParallelClauseAnalyzeContext parallelClauseContext
| |++SelectStatementContext context //②
|
多次查询: 不带from的,比如前面multi insert,对同一个流查询多次
SelectWithOutFromAnalyzeContext ----|
|--SelectClauseAnalyzeContext selectClauseContext
|--FilterClauseAnalzyeContext whereClauseContext
|--SelectClauseAnalyzeContext groupbyClauseContext
|--OrderByClauseAnalyzeContext orderbyClauseContext
|--FilterClauseAnalzyeContext havingClauseContext
|--LimitClauseAnalzyeContext limitClauseContext
|++MultiSelectContext context

下面的context是SelectAnalyzeContext, 不同解析器解析的结果也是AnalyzeContext,分别设置到SelectAnalyzeContext对应的字段中.

1
2
3
4
5
6
7
8
private void fromAnalyzer() {                                         //    ②         ③
SemanticAnalyzer analyzer = SemanticAnalyzerFactory.createAnalyzer(selectContext.getFrom(), getAllSchemas());
context.setFromClauseContext((FromClauseAnalyzeContext)analyzer.analyze());
} //① ③
private void groupbyAnalyzer() {
SemanticAnalyzer analyzer = SemanticAnalyzerFactory.createAnalyzer(selectContext.getGroupby(), getInputSchemas());
context.setGroupbyClauseContext((SelectClauseAnalyzeContext)analyzer.analyze());
}

语法解析Parser - 语法解析结果ParseContext语义解析SemanticAnalyzer语义解析结果AnalyzeContext

stream-select

DataSource Statement syntax

数据源和 From 子句一起结合使用,dataSourceBody 语法在 from 字句中也进行了定义

数据源的查询参数分为查询Schema定义(SCHEMA)和数据源查询(QUERY)两个部分。
Schema 的定义同 Create Input Stream 中的 schema 定义,主要用来指定数据源查询结果 的列数量、名称、类型,便于进行下一步处理。

RDB 数据读取,支持多行数据读取,同时支持 CQL UDF 以及窗口和聚合运算
QUERY 内部的参数顺序固定,不同的数据源,有不同的参数。
RDB 的 SQL 中,如果不包含 Where,就会一次查询出多行记录。和原始流做了 Join 之后,最终输出多条结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
--数据源定义
CREATE DATASOURCE rdbdatasource #dataSourceName
SOURCE RDBDataSource #className
PROPERTIES ( #datasourceProperties
url = "jdbc:postgresql://127.0.0.1:1521/streaming",
username = "55B5B07CF57318642D38F0CEE0666D26",
password = "55B5B07CF57318642D38F0CEE0666D26" );

--数据源查询
insert into rs select rdb.id,s.id,count(rdb.id),sum(s.id)
from S[rows 10 slide], #普通的流可以和DataSource进行join
DATASOURCE rdbdatasource #dataSourceName, 前面定义好的数据源
[ #dataSourceBody
SCHEMA (id int,name String,type int), #dataSourceSchema
QUERY ("select rid as id,rname,rtype from rdbtable where id = ? ", s.id) #dataSourceQuery
] rdb #sourceAlias
where rdb.name like '%hdd%' #dataSourceArguments
group by rdb.id,s.id;

原始流s的id会作为查询条件,传入数据源的SQL查询语句中, 同时数据源本身也有自己的查询条件.

CreateDataSource相关的语法解析,语义解析和CreateStreamStatement类似,就不分析了.


文章目录
  1. 1. 前戏: SemanticAnalyzer
  2. 2. 正文一: Schema
    1. 2.1. create stream schema
    2. 2.2. insert & select schema
  3. 3. 正文二: CQL Statement syntax
    1. 3.1. Create Input Stream Statement syntax
    2. 3.2. Insert Statement syntax
    3. 3.3. Select Statement syntax
    4. 3.4. DataSource Statement syntax