Apache Drill Source Code Reading (2): Analyzing a Query's Lifecycle and the RPC Layer

Big-data source code reading series, Apache Drill: the Apache Drill RPC flow

Lifecycle of a Query

The queryRequest in the Foreman thread's run() method is a RunQuery from org.apache.drill.exec.proto.UserProtos.
You can think of it as the query statement the user typed in; because Drill is distributed, the query entered on the client is shipped over RPC and executed on the Foreman.
The protobuf definitions live under drill-protocol/src/main/protobuf; for example, User.proto corresponds to UserProtos.

The key is the comment above run(). When is it called: when the query has been set up.
How is it invoked: by the execution thread pool. What does it do: it kicks off remote execution.
Note that the end of this method does not mean the end of the Foreman's role in the query's lifecycle.

Called by execution pool to do query setup, and kick off remote execution.
Note that completion of this function is not the end of the Foreman’s role in the query’s lifecycle.

https://tnachen.wordpress.com/2013/11/05/lifetime-of-a-query-in-drill-alpha-release/
http://yangyoupeng-cn-fujitsu-com.iteye.com/blog/1974556

Client

The SELECT statement in particular is querying an HDFS data file, with a WHERE clause filtering based on the given expression.

From the client, the statement is submitted through Sqlline, a simple Java-based console that talks to a JDBC driver, which passes the SELECT statement into Optiq.

Optiq is a library that Drill uses for query parsing and planning; it provides pluggable transformation rules that can map a SQL token into any specific object you want. Optiq also embeds a cost-based query optimizer, which Drill uses to choose the best ordering of SQL operators in the statement even without any other statistics. Drill implements custom Optiq rules that map specific SQL operators (WHERE, LIMIT, etc.), and each rule converts its operator into the Logical Operator syntax that Drill understands.

This collection of logical operators, each with its own configuration, forms a Drill logical plan. The logical plan's sole purpose is to describe, logically, what work Drill needs to perform to produce the query results, without any optimization or distribution.

Once the client generates this logical plan, it looks up the host/port information of one of the DrillBits and passes the logical plan to that DrillBit.

In other words, the logical plan only describes the data flow of the work Drill has to do, with no optimization and no distribution of the computation.
Once the client has produced the logical plan, it looks up the host/port of one of the configured DrillBits
and hands the logical plan to that DrillBit (the DrillBit that receives the query is the Foreman).

Running Logical Plan

Any DrillBit in the cluster can initiate a query, and that DrillBit becomes the Drill process responsible for sending the results back to the client.

Any DrillBit in the cluster can run a query, and the DrillBit that executes it is responsible for returning the results to the client.

Once the UserServer that listens for user submissions gets the logical plan, it passes it to the Foreman, which is responsible for turning the plan into an actual execution plan and submitting that plan for execution.

The UserServer listens for query submissions from the client; once it receives the logical plan, it hands it to the Foreman.
The Foreman optimizes the plan, converts it into an actual execution plan, and submits the plan information for the subsequent execution.

Inside the Foreman, the first operation is to transform the logical plan into a physical plan via Drill's optimizer. The current version of the optimizer is very basic: it simply transforms each logical operator directly into one or more physical operators, without many optimization rules that look at the relationships between operators.

The physical plan is simply a DAG of physical operators, and each child/parent relationship implies how data flows through the graph. Inspired by Google's Dremel paper, the takeaway Drill implements is an MPP-style multi-level execution tree, where each node in the execution tree represents a different DrillBit process and each depends on the others' results for its computation.

The physical plan is a DAG of physical operators; every child/parent relationship indicates how data flows through the graph.
In this execution tree, each node represents a different DrillBit process, and they depend on each other's computation results.

To break this physical plan into a multi-level execution tree involving multiple DrillBits, we first need to collect information about each physical operator. Given its operator configuration, each physical operator can return estimated stats about network/CPU/memory/disk cost as well as row size and count. It can also return a list of preferred DrillBits it wants to be executed on, which we call endpoint affinities.

To decompose the physical plan into a multi-level execution tree spanning multiple DrillBits, the first step is to collect information about every physical operator.
Given its operator configuration, each physical operator returns estimated statistics such as network/CPU/memory/disk cost, plus row size and row count.
It can also return a list of DrillBits it would prefer to be executed on, called the endpoint affinities (the closest/best endpoints).

This is similar to a read in HDFS: when reading an HDFS block, the NameNode returns the list of DataNodes holding the block, and the client reads the block from the DataNode closest to itself.

One example of endpoint affinity is a Parquet Scan operator that wants to run this query closest to where the Parquet file is stored; it can look up the Parquet file's metadata, which stores the HDFS data node hosts, and return those as preferred endpoints if a DrillBit is running there.

For example, a Parquet scan operator will initiate the query on the DrillBit closest to where the Parquet file is stored.
It can consult the Parquet file's metadata, which records the HDFS DataNodes holding the file, and return those as the preferred endpoints.

A Parquet file, like JSON, is self-describing: the file itself carries its schema, even though the schema is free-form.
Because Parquet files are stored on HDFS, they are replicated. The Scan operator has to access the file,
so the best case is when it can pick the Parquet replica on the DataNode closest to the operator itself.
Of course, a node only qualifies as a preferred endpoint if a DrillBit is installed on it, because the DrillBit is what hosts the operator.

In other words, the DrillBit services in the cluster execute the physical operators that a physical plan decomposes into,
and those physical operators can be executed by multiple DrillBits in the cluster.
A DrillBit compute node usually runs alongside a data-storage node such as a DataNode, and operators need the stored data,
so an operator prefers the DrillBit closest to the stored data; that DrillBit is the optimal endpoint.

With the physical plan, all the stats and the endpoint affinities, the Parallelizer in the Foreman transforms this plan into a number of fragments. Each fragment is simply a self-contained physical plan designed to run on a DrillBit node. In any given physical plan there will be exactly one root fragment, which runs on the initiating DrillBit, possibly one or more leaf fragments, and possibly one or more intermediate fragments.

With the physical plan, all the statistics and the preferred endpoints, the Parallelizer in the Foreman converts the physical plan into multiple fragments.
Each fragment is itself a physical plan, and the fragments are likewise assigned to DrillBit nodes to run.
Any physical plan (after the Foreman's transformation into fragments) has exactly one root fragment, plus possibly several intermediate or leaf fragments.

Running Fragments

The Root fragment will be submitted to the Worker manager in its current DrillBit, the intermediate fragments will be stored in the HazelCast distributed cache, and all the leaf fragments will be sent directly to the DrillBits assigned via our BitCom through our RPC layer with Protobuf messages.

The root fragment is submitted to the Worker Manager on its current DrillBit; intermediate fragments are stored in the HazelCast distributed cache,
and all leaf fragments are sent directly to the other DrillBits via BitCom (the RPC layer, speaking Protobuf).

The web UI shows Major and Minor fragments; so where do the Root, Intermediate and Leaf fragments described here come from?

Once the Worker Manager receives this root fragment it starts running the plan, which always contains a Screen operator that blocks and waits for records to be returned. If the plan also involves multiple DrillBits, it will contain an exchange operator that sets up a Receiver to wait for incoming records from the wire.

As soon as the Worker Manager receives the root fragment it runs the plan, which always contains a Screen operator used to block and wait for the returned data.
If the plan needs several other DrillBits, which together form the wire, the plan also contains an exchange operator, and that exchange operator starts a Receiver that waits for data arriving over the wire.

The wire is similar to the pipeline formed by DataNodes in HDFS: when a client writes data, the participating DataNodes form a pipeline and packets flow through it;
the write only counts as successful after every DataNode has acked back to the client. The DrillBits participating in the computation behave the same way:
the exchange operator plays the role of the client, and the computation is only considered complete once the data from all DrillBits on the wire has been received by the Receiver.

The Leaf fragments that are sent over the wire will be executed as soon as they arrive. The fragment will be parsed into a DAG of physical operators, which sets up the execution/data flow. Each physical operator imposes a pull-style messaging model: starting from the bottom of the tree, it pulls its parent for records, and the parent returns an outcome status. The operators are designed to handle each possible outcome status, which can be STOP, OK, OK_WITH_NEW_SCHEMA or NONE. Since Drill supports flexible schemas, in other words it can tolerate schema changes within the same dataset, the operators need to handle the case where a new schema arrives for a set of records. (For the benefits of columnar storage, see http://the-paper-trail.org/blog/columnar-storage/.) Drill also implements its own in-memory columnar format, called ValueVectors. A ValueVector is simply a collection of bytes that represent a run of values within the same column. Each pull of messages in each physical operator returns a RecordBatch that can contain one or more ValueVectors per column, along with the schema.

Leaf fragments sent over the wire are executed as soon as they arrive at the destination DrillBit. A leaf fragment is parsed into a DAG of physical operators.
Every physical operator uses a pull-style messaging model: starting from the bottom of the tree, an operator pulls records from its parent operator,
and the parent operator returns an outcome status message. Operators are designed to handle every possible outcome status.
Because Drill supports flexible schemas, i.e. the schema may change within the same dataset, Drill has to handle the case where the schema changes.

Drill also implements its own in-memory columnar data structure, the ValueVector: a collection of bytes representing the data within one column.
Each pull in a physical operator returns a RecordBatch, which contains one or more ValueVectors per column.
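
The pull contract described above can be summarized in a short, self-contained Java sketch. Everything here (the Outcome values, the Upstream interface, the loop) is only an illustration of the control flow in the quoted paragraph, not Drill's actual RecordBatch/IterOutcome API:

class PullLoopSketch {
    // Outcome statuses as named in the text above.
    enum Outcome { OK, OK_WITH_NEW_SCHEMA, NONE, STOP }

    // Hypothetical stand-in for the upstream ("parent") operator being pulled from.
    interface Upstream {
        Outcome next();
    }

    void drive(Upstream upstream) {
        boolean done = false;
        while (!done) {
            switch (upstream.next()) {
                case OK_WITH_NEW_SCHEMA:
                    // schema changed mid-stream: rebuild schema-dependent state,
                    // then fall through and consume the batch that carried the new schema
                case OK:
                    // process the ValueVectors of the current RecordBatch here
                    break;
                case NONE: // upstream exhausted, normal end of data
                case STOP: // upstream failed, abort
                    done = true;
                    break;
            }
        }
    }
}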

At the very tip of the Leaf fragment in my slideshow example is the Scan operator, which is configured to look up a Parquet file and run it through the Parquet Storage Engine. The Storage engine is simply responsible for pulling in data from the data source, and converting them into ValueVectors and passes that back to its child as a RecordBatch.

It pulls data from the data source, converts it into ValueVectors, and passes them back to its child as a RecordBatch.

In the fragment tree, viewed bottom-up, the bottom operators are the parents and the ones above are the children; a child pulls records from its parent.
Viewed top-down, however, the fragments decompose as RootFragment -> Intermediate -> LeafFragment.
That looks a bit contradictory: the leaves are parents, the operators above them are children, yet the topmost fragment is still the root fragment.

The steps of the scan operator (see the sketch below):
1. The leaf fragment pulls data from the data source.
2. The data is converted into ValueVectors.
3. The ValueVectors are assembled into a RecordBatch.
4. The RecordBatch is passed to the operator's child, i.e. the next level up.
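
A minimal sketch of those four steps, using hypothetical types (Source, ColumnVector, Batch, Downstream) rather than Drill's real storage-engine interfaces:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ScanSketch {
    // Hypothetical in-memory "column vector": one list of values per column.
    static class ColumnVector { final List<Object> values = new ArrayList<>(); }
    // Hypothetical record batch: a set of column vectors keyed by column name.
    static class Batch { final Map<String, ColumnVector> columns = new HashMap<>(); }
    // Hypothetical row source (step 1); returns null when exhausted.
    interface Source { Map<String, Object> nextRow(); }
    // Hypothetical downstream operator (step 4).
    interface Downstream { void accept(Batch batch); }

    void scan(Source source, Downstream child, int batchSize) {
        Batch batch = new Batch();
        int rows = 0;
        Map<String, Object> row;
        while ((row = source.nextRow()) != null) {               // step 1: pull from the data source
            for (Map.Entry<String, Object> e : row.entrySet()) { // step 2: pivot rows into columns
                batch.columns.computeIfAbsent(e.getKey(), k -> new ColumnVector()).values.add(e.getValue());
            }
            if (++rows == batchSize) {                           // step 3: a batch is full
                child.accept(batch);                             // step 4: hand it to the child (next level up)
                batch = new Batch();
                rows = 0;
            }
        }
        if (rows > 0) {
            child.accept(batch);                                 // flush the final partial batch
        }
    }
}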

Eventually the Leaf fragment will take this batch, and through the Sender operator send it to the Intermediate DrillBit.

Eventually each leaf fragment takes these batches and, through the Sender operator, sends them to the intermediate DrillBit.
The data sources are exposed only to the leaf fragments produced from the physical plan; it is the leaf fragments that read the data.

Once the intermediate DrillBit receives a RecordBatch for the first time, it looks up the fragment in HazelCast by the fragment id carried in the RecordBatch, and sets up the Receiver and the physical operators necessary for processing on this DrillBit.

When an intermediate fragment's DrillBit receives a RecordBatch for the first time, it uses the fragment id carried in the RecordBatch
to look up the corresponding fragment in HazelCast, and sets up the Receiver plus the physical operators needed to continue processing on this DrillBit (a rough sketch of this lookup follows).
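
A rough sketch of that first-batch lookup using Hazelcast's IMap API; the map name "fragment-cache", the String-serialized plan and the onFirstBatch method are purely illustrative, not Drill's actual code:

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;

class FragmentCacheSketch {
    // fragmentId -> serialized fragment plan; the map name is an assumption.
    private final IMap<String, String> fragments;

    FragmentCacheSketch(HazelcastInstance hz) {
        this.fragments = hz.getMap("fragment-cache");
    }

    // Called when the first RecordBatch for an unknown fragment id arrives.
    void onFirstBatch(String fragmentId) {
        String serializedPlan = fragments.get(fragmentId); // look the fragment up by the id carried in the batch
        // ... parse the plan, set up the Receiver and the physical operators, then start processing
    }
}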

The intermediate fragment contains a Filtering operator. Once this operator receives a RecordBatch, it looks up the new schema and passes it, with the specified filter expression and type information, to the code generation layer, which generates specialized code that does the filtering. This design avoids casting, runs tight loops and leverages prefetching, since function calls can be eliminated. It is a very common approach, also seen in Hive's new vectorized query engine (via the Stinger initiative) and in Impala.

The intermediate fragment contains a Filtering operator. As soon as that operator receives a RecordBatch, it looks up the new schema and passes it, together with the given filter expression and type information, to the CodeGeneration layer, which produces a dedicated piece of code to perform the filtering. By avoiding casts, running tight loops and exploiting prefetching, method calls are reduced; the same approach is common in Hive's new vectorized query engine (via the Stinger initiative) and in Impala. A conceptual sketch of what such generated code boils down to follows.
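
A conceptual sketch for a hypothetical predicate `col > 100` over an int column; IntVectorView and the selection-vector array are illustrative stand-ins, not Drill's generated classes:

class GeneratedFilterSketch {
    // Hypothetical read-only view over one int column's ValueVector.
    interface IntVectorView {
        int get(int index);
        int valueCount();
    }

    // "WHERE col > 100" compiled into a direct primitive comparison; surviving row
    // indexes go into a selection vector instead of copying the data.
    int filter(IntVectorView col, int[] selectionVector) {
        int selected = 0;
        for (int i = 0; i < col.valueCount(); i++) {
            if (col.get(i) > 100) {
                selectionVector[selected++] = i;
            }
        }
        return selected; // number of rows that passed the filter
    }
}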

The intermediate fragment eventually streams a batch at a time, as soon as it is available, to the root DrillBit, where the Screen operator receives it and returns it to the client.

The intermediate fragment eventually sends data in units of batches, one batch at a time, to the root DrillBit (the Foreman),
where the Screen operator receives the data and returns it to the client (the DrillBit that accepted the query is also the one that returns the results).

The DrillClient that receives the RecordBatch simply transforms the columnar ValueVectors into rows and shows the result to the client.

The DrillClient receives the RecordBatch, converts the ValueVectors into rows, and displays them to the client.

This is overall what the flow looks like in the Drill alpha release; the architecture will keep being validated through diagnostics and benchmarking, and more operators and storage engines will be added to enable much more interesting data analysis.

Physical Operator

Just as there is a LogicalOperator interface for logical operators, there is a PhysicalOperator interface for physical operators.

Let's first look at HasAffinity, which exposes preferred nodes: getOperatorAffinity() returns the list of preferred endpoints.
It describes a physical operator that has affinity to particular DrillBit nodes and is used for assignment decisions.

//Describes a physical operator that has affinity to particular nodes. Used for assignment decisions.
public interface HasAffinity extends PhysicalOperator {
public List<EndpointAffinity> getOperatorAffinity();
}

public class EndpointAffinity {
private final DrillbitEndpoint endpoint;
private double affinity = 0.0d;

EndpointAffinity captures affinity value for a given single Drillbit endpoint.
EndpointAffinity holds a reference to a DrillbitEndpoint. The comment mentions an affinity value, so how affine a node is can actually be computed.
The value starts at 0; calling addAffinity() adds affinity for that Drillbit endpoint (a hedged sketch of how this might be used follows).
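
A hedged sketch of how a scan-like operator might fill in getOperatorAffinity(), scoring each DrillbitEndpoint by the fraction of the operator's data that is local to it. The blockBytesPerEndpoint input and the assumption that EndpointAffinity has a one-argument constructor are illustrative only:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class AffinitySketch {
    // blockBytesPerEndpoint: how many bytes of this operator's data each DrillBit holds locally.
    List<EndpointAffinity> getOperatorAffinity(Map<DrillbitEndpoint, Long> blockBytesPerEndpoint, long totalBytes) {
        Map<DrillbitEndpoint, EndpointAffinity> byEndpoint = new HashMap<>();
        for (Map.Entry<DrillbitEndpoint, Long> e : blockBytesPerEndpoint.entrySet()) {
            EndpointAffinity affinity = byEndpoint.computeIfAbsent(e.getKey(), EndpointAffinity::new); // assumes a one-argument constructor
            affinity.addAffinity((double) e.getValue() / totalBytes); // more local bytes -> higher affinity
        }
        return new ArrayList<>(byEndpoint.values());
    }
}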

The DrillbitEndpoint object is defined as a protobuf message, in Coordination.proto:

message DrillbitEndpoint{
optional string address = 1;
optional int32 user_port = 2;
optional int32 control_port = 3;
optional int32 data_port = 4;
optional Roles roles = 5;
}

A Drillbit can be thought of as a Drill compute node; running drillbit.sh start under bin starts a Drill service.

A LogicalPlan has a fixed structure: head, storage engines, query. A PhysicalPlan has one as well, and the two share the same head.
The PhysicalPlan constructor takes a list of physical operators, while the LogicalPlan constructor takes a collection of logical operators; in both cases the goal is to build a DAG.

@JsonPropertyOrder({ "head", "graph" })
public class PhysicalPlan {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalPlan.class);

PlanProperties properties;
Graph<PhysicalOperator, Root, Leaf> graph;

@JsonCreator
public PhysicalPlan(@JsonProperty("head") PlanProperties properties, @JsonProperty("graph") List<PhysicalOperator> operators){
this.properties = properties;
this.graph = Graph.newGraph(operators, Root.class, Leaf.class);
}

Query Process Analysis

The official design documentation at http://drill.apache.org/docs/rpc-overview/ says very little about RPC and still needs to be filled in.

The client submits a query

Following the query flow above step by step: first the client submits a query; after Optiq generates the logical plan, it is handed to DrillClient to run:

/**
 * Submits a Logical plan for direct execution (bypasses parsing)
* @param plan the plan to execute
* @return a handle for the query result
*/
public List<QueryDataBatch> runQuery(QueryType type, String plan) throws RpcException {
UserProtos.RunQuery query = newBuilder().setResultsMode(STREAM_FULL).setType(type).setPlan(plan).build();
ListHoldingResultsListener listener = new ListHoldingResultsListener(query);
client.submitQuery(listener, query); // this client is the UserClient, not the DrillClient
return listener.getResults();
}

The logical plan is wrapped in a RunQuery message, and the ListHoldingResultsListener is used to notify the client that data can be fetched once results arrive.
The listener wraps the result set in a Future; if the results are not ready yet, calling future.get() blocks, which is simply how Java's Future works.

private class ListHoldingResultsListener implements UserResultsListener {
private Vector<QueryDataBatch> results = new Vector<>();
private SettableFuture<List<QueryDataBatch>> future = SettableFuture.create();
private UserProtos.RunQuery query; // if the submission fails, the client resubmits the query after reconnecting

public void queryCompleted(QueryState state) {
future.set(results);
}
public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
results.add(result);
}
public List<QueryDataBatch> getResults() throws RpcException{
return future.get();
}
}

UserClient, acting as the user-side client, sends a RUN_QUERY request to the DrillBit; the content being sent is the RunQuery query object.

public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHandshake, BitToUserHandshake> {
private final QueryResultHandler queryResultHandler = new QueryResultHandler();

public void submitQuery(UserResultsListener resultsListener, RunQuery query) {
send(queryResultHandler.getWrappedListener(connection, resultsListener), RpcType.RUN_QUERY, query, QueryId.class);
}

The connection object is the connection the client establishes to the server, created in the constructor of UserClient's parent class BasicClient.
Netty is used here; while building the channel the client also installs several handlers: the protocol encoder/decoder, the message encoder/decoder, the InboundHandler for arriving messages, and the client handshake handler.

public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection, HANDSHAKE_SEND extends MessageLite, HANDSHAKE_RESPONSE extends MessageLite> extends RpcBus<T, R> {
private final Bootstrap b;
protected R connection;
public BasicClient(...) {
b = new Bootstrap()
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
connection = initRemoteConnection(ch);
ch.closeFuture().addListener(getCloseHandler(ch, connection));
final ChannelPipeline pipe = ch.pipeline();
pipe.addLast("protocol-decoder", getDecoder(connection.getAllocator()));
pipe.addLast("message-decoder", new RpcDecoder("c-" + rpcConfig.getName()));
pipe.addLast("protocol-encoder", new RpcEncoder("c-" + rpcConfig.getName()));
pipe.addLast("handshake-handler", new ClientHandshakeHandler());
pipe.addLast("message-handler", new InboundHandler(connection));
pipe.addLast("exception-handler", new RpcExceptionHandler(connection));
}
});
}

The send() call eventually reaches the send method of the same name in RpcBus. The first parameter, listener, is the RPC outcome listener (for the outgoing side, as opposed to incoming messages).
The payload RunQuery query is the protobufBody, and the last parameter, dataBodies, is optional.

// The Rpc Bus deals with incoming and outgoing communication and is used on both the server and the client side of a system.
public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> implements Closeable {
protected final CoordinationQueue queue = new CoordinationQueue(16, 16);

public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> listener, C connection, T rpcType,
SEND protobufBody, Class<RECEIVE> clazz, boolean allowInEventLoop, ByteBuf... dataBodies) {
ChannelListenerWithCoordinationId futureListener = queue.get(listener, clazz, connection);
OutboundRpcMessage m = new OutboundRpcMessage(RpcMode.REQUEST, rpcType, futureListener.getCoordinationId(), protobufBody, dataBodies);
ChannelFuture channelFuture = connection.getChannel().writeAndFlush(m);
channelFuture.addListener(futureListener);
channelFuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
}

How does a query initiated by the client get submitted to the server for execution? connection.getChannel().writeAndFlush(m):
the client writes an RPC message to the server through the connection's channel. RPC messages are divided into inbound and outbound ones.
The OutboundRpcMessage carries the protobuf body plus the data part. The protocol itself only defines the data's format; the actual data has to be sent along with it.
Although this is the client's query request, dataBodies has no value here, because it was never passed in at the original call site.

At this point the client's query request is complete, and the rest of the flow is the server handling the request.
The futureListener used here exists so the client can be notified once the server has the data ready.
The queue puts the CoordinationId and the RpcListener into a map, and removes them once the response arrives.

public <V> ChannelListenerWithCoordinationId get(RpcOutcomeListener<V> handler, Class<V> clazz, RemoteConnection connection) {
int i = circularInt.getNext();
RpcListener<V> future = new RpcListener<V>(handler, clazz, i, connection);
Object old = map.put(i, future);
return future;
}

The server handles the query request

The server is started the same way as the client, via Netty, and it also registers an InboundHandler.
Because the client sent a REQUEST in the previous step, the server's InboundHandler receives that request.

protected class InboundHandler extends MessageToMessageDecoder<InboundRpcMessage> {
private final C connection;
public InboundHandler(C connection) {
this.connection = connection;
}

protected void decode(final ChannelHandlerContext ctx, final InboundRpcMessage msg, final List<Object> output) throws Exception {
final Channel channel = connection.getChannel();
final Stopwatch watch = new Stopwatch().start();

switch (msg.mode) {
case REQUEST: {
// handle message and ack.
ResponseSender sender = new ResponseSenderImpl(connection, msg.coordinationId);
handle(connection, msg.rpcType, msg.pBody, msg.dBody, sender);
break;
}

Here the InboundHandler extends Netty's MessageToMessageDecoder abstract class, so the receiving logic is implemented by overriding decode().
If it extended Netty's ChannelInboundHandlerAdapter instead, the method to override would be channelRead, the style you usually see in Netty hello-world examples (a minimal sketch of that style follows).
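
For comparison, a minimal sketch of that ChannelInboundHandlerAdapter/channelRead style (plain Netty, not Drill's handler):

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

class HelloWorldStyleHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // handle the raw inbound message directly: no output list and no automatic
        // type conversion, unlike MessageToMessageDecoder.decode()
        ctx.writeAndFlush(msg);
    }
}
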
Why is a ResponseSender needed? Because after the server accepts the client's query request and obtains the result, it still has to send the response back to the client before the exchange is complete.

protected void handle(C connection, int rpcType, ByteBuf pBody, ByteBuf dBody, ResponseSender sender) throws RpcException{
sender.send(handle(connection, rpcType, pBody, dBody));
}

protected abstract Response handle(C connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException;

handle is an abstract method; since we are on the server side, the implementation in UserServer is the one chosen. The rpcType=RUN_QUERY sent by the client equals the RUN_QUERY_VALUE below.

public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnection> {
final UserWorker worker;

protected Response handle(UserClientConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody){
switch (rpcType) {
case RpcType.RUN_QUERY_VALUE:
final RunQuery query = RunQuery.PARSER.parseFrom(new ByteBufInputStream(pBody));
final QueryId queryId = worker.submitWork(connection, query);
return new Response(RpcType.QUERY_HANDLE, queryId);

DrillClient hands the query to UserClient, and UserServer in turn hands the actual query work to UserWorker, whose return value is a QueryId message.
The final return value is a Response, matching RpcBus's sender.send(handle(...)) -> sender.send(Response).

Note that after the server receives the RUN_QUERY request and passes it to the worker, the worker immediately returns the QueryId for that query, so the result itself is not returned right away.
Now let's look at the server-side implementation of ResponseSender, defined in RpcBus.

private class ResponseSenderImpl implements ResponseSender {
RemoteConnection connection;
int coordinationId;

public ResponseSenderImpl(RemoteConnection connection, int coordinationId) {
super();
this.connection = connection;
this.coordinationId = coordinationId;
}

public void send(Response r) {
OutboundRpcMessage outMessage = new OutboundRpcMessage(RpcMode.RESPONSE, r.rpcType, coordinationId, r.pBody, r.dBodies);
connection.getChannel().writeAndFlush(outMessage);
}
}

Note that coordinationId is the number of the futureListener the client put into the queue, and the QueryId the server returns is r.pBody.
The server likewise writes an OutboundRpcMessage back through the connection channel, i.e. back to the client. Since the server has not computed the query result yet,
it returns the QueryId to the client first, and will notify the client later, at the appropriate time, once it has the result.

The server has sent the Response, so the server-side flow is finished (the worker has not finished yet, but this single RPC is complete); now it is the client's turn to receive the response.

The client receives the query result

We are back in RpcBus's InboundHandler, except this time it is the client receiving the response sent by the server:

case RESPONSE:
MessageLite m = getResponseDefaultInstance(msg.rpcType); // here this is a QueryId
RpcOutcome<?> rpcFuture = queue.getFuture(msg.rpcType, msg.coordinationId, m.getClass()); // matches the queue.get(...) from the beginning
Parser<?> parser = m.getParserForType();
Object value = parser.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes())); // pBody is only the protocol format
rpcFuture.set(value, msg.dBody); // dBody is the actual data
break;

The Future is fetched from the queue, and finally its set method is called to place the data into value, so that future.get() can return that value.

public <V> RpcOutcome<V> getFuture(int rpcType, int coordinationId, Class<V> clazz) {
RpcOutcome<?> rpc = map.remove(coordinationId);
return (RpcOutcome<V>) rpc;
}

Listener & Handler

Step / Operation
DrillClient.runQuery: creates the ListHoldingResultsListener
UserClient.submitQuery: via the QueryResultHandler, wraps the ListHoldingResultsListener (a UserResultsListener) into a SubmissionListener (an RpcOutcomeListener)
RpcBus.send: wraps the RpcOutcomeListener into a ChannelListenerWithCoordinationId / RpcOutcome
InboundHandler, RESPONSE case: fetches the RpcOutcome rpcFuture from the queue and sets the data into value
RpcListener.set: calls RpcOutcomeListener.success, i.e. SubmissionListener.success, which registers the UserResultsListener responsible for watching the results
ListHoldingResultsListener.getResults: fetches the results from the Future

Object / Why
ListHoldingResultsListener: the listener that holds the result set, so it is responsible for the final retrieval of results; the retrieval itself does not happen here, it only collects the data
SubmissionListener: the submission listener; once the job is submitted it takes over, but the execution does not happen here either
UserResultsListener: the user's result-set listener, the place where results are delivered, although it is only an interface
RpcListener: the RPC listener, tied to the coordinationId of this particular query

The methods of the user's results listener describe the stages of a query:
1. The QueryId arrives; the server generates the QueryId.
2. Data arrives and is received by the listener.
3. The query completes; the listener will not receive any more data.

public interface UserResultsListener {
/**
* QueryId is available. Called when a query is successfully submitted to the server.
* @param queryId sent by the server along {@link org.apache.drill.exec.rpc.Acks.OK Acks.OK}
*/
void queryIdArrived(QueryId queryId);

void submissionFailed(UserException ex);

/**
* A {@link org.apache.drill.exec.proto.beans.QueryData QueryData} message was received
* @param result data batch received
*/
void dataArrived(QueryDataBatch result, ConnectionThrottle throttle);

/**
 * The query has completed (successful completion or cancellation). The listener will not receive any other
 * data or result messages. Called when the server returns a terminal, non-failing state (COMPLETED or CANCELLED)
*/
void queryCompleted(QueryState state);
}
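
To tie the three steps above to the interface, here is a minimal sketch of a listener implementation; the logging is illustrative, and the getRowCount()/release() calls reflect how batch-consuming listeners are typically written rather than code taken from Drill:

class LoggingResultsListener implements UserResultsListener {
    @Override
    public void queryIdArrived(QueryId queryId) {
        System.out.println("step 1: server accepted the query, id = " + queryId);
    }

    @Override
    public void submissionFailed(UserException ex) {
        ex.printStackTrace();
    }

    @Override
    public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
        // step 2: a batch arrived; getRowCount() is assumed to come from the QueryData header
        System.out.println("step 2: received a batch of " + result.getHeader().getRowCount() + " rows");
        result.release(); // the listener owns the batch buffers and must release them
    }

    @Override
    public void queryCompleted(QueryState state) {
        System.out.println("step 3: query finished with state " + state);
    }
}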
