StreamingPro

https://github.com/allwefantasy/streamingpro/

Configuration example for a single job:

{
  "you-first-streaming-job": {
    "desc": "just a example",
    "strategy": "spark",
    "algorithm": [],
    "ref": [],
    "compositor": [
      {
        "name": "stream.sources",
        "params": [
          {
            "format": "socket",
            "outputTable": "test",
            "port": "9999",
            "host": "localhost",
            "path": "-"
          }
        ]
      },
      {
        "name": "stream.sql",
        "params": [
          {
            "sql": "select avg(value) avgAge from test",
            "outputTableName": "test3"
          }
        ]
      },
      {
        "name": "stream.sql",
        "params": [
          {
            "sql": "select count(value) as nameCount from test",
            "outputTableName": "test1"
          }
        ]
      },
      {
        "name": "stream.sql",
        "params": [
          {
            "sql": "select sum(value) ageSum from test",
            "outputTableName": "test2"
          }
        ]
      },
      {
        "name": "stream.sql",
        "params": [
          {
            "sql": "select * from test1 union select * from test2 union select * from test3",
            "outputTableName": "test4"
          }
        ]
      },
      {
        "name": "stream.outputs",
        "params": [
          {
            "name": "jack",
            "format": "console",
            "path": "-",
            "inputTableName": "test4",
            "mode": "Overwrite"
          }
        ]
      }
    ],
    "configParams": {}
  }
}

Configuration example for multiple jobs:

{
  "you-first-streaming-job": {
    "desc": "just a example",
    "strategy": "spark",
    "algorithm": [],
    "ref": [],
    "compositor": [],
    "configParams": {}
  },
  "you-second-streaming-job": {
    "desc": "just a example",
    "strategy": "spark",
    "algorithm": [],
    "ref": [],
    "compositor": [],
    "configParams": {}
  }
}

StreamingPro supports Spark, Spark Streaming, Spark Structured Streaming, and Flink. The entry class is the same for all of them, StreamingApp:

object StreamingApp {
  def main(args: Array[String]): Unit = {
    val params = new ParamsUtil(args)
    require(params.hasParam("streaming.name"), "Application name should be set")
    PlatformManager.getOrCreate.run(params)
  }
}
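
ParamsUtil turns the -key value pairs on the command line into a lookup table. A minimal sketch of the idea (an assumption for illustration, not the actual class):

class ParamsUtil(args: Array[String]) {
  // pair up "-key value" arguments, e.g. Array("-streaming.name", "test", ...)
  private val params: Map[String, String] =
    args.sliding(2, 2).collect {
      case Array(k, v) if k.startsWith("-") => (k.stripPrefix("-"), v)
    }.toMap

  def hasParam(key: String): Boolean = params.contains(key)
  def getParam(key: String): String = params(key)
}

With this, new ParamsUtil(Array("-streaming.name", "test")).getParam("streaming.name") yields "test".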

The streaming.platform parameter selects the runtime platform. Naturally, each execution engine ships as a different jar:

SHome=/Users/allwefantasy/streamingpro

./bin/spark-submit --class streaming.core.StreamingApp \
  --master local[2] \
  --name test \
  $SHome/streamingpro-spark-2.0-0.4.15-SNAPSHOT.jar \
  -streaming.name test \
  -streaming.platform spark_streaming \
  -streaming.job.file.path file://$SHome/spark-streaming.json

bin/flink run -c streaming.core.StreamingApp \
  /Users/allwefantasy/streamingpro/streamingpro.flink-0.4.14-SNAPSHOT-online-1.2.0.jar \
  -streaming.name god \
  -streaming.platform flink_streaming \
  -streaming.job.file.path file:///Users/allwefantasy/streamingpro/flink.json

The jar is used to load the appropriate Runtime. The mapping from platform name to Runtime is defined in PlatformManager's platformNameMapping variable.
Runtime is an interface whose most important members are the startRuntime and params methods. From here on we will refer to the Runtime as the execution engine:

trait StreamingRuntime {
  def startRuntime: StreamingRuntime
  def destroyRuntime(stopGraceful: Boolean, stopContext: Boolean = false): Boolean
  def streamingRuntimeInfo: StreamingRuntimeInfo
  def resetRuntimeOperator(runtimeOperator: RuntimeOperator)
  def configureStreamingRuntimeInfo(streamingRuntimeInfo: StreamingRuntimeInfo)
  def awaitTermination
  def startThriftServer
  def startHttpServer
  def params: JMap[Any, Any]
}

StreamingPro is ultimately still launched via spark-submit. The framework's overall flow lives in PlatformManager's run method. The main steps (a condensed sketch follows the list):

  1. Set up the configuration
  2. Create and obtain the runtime environment via reflection
  3. Obtain the dispatcher and all strategies
  4. Start the REST service and the Thrift service, and register with ZK (all optional)
  5. Start the execution engine and wait for the job to finish
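
The five steps condense into a short sketch. Everything here other than the StreamingRuntime trait shown above is an assumed name for illustration; see PlatformManager in the repository for the real implementation:

def run(params: ParamsUtil): Unit = {
  // 1. set up configuration: command-line params override the job file
  val jobParams = new java.util.HashMap[Any, Any]()

  // 2. create the runtime environment by reflection (a sketch of
  //    createRuntimeByReflection follows the platformNameMapping listing below)
  val runtime: StreamingRuntime =
    createRuntimeByReflection(params.getParam("streaming.platform"), jobParams)

  // 3. obtain the dispatcher and all strategies from the job file
  // 4. optionally start the REST/Thrift servers and register with ZooKeeper
  // 5. start the execution engine and block until the job finishes
  runtime.startRuntime
  runtime.awaitTermination
}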

For the concepts of Dispatcher and Strategy, see the author's ServiceframeworkDispatcher project.
Creating the execution engine by reflection means invoking the getOrCreate method of the corresponding Scala object, passing in the params argument; the result is instantiated as a StreamingRuntime.

def platformNameMapping = Map[String, String](
  SPAKR_S_S -> "streaming.core.strategy.platform.SparkStructuredStreamingRuntime",
  SPAKR_STRUCTURED_STREAMING -> "streaming.core.strategy.platform.SparkStructuredStreamingRuntime",
  FLINK_STREAMING -> "streaming.core.strategy.platform.FlinkStreamingRuntime",
  SPAKR_STREAMING -> "streaming.core.strategy.platform.SparkStreamingRuntime",
  SPARK -> "streaming.core.strategy.platform.SparkRuntime"
)
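
Given this mapping, the reflective creation described above can be sketched as follows. This is an illustration built on standard Scala reflection mechanics, not the exact StreamingPro source, and the getOrCreate signature is assumed: a Scala object compiles to a class carrying a MODULE$ singleton field, on which the method is invoked.

import java.util.{Map => JMap}

def createRuntimeByReflection(platform: String, params: JMap[Any, Any]): StreamingRuntime = {
  val className = platformNameMapping(platform)     // e.g. "...SparkRuntime"
  val clazz = Class.forName(className + "$")        // the Scala object's class
  val module = clazz.getField("MODULE$").get(null)  // the singleton instance
  clazz.getMethod("getOrCreate", classOf[JMap[Any, Any]])
    .invoke(module, params)
    .asInstanceOf[StreamingRuntime]
}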

Note: StreamingPro's Runtime is only the execution engine for the Spark job; loading the strategies described in the configuration file is the job of ServiceframeworkDispatcher.
Suppose we define the configuration file below. Because it uses short names, a ShortNameMapping has to be defined:

{
  "convert-multi-csv-to-json": {
    "desc": "test",
    "strategy": "spark",
    "algorithm": [
      {
        "name": "testProcessor"
      }
    ],
    "ref": [],
    "compositor": [
      {
        "name": "testCompositor"
      }
    ],
    "configParams": {}
  }
}

DefaultShortNameMapping is defined as follows. This is how the short name spark in the configuration file is hooked up to ServiceframeworkDispatcher's loading process:

class DefaultShortNameMapping extends ShortNameMapping {
  private val compositorNameMap: Map[String, String] = Map[String, String](
    "spark" -> "serviceframework.dispatcher.test.DefaultStrategy",
    "testProcessor" -> "serviceframework.dispatcher.test.TestProcessor",
    "testCompositor" -> "serviceframework.dispatcher.test.TestCompositor"
  )

  override def forName(shortName: String): String = {
    if (compositorNameMap.contains(shortName)) compositorNameMap(shortName)
    else shortName
  }
}
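
A quick check of the behavior, grounded in the code above: known short names resolve to full class names, anything else passes through unchanged (com.acme.MyClass is a made-up example):

val mapping = new DefaultShortNameMapping()
mapping.forName("spark")            // "serviceframework.dispatcher.test.DefaultStrategy"
mapping.forName("com.acme.MyClass") // unknown names are returned as-is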

The core of ServiceframeworkDispatcher is StrategyDispatcher, which reads the configuration file when it is created.
It then parses the strategy, algorithm (processor), ref, compositor, and configParams entries and constructs the corresponding objects.
ServiceframeworkDispatcher is a module-composition framework that mainly defines three interfaces: Compositor, Processor, and Strategy.

The Strategy interface exposes processor, ref, and compositor, plus initialize and result methods:

trait Strategy[T] extends ServiceInj {
  def processor: JList[Processor[T]]
  def ref: JList[Strategy[T]]
  def compositor: JList[Compositor[T]]
  def name: String
  def initialize(name: String, alg: JList[Processor[T]], ref: JList[Strategy[T]], com: JList[Compositor[T]], params: JMap[Any, Any])
  def result(params: JMap[Any, Any]): JList[T]
  def configParams: util.Map[Any, Any]
  def stop = {}
}

Initializing a Strategy requires the algorithms, references, and compositors, plus the configuration; the corresponding method is StrategyDispatcher's createStrategy.

Note the initialize call below: when createAlgorithms and createCompositors build their objects, they read the params configuration, which is a list of nested maps: JList[JMap[String, Any]].

def createStrategy(name: String, desc: JMap[_, _]): Option[Strategy[T]] = {
  if (_strategies.contains(name)) return None
  // Instantiate the strategy. If a shortName is used, resolve it to the full
  // name first, then instantiate the concrete strategy class via Class.forName
  val strategy = Class.forName(shortNameMapping.forName(desc.get("strategy").asInstanceOf[String])).newInstance().asInstanceOf[Strategy[T]]
  // Read the configuration and materialize it as a Map[Any, Any]
  val configParams: JMap[Any, Any] = if (desc.containsKey("configParams")) desc.get("configParams").asInstanceOf[JMap[Any, Any]] else new java.util.HashMap()
  // Initialize the strategy: this requires creating the algorithms, refs, and compositors
  strategy.initialize(name, createAlgorithms(desc), createRefs(desc), createCompositors(desc), configParams)
  _strategies.put(name, strategy)
  Option(strategy)
}

// Create the algorithms. A strategy gets its results from zero or more algorithms
private def createAlgorithms(jobJMap: JMap[String, Any]): JList[Processor[T]] = {
  if (!jobJMap.contains("algorithm") && !jobJMap.contains("processor")) return new AList[Processor[T]]()
  val processors = if (jobJMap.contains("algorithm")) jobJMap("algorithm") else jobJMap("processor")
  processors.asInstanceOf[JList[JMap[String, Any]]].map { alg =>
    val name = shortName2FullName(alg)
    val processor = Class.forName(name).newInstance().asInstanceOf[Processor[T]]
    val params: JList[JMap[String, Any]] = if (alg.contains("params")) alg("params").asInstanceOf[JList[JMap[String, Any]]] else new AList[JMap[String, Any]]()
    processor.initialize(name, params)
    processor
  }
}

// Create the compositors. There can be several, invoked in order, much like a
// filter chain: the first compositor receives the result of the algorithms or
// strategies, and each later compositor only sees the output of the one before it
private def createCompositors(jobJMap: JMap[String, Any]): JList[Compositor[T]] = {
  if (!jobJMap.contains("compositor")) return new AList()
  val compositors = jobJMap.get("compositor")
  compositors.asInstanceOf[JList[JMap[String, Any]]].map { f =>
    val compositor = Class.forName(shortName2FullName(f)).newInstance().asInstanceOf[Compositor[T]]
    val params: JList[JMap[String, Any]] = if (f.contains("params")) f.get("params").asInstanceOf[JList[JMap[String, Any]]] else new AList[JMap[String, Any]]()
    compositor.initialize(f.get("typeFilter").asInstanceOf[JList[String]], params)
    compositor
  }
}

The core of ServiceframeworkDispatcher is StrategyDispatcher, and the core of StrategyDispatcher is its dispatch method:

def dispatch(params: JMap[Any, Any]): JList[T] = {
  findStrategies(clientType) match {
    case Some(strategies) =>
      strategies.flatMap { f => f.result(params) }
  }
}

Each execution engine implements its startup method differently:

class SparkRuntime(_params: JMap[Any, Any]) extends StreamingRuntime with PlatformManagerListener {
  override def startRuntime: StreamingRuntime = this

  var sparkSession: SparkSession = createRuntime

  def createRuntime = {
    // ...create the SparkSession; parameters decide whether Hive and CarbonData are enabled
  }

  params.put("_session_", sparkSession) // put the SparkSession into params
  registerUDF

  override def params: JMap[Any, Any] = _params
}

class SparkStreamingRuntime(_params: JMap[Any, Any]) extends StreamingRuntime with PlatformManagerListener { self =>
  var streamingContext: StreamingContext = createRuntime

  def createRuntime = {
    // create the StreamingContext and put the SparkSession into params
  }

  override def startRuntime = {
    streamingContext.start()
    this
  }

  override def awaitTermination = streamingContext.awaitTermination()
}

The class that actually drives StreamingPro's main flow, however, is SparkStreamingStrategy under streamingpro-commons.
Note: for spark-1.6 there is also a SparkStreamingStrategy class under streamingpro-spark.

class SparkStreamingStrategy[T] extends Strategy[T] with DebugTrait with JobStrategy {
  var _ref: util.List[Strategy[T]] = _
  var _compositor: util.List[Compositor[T]] = _
  var _processor: util.List[Processor[T]] = _
  var _configParams: util.Map[Any, Any] = _

  def result(params: util.Map[Any, Any]): util.List[T] = {
    ref.foreach { r => r.result(params) } // execute the refs first
    if (compositor != null && compositor.size() > 0) {
      // the first compositor produces the first intermediate result
      var middleR = compositor.get(0).result(processor, ref, null, params)
      // feed each intermediate result into the next compositor:
      // the first compositor's output becomes the second one's input, the
      // second one's output becomes the third one's input, and so on,
      // so the compositors execute as a chain
      for (i <- 1 until compositor.size()) {
        middleR = compositor.get(i).result(processor, ref, middleR, params)
      }
      middleR
    } else new util.ArrayList[T]()
  }
}

Note: every job in the configuration file has a strategy-level configParams, and the refs use this global configParams as well.
It is a Map[String, Any] structure. Each Compositor and Processor also carries its own params configuration, which is an array.

In practice, the global configParams ends up as the last argument of the result() method of the Strategy, the refs/Processors, and the Compositors. A Compositor's params section looks like this:

"compositor": [
{
"name": "testCompositor",
"params": [
{
"sql": "select avg(value) avgAge from test",
"outputTableName": "test3"
},
{
"sql": "select sum(value) sumAge from test",
"outputTableName": "test4"
}
]
}
],

Next, take the Compositor implementation that reads multiple data sources as an example:

  • _configParams is set when the Compositor is initialized at creation time; it is a List[Map[String, Any]] structure corresponding to the params list in the configuration
  • If substitution is needed, the configuration entries are processed first
  • Next, the SparkSession is fetched from params (remember it being put into the map when the Runtime was created?)
  • Then sparkSession.read.format(xx).options(Map).load(path) is executed
  • Finally, df.createOrReplaceTempView creates a Spark SQL temporary view named after outputTable
class MultiSQLSourceCompositor[T] extends Compositor[T] with CompositorHelper {
  private var _configParams: util.List[util.Map[Any, Any]] = _

  override def initialize(typeFilters: util.List[String], configParams: util.List[util.Map[Any, Any]]): Unit = {
    this._configParams = configParams
  }

  override def result(alg: util.List[Processor[T]], ref: util.List[Strategy[T]], middleResult: util.List[T], params: util.Map[Any, Any]): util.List[T] = {
    _configParams.foreach { sourceConfig =>
      val name = sourceConfig.getOrElse("name", "").toString

      val _cfg = sourceConfig.map(f => (f._1.toString, f._2.toString)).map { f =>
        (f._1, params.getOrElse(s"streaming.sql.source.${name}.${f._1}", f._2).toString)
      }.toMap

      val sourcePath = _cfg("path")
      val df = sparkSession(params).read.format(sourceConfig("format").toString).options(
        (_cfg - "format" - "path" - "outputTable").map(f => (f._1.toString, f._2.toString))).load(sourcePath)
      df.createOrReplaceTempView(_cfg.getOrElse("outputTable", _cfg.getOrElse("outputTableName", "")))
    }
    List()
  }
}

To support dynamic substitution of the configuration, _cfg is post-processed: for each entry, if params contains a key of the form s"streaming.sql.source.${name}.${f._1}", its value overrides the one from the config file; otherwise the original value f._2 is kept.
The list below shows the substitution patterns StreamingPro supports, each with a config example and its dynamic override.

  • streaming.sql.source.[name].[param]: e.g. "path": "file:///tmp/sample_article.txt", overridden with -streaming.sql.source.firstSource.path file:///tmp/wow.txt
  • streaming.sql.out.[name].[param]: e.g. "path": "file:///tmp/sample_article.txt", overridden with -streaming.sql.out.firstSink.path file:///tmp/wow_20170101.txt
  • streaming.sql.params.[param-name]: e.g. "sql": "select * from test where hp_time=:today", overridden with -streaming.sql.params.today "20170101"
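
For the third pattern, a placeholder such as :today inside the SQL is filled from the command line. A minimal sketch of how such a substitution could work, under assumed names (translateSQL is an illustrative name, not necessarily what StreamingPro calls it):

def translateSQL(sql: String, params: java.util.Map[Any, Any]): String = {
  import scala.collection.JavaConverters._
  val prefix = "streaming.sql.params."
  // every "-streaming.sql.params.name value" pair replaces ":name" in the SQL
  params.asScala.foldLeft(sql) {
    case (s, (k, v)) if k.toString.startsWith(prefix) =>
      s.replace(":" + k.toString.stripPrefix(prefix), v.toString)
    case (s, _) => s
  }
}

Passing -streaming.sql.params.today 20170101 thus turns select * from test where hp_time=:today into select * from test where hp_time=20170101.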

Suppose we configure two input sources and one output sink as follows:

{
  "name": "batch.sources",
  "params": [
    {
      "name": "firstSource",
      "path": "file:///tmp/sample_article.txt",
      "format": "com.databricks.spark.csv",
      "outputTable": "article",
      "header": true
    },
    {
      "name": "secondSource",
      "path": "file:///tmp/sample_article2.txt",
      "format": "com.databricks.spark.csv",
      "outputTable": "article2",
      "header": true
    }
  ]
},
{
  "name": "batch.outputs",
  "params": [
    {
      "name": "firstSink",
      "path": "file:///tmp/sample_article.txt",
      "format": "com.databricks.spark.csv",
      "outputTable": "article",
      "header": true
    }
  ]
}
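
Because both entries are named (firstSource, firstSink), their paths can be overridden at submit time without editing the JSON, following the patterns from the table above (the paths here are the illustrative ones from that table):

./bin/spark-submit --class streaming.core.StreamingApp \
  ... \
  -streaming.sql.source.firstSource.path file:///tmp/wow.txt \
  -streaming.sql.out.firstSink.path file:///tmp/wow_20170101.txt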

The job of a Source, then, is to read its input into a DataFrame and register it as a temporary view; other components, such as the SQL compositor, work in a similar way. This completes the walkthrough of StreamingPro's overall flow.

