将消费后的数据进行分类,编写业务SQL语句,将消费的数据作为日志记录,发送到Hive表进行存储,这样Kafka中的实时数据就存储到Hive了,方便使用Hive来对Kafka数据进行即席分析 。
2.2.4 避坑技巧
使用这种方式在处理的过程中,如果配置使用的是EventTime,在程序中配置'sink.partition-commit.trigger'='partition-time',最后会出现无法提交分区的情况 。经过对源代码PartitionTimeCommitTigger的分析,找到了出现这种异常情况的原因 。
我们可以通过看
org.Apache.flink.table.filesystem.stream.PartitionTimeCommitTigger#committablePartitionsorg.apache.flink.table.filesystem.stream.PartitionTimeCommitTigger#committablePartitions
中的一个函数,来说明具体的问题,部分源代码片段如下:
// PartitionTimeCommitTigger源代码函数代码片段@Overridepublic List<String> committablePartitions(long checkpointId) { if (!watermarks.containsKey(checkpointId)) {throw new IllegalArgumentException(String.format("Checkpoint(%d) has not been snapshot. The watermark information is: %s.",checkpointId, watermarks)); } long watermark = watermarks.get(checkpointId); watermarks.headMap(checkpointId, true).clear();List<String> needCommit = new ArrayList<>(); Iterator<String> iter = pendingPartitions.iterator(); while (iter.hasNext()) {String partition = iter.next();// 通过分区的值来获取分区的时间LocalDateTime partTime = extractor.extract(partitionKeys, extractPartitionValues(new Path(partition)));// 判断水印是否大于分区创建时间+延迟时间if (watermark > toMills(partTime) + commitDelay) {needCommit.add(partition);iter.remove();} } return needCommit;}通过分析上述代码片段,我们可以知道系统通过分区值来抽取相应的分区来创建时间,然后进行比对,比如我们设置的时间 pattern 是 '$dt $h:$m:00' , 某一时刻我们正在往 /2022-02-26/18/20/ 这个分区下写数据,那么程序根据分区值,得到的 pattern 将会是2022-02-26 18:20:00,这个值在SQL中是根据 DATA_FORMAT 函数获取的 。
而这个值是带有时区的,比如我们的时区设置为东八区,2022-02-26 18:20:00这个时间是东八区的时间,换成标准 UTC 时间是减去8个小时,也就是2022-02-26 10:20:00,而在源代码中的 toMills 函数在处理这个东八区的时间时,并没有对时区进行处理,把这个其实应该是东八区的时间当做了 UTC 时间来处理,这样计算出来的值就比实际值大8小时,导致一直没有触发分区的提交 。
如果我们在数据源中构造的分区是 UTC 时间,也就是不带分区的时间,那么这个逻辑就是没有问题的,但是这样又不符合我们的实际情况,比如对于分区2022-02-26 18:20:00,我希望我的分区肯定是东八区的时间,而不是比东八区小8个小时的UTC时间2022-02-26 10:20:00 。
在明白了原因之后,我们就可以针对上述异常情况进行优化我们的实现方案,比如自定义一个分区类、或者修改缺省的时间分区类 。比如,我们使用TimeZ.NETableFunction类来实现一个自定义时区,部分参考代码片段如下:
public class CustomTimeZoneTableFunction implements TimeZoneTableFunction {private transient DateTimeFormatter formatter;private String timeZoneId;public CustomTimeZoneTableFunction(String timeZoneId) {this.timeZoneId = timeZoneId;}@Overridepublic void open(FunctionContext context) throws Exception {// 初始化 DateTimeFormatter 对象formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:00");formatter = formatter.withZone(ZoneId.of(timeZoneId));}@Overridepublic void eval(Long timestamp, Collector<TimestampWithTimeZone> out) {// 将时间戳转换为 LocalDateTime 对象LocalDateTime localDateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneOffset.UTC);// 将 LocalDateTime 对象转换为指定时区下的 LocalDateTime 对象LocalDateTime targetDateTime = localDateTime.atZone(ZoneId.of(timeZoneId)).toLocalDateTime();// 将 LocalDateTime 对象转换为 TimestampWithTimeZone 对象,并输出到下游out.collect(TimestampWithTimeZone.fromLocalDateTime(targetDateTime, ZoneId.of(timeZoneId)));}}2.3 方案二:Flink DataStream写Hive在一些特殊的场景下,Flink SQL如果无法实现我们复杂的业务需求,那么我们可以考虑使用Flink DataStream写Hive这种实现方案 。比如如下业务场景,现在需要实现这样一个业务需求,内容方将实时数据写入到Kafka消息队列中,然后由数据侧通过Flink任务消费内容方提供的数据源,接着对消费的数据进行分流处理(这里的步骤和Flink SQL写Hive的步骤类似),每分钟进行存储到HDFS(MapReduce任务需要计算和重跑HDFS数据),然后通过MapReduce任务将HDFS上的这些日志数据生成Hive所需要格式,最后将这些Hive格式数据文件加载到Hive表中 。实现Kafka数据到Hive的即席分析功能,具体实现流程细节如下图所示:
推荐阅读
- 用.NET爬虫轻松获取招标网站数据
- 量化交易—Python基础语法与数据结构
- 数据结构与算法 --- “哨兵”思想
- |剑宗,重剑轻剑皆有神器,数据远超全职业,成为版本热门选择!
- |胡胖单飞后,“青春老男孩”数据下滑明显,团队分裂的真相是什么
- 苹果|销毁核心数据,裁员700人!摆了9年架子后,260亿巨头扛不住了
- |郭德纲严重警告!曹云金用数据强势反击:30万人在线,6400万点赞!
- 5个等级的数据分析,哪个最深入?
- 求职|在美国,大数据助力大学生职业选择
- Apache Doris 极速数据湖分析技术细节公开!
