Kafka消费者增量拉取

https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
https://www.cnblogs.com/huxi2b/p/9335064.html

简介

为了减少客户端每次拉取都要拉取全部的分区,增加了增量拉取分区的概念。

拉取会话(Fetch Session),类似于web中的session是有状态的,客户端的fetch也可以认为是有状态的。
这里的状态指的是知道“要拉取哪些分区”,如果第一次拉取了分区1,如果后续分区1没有数据,就不需要拉取分区1了。

FetchSession的数据结构如下:

1
2
3
4
5
6
case class FetchSession(val id: Int, // session编号是随机32位数字,防止未授权的客户端伪造数据
val privileged: Boolean,
val partitionMap: FetchSession.CACHE_MAP,
val creationMs: Long,
var lastUsedMs: Long,
var epoch: Int) // 自增

为了支持增量拉取,FetchSession需要维护每个分区的以下信息:

  • topic,partition Index(来自于TopicParttition)
  • maxBytes,fetchOffset,fetcherLogStartOffset(来自于最近一次的拉取请求)
  • highWatermark,localLogStartOffset(来自Leader的本地日志)

因为Follower或者Consumer发送拉取请求都是到Leader,所以FetchSession也是记录在Leader节点上的

FetchRequest Metadata(客户端的拉取请求元数据)

sessionId epoch 含义
0 -1 全量拉取(没有使用或者创建session时)
0 0 全量拉取(如果是新的会话,epoch从1开始)
$ID 0 关闭标识为$ID的增量拉取会话,并创建一个新的全量拉取
$ID $EPOCH 创建增量拉取

对于客户端而言,什么时候一个分区会被包含到增量的拉取请求中:

  • 客户端通知Broker,分区的maxBytes,fetchOffset,logStartOffset改变了
  • 分区在之前的增量拉取会话中不存在,客户端想要增加这个分区(拉取新的分区)
  • 分区在增量拉取会话中,客户端要移除

Fetch Response Metadata(服务端返回给客户端的sessionId)

sessionId 含义
0 之前没有创建过拉取回话
$ID 下一个请求会是增量的拉取请求,并且sessionId是$ID

服务端增加分区包含到增量的拉取响应中:

  • Broker通知客户端分区的hw或者brokerLogStartOffset变化了
  • 分区有新的数据

源码解析

Fetcher.java#sendFetches(): prepareFetchRequests创建FetchSessionHandler.FetchRequestData。
构建拉取请求通过FetchSessionHandler.Builder,builder.add(partition, PartitionData)会添加next:
即要拉取的分区。构建时调用Builder.build(),针对Full拉取:

1
2
3
4
5
6
7
// FetchSessionHandler.Builder.build()
if (nextMetadata.isFull()) { // epoch=0或者-1
sessionPartitions = next; // next为之前调动add添加的分区
next = null; // 本地full拉取,下次next=null
Map<TopicPartition, PartitionData> toSend = Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions));
return new FetchRequestData(toSend, Collections.emptyList(), toSend, nextMetadata);
}

收到响应结果后,通过sessionHandler,调用FetchSessionHandler.handleResponse()。
假设第一次是Full拉取,响应结果没有出错时,nextMetadata.isFull()仍然为true。
假设服务端创建了一个新的session(随机的唯一ID),客户端的Fetch SessionId会设置为服务端返回的sessionId,
并且epoch会增加1。这样下次客户端的拉取就不再是Full,而是Increment了(toSend, toForget分别表示要拉取的和不需要拉取的)。
同样假设服务端正常处理(这次不会生成新的session),客户端也正常处理响应,则sessionId不会增加,但是epoch会增加1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public boolean handleResponse(FetchResponse<?> response) {
if (response.error() != Errors.NONE) {
log.info("Node {} was unable to process the fetch request with {}: {}.",
node, nextMetadata, response.error());
if (response.error() == Errors.FETCH_SESSION_ID_NOT_FOUND) {
nextMetadata = FetchMetadata.INITIAL;
} else {
nextMetadata = nextMetadata.nextCloseExisting();
}
return false;
} else if (nextMetadata.isFull()) {
String problem = verifyFullFetchResponsePartitions(response);
if (problem != null) {
log.info("Node {} sent an invalid full fetch response with {}", node, problem);
nextMetadata = FetchMetadata.INITIAL;
return false;
} else if (response.sessionId() == INVALID_SESSION_ID) {
log.debug("Node {} sent a full fetch response{}",
node, responseDataToLogString(response));
nextMetadata = FetchMetadata.INITIAL;
return true;
} else {
// The server created a new incremental fetch session. 客户端正常处理全量拉取的响应
log.debug("Node {} sent a full fetch response that created a new incremental " +
"fetch session {}{}", node, response.sessionId(), responseDataToLogString(response));
nextMetadata = FetchMetadata.newIncremental(response.sessionId());
return true;
}
} else {
String problem = verifyIncrementalFetchResponsePartitions(response);
if (problem != null) {
log.info("Node {} sent an invalid incremental fetch response with {}", node, problem);
nextMetadata = nextMetadata.nextCloseExisting();
return false;
} else if (response.sessionId() == INVALID_SESSION_ID) {
// The incremental fetch session was closed by the server.
log.debug("Node {} sent an incremental fetch response closing session {}{}",
node, nextMetadata.sessionId(), responseDataToLogString(response));
nextMetadata = FetchMetadata.INITIAL;
return true;
} else {
// The incremental fetch session was continued by the server. 客户端正常处理增量拉取的响应结果
log.debug("Node {} sent an incremental fetch response for session {}{}",
node, response.sessionId(), responseDataToLogString(response));
nextMetadata = nextMetadata.nextIncremental();
return true;
}
}
}

