How do I get the first occurrence of a composite key from an unbounded input data stream with Apache Flink 0.10?



I'm new to Apache Flink. I have an unbounded stream of data in my input (fed into Flink 0.10 via Kafka).

I want to get the first occurrence of each primary key (the primary key being contract_num and event_dt).

This is my input data:

contract_num, event_dt, attr 
A1, 2016-02-24 10:25:08, X
A1, 2016-02-24 10:25:08, Y
A1, 2016-02-24 10:25:09, Z
A2, 2016-02-24 10:25:10, C

Here is the output data I want:

A1, 2016-02-24 10:25:08, X
A1, 2016-02-24 10:25:09, Z
A2, 2016-02-24 10:25:10, C

Note: row 2 has been removed because the key combination of A1 and "2016-02-24 10:25:08" already occurred in row 1.

How can I do this with Flink 0.10?

I was thinking of using keyBy(0,1), but after that I don't know what to do!

(I set up these tests using flink-spector with Joda-Time.)

@Test
public void test() {
    DateTime threeSecondsAgo = (new DateTime()).minusSeconds(3);
    DateTime twoSecondsAgo = (new DateTime()).minusSeconds(2);
    DateTime oneSecondsAgo = (new DateTime()).minusSeconds(1);

    DataStream<Tuple3<String, Date, String>> testStream =
            createTimedTestStreamWith(
                    Tuple3.of("A1", threeSecondsAgo.toDate(), "X"))
            .emit(Tuple3.of("A1", threeSecondsAgo.toDate(), "Y"), after(0, TimeUnit.NANOSECONDS))
            .emit(Tuple3.of("A1", twoSecondsAgo.toDate(), "Z"), after(0, TimeUnit.NANOSECONDS))
            .emit(Tuple3.of("A2", oneSecondsAgo.toDate(), "C"), after(0, TimeUnit.NANOSECONDS))
            .close();

    testStream.keyBy(0, 1);

}

2 Answers



Filtering duplicates over an infinite stream will eventually fail if your key space is larger than your available storage: you have to store the already-seen keys somewhere in order to filter out the duplicates. Thus, it is a good idea to define a time window after which you can purge the current set of seen keys.

If you're aware of this problem but want to try it anyway, you can apply a stateful flatMap operation after the keyBy call. The stateful mapper uses Flink's state abstraction to store whether it has already seen an element with this key. That way, you will also benefit from Flink's fault tolerance mechanism because the state is checkpointed automatically.

A Flink program that does your job:

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Tuple3<String, Date, String>> input = env.fromElements(
            Tuple3.of("foo", new Date(1000), "bar"),
            Tuple3.of("foo", new Date(1000), "foobar"));

    input.keyBy(0, 1).flatMap(new DuplicateFilter()).print();

    env.execute("Test");

}
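In the real job, the input would come from Kafka rather than fromElements, as the question mentions. A minimal sketch of that wiring with Flink 0.10's FlinkKafkaConsumer082, replacing the fromElements line above; the topic name, connection properties, and line-parsing logic are all assumptions:

// requires the flink-connector-kafka dependency plus imports for
// FlinkKafkaConsumer082, SimpleStringSchema, MapFunction, SimpleDateFormat and Properties
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("zookeeper.connect", "localhost:2181");
props.setProperty("group.id", "dedupe-example");

DataStream<Tuple3<String, Date, String>> input = env
        .addSource(new FlinkKafkaConsumer082<>("events", new SimpleStringSchema(), props))
        .map(new MapFunction<String, Tuple3<String, Date, String>>() {
            private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

            @Override
            public Tuple3<String, Date, String> map(String line) throws Exception {
                // parses lines like "A1, 2016-02-24 10:25:08, X"
                String[] fields = line.split(",\\s*");
                return Tuple3.of(fields[0], format.parse(fields[1]), fields[2]);
            }
        });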

The implementation of DuplicateFilter depends on the version of Flink.

Version >= 1.0 implementation

public static class DuplicateFilter extends RichFlatMapFunction<Tuple3<String, Date, String>, Tuple3<String, Date, String>> {

    static final ValueStateDescriptor<Boolean> descriptor = new ValueStateDescriptor<>("seen", Boolean.class, false);

    private ValueState<Boolean> operatorState;

    @Override
    public void open(Configuration configuration) {
        operatorState = this.getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Tuple3<String, Date, String> value, Collector<Tuple3<String, Date, String>> out) throws Exception {
        if (!operatorState.value()) {
            // we haven't seen the element yet
            out.collect(value);
            // set operator state to true so that we don't emit elements with this key again
            operatorState.update(true);
        }
    }
}

Version 0.10 implementation

public static class DuplicateFilter extends RichFlatMapFunction<Tuple3<String, Date, String>, Tuple3<String, Date, String>> {

    private OperatorState<Boolean> operatorState;

    @Override
    public void open(Configuration configuration) {
        operatorState = this.getRuntimeContext().getKeyValueState("seen", Boolean.class, false);
    }

    @Override
    public void flatMap(Tuple3<String, Date, String> value, Collector<Tuple3<String, Date, String>> out) throws Exception {
        if (!operatorState.value()) {
            // we haven't seen the element yet
            out.collect(value);
            operatorState.update(true);
        }
    }
}

Update: Using a tumbling time window

input.keyBy(0, 1).timeWindow(Time.seconds(1)).apply(new WindowFunction<Iterable<Tuple3<String, Date, String>>, Tuple3<String, Date, String>, Tuple, TimeWindow>() {
    @Override
    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple3<String, Date, String>> input, Collector<Tuple3<String, Date, String>> out) throws Exception {
        // emit only the first element of each window
        out.collect(input.iterator().next());
    }
});
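As a side note, since only the first element per key and window is needed, the same semantics could be expressed with an incremental ReduceFunction instead of buffering the whole window; a minimal sketch under that assumption:

input.keyBy(0, 1).timeWindow(Time.seconds(1)).reduce(new ReduceFunction<Tuple3<String, Date, String>>() {
    @Override
    public Tuple3<String, Date, String> reduce(Tuple3<String, Date, String> first, Tuple3<String, Date, String> second) {
        // always keep the element that arrived first within the window
        return first;
    }
});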


Here's another way to do it that I just happened to have written. It has the disadvantage that it involves a bit more custom code since it doesn't use the built-in Flink windowing functions, but it doesn't have the latency penalty that Till mentioned. Full example on GitHub.

package com.dataartisans.filters;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;

import java.io.Serializable;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;

/**
 * This class filters duplicates that occur within a configurable time of each other in a data stream.
 */
public class DedupeFilterFunction<T, K extends Serializable> extends RichFilterFunction<T> implements CheckpointedAsynchronously<HashSet<K>> {

    private LoadingCache<K, Boolean> dedupeCache;
    private final KeySelector<T, K> keySelector;
    private final long cacheExpirationTimeMs;

    /**
     * @param cacheExpirationTimeMs The expiration time for elements in the cache
     */
    public DedupeFilterFunction(KeySelector<T, K> keySelector, long cacheExpirationTimeMs) {
        this.keySelector = keySelector;
        this.cacheExpirationTimeMs = cacheExpirationTimeMs;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        createDedupeCache();
    }

    @Override
    public boolean filter(T value) throws Exception {
        K key = keySelector.getKey(value);
        boolean seen = dedupeCache.get(key);
        if (!seen) {
            dedupeCache.put(key, true);
            return true;
        } else {
            return false;
        }
    }

    @Override
    public HashSet<K> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
        return new HashSet<>(dedupeCache.asMap().keySet());
    }

    @Override
    public void restoreState(HashSet<K> state) throws Exception {
        createDedupeCache();
        for (K key : state) {
            dedupeCache.put(key, true);
        }
    }

    private void createDedupeCache() {
        dedupeCache = CacheBuilder.newBuilder()
                .expireAfterWrite(cacheExpirationTimeMs, TimeUnit.MILLISECONDS)
                .build(new CacheLoader<K, Boolean>() {
                    @Override
                    public Boolean load(K k) throws Exception {
                        return false;
                    }
                });
    }
}
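For the question's stream, wiring this filter up might look like the following sketch; the (contract_num, event_dt) pair serves as the composite key, and the one-minute expiration time is an arbitrary assumption:

DataStream<Tuple3<String, Date, String>> deduped = input.filter(
        new DedupeFilterFunction<Tuple3<String, Date, String>, Tuple2<String, Date>>(
                new KeySelector<Tuple3<String, Date, String>, Tuple2<String, Date>>() {
                    @Override
                    public Tuple2<String, Date> getKey(Tuple3<String, Date, String> value) {
                        // composite key: (contract_num, event_dt)
                        return Tuple2.of(value.f0, value.f1);
                    }
                },
                TimeUnit.MINUTES.toMillis(1)));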

