Kafka技术内幕拾遗

Kafka技术内幕拾遗

  • ✅ 客户端元数据(Metadata)
  • ✅ 客户端线程模型(ThreadModel)
  • 即席查询(Interactive Query)
  • EOS事务(Transaction)

客户端的元数据对象

客户端的连接对象(NetworkClient)在轮询时会判断是否需要更新元数据。客户端调用元数据更新器的maybeUpdate()方法,并不一定每次都需要更新元数据。只有当元数据的超时时间(metadataTimeout)等于0时,客户端才会发送元数据请求。

1. 客户端轮询与元数据更新器

客户端调用选择器的轮询方法,最长的阻塞时间会在“轮询时间(pollTimeout)、元数据的更新时间(metadataTimeout)、请求的超时时间(requestTimeoutMs)”三者中选取最小值。如果元数据的更新时间等于0,表示客户端会立即发送元数据请求,不会阻塞。下面解释这几个时间变量的数据来源,以及它们在发送请求过程中所代表的含义。

  • 生产者的requestTimeoutMs变量,对应的配置项是request.timeout.ms,默认值30秒。该配置表示生产者等待收到响应结果的最长时间。如果生产者在这个时间超时后没有收到响应结果,就会认为生产请求失败,它可以重新发送生产请求。
  • 生产者的retryBackoffMs变量,对应的配置项是retry.backoff.ms,默认值100毫秒。该配置表示客户端发送请求失败时,为了避免在短时间内客户端重复地发送请求导致重试次数用光,客户端必须要等待一小会儿才允许发送新的请求。这个配置项可用于元数据请求、生产请求和拉取请求,但只有在发送失败时才会用到。该配置会传给元数据对象(元数据请求)、记录收集器(生产请求)。
  • 生产者的lingerMs变量,对应的配置项是linger.ms,默认值为0毫秒。该配置表示生产者在发送请求之前是否会延迟等待一段时间收集更多的消息。如果等于0,表示生产者会立即发送请求。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 客户端的网络连接对象在每次轮询之前,都会判断是否需要更新元数据
public class NetworkClient implements KafkaClient {
private final MetadataUpdater metadataUpdater; // 元数据的更新器

// 生产者会由发送线程调用该方法,消费者会由ConsumerNetworkClient调用该方法
public List<ClientResponse> poll(long pollTimeout, long now) {
long metadataTimeout = metadataUpdater.maybeUpdate(now);
selector.poll(Utils.min(pollTimeout,metadataTimeout,requestTimeoutMs));
}
class DefaultMetadataUpdater implements MetadataUpdater {
Metadata metadata;
void maybeUpdate(long now, Node node) {
// 这里简化了其他一些判断条件,实际的超时时间计算方式比较复杂
long metadataTimeout = metadata.timeToNextUpdate(now);
if(metadataTimeout == 0) // 准备发送“获取元数据”的请求
doSend(new MetadataRequest(metadata.topics()), now);
return metadataTimeout;
}
// 处理“获取元数据请求”的响应
void handleResponse(RequestHeader header, Struct body, long now) {
Cluster cluster = new MetadataResponse(body).cluster();
this.metadata.update(cluster, now); // 更新元数据的具体逻辑
}
}
}

客户端每次轮询收到元数据请求的响应结果后,会解析成Cluster对象,然后更新元数据对象。

2. 元数据对象

元数据对象有多个用于控制元数据更新策略的变量,相关的时间配置项主要有下面几个。

  • metadata.fetch.timeout.ms(生产者的maxBlockTimeMs变量,默认值为60秒):生产者第一次发送消息,如果主题没有分区,它等待元数据更新的最长阻塞时间(第7.3.2节第三小节)。
  • metadata.max.age.ms(元数据的metadataExpireMs变量,默认值为五分钟):即使不需要更新元数据,客户端也需要间隔一段时间更新一次元数据。
  • retry.backoff.ms(元数据的refreshBackoffMs变量,默认值为100毫秒):客户端多次发送元数据请求,需要等待一小段时间再发送元数据请求。

元数据的更新时间主要与后两项配置有关。refreshBackoffMs变量用来计算允许更新的时间(timeToAllowUpdate),metadataExpireMs变量用来计算失效的时间(timeToExpire)。默认情况下,retry.backoff.ms等于100毫秒时,允许更新的时间一般小于0timeToNextUpdate()方法主要取决于失效的时间,下面列举了几种不同的场景。

  • 需要更新元数据时,失效时间等于0,表示需要立即更新元数据。
  • 当前时间在失效阈值的范围内,即上次更新时间加上失效阈值大于当前时间,失效时间等于上次更新时间加上失效阈值,再减去当前时间,结果会大于0,表示再过指定的失效时间才需要更新元数据。
  • 当前时间超过失效阈值的范围,即当前时间大于上次更新时间加上失效阈值,失效时间也设置为0。

