Kafka~消息系列问题解决:重复消费问题、消费重试机制、消息积压问题

重复消费问题

kafka 出现消息重复消费的原因:

  • 服务端侧已经消费的数据没有成功提交 offset(根本原因)。
  • Kafka 侧由于服务端处理业务时间长或者网络链接等等原因让 Kafka 认为服务假死,触发了分区 rebalance。

解决方案:

  1. 消费消息服务做幂等校验,比如 Redis 的 set、MySQL 的主键等天然的幂等功能。这种方法最有效
  2. 将 enable.auto.commit 参数设置为 false,关闭自动提交,开发者在代码中手动提交 offset。

那么这里会有个问题:什么时候提交 offset 合适?

  • 处理完消息再提交:依旧有消息重复消费的风险,和自动提交一样。
  • 拉取到消息即提交:会有消息丢失的风险。允许消息延时的场景,一般会采用这种方式。然后,通过定时任务扫离线数据检查。

在Kafka中,有三种常见的消息传递语义:At-least-once、At-most-once、Exactly-once。

其中At-least-once和Exactly-once是最常用的。

  • At-least-once消费语义

At-least-once消费语义意味着消费者至少消费一次消息,但可能会重复消费同一消息。在At-least-once语义中,当消费者从Kafka服务器读取消息时,消息的偏移量会被认己录下来。一旦消息被成功处理,消费者会将位移提交回Kafka服务器。如果消息处理失败,消费者不会提交立移。这意味着该消息将在下一次重试时再次被消费。

At-least-once语义通常用于实时数据处理或消费者不能容忍数据丢失的场景。

  • At-most-once消费语义

如果你可以容忍消息丢失,那这个就可以保证消息只消费一次,他的实现就是只要这个消费组消费了该条消息,就直接提交offset。

  • Exactly-once消费语义

Exactly-once消费语义意味着每个消息仅被消费一次,且不会被重复消费。在Exactly-once语义中,Kafka保证消息只被处理一次,同时保持消息的顺序性。为了实现Exactly-once语义,Kafka引入了一个新的概念:事务。

事务是一系列的读写操作,这些操作要么全部成功,要么全部失败。在Kafka中,,产者和消费者都可以使用事务,以保证消息的Exactly-once语义。具体来说,消费者可以使用事务来保证消息的消费和位移提交是原子的,而生产者可以使用事务来保证消息的生产和位移提交是原子的

在Kafka0.11版本之前,实现Exactly-once语义需要一些特殊的配置和设置。但是,在Kafka0.11版本之后,Kafka提供了原生的Exactly-once支持,使得实现Exactly-ondce变得更加简单和可靠。

总之,At-least-once消费语义保证了数据的可靠性,但可能会导致数据重复。而Exactly-once消费语义则解决了重复问题,但需要更复杂的设置和配置。选择哪种消费语义取决于业务需求和数据可靠性要求。

重试机制

在 Kafka 如何保证消息不丢失这里,我们提到了 Kafka 的重试机制。由于这部分内容对于消息可靠性的优化较为重要。

消费过程在默认配置下,当消费异常会进行重试,重试多次后会跳过当前消息,继续进行后续消息的消费,不会一直卡在当前消息。因此,即使某个消息消费异常,Kafka 消费者仍然能够继续消费后续的消息,不会一直卡在当前消息,保证了业务的正常进行。

默认配置下,Kafka 消费者在默认配置下会进行最多 10 次 的重试,每次重试的时间间隔为 0,即立即进行重试。如果在 10 次重试后仍然无法成功消费消息,则不再进行重试,消息将被视为消费失败。

当达到最大重试次数后,数据会直接被跳过,继续向后进行。当代码修复后,如何重新消费这些重试失败的数据呢?

  • 死信队列(Dead Letter Queue,简称 DLQ) 是消息中间件中的一种特殊队列。它主要用于处理无法被消费者正确处理的消息,通常是因为消息格式错误、处理失败、消费超时等情况导致的消息被"丢弃"或"死亡"的情况。当消息进入队列后,消费者会尝试处理它。如果处理失败,或者超过一定的重试次数仍无法被成功处理,消息可以发送到死信队列中,而不是被永久性地丢弃。在死信队列中,可以进一步分析、处理这些无法正常消费的消息,以便定位问题、修复错误,并采取适当的措施。

Kafka 本身并不直接支持死信队列(Dead Letter Queue,DLQ)。然而,可以通过一些方式来模拟实现类似死信队列的功能。可以通过自定义实现来达到类似的效果。一种常见的做法是创建一个或多个特定的主题来作为“死信主题”。当消息处理出现错误时,将这些消息发送到对应的“死信主题”中进行存储。

