Kafka源码分析 DelayOperation

接Kafla ISR,生产消息和拉取消息被缓存…

DelayedOperation

DelayedOperation包括两种: DelayedFetch和DelayedProduce. 存在DelayedFetch是为了更高效地fetch(batch fetch);
存在DelayedProduce是为了等待更多副本的写入, 以达到用户指定的持久性保证(这样消息更不容易丢失).
对于DelayedOperation而言,什么时候不再需要delay是必须指明的, 根据操作的不同,delay被满足的条件有所不同.但也有共同的比如max wait time.

k_delay

ProducerRequest & DelayedProduce

1
2
3
4
5
case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion, correlationId: Int, clientId: String,
requiredAcks: Short,
ackTimeoutMs: Int,
data: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet])
extends RequestOrResponse(Some(ApiKeys.PRODUCE.id))
  • requiredAcks: 当ack=-1时,要求所有在ISR的replica都已经确认写入了这个produce request的内容,leader才会返回响应给producer
  • ackTimeoutMs: 服务器可以等待的在RequiredAcks的条件下收到指定数量的ack(ISR所有副本的数量)的最长时间.

当一个ProducerRequest由于这两个参数而需要等待时,Kafka就生成一个DelayedProduce对象,来表示’delay’的语义(生产请求被延迟发送响应给Producer)

1
2
3
4
5
6
7
8
case class ProducePartitionStatus(requiredOffset: Long, responseStatus: ProducerResponseStatus)
case class ProduceMetadata(produceRequiredAcks: Short, produceStatus: Map[TopicAndPartition, ProducePartitionStatus])

class DelayedProduce(delayMs: Long,
produceMetadata: ProduceMetadata,
replicaManager: ReplicaManager,
responseCallback: Map[TopicAndPartition, ProducerResponseStatus] => Unit)
extends DelayedOperation(delayMs)
  • ProducePartitionStatus: 针对一个Partition, 当所有replica拉取到至少requiredOffset,才会返回响应给Producer
  • ProduceMetadata: 针对所有Partition, 记录了一些对Producer请求加工后的信息,便于以后判断delay条件时使用
  • produceRequiredAcks: 和ProducerRequest的requiredAcks是一样的, 都表示什么时候返回ack给Producer
  • delayMs: 和ProducerRequest的ackTimeoutMs一样, 表示超时时间

DelayedProduce 在把消息集append到本地后,就可以获取append之后这个消息集的最后一条消息的offset,由此推出来当
follower来请求到至少那一条消息(最后一条消息)时,就说明这个replica已经拉取完了这个message set,这个offset就是requiredOffset

FetchRequest & DelayedFetch

1
2
3
4
5
6
case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion, correlationId: Int = FetchRequest.DefaultCorrelationId, clientId: String = ConsumerConfig.DefaultClientId,
replicaId: Int = Request.OrdinaryConsumerId,
maxWait: Int = FetchRequest.DefaultMaxWait,
minBytes: Int = FetchRequest.DefaultMinBytes,
requestInfo: Map[TopicAndPartition, PartitionFetchInfo])
extends RequestOrResponse(Some(ApiKeys.FETCH.id))
  • maxWait: 如果没有足够的数据, 这个FetchRequest可以最多等待这么长时间, 以期待更多的数据到来
  • minBytes: 在返回Response前最少获取这么多字节的数据。如果client把这个值设成0,那么server将会总是立即返回响应,尽管可能在上一次请求后没有新数据,client也会收到一个空的message set。如果把这个值设成1,那么只要一个partition有至少1byte的数据,或者指定的timeout到了,那么server也会返回响应。通过把这个值设大一些,并且配合timeout的设置,consumer可以只读取一批数据,从而在吞吐量和延迟之间进行调节(比如把MaxWaitTime设成100ms, 并且把MiniBytes设成64k,将使得server在返回响应给client之前,最多等待100ms来收集够64k的数据)

当一个FetchRequest由于这两个参数而需要等待时,Kafka就生成一个DelayedFetch对象,来表示’delay’的语义(拉取请求被延迟返回数据给消费者)

1
2
3
4
5
6
7
8
9
10
case class PartitionFetchInfo(offset: Long, fetchSize: Int)
case class FetchPartitionStatus(startOffsetMetadata: LogOffsetMetadata, fetchInfo: PartitionFetchInfo)
case class FetchMetadata(fetchMinBytes: Int, fetchOnlyLeader: Boolean, fetchOnlyCommitted: Boolean, isFromFollower: Boolean,
fetchPartitionStatus: Map[TopicAndPartition, FetchPartitionStatus])

