Java: Kafka -> Flink performance issues



I'm consuming from some Kafka topics that produce roughly 30K messages/second. I have a Flink topology set up to read one of them, do a small aggregation (5-second window), and then (eventually) write to a DB.

When I run my topology and strip out everything except the read -> aggregate step, I only receive ~30K messages per minute. And no backpressure shows up anywhere.

What am I doing wrong?

Edit:

    I cannot change the topic layout. Each topic has a single partition, and there are hundreds of them.

It looks like I'm only getting ~1.5 MB/s, nowhere near the ~100 MB/s mentioned.
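As a back-of-envelope check (the ~2,500-byte average message size here is an assumption, not a measured value), ~30K messages/minute does land in the same ballpark as the observed ~1.5 MB/s:

```java
public class ThroughputCheck {

    // Convert a message rate and an average message size into MB/s.
    static double mbPerSecond(long messagesPerMinute, int avgMessageBytes) {
        double messagesPerSecond = messagesPerMinute / 60.0;
        return messagesPerSecond * avgMessageBytes / (1024.0 * 1024.0);
    }

    public static void main(String[] args) {
        // ~30K messages/minute at an assumed ~2,500 bytes each comes out to
        // roughly 1.2 MB/s -- close to the ~1.5 MB/s observed, and far
        // below the ~100 MB/s target.
        System.out.printf("%.2f MB/s%n", mbPerSecond(30_000, 2_500));
    }
}
```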

Current code path:

DataStream<byte[]> dataStream4 = env.addSource(new FlinkKafkaConsumer081<>("data_4", new RawSchema(), parameterTool.getProperties())).setParallelism(1);
DataStream<Tuple4<Long, Long, Integer, String>> ds4 = dataStream4.rebalance().flatMap(new mapper2("data_4")).setParallelism(4);

public class mapper2 implements FlatMapFunction<byte[], Tuple4<Long, Long, Integer, String>> {
    private String mapId;

    public mapper2(String mapId) {
        this.mapId = mapId;
    }

    @Override
    public void flatMap(byte[] bytes, Collector<Tuple4<Long, Long, Integer, String>> collector) throws Exception {
        TimeData timeData = (TimeData) ts_thriftDecoder.fromBytes(bytes);
        Tuple4<Long, Long, Integer, String> tuple4 = new Tuple4<>();
        tuple4.f0 = timeData.getId();
        tuple4.f1 = timeData.getOtherId();
        tuple4.f2 = timeData.getSections().size();
        tuple4.f3 = mapId;

        collector.collect(tuple4);
    }
}

2 Answers



From the code, I see two potential components that could cause performance problems:

    The FlinkKafkaConsumer
    The Thrift deserializer

To understand where the bottleneck is, I would first measure Flink's raw read performance reading from the Kafka topic.

So, could you run the following code on your cluster?

public class RawKafka {

    private static final Logger LOG = LoggerFactory.getLogger(RawKafka.class);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        DataStream<byte[]> dataStream4 = env.addSource(new FlinkKafkaConsumer081<>("data_4", new RawSchema(), parameterTool.getProperties())).setParallelism(1);

        dataStream4.flatMap(new FlatMapFunction<byte[], Integer>() {
            long received = 0;
            long logfreq = 50000;
            long lastLog = -1;
            long lastElements = 0;

            @Override
            public void flatMap(byte[] element, Collector<Integer> collector) throws Exception {
                received++;
                if (received % logfreq == 0) {
                    // throughput over entire time
                    long now = System.currentTimeMillis();

                    // throughput for the last "logfreq" elements
                    if (lastLog == -1) {
                        // init (the first time)
                        lastLog = now;
                        lastElements = received;
                    } else {
                        long timeDiff = now - lastLog;
                        long elementDiff = received - lastElements;
                        double ex = (1000 / (double) timeDiff);
                        // GB estimate assumes ~2,500 bytes per message
                        LOG.info("During the last {} ms, we received {} elements. That's {} elements/second/core. GB received {}",
                                timeDiff, elementDiff, elementDiff * ex, (received * 2500) / 1024 / 1024 / 1024);
                        // reinit
                        lastLog = now;
                        lastElements = received;
                    }
                }
            }
        });

        env.execute("Raw kafka throughput");
    }
}

This code measures the time between batches of 50,000 elements read from Kafka and logs the number of elements read so far.

16:09:34,028 INFO  RawKafka                                                      - During the last 88 ms, we received 30000 elements. That's 340909.0909090909 elements/second/core. GB received 0
16:09:34,028 INFO  RawKafka                                                      - During the last 86 ms, we received 30000 elements. That's 348837.20930232556 elements/second/core. GB received 0
16:09:34,028 INFO  RawKafka                                                      - During the last 85 ms, we received 30000 elements. That's 352941.17647058825 elements/second/core. GB received 0
16:09:34,028 INFO  RawKafka                                                      - During the last 88 ms, we received 30000 elements. That's 340909.0909090909 elements/second/core. GB received 0
16:09:34,030 INFO  RawKafka                                                      - During the last 90 ms, we received 30000 elements. That's 333333.3333333333 elements/second/core. GB received 0
16:09:34,030 INFO  RawKafka                                                      - During the last 91 ms, we received 30000 elements. That's 329670.3296703297 elements/second/core. GB received 0
16:09:34,030 INFO  RawKafka                                                      - During the last 85 ms, we received 30000 elements. That's 352941.17647058825 elements/second/core. GB received 0

I'm really interested to see what throughput you reach while reading from Kafka.
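For reference, the rates in the log lines above follow from the same arithmetic the job uses; a minimal standalone sketch of that computation, using the numbers from the first log line:

```java
public class LogRateCheck {

    // Same computation as in the flatMap above: elements/second extrapolated
    // from the elements seen during the last measurement window.
    static double elementsPerSecond(long elementDiff, long timeDiffMs) {
        return elementDiff * (1000 / (double) timeDiffMs);
    }

    public static void main(String[] args) {
        // First log line: 30,000 elements in 88 ms -> ~340,909 elements/second/core,
        // i.e. the raw Kafka read path itself is nowhere near the bottleneck.
        System.out.println(elementsPerSecond(30_000, 88));
    }
}
```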



I've never used Flink or its Kafka consumer, but I do have experience using Kafka in a Storm topology. Here are some of my thoughts. There are a lot of variables that determine Kafka's speed. Here are some things to consider and investigate; add more details to your question as you find them.

    Adding more partitions increases throughput. So adding more partitions and more consumers should give a roughly linear performance improvement.
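To illustrate why that scaling stops at the partition count: within a consumer group, each partition is read by at most one consumer, so extra consumers beyond the partition count sit idle. A simplified sketch (not Kafka's actual assignor) of partitions being spread over consumer instances:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PartitionSpread {

    // Simplified round-robin spread of partitions over consumers. Throughput
    // scales roughly linearly only while consumers <= partitions; beyond
    // that, the extra consumers receive no partitions at all.
    static Map<Integer, List<Integer>> assign(int partitions, int consumers) {
        Map<Integer, List<Integer>> byConsumer = new TreeMap<>();
        for (int c = 0; c < consumers; c++) {
            byConsumer.put(c, new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            byConsumer.get(p % consumers).add(p);
        }
        return byConsumer;
    }

    public static void main(String[] args) {
        // 4 partitions spread over 6 consumers: consumers 4 and 5 get nothing.
        System.out.println(assign(4, 6));
    }
}
```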

There could be many reasons why it's consuming slowly; I've tried to highlight some of the Kafka-related ones. I'm sure there are also things you can do in Flink to speed up consumption, but I don't know what they are since I've never used it.

