【RocketMQ入门-安装部署与Java API测试】
【RocketMQ入门-安装部署与Java API测试】
- 一、环境说明
- 二、安装部署
- 三、Java API 编写Producer和Consumer进行测试
- 四、小结
一、环境说明
- 虚拟机VWMare:安装centos7.6操作系统
- 源码包:rocketmq-all-5.1.3-source-release.zip
- 单master部署,在一台虚拟机上安装部署name server和proxy以及broker
- 流程图:

二、安装部署
-
源码包安装需要事先安装部署maven,下载apache-maven-3.6.3-bin.tar.gz安装包,然后解压并配置环境变量,如下命令:
tar -zvxf apache-maven-3.6.3-bin.tar.gz -C /training/配置环境变量(此处是用root安装),编辑:
vi ~/.bash_profile,在文件末尾添加如下内容:#maven export MVN_HOME=/training/apache-maven-3.6.3 export PATH=$MVN_HOME/bin:$PATH执行:
source ~/.bash_profile使环境生效。 -
进入/training/apache-maven-3.6.3/conf目录下,配置maven的仓库为阿里云和华为云仓库,执行如下命令:
cd /training/apache-maven-3.6.3/conf/ mv settings.xml settings.xml.backup vi settings.xml在打开的settings.xml中,粘贴如下内容即可:
<?xml version="1.0" encoding="utf-8"?> <settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation=" http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd"><mirrors><mirror><id>aliyunmaven</id><mirrorOf>*</mirrorOf><name>阿里云公共仓库</name><url>https://maven.aliyun.com/repository/public</url></mirror><mirror><id>huaweicloud</id><mirrorOf>central</mirrorOf><name>huaweicloud maven</name><url>https://mirrors.huaweicloud.com/repository/maven/</url></mirror></mirrors><profiles><profile><repositories><repository><id>central</id><url>https://maven.aliyun.com/repository/central</url><releases><enabled>true</enabled></releases><snapshots><enabled>true</enabled></snapshots></repository></repositories></profile></profiles> </settings> -
由于CentOS7.6最小模式安装没有unzip命令,需要事先安装,执行如下命令安装:
yum install unzip -y -
解压源码包rocketmq-all-5.1.3-source-release.zip,进入到解压后的目录下,然后编译安装,执行如下命令:
unzip rocketmq-all-5.1.3-source-release.zip cd rocketmq-all-5.1.3-source-release/ mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U -
第5步骤正确后,进入到 /rocketmq-all-5.1.3-source-release/distribution/target/rocketmq-5.1.3/rocketmq-5.1.3目录下,然后启动NameServer,执行如下命令:
cd /root/rocketmq-all-5.1.3-source-release/distribution/target/rocketmq-5.1.3/rocketmq-5.1.3 nohup sh bin/mqnamesrv & -
验证NameServer是否启动成功,执行如下命令:
tail -f ~/logs/rocketmqlogs/namesrv.log会看到如下内容,说明已经正常启动了
The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876
或者执行jps命令查看是否已经有了NameServer进程:NamesrvStartup,如有说明ok -
第5、6步骤正确后,进入到 /rocketmq-all-5.1.3-source-release/distribution/target/rocketmq-5.1.3/rocketmq-5.1.3目录下,然后启动Broker和Proxy,执行如下命令:
注意:NameServer成功启动后,我们启动Broker和Proxy,5.x 版本下我们建议使用 Local 模式部署,即 Broker 和 Proxy 同进程部署。5.x 版本也支持 Broker 和 Proxy 分离部署以实现更灵活的集群能力。详情参考其他教程。cd /root/rocketmq-all-5.1.3-source-release/distribution/target/rocketmq-5.1.3/rocketmq-5.1.3 nohup sh bin/mqbroker -n localhost:9876 --enable-proxy & -
验证NameServer是否启动成功,执行如下命令:
tail -f ~/logs/rocketmqlogs/proxy.log会看到如下内容,说明已经正常启动了
The broker[broker-a, 192.168.36.132:10911] boot success. serializeType=JSON and name server is localhost:9876
或者执行jps命令查看是否已经有了:ProxyStartup进程,如有说明ok
三、Java API 编写Producer和Consumer进行测试
- 上述正常启动NameServer和Broker及Proxy后,首先需要创建名为
TestTopic的Topic,执行如下命令:
查看新创建的Topic,验证是否已经创建好,执行:cd /root/rocketmq-all-5.1.3-source-release/distribution/target/rocketmq-5.1.3/rocketmq-5.1.3 sh bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster
结果如下:sh bin/mqadmin topicList -n localhost:9876

