Why is a parameter passed to the constructor of a Flink operator function class null?



I am learning Flink. I want to build an operator function class that extends ProcessWindowFunction, and overload a new constructor that takes a parameter and stores it in a class field. But when the class actually runs, the field's content is gone, which confuses me. The code is as follows.

import com.aliyun.datahub.client.model.Field;
import com.aliyun.datahub.client.model.FieldType;
import com.aliyun.datahub.client.model.PutRecordsResult;
import io.github.streamingwithflink.chapter8.PoJoElecMeterSource;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class DataHubSinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.enableCheckpointing(10_000L);
        env.setParallelism(2);

        RecordSchemaSer schema = new RecordSchemaSer();
        schema.addField(new Field("id", FieldType.STRING));

        DataStream<PutRecordsResult> out = env
                .addSource(new PoJoElecMeterSource())
                .keyBy(r -> r.getId())
                .window(TumblingProcessingTimeWindows.of(Time.seconds(3)))
                .process(new PutDatahubFunction<>(schema));  // PutDatahubFunction is my own operator function class

        env.execute();
    }
}

The variable schema is the parameter I want to pass to the constructor; it is an instance of the RecordSchemaSer class:

import com.aliyun.datahub.client.model.RecordSchema;
import java.io.Serializable;

public class RecordSchemaSer
extends RecordSchema
implements Serializable {

}
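The effect seen here can be reproduced with plain Java serialization, no Flink required. In the sketch below, Base and Sub are hypothetical stand-ins for RecordSchema and RecordSchemaSer (assuming, as it turns out in the answer, that RecordSchema itself does not implement Serializable): the fields of a non-serializable superclass are silently dropped in a serialization round trip and re-initialized by the superclass's no-arg constructor.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Non-serializable base class, a stand-in for RecordSchema.
class Base {
    private String field = "empty";               // reset to "empty" on deserialization
    public Base() {}                              // no-arg ctor runs during deserialization
    public void setField(String f) { field = f; }
    public String getField() { return field; }
}

// Mirrors RecordSchemaSer: a Serializable subclass of a non-serializable base.
class Sub extends Base implements Serializable {}

public class SerDemo {
    public static void main(String[] args) throws Exception {
        Sub s = new Sub();
        s.setField("id");

        // Round-trip through Java serialization, as Flink does when it
        // ships an operator function from the client to the task managers.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(s);
        Sub copy = (Sub) new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())).readObject();

        // The base-class field was NOT serialized: it is back to its default.
        System.out.println(s.getField());     // prints "id"
        System.out.println(copy.getField());  // prints "empty"
    }
}
```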

PutDatahubFunction is the class that extends ProcessWindowFunction; the code is as follows:

import com.aliyun.datahub.client.model.*;

import io.github.streamingwithflink.chapter8.PUDAPoJo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

public class PutDatahubFunction<IN extends PUDAPoJo, KEY>
        extends ProcessWindowFunction<IN, PutRecordsResult, KEY, TimeWindow> {

    private DataHubBase dataHubHandler;
    private List<RecordEntry> recordEntries;
    private RecordSchema schema;

    public PutDatahubFunction(RecordSchema schema) {
        this.schema = schema;
        System.out.println("field 'id' exists? " + this.schema.containsField("id"));  // prints true
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // .........
    }

    @Override
    public void process(KEY key,
                        Context context,
                        Iterable<IN> elements,
                        Collector<PutRecordsResult> out)
            throws Exception {

        RecordEntry entry = new RecordEntry();

        for (IN e : elements) {
            System.out.println("field 'id' exists? " + this.schema.containsField("id"));  // prints false
            // ......
        }
    }
}

The first println, in the constructor, shows this.schema.containsField("id") as true, but the second one, in the process method, shows it as false. Why? I also printed the class name of the instance in both places; both are PutDatahubFunction.

Using ValueState doesn't work either, because the constructor cannot call getRuntimeContext(); doing so throws Exception in thread "main" java.lang.IllegalStateException: The runtime context has not been initialized. The code is as follows:

private ValueState<RecordSchema> schema;

public PutTupleDatahubFunction(RecordSchema schema) throws IOException {
    ValueStateDescriptor schemaDes =
            new ValueStateDescriptor("datahub schema", TypeInformation.of(RecordSchema.class));
    /*
     * Throws: Exception in thread "main" java.lang.IllegalStateException:
     * The runtime context has not been initialized.
     */
    this.schema = getRuntimeContext().getState(schemaDes);
    this.schema.update(schema);
}
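On the ValueState attempt: getRuntimeContext() only becomes usable after Flink has initialized the function on a task manager, i.e. from open() onward, never in the constructor. The usual pattern is a plain serializable field set in the constructor on the client, plus a transient state handle created in open(). A non-runnable sketch (MyFunction and its field names are hypothetical, not from the original post; the Flink imports from the code above are assumed):

```java
public class MyFunction<IN, KEY>
        extends ProcessWindowFunction<IN, PutRecordsResult, KEY, TimeWindow> {

    // Plain serializable field: set on the client, shipped with the function.
    private final String configValue;
    // State handle: transient, created in open() where the runtime context exists.
    private transient ValueState<String> state;

    public MyFunction(String configValue) {
        this.configValue = configValue;   // fine: no runtime context needed here
    }

    @Override
    public void open(Configuration parameters) {
        // getRuntimeContext() works from open() onward
        state = getRuntimeContext().getState(
                new ValueStateDescriptor<>("my-state", String.class));
    }

    @Override
    public void process(KEY key, Context ctx, Iterable<IN> elements,
                        Collector<PutRecordsResult> out) { /* ... */ }
}
```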

I am quite confused. Can anyone tell me the reason, and whether there is a way to pass a parameter to the constructor of this operator function class? Thanks.

1 Answer



I finally found the reason: serialization and deserialization. Flink serializes the function instance on the client (where the constructor runs) and deserializes it on the task managers (where process() runs). I had not written any serialization code in RecordSchemaSer, and since its superclass RecordSchema does not implement Serializable, Java serialization skips the superclass fields entirely; on deserialization they are re-initialized by RecordSchema's no-arg constructor, so the schema content arrived empty:

public class RecordSchemaSer
        extends RecordSchema
        implements Serializable
{


}
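One way to fix this is to persist the superclass state by hand with writeObject/readObject in the Serializable subclass. The sketch below uses hypothetical stand-ins SchemaBase/SchemaSer (the real RecordSchema API is not shown in the post, so a simple field-name list is assumed):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for RecordSchema: non-serializable, holds field names.
class SchemaBase {
    private List<String> fields = new ArrayList<>();
    public void addField(String name) { fields.add(name); }
    public List<String> getFields() { return fields; }
    public boolean containsField(String name) { return fields.contains(name); }
}

// Serializable subclass that writes out the base-class state by hand,
// since Java serialization skips fields of a non-serializable superclass.
class SchemaSer extends SchemaBase implements Serializable {
    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        out.writeObject(new ArrayList<>(getFields()));  // persist base state
    }

    @SuppressWarnings("unchecked")
    private void readObject(ObjectInputStream in)
            throws IOException, ClassNotFoundException {
        in.defaultReadObject();  // base no-arg ctor has already run at this point
        for (String f : (List<String>) in.readObject()) {
            addField(f);                                 // restore base state
        }
    }
}

public class FixDemo {
    public static void main(String[] args) throws Exception {
        SchemaSer s = new SchemaSer();
        s.addField("id");

        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(s);
        SchemaSer copy = (SchemaSer) new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())).readObject();

        System.out.println(copy.containsField("id"));  // prints "true"
    }
}
```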

