当前位置: 首页 > news >正文

Spring Kafka生产者实现

需求

我们需要通过Spring Kafka库,将消息推送给Kafka的topic中。这里假设Kafka的集群和用户我们都有了。这里Kafka认证采取SASL_PLAINTEXT方式接入,SASL 采用 SCRAM-SHA-256 方式加解密。

pom.xml

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

我这里不需要写版本号,应为我使用的Spring Boot。Spring Boot会自动帮我挑选spring-kafka应该使用哪个版本合适。

application.yml

spring:kafka:producer:# kafka集群地址bootstrap-servers: xx.xx.xx.xxx:9092,xx.xx.xx.xxx:9092,xx.xx.xx.xxx:9092client-id: producer-dev# SASL_PLAINTEXT 接入方式security:protocol: SASL_PLAINTEXT# 反序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerproperties:# SASL 采用 SCRAM-SHA-256 方式sasl:mechanism: SCRAM-SHA-256# jaas配置jaas:options:username: kafkauserpassword: kafkapwdenabled: truelogin-module: org.apache.kafka.common.security.scram.ScramLoginModulecontrol-flag: required

以上,是关于Spring Kafka的全部配置。下面摘要出来的配置,是可以单独配置在配置中心的:

topic:# 接收消息的主题配置save: hello_kafka_topic
spring:kafka:producer:client-id: producer-dev# kafka集群地址bootstrap-servers: xx.xx.xx.xxx:9092,xx.xx.xx.xxx:9092,xx.xx.xx.xxx:9092# jaas配置jaas:options:username: kafkauserpassword: kafkapwd

Java

KafkaProducerService.java


public interface KafkaProducerService {/*** 转发消息到kafka*/void sendToKafka(String msg);}

KafkaProducerServiceImpl.java

import cn.com.xxx.service.KafkaProducerService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.kafka.core.KafkaProducerException;
import org.springframework.kafka.core.KafkaSendCallback;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;import javax.annotation.Resource;/*** 转发消息到kafka*/
@RefreshScope
@Slf4j
@Service
public class KafkaProducerServiceImpl implements KafkaProducerService {@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;/*** kafka接收消息的主题*/@Value("${topic.save}")private String topic;@Overridepublic void sendToKafka(String msg) {log.info(String.format("$$$$ => Producing message: %s", msg));ProducerRecord<String, String> recordKafka = new ProducerRecord<>(topic, msg);ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(recordKafka);future.addCallback(new KafkaSendCallback<String, String>() {@Overridepublic void onSuccess(SendResult<String, String> result) {log.info("成功发消息:{}给Kafka:{}", msg, result);}@Overridepublic void onFailure(KafkaProducerException ex) {log.error("发消息:{}给Kafka:{}", msg, recordKafka, ex);}});}
}

到这里为止Spring Kafka生产者所有配置就都可以了。这里使用的异步监听kafka回调的方式发送消息。

总结

这里使用Spring Kafka库异回调步给Kafka消息。这里使用的Spring Kafka库是老版本,所以,这里的使用的回调类是ListenableFuture类。

参考:

  • Spring for Apache Kafka2.8.3
  • Spring for Apache Kafka

相关文章:

Spring Kafka生产者实现

需求 我们需要通过Spring Kafka库&#xff0c;将消息推送给Kafka的topic中。这里假设Kafka的集群和用户我们都有了。这里Kafka认证采取SASL_PLAINTEXT方式接入&#xff0c;SASL 采用 SCRAM-SHA-256 方式加解密。 pom.xml <dependency><groupId>org.springframew…...

手把手教你入门Three.js(初识篇)

Three.js入门篇 一、Three.js和webGL的介绍二、开发和学习环境三、 三个基本概念1. 场景Scene2. 相机Camera3. 渲染器Renderer 四、三维坐标系五、材质Material六、光源1. 点光源2. 环境光3. 平行光: 七、常见几何体八、渲染器-设置设备像素比九、渲染器-锯齿属性 一、Three.js…...

Hadoop学习总结(搭建Hadoop集群(伪分布式模式))

如果前面有搭建过Hadoop集群完全分布式模式&#xff0c;现在搭建Hadoop伪分布式模式可以选择直接克隆完全分布式模式中的主节点(hadoop001)。以下是在搭建过完全分布式模式下的Hadoop集群的情况进行 伪分布式模式下的Hadoop功能与完全分布式模式下的Hadoop功能相同。 一、克隆…...

人性与理性共赢,真心罐头跃过增长的山海关

在北方不少地方&#xff0c;黄桃罐头是一种抚慰人心的力量。从大连起家&#xff0c;用真材实料打动人心的真心罐头&#xff0c;在朝着国民品牌前进的路上&#xff0c;需要更透彻地洞悉“人性”。 ”人的因素影响太大。我们希望可以告别个人英雄主义&#xff0c;用流程来保证可…...

【Redis】Docker部署Redis数据库