class DelayedFetch(delayMs: Long,
fetchMetadata: FetchMetadata,
replicaManager: ReplicaManager,
responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit)
extends DelayedOperation(delayMs)
  • PartitionFetchInfo 针对一个partition的,说明了从Partition的哪个offset开始读,以及最多读取多少byte的数据
  • FetchPartitionStatus 针对一个partition的,startOffsetMetadata包含了与LogSegment有关的信息,它可以定位到具体的物理Segment
  • FetchMetadata 包括了每个partition的FetchPartitionStatus(是个Map),以及前边提到的minBytes(fetchMinBytes)等信息。

DelayedFetch包括了FetchMetadata等信息(所有要拉取Partition的状态),其中的delayMs就是前边提到的maxWait.
FetchRequest提供了关于原始的FetchRequest的信息,以及在FetchRequest处理的步骤中补充的一些信息

多个Partition

在上面的定义里,在针对单个Partition的FetchRequest和ProduceRequest里,都可以produce以及fetch到这个partition。
而delay参数,比如ack以及minBytes都是针对整个Request(一个Request是包含多个Partition)的。
所以对于ProduceRequest而言,只有它包括的所有partition的ISR都已经确认,这个Request才会整体不再被delay。
而对于FetchRequest,只要总的bytes超过了minBytes,不管是不是所有Partition的数据都有,它都会不再被delay。

ProduceRequest的这种行为是由于对于它的响应只会有一个,所以需要等所有partition都处理完了,再能给出这个响应。
这个对于异步的Producer也是一样。如果需要允许确认一部分Partition就返回响应,那么server和client端的处理就会麻烦很多。

而FetchRequest的这种行为使得无法保证fetch到的数据中不同partition的比例,可能并不是特别好。
但是大部分情况下,还都是只针对一个Partition发一个FetchRequest,所以不会受到影响。

responseCallback

DelayedOperation的两个实现类的参数都是类似的,都有delayMs,Metatada,ReplicaManager,responseCallback.
不过因为生产和拉取请求类型不同,metadata和response的类型也不同. Metadata中都对应了另外一个触发条件.
比如FetchMetadata的触发条件是fetchMinBytes, ProduceMetadata的触发条件是produceRequiredAcks.
这也是DelayedOperation只有一个类参数delayMs的原因,因为生产和拉取请求都有共同的timeout最长等待时间.

k_req_meta

因为延迟操作最后一定会满足某个条件而触发完成的动作,完成后会调用responseCallback回调函数.

k_delay_resp

Purgatory & Cache

为什么需要缓存?

当生产者设置了同步选项(requiredAcks为-1)时才会启动生产缓存,因为每一批生产的消息,需要等待ISR中所有的follower replicas同步成功.
ISR中所有follower replicas上报自己的messageOffset追上leader partition之前,生产请求会一直保留在生产缓存中,等待直到超时.

拉取请求为什么也需要缓存? 因为kafka在消费消息时默认一次拉取最低消费1条消息.如果消费者拉取的时候没有任何新消息生产,
则拉取请求会保留到拉取缓存中,等待直到超时. 这在一定程度上避免了反复拉取一批空消息占用带宽资源的问题。

Produce缓存

生产者可以选择批量发送消息,所以在server端接到这批消息时(当前server含有leader partition),首先要把消息写入本地文件.
之后会第一时间更新拉取缓存.kafka对缓存的处理策略是,任何操作只要有可能会对缓存施加影响,都第一时间进行更新缓存,
以尽可能的释放处于等待状态的拉取请求.也就是说有可能等待新消息(生产)的拉取请求,在这个时候就会带着新消息返回给消费者了.

之后会根据生产者配置的requiredAcks选项分为三种策略,只有当requiredAcks为-1会把这批消息的生产请求进行缓存.
缓存之后会迅速对生产缓存和拉取缓存进行检查,kafka不放过任何可以释放缓存中请求的机会.

Producer设置acks=-1,则要求所有的ISR都ack给Leader,再由Leader发送ack给Producer.
由于ISR是分布在集群中各个节点的,再加上网络等问题,所以这个过程并不是很快就能完成的.
但是Producer生产消息的速度是很快的,并且通常是大批量的.因此Producer不能立即得到响应.
而且一个ProducerRequest包含的是多个Partition的消息(TopicAndPartition->MessageSet)

一个Request要等所有Partition都ack给Producer,才算这个Request完成了.所以需要在服务端保存Producer请求相关的数据.相当于要维护一个缓存.
因为ISR fetch是异步的,不能保证什么时候完成,消息追加到Leader的Partition后需要等待所有ISR同步完成,等待的过程就使得这个请求是被延迟
ReplicaManager负责创建延迟的DelayedProduce,但是监视这个操作什么时候加入到purgatory,以及什么时候完成,则交给Purgatory统一管理.

