当前位置: 首页 > news >正文

Spring boot封装rocket mq 教程

1、rocket mq版本

      5.1.3

2、pom引入rocket mq依赖

        <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>5.0.4</version></dependency>

3、发送MQ消息工具类

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;@Slf4j
public class MqSendUtil {@SneakyThrowspublic static MessageId sendMq(String topic, String tag, String body, String... keys) {// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。String endpoint = "127.0.0.1:9080";// 消息发送的目标Topic名称,需要提前创建。ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);ClientConfiguration configuration = builder.build();// 初始化Producer时需要设置通信配置以及预绑定的Topic。try (Producer producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(configuration).build()) {// 普通消息发送。Message message = provider.newMessageBuilder().setTopic(topic)// 设置消息索引键,可根据关键字精确查找某条消息。.setKeys(keys)// 设置消息Tag,用于消费端根据指定Tag过滤消息。.setTag(tag)// 消息体。.setBody(body.getBytes()).build();// 发送消息,需要关注发送结果,并捕获失败等异常。SendReceipt sendReceipt = producer.send(message);log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());return sendReceipt.getMessageId();} catch (ClientException e) {log.error("Failed to send message", e);throw e;}}@SneakyThrowspublic static MessageId sendMqNoTag(String topic, String body, String... keys) {// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。String endpoint = "127.0.0.1:9080";// 消息发送的目标Topic名称,需要提前创建。ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);ClientConfiguration configuration = builder.build();// 初始化Producer时需要设置通信配置以及预绑定的Topic。try (Producer producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(configuration).build()) {// 普通消息发送。Message message = provider.newMessageBuilder().setTopic(topic)// 设置消息索引键,可根据关键字精确查找某条消息。.setKeys(keys)// 消息体。.setBody(body.getBytes()).build();// 发送消息,需要关注发送结果,并捕获失败等异常。SendReceipt sendReceipt = producer.send(message);return sendReceipt.getMessageId();} catch (ClientException e) {log.error("Failed to send message", e);throw e;}}}

4、发送MQ消息测试代码

import cn.hutool.core.util.IdUtil;
import org.recipe.draw.common.util.MqSendUtil;public class MqSendTest {public static void test1() {MqSendUtil.sendMq("demo", "tag", "哈哈哈哈tag", IdUtil.getSnowflakeNextIdStr());}public static void main(String[] args) {test1();}
}

5、MessageContext 消息内容的封装

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.util.Collection;
import java.util.Map;@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class MessageContext {private String messageId;private String topic;private String body;private Map<String, String> properties;private Collection<String> keys;private Long deliveryTimestamp;private String bornHost;private Long bornTimestamp;private int deliveryAttempt;}

6、AbstractMqConsumer 发送mq消息的抽象类

import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.recipe.draw.common.mqcomsumer.model.MessageContext;
import org.springframework.boot.CommandLineRunner;import java.nio.charset.StandardCharsets;
import java.util.Collections;@Slf4j
public abstract class AbstractMqConsumer implements CommandLineRunner {public abstract String topic();public abstract String consumerGroup();public abstract String tag();public abstract void process(MessageContext messageContext);@Overridepublic void run(String... args) throws Exception {final ClientServiceProvider provider = ClientServiceProvider.loadService();// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。String endpoints = "127.0.0.1:9080";ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).build();// 订阅消息的过滤规则,表示订阅所有Tag的消息。String tag = StrUtil.isEmpty(tag()) ? "*" : tag();FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);// 为消费者指定所属的消费者分组,Group需要提前创建。String consumerGroup = consumerGroup();// 指定需要订阅哪个目标Topic,Topic需要提前创建。String topic = topic();// 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。PushConsumer pushConsumer = provider.newPushConsumerBuilder().setClientConfiguration(clientConfiguration)// 设置消费者分组。.setConsumerGroup(consumerGroup)// 设置预绑定的订阅关系。.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))// 设置消费监听器。.setMessageListener(messageView -> {// 处理消息并返回消费结果。MessageContext context = toMessageContext(messageView);
//                    log.info("收到mq消息主体内容:{}",context);try {process(context);} catch (Exception e) {log.error("处理mq消息出现异常,消息已自动丢弃,不会再投入队列:", e);}return ConsumeResult.SUCCESS;}).build();log.info("消费者初始化完成,topic:{},tag:{},consumerGroup:{}", topic, tag, consumerGroup);}private MessageContext toMessageContext(MessageView messageView) {Long deliveryTimestamp = messageView.getDeliveryTimestamp().isPresent() ? messageView.getDeliveryTimestamp().get() : null;return MessageContext.builder().messageId(messageView.getMessageId().toString()).topic(messageView.getTopic()).body(StandardCharsets.UTF_8.decode(messageView.getBody()).toString()).properties(messageView.getProperties()).keys(messageView.getKeys()).deliveryTimestamp(deliveryTimestamp).bornHost(messageView.getBornHost()).deliveryAttempt(messageView.getDeliveryAttempt()).build();}}

7、具体的消费类

topic指定消费者订阅的话题,comsumerGroup指明该消费者属于哪一个消费者分组,tag表明是否要获取指定标签的消息,process代表具体的业务处理逻辑,具体消息的内容可以MessageContext 类里面获取

import lombok.extern.slf4j.Slf4j;
import org.recipe.draw.common.mqcomsumer.abstracts.AbstractMqConsumer;
import org.recipe.draw.common.mqcomsumer.model.MessageContext;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class DemoConsumer extends AbstractMqConsumer {@Overridepublic String topic() {return "demo";}@Overridepublic String consumerGroup() {return "demo";}@Overridepublic String tag() {return null;}@Overridepublic void process(MessageContext messageContext) {log.info("收到消息:{}", messageContext);}
}

相关文章:

Spring boot封装rocket mq 教程

1、rocket mq版本 5.1.3 2、pom引入rocket mq依赖 <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>5.0.4</version></dependency> 3、发送MQ消息工具类 impor…...

Java Swing手搓童年坦克大战游戏(I)

前言 业余偶尔对游戏有些兴趣&#xff0c;不过这样的时代&#xff0c;硬件软件飞速进步&#xff0c;2D游戏画面都无比精美&#xff0c;之前的8bit像素游戏时代早就过去了&#xff0c;不过那时候有许多让人印象深刻的游戏比如魂斗罗、超级玛丽、坦克大战(Battle City)等等。 学…...

【DevOps-04]】Operate阶段工具

一、简要说明 安装Docker安装Docker-compose二、安装Docker 官网地址:https://www.docker.com文档地址:Docker Docs仓库地址:https://hub.docker.com1、Docker相关网站 官方网站Get Docker | Docker Docs...

力扣2807.在链表中插入最大公约数

思路&#xff1a;遍历链表&#xff0c;对于每一个结点求出它与下一个结点的最大公约数并插入到俩个结点之间 代码&#xff1a; /*** Definition for singly-linked list.* struct ListNode {* int val;* ListNode *next;* ListNode() : val(0), next(nullptr) {}…...

开始刷Leetcode之前你需要知道的 - The basic is all you need

数据结构&#xff1a;列表&#xff0c;哈希表&#xff0c;集合&#xff0c;栈&#xff0c;堆&#xff0c;链表&#xff0c;二叉树&#xff0c;图 入门算法&#xff1a;递归&#xff0c;排序算法&#xff0c;二分法&#xff0c;bfs&#xff0c;dfs list/array 列表常见操作&am…...

【PostgreSQL】模式Schema

PostgreSQL 数据库集群包含一个或多个命名数据库。角色和一些其他对象类型在整个集群中共享。与服务器的客户端连接只能访问单个数据库中的数据&#xff0c;该数据库在连接请求中指定。 数据库包含一个或多个命名schema&#xff0c;而这些schema又包含表。schema还包含其他类型…...

JavaScript实现的复杂功能:自动生成带水印的图片

#程序员的崩溃瞬间 在本文中&#xff0c;我们将讨论一个JavaScript实现的复杂功能&#xff0c;该功能可以自动为图片添加水印。这个功能在许多场景中都非常有用&#xff0c;例如&#xff0c;如果你想保护你的图片版权&#xff0c;或者你想在你的网站上显示自定义的水印。 一、…...

图神经网络|8.2 图卷积的计算基本方法

不同于一般的神经网络&#xff0c;网络层数的并不用特别多。 原因是只需要少数次数迭代后&#xff08;当迭代次数为图上的直径&#xff1f;任意两点最短距离的最大值&#xff1f;&#xff09;&#xff0c;某节点便可获取得到图上所有的节点。 通俗的理解是&#xff0c;在社会中…...

equals()与hashCode()方法详解

java.lang.Object类中有两个非常重要的方法&#xff1a; 1 2 public boolean equals(Object obj) public int hashCode() Object类是类继承结构的基础&#xff0c;所以是每一个类的父类。所有的对象&#xff0c;包括数组&#xff0c;都实现了在Object类中定义的方法。 回到…...

六、基于Flask、Flasgger、marshmallow的开发调试

基于Flask、Flasgger、marshmallow的开发调试 问题描述调试方法一调试方法二调试方法三 问题描述 现在有一个传入传出为json格式文件的&#xff0c;Flask-restful开发的程序&#xff0c;需要解决如何调试的问题。 #!/usr/bin/python3 # -*- coding: utf-8 -*- # Project :…...

TypeScript 从入门到进阶之基础篇(三) 元组类型篇

系列文章目录 TypeScript 从入门到进阶系列 TypeScript 从入门到进阶之基础篇(一) ts基础类型篇TypeScript 从入门到进阶之基础篇(二) ts进阶类型篇TypeScript 从入门到进阶之基础篇(三) 元组类型篇TypeScript 从入门到进阶之基础篇(四) symbol类型篇 持续更新中… 文章目录 …...

现代CPU的多种运行模式

目前的CPU大多是支持X86-64技术的兼容CPU&#xff0c;这包括AMD64以及Intel的IA32E&#xff08;后被正式命名为EM64T&#xff0c;Extended Memory 64 Technology&#xff09;&#xff0c;因为AMD64先出&#xff0c;而EM64T与AMD64完全兼容&#xff0c;所以也统一称为AMD64技术。…...

Python PDF处理模块pypdf库详解

概要 PDF&#xff08;Portable Document Format&#xff09;是一种常见的文档格式&#xff0c;广泛用于存储和共享文本和图像数据。在 Python 中&#xff0c;有许多库可以用于处理 PDF 文件&#xff0c;其中之一就是 PyPDF。PyPDF 是一个功能强大的库&#xff0c;它允许你读取…...

C++上位软件通过LibModbus开源库和西门子S7-1200/S7-1500/S7-200 PLC进行ModbusTcp 和ModbusRTU 通信

前言 一直以来上位软件比如C等和西门子等其他品牌PLC之间的数据交换都是大家比较头疼的问题&#xff0c;尤其是C上位软件程序员。传统的方法一般有OPC、Socket 等&#xff0c;直到LibModbus 开源库出现后这种途径对程序袁来说又有了新的选择。 Modbus简介 Modbus特点 1 &#…...

PLSQL Developer 15安装和oracle客户端安装

文章目录 前言一、PLSQL Developer1.下载2.安装 二、oracle客户端1.下载2.环境变量 三、使用1. oci2. 连接3. 配置文件 总结 前言 oracle是经常使用的数据库&#xff0c;PLSQL Developer是众多产品中比较不错的一款工具&#xff0c;接下来我们来介绍PLSQL Developer的安装和使…...

【深度deepin】深度安装,jdk,tomcat,Nginx安装

目录 一 深度 1.1 介绍 1.2 与别的操作系统的优点 二 下载镜像文件及VM安装deepin 三 jdk&#xff0c;tomcat&#xff0c;Nginx安装 3.1 JDK安装 3.2 安装tomcat 3.3 安装nginx 一 深度 1.1 介绍 由深度科技社区开发的开源操作系统&#xff0c;基于Linux内核&#xf…...

解决flask启动报错:ImportError: DLL load failed while importing _dukpy: 找不到指定的程序

现象&#xff1a; 原因&#xff1a;dukpy没有win32执行库 解决办法&#xff1a; 到lfd.uci.edu 第三方库下载dukpy的win32 whl文件 注意&#xff1a; 需要跟你python版本和windows平台&#xff08;32位/64位&#xff09;对应 https://www.lfd.uci.edu/~gohlke/pythonlibs/#…...

腾讯面试总结

腾讯 一面 mysql索引结构&#xff1f;redis持久化策略&#xff1f;zookeeper节点类型说一下&#xff1b;zookeeper选举机制&#xff1f;zookeeper主节点故障&#xff0c;如何重新选举&#xff1f;syn机制&#xff1f;线程池的核心参数&#xff1b;threadlocal的实现&#xff…...

面向对象进阶(static关键字,继承,方法重写,super,this)

文章目录 面向对象进阶部分学习方法&#xff1a;今日内容教学目标 第一章 复习回顾1.1 如何定义类1.2 如何通过类创建对象1.3 封装1.3.1 封装的步骤1.3.2 封装的步骤实现 1.4 构造方法1.4.1 构造方法的作用1.4.2 构造方法的格式1.4.3 构造方法的应用 1.5 this关键字的作用1.5.1…...

Blazor项目如何调用js文件

以下是来自千问的回答并加以整理&#xff1a;&#xff08;说一句&#xff0c;文心3.5所给的回答不完善&#xff0c;根本运行不起来&#xff0c;4.0等有钱了试试&#xff09; 在Blazor项目中引用JavaScript文件&#xff08;.js&#xff09;以实现与JavaScript的互操作&#xff…...

浅谈 React Hooks

React Hooks 是 React 16.8 引入的一组 API&#xff0c;用于在函数组件中使用 state 和其他 React 特性&#xff08;例如生命周期方法、context 等&#xff09;。Hooks 通过简洁的函数接口&#xff0c;解决了状态与 UI 的高度解耦&#xff0c;通过函数式编程范式实现更灵活 Rea…...

【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型

摘要 拍照搜题系统采用“三层管道&#xff08;多模态 OCR → 语义检索 → 答案渲染&#xff09;、两级检索&#xff08;倒排 BM25 向量 HNSW&#xff09;并以大语言模型兜底”的整体框架&#xff1a; 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后&#xff0c;分别用…...

java_网络服务相关_gateway_nacos_feign区别联系

1. spring-cloud-starter-gateway 作用&#xff1a;作为微服务架构的网关&#xff0c;统一入口&#xff0c;处理所有外部请求。 核心能力&#xff1a; 路由转发&#xff08;基于路径、服务名等&#xff09;过滤器&#xff08;鉴权、限流、日志、Header 处理&#xff09;支持负…...

React Native 导航系统实战(React Navigation)

导航系统实战&#xff08;React Navigation&#xff09; React Navigation 是 React Native 应用中最常用的导航库之一&#xff0c;它提供了多种导航模式&#xff0c;如堆栈导航&#xff08;Stack Navigator&#xff09;、标签导航&#xff08;Tab Navigator&#xff09;和抽屉…...

Vue3 + Element Plus + TypeScript中el-transfer穿梭框组件使用详解及示例

使用详解 Element Plus 的 el-transfer 组件是一个强大的穿梭框组件&#xff0c;常用于在两个集合之间进行数据转移&#xff0c;如权限分配、数据选择等场景。下面我将详细介绍其用法并提供一个完整示例。 核心特性与用法 基本属性 v-model&#xff1a;绑定右侧列表的值&…...

Linux简单的操作

ls ls 查看当前目录 ll 查看详细内容 ls -a 查看所有的内容 ls --help 查看方法文档 pwd pwd 查看当前路径 cd cd 转路径 cd .. 转上一级路径 cd 名 转换路径 …...

Spring Boot+Neo4j知识图谱实战:3步搭建智能关系网络!

一、引言 在数据驱动的背景下&#xff0c;知识图谱凭借其高效的信息组织能力&#xff0c;正逐步成为各行业应用的关键技术。本文聚焦 Spring Boot与Neo4j图数据库的技术结合&#xff0c;探讨知识图谱开发的实现细节&#xff0c;帮助读者掌握该技术栈在实际项目中的落地方法。 …...

OPenCV CUDA模块图像处理-----对图像执行 均值漂移滤波(Mean Shift Filtering)函数meanShiftFiltering()

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 在 GPU 上对图像执行 均值漂移滤波&#xff08;Mean Shift Filtering&#xff09;&#xff0c;用于图像分割或平滑处理。 该函数将输入图像中的…...

20个超级好用的 CSS 动画库

分享 20 个最佳 CSS 动画库。 它们中的大多数将生成纯 CSS 代码&#xff0c;而不需要任何外部库。 1.Animate.css 一个开箱即用型的跨浏览器动画库&#xff0c;可供你在项目中使用。 2.Magic Animations CSS3 一组简单的动画&#xff0c;可以包含在你的网页或应用项目中。 3.An…...

深度学习水论文:mamba+图像增强

&#x1f9c0;当前视觉领域对高效长序列建模需求激增&#xff0c;对Mamba图像增强这方向的研究自然也逐渐火热。原因在于其高效长程建模&#xff0c;以及动态计算优势&#xff0c;在图像质量提升和细节恢复方面有难以替代的作用。 &#x1f9c0;因此短时间内&#xff0c;就有不…...