Scala- Flinkjava.lang.ClassNotFoundException: play.api.libs.json.Reads


0

我正在编写一个从文本文件读取并将每一行都解析为Json的代码,但是Flink(1.0.3-包含2个TaskManager的集群)中的 job失败,原因是:java.lang.ClassNotFoundException: play.api.libs.json.Reads

我的代码:

import org.apache.flink.streaming.api.scala._
import play.api.libs.json.Json
import org.joda.time.{DateTime, DateTimeZone}

object rabbitjob {

case class MyJson(pr: Long,
dv: Long,
ty: Int,
cr: String,
rc: String,
vl: Boolean,
ss: String,
id: Long
)

def main (args:Array[String]){
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("hdfs:///test/ignicion.io")

implicit val myJsonReads = Json.reads[MyJson]

def jsontr(cuerpo: String): Unit ={

 val inputJson = Json.parse(cuerpo)
 val myJsonInstance: MyJson = inputJson.as[MyJson]

 println(DateTime.now(DateTimeZone.UTC).getMillis() + " " +  myJsonInstance.cr + " " + matchtype(myJsonInstance.ty) + " " + " " + matchvalue(myJsonInstance.vl))

def matchtype (x: Int): String = x match{
case 1 => "Door"
case 2 => "Window"
case _ => "otros"
}

def matchvalue (x: Boolean): String = x match{
case true => "ON"
case false => "OFF"
}
}

println(stream)
stream.map(jsontr(_))

env.execute("Test Rabbit")
}
}

你知道吗??提前谢谢你

1 答案


0

你需要把你的程序和你的外部依赖文件一起提交到一个fat文件里。

Flink文档展示了如何使用Maven实现这一点。


我来回答

写文章

提问题

面试题