Hadoop - Trying to get Spark Streaming to read a data stream from a website; what is a socket?



I'm trying to pull this data, http://stream.meetup.com/2/rsvps, into Spark Streaming.

The records are JSON objects. I know the lines come in as strings; I just want this to work before I attempt any JSON parsing.

I don't know what to pass as the port, and I suspect that's the problem.

SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("Spark Streaming");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("http://stream.meetup.com/2/rsvps", 80);
lines.print();
jssc.start();
jssc.awaitTermination();

Here is my error:

java.net.UnknownHostException: http://stream.meetup.com/2/rsvps
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:178)
    at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:579)
    at java.net.Socket.connect(Socket.java:528)
    at java.net.Socket.<init>(Socket.java:425)
    at java.net.Socket.<init>(Socket.java:208)

2 Answers


socketTextStream is not designed to work as an HTTP client; it opens a raw TCP socket to a hostname and port and reads newline-delimited text, which is why passing a full URL fails with UnknownHostException. As you noticed, you need to create a custom receiver. One possible starting point is the receiver created as part of the meetup-stream data source (see https://github.com/actions/meetup-stream/blob/master/src/main/scala/receiver/meetupreceiver.scala).
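To make the socket part concrete: the two things a socket connection needs are a hostname and a port, not a URL. A minimal self-contained sketch (no Spark involved; the class name and the sample line are made up) showing the kind of newline-delimited TCP exchange that socketTextStream("localhost", port) would read from:

```java
import java.io.*;
import java.net.*;

public class SocketDemo {
    // Opens a local TCP server, connects to it by hostname + port (the two
    // things a socket endpoint consists of), sends one newline-terminated
    // line, and reads it back.
    static String exchange() throws Exception {
        try (ServerSocket server = new ServerSocket(0)) { // 0 = pick any free port
            int port = server.getLocalPort();

            // Server side: plays the role of the data source, writing
            // newline-delimited text -- the format socketTextStream expects.
            Thread source = new Thread(() -> {
                try (Socket client = server.accept();
                     PrintWriter out = new PrintWriter(client.getOutputStream(), true)) {
                    out.println("{\"event\":\"rsvp\"}");
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            source.start();

            // Client side: connects with a hostname and a port, not a URL.
            try (Socket socket = new Socket("localhost", port);
                 BufferedReader in = new BufferedReader(
                         new InputStreamReader(socket.getInputStream()))) {
                String line = in.readLine();
                source.join();
                return line;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(exchange()); // prints the one line sent by the source
    }
}
```

An HTTP endpoint like stream.meetup.com speaks the HTTP protocol on top of such a socket, so a plain text-over-socket reader cannot consume it directly.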



Here is a custom UrlReceiver that follows the Spark documentation on custom receivers:

import java.io.{BufferedReader, InputStreamReader}
import java.net.{URL, URLConnection}

import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class UrlReceiver(urlStr: String) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  override def onStart(): Unit = {
    // Read on a separate thread so onStart() returns immediately,
    // as the custom-receiver docs require.
    new Thread("Url Receiver") {
      override def run(): Unit = {
        val urlConnection: URLConnection = new URL(urlStr).openConnection
        val bufferedReader: BufferedReader = new BufferedReader(
          new InputStreamReader(urlConnection.getInputStream)
        )
        var msg = bufferedReader.readLine
        while (msg != null) {
          if (!msg.isEmpty) {
            store(msg) // hand each non-empty line to Spark
          }
          msg = bufferedReader.readLine
        }
        bufferedReader.close()
      }
    }.start()
  }

  override def onStop(): Unit = {
    // nothing to do: the reading thread exits when the stream ends
  }
}

Then use it like this (receiverStream is a method on the StreamingContext):

val lines = ssc.receiverStream(new UrlReceiver("http://stream.meetup.com/2/rsvps"))
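Since the question's code is in Java: the core of the receiver above is just opening a URLConnection and reading lines. A hedged Java sketch of that loop (class name and the temp-file demo in main are made up; inside a real Spark Receiver's onStart() thread, each non-empty line would go to store(line) instead of a list):

```java
import java.io.*;
import java.net.*;
import java.util.*;

public class UrlLineReader {
    // Core of the custom receiver, in Java: open a URLConnection and collect
    // the non-empty newline-delimited lines it produces.
    static List<String> readLines(String urlStr) throws IOException {
        List<String> lines = new ArrayList<>();
        URLConnection conn = new URL(urlStr).openConnection();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream()))) {
            String msg;
            while ((msg = reader.readLine()) != null) {
                if (!msg.isEmpty()) {
                    lines.add(msg); // a Spark Receiver would call store(msg) here
                }
            }
        }
        return lines;
    }

    public static void main(String[] args) throws Exception {
        // file:// URLs go through the same URLConnection machinery,
        // so the loop can be exercised without a network connection.
        File f = File.createTempFile("rsvps", ".txt");
        try (PrintWriter out = new PrintWriter(f)) {
            out.println("{\"venue\":\"a\"}");
            out.println();                      // empty lines are skipped
            out.println("{\"venue\":\"b\"}");
        }
        System.out.println(readLines(f.toURI().toURL().toString()));
    }
}
```

For the meetup stream itself, the same loop keeps running as long as the server keeps the HTTP response open, which is exactly what the Scala receiver's background thread relies on.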
