当前位置: 首页 > news >正文

kafka入门(三):kafka多线程消费

kafka消费积压

如果生产者发送消息的速度过快,或者是消费者处理消息的速度太慢,那么就会有越来越多的消息无法及时消费,也就是消费积压。

消费积压时,

(1) 可以增加Topic的分区数,并且增加消费组的消费者数量,让消费者数等于分区数。
(2) 还可以使用多线程消费,提高消费速度。

kafka多线程消费的代码:

public class ThirdMultiConsumerThreadDemo {public static final String BROKER_LIST = "localhost:9092";public static final String TOPIC = "myTopic1";public static final String GROUP_ID = "group.demo";public static void main(String[] args) {Properties props = initConfig();KafkaConsumerThread consumerThread = new KafkaConsumerThread(props, TOPIC,Runtime.getRuntime().availableProcessors());consumerThread.start();}/**** kafka配置* @return*/public static Properties initConfig() {Properties props = new Properties();props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);return props;}/*** kafka消费者线程*/public static class KafkaConsumerThread extends Thread {private KafkaConsumer<String, String> kafkaConsumer;private ExecutorService executorService;private int threadNumber;public KafkaConsumerThread(Properties props, String topic, int threadNumber) {kafkaConsumer = new KafkaConsumer<>(props);kafkaConsumer.subscribe(Collections.singletonList(topic));this.threadNumber = threadNumber;executorService = new ThreadPoolExecutor(threadNumber, threadNumber,0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000),new ThreadPoolExecutor.CallerRunsPolicy());}@Overridepublic void run() {try {while (true) {ConsumerRecords<String, String> records =kafkaConsumer.poll(Duration.ofMillis(100));if (!records.isEmpty()) {executorService.submit(new RecordsHandler(records));}}} catch (Exception e) {log.error("run error", e);} finally {kafkaConsumer.close();}}}/*** 处理消息*/public static class RecordsHandler extends Thread {public final ConsumerRecords<String, String> records;public RecordsHandler(ConsumerRecords<String, String> records) {this.records = records;}@Overridepublic void run() {//处理records.for (ConsumerRecord<String, String> record : records) {System.out.println("==========>record:"+record.value() + ",thread:" + Thread.currentThread().getName());}}}}

发送消息后,使用多线程消息,运行结果如下:

==========>record:{"id":"1234","name":"lin"},thread:pool-1-thread-1
==========>record:{"id":"5678","name":"chen"},thread:pool-1-thread-2
==========>record:{"id":"91011","name":"wu"},thread:pool-1-thread-3

参考资料:

《深入理解Kafka:核心设计与实践原理》

相关文章:

kafka入门(三):kafka多线程消费

kafka消费积压 如果生产者发送消息的速度过快&#xff0c;或者是消费者处理消息的速度太慢&#xff0c;那么就会有越来越多的消息无法及时消费&#xff0c;也就是消费积压。 消费积压时&#xff0c; (1) 可以增加Topic的分区数&#xff0c;并且增加消费组的消费者数量&#…...

android通过广播打印RAM信息

通过广播打印ram相关log 参数说明&#xff1a; 广播&#xff1a;com.android.settings.action.RAM_INFO int型参数index&#xff1a;0 - 3h, 1 - 6h, 2 - 12h, 3 - 24h 代表过去时间app使用ram情况&#xff08;平均/最大占用&#xff09; Index: frameworks/base/services/cor…...

C++新经典模板与泛型编程:策略类模板

策略类模板 在前面的博文中&#xff0c;策略类SumPolicy和MinPolicy都是普通的类&#xff0c;其中包含的是一个静态成员函数模板algorithm()&#xff0c;该函数模板包含两个类型模板参数。其实&#xff0c;也可以把SumPolicy和MinPolicy类写成类模板—直接把algorithm()中的两…...

微信小程序引入Vant Weapp修改样式不起作用,使用外部样式类进行覆盖

一、引入Vant Weapp后样式问题 在项目中使用第三方组件修改css样式时,总是出现各种各样问题,修改的css样式不起作用,没有效果,效果不符合预期等。 栗子(引入一个搜索框组件)实现效果: 左侧有一个搜索文字背景为蓝色,接着跟一个搜索框 wxml <view class"container&q…...

python核酸检测 青少年电子学会等级考试 中小学生python编程等级考试二级真题答案解析2022年6月

目录 python核酸检测 一、题目要求 1、编程实现 2、输入输出...

搭建React项目,基于Vite+React+TS+ESLint+Prettier+Husky+Commitlint

基于ViteReactTSESLintPrettierHuskyCommitlint搭建React项目 node: 20.10.0 一、创建项目 安装包管理器pnpm npm i pnpm -g基于Vite创建项目 pnpm create vitelatest web-gis-react --template react-ts进入项目目录安装依赖 $ cd web-gis-react $ pnpm i启动项目 $ pnpm…...

ChatGPT在国内的使用限制,国内的ChatGPT替代工具

人工智能技术的发展不仅改变了我们的生活方式&#xff0c;也在各行各业发挥着越来越重要的作用。ChatGPT&#xff08;Generative Pre-trained Transformer&#xff09;作为一种先进的自然语言处理模型&#xff0c;由OpenAI推出&#xff0c;其在生成人类般流畅对话方面表现出色。…...

服务器如何保证数据安全_Maizyun

服务器如何保证数据安全 在当今的数字化时代&#xff0c;数据安全已经成为企业和社会组织必须面对的重要问题。服务器作为存储和处理大量数据的核心组件&#xff0c;必须采取有效的措施来确保数据的安全。本文将探讨服务器如何保证数据安全。 一、访问控制和身份认证 访问控…...

sql2005日志文件过大如何清理

由于安装的时候没有计划好空间&#xff0c;默认装在系统盘&#xff0c;而且又没有做自动备份、截断事务日志等&#xff0c;很快LDF文件就达到十几G&#xff0c;或者几十G &#xff0c;此时就不得不处理了。 备份和计划就不说了&#xff0c;现在就说下怎么把它先删除吧&#xf…...

Linux--学习记录(2)

解压命令&#xff1a; gzip命令&#xff1a; 参数&#xff1a; -k&#xff1a;待压缩的文件会保留下来&#xff0c;生成一个新的压缩文件-d&#xff1a;解压压缩文件语法&#xff1a; gzip -k pathname(待压缩的文件夹名)gzip -kd name.gz&#xff08;待解压的压缩包名&#x…...

字符串函数`strlen`、`strcpy`、`strcmp`、`strstr`、`strcat`的使用以及模拟实现

文章目录 &#x1f680;前言&#x1f680;库函数strlen✈️strlen的模拟实现 &#x1f680;库函数strcpy✈️strcpy的模拟实现 &#x1f680;strcmp✈️strcmp的模拟实现 &#x1f680;strstr✈️strstr的模拟实现 &#x1f680;strcat✈️strcat的模拟实现 &#x1f680;前言 …...

插入排序与希尔排序(C语言实现)

1.插入排序 由上面的动图可以知道插入排序的逻辑就是从第一个元素开始往后遍历&#xff0c;如果找到比前一个元素小的&#xff08;或者大的&#xff09;就往前排&#xff0c;所以插入排序的每一次遍历都会保证前面的数据是有序的&#xff0c;接下类用代码进行讲解。 我们这里传…...

【微软技术栈】与其他.NET语言的互操作性 (C++/CLI)

本文内容 使用 C# 索引器实现 C# 的 is 和 as 关键字实现 C# 的 lock 关键字 本节中的主题介绍如何在 Visual C 中创建程序集&#xff0c;这些程序集使用或提供以 C# 或 Visual Basic 编写的程序集的功能。 1、使用 C# 索引器 Visual C 不包含索引器&#xff1b;它具有索引…...

TCPUDP使用场景讨论

将链路从TCP改为UDP会对通信链路产生以下影响和注意事项&#xff1a; 可靠性&#xff1a;UDP是无连接的协议&#xff0c;与TCP相比&#xff0c;它不提供可靠性保证和重传机制。因此&#xff0c;当将链路从TCP改为UDP时&#xff0c;通信的可靠性会降低。如果在通信过程中丢失了U…...

C#最小二乘法线性回归

文章目录 SimpleRegressionMultipleRegression MathNet系列&#xff1a;矩阵生成 \quad 矩阵计算 LinearRegression是MathNet的线性回归模块&#xff0c;主要包括SimpleRegression和MultipleRegression这两个静态类&#xff0c;前者提供了最小二乘法的线性拟合&#xff0c;后…...

ULAM公链第九十六期工作总结

迈入12月&#xff0c;接下来就是雪花&#xff0c;圣诞&#xff0c;新年和更好的我们&#xff01;愿生活不拥挤&#xff0c;笑容不必刻意&#xff0c;愿一切美好如期而至&#xff01; 2023年11月01日—2023年12月01日关于ULAM这期工作汇报&#xff0c;我们通过技术板块&#xff…...

基于Echarts的大数据可视化模板:智慧交通管理

目录 引言智慧交通管理的重要性ECharts在智慧交通中的作用智慧交通管理系统架构系统总体架构数据收集与处理Echarts与大数据可视化Echarts库以及其在大数据可视化领域的应用优势开发过程和所选设计方案模板如何满足管理的特定需求模板功能与特性深入解析模板提供的各项功能模板…...

C#-快速剖析文件和流,并使用

目录 一、概述 二、文件系统 1、检查驱动器信息 2、Path 3、文件和文件夹 三、流 1、FileStream 2、StreamWriter与StreamReader 3、BinaryWriter与BinaryReader 一、概述 文件&#xff0c;具有永久存储及特定顺序的字节组成的一个有序、具有名称的集合&#xff1b; …...

【Linux】如何在Ubuntu 20.04上安装PostgreSQL

介绍 PostgreSQL或Postgres是一个关系数据库管理系统&#xff0c;提供SQL查询语言的实现。它符合标准&#xff0c;具有许多高级功能&#xff0c;如可靠的事务和无读锁的并发性。 本指南演示了如何在Ubuntu 20.04服务器上快速启动和运行Postgres&#xff0c;从安装PostgreSQL到…...

IT程序员面试题目汇总及答案-计算机面试

程序员面试题目汇总及答案-计算机面试 问题1:请你描述一下你在过去的工作中遇到的一个技术难题,你是如何解决的? 答案1:在我之前的工作中,我遇到了一个涉及大数据处理的问题。由于数据量巨大,传统的处理方法无法在规定的时间内完成。我最后采用了一种分布式计算的方法,…...

阿里云ACP云计算备考笔记 (5)——弹性伸缩

目录 第一章 概述 第二章 弹性伸缩简介 1、弹性伸缩 2、垂直伸缩 3、优势 4、应用场景 ① 无规律的业务量波动 ② 有规律的业务量波动 ③ 无明显业务量波动 ④ 混合型业务 ⑤ 消息通知 ⑥ 生命周期挂钩 ⑦ 自定义方式 ⑧ 滚的升级 5、使用限制 第三章 主要定义 …...

LLMs 系列实操科普(1)

写在前面&#xff1a; 本期内容我们继续 Andrej Karpathy 的《How I use LLMs》讲座内容&#xff0c;原视频时长 ~130 分钟&#xff0c;以实操演示主流的一些 LLMs 的使用&#xff0c;由于涉及到实操&#xff0c;实际上并不适合以文字整理&#xff0c;但还是决定尽量整理一份笔…...

Chromium 136 编译指南 Windows篇:depot_tools 配置与源码获取(二)

引言 工欲善其事&#xff0c;必先利其器。在完成了 Visual Studio 2022 和 Windows SDK 的安装后&#xff0c;我们即将接触到 Chromium 开发生态中最核心的工具——depot_tools。这个由 Google 精心打造的工具集&#xff0c;就像是连接开发者与 Chromium 庞大代码库的智能桥梁…...

安卓基础(Java 和 Gradle 版本)

1. 设置项目的 JDK 版本 方法1&#xff1a;通过 Project Structure File → Project Structure... (或按 CtrlAltShiftS) 左侧选择 SDK Location 在 Gradle Settings 部分&#xff0c;设置 Gradle JDK 方法2&#xff1a;通过 Settings File → Settings... (或 CtrlAltS)…...

rknn toolkit2搭建和推理

安装Miniconda Miniconda - Anaconda Miniconda 选择一个 新的 版本 &#xff0c;不用和RKNN的python版本保持一致 使用 ./xxx.sh进行安装 下面配置一下载源 # 清华大学源&#xff08;最常用&#xff09; conda config --add channels https://mirrors.tuna.tsinghua.edu.cn…...

客户案例 | 短视频点播企业海外视频加速与成本优化:MediaPackage+Cloudfront 技术重构实践

01技术背景与业务挑战 某短视频点播企业深耕国内用户市场&#xff0c;但其后台应用系统部署于东南亚印尼 IDC 机房。 随着业务规模扩大&#xff0c;传统架构已较难满足当前企业发展的需求&#xff0c;企业面临着三重挑战&#xff1a; ① 业务&#xff1a;国内用户访问海外服…...

ArcPy扩展模块的使用(3)

管理工程项目 arcpy.mp模块允许用户管理布局、地图、报表、文件夹连接、视图等工程项目。例如&#xff0c;可以更新、修复或替换图层数据源&#xff0c;修改图层的符号系统&#xff0c;甚至自动在线执行共享要托管在组织中的工程项。 以下代码展示了如何更新图层的数据源&…...

从实验室到产业:IndexTTS 在六大核心场景的落地实践

一、内容创作&#xff1a;重构数字内容生产范式 在短视频创作领域&#xff0c;IndexTTS 的语音克隆技术彻底改变了配音流程。B 站 UP 主通过 5 秒参考音频即可克隆出郭老师音色&#xff0c;生成的 “各位吴彦祖们大家好” 语音相似度达 97%&#xff0c;单条视频播放量突破百万…...

如何把工业通信协议转换成http websocket

1.现状 工业通信协议多数工作在边缘设备上&#xff0c;比如&#xff1a;PLC、IOT盒子等。上层业务系统需要根据不同的工业协议做对应开发&#xff0c;当设备上用的是modbus从站时&#xff0c;采集设备数据需要开发modbus主站&#xff1b;当设备上用的是西门子PN协议时&#xf…...

2025.6.9总结(利与弊)

凡事都有两面性。在大厂上班也不例外。今天找开发定位问题&#xff0c;从一个接口人不断溯源到另一个 接口人。有时候&#xff0c;不知道是谁的责任填。将工作内容分的很细&#xff0c;每个人负责其中的一小块。我清楚的意识到&#xff0c;自己就是个可以随时替换的螺丝钉&…...