当前位置: 首页 > article >正文

云原生时代 Kafka 深度实践:05性能调优与场景实战

5.1 性能调优全攻略

Producer调优

批量发送与延迟发送

通过调整batch.sizelinger.ms参数提升吞吐量:

props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);  // 默认16KB
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);      // 等待10ms以积累更多消息
  • batch.size:批量发送的字节数,达到该大小或linger.ms超时即发送。
  • linger.ms:消息在缓冲区的最大停留时间,即使未达到batch.size也会发送。
压缩算法选择

启用压缩可显著减少网络传输和磁盘存储开销:

props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");  // 可选:gzip、snappy、lz4、zstd
  • Snappy:压缩速度快,压缩比适中。
  • LZ4:压缩比和速度平衡,推荐大多数场景。
  • ZSTD:压缩比最高,但CPU开销较大。

Broker调优

内存与线程配置

调整Broker的网络和IO线程池大小:

# server.properties
num.network.threads=8    # 网络处理线程数,默认3
num.io.threads=16        # IO处理线程数,默认8
socket.send.buffer.bytes=102400  # 发送缓冲区大小,默认100KB
socket.receive.buffer.bytes=102400  # 接收缓冲区大小,默认100KB
磁盘与日志管理

优化日志存储和清理策略:

# 日志段滚动大小,默认1GB
log.segment.bytes=536870912  # 日志保留时间,默认7天
log.retention.hours=168  # 日志清理策略:delete(按时间删除)或compact(按key压缩)
log.cleanup.policy=delete  # 后台日志清理线程数
log.cleaner.threads=2  

Consumer调优

并行消费与反序列化优化

增加Consumer实例数或使用多线程消费:

// 增加Consumer Group中的Consumer数量,实现分区级并行
KafkaConsumer<String, String> consumer1 = new KafkaConsumer<>(props);
KafkaConsumer<String, String> consumer2 = new KafkaConsumer<>(props);
consumer1.subscribe(Collections.singletonList("topic"));
consumer2.subscribe(Collections.singletonList("topic"));// 或在单个Consumer中使用多线程处理消息
ExecutorService executor = Executors.newFixedThreadPool(10);
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {executor.submit(() -> process(record));}
}

使用高效的序列化格式(如Protobuf替代JSON):

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProtobufSerializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ProtobufDeserializer.class.getName());

5.2 实战场景模拟

场景一:高并发日志采集(每秒10W+消息写入)

架构设计
  • Topic配置:创建100个分区的Topic,利用多分区并行写入提升吞吐量。
  • Producer配置
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);    // 32KB批次
    props.put(ProducerConfig.LINGER_MS_CONFIG, 5);        // 5ms延迟
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
    props.put(ProducerConfig.ACKS_CONFIG, "1");           // 牺牲部分可靠性换取高吞吐量
    
  • Broker配置
    num.partitions=100                   # 默认分区数
    log.flush.interval.messages=100000   # 每10W条消息刷盘一次
    log.flush.interval.ms=10000          # 每10秒刷盘一次
    
性能测试

使用kafka-producer-perf-test.sh工具测试写入性能:

bin/kafka-producer-perf-test.sh --topic log-topic --num-records 10000000 \--record-size 100 --throughput -1 --producer-props bootstrap.servers=localhost:9092

场景二:实时数据分析(电商实时大屏)

数据流设计
  1. 数据源:用户浏览、下单、支付等行为数据实时写入Kafka。
  2. 流处理:Kafka Streams计算实时指标(如UV、GMV、转化率):
KStream<String, String> userEvents = builder.stream("user-events-topic");
KTable<Windowed<String>, Long> hourlyUV = userEvents.selectKey((key, value) -> value.getUserId()).groupByKey().windowedBy(TimeWindows.of(Duration.ofHours(1))).count(Materialized.as("hourly-uv-store"));hourlyUV.toStream().map((windowedKey, count) -> new KeyValue<>(windowedKey.key(), count)).to("hourly-uv-topic", Produced.with(Serdes.String(), Serdes.Long()));
  1. 结果存储:计算结果写入Redis,供前端大屏实时查询。
性能优化
  • Kafka配置
    # 减少消息延迟
    queued.max.requests=1000
    replica.lag.time.max.ms=30000
    
  • Kafka Streams配置
    config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024);  // 10MB缓存
    config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);  // 1秒提交一次
    

场景三:金融级数据一致性(事务消息实现分布式事务)

