当前位置: 首页 > news >正文

Spring Kafka生产者实现

需求

我们需要通过Spring Kafka库,将消息推送给Kafka的topic中。这里假设Kafka的集群和用户我们都有了。这里Kafka认证采取SASL_PLAINTEXT方式接入,SASL 采用 SCRAM-SHA-256 方式加解密。

pom.xml

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

我这里不需要写版本号,应为我使用的Spring Boot。Spring Boot会自动帮我挑选spring-kafka应该使用哪个版本合适。

application.yml

spring:kafka:producer:# kafka集群地址bootstrap-servers: xx.xx.xx.xxx:9092,xx.xx.xx.xxx:9092,xx.xx.xx.xxx:9092client-id: producer-dev# SASL_PLAINTEXT 接入方式security:protocol: SASL_PLAINTEXT# 反序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerproperties:# SASL 采用 SCRAM-SHA-256 方式sasl:mechanism: SCRAM-SHA-256# jaas配置jaas:options:username: kafkauserpassword: kafkapwdenabled: truelogin-module: org.apache.kafka.common.security.scram.ScramLoginModulecontrol-flag: required

以上,是关于Spring Kafka的全部配置。下面摘要出来的配置,是可以单独配置在配置中心的:

topic:# 接收消息的主题配置save: hello_kafka_topic
spring:kafka:producer:client-id: producer-dev# kafka集群地址bootstrap-servers: xx.xx.xx.xxx:9092,xx.xx.xx.xxx:9092,xx.xx.xx.xxx:9092# jaas配置jaas:options:username: kafkauserpassword: kafkapwd

Java

KafkaProducerService.java


public interface KafkaProducerService {/*** 转发消息到kafka*/void sendToKafka(String msg);}

KafkaProducerServiceImpl.java

import cn.com.xxx.service.KafkaProducerService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.kafka.core.KafkaProducerException;
import org.springframework.kafka.core.KafkaSendCallback;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;import javax.annotation.Resource;/*** 转发消息到kafka*/
@RefreshScope
@Slf4j
@Service
public class KafkaProducerServiceImpl implements KafkaProducerService {@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;/*** kafka接收消息的主题*/@Value("${topic.save}")private String topic;@Overridepublic void sendToKafka(String msg) {log.info(String.format("$$$$ => Producing message: %s", msg));ProducerRecord<String, String> recordKafka = new ProducerRecord<>(topic, msg);ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(recordKafka);future.addCallback(new KafkaSendCallback<String, String>() {@Overridepublic void onSuccess(SendResult<String, String> result) {log.info("成功发消息:{}给Kafka:{}", msg, result);}@Overridepublic void onFailure(KafkaProducerException ex) {log.error("发消息:{}给Kafka:{}", msg, recordKafka, ex);}});}
}

到这里为止Spring Kafka生产者所有配置就都可以了。这里使用的异步监听kafka回调的方式发送消息。

总结

这里使用Spring Kafka库异回调步给Kafka消息。这里使用的Spring Kafka库是老版本,所以,这里的使用的回调类是ListenableFuture类。

参考:

  • Spring for Apache Kafka2.8.3
  • Spring for Apache Kafka

相关文章:

Spring Kafka生产者实现

需求 我们需要通过Spring Kafka库&#xff0c;将消息推送给Kafka的topic中。这里假设Kafka的集群和用户我们都有了。这里Kafka认证采取SASL_PLAINTEXT方式接入&#xff0c;SASL 采用 SCRAM-SHA-256 方式加解密。 pom.xml <dependency><groupId>org.springframew…...

手把手教你入门Three.js(初识篇)

Three.js入门篇 一、Three.js和webGL的介绍二、开发和学习环境三、 三个基本概念1. 场景Scene2. 相机Camera3. 渲染器Renderer 四、三维坐标系五、材质Material六、光源1. 点光源2. 环境光3. 平行光: 七、常见几何体八、渲染器-设置设备像素比九、渲染器-锯齿属性 一、Three.js…...

Hadoop学习总结(搭建Hadoop集群(伪分布式模式))

如果前面有搭建过Hadoop集群完全分布式模式&#xff0c;现在搭建Hadoop伪分布式模式可以选择直接克隆完全分布式模式中的主节点(hadoop001)。以下是在搭建过完全分布式模式下的Hadoop集群的情况进行 伪分布式模式下的Hadoop功能与完全分布式模式下的Hadoop功能相同。 一、克隆…...

人性与理性共赢,真心罐头跃过增长的山海关

在北方不少地方&#xff0c;黄桃罐头是一种抚慰人心的力量。从大连起家&#xff0c;用真材实料打动人心的真心罐头&#xff0c;在朝着国民品牌前进的路上&#xff0c;需要更透彻地洞悉“人性”。 ”人的因素影响太大。我们希望可以告别个人英雄主义&#xff0c;用流程来保证可…...

【Redis】Docker部署Redis数据库

