Kafka Streams中文翻译

Confluent Kafka Streams Documentation 中文翻译: http://docs.confluent.io/3.0.1/streams/introduction.html

介绍

Kafka Streams

Kafka Streams, a component of open source Apache Kafka, is a powerful, easy-to-use library for building highly scalable, fault-tolerant, distributed stream processing applications on top of Apache Kafka. It builds upon important concepts for stream processing such as properly distinguishing between event-time and processing-time, handling of late-arriving data, and efficient management of application state.

Kafka Streams是开源Apache Kafka的一个组件,它是一个功能强大、易于使用的库,用于在Apache Kafka之上构建高度可扩展、故障容错的分布式流处理应用程序。它构建在流处理的重要概念之上,比如正确地区分事件时间(event-time)和处理时间(processing-time)、处理延时数据、高效的应用程序状态管理。

One of the mantras(祷文) of Kafka Streams is to “Build apps, not clusters!”, which means to bring stream processing out of the Big Data niche into the world of mainstream application development. Using the Kafka Streams library you can implement standard Java applications to solve your stream processing needs – whether at small or at large scale – and then run these applications on client machines at the perimeter(边界) of your Kafka cluster. Deployment-wise you are free to choose from any technology that can deploy Java applications, including but not limited to Puppet, Chef, Ansible, Docker, Mesos, YARN, Kubernetes, and so on. This lightweight and integrative(综合) approach of Kafka Streams is in stark(完全、突出) contrast(对比) to other stream processing tools that require you to install and operate separate stream processing clusters and similar heavy-weight infrastructure that come with their own special set of rules on how to use and interact with them.

Kafka Streams的一个思想是“构建应用程序,不要集群”,这意味着将流处理从大数据生态圈中解放出来,而专注于主流的应用程序开发。使用Kafka Streams客户端库,你可以用标准的Java应用程序(main方法)来解决你的流处理需求(不管是小规模还是大规模的数据),然后可以在你的Kafka集群之外的客户端机器执行这些应用程序。你可以选择任何可以部署Java应用的技术来部署Kafka Streams,包括但不限于Puppet、Chef、Ansible、Docker、Mesos、YARN、Kubernetes等等。Kafka Streams的轻量级以及综合能力使得它和其他流处理工具形成了鲜明的对比,后者需要你单独安装并维护一个流处理集群,需要依赖重量级的基础架构设施。

The following list highlights several key capabilities and aspects of Kafka Streams that make it a compelling(引人注目) choice for use cases such as stream processing applications, event-driven systems, continuous queries and transformations, reactive applications, and microservices.

下面列出了Kafka Streams的几个关键能力和特点,正是它们使Kafka Streams成为以下用例的一个极具吸引力的选择:流处理应用程序、事件驱动系统、持续查询和转换、响应式应用程序、微服务。

Powerful功能强大

  • Highly scalable, elastic, fault-tolerant 高度可扩展、弹性伸缩、故障容错
  • Stateful and stateless processing 有状态和无状态的处理
  • Event-time processing with windowing, joins, aggregations 基于事件时间的窗口、连接(join)、聚合操作

Lightweight轻量级

  • No dedicated cluster required 不需要专用的集群
  • No external dependencies 不需要外部的依赖
  • “It’s a library, not a framework.” 它是一个客户端库,不是一个框架

Fully integrated完全集成的

  • 100% compatible with Kafka 0.10.0.x 和Kafka完全兼容
  • Easy to integrate into existing applications 和已有应用程序容易集成
  • No artificial rules for deploying applications 对部署方式没有严格的规则限制

Real-time实时的

  • Millisecond processing latency 毫秒级别的处理延迟
  • Does not micro-batch messages 不是micro-batch处理
  • Windowing with out-of-order data 对乱序数据的窗口操作
  • Allows for arrival of late data 允许迟到的数据

A closer look

Before we dive into the details such as the concepts and architecture of Kafka Streams or getting our feet wet by following the Kafka Streams quickstart guide, let us provide more context to the previous list of capabilities.

在深入研究Kafka Streams的概念和架构等细节、或者跟随快速指南动手实践之前,我们先为上面列出的那些能力提供更多的背景信息。

1.Stream Processing Made Simple: Designed as a lightweight library in Apache Kafka, much like the Kafka producer and consumer client libraries. You can easily embed and integrate Kafka Streams into your own applications, which is a significant departure from framework-based stream processing tools that dictate many requirements upon you such as how you must package and “submit” processing jobs to their cluster.

流处理更加简单:被设计为一个轻量级的库,就像Kafka的生产者和消费者客户端库一样。你可以很方便地将Kafka Streams集成到你自己的应用程序中,这是和其他基于框架的流处理工具的主要区别,它们对你会有很多要求,比如你必须打包然后把流处理作业提交到集群上执行。

Has no external dependencies on systems other than Apache Kafka and can be used in any Java application. Read: You do not need to deploy and operate a separate cluster for your stream processing needs. Your Operations and Info Sec teams, among others, will surely be happy to hear this.

在应用程序中除了Apache Kafka之外没有其他外部系统依赖:你不需要为你的流处理需求部署或维护一个单独的集群。你们的运维和信息安全团队听到这个消息肯定会很高兴。

2.Leverages Kafka as its internal messaging layer instead of (re)implementing a custom messaging layer like many other stream processing tools. Notably, Kafka Streams uses Kafka’s partitioning model to horizontally scale processing while maintaining strong ordering guarantees. This ensures high performance, scalability, and operational simplicity for production environments. A key benefit of this design decision is that you do not have to understand and tune two different messaging layers – one for moving data streams at scale (Kafka) plus a separate one for your stream processing tool. Similarly, any performance and reliability improvements of Kafka will automatically be available to Kafka Streams, too, thus tapping into the momentum of Kafka’s strong developer community.

利用Kafka作为它的内部消息层,而不像许多其他流处理工具那样重新实现一套自己的消息层。特别地,Kafka Streams使用Kafka的分区模型来水平扩展处理能力,同时保证强有序性,这为生产环境带来了高性能、可扩展性和运维上的简单性。这种设计决策的一个关键好处是:你不需要理解和调优两套不同的消息层(一套用于大规模传输数据流,即Kafka;另一套属于你的流处理工具)。同样,任何针对Kafka的性能和可靠性改进,Kafka Streams都会自动受益,从而借力于Kafka强大的开发者社区的发展势头。

3.Is agnostic(不可知论) to resource management and configuration tools, so it integrates much more seamlessly(无缝) into the existing development, packaging, deployment, and operational practices of your organization. You are free to use your favorite tools such as Java application servers, Puppet, Ansible, Mesos, YARN, Docker – or even to run your application manually on a single machine for proof-of-concept scenarios.

Kafka Streams不依赖特定的资源管理和配置工具,所以它可以更无缝地集成到你所在组织已有的开发、打包、部署和运维实践中。你可以自由选择自己喜欢的工具,比如Java应用服务器、Puppet、Ansible、Mesos、YARN、Docker,甚至可以在单机环境下手动运行应用程序做原型验证(POC)。

4.Supports fault-tolerant local state, which enables very fast and efficient stateful operations like joins and windowed aggregations. Local state is replicated to Kafka so that, in case of a machine failure, another machine can automatically restore the local state and resume the processing from the point of failure.

支持本地状态的故障容错,这使得有状态的操作(比如联合、窗口聚合)更快速和高效。由于本地状态本身通过Kafka进行复制,所以当一个机器宕机时,其他机器可以自动恢复本地状态,并且从故障出错的那个点继续处理。

5.Employs one-record-at-a-time processing to achieve low processing latency, which is crucial(重要,决定性) for a variety of use cases such as fraud detection. This makes Kafka Streams different from micro-batch based stream processing tools.

一次处理一条记录(one-record-at-a-time)的流处理模型,所以处理延迟很低,这对于像欺诈检测等场景来说非常重要。这也是Kafka Streams与基于micro-batch的流处理工具的区别。

Furthermore, Kafka Streams has a strong focus on usability(可用性) and a great developer experience. It offers all the necessary stream processing primitives to allow applications to read data from Kafka as streams, process the data, and then either write the resulting data back to Kafka or send the final output to an external system. Developers can choose between a high-level DSL with commonly used operations like filter, map, join, as well as a low-level API for developers who need maximum control and flexibility.

另外,Kafka Streams对开发者是易用和友好的。它提供了所有必要的流处理算子,允许应用程序将从Kafka读取出来的数据作为一个流,然后处理数据,最后可以将处理结果写回到Kafka或者发送给外部系统。开发者可以使用高级DSL(提供很多常用的操作比如filter、map、join)或者低级API两种方式(当需要更好地控制和灵活性时)。
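为了更直观一些,这里补充一个使用高级DSL的最小示意(非官方示例;其中的主题名user-clicks、long-clicks和应用ID都是假设的),展示“从Kafka读入流、处理、再写回Kafka”这一基本结构。代码按照本文对应的0.10.0.x版本API编写,具体方法签名请以实际版本为准:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class FilterExample {

    public static void main(String[] args) {
        Properties settings = new Properties();
        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "filter-example");
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");

        final Serde<String> stringSerde = Serdes.String();
        final Serde<Long> longSerde = Serdes.Long();

        KStreamBuilder builder = new KStreamBuilder();

        // 从输入主题读取一个流:key是用户名,value是页面停留时间(毫秒)
        KStream<String, Long> clicks = builder.stream(stringSerde, longSerde, "user-clicks");

        // 只保留停留时间超过1秒的记录,并把毫秒换算成秒
        KStream<String, Long> longClicks = clicks
                .filter((user, millis) -> millis > 1000L)
                .mapValues(millis -> millis / 1000L);

        // 把处理结果写回另一个Kafka主题
        longClicks.to(stringSerde, longSerde, "long-clicks");

        KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(settings));
        streams.start();
    }
}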

Finally, Kafka Streams helps with scaling developers, too – yes, the human side – because it has a low barrier(门槛) to entry and a smooth path to scale from development to production: You can quickly write and run a small-scale proof-of-concept on a single machine because you don’t need to install or understand a distributed stream processing cluster; and you only need to run additional instances of your application on multiple machines to scale up to high-volume production workloads. Kafka Streams transparently(透明地) handles the load balancing of multiple instances of the same application by leveraging Kafka’s parallelism model.

最后,Kafka Streams也有助于“扩展”开发者本身(是的,指人的这一面),因为它的入门门槛很低,而且从开发到生产的扩展路径很平滑:你可以在一台机器上快速编写并运行一个小规模的概念验证(POC),因为你不需要安装或者理解一个分布式的流处理集群;当需要扩展到大规模的生产负载时,你只需要在多台机器上多运行几个应用程序实例即可。Kafka Streams会利用Kafka的并行模型,透明地在同一应用程序的多个实例之间做负载均衡。

In summary, Kafka Streams is a compelling choice for building stream processing applications. Give it a try and run your first Hello World Streams application! The next sections in this documentation will get you started.

总之,Kafka Streams对于构建流处理应用程序是一个非常不错的选择。快来运行你的第一个Hello World流处理应用吧!本文档接下来的章节会帮助你入门。

快速开始

本节目标

The goal of this quickstart guide is to provide you with a first hands-on look at Kafka Streams. We will demonstrate how to run your first Java application that uses the Kafka Streams library by showcasing a simple end-to-end data pipeline powered by Kafka.

It is worth noting that this quickstart will only scratch the surface of Kafka Streams. More details are provided in the remainder of the Kafka Streams documentation, and we will include pointers throughout the quickstart to give you directions.

本节的目标是让你对Kafka Streams有一个初步的上手体验。我们会通过一个由Kafka驱动的简单端到端数据管道,演示如何运行你的第一个使用Kafka Streams库的Java应用程序。注意这里仅仅会触及Kafka Streams的表层,后续的部分会深入更多细节,快速指南中也会给出相应的指引。

我们要做什么

下面是使用Java8实现的WordCount示例:

// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

// Construct a `KStream` from the input topic "streams-file-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored in the message keys).
KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "streams-file-input");

KStream<String, Long> wordCounts = textLines
// Split each text line, by whitespace, into words. The text lines are the message
// values, i.e. we can ignore whatever data is in the message keys and thus invoke
// `flatMapValues` instead of the more generic `flatMap`.
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
// We will subsequently invoke `countByKey` to count the occurrences of words, so we use
// `map` to ensure the words are available as message keys, too.
.map((key, value) -> new KeyValue<>(value, value))
// Count the occurrences of each word (message key).
// This will change the stream type from `KStream<String, String>` to
// `KTable<String, Long>` (word -> count), hence we must provide serdes for `String` and `Long`.
.countByKey(stringSerde, "Counts")
// Convert the `KTable<String, Long>` into a `KStream<String, Long>`.
.toStream();

// Write the `KStream<String, Long>` to the output topic.
wordCounts.to(stringSerde, longSerde, "streams-wordcount-output");
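上面的代码片段省略了创建builder和启动程序的样板代码。作为补充,下面给出一个大致的骨架(仅为示意,类名和配置值都是假设的,需按你的环境调整),说明如何把这段逻辑包装成一个可运行的Java程序:

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class WordCountSkeleton {

    public static void main(String[] args) {
        Properties settings = new Properties();
        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-example");
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");

        KStreamBuilder builder = new KStreamBuilder();

        // ……这里放上文的WordCount处理逻辑(serde、textLines、wordCounts等)……

        KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(settings));
        streams.start();
        // 实际的流处理应用通常会一直运行;需要停止时调用 streams.close()
    }
}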

然后,我们会执行如下步骤来完成第一个流应用程序:

  1. 在一台机器上启动一个Kafka集群
  2. 使用Kafka内置的控制台生产者模拟往一个Kafka主题中写入一些示例数据
  3. 使用Kafka Streams库处理输入的数据,处理程序就是上面的wordcount示例
  4. 使用Kafka内置的控制台消费者检查应用程序的输出
  5. 停止Kafka集群
wget http://packages.confluent.io/archive/3.0.1/confluent-3.0.1-2.11.zip
unzip confluent-3.0.1-2.11.zip
cd confluent-3.0.1/
bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
bin/kafka-server-start ./etc/kafka/server.properties

bin/kafka-topics --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic streams-file-input

echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > /tmp/file-input.txt

cat /tmp/file-input.txt | ./bin/kafka-console-producer --broker-list localhost:9092 --topic streams-file-input

bin/kafka-run-class org.apache.kafka.streams.examples.wordcount.WordCountDemo

Similar to the bounded variant, it is a stateful algorithm that tracks and updates the counts of words. However, since it must assume potentially unbounded input data, it will periodically output its current state and results while continuing to process more data because it cannot know when it has processed “all” the input data. This is a typical difference between the class of algorithms that operate on unbounded streams of data and, say, batch processing algorithms such as Hadoop MapReduce. It will be easier to understand this difference once we inspect the actual output data later on.

WordCount程序会计算输入文本中单词出现次数的直方图。和之前看到的在有界数据集上运行的WordCount不同,本例是在一个无限的、无界的数据流上操作。和有界的版本类似,它也是一个有状态的算法,会跟踪并更新单词的计数。不过,由于它必须假设输入数据可能是无界的,它会在持续处理更多数据的同时,周期性地输出当前的状态和结果,因为它无法知道什么时候才算处理完了“所有”的输入数据。这是操作无界数据流的这类算法与批处理算法(比如Hadoop MapReduce)之间的典型区别。在我们稍后查看实际的输出数据之后,会更容易理解这个差异。

The WordCount demo application will read from the input topic streams-file-input, perform the computations of the WordCount algorithm on the input data, and continuously write its current results to the output topic streams-wordcount-output (the names of its input and output topics are hardcoded). The demo will run for a few seconds and then, unlike typical stream processing applications, terminate automatically.

这个WordCount示例会从Kafka的输入主题“streams-file-input”中读取数据,在输入数据上执行WordCount算法,并且持续地将当前结果写入到输出主题“streams-wordcount-output”(输入和输出主题的名称是硬编码的)。和典型的流处理应用程序不同,这个演示程序只会运行几秒钟,然后自动退出;通常实际运行的流应用程序是永远不会停止的。

bin/kafka-console-consumer --zookeeper localhost:2181 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

打印信息如下,这里第一列是Kafka消息的键(字符串格式),第二列是消息的值(Long类型)。

all     1
streams 1
lead 1
to 1
kafka 1
hello 1
kafka 2
streams 2
join 1
kafka 3
summit 1

等等,输出结果看起来有点奇怪,为什么有重复的条目比如”streams”出现了两次,”kafka”出现了三次,难道不应该是下面这样的吗:

#为什么不是这样,你可能会有疑问
all 1
lead 1
to 1
hello 1
streams 2
join 1
kafka 3
summit 1

The explanation is that the output of the WordCount application is actually a continuous stream of updates, where each data record (i.e. each line in the original output above) is an updated count of a single word aka record key such as “kafka”. For multiple records with the same key, each later record is an update of the previous one.

合理的解释是:WordCount应用程序的输出实际上是一个持续不断的”更新流”,每条数据记录(上面示例中每一行的输出结果)都是一个单词(记录的key,比如”kafka”)的更新次数。对于相同key的多条记录,后面的记录都是对前面记录的更新。

The two diagrams below illustrate what is essentially(本质) happening behind the scenes. The first column shows the evolution of the current state of the KTable<String, Long> that is counting word occurrences for countByKey. The second column shows the change records that result from state updates to the KTable and that eventually, once converted to a KStream, are written to the output topic.

下面的两幅图展示了背后本质上发生了什么。第一列展示了KTable<String, Long>当前状态的演变过程,它通过countByKey统计单词的出现次数。第二列展示了由KTable状态更新所产生的变更记录,这些变更记录最终被转换为一个KStream并写入输出主题。

First the text line “all streams lead to kafka” is being processed. The KTable is being built up as each new word results in a new table entry (highlighted with a green background), and a corresponding change record is sent to the downstream KStream.

When the second text line “hello kafka streams” is processed, we observe, for the first time, that existing entries in the KTable are being updated (here: for the words “kafka” and for “streams”). And again, change records are being sent to the KStream.

当第一行文本“all streams lead to kafka”被处理时,KTable被逐步构建起来:每个新出现的单词都会产生一个新的表条目(绿色背景高亮),同时一条对应的变更记录会被发送给下游的KStream。
当第二行文本“hello kafka streams”被处理时,我们第一次观察到KTable中已有的条目被更新了(这里是单词“kafka”和“streams”)。同样地,变更记录也会被发送到KStream。

(图10-1:KTable的状态演变与发送给下游KStream的变更记录)

And so on (we skip the illustration of how the third line is being processed). This explains why the output topic has the contents we showed above, because it contains the full record of changes, i.e. the information shown in the second column for KStream above:

第三行的处理也是类似的,这里就不再赘述。这就解释了为什么输出主题的内容是我们上面看到的那样,因为它包含了完整的变更记录,即上面第二列KStream的内容:

all     1
streams 1
lead 1
to 1
kafka 1 <- first line end
hello 1
kafka 2
streams 2 <- second line end
join 1
kafka 3
summit 1 <- third line end

为什么不输出KTable,这是因为KTable每次处理一条记录,都会发送变更记录给下游的KStream,即KTable每次处理一条记录,产生一条变更记录。而KTable本身是有状态的,可以看到在处理第一个单词时,KTable有一条记录,在处理第二个不同的单词时,KTable有两条记录,这个状态是一直保存的,如果说把KTable作为输出,那么就会有重复的问题,比如下面这样的输出肯定不是我们希望看到的:

all     1  <-处理第一个单词后的KTable
all 1
streams 1 <-处理第二个单词后的KTable
all 1
streams 1
lead 1 <-处理第三个单词后的KTable
...

Looking beyond the scope of this concrete example, what Kafka Streams is doing here is to leverage the duality(二元性,对偶) between a table and a changelog stream (here: table = the KTable, changelog stream = the downstream KStream): you can publish every change of the table to a stream, and if you consume the entire changelog stream from beginning to end, you can reconstruct the contents of the table.

Kafka Streams这里做的工作利用了一张表和一个变更流的二元性(表指的是KTable,变更流指的是下游的KStream):你可以将表的每个变更记录发布给一个流,如果你从整个变更流的最开始消费到最后,你就可以重新构造出表的内容。
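用一小段普通的Java代码可以直观地体会这种二元性(纯示意,和Kafka Streams的API无关):把“表”的每一次变更追加到一个“变更流”里,再从头到尾重放这个流,就能重建出同样的表。

import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DualityDemo {

    public static void main(String[] args) {
        // “表”:每个用户最新的PV数;“变更流”:表的每一次更新记录
        Map<String, Long> table = new HashMap<>();
        List<Map.Entry<String, Long>> changelog = new ArrayList<>();

        // 处理三条pageview事件,每次更新表的同时向变更流追加一条记录
        for (String user : new String[]{"alice", "bob", "alice"}) {
            long newCount = table.getOrDefault(user, 0L) + 1;
            table.put(user, newCount);
            changelog.add(new SimpleEntry<>(user, newCount));
        }
        // table     = {alice=2, bob=1}
        // changelog = [alice=1, bob=1, alice=2]

        // 从头到尾重放变更流,可以重建出同样的表
        Map<String, Long> rebuilt = new HashMap<>();
        for (Map.Entry<String, Long> change : changelog) {
            rebuilt.put(change.getKey(), change.getValue());
        }
        System.out.println(table.equals(rebuilt)); // true:重建出的表和原表内容一致
    }
}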

概念

Kafka 101

Kafka Streams is, by deliberate(深思熟虑) design, tightly integrated with Apache Kafka: it uses Kafka as its internal messaging layer. As such it is important to familiarize yourself with the key concepts of Kafka, too, notably the sections 1. Getting Started and 4. Design in the Kafka documentation. In particular you should understand:

Kafka Streams经过深思熟虑的设计,它和Apache Kafka紧密地集成:它使用Kafka作为内部的消息层。所以熟悉Kafka的关键概念也非常重要,特别是Kafka文档中的“1. Getting Started”和“4. Design”这两节。你尤其应该理解以下几点:

  • The who’s who: Kafka distinguishes producers, consumers, and brokers. In short, producers publish data to Kafka brokers, and consumers read published data from Kafka brokers. Producers and consumers are totally decoupled. A Kafka cluster consists of one or more brokers.
  • The data: Data is stored in topics. The topic is the most important abstraction provided by Kafka: it is a category or feed name to which data is published by producers. Every topic in Kafka is split into one or more partitions, which are replicated across Kafka brokers for fault tolerance.
  • Parallelism: Partitions of Kafka topics, and especially their number for a given topic, are also the main factor that determines the parallelism of Kafka with regards to reading and writing data. Because of their tight integration the parallelism of Kafka Streams is heavily influenced by and depending on Kafka’s parallelism.
  1. Kafka分成生产者、消费者、Brokers。生产者发布数据给Kafka的Brokers,消费者从Kafka的Brokers读取发布过的数据。生产者和消费者完全解耦。一个Kafka集群包括一个或多个Brokers节点。
  2. 数据以主题的形式存储。主题是Kafka提供的最重要的一个抽象:它是生产者发布数据的一种分类(相同类型的消息应该发布到相同的主题)。每个主题会分成一个或多个分区,并且为了故障容错,每个分区都会在Kafka的Brokers中进行复制。
  3. Kafka主题的分区数量决定了读取或写入数据的并行度。因为Kafka Streams和Kafka结合的很紧,所以Kafka Streams也依赖于Kafka的并行度。

流、流处理、拓扑、算子

A stream is the most important abstraction provided by Kafka Streams: it represents an unbounded, continuously updating data set, where unbounded means “of unknown or of unlimited size”. A stream is an ordered, replayable, and fault-tolerant sequence of immutable data records, where a data record is defined as a key-value pair.

流是Kafka Streams提供的最重要的抽象:它代表了一个无界的、持续更新的数据集,其中“无界”的意思是大小未知或者没有上限。流是一个有序的、可重放的、容错的不可变数据记录序列,其中每条数据记录被定义成一个key-value键值对。

(图:流与其中的数据记录,即key-value键值对)

A stream processing application is any program that makes use of the Kafka Streams library. In practice, this means it is probably “your” Java application. It may define its computational logic through one or more processor topologies (see next section).

流处理应用程序是任何使用了Kafka Streams库进行开发的应用程序,它会通过一个或多个处理拓扑定义计算逻辑。

A processor topology or simply topology defines the computational logic of the data processing that needs to be performed by a stream processing application. A topology is a graph of stream processors (nodes) that are connected by streams (edges). Developers can define topologies either via the low-level Processor API or via the Kafka Streams DSL, which builds on top of the former.

处理拓扑或者叫拓扑定义了流处理应用程序对数据处理的计算逻辑。拓扑是一张由流处理算子和相连接的流组成的DAG图,其中算子是图的节点,流是图的边。开发者可以通过低级的Processor API或者高级的Kafka Streams DSL定义拓扑,其中后者实际上是构建在前者之上的。

A stream processor is a node in the processor topology(as shown in the diagram of section Processor Topology). It represents a processing step in a topology, i.e. it is used to transform data in streams. Standard operations such as map, filter, join, and aggregations are examples of stream processors that are available in Kafka Streams out of the box. A stream processor receives one input record at a time from its upstream processors in the topology, applies its operation to it, and may subsequently produce one or more output records to its downstream processors.

流算子是处理拓扑中的节点,它代表了在拓扑中的处理步骤,比如转换算子会在流中转换数据。标准的算子包括map/filter/join/aggregation,这些都是流算子的示例,并且内置在Kafka Streams中开箱即用。一个流算子从它在拓扑中的上游算子一次接收一条输入记录,将操作运用到记录,并且可能会产生一条或多条输出记录给下游的算子。Kafka Streams提供了两种方式来定义算子:

  • The Kafka Streams DSL provides the most common data transformation operations such as map and filter so you don’t have to implement these stream processors from scratch.
  • The low-level Processor API allows developers to define and connect custom processors as well as to interact with state stores.
  1. Kafka Streams DSL提供了最通用的数据转换操作,比如map、filter,这样你不需要自己实现这些算子
  2. 低级的Processor API,允许开发者定义和连接定制的算子,并且还可以和状态存储交互(示意代码见下)
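作为对照,下面是一个低级Processor API的最小示意(基于0.10.0.x版本的接口;主题名input-topic、output-topic和算子名都是假设的):定义一个把消息value转成大写的算子,并把它接在一个源和一个目标之间。

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;

public class UpperCaseTopology {

    // 一个最简单的流算子:把收到的每条记录的value转成大写后转发给下游
    public static class UpperCaseProcessor implements Processor<String, String> {
        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        }

        @Override
        public void process(String key, String value) {
            context.forward(key, value.toUpperCase());
        }

        @Override
        public void punctuate(long timestamp) { }

        @Override
        public void close() { }
    }

    public static TopologyBuilder build() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("SOURCE", "input-topic")   // 使用配置中默认的key/value反序列化器
               .addProcessor("UPPER", (ProcessorSupplier<String, String>) UpperCaseProcessor::new, "SOURCE")
               .addSink("SINK", "output-topic", "UPPER");  // 使用配置中默认的key/value序列化器
        return builder;
    }
}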

时间

A critical aspect in stream processing is the notion of time, and how it is modeled and integrated. For example, some operations such as Windowing are defined based on time boundaries.

流处理的一个重要概念是时间,如何对时间进行建模和整合非常重要,因为有些操作比如窗口函数会基于时间的边界来定义。有几种类型的时间表示方式:

  • Event-time: The point in time when an event or data record occurred, i.e. was originally created “by the source”. Achieving event-time semantics typically requires embedding timestamps in the data records at the time a data record is being produced. Example: If the event is a geo-location change reported by a GPS sensor in a car, then the associated event-time would be the time when the GPS sensor captured the location change.
  • Processing-time: The point in time when the event or data record happens to be processed by the stream processing application, i.e. when the record is being consumed. The processing-time may be milliseconds, hours, or days etc. later than the original event-time. Example: Imagine an analytics application that reads and processes the geo-location data reported from car sensors to present it to a fleet management dashboard. Here, processing-time in the analytics application might be milliseconds or seconds (e.g. for real-time pipelines based on Apache Kafka and Kafka Streams) or hours (e.g. for batch pipelines based on Apache Hadoop or Apache Spark) after event-time.
  • Ingestion-time: The point in time when an event or data record is stored in a topic partition by a Kafka broker. Ingestion-time is similar to event-time, as a timestamp gets embedded in the data record itself. The difference is, that the timestamp is generated when the record is appended to the target topic by the Kafka broker, not when the record is created “at the source”. Ingestion-time may approximate event-time reasonably well if we assume that the time difference between creation of the record and its ingestion into Kafka is sufficiently small, where “sufficiently” depends on the specific use case. Thus, ingestion-time may be a reasonable alternative for use cases where event-time semantics are not possible, e.g. because the data producers don’t embed timestamps (e.g. older versions of Kafka’s Java producer client) or the producer cannot assign timestamps directly (e.g., it does not have access to a local clock).
  1. 事件时间(event-time):事件或数据记录发生的时间点,也就是它最初“在源头”被创建的时间。实现事件时间语义通常需要在数据记录产生时就在记录中内嵌时间戳字段。比如一个事件是车辆上GPS传感器报告的地理位置变更,那么对应的事件时间就是GPS传感器捕获位置变更的时间点。
  2. 处理时间(processing-time):事件或数据记录恰好被流处理应用程序处理的时间点,即记录被消费的时候。处理时间可能比原始的事件时间晚几毫秒、几小时甚至几天。举例:一个分析应用程序读取并处理车辆传感器上报的地理位置数据,并呈现到一个车队管理dashboard上。这里分析程序的处理时间可能比事件时间晚几毫秒或几秒(比如基于Apache Kafka和Kafka Streams的实时管道),也可能晚几个小时(比如基于Apache Hadoop或Apache Spark的批处理管道)。
  3. 摄取时间(ingestion-time):事件或数据记录被Kafka broker存储到主题分区中的时间点。摄取时间和事件时间类似,时间戳同样内嵌在数据记录中;不同的是,这个时间戳是记录被Kafka broker追加到目标主题时生成的,而不是记录“在源头”被创建的时间。如果我们假设记录的创建时间和它被摄取到Kafka的时间间隔足够短,那么可以认为摄取时间近似于事件时间,当然“足够短”取决于具体的用例。因此,当事件时间语义不可行时,摄取时间可能是一个合理的替代方案,比如数据生产者没有内嵌时间戳(比如旧版本的Kafka Java生产者客户端),或者生产者无法直接分配时间戳(比如无法访问本地时钟)。

The choice between event-time and ingestion-time is actually done through the configuration of Kafka (not Kafka Streams): From Kafka 0.10.x onwards, timestamps are automatically embedded into Kafka messages. Depending on Kafka’s configuration these timestamps represent event-time or ingestion-time. The respective Kafka configuration setting can be specified on the broker level or per topic. The default timestamp extractor in Kafka Streams will retrieve these embedded timestamps as-is. Hence, the effective time semantics of your application depend on the effective Kafka configuration for these embedded timestamps. See the Developer Guide for detailed information.

选择事件时间还是摄取时间,实际上是通过Kafka的配置(而不是Kafka Streams的配置)来完成的:从Kafka 0.10.x开始,时间戳会自动内嵌到Kafka的消息中。根据Kafka的配置,这些时间戳代表的可以是事件时间,也可以是摄取时间,相应的配置可以设置在Broker级别,也可以针对每个Topic设置。Kafka Streams默认的时间戳抽取器会原样取出这些内嵌的时间戳。所以你的应用程序实际的时间语义,取决于Kafka针对这些内嵌时间戳的有效配置。详细信息可以参考开发者指南。

Kafka Streams assigns a timestamp to every data record via so-called timestamp extractors. These per-record timestamps describe the progress of a stream with regards to time (although records may be out-of-order within the stream) and are leveraged by time-dependent operations such as joins. They are also used to synchronize multiple input streams within the same application.

Kafka Streams会通过所谓的时间戳抽取器(timestamp extractor)为每条数据记录分配一个时间戳。这些针对每条记录的时间戳描述了流在时间维度上的进度(尽管流中的记录可能是乱序的),并且会被join等依赖时间的操作所使用。同时它们也被用来同步同一个应用程序内的多个输入流。

Concrete implementations of timestamp extractors may retrieve or compute timestamps based on the actual contents of data records such as an embedded timestamp field to provide event-time or ingestion-time semantics, or use any other approach such as returning the current wall-clock time at the time of processing, thereby yielding processing-time semantics to stream processing applications. Developers can thus enforce(执行,强制) different notions/semantics of time depending on their business needs.

时间戳抽取器的具体实现,可以基于数据记录的实际内容来获取或计算时间戳,比如使用内嵌的时间戳字段来提供事件时间或摄取时间语义;也可以采用其他方式,比如返回处理时刻的墙上时钟时间,从而为流处理应用程序提供处理时间语义。开发者因此可以根据业务需求采用不同的时间概念/语义。
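下面是一个自定义时间戳抽取器的示意(基于0.10.0.x版本的TimestampExtractor接口;消息格式和解析逻辑都是假设的):尝试从消息value中解析事件时间,解析失败时退回到记录内置的时间戳。

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// 假设消息的value形如 "1466113453000|some payload",竖线前是事件发生时间(毫秒)
public class PayloadTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(ConsumerRecord<Object, Object> record) {
        Object value = record.value();
        if (value instanceof String) {
            try {
                return Long.parseLong(((String) value).split("\\|")[0]);
            } catch (NumberFormatException e) {
                // 解析失败:退回到Kafka消息内置的时间戳(是事件时间还是摄取时间取决于broker/topic配置)
            }
        }
        return record.timestamp();
    }
}

然后通过配置项注册它,大致是 settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, PayloadTimestampExtractor.class.getName()); 这样的写法。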

Be aware that ingestion-time in Kafka Streams is used slightly different as in other stream processing systems. Ingestion-time could mean the time when a record is fetched by a stream processing application’s source operator. In Kafka Streams, ingestion-time refers to the time when a record was appended to a Kafka topic partition.

注意Kafka Streams中的摄取时间和其他流处理系统的用法略有不同。在别的系统里,摄取时间可能指记录被流处理应用程序的源算子取到的时间点;而在Kafka Streams中,摄取时间指的是一条记录被追加到Kafka主题分区的时间点(即Producer写入分区的时间)。

有状态的流处理

Some stream processing applications don’t require state, which means the processing of a message is independent from the processing of all other messages. If you only need to transform one message at a time, or filter out messages based on some condition, the topology defined in your stream processing application can be simple.

有些流处理应用程序并不需要状态,这意味着每条消息的处理都和其他所有消息的处理相互独立。如果你每次只需要转换一条消息,或者基于某些条件对消息进行过滤,你的流处理应用程序中定义的拓扑可以非常简单。

However, being able to maintain state opens up many possibilities for sophisticated(复杂) stream processing applications: you can join input streams, or group and aggregate data records. Many such stateful operators are provided by the Kafka Streams DSL.

不过,能够维护状态为复杂的流处理应用程序打开了许多可能性:你可以对输入流做join,或者对数据记录进行分组和聚合。Kafka Streams的DSL提供了很多这样的有状态算子。

Streams和Tables的二元性

Before we discuss concepts such as aggregations in Kafka Streams we must first introduce tables, and most importantly the relationship between tables and streams: the so-called stream-table duality. Essentially, this duality means that a stream can be viewed as a table, and vice versa. Kafka’s log compaction feature, for example, exploits(功绩,利用,开发) this duality.

在讨论Kafka Streams中的聚合等概念之前,我们必须先介绍表(table),更重要的是表和流之间的关系:即所谓的stream-table二元性。从本质上来说,这种二元性意味着一个流可以被看作一张表,反过来也成立。比如Kafka的日志压缩特性正是利用了这种二元性。一张表,简单来说就是一系列的键值对,或者叫字典、关联数组。

(图:表,即一系列键值对)

stream-table二元性描述了两者的紧密关系:

  • Stream as Table: A stream can be considered a changelog of a table, where each data record in the stream captures a state change of the table. A stream is thus a table in disguise(假装), and it can be easily turned into a “real” table by replaying the changelog from beginning to end to reconstruct the table. Similarly, in a more general analogy(类比), aggregating data records in a stream – such as computing the total number of pageviews by user from a stream of pageview events – will return a table (here with the key and the value being the user and its corresponding pageview count, respectively).
  • Table as Stream: A table can be considered a snapshot, at a point in time, of the latest value for each key in a stream (a stream’s data records are key-value pairs). A table is thus a stream in disguise, and it can be easily turned into a “real” stream by iterating over each key-value entry in the table.
  1. 将流作为表:一个流可以被认为是一张表的变更记录(changelog),流中的每条数据记录捕获了表的每个变更状态。一个流可以假装是一张表,也可以通过从头到尾重放变更日志来重构表,很容易地变成真正的表。同样地,在流中聚合记录(比如从PV事件流中计算用户的PV数)会返回一张表(这里键值分别是用户,以及对应的PV数)。
  2. 将表作为流:一张表可以认为是在某个时间点的一份快照,是流的每个key对应的最近的值。一张表因此也可以假装是一个流,也可以通过迭代表中所有的键值条目转换成一个真实的流。

Let’s illustrate this with an example. Imagine a table that tracks the total number of pageviews by user (first column of diagram below). Over time, whenever a new pageview event is processed, the state of the table is updated accordingly. Here, the state changes between different points in time – and different revisions(修正) of the table – can be represented as a changelog stream (second column).

Interestingly, because of the stream-table duality, the same stream can be used to reconstruct the original table (third column):

举例,一张表会跟踪用户的PV总数(左图第一列),当一条新的pageview事件被处理的时候,表的状态会相应地被更新。这里不同时间点的状态变更(针对表的不同修改),可以作为一个变更日志流(左图第二列)。有趣的是,由于stream-table的二元性,相同的流可以被用来构造出原始的表(右图第三列)。

(图:表、变更日志流,以及通过重放变更日志重建出的表)

The same mechanism(机制) is used, for example, to replicate databases via change data capture (CDC) and, within Kafka Streams, to replicate its so-called state stores across machines for fault-tolerance. The stream-table duality is such an important concept that Kafka Streams models it explicitly(明确地) via the KStream and KTable interfaces, which we describe in the next sections.

这种类似的机制也被用在其他系统中,比如通过CDC复制数据库。在Kafka Streams中,为了容错处理,会将它的状态存储复制到多台机器上。stream-table的二元性是很重要的概念,Kafka Streams通过KStream和KTable接口对它们进行建模。

KStream(记录流 record stream)

A KStream is an abstraction of a record stream, where each data record represents a self-contained datum(基准,资料) in the unbounded data set. Using the table analogy(类比), data records in a record stream are always interpreted as “inserts” – think: append-only ledger(分类) – because no record replaces an existing row with the same key. Examples are a credit card transaction, a page view event, or a server log entry.

一个KStream是对记录流(record stream)的抽象,其中每条数据记录都代表无界数据集中一个自包含的数据单元。用表来类比的话,记录流中的数据记录总是被解释为“插入”(可以想象成一个只追加的账本),因为不会有记录去替换已有的相同key的行。比如信用卡交易、页面浏览事件、服务端日志条目。举例,有两条记录依次发送到流中:

("alice", 1) --> ("alice", 3)  //这两条记录依次发送到流中

If your stream processing application were to sum the values per user, it would return 4 for alice. Why? Because the second data record would not be considered an update of the previous record. Compare this behavior of KStream to KTable below, which would return 3 for alice.

如果你的流处理应用程序是为每个用户求和(记录的value含义不是很明确,但是我们只是要对value值求和),那么alice用户的返回结果是4。因为第二条记录不会被认为是对前一条记录的更新(第一条记录和第二条记录是同时存在的)。如果将其和下面的KTable对比,KTable中alice用户的返回结果是3。

KTable(变更流 changelog stream)

A KTable is an abstraction of a changelog stream, where each data record represents an update. More precisely, the value in a data record is considered to be an update of the last value for the same record key, if any (if a corresponding key doesn’t exist yet, the update will be considered a create). Using the table analogy, a data record in a changelog stream is interpreted as an update because any existing row with the same key is overwritten.

一个KTable是对变更日志流的抽象,每条数据记录代表的是一个更新。更准确的说,数据记录中的值被认为是对已有相同记录的key的值更新(如果存在key则更新,如果key不存在,更新操作会被认为是创建)。用传统数据库中的表这个概念来类比,变更流中的数据可以理解为“更新”,因为任何已经存在相同key的行都会被覆盖。

If your stream processing application were to sum the values per user, it would return 3 for alice. Why? Because the second data record would be considered an update of the previous record. Compare this behavior of KTable with the illustration for KStream above, which would return 4 for alice.

还是以上面的两条记录发送到流中为例,如果也是为每个用户求和,那么alice用户的返回结果是3。因为第二条记录会被认为是对前一条记录的更新(那么第一条记录实际上就不存在了)。如果将其和上面的KStream对比,KStream中alice用户的返回结果是4。
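用几行普通的Java代码可以模拟这两种语义的差别(纯示意,和Kafka Streams的API无关):

import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamVsTableSum {

    public static void main(String[] args) {
        // 依次到达的两条记录:("alice", 1) 和 ("alice", 3)
        List<Map.Entry<String, Integer>> records = Arrays.asList(
                new SimpleEntry<>("alice", 1),
                new SimpleEntry<>("alice", 3));

        // KStream语义:每条记录都是独立的“插入”,求和时全部累加 -> alice = 4
        int streamSum = records.stream().mapToInt(r -> r.getValue()).sum();

        // KTable语义:后到的记录是对相同key旧值的“更新”,求和时只看最新值 -> alice = 3
        Map<String, Integer> table = new HashMap<>();
        records.forEach(r -> table.put(r.getKey(), r.getValue()));
        int tableSum = table.values().stream().mapToInt(v -> v).sum();

        System.out.println("KStream语义下alice的和 = " + streamSum); // 4
        System.out.println("KTable语义下alice的和 = " + tableSum);   // 3
    }
}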

Effects of Kafka’s log compaction: Another way of thinking about KStream and KTable is as follows: If you were to store a KTable into a Kafka topic, you’d probably want to enable Kafka’s log compaction feature, e.g. to save storage space.

理解KStream和KTable的另外一种思路是:如果要将一个KTable存储到Kafka主题中,你多半会希望开启Kafka的日志压缩功能,比如为了节省存储空间。

However, it would not be safe to enable log compaction in the case of a KStream because, as soon as log compaction would begin purging older data records of the same key, it would break the semantics of the data. To pick up the illustration example again, you’d suddenly get a 3 for alice instead of a 4 because log compaction would have removed the (“alice”, 1) data record. Hence log compaction is perfectly safe for a KTable (changelog stream) but it is a mistake for a KStream (record stream).

如果是KStream,开启日志压缩就不是一个安全的做法,因为一旦日志压缩开始清除相同key的旧数据记录,就会破坏数据的语义。还是以前面的例子来说,你可能会突然发现用户alice的结果变成了3而不是4,因为日志压缩删除了("alice", 1)这条记录。所以日志压缩对于KTable(变更日志流)是完全安全的,而对KStream(记录流)则是错误的用法。

We have already seen an example of a changelog stream in the section Duality of Streams and Tables. Another example are change data capture (CDC) records in the changelog of a relational database, representing which row in a database table was inserted, updated, or deleted.

在stream-table二元性中,我们已经看到了一个变更日志流的示例。另一个示例是关系型数据库中的CDC变更日志,表示数据库中哪一行执行了插入,更新、删除动作。

KTable also provides an ability to look up current values of data records by keys. This table-lookup functionality is available through join operations (see also Joining Streams in the Developer Guide).

KTable也支持根据记录的key查询当前的value,这种特性会在join操作时使用。

窗口操作

A stream processor may need to divide data records into time buckets, i.e. to window the stream by time. This is usually needed for join and aggregation operations, etc. Windowed stream buckets can be maintained in the processor’s local state.

一个流算子可能需要将数据记录分成多个时间段,比如对流按照时间做成一个个窗口。通常在联合和聚合操作时需要这么做。在算子的本地状态中会维护窗口流。

Windowing operations are available in the Kafka Streams DSL, where users can specify a retention period for the window. This allows Kafka Streams to retain old window buckets for a period of time in order to wait for the late arrival of records whose timestamps fall within the window interval. If a record arrives after the retention period has passed, the record cannot be processed and is dropped.

窗口操作在Kafka Streams DSL中可以使用,用户可以为窗口指定保留时间(retention period)。这使得Kafka Streams可以把旧的窗口再保留一段时间,以便等待那些时间戳落在窗口区间内但迟到的记录(虽然不一定属于当前窗口,但可能属于旧的窗口;如果不保留旧窗口,迟到的记录就会被直接丢弃,因为当前窗口无法存放旧记录)。如果一条记录在保留时间过去之后才到达,它就不会被处理,只能被丢弃。
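下面是一个窗口计数的大致示意(基于0.10.0.x时代的DSL写法,countByKey/TimeWindows的具体签名请以对应版本的Javadoc为准;主题名和窗口名都是假设的):按1分钟的时间窗口统计每个key出现的次数,并把旧窗口保留5分钟以等待迟到的记录。

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class WindowedCountSketch {

    public static void main(String[] args) {
        final Serde<String> stringSerde = Serdes.String();
        KStreamBuilder builder = new KStreamBuilder();

        KStream<String, String> events = builder.stream(stringSerde, stringSerde, "events");

        // 按key在1分钟的滚动窗口内计数;until(...)指定旧窗口的保留时间,用来接纳迟到记录
        KTable<Windowed<String>, Long> windowedCounts = events.countByKey(
                TimeWindows.of("counts-window", 60 * 1000L).until(5 * 60 * 1000L),
                stringSerde);

        // 后续可以把windowedCounts转成流再写回Kafka,此处从略
    }
}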

Late-arriving records are always possible in real-time data streams. However, it depends on the effective time semantics how late records are handled. Using processing-time, the semantics are “when the data is being processed”, which means that the notion of late records is not applicable as, by definition, no record can be late. Hence, late-arriving records only really can be considered as such (i.e. as arriving “late”) for event-time or ingestion-time semantics. In both cases, Kafka Streams is able to properly handle late-arriving records.

迟到的记录在实时数据流中总是可能出现的。不过,迟到记录如何被处理,取决于所采用的有效时间语义。如果使用处理时间,语义是“数据何时被处理”,这意味着“迟到记录”这个概念并不适用,因为按照定义不会有记录迟到。因此,只有在事件时间或摄取时间这两种语义下,迟到记录才真正成立。在这两种情况下,Kafka Streams都能够正确地处理迟到的记录。

联合操作

A join operation merges two streams based on the keys of their data records, and yields a new stream. A join over record streams usually needs to be performed on a windowing basis because otherwise the number of records that must be maintained for performing the join may grow indefinitely.

join操作会基于两个流的数据记录的key将它们合并,并产生一个新的流。在记录流上的join通常需要在窗口的基础上执行,否则为了执行join而需要维护的记录数量会无限增长(在无限的记录集上无法做join,因为你不知道什么时候结束;join必须在有限的记录集上进行,而窗口正好提供了有限的记录集)。

The join operations available in the Kafka Streams DSL differ based on which kinds of streams are being joined (e.g. KStream-KStream join versus KStream-KTable join).

Kafka Streams DSL中的join操作跟流的类型有关,比如KStream-KStream进行join,或者KStream-KTable进行join。
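比如一个KStream-KTable的join(示意代码,基于0.10.0.x版本的leftJoin;主题名和数据含义都是假设的):用KTable里的用户所在地区信息来丰富点击流。

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class StreamTableJoinSketch {

    public static void main(String[] args) {
        final Serde<String> stringSerde = Serdes.String();
        KStreamBuilder builder = new KStreamBuilder();

        // 点击流:key是用户,value是被点击的页面
        KStream<String, String> clicks = builder.stream(stringSerde, stringSerde, "user-clicks");
        // 用户资料表:key是用户,value是所在地区(后来的记录会覆盖旧值)
        KTable<String, String> userRegions = builder.table(stringSerde, stringSerde, "user-regions");

        // 用表中的地区信息丰富点击流;KTable一侧找不到对应key时region为null
        KStream<String, String> clicksWithRegion =
                clicks.leftJoin(userRegions, (page, region) -> region + "/" + page);

        clicksWithRegion.to(stringSerde, stringSerde, "clicks-with-region");
    }
}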

聚合操作

An aggregation operation takes one input stream, and yields a new stream by combining multiple input records into a single output record. Examples of aggregations are computing counts or sum. An aggregation over record streams usually needs to be performed on a windowing basis because otherwise the number of records that must be maintained for performing the aggregation may grow indefinitely.

一个聚合操作接收一个输入流,通过将多条输入记录合并成一条输出记录来产生一个新的流,比如计数(count)或求和(sum)。和join操作一样,在记录流上的聚合通常也需要在窗口的基础上执行,否则为了执行聚合而需要维护的记录数量会无限增长。

In the Kafka Streams DSL, an input stream of an aggregation can be a KStream or a KTable, but the output stream will always be a KTable. This allows Kafka Streams to update an aggregate value upon the late arrival of further records after the value was produced and emitted. When such late arrival happens, the aggregating KStream or KTable simply emits a new aggregate value. Because the output is a KTable, the new value is considered to overwrite the old value with the same key in subsequent processing steps.

在Kafka Streams DSL中,聚合操作的输入流可以是一个KStream或者是一个KTable,但是输出流只能是一个KTable。这就允许Kafka Streams在value被产生并发送出去之后,即使迟到的记录到来时,也可以更新聚合结果(第一次产生的结果是在当前窗口,然后把结果发送出去,第二次产生的结果已经不在当前窗口,它属于旧的窗口,也会更新对应的聚合结果,然后再把最新的结果发送出去)。当这样的迟到记录到来时,聚合的KStream或者KTable仅仅简单地发送新的聚合结果。由于输出是一个KTable,相同key下,在后续的处理步骤中,新的值会覆盖旧的值。

架构

Kafka Streams simplifies application development by building on the Kafka producer and consumer libraries and leveraging the native capabilities of Kafka to offer data parallelism, distributed coordination, fault tolerance, and operational simplicity. In this section, we describe how Kafka Streams works underneath the covers. Below is Logical view of a Kafka Streams application that contains multiple stream threads, each of which in turn containing multiple stream tasks.

Kafka Streams构建在Kafka生产者和消费者客户端库之上,并且利用Kafka原生的能力来提供数据并行、分布式协调、容错和运维上的简单性,从而简化了应用程序的开发。本节描述Kafka Streams底层是如何工作的。下图是一个Kafka Streams应用程序的逻辑视图,它包含多个流线程,每个线程又包含多个流任务。

(图:Kafka Streams应用程序的逻辑架构概览)

拓扑

A processor topology or simply topology defines the stream processing computational logic for your application, i.e., how input data is transformed into output data. A topology is a graph of stream processors (nodes) that are connected by streams (edges). There are two special processors in the topology:

拓扑定义了流处理应用程序的计算逻辑,比如输入数据怎么转换成输出数据。拓扑是由流处理算子和相连的流组成的一张图。在拓扑中有两种特殊类型的流算子:

  • Source Processor: A source processor is a special type of stream processor that does not have any upstream processors. It produces an input stream to its topology from one or multiple Kafka topics by consuming records from these topics and forward them to its down-stream processors.
  • Sink Processor: A sink processor is a special type of stream processor that does not have down-stream processors. It sends any received records from its up-stream processors to a specified Kafka topic.
  1. 源算子:没有任何上游算子,它从一个或多个Kafka主题中消费记录,然后产生一个到拓扑的输入流,并且转发到下游的算子
  2. 目标算子:没有任何的下游算子,它会把从上游算子接收到的任何记录,发送给指定的Kafka主题

(图:处理拓扑,包含源算子、流算子和目标算子)

A stream processing application – i.e., your application – may define one or more such topologies, though typically it defines only one. Developers can define topologies either via the low-level Processor API or via the Kafka Streams DSL, which builds on top of the former.

A processor topology is merely a logical abstraction for your stream processing code. At runtime, the logical topology is instantiated and replicated inside the application for parallel processing (see Parallelism Model).

一个流处理应用程序可以定义一个或多个拓扑,尽管通常你只会定义一个。开发者可以通过低级的Processor API或者高级的DSL方式定义拓扑。一个拓扑仅仅是流处理代码的逻辑抽象,在运行时,逻辑拓扑会被实例化,并且在应用程序中进行复制以获得并行处理的能力。

并行模型

Stream Partitions and Tasks

Kafka Streams uses the concepts of partitions and tasks as logical units of its parallelism model. There are close links between Kafka Streams and Kafka in the context of parallelism:

Kafka Streams使用分区和任务的概念作为它的并行模型的逻辑单元。Kafka Streams和Kafka在并行这个上下文上有紧密的联系:

  • Each stream partition is a totally ordered sequence of data records and maps to a Kafka topic partition.
  • A data record in the stream maps to a Kafka message from that topic.
  • The keys of data records determine the partitioning of data in both Kafka and Kafka Streams, i.e., how data is routed to specific partitions within topics.
  1. 每个流分区(stream partition)是一个全序(totally ordered)的数据记录序列,并且映射到一个Kafka主题分区
  2. 流中的一条数据记录,对应该主题中的一条Kafka消息
  3. 数据记录的key决定了数据在Kafka和Kafka Streams中的分区方式,即数据如何被路由到主题的特定分区

An application’s processor topology is scaled by breaking it into multiple tasks. More specifically, Kafka Streams creates a fixed number of tasks based on the input stream partitions for the application, with each task assigned a list of partitions from the input streams (i.e., Kafka topics). The assignment of partitions to tasks never changes so that each task is a fixed unit of parallelism of the application. Tasks can then instantiate their own processor topology based on the assigned partitions; they also maintain a buffer for each of its assigned partitions and process messages one-at-a-time from these record buffers. As a result stream tasks can be processed independently and in parallel without manual intervention.

应用程序的处理拓扑会被拆分成多个任务来进行扩展。更具体地说,Kafka Streams会基于应用程序输入流的分区数创建固定数量的任务,每个任务被分配输入流(即Kafka主题)中的一组分区。分区到任务的分配永远不会改变,因此每个任务都是应用程序的一个固定的并行单元。任务可以基于分配给它们的分区实例化自己的处理拓扑;它们也会为每个分配到的分区维护一个缓冲区,并且从这些记录缓冲区中一次处理一条消息。这样,所有的流任务都可以各自独立、并行地处理,不需要人工干预。

Sub-topologies aka topology sub-graphs: If there are multiple processor topologies specified in a Kafka Streams application, each task will only instantiate one of the topologies for processing. In addition, a single processor topology may be decomposed(分离分解) into independent sub-topologies (sub-graphs) as long as sub-topologies are not connected by any streams in the topology; here, each task may instantiate only one such sub-topology for processing. This further scales out the computational workload to multiple tasks.

子拓扑或者叫拓扑子图:如果在Kafka Streams应用程序中定义了多个处理拓扑,每个任务只会实例化其中的一个拓扑进行处理。另外,只要子拓扑之间没有被拓扑中的任何流连接起来,一个处理拓扑也可以被分解成多个独立的子拓扑(子图);这种情况下,每个任务可能只会实例化其中一个子拓扑进行处理。这可以进一步把计算负载扩展到更多的任务上。

(图:流分区与流任务的对应关系)

It is important to understand that Kafka Streams is not a resource manager, but a library that “runs” anywhere its stream processing application runs. Multiple instances of the application are executed either on the same machine, or spread across multiple machines and tasks can be distributed automatically by the library to those running application instances. The assignment of partitions to tasks never changes; if an application instance fails, all its assigned tasks will be restarted on other instances and continue to consume from the same stream partitions.

注意Kafka Streams不是一个资源管理器,而是一个随流处理应用程序“运行在任何地方”的客户端库。应用程序的多个实例可以运行在同一台机器上,也可以分散在多台机器上,任务会由这个库自动分配到这些正在运行的应用程序实例上。分区到任务的分配永远不会改变;如果一个应用程序实例失败了,分配给它的所有任务都会在其他实例上重新启动,并且从相同的流分区继续消费。总结一下:一个流处理应用程序实例(进程)有多个Task,每个Task分配了多个固定的分区;如果进程挂了,其上的所有Task都会在其他进程上执行,而不会把分区重新分配给剩下的Task。由于Task的分区是固定的,实际上Task的数量也是固定的,Task会分布式地在多个进程上执行。

Threading Model

Kafka Streams allows the user to configure the number of threads that the library can use to parallelize processing within an application instance. Each thread can execute one or more tasks with their processor topologies independently.

Kafka Streams允许用户配置库在一个应用程序实例内用于并行处理的线程数量。每个线程都可以独立地执行一个或多个任务及其处理拓扑。

(图:流线程与其执行的任务)

Starting more stream threads or more instances of the application merely amounts to replicating the topology and having it process a different subset of Kafka partitions, effectively parallelizing processing. It is worth noting that there is no shared state amongst the threads, so no inter-thread coordination is necessary. This makes it very simple to run topologies in parallel across the application instances and threads. The assignment of Kafka topic partitions amongst the various stream threads is transparently handled by Kafka Streams leveraging Kafka’s server-side coordination functionality.

开启更多的流线程或者更多的应用程序实例,仅仅相当于复制了拓扑,让它去处理另一部分Kafka分区子集,从而有效地并行化处理。值得注意的是,线程之间不共享状态,所以不需要线程间的协调。这使得在多个应用程序实例和线程之间并行地运行拓扑变得非常简单。不同流线程之间的Kafka主题分区分配,由Kafka Streams利用Kafka服务端的协调功能透明地处理。

As we described above, scaling your stream processing application with Kafka Streams is easy: you merely need to start additional instances of your application, and Kafka Streams takes care of distributing partitions amongst tasks that run in the application instances. You can start as many threads of the application as there are input Kafka topic partitions so that, across all running instances of an application, every thread (or rather, the tasks it runs) has at least one input partition to process.

正如上面所描述的,使用Kafka Streams扩展你的流处理应用程序非常简单:你只需要为你的应用程序启动额外的实例,然后Kafka Streams就会自动帮你将分区分布在任务之间,任务会运行在应用程序实例中。你可以启动和Kafka的输入主题分区相同数量的应用程序线程,这样在一个应用程序的所有运行实例中,每个线程(更精确地说,是运行的任务)至少都会处理一个输入分区。

Example

To understand the parallelism model that Kafka Streams offers, let’s walk through an example.

Imagine a Kafka Streams application that consumes from two topics, A and B, with each having 3 partitions. If we now start the application on a single machine with the number of threads configured to 2, we end up with two stream threads instance1-thread1 and instance1-thread2. Kafka Streams will break this topology by default into three tasks because the maximum number of partitions across the input topics A and B is max(3, 3) == 3, and then distribute the six input topic partitions evenly across these three tasks; in this case, each task will consume from one partition of each input topic, for a total of two input partitions per task. Finally, these three tasks will be spread evenly – to the extent this is possible – across the two available threads, which in this example means that the first thread will run 2 tasks (consuming from 4 partitions) and the second thread will run 1 task (consuming from 2 partitions).

为了理解Kafka Streams提供的并行度模型,我们来看一个示例。假设有一个Kafka Streams应用程序会消费两个主题:A和B,每个主题都有3个分区。如果我们在一台机器上启动了一个应用程序,配置的线程数量为2,最终我们会有两个流线程:instance1-thread1和instance1-thread2。Kafka Streams会默认将拓扑分成三个任务,因为所有输入主题A和B的最大分区数是max(3,3)=3,然后会将6个输入分区平均分配到这三个任务上。这种情况下,每个任务都会消费每个输入主题的一个分区,即每个任务分配到了总共两个分区。最后,这三个任务会被均匀地分散到两个可用的线程中,这里因为有两个线程,这就意味着第一个线程会运行两个任务(消费了4个分区),第二个线程会运行一个任务(消费了2个分区)。

Now imagine we want to scale out this application later on, perhaps because the data volume has increased significantly. We decide to start running the same application but with only a single thread on another, different machine. A new thread instance2-thread1 will be created, and input partitions will be re-assigned similar to:

现在假设我们要扩展应用程序,可能是因为数据量增长的很明显。我们决定在其他机器上运行相同的应用程序,不过只配置了一个线程。那么一个新的线程instance2-thread1就会被创建,输入分区会被重新分配成下面右图那样。

(图:新增实例后输入分区的重新分配)

When the re-assignment occurs, some partitions – and hence their corresponding tasks including any local state stores – will be “migrated” from the existing threads to the newly added threads (here: from instance1-thread1 on the first machine to instance2-thread1 on the second machine). As a result, Kafka Streams has effectively rebalanced the workload among instances of the application at the granularity of Kafka topic partitions.

当重新分配发生时,一些分区,以及它们对应的任务(包括本地状态存储),都会从已有的线程“迁移”到新添加的线程(比如这里从第一台机器的instance1-thread1迁移到第二台机器的instance2-thread1)。最终,Kafka Streams以Kafka主题分区为粒度,在应用程序的各个实例之间有效地重新平衡了负载。

What if we wanted to add even more instances of the same application? We can do so until a certain point, which is when the number of running instances is equal to the number of available input partitions to read from. At this point, before it would make sense to start further application instances, we would first need to increase the number of partitions for topics A and B; otherwise, we would over-provision the application, ending up with idle instances that are waiting for partitions to be assigned to them, which may never happen.

如果想要添加相同应用程序的更多实例呢?我们可以像上面那样做,直到运行实例的数量等于读取的可用分区数量(所有主题)。在这之后,如果想要启动更多的应用程序实例变得有意义,我们需要先为主题A和B增加分区;否则会存在空闲的应用程序实例,它们会等待有可用的分区分配给它们,但这可能永远都不会发生(虽然应用程序的实例比分区多,导致有些应用程序实例是空闲的,但是如果有应用程序挂掉了,那些空闲的应用程序就有可能分配到分区,而不再空闲。就像Kafka的消费者一样,如果消费者数量比分区数要多,空闲的消费者也会得不到分区,但如果有消费者挂掉了,空闲的消费者也是有机会得到分区的。不过我们无法保证空闲的应用程序实例或者消费者就一定有机会得到分区)。

状态

Kafka Streams provides so-called state stores, which can be used by stream processing applications to store and query data, which is an important capability when implementing stateful operations. Every task in Kafka Streams embeds one or more state stores that can be accessed via APIs to store and query data required for processing. These state stores can either be a RocksDB database, an in-memory hash map, or another convenient data structure. Kafka Streams offers fault-tolerance and automatic recovery for local state stores.

Kafka Streams提供了所谓的状态存储,可以被流处理应用程序用来存储和查询数据,这在实现有状态的操作时是一个非常重要的功能。Kafka Streams中的每个任务内置了一个或多个状态存储,并且可以在流处理时通过API的方式存储或者查询状态存储中的数据。这些状态存储可以是RocksDB数据库、内存的hash map、或者其他的数据结构。Kafka Streams为本地状态存储提供了容错和自动恢复机制。

(图:任务与其内嵌的本地状态存储)
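在Processor API里使用状态存储的大致方式如下(示意代码,基于0.10.0.x时代的接口;存储名“Counts”是假设的,并且这个存储需要事先通过TopologyBuilder注册到拓扑中并关联到该算子):算子在init()里拿到名为“Counts”的KeyValueStore,在process()里读写它。

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// 一个统计key出现次数的算子:状态保存在名为"Counts"的本地状态存储里
public class CountingProcessor implements Processor<String, String> {

    private ProcessorContext context;
    private KeyValueStore<String, Long> counts;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        // 通过名字拿到分配给本任务的本地状态存储
        this.counts = (KeyValueStore<String, Long>) context.getStateStore("Counts");
    }

    @Override
    public void process(String key, String value) {
        Long oldCount = counts.get(key);
        long newCount = (oldCount == null ? 0L : oldCount) + 1;
        counts.put(key, newCount);
        context.forward(key, newCount);
    }

    @Override
    public void punctuate(long timestamp) { }

    @Override
    public void close() { }
}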

容错

Kafka Streams builds on fault-tolerance capabilities integrated natively within Kafka. Kafka partitions are highly available and replicated; so when stream data is persisted to Kafka it is available even if the application fails and needs to re-process it. Tasks in Kafka Streams leverage the fault-tolerance capability offered by the Kafka consumer client to handle failures. If a task runs on a machine that fails, Kafka Streams automatically restarts the task in one of the remaining running instances of the application.

Kafka Streams构建在Kafka原生集成的容错能力之上。Kafka的分区是高可用和多副本的;所以当流数据持久化到Kafka中后,即使应用程序失败了、需要重新处理,数据也依然可用。Kafka Streams中的任务利用Kafka消费者客户端提供的容错能力来处理故障。如果某个任务所在的机器发生故障,Kafka Streams会自动在应用程序剩余的某个运行实例上重启该任务。

In addition, Kafka Streams makes sure that the local state stores are robust to failures, too. It follows a similar approach as Apache Samza and, for each state store, maintains a replicated changelog Kafka topic in which it tracks any state updates. These changelog topics are partitioned as well so that each local state store instance, and hence the task accessing the store, has its own dedicated changelog topic partition. Log compaction is enabled on the changelog topics so that old data can be purged safely to prevent the topics from growing indefinitely. If tasks run on a machine that fails and are restarted on another machine, Kafka Streams guarantees to restore their associated state stores to the content before the failure by replaying the corresponding changelog topics prior to resuming the processing on the newly started tasks. As a result, failure handling is completely transparent to the end user.

另外,Kafka Streams也确保本地状态存储对故障具有鲁棒性。它采用了和Apache Samza类似的方法:为每个状态存储维护一个多副本的变更日志Kafka主题(changelog topic),用来跟踪状态的每一次更新。这些变更日志主题同样是分区的,因此每个本地状态存储实例,以及访问该存储的任务,都有自己专属的变更日志主题分区。变更日志主题上会开启日志压缩,这样可以安全地清除旧数据,防止主题无限增长。如果任务所在的机器发生故障,任务在另一台机器上重启时,Kafka Streams保证在新启动的任务恢复处理之前,先重放对应的变更日志主题,把相关的状态存储恢复到故障前的内容。最终,故障处理对终端用户而言是完全透明的。

Optimization: In order to minimize the time for the state restoration and hence the cost of task (re)initialization, users can configure their applications to have shadow copies of local states. When a task migration happens, Kafka Streams then attempts to assign a task to where a standby replica exists in order to minimize the task initialization cost. See setting num.standby.replicas at Optional configuration parameters in the Developer Guide.

优化点:为了最小化恢复状态的时间以及任务重新初始化的代价,用户可以为应用程序配置一个本地状态的shadow副本。当一个任务迁移发生时,Kafka Streams会尝试将任务分配到备用副本所在的节点,以尽可能最小化任务初始化的代价。

流处理的保证

Kafka Streams currently supports at-least-once processing guarantees in the presence of failure. This means that if your stream processing application fails, no data records are lost and fail to be processed, but some data records may be re-read and therefore re-processed.

It depends on the specific use case whether at-least-once processing guarantees are acceptable or whether you may need exactly-once processing.

Kafka Streams目前支持在错误场景下至少一次的处理语义。这意味着如果你的流处理应用程序失败了,数据不会丢失,也不会被漏掉处理,但是有些数据可能会被重复读取,并被重复处理。根据不同的用例,用户自己决定是否可以接受至少处理一次的保证,还是需要正好一次的处理。

For many processing use cases, at-least-once processing turns out to be perfectly acceptable: Generally, as long as the effect of processing a data record is idempotent, it is safe for the same data record to be processed more than once. Also, some use cases can tolerate processing data records more than once even if the processing is not idempotent. For example, imagine you are counting hits by IP address to auto-generate blacklists that help with mitigating DDoS attacks against your infrastructure; here, some overcounting is tolerable because hits from malicious IP addresses involved(涉及) in an attack(攻击) will vastly(极大地) outnumber hits from benign(良性的) IP addresses anyway.

对于很多处理场景,至少一次的处理语义其实是完全可以接受的:一般来说,只要处理一条数据记录的影响是幂等的,那么同一条记录被处理多次也是安全的。同时,即使处理不是幂等的,有些用例也能够容忍记录被处理多次。比如,想象你要按IP地址统计命中次数,用来自动生成黑名单,帮助缓解针对你的基础设施的DDoS攻击;这里,一定程度的重复计数是可以容忍的,因为参与攻击的恶意IP地址的命中次数,无论如何都会远远超过良性IP地址的命中次数。

In general however, for non-idempotent operations such as counting, at-least-once processing guarantees may yield incorrect results. If a Kafka Streams application fails and restarts, it may double-count some data records that were processed shortly before the failure. We are planning to address this limitation and will support stronger guarantees and exactly-once processing semantics in a future release of Kafka Streams.

不过一般来说,对于计数这类非幂等操作,至少一次的处理语义可能会产生不正确的结果。如果一个Kafka Streams应用程序失败并重启,它可能会把故障前不久处理过的一些数据记录重复计数。我们计划解决这个限制,在Kafka Streams未来的版本中支持更强的保证和正好一次(exactly-once)的处理语义。

流控

Kafka Streams regulates(控制) the progress of streams by the timestamps of data records by attempting to synchronize all source streams in terms of time. By default, Kafka Streams will provide your application with event-time processing semantics. This is important especially when an application is processing multiple streams (i.e., Kafka topics) with a large amount of historical data. For example, a user may want to re-process past data in case the business logic of an application was changed significantly, e.g. to fix a bug in an analytics algorithm. Now it is easy to retrieve a large amount of past data from Kafka; however, without proper flow control, the processing of the data across topic partitions may become out-of-sync and produce incorrect results.

Kafka Streams通过数据记录的时间戳来调节流的进度,它会尝试在时间维度上同步所有的源头流。默认情况下,Kafka Streams为应用程序提供事件时间的处理语义。这在应用程序处理多个包含大量历史数据的流(即Kafka主题)时尤其重要。比如,当应用程序的业务逻辑发生了重大变化(比如修复分析算法中的一个bug)时,用户可能想要重新处理过去的数据。现在从Kafka中获取大量历史数据很容易;但如果没有恰当的流控,跨主题分区的数据处理可能会失去同步,并产生错误的结果。

As mentioned in the Concepts section, each data record in Kafka Streams is associated with a timestamp. Based on the timestamps of the records in its stream record buffer, stream tasks determine the next assigned partition to process among all its input streams. However, Kafka Streams does not reorder records within a single stream for processing since reordering would break the delivery semantics of Kafka and make it difficult to recover in the face of failure. This flow control is of course best-effort(尽最大努力) because it is not always possible to strictly enforce execution order across streams by record timestamp; in fact, in order to enforce strict execution ordering, one must either wait until the system has received all the records from all streams (which may be quite infeasible in practice) or inject additional information about timestamp boundaries or heuristic estimates(启发式的预估) such as MillWheel’s watermarks.

正如概念一节中提到的,Kafka Streams中的每条数据记录都关联了一个时间戳。流任务会基于其流记录缓冲区中各记录的时间戳,在它所有的输入流中决定下一个要处理的已分配分区。不过,Kafka Streams不会对单个流内部的记录重新排序,因为重新排序会破坏Kafka的消息传递语义,并且在故障发生时难以恢复。这种流控当然只能是尽力而为,因为并不总是可能严格按照记录的时间戳来约束跨流的执行顺序;实际上,若要强制严格的执行顺序,要么必须等到系统从所有流中接收到了所有记录(这在实践中往往不可行),要么需要注入关于时间戳边界的额外信息或启发式的估计,比如MillWheel的水位(watermark)。

背压

Kafka Streams does not use a backpressure mechanism because it does not need one. Using a depth-first processing strategy, each record consumed from Kafka will go through the whole processor (sub-)topology for processing and for (possibly) being written back to Kafka before the next record will be processed. As a result, no records are being buffered in-memory between two connected stream processors. Also, Kafka Streams leverages Kafka’s consumer client behind the scenes, which works with a pull-based messaging model that allows downstream processors to control the pace(步伐) at which incoming data records are being read.

Kafka Streams不使用背压机制,因为它不需要。采用深度优先的处理策略,从Kafka消费的每条记录都会先流经整个处理(子)拓扑完成处理,并且(可能)被写回Kafka之后,才会处理下一条记录。这样,在两个相连的流处理算子之间不会有记录被缓存在内存中。同时,Kafka Streams底层使用的是Kafka的消费者客户端,它基于拉取(pull)的消息模型,允许下游算子控制读取输入数据记录的速度。

The same applies to the case of a processor topology that contains multiple independent sub-topologies, which will be processed independently from each other (cf. Parallelism Model). For example, the following code defines a topology with two independent sub-topologies:

同样的方式也运用在包含多个独立子拓扑的处理拓扑,每个子拓扑都会各自独立地处理,比如下面的代码定义了一个拓扑,具有两个独立的子拓扑:

stream1.to("my-topic");
stream2 = builder.stream("my-topic");

Any data exchange between sub-topologies will happen through Kafka, i.e. there is no direct data exchange (in the example above, data would be exchanged through the topic “my-topic”). For this reason there is no need for a backpressure mechanism in this scenario, too.

子拓扑之间的任何数据交换都会经过Kafka,即不存在直接的数据交换(在上面的示例中,数据是通过主题"my-topic"交换的)。基于这个原因,这种场景下也不需要背压机制。

开发者指南

Kafka Streams配置

Kafka Streams的配置通过一个StreamsConfig实例完成。其中下面三个是必须要有的配置项:

  1. application.id:流处理应用程序的编号,在Kafka集群中必须是唯一的
  2. bootstrap.servers:建立和Kafka集群的初始连接
  3. zookeeper.connect:管理Kafka主题的ZooKeeper

每个流处理应用程序的编号必须是唯一的,同一个应用程序的所有实例都使用相同的编号。这个编号作为资源隔离的标识,会用在下面几个地方:

  1. 作为默认的Kafka生产者、消费者的client.id前缀
  2. 作为Kafka消费者的group.id,会用来协调工作
  3. 作为状态目录(state.dir)的子目录名称
  4. 作为内部Kafka主题名称的前缀
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
// Set a few key parameters
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application");
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "zookeeper1:2181");
// Any further settings
settings.put(... , ...);

// Create an instance of StreamsConfig from the Properties instance
StreamsConfig config = new StreamsConfig(settings);

Ser-/Deserialization (key.serde, value.serde): Serialization and deserialization in Kafka Streams happens whenever data needs to be materialized, i.e.,:

  • Whenever data is read from or written to a Kafka topic (e.g., via the KStreamBuilder#stream() and KStream#to() methods).
  • Whenever data is read from or written to a state store.

当数据需要物化时,在Kafka Streams中会发生序列化和反序列化:

  1. 当从Kafka主题中读取数据,或者写入数据到Kafka主题(比如通过KStreamBuilder.stream()或者KStream.to()方法)
  2. 当从状态存储中读取数据,或者写入数据到状态存储
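
下面是一个简单的示意(假设key和value都使用String类型,具体的serde需要根据实际的数据类型调整),展示如何通过StreamsConfig设置默认的serde:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
// 假设key和value都是字符串,设置默认的serde
settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());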

Number of Standby Replicas (num.standby.replicas): This specifies the number of standby replicas. Standby replicas are shadow copies of local state stores. Kafka Streams attempts to create the specified number of replicas and keep them up to date as long as there are enough instances running. Standby replicas are used to minimize the latency of task failover. A task that was previously running on a failed instance is preferred(优先) to restart on an instance that has standby replicas so that the local state store restoration process from its changelog can be minimized. Details about how Kafka Streams makes use of the standby replicas to minimize the cost of resuming tasks on failover can be found in the State section.

指定备用副本的数量。备用副本是本地状态存储的影子(shadow)副本。只要有足够多的实例在运行,Kafka Streams就会尝试创建指定数量的副本,并使这些副本一直保持最新状态。备用副本用于在任务发生故障切换时最小化延迟:原来运行在失败实例上的任务会优先在含有备用副本的实例上重启,这样就可以最小化从变更日志中恢复本地状态存储所需的时间。

Number of Stream Threads (num.stream.threads): This specifies the number of stream threads in an instance of the Kafka Streams application. The stream processing code runs in these threads. Details about Kafka Streams threading model can be found in section Threading Model.

指定一个Kafka Streams应用程序实例的流线程数量。流处理代码运行在这些线程上。

Replication Factor of Internal Topics (replication.factor): This specifies the replication factor of internal topics that Kafka Streams creates when local states are used or a stream is repartitioned for aggregation. Replication is important for fault tolerance. Without replication even a single broker failure may prevent progress of the stream processing application. It is recommended to use a similar replication factor as source topics.

指定内部主题的副本因子。在使用本地状态或者流因聚合需要重新分区时,Kafka Streams会创建内部主题。副本对于故障容错非常重要:如果没有副本机制,即使只有一个Broker失败,也可能导致流处理应用程序无法继续运行。推荐设置为和源主题类似的副本因子。

State Directory (state.dir): Kafka Streams persists local states under the state directory. Each application has a subdirectory on its hosting machine, whose name is the application id, directly under the state directory. The state stores associated with the application are created under this subdirectory.

Kafka Streams会在状态目录下持久化本地状态。每个应用程序在它的物理机的状态目录下都有一个子目录,名称是应用程序的编号。和应用程序关联的状态存储都会在这个子目录下创建。
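
下面的代码片段把上面介绍的几个配置项放在一个示意中(取值和状态目录路径都只是假设,仅供参考):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
settings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);               // 备用副本的数量
settings.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);                 // 每个实例的流线程数量
settings.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);                 // 内部主题的副本因子
settings.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");   // 本地状态目录(路径只是假设)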

Timestamp Extractor (timestamp.extractor): A timestamp extractor extracts a timestamp from an instance of ConsumerRecord. Timestamps are used to control the progress of streams.

The default extractor is ConsumerRecordTimestampExtractor. This extractor retrieves built-in timestamps that are automatically embedded into Kafka messages by the Kafka producer client (introduced in Kafka 0.10.0.0, see KIP-32: Add timestamps to Kafka message). Depending on the setting of Kafka’s log.message.timestamp.type parameter, this extractor will provide you with:

  • event-time processing semantics if log.message.timestamp.type is set to CreateTime aka “producer time” (which is the default). This represents the time when the Kafka producer sent the original message.
  • ingestion-time processing semantics if log.message.timestamp.type is set to LogAppendTime aka “broker time”. This represents the time when the Kafka broker received the original message.

时间戳解析器从一个ConsumerRecord实例中解析出时间戳,时间戳会用来控制流的进度。默认的时间戳解析器是ConsumerRecordTimestampExtractor,这个解析器会获取由Kafka生产者客户端自动嵌入到Kafka消息中的内置时间戳(Kafka 0.10.0.0引入,参见KIP-32: Add timestamps to Kafka message)。根据log.message.timestamp.type的设置,这个解析器会提供两种处理语义:

  1. 设置类型为CreateTime,即事件时间的处理语义。也是Producer的时间,作为默认值。表示Kafka生产者发送原始消息的时间点
  2. 设置类型为LogAppendTime,即摄取时间的处理语义。也是Broker的时间。表示Kafka Broker接收原始消息的时间点

Another built-in extractor is WallclockTimestampExtractor. This extractor does not actually “extract” a timestamp from the consumed record but rather returns the current time in milliseconds from the system clock, which effectively means Streams will operate on the basis(基础) of the so-called processing-time of events.

You can also provide your own timestamp extractors, for instance to retrieve timestamps embedded in the payload of messages. Here is an example of a custom TimestampExtractor implementation:

另一个内置的解析器是WallclockTimestampExtractor,这个解析器并不会真正地从消费记录中"解析"出时间戳,而是返回以毫秒为单位的当前系统时钟,这实际上意味着Streams会基于所谓的处理时间(processing-time)进行操作。你也可以提供自定义的时间戳解析器,比如从消息的内容(payload)中获取时间戳(通常是数据源自带的时间,而不是摄取时间),下面是一个自定义的TimestampExtractor实现类:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Extracts the embedded timestamp of a record (giving you "event time" semantics).
public class MyEventTimeExtractor implements TimestampExtractor {

    @Override
    public long extract(ConsumerRecord<Object, Object> record) {
        // `Foo` is your own custom class, which we assume has a method that returns
        // the embedded timestamp (in milliseconds).
        Foo myPojo = (Foo) record.value();
        if (myPojo != null) {
            return myPojo.getTimestampInMillis();
        } else {
            // Kafka allows `null` as message value. How to handle such message values
            // depends on your use case. In this example, we decide to fallback to
            // wall-clock time (= processing-time).
            return System.currentTimeMillis();
        }
    }

}

You would then define the custom timestamp extractor in your Streams configuration as follows:
然后你需要在Streams配置中指定自定义的时间戳解析器(就像自定义序列化和反序列化器一样,都需要在配置中明确指定):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class.getName());

Partition Grouper (partition.grouper): A partition grouper is used to create a list of stream tasks given the partitions of source topics, where each created task is assigned with a group of source topic partitions. The default implementation provided by Kafka Streams is DefaultPartitionGrouper, which assigns each task with at most one partition for each of the source topic partitions; therefore, the generated number of tasks is equal to the largest number of partitions among the input topics. Usually an application does not need to customize the partition grouper.

分区分组器(partition grouper)用来在给定源主题的分区下创建流任务列表,每个创建的任务都会被分配一组源主题的分区。默认的实现是DefaultPartitionGrouper,它为每个任务在每个源主题上至多分配一个分区,因此生成的任务数量等于输入主题中的最大分区数量(假设主题A有3个分区,主题B有4个分区,任务数量就等于max(3,4)=4)。通常应用程序不需要自定义分区分组方式。

Apart from Kafka Streams’ own configuration parameters (see previous sections) you can also specify parameters for the Kafka consumers and producers that are used internally, depending on the needs of your application. Similar to the Streams settings you define any such consumer and/or producer settings via StreamsConfig:

除了Kafka Streams自己的配置,你也可以根据你自己应用程序的需求设置内部的Kafka消费者和生产者的配置。

Properties streamsSettings = new Properties();
// Example of a "normal" setting for Kafka Streams
streamsSettings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-01:9092");
// Customize the Kafka consumer settings of your Streams application
streamsSettings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
StreamsConfig config = new StreamsConfig(streamsSettings);

编写一个流处理应用程序

Any Java application that makes use of the Kafka Streams library is considered a Kafka Streams application. The computational logic of a Kafka Streams application is defined as a processor topology, which is a graph of stream processors (nodes) and streams (edges).

Currently Kafka Streams provides two sets of APIs to define the processor topology:

  • A low-level Processor API that lets you add and connect processors as well as interact directly with state stores.
  • A high-level Kafka Streams DSL that provides common data transformation operations in a functional programming style such as map and filter operations. The DSL is the recommended starting point for developers new to Kafka Streams, and should cover many use cases and stream processing needs.

任何使用Kafka Streams客户端库的Java应用程序被认为是一个Kafka Streams应用程序。Kafka Streams应用程序的计算逻辑被定义为一个处理拓扑,它是流处理算子和流的一张图。目前支持两种API定义处理拓扑:

  1. 低级Processor API:允许你添加和连接Processor,以及和状态存储直接交互
  2. 高级DSL:以函数式编程的风格提供通用的数据转换算子,比如map和filter操作。对于刚接触Kafka Streams的开发者,推荐从DSL开始,它可以覆盖大部分的用例和流处理需求

首先需要在pom.xml中定义Kafka Streams的依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>0.10.0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.1</version>
</dependency>

<!-- Dependencies below are required/recommended only when using Apache Avro. -->
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>3.0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.7.7</version>
</dependency>
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>1.7.7</version>
</dependency>

你可以在应用程序代码的任何地方调用Kafka Streams库,通常是在main方法中。首先你需要创建一个KafkaStreams实例。KafkaStreams构造器的第一个参数接收一个用来定义拓扑的builder(Kafka Streams DSL使用KStreamBuilder,Processor API使用TopologyBuilder);第二个参数是StreamsConfig实例,定义了这个拓扑的配置。

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.TopologyBuilder;

// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.

KStreamBuilder builder = ...; // when using the Kafka Streams DSL
// OR
TopologyBuilder builder = ...; // when using the Processor API

// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;

KafkaStreams streams = new KafkaStreams(builder, config);

现在内部结构已经初始化完毕,不过处理工作还没有开始。你需要调用start()方法显式地启动Kafka Streams的线程:

// Start the Kafka Streams threads
streams.start();

如果这个流处理应用程序还有其他实例运行在其他地方,Kafka Streams会对用户透明地把一部分任务从已经存在的实例重新分配到你刚刚启动的新实例上。为了捕获非预期的异常,你可以在启动应用程序之前设置一个UncaughtExceptionHandler异常处理器,这个处理器会在流处理线程因为非预期异常而终止时被调用。

streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
    public void uncaughtException(Thread t, Throwable e) {
        // here you should examine the exception and perform an appropriate action!
    }
});

停止应用程序实例的方式是调用KafkaStreams的close()方法

// Stop the Kafka Streams threads
streams.close();

为了保证你的应用程序在响应SIGTERM时优雅地退出,推荐在关闭钩子中调用KafkaStreams.close()方法,Java8中的关闭钩子使用方式如下:

// add shutdown hook to stop the Kafka Streams threads
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

Java 7中的关闭钩子使用方式如下:

// add shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
// Stop the Kafka Streams threads
streams.close();
}
}));

当一个特定的应用程序实例停止时,Kafka Streams会将运行在这个实例上的任何任务迁移到其他运行的实例上(假设还存在这样的应用程序实例)。下面我们会详细描述两种API的使用方式。

Processor API

As mentioned in the Concepts section, a stream processor is a node in the processor topology that represents a single processing step. With the Processor API users can define arbitrary stream processors that processes one received record at a time, and connect these processors with their associated state stores to compose the processor topology.

流处理算子是处理拓扑的一个节点,代表了一个单独的处理步骤。使用Processor API,用户可以定义任意的流处理算子,一次处理一条接收到的记录,并且将这些算子和它们对应的状态存储连接在一起,组成算子拓扑。用户可以实现Processor接口来定义定制的流算子,Processor接口有两个主要的方法:

DEFINING A STREAM PROCESSOR

Users can define their customized stream processor by implementing the Processor interface, which provides two main API methods: process() and punctuate().

  • process() is called on each of the received record.
  • punctuate() is called periodically based on advanced record timestamps. For example, if processing-time is used as the record timestamp, then punctuate() will be triggered every specified period of time.
  1. process()会在每个接收到的记录上调用
  2. punctuate()会基于记录不断推进的时间戳被周期性地调用。比如,如果使用处理时间作为记录的时间戳,那么punctuate()就会按照指定的时间间隔被触发

The Processor interface also has an init() method, which is called by the Kafka Streams library during task construction phase. Processor instances should perform any required initialization in this method. The init() method passes in a ProcessorContext instance, which provides access to the metadata of the currently processed record, including its source Kafka topic and partition, its corresponding message offset, and further such information . This context instance can also be used to schedule the punctuation period (via ProcessorContext#schedule()) for punctuate(), to forward a new record as a key-value pair to the downstream processors (via ProcessorContext#forward()), and to commit the current processing progress (via ProcessorContext#commit()).

Processor接口也有一个init()方法,它会在Kafka Streams库构造任务的阶段被调用,Processor实例需要在该方法中执行必要的初始化工作。init()方法会传入一个ProcessorContext实例,提供当前处理记录的元数据访问接口,包括输入源的Kafka主题和分区、对应的消息偏移量等信息。这个上下文对象也可以用来调度punctuate()方法的调用间隔(通过ProcessorContext#schedule()),把新的记录作为键值对转发给下游的处理算子(通过ProcessorContext#forward()),以及提交当前的处理进度(通过ProcessorContext#commit())。

下面的Processor实现类定义了一个简单的WordCount算法:

public class WordCountProcessor implements Processor<String, String> {

    private ProcessorContext context;
    private KeyValueStore<String, Long> kvStore;

    @Override
    public void init(ProcessorContext context) {
        // keep the processor context locally because we need it in punctuate() and commit()
        this.context = context;
        // call this processor's punctuate() method every 1000 time units.
        this.context.schedule(1000);
        // retrieve the key-value store named "Counts"
        kvStore = (KeyValueStore) context.getStateStore("Counts");
    }

    @Override
    public void process(String dummy, String line) {
        String[] words = line.toLowerCase().split(" ");

        for (String word : words) {
            Long oldValue = kvStore.get(word);
            if (oldValue == null) {
                kvStore.put(word, 1L);
            } else {
                kvStore.put(word, oldValue + 1L);
            }
        }
    }

    @Override
    public void punctuate(long timestamp) {
        KeyValueIterator<String, Long> iter = this.kvStore.all();
        while (iter.hasNext()) {
            KeyValue<String, Long> entry = iter.next();
            context.forward(entry.key, entry.value.toString());
        }
        iter.close();
        // commit the current processing progress
        context.commit();
    }

    @Override
    public void close() {
        // close the key-value store
        kvStore.close();
    }
}

在上面的实现中,执行了下列操作:

  • In the init() method, schedule the punctuation every 1000 time units (the time unit is normally milliseconds, which in this example would translate to punctuation every 1 second) and retrieve the local state store by its name “Counts”.
  • In the process() method, upon each received record, split the value string into words, and update their counts into the state store (we will talk about this later in this section).
  • In the punctuate() method, iterate the local state store and send the aggregated counts to the downstream processor (we will talk about downstream processors later in this section), and commit the current stream state.
  1. init()方法,调度punctuate()每隔1000个时间单元调用一次(这个时间单元通常是毫秒,这里表示每隔一秒调用一次punctuate()),并且通过名称"Counts"获取本地的状态存储
  2. process()方法,当接收到每条记录时,将字符串分成多个单词,并且将它们的次数更新到状态存储中
  3. punctuate()方法,迭代本地状态存储,发送聚合次数给下游的处理算子,最后提交当前的流状态

DEFINING A STATE STORE 定义一个状态存储

Note that the WordCountProcessor defined above can not only access the currently received record in the process() method, but also maintain processing states to keep recently arrived records for stateful processing needs such as aggregations and joins. To take advantage of these states, users can define a state store by implementing the StateStore interface (the Kafka Streams library also has a few extended interfaces such as KeyValueStore); in practice, though, users usually do not need to customize such a state store from scratch but can simply use the Stores factory to define a state store by specifying whether it should be persistent, log-backed, etc.

上面定义的WordCountProcessor不仅可以在process()方法中访问当前接收到的记录,还可以维护处理状态,保存最近到达的记录,用于聚合、join这样的有状态处理需求。为了利用这些状态,用户可以通过实现StateStore接口来定义一个状态存储(Kafka Streams库也提供了一些扩展接口,比如KeyValueStore);不过实际中用户通常不需要从头实现这样的状态存储,而只需要使用Stores工厂类来定义一个状态存储,并指定它是否需要持久化、是否有变更日志(log-backed)等。下面的示例中定义了一个名为"Counts"的持久化键值存储,key的类型是String,value的类型是Long。

StateStoreSupplier countStore = Stores.create("Counts")
    .withKeys(Serdes.String())
    .withValues(Serdes.Long())
    .persistent()
    .build();

CONNECTING PROCESSORS AND STORES 连接处理算子和状态

Now that we have defined the processor and the state stores, we can now construct the processor topology by connecting these processors and state stores together by using the TopologyBuilder instance. In addition, users can add source processors with the specified Kafka topics to generate input data streams into the topology, and sink processors with the specified Kafka topics to generate output data streams out of the topology.

现在我们已经定义了处理算子和状态存储,我们可以开始构造处理拓扑:通过使用TopologyBuilder的实例来连接这些处理算子以及状态存储。另外,用户可以添加输入源处理算子(source processors),并且指定特定的Kafka主题,这样就可以(读取Kafka主题)生成输入数据流进入到拓扑中;也可以指定目标处理算子(sink processors),也会指定特定的Kafka主题,这样就可以(写入Kafka主题)生成输出数据流到拓扑之外。

TopologyBuilder builder = new TopologyBuilder();

// add the source processor node that takes Kafka topic "source-topic" as input
builder.addSource("Source", "source-topic")

    // add the WordCountProcessor node which takes the source processor as its upstream processor
    .addProcessor("Process", () -> new WordCountProcessor(), "Source")

    // create the countStore associated with the WordCountProcessor processor
    .addStateStore(countStore, "Process")

    // add the sink processor node that takes Kafka topic "sink-topic" as output
    // and the WordCountProcessor node as its upstream processor
    .addSink("Sink", "sink-topic", "Process");

There are several steps in the above implementation to build the topology, and here is a quick walk through:
上面的实现中构造一个拓扑有几个步骤:

  • A source processor node named “Source” is added to the topology using the addSource method, with one Kafka topic “source-topic” fed to it.
  • A processor node named “Process” with the pre-defined WordCountProcessor logic is then added as the downstream processor of the “Source” node using the addProcessor method.
  • A predefined persistent key-value state store countStore is created and associated to the “Process” node.
  • A sink processor node is then added to complete the topology using the addSink method, taking the “Process” node as its upstream processor and writing to a separate “sink-topic” Kafka topic.
  1. 一个叫做“Source”的源算子被添加到拓扑中,使用了addSource方法,并且有一个Kafka的主题“source-topic”会(将数据)喂到这个源算子里
  2. 一个叫做“Process”的处理节点,使用了预定义的WordCountProcessor逻辑,然后通过addProcessor方法将其添加到(作为)“Source”节点的下游处理算子
  3. 创建了一个预定义的持久化键值存储,保存了countStore,并且关联了“Process”节点
  4. 使用addSink方法添加了一个目标处理节点,构成了一个完整的拓扑。将“Process”节点作为它的上游处理算子,并且写到一个Kafka的输出主题

In this defined topology, the “Process” stream processor node is considered a downstream processor of the “Source” node, and an upstream processor of the “Sink” node. As a result, whenever the “Source” node forward a newly fetched record from Kafka to its downstream “Process” node, WordCountProcessor#process() method is triggered to process the record and update the associated state store; and whenever context#forward() is called in the WordCountProcessor#punctuate() method, the aggregate key-value pair will be sent via the “Sink” processor node to the Kafka topic “sink-topic”. Note that in the WordCountProcessor implementation, users need to refer with the same store name “Counts” when accessing the key-value store; otherwise an exception will be thrown at runtime indicating that the state store cannot be found; also if the state store itself is not associated with the processor in the TopologyBuilder code, accessing it in the processor’s init() method will also throw an exception at runtime indicating the state store is not accessible from this processor.

在上面定义的拓扑中,“Process”流处理节点被认为是“Source”节点的下游处理算子,也是“Sink”节点的上游处理算子。结果就是:无论什么时候,当”Source“节点从Kafka拉取一条新的记录,并转发给下游的”Process“节点,WordCountProcessor#process()方法就会被触发,并且会处理这条记录,以及更新相应的状态存储;并且无论什么时候当在WordCountProcessor#punctuate()方法中调用context#forward()时,聚合的键值对会通过”Sink“处理节点发送到Kafka的输出主题“sink-topic”中。注意在WordCountProcessor的实现中,用户在获取键值存储时需要参考这里(构造拓扑)相同的状态存储名称“Counts”(即拓扑定义的Counts键值存储名称,在WordCountProcessor中为了获取这个键值存储,两者的名称必须是一致的),否则在运行时会抛出一个异常说状态存储未找到。如果状态存储本身没有和TopologyBuilder代码中的处理节点(Processor节点,只有WordCountProcessor这一个节点)相关联的话,在Processor的init方法中访问状态存储也会抛出运行时的异常(状态存储不能在当前Processor节点访问)。

With the defined processor topology, users can now start running a Kafka Streams application instance. Please read how to run a Kafka Streams application for details.

有了定义好的处理拓扑,用户现在就可以启动一个Kafka Streams应用程序实例了。

Kafka Streams DSL

As mentioned in the Concepts section, a stream is an unbounded, continuously updating data set. With the Kafka Streams DSL users can define the processor topology by concatenating multiple transformation operations where each operation transforming one stream into other stream(s); the resulted topology then takes the input streams from source Kafka topics and generates the final output streams throughout its concatenated transformations. However, different streams may have different semantics in their data records:

一个流是一个无界的、持续更新的数据集。使用Kafka Streams的DSL,用户可以通过连接多个转换操作的方式来定义处理拓扑,其中每个操作会把一个流转换成其他的流;最终得到的拓扑会把源Kafka主题作为输入流,并在贯穿这些相连的转换操作之后生成最终的输出流。不过不同的流的数据记录可能有不同的语义:

  • In some streams, each record represents a new immutable datum in their unbounded data set; we call these record streams.
  • In other streams, each record represents a revision (or update) of their unbounded data set in chronological(按时间顺序) order; we call these changelog streams.
  1. 在一些流中,每条记录代表了无界数据集中新的不可变数据,我们叫做记录流(record streams)
  2. 在其他流中,每条记录代表了无界数据集中按照时间顺序排序的更新,我们叫做变更日志流(changelog streams)

这两种类型的流都可以存储成Kafka的主题。不过,它们的计算语义则截然不同。举例有一个聚合操作,为指定的key计算记录的次数。对于记录流(record streams)而言,每条记录都是来自于Kafka主题中带有key的消息(比如一个页面浏览的数据流会以用户的编号作为key):

Both of these two types of streams can be stored as Kafka topics. However, their computational semantics can be quite different. Take the example of an aggregation operation that counts the number of records for the given key. For record streams, each record is a keyed message from a Kafka topic (e.g., a page view stream keyed by user ids):

# Example: a record stream for page view events
# Notation is <record key> => <record value>
1 => {"time":1440557383335, "user_id":1, "url":"/home?user=1"}
5 => {"time":1440557383345, "user_id":5, "url":"/home?user=5"}
2 => {"time":1440557383456, "user_id":2, "url":"/profile?user=2"}
1 => {"time":1440557385365, "user_id":1, "url":"/profile?user=1"}

The counting operation for record streams is trivial to implement: you can maintain a local state store that tracks the latest count for each key, and, upon receiving a new record, update the corresponding key by incrementing its count by one.

对记录流进行计数操作非常容易实现:你可以维护一个本地的状态存储,用来跟踪每个key的最近的次数。并且,当接收到一条新的记录时,更新对应key的值,把它的次数加1。

For changelog streams, on the other hand, each record is an update of the unbounded data set (e.g., a changelog stream for a user profile table, where the user id serves as both the primary key for the table and as the record key for the stream; here, a new record represents a changed row of the table). In practice you would usually store such streams in Kafka topics where log compaction is enabled.

对于变更日志流而言,每条记录都是对无界数据集的一次更新(比如针对用户个人信息表的一个变更日志流,用户编号既作为表的主键,也作为流记录的key,这里一条新记录表示表中某一行的变更)。实际应用中,通常会把这样的流存储在开启了日志压缩(log compaction)的Kafka主题中。

# Example: a changelog stream for a user profile table
1 => {"last_modified_time":1440557383335, "user_id":1, "email":"user1@aol.com"}
5 => {"last_modified_time":1440557383345, "user_id":5, "email":"user5@gmail.com"}
2 => {"last_modified_time":1440557383456, "user_id":2, "email":"user2@yahoo.com"}
1 => {"last_modified_time":1440557385365, "user_id":1, "email":"user1-new-email-addr@comcast.com"}
2 => {"last_modified_time":1440557385395, "user_id":2, "email":null} <-- user has been deleted

As a result the counting operation for changelog streams is no longer monotonically incrementing: you need to also decrement the counts when a delete update record is received on some given key as well. In addition, even for counting aggregations on an record stream, the resulting aggregate is no longer an record stream but a relation / table, which can then be represented as a changelog stream of updates on the table.

因此,对变更日志流的计数操作不再是单调递增的了:当接收到指定key的一条删除更新记录时,你还需要减少计数。另外,即使是在记录流上做聚合计数,聚合的结果也不再是一个记录流,而是一个关系/表,它可以表示为这张表上更新的变更日志流。

The counting operation of a user profile changelog stream is peculiar(罕见的) because it will generate, for a given key, a count of either 0 (meaning the key does not exist or does not exist anymore) or 1 (the key exists) only. Multiple records for the same key are considered duplicate, old information of the most recent record, and thus will not contribute to the count.

For changelog streams developers usually prefer counting a non-primary-key field. We use the example above just for the sake of illustration.

对用户的个人信息变更日志流做计数操作是比较少见的,因为它为指定的key生成的计数要么是0(表示key不存在),要么是1(key存在)。相同key的多个记录被认为是重复的、相对当前最近记录而言是旧的信息,因此不会对计数结果产生影响。对于变更日志流,开发者通常会在非主键字段上计数,上面的示例仅仅是为了演示(有主键)。
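
为了说明这两种计数语义的区别,下面给出一个非常简化的示意(只是用普通的Java Map模拟状态存储,并不是Kafka Streams的内部实现):

import java.util.HashMap;
import java.util.Map;

// 仅用于说明两种计数语义的示意代码
public class CountingSemanticsSketch {

    private final Map<String, Long> counts = new HashMap<>();

    // 记录流(record stream):每接收到一条记录,对应key的计数加1
    public void onRecordStreamRecord(String key) {
        counts.merge(key, 1L, Long::sum);
    }

    // 变更日志流(changelog stream,按主键计数):同一个key的计数只能是0或1
    public void onChangelogStreamRecord(String key, Object value) {
        if (value == null) {
            counts.put(key, 0L);   // value为null表示删除(tombstone),key不再存在
        } else {
            counts.put(key, 1L);   // key存在;针对相同key的重复更新不会增加计数
        }
    }
}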

One of the key design principles of the Kafka Streams DSL is to understand and distinguish between record streams and changelog streams and to provide operators with the correct semantics for these two different types of streams. More concretely, in the Kafka Streams DSL we use the KStream interface to represent record streams, and we use a separate KTable interface to represent changelog streams. The Kafka Streams DSL is therefore the recommended API to implement a Kafka Streams application. Compared to the lower-level Processor API, its benefits are:

Kafka Streams DSL的一个主要设计原则是理解和区分记录流和变更日志流,并且为这两种不同类型的流提供具有正确语义的算子。更具体地来说,在Kafka Streams DSL中,我们使用KStream接口来表示记录流,使用单独的KTable接口表示变更日志流。因此Kafka Streams DSL是实现Kafka Streams应用程序时推荐使用的API,和低级的Processor API相比,它有几个优点:

  • More concise and expressive code, particularly when using Java 8+ with lambda expressions.
  • Easier to implement stateful transformations such as joins and aggregations.
  • Understands the semantic differences of record streams and changelog streams, so that transformations such as aggregations work as expected depending on which type of stream they operate against.
  1. 更简洁、更具有表达力的代码,尤其是使用Java8的lambda表达式时
  2. 可以很容易地实现一个有状态的操作,比如联合和聚合操作
  3. 理解记录流和变更日志流的语义区别,这样转换操作(比如聚合)根据流的类型按照预期的方式工作

CREATING SOURCE STREAMS FROM KAFKA

Both KStream or KTable objects can be created as a source stream from one or more Kafka topics via KStreamBuilder, an extended class of TopologyBuilder used in the lower-level Processor API (for KTable you can only create the source stream from a single topic).

KStream和KTable对象都可以通过KStreamBuilder创建,作为从一个或多个Kafka主题的输入源流。KStreamBuilder是TopologyBuilder(低级的Processor API)的继承类。对于KTable,你只能从一个Kafka主题中创建一个输入源流。

Interface How to instantiate
KStream<K, V> KStreamBuilder#stream(…)
KTable<K, V> KStreamBuilder#table(…)

When creating an instance, you may override the default serdes for record keys (K) and record values (V) used for reading the data from Kafka topics (see Data types and serdes for more details); otherwise the default serdes specified through StreamsConfig will be used.

当创建一个(KStream或KTable)实例时,你可以覆写用于从Kafka主题读取数据的记录key(K)和value(V)的默认serde;如果不指定,则会使用StreamsConfig中配置的默认serde。

import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

KStreamBuilder builder = new KStreamBuilder();

// In this example we assume that the default serdes for keys and values are
// the String serde and the generic Avro serde, respectively.

// Create a stream of page view events from the PageViews topic, where the key of
// a record is assumed to be the user id (String) and the value an Avro GenericRecord
// that represents the full details of the page view event.
KStream<String, GenericRecord> pageViews = builder.stream("PageViews");

// Create a changelog stream for user profiles from the UserProfiles topic,
// where the key of a record is assumed to be the user id (String) and its value
// an Avro GenericRecord.
KTable<String, GenericRecord> userProfiles = builder.table("UserProfiles");
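
如果不想使用默认的serde,也可以在创建KStream/KTable时显式地传入serde来覆盖StreamsConfig中的默认配置。下面是一个简单的示意(主题名称和类型只是假设):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

// 显式指定key和value的serde,覆盖StreamsConfig中配置的默认serde
KStream<String, String> textLines =
    builder.stream(Serdes.String(), Serdes.String(), "TextLinesTopic");

KTable<String, String> userRegions =
    builder.table(Serdes.String(), Serdes.String(), "UserRegionsTopic");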

TRANSFORM A STREAM

KStream and KTable support a variety of transformation operations. Each of these operations can be translated into one or more connected processors into the underlying processor topology. Since KStream and KTable are strongly typed, all these transformation operations are defined as generics functions where users could specify the input and output data types.

KStream和KTable支持很多类型的转换操作。每种操作都可以翻译成底层处理拓扑中一个或多个相连的处理算子。由于KStream和KTable是强类型的,所有这些转换操作都被定义为泛型函数,用户可以指定输入和输出的数据类型。

Some KStream transformations may generate one or more KStream objects (e.g., filter and map on KStream generate another KStream, while branch on KStream can generate multiple KStream) while some others may generate a KTable object (e.g., aggregation) interpreted(理解) as the changelog stream to the resulted relation. This allows Kafka Streams to continuously update the computed value upon arrivals of late records after it has already been produced to the downstream transformation operators. As for KTable, all its transformation operations can only generate another KTable (though the Kafka Streams DSL does provide a special function to convert a KTable representation into a KStream, which we will describe later). Nevertheless, all these transformation methods can be chained together to compose a complex processor topology.

有些KStream转换可能产生一个或多个KStream对象(比如在KStream上进行filter和map会生成新的KStream,而在KStream上进行branch则会产生多个KStream),而有些则会产生一个KTable对象(比如聚合操作),后者可以理解为结果关系(表)对应的变更日志流。这就允许Kafka Streams在结果已经发送给下游转换算子之后,当迟到的记录到来时,仍然可以持续地更新已经计算过的值。对于KTable而言,它的所有转换操作只会生成新的KTable(尽管Kafka Streams DSL提供了把KTable转换成KStream的专门函数,后面会介绍)。尽管如此,所有这些转换方法都可以链接在一起,从而构造出一个复杂的处理拓扑。

We describe these transformation operations in the following subsections, categorizing them as two categories: stateless and stateful transformations.

下面我们会将这些转换操作分成两类:无状态的和有状态的转换。

STATELESS TRANSFORMATIONS

Stateless transformations include filter, filterNot, foreach, map, mapValues, selectKey, flatMap, flatMapValues, branch. Most of them can be applied to both KStream and KTable, where users usually pass a customized function to these functions as a parameter; e.g. a Predicate for filter, a KeyValueMapper for map, and so on. Stateless transformations, by definition, do not depend on any state for processing, and hence implementation-wise they do not require a state store associated with the stream processor.

无状态的转换包括:filter, filterNot, foreach, map, mapValues, selectKey, flatMap, flatMapValues, branch。它们中的大部分都可以被同时用在KStream和KTable上,用户通常需要为这些函数传递一个自定义的函数作为参数。比如对于filter算子,需要传递一个Predicate,对于map算子需要传递一个KeyValueMapper等等。无状态的操作,从定义上来说,不依赖于处理的任何状态,因此它们并不需要和流处理算子相关联的状态存储。

下面是mapValues分别使用Java8的lambda,以及Java7的示例:

// Java 8
KStream<Long, String> uppercased =
    nicknameByUserId.mapValues(nickname -> nickname.toUpperCase());

// Java 7
KStream<Long, String> uppercased =
    nicknameByUserId.mapValues(
        new ValueMapper<String, String>() {
            @Override
            public String apply(String nickname) {
                return nickname.toUpperCase();
            }
        }
    );

The function is applied to each record, and its result will trigger the creation a new record.

这个函数会作用于每条记录,它的结果会生成一条新的记录。
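
再比如filter算子,需要传递一个Predicate作为参数。下面是一个简单的示意(沿用上面假设的nicknameByUserId流):

// Java 8:只保留昵称长度大于5的记录
KStream<Long, String> longNicknames =
    nicknameByUserId.filter((userId, nickname) -> nickname.length() > 5);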

STATEFUL TRANSFORMATIONS

有状态的操作包括如下几个:

  • joins (KStream/KTable): join, leftJoin, outerJoin
  • aggregations (KStream): countByKey, reduceByKey, aggregateByKey
  • aggregations (KTable): groupBy plus count, reduce, aggregate (via KGroupedTable)
  • general transformations (KStream): process, transform, transformValues

Stateful transformations are transformations where the processing logic requires accessing an associated state for processing and producing outputs. For example, in join and aggregation operations, a windowing state is usually used to store all the records received so far within the defined window boundary. The operators can then access accumulated records in the store and compute based on them (see Windowing a Stream for details).

有状态的转换指的是其处理逻辑需要访问相关联的状态才能进行处理并产生输出的转换。例如在join和聚合操作中,通常会使用一个窗口状态来存储到目前为止在定义的窗口边界内接收到的所有记录,算子就可以访问存储中积累的记录,并基于这些记录做计算(详见后面的"窗口"一节)。下面是使用Java 8的WordCount示例:

// We assume message values represent lines of text.  For the sake of this example, we ignore
// whatever may be stored in the message keys.
KStream<String, String> textLines = ...;

KStream<String, Long> wordCounts = textLines
    // Split each text line, by whitespace, into words. The text lines are the message
    // values, i.e. we can ignore whatever data is in the message keys and thus invoke
    // `flatMapValues` instead of the more generic `flatMap`.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    // We will subsequently invoke `countByKey` to count the occurrences of words, so we use
    // `map` to ensure the key of each record contains the respective word.
    .map((key, word) -> new KeyValue<>(word, word))
    // Count the occurrences of each word (record key).
    //
    // This will change the stream type from `KStream<String, String>` to
    // `KTable<String, Long>` (word -> count). We must provide a name for
    // the resulting KTable, which will be used to name e.g. its associated
    // state store and changelog topic.
    .countByKey("Counts")
    // Convert the `KTable<String, Long>` into a `KStream<String, Long>`.
    .toStream();

WordCount使用Java 7:

// Code below is equivalent to the previous Java 8+ example above.
KStream<String, String> textLines = ...;

KStream<String, Long> wordCounts = textLines
    .flatMapValues(new ValueMapper<String, Iterable<String>>() {
        @Override
        public Iterable<String> apply(String value) {
            return Arrays.asList(value.toLowerCase().split("\\W+"));
        }
    })
    .map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
        @Override
        public KeyValue<String, String> apply(String key, String word) {
            return new KeyValue<String, String>(word, word);
        }
    })
    .countByKey("Counts")
    .toStream();

WINDOWING A STREAM

Windowing is a common prerequisite for stateful transformations which group records in a stream, for example, by their timestamps. A local state store is usually needed for a windowing operation to store recently received records based on the window interval, while old records in the store are purged after the specified window retention period. The retention time can be set via Windows#until().

窗口是对流中的记录进行分组(比如按照时间戳分组)的有状态转换操作的常见前提条件。窗口相关的操作通常需要一个本地状态存储,根据窗口间隔保存最近接收到的记录,而存储中的旧记录会在超过指定的窗口保留时间之后被清除。保留时间可以通过Windows#until()设置。Kafka Streams目前支持以下类型的窗口:

Window name Behavior Short description
Tumbling time window(翻转窗口) Time-based Fixed-size, non-overlapping, gap-less windows
Hopping time window(跳跃时间窗口) Time-based Fixed-size, overlapping windows
Sliding time window(滑动时间窗口) Time-based Fixed-size, overlapping windows that work on differences between record timestamps

Tumbling time windows are a special case of hopping time windows and, like the latter, are windows based on time intervals. They model fixed-size, non-overlapping, gap-less windows. A tumbling window is defined by a single property: the window’s size. A tumbling window is a hopping window whose window size is equal to its advance interval. Since tumbling windows never overlap, a data record will belong to one and only one window.

翻转窗口是跳跃窗口的一个特例,和后者一样,都是基于时间间隔的窗口。它们是固定大小、互不重叠、没有间隙的窗口。一个翻转窗口只由一个属性定义:窗口大小。翻转窗口其实就是窗口大小等于前进间隔(advance interval)的跳跃窗口。由于翻转窗口不会重叠,一条数据记录只会属于一个且仅一个窗口。

// A tumbling time window with a size 60 seconds (and, by definition, an implicit
// advance interval of 60 seconds).
// The window's name -- the string parameter -- is used to e.g. name the backing state store.
long windowSizeMs = 60 * 1000L;
TimeWindows.of("tumbling-window-example", windowSizeMs);

// The above is equivalent to the following code:
TimeWindows.of("tumbling-window-example", windowSizeMs).advanceBy(windowSizeMs);

Hopping time windows are windows based on time intervals. They model fixed-sized, (possibly) overlapping windows. A hopping window is defined by two properties: the window’s size and its advance interval (aka “hop”). The advance interval specifies by how much a window moves forward relative to the previous one. For example, you can configure a hopping window with a size 5 minutes and an advance interval of 1 minute. Since hopping windows can overlap – and in general they do – a data record may belong to more than one such windows.

跳跃时间窗口是基于时间间隔的窗口,固定大小,但是窗口之间可能有重叠。跳跃窗口定义了两个属性:窗口大小和跳跃间隔(hop的中文意思是跳跃)。跳跃间隔指定了一个窗口相对于前一个窗口的移动大小。比如你可以配置一个跳跃窗口的大小为5分钟,跳跃间隔为1分钟。由于跳跃窗口允许重叠,所以一条记录可能属于不止一个窗口。

// A hopping time window with a size of 5 minutes and an advance interval of 1 minute.
// The window's name -- the string parameter -- is used to e.g. name the backing state store.
long windowSizeMs = 5 * 60 * 1000L;
long advanceMs = 1 * 60 * 1000L;
TimeWindows.of("hopping-window-example", windowSizeMs).advanceBy(advanceMs);

Pay attention, that tumbling and hopping time windows are aligned to the epoch and that the lower window time interval bound is inclusive, while the upper bound is exclusive.

Aligned to the epoch means, that the first window starts at timestamp zero. For example, hopping windows with size of 5000ms and advance of 3000ms, have window boundaries [0;5000),[3000;8000),…— and not [1000;6000),[4000;9000),… or even something “random” like [1452;6452),[4452;9452),…, which might be the case if windows get initialized depending on system/application start-up time, introducing non-determinism.

注意,翻转窗口和跳跃窗口都是和纪元(epoch)对齐的,并且窗口时间区间的下界(lower bound)被包含,而上界(upper bound)不被包含。和纪元对齐表示第一个窗口从时间戳0开始。比如窗口大小为5000ms、前进间隔为3000ms的跳跃窗口,其窗口边界是[0;5000),[3000;8000),...,而不是[1000;6000),[4000;9000),...,更不是依赖于系统/应用程序启动时间的"随机"边界如[1452;6452),[4452;9452),...(那样会引入不确定性)。窗口计数的示例如下:

KStream<String, GenericRecord> viewsByUser = ...;

KTable<Windowed<String>, Long> userCounts =
// count users, using hopping windows of size 5 minutes that advance every 1 minute
viewsByUser.countByKey(TimeWindows.of("GeoPageViewsWindow", 5 * 60 * 1000L).advanceBy(60 * 1000L));

Unlike non-windowed aggregates that we have seen previously, windowed aggregates return a windowed KTable whose key type is Windowed. This is to differentiate aggregate values with the same key from different windows. The corresponding window instance and the embedded key can be retrieved as Windowed#window() and Windowed#key(), respectively.

和前面看到的没有窗口的聚合不同的是,窗口聚合操作返回一个带有窗口的KTable,它的key是Windowed,目的是区分不同窗口中存在相同的key(如果没有带窗口,那么相同的key在不同的窗口中就无法区分)。对应的窗口实例以及内置的key分别可以通过Windowed#window()Windowed#key()获取到。
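
下面是一个简单的示意(沿用上面的userCounts,输出key的拼接格式只是假设),展示如何取出窗口的起始时间和原始key:

import org.apache.kafka.streams.KeyValue;

// 将窗口化的KTable转成KStream,并把窗口起始时间拼接到key中
KStream<String, Long> countsPerWindow = userCounts
    .toStream()
    .map((windowedUserId, count) -> new KeyValue<>(
        windowedUserId.key() + "@" + windowedUserId.window().start(), count));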

Sliding windows are actually quite different from hopping and tumbling windows. A sliding window models a fixed-size window that slides continuously over the time axis; here, two data records are said to be included in the same window if the difference of their timestamps is within the window size. Thus, sliding windows are not aligned to the epoch, but on the data record timestamps. Pay attention, that in contrast to hopping and tumbling windows, lower and upper window time interval bounds are both inclusive. In Kafka Streams, sliding windows are used only for join operations, and can be specified through the JoinWindows class.

滑动窗口和跳跃窗口、翻转窗口都不同:它也有固定的窗口大小,不过是在时间轴上持续地滑动。如果两条记录的时间戳之差在窗口大小以内,这两条记录就会被包含在同一个窗口中。所以滑动窗口并不是和纪元对齐,而是和记录的时间戳对齐。注意和跳跃窗口、翻转窗口相反的是,滑动窗口的窗口时间区间的下界和上界都被包含。在Kafka Streams中,滑动窗口仅用于join操作,可以通过JoinWindows类来指定。
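
下面是一个KStream和KStream进行join的简单示意,通过JoinWindows指定滑动窗口(流的内容、窗口名称和窗口大小都只是假设,join操作本身会在下一节详细介绍):

// Key是用户,value分别是点击次数和广告展示信息(仅为假设的示例数据)
KStream<String, Long> clicks = ...;
KStream<String, String> impressions = ...;

// 只有两条记录的时间戳相差在5分钟以内时才会被join到一起
KStream<String, String> joined = clicks.join(
    impressions,
    (clickCount, impression) -> impression + "/" + clickCount,
    JoinWindows.of("click-impression-join").within(5 * 60 * 1000L));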

JOINING STREAMS

Many stream processing applications can be coded as stream join operations. For example, applications backing an online shop might need to access multiple, updating database tables (e.g. sales prices, inventory, customer information) when processing a new record. These applications can be implemented such that they work on the tables’ changelog streams directly, i.e. without requiring to make a database query over the network for each record. In this example, the KTable concept in Kafka Streams would enable you to track the latest state (think: snapshot) of each table in a local key-value store, thus greatly reducing the processing latency as well as reducing the load of the upstream databases.

许多流处理应用程序都可以用流的join操作来实现。比如支撑在线商店的应用程序在处理一条新记录时,可能需要访问多张不断更新的数据库表(比如销售价格表、库存表、用户信息表)。这些应用程序可以直接基于这些表的变更日志流来实现,即无需为每条记录通过网络执行数据库查询。在这个示例中,Kafka Streams的KTable概念使你可以在本地键值存储中跟踪每张表的最新状态(快照),从而显著降低处理延迟,并减少上游数据库的压力。在Kafka Streams中,有下面两种join操作:

  • Join a KStream with another KStream or KTable.
  • Join a KTable with another KTable only.

有三种join组合方式:

  • KStream-to-KStream Joins are always windowed joins, since otherwise the join result size might grow infinitely in size. Here, a newly received record from one of the streams is joined with the other stream’s records within the specified window interval to produce one result for each matching pair based on user-provided ValueJoiner. A new KStream instance representing the result stream of the join is returned from this operator.
  • KTable-to-KTable Joins are join operations designed to be consistent(一致) with the ones in relational databases. Here, both changelog streams are materialized into local state stores to represent the latest snapshot of the their data table duals(双重). When a new record is received from one of the streams, it is joined with the other stream’s materialized state stores to produce one result for each matching pair based on user-provided ValueJoiner. A new KTable instance representing the result stream of the join, which is also a changelog stream of the represented table, is returned from this operator.
  • KStream-to-KTable Joins allow you to perform table lookups against a changelog stream (KTable) upon receiving a new record from another record stream (KStream). An example use case would be to enrich(使丰富) a stream of user activities (KStream) with the latest user profile information (KTable). Only records received from the record stream will trigger the join and produce results via ValueJoiner, not vice versa (i.e., records received from the changelog stream will be used only to update the materialized state store). A new KStream instance representing the result stream of the join is returned from this operator.
  1. KStream-to-KStream Join操作总是针对窗口的Join,否则Join结果的大小会无限膨胀。从其中一个流接收到的新记录会和另外一个流的记录进行Join,后者的流会指定窗口间隔,最后会基于用户提供的ValueJoiner为每个匹配对产生一个结果。这里返回的结果是一个新的KStream实例,代表了Join操作的结果流。
  2. KTable-to-KTable Join操作被设计为和关系型数据库类似的操作。两个变更日志流(KTable)都会被物化成本地状态存储,表示数据表的最近快照。当从其中的一个流接收到一条新记录,它会和另外一个流的物化状态存储进行join,并根据用户提供的ValueJoiner产生一个匹配的结果。返回的结果是新的KTable实例,代表Join操作的结果流,它也是一个变更日志流。
  3. KStream-to-KTable Join操作允许你在记录流(KStream)上接收到一条新记录时,从一个变更日志流(KTable)上执行表(级别的记录)查询。比如对一个用户活动流(KStream)使用最近的用户个人信息(KTable)进行信息增强。只有从记录流接收的记录才会触发join操作,并通过ValueJoiner产生结果,反过来则不行(比如从变更日志流中接收的新记录只会被用来更新物化的状态存储,而不会和KStream记录流进行join)。返回的结果是一个新的KStream,代表了Join操作的结果流。

根据操作对象,不同操作类型支持不同的join语义:

Join operands (INNER) JOIN OUTER JOIN LEFT JOIN Result
KStream-to-KStream Supported Supported Supported KStream
KTable-to-KTable Supported Supported Supported KTable
KStream-to-KTable N/A N/A Supported KStream

Kafka Streams的Join语义类似于关系型数据库的操作符:

  • Inner join produces new joined records when the join operator finds some records with the same key in the other stream / materialized store.
  • Outer join works like inner join if some records are found in the windowed stream / materialized store. The difference is that outer join still produces a record even when no records are found. It uses null as the value of missing record.
  • Left join is like outer join except that for KStream-KStream join it is always driven by record arriving from the primary stream; while for KTable-KTable join it is driven by both streams to make the result consistent with the left join of databases while only permits missing records in the secondary stream. In a KStream-KTable left join, a KStream record will only join a KTable record if the KTable record arrived before the KStream record (and is in the KTable). Otherwise, the join result will be null
  1. Inner join:当join操作符在两个流或物化存储中都找到相同key的记录,产生新的join记录。
  2. Outer join:如果在窗口流或物化视图中找到记录,则和inner join类似。不同的是,outer join即使在没有找到记录也会输出一条记录,对于缺失的记录使用null作为value。
  3. Left join:和outer join类似,不过对于KStream-KSteram的join(KStream left join KStream),它总是在主要流(A left join B,则A是主要的流)的记录到达时驱动的;对于KTable-KTable的join(KTable left join KTable),它是由两个流一起驱动,并且结果和left join左边的流是一致的,只允许右边流的记录缺失;对于KStream-KTable的left join(KStream left join KTable),一条KStream的记录只会和一条KTable的记录join,并且这条KTable的记录必须要在KStream的记录之前到达(当然必须在KTable中),否则join结果为null。

Since stream joins are performed over the keys of records, it is required that joining streams are co-partitioned by key, i.e., their corresponding Kafka topics must have the same number of partitions and partitioned on the same key so that records with the same keys are delivered to the same processing thread. This is validated by Kafka Streams library at runtime (we talked about the threading model and data parallelism with more details in the Architecture section).

由于流的join是在记录的key上执行的,这就要求参与join的流能够按照key协同分区。比如它们(流)对应的Kafka主题必须有相等数量的分区,而且在相同的key上进行分区。这样相同key的记录会被发送到相同的处理线程。上面的限制会在Kafka Streams库中在运行时进行验证。

Join示例,使用Java8:

// Key is user, value is number of clicks by that user
KStream<String, Long> userClicksStream = ...;
// Key is user, value is the geo-region of that user
KTable<String, String> userRegionsTable = ...;

// KStream-KTable join
KStream<String, RegionWithClicks> userClicksWithRegion = userClicksStream
// Null values possible: In general, null values are possible for region (i.e. the value of
// the KTable we are joining against) so we must guard against that (here: by setting the
// fallback region "UNKNOWN").
//
// Also, we need to return a tuple of (region, clicks) for each user. But because Java does
// not support tuples out-of-the-box, we must use a custom class `RegionWithClicks` to
// achieve the same effect. This class has two fields -- the region (String) and the number of
// clicks (Long) for that region -- as well as a matching constructor, which we use here.
.leftJoin(userRegionsTable,
(clicks, region) -> new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks));

Join示例,使用Java7:

// Key is user, value is number of clicks by that user
KStream<String, Long> userClicksStream = ...;
// Key is user, value is the geo-region of that user
KTable<String, String> userRegionsTable = ...;

// KStream-KTable join
KStream<String, RegionWithClicks> userClicksWithRegion = userClicksStream
// Null values possible: In general, null values are possible for region (i.e. the value of
// the KTable we are joining against) so we must guard against that (here: by setting the
// fallback region "UNKNOWN").
//
// Also, we need to return a tuple of (region, clicks) for each user. But because Java does
// not support tuples out-of-the-box, we must use a custom class `RegionWithClicks` to
// achieve the same effect. This class has two fields -- the region (String) and the number of
// clicks (Long) for that region -- as well as a matching constructor, which we use here.
.leftJoin(userRegionsTable, new ValueJoiner<Long, String, RegionWithClicks>() {
@Override
public RegionWithClicks apply(Long clicks, String region) {
return new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks);
}
});
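
上面的示例演示的是KStream-KTable的left join,下面再给出一个KTable-KTable外连接(outer join)的简单示意(两个KTable的内容只是假设):

// Key是用户,value分别是用户所在的地区和用户的邮箱
KTable<String, String> userRegions = ...;
KTable<String, String> userEmails = ...;

// KTable-KTable outer join:任意一边缺失时,对应的输入值为null
KTable<String, String> regionWithEmail = userRegions.outerJoin(
    userEmails,
    (region, email) -> (region == null ? "UNKNOWN" : region) + "/" + email);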

APPLYING A CUSTOM PROCESSOR

Beyond the provided transformation operators, users can also specify any customized processing logic on their stream data via the KStream#process() method, which takes an implementation of the ProcessorSupplier interface as its parameter. This is essentially equivalent to the addProcessor() method in the Processor API.

The following example shows how to leverage, via the process() method, a custom processor that sends an email notification whenever a page view count reaches a predefined threshold.

除了Kafka Streams提供的转换操作(DSL),用户还可以通过KStream#process()方法在流数据上指定任意定制的处理逻辑:它接收一个ProcessorSupplier接口的实现作为参数,这和Processor API中的addProcessor()方法本质上是等价的。下面的示例展示了如何通过process()方法使用自定义处理器,这个处理器会在页面浏览计数到达预定义的阈值时发送一封邮件通知。

// Send an email notification when the view count of a page reaches one thousand.
// JAVA8
pageViews.countByKey("PageViewCounts")
    .filter((PageId pageId, Long viewCount) -> viewCount == 1000)
    // convert the resulting KTable into a KStream so that we can call `process()` on it
    .toStream()
    // PopularPageEmailAlert is your custom processor that implements the
    // `Processor` interface, see further down below.
    .process(() -> new PopularPageEmailAlert("alerts@yourcompany.com"));

// JAVA7
pageViews.countByKey("PageViewCounts")
    .filter(
        new Predicate<PageId, Long>() {
            public boolean test(PageId pageId, Long viewCount) {
                return viewCount == 1000;
            }
        })
    // convert the resulting KTable into a KStream so that we can call `process()` on it
    .toStream()
    .process(
        new ProcessorSupplier<PageId, Long>() {
            public Processor<PageId, Long> get() {
                // PopularPageEmailAlert is your custom processor that implements
                // the `Processor` interface, see further down below.
                return new PopularPageEmailAlert("alerts@yourcompany.com");
            }
        });

上面的示例中PopularPageEmailAlert是一个自定义实现了Processor接口的流处理算子:

// A processor that sends an alert message about a popular page to a configurable email address
public class PopularPageEmailAlert implements Processor<PageId, Long> {

    private final String emailAddress;
    private ProcessorContext context;

    public PopularPageEmailAlert(String emailAddress) {
        this.emailAddress = emailAddress;
    }

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        // Here you would perform any additional initializations
        // such as setting up an email client.
    }

    @Override
    public void process(PageId pageId, Long count) {
        // Here you would format and send the alert email.
        //
        // In this specific example, you would be able to include information
        // about the page's ID and its view count (because the class implements
        // `Processor<PageId, Long>`).
    }

    @Override
    public void punctuate(long timestamp) {
        // Stays empty. In this use case there would be no need for a periodical
        // action of this processor.
    }

    @Override
    public void close() {
        // Any code for clean up would go here.
        // This processor instance will not be used again after this call.
    }
}

就像前面提到的,一个流处理算子可以通过调用ProcessorContext#getStateStore()方法访问任何可用的状态存储。可用指的是:这些状态存储的名称在调用KStream#process()方法时指定(注意和Processor#process()方法不同,KStream#process()是在构造拓扑时定义)。
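
下面是一个简化的示意(这里假设countStore就是前面Processor API一节中定义的名为"Counts"的StateStoreSupplier),展示如何先注册状态存储,再在调用KStream#process()时传入状态存储的名称:

// 先在builder(KStreamBuilder继承自TopologyBuilder)中注册状态存储
builder.addStateStore(countStore);

// 调用process()时把状态存储的名称一并传入,
// 这样自定义处理器就可以在它的init()方法中通过context.getStateStore("Counts")获取这个存储
pageViews.process(() -> new PopularPageEmailAlert("alerts@yourcompany.com"), "Counts");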

WRITING STREAMS BACK TO KAFKA

Any streams may be (continuously) written back to a Kafka topic via KStream#to() and KTable#to().

任何流都可以通过KStream#to()和KTable#to()方法(持续地)写回到Kafka主题中。

// Write the stream userCountByRegion to the output topic 'RegionCountsTopic'
userCountByRegion.to("RegionCountsTopic");

Best practice: It is strongly recommended to manually create output topics ahead of time rather than relying on auto-creation of topics. First, auto-creation of topics may be disabled in your Kafka cluster. Second, auto-creation will always apply the default topic settings such as the replicaton factor, and these default settings might not be what you want for certain output topics (cf. auto.create.topics.enable=true in the Kafka broker configuration).

最佳实践:推荐手动创建输出主题而不是依赖于自动创建。首先自动创建主题可能会被你的Kafka集群禁用掉;其二,自动创建会运用一些默认的设置,比如副本因子,而这些默认的设置可能在某些输出主题上不是你想要的。

If your application needs to continue reading and processing the records after they have been written to a topic via to() above, one option is to construct a new stream that reads from the output topic:

如果你的应用程序在通过上面的to()方法把记录写入某个主题之后,还需要继续读取并处理这些记录,一种方式是从这个输出主题构造一个新的流:

// Write to a Kafka topic.
userCountByRegion.to("RegionCountsTopic");

// Read from the same Kafka topic by constructing a new stream from the
// topic RegionCountsTopic, and then begin processing it (here: via `map`)
builder.stream("RegionCountsTopic").map(...)...;

Kafka Streams provides a convenience method called through() that is equivalent to the code above:

Kafka Streams还提供了一个更方便的through()方法,它和上面的代码是等价的:

// `through` combines write-to-Kafka-topic and read-from-same-Kafka-topic operations
userCountByRegion.through("RegionCountsTopic").map(...)...;

Whenever data is read from or written to a Kafka topic, Streams must know the serdes to be used for the respective data records. By default the to() and through() methods use the default serdes defined in the Streams configuration. You can override these default serdes by passing explicit serdes to the to() and through() methods.

当数据从一个Kafka主题读取或者写入时,流必须知道记录使用的序列化方式,默认to()和through()方法使用了Streams配置中默认的序列化方式。你可以通过传递明确的序列化器给to()和through()方法来覆写这些默认的配置。
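
下面是一个覆写默认serde的简单示意(假设userCountByRegion的key是String、value是Long):

import org.apache.kafka.common.serialization.Serdes;

// 显式指定key和value的serde,而不是使用StreamsConfig中的默认配置
userCountByRegion.to(Serdes.String(), Serdes.Long(), "RegionCountsTopic");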

Besides writing the data back to Kafka, users can also apply a custom processor as mentioned above to write to any other external stores, for example, to materialize a data store, as stream sinks at the end of the processing.

除了将数据写回到Kafka,用户也可以像上面提到的那样使用自定义的处理器,把数据写入其他的外部存储,比如把数据物化到一个外部数据存储中,作为处理流程末端的sink。

运行流处理程序

A Java application that uses the Kafka Streams library can be run just like any other Java application – there is no special magic or requirement on the side of Kafka Streams.

一个使用Kafka Streams客户端库的Java应用程序,它的运行方式和其他普通的Java应用程序一样,没有特殊的魔法,也没有额外的限制。比如你可以将你的Java应用程序打成一个fat jar包,然后启动:

# Start the application in class `com.example.MyStreamsApp`
# from the fat jar named `path-to-app-fatjar.jar`.
$ java -cp path-to-app-fatjar.jar com.example.MyStreamsApp

It is important to understand that, when starting your application as described above, you are actually launching what Kafka Streams considers to be one instance of your application. More than one instance of your application may be running at a time, and in fact the common scenario is that there are indeed multiple instances of your application running in parallel. See Parallelism Model for further information.

当按照上面的方式启动应用程序时,在Kafka Streams看来,实际上是启动了应用程序的一个实例。同一时间可能有不止一个应用程序实例在运行,实际上常见的场景正是多个应用程序实例并行运行。

水平扩展你的应用程序

Kafka Streams makes your stream processing applications elastic and scalable: you can add and remove processing capacity dynamically during the runtime of your application, and you can do so without any downtime or data loss. This means that, unlike other stream processing technologies, with Kafka Streams you do not have to completely stop your application, recompile/reconfigure, and then restart. This is great not just for intentionally(故意地) adding or removing processing capacity, but also for being resilient(有弹性的) in the face of failures (e.g. machine crashes, network outages) and for allowing maintenance work (e.g. rolling upgrades).

Kafka Streams会让你的流处理应用程序具有弹性和可扩展性:你可以在应用程序运行期间动态地添加或删除处理能力,并且不需要停机,也不会丢失数据。这意味着,和其他流处理技术不同,使用Kafka Streams你不需要完全停止你的应用程序、重新编译/重新配置、然后重启。这不仅对主动地添加或删除处理能力有好处,对应对故障(比如机器宕机、网络中断)时的弹性,以及进行维护工作(比如滚动升级)也同样有好处。

If you are wondering how this elasticity is actually achieved behind the scenes, you may want to read the Architecture chapter, notably the Parallelism Model section. In a nutshell(概括), Kafka Streams leverages existing functionality in Kafka, notably its group management functionality. This group management, which is built right into the Kafka wire protocol, is the foundation that enables the elasticity of Kafka Streams applications: members of a group will coordinate and collaborate(合作) jointly(共同地) on the consumption and processing of data in Kafka. On top of this foundation Kafka Streams provides some additional functionality, e.g. to enable stateful processing and to allow for fault-tolerante state in environment where application instances may come and go at any time.

如果你对如何实现扩展能力的背后机制感兴趣,可以阅读架构章节,尤其是并行模型那一部分。简单来说,Kafka Streams利用了Kafka已有的特性,尤其是组管理协议的功能。组管理协议构建在Kafka的协议之上,是Kafka Streams应用程序具有可伸缩性的基础:组的成员会协调合作,共同消费和处理Kafka中的数据。基于这些基础,Kafka Streams还提供了额外的功能,比如有状态的处理,以及在应用程序实例随时添加和删除的环境中,允许故障容错的状态存储。

增加处理能力(Expand)

If you need more processing capacity for your stream processing application, you can simply start another instance of your stream processing application, e.g. on another machine, in order to scale out. The instances of your application will become aware of each other and automatically begin to share the processing work. More specifically, what will be handed over from the existing instances to the new instances is (some of) the stream tasks that have been run by the existing instances. Moving stream tasks from one instance to another results in moving the processing work plus any internal state of these stream tasks (the state of a stream task will be re-created in the target instance by restoring the state from its corresponding changelog topic).

如果你需要为流处理应用程序添加更多的处理能力，你只需要在另一台机器上简单地启动新的流处理应用程序实例，来达到横向扩展的目的。应用程序的所有实例都会彼此感知，并且自动地开始共享处理工作。更具体地说，从已有实例移交给新实例的是已有实例上运行的(部分)流任务。将流任务从一个实例移动到另一个实例，意味着把这些流任务的处理工作以及内部状态一起移动过去(一个流任务的状态会通过从对应的变更日志主题恢复数据，在目标实例中重新创建)。

The various instances of your application each run in their own JVM process, which means that each instance can leverage all the processing capacity that is available to their respective JVM process (minus the capacity that any non-Kafka-Streams part of your application may be using). This explains why running additional instances will grant your application additional processing capacity. The exact capacity you will be adding by running a new instance depends of course on the environment in which the new instance runs: available CPU cores, available main memory and Java heap space, local storage, network bandwidth, and so on. Similarly, if you stop any of the running instances of your application, then you are removing and freeing up the respective processing capacity.

应用程序的不同实例运行在它们各自的JVM进程中，这意味着每个实例可以利用对应JVM进程可用的所有处理能力(减去应用程序中非Kafka Streams部分占用的资源)。这就解释了为什么运行额外的实例可以提升应用程序的处理能力。当然，运行一个新实例实际能增加多少处理能力，取决于新实例所运行的环境：可用的CPU核数、可用的主内存和Java堆空间大小、本地存储、网络带宽等等。同样，如果你停止了任意一个正在运行的实例，你也就移除并释放了相应的处理能力。

[图: kstream expand — 应用程序扩容前后的对比示意图]

Before adding capacity: only a single instance of your Kafka Streams application is running. At this point the corresponding Kafka consumer group of your application contains only a single member (this instance). All data is being read and processed by this single instance.

After adding capacity: now two additional instances of your Kafka Streams application are running, and they have automatically joined the application’s Kafka consumer group for a total of three current members. These three instances are automatically splitting the processing work between each other. The splitting is based on the Kafka topic partitions from which data is being read.

上图左边是添加处理能力之前：只有一个Kafka Streams应用程序实例在运行，这时应用程序对应的Kafka消费组只有一个成员(就是这个实例)，所有的数据都由这个唯一的实例读取并处理。右图是添加处理能力之后：现在增加了两个额外的应用程序实例，它们自动加入到应用程序对应的Kafka消费组中，这个消费组目前总共有3个成员。这三个实例会自动地在彼此之间分摊处理工作，分摊是基于所读取数据的Kafka主题分区进行的。

减少处理能力(Shrink)

If you need less processing capacity for your stream processing application, you can simply stop one or more running instances of your stream processing application, e.g. shut down 2 of 4 running instances. The remaining instances of your application will become aware that other instances were stopped and automatically take over the processing work of the stopped instances. More specifically, what will be handed over from the stopped instances to the remaining instances is the stream tasks that were run by the stopped instances. Moving stream tasks from one instance to another results in moving the processing work plus any internal state of these stream tasks (the state of a stream task will be re-created in the target instance by restoring the state from its corresponding changelog topic).

如果你的流处理应用程序需要较少的处理能力，你只需要停止一个或多个正在运行的应用程序实例即可，比如把4个运行中的实例关掉2个。剩余的应用程序实例会感知到其他实例已经停止，并自动接管这些停掉实例的处理工作。更具体地说，从停止的实例移交给剩余实例的是在停止实例上运行的流任务。将流任务从一个实例移动到另一个实例，意味着把这些流任务的处理工作以及内部状态一起移动过去(流任务的状态会通过从对应的变更日志主题恢复数据，在目标实例中重新创建)。

[图: kstream shrink — 应用程序缩容示意图]

If one of the application instances is stopped (e.g. intentional reduction of capacity, maintenance, machine failure), it will automatically leave the application’s consumer group, which causes the remaining instances to automatically take over the stopped instance’s processing work.

图中如果停止了一个应用程序实例(有意地减少容量、维护、机器故障都可能导致实例停止)，它就会自动离开应用程序的消费组，并使得剩余的实例自动接管这个停止实例的处理工作。

运行多少个应用程序实例

How many instances can or should you run for your application? Is there an upper limit for the number of instances and, similarly, for the parallelism of your application? In a nutshell, the parallelism of a Kafka Streams application – similar to the parallelism of Kafka – is primarily determined by the number of partitions of the input topic(s) from which your application is reading. For example, if your application reads from a single topic that has 10 partitions, then you can run up to 10 instances of your applications (note that you can run further instances but these will be idle).

The number of topic partitions is the upper limit for the parallelism of your Kafka Streams application and thus for the number of running instances of your application.

那么到底可以或者应该运行多少个应用程序实例？实例数量以及应用程序的并行度是否存在上限？简单来说，一个Kafka Streams应用程序的并行度(类似于Kafka本身的并行度)主要由应用程序所读取的输入主题的分区数量决定。比如你的应用程序读取的主题有10个分区，那么你最多可以运行10个应用程序实例(你也可以运行更多的实例，但多出来的实例会是空闲的)。所以，主题分区的数量是你的Kafka Streams应用程序并行度的上限，因此也是应用程序运行实例数量的上限。

How to achieve(获得) a balanced processing workload across application instances to prevent processing hotpots: The balance of the processing work between application instances depends on factors such as how well data messages are balanced between partitions (think: if you have 2 topic partitions, having 1 million messages in each partition is better than having 2 million messages in the first partition and no messages in the second) and how much processing capacity is required to process the messages (think: if the time to process messages varies heavily, then it is better to spread the processing-intensive messages across partitions rather than storing these messages within the same partition).

那么怎么在应用程序实例之间保证处理负载是平衡的，防止出现处理热点呢？工作负载是否平衡取决于这些因素：数据消息在分区之间的平衡程度(比如你有两个主题分区，每个分区各有一百万条消息，要比第一个分区有两百万条消息而第二个分区没有消息好得多)，以及处理这些消息所需要的处理能力(比如如果处理不同消息的耗时差别很大，那么把处理密集型的消息分散到多个分区，要比把这些消息都存储在同一个分区好得多)。

If your data happens to be heavily skewed(倾斜) in the way described above, some application instances may become processing hotspots (say, when most messages would end up being stored in only 1 of 10 partitions, then the application instance that is processing this one partition would be performing most of the work while other instances might be idle). You can minimize the likelihood of such hotspots by ensuring better data balancing across partitions (i.e. minimizing data skew at the point in time when data is being written to the input topics in Kafka) and by over-partitioning the input topics (think: use 50 or 100 partitions instead of just 10), which lowers the probability that a small subset of partitions will end up storing most of the topic’s data.

如果你的数据恰巧像上面描述的那样倾斜得很严重，某些应用程序实例就会变成处理热点(比如当大部分消息都只存储在10个分区中的某一个分区时，处理这个分区的应用程序实例就会承担大部分的工作，而其他实例则可能很空闲)。你可以通过两种方式来降低出现这种热点的可能性：一是确保数据在分区之间有更好的平衡(即在数据写入Kafka输入主题时就尽量减小数据倾斜)；二是对输入主题进行过度分区(比如使用50或100个分区，而不是只有10个)，这样可以降低一小部分分区存储了主题大部分数据的概率。

数据类型和序列化

应用重置工具

The Application Reset Tool allows you to quickly reset an application in order to reprocess its data from scratch – think: an application “reset” button. Scenarios when would you like to reset an application include:

应用程序重置工具允许你快速地重置一个应用程序,然后重新处理数据,可以认为是应用程序的一个“重置”按钮。需要重置应用程序的场景包括:

  • Development and testing 开发和测试时
  • Addressing bugs in production 在生产环境定位问题时
  • Demos 演示

However, resetting an application manually is a bit involved. Kafka Streams is designed to hide many details about operator state, fault-tolerance, and internal topic management from the user (especially when using Kafka Streams DSL). In general this hiding of details is very desirable(值得要的,令人满意的) but it also makes manually resetting an application more difficult. The Application Reset Tool fills this gap and allows you to quickly reset an application.

不过，手动重置一个应用程序比较繁琐。Kafka Streams在设计上向用户隐藏了很多关于算子状态、故障容错和内部主题管理的细节(尤其是在使用Kafka Streams DSL时)。通常这种隐藏是我们希望的，但它也让手动重置应用程序变得更加困难。应用程序重置工具填补了这个空白，让你可以快速地重置一个应用程序。

用户主题和内部主题

在Kafka Streams中,我们会区分用户主题和内部主题,这两种都是普通的Kafka主题,不过对于内部主题,有一些特定的命名约定。

用户主题包括输入主题、输出主题、临时主题。这些主题是用户创建或管理的,包括应用程序的输入和输出主题,以及通过through()方法指定的临时主题,临时主题实际上同时既是输出也是输入主题。

内部主题是由Kafka Streams底层自动创建的。比如针对状态存储的变更日志主题就是一个内部主题。内部主题的命名约定是：<application.id>-<操作符名称>-<后缀>(比如-changelog、-repartition)。

应用程序重置工具做的工作有:

  1. 对输入主题:重置应用程序所有分区的消费者提交偏移量到0
  2. 对临时主题:跳到主题的最后,比如设置应用程序的消费者提交偏移量到每个分区的logSize(实际上就是分区的最后位置)
  3. 对内部主题:除了重置偏移量到0,还要删除内部主题

应用程序重置工具不做的:

  1. 不会重置应用程序的输出主题。如果任何的输出主题或者临时主题被下游的应用程序消费,那么调整这些下游应用程序是你自己的责任
  2. 不会重置应用程序实例的本地环境。同样，删除应用程序实例所在机器上的本地状态，也是你自己的责任。

步骤1:运行重置工具

执行bin/kafka-streams-application-reset命令,需要指定如下的参数:

Option (* = required)               Description
---------------------               -----------
* --application-id <id>             The Kafka Streams application ID (application.id)
--bootstrap-servers <urls>          Comma-separated list of broker urls with format: HOST1:PORT1,HOST2:PORT2
                                    (default: localhost:9092)
--intermediate-topics <list>        Comma-separated list of intermediate user topics
--input-topics <list>               Comma-separated list of user input topics
--zookeeper <url>                   Format: HOST:PORT
                                    (default: localhost:2181)

You can combine the parameters of the script as needed. For example, if an application should only restart from an empty internal state but not reprocess previous data, simply omit the parameters --input-topics and --intermediate-topics.

你可以任意组合上面的参数,比如如果应用程序只会从空的内部状态重启,不会重新处理已有的数据,可以忽略输入主题和临时主题这两个参数。

On intermediate topics: In general, we recommend to manually delete and re-create any intermediate topics before running the application reset tool. This allows to free disk space in Kafka brokers early on. It is important to first delete and re-create intermediate topics before running the application reset tool.

Not deleting intermediate topics and only using the application reset tool is preferable:

  • when there are external downstream consumers for the application’s intermediate topics
  • during development, where manually deleting and re-creating intermediate topics might be cumbersome and often unnecessary

关于临时主题：通常我们推荐在运行应用程序重置工具之前，先手动删除并重建临时主题，这样可以尽早释放Kafka broker的磁盘空间。重要的是，删除并重建临时主题要在运行重置工具之前完成。在以下情况下，可以不删除临时主题而只使用应用程序重置工具：

  1. 存在外部的下游消费者订阅了应用程序的临时主题
  2. 在开发环境,手动删除并重建主题可能很繁琐,而且通常没有必要这么做

步骤2:重置本地环境

Running the application reset tool (step 1) ensures that your application’s state – as tracked globally in the application’s configured Kafka cluster – is reset. However, by design the reset tool does not modify or reset the local environment of your application instances, which includes the application’s local state directory.

For a complete application reset you must also delete the application’s local state directory on any machines on which an application instance was run prior to restarting an application instance on the same machines. You can either use the API method KafkaStreams#cleanUp() in your application code or manually delete the corresponding local state directory (default location: /var/lib/kafka-streams/<application.id>, cf. state.dir configuration parameter).

运行步骤1的应用程序重置工具，可以确保你的应用程序状态(应用程序的状态是在应用程序所配置的Kafka集群中被全局跟踪的)被重置。不过，按照设计，重置工具并不会修改或重置应用程序实例的本地环境，这包括应用程序的本地状态目录。

如果要彻底重置应用程序,你必须删除应用程序的本地状态目录,而且任何之前运行过的应用程序实例所在的机器都需要在重启应用程序实例之前删除干净。你可以在应用程序中调用KafkaStreams#cleanUp()方法,或者手动删除对应的本地状态目录(默认的路径是: /var/lib/kafka-streams/<application.id>,即state.dir的配置参数)来清理。
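
如果想在代码里(而不是手动执行rm -rf)完成本地状态目录的删除，下面是一个简要的草图，仅基于“删除<state.dir>/<application.id>目录”这一事实；路径和应用程序编号都是示例值，实际使用时通常直接调用KafkaStreams#cleanUp()会更简单：

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.stream.Stream;

public class LocalStateCleaner {

    // 递归删除 <state.dir>/<application.id> 目录，效果等价于 rm -rf
    public static void deleteLocalState(String stateDir, String applicationId) throws IOException {
        Path appStateDir = Paths.get(stateDir, applicationId);
        if (!Files.exists(appStateDir)) {
            return; // 目录不存在，说明本地没有需要清理的状态
        }
        try (Stream<Path> paths = Files.walk(appStateDir)) {
            paths.sorted(Comparator.reverseOrder()) // 先删除子文件，再删除父目录
                 .forEach(p -> p.toFile().delete());
        }
    }

    public static void main(String[] args) throws IOException {
        // 示例路径，请根据实际的state.dir配置调整
        deleteLocalState("/var/lib/kafka-streams", "my-streams-app");
    }
}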

示例

Let’s imagine you are developing and testing an application locally and want to iteratively improve your application via run-reset-modify cycles. You might have code such as the following:

假设你在本地开发和测试一个应用程序,并且通过运行-重置-修改的循环开发模式来不断迭代提升你的应用程序,你可能需要这样的代码:

public class ResetDemo {
  public static void main(String[] args) throws Exception {
    // Kafka Streams configuration
    Properties streamsConfiguration = new Properties();
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
    // ...and so on...

    // Define the processing topology
    KStreamBuilder builder = new KStreamBuilder();
    builder.stream("my-input-topic")
        .selectKey(...)
        .through("rekeyed-topic")
        .countByKey("global-count")
        .to("my-output-topic");

    KafkaStreams app = new KafkaStreams(builder, streamsConfiguration);

    // Delete the application's local state.
    // Note: In real application you'd call `cleanUp()` only under
    // certain conditions. See tip on `cleanUp()` below.
    app.cleanUp();

    app.start();

    // Note: In real applications you would register a shutdown hook
    // that would trigger the call to `app.close()` rather than
    // using the sleep-then-close example we show here.
    Thread.sleep(30 * 1000L);
    app.close();
  }
}

Calling cleanUp() is safe but do so judiciously(明智的): It is always safe to call KafkaStreams#cleanUp() because the local state of an application instance can be recovered from the underlying internal changelog topic(s). However, to avoid the corresponding recovery overhead it is recommended to not call cleanUp() unconditionally and every time an application instance is restarted/resumed. A production application would therefore use e.g. command line arguments to enable/disable the cleanUp() call as needed.

调用cleanUp()方法是安全的，不过要谨慎调用：调用KafkaStreams#cleanUp()总是安全的，因为应用程序实例的本地状态可以从底层的内部变更日志主题恢复。不过，为了避免相应的恢复开销，推荐不要无条件地在每次应用程序实例重启/恢复时都调用cleanUp()。因此，生产环境下的应用程序通常会使用命令行参数之类的方式，按需开启或禁用cleanUp()的调用。
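
下面是一个简要的草图，演示“通过命令行参数按需开启cleanUp()”这个思路；其中的"--reset"参数名是假设的，拓扑也被简化为直接转发，仅用于说明调用位置：

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class ConditionalResetDemo {
    public static void main(String[] args) throws Exception {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // ...其他配置...

        KStreamBuilder builder = new KStreamBuilder();
        builder.stream("my-input-topic").to("my-output-topic");

        KafkaStreams streams = new KafkaStreams(builder, config);

        // 只有在显式传入假设的"--reset"参数时才清理本地状态，
        // 避免每次重启都触发从变更日志主题恢复状态的开销
        if (Arrays.asList(args).contains("--reset")) {
            streams.cleanUp();
        }
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}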

然后你就可以执行如下的“run-reset-modify”循环:

# Run your application
$ bin/kafka-run-class io.confluent.examples.streams.ResetDemo

# After stopping all application instances, reset the application
$ bin/kafka-streams-application-reset --application-id my-streams-app \
--input-topics my-input-topic \
--intermediate-topics rekeyed-topic

# Now you can modify/recompile as needed and then re-run the application again.
# You can also experiment, for example, with different input data without
# modifying the application.

EOF. 翻译完毕 @2016.11.5

重置流处理应用程序

Confluent关于重置的实现翻译:https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/

示例

As a running example we assume you have the following Kafka Streams application, and we subsequently demonstrate (1) how to make this application “reset ready” and (2) how to perform an actual application reset.

以下面的Kafka Streams应用程序作为运行示例，我们接下来会展示两个步骤：1)怎么让应用程序做好重置的准备，2)怎么执行一次真正的应用程序重置。

public class ResetDemo {
  public static void main(final String[] args) throws Exception {
    // Kafka Streams configuration
    final Properties streamsConfiguration = new Properties();
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
    // make sure to consume the complete topic via "auto.offset.reset = earliest"
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // ...and so on...

    // define the processing topology
    final KStreamBuilder builder = new KStreamBuilder();
    builder.stream("my-input-topic")
        .selectKey(...)
        .through("rekeyed-topic")
        .countByKey("global-count")
        .to("my-output-topic");

    // ...run application...
  }
}

This application reads data from the input topic “my-input-topic”, then selects a new record key, and then writes the result into the intermediate topic “rekeyed-topic” for the purpose of data re-partitioning. Subsequently, the re-partitioned data is aggregated by a count operator, and the final result is written to the output topic “my-output-topic”. Note that in this blog post we don’t put the focus on what this topology is actually doing — the point is to have a running example of a typical topology that has input topics, intermediate topics, and output topics.

这个应用程序会从输入主题“my-input-topic”读取数据，然后选择新的记录key，并把结果写到临时主题“rekeyed-topic”，目的是对数据重新分区。随后，重新分区后的数据会被count算子聚合，最终结果写到输出主题“my-output-topic”。在这篇博客中我们不会把重点放在这个拓扑实际做了什么，重点是提供一个典型拓扑的运行示例：它包含输入主题、临时主题和输出主题。

Step 1: Prepare your application for resets

The first step is to make your application “reset ready”. For this, the only thing you need to do is to include a call to KafkaStreams#cleanUp() in your application code (for details about cleanUp() see Section ”Application Reset Tool Details”).

Calling cleanUp() is required because resetting a Streams applications consists of two parts: global reset and local reset. The global reset is covered by the new application reset tool (see “Step 2”), and the local reset is performed through the Kafka Streams API. Because it is a local reset, it must be performed locally for each instance of your application. Thus, embedding it in your application code is the most convenient way for a developer to perform a local reset of an application (instance).

步骤1是让你的应用程序做好重置的准备，你需要做的只是在应用程序代码中加入对KafkaStreams#cleanUp()方法的调用。调用cleanUp()是必要的，因为重置一个流应用程序包括两个部分：全局重置和本地重置。全局重置由新的应用程序重置工具完成(见“步骤2”)，本地重置则通过Kafka Streams的API完成。因为它是一个本地重置，应用程序的每个实例都必须在本地执行它。因此，把它嵌入到应用程序代码中，对于开发者而言是对应用程序(实例)执行本地重置最方便的做法。

public class ResetDemo {
  public static void main(final String[] args) throws Exception {
    // ...prepare your application configuration and processing topology...

    final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);

    // Delete the application's local state.
    // Note: In real application you'd call `cleanUp()` only under certain conditions.
    // See Confluent Docs for more details:
    // http://docs.confluent.io/3.0.1/streams/developer-guide.html#step-2-reset-the-local-environments-of-your-application-instances
    streams.cleanUp();

    streams.start();

    // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
      @Override
      public void run() {
        streams.close();
      }
    }));
  }
}

At this point the application is ready for being reset (when needed), and you’d start one or multiple instances of your application as usual, possibly on different hosts.

现在应用程序已经(在需要时)准备好被重置了，你可以像平常一样启动一个或多个应用程序实例，它们可能运行在不同的主机上。

# Run your application
# For the sake of this short blog post we use the Kafka helper script
# `kafka-run-class` here, but your mileage may vary.
# You could also, for example, call `java …` directly. Remember that
# an application that uses the Kafka Streams library is a standard Java application.
$ bin/kafka-run-class io.confluent.examples.streams.ResetDemo

Step 2: Reset the application

So what would we need to do to restart this application from scratch, i.e., not resume the processing from the point the application was stopped before, but rather to reprocess all its input data again?

First you must stop all running application instances and make sure the whole consumer group is not active anymore (you can use bin/kafka-consumer-groups to list active consumer groups). Typically, the consumer group should become inactive one minute after you stopped all the application instances. This is important because the reset behavior is undefined if you use the reset tool while some application instances are still running — the running instances might produce wrong results or even crash.

那么我们需要做什么，才能从头开始重启这个应用程序呢？也就是说，不是从应用程序上次停止的位置继续处理，而是重新处理它所有的输入数据。

首先，你必须停止所有正在运行的应用程序实例，并确保整个消费组不再活动(可以使用bin/kafka-consumer-groups来列出活动的消费组)。通常，在你停止所有应用程序实例大约一分钟后，消费组才会变为不活动状态。这一点非常重要，因为如果在还有应用程序实例运行时就使用重置工具，重置行为是未定义的：正在运行的实例可能会产生错误的结果，甚至崩溃(实际上就是说要先停止所有的应用程序实例，然后确认消费组不再活动，最后才可以使用重置工具)。

Once all application instances are stopped you can call the application reset tool as follows:

当所有的应用程序实例都停止后,你可以像下面那样使用应用程序重置工具:

# After stopping all application instances, reset the application
$ bin/kafka-streams-application-reset --application-id my-streams-app \
--input-topics my-input-topic \
--intermediate-topics rekeyed-topic \
--bootstrap-servers brokerHost:9092 \
--zookeeper zookeeperHost:2181

As you can see, you only need to provide the application ID (“my-streams-app”) as specified in your application configuration, and the names of all input and intermediate topics. Furthermore, you might need to specify Kafka connection information (bootstrap servers) and Zookeeper connection information. Both parameters have default values localhost:9092 and localhost:2181, respectively. Those are convenient to use during development with a local single Zookeeper/single broker setup. For production/remote usage you need to provide appropriate host:port values.

可以看到，你只需要提供应用程序编号(“my-streams-app”，即应用程序配置中指定的application.id)，以及所有输入主题和临时主题的名称。此外，你可能还需要指定Kafka(bootstrap servers)以及ZooKeeper的连接信息，这两个参数的默认值分别是localhost:9092和localhost:2181，便于在本地单节点的开发环境中使用；生产环境或远程使用时，你需要提供正确的host:port。

Once the application reset tool has completed its run (and its internal consumer is not active anymore), you can restart your application as usual, and it will now reprocess its input data from scratch again. We’re done!

一旦应用程序重置工具完成后,你可以像平常那样重启你的应用程序了,现在它就会从头开始重新处理输入数据了!

It’s important to highlight that, to prevent possible collateral(并行,附属) damage, the application reset tool does not reset the output topics of an application. If any output (or intermediate) topics are consumed by downstream applications, it is your responsibility to adjust those downstream applications as appropriate when you reset the upstream application.

很重要的一点是，为了防止可能的附带损害(collateral damage)，应用程序重置工具并不会重置应用程序的输出主题。如果有任何输出主题(或临时主题)被下游的应用程序消费，那么在重置上游应用程序时，适当地调整这些下游应用程序就是你自己的责任了。

Use application reset tool with care and double-check its parameters: If you provide wrong parameter values (e.g. typos in application.id) or specify parameters inconsistently (e.g. specifying the wrong input topics for the application), this tool might invalidate the application’s state or even impact other applications, consumer groups, or Kafka topics of your Kafka cluster.

使用应用程序重置工具要很小心,并且要仔细检查它的参数:如果你提供了一个错误的参数值(比如应用程序编号写错了),或者指定了不一致的参数(比如指定了这个应用程序错误的输入主题),该工具可能会使得应用程序的状态失效,甚至影响其他的应用程序,消费组,或者你的Kafka集群的主题。

As we have shown above, using the new application reset tool you can easily reprocess data from scratch with Kafka Streams. Perhaps somewhat surprisingly, there’s actually a lot going on behind the scenes to make application resets work as easily. The following sections are a deep dive on these internals for the curious reader.

如上所示，使用新的应用程序重置工具，你可以很容易地用Kafka Streams从头开始重新处理数据。也许有些出乎意料的是，为了让应用程序重置用起来这么简单，背后其实做了很多工作。接下来的几节会为好奇的读者深入剖析这些内部细节。

Behind the Scenes of Kafka Streams

In this second part of the blog post we discuss those Kafka Streams internals that are required to understand the details of a proper application reset. Figure 1 shows a Kafka Streams application before its first run. The topology has a single input topic with two partitions. The current offset of each partition is zero (or there are no committed offsets and the parameter auto.offset.reset = earliest is used). Also the topology writes into a single output topic with two partitions which are both empty. No offsets are shown as output topics are not consumed by the application itself. Furthermore, the topology contains a call to through(), thus, it writes/reads into/from an additional (intermediate) topic with two empty partitions and it contains a stateful operator.

下图展示了一个Kafka Streams应用程序第一次运行之前的状态。这个拓扑只有一个输入主题(两个分区)，当前每个分区的偏移量是0(或者说还没有提交偏移量，并且使用了参数auto.offset.reset = earliest，表示没有偏移量或者偏移量越界时，会重置到分区的最开始位置，即偏移量为0的位置)。同时拓扑还会写入一个输出主题，同样有两个分区，目前都还是空的；图中没有为输出主题显示偏移量，因为应用程序自身并不会消费输出主题。另外，这个拓扑包含了对through()的调用，因此它会写入/读取一个额外的(临时)主题，这个临时主题也有两个空的分区；此外拓扑中还包含一个有状态的算子。

After the Kafka Streams application was executed and stopped, the application state changed as shown in Figure 2. In the following we discuss the relevant parts of Figure 2 with regard to reprocessing.

Figure 2: The application after it was stopped. The small vertical arrows denote committed consumer offsets for the input and intermediate topics (colors denote the corresponding sub-topology). You can see, for example, that sub-topology A has so far written more data to the intermediate topic than sub-topology B has been able to consume (e.g. the last message written to partition 1 has offset 7, but B has only consumed messages up to offset 4). Also, sub-topology B performs stateful operations and thus has created a local state store and an accompanying(伴随) internal changelog topic for this state store.

当Kafka Streams应用程序执行了一段时间然后停止之后，应用程序的状态变化如下图(图2)。(分区上的)垂直小箭头表示输入主题和临时主题的消费者提交偏移量(颜色对应相应的子拓扑)。可以看到，比如子拓扑A写入到临时主题的数据要比子拓扑B已经消费的数据多(A向临时主题分区1写入的最后一条消息的偏移量是7，但是B只消费到了偏移量4的位置)。同时，子拓扑B执行了有状态的操作，所以创建了一个本地状态存储，以及一个与之对应的内部变更日志主题。

Reprocessing Input Topics

Kafka Streams builds upon existing Kafka functionality to provide scalability and elasticity, security, fault-tolerance, and more. For reprocessing input topics from scratch, one important concern is Kafka Streams’ fault-tolerance mechanism. If an application is stopped and restarted, per default it does not reread previously processed data again, but it resumes processing where it left off when it was stopped (cf. committed input topic offsets in Figure 2).
Internally, Kafka Streams leverages Kafka’s consumer client to read input topics and to commit offsets of processed messages in regular intervals (see commit.interval.ms). Thus, on restart an application does not reprocess the data from its previous run. In case that a topic was already processed completely, the application will start up but then be idle and wait until new data is available for processing. So one of the steps we need to take care of when manually resetting an application is to ensure that input topics are reread from scratch. However, this step alone is not sufficient to get a correct reprocessing result.

Kafka Streams构建在Kafka已有的功能之上来提供扩展性、伸缩性、安全性、故障容错等能力。对于从头开始重新处理输入主题，一个重要的关注点是Kafka Streams的故障容错机制。如果一个应用程序停止并重启，默认情况下它不会重新读取已经处理过的数据，而是从上一次停止时离开的位置恢复处理(参见图2中输入主题的提交偏移量)。在内部实现中，Kafka Streams利用Kafka的消费者客户端读取输入主题，并定期地为处理完的消息提交偏移量(参见commit.interval.ms参数)。因此应用程序重启时，不会重新处理上一次运行已经处理过的数据。如果一个主题已经被完全处理完，应用程序启动之后会处于空闲状态，直到有新数据可用时才会继续处理。所以当手动重置一个应用程序时，需要处理的步骤之一就是确保输入主题会从头开始重新读取。不过，单单这个步骤还不足以得到正确的重新处理结果。
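
作为补充，下面是一个简要的配置草图，展示如何通过commit.interval.ms(对应StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)调整Kafka Streams提交偏移量的频率；其中的取值只是示例，默认值请以所用版本的文档为准：

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class CommitIntervalConfigExample {
    public static Properties streamsConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Kafka Streams每隔这么长时间(毫秒)为已处理的消息提交一次偏移量
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
        return props;
    }
}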

Sub-Topologies and Internal State in Kafka Streams

First, an application can consist of multiple sub-topologies that are connected via intermediate or internal topics (cf. Figure 2 with two sub-topologies A and B). In Kafka Streams, intermediate topics are user-specified topics that are used as both an input and an output topic within a single application (e.g., a topic that is used in a call to through()). Internal topics are those topics that are created by Kafka Streams “under the hood” (e.g., internal repartitioning topics which are basically internal intermediate topics).

首先，一个应用程序可以由多个子拓扑组成，它们通过临时主题或内部主题相互连接(比如图2中的两个子拓扑A和B)。在Kafka Streams中，临时主题(intermediate topics)是用户指定的主题，在同一个应用程序中同时作为输入和输出主题(比如通过调用through()使用的主题)。内部主题(internal topics)则是由Kafka Streams在底层创建的主题(比如内部的重新分区主题，本质上是内部的中间主题)。

If there are multiple sub-topologies, it might happen that an upstream sub-topology produces records faster into intermediate topics than a downstream sub-topology can consume (cf. committed intermediate topic offsets in Figure 2). Such a consumption delta (cf. the notion of consumer lag in Kafka) within an application would cause problems in the context of an application reset because, after an application restart, the downstream sub-topology would resume reading from intermediate topics from the point where it stopped before the restart. While this behavior is very much desired during normal operations of your application, it would lead to data inconsistencies when resetting an application. For a proper application reset we must therefore tell the application to skip to the very end of any intermediate topics.

如果有多个子拓扑,有可能会发生上游的子拓扑生产记录到临时主题,要比下游子拓扑消费的速度快(比如图2中临时主题的提交偏移量)。应用程序的这种消费差距(即Kafka中消费者的落后进度,用lag表示)在一个应用程序的重置场景下可能会导致出现问题,因为当一个应用程序重启后,下游的子拓扑会在重启之前的上一次停止位置,从临时主题恢复读取数据。虽然这种行为在应用程序正常的操作时是你非常想要的结果,但是在重置一个应用程序时,则可能会导致数据的不一致性。对于正确的应用程序重置方式,我们必须告诉应用程序跳到任何一个临时主题的最后位置。

Second, for any stateful operation like aggregations or joins, the internal state of these operations is written to a local state store that is backed by an internal changelog topic (cf. sub-topology B in Figure 2). On application restart, Kafka Streams “detects” these changelog topics and any existing local state data, and it ensures that the internal state is fully built up and ready before the actual processing starts. To reset an application we must therefore also reset the application’s internal state, which means we must delete all its local state stores and their corresponding internal changelog topics.

其次，对于任何有状态的操作，比如聚合或join，这些操作的内部状态会被写入到一个本地状态存储中，而这个存储由一个内部的变更日志主题作为备份(比如图2中的子拓扑B)。在应用程序重启时，Kafka Streams会“检测”到这些变更日志主题以及已经存在的本地状态数据，并确保在实际的处理开始之前，内部状态已被完整地构建好。所以为了重置一个应用程序，我们也必须重置应用程序的内部状态，这意味着必须删除它所有的本地状态存储，以及对应的内部变更日志主题。

If you are interested in more details than we could cover in this blog post, please take a look at Kafka Streams: Internal Data Management in the Apache Kafka wiki.

Resetting a Kafka Streams Application Manually

In order to reprocess topics from scratch, it is required to reset the application state that consists of multiple parts as described above:
为了从头开始重新处理主题，需要重置由上面描述的多个部分组成的应用程序状态：

  1. committed offsets of input topics 输入主题的提交偏移量
  2. committed offsets of intermediate topics 临时主题的提交偏移量
  3. content and committed offsets of internal topics 内部主题的内容和提交偏移量
  4. local state store 本地状态存储

Figure 3: The application after reset: (1) input topic offsets were reset to zero (2) intermediate topic offsets were advanced to end (3) internal topics were deleted (4) any local state stores were deleted (5) the output topic was not modified.

图3中应用程序在重置后:1)输入主题的偏移量被重置为0;2)临时主题的偏移量被跳跃到最后;3)内部主题被删除;4)任何的本地状态存储被删除;5)输出主题不会被修改

Committed offsets of input topics: Internally, Kafka Streams leverages Kafka’s consumer client to read a topic and to commit offsets of processed messages in regular intervals (see commit.interval.ms). Thus, as a first step to reprocess data, the committed offsets need to be reset. This can be accomplished as follows: Write a special Kafka client application (e.g., leveraging Kafka’s Java consumer client or any other available language) that uses the application.id of your Kafka Streams application as its consumer group ID. The only thing this special client application does is to seek to offset zero for all partitions of all input topics and commit the offset (you should disable auto commit for this application). As this special application uses the same group ID as your Kafka Streams application (the application ID is used a consumer group ID internally), committing all offsets to zero allows your Streams application to consume its input topics from scratch when it is started again (cf. #1 in Figure 3).

输入主题的提交偏移量：在内部实现中，Kafka Streams利用了Kafka消费者客户端来读取一个主题，并且定期地提交已经处理过的消息的偏移量。因此作为重新处理数据的第一个步骤，需要重置提交偏移量。完成这一步可以这么做：编写一个特殊的Kafka客户端应用程序(比如使用Kafka的Java消费者客户端)，使用流处理应用程序的application.id作为它的消费组编号。这个特殊的客户端唯一要做的事情是把所有输入主题的所有分区定位到偏移量0，然后提交偏移量(这个客户端程序应该禁用自动提交)。由于这个特殊的应用程序使用了和Kafka Streams应用程序相同的消费组编号(Kafka Streams在内部会将application.id作为消费组编号)，把所有偏移量都提交为0，就可以让你的流应用程序在再次启动时从头开始消费它的输入主题(图3的步骤1)。
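
下面是上文所描述的“特殊客户端应用程序”的一个简要草图：使用Java消费者，以application.id作为消费组编号，把输入主题所有分区的提交偏移量重置为0。主题名、broker地址等都是示例值，并非某个官方工具的实现：

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ResetInputTopicOffsets {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // 消费组编号必须使用Streams应用程序的application.id
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-streams-app");
        // 禁用自动提交，完全由我们手动控制提交的偏移量
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = new ArrayList<>();
            Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
            for (String inputTopic : new String[]{"my-input-topic"}) {
                for (PartitionInfo p : consumer.partitionsFor(inputTopic)) {
                    TopicPartition tp = new TopicPartition(inputTopic, p.partition());
                    partitions.add(tp);
                    toCommit.put(tp, new OffsetAndMetadata(0L));
                }
            }
            // 手动指定分区，然后把所有输入主题分区的提交偏移量重置为0
            consumer.assign(partitions);
            consumer.commitSync(toCommit);
        }
    }
}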

Intermediate topics: For intermediate topics we must ensure to not consume any data from previous application runs. The simplest and recommended way is to delete and recreate those intermediate topics (recall that it is recommended to create user-specified topics manually before you run a Kafka Streams application). Additionally, it is required to reset the offsets to zero for the recreated topics (same as for input topics). Resetting the offsets is important because the application would otherwise pick up those invalid offsets on restart.

对于临时主题,我们要保证不能从上次应用程序运行的地方消费任何数据。最简单和推荐的方式是删除并重建这些临时主题(回顾下前面我们也说过推荐在运行Kafka Streams应用程序之前手动创建用户指定的主题)。另外,也需要将重新创建的主题的偏移量重置到0,这非常重要,否则应用程序在重启的时候可能会获得无效的偏移量。

As an alternative(供替代的选择), it is also possible to only modify the committed offsets for intermediate topics. You should consider this less invasive(侵入性) approach when there are other consumers for intermediate topics and thus deleting the topics is not possible. However, in contrast to modifying the offsets of input topics or deleted intermediate topic, the offsets for kept intermediate topics must be set to the largest value (i.e., to the current log-size) instead of zero, thus skipping over any not-yet consumed data. This ensure that, on restart, only data from the new run will be consumed by the application (cf. #2 in Figure 3). This alternative approach is used by the application reset tool.

除了删除并重建临时主题这个方案外，也可以只修改临时主题的提交偏移量。当临时主题还有其他消费者、因此无法删除主题时，可以考虑这种侵入性较小的方案。不过和重置输入主题(或删除重建的临时主题)的偏移量不同，保留下来的临时主题的提交偏移量必须被设置为最大值(即当前的log-end offset/logSize)，而不是0，这样才会跳过所有尚未被消费的数据。这确保了在重启之后，应用程序只会消费新一轮运行产生的数据(图3中的步骤2)。应用程序重置工具采用的就是这种方法。总结一下，重置临时主题有两种方案(方案2的示例草图见下面列表之后)：

  1. 删除临时主题，重建临时主题，将提交偏移量设置为0并提交
  2. 将临时主题的提交偏移量修改到分区末尾(最大值)
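
下面是方案2的一个简要草图：利用消费者的endOffsets()(较新版本的Java消费者才提供)查询每个分区当前的log-end offset，并把它作为提交偏移量，从而跳过所有尚未消费的数据。consumer的构造方式与上一个示例相同(group.id使用application.id，并禁用自动提交)，主题名为示例值：

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class SkipIntermediateTopicToEnd {

    static void commitEndOffsets(KafkaConsumer<byte[], byte[]> consumer, String intermediateTopic) {
        List<TopicPartition> partitions = new ArrayList<>();
        for (PartitionInfo p : consumer.partitionsFor(intermediateTopic)) {
            partitions.add(new TopicPartition(intermediateTopic, p.partition()));
        }
        consumer.assign(partitions);

        // endOffsets()返回每个分区当前的log-end offset(即logSize)
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
        Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> e : endOffsets.entrySet()) {
            toCommit.put(e.getKey(), new OffsetAndMetadata(e.getValue()));
        }
        // 把提交偏移量直接设置到分区末尾，跳过所有尚未被消费的数据
        consumer.commitSync(toCommit);
    }
}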

Internal topics: Internal topics can simply be deleted (cf. #3 in Figure 3). As those are created automatically by Kafka Streams, the library can recreate them in the reprocessing case. Similar to deleting intermediate user topics, make sure that committed offsets are either deleted or set to zero.

内部主题可以简单地删除掉(图3的步骤3)。因为它们是由Kafka Streams自动创建的，所以在重新处理时Kafka Streams也会重新创建它们。和删除用户指定的临时主题一样，要确保对应的提交偏移量要么被删除，要么被设置为0。
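
作为删除内部主题的一个简要草图，下面使用了较新版本Kafka(0.11+)提供的AdminClient API(本文对应的0.10.0版本中并没有这个API，当时只能使用kafka-topics脚本或应用程序重置工具)；主题名取自下文命名约定一节中给出的示例：

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class DeleteInternalTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // 按照 <application.id>-<操作符名称>-changelog / -repartition 的命名约定删除内部主题
            admin.deleteTopics(Collections.singletonList("my-streams-app-global-count-changelog"))
                 .all()
                 .get(); // 阻塞等待删除完成
        }
    }
}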

In order to delete those topics, you need to identify them. Kafka Streams creates two types of internal topics (repartitioning and state-backup) and uses the following naming convention (this naming convention could change in future releases however, which is one of the reasons we recommend the use of the application reset tool rather than manually resetting your applications):

为了删除内部主题,首先你要定位这些主题。Kafka Streams会创建两种类型的内部主题(重新分区和状态备份),并且会使用下面的命名约定(这个命名约定在未来的版本可能会变化,这也是推荐使用应用程序重置工具而不是手动重置应用程序的一个原因,因为应用程序重置工具可以帮我们隐藏这些内部细节,而手动重置则需要知道当前版本的命名约定):

  • <application.id>-<操作符名称>-repartition 即：应用程序编号-操作符名称-repartition
  • <application.id>-<操作符名称>-changelog 即：应用程序编号-操作符名称-changelog

比如以我们前面一开始的示例为例,会创建内部主题:“my-streams-app-global-count-changelog”,因为countByKey()方法的操作符名称被指定为“global-count”。

public class ResetDemo {
  public static void main(final String[] args) throws Exception {
    // Kafka Streams configuration
    final Properties streamsConfiguration = new Properties();
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
    // make sure to consume the complete topic via "auto.offset.reset = earliest"
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // ...and so on...

    // define the processing topology
    final KStreamBuilder builder = new KStreamBuilder();
    builder.stream("my-input-topic")
        .selectKey(...)
        .through("rekeyed-topic")
        .countByKey("global-count")
        .to("my-output-topic");

    // ...run application...
  }
}

Local State Stores: Similar to internal topics, local state stores can just be deleted (cf. #4 in Figure 3). They will be recreated automatically by Kafka Streams. All state of an application (instance) is stored in the application’s state directory (cf. parameter state.dir, with default value /var/lib/kafka-streams in Confluent Platform releases and /tmp/kafka-streams for Apache Kafka releases). Within the state store directory, each application has its own directory tree within a sub-folder that is named after the application ID. The simplest way to delete the state store for an application is therefore rm -rf <state.dir>/<application.id> (e.g., rm -rf /var/lib/kafka-streams/my-streams-app).

本地状态存储：类似于内部主题，本地状态存储也可以直接删除(图3的步骤4)，它们会被Kafka Streams自动重新创建。一个应用程序(实例)的所有状态都存储在应用程序的状态目录下(参见参数state.dir，Confluent Platform发行版中默认是/var/lib/kafka-streams，Apache Kafka发行版中默认是/tmp/kafka-streams)。在状态存储目录中，每个应用程序都有自己的目录树，子目录的名称是应用程序编号。因此删除一个应用程序的状态存储最简单的方式是直接执行命令：rm -rf <state.dir>/<application.id>(比如rm -rf /var/lib/kafka-streams/my-streams-app)。

After this discussion, we see that manually resetting a Stream application is cumbersome(累赘) and error-prone. Thus, we developed the new “Application Reset Tool” to simplify this process.

可以看到,手动重置一个流应用程序非常繁琐并且容易出错。因此我们开发了应用程序重置工具来简化这个流程。

Application Reset Tool Details

In more detail, the application reset tool (bin/kafka-streams-application-reset) performs the following actions (cf. Figure 3):

具体来说，应用程序重置工具(bin/kafka-streams-application-reset)执行了以下的操作(参见图3)：

  1. for any specified input topic, it resets all offsets to zero 对任何指定的输入主题,重置所有的偏移量到0
  2. for any specified intermediate topic, seeks to the end for all partitions 对任何指定的临时主题,跳到所有分区的末尾
  3. for all internal topic 对所有的内部主题
    3.1 resets all offsets to zero 重置所有的偏移量到0
    3.2 deletes the topic 删除(内部)主题

To use the script, as a minimum you need to specify the application ID. For this case, only internal topics will be deleted. Additionally, you can specify input topics and/or intermediate topics. More details about resetting a Streams application, can be found in the Confluent documentation.

使用这个脚本,你最少必须要指定应用程序编号,这样只有内部主题会被删除。当然你也可以指定输入主题和临时主题(这样就会重置相应主题的偏移量)。

Pay attention, that the application reset tool only covers the “global reset” part. Additionally, to the global reset, for each application instance a local reset of the application state directory is required, too. (cf. #4 in Figure 3) This can be done directly within your application using the method KafkaStreams#cleanUp(). Calling cleanUp() is only valid as long as the application instance is not running (i.e., before start() or after close()).

注意:应用程序重置工具只覆盖“全局重置”的部分,除了全局重置,每个应用程序实例也需要本地重置应用程序的状态目录(图3的步骤4)。这可以通过在应用程序中调用KafkaStreams#cleanUp()来完成。调用cleanUp()方法只有在应用程序实例还没有运行的时候才是有效的(在start()之前,或者在close()之后)。

Because resetting the local state store is embedded in your code, there is no additional work to do for local reset — local reset is included in restarting an application instance. For global reset, a single run of the application reset tool is sufficient.

因为重置本地状态存储是内嵌在你的代码中的,所以本地重置没有额外的工作,即本地重置包含在重启应用程序实例的过程中。对于全局重置,只需要运行一次应用程序重置工具即可。

重置时指定了新的应用程序编号会发生什么?

Before we close we want to discuss what happens when you configure your Kafka Streams application to use a new application ID. Until now this has been a common workaround(变通方法) for resetting an application manually.

在结束本篇博文之前,我们想要讨论下当你配置Kafka Streams应用程序时使用了一个新的应用程序编号会发生什么事。目前为止,这种方式实际上也是手动重置应用程序的一种变通方法。

On the positive side, renaming the application ID does cause your application to reprocess its input data from scratch. Why? When a new application ID is used, the application does not have any committed offsets, internal topics, or local state associated with itself, because all of those use the application ID in some way to get linked to a Kafka Streams application. Of course, you also need to delete and recreate all intermediate user topics.

实际上，重新命名应用程序编号确实会让你的应用程序从头开始重新处理输入数据。为什么？当使用了新的应用程序编号时，这个应用程序没有任何与之关联的提交偏移量、内部主题或本地状态，因为所有这些数据都以某种方式通过应用程序编号来和一个Kafka Streams应用程序相关联。当然，你还是需要删除并重建所有用户指定的临时主题。

So why all the fuss(小题大作) about resetting a Kafka Streams application if we could use this workaround?

那么如果我们可以使用这种变通的方法(重命名应用程序编号)来重置Kafka Streams应用程序,为什么还要小题大做(开发一个应用程序重置工具呢)?

First, resetting an application is more than just enabling it to reprocess its input data. An important part of the reset is to also clean-up all internal data that is created by a running application in the background. For example, all the internal topics (that are no longer used) consume storage space in your Kafka cluster if nobody deletes them. Second, the same clean-up must be performed for data written to the local state directories. If not explicitly deleted, disk storage is wasted on those machines that hosted an application instance. Last but not least there is all kind of metadata like topic names, consumer groups, committed offsets that are not used any more and linger around(游荡,苟延残喘). For those reasons, using a new application ID is considered nothing more than a crude workaround to reset a Kafka Streams application, and we wouldn’t recommend its use for production scenarios.

首先，重置一个应用程序并不仅仅是为了让它能重新处理输入数据。重置的一个重要部分是清理应用程序在后台运行时创建的所有内部数据。比如所有不会再被使用的内部主题，如果没有人去删除它们，就会持续占用Kafka集群的存储空间。其次，同样的清理工作也必须针对写入到本地状态目录的数据。如果没有显式删除，就会浪费运行过应用程序实例的机器上的磁盘空间。最后，还有各种各样的元数据，比如主题名称、消费组、提交偏移量，它们都不会再被使用，却一直残留在那里。基于这些理由，使用新的应用程序编号只能算是重置Kafka Streams应用程序的一种粗糙的变通方法，我们不推荐在生产环境中使用这种方式。

One more hint at the end: if you do not use a Kafka Streams application anymore (e.g., it gets replaced by a new version of the application or is just not longer needed), we recommend to run the reset tool once to clean-up all the left-overs(剩余) of the retired application.

最后再提示一点:如果你不再使用一个Kafka Streams应用程序了(比如应用程序有新的版本替换了,或者就只是不用了),推荐运行一次重置工具来清理所有剩余的退役应用。

KIP-28:Add a processor client

翻译:https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+processor+client

动机

A common use case for Kafka is real-time processes that transform data from input topics to output topics. Today there are a couple of options available for users to process such data:

使用Kafka的一个典型用例是实时处理,从输入主题中转换数据到输出主题。现在用户有两种方式来处理这样的数据:

1.使用Kafka的生产者和消费者API,自己定义处理逻辑,比如

// create a producer and a consumer
KafkaProducer producer = new KafkaProducer(configs);
KafkaConsumer consumer = new KafkaConsumer(configs);

// start a thread with a producer and consumer client
// for data IO and execute processing logic
// (inputMessages / outputMessages / isRunning are schematic placeholders from the original proposal)
new Thread(new Runnable() {
  @Override
  public void run() {
    while (isRunning) {
      // read some data from up-stream Kafka
      List<Message> inputMessages = consumer.poll();

      // do some processing..

      // send the output to the down-stream Kafka
      producer.send(outputMessages);
    }
  }
}).start();

2.使用成熟的流处理系统比如Storm、Samza、Spark Streaming、或者Flink,将Kafka作为它们的源/目标的流数据存储。

Both of those approaches have some downsides. Downsides of using the first option are that the producer and consumer APIs used for writing transformations are somewhat low level; simple examples are relatively simple, but any kind of more complex transformation tends to be a bit complicated. The opportunities(因素,机会) for a richer client to add value beyond what the producer and consumer do would be:

这两种方式都有一些缺点。第一种方案的缺点是：用生产者和消费者API来编写转换逻辑有些偏底层；简单的例子还相对简单，但任何更复杂一点的转换都会变得比较繁琐。一个比生产者和消费者更丰富的客户端可以在以下方面提供额外的价值：

  1. Manage multi-threading and parallelism within a process. 在一个进程中管理多线程和并行
  2. Manage partitioning assignment to processes / threads. 管理分区如何分配给进程或线程
  3. Manage journaled local state storage. 管理本地状态存储(文件存储系统)
  4. Manage offset commits and “exactly-once” guarantees as appropriate features are added in Kafka to support this. 管理偏移量的提交和正好一次的保证

The second option, i.e. using a full stream processing framework can be a good solution but a couple of things tend to(趋向,易于) make it a bit heavy-weight (a brief and still-going survey can be found here):

第二种选项，使用一个完整的流处理框架可能是一个好的方案，不过以下几点往往会让它变得比较重量级。

  1. These frameworks are poorly integrated with Kafka (different concepts, configuration, monitoring, terminology). For example, these frameworks only use Kafka as its stream data source / sink of the whole processing topology, while using their own in-memory format for storing intermediate data (RDD, Bolt memory map, etc). If users want to persist these intermediate results to Kafka as well, they need to break their processing into multiple topologies that need to be deployed separately, increasing operation and management costs.
  2. These frameworks either duplicate or force the adoption(采用) of a packaging, deployment, and clustering solution. For example, in Storm you need to run a Storm cluster which is a separate thing that has to be monitored and operated. In an elastic environment like AWS, Mesos, YARN, etc this is sort of silly since then you have a Storm cluster inside the YARN cluster vs just directly running the jobs in Mesos; similarly Samza is tied up with YARN.
  3. These frameworks can’t be integrated with existing services or applications. For example, you can’t just embed a light transformation library inside an existing app, but rather the entire framework that runs as a service.

  1. 这些框架与Kafka的集成很贫乏(不同的概念、配置、监控、术语)。比如这些框架都只会把Kafka作为整个处理拓扑的流数据源或目标，同时使用它们自己的内存格式来存储中间数据(RDD、Bolt内存字典等)。如果用户想要把这些中间结果也持久化到Kafka中，就需要把处理逻辑拆分成多个需要分别部署的拓扑，这会增加操作和管理的成本。
  2. 这些框架针对打包、部署、集群的方案要么是重复建设，要么强制你采用。比如在Storm中你需要运行一个独立的Storm集群，并需要单独监控和管理这个集群；在AWS、Mesos、YARN等弹性环境中这样做显得多余。
  3. 这些框架不能和已有的服务或应用程序集成。比如你不能简单地把一个轻量级的转换客户端库嵌入到已有的程序中，而是必须把整个框架作为一个服务来运行。

Processor客户端提议

We want to propose another standalone “processor” client besides the existing producer and consumer clients for processing data consumed from Kafka and storing results back to Kafka.

除了已有的生产者和消费者客户端之外，我们想要提出另一个独立的“Processor”客户端，用于处理从Kafka消费的数据，并将结果存储回Kafka。

Data Processing 数据处理

A processor computes on a stream of messages, with each message composed as a key-value pair.
Processor receives one message at a time and does not have access to the whole data set at once.

  1. Per-message processing: this is the basic function that can be triggered once a new message has arrived from the stream.
  2. Time-triggered processing: this function can be triggered whenever a specified time period has elapsed. It can be used for windowing computation, for example.

一个Processor会在消息流上进行计算，每条消息都由键值对组成。Processor一次只接收一条消息，并且无法一次性访问整个数据集。

  1. 每条消息处理：这是最基本的功能，每当一条新的消息从流中到达时触发一次处理逻辑
  2. 时间触发处理：每当一个指定的时间间隔过去后触发，比如可以用于窗口计算

Composable Processing 可组合的处理

Multiple processors should be able to chained up to form a DAG (i.e. the processor topology) for complex processing logic.
Users can define such processor topology in a exploring REPL manner: make an initial topology, deploy and run, check the results and intermediate values, and pause the job and edit the topology on-the-fly.

针对复杂的处理逻辑，多个Processor应该能够被链接在一起，形成一个DAG(即Processor处理拓扑)。用户可以以探索式REPL的方式定义这样的处理拓扑：先创建一个初始拓扑，部署并运行，检查结果和中间值，然后暂停作业并在线编辑拓扑。

Local State Storage 本地状态存储

Users can create state storage inside a processor that can be accessed locally.
For example, a processor may retain a (usually most recent) subset of data for a join, aggregation / non-monolithic operations.

用户可以在一个Processor中创建状态存储,并且只能在本地访问(所以叫做本地状态存储)。比如一个Processor可能会保存数据的子集(通常是最近的数据)用在join操作,聚合操作。

Processor接口

public interface ProcessorContext {
  void send(String topic, Object key, Object value); // send the key-value pair to a Kafka topic
  void schedule(long timestamp); // repeatedly schedule the punctuation function for the period
  void commit(); // commit the current state, along with the upstream offset and the downstream sent data
  String topic(); // return the Kafka record's topic of the current processing key-value pair
  int partition(); // return the Kafka record's partition id of the current processing key-value pair
  long offset(); // return the Kafka record's offset of the current processing key-value pair
}

public interface Processor<K, V> {
  void init(ProcessorContext context); // initialize the processor
  void process(K key, V value); // process a key-value pair
  void punctuate(); // process when the scheduled time has been reached
  void close(); // close the processor
}

public interface ProcessorDef {
  Processor instance(); // create a new instance of the processor from its definition
}

public class TopologyBuilder {
  // add a source node to the topology which generates incoming traffic with the specified Kafka topics
  public final TopologyBuilder addSource(String name, String... topics) { ... }
  // add a sink node to the topology with the specified parent nodes that sends out-going traffic to the specified Kafka topics
  public final TopologyBuilder addSink(String name, String topic, String... parentNames) { ... }
  // add a processor node to the topology with the specified parent nodes
  public final TopologyBuilder addProcessor(String name, ProcessorDef definition, String... parentNames) { ... }
}

用户可以使用创建的Processor拓扑创建处理作业:

public class ProcessorJob {
  private static class MyProcessorDef implements ProcessorDef {
    @Override
    public Processor<String, Integer> instance() {
      return new Processor<String, Integer>() {
        private ProcessorContext context;
        private KeyValueStore<String, Integer> kvStore;

        @Override
        public void init(ProcessorContext context) {
          this.context = context;
          this.context.schedule(this, 1000);
          this.kvStore = new InMemoryKeyValueStore<>("local-state", context);
        }

        @Override
        public void process(String key, Integer value) {
          Integer oldValue = this.kvStore.get(key);
          if (oldValue == null) {
            this.kvStore.put(key, value);
          } else {
            int newValue = oldValue + value;
            this.kvStore.put(key, newValue);
          }
          context.commit();
        }

        @Override
        public void punctuate(long streamTime) {
          KeyValueIterator<String, Integer> iter = this.kvStore.all();
          while (iter.hasNext()) {
            Entry<String, Integer> entry = iter.next();
            System.out.println("[" + entry.key() + ", " + entry.value() + "]");
            context.forward(entry.key(), entry.value());
          }
        }

        @Override
        public void close() {
          this.kvStore.close();
        }
      };
    }
  }

  public static void main(String[] args) throws Exception {
    StreamingConfig config = new StreamingConfig(new Properties());

    // build topology
    TopologyBuilder builder = new TopologyBuilder();
    builder.addSource("SOURCE", "topic-source")
           .addProcessor("PROCESS", new MyProcessorDef(), "SOURCE")
           .addSink("SINK", "topic-sink", "PROCESS");

    // start process
    KafkaStreaming streaming = new KafkaStreaming(builder, config);
    streaming.start();
  }
}

上面的API示例抽象了低级的消费者/生产者接口，比如consumer.poll() / commit()、producer.send(callback)、producer.flush()。

High-level Stream DSL

除了Processor API,我们也会引入高级Stream DSL,覆盖了常见的处理实现

public interface KStream<K, V> {
  // Creates a new stream consisting of all elements of this stream which satisfy a predicate
  KStream<K, V> filter(Predicate<K, V> predicate);

  // Creates a new stream by transforming key-value pairs by a mapper to all elements of this stream
  <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, K1, V1> mapper);

  // Creates a new stream by transforming values by a mapper to all values of this stream
  <V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper);

  // Creates a new stream by applying a flat-mapper to all elements of this stream
  <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, K1, ? extends Iterable<V1>> mapper);

  // Creates a new stream by applying a flat-mapper to all values of this stream
  <V1> KStream<K, V1> flatMapValues(ValueMapper<V, ? extends Iterable<V1>> processor);

  // Creates a new windowed stream using a specified window instance.
  KStreamWindowed<K, V> with(Window<K, V> window);

  // Creates an array of streams from this stream. Each stream in the array corresponds to a predicate in supplied predicates in the same order.
  KStream<K, V>[] branch(Predicate<K, V>... predicates);

  void sendTo(String topic); // Sends key-value to a topic.

  // Sends key-value to a topic, also creates a new stream from the topic.
  // This is mostly used for repartitioning and is equivalent to calling sendTo(topic) and from(topic).
  KStream<K, V> through(String topic);

  // Processes all elements in this stream by applying a processor.
  <K1, V1> KStream<K1, V1> process(KafkaProcessor<K, V, K1, V1> processor);

  // .. more operators
}

public interface KStreamWindowed<K, V> extends KStream<K, V> {
  /**
   * Creates a new stream by joining this windowed stream with the other windowed stream.
   * Each element arrived from either of the streams is joined with elements with the same key in another stream.
   * The resulting values are computed by applying a joiner.
   */
  <V1, V2> KStream<K, V2> join(KStreamWindowed<K, V1> other, ValueJoiner<V, V1, V2> joiner);

  /**
   * Creates a new stream by joining this windowed stream with the other windowed stream.
   * Each element arrived from either of the streams is joined with elements with the same key in another stream
   * if the element from the other stream has an older timestamp.
   * The resulting values are computed by applying a joiner.
   */
  <V1, V2> KStream<K, V2> joinPrior(KStreamWindowed<K, V1> other, ValueJoiner<V, V1, V2> joiner);
}

使用高级接口,用户的程序可以很简单:

public class KStreamJob {
  public static void main(String[] args) throws Exception {
    StreamingConfig config = new StreamingConfig(props);

    // build the topology
    KStreamBuilder builder = new KStreamBuilder();

    KStream<String, String> stream1 = builder.from("topic1");

    KStream<String, Integer> stream2 =
        stream1.map((key, value) -> new KeyValue<>(key, new Integer(value)))
               .filter((key, value) -> true);

    KStream<String, Integer>[] streams = stream2
        .branch((key, value) -> value > 10,
                (key, value) -> value <= 10);

    streams[0].sendTo("topic2");
    streams[1].sendTo("topic3");

    // start the process
    KafkaStreaming kstream = new KafkaStreaming(builder, config);
    kstream.start();
  }
}

架构设计

下面我们会总结一些重要的架构设计要点:

[图: kstream arch — 架构设计示意图]

分区分布

As shown in the digram above, each KStream process could have multiple threads (#.threads configurable in the properties), with each thread having a separate consumer and producer. So the first question is how can we distribute the partitions of the subscribed topics in the source processor among all the processes / threads. There are a couple of common cases for partition management in KStream:

如上图所示，每个KStream进程可以有多个线程(通过配置项#.threads指定线程数量)，每个线程都有独立的消费者和生产者。所以第一个问题是：我们要如何把源处理算子所订阅主题的分区，分布到所有的进程/线程上。在KStream中有一些常见的分区管理场景：

  1. Co-partitioning: for windowed-joins.
  2. Sticky partitioning: for stateful processing, users may want to have a static mapping from stream partitions to process threads.
  3. N-way partitioning: when we have stand-by processor instances, users may want to assign a single stream partition to multiple process threads.

  1. 协调分区(Co-partitioning)：用于窗口join
  2. 粘性分区(Sticky partitioning)：对于有状态的处理，用户可能希望流分区到处理线程之间有一个静态的映射
  3. 多路分区(N-way partitioning)：当我们有备用(stand-by)的Processor实例时，用户可能希望把同一个流分区分配给多个处理线程

These use cases would require more flexible assignments than today’s server-side strategies, so we need to extend the consumer coordinator protocol in the way that:

  1. Consumers send JoinGroup with their subscribed topics, and receive the JoinGroup responses with the list of members in the group and the list of topic-partitions.
  2. All consumers will get the same lists, and they can execute the same deterministic partition assignment algorithm to get their assigned topic-partitions.

这些用例都需要比现有的服务端策略有更加灵活的分配方式,所以我们需要扩展消费者的协调协议:

  1. 消费者发送带有订阅主题的JoinGroup,接收到带有消费组成员的JoinGroup响应,以及主题分区列表
  2. 所有消费者接收到相同的列表,执行相同的分区分配算法,来得到属于它们自己的主题分区

With this new assignment protocol (details of this change can be found here), we distribute the partitions among worker thread as the following:

使用新的分配协议,我们会用下面的方式将分区在所有工作线程上进行分布:

0.Upon starting the KStream process, user-specified number of KStream threads will be created. There is no shared variables between threads and no synchronization barriers as well hence these threads will execute completely asynchronously. Hence we will describe the behavior of a single thread in all the following steps.

在启动KStream进程时,指定数量的KStream线程会被创建。线程之间没有共享的变量,也没有同步的屏障,所以这些线程会完全异步地执行。所以在下面的步骤中我们只会描述一个线程的行为,其他线程都是类似的。

1.Thread constructs the user-specified processor topology without initializing it just in order to extract the list of subscribed topics from the topology.

线程会构造用户指定的处理拓扑，但不会初始化它，仅仅是为了从拓扑中抽取出订阅的主题列表。

2.Thread uses its consumer’s partitionsFor() to fetch the metadata for each of the subscribed topics to get a information of topic -> #.partitions.

线程使用消费者的partitionsFor()方法获取订阅的每个主题的元数据,得到topic和分区数量的信息
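
下面是这一步的一个简要草图：用消费者的partitionsFor()为每个订阅的主题获取元数据，构造出 topic -> 分区数量 的映射。方法名和参数只是示意，并非Kafka Streams内部的真实实现：

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TopicMetadataLookup {

    // 为每个订阅的主题查询元数据，得到 topic -> #.partitions 的映射
    static Map<String, Integer> partitionCounts(KafkaConsumer<?, ?> consumer, List<String> subscribedTopics) {
        Map<String, Integer> partitionsPerTopic = new HashMap<>();
        for (String topic : subscribedTopics) {
            partitionsPerTopic.put(topic, consumer.partitionsFor(topic).size());
        }
        return partitionsPerTopic;
    }
}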

3.Thread now triggers consumer’s subscribe() with the subscribed topics, which will then applies the new rebalance protocol. The join-group request will be instantiated as follows (for example):

现在线程会调用消费者的subscribe()方法,传递订阅的主题,然后会运用新的平衡协议。JoinGroup请求实例化对象如下:

JoinGroupRequest =>
GroupId => "KStream-[JobName]"
GroupType => "KStream"

And the assignor interface is implemented as follows: 分配分区的接口如下:

List<TopicPartition> assign(String consumerId,                       // 消费者编号
                            Map<String, Integer> partitionsPerTopic, // 每个主题有多少个分区
                            List<ConsumerMetadata<T>> consumers) {   // 所有的消费者元数据

  // 1. trigger user-customizable grouping function to group the partitions into groups. 将分区进行分组
  // 2. assign partitions to consumers at the granularity of partition-groups. 以分区分组的粒度将分区分配给消费者
  // 3*. persist the assignment result using commit-offset to Kafka. 持久化分配信息
}

The interface of the grouping function is the following, it is very similar to the assign() interface above, with the only difference that it does not have the consumer-lists.

分组函数的接口如下,它和上面的assign()接口方法很类似,唯一的区别是没有消费者列表

interface PartitionGrouper {
  // Group partitions into partition groups
  List<Set<TopicPartition>> group(Map<String, Integer> partitionsPerTopic);
}

So after the rebalance completes, each partition-group will be assigned as a whole to the consumers, i.e. no partitions belonging to the same group will be assigned to different consumers. The default grouping function maps partitions with the same id across topics to into a group (i.e. co-partitioning).

在平衡完成后，每个分区分组都会作为一个整体分配给某个消费者，也就是说，属于同一个分组的分区不会被分配给不同的消费者。默认的分组函数会把不同主题中编号相同的分区映射到同一个分组中(即协调分区/co-partitioning)。
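
基于上面给出的PartitionGrouper接口，下面是对这里描述的默认分组方式(不同主题中编号相同的分区进入同一个分组，即协调分区)的一个简要草图，并不是类库中的真实实现：

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.TopicPartition;

public class CoPartitioningGrouper {

    // 将不同主题中编号相同的分区划分到同一个分区分组中
    static List<Set<TopicPartition>> group(Map<String, Integer> partitionsPerTopic) {
        int maxPartitions = 0;
        for (int numPartitions : partitionsPerTopic.values()) {
            maxPartitions = Math.max(maxPartitions, numPartitions);
        }

        List<Set<TopicPartition>> groups = new ArrayList<>();
        for (int partitionId = 0; partitionId < maxPartitions; partitionId++) {
            Set<TopicPartition> group = new HashSet<>();
            for (Map.Entry<String, Integer> e : partitionsPerTopic.entrySet()) {
                if (partitionId < e.getValue()) {
                    group.add(new TopicPartition(e.getKey(), partitionId));
                }
            }
            groups.add(group);
        }
        return groups;
    }
}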

4.Upon getting the partition-groups, thread creates one task for each partition-group. And for each task, constructs the processor topology AND initializes the topology with the task context.

在得到分区分组后,线程会为每个分区分组创建一个任务。对每个任务,都会构造处理拓扑,然后使用任务的上下文信息初始化拓扑

a. Initialization process will trigger Processor.init() on each processor in the topology following topology's DAG order.
b. All user-specified local states will also be created during the initialization process (we will talk about this later in the later sections).
  c. Creates the record queue for each one of the task's associated partition-group's partitions, so that when the consumer fetches new messages, it will put them into the corresponding queue.

a.初始化过程会在拓扑的每个Processor上调用Processor.init()方法,每个Processor按照拓扑的DAG顺序依次初始化  
b.所有用户指定的本地状态也会在初始化过程中被创建  
c.为每个任务关联的分区分组的每个分区创建一个记录队列(每个分区都有一个队列),这样当消费者拉取新消息时,可以将它们放到对应的队列中

Hence all tasks’ topologies have the same “skeleton” but different processor / state instantiated objects; in addition, partitions are also synchronized at the tasks basis (we will talk about partition-group synchronization in the next section).

所以所有任务的拓扑都有相同的“骨架”,但是有不同的处理/状态实例化对象;另外,分区也会在任务的基础上进行同步。

5.When rebalance is triggered, the consumers will read its last persisted partition assignment from Kafka and checks if the following are true when comparing with the new assignment result:

当平衡触发时,消费者会从Kafka中读取最近持久化的分区分配,并与新的分配结果进行比较,检查下面的条件是否成立

a. Existing partitions are still assigned to the same partition-groups.
b. New partitions are assigned to the existing partition-groups instead of creating new groups.
c. Partition groups are assigned to the specific consumers instead of randomly / round-robin.

a.已经存在的分区仍然分配给相同的分区组  
b.新的分区分配给已经存在的分区组,而不是创建新的组  
c.分区组分配给指定的消费者,而不是采用随机或者轮询方式

For a), since the partition-group’s associated task-id is used as the corresponding change-log partition id, if a partition gets migrated from one group to another during the rebalance, its state will no longer be valid; for b), since we cannot (yet) dynamically change the #.partitions from the consumer APIs, dynamically adding more partition-groups (hence tasks) would mean the corresponding change-log partitions may not exist yet; for c), there are some more advanced partitioning settings such as sticky-partitioning / consistent hashing that we want to support in the future, which may then additionally require assigning partition-groups to specific consumers.

对于a),由于分区组关联的任务编号被用作对应的变更日志主题的分区编号,如果在平衡时一个分区从一个组迁移到另一个组,它的状态就不再有效;对于b),由于我们(目前)不能在消费者API中动态地修改分区数量,如果动态地添加更多的分区组(以及任务),会导致对应的变更日志分区可能还不存在;对于c),未来我们想要支持一些更加高级的分区方式,比如粘性分区/一致性哈希,这可能就需要额外地将分区组分配给指定的消费者。

流时间和同步

Time in the stream processing is very important. Windowing operations (join and aggregation) are defined by time. Since Kafka can replay stream, wall-clock based time (system time) may not make sense due to delayed messages / out-of-order messages. Hence we need to define a “time” for each stream according to its progress. We call it stream time.

流处理中的时间非常重要。窗口操作(join和聚合)都是通过时间定义的。由于Kafka可以重放流,对于延迟的、乱序的消息,基于系统时间的时钟可能没有多大意义。所以我们需要为每个流根据它的处理进度定义一个时间,这个时间叫做流时间(stream time)。

Stream Time

A stream is defined to abstract all the partitions of the same topic within a task, and its name is the same as the topic name. For example if a task’s assigned partitions are {Topic1-P1, Topic1-P2, Topic1-P3, Topic2-P1, Topic2-P2}, then we treat this task as having two streams: “Topic1” and “Topic2” where “Topic1” represents three partitions P1 P2 and P3 of Topic1, and stream “Topic2” represents two partitions P1 and P2 of Topic2.

一个流的定义是在一个任务中,对相同主题的所有分区的抽象,它的名称和主题的名称相同。比如一个任务分配的分区有:{Topic1-P1, Topic1-P2, Topic1-P3, Topic2-P1, Topic2-P2},那么我们就会认为这个任务有两个流:“Topic1”和“Topic2”,其中流“Topic1”代表了Topic1主题的三个分区P1,P2和P3,而流“Topic2”代表了Topic2主题的两个分区P1和P2。

Each message in a stream has to have a timestamp to perform window based operations and punctuations. Since Kafka message does not have timestamp in the message header, users can define a timestamp extractor based on message content that is used in the source processor when deserializing the messages. This extractor can be as simple as always returning the current system time (or wall-clock time), or it can be an Avro decoder that gets the timestamp field specified in the record schema.

流中的每条消息都必须有一个时间戳,才可以执行基于窗口的操作和punctuate。由于Kafka消息头中没有时间戳(在新版本中其实已经有时间戳了),用户可以定义一个基于消息内容的时间戳解析器,供源处理算子在反序列化消息时使用。这个解析器可以简单地返回当前的系统时间(即时钟时间),或者是一个能够从记录的Schema中获取时间戳字段的Avro解码器。
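
下面给出一个时间戳解析器的示意实现(仅作说明,假设采用与Kafka Streams 0.10.x中TimestampExtractor类似的接口形式,类名为示例命名):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// 示例:如果消息的value本身是毫秒时间戳(Long),就直接使用它;否则退回到当前系统时间
public class PayloadOrWallclockTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record) {
        if (record.value() instanceof Long)
            return (Long) record.value();     // 从消息内容中解析出时间戳
        return System.currentTimeMillis();    // 否则使用时钟时间
    }
}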

In addition, since Kafka supports multiple producers sending message to the same topic, brokers may receive messages in order that is not strictly following their timestamps (i.e. out-of-order messages). Therefore, we cannot simply define the “stream time” as the timestamp of the currently processed message in the stream hence that time can move back and forth.

此外,由于Kafka支持多个生产者发送消息到同一个主题,Broker接收到消息的顺序可能并不是严格按照它们的时间戳(即乱序的消息)。因此我们不能简单地将流中当前正在处理的消息的时间戳作为"流时间",因为那个时间可能会来回地移动。

We can define the “stream time” as a monotonically increasing value as the following:

  1. For each assigned partition, the thread maintains a record queue for buffering the fetched records from the consumer.
  2. Each message has an associated timestamp that is extracted from the timestamp extractor in the message content.
  3. The partition time is defined as the lowest message timestamp value in its buffer.
    a. When the record with the lowest timestamp gets processed by the thread, the partition time possibly gets advanced.
    b. The partition time will NOT get reset to a lower value even if a message with an even lower timestamp is later put into the buffer.
  4. The stream time is defined as the lowest partition timestamp value across all its partitions in the task:
    a. Since partition times are monotonically increasing, stream times are also monotonically increasing.
  5. Any newly created stream through the upstream processors inherits the stream time of its parents; for joins, the bigger parent’s stream time is taken.

我们可以将“流时间”定义为一个单调递增的值(本列表之后附有一段示意代码):

  1. 对每个分配的分区,线程维护了一个记录队列,用来缓冲从消费者拉取到的记录
  2. 每条消息都有一个关联的时间戳,它是用时间戳解析器从消息内容中抽取出来的
  3. 分区的时间被定义为其缓冲区中最低的消息时间戳
    3.1 当最低时间戳对应的记录被线程处理后,分区的时间可能会前进
    3.2 分区时间不会被重置为一个更低的值,即使后来有一条时间戳更低的消息被放进缓冲区
  4. 流时间被定义为任务中所有分区的最低的分区时间
    4.1 由于分区时间是单调递增的,所以流时间也是单调递增的
  5. 任何通过上游处理节点新创建的流都继承了父节点的流时间;对于join操作而言,会选择父节点中较大的流时间作为它的流时间
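
下面用一段示意代码说明上述分区时间/流时间的单调递增计算方式(仅为演示概念的草图,类名StreamTimeTracker及其字段均为假设,并非实际源码):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

// 每个分区维护自己单调递增的分区时间;流时间取任务内所有分区时间的最小值
class StreamTimeTracker {
    private final Map<TopicPartition, Long> partitionTimes = new HashMap<>();

    // 当某分区缓冲区中最低时间戳的记录被处理时调用,分区时间只会前进不会回退
    void maybeAdvance(TopicPartition partition, long lowestBufferedTimestamp) {
        long current = partitionTimes.getOrDefault(partition, Long.MIN_VALUE);
        if (lowestBufferedTimestamp > current)
            partitionTimes.put(partition, lowestBufferedTimestamp);
    }

    // 流时间 = 所有分区时间的最小值,由于分区时间单调递增,流时间也单调递增
    long streamTime() {
        long min = Long.MAX_VALUE;
        for (long t : partitionTimes.values())
            min = Math.min(min, t);
        return partitionTimes.isEmpty() ? Long.MIN_VALUE : min;
    }
}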

Stream Synchronization

When joining two streams, their progress need to be synchronized. If they are out of sync, a time window based join becomes faulty. Say a delay of one stream is negligible(微不足道的) and a delay of the other stream is one day, doing join over 10 minutes window does not make sense. To handle this case, we need to make sure that the consumption rates of all partitions within each task’s assigned partition-group are “synchronized”. Note that each thread may have one or more tasks, but it does not need to synchronize the partitions across tasks’ partition-groups.

当join两个流时,它们的进度需要被同步。如果它们不同步,基于时间窗口的join就会出错。比如一个流的延迟微不足道,而另一个流的延迟有一天,那么做10分钟窗口的join就没有意义了。为了处理这种场景,我们要确保每个任务分配的分区组中所有分区的消费速率是"同步"的。注意:每个线程可能有一个或多个任务,但并不需要在不同任务的分区组之间进行同步。

Work thread synchronizes the consumption within each one of such groups through the consumer’s pause / resume APIs as follows:

  1. When one un-paused partition is ahead of time (partition time as defined above) of other partitions beyond some defined threshold, notify the corresponding consumer to pause.
  2. When one paused partition is ahead of other partitions by less than the defined threshold, notify the corresponding consumer to un-pause.

工作线程通过消费者的pause/resume API来同步分区组内各分区的消费进度(列表之后附有一段示意代码):

  1. 当一个还未暂停的分区的时间(即上面定义的分区时间)超前其他分区超过定义的阈值时,通知对应的消费者暂停
  2. 当一个已暂停的分区与其他分区的时间差距回落到阈值以内时,通知对应的消费者恢复(un-pause)
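
下面是利用消费者pause/resume进行分区组内同步的示意代码(概念草图;假设使用较新版本KafkaConsumer中基于集合的pause/resume方法签名,类名PartitionGroupSynchronizer和阈值maxTimeDifference均为假设):

import java.util.*;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

class PartitionGroupSynchronizer {
    private final KafkaConsumer<byte[], byte[]> consumer;
    private final long maxTimeDifference;   // 允许的最大分区时间差(毫秒),假设的阈值参数

    PartitionGroupSynchronizer(KafkaConsumer<byte[], byte[]> consumer, long maxTimeDifference) {
        this.consumer = consumer;
        this.maxTimeDifference = maxTimeDifference;
    }

    // partitionTimes: 分区组内每个分区当前的分区时间;paused: 当前已被暂停的分区
    void synchronize(Map<TopicPartition, Long> partitionTimes, Set<TopicPartition> paused) {
        long minTime = Collections.min(partitionTimes.values());
        for (Map.Entry<TopicPartition, Long> entry : partitionTimes.entrySet()) {
            TopicPartition tp = entry.getKey();
            boolean tooFarAhead = entry.getValue() - minTime > maxTimeDifference;
            if (tooFarAhead && !paused.contains(tp)) {
                consumer.pause(Collections.singleton(tp));   // 超前太多,暂停拉取
                paused.add(tp);
            } else if (!tooFarAhead && paused.contains(tp)) {
                consumer.resume(Collections.singleton(tp));  // 差距回落,恢复拉取
                paused.remove(tp);
            }
        }
    }
}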

Two streams that are joined together have to be in the same task, and their represented partition lists have to match each other. That is, for example, a stream representing P1, P2 and P3 can be joined with another stream also representing P1, P2 and P3.

两个join的流必须在同一个任务中,它们对应的分区列表必须互相匹配。例如,一个代表P1、P2、P3三个分区的流,可以和另外一个同样代表P1、P2、P3的流进行join(如果另外一个流的分区不匹配,就无法join)。

本地状态管理

Users can create one or more state stores during their processing logic, and each task will have a state manager that keeps an instance of each specified store inside the task. Since a single store instance will not be shared across multiple partition groups, and each partition group will only be processed by a single thread, this guarantees any store will not be accessed concurrently by multiple threads at any given time.

用户可以在流处理逻辑中创建一个或多个状态存储,每个任务在其任务内部都有一个状态管理器,保存了每个指定存储的实例。由于一个单一的存储不会在多个分区组中共享,而且每个分区组都只会被一个线程处理,这就保证了在任何时间,都不会有多个线程并发地访问任何的存储(所以访问存储是线程安全的)。

Log-backed State Storage

Each state store will be backed up by a different Kafka change log topic, and each instance of the store correlates to one partition of the topic, such that:

每个状态存储都由一个不同的Kafka变更日志主题作为后备,每个存储实例对应该主题的一个分区,满足如下关系:

#.tasks == #.partition groups == #.store instances for each state store == #.partitions of the change log for each state store

For example, if a processor instance consumes from upstream Kafka topic “topic-A” with 4 partitions, and creates two stores, namely store1 and store2, and user groups the 4 partitions into {topic-A-p1, topic-A-p2} and {topic-A-p3, topic-A-p4}; then two change log topics, for example namely “topic-store1-changelog” and “topic-store2-changelog”, need to be created beforehand, each with two partitions.

比如有一个Processor实例从上游有4个分区的Kafka主题“topic-A”消费数据,并创建了两个存储即store1和store2,用户将这4个分区分成{topic-A-p1, topic-A-p2}和{topic-A-p3, topic-A-p4},那么就需要事先创建两个主题:”topic-store1-changelog”和”topic-store2-changelog”,每个主题有两个分区。

After the processor writes to a store instance, it first sends the change message to its corresponding changelog topic partition. When the user calls commit() in his processor, KStream needs to flush both the store instance and the producer sending to the changelog, as well as committing the offset in the upstream Kafka. If these three operations cannot be done atomically, then a crash in between these operations could generate duplicates, since committing the upstream Kafka offset is executed in the last step; if these three operations can be done atomically, then we can guarantee “exactly-once” semantics.

当Processor写入存储实例,它首先会将变更消息发送到对应的变更日志主题分区中。当用户在Processor中调用commit()方法时,KStream需要同时刷新存储实例、生产者发送到变更日志的消息(之前的发送不一定真正写入,只有flush时才会确保消息真正写入)、提交上游Kafka的偏移量。如果这三个操作不能以原子操作完成,那么如果在这些步骤中发生崩溃,就会生成重复的数据,因为上游的Kafka提交偏移量是在最后一步执行的(和消费者的处理类似,最后提交偏移量,只能保证至少一次,但数据可能会重复);如果这三个步骤能原子地完成,我们就可以保证“正好一次”的语义了。
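
下面用一小段示意代码表达commit()时依次执行的三个操作(仅为概念草图,Flushable在这里代指状态存储的刷新接口,类名TaskCommitSketch为假设命名,代码本身并不提供原子性保证):

import java.io.Flushable;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;

class TaskCommitSketch {
    // 依次:1.刷新本地状态存储 2.刷新发送变更日志的生产者 3.提交上游消费位移
    static void commit(Flushable store,
                       KafkaProducer<byte[], byte[]> changelogProducer,
                       KafkaConsumer<byte[], byte[]> consumer,
                       Map<TopicPartition, OffsetAndMetadata> offsets) throws Exception {
        store.flush();                 // (1) 刷新状态存储
        changelogProducer.flush();     // (2) 确保变更日志消息真正写入Kafka
        consumer.commitSync(offsets);  // (3) 最后提交上游偏移量;若中途崩溃只会产生重复(至少一次)
    }
}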

Persisting and Restoring State

When we close a KStream instance, the following steps are executed:

  1. Flush all store’s state as mentioned above.
  2. Write the change log offsets for all stores into a local offset checkpoint file. The existence of the offset checkpoint file indicates if the instance was cleanly shutdown.

当我们关闭一个KStream实例时,会执行下面的步骤:

  1. 刷新上面提到的所有状态存储
  2. 将所有存储的变更日志偏移量写到本地的偏移量检查点文件中。是否存在偏移量检查点文件,可以用来判断实例是否被干净地关闭(如果没有,说明KStream没有被彻底关闭)。

Upon (re-)starting the KStream instance:

  1. Try to read the local offset checkpoint file into memory, and delete the file afterwards.
  2. Check the offset of the corresponding change log partition read from the checkpoint file.
    a. If the offset is read successfully, load the previously flushed state and replay the change log from the read offset up to the log-end-offset.
    b. Otherwise, do not load the previously flushed state and replay the change log from the beginning up to the log-end-offset.

(重新)启动KStream实例时(下面附有一段恢复流程的示意代码):

  1. 尝试读取本地的偏移量检查点文件到内存中,然后删除这个文件
  2. 检查从检查点文件中读取的对应变更日志分区的偏移量
    2.1 如果成功读取了偏移量,加载之前(关闭时)刷新的状态,并从读取的偏移量开始重放变更日志,直到日志末尾(log-end-offset)
    2.2 否则,不加载之前刷新的状态,而是从头开始重放变更日志,直到日志末尾
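
下面是恢复流程的示意代码(概念草图;假设使用较新版本KafkaConsumer的API,restoreConsumer、StateStoreStub等名称为示例命名,仅说明"有检查点则从检查点重放,否则从头重放"的逻辑):

import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

class StateRestoreSketch {
    // checkpointedOffset: 从本地检查点文件读到的偏移量(读完后文件会被删除);为null表示没有检查点
    static void restore(KafkaConsumer<byte[], byte[]> restoreConsumer,
                        TopicPartition changelogPartition,
                        Long checkpointedOffset,
                        StateStoreStub store) {
        restoreConsumer.assign(Collections.singletonList(changelogPartition));
        if (checkpointedOffset != null) {
            restoreConsumer.seek(changelogPartition, checkpointedOffset);  // 从检查点继续重放
        } else {
            restoreConsumer.seekToBeginning(Collections.singletonList(changelogPartition));  // 从头重放
        }
        long endOffset = restoreConsumer.endOffsets(Collections.singletonList(changelogPartition))
                                        .get(changelogPartition);
        while (restoreConsumer.position(changelogPartition) < endOffset) {
            ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(100);
            for (ConsumerRecord<byte[], byte[]> record : records)
                store.put(record.key(), record.value());  // 把变更日志应用到本地状态
        }
    }

    interface StateStoreStub { void put(byte[] key, byte[] value); }
}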

工作流程总结

下面总结Kafka Streams处理的步骤:

启动

Upon user calling KafkaStreaming.start(), the process instance creates the worker threads given user specified #.threads. In each worker thread:

  1. Construct the producer and consumer client, extract the subscription topic names from the topology.
  2. Let the consumer to subscribe to the topics and gets the assigned partitions.
  3. Trigger the grouping function with the assigned partitions to get the returned list of partition-groups (hence tasks) with associated ids.
  4. Initialize each task by:
    a. Creates a record queue for buffering the fetched records for each partition.
    b. Initialize a topology instance for the task from the builder with a newly created processor context.
    c. Initialize the state manager of the task and constructs / resumes user defined local states.
  5. Runs the loop at its own pace until notified to be shutdown: there is no synchronization between these threads. In each iteration of the loop:
    a. Thread checks if the record queues are empty / low, and if yes calls consumer.poll(timeout) / consumer.poll(0) to re-fill the buffer.
    b. Choose one record from the queues and process it through the processor topology.
    c. Check if some of the processors’ punctuate functions need to be triggered, and if yes, execute the function.
    d. Check if user calls commit() during the processing of this record; if yes commit the offset / flush the local state / flush the producer.

在调用KafkaStreaming.start()时,进程实例会根据用户指定的线程数创建工作线程,每个工作线程中:

  1. 构造生产者和消费者客户端,从拓扑中解析订阅的主题名称
  2. 消费者订阅主题,并得到分配的分区
  3. 使用分配的分区触发分组方法,返回分区分组列表以及对应的编号(任务)
  4. 初始化每个任务
    4.1 为每个分区创建一个记录队列,用来缓冲每个分区的拉取记录
    4.2 从Builder中为任务初始化拓扑实例
    4.3 初始化任务的状态管理器,构造或恢复用户定义的本地状态
  5. 以自己的步伐运行循环,直到收到关闭的通知,线程之间不需要同步。在循环的每次迭代中(循环的示意代码见下文):
    5.1 线程检查记录队列是否为空或者记录很少,如果是则调用consumer.poll(timeout) / consumer.poll(0)重新填充缓冲区
    5.2 从队列中选择一条记录,并将其放入处理拓扑中处理
    5.3 检查是否有Processor的punctuate方法需要触发,如果需要则执行该函数
    5.4 检查用户在处理这条记录的过程中是否调用了commit(),如果是,则提交偏移量、刷新本地状态、刷新生产者

注:这里的进程实例指的是运行KafkaStreaming应用程序的进程(即应用程序实例),由它创建各个工作线程,而不是拓扑中的某个Processor处理节点。
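
下面是单个工作线程主循环结构的示意代码(概念草图,所有私有方法均为占位,仅用来串联上面5.1~5.4的步骤,并非实际实现):

import java.util.concurrent.atomic.AtomicBoolean;

class StreamThreadLoopSketch {
    private final AtomicBoolean running = new AtomicBoolean(true);

    void runLoop() {
        while (running.get()) {
            pollIfNeeded();     // 5.1 缓冲区为空或偏低时,通过consumer.poll(timeout)/poll(0)补充记录
            processOneRecord(); // 5.2 按时间戳从记录队列中取一条记录,交给处理拓扑处理
            maybePunctuate();   // 5.3 检查并触发需要执行的punctuate()
            maybeCommit();      // 5.4 若用户调用了commit(),则提交偏移量、刷新本地状态、刷新生产者
        }
    }

    void shutdown() { running.set(false); }

    // 以下均为占位方法,具体逻辑见上文步骤说明
    private void pollIfNeeded() { }
    private void processOneRecord() { }
    private void maybePunctuate() { }
    private void maybeCommit() { }
}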

关闭

Upon user calling KafkaStreaming.shutdown(), the following steps are executed:

  1. Commit / flush each partition-group’s current processing state as described in the local state management section.
  2. Close the embedded producer and consumer clients.

当用户调用KafkaStreaming.shutdown(),会执行以下步骤:

  1. 提交或刷新每个分区组的当前处理状态
  2. 关闭内置的生产者和消费者客户端

一些重要的类:

  1. PartitionGroup: a set of partitions along with their queuing buffers and timestamp extraction logic. 分区的集合,连同它们的队列缓冲区,以及时间戳解析逻辑
  2. ProcessorStateManager: the manager of the local states within a task. 任务的本地状态管理器
  3. ProcessorTopology: the instance of the topology generated by the TopologyBuilder. 通过TopologyBuilder生成的拓扑实例
  4. StreamTask: the unit of processing tasks, which includes a ProcessorTopology, a ProcessorStateManager and a PartitionGroup. 处理任务的单元,包括前面三个概念
  5. StreamThread: contains multiple StreamTasks, a Consumer and a Producer client. 包括多个流任务,一个生产者、消费者客户端
  6. KStreamFilter/Map/Branch/…: implementations of high-level KStream topology builder operators. 实现高级KStream拓扑构造器的算子

KIP-67:Queryable state for Kafka Streams

翻译:https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams

Today a Kafka Streams application will implicitly create state. This state is used for storing intermediate data such as aggregation results. The state is also used to store KTable’s data when they are materialized. The problem this document addresses is that this state is hidden from application developers and they cannot access it directly. The DSL allows users to make a copy of the data (using the through operator) but this leads to a doubling in the amount of state that is kept. In addition, this leads to extra IOs to external databases/key value stores that could potentially slow down the entire pipeline. Here is a simple example that illustrates the problem:

一个Kafka Streams应用程序通常都会在后台隐式地创建状态。这个状态用来存储中间数据,比如聚合的结果。当KTable被物化时,状态也被用来存储KTable的数据。这篇文档要解决的问题是:状态对于应用开发者是隐藏的,他们不能直接访问它。DSL允许用户(使用through操作符)复制一份数据,但这会使需要保存的状态量翻倍。另外,这也会导致对外部数据库/键值存储的额外IO,可能会拖慢整个数据管道。下面是一个简单的例子来说明这个问题:

1 KTable<String, Long> wordCounts = textLine
2     .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
3     .map((key, word) -> new KeyValue<>(word, word))
4     .countByKey("StoreName");
5 wordCounts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");

In line 4, the aggregation already maintains state in a store called StoreName, however that store cannot be directly queried by the developer. Instead, the developer makes a copy of the data in that store into a topic called streams-wordcount-output. Subsequently, the developer might instantiate its own database after reading the data from that topic (this step is not shown above). This is shown in illustration (a):

在第4行中,聚合操作已经在一个叫做“StoreName”的存储中维护了状态,不过开发者不能直接查询这个存储。相反,开发者会把这个存储中的数据复制到一个叫做“streams-wordcount-output”的主题。然后,开发者可能会在从这个主题读到数据后,实例化自己的数据库(这一步没有在上面展示出来)。如插图(a)所示:


文章目录
  1. 介绍
    1.1. Kafka Streams
    1.2. A closer look
  2. 快速开始
    2.1. 本节目标
    2.2. 我们要做什么
  3. 概念
    3.1. Kafka 101
    3.2. 流、流处理、拓扑、算子
    3.3. 时间
    3.4. 有状态的流处理
    3.5. Streams和Tables的二元性
    3.6. KStream(记录流 record stream)
    3.7. KTable(变更流 changelog stream)
    3.8. 窗口操作
    3.9. 联合操作
    3.10. 聚合操作
  4. 架构
    4.1. 拓扑
    4.2. 并行模型
      4.2.1. Stream Partitions and Tasks
      4.2.2. Threading Model
      4.2.3. Example
    4.3. 状态
    4.4. 容错
    4.5. 流处理的保证
    4.6. 流控
    4.7. 背压
  5. 开发者指南
    5.1. Kafka Streams配置
    5.2. 编写一个流处理应用程序
      5.2.1. Processor API
      5.2.2. Kafka Streams DSL
        5.2.2.1. CREATING SOURCE STREAMS FROM KAFKA
        5.2.2.2. TRANSFORM A STREAM
        5.2.2.3. STATELESS TRANSFORMATIONS
        5.2.2.4. STATEFUL TRANSFORMATIONS
        5.2.2.5. WINDOWING A STREAM
        5.2.2.6. JOINING STREAMS
        5.2.2.7. APPLYING A CUSTOM PROCESSOR
        5.2.2.8. WRITING STREAMS BACK TO KAFKA
    5.3. 运行流处理程序
      5.3.1. 水平扩展你的应用程序
      5.3.2. 增加处理能力(Expand)
      5.3.3. 减少处理能力(Shrink)
      5.3.4. 运行多少个应用程序实例
    5.4. 数据类型和序列化
    5.5. 应用重置工具
    5.6. 重置流处理应用程序
      5.6.1. 示例
        5.6.1.1. Step 1: Prepare your application for resets
        5.6.1.2. Step 2: Reset the application
      5.6.2. Behind the Scenes of Kafka Streams
        5.6.2.1. Reprocessing Input Topics
        5.6.2.2. Sub-Topologies and Internal State in Kafka Streams
        5.6.2.3. Resetting a Kafka Streams Application Manually
      5.6.3. Application Reset Tool Details
        5.6.3.1. 重置时指定了新的应用程序编号会发生什么?
  6. KIP-28:Add a processor client
    6.1. 动机
    6.2. Processor客户端提议
      6.2.1. Processor接口
      6.2.2. High-level Stream DSL
    6.3. 架构设计
      6.3.1. 分区分布
      6.3.2. 流时间和同步
      6.3.3. 本地状态管理
      6.3.4. 工作流程总结
  7. KIP-67:Queryable state for Kafka Streams