k_delay_produce

Fetch缓存

kafka中从分区和普通的消费者几乎一致,所有这些fetch请求都会在handleFetchRequest方法中进行处理.
fetch请求首先会更新上一次拉取结果的从分区的水位线(即follower的最新messageOffset),
因为我们知道什么时候释放生产缓存是根据从分区的水位线是否追上主分区来决定的(都赶上后才释放),
所以fetch请求更新完从分区水位线之后会立刻检查是否可以释放生产请求(每个fetch请求都要检查),
由于生产请求释放之后会相应地影响被缓存的拉取请求,所以也要检查是否能更新拉取请求.

读取本地磁盘,按偏移量读出本次能够拉去的消息.并根据每次拉取的最小消息量配置项决定是否要把拉取请求推入拉取缓存中。
如果拉取请求被推入拉取缓存,他就会等待新的消息生产,直到从缓存中释放,返回消息给消费者或者从分区。

缓存的相互影响主要是生产缓存对拉取缓存的影响, 但是两者是有相互影响的.

  • 生产者写入新消息到Leader, follower会去同步Leader的消息, 客户端的acks=-1时,生产请求加入到生产缓存
  • 当生产缓存被释放(DelayedProduce完成), 说明所有的follower都同步了这批消息, Leader会更新HW

因为follower和消费者都会向Leader拉取消息, 不同的是follower跟踪的是Leader的LEO,消费者跟踪的是HW.

  • 拉取请求一直在等待生产者的新消息, 在等待的过程中由于条件不满足(minBytes不够或未超时),拉取请求加入到拉取缓存
  • 当有新的消息到来, 拉取缓存就有机会释放处于等待中的拉取请求
  • 拉取缓存被释放(DelayedFetch完成), 说明拉取到了足够的消息, 如果是消费者,则直接返回拉取的新消息给消费者
  • 如果是follower fetch了消息,返回ack给Leader, 并导致生产缓存有机会释放等待中的生产请求(满足ISR的条件)

k_delay_cache

DelayedOperation接口

两种Request都有max wait时间,为此需要定期检查被delayed的这些request是否超时了,即需要一个定时任务。

除了时间触发之外,对DelayedOperation的delay条件是否满足的检查还有一些是事件触发的,对于DelayedProduce,
当一个replica发来fetch请求时,leader就获取了replica拉取进度的新信息,
因此就需要检查下DelayedProduce是否可以被满足(可以被满足就是说这个DelayedOperation应该摆脱delay状态)。
同样的,对于DelayedFetch,当有新的producer request里的消息append到leader,
就需要检查下处于delayed状态的DelayedFetch请求是否可以被满足。

因此,DelayedOperation必须有接口可以被调用以检查是否delay条件被满足了。
此外,DelayedOperation应该指明在delay条件满足之后应该怎么做(发送response);以及在expire之后应该怎么做。
因此,它大概需要以下方法:

  • isCompleted 检查是否已经被完成
  • tryComplete 检查delay条件是否已经满足,满足的话就执行一些操作
  • onExpiration 在expire以后怎么做
  • onComplete 在complete之后怎么做

延迟操作是如何完成的?

An operation whose processing needs to be delayed for at most the given delayMs. For example
a delayed produce operation could be waiting for specified number of acks; or
delayed fetch operation could be waiting for a given number of bytes to accumulate.

Kafka implements several request types that cannot immediately be answered with a response. Examples:
A produce request with acks=all cannot be considered complete until all in-sync replicas
have acknowledged the write and we can guarantee it will not be lost if the leader fails.
A fetch request with min.bytes=1 won’t be answered until there is at least one new byte of data for the consumer to consume.
This allows a “long poll” so that the consumer need not busy wait checking for new data to arrive.

一个操作的处理需要被延迟执行,但是最多只能延迟给定的delayMs时间. 比如
一个延迟的消息生产操作可能需要等待指定数量的acks(写到leader后,等待所有的ISR返回ack,leader再ack给客户端)
一个延迟的消息读取操作可能需要等待累积到指定数量的字节(并不是leader增加了一点hw,consumer就立即获取一丁点的数据)

The logic upon completing a delayed operation is defined in onComplete() and will be called exactly once.
Once an operation is completed, isCompleted() will return true. onComplete() can be triggered by either
forceComplete(), which forces calling onComplete() after delayMs if the operation is not yet completed,or
tryComplete(), which first checks if the operation can be completed by now, and if yes calls forceComplete().

完成一个延迟的操作的逻辑定义在onComplete中,这个方法只会被调用一次.
一旦一个操作完成了,isCompleted()返回true. onComplete()被触发的条件:
forceComplete: 如果这个操作在过了delayMs后还没有完成,会强制调用onComplete.
tryComplete: 首先检查这个操作是否可以完成,如果可以完成,就调用forceComplete.

