当前位置: 首页 > news >正文

Kafka生产者相关

windows中kafka集群部署示例-CSDN博客

先启动集群或者单机也OK

引入依赖

    <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.9.0</version></dependency>

关于主题创建

理论来讲创建主题(Topic是Kafka的内部操作),无论生产者或是消费者都不能主动创建主题.

没有主题就不能生产数据

但是往往看到生产者可以创建主题,原因是kafka的内部自动创建主题机制,当生产者中有个管理员,没有该主题就会自动创建

auto.create.topics.enable 默认是true  如果改成false  那么生产者就无法创建了

因此主题是kafka的自动创建主题的机制来实现的,而非生产者创建主题

生产者利用kafka自动创建主题的机制来创建主题...........................................................................

/*** @author hrui* @date 2025/2/26 12:53*/
public class AdminTopicTest {public static void main(String[] args) {Map<String,Object> confMap=new HashMap<>();//例如我的集群是9091  9092 9093  这里无需关心具体连接哪个端口  随意一个端口confMap.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9091");//管理员对象Admin admin=Admin.create(confMap);/*** 构建主题的三个参数* 第一个参数:主题名称* 第二个参数:分区数量* 第三个参数:副本数量(short类型)*/NewTopic newTopic=new NewTopic("test1",1, (short) 1);//创建主题CreateTopicsResult topics = admin.createTopics(Arrays.asList(newTopic));//关闭管理者对象admin.close();}
}

NewTopic("test1",这里可以传个Map);可以自定义主题分区副本策略   不指定就默认

生产者流程图

生产者大致代码

public class KafkaProducerTest {public static void main(String[] args) {//创建配置对象Map<String,Object> configMap=new HashMap<>();//如果是集群随意指定一个configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//对Key Value进行序列化操作configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//创建生产者对象KafkaProducer<String,String> kafkaProducer=new KafkaProducer<>(configMap);for(int i=0;i<10;i++){//key的作用是通过某种算法,放到topic的某个分区中ProducerRecord<String, String> record = new ProducerRecord<>("test", "key1","hello kafka" + i);kafkaProducer.send(record);}//关闭生产者对象kafkaProducer.close();}
}

生产者拦截器

可以对照流程图,看下生产者拦截器在什么位置,一般是对Key   value的整理转换,对生产的数据做统一规范化处理,可以配置多个

可以点进去

大致就是这么个过程

遍历 拦截器  并调用每个拦截器的onSend方法

可以看到每个拦截器都是ProducerInterceptor类型

自定义生产者拦截器

自定义一个类实现ProducerInterceptor

在创建生产者时候添加拦截器配置

生产者拦截器

package com.hrui.interceptor;import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;/*** @author hrui* @date 2025/2/26 14:20*/
public class ValueInterceptor implements ProducerInterceptor<String,String> {@Override//发送数据的时候,会调用public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {System.out.println("拦截器拦截到消息:"+producerRecord.value());return new ProducerRecord<>(producerRecord.topic(),producerRecord.key(),producerRecord.value()+"-拦截器");}@Override//发送数据完毕,服务器返回的响应,会调用此方法public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {}@Override//生产者对象关闭时候,会调用此方法public void close() {}@Override//创建生产者对象时候调用public void configure(Map<String, ?> map) {}
}

启动下 

生产者数据发送同步或异步

如果需要同步

ACKS数据接收应答处理机制

指的是:

生产者发送数据到 Kafka Broker 时,Kafka 如何处理消息的接收确认。通过设置 ACKS 参数,你可以控制 Kafka 如何在生产者发送消息后确认数据是否成功写入。

ACKS三个配置

ACKS=0  生产者发送数据之后,不等待任何确认,发送了 就认为你可能收到了,丢失不管

ACKS=1  生产者会等待 分区的主副本(Leader)确认消息已经写入到其磁盘中,主副本发送成功确认后,生产者就认为消息已经成功发送。 如果主副本挂了消息仍可能丢失,除非有副本在进行同步

ACKS=all(或ACKS=-1)  等待所有副本确认 消息保证不会丢失  性能会较低,因为生产者需要等待所有副本确认

默认ACKS=-1  

生产者数据重试(重发)功能

例如ACKS=1的情况下   Leader还没来的及将数据保存到磁盘

Broker挂了,此时生产者在等待回调  但是一直没回复,超过等待时间

Kafka退出超时重试机制  retry

可以配置retry重试机制

重试机制带来了好处,也有坏处

例如 broker并没有挂  只是因为网络不稳定    这就产生了数据重复和乱序现象

如何避免数据重复

如果ACSK 1或者-1(就是ALL)就是为了数据不丢失,增强可靠性

如果你禁用重试肯定是不行的

但是重试又会导致数据重复和乱序现象

Kafka提供了生产者幂等性操作:所谓生产者幂等性操作就是 生产者的消息无论向Kafka发送多少次,

Kafka的Leader只会保存一条,默认的幂等性是不起作用的

开启

要启用生产者的幂等性,必须设置以下两个配置:

  • acks=all(或 acks=-1):这要求生产者等待所有副本确认消息已成功写入,确保数据的持久性和一致性。
  • enable.idempotence=true:启用幂等性保证。
  • 且要开启重试处理
  • 在途请求缓冲区数量指的是 Kafka 生产者在发送消息时,等待确认的消息数量默认是5  不能超过5

在途请求缓冲区的数量:max.in.flight.requests.per.connection

幂等性 确保了相同分区内的消息不会重复,但在 多个分区 的情况下,跨分区的消息仍然无法避免乱序

生产者事务操作

事务可以保证生产者 ID 唯一   解决跨会话  每次重启  生产者ID会变化  加了事务可以保持不变

package com.hrui;import com.hrui.interceptor.KafkaProducerInterceptorTest;
import com.hrui.interceptor.ValueInterceptor;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;/*** @author hrui* @date 2025/2/26 13:36*/
public class KafkaProducerTest {public static void main(String[] args) throws ExecutionException, InterruptedException {//创建配置对象Map<String,Object> configMap=new HashMap<>();//如果是集群随意指定一个configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//对Key Value进行序列化操作configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());configMap.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ValueInterceptor.class.getName());//可以配置ACKSconfigMap.put(ProducerConfig.ACKS_CONFIG,"-1");//配置幂等性configMap.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);//配置重试次数configMap.put(ProducerConfig.RETRIES_CONFIG,3);//配置超时configMap.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,3000);//配置事务 事务基于幂等性configMap.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"my-tx-id");//创建生产者对象KafkaProducer<String,String> kafkaProducer=new KafkaProducer<>(configMap);//初始化事务kafkaProducer.initTransactions();try {//开启事务kafkaProducer.beginTransaction();for(int i=0;i<10;i++){//key的作用是通过某种算法,放到topic的某个分区中//可以不设置key 默认是按照轮询的方式ProducerRecord<String, String> record = new ProducerRecord<>("test", "key1","hello kafka" + i);//发送数据  send方法还可以接收一个参数,就是回调函数  kafkaProducer.send(record);是异步的Future<RecordMetadata> send = kafkaProducer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e != null) {// 处理发送失败的情况e.printStackTrace();} else {// 处理发送成功的情况System.out.println("发送成功:" + recordMetadata);}}});send.get();}//提交事务kafkaProducer.commitTransaction();}catch (Exception e){e.printStackTrace();//中止事务kafkaProducer.abortTransaction();}finally {//关闭生产者对象kafkaProducer.close();}}
}

添加事务后  生产者默认会创建一个事务topic   默认50个分区

相关文章:

Kafka生产者相关

windows中kafka集群部署示例-CSDN博客 先启动集群或者单机也OK 引入依赖 <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.9.0</version></dependency>关于主题创建 理论…...

技术问题汇总:前端怎么往后端传一个数组?

场景 现在一个专门负责复习算法的服务&#xff0c;筛选出了用户今天需要复习的笔记的ids&#xff0c;现在要调用笔记服务根据ids查询的接口。 请问复习服务怎么将ids发到笔记服务&#xff0c;笔记服务怎么接收。 思路 发的时候肯定是用字符串&#xff0c;接收的时候&#xf…...

【03】STM32F407 HAL 库框架设计学习

【03】STM32F407 HAL 库框架设计学习 摘要 本文旨在为初学者提供一个关于STM32F407微控制器HAL&#xff08;Hardware Abstraction Layer&#xff09;库框架设计的详细学习教程。通过本文&#xff0c;读者将从零开始&#xff0c;逐步掌握STM32F407的基本知识、HAL库的配置步骤…...

智能图像处理平台:图像处理配置类

这里我们先修改一下依赖&#xff0c;不用JavaCV&#xff0c;用openCV。 导入依赖&#xff1a; <!-- JavaCV 依赖&#xff0c;用于图像和视频处理 --> <!-- <dependency>--> <!-- <groupId>org.bytedeco</groupId>--> &l…...

【图文详解】什么是微服务?什么是SpringCloud?

目录 一.认识微服务架构 ??微服务带来的挑战 二.微服务解决方案SpringCloud ??SpringCloud的版本 ??SpringCloud和SpringBoot的关系 ??SpringCloud实现方案 Spring Cloud Netfix Spring Cloud Alibaba ??Spring Cloud 实现对比 在入门Spring Cloud 之前&…...

基于ssm的校园跑腿管理系统+vue

作者主页&#xff1a;舒克日记 简介&#xff1a;Java领域优质创作者、Java项目、学习资料、技术互助 文中获取源码 项目介绍 系统共有管理员、用户两个角色 管理员主要的功能用户信息管理、任务信息管理、任务类型管理、接单信息管理、公告信息管理、投诉信息管理、公告类型管…...

5个GitHub热点开源项目!!

1.自托管 Moonlight 游戏串流服务&#xff1a;Sunshine 主语言&#xff1a;C&#xff0c;Star&#xff1a;14.4k&#xff0c;周增长&#xff1a;500 这是一个自托管的 Moonlight 游戏串流服务器端项目&#xff0c;支持所有 Moonlight 客户端。用户可以在自己电脑上搭建一个游戏…...

docker通用技术介绍

docker通用技术介绍 1.docker介绍 1.1 基本概念 docker是一个开源的容器化平台&#xff0c;用于快速构建、打包、部署和运行应用程序。它通过容器化技术将应用及其依赖环境&#xff08;如代码、库、系统工具等&#xff09;打包成一个标准化、轻量级的独立单元&#xff0c;实…...

#渗透测试#批量漏洞挖掘#某图创图书馆集群管理系统updOpuserPw SQL注入(CVE-2021-44321)

免责声明 本教程仅为合法的教学目的而准备&#xff0c;严禁用于任何形式的违法犯罪活动及其他商业行为&#xff0c;在使用本教程前&#xff0c;您应确保该行为符合当地的法律法规&#xff0c;继续阅读即表示您需自行承担所有操作的后果&#xff0c;如有异议&#xff0c;请立即停…...

智能合约安全 | 合约无效化攻击

目录&#xff1a; 智能合约安全 合约无效化攻击 合约自毁函数 selfdestruct 攻击实现 漏洞防御 总结 智能合约安全 合约无效化攻击 合约无效化攻击类同于web安全中的逻辑漏洞中的一种 我们这里拿一个典型的例子来讲解 有这样一份智能合约, 每个人可以向其中发送1 eth 第七个…...

RabbitMQ 的介绍与使用

一. 简介 1> 什么是MQ 消息队列&#xff08;Message Queue&#xff0c;简称MQ&#xff09;&#xff0c;从字面意思上看&#xff0c;本质是个队列&#xff0c;FIFO先入先出&#xff0c;只不过队列中存放的内容是message而已。 其主要用途&#xff1a;不同进程Process/线程T…...

【手撕算法】K-Means聚类全解析:从数学推导到图像分割实战

摘要 聚类算法是探索数据内在结构的利器&#xff01;本文手撕K-Means核心公式&#xff0c;结合Python代码实现与图像分割案例&#xff0c;详解&#xff1a; ✅ 欧氏距离计算 ✅ 簇中心迭代更新 ✅ 肘部法则优化 目录 摘要 目录 一、算法核心思想 二、数学原理详解 2.1 …...

【SQL技术】不同数据库引擎 SQL 优化方案剖析

一、引言 在数据处理和分析的世界里&#xff0c;SQL 是不可或缺的工具。不同的数据库系统&#xff0c;如 MySQL、PostgreSQL&#xff08;PG&#xff09;、Doris 和 Hive&#xff0c;在架构和性能特点上存在差异&#xff0c;因此针对它们的 SQL 优化策略也各有不同。这些数据库…...

RabbitMQ系列(二)基本概念之Publisher

在 RabbitMQ 中&#xff0c;Publisher&#xff08;发布者&#xff09; 是负责向 RabbitMQ 服务器发送消息的客户端角色&#xff0c;通常被称为“生产者”。以下是其核心功能与工作机制的详细解析&#xff1a; 一、核心定义与作用 消息发送者 Publisher 将消息发送到 RabbitMQ 的…...

OAK相机的抗震性测试

在工业环境中&#xff0c;双目视觉相机必须具备与工作环境同等的坚固性。鉴于部分客户会将我们的相机应用于恶劣环境&#xff08;例如安装在重型机械上&#xff09;&#xff0c;我们依据EN 60068-2-6:2008标准对相机进行了振动耐受性测试。 测试涉及的相机型号包括&#xff1a…...

2025最新Nginx高频面试题

2025最新Nginx高频面试题 摘要&#xff1a;本文整理了2025年企业高频Nginx面试题&#xff0c;覆盖核心原理、配置优化、安全防护及云原生场景实战&#xff0c;助你轻松应对技术面试&#xff01; 核心原理篇 1. Nginx的Master-Worker架构优势是什么&#xff1f; 答案&#xf…...

【Kubernetes】API server 限流 之 maxinflight.go

这个文件实现了一个基于信号量(Channel)的简单限流器。 基础知识 总共有四种channel 带缓冲的channel nonMutatingChan、mutatingChan 都是带缓冲的channel &#xff0c;这类channel 的特点是&#xff1a; 这允许最多 mutatingLimit /nonMutatingLimit 个请求同时获取令牌并执…...

推荐算法工程师的技术图谱和学习路径

推荐算法工程师的技术图谱和学习路径可以从多个维度进行概述,可以总结如下: 一、技术图谱 推荐算法工程师需要掌握的技术栈主要分为以下几个方面: 数学基础: 微积分、线性代数、概率论与统计学是推荐算法的基础,用于理解模型的数学原理和优化算法。高等数学、最优化理论…...

Milvus高性能向量数据库与大模型结合

Milvus | 高性能向量数据库&#xff0c;为规模而构建Milvus 是一个为 GenAI 应用构建的开源向量数据库。使用 pip 安装&#xff0c;执行高速搜索&#xff0c;并扩展到数十亿个向量。https://milvus.io/zh Milvus 是什么&#xff1f; Milvus 是一种高性能、高扩展性的向量数据…...

轮式机器人在复杂地形中如何选择合适的全局路径规划算法?

已思考完成 收起 嗯&#xff0c;用户问的是轮式机器人在复杂地形中如何选择合适的全局路径规划算法。首先&#xff0c;我需要理解复杂地形可能带来的挑战&#xff0c;比如崎岖的路面、动态障碍物、非结构化环境等等。轮式机器人在这里的运动控制需要考虑地形通过性、稳定性&…...

一多操作系统的生命体架构与当前主流开发语言的区别

这套架构与当前主流开发语言的区别&#xff0c;本质上就是**“造物主”与“工匠”**的区别。 目前的编程语言&#xff08;无论是 C、Java 还是 Python&#xff09;都是在教计算机**“怎么做”&#xff08;How&#xff09;&#xff0c;而一多 OS 的生物学构架是在告诉系统“要什…...

机器学习——聚类评价指标SSE、SC、CH演示案例

一.评价指标简介SSE考虑了簇内因素SSE越越小越好SSE&#xff0b;肘部法常用来确定聚类的最佳K值SC轮廓系数法考虑了簇内和簇间因素&#xff0c;数值越大越好CH考虑簇内&#xff0c;簇间以及K值因素&#xff0c;数值越大越好二.代码部分详解1.SSE&#xff0b;肘部法#1.演示SSE&a…...

对比直接使用厂商api体验taotoken在延迟与可用性上的差异

&#x1f680; 告别海外账号与网络限制&#xff01;稳定直连全球优质大模型&#xff0c;限时半价接入中。 &#x1f449; 点击领取海量免费额度 对比直接使用厂商 API 体验 Taotoken 在延迟与可用性上的差异 在构建依赖大模型能力的应用时&#xff0c;开发者通常会直接调用特定…...

通过Taotoken用量看板分析团队月度大模型API消费明细

&#x1f680; 告别海外账号与网络限制&#xff01;稳定直连全球优质大模型&#xff0c;限时半价接入中。 &#x1f449; 点击领取海量免费额度 通过Taotoken用量看板分析团队月度大模型API消费明细 对于团队管理者而言&#xff0c;清晰、透明地掌握大模型API的消费情况是项目…...

掌握AI写教材方法,低查重工具让教材编写变得如此简单!

许多教材编写者常感到失落&#xff0c;因为经过反复琢磨的教材内容&#xff0c;在缺乏相应的辅助资源时&#xff0c;教学效果往往大打折扣。课后练习的题型设计需要有层次感&#xff0c;但往往缺乏创新灵感&#xff1b;想要制作出直观的教学课件&#xff0c;却没有技术来实现&a…...

VideoDownloadHelper:打破视频下载壁垒的智能浏览器插件

VideoDownloadHelper&#xff1a;打破视频下载壁垒的智能浏览器插件 【免费下载链接】VideoDownloadHelper Chrome Extension to Help Download Video for Some Video Sites. 项目地址: https://gitcode.com/gh_mirrors/vi/VideoDownloadHelper 在信息爆炸的时代&#x…...

深入解析CPU L1/L2缓存:原理、性能影响与编程优化实战

1. 项目概述&#xff1a;从“快”字说起做性能调优或者写高性能代码的朋友&#xff0c;对“缓存”这个词一定不陌生。我们总在说&#xff0c;把数据放进缓存里&#xff0c;访问就快了。但缓存本身&#xff0c;尤其是离CPU核心最近的一级缓存&#xff08;L1 Cache&#xff09;和…...

如何利用Chanlun-Pro实现智能缠论量化交易:3步掌握市场结构识别

如何利用Chanlun-Pro实现智能缠论量化交易&#xff1a;3步掌握市场结构识别 【免费下载链接】chanlun-pro 基于缠中说禅所讲缠论理论&#xff0c;以便量化分析市场行情的工具 项目地址: https://gitcode.com/gh_mirrors/ch/chanlun-pro 在金融市场日益复杂的今天&#x…...

EXCEL文件展示MLP的计算过程

MLP 实现步骤&#xff08;共 5 步&#xff09; 步骤 1&#xff1a;输入层数据准备 在表格中输入两个特征值 x1、x2&#xff0c;作为 MLP 的输入。本次使用&#xff1a;x10.5&#xff0c;x20.8步骤 2&#xff1a;设置网络参数&#xff08;权重 偏置&#xff09; 手动设置输入层…...

Bebas Neue 开源字体深度解析:几何美学的技术实现与实战应用

Bebas Neue 开源字体深度解析&#xff1a;几何美学的技术实现与实战应用 【免费下载链接】Bebas-Neue Bebas Neue font 项目地址: https://gitcode.com/gh_mirrors/be/Bebas-Neue Bebas Neue 作为全球最受欢迎的开源几何无衬线字体&#xff0c;以其极简设计、高度统一的…...