服务端处理拉取请求时,会创建不同类型的FetchContext:

  • SessionErrorContext:拉取会话错误(比如epoch不相等)
  • SessionlessFetchContext:不需要拉取会话(旧版本)
  • IncrementalFetchContext:增量拉取
  • FullFetchContext:全量拉取
1
2
3
4
5
6
7
8
9
// KafkaApis.handleFetchRequest
val fetchContext = fetchManager.newContext(
fetchRequest.metadata,
fetchRequest.fetchData,
fetchRequest.toForget,
fetchRequest.isFromFollower)

// 针对不同的拉取上下文,分别更新并生成响应数据
unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions)

服务端的FetchManager创建Context时,如果FetchMetadata.isFull,再判断epoch=-1时,类型为SessionlessFetchContext,
否则(epoch=0)时,类型为FullFetchContext。如果!isFull(),必须保证session.epoch = FetchMetadata.epoch,否则类型为SessionErrorContext。
当!isFull且epoch相等时,先增加session.epoch(服务端的epoch,即为客户端下次拉取的epoch),然后返回类型为IncrementalFetchContext。

FullFetchContext更新响应数据,对于全量拉取,一般是新会话,所以需要更新缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse[Records] = {
def createNewSession: FetchSession.CACHE_MAP = {
val cachedPartitions = new FetchSession.CACHE_MAP(updates.size)
updates.entrySet.asScala.foreach(entry => {
val part = entry.getKey
val respData = entry.getValue
val reqData = fetchData.get(part)
cachedPartitions.mustAdd(new CachedPartition(part, reqData, respData))
})
cachedPartitions
}
val responseSessionId = cache.maybeCreateSession(time.milliseconds(), isFromFollower,
updates.size, () => createNewSession)
debug(s"Full fetch context with session id $responseSessionId returning " +
s"${partitionsToLogString(updates.keySet)}")
new FetchResponse(Errors.NONE, updates, 0, responseSessionId)
}

def maybeCreateSession(now: Long,
privileged: Boolean,
size: Int,
createPartitions: () => FetchSession.CACHE_MAP): Int =
synchronized {
// If there is room, create a new session entry.
if ((sessions.size < maxEntries) ||
tryEvict(privileged, EvictableKey(privileged, size, 0), now)) {
val partitionMap = createPartitions()
// 这里创建一个新的session时,同时也会增加epoch,从0到1
val session = new FetchSession(newSessionId(), privileged, partitionMap,
now, now, JFetchMetadata.nextEpoch(INITIAL_EPOCH))
debug(s"Created fetch session ${session.toString}")
sessions.put(session.id, session)
touch(session, now)
session.id
} else {
debug(s"No fetch session created for privileged=$privileged, size=$size.")
INVALID_SESSION_ID
}
}

总结下客户端和服务端的Full拉取过程:

1.客户端创建的拉取请求FetchMetadata.isFull(),初始时epoch=0
2.服务端创建的FetchContext类型为FullFetchContext
3.服务端创建新的Session(xxx),以及初始化epoch=1(0+1=1),并缓存
4.客户端收到服务端的FetchResponse,设置FetchMetadata.sessionId为response中的sessionId(xxx),并增加epoch=1(从步骤1的0+1=1)
5.客户端继续拉取,isFull=false,sessionId=xxx, epoch=1
6.服务端创建的FetchContext类型为IncrementalFetchContext(满足session.epoch=reqMetadata.epoch=1, isFull=false)
7.服务端增加epoch,设置session.epoch=2,为下次的拉取(对比epoch)做准备
8.对reqMetadata.epoch加1(=2)然后对比session.epoch(2),如果不等,返回错误码INVALID_FETCH_SESSION_EPOCH,相等返回NONE
9.客户端收到服务端的FetchResponse,设置epoch增加1(sessionId没有变化时,不需要更新sessionId,实际上设置的是nextMetadata对象)


文章目录
  1. 1. 简介
  2. 2. 源码解析