当前位置: 首页 > news >正文

Spring Boot集成Kafka:最佳实践与详细指南

文章目录

  • 一、生产者
    • 1.引入库
    • 2.配置文件
    • 3.配置类
      • PublicConfig.java
      • MessageProducer.java
    • 4.业务处理类
  • 三、消费者
    • 1.引入库
    • 2.配置类
      • PublicConfig.java
      • MessageConsumer.java
    • 3.业务类

一、生产者

1.引入库

引入需要依赖的jar包,引入POM文件:

 <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
</dependency>

2.配置文件

配置Kafka的相关参数(或者你项目的cacos或者yaml文件里添加)

以下是一个示例配置:application.properties

ccm.kafka.servers:192.168.1.95:9092,192.168.1.96:9092,192.168.1.97:9092
ccm.kafka.topics.xxx:xxx_content_dev

Tip:建议topic命名规则:租户简称+项目关键词+系统环境的方式,更容易区分

3.配置类

PublicConfig.java

@Data
@Configuration
@ConfigurationProperties(prefix = "ccm.kafka")
//配置信息nacos中配置
public class PublicConfig {private String servers;private String alertTopic;}

MessageProducer.java


@Slf4j
@Component
public class MessageProducer {private Producer producerKafka;@AutowiredPublicConfig publicConfig;/*** 初始化方法*/@PostConstructpublic String init() {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, publicConfig.getServers());props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, String.valueOf(30 * 1000));props.put(ProducerConfig.ACKS_CONFIG, "all");producerKafka = new KafkaProducer(props);log.info("kafka message channel created successfully");return "OK";}public ResponseData send(String content, String topic) {long startTime = System.currentTimeMillis();try {String key = UUID.randomUUID().toString().replace("-", "");ProducerRecord<String, String> kafkaMessage = new ProducerRecord<>(topic, key, content);log.info("MessageProducer send key {},message{}", key, content);Future<RecordMetadata> send = producerKafka.send(kafkaMessage);send.get();log.info("MessageProducer send cost time:{}", System.currentTimeMillis() - startTime);} catch (Exception e) {log.error("MessageProducer Failed to push message:{}", e.getMessage());return ResponseData.errorWithMsg("MessageProducer Failed to push message:" + e.getMessage());}return null;}}

4.业务处理类

示例代码的业务场景:定时生成预警消息发送给下游系统调用。

//启动类注意增加定时注解的支持
@SpringBootApplication
@MapperScan(basePackages = {"com.xx.xx.mapper","com.xx.xx.crawler.mapper"})
@EnableScheduling
public class CATApp {public static void main(String[] args) {SpringApplication.run(CATApp.class,args);}}@Service
@Slf4j
public class CrawlerService {@Scheduled(cron = "${crawler.scheduled.cron:0 */1 * * * ?}") // 每5分钟执行一次//   @Scheduled(cron = "${crawler.scheduled.cron:0 0 0/1 * * ?}") // 每小时执行一次public void crawlAndSaveAlertInfos() {log.info(">>>>>>>>>>>>> crawlAndSaveAlertInfos  ");//替换成具体的业务场景 List<AlertInfo> alertInfos = fetchAlertInfoList();if (!alertInfos.isEmpty()) {for (AlertInfo alertInfo : alertInfos) {//发送预警信息到kafka供下游调用crawlerAlertSyncService.sendCrawlerAlertMsgKafka(alertInfo);}}}
/**** 预警消息通过Kafka异步同步其他应用*/
public interface CrawlerAlertSyncService {void sendCrawlerAlertMsgKafka(AlertInfo alertInfo) ;}@Slf4j
@Service
public class CrawlerAlertSyncServiceImpl implements CrawlerAlertSyncService {@Autowiredprivate MessageProducer messageProducer;@Resourceprivate PublicConfig publicConfig;@Overridepublic void sendCrawlerAlertMsgKafka(AlertInfo alertInfo) {String topic = publicConfig.getAlertTopic();String servers = publicConfig.getServers();log.info("send publish msg to kafka  ,topic:{},bizId:{}", topic, alertInfo.getAlertid());log.info("send publish msg to kafka  ,servers:{}", servers);String content = JSON.toJSONString(alertInfo);log.info("send publish msg to kafka  ,content:{}", content);if (StringUtils.isNotBlank(topic)) {messageProducer.send(content, topic);}}
}

三、消费者

1.引入库

在消费者工程pom文件中配置依赖

