Apache Kafka JIRA

Apache Kafka JIRA

https://issues.apache.org/jira/browse/KAFKA

2944

https://github.com/apache/kafka/pull/723

最后来分析KafkaBasedLogreadToLogEnd()方法如何读取到日志的最末尾,具体步骤如下。

  1. 定位到分区的最末尾,通过消费者的seekToEnd()只是声明了重置策略为LATEST,并没有真正定位。客户端还需要调用消费者的轮询方法,才能保证发送拉取请求,并更新消费者的当前位置;
  2. 比较消费者的当前位置(endOffset)与上一次还没定位到最末尾时的位置(startOffset),如果前者大于后者,客户端需要调用seek()方法定位到旧的位置(startOffset);
  3. 如果步骤(2)回退到旧的位置,需要调用轮询方法消费消息,直到当前位置是分区的最末尾位置。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class KafkaBasedLog<K, V> { 
private void readToLogEnd() { // 读取到日志的最末尾
// 1. 定位到分区的最末尾(logEndOffset)
Set<TopicPartition> assignment = consumer.assignment();
Map<TopicPartition, Long> offsets = new HashMap<>();
for (TopicPartition tp : assignment) {
long offset = consumer.position(tp); // 获取当前的消费位置
offsets.put(tp, offset); // 暂存起来
consumer.seekToEnd(singleton(tp)); // 定位到最末尾的位置
}
// 2. 回退到开始位置
Map<TopicPartition, Long> endOffsets = new HashMap<>();
try {
poll(0);
} finally {
for (TopicPartition tp : assignment) {
long startOffset = offsets.get(tp); // 旧的消费位置
long endOffset = consumer.position(tp); // 当前的偏移量等于最末尾的位置
if (endOffset > startOffset) {
endOffsets.put(tp, endOffset);
consumer.seek(tp, startOffset);
}
}
}
// 3. 开始读取,直到读取到分区的最末尾位置
while (!endOffsets.isEmpty()) {
poll(Integer.MAX_VALUE);
Iterator it = endOffsets.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<TopicPartition, Long> entry = it.next();
if (consumer.position(entry.getKey()) < entry.getValue()) break;
else it.remove();
}
}
}
}

客户端调用readToLogEnd()之前,如果还有新的消息没有消费,当调用readToLogEnd()方法时,可以保证客户端会完全消费新写入的消息。如图8-31(左图)所示,偏移量从36是新写入的消息(比如一个连接器配置、两个任务配置、一个提交日志的配置,总共四条消息)。客户端为了读取到分区最近的位置,先定位到最近的位置(7)。注意这时不能立即调用轮询方法,因为如果客户端在最近的位置,调用轮询不会有任何的新消息。客户端应该再定位到上次消费的位置(3),然后才能调用轮询方法,直到消费者的当前位置大于等于最近位置时,就说明客户端读取到了日志的最末尾。右图中,假设客户端已经消费到了日志的最末尾,那么调用readToLogEnd()方法会立即返回。

8

图8-31 读取到分区最末尾的位置

注意:上面的readToLogEnd()方法用到了Kafka新消费者的三个方法。(1):postion()方法返回消费者当前的位置,即消费进度,这个值比客户端真正消费过的位置要大1。比如客户端消费了两条消息,postion()方法的返回值就等于3。(2):seekToEnd(tp)方法定位到日志的最末尾,同样,这个值也是实际的偏移量加上1(即nextOffset)。比如分区实际只有六条消息,最末尾的偏移量等于7。(3):seekTo(tp,offset)方法定位到日志的指定位置。客户端定位到指定位置后,下一步一般是要调用轮询方法,并从这个位置拉取消息。所以如果客户端已经消费了偏移量等于12的两条消息,定位的位置是3,表示要拉取第三条的消息。不能定位到2,那样的话,从位置2开始拉取消息,就重复拉取了第二条消息。

2500/2076/KIP-17


文章目录
  1. 1. 2944
  2. 2. 2500/2076/KIP-17