Docker部署Redis数据库 1. Redis介绍2. CentOS 7 安装 & Docker 配置3. 拉取Redis 镜像、创建容器3.1 配置Docker镜像源3.2 拉取Redis 镜像3.3 容器创建 1. Redis介绍 Redis&#xff08;Remote Dictionary Server )&#xff0c;即远程字典服务&#xff0c;是一个开源的使用…...

【目标跟踪】多目标跟踪测距

文章目录 前言python代码&#xff08;带注释&#xff09;main.pysort.pykalman.pydistance.py 结语 前言 先放效果图。目标框内左上角&#xff0c;显示的是目标距离相机的纵向距离。目标横向距离、速度已求出&#xff0c;没在图片展示。这里不仅仅实现对目标检测框的跟踪&#…...

吐血整理,服务端性能测试-Docker部署MySQL/Nginx(详细步骤)

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 基于Docker部署My…...

基于单片机设计的智能窗帘控制系统

一、前言 智能家居技术在近年来取得了巨大的发展&#xff0c;并逐渐成为人们日常生活中的一部分。智能家居系统带来了便利、舒适和高效的生活体验&#xff0c;拥有广泛的应用领域&#xff0c;其中之一就是智能窗帘控制系统。 传统窗帘需要手动操作&#xff0c;打开或关闭窗帘…...

WSL的秘钥被修改了要怎么弄

WSL的秘钥被修改了要怎么弄 gitgithub.com: Permission denied (publickey).ssh-add -l但是我是想加到github上的guiaguaide1.github.com里面哎&#xff0c;为什么这个是shengyi gitgithub.com: Permission denied (publickey). git push -u origin报错 aaaASUS:~/ML/paper/A…...

cesium开发引入方式

无独有偶&#xff0c;引入无非两种方式&#xff1a;外部标签引入和import导入。 1、外部引入 外部引入的话需要提前去下载开发包&#xff0c;下载完后&#xff0c;Build文件夹有两个文件夹&#xff1a;Cesium和CesiumUnminified&#xff0c;Cesium是压缩版的&#xff0c;Cesiu…...

无缝的链间互操作性:通用消息传递的强大之处

前言 通用消息传递&#xff08;General Message Passing&#xff0c;GMP&#xff09;是一种支持区块链之间通信和数据传输的机制。GMP正在成为增强不同区块链网络之间互操作性的解决方案。GMP允许应用程序构建者通过使用安全消息在区块链之间通信和交换信息来利用任何区块链的…...

minio + linux + docker + spring boot实现文件上传与下载

minio docker spring boot实现文件上传与下载 1.在linux上安装并启动docker2.在docker中拉取minio并启动3.Spring Boot 整合 minio4.测试 minio 文件上传、下载及图片预览等功能 1.在linux上安装并启动docker 检查linux内核&#xff0c;必须是3.10以上 uname ‐r安装docker…...

vue ant DatePicker 日期选择器 限制日期可控范围

场景 限制当前日期之前不能选择 限制只能选择日期区间内 Ant Design Vue 效果 <a-date-picker :disabledDate"disabledDate"></a-date-picker>method // 限制日期选择disabledDate(current) {return current && current > moment().endOf(&…...

linux 音视频架构 linux音视频开发

linux 音视频架构 linux音视频开发 转载 mob6454cc65110a 2023-07-26 22:27:01 文章标签 linux 音视频架构 ci QT 视频教程 文章分类 架构 后端开发 阅读数117 目录 前言1、软件工具准备a. 录音软件b. 录屏软件c. 摄像头软件d. 安卓屏幕操作软件e. 视频剪辑软件2、视频教…...

el-table添加固定高度height后高度自适应

0 效果 1 添加自定义指令 新建目录src/directive/el-table 在el-table目录下新建文件adaptive.js import { addResizeListener, removeResizeListener } from element-ui/src/utils/resize-event// 设置表格高度const doResize async(el, binding, vnode) > {// 获取表格…...

Python分享之多进程探索 (multiprocessing包)

在初步了解Python多进程之后&#xff0c;我们可以继续探索multiprocessing包中更加高级的工具。这些工具可以让我们更加便利地实现多进程。 进程池 进程池 (Process Pool)可以创建多个进程。这些进程就像是随时待命的士兵&#xff0c;准备执行任务(程序)。一个进程池中可以容…...

Boris FX Mocha Pro 2023:Mac/win全能影像处理神器

Boris FX Mocha Pro 2023是一款广受欢迎的影像处理软件&#xff0c;它凭借其强大的功能和卓越的性能&#xff0c;成为了影视后期、广告制作、动画设计等领域的必备工具。无论您是专业的影视制作人员&#xff0c;还是初入行的新手&#xff0c;Boris FX Mocha Pro 2023都能为您的…...

elementUI 特定分辨率(如1920*1080)下el-row未超出一行却换行

在1920*1080分辨率下&#xff0c; el-col 内容未超出 el-col 宽度&#xff0c;el-col 不足以占据一行&#xff0c;el-row 却自动换行了&#xff08;其他分辨率没有这个问题&#xff09;。 截图&#xff1a; 排查&#xff1a; el-col 内容没有溢出&#xff1b;没有多余的 pad…...

