Why doesn't my Flink program join the two streams?



I want to join Customer and Address objects by id. The customer records are:

{"id": 1,"name": "Yogesh"}
{"id": 2,"name": "Swati" }
{"id": 3,"name": "Shruti"}
{"id": 4,"name": "Amol"  }
{"id": 5,"name": "Pooja" }
{"id": 6,"name": "Kiran" }

and the following for the addresses:

{"id": 1,"address":"Pune" }
{"id": 2,"address":"Pune" }
{"id": 3,"address":"Pune" }
{"id": 4,"address":"Kalyan"}
{"id": 5,"address": "Pimpri"}

I used an interval join as well as a JoinFunction with a TumblingEventTimeWindow and sliding windows, but neither joins the customer and address streams. I can't figure out what I'm missing in the code.

public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = setupEnvironment();
        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
        env.getConfig().setGlobalJobParameters(params);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        init(params);

        FlinkKafkaConsumerBase<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(customerTopic,
                new SimpleStringSchema(), CommonConfig.getConsumerProperties(ip, port, customerTopic))
                        .setStartFromEarliest();

        DataStream<Customer> customerStream = env.addSource(flinkKafkaConsumer)
                .flatMap(new FlatMapFunction<String, Customer>() {
                    private static final long serialVersionUID = 2142214034515856836L;

                    @Override
                    public void flatMap(String value, Collector<Customer> out) throws Exception {
                        Customer customer = null;
                        try {
                            customer = mapper.readValue(value, Customer.class);
                        } catch (Exception exception) {
                            System.out.println(exception);
                        }
                        if (null != customer) {
                            out.collect(customer);
                        }
                    }
                });

        customerStream.print();

        DataStream<Address> addressStream = env
                .addSource(new FlinkKafkaConsumer<>(addressTopic, new SimpleStringSchema(),
                        CommonConfig.getConsumerProperties(ip, port, addressTopic)).setStartFromEarliest())
                .flatMap(new FlatMapFunction<String, Address>() {
                    private static final long serialVersionUID = 2142214034515856836L;

                    @Override
                    public void flatMap(String value, Collector<Address> out) throws Exception {
                        Address address = null;
                        try {
                            address = mapper.readValue(value, Address.class);
                        } catch (Exception exception) {
                            System.out.println(exception);
                        }
                        if (null != address) {
                            out.collect(address);
                        }
                    }
                });

        addressStream.print();

        customerStream.keyBy(new IdSelectorCustomer()).intervalJoin(addressStream.keyBy(new IdSelectorAddress()))
                .between(Time.seconds(-2), Time.seconds(1))
                .process(new ProcessJoinFunction<Customer, Address, CustomerInfo>() {
                    private static final long serialVersionUID = -3658796606815087434L;
                    @Override
                    public void processElement(Customer customer, Address address,
                            ProcessJoinFunction<Customer, Address, CustomerInfo>.Context ctx,
                            Collector<CustomerInfo> collector) throws Exception {
                        collector.collect(new CustomerInfo(customer.getId(), customer.getName(), address.getAddress()));
                    }
                }).print();

        DataStream<CustomerInfo> joinResultStream = customerStream.join(addressStream).where(new IdSelectorCustomer())
                .equalTo(new IdSelectorAddress()).window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new JoinFunction<Customer, Address, CustomerInfo>() {
                    private static final long serialVersionUID = -8913244745978230585L;
                    @Override
                    public CustomerInfo join(Customer first, Address second) throws Exception {
                        return new CustomerInfo(first.getId(), first.getName(), second.getAddress());
                    }
                });
        joinResultStream.print();
        env.execute("Execute");

}

// ===============================================================================
public class IdSelectorAddress implements KeySelector<Address,Long> {
    private static final long serialVersionUID = 7642739595630647992L;
    @Override
    public Long getKey(Address value) throws Exception {
        return value.getId();
    }
}

// ========================================================================
public class IdSelectorCustomer implements KeySelector<Customer,Long> {
    private static final long serialVersionUID = 7642739595630647992L;
    @Override
    public Long getKey(Customer value) throws Exception {
        return value.getId();
    }
}

2 Answers



I assume you are not seeing any results at all. The reason is that your windows are never triggered/evaluated/closed.

Your events carry no timestamps, yet you set the time characteristic to event time. Because you never assign timestamps and watermarks, Flink cannot determine how to assign events to windows for the window join, nor how to pair them in the interval join.

Use DataStream#assignTimestampsAndWatermarks or FlinkKafkaConsumer#assignTimestampsAndWatermarks to work with event time, or change the time characteristic to ProcessingTime.
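As a sketch of the first option, assuming a Flink version with the `WatermarkStrategy` API (1.11+): the posted Customer JSON has only `id` and `name`, so the `getEventTime()` accessor below is hypothetical -- you would need to add an event-time field to your records, or fall back to processing time.

```java
// Sketch only: assign timestamps and watermarks to the customer stream
// before keying and joining. getEventTime() is a hypothetical accessor --
// the sample records carry no timestamp field.
DataStream<Customer> customerWithTimestamps = customerStream
        .assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<Customer>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(
                                (customer, recordTimestamp) -> customer.getEventTime()));
```

The same would have to be done for `addressStream`; both inputs of a window or interval join need timestamps and watermarks.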

I hope this points you in the right direction.



Since you are using event time, you must call assignTimestampsAndWatermarks on both streams so that Flink can assign events to windows and knows when a window is complete and may fire.
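The mechanics behind "knows when a window is complete" can be illustrated without Flink at all. A minimal plain-Java sketch of bounded-out-of-orderness watermarking: the watermark trails the maximum timestamp seen so far by a fixed bound, and an event-time window may fire once the watermark passes the window's end. (The 2-second bound and the class itself are illustrative, not part of any Flink API.)

```java
// Plain-Java sketch (no Flink dependencies) of bounded-out-of-orderness
// watermarks. The watermark lags the largest event timestamp seen so far
// by OUT_OF_ORDERNESS_MS; without events, it never advances -- which is
// why windows over timestamp-less streams never fire.
public class WatermarkSketch {
    static long maxTimestamp = Long.MIN_VALUE;
    static final long OUT_OF_ORDERNESS_MS = 2000;

    // Called once per incoming event; returns the new watermark.
    static long onEvent(long eventTimestampMs) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestampMs);
        return currentWatermark();
    }

    static long currentWatermark() {
        return maxTimestamp - OUT_OF_ORDERNESS_MS;
    }

    // An event-time window can fire once the watermark passes its end.
    static boolean windowCanFire(long windowEndMs) {
        return currentWatermark() >= windowEndMs;
    }

    public static void main(String[] args) {
        onEvent(1000);
        onEvent(4000);
        onEvent(6500);
        System.out.println(currentWatermark());  // 6500 - 2000 = 4500
        System.out.println(windowCanFire(5000)); // false: 4500 < 5000
        onEvent(7500);
        System.out.println(windowCanFire(5000)); // true: 5500 >= 5000
    }
}
```

With no timestamps assigned, `maxTimestamp` never moves past its initial value, so `windowCanFire` stays false forever -- exactly the symptom in the question.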

To read more about this topic, you can start with the tutorial on streaming analytics, or the documentation on how to write timestamp extractors and watermark assigners.

