Flink Summary: Fully Understanding Process Functions in One Article (Part 4)

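5. Co-process function (CoProcessFunction)

For a connected stream (ConnectedStreams), the processing logic has to be defined separately for each of the two inputs. The interface therefore declares two methods with the same role, distinguished by the suffixes "1" and "2", and each one is invoked when an element arrives on the corresponding stream. Interfaces of this kind are called "co-process functions". The pattern is the same as for CoMapFunction (map1()/map2()): calling .flatMap() requires a CoFlatMapFunction that implements flatMap1() and flatMap2(), while calling .process() takes a CoProcessFunction.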
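To make the "one method per input" idea concrete without a Flink cluster, here is a plain-Java model. It is not Flink code: `TwoInputFunction`, `OrderPaymentMatcher` and `CoProcessDemo` are hypothetical names, and a `List<OUT>` stands in for Flink's `Collector<OUT>`. The driver only routes each element to `processElement1()` or `processElement2()` according to the input it arrived on, which is the contract the runtime gives a CoProcessFunction.

```java
import java.util.*;

// Hypothetical driver (not Flink code): routes each element to the callback of the input it came from.
class CoProcessDemo {
    // An element tagged with the input (1 or 2) it arrived on.
    static final class Tagged {
        final int input;
        final String value;

        Tagged(int input, String value) {
            this.input = input;
            this.value = value;
        }
    }

    static <OUT> List<OUT> run(TwoInputFunction<String, String, OUT> fn, List<Tagged> arrivals) {
        List<OUT> out = new ArrayList<>();
        for (Tagged t : arrivals) {
            if (t.input == 1) {
                fn.processElement1(t.value, out);
            } else {
                fn.processElement2(t.value, out);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Tagged> arrivals = Arrays.asList(
                new Tagged(1, "o1"), new Tagged(2, "o2"), new Tagged(2, "o1"),
                new Tagged(1, "o2"), new Tagged(1, "o3"));
        System.out.println(run(new OrderPaymentMatcher(), arrivals)); // [matched o1, matched o2]
    }
}

// Hypothetical stand-in for CoProcessFunction; List<OUT> plays the role of Collector<OUT>.
interface TwoInputFunction<IN1, IN2, OUT> {
    void processElement1(IN1 value, List<OUT> out);

    void processElement2(IN2 value, List<OUT> out);
}

// Matches order ids (input 1) with payment ids (input 2); whichever side arrives first waits.
class OrderPaymentMatcher implements TwoInputFunction<String, String, String> {
    private final Set<String> pendingOrders = new HashSet<>();
    private final Set<String> pendingPayments = new HashSet<>();

    @Override
    public void processElement1(String orderId, List<String> out) {
        if (pendingPayments.remove(orderId)) {
            out.add("matched " + orderId);
        } else {
            pendingOrders.add(orderId);
        }
    }

    @Override
    public void processElement2(String paymentId, List<String> out) {
        if (pendingOrders.remove(paymentId)) {
            out.add("matched " + paymentId);
        } else {
            pendingPayments.add(paymentId);
        }
    }

    int pendingOrderCount() {
        return pendingOrders.size();
    }
}
```

In a real job the two streams would typically be connected and keyed by order id, with the waiting side kept in keyed state (for example a `ValueState`) instead of plain fields; a timer registered through `ctx.timerService()` could then report orders that are never paid.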
1. Source code

```java
public abstract class CoProcessFunction<IN1, IN2, OUT> extends AbstractRichFunction {

    private static final long serialVersionUID = 1L;

    // Processing method for the first stream
    public abstract void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;

    // Processing method for the second stream
    public abstract void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;

    // Called when a registered timer fires; does nothing by default.
    // Registering timers is only supported when the connected streams are keyed.
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {}

    public abstract class OnTimerContext extends Context {
        // Whether the firing timer is an event-time or a processing-time timer
        public abstract TimeDomain timeDomain();
    }

    public abstract class Context {
        // Timestamp of the element being processed (may be null)
        public abstract Long timestamp();

        // Query the current time and register timers
        public abstract TimerService timerService();

        // Emit a record to the side output identified by outputTag
        public abstract <X> void output(OutputTag<X> outputTag, X value);
    }
}
```

6. Join process function (ProcessJoinFunction)

ProcessJoinFunction is similar to CoProcessFunction, but the two differ:
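ProcessJoinFunction is used for join operations (such as interval joins): each call receives a matched pair, one element from each stream.
CoProcessFunction is used on connected streams: the elements of the two streams are processed separately, one method per stream.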
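In practice ProcessJoinFunction is what you pass to `.process()` after `KeyedStream.intervalJoin(...).between(lower, upper)`. The sketch below is a plain-Java model of that contract, not Flink code (`JoinEvent` and `IntervalJoinModel` are hypothetical names). It assumes Flink's default of inclusive bounds: the join function is called once per (left, right) pair with equal keys whose right timestamp lies in [left.ts + lower, left.ts + upper], whereas a CoProcessFunction would see each element on its own.

```java
import java.util.*;
import java.util.function.BiFunction;

// Hypothetical model (not Flink code) of the pairing rule behind intervalJoin(...).process(ProcessJoinFunction).
class IntervalJoinModel {
    // Calls joinFn once per (left, right) pair that has the same key and satisfies
    // left.ts + lowerBound <= right.ts <= left.ts + upperBound (both bounds inclusive).
    static List<String> join(List<JoinEvent> left, List<JoinEvent> right,
                             long lowerBound, long upperBound,
                             BiFunction<JoinEvent, JoinEvent, String> joinFn) {
        List<String> out = new ArrayList<>();
        for (JoinEvent l : left) {
            for (JoinEvent r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean inInterval = r.ts >= l.ts + lowerBound && r.ts <= l.ts + upperBound;
                if (sameKey && inInterval) {
                    out.add(joinFn.apply(l, r));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<JoinEvent> clicks = Arrays.asList(
                new JoinEvent("u1", 10, "click"), new JoinEvent("u2", 10, "click"));
        List<JoinEvent> views = Arrays.asList(
                new JoinEvent("u1", 12, "view"), new JoinEvent("u1", 20, "view"), new JoinEvent("u2", 9, "view"));
        // The pair's timestamp is taken as the larger of the two, as getTimestamp() reports in interval joins.
        List<String> joined = join(clicks, views, -2, 2,
                (l, r) -> l.key + ":" + l.payload + "+" + r.payload + "@" + Math.max(l.ts, r.ts));
        System.out.println(joined); // [u1:click+view@12, u2:click+view@10]
    }
}

// Hypothetical keyed, timestamped event.
class JoinEvent {
    final String key;
    final long ts;
    final String payload;

    JoinEvent(String key, long ts, String payload) {
        this.key = key;
        this.ts = ts;
        this.payload = payload;
    }
}
```

The nested loop only illustrates the matching rule; Flink buffers both sides in keyed state and cleans them up with event-time timers once no further match is possible.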
1. Source code

```java
// IN1: type of the first (left) stream
// IN2: type of the second (right) stream
// OUT: output type
public abstract class ProcessJoinFunction<IN1, IN2, OUT> extends AbstractRichFunction {

    private static final long serialVersionUID = -2444626938039012398L;

    // Logic applied to each joined pair; receives one element from each stream
    public abstract void processElement(IN1 left, IN2 right, Context ctx, Collector<OUT> out) throws Exception;

    public abstract class Context {
        // Timestamp of the left element
        public abstract long getLeftTimestamp();

        // Timestamp of the right element
        public abstract long getRightTimestamp();

        // Timestamp of the joined pair
        public abstract long getTimestamp();

        // Emit a record to the side output identified by outputTag
        public abstract <X> void output(OutputTag<X> outputTag, X value);
    }
}
```

7. Broadcast process function (BroadcastProcessFunction)

BroadcastProcessFunction processes a broadcast connected stream: it is passed as the argument when calling .process() on a BroadcastConnectedStream. Here the BroadcastConnectedStream is the result of connecting (connect()) a plain, non-keyed DataStream with a broadcast stream (BroadcastStream).
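A broadcast element reaches every parallel instance, regular elements are split across instances, and only the broadcast side may modify broadcast state. The plain-Java model below mimics this; it is not Flink code (`ThresholdFilter` and `BroadcastDemo` are hypothetical names, and a `HashMap` stands in for the broadcast state Flink declares with a `MapStateDescriptor`). The regular side gets an unmodifiable view, roughly the restriction that `ReadOnlyContext` enforces.

```java
import java.util.*;

// Hypothetical driver (not Flink code): two parallel instances sharing one broadcast rule.
class BroadcastDemo {
    public static void main(String[] args) {
        List<ThresholdFilter> instances = Arrays.asList(new ThresholdFilter(), new ThresholdFilter());
        // A broadcast element is delivered to every instance.
        for (ThresholdFilter f : instances) {
            f.processBroadcastElement("min", 5);
        }
        // Regular elements are split across instances.
        List<String> out = new ArrayList<>();
        instances.get(0).processElement(3, out);
        instances.get(1).processElement(7, out);
        System.out.println(out); // [pass 7]
    }
}

// Hypothetical model of one parallel instance of a BroadcastProcessFunction.
class ThresholdFilter {
    // Stand-in for broadcast state: rule name -> threshold.
    private final Map<String, Integer> broadcastState = new HashMap<>();

    // Broadcast side: may write the state, as with Context.
    void processBroadcastElement(String rule, int threshold) {
        broadcastState.put(rule, threshold);
    }

    // Regular side: only reads through a read-only view, as with ReadOnlyContext.
    void processElement(int value, List<String> out) {
        Integer threshold = readOnlyView().get("min");
        if (threshold != null && value >= threshold) {
            out.add("pass " + value);
        }
    }

    Map<String, Integer> readOnlyView() {
        return Collections.unmodifiableMap(broadcastState);
    }
}
```

Delivering the same rule to both instances mirrors the fact that each instance keeps its own copy of the broadcast state, which is why the update logic in processBroadcastElement() should be deterministic.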
1. Source code

```java
// IN1: type of the regular (non-broadcast) input stream
// IN2: type of the broadcast stream
// OUT: output type
public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {

    private static final long serialVersionUID = 8352559162119034453L;

    // Processes each element of the regular stream
    // value: the input element
    // ctx: read-only context (broadcast state can be read but not modified)
    // out: collector for output records
    public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;

    // Processes each element of the broadcast stream (ctx can read and update broadcast state)
    public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;

    // Read-only context passed to processElement()
    public abstract class ReadOnlyContext extends BaseBroadcastProcessFunction.ReadOnlyContext {}

    // Read-write context passed to processBroadcastElement()
    public abstract class Context extends BaseBroadcastProcessFunction.Context {}
}
```

8. Keyed broadcast process function (KeyedBroadcastProcessFunction)

KeyedBroadcastProcessFunction processes a keyed broadcast connected stream and is likewise passed as the argument when calling .process() on a BroadcastConnectedStream. Unlike with BroadcastProcessFunction, the broadcast connected stream here is the result of connecting a KeyedStream with a broadcast stream (BroadcastStream).
1. Source code

```java
// KS: type of the key used in keyBy()
// IN1: type of the keyed (non-broadcast) stream
// IN2: type of the broadcast stream
// OUT: output type
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends BaseBroadcastProcessFunction {

    private static final long serialVersionUID = -2584726797564976453L;

    // Processing logic for the keyed stream
    public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;

    // Processing logic for the broadcast stream
    public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;

    // Logic executed when a timer fires; does nothing by default
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {}

    public abstract class OnTimerContext extends ReadOnlyContext {
        // Whether the firing timer is an event-time or a processing-time timer
        public abstract TimeDomain timeDomain();

        // Key of the timer that fired
        public abstract KS getCurrentKey();
    }

    public abstract class ReadOnlyContext extends BaseBroadcastProcessFunction.ReadOnlyContext {
        // Timer service, available on the keyed side
        public abstract TimerService timerService();

        // Key of the element currently being processed
        public abstract KS getCurrentKey();
    }

    public abstract class Context extends BaseBroadcastProcessFunction.Context {
        // Applies function to the keyed state registered under stateDescriptor, for every key
        public abstract <VS, S extends State> void applyToKeyedState(
                StateDescriptor<S, VS> stateDescriptor,
                KeyedStateFunction<KS, S> function) throws Exception;
    }
}
```
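The keyed variant adds per-key state on the regular side, and `Context.applyToKeyedState()` lets the broadcast side visit the keyed state of every key. The plain-Java model below sketches that split under stated assumptions: it is not Flink code, `KeyedLimitModel` is a hypothetical name, a `HashMap` keyed by the element key stands in for keyed state, and the private `applyToKeyedState` helper only imitates the role of the real method.

```java
import java.util.*;
import java.util.function.BiFunction;

// Hypothetical model (not Flink code) of one KeyedBroadcastProcessFunction instance.
class KeyedLimitModel {
    // Stand-in for keyed state: one counter per key.
    private final Map<String, Integer> keyedCounts = new HashMap<>();
    // Stand-in for broadcast state: a limit shared by all keys.
    private int limit = Integer.MAX_VALUE;

    // Keyed side: updates only the current key's counter and reads the broadcast limit.
    void processElement(String key, List<String> out) {
        int count = keyedCounts.merge(key, 1, Integer::sum);
        if (count > limit) {
            out.add("alert " + key + " count=" + count);
        }
    }

    // Broadcast side: updates the limit and can optionally reset every key's counter.
    void processBroadcastElement(int newLimit, boolean resetCounts) {
        limit = newLimit;
        if (resetCounts) {
            applyToKeyedState((key, count) -> 0);
        }
    }

    // Imitates the role of Context.applyToKeyedState(): run a function over every key's state.
    private void applyToKeyedState(BiFunction<String, Integer, Integer> fn) {
        keyedCounts.replaceAll(fn);
    }

    int countOf(String key) {
        return keyedCounts.getOrDefault(key, 0);
    }

    public static void main(String[] args) {
        KeyedLimitModel fn = new KeyedLimitModel();
        List<String> out = new ArrayList<>();
        fn.processBroadcastElement(2, false);
        for (String key : Arrays.asList("a", "a", "a", "b")) {
            fn.processElement(key, out);
        }
        fn.processBroadcastElement(2, true); // reset every key's counter
        fn.processElement("a", out);
        System.out.println(out); // [alert a count=3]
    }
}
```

In Flink, processElement() only ever sees the state of the current key, which is why the per-element path here touches a single map entry and the sweep over all keys is left to the broadcast side.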

