The problem of duplicate consumption in Kafka

As stated in the title.

Answer #1 · 2022-06-17
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

The resulting problem: suppose consumer.properties is configured with max.poll.records=40 (pull at most 40 records per poll) and session.timeout.ms=30000 (session timeout).

Suppose a poll pulls down 40 records, but the exception above is thrown while processing record 31. The offsets for this batch are never committed, so at some later point all 40 records will be delivered and consumed again, including the 30 that had in fact already been processed.
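A minimal sketch of how this scenario arises in code, with the broker address, group id, topic name, and 1-second per-record processing time all being illustrative assumptions. Note that the quoted error message comes from older consumer clients, where slow processing between poll() calls tripped session.timeout.ms; newer clients gate processing time with max.poll.interval.ms instead, but the failure pattern is the same:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SlowConsumerDemo {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "demo-group");              // placeholder group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");         // commit manually after processing
        props.put("max.poll.records", "40");              // at most 40 records per poll()
        props.put("session.timeout.ms", "30000");         // the 30-second session timeout from above

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic")); // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // 40 records at ~1s each ≈ 40s, exceeding the 30s budget,
                                     // so the group rebalances before we ever reach commitSync()
                }
                try {
                    consumer.commitSync(); // fails after the rebalance; the uncommitted batch
                                           // (all 40 records) is later redelivered
                } catch (CommitFailedException e) {
                    System.err.println("Commit failed, batch will be consumed again: " + e.getMessage());
                }
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) throws InterruptedException {
        Thread.sleep(1000); // stand-in for real work: assume ~1s per record
    }
}
```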

Cause: "the poll loop is spending too much time message processing" and "the time between subsequent calls to poll() was longer than the configured session.timeout.ms" are really the same statement.

In other words, after the data is polled, processing it takes longer than the time configured by session.timeout.ms, and as a result "the group has already rebalanced".

The fix is the last sentence of the error message: "You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records."

That is, either increase session.timeout.ms or decrease max.poll.records. The exact values depend on how long it takes to process one message, call it x; the settings must satisfy x × max.poll.records < session.timeout.ms.
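To make that budget concrete, a small arithmetic sketch, where the 500 ms per-message processing time is a made-up figure for illustration:

```java
public class PollBudgetCheck {
    public static void main(String[] args) {
        long xMillis = 500;              // assumed measured time to process one message
        int maxPollRecords = 40;         // max.poll.records
        long sessionTimeoutMs = 30_000;  // session.timeout.ms

        long worstCaseBatchMs = xMillis * maxPollRecords; // 500 * 40 = 20,000 ms
        System.out.println(worstCaseBatchMs < sessionTimeoutMs
                ? "Safe: worst-case batch fits inside the session timeout"
                : "Unsafe: the batch may outlive the session timeout");

        // If x were 1000 ms instead, 1000 * 40 = 40,000 > 30,000: either raise
        // session.timeout.ms above 40,000 or lower max.poll.records below 30.
    }
}
```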

Another approach:

A way to handle this kind of duplicate consumption: store something that uniquely identifies each message in another system, such as Redis. What uniquely identifies a message? The combination of consumer group + topic + partition + offset. More precisely, build a key of the form consumergroup + "_" + topic + "_" + partition + "_" + offset, with the processing time as the value, and keep it in Redis. Before processing a Kafka message, look this key up in Redis: if the value is absent, the message is being consumed for the first time; if present, the message has already been consumed.
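A minimal sketch of this deduplication idea, assuming the Jedis client; the Redis address, the "_" separator, and the one-week TTL are illustrative choices rather than part of the original answer. Redis SET with the NX option makes the check-and-mark a single atomic step:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.SetParams;

public class DedupGuard {
    private final Jedis jedis = new Jedis("localhost", 6379); // placeholder Redis address
    private final String groupId;

    public DedupGuard(String groupId) {
        this.groupId = groupId;
    }

    /** Returns true if this record has never been seen and atomically marks it as processed. */
    public boolean firstTimeSeen(ConsumerRecord<?, ?> record) {
        // Key uniquely identifies the message: group + topic + partition + offset.
        String key = groupId + "_" + record.topic() + "_" + record.partition() + "_" + record.offset();
        // SET key value NX EX: succeeds only if the key does not exist yet.
        // The value records the processing time; the TTL bounds Redis memory use.
        String result = jedis.set(key, String.valueOf(System.currentTimeMillis()),
                SetParams.setParams().nx().ex(604800)); // 7-day TTL, an assumed retention window
        return "OK".equals(result); // null means the key already existed: already consumed
    }
}
```

A consumer would call guard.firstTimeSeen(record) before doing any work and skip the record when it returns false, so redelivered batches are detected instead of reprocessed.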

