Kafka集群消息积压问题及处理策略

VSole2022-08-04 16:45:53

通常情况下,企业中会采取轮询或者随机的方式,通过Kafka的producer向Kafka集群生产数据,来尽可能保证Kafk分区之间的数据是均匀分布的。

在分区数据均匀分布的前提下,如果我们针对要处理的topic数据量等因素,设计出合理的Kafka分区数量。对于一些实时任务,比如Spark Streaming/Structured-Streaming、Flink和Kafka集成的应用,消费端不存在长时间"挂掉"的情况即数据一直在持续被消费,那么一般不会产生Kafka数据积压的情况。

但是这些都是有前提的,当一些意外或者不合理的分区数设置情况的发生,积压问题就不可避免。

Kafka消息积压的典型场景:

1.实时/消费任务挂掉

比如,我们写的实时应用因为某种原因挂掉了,并且这个任务没有被监控程序监控发现通知相关负责人,负责人又没有写自动拉起任务的脚本进行重启。

那么在我们重新启动这个实时应用进行消费之前,这段时间的消息就会被滞后处理,如果数据量很大,可就不是简单重启应用直接消费就能解决的。

2.Kafka分区数设置的不合理(太少)和消费者"消费能力"不足

Kafka单分区生产消息的速度qps通常很高,如果消费者因为某些原因(比如受业务逻辑复杂度影响,消费时间会有所不同),就会出现消费滞后的情况。

此外,Kafka分区数是Kafka并行度调优的最小单元,如果Kafka分区数设置的太少,会影响Kafka consumer消费的吞吐量。

3.Kafka消息的key不均匀,导致分区间数据不均衡

在使用Kafka producer消息时,可以为消息指定key,但是要求key要均匀,否则会出现Kafka分区间数据不均衡。

那么,针对上述的情况,有什么好的办法处理数据积压呢?

一般情况下,针对性的解决办法有以下几种:

1.实时/消费任务挂掉导致的消费滞后

a.任务重新启动后直接消费最新的消息,对于"滞后"的历史数据采用离线程序进行"补漏"。

此外,建议将任务纳入监控体系,当任务出现问题时,及时通知相关负责人处理。当然任务重启脚本也是要有的,还要求实时框架异常处理能力要强,避免数据不规范导致的不能重新拉起任务。

b.任务启动从上次提交offset处开始消费处理

如果积压的数据量很大,需要增加任务的处理能力,比如增加资源,让任务能尽可能的快速消费处理,并赶上消费最新的消息

2.Kafka分区少了

如果数据量很大,合理的增加Kafka分区数是关键。如果利用的是Spark流和Kafka direct approach方式,也可以对KafkaRDD进行repartition重分区,增加并行度处理。

3.由于Kafka消息key设置的不合理,导致分区数据不均衡

可以在Kafka producer处,给key加随机后缀,使其均衡。

kafka
本作品采用《CC 协议》,转载必须注明作者和本文链接
如果是在消费端丢失数据,那么多次消费结果完全一模一样的几率很低。这时已经fetch的数据还没有处理完成但已经被commit掉,因此没有机会再次被处理,数据丢失。网络负载很高或者磁盘很忙写入失败的情况下,没有自动重试重发消息。
Kafka消息积压的典型场景:1.实时/消费任务挂掉比如,我们写的实时应用因为某种原因挂掉了,并且这个任务没有被监控程序监控发现通知相关负责人,负责人又没有写自动拉起任务的脚本进行重启。此外,Kafka分区数是Kafka并行度调优的最小单元,如果Kafka分区数设置的太少,会影响Kafka consumer消费的吞吐量。
Kafka consumer为了及时消费消息,会以Consumer Group(消费组)的形式,启动多个consumer消费消息。不同的消费组在消费消息时彼此互不影响,同一个消费组的consumer协调在一起消费订阅的topic所有分区消息。
随机读写会导致寻址时间延长,从而影响磁盘的读写速度。而Kafka在将数据持久化到磁盘时,采用只追加的顺序写,有效降低了寻址时间,提高效率。当读操作发生时,先从PageCache中查找,如果发生缺页才进行磁盘调度,最终返回需要的数据。对应到Kafka生产和消费消息中:producer把消息发到broker后,数据并不是直接落入磁盘的,而是先进入PageCache。
Apache Kafka是一个开源的分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和任务关键型应用程序。
分布式流平台Kafka
2022-08-02 10:13:27
无论消息是否被消费,Kafka集群都会持久的保存所有发布的消息,直到过期。Kafka中采用分区的设计主要有两个目的:第一,当日志大小超过了单台服务器的限制,允许日志进行扩展。在Kafka中实现消费的方式是将日志中的分区划分到每一个消费者实例上,以便在任何时间,每个实例都是分区唯一的消费者。
为了存放这些元数据,kafka 集群会为每一个 partition 在 zk 集群上创建一个节点,partition 的数量直接决定了 zk 上的节点数。当 partition 数量变多,意味着单个 broker 节点上的 partitiion Leader 切换时间变长。
假设Mysql中canal_test库下有一张表policy_cred,需要统计实时统计policy_status状态为1的mor_rate的的变化趋势,并标注比率的风险预警等级。?本次安装的canal版本为1.1.2,Canal版本最后在1.1.1之后。server端采用MQ模式,MQ选用Kafka。服务器系统为Centos
Kafka只有在消息提交之后,才会将消息暴露给消费者。不论是producer端还是consumer端发往partition的请求,都通过leader数据副本所在的broker进行处理。这个过程由Kafka controller节点broker自动完成,主要是从Zookeeper读取和修改受影响partition的一些元数据信息
在企业实时处理架构中,通常将spark streaming和kafka集成作为整个大数据处理架构的核心环节之一。
VSole
网络安全专家