在Kafka的架构中,有多个线程共同处理请求,因此可能会有多个线程同时检查delay条件以及在发现满足条件后尝试执行操作。
但是对于onComplete这样的方法,在DelayedOperation的整个生命周期中,只允许被调用一次(因为这个方法会返回给client响应),因此需要对访问它的方法进行同步。
而DelayedOperation约定onComplete方法只允许被forceComplete方法调用
因此在forceComplete方法中用AtomicBoolean的CAS操作构造了一段只被执行一次的代码。

总的来说,这些DelayedOperation在超时后,就直接调forceComplete,然后调onExpiration.
在由事件触发时,就调用tryComplete,如果在tryComplete中发现delay条件被满足,就调用forceComplete.

k_forceComplete1

Timer & Reaper & Watcher

每个请求被缓存起来的时候就表示这是一个延迟的请求,这个请求会有一个定时器附属在它身上,当达到timeout操作还没完成时,就强制完成操作.
什么时候会被加入到定时任务里: 在tryCompleteElseWatch里尝试完成,如果不能完成,就将这个操作加入到timeout队列中.
由于每个请求到来的时间不同,它们的失效时间也不同,操作是交给统一的Timer管理,但是清理已经完成的操作则交给失效清理线程.

前面我们说过一个操作为了摆脱delay状态,除了delayMs超时后,还有一个是其他事件导致业务条件满足了.
所以失效清理线程不仅仅是清理因为超时被强制完成的操作, 也会清理条件满足后触发完成的操作.

为什么在将操作加入到监视器中,还要再加入到Timer定时器中?
因为监视器只是用来判断delay条件是否满足,而操作从Purgatory中移除的另外一个条件是超时了!
如果仅仅有监视器,而没有timeout,尽管delay能满足(最终完成),但是超过了客户端的等待时间,是不能接受的.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// timeout timer
private[this] val executor = Executors.newFixedThreadPool(1, new ThreadFactory() {
def newThread(runnable: Runnable): Thread = Utils.newThread("executor-"+purgatoryName, runnable, false)
})
private[this] val timeoutTimer = new Timer(executor)

// the number of estimated total operations in the purgatory
private[this] val estimatedTotalOperations = new AtomicInteger(0)

// a list of operation watching keys
private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key)))
private def watchForOperation(key: Any, operation: T) {
inReadLock(removeWatchersLock) {
// 一个key会有多个监视操作, 因为key是TopicAndPartition,而多个Producer请求过来,可能会发送到同一个TopicAndPartition上.
// 这个TopicAndPartition就可以监视多个ProduceRequest Operation了. 同理fetch的key也是TopicAndPartition
val watcher = watchersForKey.getAndMaybePut(key)
// Watcher是这个Key的监视器, 一个监视器会监视多个操作, 所以watch一个operation,是将这个operation加入到key的监视列表中
watcher.watch(operation)
}
}

def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
var watchCreated = false
// 在第一次尝试完成时判断这个操作没有完成, 对每个key,都将Operation操作加入到key的监视列表中
for(key <- watchKeys) {
// 如果发现在某个key的时候,操作被其他线程完成了,直接返回,那么在这之后的keys不需要再监视这个操作了
if (operation.isCompleted()) return false
// 只有操作没有完成, 才将操作注册/加入到每个key的监视列表中
watchForOperation(key, operation)
if (!watchCreated) {
watchCreated = true
estimatedTotalOperations.incrementAndGet()
}
}
//... 经过两轮tryComplete还没有完成,就加入到Timer中,让失效线程负责清理这个请求
if (!operation.isCompleted()) {
timeoutTimer.add(operation)
}
}

后台失效清理线程作为DelayedOperationPurgatory的内部类,因为它要访问watchersForKey中所有的Watchers.
在doWork中首先让Timer每隔200ms做一次tick,在做tick的过程中,TimeWheel的taskCounter即delayed返回值会发生变化.
因为随着时间的流逝,要么有些操作会因为超时被完成,要么业务条件满足而触发完成,都会使得TimerWheel中的任务数量变少,
这样条件判断就有机会执行(即使totalOperations没变化,delayed减少,就有可能大于interval),清理线程就可以开始工作了.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// background thread expiring operations that have timed out 后台线程: 把超时的操作失效
private val expirationReaper = new ExpiredOperationReaper()
expirationReaper.start()

// Return the number of delayed operations in the expiry queue 在失效队列中被延迟的操作请求数量
def delayed() = timeoutTimer.size
// Return all the current watcher lists, note that the returned watchers may be removed from the list by other threads
private def allWatchers = inReadLock(removeWatchersLock) { watchersForKey.values }

