译:Kafka消费者的Offset管理

https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management

介绍

实现内置的消费者offset管理功能有两种方式:

1.让消费者创建一个内置的producer示例,并且像生产消息一样发送offsets
2.让消费者使用offset commit请求API来提交offsets. 我们让brokers记录offset消息到磁盘上,
并且使用消息的复制特征,达到持久性和可用性. 所以实际上在broker端的offset提交处理和生产者请求
的处理是一样的逻辑.在broker端重构代码可以使我们重用已有的代码.

Offset Management

Offsets topic

消费者的offset提交请求会作为生产请求发送到”__offsets”的topic. 一个offset提交消息包括下面字段:

fields content
Key Consumer Group, topic, partition
Payload Offset, metadata, timestamp

offset提交消息会根据消费组的key(消费组名称)进行分区. 对于一个给定的消费组,它的所有消息都会发送到唯一的broker,
这对offset的抓取请求会更加容易,因为不需要以分散-收集的方式对多个brokers发送请求并收集结果(只针对一个broker).

k_offsets_topics

这种方式的缺点是如果任何一个消费组负责消费很多数量的partitions,会多某些brokers造成性能瓶颈,会增加需要
处理的offset提交请求. 不过offsets topic的消息量并不会很大,所以这应该不会成为一个问题.

Offset managers

Offset manager负责存储,抓取,和维护消费者的offsets. 每个broker都有一个offset manager实例. 有两种具体的实现:

  • ZookeeperOffsetManager: 调用zookeeper来存储和接收offset
  • DefaultOffsetManager: 提供消费者offsets内置的offset管理

在config/server.properties中新增了”offset.storage”属性,用来在两种实现中选择一种. 对于DefaultOffsetManager
还有两个其他属性: “offsets.topic.replication.factor”和”offsets.topic.num.partitions”,默认值都是1.
这两个属性会用来自动地创建”offsets topic”即”__offsets” topic(这种方式将offset作为和普通消息一样的topic).
理想情况下,”offsets topic”的副本因子(比如3或4个副本)应该比其他正常的kafka topic要高,目的是获得更高的可用性.

offsets是怎么保存到DefaultOffsetManager?

除了将offset作为logs保存到磁盘上,DefaultOffsetManager维护了一张能快速服务于offset抓取请求的consumer offsets表.
broker在offsets表(缓存)中包含仅仅是”offsets topic”的partitions中属于leader partition对应的条目(存储的是offset).
(即这些partitions的replicas只会在他们自己的logs中有offset信息,并不会对应offset表中的条目).

offsets topic的logs的segment大小相比正常的topics设置的很低,默认”offsets.topic.segment.bytes”=10MB.
这样LogCleaner会更快地选出未清理的segments,结果就是能够更频繁地清理. 同样log的大小设置的越小也有助于
减少读取logs的时间,在把读取出来的logs信息放到offsets缓存中也更快(比如重启kafka时会重新加载日志文件).

Offsets manager interface

下面是offset manager接口的概要.

Method DefaultOffsetManager ZookeeperOffsetManager
startup 初始化内部变量(zkClient和log manager) 初始化zkClient对象
getOffset 从offset table中读取 从zookeeper中读取
putOffset 添加offset到offset table 写入zookeeper
triggerLoadOffsets 当leader变化或者已经存在的broker成为某些partition的leader,需要从磁盘上为指定的partition加载logs,并形成offsets table nothing
cleanup leader变化或者某些partition的leader变成了follower,需要清理offsets table
shutdown 关闭scheduler,上面的triggerLoadOffsets是个异步线程 nothing

Offset Commit实现

消费端

一条offset提交消息会作为生产请求.当消费者启动时,会为”offsets topic”创建一个消费者. 下面是内置的生产者的一些属性:

|property|value|explain|
|producer.type|sync|可以使用异步.但是使用同步可以避免延迟的生产请求(因为是批量消息),并且我们需要立即知道offset消息是否被broker成功接收
|request.required.acks|-1|确保所有的replicas和leader是同步的,并且能看到所有的offset消息
|key.serializer.class|StringEncoder|key和payload都是strings

注意我们没有对提交的offset消息进行压缩,因为每条消息本身大小是很小的,如果压缩了反而适得其反.
目前key和offset的值通过纯文本方式传递. 我们可以转换为更加紧凑的二进制协议,而不是把
Long类型的offset和Int类型的partition作为字符串. 当然在不断演进时还要考虑版本和格式协议.

broker端

broker把接收到的offset提交信息当做一个正常的生产请求,对offset请求的处理和正常的生产者请求处理方式是一样的.
一旦将数据追加到leader的本地日志中,并且所有的replicas都赶上leader.leader检查生产请求是”offsets topic”,
(因为broker端的处理逻辑针对offset请求和普通生产请求是一样的,如果是offset请求,还需要有不同的处理分支)
它就会要求offset manager添加这个offset(对于延迟的生产请求,更新操作会在延迟的生产请求被完成的时候).
因为设置了acks=-1,只有当这些offsets成功地复制到ISR中的所有brokers,才会被提交给offset manager.

k_offset_commit_broker

offsets topic会在第一次的offset提交请求时被自动创建.即一个集群中的第一个消费者会触发创建过程.

Offset Fetch实现

消费端

