# Introduction to Kafka

## Background

MQ communication modes


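| Concept | Function |
| --- | --- |
| Topic | A logical category used to group Messages; one Topic can be distributed across multiple Brokers. |
| Partition | The basis of horizontal scaling and all parallelism in Kafka; every Topic is split into at least 1 Partition. |
| Offset | The sequence number of a message within a Partition; numbering does not span Partitions (messages are ordered only within a Partition). |
| Consumer | Fetches/consumes Messages from Brokers. |
| Producer | Sends/produces Messages to Brokers. |
| Replication | Kafka replicates Messages per Partition for redundancy; each Partition can be configured with at least 1 replica (with only 1 replica, that replica is just the Partition itself). |
| Consumer group | Consumers can form a group; each message is consumed by only one consumer in the group. If a message must be consumed by multiple consumers, those consumers must be in different groups. |
| Push-and-pull | Kafka producers and consumers use a push-and-pull model: the Producer only pushes messages to the broker and the consumer only pulls messages from the broker, so production and consumption are asynchronous with respect to each other. |
| Relationship between brokers | Not master/slave: all brokers have equal standing in the cluster, and any broker node can be added or removed at will. |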
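A toy in-memory model can make the table concrete: offsets are numbered per partition, producers push, and each consumer group pulls from its own position. The class names `Topic` and `ConsumerGroup` are hypothetical illustrations, not Kafka APIs, and how partitions are split among the consumers inside one group is not modeled here.

```python
from collections import defaultdict


class Topic:
    """Toy topic: a fixed list of partitions, each an append-only log."""

    def __init__(self, name, num_partitions):
        if num_partitions < 1:
            raise ValueError("every topic has at least one partition")
        self.name = name
        self.partitions = [[] for _ in range(num_partitions)]

    def append(self, partition, message):
        """Producer side: push a message and return its offset in that partition."""
        log = self.partitions[partition]
        log.append(message)
        return len(log) - 1  # offsets start at 0 in every partition


class ConsumerGroup:
    """Toy group: keeps its own read position for each partition."""

    def __init__(self, group_id, topic):
        self.group_id = group_id
        self.topic = topic
        self.positions = defaultdict(int)  # partition -> next offset to read

    def poll(self, partition):
        """Consumer side: pull every message after this group's position."""
        log = self.topic.partitions[partition]
        start = self.positions[partition]
        self.positions[partition] = len(log)
        return log[start:]
```

Because each group tracks its own positions, two different groups both receive every message, while repeated polls by the same group never return a message twice.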

## Kafka Replication

The leader maintains a set of in-sync replicas (ISR): the set of replicas that have fully caught up with the leader.
For each partition, we store in Zookeeper the current leader and the current ISR.

Each replica stores messages in a local log and maintains a few important offset positions in the log.
The log end offset (LEO) represents the tail of the log.
The high watermark (HW) is the offset of the last committed message.

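The LEO is the tail (latest offset) of the log, and the HW is the offset of the most recently committed message. (Does every replica have its own HW? Yes: the leader propagates its HW to the followers, and each replica checkpoints its own HW to disk, as described below.)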

Each log is periodically synced to disks. Data before the flushed offset is guaranteed to be persisted on disks.
As we will see, the flush offset can be before or after HW.

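There is no fixed point at which a follower's in-memory messages are written to disk: they may be flushed before the HW advances, or only after it has advanced.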

### Writes
To publish a message to a partition, the client first finds the leader of the partition from
Zookeeper and sends the message to the leader. The leader writes the message to its local log.
Each follower constantly pulls new messages from the leader using a single socket channel.
That way, the follower receives all messages in the same order as written in the leader.

The follower writes each received message to its own log and sends an acknowledgment back to the leader.
Once the leader receives the acknowledgment from all replicas in ISR, the message is committed.
The leader advances the HW and sends an acknowledgment to the client.
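The commit rule above can be sketched as the leader tracking the LEO of every ISR replica: a message is committed once it lies below the smallest LEO in the ISR. This is a minimal, hypothetical model (`PartitionLeader` and `on_follower_ack` are illustrative names, not Kafka code). One assumption differs from the wording above: the sketch stores `hw` as the number of committed messages, so the last committed message sits at offset `hw - 1`.

```python
class PartitionLeader:
    """Toy leader of one partition that tracks the LEO of every ISR replica."""

    def __init__(self, leader_id, follower_ids):
        self.leader_id = leader_id
        self.log = []
        self.isr = {leader_id, *follower_ids}
        self.leo = {rid: 0 for rid in self.isr}  # replica id -> log end offset
        self.hw = 0  # count of committed messages; last committed offset is hw - 1

    def append(self, message):
        """Client write: the leader appends to its local log first."""
        self.log.append(message)
        self.leo[self.leader_id] = len(self.log)
        self._advance_hw()

    def on_follower_ack(self, follower_id, follower_leo):
        """A follower acknowledges that its log now ends at follower_leo."""
        self.leo[follower_id] = follower_leo
        self._advance_hw()

    def _advance_hw(self):
        # A message is committed once every ISR replica has it, i.e. it lies
        # below the smallest LEO in the ISR. The HW never moves backwards.
        self.hw = max(self.hw, min(self.leo[rid] for rid in self.isr))
```

After `append("m0")` the HW stays at 0 until every follower in the ISR has acknowledged; only then would the leader acknowledge the client (not modeled here).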

For better performance, each follower sends an acknowledgment after the message is written to memory.
So, for each committed message, we guarantee that the message is stored in multiple replicas in memory.
However, there is no guarantee that any replica has persisted the committed message to disk.
Given that correlated failures are relatively rare, this approach gives us a good balance between response time and durability.
In the future, we may consider adding options that provide even stronger guarantees.

The leader also periodically broadcasts the HW to all followers.
The broadcast can be piggybacked on the responses to the followers' fetch requests.
From time to time, each replica checkpoints its HW to its disk.

For simplicity, reads are always served from the leader. Only messages up to the HW are exposed to the reader.
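HW propagation and HW-bounded reads can be sketched together: the leader's fetch response carries its current HW, the follower caps that HW at its own LEO, and consumer reads stop at the HW. All names here (`FetchResponse`, `LeaderReplica`, `FollowerReplica`, `checkpoint_hw`) are hypothetical, and the "checkpoint" is just a field standing in for a file on disk.

```python
from dataclasses import dataclass, field


@dataclass
class FetchResponse:
    messages: list
    leader_hw: int  # the HW piggybacked on the fetch response


@dataclass
class LeaderReplica:
    log: list = field(default_factory=list)
    hw: int = 0  # messages log[:hw] are committed

    def handle_fetch(self, fetch_offset):
        """Serve a follower fetch and piggyback the current HW."""
        return FetchResponse(self.log[fetch_offset:], self.hw)

    def read(self, offset):
        """Serve a consumer read: only committed messages (below the HW) are exposed."""
        return self.log[offset:self.hw]


@dataclass
class FollowerReplica:
    log: list = field(default_factory=list)
    hw: int = 0
    checkpointed_hw: int = 0  # stand-in for the HW checkpoint on disk

    def fetch_once(self, leader):
        resp = leader.handle_fetch(len(self.log))
        self.log.extend(resp.messages)
        # A follower's HW is the leader's HW, capped at its own LEO.
        self.hw = min(resp.leader_hw, len(self.log))

    def checkpoint_hw(self):
        self.checkpointed_hw = self.hw
```

With a leader log of three messages and an HW of 2, a consumer sees only the first two, even though the follower has already copied all three.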

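Kafka's replication is neither fully synchronous nor purely asynchronous. In fact, a message is committed only once every replica in the ISR has it, but a failed follower is dropped from the ISR after a timeout instead of blocking writes.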

How the HW checkpoint resolves HW synchronization when a follower fails:
After a configured timeout period, the leader will drop the failed follower from its ISR
and writes will continue on the remaining replicas in ISR.

(Note: this is about a follower replica of one Partition, not a whole node. A node stores more than one Partition, and not all of its replicas are followers.)

If the failed follower comes back, it first truncates its log to the last checkpointed HW.
It then starts to catch up all messages after its HW from the leader.
When the follower fully catches up, the leader will add it back to the current ISR.
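The three recovery steps (truncate to the checkpointed HW, catch up from the leader, rejoin the ISR) can be sketched as two small functions over plain lists. The function names are hypothetical; real brokers work on log segments on disk rather than Python lists.

```python
def recover_follower(local_log, checkpointed_hw, leader_log):
    """Return the follower's log after restart and catch-up."""
    # 1. Truncate to the last checkpointed HW: anything after it may never
    #    have been committed and can differ from the leader's log.
    log = local_log[:checkpointed_hw]
    # 2. Catch up by copying every message after that point from the leader.
    log.extend(leader_log[len(log):])
    return log


def maybe_rejoin_isr(isr, follower_id, follower_leo, leader_leo):
    """3. Add the follower back to the ISR once it has fully caught up."""
    if follower_leo >= leader_leo:
        isr.add(follower_id)
    return isr
```

In the usage below, the follower's uncommitted `"x2"` is discarded and replaced by the leader's `"m2"`, which is exactly why truncation goes back to the checkpointed HW rather than the follower's own LEO.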

## Consumer Design

### Group management protocol

Rebalancing is the process where a group of consumer instances (belonging to the same group) co-ordinate to own a mutually exclusive set of partitions of topics that the group is subscribed to. At the end of a successful rebalance operation for a consumer group, every partition for all subscribed topics will be owned by a single consumer instance within the group. The way rebalancing works is as follows. Every broker is elected as the coordinator for a subset of the consumer groups. The co-ordinator broker for a group is responsible for orchestrating a rebalance operation on consumer group membership changes or partition changes for the subscribed topics. It is also responsible for communicating the resulting partition ownership configuration to all consumers of the group undergoing a rebalance operation.
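The end state of a rebalance, where every partition is owned by exactly one consumer in the group, can be sketched with a simple range-style assignment. The notes do not say which strategy the coordinator uses; this is an assumption, similar in spirit to Kafka's range assignor but not its actual code, and `assign_partitions` is a hypothetical name.

```python
def assign_partitions(partitions, consumers):
    """Map each consumer to a contiguous range of partitions.

    Every partition ends up owned by exactly one consumer; consumers beyond
    the number of partitions get an empty list.
    """
    members = sorted(consumers)
    parts = sorted(partitions)
    if not members:
        return {}
    per_member, extra = divmod(len(parts), len(members))
    assignment = {}
    start = 0
    for i, member in enumerate(members):
        # The first `extra` members take one additional partition.
        count = per_member + (1 if i < extra else 0)
        assignment[member] = parts[start:start + count]
        start += count
    return assignment
```

Sorting both lists makes the result deterministic, so every participant computing it from the same membership gets the same mutually exclusive ownership.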

Consumer workflow:

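2. The consumer connects to the coordinator and sends a HeartbeatRequest. If the returned HeartbeatResponse carries the IllegalGeneration error code, the coordinator has already initiated a rebalance.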

3. If the HeartbeatResponse returns no error, the consumer keeps fetching data from the partitions it last owned, without interruption.

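Coordinator workflow:
1. In the steady state, the coordinator tracks the health of every consumer in every group it manages through the failure detection protocol.
2. Upon election and startup, the coordinator reads the list of groups it manages, and the membership of each group, from ZK. If there is no previous membership information, it does nothing.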

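3. Until the coordinator has finished loading the membership of every group it is responsible for, it returns the CoordinatorStartupNotComplete error code in responses to the following requests:
HeartbeatRequest, OffsetCommitRequest, JoinGroupRequest. The consumer then retries after a while, until loading completes and no error code is returned.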

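5. A rebalance is triggered by returning the IllegalGeneration error code in the HeartbeatResponse. Once all live consumers have re-registered with the coordinator via JoinGroupRequests, the coordinator communicates the new partition ownership to each consumer, completing the rebalance.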

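6. The coordinator tracks topic-partition changes for every topic that any consumer has registered for. If it detects a newly added partition for such a topic, it triggers a rebalance.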
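The heartbeat handling in these steps can be sketched as a tiny state machine for one group. The error-code names come from the notes; everything else is a hypothetical model, in particular the integer `generation`, which is assumed to be what makes a consumer's generation "illegal" after a rebalance starts.

```python
from enum import Enum


class ErrorCode(Enum):
    NONE = "None"
    ILLEGAL_GENERATION = "IllegalGeneration"
    COORDINATOR_STARTUP_NOT_COMPLETE = "CoordinatorStartupNotComplete"


class GroupCoordinator:
    """Toy coordinator for one consumer group (hypothetical model)."""

    def __init__(self):
        self.loaded = False         # step 3: membership not loaded yet
        self.generation = 1         # assumed: bumped after every rebalance
        self.rebalancing = False
        self.partition_counts = {}  # topic -> partition count last seen

    def finish_loading(self):
        self.loaded = True

    def on_topic_metadata(self, topic, num_partitions):
        """Step 6: a new partition on a subscribed topic triggers a rebalance."""
        previous = self.partition_counts.get(topic, num_partitions)
        if num_partitions > previous:
            self.rebalancing = True
        self.partition_counts[topic] = num_partitions

    def heartbeat(self, generation):
        if not self.loaded:
            return ErrorCode.COORDINATOR_STARTUP_NOT_COMPLETE  # consumer retries later
        if self.rebalancing or generation != self.generation:
            return ErrorCode.ILLEGAL_GENERATION  # consumer must rejoin
        return ErrorCode.NONE  # consumer keeps fetching its partitions

    def complete_rebalance(self):
        """Step 5: once all live consumers have rejoined, start a new generation."""
        self.generation += 1
        self.rebalancing = False
        return self.generation
```

A consumer that keeps heartbeating with the old generation after a rebalance keeps getting IllegalGeneration, which forces it to rejoin rather than fetch from partitions it may no longer own.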

### High Level Consumer

With the high-level consumer you don't have to manage offsets: it automatically reads this Consumer Group's last offset (the most recently consumed message) from ZK.

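• If there are more consumers than partitions, the extra consumers are wasted: Kafka does not allow concurrent consumption within one partition, so the number of consumers should not exceed the number of partitions.
• If there are fewer consumers than partitions, one consumer handles several partitions. Balance the consumer and partition counts carefully, otherwise data will be pulled from the partitions unevenly.
Ideally the partition count is an integer multiple of the consumer count, which makes the choice of partition count important: with 24, for example, it is easy to pick a consumer count.
• If a consumer reads from multiple partitions, ordering across them is not guaranteed: Kafka only guarantees ordering within a single partition, and across partitions the order depends on how you read them.
• Adding or removing consumers, brokers, or partitions triggers a rebalance, after which the partitions assigned to each consumer may change.
• The high-level API blocks the consumer thread when no data is available.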
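The arithmetic behind the first two bullets fits in two small functions: how many partitions each consumer in a group gets, and which consumer counts divide a partition count evenly. Both function names are hypothetical helpers for illustration, assuming partitions are spread as evenly as possible.

```python
def partitions_per_consumer(num_partitions, num_consumers):
    """Partitions each consumer in one group gets, most-loaded first."""
    per_consumer, extra = divmod(num_partitions, num_consumers)
    return [per_consumer + 1] * extra + [per_consumer] * (num_consumers - extra)


def balanced_consumer_counts(num_partitions):
    """Consumer counts that split the partitions evenly (the divisors)."""
    return [n for n in range(1, num_partitions + 1) if num_partitions % n == 0]
```

With 24 partitions, 1, 2, 3, 4, 6, 8, 12, or 24 consumers all get equal shares, while 5 consumers leave one with less work; with 4 partitions and 6 consumers, two consumers sit idle.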

### Simple Consumer

https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

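Use it when you need to:
• Read the same message multiple times
• Consume only some of a Topic's Partitions in one thread
• Manage transactions so that each message is processed once and only once

The trade-off:
• The application must track offsets itself to know which message to consume next (where it left off)

Steps:
• Find the followers (replica brokers) of each Partition
• Build a request that describes the data the application needs
• Fetch the data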
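A minimal sketch of the loop these steps imply, against a hypothetical in-memory cluster (`InMemoryCluster`, `FetchRequest`, and `fetch_batch` are illustrative names, not the SimpleConsumer API). It fetches from the partition leader, consistent with "reads are always served from the leader" above; looking up replica brokers to recover from a leader change is left out. The point it shows is that the caller owns the offset.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FetchRequest:
    """Describes exactly which data the application wants."""
    topic: str
    partition: int
    offset: int
    max_messages: int


class InMemoryCluster:
    """Hypothetical stand-in for brokers: partition metadata plus logs."""

    def __init__(self, leaders, logs):
        self.leaders = leaders  # (topic, partition) -> leader broker id
        self.logs = logs        # (topic, partition) -> list of messages

    def find_leader(self, topic, partition):
        return self.leaders[(topic, partition)]

    def fetch(self, broker_id, request):
        key = (request.topic, request.partition)
        if broker_id != self.leaders[key]:
            raise RuntimeError("broker is not the leader for this partition")
        log = self.logs[key]
        return log[request.offset:request.offset + request.max_messages]


def fetch_batch(cluster, topic, partition, offset, max_messages=2):
    """One step: find the leader, build the request, fetch the data.

    The caller owns the offset and must pass the returned next offset
    back in on the following call.
    """
    leader = cluster.find_leader(topic, partition)
    request = FetchRequest(topic, partition, offset, max_messages)
    batch = cluster.fetch(leader, request)
    return batch, offset + len(batch)
```

Because nothing stores the position for you, re-reading a message is as simple as passing an older offset again, which is the first reason listed for using this API.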

### New Consumer

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design

### Partitions

http://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/

When publishing a keyed message, Kafka deterministically maps the message to a partition based on the hash of the key. This provides a guarantee that messages with the same key are always routed to the same partition. This guarantee can be important for certain applications since messages within a partition are always delivered in order to the consumer. If the number of partitions changes, such a guarantee may no longer hold. I used to think ordering could only be guaranteed by using a single Partition, but keying messages is another way to do it.
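The key-to-partition mapping is essentially `hash(key) mod num_partitions`, which is both why equal keys always land together and why changing the partition count breaks that guarantee. This sketch uses `zlib.crc32` purely as a stand-in hash; Kafka's default partitioner uses murmur2, so the concrete partition numbers will not match a real cluster.

```python
import zlib


def partition_for_key(key: bytes, num_partitions: int) -> int:
    """Deterministically map a key to a partition: hash(key) mod partition count.

    zlib.crc32 is a stand-in hash here; Kafka's default partitioner uses murmur2.
    """
    return zlib.crc32(key) % num_partitions
```

Calling it twice with the same key and partition count always gives the same partition, but going from 6 to 8 partitions remaps a large share of keys, so per-key ordering across that change is lost.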

The end-to-end latency in Kafka is defined by the time from when a message is published by the producer to when the message is read by the consumer. Kafka only exposes a message to a consumer after it has been committed.