// A background reaper to expire delayed operations that have timed out
private class ExpiredOperationReaper extends ShutdownableThread("ExpirationReaper-%d".format(brokerId), false) {
override def doWork() {
// 虽然run方法一直运行,但是Timer每隔200ms才走一次tick.
timeoutTimer.advanceClock(200L)
// Trigger a purge if the number of completed but still being watched operations is larger than the purge threshold.
// That number is computed by the difference btw the estimated total number of operations and the number of pending delayed operations.
// 已经完成的操作,但是仍然保存在监控列表中, 这些操作需要被清理掉.
if (estimatedTotalOperations.get - delayed > purgeInterval) {
// now set estimatedTotalOperations to delayed (the number of pending operations) since we are going to clean up watchers.
// Note that, if more operations are completed during the clean up, we may end up with a little overestimated total number of operations.
estimatedTotalOperations.getAndSet(delayed)
allWatchers.map(_.purgeCompleted())
}
}
// 这是父类ShutdownableThread的方法, 只要线程没挂,就一直运行.
override def run(): Unit = {
while(isRunning.get()){
doWork()
}
}
}

allWatchers是Purgatory中所有key监视的所有Watchers.每个Watchers都有唯一的key,每个key都有一个链表类型的operations.
如果某个key下面没有一个operation,说明这个key的所有请求都已经完成,就可以在Purgatory外部将key从watchersForKey中移除

对于Watchers来说,有两件事要做

  • tryCompleteWatched: 当它关注的key有事件发生时,需要调用它的方法来遍历operations,找出其中可以被complete(即不再被delay)的operation
  • purgeCompleted: 由于一个DelayedOperation可以对应多个key,所以当这个Watchers对应的key没有被触发,它保存的operations里的元素仍然可能由于其它的key触发而而被complete。所以外界需要能主动地检测这个Watchers里的哪些operation已经被complete了,并且移除这些元素

做这两件事情的时机很重要:

  • 第一件事,在产生事件的地方进行检测就好。比如fetch线程处理fetch请求的过程,以及produce request的处理过程中,可以调用tryCompleteWatched。
  • 第二件事的处理时机比较不好确定。因为当把一个request从某个key的watchers中移除以后,它可能还在另一个key的watchers里。而每次移除一个request,都要调用purgeCompleted显然不现实。但是0.9.0的实现中引用了新的数据结构来对request的超时进行检测,通过它可以准确获得某个时刻在purgatory中的请求数量(但并不是server中的DelayedOperation的数量,因为超时的DelayedOperation会被放入一个线程池执行它的回调,所以总的数量还需要加上线程池中的Operation数量, 而且这个线程池是一个FixedThreadPool,它使用一个无界的queue)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// A linked list of watched delayed operations based on some key
private class Watchers(val key: Any) {
private[this] val operations = new LinkedList[T]()

// add the element to watch
def watch(t: T) {
operations synchronized operations.add(t)
}

// traverse the list and purge elements that are already completed by others
def purgeCompleted(): Int = {
var purged = 0
operations synchronized {
val iter = operations.iterator()
while (iter.hasNext) {
val curr = iter.next()
if (curr.isCompleted) { // 操作已经完成
iter.remove() // 从监视链表中删除, 注意需要对operations加锁
purged += 1 // 计数器
}
}
}
if (operations.size == 0) removeKeyIfEmpty(key, this)
purged
}
}

// 这是在Purgatory中移除watchersForKey中监视列表为空的key
private def removeKeyIfEmpty(key: Any, watchers: Watchers) {
inWriteLock(removeWatchersLock) {
// if the current key is no longer correlated to the watchers to remove, skip
if (watchersForKey.get(key) != watchers) return

if (watchers != null && watchers.watched == 0) {
watchersForKey.remove(key)
}
}
}

DelayedOperationPurgatory.tryCompleteElseWatch

上面基本分析完了DelayedOperationPurgatory的两个内部类Watcher,Reaper.现在重点看下tryCompleteElseWatch

Check if the operation can be completed, if not, watch it based on the given watch keys.
Note that a delayed operation can be watched on multiple keys.
It is possible that an operation is completed after it has been added to the watch list for some,but not all of the keys.
In this case, the operation is considered completed and won’t be added to the watch list of the remaining keys.
The expiration reaper thread will remove this operation from any watcher list in which the operation exists.

检查操作是否可以完成,如果不能,则根据给定的watchKeys加入到watch中(即监视这个watch).
一个延迟的操作会被多个keys监视(因为一个ProduceRequest有多个TopicAndPartition,key为TAP,value都是这个Request).

  • 一个ProduceRequest有多个TopicAndPartition,其中TopicAndPartition会作为Key.
  • 多次不同的ProduceRequest请求,可能会落在同一个TopicAndPartition里.
  • 所以DelayedOperationPurgatory存储Key是TopicAndPartition,Value是ProduceRequest监视列表

