Kafka Source Code Analysis: Consumer (2) Fetcher

Kafka Consumer Fetcher

LeaderFinderThread

To obtain topic metadata, the consumer reuses the producer machinery to send a TopicMetadataRequest that expects a response.
Because a topic is split into multiple partitions, one TopicMetadata contains multiple PartitionMetadata entries.
PartitionMetadata is the metadata of a single partition and carries the partition's leader information.
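
For reference, a simplified sketch of these two metadata classes (fields trimmed to what matters here, defaults and error handling omitted, so treat it as an approximation of the kafka.api classes rather than the exact source):

case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadata], errorCode: Short)

case class PartitionMetadata(partitionId: Int,
                             leader: Option[BrokerEndPoint], // the partition's leader, if one is currently available
                             replicas: Seq[BrokerEndPoint],
                             isr: Seq[BrokerEndPoint],
                             errorCode: Short)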

def fetchTopicMetadata(topics: Set[String], brokers: Seq[BrokerEndPoint],
                       clientId: String, timeoutMs: Int, correlationId: Int = 0): TopicMetadataResponse = {
  val props = new Properties()
  props.put("metadata.broker.list", brokers.map(_.connectionString).mkString(","))
  props.put("client.id", clientId)
  props.put("request.timeout.ms", timeoutMs.toString)
  val producerConfig = new ProducerConfig(props)
  fetchTopicMetadata(topics, brokers, producerConfig, correlationId)
}

def fetchTopicMetadata(topics: Set[String], brokers: Seq[BrokerEndPoint],
                       producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = {
  var fetchMetaDataSucceeded: Boolean = false
  var i: Int = 0
  val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion,
    correlationId, producerConfig.clientId, topics.toSeq)
  var topicMetadataResponse: TopicMetadataResponse = null
  // shuffle the list of brokers before sending metadata requests so that most requests don't get routed to the same broker
  val shuffledBrokers = Random.shuffle(brokers)
  // try the brokers in random order; one successful response is enough
  // (the full source also catches and logs send failures here; the catch block is elided in this excerpt)
  while (i < shuffledBrokers.size && !fetchMetaDataSucceeded) {
    val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, shuffledBrokers(i))
    try {
      topicMetadataResponse = producer.send(topicMetadataRequest)
      fetchMetaDataSucceeded = true
    } finally {
      i = i + 1
      producer.close()
    }
  }
  topicMetadataResponse
}

LeaderFinderThread is responsible for adding fetchers to the correct broker once a partition's leader becomes available.
addFetcherForPartitions clearly adds fetchers for partitions, so why say "to the broker"?
Because the FetcherThreads created by addFetcherForPartitions are keyed at the (fetcherId, brokerId) granularity.

private class LeaderFinderThread(name: String) extends ShutdownableThread(name) {
  override def doWork() {
    val leaderForPartitionsMap = new HashMap[TopicAndPartition, BrokerEndPoint]
    while (noLeaderPartitionSet.isEmpty) cond.await() // No partition for leader election

    val brokers = zkUtils.getAllBrokerEndPointsForChannel(SecurityProtocol.PLAINTEXT)
    val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet,
      brokers, config.clientId, config.socketTimeoutMs, correlationId.getAndIncrement).topicsMetadata
    topicsMetadata.foreach { tmd =>             // TopicMetadata
      tmd.partitionsMetadata.foreach { pmd =>   // PartitionMetadata
        val topicAndPartition = TopicAndPartition(tmd.topic, pmd.partitionId)
        // if the partition has a leader and is still in noLeaderPartitionSet, remove it from the set and record its leader
        if (pmd.leader.isDefined && noLeaderPartitionSet.contains(topicAndPartition)) {
          leaderForPartitionsMap.put(topicAndPartition, pmd.leader.get)
          noLeaderPartitionSet -= topicAndPartition
        }
      }
    }
    // only partitions that now have a leader are handed to fetchers: TopicAndPartition -> BrokerAndInitialOffset
    addFetcherForPartitions(leaderForPartitionsMap.map {
      // (rebalance) partitionMap holds the PartitionTopicInfo from topicRegistry, which carries fetchOffset and consumedOffset
      case (topicAndPartition, broker) => topicAndPartition ->
        BrokerAndInitialOffset(broker, partitionMap(topicAndPartition).getFetchOffset())
    })
    shutdownIdleFetcherThreads() // after adding partitions, fetchers left without any partition are shut down
    Thread.sleep(config.refreshLeaderBackoffMs)
  }
}

leaderForPartitionsMap maps a TopicAndPartition to its leader broker.
Fetching also has to care about the partition offset, i.e. from which offset of the partition to start fetching.

AbstractFetcherManager

Each consumer has its own ConsumerFetcherManager. Fetching is not exclusive to consumers: partition replicas also pull data from the leader.
The abstract method createFetcherThread creates a ConsumerFetcherThread for the consumer case and a ReplicaFetcherThread for the replica case.

[Figure: k_abstract_fetcher]

A consumer can consume multiple partitions of multiple topics, and every (topic, partition) combination maps to a fetcherId.
So the key of fetcherThreadMap is effectively derived from (broker_id, topic_id, partition_id).
For each partition of each source broker there is a fetcher thread pulling from it, i.e. data is fetched at the partition level.

abstract class AbstractFetcherManager(protected val name: String, clientId: String, numFetchers: Int = 1) {
  // map of (source broker_id, fetcher_id per source broker) => fetcher
  private val fetcherThreadMap = new mutable.HashMap[BrokerAndFetcherId, AbstractFetcherThread]
}

case class BrokerAndFetcherId(broker: BrokerEndPoint, fetcherId: Int)
case class BrokerAndInitialOffset(broker: BrokerEndPoint, initOffset: Long)

So BrokerAndFetcherId identifies one fetcher on a broker (the fetcherId being derived from topic and partition), while BrokerAndInitialOffset pairs a broker with an initial offset.
In the parameters of addFetcherForPartitions, each BrokerAndInitialOffset is keyed by a TopicAndPartition, i.e. the offset is that partition's offset.

// to be defined in subclass to create a specific fetcher
def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread

// adding fetchers for partitions means creating fetcher threads for them, since fetcher threads are what pull the partitions' messages
def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]) {
  // group by broker-topic-partition; entries with the same (broker, fetcherId) share one fetcher thread
  val partitionsPerFetcher = partitionAndOffsets.groupBy { case (topicAndPartition, brokerAndInitialOffset) =>
    BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicAndPartition.topic, topicAndPartition.partition))
  }
  // the grouped values are still partitionAndOffsets maps; because grouping is by fetcherId, different partitions may share the same fetcherId
  for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) {
    var fetcherThread: AbstractFetcherThread = null
    fetcherThreadMap.get(brokerAndFetcherId) match {
      case Some(f) => fetcherThread = f
      case None =>
        fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
        fetcherThreadMap.put(brokerAndFetcherId, fetcherThread)
        fetcherThread.start // start the newly created fetcher thread
    }

    // all partitions in this group belong to the same fetcher; hand over each partition's initial offset
    fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map {
      case (topicAndPartition, brokerAndInitOffset) => topicAndPartition -> brokerAndInitOffset.initOffset
    })
  }
}

The partitionAndOffsets passed to groupBy covers all broker-topic-partition entries.
After grouping, within one iteration of the for loop the partitionAndOffsets value of partitionsPerFetcher contains all partitions that share the same fetcherId (and broker).
So the partitionAndOffsets inside one loop iteration may hold several partitions, all with the same fetcherId.

Note: the fetcherId is computed by hashing the topic, adding the partition number, and taking the result modulo numFetchers.
So different partitions of the same topic may end up with the same fetcherId.
For example, with numFetchers=3 and the same topic, partitions 2 and 5 get the same fetcherId (2 % 3 = 2 and 5 % 3 = 2).
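
A minimal sketch of that computation (the real AbstractFetcherManager uses its own abs/hash utility, so the helper below is only illustrative):

// illustrative only: topic hash plus partition, modulo the number of fetchers
private def getFetcherId(topic: String, partitionId: Int): Int =
  math.abs(31 * topic.hashCode + partitionId) % numFetchers

// with numFetchers = 3, partitions 2 and 5 of the same topic collide:
// (h + 2) % 3 == (h + 5) % 3 because 5 - 2 is a multiple of 3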

Because the input partitionAndOffsets is a Map, each TopicAndPartition can only have one BrokerAndInitialOffset (uniqueness):
a given TopicAndPartition maps to exactly one broker and offset.

topic  partition  broker  offset
t1     1          A       10
t1     2          B       15
t2     1          A       8

Now group partitionAndOffsets by (broker, topic, partition):

topic  partition  broker  offset
t1     1          A       10
t1     1          C       12
t1     2          B       15
t2     1          A       8

In the example above the same TopicAndPartition appears under multiple brokers, but such a structure already violates the topic-partition uniqueness of the Map!
In practice a partition is not fetched from different brokers (it is fetched from its single leader), whereas different partitions on the same broker are perfectly fine.
So "same broker, same topic, different partitions" is the combination that can actually occur, and that is exactly what the fetcherId computation groups together.

topic  partition  broker  offset
t1     1          A       10
t1     2          A       12

The FetcherManager manages all FetcherThreads, and each FetcherThread manages its own partition offsets.
Each role sticks to its own job: the manager does not care about individual partitions; it delegates them to the threads, which are the ones that actually handle partitions.
This resembles the common Master-Worker architecture: the Master manages all Worker processes, while each Worker handles concrete tasks.

[Figure: k_manager_thread]

AbstractFetcherThread addPartitions

Both the consumer's and the replica's FetcherManager hand the partitionAndOffsets they want to fetch to the corresponding fetcher threads.
For the consumer this is driven by the LeaderFinderThread; for replicas the partition offsets are determined in makeFollowers.

AbstractFetcherManager.addFetcherForPartitions(Map<TopicAndPartition, BrokerAndInitialOffset>)  (kafka.server)
|-- LeaderFinderThread in ConsumerFetcherManager.doWork() (kafka.consumer)
|-- ReplicaManager.makeFollowers(int, int, Map<Partition, PartitionState>, int, Map<TopicPartition, Object>, MetadataCache) (kafka.server)

The fetcher thread likewise uses a partitionMap cache to hold the fetch state of each TopicAndPartition: the manager deals with threads, and each thread deals with its own state.

map (key -> value)                                              class                    source / invoker
fetcherThreadMap: BrokerAndFetcherId -> AbstractFetcherThread   AbstractFetcherManager   LeaderFinderThread.addFetcherForPartitions
partitionMap: TopicAndPartition -> PartitionTopicInfo           ConsumerFetcherManager   updateFetcher -> startConnections
leaderForPartitionsMap: TopicAndPartition -> leaderBroker       LeaderFinderThread       fetchTopicMetadata -> partitionsMetadata
partitionMap: TopicAndPartition -> PartitionFetchState          AbstractFetcherThread    addFetcherForPartitions -> addPartitions
partitionMap: TopicAndPartition -> PartitionTopicInfo           ConsumerFetcherThread    ConsumerFetcherManager.createFetcherThread
// Abstract class for fetching data from multiple partitions from the same broker.
abstract class AbstractFetcherThread(name: String, clientId: String,
                                     sourceBroker: BrokerEndPoint, fetchBackOffMs: Int = 0, isInterruptible: Boolean = true) {
  private val partitionMap = new mutable.HashMap[TopicAndPartition, PartitionFetchState] // a (topic, partition) -> partitionFetchState map

  def addPartitions(partitionAndOffsets: Map[TopicAndPartition, Long]) {
    partitionMapLock.lockInterruptibly()
    try {
      for ((topicAndPartition, offset) <- partitionAndOffsets) {
        // If the partitionMap already has the topic/partition, then do not update the map with the old offset
        if (!partitionMap.contains(topicAndPartition))
          partitionMap.put(topicAndPartition,
            if (PartitionTopicInfo.isOffsetInvalid(offset)) new PartitionFetchState(handleOffsetOutOfRange(topicAndPartition))
            else new PartitionFetchState(offset)
          )
      }
      partitionMapCond.signalAll()
    } finally partitionMapLock.unlock()
  }

To summarize: the consumer obtains its PartitionAssignment through ZKRebalancerListener and turns it into topicRegistry;
AbstractFetcherManager then manages all FetcherThreads, while the leader finder thread resolves each partition's leader.
The partition offsets originate from the fetchOffsets in topicRegistry (obtained via the offset channel) and are threaded through the whole flow.

[Figure: k_fetcher_flow]

FetchRequest & PartitionData

A fetch request specifies which TopicAndPartition to pull (the offset comes from PartitionFetchState), and PartitionData carries back the fetched message set.

type REQ <: FetchRequest  // the concrete fetch request type
type PD <: PartitionData  // the per-partition data, i.e. the fetch result

trait FetchRequest {      // the fetch request abstraction
  def isEmpty: Boolean
  def offset(topicAndPartition: TopicAndPartition): Long
}

trait PartitionData {
  def errorCode: Short
  def exception: Option[Throwable]
  def toByteBufferMessageSet: ByteBufferMessageSet
  def highWatermark: Long
}

FetchRequest and PartitionData also come in consumer and replica flavors. The methods in ConsumerFetcherThread delegate to an underlying object (similar to the decorator pattern).
The FetchRequest from kafka.api is the request that actually goes to KafkaApis. Besides the offset, PartitionFetchInfo also carries the fetchSize.
RequestOrResponse is the carrier interface for data passed through KafkaApis; its requestId parameter indicates the request type (PRODUCE, FETCH, and so on).

case class PartitionFetchInfo(offset: Long, fetchSize: Int)

case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
                        correlationId: Int = FetchRequest.DefaultCorrelationId,
                        clientId: String = ConsumerConfig.DefaultClientId,
                        replicaId: Int = Request.OrdinaryConsumerId,
                        maxWait: Int = FetchRequest.DefaultMaxWait,
                        minBytes: Int = FetchRequest.DefaultMinBytes,
                        requestInfo: Map[TopicAndPartition, PartitionFetchInfo])
  extends RequestOrResponse(Some(ApiKeys.FETCH.id))

[Figure: k_fetchRequest_partitionData]

ConsumerFetcherThread.buildFetchRequest

AbstractFetcherThread's doWork delegates request construction to the abstract buildFetchRequest; ConsumerFetcherThread implements it with a FetchRequestBuilder.
What the builder produces is the kafka.api.FetchRequest (defined in the same file as kafka.api.FetchRequestBuilder), which becomes the underlying request.

class ConsumerFetcherThread(...) {
  private val fetchRequestBuilder = new FetchRequestBuilder().
    clientId(clientId).replicaId(Request.OrdinaryConsumerId).maxWait(config.fetchWaitMaxMs).
    minBytes(config.fetchMinBytes).requestVersion(kafka.api.FetchRequest.CurrentVersion)

  // partitionMap comes from AbstractFetcherThread.addPartitions (or delayPartitions)
  protected def buildFetchRequest(partitionMap: collection.Map[TopicAndPartition, PartitionFetchState]): FetchRequest = {
    partitionMap.foreach { case ((topicAndPartition, partitionFetchState)) =>
      if (partitionFetchState.isActive)
        fetchRequestBuilder.addFetch(topicAndPartition.topic,
          topicAndPartition.partition, partitionFetchState.offset, fetchSize)
    }
    new FetchRequest(fetchRequestBuilder.build()) // builder pattern: build() is only called at the very end
  }
}

class FetchRequestBuilder() {
  private val requestMap = new collection.mutable.HashMap[TopicAndPartition, PartitionFetchInfo]

  def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int) = {
    requestMap.put(TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize))
    this
  }

  def build() = {
    val fetchRequest = FetchRequest(versionId, correlationId.getAndIncrement,
      clientId, replicaId, maxWait, minBytes, requestMap.toMap)
    requestMap.clear()
    fetchRequest
  }
}

Each entry of the partitionMap passed to buildFetchRequest tells the FetchRequest from which offset to start fetching.
ConsumerFetcherThread is invoked by its abstract parent AbstractFetcherThread, whose argument is its own partitionMap.
That partitionMap is populated in addPartitions, which is called from AbstractFetcherManager.addFetcherForPartitions,
whose caller in turn is ConsumerFetcherManager's inner class LeaderFinderThread. This traces the data back to its origin and closes the loop.

The design idea is: the abstract parent of the thread owns the partitionMap, fills it via addPartitions, and reads it back;
the subclass (the consumer) does not need to know where the partitionMap came from, it only has to build a FetchRequest from the partitionMap it is given.
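
The consumer-side wrapper that adapts kafka.api.FetchRequest to the abstract FetchRequest trait looks roughly like this (simplified from the companion of ConsumerFetcherThread; exact details may differ by version):

class FetchRequest(val underlying: kafka.api.FetchRequest) extends AbstractFetcherThread.FetchRequest {
  def isEmpty: Boolean = underlying.requestInfo.isEmpty
  // the offset that was requested for this partition, read back from the underlying request
  def offset(topicAndPartition: TopicAndPartition): Long = underlying.requestInfo(topicAndPartition).offset
}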

[Figure: k_partitionMap]

AbstractFetcherThread doWork

AbstractFetcherThread defines several callback methods; its doWork builds a FetchRequest and then processes the fetch.
Since fetching exists for both consumers and replicas, the concrete fetch action is left to the subclasses.

One source of the partitionMap parameter of buildFetchRequest, as shown earlier, is the LeaderFinderThread. processFetchRequest below, after fetching a batch,
takes the offset following the last message of that batch as the partition's new PartitionFetchState in partitionMap,
so the next time buildFetchRequest builds a FetchRequest, the offset inside PartitionFetchInfo is up to date as well.
In short: partitionMap is filled in addPartitions, and its offsets are updated in doWork after each fetch, marking the latest fetched position.

[Figure: k_fetch_update_offset]

abstract class AbstractFetcherThread(..) {
  private val partitionMap = new mutable.HashMap[TopicAndPartition, PartitionFetchState] // a (topic, partition) -> partitionFetchState map

  // ① build the FetchRequest from partitionMap
  protected def buildFetchRequest(partitionMap: Map[TopicAndPartition, PartitionFetchState]): REQ
  // ② issue the fetch request to the broker and pull messages
  protected def fetch(fetchRequest: REQ): Map[TopicAndPartition, PD]
  // ③ process fetched data
  def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: PD)
  // ④ handle a partition whose offset is out of range and return a new fetch offset
  def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long
  // ⑤ deal with partitions with errors, potentially due to leadership changes
  def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition])

  // doWork is called in a loop, so once partitionMap changes (e.g. after one fetch), the offsets in the next FetchRequest change too
  override def doWork() {
    val fetchRequest = inLock(partitionMapLock) {
      val fetchRequest = buildFetchRequest(partitionMap)
      // if there is nothing to fetch, back off for a while before sending the next request
      if (fetchRequest.isEmpty) partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
      fetchRequest
    }
    if (!fetchRequest.isEmpty) processFetchRequest(fetchRequest)
  }

  private def processFetchRequest(fetchRequest: REQ) {
    val partitionsWithError = new mutable.HashSet[TopicAndPartition]
    var responseData: Map[TopicAndPartition, PD] = Map.empty
    responseData = fetch(fetchRequest)
    responseData.foreach { case (topicAndPartition, partitionData) =>
      // the response is TopicAndPartition -> PartitionData; the TopicAndPartition locates the PartitionFetchState in partitionMap
      partitionMap.get(topicAndPartition).foreach(currentPartitionFetchState =>
        // we append to the log if the current offset is defined and it is the same as the offset requested during fetch
        // fetchRequest was built from partitionMap via buildFetchRequest, and currentPartitionFetchState also comes from partitionMap
        if (fetchRequest.offset(topicAndPartition) == currentPartitionFetchState.offset) {
          // the PartitionData in responseData contains the fetched messages
          val messages = partitionData.toByteBufferMessageSet
          // offset of the last message + 1 becomes the new offset, i.e. the start position of the next fetch
          val newOffset = messages.shallowIterator.toSeq.lastOption match { // a shallow iterator is used so the messages remain available afterwards
            case Some(m: MessageAndOffset) => m.nextOffset
            case None => currentPartitionFetchState.offset
          }
          // update the partition's fetch state in partitionMap; since partitionMap changed, the next buildFetchRequest uses the new offset
          partitionMap.put(topicAndPartition, new PartitionFetchState(newOffset))
          processPartitionData(topicAndPartition, currentPartitionFetchState.offset, partitionData)
        })
    }
    if (partitionsWithError.nonEmpty) handlePartitionsWithErrors(partitionsWithError)
  }
}

The fetchRequest is built from partitionMap ①, so fetchRequest.offset(topicAndPartition) ② corresponds to the PartitionFetchState ③ of that partition in partitionMap,
and this should be the very same PartitionFetchState as currentPartitionFetchState, which is also taken from partitionMap.

[Figure: k_partitionFetchState_newOffset]

As shown, processFetchRequest acts as the main method of the abstract class and orchestrates the other callback methods.

[Figure: k_abstractFetcherThread_doWork]

ConsumerFetcherThread

class ConsumerFetcherThread(name: String, val config: ConsumerConfig, sourceBroker: BrokerEndPoint,
                            partitionMap: Map[TopicAndPartition, PartitionTopicInfo],
                            val consumerFetcherManager: ConsumerFetcherManager) extends AbstractFetcherThread {
  private val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, ..)

  protected def fetch(fetchRequest: FetchRequest): collection.Map[TopicAndPartition, PartitionData] =
    simpleConsumer.fetch(fetchRequest.underlying).data.map { case (key, value) => key -> new PartitionData(value) }

  def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: PartitionData) {
    val pti = partitionMap(topicAndPartition) // PartitionTopicInfo
    if (pti.getFetchOffset != fetchOffset)
      throw new RuntimeException("Offset doesn't match for partition, pti offset: %d fetch offset: %d")
    pti.enqueue(partitionData.underlying.messages.asInstanceOf[ByteBufferMessageSet]) // enqueue the message set of FetchResponsePartitionData
  }
}

processFetchRequest compares, per partition, fetchRequest.offset with currentPartitionFetchState.offset.
Since both come from partitionMap, the offset in the fetch request normally equals the old offset in partitionMap.
processPartitionData performs a similar check: PartitionTopicInfo.fetchOffset must equal that same old offset from partitionMap.

A fetch call produces PartitionData, the per-partition fetch result. Once the data arrives, processPartitionData is called to put it into the queue.
FetchResponse.data maps TopicAndPartition -> FetchResponsePartitionData, and each value is used to build a PartitionData.
The FetchResponsePartitionData acts as the underlying object and contains the message set (messages) fetched from the server.
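
The corresponding consumer-side PartitionData wrapper is roughly the following (a simplified sketch; the error-to-exception mapping is abbreviated):

class PartitionData(val underlying: FetchResponsePartitionData) extends AbstractFetcherThread.PartitionData {
  def errorCode: Short = underlying.error
  def toByteBufferMessageSet: ByteBufferMessageSet = underlying.messages.asInstanceOf[ByteBufferMessageSet]
  def highWatermark: Long = underlying.hw
  def exception: Option[Throwable] =
    if (errorCode == ErrorMapping.NoError) None else Some(ErrorMapping.exceptionFor(errorCode))
}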

[Figure: k_consumerFetchThread]

When a PartitionData in AbstractFetcherThread's responseData carries the OFFSET_OUT_OF_RANGE error code, it is handled as follows:
based on the autoOffsetReset policy, smallest maps to EarliestTime and largest to LatestTime, and an OffsetRequest is sent.

// handle a partition whose offset is out of range and return a new fetch offset
def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long = {
  val startTimestamp = config.autoOffsetReset match {
    case OffsetRequest.SmallestTimeString => OffsetRequest.EarliestTime
    case OffsetRequest.LargestTimeString => OffsetRequest.LatestTime
    case _ => OffsetRequest.LatestTime
  }
  val newOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, startTimestamp, Request.OrdinaryConsumerId)
  val pti = partitionMap(topicAndPartition)
  pti.resetFetchOffset(newOffset)
  pti.resetConsumeOffset(newOffset)
  newOffset
}

PartitionTopicInfo

We also see here that ConsumerFetcherManager's partitionMap is not only used by the LeaderFinderThread to look up a partition's offset:
it is also passed to every ConsumerFetcherThread and used there to look up the partition's PartitionTopicInfo.
Besides the offsets, PartitionTopicInfo also holds the BlockingQueue for the actual result data, which is why fetched data is first added to that queue.

The offsets in PartitionTopicInfo (pti) drive the fetching of the partition, while its BlockingQueue stores the fetched result data.

[Figure: k_partition_topicinfo]

PartitionTopicInfo is created in addPartitionTopicInfo, called during ZKRebalancerListener.rebalance.
The queue is the one in topicThreadIdAndQueues for a given threadId; an empty queue is created when ConsumerConnector.consume is called.

Everything is prepared when the PartitionTopicInfo is created: fetching is driven by fetchOffset, and once data arrives it is filled into chunkQueue.
So when it is time to fetch, reading partitionTopicInfo's fetchOffset tells the thread where to start fetching,
and after the result data (partitionData) is fetched it is put into partitionTopicInfo's queue, from which the client's iterator pulls the results.

class PartitionTopicInfo(val topic: String, val partitionId: Int,
                         private val chunkQueue: BlockingQueue[FetchedDataChunk],
                         private val consumedOffset: AtomicLong, private val fetchedOffset: AtomicLong,
                         private val fetchSize: AtomicInteger, private val clientId: String) extends Logging {
  def enqueue(messages: ByteBufferMessageSet) {
    val next = messages.shallowIterator.toSeq.last.nextOffset
    chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get))
    fetchedOffset.set(next)
  }
}

fetchedOffset

After PartitionTopicInfo enqueues the current batch of messages, it updates fetchedOffset to the offset following the last message of the batch.
Although fetchedOffset is a private field of PartitionTopicInfo, the class exposes getFetchOffset to obtain the latest offset to fetch from.

Inside PartitionTopicInfo itself, fetchedOffset.get returns the same latest value, as already used in enqueue above.
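
The relevant accessors on PartitionTopicInfo are thin wrappers around the two atomic counters (sketched here for reference):

def getFetchOffset(): Long = fetchedOffset.get
def getConsumeOffset(): Long = consumedOffset.get
def resetFetchOffset(newFetchOffset: Long): Unit = fetchedOffset.set(newFetchOffset)
def resetConsumeOffset(newConsumeOffset: Long): Unit = consumedOffset.set(newConsumeOffset)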

getFetchOffset shows up in the LeaderFinderThread: when partitions are first added to a fetcher thread, it provides the latest offset to fetch from.

addFetcherForPartitions(leaderForPartitionsMap.map { case (topicAndPartition, broker) =>
  topicAndPartition -> BrokerAndInitialOffset(broker, partitionMap(topicAndPartition).getFetchOffset())
})

The other place is ConsumerFetcherThread.processPartitionData (the caller of enqueue), where getFetchOffset is used for the validation before enqueueing.

ConsumerFetcherThread's partitionMap is handed in at construction time from ConsumerFetcherManager's partitionMap,
and ConsumerFetcherManager's partitionMap is the topicRegistry passed to startConnections during updateFetcher.

We have already seen that AbstractFetcherThread also updates its own partitionMap with the latest newOffset after each fetch.
That partitionMap, however, has PartitionFetchState values, and it is likewise populated via addFetcherForPartitions -> addPartitions.

In fact, if the TopicAndPartition already exists in partitionMap, addPartitions does not overwrite it; only a fetch can trigger the newOffset update.

def addPartitions(partitionAndOffsets: Map[TopicAndPartition, Long]) {
  for ((topicAndPartition, offset) <- partitionAndOffsets) {
    // If the partitionMap already has the topic/partition, then do not update the map with the old offset
    if (!partitionMap.contains(topicAndPartition)) {
      partitionMap.put(topicAndPartition, new PartitionFetchState(offset))
    }
  }
}

[Figure: k_fetchOffset]

SimpleConsumer

SimpleConsumer fetches data much like the Scala producer does: through a synchronous, blocking channel.

So when ConsumerFetcherThread calls fetch, the result data (FetchResponse) is returned to the fetcher thread in a blocking manner.
Why then does ConsumerFetcherThread still need a processPartitionData step after the fetch?

class SimpleConsumer(val host: String, val port: Int, val soTimeout: Int,
                     val bufferSize: Int, val clientId: String) extends Logging {
  private val blockingChannel = new BlockingChannel(host, port, bufferSize, ..)

  def fetch(request: FetchRequest): FetchResponse = {
    var response: NetworkReceive = sendRequest(request)
    val fetchResponse = FetchResponse.readFrom(response.payload(), request.versionId)
    val fetchedSize = fetchResponse.sizeInBytes
    fetchResponse
  }
}

A consumer fetcher thread (ConsumerFetcherThread) maintains a single connection channel to one source broker,
and those brokers originally come from the LeaderFinderThread's leaderForPartitionsMap.

LeaderFinderThread                                       AbstractFetcherManager
|-- addFetcherForPartitions(leaderForPartitionsMap)  --> |-- createFetcherThread(fetcherId, broker)

ConsumerFetcherManager                                   ConsumerFetcherThread
|-- new ConsumerFetcherThread                        --> |-- simpleConsumer.fetch --> SimpleConsumer(sourceBroker) --> BlockingChannel

Summary

Let's go over the relationships among consumers, threads, queues, and channels once more.

No  Event                                                                             Data structure
1   A consumer subscribes to multiple topics, each with a configurable thread count   TopicCount
2   So a consumer may have multiple threads for one topic                             topicThreadIds: topic -> threadIds
3   Each thread corresponds to one queue and one KafkaStream                          queuesAndStreams
4   Each (topic, partition) combination maps to one fetcherId                         fetcherId
5   A FetcherThread is created per (fetcherId, broker)                                createFetcherThread(fetcherId, broker)
6   A FetcherThread uses one SimpleConsumer and one BlockingChannel                   SimpleConsumer
7   One fetcherId may cover the same broker, same topic, different partitions         partitionMap
8   One fetch operation builds a FetchRequest from partitionMap                       buildFetchRequest
9   So one FetchRequest can cover multiple partitions                                 TopicAndPartition -> PartitionFetchInfo

[Figure: k_consumer_hubble]

Because each FetchRequest corresponds to exactly one SimpleConsumer (row 6), the partitionMap from which the FetchRequest is built also belongs to exactly one FetcherThread (bottom-left of the figure above).

Method                                          Action                                                   Explanation
AbstractFetcherManager.addFetcherForPartitions  fetcherThreadMap(brokerAndFetcherId).addPartitions(..)   Hand the partitions belonging to a fetcher to its thread
AbstractFetcherThread.addPartitions             partitionMap.put(...)                                    Add the partitions owned by this thread to its partitionMap
AbstractFetcherThread.buildFetchRequest         FetchRequest                                             Build the FetchRequest from partitionMap

The work so far is summarized below; the next step is for the server to process the FetchRequest and return a FetchResponse.

  • [1] ZookeeperConsumerConnector.rebalance first calls fetchOffsets and builds topicRegistry, the consumer's topic registration info
  • [2] topicRegistry holds topic -> partition -> PartitionTopicInfo, and PartitionTopicInfo contains the fetchOffset and the queue
  • [3] The queue is the underlying blocking queue that stores result data; the fetchOffset is used by the fetcher thread to fetch the partition
  • [4] Eventually partitionMap is turned into a FetchRequest, and SimpleConsumer fetches the result data TopicAndPartition -> PartitionData
  • [5] The FetchRequest is sent over TCP to the KafkaServer, where KafkaApis.handleFetchRequest handles it
  • [6] PartitionData is the per-partition data returned to the consumer; when processPartitionData handles it, it is filled into the PartitionTopicInfo queue
  • [7] Now the blocking queue from [3] has data, and this queue also backs the KafkaStream
  • [8] The client keeps fetching/consuming result data through the iterator, which in effect pulls messages off that queue

KafkaApis.handleFetchRequest

The fetch request issued by SimpleConsumer is handled on the KafkaApis side; the goal is to return a messageSet.

handle                  Request          sendResponseCallback.responseStatus                   ReplicaManager   LocalLog           DelayedOperation
handleProducerRequest   ProduceRequest   Map[TopicPartition, PartitionResponse]                appendMessages   appendToLocalLog   DelayedProduce
handleFetchRequest      FetchRequest     Map[TopicAndPartition, FetchResponsePartitionData]    fetchMessages    readFromLocalLog   DelayedFetch

When a producer writes messages, the returned PartitionResponse contains the baseOffset, i.e. where in the partition the produced messages were stored.
When a consumer fetches, the returned status carries hw and messages, since a fetch by definition has to hand the batch of messages back to the consumer.
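
The per-partition fetch result is roughly the following case class (a sketch; defaults simplified):

case class FetchResponsePartitionData(error: Short = ErrorMapping.NoError,
                                      hw: Long = -1L,       // the partition's high watermark
                                      messages: MessageSet) // the fetched message set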

Producers write to the partition leader, and consumers should also consume from the leader. How does the FetchRequest guarantee that it targets the leader?
The FetchRequest is built from partitionMap, whose origin is addFetcherForPartitions in the LeaderFinderThread;
a partition is only added to a FetcherThread once its leader is known from PartitionMetadata, so everything in the FetchRequest targets leader partitions.

def handleFetchRequest(request: RequestChannel.Request) {
  val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
  // call the replica manager to fetch messages from the local replica
  replicaManager.fetchMessages(fetchRequest.maxWait.toLong, fetchRequest.replicaId,
    fetchRequest.minBytes, authorizedRequestInfo, sendResponseCallback)
}

ReplicaManager.fetchMessages

A fetch request is similar to a produce request: if data has to be returned but the data read has not yet reached fetchMinBytes, the request is parked in the delayed-operation purgatory.
If logReadResults can be returned immediately, the hw and messageSet they carry are handed straight back to the client through responseCallback.
If the request has to be delayed, the logReadResults are obviously incomplete, so the fetchOffsetMetadata is kept as FetchMetadata instead.

def fetchMessages(timeout: Long, replicaId: Int, fetchMinBytes: Int,
                  fetchInfo: immutable.Map[TopicAndPartition, PartitionFetchInfo],
                  responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit) {
  val isFromFollower = replicaId >= 0 // only followers carry a valid replicaId; consumers do not
  val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId
  val fetchOnlyCommitted: Boolean = !Request.isValidBrokerId(replicaId)
  // read from local logs
  val logReadResults = readFromLocalLog(fetchOnlyFromLeader, fetchOnlyCommitted, fetchInfo)
  // if the fetch comes from the follower, update its corresponding log end offset
  if (Request.isValidBrokerId(replicaId)) updateFollowerLogReadResults(replicaId, logReadResults)
  if (timeout <= 0 || fetchInfo.size <= 0 || bytesReadable >= fetchMinBytes || errorReadingData) {
    val fetchPartitionData = logReadResults.mapValues(result =>
      FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet))
    responseCallback(fetchPartitionData)
  } else { // construct the fetch results from the read results
    val fetchPartitionStatus = logReadResults.map { case (topicAndPartition, result) =>
      (topicAndPartition, FetchPartitionStatus(result.info.fetchOffsetMetadata, fetchInfo.get(topicAndPartition).get))
    }
    val fetchMetadata = FetchMetadata(fetchMinBytes, fetchOnlyFromLeader, fetchOnlyCommitted, isFromFollower, fetchPartitionStatus)
    // to build a delayed fetch, wrap up everything it needs into a DelayedFetch operation
    val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, responseCallback)
    // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
    val delayedFetchKeys = fetchPartitionStatus.keys.map(new TopicPartitionOperationKey(_)).toSeq
    delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
  }
}

Reading messages locates the partition's leader replica, obtains its Log instance, and reads the log at the given offset for up to fetchSize bytes.
logReadInfo is a FetchDataInfo, containing a LogOffsetMetadata and the messageSet, and it is further wrapped into a LogReadResult.
In other words, readFromLocalLog converts the PartitionFetchInfo in its parameters into LogReadResult via FetchDataInfo.

// Read from a single topic/partition at the given offset upto maxSize bytes
def readFromLocalLog(fetchOnlyFromLeader: Boolean, readOnlyCommitted: Boolean,
                     readPartitionInfo: Map[TopicAndPartition, PartitionFetchInfo]): Map[TopicAndPartition, LogReadResult] = {
  readPartitionInfo.map { case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) =>
    // decide whether to only fetch from leader: get the partition's leader replica
    val localReplica = if (fetchOnlyFromLeader) getLeaderReplicaIfLocal(topic, partition) else getReplicaOrException(topic, partition)
    // decide whether to only fetch committed data (i.e. messages below high watermark); normally reads cannot go beyond the HW
    val maxOffsetOpt = if (readOnlyCommitted) Some(localReplica.highWatermark.messageOffset) else None
    val initialLogEndOffset = localReplica.logEndOffset
    val logReadInfo = localReplica.log.read(offset, fetchSize, maxOffsetOpt) // the arguments become startOffset, maxLength, maxOffset
    val readToEndOfLog = initialLogEndOffset.messageOffset - logReadInfo.fetchOffsetMetadata.messageOffset <= 0
    val logReadResult = LogReadResult(logReadInfo, localReplica.highWatermark.messageOffset, fetchSize, readToEndOfLog, None)
    (TopicAndPartition(topic, partition), logReadResult)
  }
}

PartitionFetchInfo carries the start offset of this fetch, which becomes startOffset, and fetchSize, the fetch size, which becomes the maximum number of bytes to read.
localReplica is a Replica, so it has two volatile LogOffsetMetadata fields: logEndOffset and highWatermark.
The messageOffset of the highWatermark metadata determines maxOffset, and maxOffset in turn determines the read length inside LogSegment.

Log read->LogSegment

segments is a map from offset to LogSegment, so entry.getValue is the LogSegment for a given (base) offset.
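
In the Log class this is a skip-list keyed by each segment's base offset, roughly:

// keyed by the segment's base offset; floorEntry/higherEntry below rely on this ordering
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] =
  new ConcurrentSkipListMap[java.lang.Long, LogSegment]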

Question: the client specifies startOffset at the partition level, yet a partition consists of multiple segments.
If the data to fetch spans several segments, how is the data of multiple segments returned to the client? The code below seems to deal with only one segment.
(Indeed, a single read only returns data from one segment; if the fetch crosses a segment boundary, the consumer simply picks up the next segment on its next fetch, since the fetch offset keeps advancing.)

def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): FetchDataInfo = {
  var entry = segments.floorEntry(startOffset) // the entry with the greatest base offset <= startOffset
  while (entry != null) {
    val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength, entry.getValue.size)
    if (fetchInfo == null) entry = segments.higherEntry(entry.getKey) // otherwise try the entry with the smallest base offset greater than this one
    else return fetchInfo
  }
}

When appending, what gets written into the index is a relative offset; when reading, the given absolute offset has to be translated into a mapping looked up from the index file.
The offset passed to translateOffset is the absolute offset the client wants to read from, but the actual file position must be obtained through that mapping.

[Figure: k_logSegment_append_read]

private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): OffsetPosition = {
  val mapping = index.lookup(offset) // returns an offset and its physical file position
  log.searchFor(offset, max(mapping.position, startingFilePosition))
}

def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long = size): FetchDataInfo = {
  val logSize = log.sizeInBytes // this may change, need to save a consistent copy
  val startPosition = translateOffset(startOffset)
  if (startPosition == null) return null // if the start position is already off the end of the log, return null
  val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition.position)

  // calculate the length of the message set to read based on whether or not they gave us a maxOffset
  val length = maxOffset match {
    case None => min((maxPosition - startPosition.position).toInt, maxSize) // no max offset, just read until the max position
    case Some(offset) => {
      // there is a max offset, translate it to a file position and use that to calculate the max read size
      if (offset >= startOffset) {
        val mapping = translateOffset(offset, startPosition.position)
        // the max offset is off the end of the log, use the end of the file
        val endPosition = if (mapping == null) logSize else mapping.position
        min(min(maxPosition, endPosition) - startPosition.position, maxSize).toInt
      }
    }
  }
  FetchDataInfo(offsetMetadata, log.read(startPosition.position, length))
}

There are two translateOffset calls above: the first resolves startOffset into startPosition, the second resolves maxOffset into endPosition.
maxOffset is usually the replica's HW, i.e. a consumer can read at most up to the HW, provided of course that maxOffset is greater than startOffset.

The meaning of the read method's parameters and of the variables produced during the read:

var            meaning
startOffset    the offset the client wants to start reading from; an absolute offset (partition level)
maxOffset      the replica's HW or None; normally reads cannot go beyond the HW
maxSize        the maximum number of bytes to read; if endPosition - startPosition exceeds maxSize, only maxSize bytes are read
maxPosition    defaults to the LogSegment size (entry.getValue.size)
startPosition  the OffsetPosition obtained from the index lookup, i.e. the physical position to start reading at
baseOffset     the base offset of the log and index; both the .log and .index files are named after it
endPosition    with maxOffset = HW, the physical position in the file corresponding to the HW offset
length         the number of bytes to read: min(maxPosition, endPosition) - startPosition, capped by maxSize
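
A hypothetical worked example of the length computation (all numbers invented for illustration): suppose startOffset translates to startPosition.position = 1000, the HW (maxOffset) translates to endPosition = 5000, maxPosition = 8000 (the segment size), and maxSize = 4096. Then length = min(min(8000, 5000) - 1000, 4096) = min(4000, 4096) = 4000 bytes, so the read stops at the HW rather than at maxSize.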

The log in log.read is a FileMessageSet; given the start position and length, it constructs a new FileMessageSet.
Normally reading a file means seeking to a position and reading that many bytes; here a new FileMessageSet is created instead, which acts as a view.

def read(position: Int, size: Int): FileMessageSet = {
  new FileMessageSet(file, channel, start = this.start + position, end = math.min(this.start + position + size, sizeInBytes()))
}

So the messageSet a fetch request asks for is returned to the client in the form of a FileMessageSet view.

[Figure: k_readFromLocalLog]

Besides fetchedOffset, which drives fetching, PartitionTopicInfo also has a consumedOffset.
Likewise, ZookeeperConsumerConnector has commitOffsets in addition to fetchOffsets.

  • fetchOffsets tells the consumer from which position of the partition to start fetching
  • with the fetchOffset, the fetcher thread and the fetch action return a messageSet
  • since the client has to record which offsets it has already consumed, there is consumedOffset
  • although the fetch result contains a messageSet, that is merely a view object returned by the server
  • the client has to iterate over that view itself to actually obtain every message in the messageSet
  • which is exactly why ConsumerFetcherThread needs processPartitionData after fetch
  • PartitionData contains the messageSet view; processing it means first putting it into the queue
  • the client then iterates via the queue and the KafkaStream to obtain/consume the messages

KafkaStream

ConsumerConnector.createMessageStreams returns topic -> List<KafkaStream>.
Calling KafkaStream.iterator creates a ConsumerIterator, and the iterator is responsible for returning data from the channel (the blocking queue).
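
As a reminder of how this surfaces in user code, here is a minimal sketch of the old high-level consumer API (group id, topic name, and configuration values are made up for illustration):

import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig}

val props = new Properties()
props.put("zookeeper.connect", "localhost:2181") // hypothetical addresses/values
props.put("group.id", "demo-group")
props.put("auto.offset.reset", "smallest")

val connector = Consumer.create(new ConsumerConfig(props))
// one KafkaStream (and hence one underlying queue) for the topic
val streams = connector.createMessageStreams(Map("demo-topic" -> 1))
val stream = streams("demo-topic").head

// the ConsumerIterator behind this loop blocks on the queue until a FetchedDataChunk arrives
for (msgAndMeta <- stream) {
  println(s"partition=${msgAndMeta.partition} offset=${msgAndMeta.offset} " +
    s"value=${new String(msgAndMeta.message())}")
}
connector.shutdown()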

  • PartitionData: the per-partition data the server returns to the client
  • MessageSet: the raw message set (messages) underlying PartitionData
  • FetchedDataChunk: the raw message set packaged together with its PartitionTopicInfo
  • PartitionTopicInfo: wraps the raw message set into a FetchedDataChunk and puts it into the queue (the blocking channel)
  • MessageAndMetadata: each individual message returned by iterating with next

The queue can hold multiple FetchedDataChunks, and each FetchedDataChunk contains a message set whose messages are consumed one by one.
So the iterator first takes a FetchedDataChunk from the channel, then obtains an iterator over its message set, and returns messages from that iterator.
Once the current FetchedDataChunk's message set has been fully traversed, the next FetchedDataChunk is taken from the channel.

[Figure: k_queue_chunk]

PartitionTopicInfo has two offset variables (note the past-tense "ed"): fetchedOffset and consumedOffset.

  • fetchedOffset: from which fetch offset of the partition to fetch next
  • consumedOffset: after data has been fetched from the partition, up to which consume offset the client has consumed

The upstream callers of PartitionTopicInfo (PTI) and fetchedOffset were already covered when analyzing ConsumerFetcherThread.
After a fetch, in processPartitionData, PTI.fetchOffset ① is used to build the FetchedDataChunk that is put into the queue.
Then PTI.fetchedOffset ② is set to the offset following this fetched batch (it is only updated after a successful fetch, marking what has already been fetched).

[Figure: k_pti_enqueue]

def enqueue(messages: ByteBufferMessageSet) {
  val next = messages.shallowIterator.toSeq.last.nextOffset
  chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get)) // ①, `this` is the PTI
  fetchedOffset.set(next)                                                 // ②
}

PartitionTopicInfo's fetchedOffset can also be updated through PTI.resetFetchOffset:

PartitionTopicInfo.resetFetchOffset(long)  (kafka.consumer)
|-- ConsumerFetcherThread.handleOffsetOutOfRange(TopicAndPartition) (kafka.consumer)

fetchedOffset is set after a fetch and means the consumer has fetched up to this position; it lives mainly inside ConsumerFetcherThread's lifecycle.
Actually consuming the fetched partition data is outside the fetcher thread's scope and is managed by the client's consuming thread itself (each doing its own job).
So the ConsumerIterator below cannot modify the FetchedDataChunk that belongs to the fetcher thread's lifecycle; it can only update the related PTI fields.

[Figure: k_two_offset]

ConsumerIterator

The fetcher thread puts FetchedDataChunks into the queue; the consumer iterator takes them out of the queue and performs the actual message consumption.

[Figure: k_consumer_iterator]

consumedOffset is only updated when the consumer actively iterates and consumes messages; if no message is consumed, no update happens.
For every consumed message, PartitionTopicInfo's consumedOffset is reset (resetConsumeOffset) to record that this message has been consumed.
So the PTI is used to update fetchedOffset when fetching, while currentTopicInfo below is used to update consumedOffset when a message is consumed.

class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk], ..) {
  private val current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
  private var currentTopicInfo: PartitionTopicInfo = null
  private var consumedOffset: Long = -1L

  override def next(): MessageAndMetadata[K, V] = {
    val item = super.next()
    currentTopicInfo.resetConsumeOffset(consumedOffset)
    item
  }

  protected def makeNext(): MessageAndMetadata[K, V] = {
    var currentDataChunk: FetchedDataChunk = null
    var localCurrent = current.get() // iterator of MessageAndOffset

    var item = localCurrent.next()   // MessageAndOffset
    // reject the messages that have already been consumed:
    // only an item (MessageAndOffset) at or beyond the consume offset is returned as MessageAndMetadata
    while (item.offset < currentTopicInfo.getConsumeOffset && localCurrent.hasNext) item = localCurrent.next()
    consumedOffset = item.nextOffset // points to the next message
    new MessageAndMetadata(currentTopicInfo.topic, currentTopicInfo.partitionId, item.message, item.offset, keyDecoder, valueDecoder)
  }
}

The messages of a FetchedDataChunk yield an iterator of MessageAndOffset; item is each message of that iterator.
If the MessageAndOffset iterator does not exist yet, or the current iterator is exhausted, a new one is obtained from the channel.

if (localCurrent == null || !localCurrent.hasNext) {
  // without a consumer timeout, block on channel.take; otherwise poll the channel (the channel is the BlockingQueue)
  if (consumerTimeoutMs < 0) currentDataChunk = channel.take
  else currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS)

  // a FetchedDataChunk carries the fetchOffset, the messages, and a reference to the PartitionTopicInfo
  currentTopicInfo = currentDataChunk.topicInfo
  val cdcFetchOffset = currentDataChunk.fetchOffset
  val ctiConsumeOffset = currentTopicInfo.getConsumeOffset
  // compare FetchedDataChunk.PartitionTopicInfo.consumedOffset with FetchedDataChunk.fetchOffset
  if (ctiConsumeOffset < cdcFetchOffset) currentTopicInfo.resetConsumeOffset(cdcFetchOffset)

  // messages is the MessageSet (the view object mentioned earlier); iterator gives the message iterator
  localCurrent = currentDataChunk.messages.iterator
  current.set(localCurrent)
}

Here we also see where FetchedDataChunk.fetchOffset and PartitionTopicInfo.consumedOffset meet:
the offset PartitionTopicInfo has already consumed should be at least the fetchOffset the FetchedDataChunk was fetched at.

The PartitionTopicInfo created in addPartitionTopicInfo starts with fetchedOffset = consumedOffset.
The FetchedDataChunk created in enqueue uses the fetchedOffset the PTI had at that moment as its fetchOffset,
and afterwards PTI.fetchedOffset is advanced past the fetched message set. So before the consumer starts iterating,
PTI.consumedOffset = FetchedDataChunk.fetchOffset < PTI.fetchedOffset.

As the consumer starts iterating (makeNext) and consuming messages, consumedOffset grows and PTI.consumedOffset is updated,
so we end up with PTI.consumedOffset > FetchedDataChunk.fetchOffset and FetchedDataChunk.fetchOffset < PTI.fetchedOffset.
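
A small made-up example: suppose a PTI starts with fetchedOffset = consumedOffset = 100 and a fetch returns messages 100..104. enqueue creates a FetchedDataChunk with fetchOffset = 100 and advances PTI.fetchedOffset to 105; before iteration, consumedOffset = 100 = chunk.fetchOffset < 105. After the client consumes the first message, resetConsumeOffset sets consumedOffset to 101, so consumedOffset (101) > chunk.fetchOffset (100) while fetchedOffset stays at 105.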

[Figure: k_update_offset]