架构设计
  1. 订单服务:接收用户订单请求,发送订单创建消息到Kafka。
  2. 库存服务:消费订单消息,扣减库存,发送库存扣减结果。
  3. 支付服务:消费库存扣减结果,处理支付,发送支付结果。
事务消息实现
// 初始化事务
producer.initTransactions();try {producer.beginTransaction();// 发送订单创建消息producer.send(new ProducerRecord<>("order-topic", orderId, order));// 执行本地事务(如更新订单状态)orderService.updateOrderStatus(orderId, "PROCESSING");// 提交事务producer.commitTransaction();
} catch (Exception e) {// 回滚事务producer.abortTransaction();
}
幂等性保障

消费端通过唯一ID去重,确保同一消息只处理一次:

@KafkaListener(topics = "inventory-topic")
public void processInventory(InventoryMessage message) {// 检查是否已处理过if (inventoryService.isProcessed(message.getId())) {return;}// 处理库存扣减inventoryService.decreaseStock(message.getProductId(), message.getQuantity());// 标记为已处理inventoryService.markAsProcessed(message.getId());
}

相关文章:

云原生时代 Kafka 深度实践:05性能调优与场景实战

5.1 性能调优全攻略 Producer调优 批量发送与延迟发送 通过调整batch.size和linger.ms参数提升吞吐量&#xff1a; props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 默认16KB props.put(ProducerConfig.LINGER_MS_CONFIG, 10); // 等待10ms以积累更多消息ba…...

Ubuntu安装Docker命令清单(以20.04为例)

在你虚拟机上完成Ubuntu的下载后打开终端&#xff01;&#xff01;&#xff01; Ubuntu安装Docker终极命令清单&#xff08;以20.04为例&#xff09; # 1. 卸载旧版本&#xff08;全新系统可跳过&#xff09; sudo apt-get remove docker docker-engine docker.io containerd …...

使用 Python 制作 GIF 动图,并打包为 EXE 可执行程序

文章目录 成品百度网盘下载&#x1f3ac; 使用 Python 制作 GIF 动图&#xff0c;并打包为 EXE 可执行程序&#xff08;含图形界面&#xff09;&#x1f9f0; 环境准备&#x1f4bb; 功能预览&#x1f9d1;‍&#x1f4bb; 完整代码&#xff08;图形界面 功能&#xff09;如何…...

HarmonyOS Next 弹窗系列教程(2)

HarmonyOS Next 弹窗系列教程&#xff08;2&#xff09; 上一章节我们讲了自定义弹出框 (openCustomDialog)&#xff0c;那对于一些简单的业务场景&#xff0c;不一定需要都是自定义&#xff0c;也可以使用 HarmonyOS Next 内置的一些弹窗效果。比如&#xff1a; 名称描述不依…...

Ubuntu 18.04 上源码安装 protobuf 3.7.0

&#x1f527; 1️⃣ 安装依赖 sudo apt update sudo apt install -y autoconf automake libtool curl make g unzip&#x1f4e5; 2️⃣ 下载源码 cd ~ git clone https://github.com/protocolbuffers/protobuf.git cd protobuf git checkout v3.7.0⚙️ 3️⃣ 编译 & 安…...

中小企业搭建网站选择虚拟主机还是云服务器?华为云有话说

这是一个很常见的问题&#xff0c;许多小企业在搭建网站时都会面临这个选择。虚拟主机和云服务器都有各自的优缺点&#xff0c;需要根据自己的需求和预算来决定。 虚拟主机是指将一台物理服务器分割成多个虚拟空间&#xff0c;每个空间都可以运行一个网站。虚拟主机的优点是价格…...

使用 HTML + JavaScript 在高德地图上实现物流轨迹跟踪系统

在电商行业蓬勃发展的今天&#xff0c;物流信息查询已成为人们日常生活中的重要需求。本文将详细介绍如何基于高德地图 API 利用 HTML JavaScript 实现物流轨迹跟踪系统的开发。 效果演示 项目概述 本项目主要包含以下核心功能&#xff1a; 地图初始化与展示运单号查询功能…...

19-项目部署(Linux)

Linux是一套免费使用和自由传播的操作系统。说到操作系统&#xff0c;大家比较熟知的应该就是Windows和MacOS操作系统&#xff0c;我们今天所学习的Linux也是一款操作系统。 我们作为javaEE开发工程师&#xff0c;将来在企业中开发时会涉及到很多的数据库、中间件等技术&#…...

html基础01:前端基础知识学习