注意:元数据对象的metadataExpireMsrefreshBackoffMs都是固定的值,timeToNextUpdate()方法依赖needUpdate和上次的更新时间,来计算下次更新元数据的时间。当调用元数据对象的requestUpdate()方法和update()方法时,才会分别更新needUpdate和上次的更新时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public final class Metadata {
private final long refreshBackoffMs; // 更新失败时,下一次更新的补偿时间
private final long metadataExpireMs; // 每隔多久,更细一次元数据
private int version; // 版本号,当更新一次元数据,版本号加一
private long lastRefreshMs; // 上一次更新的时间,更新失败也会更新这个值
private long lastSuccessfulRefreshMs; // 上一次成功更新的时间
private Cluster cluster; // 集群的配置信息
private boolean needUpdate; // 是否需要更新元数据

public synchronized int requestUpdate() {
this.needUpdate = true; // 需要更新元数据
return this.version; // 返回当前的版本号,这个版本号是旧的
}
public synchronized boolean updateRequested(){return this.needUpdate;}

public synchronized long timeToNextUpdate(long nowMs) {
long timeToExpire = needUpdate ? 0 : Math.max(
this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
long timeToAllowUpdate=this.lastRefreshMs+this.refreshBackoffMs-nowMs;
return Math.max(timeToExpire, timeToAllowUpdate);
}

public synchronized void awaitUpdate(int lastVersion,long maxWaitMs){
long begin = System.currentTimeMillis();
long remainingWaitMs = maxWaitMs;
while (this.version <= lastVersion) {
if (remainingWaitMs != 0) wait(remainingWaitMs); // 等待
long elapsed = System.currentTimeMillis() - begin;
if (elapsed >= maxWaitMs) throw new TimeoutException("failed")
remainingWaitMs = maxWaitMs - elapsed;
}
}
public synchronized void update(Cluster cluster, long now) {
this.needUpdate = false;
this.version += 1;
this.lastRefreshMs = now;
this.lastSuccessfulRefreshMs = now;
for(Listener listener:listeners) listener.onMetadataUpdate(cluster);
this.cluster = cluster;
notifyAll(); // 通知
}
}

元数据对象的每个方法都加上了synchronized关键字,即使有多个客户端线程(用户线程)使用同一个生产者示例,并且访问相同的元数据对象,也是线程安全的。awaitUpdate()方法只会被生产者在的waitOnMetadata()方法调用。如果元数据的版本号(this.version)小于上一次的版本号(lastVersion),用户线程会通过wait()进入阻塞状态。调用元数据对象的update()方法,更新版本号,并通知用户线程退出awaitUpdate()方法。

元数据对象除了会更新元数据内容,还有一个保存集群配置的Cluster对象。Cluster保存了分区信息相关的变量,分区信息包括分区的主副本、ISRAR等内容。第二章生产者客户端发送消息时,利用“分区信息”为消息指定分区编号。本章从控制器、LeaderAndIsr请求,最后到Metadata请求,与第二章的“分区信息”互相呼应,算是画上了一个圆满的句号。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public final class Cluster { // 集群配置
private final List<Node> nodes;
private final Set<String> unauthorizedTopics;
private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
private final Map<String, List<PartitionInfo>> partitionsByTopic;
private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
private final Map<Integer, List<PartitionInfo>> partitionsByNode;
private final Map<Integer, Node> nodesById;
}
public class PartitionInfo { // 分区信息
private final String topic;
private final int partition;
private final Node leader;
private final Node[] replicas;
private final Node[] inSyncReplicas;
}

3. 元数据更新的日志与实例

下面举例了生产者发送两条消息,为了模拟发送第一条消息时,生产者必须要等待元数据更新完成。下面的代码会在第一条消息发送完成后等待一秒钟才发送第二条消息。

1
2
3
4
5
6
7
8
9
10
11
12
// 生产者发送消息的示例
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
log.info("start producer client app");
Thread.sleep(1000*10);

log.info("start send #1 message...");
producer.send(new ProducerRecord<String, String>("test", "m1"));
log.info("sending #1 message end..");
Thread.sleep(1000); // 等待一秒才发送第二条消息
log.info("start send #2 message...");
producer.send(new ProducerRecord<String, String>("test", "m2"));
log.info("sending #2 message end..");

为了更清晰地理解元数据、NetworkClient一些变量的含义,在必要的地方加上了日志(比如needUpdatemetadataTimeout等)。将日志级别调成TRACE后,更详细的日志如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
[18:00:04,596] TRACE Starting the Kafka producer
[18:00:04,939] DEBUG Updated cluster metadata version 1 to Cluster(
nodes = [localhost:9092 (id: -1 rack: null)], partitions = [])
[18:00:05,077] DEBUG Starting Kafka producer I/O thread.
[18:00:05,079] INFO [NetworkClient] select timeout:30000
[18:00:05,094] DEBUG Kafka producer started
[18:00:05,094] INFO start producer client app (kafka.examples.Producer)

[18:00:15,103] INFO start send #1 message... (kafka.examples.Producer)
[18:00:15,109] TRACE Requesting metadata update for topic test. 【1】
[18:00:15,109] TRACE Waking up Sender thread for metadata update.
[18:00:15,111] INFO [Metadata] awaitUpdate begin...
[18:00:15,117] INFO [Sender] readyNodes:0
[18:00:15,117] INFO [Metadata] needUpdate: true
[18:00:15,118] INFO [MetadataUpdater] metadataTimeout: 0
[18:00:15,118] DEBUG Initialize connection to node1 for send metadata request
[18:00:15,118] DEBUG Initiating connection to node1 at localhost:9092. 【2】
[18:00:15,241] INFO [NetworkClient] metadataTimeout:0
[18:00:15,241] INFO [NetworkClient] select timeout:0
[18:00:15,246] DEBUG Completed connection to node -1

[18:00:15,246] INFO [Sender] readyNodes:0
[18:00:15,246] INFO [Metadata] needUpdate: true
[18:00:15,247] INFO [MetadataUpdater] metadataTimeout: 0
[18:00:15,443] DEBUG Sending metadata request {topics=[test]} to node -1 【3】
[18:00:15,444] INFO [NetworkClient] metadataTimeout:0
[18:00:15,444] INFO [NetworkClient] select timeout:0
[18:00:15,448] INFO [Sender] readyNodes:0
[18:00:15,448] INFO [Metadata] needUpdate: true
[18:00:15,449] INFO [NetworkClient] metadataTimeout:2147483647
[18:00:15,449] INFO [NetworkClient] select timeout:30000

[18:00:15,628] DEBUG Updated cluster metadata version 2 to Cluster( 【4】
nodes = [192.168.199.101:9092 (id: 0 rack: null)], partitions = [
Partition(topic=test,partition=1,leader=0,replicas=[0,],isr=[0,],
Partition(topic=test,partition=0,leader=0,replicas=[0,],isr=[0,],
Partition(topi =test,partition=2,leader=0,replicas=[0,],isr=[0,]])
[18:00:15,628] INFO [Metadata] awaitUpdate end...

[18:00:15,628] INFO [Sender] readyNodes:0
[18:00:15,628] INFO [Metadata] needUpdate: false
[18:00:15,629] INFO [NetworkClient] metadataTimeout:299839
[18:00:15,629] INFO [NetworkClient] select timeout:30000

[18:00:15,636] TRACE Sending record ProducerRecord(topic=test, partition=null,
key=null, value=m1, timestamp=null) with callback null to topic test_0 【5】
[18:00:15,636] TRACE Allocating a new 16384 byte message buffer for test_0
[18:00:15,700] TRACE Waking up the sender, test_0 is full or a new batch 【6】
[18:00:15,700] INFO sending #1 message end.. (kafka.examples.Producer)

[18:00:15,702] INFO [accumulator] batch: test-0
[18:00:15,702] INFO [accumulator] ready expired: true
[18:00:15,702] INFO [Metadata] needUpdate: false
[18:00:15,703] DEBUG Initiating connection to node 0 at localhost:9092. 【7】
[18:00:15,704] INFO [Sender] readyNodes:0
[18:00:15,705] INFO [NetworkClient] metadataTimeout:299767
[18:00:15,705] INFO [NetworkClient] select timeout:30000
[18:00:15,706] DEBUG Completed connection to node 0

[18:00:15,706] INFO [accumulator] batch: test-0
[18:00:15,707] INFO [accumulator] ready expired: true
[18:00:15,707] INFO [Metadata] needUpdate: false
[18:00:15,707] INFO [accumulator] drained batch: test-0
[18:00:15,718] TRACE Nodes with data ready to send: [localhost:9092]
[18:00:15,719] TRACE Created 1 produce requests: [ClientRequest( 【8】
expectResponse=true,callback=o.a.k.c.p.internals.Sender$1@6008d3ea,
request=RequestSend(header={.}, body={acks=1,timeout=30000,
topic_data=[{topic=test,data=[{partition=0,
record_set=HeapByteBuffer[pos=0 lim=36 cap=16384]
}]}]}), createdTimeMs=1494151215706, sendTimeMs=0)]
[18:00:15,719] INFO [Sender] readyNodes:1
[18:00:15,720] INFO [NetworkClient] poll timeout:0
[18:00:15,720] INFO [NetworkClient] metadataTimeout:299761
[18:00:15,720] INFO [NetworkClient] select timeout:0
[18:00:15,720] INFO [Sender] readyNodes:0
[18:00:15,721] INFO [Metadata] needUpdate: false
[18:00:15,721] INFO [NetworkClient] metadataTimeout:299747
[18:00:15,721] INFO [NetworkClient] select timeout:30000

[18:00:15,737] TRACE Received produce response from node 0 【9】
[18:00:15,740] TRACE Produced messages to test-0 with base offset offset 11.
[18:00:15,741] INFO [Sender] readyNodes:0
[18:00:15,741] INFO [Metadata] needUpdate: false
[18:00:15,741] INFO [NetworkClient] metadataTimeout:299726
[18:00:15,741] INFO [NetworkClient] select timeout:30000

[18:00:16,705] INFO start send #2 message... (kafka.examples.Producer)
[18:00:16,706] TRACE [KafkaProducer] waitedOnMetadataMs: 0
[18:00:16,706] TRACE Sending record ProducerRecord(topic=test, partition=null,
key=null, value=m2, timestamp=null) with callback null to test_2
[18:00:16,706] TRACE Allocating a new 16384 byte message buffer for test_2
[18:00:16,706] TRACE Waking up the sender, test_2 is full or a new batch
[18:00:16,706] INFO sending #2 message end.. (kafka.examples.Producer)

如图1所示,将上面日志中一些重要的时间点与事件抽取出来,具体步骤如下。

  1. 第一次发送消息,唤醒发送线程,等待元数据更新完成;
  2. 初始化网络连接,为发送元数据请求做准备;
  3. 生产者发送元数据请求;
  4. 收到元数据响应,更新元数据对象,步骤(1)等待元数据更新完成正式结束;
  5. 生产者发送消息的流程接着执行,为消息指定分区,追加消息到记录收集器;
  6. 创建新的批记录(RecordBatch),再次唤醒发送线程;
  7. 从记录收集器中获取准备好的目标代理节点,并初始化网络连接,准备发送生产请求;
  8. 从记录收集器中再次获取准备好的节点,并获取需要发送的数据,创建生产请求;
  9. 发送生产请求,并等待响应结果,一批记录(实际上只有一条记录)的发送流程结束。

7

图1 生产者发送消息与更新元数据的过程

客户端线程模型(Thread Model)

Kafka作为一个流式数据平台,对开发者提供了三种客户端:生产者/消费者、连接器、流处理。本文着重分析这三种客户端的线程模型。

消费者的线程模型

0.8版本以前的消费者客户端会创建一个基于ZK的消费者连接器,一个消费者客户端是一个Java进程,消费者可以订阅多个主题,每个主题也可以多个线程。为了让消息在多个节点被分布式地消费,提高消息处理的吞吐量,Kafka允许多个消费者订阅同一个主题,这些消费者需要满足“一个分区只能被一个消费者中的一个线程处理”的限制条件。通常,我们会将同一份相同业务处理逻辑的应用程序部署在不同机器上,并且指定一个消费组编号。当不同机器上的消费者进程启动后,所有这些消费者进程就组成了一个逻辑意义上的消费组。

消费组中的消费者数量是动态变化的,当有新消费者加入消费组,或者旧消费者离开消费组,都会触发基于ZK的消费组“再平衡”操作。当“再平衡”操作发生时,每个消费者都会在客户端执行分区分配算法,然后从全局的分配结果中获取属于自己的分区。它的缺点是消费者会和ZK产生频繁的交互,造成ZK集群的压力过大,并且容易产生羊群效应和脑裂等问题。

在0.8版本以后,Kafka重新设计了客户端,并且引入了“协调者”和“消费组管理协议”。新的消费者将“消费组管理协议”和“分区分配策略”进行了分离。协调者负责消费组的管理,而分区分配则会在消费组的一个主消费者中完成。采用这种方式,每个消费者都需要发送下面两种请求给协调者。

  • 加入组请求:协调者收集消费组的所有消费者,并选举一个主消费者执行分区分配工作。
  • 同步组请求:主消费者完成分区分配,由协调者将分区的分配结果传播给每个消费者。

新版本的消费者客户端引入了一个客户端协调者的抽象类,它的实现除了消费者的协调者,还有一个连接器的实现。

连接器的线程模型

Kafka连接器的出现标准化了Kafka与各种外部存储系统的数据同步。用户开发和使用连接器就变得非常简单,只需要在配置文件中定义连接器,就可以将外部系统的数据导入Kafka或将Kafka数据导出到外部系统。如图1所示,中间部分都是Kafka连接器的内部组件,包括源连接器(Source Connector)和目标连接器(Sink Connector)。

1

图1 Kafka连接器的源连接器与目标连接器

Kafka连接器的单机模式会在一个进程内启动一个Worker以及所有的连接器和任务。分布式模式的每个进程都有一个Worker,而连接器和任务则分别运行在各个节点上。图2列举了连接器和任务在不同Worker上的四种分布方式:

  1. 一个Worker,一个源任务、一个目标任务
  2. 一个Worker,两个源任务、两个目标任务
  3. 两个Worker,两个源任务、两个目标任务
  4. 三个Worker,两个源任务、两个目标任务

2

图2 分布式模式的Kafka连接器集群

分布式模式下,不同Worker进程之间的协调工作类似于消费者的协调。消费者通过协调者获取分配的分区,Worker也会通过协调者获取分配的连接器与任务。如图3所示,消费者客户端和Worker客户端为了加入到组管理中,分别通过客户端的协调者对象来和服务端的消费组协调者(GroupCoordinator)通信。

8

图3 消费者和Worker的工作都是通过协调者分配的

流处理的线程模型

Kafka流处理的工作流程简单来看分成三个步骤:消费者读取输入分区的数据、流式地处理每条数据、生产者将处理结果写入输出分区,这里面步骤1也充分利用了“消费组管理协议”。Kafka流处理的输入数据源基于具有分布式分区模型的Kafka主题,它的线程模型主要由下面三个类组成:

  • 流实例(KafkaStreams):通常一个节点(一台机器)只运行一个流实例。
  • 流线程(StreamThread):一个流实例可以配置多个流线程。
  • 流任务(StreamTask):一个流线程可以运行多个流任务,根据输入主题的分区数确定任务数。

如图4所示,输入主题有六个分区,Kafka流处理总共就会产生六个流任务。流实例可以动态扩展,流线程的个数也可以动态配置。图中一共有三个流线程,则每个流线程会有两个流任务,每个流任务都对应输入主题的一个分区。

4

图4 Kafka流处理的线程模型

Kafka的流处理框架使用并行的线程模型处理输入主题的数据集,这种设计思路和Kafka的消费者线程模型非常类似。消费者分配到订阅主题的不同分区,流处理框架的流任务也分配到输入主题的不同分区。如图5所示,输入主题1的分区P1和输入主题2的分区P1分配给流线程1的流任务,输入主题1的分区P2和输入主题2的分区P2分配给流线程2的流任务。流处理相比消费者,还会将拓扑的计算结果写到输出主题。

5

图5 消费者模型与流处理的线程模型

消费者和流处理的故障容错机制也是类似的。如图6所示,假设消费者2进程挂掉,它所持有的分区会被分配给同一个消费组中的消费者1,这样消费者1会分配到订阅主题的所有分区。对于流处理而言,如果流线程2挂掉了,流线程2中的流任务会分配给流线程1。即流线程1会运行两个流任务,每个流任务分配的分区仍然保持不变。

6·

图6 消费者与流处理的故障容错机制

小结

Kafka客户端抽象出来的的“组管理协议”充分运用在消费者、连接器、流处理三个使用场景中。客户端中的消费者、连接器中的工作者、流处理中的流进程都可以看做“组”的一个成员。当增加或减少组成员时,在这个协议的约束下,每个组成员都可以获取到最新的任务,从而做到无缝的任务迁移。一旦理解了“组管理协议”,对于理解Kafka的架构设计是很有帮助的。

即席查询(Interactive Query)

EOS事务(Transaction)

参考文档


文章目录
  1. 1. 客户端的元数据对象
    1. 1.1. 1. 客户端轮询与元数据更新器
    2. 1.2. 2. 元数据对象
    3. 1.3. 3. 元数据更新的日志与实例
  2. 2. 客户端线程模型(Thread Model)
    1. 2.1. 消费者的线程模型
    2. 2.2. 连接器的线程模型
    3. 2.3. 流处理的线程模型
    4. 2.4. 小结
  3. 3. 即席查询(Interactive Query)
  4. 4. EOS事务(Transaction)