RocketMq基础学习+SpringBoot集成
学习贴:参考https://blog.csdn.net/zhiyikeji/article/details/138286088
文章目录
- 普通消息
- 顺序消息
- 延迟消息
- 批量消息
- 事务消息
- SpringBoot整合RocketMQ
3.1 NameServer
NameServer是一个简单的路由注册中心,支持Topic和Broker的动态注册和发现。作用主要包括两点:
Broker管理:Broker会把集群信息注册到NameServer上,NameServer会把这些信息记录下来,作为路由信息的基本数据。然后还会提供心态检测机制,检查Broker是否还存活
路由信息:因为NameServer存放的有Broker集群的基本信息(例如有哪些Borker可用,以及Broker下的队列信息),所以Producer和Consumer就可以通过NameServer知道整个Broker集群信息,生产者生产的信息就可以知道往哪个Broker下的哪个Message queue队列中投递消息,消费者也可以通过自己的配置信息去哪个broker下哪个队列中去拉取消息进行消费。
NameServer是无状态的,且NameServer集群下各个NameServer是互相不通信的,没有任何信息同步操作。每个Broker都会与NameServer集群下的每一个NameServer节点建立长链接,然后会向每一个NameServer注册自己的路由信息,所以每个NamerServer下都保存了Broker集群的完整路由信息。当某个NameServer节点挂掉后,消费者和生产者也可以通过其他NameServer获取到Broker的完整信息,所以大部分情况下NameServer通常会部署多个实例
Broker代理服务器
主要负责生产者生产消息的存储,为消费者拉取信息提供查询,也会存储消息相关的一些其他数据,例如Topic信息、队列信息、消费进度偏移量等。
而且Broker是服务高可用的保证:相对于NameServer来说,Broker的部署会相对于复杂一些。
普通主从集群模式:
这种集群下会给每个节点分配特定的角色,分为Master和Slave,一个Master可以对应多个Slave但是一个Slave只能对应一个Master,master负责响应生成存储消息的请求,并存储消息。slave负责储存从主节点同步过来的数据(可以选择同步或者异步),我们可以通过配置conf目录下的配置文件来选择如何配置。我们可以通过指定相同的BrokerName,不同的BrokerId来制定一个Broker集群。BrokerId中0代表master,非0代表slave。但是这种模式下的弊端就是各个节点的角色无法切换,如果一个master挂掉后,这一组的Broker就不可用了
Dledger高可用集群:
这个是RocketMQ4.5版本后提供的一种集群高可用模式,这个模式下会随机选举出一个节点作为master,当master节点挂了后,会通过Raft机制然后会从slave节点中选择一个节点升级为master。
普通消息
发送消息的方式有同步、异步、单向三种方式
同步方式:同步方式是最常用的方式,通常是发送一条消息后早收到服务端同步响应之后,然后再发送下一条消息,可以用于一些比较重要的场景,例如:短信发送
异步方式:发送一条消息到服务端后不需要等到服务端响应,就可以继续发送下一条消息,但是需要注册回调接口,然后通过回调接口获取服务端的响应,经常用于一些链路执行过长的场景
单向方式:不需要等待服务端响应也不需要注册回调接口,就可以继续发送下一条消息,耗时非常短,通常适用于耗时短但是可靠性要求没那么高的场景。
发送消息的步骤大概如下:
- 创建生产者:普通消息通常都是DefaultProducer,然后设置一个produceGroup的名字
- 注册NameServer地址:可以setNameServer(),如果是多个中间以;分开
- 构建消息体:Topic、tag、keys、消息体等信息
- 发送消息:通过send()方法发送消息
顺序消息
顺序消息是对生产者生产消息和消费者消费消息的顺序有严格要求的。
但是RocketMQ并不能保证所有消息的有序性,因为默认情况下一个Topic下的消息会发送到不同的Message queue上,消费者也会从不同的Message queue上拉取消息,这种情况下是不能保证有序的。
RocketMQ的有序性是要保证Producer、Broker、Consumer三者的有序性,严格按照FIFO方式来对消息进行处理,我们可以从生产者把消息发送到同一个Message queue上,所以只能有一个生产者,因为多个生产者的生产的消息是无法有序的,并且生成者发送消息不能选择多线程的方式。然后消费者可以注册一个MessageListenerOrderly(),在RocketMQ内部就会通过锁队列的方式保证消息是一个一个队列来取的。
int orderId = i & 10;try {Message message = new Message("TopicTest",tags[i % tags.length],"KEYS"+i,"BODY".getBytes(RemotingHelper.DEFAULT_CHARSET));// 设置一个MessageQueueSelector队列选择器SendResult sendResult = orderMessageProducer.send(message, new MessageQueueSelector() {//list 消息队列列表,args 我们传入的orderId@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object args) {Integer id = (Integer)args;//这样可以确保相同orderId的消息总是发送到相同的队列,实现消息的顺序性return list.get( id%list.size() );}}, orderId);System.out.println(sendResult);} catch (Exception e) {e.printStackTrace();}
延迟消息
延时消息是生产者生产的消息发送到服务端后,并不希望马上被消费,而是希望延迟一段时间后才被消费
try {Message message = new Message("TopicTest","tags","OrderId" + i,"test".getBytes(RemotingHelper.DEFAULT_CHARSET));//设置延迟级别message.setDelayTimeLevel(3);SendResult sendResult = scheduledMessageProducer.send(message);System.out.println(sendResult);} catch (Exception e) {e.printStackTrace();}
批量消息
可以把消息整合到一批后在进行发送,可以增加吞吐率,并减少API和网络调用次数
ArrayList<Message> messages = new ArrayList<Message>();messages.add(new Message("TopicTest", "tag1", "1", "test1".getBytes()));messages.add(new Message("TopicTest", "tag2", "2", "test".getBytes()));messages.add(new Message("TopicTest", "tag3", "3", "test3".getBytes()));SendResult sendResult = batchProducer.send(messages);
事务消息
事务消息是在分布式系统中 保证最终一致性的两阶段提交的消息实现。他可以保证本地事务执行与消息发送两 个操作的原子性,也就是这两个操作一起成功或者一起失败。
对于一些数据具有强一致性场景的情况下,例如上游订单付款成功后,下游才可以进行积分变更、物流发货、购物车状态变更。这种类似场景下可以选择事务消息。
SpringBoot整合RocketMQ
创建一个MAVEN项目,引入依赖:
<dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.1</version><exclusions><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></exclusion><exclusion><groupId>org.springframework</groupId><artifactId>spring-core</artifactId></exclusion><exclusion><groupId>org.springframework</groupId><artifactId>spring-webmvc</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId><version>2.3.10.RELEASE</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.3.10.RELEASE</version></dependency>
</dependencies>
创建一个配置类:application.properties
#NameServer地址
rocketmq.name-server=ip地址:9876
#默认的消息生产者组
rocketmq.producer.group=springBootGroup
创建一个启动类:
@SpringBootApplication
public class RocketMqApplication {public static void main(String[] args) {SpringApplication.run(RocketMqApplication.class,args);}
}
创建一个生产者类:
@Component
public class SpringProducer {@ResourceRocketMQTemplate rocketMqTemplate;public void sendMessage(String topic, String message){rocketMqTemplate.convertAndSend(topic,message);}
}
创建一个消费者类:
@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TopicTest")
public class SpringConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {System.out.println(s);}
}
创建一个Controller类:
@RestController
public class MqSendController {@Resourceprivate SpringProducer springProducer;@Value("${producer.topic}")String topic;@GetMapping("/send")public void sendMessage(@RequestParam("message") String message){springProducer.sendMessage(topic,message);}
}
然后访问:http://localhost:8080/send?message=test
相关文章:

RocketMq基础学习+SpringBoot集成
学习贴:参考https://blog.csdn.net/zhiyikeji/article/details/138286088 文章目录 普通消息顺序消息延迟消息批量消息事务消息 SpringBoot整合RocketMQ 3.1 NameServer NameServer是一个简单的路由注册中心,支持Topic和Broker的动态注册和发现。作用主…...

分布式cap
P(分区安全)都能保证,就是在C(强一致)和A(性能)之间做取舍。 (即立马做主从同步,还是先返回写入结果等会再做主从同步。类似的还有,缓存和db之间的同步。&am…...
mybatis-xml映射文件及mybatis动态sql
规范 XML映射文件的名称与Mapper接口名称一致,并且将XML映射文件和Mapper接口放置在相同包下(同包同名)。 XML映射文件的namespace属性为Mapper接口全限定名一致。 XML映射文件中sql语句的id与Mapper接口中的方法名一致,并保持返回类型一致…...

计算机网络复习——概念强化作业
物理层负责网络通信的二进制传输 用于将MAC地址解析为IP地址的协议为RARP。 一个交换机接收到一帧,其目的地址在它的MAC地址表中查不到,交换机应该向除了来的端口外的所有其它端口转发。 关于ICMP协议,下面的论述中正确的是ICMP可传送IP通信过程中出现的错误信息。 在B类网络…...

用友BIP与旺店通数据集成方案解析
用友BIP与旺店通企业奇门的供应商集成同步方案 在现代企业的数据管理中,跨平台的数据集成是实现高效业务运作的关键环节。本文将分享一个实际案例:如何通过轻易云数据集成平台,将用友BIP系统中的供应商数据无缝对接到旺店通企业奇门…...

string类函数的手动实现
在上一篇文章中,我们讲解了一些string类的函数,但是对于我们要熟练掌握c是远远不够的,今天,我将手动实现一下这些函数~ 注意:本篇文章中会大量应用复用,这是一种很巧妙的方法 和以往一样,还是…...

Oceanbase离线集群部署
准备工作 两台服务器 服务器的配置参照官网要求来 服务器名配置服务器IPoceanbase116g8h192.168.10.239oceanbase216g8h192.168.10.239 这里选oceanbase1作为 obd机器 oceanbase安装包 选择社区版本的时候自己系统的安装包 ntp时间同步rpm包 联网机器下载所需的软件包 …...

transformers生成式对话机器人
简介 生成式对话机器人是一种先进的人工智能系统,它能够通过学习大量的自然语言数据来模拟人类进行开放、连贯且创造性的对话。与基于规则或检索式的聊天机器人不同,生成式对话机器人并不局限于预定义的回答集,而是可以根据对话上下文动态地…...

WPF中的VisualState(视觉状态)
以前在设置控件样式或自定义控件时,都是使用触发器来进行样式更改。触发器可以在属性值发生更改时启动操作。 像这样: <Style TargetType"ListBoxItem"><Setter Property"Opacity" Value"0.5" /><Setter …...
C#设计模式--状态模式(State Pattern)
状态模式是一种行为设计模式,它允许对象在其内部状态发生变化时改变其行为。这种模式的核心思想是将状态封装在独立的对象中,而不是将状态逻辑散布在整个程序中。 用途 简化复杂的条件逻辑:通过将不同的状态封装在不同的类中,可…...

〔 MySQL 〕索引
目录 1. 没有索引,可能会有什么问题 2. 认识磁盘 MySQL与存储 先来研究一下磁盘: 在看看磁盘中一个盘片编辑 扇区 定位扇区编辑 结论 磁盘随机访问(Random Access)与连续访问(Sequential Access) 3. MySQL 与磁盘交互基本单位 4. 建立共识…...

计算机网络研究实训室建设方案
一、概述 本方案旨在规划并实施一个先进的计算机网络研究实训室,旨在为学生提供一个深入学习、实践和研究网络技术的平台。实训室将集教学、实验、研究于一体,覆盖网络基础、网络架构、网络安全、网络管理等多个领域,以培养具备扎实理论基础…...

韩企研学团造访图为科技:共探人工智能创新前沿
今日,一支由韩国知名企业研学专家组成的代表团莅临图为科技深圳总部,展开了一场深度技术交流与研讨活动。 此次访问旨在通过实地探访中国领先的科技企业,促进中韩两国在科技创新领域的深入合作与交流。 韩国游学团合影 图为科技作为一家在人…...
html button 按钮单选且 高亮
<DIV class"middle"> <div class"containerTarget"> <span class"hover-target1" οnclick"btn(1);">韵达 </span> <span class"hover-target2" οnclick"btn(2);">中通 </span…...
图片上传HTML
alioss sky:jwt:# 设置jwt签名加密时使用的秘钥admin-secret-key: itcast# 设置jwt过期时间admin-ttl: 7200000# 设置前端传递过来的令牌名称admin-token-name: tokenalioss:endpoint: ${sky.alioss.endpoint}access-key-id: ${sky.alioss.access-key-id}access-key-secret: $…...
C++学习-函数
C 函数 目录 函数默认参数引用传参函数重载 数量不同类型不同 内联函数 函数默认参数 #include<iostream>using std::cout; using std::endl;int power(int n, int x2); // x2 是默认参数int main() {cout << power(5) << endl; // 没有传 x 的值&#x…...
spring boot 测试 mybatis mapper类
spring boot 测试 mybatis mapper类 针对 mybatis plus不启动 webserver指定加载 xml 【过滤 “classpath*:/mapper/**/*.xml” 下的xml】, mapper xml文件名和mapper java文件名称要一样,是根据文件名称过滤的。默认情况加载和解析所有mapper.xml 自定义 MapperT…...

远程游戏新体验!
在这个数字化的时代,游戏已经不仅限于家里的电视或书房的电脑了。远程游戏,也就是通过远程控制软件在不同地点操作游戏设备,给玩家带来了前所未有的自由和灵活性。RayLink远程控制软件,凭借其出色的性能和专为游戏设计的功能&…...

Let up bring up a linux.part2 [十一]
之前的篇幅中我们已经将 Linux 内核 bringup 起来了,不知道大家有没有去尝试将根文件系统运行起来,今天我就带领大家完成这个事情,可以跟着下面的步骤一步步来完成: 在这里我们使用 busybox 构建 rootfs: 下载 busyb…...

调用大模型api 批量处理图像 保存到excel
最近需要调用大模型,并将结果保存到excel中,效果如下: 代码: import base64 from zhipuai import ZhipuAI import os import pandas as pd from openpyxl import Workbook from openpyxl.drawing.image import Image from io i…...
应用升级/灾备测试时使用guarantee 闪回点迅速回退
1.场景 应用要升级,当升级失败时,数据库回退到升级前. 要测试系统,测试完成后,数据库要回退到测试前。 相对于RMAN恢复需要很长时间, 数据库闪回只需要几分钟。 2.技术实现 数据库设置 2个db_recovery参数 创建guarantee闪回点,不需要开启数据库闪回。…...

盘古信息PCB行业解决方案:以全域场景重构,激活智造新未来
一、破局:PCB行业的时代之问 在数字经济蓬勃发展的浪潮中,PCB(印制电路板)作为 “电子产品之母”,其重要性愈发凸显。随着 5G、人工智能等新兴技术的加速渗透,PCB行业面临着前所未有的挑战与机遇。产品迭代…...
前端倒计时误差!
提示:记录工作中遇到的需求及解决办法 文章目录 前言一、误差从何而来?二、五大解决方案1. 动态校准法(基础版)2. Web Worker 计时3. 服务器时间同步4. Performance API 高精度计时5. 页面可见性API优化三、生产环境最佳实践四、终极解决方案架构前言 前几天听说公司某个项…...

【HarmonyOS 5.0】DevEco Testing:鸿蒙应用质量保障的终极武器
——全方位测试解决方案与代码实战 一、工具定位与核心能力 DevEco Testing是HarmonyOS官方推出的一体化测试平台,覆盖应用全生命周期测试需求,主要提供五大核心能力: 测试类型检测目标关键指标功能体验基…...
1688商品列表API与其他数据源的对接思路
将1688商品列表API与其他数据源对接时,需结合业务场景设计数据流转链路,重点关注数据格式兼容性、接口调用频率控制及数据一致性维护。以下是具体对接思路及关键技术点: 一、核心对接场景与目标 商品数据同步 场景:将1688商品信息…...

家政维修平台实战20:权限设计
目录 1 获取工人信息2 搭建工人入口3 权限判断总结 目前我们已经搭建好了基础的用户体系,主要是分成几个表,用户表我们是记录用户的基础信息,包括手机、昵称、头像。而工人和员工各有各的表。那么就有一个问题,不同的角色…...

学习STC51单片机31(芯片为STC89C52RCRC)OLED显示屏1
每日一言 生活的美好,总是藏在那些你咬牙坚持的日子里。 硬件:OLED 以后要用到OLED的时候找到这个文件 OLED的设备地址 SSD1306"SSD" 是品牌缩写,"1306" 是产品编号。 驱动 OLED 屏幕的 IIC 总线数据传输格式 示意图 …...
高防服务器能够抵御哪些网络攻击呢?
高防服务器作为一种有着高度防御能力的服务器,可以帮助网站应对分布式拒绝服务攻击,有效识别和清理一些恶意的网络流量,为用户提供安全且稳定的网络环境,那么,高防服务器一般都可以抵御哪些网络攻击呢?下面…...

什么是Ansible Jinja2
理解 Ansible Jinja2 模板 Ansible 是一款功能强大的开源自动化工具,可让您无缝地管理和配置系统。Ansible 的一大亮点是它使用 Jinja2 模板,允许您根据变量数据动态生成文件、配置设置和脚本。本文将向您介绍 Ansible 中的 Jinja2 模板,并通…...

OPENCV形态学基础之二腐蚀
一.腐蚀的原理 (图1) 数学表达式:dst(x,y) erode(src(x,y)) min(x,y)src(xx,yy) 腐蚀也是图像形态学的基本功能之一,腐蚀跟膨胀属于反向操作,膨胀是把图像图像变大,而腐蚀就是把图像变小。腐蚀后的图像变小变暗淡。 腐蚀…...