k_purgatory-keys

有可能在将这个操作加入到某些key中(并不是所有的keys),这个操作就已经完成了(分成了两部分keys,前面的key加入的时候还没完成,后面的算完成了).
这种情况下,剩下的keys不需要将操作加入到监视列表中,因为这个时候操作已经完成了. 失效清理线程会从所有的keys中移除已经完成的这个操作.

1
2
3
4
5
6
 add   add   add   add     no need to add watch from here 
key1 key2 key3 key4 key5 key6 ...
|-------------------------|------------------------>
⬆️
这个时候Operation完成了!
|<---remaining keys

k_purgatory_key2

The cost of tryComplete() is typically proportional(均衡,成比例的) to the number of keys.
Calling tryComplete() for each key is going to be expensive if there are many keys. Instead,we do the check in the following way.
Call tryComplete(). If the operation is not completed, we just add the operation to all keys. Then we call tryComplete() again.
At this time, if the operation is still not completed, we are guaranteed that it won’t miss any future triggering event
since the operation is already on the watcher list for all keys.
This does mean that if the operation is completed (by another thread) between the two tryComplete() calls,
the operation is unnecessarily added for watch.However,this is a less severe issue since the expire reaper will clean it up periodically.

调用tryComplete的代价和keys的数量是成比例的.如果有很多keys,为每个key都调用一次tryComplete是很昂贵的操作.所以采用下面的检查方式:
调用一次tryComplete(Operation级别),如果操作还没完成,把这个操作添加到所有的keys中.再调用一次tryComplete.这个时候如果操作仍然没有完成,
我们能确保这个操作不会丢失即将触发的事件(什么事件?),因为这个操作已经在所有的keys的监听列表中.
同时意味着,如果这个操作在两次tryComplete调用中被另外一个线程完成了,它就不会被加入到监听列表中.
但是这并不是一个很严重的问题,因为(即使加入了)它会被reaper线程定时地清理掉.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class DelayedOperationPurgatory[T <: DelayedOperation](){
def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
// 第一次调用Operatiron的tryComplete,即第一次尝试完成这个操作
var isCompletedByMe = operation synchronized operation.tryComplete()
if (isCompletedByMe) return true // 操作完成了,不需要加入到缓存中

var watchCreated = false // 这个操作是否被监视过
for(key <- watchKeys) { // 所有的key都监视这个相同的操作
if (operation.isCompleted()) return false // If the operation is already completed, stop adding it to the rest of the watcher list.
watchForOperation(key, operation) // 将Operation加入到key的监视列表中
if (!watchCreated) { // 有可能其他线程先执行,导致watchCreated=true
watchCreated = true // 因为这是在所有的key中设置,一旦为true,其他key就没有机会再执行了
estimatedTotalOperations.incrementAndGet() // 所以一个操作,即使有多个keys,这里也只会增加一次
}
}

// 第二次尝试完成这个操作
isCompletedByMe = operation synchronized operation.tryComplete()
if (isCompletedByMe) return true // 只要能完成,就立即结束

// if it cannot be completed by now and hence is watched, add to the expire queue also
if (! operation.isCompleted()) {
timeoutTimer.add(operation) // 现在还没有完成,加入到失效队列中
if (operation.isCompleted()) operation.cancel() // cancel the timer task
}
false
}
}

k_tryCompleteElseWatch

DelayedProduce.tryComplete

在DelayedOperationPurgatory中使用的是父类DelayedOperation,子类要实现自己的tryComplete检查操作是否完成.

注意DelayedProduce对象在创建的时候需要有ProduceMetadata:包括了acks和Produce关于所有Partition的状态.
ProducePartitionStatus有个很重要的字段acksPending表示acks是否还没有完成/正在进行中(出错或者完成了值为true).

