Kafka生产者相关
windows中kafka集群部署示例-CSDN博客
先启动集群或者单机也OK
引入依赖
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.9.0</version></dependency>
关于主题创建
理论来讲创建主题(Topic是Kafka的内部操作),无论生产者或是消费者都不能主动创建主题.
没有主题就不能生产数据
但是往往看到生产者可以创建主题,原因是kafka的内部自动创建主题机制,当生产者中有个管理员,没有该主题就会自动创建
auto.create.topics.enable 默认是true 如果改成false 那么生产者就无法创建了
因此主题是kafka的自动创建主题的机制来实现的,而非生产者创建主题
生产者利用kafka自动创建主题的机制来创建主题...........................................................................
/*** @author hrui* @date 2025/2/26 12:53*/
public class AdminTopicTest {public static void main(String[] args) {Map<String,Object> confMap=new HashMap<>();//例如我的集群是9091 9092 9093 这里无需关心具体连接哪个端口 随意一个端口confMap.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9091");//管理员对象Admin admin=Admin.create(confMap);/*** 构建主题的三个参数* 第一个参数:主题名称* 第二个参数:分区数量* 第三个参数:副本数量(short类型)*/NewTopic newTopic=new NewTopic("test1",1, (short) 1);//创建主题CreateTopicsResult topics = admin.createTopics(Arrays.asList(newTopic));//关闭管理者对象admin.close();}
}
NewTopic("test1",这里可以传个Map);可以自定义主题分区副本策略 不指定就默认
生产者流程图
生产者大致代码
public class KafkaProducerTest {public static void main(String[] args) {//创建配置对象Map<String,Object> configMap=new HashMap<>();//如果是集群随意指定一个configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//对Key Value进行序列化操作configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//创建生产者对象KafkaProducer<String,String> kafkaProducer=new KafkaProducer<>(configMap);for(int i=0;i<10;i++){//key的作用是通过某种算法,放到topic的某个分区中ProducerRecord<String, String> record = new ProducerRecord<>("test", "key1","hello kafka" + i);kafkaProducer.send(record);}//关闭生产者对象kafkaProducer.close();}
}
生产者拦截器
可以对照流程图,看下生产者拦截器在什么位置,一般是对Key value的整理转换,对生产的数据做统一规范化处理,可以配置多个
可以点进去
大致就是这么个过程
遍历 拦截器 并调用每个拦截器的onSend方法
可以看到每个拦截器都是ProducerInterceptor类型
自定义生产者拦截器
自定义一个类实现ProducerInterceptor
在创建生产者时候添加拦截器配置
生产者拦截器
package com.hrui.interceptor;import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;/*** @author hrui* @date 2025/2/26 14:20*/
public class ValueInterceptor implements ProducerInterceptor<String,String> {@Override//发送数据的时候,会调用public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {System.out.println("拦截器拦截到消息:"+producerRecord.value());return new ProducerRecord<>(producerRecord.topic(),producerRecord.key(),producerRecord.value()+"-拦截器");}@Override//发送数据完毕,服务器返回的响应,会调用此方法public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {}@Override//生产者对象关闭时候,会调用此方法public void close() {}@Override//创建生产者对象时候调用public void configure(Map<String, ?> map) {}
}
启动下
生产者数据发送同步或异步
如果需要同步
ACKS数据接收应答处理机制
指的是:
生产者发送数据到 Kafka Broker 时,Kafka 如何处理消息的接收确认。通过设置 ACKS 参数,你可以控制 Kafka 如何在生产者发送消息后确认数据是否成功写入。
ACKS三个配置
ACKS=0 生产者发送数据之后,不等待任何确认,发送了 就认为你可能收到了,丢失不管
ACKS=1 生产者会等待 分区的主副本(Leader)确认消息已经写入到其磁盘中,主副本发送成功确认后,生产者就认为消息已经成功发送。 如果主副本挂了消息仍可能丢失,除非有副本在进行同步
ACKS=all(或ACKS=-1) 等待所有副本确认 消息保证不会丢失 性能会较低,因为生产者需要等待所有副本确认
默认ACKS=-1
生产者数据重试(重发)功能
例如ACKS=1的情况下 Leader还没来的及将数据保存到磁盘
Broker挂了,此时生产者在等待回调 但是一直没回复,超过等待时间
Kafka退出超时重试机制 retry
可以配置retry重试机制
重试机制带来了好处,也有坏处
例如 broker并没有挂 只是因为网络不稳定 这就产生了数据重复和乱序现象
如何避免数据重复
如果ACSK 1或者-1(就是ALL)就是为了数据不丢失,增强可靠性
如果你禁用重试肯定是不行的
但是重试又会导致数据重复和乱序现象
Kafka提供了生产者幂等性操作:所谓生产者幂等性操作就是 生产者的消息无论向Kafka发送多少次,
Kafka的Leader只会保存一条,默认的幂等性是不起作用的
开启
要启用生产者的幂等性,必须设置以下两个配置:
acks=all
(或acks=-1
):这要求生产者等待所有副本确认消息已成功写入,确保数据的持久性和一致性。enable.idempotence=true
:启用幂等性保证。- 且要开启重试处理
- 在途请求缓冲区数量指的是 Kafka 生产者在发送消息时,等待确认的消息数量默认是5 不能超过5
在途请求缓冲区的数量:max.in.flight.requests.per.connection
幂等性 确保了相同分区内的消息不会重复,但在 多个分区 的情况下,跨分区的消息仍然无法避免乱序
生产者事务操作
事务可以保证生产者 ID 唯一 解决跨会话 每次重启 生产者ID会变化 加了事务可以保持不变
package com.hrui;import com.hrui.interceptor.KafkaProducerInterceptorTest;
import com.hrui.interceptor.ValueInterceptor;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;/*** @author hrui* @date 2025/2/26 13:36*/
public class KafkaProducerTest {public static void main(String[] args) throws ExecutionException, InterruptedException {//创建配置对象Map<String,Object> configMap=new HashMap<>();//如果是集群随意指定一个configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//对Key Value进行序列化操作configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());configMap.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ValueInterceptor.class.getName());//可以配置ACKSconfigMap.put(ProducerConfig.ACKS_CONFIG,"-1");//配置幂等性configMap.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);//配置重试次数configMap.put(ProducerConfig.RETRIES_CONFIG,3);//配置超时configMap.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,3000);//配置事务 事务基于幂等性configMap.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"my-tx-id");//创建生产者对象KafkaProducer<String,String> kafkaProducer=new KafkaProducer<>(configMap);//初始化事务kafkaProducer.initTransactions();try {//开启事务kafkaProducer.beginTransaction();for(int i=0;i<10;i++){//key的作用是通过某种算法,放到topic的某个分区中//可以不设置key 默认是按照轮询的方式ProducerRecord<String, String> record = new ProducerRecord<>("test", "key1","hello kafka" + i);//发送数据 send方法还可以接收一个参数,就是回调函数 kafkaProducer.send(record);是异步的Future<RecordMetadata> send = kafkaProducer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e != null) {// 处理发送失败的情况e.printStackTrace();} else {// 处理发送成功的情况System.out.println("发送成功:" + recordMetadata);}}});send.get();}//提交事务kafkaProducer.commitTransaction();}catch (Exception e){e.printStackTrace();//中止事务kafkaProducer.abortTransaction();}finally {//关闭生产者对象kafkaProducer.close();}}
}
添加事务后 生产者默认会创建一个事务topic 默认50个分区
相关文章:

Kafka生产者相关
windows中kafka集群部署示例-CSDN博客 先启动集群或者单机也OK 引入依赖 <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.9.0</version></dependency>关于主题创建 理论…...
技术问题汇总:前端怎么往后端传一个数组?
场景 现在一个专门负责复习算法的服务,筛选出了用户今天需要复习的笔记的ids,现在要调用笔记服务根据ids查询的接口。 请问复习服务怎么将ids发到笔记服务,笔记服务怎么接收。 思路 发的时候肯定是用字符串,接收的时候…...
【03】STM32F407 HAL 库框架设计学习
【03】STM32F407 HAL 库框架设计学习 摘要 本文旨在为初学者提供一个关于STM32F407微控制器HAL(Hardware Abstraction Layer)库框架设计的详细学习教程。通过本文,读者将从零开始,逐步掌握STM32F407的基本知识、HAL库的配置步骤…...

智能图像处理平台:图像处理配置类
这里我们先修改一下依赖,不用JavaCV,用openCV。 导入依赖: <!-- JavaCV 依赖,用于图像和视频处理 --> <!-- <dependency>--> <!-- <groupId>org.bytedeco</groupId>--> &l…...

【图文详解】什么是微服务?什么是SpringCloud?
目录 一.认识微服务架构 ??微服务带来的挑战 二.微服务解决方案SpringCloud ??SpringCloud的版本 ??SpringCloud和SpringBoot的关系 ??SpringCloud实现方案 Spring Cloud Netfix Spring Cloud Alibaba ??Spring Cloud 实现对比 在入门Spring Cloud 之前&…...

基于ssm的校园跑腿管理系统+vue
作者主页:舒克日记 简介:Java领域优质创作者、Java项目、学习资料、技术互助 文中获取源码 项目介绍 系统共有管理员、用户两个角色 管理员主要的功能用户信息管理、任务信息管理、任务类型管理、接单信息管理、公告信息管理、投诉信息管理、公告类型管…...

5个GitHub热点开源项目!!
1.自托管 Moonlight 游戏串流服务:Sunshine 主语言:C,Star:14.4k,周增长:500 这是一个自托管的 Moonlight 游戏串流服务器端项目,支持所有 Moonlight 客户端。用户可以在自己电脑上搭建一个游戏…...
docker通用技术介绍
docker通用技术介绍 1.docker介绍 1.1 基本概念 docker是一个开源的容器化平台,用于快速构建、打包、部署和运行应用程序。它通过容器化技术将应用及其依赖环境(如代码、库、系统工具等)打包成一个标准化、轻量级的独立单元,实…...

#渗透测试#批量漏洞挖掘#某图创图书馆集群管理系统updOpuserPw SQL注入(CVE-2021-44321)
免责声明 本教程仅为合法的教学目的而准备,严禁用于任何形式的违法犯罪活动及其他商业行为,在使用本教程前,您应确保该行为符合当地的法律法规,继续阅读即表示您需自行承担所有操作的后果,如有异议,请立即停…...

智能合约安全 | 合约无效化攻击
目录: 智能合约安全 合约无效化攻击 合约自毁函数 selfdestruct 攻击实现 漏洞防御 总结 智能合约安全 合约无效化攻击 合约无效化攻击类同于web安全中的逻辑漏洞中的一种 我们这里拿一个典型的例子来讲解 有这样一份智能合约, 每个人可以向其中发送1 eth 第七个…...

RabbitMQ 的介绍与使用
一. 简介 1> 什么是MQ 消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已。 其主要用途:不同进程Process/线程T…...
【手撕算法】K-Means聚类全解析:从数学推导到图像分割实战
摘要 聚类算法是探索数据内在结构的利器!本文手撕K-Means核心公式,结合Python代码实现与图像分割案例,详解: ✅ 欧氏距离计算 ✅ 簇中心迭代更新 ✅ 肘部法则优化 目录 摘要 目录 一、算法核心思想 二、数学原理详解 2.1 …...

【SQL技术】不同数据库引擎 SQL 优化方案剖析
一、引言 在数据处理和分析的世界里,SQL 是不可或缺的工具。不同的数据库系统,如 MySQL、PostgreSQL(PG)、Doris 和 Hive,在架构和性能特点上存在差异,因此针对它们的 SQL 优化策略也各有不同。这些数据库…...
RabbitMQ系列(二)基本概念之Publisher
在 RabbitMQ 中,Publisher(发布者) 是负责向 RabbitMQ 服务器发送消息的客户端角色,通常被称为“生产者”。以下是其核心功能与工作机制的详细解析: 一、核心定义与作用 消息发送者 Publisher 将消息发送到 RabbitMQ 的…...

OAK相机的抗震性测试
在工业环境中,双目视觉相机必须具备与工作环境同等的坚固性。鉴于部分客户会将我们的相机应用于恶劣环境(例如安装在重型机械上),我们依据EN 60068-2-6:2008标准对相机进行了振动耐受性测试。 测试涉及的相机型号包括:…...
2025最新Nginx高频面试题
2025最新Nginx高频面试题 摘要:本文整理了2025年企业高频Nginx面试题,覆盖核心原理、配置优化、安全防护及云原生场景实战,助你轻松应对技术面试! 核心原理篇 1. Nginx的Master-Worker架构优势是什么? 答案…...
【Kubernetes】API server 限流 之 maxinflight.go
这个文件实现了一个基于信号量(Channel)的简单限流器。 基础知识 总共有四种channel 带缓冲的channel nonMutatingChan、mutatingChan 都是带缓冲的channel ,这类channel 的特点是: 这允许最多 mutatingLimit /nonMutatingLimit 个请求同时获取令牌并执…...
推荐算法工程师的技术图谱和学习路径
推荐算法工程师的技术图谱和学习路径可以从多个维度进行概述,可以总结如下: 一、技术图谱 推荐算法工程师需要掌握的技术栈主要分为以下几个方面: 数学基础: 微积分、线性代数、概率论与统计学是推荐算法的基础,用于理解模型的数学原理和优化算法。高等数学、最优化理论…...

Milvus高性能向量数据库与大模型结合
Milvus | 高性能向量数据库,为规模而构建Milvus 是一个为 GenAI 应用构建的开源向量数据库。使用 pip 安装,执行高速搜索,并扩展到数十亿个向量。https://milvus.io/zh Milvus 是什么? Milvus 是一种高性能、高扩展性的向量数据…...
轮式机器人在复杂地形中如何选择合适的全局路径规划算法?
已思考完成 收起 嗯,用户问的是轮式机器人在复杂地形中如何选择合适的全局路径规划算法。首先,我需要理解复杂地形可能带来的挑战,比如崎岖的路面、动态障碍物、非结构化环境等等。轮式机器人在这里的运动控制需要考虑地形通过性、稳定性&…...

【kafka】Golang实现分布式Masscan任务调度系统
要求: 输出两个程序,一个命令行程序(命令行参数用flag)和一个服务端程序。 命令行程序支持通过命令行参数配置下发IP或IP段、端口、扫描带宽,然后将消息推送到kafka里面。 服务端程序: 从kafka消费者接收…...

MFC内存泄露
1、泄露代码示例 void X::SetApplicationBtn() {CMFCRibbonApplicationButton* pBtn GetApplicationButton();// 获取 Ribbon Bar 指针// 创建自定义按钮CCustomRibbonAppButton* pCustomButton new CCustomRibbonAppButton();pCustomButton->SetImage(IDB_BITMAP_Jdp26)…...
大语言模型如何处理长文本?常用文本分割技术详解
为什么需要文本分割? 引言:为什么需要文本分割?一、基础文本分割方法1. 按段落分割(Paragraph Splitting)2. 按句子分割(Sentence Splitting)二、高级文本分割策略3. 重叠分割(Sliding Window)4. 递归分割(Recursive Splitting)三、生产级工具推荐5. 使用LangChain的…...
OkHttp 中实现断点续传 demo
在 OkHttp 中实现断点续传主要通过以下步骤完成,核心是利用 HTTP 协议的 Range 请求头指定下载范围: 实现原理 Range 请求头:向服务器请求文件的特定字节范围(如 Range: bytes1024-) 本地文件记录:保存已…...
生成 Git SSH 证书
🔑 1. 生成 SSH 密钥对 在终端(Windows 使用 Git Bash,Mac/Linux 使用 Terminal)执行命令: ssh-keygen -t rsa -b 4096 -C "your_emailexample.com" 参数说明: -t rsa&#x…...
Rapidio门铃消息FIFO溢出机制
关于RapidIO门铃消息FIFO的溢出机制及其与中断抖动的关系,以下是深入解析: 门铃FIFO溢出的本质 在RapidIO系统中,门铃消息FIFO是硬件控制器内部的缓冲区,用于临时存储接收到的门铃消息(Doorbell Message)。…...

NXP S32K146 T-Box 携手 SD NAND(贴片式TF卡):驱动汽车智能革新的黄金组合
在汽车智能化的汹涌浪潮中,车辆不再仅仅是传统的交通工具,而是逐步演变为高度智能的移动终端。这一转变的核心支撑,来自于车内关键技术的深度融合与协同创新。车载远程信息处理盒(T-Box)方案:NXP S32K146 与…...

Unity UGUI Button事件流程
场景结构 测试代码 public class TestBtn : MonoBehaviour {void Start(){var btn GetComponent<Button>();btn.onClick.AddListener(OnClick);}private void OnClick(){Debug.Log("666");}}当添加事件时 // 实例化一个ButtonClickedEvent的事件 [Formerl…...

在 Spring Boot 中使用 JSP
jsp? 好多年没用了。重新整一下 还费了点时间,记录一下。 项目结构: pom: <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://ww…...
HTML前端开发:JavaScript 获取元素方法详解
作为前端开发者,高效获取 DOM 元素是必备技能。以下是 JS 中核心的获取元素方法,分为两大系列: 一、getElementBy... 系列 传统方法,直接通过 DOM 接口访问,返回动态集合(元素变化会实时更新)。…...