RocketMQ mqadmin java springboot python 调用笔记
命令
mqadmin命令列表
yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin
The most commonly used mqadmin commands are:updateTopic Update or create topicdeleteTopic Delete topic from broker and NameServer.updateSubGroup Update or create subscription groupsetConsumeMode Set consume message mode. pull/pop etc.deleteSubGroup Delete subscription group from broker.updateBrokerConfig Update broker's configupdateTopicPerm Update topic permtopicRoute Examine topic route infotopicStatus Examine topic Status infotopicClusterList Get cluster info for topicaddBroker Add a broker to specified containerremoveBroker Remove a broker from specified containerresetMasterFlushOffset Reset master flush offset in slavebrokerStatus Fetch broker runtime status dataqueryMsgById Query Message by IdqueryMsgByKey Query Message by KeyqueryMsgByUniqueKey Query Message by Unique keyqueryMsgByOffset Query Message by offsetqueryMsgTraceById Query a message traceprintMsg Print Message DetailprintMsgByQueue Print Message DetailsendMsgStatus Send msg to broker.brokerConsumeStats Fetch broker consume stats dataproducerConnection Query producer's socket connection and client versionconsumerConnection Query consumer's socket connection, client version and subscriptionconsumerProgress Query consumers's progress, speedconsumerStatus Query consumer's internal data structurecloneGroupOffset Clone offset from other group.producer Query producer's instances, connection, status, etc.clusterList List cluster infostopicList Fetch all topic list from name serverupdateKvConfig Create or update KV config.deleteKvConfig Delete KV config.wipeWritePerm Wipe write perm of broker in all name server you defined in the -n paramaddWritePerm Add write perm of broker in all name server you defined in the -n paramresetOffsetByTime Reset consumer offset by timestamp(without client restart).skipAccumulatedMessage Skip all messages that are accumulated (not consumed) currentlyupdateOrderConf Create or update or delete order confcleanExpiredCQ Clean expired ConsumeQueue on broker.deleteExpiredCommitLog Delete expired CommitLog filescleanUnusedTopic Clean unused topic on broker.startMonitoring Start MonitoringstatsAll Topic and Consumer tps statsallocateMQ Allocate MQcheckMsgSendRT Check message send response timeclusterRT List All clusters Message Send RTgetNamesrvConfig Get configs of name server.updateNamesrvConfig Update configs of name server.getBrokerConfig Get broker config by cluster or special brokergetConsumerConfig Get consumer config by subscription group namequeryCq Query cq command.sendMessage Send a messageconsumeMessage Consume messageupdateAclConfig Update acl config yaml file in brokerdeleteAclConfig Delete Acl Config Account in brokerclusterAclConfigVersion List all of acl config version information in clusterupdateGlobalWhiteAddr Update global white address for acl Config File in brokergetAclConfig List all of acl config information in clusterupdateStaticTopic Update or create static topic, which has fixed number of queuesremappingStaticTopic Update or create static topic, which has fixed number of queuesexportMetadata Export metadataexportConfigs Export configsexportMetrics Export metricshaStatus Fetch ha runtime status datagetSyncStateSet Fetch syncStateSet for target brokersgetBrokerEpoch Fetch broker epoch entriesgetControllerMetaData Get controller cluster's metadatagetControllerConfig Get controller config.updateControllerConfig Update controller config.electMaster Re-elect the specified broker as mastercleanBrokerMetadata Clean metadata of broker on controllerdumpCompactionLog parse compaction log to messagegetColdDataFlowCtrInfo get cold data flow ctr infoupdateColdDataFlowCtrGroupConfig addOrUpdate cold data flow ctr group configremoveColdDataFlowCtrGroupConfig remove consumer from cold ctr configsetCommitLogReadAheadMode set read ahead mode for all commitlog files
topicList
yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicList -n localhost:9876
%RETRY%please_rename_unique_group_name
RMQ_SYS_TRANS_HALF_TOPIC
stringRequestTopic
%RETRY%objectRequestConsumer
%RETRY%please_rename_unique_group_name_4
TRANS_CHECK_MAX_TIME_TOPIC
BenchmarkTest
%RETRY%genericRequestConsumer
string-topic
TBW102
rmq_sys_REVIVE_LOG_DefaultCluster
SELF_TEST_TOPIC
%RETRY%string_consumer_newns
SCHEDULE_TOPIC_XXXX
DefaultCluster_REPLY_TOPIC
rmq_sys_SYNC_BROKER_MEMBER_yeqiang-MS-7B23
RMQ_SYS_TRANS_OP_HALF_TOPIC
TopicTest
localhost.localdomain
order-paid-topic
%RETRY%my-group1
user-topic
%RETRY%string_trans_consumer
message-ext-topic
OFFSET_MOVED_EVENT
%RETRY%user_consumer
%RETRY%order-paid-consumer
yeqiang-MS-7B23
DefaultCluster
spring-transaction-topic
%RETRY%stringRequestConsumer
bytesRequestTopic
%RETRY%string_consumer
%RETRY%bytesRequestConsumer
%RETRY%rocketmq-consume-demo-message-ext-consumer
statsAll
yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin statsAll -n localhost:9876
#Topic #Consumer Group #Accumulation #InTPS #OutTPS #InMsg24Hour #OutMsg24Hour
RMQ_SYS_TRANS_HALF_TOPIC CID_RMQ_SYS_TRANS 0 0.00 0.00 0 0
stringRequestTopic stringRequestConsumer 1 0.00 0.00 0 0
TRANS_CHECK_MAX_TIME_TOPIC 0 0.00 0 NO_CONSUMER
BenchmarkTest 0 0.00 0 NO_CONSUMER
string-topic string_consumer 106 0.00 0.00 0 0
string-topic string_consumer_newns 63 0.00 0.00 0 0
TBW102 0 0.00 0 NO_CONSUMER
rmq_sys_REVIVE_LOG_DefaultCluster 0 0.00 0 NO_CONSUMER
SELF_TEST_TOPIC 0 0.00 0 NO_CONSUMER
SCHEDULE_TOPIC_XXXX 0 0.00 0 NO_CONSUMER
DefaultCluster_REPLY_TOPIC 0 0.00 0 NO_CONSUMER
rmq_sys_SYNC_BROKER_MEMBER_yeqiang-MS-7B23 0 0.00 0 NO_CONSUMER
RMQ_SYS_TRANS_OP_HALF_TOPIC CID_RMQ_SYS_TRANS 0 0.00 0.00 0 0
TopicTest please_rename_unique_group_name 252 0.00 0.00 0 0
TopicTest please_rename_unique_group_name_4 0 0.00 0.00 0 0
localhost.localdomain 0 0.00 0 NO_CONSUMER
order-paid-topic order-paid-consumer 1 0.00 0.00 0 0
user-topic user_consumer 2 0.00 0.00 0 0
message-ext-topic rocketmq-consume-demo-message-ext-consumer 2 0.00 0.00 0 0
OFFSET_MOVED_EVENT 0 0.00 0 NO_CONSUMER
yeqiang-MS-7B23 0 0.00 0 NO_CONSUMER
DefaultCluster 0 0.00 0 NO_CONSUMER
spring-transaction-topic string_trans_consumer 15 0.00 0.00 0 0
bytesRequestTopic bytesRequestConsumer 0 0.00 0.00 0 0
topicStatus
yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicStatus -n localhost:9876 -t string-topic
#Broker Name #QID #Min Offset #Max Offset #Last Updated
yeqiang-MS-7B23 0 0 35 2023-08-25 16:21:35,786
yeqiang-MS-7B23 1 0 52 2023-08-25 14:55:57,152
yeqiang-MS-7B23 2 0 33 2023-08-25 16:21:35,646
yeqiang-MS-7B23 3 0 42 2023-08-25 14:55:57,172
yeqiang-MS-7B23 4 0 1 2023-08-25 16:21:34,355
yeqiang-MS-7B23 5 0 1 2023-08-25 14:55:57,105
yeqiang-MS-7B23 6 0 4 2023-08-25 16:23:01,489
yeqiang-MS-7B23 7 0 1 2023-08-25 16:21:36,186
Python 生产者:producer.py
from rocketmq.client import Producer, MessagegroupName = "my-group1"
nameserver = "127.0.0.0:9876"
topicName = "string-topic"
TAGS = "tag-my-group1"
KEYS = "key-my-group1-0"
# 初始化生产者,并设置生产组信息,组名称使用全称,例:rocketmq-xxx|namespace_python%group1
producer = Producer(groupName)
# 设置服务地址
producer.set_name_server_address(nameserver)
# 设置权限(角色名和密钥)
# producer.set_session_credentials(
# accessKey, # 角色密钥
# secretKey, # 角色名称
# ''
# )
# 启动生产者
producer.start()# 组装消息 topic名称,在控制台 topic 页面复制。
msg = Message(topicName)
# 设置keys
msg.set_keys(TAGS)
# 设置tags
msg.set_tags(KEYS)
# 消息内容
msg.set_body('This is a new message.')# 发送同步消息
ret = producer.send_sync(msg)
print(ret.status, ret.msg_id, ret.offset)
# 资源释放
producer.shutdown()
运行
yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ source /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/activate
(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/python /home/yeqiang/code/pythonproj/python-rocketmq-demo/producer.py
SendStatus.OK 7F0001012857767267388CFD61230000 35
(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$
mqadmin查询topic状态
yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicStatus -n localhost:9876 -t string-topic
#Broker Name #QID #Min Offset #Max Offset #Last Updated
yeqiang-MS-7B23 0 0 36 2023-08-28 09:03:35,722
yeqiang-MS-7B23 1 0 52 2023-08-25 14:55:57,152
yeqiang-MS-7B23 2 0 33 2023-08-25 16:21:35,646
yeqiang-MS-7B23 3 0 42 2023-08-25 14:55:57,172
yeqiang-MS-7B23 4 0 1 2023-08-25 16:21:34,355
yeqiang-MS-7B23 5 0 1 2023-08-25 14:55:57,105
yeqiang-MS-7B23 6 0 4 2023-08-25 16:23:01,489
yeqiang-MS-7B23 7 0 1 2023-08-25 16:21:36,186
yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicRoute -n localhost:9876 -t string-topic
{"brokerDatas":[{"brokerAddrs":{0:"10.47.76.67:10911"},"brokerName":"yeqiang-MS-7B23","cluster":"DefaultCluster","enableActingMaster":false}],"filterServerTable":{},"queueDatas":[{"brokerName":"yeqiang-MS-7B23","perm":6,"readQueueNums":8,"topicSysFlag":0,"writeQueueNums":8}]
}
图形工具rocketmq-dashborad
https://github.com/apache/rocketmq-dashboard
自行编译
mvn clean package -Dmaven.test.skip=true
启动
java -Drocketmq.namesrv.addr=127.0.0.1:9876 -jar target/rocketmq-dashboard-1.0.0.jar
(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/python /home/yeqiang/code/pythonproj/python-rocketmq-demo/producer.py
SendStatus.OK 7F0001012DF4226307248D16C3250000 36
consoumer.py
import time
from rocketmq.client import PushConsumer, ConsumeStatus
# 消息处理回调groupName = "my-group1"
nameserver = "127.0.0.0:9876"
topicName = "string-topic"
KEYS = "key-my-group1-0"def callback(msg):# 模拟业务print('Received message. messageId: ', msg.id, ' body: ', msg.body)# 消费成功回复CONSUME_SUCCESSreturn ConsumeStatus.CONSUME_SUCCESS# 消费成功回复消息状态# return ConsumeStatus.RECONSUME_LATER# 初始化消费者,并设置消费者组信息
consumer = PushConsumer(groupName)
# 设置服务地址
consumer.set_name_server_address(nameserver)
# 设置权限(角色名和密钥)
# consumer.set_session_credentials(
# accessKey, # 角色密钥
# secretKey, # 角色名称
# ''
# )
# 订阅topic
consumer.subscribe(topicName, callback, "*")
print(' [Consumer] Waiting for messages.')
# 启动消费者
consumer.start()while True:time.sleep(3600)
# 资源释放
consumer.shutdown()
启动python消费者
(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/python /home/yeqiang/code/pythonproj/python-rocketmq-demo/consumer.py[Consumer] Waiting for messages.
Received message. messageId: 7F0001012DF4226307248D16C3250000 body: b'This is a new message.'
可以看到my-group1已被消费
再启动一个consumer.py,产生一次消息
可以看到,只有一个consumer消费到了消息,说明默认情况下,消息非广播模式。
Java生产一个消息:
training: Java SpringBoot SpringCloud k8s等练习程序 - Gitee.com
python rocketmq依赖
Release rocketmq-client-cpp-2.1.0 · apache/rocketmq-client-cpp · GitHub
python完整程序
python-rocketmq-demo: python3 rocketmq5 的一个例子
相关文章:

RocketMQ mqadmin java springboot python 调用笔记
命令 mqadmin命令列表 yeqiangyeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin The most commonly used mqadmin commands are:updateTopic Update or create topicdeleteTopic Delete topic from broker and NameServer.…...
Java aspose 将HTML导出成Excel文件
1.需求 有一批表格的html文件,需要将这些表格导出成excel文件 2.代码 使用第三方库 aspose ByteArrayInputStream htmlIs new ByteArrayInputStream(htmlBuilder.toString().getBytes()); // 将html字符串构建成输入流 LoadOptions lo new LoadOptions(LoadFo…...

原生微信小程序 动态(横向,纵向)公告(广告)栏
先看一下动态效果 Y轴滚动公告的原理是swiper组件在页面中的Y轴滚动,属性vertical,其余属性也设置一下autoplay circular interval"3000" X轴滚动的原理是,利用动画效果,将内容从右往左过渡过去 wxml: &l…...
pandas和polars简单的对比分析
pandas pandas是基于python写的,底层的数据结构是Numpy数据(ndarray)。pandas自身有两个核心的数据结构:DataFrame和Series,前者是二维的表格数据结构,后者是一维标签化数组。 polars polars是用Rust(一种系统级编程…...
Feign远程调用的使用
假设已配好nacos服务:并且已配好userservice、orderservice,点击跳转 Feign是一个声明式的http客户端,官方地址:https://github.com/OpenFeign/feign,其作用就是在程序中帮助我们优雅的实现http请求的发送,…...

Postman API测试之道:不止于点击,更在于策略
引言:API测试的重要性 在当今的软件开发中,API已经成为了一个不可或缺的部分。它们是软件组件之间交互的桥梁,确保数据的流动和功能的实现。因此,对API的测试显得尤为重要,它不仅关乎功能的正确性,还涉及到…...

5G 数字乡村数字农业农村大数据中心项目农业大数据建设方案PPT
导读:原文《5G 数字乡村数字农业农村大数据中心项目农业大数据建设方案PPT》(获取来源见文尾),本文精选其中精华及架构部分,逻辑清晰、内容完整,为快速形成售前方案提供参考。以下是部分内容, 喜…...

Golang Gorm 一对多的添加
一对多的添加有两种情况: 一种是添加用户的时候同时创建文章其次是创建文章关联已经存在的用户。 package mainimport ("gorm.io/driver/mysql""gorm.io/gorm" )// User 用户表 一个用户拥有多篇文章 type User struct {ID int64Name …...

图像扭曲之锯齿
源码: void wave_sawtooth(cv::Mat& src,cv::Mat& dst,double amplitude,double wavelength) {dst.create(src.rows, src.cols, CV_8UC3);dst.setTo(0);double xAmplitude amplitude;double yAmplitude amplitude;int xWavelength wavelength;int yWave…...

【分布式技术专题】「OSS中间件系列」Minio的文件服务的存储模型及整合Java客户端访问的实战指南
Minio的元数据 数据存储 MinIO对象存储系统没有元数据数据库,所有的操作都是对象级别的粒度的,这种做法的优势是: 个别对象的失效,不会溢出为更大级别的系统失效。便于实现"强一致性"这个特性。此特性对于机器学习与大数据处理非…...

构建个人博客_Obsidian_github.io_hexo
1 初衷 很早就开始分享文档,以技术类的为主,一开始是 MSN,博客,随着平台的更替,后来又用了 CSDN,知乎,简书…… 再后来是 Obsidian,飞书,Notion,常常有以下困…...

烟花厂人员作业释放静电行为检测算法
烟花厂人员作业释放静电行为检测算法通过pythonyolo系列算法模型框架,烟花厂人员作业释放静电行为检测算法在工厂车间入口处能够及时捕捉到人员是否触摸静电释放仪。一旦检测到人员进入时没有触摸静电释放仪,系统将自动触发告警。Python是一种由Guido va…...
ARTS挑战第二周-T:PHP数组相关操作
array_combine() 函数 合并两个数组 array_combine()传入2个参数,使用方法如下 array_combine(array $keys, array $values): array 返回一个 array,用来自 keys 数组的值作为键名,来自 values 数组的值作为相应的值。 array_key_exists() 函…...

【如何对公司网络进行限速?一个案例详解】
有不少朋友问到了关于企业网络QoS配置,这个确实在实际网络应用中非常多,基本上大部分企业或个人都用到这个功能,本期我们详细了解下QoS如何对宽带进行限制,QoS如何企业中应用。 一、什么是QoS? Qos是用来解决网络延迟和阻塞等问…...

服务器安全-修改默认ssh端口
防火墙先打开指定端口,要不修改后连不上(端口需要在65535之内) firewall-cmd --list-ports firewall-cmd --add-port54111/tcp --permanent firewall-cmd --reload-------------------- 先让两个端口同时存在,等配置成功后关闭22端口 vim /etc/ssh/sshd_config重启sshd service…...

保护隐私的第一步:从更新浏览器开始
当今社会已经进入了数字化和网络化的时代,而网络安全问题也日益突显。随着互联网在我们生活中的不断渗透,网络威胁变得愈发普遍和隐蔽。在这样的背景下,网络浏览器作为人们访问互联网的主要工具之一,不仅为我们提供了便捷的上网方…...

Python爬虫框架之快速抓取互联网数据详解
概要 Python爬虫框架是一个能够帮助我们快速抓取互联网数据的工具。在互联网时代,信息爆炸式增长,人们越来越需要一种快速获取信息的方式。而Python爬虫框架就能够帮助我们完成这个任务,它可以帮助我们快速地从互联网上抓取各种数据…...

【算法专题突破】双指针 - 盛最多水的容器(4)
目录 1. 题目解析 2. 算法原理 3. 代码编写 写在最后: 1. 题目解析 题目链接:11. 盛最多水的容器 - 力扣(Leetcode) 这道题目也不难理解, 两边的柱子的盛水量是根据短的那边的柱子决定的, 而盛水量…...

循环神经网络(RNN) | 项目还不成熟 |还在初级阶段
一,定义 循环神经网络(Recurrent Neural Network,RNN)是一种深度学习神经网络架构,专门设计用于处理序列数据,如时间序列数据、自然语言文本等(一般用来解决序列问题)。 因为它们具…...

【Spring Boot】数据库持久层框架MyBatis — MyBatis简介
MyBatis简介 本节首先会介绍什么是ORM、什么是MyBatis、MyBatis的特点以及核心概念,最后介绍MyBatis是如何启动、如何加载配置文件的? 1.什么是ORM ORM(Object Relational Mapping,对象关系映射)是为了解决面向对象…...
可靠性+灵活性:电力载波技术在楼宇自控中的核心价值
可靠性灵活性:电力载波技术在楼宇自控中的核心价值 在智能楼宇的自动化控制中,电力载波技术(PLC)凭借其独特的优势,正成为构建高效、稳定、灵活系统的核心解决方案。它利用现有电力线路传输数据,无需额外布…...
解锁数据库简洁之道:FastAPI与SQLModel实战指南
在构建现代Web应用程序时,与数据库的交互无疑是核心环节。虽然传统的数据库操作方式(如直接编写SQL语句与psycopg2交互)赋予了我们精细的控制权,但在面对日益复杂的业务逻辑和快速迭代的需求时,这种方式的开发效率和可…...

使用LangGraph和LangSmith构建多智能体人工智能系统
现在,通过组合几个较小的子智能体来创建一个强大的人工智能智能体正成为一种趋势。但这也带来了一些挑战,比如减少幻觉、管理对话流程、在测试期间留意智能体的工作方式、允许人工介入以及评估其性能。你需要进行大量的反复试验。 在这篇博客〔原作者&a…...
怎么让Comfyui导出的图像不包含工作流信息,
为了数据安全,让Comfyui导出的图像不包含工作流信息,导出的图像就不会拖到comfyui中加载出来工作流。 ComfyUI的目录下node.py 直接移除 pnginfo(推荐) 在 save_images 方法中,删除或注释掉所有与 metadata …...
【前端异常】JavaScript错误处理:分析 Uncaught (in promise) error
在前端开发中,JavaScript 异常是不可避免的。随着现代前端应用越来越多地使用异步操作(如 Promise、async/await 等),开发者常常会遇到 Uncaught (in promise) error 错误。这个错误是由于未正确处理 Promise 的拒绝(r…...

FFmpeg avformat_open_input函数分析
函数内部的总体流程如下: avformat_open_input 精简后的代码如下: int avformat_open_input(AVFormatContext **ps, const char *filename,ff_const59 AVInputFormat *fmt, AVDictionary **options) {AVFormatContext *s *ps;int i, ret 0;AVDictio…...

【Qt】控件 QWidget
控件 QWidget 一. 控件概述二. QWidget 的核心属性可用状态:enabled几何:geometrywindows frame 窗口框架的影响 窗口标题:windowTitle窗口图标:windowIconqrc 机制 窗口不透明度:windowOpacity光标:cursor…...

VSCode 没有添加Windows右键菜单
关键字:VSCode;Windows右键菜单;注册表。 文章目录 前言一、工程环境二、配置流程1.右键文件打开2.右键文件夹打开3.右键空白处打开文件夹 三、测试总结 前言 安装 VSCode 时没有注意,实际使用的时候发现 VSCode 在 Windows 菜单栏…...
Vue3学习(接口,泛型,自定义类型,v-for,props)
一,前言 继续学习 二,TS接口泛型自定义类型 1.接口 TypeScript 接口(Interface)是一种定义对象形状的强大工具,它可以描述对象必须包含的属性、方法和它们的类型。接口不会被编译成 JavaScript 代码,仅…...

实现p2p的webrtc-srs版本
1. 基本知识 1.1 webrtc 一、WebRTC的本质:实时通信的“网络协议栈”类比 将WebRTC类比为Linux网络协议栈极具洞察力,二者在架构设计和功能定位上高度相似: 分层协议栈架构 Linux网络协议栈:从底层物理层到应用层(如…...