译:Kafka权威指南-第四章-消费者

Kafka权威指南中文翻译:https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html

Applications that need to read data from Kafka use KafkaConsumer to subscribe to Kafka topics and receive messages from these topics. Reading data from Kafka is a bit different than reading data from other messaging systems and there are few unique concepts and ideas involved. It is difficult to understand how to use the consumer API without understanding these concepts first. So we’ll start by explaining some of the important concepts, and then we’ll go through some examples that show the different ways the consumer APIs can be used to implement applications with different requirements.

应用程序为了从Kafka中读取数据,会使用KafkaConsumer订阅Kafka的主题,然后就可以从这些主题中接收到消息。从Kafka读取数据和从其他消息系统读取数据有些不同,有一些独特的概念需要事先弄清楚,否则就会对如何使用消费者API不知所措。下面我们会先解释一些重要的概念,然后通过示例展示消费者API的不同用法,从而实现不同的需求。

概念

消费者和消费组

Suppose you have an application that needs to read messages from a Kafka topic, run some validations against them and write the results to another data store. In this case your application will create a consumer object, subscribe to the appropriate topic and start receiving messages, validating them and writing the results. This can work well for a while, but what if the rate at which producers write messages to the topic exceed the rate at which your application can validate them? If you are limited to a single consumer reading and processing the data, your application may fall farther and farther behind, unable to keep up with the rate of incoming messages. Obviously there is a need to scale consumption from topics. Just like multiple producers can write to the same topic, we need to allow multiple consumers to read from the same topic, splitting the data between them.

Kafka consumers are typically part of a consumer group. When multiple consumers are subscribed to a topic and belong to the same consumer group, then each consumer in the group will receive messages from a different subset of the partitions in the topic.

假设你的应用程序要从Kafka的一个主题中读取消息,对消息进行验证,然后将结果写入到其他存储系统中。你的做法会是:创建一个消费者对象,订阅指定的主题,然后开始接收消息、数据验证、结果输出。这种做法在一段时间内可能工作的很好,但是如果生产者写入消息的速度超过应用程序执行验证逻辑的速度怎么办?如果你只有一个消费者负责读取和处理数据,消费者的读取进度最终会越来越跟不上生产者的写入进度,很显然我们需要对主题的消费进行扩展。就像多个生产者可以写到同一个主题一样,我们应该允许多个消费者同时从一个主题读取数据:通过将数据进行分离,每个消费者只负责一部分数据,达到负载均衡的目的。
Kafka的消费者通常都属于某一个消费组的一部分,当多个消费者订阅了一个主题并且属于同一个消费组,那么消费组中的每个消费者都会接收到主题的不同子集分区(一个主题分成多个分区,每个消费者分配到了不同的分区)。

假设主题t1有4个分区,刚开始我们创建了一个消费者c1,并且它是消费组g1的唯一成员,c1订阅了主题t1。消费者c1会获取到t1所有4个分区的消息。

如果添加了新的消费者c2到消费组g1,现在每个消费者(c1、c2)只会各自得到两个分区的消息,比如分区0和分区2的消息会到c1,分区1和分区3的消息会到c2。

如果消费组g1有4个消费者,每个消费者都会读取一个分区的消息。

如果再添加更多的消费者,消费者的数量比分区数量还要多,那么有一些消费者就会空闲而得不到任何消息。

线性扩展消费Kafka主题的数据的主要解决方式是为消费组增加更多的消费者。Kafka消费者通常会做一些延迟较高的操作,比如写入数据库或者HDFS、做一些耗时的数据计算。这种情况下,单一的消费者无法跟上数据流入Kafka主题的速度,所以增加更多的消费者一起共享负载,并且每个消费者只拥有分区和消息的子集,才是扩展消费能力解决之道。因此为主题创建很多分区是一个好的设计,它允许在负载增加的时候可以随时增加更多的消费者(来均衡负载)。不过注意,消费者的数量不能超过主题的分区数,否则有些消费者永远处于空闲状态。

除了通过添加消费者来扩展单一应用程序的处理能力,多个应用程序从同一个主题读取数据的需求也很常见。实际上Kafka的一个主要设计目标,就是让写入Kafka主题的数据可以被多个应用场景使用。这种情况下我们希望每个应用程序都能得到全部消息,而不是其中的一部分。为了确保一个应用程序得到主题的所有消息,要为每个应用程序使用单独的消费组。和其他很多消息系统不同的是,Kafka可以在不牺牲性能的前提下,扩展出大量的消费者和消费组。

上面的示例中,如果添加了只有一个消费者的新消费组g2,这个消费者就会得到主题t1的所有消息,而它和消费组g1在做什么事情毫无关系。就像消费组g1一样,消费组g2也可以有多个消费者,每个消费者也可以获得所有分区的子集,从整体上来说消费组g2仍然会得到所有的消息,而不会受其他消费组的影响。

总结下上面的操作过程,你为每个需要读取一个或多个主题所有消息的应用程序都创建了新的消费组,然后为已有的消费组添加消费者来动态地扩展从主题中读取和处理消息的能力,每个新增加的消费者都只会得到消息的子集。

消费组平衡

As we’ve seen in the previous section, consumers in a consumer group share ownership of the partitions in the topics they subscribe to. When we add a new consumer to the group it starts consuming messages from partitions which were previously consumed by another consumer. The same thing happens when a consumer shuts down or crashes, it leaves the group, and the partitions it used to consume will be consumed by one of the remaining consumers. Reassignment of partitions to consumers also happen when the topics the consumer group is consuming are modified, for example if an administrator adds new partitions.

上面我们看到了消费组中的消费者共享了它们所订阅的主题的分区。当为消费组添加新消费者时,它会从之前其他消费者消费的分区开始消费消息。同样当发生消费者关闭、进程挂掉、离开消费组,它所使用的分区就会被其他剩余的消费者所消费。为消费者重新分配分区同样也会发生在消费组订阅的主题被修改时,比如管理员会添加一个新的分区。

The event in which partition ownership is moved from one consumer to another is called a rebalance. Rebalances are important since they provide the consumer group with both high-availability and scalability (allowing us to easily and safely add and remove consumers), but in the normal course of events they are fairly undesirable. During a rebalance, consumers can't consume messages, so a rebalance is in effect a short window of unavailability on the entire consumer group. In addition, when partitions are moved from one consumer to another the consumer loses its current state; if it was caching any data, it will need to refresh its caches - slowing down our application until the consumer sets up its state again. Throughout this chapter we will discuss how to safely handle rebalances and how to avoid unnecessary rebalances.

分区的所有权从一个消费者转移给另一个消费者,这个事件叫做平衡(rebalance)。平衡很重要,因为它为消费组提供了高可用性和可扩展性(使我们可以简单、安全地添加或删除消费者),但在正常的运行过程中,平衡本身并不是我们希望频繁发生的事情。在平衡期间,消费者不能消费消息,所以平衡实际上造成了整个消费组一个短暂的不可用窗口。另外,当分区从一个消费者转移到另一个消费者时,消费者会丢失当前的状态,如果它缓存了数据的话,就需要重新刷新缓存,这会使得我们的应用程序变慢,直到消费者重新建立起自己的状态。本章我们会讨论如何安全地处理平衡操作,以及怎么避免不必要的平衡。

The way consumers maintain their membership in a consumer group and their ownership on the partitions assigned to them is by sending heartbeats to a Kafka broker designated as the Group Coordinator (note that this broker can be different for different consumer groups). As long the consumer is sending heartbeats in regular intervals, it is assumed to be alive, well and processing messages from its partitions. In fact, the act of polling for messages is what causes the consumer to send those heartbeats. If the consumer stops sending heartbeats for long enough, its session will time out and the group coordinator will consider it dead and trigger a rebalance. Note that if a consumer crashed and stopped processing messages, it will take the group coordinator few seconds without heartbeats to decide it is dead and trigger the rebalance. During those seconds, no messages will be processed from the partitions owned by the dead consumer. When closing a consumer cleanly, the consumer will notify the group coordinator that it is leaving, and the group coordinator will trigger a rebalance immediately, reducing the gap in processing. Later in this chapter we will discuss configuration options that control heartbeat frequency and session timeouts and how to set those to match your requirements.

消费者通过向被指定为消费组协调者(Group Coordinator)的某个Kafka代理(Broker)发送心跳,来维护它们在消费组中的成员身份以及分配给它们的分区所有权(注意对于不同的消费组,协调者所在的代理可能是不同的)。只要消费者能够在正常的时间间隔内发送心跳,它就会被认为是存活的、运行良好的,可以正常处理分区的消息。实际上消费者轮询消息的动作就是消费者发送心跳的途径。如果消费者很长时间没有发送心跳,它的会话就会超时,协调者就会认为消费者挂掉,从而触发一次平衡操作。注意如果消费者自己崩溃并且停止处理消息,协调者需要等待数秒钟没有收到心跳,才会判定它挂掉并触发平衡。在这数秒的时间内,挂掉的消费者所拥有的分区上不会有任何消息被处理。而如果是优雅地关闭一个消费者,消费者会通知协调者它正在离开,协调者就会立即触发平衡,从而减少了消息无法被处理的间隙。本章的后面我们会讨论控制心跳频率、会话超时的配置选项,以及如何设置它们来满足我们的需求。

HOW DOES THE PROCESS OF ASSIGNING PARTITIONS TO BROKERS WORK?
When a consumer wants to join a group, it sends a JoinGroup request to the group coordinator. The first consumer to join the group becomes the group leader. The leader receives a list of all consumers in the group from the group coordinator (this will include all consumers that sent a heartbeat recently and are therefore considered alive) and it is responsible for assigning a subset of partitions to each consumer. It uses an implementation of PartitionAssignor interface to decide which partitions should be handled by which consumer. Kafka has two built-in partition assignment policies, which we will discuss in more depth in the configuration section. After deciding on the partition assignment, the consumer leader sends the list of assignments to the GroupCoordinator which sends this information to all the consumers. Each consumer only sees his own assignment - the leader is the only client process that has the full list of consumers in the group and their assignments. This process repeats every time a rebalance happens.

为消费者分配Partition是怎么工作的?
当一个消费者想要加入消费组时,它会发送JoinGroup请求给消费组的协调者,第一个加入消费组的消费者会成为组的领导者(Leader)。领导者会从协调者那里得到消费组中所有消费者的列表(包括最近发送过心跳、因而被认为存活的所有消费者),并负责为每个消费者分配分区子集。它使用PartitionAssignor接口的实现来决定哪些分区应该由哪个消费者处理。Kafka内置了两种分区分配策略,后面在配置部分会详细介绍。在决定了分区分配之后,消费组的领导者把分配列表发送给协调者,协调者再把各自的分配信息发送给所有消费者。每个消费者只能看到自己的分配结果,领导者是唯一掌握消费组完整消费者列表及其分配结果的客户端进程。这个过程在每次平衡操作发生时都会重复执行。

创建一个新的消费者

消费者开始消费记录的第一步是创建一个KafkaConsumer实例,创建KafkaConsumer类似于创建KafkaProducer,首先创建一个Properties实例,传递消费者的配置属性。本章后面我们会讨论所有的属性,这里只需要三个必须的属性:bootstrap.servers、key.deserializer、value.deserializer。

第一个属性bootstrap.servers指向Kafka集群的连接地址,它和KafkaProducer的使用方式一样。剩余的两个属性key.deserializer和value.deserializer与生产者的serializers类似。

还有一个属性group.id不是必须的,但现在我们假设它是必须的。group.id指定了KafkaConsumer实例所属的消费组。虽然创建不属于任何消费组的消费者也是可行的,但这种情况很少见,所以本章我们都会假设消费者是消费组的一部分。下面的代码演示了如何创建一个KafkaConsumer:

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

如果你读过第三章创建生产者的代码,你会发现这里看到的大部分代码都很熟悉。我们打算消费的消息格式都是字符串类型(反序列化:Kafka中存储的是二进制字节类型,Kafka内部会负责将字节类型转换为字符串类型,所以消费者读取到的消息是字符串类型),所以我们使用了内置的StringDeserializer反序列化器,最后创建的KafkaConsumer泛型类型也是字符串(KafkaConsumer类上的两个泛型类型分别表示消息的键值类型)。只有group.id这个属性可能你没见过,它表示的是这个消费者(作为消费组的一部分)所属的消费组名称。

订阅主题

一旦创建完消费者实例,下一步是让消费者订阅一个或多个主题。subscribe()方法接受一个主题列表作为参数,使用起来非常简单,下面的代码创建了只有一个元素的列表,订阅的主题名称叫做"customerCountries"。

consumer.subscribe(Collections.singletonList("customerCountries"));

也可以使用正则表达式调用subscribe()方法。如果有人创建了与正则表达式匹配的新主题,平衡操作基本上会立即发生,消费者就会立即开始从新主题消费。这种方式对于需要从多个主题消费消息、并处理不同主题中不同类型数据的应用程序非常有用。例如,要订阅所有以test开头的主题,可以这样调用:

consumer.subscribe(Pattern.compile("test.*"));

轮询循环

消费者应用程序编程接口(API)的核心是一个简单的循环,会负责从服务端拉取更多的数据。一旦消费者订阅了主题,轮询循环会处理所有的协调细节、分区平衡、心跳、数据获取。返回给开发者的只是很简洁的API,仅仅返回分配分区的可用数据。消费者客户端代码的主体如下:

try {
    while (true) { //1
        ConsumerRecords<String, String> records = consumer.poll(100); //2
        for (ConsumerRecord<String, String> record : records) { //3
            log.debug("topic=%s,partition=%s,offset=%d,customer=%s,country=%s\n",
                    record.topic(), record.partition(),
                    record.offset(), record.key(), record.value());

            int updatedCount = 1;
            if (custCountryMap.containsKey(record.value())) {
                updatedCount = custCountryMap.get(record.value()) + 1;
            }
            custCountryMap.put(record.value(), updatedCount);

            JSONObject json = new JSONObject(custCountryMap);
            System.out.println(json.toString(4)); //4
        }
    }
} finally {
    consumer.close(); //5
}
  1. 这里确实是一个死循环,消费者通常都是长时间运行的应用程序,会持续从Kafka中拉取数据。后面我们会展示如何干净地退出循环,并且关闭消费者。
  2. 这一行是本章最重要的一行代码。就像鲨鱼要么保持不断游动要么死亡,消费者必须一直轮询Kafka,否则就会被认为挂掉了,就会导致消费的分区被组中其他的消费者继续处理。
  3. poll()方法返回一批记录集。每条记录会包含这条记录来源于哪个主题和分区、这条记录在分区中的偏移量、当然还有这条记录的键值。通常我们会迭代列表,并且处理每一条单独的记录。poll()方法也可以接受一个超时时间参数,表示执行轮询最多花费多长时间,不管轮询的结果有没有数据。这个超时值通常由应用程序是否需要快速响应来决定,也就是你要在轮询之后多快返回对主线程的控制(消费者的轮询是单线程阻塞的,如果想尽快把控制权交还给调用线程,可以缩短超时时间,超时到达后轮询返回,就可以继续执行其他逻辑)。
  4. 消息处理通常最后会写入到数据存储系统或者更新已有的记录。本例的目标是为了跟踪每个国家的顾客数量,所以我们更新了字典表然后将结果打印为JSON字符串,实际应用中一般会将更新记录写入到存储系统中。
  5. 总是在退出时执行close()方法。这会关闭网络连接和Socket,并且会立即触发平衡操作,而不是让协调者来发现消费者可能因为挂掉而没有及时发送心跳,那样会等待更长的时间,也会导致分区子集的消息在更长的时间内不能被任何消费者所消费。

轮询操作不仅仅做了获取数据这个工作。当新的消费者第一次调用poll()方法时,它会负责找到协调者、加入消费组、接收到分配的分区。如果需要做平衡操作,也是在poll()方法中处理的。当然用来表示消费者存活状态的心跳请求也是在poll()中发送的。基于这些原因,我们要确保迭代处理消息时要足够快速和高效。

注意:你不能让属于同一个消费组的多个消费者运行在同一个线程中,也不能让多个线程安全地共用同一个消费者实例。一个线程对应一个消费者是基本原则。

在一个应用程序中如果要运行同一个消费组的多个消费者,你需要保证每个消费者运行在自己的线程中。通常的做法是把消费者逻辑封装成自定义的对象,然后使用Java的ExecutorService启动多个线程,让每个消费者运行在自己的线程上。
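下面给出一个简化的示意代码(并非书中原文),演示"一个线程一个消费者"的做法:每个工作线程内部各自创建并使用自己的KafkaConsumer实例,再用ExecutorService启动若干这样的线程。其中主题名topic1、消费组名group1、线程数3以及代理地址都是假设的示例值。

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerGroupDemo {
    // 每个Runnable持有自己的KafkaConsumer实例,保证"一个线程一个消费者"
    static class ConsumerWorker implements Runnable {
        private final KafkaConsumer<String, String> consumer;

        ConsumerWorker(Properties props) {
            this.consumer = new KafkaConsumer<>(props);
        }

        public void run() {
            try {
                consumer.subscribe(Collections.singletonList("topic1")); // 示例主题名
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("partition=%d, offset=%d, value=%s%n",
                                record.partition(), record.offset(), record.value());
                    }
                }
            } finally {
                consumer.close();
            }
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // 示例代理地址
        props.put("group.id", "group1"); // 同一个消费组,分区会在3个消费者之间分配
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        ExecutorService executor = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 3; i++) {
            executor.submit(new ConsumerWorker(props));
        }
    }
}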

提交和偏移量

无论什么时候调用poll()方法,它都会返回已经写入Kafka、但消费组中的消费者还没有读取过的记录。这就意味着我们需要有一种方式来跟踪消费者读取到了哪条记录。前面讨论过,Kafka不同于其他消息系统的一个独有特性是:它不会像许多其他消息系统那样逐条跟踪来自消费者的确认(应答),而是允许消费者使用Kafka来跟踪自己在每个分区中的位置(偏移量)。我们把更新分区中当前位置这个动作叫做提交(commit)。

那么消费者如何提交偏移量呢?它会往Kafka的一个特殊主题__consumer_offsets生产消息,这个主题保存了每个分区的提交位置。只要你的消费者是存活的、正在运行,也不会对它有任何影响。但是如果消费者挂了或者新消费者加入消费组,就会触发平衡。在平衡过后,每个消费者可能会被分配到和之前所处理的不同的新分区集合。为了明确要从哪里开始工作,消费者会读取每个分区最近的提交位置,然后从那个位置继续。

如果分区的提交位置比消费者客户端处理的最近一条消息的位置要小,那么在最近处理消息的位置和提交位置之间的消息就会被(消费者)处理两次。

相反如果提交位置比消费者实际处理的最近一条消息位置大,那么这两个位置中间的所有消息就都不会被消费组处理了(当然我们要尽量避免这种丢数据的场景)。

可见对偏移量(即上文说的位置)的管理对客户端应用程序而言影响很大。KafkaConsumer的使用接口提供了多种提交偏移量的方式。

自动提交偏移量

提交偏移量最简单的方式是让消费者为你做这件事情。如果你设置了enable.auto.commit=true,那么每隔5秒消费者就会提交客户端从poll()收到的最大偏移量。5秒是默认值,由配置项auto.commit.interval.ms控制。就像消费者的其他逻辑一样,自动提交也是由poll()轮询驱动的:每当你调用poll()时,消费者会检查是否到了该提交的时间,如果是,就会提交上一次轮询返回的偏移量。不过,在使用这个简便的选项之前,理解它可能带来的后果是非常重要的。
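作为补充(非书中原文),下面的配置片段演示了enable.auto.commit和auto.commit.interval.ms这两个参数的写法,代理地址与消费组名沿用前文示例,1秒的提交间隔只是一个假设值:

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 开启自动提交(默认即为true),并把提交间隔从默认的5秒缩短为1秒,以缩小重复处理的窗口
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);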

Consider that by defaults automatic commit occurs every 5 seconds. Suppose that we are 3 seconds after the most recent commit and a rebalance is triggered. After the rebalancing all consumers will start consuming from the last offset committed. In this case the offset is 3 seconds old, so all the events that arrived in those 3 seconds will be processed twice. It is possible to configure the commit interval to commit more frequently and reduce the window in which records will be duplicated, but it is impossible to completely eliminate them.

Note that with auto-commit enabled, a call to poll will always commit the last offset returned by the previous poll. It doesn’t know which events were actually processed, so it is critical to always process all the events returned by poll before calling poll again (or before calling close(), it will also automatically commit offsets). This is usually not an issue, but pay attention when you handle exceptions or otherwise exit the poll loop prematurely.

考虑默认场景下自动提交会每隔5秒发生一次。假设在最近一次提交过后3秒发生了一次平衡操作。平衡过后所有的消费者都会从上次最近提交的偏移量开始消费。这种场景下偏移量是3秒之前的了,这就会导致在这3秒内到达的所有事件都会被处理两次。尽管可以设置更短的提交间隔,以便更频繁地提交,减少消息被重复处理的窗口,但还是无法从根本上彻底解决数据重复处理的问题。

注意在开启自动提交时,调用poll总是会提交上一次poll的最近偏移量。但是它并不知道都实际处理了哪些事件,所以客户端应该总是要保证在调用新的poll之前要处理上一次poll返回的所有事件(或者在调用close之前,也会自动提交偏移量)。通常这不是一个严重的问题,但在处理异常或者过早地退出轮询循环时需要特别注意。

自动提交偏移量非常方便,但是它的缺点也很明显:它不能够给予开发者足够的控制权来避免消息的重复处理。

手动提交偏移量

Most developers want to exercise more control over the time at which offsets are committed - both to eliminate the possibility of missing messages and to reduce the number of messages duplicated during rebalancing. The consumer API has the option of committing the current offset at a point that makes sense to the application developer rather than based on a timer.

By setting enable.auto.commit = false, offsets will only be committed when the application explicitly chooses to do so. The simplest and most reliable of the commit APIs is commitSync(). This API will commit the latest offset returned by poll() and return once the offset is committed, throwing an exception if commit fails for some reason.

It is important to remember that commitSync() will commit the latest offset returned by poll(), so make sure you call commitSync() after you are done processing all the records in the collection, or you risk missing messages as described above. Note that when rebalance is triggered, all the messages from the beginning of the most recent batch until the time of the rebalance will be processed twice.

Here is how we would use commitSync to commit offsets once we finished processing the latest batch of messages:

大多数开发者希望对提交偏移量的时机有更强的掌控能力:既要消除丢失消息的可能性,也要减少平衡发生时被重复处理的消息数量。消费者API提供了在应用程序开发者认为合适的时间点提交当前偏移量的选项,而不是基于定时器提交。

通过设置enable.auto.commit=false,偏移量只会在应用程序显式提交时才会被提交。最简单和最可靠的提交API是commitSync()方法。该API会提交poll()返回的最近偏移量,并且在偏移量提交成功后才返回,如果因为某种原因提交失败了则会抛出异常。

注意commitSync()提交的是poll()返回的这批数据的最近偏移量,所以要确保处理完集合中的所有记录后才调用commitSync(),否则就会面临前面提到的丢失消息的风险(先提交偏移量然后才处理记录,如果处理某条记录失败了,这条失败的记录以及之后还未处理的记录都不会再被处理)。当触发平衡时,从最近一批记录的开始位置直到发生平衡这个时间点的所有消息都会被处理两次(消息重复处理仍然不可避免,比如处理顺序为:提交偏移量=5,拉取一批消息共10条,然后开始处理记录,假设只处理了3条就发生了平衡,平衡之后,消费者会重新从位置5开始处理,所以从这批消息的开始到发生平衡时已处理的3条记录就会被重新处理)。

下面的代码示例中,一旦在处理完最近的一批数据后,使用commitSync()方法提交偏移量:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic=%s,partition=%s,offset=%d,customer=%s,country=%s\n",
                record.topic(), record.partition(),
                record.offset(), record.key(), record.value()); //1
    }
    try {
        consumer.commitSync(); //2
    } catch (CommitFailedException e) {
        log.error("commit failed", e); //3
    }
}
  1. 这里假设打印一条记录的内容表示已经处理完了该记录。你实际的应用程序肯定比这要复杂,你应该根据你的实际用例决定什么时候记录被处理完成。
  2. 一旦处理完了这一批的所有记录,我们调用了commitSync()提交这一批的最近偏移量,并且这个操作发生在下一次轮询拉取新消息之前。
  3. 只要没有发生不可恢复的错误,commitSync()会自动重试提交;如果最终还是失败了,我们除了记录错误日志以外也没有太多可做的。

异步提交偏移量

One drawback of manual commit is that the application is blocked until the broker responds to the commit request. This will limit the throughput of the application. Throughput can be improved by committing less frequently, but then we are increasing the number of potential duplicates that a rebalance will create.

手动提交的一个缺点是应用程序会在代理返回提交请求的响应之前一直被阻塞(客户端向Kafka的代理节点发送提交偏移量请求,在这期间,应用程序无法执行其他业务逻辑),这会直接限制了应用程序的吞吐量。虽然可以通过频度更少的提交来提高吞吐量,但代价是在发生平衡时增加了重复处理的可能性。

另外一个选项是异步提交API,我们发送完提交请求后继续后续的业务逻辑处理,而不需要等待代理节点返回提交响应。commitAsync()方法会提交偏移量,然后继续向下运行(即开始新的轮询)。

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        //处理记录,比如简单的打印,略
    }
    consumer.commitAsync(); // 1
}

The drawback is that while commitSync() will retry the commit until it either succeeds or encounters a non-retriable failure, commitAsync() will not retry. The reason it does not retry is that by the time commitAsync() receives a response from the server, there may have been a later commit which was already successful. Imagine that we sent a request to commit offset 2000. There is a temporary communication problem, so the broker never gets the request and therefore never respond. Meanwhile, we processed another batch and successfully committed offset 3000. If commitAsync() now retries the previously failed commit, it may succeed in committing offset 2000 after offset 3000 was already processed and committed. In case of a rebalance, this will cause more duplicates.

We are mentioning this complication and the importance of correct order of commits, because commitAsync() also gives you an option to pass in a callback that will be triggered when the broker responds. It is common to use the callback to log commit errors or to count them in a metric, but if you want to use the callback for retries, you need to be aware of the problem with commit order.

commitSync()会一直重试提交,直到提交成功或者遇到无法重试的错误;而commitAsync()的缺点是它不会重试。不重试的原因是:当commitAsync()接收到服务端的响应时,可能已经有一个更新的提交成功了。假设我们发送了一个提交偏移量2000的请求,由于临时的通信问题,代理节点一直没有收到这个请求,自然也不会响应。与此同时我们处理了新的一批数据,并且成功地提交了偏移量3000。如果此时commitAsync()对之前失败的提交执行重试,它可能会在偏移量3000已经处理并提交之后,又成功地提交了偏移量2000。在发生平衡的情况下,这会导致更多的重复处理。

我们提到这种复杂情况,是为了强调提交顺序的重要性。commitAsync()方法还可以传入一个回调函数,回调会在代理返回响应时触发执行。通常会用回调来记录提交错误或者在监控系统中计数,但如果你要用回调来做重试,就需要注意提交顺序的问题。下面的代码中我们发送了提交请求然后继续处理,但如果提交失败,会记录失败的偏移量以及异常信息。

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        //处理记录,比如简单的打印,略
    }
    consumer.commitAsync(new OffsetCommitCallback() {
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
                               Exception exception) {
            if (exception != null) log.error("Commit failed for offsets {}", offsets, exception);
        }
    }); //1
}

A simple pattern to get commit order right for asynchronous retries is to use a monotonically increasing sequence number. Increase the sequence number every time you commit and the sequence number at the time of the commit to the asyncCommit callback. When you’re getting ready to send a retry, check if the commit sequence number the callback got is equal to the instance variable, if it is - there was no newer commit and it is safe to retry. If the instance sequence number is higher, don’t retry since a newer commit was already sent.

要在异步重试时保证提交顺序的正确性,可以使用一个单调递增的序号。每次提交时递增这个序号,并把提交时刻的序号传给commitAsync的回调。当准备重试时,检查回调拿到的提交序号是否和实例变量中的当前序号相等:如果相等,说明没有更新的提交,可以安全地重试;如果实例变量中的序号更大,说明已经发送过更新的提交,就不要再重试了。
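下面是按上述思路写的一个简化示意实现(非书中代码),其中的类名RetriableAsyncCommitter和方法名都是示例命名;由于commitAsync()的回调是在调用poll()的线程中被执行的,这里用AtomicLong保存序号主要是为了表达"单调递增"的意图:

import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

public class RetriableAsyncCommitter {
    private final KafkaConsumer<String, String> consumer;
    private final AtomicLong commitSeq = new AtomicLong(0); // 单调递增的提交序号

    public RetriableAsyncCommitter(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    public void commitAsyncWithRetry(final Map<TopicPartition, OffsetAndMetadata> offsets) {
        // 每次发起提交时递增序号,并把当前序号"捕获"进回调
        final long seqAtCommit = commitSeq.incrementAndGet();
        consumer.commitAsync(offsets, new OffsetCommitCallback() {
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> committed,
                                   Exception exception) {
                if (exception == null) {
                    return; // 提交成功,无需重试
                }
                // 只有在没有更新的提交发出时(序号仍然相等)才重试,避免旧偏移量覆盖新偏移量
                if (seqAtCommit == commitSeq.get()) {
                    commitAsyncWithRetry(offsets);
                }
            }
        });
    }
}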

结合同步和异步提交

Normally, occasional failures to commit without retrying are not a huge problem, since if the problem is temporary the following commit will be successful. But if we know that this is the last commit before we close the consumer, or before a rebalance, we want to make extra sure that the commit succeeds.

通常,偶尔的提交失败(且不重试)并不是大问题,因为如果问题是短暂的,后续的提交会成功。但如果我们知道这是关闭消费者之前、或者平衡发生之前的最后一次提交,就要格外确保这次提交成功。

Therefore a common pattern is to combine commitAsync with commitSync just before shutdown. Here is how it works (We will discuss how to commit just before rebalance when we get to the section about rebalance listeners):

因此一种普遍的做法是在关闭之前结合使用commitAsync()和commitSync()(后面我们会在谈到平衡监听器时讨论怎么在平衡之前提交),做法如下:

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            //处理记录,略
        }
        consumer.commitAsync(); //1
    }
} catch (Exception e) {
    log.error("Unexpected error", e);
} finally {
    try {
        consumer.commitSync(); //2
    } finally {
        consumer.close();
    }
}
  1. 当一切工作的很正常时,我们使用异步的commitAsync,它很快,而且如果一次提交失败了,下一次会重试
  2. 在关闭消费者时,不会有下一次提交了,我们就调用同步的commitSync,它会重试直到提交成功

提交指定的偏移量

Committing the latest offset only allows you to commit as often as you finish processing batches. But what if you want to commit more frequently than that? What if poll() returns a huge batch and you want to commit offsets in the middle of the batch to avoid having to process all those rows again if a rebalance occures? You can’t just call commitSync() or commitAsync() - this will commit the last offset returned, which you didn’t get to process yet.

Fortunately, the consumer API allows you to call commitSync() and commitAsync() and pass a map of partitions and offsets that you wish to commit. If you are in the middle of processing a batch of records, and the last message you got from partition 3 in topic “customers” has offset 5000, you can call commitSync() to commit offset 5000 for partition 3 in topic “customers”. Since your consumer may be consuming more than a single partition, you will need to track offsets on all of them, so moving to this level of precision in controlling offset commits adds complexity to your code.

提交最近的偏移量只允许你在处理完一批记录后才提交。但如果你想更频繁地提交呢?如果poll()返回了一大批记录,你想在这批记录的中间某个位置提交偏移量,以避免平衡发生时不得不重新处理所有这些记录呢?你不能简单地调用commitSync()或者commitAsync(),因为它们只会提交poll()返回的最近偏移量,而这批记录你还没有全部处理完。

幸运的是,消费者API允许你在调用commitSync()和commitAsync()时传递一个你希望提交的分区和偏移量的字典。如果你正处在一批记录的处理过程中,并且从主题"customers"的分区3得到的最近一条消息的偏移量是5000,你就可以调用commitSync()来提交主题"customers"分区3的偏移量5000。由于你的消费者可能会消费多个分区,你需要跟踪所有分区的偏移量,所以用这种更细粒度的方式控制偏移量的提交会增加代码的复杂性。下面是提交指定偏移量的代码片段:

Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>(); //1
int count = 0;
....
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        //处理逻辑,略 //2
        currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1)); //3 提交的是下一条待消费消息的偏移量
        if (count % 1000 == 0) //4
            consumer.commitAsync(currentOffsets, null); //5
        count++;
    }
}
  1. 我们使用这个Map字典结构用来手动跟踪偏移量
  2. 这里用打印记录的方式代替实际的业务处理
  3. 处理完每条记录后,用最新的偏移量(即下一条待读取消息的位置)更新偏移量字典
  4. 这里我们决定每隔1000条记录提交一次,实际应用中可以根据时间提交甚至是记录内容
  5. 这里选择调用commitAsync,不过commitSync也同样有效。当然提交指定的偏移量,也仍然需要处理前面章节中提到的错误。

平衡监听器

As we mentioned in previous section about committing offsets, a consumer will want to do some cleanup work before exiting and also before partition rebalancing.

If you know your consumer is about to lose ownership of a partition, you will want to commit offsets of the last event you’ve processed. If your consumer maintained a buffer with events that it only processes occasionally (for example, the currentRecords map we used when explaining pause() functionality), you will want to process the events you accumulated before losing ownership of the partition. Perhaps you also need to close file handles, database connections and such.

前面关于提交偏移量的章节中提到,消费者在退出之前以及分区平衡发生之前,通常需要做一些清理工作。如果你知道消费者即将失去一个分区的所有权,你会想提交最后一条已处理事件的偏移量。如果你的消费者维护了一个事件缓冲区,只是偶尔才处理一次(比如在讲解pause()功能时用到的currentRecords字典),你也应当在失去分区所有权之前处理完目前积累的事件。也许你还需要关闭文件句柄、释放数据库连接等等。

消费者API允许你在分区被分配给消费者或者从消费者移除时,运行自定义的代码逻辑。具体做法是在调用subscribe()方法时传入一个ConsumerRebalanceListener监听器,该接口需要实现两个方法:

  • public void onPartitionsRevoked(Collection<TopicPartition> partitions)会在平衡开始之前以及消费者停止消费消息之后调用。在这里通常要提交偏移量,这样无论下一个消费者是谁,它获得到分区后,就知道要从哪里开始。
  • public void onPartitionsAssigned(Collection<TopicPartition> partitions)会在分区重新分配给消费者之后,在消费者开始消费消息之前调用。

下面的示例展示了如何使用onPartitionsRevoked()方法在失去一个分区的所有权之前提交偏移量。后面我们会展示一个同时演示onPartitionsAssigned()用法的更复杂示例。

private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

private class HandleRebalance implements ConsumerRebalanceListener { //1
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) { //2
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        consumer.commitSync(currentOffsets); //3
    }
}

try {
    consumer.subscribe(topics, new HandleRebalance()); //4

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            //处理记录,略
            currentOffsets.put(
                    new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1));
        }
        consumer.commitAsync(currentOffsets, null);
    }
} catch (WakeupException e) {
    // ignore, we're closing
} catch (Exception e) {
    log.error("Unexpected error", e);
} finally {
    try {
        consumer.commitSync(currentOffsets);
    } finally {
        consumer.close();
    }
}

  1. We start by implementing a ConsumerRebalanceListener.
  2. In this example we don't need to do anything when we get a new partition, we'll just start consuming messages.
  3. However, when we are about to lose a partition due to rebalancing, we need to commit offsets. Note that we are committing the latest offsets we've processed, not the latest offsets in the batch we are still processing. This is because a partition could get revoked while we are still in the middle of a batch. We are committing offsets for all partitions, not just the partitions we are about to lose - since the offsets are for events that were already processed, there is no harm in that. Last, note that we are using commitSync() to make sure the offsets are committed before the rebalance proceeds.
  4. The most important part - pass the ConsumerRebalanceListener to the subscribe() method so it will get invoked by the consumer.

  1. 我们从实现ConsumerRebalanceListener监听器开始
  2. 本例中,在分配到新分区之后我们没有做任何事情,接下来只是消费消息而已
  3. 然而,当由于平衡即将失去分区的所有权时,需要提交偏移量。注意我们提交的是已经处理完的消息的最近偏移量,而不是当前仍在处理的这批消息的最近偏移量,因为分区有可能在一批记录处理到一半时被收回(这样可以尽量减少平衡之后重复处理的数据量,但仍然无法完全避免重复)。同时我们提交的是所有分区的偏移量(属于当前消费者的),而不仅仅是即将失去的那些分区,因为这些偏移量对应的都是已经处理完的事件,所以这样做并没有什么坏处。最后,我们使用了同步的commitSync()来确保在平衡继续进行之前偏移量已经提交成功。
  4. 最重要的一部分,将步骤1创建的监听器传递给subscribe()方法,这样就可以被消费者调用

正好一次的处理语义

So far we’ve seen how to use poll() to start consuming messages from the last committed offset in each partition and to proceed in processing all messages in sequence. However, sometimes you want to start reading at a different offset.

If you want to start reading all messages from the beginning of the partition, or you want to skip all the way to the end of the partition and start consuming only new messages, there are APIs specifically for that: seekToBeginning(TopicPartition tp) and seekToEnd(TopicPartition tp).

目前为止我们已经看到了如何使用poll()从每个分区的最近提交位置开始消费消息,并且顺序地处理所有的消息。不过有时候,你可能想从一个不同的偏移量位置开始读取。

如果你想要从分区的最开始位置读取所有的消息,或者想跳过中间所有的数据、直接到分区的末尾只消费新写入的消息,确实有专门的API供你使用:seekToBeginning()和seekToEnd()分别满足这两种需求。
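下面是这两个方法的一个简单用法示意(非书中代码)。注意在较新版本的Java客户端中,这两个方法接收一个分区集合作为参数(更早的版本使用可变参数形式),并且需要先触发一次分区分配,consumer.assignment()才会有内容:

consumer.subscribe(Collections.singletonList("customerCountries"));
consumer.poll(0); // 先轮询一次,加入消费组并完成分区分配

// 从所有已分配分区的最开始位置读取
consumer.seekToBeginning(consumer.assignment());

// 或者:跳到所有已分配分区的末尾,只消费之后写入的新消息
// consumer.seekToEnd(consumer.assignment());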

However, the Kafka API also lets you seek to a specific offset. This ability can be used in a variety of ways, for example to go back few messages or skip ahead few messages (perhaps a time-sensitive application that is falling behind will want to skip ahead to more relevant messages), but the most exciting use-case for this ability is when offsets are stored in a system other than Kafka.

Think about this common scenario: Your application is reading events from Kafka (perhaps a clickstream of users in a website), processes the data (perhaps clean up clicks by robots and add session information) and then store the results in a database, NoSQL store or Hadoop. Suppose that we really don’t want to lose any data, nor do we want to store the same results in the database twice.

不过Kafka API还允许你定位到指定的位置(在谈到提交时我们会说提交偏移量,在谈到定位时我们会说位置,位置这个概念用在现实生活中表示要到哪个地方,而偏移量更多表示的是处于一种什么状态,提交时主要关注的是状态数据,当然你不需要纠结这么多,位置和偏移量其实是相同的概念)。这种特性可以用在很多地方,比如回退几个消息重新处理,或者跳过一些消息(也许是一个时间敏感的应用程序,如果数据处理的进度落后太多时,你会想跳到最近的时间点,因为这些消息更能表示相关的当前状态)。但这种特性最令人兴奋的一个用例是:将偏移量存储到其他系统而不是Kafka中。

考虑下面的通用场景:应用程序从Kafka中读取事件(也许是一个网站的用户点击流)、处理数据(也许是清理机器点击,添加会话信息),然后存储结果到数据库、NoSQL或者Hadoop。假设我们真的不希望丢失任何数据,也不希望存储两份相同的数据。这种场景下,消费者的循环会是这样的:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        currentOffsets.put(
                new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
        processRecord(record); //处理每一条记录
        storeRecordInDB(record); //存储记录到数据库、NoSQL或Hadoop
        consumer.commitAsync(currentOffsets, null); //提交偏移量
    }
}

Note that we are very paranoid, so we commit offsets after processing each record. However, there is still a chance that our application will crash after the record was stored in the database but before we committed offsets, causing the record to be processed again and the database to contain duplicates.

This could be avoided if there was only a way to store both the record and the offset in one atomic action. Either both the record and the offset are committed, or neither of them are committed. As long as the records are written to a database and the offsets to Kafka, this is impossible.

But what if we wrote both the record and the offset to the database, in one transaction? Then we’ll know that either we are done with the record and the offset is committed or we are not, and the record will be reprocessed.

Now the only problem is: if the record is stored in a database and not in Kafka, how will our consumer know where to start reading when it is assigned a partition? This is exactly what seek() can be used for. When the consumer starts or when new partitions are assigned, it can look up the offset in the database and seek() to that location.

请注意,我们非常偏执,在处理完每条记录都执行一次提交偏移量的动作。但是即使如此,应用程序仍然有可能会在存储到数据库之后,并且在提交偏移量之前挂掉,从而导致记录会被重新处理,最终数据库中的记录仍然会有重复的。

如果存在一种方式只有以原子操作的方式同时存储记录和偏移量,就可以避免上面的问题了。原子的意思是:记录和偏移量要么同时都提交了,要么都没有提交。只要是记录写到数据库,而偏移量写到Kafka中,这就不可能做到原子操作(毕竟写两个不同的存储系统是没有事务保证的)。

但如果我们在一个事务中同时写入记录和偏移量到数据库中呢?那么我们就会知道要么我们处理完了这条记录并且偏移量也成功提交了,要么这两个操作都没有发生,后者就会重新处理记录(因为记录和偏移量都没有成功存储,所以重复处理并不会使得存储系统有重复的数据)。

现在问题只有一个了:如果将记录存储在数据库而不是Kafka,我们的应用程序怎么知道要从分配分区的哪里开始读取?这就是seek()方法发挥作用的地方。当消费者启动或者分配到新的分区,可以先去数据库中查询分区的最近偏移量,然后通过seek()方法定位到这个位置。

下面的代码是这种做法的基本骨架,我们使用了ConsumerRebalanceListener监听器和seek()方法,来确保从数据库中存储的偏移量开始处理。

//消费者平衡的监听器
public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
    private final KafkaConsumer<String, String> consumer;

    public SaveOffsetsOnRebalance(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        commitDBTransaction(); //1
    }

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions)
            consumer.seek(partition, getOffsetFromDB(partition)); //2
    }
}

//消费者主逻辑
consumer.subscribe(topics, new SaveOffsetsOnRebalance(consumer));
consumer.poll(0);

for (TopicPartition partition : consumer.assignment())
    consumer.seek(partition, getOffsetFromDB(partition)); //3

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record);
        storeRecordInDB(record);
        storeOffsetInDB(record.topic(), record.partition(), record.offset()); //4
    }
    commitDBTransaction();
}
  1. 我们使用了一个虚构的方法来向数据库提交事务。这里的考虑是在处理记录的时候会把记录和偏移量都写入数据库,所以我们只需要在即将失去分区的所有权时提交这个事务,来确保这些信息被持久化。
  2. 我们还有一个虚构的方法会从数据库中读取分区的偏移量,然后在获得新分区的所有权时通过消费者的seek()方法定位到这些记录。
  3. 当消费者订阅主题并第一次启动时,立即调用一次无阻塞的poll(0)方法,来确保加入消费组,并且得到分配的分区。然后紧接着调用seek()定位到分配给我们(当前消费者)的分区的正确位置。注意seek()仅仅更新了我们要从哪里开始消费的位置,所以下一次调用poll()才会开始拉取正确的消息。如果在seek()时发生错误(比如偏移量不存在),调用poll()时就会抛出异常。
  4. 又一个虚构的方法,这次我们更新了数据库中存储偏移量的一张表。这里我们假设更新记录的操作很快,所以对每条记录都执行了更新,但提交数据库事务比较慢,所以只在一批数据都处理完成后才执行提交。不过这里仍然有多种可以优化的地方。

通过将偏移量和数据一起存储在外部存储系统中,有多种方式可以实现正好一次的语义,不过所有的这些方法都需要使用ConsumerRebalanceListenerseek()一起协调,来确保偏移量及时地被存储,这样消费者就可以从正确的位置开始读取消息(只有发生平衡后分配到新的分区才需要从数据库中读取偏移量)。

如何退出

When you decide to exit the poll loop, you will need another thread to call consumer.wakeup(). If you are running the consumer loop in the main thread, this can be done from a ShutdownHook. Note that consumer.wakeup() is the only consumer method that is safe to call from a different thread. Calling wakeup will cause poll() to exit with WakeupException, or if consumer.wakeup() was called while the thread was not waiting on poll, the exception will be thrown on the next iteration when poll is called. The WakeupException doesn’t need to be handled, it was just a way of breaking out of the loop, but it is important that before exitting the thread, you will call consumer.close(), this will do any last commits if needed and will send the group coordinator a message that the consumer is leaving the group, so rebalancing will be triggered immediately and you won’t need to wait for the session to time out.

本章开始讨论轮询循环时,我们说过不用担心消费者的轮询是在一个无限循环里,这里我们来讨论如何干净地退出循环。当你决定要退出轮询循环时,你需要从另一个线程调用consumer.wakeup()(注意是应用程序的另一个线程,操作的仍然是同一个消费者实例)。如果消费者的循环运行在主线程中,可以通过ShutdownHook钩子来完成(退出时JVM会在单独的线程中执行钩子函数)。注意consumer.wakeup()是唯一一个可以从其他线程安全调用的消费者方法。调用wakeup()方法会使阻塞中的poll()方法退出,并抛出WakeupException异常;而如果调用consumer.wakeup()时线程并没有阻塞在poll()上,异常会在下一次调用poll()时抛出。

不需要特别地处理WakeupException异常,它只是表示退出循环的一种方式而已。不过重要的是在退出线程之前,你应该调用consumer.close()方法。必要的话还应该做最后的一次提交,并且向协调者通知消费者正在离开消费组。这样平衡就会被立即触发,你就不需要等到会话超时时才会被协调者检测出来。

// mainThread 为事先保存的主线程引用,例如:final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread() {
    public void run() {
        System.out.println("准备退出消费者客户端...");
        consumer.wakeup(); //1
        try {
            mainThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});

try {
    // 循环直到通过Ctrl-C中断客户端,关闭钩子会在退出时执行清理工作
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        System.out.println(System.currentTimeMillis() + "--等待新数据...");
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset:%d,key:%s,value:%s\n",
                    record.offset(), record.key(), record.value());
        }
        for (TopicPartition tp : consumer.assignment())
            System.out.println("提交偏移量的位置:" + consumer.position(tp));
        consumer.commitSync();
    }
} catch (WakeupException e) {
    // ignore for shutdown //2
} finally {
    consumer.close(); //3
    System.out.println("消费者关闭成功,结束!");
}
  1. 关闭钩子运行在独立的线程(不同于主线程),所以唯一安全的操作是调用wakeup()中断轮询循环。
  2. 其他线程调用wakeup()会导致poll()调用抛出WakeupException异常。你应该捕获这个异常,确保应用程序不会以非预期的方式退出,不过这里不需要做任何事情。
  3. 在退出消费者之前,确保干净地关闭消费者。

反序列化

As discussed in the previous chapter, Kafka Producers require serializers to convert objects into byte arrays that are then sent to Kafka. Similarly, Kafka Consumers require deserializers to convert byte arrays recieved from Kafka into Java objects. In previous examples, we just assumed that both the key and the value of each message are Strings and we used the default StringDeserializer in the Consumer configuration.

In the previous chapter about the Kafka Producer, we’ve seen how to serialize custom types and how to use Avro and AvroSerializers to generate Avro objects from schema definitions and then serialize them when producing messages to Kafka. We will now look at how to create custom deserializers for your own objects and how to use Avro and its deserializers.

上一章讨论过,Kafka的生产者需要序列化器把对象转换成字节数组再发送给Kafka。类似地,Kafka的消费者需要反序列化器把从Kafka读出的字节数组转换成Java对象。前面的示例中,我们假设每条消息的键和值都是字符串,并在消费者的配置中使用了默认的StringDeserializer反序列化器。

在上一章关于生产者,我们看到了如何序列化自定义的类型,怎么使用Avro和AvroSerializers序列化器从模式定义生成Avro对象,最后经过序列化后向Kafka生产消息。现在我们会看下怎么对你的对象使用自定义的反序列化器,以及如何使用Avro和它的反序列化器。

It should be obvious that the serializer that was used in producing events to Kafka must match the deserializer that will be used when consuming events. Serializing with IntSerializer and then deserializing with StringDeserializer will not end well. This means that as a developer you need to keep track of which serializers were used to write into each topic, and make sure each topic only contains data that the deserializers you use can interpret. This is one of the benefits of using Avro and the Schema Repository for serializing and deserializing - the AvroSerializer can make sure that all the data written to a specific topic is compatible with the schema of the topic, which means it can be deserialized with the matching deserializer and schema. Any errors in compatibility - in the producer or the consumer side will be caught easily with an appropriate error message, which means you will not need to try to debug byte arrays for serialization errors.

显然,向Kafka生产事件时使用的序列化器,必须和消费这些事件时使用的反序列化器相匹配。用IntSerializer序列化之后再用StringDeserializer反序列化,结果不会太好。这意味着作为开发者,你需要跟踪每个主题写入时使用了哪些序列化器,并确保每个主题只包含你所用的反序列化器能够解释的数据。这也是使用Avro和模式存储(Schema Repository)来做序列化和反序列化的好处之一:AvroSerializer可以确保写入某个主题的所有数据都和这个主题的模式兼容,这意味着这些数据可以被匹配的反序列化器和模式正确地反序列化。无论在生产者还是消费者端,任何兼容性方面的错误都可以很容易地被捕获,并给出恰当的错误信息,这也意味着你不需要为了排查序列化错误而去调试字节数组。

下面我们会展示如何快速地编写自定义的反序列化器,尽管这不是推荐的做法,然后我们会接着使用一个Avro示例来反序列化消息的键值。

自定义反序列化

我们以第三章中序列化时使用的一样的自定义对象作为示例,并且为它编写一个反序列化器。

public class Customer {
    private int customerID;
    private String customerName;

    public Customer(int ID, String name) {
        this.customerID = ID;
        this.customerName = name;
    }

    public int getID() {
        return customerID;
    }

    public String getName() {
        return customerName;
    }
}

自定义反序列化器的代码如下:

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class CustomerDeserializer implements Deserializer<Customer> { //1
    public void configure(Map configs, boolean isKey) {
        // nothing to configure
    }

    public Customer deserialize(String topic, byte[] data) {
        int id;
        int nameSize;
        String name;
        try {
            if (data == null) return null;
            ByteBuffer buffer = ByteBuffer.wrap(data);
            id = buffer.getInt();
            nameSize = buffer.getInt();

            byte[] nameBytes = new byte[nameSize];
            buffer.get(nameBytes);
            name = new String(nameBytes, StandardCharsets.UTF_8);
            return new Customer(id, name); //2
        } catch (Exception e) {
            throw new SerializationException("Error when deserializing byte[] to Customer " + e);
        }
    }

    public void close() {
        // nothing to close
    }
}
  1. 自定义的CustomerDeserializer实现了Deserializer接口,泛型为Customer对象。注意Customer类以及对应的序列化器/反序列化器需要在生产者和消费者应用程序中保持一致。在一个有很多生产者和消费者共享Kafka数据的大型组织中,维持这种一致性是一个不小的挑战。
  2. 我们这里仅仅是对序列化逻辑的逆向处理,从字节数组中获取顾客编号和名称,并且使用它们来构造自定义的对象。

使用反序列化器的消费者看起来是这样的:

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", CustomerDeserializer.class.getName()); // 使用自定义的反序列化器

KafkaConsumer<String, Customer> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("customerCountries"));
while (true) {
    ConsumerRecords<String, Customer> records = consumer.poll(100);
    for (ConsumerRecord<String, Customer> record : records) {
        System.out.println("current customer Id: " + record.value().getID() +
                " and current customer name: " + record.value().getName());
    }
}

Again, it is important to note that implementing custom serializer and deserializer is not a recommended practice. It tightly couples producers and consumers and is fragile and error-prone. A better solution would be to use a standard message format such as Thrift, Protobuf or Avro. We’ll now see how to use Avro deserializers with the kafka consumer. For background on Apache Avro, its schemas and schema-compatibility capabilities, please refer back to Chapter 3.

再次强调,自己实现定制的序列化器和反序列化器是不被推荐的做法。它把生产者和消费者紧紧地耦合在一起,导致程序非常脆弱并且容易出问题。更好的方式是使用标准的消息格式,比如Thrift、Protobuf或者Avro。下面我们会看到如何在Kafka消费者中使用Avro反序列化器。如果想巩固Apache Avro的背景知识、它的模式以及模式兼容能力,可以回顾第三章的相关内容。

Avro反序列化

Lets assume we are using the implementation of Customer class in Avro that was shown in Chapter 3. In order to consume those objects from Kafka, you want to implement a consuming application similar to this:

假设我们使用了第三章中展示的,用Avro实现的Customer类,为了从Kafka中消费这些对象,你的消费者应用程序会是这样的:

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
        "io.confluent.kafka.serializers.KafkaAvroDeserializer"); //1
props.put("schema.registry.url", schemaUrl); //2
String topic = "customerContacts";

KafkaConsumer<String, Customer> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topic));
System.out.println("Reading topic:" + topic);

while (true) {
    ConsumerRecords<String, Customer> records = consumer.poll(1000); //3
    for (ConsumerRecord<String, Customer> record : records) {
        System.out.println("Current customer name is: "
                + record.value().getName()); //4
    }
    consumer.commitSync();
}
  1. 我们使用了KafkaAvroDeserializer来反序列化Avro消息
  2. schema.registry.url是一个新的参数,它仅仅表示存储模式的地址。这样消费者就可以使用生产者序列化消息时注册的模式。
  3. 指定生成的Customer类,作为记录的值类型
  4. record.value()返回值是一个Customer实例,于是我们就可以直接使用它

消费者配置

目前为止我们主要专注于学习消费者API的使用方法,但只看到了很少的配置属性,即那些必须的bootstrap.servers、group.id、key.deserializer和value.deserializer。Apache Kafka的官方文档列出了所有的消费者配置,大部分配置参数都有合理的默认值,并不需要修改,不过有些参数对消费者的性能和可用性影响较大,让我们看下这些比较重要的配置。

fetch.min.bytes
This property allows a consumer to specify the minimum amount of data that it wants to receive from the broker when fetching records. If a Broker receives a request for records from a Consumer but the new records amount to fewer bytes than min.fetch.bytes, the broker will wait until more messages are available before sending the records back to the consumer. This reduces the load on both the Consumer and the Broker as they have to handle fewer back-and-forward messages in cases where the topics don’t have much new activity (or for lower activity hours of the day). You will want to set this parameter higher than the default if the Consumer is using too much CPU when there isn’t much data available, or to reduce load on the brokers when you have large number of consumers.

该属性指定了消费者从代理节点拉取记录时希望接收到的最小数据量(单位为字节)。如果代理节点收到消费者的拉取请求,但新记录的字节数小于fetch.min.bytes,代理节点会等待,直到有更多可用的消息,才会把记录发送给消费者。在主题没有太多新数据(或者一天中的业务低峰期)时,这种方式减少了消费者和代理节点双方来回处理消息的次数,从而降低了双方的负载。如果消费者在没有多少可用数据时占用了过多的CPU,可以把这个参数设置得比默认值更大;在有大量消费者的场景下,调大它也可以降低代理节点的负载。

fetch.max.wait.ms
By setting fetch.min.bytes you tell Kafka to wait until it has enough data to send before responding to the consumer. fetch.max.wait.ms lets you control how long to wait. By default Kafka will wait up to 500ms. This results in up to 500ms of extra latency in case there is not enough data flowing to the Kafka topic to satisfy the minimum amount of data to return. If you want to limit the potential latency (usually due to SLAs controlling the maximum latency of the application), you can set fetch.max.wait.ms to lower value. If you set fetch.max.wait.ms to 100ms and fetch.min.bytes to 1MB, Kafka will recieve a fetch request from the consumer and will respond with data either when it has 1MB of data to return or after 100ms, whichever happens first.

上面通过设置fetch.min.bytes告诉Kafka在没有足够的数据之前不要发送响应给消费者。fetch.max.wait.ms则允许你在客户端自己控制最多等待多长的时间。默认值是500ms,即如果Kafka在500ms内还没有收集到fetch.min.bytes字节的消息,也要返回响应给消费者。消费者客户端在某些场景下最多只会有500ms的延迟:没有足够的数据流向Kafka,来满足要返回给客户端大小为fetch.min.bytes的数据量。如果你想要限制可能的延迟(通常是由应用程序的SLA控制最大的延迟时间),你需要设置fetch.max.wait.ms为一个更小的值(以便更快地收到响应)。举例你设置了fetch.max.wait.ms为100ms,fetch.min.bytes为1MB,那么Kafka接收到消费者的拉取请求后,当数据量有1MB后,或者超过100ms时,这两种情况无论哪种先发生都会返回目前收集的记录给消费者客户端。
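把上面提到的两个参数写成配置片段大致如下(非书中原文,数值仅为示例):

// 服务端凑够1MB数据就返回;否则最多等待100ms,两个条件先满足者触发响应
props.put("fetch.min.bytes", "1048576");
props.put("fetch.max.wait.ms", "100");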

max.partition.fetch.bytes
This property controls the maximum number of bytes the server will return per partition. The default is 1MB, which means that when KafkaConsumer.poll() returns ConsumerRecords, the record object will use at most max.partition.fetch.bytes per partition assigned to the Consumer. So if a topic has 20 partitions, and you have 5 consumers, each consumer will need to have 4MB of memory available for ConsumerRecords. In practice, you will want to allocate more memory as each consumer will need to handle more partitions if other consumers in the group fail.max.partition.fetch.bytes must be larger than the largest message a broker will accept (max.message.size property in the broker configuration), or the broker may have messages that the consumer will be unable to consumer, in which case the consumer will hang trying to read them.

Another important consideration when setting max.partition.fetch.bytes is the amount of time it takes the consumer to process data. As you recall, the consumer must call poll() frequently enough to avoid session timeout and subsequent rebalance. If the amount of data a single poll() returns is very large, it may take the consumer longer to process, which means it will not get to the next iteration of the poll loop in time to avoid a session timeout. If this occures the two options are either to lower max.partition.fetch.bytes or to increase the session timeout.

这个配置控制了服务端为每个分区返回的最大字节数。默认值是1MB,这意味着当KafkaConsumer.poll()返回ConsumerRecords时,记录集对象中每个分配给消费者的分区最多占用max.partition.fetch.bytes字节。举例来说,一个主题有20个分区,你有5个消费者,那么每个消费者需要有4MB的可用内存来存放ConsumerRecords对象。实际中你应该为每个消费者分配更多的内存,因为当消费组中的其他消费者失败时,当前消费者要处理更多的分区。max.partition.fetch.bytes必须比代理节点能接受的最大消息还要大(由代理节点配置中的message.max.bytes控制),否则有些消息消费者将无法消费,消费者会在尝试读取这些大消息时一直卡住。

设置max.partition.fetch.bytes的另一个重要的衡量标准是消费者处理数据花费的时间。回忆下之前的知识点,消费者必须足够频繁地调用poll()方法来避免会话超时和随后的平衡(因为都是在一个线程里完成所有的IO操作)。如果调用一次poll()返回的数据非常大,消费者可能需要花费更长的时间才能处理完成,这就是说它无法及时地迭代下一次轮询循环来避免会话超时。如果发生这种情况,有两种解决办法,降低max.partition.fetch.bytes或者增加会话的超时时间。

session.timeout.ms
The amount of time a consumer can be out of contact with the brokers while still considered alive, defaults to 3 seconds. If a consumer goes for more than session.timeout.ms without sending a heartbeat to the group coordinator, it is considered dead and the group coordinator will trigger a rebalance of the consumer group to allocate partitions from the dead consumer to the other consumers in the group. This property is closely related to heartbeat.interval.ms. heartbeat.interval.ms controls how frequently the KafkaConsumer poll() method will send a heartbeat to the group coordinator, while session.timeout.ms controls how long can a consumer go without sending a heartbeat.

Therefore, thoese two properties are typically modified together - heatbeat.interval.ms must be lower than session.timeout.ms, and is usually set to a 1/3 of the timeout value. So if session.timeout.ms is 3 seconds, heartbeat.interval.ms should be 1 second. Setting session.timeout.ms lower than default will allow consumer groups to detect and recover from failure sooner, but may also cause unwanted rebalances as result of consumers taking longer to complete the poll loop or garbage collection. Setting session.timeout.ms higher will reduce the chance of accidental rebalance, but also means it will take longer to detect a real failure.

该配置表示消费者在一段时间内不需要和代理节点联系也仍然认为是存活的,默认值是3秒。如果消费者超过session.timeout.ms时间还没有发送心跳给协调者,消费者就会被认为挂掉。协调者会触发一次消费组的平衡,并将属于挂掉的消费者的分区分配给消费组中其他的消费者。该配置和heartbeat.interval.ms密切相关。heartbeat.interval.ms会控制每隔多长时间消费者的poll()方法会发送一次心跳给协调者,而session.timeout.ms则控制消费者在没有发送心跳时多久会离开消费组。因此这两个配置通常一起修改,而且heatbeat.interval.ms必须要比session.timeout.ms低,通常心跳间隔会设置为超时时间的1/3。比如session.timeout.ms设置为3秒,那么heartbeat.interval.ms就应该设置为1秒。

session.timeout.ms设置的比默认值要低时,允许消费组更快地检测和恢复故障。但同时也会造成不必要的平衡,比如消费者可能花费比较长的时间完成一次轮询调用或者正在发送垃圾回收,而并没有真正挂掉,但是因为会话超时时间很短,导致发生更频繁的平衡。如果设置session.timeout.ms太大了,虽然可以减少偶然的平衡,但同时也意味着要花费更长的时间才能检测到真正的错误。

auto.offset.reset
This property controls the behavior of the consumer when it starts reading a partition for which it doesn’t have a committed offset or if the committed offset it has is invalid (usually because the consumer was down for so long that the record with that offset was already aged out of the broker). The default is “latest”, which means that lacking a valid offset the consumer will start reading from the newest records (records which were written after the consumer started running). The alternative is “earliest”, which means that lacking a valid offset the consumer will read all the data in the partition, starting from the very beginning.

该配置控制了当消费者开始读取一个没有提交偏移量、或者提交的偏移量已经无效(通常是因为消费者宕机太久,对应偏移量的记录已经在代理节点上过期删除)的分区时的行为。默认值是"latest"(最近的),表示在缺少有效偏移量时,消费者从最新的记录开始读取(即只读取消费者启动之后写入的记录)。另一个可选值是"earliest"(最早的),表示在缺少有效偏移量时,消费者从分区的最开始读取所有的数据。

enable.auto.commit
We discussed the different options for committing offsets earlier in this chapter. This parameter controls whether the consumer will commit offsets automatically and defaults to true. Set it to false if you prefer to control when offsets are committed, which is necessary to minimize duplicates and avoid missing data. If you set enable.auto.commit to true then you may also want to control how frequently offsets will be committed using auto.commit.interval.ms.

本章我们已经讨论了提交偏移量的不同方式。这个参数控制了消费者是否会自动提交偏移量,默认值为true。如果你想要自己控制什么时候提交偏移量,可以设置为false。这对于减少数据的重复处理以及避免丢失数据是非常有必要的。如果你设置了enable.auto.commit为true,你通常也会使用auto.commit.interval.ms来控制多长时间提交一次偏移量。

partition.assignment.strategy
We learned that partitions are assigned to consumers in a consumer group. A PartitionAssignor is a class that, given consumers and topics they subscribed to, decides which partitions will be assigned to which consumer. By default Kafka has two assignment strategies: * Range - which assigns to each consumer a consecutive subset of partitions from each topic it subscribes to. So if consumers C1 and C2 are subscribed to two topics, T1 and T2 and each of the topics has 3 partitions. Then C1 will be assigned partitions 0 and 1 from topics T1 and T2, while C2 will be assigned partition 2 from those topics. Note that because each topic has uneven number of partitions and the assignment is done for each topic independently, the first consumer ended up with more partitions than the second. This happens whenever Range assignment is used and the number of consumers does not divide the number of partitions in each topic neatly.

  • RoundRobin - which takes all the partitions from all subscribed topics and assigns them to consumers sequentially, one by one. If C1 and C2 described above would use RoundRobin assignment, C1 would have partitions 0 and 2 from topic T1 and partition 1 from topic T2. C2 would have partition 1 from topic T1 and partitions 0 and 2 from topic T2. In general, if all consumers are subscribed to the same topics (a very common scenario), RoundRobin assignment will end up with all consumers having the same number of partitions (or at most 1 partition difference). partition.assignment.strategy allows you to choose a partition assignment strategy. The default is org.apache.kafka.clients.consumer.RangeAssignor which implements the Range strategy described above. You can replace it with org.apache.kafka.clients.consumer.RoundRobinAssignor. A more advanced option will be to implement your own assignment strategy, in which case partition.assignment.strategy should point to the name of your class.

我们知道分区会被分配给消费组中的消费者。PartitionAssignor类的作用是:给定消费者和它们订阅的主题,决定哪个分区应该分配给哪个消费者。默认情况下Kafka有两种分配策略:范围(Range):把每个订阅主题中连续的分区子集分配给每个消费者。举例来说,消费者C1和C2都订阅了两个主题T1和T2,这两个主题各有3个分区,那么C1会分配到T1和T2的分区0和分区1,而C2只会分配到这两个主题的分区2。注意由于每个主题的分区数不能被消费者数量整除,而且每个主题的分配是独立进行的,所以第一个消费者最终会比第二个消费者分配到更多的分区。只要使用Range分配方式,并且每个主题的分区数不能被消费者数量整除,就会出现这种情况。

轮询分配(RoundRobin):把所有订阅主题的所有分区逐个、顺序地分配给所有消费者。以上面的C1和C2为例,使用RoundRobin分配策略时,C1会得到主题T1的分区0和分区2,以及主题T2的分区1;C2会得到主题T1的分区1,以及主题T2的分区0和分区2。通常情况下,如果所有消费者订阅了相同的主题(这是很常见的场景),采用轮询分配时,最终所有消费者分配到的分区数量都相同(或者最多相差一个分区)。

partition.assignment.strategy配置允许你选择一个分区的分配策略。默认是实现了Range策略的org.apache.kafka.clients.consumer.RangeAssignor类,你也可以替换成org.apache.kafka.clients.consumer.RoundRobinAssignor类。当然你也可以实现自定义的分配策略,这种情况下应该将partition.assignment.strategy指向你自定义的类路径。

client.id
This can be any string, and will be used by the brokers to identify messages sent from the client. It is used in logging, metrics and for quotas.

该设置可以是任意字符串,代理节点用它来识别请求来自哪个客户端,通常用于日志记录、指标监控和配额(quota)控制。
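作为本节的小结,下面的配置片段(非书中原文,数值仅为示例)把前面讨论的几个参数放在一起展示:

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// 会话超时与心跳间隔,心跳间隔通常设为超时时间的1/3
props.put("session.timeout.ms", "30000");
props.put("heartbeat.interval.ms", "10000");

// 没有有效偏移量时,从分区最早的消息开始读取
props.put("auto.offset.reset", "earliest");

// 每个分区单次拉取的最大字节数
props.put("max.partition.fetch.bytes", "1048576");

// 使用轮询(RoundRobin)分配策略代替默认的Range策略
props.put("partition.assignment.strategy",
        "org.apache.kafka.clients.consumer.RoundRobinAssignor");

// 便于在服务端日志和监控中识别该客户端
props.put("client.id", "country-counter-consumer-1");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);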

单机消费者(不使用消费组)

So far we discussed consumer groups, where partitions are assigned automatically to consumers and are rebalanced automatically when consumers are added or removed from the group. Typically, this behavior is just what you want, but in some cases you want something much simpler. Sometimes you know you have a single consumer that always needs to read data from all the partitions in a topic, or from a specific partition in a topic. In this case there is no reason for groups or rebalances, just subscribe to specific topic and/or partitions, consume messages and commit offsets on occasion.

目前为止我们讨论的都是消费组:分区会自动地分配给组内的消费者,当消费者加入或离开消费组时会自动平衡。通常这正是你想要的行为,不过有些情况下你想要的更简单。比如你确定只有一个消费者,它总是需要读取一个主题所有分区的数据,或者主题中某个特定的分区。这时就没有理由使用消费组和平衡机制,只需要订阅指定的主题和/或分区,消费消息,并不时地提交偏移量即可。

If this is the case, you don’t subscribe to a topic, instead you assign yourself few partitions. Here is an example of how a consumer can assign itself all partitions of a specific topic and consume from them:

这种场景下你不需要订阅一个主题,而是直接分配一些分区给消费者就可以了。下面的示例展示了怎么将一个指定主题的所有分区分配给一个消费者,然后消费它们(分区的消息)。

List<PartitionInfo> partitionInfos = consumer.partitionsFor("topic"); //1
List<TopicPartition> partitions = new ArrayList<>();

if (partitionInfos != null) {
    for (PartitionInfo partition : partitionInfos)
        partitions.add(new TopicPartition(partition.topic(), partition.partition()));
    consumer.assign(partitions); //2

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records) {
            //处理记录,略
        }
        consumer.commitSync();
    }
}
  1. 我们向集群请求获取指定主题的所有可用分区,如果你只需要消费某个特定的分区,可以跳过这一步。
  2. 当我们知道了想要的哪些分区,调用assign()方法时,传递分区列表即可。

Note that other than the lack of rebalances and the need to manually find the partitions, everything looks normal. Just remember that if someone adds new partitions to the topic, the consumer will not be notified. So either handle this by checking consumer.partitionsFor() periodically or keep in mind that if an admin add partitions, the applications will require bouncing. Also note that a consumer can either subscribe to topics (and be part of a consumer group), or assign itself partitions, but not both at the same time.

注意,除了不会发生平衡以及需要手动查找分区之外,其他一切看起来都很正常。不过要记住,如果有人为主题添加了新的分区,这个消费者是不会收到通知的。所以要么定期调用consumer.partitionsFor()检查分区变化,要么记住在管理员添加分区后需要重启应用程序。还要注意:消费者要么订阅主题(并成为消费组的一部分),要么给自己分配分区,但不能同时使用这两种方式。
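作为上文"只需要消费某个特定分区"这种情况的补充示意(非书中代码,主题名和分区号均为示例,consumer沿用上例中已创建的实例):

// 直接把主题"topic"的0号分区分配给当前消费者,可以跳过partitionsFor()查询
consumer.assign(Collections.singletonList(new TopicPartition("topic", 0)));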

旧的消费者API

In this chapter we discussed the Java KafkaConsumer client that is part of org.apache.kafka.clients package. At the time of writing this chapter, Apache Kafka still has two older clients written in Scala that are part of kafka.consumer package which is part of the core Kafka module. These consumers are called SimpleConsumer (which is not very simple. It is a thin wrapper around the Kafka APIs that allow you to consume from specific partitions and offsets) and the High Level Consumer, also known as ZookeeperConsumerConnector, which is somewhat similar to the current consumer in that it has consumer groups and it rebalances partitions - but it uses Zookeeper to manage consumer groups and it does not give you the same control over commits and rebalances as we have now.

Because the current consumer supports both behaviors and gives much more reliability and control to the developer, we will not discuss the older APIs. If you are interested in using them, please think twice and then refer to Apache Kafka documentation to learn more.

本章我们讨论了Java版本的KafkaConsumer客户端,它属于org.apache.kafka.clients包下的一部分。本章写作的时候,Apache Kafka仍然还有另外两种使用Scala编写的旧客户端(高级API和低级API),它是Kafka核心模块中kafka.consumer的一部分。这两个消费者分别被叫做SimpleConsumer(实际上一点都不简单,它是对Kafka APIs的简单封装,允许你从指定的分区消费和提交偏移量,类似于新消费者的手动分配分区API)和高级Consumer,后者也被叫做ZookeeperConsumerConnector,它和当前版本有消费组语义的消费者有点类似,它也会平衡分区,不过使用ZooKeeper来管理消费组,但是并没有在提交和平衡上提供一样的控制能力。


因为当前的消费者都支持这两种行为,并且能给予开发者更多的可靠性和控制权,所以我们这里不会讨论旧的APIs。如果你打算使用它们,建议你三思而后行,然后再去Apache Kafka的官方文档学习更多的知识。

EOF. 2016.10.29