实现的方式大致如下:

  • 在消息处理的代码中,添加 try-catch 块来捕获预期或意外的异常。如果没有发生错误,则正常处理消息。如果发生异常,可以将消息发送到专用的“死信主题”。同时,为了便于后续的分析和故障排查,最好在发送到“死信主题”的消息中添加一些额外的信息,例如错误原因等。

另外,一些与 Kafka 相关的框架或组件可能提供了对死信队列的开箱即用支持。例如,Kafka Connect 中可以通过配置来实现一定程度上的死信队列功能。

虽然 Kafka 没有内置的死信队列概念,但通过上述自定义或借助相关框架的方式,仍然可以满足对死信队列的需求,实现对无法正常处理消息的特殊处理和管理。

需注意,具体的实现方式可能会因项目的具体需求和技术架构而有所不同。在实际应用中,需要根据情况选择最适合的方法来模拟死信队列的功能。同时,要确保对“死信主题”中的消息进行适当的监控和处理,以避免这些消息被无限期地忽略或积累

消息积压问题

消息堆积,一般都是因为消费者在消费过程中,由于消费耗时过长或消费并发度较小等原因,导致消费能力不足,出现消息堆积的问题。当线上出现消息堆积的问题时,一般有以下几种方式来解决:

  1. 增加消费者数量:消息堆积了,消费不过来了,那就把消费者的数量增加一下,让更多的实例来消费这些消息。
  2. 提升消费者消费速度:消费者消费的慢可能是消息堆积的主要原因,想办法提升消费速度,比如引入线程池、本地消息存储后即返回成功后续再慢慢消费等。
  3. 降低生产者的生产速度:如果生产者可控的话,可以让生产者生成我消息的速度慢一点。
  4. 清理过期消息:有一些过期消息、或者一直无法成功的消息,在业务做评估之后,如果无影响或者影响不大,其实是可以清理的。
  5. 增加Topic队列数:如果一个Topic的队列数比较少,那么就容易易出现消息堆积的情况。可以通过增加队列数来提高消息的处理并发度,从而减少消息堆积。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/769047.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

DeepFaceLive----AI换脸简单使用

非常强大的软件,官方github https://github.com/iperov/DeepFaceLive 百度云链接: 链接:https://pan.baidu.com/s/1VHY-wxqJXSh5lCn1c4whZg 提取码:nhev 1下载解压软件 下载完成后双击.exe文件进行解压.完成后双击.bat文件打开软件 2 视频使用图片换…

JAVA+SSM+VUE《病人跟踪治疗信息管理系统》

1病人功能模块 病人登录进入病人跟踪治疗信息管理系统可以查看首页、个人中心、病例采集管理、预约管理、医生管理、上传核酸检测报告管理、上传行动轨迹管理、病人治疗状况管理等内容。 病例采集管理,在病例采集管理页面可以查看账号、姓名、住院号、入院时间、病…

2024鲲鹏昇腾创新大赛集训营Ascend C算子学习笔记

异构计算架构(CANN) 对标英伟达的CUDA CuDNN的核心软件层,向上支持多种AI框架,向下服务AI处理器,发挥承上启下的关键作用,是提升昇腾AI处理器计算效率的关键平台。主要包括有各种引擎、编译器、执行器、算…

[leetcode hot 150]第三题,无重复字符的最长子串

题目: 给定一个字符串 s ,请你找出其中不含有重复字符的 最长 子串的长度。 可以使用"滑动窗口"的方法来解决这个问题。基本思路如下: 使用两个指针(start和end)来定义一个窗口移动end指针来扩大窗口,直到遇到重复字符如果遇到重复字符,移动s…

Spring源码九:BeanFactoryPostProcessor

上一篇Spring源码八:容器扩展一,我们看到ApplicationContext容器通过refresh方法中的prepareBeanFactory方法对BeanFactory扩展的一些功能点,包括对SPEL语句的支持、添加属性编辑器的注册器扩展解决Bean属性只能定义基础变量的问题、以及一些…

每周题解:最大半连通子图

题目链接 最大半连通子图 题目描述 一个有向图 G ( V , E ) G\left(V,E\right) G(V,E) 称为半连通的 (Semi-Connected),如果满足: ∀ u , v ∈ V \forall u,v\in V ∀u,v∈V,满足 u → v u\to v u→v 或 v → u v\to u v→u&#xff0…

Go语言实现钉钉机器人接入Dify工作流

go语言实现实现钉钉机器人接入dify工作流,完成ai 流式问答 代码地址 有用的话点个star github地址 效果 配置使用 修改.env_template文件 为.env 设置.env文件内的环境变量 API_KEY: dify的api_keyAPI_URL: dify 的api接口CLIENT_ID : 钉钉机器人应用的idCLIENT…

基于Java的家政预约系统设计与实现

