RocketMq基础学习+SpringBoot集成
学习贴:参考https://blog.csdn.net/zhiyikeji/article/details/138286088
文章目录
- 普通消息
- 顺序消息
- 延迟消息
- 批量消息
- 事务消息
- SpringBoot整合RocketMQ

3.1 NameServer
NameServer是一个简单的路由注册中心,支持Topic和Broker的动态注册和发现。作用主要包括两点:
Broker管理:Broker会把集群信息注册到NameServer上,NameServer会把这些信息记录下来,作为路由信息的基本数据。然后还会提供心态检测机制,检查Broker是否还存活
路由信息:因为NameServer存放的有Broker集群的基本信息(例如有哪些Borker可用,以及Broker下的队列信息),所以Producer和Consumer就可以通过NameServer知道整个Broker集群信息,生产者生产的信息就可以知道往哪个Broker下的哪个Message queue队列中投递消息,消费者也可以通过自己的配置信息去哪个broker下哪个队列中去拉取消息进行消费。
NameServer是无状态的,且NameServer集群下各个NameServer是互相不通信的,没有任何信息同步操作。每个Broker都会与NameServer集群下的每一个NameServer节点建立长链接,然后会向每一个NameServer注册自己的路由信息,所以每个NamerServer下都保存了Broker集群的完整路由信息。当某个NameServer节点挂掉后,消费者和生产者也可以通过其他NameServer获取到Broker的完整信息,所以大部分情况下NameServer通常会部署多个实例
Broker代理服务器
主要负责生产者生产消息的存储,为消费者拉取信息提供查询,也会存储消息相关的一些其他数据,例如Topic信息、队列信息、消费进度偏移量等。
而且Broker是服务高可用的保证:相对于NameServer来说,Broker的部署会相对于复杂一些。
普通主从集群模式:
这种集群下会给每个节点分配特定的角色,分为Master和Slave,一个Master可以对应多个Slave但是一个Slave只能对应一个Master,master负责响应生成存储消息的请求,并存储消息。slave负责储存从主节点同步过来的数据(可以选择同步或者异步),我们可以通过配置conf目录下的配置文件来选择如何配置。我们可以通过指定相同的BrokerName,不同的BrokerId来制定一个Broker集群。BrokerId中0代表master,非0代表slave。但是这种模式下的弊端就是各个节点的角色无法切换,如果一个master挂掉后,这一组的Broker就不可用了
Dledger高可用集群:
这个是RocketMQ4.5版本后提供的一种集群高可用模式,这个模式下会随机选举出一个节点作为master,当master节点挂了后,会通过Raft机制然后会从slave节点中选择一个节点升级为master。
普通消息
发送消息的方式有同步、异步、单向三种方式
同步方式:同步方式是最常用的方式,通常是发送一条消息后早收到服务端同步响应之后,然后再发送下一条消息,可以用于一些比较重要的场景,例如:短信发送
异步方式:发送一条消息到服务端后不需要等到服务端响应,就可以继续发送下一条消息,但是需要注册回调接口,然后通过回调接口获取服务端的响应,经常用于一些链路执行过长的场景
单向方式:不需要等待服务端响应也不需要注册回调接口,就可以继续发送下一条消息,耗时非常短,通常适用于耗时短但是可靠性要求没那么高的场景。
发送消息的步骤大概如下:
- 创建生产者:普通消息通常都是DefaultProducer,然后设置一个produceGroup的名字
- 注册NameServer地址:可以setNameServer(),如果是多个中间以;分开
- 构建消息体:Topic、tag、keys、消息体等信息
- 发送消息:通过send()方法发送消息
顺序消息
顺序消息是对生产者生产消息和消费者消费消息的顺序有严格要求的。
但是RocketMQ并不能保证所有消息的有序性,因为默认情况下一个Topic下的消息会发送到不同的Message queue上,消费者也会从不同的Message queue上拉取消息,这种情况下是不能保证有序的。
RocketMQ的有序性是要保证Producer、Broker、Consumer三者的有序性,严格按照FIFO方式来对消息进行处理,我们可以从生产者把消息发送到同一个Message queue上,所以只能有一个生产者,因为多个生产者的生产的消息是无法有序的,并且生成者发送消息不能选择多线程的方式。然后消费者可以注册一个MessageListenerOrderly(),在RocketMQ内部就会通过锁队列的方式保证消息是一个一个队列来取的。
int orderId = i & 10;try {Message message = new Message("TopicTest",tags[i % tags.length],"KEYS"+i,"BODY".getBytes(RemotingHelper.DEFAULT_CHARSET));// 设置一个MessageQueueSelector队列选择器SendResult sendResult = orderMessageProducer.send(message, new MessageQueueSelector() {//list 消息队列列表,args 我们传入的orderId@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object args) {Integer id = (Integer)args;//这样可以确保相同orderId的消息总是发送到相同的队列,实现消息的顺序性return list.get( id%list.size() );}}, orderId);System.out.println(sendResult);} catch (Exception e) {e.printStackTrace();}
延迟消息
延时消息是生产者生产的消息发送到服务端后,并不希望马上被消费,而是希望延迟一段时间后才被消费
try {Message message = new Message("TopicTest","tags","OrderId" + i,"test".getBytes(RemotingHelper.DEFAULT_CHARSET));//设置延迟级别message.setDelayTimeLevel(3);SendResult sendResult = scheduledMessageProducer.send(message);System.out.println(sendResult);} catch (Exception e) {e.printStackTrace();}
批量消息
可以把消息整合到一批后在进行发送,可以增加吞吐率,并减少API和网络调用次数
ArrayList<Message> messages = new ArrayList<Message>();messages.add(new Message("TopicTest", "tag1", "1", "test1".getBytes()));messages.add(new Message("TopicTest", "tag2", "2", "test".getBytes()));messages.add(new Message("TopicTest", "tag3", "3", "test3".getBytes()));SendResult sendResult = batchProducer.send(messages);
事务消息
事务消息是在分布式系统中 保证最终一致性的两阶段提交的消息实现。他可以保证本地事务执行与消息发送两 个操作的原子性,也就是这两个操作一起成功或者一起失败。
对于一些数据具有强一致性场景的情况下,例如上游订单付款成功后,下游才可以进行积分变更、物流发货、购物车状态变更。这种类似场景下可以选择事务消息。
SpringBoot整合RocketMQ
创建一个MAVEN项目,引入依赖:
<dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.1</version><exclusions><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></exclusion><exclusion><groupId>org.springframework</groupId><artifactId>spring-core</artifactId></exclusion><exclusion><groupId>org.springframework</groupId><artifactId>spring-webmvc</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId><version>2.3.10.RELEASE</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.3.10.RELEASE</version></dependency>
</dependencies>
创建一个配置类:application.properties
#NameServer地址
rocketmq.name-server=ip地址:9876
#默认的消息生产者组
rocketmq.producer.group=springBootGroup
创建一个启动类:
@SpringBootApplication
public class RocketMqApplication {public static void main(String[] args) {SpringApplication.run(RocketMqApplication.class,args);}
}
创建一个生产者类:
@Component
public class SpringProducer {@ResourceRocketMQTemplate rocketMqTemplate;public void sendMessage(String topic, String message){rocketMqTemplate.convertAndSend(topic,message);}
}
创建一个消费者类:
@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TopicTest")
public class SpringConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {System.out.println(s);}
}
创建一个Controller类:
@RestController
public class MqSendController {@Resourceprivate SpringProducer springProducer;@Value("${producer.topic}")String topic;@GetMapping("/send")public void sendMessage(@RequestParam("message") String message){springProducer.sendMessage(topic,message);}
}
然后访问:http://localhost:8080/send?message=test
相关文章:
RocketMq基础学习+SpringBoot集成
学习贴:参考https://blog.csdn.net/zhiyikeji/article/details/138286088 文章目录 普通消息顺序消息延迟消息批量消息事务消息 SpringBoot整合RocketMQ 3.1 NameServer NameServer是一个简单的路由注册中心,支持Topic和Broker的动态注册和发现。作用主…...
分布式cap
P(分区安全)都能保证,就是在C(强一致)和A(性能)之间做取舍。 (即立马做主从同步,还是先返回写入结果等会再做主从同步。类似的还有,缓存和db之间的同步。&am…...
mybatis-xml映射文件及mybatis动态sql
规范 XML映射文件的名称与Mapper接口名称一致,并且将XML映射文件和Mapper接口放置在相同包下(同包同名)。 XML映射文件的namespace属性为Mapper接口全限定名一致。 XML映射文件中sql语句的id与Mapper接口中的方法名一致,并保持返回类型一致…...
计算机网络复习——概念强化作业
物理层负责网络通信的二进制传输 用于将MAC地址解析为IP地址的协议为RARP。 一个交换机接收到一帧,其目的地址在它的MAC地址表中查不到,交换机应该向除了来的端口外的所有其它端口转发。 关于ICMP协议,下面的论述中正确的是ICMP可传送IP通信过程中出现的错误信息。 在B类网络…...
用友BIP与旺店通数据集成方案解析
用友BIP与旺店通企业奇门的供应商集成同步方案 在现代企业的数据管理中,跨平台的数据集成是实现高效业务运作的关键环节。本文将分享一个实际案例:如何通过轻易云数据集成平台,将用友BIP系统中的供应商数据无缝对接到旺店通企业奇门…...
string类函数的手动实现
在上一篇文章中,我们讲解了一些string类的函数,但是对于我们要熟练掌握c是远远不够的,今天,我将手动实现一下这些函数~ 注意:本篇文章中会大量应用复用,这是一种很巧妙的方法 和以往一样,还是…...
Oceanbase离线集群部署
准备工作 两台服务器 服务器的配置参照官网要求来 服务器名配置服务器IPoceanbase116g8h192.168.10.239oceanbase216g8h192.168.10.239 这里选oceanbase1作为 obd机器 oceanbase安装包 选择社区版本的时候自己系统的安装包 ntp时间同步rpm包 联网机器下载所需的软件包 …...
transformers生成式对话机器人
简介 生成式对话机器人是一种先进的人工智能系统,它能够通过学习大量的自然语言数据来模拟人类进行开放、连贯且创造性的对话。与基于规则或检索式的聊天机器人不同,生成式对话机器人并不局限于预定义的回答集,而是可以根据对话上下文动态地…...
WPF中的VisualState(视觉状态)
以前在设置控件样式或自定义控件时,都是使用触发器来进行样式更改。触发器可以在属性值发生更改时启动操作。 像这样: <Style TargetType"ListBoxItem"><Setter Property"Opacity" Value"0.5" /><Setter …...
C#设计模式--状态模式(State Pattern)
状态模式是一种行为设计模式,它允许对象在其内部状态发生变化时改变其行为。这种模式的核心思想是将状态封装在独立的对象中,而不是将状态逻辑散布在整个程序中。 用途 简化复杂的条件逻辑:通过将不同的状态封装在不同的类中,可…...
〔 MySQL 〕索引
目录 1. 没有索引,可能会有什么问题 2. 认识磁盘 MySQL与存储 先来研究一下磁盘: 在看看磁盘中一个盘片编辑 扇区 定位扇区编辑 结论 磁盘随机访问(Random Access)与连续访问(Sequential Access) 3. MySQL 与磁盘交互基本单位 4. 建立共识…...
计算机网络研究实训室建设方案
一、概述 本方案旨在规划并实施一个先进的计算机网络研究实训室,旨在为学生提供一个深入学习、实践和研究网络技术的平台。实训室将集教学、实验、研究于一体,覆盖网络基础、网络架构、网络安全、网络管理等多个领域,以培养具备扎实理论基础…...
韩企研学团造访图为科技:共探人工智能创新前沿
今日,一支由韩国知名企业研学专家组成的代表团莅临图为科技深圳总部,展开了一场深度技术交流与研讨活动。 此次访问旨在通过实地探访中国领先的科技企业,促进中韩两国在科技创新领域的深入合作与交流。 韩国游学团合影 图为科技作为一家在人…...
html button 按钮单选且 高亮
<DIV class"middle"> <div class"containerTarget"> <span class"hover-target1" οnclick"btn(1);">韵达 </span> <span class"hover-target2" οnclick"btn(2);">中通 </span…...
图片上传HTML
alioss sky:jwt:# 设置jwt签名加密时使用的秘钥admin-secret-key: itcast# 设置jwt过期时间admin-ttl: 7200000# 设置前端传递过来的令牌名称admin-token-name: tokenalioss:endpoint: ${sky.alioss.endpoint}access-key-id: ${sky.alioss.access-key-id}access-key-secret: $…...
C++学习-函数
C 函数 目录 函数默认参数引用传参函数重载 数量不同类型不同 内联函数 函数默认参数 #include<iostream>using std::cout; using std::endl;int power(int n, int x2); // x2 是默认参数int main() {cout << power(5) << endl; // 没有传 x 的值&#x…...
spring boot 测试 mybatis mapper类
spring boot 测试 mybatis mapper类 针对 mybatis plus不启动 webserver指定加载 xml 【过滤 “classpath*:/mapper/**/*.xml” 下的xml】, mapper xml文件名和mapper java文件名称要一样,是根据文件名称过滤的。默认情况加载和解析所有mapper.xml 自定义 MapperT…...
远程游戏新体验!
在这个数字化的时代,游戏已经不仅限于家里的电视或书房的电脑了。远程游戏,也就是通过远程控制软件在不同地点操作游戏设备,给玩家带来了前所未有的自由和灵活性。RayLink远程控制软件,凭借其出色的性能和专为游戏设计的功能&…...
Let up bring up a linux.part2 [十一]
之前的篇幅中我们已经将 Linux 内核 bringup 起来了,不知道大家有没有去尝试将根文件系统运行起来,今天我就带领大家完成这个事情,可以跟着下面的步骤一步步来完成: 在这里我们使用 busybox 构建 rootfs: 下载 busyb…...
调用大模型api 批量处理图像 保存到excel
最近需要调用大模型,并将结果保存到excel中,效果如下: 代码: import base64 from zhipuai import ZhipuAI import os import pandas as pd from openpyxl import Workbook from openpyxl.drawing.image import Image from io i…...
网络编程(Modbus进阶)
思维导图 Modbus RTU(先学一点理论) 概念 Modbus RTU 是工业自动化领域 最广泛应用的串行通信协议,由 Modicon 公司(现施耐德电气)于 1979 年推出。它以 高效率、强健性、易实现的特点成为工业控制系统的通信标准。 包…...
测试微信模版消息推送
进入“开发接口管理”--“公众平台测试账号”,无需申请公众账号、可在测试账号中体验并测试微信公众平台所有高级接口。 获取access_token: 自定义模版消息: 关注测试号:扫二维码关注测试号。 发送模版消息: import requests da…...
《Playwright:微软的自动化测试工具详解》
Playwright 简介:声明内容来自网络,将内容拼接整理出来的文档 Playwright 是微软开发的自动化测试工具,支持 Chrome、Firefox、Safari 等主流浏览器,提供多语言 API(Python、JavaScript、Java、.NET)。它的特点包括&a…...
深入理解JavaScript设计模式之单例模式
目录 什么是单例模式为什么需要单例模式常见应用场景包括 单例模式实现透明单例模式实现不透明单例模式用代理实现单例模式javaScript中的单例模式使用命名空间使用闭包封装私有变量 惰性单例通用的惰性单例 结语 什么是单例模式 单例模式(Singleton Pattern&#…...
【SQL学习笔记1】增删改查+多表连接全解析(内附SQL免费在线练习工具)
可以使用Sqliteviz这个网站免费编写sql语句,它能够让用户直接在浏览器内练习SQL的语法,不需要安装任何软件。 链接如下: sqliteviz 注意: 在转写SQL语法时,关键字之间有一个特定的顺序,这个顺序会影响到…...
OPENCV形态学基础之二腐蚀
一.腐蚀的原理 (图1) 数学表达式:dst(x,y) erode(src(x,y)) min(x,y)src(xx,yy) 腐蚀也是图像形态学的基本功能之一,腐蚀跟膨胀属于反向操作,膨胀是把图像图像变大,而腐蚀就是把图像变小。腐蚀后的图像变小变暗淡。 腐蚀…...
4. TypeScript 类型推断与类型组合
一、类型推断 (一) 什么是类型推断 TypeScript 的类型推断会根据变量、函数返回值、对象和数组的赋值和使用方式,自动确定它们的类型。 这一特性减少了显式类型注解的需要,在保持类型安全的同时简化了代码。通过分析上下文和初始值,TypeSc…...
掌握 HTTP 请求:理解 cURL GET 语法
cURL 是一个强大的命令行工具,用于发送 HTTP 请求和与 Web 服务器交互。在 Web 开发和测试中,cURL 经常用于发送 GET 请求来获取服务器资源。本文将详细介绍 cURL GET 请求的语法和使用方法。 一、cURL 基本概念 cURL 是 "Client URL" 的缩写…...
MySQL 主从同步异常处理
阅读原文:https://www.xiaozaoshu.top/articles/mysql-m-s-update-pk MySQL 做双主,遇到的这个错误: Could not execute Update_rows event on table ... Error_code: 1032是 MySQL 主从复制时的经典错误之一,通常表示ÿ…...
智能职业发展系统:AI驱动的职业规划平台技术解析
智能职业发展系统:AI驱动的职业规划平台技术解析 引言:数字时代的职业革命 在当今瞬息万变的就业市场中,传统的职业规划方法已无法满足个人和企业的需求。据统计,全球每年有超过2亿人面临职业转型困境,而企业也因此遭…...
