当前位置: 首页 > news >正文

当参数调优无法解决kafka消息积压时可以这么做

今天的议题是:如何快速处理kafka的消息积压

通常的做法有以下几种:

  1. 增加消费者数
  2. 增加 topic 的分区数,从而进一步增加消费者数
  3. 调整消费者参数,如max.poll.records
  4. 增加硬件资源

常规手段不是本文的讨论重点或者当上面的手段已经使用过依然存在很严重的消息积压时该怎么办?本文给出一种增加消费者消费速率的方案。我们知道消息积压往往是因为生产速率远大于消费速率,本文的重点就是通过提高消费速率来解决消息积压。

经验判断,消费速率低下的主要原因往往都是数据处理时间长,业务逻辑复杂最终导致一次 poll 的时间被无限拉长,如果可以通过增加数据处理的线程数来降低一次 poll 的时间那么问题就解决了。但是需要注意一下几点:

  1. 业务逻辑对乱序数据不敏感,因为并行一定会导致乱序问题
  2. kafka 的消费者是线程不安全的
  3. 如何提交 offset

基于上述几点,思路就是消费者 poll 下来一批数据,交给多个线程去并行处理,消费者等待所有线程执行完后提交。为了减少线程的创建与销毁则维护一个线程池。代码如下:

第一步:创建一个MultipleConsumer类用于封装消费者和线程池

