# Translation: Kafka: The Definitive Guide - Chapter 4: Consumers

Applications that need to read data from Kafka use KafkaConsumer to subscribe to Kafka topics and receive messages from these topics. Reading data from Kafka is a bit different than reading data from other messaging systems, and there are a few unique concepts and ideas involved. It is difficult to understand how to use the consumer API without understanding these concepts first. So we'll start by explaining some of the important concepts, and then we'll go through some examples that show the different ways the consumer APIs can be used to implement applications with different requirements.

## Concepts

### Consumers and Consumer Groups

Suppose you have an application that needs to read messages from a Kafka topic, run some validations against them, and write the results to another data store. In this case your application will create a consumer object, subscribe to the appropriate topic, and start receiving messages, validating them, and writing the results. This can work well for a while, but what if the rate at which producers write messages to the topic exceeds the rate at which your application can validate them? If you are limited to a single consumer reading and processing the data, your application may fall farther and farther behind, unable to keep up with the rate of incoming messages. Obviously there is a need to scale consumption from topics. Just as multiple producers can write to the same topic, we need to allow multiple consumers to read from the same topic, splitting the data between them.

Kafka consumers are typically part of a consumer group. When multiple consumers are subscribed to a topic and belong to the same consumer group, then each consumer in the group will receive messages from a different subset of the partitions in the topic.

### Consumer Group Rebalancing

As we've seen in the previous section, consumers in a consumer group share ownership of the partitions in the topics they subscribe to. When we add a new consumer to the group, it starts consuming messages from partitions previously consumed by another consumer. The same thing happens when a consumer shuts down or crashes: it leaves the group, and the partitions it used to consume will be consumed by one of the remaining consumers. Reassignment of partitions to consumers also happens when the topics the consumer group is consuming are modified, for example if an administrator adds new partitions.

The event in which partition ownership is moved from one consumer to another is called a rebalance. Rebalances are important because they provide the consumer group with both high availability and scalability (allowing us to easily and safely add and remove consumers), but in the normal course of events they are fairly undesirable. During a rebalance, consumers can't consume messages, so a rebalance is in effect a short window of unavailability for the entire consumer group. In addition, when partitions are moved from one consumer to another, the consumer loses its current state; if it was caching any data, it will need to refresh its caches, slowing down our application until the consumer sets up its state again. Throughout this chapter we will discuss how to safely handle rebalances and how to avoid unnecessary ones.

The way consumers maintain their membership in a consumer group and their ownership of the partitions assigned to them is by sending heartbeats to a Kafka broker designated as the Group Coordinator (note that this broker can be different for different consumer groups). As long as the consumer is sending heartbeats at regular intervals, it is assumed to be alive, well, and processing messages from its partitions. In fact, the act of polling for messages is what causes the consumer to send those heartbeats. If the consumer stops sending heartbeats for long enough, its session will time out and the group coordinator will consider it dead and trigger a rebalance. Note that if a consumer crashed and stopped processing messages, it will take the group coordinator a few seconds without heartbeats to decide it is dead and trigger the rebalance. During those seconds, no messages will be processed from the partitions owned by the dead consumer. When closing a consumer cleanly, the consumer will notify the group coordinator that it is leaving, and the group coordinator will trigger a rebalance immediately, reducing the gap in processing. Later in this chapter we will discuss configuration options that control heartbeat frequency and session timeouts, and how to set those to match your requirements.

HOW DOES THE PROCESS OF ASSIGNING PARTITIONS TO CONSUMERS WORK?
When a consumer wants to join a group, it sends a JoinGroup request to the group coordinator. The first consumer to join the group becomes the group leader. The leader receives a list of all consumers in the group from the group coordinator (this will include all consumers that sent a heartbeat recently and are therefore considered alive), and it is responsible for assigning a subset of partitions to each consumer. It uses an implementation of the PartitionAssignor interface to decide which partitions should be handled by which consumer. Kafka has two built-in partition assignment policies, which we will discuss in more depth in the configuration section. After deciding on the partition assignment, the group leader sends the list of assignments to the group coordinator, which sends this information to all the consumers. Each consumer only sees its own assignment - the leader is the only client process that has the full list of consumers in the group and their assignments. This process repeats every time a rebalance happens.

## The Poll Loop
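
The numbered notes below describe a poll loop along the following lines. This is a reconstruction sketch: the broker addresses, the topic name customerCountries, and the custCountryMap counting logic are illustrative, assuming a topic whose String values are country names.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CountryCounter {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092");
        props.put("group.id", "CountryCounter");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("customerCountries"));
        Map<String, Integer> custCountryMap = new HashMap<>();

        try {
            while (true) { // note 1: an infinite poll loop
                // notes 2 and 3: keep polling; block for at most 100ms when no data is available
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    // note 4: update the per-country count and print it (a stand-in for a real store)
                    int updatedCount = custCountryMap.getOrDefault(record.value(), 0) + 1;
                    custCountryMap.put(record.value(), updatedCount);
                    System.out.println(custCountryMap);
                }
            }
        } finally {
            consumer.close(); // note 5: always close the consumer on exit
        }
    }
}
```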

1. This is indeed an infinite loop. Consumers are usually long-running applications that continuously poll Kafka for data. Later we'll show how to cleanly exit the loop and close the consumer.
2. This is the most important line of code in the chapter. Just as sharks must keep moving or die, the consumer must keep polling Kafka or it will be considered dead, and the partitions it is consuming will be handed to other consumers in the group to continue processing.
3. poll() returns a batch of records. Each record contains the topic and partition it came from, its offset within the partition, and of course its key and value. Typically we iterate over the list and process each record individually. poll() also takes a timeout parameter: the maximum time the poll will block, whether or not data is available. The value is usually driven by how responsive the application needs to be, i.e. how quickly you want control returned to the thread that does the polling (poll() blocks the calling thread, so if you want to handle fetched messages as soon as possible you can shorten the timeout; once it expires, the poll returns and your processing logic runs).
4. Processing usually ends with writing a result to a data store or updating a stored record. Here the goal is to keep a running count of customers per country, so we update a hashmap and print the result as a JSON string; a real application would write the updated result to a data store.
5. Always close() the consumer before exiting. This closes the network connections and sockets, and it triggers a rebalance immediately rather than leaving the group coordinator to discover, from the missing heartbeats, that the consumer is probably dead - which takes longer and leaves a subset of partitions unconsumed for a longer time.

## Commits and Offsets

### Automatic Offset Commits

Consider that, by default, automatic commits occur every 5 seconds. Suppose that we are 3 seconds after the most recent commit and a rebalance is triggered. After the rebalance, all consumers will start consuming from the last committed offset. In this case the offset is 3 seconds old, so all the events that arrived in those 3 seconds will be processed twice. It is possible to configure the commit interval to commit more frequently and reduce the window in which records will be duplicated, but it is impossible to completely eliminate them.
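
For reference, a sketch of the relevant settings on the consumer's Properties object (the property names are those of the Java consumer; the 1-second value is only an illustration):

```java
props.put("enable.auto.commit", "true");      // the default: commit offsets automatically
props.put("auto.commit.interval.ms", "1000"); // commit every second instead of the 5-second default,
                                              // shrinking (but not eliminating) the duplicate window
```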

Note that with auto-commit enabled, a call to poll will always commit the last offset returned by the previous poll. It doesn't know which events were actually processed, so it is critical to always process all the events returned by poll before calling poll again (or before calling close(), which also automatically commits offsets). This is usually not an issue, but pay attention when you handle exceptions or otherwise exit the poll loop prematurely.

### Manual Offset Commits

Most developers want to exercise more control over the time at which offsets are committed, both to eliminate the possibility of missing messages and to reduce the number of messages duplicated during rebalancing. The consumer API has an option to commit the current offset at a point that makes sense to the application developer rather than based on a timer.

By setting enable.auto.commit = false, offsets will only be committed when the application explicitly chooses to do so. The simplest and most reliable of the commit APIs is commitSync(). This API will commit the latest offset returned by poll() and return once the offset is committed, throwing an exception if the commit fails for some reason.

It is important to remember that commitSync() will commit the latest offset returned by poll(), so make sure you call commitSync() after you are done processing all the records in the collection, or you risk missing messages as described above. Note that when a rebalance is triggered, all the messages from the beginning of the most recent batch until the time of the rebalance will be processed twice.

Here is how we would use commitSync() to commit offsets once we have finished processing the latest batch of messages:
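
A sketch along those lines, reusing the consumer from the poll-loop example above (log stands for any logger, e.g. slf4j, and CommitFailedException comes from org.apache.kafka.clients.consumer):

```java
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        // note 1: printing stands in for real per-record processing
        System.out.printf("topic = %s, partition = %d, offset = %d, customer = %s, country = %s%n",
                record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
    try {
        consumer.commitSync(); // note 2: commit the batch's last offset before polling again
    } catch (CommitFailedException e) {
        log.error("commit failed", e); // note 3: commitSync already retried; all we can do is log
    }
}
```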

1. Here, printing a record's contents stands in for processing it. Your real application will certainly be more involved; you should decide, based on your use case, when a record counts as fully processed.
2. Once we are done processing all the records in the batch, we call commitSync() to commit the last offset of the batch, before polling for new messages.
3. commitSync() retries the commit as long as there is no unrecoverable error; if it still fails, there is little we can do other than log the error.

### Asynchronous Offset Commits

One drawback of the synchronous commit is that the application is blocked until the broker responds to the commit request. This limits the throughput of the application. Throughput can be improved by committing less frequently, but then we increase the number of potential duplicates that a rebalance will create. The alternative is the asynchronous commit API: instead of waiting for the broker to respond to a commit, we just send the request and continue on.
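
A sketch of the asynchronous variant (processRecord() is a hypothetical stand-in for your processing logic):

```java
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record); // hypothetical per-record processing
    }
    consumer.commitAsync(); // fire and forget: send the commit and keep polling without waiting
}
```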

The drawback is that while commitSync() will retry the commit until it either succeeds or encounters a non-retriable failure, commitAsync() will not retry. The reason it does not retry is that by the time commitAsync() receives a response from the server, there may have been a later commit that was already successful. Imagine that we sent a request to commit offset 2000. There is a temporary communication problem, so the broker never gets the request and therefore never responds. Meanwhile, we processed another batch and successfully committed offset 3000. If commitAsync() now retries the previously failed commit, it may succeed in committing offset 2000 after offset 3000 was already processed and committed. In case of a rebalance, this will cause more duplicates.

We mention this complication and the importance of commit order because commitAsync() also gives you an option to pass in a callback that will be triggered when the broker responds. It is common to use the callback to log commit errors or to count them in a metric, but if you want to use the callback for retries, you need to be aware of the problem with commit order.
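
A sketch of a callback that only logs failures:

```java
consumer.commitAsync(new OffsetCommitCallback() {
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
        if (exception != null) {
            // log it or count it in a metric - but don't blindly retry
            log.error("Commit failed for offsets {}", offsets, exception);
        }
    }
});
```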

A simple pattern to get commit order right for asynchronous retries is to use a monotonically increasing sequence number. Increase the sequence number every time you commit, and pass the sequence number at the time of the commit to the commitAsync() callback. When you are getting ready to send a retry, check whether the sequence number the callback got is equal to the instance variable; if it is, there was no newer commit and it is safe to retry. If the instance sequence number is higher, don't retry, since a newer commit was already sent.
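
A sketch of that pattern. It relies on the fact that commit callbacks run on the consumer's own thread (inside poll()), so a plain field works as the sequence counter; commitSeq and commitWithRetry are names we introduce here for illustration:

```java
private long commitSeq = 0; // bumped on every commit attempt; only the consumer thread touches it

private void commitWithRetry(final Map<TopicPartition, OffsetAndMetadata> offsets) {
    final long seqAtCommit = ++commitSeq;
    consumer.commitAsync(offsets, new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> committed, Exception exception) {
            if (exception != null && seqAtCommit == commitSeq) {
                // no newer commit was attempted since this one, so retrying cannot
                // overwrite a newer offset with an older one
                commitWithRetry(offsets);
            }
            // if commitSeq is higher, a newer commit superseded this one - drop the retry
        }
    });
}
```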

### Combining Synchronous and Asynchronous Commits

Normally, occasional failures to commit without retrying are not a huge problem, since if the problem is temporary the following commit will be successful. But if we know that this is the last commit before we close the consumer, or before a rebalance, we want to make extra sure that the commit succeeds.

Therefore, a common pattern is to combine commitAsync() with commitSync() just before shutdown. Here is how it works (we will discuss how to commit just before a rebalance when we get to the section about rebalance listeners):
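
A sketch of the combined pattern (processRecord() is again a hypothetical stand-in):

```java
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            processRecord(record);
        }
        consumer.commitAsync(); // note 1: fast; if one commit fails, a later commit acts as the retry
    }
} catch (Exception e) {
    log.error("Unexpected error", e);
} finally {
    try {
        consumer.commitSync(); // note 2: the last commit before closing - block and retry until it succeeds
    } finally {
        consumer.close();
    }
}
```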

1. While everything is working fine, we use the asynchronous commitAsync(): it is fast, and if one commit fails, the next commit will serve as a retry.
2. When closing the consumer there is no "next commit", so we call the synchronous commitSync(), which retries until the commit succeeds.

### Committing Specific Offsets

Committing the latest offset only allows you to commit as often as you finish processing batches. But what if you want to commit more frequently than that? What if poll() returns a huge batch and you want to commit offsets in the middle of the batch to avoid having to process all those rows again if a rebalance occurs? You can't just call commitSync() or commitAsync() - this would commit the last offset returned, which you didn't get to process yet.

Fortunately, the consumer API allows you to call commitSync() and commitAsync() and pass a map of partitions and offsets that you wish to commit. If you are in the middle of processing a batch of records, and the last message you got from partition 3 in topic “customers” has offset 5000, you can call commitSync() to commit offset 5000 for partition 3 in topic “customers”. Since your consumer may be consuming more than a single partition, you will need to track offsets on all of them, so moving to this level of precision in controlling offset commits adds complexity to your code.
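
The numbered notes below walk through a sketch like this one (the 1000-record threshold and the "no metadata" string are illustrative choices):

```java
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>(); // note 1
int count = 0;

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic = %s, partition = %d, offset = %d%n",
                record.topic(), record.partition(), record.offset()); // note 2
        // note 3: store the offset of the *next* message we expect from this partition
        currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1, "no metadata"));
        if (count % 1000 == 0) {
            consumer.commitAsync(currentOffsets, null); // notes 4 and 5
        }
        count++;
    }
}
```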

1. We use this map to track offsets manually.
2. Again, printing stands in for real record processing.
3. After reading each record, we update the offsets map with the latest offset.
4. Here we decide to commit every 1000 records; a real application could commit based on time or even on the contents of the records.
5. We chose to call commitAsync(), but commitSync() is equally valid here. Of course, when committing specific offsets you still need to handle errors as described in the earlier sections.

## Rebalance Listeners

As we mentioned in the previous section about committing offsets, a consumer will want to do some cleanup work before exiting and also before a partition rebalance.

If you know your consumer is about to lose ownership of a partition, you will want to commit offsets of the last event you’ve processed. If your consumer maintained a buffer with events that it only processes occasionally (for example, the currentRecords map we used when explaining pause() functionality), you will want to process the events you accumulated before losing ownership of the partition. Perhaps you also need to close file handles, database connections and such.

• public void onPartitionsRevoked(Collection<TopicPartition> partitions) is called before the rebalance starts and after the consumer has stopped consuming messages. This is where you want to commit offsets, so whoever gets the partition next will know where to start.
• public void onPartitionsAssigned(Collection<TopicPartition> partitions) is called after partitions have been reassigned to the consumer, but before the consumer starts consuming messages.
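
The numbered notes that follow refer to an example along these lines; a sketch that keeps the currentOffsets map from the previous section (topics and processRecord() are placeholders):

```java
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

private class HandleRebalance implements ConsumerRebalanceListener { // note 1
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // note 2: nothing to do on assignment - just start consuming
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // note 3: commit what we've processed so far, for all partitions, before losing ownership
        System.out.println("Lost partitions in rebalance. Committing current offsets: " + currentOffsets);
        consumer.commitSync(currentOffsets);
    }
}

consumer.subscribe(topics, new HandleRebalance()); // note 4: register the listener

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record);
        currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1, "no metadata"));
    }
    consumer.commitAsync(currentOffsets, null);
}
```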

1. We start by implementing a ConsumerRebalanceListener.
2. In this example we don't need to do anything when we get a new partition; we'll just start consuming messages.
3. However, when we are about to lose a partition due to rebalancing, we need to commit offsets. Note that we commit the latest offsets we've processed, not the latest offsets in the batch we are still processing, because a partition could get revoked while we are still in the middle of a batch (this minimizes the amount of duplicate processing after the rebalance, though it cannot eliminate duplicates entirely). We commit offsets for all partitions, not just the partitions we are about to lose; since the offsets are for events that were already processed, there is no harm in that. Last, note that we use commitSync() to make sure the offsets are committed before the rebalance proceeds.
4. The most important part: pass the ConsumerRebalanceListener created in step 1 to the subscribe() method so it will get invoked by the consumer.

## Exactly-Once Processing Semantics

So far we’ve seen how to use poll() to start consuming messages from the last committed offset in each partition and to proceed in processing all messages in sequence. However, sometimes you want to start reading at a different offset.

If you want to start reading all messages from the beginning of the partition, or you want to skip all the way to the end of the partition and start consuming only new messages, there are APIs specifically for that: seekToBeginning(TopicPartition tp) and seekToEnd(TopicPartition tp).

However, the Kafka API also lets you seek to a specific offset. This ability can be used in a variety of ways; for example, to go back a few messages or skip ahead a few messages (perhaps a time-sensitive application that is falling behind will want to skip ahead to more relevant messages). But the most exciting use case for this ability is when offsets are stored in a system other than Kafka.

Think about this common scenario: your application reads events from Kafka (perhaps a clickstream of users on a website), processes the data (perhaps cleaning up clicks by robots and adding session information), and then stores the results in a database, NoSQL store, or Hadoop. Suppose that we really don't want to lose any data, nor do we want to store the same results in the database twice.

Suppose we are very paranoid and commit offsets after processing each record. Even so, there is still a chance that our application will crash after the record was stored in the database but before we committed offsets, causing the record to be processed again and the database to contain duplicates.

This could be avoided if only there were a way to store both the record and the offset in one atomic action: either both the record and the offset are committed, or neither of them is. As long as the records are written to a database and the offsets to Kafka, this is impossible.

But what if we wrote both the record and the offset to the database in one transaction? Then we'll know that either we are done with the record and the offset is committed, or we are not and the record will be reprocessed.

Now the only problem is: if the record is stored in a database and not in Kafka, how will our consumer know where to start reading when it is assigned a partition? This is exactly what seek() can be used for. When the consumer starts or when new partitions are assigned, it can look up the offset in the database and seek() to that location.
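
The numbered notes below refer to a sketch like the following; commitDBTransaction(), getOffsetFromDB(), processRecord(), storeRecordInDB(), and storeOffsetInDB() are the imaginary methods the notes describe:

```java
public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        commitDBTransaction(); // note 1: records and offsets were written in the same transaction; just commit it
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions) {
            consumer.seek(partition, getOffsetFromDB(partition)); // note 2: resume from the DB-stored offset
        }
    }
}

consumer.subscribe(topics, new SaveOffsetsOnRebalance());
consumer.poll(0); // note 3: join the group and get partitions assigned, without blocking

for (TopicPartition partition : consumer.assignment()) {
    consumer.seek(partition, getOffsetFromDB(partition)); // note 3: position ourselves before really polling
}

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record);
        storeRecordInDB(record);
        storeOffsetInDB(record.topic(), record.partition(), record.offset()); // note 4
    }
    commitDBTransaction();
}
```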

1. We use an imaginary method to commit a transaction in the database. The idea is that records and their offsets are inserted into the database as the records are processed, so we only need to commit the transaction when we are about to lose ownership of a partition, to make sure this information is persisted.
2. We also have an imaginary method to fetch a partition's offset from the database; when we get ownership of new partitions, we use the consumer's seek() to position ourselves at those records.
3. When the consumer first starts, after subscribing to the topics, we immediately call poll(0) once, without blocking, to make sure we join the consumer group and get partitions assigned, and then we immediately seek() to the correct position in the partitions assigned to us. Keep in mind that seek() only updates the position we will consume from, so the next poll() will fetch the right messages. If there was an error in seek() (for example, the offset does not exist), poll() will throw an exception.
4. Another imaginary method: this time we update a table in the database that stores the offsets. Here we assume that updating records is fast, so we do it on every record, but commits are slow, so we only commit once a batch is fully processed. There are still many ways this can be optimized.

## How to Exit

When you decide to exit the poll loop, you will need another thread to call consumer.wakeup(). If you are running the consumer loop in the main thread, this can be done from a ShutdownHook. Note that consumer.wakeup() is the only consumer method that is safe to call from a different thread. Calling wakeup causes poll() to exit with a WakeupException; if consumer.wakeup() was called while the thread was not waiting on poll, the exception will be thrown on the next call to poll(). The WakeupException doesn't need to be handled; it is just a way of breaking out of the loop. But it is important that before exiting the thread you call consumer.close(). This will do any last commits if needed and send the group coordinator a message that the consumer is leaving the group, so rebalancing will be triggered immediately and you won't need to wait for the session to time out.
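
A sketch of that shutdown flow (mainThread is a reference to the thread running the poll loop; processRecord() is a placeholder):

```java
final Thread mainThread = Thread.currentThread();

Runtime.getRuntime().addShutdownHook(new Thread() {
    @Override
    public void run() {
        System.out.println("Starting exit...");
        consumer.wakeup(); // note 1: the only consumer method safe to call from another thread
        try {
            mainThread.join(); // give the poll loop time to commit and close
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records) {
            processRecord(record);
        }
        consumer.commitSync();
    }
} catch (WakeupException e) {
    // note 2: expected on shutdown - nothing to do here
} finally {
    consumer.close(); // note 3: final commit, and tells the coordinator we are leaving the group
    System.out.println("Closed consumer and we are done");
}
```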

1. The shutdown hook runs in a separate thread (not the main thread), so the only safe action is to call wakeup() to break out of the poll loop.
2. wakeup() called from another thread causes poll() to throw a WakeupException. You should catch the exception so the application doesn't exit in an unexpected way, but there is nothing that needs to be done with it.
3. Before exiting, make sure you close the consumer cleanly.

## Deserializers

As discussed in the previous chapter, Kafka producers require serializers to convert objects into byte arrays that are then sent to Kafka. Similarly, Kafka consumers require deserializers to convert byte arrays received from Kafka into Java objects. In previous examples, we just assumed that both the key and the value of each message are Strings, and we used the default StringDeserializer in the consumer configuration.

In the previous chapter about the Kafka Producer, we’ve seen how to serialize custom types and how to use Avro and AvroSerializers to generate Avro objects from schema definitions and then serialize them when producing messages to Kafka. We will now look at how to create custom deserializers for your own objects and how to use Avro and its deserializers.

It should be obvious that the serializer used when producing events to Kafka must match the deserializer used when consuming them. Serializing with IntSerializer and then deserializing with StringDeserializer will not end well. This means that as a developer you need to keep track of which serializers were used to write into each topic, and make sure each topic only contains data that the deserializers you use can interpret. This is one of the benefits of using Avro and the Schema Repository for serializing and deserializing: the AvroSerializer can make sure that all the data written to a specific topic is compatible with the schema of the topic, which means it can be deserialized with the matching deserializer and schema. Any compatibility errors, on the producer or the consumer side, will be caught easily with an appropriate error message, which means you will not need to debug byte arrays to track down serialization errors.

### Custom Deserializers
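
The notes below refer to a deserializer along these lines. The sketch assumes the Customer class and the wire format from Chapter 3's serializer: a 4-byte customer ID followed by a length-prefixed UTF-8 name.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class CustomerDeserializer implements Deserializer<Customer> { // note 1
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure
    }

    @Override
    public Customer deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        if (data.length < 8) {
            throw new SerializationException("Expected at least 8 bytes of Customer data, got " + data.length);
        }
        try {
            // note 2: reverse the serializer's logic - read the ID, the name length, then the name
            ByteBuffer buffer = ByteBuffer.wrap(data);
            int id = buffer.getInt();
            int nameSize = buffer.getInt();
            byte[] nameBytes = new byte[nameSize];
            buffer.get(nameBytes);
            String name = new String(nameBytes, StandardCharsets.UTF_8);
            return new Customer(id, name);
        } catch (Exception e) {
            throw new SerializationException("Error when deserializing byte[] to Customer", e);
        }
    }

    @Override
    public void close() {
        // nothing to close
    }
}
```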

1. The custom CustomerDeserializer implements the Deserializer interface with Customer as the type parameter. Note that both the class (Customer) and the serializer/deserializer (CustomerDeserializer) must match exactly between the producer and consumer applications. In a large organization with many producer and consumer applications sharing data through Kafka, keeping this agreement is a serious challenge.
2. Here we just reverse the serialization logic: we read the customer ID and name out of the byte array and use them to construct the object.

Again, it is important to note that implementing a custom serializer and deserializer is not a recommended practice. It tightly couples producers and consumers, and is fragile and error-prone. A better solution is to use a standard message format such as Thrift, Protobuf, or Avro. We'll now see how to use Avro deserializers with the Kafka consumer. For background on Apache Avro, its schemas, and its schema-compatibility capabilities, refer back to Chapter 3.

### Avro Deserialization

Let's assume we are using the implementation of the Customer class in Avro that was shown in Chapter 3. In order to consume those objects from Kafka, you would implement a consuming application similar to this:
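
A sketch of such an application. The topic name customerContacts and the schemaUrl variable are illustrative; specific.avro.reader tells the Confluent deserializer to return the generated Customer class rather than a GenericRecord:

```java
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer"); // note 1
props.put("specific.avro.reader", "true"); // note 3: deserialize into the generated Customer class
props.put("schema.registry.url", schemaUrl); // note 2: where the producer registered its schemas

KafkaConsumer<String, Customer> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("customerContacts"));

while (true) {
    ConsumerRecords<String, Customer> records = consumer.poll(1000);
    for (ConsumerRecord<String, Customer> record : records) {
        System.out.println("Current customer name is: " + record.value().getName()); // note 4
    }
    consumer.commitSync();
}
```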

1. We use KafkaAvroDeserializer to deserialize the Avro messages.
2. schema.registry.url is a new parameter; it simply points to where the schemas are stored, so the consumer can use the schema the producer registered when serializing the messages.
3. We specify the generated class, Customer, as the type of the record value.
4. record.value() returns a Customer instance, which we can then use directly.

## Consumer Configuration

fetch.min.bytes
This property allows a consumer to specify the minimum amount of data it wants to receive from the broker when fetching records. If a broker receives a request for records from a consumer but the new records amount to fewer bytes than fetch.min.bytes, the broker will wait until more messages are available before sending the records back to the consumer. This reduces the load on both the consumer and the broker, as they have to handle fewer back-and-forth messages in cases where the topics don't have much new activity (or during lower-activity hours of the day). You will want to set this parameter higher than the default if the consumer is using too much CPU when there isn't much data available, or to reduce the load on the brokers when you have a large number of consumers.

fetch.max.wait.ms
By setting fetch.min.bytes, you tell Kafka to wait until it has enough data to send before responding to the consumer. fetch.max.wait.ms lets you control how long to wait. By default, Kafka will wait up to 500ms. This results in up to 500ms of extra latency when there is not enough data flowing into the Kafka topic to satisfy the minimum amount of data to return. If you want to limit the potential latency (usually because of SLAs controlling the maximum latency of the application), you can set fetch.max.wait.ms to a lower value. If you set fetch.max.wait.ms to 100ms and fetch.min.bytes to 1MB, Kafka will receive a fetch request from the consumer and will respond with data either when it has 1MB of data to return or after 100ms, whichever happens first.
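
A sketch of the two settings working together, matching the example in the text (props is the consumer's Properties object):

```java
props.put("fetch.min.bytes", String.valueOf(1024 * 1024)); // respond once 1MB of new data is available...
props.put("fetch.max.wait.ms", "100");                     // ...or after 100ms, whichever happens first
```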

max.partition.fetch.bytes
This property controls the maximum number of bytes the server will return per partition. The default is 1MB, which means that when KafkaConsumer.poll() returns ConsumerRecords, the record object will use at most max.partition.fetch.bytes per partition assigned to the consumer. So if a topic has 20 partitions and you have 5 consumers, each consumer will need to have 4MB of memory available for ConsumerRecords. In practice, you will want to allocate more memory, as each consumer will need to handle more partitions if other consumers in the group fail. max.partition.fetch.bytes must be larger than the largest message a broker will accept (the message.max.bytes property in the broker configuration), or the broker may have messages that the consumer is unable to consume, in which case the consumer will hang trying to read them.

Another important consideration when setting max.partition.fetch.bytes is the amount of time it takes the consumer to process data. As you recall, the consumer must call poll() frequently enough to avoid a session timeout and the subsequent rebalance. If the amount of data a single poll() returns is very large, it may take the consumer longer to process, which means it will not get to the next iteration of the poll loop in time to avoid a session timeout. If this occurs, the two options are either to lower max.partition.fetch.bytes or to increase the session timeout.

session.timeout.ms
The amount of time a consumer can be out of contact with the brokers while still considered alive; defaults to 3 seconds. If a consumer goes for more than session.timeout.ms without sending a heartbeat to the group coordinator, it is considered dead, and the group coordinator will trigger a rebalance of the consumer group to allocate the dead consumer's partitions to the other consumers in the group. This property is closely related to heartbeat.interval.ms: heartbeat.interval.ms controls how frequently the KafkaConsumer poll() method will send a heartbeat to the group coordinator, while session.timeout.ms controls how long a consumer can go without sending a heartbeat.

Therefore, these two properties are typically modified together: heartbeat.interval.ms must be lower than session.timeout.ms, and is usually set to one-third of the timeout value. So if session.timeout.ms is 3 seconds, heartbeat.interval.ms should be 1 second. Setting session.timeout.ms lower than the default will allow consumer groups to detect and recover from failure sooner, but may also cause unwanted rebalances as a result of consumers taking longer to complete the poll loop or garbage collection. Setting session.timeout.ms higher will reduce the chance of accidental rebalances, but also means it will take longer to detect a real failure.

auto.offset.reset
This property controls the behavior of the consumer when it starts reading a partition for which it doesn’t have a committed offset or if the committed offset it has is invalid (usually because the consumer was down for so long that the record with that offset was already aged out of the broker). The default is “latest”, which means that lacking a valid offset the consumer will start reading from the newest records (records which were written after the consumer started running). The alternative is “earliest”, which means that lacking a valid offset the consumer will read all the data in the partition, starting from the very beginning.

enable.auto.commit
We discussed the different options for committing offsets earlier in this chapter. This parameter controls whether the consumer will commit offsets automatically and defaults to true. Set it to false if you prefer to control when offsets are committed, which is necessary to minimize duplicates and avoid missing data. If you set enable.auto.commit to true then you may also want to control how frequently offsets will be committed using auto.commit.interval.ms.

partition.assignment.strategy
We learned that partitions are assigned to consumers in a consumer group. A PartitionAssignor is a class that, given consumers and the topics they subscribed to, decides which partitions will be assigned to which consumer. By default, Kafka has two assignment strategies:

• Range - assigns each consumer a consecutive subset of partitions from each topic it subscribes to. So if consumers C1 and C2 are subscribed to two topics, T1 and T2, and each of the topics has 3 partitions, then C1 will be assigned partitions 0 and 1 from topics T1 and T2, while C2 will be assigned partition 2 from those topics. Note that because each topic has an uneven number of partitions and the assignment is done for each topic independently, the first consumer ends up with more partitions than the second. This happens whenever Range assignment is used and the number of consumers does not divide the number of partitions in each topic neatly.

• RoundRobin - takes all the partitions from all subscribed topics and assigns them to consumers sequentially, one by one. If C1 and C2 from the example above used RoundRobin assignment, C1 would have partitions 0 and 2 from topic T1 and partition 1 from topic T2, while C2 would have partition 1 from topic T1 and partitions 0 and 2 from topic T2. In general, if all consumers are subscribed to the same topics (a very common scenario), RoundRobin assignment will end up with all consumers having the same number of partitions (or at most a difference of one partition).

partition.assignment.strategy allows you to choose a partition assignment strategy. The default is org.apache.kafka.clients.consumer.RangeAssignor, which implements the Range strategy described above. You can replace it with org.apache.kafka.clients.consumer.RoundRobinAssignor. A more advanced option is to implement your own assignment strategy, in which case partition.assignment.strategy should point to the name of your class.
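
For example, switching a consumer to the RoundRobin strategy is just a configuration change on its Properties object:

```java
props.put("partition.assignment.strategy",
        "org.apache.kafka.clients.consumer.RoundRobinAssignor"); // default is ...RangeAssignor
```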

client.id
This can be any string, and will be used by the brokers to identify messages sent from the client. It is used in logging, metrics and for quotas.

## Standalone Consumer (Without a Consumer Group)

So far we have discussed consumer groups, where partitions are assigned automatically to consumers and are rebalanced automatically when consumers are added to or removed from the group. Typically, this behavior is just what you want, but in some cases you want something much simpler. Sometimes you know you have a single consumer that always needs to read data from all the partitions in a topic, or from a specific partition in a topic. In this case there is no reason for groups or rebalances; just read from the specific topic and/or partitions, consume messages, and commit offsets on occasion.

If this is the case, you don't subscribe to a topic; instead, you assign yourself a few partitions. Here is an example of how a consumer can assign itself all the partitions of a specific topic and consume from them:
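
The notes below refer to a sketch like this one (the topic name "topic" is a placeholder):

```java
List<PartitionInfo> partitionInfos = consumer.partitionsFor("topic"); // note 1: discover the topic's partitions
List<TopicPartition> partitions = new ArrayList<>();

if (partitionInfos != null) {
    for (PartitionInfo partition : partitionInfos) {
        partitions.add(new TopicPartition(partition.topic(), partition.partition()));
    }
    consumer.assign(partitions); // note 2: take the partitions directly - no group, no rebalancing

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("topic = %s, partition = %d, offset = %d%n",
                    record.topic(), record.partition(), record.offset());
        }
        consumer.commitSync();
    }
}
```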

1. We ask the cluster for the available partitions in the topic. If you only plan to consume a specific partition, you can skip this step.
2. Once we know which partitions we want, we call assign() with the list of partitions.

Note that, other than the lack of rebalances and the need to manually find the partitions, everything else looks normal. Just remember that if someone adds new partitions to the topic, the consumer will not be notified. So either handle this by checking consumer.partitionsFor() periodically, or keep in mind that if an admin adds partitions, the applications will require bouncing. Also note that a consumer can either subscribe to topics (and be part of a consumer group) or assign itself partitions, but not both at the same time.

## Older Consumer APIs

In this chapter we discussed the Java KafkaConsumer client that is part of the org.apache.kafka.clients package. At the time of writing this chapter, Apache Kafka still has two older clients written in Scala that are part of the kafka.consumer package, which belongs to the core Kafka module. These consumers are called SimpleConsumer (which is not very simple; it is a thin wrapper around the Kafka APIs that allows you to consume from specific partitions and offsets) and the High Level Consumer, also known as ZookeeperConsumerConnector. The latter is somewhat similar to the current consumer in that it has consumer groups and rebalances partitions, but it uses Zookeeper to manage consumer groups and does not give you the same control over commits and rebalances that we have now.

Because the current consumer supports both behaviors and gives much more reliability and control to the developer, we will not discuss the older APIs. If you are interested in using them, please think twice and then refer to Apache Kafka documentation to learn more.

EOF. 2016.10.29