引入 MQ
消息中间件最直接的目的:系统解耦以及流量控制(削峰填谷)
-
系统解耦: 上下游系统之间的通信相互依赖,利用
MQ
消息队列可以隔离上下游环境变化带来的不稳定因素。 -
流量控制: 超高并发场景中,引入
MQ
可以实现流量 “削峰填谷” 的作用以及服务异步处理,不至于打崩服务。
引入 MQ
同样带来其他问题:数据一致性。
在分布式系统中,如果两个节点之间存在数据同步,就会带来数据一致性的问题。消息生产端发送消息到
MQ
再到消息消费端需要保证消息不丢失。
所以在使用 MQ
消息队列时,需要考虑这 3 个问题:
1、如何知道有消息丢失?
如何感知消息是否丢失了?可总结如下:
-
他人反馈: 运营、
PM
反馈消息丢失。 -
监控报警: 监控指定指标,即时报警人工调整。
Kafka
集群异常、Broker
宕机、Broker
磁盘挂载问题、消费者异常导致消息积压等都会给用户直接感觉是消息丢失了。
案例:舆情分析中数据采集同步
-
PM
可自己下发采集调度指令,去采集特定数据。 -
PM
可通过ES
近实时查询对应数据,若没相应数据可再次下发指令。
当感知消息丢失了,那就需要一种机制来检查消息是否丢失。
检索消息
运维工具有:
-
查看
Kafka
消费位置:
>基于SpringBoot+MyBatisPlus+Vue&Element实现的后台管理系统+用户小程序,支持RBAC动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
>
>*项目地址:
>*视频教程:#查看某个topic的message数量
$./kafka-run-class.shkafka.tools.GetOffsetShell--broker-listlocalhost:9092--topictest_topic
>基于SpringCloudAlibaba+Gateway+Nacos+RocketMQ+Vue&Element实现的后台管理系统+用户小程序,支持RBAC动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
>
>*项目地址:
>*视频教程:#查看consumerGroup列表
$./kafka-consumer-groups.sh--list--bootstrap-server192.168.88.108:9092
#查看offset消费情况
$./kafka-consumer-groups.sh--bootstrap-serverlocalhost:9092--groupconsole-consumer-1152--describe
GROUPTOPICPARTITIONCURRENT-OFFSETLOG-END-OFFSETLAGCONSUMER-IDHOSTCLIENT-ID
console-consumer-1152test_topic0-4-consumer-console-consumer-1152-1-2703ea2b-b62d-4cfd-8950-34e8c321b942/127.0.0.1consumer-console-consumer-1152-1
-
利用工具:
Kafka Tools
- 其他可见化界面工具
2、哪些环节可能丢消息?
一条消息从生产到消费完成经历 3 个环节:消息生产者、消息中间件、消息消费者。
哪个环节都有可能出现消息丢失问题。
1)生产端
首先要认识到 Kafka
生产端发送消息流程:
调用
send()
方法时,不会立刻把消息发送出去,而是缓存起来,选择恰当时机把缓存里的消息划分成一批数据,通过Sender
线程按批次发送给服务端Broker
。
此环节丢失消息的场景有: 即导致 Producer
消息没有发送成功
-
网络波动: 生产者与服务端之间的链路不可达,发送超时。现象是:各端状态正常,但消费端就是没有消费消息,就像丢失消息一样。
-
*解决措施: *重试
props.put("retries", "10");
-
不恰当配置: 发送消息无
ack
确认; 发送消息失败无回调,无日志。producer.send(newProducerRecord<>(topic,messageKey,messageStr), newCallBack(){...});
-
*解决措施: *设置
acks=1
或者acks=all
。发送消息设置回调。
回顾下重要的参数: acks
-
acks=0
:不需要等待服务器的确认. 这是retries
设置无效. 响应里来自服务端的offset
总是-1
,producer
只管发不管发送成功与否。延迟低,容易丢失数据。 -
acks=1
:表示leader
写入成功(但是并没有刷新到磁盘)后即向producer
响应。延迟中等,一旦leader
副本挂了,就会丢失数据。 -
acks=all
:等待数据完成副本的复制, 等同于-1
. 假如需要保证消息不丢失, 需要使用该设置. 同时需要设置unclean.leader.election.enable
为true
, 保证当ISR
列表为空时, 选择其他存活的副本作为新的leader
.
2)服务端
先来了解下 Kafka Broker
写入数据的过程:
-
Broker
接收到一批数据,会先写入内存PageCache
(OS Cache
)中。 -
操作系统会隔段时间把
OS Cache
中数据进行刷盘,这个过程会是 「异步批量刷盘」 。
这里就有个隐患,如果数据写入 PageCache
后 Kafka Broker
宕机会怎样?机子宕机/掉电?
-
Kafka Broker
宕机: 消息不会丢失。因为数据已经写入PageCache
,只等待操作系统刷盘即可。 -
机子宕机/掉电: 消息会丢失。因为数据仍在内存里,内存
RAM
掉电后就会丢失数据。
- 解决方案 :使用带蓄电池后备电源的缓存
cache
,防止系统断电异常。
- 对比学习
MySQL
的 “双1” 策略,基本不使用这个策略,因为 “双1” 会导致频繁的I/O
操作,也是最慢的一种。- 对比学习
Redis
的AOF
策略,默认且推荐的策略:**Everysec
(AOF_FSYNC_EVERYSEC
) 每一秒钟保存一次(默认):** 。每个写命令执行完, 只是先把日志写到AOF
文件的内存缓冲区, 每隔一秒把缓冲区中的内容写入磁盘。
拓展:Kafka
日志刷盘机制
# 推荐采用默认值,即不配置该配置,交由操作系统自行决定何时落盘,以提升性能。
# 针对 broker 配置:
log.flush.interval.messages=10000 # 日志落盘消息条数间隔,即每接收到一定条数消息,即进行log落盘。
log.flush.interval.ms=1000 # 日志落盘时间间隔,单位ms,即每隔一定时间,即进行log落盘。
# 针对 topic 配置:
flush.messages.flush.ms=1000 # topic下每1s刷盘
flush.messages=1 # topic下每个消息都落盘
# 查看 Linux 后台线程执行配置
$ sysctl -a | grep dirty
vm.dirty_background_bytes = 0
vm.dirty_background_ratio = 10 # 表示当脏页占总内存的的百分比超过这个值时,后台线程开始刷新脏页。
vm.dirty_bytes = 0
vm.dirty_expire_centisecs = 3000 # 表示脏数据多久会被刷新到磁盘上(30秒)。
vm.dirty_ratio = 20
vm.dirty_writeback_centisecs = 500 # 表示多久唤醒一次刷新脏页的后台线程(5秒)。
vm.dirtytime_expire_seconds = 43200
Broker
的可靠性需要依赖其多副本机制: 一般副本数 3 个(配置参数:replication.factor=3
)
-
Leader Partition
副本:提供对外读写机制。 -
Follower Partition
副本:同步Leader
数据。
副本之间的数据同步也可能出现问题:数据丢失问题和数据不一致问题。
解决方案:ISR
和 Epoch
机制
-
ISR
(In-Sync Replicas
) : 当Le``ader
宕机,可以从ISR
中选择一个Follower
作为Leader
。 -
Epoch
机制: 解决Leader
副本高水位更新和Follower
副本高水位更新在时间上是存在错配问题。Tips
:Kafka 0.11.x
版本才引入leader epoch
机制解决高水位机制弊端。
对应需要的配置参数如下:
-
acks=-1
或者acks=all
: 必须所有副本均同步到消息,才能表明消息发送成功。 -
replication.factor >= 3
: 副本数至少有 3 个。 -
min.insync.replicas > 1
: 代表消息至少写入 2个副本才算发送成功。前提需要acks=-1
。举个栗子:
Leader
宕机了,至少要保证ISR
中有一个Follower
,这样这个Follwer
被选举为Leader
且不会丢失数据。公式:
replication.factor = min.insync.replicas + 1
-
unclean.leader.election.enable=false
: 防止不在ISR
中的Follower
被选举为Leader
。Kafka 0.11.0.0版本开始默认
unclean.leader.election.enable=false
3)消费端
消费端消息丢失场景有:
-
消息堆积: 几个分区的消息都没消费,就跟丢消息一样。
- 解决措施: 一般问题都出在消费端,尽量提高客户端的消费速度,消费逻辑另起线程进行处理。
-
自动提交: 消费端拉下一批数据,正在处理中自动提交了
offset
,这时候消费端宕机了; 重启后,拉到新一批数据,而上一批数据却没处理完。
-
解决措施: 取消自动提交
auto.commit = false
,改为手动ack
。
-
心跳超时,引发
Rebalance
: 客户端心跳超时,触发Rebalance
被踢出消费组。如果只有这一个客户端,那消息就不会被消费了。同时避免两次
poll
的间隔时间超过阈值:
-
max.poll.records
:降低该参数值,建议远远小于<单个线程每秒消费的条数> * <消费线程的个数> *
的积。 -
max.poll.interval.ms
: 该值要大于/ (<单个线程每秒消费的条数> * <消费线程的个数>)
的值。 -
解决措施: 客户端版本升级至
0.10.2
以上版本。
案例:凡凡曾遇到数据同步时,消息中的文本需经过 NLP
的 NER
分析,再同步到 ES
。
这个过程的主要流程是:
-
数据同步程序从
Kafka
中拉取消息。 -
数据同步程序将消息内的文本发送的
NER
进行分析,得到特征数组。 -
数据同步程序将消息同步给
ES
。
现象:线上数据同步程序运行一段时间后,消息就不消费了。
-
排查日志: 发现有
Rebalance
日志,怀疑是客户端消费太慢被踢出了消费组。 -
本地测试: 发现运行一段时间也会出现
Rebalance
,且NLP
的NER
服务访问HTTP 500
报错。 -
得出结论: 因
NER
服务异常,导致数据同步程序消费超时。且当时客户端版本为v0.10.1
,Consumer
没有独立线程维持心跳,而是把心跳维持与poll
接口耦合在一起,从而也会造成心跳超时。
当时解决措施是:
-
session.timeout.ms
: 设置为25s
,当时没有升级客户端版本,怕带来其他问题。 -
熔断机制: 增加
Hystrix
,超过 3 次服务调用异常就熔断,保护客户端正常消费数据。
3、如何确保消息不丢失?
掌握这些技能:
- 熟悉消息从发送到消费的每个阶段
-
监控报警
Kafka
集群 - 熟悉方案 “MQ 可靠消息投递”
怎么确保消息 100% 不丢失?
到这,总结下:
- 生产端:
-
设置重试:
props.put("retries", "10");
-
设置
acks=all
-
设置回调:
producer.send(msg, new CallBack(){...});
- Broker:
-
内存:使用带蓄电池后备电源的缓存
cache
。 -
Kafka
版本0.11.x
以上:支持Epoch
机制。 -
replication.factor >= 3
: 副本数至少有 3 个。 -
min.insync.replicas > 1
: 代表消息至少写入 2个副本才算发送成功。前提需要acks=-1
。 -
unclean.leader.election.enable=false
: 防止不在ISR
中的Follower
被选举为Leader
。
- 消费端
-
客户端版本升级至
0.10.2
以上版本。 -
取消自动提交
auto.commit = false
,改为手动ack
。 - 尽量提高客户端的消费速度,消费逻辑另起线程进行处理。
审核编辑:汤梓红
-
消息队列
+关注
关注
0文章
33浏览量
2987 -
kafka
+关注
关注
0文章
51浏览量
5222
原文标题:案例 | Kafka 为什么会丢消息?
文章出处:【微信号:芋道源码,微信公众号:芋道源码】欢迎添加关注!文章转载请注明出处。
发布评论请先 登录
相关推荐
评论