java - Calculating the average over a window duration with Flink DataStream



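I am using the Flink DataStream API on a stream of rack temperature readings, and I want to calculate the "average" temperature grouped by rack ID. My window duration is 40 seconds and the window slides every 10 seconds. Below is my code, which calculates the sum of temperatures for each rackId every 10 seconds, but now I want to calculate the average temperature instead: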

static Properties properties = new Properties();

public static Properties getProperties()
{
    properties.setProperty("bootstrap.servers", "54.164.200.104:9092");
    properties.setProperty("zookeeper.connect", "54.164.200.104:2181");
    //properties.setProperty("deserializer.class", "kafka.serializer.StringEncoder");
    //properties.setProperty("group.id", "akshay");
    properties.setProperty("auto.offset.reset", "earliest");
    return properties;
}

@SuppressWarnings("rawtypes")
public static void main(String[] args) throws Exception
{
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    Properties props = Program.getProperties();
    DataStream<TemperatureEvent> dstream = env
        .addSource(new FlinkKafkaConsumer09<TemperatureEvent>("TemperatureEvent", new TemperatureEventSchema(), props))
        .assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());
    DataStream<TemperatureEvent> ds1 = dstream
        .keyBy("rackId")
        .timeWindow(Time.seconds(40), Time.seconds(10))
        .sum("temperature");
    env.execute("Temperature Consumer");
}

How can I calculate the average temperature in the example above?

1 Answer



As far as I know, you need to write the averaging function yourself.

https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java

In your case, you would probably write something like this:

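import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// keyBy("rackId") with a field expression yields a Tuple key, and timeWindow(...) yields TimeWindow.
public class Avg implements WindowFunction<TemperatureEvent, TemperatureEvent, Tuple, TimeWindow> {

    @Override
    public void apply(Tuple key, TimeWindow window, Iterable<TemperatureEvent> values, Collector<TemperatureEvent> out) {
        // Accumulate in a double so the division below is not truncated.
        // Assumes getTemperature()/setTemperature() work with double; adjust if the field type differs.
        double sum = 0.0;
        int count = 0;
        TemperatureEvent result = null;
        for (TemperatureEvent value : values) {
            if (result == null) {
                result = value; // reuse the first event so the output carries the rackId
            }
            sum += value.getTemperature();
            count++;
        }
        if (result == null) {
            return; // empty window: nothing to emit
        }
        result.setTemperature(sum / count);
        out.collect(result);
    }
}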
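
To see the averaging logic on its own, here is a minimal plain-Java sketch with no Flink dependency. It groups the readings of a single window by rack ID and keeps a running (sum, count) pair per rack. The names Reading, AvgAccumulator, and WindowAverageSketch are made up for illustration and are not part of Flink or of the question's code; the double temperature field is also an assumption. The sketch also shows why accumulating into a long and dividing by the count truncates the result.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: computes per-rack averages for the contents of one window.
final class WindowAverageSketch {

    // Groups readings by rackId and returns the average temperature per rack.
    static Map<String, Double> averagePerRack(List<Reading> window) {
        Map<String, AvgAccumulator> accumulators = new LinkedHashMap<>();
        for (Reading r : window) {
            accumulators.computeIfAbsent(r.rackId, k -> new AvgAccumulator()).add(r.temperature);
        }
        Map<String, Double> result = new LinkedHashMap<>();
        accumulators.forEach((rackId, acc) -> result.put(rackId, acc.average()));
        return result;
    }

    public static void main(String[] args) {
        List<Reading> window = Arrays.asList(
            new Reading("rack-1", 20.0),
            new Reading("rack-1", 23.0),
            new Reading("rack-2", 30.0));

        System.out.println(averagePerRack(window)); // prints {rack-1=21.5, rack-2=30.0}

        // Integer arithmetic drops the fractional part of the same average.
        long truncated = (20L + 23L) / 2;
        System.out.println(truncated); // prints 21
    }
}

// Hypothetical stand-in for the question's TemperatureEvent (field names are assumptions).
final class Reading {
    final String rackId;
    final double temperature;

    Reading(String rackId, double temperature) {
        this.rackId = rackId;
        this.temperature = temperature;
    }
}

// Running (sum, count) pair; the average is only derived when requested.
final class AvgAccumulator {
    private double sum;
    private long count;

    void add(double value) {
        sum += value;
        count++;
    }

    // Returns NaN for an empty accumulator instead of dividing by zero.
    double average() {
        return count == 0 ? Double.NaN : sum / count;
    }
}
```

Keeping only a sum and a count per key means the average can be updated as each reading arrives, without holding every reading of the window in memory.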

Note:

