0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看威廉希尔官方网站 视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

使用MQ消息队列时需要考虑的问题

我快闭嘴 来源:稀土掘金威廉希尔官方网站 社区 作者:稀土掘金威廉希尔官方网站 社区 2022-09-13 16:37 次阅读


引入 MQ 消息中间件最直接的目的:系统解耦以及流量控制(削峰填谷)

  • 系统解耦: 上下游系统之间的通信相互依赖,利用 MQ 消息队列可以隔离上下游环境变化带来的不稳定因素。
  • 流量控制: 超高并发场景中,引入 MQ 可以实现流量 “削峰填谷” 的作用以及服务异步处理,不至于打崩服务。

引入 MQ 同样带来其他问题:数据一致性。

在分布式系统中,如果两个节点之间存在数据同步,就会带来数据一致性的问题。消息生产端发送消息到 MQ 再到消息消费端需要保证消息不丢失。

1bed7338-3248-11ed-ba43-dac502259ad0.jpg

所以在使用 MQ 消息队列时,需要考虑这 3 个问题:

  • 如何知道有消息丢失?

  • 哪些环节可能丢消息?

  • 如何确保消息不丢失?

    1c0240d8-3248-11ed-ba43-dac502259ad0.jpg

1、如何知道有消息丢失?

如何感知消息是否丢失了?可总结如下:

  1. 他人反馈: 运营、PM 反馈消息丢失。
  2. 监控报警: 监控指定指标,即时报警人工调整。Kafka 集群异常、Broker 宕机、Broker 磁盘挂载问题、消费者异常导致消息积压等都会给用户直接感觉是消息丢失了。

案例:舆情分析中数据采集同步

1c12aa22-3248-11ed-ba43-dac502259ad0.jpg
  • PM 可自己下发采集调度指令,去采集特定数据。
  • PM 可通过 ES 近实时查询对应数据,若没相应数据可再次下发指令。

当感知消息丢失了,那就需要一种机制来检查消息是否丢失。

检索消息

运维工具有:

  1. 查看 Kafka 消费位置:
>基于SpringBoot+MyBatisPlus+Vue&Element实现的后台管理系统+用户小程序,支持RBAC动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
>
>*项目地址:
>*视频教程#查看某个topic的message数量
$./kafka-run-class.shkafka.tools.GetOffsetShell--broker-listlocalhost:9092--topictest_topic


>基于SpringCloudAlibaba+Gateway+Nacos+RocketMQ+Vue&Element实现的后台管理系统+用户小程序,支持RBAC动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
>
>*项目地址:
>*视频教程#查看consumerGroup列表
$./kafka-consumer-groups.sh--list--bootstrap-server192.168.88.108:9092

#查看offset消费情况
$./kafka-consumer-groups.sh--bootstrap-serverlocalhost:9092--groupconsole-consumer-1152--describe
GROUPTOPICPARTITIONCURRENT-OFFSETLOG-END-OFFSETLAGCONSUMER-IDHOSTCLIENT-ID
console-consumer-1152test_topic0-4-consumer-console-consumer-1152-1-2703ea2b-b62d-4cfd-8950-34e8c321b942/127.0.0.1consumer-console-consumer-1152-1
  1. 利用工具:Kafka Tools
1c1fe430-3248-11ed-ba43-dac502259ad0.jpg
  1. 其他可见化界面工具

2、哪些环节可能丢消息?

一条消息从生产到消费完成经历 3 个环节:消息生产者、消息中间件、消息消费者。

1bed7338-3248-11ed-ba43-dac502259ad0.jpg

哪个环节都有可能出现消息丢失问题。

1)生产端

首先要认识到 Kafka 生产端发送消息流程:

调用 send() 方法时,不会立刻把消息发送出去,而是缓存起来,选择恰当时机把缓存里的消息划分成一批数据,通过 Sender 线程按批次发送给服务端 Broker

1c37d91e-3248-11ed-ba43-dac502259ad0.jpg

此环节丢失消息的场景有: 即导致 Producer 消息没有发送成功

  1. 网络波动: 生产者与服务端之间的链路不可达,发送超时。现象是:各端状态正常,但消费端就是没有消费消息,就像丢失消息一样。

  • *解决措施: *重试 props.put("retries", "10");
  • 不恰当配置: 发送消息无 ack 确认; 发送消息失败无回调,无日志。

    producer.send(newProducerRecord<>(topic,messageKey,messageStr),
    newCallBack(){...});
    
  • *解决措施: *设置 acks=1 或者 acks=all。发送消息设置回调。

回顾下重要的参数 acks

  • acks=0:不需要等待服务器的确认. 这是 retries 设置无效. 响应里来自服务端的 offset 总是 -1producer只管发不管发送成功与否。延迟低,容易丢失数据。
  • acks=1:表示 leader 写入成功(但是并没有刷新到磁盘)后即向 producer 响应。延迟中等,一旦 leader 副本挂了,就会丢失数据。
  • acks=all:等待数据完成副本的复制, 等同于 -1. 假如需要保证消息不丢失, 需要使用该设置. 同时需要设置 unclean.leader.election.enabletrue, 保证当 ISR 列表为空时, 选择其他存活的副本作为新的 leader.
2)服务端

先来了解下 Kafka Broker 写入数据的过程:

  1. Broker 接收到一批数据,会先写入内存 PageCacheOS Cache)中。
  2. 操作系统会隔段时间把 OS Cache 中数据进行刷盘,这个过程会是 「异步批量刷盘」
1c46c898-3248-11ed-ba43-dac502259ad0.jpg

这里就有个隐患,如果数据写入 PageCacheKafka Broker宕机会怎样?机子宕机/掉电?

  • Kafka Broker 宕机: 消息不会丢失。因为数据已经写入 PageCache,只等待操作系统刷盘即可。

  • 机子宕机/掉电: 消息会丢失。因为数据仍在内存里,内存RAM 掉电后就会丢失数据。

  • 解决方案 :使用带蓄电池后备电源的缓存 cache,防止系统断电异常。
  1. 对比学习 MySQL 的 “双1” 策略,基本不使用这个策略,因为 “双1” 会导致频繁的 I/O 操作,也是最慢的一种。
  2. 对比学习 RedisAOF 策略,默认且推荐的策略:**Everysec(AOF_FSYNC_EVERYSEC) 每一秒钟保存一次(默认):** 。每个写命令执行完, 只是先把日志写到 AOF 文件的内存缓冲区, 每隔一秒把缓冲区中的内容写入磁盘。

拓展:Kafka 日志刷盘机制

# 推荐采用默认值,即不配置该配置,交由操作系统自行决定何时落盘,以提升性能。
# 针对 broker 配置:
log.flush.interval.messages=10000 # 日志落盘消息条数间隔,即每接收到一定条数消息,即进行log落盘。
log.flush.interval.ms=1000        # 日志落盘时间间隔,单位ms,即每隔一定时间,即进行log落盘。

# 针对 topic 配置:
flush.messages.flush.ms=1000  # topic下每1s刷盘
flush.messages=1              # topic下每个消息都落盘


# 查看 Linux 后台线程执行配置
$ sysctl -a | grep dirty
vm.dirty_background_bytes = 0
vm.dirty_background_ratio = 10      # 表示当脏页占总内存的的百分比超过这个值时,后台线程开始刷新脏页。
vm.dirty_bytes = 0
vm.dirty_expire_centisecs = 3000    # 表示脏数据多久会被刷新到磁盘上(30秒)。
vm.dirty_ratio = 20
vm.dirty_writeback_centisecs = 500  # 表示多久唤醒一次刷新脏页的后台线程(5秒)。
vm.dirtytime_expire_seconds = 43200

Broker 的可靠性需要依赖其多副本机制: 一般副本数 3 个(配置参数:replication.factor=3

  • Leader Partition 副本:提供对外读写机制。
  • Follower Partition 副本:同步 Leader 数据。
1c4de3c6-3248-11ed-ba43-dac502259ad0.jpg

副本之间的数据同步也可能出现问题:数据丢失问题和数据不一致问题。

解决方案:ISREpoch 机制

  • ISR(In-Sync Replicas) :Le``ader 宕机,可以从 ISR 中选择一个 Follower 作为 Leader

  • Epoch 机制: 解决 Leader 副本高水位更新和 Follower 副本高水位更新在时间上是存在错配问题。

    Tips: Kafka 0.11.x 版本才引入 leader epoch 机制解决高水位机制弊端。

对应需要的配置参数如下:

  1. acks=-1 或者 acks=all 必须所有副本均同步到消息,才能表明消息发送成功。

  2. replication.factor >= 3 副本数至少有 3 个。

  3. min.insync.replicas > 1 代表消息至少写入 2个副本才算发送成功。前提需要 acks=-1

    举个栗子:Leader 宕机了,至少要保证 ISR 中有一个 Follower,这样这个Follwer被选举为Leader 且不会丢失数据。

    公式:replication.factor = min.insync.replicas + 1

  4. unclean.leader.election.enable=false 防止不在 ISR 中的 Follower 被选举为 Leader

    Kafka 0.11.0.0版本开始默认 unclean.leader.election.enable=false

3)消费端

消费端消息丢失场景有:

  1. 消息堆积: 几个分区的消息都没消费,就跟丢消息一样。

  • 解决措施: 一般问题都出在消费端,尽量提高客户端的消费速度,消费逻辑另起线程进行处理。
  • 自动提交: 消费端拉下一批数据,正在处理中自动提交了 offset,这时候消费端宕机了; 重启后,拉到新一批数据,而上一批数据却没处理完。

  • 解决措施: 取消自动提交 auto.commit = false,改为手动 ack
  • 心跳超时,引发 Rebalance 客户端心跳超时,触发 Rebalance被踢出消费组。如果只有这一个客户端,那消息就不会被消费了。

    同时避免两次 poll 的间隔时间超过阈值:

  • max.poll.records:降低该参数值,建议远远小于 <单个线程每秒消费的条数> * <消费线程的个数> * 的积。

  • max.poll.interval.ms: 该值要大于 / (<单个线程每秒消费的条数> * <消费线程的个数>) 的值。

  • 解决措施: 客户端版本升级至 0.10.2 以上版本。

案例:凡凡曾遇到数据同步时,消息中的文本需经过 NLPNER 分析,再同步到 ES

这个过程的主要流程是:

1c5a2208-3248-11ed-ba43-dac502259ad0.jpg
  1. 数据同步程序从 Kafka 中拉取消息。
  2. 数据同步程序将消息内的文本发送的 NER 进行分析,得到特征数组。
  3. 数据同步程序将消息同步给 ES

现象:线上数据同步程序运行一段时间后,消息就不消费了。

  • 排查日志: 发现有 Rebalance 日志,怀疑是客户端消费太慢被踢出了消费组。
  • 本地测试: 发现运行一段时间也会出现 Rebalance,且 NLPNER 服务访问 HTTP 500 报错。
  • 得出结论:NER服务异常,导致数据同步程序消费超时。且当时客户端版本为 v0.10.1Consumer 没有独立线程维持心跳,而是把心跳维持与 poll 接口耦合在一起,从而也会造成心跳超时。

当时解决措施是:

  1. session.timeout.ms 设置为 25s,当时没有升级客户端版本,怕带来其他问题。
  2. 熔断机制: 增加 Hystrix,超过 3 次服务调用异常就熔断,保护客户端正常消费数据。

3、如何确保消息不丢失?

掌握这些技能:

  1. 熟悉消息从发送到消费的每个阶段
  2. 监控报警 Kafka 集群
  3. 熟悉方案 “MQ 可靠消息投递”
怎么确保消息 100% 不丢失?

到这,总结下:

  1. 生产端:
  • 设置重试:props.put("retries", "10");
  • 设置 acks=all
  • 设置回调:producer.send(msg, new CallBack(){...});
  1. Broker:
  • 内存:使用带蓄电池后备电源的缓存 cache
  • Kafka 版本 0.11.x 以上:支持 Epoch 机制。
  • replication.factor >= 3 副本数至少有 3 个。
  • min.insync.replicas > 1 代表消息至少写入 2个副本才算发送成功。前提需要 acks=-1
  • unclean.leader.election.enable=false 防止不在 ISR 中的 Follower 被选举为 Leader
  1. 消费端
  • 客户端版本升级至 0.10.2 以上版本。
  • 取消自动提交 auto.commit = false,改为手动 ack
  • 尽量提高客户端的消费速度,消费逻辑另起线程进行处理。


审核编辑:汤梓红


声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 消息队列
    +关注

    关注

    0

    文章

    33

    浏览量

    2987
  • kafka
    +关注

    关注

    0

    文章

    51

    浏览量

    5222

原文标题:案例 | Kafka 为什么会丢消息?

文章出处:【微信号:芋道源码,微信公众号:芋道源码】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    Linux下进程通讯消息队列

    MQ(message queue),从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是message 而已。MQ 是在消息的传输过程中保存消息的容器。多用于分
    的头像 发表于 08-19 19:56 1830次阅读
    Linux下进程通讯消息<b class='flag-5'>队列</b>

    RT-thread内核之消息队列

    ; pointer indicated the free node of queue *///指向空闲队列};typedef struct rt_messagequeue *rt_mq_t;#endif
    发表于 03-06 17:17

    请问ucosIII的消息队列怎么使用?

    POST消息,但是 接收到的消息 是空的不知道是哪的原因。并且 怎么设置 消息队列的数据 存储区呢 ?我记得在 ucosII 中可以直接 定义 :gsm_req_event = OSQCreate(gsm_req_mq, MAX_GSM_REQ_
    发表于 08-06 04:36

    RT-Thread系统消息队列常用的函数接口有哪些

    struct rt_messagequeue 表示。另外 rt_mq_t 表示消息队列的句柄,即指向消息队列控制块的指针。消息队列控制块的数据结构定义如下:结构体定义中,继承关系一目
    发表于 03-31 14:14

    使用消息队列的rt_mq_send参数如果不相同会怎么样

    求助1.看论坛的文章里这里写的消息队列不可以直接发变长数据吗?意思就是使用rt_mq_send函数的时候,size参数必须和rt_mq_create中的msg_size相同吗?如果不相同会怎么样?2.多个不同优先级的线程和中断向
    发表于 07-29 10:11

    rt_mq_recv函数是怎么从消息队列读取到消息的呢

    在使用rt_mq_recv函数是,遇到这样一段代码:rt_uint8_t rx_size;while(1){ //从消息队列中获取一条信息 ret = rt_mq
    发表于 08-25 14:30

    有什么方法解决RTT消息队列的数据发送问题

    静态创建了一个消息队列struct rt_messagequeue usart2_mq;static rt_uint8_t msg_pool[300];result = rt_mq
    发表于 08-31 14:37

    串口open参数对消息队列rt_mq_recv执行的影响线程假死如何解决?

    = rt_mq_send(&rx_mq, &msg, sizeof(msg)); if ( result == -RT_EFULL) {/* 消息队列满 */rt_kprintf("
    发表于 02-08 10:51

    创建消息队列失败,STM32F103RET6使用rt_mq_init创建消息队列出错怎么排查啊

    user_task_thread()在mian进入 使用rt_mq_init创建消息队列出错,出现HardFault_Handler 断点调试最后到 rt_mq
    发表于 07-31 09:40

    发送队列长度功率控制

    无线多跳网络具有信道时变性强、拓扑动态变化等特点,需要简单高效的功率控制机制。发射功率影响数据发送速率,而基于发送队列长度的功率控制机制存在可行解。为此,结合无线多跳网络中间节点需要协助其他节点进行
    发表于 03-20 15:07 0次下载
    发送<b class='flag-5'>队列</b>长度功率控制

    Linux IPC POSIX 消息队列

    模型:#include#include #include mq_open() //创建/获取消息队列fd mq_get() //设置/获取消息队列
    发表于 04-02 14:46 581次阅读

    引入消息队列会多出哪些问题

    前言 最近,消息队列(Message Queue ,简称 MQ)越来越火。很多公司在用,很多人在用,其重要性不言而喻。 如果让你回答下面这些问题,你的心中是否有答案了呢? 为什么要用 MQ? 引入
    的头像 发表于 09-23 14:53 1728次阅读

    设计一个MQ需要考虑哪些问题

    本文主要讲解 MQ 的通用知识,让大家先弄明白:如果让你来设计一个 MQ,该如何下手?需要考虑哪些问题?又有哪些威廉希尔官方网站 挑战? 有了这个基础后,我相信后面几篇文章再讲 Kafka 和 Ro
    的头像 发表于 11-19 14:21 1939次阅读

    消息队列经典十连问

    我们通常说的消息队列,简称MQ(Message Queue),它其实就指消息中间件,当前业界比较流行的开源消息中间件包括:RabbitMQ、RocketMQ、Kafka。
    的头像 发表于 03-22 10:08 1277次阅读

    MQ消息乱序问题解析与实战解决方案

    作者:京东物流 刘浩 1. 背景 在分布式系统中,消息队列MQ)是实现系统解耦、异步通信的重要工具。然而,MQ消费时出现的消息乱序问题,经常会对业务逻辑的正确执行和系统稳定性产生不良影响。本文将
    的头像 发表于 12-06 09:46 190次阅读