Docker部署Redis数据库 1. Redis介绍2. CentOS 7 安装 & Docker 配置3. 拉取Redis 镜像、创建容器3.1 配置Docker镜像源3.2 拉取Redis 镜像3.3 容器创建 1. Redis介绍 Redis&#xff08;Remote Dictionary Server )&#xff0c;即远程字典服务&#xff0c;是一个开源的使用…...

【目标跟踪】多目标跟踪测距

文章目录 前言python代码&#xff08;带注释&#xff09;main.pysort.pykalman.pydistance.py 结语 前言 先放效果图。目标框内左上角&#xff0c;显示的是目标距离相机的纵向距离。目标横向距离、速度已求出&#xff0c;没在图片展示。这里不仅仅实现对目标检测框的跟踪&#…...

吐血整理,服务端性能测试-Docker部署MySQL/Nginx(详细步骤)

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 基于Docker部署My…...

基于单片机设计的智能窗帘控制系统

一、前言 智能家居技术在近年来取得了巨大的发展&#xff0c;并逐渐成为人们日常生活中的一部分。智能家居系统带来了便利、舒适和高效的生活体验&#xff0c;拥有广泛的应用领域&#xff0c;其中之一就是智能窗帘控制系统。 传统窗帘需要手动操作&#xff0c;打开或关闭窗帘…...

WSL的秘钥被修改了要怎么弄

WSL的秘钥被修改了要怎么弄 gitgithub.com: Permission denied (publickey).ssh-add -l但是我是想加到github上的guiaguaide1.github.com里面哎&#xff0c;为什么这个是shengyi gitgithub.com: Permission denied (publickey). git push -u origin报错 aaaASUS:~/ML/paper/A…...

cesium开发引入方式

无独有偶&#xff0c;引入无非两种方式&#xff1a;外部标签引入和import导入。 1、外部引入 外部引入的话需要提前去下载开发包&#xff0c;下载完后&#xff0c;Build文件夹有两个文件夹&#xff1a;Cesium和CesiumUnminified&#xff0c;Cesium是压缩版的&#xff0c;Cesiu…...

无缝的链间互操作性:通用消息传递的强大之处

前言 通用消息传递&#xff08;General Message Passing&#xff0c;GMP&#xff09;是一种支持区块链之间通信和数据传输的机制。GMP正在成为增强不同区块链网络之间互操作性的解决方案。GMP允许应用程序构建者通过使用安全消息在区块链之间通信和交换信息来利用任何区块链的…...

minio + linux + docker + spring boot实现文件上传与下载

minio docker spring boot实现文件上传与下载 1.在linux上安装并启动docker2.在docker中拉取minio并启动3.Spring Boot 整合 minio4.测试 minio 文件上传、下载及图片预览等功能 1.在linux上安装并启动docker 检查linux内核&#xff0c;必须是3.10以上 uname ‐r安装docker…...

vue ant DatePicker 日期选择器 限制日期可控范围

场景 限制当前日期之前不能选择 限制只能选择日期区间内 Ant Design Vue 效果 <a-date-picker :disabledDate"disabledDate"></a-date-picker>method // 限制日期选择disabledDate(current) {return current && current > moment().endOf(&…...

linux 音视频架构 linux音视频开发

linux 音视频架构 linux音视频开发 转载 mob6454cc65110a 2023-07-26 22:27:01 文章标签 linux 音视频架构 ci QT 视频教程 文章分类 架构 后端开发 阅读数117 目录 前言1、软件工具准备a. 录音软件b. 录屏软件c. 摄像头软件d. 安卓屏幕操作软件e. 视频剪辑软件2、视频教…...

el-table添加固定高度height后高度自适应

0 效果 1 添加自定义指令 新建目录src/directive/el-table 在el-table目录下新建文件adaptive.js import { addResizeListener, removeResizeListener } from element-ui/src/utils/resize-event// 设置表格高度const doResize async(el, binding, vnode) > {// 获取表格…...

Python分享之多进程探索 (multiprocessing包)

在初步了解Python多进程之后&#xff0c;我们可以继续探索multiprocessing包中更加高级的工具。这些工具可以让我们更加便利地实现多进程。 进程池 进程池 (Process Pool)可以创建多个进程。这些进程就像是随时待命的士兵&#xff0c;准备执行任务(程序)。一个进程池中可以容…...

Boris FX Mocha Pro 2023:Mac/win全能影像处理神器

Boris FX Mocha Pro 2023是一款广受欢迎的影像处理软件&#xff0c;它凭借其强大的功能和卓越的性能&#xff0c;成为了影视后期、广告制作、动画设计等领域的必备工具。无论您是专业的影视制作人员&#xff0c;还是初入行的新手&#xff0c;Boris FX Mocha Pro 2023都能为您的…...

elementUI 特定分辨率(如1920*1080)下el-row未超出一行却换行

在1920*1080分辨率下&#xff0c; el-col 内容未超出 el-col 宽度&#xff0c;el-col 不足以占据一行&#xff0c;el-row 却自动换行了&#xff08;其他分辨率没有这个问题&#xff09;。 截图&#xff1a; 排查&#xff1a; el-col 内容没有溢出&#xff1b;没有多余的 pad…...

