java - Apache Flink Wikipedia edit analysis with Scala



I am trying to rewrite the Wikipedia edit stream analysis from the Apache Flink tutorial in Scala: https://ci.apache.org/projects/flink/flink-docs-release-1.2/quickstart/run_example_quickstart.html

The code in the tutorial is:

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;

public class WikipediaAnalysis {

  public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());

    KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
      .keyBy(new KeySelector<WikipediaEditEvent, String>() {
        @Override
        public String getKey(WikipediaEditEvent event) {
          return event.getUser();
        }
      });

    DataStream<Tuple2<String, Long>> result = keyedEdits
      .timeWindow(Time.seconds(5))
      .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
          acc.f0 = event.getUser();
          acc.f1 += event.getByteDiff();
          return acc;
        }
      });

    result.print();

    see.execute();
  }
}

Below is my attempt in Scala:

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.wikiedits.{WikipediaEditEvent, WikipediaEditsSource}
import org.apache.flink.streaming.api.windowing.time.Time


object WikipediaAnalytics extends App {

  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  val edits = env.addSource(new WikipediaEditsSource());

  val keyedEdits = edits.keyBy(event => event.getUser)

  val result = keyedEdits.timeWindow(Time.seconds(5)).fold(("", 0L), (we: WikipediaEditEvent, t: (String, Long)) =>
    (we.getUser, t._2 + we.getByteDiff))

}

This is more or less a word-for-word translation to Scala. Based on that, the type of val result should be DataStream[(String, Long)], but the actual type inferred after fold() is nowhere close.

Please help me figure out what is wrong with the Scala code.

EDIT 1: Made the following changes based on the current signature of fold[R]; the types now match the expected ones, but I still cannot figure out why.

val result_1: (((String, Long), WikipediaEditEvent) => (String, Long)) => DataStream[(String, Long)] =
  keyedEdits.timeWindow(Time.seconds(5)).fold(("", 0L))

val result_2: DataStream[(String, Long)] = result_1((t: (String, Long), we: WikipediaEditEvent) =>
  (we.getUser, t._2 + we.getByteDiff))
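
For context, here is my reading of why result_1 is a function value rather than a DataStream: the Scala WindowedStream.fold seems to take the initial value and the fold function in two separate parameter lists, roughly as in the sketch below (a simplified assumption about the 1.2-era API, not the actual Flink source):

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.DataStream

// Simplified sketch of the WindowedStream shape assumed above; not the real Flink class.
trait WindowedStreamSketch[T] {
  // Two parameter lists: fold(init) alone is a partial application that still
  // expects the fold function, which matches the type observed for result_1.
  def fold[R: TypeInformation](initialValue: R)(function: (R, T) => R): DataStream[R]
}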

1 Answer



The problem seems to be with the fold: you need a closing parenthesis after your accumulator's initial value. Once you fix that, the code will still not compile because there is no type information available for WikipediaEditEvent. The easiest way to resolve this is to import more of the Flink Scala API. See the full example below:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource
import org.apache.flink.streaming.api.windowing.time.Time

object WikipediaAnalytics extends App {
  val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  val edits = see.addSource(new WikipediaEditsSource())

  val userEditsVolume: DataStream[(String, Int)] = edits
    .keyBy(_.getUser)
    .timeWindow(Time.seconds(5))
    .fold(("", 0))((acc, event) => (event.getUser, acc._2 + event.getByteDiff))

  userEditsVolume.print()
  see.execute("Wikipedia User Edit Volume")
}
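
If you prefer to keep the named imports from the question, the minimal change should be to also bring the implicit TypeInformation factory into scope by name. A sketch of that variant, assuming Flink 1.2.x:

// Alternative sketch, assuming Flink 1.2.x: keep the named imports and add only the
// implicit TypeInformation factory (createTypeInformation) instead of the wildcard import.
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource
import org.apache.flink.streaming.api.windowing.time.Time

object WikipediaAnalytics extends App {
  val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  val edits = see.addSource(new WikipediaEditsSource())

  // fold takes the initial accumulator and the fold function in two parameter lists
  val userEditsVolume: DataStream[(String, Int)] = edits
    .keyBy(_.getUser)
    .timeWindow(Time.seconds(5))
    .fold(("", 0))((acc, event) => (event.getUser, acc._2 + event.getByteDiff))

  userEditsVolume.print()
  see.execute("Wikipedia User Edit Volume")
}

The wildcard import in the first example does the same thing; it simply pulls in createTypeInformation along with the rest of the Scala API implicits in one go.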