 <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
</dependency>

2.配置类

同样根据该项目情况编写配置类,示例代码中仍为读取naco配置

PublicConfig.java

@Data
@Configuration
@Slf4j
@ConfigurationProperties(prefix = "xman.kafka")
public class PublicConfig {private String servers;private Map<String,String> topics;public String getTopic(String appCode) {if(Objects.isNull(topics) || topics.isEmpty()){return null;}return topics.get(appCode);}private String alertTopic;private String group;
}

MessageConsumer.java

@Slf4j
@Component
public abstract class MessageConsumer {// 用于持续监听kafka消息的专用线程池private ExecutorService threadPool;// 用于持续消费kafka消息的专用线程池private ExecutorService consumerThreadPool;@Resourceprivate PublicConfig publicConfig;/*** 初始化方法*/@PostConstructpublic String init() {MessageConfigField messageConfig = MessageConfigField.builder().servers(publicConfig.getServers()).topic(publicConfig.getAlertTopic()).group(publicConfig.getGroup()).build();if (StringUtils.isBlank(messageConfig.getServers())) {//没有配置kafka信息return "OK";}initThreadPool();KafkaConsumer<String, String> instance = kafkaInstance(messageConfig.getServers(),messageConfig.getGroup(), messageConfig.getTopic(), messageConfig.getClientName(),messageConfig.getUsername(), messageConfig.getPassword());startListen(instance);log.info("ccm kafka消息订阅成功:clientId:" + messageConfig.getClientName());return "OK";}private void initThreadPool() {if (null == threadPool) {log.info("initThreadPool start");threadPool = Executors.newFixedThreadPool(1);log.info("initThreadPool done");}}private void startListen(KafkaConsumer<String, String> consumer) {threadPool.submit(() -> {TenantContext.setContextCode(CommonConstants.TENANT_CODE);while (true) {try {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));if (records == null || records.isEmpty()) {continue;}for (ConsumerRecord<String, String> record : records) {Optional<String> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {String msg = kafkaMessage.get();if (StringUtils.isNotBlank(msg)) {log.info("msgJson:" + msg);consumeMsg(msg);}}}} catch (Exception e) {TimeUnit.SECONDS.sleep(1);log.error("consume error", e);}}});}public static KafkaConsumer<String, String> kafkaInstance(String servers, String group,String topic, String clientId, String username, String password) {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);if (StringUtils.isNotBlank(group)) {props.put(ConsumerConfig.GROUP_ID_CONFIG, group);}props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);List<String> subscribedTopics = new ArrayList<>();subscribedTopics.add(topic);consumer.subscribe(subscribedTopics);return consumer;}/*** 核心逻辑,由子类继承实现** @param msgData msg*/public abstract void consumeMsg(String msgData) throws Exception;}

3.业务类

@Slf4j
@Service
@RefreshScope
public class CmsInfoConsumer extends MessageConsumer {@Resourceprivate InfoService infoService;@Overridepublic void consumeMsg(String msgData) throws Exception {log.info("CmsWeatherConsumer收到mq消息message:{}", msgData);CcmAlertInfoDTO alertInfoDTO = JSONObject.parseObject(msgData, CcmAlertInfoDTO.class);try {//to_do 处理消费内容infoService.saveInfoContent(alertInfoDTO);} catch (Exception e) {e.printStackTrace();log.info("同步用户消息失败:" + e);}}
}

至此,一个简单的通过kafka同步预警消息的应用就开发完了。

相关文章:

Spring Boot集成Kafka:最佳实践与详细指南

文章目录 一、生产者1.引入库2.配置文件3.配置类PublicConfig.javaMessageProducer.java 4.业务处理类 三、消费者1.引入库2.配置类PublicConfig.javaMessageConsumer.java 3.业务类 一、生产者 1.引入库 引入需要依赖的jar包&#xff0c;引入POM文件&#xff1a; <depend…...

基于Qwen2-VL模型针对LaTeX OCR任务进行微调训练 - 多图推理

基于Qwen2-VL模型针对LaTeX OCR任务进行微调训练 - 多图推理 flyfish 基于Qwen2-VL模型针对LaTeX_OCR任务进行微调训练_-_LoRA配置如何写 基于Qwen2-VL模型针对LaTeX_OCR任务进行微调训练_-_单图推理 基于Qwen2-VL模型针对LaTeX_OCR任务进行微调训练_-_原模型_单图推理 基于Q…...

详解下c语言下的多维数组和指针数组

在实际c语言编程中&#xff0c;三维及以上数组我们使用的很少&#xff0c;二维数组我们使用得较多。说到数组&#xff0c;又不得关联到指针&#xff0c;因为他们两者的联系太紧密了。今天我们就详细介绍下c语言下的多维数组(主要是介绍二维数组)和指针。 一、二维数组 1.1&am…...

免费送源码:Java+ssm+MySQL 基于微服务架构的餐饮系统的设计与实现 计算机毕业设计原创定制

摘 要 近年来,我国经济和社会发展迅速,人们物质生活水平日渐提高,餐饮行业更是发展迅速,人们对于餐饮行业的认识和要求也越来越高。传统形式的餐饮行业都是以人为本,管理起来需要很多人力、物力、财力,既不方便管理者的管理,也不方便顾客实时了解餐厅动态,给传统餐饮行业的经…...

LeetCode hot100-69-N

https://leetcode.cn/problems/valid-parentheses/description/?envTypestudy-plan-v2&envIdtop-100-liked 20. 有效的括号 已解答 简单 相关标签 相关企业 提示 给定一个只包括 (&#xff0c;)&#xff0c;{&#xff0c;}&#xff0c;[&#xff0c;] 的字符串 s &#x…...

【橘子容器】如何构建一个docker镜像

你肯定打过docker镜像是吧&#xff0c;作为一个开发这很正常&#xff0c;那么你用的什么打包方式呢&#xff0c;这里我们来梳理几种常用的docker镜像构建方式。 ps&#xff1a;这里不是太讲原理&#xff0c;更多的是一种科普和操作。因为讲原理的东西网上已经够多了。 一、Dock…...

EFAK kafka可视化管理工具部署使用

简介&#xff1a;EFAK是开源的可视化和管理软件。它允许您查询、可视化、提醒和探索您的指标&#xff0c;无论它们存储在何处。简单来说&#xff0c;它为您提供了将 Kafka 集群数据转换为漂亮的图形和可视化效果的工具。 环境&#xff1a;①操作系统&#xff1a;CentOS7.6&…...

Spring Boot 工程分层实战(五个分层维度)

1、分层思想 计算机领域有一句话&#xff1a;计算机中任何问题都可通过增加一个虚拟层解决。这句体现了分层思想重要性&#xff0c;分层思想同样适用于Java工程架构。 分层优点是每层只专注本层工作&#xff0c;可以类比设计模式单一职责原则&#xff0c;或者经济学比较优势原…...

vscode IntelliSense Configurations

IntelliSense 是一个强大的代码补全和代码分析功能&#xff0c;它可以帮助开发者提高编程效率。图中显示的是 VSCode 的 IntelliSense 配置界面&#xff0c;具体配置如下&#xff1a; Compiler path&#xff08;编译器路径&#xff09;: 这里指定了用于构建项目的编译器的完整路…...

hbase读写操作后hdfs内存占用太大的问题

hbase读写操作后hdfs内存占用太大的问题 查看内存信息hbase读写操作 查看内存信息 查看本地磁盘的内存信息 df -h查看hdfs上根目录下各个文件的内存大小 hdfs dfs -du -h /查看hdfs上/hbase目录下各个文件的内存大小 hdfs dfs -du -h /hbase查看hdfs上/hbase/oldWALs目录下…...

C++----入门篇

引言 C是在C的基础之上&#xff0c;容纳进去了面向对象编程思想&#xff0c;并增加了许多有用的库&#xff0c;以及编程范式等。熟悉C语言之后&#xff0c;对C学习有一定的帮助&#xff0c;本章节主要目标&#xff1a; 1. 补充C语言语法的不足&#xff0c;以及C是如何对C语言…...

C语言程序设计P5-5【应用函数进行程序设计 | 第五节】—知识要点:变量的作用域和生存期

知识要点&#xff1a;变量的作用域和生存期 视频&#xff1a; 目录 一、任务分析 二、必备知识与理论 三、任务实施 一、任务分析 有一个一维数组&#xff0c;内放 10 个学生成绩&#xff0c;写一个函数&#xff0c;求出平均分、最高分和最低分。 任务要求用一个函数来完…...

用 Sass 模块化系统取代全局导入,消除 1.80.0 引入的 @import 弃用警告

目录 前言 问题 import 的缺陷 命名冲突 重复导入 模块系统 use 规则 forward 规则 实际修改 前言 最初&#xff0c;Sass 使用 import 规则通过单个全局命名空间加载其他文件&#xff0c;所有内置函数也可全局使用。由于模块系统&#xff08;use 和 forward 规则&…...

安卓低功耗蓝牙BLE官方开发例程(JAVA)翻译注释版

官方原文链接 https://developer.android.com/develop/connectivity/bluetooth/ble/ble-overview?hlzh-cn 目录 低功耗蓝牙 基础知识 关键术语和概念 角色和职责 查找 BLE 设备 连接到 GATT 服务器 设置绑定服务 设置 BluetoothAdapter 连接到设备 声明 GATT 回…...

搭建fastapi项目

环境准备 # 创建项目目录 mkdir my_fastapi_project cd my_fastapi_project# 创建和激活虚拟环境 python -m venv venv .\venv\Scripts\activate安装必要的包 pip install fastapi uvicorn python-dotenv创建项目基本结构 my_fastapi_project/ │ .env # …...

Maven学习(Maven项目模块化。模块间“继承“机制。父(工程),子项目(模块)间聚合)

目录 一、Maven项目模块化&#xff1f; &#xff08;1&#xff09;基本介绍。 &#xff08;2&#xff09;汽车模块化生产再聚合组装。 &#xff08;3&#xff09;Maven项目模块化图解。 1、maven_parent。 2、maven_pojo。 3、maven_dao。 4、maven_service。 5、maven_web。 6…...

华为云云原生中间件DCS DMS 通过中国信通院与全球IPv6测试中心双重能力检测

近日&#xff0c;中国信息通信研究院&#xff08;以下简称“中国信通院”&#xff09;与全球IPv6测试中心相继宣布&#xff0c;华为云的分布式缓存服务&#xff08;Distributed Cache Service&#xff0c;简称DCS&#xff09;和分布式消息服务&#xff08;Distributed Message …...

PostgreSQL中事件触发器Event Trigger

在PostgreSQL中&#xff0c;事件触发器&#xff08;Event Trigger&#xff09;是一种特殊的触发器类型&#xff0c;它允许你在特定的数据库系统事件发生时执行特定的操作。与普通的触发器不同&#xff0c;事件触发器并不与特定的表或视图相关联&#xff0c;而是与数据库级别的全…...

uni.request流式(Stream)请求,实现打印机效果

最近使用扣子 - 开发指南 (coze.cn)和智谱AI开放平台开发小程序AI导诊和用药对话指南。 开发的过程中也是走了不少坑,下面就来聊聊走了哪些坑。 坑1 :coze试了v2和v3的接口,两个接口请求还是有点差别的,v2拿到了botId和accessToken可以直接请求不需要做任何处理,v3还需要…...

canvas保存图片

需求&#xff1a;上面有几个按钮&#xff0c;其中有一个切换是图片 用v-if会导致图片加载慢 实现方法&#xff1a; 一进来就加载&#xff0c;通过监听元素显示&#xff0c;用于控制canvas的宽高&#xff0c;从而达到隐藏的效果 组件dowolad.vue <template><view …...

DNS到底有什么用?

举个例子&#xff0c;对于我们来说访问的域名是www.baidu.com&#xff0c;但是实际在计算机并不认识这个域名&#xff0c;计算机是需要通过IP地址去访问这个网站&#xff0c;所以呢&#xff1f;这个时候就需要一个dns解析器&#xff0c;来把这串域名转换为IP地址给计算机去访问…...

什么是CRM系统?CRM系统的功能、操作流程、生命周期

CRM系统作为企业管理和维护客户关系的重要工具&#xff0c;在商业活动中扮演着越来越重要的角色。今天&#xff0c;就让我们一起揭开它的神秘面纱&#xff0c;看看这个“幕后英雄”到底是怎么工作的。 什么是CRM系统&#xff1f; 首先&#xff0c;我们要了解什么是CRM。简单来…...

美畅物联丨JS播放器录像功能:从技术到应用的全面解析

畅联云平台的JS播放器是一款功能十分强大的视频汇聚平台播放工具&#xff0c;它已经具备众多实用功能&#xff0c;像实时播放、历史录像回放、云台控制、倍速播放、录像记录、音频播放、画面放大、全屏展示、截图捕捉等等。这些功能构建起了一个高效、灵活且用户友好的播放环境…...

我们来学mysql -- 事务并发之不可重复读(原理篇)

事务并发之不可重复读 题记不可重复读系列文章 题记 在《事务之概念》提到事务对应现实世界的状态转换&#xff0c;这个过程要满足4个特性这世界&#xff0c;真理只在大炮射程之类&#xff0c;通往和平的道路&#xff0c;非“常人”可以驾驭一个人生活按部就班&#xff0c;人多…...

ABAQUS进行焊接仿真分析(含子程序)

0 前言 焊接技术作为现代制造业中的重要连接工艺,广泛应用于汽车、船舶、航空航天、能源等多个行业。焊接接头的质量和性能直接影响到结构件的安全性、可靠性和使用寿命。因此,在焊接过程中如何有效预测和优化焊接过程中的热效应、应力变化以及材料变形等问题,成为了焊接研…...

BAPI_GOODSMVT_CREATE物料凭证增强字段

目的&#xff1a;增加字段LSMNG LSMEH的赋值 项目MSEG 的 BAPI 表增强结构 BAPI_TE_XMSEG 抬头MKPF 的 BAIP 表增强 BAPI_TE_XMKPF 1. 在结构BAPI_TE_XMSEG中appending structure附加结构 ZMSEG_001&#xff0c;增加字段LSMNG&#xff0c; LSMEH In The method IF_EX_MB_H…...

tomcat的优化和动静分离

tomcat的优化 1.tomcat的配置优化 2.操作系统的内核优化 注意&#xff1a;设置保存后&#xff0c;需要重新ssh连接才会看到配置更改的变化 vim /etc/security/limits.conf # 65535 为Linux系统最大打开文件数 * soft nproc 65535 * hard nproc 65535 * soft nofile 65535 *…...

[ShaderLab] 【Unity】【图像编程】理解 Unity Shader 的结构

在计算机图形学领域,开发者经常面临着管理着色器复杂性的挑战。正如大卫惠勒(David Wheeler)所说:“计算机科学中的任何问题都可以通过增加一层抽象来解决。” Unity 提供了这样一层抽象,即 ShaderLab,它通过组织和定义渲染过程的各个步骤,简化了编写着色器的过程。 什…...

vue的前端架构 介绍各自的优缺点

Vue.js 是一个用于构建用户界面的渐进式框架&#xff0c;可以根据项目的复杂性和需求选择不同的前端架构。以下是几种常见的 Vue 前端架构及其优缺点&#xff1a; 1. 单页应用 (SPA) 单页应用&#xff08;Single Page Application&#xff0c;简称 SPA&#xff09;是一种现代…...

可信AI与零知识证明的概念

可信AI 可信AI是指人工智能的设计、开发和部署遵循一系列原则和方法,以确保其行为和决策是可靠、可解释、公平、安全且符合人类价值观和社会利益的.以下是关于可信AI的举例说明、实现方式及主流方案: 举例说明 医疗诊断领域:一个可信AI的医疗诊断系统,不仅能够准确地识别…...