微信号:java_daren

介绍:精通java技术;具备互联网思维,进可创业,退可求职谋生,本号正是为了召集和培养这样的达人.

一段解决kafka消息处理异常的经典对话

2018-04-21 10:41 java达人


对kafka不了解的童鞋可以先看看Kafka漫游记


有一天,卡尔维护的购买系统发生了一个奇怪的异常,从日志里看到,购买后的任务处理竟然先于购买任务执行了。“不可能啊,按照代码的顺序,一定是先执行购买流程,再发送消息到kafka,最后消费端接收到消息后执行购买后的一些善后任务。从A到B到C,顺序清清楚楚。” 于是,他请教了马克,马克眯着眼睛细看了一会,道:"问题是不是出在这段@Transaction注解上?"


伪代码:

@Transaction
public void buy(){
//购买业务逻辑
user.buy();
//发送消息
kafkaTemplete.sendMdg(); } buy();

马克说道:“这kafka消息鬼的很,它没准在事务提交之前就发送出去了,而消费者在fetch消息执行业务流程的时候这段事务仍然没有提交,这就导致了数据上的乱序,看上去就像购买后任务先于购买任务执行。”


“那该怎么改呢?把kafkaTemplete.sendMdg()这段移出方法,等事务提交了再发送消息?但我把消息发送这步写在事务注解的方法内部,就是为了在消息发送失败的时候能够实现回滚。如果移出来,而消息发送的时候失败,那怎么办?” 卡尔问道。


“可以考虑使用本地消息表。” 马克说着在白板上写下了一段伪代码:


producer:

@Transaction
public void buy(){ user.buy();
//新建一张msgtable本地消息表,消息在这里插入
insertInitMsgToDB(); }

public void sendMsg(){
//发送消息移到了新方法里
kafkaTemplete.sendMdg(); }

//主流程执行 buy(); sendMsg();


consumer:

@Kafkalistener
@Transaction
public void msgConsume(Record record){    
  if(isConsumed(record)){      
     //查询本地消息表,已经消费过的则不处理      return;    }
   //处理业务逻辑    deal(record);
   // 更改本地消息表消息状态为成功    changeRecord(record); }


"主要是通过时效性高的MQ,自动触发事件;万一消息发送失败,也可以通过数据表的消息记录轮询来保证。不过关系数据库的吞吐量和性能存在瓶颈,频繁的读写消息会给数据库造成压力,考虑当前场景,稳定性要求较高,而并发量还没有上来。可以考虑这种方法。” 马克道。


卡尔改完后,测试发布,之后再也没出现乱序了,但消息有时会莫名地丢失或者重复消费,卡尔不得不经常查看线上日志,手工修复一些数据问题,费时费力,只能在晚上加班加点开发新的业务。


马克也一直在跟踪这个问题,有一天,他有了发现,走过来对卡尔说道:“我研究了一些kafka的机制,问题可能是我们kafka中的配置enable.auto.commit 是 true的缘故?”


卡尔道:“是不是自动提交会带来一些不可控因素?”


马克道:“对,当我们的配置是自动提交的时候,消费者的消息投递保证有可能是at least once,或者at most once。当到达提交时间间隔,触发Kafka自动提交上次的偏移量时,就可能发生at most once的情况, 在这段时间,如果消费者还没完成消息的处理进程就崩溃了, 消费者进程重新启动时,它开始接收上次提交的偏移量之后的消息,实际上消费者可能会丢失几条消息;而当消费者处理完消息并将消息提交到持久化存储系统,而消费者进程崩溃时,会发生at least once的情况。 在此期间,kafka没有向broker提交offset,因为自动提交时间间隔没有过去。 当消费者进程重新启动时,会收到从上次提交的偏移量开始的一些旧消息。”


“正是这个导致消息丢失或者重复消费现象,那你想怎么改呢?” 卡尔道。


马克继续道:“不仅如此,即使消费者进程没有崩溃,假如中间有一个消息的业务逻辑执行抛出了异常,消费者也当作是接收到了消息,程序执行回滚,这条消息也等同于丢失了。我关闭了自动提交(enable.auto.commit:false),当消费者每次 poll 处理完业务逻辑后必须完成手动同步提交(commitSync),如果消费者在消费过程中发生 crash,或者执行业务逻辑发生异常回滚,下次启动时依然会从之前的位置开始消费,从而保证每次提交的内容都能被消费,即实现了at least once保证。”


“这防止了消息丢失,但消息重复问题该怎么解决?”


“先别急,另外需要注意的是,这只是对消费者的配置,为了使消息在发送时不丢失,我们对生产者也要做相应的配置优化。即配置 request.required.acks 参数。” 马克说着,在纸上列出了一张表格:


1(默认)

leader 已成功收到的数据并得到确认后发送下一条 message。如果 leader 宕机,则会丢失数据。

0

送端无需等待来自 broker 的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。

-1(ALL)

发送端需要等待 ISR 列表中所有列表都确认接收数据后才算一次发送完成,可靠性最高。



“至于你说的消费重复问题,主要解决思路是在服务层实现幂等性。让接收端支持消息去重的功能。比如在上面的伪代码中,record中放一个唯一键字段,消费时根据唯一键查询这条消息,判断是否消费过。也可以通过redis缓存来实现类似的机制。”


卡尔道:“真是这样子的吗?”


“尽信书不如无书,尤其是技术,是需要经过长时间的时间检验的,你对此有所怀疑的话可以在本地开发环境优化试试看。” 马克道。


更多精彩:

本地消息表只是分布式事务处理的一个经典方法,要想了解更全面具体的方案,请关注知识星球,获取支付宝大神的方案杰作,另外有重要开源框架推荐。



java达人

ID:drjava

(长按识别)

  





 
java达人 更多文章 程序员技术变现之路 51%攻击解析 阿里面试题及相关参考链接(修订版) 分分钟了解区块链和挖矿 以太坊:比特币+一切可能
猜您喜欢 快速定位不合理的索引——MySQL索引调优(一) 【推荐】在R中无缝集成Github云端代码托管 进击的功能测试-探索性测试 产品经理的学习之道 Reactive-Streams API(二):Single,SingleDelayed 和 RangeSubscription