html基础01&#xff1a;前端基础知识学习 1.个人建立打造 -- 之前知识的小总结1.1个人简历展示1.2简历信息填写页面 1.个人建立打造 – 之前知识的小总结 1.1个人简历展示 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8&qu…...

Golang学习之旅

Golang学习之旅&#xff1a;初探Go语言的奥秘 在当今这个快速发展的技术时代&#xff0c;编程语言层出不穷&#xff0c;每一种都有其独特的魅力和适用场景。作为一名对技术充满热情的开发者&#xff0c;我一直在探索新的知识&#xff0c;以提升自己的编程技能。最近&#xff0…...

【RoadRunner】自动驾驶模拟3D场景构建 | 软件简介与视角控制

&#x1f4af; 欢迎光临清流君的博客小天地&#xff0c;这里是我分享技术与心得的温馨角落 &#x1f4af; &#x1f525; 个人主页:【清流君】&#x1f525; &#x1f4da; 系列专栏: 运动控制 | 决策规划 | 机器人数值优化 &#x1f4da; &#x1f31f;始终保持好奇心&…...

基于RK3576+FPGA芯片构建的CODESYS软PLC Linux实时系统方案,支持6T AI算力

基于RK3576芯片构建的CODESYS软PLC Linux实时系统方案&#xff0c;结合了异构计算架构与工业实时控制技术&#xff0c;主要特点如下&#xff1a; 一、硬件架构设计 ‌异构多核协同‌ ‌Cortex-A72四核‌&#xff08;2.3GHz&#xff09;&#xff1a;处理运动轨迹规划、AI视觉等…...

鸿蒙OSUniApp复杂表单与动态验证实践:打造高效的移动端表单解决方案#三方框架 #Uniapp

UniApp复杂表单与动态验证实践&#xff1a;打造高效的移动端表单解决方案 引言 在移动应用开发中&#xff0c;表单处理一直是一个既常见又具有挑战性的任务。随着HarmonyOS生态的蓬勃发展&#xff0c;越来越多的开发者开始关注跨平台解决方案。本文将深入探讨如何使用UniApp框…...

在linux系统上搭建git服务器(ssh协议)

1.在windows上生成RSA密钥对 ssh-keygen -t rsa -b 2048 -C"git用户名/邮箱地址" 命令执行后会在 C:\Users\${windows登录账户}\.ssh 目录下生成密钥对 其中 id_rsa 为私钥&#xff0c;id_rsa.pub 为公钥 2.在 linux 系统上登记公钥 vim ~/.ssh/authorized_keys…...

适配器模式:让不兼容接口协同工作

文章目录 1. 适配器模式概述2. 适配器模式的分类2.1 类适配器2.2 对象适配器 3. 适配器模式的结构4. C#实现适配器模式4.1 对象适配器实现4.2 类适配器实现 5. 适配器模式的实际应用场景5.1 第三方库集成5.2 遗留系统集成5.3 系统重构与升级5.4 跨平台开发 6. 类适配器与对象适…...

NodeJS全栈开发面试题讲解——P12高性能场景题

12.1 设计一个高并发点赞接口&#xff0c;如何优化性能&#xff1f; 设计要点&#xff1a; 问题&#xff1a; 点赞操作是高频写操作&#xff0c;数据库直接写可能成为瓶颈。 优化方案&#xff1a; 缓存计数 异步落库 点赞先写缓存&#xff08;Redis Hash / Sorted Set&…...

DDP与FSDP:分布式训练技术全解析

DDP与FSDP:分布式训练技术全解析 DDP(Distributed Data Parallel)和 FSDP(Fully Sharded Data Parallel)均为用于深度学习模型训练的分布式训练技术,二者借助多 GPU 或多节点来提升训练速度。 1. DDP(Distributed Data Parallel) 实现原理 数据并行:把相同的模型复…...

【Spring AI 1.0.0】Spring AI 1.0.0框架快速入门(1)——Chat Client API

Spring AI框架快速入门 一、前言二、前期准备2.1 运行环境2.2 maven配置2.3 api-key申请 三、Chat Client API3.1 导入pom依赖3.2 配置application.properties文件3.3 创建 ChatClient3.3.1 使用自动配置的 ChatClient.Builder3.3.2 使用多个聊天模型 3.4 ChatClient请求3.5 Ch…...

【笔记】在 MSYS2(MINGW64)中正确安装 Rust