注意:Producer发送请求到Kafka,只要这过程中出现任何错误,都会立即返回错误给客户端,而不需要等待其他任何条件.
这里的ProduceMetadata是针对一个请求的所有Partition.要确保检查所有Partition的acks是否都完成了(如果都没出错的话)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class DelayedProduce(delayMs: Long,
produceMetadata: ProduceMetadata,
replicaManager: ReplicaManager,
responseCallback: Map[TopicAndPartition, ProducerResponseStatus] => Unit)
extends DelayedOperation(delayMs) {

// first update the acks pending variable according to the error code
produceMetadata.produceStatus.foreach { case (topicAndPartition, status) =>
if (status.responseStatus.error == ErrorMapping.NoError) {
// Timeout error state will be cleared when required acks are received
status.acksPending = true
status.responseStatus.error = ErrorMapping.RequestTimedOutCode
} else {
status.acksPending = false
}
}

/**
* The delayed produce operation can be completed if every partition it produces to is satisfied by one of the following:
*
* Case A: This broker is no longer the leader: set an error in response
* Case B: This broker is the leader:
* B.1 - If there was a local error thrown while checking if at least requiredAcks replicas have caught up to this operation: set an error in response
* B.2 - Otherwise, set the response with no error.
*/
override def tryComplete(): Boolean = {
// check for each partition if it still has pending acks 检查请求中的每个Partition,是否有正在进行的acks.
produceMetadata.produceStatus.foreach { case (topicAndPartition, status) =>
// skip those partitions that have already been satisfied 跳过已经满足条件的Partitions(即acksPending=false的,可能是出错了)
// 如果有正在进行中的acks, 表示这个Partition还没有成功acks. 则检查这个Partition是否满足条件.
if (status.acksPending) {
val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
// 检查Partition是否有足够的副本, 以及在这个过程中是否有错误发生
val (hasEnough, errorCode) = partitionOpt match {
case Some(partition) => partition.checkEnoughReplicasReachOffset(status.requiredOffset)
case None => (false, ErrorMapping.UnknownTopicOrPartitionCode) // Case A
}
if (errorCode != ErrorMapping.NoError) { // Case B.1 ==> Local Error
status.acksPending = false
status.responseStatus.error = errorCode
} else if (hasEnough) { // Case B.2 ==> 没有错误,并且有足够的副本,也可以结束!
status.acksPending = false
status.responseStatus.error = ErrorMapping.NoError
}
}
}

// check if each partition has satisfied at lease one of case A and case B
// 只有所有/每个Partition的acksPending都等于false, 就可以执行forceComplete. 比如出现了错误,或者操作已经完成了!
if (!produceMetadata.produceStatus.values.exists(p => p.acksPending))
forceComplete()
else false
}
}

在前面DelayedOperation中我们说过forceComplete的两次调用时机就包括这里:在tryComplete时如果操作完成就调用.

复习一下forceComplete的另一次调用是在超时的时候. 我们还没看到这里,不过可以提前透露下是在Timer相关类中.

操作完成指的是这个请求的所有ProducerPartitionStatus的acksPending都等于false.只要有一个为true都不行.
换成下面的形式就比较容易理解了,只要存在一个Partition的acksPending=true,那么tryComplete就表示没有完成.

1
2
if (produceMetadata.produceStatus.values.exists(p => p.acksPending)) false
else forceComplete() // 只有不存在acksPending=true,才会执行forceComplete.

最后, onComplete会在forceComplete中,由唯一的线程只调用一次:将completed从false更新到true状态的那个线程.
由于Producer的请求必须要返回响应给Producer,不管是因为超时还是出错,还是成功完成,将响应状态传给回调函数执行.

1
2
3
4
5
// Upon completion, return the current response status along with the error code per partition
override def onComplete() {
val responseStatus = produceMetadata.produceStatus.mapValues(status => status.responseStatus)
responseCallback(responseStatus)
}

Partition.checkEnoughReplicasReachOffset

以生产请求DelayedProduce.tryComplete为例,它的完成必须等待所有ISR完成复制(或者到达客户端的超时时间).
DelayedProduce每调用一次tryComplete,就要查下是否有足够副本超过本批消息的最后一条offset+1(requiredOffset)
因为这批消息追加到Leader之后,ISR中的副本的offset也要达到requiredOffset才表示它们完全复制了这批消息.

如果说HW没有更新的话(ISR都没有ack导致的),leaderReplica的HW肯定会比requiredOffset小.
因为requiredOffset表示的是这批消息最后一条消息的offset,它一定是比Leader现有的HW要大.
如果ISR都ack之后,才会更新Leader的HW,这时候才有可能将HW设置为requiredOffset一样的大小.
注意是有可能,因为有时候可能只是增加了一点点,在HW < requiredOffset的情况下,都是没有完全复制的.

