2. 测试代码public class ProcessWindowsFunctionTest {public static void main(String[] args) throws Exception {// todo 获取环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// todo 处理原始数据并增加时间戳SingleOutputStreamOperator<JSONObject> sourceStream = env.socketTextStream("hadoop110", 9999).map(t -> {Thread.sleep(5000);JSONObject jsonObject = new JSONObject();jsonObject.put("userName", t);jsonObject.put("time", System.currentTimeMillis()-10000);return jsonObject;});// todo sourceStream设置水位线,指定事件时间字段sourceStream.assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {@Overridepublic long extractTimestamp(JSONObject jsonObject, long l) {return jsonObject.getLong("time");}}));// todo 按照用户名分组// KeyedStream<JSONObject, String> keyedStream = sourceStream.keyBy(data -> true);sourceStream.keyBy(data -> data.getString("userName")).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).process(new MyProcessWindowsFunction()).print();env.execute();}}class MyProcessWindowsFunction extends ProcessWindowFunction<JSONObject, HashMap<String,Long>, String, TimeWindow> {@Overridepublic void process(String key, Context context, Iterable<JSONObject> iterable, Collector<HashMap<String,Long>> collector) throws Exception {// 创建map对象HashMap<String, Long> result = new HashMap<>();// 遍历窗口中的数据for (JSONObject jsonObject : iterable) {String userName = jsonObject.getString("userName");if(result.containsKey(userName)){Long aLong = result.get(userName);result.put(userName,aLong+1);}else {result.put(userName,1l);}}collector.collect(result);}}4. 全窗口处理函数(ProcessAllWindowFunction)ProcessAllWindowFunction和ProcessFunction类相似,都是用来对上游过来的元素做处理,不过ProcessFunction是每个元素执行一次processElement方法,ProcessAllWindowFunction是每个窗口执行一次process方法(方法内可以遍历该窗口内的所有元素);
1. ProcessAllWindowFunction源码//IN: input,数据流中窗口任务的输入数据类型 。//OUT: output,窗口任务进行计算之后的输出数据类型 。//W:窗口的类型,是 Window 的子类型 。一般情况下我们定义时间窗口,W就是 TimeWindow 。public abstract class ProcessAllWindowFunction<IN, OUT, W extends Window> extends AbstractRichFunction {private static final long serialVersionUID = 1L;public ProcessAllWindowFunction() {}// 处理方法逻辑public abstract void process(ProcessAllWindowFunction<IN, OUT, W>.Context var1, Iterable<IN> var2, Collector<OUT> var3) throws Exception;// 清除窗口内数据public void clear(ProcessAllWindowFunction<IN, OUT, W>.Context context) throws Exception {}// 窗口上下文信息public abstract class Context {public Context() {}public abstract W window();public abstract KeyedStateStore windowState();public abstract KeyedStateStore globalState();public abstract <X> void output(OutputTag<X> var1, X var2);}}2. 测试代码public class ProcessAllWindowFunctionTest {public static void main(String[] args) throws Exception {// todo 获取环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// todo 处理原始数据并增加时间戳SingleOutputStreamOperator<JSONObject> sourceStream = env.socketTextStream("hadoop110", 9999).map(t -> {JSONObject jsonObject = new JSONObject();jsonObject.put("userName", t);jsonObject.put("time", System.currentTimeMillis()-10000);return jsonObject;});// todo sourceStream设置水位线,指定事件时间字段sourceStream.assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {@Overridepublic long extractTimestamp(JSONObject jsonObject, long l) {return jsonObject.getLong("time");}}));// todo 按照用户名分组// KeyedStream<JSONObject, String> keyedStream = sourceStream.keyBy(data -> true);sourceStream.keyBy(data -> data.getString("userName")).windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5))).process(new MyProcessAllWindowFunction()).print();env.execute();}}class MyProcessAllWindowFunction extends ProcessAllWindowFunction<JSONObject, HashMap<String,Long>, TimeWindow>{@Overridepublic void process(Context context, Iterable<JSONObject> iterable, Collector<HashMap<String, Long>> collector) throws Exception {// 创建map对象HashMap<String, Long> result = new HashMap<>();// 遍历窗口中的数据for (JSONObject jsonObject : iterable) {String userName = jsonObject.getString("userName");if(result.containsKey(userName)){Long aLong = result.get(userName);result.put(userName,aLong+1);}else {result.put(userName,1l);}}collector.collect(result);}}
推荐阅读
- 生成式AI时代的著作权之困
- 一个更好的视频码头
- 软件架构设计:B/S层次架构之MVC/MVP/MVVM
- 玉雕|玉之俏色,令人心醉!
- 5G、6G之间还隔着5.5G?听听华为专家怎么说
- |北洋造34年中上百万的和普通版有何区别之处
- 王昭君|网红王昭君减肥逆袭之路,一句“我要渣男后悔”,一年减掉306斤
- 鬼吹灯之南海归墟|即将播出!《鬼吹灯之南海归墟》传出新消息,粉丝却喜忧参半
- 人生之路|高中一毕业就参加NBA选秀?美媒列出10名最伟大的高中生球员
- TVB|TVB前知名女星罕见露面,在国外低调结婚,错爱有妇之夫致抑郁
