Apache Flink - Dynamically creating a stream data source from values in a data stream



I am trying to build a sample application with Apache Flink that does the following:

    Reads a stream of stock symbols (e.g. "CSCO", "FB") from a Kafka queue.

*Update to the original post*

I moved the map function into a separate class and no longer get the runtime error message "The implementation of the MapFunction is not serializable. The object probably contains or references non-serializable fields.".

The issue I am facing now is that the Kafka topic "stockprices", to which I am trying to write the prices, is not receiving them. I am trying to troubleshoot the problem and will post any updates.

public class RetrieveStockPrices {
    @SuppressWarnings("serial")
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "stocks");

        DataStream<String> streamOfStockSymbols = streamExecEnv.addSource(new FlinkKafkaConsumer08<String>("stocksymbol", new SimpleStringSchema(), properties));

        DataStream<String> stockPrice =
            streamOfStockSymbols
            //get unique keys
            .keyBy(new KeySelector<String, String>() {
                @Override
                public String getKey(String trend) throws Exception {
                    return trend;
                }
            })
            //collect events over a window
            .window(TumblingEventTimeWindows.of(Time.seconds(60)))
            //return the last event from the window...all elements are the same "Symbol"
            .apply(new WindowFunction<String, String, String, TimeWindow>() {
                @Override
                public void apply(String key, TimeWindow window, Iterable<String> input, Collector<String> out) throws Exception {
                    out.collect(input.iterator().next().toString());
                }
            })
            .map(new StockSymbolToPriceMapFunction());

        streamExecEnv.execute("Retrieve Stock Prices");
    }
}

public class StockSymbolToPriceMapFunction extends RichMapFunction<String, String> {
    @Override
    public String map(String stockSymbol) throws Exception {
        final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        System.out.println("StockSymbolToPriceMapFunction: stockSymbol: " + stockSymbol);

        DataStream<String> stockPrices = streamExecEnv.addSource(new LookupStockPrice(stockSymbol));
        stockPrices.keyBy(new CustomKeySelector()).addSink(new FlinkKafkaProducer08<String>("localhost:9092", "stockprices", new SimpleStringSchema()));

        return "100000";
    }

    private static class CustomKeySelector implements KeySelector<String, String> {
        @Override
        public String getKey(String arg0) throws Exception {
            return arg0.trim();
        }
    }
}

public class LookupStockPrice extends RichSourceFunction<String> {
    public String stockSymbol = null;
    public boolean isRunning = true;

    public LookupStockPrice(String inSymbol) {
        stockSymbol = inSymbol;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        isRunning = true;
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
        String stockPrice = "0";
        while (isRunning) {
            //TODO: query Google Finance API
            stockPrice = Integer.toString((new Random()).nextInt(100) + 1);
            ctx.collect(stockPrice);
            Thread.sleep(10000);
        }
    }
}

1 Answer



A StreamExecutionEnvironment is not intended to be used inside the operators of a streaming application. "Not intended" means that this is neither tested nor encouraged. It might work and do something, but it will most likely not behave well and will probably kill your application.

The StockSymbolToPriceMapFunction in your program defines a completely new and independent streaming application for each incoming record. However, since you never call streamExecEnv.execute(), these programs are never started, and the map method returns without doing anything.

If you did call streamExecEnv.execute(), the function would start a new local Flink cluster inside the worker's JVM and launch the application on that local Flink cluster. Each local Flink instance takes up a lot of heap space, and after a few clusters have been started, the worker will probably die with an OutOfMemoryError, which is not what you want to happen.
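
One possible restructuring (a minimal sketch, not taken from the original answer, assuming a synchronous per-symbol lookup is acceptable) is to perform the price lookup directly inside the map function and attach the Kafka sink to the main dataflow, so that the single streamExecEnv.execute() in main() drives everything:

// Sketch only: the lookup happens inline in the operator, and no nested
// StreamExecutionEnvironment is created per record.
public class StockSymbolToPriceMapFunction extends RichMapFunction<String, String> {
    @Override
    public String map(String stockSymbol) throws Exception {
        // fetchPrice() is a hypothetical helper standing in for a real
        // pricing API call (e.g. an HTTP request to a quote service).
        return stockSymbol + "," + fetchPrice(stockSymbol);
    }

    // Hypothetical placeholder that returns a random price.
    private String fetchPrice(String symbol) {
        return Integer.toString(new java.util.Random().nextInt(100) + 1);
    }
}

The result stream is then written to the "stockprices" topic by a sink attached to the same job in main(), for example:

stockPrice.addSink(new FlinkKafkaProducer08<String>("localhost:9092", "stockprices", new SimpleStringSchema()));

This keeps everything in a single job graph, so the prices reach the stockprices topic through a regular sink instead of a nested program that is never started.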