mac电脑视频处理推荐:达芬奇DaVinci Resolve Studio 18 中文最新

DaVinci Resolve Studio 18是一款专业的视频编辑、调色和后期制作软件&#xff0c;由Blackmagic Design开发。它被广泛应用于电影、电视和广告等行业&#xff0c;提供了全面的工具和功能&#xff0c;使用户能够进行高质量的影片制作和后期处理。 以下是DaVinci Resolve Studio…...

OKLink携手CertiK在港举办Web3生态安全主题论坛

2023年10月23日&#xff0c;OKLink与CertiK共同发起的Web3生态安全主题论坛在香港铜锣湾拉开帷幕。本次论坛由OKLink和CertiK主办&#xff0c;香港投资推广署独家支持&#xff0c;聚焦如何构建安全可靠的Web3生态系统议题&#xff0c;同时深入剖析这一进程中所面临的潜在挑战。…...

KubeSphere 容器平台高可用:环境搭建与可视化操作指南

Linux_k8s篇 欢迎来到Linux的世界&#xff0c;看笔记好好学多敲多打&#xff0c;每个人都是大神&#xff01; 题目&#xff1a;KubeSphere 容器平台高可用&#xff1a;环境搭建与可视化操作指南 版本号: 1.0,0 作者: 老王要学习 日期: 2025.06.05 适用环境: Ubuntu22 文档说…...

地震勘探——干扰波识别、井中地震时距曲线特点

目录 干扰波识别反射波地震勘探的干扰波 井中地震时距曲线特点 干扰波识别 有效波&#xff1a;可以用来解决所提出的地质任务的波&#xff1b;干扰波&#xff1a;所有妨碍辨认、追踪有效波的其他波。 地震勘探中&#xff0c;有效波和干扰波是相对的。例如&#xff0c;在反射波…...

《Qt C++ 与 OpenCV:解锁视频播放程序设计的奥秘》

引言:探索视频播放程序设计之旅 在当今数字化时代,多媒体应用已渗透到我们生活的方方面面,从日常的视频娱乐到专业的视频监控、视频会议系统,视频播放程序作为多媒体应用的核心组成部分,扮演着至关重要的角色。无论是在个人电脑、移动设备还是智能电视等平台上,用户都期望…...

3.3.1_1 检错编码(奇偶校验码)

从这节课开始&#xff0c;我们会探讨数据链路层的差错控制功能&#xff0c;差错控制功能的主要目标是要发现并且解决一个帧内部的位错误&#xff0c;我们需要使用特殊的编码技术去发现帧内部的位错误&#xff0c;当我们发现位错误之后&#xff0c;通常来说有两种解决方案。第一…...

循环冗余码校验CRC码 算法步骤+详细实例计算

通信过程&#xff1a;&#xff08;白话解释&#xff09; 我们将原始待发送的消息称为 M M M&#xff0c;依据发送接收消息双方约定的生成多项式 G ( x ) G(x) G(x)&#xff08;意思就是 G &#xff08; x ) G&#xff08;x) G&#xff08;x) 是已知的&#xff09;&#xff0…...

渗透实战PortSwigger靶场-XSS Lab 14:大多数标签和属性被阻止

<script>标签被拦截 我们需要把全部可用的 tag 和 event 进行暴力破解 XSS cheat sheet&#xff1a; https://portswigger.net/web-security/cross-site-scripting/cheat-sheet 通过爆破发现body可以用 再把全部 events 放进去爆破 这些 event 全部可用 <body onres…...

Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility

Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility 1. 实验室环境1.1 实验室环境1.2 小测试 2. The Endor System2.1 部署应用2.2 检查现有策略 3. Cilium 策略实体3.1 创建 allow-all 网络策略3.2 在 Hubble CLI 中验证网络策略源3.3 …...

《用户共鸣指数(E)驱动品牌大模型种草:如何抢占大模型搜索结果情感高地》

在注意力分散、内容高度同质化的时代&#xff0c;情感连接已成为品牌破圈的关键通道。我们在服务大量品牌客户的过程中发现&#xff0c;消费者对内容的“有感”程度&#xff0c;正日益成为影响品牌传播效率与转化率的核心变量。在生成式AI驱动的内容生成与推荐环境中&#xff0…...

python如何将word的doc另存为docx

将 DOCX 文件另存为 DOCX 格式&#xff08;Python 实现&#xff09; 在 Python 中&#xff0c;你可以使用 python-docx 库来操作 Word 文档。不过需要注意的是&#xff0c;.doc 是旧的 Word 格式&#xff0c;而 .docx 是新的基于 XML 的格式。python-docx 只能处理 .docx 格式…...

ServerTrust 并非唯一

NSURLAuthenticationMethodServerTrust 只是 authenticationMethod 的冰山一角 要理解 NSURLAuthenticationMethodServerTrust, 首先要明白它只是 authenticationMethod 的选项之一, 并非唯一 1 先厘清概念 点说明authenticationMethodURLAuthenticationChallenge.protectionS…...