2) public void onPartitionsAssigned( Collection< TopicPartition> partitions)方法会在重新分配分区之后和消费者开始读取消息之前被调用 。
3.8.2.从特定偏移量处开始记录到目前为止,我们知道了如何使用 poll()方法从各个分区的最新偏移量处开始处理消息 。不过,有时候我们也需要从特定的偏移量处开始读取消息 。
如果想从分区的起始位置开始读取消息,或者直接跳到分区的末尾开始读取消息,可以使 seekToBeginning(Collectiontp)和seekToEnd( Collectiontp)这两个方法 。
不过,Kaka 也为我们提供了用于查找特定偏移量的 API 。它有很多用途,比如向后回退几个消息或者向前跳过几个消息(对时间比较敏感的应用程序在处理滞后的情况下希望能够向前跳过若干个消息) 。在使用 Kafka 以外的系统来存储偏移量时,它将给我们带来更大的惊喜--让消息的业务处理和偏移量的提
交变得一致 。试想一下这样的场景:应用程序从 Kaka 读取事件(可能是网站的用户点击事件流),对它们进行处理(可能是使用自动程序清理点击操作并添加会话信息),然后把结果保存到数据库 。假设我们真的不想丢失任何数据,也不想在数据库里多次保存相同的结果 。
我们可能会,毎处理一条记录就提交一次偏移量 。尽管如此,在记录被保存到数据库之后以及偏移量被提交之前,应用程序仍然有可能发生崩溃,导致重复处理数据,数据库里就会出现重复记录 。
如果保存记录和偏移量可以在一个原子操作里完成,就可以避免出现上述情况 。记录和偏移量要么都被成功提交,要么都不提交 。如果记录是保存在数据库里而偏移量是提交到Kafka上,那么就无法实现原子操作不过,如果在同一个事务里把记录和偏移量都写到数据库里会怎样呢?那么我们就会知道记录和偏移量要么都成功提交,要么都没有,然后重新处理记录 。
现在的问题是:如果偏移量是保存在数据库里而不是 Kafka 里,那么消费者在得到新分区时怎么知道该从哪里开始读取?这个时候可以使用 seek()方法 。在消费者启动或分配到新分区时,可以使用 seck()方法查找保存在数据库里的偏移量 。我们可以使用使用 Consumer Rebalancelistener 和 seek()方法确保我们是从数据库里保存的偏移量所指定的位置开始处理消息的 。
3.9.优雅退出如果确定要退出循环,需要通过另一个线程调用 consumer. wakeup()方法 。如果循环运行在主线程里,可以在 ShutdownHook 里调用该方法 。要记住,consumer. wakeup()是消费者唯一一个可以从其他线程里安全调用的方法 。调用 consumer. wakeup()可以退出 poll(),并抛出 WakeupException 异常 。我们不需要处理 Wakeup Exception,因为它只是用于跳出循环的一种方式 。不过,在退出线程之前调用 consumer.close()是很有必要的,它会提交任何还没有提交的东西,并向群组协调器发送消息,告知自己要离开群组,接下来就会触发再均衡,而不需要等待会话超时 。
3.10.反序列化不过就是序列化过程的一个反向,原理和实现可以参考生产者端的实现,同样也可以自定义反序列化器 。
3.11.独立消费者到目前为止,我们讨论了消费者群组,分区被自动分配给群组里的消费者,在群组里新增或移除消费者时自动触发再均衡 。不过有时候可能只需要一个消费者从一个主题的所有分区或者某个特定的分区读取数据 。这个时候就不需要消费者群组和再均衡了,只需要把主题或者分区分配给消费者,然后开始读取消息并提交偏移量 。
如果是这样的话,就不需要订阅主题,取而代之的是为自己分配分区 。一个消费者可以订阅主题(并加入消费者群组),或者为自己分配分区,但不能同时做这两件事情 。
独立消费者相当于自己来分配分区,但是这样做的好处是自己控制,但是就没有动态的支持了,包括加入消费者(分区再均衡之类的),新增分区,这些都需要代码中去解决,所以一般情况下不推荐使用 。
推荐阅读
- 程序员遭遇:一觉睡来7个未接电话,到公司时发现已被踢出群
- 程序员为什么一定要进大公司,除了薪资,这些才是决定性因素
- 某程序员跳槽到银行9天后辞职,晒出技术水平后留言:太落后了
- 成都与盖碗茶,饮盖碗茶有五道程序先容盖碗茶品饮程序
- 高级程序员到底长什么样子?
- 第一个登月的地球人是谁?
- 程序员工作必备:10个超实用的GitHub库
- 应用程序加固Tomcat篇
- 公众号小程序有什么用?
- 程序员用Python实现自动化控制键盘和鼠标
