当前位置: 首页 > news >正文

Kafka生产者相关

windows中kafka集群部署示例-CSDN博客

先启动集群或者单机也OK

引入依赖

    <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.9.0</version></dependency>

关于主题创建

理论来讲创建主题(Topic是Kafka的内部操作),无论生产者或是消费者都不能主动创建主题.

没有主题就不能生产数据

但是往往看到生产者可以创建主题,原因是kafka的内部自动创建主题机制,当生产者中有个管理员,没有该主题就会自动创建

auto.create.topics.enable 默认是true  如果改成false  那么生产者就无法创建了

因此主题是kafka的自动创建主题的机制来实现的,而非生产者创建主题

生产者利用kafka自动创建主题的机制来创建主题...........................................................................

/*** @author hrui* @date 2025/2/26 12:53*/
public class AdminTopicTest {public static void main(String[] args) {Map<String,Object> confMap=new HashMap<>();//例如我的集群是9091  9092 9093  这里无需关心具体连接哪个端口  随意一个端口confMap.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9091");//管理员对象Admin admin=Admin.create(confMap);/*** 构建主题的三个参数* 第一个参数:主题名称* 第二个参数:分区数量* 第三个参数:副本数量(short类型)*/NewTopic newTopic=new NewTopic("test1",1, (short) 1);//创建主题CreateTopicsResult topics = admin.createTopics(Arrays.asList(newTopic));//关闭管理者对象admin.close();}
}

NewTopic("test1",这里可以传个Map);可以自定义主题分区副本策略   不指定就默认

生产者流程图

生产者大致代码

public class KafkaProducerTest {public static void main(String[] args) {//创建配置对象Map<String,Object> configMap=new HashMap<>();//如果是集群随意指定一个configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//对Key Value进行序列化操作configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//创建生产者对象KafkaProducer<String,String> kafkaProducer=new KafkaProducer<>(configMap);for(int i=0;i<10;i++){//key的作用是通过某种算法,放到topic的某个分区中ProducerRecord<String, String> record = new ProducerRecord<>("test", "key1","hello kafka" + i);kafkaProducer.send(record);}//关闭生产者对象kafkaProducer.close();}
}

生产者拦截器

可以对照流程图,看下生产者拦截器在什么位置,一般是对Key   value的整理转换,对生产的数据做统一规范化处理,可以配置多个

可以点进去

大致就是这么个过程

遍历 拦截器  并调用每个拦截器的onSend方法

可以看到每个拦截器都是ProducerInterceptor类型

自定义生产者拦截器

自定义一个类实现ProducerInterceptor

在创建生产者时候添加拦截器配置

生产者拦截器

package com.hrui.interceptor;import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;/*** @author hrui* @date 2025/2/26 14:20*/
public class ValueInterceptor implements ProducerInterceptor<String,String> {@Override//发送数据的时候,会调用public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {System.out.println("拦截器拦截到消息:"+producerRecord.value());return new ProducerRecord<>(producerRecord.topic(),producerRecord.key(),producerRecord.value()+"-拦截器");}@Override//发送数据完毕,服务器返回的响应,会调用此方法public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {}@Override//生产者对象关闭时候,会调用此方法public void close() {}@Override//创建生产者对象时候调用public void configure(Map<String, ?> map) {}
}

启动下 

生产者数据发送同步或异步

如果需要同步

ACKS数据接收应答处理机制

指的是:

生产者发送数据到 Kafka Broker 时,Kafka 如何处理消息的接收确认。通过设置 ACKS 参数,你可以控制 Kafka 如何在生产者发送消息后确认数据是否成功写入。

ACKS三个配置

ACKS=0  生产者发送数据之后,不等待任何确认,发送了 就认为你可能收到了,丢失不管

ACKS=1  生产者会等待 分区的主副本(Leader)确认消息已经写入到其磁盘中,主副本发送成功确认后,生产者就认为消息已经成功发送。 如果主副本挂了消息仍可能丢失,除非有副本在进行同步

ACKS=all(或ACKS=-1)  等待所有副本确认 消息保证不会丢失  性能会较低,因为生产者需要等待所有副本确认

默认ACKS=-1  

生产者数据重试(重发)功能

例如ACKS=1的情况下   Leader还没来的及将数据保存到磁盘

Broker挂了,此时生产者在等待回调  但是一直没回复,超过等待时间

Kafka退出超时重试机制  retry

可以配置retry重试机制

重试机制带来了好处,也有坏处

例如 broker并没有挂  只是因为网络不稳定    这就产生了数据重复和乱序现象

如何避免数据重复

如果ACSK 1或者-1(就是ALL)就是为了数据不丢失,增强可靠性

如果你禁用重试肯定是不行的

但是重试又会导致数据重复和乱序现象

Kafka提供了生产者幂等性操作:所谓生产者幂等性操作就是 生产者的消息无论向Kafka发送多少次,

Kafka的Leader只会保存一条,默认的幂等性是不起作用的

开启

要启用生产者的幂等性,必须设置以下两个配置:

  • acks=all(或 acks=-1):这要求生产者等待所有副本确认消息已成功写入,确保数据的持久性和一致性。
  • enable.idempotence=true:启用幂等性保证。
  • 且要开启重试处理
  • 在途请求缓冲区数量指的是 Kafka 生产者在发送消息时,等待确认的消息数量默认是5  不能超过5

在途请求缓冲区的数量:max.in.flight.requests.per.connection

幂等性 确保了相同分区内的消息不会重复,但在 多个分区 的情况下,跨分区的消息仍然无法避免乱序

生产者事务操作

事务可以保证生产者 ID 唯一   解决跨会话  每次重启  生产者ID会变化  加了事务可以保持不变

package com.hrui;import com.hrui.interceptor.KafkaProducerInterceptorTest;
import com.hrui.interceptor.ValueInterceptor;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;/*** @author hrui* @date 2025/2/26 13:36*/
public class KafkaProducerTest {public static void main(String[] args) throws ExecutionException, InterruptedException {//创建配置对象Map<String,Object> configMap=new HashMap<>();//如果是集群随意指定一个configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//对Key Value进行序列化操作configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());configMap.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ValueInterceptor.class.getName());//可以配置ACKSconfigMap.put(ProducerConfig.ACKS_CONFIG,"-1");//配置幂等性configMap.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);//配置重试次数configMap.put(ProducerConfig.RETRIES_CONFIG,3);//配置超时configMap.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,3000);//配置事务 事务基于幂等性configMap.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"my-tx-id");//创建生产者对象KafkaProducer<String,String> kafkaProducer=new KafkaProducer<>(configMap);//初始化事务kafkaProducer.initTransactions();try {//开启事务kafkaProducer.beginTransaction();for(int i=0;i<10;i++){//key的作用是通过某种算法,放到topic的某个分区中//可以不设置key 默认是按照轮询的方式ProducerRecord<String, String> record = new ProducerRecord<>("test", "key1","hello kafka" + i);//发送数据  send方法还可以接收一个参数,就是回调函数  kafkaProducer.send(record);是异步的Future<RecordMetadata> send = kafkaProducer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e != null) {// 处理发送失败的情况e.printStackTrace();} else {// 处理发送成功的情况System.out.println("发送成功:" + recordMetadata);}}});send.get();}//提交事务kafkaProducer.commitTransaction();}catch (Exception e){e.printStackTrace();//中止事务kafkaProducer.abortTransaction();}finally {//关闭生产者对象kafkaProducer.close();}}
}

添加事务后  生产者默认会创建一个事务topic   默认50个分区

相关文章:

Kafka生产者相关

windows中kafka集群部署示例-CSDN博客 先启动集群或者单机也OK 引入依赖 <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.9.0</version></dependency>关于主题创建 理论…...

技术问题汇总:前端怎么往后端传一个数组?

场景 现在一个专门负责复习算法的服务&#xff0c;筛选出了用户今天需要复习的笔记的ids&#xff0c;现在要调用笔记服务根据ids查询的接口。 请问复习服务怎么将ids发到笔记服务&#xff0c;笔记服务怎么接收。 思路 发的时候肯定是用字符串&#xff0c;接收的时候&#xf…...

【03】STM32F407 HAL 库框架设计学习

【03】STM32F407 HAL 库框架设计学习 摘要 本文旨在为初学者提供一个关于STM32F407微控制器HAL&#xff08;Hardware Abstraction Layer&#xff09;库框架设计的详细学习教程。通过本文&#xff0c;读者将从零开始&#xff0c;逐步掌握STM32F407的基本知识、HAL库的配置步骤…...

智能图像处理平台:图像处理配置类

这里我们先修改一下依赖&#xff0c;不用JavaCV&#xff0c;用openCV。 导入依赖&#xff1a; <!-- JavaCV 依赖&#xff0c;用于图像和视频处理 --> <!-- <dependency>--> <!-- <groupId>org.bytedeco</groupId>--> &l…...

【图文详解】什么是微服务?什么是SpringCloud?

目录 一.认识微服务架构 ??微服务带来的挑战 二.微服务解决方案SpringCloud ??SpringCloud的版本 ??SpringCloud和SpringBoot的关系 ??SpringCloud实现方案 Spring Cloud Netfix Spring Cloud Alibaba ??Spring Cloud 实现对比 在入门Spring Cloud 之前&…...

基于ssm的校园跑腿管理系统+vue

作者主页&#xff1a;舒克日记 简介&#xff1a;Java领域优质创作者、Java项目、学习资料、技术互助 文中获取源码 项目介绍 系统共有管理员、用户两个角色 管理员主要的功能用户信息管理、任务信息管理、任务类型管理、接单信息管理、公告信息管理、投诉信息管理、公告类型管…...

5个GitHub热点开源项目!!

1.自托管 Moonlight 游戏串流服务&#xff1a;Sunshine 主语言&#xff1a;C&#xff0c;Star&#xff1a;14.4k&#xff0c;周增长&#xff1a;500 这是一个自托管的 Moonlight 游戏串流服务器端项目&#xff0c;支持所有 Moonlight 客户端。用户可以在自己电脑上搭建一个游戏…...

docker通用技术介绍

docker通用技术介绍 1.docker介绍 1.1 基本概念 docker是一个开源的容器化平台&#xff0c;用于快速构建、打包、部署和运行应用程序。它通过容器化技术将应用及其依赖环境&#xff08;如代码、库、系统工具等&#xff09;打包成一个标准化、轻量级的独立单元&#xff0c;实…...

#渗透测试#批量漏洞挖掘#某图创图书馆集群管理系统updOpuserPw SQL注入(CVE-2021-44321)

免责声明 本教程仅为合法的教学目的而准备&#xff0c;严禁用于任何形式的违法犯罪活动及其他商业行为&#xff0c;在使用本教程前&#xff0c;您应确保该行为符合当地的法律法规&#xff0c;继续阅读即表示您需自行承担所有操作的后果&#xff0c;如有异议&#xff0c;请立即停…...

智能合约安全 | 合约无效化攻击

目录&#xff1a; 智能合约安全 合约无效化攻击 合约自毁函数 selfdestruct 攻击实现 漏洞防御 总结 智能合约安全 合约无效化攻击 合约无效化攻击类同于web安全中的逻辑漏洞中的一种 我们这里拿一个典型的例子来讲解 有这样一份智能合约, 每个人可以向其中发送1 eth 第七个…...

RabbitMQ 的介绍与使用

一. 简介 1> 什么是MQ 消息队列&#xff08;Message Queue&#xff0c;简称MQ&#xff09;&#xff0c;从字面意思上看&#xff0c;本质是个队列&#xff0c;FIFO先入先出&#xff0c;只不过队列中存放的内容是message而已。 其主要用途&#xff1a;不同进程Process/线程T…...

【手撕算法】K-Means聚类全解析:从数学推导到图像分割实战

摘要 聚类算法是探索数据内在结构的利器&#xff01;本文手撕K-Means核心公式&#xff0c;结合Python代码实现与图像分割案例&#xff0c;详解&#xff1a; ✅ 欧氏距离计算 ✅ 簇中心迭代更新 ✅ 肘部法则优化 目录 摘要 目录 一、算法核心思想 二、数学原理详解 2.1 …...

【SQL技术】不同数据库引擎 SQL 优化方案剖析

一、引言 在数据处理和分析的世界里&#xff0c;SQL 是不可或缺的工具。不同的数据库系统&#xff0c;如 MySQL、PostgreSQL&#xff08;PG&#xff09;、Doris 和 Hive&#xff0c;在架构和性能特点上存在差异&#xff0c;因此针对它们的 SQL 优化策略也各有不同。这些数据库…...

RabbitMQ系列(二)基本概念之Publisher

在 RabbitMQ 中&#xff0c;Publisher&#xff08;发布者&#xff09; 是负责向 RabbitMQ 服务器发送消息的客户端角色&#xff0c;通常被称为“生产者”。以下是其核心功能与工作机制的详细解析&#xff1a; 一、核心定义与作用 消息发送者 Publisher 将消息发送到 RabbitMQ 的…...

OAK相机的抗震性测试

在工业环境中&#xff0c;双目视觉相机必须具备与工作环境同等的坚固性。鉴于部分客户会将我们的相机应用于恶劣环境&#xff08;例如安装在重型机械上&#xff09;&#xff0c;我们依据EN 60068-2-6:2008标准对相机进行了振动耐受性测试。 测试涉及的相机型号包括&#xff1a…...

2025最新Nginx高频面试题

2025最新Nginx高频面试题 摘要&#xff1a;本文整理了2025年企业高频Nginx面试题&#xff0c;覆盖核心原理、配置优化、安全防护及云原生场景实战&#xff0c;助你轻松应对技术面试&#xff01; 核心原理篇 1. Nginx的Master-Worker架构优势是什么&#xff1f; 答案&#xf…...

【Kubernetes】API server 限流 之 maxinflight.go

这个文件实现了一个基于信号量(Channel)的简单限流器。 基础知识 总共有四种channel 带缓冲的channel nonMutatingChan、mutatingChan 都是带缓冲的channel &#xff0c;这类channel 的特点是&#xff1a; 这允许最多 mutatingLimit /nonMutatingLimit 个请求同时获取令牌并执…...

推荐算法工程师的技术图谱和学习路径

推荐算法工程师的技术图谱和学习路径可以从多个维度进行概述,可以总结如下: 一、技术图谱 推荐算法工程师需要掌握的技术栈主要分为以下几个方面: 数学基础: 微积分、线性代数、概率论与统计学是推荐算法的基础,用于理解模型的数学原理和优化算法。高等数学、最优化理论…...

Milvus高性能向量数据库与大模型结合

Milvus | 高性能向量数据库&#xff0c;为规模而构建Milvus 是一个为 GenAI 应用构建的开源向量数据库。使用 pip 安装&#xff0c;执行高速搜索&#xff0c;并扩展到数十亿个向量。https://milvus.io/zh Milvus 是什么&#xff1f; Milvus 是一种高性能、高扩展性的向量数据…...

轮式机器人在复杂地形中如何选择合适的全局路径规划算法?

已思考完成 收起 嗯&#xff0c;用户问的是轮式机器人在复杂地形中如何选择合适的全局路径规划算法。首先&#xff0c;我需要理解复杂地形可能带来的挑战&#xff0c;比如崎岖的路面、动态障碍物、非结构化环境等等。轮式机器人在这里的运动控制需要考虑地形通过性、稳定性&…...

地震勘探——干扰波识别、井中地震时距曲线特点

目录 干扰波识别反射波地震勘探的干扰波 井中地震时距曲线特点 干扰波识别 有效波&#xff1a;可以用来解决所提出的地质任务的波&#xff1b;干扰波&#xff1a;所有妨碍辨认、追踪有效波的其他波。 地震勘探中&#xff0c;有效波和干扰波是相对的。例如&#xff0c;在反射波…...

docker详细操作--未完待续

docker介绍 docker官网: Docker&#xff1a;加速容器应用程序开发 harbor官网&#xff1a;Harbor - Harbor 中文 使用docker加速器: Docker镜像极速下载服务 - 毫秒镜像 是什么 Docker 是一种开源的容器化平台&#xff0c;用于将应用程序及其依赖项&#xff08;如库、运行时环…...

IGP(Interior Gateway Protocol,内部网关协议)

IGP&#xff08;Interior Gateway Protocol&#xff0c;内部网关协议&#xff09; 是一种用于在一个自治系统&#xff08;AS&#xff09;内部传递路由信息的路由协议&#xff0c;主要用于在一个组织或机构的内部网络中决定数据包的最佳路径。与用于自治系统之间通信的 EGP&…...

让AI看见世界:MCP协议与服务器的工作原理

让AI看见世界&#xff1a;MCP协议与服务器的工作原理 MCP&#xff08;Model Context Protocol&#xff09;是一种创新的通信协议&#xff0c;旨在让大型语言模型能够安全、高效地与外部资源进行交互。在AI技术快速发展的今天&#xff0c;MCP正成为连接AI与现实世界的重要桥梁。…...

CSS设置元素的宽度根据其内容自动调整

width: fit-content 是 CSS 中的一个属性值&#xff0c;用于设置元素的宽度根据其内容自动调整&#xff0c;确保宽度刚好容纳内容而不会超出。 效果对比 默认情况&#xff08;width: auto&#xff09;&#xff1a; 块级元素&#xff08;如 <div>&#xff09;会占满父容器…...

Linux 内存管理实战精讲:核心原理与面试常考点全解析

Linux 内存管理实战精讲&#xff1a;核心原理与面试常考点全解析 Linux 内核内存管理是系统设计中最复杂但也最核心的模块之一。它不仅支撑着虚拟内存机制、物理内存分配、进程隔离与资源复用&#xff0c;还直接决定系统运行的性能与稳定性。无论你是嵌入式开发者、内核调试工…...

深入浅出深度学习基础:从感知机到全连接神经网络的核心原理与应用

文章目录 前言一、感知机 (Perceptron)1.1 基础介绍1.1.1 感知机是什么&#xff1f;1.1.2 感知机的工作原理 1.2 感知机的简单应用&#xff1a;基本逻辑门1.2.1 逻辑与 (Logic AND)1.2.2 逻辑或 (Logic OR)1.2.3 逻辑与非 (Logic NAND) 1.3 感知机的实现1.3.1 简单实现 (基于阈…...

Chromium 136 编译指南 Windows篇:depot_tools 配置与源码获取(二)

引言 工欲善其事&#xff0c;必先利其器。在完成了 Visual Studio 2022 和 Windows SDK 的安装后&#xff0c;我们即将接触到 Chromium 开发生态中最核心的工具——depot_tools。这个由 Google 精心打造的工具集&#xff0c;就像是连接开发者与 Chromium 庞大代码库的智能桥梁…...

Java求职者面试指南:Spring、Spring Boot、Spring MVC与MyBatis技术解析

Java求职者面试指南&#xff1a;Spring、Spring Boot、Spring MVC与MyBatis技术解析 一、第一轮基础概念问题 1. Spring框架的核心容器是什么&#xff1f;它的作用是什么&#xff1f; Spring框架的核心容器是IoC&#xff08;控制反转&#xff09;容器。它的主要作用是管理对…...

【Veristand】Veristand环境安装教程-Linux RT / Windows

首先声明&#xff0c;此教程是针对Simulink编译模型并导入Veristand中编写的&#xff0c;同时需要注意的是老用户编译可能用的是Veristand Model Framework&#xff0c;那个是历史版本&#xff0c;且NI不会再维护&#xff0c;新版本编译支持为VeriStand Model Generation Suppo…...