Spring boot封装rocket mq 教程
1、rocket mq版本
5.1.3
2、pom引入rocket mq依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>5.0.4</version></dependency>
3、发送MQ消息工具类
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;@Slf4j
public class MqSendUtil {@SneakyThrowspublic static MessageId sendMq(String topic, String tag, String body, String... keys) {// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。String endpoint = "127.0.0.1:9080";// 消息发送的目标Topic名称,需要提前创建。ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);ClientConfiguration configuration = builder.build();// 初始化Producer时需要设置通信配置以及预绑定的Topic。try (Producer producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(configuration).build()) {// 普通消息发送。Message message = provider.newMessageBuilder().setTopic(topic)// 设置消息索引键,可根据关键字精确查找某条消息。.setKeys(keys)// 设置消息Tag,用于消费端根据指定Tag过滤消息。.setTag(tag)// 消息体。.setBody(body.getBytes()).build();// 发送消息,需要关注发送结果,并捕获失败等异常。SendReceipt sendReceipt = producer.send(message);log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());return sendReceipt.getMessageId();} catch (ClientException e) {log.error("Failed to send message", e);throw e;}}@SneakyThrowspublic static MessageId sendMqNoTag(String topic, String body, String... keys) {// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。String endpoint = "127.0.0.1:9080";// 消息发送的目标Topic名称,需要提前创建。ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);ClientConfiguration configuration = builder.build();// 初始化Producer时需要设置通信配置以及预绑定的Topic。try (Producer producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(configuration).build()) {// 普通消息发送。Message message = provider.newMessageBuilder().setTopic(topic)// 设置消息索引键,可根据关键字精确查找某条消息。.setKeys(keys)// 消息体。.setBody(body.getBytes()).build();// 发送消息,需要关注发送结果,并捕获失败等异常。SendReceipt sendReceipt = producer.send(message);return sendReceipt.getMessageId();} catch (ClientException e) {log.error("Failed to send message", e);throw e;}}}
4、发送MQ消息测试代码
import cn.hutool.core.util.IdUtil;
import org.recipe.draw.common.util.MqSendUtil;public class MqSendTest {public static void test1() {MqSendUtil.sendMq("demo", "tag", "哈哈哈哈tag", IdUtil.getSnowflakeNextIdStr());}public static void main(String[] args) {test1();}
}
5、MessageContext 消息内容的封装
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.util.Collection;
import java.util.Map;@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class MessageContext {private String messageId;private String topic;private String body;private Map<String, String> properties;private Collection<String> keys;private Long deliveryTimestamp;private String bornHost;private Long bornTimestamp;private int deliveryAttempt;}
6、AbstractMqConsumer 发送mq消息的抽象类
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.recipe.draw.common.mqcomsumer.model.MessageContext;
import org.springframework.boot.CommandLineRunner;import java.nio.charset.StandardCharsets;
import java.util.Collections;@Slf4j
public abstract class AbstractMqConsumer implements CommandLineRunner {public abstract String topic();public abstract String consumerGroup();public abstract String tag();public abstract void process(MessageContext messageContext);@Overridepublic void run(String... args) throws Exception {final ClientServiceProvider provider = ClientServiceProvider.loadService();// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。String endpoints = "127.0.0.1:9080";ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).build();// 订阅消息的过滤规则,表示订阅所有Tag的消息。String tag = StrUtil.isEmpty(tag()) ? "*" : tag();FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);// 为消费者指定所属的消费者分组,Group需要提前创建。String consumerGroup = consumerGroup();// 指定需要订阅哪个目标Topic,Topic需要提前创建。String topic = topic();// 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。PushConsumer pushConsumer = provider.newPushConsumerBuilder().setClientConfiguration(clientConfiguration)// 设置消费者分组。.setConsumerGroup(consumerGroup)// 设置预绑定的订阅关系。.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))// 设置消费监听器。.setMessageListener(messageView -> {// 处理消息并返回消费结果。MessageContext context = toMessageContext(messageView);
// log.info("收到mq消息主体内容:{}",context);try {process(context);} catch (Exception e) {log.error("处理mq消息出现异常,消息已自动丢弃,不会再投入队列:", e);}return ConsumeResult.SUCCESS;}).build();log.info("消费者初始化完成,topic:{},tag:{},consumerGroup:{}", topic, tag, consumerGroup);}private MessageContext toMessageContext(MessageView messageView) {Long deliveryTimestamp = messageView.getDeliveryTimestamp().isPresent() ? messageView.getDeliveryTimestamp().get() : null;return MessageContext.builder().messageId(messageView.getMessageId().toString()).topic(messageView.getTopic()).body(StandardCharsets.UTF_8.decode(messageView.getBody()).toString()).properties(messageView.getProperties()).keys(messageView.getKeys()).deliveryTimestamp(deliveryTimestamp).bornHost(messageView.getBornHost()).deliveryAttempt(messageView.getDeliveryAttempt()).build();}}
7、具体的消费类
topic指定消费者订阅的话题,comsumerGroup指明该消费者属于哪一个消费者分组,tag表明是否要获取指定标签的消息,process代表具体的业务处理逻辑,具体消息的内容可以MessageContext 类里面获取
import lombok.extern.slf4j.Slf4j;
import org.recipe.draw.common.mqcomsumer.abstracts.AbstractMqConsumer;
import org.recipe.draw.common.mqcomsumer.model.MessageContext;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class DemoConsumer extends AbstractMqConsumer {@Overridepublic String topic() {return "demo";}@Overridepublic String consumerGroup() {return "demo";}@Overridepublic String tag() {return null;}@Overridepublic void process(MessageContext messageContext) {log.info("收到消息:{}", messageContext);}
}
相关文章:
Spring boot封装rocket mq 教程
1、rocket mq版本 5.1.3 2、pom引入rocket mq依赖 <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>5.0.4</version></dependency> 3、发送MQ消息工具类 impor…...
Java Swing手搓童年坦克大战游戏(I)
前言 业余偶尔对游戏有些兴趣,不过这样的时代,硬件软件飞速进步,2D游戏画面都无比精美,之前的8bit像素游戏时代早就过去了,不过那时候有许多让人印象深刻的游戏比如魂斗罗、超级玛丽、坦克大战(Battle City)等等。 学…...
【DevOps-04]】Operate阶段工具
一、简要说明 安装Docker安装Docker-compose二、安装Docker 官网地址:https://www.docker.com文档地址:Docker Docs仓库地址:https://hub.docker.com1、Docker相关网站 官方网站Get Docker | Docker Docs...
力扣2807.在链表中插入最大公约数
思路:遍历链表,对于每一个结点求出它与下一个结点的最大公约数并插入到俩个结点之间 代码: /*** Definition for singly-linked list.* struct ListNode {* int val;* ListNode *next;* ListNode() : val(0), next(nullptr) {}…...
开始刷Leetcode之前你需要知道的 - The basic is all you need
数据结构:列表,哈希表,集合,栈,堆,链表,二叉树,图 入门算法:递归,排序算法,二分法,bfs,dfs list/array 列表常见操作&am…...
【PostgreSQL】模式Schema
PostgreSQL 数据库集群包含一个或多个命名数据库。角色和一些其他对象类型在整个集群中共享。与服务器的客户端连接只能访问单个数据库中的数据,该数据库在连接请求中指定。 数据库包含一个或多个命名schema,而这些schema又包含表。schema还包含其他类型…...
JavaScript实现的复杂功能:自动生成带水印的图片
#程序员的崩溃瞬间 在本文中,我们将讨论一个JavaScript实现的复杂功能,该功能可以自动为图片添加水印。这个功能在许多场景中都非常有用,例如,如果你想保护你的图片版权,或者你想在你的网站上显示自定义的水印。 一、…...
图神经网络|8.2 图卷积的计算基本方法
不同于一般的神经网络,网络层数的并不用特别多。 原因是只需要少数次数迭代后(当迭代次数为图上的直径?任意两点最短距离的最大值?),某节点便可获取得到图上所有的节点。 通俗的理解是,在社会中…...
equals()与hashCode()方法详解
java.lang.Object类中有两个非常重要的方法: 1 2 public boolean equals(Object obj) public int hashCode() Object类是类继承结构的基础,所以是每一个类的父类。所有的对象,包括数组,都实现了在Object类中定义的方法。 回到…...
六、基于Flask、Flasgger、marshmallow的开发调试
基于Flask、Flasgger、marshmallow的开发调试 问题描述调试方法一调试方法二调试方法三 问题描述 现在有一个传入传出为json格式文件的,Flask-restful开发的程序,需要解决如何调试的问题。 #!/usr/bin/python3 # -*- coding: utf-8 -*- # Project :…...
TypeScript 从入门到进阶之基础篇(三) 元组类型篇
系列文章目录 TypeScript 从入门到进阶系列 TypeScript 从入门到进阶之基础篇(一) ts基础类型篇TypeScript 从入门到进阶之基础篇(二) ts进阶类型篇TypeScript 从入门到进阶之基础篇(三) 元组类型篇TypeScript 从入门到进阶之基础篇(四) symbol类型篇 持续更新中… 文章目录 …...
现代CPU的多种运行模式
目前的CPU大多是支持X86-64技术的兼容CPU,这包括AMD64以及Intel的IA32E(后被正式命名为EM64T,Extended Memory 64 Technology),因为AMD64先出,而EM64T与AMD64完全兼容,所以也统一称为AMD64技术。…...
Python PDF处理模块pypdf库详解
概要 PDF(Portable Document Format)是一种常见的文档格式,广泛用于存储和共享文本和图像数据。在 Python 中,有许多库可以用于处理 PDF 文件,其中之一就是 PyPDF。PyPDF 是一个功能强大的库,它允许你读取…...
C++上位软件通过LibModbus开源库和西门子S7-1200/S7-1500/S7-200 PLC进行ModbusTcp 和ModbusRTU 通信
前言 一直以来上位软件比如C等和西门子等其他品牌PLC之间的数据交换都是大家比较头疼的问题,尤其是C上位软件程序员。传统的方法一般有OPC、Socket 等,直到LibModbus 开源库出现后这种途径对程序袁来说又有了新的选择。 Modbus简介 Modbus特点 1 &#…...
PLSQL Developer 15安装和oracle客户端安装
文章目录 前言一、PLSQL Developer1.下载2.安装 二、oracle客户端1.下载2.环境变量 三、使用1. oci2. 连接3. 配置文件 总结 前言 oracle是经常使用的数据库,PLSQL Developer是众多产品中比较不错的一款工具,接下来我们来介绍PLSQL Developer的安装和使…...
【深度deepin】深度安装,jdk,tomcat,Nginx安装
目录 一 深度 1.1 介绍 1.2 与别的操作系统的优点 二 下载镜像文件及VM安装deepin 三 jdk,tomcat,Nginx安装 3.1 JDK安装 3.2 安装tomcat 3.3 安装nginx 一 深度 1.1 介绍 由深度科技社区开发的开源操作系统,基于Linux内核…...
解决flask启动报错:ImportError: DLL load failed while importing _dukpy: 找不到指定的程序
现象: 原因:dukpy没有win32执行库 解决办法: 到lfd.uci.edu 第三方库下载dukpy的win32 whl文件 注意: 需要跟你python版本和windows平台(32位/64位)对应 https://www.lfd.uci.edu/~gohlke/pythonlibs/#…...
腾讯面试总结
腾讯 一面 mysql索引结构?redis持久化策略?zookeeper节点类型说一下;zookeeper选举机制?zookeeper主节点故障,如何重新选举?syn机制?线程池的核心参数;threadlocal的实现ÿ…...
面向对象进阶(static关键字,继承,方法重写,super,this)
文章目录 面向对象进阶部分学习方法:今日内容教学目标 第一章 复习回顾1.1 如何定义类1.2 如何通过类创建对象1.3 封装1.3.1 封装的步骤1.3.2 封装的步骤实现 1.4 构造方法1.4.1 构造方法的作用1.4.2 构造方法的格式1.4.3 构造方法的应用 1.5 this关键字的作用1.5.1…...
Blazor项目如何调用js文件
以下是来自千问的回答并加以整理:(说一句,文心3.5所给的回答不完善,根本运行不起来,4.0等有钱了试试) 在Blazor项目中引用JavaScript文件(.js)以实现与JavaScript的互操作ÿ…...
地震勘探——干扰波识别、井中地震时距曲线特点
目录 干扰波识别反射波地震勘探的干扰波 井中地震时距曲线特点 干扰波识别 有效波:可以用来解决所提出的地质任务的波;干扰波:所有妨碍辨认、追踪有效波的其他波。 地震勘探中,有效波和干扰波是相对的。例如,在反射波…...
多模态2025:技术路线“神仙打架”,视频生成冲上云霄
文|魏琳华 编|王一粟 一场大会,聚集了中国多模态大模型的“半壁江山”。 智源大会2025为期两天的论坛中,汇集了学界、创业公司和大厂等三方的热门选手,关于多模态的集中讨论达到了前所未有的热度。其中,…...
CMake基础:构建流程详解
目录 1.CMake构建过程的基本流程 2.CMake构建的具体步骤 2.1.创建构建目录 2.2.使用 CMake 生成构建文件 2.3.编译和构建 2.4.清理构建文件 2.5.重新配置和构建 3.跨平台构建示例 4.工具链与交叉编译 5.CMake构建后的项目结构解析 5.1.CMake构建后的目录结构 5.2.构…...
UE5 学习系列(三)创建和移动物体
这篇博客是该系列的第三篇,是在之前两篇博客的基础上展开,主要介绍如何在操作界面中创建和拖动物体,这篇博客跟随的视频链接如下: B 站视频:s03-创建和移动物体 如果你不打算开之前的博客并且对UE5 比较熟的话按照以…...
pam_env.so模块配置解析
在PAM(Pluggable Authentication Modules)配置中, /etc/pam.d/su 文件相关配置含义如下: 配置解析 auth required pam_env.so1. 字段分解 字段值说明模块类型auth认证类模块,负责验证用户身份&am…...
系统设计 --- MongoDB亿级数据查询优化策略
系统设计 --- MongoDB亿级数据查询分表策略 背景Solution --- 分表 背景 使用audit log实现Audi Trail功能 Audit Trail范围: 六个月数据量: 每秒5-7条audi log,共计7千万 – 1亿条数据需要实现全文检索按照时间倒序因为license问题,不能使用ELK只能使用…...
Golang dig框架与GraphQL的完美结合
将 Go 的 Dig 依赖注入框架与 GraphQL 结合使用,可以显著提升应用程序的可维护性、可测试性以及灵活性。 Dig 是一个强大的依赖注入容器,能够帮助开发者更好地管理复杂的依赖关系,而 GraphQL 则是一种用于 API 的查询语言,能够提…...
leetcodeSQL解题:3564. 季节性销售分析
leetcodeSQL解题:3564. 季节性销售分析 题目: 表:sales ---------------------- | Column Name | Type | ---------------------- | sale_id | int | | product_id | int | | sale_date | date | | quantity | int | | price | decimal | -…...
Java面试专项一-准备篇
一、企业简历筛选规则 一般企业的简历筛选流程:首先由HR先筛选一部分简历后,在将简历给到对应的项目负责人后再进行下一步的操作。 HR如何筛选简历 例如:Boss直聘(招聘方平台) 直接按照条件进行筛选 例如:…...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...