- 创建消费者组,执行如下命令:
执行命令无任何错误即说明已经创建成功。cd /root/rocketmq-all-5.1.3-source-release/distribution/target/rocketmq-5.1.3/rocketmq-5.1.3 sh bin/sh mqadmin updateSubGroup -g testgroup -c DefaultCluster -n localhost:9876 - 在Idea中创建Maven工程,添加rocketmq依赖,添加如下依赖到pom.xml中:
<properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><rocketmq-client-java-version>5.0.5</rocketmq-client-java-version><slf4j.version>1.7.25</slf4j.version> </properties><dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>${rocketmq-client-java-version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version></dependency> </dependencies> - 编写ProducerTest生产者,代码如下:
import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientConfigurationBuilder; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.ClientServiceProvider; import org.apache.rocketmq.client.apis.message.Message; import org.apache.rocketmq.client.apis.producer.Producer; import org.apache.rocketmq.client.apis.producer.SendReceipt; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import java.io.IOException;public class ProducerTest {private static final Logger logger = LoggerFactory.getLogger(ProducerTest.class);public static void main(String[] args) throws Exception {testMain();}public static void testMain() throws ClientException, IOException {// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。String endpoint = "192.168.36.132:8081";// 消息发送的目标Topic名称,需要提前创建。// 执行:sh bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultClusterString topic = "TestTopic";ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);ClientConfiguration configuration = builder.build();// 初始化Producer时需要设置通信配置以及预绑定的Topic。Producer producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(configuration).build();int temp = 0;while (true) {String msg = "第 " + temp + " 条消息,我喜欢rocketmq!!";temp++;// 普通消息发送。Message message = provider.newMessageBuilder().setTopic(topic)// 设置消息索引键,可根据关键字精确查找某条消息。.setKeys("messageKey")// 设置消息Tag,用于消费端根据指定Tag过滤消息。.setTag("messageTag")// 消息体。.setBody(msg.getBytes()).build();try {// 发送消息,需要关注发送结果,并捕获失败等异常。SendReceipt sendReceipt = producer.send(message);Thread.sleep(1000);logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());} catch (Exception e) {logger.error("Failed to send message", e);}}// producer.close();} } - 编写CommonUtils工具类,用于将ByteBuffer转成String,代码如下:
import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets;public class CommonUtils {public static void main(String[] args) {System.out.println("Hello world!");}public static String decodeKey(ByteBuffer bytes) {Charset charset = StandardCharsets.UTF_8;return charset.decode(bytes).toString();}public static byte[] decodeValue(ByteBuffer bytes) {int len = bytes.limit() - bytes.position();byte[] bytes1 = new byte[len];bytes.get(bytes1);return bytes1;}public static ByteBuffer encodeKey(String key) {return ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8));}public static ByteBuffer encodeValue(byte[] value) {ByteBuffer byteBuffer = ByteBuffer.allocate(value.length);byteBuffer.clear();byteBuffer.get(value, 0, value.length);return byteBuffer;} } - 编写ConsumerTest生产者,代码如下:
import java.util.Collections; import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientServiceProvider; import org.apache.rocketmq.client.apis.consumer.ConsumeResult; import org.apache.rocketmq.client.apis.consumer.FilterExpression; import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; import org.apache.rocketmq.client.apis.consumer.PushConsumer; import org.rocketmq.producer.CommonUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory;public class PushConsumerTest {private static final Logger logger = LoggerFactory.getLogger(PushConsumerTest.class);private PushConsumerTest() {}public static void main(String[] args) throws Exception {final ClientServiceProvider provider = ClientServiceProvider.loadService();// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。String endpoints = "192.168.36.132:8081";ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).build();// 订阅消息的过滤规则,表示订阅所有Tag的消息。String tag = "*";FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);// 为消费者指定所属的消费者分组,Group需要提前创建。// 执行:sh bin/sh mqadmin updateSubGroup -g testgroup -c DefaultCluster -n localhost:9876String consumerGroup = "testgroup";// 指定需要订阅哪个目标Topic,Topic需要提前创建。String topic = "TestTopic";// 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。PushConsumer pushConsumer = provider.newPushConsumerBuilder().setClientConfiguration(clientConfiguration)// 设置消费者分组。.setConsumerGroup(consumerGroup)// 设置预绑定的订阅关系。.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))// 设置消费监听器。.setMessageListener(messageView -> {// 处理消息并返回消费结果。logger.info("Consume message successfully, messageId={},messageBody={}", messageView.getMessageId(), CommonUtils.decodeKey(messageView.getBody()));return ConsumeResult.SUCCESS;}).build();Thread.sleep(Long.MAX_VALUE);// 如果不需要再使用 PushConsumer,可关闭该实例。// pushConsumer.close();} } - 为了能查看到控制台日志输入,需要在resources目录下新建log4j.properties、log4j2.properties,具体内容如下:
log4j.properties内容:log4j.rootLogger=INFO,consolelog4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.out log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%nlog4j2.properties内容:name = PropertiesConfig property.filename = target/logs#appenders = console, file #配置值是appender的类型,并不是具体appender实例的name appenders = rollingappender.rolling.type = RollingFile appender.rolling.name = RollingLogFile appender.rolling.fileName=${filename}/automationlogs.log appender.rolling.filePattern = ${filename}/automationlogs-%d{MM-dd-yy-HH-mm-ss}-%i.log appender.rolling.layout.type = PatternLayout appender.rolling.layout.pattern=[%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} - %msg%n appender.rolling.policies.type = Policies appender.rolling.policies.size.type = SizeBasedTriggeringPolicy appender.rolling.policies.size.size=100MB appender.rolling.strategy.type = DefaultRolloverStrategy appender.rolling.strategy.max = 5rootLogger.level = INFO,console rootLogger.appenderRef.rolling.ref = RollingLogFile - 到此,完成了所有准备工作了,整个工程如下所示:

- 运行ProducerTest程序进行消息的发送,控制台中会看到如下内容:

- 运行ConsumerTest程序接收消息,控制台中会看到如下内容:

四、小结
至此,一个单节点副本的 RocketMQ 集群已经部署起来了,我们也通过编写Java程序进行简单的消息收发。如本文对您有帮助,麻烦您动动发财的手指点个赞~~~~~,谢谢您的阅读!!!
相关文章:
【RocketMQ入门-安装部署与Java API测试】
【RocketMQ入门-安装部署与Java API测试】 一、环境说明二、安装部署三、Java API 编写Producer和Consumer进行测试四、小结 一、环境说明 虚拟机VWMare:安装centos7.6操作系统源码包:rocketmq-all-5.1.3-source-release.zip单master部署,在…...
SystemVerilog之覆盖率详解
文章目录 1.0 覆盖率前言1.1 覆盖率类型1.2 覆盖策略及覆盖组1.3 覆盖率数据采样1.3.1 bin的创建与使用1.3.2 条件覆盖率1.3.3 翻转覆盖率1.3.4 wildcard覆盖率1.3.5 忽略bin与非法bin 1.4 交叉覆盖率1.4.1 排除部分cross bin1.4.2 精细化交叉覆盖率1.4.3 单个实例的覆盖率1.4.…...
Qt Designer设计的界面如何显示、即运行显示窗口界面
首先利用Qt Designer设计.ui文件,然后采用Tools->External Tools->PyUIC转换成.py文件。这个.py文件是.ui文件编译而来的,将这种文件由.ui文件编译而来的.py文件称之为界面文件。由于界面文件每次编译时候都会初始化,所以需要新建一个.…...
vue3的setup的使用和原理解析
setup是Vue 3中引入的一个新的组件选项。它是一个在组件实例创建之前被调用的函数,用于设置组件的初始状态、计算属性、方法等。setup函数是Vue 3中函数式组件的核心部分,它提供了一种新的方式来编写组件逻辑。 使用setup函数有以下几个步骤:…...
Spring boot中的线程池-ThreadPoolTaskExecutor
一、jdk的阻塞队列: 二、Spring boot工程的有哪些阻塞队列呢? 1、默认注入的ThreadPoolTaskExecutor 视频解说: 线程池篇-springboot项目中的service层里简单注入ThreadPoolTaskExecutor并且使用_哔哩哔哩_bilibili 程序代码:…...
pgsql checkpoint机制(1)
检查点触发时机 检查点间隔时间由checkpoint_timeout设置pg_xlog中wall段文件总大小超过参数max_WAL_size的值postgresql服务器在smart或fast模式下关闭手动checkpoint 为什么需要检查点? 定期保持修改过的数据块作为实例恢复时起始位置(问题…...
微信小程序 map地图(轨迹)
allMarkers效果图 废话少说直接上马(最后是我遇到的问题) cover-view是气泡弹窗,可以自定义弹窗,要配合js:customCallout,如果是非自定义的话:callout(可以修改颜色、边框宽度、圆角…...
【钉钉接口】bpms_task_change、bpms_instance_change 的区别及举例
bpms_task_change:审批任务回调,是针对审批任务状态的推送。如审批人执行审批、审批人转交审批等针对具体某个审批节点的操作,属于 bpms_task_change 事件类型。bpms_instance_change:审批实例回调,是针对审批实例状态…...
vue左右div结构手动拉伸并且echarts图表根据拉伸宽高自适应
需求: 左右结构的div,可以根据数据抬起按下进行拉伸修改容器宽度的操作给左右结构某一图表设置拉伸自适应左右结构都设置个最小宽度,只能到一定区域内拉伸解决echarts的bug(重复加载chart实例):[ECharts] …...
开发工具Eclipse的使用
🥳🥳Welcome Huihuis Code World ! !🥳🥳 接下来看看由辉辉所写的关于Eclipse使用的相关操作吧 目录 🥳🥳Welcome Huihuis Code World ! !🥳🥳 一.Eclipse是什么 二.使用Eclipse的…...
DrawerLayout布局使用教程Android侧边栏导航完全指南:创建简单实用的导航抽屉
导航抽屉(侧边栏)在现代移动应用中扮演着关键角色,提供了流畅的用户导航体验。本文将带您从头开始,逐步创建一个基本的 Android 侧边栏导航示例,为您的应用增添更多交互魅力。 1. 创建新的 Android 项目 首先&#x…...
Dynamics 365 实体快速创建功能启用
这里我会先用例子讲快速创建,包含了字段创建等内容。希望直接了解配置过程的,可以根据目目录跳转查看。 1 例子 我们这里创建了两个实体,学生和选择的科目。它们的关系是一个学生可以选择多个科目,即学生和科目选择是一对多关系。所以我们在选择的科目中创建了一个学生的…...
Mybatis三剑客(一)在springboot中自动生成Mybatis【generator】
1、pom.xml中新增plugin <plugin><groupId>org.mybatis.generator</groupId><artifactId>mybatis-generator-maven-plugin</artifactId><version>1.3.7</version><configuration><overwrite>true</overwrite><…...
【LeetCode 热题 100】图论 专题(bfs,拓扑排序,Trie树 字典树)
from: https://leetcode.cn/studyplan/top-100-liked/ bfs 具有 边权为1 的最短路性质 拓扑排序,入度 Trie树, 高效存储 字符串【见鬼,不知道为什么写错,需要掌握熟练度】 文章目录 200. 岛屿数量【dfs / bfs】994. 腐…...
Jmeter压测实战:Jmeter二次开发之自定义函数
目录 1 前言 2 开发准备 3 自定义函数核心实现 3.1 新建项目 3.2 继承实现AbstractFunction类 3.3 最终项目结构 4 Jmeter加载扩展包 4.1 maven构建配置 4.2 项目打包 4.3 Jmeter加载扩展包 5 自定义函数调用调试 5.1 打开Jmeter函数助手,选择自定义函数…...
在python中使用nvidia的VPF库对RTSP流进行硬解码并使用opencv进行显示
解码并处理视频流的多线程应用 随着视频处理技术的不断发展,越来越多的应用需要对视频流进行解码和处理。在本文中,我们将介绍一个基于Python的多线程应用程序,该应用程序可以解码并处理多个RTSP视频流,同时利用GPU加速࿰…...
C++中using namespace std的作用记录
using namespace std;这句代码的作用是引入std命名空间,使得程序可以直接使用std命名空间下的标识符,而不需要加上std::前缀。 在C中,标识符被组织在不同的命名空间中,以避免命名冲突。最常见的命名空间是std,它包含了C标准库中的所有标识符,如cout、vector、string等。 默认…...
【TX 企业微信私有化历史版本 API 信息泄露】
目录 影响版本 复现过程 修复方式 影响版本 影响私有化部署: toB toG版微信 2.5.x 版本 2.6.930000 版本以下 危险程度:高危。攻击者可以进行获取企业的部门信息,员工信息,如权限较高包括应用获取,记录文件等等均…...
腾讯云轻量应用服务器镜像应用模板清单大全
腾讯云轻量应用服务器支持多种应用模板镜像,Windows和Linux镜像模板都有,如:宝塔Linux面板腾讯云专享版、WordPress、WooCommerce、LAMP、Node.js、Docker CE、K3s、宝塔Windows面板和ASP.NET等应用模板镜像,腾讯云服务器网分享腾…...
C语言链表操作
目录 链表基本操作 删除重复元素 查找倒数第N个节点 查找中间节点 约瑟夫环 循环链表 合并有序链表 逆置链表 逆置链表(双向链表) 链表基本操作 //linklist.c#include "linklist.h" #include <stdlib.h>struct node *head NULL; struct node *tail…...
聊聊 Pulsar:Producer 源码解析
一、前言 Apache Pulsar 是一个企业级的开源分布式消息传递平台,以其高性能、可扩展性和存储计算分离架构在消息队列和流处理领域独树一帜。在 Pulsar 的核心架构中,Producer(生产者) 是连接客户端应用与消息队列的第一步。生产者…...
【算法训练营Day07】字符串part1
文章目录 反转字符串反转字符串II替换数字 反转字符串 题目链接:344. 反转字符串 双指针法,两个指针的元素直接调转即可 class Solution {public void reverseString(char[] s) {int head 0;int end s.length - 1;while(head < end) {char temp …...
聊一聊接口测试的意义有哪些?
目录 一、隔离性 & 早期测试 二、保障系统集成质量 三、验证业务逻辑的核心层 四、提升测试效率与覆盖度 五、系统稳定性的守护者 六、驱动团队协作与契约管理 七、性能与扩展性的前置评估 八、持续交付的核心支撑 接口测试的意义可以从四个维度展开,首…...
算法笔记2
1.字符串拼接最好用StringBuilder,不用String 2.创建List<>类型的数组并创建内存 List arr[] new ArrayList[26]; Arrays.setAll(arr, i -> new ArrayList<>()); 3.去掉首尾空格...
蓝桥杯 冶炼金属
原题目链接 🔧 冶炼金属转换率推测题解 📜 原题描述 小蓝有一个神奇的炉子用于将普通金属 O O O 冶炼成为一种特殊金属 X X X。这个炉子有一个属性叫转换率 V V V,是一个正整数,表示每 V V V 个普通金属 O O O 可以冶炼出 …...
PHP 8.5 即将发布:管道操作符、强力调试
前不久,PHP宣布了即将在 2025 年 11 月 20 日 正式发布的 PHP 8.5!作为 PHP 语言的又一次重要迭代,PHP 8.5 承诺带来一系列旨在提升代码可读性、健壮性以及开发者效率的改进。而更令人兴奋的是,借助强大的本地开发环境 ServBay&am…...
根目录0xa0属性对应的Ntfs!_SCB中的FileObject是什么时候被建立的----NTFS源代码分析--重要
根目录0xa0属性对应的Ntfs!_SCB中的FileObject是什么时候被建立的 第一部分: 0: kd> g Breakpoint 9 hit Ntfs!ReadIndexBuffer: f7173886 55 push ebp 0: kd> kc # 00 Ntfs!ReadIndexBuffer 01 Ntfs!FindFirstIndexEntry 02 Ntfs!NtfsUpda…...
redis和redission的区别
Redis 和 Redisson 是两个密切相关但又本质不同的技术,它们扮演着完全不同的角色: Redis: 内存数据库/数据结构存储 本质: 它是一个开源的、高性能的、基于内存的 键值存储数据库。它也可以将数据持久化到磁盘。 核心功能: 提供丰…...
数据库正常,但后端收不到数据原因及解决
从代码和日志来看,后端SQL查询确实返回了数据,但最终user对象却为null。这表明查询结果没有正确映射到User对象上。 在前后端分离,并且ai辅助开发的时候,很容易出现前后端变量名不一致情况,还不报错,只是单…...
小智AI+MCP
什么是小智AI和MCP 如果还不清楚的先看往期文章 手搓小智AI聊天机器人 MCP 深度解析:AI 的USB接口 如何使用小智MCP 1.刷支持mcp的小智固件 2.下载官方MCP的示例代码 Github:https://github.com/78/mcp-calculator 安这个步骤执行 其中MCP_ENDPOI…...
