当前位置: 首页 > news >正文

RocketMQ的demo代码

下面是一个使用Java实现的RocketMQ示例代码,用于发送和消费消息:

首先,您需要下载并安装RocketMQ,并启动NameServer和Broker。

接下来,您可以使用以下示例代码来发送和消费消息:

Producer.java文件:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;public class Producer {public static void main(String[] args) {try {// 创建一个DefaultMQProducer实例DefaultMQProducer producer = new DefaultMQProducer("producer_group");// 设置NameServer地址producer.setNamesrvAddr("localhost:9876");// 启动Producer实例producer.start();// 创建消息对象Message message = new Message("topic_name", "tag", "Hello, RocketMQ!".getBytes());// 发送消息producer.send(message);// 关闭Producer实例producer.shutdown();} catch (Exception e) {e.printStackTrace();}}
}

Consumer.java文件:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;public class Consumer {public static void main(String[] args) {try {// 创建一个DefaultMQPushConsumer实例DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");// 设置NameServer地址consumer.setNamesrvAddr("localhost:9876");// 设置消费开始位置consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 订阅主题和标签consumer.subscribe("topic_name", "tag");// 注册消息监听器consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {System.out.println("Received message: " + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// 启动Consumer实例consumer.start();} catch (Exception e) {e.printStackTrace();}}
}

在上述示例代码中,Producer类用于发送消息,而Consumer类用于接收和处理消息。

在Producer类中,我们创建一个DefaultMQProducer实例,并设置NameServer的地址。然后,我们创建一个消息对象,指定主题、标签和消息内容。最后,通过调用send方法发送消息。

在Consumer类中,我们创建一个DefaultMQPushConsumer实例,并设置NameServer的地址。然后,我们设置消费开始位置和订阅特定的主题和标签。注册一个消息监听器,用于处理接收到的消息。最后,调用start方法启动Consumer实例。

请确保您已正确配置RocketMQ服务器和相关依赖项,并根据需要更改服务器地址、主题和标签。运行Producer和Consumer代码后,您将看到Producer发送的消息在Consumer端被接收和打印出来。



当然!这里是另一个使用Java实现的RocketMQ示例代码,用于发送事务消息:

首先,您需要下载并安装RocketMQ,并启动NameServer和Broker。

接下来,您可以使用以下示例代码来发送和处理事务消息:

TransactionProducer.java文件:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;public class TransactionProducer {public static void main(String[] args) {try {// 创建一个TransactionMQProducer实例TransactionMQProducer producer = new TransactionMQProducer("producer_group");// 设置NameServer地址producer.setNamesrvAddr("localhost:9876");// 设置事务监听器producer.setTransactionListener(new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 执行本地事务,根据业务逻辑返回LocalTransactionState// 如果事务执行成功,返回COMMIT_MESSAGE// 如果事务执行失败,返回ROLLBACK_MESSAGE// 如果事务状态不确定,返回UNKNOWreturn LocalTransactionState.COMMIT_MESSAGE;}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {// 检查本地事务状态,根据消息内容返回LocalTransactionState// 如果事务已提交,返回COMMIT_MESSAGE// 如果事务已回滚,返回ROLLBACK_MESSAGE// 如果事务状态不确定,返回UNKNOWreturn LocalTransactionState.COMMIT_MESSAGE;}});// 启动Producer实例producer.start();// 创建消息对象Message message = new Message("topic_name", "tag", "Hello, RocketMQ!".getBytes());// 发送事务消息TransactionSendResult sendResult = producer.sendMessageInTransaction(message, null);System.out.println("Transaction Send Result: " + sendResult.getLocalTransactionState());// 关闭Producer实例producer.shutdown();} catch (Exception e) {e.printStackTrace();}}
}

在上述示例代码中,我们创建了一个TransactionMQProducer实例,并设置了NameServer的地址。然后,我们设置了事务监听器,其中包含executeLocalTransaction方法和checkLocalTransaction方法。在executeLocalTransaction方法中,您可以执行本地事务操作,并根据事务结果返回适当的LocalTransactionState。在checkLocalTransaction方法中,您可以检查本地事务状态,并返回相应的LocalTransactionState。

在主程序中,我们创建了一个消息对象,并通过调用sendMessageInTransaction方法发送事务消息。然后,我们打印事务发送结果的LocalTransactionState。

请确保您已正确配置RocketMQ服务器和相关依赖项,并根据需要更改服务器地址、主题和标签。运行TransactionProducer代码后,您将看到事务消息发送的结果。根据本地事务执行的结果和检查的结果,LocalTransactionState将被设置为相应的状态(

COMMIT_MESSAGE、ROLLBACK_MESSAGE或UNKNOW)。



当然!这里是另一个使用Java实现的RocketMQ示例代码,用于顺序消息的发送和消费:

首先,您需要下载并安装RocketMQ,并启动NameServer和Broker。

接下来,您可以使用以下示例代码来发送和消费顺序消息:

顺序消息生产者 (OrderedProducer.java):

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueueSelector;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;import java.util.List;public class OrderedProducer {public static void main(String[] args) {try {// 创建一个DefaultMQProducer实例DefaultMQProducer producer = new DefaultMQProducer("producer_group");// 设置NameServer地址producer.setNamesrvAddr("localhost:9876");// 启动Producer实例producer.start();// 发送顺序消息for (int i = 0; i < 10; i++) {Message message = new Message("topic_name", "tag", ("Hello, RocketMQ! " + i).getBytes());SendResult sendResult = producer.send(message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer orderId = (Integer) arg;int index = orderId % mqs.size();return mqs.get(index);}}, i);System.out.println("Send Result: " + sendResult);}// 关闭Producer实例producer.shutdown();} catch (Exception e) {e.printStackTrace();}}
}

顺序消息消费者 (OrderedConsumer.java):

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class OrderedConsumer {public static void main(String[] args) {try {// 创建一个DefaultMQPushConsumer实例DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");// 设置NameServer地址consumer.setNamesrvAddr("localhost:9876");// 设置消费模式为集群模式consumer.setMessageModel(MessageModel.CLUSTERING);// 订阅主题和标签consumer.subscribe("topic_name", "tag");// 注册消息监听器consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {System.out.println("Received message: " + new String(msg.getBody()) + ", QueueId: " + msg.getQueueId());}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// 启动Consumer实例consumer.start();} catch (Exception e) {e.printStackTrace();}}
}

在上述示例代码中,OrderedProducer类用于发送顺序消息,而OrderedConsumer类用于接收和处理顺序消息。

在OrderedProducer类中,我们创建了一个DefaultMQProducer实例,并设置了Name

Server的地址。然后,我们使用循环发送了10条顺序消息。在发送消息时,我们使用MessageQueueSelector来选择要发送的消息队列,并通过参数指定了消息的顺序。每条消息都会返回一个SendResult对象。

在OrderedConsumer类中,我们创建了一个DefaultMQPushConsumer实例,并设置了NameServer的地址。我们还设置了消费模式为集群模式,通过subscribe方法订阅特定的主题和标签。注册了一个消息监听器,用于处理接收到的消息。收到的消息将被打印出来,并附带消息所在的队列ID。

请确保您已正确配置RocketMQ服务器和相关依赖项,并根据需要更改服务器地址、主题和标签。运行OrderedProducer和OrderedConsumer代码后,您将看到顺序消息发送和消费的结果。每条消息都将按照发送时指定的顺序被消费。

相关文章:

RocketMQ的demo代码

下面是一个使用Java实现的RocketMQ示例代码&#xff0c;用于发送和消费消息&#xff1a; 首先&#xff0c;您需要下载并安装RocketMQ&#xff0c;并启动NameServer和Broker。 接下来&#xff0c;您可以使用以下示例代码来发送和消费消息&#xff1a; Producer.java文件&…...

C++ 连接、操作postgreSQL(基于libpq库)

C++ 连接postgreSQL(基于libpq库) 1.环境2.数据库操作2.1. c++ 连接数据库2.2. c++ 删除数据库属性表内容2.3. c++ 插入数据库属性表内容2.4 c++ 关闭数据库1.环境 使用libpq库来链接postgresql数据库,主要用到的头文件是这个: #include "libpq-fe.h"2.数据库操…...

Node.js技术简介及其在Web开发中的应用

Node.js是一个基于Chrome V8引擎的JavaScript运行时环境&#xff0c;使得JavaScript能够在服务器端运行。Node.js采用事件驱动、非阻塞I/O模型&#xff0c;能够处理大量并发请求&#xff0c;非常适合处理I/O密集型的应用程序。本文将介绍Node.js的特点、优势以及在Web开发中的应…...

时间序列分析:原理与MATLAB实现

2023年9月数学建模国赛期间提供ABCDE题思路加Matlab代码,专栏链接(赛前一个月恢复源码199,欢迎大家订阅):http://t.csdn.cn/Um9Zd 目录 1. 时间序列分析简介 2. 自回归模型(AR) 2.1. 参数估计 2.2. MATLAB实现...

mysql排序之if(isnull(字段名),0,1),字段名 或者 if(isnull(字段名),1,0),字段名

mysql排序之if(isnull(字段名),0,1),字段名 或者 if(isnull(字段名),1,0),字段名 默认情况下&#xff0c;MySQL将null算作最小值。如果想要手动指定null的顺序&#xff0c;可以这样处理&#xff1a; 将null强制放在最前 //null, null, 1,2,3,4&#xff08;默认就是这样&#…...

华为OD机试真题 Java 实现【递增字符串】【2023Q1 200分】,附详细解题思路

一、题目描述 定义字符串完全由“A’和B"组成,当然也可以全是"A"或全是"B。如果字符串从前往后都是以字典序排列的,那么我们称之为严格递增字符串。 给出一个字符串5,允许修改字符串中的任意字符,即可以将任何的"A"修改成"B,也可以将…...

合并文件解决HiveServer2内存溢出方案

一、文件过多导致HiveServer2内存溢出 1.1查看表文件个数 desc formatted yanyu.tmp• 表文件数量为6522102 1.2查看表文件信息 hadoop fs -ls warehouse/yanyu.db/tmp• 分区为string 类型的time字段&#xff0c;分了2001个区。 1.3.查看某个分区下的文件个数为10000个 …...

韧性数据安全体系缘起与三个目标 |CEO专栏

今年4月&#xff0c;美创科技在数据安全领域的新探索——“韧性”数据安全防护体系框架正式发布亮相。 为帮您更深入了解“韧性数据安全”&#xff0c;我们特别推出专栏“构建适应性进化的韧性数据安全体系”&#xff0c;CEO柳遵梁亲自执笔&#xff0c;进行系列解读分享。 首期…...

华为OD机试真题 Java 实现【火车进站】【牛客练习题】

一、题目描述 给定一个正整数N代表火车数量,0<N<10,接下来输入火车入站的序列,一共N辆火车,每辆火车以数字1-9编号,火车站只有一个方向进出,同时停靠在火车站的列车中,只有后进站的出站了,先进站的才能出站。 要求输出所有火车出站的方案,以字典序排序输出。 …...

c#快速入门(下)

欢迎来到Cefler的博客&#x1f601; &#x1f54c;博客主页&#xff1a;那个传说中的man的主页 &#x1f3e0;个人专栏&#xff1a;题目解析 &#x1f30e;推荐文章&#xff1a;题目大解析2 目录 &#x1f449;&#x1f3fb;Inline和lambda委托和lambda &#x1f449;&#x1f…...

基于深度学习的目标姿态检测方法_kaic

目录 摘要 第1章 引言 1.1 研究背景和意义 1.2 国内外研究现状 1.3 主要内容 第2章 单目相机的目标姿态检测技术 2.1单目相机的工作原理 2.2目标姿态检测 2.3已有的目标姿态检测方法及其局限性 2.4本章总结 第3章 构建数据集 3.1 数据集来源 3.2数据集标注 3.3数据集分析 3.4本…...

Pycharm设置Python每个文件开头自定义模板(带上声明字符编码、作者名、时间等)

Pycharm设置地址&#xff1a; 在File---settings---Editor---File and Code Templates---Python script 脚本里添加: 模板声明设置参考&#xff1a; # ---encoding:utf-8--- # Time : ${DATE} ${HOUR}:${MINUTE} # Author : 作者名 # Email &#xff1a;你的邮箱 # Sit…...

Gem相关操作命令

Gem相关操作命令 gem -v # 查看 gem 版本gem source # 查看 gem 配置源 gem source -l # 查看 gem 配置源目录 gem sources -a url # 添加 gem 配置源&#xff08;url 需换成网址&#xff09; gem sources --add url # 添加 gem 配置源&#xff08;url 需换成网址&#xff09;…...

软件测试2023年行情怎么样?仔细讲解!

目录 前言&#xff1a; 普通功能测试人员不建议跳槽 还有一个要求就是要对业务的极致理解 那么产业互联网趋势会导致什么呢&#xff1f; 现在跳槽涨薪需要掌握到什么样的技术呢&#xff1f; 给大家一些跳槽建议 前言&#xff1a; 软件测试是为了发现程序中的错误而执行程序的…...

【1130. 叶值的最小代价生成树】

来源&#xff1a;力扣&#xff08;LeetCode&#xff09; 描述&#xff1a; 给你一个正整数数组 arr&#xff0c;考虑所有满足以下条件的二叉树&#xff1a; 每个节点都有 0 个或是 2 个子节点。数组 arr 中的值与树的中序遍历中每个叶节点的值一一对应。每个非叶节点的值等于…...

Linux各个目录的全称及含义

/ 根目录&#xff0c;包含整个文件系统的根节点。 /bin : Binary Directory 二进制文件目录&#xff0c;包含一些基本的可执行程序。 /boot : Boot Directory 包含启动系统所需的文件&#xff0c;如内核和引导程序。 /dev : Device Directory 设备文件目录&#xff0c;…...

Cookie和Session原理详解

目录 前言 Cookie Session 会话机制 Cookie和Session的区别 Servlet中对Session和Cookie的封装 代码实例&#xff1a;实现用户登录 约定前后端交互的接口 前端页面&#xff1a; 后端实现 login index 总结 前言 在web的发展史中&#xff0c;我们知道浏览器和服务…...

小程序自动化测试

背景 近期团队打算做一个小程序自动化测试的工具&#xff0c;期望能够做到业务人员操作一遍小程序后&#xff0c;自动还原之前的操作路径&#xff0c;并且捕获操作过程中发生的异常&#xff0c;以此来判断这次发布是否会影响小程序的基础功能。 上述描述看似简单&#xff0c;…...

【linux系统操作】 - 技术一览

文章目录 1. 用户管理2. 文件管理3. 文件系统4. 字符处理5. 网络管理6. 进程管理7. 软件安装8. vi和vim编辑器9. 正则表达式 1. 用户管理 1.用户和用户组 2.账号管理 新增和删除用户、组&#xff1b;检查用户信息切换用户信息、用其他用户身份执行例行任务管理 : 周期性执行任…...

yield和sleep 区别

yield和sleep对比 sleepyieldsleep会导致当前线程暂停指定的时间&#xff0c;没有CPU时间片的消耗。yield只是对CPU调度器的一个提示&#xff0c;如果CPU调度器没有忽略这个提示&#xff0c;它会导致线程上下文的切换。sleep会使线程短暂block&#xff0c;会在给定的时间内释放…...

Vim 调用外部命令学习笔记

Vim 外部命令集成完全指南 文章目录 Vim 外部命令集成完全指南核心概念理解命令语法解析语法对比 常用外部命令详解文本排序与去重文本筛选与搜索高级 grep 搜索技巧文本替换与编辑字符处理高级文本处理编程语言处理其他实用命令 范围操作示例指定行范围处理复合命令示例 实用技…...

Appium+python自动化(十六)- ADB命令

简介 Android 调试桥(adb)是多种用途的工具&#xff0c;该工具可以帮助你你管理设备或模拟器 的状态。 adb ( Android Debug Bridge)是一个通用命令行工具&#xff0c;其允许您与模拟器实例或连接的 Android 设备进行通信。它可为各种设备操作提供便利&#xff0c;如安装和调试…...

Java如何权衡是使用无序的数组还是有序的数组

在 Java 中,选择有序数组还是无序数组取决于具体场景的性能需求与操作特点。以下是关键权衡因素及决策指南: ⚖️ 核心权衡维度 维度有序数组无序数组查询性能二分查找 O(log n) ✅线性扫描 O(n) ❌插入/删除需移位维护顺序 O(n) ❌直接操作尾部 O(1) ✅内存开销与无序数组相…...

基于当前项目通过npm包形式暴露公共组件

1.package.sjon文件配置 其中xh-flowable就是暴露出去的npm包名 2.创建tpyes文件夹&#xff0c;并新增内容 3.创建package文件夹...

Module Federation 和 Native Federation 的比较

前言 Module Federation 是 Webpack 5 引入的微前端架构方案&#xff0c;允许不同独立构建的应用在运行时动态共享模块。 Native Federation 是 Angular 官方基于 Module Federation 理念实现的专为 Angular 优化的微前端方案。 概念解析 Module Federation (模块联邦) Modul…...

用docker来安装部署freeswitch记录

今天刚才测试一个callcenter的项目&#xff0c;所以尝试安装freeswitch 1、使用轩辕镜像 - 中国开发者首选的专业 Docker 镜像加速服务平台 编辑下面/etc/docker/daemon.json文件为 {"registry-mirrors": ["https://docker.xuanyuan.me"] }同时可以进入轩…...

3-11单元格区域边界定位(End属性)学习笔记

返回一个Range 对象&#xff0c;只读。该对象代表包含源区域的区域上端下端左端右端的最后一个单元格。等同于按键 End 向上键(End(xlUp))、End向下键(End(xlDown))、End向左键(End(xlToLeft)End向右键(End(xlToRight)) 注意&#xff1a;它移动的位置必须是相连的有内容的单元格…...

根目录0xa0属性对应的Ntfs!_SCB中的FileObject是什么时候被建立的----NTFS源代码分析--重要

根目录0xa0属性对应的Ntfs!_SCB中的FileObject是什么时候被建立的 第一部分&#xff1a; 0: kd> g Breakpoint 9 hit Ntfs!ReadIndexBuffer: f7173886 55 push ebp 0: kd> kc # 00 Ntfs!ReadIndexBuffer 01 Ntfs!FindFirstIndexEntry 02 Ntfs!NtfsUpda…...

HTML前端开发:JavaScript 获取元素方法详解

作为前端开发者&#xff0c;高效获取 DOM 元素是必备技能。以下是 JS 中核心的获取元素方法&#xff0c;分为两大系列&#xff1a; 一、getElementBy... 系列 传统方法&#xff0c;直接通过 DOM 接口访问&#xff0c;返回动态集合&#xff08;元素变化会实时更新&#xff09;。…...

rknn toolkit2搭建和推理

安装Miniconda Miniconda - Anaconda Miniconda 选择一个 新的 版本 &#xff0c;不用和RKNN的python版本保持一致 使用 ./xxx.sh进行安装 下面配置一下载源 # 清华大学源&#xff08;最常用&#xff09; conda config --add channels https://mirrors.tuna.tsinghua.edu.cn…...