kafka-生产者监听器(SpringBoot整合Kafka)
文章目录
- 1、生产者监听器
- 1.1、创建生产者监听器
- 1.2、创建生产者拦截器
- 1.3、发送消息测试
- 1.4、使用Java代码创建主题分区副本
- 1.5、application.yml配置----v1版
- 1.6、屏蔽 kafka debug 日志 logback.xml
- 1.7、引入spring-kafka依赖
- 1.8、控制台日志
1、生产者监听器
1.1、创建生产者监听器
package com.atguigu.kafka.listener;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.support.ProducerListener;
@Component
public class MyKafkaProducerListener implements ProducerListener<String,String> {//生产者 ack 配置为 0 只要发送即成功//ack为 1 leader落盘 broker ack之后 才成功//ack为 -1 分区所有副本全部落盘 broker ack之后 才成功@Overridepublic void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {//ProducerListener.super.onSuccess(producerRecord, recordMetadata);System.out.println("MyKafkaProducerListener消息发送成功:"+"topic="+producerRecord.topic()+",partition = "+producerRecord.partition()+",key = "+producerRecord.key()+",value = "+producerRecord.value()+",offset = "+recordMetadata.offset());}//消息发送失败的回调:监听器可以接收到发送失败的消息 可以记录失败的消息@Overridepublic void onError(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata, Exception exception) {System.out.println("MyKafkaProducerListener消息发送失败:"+"topic="+producerRecord.topic()+",partition = "+producerRecord.partition()+",key = "+producerRecord.key()+",value = "+producerRecord.value()+",offset = "+recordMetadata.offset());System.out.println("异常信息:" + exception.getMessage());}
}
1.2、创建生产者拦截器
package com.atguigu.kafka.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.stereotype.Component;
import java.util.Map;
//拦截器必须手动注册给kafka生产者(KafkaTemplate)
@Component
public class MyKafkaInterceptor implements ProducerInterceptor<String,String> {//kafka生产者发送消息前执行:拦截发送的消息预处理@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {System.out.println("生产者即将发送消息:topic = "+ producerRecord.topic()+",partition:"+producerRecord.partition()+",key = "+producerRecord.key()+",value = "+producerRecord.value());return null;}//kafka broker 给出应答后执行@Overridepublic void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {//exception为空表示消息发送成功if(e == null){System.out.println("消息发送成功:topic = "+ recordMetadata.topic()+",partition:"+recordMetadata.partition()+",offset="+recordMetadata.offset()+",timestamp="+recordMetadata.timestamp());}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}
1.3、发送消息测试
package com.atguigu.kafka.producer;import com.atguigu.kafka.interceptor.MyKafkaInterceptor;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import java.io.IOException;@SpringBootTest
class KafkaProducerApplicationTests {//装配kafka模板类: springboot启动时会自动根据配置文初始化kafka模板类对象注入到容器中@ResourceKafkaTemplate kafkaTemplate;@ResourceMyKafkaInterceptor myKafkaInterceptor;@PostConstructpublic void init() {kafkaTemplate.setProducerInterceptor(myKafkaInterceptor);}@Testvoid contextLoads() throws IOException {kafkaTemplate.send("my_topic1", "spring-kafka-生产者监听器");//回调是等kafka,ack以后才执行,需要阻塞System.in.read();}
}
1.4、使用Java代码创建主题分区副本
package com.atguigu.kafka.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.stereotype.Component;
@Component
public class KafkaTopicConfig {@Beanpublic NewTopic myTopic1() {//相同名称的主题 只会创建一次,后面创建的主题名称相同配置不同可以做增量更新(分区、副本数)return TopicBuilder.name("my_topic1")//主题名称.partitions(3)//主题分区.replicas(3)//主题分区副本数.build();//创建}
}
1.5、application.yml配置----v1版
server:port: 8110# v1
spring:kafka:bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097producer: # producer 生产者retries: 0 # 重试次数 0表示不重试acks: -1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、-1/all)batch-size: 16384 # 批次大小 单位bytebuffer-memory: 33554432 # 生产者缓冲区大小 单位bytekey-serializer: org.apache.kafka.common.serialization.StringSerializer # key的序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer # value的序列化器
1.6、屏蔽 kafka debug 日志 logback.xml
<configuration> <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug --><logger name="org.apache.kafka.clients" level="debug" />
</configuration>
1.7、引入spring-kafka依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version><relativePath/> <!-- lookup parent from repository --></parent><!-- Generated by https://start.springboot.io --><!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn --><groupId>com.atguigu.kafka</groupId><artifactId>kafka-producer</artifactId><version>0.0.1-SNAPSHOT</version><name>kafka-producer</name><description>kafka-producer</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
1.8、控制台日志
生产者即将发送消息:topic = my_topic1,partition:null,key = null,value = spring-kafka-生产者监听器
消息发送成功:topic = my_topic1,partition:0,offset=0,timestamp=1717573749549
MyKafkaProducerListener消息发送成功:topic=my_topic1,partition = null,key = null,value = spring-kafka-生产者监听器,offset = 0


[[{"partition": 0,"offset": 0,"msg": "spring-kafka-生产者监听器","timespan": 1717573749549,"date": "2024-06-05 07:49:09"}]
]
相关文章:
kafka-生产者监听器(SpringBoot整合Kafka)
文章目录 1、生产者监听器1.1、创建生产者监听器1.2、创建生产者拦截器1.3、发送消息测试1.4、使用Java代码创建主题分区副本1.5、application.yml配置----v1版1.6、屏蔽 kafka debug 日志 logback.xml1.7、引入spring-kafka依赖1.8、控制台日志 1、生产者监听器 1.1、创建生产…...
3D感知视觉表示与模型分析:深入探究视觉基础模型的三维意识
在深度学习与大规模预训练的推动下,视觉基础模型展现出了令人印象深刻的泛化能力。这些模型不仅能够对任意图像进行分类、分割和生成,而且它们的中间表示对于其他视觉任务,如检测和分割,同样具有强大的零样本能力。然而࿰…...
VS2019+QT5.15调用动态库dll带有命名空间
VS2019QT5.15调用动态库dll带有命名空间 vs创建动态库 参考: QT调用vs2019生成的c动态库-CSDN博客 demo的dll头文件: // 下列 ifdef 块是创建使从 DLL 导出更简单的 // 宏的标准方法。此 DLL 中的所有文件都是用命令行上定义的 DLL3_EXPORTS // 符号…...
助力草莓智能自动化采摘,基于YOLOv5全系列【n/s/m/l/x】参数模型开发构建果园种植采摘场景下草莓成熟度智能检测识别系统
随着科技的飞速发展,人工智能(AI)技术已经渗透到我们生活的方方面面,从智能家居到自动驾驶,再到医疗健康,其影响力无处不在。然而,当我们把目光转向中国的农业领域时,一个令人惊讶的…...
C++中的生成器模式
目录 生成器模式(Builder Pattern) 实际应用 构建一辆汽车 构建一台计算机 构建一个房子 总结 生成器模式(Builder Pattern) 生成器模式是一种创建型设计模式,它允许你分步骤创建复杂对象。与其他创建型模式不同…...
基于python的PDF文件解析器汇总
基于python的PDF文件解析器汇总 大多数已发表的科学文献目前以 PDF 格式存在,这是一种轻量级、普遍的文件格式,能够保持一致的文本布局和格式。对于人类读者而言, PDF格式的文件内容展示整洁且一致的布局有助于阅读,可以很容易地…...
C++多线程同步总结
C多线程同步总结 关于C多线程同步 一、C11规范下的线程库 1、C11 线程库的基本用法:创建线程、分离线程 #include<iostream> #include<thread> #include<windows.h> using namespace std; void threadProc() {cout<<"this is in t…...
【机器学习】基于CNN-RNN模型的验证码图片识别
1. 引言 1.1. OCR技术研究的背景 1.1.1. OCR技术能够提升互联网体验 随着互联网应用的广泛普及,用户在日常操作中频繁遇到需要输入验证码的场景,无论是在登录、注册、支付还是其他敏感操作中,验证码都扮演着重要角色来确保安全性。然而&am…...
一文读懂Samtec分离式线缆组件选型 | 快速攻略
【摘要/前言】 2023年,全球线缆组件市场规模大致在2100多亿美元。汽车和电信行业是线缆组件最大的两个市场,中国和北美是最大的两个制造地区。有趣的是,特定应用(即定制)和矩形组件是两个最大的产品组。 【Samtec产品…...
批量申请SSL证书如何做到既方便成本又最低
假如您手头拥有1千个域名,并且打算为每一个域名搭建网站,那么在当前的网络环境下,您必须确保这些网站通过https的方式提供服务。这意味着,您将为每一个域名申请SSL证书,以确保网站数据传输的安全性和可信度。那么&…...
Python 设计模式(创建型)
文章目录 抽象工厂模式场景示例 单例模式场景实现方式 工厂方法模式场景示例 简单工厂模式场景示例 建造者模式场景示例 原型模式场景示例 抽象工厂模式 抽象工厂模式(Abstract Factory Pattern)是一种创建型设计模式,它提供了一种将一组相关…...
PyTorch 索引与切片-Tensor基本操作
以如下 tensor a 为例,展示常用的 indxing, slicing 及其他高阶操作 >>> a torch.rand(4,3,28,28) >>> a.shape torch.Size([4, 3, 28, 28])Indexing: 使用索引获取目标对象,[x,x,x,....] >>> a[0].shape torch.Size([3, 2…...
深入浅出 LangChain 与智能 Agent:构建下一代 AI 助手
我们小时候都玩过乐高积木。通过堆砌各种颜色和形状的积木,我们可以构建出城堡、飞机、甚至整个城市。现在,想象一下如果有一个数字世界的乐高,我们可以用这样的“积木”来构建智能程序,这些程序能够阅读、理解和撰写文本…...
scss是什么安装使⽤的步骤
当谈到SCSS时,我们首先需要了解它是什么。SCSS,也称为Sassy CSS,是Sass(Syntactically Awesome Stylesheets)的一种语法,它是CSS的预处理器,允许你使用变量、嵌套规则、混合(mixin&a…...
Pspark从hive读数据写到Pgsql数据库
前提条件 要使用PySpark从Hive读取数据并写入到PostgreSQL数据库,你需要确保以下几点: 你的PySpark环境已经配置好,并且能够连接到你的Hive数据。 PostgreSQL JDBC驱动程序已经添加到你的PySpark环境中。 你已经在PostgreSQL中创建好了相应…...
Pixi.js学习 (六)数组
目录 前言 一、数组 1.1 定义数组 1.2 数组存取与删除 1.3 使用数组统一操作敌机 二、实战 例题一:使用数组统一操作敌机 例题一代码: 总结 前言 为了提高作者的代码编辑水品,作者在使用博客的时候使用的集成工具为 HBuilderX。 下文所有截…...
操作系统复习-Linux的文件系统
文件系统概述 FAT FAT(File Allocation Table)FAT16、FAT32等,微软Dos/Windows使用的文件系统使用一张表保存盘块的信息 NTFS NTFS (New Technology File System)WindowsNT环境的文件系统NTFS对FAT进行了改进,取代了日的文件系统 EXT EXT(Extended…...
代码随想录算法训练营第三十六天| 860.柠檬水找零、 406.根据身高重建队列、 452. 用最少数量的箭引爆气球
LeetCode 860.柠檬水找零 题目链接:https://leetcode.cn/problems/lemonade-change/description/ 文章链接:https://programmercarl.com/0860.%E6%9F%A0%E6%AA%AC%E6%B0%B4%E6%89%BE%E9%9B%B6.html 思路 贪心算法:遇见20的时候有两种找零的…...
如何在C#中实现多线程
在C#中实现多线程有多种方式,包括使用System.Threading.Thread类、System.Threading.Tasks.Task类、System.Threading.Tasks.Parallel类以及异步编程模型(async和await)。下面我将为你展示每种方法的基本用法。 1. 使用System.Threading.Thread类 using System; using Syst…...
【LLM】快速了解Dify 0.6.10的核心功能:知识库检索、Agent创建和工作流编排(二)
【LLM】快速了解Dify 0.6.10的核心功能:知识库检索、Agent创建和工作流编排(二) 文章目录 【LLM】快速了解Dify 0.6.10的核心功能:知识库检索、Agent创建和工作流编排(二)一、创建一个简单的聊天助手&#…...
【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型
摘要 拍照搜题系统采用“三层管道(多模态 OCR → 语义检索 → 答案渲染)、两级检索(倒排 BM25 向量 HNSW)并以大语言模型兜底”的整体框架: 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后,分别用…...
Android Wi-Fi 连接失败日志分析
1. Android wifi 关键日志总结 (1) Wi-Fi 断开 (CTRL-EVENT-DISCONNECTED reason3) 日志相关部分: 06-05 10:48:40.987 943 943 I wpa_supplicant: wlan0: CTRL-EVENT-DISCONNECTED bssid44:9b:c1:57:a8:90 reason3 locally_generated1解析: CTR…...
装饰模式(Decorator Pattern)重构java邮件发奖系统实战
前言 现在我们有个如下的需求,设计一个邮件发奖的小系统, 需求 1.数据验证 → 2. 敏感信息加密 → 3. 日志记录 → 4. 实际发送邮件 装饰器模式(Decorator Pattern)允许向一个现有的对象添加新的功能,同时又不改变其…...
三维GIS开发cesium智慧地铁教程(5)Cesium相机控制
一、环境搭建 <script src"../cesium1.99/Build/Cesium/Cesium.js"></script> <link rel"stylesheet" href"../cesium1.99/Build/Cesium/Widgets/widgets.css"> 关键配置点: 路径验证:确保相对路径.…...
PHP和Node.js哪个更爽?
先说结论,rust完胜。 php:laravel,swoole,webman,最开始在苏宁的时候写了几年php,当时觉得php真的是世界上最好的语言,因为当初活在舒适圈里,不愿意跳出来,就好比当初活在…...
【位运算】消失的两个数字(hard)
消失的两个数字(hard) 题⽬描述:解法(位运算):Java 算法代码:更简便代码 题⽬链接:⾯试题 17.19. 消失的两个数字 题⽬描述: 给定⼀个数组,包含从 1 到 N 所有…...
CentOS下的分布式内存计算Spark环境部署
一、Spark 核心架构与应用场景 1.1 分布式计算引擎的核心优势 Spark 是基于内存的分布式计算框架,相比 MapReduce 具有以下核心优势: 内存计算:数据可常驻内存,迭代计算性能提升 10-100 倍(文档段落:3-79…...
DIY|Mac 搭建 ESP-IDF 开发环境及编译小智 AI
前一阵子在百度 AI 开发者大会上,看到基于小智 AI DIY 玩具的演示,感觉有点意思,想着自己也来试试。 如果只是想烧录现成的固件,乐鑫官方除了提供了 Windows 版本的 Flash 下载工具 之外,还提供了基于网页版的 ESP LA…...
【git】把本地更改提交远程新分支feature_g
创建并切换新分支 git checkout -b feature_g 添加并提交更改 git add . git commit -m “实现图片上传功能” 推送到远程 git push -u origin feature_g...
智能仓储的未来:自动化、AI与数据分析如何重塑物流中心
当仓库学会“思考”,物流的终极形态正在诞生 想象这样的场景: 凌晨3点,某物流中心灯火通明却空无一人。AGV机器人集群根据实时订单动态规划路径;AI视觉系统在0.1秒内扫描包裹信息;数字孪生平台正模拟次日峰值流量压力…...
