Deep into Apache Gearpump

Preface: I first heard of Gearpump a year or two ago but never took a deep look inside. Now that I have almost finished writing my Chinese book about Kafka's internal implementation, I decided to add some Kafka-related open-source systems to the book's appendix, such as Spark Streaming, Storm, Flink, and Gearpump. So I finally have a chance to dig into Gearpump.

Introduction

According to the official documentation: “Gearpump is a 100% Akka based platform. We model big data streaming within the Akka actor hierarchy”. Below is Gearpump's actor hierarchy. PS: If you are not familiar with actors yet, that's fine; just think of them as another RPC layer or message-passing mechanism.

geararch

Everything in the diagram is an actor; they fall into two categories, Cluster Actors and Application Actors.

Cluster Actors

Worker: Maps to a physical worker machine. It is responsible for managing resources and reporting metrics on that machine.

Master: Heart of the cluster, which manages workers, resources, and applications. The main function is delegated to three child actors, App Manager, Worker Manager, and Resource Scheduler.

Application Actors

AppMaster: Responsible for scheduling tasks to workers and managing the state of the application. Different applications have different AppMaster instances and are isolated from each other.

Executor: Child of AppMaster, represents a JVM process. Its job is to manage the life cycle of tasks and recover the tasks in case of failure.

Task: Child of Executor, does the real job. Every task actor has a globally unique address. One task actor can send data to any other task actor. This gives us great flexibility in how the computation DAG is distributed.

All actors in the graph are woven together with actor supervision and actor watching, and every error is handled properly via supervisors. In a master, a risky job is isolated and delegated to child actors, so it's more robust. In the application, an extra intermediate layer, the “Executor”, is created so that we can do fine-grained and fast recovery in case of task failure. A master watches the lifecycle of AppMaster and Worker to handle failures, but the lifecycles of Worker and AppMaster are not bound to a Master actor by supervision, so that a Master node can fail independently. Several Master actors form an Akka cluster, and the Master state is exchanged using the Gossip protocol in a conflict-free, consistent way so that there is no single point of failure. With this hierarchy design, we are able to achieve high availability.

Next come some basic concepts. They are essential; take a careful look at them first if you want to understand how Gearpump works.

Master & Worker

Gearpump follows a master-slave architecture. Every cluster contains one or more Master nodes and several Worker nodes. A Worker node is responsible for managing the local resources of a single machine, and the Master node is responsible for managing the global resources of the whole cluster.

If you already know big data systems such as Hadoop or Spark, this terminology should be familiar. Here is a first comparison of Gearpump with other systems.

Big data system   Master            Slave
Hadoop HDFS       NameNode          DataNode
Hadoop YARN       ResourceManager   NodeManager
Spark             Cluster Manager   Worker
Storm             Nimbus            Supervisor
Gearpump          Master            Worker

Application & AppMaster & Executor

An application is what we want to parallelize and run on the cluster. There are different application types; for example, a MapReduce application and a streaming application are different types. Gearpump natively supports the streaming application type, and it also contains several templates to help users create custom application types, such as distributedShell.

At runtime, every application instance is represented by a single AppMaster and a list of Executors. The AppMaster is the command and control center of the application instance. It communicates with the user, master, workers, and executors to get the job done. Each executor is a parallel unit of the distributed application. Typically the AppMaster and the Executors are started as JVM processes on worker nodes.

We have now covered all the important components in Gearpump. Notice that we have not mentioned Task, which appeared in the actor hierarchy above. Also notice that an Application is not an actor but a plain class with a main method. Next, let's look at the application submission flow in Gearpump.

When a user submits an application to the Master, the Master first finds an available worker on which to start the AppMaster. Once the AppMaster has started, it asks the Master for more resources (workers) to start executors. At this point an Executor is only an empty container. After the executors have started, the AppMaster distributes the real computation tasks to them and runs those tasks in parallel.

To submit an application, a Gearpump client specifies a computation defined within a DAG and submits this to an active master. The SubmitApplication message is sent to the Master who then forwards this to an AppManager.

submit app

The AppManager locates an available worker and launches an AppMaster in a sub-process JVM of the worker. The AppMaster will then negotiate with the Master for Resource allocation in order to distribute the DAG as defined within the Application. The allocated workers will then launch Executors (new JVMs).

launch

Here I summarize the basic steps of submitting an application. Note that the step numbers below do not correspond to those in the official pictures above.

  1. The user (client) submits a streaming application to the Gearpump Master.
  2. The Master forwards the SubmitApplication request to the AppManager.
  3. The Master first finds an available Worker on which to start the AppMaster.
  4. The AppMaster is started (inside an Executor) on the Worker that the Master specified. At this point the AppManager on the Master can send SubmitApplicationResult to the client.
  5. The AppMaster sends RequestResource to the Master. The purpose of this step is to ask for resources to launch the Tasks that do the real work. After all, the AppMaster does not run the job itself; it lets the Tasks do it. Note that both the AppMaster and the Tasks live inside Executors, so to start an AppMaster or a Task you must first start an Executor and then let that Executor start the AppMaster or Task.
  6. Once the AppMaster receives the ResourceAllocated response, it sends LaunchExecutor to the Workers that the Master designated. For example, if the ResourceAllocated response from the Master says "you can run executors on workers #1 and #2", the AppMaster sends a LaunchExecutor request to those two Workers.
  7. When a Worker receives a LaunchExecutor request from the AppMaster, it spawns an Executor as a separate Java process. A new process is spawned so that the Executor and the Worker are isolated and cannot affect each other.
  8. Just as a Worker registers with the Master to report its resources, the Executor registers with the AppMaster by sending a RegisterExecutor request. Registering with someone means agreeing to be managed by them: students register with a school, a company registers with its country, employees register with a company, and so on.
  9. When the AppMaster receives the RegisterExecutor request from an Executor on a Worker, it asks that Executor to start Tasks.
  10. Because the AppMaster may have obtained more than one resource in step 6, and every Executor registers with the AppMaster, the AppMaster can start multiple Tasks on the registered Executors.
  11. Each Task residing in an Executor has the DAG information defined in the application, so every Task can do its real work.
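
The steps above can be condensed into a small message trace. The sketch below is plain Scala with no Akka dependency. The message names come from the steps above, while `SubmissionTrace`, `Step`, and `trace` are hypothetical names used only for illustration. It is not Gearpump code, just an ordered list of who sends what to whom under the simplified view given in this summary.

```scala
// A simplified, Akka-free trace of the submission handshake summarized above.
// Message names follow the article; all other names are made up for illustration.
object SubmissionTrace {
  final case class Step(from: String, to: String, message: String)

  // Returns the ordered messages for an application whose executors are
  // placed on the given worker ids.
  def trace(workerIds: List[Int]): List[Step] = {
    val submit = List(
      Step("Client", "Master", "SubmitApplication"),
      Step("Master", "AppManager", "SubmitApplication"),
      Step("AppManager", "Client", "SubmitApplicationResult"),
      Step("AppMaster", "Master", "RequestResource"),
      Step("Master", "AppMaster", "ResourceAllocated")
    )
    // One LaunchExecutor per allocated worker, then one registration per executor.
    val launch = workerIds.map(id => Step("AppMaster", s"Worker#$id", "LaunchExecutor"))
    val register = workerIds.map(id => Step(s"Executor@Worker#$id", "AppMaster", "RegisterExecutor"))
    submit ++ launch ++ register
  }

  def main(args: Array[String]): Unit =
    trace(List(1, 2)).foreach(s => println(s"${s.from} -> ${s.to}: ${s.message}"))
}
```

Calling `SubmissionTrace.trace(List(1, 2))` mirrors the example in step 6, where the Master allocates workers #1 and #2.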

The workflow above is remarkably similar to that of a YARN application, shown below. The picture and description are taken from an excellent Hortonworks blog post.

  1. A client program submits the application, including the necessary specifications to launch the application-specific ApplicationMaster itself.
  2. The ResourceManager assumes the responsibility to negotiate a specified container in which to start the ApplicationMaster and then launches the ApplicationMaster.
  3. The ApplicationMaster, on boot-up, registers with the ResourceManager – the registration allows the client program to query the ResourceManager for details, which allow it to directly communicate with its own ApplicationMaster.
  4. During normal operation the ApplicationMaster negotiates appropriate resource containers via the resource-request protocol.
  5. On successful container allocations, the ApplicationMaster launches the container by providing the container launch specification to the NodeManager. The launch specification, typically, includes the necessary information to allow the container to communicate with the ApplicationMaster itself.
  6. The application code executing within the container then provides necessary information (progress, status etc.) to its ApplicationMaster via an application-specific protocol.
  7. During the application execution, the client that submitted the program communicates directly with the ApplicationMaster to get status, progress updates etc. via an application-specific protocol.
  8. Once the application is complete, and all necessary work has been finished, the ApplicationMaster deregisters with the ResourceManager and shuts down, allowing its own container to be repurposed.

yarn

The picture above shows two client applications submitted to a YARN cluster. The red application's ApplicationMaster resides on node 2 and starts three containers on node 1 and node 3; the blue application's ApplicationMaster resides on node 1 and starts only one container.

In YARN, the ResourceManager is responsible for launching the ApplicationMaster in a container, and launching tasks in containers is the responsibility of the ApplicationMaster. But the ApplicationMaster does not know the cluster's resources, so it asks the ResourceManager where to start tasks. We can now draw some conclusions:

  1. The ResourceManager launches the ApplicationMaster on one of the NodeManagers.
  2. The ApplicationMaster launches tasks on multiple NodeManagers.
  3. NodeManagers report resources to the ResourceManager.
  4. Containers report task execution progress to the ApplicationMaster.
  5. The ResourceManager manages ApplicationMasters, and each ApplicationMaster manages its tasks. When all tasks monitored by an ApplicationMaster have finished, the application registered with the ResourceManager is complete.

Back to Gearpump: it contains similar ideas inspired by YARN. We can think of YARN's container as Gearpump's Executor, and YARN's NodeManager as Gearpump's Worker, because containers reside in NodeManagers in the YARN world and Executors reside in Workers in the Gearpump world.

yarn-gp

We could also consider YARN's ResourceManager as Gearpump's AppManager. Note that AppManager is different from AppMaster: the former lives on the Master side, the latter on the Worker side.

The Master in Gearpump has three main components: AppManager, Scheduler, and Worker Manager. In reality there is no WorkerManager class in the Gearpump source code, but the Master does hold a map from Worker ActorRef to WorkerId.

After this overview of Gearpump's architecture, let's begin exploring its internals.

Part-1: Application

First, take the WordCount example. We submit a StreamApplication through ClientContext. Inside the application() method, we create two Processors (a source that splits text and a sum) and connect them through a partitioner with ~ and ~> to construct a DAG.

object WordCount extends AkkaApp with ArgumentsParser {
  def application(config: ParseResult, system: ActorSystem): StreamApplication = {
    implicit val actorSystem = system
    val split = new Split
    val sourceProcessor = DataSourceProcessor(split, 2, "Split")
    val sum = Processor[Sum](2)
    val partitioner = new HashPartitioner
    // source ~ partitioner ~> sink: words flow from Split to Sum, partitioned by hash
    val computation = sourceProcessor ~ partitioner ~> sum
    val app = StreamApplication("wordCount", Graph(computation))
    app
  }

  override def main(akkaConf: Config, args: Array[String]): Unit = {
    val config = parse(args) // parse() comes from ArgumentsParser in the Gearpump source
    val context: ClientContext = ClientContext(akkaConf)
    val app = application(config, context.system)
    context.submit(app)
  }
}

StreamApplication is one of the application types Gearpump supports; other application types, such as MapReduce, could also run on Gearpump. Each application type has its own AppMaster class; for StreamApplication it is AppMaster. The other built-in ApplicationMaster actor implementations are DistShellAppMaster and DistServiceAppMaster.

Note that an Application is a plain Scala app, while an ApplicationMaster is an Actor. What is the difference between an app and an actor? An app normally just has a main method that does what you want, whereas an actor reacts to messages and does much more complicated things.

trait Application {
  def name: String
  def userConfig(implicit system: ActorSystem): UserConfig
  def appMaster: Class[_ <: ApplicationMaster]
}
abstract class ApplicationMaster extends Actor

class StreamApplication() extends Application {
  override def appMaster: Class[_ <: ApplicationMaster] = classOf[AppMaster]
}
class AppMaster(appContext: AppMasterContext, app: AppDescription)
  extends ApplicationMaster {...}

ClientContext is a user-facing utility for submitting and managing applications. AppDescription describes application metadata such as the AppMaster class name (here, AppMaster).

In the Akka world, the Actor is king. The client sends a SubmitApplication request to the Master actor and expects a SubmitApplicationResult response from it. Messages are sent to an actor through one of the following methods.

  • ! means “fire-and-forget”, i.e. send a message asynchronously and return immediately. Also known as tell.
  • ? sends a message asynchronously and returns a Future representing a possible reply. Also known as ask. This is how the client submits an application here.

class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) {
  def submit(app: Application, jar: String, executorNum: Int) = {
    val appName = ...
    val submissionConfig = ...
    val appDescription = AppDescription(appName, app.appMaster.getName, ...)
    val appJar = Option(jar).map(loadFile)
    submitApplication(SubmitApplication(appDescription, appJar))
  }

  private def submitApplication(submitApplication: SubmitApplication) = {
    // ask the Master and wait for its SubmitApplicationResult
    val result = ActorUtil.askActor[SubmitApplicationResult](
      master, submitApplication, masterClientTimeout)
    val application = result.appId match {
      case Success(appId) =>
        Console.println(s"Submit app succeed. The app id is $appId")
        new RunningApplication(appId, master, masterClientTimeout)
      case Failure(ex) => throw ex
    }
    application
  }
}
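
To make the difference between tell and ask concrete, here is a minimal sketch that uses only the Scala standard library, not Akka. `TellVsAsk`, `ToyMaster`, and `submitAndWait` are hypothetical names; the toy master replies synchronously, which a real actor would not do, so treat it only as an illustration of the two calling styles.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Toy illustration of "tell" (no reply) versus "ask" (reply as a Future).
// This is NOT Akka: the toy master handles each message on the caller's thread.
object TellVsAsk {
  final class ToyMaster {
    private var submitted = Vector.empty[String]

    // tell: deliver the message and return nothing to the caller.
    def tell(appName: String): Unit = submitted = submitted :+ appName

    // ask: deliver the message and hand back a Future for the reply.
    def ask(appName: String): Future[Int] = {
      val reply = Promise[Int]()
      submitted = submitted :+ appName
      reply.success(submitted.size) // the "app id" is just a counter here
      reply.future
    }
  }

  // Roughly what a blocking client does: ask, then wait for the answer.
  def submitAndWait(master: ToyMaster, appName: String): Int =
    Await.result(master.ask(appName), 1.second)
}
```

With tell the caller learns nothing about the outcome; with ask the caller holds a Future and can wait for the application id, which is the style the submission code above relies on.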

Now let's see how the Master deals with SubmitApplication. First, note that a client can only submit an application after the Master has started. When we start the Master we also start some Workers to form a Gearpump cluster, and only once the cluster is stable can a client submit applications. When the Master starts up, its preStart() method creates an AppManager and a Scheduler by invoking context.actorOf(...). This means that AppManager and Scheduler already exist in the Master, ready to work, before any client submits an application.

We also see a receiveHandler() method that returns a Receive object and is installed from waitForNextWorkerId(). What do context.become() and orElse mean? Normally an actor defines a single receive method, but here there are several Receive handlers. orElse chains partial functions so that the first handler defined for a message processes it, and the become() method of ActorContext switches the actor from one Receive behavior to another.

private[cluster] class Master extends Actor with Stash {
  private val kvService = context.actorOf(Props(new InMemoryKVService()))
  private var appManager: ActorRef = null
  private var scheduler: ActorRef = null
  private var workers = new immutable.HashMap[ActorRef, WorkerId]

  // On startup, send GetKV to kvService immediately
  kvService ! GetKV(MASTER_GROUP, WORKER_ID)
  context.become(waitForNextWorkerId) // wait for the result

  def waitForNextWorkerId: Receive = {
    case GetKVSuccess(_, result) => // GetKV response received
      context.become(receiveHandler) // switch over to receiveHandler
      unstashAll()
    case msg => stash() // hold every other message until the worker id is known
  }

  def receiveHandler: Receive = workerMsgHandler orElse
    appMasterMsgHandler orElse // AppMaster to Master
    onMasterListChange orElse // Master change
    clientMsgHandler orElse // Client to Master; application submission is handled here
    kvServiceMsgHandler orElse ActorUtil.defaultMsgHandler(self)

  override def preStart(): Unit = {
    appManager = context.actorOf(
      Props(new AppManager(kvService, AppMasterLauncher)),
      classOf[AppManager].getSimpleName)
    scheduler = context.actorOf(Props(schedulerClass))
    context.system.eventStream.subscribe(self, classOf[DisassociatedEvent])
  }
}
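
The combination of orElse, become, and stash in the Master can be imitated in plain Scala, because an Akka Receive is just a PartialFunction[Any, Unit]. The following sketch makes several assumptions: `BecomeDemo`, `ToyMaster`, and the tuple-shaped messages are invented for illustration, and the stash is replayed inline rather than through a mailbox as Akka does.

```scala
import scala.collection.mutable

// A toy, Akka-free imitation of become + orElse + stash.
// All names and message shapes here are hypothetical.
object BecomeDemo {
  type Receive = PartialFunction[Any, Unit]

  final class ToyMaster {
    val log = mutable.ArrayBuffer.empty[String]
    private val stashed = mutable.Queue.empty[Any]
    private var behavior: Receive = PartialFunction.empty

    private val workerMsgHandler: Receive = {
      case ("RegisterWorker", id: Int) => log += s"worker $id registered"
    }
    private val clientMsgHandler: Receive = {
      case ("SubmitApplication", name: String) => log += s"app $name forwarded"
    }
    private val defaultMsgHandler: Receive = {
      case other => log += s"unhandled: $other"
    }

    // Handlers are tried left to right; the first one defined for a message wins.
    private val receiveHandler: Receive =
      workerMsgHandler orElse clientMsgHandler orElse defaultMsgHandler

    private val waitForNextWorkerId: Receive = {
      case "GetKVSuccess" =>
        behavior = receiveHandler // plays the role of context.become
        while (stashed.nonEmpty) behavior(stashed.dequeue()) // unstashAll
      case msg => stashed.enqueue(msg) // stash
    }

    behavior = waitForNextWorkerId // initial behavior

    def send(msg: Any): Unit = behavior(msg)
  }
}
```

A message sent before `"GetKVSuccess"` is kept in the queue and handled only after the switch, which is why the Master stashes early messages instead of dropping them.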

Now that you have an overview of the main functions in the Master, let's see how the clientMsgHandler receive method responds to the client's submit-application request. I have omitted the less important requests and kept only a few, such as submit and restart. The Master delegates (forwards) these requests to the AppManager.

def clientMsgHandler: Receive = {
  case app: SubmitApplication => appManager.forward(app)
  case app: RestartApplication => appManager.forward(app)
  case register: RegisterAppResultListener => appManager.forward(register)
}

AppManager is a dedicated child of the Master that manages all applications. Its behavior is similar to the Master's.

private[cluster] class AppManager(
    kvService: ActorRef, launcher: AppMasterLauncherFactory)
  extends Actor with Stash with TimeOutScheduler {

  kvService ! GetKV(MASTER_GROUP, MASTER_STATE)
  context.become(waitForMasterState)

  def waitForMasterState: Receive = {
    case GetKVSuccess(_, result) =>
      context.become(receiveHandler)
      unstashAll()
    case msg => stash()
  }

  def receiveHandler: Receive = {
    clientMsgHandler orElse // Client to AppManager
      appMasterMessage orElse // AppMaster to AppManager
      selfMsgHandler orElse
      workerMessage orElse // Worker to AppManager
      appDataStoreService orElse terminationWatch
  }

  def clientMsgHandler: Receive = {
    case SubmitApplication(app, jar, username) =>
      val client = sender()
      context.actorOf(launcher.props(
        nextAppId, -1, app, jar, username, context.parent, client))
      // omitted: saving application metadata to the KV store, and so on
  }
}

The Master creates AppManager by invoking context.actorOf(Props(...)); here AppManager creates an AppMasterLauncher actor by context.actorOf(launcher.props(..)). AppMasterLauncher is a child actor of AppManager and is responsible for launching the AppMaster on the cluster.

When AppManager receives SubmitApplication from the client, it creates an AppMasterLauncher, which sends RequestResource to the Master and then waits for ResourceAllocated.

When AppMasterLauncher receives the ResourceAllocated response from the Master, it tries to launch an executor for the AppMaster on the worker specified in that response.

class AppMasterLauncher(..., master: ActorRef, client: ActorRef) extends Actor {
  LOG.info(s"Ask Master resource to start AppMaster $appId...")
  master ! RequestResource(appId, ResourceRequest(Resource(1)))
  def receive: Receive = waitForResourceAllocation

  def waitForResourceAllocation: Receive = {
    case ResourceAllocated(allocations) =>
      val ResourceAllocation(resource, worker, workerId) = allocations(0)
      val workerInfo = WorkerInfo(workerId, worker)
      val appMasterContext = AppMasterContext(...)
      // Try to launch an executor for the AppMaster on the allocated worker
      val name = ActorUtil.actorNameForExecutor(appId, executorId)
      val selfPath = ActorUtil.getFullPath(context.system, self.path)

      // The new JVM runs ActorSystemBooter and reports back to selfPath
      val executorJVM = ExecutorJVMConfig(
        classOf[ActorSystemBooter].getName, Array(name, selfPath), jar,
        username, appMasterAkkaConfig)

      worker ! LaunchExecutor(appId, executorId, resource, executorJVM)
      context.become(waitForActorSystemToStart(worker, appMasterContext, resource))
  }
}

Let's see how the Worker deals with the LaunchExecutor request from AppMasterLauncher.

private[cluster] class Worker(masterProxy: ActorRef) extends Actor {
  def service: Receive = appMasterMsgHandler orElse clientMessageHandler

  def appMasterMsgHandler: Receive = {
    case launch: LaunchExecutor =>
      val executor = context.actorOf(Props(classOf[ExecutorWatcher],
        launch, masterInfo, ioPool, jarStoreClient, executorProcLauncher))
      context.watch(executor)
  }
}

ExecutorWatcher creates a Java process whose main class, ActorSystemBooter, comes from the ExecutorJVMConfig defined in AppMasterLauncher.

class ExecutorWatcher(launch: LaunchExecutor,
    procLauncher: ExecutorProcessLauncher) extends Actor {
  val ctx = launch.executorJvmConfig
  procLauncher.createProcess(ctx.mainClass, ctx.arguments)
}

ExecutorWatcher is an actor, while ActorSystemBooter is a plain Scala app. Inside ActorSystemBooter's main method, however, another actor is created: Daemon.

class ActorSystemBooter(config: Config) {
  def boot(name: String, reportBackActor: String): ActorSystem = {
    val system = ActorSystem(name, config)
    system.actorOf(Props(classOf[Daemon], name, reportBackActor), "daemon")
    system
  }
}
object ActorSystemBooter {
  def apply(config: Config): ActorSystemBooter = new ActorSystemBooter(config)

  def main(args: Array[String]): Unit = {
    val name = args(0)       // both arguments were set when AppMasterLauncher
    val reportBack = args(1) // constructed the ExecutorJVMConfig
    apply(config).boot(name, reportBack) // loading of config is omitted here
  }

  class Daemon(val name: String, reportBack: String) extends Actor {
    val reportBackActor = context.actorSelection(reportBack)
    reportBackActor ! RegisterActorSystem(
      ActorUtil.getSystemAddress(context.system).toString)
  }
}

So many actors give me a headache, and the call chain nests again and again. So I drew a picture to help me understand what happens along the way. To make the picture more vivid, I use a gear to indicate an actor; you can see that everything except ActorSystemBooter is an actor. Underlined text denotes a request. Let me outline some important steps.

  1. AppManager creates AppMasterLauncher, which then sends RequestResource to the Master.
  2. After AppMasterLauncher receives ResourceAllocated, it sends a LaunchExecutor request to the Worker.
  3. The Worker creates an ExecutorWatcher, which starts a Java process whose Daemon actor sends a RegisterActorSystem request back to the AppMasterLauncher on the Master.

9

Now AppMasterLauncher has to deal with the RegisterActorSystem request. If you go back and check AppMasterLauncher, you will find that after sending LaunchExecutor it waits for the ActorSystem to start.

After the Daemon actor on the Worker sends the RegisterActorSystem request, AppMasterLauncher finally gets to handle it: first it sends ActorSystemRegistered to the Daemon, and then it sends a second request, CreateActor, to the Daemon.

  1. The Daemon on the Worker sends a RegisterActorSystem request to AppMasterLauncher.
  2. AppMasterLauncher on the Master sends ActorSystemRegistered to the Daemon on the Worker.
  3. AppMasterLauncher on the Master sends CreateActor to the Daemon on the Worker.

class AppMasterLauncher(..., master: ActorRef, client: ActorRef) extends Actor {
  def waitForResourceAllocation: Receive = {
    case ResourceAllocated(allocations) =>
      // ... same preparation as in the earlier listing
      worker ! LaunchExecutor(appId, executorId, resource, executorJVM)
      context.become(
        waitForActorSystemToStart(worker, appMasterContext, resource))
  }

  def waitForActorSystemToStart(worker: ActorRef, appContext: AppMasterContext,
      resource: Resource): Receive = {
    case RegisterActorSystem(systemPath) =>
      sender ! ActorSystemRegistered(worker)
      // Several masters may be configured for HA in case of failure
      val masterAddress = systemConfig.getStringList(GEARPUMP_CLUSTER_MASTERS)
        .asScala.map(HostPort(_)).map(ActorUtil.getMasterActorPath)

      sender ! CreateActor(
        AppMasterRuntimeEnvironment.props(masterAddress, app, appContext))
      context.become(waitForAppMasterToStart(worker, appMasterTimeout))
  }

  def waitForAppMasterToStart(worker: ActorRef, cancel: Cancellable): Receive = {
    case ActorCreated(appMaster, _) =>
      cancel.cancel()
      sender ! BindLifeCycle(appMaster)
      LOG.info(s"AppMaster is created, mission complete...")
      replyToClient(SubmitApplicationResult(Success(appId)))
      context.stop(self)
  }
}
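
The three become() switches in AppMasterLauncher amount to a small state machine. The sketch below models it as a pure function in plain Scala. The state and message names follow the listings in this article, but `LauncherStateMachine`, `step`, and `run` are hypothetical, and messages are reduced to strings to keep the example short.

```scala
// A pure-function model of AppMasterLauncher's behavior switches.
// Not Gearpump code: messages are plain strings and side effects are
// represented as the list of message names the launcher would send.
object LauncherStateMachine {
  sealed trait State
  case object WaitForResourceAllocation extends State
  case object WaitForActorSystemToStart extends State
  case object WaitForAppMasterToStart extends State
  case object Done extends State

  // Given the current state and an incoming message, return the next state
  // and the messages sent in response.
  def step(state: State, incoming: String): (State, List[String]) =
    (state, incoming) match {
      case (WaitForResourceAllocation, "ResourceAllocated") =>
        (WaitForActorSystemToStart, List("LaunchExecutor"))
      case (WaitForActorSystemToStart, "RegisterActorSystem") =>
        (WaitForAppMasterToStart, List("ActorSystemRegistered", "CreateActor"))
      case (WaitForAppMasterToStart, "ActorCreated") =>
        (Done, List("BindLifeCycle", "SubmitApplicationResult"))
      case _ =>
        (state, Nil) // this sketch simply ignores anything unexpected
    }

  // Feed a sequence of incoming messages through the machine.
  def run(incoming: List[String]): (State, List[String]) =
    incoming.foldLeft((WaitForResourceAllocation: State, List.empty[String])) {
      case ((state, sent), msg) =>
        val (next, out) = step(state, msg)
        (next, sent ++ out)
    }
}
```

Feeding it `List("ResourceAllocated", "RegisterActorSystem", "ActorCreated")` walks through all three waiting states and ends in `Done`, which corresponds to the launcher stopping itself.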

AppMasterLauncher and Daemon seem to be playing ping-pong, going back and forth many times. Finally, after the Daemon creates another actor (which we'll talk about later), it sends ActorCreated back to AppMasterLauncher.

  1. The Daemon on the Worker sends an ActorCreated request to AppMasterLauncher on the Master.
  2. AppMasterLauncher sends a BindLifeCycle request back to the Daemon on the Worker,
  3. and then sends SubmitApplicationResult back to the client.
  4. The Daemon on the Worker receives the BindLifeCycle request from AppMasterLauncher and watches the actor named in it. The actor being watched by the Daemon is the AppMaster.

class Daemon(val name: String, reportBack: String) extends Actor {
  def waitForRegisterResult: Receive = {
    case ActorSystemRegistered(parent) =>
      timeout.cancel()
      context.watch(parent)
      context.become(waitCommand)
  }

  def waitCommand: Receive = {
    case BindLifeCycle(actor) =>
      LOG.info(s"ActorSystem $name Binding life cycle with actor: $actor")
      context.watch(actor)
    case create@CreateActor(props: Props, name: String) =>
      val actor = Try(context.actorOf(props, name)) // create another actor
      actor match {
        case Success(actor) => sender ! ActorCreated(actor, name)
        case Failure(e) => sender ! CreateActorFailed(props.clazz.getName, e)
      }
    case PoisonPill =>
      context.stop(self)
    case Terminated(actor) =>
      LOG.info(s"System $name Watched actor is terminated $actor")
      context.stop(self)
  }
}
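
The CreateActor branch above builds whatever the sender describes, without the Daemon knowing the concrete class. A rough, Akka-free way to picture this is a message that carries a recipe. In the sketch below, `Recipe` stands in for Akka's Props, and `PropsAsMessage`, `Component`, `launcherMessage`, and `daemonHandle` are all hypothetical names.

```scala
// Toy illustration of shipping a "how to build it" recipe inside a message.
// Recipe is a stand-in for Akka's Props; nothing here is Gearpump or Akka API.
object PropsAsMessage {
  trait Component { def describe: String }

  final case class Recipe(create: () => Component)
  final case class CreateActor(recipe: Recipe, name: String)

  // Launcher side: the only place that knows how to build the "AppMaster".
  def launcherMessage(appId: Int): CreateActor = {
    val recipe = Recipe(() => new Component {
      val describe = s"AppMaster for app $appId"
    })
    CreateActor(recipe, "appmaster")
  }

  // Daemon side: runs whatever recipe arrives and reports success or failure,
  // similar in spirit to replying with ActorCreated or CreateActorFailed.
  def daemonHandle(msg: CreateActor): Either[Throwable, (String, Component)] =
    scala.util.Try(msg.recipe.create()).toEither.map(c => (msg.name, c))
}
```

The receiving side stays generic, so the same Daemon could create any kind of actor that a sender knows how to describe.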

9

The Daemon creates the actor described by the Props that AppMasterLauncher sends in CreateActor while handling RegisterActorSystem. This actor is AppMasterRuntimeEnvironment, which in turn creates the AppMaster.

We know that an actor can be created with context.actorOf(props). Here the props are passed from AppMasterLauncher to the Daemon rather than being built on the Daemon side. Why do it this way? Because only AppMasterLauncher knows how to create an AppMaster. Passing the props inside CreateActor is just like passing any other request. Now the focus shifts to AppMasterRuntimeEnvironment.

object AppMasterRuntimeEnvironment {
  def props(masters: Iterable[ActorPath],
      app: AppDescription, appContextInput: AppMasterContext): Props = {
    // Factory for the master wrapper that sits on top of the MasterProxy
    val master = (appId: AppId, masterProxy: MasterActorRef) =>
      MasterWithExecutorSystemProvider.props(appId, masterProxy)

    // Factory for the lazily started AppMaster
    val appMaster = (appContext: AppMasterContext, app: AppDescription) =>
      LazyStartAppMaster.props(appContext, app)

    // Factory for the actor that keeps the connection to the master alive
    val masterConnectionKeeper = (master: MasterActorRef,
        registerAppMaster: RegisterAppMaster, listener: ListenerActorRef) =>
      Props(new MasterConnectionKeeper(
        registerAppMaster, master, masterStatusListener = listener))

    Props(new AppMasterRuntimeEnvironment(appContextInput, app, masters,
      master, appMaster, masterConnectionKeeper))
  }
}

AppMasterRuntimeEnvironment creates three actors as soon as it is created. It serves as the runtime environment for the AppMaster. When starting an AppMaster, we need to set up the connection to the master (a MasterProxy that stands in for the Master) and prepare the rest of the environment.

The MasterProxy also extends the functionality of the Master by providing a scheduler service for executor systems. The AppMaster can ask the Master for an executor system directly. Details such as requesting resources, contacting a worker to start a process, and then starting an executor system are hidden from the AppMaster.

private[appmaster] class AppMasterRuntimeEnvironment(
    appContextInput: AppMasterContext,
    app: AppDescription,
    masters: Iterable[ActorPath],
    masterFactory: (AppId, MasterActorRef) => Props,
    appMasterFactory: (AppMasterContext, AppDescription) => Props,
    masterConnectionKeeperFactory: (MasterActorRef, RegisterAppMaster, ListenerActorRef) => Props)
  extends Actor {

  private val master = context.actorOf(
    masterFactory(appId, context.actorOf(Props(new MasterProxy(masters, 30.seconds)))))
  private val appContext = appContextInput.copy(masterProxy = master)

  // Create appMaster proxy to receive command and forward to appmaster
  private val appMaster = context.actorOf(appMasterFactory(appContext, app))
  context.watch(appMaster)

  private val registerAppMaster = RegisterAppMaster(
    appId, appMaster, appContext.workerInfo)
  private val masterConnectionKeeper = context.actorOf(
    masterConnectionKeeperFactory(master, registerAppMaster, self))
  context.watch(masterConnectionKeeper)

  def receive: Receive = {
    case MasterConnected =>
      LOG.info(s"Master is connected, start AppMaster $appId...")
      appMaster ! StartAppMaster
    case MasterStopped =>
      LOG.error(s"Master is stopped, stop AppMaster $appId...")
      context.stop(self)
    case Terminated(actor) => actor match {
      case `appMaster` =>
        LOG.error(s"AppMaster $appId is stopped, shutdown myself")
        context.stop(self)
      case `masterConnectionKeeper` =>
        LOG.error(s"Master connection keeper is stopped, appId: $appId, shutdown myself")
        context.stop(self)
      case _ => // Skip
    }
  }
}

The workflow from creating AppMasterRuntimeEnvironment to creating AppMaster is triggered by MasterConnectionKeeper, which sends a RegisterAppMaster request to the master it was given. Finally, when AppMasterRuntimeEnvironment receives MasterConnected from MasterConnectionKeeper, it sends StartAppMaster to AppMaster. Happy now! It has been a long, long way to bring up the AppMaster.

9

Note that AppMasterRuntimeEnvironment does not send StartAppMaster directly to AppMaster but to LazyStartAppMaster, and every message sent to LazyStartAppMaster afterwards is forwarded to AppMaster. Why do we need a lazy AppMaster? If you look at LazyStartAppMaster, you'll notice that it is not really an AppMaster; it is responsible for creating the AppMaster only when it receives a StartAppMaster request from AppMasterRuntimeEnvironment. So you won't find a StartAppMaster handler in AppMaster.

class LazyStartAppMaster(appId: Int, appMasterProps: Props)
  extends Actor with Stash {
  def receive: Receive = null
  context.become(startAppMaster)

  def startAppMaster: Receive = {
    case StartAppMaster =>
      val appMaster = context.actorOf(appMasterProps, "appmaster")
      context.watch(appMaster)
      context.become(terminationWatch(appMaster) orElse
        appMasterService(appMaster))
      unstashAll()
    case _ => stash()
  }

  def appMasterService(appMaster: ActorRef): Receive = {
    case msg => appMaster forward msg
  }
}
private[appmaster] object LazyStartAppMaster {
  def props(appContext: AppMasterContext, app: AppDescription): Props = {
    // app.appMaster holds the class name of AppMaster,
    // which is instantiated only when StartAppMaster is received
    val appMasterProps = Props(Class.forName(app.appMaster), appContext, app)
    Props(new LazyStartAppMaster(appContext.appId, appMasterProps))
  }
}
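
The lazy-start idea can be shown without Akka as a proxy that buffers messages until it is told to start, then creates the real target and forwards everything. This is only a sketch under simplifying assumptions: `LazyStartDemo`, `Target`, and `LazyProxy` are hypothetical names, messages are plain strings, and there is no termination watching.

```scala
import scala.collection.mutable

// Toy, Akka-free version of a lazily started target behind a proxy.
// Names are hypothetical; only the buffering and forwarding pattern matters.
object LazyStartDemo {
  // The real target, created only when "StartAppMaster" arrives.
  final class Target {
    val received = mutable.ArrayBuffer.empty[String]
    def handle(msg: String): Unit = received += msg
  }

  final class LazyProxy(createTarget: () => Target) {
    private var target: Option[Target] = None
    private val pending = mutable.Queue.empty[String]

    def started: Boolean = target.isDefined
    def current: Option[Target] = target

    def send(msg: String): Unit = (target, msg) match {
      case (None, "StartAppMaster") =>
        val t = createTarget()
        target = Some(t)
        while (pending.nonEmpty) t.handle(pending.dequeue()) // like unstashAll
      case (None, other)    => pending.enqueue(other) // like stash
      case (Some(t), other) => t.handle(other)        // like forward
    }
  }
}
```

Messages that arrive early are not lost; they reach the target in their original order once it exists, which is the guarantee the stash gives LazyStartAppMaster.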

The AppMaster is the head of a streaming application. It contains:

  1. ExecutorManager to manage all executors.
  2. TaskManager to manage all tasks.
  3. ClockService to track the global clock for this streaming application.
  4. Scheduler to decide where each task should be scheduled.

class AppMaster(appContext: AppMasterContext, app: AppDescription)
  extends ApplicationMaster {

  private val dagManager = context.actorOf(Props(
    new DagManager(appContext.appId, userConfig, store,
      Some(getUpdatedDAG))))
  private var taskManager: Option[ActorRef] = None
  private var clockService: Option[ActorRef] = None

  private val executorManager: ActorRef =
    context.actorOf(ExecutorManager.props(userConfig, appContext, app.clusterConfig, app.name),
      ActorPathUtil.executorManagerActorName)

  for (dag <- getDAG) {
    clockService = Some(context.actorOf(Props(new ClockService(dag, self, store))))
    val jarScheduler = new JarScheduler(appId, app.name, systemConfig, context)
    taskManager = Some(context.actorOf(Props(new TaskManager(appContext.appId, dagManager,
      jarScheduler, executorManager, clockService.get, self, app.name))))
  }

  override def receive: Receive = {
    taskMessageHandler orElse
      executorMessageHandler orElse
      ready orElse
      recover orElse
      appMasterService orElse
      ActorUtil.defaultMsgHandler(self)
  }
}
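
The `receive` method above is a chain of partial functions joined with `orElse`: each handler claims only the messages it understands, and the last one is a catch-all. A minimal sketch of that composition follows, assuming made-up handlers that return a `String` so the result is visible (Akka's real `Receive` is `PartialFunction[Any, Unit]`).

```scala
object ReceiveChainSketch {
  // Simplified stand-in for Akka's Receive, returning a String for illustration.
  type Receive = PartialFunction[Any, String]

  // Hypothetical handlers; the names echo AppMaster but the bodies are invented.
  val taskMessageHandler: Receive = { case n: Int => s"task:$n" }
  val executorMessageHandler: Receive = { case s: String => s"executor:$s" }
  val defaultMsgHandler: Receive = { case other => s"unhandled:$other" }

  // The first handler that is defined for a message wins.
  val receive: Receive =
    taskMessageHandler orElse executorMessageHandler orElse defaultMsgHandler

  def main(args: Array[String]): Unit = {
    println(receive(1))
    println(receive("launch"))
    println(receive(2.5))
  }
}
```

Order matters here: putting the catch-all first would swallow every message before the specific handlers could see it.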

At this point I lost the thread: nothing inside AppMaster seems to send a request, so what is AppMaster’s entry point?

Keep in mind that once AppMaster is created, it creates ExecutorManager and TaskManager. Although we didn’t see a request sent directly from AppMaster, we can check whether something inside ExecutorManager or TaskManager sends one.

So many Managers showing up at once caught me unprepared. But unlike AppManager, which resides in the Master, ExecutorManager and TaskManager both reside on a Worker, as children of AppMaster!


Processor, OP, Task

class Planner {
  /**
   * Converts Dag of Op to Dag of TaskDescription. TaskDescription is part of the low level Graph API.
   */
  def plan(dag: Graph[Op, OpEdge])
    (implicit system: ActorSystem): Graph[Processor[_ <: Task], _ <: Partitioner] = {

    val graph = optimize(dag)
    graph.mapEdge { (node1, edge, node2) =>
      edge match {
        case Shuffle =>
          node2 match {
            case op: GroupByOp[_, _] =>
              new GroupByPartitioner(op.groupBy.groupByFn)
            case _ => new HashPartitioner
          }
        case Direct =>
          new CoLocationPartitioner
      }
    }.mapVertex(_.getProcessor)
  }

  private def optimize(dag: Graph[Op, OpEdge])
    (implicit system: ActorSystem): Graph[Op, OpEdge] = {
    val graph = dag.copy
    val nodes = graph.topologicalOrderWithCirclesIterator.toList.reverse
    for (node <- nodes) {
      val outGoingEdges = graph.outgoingEdgesOf(node)
      for (edge <- outGoingEdges) {
        merge(graph, edge._1, edge._3)
      }
    }
    graph
  }

  private def merge(graph: Graph[Op, OpEdge], node1: Op, node2: Op)
    (implicit system: ActorSystem): Unit = {
    if (graph.outDegreeOf(node1) == 1 &&
      graph.inDegreeOf(node2) == 1 &&
      // For processor node, we don't allow it to merge with downstream operators
      !node1.isInstanceOf[ProcessorOp[_ <: Task]] &&
      !node2.isInstanceOf[ProcessorOp[_ <: Task]]) {
      val (_, edge, _) = graph.outgoingEdgesOf(node1).head
      if (edge == Direct) {
        val chainedOp = node1.chain(node2)
        graph.addVertex(chainedOp)
        for (incomingEdge <- graph.incomingEdgesOf(node1)) {
          graph.addEdge(incomingEdge._1, incomingEdge._2, chainedOp)
        }

        for (outgoingEdge <- graph.outgoingEdgesOf(node2)) {
          graph.addEdge(chainedOp, outgoingEdge._2, outgoingEdge._3)
        }

        // Remove the old vertex
        graph.removeVertex(node1)
        graph.removeVertex(node2)
      }
    }
  }
}
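
The `merge` step in Planner fuses two operators into one when they are joined by a Direct edge, so they can run inside the same task. The idea can be sketched on a purely linear pipeline, where every node has at most one incoming and one outgoing edge. Everything below is a simplified assumption: `Op`, `Edge` and `fuse` are hypothetical stand-ins, and the real Planner works on a general graph in reverse topological order and also refuses to merge `ProcessorOp` nodes.

```scala
object FusionSketch {
  sealed trait Edge
  case object Direct extends Edge
  case object Shuffle extends Edge

  // Hypothetical operator: a name plus the function it applies.
  final case class Op(name: String, fn: Int => Int) {
    // Chaining composes the two functions into a single operator.
    def chain(next: Op): Op = Op(s"$name+${next.name}", fn andThen next.fn)
  }

  // A linear pipeline is its first op plus (incoming edge, op) pairs for the rest.
  // Ops joined by Direct are fused; a Shuffle edge starts a new stage.
  def fuse(first: Op, rest: List[(Edge, Op)]): List[Op] =
    rest.foldLeft(List(first)) {
      case (done :+ last, (Direct, op)) => done :+ last.chain(op)
      case (done, (_, op))              => done :+ op
    }

  def main(args: Array[String]): Unit = {
    val rest: List[(Edge, Op)] = List(
      (Direct, Op("map", _ + 1)),
      (Shuffle, Op("groupBy", _ * 2)),
      (Direct, Op("sink", _ - 3)))
    val stages = fuse(Op("source", x => x), rest)
    stages.foreach(s => println(s.name))
  }
}
```

In this sketch four operators collapse into two stages, and the only edge left between stages is the Shuffle, which is where data actually has to be repartitioned.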
case class DataSourceOp(
    dataSource: DataSource,
    parallelism: Int = 1,
    userConfig: UserConfig = UserConfig.empty,
    description: String = "source")
  extends Op {

  override def chain(other: Op)(implicit system: ActorSystem): Op = {
    DataSourceOp(dataSource, parallelism,
      userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, other.fn),
      description)
  }

  override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
    Processor[DataSourceTask[Any, Any]](parallelism, description,
      userConfig.withValue(GEARPUMP_STREAMING_SOURCE, dataSource))
  }
}
class TaskWrapper(
    val taskId: TaskId, val taskClass: Class[_ <: Task], context: TaskContextData,
    userConf: UserConfig) extends TaskContext with TaskInterface {

  private var task: Option[Task] = None

  override def onStart(startTime: Instant): Unit = {
    val constructor = taskClass.getConstructor(
      classOf[TaskContext], classOf[UserConfig])
    task = Some(constructor.newInstance(this, userConf))
    task.foreach(_.onStart(startTime))
  }
}
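
TaskWrapper builds the user's task reflectively: it looks up the two-argument constructor on the task class and calls it in `onStart`. A minimal sketch of the same reflection calls is below, assuming hypothetical `Ctx`, `Conf` and `EchoTask` classes in place of Gearpump's `TaskContext`, `UserConfig` and a user task.

```scala
object ReflectiveTaskSketch {
  // Hypothetical stand-ins for TaskContext and UserConfig.
  final class Ctx(val name: String)
  final class Conf(val values: Map[String, String])

  abstract class Task(ctx: Ctx, conf: Conf) {
    def describe: String
  }

  // A user task must expose a public (Ctx, Conf) constructor to be found.
  class EchoTask(ctx: Ctx, conf: Conf) extends Task(ctx, conf) {
    def describe: String =
      s"${ctx.name}:${conf.values.getOrElse("greeting", "")}"
  }

  // Create a Task from its Class object, mirroring what onStart does above.
  def instantiate(taskClass: Class[_ <: Task], ctx: Ctx, conf: Conf): Task = {
    val constructor = taskClass.getConstructor(classOf[Ctx], classOf[Conf])
    constructor.newInstance(ctx, conf)
  }

  def main(args: Array[String]): Unit = {
    val task = instantiate(classOf[EchoTask], new Ctx("task-0"),
      new Conf(Map("greeting" -> "hello")))
    println(task.describe)
  }
}
```

The consequence of this design is that a task class lacking that exact constructor signature fails at start time with a `NoSuchMethodException` rather than at compile time.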

DataSourceProcessor is a utility that helps users create a DAG starting with a [[DataSourceTask]]; the user should pass in a [[DataSource]].

Here is an example that builds a DAG which reads from a Kafka source and then runs a word count:

val source = new KafkaSource()
val sourceProcessor = DataSourceProcessor(source, 1)
val split = Processor[Split](1)
val sum = Processor[Sum](1)
val dag = sourceProcessor ~> split ~> sum
object DataSourceProcessor {
  def apply(
      dataSource: DataSource,
      parallelism: Int = 1,
      description: String = "",
      taskConf: UserConfig = UserConfig.empty)(implicit system: ActorSystem)
    : Processor[DataSourceTask[Any, Any]] = {
    Processor[DataSourceTask[Any, Any]](parallelism, description,
      taskConf.withValue[DataSource](Constants.GEARPUMP_STREAMING_SOURCE, dataSource))
  }
}

DataSourceTask is the default Task container for [[org.apache.gearpump.streaming.source.DataSource]]; it reads from the DataSource in batches.

DataSourceTask calls:

  • DataSource.open() in onStart, passing in the [[org.apache.gearpump.streaming.task.TaskContext]] and the application start time
  • DataSource.read() in each onNext, which reads a batch of messages
  • DataSource.close() in onStop
class DataSourceTask[IN, OUT] private[source](
    context: TaskContext,
    conf: UserConfig,
    source: DataSource,
    transform: Transform[IN, OUT])
  extends Task(context, conf) {

  def this(context: TaskContext, conf: UserConfig) = {
    this(context, conf,
      conf.getValue[DataSource](GEARPUMP_STREAMING_SOURCE)(context.system).get,
      new Transform[IN, OUT](context,
        conf.getValue[FunctionRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system))
    )
  }
}
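
The open / read / close lifecycle that DataSourceTask drives can be sketched in a few lines. This is an illustration under assumptions, not Gearpump's API: the `Source` trait, `ListSource` and `SourceTask` are hypothetical, the start time is a plain `Long`, and the batch size is passed explicitly.

```scala
import scala.collection.mutable

object SourceLifecycleSketch {
  // Hypothetical, simplified source interface; not Gearpump's DataSource trait.
  trait Source {
    def open(startTime: Long): Unit
    def read(batchSize: Int): List[String]
    def close(): Unit
  }

  // In-memory source that also logs its lifecycle calls.
  final class ListSource(data: List[String]) extends Source {
    private var remaining = data
    val log = mutable.ArrayBuffer.empty[String]
    def open(startTime: Long): Unit = log += s"open@$startTime"
    def read(batchSize: Int): List[String] = {
      val (batch, rest) = remaining.splitAt(batchSize)
      remaining = rest
      batch
    }
    def close(): Unit = log += "close"
  }

  // The task only wires its callbacks to the source lifecycle.
  final class SourceTask(source: Source, batchSize: Int, emit: String => Unit) {
    def onStart(startTime: Long): Unit = source.open(startTime)
    def onNext(): Unit = source.read(batchSize).foreach(emit)
    def onStop(): Unit = source.close()
  }

  def main(args: Array[String]): Unit = {
    val source = new ListSource(List("a", "b", "c"))
    val out = mutable.ArrayBuffer.empty[String]
    val task = new SourceTask(source, batchSize = 2, emit = out += _)
    task.onStart(0L)
    task.onNext() // first batch
    task.onNext() // second batch
    task.onStop()
    println(out.toList)
    println(source.log.toList)
  }
}
```

Keeping the source behind three calls like this is what lets one generic task class host any source, Kafka included, without knowing how the data is fetched.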
