Kafka3.0.0版本——生产者如何提高吞吐量
目录
- 一、生产者提高吞吐量参数设置
- 二、产者提高吞吐量代码示例
一、生产者提高吞吐量参数设置
- batch.size:设置批次大小,默认16k
- linger.ms:设置等待时间,修改为5-100ms
- buffer.memory:设置缓冲区大小, 默认 32M
- compression.type:设置压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd
二、产者提高吞吐量代码示例
-
代码示例
package com.xz.kafka.producer;import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; /*** 生产者提高吞吐量* 1、设置批次大小,batch.size 默认 16K* 2、设置等待时间,linger.ms 默认 0* 3、设置缓冲区大小,buffer.memory 默认 32M* 4、设置压缩, compression.type 默认 none,可配置值 gzip、snappy、lz4 和 zstd* */ public class CustomProducerParameters {public static void main(String[] args) throws InterruptedException {//1、创建 kafka 生产者的配置对象Properties properties = new Properties();//2、给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");//3、指定对应的key和value的序列化类型 key.serializer value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());/*** 4、提高吞吐量* *///设置缓冲区大小properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);//设置批次大小properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);//设置等待时间 linger.msproperties.put(ProducerConfig.LINGER_MS_CONFIG, 1);//设置压缩properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");//5、创建生产者KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);//6、调用 send 方法,发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("news", "hello kafka" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null){System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());}}});Thread.sleep(2);}//7、关闭资源kafkaProducer.close();} }
-
在三台服务器上开启 Kafka 消费者
[root@localhost kafka-3.0.0]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.136.29:9092 --topic news
-
在 IDEA 中执行代码,观察 三台服务器控制台中是否接收到消息,如下图:
相关文章:

Kafka3.0.0版本——生产者如何提高吞吐量
目录 一、生产者提高吞吐量参数设置二、产者提高吞吐量代码示例 一、生产者提高吞吐量参数设置 batch.size:设置批次大小,默认16klinger.ms:设置等待时间,修改为5-100msbuffer.memory:设置缓冲区大小, 默认…...
js精度丢失的问题
1.js精度丢失的常见问题,从常见的浮点型进行计算,到位数很长的munber类型进行计算都会造成精度丢失的问题, 首先我们看一个问题: 0.1 0.2 ! 0.3 // truelet a 9007199254740992 a 1 a // true那么js为什么会出现精度丢失的问题&…...
C++ 编译预处理
在编译器对源程序进行编译时,首先要由处理器对程序文本进行预处理。预处理器提供了一组编译预处理指令和预处理操作符。预处理指令实际上不是C语言的一部分,它只是用来扩充C程序设计环境。所有的预处理指令在程序中都以“#”来引导,每一条预处…...

备战秋招 | 笔试强化22
目录 一、选择题 二、编程题 三、选择题题解 四、编程题题解 一、选择题 1、在有序双向链表中定位删除一个元素的平均时间复杂度为 A. O(1) B. O(N) C. O(logN) D. O(N*logN) 2、在一个以 h 为头指针的单循环链表中,p 指针指向链尾结点的条件是( ) A. p->ne…...
LeetCode ACM模式——哈希表篇(二)
刷题顺序及思路来源于代码随想录,网站地址:https://programmercarl.com 202. 快乐数 编写一个算法来判断一个数 n 是不是快乐数。 「快乐数」 定义为: 对于一个正整数,每一次将该数替换为它每个位置上的数字的平方和。然后重复…...
hadoop 3.1.3集群搭建 ubuntu20
相关 hyper-v安装ubuntu-20-server hyper-v建立快照 hyper-v快速创建虚拟机-导入导出虚拟机 准备 虚拟机设置 采用hyper-v方式安装ubuntu-20虚拟机和koolshare hostnameiph01192.168.66.20h02192.168.66.21h03192.168.66.22 静态IP 所有机器都需要按需设置 sudo vim /e…...

备忘录模式——撤销功能的实现
1、简介 1.1、概述 备忘录模式提供了一种状态恢复的实现机制,使得用户可以方便地回到一个特定的历史步骤。当新的状态无效或者存在问题时,可以使用暂时存储起来的备忘录将状态复原。当前很多软件都提供了撤销(Undo)操作…...

Golang 函数参数的传递方式 值传递,引用传递
基本介绍 我们在讲解函数注意事项和使用细节时,已经讲过值类型和引用类型了,这里我们再系统总结一下,因为这是重难点,值类型参数默认就是值传递,而引用类型参数默认就是引用传递。 两种传递方式(函数默认都…...

K8s影响Pod调度和Deployment
5.应用升级回滚和弹性伸缩...
透明代理和不透明代理
透明代理和不透明代理 1、透明代理(Transparent Proxy)2、不透明代理(Non-Transparent Proxy)3、工作原理4、透明代理为啥比不透明代理多一部先连接到路由再到代理服务器?5、这里路由器做了什么工作6、代理自动配置文件(Proxy Auto-Configuration file,PAC file)7、代理…...

1424. 对角线遍历 II;2369. 检查数组是否存在有效划分;1129. 颜色交替的最短路径
1424. 对角线遍历 II 核心思想:我感觉是一个技巧题,如果想到很容易做出了,想不到就很难了。首先对于一条对角线的数,其坐标ij是一样的,然后同一条对角线斜向上的j是从小到大的,知道这个就很容易做出来了。…...

【漏洞复现】Metabase 远程命令执行漏洞(CVE-2023-38646)
文章目录 前言声明一、漏洞介绍二、影响版本三、漏洞原理四、漏洞复现五、修复建议 前言 Metabase 0.46.6.1之前版本和Metabase Enterprise 1.46.6.1之前版本存在安全漏洞,未经身份认证的远程攻击者利用该漏洞可以在服务器上以运行 Metabase 服务器的权限执行任意命…...
Linux 9的repo for OVS build
源码中自带RPM包spec文件 cd /root/rpmbuild/SOURCES/openvswitch-2.17.7/rhel rpmbuild -bb openvswitch.spec ## 按提示解决,不好解决的依赖可以试试下面的repo 方法 error: File /root/rpmbuild/SOURCES/openvswitch-2.17.7.tar.gz: No such file or direct…...
DOCTYPE 是什么作用?
聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ DOCTYPE 是什么作用?⭐ 写在最后 ⭐ 专栏简介 前端入门之旅:探索Web开发的奇妙世界 记得点击上方或者右侧链接订阅本专栏哦 几何带你启航前端之旅 欢迎来到前端入门之旅!这个专栏是为那些对Web开发感兴…...

KubeSphere 3.4.0 发布:支持 K8s v1.26
2023 年 07 月 26 日,KubeSphere 开源社区激动地向大家宣布,KubeSphere 3.4.0 正式发布! 让我们先简单回顾下之前三个大版本的主要变化: KubeSphere 3.1.0 新增了“边缘计算”、“计量计费” 等功能,将 Kubernetes 从…...
自然语言文本分类模型代码
以下是一个基于PyTorch的文本分类模型的示例代码,用于将给定的文本分为多个预定义类别: import torch import torch.nn as nn import torch.nn.functional as Fclass TextClassifier(nn.Module):def __init__(self, vocab_size, embedding_dim, hidden_…...

Prometheus实现系统监控报警邮件
Prometheus实现系统监控报警邮件 简介 Prometheus将数据采集和报警分成了两个模块。报警规则配置在Prometheus Servers上, 然后发送报警信息到AlertManger,然后我们的AlertManager就来管理这些报警信息,聚合报警信息过后通过email、PagerDu…...
could not import go.etcd.io/etcd/clientv3-go
问题描述 今天在封装etcd的时候导包报错: could not import go.etcd.io/etcd/clientv3 (no required module provides package "go.etcd.io/etcd/clientv3") 问题解决: get:确保下载了client包 go get go.etcd.io/etcd/client tidy go mod tidy 本文由 mdnice 多平台…...
MySQL的行锁、表锁触发
MySQL的行锁、表锁触发 sql CREATE TABLE products ( product_id INT PRIMARY KEY, product_name VARCHAR(50), stock INT ); INSERT INTO products (product_id, product_name, stock) VALUES (1001, ‘商品A’, 50), (1002, ‘商品B’, 30), (1003, ‘商品C’, 20); 一、行锁…...

mysql-入门笔记-3
# ----------排序查询-------- # 语法 # select 字段列表 from 表名 order by 字段1 排序方式1 ,字段2 排序方式2 ; DESC 降序 ASC升序 # 1 根据年龄对公司的员工进行升序排序---默认升序-黄色提示代码冗余 select * from userTable order by age ASC ; # 2 根据入职时间,对员…...
【网络】每天掌握一个Linux命令 - iftop
在Linux系统中,iftop是网络管理的得力助手,能实时监控网络流量、连接情况等,帮助排查网络异常。接下来从多方面详细介绍它。 目录 【网络】每天掌握一个Linux命令 - iftop工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景…...
java_网络服务相关_gateway_nacos_feign区别联系
1. spring-cloud-starter-gateway 作用:作为微服务架构的网关,统一入口,处理所有外部请求。 核心能力: 路由转发(基于路径、服务名等)过滤器(鉴权、限流、日志、Header 处理)支持负…...

【JavaEE】-- HTTP
1. HTTP是什么? HTTP(全称为"超文本传输协议")是一种应用非常广泛的应用层协议,HTTP是基于TCP协议的一种应用层协议。 应用层协议:是计算机网络协议栈中最高层的协议,它定义了运行在不同主机上…...

NFT模式:数字资产确权与链游经济系统构建
NFT模式:数字资产确权与链游经济系统构建 ——从技术架构到可持续生态的范式革命 一、确权技术革新:构建可信数字资产基石 1. 区块链底层架构的进化 跨链互操作协议:基于LayerZero协议实现以太坊、Solana等公链资产互通,通过零知…...
Swagger和OpenApi的前世今生
Swagger与OpenAPI的关系演进是API标准化进程中的重要篇章,二者共同塑造了现代RESTful API的开发范式。 本期就扒一扒其技术演进的关键节点与核心逻辑: 🔄 一、起源与初创期:Swagger的诞生(2010-2014) 核心…...
Web 架构之 CDN 加速原理与落地实践
文章目录 一、思维导图二、正文内容(一)CDN 基础概念1. 定义2. 组成部分 (二)CDN 加速原理1. 请求路由2. 内容缓存3. 内容更新 (三)CDN 落地实践1. 选择 CDN 服务商2. 配置 CDN3. 集成到 Web 架构 …...
python报错No module named ‘tensorflow.keras‘
是由于不同版本的tensorflow下的keras所在的路径不同,结合所安装的tensorflow的目录结构修改from语句即可。 原语句: from tensorflow.keras.layers import Conv1D, MaxPooling1D, LSTM, Dense 修改后: from tensorflow.python.keras.lay…...
【生成模型】视频生成论文调研
工作清单 上游应用方向:控制、速度、时长、高动态、多主体驱动 类型工作基础模型WAN / WAN-VACE / HunyuanVideo控制条件轨迹控制ATI~镜头控制ReCamMaster~多主体驱动Phantom~音频驱动Let Them Talk: Audio-Driven Multi-Person Conversational Video Generation速…...

浪潮交换机配置track检测实现高速公路收费网络主备切换NQA
浪潮交换机track配置 项目背景高速网络拓扑网络情况分析通信线路收费网络路由 收费汇聚交换机相应配置收费汇聚track配置 项目背景 在实施省内一条高速公路时遇到的需求,本次涉及的主要是收费汇聚交换机的配置,浪潮网络设备在高速项目很少,通…...

Windows安装Miniconda
一、下载 https://www.anaconda.com/download/success 二、安装 三、配置镜像源 Anaconda/Miniconda pip 配置清华镜像源_anaconda配置清华源-CSDN博客 四、常用操作命令 Anaconda/Miniconda 基本操作命令_miniconda创建环境命令-CSDN博客...