作者介绍:计算机专业研究生,现企业打工人,从事Java全栈开发 主要内容:技术学习笔记、Java实战项目、项目问题解决记录、AI、简历模板、简历指导、技术交流、论文交流(SCI论文两篇) 上点关注下点赞 生活越过…

Docker-compose 实现Prometheus+Grafana监控MySQL及Linux主机

. ├── Grafana │ ├── data │ └── docker-compose.yaml ├── Mysql │ ├── conf │ ├── data │ ├── docker-compose.yaml │ └── logs ├── Mysqld_exporter │ ├── conf │ └── docker-compose.yaml ├── node-exporter │…

RPA 第一课

RPA 是 Robotic Process Automation 的简称,意思是「机器人流程自动化」。 顾名思义,它是一种以机器人(软件)来替代人,实现重复工作自动化的工具。 首先要说一句,RPA 不是 ChatGPT 出来之后的产物&#x…

推荐三款常用接口测试工具!

接口测试是软件开发中至关重要的一环,通过对应用程序接口进行测试,可以验证其功能、性能和稳定性。随着互联网和移动应用的快速发展,接口测试变得越来越重要。为了提高测试效率和质量,开发人员和测试人员需要使用专业的接口测试工…

自然语言处理学习(2)基本知识 文本预处理+文本数据分析+文本增强

conda activate DL conda deactivate课程链接 一 一些包的安装 1 stanfordcorenlp 在anoconda prompt 里面:进入自己的conda环境,pip install stanfordcorenlp 进入方式 相关包下载,Jar包我没有下载下来,太慢了,这个…

提高Python爬虫的匿名性:代理ip的配置策略

在数字化时代的今天,网络数据采集已成为获取信息的重要手段,尤其在竞争激烈的商业环境中。Python作为一种强大的编程语言,广泛应用于开发各种数据爬虫来自动化地抓取网络信息。然而,随着网站安全意识的提高,越来越多的…

牛客小白月赛97

A.三角形 判断等边三角形&#xff0c;题不难&#xff0c;代码如下&#xff1a; #include <iostream>using namespace std;int a[110];int main() {int n;cin >> n;int x;int mx 0;for(int i 1; i < n; i){cin >> x;mx max(mx, x);a[x];}for(int i 1…

Java OnVif应用PTZ控制

研究OnVif在Java程序中应用&#xff0c;在此作记录&#xff0c;onvif-java-lib/release at master milg0/onvif-java-lib GitHub&#xff0c;在此连接中下载jar&#xff0c;并在项目中引用&#xff0c;该jar封装很好&#xff0c;可以方便快速完成功能 1.登录OnVif 2.PTZ控制…

【大数据】—美国交通事故分析(2016 年 2 月至 2020 年 12 月)

引言 在当今快速发展的数字时代&#xff0c;大数据已成为我们理解世界、做出决策的重要工具。特别是在交通安全领域&#xff0c;大数据分析能够揭示事故模式、识别风险因素&#xff0c;并帮助制定预防措施&#xff0c;从而挽救生命。本文将深入探讨2016年2月至2020年12月期间&…

反射(通俗易懂)

一、反射(Reflection) 反射就是:加载类&#xff0c;并允许以编程的方式解剖类中的各种成分(成员变量、方法、构造器等) 动态语言&#xff0c;是一类在运行时可以改变其结构的语言&#xff1a;例如新的函数、对象、甚至代码可以被引进&#xff0c;已有的函数可以被删除或是其他…

强化学习的数学原理:值迭代与策略迭代

概述 从课程地图上可以看出来&#xff0c;这是本门课程中第一次正式的介绍强化学习的算法&#xff0c;并且是一个 model-based 的算法&#xff0c;而在下一节课将会介绍第一个 model-free 的算法&#xff08;在 chapter 5&#xff09;。而这两节和之前所学的 BOE 是密切相关的&…

笔记-python爬虫概述

目录 常用第三方库 爬虫框架 动态页面渲染1. url请求分析2. selenium3. phantomjs4. splash5. spynner 爬虫防屏蔽策略1. 修改User-Agent2. 禁止cookies3. 设置请求时间间隔4. 代理IP池5. 使用Selenium6. 破解验证码常用第三方库 对于爬虫初学者&#xff0c;建议在了解爬虫原…

DEX: Scalable Range Indexing on Disaggregated Memory——论文泛读

arXiv Paper 论文阅读笔记整理 问题 内存优化索引[2&#xff0c;3&#xff0c;18&#xff0c;27&#xff0c;42]对于加速OLTP至关重要&#xff0c;但随着数据大小&#xff08;以及索引大小&#xff09;的增长&#xff0c;对内存容量的需求可能会超过单个服务器所能提供的容量…