k_offset_check

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Note that this method will only be called if requiredAcks = -1 只有ack=-1时才需要判断,因为其他值并不需要ISR发送ack给Leader
// and we are waiting for all replicas in ISR to be fully caught up to the (local) leader's offset 完全赶上Leader的offset
// corresponding to this produce request before we acknowledge the produce request. 这个offset是Leader在这次ProduceRequest中最后一条消息的offset
def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Short) = {
leaderReplicaIfLocal() match { // 还是由Leader Partition所在的节点做这部分工作
case Some(leaderReplica) =>
val curInSyncReplicas = inSyncReplicas // keep the current immutable replica list reference
val numAcks = curInSyncReplicas.count(r => { // count计算返回值为true的个数,即副本的offset超过要求的offset.
if (!r.isLocal) // ISR节点中不是本地的, 即follower replicas
if (r.logEndOffset.messageOffset >= requiredOffset) true else false //副本的offset等于required可以理解,大于是什么意思?
else true // also count the local (leader) replica. Leader自己也在ISR中的
})
//----------------------------------------------上面计算出来的numAcks对返回结果没有影响.
val minIsr = leaderReplica.log.get.config.minInSyncReplicas
// 只有Leader的HW >= requiredOffset的时候,说明这个ProduceRequest请求的这批消息都已经被ISR中的所有replica完全复制了.
// 因为Leader的HW是针对整个Partition的, 如果发生了两次生产请求,而requiredOffset还是第一次的,在第二次请求都被完全同步后,HW就比第一次的requiredOffset要大了.
if (leaderReplica.highWatermark.messageOffset >= requiredOffset ) {
// The topic may be configured not to accept messages if there are not enough replicas in ISR. 有些topic可能设置ISR的副本数量最少有minIsr个.
// in this scenario the request was already appended locally and then added to the purgatory before the ISR was shrunk
if (minIsr <= curInSyncReplicas.size) {
(true, ErrorMapping.NoError)
} else {
(true, ErrorMapping.NotEnoughReplicasAfterAppendCode)
}
} else (false, ErrorMapping.NoError) // 如果ISR都没有ack,不会更新Leader的HW,就会导致HW<requiredOffset
}
}

超时触发forceComplete

DelayedOperation是一个TimerTask定时任务,run方法调用forceComplete,如果操作还没有完成即completed=false,
就会执行cancel和onComplete方法. 返回true后, 继续执行onExpiration(),即onComplete和onExpiration都执行了?
这样好像有点怪.因为一般onComplete和onExpiration只能二选一(正常完成或超时).不过超时而被强制完成也是一种完成吧.

k_forceComplete2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
abstract class DelayedOperation(delayMs: Long) extends TimerTask with Logging {
private val completed = new AtomicBoolean(false)

def forceComplete(): Boolean = {
if (completed.compareAndSet(false, true)) {
cancel() // cancel the timeout timer
onComplete()
true
} else {
false
}
}

// run() method defines a task that is executed on timeout
override def run(): Unit = {
if (forceComplete()) onExpiration()
}

不过我们更关心的是什么时候执行这个定时任务,如果一生成DelayedOperation就调用forceComplete,那还要Purgatory毛用啊.
以DelayedOperationPurgatory.tryCompleteElseWatch中将DelayedOperation加入到定时器Timer为线索.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class Timer(taskExecutor: ExecutorService, tickMs: Long = 1, wheelSize: Int = 20, startMs: Long = System.currentTimeMillis) {
private[this] val delayQueue = new DelayQueue[TimerTaskList]()
private[this] val taskCounter = new AtomicInteger(0)
private[this] val timingWheel = new TimingWheel(tickMs = tickMs, wheelSize = wheelSize, startMs = startMs, taskCounter = taskCounter, delayQueue)

def add(timerTask: TimerTask): Unit = {
addTimerTaskEntry(new TimerTaskEntry(timerTask))
}

private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
if (!timingWheel.add(timerTaskEntry)) {
// Already expired or cancelled
if (!timerTaskEntry.cancelled)
taskExecutor.submit(timerTaskEntry.timerTask)
}
}
}

假设TimerTask封装的TimerTaskEntry没有成功加入到TimingWheel中,说明这个操作对应的任务本身已经失效或者被取消了.
于是将这个任务提交给taskExecutor执行,然后就触发了TimerTask即DelayedOperation的run方法,接着是forceComplete.
这就是forceComplete的另一处调用的地方,即超时的时候调用.现在我们已经分析了forceComplete的两处触发时机了.Oh,Yeah!

TimingWheel时间轮

TODO

Ref


文章目录
  1. 1. DelayedOperation
    1. 1.1. ProducerRequest & DelayedProduce
    2. 1.2. FetchRequest & DelayedFetch
    3. 1.3. 多个Partition
    4. 1.4. responseCallback
    5. 1.5. Purgatory & Cache
  2. 2. DelayedOperation接口
  3. 3. Timer & Reaper & Watcher
  4. 4. DelayedOperationPurgatory.tryCompleteElseWatch
    1. 4.1. DelayedProduce.tryComplete
    2. 4.2. Partition.checkEnoughReplicasReachOffset
  5. 5. 超时触发forceComplete
  6. 6. TimingWheel时间轮
  7. 7. Ref