Kafka系列之如何提高消费者消费速度
前言
在实际开发过程中,如果使用Kafka处理超大数据量(千万级、亿级)的场景,Kafka消费者的消费速度可能决定系统性能瓶颈。
实现方案
为了提高消费者的消费速度,我们可以采取以下措施:
- 将主题的分区数量增大,如 20,通过
concurrency将消费者的消费线程数增大到 10(2个pod),提高消息处理的并发能力。 - 将每次批量拉取消息的数量
max.poll.records增大到 500,提高单次处理消息的数量。 - 将消息切分成批次,将单个批次的数据处理业务逻辑放进线程池中异步进行,提高并发处理消息的速度。
- 将异步线程池的拒绝模式调整为 CallerRunsPolicy,这个配置非常重要。当线程池的任务队列已满且所有线程都在忙碌时,新的任务将由提交任务的线程(即调用者线程)来执行。否则在消息量特别大的情况下,很可能会因为线程池任务队列满了而丢失数据。
- 将异步线程池的队列容量设置为 0,这样意味着所有任务必须立即由线程池中的线程来处理,减少在队列中的等待时间。
- 在数据上报的时候进行幂等性验证,防止重复上报数据。
@Component
public class OrderConsumer {@Resource(name = "execThreadPool")private ThreadPoolTaskExecutor execThreadPool;@KafkaListener(id = "record_consumer",topics = "record",groupId = "g_record_consumer",concurrency = "10",properties = {"max.poll.interval.ms:300000", "max.poll.records:500"})public void consume(ConsumerRecords<String, String> records, Acknowledgment ack) {execThreadPool.submit(()-> {// 业务逻辑});ack.acknowledge();}}
ThreadPoolTaskExecutor 是 Spring 框架提供的一个线程池实现,用于管理和执行多线程任务。它是 TaskExecutor 接口的实现,提供了在 Spring 应用程序中创建和配置线程池的便捷方式。
ThreadPoolTaskExecutor主要特点:
-
线程池配置: ThreadPoolTaskExecutor 允许你配置核心线程数、最大线程数、队列容量等线程池属性。
-
线程创建和销毁: 它会根据任务的需求自动创建和销毁线程,避免不必要的线程创建和销毁开销。
-
线程复用: 线程池中的线程可以被复用,从而减少线程创建的开销。
-
队列管理: 当线程池达到最大线程数时,新任务会被放入队列中等待执行。
-
拒绝策略: 当线程池已满并且队列也已满时,可以配置拒绝策略来处理新任务的方式。
RejectedExecutionHandler 是 Java 线程池的一个重要接口,用于定义当线程池已满并且无法接受新任务时,如何处理被拒绝的任务。当线程池的队列和线程都已满,新任务就会被拒绝执行,这时就会使用 RejectedExecutionHandler 来处理这些被拒绝的任务。
在 Java 中,有几种内置的 RejectedExecutionHandler 实现可供选择,每种实现都有不同的拒绝策略:
AbortPolicy(默认策略): 这是默认的拒绝策略,它会抛出一个 RejectedExecutionException 异常,表示任务被拒绝执行。
CallerRunsPolicy: 当线程池已满时,将任务返回给提交任务的调用者(Caller)。这意味着提交任务的线程会尝试执行被拒绝的任务。
DiscardPolicy: 这个策略会默默地丢弃被拒绝的任务,不会产生任何异常。
DiscardOldestPolicy: 这个策略会丢弃队列中最老的任务,然后尝试将新任务添加到队列中。除了这些内置的策略,你还可以实现自定义的 RejectedExecutionHandler 接口,以定义特定于你应用程序需求的拒绝策略。你可以根据业务需求来决定拒绝策略,比如记录日志、通知管理员、重试等。
@Configuration
public class ThreadPoolConfig {@Beanprivate ThreadPoolTaskExecutor execThreadPool() {ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();pool.setCorePoolSize(50); // 核心线程数pool.setMaxPoolSize(10000); // 最大线程数pool.setQueueCapacity(0); // 等待队列sizepool.setKeepAliveSeconds(60); // 线程最大空闲存活时间pool.setWaitForTasksToCompleteOnShutdown(true);pool.setAwaitTerminationSeconds(60); // 程序shutdown时最多等60秒钟让现存任务结束pool.setRejectedExecutionHandler(new CallerRunsPolicy()); // 拒绝策略return pool;}
}
通过以上方案,我们可以提高消费侧的TPS,同时杜绝重复上报的现象,极大提高数据准确性和用户体验。
相关文章:
Kafka系列之如何提高消费者消费速度
前言 在实际开发过程中,如果使用Kafka处理超大数据量(千万级、亿级)的场景,Kafka消费者的消费速度可能决定系统性能瓶颈。 实现方案 为了提高消费者的消费速度,我们可以采取以下措施: 将主题的分区数量增大,如 20&…...
mac安装Whisper
Whisper 官方git https://github.com/openai/whisper?tabreadme-ov-file 基本上参考官方的安装流程 pip3 install -U openai-whisper pip3 install githttps://github.com/openai/whisper.git pip3 install --upgrade --no-deps --force-reinstall githttps://github.com/…...
Linux:进程概述(什么是进程、进程控制块PCB、并发与并行、进程的状态、进程的相关命令)
进程概述 (1)What(什么是进程) 程序:磁盘上的可执行文件,它占用磁盘、是一个静态概念 进程:程序执行之后的状态,占用CPU和内存,是一个动态概念;每一个进程都有一个对应的进程控制块…...
Unity UGUI 之 坐标转换
本文仅作学习笔记与交流,不作任何商业用途 本文包括但不限于unity官方手册,唐老狮,麦扣教程知识,引用会标记,如有不足还请斧正 本文在发布时间选用unity 2022.3.8稳定版本,请注意分别 前置知识:…...
使用 uPlot 在 Vue 中创建交互式图表
本文由ScriptEcho平台提供技术支持 项目地址:传送门 使用 uPlot 在 Vue 中创建交互式图表 应用场景介绍 uPlot 是一个轻量级、高性能的图表库,适用于创建各种交互式图表。它具有丰富的功能,包括可自定义的轴、网格、刻度和交互性。本篇博…...
SpringBoot 项目配置文件注释乱码的问题解决方案
一、问题描述 在项目的配置文件中,我们写了一些注释,如下所示: 但是再次打开注释会变成乱码,如下所示: 那么如何解决呢? 二、解决方案 1. 点击” File→Setting" 2. 搜索“File Encodings”, 将框…...
TTS如何正确读AI缩写、金额和数字
案例:Tell me whats AI(a i), you need pay $186.30, your card Number is 1 2 3, your work Number is 5 6 7 8...
python基础知识点(蓝桥杯python科目个人复习计划75)
第一题:ip补充 题目描述: 小蓝的ip地址为192.168.*.21,其中*是一个数字,请问这个数字最大可能是多少? import os import sys# 请在此输入您的代码 print("255") 第二题:出现最多的字符 题目描…...
小技巧:如何在已知PDF密码情况下去掉PDF的密码保护
第一步,用Edge打开你的pdf,输入密码进去 第二步,点击打印 第三步,选择导出PDF,选择彩印 第四步,选择导出位置,导出成功后打开发现没有密码限制了!...
Java泛型的介绍和基本使用
什么是泛型 泛型就是将类型参数化,比如定义了一个栈,你必须在定义之前声明这个栈中存放的数据的类型,是int也好是double或者其他的引用数据类型也好,定义好了之后这个栈就无法用来存放其他类型的数据。如果这时候我们想要使用这…...
【C++】动态内存管理与模版
目录 1、关键字new: 1、用法: 2、理解: 3、与malloc的相同与不同: 1、相同: 2、不同: 2、模版初阶: 1、函数模版: 1、概念: 2、关键字:template&…...
MongoDB - 组合聚合阶段:$group、$match、$limit、$sort、$skip、$project、$count
文章目录 1. $group2. $group-> $project2.1 $group2.2 $group-> $project2.3 SpringBoot 整合 MongoDB 3. $match-> $group -> $match3.1 $match3.2 $match-> $group3.3 $match-> $group-> $match3.4 SpringBoot 整合 MongoDB 4. $match-> $group->…...
vue element-ui日期控件传参
前端:Vue element-ui <el-form-item label"过期时间" :rules"[ { required: true, message: 请选择过期时间, trigger: blur }]"><el-date-picker v-model"form.expireTime" type"date" format"yyyy-MM-dd&…...
MacOS安装SDKMan管理Java版本
文章目录 1 简介2 安装与卸载2.1 安装2.2 卸载 3 使用3.1 查看其他工具:支持 Ant, Maven 等3.2 查看Java版本3.3 安装Java,加上相关的版本3.4 设置Java版本(全局)3.5 只在当前窗口生效3.6 卸载1 默认环境无法卸载 4 jdk安装的位置5 与IDEA集成参考 1 简介…...
【网络安全的神秘世界】文件包含漏洞
🌝博客主页:泥菩萨 💖专栏:Linux探索之旅 | 网络安全的神秘世界 | 专接本 | 每天学会一个渗透测试工具 一、概述 文件包含:重复使用的函数写在文件里,需要使用某个函数时直接调用此文件,而无需再…...
并发编程--volatile
1.什么是volatile volatile是 轻 量 级 的 synchronized,它在多 处 理器开 发 中保 证 了共享 变 量的 “ 可 见 性 ” 。可 见 性的意思是当一个 线 程 修改一个共享变 量 时 ,另外一个 线 程能 读 到 这 个修改的 值 。如果 volatile 变 量修 饰 符使用…...
记录unraid docker更新的域名
背景:级联 一、安装内容 unraid更新docker,之前一直失败,修改网络后可以进行安装。 二、查看域名 查看域名,发现是走github的,怪不得有一些docker无法正常更新 三、解决方法 更改代理,这里为unraid的…...
SpringCloud+Vue3多对多,多表联查
♥️作者:小宋1021 🤵♂️个人主页:小宋1021主页 ♥️坚持分析平时学习到的项目以及学习到的软件开发知识,和大家一起努力呀!!! 🎈🎈加油! 加油!…...
麒麟系统信创改造
麒麟系统信创改造 一、查看操作系统架构下载相应的依赖,压缩包1、查看Linux系统架构、CPU(1)uname -m(2)lscpu(3)cat /proc/cpuinfo(4)arch(5)getconf LONG_BIT(6)dmidecode2、根据Linux系统架构、CPU的差异进行下载相关依赖,看第二项二、以下是根据本系统的aarc…...
【Android】ListView和RecyclerView知识总结
文章目录 ListView步骤适配器AdpterArrayAdapterSimpleAdapterBaseAdpter效率问题 RecyclerView具体实现不同布局形式的设置横向滚动瀑布流网格 点击事件 ListView ListView 是 Android 中的一种视图组件,用于显示可滚动的垂直列表。每个列表项都是一个视图对象&…...
后进先出(LIFO)详解
LIFO 是 Last In, First Out 的缩写,中文译为后进先出。这是一种数据结构的工作原则,类似于一摞盘子或一叠书本: 最后放进去的元素最先出来 -想象往筒状容器里放盘子: (1)你放进的最后一个盘子(…...
19c补丁后oracle属主变化,导致不能识别磁盘组
补丁后服务器重启,数据库再次无法启动 ORA01017: invalid username/password; logon denied Oracle 19c 在打上 19.23 或以上补丁版本后,存在与用户组权限相关的问题。具体表现为,Oracle 实例的运行用户(oracle)和集…...
【OSG学习笔记】Day 18: 碰撞检测与物理交互
物理引擎(Physics Engine) 物理引擎 是一种通过计算机模拟物理规律(如力学、碰撞、重力、流体动力学等)的软件工具或库。 它的核心目标是在虚拟环境中逼真地模拟物体的运动和交互,广泛应用于 游戏开发、动画制作、虚…...
Java 8 Stream API 入门到实践详解
一、告别 for 循环! 传统痛点: Java 8 之前,集合操作离不开冗长的 for 循环和匿名类。例如,过滤列表中的偶数: List<Integer> list Arrays.asList(1, 2, 3, 4, 5); List<Integer> evens new ArrayList…...
测试markdown--肇兴
day1: 1、去程:7:04 --11:32高铁 高铁右转上售票大厅2楼,穿过候车厅下一楼,上大巴车 ¥10/人 **2、到达:**12点多到达寨子,买门票,美团/抖音:¥78人 3、中饭&a…...
五年级数学知识边界总结思考-下册
目录 一、背景二、过程1.观察物体小学五年级下册“观察物体”知识点详解:由来、作用与意义**一、知识点核心内容****二、知识点的由来:从生活实践到数学抽象****三、知识的作用:解决实际问题的工具****四、学习的意义:培养核心素养…...
Python爬虫(二):爬虫完整流程
爬虫完整流程详解(7大核心步骤实战技巧) 一、爬虫完整工作流程 以下是爬虫开发的完整流程,我将结合具体技术点和实战经验展开说明: 1. 目标分析与前期准备 网站技术分析: 使用浏览器开发者工具(F12&…...
Android第十三次面试总结(四大 组件基础)
Activity生命周期和四大启动模式详解 一、Activity 生命周期 Activity 的生命周期由一系列回调方法组成,用于管理其创建、可见性、焦点和销毁过程。以下是核心方法及其调用时机: onCreate() 调用时机:Activity 首次创建时调用。…...
九天毕昇深度学习平台 | 如何安装库?
pip install 库名 -i https://pypi.tuna.tsinghua.edu.cn/simple --user 举个例子: 报错 ModuleNotFoundError: No module named torch 那么我需要安装 torch pip install torch -i https://pypi.tuna.tsinghua.edu.cn/simple --user pip install 库名&#x…...
Modbus RTU与Modbus TCP详解指南
目录 1. Modbus协议基础 1.1 什么是Modbus? 1.2 Modbus协议历史 1.3 Modbus协议族 1.4 Modbus通信模型 🎭 主从架构 🔄 请求响应模式 2. Modbus RTU详解 2.1 RTU是什么? 2.2 RTU物理层 🔌 连接方式 ⚡ 通信参数 2.3 RTU数据帧格式 📦 帧结构详解 🔍…...
