Flink总结之一文彻底搞懂处理函数

处理函数是Flink底层的函数,工作中通常用来做一些更复杂的业务处理,这次把Flink的处理函数做一次总结,处理函数分好几种,主要包括基本处理函数,keyed处理函数,window处理函数,通过源码说明和案例代码进行测试 。

Flink总结之一文彻底搞懂处理函数

文章插图
 
处理函数就是位于底层API里,熟悉处理函数能够更好的处理Flink流处理 。
Flink官方文档:https://nightlies.Apache.org/flink/flink-docs-release-1.13/zh/docs/dev/datastream/operators/process_function/
1. 基本处理函数(ProcessFunction)首先我们看ProcessFunction的源码,ProcessFunction是一个抽象类,继承了AbstractRichFunction类,那么处理函数就拥有了富函数的所有特性 。
1. 拥有的方法如下processElement:编写我们的处理逻辑,每个数据到来都会走这个函数,有三个参数,第一个参数是输入值类型,第二个参数是上下文Context,第三个参数是收集器(输出) 。
onTimer:定时器,通过TimerService 进行注册,当定时时间到达的时候就会执行onTimer函数 。只有在KeyedStream中才可以使用 。
2. 拥有的抽象类Context:上下文抽象类,在这个类中可以获取到当前时间戳,以及时间服务timerService,可用来注册定时器和查询时间 。
3. 源码//I:输入类型//O:收集输出类型public abstract class ProcessFunction<I, O> extends AbstractRichFunction {private static final long serialVersionUID = 1L;public ProcessFunction() {}public abstract void processElement(I var1, ProcessFunction<I, O>.Context var2, Collector<O> var3) throws Exception;// 定时器触发时执行的逻辑 当前不支持public void onTimer(long timestamp, ProcessFunction<I, O>.OnTimerContext ctx, Collector<O> out) throws Exception {}public abstract class OnTimerContext extends ProcessFunction<I, O>.Context {public OnTimerContext() {super();}public abstract TimeDomain timeDomain();}public abstract class Context {public Context() {}public abstract Long timestamp();public abstract TimerService timerService();public abstract <X> void output(OutputTag<X> var1, X var2);}}3. 测试代码使用linux的nc服务进行端口监听,并向9999端口发送数据,然后我们通过Flink监听9999端口,并获取数据进行数据处理 。
安装nc组件:
sudo yum install nc -y
开启9999端口:
nc -lk 9999
Flink总结之一文彻底搞懂处理函数

文章插图
 
代码如下:
/*** 处理函数测试*/public class ProcessFunctionTest {public static void main(String[] args) throws Exception {// todo 构建环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// todo 监听hadoop110服务器9999端口,获取输入流DataStreamSource<String> streamSource = env.socketTextStream("hadoop110", 9999);// todo 封装输入流,将数据处理成{"userName":"aa","time",xxxxx}这个结构SingleOutputStreamOperator<JSONObject> mapStream = streamSource.map(t -> {JSONObject jsonObject = new JSONObject();jsonObject.put("userName",t);jsonObject.put("time",System.currentTimeMillis());return jsonObject;});// TODO 调用处理函数mapStream.process(new MyProcessFunction()).print("调用处理函数接收到的数据:");env.execute();}}// 自定义处理函数class MyProcessFunction extends ProcessFunction<JSONObject,String>{@Overridepublic void processElement(JSONObject jsonObject, Context context, Collector<String> collector) throws Exception {System.out.println("processElement方法接收到的用户数据:"+ jsonObject.getString("userName"));collector.collect(jsonObject.getString("userName")+"-----");}}
Flink总结之一文彻底搞懂处理函数

文章插图
 
2. 按键分区处理函数(KeyedProcessFunction)按键分区处理函数是重点,用在keyby后面,对keyedStream进行处理,keyby将会按照Key进行分区,然后将不同key的数据分配到不同并行子任务上进行执行 。
KeyedProcessFunction可以使用定时器和定时服务,代码中使用定时器和定时服务查看数据和完成定时任务 。
KeyedProcessFunction:处理分区数据,每个元素执行一次processElement方法
1. KeyedProcessFunction源码【Flink总结之一文彻底搞懂处理函数】public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {private static final long serialVersionUID = 1L;public KeyedProcessFunction() {}// 处理方法,每个数据都会走这个方法public abstract void processElement(I var1, KeyedProcessFunction<K, I, O>.Context var2, Collector<O> var3) throws Exception;// 定时器逻辑,定时器触发时会走这个方法的逻辑public void onTimer(long timestamp, KeyedProcessFunction<K, I, O>.OnTimerContext ctx, Collector<O> out) throws Exception {}public abstract class OnTimerContext extends KeyedProcessFunction<K, I, O>.Context {public OnTimerContext() {super();}public abstract TimeDomain timeDomain();public abstract K getCurrentKey();}public abstract class Context {public Context() {}public abstract Long timestamp();public abstract TimerService timerService();public abstract <X> void output(OutputTag<X> var1, X var2);public abstract K getCurrentKey();}}


推荐阅读