public class MultipleConsumer {private final KafkaConsumer<String, String> consumer;private final int threadNum;private final ExecutorService threadPool;private boolean isRunning = true;public MultipleConsumer(Properties properties, List<String> topics, int threadNum) {// 实例化消费者consumer = new KafkaConsumer<>(properties);// 订阅主题consumer.subscribe(topics);this.threadNum = threadNum;this.threadPool = Executors.newFixedThreadPool(threadNum);}
}

理论上相较于传统的消费速率可以提升 threadNum 倍。

第二步:因为需要并行处理一批 poll 数据,因此需要对数据进行切分,切分逻辑如下

private Map<Integer, List<ConsumerRecord<String, String>>> splitTask(ConsumerRecords<String, String> consumerRecords) {HashMap<Integer, List<ConsumerRecord<String, String>>> tasks = new HashMap<>();for (int i = 0; i < threadNum; i++) {tasks.put(i, new ArrayList<>());}int recordIndex = 0;for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {tasks.get(recordIndex % threadNum).add(consumerRecord);recordIndex++;}return tasks;}

这里采用轮训的方式且切分的个数与 threadNum 一致,尽可能保证每个线程处理的数据数量相差不大

第三步:定义一个静态内部类用来处理数据,并处理同步逻辑(因为需要等待所有线程执行完再提交 offset)

private static class InnerProcess implements Runnable {private final List<ConsumerRecord<String, String>> records;private final CountDownLatch countDownLatch;public InnerProcess(List<ConsumerRecord<String, String>> records, CountDownLatch countDownLatch) {this.records = records;this.countDownLatch = countDownLatch;}@Overridepublic void run() {try {// 处理消息for (ConsumerRecord<String, String> record : records) {System.out.println("topic: " + record.topic() + ", partition: " + record.partition() + ", offset: " + record.offset() + ", key: " + record.key() + ", value: " + record.value());TimeUnit.SECONDS.sleep(1);}} catch (InterruptedException e) {e.printStackTrace();} finally {countDownLatch.countDown();}}}

使用 CountDownLatch 实现线程同步逻辑,假设每条数据的业务处理时间为 1 s

第四步:消费者 poll 逻辑

public void start() {while (isRunning) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(5));if (!consumerRecords.isEmpty()) {// 分割任务Map<Integer, List<ConsumerRecord<String, String>>> splitTask = splitTask(consumerRecords);CountDownLatch countDownLatch = new CountDownLatch(threadNum);// 提交任务for (int i = 0; i < threadNum; i++) {threadPool.submit(new InnerProcess(splitTask.get(i), countDownLatch));}// 等待任务执行结束try {countDownLatch.await();} catch (InterruptedException e) {throw new RuntimeException(e);}// 提交偏移量consumer.commitAsync((map, e) -> {if (e != null) {System.out.println("提交偏移量失败");}});}}}

完整代码如下:

import org.apache.kafka.clients.consumer.*;import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.*;/*** @author wjun* @date 2023/3/1 14:50* @email wjunjobs@outlook.com* @describe*/
public class MultipleConsumer {private final KafkaConsumer<String, String> consumer;private final int threadNum;private final ExecutorService threadPool;private boolean isRunning = true;public MultipleConsumer(Properties properties, List<String> topics, int threadNum) {// 实例化消费者consumer = new KafkaConsumer<>(properties);// 订阅主题consumer.subscribe(topics);this.threadNum = threadNum;this.threadPool = Executors.newFixedThreadPool(threadNum);}public void start() {while (isRunning) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(5));if (!consumerRecords.isEmpty()) {// 分割任务Map<Integer, List<ConsumerRecord<String, String>>> splitTask = splitTask(consumerRecords);CountDownLatch countDownLatch = new CountDownLatch(threadNum);// 提交任务for (int i = 0; i < threadNum; i++) {threadPool.submit(new InnerProcess(splitTask.get(i), countDownLatch));}// 等待任务执行结束try {countDownLatch.await();} catch (InterruptedException e) {throw new RuntimeException(e);}// 提交偏移量consumer.commitAsync((map, e) -> {if (e != null) {System.out.println("提交偏移量失败");}});}}}private Map<Integer, List<ConsumerRecord<String, String>>> splitTask(ConsumerRecords<String, String> consumerRecords) {HashMap<Integer, List<ConsumerRecord<String, String>>> tasks = new HashMap<>();for (int i = 0; i < threadNum; i++) {tasks.put(i, new ArrayList<>());}int recordIndex = 0;for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {tasks.get(recordIndex % threadNum).add(consumerRecord);recordIndex++;}return tasks;}public void stop() {isRunning = false;threadPool.shutdown();}private static class InnerProcess implements Runnable {private final List<ConsumerRecord<String, String>> records;private final CountDownLatch countDownLatch;public InnerProcess(List<ConsumerRecord<String, String>> records, CountDownLatch countDownLatch) {this.records = records;this.countDownLatch = countDownLatch;}@Overridepublic void run() {try {// 处理消息for (ConsumerRecord<String, String> record : records) {System.out.println("topic: " + record.topic() + ", partition: " + record.partition() + ", offset: " + record.offset() + ", key: " + record.key() + ", value: " + record.value());TimeUnit.SECONDS.sleep(1);}} catch (InterruptedException e) {e.printStackTrace();} finally {countDownLatch.countDown();}}}
}

测试一下:

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;import java.util.ArrayList;
import java.util.List;
import java.util.Properties;/*** @author wjun* @date 2023/3/1 16:03* @email wjunjobs@outlook.com* @describe*/
public class MultipleConsumerTest {private static final Properties properties = new Properties();private static final List<String> topics = new ArrayList<>();public static void before() {properties.put("bootstrap.servers", "localhost:9092");properties.put("group.id", "test");properties.put("enable.auto.commit", "false");properties.put("auto.commit.interval.ms", "1000");properties.put("session.timeout.ms", "30000");properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");topics.add("multiple_demo");}public static void main(String[] args) {new MultipleConsumer(properties, topics, 5).start();}
}

20 条数据的处理事件只需要 4s(threadNume = 5,即缩短 5 倍)

image-20230301172739782

但是此方法的缺点:

  1. 只适用于业务逻辑复杂导致的处理时间长的场景
  2. 对数据乱序不敏感的业务场景

相关文章:

当参数调优无法解决kafka消息积压时可以这么做

今天的议题是&#xff1a;如何快速处理kafka的消息积压 通常的做法有以下几种&#xff1a; 增加消费者数增加 topic 的分区数&#xff0c;从而进一步增加消费者数调整消费者参数&#xff0c;如max.poll.records增加硬件资源 常规手段不是本文的讨论重点或者当上面的手段已经使…...

Java线程池源码分析

Java 线程池的使用&#xff0c;是面试必问的。下面我们来从使用到源码整理一下。 1、构造线程池 通过Executors来构造线程池 1、构造一个固定线程数目的线程池&#xff0c;配置的corePoolSize与maximumPoolSize大小相同&#xff0c; 同时使用了一个无界LinkedBlockingQueue存…...

手撕八大排序(下)

目录 交换排序 冒泡排序&#xff1a; 快速排序 Hoare法 挖坑法 前后指针法【了解即可】 优化 再次优化&#xff08;插入排序&#xff09; 迭代法 其他排序 归并排序 计数排序 排序总结 结束了上半章四个较为简单的排序&#xff0c;接下来的难度将会大幅度上升&…...

SAP 详细解析SCC4

事务代码&#xff1a;SCC4&#xff0c;选择一个客户端&#xff0c;点击进入&#xff0c;如图&#xff1a; 一、客户端角色 客户控制&#xff1a;客户的角色&#xff08;生产性&#xff0c;测试&#xff0c;...&#xff09; 此属性表示 R/3 系统中的客户端角色。其中可能包括…...

java异常分类和finally代码块中return语句的影响

首先看一下java中异常相关类的继承关系&#xff1a; 引用 1、分类 异常可以分为受查异常和非受查异常&#xff0c;Error和RuntimeException及其所有的子类都是非受查异常&#xff0c;其他的是受查异常。 两者的区别主要在&#xff1a; 受检的异常是由编译器&#xff08;编译…...

【链表OJ题(二)】链表的中间节点

​ ​&#x1f4dd;个人主页&#xff1a;Sherry的成长之路 &#x1f3e0;学习社区&#xff1a;Sherry的成长之路&#xff08;个人社区&#xff09; &#x1f4d6;专栏链接&#xff1a;数据结构 &#x1f3af;长路漫漫浩浩&#xff0c;万事皆有期待 文章目录链表OJ题(二)1. 链表…...

【强烈建议收藏:MySQL面试必问系列之并发事务锁专题】

一.知识回顾 上节课我们一起学习了MySQL面试必问系列之事务&#xff0c;没有学习的同学可以看一下上一篇文章&#xff0c;肯定对你会有帮助&#xff0c;学习过的同学肯定知道&#xff0c;上节课我们留了一个小尾巴&#xff0c;这个小尾巴是什么呢&#xff1f;就是没有详细展开…...

Linux下使用Makefile实现条件编译

在Linux系统下Makefile和C/C语言都有提供条件选择编译的语法&#xff0c;就是在编译源码的时候&#xff0c;可以选择性地编译指定的代码。这种条件选择编译的使用场合有好多&#xff0c;例如我们开发一个兼容标准版本与定制版本兼容的项目&#xff0c;那么&#xff0c;一些与需…...

java 应用cpu飙升(超过100%)故障排查

前言害。。。昨天刚写完一份关于jvm问题排查相关的博客&#xff0c;今天线上项目就遇到了一个突发问题。现象是用户反映系统非常卡&#xff0c;无法操作。然后登录服务器查看发现cpu 一直100%以上。具体排查步骤&#xff1a;1&#xff0c;首先top命令查看服务器cpu等情况&#…...

光学设计软件Ansys的Lumerical 2023版本下载与安装使用

文章目录前言一、许可管理工具安装二、许可管理器配置三、Lumerical安装四、工具使用配置总结前言 Lumerical是一款功能强大的软件&#xff0c;用于设计和分析从组件到系统阶段的光子学和电磁学。这个版本的Lumerical改进了电子和光子学设计工具&#xff0c;用于复杂光子学&am…...

Java 异常

文章目录1. 异常概述2. JVM 的默认处理方案3. 异常处理之 try...catch4. Throwable 的成员方法5. 编译异常和运行异常的区别6. 异常处理之 throws7. 自定义异常8. throws 和 throw 的区别1. 异常概述 异常就是程序出现了不正常的情况。 ① Error&#xff1a;严重问题&#xff…...

JavaSE学习笔记day17

零、 复习昨日 File: 通过路径代表一个文件或目录 方法: 创建型,查找类,判断类,其他 IO 输入& 输出字节&字符 try-catch代码 一、作业 给定路径删除该文件夹 public static void main(String[] args) {deleteDir(new File("E:\\A"));}// 删除文件夹public s…...

【项目】Vue3+TS 动态路由 面包屑 查询重置 列表

&#x1f4ad;&#x1f4ad; ✨&#xff1a;【项目】Vue3TS 动态路由 面包屑 查询重置 列表   &#x1f49f;&#xff1a;东非不开森的主页   &#x1f49c;: 热烈的不是青春&#xff0c;而是我们&#x1f49c;&#x1f49c;   &#x1f338;: 如有错误或不足之处&#xff0…...

前脚背完这些接口自动化测试面试题,后脚就进了字节测试岗

1、请结合你熟悉的项目&#xff0c;介绍一下你是怎么做测试的&#xff1f; -首先要自己熟悉项目&#xff0c;熟悉项目的需求、项目组织架构、项目研发接口等 -功能 接口 自动化 性能 是怎么处理的&#xff1f; -第一步&#xff1a; 进行需求分析&#xff0c;需求评审&#…...

termux 安装centos

相关链接 centos官网rootfs制作其他人提供的安装脚本centos镜像列表其他人提供的安装脚本的说明 如果想使用老版本的centos7跟着上面链接5走就行 如果想用新系统比如centos9 stream&#xff0c;就跟我来 Q:为什么要装新系统? A:旧系统太多软件已过时&#xff0c;升级费时费…...

从菜鸟程序员到高级架构师,竟然是因为这个字final

final实现原理 简介 final关键字&#xff0c;实际的含义就一句话&#xff0c;不可改变。什么是不可改变&#xff1f;就是初始化完成之后就不能再做任何的修改&#xff0c;修饰成员变量的时候&#xff0c;成员变量变成一个常数&#xff1b;修饰方法的时候&#xff0c;方法不允…...

【vulhub漏洞复现】CVE-2018-2894 Weblogic任意文件上传漏洞

一、漏洞详情影响版本weblogic 10.3.6.0、weblogic 12.1.3.0、weblogic 12.2.1.2、weblogic 12.2.1.3WebLogic是美国Oracle公司出品的一个application server&#xff0c;确切的说是一个基于JAVAEE架构的中间件&#xff0c;WebLogic是用于开发、集成、部署和管理大型分布式Web应…...

函数栈帧详解

写在前面 这个模块临近C语言的边界&#xff0c;学起来需要一定的时间&#xff0c;不过当我们知道这些知识后&#xff0c;在C语言函数这块我们看到的不仅仅是表象了&#xff0c;可以真正了解函数是怎么调用的。不过我的能力有限&#xff0c;下面的的知识若是不当&#xff0c;还…...

Spring 事务(编程式事务、声明式事务@Transactional、事务隔离级别、事务传播机制)

文章目录1. 事务的定义2. Spring 中事务的实现2.1 MySQL 中使用事务2.2 Spring 中编程式事务的实现2.3 Spring 中声明式事务2.3.1 声明式事务的实现 Transactional2.3.2 Transactional 作用域2.3.3Transactional 参数设置2.3.4 Transactional 异常情况2.3.5 Transactional 工作…...

车载技术——Window Display之surface的绘制过程与原理

一、Surface 概述 OpenGL ES/Skia定义了一组绘制接口的规范&#xff0c;为什么能够跨平台&#xff1f; 本质上需要与对应平台上的本地窗口建立连接。也就是说OpenGL ES负责输入了绘制的命令&#xff0c;但是需要一个 “画布” 来承载输出结果&#xff0c;最终展示到屏幕。这个…...

React 第五十五节 Router 中 useAsyncError的使用详解

前言 useAsyncError 是 React Router v6.4 引入的一个钩子&#xff0c;用于处理异步操作&#xff08;如数据加载&#xff09;中的错误。下面我将详细解释其用途并提供代码示例。 一、useAsyncError 用途 处理异步错误&#xff1a;捕获在 loader 或 action 中发生的异步错误替…...

Zustand 状态管理库:极简而强大的解决方案

Zustand 是一个轻量级、快速和可扩展的状态管理库&#xff0c;特别适合 React 应用。它以简洁的 API 和高效的性能解决了 Redux 等状态管理方案中的繁琐问题。 核心优势对比 基本使用指南 1. 创建 Store // store.js import create from zustandconst useStore create((set)…...

1688商品列表API与其他数据源的对接思路

将1688商品列表API与其他数据源对接时&#xff0c;需结合业务场景设计数据流转链路&#xff0c;重点关注数据格式兼容性、接口调用频率控制及数据一致性维护。以下是具体对接思路及关键技术点&#xff1a; 一、核心对接场景与目标 商品数据同步 场景&#xff1a;将1688商品信息…...

EtherNet/IP转DeviceNet协议网关详解

一&#xff0c;设备主要功能 疆鸿智能JH-DVN-EIP本产品是自主研发的一款EtherNet/IP从站功能的通讯网关。该产品主要功能是连接DeviceNet总线和EtherNet/IP网络&#xff0c;本网关连接到EtherNet/IP总线中做为从站使用&#xff0c;连接到DeviceNet总线中做为从站使用。 在自动…...

以光量子为例,详解量子获取方式

光量子技术获取量子比特可在室温下进行。该方式有望通过与名为硅光子学&#xff08;silicon photonics&#xff09;的光波导&#xff08;optical waveguide&#xff09;芯片制造技术和光纤等光通信技术相结合来实现量子计算机。量子力学中&#xff0c;光既是波又是粒子。光子本…...

Mysql8 忘记密码重置,以及问题解决

1.使用免密登录 找到配置MySQL文件&#xff0c;我的文件路径是/etc/mysql/my.cnf&#xff0c;有的人的是/etc/mysql/mysql.cnf 在里最后加入 skip-grant-tables重启MySQL服务 service mysql restartShutting down MySQL… SUCCESS! Starting MySQL… SUCCESS! 重启成功 2.登…...

【JavaSE】多线程基础学习笔记

多线程基础 -线程相关概念 程序&#xff08;Program&#xff09; 是为完成特定任务、用某种语言编写的一组指令的集合简单的说:就是我们写的代码 进程 进程是指运行中的程序&#xff0c;比如我们使用QQ&#xff0c;就启动了一个进程&#xff0c;操作系统就会为该进程分配内存…...

C# 表达式和运算符(求值顺序)

求值顺序 表达式可以由许多嵌套的子表达式构成。子表达式的求值顺序可以使表达式的最终值发生 变化。 例如&#xff0c;已知表达式3*52&#xff0c;依照子表达式的求值顺序&#xff0c;有两种可能的结果&#xff0c;如图9-3所示。 如果乘法先执行&#xff0c;结果是17。如果5…...

Leetcode33( 搜索旋转排序数组)

题目表述 整数数组 nums 按升序排列&#xff0c;数组中的值 互不相同 。 在传递给函数之前&#xff0c;nums 在预先未知的某个下标 k&#xff08;0 < k < nums.length&#xff09;上进行了 旋转&#xff0c;使数组变为 [nums[k], nums[k1], …, nums[n-1], nums[0], nu…...

Android写一个捕获全局异常的工具类

项目开发和实际运行过程中难免会遇到异常发生&#xff0c;系统提供了一个可以捕获全局异常的工具Uncaughtexceptionhandler&#xff0c;它是Thread的子类&#xff08;就是package java.lang;里线程的Thread&#xff09;。本文将利用它将设备信息、报错信息以及错误的发生时间都…...