mac电脑视频处理推荐:达芬奇DaVinci Resolve Studio 18 中文最新

DaVinci Resolve Studio 18是一款专业的视频编辑、调色和后期制作软件&#xff0c;由Blackmagic Design开发。它被广泛应用于电影、电视和广告等行业&#xff0c;提供了全面的工具和功能&#xff0c;使用户能够进行高质量的影片制作和后期处理。 以下是DaVinci Resolve Studio…...

OKLink携手CertiK在港举办Web3生态安全主题论坛

2023年10月23日&#xff0c;OKLink与CertiK共同发起的Web3生态安全主题论坛在香港铜锣湾拉开帷幕。本次论坛由OKLink和CertiK主办&#xff0c;香港投资推广署独家支持&#xff0c;聚焦如何构建安全可靠的Web3生态系统议题&#xff0c;同时深入剖析这一进程中所面临的潜在挑战。…...

React第五十七节 Router中RouterProvider使用详解及注意事项

前言 在 React Router v6.4 中&#xff0c;RouterProvider 是一个核心组件&#xff0c;用于提供基于数据路由&#xff08;data routers&#xff09;的新型路由方案。 它替代了传统的 <BrowserRouter>&#xff0c;支持更强大的数据加载和操作功能&#xff08;如 loader 和…...

渗透实战PortSwigger靶场-XSS Lab 14:大多数标签和属性被阻止

<script>标签被拦截 我们需要把全部可用的 tag 和 event 进行暴力破解 XSS cheat sheet&#xff1a; https://portswigger.net/web-security/cross-site-scripting/cheat-sheet 通过爆破发现body可以用 再把全部 events 放进去爆破 这些 event 全部可用 <body onres…...

微信小程序 - 手机震动

一、界面 <button type"primary" bindtap"shortVibrate">短震动</button> <button type"primary" bindtap"longVibrate">长震动</button> 二、js逻辑代码 注&#xff1a;文档 https://developers.weixin.qq…...

【Go】3、Go语言进阶与依赖管理

前言 本系列文章参考自稀土掘金上的 【字节内部课】公开课&#xff0c;做自我学习总结整理。 Go语言并发编程 Go语言原生支持并发编程&#xff0c;它的核心机制是 Goroutine 协程、Channel 通道&#xff0c;并基于CSP&#xff08;Communicating Sequential Processes&#xff0…...

[Java恶补day16] 238.除自身以外数组的乘积

给你一个整数数组 nums&#xff0c;返回 数组 answer &#xff0c;其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请 不要使用除法&#xff0c;且在 O(n) 时间复杂度…...

C++ Visual Studio 2017厂商给的源码没有.sln文件 易兆微芯片下载工具加开机动画下载。

1.先用Visual Studio 2017打开Yichip YC31xx loader.vcxproj&#xff0c;再用Visual Studio 2022打开。再保侟就有.sln文件了。 易兆微芯片下载工具加开机动画下载 ExtraDownloadFile1Info.\logo.bin|0|0|10D2000|0 MFC应用兼容CMD 在BOOL CYichipYC31xxloaderDlg::OnIni…...

基于TurtleBot3在Gazebo地图实现机器人远程控制

1. TurtleBot3环境配置 # 下载TurtleBot3核心包 mkdir -p ~/catkin_ws/src cd ~/catkin_ws/src git clone -b noetic-devel https://github.com/ROBOTIS-GIT/turtlebot3.git git clone -b noetic https://github.com/ROBOTIS-GIT/turtlebot3_msgs.git git clone -b noetic-dev…...

C/C++ 中附加包含目录、附加库目录与附加依赖项详解

在 C/C 编程的编译和链接过程中&#xff0c;附加包含目录、附加库目录和附加依赖项是三个至关重要的设置&#xff0c;它们相互配合&#xff0c;确保程序能够正确引用外部资源并顺利构建。虽然在学习过程中&#xff0c;这些概念容易让人混淆&#xff0c;但深入理解它们的作用和联…...

莫兰迪高级灰总结计划简约商务通用PPT模版

莫兰迪高级灰总结计划简约商务通用PPT模版&#xff0c;莫兰迪调色板清新简约工作汇报PPT模版&#xff0c;莫兰迪时尚风极简设计PPT模版&#xff0c;大学生毕业论文答辩PPT模版&#xff0c;莫兰迪配色总结计划简约商务通用PPT模版&#xff0c;莫兰迪商务汇报PPT模版&#xff0c;…...

在golang中如何将已安装的依赖降级处理,比如:将 go-ansible/v2@v2.2.0 更换为 go-ansible/@v1.1.7

在 Go 项目中降级 go-ansible 从 v2.2.0 到 v1.1.7 具体步骤&#xff1a; 第一步&#xff1a; 修改 go.mod 文件 // 原 v2 版本声明 require github.com/apenella/go-ansible/v2 v2.2.0 替换为&#xff1a; // 改为 v…...