当前位置: 首页 > news >正文

RocketMQ 5.0 学习笔记

1. 需求

背景:业务需要,平台将使用rocketMQ来实现消息的发送与消费,替代redis的消息功能。

需要在搭建好rocketMQ平台后,进行研究和验证。

技术:Springboot + RocketMQ5.0

使用场景:签到活动,给用户推送消息,日志上报等

2. 笔记

2.1 安装RocketMQ 5.0

2.1.1 下载

官网:https://rocketmq.apache.org/zh/docs/quickStart/01quickstart/

请注意下载二进制包,二进制包是已经编译完成后可以直接运行的,源码包是需要编译后运行的。

二进制包:https://dist.apache.org/repos/dist/release/rocketmq/5.1.0/rocketmq-all-5.1.0-bin-release.zip

2.1.2 修改参数

解压文件到 D:\rocketmq-5.0,就相当于安装好了
默认的java运行内存很大,这里要修改一下内存配置:
进入bin目录,修改runbroker.sh文件和runserver.sh(如果是windows系统,修改runbroker.cmd文件runserver.cmd

原本是4g,4g,2g的配置,我这里修改为了256m,256m,256m的配置,两个文件都是修改成这样就差不多了。
linux: JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
windows: set "JAVA_OPT=%JAVA_OPT% -server -Xms256m -Xmx256m"

修改保存后,就是启动了。
在这里插入图片描述

2.2 启动

启动可以按照官网的quick start启动,如下:

2.2.1 本地windows启动
  1. 启动NameServer
start mqnamesrv.cmd

在这里插入图片描述

  1. 启动broker
start mqbroker -n 127.0.0.1:9876 autoCreateTopicEnable=true

在这里插入图片描述

  1. 启动proxy
start mqbroker.cmd -n 127.0.0.1:9876

在这里插入图片描述

2.2.2 Linux启动

分别是在解压后的rocketMQ文件夹下执行如下命令:

  1. 启动mqnamesrv请在rocketMQ解压后的文件夹中的bin目录同级使用下面这些命令
### 启动namesrv
$ nohup sh bin/mqnamesrv &### 验证namesrv是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
  1. 再启动Broker + proxy
### 先启动broker
$ nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &### 验证broker是否启动成功, 比如, broker的ip是192.168.1.2 然后名字是broker-a
$ tail -f ~/logs/rocketmqlogs/broker_default.log 
The broker[broker-a,192.169.1.2:10911] boot success...

可以使用jps查看或者使用如下命令查看日志文件:

tail -f ~/logs/rocketmqlogs/broker.log
  1. 如果用jps可查看启动的服务

OK,启动完成

2.3 RocketMQ Dashborad

参见git官网 https://gitcode.net/mirrors/apache/rocketmq-dashboard/

  1. 本地启动rocketmq-dashboard项目

将项目拉到本地后,idea打开,修改配置文件application.yml

rocketmq:config:# if this value is empty,use env value rocketmq.config.namesrvAddr  NAMESRV_ADDR | now, default localhost:9876# configure multiple namesrv addresses to manage multiple different clustersnamesrvAddrs:- 127.0.0.1:9876  #修改namesrv的地址- 127.0.0.2:9876

启动:

windos:启动项目,idea -> run application

linux:先用maven打成jar包,然后 java-jar 启动

2.4 rocketmq + springboot

项目结构:
在这里插入图片描述

  1. 导入依赖
    <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>5.0.4</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency></dependencies>
  1. application.properties
server.port=8080#自定义proxy地址
rocketmq.proxy = 127.0.0.1:8081
  1. 配置生产者
package com.example.config;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Slf4j
@Configuration
public class RocketConfig {// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。@Value("${rocketmq.proxy}")private String mqProxy;@Bean(name="MyProducer")public Producer createProducer(){ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(mqProxy);ClientConfiguration configuration = builder.build();// 初始化Producer时需要设置通信配置以及预绑定的Topic。try {log.info("初始化rocketmq5.0生产者: proxy:{}",mqProxy);Producer producer = provider.newProducerBuilder().setClientConfiguration(configuration).build();log.info("初始化rocketmq5.0生产者成功: proxy:{}", mqProxy);return producer;} catch (ClientException e) {log.info("初始化rocketmq5.0生产者失败:{}", e);}return null;}
}
  1. 消费者
package com.example.service;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets;
import java.util.Collections;@Slf4j
@Component
public class RocketConsumer {@Value("${rocketmq.proxy}")private String mqProxy;// 为消费者指定所属的消费者分组,Group需要提前创建。private static final String My_Consumer_Group = "myConsumerGroup1";// 指定需要订阅哪个目标Topic,Topic需要提前创建。private static final String My_Topic = "myTopicTest1";@Bean(name = "MyConsumer")public void mqConsumer(){ClientServiceProvider provider = ClientServiceProvider.loadService();// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(mqProxy).build();// 订阅消息的过滤规则,表示订阅所有Tag的消息。String tag = "*";FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);// 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。try {log.info("构建消费者:proxy: {}, consumer_group: {}, topic: {}", mqProxy, My_Consumer_Group, My_Topic);provider.newPushConsumerBuilder().setClientConfiguration(clientConfiguration).setConsumerGroup(My_Consumer_Group)// 设置预绑定的订阅关系。.setSubscriptionExpressions(Collections.singletonMap(My_Topic, filterExpression))// 设置消费监听器。.setMessageListener(messageView -> {// 处理消息并返回消费结果。log.info("消费消息:{}", messageView);log.info("消息内容:messageId={}, messageBody={}", messageView.getMessageId(),StandardCharsets.UTF_8.decode(messageView.getBody()).toString());return ConsumeResult.SUCCESS;}).build();log.info("构建消费者成功:proxy: {}, consumer_group: {}, topic: {}", mqProxy, My_Consumer_Group, My_Topic);} catch (ClientException e) {log.info("构建消费者异常:proxy: {}, consumer_group: {}, topic: {}, Excepiton:", mqProxy, My_Consumer_Group, My_Topic, e);}}
}
  1. 接口测试
package com.example.controller;import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.message.MessageBuilder;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.java.message.MessageBuilderImpl;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;@RestController
public class TestMqController {@Resource(name = "MyProducer")private Producer producer;@GetMapping("/sendMessage")public String sendMessage() throws ClientException {MessageBuilder messageBuilder = new MessageBuilderImpl();String msgStr = "a test message for rocketmq5.0 ...";Message message = messageBuilder.setTopic("myTopicTest1").setBody(msgStr.getBytes(StandardCharsets.UTF_8)).build();SendReceipt send = producer.send(message);return "success";}
}
  1. 启动服务

在这里插入图片描述

  1. 调接口
http://127.0.0.1:8080/sendMessage
  1. 后台日志
2023-03-07 19:19:15.915  INFO 21508 --- [onsumption-1-34] com.example.service.RocketConsumer       : 消费消息:MessageViewImpl{messageId=01A87EEA967B8354040418B7B300000000, topic=myTopicTest1, bornHost=DESKTOP-RNCSLDE, bornTimestamp=1678187955816, endpoints=ipv4:127.0.0.1:8081, deliveryAttempt=1, tag=null, keys=[], messageGroup=null, deliveryTimestamp=null, properties={}}
2023-03-07 19:19:15.915  INFO 21508 --- [onsumption-1-34] com.example.service.RocketConsumer       : 消息内容:messageId=01A87EEA967B8354040418B7B300000000, messageBody=a test message for rocketmq5.0 ...
  1. dashborad

本地启动rocketmq dashborad, 修改服务启动端口 server.port: 8088

访问面板:http://127.0.0.1:8088/#/ (我调用了4次接口)

在这里插入图片描述
在这里插入图片描述

3. 思考

  1. 消费异常如何处理?
    • 打印日志,记录msgId, body等
    • 重试机制是怎样的
  2. 之前的消息是存在redis,消息如何从redis平滑迁移到rocketmq

代码参考文章:RocketMQ 5.0 实战

相关文章:

RocketMQ 5.0 学习笔记

1. 需求 背景&#xff1a;业务需要&#xff0c;平台将使用rocketMQ来实现消息的发送与消费&#xff0c;替代redis的消息功能。 需要在搭建好rocketMQ平台后&#xff0c;进行研究和验证。 技术&#xff1a;Springboot RocketMQ5.0 使用场景&#xff1a;签到活动&#xff0c…...

796.子矩阵的和

输入一个 n行 m列的整数矩阵&#xff0c;再输入 q个询问&#xff0c;每个询问包含四个整数 x1,y1,x2,y2&#xff0c;表示一个子矩阵的左上角坐标和右下角坐标。 对于每个询问输出子矩阵中所有数的和。 输入格式 第一行包含三个整数 n&#xff0c;m&#xff0c;q。 接下来 n…...

【PySide6】信号(signal)和槽函数(slot),以及事件过滤器

说明 在PYQT中&#xff0c;父控件可以通过两种方式响应子控件的事件&#xff1a; 通过信号(signal)和槽函数(slot)机制连接子控件和父控件父控件可以通过设置eventFilter()方法来监听响应子控件的事件 一、信号(signal)和槽函数(slot) 示例 在PYQT中&#xff0c;每个组件都…...

canal admin管理端配置(二)

下载安装 下载地址&#xff1a; 下载解压即可 配置 修改canal.admin-1.1.5\conf\application.yml server:port: 8089 #端口根据是否冲突修改 spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT8spring.datasource:address: 192.0.16.12:3306#数据库ip和端口…...

Servlet 生命周期

Servlet的生命周期有四个阶段&#xff1a;加载并实例化、初始化、请求处理、销毁。主要涉及到的方法有init、service、doGet、doPost、destory等 加载并实例化 Servlet容器负责加载和实例化Servelt。当Servlet容器启动时&#xff0c;或者在容器检测到需要这个Servlet来响应第一…...

redis集群模式登陆

总结redis单机模式时&#xff0c;登陆redis的命令格式&#xff1a; ./redis-cli -h 地址 -p 端口redis集群模式时&#xff0c;登陆redis的命令格式&#xff1a; ./redis-cli -h 地址 -p 端口 -c举例1&#xff1a;redis单机模式下登陆rootubuntu:/usr/local/redis/redis-7.0.0/b…...

04-useMemo 、React.memo、useCallback

useMemo 、React.memo、useCallback 一、useMemo 基本用法 缓存数据&#xff0c;模拟 Vue 中的计算属性。 同样useMemo跟vue中component一样&#xff0c;也是有缓存的&#xff0c;会将结果缓存下来 import React, { useMemo, useState } from react;export default functio…...

windows下安装emqx Unable to load emulator DLL@if ===/ SET data_dir=“

1.报错内容 I:\0-software\02-emqx\emqx-5.0.19-windows-amd64\bin>emqx start Unable to load emulator DLL (I:\0-software\02-emqx\emqx-5.0.19-windows-amd64\erts-12.3.2.9\bin\beam.smp.dll) 此时不应有 SET。 I:\0-software\02-emqx\emqx-5.0.19-windows-amd64\bin&…...

Redis常见问题(未完待续)

Redis常见问题Redis为什么快 &#xff1f;Redis为什么快 &#xff1f; 根据官方数据&#xff0c;Redis 的 QPS 可以达到约 100000&#xff08;每秒请求数&#xff09;&#xff1b; 基于内存 对于磁盘数据库来说&#xff0c;首先要将数据通过 IO 操作读取到内存里再读取&#x…...

2024秋招BAT核心算法 | 详解图论

图论入门与最短路径算法 图的基本概念 由节点和边组成的集合 图的一些概念&#xff1a; ①有向边&#xff08;有向图&#xff09;&#xff0c;无向边&#xff08;无向图&#xff09;&#xff0c;权值 ②节点&#xff08;度&#xff09;&#xff0c;对应无向图&#xff0c;…...

凝聚共识,锚定未来 | 第四届OpenI/O 启智开发者大会NLP大模型论坛成功举办!

2023年2月24日下午&#xff0c;第四届OpenI/O启智开发者大会NLP大模型分论坛在深圳人才研修院隆重举办。该论坛以“开源集智创新探索中文NLP大模型生态发展”为主题&#xff0c;众多业内人士和研发者在此共享NLP领域的前沿动态和研发经验&#xff0c;畅想中国NLP领域的发展前景…...

99.【Git】

Git(一)、什么是版本控制1.什么是版本控制2、常见的版本控制工具(二)、版本控制分类1、本地版本控制2、集中版本控制 SVN3、分布式版本控制 Git(三)、Git与SVN的主要区别1、Git历史(四)、Git下载与环境配置1.git下载2、启动Git(五)、常用的Linux命令1.Linux常用命令(六)、Git必…...

Linux驱动交叉编译把驱动文件放入开发板,以及printk函数打印级别

上一篇介绍了一个最简单的驱动程序和驱动程序大体结构&#xff0c;但那还是用本地编译只能在Ubuntu上运行&#xff0c;我们该怎么编译一个能加载到开发板上呢&#xff0c;就需要交叉编译&#xff0c;交叉编译通常都是在嵌入式开发中使用到的。 交叉编译 理解交叉编译前先了解…...

力扣(LeetCode)433. 最小基因变化(2023.03.07)

基因序列可以表示为一条由 8 个字符组成的字符串&#xff0c;其中每个字符都是 ‘A’、‘C’、‘G’ 和 ‘T’ 之一。 假设我们需要调查从基因序列 start 变为 end 所发生的基因变化。一次基因变化就意味着这个基因序列中的一个字符发生了变化。 例如&#xff0c;“AACCGGTT”…...

网络基础(2)

目录1. 端口号2. 套接字socket3. 网络通信3.1 sockaddr与sockaddr_in3.2 接口服务端3.2.1 创建套接字&#xff0c;打开网络文件3.2.2 给该服务器绑定端口和ip&#xff08;特殊处理&#xff09;3.2.3 初始化相关服务器3.2.4 提供服务客户端3.2.5 绑定3.2.6 使用服务4. makefile实…...

掌握Spring Cloud Gateway:构建高性能API网关的原理和实践

Spring Cloud Gateway 是一个基于 Spring Boot 的 API 网关&#xff0c;用于构建微服务架构中的网关服务。它提供了统一的路由、请求转发、过滤器、负载均衡、熔断等功能&#xff0c;帮助开发者更好地管理和控制微服务系统的请求流量。 本文将介绍 Spring Cloud Gateway 的原理…...

NAST概述

一、NATS介绍 NATS是由CloudFoundry的架构师Derek开发的一个开源的、轻量级、高性能的&#xff0c;支持发布、订阅机制的分布式消息队列系统。它的核心基于EventMachine开发&#xff0c;代码量不多&#xff0c;可以下载下来慢慢研究。 不同于Java社区的kafka&#xff0c;nats…...

【JS知识点】——原型和原型链

文章目录原型和原型链构造函数原型显式原型&#xff08;prototype&#xff09;隐式原型&#xff08;\_\_proto\_\_&#xff09;原型链总结原型和原型链 在js中&#xff0c;原型和原型链是一个非常重要的知识点&#xff0c;只有理解原型和原型链&#xff0c;才能深刻理解JS。在…...

c盘怎么清理到最干净?有什么好的清理方法

c盘怎么清理到最干净?有什么好的清理方法&#xff1f;清理C盘空间是电脑维护的重要步骤之一。C盘是Windows操作系统的核心部分&#xff0c;保存了许多重要的系统文件&#xff0c;因此空间不足会影响计算机的性能和稳定性。下面是一些清理C盘空间的方法 一.清理临时文件 在使用…...

day26_HTML

今日内容 上课同步视频:CuteN饕餮的个人空间_哔哩哔哩_bilibili 同步笔记沐沐霸的博客_CSDN博客-Java2301 零、 复习昨日 一、二阶段介绍 二、HTML 零、 复习昨日 见代码 一、二阶段介绍 第一阶段: 基础入门 java基本语法编程基础(方法,数组)面向对象编程常用类高级(IO,线程,新…...

2024具身智能技术全景解析:从人形机器人到AGI的硬件与算法协同进化

1. 具身智能&#xff1a;当机器人学会"思考"和"行动" 想象一下&#xff0c;你家的扫地机器人不仅能自动规划路线清洁地板&#xff0c;还能在你做饭时递调料瓶、在你工作疲惫时泡一杯咖啡——这不是科幻电影&#xff0c;而是具身智能技术正在实现的场景。具…...

6种专业计时模式:让OBS直播时间管理变得如此简单

6种专业计时模式&#xff1a;让OBS直播时间管理变得如此简单 【免费下载链接】obs-advanced-timer 项目地址: https://gitcode.com/gh_mirrors/ob/obs-advanced-timer 想让你的直播画面看起来更加专业吗&#xff1f;OBS高级计时器正是你需要的秘密武器&#xff01;这款…...

从移位相加到硬件实现:FPGA二进制乘法器的设计精髓

1. 从纸笔计算到硬件逻辑&#xff1a;二进制乘法的本质 记得第一次学二进制乘法时&#xff0c;我拿着铅笔在纸上画了半天移位相加的步骤。比如计算11011011&#xff0c;就像小学生列竖式一样&#xff0c;先写下110111101&#xff0c;然后11011左移一位变成11010&#xff0c;接着…...

MMC级联H桥仿真图解析:电压电流双闭环控制策略研究

MMC&#xff0c;级联H桥仿真图&#xff0c;电压电流双闭环。最近在搞MMC&#xff08;模块化多电平换流器&#xff09;的仿真&#xff0c;发现这玩意儿真是电力电子界的乐高——全靠子模块堆叠。特别是级联H桥的结构&#xff0c;玩电压合成比搭积木刺激多了。今天咱们就着电压电…...

AI 辅助开发实战:基于低代码与智能生成的五金店管理系统毕设架构设计

最近在帮学弟学妹们看毕业设计&#xff0c;发现“五金店管理系统”是个高频选题。但很多人做着做着就陷入了“增删改查”的泥潭&#xff0c;前端界面简陋&#xff0c;业务逻辑也写得七零八落&#xff0c;最后答辩时演示效果平平&#xff0c;技术深度更是无从谈起。这让我开始思…...

实战演练,用快马生成GitHub团队协作项目,掌握Issue管理和CI/CD集成

最近在团队协作开发时&#xff0c;发现很多新成员对GitHub的完整工作流不太熟悉。于是我用InsCode(快马)平台快速搭建了一个GitHub实战项目&#xff0c;模拟真实开发场景。这个项目特别适合想系统学习团队协作的小伙伴&#xff0c;下面分享我的实践过程&#xff1a; 项目初始化…...

合肥高中英语一对一辅导2026指南,突破听说读写全面提升路径

合肥高中英语一对一辅导2026指南&#xff0c;突破听说读写全面提升路径据《2026年中国基础教育课外辅导行业白皮书》数据显示&#xff0c;2026年高中阶段英语学科辅导需求同比增长23%&#xff0c;其中超过65%的学生家长明确表示&#xff0c;传统大班教学已无法满足孩子个性化提…...

基于人工电场搜索智能优化算法的水库发电和供水优化调度

基于人工电场搜索智能优化算法的水库发电和供水优化调度&#xff1b; 代码为MATLAB编写&#xff0c;可直接运行&#xff1b; 含有实例数据&#xff0c;点击即可运行&#xff0c;替换成自己数据点击即可出结果&#xff0c;如图。在水库管理中&#xff0c;实现发电和供水的优化调…...

AI赋能安装流程:快马智能诊断工具,自动解决软件安装兼容性问题

在开发软件的过程中&#xff0c;安装环节往往是第一个拦路虎。特别是当遇到系统环境复杂、依赖库版本冲突、权限配置等问题时&#xff0c;传统的安装方式常常让人头疼不已。最近我在尝试开发一个智能安装问题诊断工具时&#xff0c;发现InsCode(快马)平台的AI辅助功能特别实用&…...

如何突破英雄联盟操作效率瓶颈?League-Toolkit的5大革新功能解析

如何突破英雄联盟操作效率瓶颈&#xff1f;League-Toolkit的5大革新功能解析 【免费下载链接】League-Toolkit 兴趣使然的、简单易用的英雄联盟工具集。支持战绩查询、自动秒选等功能。基于 LCU API。 项目地址: https://gitcode.com/gh_mirrors/le/League-Toolkit 在快…...