Cassandra Source Code Analysis: Network

Cassandra 2.2 source code analysis: the Netty client and server, request processing, and the messaging service.

CassandraDaemon

The startup log reflects the order in which the components start:

INFO  [main] 2016-10-11 15:39:47,410 ColumnFamilyStore.java:382 - Initializing system.sstable_activity
INFO [main] 2016-10-11 15:39:48,950 CacheService.java:111 - Initializing key cache with capacity of 49 MBs.
INFO [main] 2016-10-11 15:39:48,967 CacheService.java:133 - Initializing row cache with capacity of 0 MBs
INFO [main] 2016-10-11 15:39:48,972 CacheService.java:162 - Initializing counter cache with capacity of 24 MBs
INFO [main] 2016-10-11 15:39:48,974 CacheService.java:173 - Scheduling counter cache save to every 7200 seconds (going to save all keys).
INFO [main] 2016-10-11 15:39:49,080 ColumnFamilyStore.java:382 - Initializing system.hints
INFO [main] 2016-10-11 15:39:49,089 ColumnFamilyStore.java:382 - Initializing system.......
INFO [main] 2016-10-11 15:39:51,302 ColumnFamilyStore.java:382 - Initializing demo.test
INFO [main] 2016-10-11 15:39:51,716 Index.java:93 - Initializing Lucene index
INFO [main] 2016-10-11 15:39:52,405 Index.java:101 - Initialized index demo.test.idx
INFO [main] 2016-10-11 15:39:52,413 ColumnFamilyStore.java:382 - Initializing demo.tweets
INFO [main] 2016-10-11 15:39:52,419 AutoSavingCache.java:163 - Completed loading (1 ms; 21 keys) KeyCache cache
INFO [main] 2016-10-11 15:39:52,497 CommitLog.java:168 - Replaying bin/../data/commitlog/CommitLog-5-1474959171115.log, ....
INFO [main] 2016-10-11 15:39:52,739 CommitLog.java:170 - Log replay complete, 135 replayed mutations

INFO [main] 2016-10-11 15:39:52,969 StorageService.java:600 - Cassandra version: 2.2.6
INFO [main] 2016-10-11 15:39:52,969 StorageService.java:601 - Thrift API version: 20.1.0
INFO [main] 2016-10-11 15:39:52,969 StorageService.java:602 - CQL supported versions: 3.3.1 (default: 3.3.1)
INFO [main] 2016-10-11 15:39:53,010 IndexSummaryManager.java:85 - Initializing index summary manager with a memory pool size of 49 MB and a resize interval of 60 minutes

INFO [main] 2016-10-11 15:39:53,013 StorageService.java:621 - Loading persisted ring state
INFO [main] 2016-10-11 15:39:53,056 StorageService.java:794 - Starting up server gossip
INFO [main] 2016-10-11 15:39:53,247 MessagingService.java:540 - Starting Messaging Service on localhost/127.0.0.1:7000 (lo0)
INFO [main] 2016-10-11 15:39:53,318 StorageService.java:968 - Using saved tokens [-1036061867878377743, -1049032071638556980, ]
INFO [main] 2016-10-11 15:39:53,425 StorageService.java:1937 - Node localhost/127.0.0.1 state jump to NORMAL

INFO [main] 2016-10-11 15:39:53,785 Server.java:151 - Netty using Java NIO event loop
INFO [main] 2016-10-11 15:39:53,970 Server.java:185 - Starting listening for CQL clients on localhost/127.0.0.1:9042...
INFO [main] 2016-10-11 15:39:54,159 CassandraDaemon.java:439 - Not starting RPC server as requested. Use JMX (StorageService->startRPCServer()) or nodetool (enablethrift) to start it

Stopping Cassandra stops the CassandraDaemon, the Server (nativeServer), and the Gossiper. Since the ThriftServer is not started by default, it does not need to be stopped.

INFO  [RMI TCP Connection(5)-127.0.0.1] 2016-10-11 15:49:05,275 CassandraDaemon.java:451 - Cassandra shutting down...
INFO [RMI TCP Connection(5)-127.0.0.1] 2016-10-11 15:49:05,286 Server.java:218 - Stop listening for CQL clients
INFO [StorageServiceShutdownHook] 2016-10-11 15:49:05,292 Gossiper.java:1448 - Announcing shutdown

CassandraDaemon is the startup class; it wires up three main services:

  1. StorageService: storage-related services
  2. ThriftServer: the Thrift protocol server
  3. Server: the native transport server
private static final CassandraDaemon instance = new CassandraDaemon();
public static void main(String[] args) {
instance.activate();
}
public void activate(){
setup();
start();
}
protected void setup(){
StorageService.instance.initServer();

int rpcPort = DatabaseDescriptor.getRpcPort();
int nativePort = DatabaseDescriptor.getNativeTransportPort();
thriftServer = new ThriftServer(rpcAddr, rpcPort, listenBacklog);
nativeServer = new org.apache.cassandra.transport.Server(nativeAddr, nativePort);
}
public void start() {
nativeServer.start();
thriftServer.start();
}

配置文件中端口和对应的实现类:

1
2
3
4
5
6
storage_port: 7000           --> StorageService
native_transport_port: 9042  --> nativeServer
rpc_port: 9160               --> ThriftServer

start_native_transport: true --> the native protocol is enabled by default
start_rpc: false             --> the Thrift protocol is disabled by default

(Figure: c-ports)
Image source: http://stackoverflow.com/questions/2359159/cassandra-port-usage-how-are-the-ports-used

CassandraDaemon's inner interface Server has two implementations: o.a.c.thrift.ThriftServer for the Thrift protocol and o.a.c.transport.Server for the native binary protocol.
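
Both implement the small inner interface CassandraDaemon.Server; a sketch of it, abridged from the 2.2 source (the comment wording is mine):

public static interface Server
{
    // Start the server; implementations are expected not to throw from here.
    public void start();

    // Stop the server.
    public void stop();

    // Whether the server is currently running.
    public boolean isRunning();
}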

ThriftServer

The cassandra.thrift file lives under interface/ in the distribution and mainly consists of:

  1. data structures (Column, SuperColumn, etc.)
  2. service structs: ConsistencyLevel, ColumnParent, ColumnPath, SliceRange, KeyRange, KeySlice, Deletion, Mutation, TokenRange, ColumnDef, CfDef, KsDef, ColumnSlice, etc.
  3. service API methods: get, get_slice, multiget_slice, get_range_slices, insert, add, remove, batch_mutate, get_multi_slice, etc.
public class ThriftServer implements CassandraDaemon.Server {
public void start() {
CassandraServer iface = getCassandraServer();
server = new ThriftServerThread(address, port, backlog, getProcessor(iface), getTransportFactory());
server.start();
}
private static class ThriftServerThread extends Thread {
private final TServer serverEngine;
public ThriftServerThread(...) {
serverEngine = new TServerCustomFactory(DatabaseDescriptor.getRpcServerType()).buildTServer(args);
}
public void run() {
serverEngine.serve();
}
}
}
public class CustomTThreadPoolServer extends TServer {
public void serve() {
serverTransport_.listen();
stopped = false;
while (!stopped) {
TTransport client = serverTransport_.accept();
processorFactory_.getProcessor(client).process(input, output);
}
executorService.shutdown();
}
}

Take the get query as an example: the service definition in cassandra.thrift declares that get needs the row key, a ColumnPath, and a consistency level.

struct ColumnPath {
3: required string column_family,
4: optional binary super_column,
5: optional binary column,
}
service cassandra {
ColumnOrSuperColumn get(1:required binary key,
2:required ColumnPath column_path,
3:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE)
}

The server-side handler lives in the TProcessor of the generated interface/thrift/gen-java o.a.c.thrift.Cassandra class:

public static class get<I extends Iface> extends org.apache.thrift.ProcessFunction<I, get_args> {
public get_result getResult(I iface, get_args args) throws org.apache.thrift.TException {
get_result result = new get_result();
result.success = iface.get(args.key, args.column_path, args.consistency_level);
return result;
}
}

It ultimately calls the get method of o.a.c.thrift.CassandraServer:

public ColumnOrSuperColumn get(ByteBuffer key, ColumnPath column_path, ConsistencyLevel consistency_level) {
ThriftClientState cState = state();
String keyspace = cState.getKeyspace();
cState.hasColumnFamilyAccess(keyspace, column_path.column_family, Permission.SELECT);

CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_path.column_family);
org.apache.cassandra.db.ConsistencyLevel consistencyLevel = ThriftConversion.fromThrift(consistency_level);

SortedSet<CellName> names = new TreeSet<CellName>(metadata.comparator);
names.add(metadata.comparator.cellFromByteBuffer(column_path.column));
IDiskAtomFilter filter = new NamesQueryFilter(names);

ReadCommand command = ReadCommand.create(keyspace, key, column_path.column_family, now, filter);
Map<DecoratedKey, ColumnFamily> cfamilies = readColumnFamily(Arrays.asList(command), consistencyLevel, cState);
ColumnFamily cf = cfamilies.get(StorageService.getPartitioner().decorateKey(command.key));

List<ColumnOrSuperColumn> tcolumns = thriftifyColumnFamily(cf, metadata.isSuper() && column_path.column != null, false, now);
return tcolumns.get(0);
}

The actual lookup for the ReadCommand built from the client's request happens in readColumnFamily, which performs the read through the StorageProxy facade:

protected Map<DecoratedKey, ColumnFamily> readColumnFamily(List<ReadCommand> commands, ConsistencyLevel consistency_level, ClientState cState) {
Map<DecoratedKey, ColumnFamily> columnFamilyKeyMap = new HashMap<DecoratedKey, ColumnFamily>();
schedule(DatabaseDescriptor.getReadRpcTimeout());
List<Row> rows = StorageProxy.read(commands, consistency_level, cState);
for (Row row: rows) {
columnFamilyKeyMap.put(row.key, row.cf);
}
return columnFamilyKeyMap;
}

Java Driver(Netty)

The DataStax Java driver is implemented with Netty, and so is Cassandra's native server protocol.
Understanding how the client sends data first makes it easier to see how the server receives it. A minimal driver example that reads the Cassandra version:

Cluster cluster = Cluster.builder()
.addContactPoints(CONTACT_POINTS).withPort(PORT)
.build();
Session session = cluster.connect();
ResultSet rs = session.execute("select release_version from system.local");
Row row = rs.one();
String releaseVersion = row.getString("release_version");

A Session is the client's session with the cluster: once connect succeeds, the network channels between the client and the servers are already open. A Connection is the object that handles the actual connection between the client and one server node.

(Figure: c-connection-init)

The DataStax driver is the Netty client and Cassandra's nativeServer is the Netty server, so the driver connects with a Bootstrap while the server accepts client connections with a ServerBootstrap.

class Connection {
ListenableFuture<Void> initAsync() {
Bootstrap bootstrap = factory.newBootstrap();
ProtocolOptions protocolOptions = factory.configuration.getProtocolOptions();
bootstrap.handler(
new Initializer(this, protocolVersion, protocolOptions.getCompression().compressor(), protocolOptions.getSSLOptions(),
factory.configuration.getPoolingOptions().getHeartbeatIntervalSeconds(),
factory.configuration.getNettyOptions(),
factory.configuration.getCodecRegistry()));

ChannelFuture future = bootstrap.connect(address);
}
}

The client's main handler is the Initializer, and within it the piece that handles requests is Connection.Dispatcher:

private static class Initializer extends ChannelInitializer<SocketChannel> {
// Stateless handlers
private static final Message.ProtocolDecoder messageDecoder = new Message.ProtocolDecoder();
private static final Message.ProtocolEncoder messageEncoderV4 = new Message.ProtocolEncoder(ProtocolVersion.V4);
private static final Frame.Encoder frameEncoder = new Frame.Encoder();
private final Connection connection;
private final FrameCompressor compressor;
private final NettyOptions nettyOptions;
private final ChannelHandler idleStateHandler;
private final CodecRegistry codecRegistry;

protected void initChannel(SocketChannel channel) throws Exception {
// set the codec registry so that it can be accessed by ProtocolDecoder
channel.attr(Message.CODEC_REGISTRY_ATTRIBUTE_KEY).set(codecRegistry);
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("frameDecoder", new Frame.Decoder());
pipeline.addLast("frameEncoder", frameEncoder);
if (compressor != null) {
pipeline.addLast("frameDecompressor", new Frame.Decompressor(compressor));
pipeline.addLast("frameCompressor", new Frame.Compressor(compressor));
}
pipeline.addLast("messageDecoder", messageDecoder);
pipeline.addLast("messageEncoder", messageEncoderFor(protocolVersion));
pipeline.addLast("idleStateHandler", idleStateHandler);
pipeline.addLast("dispatcher", connection.dispatcher);
nettyOptions.afterChannelInitialized(channel);
}
}

The Dispatcher appears to be responsible only for reading message responses:

class Dispatcher extends SimpleChannelInboundHandler<Message.Response> {
protected void channelRead0(ChannelHandlerContext ctx, Message.Response response) throws Exception {
int streamId = response.getStreamId();
ResponseHandler handler = pending.remove(streamId);
handler.cancelTimeout();
handler.callback.onSet(Connection.this, response, System.nanoTime() - handler.startTime, handler.retryCount);
if (isClosed()) tryTerminate(false);
}
}

So where does the client send data? Let's start from session.execute in the example and look for the place where the request is written out.

public ResultSetFuture executeAsync(final Statement statement) {
DefaultResultSetFuture future = new DefaultResultSetFuture(this, cluster.manager.protocolVersion(), makeRequestMessage(statement, null));
new RequestHandler(this, future, statement).sendRequest();
return future;
}

makeRequestMessage builds the request, so sendRequest must be what actually sends it:

class RequestHandler {
void sendRequest() {
startNewExecution();
}
private void startNewExecution() {
// the future is the callback; since the future built the request via makeRequestMessage, callback.request() can recover it here
Message.Request request = callback.request();
SpeculativeExecution execution = new SpeculativeExecution(request, position);
runningExecutions.add(execution);
execution.sendRequest(); // send the request; it is wrapped inside the execution
}

}

SpeculativeExecution implements speculative execution; its QueryPlan is the query plan, which routes the request to a host according to the client's load-balancing policy, and that host is the legendary Coordinator.

class SpeculativeExecution implements Connection.ResponseCallback {
private final Message.Request request;

void sendRequest() {
Host host;
while (!isDone.get() && (host = queryPlan.next()) != null && !queryStateRef.get().isCancelled()) {
if (query(host)) return;
}
reportNoMoreHosts(this);
}
private boolean query(final Host host) {
HostConnectionPool currentPool = manager.pools.get(host);
if (allowSpeculativeExecutions && nextExecutionScheduled.compareAndSet(false, true))
scheduleExecution(speculativeExecutionPlan.nextExecution(host));
Connection connection = currentPool.borrowConnection(manager.configuration().getPoolingOptions().getPoolTimeoutMillis(), TimeUnit.MILLISECONDS);
write(connection, this);
return true;
}

private void write(Connection connection, Connection.ResponseCallback responseCallback) throws ConnectionException, BusyConnectionException {
connectionHandler = connection.write(responseCallback, statement.getReadTimeoutMillis(), false);
}
}

The query method seems to drop the request object, but write(connection, this) passes this along, so the request can still be recovered.
write keeps passing the responseCallback, and callback.request() brings our request object back to life: it was never lost.
channel.writeAndFlush(request) is Netty's write call, i.e. the client ships the request object to the server.

ResponseHandler write(ResponseCallback callback, long statementReadTimeoutMillis, boolean startTimeout) throws ConnectionException, BusyConnectionException {
ResponseHandler handler = new ResponseHandler(this, statementReadTimeoutMillis, callback);
dispatcher.add(handler);
Message.Request request = callback.request().setStreamId(handler.streamId);
if (DISABLE_COALESCING) { // write directly, no coalescing
channel.writeAndFlush(request).addListener(writeHandler(request, handler));
} else { // coalesce writes before flushing
flush(new FlushItem(channel, request, writeHandler(request, handler)));
}
return handler;
}

nativeServer(Netty)

The native server is built on Netty; the Initializer bound to the ServerBootstrap assembles a ChannelPipeline from several handlers:

  1. Frame decoding and encoding
  2. message decoding and encoding
  3. message dispatch (Dispatcher)
41
public class Server implements CassandraDaemon.Server {
private EventLoopGroup workerGroup;
private EventExecutor eventExecutorGroup;

private void run() {
eventExecutorGroup = new RequestThreadPoolExecutor();
boolean hasEpoll = enableEpoll ? Epoll.isAvailable() : false;
if (hasEpoll) {
workerGroup = new EpollEventLoopGroup();
} else {
workerGroup = new NioEventLoopGroup();
}
ServerBootstrap bootstrap = new ServerBootstrap()
.group(workerGroup)
.channel(hasEpoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_LINGER, 0)
.childOption(ChannelOption.SO_KEEPALIVE, DatabaseDescriptor.getRpcKeepAlive())
.childOption(ChannelOption.ALLOCATOR, CBUtil.allocator)
.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024)
.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
bootstrap.childHandler(new Initializer(this));
bootstrap.bind(socket);
}

private static class Initializer extends ChannelInitializer {
private final Server server;

protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addFirst("connectionLimitHandler", new ConnectionLimitHandler()); // connection limiting
pipeline.addLast("frameDecoder", new Frame.Decoder(server.connectionFactory)); // frame decoding
pipeline.addLast("frameEncoder", new Frame.Encoder()); // frame encoding
pipeline.addLast("frameDecompressor", new Frame.Decompressor()); // frame decompression
pipeline.addLast("frameCompressor", new Frame.Compressor()); // frame compression
pipeline.addLast("messageDecoder", new Message.ProtocolDecoder()); // message body decoding
pipeline.addLast("messageEncoder", new Message.ProtocolEncoder()); // message body encoding
pipeline.addLast(server.eventExecutorGroup, "executor", new Message.Dispatcher()); // message dispatch
}
}
}

The codecs pair up with requests and responses: when the server receives a request it decodes the client's bytes (ProtocolDecoder), and once processing finishes it encodes the response and sends it back to the client (ProtocolEncoder).

Like Thrift, the CQL protocol defines its data structures and service methods up front; the protocol documentation lives under the doc folder. A Frame defines the format of a message on the wire: the header is 9 bytes in total (40 + 32 = 72 bits / 8 = 9 bytes) and the body is variable-length. Message is the message abstraction built on top of Frame, which is why the Initializer assembles the ChannelPipeline in the order Frame handlers, then Message handlers, then the Message Dispatcher. Request processing mirrors this: the server first receives the bytes, then decodes the request type, and only then handles the request.

There are several message types: ERROR, STARTUP, QUERY, RESULT, PREPARE, EXECUTE, EVENT, BATCH, and each type is declared to be either a Request or a Response; for example ERROR, RESULT, and EVENT are Responses, while the others are Requests.
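
To make the 9-byte header concrete, below is a small self-contained sketch that decodes a v3/v4 native-protocol frame header the way Frame.Decoder would; the field layout follows the protocol spec in the doc folder, while the class and variable names are mine:

import java.nio.ByteBuffer;

// Native-protocol (v3/v4) frame header: [version:1][flags:1][stream:2][opcode:1][length:4] = 9 bytes.
public class FrameHeaderSketch {
    public static void main(String[] args) {
        // Example bytes: a v4 QUERY request on stream 0 with an empty body.
        ByteBuffer header = ByteBuffer.wrap(new byte[]{ 0x04, 0x00, 0x00, 0x00, 0x07, 0x00, 0x00, 0x00, 0x00 });
        int versionByte  = header.get() & 0xFF;
        boolean response = (versionByte & 0x80) != 0;     // high bit encodes the direction
        int version      = versionByte & 0x7F;            // low 7 bits encode the protocol version
        int flags        = header.get() & 0xFF;           // e.g. 0x01 means the body is compressed
        int streamId     = header.getShort();             // correlates a response with its request
        int opcode       = header.get() & 0xFF;           // 0x07 = QUERY (request), 0x08 = RESULT (response)
        long bodyLength  = header.getInt() & 0xFFFFFFFFL; // length of the variable-size body that follows
        System.out.printf("v%d %s, opcode=0x%02X, stream=%d, body=%d bytes, flags=%d%n",
                          version, response ? "response" : "request", opcode, streamId, bodyLength, flags);
    }
}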

Dispatcher

The server-side Dispatcher handler receives a request, executes it, and returns the response. The flush here resembles the coalescing flush the Netty client used when sending requests;
in both cases the goal is to get data to the peer (the client sends requests to the server, the server sends responses back to the client).

public static class Dispatcher extends SimpleChannelInboundHandler<Request> {
public void channelRead0(ChannelHandlerContext ctx, Request request) {
ServerConnection connection = (ServerConnection)request.connection();
logger.trace("Received: {}, v={}", request, connection.getVersion());

Response response = request.execute(qstate); // the server executes the request
response.setStreamId(request.getStreamId());
response.attach(connection);
connection.applyStateTransition(request.type, response.type);

logger.trace("Responding: {}, v={}", response, connection.getVersion());
flush(new FlushItem(ctx, response, request.getSourceFrame()));
}
}

Taking a CQL query as an example, the trace log looks like this:

TRACE [SharedPool-Worker-1] 2016-10-11 17:02:27,345 Message.java:506 - Received: QUERY select * from velocity_app where attribute='zqhxuyuan' and type='login' and partner_code='tongdun' and app_name='tongdun_app';, v=4

TRACE [SharedPool-Worker-1] 2016-10-11 17:02:27,346 QueryProcessor.java:221 - Process org.apache.cassandra.cql3.statements.SelectStatement@37504b54 @CL.ONE
DEBUG [SharedPool-Worker-1] 2016-10-11 17:02:27,346 SliceQueryPager.java:92 - Querying next page of slice query; new filter: SliceQueryFilter [reversed=false, slices=[[, ]], count=100, toGroup = 1]
TRACE [SharedPool-Worker-1] 2016-10-11 17:02:27,347 ReadCallback.java:76 - Blockfor is 1; setting up requests to localhost/127.0.0.1
TRACE [SharedPool-Worker-1] 2016-10-11 17:02:27,347 AbstractReadExecutor.java:118 - reading data locally
TRACE [SharedPool-Worker-2] 2016-10-11 17:02:27,348 SliceQueryFilter.java:269 - collecting 0 of 100: 1111111111-1::false:0@1476176498640102
TRACE [SharedPool-Worker-2] 2016-10-11 17:02:27,348 SliceQueryFilter.java:269 - collecting 1 of 100: 1111111111-1:event:false:10@1476176498640102
TRACE [SharedPool-Worker-2] 2016-10-11 17:02:27,348 SliceQueryFilter.java:269 - collecting 1 of 100: 1111111111-1:timestamp:false:8@1476176498640102
TRACE [SharedPool-Worker-1] 2016-10-11 17:02:27,348 StorageProxy.java:1444 - Read: 1 ms.
DEBUG [SharedPool-Worker-1] 2016-10-11 17:02:27,349 AbstractQueryPager.java:95 - Fetched 1 live rows
DEBUG [SharedPool-Worker-1] 2016-10-11 17:02:27,349 AbstractQueryPager.java:112 - Got result (1) smaller than page size (100), considering pager exhausted
DEBUG [SharedPool-Worker-1] 2016-10-11 17:02:27,349 AbstractQueryPager.java:133 - Remaining rows to page: 2147483646

TRACE [SharedPool-Worker-1] 2016-10-11 17:02:27,349 Message.java:525 - Responding: ROWS [attribute(forseti, velocity_app), org.apache.cassandra.db.marshal.UTF8Type][partner_code(forseti, velocity_app), org.apache.cassandra.db.marshal.UTF8Type][app_name(forseti, velocity_app), org.apache.cassandra.db.marshal.UTF8Type][type(forseti, velocity_app), org.apache.cassandra.db.marshal.UTF8Type][sequence_id(forseti, velocity_app), org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.UTF8Type)][event(forseti, velocity_app), org.apache.cassandra.db.marshal.UTF8Type][timestamp(forseti, velocity_app), org.apache.cassandra.db.marshal.LongType]
| zqhxuyuan | tongdun | tongdun_app | login | 1111111111-1 | {jsondata} | 1111111111
---, v=4

With tracing on, the server-side execution trace of the query is displayed:

Tracing session: 6eceb810-8f91-11e6-a2b4-dbe2eb0e3cb9

activity | timestamp | source | source_elapsed
--------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+-----------+----------------
Execute CQL3 query | 2016-10-11 17:02:27.345000 | 127.0.0.1 | 0
Parsing select * from velocity_app where attribute='zqhxuyuan' and type='login' and partner_code='tongdun' and app_name='tongdun_app'; [SharedPool-Worker-1] | 2016-10-11 17:02:27.345000 | 127.0.0.1 | 333
Preparing statement [SharedPool-Worker-1] | 2016-10-11 17:02:27.346000 | 127.0.0.1 | 730
Executing single-partition query on velocity_app [SharedPool-Worker-2] | 2016-10-11 17:02:27.347000 | 127.0.0.1 | 2236
Acquiring sstable references [SharedPool-Worker-2] | 2016-10-11 17:02:27.347000 | 127.0.0.1 | 2357
Merging memtable tombstones [SharedPool-Worker-2] | 2016-10-11 17:02:27.347000 | 127.0.0.1 | 2446
Skipped 0/0 non-slice-intersecting sstables, included 0 due to tombstones [SharedPool-Worker-2] | 2016-10-11 17:02:27.347000 | 127.0.0.1 | 2591
Merging data from memtables and 0 sstables [SharedPool-Worker-2] | 2016-10-11 17:02:27.347000 | 127.0.0.1 | 2661
Read 1 live and 0 tombstone cells [SharedPool-Worker-2] | 2016-10-11 17:02:27.348000 | 127.0.0.1 | 3240
Request complete | 2016-10-11 17:02:27.349147 | 127.0.0.1 | 4147

Note that the timestamps of the first and last lines in the log file essentially match the first and last rows shown by tracing on.

// log file
TRACE [SharedPool-Worker-1] 2016-10-11 17:02:27,345 Message.java:506 - Received: QUERY
TRACE [SharedPool-Worker-1] 2016-10-11 17:02:27,349 Message.java:525 - Responding: ROWS

//CQL tracing on
Execute CQL3 query | 2016-10-11 17:02:27.345000
Request complete | 2016-10-11 17:02:27.349147

QueryMessage

Take the o.a.c.transport.messages.QueryMessage request as an example:

public Message.Response execute(QueryState state) {
Tracing.instance.begin("Execute CQL3 query", state.getClientAddress(), builder.build());
Message.Response response = ClientState.getCQLQueryHandler().process(query, state, options, getCustomPayload());
return response;
}

The CQLQueryHandler doing the processing is QueryProcessor:

public ResultMessage process(String queryString, QueryState queryState, QueryOptions options) {
ParsedStatement.Prepared p = getStatement(queryString, queryState.getClientState());
options.prepare(p.boundNames);
CQLStatement prepared = p.statement;
return processStatement(prepared, queryState, options);
}
public ResultMessage processStatement(CQLStatement statement, QueryState queryState, QueryOptions options) {
logger.trace("Process {} @CL.{}", statement, options.getConsistency());
ClientState clientState = queryState.getClientState();
ResultMessage result = statement.execute(queryState, options);
return result == null ? new ResultMessage.Void() : result;
}

Back to Response response = request.execute(qstate): our example request is a QueryMessage (a Message.Request), and the result is a ResultMessage, which is exactly a Message.Response.
This matches the message types above, where QUERY is a Request and RESULT is a Response.

Moving down from Message to Statement, take SelectStatement as an example: here we finally meet the same StorageProxy call we saw on the Thrift path.

A message type usually maps to a Statement: QueryMessage maps here to SelectStatement, while Execute and Batch messages map to their own statements.
The request object is transformed along the chain Request -> Statement -> Command; for a query that is request -> SelectStatement -> ReadCommands.

public ResultMessage.Rows execute(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException {
ConsistencyLevel cl = options.getConsistency();
int limit = getLimit(options);
Pageable command = getPageableCommand(options, limit, now);
int pageSize = getPageSize(options);
if (pageSize <= 0 || command == null || !QueryPagers.mayNeedPaging(command, pageSize))
return execute(command, options, limit, now, state); // non-paged query
QueryPager pager = QueryPagers.pager(command, cl, state.getClientState(), options.getPagingState());
return execute(pager, options, limit, now, pageSize); // paged query
}
private ResultMessage.Rows execute(Pageable command, QueryOptions options, int limit, long now, QueryState state) {
List<Row> rows = command instanceof Pageable.ReadCommands
? StorageProxy.read(((Pageable.ReadCommands)command).commands, options.getConsistency(), state.getClientState())
: StorageProxy.getRangeSlice((RangeSliceCommand)command, options.getConsistency());
return processResults(rows, options, limit, now);
}

Whether through the Thrift-protocol ThriftServer or the binary-protocol Server, everything ends up calling the StorageProxy facade.

(Figure: c-storageproxy)

StorageProxy

StorageProxy.read branches on whether the consistency level is serial: a regular read, or a transactional read.

Cassandra's transaction support is implemented with Paxos; the corresponding read method is readWithPaxos.

When a CQL shell or driver client sends a request, the StorageProxy of whichever server receives it runs on the so-called Coordinator node.
Besides the ReadCommand, the coordinator also receives the consistencyLevel, which decides whether requests must be sent on to other nodes.

// Performs the actual reading of a row out of the StorageService, fetching a specific set of column names from a given column family.
public static List<Row> read(List<ReadCommand> commands, ConsistencyLevel consistencyLevel, ClientState state) {
    return consistencyLevel.isSerialConsistency()
           ? readWithPaxos(commands, consistencyLevel, state)
           : readRegular(commands, consistencyLevel);
}
private static List<Row> readRegular(List<ReadCommand> commands, ConsistencyLevel consistencyLevel) {
    List<Row> rows = fetchRows(commands, consistencyLevel);
    return rows;
}
private static List<Row> fetchRows(List<ReadCommand> initialCommands, ConsistencyLevel consistencyLevel) {
    List<Row> rows = new ArrayList<>(initialCommands.size());
    List<AbstractReadExecutor> readExecutors = new ArrayList<>(initialCommands.size());

    // send out read requests
    for (ReadCommand command : initialCommands) {
        AbstractReadExecutor exec = AbstractReadExecutor.getReadExecutor(command, consistencyLevel);
        exec.executeAsync(); // execute asynchronously
        readExecutors.add(exec);
    }
    // read results and make a second pass for any digest mismatches
    for (AbstractReadExecutor exec : readExecutors) {
        Row row = exec.get(); // wait for the executor's result
        if (row != null) {
            row = exec.command.maybeTrim(row);
            rows.add(row); // add to the result set
        }
    }
    return rows; // the final query result
}

How the rows obtained through StorageProxy travel back to the client is handled by the Netty Dispatcher described earlier; it is not StorageProxy's responsibility.

There are several read-executor implementations, for example NeverSpeculatingReadExecutor, which never speculates. The actual read work is still wrapped inside the ReadCommand.

private static class NeverSpeculatingReadExecutor extends AbstractReadExecutor {
    public void executeAsync() {
        makeDataRequests(targetReplicas.subList(0, 1));
        if (targetReplicas.size() > 1)
            makeDigestRequests(targetReplicas.subList(1, targetReplicas.size()));
    }
}
// in AbstractReadExecutor
private void makeRequests(ReadCommand readCommand, Iterable<InetAddress> endpoints) {
    boolean hasLocalEndpoint = false;
    MessageOut<ReadCommand> message = readCommand.createMessage();
    for (InetAddress endpoint : endpoints) {
        if (isLocalRequest(endpoint)) { // the target endpoints include the local node
            hasLocalEndpoint = true;
            continue;
        }
        MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
    }
    if (hasLocalEndpoint) // execute locally right away; remote reads are still in flight, hence the callback/handler
        StageManager.getStage(Stage.READ).maybeExecuteImmediately(new LocalReadRunnable(readCommand, handler));
}

As noted when starting CassandraDaemon, every Cassandra node runs both the Thrift and native servers, and StorageProxy is the facade that receives the various client requests (reads and writes).
But in a distributed system, which nodes should handle a given request? Cassandra calls the node that receives the client request the coordinator, yet that node may not actually store the data,
in which case it forwards the request to the nodes that do. Reads work like writes here: if the data is not stored on the coordinator it cannot be read there, so the coordinator sends read
requests to the nodes that really hold the data, waits for them to reply, and then returns the data to the client.

The coordinator is thus the node that runs makeRequests to build the outgoing requests. If the request happens to map to the coordinator itself, the coordinator can serve it directly:
when isLocalRequest holds, the command is executed locally and immediately via maybeExecuteImmediately. For the remaining remote nodes, sendRRWithFailure ships the command out (the target node is the second parameter, endpoint).
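
For reference, isLocalRequest is essentially just an address comparison; a minimal sketch of the idea (simplified; the 2.2 source may additionally consult a system property that can disable the local-read optimization):

// A request counts as local when the target endpoint is this node's own broadcast address.
private static boolean isLocalRequest(InetAddress endpoint) {
    return endpoint.equals(FBUtilities.getBroadcastAddress());
}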

LocalReadRunnable wraps the ReadCommand and the callback; the actual read happens in command.getRow, which returns a single Row. ReadCommand has two implementations: SliceFromReadCommand and SliceByNamesReadCommand.

static class LocalReadRunnable extends DroppableRunnable {
private final ReadCommand command;
private final ReadCallback<ReadResponse, Row> handler;

protected void runMayThrow() {
Keyspace keyspace = Keyspace.open(command.ksName);
Row r = command.getRow(keyspace);
ReadResponse result = ReadVerbHandler.getResponse(command, r);
handler.response(result);
}
}

Let's follow how a command handed to StorageProxy (a ReadCommand, say) travels through the server:

  1. an AbstractReadExecutor (e.g. NeverSpeculatingReadExecutor) is created and executeAsync submits the work to the executor
  2. LocalReadRunnable is scheduled via maybeExecuteImmediately
  3. inside LocalReadRunnable, runMayThrow finally runs the ReadCommand's getRow

(Figure: c-proxyread)

The full flow of a read, from the client to the server-side StorageProxy and back to the client with the result:

(Figure: c-network-flow)

Client(Netty)->Server(Netty): channel.write(request)
Server(Netty)->Dispatcher: pipeline
Dispatcher->Request: execute
Request->Statement: execute
Statement->StorageProxy: read
StorageProxy->ReadExecutor: executeAsync
ReadExecutor->LocalReadRunnable: runMayThrow
StorageProxy->ReadExecutor: get
LocalReadRunnable->ReadCommand: getRow(keyspace): row
LocalReadRunnable->ReadVerbHandler: getResponse(command,row): result
LocalReadRunnable->ReadVerbHandler: response(result)
ReadExecutor->MessageService: sendRR(message,endpoint,handler)
StorageProxy->Dispatcher: query result pass to dispatcher
Dispatcher->Client(Netty): channel.write(response)

How the coordinator picks target nodes

NeverSpeculatingReadExecutor executes the read asynchronously, and targetReplicas decides which nodes receive requests.
It is a list: the first node gets a data request, the remaining nodes get digest requests. targetReplicas is declared in
the parent class AbstractReadExecutor and is assigned when the read executor instance is constructed.

private static class NeverSpeculatingReadExecutor extends AbstractReadExecutor {
public NeverSpeculatingReadExecutor(ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas) {
super(command, consistencyLevel, targetReplicas);
}
public void executeAsync() {
makeDataRequests(targetReplicas.subList(0, 1));
if (targetReplicas.size() > 1)
makeDigestRequests(targetReplicas.subList(1, targetReplicas.size()));
}
public Collection<InetAddress> getContactedReplicas() {
return targetReplicas;
}
}

The concrete AbstractReadExecutor instance is created only when the request is executed. The nodes in targetReplicas depend on the data model's partition key, because the key determines which nodes hold the data.
A RingPosition is a concrete token value: every Cassandra node owns multiple token ranges, so hashing a key to a token always lands it on specific nodes.
For reliability the same data is placed on multiple nodes according to the replication strategy; SimpleStrategy, for instance, walks the ring clockwise and stores replicas on the next distinct nodes, while other strategies also take racks and similar factors into account.

// StorageProxy handles the read request and creates the read executor
private static List<Row> fetchRows(List<ReadCommand> initialCommands, ConsistencyLevel consistencyLevel) {
    AbstractReadExecutor exec = AbstractReadExecutor.getReadExecutor(command, consistencyLevel);
}

// AbstractReadExecutor
public static AbstractReadExecutor getReadExecutor(ReadCommand command, ConsistencyLevel consistencyLevel) throws UnavailableException {
    Keyspace keyspace = Keyspace.open(command.ksName);
    List<InetAddress> allReplicas = StorageProxy.getLiveSortedEndpoints(keyspace, command.key); // how are all replicas of the key found? see below
    ReadRepairDecision repairDecision = Schema.instance.getCFMetaData(command.ksName, command.cfName).newReadRepairDecision();
    List<InetAddress> targetReplicas = consistencyLevel.filterForQuery(keyspace, allReplicas, repairDecision);
    consistencyLevel.assureSufficientLiveNodes(keyspace, targetReplicas); // fail fast with UnavailableException if there are not enough live replicas

    ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.cfName);
    RetryType retryType = cfs.metadata.getSpeculativeRetry().type;
    // Speculative retry is disabled *OR* there are simply no extra replicas to speculate on
    if (retryType == RetryType.NONE || consistencyLevel.blockFor(keyspace) == allReplicas.size())
        return new NeverSpeculatingReadExecutor(command, consistencyLevel, targetReplicas);
    // other implementations follow
}

// StorageProxy: wrap the key into a RingPosition via the Partitioner (the Partitioner defines how keys are hashed)
public static List<InetAddress> getLiveSortedEndpoints(Keyspace keyspace, ByteBuffer key) {
    return getLiveSortedEndpoints(keyspace, StorageService.getPartitioner().decorateKey(key));
}
private static List<InetAddress> getLiveSortedEndpoints(Keyspace keyspace, RingPosition pos) {
    List<InetAddress> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(keyspace, pos);
    DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), liveEndpoints);
    return liveEndpoints;
}

To locate a searchPosition in the TokenMetadata, a binary search over the pre-sorted sortedTokens quickly finds the owning token.
Determining which nodes should hold all replicas of a given searchToken is delegated to the abstract method calculateNaturalEndpoints, implemented by each replication strategy.

TokenMetadata is clearly the data source: after startup each Cassandra node publishes the tokens it manages into TokenMetadata,
and its contents should be identical on every node, so that a client gets the same metadata no matter which coordinator it talks to.
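
The wrap-around binary search behind this lookup is easy to sketch in a self-contained way; the toy version below uses Long in place of Cassandra's Token type, with the real getNaturalEndpoints shown right after:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Sketch of the TokenMetadata.firstToken idea: find the first ring token >= searchToken,
// wrapping back to index 0 when the search token is past the last token (the ring is circular).
public class RingLookupSketch {
    static <T extends Comparable<T>> T firstToken(List<T> sortedRing, T search) {
        int i = Collections.binarySearch(sortedRing, search);
        if (i < 0) {
            i = -i - 1;                        // insertion point of a token not present in the ring
            if (i >= sortedRing.size()) i = 0; // past the end: wrap around to the ring's start
        }
        return sortedRing.get(i);
    }

    public static void main(String[] args) {
        List<Long> ring = Arrays.asList(-100L, 0L, 100L);
        System.out.println(firstToken(ring, 42L));  // 100  (the next token clockwise)
        System.out.println(firstToken(ring, 200L)); // -100 (wrapped around the ring)
    }
}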

//AbstractReplicationStrategy
public ArrayList<InetAddress> getNaturalEndpoints(RingPosition searchPosition) {
Token searchToken = searchPosition.getToken();
Token keyToken = TokenMetadata.firstToken(tokenMetadata.sortedTokens(), searchToken);
ArrayList<InetAddress> endpoints = getCachedEndpoints(keyToken);
if (endpoints == null) {
TokenMetadata tm = tokenMetadata.cachedOnlyTokenMap();
// if our cache got invalidated, it's possible there is a new token to account for too
keyToken = TokenMetadata.firstToken(tm.sortedTokens(), searchToken);
endpoints = new ArrayList<InetAddress>(calculateNaturalEndpoints(searchToken, tm));
cachedEndpoints.put(keyToken, endpoints);
}
return new ArrayList<InetAddress>(endpoints);
}
public abstract List<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata);

The simplest strategy, SimpleStrategy, iterates over TokenMetadata with ringIterator. TokenMetadata contains all tokens, each mapped to a node address (endpoint).
Note that Cassandra uses vnodes, so the token ranges one node owns are not contiguous; consecutive tokens may belong to the same endpoint.
For example, given [Token1:Node1, Token2:Node1, Token3:Node2, Token4:Node2, Token5:Node1, Token6:Node3, ...],
taking tokens in order as [Token1:Node1, Token2:Node1, Token3:Node2] finds only two nodes [Node1, Node2], so duplicate endpoints have to be skipped during the walk.
The actual selection is [Token1:Node1, Token3:Node2, Token6:Node3], i.e. the nodes [Node1, Node2, Node3].

You might object that the key maps to Token1, yet the walk above picked [Token1, Token3, Token6] even though Token3 and Token6 are not Token1.
But what we are selecting is nodes, not tokens!

//SimpleStrategy
public List<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata) {
int replicas = getReplicationFactor();
ArrayList<Token> tokens = metadata.sortedTokens();
List<InetAddress> endpoints = new ArrayList<InetAddress>(replicas);
Iterator<Token> iter = TokenMetadata.ringIterator(tokens, token, false);
while (endpoints.size() < replicas && iter.hasNext()) {
InetAddress ep = metadata.getEndpoint(iter.next());
if (!endpoints.contains(ep))
endpoints.add(ep);
}
return endpoints;
}

Token and TokenMetadata will be covered in detail in the Storage Engine article.

StorageService

Before starting the Thrift and native servers, CassandraDaemon first initializes the StorageService. A freshly started Cassandra node tries to join the cluster, and the network-related part of that is the MessagingService.

StorageService is a server-side service just like ThriftServer and the native Netty Server; the difference is that StorageService is responsible for storage while the other two handle message transport and RPC.

Question: is StorageProxy a front-facing facade for StorageService, so that client requests must pass through StorageProxy to reach StorageService, or are the two peers?
In practice they are peers: there is no call from StorageProxy into StorageService, and StorageService can receive requests itself.

public synchronized void initServer(int delay) {
prepareToJoin();
}
private void prepareToJoin() {
Gossiper.instance.register(this);
Gossiper.instance.start(SystemKeyspace.incrementAndGetGeneration(), appStates);
MessagingService.instance().listen();
LoadBroadcaster.instance.startBroadcasting();
HintedHandOffManager.instance.start();
BatchlogManager.instance.start();
}

Besides the MessagingService, StorageService also starts several other components related to message handling and storage; together they provide Cassandra's distributed behavior:

  1. Gossiper: the gossip protocol, keeping cluster and node state consistent
  2. HintedHandOffManager: manages hints for requests that failed temporarily while a node was down
  3. BatchlogManager: manages the batch log

MessagingService

The MessagingService started by StorageService listens on storage_port = 7000 (the server-to-server protocol),
while the Netty-based nativeServer listens on native_transport_port = 9042 (the client-to-server protocol).
Why two ports, when requests on both eventually reach ReadCommand.getRow()? Because client-to-server and
server-to-server traffic speak different protocols: inter-node messaging uses plain blocking I/O rather than Netty.

(Figure: c-port-protocol)

In the analysis below you will see that the MessageService's plain ServerSocket can also serve requests (between nodes), while the CQL and driver clients analyzed earlier (over Netty) are served through StorageProxy.
(Figure: c-read-native-message)

The messaging service uses a plain ServerSocket. After the server-side thread starts, the SocketThread begins accepting connections; incoming requests are either streaming or ordinary messages.

Streaming messages also come in several kinds; they mainly occur for bulk data exchange between nodes, e.g. sstableloader and nodetool repair both spawn streaming threads.
http://www.datastax.com/dev/blog/streaming-in-cassandra-2-0

A Cassandra server node normally has a single ServerSocket. To accept client connections the server creates one Socket per connected peer, so it ends up maintaining many sockets.
getServerSockets() binds the server port, after which the SocketThread can start accepting connections.

private List<ServerSocket> getServerSockets(InetAddress localEp) {
    ServerSocketChannel serverChannel = ServerSocketChannel.open();
    ServerSocket socket = serverChannel.socket();
    InetSocketAddress address = new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort());
    socket.bind(address, 500);
    return Arrays.asList(socket);
}
private void listen(InetAddress localEp) throws ConfigurationException {
    for (ServerSocket ss : getServerSockets(localEp)) { // listen on the storage port; with encryption enabled there are two ports
        SocketThread th = new SocketThread(ss, "ACCEPT-" + localEp);
        th.start();
        socketThreads.add(th);
    }
}
public static class SocketThread extends Thread {
    private final ServerSocket server;
    public final Set<Closeable> connections = Sets.newConcurrentHashSet();

    public void run() {
        while (!server.isClosed()) {
            Socket socket = server.accept();
            DataInputStream in = new DataInputStream(socket.getInputStream());
            int header = in.readInt();
            boolean isStream = MessagingService.getBits(header, 3, 1) == 1;
            int version = MessagingService.getBits(header, 15, 8);
            Thread thread = isStream ? new IncomingStreamingConnection(version, socket, connections)
                                     : new IncomingTcpConnection(version, MessagingService.getBits(header, 2, 1) == 1, socket, connections);
            thread.start();
            connections.add((Closeable) thread);
        }
    }
    void close() throws IOException {
        server.close(); // close the ServerSocket
        for (Closeable connection : connections) connection.close(); // close the per-connection message-processing threads
    }
}

IncomingTcpConnection is a background thread that keeps reading and processing messages, handing each one to the MessagingService instance.

One StorageService has one ServerSocket, and each ServerSocket has a single SocketThread; the SocketThread loops accepting client connections and each TCP connection thread loops processing messages.
If the socket thread dies the server node is down, all connected clients lose their connections, and the connection threads have no messages left to process.
(Figure: c-socket connection)

public void run(){
receiveMessages();
}
private void receiveMessages() {
DataInput in = new DataInputStream(socket.getInputStream());
while (true){
receiveMessage(in, version);
}
}
private InetAddress receiveMessage(DataInput input, int version) throws IOException {
MessageIn message = MessageIn.read(input, version, id);
MessagingService.instance().receive(message, id, timestamp, isCrossNodeTimestamp);
return message.from;
}

A MessageIn is constructed from the input stream; its most important field is the verb, which determines the type of message.

public class MessageIn<T> {
public final InetAddress from;
public final T payload;
public final Map<String, byte[]> parameters;
public final MessagingService.Verb verb;
public final int version;
}

When MessagingService.receive(MessageIn) gets a message it creates a MessageDeliveryTask, and each task runs in the thread pool of its corresponding Stage:

public void receive(MessageIn message, int id, long timestamp, boolean isCrossNodeTimestamp) {
Runnable runnable = new MessageDeliveryTask(message, id, timestamp, isCrossNodeTimestamp);
LocalAwareExecutorService stage = StageManager.getStage(message.getMessageType());
stage.execute(runnable, ExecutorLocals.create(state));
}

MessageDeliveryTask is also a task thread, but it is scheduled by a pool and ends when it finishes, unlike IncomingTcpConnection, which never ends.

public class MessageDeliveryTask implements Runnable {
private final MessageIn message;

public void run() {
MessagingService.Verb verb = message.verb;
IVerbHandler verbHandler = MessagingService.instance().getVerbHandler(verb);
verbHandler.doVerb(message, id);
}
}

Scheduling goes through LocalAwareExecutorService, which resembles a thread pool. Note that execute does not actually run the task: it wraps the Runnable into a FutureTask that will be scheduled at some later point.

public abstract class AbstractLocalAwareExecutorService implements LocalAwareExecutorService {
public void execute(Runnable command, ExecutorLocals locals) {
addTask(newTaskFor(command, null, locals));
}
protected <T> FutureTask<T> newTaskFor(Runnable runnable, T result, ExecutorLocals locals) {
if (locals != null) {
return new LocalSessionFutureTask<T>(runnable, result, locals);
}
return new FutureTask<>(runnable, result);
}
}

The server has a single ServerSocket; every accepted client connection produces its own Socket, rather than all clients sharing one socket (basic Java).
Each Socket gets its own IncomingTcpConnection. Cassandra servers talk to each other through the MessagingService (clients talk to servers through Netty).
In the figure below, assume one Cassandra server [1] sends three requests to another server [2]: server 2 then holds just one connection, not three.
(If three servers [1,2,3] each send one request to node 4, then node 4 holds three connections.)
The top-left of the figure is the remote node sending requests; everything else runs on the local node.
(Figure: c-message service)

We said earlier that the coordinator forwards client requests to non-local nodes; that actually uses OutboundTcpConnection, while a server receiving such messages uses the StorageService's IncomingTcpConnection.
(Figure: c-msginout)

Some parallels between StorageProxy and the StorageService's MessageService:

Service class  | Thread pool                | Command     | Thread                             | Actual method
StorageProxy   | AbstractReadExecutor       | ReadCommand | LocalReadRunnable                  | ReadCommand.getRow
MessageService | LocalAwareExecutorService  | MessageIn   | MessageDeliveryTask/ExecutorLocals | IVerbHandler.doVerb(messageIn)

JMC shows the related threads: the server thread (a single SocketThread), incoming connections, and outgoing connections.
(Figures: c-network thread, c-socket thread)

Messages are ultimately handled by a VerbHandler, so let's look at the read- and write-related handlers next.

The IVerbHandler interface

Like the message types, IVerbHandler has many implementations.

Recall that on the StorageProxy path the ReadCommand's getRow is invoked directly, whereas the IVerbHandler path goes through ReadVerbHandler.doVerb(messageIn), whose messageIn payload is the ReadCommand.
So ReadVerbHandler is really just a wrapper around ReadCommand: doVerb still ends up calling ReadCommand.getRow.
Why have both? ReadCommand encapsulates only the read operation itself, while ReadVerbHandler not only invokes the ReadCommand but is also responsible for sending the response back.
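
The interface itself is a single method; abridged from the 2.2 source (the comment wording is mine):

public interface IVerbHandler<T>
{
    // Handle the arrived message; id is the message id, echoed back so the sender can match the reply.
    void doVerb(MessageIn<T> message, int id) throws IOException;
}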

(Figure: c-verbhandler)

ReadVerbHandler

Mutation covers writes and Read covers reads; both return a response to the caller, a read just has to wrap the resulting Row into a MessageOut. The difference between the two is the message payload: a ReadCommand for reads and a Mutation for writes. The read here is a lookup by primary key; a range query on the key is handled by RangeSliceVerbHandler instead.

MessageIn is analogous to StorageProxy's Message.Request, and MessageOut is the equivalent of Message.Response.
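
For reference, MessageOut mirrors the MessageIn shown earlier; a sketch of its fields, abridged from the 2.2 source (treat the exact set as an approximation):

public class MessageOut<T>
{
    public final InetAddress from;                   // this node's broadcast address
    public final MessagingService.Verb verb;         // e.g. REQUEST_RESPONSE for a reply
    public final T payload;                          // e.g. a ReadResponse
    public final IVersionedSerializer<T> serializer; // writes the payload for a given protocol version
    public final Map<String, byte[]> parameters;     // extra headers, e.g. Mutation.FORWARD_FROM
}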

public class ReadVerbHandler implements IVerbHandler<ReadCommand> {
public void doVerb(MessageIn<ReadCommand> message, int id) {
ReadCommand command = message.payload;
Keyspace keyspace = Keyspace.open(command.ksName);
Row row = command.getRow(keyspace);

MessageOut<ReadResponse> reply = new MessageOut<ReadResponse>(MessagingService.Verb.REQUEST_RESPONSE, getResponse(command, row), ReadResponse.serializer);
MessagingService.instance().sendReply(reply, id, message.from);
}

public static ReadResponse getResponse(ReadCommand command, Row row) {
if (command.isDigestQuery()) {
return new ReadResponse(ColumnFamily.digest(row.cf));
} else {
return new ReadResponse(row);
}
}
}
public class RangeSliceVerbHandler implements IVerbHandler<AbstractRangeCommand> {
public void doVerb(MessageIn<AbstractRangeCommand> message, int id) {
RangeSliceReply reply = new RangeSliceReply(message.payload.executeLocally());
MessagingService.instance().sendReply(reply.createMessage(), id, message.from);
}
}

ReadVerbHandler invokes ReadCommand's actual business method getRow, and it must also send the result back to the originating node: a MessageIn carries not only the command but also where the command came from.
For instance, if Server1 sends a ReadCommand to Server2 (Server1 wants to read from Server2), the message records both the ReadCommand and the fact that it came from Server1.
So after executing the ReadCommand, Server2's ReadVerbHandler returns the result to Server1.

ReadCommand has two implementations, SliceFromReadCommand and SliceByNamesReadCommand; either way the read flows through Keyspace -> ColumnFamilyStore -> ColumnFamily.

StorageProxy.read also ends up in ReadCommand, so why are there two read entry points? The IVerbHandler path is message-driven: once a node receives a read command, it reads the keyspace.
StorageProxy, on the other hand, can be seen as the coordinator: if the target endpoints include the local node, the local data is read directly as a procedure call rather than as a received message.

//SliceFromReadCommand
public Row getRow(Keyspace keyspace) {
CFMetaData cfm = Schema.instance.getCFMetaData(ksName, cfName);
DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
return keyspace.getRow(new QueryFilter(dk, cfName, filter, timestamp));
}

//Keyspace
public Row getRow(QueryFilter filter) {
ColumnFamilyStore cfStore = getColumnFamilyStore(filter.getColumnFamilyName());
ColumnFamily columnFamily = cfStore.getColumnFamily(filter);
return new Row(filter.key, columnFamily);
}

DecoratedKey will be covered in detail in the Storage Engine article.

Coordinator vs. MessageService

If the StorageProxy coordinator itself holds the requested data, the local LocalReadRunnable thread executes ReadCommand.getRow and passes the result to handler.response.

Note that this is not where the result is returned to the client: sending the response is not StorageProxy's job (all interaction with the client goes through Netty).

static class LocalReadRunnable extends DroppableRunnable {
private final ReadCommand command;
private final ReadCallback<ReadResponse, Row> handler;

protected void runMayThrow() {
Keyspace keyspace = Keyspace.open(command.ksName);
Row r = command.getRow(keyspace);
ReadResponse result = ReadVerbHandler.getResponse(command, r);
handler.response(result);
}
}

If the StorageProxy node does not hold the data, or it does but more nodes are needed to satisfy the consistency level, the request is sent to other nodes.
Those Cassandra nodes receive the coordinator's forwarded request through the MessageService on storage_port 7000
and execute ReadCommand.getRow just like the coordinator's local thread does.

Executing ReadCommand.getRow always produces a Row, and coordinator or not, the node wraps it into a ReadResponse.
What happens next differs slightly: the coordinator calls handler.response, while a non-coordinator returns a MessageOut to the coordinator through the MessageService.
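
For the remote case, forwarding with a callback boils down to registering the handler under a message id and then writing the message out; a simplified sketch of the MessagingService side (abridged; timeout and failure-callback bookkeeping omitted):

// Register the callback so the eventual REQUEST_RESPONSE can be routed back to it,
// then hand the message to the per-endpoint outbound connection.
public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb) {
    int id = addCallback(cb, message, to, message.getTimeout()); // remember who to notify
    sendOneWay(message, id, to);                                 // enqueue for the target node
    return id;
}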

The coordinator's handler is a ReadCallback; it starts background digest comparison, which has little to do with request handling itself, so we skip it for now.

public class ReadVerbHandler implements IVerbHandler<ReadCommand> {
public void doVerb(MessageIn<ReadCommand> message, int id) {
ReadCommand command = message.payload;
Keyspace keyspace = Keyspace.open(command.ksName);
Row row = command.getRow(keyspace);
MessageOut<ReadResponse> reply = new MessageOut<ReadResponse>(Verb.REQUEST_RESPONSE, getResponse(command, row), ReadResponse.serializer);
MessagingService.instance().sendReply(reply, id, message.from);
}
}

Comparing the two roles: the coordinator forwards client requests to non-coordinator nodes, and a non-coordinator returns its response to the coordinator when done.
What they share: both read their own local data with ReadCommand.getRow while serving the client request. Finally, because the coordinator has to merge data
from several non-coordinator nodes, it has a handler doing callback work; that work is not sending the response back to the client but verification and repair.

(Figure: c-internode-resp)

MutationVerbHandler

Insert, Update, and Delete are all Mutations in Cassandra, so MutationVerbHandler handles Mutation operations.
Unlike a read, where possibly only one node returns data and the others return digests, a write has to be sent to every replica.
MutationVerbHandler itself does not implement replication, of course; it only either applies the Mutation locally or forwards it.

public class MutationVerbHandler implements IVerbHandler<Mutation> {
public void doVerb(MessageIn<Mutation> message, int id) throws IOException {
byte[] from = message.parameters.get(Mutation.FORWARD_FROM);
InetAddress replyTo;
if (from == null) {
replyTo = message.from;
byte[] forwardBytes = message.parameters.get(Mutation.FORWARD_TO);
if (forwardBytes != null)
forwardToLocalNodes(message.payload, message.verb, forwardBytes, message.from);
} else {
replyTo = InetAddress.getByAddress(from);
}
message.payload.apply(); // this is the key line
WriteResponse response = new WriteResponse();
MessagingService.instance().sendOneWay(response.createMessage(), id, replyTo);
}

private void forwardToLocalNodes(Mutation mutation, MessagingService.Verb verb, byte[] forwardBytes, InetAddress from) throws IOException {
try (DataInputStream in = new DataInputStream(new FastByteArrayInputStream(forwardBytes))) {
int size = in.readInt();
// tell the recipients who to send their ack to
MessageOut<Mutation> message = new MessageOut<>(verb, mutation, Mutation.serializer).withParameter(Mutation.FORWARD_FROM, from.getAddress());
// Send a message to each of the addresses on our Forward List
for (int i = 0; i < size; i++) {
InetAddress address = CompactEndpointSerializationHelper.deserialize(in);
int id = in.readInt();
MessagingService.instance().sendOneWay(message, id, address);
}
}
}
}

Mutation.apply pushes the mutation through Keyspace -> ColumnFamilyStore, and in the end we meet Memtable, that familiar object of distributed storage systems.

//Mutation
public void apply() {
Keyspace ks = Keyspace.open(keyspaceName);
ks.apply(this, ks.getMetadata().durableWrites);
}

//Keyspace
public void apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes) {
try (OpOrder.Group opGroup = writeOrder.start()) {
ReplayPosition replayPosition = null;
if (writeCommitLog) replayPosition = CommitLog.instance.add(mutation);
DecoratedKey key = StorageService.getPartitioner().decorateKey(mutation.key());
for (ColumnFamily cf : mutation.getColumnFamilies()) {
ColumnFamilyStore cfs = columnFamilyStores.get(cf.id());
SecondaryIndexManager.Updater updater = updateIndexes
? cfs.indexManager.updaterFor(key, cf, opGroup)
: SecondaryIndexManager.nullUpdater;
cfs.apply(key, cf, updater, opGroup, replayPosition);
}
}
}

//ColumnFamilyStore
public void apply(DecoratedKey key, ColumnFamily cf, SecondaryIndexManager.Updater indexer, OpOrder.Group opGroup, ReplayPosition replayPosition) {
Memtable mt = data.getMemtableFor(opGroup, replayPosition);
mt.put(key, cf, indexer, opGroup);
maybeUpdateRowCache(key);
}

Response

After applying the Mutation locally, MutationVerbHandler has to return a result to the caller. Just as MessagingService receives requests with the IncomingTcpConnection -> MessageDeliveryTask threads on the read side, the response is written out by an OutboundTcpConnection thread.

    public void sendOneWay(MessageOut message, int id, InetAddress to) {
OutboundTcpConnection connection = getConnection(to, message);
connection.enqueue(message, id); // put on the queue; a background thread drains it
}

public OutboundTcpConnection getConnection(InetAddress to, MessageOut msg) {
return getConnectionPool(to).getConnection(msg);
}
public OutboundTcpConnectionPool getConnectionPool(InetAddress to) {
OutboundTcpConnectionPool cp = connectionManagers.get(to);
if (cp == null) {
cp = new OutboundTcpConnectionPool(to);
OutboundTcpConnectionPool existingPool = connectionManagers.putIfAbsent(to, cp);
if (existingPool != null) cp = existingPool;
else cp.start();
}
cp.waitForStarted();
return cp;
}

Three OutboundTcpConnections are created per peer node; starting the OutboundTcpConnectionPool starts all three at once.
OutboundTcpConnectionPool(InetAddress remoteEp) {
smallMessages = new OutboundTcpConnection(this);
largeMessages = new OutboundTcpConnection(this);
gossipMessages = new OutboundTcpConnection(this);
}
public void start(){
smallMessages.start();
largeMessages.start();
gossipMessages.start();
}
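
Which of the three connections a message travels on is decided by the pool's getConnection; a rough sketch of the selection logic (simplified; the threshold constant and helper names are assumptions):

// Gossip traffic gets a dedicated connection; everything else is split by serialized
// payload size so that huge messages do not hold up small, latency-sensitive ones.
OutboundTcpConnection getConnection(MessageOut msg) {
    if (Stage.GOSSIP == msg.getStage())
        return gossipMessages;
    return msg.payloadSize(smallMessages.getTargetVersion()) > LARGE_MESSAGE_THRESHOLD
         ? largeMessages
         : smallMessages;
}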

The last three lines of nodetool netstats (under Pool Name) correspond to the three kinds of OutboundTcpConnection above:

➜  apache-cassandra-2.2.6 bin/nodetool -h 192.168.6.52 netstats
Mode: NORMAL
Not sending any streams.
Read Repair Statistics:
Attempted: 376745
Mismatch (Blocking): 0
Mismatch (Background): 0
Pool Name Active Pending Completed
Large messages n/a 0 9
Small messages n/a 0 2495610
Gossip messages n/a 0 2390273

OutboundTcpConnection.enqueue puts the message on a queue; the background thread drains the queue and calls write to send the messages out:

public void enqueue(MessageOut<?> message, int id) {
if (backlog.size() > 1024)
expireMessages();
backlog.put(new QueuedMessage(message, id));
}
public void run() {
final List<QueuedMessage> drainedMessages = new ArrayList<>(drainedMessageSize);
while (true) {
cs.coalesce(backlog, drainedMessages, drainedMessageSize);
for (QueuedMessage qm : drainedMessages) {
writeConnected(qm, count == 1 && backlog.isEmpty());
}
drainedMessages.clear();
}
}
private void writeConnected(QueuedMessage qm, boolean flush) {
writeInternal(qm.message, qm.id, timestampMillis);
completed++;
if (flush) out.flush();
}
private void writeInternal(MessageOut message, int id, long timestamp) throws IOException {
out.writeInt(MessagingService.PROTOCOL_MAGIC);
out.writeInt(id);
out.writeInt((int) timestamp);
message.serialize(out, targetVersion);
}

The OutboundTcpConnection threads use the same storage port (7000) as the StorageService, which confirms that
data exchange between nodes always uses storage_port: the coordinator forwarding client requests to other nodes,
and those nodes returning their responses to the coordinator after handling the forwarded requests.

(Figure: c-storageport)


Below is the stack of a read-triggered Java heap OutOfMemoryError, likely caused by reads continuously pulling data into memory until it ran out.
The final call is OnDiskAtom's deserializeFromSSTable, i.e. data deserialized while reading SSTables is materialized in memory.

ERROR 19:20:50,316 Exception in thread Thread[ReadStage:63,5,main]

java.lang.OutOfMemoryError: Java heap space
at java.nio.ByteBuffer.wrap(ByteBuffer.java:350)
at java.nio.ByteBuffer.wrap(ByteBuffer.java:373)
at org.apache.cassandra.io.util.RandomAccessReader.readBytes(RandomAccessReader.java:391)
at org.apache.cassandra.utils.ByteBufferUtil.read(ByteBufferUtil.java:392)
at org.apache.cassandra.utils.ByteBufferUtil.readWithShortLength(ByteBufferUtil.java:371)
at org.apache.cassandra.db.OnDiskAtom$Serializer.deserializeFromSSTable(OnDiskAtom.java:84)
at org.apache.cassandra.db.OnDiskAtom$Serializer.deserializeFromSSTable(OnDiskAtom.java:73)
at org.apache.cassandra.db.columniterator.IndexedSliceReader$IndexedBlockFetcher.getNextBlock(IndexedSliceReader.java:370)
at org.apache.cassandra.db.columniterator.IndexedSliceReader$IndexedBlockFetcher.fetchMoreData(IndexedSliceReader.java:325)
at org.apache.cassandra.db.columniterator.IndexedSliceReader.computeNext(IndexedSliceReader.java:151)
at org.apache.cassandra.db.columniterator.IndexedSliceReader.computeNext(IndexedSliceReader.java:48)
at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:143)
at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:138)
at org.apache.cassandra.db.columniterator.SSTableSliceIterator.hasNext(SSTableSliceIterator.java:90)
at org.apache.cassandra.db.filter.QueryFilter$2.getNext(QueryFilter.java:171)
at org.apache.cassandra.db.filter.QueryFilter$2.hasNext(QueryFilter.java:154)
at org.apache.cassandra.utils.MergeIterator$Candidate.advance(MergeIterator.java:143)
at org.apache.cassandra.utils.MergeIterator$ManyToOne.advance(MergeIterator.java:122)
at org.apache.cassandra.utils.MergeIterator$ManyToOne.computeNext(MergeIterator.java:96)
at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:143)
at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:138)
at org.apache.cassandra.db.filter.SliceQueryFilter.collectReducedColumns(SliceQueryFilter.java:157)
at org.apache.cassandra.db.filter.QueryFilter.collateColumns(QueryFilter.java:136)
at org.apache.cassandra.db.filter.QueryFilter.collateOnDiskAtom(QueryFilter.java:84)
at org.apache.cassandra.db.CollationController.collectAllData(CollationController.java:293)
at org.apache.cassandra.db.CollationController.getTopLevelColumns(CollationController.java:65)
at org.apache.cassandra.db.ColumnFamilyStore.getTopLevelColumns(ColumnFamilyStore.java:1357)
at org.apache.cassandra.db.ColumnFamilyStore.getColumnFamily(ColumnFamilyStore.java:1214)
at org.apache.cassandra.db.ColumnFamilyStore.getColumnFamily(ColumnFamilyStore.java:1126)
at org.apache.cassandra.db.Table.getRow(Table.java:347)
at org.apache.cassandra.db.SliceFromReadCommand.getRow(SliceFromReadCommand.java:70)
at org.apache.cassandra.service.StorageProxy$LocalReadRunnable.runMayThrow(StorageProxy.java:1052)

Tracing on

public Message.Response execute(QueryState state) {
try {
UUID tracingId = null;
if (isTracingRequested()) {
tracingId = UUIDGen.getTimeUUID();
state.prepareTracingSession(tracingId);
}
if (state.traceNextQuery()) {
state.createTracingSession();
ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
builder.put("query", query);
Tracing.instance.begin("Execute CQL3 query", state.getClientAddress(), builder.build());
}
Message.Response response = ClientState.getCQLQueryHandler().process(query, state, options, getCustomPayload());
return response;
} finally {
Tracing.instance.stopSession(); //logger.trace("request complete");
}
}

Gossip

http://blog.csdn.net/FireCoder/article/details/5707539
http://blog.csdn.net/zhangzhaokun/article/details/5859760
http://wiki.apache.org/cassandra/ArchitectureGossip
http://thelastpickle.com/blog/2011/12/15/Anatomy-of-a-Cassandra-Partition.html


Table of Contents
  1. CassandraDaemon
  2. ThriftServer
  3. Java Driver(Netty)
  4. nativeServer(Netty)
    4.1. Dispatcher
    4.2. QueryMessage
  5. StorageProxy
    5.1. How the coordinator picks target nodes
  6. StorageService
    6.1. MessagingService
    6.2. The IVerbHandler interface
      6.2.1. ReadVerbHandler
        6.2.1.1. Coordinator vs. MessageService
      6.2.2. MutationVerbHandler
      6.2.3. Response
  7. Tracing on
  8. Gossip