RocketMQ mqadmin java springboot python 调用笔记
命令
mqadmin命令列表
yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin
The most commonly used mqadmin commands are:updateTopic Update or create topicdeleteTopic Delete topic from broker and NameServer.updateSubGroup Update or create subscription groupsetConsumeMode Set consume message mode. pull/pop etc.deleteSubGroup Delete subscription group from broker.updateBrokerConfig Update broker's configupdateTopicPerm Update topic permtopicRoute Examine topic route infotopicStatus Examine topic Status infotopicClusterList Get cluster info for topicaddBroker Add a broker to specified containerremoveBroker Remove a broker from specified containerresetMasterFlushOffset Reset master flush offset in slavebrokerStatus Fetch broker runtime status dataqueryMsgById Query Message by IdqueryMsgByKey Query Message by KeyqueryMsgByUniqueKey Query Message by Unique keyqueryMsgByOffset Query Message by offsetqueryMsgTraceById Query a message traceprintMsg Print Message DetailprintMsgByQueue Print Message DetailsendMsgStatus Send msg to broker.brokerConsumeStats Fetch broker consume stats dataproducerConnection Query producer's socket connection and client versionconsumerConnection Query consumer's socket connection, client version and subscriptionconsumerProgress Query consumers's progress, speedconsumerStatus Query consumer's internal data structurecloneGroupOffset Clone offset from other group.producer Query producer's instances, connection, status, etc.clusterList List cluster infostopicList Fetch all topic list from name serverupdateKvConfig Create or update KV config.deleteKvConfig Delete KV config.wipeWritePerm Wipe write perm of broker in all name server you defined in the -n paramaddWritePerm Add write perm of broker in all name server you defined in the -n paramresetOffsetByTime Reset consumer offset by timestamp(without client restart).skipAccumulatedMessage Skip all messages that are accumulated (not consumed) currentlyupdateOrderConf Create or update or delete order confcleanExpiredCQ Clean expired ConsumeQueue on broker.deleteExpiredCommitLog Delete expired CommitLog filescleanUnusedTopic Clean unused topic on broker.startMonitoring Start MonitoringstatsAll Topic and Consumer tps statsallocateMQ Allocate MQcheckMsgSendRT Check message send response timeclusterRT List All clusters Message Send RTgetNamesrvConfig Get configs of name server.updateNamesrvConfig Update configs of name server.getBrokerConfig Get broker config by cluster or special brokergetConsumerConfig Get consumer config by subscription group namequeryCq Query cq command.sendMessage Send a messageconsumeMessage Consume messageupdateAclConfig Update acl config yaml file in brokerdeleteAclConfig Delete Acl Config Account in brokerclusterAclConfigVersion List all of acl config version information in clusterupdateGlobalWhiteAddr Update global white address for acl Config File in brokergetAclConfig List all of acl config information in clusterupdateStaticTopic Update or create static topic, which has fixed number of queuesremappingStaticTopic Update or create static topic, which has fixed number of queuesexportMetadata Export metadataexportConfigs Export configsexportMetrics Export metricshaStatus Fetch ha runtime status datagetSyncStateSet Fetch syncStateSet for target brokersgetBrokerEpoch Fetch broker epoch entriesgetControllerMetaData Get controller cluster's metadatagetControllerConfig Get controller config.updateControllerConfig Update controller config.electMaster Re-elect the specified broker as mastercleanBrokerMetadata Clean metadata of broker on controllerdumpCompactionLog parse compaction log to messagegetColdDataFlowCtrInfo get cold data flow ctr infoupdateColdDataFlowCtrGroupConfig addOrUpdate cold data flow ctr group configremoveColdDataFlowCtrGroupConfig remove consumer from cold ctr configsetCommitLogReadAheadMode set read ahead mode for all commitlog files
topicList
yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicList -n localhost:9876
%RETRY%please_rename_unique_group_name
RMQ_SYS_TRANS_HALF_TOPIC
stringRequestTopic
%RETRY%objectRequestConsumer
%RETRY%please_rename_unique_group_name_4
TRANS_CHECK_MAX_TIME_TOPIC
BenchmarkTest
%RETRY%genericRequestConsumer
string-topic
TBW102
rmq_sys_REVIVE_LOG_DefaultCluster
SELF_TEST_TOPIC
%RETRY%string_consumer_newns
SCHEDULE_TOPIC_XXXX
DefaultCluster_REPLY_TOPIC
rmq_sys_SYNC_BROKER_MEMBER_yeqiang-MS-7B23
RMQ_SYS_TRANS_OP_HALF_TOPIC
TopicTest
localhost.localdomain
order-paid-topic
%RETRY%my-group1
user-topic
%RETRY%string_trans_consumer
message-ext-topic
OFFSET_MOVED_EVENT
%RETRY%user_consumer
%RETRY%order-paid-consumer
yeqiang-MS-7B23
DefaultCluster
spring-transaction-topic
%RETRY%stringRequestConsumer
bytesRequestTopic
%RETRY%string_consumer
%RETRY%bytesRequestConsumer
%RETRY%rocketmq-consume-demo-message-ext-consumer
statsAll
yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin statsAll -n localhost:9876
#Topic #Consumer Group #Accumulation #InTPS #OutTPS #InMsg24Hour #OutMsg24Hour
RMQ_SYS_TRANS_HALF_TOPIC CID_RMQ_SYS_TRANS 0 0.00 0.00 0 0
stringRequestTopic stringRequestConsumer 1 0.00 0.00 0 0
TRANS_CHECK_MAX_TIME_TOPIC 0 0.00 0 NO_CONSUMER
BenchmarkTest 0 0.00 0 NO_CONSUMER
string-topic string_consumer 106 0.00 0.00 0 0
string-topic string_consumer_newns 63 0.00 0.00 0 0
TBW102 0 0.00 0 NO_CONSUMER
rmq_sys_REVIVE_LOG_DefaultCluster 0 0.00 0 NO_CONSUMER
SELF_TEST_TOPIC 0 0.00 0 NO_CONSUMER
SCHEDULE_TOPIC_XXXX 0 0.00 0 NO_CONSUMER
DefaultCluster_REPLY_TOPIC 0 0.00 0 NO_CONSUMER
rmq_sys_SYNC_BROKER_MEMBER_yeqiang-MS-7B23 0 0.00 0 NO_CONSUMER
RMQ_SYS_TRANS_OP_HALF_TOPIC CID_RMQ_SYS_TRANS 0 0.00 0.00 0 0
TopicTest please_rename_unique_group_name 252 0.00 0.00 0 0
TopicTest please_rename_unique_group_name_4 0 0.00 0.00 0 0
localhost.localdomain 0 0.00 0 NO_CONSUMER
order-paid-topic order-paid-consumer 1 0.00 0.00 0 0
user-topic user_consumer 2 0.00 0.00 0 0
message-ext-topic rocketmq-consume-demo-message-ext-consumer 2 0.00 0.00 0 0
OFFSET_MOVED_EVENT 0 0.00 0 NO_CONSUMER
yeqiang-MS-7B23 0 0.00 0 NO_CONSUMER
DefaultCluster 0 0.00 0 NO_CONSUMER
spring-transaction-topic string_trans_consumer 15 0.00 0.00 0 0
bytesRequestTopic bytesRequestConsumer 0 0.00 0.00 0 0
topicStatus
yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicStatus -n localhost:9876 -t string-topic
#Broker Name #QID #Min Offset #Max Offset #Last Updated
yeqiang-MS-7B23 0 0 35 2023-08-25 16:21:35,786
yeqiang-MS-7B23 1 0 52 2023-08-25 14:55:57,152
yeqiang-MS-7B23 2 0 33 2023-08-25 16:21:35,646
yeqiang-MS-7B23 3 0 42 2023-08-25 14:55:57,172
yeqiang-MS-7B23 4 0 1 2023-08-25 16:21:34,355
yeqiang-MS-7B23 5 0 1 2023-08-25 14:55:57,105
yeqiang-MS-7B23 6 0 4 2023-08-25 16:23:01,489
yeqiang-MS-7B23 7 0 1 2023-08-25 16:21:36,186
Python 生产者:producer.py
from rocketmq.client import Producer, MessagegroupName = "my-group1"
nameserver = "127.0.0.0:9876"
topicName = "string-topic"
TAGS = "tag-my-group1"
KEYS = "key-my-group1-0"
# 初始化生产者,并设置生产组信息,组名称使用全称,例:rocketmq-xxx|namespace_python%group1
producer = Producer(groupName)
# 设置服务地址
producer.set_name_server_address(nameserver)
# 设置权限(角色名和密钥)
# producer.set_session_credentials(
# accessKey, # 角色密钥
# secretKey, # 角色名称
# ''
# )
# 启动生产者
producer.start()# 组装消息 topic名称,在控制台 topic 页面复制。
msg = Message(topicName)
# 设置keys
msg.set_keys(TAGS)
# 设置tags
msg.set_tags(KEYS)
# 消息内容
msg.set_body('This is a new message.')# 发送同步消息
ret = producer.send_sync(msg)
print(ret.status, ret.msg_id, ret.offset)
# 资源释放
producer.shutdown()
运行
yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ source /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/activate
(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/python /home/yeqiang/code/pythonproj/python-rocketmq-demo/producer.py
SendStatus.OK 7F0001012857767267388CFD61230000 35
(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$
mqadmin查询topic状态
yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicStatus -n localhost:9876 -t string-topic
#Broker Name #QID #Min Offset #Max Offset #Last Updated
yeqiang-MS-7B23 0 0 36 2023-08-28 09:03:35,722
yeqiang-MS-7B23 1 0 52 2023-08-25 14:55:57,152
yeqiang-MS-7B23 2 0 33 2023-08-25 16:21:35,646
yeqiang-MS-7B23 3 0 42 2023-08-25 14:55:57,172
yeqiang-MS-7B23 4 0 1 2023-08-25 16:21:34,355
yeqiang-MS-7B23 5 0 1 2023-08-25 14:55:57,105
yeqiang-MS-7B23 6 0 4 2023-08-25 16:23:01,489
yeqiang-MS-7B23 7 0 1 2023-08-25 16:21:36,186
yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicRoute -n localhost:9876 -t string-topic
{"brokerDatas":[{"brokerAddrs":{0:"10.47.76.67:10911"},"brokerName":"yeqiang-MS-7B23","cluster":"DefaultCluster","enableActingMaster":false}],"filterServerTable":{},"queueDatas":[{"brokerName":"yeqiang-MS-7B23","perm":6,"readQueueNums":8,"topicSysFlag":0,"writeQueueNums":8}]
}
图形工具rocketmq-dashborad
https://github.com/apache/rocketmq-dashboard
自行编译
mvn clean package -Dmaven.test.skip=true
启动
java -Drocketmq.namesrv.addr=127.0.0.1:9876 -jar target/rocketmq-dashboard-1.0.0.jar
(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/python /home/yeqiang/code/pythonproj/python-rocketmq-demo/producer.py
SendStatus.OK 7F0001012DF4226307248D16C3250000 36
consoumer.py
import time
from rocketmq.client import PushConsumer, ConsumeStatus
# 消息处理回调groupName = "my-group1"
nameserver = "127.0.0.0:9876"
topicName = "string-topic"
KEYS = "key-my-group1-0"def callback(msg):# 模拟业务print('Received message. messageId: ', msg.id, ' body: ', msg.body)# 消费成功回复CONSUME_SUCCESSreturn ConsumeStatus.CONSUME_SUCCESS# 消费成功回复消息状态# return ConsumeStatus.RECONSUME_LATER# 初始化消费者,并设置消费者组信息
consumer = PushConsumer(groupName)
# 设置服务地址
consumer.set_name_server_address(nameserver)
# 设置权限(角色名和密钥)
# consumer.set_session_credentials(
# accessKey, # 角色密钥
# secretKey, # 角色名称
# ''
# )
# 订阅topic
consumer.subscribe(topicName, callback, "*")
print(' [Consumer] Waiting for messages.')
# 启动消费者
consumer.start()while True:time.sleep(3600)
# 资源释放
consumer.shutdown()
启动python消费者
(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/python /home/yeqiang/code/pythonproj/python-rocketmq-demo/consumer.py[Consumer] Waiting for messages.
Received message. messageId: 7F0001012DF4226307248D16C3250000 body: b'This is a new message.'
可以看到my-group1已被消费
再启动一个consumer.py,产生一次消息
可以看到,只有一个consumer消费到了消息,说明默认情况下,消息非广播模式。
Java生产一个消息:
training: Java SpringBoot SpringCloud k8s等练习程序 - Gitee.com
python rocketmq依赖
Release rocketmq-client-cpp-2.1.0 · apache/rocketmq-client-cpp · GitHub
python完整程序
python-rocketmq-demo: python3 rocketmq5 的一个例子
相关文章:

RocketMQ mqadmin java springboot python 调用笔记
命令 mqadmin命令列表 yeqiangyeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin The most commonly used mqadmin commands are:updateTopic Update or create topicdeleteTopic Delete topic from broker and NameServer.…...

Java aspose 将HTML导出成Excel文件
1.需求 有一批表格的html文件,需要将这些表格导出成excel文件 2.代码 使用第三方库 aspose ByteArrayInputStream htmlIs new ByteArrayInputStream(htmlBuilder.toString().getBytes()); // 将html字符串构建成输入流 LoadOptions lo new LoadOptions(LoadFo…...

原生微信小程序 动态(横向,纵向)公告(广告)栏
先看一下动态效果 Y轴滚动公告的原理是swiper组件在页面中的Y轴滚动,属性vertical,其余属性也设置一下autoplay circular interval"3000" X轴滚动的原理是,利用动画效果,将内容从右往左过渡过去 wxml: &l…...

pandas和polars简单的对比分析
pandas pandas是基于python写的,底层的数据结构是Numpy数据(ndarray)。pandas自身有两个核心的数据结构:DataFrame和Series,前者是二维的表格数据结构,后者是一维标签化数组。 polars polars是用Rust(一种系统级编程…...

Feign远程调用的使用
假设已配好nacos服务:并且已配好userservice、orderservice,点击跳转 Feign是一个声明式的http客户端,官方地址:https://github.com/OpenFeign/feign,其作用就是在程序中帮助我们优雅的实现http请求的发送,…...

Postman API测试之道:不止于点击,更在于策略
引言:API测试的重要性 在当今的软件开发中,API已经成为了一个不可或缺的部分。它们是软件组件之间交互的桥梁,确保数据的流动和功能的实现。因此,对API的测试显得尤为重要,它不仅关乎功能的正确性,还涉及到…...

5G 数字乡村数字农业农村大数据中心项目农业大数据建设方案PPT
导读:原文《5G 数字乡村数字农业农村大数据中心项目农业大数据建设方案PPT》(获取来源见文尾),本文精选其中精华及架构部分,逻辑清晰、内容完整,为快速形成售前方案提供参考。以下是部分内容, 喜…...

Golang Gorm 一对多的添加
一对多的添加有两种情况: 一种是添加用户的时候同时创建文章其次是创建文章关联已经存在的用户。 package mainimport ("gorm.io/driver/mysql""gorm.io/gorm" )// User 用户表 一个用户拥有多篇文章 type User struct {ID int64Name …...

图像扭曲之锯齿
源码: void wave_sawtooth(cv::Mat& src,cv::Mat& dst,double amplitude,double wavelength) {dst.create(src.rows, src.cols, CV_8UC3);dst.setTo(0);double xAmplitude amplitude;double yAmplitude amplitude;int xWavelength wavelength;int yWave…...

【分布式技术专题】「OSS中间件系列」Minio的文件服务的存储模型及整合Java客户端访问的实战指南
Minio的元数据 数据存储 MinIO对象存储系统没有元数据数据库,所有的操作都是对象级别的粒度的,这种做法的优势是: 个别对象的失效,不会溢出为更大级别的系统失效。便于实现"强一致性"这个特性。此特性对于机器学习与大数据处理非…...

构建个人博客_Obsidian_github.io_hexo
1 初衷 很早就开始分享文档,以技术类的为主,一开始是 MSN,博客,随着平台的更替,后来又用了 CSDN,知乎,简书…… 再后来是 Obsidian,飞书,Notion,常常有以下困…...

烟花厂人员作业释放静电行为检测算法
烟花厂人员作业释放静电行为检测算法通过pythonyolo系列算法模型框架,烟花厂人员作业释放静电行为检测算法在工厂车间入口处能够及时捕捉到人员是否触摸静电释放仪。一旦检测到人员进入时没有触摸静电释放仪,系统将自动触发告警。Python是一种由Guido va…...

ARTS挑战第二周-T:PHP数组相关操作
array_combine() 函数 合并两个数组 array_combine()传入2个参数,使用方法如下 array_combine(array $keys, array $values): array 返回一个 array,用来自 keys 数组的值作为键名,来自 values 数组的值作为相应的值。 array_key_exists() 函…...

【如何对公司网络进行限速?一个案例详解】
有不少朋友问到了关于企业网络QoS配置,这个确实在实际网络应用中非常多,基本上大部分企业或个人都用到这个功能,本期我们详细了解下QoS如何对宽带进行限制,QoS如何企业中应用。 一、什么是QoS? Qos是用来解决网络延迟和阻塞等问…...

服务器安全-修改默认ssh端口
防火墙先打开指定端口,要不修改后连不上(端口需要在65535之内) firewall-cmd --list-ports firewall-cmd --add-port54111/tcp --permanent firewall-cmd --reload-------------------- 先让两个端口同时存在,等配置成功后关闭22端口 vim /etc/ssh/sshd_config重启sshd service…...

保护隐私的第一步:从更新浏览器开始
当今社会已经进入了数字化和网络化的时代,而网络安全问题也日益突显。随着互联网在我们生活中的不断渗透,网络威胁变得愈发普遍和隐蔽。在这样的背景下,网络浏览器作为人们访问互联网的主要工具之一,不仅为我们提供了便捷的上网方…...

Python爬虫框架之快速抓取互联网数据详解
概要 Python爬虫框架是一个能够帮助我们快速抓取互联网数据的工具。在互联网时代,信息爆炸式增长,人们越来越需要一种快速获取信息的方式。而Python爬虫框架就能够帮助我们完成这个任务,它可以帮助我们快速地从互联网上抓取各种数据…...

【算法专题突破】双指针 - 盛最多水的容器(4)
目录 1. 题目解析 2. 算法原理 3. 代码编写 写在最后: 1. 题目解析 题目链接:11. 盛最多水的容器 - 力扣(Leetcode) 这道题目也不难理解, 两边的柱子的盛水量是根据短的那边的柱子决定的, 而盛水量…...

循环神经网络(RNN) | 项目还不成熟 |还在初级阶段
一,定义 循环神经网络(Recurrent Neural Network,RNN)是一种深度学习神经网络架构,专门设计用于处理序列数据,如时间序列数据、自然语言文本等(一般用来解决序列问题)。 因为它们具…...

【Spring Boot】数据库持久层框架MyBatis — MyBatis简介
MyBatis简介 本节首先会介绍什么是ORM、什么是MyBatis、MyBatis的特点以及核心概念,最后介绍MyBatis是如何启动、如何加载配置文件的? 1.什么是ORM ORM(Object Relational Mapping,对象关系映射)是为了解决面向对象…...

K8S Nginx Ingress实现金丝雀发布
通过给 Ingress 资源指定 Nginx Ingress 所支持的 annotation 可实现金丝雀发布。 需给服务创建2个 Ingress,其中1个常规 Ingress,另1个为带 nginx.ingress.kubernetes.io/canary: "true" 固定的 annotation 的 Ingress,称为 Cana…...

【C++入门】new和delete(C/C++内存管理)
目录 1.C/C内存分布2.C语言中动态内存管理方式3.C内存管理方式3.1new/delete操作内置类型3.2new和delete操作自定义类型 4.operator new与operator delete函数5.new和delete的实现原理5.1内置类型5.2自定义类型 6.malloc/free和new/delete的区别7.定位new表达式(了解…...

C++设计模式之桥接模式
文章目录 一、桥接模式二、std::error_code与设计模式(桥接模式)参考 一、桥接模式 在C中,桥接模式通常涉及以下几个角色: 抽象类接口(Abstraction):定义抽象部分的接口,并维护一个…...

前端速查速记系列----评论列表
小程序评论列表 效果图 wxml代码 <view id"econtent"><block wx:for"{{commentlist}}" wx:for-item"item" wx:for-index"index" wx:key"{{item.id}}"><view class"box1"><view class"…...

hiredis的安装与使用
hiredis的介绍 Hiredis 是一个用于 C 语言的轻量级、高性能的 Redis 客户端库。它提供了一组简单易用的 API,用于与 Redis 数据库进行交互。Hiredis 支持 Redis 的所有主要功能,包括字符串、哈希、列表、集合、有序集合等数据结构的读写操作,…...

【InsCode】InsCode打造的JavaSE与Linux命令互融的伪Linux文件系统小项目
🧑💻作者名称:DaenCode 🎤作者简介:啥技术都喜欢捣鼓捣鼓,喜欢分享技术、经验、生活。 😎人生感悟:尝尽人生百味,方知世间冷暖。 📖所属专栏:Ja…...

“深入解析JVM:探索Java虚拟机的内部机制“
标题:深入解析JVM:探索Java虚拟机的内部机制 摘要:本文将深入探索Java虚拟机(JVM)的内部机制,包括JVM的基本结构、内存管理、垃圾回收机制和即时编译器等。通过对JVM内部机制的详细解析,我们可…...

内网远程控制总结
前言 在内网渗透过程中,会碰到远程控制soft或者其他,这里针对远程控制软件做如下总结。 远程控制软件 向日葵篇 向日葵查看版本 向日葵(可以攻击) 针对向日葵的话其实如果有本地安装的话,是有可能存在漏洞的。这…...

Excel显示此值与此单元格定义的数据验证限制不匹配怎么办?
总结:1、在编辑excel文档的时候,弹出此时预测单元格定义的数据验证,限制不匹配的提示。2、这是我们点击菜单来的数据菜单。3、然后点击数据工具栏的数据验证下拉按钮。4、在弹出的菜单中选择数据验证的菜单项。5、然后在打开的窗口中点击左下…...

mysql(八)事务隔离级别及加锁流程详解
目录 MySQL 锁简介什么是锁锁的作用锁的种类共享排他锁共享锁排它锁 粒度锁全局锁表级锁页级锁行级锁种类 意向锁间隙临键记录锁记录锁间隙锁 加锁的流程锁的内存结构加锁的基本流程根据主键加锁根据二级索引加锁根据非索引字段查询加锁加锁规律 锁信息查看查看锁的sql语句 数据…...