Kafka Source Code Analysis: The Producer Scala Client

The implementation of Kafka's Producer Scala client API.

Scala OldProducer

An example of sending messages with the Scala producer: it is built around KeyedMessage, specifying the topic and the message content.

Properties props = new Properties();
props.put("metadata.broker.list", "192.168.4.31:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
Producer<Integer,String> producer = new Producer<Integer, String>(new ProducerConfig(props));
KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>("myTopic", "message");
producer.send(data);
producer.close();

Q: core also contains Producer and Consumer.scala. How do they differ from KafkaProducer and KafkaConsumer in the clients module?
A: ConsoleProducer shows the two different implementations: OldProducer -> the Scala Producer, NewShinyProducer -> KafkaProducer.

val producer = if (config.useOldProducer) {
  new OldProducer(getOldProducerProps(config))
} else {
  new NewShinyProducer(getNewProducerProps(config))
}

The old Producer sends messages as KeyedMessage, the new one as ProducerRecord. The two implementations share a common send interface defined by a trait. Because messages are ultimately stored as bytes in the log files, a byte-array key and value serve as the common representation for both implementations.

trait BaseProducer {
  def send(topic: String, key: Array[Byte], value: Array[Byte])
  def close()
}

[figure: k_base_producer]
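
As a rough sketch (not verbatim from the Kafka source, with callbacks and error handling omitted), the two implementations might satisfy this trait along these lines: OldProducer wraps the bytes in a KeyedMessage, NewShinyProducer wraps them in a ProducerRecord.

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Sketch only: assumes the BaseProducer trait above and that the Properties
// already carry the serializer settings each client expects.
class OldProducer(producerProps: Properties) extends BaseProducer {
  // expects serializer.class=kafka.serializer.DefaultEncoder (byte pass-through)
  private val producer = new Producer[Array[Byte], Array[Byte]](new ProducerConfig(producerProps))
  override def send(topic: String, key: Array[Byte], value: Array[Byte]): Unit =
    producer.send(new KeyedMessage[Array[Byte], Array[Byte]](topic, key, value))
  override def close(): Unit = producer.close()
}

class NewShinyProducer(producerProps: Properties) extends BaseProducer {
  private val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps)
  override def send(topic: String, key: Array[Byte], value: Array[Byte]): Unit =
    producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, key, value)).get()
  override def close(): Unit = producer.close()
}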

Constructing the Scala Producer works the same way: you specify the partitioning strategy and the key/value serializers; in async mode there is additionally a send thread.

The Java version buffers batches of messages with RecordAccumulator, RecordBatch, MemoryRecords and so on; the Scala version simply uses a queue.

class Producer[K,V](val config: ProducerConfig, private val eventHandler: EventHandler[K,V]) extends Logging {
  private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueBufferingMaxMessages)
  private var sync: Boolean = true // sending is synchronous by default
  private var producerSendThread: ProducerSendThread[K,V] = null

  config.producerType match {
    case "sync" => // nothing to do, sync stays true
    case "async" =>
      sync = false
      producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId, queue,
        eventHandler, config.queueBufferingMaxMs, config.batchNumMessages, config.clientId)
      producerSendThread.start()
  }

  def this(config: ProducerConfig) = this(config, new DefaultEventHandler[K,V](config,
    CoreUtils.createObject[Partitioner](config.partitionerClass, config.props),
    CoreUtils.createObject[Encoder[V]](config.serializerClass, config.props),
    CoreUtils.createObject[Encoder[K]](config.keySerializerClass, config.props),
    new ProducerPool(config)))
}

Note that the ProducerSendThread also processes messages through the event handler eventHandler, and of course the blocking queue is involved as well.

A synchronous send calls the event handler directly, while an asynchronous send puts the messages into the BlockingQueue and the background ProducerSendThread completes the send, much like the Sender thread in the Java version.

[figure: k_event_handler]

def send(messages: KeyedMessage[K,V]*) {
  lock synchronized {
    sync match {
      case true => eventHandler.handle(messages)
      case false => asyncSend(messages)
    }
  }
}

The send method above is what the very first Producer example calls; the * on the parameter means several KeyedMessage messages can be sent at once. An asynchronous send simply moves the messages passed in by the caller into the Producer's queue. The ProducerSendThread, which was started inside the Producer, then consumes messages from the queue and performs the actual send.
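
A hypothetical usage of this varargs send (broker address, topic and keys are made up for illustration):

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

val props = new Properties()
props.put("metadata.broker.list", "192.168.4.31:9092")
props.put("serializer.class", "kafka.serializer.StringEncoder")
val producer = new Producer[String, String](new ProducerConfig(props))

// several KeyedMessage values in a single send call
producer.send(
  new KeyedMessage[String, String]("myTopic", "key-1", "first message"),
  new KeyedMessage[String, String]("myTopic", "key-2", "second message"))
producer.close()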

private def asyncSend(messages: Seq[KeyedMessage[K,V]]) {
  for (message <- messages) {
    config.queueEnqueueTimeoutMs match {
      case 0 => queue.offer(message)
      case _ => config.queueEnqueueTimeoutMs < 0 match {
        case true => queue.put(message); true
        case _ => queue.offer(message, config.queueEnqueueTimeoutMs, TimeUnit.MILLISECONDS)
      }
    }
  }
}

In async mode there are three different ways a message may be placed into the queue, depending on queue.enqueue.timeout.ms (a sketch of how the result can be checked follows the list):

  • queue.enqueue.timeout.ms = 0: put the message into the queue immediately and return true; if the queue is full, return false immediately
  • queue.enqueue.timeout.ms < 0: put the message into the queue immediately; if the queue is full, wait until the queue frees up space
  • queue.enqueue.timeout.ms > 0: put the message into the queue immediately and return true; if the queue is full, wait up to queue.enqueue.timeout.ms for space to free up; if there is still no room when the time is up, return false
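
Here is a sketch of how the enqueue result can be checked (field names follow the Producer shown above; the real asyncSend also records statistics before failing):

// Sketch: turn the enqueue result into an error when the queue is full.
val added = config.queueEnqueueTimeoutMs match {
  case 0          => queue.offer(message)                           // fail fast when full
  case t if t < 0 => queue.put(message); true                       // block until space frees up
  case t          => queue.offer(message, t, TimeUnit.MILLISECONDS) // bounded wait
}
if (!added)
  throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message)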

ProducerSendThread

Batching works by polling one message at a time from the queue and appending it to a temporary buffer events, checking after each append whether batchSize has been reached. If it has, the batch is handled, the events buffer is reset and the last-send timestamp is updated. A final handle call at the end takes care of whatever is left over.

This batching behaviour is similar to the Java RecordAccumulator: every time a new message is added, it checks whether a batch can now be processed.

private def processEvents() {
  var lastSend = SystemTime.milliseconds
  var events = new ArrayBuffer[KeyedMessage[K,V]]
  var full: Boolean = false

  // drain the queue until you get a shutdown command
  Iterator.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - SystemTime.milliseconds), TimeUnit.MILLISECONDS))
    .takeWhile(item => if(item != null) item ne shutdownCommand else true).foreach {
      currentQueueItem =>
        val expired = currentQueueItem == null
        if(currentQueueItem != null) {
          events += currentQueueItem // append to the temporary buffer
        }
        full = events.size >= batchSize // check if the batch size is reached
        if(full || expired) { // either the batch is full, or the poll timed out with no message
          tryToHandle(events) // process this batch
          lastSend = SystemTime.milliseconds // record the time of the last send
          events = new ArrayBuffer[KeyedMessage[K,V]]
        }
    }
  tryToHandle(events) // send the last batch of events
}

Since the ProducerSendThread was created with the default eventHandler, each batch it collects can be handed straight to the handler. The synchronous path gives the handler all of the data in one call, whereas the asynchronous path first puts everything into the queue and then processes it batch by batch.

def tryToHandle(events: Seq[KeyedMessage[K,V]]) {
  val size = events.size
  if(size > 0) handler.handle(events)
}

So far the Scala code looks remarkably concise, mainly because it buffers messages in a plain blocking queue instead of the many purpose-built classes of the Java client.
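
For reference, a configuration sketch that switches the old producer into async mode and tunes the queue and batch parameters discussed above (the values are only illustrative):

import java.util.Properties
import kafka.producer.{Producer, ProducerConfig}

val props = new Properties()
props.put("metadata.broker.list", "192.168.4.31:9092")
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("producer.type", "async")                // "sync" is the default
props.put("queue.buffering.max.messages", "10000") // capacity of the LinkedBlockingQueue
props.put("queue.buffering.max.ms", "5000")        // queueTime used by ProducerSendThread
props.put("batch.num.messages", "200")             // batchSize used by ProducerSendThread
props.put("queue.enqueue.timeout.ms", "-1")        // block the caller when the queue is full
val asyncProducer = new Producer[String, String](new ProducerConfig(props))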

BrokerPartitionInfo

BrokerPartitionInfo -> topicPartitionInfo -> TopicMetadata -> PartitionMetadata

BrokerPartitionInfo.getBrokerPartitionInfo looks up the broker partitions for a given topic name. The client only cares about each partition's leader replica, so what it returns is a list of PartitionAndLeader.

class BrokerPartitionInfo(producerConfig: ProducerConfig, producerPool: ProducerPool, topicPartitionInfo: HashMap[String, TopicMetadata]) {
  def getBrokerPartitionInfo(topic: String, correlationId: Int): Seq[PartitionAndLeader] = {
    val topicMetadata = topicPartitionInfo.get(topic) // check if the cache has metadata for this topic
    val metadata: TopicMetadata = topicMetadata match {
      case Some(m) => m
      case None => // refresh the topic metadata cache
        updateInfo(Set(topic), correlationId)
        val topicMetadata = topicPartitionInfo.get(topic)
        topicMetadata match {
          case Some(m) => m
          case None => throw new KafkaException("Failed to fetch topic metadata for topic: " + topic)
        }
    }
    // one TopicMetadata holds several PartitionMetadata, one per partition
    val partitionMetadata = metadata.partitionsMetadata
    partitionMetadata.map { m =>
      m.leader match {
        case Some(leader) => new PartitionAndLeader(topic, m.partitionId, Some(leader.id))
        case None => new PartitionAndLeader(topic, m.partitionId, None)
      }
    }.sortWith((s, t) => s.partitionId < t.partitionId) // sort by partitionId
  }
}

In the Java version, Cluster keeps the broker/topic/partition relationships, and PartitionInfo describes one partition (leader, ISR, and so on). Every message is assigned a PartitionInfo by the Partitioner, from which the partition's leader broker is obtained, and the messages are then grouped by leader. PartitionAndLeader is the counterpart of PartitionInfo: it carries an optional leader broker id, but no ISR or replica information.

case class PartitionAndLeader(topic: String, partitionId: Int, leaderBrokerIdOpt: Option[Int])

updateInfo refreshes the TopicMetadata of each topic. The refresh itself is just another request sent to Kafka (a metadata fetch request). After the client sends the TopicMetadata fetch request it receives a topicMetadata response, which is finally put into the topicPartitionInfo map.

def updateInfo(topics: Set[String], correlationId: Int) {
  var topicsMetadata: Seq[TopicMetadata] = Nil
  val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig, correlationId)
  topicsMetadata = topicMetadataResponse.topicsMetadata
  topicsMetadata.foreach(tmd => if(tmd.errorCode == Errors.NONE.code) topicPartitionInfo.put(tmd.topic, tmd))
  producerPool.updateProducer(topicsMetadata)
}

The refreshed TopicMetadata is also handed to the ProducerPool, and the Producer later obtains the producer instances that serve produce requests from that pool.

TopicMetadata is the metadata of one topic. A topic has multiple partitions, so one TopicMetadata corresponds to multiple PartitionMetadata.

object TopicMetadata {
  val NoLeaderNodeId = -1
  def readFrom(buffer: ByteBuffer, brokers: Map[Int, BrokerEndPoint]): TopicMetadata = {
    val errorCode = readShortInRange(buffer, "error code", (-1, Short.MaxValue))
    val topic = readShortString(buffer)
    val numPartitions = readIntInRange(buffer, "number of partitions", (0, Int.MaxValue))
    val partitionsMetadata: Array[PartitionMetadata] = new Array[PartitionMetadata](numPartitions)
    for(i <- 0 until numPartitions) {
      val partitionMetadata = PartitionMetadata.readFrom(buffer, brokers)
      partitionsMetadata(i) = partitionMetadata
    }
    new TopicMetadata(topic, partitionsMetadata, errorCode)
  }
}
case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadata], errorCode: Short = Errors.NONE.code)

PartitionMetadata is the metadata behind PartitionAndLeader and includes the ISR, the replicas, and so on (similar to PartitionInfo in the Java version).

object PartitionMetadata {
  def readFrom(buffer: ByteBuffer, brokers: Map[Int, BrokerEndPoint]): PartitionMetadata = {
    val errorCode = readShortInRange(buffer, "error code", (-1, Short.MaxValue))
    val partitionId = readIntInRange(buffer, "partition id", (0, Int.MaxValue)) /* partition id */
    val leaderId = buffer.getInt
    val leader = brokers.get(leaderId) // brokers holds every node in the cluster; each partition carries a leaderId

    /* list of all replicas */
    val numReplicas = readIntInRange(buffer, "number of all replicas", (0, Int.MaxValue))
    val replicaIds = (0 until numReplicas).map(_ => buffer.getInt)
    val replicas = replicaIds.map(brokers)

    /* list of in-sync replicas */
    val numIsr = readIntInRange(buffer, "number of in-sync replicas", (0, Int.MaxValue))
    val isrIds = (0 until numIsr).map(_ => buffer.getInt)
    val isr = isrIds.map(brokers)

    new PartitionMetadata(partitionId, leader, replicas, isr, errorCode)
  }
}
case class PartitionMetadata(partitionId: Int, leader: Option[BrokerEndPoint],
  replicas: Seq[BrokerEndPoint], isr: Seq[BrokerEndPoint] = Seq.empty, errorCode: Short = Errors.NONE.code)

Partition

1. Choosing a partition for each message

The previous step produced, for each message's topic, the corresponding PartitionAndLeader list, i.e. all partitions of that topic. Not every partition necessarily has a leader, which is why leaderBrokerIdOpt on PartitionAndLeader is optional: it may not yet be known whether there is one. A message must be assigned to a partition that does have a leader. If the message has no key, a cache is used so that all keyless messages of the same topic are assigned the same partitionId.

[figure: k_getPartition]

private def getPartitionListForTopic(m: KeyedMessage[K,Message]): Seq[PartitionAndLeader] = {
  val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(m.topic, correlationId.getAndIncrement)
  topicPartitionsList
}

private def getPartition(topic: String, key: Any, topicPartitionList: Seq[PartitionAndLeader]): Int = {
  val numPartitions = topicPartitionList.size
  // if the message has no key, pick a partitionId for this topic from sendPartitionPerTopicCache
  val partition = if(key == null) {
    val id = sendPartitionPerTopicCache.get(topic)
    id match {
      case Some(partitionId) => partitionId
      case None =>
        val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined) // partitions that have a leader, like availablePartitionsByTopic in Cluster
        val index = Utils.abs(Random.nextInt) % availablePartitions.size // pick a partition at random
        val partitionId = availablePartitions(index).partitionId // the partitionId of the chosen partition
        sendPartitionPerTopicCache.put(topic, partitionId) // remember it in the cache
        partitionId // so all keyless messages of this topic land in the same partition (until the cache is cleared)
    }
  } else partitioner.partition(key, numPartitions) // let the Partitioner map the key to a partitionId
  partition
}
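
For messages that do carry a key, partitioner.partition(key, numPartitions) picks the partition. The default partitioner is essentially a hash-mod; a minimal sketch in the same spirit:

import kafka.producer.Partitioner
import kafka.utils.{Utils, VerifiableProperties}

// The VerifiableProperties parameter is needed because partitioners are created
// reflectively via CoreUtils.createObject, as seen in the Producer constructor above.
class HashModPartitioner(props: VerifiableProperties = null) extends Partitioner {
  def partition(key: Any, numPartitions: Int): Int =
    Utils.abs(key.hashCode) % numPartitions
}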

2. Regrouping the message set by broker and partition

The producer's message set messages is not necessarily grouped by topic. Once each message has been assigned its partition, the data has to be reorganised per broker: grouping the unordered messages by broker id means everything destined for one broker can be sent in a single request; the Int key is the broker id / node id. Within a broker the messages are further split by TopicAndPartition, so every partition ends up with its own batch of messages.

Map[Int,                                ⬅️ broker id
    Map[TopicAndPartition,              ⬅️ partition
        Seq[KeyedMessage[K,Message]]]]  ⬅️ the messages belonging to that partition

Each message is eventually appended to its partition's message list. With these nested maps the pattern is: look up the collection for a key; if it does not exist, create it and put it into the map; if it does, use it directly.

def partitionAndCollate(messages: Seq[KeyedMessage[K,Message]]): Option[Map[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]] = {
  val ret = new HashMap[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
  for (message <- messages) {
    // a topic has several partitions
    val topicPartitionsList = getPartitionListForTopic(message)
    // a message is written to exactly one partition; the Partitioner picks the partition index
    val partitionIndex = getPartition(message.topic, message.partitionKey, topicPartitionsList)
    // a partition has replicas on several brokers, but writes only go to the leader
    val brokerPartition = topicPartitionsList(partitionIndex)
    // postpone the failure until the send operation, so that requests for other brokers are handled correctly
    val leaderBrokerId = brokerPartition.leaderBrokerIdOpt.getOrElse(-1)

    // per-broker data; ret is the nested Map[Int, Map[Partition, Seq]]
    var dataPerBroker: HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]] = null
    ret.get(leaderBrokerId) match {
      case Some(element) => // the inner map for this broker already exists, use it
        dataPerBroker = element.asInstanceOf[HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
      case None => // no inner map for this broker yet, create one and put it into ret
        dataPerBroker = new HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]
        ret.put(leaderBrokerId, dataPerBroker)
    }
    // build the topic-partition key
    val topicAndPartition = TopicAndPartition(message.topic, brokerPartition.partitionId)
    // the message list for this partition; even for the same broker, the topic-partition combinations differ
    var dataPerTopicPartition: ArrayBuffer[KeyedMessage[K,Message]] = null
    dataPerBroker.get(topicAndPartition) match {
      case Some(element) => // the partition's message Seq already exists, use it
        dataPerTopicPartition = element.asInstanceOf[ArrayBuffer[KeyedMessage[K,Message]]]
      case None => // no message list for this partition yet, create one and put it into the broker's map
        dataPerTopicPartition = new ArrayBuffer[KeyedMessage[K,Message]]
        dataPerBroker.put(topicAndPartition, dataPerTopicPartition)
    }
    // only here is the message actually added to the collection
    dataPerTopicPartition.append(message)
  }
  Some(ret)
}

3. Grouping messages into message sets

The structure returned above contains the message set for every broker; during actual processing, the messages of each broker are grouped one step further. groupMessagesToSet converts its input from KeyedMessage to ByteBufferMessageSet.

private def groupMessagesToSet(messagesPerTopicAndPartition: collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K, Message]]]) = {
  val messagesPerTopicPartition = messagesPerTopicAndPartition.map { case (topicAndPartition, messages) =>
    // a KeyedMessage has a key and a value; message is the raw value
    val rawMessages = messages.map(_.message)
    // the input is a Map (matched as a case tuple) and the result is a tuple, so it becomes a Map again:
    // the key is unchanged, the value turns Seq[Message] into a MessageSet
    (topicAndPartition, config.compressionCodec match {
      case NoCompressionCodec => new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
      case _ => new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
    })
  }
  Some(messagesPerTopicPartition)
}

With this groundwork in place, let's look at how DefaultEventHandler processes a batch of messages spanning different topics.

DefaultEventHandler

The event handler first serializes the messages and then sends them via dispatchSerializedData, which also provides retry on failure.

def handle(events: Seq[KeyedMessage[K,V]]) {
  val serializedData = serialize(events) // ① serialize the events
  var outstandingProduceRequests = serializedData // requests not yet completed
  var remainingRetries = config.messageSendMaxRetries + 1
  while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
    outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests) // ② dispatch the data
    if (outstandingProduceRequests.size > 0) { // a non-empty result means failures remain, so retry
      remainingRetries -= 1 // one retry used up
    }
  }
}
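
Step ① uses the Encoder configured through serializer.class (and key.serializer.class). A custom encoder only needs to implement toBytes; a minimal sketch:

import kafka.serializer.Encoder
import kafka.utils.VerifiableProperties

// The VerifiableProperties parameter is required because the encoder is
// instantiated reflectively (CoreUtils.createObject) from the config.
class Utf8Encoder(props: VerifiableProperties = null) extends Encoder[String] {
  override def toBytes(s: String): Array[Byte] = s.getBytes("UTF-8")
}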

During the send, every message that fails is added to failedProduceRequests; as long as the returned collection is non-empty, the loop above retries.

private def dispatchSerializedData(messages: Seq[KeyedMessage[K,Message]]): Seq[KeyedMessage[K, Message]] = {
  val partitionedDataOpt = partitionAndCollate(messages) // ① BrokerId -> (TopicAndPartition -> Seq[KeyedMessage])
  partitionedDataOpt match {
    case Some(partitionedData) =>
      val failedProduceRequests = new ArrayBuffer[KeyedMessage[K, Message]]
      for ((brokerid, messagesPerBrokerMap) <- partitionedData) { // ② each broker id has its own Partition -> Messages map
        val messageSetPerBrokerOpt = groupMessagesToSet(messagesPerBrokerMap) // ③ group the messages; compress the values if configured
        messageSetPerBrokerOpt match {
          case Some(messageSetPerBroker) =>
            val failedTopicPartitions = send(brokerid, messageSetPerBroker) // ④ send the message sets; the result is the partitions that failed
            failedTopicPartitions.foreach(topicPartition => {
              messagesPerBrokerMap.get(topicPartition) match { // find the messages of this partition in the full set
                case Some(data) => failedProduceRequests.appendAll(data) // add them to the failed list, used on the next retry
                case None => // nothing to do, every message was sent successfully
              }
            })
          case None => messagesPerBrokerMap.values.foreach(m => failedProduceRequests.appendAll(m)) // failed to group messages
        }
      }
      failedProduceRequests
    case None => messages // failed to collate messages
  }
}

To send, the handler fetches the SyncProducer for the broker from the producer pool (one producer per broker), wraps the message sets into a ProducerRequest, and calls the producer's send to issue the request.

The Java version builds ProduceRequest -> RequestSend -> ClientRequest and hands it to Sender/NetworkClient.

private def send(brokerId: Int, messagesPerTopic: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]) = {
  val currentCorrelationId = correlationId.getAndIncrement
  val producerRequest = new ProducerRequest(currentCorrelationId, config.clientId, config.requestRequiredAcks, config.requestTimeoutMs, messagesPerTopic)
  val syncProducer = producerPool.getProducer(brokerId)
  val response = syncProducer.send(producerRequest)
}

Sending only requires the brokerId and the payload (TopicPartition -> MessageSet), plus whether the request needs an ack and its timeout.

case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion, correlationId: Int, clientId: String, 
requiredAcks: Short, ackTimeoutMs: Int, data: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet])
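
A hypothetical, hand-built ProducerRequest for a single partition, just to show the shape of the fields (correlation id, client id and ack settings are made up for illustration):

import kafka.api.ProducerRequest
import kafka.common.TopicAndPartition
import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}
import scala.collection.mutable

val data = mutable.Map(
  TopicAndPartition("myTopic", 0) ->
    new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes("UTF-8"))))

val request = new ProducerRequest(
  1,              // correlationId
  "demo-client",  // clientId
  1.toShort,      // requiredAcks: 0 = no ack, 1 = leader only, -1 = leader + ISR
  10000,          // ackTimeoutMs
  data)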

ProducerPool

There is exactly one SyncProducer per broker, because a synchronous producer has at most one outstanding request per broker: it blocks until the request completes and no other request can proceed, so there is no point in keeping several producer instances for one broker.

class ProducerPool(val config: ProducerConfig) extends Logging {
  private val syncProducers = new HashMap[Int, SyncProducer]
  private val lock = new Object()

  def updateProducer(topicMetadata: Seq[TopicMetadata]) {
    val newBrokers = new collection.mutable.HashSet[BrokerEndPoint]
    // first collect all the leader brokers from the topic metadata
    topicMetadata.foreach(tmd => {
      tmd.partitionsMetadata.foreach(pmd => {
        if(pmd.leader.isDefined) {
          newBrokers += pmd.leader.get
        }
      })
    })
    // create a SyncProducer for each broker and cache it, to be fetched later via getProducer
    lock synchronized {
      newBrokers.foreach(b => {
        if(syncProducers.contains(b.id)){
          syncProducers(b.id).close()
          syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
        } else
          syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
      })
    }
  }

  def getProducer(brokerId: Int) : SyncProducer = {
    lock.synchronized {
      val producer = syncProducers.get(brokerId)
      producer match {
        case Some(p) => p
        case None => throw new UnavailableProducerException("Sync producer for broker id %d does not exist".format(brokerId))
      }
    }
  }
}

Creating a SyncProducer requires the broker's address, because this producer is responsible for talking to that broker: messages reach the broker through it.

object ProducerPool {
  // Used in ProducerPool to initiate a SyncProducer connection with a broker.
  def createSyncProducer(config: ProducerConfig, broker: BrokerEndPoint): SyncProducer = {
    val props = new Properties()
    props.put("host", broker.host)
    props.put("port", broker.port.toString)
    props.putAll(config.props.props)
    new SyncProducer(new SyncProducerConfig(props))
  }
}

SyncProducer

If the request requires an ack, a ProducerResponse must be returned to the client (the response content is carried in the response's payload byte buffer).

private val blockingChannel = new BlockingChannel(config.host, config.port, BlockingChannel.UseDefaultBufferSize, config.sendBufferBytes, config.requestTimeoutMs)

def send(producerRequest: ProducerRequest): ProducerResponse = {
  val readResponse = if(producerRequest.requiredAcks == 0) false else true
  val response: NetworkReceive = doSend(producerRequest, readResponse)
  // if the produce request expects a response (an ack from the leader, or from the leader plus the ISR),
  // read the response content into a ProducerResponse object; otherwise return right away
  if(readResponse) ProducerResponse.readFrom(response.payload)
  else null
}

private def doSend(request: RequestOrResponse, readResponse: Boolean = true): NetworkReceive = {
  verifyRequest(request)
  getOrMakeConnection() // connect, or reuse the existing channel if one is already open
  var response: NetworkReceive = null // if a response is expected, the broker's reply is read into this
  blockingChannel.send(request) // send the request over the blocking channel
  if(readResponse) response = blockingChannel.receive() // read the response from the blocking channel
  response
}

There is also another send method that sends a TopicMetadata request; that request always has a response.

def send(request: TopicMetadataRequest): TopicMetadataResponse = {
  val response = doSend(request)
  TopicMetadataResponse.readFrom(response.payload)
}

Both ProducerResponse.readFrom and TopicMetadataResponse.readFrom take a ByteBuffer; they effectively deserialize the raw response.

The reads and writes happen on the BlockingChannel and require an established connection first, which is why getOrMakeConnection runs before doSend. Note: since each broker has only one SyncProducer, a producer also has exactly one BlockingChannel.

private def getOrMakeConnection() {
  if(!blockingChannel.isConnected) connect()
}

private def connect(): BlockingChannel = {
  if (!blockingChannel.isConnected && !shutdown) {
    blockingChannel.connect()
  }
  blockingChannel
}

BlockingChannel

The actual request I/O is delegated to BlockingChannel, which implements the connection (connect), sending (send), and receiving (receive). As the name suggests it is a blocking channel, so it makes no use of NIO's selector-based multiplexing, which is exactly why this is the "old" design.

class BlockingChannel(val host: String, val port: Int, val readBufferSize: Int, val writeBufferSize: Int, val readTimeoutMs: Int) extends Logging {
  private var connected = false
  private var channel: SocketChannel = null
  private var readChannel: ReadableByteChannel = null
  private var writeChannel: GatheringByteChannel = null
  private val lock = new Object()
  private val connectTimeoutMs = readTimeoutMs
  private var connectionId: String = ""

  def connect() = lock synchronized {
    if(!connected) {
      channel = SocketChannel.open()
      if(readBufferSize > 0) channel.socket.setReceiveBufferSize(readBufferSize)
      if(writeBufferSize > 0) channel.socket.setSendBufferSize(writeBufferSize)
      channel.configureBlocking(true)
      channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs)

      writeChannel = channel
      readChannel = Channels.newChannel(channel.socket().getInputStream)
      connected = true
      connectionId = localHost + ":" + localPort + "-" + remoteHost + ":" + remotePort
    }
  }
}

The request KafkaProducer builds goes from ProduceRequest to RequestSend and finally into a ClientRequest. Here the ProducerRequest (a kind of RequestOrResponse) is wrapped in a RequestOrResponseSend, both of which are subclasses of ByteBufferSend.

[figure: k_send]

def send(request: RequestOrResponse): Long = {
  val send = new RequestOrResponseSend(connectionId, request)
  send.writeCompletely(writeChannel)
}

def receive(): NetworkReceive = {
  val response = readCompletely(readChannel)
  response.payload().rewind() // rewind the response ByteBuffer to the beginning so it can be read
  response // return the response; if the client expects an ack, it reads response.payload directly
}

private def readCompletely(channel: ReadableByteChannel): NetworkReceive = {
  val response = new NetworkReceive
  while (!response.complete())
    response.readFromReadableChannel(channel)
  response
}

SyncProducer is blocking, so unlike the Java NetworkClient it does not use a Selector in non-blocking, asynchronous mode.
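
To close the loop, here is a sketch of driving a SyncProducer directly against a single broker (host, port and the request are illustrative; normally the ProducerPool builds these instances):

import java.util.Properties
import kafka.producer.{SyncProducer, SyncProducerConfig}

val props = new Properties()
props.put("host", "192.168.4.31")
props.put("port", "9092")
val syncProducer = new SyncProducer(new SyncProducerConfig(props))

// producerRequest: a kafka.api.ProducerRequest built as shown earlier;
// the call blocks on the BlockingChannel until the broker replies (when acks != 0).
val response = syncProducer.send(producerRequest)
syncProducer.close()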

