RocketMQ mqadmin java springboot python 调用笔记
命令
mqadmin命令列表
yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin
The most commonly used mqadmin commands are:updateTopic Update or create topicdeleteTopic Delete topic from broker and NameServer.updateSubGroup Update or create subscription groupsetConsumeMode Set consume message mode. pull/pop etc.deleteSubGroup Delete subscription group from broker.updateBrokerConfig Update broker's configupdateTopicPerm Update topic permtopicRoute Examine topic route infotopicStatus Examine topic Status infotopicClusterList Get cluster info for topicaddBroker Add a broker to specified containerremoveBroker Remove a broker from specified containerresetMasterFlushOffset Reset master flush offset in slavebrokerStatus Fetch broker runtime status dataqueryMsgById Query Message by IdqueryMsgByKey Query Message by KeyqueryMsgByUniqueKey Query Message by Unique keyqueryMsgByOffset Query Message by offsetqueryMsgTraceById Query a message traceprintMsg Print Message DetailprintMsgByQueue Print Message DetailsendMsgStatus Send msg to broker.brokerConsumeStats Fetch broker consume stats dataproducerConnection Query producer's socket connection and client versionconsumerConnection Query consumer's socket connection, client version and subscriptionconsumerProgress Query consumers's progress, speedconsumerStatus Query consumer's internal data structurecloneGroupOffset Clone offset from other group.producer Query producer's instances, connection, status, etc.clusterList List cluster infostopicList Fetch all topic list from name serverupdateKvConfig Create or update KV config.deleteKvConfig Delete KV config.wipeWritePerm Wipe write perm of broker in all name server you defined in the -n paramaddWritePerm Add write perm of broker in all name server you defined in the -n paramresetOffsetByTime Reset consumer offset by timestamp(without client restart).skipAccumulatedMessage Skip all messages that are accumulated (not consumed) currentlyupdateOrderConf Create or update or delete order confcleanExpiredCQ Clean expired ConsumeQueue on broker.deleteExpiredCommitLog Delete expired CommitLog filescleanUnusedTopic Clean unused topic on broker.startMonitoring Start MonitoringstatsAll Topic and Consumer tps statsallocateMQ Allocate MQcheckMsgSendRT Check message send response timeclusterRT List All clusters Message Send RTgetNamesrvConfig Get configs of name server.updateNamesrvConfig Update configs of name server.getBrokerConfig Get broker config by cluster or special brokergetConsumerConfig Get consumer config by subscription group namequeryCq Query cq command.sendMessage Send a messageconsumeMessage Consume messageupdateAclConfig Update acl config yaml file in brokerdeleteAclConfig Delete Acl Config Account in brokerclusterAclConfigVersion List all of acl config version information in clusterupdateGlobalWhiteAddr Update global white address for acl Config File in brokergetAclConfig List all of acl config information in clusterupdateStaticTopic Update or create static topic, which has fixed number of queuesremappingStaticTopic Update or create static topic, which has fixed number of queuesexportMetadata Export metadataexportConfigs Export configsexportMetrics Export metricshaStatus Fetch ha runtime status datagetSyncStateSet Fetch syncStateSet for target brokersgetBrokerEpoch Fetch broker epoch entriesgetControllerMetaData Get controller cluster's metadatagetControllerConfig Get controller config.updateControllerConfig Update controller config.electMaster Re-elect the specified broker as mastercleanBrokerMetadata Clean metadata of broker on controllerdumpCompactionLog parse compaction log to messagegetColdDataFlowCtrInfo get cold data flow ctr infoupdateColdDataFlowCtrGroupConfig addOrUpdate cold data flow ctr group configremoveColdDataFlowCtrGroupConfig remove consumer from cold ctr configsetCommitLogReadAheadMode set read ahead mode for all commitlog files
topicList
yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicList -n localhost:9876
%RETRY%please_rename_unique_group_name
RMQ_SYS_TRANS_HALF_TOPIC
stringRequestTopic
%RETRY%objectRequestConsumer
%RETRY%please_rename_unique_group_name_4
TRANS_CHECK_MAX_TIME_TOPIC
BenchmarkTest
%RETRY%genericRequestConsumer
string-topic
TBW102
rmq_sys_REVIVE_LOG_DefaultCluster
SELF_TEST_TOPIC
%RETRY%string_consumer_newns
SCHEDULE_TOPIC_XXXX
DefaultCluster_REPLY_TOPIC
rmq_sys_SYNC_BROKER_MEMBER_yeqiang-MS-7B23
RMQ_SYS_TRANS_OP_HALF_TOPIC
TopicTest
localhost.localdomain
order-paid-topic
%RETRY%my-group1
user-topic
%RETRY%string_trans_consumer
message-ext-topic
OFFSET_MOVED_EVENT
%RETRY%user_consumer
%RETRY%order-paid-consumer
yeqiang-MS-7B23
DefaultCluster
spring-transaction-topic
%RETRY%stringRequestConsumer
bytesRequestTopic
%RETRY%string_consumer
%RETRY%bytesRequestConsumer
%RETRY%rocketmq-consume-demo-message-ext-consumer
statsAll
yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin statsAll -n localhost:9876
#Topic #Consumer Group #Accumulation #InTPS #OutTPS #InMsg24Hour #OutMsg24Hour
RMQ_SYS_TRANS_HALF_TOPIC CID_RMQ_SYS_TRANS 0 0.00 0.00 0 0
stringRequestTopic stringRequestConsumer 1 0.00 0.00 0 0
TRANS_CHECK_MAX_TIME_TOPIC 0 0.00 0 NO_CONSUMER
BenchmarkTest 0 0.00 0 NO_CONSUMER
string-topic string_consumer 106 0.00 0.00 0 0
string-topic string_consumer_newns 63 0.00 0.00 0 0
TBW102 0 0.00 0 NO_CONSUMER
rmq_sys_REVIVE_LOG_DefaultCluster 0 0.00 0 NO_CONSUMER
SELF_TEST_TOPIC 0 0.00 0 NO_CONSUMER
SCHEDULE_TOPIC_XXXX 0 0.00 0 NO_CONSUMER
DefaultCluster_REPLY_TOPIC 0 0.00 0 NO_CONSUMER
rmq_sys_SYNC_BROKER_MEMBER_yeqiang-MS-7B23 0 0.00 0 NO_CONSUMER
RMQ_SYS_TRANS_OP_HALF_TOPIC CID_RMQ_SYS_TRANS 0 0.00 0.00 0 0
TopicTest please_rename_unique_group_name 252 0.00 0.00 0 0
TopicTest please_rename_unique_group_name_4 0 0.00 0.00 0 0
localhost.localdomain 0 0.00 0 NO_CONSUMER
order-paid-topic order-paid-consumer 1 0.00 0.00 0 0
user-topic user_consumer 2 0.00 0.00 0 0
message-ext-topic rocketmq-consume-demo-message-ext-consumer 2 0.00 0.00 0 0
OFFSET_MOVED_EVENT 0 0.00 0 NO_CONSUMER
yeqiang-MS-7B23 0 0.00 0 NO_CONSUMER
DefaultCluster 0 0.00 0 NO_CONSUMER
spring-transaction-topic string_trans_consumer 15 0.00 0.00 0 0
bytesRequestTopic bytesRequestConsumer 0 0.00 0.00 0 0
topicStatus
yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicStatus -n localhost:9876 -t string-topic
#Broker Name #QID #Min Offset #Max Offset #Last Updated
yeqiang-MS-7B23 0 0 35 2023-08-25 16:21:35,786
yeqiang-MS-7B23 1 0 52 2023-08-25 14:55:57,152
yeqiang-MS-7B23 2 0 33 2023-08-25 16:21:35,646
yeqiang-MS-7B23 3 0 42 2023-08-25 14:55:57,172
yeqiang-MS-7B23 4 0 1 2023-08-25 16:21:34,355
yeqiang-MS-7B23 5 0 1 2023-08-25 14:55:57,105
yeqiang-MS-7B23 6 0 4 2023-08-25 16:23:01,489
yeqiang-MS-7B23 7 0 1 2023-08-25 16:21:36,186
Python 生产者:producer.py
from rocketmq.client import Producer, MessagegroupName = "my-group1"
nameserver = "127.0.0.0:9876"
topicName = "string-topic"
TAGS = "tag-my-group1"
KEYS = "key-my-group1-0"
# 初始化生产者,并设置生产组信息,组名称使用全称,例:rocketmq-xxx|namespace_python%group1
producer = Producer(groupName)
# 设置服务地址
producer.set_name_server_address(nameserver)
# 设置权限(角色名和密钥)
# producer.set_session_credentials(
# accessKey, # 角色密钥
# secretKey, # 角色名称
# ''
# )
# 启动生产者
producer.start()# 组装消息 topic名称,在控制台 topic 页面复制。
msg = Message(topicName)
# 设置keys
msg.set_keys(TAGS)
# 设置tags
msg.set_tags(KEYS)
# 消息内容
msg.set_body('This is a new message.')# 发送同步消息
ret = producer.send_sync(msg)
print(ret.status, ret.msg_id, ret.offset)
# 资源释放
producer.shutdown()
运行
yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ source /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/activate
(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/python /home/yeqiang/code/pythonproj/python-rocketmq-demo/producer.py
SendStatus.OK 7F0001012857767267388CFD61230000 35
(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$
mqadmin查询topic状态
yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicStatus -n localhost:9876 -t string-topic
#Broker Name #QID #Min Offset #Max Offset #Last Updated
yeqiang-MS-7B23 0 0 36 2023-08-28 09:03:35,722
yeqiang-MS-7B23 1 0 52 2023-08-25 14:55:57,152
yeqiang-MS-7B23 2 0 33 2023-08-25 16:21:35,646
yeqiang-MS-7B23 3 0 42 2023-08-25 14:55:57,172
yeqiang-MS-7B23 4 0 1 2023-08-25 16:21:34,355
yeqiang-MS-7B23 5 0 1 2023-08-25 14:55:57,105
yeqiang-MS-7B23 6 0 4 2023-08-25 16:23:01,489
yeqiang-MS-7B23 7 0 1 2023-08-25 16:21:36,186
yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicRoute -n localhost:9876 -t string-topic
{"brokerDatas":[{"brokerAddrs":{0:"10.47.76.67:10911"},"brokerName":"yeqiang-MS-7B23","cluster":"DefaultCluster","enableActingMaster":false}],"filterServerTable":{},"queueDatas":[{"brokerName":"yeqiang-MS-7B23","perm":6,"readQueueNums":8,"topicSysFlag":0,"writeQueueNums":8}]
}
图形工具rocketmq-dashborad
https://github.com/apache/rocketmq-dashboard
自行编译
mvn clean package -Dmaven.test.skip=true
启动
java -Drocketmq.namesrv.addr=127.0.0.1:9876 -jar target/rocketmq-dashboard-1.0.0.jar


(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/python /home/yeqiang/code/pythonproj/python-rocketmq-demo/producer.py
SendStatus.OK 7F0001012DF4226307248D16C3250000 36


consoumer.py
import time
from rocketmq.client import PushConsumer, ConsumeStatus
# 消息处理回调groupName = "my-group1"
nameserver = "127.0.0.0:9876"
topicName = "string-topic"
KEYS = "key-my-group1-0"def callback(msg):# 模拟业务print('Received message. messageId: ', msg.id, ' body: ', msg.body)# 消费成功回复CONSUME_SUCCESSreturn ConsumeStatus.CONSUME_SUCCESS# 消费成功回复消息状态# return ConsumeStatus.RECONSUME_LATER# 初始化消费者,并设置消费者组信息
consumer = PushConsumer(groupName)
# 设置服务地址
consumer.set_name_server_address(nameserver)
# 设置权限(角色名和密钥)
# consumer.set_session_credentials(
# accessKey, # 角色密钥
# secretKey, # 角色名称
# ''
# )
# 订阅topic
consumer.subscribe(topicName, callback, "*")
print(' [Consumer] Waiting for messages.')
# 启动消费者
consumer.start()while True:time.sleep(3600)
# 资源释放
consumer.shutdown()
启动python消费者
(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/python /home/yeqiang/code/pythonproj/python-rocketmq-demo/consumer.py[Consumer] Waiting for messages.
Received message. messageId: 7F0001012DF4226307248D16C3250000 body: b'This is a new message.'

可以看到my-group1已被消费
再启动一个consumer.py,产生一次消息

可以看到,只有一个consumer消费到了消息,说明默认情况下,消息非广播模式。
Java生产一个消息:
training: Java SpringBoot SpringCloud k8s等练习程序 - Gitee.com


python rocketmq依赖
Release rocketmq-client-cpp-2.1.0 · apache/rocketmq-client-cpp · GitHub
python完整程序
python-rocketmq-demo: python3 rocketmq5 的一个例子
相关文章:
RocketMQ mqadmin java springboot python 调用笔记
命令 mqadmin命令列表 yeqiangyeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin The most commonly used mqadmin commands are:updateTopic Update or create topicdeleteTopic Delete topic from broker and NameServer.…...
Java aspose 将HTML导出成Excel文件
1.需求 有一批表格的html文件,需要将这些表格导出成excel文件 2.代码 使用第三方库 aspose ByteArrayInputStream htmlIs new ByteArrayInputStream(htmlBuilder.toString().getBytes()); // 将html字符串构建成输入流 LoadOptions lo new LoadOptions(LoadFo…...
原生微信小程序 动态(横向,纵向)公告(广告)栏
先看一下动态效果 Y轴滚动公告的原理是swiper组件在页面中的Y轴滚动,属性vertical,其余属性也设置一下autoplay circular interval"3000" X轴滚动的原理是,利用动画效果,将内容从右往左过渡过去 wxml: &l…...
pandas和polars简单的对比分析
pandas pandas是基于python写的,底层的数据结构是Numpy数据(ndarray)。pandas自身有两个核心的数据结构:DataFrame和Series,前者是二维的表格数据结构,后者是一维标签化数组。 polars polars是用Rust(一种系统级编程…...
Feign远程调用的使用
假设已配好nacos服务:并且已配好userservice、orderservice,点击跳转 Feign是一个声明式的http客户端,官方地址:https://github.com/OpenFeign/feign,其作用就是在程序中帮助我们优雅的实现http请求的发送,…...
Postman API测试之道:不止于点击,更在于策略
引言:API测试的重要性 在当今的软件开发中,API已经成为了一个不可或缺的部分。它们是软件组件之间交互的桥梁,确保数据的流动和功能的实现。因此,对API的测试显得尤为重要,它不仅关乎功能的正确性,还涉及到…...
5G 数字乡村数字农业农村大数据中心项目农业大数据建设方案PPT
导读:原文《5G 数字乡村数字农业农村大数据中心项目农业大数据建设方案PPT》(获取来源见文尾),本文精选其中精华及架构部分,逻辑清晰、内容完整,为快速形成售前方案提供参考。以下是部分内容, 喜…...
Golang Gorm 一对多的添加
一对多的添加有两种情况: 一种是添加用户的时候同时创建文章其次是创建文章关联已经存在的用户。 package mainimport ("gorm.io/driver/mysql""gorm.io/gorm" )// User 用户表 一个用户拥有多篇文章 type User struct {ID int64Name …...
图像扭曲之锯齿
源码: void wave_sawtooth(cv::Mat& src,cv::Mat& dst,double amplitude,double wavelength) {dst.create(src.rows, src.cols, CV_8UC3);dst.setTo(0);double xAmplitude amplitude;double yAmplitude amplitude;int xWavelength wavelength;int yWave…...
【分布式技术专题】「OSS中间件系列」Minio的文件服务的存储模型及整合Java客户端访问的实战指南
Minio的元数据 数据存储 MinIO对象存储系统没有元数据数据库,所有的操作都是对象级别的粒度的,这种做法的优势是: 个别对象的失效,不会溢出为更大级别的系统失效。便于实现"强一致性"这个特性。此特性对于机器学习与大数据处理非…...
构建个人博客_Obsidian_github.io_hexo
1 初衷 很早就开始分享文档,以技术类的为主,一开始是 MSN,博客,随着平台的更替,后来又用了 CSDN,知乎,简书…… 再后来是 Obsidian,飞书,Notion,常常有以下困…...
烟花厂人员作业释放静电行为检测算法
烟花厂人员作业释放静电行为检测算法通过pythonyolo系列算法模型框架,烟花厂人员作业释放静电行为检测算法在工厂车间入口处能够及时捕捉到人员是否触摸静电释放仪。一旦检测到人员进入时没有触摸静电释放仪,系统将自动触发告警。Python是一种由Guido va…...
ARTS挑战第二周-T:PHP数组相关操作
array_combine() 函数 合并两个数组 array_combine()传入2个参数,使用方法如下 array_combine(array $keys, array $values): array 返回一个 array,用来自 keys 数组的值作为键名,来自 values 数组的值作为相应的值。 array_key_exists() 函…...
【如何对公司网络进行限速?一个案例详解】
有不少朋友问到了关于企业网络QoS配置,这个确实在实际网络应用中非常多,基本上大部分企业或个人都用到这个功能,本期我们详细了解下QoS如何对宽带进行限制,QoS如何企业中应用。 一、什么是QoS? Qos是用来解决网络延迟和阻塞等问…...
服务器安全-修改默认ssh端口
防火墙先打开指定端口,要不修改后连不上(端口需要在65535之内) firewall-cmd --list-ports firewall-cmd --add-port54111/tcp --permanent firewall-cmd --reload-------------------- 先让两个端口同时存在,等配置成功后关闭22端口 vim /etc/ssh/sshd_config重启sshd service…...
保护隐私的第一步:从更新浏览器开始
当今社会已经进入了数字化和网络化的时代,而网络安全问题也日益突显。随着互联网在我们生活中的不断渗透,网络威胁变得愈发普遍和隐蔽。在这样的背景下,网络浏览器作为人们访问互联网的主要工具之一,不仅为我们提供了便捷的上网方…...
Python爬虫框架之快速抓取互联网数据详解
概要 Python爬虫框架是一个能够帮助我们快速抓取互联网数据的工具。在互联网时代,信息爆炸式增长,人们越来越需要一种快速获取信息的方式。而Python爬虫框架就能够帮助我们完成这个任务,它可以帮助我们快速地从互联网上抓取各种数据…...
【算法专题突破】双指针 - 盛最多水的容器(4)
目录 1. 题目解析 2. 算法原理 3. 代码编写 写在最后: 1. 题目解析 题目链接:11. 盛最多水的容器 - 力扣(Leetcode) 这道题目也不难理解, 两边的柱子的盛水量是根据短的那边的柱子决定的, 而盛水量…...
循环神经网络(RNN) | 项目还不成熟 |还在初级阶段
一,定义 循环神经网络(Recurrent Neural Network,RNN)是一种深度学习神经网络架构,专门设计用于处理序列数据,如时间序列数据、自然语言文本等(一般用来解决序列问题)。 因为它们具…...
【Spring Boot】数据库持久层框架MyBatis — MyBatis简介
MyBatis简介 本节首先会介绍什么是ORM、什么是MyBatis、MyBatis的特点以及核心概念,最后介绍MyBatis是如何启动、如何加载配置文件的? 1.什么是ORM ORM(Object Relational Mapping,对象关系映射)是为了解决面向对象…...
第19节 Node.js Express 框架
Express 是一个为Node.js设计的web开发框架,它基于nodejs平台。 Express 简介 Express是一个简洁而灵活的node.js Web应用框架, 提供了一系列强大特性帮助你创建各种Web应用,和丰富的HTTP工具。 使用Express可以快速地搭建一个完整功能的网站。 Expre…...
MPNet:旋转机械轻量化故障诊断模型详解python代码复现
目录 一、问题背景与挑战 二、MPNet核心架构 2.1 多分支特征融合模块(MBFM) 2.2 残差注意力金字塔模块(RAPM) 2.2.1 空间金字塔注意力(SPA) 2.2.2 金字塔残差块(PRBlock) 2.3 分类器设计 三、关键技术突破 3.1 多尺度特征融合 3.2 轻量化设计策略 3.3 抗噪声…...
前端倒计时误差!
提示:记录工作中遇到的需求及解决办法 文章目录 前言一、误差从何而来?二、五大解决方案1. 动态校准法(基础版)2. Web Worker 计时3. 服务器时间同步4. Performance API 高精度计时5. 页面可见性API优化三、生产环境最佳实践四、终极解决方案架构前言 前几天听说公司某个项…...
鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个医院挂号小程序
一、开发准备 环境搭建: 安装DevEco Studio 3.0或更高版本配置HarmonyOS SDK申请开发者账号 项目创建: File > New > Create Project > Application (选择"Empty Ability") 二、核心功能实现 1. 医院科室展示 /…...
MMaDA: Multimodal Large Diffusion Language Models
CODE : https://github.com/Gen-Verse/MMaDA Abstract 我们介绍了一种新型的多模态扩散基础模型MMaDA,它被设计用于在文本推理、多模态理解和文本到图像生成等不同领域实现卓越的性能。该方法的特点是三个关键创新:(i) MMaDA采用统一的扩散架构…...
相机Camera日志分析之三十一:高通Camx HAL十种流程基础分析关键字汇总(后续持续更新中)
【关注我,后续持续新增专题博文,谢谢!!!】 上一篇我们讲了:有对最普通的场景进行各个日志注释讲解,但相机场景太多,日志差异也巨大。后面将展示各种场景下的日志。 通过notepad++打开场景下的日志,通过下列分类关键字搜索,即可清晰的分析不同场景的相机运行流程差异…...
MySQL用户和授权
开放MySQL白名单 可以通过iptables-save命令确认对应客户端ip是否可以访问MySQL服务: test: # iptables-save | grep 3306 -A mp_srv_whitelist -s 172.16.14.102/32 -p tcp -m tcp --dport 3306 -j ACCEPT -A mp_srv_whitelist -s 172.16.4.16/32 -p tcp -m tcp -…...
高效线程安全的单例模式:Python 中的懒加载与自定义初始化参数
高效线程安全的单例模式:Python 中的懒加载与自定义初始化参数 在软件开发中,单例模式(Singleton Pattern)是一种常见的设计模式,确保一个类仅有一个实例,并提供一个全局访问点。在多线程环境下,实现单例模式时需要注意线程安全问题,以防止多个线程同时创建实例,导致…...
在QWebEngineView上实现鼠标、触摸等事件捕获的解决方案
这个问题我看其他博主也写了,要么要会员、要么写的乱七八糟。这里我整理一下,把问题说清楚并且给出代码,拿去用就行,照着葫芦画瓢。 问题 在继承QWebEngineView后,重写mousePressEvent或event函数无法捕获鼠标按下事…...
三分算法与DeepSeek辅助证明是单峰函数
前置 单峰函数有唯一的最大值,最大值左侧的数值严格单调递增,最大值右侧的数值严格单调递减。 单谷函数有唯一的最小值,最小值左侧的数值严格单调递减,最小值右侧的数值严格单调递增。 三分的本质 三分和二分一样都是通过不断缩…...
