当前位置: 首页 > news >正文

kafka消费数据太慢了,给优化下

原代码

public class KafkaConsumerDemo {public static void main(String[] args) {int numConsumers = 5; // 增加消费者的数量for (int i = 0; i < numConsumers; i++) {new Thread(new KafkaConsumerThread()).start();}}static class KafkaConsumerThread implements Runnable {@Overridepublic void run() {// 配置消费者属性Properties props = new Properties();props.put("bootstrap.servers", "114.15.78.14:9092");props.put("group.id", "test-group");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 调整消费者配置props.put("fetch.min.bytes", "1024"); // 最小获取1KB的数据props.put("fetch.max.wait.ms", "500"); // 最大等待500ms// 创建消费者实例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("test-topic"));// 消费消息; 如果消息处理逻辑允许,可以批量处理消息,而不是逐条处理while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));if (!records.isEmpty()) {for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}// 批量提交偏移量consumer.commitSync();}}}}
}  

在 Kafka 消费者代码中,有几个地方可以进行优化以提高消费速度:

增加消费者线程:你已经增加了消费者的数量,这是一个很好的开始。确保每个消费者线程都在不同的分区上消费,以实现并行处理。

调整 fetch.min.bytes 和 fetch.max.wait.ms:这两个参数控制了消费者从 Kafka
服务器获取数据的最小字节数和最大等待时间。如果你的消息较小,增加 fetch.min.bytes
可能会导致消费者等待更长时间以收集足够的数据,这可能会降低消费速度。如果你的消息较大,可以适当增加这个值。

异步提交偏移量:你当前使用的是 consumer.commitSync() 进行同步提交偏移量,这会阻塞当前线程直到提交完成。可以考虑使用
consumer.commitAsync() 异步提交,这样可以不阻塞消费线程。

增加 max.poll.records:poll
方法有一个参数可以设置一次拉取的最大记录数。如果设置得太低,可能会导致频繁的网络请求,增加延迟。

合理设置 auto.commit.interval.ms:自动提交偏移量的间隔设置得太短可能会导致不必要的性能开销,可以适当增加这个值。

使用多线程处理消息:如果你的消息处理逻辑是 CPU 密集型的,可以考虑使用线程池来并行处理消息。

监控和调优:使用 Kafka 的监控工具来观察消费者的行为,找出瓶颈并进行调优。

下面是根据上述建议优化后的代码示例:

public class KafkaConsumerDemo {public static void main(String[] args) {int numConsumers = 5; // 增加消费者的数量for (int i = 0; i < numConsumers; i++) {new Thread(new KafkaConsumerThread()).start();}}static class KafkaConsumerThread implements Runnable {@Overridepublic void run() {// 配置消费者属性Properties props = new Properties();props.put("bootstrap.servers", "114.15.78.14:9092");props.put("group.id", "test-group");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "5000"); // 增加自动提交偏移量的间隔props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 调整消费者配置props.put("fetch.min.bytes", "1"); // 减少最小获取字节数props.put("fetch.max.wait.ms", "100"); // 减少最大等待时间props.put("max.poll.records", "500"); // 增加一次拉取的最大记录数// 创建消费者实例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("test-topic"));// 消费消息while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));if (!records.isEmpty()) {processRecords(records); // 异步处理消息consumer.commitAsync(); // 异步提交偏移量}}}private void processRecords(ConsumerRecords<String, String> records) {// 异步处理消息的逻辑for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());// 这里可以添加消息处理逻辑,例如使用线程池并行处理}}}
}

相关文章:

kafka消费数据太慢了,给优化下

原代码 public class KafkaConsumerDemo {public static void main(String[] args) {int numConsumers 5; // 增加消费者的数量for (int i 0; i < numConsumers; i) {new Thread(new KafkaConsumerThread()).start();}}static class KafkaConsumerThread implements Runn…...

ASUS/华硕灵耀X双屏Pro UX8402Z 原厂Win11-22H2系统 工厂文件 带ASUS Recovery恢复

华硕工厂文件恢复系统 &#xff0c;安装结束后带隐藏分区&#xff0c;一键恢复&#xff0c;以及机器所有驱动软件。 系统版本&#xff1a;windows11 原厂系统下载网址&#xff1a;http://www.bioxt.cn 需准备一个20G以上u盘进行恢复 请注意&#xff1a;仅支持以上型号专用…...

【含开题报告+文档+PPT+源码】基于springboot的毕业设计选题管理系统

开题报告 毕业设计选题作为高校教学环节中的重要一环&#xff0c;其选题质量和管理效率直接关系到学生毕业设计的质量和毕业要求的达成。然而&#xff0c;传统的选题管理方式往往存在信息不对称、流程繁琐、效率低下等问题&#xff0c;无法满足高校教学管理现代化、信息化的需…...

fastadmin常用操作

数据库中遇到的操作 查询字段是json的某个值 $map[json_extract(goods, "$.brand_id")] (int)$params[brand_id]; //获取数据库中某个字段是json中得某个值&#xff0c;进行查询&#xff0c;goods是表中字段&#xff0c;brand_id是json中要查詢的字段。数据类型一定…...

IPguard与Ping32:谁是企业数据防泄密的最佳选择?

在当前信息化快速发展的背景下&#xff0c;企业数据安全已成为公司运营中最重要的议题之一。为了防止数据泄漏&#xff0c;越来越多的企业开始依赖专业的加密软件来进行防护。今天&#xff0c;我们对比了两款业内领先的加密软件——IPguard和Ping32&#xff0c;帮助您选择最适合…...

C++20新特性的补充讲解

C20 标志着 C 语言的一次重要更新&#xff0c;除了 Concepts、Ranges、协程等被广泛讨论的特性外&#xff0c;还有许多值得注意的改进。本文将详细探讨其他一些核心新特性&#xff0c;包括 constexpr 扩展、新增的 std::format、std::span、std::bit 操作、原子智能指针、char8…...

uni-app移动端与PC端兼容预览PDF文件

过程遇到的问题 1、如果用的是最新的版本的pdfjs的话&#xff0c;就会报Promise.withResolvers 不是一个方法的错误&#xff0c;原因是Promise.withResolvers是ES15新特性&#xff0c;想了解可参考链接&#xff0c;这里的解决方案是将插件里的涉及到Promise.withResolvers的地…...

Elman 神经网络算法详解

Elman 神经网络算法详解 一、引言 Elman 神经网络作为一种经典的递归神经网络&#xff08;RNN&#xff09;&#xff0c;在处理动态系统和时间序列数据方面具有独特的优势。它通过特殊的结构设计&#xff0c;能够有效地捕捉数据中的时间依赖关系&#xff0c;在语音识别、自然语…...

卓胜微嵌入式面试题及参考答案(2万字长文)

freeRTOS 任务是怎么调度的? 在 freeRTOS 中,任务调度主要是基于优先级的抢占式调度。每个任务都有一个优先级,系统会根据任务的优先级来决定哪个任务获得 CPU 的使用权。 当一个高优先级的任务准备运行,并且当前运行的任务优先级较低时,高优先级任务会抢占 CPU。例如,假…...

【Python】爬虫使用代理IP

1、代理池 IP 代理池可以理解为一个池子&#xff0c;里面装了很多代理IP。 池子里的IP是有生命周期的&#xff0c;它们将被定期验证&#xff0c;其中失效的将被从池子里面剔除池子里的ip是有补充渠道的&#xff0c;会有新的代理ip不断被加入池子中池子中的代理ip是可以被随机…...

金融机构-业务架构方案(高光版)

一、金融机构的设计架构 首先视角很重要,比如这样的战略视角,站得高、看得远。设计业务架构,一定要有战略高度和前瞻性。 二、什么样的架构更适合你们公司呢? 三、从架构着手,进行产品和服务创新性变革 四、具体如何设计业务架构呢?...

ubuntu内核切换network unclaimed 网卡丢失

现象一、 查网络的时候 提示只有lo network unclaimed wifi 本地局域网全部丢失 显卡丢失 解决思路 首先查看了 网卡类型 sudo lshw -C network 会显示使用的网卡 然后把这个网卡 去到realtek的官网去找驱动 驱动下下来发现debug提示 没有build目录 /libs/modules/6.8…...

【人工智能】揭秘可解释性AI(XAI):从原理到实战的终极指南

文章目录 开篇&#xff1a;AI的黑箱时代&#xff0c;你准备好揭开真相了吗&#xff1f;&#x1f50d;什么是可解释性AI&#xff08;XAI&#xff09;&#xff1f;XAI的定义XAI的分类 可解释性AI的重要性与价值建立用户信任遵循法规和伦理发现和纠正模型偏见提高模型性能促进跨领…...

小面馆叫号取餐流程 佳易王面馆米线店点餐叫号管理系统操作教程

一、概述 【软件资源文件下载在文章最后】 小面馆叫号取餐流程 佳易王面馆米线店点餐叫号管理系统操作教程 点餐软件以其实用的功能和简便的操作&#xff0c;为小型餐饮店提供了高效的点餐管理解决方案&#xff0c;提高了工作效率和服务质量 ‌点餐管理‌&#xff1a;支持电…...

图形 2.6 伽马校正

伽马校正 B站视频&#xff1a;图形 2.6 伽马校正 文章目录 伽马校正颜色空间传递函数 Gamma校正校正过程为什么需要校正&#xff1f;CRT与转换函数 为什么sRGB在Gamma 0.45空间&#xff1f; 人对亮度的敏感韦伯定律中灰值 线性工作流不在线性空间下进行渲染的问题统一到线性空…...

LLM - 计算 多模态大语言模型 的参数量(Qwen2-VL、Llama-3.1) 教程

欢迎关注我的CSDN&#xff1a;https://spike.blog.csdn.net/ 本文地址&#xff1a;https://spike.blog.csdn.net/article/details/143749468 免责声明&#xff1a;本文来源于个人知识与公开资料&#xff0c;仅用于学术交流&#xff0c;欢迎讨论&#xff0c;不支持转载。 影响 (…...

数据可视化这样做,汇报轻松拿捏(附免费好用可视化工具推荐)

一、数据可视化的定义 数据可视化是数据分析中重要的工作之一。在完成数据采集之后&#xff0c;通过可视化方式&#xff0c;将数据转化为美观且浅显易懂的统计图/表/视频&#xff0c;从而进一步解读数据背后隐藏的价值&#xff0c;这种方数据处理方式就叫做数据可视化。近些年…...

杂七杂八之基于JSON Web Token (JWT) 进行API认证和鉴权(Java版)

杂七杂八之基于JSON Web Token (JWT) 进行API认证和鉴权&#xff08;Java版&#xff09; 在现代Web应用和API开发中&#xff0c;JSON Web Token (JWT) 是一种广泛使用的认证和鉴权机制。JWT不仅简化了认证流程&#xff0c;还提供了安全的令牌传递方式&#xff0c;使得跨域认证…...

建设展示型网站企业渠道用户递达

展示型网站的主要作用便是作为企业线上门户平台、信息承载形式、拓客咨询窗口、服务/产品宣传订购、其它内容/个人形式呈现等&#xff0c;网站发展多年&#xff0c;现在依然是企业线上发展的主要工具之一且有建设的必要性。 谈及整体价格&#xff0c;自制、定制开发、SAAS系统…...

如何通过AB测试找到最适合的Yandex广告内容

想要在Yandex上找到最能吸引目标受众的广告内容&#xff0c;A/B测试是一个不可或缺的步骤。通过对比不同版本的广告&#xff0c;我们可以发现哪些元素最能引起用户的共鸣。首先&#xff0c;设计两个或多个广告版本&#xff0c;确保每个版本在标题、文案、图片等关键元素上有所不…...

测试微信模版消息推送

进入“开发接口管理”--“公众平台测试账号”&#xff0c;无需申请公众账号、可在测试账号中体验并测试微信公众平台所有高级接口。 获取access_token: 自定义模版消息&#xff1a; 关注测试号&#xff1a;扫二维码关注测试号。 发送模版消息&#xff1a; import requests da…...

python打卡day49

知识点回顾&#xff1a; 通道注意力模块复习空间注意力模块CBAM的定义 作业&#xff1a;尝试对今天的模型检查参数数目&#xff0c;并用tensorboard查看训练过程 import torch import torch.nn as nn# 定义通道注意力 class ChannelAttention(nn.Module):def __init__(self,…...

<6>-MySQL表的增删查改

目录 一&#xff0c;create&#xff08;创建表&#xff09; 二&#xff0c;retrieve&#xff08;查询表&#xff09; 1&#xff0c;select列 2&#xff0c;where条件 三&#xff0c;update&#xff08;更新表&#xff09; 四&#xff0c;delete&#xff08;删除表&#xf…...

Python:操作 Excel 折叠

💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 Python 操作 Excel 系列 读取单元格数据按行写入设置行高和列宽自动调整行高和列宽水平…...

中南大学无人机智能体的全面评估!BEDI:用于评估无人机上具身智能体的综合性基准测试

作者&#xff1a;Mingning Guo, Mengwei Wu, Jiarun He, Shaoxian Li, Haifeng Li, Chao Tao单位&#xff1a;中南大学地球科学与信息物理学院论文标题&#xff1a;BEDI: A Comprehensive Benchmark for Evaluating Embodied Agents on UAVs论文链接&#xff1a;https://arxiv.…...

全球首个30米分辨率湿地数据集(2000—2022)

数据简介 今天我们分享的数据是全球30米分辨率湿地数据集&#xff0c;包含8种湿地亚类&#xff0c;该数据以0.5X0.5的瓦片存储&#xff0c;我们整理了所有属于中国的瓦片名称与其对应省份&#xff0c;方便大家研究使用。 该数据集作为全球首个30米分辨率、覆盖2000–2022年时间…...

涂鸦T5AI手搓语音、emoji、otto机器人从入门到实战

“&#x1f916;手搓TuyaAI语音指令 &#x1f60d;秒变表情包大师&#xff0c;让萌系Otto机器人&#x1f525;玩出智能新花样&#xff01;开整&#xff01;” &#x1f916; Otto机器人 → 直接点明主体 手搓TuyaAI语音 → 强调 自主编程/自定义 语音控制&#xff08;TuyaAI…...

sipsak:SIP瑞士军刀!全参数详细教程!Kali Linux教程!

简介 sipsak 是一个面向会话初始协议 (SIP) 应用程序开发人员和管理员的小型命令行工具。它可以用于对 SIP 应用程序和设备进行一些简单的测试。 sipsak 是一款 SIP 压力和诊断实用程序。它通过 sip-uri 向服务器发送 SIP 请求&#xff0c;并检查收到的响应。它以以下模式之一…...

音视频——I2S 协议详解

I2S 协议详解 I2S (Inter-IC Sound) 协议是一种串行总线协议&#xff0c;专门用于在数字音频设备之间传输数字音频数据。它由飞利浦&#xff08;Philips&#xff09;公司开发&#xff0c;以其简单、高效和广泛的兼容性而闻名。 1. 信号线 I2S 协议通常使用三根或四根信号线&a…...

C#学习第29天:表达式树(Expression Trees)

目录 什么是表达式树&#xff1f; 核心概念 1.表达式树的构建 2. 表达式树与Lambda表达式 3.解析和访问表达式树 4.动态条件查询 表达式树的优势 1.动态构建查询 2.LINQ 提供程序支持&#xff1a; 3.性能优化 4.元数据处理 5.代码转换和重写 适用场景 代码复杂性…...