kafka入门(三):kafka多线程消费
kafka消费积压
如果生产者发送消息的速度过快,或者是消费者处理消息的速度太慢,那么就会有越来越多的消息无法及时消费,也就是消费积压。
消费积压时,
(1) 可以增加Topic的分区数,并且增加消费组的消费者数量,让消费者数等于分区数。
(2) 还可以使用多线程消费,提高消费速度。
kafka多线程消费的代码:
public class ThirdMultiConsumerThreadDemo {public static final String BROKER_LIST = "localhost:9092";public static final String TOPIC = "myTopic1";public static final String GROUP_ID = "group.demo";public static void main(String[] args) {Properties props = initConfig();KafkaConsumerThread consumerThread = new KafkaConsumerThread(props, TOPIC,Runtime.getRuntime().availableProcessors());consumerThread.start();}/**** kafka配置* @return*/public static Properties initConfig() {Properties props = new Properties();props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);return props;}/*** kafka消费者线程*/public static class KafkaConsumerThread extends Thread {private KafkaConsumer<String, String> kafkaConsumer;private ExecutorService executorService;private int threadNumber;public KafkaConsumerThread(Properties props, String topic, int threadNumber) {kafkaConsumer = new KafkaConsumer<>(props);kafkaConsumer.subscribe(Collections.singletonList(topic));this.threadNumber = threadNumber;executorService = new ThreadPoolExecutor(threadNumber, threadNumber,0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000),new ThreadPoolExecutor.CallerRunsPolicy());}@Overridepublic void run() {try {while (true) {ConsumerRecords<String, String> records =kafkaConsumer.poll(Duration.ofMillis(100));if (!records.isEmpty()) {executorService.submit(new RecordsHandler(records));}}} catch (Exception e) {log.error("run error", e);} finally {kafkaConsumer.close();}}}/*** 处理消息*/public static class RecordsHandler extends Thread {public final ConsumerRecords<String, String> records;public RecordsHandler(ConsumerRecords<String, String> records) {this.records = records;}@Overridepublic void run() {//处理records.for (ConsumerRecord<String, String> record : records) {System.out.println("==========>record:"+record.value() + ",thread:" + Thread.currentThread().getName());}}}}
发送消息后,使用多线程消息,运行结果如下:
==========>record:{"id":"1234","name":"lin"},thread:pool-1-thread-1
==========>record:{"id":"5678","name":"chen"},thread:pool-1-thread-2
==========>record:{"id":"91011","name":"wu"},thread:pool-1-thread-3
参考资料:
《深入理解Kafka:核心设计与实践原理》
相关文章:
kafka入门(三):kafka多线程消费
kafka消费积压 如果生产者发送消息的速度过快,或者是消费者处理消息的速度太慢,那么就会有越来越多的消息无法及时消费,也就是消费积压。 消费积压时, (1) 可以增加Topic的分区数,并且增加消费组的消费者数量&#…...
android通过广播打印RAM信息
通过广播打印ram相关log 参数说明: 广播:com.android.settings.action.RAM_INFO int型参数index:0 - 3h, 1 - 6h, 2 - 12h, 3 - 24h 代表过去时间app使用ram情况(平均/最大占用) Index: frameworks/base/services/cor…...
C++新经典模板与泛型编程:策略类模板
策略类模板 在前面的博文中,策略类SumPolicy和MinPolicy都是普通的类,其中包含的是一个静态成员函数模板algorithm(),该函数模板包含两个类型模板参数。其实,也可以把SumPolicy和MinPolicy类写成类模板—直接把algorithm()中的两…...
微信小程序引入Vant Weapp修改样式不起作用,使用外部样式类进行覆盖
一、引入Vant Weapp后样式问题 在项目中使用第三方组件修改css样式时,总是出现各种各样问题,修改的css样式不起作用,没有效果,效果不符合预期等。 栗子(引入一个搜索框组件)实现效果: 左侧有一个搜索文字背景为蓝色,接着跟一个搜索框 wxml <view class"container&q…...
python核酸检测 青少年电子学会等级考试 中小学生python编程等级考试二级真题答案解析2022年6月
目录 python核酸检测 一、题目要求 1、编程实现 2、输入输出...
搭建React项目,基于Vite+React+TS+ESLint+Prettier+Husky+Commitlint
基于ViteReactTSESLintPrettierHuskyCommitlint搭建React项目 node: 20.10.0 一、创建项目 安装包管理器pnpm npm i pnpm -g基于Vite创建项目 pnpm create vitelatest web-gis-react --template react-ts进入项目目录安装依赖 $ cd web-gis-react $ pnpm i启动项目 $ pnpm…...
ChatGPT在国内的使用限制,国内的ChatGPT替代工具
人工智能技术的发展不仅改变了我们的生活方式,也在各行各业发挥着越来越重要的作用。ChatGPT(Generative Pre-trained Transformer)作为一种先进的自然语言处理模型,由OpenAI推出,其在生成人类般流畅对话方面表现出色。…...
服务器如何保证数据安全_Maizyun
服务器如何保证数据安全 在当今的数字化时代,数据安全已经成为企业和社会组织必须面对的重要问题。服务器作为存储和处理大量数据的核心组件,必须采取有效的措施来确保数据的安全。本文将探讨服务器如何保证数据安全。 一、访问控制和身份认证 访问控…...
sql2005日志文件过大如何清理
由于安装的时候没有计划好空间,默认装在系统盘,而且又没有做自动备份、截断事务日志等,很快LDF文件就达到十几G,或者几十G ,此时就不得不处理了。 备份和计划就不说了,现在就说下怎么把它先删除吧…...
Linux--学习记录(2)
解压命令: gzip命令: 参数: -k:待压缩的文件会保留下来,生成一个新的压缩文件-d:解压压缩文件语法: gzip -k pathname(待压缩的文件夹名)gzip -kd name.gz(待解压的压缩包名&#x…...
字符串函数`strlen`、`strcpy`、`strcmp`、`strstr`、`strcat`的使用以及模拟实现
文章目录 🚀前言🚀库函数strlen✈️strlen的模拟实现 🚀库函数strcpy✈️strcpy的模拟实现 🚀strcmp✈️strcmp的模拟实现 🚀strstr✈️strstr的模拟实现 🚀strcat✈️strcat的模拟实现 🚀前言 …...
插入排序与希尔排序(C语言实现)
1.插入排序 由上面的动图可以知道插入排序的逻辑就是从第一个元素开始往后遍历,如果找到比前一个元素小的(或者大的)就往前排,所以插入排序的每一次遍历都会保证前面的数据是有序的,接下类用代码进行讲解。 我们这里传…...
【微软技术栈】与其他.NET语言的互操作性 (C++/CLI)
本文内容 使用 C# 索引器实现 C# 的 is 和 as 关键字实现 C# 的 lock 关键字 本节中的主题介绍如何在 Visual C 中创建程序集,这些程序集使用或提供以 C# 或 Visual Basic 编写的程序集的功能。 1、使用 C# 索引器 Visual C 不包含索引器;它具有索引…...
TCPUDP使用场景讨论
将链路从TCP改为UDP会对通信链路产生以下影响和注意事项: 可靠性:UDP是无连接的协议,与TCP相比,它不提供可靠性保证和重传机制。因此,当将链路从TCP改为UDP时,通信的可靠性会降低。如果在通信过程中丢失了U…...
C#最小二乘法线性回归
文章目录 SimpleRegressionMultipleRegression MathNet系列:矩阵生成 \quad 矩阵计算 LinearRegression是MathNet的线性回归模块,主要包括SimpleRegression和MultipleRegression这两个静态类,前者提供了最小二乘法的线性拟合,后…...
ULAM公链第九十六期工作总结
迈入12月,接下来就是雪花,圣诞,新年和更好的我们!愿生活不拥挤,笑容不必刻意,愿一切美好如期而至! 2023年11月01日—2023年12月01日关于ULAM这期工作汇报,我们通过技术板块ÿ…...
基于Echarts的大数据可视化模板:智慧交通管理
目录 引言智慧交通管理的重要性ECharts在智慧交通中的作用智慧交通管理系统架构系统总体架构数据收集与处理Echarts与大数据可视化Echarts库以及其在大数据可视化领域的应用优势开发过程和所选设计方案模板如何满足管理的特定需求模板功能与特性深入解析模板提供的各项功能模板…...
C#-快速剖析文件和流,并使用
目录 一、概述 二、文件系统 1、检查驱动器信息 2、Path 3、文件和文件夹 三、流 1、FileStream 2、StreamWriter与StreamReader 3、BinaryWriter与BinaryReader 一、概述 文件,具有永久存储及特定顺序的字节组成的一个有序、具有名称的集合; …...
【Linux】如何在Ubuntu 20.04上安装PostgreSQL
介绍 PostgreSQL或Postgres是一个关系数据库管理系统,提供SQL查询语言的实现。它符合标准,具有许多高级功能,如可靠的事务和无读锁的并发性。 本指南演示了如何在Ubuntu 20.04服务器上快速启动和运行Postgres,从安装PostgreSQL到…...
IT程序员面试题目汇总及答案-计算机面试
程序员面试题目汇总及答案-计算机面试 问题1:请你描述一下你在过去的工作中遇到的一个技术难题,你是如何解决的? 答案1:在我之前的工作中,我遇到了一个涉及大数据处理的问题。由于数据量巨大,传统的处理方法无法在规定的时间内完成。我最后采用了一种分布式计算的方法,…...
ComfyUI-WanVideoWrapper显存优化终极指南:让8GB显卡也能流畅生成高清视频
ComfyUI-WanVideoWrapper显存优化终极指南:让8GB显卡也能流畅生成高清视频 【免费下载链接】ComfyUI-WanVideoWrapper 项目地址: https://gitcode.com/GitHub_Trending/co/ComfyUI-WanVideoWrapper 还在为视频生成时的显存不足而烦恼吗?ComfyUI-…...
AI时代:重塑核心竞争力
一、企业的核心竞争力重塑未来企业的护城河是AI构建的流程,而不是的数据。 过去我们说数据是石油,但在 LLM 时代,通用数据的价值在被快速拉平。而公司内部独特的、经过千锤百炼的工作流程、决策逻辑、操作手册,这些才是无法被轻易…...
Qwen3-0.6B-FP8与STM32开发联动:生成嵌入式系统控制逻辑伪代码
Qwen3-0.6B-FP8与STM32开发联动:生成嵌入式系统控制逻辑伪代码 1. 引言 如果你是一位嵌入式开发者,或者正在学习STM32,下面这个场景你一定不陌生:拿到一个传感器模块,比如温湿度传感器,想用它来控制一个风…...
Qwen3.5-9B镜像免配置实战:Docker化迁移与端口映射最佳实践
Qwen3.5-9B镜像免配置实战:Docker化迁移与端口映射最佳实践 1. 项目概述 Qwen3.5-9B是一个拥有90亿参数的开源大语言模型,具备强大的逻辑推理、代码生成和多轮对话能力。该模型支持多模态理解(图文输入)和长上下文处理ÿ…...
NVIDIA Profile Inspector实战手册:从参数调试到显卡性能全面优化
NVIDIA Profile Inspector实战手册:从参数调试到显卡性能全面优化 【免费下载链接】nvidiaProfileInspector 项目地址: https://gitcode.com/gh_mirrors/nv/nvidiaProfileInspector 在PC硬件优化领域,专业工具与普通用户之间往往存在技术鸿沟。N…...
Calypso vs PC-DMIS:三坐标两大软件脱机编程实战对比与选型指南
Calypso vs PC-DMIS:三坐标测量软件脱机编程深度对比与实战选型策略 在精密制造领域,三坐标测量机(CMM)的脱机编程能力直接决定了检测效率与资源利用率。作为行业两大标杆,蔡司Calypso与海克斯康PC-DMIS在用户界面设计、编程逻辑、仿真验证等…...
RobotStudio新手必看:5分钟搞定夹取工件程序(附完整代码)
RobotStudio零基础实战:从夹取工件到高效编程的完整指南 第一次打开RobotStudio时,面对复杂的界面和陌生的术语,很多新手会感到无从下手。但别担心,掌握几个核心概念和操作步骤,你就能快速实现基础的夹取工件功能。本文…...
告别漫长等待:用EDGS(3DGS优化版)快速重建你的3D场景(附Ubuntu 22.04+PyTorch 2.0配置)
极速三维重建实战:EDGS技术解析与Ubuntu高效配置指南 当传统3D高斯喷溅技术(3DGS)还在以小时为单位计算训练时间时,EDGS已经将这一过程压缩到令人惊讶的分钟级。这就像从绿皮火车换乘复兴号高铁的体验升级——不仅速度更快&#x…...
解码汽车ECU的“健康档案”:剖析吉利Basetech五大运行周期计数器(OCC)的协同诊断逻辑
1. 汽车ECU的“健康档案”是什么? 当你去医院体检时,医生会查看你的病历记录、化验报告和近期症状,综合判断你的健康状况。汽车ECU(电子控制单元)也有类似的"健康档案",它就是吉利Basetech技术中…...
嵌入式Linux C++开发框架AppKit实战解析
1. 嵌入式Linux C开发框架AppKit深度解析在嵌入式Linux开发领域,C开发者经常面临一个尴尬局面:标准库功能有限,而ROS等框架又过于庞大。AppKit框架正是为解决这一痛点而生,它提供了恰到好处的中间层抽象。我在多个工业控制项目中实…...
