Kafka生产者相关
windows中kafka集群部署示例-CSDN博客
先启动集群或者单机也OK

引入依赖
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.9.0</version></dependency>
关于主题创建
理论来讲创建主题(Topic是Kafka的内部操作),无论生产者或是消费者都不能主动创建主题.
没有主题就不能生产数据
但是往往看到生产者可以创建主题,原因是kafka的内部自动创建主题机制,当生产者中有个管理员,没有该主题就会自动创建
auto.create.topics.enable 默认是true 如果改成false 那么生产者就无法创建了
因此主题是kafka的自动创建主题的机制来实现的,而非生产者创建主题
生产者利用kafka自动创建主题的机制来创建主题...........................................................................
/*** @author hrui* @date 2025/2/26 12:53*/
public class AdminTopicTest {public static void main(String[] args) {Map<String,Object> confMap=new HashMap<>();//例如我的集群是9091 9092 9093 这里无需关心具体连接哪个端口 随意一个端口confMap.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9091");//管理员对象Admin admin=Admin.create(confMap);/*** 构建主题的三个参数* 第一个参数:主题名称* 第二个参数:分区数量* 第三个参数:副本数量(short类型)*/NewTopic newTopic=new NewTopic("test1",1, (short) 1);//创建主题CreateTopicsResult topics = admin.createTopics(Arrays.asList(newTopic));//关闭管理者对象admin.close();}
}
NewTopic("test1",这里可以传个Map);可以自定义主题分区副本策略 不指定就默认
生产者流程图

生产者大致代码
public class KafkaProducerTest {public static void main(String[] args) {//创建配置对象Map<String,Object> configMap=new HashMap<>();//如果是集群随意指定一个configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//对Key Value进行序列化操作configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//创建生产者对象KafkaProducer<String,String> kafkaProducer=new KafkaProducer<>(configMap);for(int i=0;i<10;i++){//key的作用是通过某种算法,放到topic的某个分区中ProducerRecord<String, String> record = new ProducerRecord<>("test", "key1","hello kafka" + i);kafkaProducer.send(record);}//关闭生产者对象kafkaProducer.close();}
}
生产者拦截器
可以对照流程图,看下生产者拦截器在什么位置,一般是对Key value的整理转换,对生产的数据做统一规范化处理,可以配置多个
可以点进去



大致就是这么个过程

遍历 拦截器 并调用每个拦截器的onSend方法
可以看到每个拦截器都是ProducerInterceptor类型

自定义生产者拦截器
自定义一个类实现ProducerInterceptor
在创建生产者时候添加拦截器配置

生产者拦截器
package com.hrui.interceptor;import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;/*** @author hrui* @date 2025/2/26 14:20*/
public class ValueInterceptor implements ProducerInterceptor<String,String> {@Override//发送数据的时候,会调用public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {System.out.println("拦截器拦截到消息:"+producerRecord.value());return new ProducerRecord<>(producerRecord.topic(),producerRecord.key(),producerRecord.value()+"-拦截器");}@Override//发送数据完毕,服务器返回的响应,会调用此方法public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {}@Override//生产者对象关闭时候,会调用此方法public void close() {}@Override//创建生产者对象时候调用public void configure(Map<String, ?> map) {}
}
启动下


生产者数据发送同步或异步

如果需要同步

ACKS数据接收应答处理机制
指的是:
生产者发送数据到 Kafka Broker 时,Kafka 如何处理消息的接收确认。通过设置 ACKS 参数,你可以控制 Kafka 如何在生产者发送消息后确认数据是否成功写入。
ACKS三个配置
ACKS=0 生产者发送数据之后,不等待任何确认,发送了 就认为你可能收到了,丢失不管
ACKS=1 生产者会等待 分区的主副本(Leader)确认消息已经写入到其磁盘中,主副本发送成功确认后,生产者就认为消息已经成功发送。 如果主副本挂了消息仍可能丢失,除非有副本在进行同步
ACKS=all(或ACKS=-1) 等待所有副本确认 消息保证不会丢失 性能会较低,因为生产者需要等待所有副本确认
默认ACKS=-1


生产者数据重试(重发)功能
例如ACKS=1的情况下 Leader还没来的及将数据保存到磁盘
Broker挂了,此时生产者在等待回调 但是一直没回复,超过等待时间
Kafka退出超时重试机制 retry

可以配置retry重试机制
重试机制带来了好处,也有坏处
例如 broker并没有挂 只是因为网络不稳定 这就产生了数据重复和乱序现象
如何避免数据重复
如果ACSK 1或者-1(就是ALL)就是为了数据不丢失,增强可靠性
如果你禁用重试肯定是不行的
但是重试又会导致数据重复和乱序现象
Kafka提供了生产者幂等性操作:所谓生产者幂等性操作就是 生产者的消息无论向Kafka发送多少次,
Kafka的Leader只会保存一条,默认的幂等性是不起作用的
开启

要启用生产者的幂等性,必须设置以下两个配置:
acks=all(或acks=-1):这要求生产者等待所有副本确认消息已成功写入,确保数据的持久性和一致性。enable.idempotence=true:启用幂等性保证。- 且要开启重试处理
- 在途请求缓冲区数量指的是 Kafka 生产者在发送消息时,等待确认的消息数量默认是5 不能超过5
在途请求缓冲区的数量:max.in.flight.requests.per.connection
幂等性 确保了相同分区内的消息不会重复,但在 多个分区 的情况下,跨分区的消息仍然无法避免乱序
生产者事务操作
事务可以保证生产者 ID 唯一 解决跨会话 每次重启 生产者ID会变化 加了事务可以保持不变
package com.hrui;import com.hrui.interceptor.KafkaProducerInterceptorTest;
import com.hrui.interceptor.ValueInterceptor;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;/*** @author hrui* @date 2025/2/26 13:36*/
public class KafkaProducerTest {public static void main(String[] args) throws ExecutionException, InterruptedException {//创建配置对象Map<String,Object> configMap=new HashMap<>();//如果是集群随意指定一个configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//对Key Value进行序列化操作configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());configMap.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ValueInterceptor.class.getName());//可以配置ACKSconfigMap.put(ProducerConfig.ACKS_CONFIG,"-1");//配置幂等性configMap.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);//配置重试次数configMap.put(ProducerConfig.RETRIES_CONFIG,3);//配置超时configMap.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,3000);//配置事务 事务基于幂等性configMap.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"my-tx-id");//创建生产者对象KafkaProducer<String,String> kafkaProducer=new KafkaProducer<>(configMap);//初始化事务kafkaProducer.initTransactions();try {//开启事务kafkaProducer.beginTransaction();for(int i=0;i<10;i++){//key的作用是通过某种算法,放到topic的某个分区中//可以不设置key 默认是按照轮询的方式ProducerRecord<String, String> record = new ProducerRecord<>("test", "key1","hello kafka" + i);//发送数据 send方法还可以接收一个参数,就是回调函数 kafkaProducer.send(record);是异步的Future<RecordMetadata> send = kafkaProducer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e != null) {// 处理发送失败的情况e.printStackTrace();} else {// 处理发送成功的情况System.out.println("发送成功:" + recordMetadata);}}});send.get();}//提交事务kafkaProducer.commitTransaction();}catch (Exception e){e.printStackTrace();//中止事务kafkaProducer.abortTransaction();}finally {//关闭生产者对象kafkaProducer.close();}}
}
添加事务后 生产者默认会创建一个事务topic 默认50个分区
![]()

相关文章:
Kafka生产者相关
windows中kafka集群部署示例-CSDN博客 先启动集群或者单机也OK 引入依赖 <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.9.0</version></dependency>关于主题创建 理论…...
技术问题汇总:前端怎么往后端传一个数组?
场景 现在一个专门负责复习算法的服务,筛选出了用户今天需要复习的笔记的ids,现在要调用笔记服务根据ids查询的接口。 请问复习服务怎么将ids发到笔记服务,笔记服务怎么接收。 思路 发的时候肯定是用字符串,接收的时候…...
【03】STM32F407 HAL 库框架设计学习
【03】STM32F407 HAL 库框架设计学习 摘要 本文旨在为初学者提供一个关于STM32F407微控制器HAL(Hardware Abstraction Layer)库框架设计的详细学习教程。通过本文,读者将从零开始,逐步掌握STM32F407的基本知识、HAL库的配置步骤…...
智能图像处理平台:图像处理配置类
这里我们先修改一下依赖,不用JavaCV,用openCV。 导入依赖: <!-- JavaCV 依赖,用于图像和视频处理 --> <!-- <dependency>--> <!-- <groupId>org.bytedeco</groupId>--> &l…...
【图文详解】什么是微服务?什么是SpringCloud?
目录 一.认识微服务架构 ??微服务带来的挑战 二.微服务解决方案SpringCloud ??SpringCloud的版本 ??SpringCloud和SpringBoot的关系 ??SpringCloud实现方案 Spring Cloud Netfix Spring Cloud Alibaba ??Spring Cloud 实现对比 在入门Spring Cloud 之前&…...
基于ssm的校园跑腿管理系统+vue
作者主页:舒克日记 简介:Java领域优质创作者、Java项目、学习资料、技术互助 文中获取源码 项目介绍 系统共有管理员、用户两个角色 管理员主要的功能用户信息管理、任务信息管理、任务类型管理、接单信息管理、公告信息管理、投诉信息管理、公告类型管…...
5个GitHub热点开源项目!!
1.自托管 Moonlight 游戏串流服务:Sunshine 主语言:C,Star:14.4k,周增长:500 这是一个自托管的 Moonlight 游戏串流服务器端项目,支持所有 Moonlight 客户端。用户可以在自己电脑上搭建一个游戏…...
docker通用技术介绍
docker通用技术介绍 1.docker介绍 1.1 基本概念 docker是一个开源的容器化平台,用于快速构建、打包、部署和运行应用程序。它通过容器化技术将应用及其依赖环境(如代码、库、系统工具等)打包成一个标准化、轻量级的独立单元,实…...
#渗透测试#批量漏洞挖掘#某图创图书馆集群管理系统updOpuserPw SQL注入(CVE-2021-44321)
免责声明 本教程仅为合法的教学目的而准备,严禁用于任何形式的违法犯罪活动及其他商业行为,在使用本教程前,您应确保该行为符合当地的法律法规,继续阅读即表示您需自行承担所有操作的后果,如有异议,请立即停…...
智能合约安全 | 合约无效化攻击
目录: 智能合约安全 合约无效化攻击 合约自毁函数 selfdestruct 攻击实现 漏洞防御 总结 智能合约安全 合约无效化攻击 合约无效化攻击类同于web安全中的逻辑漏洞中的一种 我们这里拿一个典型的例子来讲解 有这样一份智能合约, 每个人可以向其中发送1 eth 第七个…...
RabbitMQ 的介绍与使用
一. 简介 1> 什么是MQ 消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已。 其主要用途:不同进程Process/线程T…...
【手撕算法】K-Means聚类全解析:从数学推导到图像分割实战
摘要 聚类算法是探索数据内在结构的利器!本文手撕K-Means核心公式,结合Python代码实现与图像分割案例,详解: ✅ 欧氏距离计算 ✅ 簇中心迭代更新 ✅ 肘部法则优化 目录 摘要 目录 一、算法核心思想 二、数学原理详解 2.1 …...
【SQL技术】不同数据库引擎 SQL 优化方案剖析
一、引言 在数据处理和分析的世界里,SQL 是不可或缺的工具。不同的数据库系统,如 MySQL、PostgreSQL(PG)、Doris 和 Hive,在架构和性能特点上存在差异,因此针对它们的 SQL 优化策略也各有不同。这些数据库…...
RabbitMQ系列(二)基本概念之Publisher
在 RabbitMQ 中,Publisher(发布者) 是负责向 RabbitMQ 服务器发送消息的客户端角色,通常被称为“生产者”。以下是其核心功能与工作机制的详细解析: 一、核心定义与作用 消息发送者 Publisher 将消息发送到 RabbitMQ 的…...
OAK相机的抗震性测试
在工业环境中,双目视觉相机必须具备与工作环境同等的坚固性。鉴于部分客户会将我们的相机应用于恶劣环境(例如安装在重型机械上),我们依据EN 60068-2-6:2008标准对相机进行了振动耐受性测试。 测试涉及的相机型号包括:…...
2025最新Nginx高频面试题
2025最新Nginx高频面试题 摘要:本文整理了2025年企业高频Nginx面试题,覆盖核心原理、配置优化、安全防护及云原生场景实战,助你轻松应对技术面试! 核心原理篇 1. Nginx的Master-Worker架构优势是什么? 答案…...
【Kubernetes】API server 限流 之 maxinflight.go
这个文件实现了一个基于信号量(Channel)的简单限流器。 基础知识 总共有四种channel 带缓冲的channel nonMutatingChan、mutatingChan 都是带缓冲的channel ,这类channel 的特点是: 这允许最多 mutatingLimit /nonMutatingLimit 个请求同时获取令牌并执…...
推荐算法工程师的技术图谱和学习路径
推荐算法工程师的技术图谱和学习路径可以从多个维度进行概述,可以总结如下: 一、技术图谱 推荐算法工程师需要掌握的技术栈主要分为以下几个方面: 数学基础: 微积分、线性代数、概率论与统计学是推荐算法的基础,用于理解模型的数学原理和优化算法。高等数学、最优化理论…...
Milvus高性能向量数据库与大模型结合
Milvus | 高性能向量数据库,为规模而构建Milvus 是一个为 GenAI 应用构建的开源向量数据库。使用 pip 安装,执行高速搜索,并扩展到数十亿个向量。https://milvus.io/zh Milvus 是什么? Milvus 是一种高性能、高扩展性的向量数据…...
轮式机器人在复杂地形中如何选择合适的全局路径规划算法?
已思考完成 收起 嗯,用户问的是轮式机器人在复杂地形中如何选择合适的全局路径规划算法。首先,我需要理解复杂地形可能带来的挑战,比如崎岖的路面、动态障碍物、非结构化环境等等。轮式机器人在这里的运动控制需要考虑地形通过性、稳定性&…...
多模态2025:技术路线“神仙打架”,视频生成冲上云霄
文|魏琳华 编|王一粟 一场大会,聚集了中国多模态大模型的“半壁江山”。 智源大会2025为期两天的论坛中,汇集了学界、创业公司和大厂等三方的热门选手,关于多模态的集中讨论达到了前所未有的热度。其中,…...
利用ngx_stream_return_module构建简易 TCP/UDP 响应网关
一、模块概述 ngx_stream_return_module 提供了一个极简的指令: return <value>;在收到客户端连接后,立即将 <value> 写回并关闭连接。<value> 支持内嵌文本和内置变量(如 $time_iso8601、$remote_addr 等)&a…...
Qt/C++开发监控GB28181系统/取流协议/同时支持udp/tcp被动/tcp主动
一、前言说明 在2011版本的gb28181协议中,拉取视频流只要求udp方式,从2016开始要求新增支持tcp被动和tcp主动两种方式,udp理论上会丢包的,所以实际使用过程可能会出现画面花屏的情况,而tcp肯定不丢包,起码…...
uni-app学习笔记二十二---使用vite.config.js全局导入常用依赖
在前面的练习中,每个页面需要使用ref,onShow等生命周期钩子函数时都需要像下面这样导入 import {onMounted, ref} from "vue" 如果不想每个页面都导入,需要使用node.js命令npm安装unplugin-auto-import npm install unplugin-au…...
使用分级同态加密防御梯度泄漏
抽象 联邦学习 (FL) 支持跨分布式客户端进行协作模型训练,而无需共享原始数据,这使其成为在互联和自动驾驶汽车 (CAV) 等领域保护隐私的机器学习的一种很有前途的方法。然而,最近的研究表明&…...
基于数字孪生的水厂可视化平台建设:架构与实践
分享大纲: 1、数字孪生水厂可视化平台建设背景 2、数字孪生水厂可视化平台建设架构 3、数字孪生水厂可视化平台建设成效 近几年,数字孪生水厂的建设开展的如火如荼。作为提升水厂管理效率、优化资源的调度手段,基于数字孪生的水厂可视化平台的…...
Python爬虫(一):爬虫伪装
一、网站防爬机制概述 在当今互联网环境中,具有一定规模或盈利性质的网站几乎都实施了各种防爬措施。这些措施主要分为两大类: 身份验证机制:直接将未经授权的爬虫阻挡在外反爬技术体系:通过各种技术手段增加爬虫获取数据的难度…...
代理篇12|深入理解 Vite中的Proxy接口代理配置
在前端开发中,常常会遇到 跨域请求接口 的情况。为了解决这个问题,Vite 和 Webpack 都提供了 proxy 代理功能,用于将本地开发请求转发到后端服务器。 什么是代理(proxy)? 代理是在开发过程中,前端项目通过开发服务器,将指定的请求“转发”到真实的后端服务器,从而绕…...
Reasoning over Uncertain Text by Generative Large Language Models
https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829 1. 概述 文本中的不确定性在许多语境中传达,从日常对话到特定领域的文档(例如医学文档)(Heritage 2013;Landmark、Gulbrandsen 和 Svenevei…...
使用LangGraph和LangSmith构建多智能体人工智能系统
现在,通过组合几个较小的子智能体来创建一个强大的人工智能智能体正成为一种趋势。但这也带来了一些挑战,比如减少幻觉、管理对话流程、在测试期间留意智能体的工作方式、允许人工介入以及评估其性能。你需要进行大量的反复试验。 在这篇博客〔原作者&a…...