#工作记录 1. 环境信息 Windows系统: MSYS2 MINGW64当前时间: 2025年6月1日Rust 版本: rustc 1.87.0 (17067e9ac 2025-05-09) (Rev2, Built by MSYS2 project) 2. 安装步骤 步骤 1: 更新系统包数据库并升级已安装的包 首先&#xff0c;确保我们的 MSYS2 系统是最新状态。打…...

从汇编的角度揭秘C++引用,豁然开朗

C中的引用是指已有对象的别名&#xff0c;可以通过该别名访问并修改被引用的对象。那么其背后的原理是什么呢&#xff1f;引用是否会带来额外的开销呢&#xff1f;我们从一段代码入手&#xff0c;来分析一下引用的本质。 #include <stdio.h> int main() {int a 10;int …...

设计模式系列(07):建造者模式(Builder)

本文为设计模式系列第7篇&#xff0c;聚焦创建型模式中的建造者模式&#xff0c;涵盖定义、原理、实际业务场景、优缺点、最佳实践及详细代码示例&#xff0c;适合系统学习与实战应用。 目录 1. 模式概述2. 使用场景3. 优缺点分析4. 实际应用案例5. 结构与UML类图6. 代码示例7…...

Maven 项目中集成数据库文档生成工具

在 Maven 项目中&#xff0c;可以通过集成 数据库文档生成工具&#xff08;如 screw-maven-plugin、mybatis-generator 或 liquibase&#xff09;来自动生成数据库文档。以下是使用 screw-maven-plugin&#xff08;推荐&#xff09;的完整配置步骤&#xff1a; 1. 添加插件配置…...

聊聊Tomato Architecture

序 本文主要研究一下Tomato Architecture Clean/Onion/Hexagonal/Ports&Adapters Architectures Clean Architecture clean architecture定义了四层结构&#xff0c;最内层是entities(enterprise business rules)&#xff0c;再往外是use cases(application business ru…...

小白的进阶之路系列之十二----人工智能从初步到精通pytorch综合运用的讲解第五部分

在本笔记本中,我们将针对Fashion-MNIST数据集训练LeNet-5的变体。Fashion-MNIST是一组描绘各种服装的图像瓦片,有十个类别标签表明所描绘的服装类型。 # PyTorch model and training necessities import torch import torch.nn as nn import torch.nn.functional as F impor…...

Java并发编程实战 Day 6:Future与异步编程模型

【Java并发编程实战 Day 6】Future与异步编程模型 在今天的课程中&#xff0c;我们将深入学习Java中的Future与异步编程模型。这是为期30天的"Java并发编程实战"系列的第6天。 理论基础 Future接口 Future接口是Java提供的用于表示异步计算的结果。它提供了以下方…...

.NET Core 中预防跨网站请求伪造 (XSRFCSRF) 攻击

.NET Core 中预防跨网站请求伪造 (XSRF/CSRF) 攻击 在如今的网络环境中&#xff0c;安全问题一直是开发者们不可忽视的重要方面。跨网站请求伪造&#xff08;Cross-Site Request Forgery&#xff0c;简称 CSRF&#xff09;就是一种常见且具有威胁性的网络攻击方式。攻击者通过…...

MFC Resource.h 文件详解与修改指南

MFC Resource.h 文件详解与修改指南 Resource.h 是 Visual C 和 MFC 项目中用于集中管理资源标识符&#xff08;Resource ID&#xff09;的头文件。它由 Visual Studio 的资源编辑器自动生成并维护&#xff0c;也可以手动编辑。理解并合理使用该文件对于管理对话框、控件、菜单…...

2025年06月03日Github流行趋势

项目名称&#xff1a;onlook 项目地址url&#xff1a;https://github.com/onlook-dev/onlook项目语言&#xff1a;TypeScript历史star数&#xff1a;12871今日star数&#xff1a;624项目维护者&#xff1a;Kitenite, drfarrell, spartan-vutrannguyen, apps/devin-ai-integrati…...

AI视频编码器(0.4.3) 调试训练bug——使用timm SoftTargetCrossEntropy时出现loss inf

检查过程 进入loss内部实现: > /root/miniconda3/envs/UMT2/lib/python3.9/site-packages/timm/loss/cross_entropy.py(35...

【数据分析】基于Cox模型的R语言实现生存分析与生物标志物风险评估

禁止商业或二改转载,仅供自学使用,侵权必究,如需截取部分内容请后台联系作者! 文章目录 介绍加载R包数据下载导入数据数据预处理生存分析画图输出图片其他标记物的分析总结系统信息介绍 分析生存数据与多种生物标志物之间的关系。它通过Cox比例风险模型来评估不同生物标志物…...