消费者启动时,会首先创建到任意一个存活的brokers的通道.因此消费者会发送它所有”OffsetFetchRequest”
到这个随机选中的broker. 如果出现错误,这个通道就会被关闭,并重新创建一个随机的通道.

broker端

一个Offset抓取请求包含了多个topic-partitions. 接收请求的broker可能有也可能没有请求的partitions的offset信息.
因此接收请求的brokers也会和其他broker通信. 一个通道连接池会用来转发请求给partition的leader broker.
下面是一个broker在接收到一个offset抓取请求后的步骤:

  • 接收请求的broker首先决定”offset topic”的哪个partition负责这个请求
  • 从broker的leader cache中找出对应partition的leader(会在controller的每次metadata更新请求中更新缓存)
  • 如果接收请求的broker就是leader,它会从自己的offset manager中读取出offset,并添加到响应中
    如果offset不存在,返回UnknownTopicOrPartitionCode
    如果broker正在加载offsets table,返回OffsetLoadingCode.消费者受到这个状态码会在之后重试
  • 如果接收请求的broker不是指定topic-partition的leader,它会将OffsetFetchRequest转发给这个partition的当前leader
  • 如果”offsets topic”这个时候不存在,它会尝试自动创建,在创建成果后,会返回offset=-1

k_offset_fetch_broker

Broker failover

当broker启动时,它会按照正常流程执行. 但是当对”offsets topic”的某些partition而言,broker状态发生改变,
即被当做partition的leader或者follower时,LeaderAndIsrRequest请求会被触发:

  • 如果broker是”offsets topic”中一些partitions的leader, broker会读取指定partition的logs文件,
    并将offsets加载到offset table缓存中.
    1) 任何对这些partition的提交请求仍然会更新offsets表.我们会防止日志文件中过期的offsets覆盖最近的提交请求的offsets.
    2) 被”offsets topic”中partition管理的offset抓取请求的keys直到加载成功之前是不会被使用的.
    broker会返回OffsetLoadingCode的OffsetFetchResponse给消费者客户端.
  • 如果broker是follower: 和其他正常的kafka topic一样,follower会从leader中抓取数据.
    由于follower的offset manager不再负责partitions,它们会在cleanup方法被调用时清理数据.

Offset清理

Offset移除协议

带有null的payload的OffsetCommitRequest表示指定的key(group-topic-partition)的offset应该被删除.
假设一个broker拥有offsets topic的partition X,收到了null payloads的这样一个请求.
正常的append-to-logs流程会继续运行,并且followers也会同步leader的数据. 当offset manager提交offset时,
它检查到payload是”null”,就会删除zk和offsets表中有这个key的条目.
如果一个消费组的所有条目都被删除了,就会删除zk中当前消费组对应的znode(/consumers/[group_id]).
这只是覆盖了zk和offset表的清理. 不过数据仍然存在于offsets topic的logs中.
log cleaner线程运行在”dedupe”模式时也会对offsets topic的logs使用null payload清理数据.

手动清理

用户可以通过自定义一个要删除的offset的消费组. 该工具首先和zk进行联系得到如下信息:

  • 请求的消费组是否存在,如果不存在,则停止(消费组必须存在)
  • 消费组有没有存活的消费者, 如果有,也停止(消费组必须没有存活的消费者)
  • 从zk中得到这个消费组下所有的消费topics(/consumers/[group]/owners)
  • 得到每个topics的partition编号

在得到上面这些信息后发送一个OffsetCommitRequest,其中key是所有的group-topic-partition,
value是null payloads. 移除offset的过程和上面的移除协议是一样的.
根据时间的移除策略也是可行的,只要在OffsetFetchResponse中返回每个offset的时间撮.
(offset抓取之后,就开始重置计时器,如果在指定时间内这个partition没有再被fetch,就可以被移除了)

自动清理

在OffsetManager中有一个单独的线程周期性地运行着自动清理offset. 该线程会读取当前broker中对应了
offsets topic所有partitions的logs,对每个key(group-topic-partition),会保存logs中
最近发现的条目的时间撮. 然后(清理线程)会向自己(broker)发送一个OffsetCommitRequest.
这个请求的条件应该满足:(当前时间撮-最近offset条目的时间撮)>OFFSET_RETENTION_PERIOD(保留间隔)

当所有的brokers记录了指定的group-topic-partition的payload=null,毫无疑问可以放心地删除.
即使broker在log cleaner在完成清理之前当掉了不过又马上恢复过来. 在加载阶段就会防止删除的offsets
被加入到offset表. 有一点要注意,如果broker记录了要提交的消息为null payload到磁盘上,但是在
清理zk之前失败了, 会导致zk中存在过期的条目. 在ZookeeperOffsetManager的zk加载阶段需要处理这种情况.


文章目录
  1. 1. 介绍
  2. 2. Offset Management
    1. 2.1. Offsets topic
    2. 2.2. Offset managers
    3. 2.3. Offsets manager interface
  3. 3. Offset Commit实现
    1. 3.1. 消费端
    2. 3.2. broker端
  4. 4. Offset Fetch实现
    1. 4.1. 消费端
    2. 4.2. broker端
  5. 5. Broker failover
  6. 6. Offset清理
    1. 6.1. Offset移除协议
    2. 6.2. 手动清理
    3. 6.3. 自动清理