# Kafka Streams中文翻译

Confluent Kafka Streams Documentation 中文翻译： http://docs.confluent.io/3.0.1/streams/introduction.html

# 介绍

## Kafka Streams

Kafka Streams, a component of open source Apache Kafka, is a powerful, easy-to-use library for building highly scalable, fault-tolerant, distributed stream processing applications on top of Apache Kafka. It builds upon important concepts for stream processing such as properly distinguishing between event-time and processing-time, handling of late-arriving data, and efficient management of application state.

Kafka Streams是开源Apache Kafka的一个组件，它是一个功能强大、易于使用的库，用于在Apache Kafka之上构建高度可扩展、故障容错的分布式流处理应用程序。它构建在流处理的重要概念之上，比如正确地区分事件时间（event-time）和处理时间（processing-time）、处理延迟到达的数据、高效地管理应用程序状态。

One of the mantras of Kafka Streams is to “Build apps, not clusters!”, which means to bring stream processing out of the Big Data niche into the world of mainstream application development. Using the Kafka Streams library you can implement standard Java applications to solve your stream processing needs – whether at small or at large scale – and then run these applications on client machines at the perimeter of your Kafka cluster. Deployment-wise you are free to choose from any technology that can deploy Java applications, including but not limited to Puppet, Chef, Ansible, Docker, Mesos, YARN, Kubernetes, and so on. This lightweight and integrative approach of Kafka Streams is in stark contrast to other stream processing tools that require you to install and operate separate stream processing clusters and similar heavy-weight infrastructure that come with their own special set of rules on how to use and interact with them.

Kafka Streams的一个思想是“构建应用程序，不要集群”，这意味着将流处理从大数据生态圈中解放出来，而专注于主流的应用程序开发。使用Kafka Streams客户端库，你可以用标准的Java应用程序（main方法）来解决你的流处理需求（不管是小规模还是大规模的数据），然后可以在你的Kafka集群之外的客户端机器执行这些应用程序。你可以选择任何可以部署Java应用的技术来部署Kafka Streams，包括但不限于Puppet、Chef、Ansible、Docker、Mesos、YARN、Kubernetes等等。Kafka Streams的轻量级以及综合能力使得它和其他流处理工具形成了鲜明的对比，后者需要你单独安装并维护一个流处理集群，需要依赖重量级的基础架构设施。

The following list highlights several key capabilities and aspects of Kafka Streams that make it a compelling choice for use cases such as stream processing applications, event-driven systems, continuous queries and transformations, reactive applications, and microservices.

Powerful 功能强大

• Highly scalable, elastic, fault-tolerant 高度可扩展、弹性、故障容错
• Stateful and stateless processing 有状态和无状态的处理
• Event-time processing with windowing, joins, aggregations 基于事件时间的处理，支持窗口、连接、聚合操作

Lightweight 轻量级

• No dedicated cluster required 不需要专用的集群
• No external dependencies 不需要外部的依赖
• “It’s a library, not a framework.” 它是一个客户端库，不是一个框架

Fully integrated 完全集成的

• 100% compatible with Kafka 0.10.0.x 和Kafka 0.10.0.x完全兼容
• Easy to integrate into existing applications 容易和已有应用程序集成
• No artificial rules for deploying applications 对应用程序的部署方式没有人为的规则限制

Real-time 实时的

• Millisecond processing latency 毫秒级别的处理延迟
• Does not micro-batch messages 不做微批（micro-batch）处理
• Windowing with out-of-order data 支持乱序数据的窗口操作
• Allows for arrival of late data 允许数据延迟到达

## A closer look

Before we dive into the details such as the concepts and architecture of Kafka Streams or getting our feet wet by following the Kafka Streams quickstart guide, let us provide more context to the previous list of capabilities.

1. Stream processing made simple: Kafka Streams is designed as a lightweight library in Apache Kafka, much like the Kafka producer and consumer client libraries. You can easily embed and integrate Kafka Streams into your own applications, which is a significant departure from framework-based stream processing tools that dictate many requirements upon you, such as how you must package and “submit” processing jobs to their cluster.

Has no external dependencies on systems other than Apache Kafka and can be used in any Java application. Read: You do not need to deploy and operate a separate cluster for your stream processing needs. Your Operations and Info Sec teams, among others, will surely be happy to hear this.

2. Leverages Kafka as its internal messaging layer instead of (re)implementing a custom messaging layer like many other stream processing tools. Notably, Kafka Streams uses Kafka’s partitioning model to horizontally scale processing while maintaining strong ordering guarantees. This ensures high performance, scalability, and operational simplicity for production environments. A key benefit of this design decision is that you do not have to understand and tune two different messaging layers – one for moving data streams at scale (Kafka) plus a separate one for your stream processing tool. Similarly, any performance and reliability improvements of Kafka will automatically be available to Kafka Streams, too, thus tapping into the momentum of Kafka’s strong developer community.

3. Is agnostic to resource management and configuration tools, so it integrates much more seamlessly into the existing development, packaging, deployment, and operational practices of your organization. You are free to use your favorite tools such as Java application servers, Puppet, Ansible, Mesos, YARN, Docker – or even to run your application manually on a single machine for proof-of-concept scenarios.

Kafka Streams不需要依赖资源管理和配置工具，所以它可以和已有的开发环境、打包、部署等工具无缝集成。可以运行在Java应用服务器，甚至在单机环境下做原型验证（POC）。

4. Supports fault-tolerant local state, which enables very fast and efficient stateful operations like joins and windowed aggregations. Local state is replicated to Kafka so that, in case of a machine failure, another machine can automatically restore the local state and resume the processing from the point of failure.

5. Employs one-record-at-a-time processing to achieve low processing latency, which is crucial for a variety of use cases such as fraud detection. This makes Kafka Streams different from micro-batch based stream processing tools.

Furthermore, Kafka Streams has a strong focus on usability and a great developer experience. It offers all the necessary stream processing primitives to allow applications to read data from Kafka as streams, process the data, and then either write the resulting data back to Kafka or send the final output to an external system. Developers can choose between a high-level DSL with commonly used operations such as filter, map, and join, and a low-level API for developers who need maximum control and flexibility.

Finally, Kafka Streams helps with scaling developers, too – yes, the human side – because it has a low barrier to entry and a smooth path to scale from development to production: You can quickly write and run a small-scale proof-of-concept on a single machine because you don’t need to install or understand a distributed stream processing cluster; and you only need to run additional instances of your application on multiple machines to scale up to high-volume production workloads. Kafka Streams transparently handles the load balancing of multiple instances of the same application by leveraging Kafka’s parallelism model.

In summary, Kafka Streams is a compelling choice for building stream processing applications. Give it a try and run your first Hello World Streams application! The next sections in this documentation will get you started.

# 快速开始

## 本节目标

The goal of this quickstart guide is to provide you with a first hands-on look at Kafka Streams. We will demonstrate how to run your first Java application that uses the Kafka Streams library by showcasing a simple end-to-end data pipeline powered by Kafka.

It is worth noting that this quickstart will only scratch the surface of Kafka Streams. More details are provided in the remainder of the Kafka Streams documentation, and we will include pointers throughout the quickstart to give you directions.

## 我们要做什么

1. 在一台机器上启动一个Kafka集群
2. 使用Kafka内置的控制台生产者模拟往一个Kafka主题中写入一些示例数据
3. 使用Kafka Streams库处理输入的数据，处理程序就是上面的wordcount示例
4. 使用Kafka内置的控制台消费者检查应用程序的输出
5. 停止Kafka集群

Similar to the bounded variant, it is a stateful algorithm that tracks and updates the counts of words. However, since it must assume potentially unbounded input data, it will periodically output its current state and results while continuing to process more data because it cannot know when it has processed “all” the input data. This is a typical difference between the class of algorithms that operate on unbounded streams of data and, say, batch processing algorithms such as Hadoop MapReduce. It will be easier to understand this difference once we inspect the actual output data later on.

The WordCount demo application will read from the input topic streams-file-input, perform the computations of the WordCount algorithm on the input data, and continuously write its current results to the output topic streams-wordcount-output (the names of its input and output topics are hardcoded). The demo will run for a few seconds and then, unlike typical stream processing applications, terminate automatically.

The explanation is that the output of the WordCount application is actually a continuous stream of updates, where each data record (i.e. each line in the original output above) is an updated count of a single word aka record key such as “kafka”. For multiple records with the same key, each later record is an update of the previous one.
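This continuous-update behavior can be sketched in plain Python (a conceptual simulation only, not the Kafka Streams API; the input lines are the ones used in the WordCount illustration):

```python
from collections import defaultdict

def wordcount_updates(lines):
    """Incrementally count words; after every input word, emit an updated
    (word, count) record -- the continuous stream of updates described above."""
    counts = defaultdict(int)
    for line in lines:
        for word in line.lower().split():
            counts[word] += 1
            yield (word, counts[word])  # a later record updates the previous one

updates = list(wordcount_updates(["all streams lead to kafka",
                                  "hello kafka streams"]))
# "kafka" is first emitted with count 1 and later re-emitted with count 2
```

Note that nothing here waits for “all” the input: each record immediately produces an updated count for its key.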

The two diagrams below illustrate what is essentially happening behind the scenes. The first column shows the evolution of the current state of the KTable<String, Long> that is counting word occurrences for countByKey. The second column shows the change records that result from state updates to the KTable and that, once the KTable is converted to a KStream, are sent downstream to the output topic.

First the text line “all streams lead to kafka” is being processed. The KTable is being built up as each new word results in a new table entry (highlighted with a green background), and a corresponding change record is sent to the downstream KStream.

When the second text line “hello kafka streams” is processed, we observe, for the first time, that existing entries in the KTable are being updated (here: for the words “kafka” and for “streams”). And again, change records are being sent to the KStream.

And so on (we skip the illustration of how the third line is being processed). This explains why the output topic has the contents we showed above, because it contains the full record of changes, i.e. the information shown in the second column for KStream above:

Looking beyond the scope of this concrete example, what Kafka Streams is doing here is to leverage the duality between a table and a changelog stream (here: table = the KTable, changelog stream = the downstream KStream): you can publish every change of the table to a stream, and if you consume the entire changelog stream from beginning to end, you can reconstruct the contents of the table.

Kafka Streams这里做的工作利用了一张表和一个变更流的二元性（表指的是KTable，变更流指的是下游的KStream）：你可以将表的每个变更记录发布给一个流，如果你从整个变更流的最开始消费到最后，你就可以重新构造出表的内容。
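The replay half of this duality can be sketched in plain Python (a conceptual illustration, not Kafka Streams code; the changelog records correspond to counting the two text lines discussed above):

```python
def reconstruct_table(changelog):
    """Consume a changelog stream of (key, value) records from beginning to
    end; the last value seen per key reconstructs the table's contents."""
    table = {}
    for key, value in changelog:
        table[key] = value  # a later change record overwrites the earlier one
    return table

# change records produced while counting "all streams lead to kafka"
# and then "hello kafka streams"
changelog = [("all", 1), ("streams", 1), ("lead", 1), ("to", 1), ("kafka", 1),
             ("hello", 1), ("kafka", 2), ("streams", 2)]
table = reconstruct_table(changelog)
# table now holds the final counts, e.g. table["kafka"] == 2
```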

# 概念

## Kafka 101

Kafka Streams is, by deliberate design, tightly integrated with Apache Kafka: it uses Kafka as its internal messaging layer. As such it is important to familiarize yourself with the key concepts of Kafka, too, notably the sections 1. Getting Started and 4. Design in the Kafka documentation. In particular you should understand:

Kafka Streams经过深思熟虑的设计，和Apache Kafka紧密地集成：它使用Kafka作为内部的消息层。因此熟悉Kafka的关键概念也非常重要，特别是Kafka文档中的 1. Getting Started 和 4. Design 两节。你尤其应该理解以下内容：

• The who’s who: Kafka distinguishes producers, consumers, and brokers. In short, producers publish data to Kafka brokers, and consumers read published data from Kafka brokers. Producers and consumers are totally decoupled. A Kafka cluster consists of one or more brokers.
• The data: Data is stored in topics. The topic is the most important abstraction provided by Kafka: it is a category or feed name to which data is published by producers. Every topic in Kafka is split into one or more partitions, which are replicated across Kafka brokers for fault tolerance.
• Parallelism: Partitions of Kafka topics, and especially their number for a given topic, are also the main factor that determines the parallelism of Kafka with regards to reading and writing data. Because of their tight integration the parallelism of Kafka Streams is heavily influenced by and depending on Kafka’s parallelism.
1. Kafka分为生产者、消费者和Brokers。生产者发布数据给Kafka的Brokers，消费者从Kafka的Brokers读取已发布的数据。生产者和消费者完全解耦。一个Kafka集群包括一个或多个Broker节点。
2. 数据以主题的形式存储。主题是Kafka提供的最重要的一个抽象：它是生产者发布数据的一种分类（相同类型的消息应该发布到相同的主题）。每个主题会分成一个或多个分区，并且为了故障容错，每个分区都会在Kafka的Brokers中进行复制。
3. Kafka主题的分区数量决定了读取或写入数据的并行度。因为Kafka Streams和Kafka结合的很紧，所以Kafka Streams也依赖于Kafka的并行度。

## 流、流处理、拓扑、算子

A stream is the most important abstraction provided by Kafka Streams: it represents an unbounded, continuously updating data set, where unbounded means “of unknown or of unlimited size”. A stream is an ordered, replayable, and fault-tolerant sequence of immutable data records, where a data record is defined as a key-value pair.

A stream processing application is any program that makes use of the Kafka Streams library. In practice, this means it is probably “your” Java application. It may define its computational logic through one or more processor topologies (see next section).

A processor topology or simply topology defines the computational logic of the data processing that needs to be performed by a stream processing application. A topology is a graph of stream processors (nodes) that are connected by streams (edges). Developers can define topologies either via the low-level Processor API or via the Kafka Streams DSL, which builds on top of the former.

A stream processor is a node in the processor topology (as shown in the diagram of section Processor Topology). It represents a processing step in a topology, i.e. it is used to transform data in streams. Standard operations such as map, filter, join, and aggregations are examples of stream processors that are available in Kafka Streams out of the box. A stream processor receives one input record at a time from its upstream processors in the topology, applies its operation to it, and may subsequently produce one or more output records to its downstream processors.

• The Kafka Streams DSL provides the most common data transformation operations such as map and filter so you don’t have to implement these stream processors from scratch.
• The low-level Processor API allows developers to define and connect custom processors as well as to interact with state stores.
1. Kafka Streams DSL提供了最通用的数据转换操作，比如map、filter，这样你不需要自己实现这些算子
2. 低级的Processor API，允许开发者定义和连接定制的算子，并且还可以和状态存储交互
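To make the processor/topology vocabulary concrete, here is a minimal sketch in plain Python (illustrative only; the real low-level Processor API is a Java interface, and the node wiring here is a simplification):

```python
class Processor:
    """A stream processor node: receives one input record at a time from its
    upstream processor and may forward records to its downstream processor."""
    def __init__(self, fn, downstream=None):
        self.fn = fn                  # record -> list of output records
        self.downstream = downstream  # the next node (edge) in the topology
    def process(self, record):
        for out in self.fn(record):
            if self.downstream is not None:
                self.downstream.process(out)

# A tiny topology: source stream -> map(upper-case) -> filter(len > 2) -> sink
results = []
def sink_fn(record):
    results.append(record)  # terminal node: collect the output stream
    return []

sink = Processor(sink_fn)
filt = Processor(lambda r: [r] if len(r) > 2 else [], downstream=sink)
mapper = Processor(lambda r: [r.upper()], downstream=filt)

for record in ["kafka", "is", "streaming"]:  # the source stream
    mapper.process(record)
# results == ["KAFKA", "STREAMING"]
```

Each node transforms one record at a time and forwards zero or more records downstream, mirroring the description above.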

## 时间

A critical aspect in stream processing is the notion of time, and how it is modeled and integrated. For example, some operations such as Windowing are defined based on time boundaries.

• Event-time: The point in time when an event or data record occurred, i.e. was originally created “by the source”. Achieving event-time semantics typically requires embedding timestamps in the data records at the time a data record is being produced. Example: If the event is a geo-location change reported by a GPS sensor in a car, then the associated event-time would be the time when the GPS sensor captured the location change.
• Processing-time: The point in time when the event or data record happens to be processed by the stream processing application, i.e. when the record is being consumed. The processing-time may be milliseconds, hours, or days etc. later than the original event-time. Example: Imagine an analytics application that reads and processes the geo-location data reported from car sensors to present it to a fleet management dashboard. Here, processing-time in the analytics application might be milliseconds or seconds (e.g. for real-time pipelines based on Apache Kafka and Kafka Streams) or hours (e.g. for batch pipelines based on Apache Hadoop or Apache Spark) after event-time.
• Ingestion-time: The point in time when an event or data record is stored in a topic partition by a Kafka broker. Ingestion-time is similar to event-time, as a timestamp gets embedded in the data record itself. The difference is that the timestamp is generated when the record is appended to the target topic by the Kafka broker, not when the record is created “at the source”. Ingestion-time may approximate event-time reasonably well if we assume that the time difference between creation of the record and its ingestion into Kafka is sufficiently small, where “sufficiently” depends on the specific use case. Thus, ingestion-time may be a reasonable alternative for use cases where event-time semantics are not possible, e.g. because the data producers don’t embed timestamps (e.g. older versions of Kafka’s Java producer client) or the producer cannot assign timestamps directly (e.g., it does not have access to a local clock).
1. 事件时间：事件或数据记录发生的时间点，即由事件源最初创建的时间。实现事件时间语义通常需要在记录产生时就在记录中内嵌时间戳字段。比如一条事件是车辆GPS传感器报告的地理位置变更，那么对应的事件时间就是GPS传感器捕获位置变更的时间点。
2. 处理时间：事件或数据记录被流处理应用程序处理的时间点，即记录被消费的时候。处理时间可能比原始的事件时间晚几毫秒、几小时甚至几天。例如，一个分析应用程序读取并处理车辆传感器上报的地理位置，并呈现到车队管理的dashboard上。这里分析程序的处理时间可能比事件时间晚几毫秒或几秒（比如基于Apache Kafka和Kafka Streams的实时管道），也可能晚几个小时（比如基于Apache Hadoop或Apache Spark的批处理管道）。
3. 摄取时间：事件或数据记录被Kafka broker存储到主题分区的时间点。摄取时间和事件时间类似，都是内嵌在数据记录中的时间戳；不同的是，摄取时间是Kafka broker把记录追加到目标主题时生成的，而不是记录在数据源被创建的时间。如果记录的创建时间和摄取到Kafka的时间间隔足够短，摄取时间就可以很好地近似事件时间，“足够短”取决于具体的用例。因此，当事件时间语义无法实现时（比如生产者没有内嵌时间戳——例如旧版本的Kafka Java生产者客户端，或者生产者无法直接分配时间戳——例如无法访问本地时钟），摄取时间是一个合理的替代方案。

The choice between event-time and ingestion-time is actually done through the configuration of Kafka (not Kafka Streams): from Kafka 0.10.x onwards, timestamps are automatically embedded into Kafka messages. Depending on Kafka’s configuration these timestamps represent event-time or ingestion-time. The respective Kafka configuration setting can be specified on the broker level or per topic. The default timestamp extractor in Kafka Streams will retrieve these embedded timestamps as-is. Hence, the effective time semantics of your application depend on the effective Kafka configuration for these embedded timestamps. See the Developer Guide for detailed information.

Kafka Streams assigns a timestamp to every data record via so-called timestamp extractors. These per-record timestamps describe the progress of a stream with regards to time (although records may be out-of-order within the stream) and are leveraged by time-dependent operations such as joins. They are also used to synchronize multiple input streams within the same application.

Kafka Streams会通过所谓的时间戳抽取器（timestamp extractor）为每条数据记录分配一个时间戳。这些记录级的时间戳描述了一条流在时间上的进度（尽管流中的记录可能是乱序的），并被join等与时间相关的操作所使用。它们同时也被用来同步同一个应用程序中的多个输入流。

Concrete implementations of timestamp extractors may retrieve or compute timestamps based on the actual contents of data records, such as an embedded timestamp field, to provide event-time or ingestion-time semantics, or use any other approach such as returning the current wall-clock time at the time of processing, thereby yielding processing-time semantics to stream processing applications. Developers can thus enforce different notions/semantics of time depending on their business needs.

Be aware that ingestion-time in Kafka Streams is used slightly differently than in other stream processing systems. In other systems, ingestion-time could mean the time when a record is fetched by a stream processing application’s source operator. In Kafka Streams, ingestion-time refers to the time when a record was appended to a Kafka topic partition.
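The role of a timestamp extractor can be sketched as follows (plain Python; the `semantics` switch and record layout are illustrative assumptions, not the Kafka Streams API):

```python
import time

def make_extractor(semantics):
    """Return a per-record timestamp extractor. Reading the timestamp embedded
    in the record yields event-time or ingestion-time semantics (depending on
    how Kafka populated that field); returning wall-clock time at the moment
    of processing yields processing-time semantics."""
    if semantics == "processing-time":
        return lambda record: time.time()
    return lambda record: record["timestamp"]  # embedded timestamp, as-is

record = {"key": "car-1", "value": "geo-update", "timestamp": 1_700_000_000.0}
event_ts = make_extractor("event-time")(record)      # the embedded timestamp
proc_ts = make_extractor("processing-time")(record)  # now, i.e. later
```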

## 有状态的流处理

Some stream processing applications don’t require state, which means the processing of a message is independent from the processing of all other messages. If you only need to transform one message at a time, or filter out messages based on some condition, the topology defined in your stream processing application can be simple.

However, being able to maintain state opens up many possibilities for sophisticated stream processing applications: you can join input streams, or group and aggregate data records. Many such stateful operators are provided by the Kafka Streams DSL.

## Streams和Tables的二元性

Before we discuss concepts such as aggregations in Kafka Streams we must first introduce tables, and most importantly the relationship between tables and streams: the so-called stream-table duality. Essentially, this duality means that a stream can be viewed as a table, and vice versa. Kafka’s log compaction feature, for example, exploits this duality.

stream-table二元性描述了两者的紧密关系：

• Stream as Table: A stream can be considered a changelog of a table, where each data record in the stream captures a state change of the table. A stream is thus a table in disguise, and it can be easily turned into a “real” table by replaying the changelog from beginning to end to reconstruct the table. Similarly, in a more general analogy, aggregating data records in a stream – such as computing the total number of pageviews by user from a stream of pageview events – will return a table (here with the key and the value being the user and its corresponding pageview count, respectively).
• Table as Stream: A table can be considered a snapshot, at a point in time, of the latest value for each key in a stream (a stream’s data records are key-value pairs). A table is thus a stream in disguise, and it can be easily turned into a “real” stream by iterating over each key-value entry in the table.
1. 将流作为表：一个流可以被认为是一张表的变更记录(changelog)，流中的每条数据记录捕获了表的每个变更状态。一个流可以假装是一张表，也可以通过从头到尾重放变更日志来重构表，很容易地变成真正的表。同样地，在流中聚合记录（比如从PV事件流中计算用户的PV数）会返回一张表（这里键值分别是用户，以及对应的PV数）。
2. 将表作为流：一张表可以认为是在某个时间点的一份快照，是流的每个key对应的最近的值。一张表因此也可以假装是一个流，也可以通过迭代表中所有的键值条目转换成一个真实的流。

Let’s illustrate this with an example. Imagine a table that tracks the total number of pageviews by user (first column of diagram below). Over time, whenever a new pageview event is processed, the state of the table is updated accordingly. Here, the state changes between different points in time – and different revisions of the table – can be represented as a changelog stream (second column).

Interestingly, because of the stream-table duality, the same stream can be used to reconstruct the original table (third column):

The same mechanism is used, for example, to replicate databases via change data capture (CDC) and, within Kafka Streams, to replicate its so-called state stores across machines for fault-tolerance. The stream-table duality is such an important concept that Kafka Streams models it explicitly via the KStream and KTable interfaces, which we describe in the next sections.

## KStream（记录流 record stream）

A KStream is an abstraction of a record stream, where each data record represents a self-contained datum in the unbounded data set. Using the table analogy, data records in a record stream are always interpreted as “inserts” – think: append-only ledger – because no record replaces an existing row with the same key. Examples are a credit card transaction, a page view event, or a server log entry.

If your stream processing application were to sum the values per user, it would return 4 for alice. Why? Because the second data record would not be considered an update of the previous record. Compare this behavior of KStream to KTable below, which would return 3 for alice.

## KTable（变更流 changelog stream）

A KTable is an abstraction of a changelog stream, where each data record represents an update. More precisely, the value in a data record is considered to be an update of the last value for the same record key, if any (if a corresponding key doesn’t exist yet, the update will be considered a create). Using the table analogy, a data record in a changelog stream is interpreted as an update because any existing row with the same key is overwritten.

If your stream processing application were to sum the values per user, it would return 3 for alice. Why? Because the second data record would be considered an update of the previous record. Compare this behavior of KTable with the illustration for KStream above, which would return 4 for alice.
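The two interpretations can be contrasted in a few lines of plain Python (conceptual only; the ("alice", 1) and ("alice", 3) records follow the illustration described above):

```python
records = [("alice", 1), ("alice", 3)]  # two records with the same key

def kstream_sum(records, user):
    # KStream semantics: every record is an independent insert
    return sum(v for k, v in records if k == user)

def ktable_sum(records, user):
    # KTable semantics: a later record with the same key updates the
    # earlier one, so only the latest value per key counts
    latest = {}
    for k, v in records:
        latest[k] = v
    return latest.get(user, 0)

# kstream_sum(records, "alice") == 4, ktable_sum(records, "alice") == 3
```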

Effects of Kafka’s log compaction: Another way of thinking about KStream and KTable is as follows: If you were to store a KTable into a Kafka topic, you’d probably want to enable Kafka’s log compaction feature, e.g. to save storage space.

However, it would not be safe to enable log compaction in the case of a KStream because, as soon as log compaction would begin purging older data records of the same key, it would break the semantics of the data. To pick up the illustration example again, you’d suddenly get a 3 for alice instead of a 4 because log compaction would have removed the (“alice”, 1) data record. Hence log compaction is perfectly safe for a KTable (changelog stream) but it is a mistake for a KStream (record stream).
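A sketch of why compaction is safe for one but not the other (plain Python; log compaction is simplified here to "keep only the newest record per key"):

```python
def log_compact(records):
    """Simplified log compaction: retain only the most recent record
    for each key, discarding older records with the same key."""
    latest = {}
    for key, value in records:
        latest[key] = value
    return list(latest.items())

records = [("alice", 1), ("alice", 3)]
compacted = log_compact(records)  # ("alice", 1) has been purged
# Safe for a KTable: the latest value per key is all that matters.
# Breaks a KStream: summing now yields 3 instead of the correct 4.
stream_sum_after = sum(v for _, v in compacted)
```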

We have already seen an example of a changelog stream in the section Duality of Streams and Tables. Another example are change data capture (CDC) records in the changelog of a relational database, representing which row in a database table was inserted, updated, or deleted.

KTable also provides an ability to look up current values of data records by keys. This table-lookup functionality is available through join operations (see also Joining Streams in the Developer Guide).

KTable也支持根据记录的key查询当前的value，这种特性会在join操作时使用。

## 窗口操作

A stream processor may need to divide data records into time buckets, i.e. to window the stream by time. This is usually needed for join and aggregation operations, etc. Windowed stream buckets can be maintained in the processor’s local state.

Windowing operations are available in the Kafka Streams DSL, where users can specify a retention period for the window. This allows Kafka Streams to retain old window buckets for a period of time in order to wait for the late arrival of records whose timestamps fall within the window interval. If a record arrives after the retention period has passed, the record cannot be processed and is dropped.

Late-arriving records are always possible in real-time data streams. However, how late records are handled depends on the effective time semantics. With processing-time, the semantics are “when the data is being processed”, which means that the notion of late records is not applicable as, by definition, no record can be late. Hence, late-arriving records can only be considered as such (i.e. as arriving “late”) under event-time or ingestion-time semantics. In both cases, Kafka Streams is able to properly handle late-arriving records.
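A minimal sketch of windowing with a retention period (plain Python; the window size, retention value, and use of "stream time" as the highest timestamp seen so far are simplifying assumptions):

```python
WINDOW = 60      # window size (seconds): records are bucketed per minute
RETENTION = 300  # how long old window buckets are kept for late records

windows = {}     # window start time -> records in that bucket
stream_time = 0  # highest record timestamp seen so far

def process(record_ts, value):
    """Assign a record to its time bucket; drop it if its window has
    already fallen outside the retention period (too late)."""
    global stream_time
    stream_time = max(stream_time, record_ts)
    start = record_ts - record_ts % WINDOW  # start of the record's window
    if stream_time - start > RETENTION:
        return False  # dropped: arrived after the retention period passed
    windows.setdefault(start, []).append(value)
    return True

process(10, "a")     # accepted into window [0, 60)
process(400, "b")    # accepted; stream time advances to 400
process(30, "late")  # dropped: window [0, 60) is past the retention period
```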

## 联合操作

A join operation merges two streams based on the keys of their data records, and yields a new stream. A join over record streams usually needs to be performed on a windowing basis because otherwise the number of records that must be maintained for performing the join may grow indefinitely.

The join operations available in the Kafka Streams DSL differ based on which kinds of streams are being joined (e.g. KStream-KStream join versus KStream-KTable join).

Kafka Streams DSL中的join操作跟流的类型有关，比如KStream-KStream进行join，或者KStream-KTable进行join。
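A windowed join over two record streams can be sketched like this (plain Python; the click/view records and 60-second window are hypothetical):

```python
def windowed_join(left, right, window):
    """Join two record streams on key, pairing only records whose
    timestamps lie within `window` of each other -- this bounds how
    many records must be retained to perform the join."""
    joined = []
    for lk, lv, lt in left:
        for rk, rv, rt in right:
            if lk == rk and abs(lt - rt) <= window:
                joined.append((lk, (lv, rv)))
    return joined

clicks = [("alice", "click", 100), ("bob", "click", 200)]
views = [("alice", "view", 120), ("alice", "view", 900)]
result = windowed_join(clicks, views, window=60)
# only alice's view at t=120 is close enough to her click at t=100
```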

## 聚合操作

An aggregation operation takes one input stream, and yields a new stream by combining multiple input records into a single output record. Examples of aggregations are computing counts or sum. An aggregation over record streams usually needs to be performed on a windowing basis because otherwise the number of records that must be maintained for performing the aggregation may grow indefinitely.

In the Kafka Streams DSL, an input stream of an aggregation can be a KStream or a KTable, but the output stream will always be a KTable. This allows Kafka Streams to update an aggregate value upon the late arrival of further records after the value was produced and emitted. When such late arrival happens, the aggregating KStream or KTable simply emits a new aggregate value. Because the output is a KTable, the new value is considered to overwrite the old value with the same key in subsequent processing steps.

# 架构

Kafka Streams simplifies application development by building on the Kafka producer and consumer libraries and leveraging the native capabilities of Kafka to offer data parallelism, distributed coordination, fault tolerance, and operational simplicity. In this section, we describe how Kafka Streams works underneath the covers. Below is a logical view of a Kafka Streams application that contains multiple stream threads, each of which in turn contains multiple stream tasks.

Kafka Streams是构建在Kafka生产者和消费者客户端库之上的库，它利用Kafka本身的特性来提供数据并行、分布式协调、故障容错和运维上的简单性，从而简化了应用程序的开发。下图是一个Kafka Streams应用程序的逻辑视图，它包含多个流线程，每个线程又包含多个流任务。

## 拓扑

A processor topology or simply topology defines the stream processing computational logic for your application, i.e., how input data is transformed into output data. A topology is a graph of stream processors (nodes) that are connected by streams (edges). There are two special processors in the topology:

• Source Processor: A source processor is a special type of stream processor that does not have any upstream processors. It produces an input stream to its topology from one or multiple Kafka topics by consuming records from these topics and forwarding them to its down-stream processors.
• Sink Processor: A sink processor is a special type of stream processor that does not have down-stream processors. It sends any received records from its up-stream processors to a specified Kafka topic.
1. 源算子：没有任何上游算子，它从一个或多个Kafka主题中消费记录，然后产生一个到拓扑的输入流，并且转发到下游的算子
2. 目标算子：没有任何的下游算子，它会把从上游算子接收到的任何记录，发送给指定的Kafka主题

A stream processing application – i.e., your application – may define one or more such topologies, though typically it defines only one. Developers can define topologies either via the low-level Processor API or via the Kafka Streams DSL, which builds on top of the former.

A processor topology is merely a logical abstraction for your stream processing code. At runtime, the logical topology is instantiated and replicated inside the application for parallel processing (see Parallelism Model).

## 并行模型

Kafka Streams uses the concepts of partitions and tasks as logical units of its parallelism model. There are close links between Kafka Streams and Kafka in the context of parallelism:

Kafka Streams使用分区和任务的概念作为它的并行模型的逻辑单元。Kafka Streams和Kafka在并行这个上下文上有紧密的联系：

• Each stream partition is a totally ordered sequence of data records and maps to a Kafka topic partition.
• A data record in the stream maps to a Kafka message from that topic.
• The keys of data records determine the partitioning of data in both Kafka and Kafka Streams, i.e., how data is routed to specific partitions within topics.
1. 每个流分区（stream partition）是一个完全有序的数据记录序列，映射到一个Kafka主题分区
2. 流中的一条数据记录对应该主题中的一条Kafka消息
3. 数据记录的key决定了数据在Kafka和Kafka Streams中的分区方式，即数据如何路由到主题的特定分区

An application’s processor topology is scaled by breaking it into multiple tasks. More specifically, Kafka Streams creates a fixed number of tasks based on the input stream partitions for the application, with each task assigned a list of partitions from the input streams (i.e., Kafka topics). The assignment of partitions to tasks never changes so that each task is a fixed unit of parallelism of the application. Tasks can then instantiate their own processor topology based on the assigned partitions; they also maintain a buffer for each of its assigned partitions and process messages one-at-a-time from these record buffers. As a result stream tasks can be processed independently and in parallel without manual intervention.
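The partition-to-task mapping can be sketched as (plain Python; the one-partition-per-task-index scheme and the topic name are simplifications for illustration):

```python
def create_tasks(num_partitions, topics):
    """Create a fixed number of tasks from the input partitions: task i
    is assigned partition i of every input topic (simplified scheme)."""
    return {i: [(topic, i) for topic in topics] for i in range(num_partitions)}

def route(key, num_partitions):
    # the record key determines the partition, and hence the task,
    # that will process the record
    return hash(key) % num_partitions

tasks = create_tasks(4, ["streams-file-input"])
# tasks[2] == [("streams-file-input", 2)]; this assignment never changes
```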

Sub-topologies aka topology sub-graphs: If there are multiple processor topologies specified in a Kafka Streams application, each task will only instantiate one of the topologies for processing. In addition, a single processor topology may be decomposed into independent sub-topologies (sub-graphs) as long as sub-topologies are not connected by any streams in the topology; here, each task may instantiate only one such sub-topology for processing. This further scales out the computational workload to multiple tasks.

It is important to understand that Kafka Streams is not a resource manager, but a library that “runs” anywhere its stream processing application runs. Multiple instances of the application are executed either on the same machine, or spread across multiple machines and tasks can be distributed automatically by the library to those running application instances. The assignment of partitions to tasks never changes; if an application instance fails, all its assigned tasks will be restarted on other instances and continue to consume from the same stream partitions.

Kafka Streams allows the user to configure the number of threads that the library can use to parallelize processing within an application instance. Each thread can execute one or more tasks with their processor topologies independently.

Kafka Streams允许用户配置库在一个应用程序实例内用于并行处理的线程数量。每个线程可以独立地执行一个或多个任务及其处理拓扑。

Starting more stream threads or more instances of the application merely amounts to replicating the topology and having it process a different subset of Kafka partitions, effectively parallelizing processing. It is worth noting that there is no shared state amongst the threads, so no inter-thread coordination is necessary. This makes it very simple to run topologies in parallel across the application instances and threads. The assignment of Kafka topic partitions amongst the various stream threads is transparently handled by Kafka Streams leveraging Kafka’s server-side coordination functionality.

As we described above, scaling your stream processing application with Kafka Streams is easy: you merely need to start additional instances of your application, and Kafka Streams takes care of distributing partitions amongst tasks that run in the application instances. You can start as many threads of the application as there are input Kafka topic partitions so that, across all running instances of an application, every thread (or rather, the tasks it runs) has at least one input partition to process.

### Example

To understand the parallelism model that Kafka Streams offers, let’s walk through an example.

Now imagine we want to scale out this application later on, perhaps because the data volume has increased significantly. We decide to start running the same application but with only a single thread on another, different machine. A new thread instance2-thread1 will be created, and input partitions will be re-assigned similar to:

When the re-assignment occurs, some partitions – and hence their corresponding tasks including any local state stores – will be “migrated” from the existing threads to the newly added threads (here: from instance1-thread1 on the first machine to instance2-thread1 on the second machine). As a result, Kafka Streams has effectively rebalanced the workload among instances of the application at the granularity of Kafka topic partitions.

What if we wanted to add even more instances of the same application? We can do so until a certain point, which is when the number of running instances is equal to the number of available input partitions to read from. At this point, before it would make sense to start further application instances, we would first need to increase the number of partitions for topics A and B; otherwise, we would over-provision the application, ending up with idle instances that are waiting for partitions to be assigned to them, which may never happen.

## 状态

Kafka Streams provides so-called state stores, which can be used by stream processing applications to store and query data, which is an important capability when implementing stateful operations. Every task in Kafka Streams embeds one or more state stores that can be accessed via APIs to store and query data required for processing. These state stores can either be a RocksDB database, an in-memory hash map, or another convenient data structure. Kafka Streams offers fault-tolerance and automatic recovery for local state stores.

Kafka Streams提供了所谓的状态存储，可以被流处理应用程序用来存储和查询数据，这在实现有状态的操作时是一个非常重要的功能。Kafka Streams中的每个任务内置了一个或多个状态存储，并且可以在流处理时通过API的方式存储或者查询状态存储中的数据。这些状态存储可以是RocksDB数据库、内存的hash map、或者其他的数据结构。Kafka Streams为本地状态存储提供了容错和自动恢复机制。

## 容错

Kafka Streams builds on fault-tolerance capabilities integrated natively within Kafka. Kafka partitions are highly available and replicated; so when stream data is persisted to Kafka it is available even if the application fails and needs to re-process it. Tasks in Kafka Streams leverage the fault-tolerance capability offered by the Kafka consumer client to handle failures. If a task runs on a machine that fails, Kafka Streams automatically restarts the task in one of the remaining running instances of the application.

Kafka Streams的容错能力构建在Kafka原生集成的容错能力之上。Kafka的分区是高可用且有副本的；所以当流数据持久化到Kafka中时，即使应用程序失败了需要重新处理，数据也还是可用的。Kafka Streams的任务利用了Kafka消费者客户端提供的容错机制来处理故障。如果任务所在的机器发生故障，Kafka Streams会在应用程序其余正在运行的实例中自动重启该任务。

In addition, Kafka Streams makes sure that the local state stores are robust to failures, too. It follows a similar approach as Apache Samza and, for each state store, maintains a replicated changelog Kafka topic in which it tracks any state updates. These changelog topics are partitioned as well so that each local state store instance, and hence the task accessing the store, has its own dedicated changelog topic partition. Log compaction is enabled on the changelog topics so that old data can be purged safely to prevent the topics from growing indefinitely. If tasks run on a machine that fails and are restarted on another machine, Kafka Streams guarantees to restore their associated state stores to the content before the failure by replaying the corresponding changelog topics prior to resuming the processing on the newly started tasks. As a result, failure handling is completely transparent to the end user.

Optimization: In order to minimize the time for the state restoration and hence the cost of task (re)initialization, users can configure their applications to have shadow copies of local states. When a task migration happens, Kafka Streams then attempts to assign a task to where a standby replica exists in order to minimize the task initialization cost. See setting num.standby.replicas at Optional configuration parameters in the Developer Guide.

## 流处理的保证

Kafka Streams currently supports at-least-once processing guarantees in the presence of failure. This means that if your stream processing application fails, no data records are lost or left unprocessed, but some data records may be re-read and therefore re-processed.

It depends on the specific use case whether at-least-once processing guarantees are acceptable or whether you may need exactly-once processing.

Kafka Streams目前支持在错误场景下至少一次的处理语义。这意味着如果你的流处理应用程序失败了，数据不会丢失，也不会被漏掉处理，但是有些数据可能会被重复读取，并被重复处理。根据不同的用例，用户自己决定是否可以接受至少处理一次的保证，还是需要正好一次的处理。

For many processing use cases, at-least-once processing turns out to be perfectly acceptable: Generally, as long as the effect of processing a data record is idempotent, it is safe for the same data record to be processed more than once. Also, some use cases can tolerate processing data records more than once even if the processing is not idempotent. For example, imagine you are counting hits by IP address to auto-generate blacklists that help with mitigating DDoS attacks against your infrastructure; here, some overcounting is tolerable because hits from malicious IP addresses involved(涉及) in an attack(攻击) will vastly(极大地) outnumber hits from benign(良性的) IP addresses anyway.

In general however, for non-idempotent operations such as counting, at-least-once processing guarantees may yield incorrect results. If a Kafka Streams application fails and restarts, it may double-count some data records that were processed shortly before the failure. We are planning to address this limitation and will support stronger guarantees and exactly-once processing semantics in a future release of Kafka Streams.

## 流控

Kafka Streams regulates(控制) the progress of streams by the timestamps of data records by attempting to synchronize all source streams in terms of time. By default, Kafka Streams will provide your application with event-time processing semantics. This is important especially when an application is processing multiple streams (i.e., Kafka topics) with a large amount of historical data. For example, a user may want to re-process past data in case the business logic of an application was changed significantly, e.g. to fix a bug in an analytics algorithm. Now it is easy to retrieve a large amount of past data from Kafka; however, without proper flow control, the processing of the data across topic partitions may become out-of-sync and produce incorrect results.

Kafka Streams通过数据记录的时间戳控制流的进度，它会尝试根据时间来同步所有数据源产生的流。默认情况下，Kafka Streams为应用程序提供事件时间的处理语义。这对于应用程序处理多个具有大量历史数据的流（即Kafka主题）的场景尤其重要。例如，当应用程序的业务逻辑发生重大变化时（比如修复分析算法中的一个错误），用户可能想要重新处理过去的数据。现在从Kafka中获取大量的历史数据很容易，不过如果没有恰当的流控，跨主题分区的数据处理可能变得不同步，并产生错误的结果。

As mentioned in the Concepts section, each data record in Kafka Streams is associated with a timestamp. Based on the timestamps of the records in its stream record buffer, stream tasks determine the next assigned partition to process among all its input streams. However, Kafka Streams does not reorder records within a single stream for processing since reordering would break the delivery semantics of Kafka and make it difficult to recover in the face of failure. This flow control is of course best-effort(尽最大努力) because it is not always possible to strictly enforce execution order across streams by record timestamp; in fact, in order to enforce strict execution ordering, one must either wait until the system has received all the records from all streams (which may be quite infeasible in practice) or inject additional information about timestamp boundaries or heuristic estimates(启发式的预估) such as MillWheel’s watermarks.

Kafka Streams中的每条数据记录都关联了一个时间戳。基于流记录缓冲区（stream record buffer）中每条记录的时间戳，流任务会在它所有的输入流中决定下一个要处理的分配分区。不过Kafka Streams不会在处理单个流时对记录重新排序，因为重新排序会破坏Kafka的消息传递语义，并且在故障发生时难以恢复（消息的顺序）。这种流控当然是尽力而为的，因为并不总是能跨流按照记录的时间戳严格地强制执行顺序。实际上，如果要强制严格的执行顺序，要么必须等待，直到系统（流处理应用程序）从所有的流中接收到所有的记录（这在实际中基本不可行），要么注入关于时间戳边界的额外信息，或者使用类似MillWheel水位（watermark）这样的启发式预估。

## 背压

Kafka Streams does not use a backpressure mechanism because it does not need one. Using a depth-first processing strategy, each record consumed from Kafka will go through the whole processor (sub-)topology for processing and for (possibly) being written back to Kafka before the next record will be processed. As a result, no records are being buffered in-memory between two connected stream processors. Also, Kafka Streams leverages Kafka’s consumer client behind the scenes, which works with a pull-based messaging model that allows downstream processors to control the pace(步伐) at which incoming data records are being read.

Kafka Streams不使用背压机制，因为它并不需要。由于采用深度优先的处理策略，从Kafka中消费的每条记录在处理时会流经整个（子）处理拓扑，并且有可能在处理下一条记录之前回写到Kafka。结果就是：在两个相连的流处理算子之间不会有记录被缓存在内存中。同时Kafka Streams在底层利用了Kafka的消费者客户端，它基于消息拉取模型，允许下游处理算子控制读取输入数据记录的速度。

The same applies to the case of a processor topology that contains multiple independent sub-topologies, which will be processed independently from each other (cf. Parallelism Model). For example, the following code defines a topology with two independent sub-topologies:
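原文在此处引用的示例代码在转载时丢失，下面是基于上下文的一个大致还原（仅为示意：MyProcessor1、MyProcessor2以及主题名topic-A、topic-B均为假设，“my-topic”取自下文的说明）：

```java
import org.apache.kafka.streams.processor.TopologyBuilder;

TopologyBuilder builder = new TopologyBuilder();

// 子拓扑1：从"topic-A"读取，处理后写入"my-topic"
builder.addSource("Source1", "topic-A");
builder.addProcessor("Process1", MyProcessor1::new, "Source1");
builder.addSink("Sink1", "my-topic", "Process1");

// 子拓扑2：从"my-topic"读取；两个子拓扑之间没有直接相连的流，
// 因此它们是相互独立的子图，可以分配给不同的任务处理
builder.addSource("Source2", "my-topic");
builder.addProcessor("Process2", MyProcessor2::new, "Source2");
builder.addSink("Sink2", "topic-B", "Process2");
```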

Any data exchange between sub-topologies will happen through Kafka, i.e. there is no direct data exchange (in the example above, data would be exchanged through the topic “my-topic”). For this reason there is no need for a backpressure mechanism in this scenario either.

# 开发者指南

## Kafka Streams配置

Kafka Streams的配置通过一个StreamsConfig实例完成。其中下面三个是必须要有的配置项：

1. application.id：流处理应用程序的标识符，在Kafka集群中必须是唯一的
2. bootstrap.servers：用于建立到Kafka集群初始连接的地址列表
3. zookeeper.connect：管理Kafka主题的ZooKeeper连接地址

其中application.id会在以下几个地方用到：

1. 作为默认的Kafka生产者、消费者的client.id前缀
2. 作为Kafka消费者的group.id，会用来协调工作
3. 作为状态目录（state.dir）的子目录名称
4. 作为内部Kafka主题名称的前缀
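一个最小的配置示例大致如下（基于0.10.x / Confluent 3.0.x的API；其中的应用程序编号和地址均为假设的示例值）：

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
// 三个必须的配置项
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-app");      // 示例应用程序编号
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");    // 示例broker地址
settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "zookeeper1:2181"); // 示例ZooKeeper地址

StreamsConfig config = new StreamsConfig(settings);
```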

Ser-/Deserialization (key.serde, value.serde): Serialization and deserialization in Kafka Streams happens whenever data needs to be materialized, i.e.,:

• Whenever data is read from or written to a Kafka topic (e.g., via the KStreamBuilder#stream() and KStream#to() methods).
• Whenever data is read from or written to a state store.

1. 当从Kafka主题中读取数据，或者写入数据到Kafka主题（比如通过KStreamBuilder.stream()或者KStream.to()方法）
2. 当从状态存储中读取数据，或者写入数据到状态存储
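默认serdes的配置方式大致如下（以字符串类型的key/value为例，追加在上述StreamsConfig的Properties里）：

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
// ... application.id等必须的配置项 ...

// 默认的key/value序列化与反序列化器
settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
```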

Number of Standby Replicas (num.standby.replicas): This specifies the number of standby replicas. Standby replicas are shadow copies of local state stores. Kafka Streams attempts to create the specified number of replicas and keep them up to date as long as there are enough instances running. Standby replicas are used to minimize the latency of task failover. A task that was previously running on a failed instance is preferred(优先) to restart on an instance that has standby replicas so that the local state store restoration process from its changelog can be minimized. Details about how Kafka Streams makes use of the standby replicas to minimize the cost of resuming tasks on failover can be found in the State section.

Number of Stream Threads (num.stream.threads): This specifies the number of stream threads in an instance of the Kafka Streams application. The stream processing code runs in these threads. Details about Kafka Streams threading model can be found in section Threading Model.

Replication Factor of Internal Topics (replication.factor): This specifies the replication factor of internal topics that Kafka Streams creates when local states are used or a stream is repartitioned for aggregation. Replication is important for fault tolerance. Without replication even a single broker failure may prevent progress of the stream processing application. It is recommended to use a similar replication factor as source topics.

State Directory (state.dir): Kafka Streams persists local states under the state directory. Each application has a subdirectory on its hosting machine, whose name is the application id, directly under the state directory. The state stores associated with the application are created under this subdirectory.

Kafka Streams会在状态目录下持久化本地状态。每个应用程序在它的物理机的状态目录下都有一个子目录，名称是应用程序的编号。和应用程序关联的状态存储都会在这个子目录下创建。

Timestamp Extractor (timestamp.extractor): A timestamp extractor extracts a timestamp from an instance of ConsumerRecord. Timestamps are used to control the progress of streams.

The default extractor is ConsumerRecordTimestampExtractor. This extractor retrieves built-in timestamps that are automatically embedded into Kafka messages by the Kafka producer client (introduced in Kafka 0.10.0.0, see KIP-32: Add timestamps to Kafka message). Depending on the setting of Kafka’s log.message.timestamp.type parameter, this extractor will provide you with:

• event-time processing semantics if log.message.timestamp.type is set to CreateTime aka “producer time” (which is the default). This represents the time when the Kafka producer sent the original message.
• ingestion-time processing semantics if log.message.timestamp.type is set to LogAppendTime aka “broker time”. This represents the time when the Kafka broker received the original message.

1. 如果log.message.timestamp.type设置为CreateTime（即“producer time”，默认值），则提供事件时间的处理语义。它表示Kafka生产者发送原始消息的时间点
2. 如果设置为LogAppendTime（即“broker time”），则提供摄取时间的处理语义。它表示Kafka Broker接收原始消息的时间点

Another built-in extractor is WallclockTimestampExtractor. This extractor does not actually “extract” a timestamp from the consumed record but rather returns the current time in milliseconds from the system clock, which effectively means Streams will operate on the basis(基础) of the so-called processing-time of events.

You can also provide your own timestamp extractors, for instance to retrieve timestamps embedded in the payload of messages. Here is an example of a custom TimestampExtractor implementation:
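原文的示例代码在转载时丢失，下面是一个大致的还原。假设消息的value是一个自定义的MyEvent类型（MyEvent及其getTimestampMillis()方法均为假设），其payload内嵌了事件时间戳：

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class MyEventTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record) {
        Object value = record.value();
        if (value instanceof MyEvent) {
            // 从payload中取出内嵌的事件时间戳（MyEvent为假设的类型）
            return ((MyEvent) value).getTimestampMillis();
        }
        // 无法从payload中提取时，回退到记录内置的时间戳
        return record.timestamp();
    }
}
```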

You would then define the custom timestamp extractor in your Streams configuration as follows:
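配置方式大致如下（假设自定义的提取器类名为MyEventTimestampExtractor，追加在StreamsConfig的Properties里）：

```java
import org.apache.kafka.streams.StreamsConfig;

settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
             MyEventTimestampExtractor.class.getName());
```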

Partition Grouper (partition.grouper): A partition grouper is used to create a list of stream tasks given the partitions of source topics, where each created task is assigned with a group of source topic partitions. The default implementation provided by Kafka Streams is DefaultPartitionGrouper, which assigns each task with at most one partition for each of the source topic partitions; therefore, the generated number of tasks is equal to the largest number of partitions among the input topics. Usually an application does not need to customize the partition grouper.

Apart from Kafka Streams’ own configuration parameters (see previous sections) you can also specify parameters for the Kafka consumers and producers that are used internally, depending on the needs of your application. Similar to the Streams settings you define any such consumer and/or producer settings via StreamsConfig:
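在0.10.x中，内部消费者/生产者的参数直接放进同一个Properties即可（下面的参数取值仅为示例）：

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
// ... application.id、bootstrap.servers等必须的Streams配置 ...

// 内部消费者的参数（示例值）
settings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
// 内部生产者的参数（示例值）
settings.put(ProducerConfig.RETRIES_CONFIG, 10);

StreamsConfig config = new StreamsConfig(settings);
```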

## 编写一个流处理应用程序

Any Java application that makes use of the Kafka Streams library is considered a Kafka Streams application. The computational logic of a Kafka Streams application is defined as a processor topology, which is a graph of stream processors (nodes) and streams (edges).

Currently Kafka Streams provides two sets of APIs to define the processor topology:

• A low-level Processor API that lets you add and connect processors as well as interact directly with state stores.
• A high-level Kafka Streams DSL that provides common data transformation operations in a functional programming style such as map and filter operations. The DSL is the recommended starting point for developers new to Kafka Streams, and should cover many use cases and stream processing needs.

1. 低级Processor API：允许你添加和连接Processor，以及和状态存储直接交互
2. 高级DSL：以函数式编程的风格提供通用的数据转换算子，比如map和filter操作

Java 7中关闭钩子（shutdown hook）的使用方式如下：
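原文此处的示例代码在转载时丢失，下面按Java 7匿名类的写法给出一个大致的还原（builder和config沿用上文的拓扑与配置对象）：

```java
import org.apache.kafka.streams.KafkaStreams;

final KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();

// 注册JVM关闭钩子，在进程退出时优雅地关闭Streams实例
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
    @Override
    public void run() {
        streams.close();
    }
}));
```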

### Processor API

As mentioned in the Concepts section, a stream processor is a node in the processor topology that represents a single processing step. With the Processor API users can define arbitrary stream processors that process one received record at a time, and connect these processors with their associated state stores to compose the processor topology.

DEFINING A STREAM PROCESSOR

Users can define their customized stream processor by implementing the Processor interface, which provides two main API methods: process() and punctuate().

• process() is called on each of the received records.
• punctuate() is called periodically based on advancing record timestamps. For example, if processing-time is used as the record timestamp, then punctuate() will be triggered every specified period of time.
1. process()会在每条接收到的记录上调用
2. punctuate()会基于记录时间戳的推进被定时调用。例如，如果使用处理时间作为记录的时间戳，那么punctuate()会按指定的时间间隔定期触发

The Processor interface also has an init() method, which is called by the Kafka Streams library during the task construction phase. Processor instances should perform any required initialization in this method. The init() method passes in a ProcessorContext instance, which provides access to the metadata of the currently processed record, including its source Kafka topic and partition, its corresponding message offset, and other such information. This context instance can also be used to schedule the punctuation period (via ProcessorContext#schedule()) for punctuate(), to forward a new record as a key-value pair to the downstream processors (via ProcessorContext#forward()), and to commit the current processing progress (via ProcessorContext#commit()).

Processor接口也有一个init()方法，它会在Kafka Streams库构造任务的阶段被调用。Processor实例需要在该方法中执行必要的初始化工作。init()方法传入一个ProcessorContext实例，它提供了当前处理记录的元数据访问接口，包括来源的Kafka主题和分区、对应的消息偏移量，以及其他的一些信息。这个上下文实例还可以用来调度punctuate()的触发周期（通过ProcessorContext#schedule()），将新的记录作为键值对转发到下游的处理算子（通过ProcessorContext#forward()），或者提交当前的处理进度（通过ProcessorContext#commit()）。

• In the init() method, schedule the punctuation every 1000 time units (the time unit is normally milliseconds, which in this example would translate to punctuation every 1 second) and retrieve the local state store by its name “Counts”.
• In the process() method, upon each received record, split the value string into words, and update their counts into the state store (we will talk about this later in this section).
• In the punctuate() method, iterate the local state store and send the aggregated counts to the downstream processor (we will talk about downstream processors later in this section), and commit the current stream state.
1. init()方法，调度punctuation每隔1000个时间单元触发一次（时间单元通常是毫秒，这里表示每隔1秒调度一次punctuation），并且通过名称“Counts”获取本地的状态存储
2. process()方法，当接收到每条记录时，将字符串分成多个单词，并且将它们的次数更新到状态存储中
3. punctuate()方法，迭代本地状态存储，发送聚合次数给下游的处理算子，最后提交当前的流状态
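原文的WordCountProcessor代码在转载时丢失，下面按上述三点描述给出一个大致的还原：

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class WordCountProcessor implements Processor<String, String> {
    private ProcessorContext context;
    private KeyValueStore<String, Integer> kvStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        this.context.schedule(1000); // 每1000个时间单元触发一次punctuate()
        this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
    }

    @Override
    public void process(String key, String value) {
        // 把value按空格切分成单词，并更新每个单词的计数
        for (String word : value.toLowerCase().split(" ")) {
            Integer oldValue = this.kvStore.get(word);
            this.kvStore.put(word, oldValue == null ? 1 : oldValue + 1);
        }
    }

    @Override
    public void punctuate(long timestamp) {
        // 迭代本地状态存储，把聚合计数转发给下游，并提交当前进度
        KeyValueIterator<String, Integer> iter = this.kvStore.all();
        while (iter.hasNext()) {
            KeyValue<String, Integer> entry = iter.next();
            context.forward(entry.key, entry.value.toString());
        }
        iter.close();
        context.commit();
    }

    @Override
    public void close() {
        this.kvStore.close();
    }
}
```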

DEFINING A STATE STORE 定义一个状态存储

Note that the WordCountProcessor defined above can not only access the currently received record in the process() method, but also maintain processing states to keep recently arrived records for stateful processing needs such as aggregations and joins. To take advantage of these states, users can define a state store by implementing the StateStore interface (the Kafka Streams library also has a few extended interfaces such as KeyValueStore); in practice, though, users usually do not need to customize such a state store from scratch but can simply use the Stores factory to define a state store by specifying whether it should be persistent, log-backed, etc.
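用Stores工厂定义上文的“Counts”存储大致如下（0.10.x的API）：

```java
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.Stores;

StateStoreSupplier countStore = Stores.create("Counts")
    .withStringKeys()
    .withIntegerValues()
    .persistent()   // 持久化存储（RocksDB），并由changelog主题做日志备份
    .build();
```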

CONNECTING PROCESSORS AND STORES 连接处理算子和状态

Now that we have defined the processor and the state stores, we can now construct the processor topology by connecting these processors and state stores together by using the TopologyBuilder instance. In addition, users can add source processors with the specified Kafka topics to generate input data streams into the topology, and sink processors with the specified Kafka topics to generate output data streams out of the topology.

There are several steps in the above implementation to build the topology, and here is a quick walk through:

• A source processor node named “Source” is added to the topology using the addSource method, with one Kafka topic “source-topic” fed to it.
• A processor node named “Process” with the pre-defined WordCountProcessor logic is then added as the downstream processor of the “Source” node using the addProcessor method.
• A predefined persistent key-value state store countStore is created and associated to the “Process” node.
• A sink processor node is then added to complete the topology using the addSink method, taking the “Process” node as its upstream processor and writing to a separate “sink-topic” Kafka topic.
1. 使用addSource方法向拓扑中添加一个名为“Source”的源处理节点，并指定Kafka主题“source-topic”作为它的输入
2. 使用addProcessor方法添加一个名为“Process”的处理节点，使用前面定义的WordCountProcessor逻辑，作为“Source”节点的下游处理算子
3. 创建了一个预定义的持久化键值存储countStore，并且关联到“Process”节点
4. 使用addSink方法添加一个sink处理节点来完成拓扑，它以“Process”节点作为上游处理算子，并写出到另一个Kafka主题“sink-topic”
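上面几步对应的构建代码在转载时丢失，大致可以还原如下（WordCountProcessor和countStore沿用本节前文的定义）：

```java
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;

TopologyBuilder builder = new TopologyBuilder();

builder.addSource("Source", "source-topic")
       .addProcessor("Process", new ProcessorSupplier<String, String>() {
           @Override
           public Processor<String, String> get() {
               return new WordCountProcessor(); // 前文定义的处理算子
           }
       }, "Source")
       .addStateStore(countStore, "Process")   // 将状态存储关联到"Process"节点
       .addSink("Sink", "sink-topic", "Process");
```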

In this defined topology, the “Process” stream processor node is considered a downstream processor of the “Source” node, and an upstream processor of the “Sink” node. As a result, whenever the “Source” node forwards a newly fetched record from Kafka to its downstream “Process” node, the WordCountProcessor#process() method is triggered to process the record and update the associated state store; and whenever context#forward() is called in the WordCountProcessor#punctuate() method, the aggregate key-value pair will be sent via the “Sink” processor node to the Kafka topic “sink-topic”. Note that in the WordCountProcessor implementation, users need to refer to the same store name “Counts” when accessing the key-value store; otherwise an exception will be thrown at runtime indicating that the state store cannot be found. Also, if the state store itself is not associated with the processor in the TopologyBuilder code, accessing it in the processor’s init() method will throw an exception at runtime indicating the state store is not accessible from this processor.

With the defined processor topology, users can now start running a Kafka Streams application instance. Please read how to run a Kafka Streams application for details.

### Kafka Streams DSL

As mentioned in the Concepts section, a stream is an unbounded, continuously updating data set. With the Kafka Streams DSL users can define the processor topology by concatenating multiple transformation operations where each operation transforms one stream into other stream(s); the resulting topology then takes the input streams from source Kafka topics and generates the final output streams through its concatenated transformations. However, different streams may have different semantics in their data records:

• In some streams, each record represents a new immutable datum in their unbounded data set; we call these record streams.
• In other streams, each record represents a revision (or update) of their unbounded data set in chronological(按时间顺序) order; we call these changelog streams.
1. 在一些流中，每条记录代表了无界数据集中新的不可变数据，我们叫做记录流（record streams）
2. 在其他流中，每条记录代表了无界数据集中按照时间顺序排序的更新，我们叫做变更日志流（changelog streams）

Both of these two types of streams can be stored as Kafka topics. However, their computational semantics can be quite different. Take the example of an aggregation operation that counts the number of records for the given key. For record streams, each record is a keyed message from a Kafka topic (e.g., a page view stream keyed by user ids):
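可以用下面的示意来理解记录流（用户ID和页面仅为虚构示例）：

```
("alice", /home), ("bob", /search), ("alice", /cart), ...   <- 以用户ID为key的页面浏览记录流
按key计数的结果：alice -> 2, bob -> 1
```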

The counting operation for record streams is trivial to implement: you can maintain a local state store that tracks the latest count for each key, and, upon receiving a new record, update the corresponding key by incrementing its count by one.

For changelog streams, on the other hand, each record is an update of the unbounded data set (e.g., a changelog stream for a user profile table, where the user id serves as both the primary key for the table and as the record key for the stream; here, a new record represents a changed row of the table). In practice you would usually store such streams in Kafka topics where log compaction is enabled.

As a result the counting operation for changelog streams is no longer monotonically incrementing: you need to also decrement the counts when a delete update record is received for a given key. In addition, even for counting aggregations on a record stream, the resulting aggregate is no longer a record stream but a relation / table, which can then be represented as a changelog stream of updates on the table.

The counting operation of a user profile changelog stream is peculiar(罕见的) because it will generate, for a given key, a count of either 0 (meaning the key does not exist or does not exist anymore) or 1 (the key exists) only. Multiple records for the same key are considered duplicates or outdated versions superseded by the most recent record, and thus will not contribute to the count.

For changelog streams developers usually prefer counting a non-primary-key field. We use the example above just for the sake of illustration.

One of the key design principles of the Kafka Streams DSL is to understand and distinguish between record streams and changelog streams and to provide operators with the correct semantics for these two different types of streams. More concretely, in the Kafka Streams DSL we use the KStream interface to represent record streams, and we use a separate KTable interface to represent changelog streams. The Kafka Streams DSL is therefore the recommended API to implement a Kafka Streams application. Compared to the lower-level Processor API, its benefits are:

Kafka Streams DSL的一个主要设计原则是理解和区分记录流和变更日志流，并且为这两种不同类型的流提供具有正确语义的算子。更具体地说，在Kafka Streams DSL中，我们使用KStream接口来表示记录流，使用单独的KTable接口来表示变更日志流。因此Kafka Streams DSL是实现Kafka Streams应用程序的推荐API。和低级的Processor API相比，它有以下几个优点：

• More concise and expressive code, particularly when using Java 8+ with lambda expressions.
• Easier to implement stateful transformations such as joins and aggregations.
• Understands the semantic differences of record streams and changelog streams, so that transformations such as aggregations work as expected depending on which type of stream they operate against.
1. 更简洁、更具有表达力的代码，尤其是使用Java8的lambda表达式时
2. 可以很容易地实现一个有状态的操作，比如联合和聚合操作
3. 理解记录流和变更日志流的语义区别，这样转换操作（比如聚合）根据流的类型按照预期的方式工作

#### CREATING SOURCE STREAMS FROM KAFKA

Both KStream or KTable objects can be created as a source stream from one or more Kafka topics via KStreamBuilder, an extended class of TopologyBuilder used in the lower-level Processor API (for KTable you can only create the source stream from a single topic).

KStream和KTable对象都可以通过KStreamBuilder从一个或多个Kafka主题创建，作为输入源流。KStreamBuilder是低级Processor API中TopologyBuilder的子类。对于KTable，你只能从单个Kafka主题创建输入源流。

| Interface | How to instantiate |
| --- | --- |
| KStream<K, V> | KStreamBuilder#stream(…) |
| KTable<K, V> | KStreamBuilder#table(…) |

When creating an instance, you may override the default serdes for record keys (K) and record values (V) used for reading the data from Kafka topics (see Data types and serdes for more details); otherwise the default serdes specified through StreamsConfig will be used.
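创建源流的方式大致如下（主题名为假设的示例值）：

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

KStreamBuilder builder = new KStreamBuilder();

// 使用StreamsConfig中配置的默认serdes
KStream<String, String> pageViews = builder.stream("pageview-topic");

// 读取时覆盖默认serdes（KTable只能从单个主题创建）
KTable<String, Long> userCounts =
    builder.table(Serdes.String(), Serdes.Long(), "user-count-topic");
```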

#### TRANSFORM A STREAM

KStream and KTable support a variety of transformation operations. Each of these operations can be translated into one or more connected processors in the underlying processor topology. Since KStream and KTable are strongly typed, all these transformation operations are defined as generic functions where users can specify the input and output data types.

KStream和KTable支持很多类型的转换操作。每种操作都可以翻译成底层处理拓扑的一个或多个相连起来的处理算子。由于KStream和KTable是强类型的，所有这些转换操作被定义为通用的函数，这样用户可以指定输入和输出的数据类型。

Some KStream transformations may generate one or more KStream objects (e.g., filter and map on KStream generate another KStream, while branch on KStream can generate multiple KStreams) while some others may generate a KTable object (e.g., aggregation) interpreted(理解) as the changelog stream to the resulting relation. This allows Kafka Streams to continuously update the computed value upon the arrival of late records after it has already been produced to the downstream transformation operators. As for KTable, all its transformation operations can only generate another KTable (though the Kafka Streams DSL does provide a special function to convert a KTable representation into a KStream, which we will describe later). Nevertheless, all these transformation methods can be chained together to compose a complex processor topology.

We describe these transformation operations in the following subsections, categorizing them as two categories: stateless and stateful transformations.

#### STATELESS TRANSFORMATIONS

Stateless transformations include filter, filterNot, foreach, map, mapValues, selectKey, flatMap, flatMapValues, branch. Most of them can be applied to both KStream and KTable, where users usually pass a customized function to these functions as a parameter; e.g. a Predicate for filter, a KeyValueMapper for map, and so on. Stateless transformations, by definition, do not depend on any state for processing, and hence implementation-wise they do not require a state store associated with the stream processor.

The function is applied to each record, and its result will trigger the creation of a new record.
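下面是几个无状态转换的小例子（Java 8 lambda写法；builder与主题名为假设的示例）：

```java
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> pageViews = builder.stream("pageview-topic");

// filter：只保留value非空的记录
KStream<String, String> nonEmpty =
    pageViews.filter((key, value) -> value != null && !value.isEmpty());

// mapValues：只变换value、不改变key，因此不会触发重新分区
KStream<String, Integer> valueLengths =
    nonEmpty.mapValues(value -> value.length());
```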

#### STATEFUL TRANSFORMATIONS

• joins (KStream/KTable): join, leftJoin, outerJoin
• aggregations (KStream): countByKey, reduceByKey, aggregateByKey
• aggregations (KTable): groupBy plus count, reduce, aggregate (via KGroupedTable)
• general transformations (KStream): process, transform, transformValues

Stateful transformations are transformations where the processing logic requires accessing an associated state for processing and producing outputs. For example, in join and aggregation operations, a windowing state is usually used to store all the records received so far within the defined window boundary. The operators can then access accumulated records in the store and compute based on them (see Windowing a Stream for details).

WordCount使用Java 7:
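原文的WordCount示例代码在转载时丢失，下面按0.10.x DSL的Java 7写法给出一个大致的还原（主题名沿用官方示例的惯用名，属于假设）：

```java
import java.util.Arrays;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueMapper;

KStreamBuilder builder = new KStreamBuilder();

KStream<String, String> textLines =
    builder.stream(Serdes.String(), Serdes.String(), "TextLinesTopic");

KStream<String, Long> wordCounts = textLines
    // 把每行文本切分成单词
    .flatMapValues(new ValueMapper<String, Iterable<String>>() {
        @Override
        public Iterable<String> apply(String value) {
            return Arrays.asList(value.toLowerCase().split("\\W+"));
        }
    })
    // 以单词本身作为新的key
    .map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
        @Override
        public KeyValue<String, String> apply(String key, String word) {
            return new KeyValue<>(word, word);
        }
    })
    // 按key计数（有状态操作，"Counts"为状态存储名）
    .countByKey(Serdes.String(), "Counts")
    .toStream();

wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic");
```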

#### WINDOWING A STREAM

Windowing is a common prerequisite for stateful transformations which group records in a stream, for example, by their timestamps. A local state store is usually needed for a windowing operation to store recently received records based on the window interval, while old records in the store are purged after the specified window retention period. The retention time can be set via Windows#until().

| Window name | Behavior | Short description |
| --- | --- | --- |
| Tumbling time window（翻转窗口） | Time-based | Fixed-size, non-overlapping, gap-less windows |
| Hopping time window（跳跃时间窗口） | Time-based | Fixed-size, overlapping windows |
| Sliding time window（滑动时间窗口） | Time-based | Fixed-size, overlapping windows that work on differences between record timestamps |

Tumbling time windows are a special case of hopping time windows and, like the latter, are windows based on time intervals. They model fixed-size, non-overlapping, gap-less windows. A tumbling window is defined by a single property: the window’s size. A tumbling window is a hopping window whose window size is equal to its advance interval. Since tumbling windows never overlap, a data record will belong to one and only one window.

Hopping time windows are windows based on time intervals. They model fixed-size, (possibly) overlapping windows. A hopping window is defined by two properties: the window’s size and its advance interval (aka “hop”). The advance interval specifies by how much a window moves forward relative to the previous one. For example, you can configure a hopping window with a size of 5 minutes and an advance interval of 1 minute. Since hopping windows can overlap – and in general they do – a data record may belong to more than one such window.

Note that tumbling and hopping time windows are aligned to the epoch, and that the lower window time interval bound is inclusive while the upper bound is exclusive.

Aligned to the epoch means that the first window starts at timestamp zero. For example, hopping windows with a size of 5000ms and an advance of 3000ms have window boundaries [0;5000),[3000;8000),… and not [1000;6000),[4000;9000),… or even something “random” like [1452;6452),[4452;9452),…, which might be the case if windows were initialized depending on system/application start-up time, introducing non-determinism.
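The alignment rule can be checked with a few lines of plain Java (a standalone sketch, independent of the Kafka Streams API): every epoch-aligned window [start; start + size) with start a multiple of the advance interval that overlaps a timestamp contains that record.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowAlignment {
    // Returns the start timestamps of all hopping windows that contain `ts`,
    // assuming epoch-aligned windows: starts at 0, parameters size and advance ("hop").
    static List<Long> windowStartsFor(long ts, long size, long advance) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - (ts % advance);           // latest aligned start <= ts
        for (long start = lastStart; start >= 0 && start + size > ts; start -= advance) {
            starts.add(0, start);                       // keep ascending order
        }
        return starts;
    }

    public static void main(String[] args) {
        // Hopping windows: size 5000ms, advance 3000ms -> [0;5000), [3000;8000), ...
        // A record at t=4000 falls into the windows starting at 0 and 3000.
        System.out.println(windowStartsFor(4000L, 5000L, 3000L)); // [0, 3000]
        // Tumbling windows are the special case size == advance: exactly one window.
        System.out.println(windowStartsFor(4000L, 5000L, 5000L)); // [0]
    }
}
```

This also makes the tumbling/hopping relationship concrete: with size equal to the advance interval, each record lands in exactly one window.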

Unlike non-windowed aggregates that we have seen previously, windowed aggregates return a windowed KTable whose key type is Windowed. This is to differentiate aggregate values with the same key from different windows. The corresponding window instance and the embedded key can be retrieved as Windowed#window() and Windowed#key(), respectively.

Sliding windows are actually quite different from hopping and tumbling windows. A sliding window models a fixed-size window that slides continuously over the time axis; here, two data records are said to be included in the same window if the difference of their timestamps is within the window size. Thus, sliding windows are not aligned to the epoch, but to the data record timestamps. Note that, in contrast to hopping and tumbling windows, the lower and upper window time interval bounds are both inclusive. In Kafka Streams, sliding windows are used only for join operations, and can be specified through the JoinWindows class.

#### JOINING STREAMS

Many stream processing applications can be coded as stream join operations. For example, applications backing an online shop might need to access multiple, updating database tables (e.g. sales prices, inventory, customer information) when processing a new record. These applications can be implemented such that they work on the tables’ changelog streams directly, i.e. without requiring to make a database query over the network for each record. In this example, the KTable concept in Kafka Streams would enable you to track the latest state (think: snapshot) of each table in a local key-value store, thus greatly reducing the processing latency as well as reducing the load of the upstream databases.

• Join a KStream with another KStream or KTable.
• Join a KTable with another KTable only.

• KStream-to-KStream Joins are always windowed joins, since otherwise the join result size might grow infinitely in size. Here, a newly received record from one of the streams is joined with the other stream’s records within the specified window interval to produce one result for each matching pair based on user-provided ValueJoiner. A new KStream instance representing the result stream of the join is returned from this operator.
• KTable-to-KTable Joins are join operations designed to be consistent(一致) with the ones in relational databases. Here, both changelog streams are materialized into local state stores to represent the latest snapshot of their data table duals(双重). When a new record is received from one of the streams, it is joined with the other stream’s materialized state stores to produce one result for each matching pair based on user-provided ValueJoiner. A new KTable instance representing the result stream of the join, which is also a changelog stream of the represented table, is returned from this operator.
• KStream-to-KTable Joins allow you to perform table lookups against a changelog stream (KTable) upon receiving a new record from another record stream (KStream). An example use case would be to enrich(使丰富) a stream of user activities (KStream) with the latest user profile information (KTable). Only records received from the record stream will trigger the join and produce results via ValueJoiner, not vice versa (i.e., records received from the changelog stream will be used only to update the materialized state store). A new KStream instance representing the result stream of the join is returned from this operator.
1. KStream-to-KStream Join操作总是针对窗口的Join，否则Join结果的大小会无限膨胀。从其中一个流接收到的新记录会和另外一个流的记录进行Join，后者的流会指定窗口间隔，最后会基于用户提供的ValueJoiner为每个匹配对产生一个结果。这里返回的结果是一个新的KStream实例，代表了Join操作的结果流。
2. KTable-to-KTable Join操作被设计为和关系型数据库类似的操作。两个变更日志流（KTable）都会被物化成本地状态存储，表示数据表的最近快照。当从其中的一个流接收到一条新记录，它会和另外一个流的物化状态存储进行join，并根据用户提供的ValueJoiner产生一个匹配的结果。返回的结果是新的KTable实例，代表Join操作的结果流，它也是一个变更日志流。
3. KStream-to-KTable Join操作允许你在记录流（KStream）上接收到一条新记录时，从一个变更日志流（KTable）上执行表（级别的记录）查询。比如对一个用户活动流（KStream）使用最近的用户个人信息（KTable）进行信息增强。只有从记录流接收的记录才会触发join操作，并通过ValueJoiner产生结果，反过来则不行（比如从变更日志流中接收的新记录只会被用来更新物化的状态存储，而不会和KStream记录流进行join）。返回的结果是一个新的KStream，代表了Join操作的结果流。

| Join operands | (INNER) JOIN | OUTER JOIN | LEFT JOIN | Result |
| --- | --- | --- | --- | --- |
| KStream-to-KStream | Supported | Supported | Supported | KStream |
| KTable-to-KTable | Supported | Supported | Supported | KTable |
| KStream-to-KTable | N/A | N/A | Supported | KStream |

Kafka Streams的Join语义类似于关系型数据库的操作符：

• Inner join produces new joined records when the join operator finds some records with the same key in the other stream / materialized store.
• Outer join works like inner join if some records are found in the windowed stream / materialized store. The difference is that outer join still produces a record even when no records are found; it uses null as the value of the missing record.
• Left join is like outer join except that for a KStream-KStream join it is always driven by records arriving from the primary stream, while for a KTable-KTable join it is driven by both streams, to make the result consistent with the left join of databases while only permitting missing records in the secondary stream. In a KStream-KTable left join, a KStream record will only join a KTable record if the KTable record arrived before the KStream record (and is in the KTable). Otherwise, the join result will be null.
1. Inner join：当join操作符在两个流或物化存储中都找到相同key的记录，产生新的join记录。
2. Outer join：如果在窗口流或物化视图中找到记录，则和inner join类似。不同的是，outer join即使在没有找到记录也会输出一条记录，对于缺失的记录使用null作为value。
3. Left join：和outer join类似，不过对于KStream-KSteram的join（KStream left join KStream），它总是在主要流（A left join B，则A是主要的流）的记录到达时驱动的；对于KTable-KTable的join（KTable left join KTable），它是由两个流一起驱动，并且结果和left join左边的流是一致的，只允许右边流的记录缺失；对于KStream-KTable的left join（KStream left join KTable），一条KStream的记录只会和一条KTable的记录join，并且这条KTable的记录必须要在KStream的记录之前到达（当然必须在KTable中），否则join结果为null。
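The three semantics above can be sketched as a toy per-key join in plain Java; this standalone sketch mimics only the outcome for matching/missing keys (null standing in for the missing side) and ignores windowing and the actual Kafka Streams machinery:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class JoinSemantics {
    // Toy per-key join of two "tables"; mode is "inner", "outer", or "left".
    // Values are rendered as "left+right" the way a ValueJoiner would combine them.
    static Map<String, String> join(Map<String, Integer> left,
                                    Map<String, Integer> right,
                                    String mode) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : left.entrySet()) {
            Integer r = right.get(e.getKey());
            if (r != null) {
                out.put(e.getKey(), e.getValue() + "+" + r);    // matching pair
            } else if (!mode.equals("inner")) {
                out.put(e.getKey(), e.getValue() + "+null");    // left/outer keep it
            }
        }
        if (mode.equals("outer")) {                             // right-only keys survive
            for (Map.Entry<String, Integer> e : right.entrySet()) {
                if (!left.containsKey(e.getKey())) {
                    out.put(e.getKey(), "null+" + e.getValue());
                }
            }
        }
        return out;
    }
}
```

With left = {a:1, b:2} and right = {b:20, c:30}, inner yields only b, left additionally keeps a with a null right side, and outer also keeps the right-only key c.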

Since stream joins are performed over the keys of records, it is required that joining streams are co-partitioned by key, i.e., their corresponding Kafka topics must have the same number of partitions and be partitioned on the same key so that records with the same keys are delivered to the same processing thread. This is validated by the Kafka Streams library at runtime (the threading model and data parallelism are discussed in more detail in the Architecture section).

Join示例，使用Java8：
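The Java 8 example code is missing here; a hedged sketch of a windowed KStream-KStream left join using lambdas might look like this (topics, types, and window name are hypothetical; this targets the 0.10.0-era API, where JoinWindows is created by name, and assumes default serdes configured for String/Long):

```java
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class JoinExampleJava8 {
    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, Long> userClicks =
            builder.stream(Serdes.String(), Serdes.Long(), "user-clicks");
        KStream<String, String> userRegions =
            builder.stream(Serdes.String(), Serdes.String(), "user-regions");

        // Windowed KStream-KStream left join: records pair up when their
        // timestamps are at most 5 minutes apart; a missing right side is null.
        KStream<String, String> clicksPerRegion = userClicks.leftJoin(
            userRegions,
            (clicks, region) -> (region == null ? "UNKNOWN" : region) + "/" + clicks,
            JoinWindows.of("clicks-regions-join").within(TimeUnit.MINUTES.toMillis(5)));

        clicksPerRegion.to(Serdes.String(), Serdes.String(), "clicks-per-region");
    }
}
```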

Join示例，使用Java7：
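The Java 7 example code is likewise missing; the same hypothetical left join written with an anonymous ValueJoiner class (0.10.0-era API, default serdes assumed) might look like this:

```java
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.ValueJoiner;

public class JoinExampleJava7 {
    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, Long> userClicks =
            builder.stream(Serdes.String(), Serdes.Long(), "user-clicks");
        KStream<String, String> userRegions =
            builder.stream(Serdes.String(), Serdes.String(), "user-regions");

        // The ValueJoiner spelled out as a Java 7 anonymous class:
        KStream<String, String> clicksPerRegion = userClicks.leftJoin(
            userRegions,
            new ValueJoiner<Long, String, String>() {
                @Override
                public String apply(Long clicks, String region) {
                    return (region == null ? "UNKNOWN" : region) + "/" + clicks;
                }
            },
            JoinWindows.of("clicks-regions-join").within(TimeUnit.MINUTES.toMillis(5)));

        clicksPerRegion.to(Serdes.String(), Serdes.String(), "clicks-per-region");
    }
}
```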

#### APPLYING A CUSTOM PROCESSOR

Beyond the provided transformation operators, users can also specify any customized processing logic on their stream data via the KStream#process() method, which takes an implementation of the ProcessorSupplier interface as its parameter. This is essentially equivalent to the addProcessor() method in the Processor API.

The following example shows how to leverage, via the process() method, a custom processor that sends an email notification whenever a page view count reaches a predefined threshold.
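The referenced example code is not included here; a sketch of such a custom processor against the 0.10.0 Processor API could look like this (the page-view key/value types, the threshold, and the sendEmail() helper are hypothetical stand-ins):

```java
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

// Sends an email notification whenever a page's view count crosses a threshold.
public class EmailAlertProcessor implements Processor<String, Long> {
    private static final long THRESHOLD = 1_000_000L;

    @Override
    public void init(ProcessorContext context) {
        // no state stores or punctuation schedule needed for this processor
    }

    @Override
    public void process(String pageId, Long viewCount) {
        if (viewCount != null && viewCount >= THRESHOLD) {
            sendEmail("ops@example.com",
                      "Page " + pageId + " reached " + viewCount + " views");
        }
    }

    @Override
    public void punctuate(long timestamp) { }

    @Override
    public void close() { }

    private void sendEmail(String to, String body) {
        // stand-in for a real mailer integration
    }
}
```

It would be attached to a stream of per-page counts via `pageViews.process(new ProcessorSupplier<String, Long>() { public Processor<String, Long> get() { return new EmailAlertProcessor(); } });`.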

#### WRITING STREAMS BACK TO KAFKA

Any streams may be (continuously) written back to a Kafka topic via KStream#to() and KTable#to().

Best practice: It is strongly recommended to manually create output topics ahead of time rather than relying on auto-creation of topics. First, auto-creation of topics may be disabled in your Kafka cluster. Second, auto-creation will always apply the default topic settings such as the replication factor, and these default settings might not be what you want for certain output topics (cf. auto.create.topics.enable=true in the Kafka broker configuration).

If your application needs to continue reading and processing the records after they have been written to a topic via to() above, one option is to construct a new stream that reads from the output topic:

Kafka Streams provides a convenience method called through() that is equivalent to the code above:

Kafka Streams还提供了一种方便的方式：调用through()方法和上面的代码是类似的：
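The code for the two variants is missing from this translation; a hedged sketch (topic name and types hypothetical, 0.10.0-era API) of both might look like this:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class ThroughExample {
    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, Long> stream =
            builder.stream(Serdes.String(), Serdes.Long(), "input-topic");

        // Variant 1: write to a topic, then construct a new stream reading from it.
        stream.to(Serdes.String(), Serdes.Long(), "my-topic");
        KStream<String, Long> continued1 =
            builder.stream(Serdes.String(), Serdes.Long(), "my-topic");

        // Variant 2: through() performs both steps in a single call.
        KStream<String, Long> continued2 =
            stream.through(Serdes.String(), Serdes.Long(), "my-topic");
    }
}
```

Passing the serdes explicitly, as here, overrides the default serdes from the Streams configuration, as described below.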

Whenever data is read from or written to a Kafka topic, Streams must know the serdes to be used for the respective data records. By default the to() and through() methods use the default serdes defined in the Streams configuration. You can override these default serdes by passing explicit serdes to the to() and through() methods.

Besides writing the data back to Kafka, users can also apply a custom processor as mentioned above to write to any other external stores, for example, to materialize a data store, as stream sinks at the end of the processing.

## 运行流处理程序

A Java application that uses the Kafka Streams library can be run just like any other Java application – there is no special magic or requirement on the side of Kafka Streams.

It is important to understand that, when starting your application as described above, you are actually launching what Kafka Streams considers to be one instance of your application. More than one instance of your application may be running at a time, and in fact the common scenario is that there are indeed multiple instances of your application running in parallel. See Parallelism Model for further information.

### 水平扩展你的应用程序

Kafka Streams makes your stream processing applications elastic and scalable: you can add and remove processing capacity dynamically during the runtime of your application, and you can do so without any downtime or data loss. This means that, unlike other stream processing technologies, with Kafka Streams you do not have to completely stop your application, recompile/reconfigure, and then restart. This is great not just for intentionally(故意地) adding or removing processing capacity, but also for being resilient(有弹性的) in the face of failures (e.g. machine crashes, network outages) and for allowing maintenance work (e.g. rolling upgrades).

Kafka Streams会让你的流处理应用程序可伸缩和可扩展。你可以在应用程序的运行时动态地添加或删除流处理能力，并且不需要停机维护，也不会丢失数据。这意味着，和其他流处理技术不同，使用Kafka Streams你不需要完全地停止你的应用程序，重新编译，重新配置，然后重启。对于故意地添加或删除处理能力，或者失败时的弹性机制，以及维护工作都是有好处的。

If you are wondering how this elasticity is actually achieved behind the scenes, you may want to read the Architecture chapter, notably the Parallelism Model section. In a nutshell(概括), Kafka Streams leverages existing functionality in Kafka, notably its group management functionality. This group management, which is built right into the Kafka wire protocol, is the foundation that enables the elasticity of Kafka Streams applications: members of a group will coordinate and collaborate(合作) jointly(共同地) on the consumption and processing of data in Kafka. On top of this foundation Kafka Streams provides some additional functionality, e.g. to enable stateful processing and to allow for fault-tolerant state in environments where application instances may come and go at any time.

### 增加处理能力（Expand）

If you need more processing capacity for your stream processing application, you can simply start another instance of your stream processing application, e.g. on another machine, in order to scale out. The instances of your application will become aware of each other and automatically begin to share the processing work. More specifically, what will be handed over from the existing instances to the new instances is (some of) the stream tasks that have been run by the existing instances. Moving stream tasks from one instance to another results in moving the processing work plus any internal state of these stream tasks (the state of a stream task will be re-created in the target instance by restoring the state from its corresponding changelog topic).

The various instances of your application each run in their own JVM process, which means that each instance can leverage all the processing capacity that is available to their respective JVM process (minus the capacity that any non-Kafka-Streams part of your application may be using). This explains why running additional instances will grant your application additional processing capacity. The exact capacity you will be adding by running a new instance depends of course on the environment in which the new instance runs: available CPU cores, available main memory and Java heap space, local storage, network bandwidth, and so on. Similarly, if you stop any of the running instances of your application, then you are removing and freeing up the respective processing capacity.

Before adding capacity: only a single instance of your Kafka Streams application is running. At this point the corresponding Kafka consumer group of your application contains only a single member (this instance). All data is being read and processed by this single instance.

After adding capacity: now two additional instances of your Kafka Streams application are running, and they have automatically joined the application’s Kafka consumer group for a total of three current members. These three instances are automatically splitting the processing work between each other. The splitting is based on the Kafka topic partitions from which data is being read.

### 减少处理能力（Shrink）

If you need less processing capacity for your stream processing application, you can simply stop one or more running instances of your stream processing application, e.g. shut down 2 of 4 running instances. The remaining instances of your application will become aware that other instances were stopped and automatically take over the processing work of the stopped instances. More specifically, what will be handed over from the stopped instances to the remaining instances is the stream tasks that were run by the stopped instances. Moving stream tasks from one instance to another results in moving the processing work plus any internal state of these stream tasks (the state of a stream task will be re-created in the target instance by restoring the state from its corresponding changelog topic).

If one of the application instances is stopped (e.g. intentional reduction of capacity, maintenance, machine failure), it will automatically leave the application’s consumer group, which causes the remaining instances to automatically take over the stopped instance’s processing work.

### 运行多少个应用程序实例

How many instances can or should you run for your application? Is there an upper limit for the number of instances and, similarly, for the parallelism of your application? In a nutshell, the parallelism of a Kafka Streams application – similar to the parallelism of Kafka – is primarily determined by the number of partitions of the input topic(s) from which your application is reading. For example, if your application reads from a single topic that has 10 partitions, then you can run up to 10 instances of your application (note that you can run more instances, but these will be idle).

The number of topic partitions is the upper limit for the parallelism of your Kafka Streams application and thus for the number of running instances of your application.
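The upper bound can be stated as a one-liner in plain Java (a simplification: the actual rebalance protocol assigns tasks, not raw partition counts, but the cap is the same):

```java
public class ParallelismLimit {
    // Instances that actually receive work when `partitions` input partitions
    // are spread over `instances` running application instances.
    static int busyInstances(int partitions, int instances) {
        return Math.min(partitions, instances);
    }

    // Instances beyond the partition count sit idle.
    static int idleInstances(int partitions, int instances) {
        return Math.max(0, instances - partitions);
    }

    public static void main(String[] args) {
        // 10 input partitions, 12 instances: 10 busy, 2 idle.
        System.out.println(busyInstances(10, 12) + " busy, "
                           + idleInstances(10, 12) + " idle");
    }
}
```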

How to achieve(获得) a balanced processing workload across application instances to prevent processing hotspots: The balance of the processing work between application instances depends on factors such as how well data messages are balanced between partitions (think: if you have 2 topic partitions, having 1 million messages in each partition is better than having 2 million messages in the first partition and no messages in the second) and how much processing capacity is required to process the messages (think: if the time to process messages varies heavily, then it is better to spread the processing-intensive messages across partitions rather than storing these messages within the same partition).

If your data happens to be heavily skewed(倾斜) in the way described above, some application instances may become processing hotspots (say, when most messages would end up being stored in only 1 of 10 partitions, then the application instance that is processing this one partition would be performing most of the work while other instances might be idle). You can minimize the likelihood of such hotspots by ensuring better data balancing across partitions (i.e. minimizing data skew at the point in time when data is being written to the input topics in Kafka) and by over-partitioning the input topics (think: use 50 or 100 partitions instead of just 10), which lowers the probability that a small subset of partitions will end up storing most of the topic’s data.

## 应用重置工具

The Application Reset Tool allows you to quickly reset an application in order to reprocess its data from scratch – think: an application “reset” button. Scenarios in which you would like to reset an application include:

• Development and testing 开发和测试时
• Addressing bugs in production 在生产环境定位问题时
• Demos 演示

However, resetting an application manually is a bit involved. Kafka Streams is designed to hide many details about operator state, fault-tolerance, and internal topic management from the user (especially when using Kafka Streams DSL). In general this hiding of details is very desirable(值得要的，令人满意的) but it also makes manually resetting an application more difficult. The Application Reset Tool fills this gap and allows you to quickly reset an application.

1. 对输入主题：重置应用程序所有分区的消费者提交偏移量到0
2. 对临时主题：跳到主题的最后，比如设置应用程序的消费者提交偏移量到每个分区的logSize（实际上就是分区的最后位置）
3. 对内部主题：除了重置偏移量到0，还要删除内部主题

1. 不会重置应用程序的输出主题。如果任何的输出主题或者临时主题被下游的应用程序消费，那么调整这些下游应用程序是你自己的责任
2. 不会重置应用程序实例的本地环境。同样删除应用程序实例运行所在机器的本地状态，也是你自己的责任。

You can combine the parameters of the script as needed. For example, if an application should only restart from an empty internal state but not reprocess previous data, simply omit the parameters --input-topics and --intermediate-topics.

On intermediate topics: In general, we recommend manually deleting and re-creating any intermediate topics before running the application reset tool, which frees up disk space in the Kafka brokers early on. The order matters: first delete and re-create the intermediate topics, then run the reset tool.

Not deleting intermediate topics and only using the application reset tool is preferable:

• when there are external downstream consumers for the application’s intermediate topics
• during development, where manually deleting and re-creating intermediate topics might be cumbersome and often unnecessary

1. 存在外部的下游消费者订阅了应用程序的临时主题
2. 在开发环境，手动删除并重建主题可能很繁琐，而且通常没有必要这么做

Running the application reset tool (step 1) ensures that your application’s state – as tracked globally in the application’s configured Kafka cluster – is reset. However, by design the reset tool does not modify or reset the local environment of your application instances, which includes the application’s local state directory.

For a complete application reset you must also delete the application’s local state directory on any machines on which an application instance was run prior to restarting an application instance on the same machines. You can either use the API method KafkaStreams#cleanUp() in your application code or manually delete the corresponding local state directory (default location: /var/lib/kafka-streams/<application.id>, cf. state.dir configuration parameter).

Let’s imagine you are developing and testing an application locally and want to iteratively improve your application via run-reset-modify cycles. You might have code such as the following:
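The example code is missing here; a sketch of such a "reset ready" development loop might look as follows (application ID from the surrounding text; the `--reset` command-line flag is a hypothetical way to gate the call, as the last paragraph of this section recommends):

```java
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class ResetReadyApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        KStreamBuilder builder = new KStreamBuilder();
        // ... define the topology here ...

        KafkaStreams streams = new KafkaStreams(builder, props);

        // Local reset: wipe this instance's local state directory so that
        // state is rebuilt from the changelog topics. Guarded behind a flag
        // so production restarts do not pay the recovery cost every time.
        if (args.length > 0 && "--reset".equals(args[0])) {
            streams.cleanUp();
        }
        streams.start();
    }
}
```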

Calling cleanUp() is safe but do so judiciously(明智的): It is always safe to call KafkaStreams#cleanUp() because the local state of an application instance can be recovered from the underlying internal changelog topic(s). However, to avoid the corresponding recovery overhead it is recommended to not call cleanUp() unconditionally and every time an application instance is restarted/resumed. A production application would therefore use e.g. command line arguments to enable/disable the cleanUp() call as needed.

EOF. 翻译完毕 @2016.11.5

## 重置流处理应用程序

### 示例

As a running example we assume you have the following Kafka Streams application, and we subsequently demonstrate (1) how to make this application “reset ready” and (2) how to perform an actual application reset.

This application reads data from the input topic “my-input-topic”, then selects a new record key, and then writes the result into the intermediate topic “rekeyed-topic” for the purpose of data re-partitioning. Subsequently, the re-partitioned data is aggregated by a count operator, and the final result is written to the output topic “my-output-topic”. Note that in this blog post we don’t put the focus on what this topology is actually doing — the point is to have a running example of a typical topology that has input topics, intermediate topics, and output topics.
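The running-example code did not make it into this translation; a hedged reconstruction matching the description above (0.10.0-era API; the key selector and serdes are illustrative details, not the point of the example) might be:

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class RunningExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        KStreamBuilder builder = new KStreamBuilder();
        builder.stream(Serdes.String(), Serdes.String(), "my-input-topic")
            // select a new record key (the details are not the point here)
            .map((key, value) -> new KeyValue<>(value, value))
            // re-partition by the new key via the intermediate topic
            .through(Serdes.String(), Serdes.String(), "rekeyed-topic")
            // stateful count, backed by a local store plus changelog topic
            .countByKey(Serdes.String(), "global-count")
            .to(Serdes.String(), Serdes.Long(), "my-output-topic");

        new KafkaStreams(builder, props).start();
    }
}
```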

#### Step 1: Prepare your application for resets

The first step is to make your application “reset ready”. For this, the only thing you need to do is to include a call to KafkaStreams#cleanUp() in your application code (for details about cleanUp() see Section ”Application Reset Tool Details”).

Calling cleanUp() is required because resetting a Streams applications consists of two parts: global reset and local reset. The global reset is covered by the new application reset tool (see “Step 2”), and the local reset is performed through the Kafka Streams API. Because it is a local reset, it must be performed locally for each instance of your application. Thus, embedding it in your application code is the most convenient way for a developer to perform a local reset of an application (instance).

At this point the application is ready for being reset (when needed), and you’d start one or multiple instances of your application as usual, possibly on different hosts.

#### Step 2: Reset the application

So what would we need to do to restart this application from scratch, i.e., not resume the processing from the point the application was stopped before, but rather to reprocess all its input data again?

First you must stop all running application instances and make sure the whole consumer group is not active anymore (you can use bin/kafka-consumer-groups to list active consumer groups). Typically, the consumer group should become inactive one minute after you stopped all the application instances. This is important because the reset behavior is undefined if you use the reset tool while some application instances are still running — the running instances might produce wrong results or even crash.

Once all application instances are stopped you can call the application reset tool as follows:
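The invocation itself is missing here; based on the parameters described in the next paragraph, it might look like this (the Confluent packaging ships the tool as `bin/kafka-streams-application-reset`; the Apache Kafka distribution adds a `.sh` suffix):

```shell
# Stop all application instances first, then reset the global state.
# --bootstrap-servers and --zookeeper default to localhost:9092 and
# localhost:2181 and can be omitted for a local development setup.
bin/kafka-streams-application-reset --application-id my-streams-app \
    --input-topics my-input-topic \
    --intermediate-topics rekeyed-topic \
    --bootstrap-servers localhost:9092 \
    --zookeeper localhost:2181
```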

As you can see, you only need to provide the application ID (“my-streams-app”) as specified in your application configuration, and the names of all input and intermediate topics. Furthermore, you might need to specify Kafka connection information (bootstrap servers) and Zookeeper connection information. Both parameters have default values localhost:9092 and localhost:2181, respectively. Those are convenient to use during development with a local single Zookeeper/single broker setup. For production/remote usage you need to provide appropriate host:port values.

Once the application reset tool has completed its run (and its internal consumer is not active anymore), you can restart your application as usual, and it will now reprocess its input data from scratch again. We’re done!

It’s important to highlight that, to prevent possible collateral(附带) damage, the application reset tool does not reset the output topics of an application. If any output (or intermediate) topics are consumed by downstream applications, it is your responsibility to adjust those downstream applications as appropriate when you reset the upstream application.

Use application reset tool with care and double-check its parameters: If you provide wrong parameter values (e.g. typos in application.id) or specify parameters inconsistently (e.g. specifying the wrong input topics for the application), this tool might invalidate the application’s state or even impact other applications, consumer groups, or Kafka topics of your Kafka cluster.

As we have shown above, using the new application reset tool you can easily reprocess data from scratch with Kafka Streams. Perhaps somewhat surprisingly, there’s actually a lot going on behind the scenes to make application resets work so easily. The following sections are a deep dive on these internals for the curious reader.

### Behind the Scenes of Kafka Streams

In this second part of the blog post we discuss those Kafka Streams internals that are required to understand the details of a proper application reset. Figure 1 shows a Kafka Streams application before its first run. The topology has a single input topic with two partitions. The current offset of each partition is zero (or there are no committed offsets and the parameter auto.offset.reset = earliest is used). The topology also writes into a single output topic with two partitions, which are both empty. No offsets are shown, as output topics are not consumed by the application itself. Furthermore, the topology contains a call to through(), thus it writes into and reads from an additional (intermediate) topic with two empty partitions, and it contains a stateful operator.

After the Kafka Streams application was executed and stopped, the application state changed as shown in Figure 2. In the following we discuss the relevant parts of Figure 2 with regard to reprocessing.

Figure 2: The application after it was stopped. The small vertical arrows denote committed consumer offsets for the input and intermediate topics (colors denote the corresponding sub-topology). You can see, for example, that sub-topology A has so far written more data to the intermediate topic than sub-topology B has been able to consume (e.g. the last message written to partition 1 has offset 7, but B has only consumed messages up to offset 4). Also, sub-topology B performs stateful operations and thus has created a local state store and an accompanying(伴随) internal changelog topic for this state store.

#### Reprocessing Input Topics

Kafka Streams builds upon existing Kafka functionality to provide scalability and elasticity, security, fault-tolerance, and more. For reprocessing input topics from scratch, one important concern is Kafka Streams’ fault-tolerance mechanism: if an application is stopped and restarted, by default it does not reread previously processed data, but resumes processing where it left off when it was stopped (cf. committed input topic offsets in Figure 2). Internally, Kafka Streams leverages Kafka’s consumer client to read input topics and to commit offsets of processed messages in regular intervals (see commit.interval.ms). Thus, on restart an application does not reprocess the data from its previous run. If a topic was already processed completely, the application will start up but then be idle, waiting until new data is available for processing. So one of the steps we need to take care of when manually resetting an application is to ensure that input topics are reread from scratch. However, this step alone is not sufficient to get a correct reprocessing result.

Kafka Streams构建在Kafka已有的功能之上来提供扩展性、伸缩性、安全性、故障容错等能力。对于从头开始重新处理输入主题，一个重要的关注点是Kafka Streams的故障容错机制：如果一个应用程序停止后重启，默认情况下它不会重新读取已经处理过的数据，而是从上一次停止时的位置恢复处理（参考图2中输入主题的提交偏移量）。在内部实现中，Kafka Streams利用Kafka的消费者客户端读取输入主题，并定期为已处理完的消息提交偏移量（参见commit.interval.ms）。因此应用程序重启时，不会重新处理上一次运行已处理过的数据。如果某个主题已经被完整处理，应用程序启动后会处于空闲状态，直到有新数据可供处理。所以手动重置应用程序时，需要处理的一个步骤是：确保输入主题从头开始重新读取。不过，仅这一步并不足以得到正确的重新处理结果。

#### Sub-Topologies and Internal State in Kafka Streams

First, an application can consist of multiple sub-topologies that are connected via intermediate or internal topics (cf. Figure 2 with two sub-topologies A and B). In Kafka Streams, intermediate topics are user-specified topics that are used as both an input and an output topic within a single application (e.g., a topic that is used in a call to through()). Internal topics are those topics that are created by Kafka Streams “under the hood” (e.g., internal repartitioning topics which are basically internal intermediate topics).

If there are multiple sub-topologies, it might happen that an upstream sub-topology produces records faster into intermediate topics than a downstream sub-topology can consume (cf. committed intermediate topic offsets in Figure 2). Such a consumption delta (cf. the notion of consumer lag in Kafka) within an application would cause problems in the context of an application reset because, after an application restart, the downstream sub-topology would resume reading from intermediate topics from the point where it stopped before the restart. While this behavior is very much desired during normal operations of your application, it would lead to data inconsistencies when resetting an application. For a proper application reset we must therefore tell the application to skip to the very end of any intermediate topics.

Second, for any stateful operation like aggregations or joins, the internal state of these operations is written to a local state store that is backed by an internal changelog topic (cf. sub-topology B in Figure 2). On application restart, Kafka Streams “detects” these changelog topics and any existing local state data, and it ensures that the internal state is fully built up and ready before the actual processing starts. To reset an application we must therefore also reset the application’s internal state, which means we must delete all its local state stores and their corresponding internal changelog topics.

If you are interested in more details than we could cover in this blog post, please take a look at Kafka Streams: Internal Data Management in the Apache Kafka wiki.

#### Resetting a Kafka Streams Application Manually

In order to reprocess topics from scratch, it is required to reset the application state that consists of multiple parts as described above:

1. committed offsets of input topics 输入主题的提交偏移量
2. committed offsets of intermediate topics 临时主题的提交偏移量
3. content and committed offsets of internal topics 内部主题的内容和提交偏移量
4. local state store 本地状态存储

Figure 3: The application after reset: (1) input topic offsets were reset to zero (2) intermediate topic offsets were advanced to end (3) internal topics were deleted (4) any local state stores were deleted (5) the output topic was not modified.

Committed offsets of input topics: Internally, Kafka Streams leverages Kafka’s consumer client to read a topic and to commit offsets of processed messages in regular intervals (see commit.interval.ms). Thus, as a first step to reprocess data, the committed offsets need to be reset. This can be accomplished as follows: Write a special Kafka client application (e.g., leveraging Kafka’s Java consumer client or any other available language) that uses the application.id of your Kafka Streams application as its consumer group ID. The only thing this special client application does is to seek to offset zero for all partitions of all input topics and commit the offset (you should disable auto commit for this application). As this special application uses the same group ID as your Kafka Streams application (the application ID is used as the consumer group ID internally), committing all offsets to zero allows your Streams application to consume its input topics from scratch when it is started again (cf. #1 in Figure 3).
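A sketch of such a special client against the 0.10.0 Java consumer might look like this (topic name and application ID taken from the running example; error handling omitted; committing explicit zero offsets is one way to implement the "seek to zero and commit" step described above):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class OffsetResetter {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Must match the Streams app: application.id doubles as the group id.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-streams-app");
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        try {
            consumer.subscribe(Arrays.asList("my-input-topic"));
            consumer.poll(1000);  // join the group and receive the assignment

            // Commit offset 0 for every assigned partition of the input topic.
            Map<TopicPartition, OffsetAndMetadata> zero = new HashMap<>();
            for (TopicPartition tp : consumer.assignment()) {
                zero.put(tp, new OffsetAndMetadata(0L));
            }
            consumer.commitSync(zero);
        } finally {
            consumer.close();
        }
    }
}
```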

Intermediate topics: For intermediate topics we must ensure that no data from previous application runs gets consumed. The simplest and recommended way is to delete and recreate those intermediate topics (recall that it is recommended to create user-specified topics manually before you run a Kafka Streams application). Additionally, it is required to reset the offsets to zero for the recreated topics (same as for input topics). Resetting the offsets is important because the application would otherwise pick up those invalid offsets on restart.

As an alternative(供替代的选择), it is also possible to only modify the committed offsets for intermediate topics. You should consider this less invasive(侵入性) approach when there are other consumers for intermediate topics and thus deleting the topics is not possible. However, in contrast to modifying the offsets of input topics or deleted intermediate topics, the offsets for kept intermediate topics must be set to the largest value (i.e., to the current log-size) instead of zero, thus skipping over any not-yet consumed data. This ensures that, on restart, only data from the new run will be consumed by the application (cf. #2 in Figure 3). This alternative approach is used by the application reset tool.

1. 删除临时主题，重建临时主题，设置提交偏移量=0，提交偏移量
2. 修改临时主题的提交偏移量到最大

Internal topics: Internal topics can simply be deleted (cf. #3 in Figure 3). As those are created automatically by Kafka Streams, the library can recreate them in the reprocessing case. Similar to deleting intermediate user topics, make sure that committed offsets are either deleted or set to zero.

In order to delete those topics, you need to identify them. Kafka Streams creates two types of internal topics (repartitioning and state-backup) and uses the following naming convention (this naming convention could change in future releases however, which is one of the reasons we recommend the use of the application reset tool rather than manually resetting your applications):

• &lt;application.id&gt;-&lt;operatorName&gt;-repartition 应用程序编号-操作符名称-repartition
• &lt;application.id&gt;-&lt;operatorName&gt;-changelog 应用程序编号-操作符名称-changelog
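As an illustration, the naming convention above amounts to plain string concatenation. The class and method names below are only illustrative, and (as noted) the convention itself may change between releases:

```java
// Illustrative sketch of the internal topic naming convention; not part of
// the Kafka Streams API, and the convention may change in future releases.
public class InternalTopicNames {

    public static String repartitionTopic(String applicationId, String operatorName) {
        return applicationId + "-" + operatorName + "-repartition";
    }

    public static String changelogTopic(String applicationId, String operatorName) {
        return applicationId + "-" + operatorName + "-changelog";
    }
}
```

For example, an application with ID my-streams-app and a store named Counts would get a changelog topic named my-streams-app-Counts-changelog.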

Local State Stores: Similar to internal topics, local state stores can just be deleted (cf. #4 in Figure 3). They will be recreated automatically by Kafka Streams. All state of an application (instance) is stored in the application’s state directory (cf. parameter state.dir, with default value /var/lib/kafka-streams in Confluent Platform releases and /tmp/kafka-streams for Apache Kafka releases). Within the state store directory, each application has its own directory tree within a sub-folder that is named after the application ID. The simplest way to delete the state store for an application is therefore rm -rf <state.dir>/<application.id> (e.g., rm -rf /var/lib/kafka-streams/my-streams-app).

After this discussion, we see that manually resetting a Streams application is cumbersome(累赘) and error-prone. Thus, we developed the new “Application Reset Tool” to simplify this process.

### Application Reset Tool Details

In more detail, the application reset tool (bin/kafka-streams-application-reset) performs the following actions (cf. Figure 3):

1. for any specified input topic, it resets all offsets to zero 对任何指定的输入主题，重置所有的偏移量到0
2. for any specified intermediate topic, seeks to the end for all partitions 对任何指定的临时主题，跳到所有分区的末尾
3. for all internal topic 对所有的内部主题
3.1 resets all offsets to zero 重置所有的偏移量到0
3.2 deletes the topic 删除（内部）主题

To use the script, as a minimum you need to specify the application ID. In that case, only internal topics will be deleted. Additionally, you can specify input topics and/or intermediate topics. More details about resetting a Streams application can be found in the Confluent documentation.

Note that the application reset tool only covers the “global reset” part. In addition to the global reset, a local reset of the application state directory is required for each application instance, too (cf. #4 in Figure 3). This can be done directly within your application using the method KafkaStreams#cleanUp(). Calling cleanUp() is only valid as long as the application instance is not running (i.e., before start() or after close()).

Because resetting the local state store is embedded in your code, there is no additional work to do for local reset — local reset is included in restarting an application instance. For global reset, a single run of the application reset tool is sufficient.

#### 重置时指定了新的应用程序编号会发生什么？

Before we close we want to discuss what happens when you configure your Kafka Streams application to use a new application ID. Until now this has been a common workaround(变通方法) for resetting an application manually.

On the positive side, renaming the application ID does cause your application to reprocess its input data from scratch. Why? When a new application ID is used, the application does not have any committed offsets, internal topics, or local state associated with itself, because all of those use the application ID in some way to get linked to a Kafka Streams application. Of course, you also need to delete and recreate all intermediate user topics.

So why all the fuss(小题大作) about resetting a Kafka Streams application if we could use this workaround?

First, resetting an application is more than just enabling it to reprocess its input data. An important part of the reset is to also clean up all internal data that is created by a running application in the background. For example, all the internal topics (that are no longer used) consume storage space in your Kafka cluster if nobody deletes them. Second, the same clean-up must be performed for data written to the local state directories. If not explicitly deleted, disk storage is wasted on those machines that hosted an application instance. Last but not least, there are all kinds of metadata like topic names, consumer groups, and committed offsets that are not used any more and linger around(游荡，苟延残喘). For those reasons, using a new application ID is considered nothing more than a crude workaround to reset a Kafka Streams application, and we wouldn’t recommend its use for production scenarios.

One more hint at the end: if you do not use a Kafka Streams application anymore (e.g., it gets replaced by a new version of the application or is just no longer needed), we recommend running the reset tool once to clean up all the left-overs(剩余) of the retired application.

## 动机

A common use case for Kafka is real-time processes that transform data from input topics to output topics. Today there are a couple of options available for users to process such data:

1.使用Kafka的生产者和消费者API，自己定义处理逻辑
2.使用一个完整的流处理框架（比如Storm、Samza）

Both of those approaches have some downsides. Downsides of using the first option are that the producer and consumer APIs used for writing transformations are somewhat low level; simple examples are relatively simple, but any kind of more complex transformation tends to be a bit complicated. The opportunities(因素，机会) for a richer client to add value beyond what the producer and consumer do would be:

1. Manage multi-threading and parallelism within a process. 在一个进程中管理多线程和并行
2. Manage partitioning assignment to processes / threads. 管理分区如何分配给进程或线程
3. Manage journaled local state storage. 管理本地状态存储（文件存储系统）
4. Manage offset commits and “exactly-once” guarantees as appropriate features are added in Kafka to support this. 管理偏移量的提交和正好一次的保证

The second option, i.e. using a full stream processing framework can be a good solution but a couple of things tend to(趋向，易于) make it a bit heavy-weight (a brief and still-going survey can be found here):

1. These frameworks are poorly integrated with Kafka (different concepts, configuration, monitoring, terminology). For example, these frameworks only use Kafka as its stream data source / sink of the whole processing topology, while using their own in-memory format for storing intermediate data (RDD, Bolt memory map, etc). If users want to persist these intermediate results to Kafka as well, they need to break their processing into multiple topologies that need to be deployed separately, increasing operation and management costs.
2. These frameworks either duplicate or force the adoption(采用) of a packaging, deployment, and clustering solution. For example, in Storm you need to run a Storm cluster which is a separate thing that has to be monitored and operated. In an elastic environment like AWS, Mesos, YARN, etc this is sort of silly since then you have a Storm cluster inside the YARN cluster vs just directly running the jobs in Mesos; similarly Samza is tied up with YARN.
3. These frameworks can’t be integrated with existing services or applications. For example, you can’t just embed a light transformation library inside an existing app, but rather the entire framework that runs as a service.

1. 这些框架与Kafka的集成很贫乏（不同的概念，配置，监控，术语）。比如这些框架都只会使用Kafka作为它们的整个处理拓扑中的流数据源或目标，但同时也会使用它们自己的内存格式来存储内部数据（RDD，Bolt内存字典）。如果用户想要持久化这些临时结果到Kafka中，他们需要将流处理分成多个部署独立的拓扑，而这显然增加了操作和维护的成本。

2. 这些框架针对打包、部署、集群的方案会有重复或强制采用。比如在Storm中你需要运行一个独立的Storm集群，并需要监控和管理这个集群。
3. 这些框架不能和已有的服务或应用程序集成。比如你不能简单地将一个轻量级的转换客户端库嵌入到已有的程序中，而是让整个框架作为一个服务运行。

## Processor客户端提议

We want to propose another standalone “processor” client besides the existing producer and consumer clients for processing data consumed from Kafka and storing results back to Kafka.

Data Processing 数据处理

A processor computes on a stream of messages, with each message composed as a key-value pair.
Processor receives one message at a time and does not have access to the whole data set at once.

1. Per-message processing: this is the basic function that can be triggered once a new message has arrived from the stream.
2. Time-triggered processing: this function can be triggered whenever a specified time period has elapsed. It can be used for windowing computation, for example.

1. 每条消息处理：这是一个最基本的功能，当一条新的消息从流中到达时，应该触发一次处理逻辑
2. 时间触发处理：当一个指定的时间间隔过去后，这个函数应该被触发。比如，它可以用在窗口计算
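The two trigger types can be sketched with a minimal processor-like interface. This is NOT the actual Kafka Streams Processor API; the interface and class names below are illustrative only:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the two processing triggers; NOT the real Kafka
// Streams Processor API, just a minimal model of its two callback types.
public class ProcessorSketch {

    interface MiniProcessor<K, V> {
        void process(K key, V value);      // per-message processing
        void punctuate(long streamTimeMs); // time-triggered processing
    }

    // Example processor: counts messages and emits the count on punctuate,
    // as a windowed computation might do.
    static class CountingProcessor implements MiniProcessor<String, String> {
        long count = 0;
        final List<Long> emitted = new ArrayList<>();

        public void process(String key, String value) {
            count++; // triggered once per arriving message
        }

        public void punctuate(long streamTimeMs) {
            emitted.add(count); // triggered when a time period has elapsed
        }
    }
}
```

A windowing computation would implement its aggregation in process() and emit window results in punctuate().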

Composable Processing 可组合的处理

Multiple processors should be able to be chained up to form a DAG (i.e. the processor topology) for complex processing logic.
Users can define such a processor topology in an exploratory REPL manner: make an initial topology, deploy and run, check the results and intermediate values, and pause the job and edit the topology on-the-fly.

Local State Storage 本地状态存储

Users can create state storage inside a processor that can be accessed locally.
For example, a processor may retain a (usually most recent) subset of data for a join, aggregation / non-monolithic operations.

## 架构设计

### 分区分布

As shown in the diagram above, each KStream process could have multiple threads (#.threads configurable in the properties), with each thread having a separate consumer and producer. So the first question is how can we distribute the partitions of the subscribed topics in the source processor among all the processes / threads. There are a couple of common cases for partition management in KStream:

1. Co-partitioning: for windowed-joins.
2. Sticky partitioning: for stateful processing, users may want to have a static mapping from stream partitions to process threads.
3. N-way partitioning: when we have stand-by processor instances, users may want to assign a single stream partition to multiple process threads.

1. 协调分区：针对窗口的join

2. 粘性分区：对于有状态的操作，从流分区到处理线程，用户可能会用静态的映射方式
3. 多路分区：当我们有备用的Processor实例，用户可能想要将一个流分区分配到多个处理线程上

These use cases would require more flexible assignments than today’s server-side strategies, so we need to extend the consumer coordinator protocol in the following way:

1. Consumers send JoinGroup with their subscribed topics, and receive the JoinGroup responses with the list of members in the group and the list of topic-partitions.
2. All consumers will get the same lists, and they can execute the same deterministic partition assignment algorithm to get their assigned topic-partitions.

1. 消费者发送带有订阅主题的JoinGroup，接收到带有消费组成员的JoinGroup响应，以及主题分区列表
2. 所有消费者接收到相同的列表，执行相同的分区分配算法，来得到属于它们自己的主题分区

With this new assignment protocol (details of this change can be found here), we distribute the partitions among worker threads as follows:

0.Upon starting the KStream process, the user-specified number of KStream threads will be created. There are no shared variables between threads and no synchronization barriers either, hence these threads will execute completely asynchronously. We will therefore describe the behavior of a single thread in all the following steps.

1.Thread constructs the user-specified processor topology without initializing it, just in order to extract the list of subscribed topics from the topology.

2.Thread uses its consumer’s partitionsFor() to fetch the metadata for each of the subscribed topics to get a mapping of topic -> #.partitions.

3.Thread now triggers consumer’s subscribe() with the subscribed topics, which will then apply the new rebalance protocol. The join-group request will be instantiated as follows (for example):

And the assignor interface is implemented as follows: 分配分区的接口如下：

The interface of the grouping function is the following; it is very similar to the assign() interface above, with the only difference that it does not have the consumer lists.

So after the rebalance completes, each partition-group will be assigned as a whole to the consumers, i.e. no partitions belonging to the same group will be assigned to different consumers. The default grouping function maps partitions with the same id across topics into a group (i.e. co-partitioning).
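The default grouping behavior (partitions with the same id across topics end up in the same partition-group) can be sketched as follows; the TP class and group() method are illustrative stand-ins, not the real internal API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch of the default grouping function: partitions with the
// same partition id across topics are mapped into one partition-group
// (co-partitioning). TP and group() are stand-ins, not the internal API.
public class DefaultGrouping {

    static class TP { // a topic-partition, e.g. ("Topic1", 0)
        final String topic;
        final int partition;
        TP(String topic, int partition) { this.topic = topic; this.partition = partition; }
    }

    // Group id = partition id, so e.g. Topic1-P1 and Topic2-P1 share a group.
    static Map<Integer, List<TP>> group(List<TP> assigned) {
        Map<Integer, List<TP>> groups = new TreeMap<>();
        for (TP tp : assigned) {
            groups.computeIfAbsent(tp.partition, id -> new ArrayList<>()).add(tp);
        }
        return groups;
    }
}
```

Each resulting group is then assigned as a whole to one consumer, and one task is created per group.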

4.Upon getting the partition-groups, thread creates one task for each partition-group. And for each task, constructs the processor topology AND initializes the topology with the task context.

a. Initialization process will trigger Processor.init() on each processor in the topology following topology's DAG order.
b. All user-specified local states will also be created during the initialization process (we will talk about this later in the later sections).
c. Creates the record queue for each one of the task's associated partition-group's partitions, so that when consumers fetches new messages, it will put them into the corresponding queue.

a.初始化过程会在拓扑的每个Processor上调用Processor.init()方法，每个Processor按照拓扑的DAG顺序依次初始化
b.所有用户指定的本地状态也会在初始化过程中被创建
c.为每个任务关联的分区分组的每个分区创建一个记录队列（每个分区都有一个队列），这样当消费者拉取新消息时，可以将它们放到对应的队列中


Hence all tasks’ topologies have the same “skeleton” but different processor / state instantiated objects; in addition, partitions are also synchronized on a per-task basis (we will talk about partition-group synchronization in the next section).

5.When a rebalance is triggered, each consumer will read its last persisted partition assignment from Kafka and check if the following are true when comparing with the new assignment result:

a. Existing partitions are still assigned to the same partition-groups.
b. New partitions are assigned to the existing partition-groups instead of creating new groups.
c. Partition groups are assigned to the specific consumers instead of randomly / round-robin.

a.已经存在的分区仍然分配给相同的分区组
b.新的分区分配给已经存在的分区组，而不是创建新的组
c.分区组分配给指定的消费者，而不是采用随机或者轮询方式


For a), since the partition-group’s associated task-id is used as the corresponding change-log partition id, if a partition gets migrated from one group to another during the rebalance, its state will no longer be valid; for b), since we cannot (yet) dynamically change the #.partitions from the consumer APIs, dynamically adding more partition-groups (hence tasks) may result in change log partitions that do not yet exist; for c), there are some more advanced partitioning settings such as sticky-partitioning / consistent hashing that we want to support in the future, which may then impose additional requirements on the assignment.

### 流时间和同步

Time in stream processing is very important. Windowing operations (join and aggregation) are defined by time. Since Kafka can replay streams, wall-clock based time (system time) may not make sense due to delayed messages / out-of-order messages. Hence we need to define a “time” for each stream according to its progress. We call it stream time.

Stream Time

A stream is defined to abstract all the partitions of the same topic within a task, and its name is the same as the topic name. For example if a task’s assigned partitions are {Topic1-P1, Topic1-P2, Topic1-P3, Topic2-P1, Topic2-P2}, then we treat this task as having two streams: “Topic1” and “Topic2” where “Topic1” represents three partitions P1 P2 and P3 of Topic1, and stream “Topic2” represents two partitions P1 and P2 of Topic2.

Each message in a stream has to have a timestamp to perform window based operations and punctuations. Since a Kafka message does not have a timestamp in the message header, users can define a timestamp extractor based on the message content that is used in the source processor when deserializing the messages. This extractor can be as simple as always returning the current system time (or wall-clock time), or it can be an Avro decoder that gets the timestamp field specified in the record schema.
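A minimal sketch of the extractor idea, using a simplified stand-in interface rather than the real TimestampExtractor API; the "timestamp,payload" value format is an assumption made up for the example:

```java
// Illustrative sketch of a timestamp extractor; the interface is a simplified
// stand-in, not the real Kafka Streams TimestampExtractor API, and the
// "timestamp,payload" value format is an assumption for this example.
public class TimestampExtractorSketch {

    interface MiniTimestampExtractor {
        long extract(String key, String value);
    }

    // Simplest possible extractor: ignore the message, return wall-clock time.
    static final MiniTimestampExtractor WALL_CLOCK =
        (key, value) -> System.currentTimeMillis();

    // Content-based extractor: assumes the value starts with the timestamp
    // (a real one might instead decode a timestamp field from an Avro record).
    static final MiniTimestampExtractor FROM_CONTENT =
        (key, value) -> Long.parseLong(value.split(",", 2)[0]);
}
```

The source processor would call the configured extractor once per deserialized message.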

In addition, since Kafka supports multiple producers sending messages to the same topic, brokers may receive messages in an order that does not strictly follow their timestamps (i.e. out-of-order messages). Therefore, we cannot simply define the “stream time” as the timestamp of the currently processed message in the stream, since that time could move back and forth.

We can define the “stream time” as a monotonically increasing value as follows:

1. For each assigned partition, the thread maintains a record queue for buffering the fetched records from the consumer.
2. Each message has an associated timestamp that is extracted from the timestamp extractor in the message content.
3. The partition time is defined as the lowest message timestamp value in its buffer.
a. When the lowest timestamp corresponding record gets processed by the thread, the partition time possibly gets advanced.
b. The partition time will NOT get reset to a lower value even if a later message is put in the buffer with an even lower timestamp.
4. The stream time is defined as the lowest partition timestamp value across all its partitions in the task:
a. Since partition times are monotonically increasing, stream times are also monotonically increasing.
5. Any newly created streams through the upstream processors inherits the stream time of the parents; for joins, the bigger parent’s stream time is taken.

1. 对每个分配的分区，线程维护了一个记录队列，用来缓冲从消费者拉取到的记录
2. 每条消息都有一个关联的时间戳，它是用时间戳解析器从消息内容中抽取出来的
3. 分区的时间被定义为缓冲区中最低的消息时间戳
3.1 当最低时间戳对应的记录被线程处理后，分区的时间可能会增长
3.2 分区时间不会被重置为一个更低的值，即使一条迟到的消息放到缓冲区，而它的时间戳比分区时间还要低
4. 流时间被定义为任务中所有分区的最低的分区时间
4.1 由于分区时间是单调递增的，所以流时间也是单调递增的
5. 任何通过上游处理节点新创建的流都继承了所有父节点的流时间。对于join操作而言，会选择所有父节点中最大的流时间作为它的流时间
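The monotonicity rules above can be sketched in plain Java; the classes below are an illustrative model, not Kafka Streams internals:

```java
import java.util.List;
import java.util.PriorityQueue;

// Illustrative model of the partition-time / stream-time rules; these classes
// are not Kafka Streams internals, only a sketch of the definitions above.
public class StreamTimeSketch {

    static class PartitionClock {
        final PriorityQueue<Long> buffer = new PriorityQueue<>(); // buffered record timestamps
        long partitionTime = Long.MIN_VALUE;

        void add(long timestamp) { buffer.add(timestamp); }

        // Process the record with the lowest buffered timestamp (rule 3a);
        // partition time may advance but is never reset lower (rule 3b).
        long processOne() {
            long ts = buffer.poll();
            partitionTime = Math.max(partitionTime, ts);
            return ts;
        }
    }

    // Stream time = lowest partition time across the task's partitions (rule 4);
    // it is monotonically increasing because each partition time is.
    static long streamTime(List<PartitionClock> partitions) {
        long min = Long.MAX_VALUE;
        for (PartitionClock p : partitions) {
            min = Math.min(min, p.partitionTime);
        }
        return min;
    }
}
```

Note how a late record with a lower timestamp is still processed, but does not move the partition time backwards.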

Stream Synchronization

When joining two streams, their progress need to be synchronized. If they are out of sync, a time window based join becomes faulty. Say a delay of one stream is negligible(微不足道的) and a delay of the other stream is one day, doing join over 10 minutes window does not make sense. To handle this case, we need to make sure that the consumption rates of all partitions within each task’s assigned partition-group are “synchronized”. Note that each thread may have one or more tasks, but it does not need to synchronize the partitions across tasks’ partition-groups.

The work thread synchronizes the consumption within each one of such groups through the consumer’s pause / resume APIs as follows:

1. When one un-paused partition is ahead in time (partition time defined as above) of the other partitions beyond some defined threshold, notify the corresponding consumer to pause.
2. When one paused partition’s lead in time over the other partitions falls below some defined threshold, notify the corresponding consumer to un-pause.

1. 当一个未暂停的分区的时间（即上面定义的分区时间）超前其他分区超过定义的阈值时，通知对应的消费者暂停
2. 当一个已暂停的分区超前其他分区的时间低于定义的阈值时，通知对应的消费者恢复（取消暂停）
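The pause/resume decision can be sketched as a pure function over the group's partition times; the method name and threshold semantics below are an illustrative model, not the actual implementation:

```java
import java.util.Arrays;

// Illustrative sketch of the pause/resume rule: pause a partition whose
// partition time runs ahead of the slowest partition in its group by more
// than a threshold. This is a model, not the actual implementation.
public class SyncSketch {

    // For each partition in a partition-group, decide whether its consumer
    // should be paused given the current partition times.
    static boolean[] pauseDecisions(long[] partitionTimes, long thresholdMs) {
        long slowest = Arrays.stream(partitionTimes).min().orElse(Long.MIN_VALUE);
        boolean[] paused = new boolean[partitionTimes.length];
        for (int i = 0; i < partitionTimes.length; i++) {
            // ahead of the slowest partition beyond the threshold -> pause;
            // once the lead drops back below the threshold -> un-pause
            paused[i] = partitionTimes[i] - slowest > thresholdMs;
        }
        return paused;
    }
}
```

Re-evaluating this decision after each processed record keeps the partitions of one task roughly in sync.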

Two streams that are joined together have to be in the same task, and their represented partition lists have to match each other. That is, for example, a stream representing P1, P2 and P3 can be joined with another stream also representing P1, P2 and P3.

### 本地状态管理

Users can create one or more state stores during their processing logic, and each task will have a state manager that keeps an instance of each specified store inside the task. Since a single store instance will not be shared across multiple partition groups, and each partition group will only be processed by a single thread, this guarantees that no store will be accessed concurrently by multiple threads at any given time.

Log-backed State Storage

Each state store will be backed up by a different Kafka change log topic, and each instance of the store correlates to one partition of the topic, such that:

For example, if a processor instance consumes from upstream Kafka topic “topic-A” with 4 partitions, and creates two stores, namely store1 and store2, and user groups the 4 partitions into {topic-A-p1, topic-A-p2} and {topic-A-p3, topic-A-p4}; then two change log topics, for example namely “topic-store1-changelog” and “topic-store2-changelog”, need to be created beforehand, each with two partitions.

After the processor writes to a store instance, it first sends the change message to its corresponding changelog topic partition. When the user calls commit() in his processor, KStream needs to flush both the store instance and the producer sending to the changelog, as well as commit the offset in the upstream Kafka. If these three operations cannot be done atomically, then a crash in between these operations could generate duplicates, since committing the offset to the upstream Kafka is executed in the last step; if these three operations can be done atomically, then we can guarantee “exactly-once” semantics.

Persisting and Restoring State

When we close a KStream instance, the following steps are executed:

1. Flush all store’s state as mentioned above.
2. Write the change log offsets for all stores into a local offset checkpoint file. The existence of the offset checkpoint file indicates if the instance was cleanly shutdown.

1. 刷新上面提到的所有状态存储
2. 将所有存储的变更日志偏移量写到本地的偏移量检查点文件中。是否存在偏移量检查点文件，可以用来判断实例是否被干净地关闭（如果没有这个文件，说明KStream没有被彻底关闭）。

Upon (re-)starting the KStream instance:

1. Try to read the local offset checkpoint file into memory, and delete the file afterwards.
2. Check the offset of the corresponding change log partition read from the checkpoint file.
a. If the offset is read successfully, load the previously flushed state and replay the change log from the read offset up to the log-end-offset.
b. Otherwise, do not load the previously flushed state and replay the change log from the beginning up to the log-end-offset.

1. 尝试读取本地的偏移量检查点文件到内存中，然后删除这个文件
2. 读取检查点文件，检查对应的变更日志分区的偏移量
2.1 如果成功读取了偏移量，加载之前（关闭时）刷新的状态，从读取的偏移量开始重放变更日志，直到日志末尾（log-end-offset）
2.2 否则，不加载之前刷新的状态，而是从头开始重放变更日志，直到日志末尾
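The checkpoint-file lifecycle (write on clean shutdown, read-and-delete on restart) can be sketched as follows; the one-line-per-store file format here is an assumption for the example, not the actual Kafka Streams checkpoint format:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of the offset checkpoint file lifecycle. The file
// format ("storeName offset" per line) is an assumption for this example,
// not the actual Kafka Streams checkpoint format.
public class CheckpointSketch {

    // On clean shutdown: persist each store's change log offset (step 2 above).
    static void write(Path file, Map<String, Long> offsets) {
        try {
            List<String> lines = new ArrayList<>();
            for (Map.Entry<String, Long> e : offsets.entrySet()) {
                lines.add(e.getKey() + " " + e.getValue());
            }
            Files.write(file, lines);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // On restart: read the checkpoint into memory and delete it afterwards.
    // An empty result means there was no clean shutdown, so the caller should
    // replay the change log from the beginning instead of loading old state.
    static Map<String, Long> readAndDelete(Path file) {
        Map<String, Long> offsets = new HashMap<>();
        try {
            if (Files.exists(file)) {
                for (String line : Files.readAllLines(file)) {
                    String[] parts = line.split(" ");
                    offsets.put(parts[0], Long.parseLong(parts[1]));
                }
                Files.delete(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return offsets;
    }
}
```

Deleting the file immediately after reading is what makes its mere existence a reliable clean-shutdown marker.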

### 工作流程总结

Upon the user calling KafkaStreaming.start(), the process instance creates the worker threads given the user-specified #.threads. In each worker thread:

1. Construct the producer and consumer client, extract the subscription topic names from the topology.
2. Let the consumer subscribe to the topics and get the assigned partitions.
3. Trigger the grouping function with the assigned partitions to get the returned list of partition-groups (hence tasks) with associated ids.
4. Initialize each task:
a. Create a record queue for buffering the fetched records for each partition.
b. Initialize a topology instance for the task from the builder with a newly created processor context.
c. Initialize the state manager of the task and construct / resume user-defined local states.
5. Run the loop at its own pace until notified to be shutdown; there is no synchronization between these threads. In each iteration of the loop:
a. Thread checks if the record queues are empty / low, and if yes calls consumer.poll(timeout) / consumer.poll(0) to re-fill the buffer.
b. Choose one record from the queues and process it through the processor topology.
c. Check if some of the processors’ punctuate functions need to be triggered, and if yes, execute the function.
d. Check if the user called commit() during the processing of this record; if yes, commit the offset / flush the local state / flush the producer.

1. 构造生产者和消费者客户端，从拓扑中解析订阅的主题名称
2. 消费者订阅主题，并得到分配的分区
3. 使用分配的分区触发分组方法，返回分区分组列表以及对应的编号（任务）
4. 初始化每个任务
4.1 为每个分区创建一个记录队列，用来缓冲每个分区的拉取记录
4.2 从Builder中为任务初始化拓扑实例
4.3 初始化任务的状态管理器，构造或恢复用户定义的本地状态
5. 以自己的步伐运行循环，直到收到关闭的通知，在这些线程中不需要同步，在循环的每次迭代中：
5.1 线程检查记录队列是否空了或者记录很少，如果是则调用consumer.poll(timeout) / consumer.poll(0)重新填充缓冲区
5.2 从队列中选择一条记录，并将其放入处理拓扑中处理
5.3 检查一些Processor的punctuate方法是否需要触发，如果需要则执行函数
5.4 在处理记录时，检查是否可以调用commit()，如果是，则提交偏移量、刷新本地状态、刷新生产者

Upon user calling KafkaStreaming.shutdown(), the following steps are executed:

1. Commit / flush each partition-group’s current processing state as described in the local state management section.
2. Close the embedded producer and consumer clients.

1. 提交或刷新每个分区组的当前处理状态
2. 关闭内置的生产者和消费者客户端

1. PartitionGroup: a set of partitions along with their queuing buffers and timestamp extraction logic. 分区的集合，连同它们的队列缓冲区，以及时间戳解析逻辑
2. ProcessorStateManager: the manager of the local states within a task. 任务的本地状态管理器
3. ProcessorTopology: the instance of the topology generated by the TopologyBuilder. 通过TopologyBuilder生成的拓扑实例
4. StreamTask: the processing task unit, which includes a ProcessorTopology, a ProcessorStateManager and a PartitionGroup. 处理任务的单元，包括前面三个概念
5. StreamThread: contains multiple StreamTasks, a Consumer and a Producer client. 包括多个流任务，一个生产者、消费者客户端
6. KStreamFilter/Map/Branch/…: implementations of high-level KStream topology builder operators. 实现高级KStream拓扑构造器的算子

# KIP-67: Queryable state for Kafka Streams

Today a Kafka Streams application will implicitly create state. This state is used for storing intermediate data such as aggregation results. The state is also used to store KTable’s data when they are materialized. The problem this document addresses is that this state is hidden from application developers and they cannot access it directly. The DSL allows users to make a copy of the data (using the through operator) but this leads to a doubling in the amount of state that is kept. In addition, this leads to extra IOs to external databases/key value stores that could potentially slow down the entire pipeline. Here is a simple example that illustrates the problem:

In line 4, the aggregation already maintains state in a store called StoreName, however that store cannot be directly queried by the developer. Instead, the developer makes a copy of the data in that store into a topic called streams-wordcount-output. Subsequently, the developer might instantiate its own database after reading the data from that topic (this step is not shown above). This is shown in illustration (a):