当前位置: 首页 > news >正文

Spring boot封装rocket mq 教程

1、rocket mq版本

      5.1.3

2、pom引入rocket mq依赖

        <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>5.0.4</version></dependency>

3、发送MQ消息工具类

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;@Slf4j
public class MqSendUtil {@SneakyThrowspublic static MessageId sendMq(String topic, String tag, String body, String... keys) {// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。String endpoint = "127.0.0.1:9080";// 消息发送的目标Topic名称,需要提前创建。ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);ClientConfiguration configuration = builder.build();// 初始化Producer时需要设置通信配置以及预绑定的Topic。try (Producer producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(configuration).build()) {// 普通消息发送。Message message = provider.newMessageBuilder().setTopic(topic)// 设置消息索引键,可根据关键字精确查找某条消息。.setKeys(keys)// 设置消息Tag,用于消费端根据指定Tag过滤消息。.setTag(tag)// 消息体。.setBody(body.getBytes()).build();// 发送消息,需要关注发送结果,并捕获失败等异常。SendReceipt sendReceipt = producer.send(message);log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());return sendReceipt.getMessageId();} catch (ClientException e) {log.error("Failed to send message", e);throw e;}}@SneakyThrowspublic static MessageId sendMqNoTag(String topic, String body, String... keys) {// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。String endpoint = "127.0.0.1:9080";// 消息发送的目标Topic名称,需要提前创建。ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);ClientConfiguration configuration = builder.build();// 初始化Producer时需要设置通信配置以及预绑定的Topic。try (Producer producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(configuration).build()) {// 普通消息发送。Message message = provider.newMessageBuilder().setTopic(topic)// 设置消息索引键,可根据关键字精确查找某条消息。.setKeys(keys)// 消息体。.setBody(body.getBytes()).build();// 发送消息,需要关注发送结果,并捕获失败等异常。SendReceipt sendReceipt = producer.send(message);return sendReceipt.getMessageId();} catch (ClientException e) {log.error("Failed to send message", e);throw e;}}}

4、发送MQ消息测试代码

import cn.hutool.core.util.IdUtil;
import org.recipe.draw.common.util.MqSendUtil;public class MqSendTest {public static void test1() {MqSendUtil.sendMq("demo", "tag", "哈哈哈哈tag", IdUtil.getSnowflakeNextIdStr());}public static void main(String[] args) {test1();}
}

5、MessageContext 消息内容的封装

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.util.Collection;
import java.util.Map;@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class MessageContext {private String messageId;private String topic;private String body;private Map<String, String> properties;private Collection<String> keys;private Long deliveryTimestamp;private String bornHost;private Long bornTimestamp;private int deliveryAttempt;}

6、AbstractMqConsumer 发送mq消息的抽象类

import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.recipe.draw.common.mqcomsumer.model.MessageContext;
import org.springframework.boot.CommandLineRunner;import java.nio.charset.StandardCharsets;
import java.util.Collections;@Slf4j
public abstract class AbstractMqConsumer implements CommandLineRunner {public abstract String topic();public abstract String consumerGroup();public abstract String tag();public abstract void process(MessageContext messageContext);@Overridepublic void run(String... args) throws Exception {final ClientServiceProvider provider = ClientServiceProvider.loadService();// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。String endpoints = "127.0.0.1:9080";ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).build();// 订阅消息的过滤规则,表示订阅所有Tag的消息。String tag = StrUtil.isEmpty(tag()) ? "*" : tag();FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);// 为消费者指定所属的消费者分组,Group需要提前创建。String consumerGroup = consumerGroup();// 指定需要订阅哪个目标Topic,Topic需要提前创建。String topic = topic();// 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。PushConsumer pushConsumer = provider.newPushConsumerBuilder().setClientConfiguration(clientConfiguration)// 设置消费者分组。.setConsumerGroup(consumerGroup)// 设置预绑定的订阅关系。.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))// 设置消费监听器。.setMessageListener(messageView -> {// 处理消息并返回消费结果。MessageContext context = toMessageContext(messageView);
//                    log.info("收到mq消息主体内容:{}",context);try {process(context);} catch (Exception e) {log.error("处理mq消息出现异常,消息已自动丢弃,不会再投入队列:", e);}return ConsumeResult.SUCCESS;}).build();log.info("消费者初始化完成,topic:{},tag:{},consumerGroup:{}", topic, tag, consumerGroup);}private MessageContext toMessageContext(MessageView messageView) {Long deliveryTimestamp = messageView.getDeliveryTimestamp().isPresent() ? messageView.getDeliveryTimestamp().get() : null;return MessageContext.builder().messageId(messageView.getMessageId().toString()).topic(messageView.getTopic()).body(StandardCharsets.UTF_8.decode(messageView.getBody()).toString()).properties(messageView.getProperties()).keys(messageView.getKeys()).deliveryTimestamp(deliveryTimestamp).bornHost(messageView.getBornHost()).deliveryAttempt(messageView.getDeliveryAttempt()).build();}}

7、具体的消费类

topic指定消费者订阅的话题,comsumerGroup指明该消费者属于哪一个消费者分组,tag表明是否要获取指定标签的消息,process代表具体的业务处理逻辑,具体消息的内容可以MessageContext 类里面获取

import lombok.extern.slf4j.Slf4j;
import org.recipe.draw.common.mqcomsumer.abstracts.AbstractMqConsumer;
import org.recipe.draw.common.mqcomsumer.model.MessageContext;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class DemoConsumer extends AbstractMqConsumer {@Overridepublic String topic() {return "demo";}@Overridepublic String consumerGroup() {return "demo";}@Overridepublic String tag() {return null;}@Overridepublic void process(MessageContext messageContext) {log.info("收到消息:{}", messageContext);}
}

相关文章:

Spring boot封装rocket mq 教程

1、rocket mq版本 5.1.3 2、pom引入rocket mq依赖 <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>5.0.4</version></dependency> 3、发送MQ消息工具类 impor…...

Java Swing手搓童年坦克大战游戏(I)

前言 业余偶尔对游戏有些兴趣&#xff0c;不过这样的时代&#xff0c;硬件软件飞速进步&#xff0c;2D游戏画面都无比精美&#xff0c;之前的8bit像素游戏时代早就过去了&#xff0c;不过那时候有许多让人印象深刻的游戏比如魂斗罗、超级玛丽、坦克大战(Battle City)等等。 学…...

【DevOps-04]】Operate阶段工具

一、简要说明 安装Docker安装Docker-compose二、安装Docker 官网地址:https://www.docker.com文档地址:Docker Docs仓库地址:https://hub.docker.com1、Docker相关网站 官方网站Get Docker | Docker Docs...

力扣2807.在链表中插入最大公约数

思路&#xff1a;遍历链表&#xff0c;对于每一个结点求出它与下一个结点的最大公约数并插入到俩个结点之间 代码&#xff1a; /*** Definition for singly-linked list.* struct ListNode {* int val;* ListNode *next;* ListNode() : val(0), next(nullptr) {}…...

开始刷Leetcode之前你需要知道的 - The basic is all you need

数据结构&#xff1a;列表&#xff0c;哈希表&#xff0c;集合&#xff0c;栈&#xff0c;堆&#xff0c;链表&#xff0c;二叉树&#xff0c;图 入门算法&#xff1a;递归&#xff0c;排序算法&#xff0c;二分法&#xff0c;bfs&#xff0c;dfs list/array 列表常见操作&am…...

【PostgreSQL】模式Schema

PostgreSQL 数据库集群包含一个或多个命名数据库。角色和一些其他对象类型在整个集群中共享。与服务器的客户端连接只能访问单个数据库中的数据&#xff0c;该数据库在连接请求中指定。 数据库包含一个或多个命名schema&#xff0c;而这些schema又包含表。schema还包含其他类型…...

JavaScript实现的复杂功能:自动生成带水印的图片

#程序员的崩溃瞬间 在本文中&#xff0c;我们将讨论一个JavaScript实现的复杂功能&#xff0c;该功能可以自动为图片添加水印。这个功能在许多场景中都非常有用&#xff0c;例如&#xff0c;如果你想保护你的图片版权&#xff0c;或者你想在你的网站上显示自定义的水印。 一、…...

图神经网络|8.2 图卷积的计算基本方法

不同于一般的神经网络&#xff0c;网络层数的并不用特别多。 原因是只需要少数次数迭代后&#xff08;当迭代次数为图上的直径&#xff1f;任意两点最短距离的最大值&#xff1f;&#xff09;&#xff0c;某节点便可获取得到图上所有的节点。 通俗的理解是&#xff0c;在社会中…...

equals()与hashCode()方法详解

java.lang.Object类中有两个非常重要的方法&#xff1a; 1 2 public boolean equals(Object obj) public int hashCode() Object类是类继承结构的基础&#xff0c;所以是每一个类的父类。所有的对象&#xff0c;包括数组&#xff0c;都实现了在Object类中定义的方法。 回到…...

六、基于Flask、Flasgger、marshmallow的开发调试

基于Flask、Flasgger、marshmallow的开发调试 问题描述调试方法一调试方法二调试方法三 问题描述 现在有一个传入传出为json格式文件的&#xff0c;Flask-restful开发的程序&#xff0c;需要解决如何调试的问题。 #!/usr/bin/python3 # -*- coding: utf-8 -*- # Project :…...

TypeScript 从入门到进阶之基础篇(三) 元组类型篇

系列文章目录 TypeScript 从入门到进阶系列 TypeScript 从入门到进阶之基础篇(一) ts基础类型篇TypeScript 从入门到进阶之基础篇(二) ts进阶类型篇TypeScript 从入门到进阶之基础篇(三) 元组类型篇TypeScript 从入门到进阶之基础篇(四) symbol类型篇 持续更新中… 文章目录 …...

现代CPU的多种运行模式

目前的CPU大多是支持X86-64技术的兼容CPU&#xff0c;这包括AMD64以及Intel的IA32E&#xff08;后被正式命名为EM64T&#xff0c;Extended Memory 64 Technology&#xff09;&#xff0c;因为AMD64先出&#xff0c;而EM64T与AMD64完全兼容&#xff0c;所以也统一称为AMD64技术。…...

Python PDF处理模块pypdf库详解

概要 PDF&#xff08;Portable Document Format&#xff09;是一种常见的文档格式&#xff0c;广泛用于存储和共享文本和图像数据。在 Python 中&#xff0c;有许多库可以用于处理 PDF 文件&#xff0c;其中之一就是 PyPDF。PyPDF 是一个功能强大的库&#xff0c;它允许你读取…...

C++上位软件通过LibModbus开源库和西门子S7-1200/S7-1500/S7-200 PLC进行ModbusTcp 和ModbusRTU 通信

前言 一直以来上位软件比如C等和西门子等其他品牌PLC之间的数据交换都是大家比较头疼的问题&#xff0c;尤其是C上位软件程序员。传统的方法一般有OPC、Socket 等&#xff0c;直到LibModbus 开源库出现后这种途径对程序袁来说又有了新的选择。 Modbus简介 Modbus特点 1 &#…...

PLSQL Developer 15安装和oracle客户端安装

文章目录 前言一、PLSQL Developer1.下载2.安装 二、oracle客户端1.下载2.环境变量 三、使用1. oci2. 连接3. 配置文件 总结 前言 oracle是经常使用的数据库&#xff0c;PLSQL Developer是众多产品中比较不错的一款工具&#xff0c;接下来我们来介绍PLSQL Developer的安装和使…...

【深度deepin】深度安装,jdk,tomcat,Nginx安装

目录 一 深度 1.1 介绍 1.2 与别的操作系统的优点 二 下载镜像文件及VM安装deepin 三 jdk&#xff0c;tomcat&#xff0c;Nginx安装 3.1 JDK安装 3.2 安装tomcat 3.3 安装nginx 一 深度 1.1 介绍 由深度科技社区开发的开源操作系统&#xff0c;基于Linux内核&#xf…...

解决flask启动报错:ImportError: DLL load failed while importing _dukpy: 找不到指定的程序

现象&#xff1a; 原因&#xff1a;dukpy没有win32执行库 解决办法&#xff1a; 到lfd.uci.edu 第三方库下载dukpy的win32 whl文件 注意&#xff1a; 需要跟你python版本和windows平台&#xff08;32位/64位&#xff09;对应 https://www.lfd.uci.edu/~gohlke/pythonlibs/#…...

腾讯面试总结

腾讯 一面 mysql索引结构&#xff1f;redis持久化策略&#xff1f;zookeeper节点类型说一下&#xff1b;zookeeper选举机制&#xff1f;zookeeper主节点故障&#xff0c;如何重新选举&#xff1f;syn机制&#xff1f;线程池的核心参数&#xff1b;threadlocal的实现&#xff…...

面向对象进阶(static关键字,继承,方法重写,super,this)

文章目录 面向对象进阶部分学习方法&#xff1a;今日内容教学目标 第一章 复习回顾1.1 如何定义类1.2 如何通过类创建对象1.3 封装1.3.1 封装的步骤1.3.2 封装的步骤实现 1.4 构造方法1.4.1 构造方法的作用1.4.2 构造方法的格式1.4.3 构造方法的应用 1.5 this关键字的作用1.5.1…...

Blazor项目如何调用js文件

以下是来自千问的回答并加以整理&#xff1a;&#xff08;说一句&#xff0c;文心3.5所给的回答不完善&#xff0c;根本运行不起来&#xff0c;4.0等有钱了试试&#xff09; 在Blazor项目中引用JavaScript文件&#xff08;.js&#xff09;以实现与JavaScript的互操作&#xff…...

测试微信模版消息推送

进入“开发接口管理”--“公众平台测试账号”&#xff0c;无需申请公众账号、可在测试账号中体验并测试微信公众平台所有高级接口。 获取access_token: 自定义模版消息&#xff1a; 关注测试号&#xff1a;扫二维码关注测试号。 发送模版消息&#xff1a; import requests da…...

Flask RESTful 示例

目录 1. 环境准备2. 安装依赖3. 修改main.py4. 运行应用5. API使用示例获取所有任务获取单个任务创建新任务更新任务删除任务 中文乱码问题&#xff1a; 下面创建一个简单的Flask RESTful API示例。首先&#xff0c;我们需要创建环境&#xff0c;安装必要的依赖&#xff0c;然后…...

进程地址空间(比特课总结)

一、进程地址空间 1. 环境变量 1 &#xff09;⽤户级环境变量与系统级环境变量 全局属性&#xff1a;环境变量具有全局属性&#xff0c;会被⼦进程继承。例如当bash启动⼦进程时&#xff0c;环 境变量会⾃动传递给⼦进程。 本地变量限制&#xff1a;本地变量只在当前进程(ba…...

MySQL 隔离级别:脏读、幻读及不可重复读的原理与示例

一、MySQL 隔离级别 MySQL 提供了四种隔离级别,用于控制事务之间的并发访问以及数据的可见性,不同隔离级别对脏读、幻读、不可重复读这几种并发数据问题有着不同的处理方式,具体如下: 隔离级别脏读不可重复读幻读性能特点及锁机制读未提交(READ UNCOMMITTED)允许出现允许…...

STM32F4基本定时器使用和原理详解

STM32F4基本定时器使用和原理详解 前言如何确定定时器挂载在哪条时钟线上配置及使用方法参数配置PrescalerCounter ModeCounter Periodauto-reload preloadTrigger Event Selection 中断配置生成的代码及使用方法初始化代码基本定时器触发DCA或者ADC的代码讲解中断代码定时启动…...

376. Wiggle Subsequence

376. Wiggle Subsequence 代码 class Solution { public:int wiggleMaxLength(vector<int>& nums) {int n nums.size();int res 1;int prediff 0;int curdiff 0;for(int i 0;i < n-1;i){curdiff nums[i1] - nums[i];if( (prediff > 0 && curdif…...

vue3 字体颜色设置的多种方式

在Vue 3中设置字体颜色可以通过多种方式实现&#xff0c;这取决于你是想在组件内部直接设置&#xff0c;还是在CSS/SCSS/LESS等样式文件中定义。以下是几种常见的方法&#xff1a; 1. 内联样式 你可以直接在模板中使用style绑定来设置字体颜色。 <template><div :s…...

selenium学习实战【Python爬虫】

selenium学习实战【Python爬虫】 文章目录 selenium学习实战【Python爬虫】一、声明二、学习目标三、安装依赖3.1 安装selenium库3.2 安装浏览器驱动3.2.1 查看Edge版本3.2.2 驱动安装 四、代码讲解4.1 配置浏览器4.2 加载更多4.3 寻找内容4.4 完整代码 五、报告文件爬取5.1 提…...

什么是Ansible Jinja2

理解 Ansible Jinja2 模板 Ansible 是一款功能强大的开源自动化工具&#xff0c;可让您无缝地管理和配置系统。Ansible 的一大亮点是它使用 Jinja2 模板&#xff0c;允许您根据变量数据动态生成文件、配置设置和脚本。本文将向您介绍 Ansible 中的 Jinja2 模板&#xff0c;并通…...

Device Mapper 机制

Device Mapper 机制详解 Device Mapper&#xff08;简称 DM&#xff09;是 Linux 内核中的一套通用块设备映射框架&#xff0c;为 LVM、加密磁盘、RAID 等提供底层支持。本文将详细介绍 Device Mapper 的原理、实现、内核配置、常用工具、操作测试流程&#xff0c;并配以详细的…...