RocketMQ mqadmin java springboot python 调用笔记
命令
mqadmin命令列表
yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin
The most commonly used mqadmin commands are:updateTopic Update or create topicdeleteTopic Delete topic from broker and NameServer.updateSubGroup Update or create subscription groupsetConsumeMode Set consume message mode. pull/pop etc.deleteSubGroup Delete subscription group from broker.updateBrokerConfig Update broker's configupdateTopicPerm Update topic permtopicRoute Examine topic route infotopicStatus Examine topic Status infotopicClusterList Get cluster info for topicaddBroker Add a broker to specified containerremoveBroker Remove a broker from specified containerresetMasterFlushOffset Reset master flush offset in slavebrokerStatus Fetch broker runtime status dataqueryMsgById Query Message by IdqueryMsgByKey Query Message by KeyqueryMsgByUniqueKey Query Message by Unique keyqueryMsgByOffset Query Message by offsetqueryMsgTraceById Query a message traceprintMsg Print Message DetailprintMsgByQueue Print Message DetailsendMsgStatus Send msg to broker.brokerConsumeStats Fetch broker consume stats dataproducerConnection Query producer's socket connection and client versionconsumerConnection Query consumer's socket connection, client version and subscriptionconsumerProgress Query consumers's progress, speedconsumerStatus Query consumer's internal data structurecloneGroupOffset Clone offset from other group.producer Query producer's instances, connection, status, etc.clusterList List cluster infostopicList Fetch all topic list from name serverupdateKvConfig Create or update KV config.deleteKvConfig Delete KV config.wipeWritePerm Wipe write perm of broker in all name server you defined in the -n paramaddWritePerm Add write perm of broker in all name server you defined in the -n paramresetOffsetByTime Reset consumer offset by timestamp(without client restart).skipAccumulatedMessage Skip all messages that are accumulated (not consumed) currentlyupdateOrderConf Create or update or delete order confcleanExpiredCQ Clean expired ConsumeQueue on broker.deleteExpiredCommitLog Delete expired CommitLog filescleanUnusedTopic Clean unused topic on broker.startMonitoring Start MonitoringstatsAll Topic and Consumer tps statsallocateMQ Allocate MQcheckMsgSendRT Check message send response timeclusterRT List All clusters Message Send RTgetNamesrvConfig Get configs of name server.updateNamesrvConfig Update configs of name server.getBrokerConfig Get broker config by cluster or special brokergetConsumerConfig Get consumer config by subscription group namequeryCq Query cq command.sendMessage Send a messageconsumeMessage Consume messageupdateAclConfig Update acl config yaml file in brokerdeleteAclConfig Delete Acl Config Account in brokerclusterAclConfigVersion List all of acl config version information in clusterupdateGlobalWhiteAddr Update global white address for acl Config File in brokergetAclConfig List all of acl config information in clusterupdateStaticTopic Update or create static topic, which has fixed number of queuesremappingStaticTopic Update or create static topic, which has fixed number of queuesexportMetadata Export metadataexportConfigs Export configsexportMetrics Export metricshaStatus Fetch ha runtime status datagetSyncStateSet Fetch syncStateSet for target brokersgetBrokerEpoch Fetch broker epoch entriesgetControllerMetaData Get controller cluster's metadatagetControllerConfig Get controller config.updateControllerConfig Update controller config.electMaster Re-elect the specified broker as mastercleanBrokerMetadata Clean metadata of broker on controllerdumpCompactionLog parse compaction log to messagegetColdDataFlowCtrInfo get cold data flow ctr infoupdateColdDataFlowCtrGroupConfig addOrUpdate cold data flow ctr group configremoveColdDataFlowCtrGroupConfig remove consumer from cold ctr configsetCommitLogReadAheadMode set read ahead mode for all commitlog files
topicList
yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicList -n localhost:9876
%RETRY%please_rename_unique_group_name
RMQ_SYS_TRANS_HALF_TOPIC
stringRequestTopic
%RETRY%objectRequestConsumer
%RETRY%please_rename_unique_group_name_4
TRANS_CHECK_MAX_TIME_TOPIC
BenchmarkTest
%RETRY%genericRequestConsumer
string-topic
TBW102
rmq_sys_REVIVE_LOG_DefaultCluster
SELF_TEST_TOPIC
%RETRY%string_consumer_newns
SCHEDULE_TOPIC_XXXX
DefaultCluster_REPLY_TOPIC
rmq_sys_SYNC_BROKER_MEMBER_yeqiang-MS-7B23
RMQ_SYS_TRANS_OP_HALF_TOPIC
TopicTest
localhost.localdomain
order-paid-topic
%RETRY%my-group1
user-topic
%RETRY%string_trans_consumer
message-ext-topic
OFFSET_MOVED_EVENT
%RETRY%user_consumer
%RETRY%order-paid-consumer
yeqiang-MS-7B23
DefaultCluster
spring-transaction-topic
%RETRY%stringRequestConsumer
bytesRequestTopic
%RETRY%string_consumer
%RETRY%bytesRequestConsumer
%RETRY%rocketmq-consume-demo-message-ext-consumer
statsAll
yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin statsAll -n localhost:9876
#Topic #Consumer Group #Accumulation #InTPS #OutTPS #InMsg24Hour #OutMsg24Hour
RMQ_SYS_TRANS_HALF_TOPIC CID_RMQ_SYS_TRANS 0 0.00 0.00 0 0
stringRequestTopic stringRequestConsumer 1 0.00 0.00 0 0
TRANS_CHECK_MAX_TIME_TOPIC 0 0.00 0 NO_CONSUMER
BenchmarkTest 0 0.00 0 NO_CONSUMER
string-topic string_consumer 106 0.00 0.00 0 0
string-topic string_consumer_newns 63 0.00 0.00 0 0
TBW102 0 0.00 0 NO_CONSUMER
rmq_sys_REVIVE_LOG_DefaultCluster 0 0.00 0 NO_CONSUMER
SELF_TEST_TOPIC 0 0.00 0 NO_CONSUMER
SCHEDULE_TOPIC_XXXX 0 0.00 0 NO_CONSUMER
DefaultCluster_REPLY_TOPIC 0 0.00 0 NO_CONSUMER
rmq_sys_SYNC_BROKER_MEMBER_yeqiang-MS-7B23 0 0.00 0 NO_CONSUMER
RMQ_SYS_TRANS_OP_HALF_TOPIC CID_RMQ_SYS_TRANS 0 0.00 0.00 0 0
TopicTest please_rename_unique_group_name 252 0.00 0.00 0 0
TopicTest please_rename_unique_group_name_4 0 0.00 0.00 0 0
localhost.localdomain 0 0.00 0 NO_CONSUMER
order-paid-topic order-paid-consumer 1 0.00 0.00 0 0
user-topic user_consumer 2 0.00 0.00 0 0
message-ext-topic rocketmq-consume-demo-message-ext-consumer 2 0.00 0.00 0 0
OFFSET_MOVED_EVENT 0 0.00 0 NO_CONSUMER
yeqiang-MS-7B23 0 0.00 0 NO_CONSUMER
DefaultCluster 0 0.00 0 NO_CONSUMER
spring-transaction-topic string_trans_consumer 15 0.00 0.00 0 0
bytesRequestTopic bytesRequestConsumer 0 0.00 0.00 0 0
topicStatus
yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicStatus -n localhost:9876 -t string-topic
#Broker Name #QID #Min Offset #Max Offset #Last Updated
yeqiang-MS-7B23 0 0 35 2023-08-25 16:21:35,786
yeqiang-MS-7B23 1 0 52 2023-08-25 14:55:57,152
yeqiang-MS-7B23 2 0 33 2023-08-25 16:21:35,646
yeqiang-MS-7B23 3 0 42 2023-08-25 14:55:57,172
yeqiang-MS-7B23 4 0 1 2023-08-25 16:21:34,355
yeqiang-MS-7B23 5 0 1 2023-08-25 14:55:57,105
yeqiang-MS-7B23 6 0 4 2023-08-25 16:23:01,489
yeqiang-MS-7B23 7 0 1 2023-08-25 16:21:36,186
Python 生产者:producer.py
from rocketmq.client import Producer, MessagegroupName = "my-group1"
nameserver = "127.0.0.0:9876"
topicName = "string-topic"
TAGS = "tag-my-group1"
KEYS = "key-my-group1-0"
# 初始化生产者,并设置生产组信息,组名称使用全称,例:rocketmq-xxx|namespace_python%group1
producer = Producer(groupName)
# 设置服务地址
producer.set_name_server_address(nameserver)
# 设置权限(角色名和密钥)
# producer.set_session_credentials(
# accessKey, # 角色密钥
# secretKey, # 角色名称
# ''
# )
# 启动生产者
producer.start()# 组装消息 topic名称,在控制台 topic 页面复制。
msg = Message(topicName)
# 设置keys
msg.set_keys(TAGS)
# 设置tags
msg.set_tags(KEYS)
# 消息内容
msg.set_body('This is a new message.')# 发送同步消息
ret = producer.send_sync(msg)
print(ret.status, ret.msg_id, ret.offset)
# 资源释放
producer.shutdown()
运行
yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ source /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/activate
(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/python /home/yeqiang/code/pythonproj/python-rocketmq-demo/producer.py
SendStatus.OK 7F0001012857767267388CFD61230000 35
(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$
mqadmin查询topic状态
yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicStatus -n localhost:9876 -t string-topic
#Broker Name #QID #Min Offset #Max Offset #Last Updated
yeqiang-MS-7B23 0 0 36 2023-08-28 09:03:35,722
yeqiang-MS-7B23 1 0 52 2023-08-25 14:55:57,152
yeqiang-MS-7B23 2 0 33 2023-08-25 16:21:35,646
yeqiang-MS-7B23 3 0 42 2023-08-25 14:55:57,172
yeqiang-MS-7B23 4 0 1 2023-08-25 16:21:34,355
yeqiang-MS-7B23 5 0 1 2023-08-25 14:55:57,105
yeqiang-MS-7B23 6 0 4 2023-08-25 16:23:01,489
yeqiang-MS-7B23 7 0 1 2023-08-25 16:21:36,186
yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicRoute -n localhost:9876 -t string-topic
{"brokerDatas":[{"brokerAddrs":{0:"10.47.76.67:10911"},"brokerName":"yeqiang-MS-7B23","cluster":"DefaultCluster","enableActingMaster":false}],"filterServerTable":{},"queueDatas":[{"brokerName":"yeqiang-MS-7B23","perm":6,"readQueueNums":8,"topicSysFlag":0,"writeQueueNums":8}]
}
图形工具rocketmq-dashborad
https://github.com/apache/rocketmq-dashboard
自行编译
mvn clean package -Dmaven.test.skip=true
启动
java -Drocketmq.namesrv.addr=127.0.0.1:9876 -jar target/rocketmq-dashboard-1.0.0.jar


(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/python /home/yeqiang/code/pythonproj/python-rocketmq-demo/producer.py
SendStatus.OK 7F0001012DF4226307248D16C3250000 36


consoumer.py
import time
from rocketmq.client import PushConsumer, ConsumeStatus
# 消息处理回调groupName = "my-group1"
nameserver = "127.0.0.0:9876"
topicName = "string-topic"
KEYS = "key-my-group1-0"def callback(msg):# 模拟业务print('Received message. messageId: ', msg.id, ' body: ', msg.body)# 消费成功回复CONSUME_SUCCESSreturn ConsumeStatus.CONSUME_SUCCESS# 消费成功回复消息状态# return ConsumeStatus.RECONSUME_LATER# 初始化消费者,并设置消费者组信息
consumer = PushConsumer(groupName)
# 设置服务地址
consumer.set_name_server_address(nameserver)
# 设置权限(角色名和密钥)
# consumer.set_session_credentials(
# accessKey, # 角色密钥
# secretKey, # 角色名称
# ''
# )
# 订阅topic
consumer.subscribe(topicName, callback, "*")
print(' [Consumer] Waiting for messages.')
# 启动消费者
consumer.start()while True:time.sleep(3600)
# 资源释放
consumer.shutdown()
启动python消费者
(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/python /home/yeqiang/code/pythonproj/python-rocketmq-demo/consumer.py[Consumer] Waiting for messages.
Received message. messageId: 7F0001012DF4226307248D16C3250000 body: b'This is a new message.'

可以看到my-group1已被消费
再启动一个consumer.py,产生一次消息

可以看到,只有一个consumer消费到了消息,说明默认情况下,消息非广播模式。
Java生产一个消息:
training: Java SpringBoot SpringCloud k8s等练习程序 - Gitee.com


python rocketmq依赖
Release rocketmq-client-cpp-2.1.0 · apache/rocketmq-client-cpp · GitHub
python完整程序
python-rocketmq-demo: python3 rocketmq5 的一个例子
相关文章:
RocketMQ mqadmin java springboot python 调用笔记
命令 mqadmin命令列表 yeqiangyeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin The most commonly used mqadmin commands are:updateTopic Update or create topicdeleteTopic Delete topic from broker and NameServer.…...
Java aspose 将HTML导出成Excel文件
1.需求 有一批表格的html文件,需要将这些表格导出成excel文件 2.代码 使用第三方库 aspose ByteArrayInputStream htmlIs new ByteArrayInputStream(htmlBuilder.toString().getBytes()); // 将html字符串构建成输入流 LoadOptions lo new LoadOptions(LoadFo…...
原生微信小程序 动态(横向,纵向)公告(广告)栏
先看一下动态效果 Y轴滚动公告的原理是swiper组件在页面中的Y轴滚动,属性vertical,其余属性也设置一下autoplay circular interval"3000" X轴滚动的原理是,利用动画效果,将内容从右往左过渡过去 wxml: &l…...
pandas和polars简单的对比分析
pandas pandas是基于python写的,底层的数据结构是Numpy数据(ndarray)。pandas自身有两个核心的数据结构:DataFrame和Series,前者是二维的表格数据结构,后者是一维标签化数组。 polars polars是用Rust(一种系统级编程…...
Feign远程调用的使用
假设已配好nacos服务:并且已配好userservice、orderservice,点击跳转 Feign是一个声明式的http客户端,官方地址:https://github.com/OpenFeign/feign,其作用就是在程序中帮助我们优雅的实现http请求的发送,…...
Postman API测试之道:不止于点击,更在于策略
引言:API测试的重要性 在当今的软件开发中,API已经成为了一个不可或缺的部分。它们是软件组件之间交互的桥梁,确保数据的流动和功能的实现。因此,对API的测试显得尤为重要,它不仅关乎功能的正确性,还涉及到…...
5G 数字乡村数字农业农村大数据中心项目农业大数据建设方案PPT
导读:原文《5G 数字乡村数字农业农村大数据中心项目农业大数据建设方案PPT》(获取来源见文尾),本文精选其中精华及架构部分,逻辑清晰、内容完整,为快速形成售前方案提供参考。以下是部分内容, 喜…...
Golang Gorm 一对多的添加
一对多的添加有两种情况: 一种是添加用户的时候同时创建文章其次是创建文章关联已经存在的用户。 package mainimport ("gorm.io/driver/mysql""gorm.io/gorm" )// User 用户表 一个用户拥有多篇文章 type User struct {ID int64Name …...
图像扭曲之锯齿
源码: void wave_sawtooth(cv::Mat& src,cv::Mat& dst,double amplitude,double wavelength) {dst.create(src.rows, src.cols, CV_8UC3);dst.setTo(0);double xAmplitude amplitude;double yAmplitude amplitude;int xWavelength wavelength;int yWave…...
【分布式技术专题】「OSS中间件系列」Minio的文件服务的存储模型及整合Java客户端访问的实战指南
Minio的元数据 数据存储 MinIO对象存储系统没有元数据数据库,所有的操作都是对象级别的粒度的,这种做法的优势是: 个别对象的失效,不会溢出为更大级别的系统失效。便于实现"强一致性"这个特性。此特性对于机器学习与大数据处理非…...
构建个人博客_Obsidian_github.io_hexo
1 初衷 很早就开始分享文档,以技术类的为主,一开始是 MSN,博客,随着平台的更替,后来又用了 CSDN,知乎,简书…… 再后来是 Obsidian,飞书,Notion,常常有以下困…...
烟花厂人员作业释放静电行为检测算法
烟花厂人员作业释放静电行为检测算法通过pythonyolo系列算法模型框架,烟花厂人员作业释放静电行为检测算法在工厂车间入口处能够及时捕捉到人员是否触摸静电释放仪。一旦检测到人员进入时没有触摸静电释放仪,系统将自动触发告警。Python是一种由Guido va…...
ARTS挑战第二周-T:PHP数组相关操作
array_combine() 函数 合并两个数组 array_combine()传入2个参数,使用方法如下 array_combine(array $keys, array $values): array 返回一个 array,用来自 keys 数组的值作为键名,来自 values 数组的值作为相应的值。 array_key_exists() 函…...
【如何对公司网络进行限速?一个案例详解】
有不少朋友问到了关于企业网络QoS配置,这个确实在实际网络应用中非常多,基本上大部分企业或个人都用到这个功能,本期我们详细了解下QoS如何对宽带进行限制,QoS如何企业中应用。 一、什么是QoS? Qos是用来解决网络延迟和阻塞等问…...
服务器安全-修改默认ssh端口
防火墙先打开指定端口,要不修改后连不上(端口需要在65535之内) firewall-cmd --list-ports firewall-cmd --add-port54111/tcp --permanent firewall-cmd --reload-------------------- 先让两个端口同时存在,等配置成功后关闭22端口 vim /etc/ssh/sshd_config重启sshd service…...
保护隐私的第一步:从更新浏览器开始
当今社会已经进入了数字化和网络化的时代,而网络安全问题也日益突显。随着互联网在我们生活中的不断渗透,网络威胁变得愈发普遍和隐蔽。在这样的背景下,网络浏览器作为人们访问互联网的主要工具之一,不仅为我们提供了便捷的上网方…...
Python爬虫框架之快速抓取互联网数据详解
概要 Python爬虫框架是一个能够帮助我们快速抓取互联网数据的工具。在互联网时代,信息爆炸式增长,人们越来越需要一种快速获取信息的方式。而Python爬虫框架就能够帮助我们完成这个任务,它可以帮助我们快速地从互联网上抓取各种数据…...
【算法专题突破】双指针 - 盛最多水的容器(4)
目录 1. 题目解析 2. 算法原理 3. 代码编写 写在最后: 1. 题目解析 题目链接:11. 盛最多水的容器 - 力扣(Leetcode) 这道题目也不难理解, 两边的柱子的盛水量是根据短的那边的柱子决定的, 而盛水量…...
循环神经网络(RNN) | 项目还不成熟 |还在初级阶段
一,定义 循环神经网络(Recurrent Neural Network,RNN)是一种深度学习神经网络架构,专门设计用于处理序列数据,如时间序列数据、自然语言文本等(一般用来解决序列问题)。 因为它们具…...
【Spring Boot】数据库持久层框架MyBatis — MyBatis简介
MyBatis简介 本节首先会介绍什么是ORM、什么是MyBatis、MyBatis的特点以及核心概念,最后介绍MyBatis是如何启动、如何加载配置文件的? 1.什么是ORM ORM(Object Relational Mapping,对象关系映射)是为了解决面向对象…...
QMCFLAC2MP3终极指南:免费快速解锁QQ音乐格式限制
QMCFLAC2MP3终极指南:免费快速解锁QQ音乐格式限制 【免费下载链接】qmcflac2mp3 直接将qmcflac文件转换成mp3文件,突破QQ音乐的格式限制 项目地址: https://gitcode.com/gh_mirrors/qm/qmcflac2mp3 你是否曾经在QQ音乐下载了心爱的歌曲࿰…...
告别黑盒:5分钟为你的自定义CNN模型集成Grad-CAM可视化(附常见错误排查)
告别黑盒:5分钟为你的自定义CNN模型集成Grad-CAM可视化(附常见错误排查) 在深度学习项目中,我们常常陷入一个尴尬境地:模型准确率很高,但完全不知道它究竟"看"了图像的哪些部分做出决策。这种黑盒…...
C语言结构体、枚举、联合体:从内存布局看区别,新手避坑指南
C语言结构体、枚举、联合体:从内存布局看区别,新手避坑指南 在C语言开发中,结构体、枚举和联合体是构建复杂数据模型的三大基石。但很多开发者在实际项目中常遇到这样的困惑:为什么结构体占用的内存比预期大?枚举变量在…...
别再为嵌入式设备大内存发愁了!手把手教你用CMA(连续内存分配器)搞定Linux视频编解码缓冲区
嵌入式多媒体开发中的连续内存优化实战:CMA技术深度解析 在嵌入式多媒体开发领域,视频编解码、图像处理等任务对内存管理提出了严苛要求。当你在树莓派上部署视频监控系统,或在工业摄像头中实现实时H.264编码时,是否经常遇到这样的…...
基于MCP协议的AI Agent远程SSH安全操作实践指南
1. 项目概述与核心价值最近在折腾AI Agent的开发,发现一个挺有意思的现象:很多开发者都卡在了“如何让AI安全、可控地操作远程服务器”这一步。你可能会想到直接给AI一个SSH私钥,但这无异于把自家大门的钥匙扔给一个还在学习走路的机器人&…...
品牌声音技能化:从模糊概念到可执行AI内容策略
1. 项目概述:品牌声音的“技能化”构建最近在和一些做品牌营销、内容运营的朋友聊天,发现一个挺普遍的现象:大家手里都有一堆品牌手册、VI规范,但一到具体执行,比如写一篇公众号推文、拍一条短视频,或者回复…...
防火墙和手动启动都试了?ArcGIS License Server无响应,可能是这两个核心文件在捣鬼
ArcGIS许可服务故障深度解析:当核心文件成为隐形杀手 当你面对ArcGIS License Server无响应的红色报错框,已经尝试了关闭防火墙、调整服务配置、甚至重启服务器等一系列标准操作后,那个令人沮丧的"cannot connect to license server sys…...
设计师速存!Midjourney未公开的风格隐藏开关:--style raw、--s 750、--no texture三者协同作用的神经渲染原理(GPU显存占用下降41%实测)
更多请点击: https://intelliparadigm.com 第一章:设计师速存!Midjourney未公开的风格隐藏开关:--style raw、--s 750、--no texture三者协同作用的神经渲染原理(GPU显存占用下降41%实测) Midjourney v6.1…...
Pandrator:基于Python的自动化内容生成与数据转换工具实践
1. 项目概述与核心价值最近在折腾一些自动化数据处理和内容生成的工作流,发现了一个挺有意思的开源项目,叫Pandrator。乍一看这个名字,可能会联想到“潘多拉”和“生成器”的结合,实际上它也确实是一个功能强大的内容转换与生成工…...
Go语言SDK开发实战:为AI编程助手Cursor构建高效API客户端
1. 项目概述:一个为AI编程助手Cursor定制的Go语言SDK如果你和我一样,日常重度依赖Cursor这类AI编程助手来提升开发效率,同时又是个Go语言的忠实拥趸,那你肯定遇到过这样的场景:想用Go写个脚本,自动化处理一…...
