当前位置: 首页 > news >正文

kafka-生产者拦截器(SpringBoot整合Kafka)

文章目录

  • 1、生产者拦截器
    • 1.1、创建生产者拦截器
    • 1.2、KafkaTemplate配置生产者拦截器
    • 1.3、使用Java代码创建主题分区副本
    • 1.4、application.yml配置----v1版
    • 1.5、屏蔽 kafka debug 日志 logback.xml
    • 1.6、引入spring-kafka依赖
    • 1.7、控制台日志

1、生产者拦截器

1.1、创建生产者拦截器

package com.atguigu.kafka.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.stereotype.Component;
import java.util.Map;
//拦截器必须手动注册给kafka生产者(KafkaTemplate)
@Component
public class MyKafkaInterceptor implements ProducerInterceptor<String,String> {//kafka生产者发送消息前执行:拦截发送的消息预处理@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {System.out.println("生产者即将发送消息:topic = "+ producerRecord.topic()+",partition:"+producerRecord.partition()+",key = "+producerRecord.key()+",value = "+producerRecord.value());return null;}//kafka broker 给出应答后执行@Overridepublic void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {//exception为空表示消息发送成功if(e == null){System.out.println("消息发送成功:topic = "+ recordMetadata.topic()+",partition:"+recordMetadata.partition()+",offset="+recordMetadata.offset()+",timestamp="+recordMetadata.timestamp());}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}

1.2、KafkaTemplate配置生产者拦截器

package com.atguigu.kafka.producer;import com.atguigu.kafka.interceptor.MyKafkaInterceptor;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import java.io.IOException;@SpringBootTest
class KafkaProducerApplicationTests {//装配kafka模板类: springboot启动时会自动根据配置文初始化kafka模板类对象注入到容器中@ResourceKafkaTemplate kafkaTemplate;@ResourceMyKafkaInterceptor myKafkaInterceptor;@PostConstructpublic void init() {kafkaTemplate.setProducerInterceptor(myKafkaInterceptor);}@Testvoid contextLoads() throws IOException {kafkaTemplate.send("my_topic1", "spring-kafka-生产者拦截器");//回调是等kafka,ack以后才执行,需要阻塞System.in.read();}
}

1.3、使用Java代码创建主题分区副本

package com.atguigu.kafka.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.stereotype.Component;
@Component
public class KafkaTopicConfig {@Beanpublic NewTopic myTopic1() {//相同名称的主题 只会创建一次,后面创建的主题名称相同配置不同可以做增量更新(分区、副本数)return TopicBuilder.name("my_topic1")//主题名称.partitions(3)//主题分区.replicas(3)//主题分区副本数.build();//创建}
}

1.4、application.yml配置----v1版

server:port: 8110# v1
spring:kafka:bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097producer: # producer 生产者retries: 0 # 重试次数 0表示不重试acks: -1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选01-1/all)batch-size: 16384 # 批次大小 单位bytebuffer-memory: 33554432 # 生产者缓冲区大小 单位bytekey-serializer: org.apache.kafka.common.serialization.StringSerializer # key的序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer # value的序列化器

1.5、屏蔽 kafka debug 日志 logback.xml

<configuration>      <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug --><logger name="org.apache.kafka.clients" level="debug" />
</configuration>

1.6、引入spring-kafka依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version><relativePath/> <!-- lookup parent from repository --></parent><!-- Generated by https://start.springboot.io --><!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn --><groupId>com.atguigu.kafka</groupId><artifactId>kafka-producer</artifactId><version>0.0.1-SNAPSHOT</version><name>kafka-producer</name><description>kafka-producer</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

1.7、控制台日志

生产者即将发送消息:topic = my_topic1,partition:null,key = null,value = spring-kafka-生产者拦截器
消息发送成功:topic = my_topic1,partition:0,offset=0,timestamp=1717490776329
[[{"partition": 0,"offset": 0,"msg": "spring-kafka-生产者拦截器","timespan": 1717490776329,"date": "2024-06-04 08:46:16"}]
]

在这里插入图片描述

相关文章:

kafka-生产者拦截器(SpringBoot整合Kafka)

文章目录 1、生产者拦截器1.1、创建生产者拦截器1.2、KafkaTemplate配置生产者拦截器1.3、使用Java代码创建主题分区副本1.4、application.yml配置----v1版1.5、屏蔽 kafka debug 日志 logback.xml1.6、引入spring-kafka依赖1.7、控制台日志 1、生产者拦截器 1.1、创建生产者拦…...

每日一题:聊聊 Redis 过期键的删除策略

聊聊 Redis 过期键的删除策略 答案 惰性删除 &#xff1a;只会在取出 key 的时候才对数据进行过期检查&#xff1b;这样对 CPU 最友好&#xff0c;但是可能会造成太多过期 key 没有被删除&#xff08;占用内存&#xff09;。 通过定时器实现&#xff08;时间事件&#xff09;&…...

边缘计算的AI小板——OrangePi AI Pro

简介 OrangePi AI Pro是一款基于Allwinner H6处理器的嵌入式AI计算设备&#xff0c;适用于物联网和边缘计算。它具有强大的性能、低功耗、多接口和小尺寸。 本文分为三个部分&#xff1a; 一、对该板进行简单的开箱介绍。 二、 将SD卡中的系统迁移到由于该板支持SD卡、SSD…...

RDMA (2)

iWARP(RDMA)怎么工作的 招式1:bypass内核 非iWARP时,当应用向网络适配器发出读或者写命令时,命令穿过用户空间以及内核空间,因此需要在用户空间和内核空间间进行切换。 iWARP使用RDMA,让应用直接将命令送达到网络适配器。这规避了对内核的调用,减少了开销和延迟。 招式2…...

vue.config.js中,devServer对象用于配置开发服务器的行为

devServer: {hot: true, // 启用模块热替换&#xff08;Hot Module Replacement&#xff0c;HMR&#xff09;。liveReload: true, // 启用页面自动刷新。当热更新失败时&#xff0c;将回退到页面自动刷新。open: true, // 启动服务器后自动打开浏览器。port: 8080, // 设置开发…...

JVM 运行流程

JVM 是 Java 运行的基础&#xff0c;也是实现一次编译到处执行的关键&#xff0c;那么 JVM 是如何执行的呢&#xff1f; JVM 执行流程 程序在执行之前先要把java代码转换成字节码&#xff08;class 文件&#xff09;&#xff0c; JVM 首先需要把字节码通过一定的 方式 类加…...

android-JNI

1.2【静态库】的特点&#xff1a; &#xff08;.a&#xff09; ①静态库对函数库的链接是在编译期完成的。执行期间代码装载速度快。 ②使可执行文件变大&#xff0c;浪费空间和资源&#xff08;占空间&#xff09;。 ③对程序的更新、部署与发布不方便&#xff0c;需要全量更新…...

Go_unsafe包

是什么&#xff1f;为什么&#xff1f; 如何利用unsafe包修改私有成员&#xff1f; 结构体会被分配到一块连续的内存&#xff0c;结构体的地址也代表第一个成员的地址。 如何利用unsafe包获取slice和map的长度&#xff1f; // 利用unsafe包修改私有成员 type S struct {name …...

【HarmonyOS4学习笔记】《HarmonyOS4+NEXT星河版入门到企业级实战教程》课程学习笔记(十三)

课程地址&#xff1a; 黑马程序员HarmonyOS4NEXT星河版入门到企业级实战教程&#xff0c;一套精通鸿蒙应用开发 &#xff08;本篇笔记对应课程第 20 - 21节&#xff09; P20《19.ArkUI-属性动画和显式动画》 本节先来学习属性动画和显式动画&#xff1a; 在代码中定义动画&am…...

企业建站响应式网站建设平台版源码系统 海量模版可选择 带完整的安装代码以及搭建教程

系统概述 企业建站响应式网站建设平台版源码系统是一款集创新性、实用性和便捷性于一体的建站解决方案。它旨在为用户提供一站式的网站建设服务&#xff0c;无论你是新手还是经验丰富的开发者&#xff0c;都能通过该系统轻松实现网站的构建与部署。 该系统采用先进的技术架构…...

在 VSCode 中搭建 Flutter 开发环境并运行项目

要在 Visual Studio Code (VSCode) 中运行 Flutter 项目并启动虚拟机&#xff08;例如 Android Emulator&#xff09;&#xff0c;可以按照以下步骤进行设置和操作&#xff1a; 一、安装 Flutter 和 Dart 插件 安装 Flutter SDK&#xff1a; 前往 Flutter 官网 下载并安装 Flu…...

如何执行VMware P2V迁移|VMware Converter和替代方案

VMware中的P2V是什么&#xff1f; 我们常说的VMware P2V其实指的就是“物理到虚拟”&#xff0c;将工作负载从物理机器转换或迁移到虚拟机&#xff08;VM&#xff09;的过程&#xff0c;能够使您无需从头开始费力地创建和配置新虚拟机。 就像您可以使用Disk2vhd执行Hyper-V物理…...

03-3.2.3 队列的链式存储的实现

&#x1f44b; Hi, I’m Beast Cheng&#x1f440; I’m interested in photography, hiking, landscape…&#x1f331; I’m currently learning python, javascript, kotlin…&#x1f4eb; How to reach me --> 458290771qq.com 喜欢《数据结构》部分笔记的小伙伴可以订…...

Spring AI 第二讲 之 Chat Model API 第八节Anthropic 3 Chat

Anthropic Claude 是一系列基础人工智能模型&#xff0c;可用于各种应用。对于开发人员和企业来说&#xff0c;您可以利用 API 访问&#xff0c;直接在 Anthropic 的人工智能基础架构之上进行构建。 Spring AI 支持用于同步和流式文本生成的 Anthropic 消息 API。 Anthropic …...

【ARM 常见汇编指令学习 6.2 -- ARMv8 汇编指令 SDIV 详细介绍】

文章目录 SDIV指令格式使用示例注意事项总结 SDIV ARMv8 架构中的 SDIV 指令用于执行带符号整数除法操作。这意味着它可以处理负数除法&#xff0c;与 UDIV&#xff08;执行无符号整数除法&#xff09;形成对比。SDIV 将两个寄存器中的带符号整数相除&#xff0c;将除法结果存…...

【ArcGIS微课1000例】0113:大地测量要素概述与构建

文章目录 一、大地测量要素描述1. 大地要素的概念2. 大地要素的类型二、创建大地测量要素1. 创建要素类2. 创建大地要素一、大地测量要素描述 1. 大地要素的概念 大地测量要素的测量值考虑了投影空间的固有变形。如果要创建一个空间跨度较大的要素(例如一条横跨大洋的飞行路…...

【记录】LangChain+本地模型的文档问答(webUI)

已在notebook测试无误。 包安装 pip install langchain langchain_community transformers InstructorEmbedding sentence_transformers2.2.2 faiss-gpu PyPDF2 streamlit pyngrok gradio fitz frontend 环境变量设置 huggingface连不上无法下载模型&#xff0c;需要设置镜像。…...

Winddow系统下关于Golang使用Cgo的配置

1.配置CGO_ENABLED为1 go env -w CGO_ENABLED1 2.安装gcc环境&#xff0c;否则出现cgo: C compiler "gcc" not found: exec: "gcc": executable file not found in %PATH%错误 安装包&#xff1a;链接&#xff1a;https://pan.baidu.com/s/1sgF9lijqGeP…...

python面向过程与初始面向对象编程

让我们穿越到《龙珠》世界&#xff0c;一起揭开 面向对象编程 的神秘面纱吧。 面向过程编程与面向对象编程 天下第一武道会 选手登记 第 22 届天下第一武道会即将召开&#xff0c;各路武术高手齐聚一堂&#xff0c;其中最受瞩目的&#xff0c;当属卡卡罗特&#xff08;孙悟…...

vue3 实现自定义指令封装 --- 通俗易懂

1、局部自定义指令 1.1 在<script setup>定义组件内的指令&#xff0c;任何以v开头的驼峰式命名的变量都可以被用作一个自定义指令 <template><div><h3>使用自定义指令</h3><div>########################## start 局部自定义指令</d…...

不止于JWT:用FastAPI的Depends实现细粒度权限控制

&#x1f4cc; 本文摘要 很多FastAPI初学者把JWT认证当成权限控制的终点&#xff0c;结果上线后频繁出现越权操作。本文通过一个真实的“多租户Todo”案例&#xff0c;带你从0搭建基于角色的访问控制&#xff08;RBAC&#xff09;和数据级权限&#xff08;ABAC&#xff09;&…...

终极Python自动化抢票神器:如何用DamaiHelper告别演唱会门票焦虑

终极Python自动化抢票神器&#xff1a;如何用DamaiHelper告别演唱会门票焦虑 【免费下载链接】DamaiHelper 大麦网演唱会演出抢票脚本。 项目地址: https://gitcode.com/gh_mirrors/dama/DamaiHelper 在当今热门演出门票一票难求的时代&#xff0c;传统手动抢票方式已经…...

OpenClaw局域网访问配置

根据OpenClaw最新官方文档&#xff08;截至2026年3月&#xff09;&#xff0c;以下是更新后的局域网访问配置指南&#xff0c;整合了网络架构、安全加固和自动化配对等新特性&#xff1a;一、核心配置命令&#xff08;基于新版网关协议&#xff09;启用LAN多接口监听 使用新参数…...

C#桌面开发选型指南:OpenTK vs SharpGL,在.NET Framework 4.7/Winform中谁更香?

C#桌面开发选型指南&#xff1a;OpenTK vs SharpGL在WinForm中的深度对决 当我们需要在.NET WinForm项目中集成3D图形功能时&#xff0c;OpenTK和SharpGL这两个库常常成为开发者纠结的选择。作为在.NET生态中封装OpenGL的两种主流方案&#xff0c;它们各有特色&#xff0c;适用…...

ArcGIS10.2许可服务启动失败?别急着重装,试试这个命令行修复大法(附端口冲突排查)

ArcGIS 10.2许可服务启动失败的终极排查指南&#xff1a;从命令行到端口冲突解决 当你面对灰色的启动按钮和毫无反应的ArcGIS License Administrator界面时&#xff0c;那种挫败感我深有体会。作为地理信息行业的从业者&#xff0c;我们常常依赖ArcGIS完成关键工作&#xff0c…...

OpenClaw+Qwen3.5-4B-Claude镜像:30分钟搭建逻辑推理自动化工作流

OpenClawQwen3.5-4B-Claude镜像&#xff1a;30分钟搭建逻辑推理自动化工作流 1. 为什么需要逻辑推理自动化 上周我遇到一个典型的技术问题&#xff1a;需要从200多行Python日志中找出导致接口超时的根本原因。手动排查不仅耗时&#xff0c;还容易遗漏关键线索。这让我开始思考…...

薛定谔共价对接实战:如何为你的靶点蛋白快速找到‘锁死’它的共价抑制剂?

薛定谔共价对接实战&#xff1a;靶点蛋白的共价抑制剂高效筛选策略 药物研发领域正经历一场静默革命——共价抑制剂从曾经的"危险分子"摇身变为现代药物设计的明星。与传统可逆抑制剂不同&#xff0c;共价抑制剂能与靶点蛋白形成稳定的共价键&#xff0c;实现近乎不可…...

SDXL 1.0电影级绘图工坊真实案例:文化遗产数字化重建与风格复原实践

SDXL 1.0电影级绘图工坊真实案例&#xff1a;文化遗产数字化重建与风格复原实践 想象一下&#xff0c;你面前有一张因年代久远而模糊不清的古建筑照片&#xff0c;或是仅存于文字描述中的历史场景。如何将它们清晰地、生动地、甚至以不同艺术风格再现出来&#xff1f;这曾是考…...

LLaMA-Omni代码贡献指南:如何参与这个开源语音AI项目

LLaMA-Omni代码贡献指南&#xff1a;如何参与这个开源语音AI项目 【免费下载链接】LLaMA-Omni LLaMA-Omni is a low-latency and high-quality end-to-end speech interaction model built upon Llama-3.1-8B-Instruct, aiming to achieve speech capabilities at the GPT-4o l…...

DeepSeek-OCR 技术解析:基于视觉压缩的端到端文档理解新范式

1. DeepSeek-OCR&#xff1a;重新定义文档理解的下一代技术 第一次接触DeepSeek-OCR时&#xff0c;我正被一个复杂的多栏报纸数字化项目困扰。传统OCR工具在处理这种复杂版面时&#xff0c;要么丢失栏目分隔信息&#xff0c;要么混淆文字顺序。直到尝试了DeepSeek-OCR的Gundam动…...