flatMap函数中scala-Apache-Flink流类型不匹配


0

尝试在Scala2.10.4中使用0.10.0Flink版本的流api。在尝试编译此第一个版本时:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.windowing.time._

object Main {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream("localhost", 9999)

val words : DataStream[String] = text.flatMap[String](
  new Function[String,TraversableOnce[String]] { 
    def apply(line:String):TraversableOnce[String] = line.split(" ")
  })

env.execute("Window Stream wordcount")

}
}

我遇到编译时错误:

[error]  found   : String => TraversableOnce[String]
[error]  required: org.apache.flink.api.common.functions.FlatMapFunction[String,String]
[error]       new Function[String,TraversableOnce[String]] { def apply(line:String):TraversableOnce[String] = line.split(" ")})
[error]       ^

在反编译版本的数据流.class我在项目中包含了一些接受这种类型的函数(最后一个):

public <R> DataStream<R> flatMap(FlatMapFunction<T, R> flatMapper, TypeInformation<R> evidence$12, ClassTag<R> evidence$13) {
        if (flatMapper == null) {
            throw new NullPointerException("FlatMap function must not be null.");
        }
        TypeInformation outType = (TypeInformation)Predef..MODULE$.implicitly(evidence$12);
        return package..MODULE$.javaToScalaStream((org.apache.flink.streaming.api.datastream.DataStream)this.javaStream.flatMap(flatMapper).returns(outType));
    }

public &lt;R&gt; DataStream&lt;R&gt; flatMap(Function2&lt;T, Collector&lt;R&gt;, BoxedUnit&gt; fun, TypeInformation&lt;R&gt; evidence$14, ClassTag&lt;R&gt; evidence$15) {
    if (fun == null) {
        throw new NullPointerException("FlatMap function must not be null.");
    }
    Function2&lt;T, Collector&lt;R&gt;, BoxedUnit&gt; cleanFun = this.clean((F)fun);
    .anon flatMapper = new /* Unavailable Anonymous Inner Class!! */;
    return this.flatMap((FlatMapFunction&lt;T, R&gt;)flatMapper, evidence$14, evidence$15);
}

public &lt;R&gt; DataStream&lt;R&gt; flatMap(Function1&lt;T, TraversableOnce&lt;R&gt;&gt; fun, TypeInformation&lt;R&gt; evidence$16, ClassTag&lt;R&gt; evidence$17) {
    if (fun == null) {
        throw new NullPointerException("FlatMap function must not be null.");
    }
    Function1&lt;T, TraversableOnce&lt;R&gt;&gt; cleanFun = this.clean((F)fun);
    .anon flatMapper = new /* Unavailable Anonymous Inner Class!! */;
    return this.flatMap((FlatMapFunction&lt;T, R&gt;)flatMapper, evidence$16, evidence$17);
}

这里可能有什么问题?如果您能提供一些见解,我将不胜感激。

1 答案


0

问题是您正在导入Flink的Java StreamExecutionEnvironment:网址:apache.flink. streaming.api.environment.StreamExecutionEnvironment。

您必须像这样使用StreamExecutionEnvironment的Scala变体:import网址:apache.flink. streaming.api.scala.StreamExecutionEnvironment。

原始答案:

 val words : DataStream[String] = text.flatMap[String](
      new FlatMapFunction[String,String] {
        override def flatMap(t: String, collector: Collector[String]): Unit = t.split(" ")
      })

我来回答

写文章

提问题

面试题