PlayFramework Quick Start

Getting started with the Scala Play Framework (2.4)
Sample application: https://github.com/zqhxuyuan/first-player

  • Run! Run!! Run!!!
  • Hello World!
    • Routes and Controllers
    • Views and rendering
      • Template compilation
  • Products Example
    • implicit
  • Spark integration
    • play-spark-module
    • Spark Launcher
      • mesos
      • Issues
    • Spark Cassandra
    • Preview
    • Actor
    • Quartz
      • Job dependency injection
      • Automatic restart of Quartz jobs
    • Kafka
  • Appendix
    • sbt private repository
    • Play MultiProject
    • Run & Debug With IDEA
        1. Play App (❌)
        2. Sbt Task (✅)
        3. jvm-debug (✅)

Run! Run!! Run!!!

Install activator

brew install typesafe-activator
activator new first-player
cd first-player
activator run

Import into IntelliJ IDEA

  1. install IDEA professional
  2. install PlayFramework support
  3. import SBT project
  4. right-click controllers.Products and choose Run Play 2 App in IDEA

Hello World!

The directory structure generated by activator new first-player looks like this:

➜  first-player tree -L 2
.
├── activator                    only present when the project was created via activator new
├── activator-launch-1.3.4.jar
├── app
│   ├── controllers              controllers
│   ├── models                   models
│   └── views                    views (web pages)
├── build.sbt
├── conf
│   ├── application.conf         application configuration
│   └── routes                   route definitions
├── project
│   ├── build.properties
│   ├── plugins.sbt              sbt plugin configuration
├── public                       stylesheets and scripts
│   ├── images
│   ├── javascripts
│   └── stylesheets

Routes and Controllers

Route configuration:

# http://localhost:9000
GET / controllers.HomeController.index

# http://localhost:9000/hello?n=Play!
GET /hello controllers.HomeController.hello(n: String)

The corresponding HomeController actions:

import javax.inject._
import models.Product
import play.api._
import play.api.mvc._

@Singleton
class HomeController @Inject() extends Controller {
  def index = Action {
    Ok(views.html.index("Your new application is ready.")) //①
  }

  def hello(name: String) = Action {
    //Ok("Hello " + name)
    Ok(views.html.hello(name)) //②
  }
}

①: the index action's response returns an HTML page, rendered from app/views/index.scala.html
②: the hello action's response renders app/views/hello.scala.html, passing it one parameter

Note: if a controller Action returns Ok("some string"), the response body is simply that string.
In Play 2.2 a controller could be an object; in 2.4 it must be a class, annotated with @Inject and @Singleton.

Key points in this section:

  1. How route definitions map to controllers
  2. How an Action's response points at a view
  3. How an Action's parameters are passed to a view

Views and rendering

The views package holds Play's HTML pages, and you can create subdirectories under it. In IntelliJ, creating a
subdirectory under views is the same as creating a package, because views sits at the same level as controllers.

➜  app tree views
views
├── hello.scala.html
├── index.scala.html
├── main.scala.html
├── products
│   ├── details.scala.html
│   ├── edit.scala.html
│   ├── list.scala.html
│   └── main.scala.html

app/views/index.scala.html: the string "Your new application is ready." from the index action is passed to
index.scala.html as the message parameter declared on its first line.

@main calls the main.scala.html template in the same directory; the absolute path @views.html.main also works.
Play's .scala.html pages are really templates that can be invoked like Scala code,
so @main takes a String (the title) as its first argument and an Html value (the page body) as its second.
@play20.welcome is a helper built into Play (its absolute path is @views.html.play20).

@(message: String)

@main("Welcome to Play") {
@play20.welcome(message, style = "Scala")
}

The @main call above invokes main.scala.html from the same directory (layout nesting).
app/views/main.scala.html: its first line declares two parameters, as two separate parameter lists rather than one list.

"Welcome to Play" from the index page becomes the title parameter, and the block passed to @main becomes the content parameter, which matches the Html type.

@(title: String)(content: Html)

<!DOCTYPE html>
<html lang="en">
<head>
<title>@title</title>
<link rel="stylesheet" media="screen" href="@routes.Assets.versioned("stylesheets/main.css")">
<link rel="shortcut icon" type="image/png" href="@routes.Assets.versioned("images/favicon.png")">
</head>
<body>
@content
</body>
</html>

With both parameters in a single list the declaration would be @(title: String, content: Html), and the call site would have to change accordingly:

@main("Welcome to Play", {
@play20.welcome(message, style = "Scala")
})

Key points in this section:

  1. Creating subdirectories under views, and reaching a page in a given subdirectory from a controller
  2. Declaring and using template parameters (everything after @ is Scala code), including multiple parameter lists
  3. Nested templates and the Scala methods they compile to
  4. By default a template is looked up in the same directory as the current page; the absolute prefix is `@views.html.`

Template compilation

Every page under views is compiled into a Scala template class; the views.html call (right-hand side of the figure below) is invoked just like a controller.

For other formats such as JSON or XML the prefix is views.json or views.xml, so it is not always views.html.

play views

The index action of HomeController:

def index = Action {
Ok(views.html.index("Your new application is ready."))
}

Compiled file path: target/scala-2.11/twirl/main/views.html/index.template.scala.
index is an object extending the index class, whose apply method takes one parameter named message.

object index_Scope0 {
  class index {
    def apply(message: String): play.twirl.api.HtmlFormat.Appendable = {...}
    def render(message: String): play.twirl.api.HtmlFormat.Appendable = apply(message)
    def f: ((String) => play.twirl.api.HtmlFormat.Appendable) = (message) => apply(message)
    def ref: this.type = this
  }
}
object index extends index_Scope0.index

index.scala.html calls @main, which corresponds to views/main.scala.html:

@(message: String)
@main("Welcome to Play") {
@play20.welcome(message, style = "Scala")
}

The compiled file for main: target/scala-2.11/twirl/main/views.html/main.template.scala.
The apply method of the main class has two parameter lists, (title: String)(content: Html).

object main_Scope0 {
  class main {
    def apply(title: String)(content: Html): play.twirl.api.HtmlFormat.Appendable = { ... }
    def render(title: String, content: Html): play.twirl.api.HtmlFormat.Appendable = apply(title)(content)
    def f: ((String) => (Html) => play.twirl.api.HtmlFormat.Appendable) = (title) => (content) => apply(title)(content)
    def ref: this.type = this
  }
}
object main extends main_Scope0.main

Products Example

Route configuration:

POST    /products                      controllers.Products.save
GET /products/new controllers.Products.newProduct
GET /products controllers.Products.products
GET /products/:ean controllers.Products.show(ean: Long)

The Products controller. To support internationalization it mixes in I18nSupport. Note that every Action takes an implicit request =>

@Singleton
class Products @Inject() (val messagesApi: MessagesApi) extends Controller with I18nSupport {

  def products = Action { implicit request =>
    val products = Product.findAll
    Ok(views.html.products.list(products))
  }

  def show(ean: Long) = Action { implicit request =>
    Product.findByEan(ean).map { product =>
      Ok(views.html.products.details(product))
    }.getOrElse(NotFound)
  }
}

views/products/list.scala.html

@(productList: List[Product])(implicit lang: Messages)

@products.main(Messages("application.name")) {
  <dl class="products">
    @for(product <- productList) {
      <dt><a href="@controllers.routes.Products.show(product.ean)">@product.ean</a></dt>
      <dt>@product.name</dt>
      <dd>@product.description</dd>
    }
  </dl>
}

Note that the first-line parameter productList must not be named products, otherwise @products.main on the second line fails to compile: the compiler would assume products refers to that parameter, and a List has no main method. In fact @products.main stands for @views.html.products.main, i.e. the views/products/main.scala.html page. Also note that the productList parameter does not need to share its name with the products variable in the Products controller's products action; only the types have to match, so any name will do.

There is also an implicit Messages value here, used for internationalization support.

Reverse routing: the hyperlink on the product's EAN is "@controllers.routes.Products.show(product.ean)"; the controllers prefix can be dropped, giving "@routes.Products.show(product.ean)".
When Play maps routes to controllers it also generates the reverse mapping from controllers back to routes, which is what lets the page refer to the Products.show action.
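
Reverse routes are not limited to templates; a controller can use them as well, for example to redirect after saving. A minimal sketch (this save action is an assumption, it is not part of the excerpt above):

// Hypothetical save action: after persisting, redirect via the reverse route of show.
def save = Action { implicit request =>
  val ean = 5010255079763L // placeholder value
  Redirect(routes.Products.show(ean))
}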

reverse route

Key points in this section:

  1. Internationalization support, and using implicit parameters
  2. A template parameter must not share its name with a nested template subdirectory (e.g. the parameter productList vs. the subdirectory products)
  3. Calling controller actions from a page via reverse routes

implicit

A plain Action call:

//normal invoke, must pass Cart Object
def catalog10() = Action { request =>
  val products = ProductDAO.list
  Ok(views.html.shop.catalog10(products, cart(request)))
}

def cart(request: RequestHeader) = {
  Cart.demoCart()
}

The corresponding page:

@(products: Seq[Product3], cart: Cart)

@shop.cart10("")(cart) {
<h2>Catalog</h2>
@views.html.shop.productList(products)
}

With implicit there is no need to pass the Cart object explicitly, but the request has to be marked implicit, and a method (or a trait providing one) that can produce a Cart must be in scope.

//implicit invoke, no need to pass Cart
def catalog11() = Action { implicit request =>
  val products = ProductDAO.list
  //too many arguments for method apply: (content: C)(implicit writeable: play.api.http.Writeable[C])play.api.mvc.Result in class Status
  //Ok(views.html.shop.catalog11(products), cart(request))

  // without an implicit method in scope, the cart method above is not picked up automatically:
  // could not find implicit value for parameter cart: models.Cart
  Ok(views.html.shop.catalog11(products))
}

// the method name is arbitrary; it only has to be declared implicit
// note: dropping the return type models.Cart also makes the compiler fail to find models.Cart,
// so declare the return type explicitly
implicit def cartImplicit(implicit request: RequestHeader): models.Cart = {
  Cart.demoCart()
}

The corresponding page: cart has become an implicit parameter, so the caller no longer passes it.

@(products: Seq[Product3])(implicit cart: Cart)

@shop.cart11("") {
<h2>Catalog</h2>
@views.html.shop.productList(products)
}

If the implicit method does not declare its return type explicitly, you also get an error:

implicit 1

We’ve moved the Cart parameter to a second parameter list and made it implicit,
so we can apply this template and omit the second parameter list if an implicit Cart is available on the calling side.

Now we’ve declared the cart method as implicit. In addition, we’ve declared the RequestHeader parameter of
both our action and the cart method as implicit. If we now call the views.html.shop.catalog template and
omit the Cart parameter, the Scala compiler will look for an implicit Cart in scope. It’ll find the cart method,
which requires a RequestHeader parameter that’s also declared as implicit, but that’s also available.
We can make our newly created cart method reusable by moving it into a trait. We can then
mix this trait into every controller where we need access to our implicit Cart.

We declared both the Action's RequestHeader (implicit request) and the cart method (implicit def cartImplicit) as implicit.
Calling the catalog template as views.html.shop.catalog11(products) therefore omits the Cart argument; the Scala compiler looks for an implicit Cart in scope
and finds the cart method (implicit def cartImplicit). To make this implicit Cart method reusable, put it in a trait so any controller can mix it in:

trait WithCart {
  implicit def cart(implicit request: RequestHeader): Cart = {
    // Get cart from session (placeholder: the demo simply returns a fixed cart)
    Cart.demoCart()
  }
}

implicit cart

Implicit parameters are convenient, but each time another implicit value is needed you still have to add another (implicit ...) declaration to every template. A common way out is to bundle all the implicit values into a single object and pass that object implicitly:
when a new value is needed you add it to the object, and every template that already receives the object picks it up automatically.

It’s often necessary to pass multiple values from your controller into your main template. Even with implicit parameters,
it would be a hassle to have to add another one each time, because you’d still have to add the implicit parameter to all
of the template definitions. One straightforward solution is to create a single class that contains all the objects you
need in your template, and pass an instance of that. If you want to add a value to it, you only need to adapt the template
where you use it, and the method that constructs it.

It’s common to pass the RequestHeader or Request to templates, as we’ll see in section 6.7.2.
Play provides a WrappedRequest class, which wraps a Request and implements the interface itself as well,
so it’s usable as if it were a regular Request. But by extending WrappedRequest, you can add other fields:

case class UserDataRequest[A](val user: User, val cart: Cart, request: Request[A]) extends WrappedRequest(request)

If you pass an instance of this UserDataRequest to your template, you have a reference to the Request, User, and Cart.
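
A sketch of how such a request could be built and passed along with Play's action composition (ActionTransformer); the user/cart lookups and the catalog12 template are assumptions, not code from the original project:

import scala.concurrent.Future
import play.api.mvc._

// Hypothetical builder that wraps every incoming request into a UserDataRequest.
object UserDataAction extends ActionBuilder[UserDataRequest]
    with ActionTransformer[Request, UserDataRequest] {
  def transform[A](request: Request[A]) = Future.successful {
    UserDataRequest(User("demo"), Cart.demoCart(), request) // placeholder lookups
  }
}

// Usage: templates then only need a single implicit UserDataRequest parameter.
def catalog12 = UserDataAction { implicit request =>
  Ok(views.html.shop.catalog12(ProductDAO.list))
}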

Spark integration

Play and Spark depend on conflicting jackson versions; without pinning jackson explicitly you get:

play.api.http.HttpErrorHandlerExceptions$$anon$1: Execution exception[[RuntimeException: 
java.lang.VerifyError: class com.fasterxml.jackson.module.scala.ser.ScalaIteratorSerializer overrides final method withResolved.
(Lcom/fasterxml/jackson/databind/BeanProperty;Lcom/fasterxml/jackson/databind/jsontype/TypeSerializer;
Lcom/fasterxml/jackson/databind/JsonSerializer;)Lcom/fasterxml/jackson/databind/ser/std/AsArraySerializerBase;]]
at play.api.http.HttpErrorHandlerExceptions$.throwableToUsefulException(HttpErrorHandler.scala:280)
at play.api.http.DefaultHttpErrorHandler.onServerError(HttpErrorHandler.scala:206)
at play.api.GlobalSettings$class.onError(GlobalSettings.scala:160)
at Global$.onError(Global.scala:11)
at play.api.http.GlobalSettingsHttpErrorHandler.onServerError(HttpErrorHandler.scala:98)
at play.core.server.netty.PlayRequestHandler$$anonfun$2$$anonfun$apply$1.applyOrElse(PlayRequestHandler.scala:100)
at play.core.server.netty.PlayRequestHandler$$anonfun$2$$anonfun$apply$1.applyOrElse(PlayRequestHandler.scala:99)
at scala.concurrent.Future$$anonfun$recoverWith$1.apply(Future.scala:344)
at scala.concurrent.Future$$anonfun$recoverWith$1.apply(Future.scala:343)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)

https://groups.google.com/forum/#!topic/play-framework/LIf_Ughidcc
https://github.com/FasterXML/jackson-module-scala/issues/214
http://stackoverflow.com/questions/33815396/spark-com-fasterxml-jackson-module-error

Without pinning jackson, jackson-module-scala and jackson-databind end up at inconsistent versions:

play-spark-jackson1

Manually add the jackson-module-scala dependency in build.sbt, with the same version Play uses:

libraryDependencies += "com.fasterxml.jackson.module" % "jackson-module-scala_2.11" % "2.7.6"

play-spark-jackson2

If the Akka version is wrong, running Spark may fail. For example, adding libraryDependencies += "com.github.dnvriend" %% "akka-persistence-jdbc" % "2.6.8"
bumps Akka from 2.4.10 to 2.4.12:

play.api.http.HttpErrorHandlerExceptions$$anon$1: Execution exception[[RuntimeException: java.lang.NoSuchMethodError: akka.actor.LocalActorRefProvider.log()Lakka/event/LoggingAdapter;]]
at play.api.http.HttpErrorHandlerExceptions$.throwableToUsefulException(HttpErrorHandler.scala:293)
at play.api.http.DefaultHttpErrorHandler.onServerError(HttpErrorHandler.scala:220)
at play.api.GlobalSettings$class.onError(GlobalSettings.scala:160)
at Global$.onError(Global.scala:11)
at play.api.http.GlobalSettingsHttpErrorHandler.onServerError(HttpErrorHandler.scala:100)
at play.core.server.netty.PlayRequestHandler$$anonfun$2$$anonfun$apply$1.applyOrElse(PlayRequestHandler.scala:100)
at play.core.server.netty.PlayRequestHandler$$anonfun$2$$anonfun$apply$1.applyOrElse(PlayRequestHandler.scala:99)
at scala.concurrent.Future$$anonfun$recoverWith$1.apply(Future.scala:344)
at scala.concurrent.Future$$anonfun$recoverWith$1.apply(Future.scala:343)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)

play-spark-module

https://github.com/JoaoVasques/play-spark-module

Change the SparkController timeout to 30s and visit http://localhost:9000/spark/count:

[warn] o.a.h.u.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[info] p.m.i.j.p.s.w.SparkJobWorker - Starting job 1561847669 future
3.216
[info] p.m.i.j.p.s.w.SparkJobWorker - Job is done. Shutting down worker
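
The 30s timeout refers to the Await on the Spark job's Future inside the controller; a rough sketch of what that looks like (the exact play-spark-module code differs, and submitCountJob is an assumed name):

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical action: block at most 30 seconds for the Spark job's result.
def sparkCount = Action {
  val futureCount = sparkModule.submitCountJob() // assumed API returning Future[Long]
  val count = Await.result(futureCount, 30.seconds)
  Ok(s"count = $count")
}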

Spark Launcher

Package org.apache.spark.launcher:Library for launching Spark applications.
This library allows applications to launch Spark programmatically. There’s only one entry point to the library - the SparkLauncher class.

The SparkLauncher.startApplication( org.apache.spark.launcher.SparkAppHandle.Listener…) can be used to start Spark and provide a handle to monitor and control the running application:

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

public class MyLauncher {
  public static void main(String[] args) throws Exception {
    SparkAppHandle handle = new SparkLauncher()
        .setAppResource("/my/app.jar")
        .setMainClass("my.spark.app.Main")
        .setMaster("local")
        .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
        .startApplication();
    // Use handle API to monitor / control application.
  }
}

It’s also possible to launch a raw child process, using the SparkLauncher.launch() method:

import org.apache.spark.launcher.SparkLauncher;

public class MyLauncher {
  public static void main(String[] args) throws Exception {
    Process spark = new SparkLauncher()
        .setAppResource("/my/app.jar")
        .setMainClass("my.spark.app.Main")
        .setMaster("local")
        .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
        .launch();
    spark.waitFor();
  }
}

This method requires the calling code to manually manage the child process, including its output streams (to avoid possible deadlocks). It’s recommended that SparkLauncher.startApplication( org.apache.spark.launcher.SparkAppHandle.Listener…) be used instead.

mesos

Specifying a local file in code (not recommended):

cd /Users/zhengqh/Github/scala/simple-app
mvn clean package
~/Soft/spark-1.6.2-bin-hadoop2.6/bin/spark-submit --class "SimpleApp" --master local\[2\] \
target/scala-2.10/simple-project_2.10-1.0.jar

Passing the file path on the command line (the scheme, local file or hdfs, can also be specified; recommended):

~/Soft/spark-1.6.2-bin-hadoop2.6/bin/spark-submit --class "SimpleApp" --master local\[2\] \
--conf spark.app.logfile="file:///Users/zhengqh/Github/scala/simple-app/README.md" \
target/scala-2.10/simple-project_2.10-1.0.jar

If HADOOP_CONF_DIR is set in the local environment, comment it out; otherwise HDFS is used and the job fails:

Exception in thread "main" java.net.ConnectException: Call From zqhmac/10.57.2.31 to localhost:9000 
failed on connection exception: java.net.ConnectException: Connection refused; For more details see:
http://wiki.apache.org/hadoop/ConnectionRefused

The command line can also select other masters, such as mesos, so don't hard-code setMaster in the code, except perhaps setMaster("local[*]") during development.
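
A minimal sketch of that guideline (the environment-variable check is an assumption, not from the original code):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("SimpleApp")
// Pin the master only during local development; otherwise let
// spark-submit --master (local, mesos://..., yarn, ...) decide.
if (sys.env.get("APP_ENV").contains("dev")) conf.setMaster("local[*]")
val sc = new SparkContext(conf)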

~/Soft/spark-1.6.2-bin-hadoop2.6/bin/spark-submit --class "SimpleApp" --master "mesos://192.168.6.52:5050" \
--conf spark.app.logfile="file:///Users/zhengqh/Github/scala/simple-app/README.md" \
target/scala-2.10/simple-project_2.10-1.0.jar

Running it locally fails with:

Failed to load native Mesos library from /Users/zhengqh/Library/Java/Extensions:/Library/Java/Extensions:
/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.
Exception in thread "main" java.lang.UnsatisfiedLinkError: no mesos in java.library.path

So mesos has to be installed locally; for installing mesos on a Mac see https://mesosphere.com/blog/2014/07/07/installing-mesos-on-your-mac-with-homebrew/

brew install mesos
/usr/local/sbin/mesos-master --registry=in_memory --ip=127.0.0.1

Add mesos to the environment variables:

# For Linux
$ export MESOS_NATIVE_JAVA_LIBRARY='/usr/local/lib/libmesos.so'
# For OSX
$ export MESOS_NATIVE_JAVA_LIBRARY='/usr/local/lib/libmesos.dylib'

With mesos installed locally and the mesos library configured, point at the remote mesos master mesos://192.168.6.52:5050:

**************************************************
Scheduler driver bound to loopback interface! Cannot communicate with remote master(s). You might want to set 'LIBPROCESS_IP' environment variable to use a routable IP address.
**************************************************
I1121 15:07:33.008949 778174464 sched.cpp:157] Version: 0.22.1
I1121 15:07:33.016233 651489280 sched.cpp:254] New master detected at master@192.168.6.52:5050

But it hangs at this point; the local and remote mesos versions may not match:

~/Soft/spark-1.6.2-bin-hadoop2.6/bin/spark-submit --class "SimpleApp" --master mesos://zk://192.168.6.55:2181,192.168.6.56:2181,192.168.6.57:2181/mesos \
--conf spark.app.logfile="file:///Users/zhengqh/Github/scala/simple-app/README.md" \
target/scala-2.10/simple-project_2.10-1.0.jar

I1121 15:34:42.195302 655667200 group.cpp:313] Group process (group(1)@127.0.0.1:61225) connected to ZooKeeper
I1121 15:34:42.195796 655667200 group.cpp:790] Syncing group operations: queue size (joins, cancels, datas) = (0, 0, 0)
I1121 15:34:42.196069 655667200 group.cpp:385] Trying to create path '/mesos' in ZooKeeper
I1121 15:34:42.208238 662253568 detector.cpp:138] Detected a new leader: (id='62')
I1121 15:34:42.208575 655667200 group.cpp:659] Trying to get '/mesos/json.info_0000000062' in ZooKeeper
Failed to detect a master: Failed to parse data of unknown label 'json.info

Pointing at a local mesos instead (start mesos-local first):

~/Soft/spark-1.6.2-bin-hadoop2.6/bin/spark-submit --class "SimpleApp" --master "mesos://localhost:5050" \
--conf spark.app.logfile="file:///Users/zhengqh/Github/scala/simple-app/README.md" \
target/scala-2.10/simple-project_2.10-1.0.jar

I1121 15:11:07.611460 515149824 sched.cpp:448] Framework registered with 20161121-150645-16777343-5050-14247-0000   (once "Framework registered" appears, the connection to the local mesos has succeeded)
16/11/21 15:11:07 INFO mesos.CoarseMesosSchedulerBackend: Registered as framework ID 20161121-150645-16777343-5050-14247-0000
16/11/21 15:11:08 INFO mesos.CoarseMesosSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
16/11/21 15:11:08 INFO mesos.CoarseMesosSchedulerBackend: Mesos task 0 is now TASK_RUNNING
16/11/21 15:11:12 INFO spark.SparkContext: Starting job: count at SimpleApp.scala:13
Lines with a: 1, Lines with b: 0

Alternatively, upload the jar to a client node that already has the mesos environment; since spark and mesos are already integrated there, --master can even be omitted:

/usr/install/spark/bin/spark-submit --class "SimpleApp" --master "mesos://192.168.6.52:5050" \
--conf spark.app.logfile="/user/qihuang.zheng/hello.txt" \
simple-project_2.10-1.0.jar

Issues

Running the job remotely with a proxy-user:

sudo -u admin /usr/install/spark/bin/spark-submit \
--master mesos://zk://192.168.6.55:2181,192.168.6.56:2181,192.168.6.57:2181/mesos \
--class cn.fraudmetrix.pontus.demo.SimpleApp \
--proxy-user qihuang.zheng \
--conf spark.app.logfile="/user/qihuang.zheng/hello.txt" \
/usr/install/pontus/pontus-spark-1.0.0-SNAPSHOT-fat.jar

sudo -u admin /usr/install/spark/bin/spark-submit \
--master "mesos://zk://192.168.6.55:2181,192.168.6.56:2181,192.168.6.57:2181/mesos" \
--proxy-user qihuang.zheng \
--class cn.fraudmetrix.pontus.cassandra.PontusCassandra2HDFSJobHandler \
--conf spark.app.logfile="/usr/install/spark/README.md" \
--conf spark.pontus.destTable="" \
--conf spark.pontus.tableId="" \
--conf spark.pontus.destType="HDFS" \
--conf spark.cores.max="10" \
--conf spark.pontus.execution.id="3" \
--conf spark.pontus.destUsername="" \
--conf spark.pontus.creator="qihuang.zheng" \
--conf spark.pontus.sourceUri="192.168.6.53/keyspace1" \
--conf spark.pontus.sourcePassword="" \
--conf spark.mesos.role="test" \
--conf spark.pontus.sourceTable="standard1" \
--conf spark.pontus.sourceType="Cassandra" \
--conf spark.pontus.readMode="1" \
--conf spark.pontus.tableTs="" \
--conf spark.pontus.destUri="/user/tongdun/pontus/standard1" \
--conf spark.executor.memory="4g" \
--conf spark.pontus.name="cassandra-hdfs" \
--conf spark.pontus.destPassword="" \
--conf spark.pontus.id="2" \
--conf spark.pontus.writeMode="1" \
--conf spark.pontus.sourceUsername="" \
/usr/install/pontus/pontus-spark-1.0.0-SNAPSHOT-fat.jar

mesos kill framework, but won’t kill spark driver

curl -XPOST http://192.168.6.52:5050/master/teardown -d 'frameworkId=475189a7-dcde-4859-9af8-6fc2e63be94e-0556'

Setting spark.app.id manually has no effect:

/usr/install/spark/bin/spark-submit \
--master mesos://zk://192.168.6.55:2181,192.168.6.56:2181,192.168.6.57:2181/mesos \
--class cn.fraudmetrix.pontus.demo.SimpleApp \
--conf "spark.app.id=SimpleApp" \
--conf spark.app.logfile="/user/qihuang.zheng/hello.txt" \
/usr/install/pontus/pontus-spark-1.0.0-SNAPSHOT-fat.jar

A serialization mismatch? Or a spark_2.10 vs. _2.11 version problem?

play 2.5.9 does not support Scala 2.10: https://www.playframework.com/documentation/2.5.x/Migration25
Play 2.3 and 2.4 supported both Scala 2.10 and 2.11. Play 2.5 has dropped support for Scala 2.10 and now only supports Scala 2.11.

Jira: https://issues.apache.org/jira/browse/SPARK-13956
It attributes the error to a mismatch between the driver and executor versions.

I1209 20:58:27.190430  3039 sched.cpp:703] Framework registered with 475189a7-dcde-4859-9af8-6fc2e63be94e-0423
[info] p.m.i.j.p.s.w.SparkJobWorker - Starting job -1651747420 future
[Stage 0:> (0 + 0) / 2][error] o.a.s.n.s.TransportRequestHandler - Error while invoking RpcHandler#receive() on RPC id 8897300371223004861
java.io.InvalidClassException: org.apache.spark.rpc.netty.RequestMessage; local class incompatible: stream classdesc serialVersionUID = -2221986757032131007, local class serialVersionUID = -5447855329526097695
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:109)
at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1$$anonfun$apply$1.apply(NettyRpcEnv.scala:258)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
[error] o.a.s.n.s.TransportRequestHandler - Error while invoking RpcHandler#receive() on RPC id 9207561725902043069
java.io.InvalidClassException: org.apache.spark.rpc.netty.RequestMessage; local class incompatible: stream classdesc serialVersionUID = -2221986757032131007, local class serialVersionUID = -5447855329526097695
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:109)
at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1$$anonfun$apply$1.apply(NettyRpcEnv.scala:258)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
[warn] o.a.s.s.TaskSchedulerImpl - Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
[error] application -

! @72b33a0kb - Internal server error, for (GET) [/spark] ->

play.api.UnexpectedException: Unexpected exception[TimeoutException: Futures timed out after [30 seconds]]
at play.api.http.HttpErrorHandlerExceptions$.throwableToUsefulException(HttpErrorHandler.scala:276)
at play.api.http.DefaultHttpErrorHandler.onServerError(HttpErrorHandler.scala:206)
at play.api.GlobalSettings$class.onError(GlobalSettings.scala:160)
at util.Global$.onError(Global.scala:22)
at play.api.http.GlobalSettingsHttpErrorHandler.onServerError(HttpErrorHandler.scala:98)
at play.core.server.netty.PlayRequestHandler$$anonfun$2$$anonfun$apply$1.applyOrElse(PlayRequestHandler.scala:100)
at play.core.server.netty.PlayRequestHandler$$anonfun$2$$anonfun$apply$1.applyOrElse(PlayRequestHandler.scala:99)
at scala.concurrent.Future$$anonfun$recoverWith$1.apply(Future.scala:344)
at scala.concurrent.Future$$anonfun$recoverWith$1.apply(Future.scala:343)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:167)
at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:165)
at akka.dispatch.BatchingExecutor$BlockableBatch.blockOn(BatchingExecutor.scala:106)
at scala.concurrent.Await$.result(package.scala:190)
at controllers.SparkController$$anonfun$sparkJob$1.apply(SparkController.scala:56)
at controllers.SparkController$$anonfun$sparkJob$1.apply(SparkController.scala:42)
[warn] o.a.s.s.TaskSchedulerImpl - Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

Changing the spark-core packaging to provided doesn't work either:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/Logging
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.getDeclaredConstructors0(Native Method)
at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
at java.lang.Class.getDeclaredConstructors(Class.java:2020)
at com.google.inject.spi.InjectionPoint.forConstructorOf(InjectionPoint.java:245)
at com.google.inject.internal.ConstructorBindingImpl.create(ConstructorBindingImpl.java:99)
at com.google.inject.internal.InjectorImpl.createUninitializedBinding(InjectorImpl.java:658)
at com.google.inject.internal.InjectorImpl.createJustInTimeBinding(InjectorImpl.java:882)
at com.google.inject.internal.InjectorImpl.createJustInTimeBindingRecursive(InjectorImpl.java:805)
at com.google.inject.internal.InjectorImpl.getJustInTimeBinding(InjectorImpl.java:282)
at com.google.inject.internal.InjectorImpl.getBindingOrThrow(InjectorImpl.java:214)
at com.google.inject.internal.InjectorImpl.getProviderOrThrow(InjectorImpl.java:1006)
at com.google.inject.internal.InjectorImpl.getProvider(InjectorImpl.java:1038)
at com.google.inject.internal.InjectorImpl.getProvider(InjectorImpl.java:1001)
at com.google.inject.internal.InjectorImpl.getInstance(InjectorImpl.java:1051)
at play.api.inject.guice.GuiceInjector.instanceOf(GuiceInjectorBuilder.scala:405)
at play.api.inject.RoutesProvider$$anonfun$2.apply(BuiltinModule.scala:82)
at play.api.inject.RoutesProvider$$anonfun$2.apply(BuiltinModule.scala:82)
at scala.Option.fold(Option.scala:158)
at play.api.inject.RoutesProvider.get$lzycompute(BuiltinModule.scala:82)
at play.api.inject.RoutesProvider.get(BuiltinModule.scala:78)
at play.api.inject.RoutesProvider.get(BuiltinModule.scala:77)
at com.google.inject.internal.ProviderInternalFactory.provision(ProviderInternalFactory.java:81)
at com.google.inject.internal.BoundProviderFactory.provision(BoundProviderFactory.java:72)
at com.google.inject.internal.ProviderInternalFactory.circularGet(ProviderInternalFactory.java:61)
at com.google.inject.internal.BoundProviderFactory.get(BoundProviderFactory.java:62)
at com.google.inject.internal.SingleParameterInjector.inject(SingleParameterInjector.java:38)
at com.google.inject.internal.SingleParameterInjector.getAll(SingleParameterInjector.java:62)
at com.google.inject.internal.ConstructorInjector.provision(ConstructorInjector.java:104)
at com.google.inject.internal.ConstructorInjector.construct(ConstructorInjector.java:85)
at com.google.inject.internal.ConstructorBindingImpl$Factory.get(ConstructorBindingImpl.java:267)
at com.google.inject.internal.FactoryProxy.get(FactoryProxy.java:56)
at com.google.inject.internal.SingleParameterInjector.inject(SingleParameterInjector.java:38)
at com.google.inject.internal.SingleParameterInjector.getAll(SingleParameterInjector.java:62)
at com.google.inject.internal.ConstructorInjector.provision(ConstructorInjector.java:104)
at com.google.inject.internal.ConstructorInjector.construct(ConstructorInjector.java:85)
at com.google.inject.internal.ConstructorBindingImpl$Factory.get(ConstructorBindingImpl.java:267)
at com.google.inject.internal.ProviderToInternalFactoryAdapter$1.call(ProviderToInternalFactoryAdapter.java:46)
at com.google.inject.internal.InjectorImpl.callInContext(InjectorImpl.java:1103)
at com.google.inject.internal.ProviderToInternalFactoryAdapter.get(ProviderToInternalFactoryAdapter.java:40)
at com.google.inject.internal.SingletonScope$1.get(SingletonScope.java:145)
at com.google.inject.internal.InternalFactoryToProviderAdapter.get(InternalFactoryToProviderAdapter.java:41)
at com.google.inject.internal.FactoryProxy.get(FactoryProxy.java:56)
at com.google.inject.internal.InternalInjectorCreator$1.call(InternalInjectorCreator.java:205)
at com.google.inject.internal.InternalInjectorCreator$1.call(InternalInjectorCreator.java:199)
at com.google.inject.internal.InjectorImpl.callInContext(InjectorImpl.java:1092)
at com.google.inject.internal.InternalInjectorCreator.loadEagerSingletons(InternalInjectorCreator.java:199)
at com.google.inject.internal.InternalInjectorCreator.injectDynamically(InternalInjectorCreator.java:180)
at com.google.inject.internal.InternalInjectorCreator.build(InternalInjectorCreator.java:110)
at com.google.inject.Guice.createInjector(Guice.java:96)
at com.google.inject.Guice.createInjector(Guice.java:84)
at play.api.inject.guice.GuiceBuilder.injector(GuiceInjectorBuilder.scala:181)
at play.api.inject.guice.GuiceApplicationBuilder.build(GuiceApplicationBuilder.scala:123)
at play.api.inject.guice.GuiceApplicationLoader.load(GuiceApplicationLoader.scala:21)
at play.core.server.ProdServerStart$.start(ProdServerStart.scala:47)
at play.core.server.ProdServerStart$.main(ProdServerStart.scala:22)
at play.core.server.ProdServerStart.main(ProdServerStart.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 68 more
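
For reference, "provided" above means the sbt dependency scope, roughly like this sketch (the version number is an assumption):

// build.sbt: compile against spark-core but keep it out of the packaged app,
// expecting spark-submit to supply it at runtime; when Play itself is the runtime,
// those classes are then missing, hence the NoClassDefFoundError above.
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.2" % "provided"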

Running the Spark job directly inside pontus-web:

! @72ck129fb - Internal server error, for (GET) [/preview/HDFS/activity] ->

play.api.UnexpectedException: Unexpected exception[RuntimeException: java.lang.NoSuchMethodError: akka.actor.LocalActorRefProvider.log()Lakka/event/LoggingAdapter;]
at play.api.http.HttpErrorHandlerExceptions$.throwableToUsefulException(HttpErrorHandler.scala:289)
at play.api.http.DefaultHttpErrorHandler.onServerError(HttpErrorHandler.scala:220)
at play.api.GlobalSettings$class.onError(GlobalSettings.scala:160)
at util.Global$.onError(Global.scala:20)
at play.api.http.GlobalSettingsHttpErrorHandler.onServerError(HttpErrorHandler.scala:100)
at play.core.server.netty.PlayRequestHandler$$anonfun$2$$anonfun$apply$1.applyOrElse(PlayRequestHandler.scala:100)
at play.core.server.netty.PlayRequestHandler$$anonfun$2$$anonfun$apply$1.applyOrElse(PlayRequestHandler.scala:99)
at scala.concurrent.Future$$anonfun$recoverWith$1.apply(Future.scala:344)
at scala.concurrent.Future$$anonfun$recoverWith$1.apply(Future.scala:343)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
Caused by: java.lang.RuntimeException: java.lang.NoSuchMethodError: akka.actor.LocalActorRefProvider.log()Lakka/event/LoggingAdapter;
at play.api.mvc.ActionBuilder$$anon$2.apply(Action.scala:463)
at play.api.mvc.Action$$anonfun$apply$2$$anonfun$apply$5$$anonfun$apply$6.apply(Action.scala:112)
at play.api.mvc.Action$$anonfun$apply$2$$anonfun$apply$5$$anonfun$apply$6.apply(Action.scala:112)
at play.utils.Threads$.withContextClassLoader(Threads.scala:21)
at play.api.mvc.Action$$anonfun$apply$2$$anonfun$apply$5.apply(Action.scala:111)
at play.api.mvc.Action$$anonfun$apply$2$$anonfun$apply$5.apply(Action.scala:110)
at scala.Option.map(Option.scala:146)
at play.api.mvc.Action$$anonfun$apply$2.apply(Action.scala:110)
at play.api.mvc.Action$$anonfun$apply$2.apply(Action.scala:103)
at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:251)
Caused by: java.lang.NoSuchMethodError: akka.actor.LocalActorRefProvider.log()Lakka/event/LoggingAdapter;
at akka.remote.RemoteActorRefProvider.<init>(RemoteActorRefProvider.scala:128)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(ReflectiveDynamicAccess.scala:32)
at scala.util.Try$.apply(Try.scala:192)
at akka.actor.ReflectiveDynamicAccess.createInstanceFor(ReflectiveDynamicAccess.scala:27)
at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(ReflectiveDynamicAccess.scala:38)
at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(ReflectiveDynamicAccess.scala:38)

http://stackoverflow.com/questions/40883978/migration-to-play-2-5-leads-to-this-error-nosuchmethoderror-akka-actor-locala

Listener

import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobEnd}

ssc.sparkContext.addSparkListener(new SparkListener() {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {}
  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
    println("PontusKafka2HDFSJobHandler onApplicationEnd....")
    PontusExecutionDao.updateStatus(conf)
  }
})

Testing from the command line: when the program is shut down, the onApplicationEnd event is printed. Killing the application with "kill driver" on mesos also invokes the listener.

2016-12-13 11:34:45,534:41791(0x2b1d28803700):ZOO_INFO@log_env@712: Client environment:zookeeper.version=zookeeper C client 3.4.5
2016-12-13 11:34:45,534:41791(0x2b1d28803700):ZOO_INFO@log_env@716: Client environment:host.name=dp0653
2016-12-13 11:34:45,534:41791(0x2b1d28803700):ZOO_INFO@log_env@723: Client environment:os.name=Linux
2016-12-13 11:34:45,534:41791(0x2b1d28803700):ZOO_INFO@log_env@724: Client environment:os.arch=3.10.97-1.el6.elrepo.x86_64
2016-12-13 11:34:45,534:41791(0x2b1d28803700):ZOO_INFO@log_env@725: Client environment:os.version=#1 SMP Sat Feb 20 11:55:29 EST 2016
2016-12-13 11:34:45,534:41791(0x2b1d28803700):ZOO_INFO@log_env@733: Client environment:user.name=qihuang.zheng
2016-12-13 11:34:45,534:41791(0x2b1d28803700):ZOO_INFO@log_env@741: Client environment:user.home=/home/qihuang.zheng
2016-12-13 11:34:45,534:41791(0x2b1d28803700):ZOO_INFO@log_env@753: Client environment:user.dir=/home/qihuang.zheng
2016-12-13 11:34:45,534:41791(0x2b1d28803700):ZOO_INFO@zookeeper_init@786: Initiating client connection, host=192.168.6.55:2181,192.168.6.56:2181,192.168.6.57:2181 sessionTimeout=10000 watcher=0x2b1d000e1645 sessionId=0 sessionPasswd=<null> context=0x2b1d44000960 flags=0
2016-12-13 11:34:45,535:41791(0x2b1d2950a700):ZOO_INFO@check_events@1703: initiated connection to server [192.168.6.57:2181]
I1213 11:34:45.536334 42031 sched.cpp:222] Version: 0.28.2
2016-12-13 11:34:45,555:41791(0x2b1d2950a700):ZOO_INFO@check_events@1750: session establishment complete on server [192.168.6.57:2181], sessionId=0x3589e76ef2f82e0, negotiated timeout=10000
I1213 11:34:45.556421 42015 group.cpp:349] Group process (group(1)@192.168.6.53:45534) connected to ZooKeeper
I1213 11:34:45.556823 42015 group.cpp:831] Syncing group operations: queue size (joins, cancels, datas) = (0, 0, 0)
I1213 11:34:45.556903 42015 group.cpp:427] Trying to create path '/mesos' in ZooKeeper
I1213 11:34:45.561131 42008 detector.cpp:152] Detected a new leader: (id='70')
I1213 11:34:45.561971 42011 group.cpp:700] Trying to get '/mesos/json.info_0000000070' in ZooKeeper
I1213 11:34:45.565317 42023 detector.cpp:479] A new leading master (UPID=master@192.168.6.52:5050) is detected
I1213 11:34:45.565773 42026 sched.cpp:326] New master detected at master@192.168.6.52:5050
I1213 11:34:45.567448 42026 sched.cpp:336] No credentials provided. Attempting to register without authentication
I1213 11:34:45.574049 42026 sched.cpp:703] Framework registered with 475189a7-dcde-4859-9af8-6fc2e63be94e-0558

^CPontusKafka2HDFSJobHandler onApplicationEnd....
I1213 11:42:21.526136 4384 sched.cpp:1911] Asked to stop the driver
I1213 11:42:21.526562 42020 sched.cpp:1143] Stopping framework '475189a7-dcde-4859-9af8-6fc2e63be94e-0558'

Spark Cassandra

When running directly from the IDE the spark dependencies must not be provided, otherwise the Spark classes cannot be found; with the defaults it runs fine:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_${scala.binary.version}</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_${scala.binary.version}</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_${scala.binary.version}</artifactId>
</dependency>

When packaging with the maven-assembly-plugin and running against a locally installed Spark, set the scope to provided:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_${scala.binary.version}</artifactId>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_${scala.binary.version}</artifactId>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_${scala.binary.version}</artifactId>
  <scope>provided</scope>
</dependency>

Package, then spark-submit the jar that bundles all dependencies:

mvn clean package

/Users/zhengqh/Soft/spark-1.6.2-bin-hadoop2.6/bin/spark-submit \
--master "local[2]" \
--class cn.fraudmetrix.pontus.demo.CassandraReadWrite \
/Users/zhengqh/Github/vulcan/pontus-spark/target/pontus-spark-1.0.0-SNAPSHOT-jar-with-dependencies.jar

It fails with a Guava version error, even after pinning Guava manually:

<dependency>
  <groupId>com.google.guava</groupId>
  <artifactId>guava</artifactId>
  <version>19.0</version>
</dependency>

Exception in thread "main" java.lang.ExceptionInInitializerError
at com.datastax.spark.connector.cql.DefaultConnectionFactory$.clusterBuilder(CassandraConnectionFactory.scala:35)
at com.datastax.spark.connector.cql.DefaultConnectionFactory$.createCluster(CassandraConnectionFactory.scala:87)
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:153)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:148)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:148)
at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:81)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
at cn.fraudmetrix.pontus.demo.CassandraReadWrite$.cn$fraudmetrix$pontus$demo$CassandraReadWrite$$executeCommand(CassandraReadWrite.scala:45)
at cn.fraudmetrix.pontus.demo.CassandraReadWrite$delayedInit$body.apply(CassandraReadWrite.scala:53)
at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
at scala.App$class.main(App.scala:71)
at cn.fraudmetrix.pontus.demo.CassandraReadWrite$.main(CassandraReadWrite.scala:36)
at cn.fraudmetrix.pontus.demo.CassandraReadWrite.main(CassandraReadWrite.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.IllegalStateException: Detected Guava issue #1635 which indicates that a version of Guava less than 16.01 is in use. This introduces codec resolution issues and potentially other incompatibility issues in the driver. Please upgrade to Guava 16.01 or later.
at com.datastax.driver.core.SanityChecks.checkGuava(SanityChecks.java:62)
at com.datastax.driver.core.SanityChecks.check(SanityChecks.java:36)
at com.datastax.driver.core.Cluster.<clinit>(Cluster.java:67)
... 29 more

See: http://stackoverflow.com/questions/36877897/detected-guava-issue-1635-which-indicates-that-a-version-of-guava-less-than-16
Add the maven-shade-plugin and repackage; besides the other jars this also produces a fat jar. Running with the fat jar finally works:

ll target
-rw-r--r-- 1 zhengqh staff 14M 11 24 19:17 pontus-spark-1.0.0-SNAPSHOT-fat.jar
-rw-r--r-- 1 zhengqh staff 19M 11 24 19:15 pontus-spark-1.0.0-SNAPSHOT-jar-with-dependencies.jar
-rw-r--r-- 1 zhengqh staff 14K 11 24 19:15 pontus-spark-1.0.0-SNAPSHOT-sources.jar
-rw-r--r-- 1 zhengqh staff 94K 11 24 19:15 pontus-spark-1.0.0-SNAPSHOT.jar

/Users/zhengqh/Soft/spark-1.6.2-bin-hadoop2.6/bin/spark-submit \
--master "local[2]" \
--class cn.fraudmetrix.pontus.demo.CassandraReadWrite \
/Users/zhengqh/Github/vulcan/pontus-spark/target/pontus-spark-1.0.0-SNAPSHOT-fat.jar

Test that reading from Cassandra and writing to HDFS works (local dfs and Cassandra must be running):

/Users/zhengqh/Soft/spark-1.6.2-bin-hadoop2.6/bin/spark-submit \
--master "local[2]" \
--class cn.fraudmetrix.pontus.cassandra.PontusCassandra2HDFSJobHandler \
--conf spark.pontus.destTable="" \
--conf spark.pontus.tableId="" \
--conf spark.pontus.destType="HDFS" \
--conf spark.pontus.destUsername="" \
--conf spark.pontus.creator="" \
--conf spark.pontus.sourceUri="localhost/demo" \
--conf spark.pontus.sourcePassword="" \
--conf spark.pontus.sourceTable="sales" \
--conf spark.pontus.sourceType="Cassandra" \
--conf spark.pontus.readMode="1" \
--conf spark.pontus.tableTs="" \
--conf spark.pontus.destUri="hdfs://localhost:9000/test" \
--conf spark.pontus.name="cassandra-hdfs" \
--conf spark.pontus.destPassword="" \
--conf spark.pontus.id="1" \
--conf spark.pontus.writeMode="1" \
--conf spark.pontus.sourceUsername="" \
/Users/zhengqh/Github/vulcan/pontus-spark/target/pontus-spark-1.0.0-SNAPSHOT-fat.jar

16/11/24 19:19:16 INFO DAGScheduler: Job 0 finished: show at PontusCassandra2HDFSJobHandler.scala:42, took 4.044479 s
+---------+--------+-----+
| center|products|total|
+---------+--------+-----+
| Sevilla| 28| 3200|
| Valencia| 23| 3300|
| Bilbao| 25| 2500|
| Madrid| 51| 7700|
|Barcelona| 47| 6400|
+---------+--------+-----+

The Spark job above writes Parquet files to the given HDFS path; reading them back can be tested from spark-shell, as sketched below.
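
A minimal spark-shell check, assuming the destination path used in the submit command above:

// In spark-shell (Spark 1.6.x), read back what the job wrote:
val df = sqlContext.read.parquet("hdfs://localhost:9000/test")
df.printSchema()
df.show()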

Spark job execution page:

pontus-spark

Pontus job success page:

pontus-job

HDFS append job:

hdfs

Nullable column problem; the fix is to read the column as an Option (see the sketch after the stack trace):

play.api.http.HttpErrorHandlerExceptions$$anon$1: Execution exception[[RuntimeException: ColumnName(PONTUS_EXECUTION.LOGOUT,Some(LOGOUT))]]
at play.api.http.HttpErrorHandlerExceptions$.throwableToUsefulException(HttpErrorHandler.scala:293)
at play.api.http.DefaultHttpErrorHandler.onServerError(HttpErrorHandler.scala:220)
at play.api.GlobalSettings$class.onError(GlobalSettings.scala:160)
at util.Global$.onError(Global.scala:8)
at play.api.http.GlobalSettingsHttpErrorHandler.onServerError(HttpErrorHandler.scala:100)
at play.core.server.netty.PlayRequestHandler$$anonfun$2$$anonfun$apply$1.applyOrElse(PlayRequestHandler.scala:100)
at play.core.server.netty.PlayRequestHandler$$anonfun$2$$anonfun$apply$1.applyOrElse(PlayRequestHandler.scala:99)
at scala.concurrent.Future$$anonfun$recoverWith$1.apply(Future.scala:344)
at scala.concurrent.Future$$anonfun$recoverWith$1.apply(Future.scala:343)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
Caused by: java.lang.RuntimeException: ColumnName(PONTUS_EXECUTION.LOGOUT,Some(LOGOUT))
at scala.sys.package$.error(package.scala:27)
at anorm.SqlRequestError$class.toFailure(Anorm.scala:20)
at anorm.UnexpectedNullableFound.toFailure(Anorm.scala:37)
at anorm.Sql$$anonfun$asTry$2$$anonfun$apply$7.apply(Anorm.scala:303)
at anorm.Sql$$anonfun$asTry$2$$anonfun$apply$7.apply(Anorm.scala:303)
at anorm.SqlResult$class.fold(SqlResult.scala:23)
at anorm.Error.fold(SqlResult.scala:31)
at anorm.Sql$$anonfun$asTry$2.apply(Anorm.scala:303)
at anorm.Sql$$anonfun$asTry$2.apply(Anorm.scala:303)
at scala.util.Success.flatMap(Try.scala:231)
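
A minimal sketch of the Option fix with anorm (LOGOUT comes from the error above, ID is an assumption; the parser is illustrative, not the project's actual code):

import java.util.Date
import anorm._
import anorm.SqlParser._

// Read LOGOUT as Option[Date] so a NULL value no longer raises UnexpectedNullableFound.
val executionParser = long("ID") ~ get[Option[Date]]("LOGOUT") map {
  case id ~ logout => (id, logout)
}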

Preview

http://192.168.6.53:9000/preview/HDFS/%2Fuser%2Ftongdun%2Fpontus%2Fantifraud%2Ffeedback%2Fyear=2016%2Fmonth=12%2Fday=15

http://192.168.6.53:9000/preview/HDFS/hdfs:%2F%2F192.168.6.52:9000%2Fuser%2Ftongdun%2Fpontus%2Fantifraud%2Ffeedback%2Fyear=2016%2Fmonth=12%2Fday=15

http://192.168.6.53:9000/preview/HDFS/hdfs:%2F%2Ftdfs%2Fuser%2Ftongdun%2Fpontus%2Fantifraud%2Ffeedback%2Fyear=2016%2Fmonth=12%2Fday=15

Actor

SparkJobActor preStart().............
PontusSchedule(1,1,qihuang.zheng,2016-11-29 21:10:00.0,5d)
[warn] o.a.h.u.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Global onStart().............
mode:demo
PontusSchedule(1,1,qihuang.zheng,2016-11-29 21:10:00.0,5d)
[info] play.api.Play - Application started (Dev)

Quartz

import org.quartz.JobBuilder._
import org.quartz.TriggerBuilder._
import org.quartz.SimpleScheduleBuilder._
import org.quartz.CronScheduleBuilder._
import org.quartz.CalendarIntervalScheduleBuilder._
import org.quartz.JobKey._
import org.quartz.TriggerKey._
import org.quartz.DateBuilder._
import org.quartz.impl.matchers.KeyMatcher._
import org.quartz.impl.matchers.GroupMatcher._
import org.quartz.impl.matchers.AndMatcher._
import org.quartz.impl.matchers.OrMatcher._
import org.quartz.impl.matchers.EverythingMatcher._

Job dependency injection

Injecting other instances into a Job fails with an instantiation error; so how do we inject DAO objects?

class PontusSparkJob @Inject()(configuration: Configuration,
executionRepository: ExecutionRepository,
scheduleRepository: ScheduleRepository,
jobRepository: JobRepository
) extends Job {
}

[error] o.q.c.ErrorLogger - An error occured instantiating job to be executed. job= 'Cassandra.pontusjob_1'
org.quartz.SchedulerException: Problem instantiating class 'schedule.PontusSparkJob'
at org.quartz.simpl.SimpleJobFactory.newJob(SimpleJobFactory.java:58)
at org.quartz.simpl.PropertySettingJobFactory.newJob(PropertySettingJobFactory.java:69)
at org.quartz.core.JobRunShell.initialize(JobRunShell.java:127)
at org.quartz.core.QuartzSchedulerThread.run(QuartzSchedulerThread.java:375)
Caused by: java.lang.InstantiationException: schedule.PontusSparkJob
at java.lang.Class.newInstance(Class.java:427)
at org.quartz.simpl.SimpleJobFactory.newJob(SimpleJobFactory.java:56)
... 3 common frames omitted
Caused by: java.lang.NoSuchMethodException: schedule.PontusSparkJob.<init>()
at java.lang.Class.getConstructor0(Class.java:3082)
at java.lang.Class.newInstance(Class.java:412)
... 4 common frames omitted

Workaround: register the Application instance in Global at startup, and fetch it from the Scheduler context inside the Job:

object Global extends GlobalSettings {
  override def onStart(application: play.api.Application) {
    schedule.PontusQuartzScheduler.scheduler.start()
    schedule.PontusQuartzScheduler.scheduler.getContext.put("configuration", application.configuration)
    schedule.PontusQuartzScheduler.scheduler.getContext.put("application", application)
  }
}

class PontusSparkJob extends Job {
  override def execute(context: JobExecutionContext): Unit = {
    val application = context.getScheduler.getContext.get("application").asInstanceOf[Application]
    val configuration = context.getScheduler.getContext.get("configuration").asInstanceOf[Configuration]
    val jobRepository = application.injector.instanceOf(classOf[JobRepository])
    val executionRepository = application.injector.instanceOf(classOf[ExecutionRepository])

    val jobId = context.getMergedJobDataMap.getString("jobId").toLong
  }
}

TODO: a custom JobFactory (a sketch follows).
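
A sketch of what such a JobFactory could look like, delegating job creation to Play's injector so that @Inject constructors on Job classes work (the class name and the registration call are assumptions):

import org.quartz.{Job, Scheduler}
import org.quartz.spi.{JobFactory, TriggerFiredBundle}
import play.api.inject.Injector

class InjectorJobFactory(injector: Injector) extends JobFactory {
  // Quartz calls this instead of Class.newInstance(), so constructor injection works.
  override def newJob(bundle: TriggerFiredBundle, scheduler: Scheduler): Job =
    injector.instanceOf(bundle.getJobDetail.getJobClass)
}

// Registration, e.g. in Global.onStart: scheduler.setJobFactory(new InjectorJobFactory(application.injector))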

When the mesos task fails, the page shows that no resources can be acquired:

I1205 15:15:20.217545 19517 exec.cpp:143] Version: 0.28.2
I1205 15:15:20.243422 19521 exec.cpp:217] Executor registered on slave 475189a7-dcde-4859-9af8-6fc2e63be94e-S0
Unrecognized VM option 'UseCompressedStrings'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.

Fix: remove the executor JVM options from the Spark configuration (the JVM no longer recognizes UseCompressedStrings).

Automatic restart of Quartz jobs

At creation time:

mysql> select * from QRTZ_TRIGGERS;
+-----------------+--------------+---------------+-------------+-----------+-------------+----------------+----------------+----------+---------------+--------------+---------------+----------+---------------+---------------+----------+
| SCHED_NAME | TRIGGER_NAME | TRIGGER_GROUP | JOB_NAME | JOB_GROUP | DESCRIPTION | NEXT_FIRE_TIME | PREV_FIRE_TIME | PRIORITY | TRIGGER_STATE | TRIGGER_TYPE | START_TIME | END_TIME | CALENDAR_NAME | MISFIRE_INSTR | JOB_DATA |
+-----------------+--------------+---------------+-------------+-----------+-------------+----------------+----------------+----------+---------------+--------------+---------------+----------+---------------+---------------+----------+
| QuartzScheduler | trigger2_1 | Cassandra | pontusjob_1 | Cassandra | NULL | 1480523100000 | 1480519500000 | 5 | WAITING | SIMPLE | 1480515900000 | 0 | NULL | 1 | |
+-----------------+--------------+---------------+-------------+-----------+-------------+----------------+----------------+----------+---------------+--------------+---------------+----------+---------------+---------------+----------+
1 row in set (10.33 sec)

Creation time (START_TIME): 1480515900000 - Wed, Nov 30 2016, 22:25:00 CST
First fire time (PREV_FIRE_TIME): 1480519500000 - Wed, Nov 30 2016, 23:25:00 CST
Next fire time (NEXT_FIRE_TIME): 1480523100000 - Thu, Dec 1 2016, 00:25:00 CST

Stop the server.

Restart the next morning. After the restart nothing is scheduled as long as no page is visited, but as soon as someone logs in, scheduling resumes:

mysql> select * from QRTZ_TRIGGERS;
+-----------------+--------------+---------------+-------------+-----------+-------------+----------------+----------------+----------+---------------+--------------+---------------+----------+---------------+---------------+----------+
| SCHED_NAME | TRIGGER_NAME | TRIGGER_GROUP | JOB_NAME | JOB_GROUP | DESCRIPTION | NEXT_FIRE_TIME | PREV_FIRE_TIME | PRIORITY | TRIGGER_STATE | TRIGGER_TYPE | START_TIME | END_TIME | CALENDAR_NAME | MISFIRE_INSTR | JOB_DATA |
+-----------------+--------------+---------------+-------------+-----------+-------------+----------------+----------------+----------+---------------+--------------+---------------+----------+---------------+---------------+----------+
| QuartzScheduler | trigger2_1 | Cassandra | pontusjob_1 | Cassandra | NULL | 1480561432130 | 1480557832130 | 5 | WAITING | SIMPLE | 1480557832130 | 0 | NULL | 1 | |
+-----------------+--------------+---------------+-------------+-----------+-------------+----------------+----------------+----------+---------------+--------------+---------------+----------+---------------+---------------+----------+
1 row in set (0.01 sec)

START_TIME: 1480557832130 - Thu, Dec 1 2016, 10:03:52 CST
PREV_FIRE_TIME: 1480557832130 - Thu, Dec 1 2016, 10:03:52 CST
NEXT_FIRE_TIME: 1480561432130 - Thu, Dec 1 2016, 11:03:52 CST

The same Job is not allowed to have two Triggers running at the same time. Fix: use a different JobId for each run (see also the guard sketched after the error below).

! @729o8hm27 - Internal server error, for (GET) [/job/execute/Cassandra/HDFS/2] ->

play.api.UnexpectedException: Unexpected exception[ObjectAlreadyExistsException: Unable to store Job : 'Cassandra-HDFS.pontusjob_2', because one already exists with this identification.]
at play.api.http.HttpErrorHandlerExceptions$.throwableToUsefulException(HttpErrorHandler.scala:289)
at play.api.http.DefaultHttpErrorHandler.onServerError(HttpErrorHandler.scala:220)
at play.api.GlobalSettings$class.onError(GlobalSettings.scala:160)
at util.Global$.onError(Global.scala:23)
at play.api.http.GlobalSettingsHttpErrorHandler.onServerError(HttpErrorHandler.scala:100)
at play.core.server.netty.PlayRequestHandler$$anonfun$2$$anonfun$apply$1.applyOrElse(PlayRequestHandler.scala:100)
at play.core.server.netty.PlayRequestHandler$$anonfun$2$$anonfun$apply$1.applyOrElse(PlayRequestHandler.scala:99)
at scala.concurrent.Future$$anonfun$recoverWith$1.apply(Future.scala:344)
at scala.concurrent.Future$$anonfun$recoverWith$1.apply(Future.scala:343)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
Caused by: org.quartz.ObjectAlreadyExistsException: Unable to store Job : 'Cassandra-HDFS.pontusjob_2', because one already exists with this identification.
at org.quartz.impl.jdbcjobstore.JobStoreSupport.storeJob(JobStoreSupport.java:1108)
at org.quartz.impl.jdbcjobstore.JobStoreSupport$2.executeVoid(JobStoreSupport.java:1062)
at org.quartz.impl.jdbcjobstore.JobStoreSupport$VoidTransactionCallback.execute(JobStoreSupport.java:3715)
at org.quartz.impl.jdbcjobstore.JobStoreSupport$VoidTransactionCallback.execute(JobStoreSupport.java:3713)
at org.quartz.impl.jdbcjobstore.JobStoreSupport.executeInNonManagedTXLock(JobStoreSupport.java:3799)
at org.quartz.impl.jdbcjobstore.JobStoreTX.executeInLock(JobStoreTX.java:93)
at org.quartz.impl.jdbcjobstore.JobStoreSupport.storeJobAndTrigger(JobStoreSupport.java:1058)
at org.quartz.core.QuartzScheduler.scheduleJob(QuartzScheduler.java:886)
at org.quartz.impl.StdScheduler.scheduleJob(StdScheduler.java:249)
at controllers.PontusJobController$$anonfun$execute$1.apply(PontusJobController.scala:72)
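
为避免上面的ObjectAlreadyExistsException，可以在调度前先用Scheduler.checkExists检查JobKey是否已存在，不存在时才注册，或者每次提交都换一个新的JobId。下面是一个最小的Scala示意（scheduler假设为已注入的org.quartz.Scheduler，PontusJob仅为占位的Job实现）：

import org.quartz._

// 占位的Job实现
class PontusJob extends Job {
  def execute(ctx: JobExecutionContext): Unit = println("run job")
}

def scheduleJobOnce(scheduler: Scheduler, group: String, jobId: String): Unit = {
  val jobKey = new JobKey(s"pontusjob_$jobId", group)
  if (scheduler.checkExists(jobKey)) {
    // 相同标识的Job已存在：直接跳过，或换一个jobId再提交
    println(s"$jobKey already exists, skip scheduling")
  } else {
    val job = JobBuilder.newJob(classOf[PontusJob]).withIdentity(jobKey).build()
    val trigger = TriggerBuilder.newTrigger()
      .withIdentity(s"trigger_$jobId", group)
      .startNow()
      .build()
    scheduler.scheduleJob(job, trigger)
  }
}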

spark streaming一次性作业:

mysql> select * from QRTZ_TRIGGERS;
+-----------------+--------------+----------------+--------------+----------------+-------------+----------------+----------------+----------+---------------+--------------+---------------+----------+---------------+---------------+----------+
| SCHED_NAME | TRIGGER_NAME | TRIGGER_GROUP | JOB_NAME | JOB_GROUP | DESCRIPTION | NEXT_FIRE_TIME | PREV_FIRE_TIME | PRIORITY | TRIGGER_STATE | TRIGGER_TYPE | START_TIME | END_TIME | CALENDAR_NAME | MISFIRE_INSTR | JOB_DATA |
+-----------------+--------------+----------------+--------------+----------------+-------------+----------------+----------------+----------+---------------+--------------+---------------+----------+---------------+---------------+----------+
| QuartzScheduler | trigger1_2 | Kafka-HDFS | pontusjob1_2 | Kafka-HDFS | NULL | -1 | 1481165690342 | 5 | COMPLETE | SIMPLE | 1481165690342 | 0 | NULL | 0 | |
| QuartzScheduler | trigger2_1 | Cassandra-HDFS | pontusjob2_1 | Cassandra-HDFS | NULL | 1481165913700 | 1481165433700 | 5 | WAITING | SIMPLE | 1481103993700 | 0 | NULL | 1 | |
+-----------------+--------------+----------------+--------------+----------------+-------------+----------------+----------------+----------+---------------+--------------+---------------+----------+---------------+---------------+----------+
2 rows in set (0.01 sec)

mysql> select * from QRTZ_SIMPLE_TRIGGERS;
+-----------------+--------------+----------------+--------------+-----------------+-----------------+
| SCHED_NAME | TRIGGER_NAME | TRIGGER_GROUP | REPEAT_COUNT | REPEAT_INTERVAL | TIMES_TRIGGERED |
+-----------------+--------------+----------------+--------------+-----------------+-----------------+
| QuartzScheduler | trigger1_2 | Kafka-HDFS | 0 | 0 | 1 |
| QuartzScheduler | trigger2_1 | Cassandra-HDFS | -1 | 480000 | 129 |
+-----------------+--------------+----------------+--------------+-----------------+-----------------+
2 rows in set (0.00 sec)

过了一会儿，一次性任务的触发器信息会从Quartz中删除，但实际上spark streaming程序还在运行。
这时如果再次点击执行，虽然triggerKey已经不存在，也不应该允许再次提交，否则会同时存在两个spark streaming程序（对应的防重复提交示意见下文）。
还有一个问题：由于spark streaming作业不会结束，所以看不到执行日志，状态也永远是“正在执行”。

mysql> select * from QRTZ_SIMPLE_TRIGGERS;
+-----------------+--------------+----------------+--------------+-----------------+-----------------+
| SCHED_NAME | TRIGGER_NAME | TRIGGER_GROUP | REPEAT_COUNT | REPEAT_INTERVAL | TIMES_TRIGGERED |
+-----------------+--------------+----------------+--------------+-----------------+-----------------+
| QuartzScheduler | trigger2_1 | Cassandra-HDFS | -1 | 480000 | 129 |
+-----------------+--------------+----------------+--------------+-----------------+-----------------+
1 row in set (0.01 sec)

mysql> select * from QRTZ_TRIGGERS;
+-----------------+--------------+----------------+--------------+----------------+-------------+----------------+----------------+----------+---------------+--------------+---------------+----------+---------------+---------------+----------+
| SCHED_NAME | TRIGGER_NAME | TRIGGER_GROUP | JOB_NAME | JOB_GROUP | DESCRIPTION | NEXT_FIRE_TIME | PREV_FIRE_TIME | PRIORITY | TRIGGER_STATE | TRIGGER_TYPE | START_TIME | END_TIME | CALENDAR_NAME | MISFIRE_INSTR | JOB_DATA |
+-----------------+--------------+----------------+--------------+----------------+-------------+----------------+----------------+----------+---------------+--------------+---------------+----------+---------------+---------------+----------+
| QuartzScheduler | trigger2_1 | Cassandra-HDFS | pontusjob2_1 | Cassandra-HDFS | NULL | 1481165913700 | 1481165433700 | 5 | WAITING | SIMPLE | 1481103993700 | 0 | NULL | 1 | |
+-----------------+--------------+----------------+--------------+----------------+-------------+----------------+----------------+----------+---------------+--------------+---------------+----------+---------------+---------------+----------+
1 row in set (0.01 sec)
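
对于“一次性streaming作业在Quartz中已被删除、但进程仍在运行”的情况，可以在Quartz之外额外维护一个正在运行的streaming作业登记表，提交前先检查，避免同时启动两个spark streaming程序。下面是一个进程内的最小示意（名称均为假设，非原文实现）：

import scala.collection.concurrent.TrieMap

// 登记表：key为作业标识(如"Cassandra-HDFS/pontusjob2_1")，value为提交时间戳
object RunningStreamingJobs {
  private val running = TrieMap.empty[String, Long]

  // 尝试登记：已在运行则返回false，调用方应拒绝再次提交
  def tryRegister(jobId: String): Boolean =
    running.putIfAbsent(jobId, System.currentTimeMillis()).isEmpty

  // streaming作业被显式停止时注销
  def unregister(jobId: String): Unit = running.remove(jobId)

  def isRunning(jobId: String): Boolean = running.contains(jobId)
}

// 用法示意：
// if (RunningStreamingJobs.tryRegister("Cassandra-HDFS/pontusjob2_1")) { /* 提交spark streaming作业 */ }
// else println("streaming job already running")

注意这只是进程内的登记，Play重启后会丢失；更可靠的做法是持久化这份状态，或直接查询集群(YARN/Mesos)上作业的真实运行状态。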

Kafka

创建测试主题

bin/kafka-topics.sh --create --zookeeper 192.168.6.55:2181 --replication-factor 1 --partitions 1 --topic graylog_nginx
bin/kafka-console-producer.sh --broker-list 192.168.6.55:9092,192.168.6.56:9092,192.168.6.57:9092 --topic graylog_nginx
bin/kafka-console-consumer.sh --zookeeper 192.168.6.55:2181 --topic graylog_nginx --from-beginning

准备json数据

{"id":1,"name":"Betty","email":"bsmithrl@simplemachines.org","city":"Eláteia","country":"Greece","ip":"9.19.204.44"}
{"id":2,"name":"Anna","email":"alewisrm@canalblog.com","city":"Shangjing","country":"China","ip":"14.207.119.126"}
{"id":3,"name":"David","email":"dgarrettrn@japanpost.jp","city":"Tsarychanka","country":"Ukraine","ip":"111.252.63.159"}
{"id":4,"name":"Heather","email":"hgilbertro@skype.com","city":"Koilás","country":"Greece","ip":"29.57.181.250"}
{"id":5,"name":"Diane","email":"ddanielsrp@statcounter.com","city":"Mapiripán","country":"Colombia","ip":"19.205.181.99"}
{"id":6,"name":"Philip","email":"pfullerrq@reuters.com","city":"El Cairo","country":"Colombia","ip":"210.248.121.194"}
{"id":7,"name":"Maria","email":"mfordrr@shop-pro.jp","city":"Karabash","country":"Russia","ip":"224.21.41.52"}

{"id":8,"username":"Bety","email":"bsmithrl@simplemachines.org","city":"Eláteia","country":"Greece","ipAddress":"9.19.204.44"}
{"id":9,"username":"Ana","email":"alewisrm@canalblog.com","city":"Shangjing","country":"China","ipAddress":"14.207.119.126"}
{"id":10,"username":"David","email":"dgarrettrn@japanpost.com","city":"Tsarychanka","country":"Ukraine","ipAddress":"111.252.63.159"}
{"id":11,"name":"Calis","email":"carlis@google.com","city":"Koran","country":"Greece","ip":"29.57.181.250"}
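
Kafka里的消息就是上面这样的JSON字符串，消费后通常要反序列化成模型类。下面是用play-json的一个最小示意（User样例类是假设的，按第一批数据的name/ip字段定义）：

import play.api.libs.json._

// 假设的模型类，字段对应上面第一批JSON的schema
case class User(id: Long, name: String, email: String, city: String, country: String, ip: String)

object User {
  implicit val format: Format[User] = Json.format[User]
}

object JsonDemo extends App {
  val raw = """{"id":1,"name":"Betty","email":"bsmithrl@simplemachines.org","city":"Eláteia","country":"Greece","ip":"9.19.204.44"}"""

  Json.parse(raw).validate[User] match {
    case JsSuccess(user, _) => println(s"parsed: $user")
    // 第二批使用username/ipAddress字段的记录，会因为缺少name/ip而落到这里
    case JsError(errors)    => println(s"invalid record: $errors")
  }
}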

附录

sbt私服

安装repox:https://github.com/Centaur/repox/wiki
可以在本机开发环境编译jar包，再上传到服务器（因为服务器上可能没有编译所需的node环境）。

启动repox,然后打开:http://192.168.6.53:8078/admin/admin.html#/upstreams,密码:zhimakaimen

nohup java -Xmx512m -jar repox-assembly-0.1-SNAPSHOT.jar &

~/.sbt/repositories添加两行配置:

repox-maven: http://192.168.6.53:8078/
repox-ivy: http://192.168.6.53:8078/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext]

完整的repositories文件:

[repositories]
local
local-maven: file:///Users/zhengqh/.m2/repository/
repox-maven: http://192.168.6.53:8078/
repox-ivy: http://192.168.6.53:8078/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext]
#osc: http://maven.oschina.net/content/groups/public/
#oschina-ivy: http://maven.oschina.net/content/groups/public/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext]
typesafe: http://repo.typesafe.com/typesafe/ivy-releases/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly
sonatype-oss-releases
maven-central
sonatype-oss-snapshots

使用activator运行play时,创建activatorconfig.txt文件,添加以下配置:

$ cat ~/.activator/activatorconfig.txt
-Dsbt.override.build.repos=true

命令行运行activator，如果日志中出现downloading http://192.168.6.53:8078，表示私服搭建成功。
如果私服上还没有需要的jar包，私服会先从上游仓库下载，然后客户端再从私服下载到本地。

➜  first-player git:(master) ✗ ./activator run
[info] Loading global plugins from /Users/zhengqh/.sbt/0.13/plugins
[info] Loading project definition from /Users/zhengqh/Github/first-player/project
[info] Updating {file:/Users/zhengqh/Github/first-player/project/}first-player-build...
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] downloading http://192.168.6.53:8078/com.typesafe.play/sbt-plugin/scala_2.10/sbt_0.13/2.5.10/jars/sbt-plugin.jar ...
[info] [SUCCESSFUL ] com.typesafe.play#sbt-plugin;2.5.10!sbt-plugin.jar (7242ms)
[info] downloading http://192.168.6.53:8078/com/typesafe/play/sbt-routes-compiler_2.10/2.5.10/sbt-routes-compiler_2.10-2.5.10.jar ...
[info] [SUCCESSFUL ] com.typesafe.play#sbt-routes-compiler_2.10;2.5.10!sbt-routes-compiler_2.10.jar (9753ms)
[info] downloading http://192.168.6.53:8078/com/typesafe/play/sbt-run-support_2.10/2.5.10/sbt-run-support_2.10-2.5.10.jar ...
[info] [SUCCESSFUL ] com.typesafe.play#sbt-run-support_2.10;2.5.10!sbt-run-support_2.10.jar (6686ms)
[info] downloading http://192.168.6.53:8078/com/typesafe/play/build-link/2.5.10/build-link-2.5.10.jar ...
[info] [SUCCESSFUL ] com.typesafe.play#build-link;2.5.10!build-link.jar (6589ms)
[info] downloading http://192.168.6.53:8078/com/typesafe/play/play-exceptions/2.5.10/play-exceptions-2.5.10.jar ...
[info] [SUCCESSFUL ] com.typesafe.play#play-exceptions;2.5.10!play-exceptions.jar (2358ms)
[info] Done updating.

使用sbt运行时（sbt run），需要修改sbt的启动选项。下面是Mac环境下使用brew安装的sbt启动脚本的配置：

$ cat /usr/local/bin/sbt
#!/bin/sh
export SBT_OPTS="-Dsbt.override.build.repos=true"
if [ -f "$HOME/.sbtconfig" ]; then
echo "Use of ~/.sbtconfig is deprecated, please migrate global settings to /usr/local/etc/sbtopts" >&2
. "$HOME/.sbtconfig"
fi
exec "/usr/local/Cellar/sbt/0.13.8/libexec/sbt" "$@"

IDEA开发环境下，还需要做相应的配置：

(截图：IDEA中的相关配置)

Play MultiProject

参考:https://github.com/aaronp/multi-project

增加Play模块后，运行时抛出如下与logback相关的ClassCastException：

java.lang.ClassCastException: org.slf4j.helpers.NOPLoggerFactory cannot be cast to ch.qos.logback.classic.LoggerContext
at play.api.libs.logback.LogbackLoggerConfigurator.configure(LogbackLoggerConfigurator.scala:80)
at play.api.libs.logback.LogbackLoggerConfigurator.init(LogbackLoggerConfigurator.scala:26)
at play.core.server.DevServerStart$$anonfun$mainDev$1.apply(DevServerStart.scala:94)
at play.core.server.DevServerStart$$anonfun$mainDev$1.apply(DevServerStart.scala:65)
at play.utils.Threads$.withContextClassLoader(Threads.scala:21)
at play.core.server.DevServerStart$.mainDev(DevServerStart.scala:64)
at play.core.server.DevServerStart$.mainDevHttpMode(DevServerStart.scala:54)
at play.core.server.DevServerStart.mainDevHttpMode(DevServerStart.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at play.runsupport.Reloader$.startDevMode(Reloader.scala:207)
at play.sbt.run.PlayRun$$anonfun$playRunTask$1$$anonfun$apply$2$$anonfun$apply$3.devModeServer$lzycompute$1(PlayRun.scala:73)
at play.sbt.run.PlayRun$$anonfun$playRunTask$1$$anonfun$apply$2$$anonfun$apply$3.play$sbt$run$PlayRun$$anonfun$$anonfun$$anonfun$$devModeServer$1(PlayRun.scala:73)
at play.sbt.run.PlayRun$$anonfun$playRunTask$1$$anonfun$apply$2$$anonfun$apply$3.apply(PlayRun.scala:99)
at play.sbt.run.PlayRun$$anonfun$playRunTask$1$$anonfun$apply$2$$anonfun$apply$3.apply(PlayRun.scala:52)
at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
[trace] Stack trace suppressed: run last web/compile:run for the full output.
[error] (web/compile:run) java.lang.reflect.InvocationTargetException
[error] Total time: 37 s, completed 2017-3-8 20:28:39

另一种情况：如果classpath中存在slf4j的log4j绑定，则抛出的是类似的异常：

java.lang.ClassCastException: org.slf4j.impl.Log4jLoggerFactory cannot be cast to ch.qos.logback.classic.LoggerContext
at play.api.libs.logback.LogbackLoggerConfigurator.configure(LogbackLoggerConfigurator.scala:80)
at play.api.libs.logback.LogbackLoggerConfigurator.init(LogbackLoggerConfigurator.scala:26)
at play.core.server.DevServerStart$$anonfun$mainDev$1.apply(DevServerStart.scala:94)
at play.core.server.DevServerStart$$anonfun$mainDev$1.apply(DevServerStart.scala:65)
at play.utils.Threads$.withContextClassLoader(Threads.scala:21)
at play.core.server.DevServerStart$.mainDev(DevServerStart.scala:64)
at play.core.server.DevServerStart$.mainDevHttpMode(DevServerStart.scala:54)
at play.core.server.DevServerStart.mainDevHttpMode(DevServerStart.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at play.runsupport.Reloader$.startDevMode(Reloader.scala:234)
at play.sbt.run.PlayRun$$anonfun$playRunTask$1$$anonfun$apply$2$$anonfun$apply$3.devModeServer$lzycompute$1(PlayRun.scala:74)
at play.sbt.run.PlayRun$$anonfun$playRunTask$1$$anonfun$apply$2$$anonfun$apply$3.play$sbt$run$PlayRun$$anonfun$$anonfun$$anonfun$$devModeServer$1(PlayRun.scala:74)
at play.sbt.run.PlayRun$$anonfun$playRunTask$1$$anonfun$apply$2$$anonfun$apply$3.apply(PlayRun.scala:100)
at play.sbt.run.PlayRun$$anonfun$playRunTask$1$$anonfun$apply$2$$anonfun$apply$3.apply(PlayRun.scala:53)
at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
[trace] Stack trace suppressed: run last web/compile:run for the full output.
[error] (web/compile:run) java.lang.reflect.InvocationTargetException
[error] Total time: 3 s, completed 2017-3-8 20:16:56

解决方式:

// 排除其他slf4j绑定(simple/jdk14/log4j12)，只保留Play自带的logback绑定
val webExcludeDependencies = Seq(
  SbtExclusionRule("org.slf4j", "slf4j-simple"),
  SbtExclusionRule("org.slf4j", "slf4j-jdk14"),
  SbtExclusionRule("org.slf4j", "slf4j-log4j12")
)
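
这些排除规则还需要应用到出问题的模块上。下面是build.sbt中的一种用法示意（web模块的定义仅为假设）：

// build.sbt 片段，webExcludeDependencies 即上面定义的排除规则
lazy val web = (project in file("web"))
  .enablePlugins(PlayScala)
  .settings(
    // 把其他slf4j绑定从传递依赖中排除，只保留Play自带的logback绑定
    excludeDependencies ++= webExcludeDependencies
  )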

Run & Debug With IDEA

参考:http://stackoverflow.com/questions/24218341/how-to-run-play-framework-2-x-in-debug-mode-in-intellij-idea
http://stackoverflow.com/questions/22195364/debugging-sbt-project-with-play-in-intellij-idea
http://stackoverflow.com/questions/4150776/debugging-scala-code-with-simple-build-tool-sbt-and-intellij

命令行运行: sbt web/run -Dconfig.resource=local/application.zqh.conf -Dhttp.port=9091

1. Play App(❌)

IDEA中运行Play App,或者直接在Controller上右键点击Run Play 2 App

(截图：IDEA中运行Play 2 App)

sbt多模块工程下无法定位到web子模块，报“No main class detected”：

[info] Loading project definition from /Users/zhengqh/Github/pontus/project
[info] Updating {file:/Users/zhengqh/Github/pontus/project/}pontus-build...
[info] Compiling 3 Scala sources to /Users/zhengqh/Github/pontus/project/target/scala-2.10/sbt-0.13/classes...
[info] Set current project to pontus-root (in build file:/Users/zhengqh/Github/pontus/)
[warn] compile:run::javaOptions will be ignored, compile:run::fork is set to false
java.lang.RuntimeException: No main class detected.
at scala.sys.package$.error(package.scala:27)
[trace] Stack trace suppressed: run 'last root/compile:run' for the full output.
[error] (root/compile:run) No main class detected.
[error] Total time: 0 s, completed 2017-5-12 10:35:01
Disconnected from the target VM, address: '127.0.0.1:62036', transport: 'socket'

如果只是单个Play工程（非多模块），这种方式应该是可以的。

2. Sbt Task(✅)

IDEA中运行Sbt Task,注意-D参数必须添加到VM arguments,不能放在Tasks中

(截图：IDEA中Sbt Task的运行配置)

3. jvm-debug(✅)

Play工程的命令行执行方式：sbt -jvm-debug 9999 "run 9091"

Sbt多模块的命令行执行方式:sbt -jvm-debug 9999 web/run -Dconfig.resource=local/application.zqh.conf -Dhttp.port=9091

先在命令行启动sbt,然后在IDEA中配置Remote

(截图：IDEA中的Remote配置)
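
sbt的-jvm-debug 9999大致相当于给JVM加上下面这行标准的JDWP参数（仅作对照示意），IDEA的Remote配置attach到localhost:9999即可：

-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=9999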

用Remote配置启动debug，设置断点后可以进入断点，控制台会打印：

Connected to the target VM, address: 'localhost:9999', transport: 'socket'

访问:http://localhost:9091/api/v1/job ,进入DEBUG模式

(截图：进入DEBUG断点)

