Kafka Source Code Analysis: Consumer (3) Offset

Kafka Consumer offset

Overview

In the previous post we analyzed the Consumer's Fetcher flow for pulling data. Before any fetch, however, the fetchOffset has to be obtained first, so that the Fetcher knows at which offset of the Partition this round of fetching should start. The consumer also commits offsets periodically (commitOffsets).
This section covers the following consumer topics (for each client-side step we immediately follow up with the corresponding server-side handling):

  • Create the offsetsChannel to the OffsetManager (GroupCoordinator) node; the request sent is GroupCoordinatorRequest
  • The consumer's commitOffsets uses the offsetsChannel from the previous step to send an OffsetCommitRequest and persist the offsets
  • When fetching data, the consumer's fetchOffsets also uses the offsetsChannel, sending an OffsetFetchRequest to retrieve the offsets

[Figure: k_consumer_offsets]

The offsetsChannel connects to the OffsetManager (coordinator) node, which is responsible for managing the consumers in a consumer group. Consumers talk to this coordinator node, and the protocol between them includes fetchOffset and commitOffset.

Both the offset fetch and the offset commit talk to the GroupCoordinator node. Why is a GroupCoordinator needed at all? The GroupCoordinator is really the Consumer Group Coordinator: each coordinator node manages a subset of the consumer groups in the cluster.

OffsetManager

When the ZookeeperConsumerConnector is created, it makes sure the channel managed by the OffsetManager (the offsetsChannel) gets established.

Offsets can be stored in ZooKeeper, or in Kafka itself as ordinary messages of a topic. If they are stored in a Kafka topic, the offsetsChannel is required; this case is similar to regular topic messages, so a connection to a Broker has to be established. If they are stored in ZooKeeper, no socket/TCP connection is needed at all: the data can be read directly through ZkUtils.
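
As a rough illustration of the two layouts (a minimal sketch; the real serialization lives in ZKGroupTopicDirs and GroupMetadataManager, and the helper names below are invented for this example): ZooKeeper keeps one znode per group/topic/partition whose value is the offset, while Kafka-backed storage appends a message to the internal offsets topic keyed by (group, topic, partition).

object OffsetStorageSketch {
  // Sketch only: how an offset is addressed under the two storage options.
  case class GroupTopicPartition(group: String, topic: String, partition: Int)

  // offsets.storage = "zookeeper": one znode per group/topic/partition, node value = offset
  def zkOffsetPath(k: GroupTopicPartition): String =
    s"/consumers/${k.group}/offsets/${k.topic}/${k.partition}"

  // offsets.storage = "kafka": a message appended to the internal offsets topic,
  // keyed by (group, topic, partition); the value carries the offset plus metadata.
  // (The real key/value are binary schemas; a plain string stands in for them here.)
  def offsetsTopicKey(k: GroupTopicPartition): String =
    s"${k.group}:${k.topic}:${k.partition}"

  // zkOffsetPath(GroupTopicPartition("my-group", "page_views", 0))
  //   == "/consumers/my-group/offsets/page_views/0"
}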

[Figure: k_offsetStorage]

// Blocks until the offset manager is located and a channel is established to it.
private def ensureOffsetManagerConnected() {
  if (config.offsetsStorage == "kafka") {
    if (offsetsChannel == null || !offsetsChannel.isConnected)
      offsetsChannel = ClientUtils.channelToOffsetManager(config.groupId, zkUtils, ..)
  }
}

channelToOffsetManager(consumer)

A producer sends messages to the Partition's leader Broker, and the consumer's fetcher likewise locates the leader Broker through the LeaderFinderThread. The offset lookup here, however, is sent to an arbitrary Broker (why is that allowed?), even though what we ultimately want is the coordinator, i.e. the node hosting the OffsetManager.

def channelToAnyBroker(zkUtils: ZkUtils, socketTimeoutMs: Int = 3000): BlockingChannel = {
  var channel: BlockingChannel = null
  var connected = false
  while (!connected) {
    val allBrokers = zkUtils.getAllBrokerEndPointsForChannel(SecurityProtocol.PLAINTEXT)
    Random.shuffle(allBrokers).find { broker =>
      channel = new BlockingChannel(broker.host, broker.port, ..)
      channel.connect()
      true
    }
    connected = if (channel == null) false else true
  }
  channel
}

The queryChannel above is only a temporary connection. Sending a GroupCoordinatorRequest over the queryChannel returns the actual location of the OffsetManager (coordinator) node; the offsetsChannel should then point to the node carried in the GroupCoordinatorResponse.

[Figure: k_channelToOffsetManager]

Note that the request carries no information about the current consumer: offsets are kept at the consumer-group level, rather than each consumer maintaining its own. The reason is that the offset a Broker records for a Partition belongs to the consumer group, not to an individual consumer. Because the offset is recorded against the group, the Partition's offset stays correct even if one consumer dies, and the rebalancer can hand the Partition to another consumer in the same group.

def channelToOffsetManager(group: String, zkUtils: ZkUtils, socketTimeoutMs: Int = 3000, retryBackOffMs: Int = 1000) = {
  var queryChannel = channelToAnyBroker(zkUtils)
  var offsetManagerChannelOpt: Option[BlockingChannel] = None
  while (!offsetManagerChannelOpt.isDefined) {
    var coordinatorOpt: Option[BrokerEndPoint] = None
    while (!coordinatorOpt.isDefined) {
      // If the previous step did not leave a usable queryChannel, pick another random Broker
      if (!queryChannel.isConnected) queryChannel = channelToAnyBroker(zkUtils)
      // Send a coordinator lookup for this consumer group; each group has exactly one coordinator
      queryChannel.send(GroupCoordinatorRequest(group))
      // The response is the consumer group's metadata and should tell us who the coordinator is
      val response = queryChannel.receive()
      val consumerMetadataResponse = GroupCoordinatorResponse.readFrom(response.payload())
      if (consumerMetadataResponse.errorCode == Errors.NONE.code) coordinatorOpt = consumerMetadataResponse.coordinatorOpt
      else Thread.sleep(retryBackOffMs) // on error, keep retrying until a coordinator is found
    }

    val coordinator = coordinatorOpt.get
    // If the coordinator happens to be the broker behind queryChannel, reuse that randomly chosen channel,
    // since the coordinator is exactly the node we want to stay connected to
    if (coordinator.host == queryChannel.host && coordinator.port == queryChannel.port) {
      offsetManagerChannelOpt = Some(queryChannel)
    } else {
      // Otherwise open a new connection to the coordinator and close the temporary random channel
      var offsetManagerChannel = new BlockingChannel(coordinator.host, coordinator.port, ..)
      offsetManagerChannel.connect()
      offsetManagerChannelOpt = Some(offsetManagerChannel)
      queryChannel.disconnect()
    }
  }
  offsetManagerChannelOpt.get
}

The GroupCoordinatorRequest is sent over the socket to the KafkaServer and handled in KafkaApis: the coordinator node's information is wrapped into the response and returned to the client, which then connects to that coordinator.

Like ReplicaManager, there is exactly one GroupCoordinator per KafkaServer, and both objects are passed into KafkaApis.

handleGroupCoordinatorRequest(server)

Why can the GroupCoordinator serve both as the Coordinator and as the OffsetManager? It coordinates the consumers within a group (the coordinator role), and because offsets are tracked at the consumer-group level it is also the natural manager of those offsets. Each KafkaServer has only one GroupCoordinator, but that single instance manages multiple consumer groups, since several groups can be assigned to one Broker.

Each GroupCoordinatorRequest asks for the coordinator of one consumer group. Even if clients send the lookup to different (randomly chosen) brokers, partitionFor returns the same partition for the same group, and the topicMetadata is identical everywhere, so the partition returned by partitionFor can be looked up in topicMetadata.partitionsMetadata to find that Partition's leader replica.
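
The mapping is essentially a hash of the group id onto one partition of the internal offsets topic (a minimal sketch of the idea behind GroupMetadataManager.partitionFor; the signature and the handling of Int.MinValue are simplified here). Whichever broker leads that partition is the group's coordinator.

object PartitionForSketch {
  // Sketch: deterministic group -> offsets-topic-partition mapping.
  // Every broker computes the same value, so the lookup can start at any broker.
  def partitionFor(groupId: String, offsetsTopicPartitionCount: Int): Int =
    math.abs(groupId.hashCode) % offsetsTopicPartitionCount

  // e.g. with the default of 50 partitions for __consumer_offsets,
  // partitionFor("my-group", 50) is the same everywhere; the leader of that
  // partition is the coordinator (and offset manager) for "my-group".
}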

[Figure: k_handleGroupCoordinatorRequest]

def handleGroupCoordinatorRequest(request: RequestChannel.Request) {
  val groupCoordinatorRequest = request.body.asInstanceOf[GroupCoordinatorRequest]
  val responseHeader = new ResponseHeader(request.header.correlationId)
  // coordinator here is the GroupCoordinator, which covers both offsets and GroupMetadata
  val partition = coordinator.partitionFor(groupCoordinatorRequest.groupId)

  // get metadata (and create the topic if necessary); it comes from metadataCache -- so where is the cache populated?
  val offsetsTopicMetadata = getTopicMetadata(Set(GroupCoordinator.GroupMetadataTopicName), request.securityProtocol).head
  // a topicMetadata consists of multiple partitionMetadata entries
  val coordinatorEndpoint = offsetsTopicMetadata.partitionsMetadata.find(_.partitionId == partition).flatMap {
    // the partition's leader is exactly the coordinator node we are looking for
    partitionMetadata => partitionMetadata.leader
  }
  val responseBody = coordinatorEndpoint match {
    case Some(endpoint) => new GroupCoordinatorResponse(Errors.NONE.code, new Node(endpoint.id, endpoint.host, endpoint.port))
  }
  requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
}

The TopicMetadata comes from the shared metadataCache map. Every Broker in the cluster maintains the same cache, which is why the earlier lookup request could be sent to any Broker!

A cache for the state (e.g., current leader) of each partition. This cache is updated through
UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously.

MetadataCache

The MetadataCache stores topic -> partition -> PartitionStateInfo, and its getter converts the entries of a topic into the corresponding PartitionsMetadata. PartitionStateInfo and PartitionMetadata are therefore closely related: both carry the AR, the leader, the ISR, and so on.

private[server] class MetadataCache(brokerId: Int) extends Logging {
  private val cache = new mutable.HashMap[String, mutable.Map[Int, PartitionStateInfo]]()

  def getTopicMetadata(topics: Set[String], protocol: SecurityProtocol) = {
    val topicResponses: mutable.ListBuffer[TopicMetadata] = new mutable.ListBuffer[TopicMetadata]
    for (topic <- topics) {
      val partitionStateInfos = cache(topic)
      // convert every PartitionStateInfo into a PartitionMetadata, then wrap them all in a TopicMetadata
      val partitionMetadata = partitionStateInfos.map {
        case (partitionId, partitionState) =>
          val replicaInfo: Seq[BrokerEndPoint] = partitionState.allReplicas.map(aliveBrokers.getOrElse(_, null))
            .filter(_ != null).toSeq.map(_.getBrokerEndPoint(protocol))
          val leader = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr.leader
          val isr = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr.isr
          var leaderBrokerInfo: Option[Broker] = aliveBrokers.get(leader)
          var leaderInfo: Option[BrokerEndPoint] = Some(leaderBrokerInfo.get.getBrokerEndPoint(protocol))
          var isrInfo: Seq[BrokerEndPoint] = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).map(_.getBrokerEndPoint(protocol))
          new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo, Errors.NONE.code)
      }
      topicResponses += new TopicMetadata(topic, partitionMetadata.toSeq)
    }
    topicResponses
  }

What we care about more is when updateCache is invoked: ControllerChannelManager.sendRequestsToBrokers sends UPDATE_METADATA_KEY, and on the broker the call chain is KafkaApis.handleUpdateMetadataRequest -> replicaManager.maybeUpdateMetadataCache -> metadataCache.updateCache.

  def addOrUpdatePartitionInfo(topic: String, partitionId: Int, stateInfo: PartitionStateInfo) {
    cache.get(topic) match {
      case Some(infos) => infos.put(partitionId, stateInfo)
      case None => {
        val newInfos: mutable.Map[Int, PartitionStateInfo] = new mutable.HashMap[Int, PartitionStateInfo]
        cache.put(topic, newInfos)
        newInfos.put(partitionId, stateInfo)
      }
    }
  }

  def updateCache(updateMetadataRequest: UpdateMetadataRequest, brokerId: Int, stateChangeLogger: StateChangeLogger) {
    aliveBrokers = updateMetadataRequest.aliveBrokers.map(b => (b.id, b)).toMap
    updateMetadataRequest.partitionStateInfos.foreach { case (tp, info) =>
      if (info.leaderIsrAndControllerEpoch.leaderAndIsr.leader == LeaderAndIsr.LeaderDuringDelete) {
        removePartitionInfo(tp.topic, tp.partition)
      } else addOrUpdatePartitionInfo(tp.topic, tp.partition, info)
    }
  }
}

Although MetadataCache takes a brokerId class parameter and updateCache also receives a brokerId, neither is actually used. Here the MetadataCache is only needed to find the leader (from the metadata) of the partition assigned to the group, i.e. the coordinator node.

The table below summarizes the produce, fetch, coordinator-lookup, and metadata-update requests and responses, together with the data they carry:

request                 | data                                 | response                  | result                              | meaning
ProduceRequest          | partitionRecords                     | ProduceResponse           | PartitionResponse.baseOffset        | the producer supplies messages and gets back their offset in the Log
FetchRequest            | PartitionFetchInfo(offset,fetchSize) | FetchResponse             | FetchResponsePartitionData.messages | a fetch supplies a start offset and fetch size, and returns a message set
GroupCoordinatorRequest | groupId                              | GroupCoordinatorResponse  | coordinatorOpt                      | supply a consumer group id, get back the group's coordinator node
UpdateMetadataRequest   | PartitionStateInfos(leader,isr,ar)   | UpdateMetadataResponse    | -                                   | supplies Partition metadata, which is saved into the MetadataCache

commitOffsets(consumer)

The last step of initializing the ZookeeperConsumerConnector is autoCommit, the automatic committing of offsets. Which partitions get committed comes from topicRegistry, i.e. the partitions assigned to this consumer during the rebalance. To commit, the consumer uses the offsetsChannel to the coordinator node and sends a TopicAndPartition -> offset mapping to Kafka.
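
A minimal sketch of the auto-commit loop, assuming a standard scheduled executor (the real connector uses its own KafkaScheduler driven by auto.commit.interval.ms; the function below is illustrative, not the actual implementation):

import java.util.concurrent.{Executors, TimeUnit}

object AutoCommitSketch {
  // Sketch: fire commitOffsets(isAutoCommit = true) at a fixed interval,
  // the way the connector's auto-commit task does.
  def startAutoCommit(intervalMs: Long)(commitOffsets: Boolean => Unit): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(
      new Runnable { def run(): Unit = commitOffsets(true) },
      intervalMs, intervalMs, TimeUnit.MILLISECONDS)
  }
}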

def commitOffsets(isAutoCommit: Boolean) {
  val offsetsToCommit = immutable.Map(topicRegistry.flatMap {
    case (topic, partitionTopicInfos) => partitionTopicInfos.map {
      case (partition, info) => TopicAndPartition(info.topic, info.partitionId) -> OffsetAndMetadata(info.getConsumeOffset())
    }
  }.toSeq: _*)
  commitOffsets(offsetsToCommit, isAutoCommit)
}

def commitOffsets(offsetsToCommit: immutable.Map[TopicAndPartition, OffsetAndMetadata], isAutoCommit: Boolean) {
  if (config.offsetsStorage == "zookeeper") {
    offsetsToCommit.foreach {
      case (topicAndPartition, offsetAndMetadata) => commitOffsetToZooKeeper(topicAndPartition, offsetAndMetadata.offset)
    }
  } else {
    val offsetCommitRequest = OffsetCommitRequest(config.groupId, offsetsToCommit, clientId = config.clientId)
    ensureOffsetManagerConnected()
    offsetsChannel.send(offsetCommitRequest)
    val offsetCommitResponse = OffsetCommitResponse.readFrom(offsetsChannel.receive().payload())
    offsetCommitResponse.commitStatus.foreach { case (topicPartition, errorCode) =>
      // (simplified) on success, and when dual-commit is enabled, the offset is also written to ZooKeeper
      val offset = offsetsToCommit(topicPartition).offset
      commitOffsetToZooKeeper(topicPartition, offset)
    }
  }
}

If the offsets are stored in ZooKeeper, the offset is written to the node for the group/topic/partition; the node's value is the offset, i.e. the current offset of that partition.

def commitOffsetToZooKeeper(topicPartition: TopicAndPartition, offset: Long) {
  // If checkpointedZkOffsets already holds the same offset, there is nothing to write
  if (checkpointedZkOffsets.get(topicPartition) != offset) {
    val topicDirs = new ZKGroupTopicDirs(config.groupId, topicPartition.topic)
    zkUtils.updatePersistentPath(topicDirs.consumerOffsetDir + "/" + topicPartition.partition, offset.toString)
    // checkpointedZkOffsets is populated in addPartitionTopicInfo; if the value has changed by commit time, the new value wins
    checkpointedZkOffsets.put(topicPartition, offset)
  }
}

If the storage is Kafka, an OffsetCommitRequest is sent to the Kafka server for processing.

handleOffsetCommitRequest(server)

KafkaApis.handleOffsetCommitRequest hands the work to the GroupCoordinator, which prepares the offsets and then stores them. Note that from start to finish the most important part of the OffsetCommitRequest is the OffsetAndMetadata, i.e. the offset value itself.

[Figure: k_handleOffsetCommitRequest]

def handleCommitOffsets(groupId: String, memberId: String, generationId: Int,
                        offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata],
                        responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) {
  var delayedOffsetStore: Option[DelayedStore] = Some(
    groupManager.prepareStoreOffsets(groupId, memberId, generationId, offsetMetadata, responseCallback))
  delayedOffsetStore.foreach(groupManager.store)
}

Unlike fetchOffsets, commitOffsets may be a delayed operation (DelayedStore). Offsets are added to the offsetsCache only after they have been committed successfully, which is why GroupMetadataManager.putOffset runs inside putCacheCallback: that callback is invoked only once the append has succeeded.

def prepareStoreOffsets(groupId: String, consumerId: String, generationId: Int,
                        offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata],
                        responseCallback: immutable.Map[TopicAndPartition, Short] => Unit): DelayedStore = {
  // construct the message set to append
  val messages = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) => new Message(
    key = offsetCommitKey(groupId, topicAndPartition.topic, topicAndPartition.partition), // key: group-topic-partition
    bytes = offsetCommitValue(offsetAndMetadata)                                          // value: offset
  )}.toSeq
  val offsetTopicPartition = new TopicPartition(GroupCoordinator.GroupMetadataTopicName, partitionFor(groupId)) // topic-partition
  val offsetsAndMetadataMessageSet = Map(offsetTopicPartition -> new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))

  // set the callback function to insert offsets into cache after log append completed
  def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
    // construct the commit response status and insert the offset and metadata to cache if the append status has no error
    val status = responseStatus(offsetTopicPartition)
    offsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) =>
      putOffset(GroupTopicPartition(groupId, topicAndPartition), offsetAndMetadata)
    }
    // compute the final error codes for the commit response
    val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) => (topicAndPartition, Errors.NONE.code) }
    responseCallback(commitStatus) // finally trigger the callback logic passed from the API layer
  }
  DelayedStore(offsetsAndMetadataMessageSet, putCacheCallback)
}

// Store offsets by appending them to the replicated log and then inserting into the cache
def store(delayedAppend: DelayedStore) {
  replicaManager.appendMessages(config.offsetCommitTimeoutMs.toLong, config.offsetCommitRequiredAcks, true,
    delayedAppend.messageSet, delayedAppend.callback) // pass the message set to persist, along with the callback
}

DelayedStore is similar to DelayedProduce and DelayedFetch, except that it is not handed over to a Purgatory for management. What they have in common is that each carries a responseCallback as a higher-order function, invoked only after the operation completes.

[Figure: k_storeOffsets]

This code resembles handleProducerRequest. Normally the responseCallback would be passed straight to ReplicaManager.appendMessages, but because a cache update is added here, the cache is written first (putOffset) and a new callback wrapping that step is handed to appendMessages (a sketch of this callback nesting follows the table below).

  • ② prepareStoreOffsets builds the offsetsAndMetadataMessageSet
  • ④ replicaManager.appendMessages(messageSet)
  • ⑤ the offsets are appended to the log
  • ⑥ putCacheCallback is invoked, ⑦ which first calls putOffset to add the offsets to the offsetsCache
  • ⑧ finally responseCallback is invoked

handle                                   | outer-callback       | inner-callback
KafkaApis.handleProducerRequest          | sendResponseCallback | produceResponseCallback
KafkaApis.handleOffsetCommitRequest      | sendResponseCallback | -
GroupMetadataManager.prepareStoreOffsets | putCacheCallback     | responseCallback
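
Below is a small, self-contained sketch of that callback nesting (the names mirror the flow above, but the types are simplified placeholders rather than Kafka's):

object CallbackNestingSketch {
  type CommitStatus = Map[String, Short]

  // The API layer passes responseCallback in; the manager wraps it in putCacheCallback,
  // and only the wrapped callback is handed to the append path (step 4 above).
  def prepareStore(offsets: Map[String, Long],
                   putOffset: (String, Long) => Unit,
                   responseCallback: CommitStatus => Unit): () => Unit = {
    def putCacheCallback(): Unit = {
      offsets.foreach { case (tp, off) => putOffset(tp, off) }          // 7. update the offsets cache
      responseCallback(offsets.map { case (tp, _) => tp -> 0.toShort }) // 8. answer the API layer (0 = no error)
    }
    () => putCacheCallback()  // returned to the caller as the DelayedStore's callback
  }

  def main(args: Array[String]): Unit = {
    val cache = scala.collection.mutable.Map[String, Long]()
    val callback = prepareStore(Map("t-0" -> 42L), cache.update, status => println(s"commit status: $status"))
    callback()      // 5/6. pretend the log append succeeded, then fire the callback
    println(cache)  // Map(t-0 -> 42)
  }
}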

fetchOffsets(consumer)

How does the offset tie into the fetch path? During the rebalance performed by the ZKRebalancerListener, the offsets of the assigned partitions (the PartitionAssignment) are fetched and used to build the PartitionTopicInfo entries that make up the topicRegistry (which commit also relies on).

private def rebalance(cluster: Cluster): Boolean = {
  val offsetFetchResponseOpt = fetchOffsets(topicPartitions)
  // ...
}

The offset represents the latest position the Consumer has already fetched up to in this Partition. Given the offset, the Consumer knows where in the Partition the next FetchRequest should start reading. The actual fetching is carried out by the FetcherThread (the offset is prepared for the fetcher).

[Figure: k_offset_fetcher]

The flow of sending a request over a channel and obtaining the response is always the same: write the request to the channel, then receive the data from it. Because the result is needed right after the request is sent, the channel is a blocking one: if the response has not arrived yet, the caller blocks and waits.
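
As a generic illustration of this blocking request/response-with-retry pattern (the SyncChannel trait below is a stand-in defined just for this sketch, not Kafka's BlockingChannel API):

object BlockingRequestSketch {
  // Stand-in for a synchronous channel: send, then block on receive.
  trait SyncChannel[Req, Resp] {
    def send(request: Req): Unit
    def receive(): Resp        // blocks until the server answers
    def reconnect(): Unit
  }

  // Keep trying until a response arrives, backing off on connection problems.
  def sendAndAwait[Req, Resp](channel: SyncChannel[Req, Resp], request: Req, backoffMs: Long): Resp = {
    var response: Option[Resp] = None
    while (response.isEmpty) {
      try {
        channel.send(request)
        response = Some(channel.receive())  // blocks here if the result has not returned yet
      } catch {
        case _: java.io.IOException =>      // broken connection: back off, reconnect, retry
          Thread.sleep(backoffMs)
          channel.reconnect()
      }
    }
    response.get
  }
}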

[Figure: k_req_resp_chann_server]

private def fetchOffsets(partitions: Seq[TopicAndPartition]) = {
  if (config.offsetsStorage == "zookeeper") {
    val offsets = partitions.map(fetchOffsetFromZooKeeper)
    Some(OffsetFetchResponse(immutable.Map(offsets:_*)))
  } else {
    val offsetFetchRequest = OffsetFetchRequest(groupId = config.groupId, requestInfo = partitions, clientId = config.clientId)
    var offsetFetchResponseOpt: Option[OffsetFetchResponse] = None
    while (!isShuttingDown.get && !offsetFetchResponseOpt.isDefined) {
      offsetFetchResponseOpt = offsetsChannelLock synchronized {
        ensureOffsetManagerConnected()
        offsetsChannel.send(offsetFetchRequest)
        val offsetFetchResponse = OffsetFetchResponse.readFrom(offsetsChannel.receive().payload())
        Some(offsetFetchResponse)
      }
      if (offsetFetchResponseOpt.isEmpty) Thread.sleep(config.offsetsChannelBackoffMs)
    }
    offsetFetchResponseOpt
  }
}

If the offsets are stored in ZooKeeper, the group/topic/partition node value is read; otherwise an OffsetFetchRequest is sent over the offsetsChannel to obtain the offsets from the response.

private def fetchOffsetFromZooKeeper(topicPartition: TopicAndPartition) = {
  val dirs = new ZKGroupTopicDirs(config.groupId, topicPartition.topic)
  // read the offset stored at /consumers/[group_id]/offsets/[topic]/[partition_id]
  val offsetString = zkUtils.readDataMaybeNull(dirs.consumerOffsetDir + "/" + topicPartition.partition)._1
  offsetString match {
    case Some(offsetStr) => (topicPartition, OffsetMetadataAndError(offsetStr.toLong))
    case None => (topicPartition, OffsetMetadataAndError.NoOffset)
  }
}

handleOffsetFetchRequest(server)

The ConsumerConnector sends the OffsetFetchRequest over the offsetsChannel, and the offsetsChannel is connected to the consumer group's coordinator node.

def handleOffsetFetchRequest(request: RequestChannel.Request) {
  val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest]
  val (authorizedTopicPartitions, unauthorizedTopicPartitions) = offsetFetchRequest.requestInfo.partition { topicAndPartition =>
    authorize(request.session, Describe, new Resource(Topic, topicAndPartition.topic))
  }
  val authorizationError = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.TOPIC_AUTHORIZATION_FAILED.code)
  val unauthorizedStatus = unauthorizedTopicPartitions.map(topicAndPartition => (topicAndPartition, authorizationError)).toMap
  val offsets = coordinator.handleFetchOffsets(offsetFetchRequest.groupId, authorizedTopicPartitions).toMap
  val response = OffsetFetchResponse(offsets ++ unauthorizedStatus, offsetFetchRequest.correlationId)
  requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response)))
}

Given group/topic/partition keys, the GroupCoordinator returns the offset of each topic-partition. GroupCoordinator.handleFetchOffsets delegates to GroupMetadataManager.getOffsets, which looks up the corresponding OffsetAndMetadata in the offsetsCache. The offsetsCache is populated during commitOffsets: commits put entries into the cache, fetches read them back.
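
A minimal sketch of that cache contract, assuming a plain concurrent map keyed by (group, topic, partition) (the real offsetsCache in GroupMetadataManager uses Kafka's Pool and richer value/error types):

import java.util.concurrent.ConcurrentHashMap

object OffsetsCacheSketch {
  case class GroupTopicPartition(group: String, topic: String, partition: Int)
  case class OffsetAndMetadata(offset: Long, metadata: String = "")

  private val offsetsCache = new ConcurrentHashMap[GroupTopicPartition, OffsetAndMetadata]()

  // commit path: called only after the offset message has been appended to the log
  def putOffset(key: GroupTopicPartition, value: OffsetAndMetadata): Unit = {
    offsetsCache.put(key, value)
    ()
  }

  // fetch path: None plays the role of OffsetMetadataAndError.NoOffset
  def getOffset(key: GroupTopicPartition): Option[OffsetAndMetadata] =
    Option(offsetsCache.get(key))
}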

// Fetch the current offset for the given group/topic/partition from the underlying offsets storage.
private def getOffset(key: GroupTopicPartition) = {
  val offsetAndMetadata = offsetsCache.get(key)
  OffsetMetadataAndError(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE.code)
}

The cache data structures used by fetchOffsets and commitOffsets:

metadata          | data                          | cache
TopicMetadata     | topic, partitionsMetadata     | MetadataCache
PartitionMetadata | partition_id, isr, leader, ar | -
OffsetMetadata    | offset                        | GroupMetadataManager.offsetsCache
GroupMetadata     | group_id, members, state      | GroupMetadataManager.groupsCache

Summary

The table below lists the three methods this section focused on, together with the corresponding handling steps on the KafkaServer side:

ConsumerConnector      | Request                 | KafkaApis                     | GroupCoordinator    | GroupMetadataManager
channelToOffsetManager | GroupCoordinatorRequest | handleGroupCoordinatorRequest | -                   | -
commitOffsets          | OffsetCommitRequest     | handleOffsetCommitRequest     | handleCommitOffsets | store
fetchOffsets           | OffsetFetchRequest      | handleOffsetFetchRequest      | handleFetchOffsets  | getOffsets

Although the high-level Consumer does not require the client application to manage offsets itself, the Kafka consumer still has to manage them internally; the high-level API simply abstracts offset handling away so that it is transparent to the client. That the client does not have to do this work does not mean the work does not have to be done.

So far the Consumer has been covered in three major parts: first the high-level consumer, then the consumer's fetcher threads, and then offset management. The high-level consumer also needs to fetch data and manage offsets, but that work is built in, whereas a low-level consumer leaves it to client code. With a low-level consumer, the client code has to implement fetching, offset management, and failure handling (such as locating the leader) itself.
Reference: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

consumer            | main
high level consumer | ConsumerGroup
low level consumer  | SimpleConsumer

SimpleConsumer

The general flow for client code using the low-level consumer is:

  • 1) Find a live Broker and determine the leader of each Partition, as well as its followers (replica Brokers)
  • 2) Define the request (OffsetRequest) that describes which data the application needs
  • 3) Fetch the data; this step ties the other steps together
  • 4) Detect leader changes and respond to them as needed

1) Find the Partition's leader and followers

For a given topic and partition, a TopicMetadataRequest is sent through the SimpleConsumer and the response is TopicMetadata, which contains the PartitionMetadata of every Partition of that topic. PartitionMetadata is the Partition's metadata, so it includes the Partition's leader.

This is the same process as the fetchTopicMetadata we analyzed in LeaderFinderThread, except that now the client code has to implement it itself.

private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
    PartitionMetadata returnMetaData = null;
    loop:
    for (String seed : a_seedBrokers) {
        SimpleConsumer consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
        List<String> topics = Collections.singletonList(a_topic);
        TopicMetadataRequest req = new TopicMetadataRequest(topics);
        kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
        List<TopicMetadata> metaData = resp.topicsMetadata();
        for (TopicMetadata item : metaData) {
            for (PartitionMetadata part : item.partitionsMetadata()) {
                if (part.partitionId() == a_partition) {
                    returnMetaData = part;
                    break loop; // as soon as one match is found, exit all loops
                }
            }
        }
    }
    if (returnMetaData != null) {
        m_replicaBrokers.clear();
        for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
            // PartitionMetadata carries the followers as well as the Partition's leader
            m_replicaBrokers.add(replica.host());
        }
    }
    return returnMetaData;
}

2) Get the last/most recently read offset

public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);
    long[] offsets = response.offsets(topic, partition);
    return offsets[0];
}

getOffsetsBefore is analogous to the fetchOffsets call in ZKRebalancerListener.rebalance and to the fetch call issued by ConsumerFetcherThread's SimpleConsumer:

method           | invoker                        | request            | response
fetchOffsets     | ZKRebalancerListener.rebalance | OffsetFetchRequest | OffsetFetchResponse
getOffsetsBefore | getLastOffset                  | OffsetRequest      | OffsetResponse
fetch            | ConsumerFetcherThread          | FetchRequest       | FetchResponse
def getOffsetsBefore(request: OffsetRequest) = OffsetResponse.readFrom(sendRequest(request).payload())
def fetchOffsets(request: OffsetFetchRequest) = OffsetFetchResponse.readFrom(sendRequest(request).payload())
def fetch(request: FetchRequest): FetchResponse = FetchResponse.readFrom(sendRequest(request).payload(), request.versionId)

[Figure: k_offset_request]

3) Read the data (fetch)

The value returned by getLastOffset above is the fetchOffset used for the fetch (readOffset in the code). A FetchRequest is then built and the fetch is issued through the consumer, matching ConsumerFetcherThread's two steps buildFetchRequest and fetch.

ConsumerFetcherThread calls processPartitionData on the fetch result to put the messages into a queue, where the ConsumerIterator consumes them. Here, by contrast, the message payloads are read directly from fetchResponse.messageSet, without any extra thread involved in retrieving the messages.

[Figure: k_fetch_request]

// When calling FetchRequestBuilder, it's important NOT to call .replicaId(), which is meant for internal use only.
// Setting the replicaId incorrectly will cause the brokers to behave incorrectly.
FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 100000).build();
FetchResponse fetchResponse = consumer.fetch(req);

if (fetchResponse.hasError()) {
    // ...
}

for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
    long currentOffset = messageAndOffset.offset();
    if (currentOffset < readOffset) { // offsets in the returned message set should normally be >= the requested fetchOffset
        System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
        continue; // an old offset: skip the logic below and move on to the next message
    }
    // after consuming each message, advance readOffset so the next fetch starts right after the latest message
    readOffset = messageAndOffset.nextOffset();
    ByteBuffer payload = messageAndOffset.message().payload();
    byte[] bytes = new byte[payload.limit()];
    payload.get(bytes);
    System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
    a_maxReads--;
}

Note that readOffset is advanced from the most recently read message (messageAndOffset) to the next offset (nextOffset). This way a whole block of messages can be processed, and the next fetch tells Kafka exactly where new data should be read from.

The code also explicitly checks that the offset of each returned message is not smaller than the requested readOffset. This check matters when compression is involved: if Kafka compressed the message set, the fetch returns a whole compressed block even when the requested offset is not at the start of that block (for example, readOffset = 10 while the block covers offsets 5-20, so the returned block starts before readOffset). Messages with offsets smaller than readOffset therefore have to be skipped; otherwise messages that were already read would be processed again.

Since the SimpleConsumer does not handle failures of the leader Broker while fetching, the client has to write its own code to deal with them.

4) Error handling

ConsumerFetcherThread's abstract parent class AbstractFetcherThread, in processFetchRequest, splits the error code of each Partition in the returned responseData into Errors.NONE and Errors.OFFSET_OUT_OF_RANGE. The OffsetOutOfRangeCode has to be handled here as well.

AbstractFetcherThread handles an out-of-range offset by applying the reset policy, passing earliest or latest to obtain a newOffset. Here the code instead calls getLastOffset from step 2) to get the most recently readable offset, so the time passed is LatestTime (last = latest).

if (fetchResponse.hasError()) {
    numErrors++;
    short code = fetchResponse.errorCode(a_topic, a_partition);
    if (numErrors > 5) break;

    if (code == ErrorMapping.OffsetOutOfRangeCode()) {
        // We asked for an invalid offset. For simple case ask for the last element to reset
        readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
        continue;
    }
    consumer.close();
    consumer = null;
    leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
    continue;
}
numErrors = 0;

findNewLeader also calls findLeader, but with m_replicaBrokers as the seed list, which implies findLeader must have run once before so that m_replicaBrokers is populated. The leadBroker returned by that earlier findLeader call is passed to findNewLeader as a_oldLeader, and findNewLeader returns the new leader.

A failed fetch suggests the leader Broker has gone down, and after a leader failure one of the Partition's follower Brokers should take over as leader. That is why findNewLeader is given the m_replicaBrokers collected by the first findLeader call; it then constructs a SimpleConsumer and sends a TopicMetadataRequest. If findNewLeader cannot connect to any of the replica Brokers hosting this Partition, it gives up and aborts by throwing an exception.

private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
    for (int i = 0; i < 3; i++) {
        boolean goToSleep = false;
        // m_replicaBrokers holds the Partition's follower brokers, so findLeader must have run once before to populate it
        PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            goToSleep = true;
        } else if (metadata.leader() == null) {
            goToSleep = true;
        } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
            // first time(i=0) through if the leader hasn't changed give ZooKeeper a second to recover
            // second time(i=1), assume the broker did recover before failover, or it was a non-Broker issue
            goToSleep = true;
        } else {
            return metadata.leader().host();
        }
        if (goToSleep) Thread.sleep(1000);
    }
    throw new Exception("Unable to find new leader after Broker failure. Exiting");
}

The flow of client code using the SimpleConsumer for low-level message consumption:

[Figure: k_simple_consumer]

