三、Kafka生产者
目录
- 3.1 生产者消息发送流程
- 3.1.1 发送原理
- 3.2 异步发送 API
- 3.3 同步发送数据
- 3.4 生产者分区
- 3.4.1 kafka分区的好处
- 3.4.2 生产者发送消息的分区策略
- 3.4.3 自定义分区器
- 3.5 生产者如何提高吞吐量
- 3.6 数据可靠性
3.1 生产者消息发送流程
3.1.1 发送原理
3.2 异步发送 API
3.3 同步发送数据
3.4 生产者分区
3.4.1 kafka分区的好处
便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果- 提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。
3.4.2 生产者发送消息的分区策略

3.4.3 自定义分区器
1、需求:
例如我们实现一个分区器实现,发送过来的数据中如果包含 atguigu,就发往 0 号分区,不包含 atguigu,就发往 1 号分区
2、定义类实现 Partitioner 接口,重写 partition()方法。
public class MyPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 获取数据 atguigu helloString msgValues = value.toString();int partition;if (msgValues.contains("atguigu")){partition = 0;}else {partition = 1;}return partition;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}
3、使用分区器的方法,在生产者的配置中添加分区器参数
public class CustomProducerCallbackPartitions {public static void main(String[] args) throws InterruptedException {// 0 配置Properties properties = new Properties();// 连接集群 bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.239.11:9092");// 指定对应的key和value的序列化类型 key.serializer
// properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 关联自定义分区器properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.atguigu.kafka.producer.MyPartitioner");// 1 创建kafka生产者对象// "" helloKafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);// 2 发送数据for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("first", "atguigu" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("主题: " + metadata.topic() + " 分区: " + metadata.partition());}}});Thread.sleep(2);}// 3 关闭资源kafkaProducer.close();}
}

3.5 生产者如何提高吞吐量
- 分批次发送消息
- 对生产者消息采用压缩
四个重要参数:

public class CustomProducerParameters {public static void main(String[] args) {// 0 配置Properties properties = new Properties();// 连接kafka集群properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.239.11:9092");// 序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 缓冲区大小properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);// 批次大小properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);// linger.msproperties.put(ProducerConfig.LINGER_MS_CONFIG, 1);// 压缩properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");// 1 创建生产者KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);// 2 发送数据for (int i = 0; i < 50; i++) {kafkaProducer.send(new ProducerRecord<>("first","atguigu"+i));}// 3 关闭资源kafkaProducer.close();}
}
3.6 数据可靠性
相关文章:
三、Kafka生产者
目录 3.1 生产者消息发送流程3.1.1 发送原理 3.2 异步发送 API3.3 同步发送数据3.4 生产者分区3.4.1 kafka分区的好处3.4.2 生产者发送消息的分区策略3.4.3 自定义分区器 3.5 生产者如何提高吞吐量3.6 数据可靠性 3.1 生产者消息发送流程 3.1.1 发送原理 3.2 异步发送 API 3…...
【SA8295P 源码分析】19 - QNX Host NFS 文件系统配置
【SA8295P 源码分析】19 - QNX Host NFS 文件系统配置 一、NFS Server二、NFS Client三、NFS 相关的文件及目录四、将文件放入QNX 文件系统中五、编译下载验证系列文章汇总见:《【SA8295P 源码分析】00 - 系列文章链接汇总》 本文链接:《【SA8295P 源码分析】19 - QNX Host N…...
JRE、JDK、JVM及JIT之间有什么不同?_java基础知识总结
当涉及Java编程和执行时,以下术语具有不同的含义: 1.JRE (Java Runtime Environment) JRE是Java运行时环境的缩写。它是一个包含用于在计算机上运行Java应用程序所需的组件集合。JRE包括了以下几个主要部分: Java虚拟机(JVM):用…...
sqlite3数据库的实现
sqlite3代码实现数据库的插入、删除、修改、退出功能 #include <head.h> #include <sqlite3.h> #include <unistd.h> int do_insert(sqlite3 *db); int do_delete(sqlite3 *db); int do_update(sqlite3 *db);int main(int argc, const char *argv[]) {sqlit…...
c#设计模式-结构型模式 之 桥接模式
前言 桥接模式是一种设计模式,它将抽象与实现分离,使它们可以独立变化。这种模式涉及到一个接口作为桥梁,使实体类的功能独立于接口实现类。这两种类型的类可以结构化改变而互不影响。 桥接模式的主要目的是通过将实现和抽象分离,…...
【Vue-Router】导航守卫
前置守卫 main.ts import { createApp } from vue import App from ./App.vue import {router} from ./router // import 引入 import ElementPlus from element-plus import element-plus/dist/index.css const app createApp(App) app.use(router) // use 注入 ElementPlu…...
07无监督学习——降维
1.降维的概述 维数灾难(Curse of Dimensionality):通常是指在涉及到向量的计算的问题中,随着维数的增加,计算量呈指数倍增长的一种现象。 1.1什么是降维? 1.降维(Dimensionality Reduction)是将训练数据中的样本(实例)从高维空间转换到低维…...
系列七、IOC操作bean管理(xml自动装配)
一、概述 自动装配是根据指定规则(属性名称或者属性类型),Spring自动将匹配的属性值进行注入。 二、分类 xml自动装配分为按照属性名称自动装配(byName)和按照属性类型自动装配(byType)。 2.1…...
01- vdom 和模板编译源码
组件渲染的过程 template --> ast --> render --> vDom --> 真实的Dom --> 页面 Runtime-Compiler和Runtime-Only的区别 - 简书 编译步骤 模板编译是Vue中比较核心的一部分。关于 Vue 编译原理这块的整体逻辑主要分三个部分,也可以说是分三步&am…...
C++入门知识点——解决C语言不足
😶🌫️Take your time ! 😶🌫️ 💥个人主页:🔥🔥🔥大魔王🔥🔥🔥 💥代码仓库:🔥🔥魔…...
探秘分布式大数据:融合专业洞见,燃起趣味火花,启迪玄幻思维
文章目录 一 数据导论二 大数据的诞生三 大数据概论3.1 大数据的5V特征3.2 大数据的工作核心 四 大数据软件生态4.1 数据存储软件4.2 数据计算软件4.3 数据传输软件 五 Apache Hadoop概述5.1 Apache Hadoop框架5.2 Hadoop的功能5.3 Hadoop的发展5.4 Hadoop发行版本 一 数据导论…...
什么是 SPI,和API有什么区别?
面试回答 Java 中区分 API 和 SPI,通俗的讲:API 和 SPI 都是相对的概念,他们的差别只在语义上,API 直接被应用开发人员使用,SPI 被框架扩展人员使用。 API Application Programming Interface 大多数情况下ÿ…...
python3 安装clickhouse_sqlalchemy(greenlet) 失败
环境信息: centos7操作系统,python3.8 执行pip3 install clickhouse_sqlalchemy或者pip3 install greenlet报以下报错: Command "/opt/python3.6.10-customized/bin/python3.6 -u -c "import setuptools, tokenize;file/tmp/pip-in…...
五款拿来就能用的炫酷表白代码
「作者主页」:士别三日wyx 「作者简介」:CSDN top100、阿里云博客专家、华为云享专家、网络安全领域优质创作者 「推荐专栏」:小白零基础《Python入门到精通》 五款炫酷表白代码 1、无限弹窗表白2、做我女朋友好吗,不同意就关机3、…...
Springboot 封装整活 Mybatis 动态查询条件SQL自动组装拼接
前言 ps:最近在参与3100保卫战,战况很激烈,刚刚打完仗,来更新一下之前写了一半的博客。 该篇针对日常写查询的时候,那些动态条件sql 做个简单的封装,自动生成(抛砖引玉,搞个小玩具&a…...
宝塔部署Java+Vue前后端分离项目经验总结
前言 之前部署服务器都是在Linux环境下自己一点一点安装软件,听说用宝塔傻瓜式部署更快,这次浅浅尝试了一把。 确实简单! 1、 买服务器 咋买服务器略,记得服务器装系统就装 Cent OS 7系列即可,我装的7.6。 2、创建…...
【公告】停止更新
CSDN 博客的限制太多了。阅读体验也非常差。后续将不再 CSDN 上更新。 逐步迁移到掘金和个人博客。 欢迎关注 掘金:0xforee 个人博客:0xforee’s blog...
AutoHotKey+VSCode开发扩展推荐
原来一直用的大众推荐的SciTeAHK版,最近发现VSCode更舒服一些,有几个必装的扩展推荐一下: AutoHotkey Plus 请注意不是AutoHotkey Plus Plus。如果在扩展商店里搜索会有两个,一个是Plus,一个是Plus Plus。我选择Pllus&…...
了解 JSON 格式
一、JSON 基础 JSON(JavaScript Object Notation,JavaScript 对象表示法)是一种轻量级的数据交换格式,JSON 的设计目的是使得数据的存储和交换变得简单。 JSON 易于人的阅读和书写,同时也易于机器的解析和生成。尽管 J…...
[RDMA] 高性能异步的消息传递和RPC :Accelio
1. Introduce Accelio是一个高性能异步的可靠消息传递和RPC库,能优化硬件加速。 RDMA和TCP / IP传输被实现,并且其他的传输也能被实现,如共享存储器可以利用这个高效和方便的API的优点。Accelio 是 Mellanox 公司的RDMA中间件,用…...
手把手教你用modf()和fmod()解决C语言浮点数计算中的常见坑
深入解析C语言浮点数计算:modf()与fmod()的实战应用 浮点数计算在C语言开发中无处不在,从游戏物理引擎到嵌入式传感器数据处理,精确的浮点运算直接关系到程序行为的正确性。然而,许多开发者第一次遭遇浮点数计算误差时,…...
如何突破内容访问限制?5类开源工具的技术解析与场景适配
如何突破内容访问限制?5类开源工具的技术解析与场景适配 【免费下载链接】bypass-paywalls-chrome-clean 项目地址: https://gitcode.com/GitHub_Trending/by/bypass-paywalls-chrome-clean 在信息爆炸的数字时代,优质内容往往被各种访问限制所阻…...
多模态扩展实验:OpenClaw+Qwen3-32B处理图片描述生成
多模态扩展实验:OpenClawQwen3-32B处理图片描述生成 1. 实验背景与动机 最近在探索如何将OpenClaw的自动化能力扩展到视觉领域。作为一个长期依赖文本交互的框架,OpenClaw能否结合多模态大模型处理图像任务?这引发了我的兴趣。恰好手头有台…...
3步解锁iOS激活锁:Applera1n工具完整使用指南
3步解锁iOS激活锁:Applera1n工具完整使用指南 【免费下载链接】applera1n icloud bypass for ios 15-16 项目地址: https://gitcode.com/gh_mirrors/ap/applera1n 当你面对一部显示"激活锁"界面的iPhone,反复输入Apple ID却始终无法进入…...
基于三菱PLC与MCGS组态的农田智能灌溉系统说明(两万字)
基于三菱PLC农田灌溉 包含说明一万 和MCGS组态农田智能灌溉系统说明一万前阵子回豫东老家帮我叔打理那三亩秋月梨果园,那浇地给我整得怀疑人生——三伏天顶着三十七八度的太阳,扛着铁锹跑遍地头开电磁阀,中午热得头晕就算了,晚上还…...
钓鱼邮件应急响应清单:从样本分析到全网封堵的5个关键步骤
钓鱼邮件应急响应实战指南:从识别到处置的闭环管理 钓鱼邮件如同数字时代的隐形陷阱,每年造成数以亿计的经济损失。作为IT运维人员,我们需要建立一套快速响应机制,在攻击者得手前切断威胁链条。本文将分享一套经过实战检验的响应框…...
学术PDF处理神器:OpenClaw+GLM-4.7-Flash自动提取关键结论
学术PDF处理神器:OpenClawGLM-4.7-Flash自动提取关键结论 1. 为什么需要自动化文献处理? 作为一名经常需要阅读大量学术文献的研究者,我发现自己花费在整理文献上的时间甚至超过了实际阅读时间。每次下载几十篇PDF后,手动提取目…...
Nacos集群启动时,那个神秘的cluster.conf文件到底是怎么被找到和监控的?
Nacos集群启动时cluster.conf文件的寻址与监控机制深度解析 从一次集群配置失效事件说起 上周深夜,我们的分布式系统监控平台突然发出警报——Nacos集群中的三个节点相继失联。紧急排查时发现,明明已经更新了cluster.conf文件新增了两个节点,…...
【Frida Android】实战篇:Frida-Trace 进阶追踪——JNI 函数参数捕获与修改
1. 为什么需要捕获JNI函数参数? 在Android安全分析和逆向工程中,JNI函数往往是关键突破口。很多应用会把核心逻辑放在native层实现,比如加密算法、授权验证、敏感数据处理等。单纯Hook Java层方法可能无法触及这些关键逻辑,这时候…...
LumiPixel优化升级:如何利用Z-Image模型生成更细腻的像素人像
LumiPixel优化升级:如何利用Z-Image模型生成更细腻的像素人像 1. 引言:像素艺术的复兴与挑战 像素艺术作为一种独特的数字艺术形式,近年来在游戏、NFT和数字设计领域迎来复兴。然而传统像素创作面临两大核心挑战: 细节表现力不…...
