当前位置: 首页 > news >正文

RocketMq基础学习+SpringBoot集成

学习贴:参考https://blog.csdn.net/zhiyikeji/article/details/138286088

文章目录

    • 普通消息
    • 顺序消息
    • 延迟消息
    • 批量消息
    • 事务消息
  • SpringBoot整合RocketMQ

在这里插入图片描述

3.1 NameServer
NameServer是一个简单的路由注册中心,支持Topic和Broker的动态注册和发现。作用主要包括两点:

Broker管理:Broker会把集群信息注册到NameServer上,NameServer会把这些信息记录下来,作为路由信息的基本数据。然后还会提供心态检测机制,检查Broker是否还存活
路由信息:因为NameServer存放的有Broker集群的基本信息(例如有哪些Borker可用,以及Broker下的队列信息),所以Producer和Consumer就可以通过NameServer知道整个Broker集群信息,生产者生产的信息就可以知道往哪个Broker下的哪个Message queue队列中投递消息,消费者也可以通过自己的配置信息去哪个broker下哪个队列中去拉取消息进行消费。
NameServer是无状态的,且NameServer集群下各个NameServer是互相不通信的,没有任何信息同步操作。每个Broker都会与NameServer集群下的每一个NameServer节点建立长链接,然后会向每一个NameServer注册自己的路由信息,所以每个NamerServer下都保存了Broker集群的完整路由信息。当某个NameServer节点挂掉后,消费者和生产者也可以通过其他NameServer获取到Broker的完整信息,所以大部分情况下NameServer通常会部署多个实例

Broker代理服务器
主要负责生产者生产消息的存储,为消费者拉取信息提供查询,也会存储消息相关的一些其他数据,例如Topic信息、队列信息、消费进度偏移量等。
而且Broker是服务高可用的保证:相对于NameServer来说,Broker的部署会相对于复杂一些。

普通主从集群模式:
这种集群下会给每个节点分配特定的角色,分为Master和Slave,一个Master可以对应多个Slave但是一个Slave只能对应一个Master,master负责响应生成存储消息的请求,并存储消息。slave负责储存从主节点同步过来的数据(可以选择同步或者异步),我们可以通过配置conf目录下的配置文件来选择如何配置。我们可以通过指定相同的BrokerName,不同的BrokerId来制定一个Broker集群。BrokerId中0代表master,非0代表slave。但是这种模式下的弊端就是各个节点的角色无法切换,如果一个master挂掉后,这一组的Broker就不可用了
Dledger高可用集群:
这个是RocketMQ4.5版本后提供的一种集群高可用模式,这个模式下会随机选举出一个节点作为master,当master节点挂了后,会通过Raft机制然后会从slave节点中选择一个节点升级为master。

普通消息

发送消息的方式有同步、异步、单向三种方式

同步方式:同步方式是最常用的方式,通常是发送一条消息后早收到服务端同步响应之后,然后再发送下一条消息,可以用于一些比较重要的场景,例如:短信发送
异步方式:发送一条消息到服务端后不需要等到服务端响应,就可以继续发送下一条消息,但是需要注册回调接口,然后通过回调接口获取服务端的响应,经常用于一些链路执行过长的场景
单向方式:不需要等待服务端响应也不需要注册回调接口,就可以继续发送下一条消息,耗时非常短,通常适用于耗时短但是可靠性要求没那么高的场景。
发送消息的步骤大概如下:

  1. 创建生产者:普通消息通常都是DefaultProducer,然后设置一个produceGroup的名字
  2. 注册NameServer地址:可以setNameServer(),如果是多个中间以;分开
  3. 构建消息体:Topic、tag、keys、消息体等信息
  4. 发送消息:通过send()方法发送消息

顺序消息

顺序消息是对生产者生产消息和消费者消费消息的顺序有严格要求的。
但是RocketMQ并不能保证所有消息的有序性,因为默认情况下一个Topic下的消息会发送到不同的Message queue上,消费者也会从不同的Message queue上拉取消息,这种情况下是不能保证有序的。
RocketMQ的有序性是要保证Producer、Broker、Consumer三者的有序性,严格按照FIFO方式来对消息进行处理,我们可以从生产者把消息发送到同一个Message queue上,所以只能有一个生产者,因为多个生产者的生产的消息是无法有序的,并且生成者发送消息不能选择多线程的方式。然后消费者可以注册一个MessageListenerOrderly(),在RocketMQ内部就会通过锁队列的方式保证消息是一个一个队列来取的。

			int orderId = i & 10;try {Message message = new Message("TopicTest",tags[i % tags.length],"KEYS"+i,"BODY".getBytes(RemotingHelper.DEFAULT_CHARSET));// 设置一个MessageQueueSelector队列选择器SendResult sendResult = orderMessageProducer.send(message, new MessageQueueSelector() {//list 消息队列列表,args 我们传入的orderId@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object args) {Integer id = (Integer)args;//这样可以确保相同orderId的消息总是发送到相同的队列,实现消息的顺序性return list.get( id%list.size() );}}, orderId);System.out.println(sendResult);} catch (Exception e) {e.printStackTrace();}

延迟消息

延时消息是生产者生产的消息发送到服务端后,并不希望马上被消费,而是希望延迟一段时间后才被消费

		try {Message message = new Message("TopicTest","tags","OrderId" + i,"test".getBytes(RemotingHelper.DEFAULT_CHARSET));//设置延迟级别message.setDelayTimeLevel(3);SendResult sendResult = scheduledMessageProducer.send(message);System.out.println(sendResult);} catch (Exception e) {e.printStackTrace();}

批量消息

可以把消息整合到一批后在进行发送,可以增加吞吐率,并减少API和网络调用次数

        ArrayList<Message> messages = new ArrayList<Message>();messages.add(new Message("TopicTest", "tag1", "1", "test1".getBytes()));messages.add(new Message("TopicTest", "tag2", "2", "test".getBytes()));messages.add(new Message("TopicTest", "tag3", "3", "test3".getBytes()));SendResult sendResult = batchProducer.send(messages);

事务消息

事务消息是在分布式系统中 保证最终一致性的两阶段提交的消息实现。他可以保证本地事务执行与消息发送两 个操作的原子性,也就是这两个操作一起成功或者一起失败。
对于一些数据具有强一致性场景的情况下,例如上游订单付款成功后,下游才可以进行积分变更、物流发货、购物车状态变更。这种类似场景下可以选择事务消息。

SpringBoot整合RocketMQ

创建一个MAVEN项目,引入依赖:

<dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.1</version><exclusions><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></exclusion><exclusion><groupId>org.springframework</groupId><artifactId>spring-core</artifactId></exclusion><exclusion><groupId>org.springframework</groupId><artifactId>spring-webmvc</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId><version>2.3.10.RELEASE</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.3.10.RELEASE</version></dependency>
</dependencies>

创建一个配置类:application.properties
#NameServer地址
rocketmq.name-server=ip地址:9876
#默认的消息生产者组
rocketmq.producer.group=springBootGroup

创建一个启动类:

@SpringBootApplication
public class RocketMqApplication {public static void main(String[] args) {SpringApplication.run(RocketMqApplication.class,args);}
}

创建一个生产者类:

@Component
public class SpringProducer {@ResourceRocketMQTemplate rocketMqTemplate;public void sendMessage(String topic, String message){rocketMqTemplate.convertAndSend(topic,message);}
}

创建一个消费者类:

@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TopicTest")
public class SpringConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {System.out.println(s);}
}

创建一个Controller类:

@RestController
public class MqSendController {@Resourceprivate SpringProducer springProducer;@Value("${producer.topic}")String topic;@GetMapping("/send")public void sendMessage(@RequestParam("message") String message){springProducer.sendMessage(topic,message);}
}

然后访问:http://localhost:8080/send?message=test

相关文章:

RocketMq基础学习+SpringBoot集成

学习贴&#xff1a;参考https://blog.csdn.net/zhiyikeji/article/details/138286088 文章目录 普通消息顺序消息延迟消息批量消息事务消息 SpringBoot整合RocketMQ 3.1 NameServer NameServer是一个简单的路由注册中心&#xff0c;支持Topic和Broker的动态注册和发现。作用主…...

分布式cap

P&#xff08;分区安全&#xff09;都能保证&#xff0c;就是在C&#xff08;强一致&#xff09;和A&#xff08;性能&#xff09;之间做取舍。 &#xff08;即立马做主从同步&#xff0c;还是先返回写入结果等会再做主从同步。类似的还有&#xff0c;缓存和db之间的同步。&am…...

mybatis-xml映射文件及mybatis动态sql

规范 XML映射文件的名称与Mapper接口名称一致&#xff0c;并且将XML映射文件和Mapper接口放置在相同包下(同包同名&#xff09;。 XML映射文件的namespace属性为Mapper接口全限定名一致。 XML映射文件中sql语句的id与Mapper接口中的方法名一致&#xff0c;并保持返回类型一致…...

计算机网络复习——概念强化作业

物理层负责网络通信的二进制传输 用于将MAC地址解析为IP地址的协议为RARP。 一个交换机接收到一帧,其目的地址在它的MAC地址表中查不到,交换机应该向除了来的端口外的所有其它端口转发。 关于ICMP协议,下面的论述中正确的是ICMP可传送IP通信过程中出现的错误信息。 在B类网络…...

用友BIP与旺店通数据集成方案解析

用友BIP与旺店通企业奇门的供应商集成同步方案 在现代企业的数据管理中&#xff0c;跨平台的数据集成是实现高效业务运作的关键环节。本文将分享一个实际案例&#xff1a;如何通过轻易云数据集成平台&#xff0c;将用友BIP系统中的供应商数据无缝对接到旺店通企业奇门&#xf…...

string类函数的手动实现

在上一篇文章中&#xff0c;我们讲解了一些string类的函数&#xff0c;但是对于我们要熟练掌握c是远远不够的&#xff0c;今天&#xff0c;我将手动实现一下这些函数~ 注意&#xff1a;本篇文章中会大量应用复用&#xff0c;这是一种很巧妙的方法 和以往一样&#xff0c;还是…...

Oceanbase离线集群部署

准备工作 两台服务器 服务器的配置参照官网要求来 服务器名配置服务器IPoceanbase116g8h192.168.10.239oceanbase216g8h192.168.10.239 这里选oceanbase1作为 obd机器 oceanbase安装包 选择社区版本的时候自己系统的安装包 ntp时间同步rpm包 联网机器下载所需的软件包 …...

transformers生成式对话机器人

简介 生成式对话机器人是一种先进的人工智能系统&#xff0c;它能够通过学习大量的自然语言数据来模拟人类进行开放、连贯且创造性的对话。与基于规则或检索式的聊天机器人不同&#xff0c;生成式对话机器人并不局限于预定义的回答集&#xff0c;而是可以根据对话上下文动态地…...

WPF中的VisualState(视觉状态)

以前在设置控件样式或自定义控件时&#xff0c;都是使用触发器来进行样式更改。触发器可以在属性值发生更改时启动操作。 像这样&#xff1a; <Style TargetType"ListBoxItem"><Setter Property"Opacity" Value"0.5" /><Setter …...

C#设计模式--状态模式(State Pattern)

状态模式是一种行为设计模式&#xff0c;它允许对象在其内部状态发生变化时改变其行为。这种模式的核心思想是将状态封装在独立的对象中&#xff0c;而不是将状态逻辑散布在整个程序中。 用途 简化复杂的条件逻辑&#xff1a;通过将不同的状态封装在不同的类中&#xff0c;可…...

〔 MySQL 〕索引

目录 1. 没有索引&#xff0c;可能会有什么问题 2. 认识磁盘 MySQL与存储 先来研究一下磁盘&#xff1a; 在看看磁盘中一个盘片​编辑 扇区 定位扇区​编辑 结论 磁盘随机访问(Random Access)与连续访问(Sequential Access) 3. MySQL 与磁盘交互基本单位 4. 建立共识…...

计算机网络研究实训室建设方案

一、概述 本方案旨在规划并实施一个先进的计算机网络研究实训室&#xff0c;旨在为学生提供一个深入学习、实践和研究网络技术的平台。实训室将集教学、实验、研究于一体&#xff0c;覆盖网络基础、网络架构、网络安全、网络管理等多个领域&#xff0c;以培养具备扎实理论基础…...

韩企研学团造访图为科技:共探人工智能创新前沿

今日&#xff0c;一支由韩国知名企业研学专家组成的代表团莅临图为科技深圳总部&#xff0c;展开了一场深度技术交流与研讨活动。 此次访问旨在通过实地探访中国领先的科技企业&#xff0c;促进中韩两国在科技创新领域的深入合作与交流。 韩国游学团合影 图为科技作为一家在人…...

html button 按钮单选且 高亮

<DIV class"middle"> <div class"containerTarget"> <span class"hover-target1" οnclick"btn(1);">韵达 </span> <span class"hover-target2" οnclick"btn(2);">中通 </span…...

图片上传HTML

alioss sky:jwt:# 设置jwt签名加密时使用的秘钥admin-secret-key: itcast# 设置jwt过期时间admin-ttl: 7200000# 设置前端传递过来的令牌名称admin-token-name: tokenalioss:endpoint: ${sky.alioss.endpoint}access-key-id: ${sky.alioss.access-key-id}access-key-secret: $…...

C++学习-函数

C 函数 目录 函数默认参数引用传参函数重载 数量不同类型不同 内联函数 函数默认参数 #include<iostream>using std::cout; using std::endl;int power(int n, int x2); // x2 是默认参数int main() {cout << power(5) << endl; // 没有传 x 的值&#x…...

spring boot 测试 mybatis mapper类

spring boot 测试 mybatis mapper类 针对 mybatis plus不启动 webserver指定加载 xml 【过滤 “classpath*:/mapper/**/*.xml” 下的xml】, mapper xml文件名和mapper java文件名称要一样&#xff0c;是根据文件名称过滤的。默认情况加载和解析所有mapper.xml 自定义 MapperT…...

远程游戏新体验!

在这个数字化的时代&#xff0c;游戏已经不仅限于家里的电视或书房的电脑了。远程游戏&#xff0c;也就是通过远程控制软件在不同地点操作游戏设备&#xff0c;给玩家带来了前所未有的自由和灵活性。RayLink远程控制软件&#xff0c;凭借其出色的性能和专为游戏设计的功能&…...

Let up bring up a linux.part2 [十一]

之前的篇幅中我们已经将 Linux 内核 bringup 起来了&#xff0c;不知道大家有没有去尝试将根文件系统运行起来&#xff0c;今天我就带领大家完成这个事情&#xff0c;可以跟着下面的步骤一步步来完成&#xff1a; 在这里我们使用 busybox 构建 rootfs&#xff1a; 下载 busyb…...

调用大模型api 批量处理图像 保存到excel

最近需要调用大模型&#xff0c;并将结果保存到excel中&#xff0c;效果如下&#xff1a; 代码&#xff1a; import base64 from zhipuai import ZhipuAI import os import pandas as pd from openpyxl import Workbook from openpyxl.drawing.image import Image from io i…...

Android tinyalsa深度解析之pcm_params_get_period_size_max调用流程与实战(一百七十二)

简介&#xff1a; CSDN博客专家、《Android系统多媒体进阶实战》作者 博主新书推荐&#xff1a;《Android系统多媒体进阶实战》&#x1f680; Android Audio工程师专栏地址&#xff1a; Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; Android多媒体专栏地址&a…...

一键搭建AI对话系统:通义千问1.5-1.8B-Chat-GPTQ-Int4镜像使用指南

一键搭建AI对话系统&#xff1a;通义千问1.5-1.8B-Chat-GPTQ-Int4镜像使用指南 想快速拥有一个属于自己的AI对话助手吗&#xff1f;今天要介绍的这个方法&#xff0c;可能比你想象中简单得多。不用折腾复杂的模型下载&#xff0c;不用配置繁琐的运行环境&#xff0c;更不用写一…...

51单片机学习(五)数码管显示

如有大佬发现我文章里的错误&#xff0c;希望多多指出&#xff0c;或者有缺少的也欢迎告诉我&#xff0c;我会尽快补充上去的&#xff0c;感谢各位的支持&#xff0c;要互三的d我哦&#xff01;一.数码管数码管显示屏和U4 74HC245U574H138译码器一位数码管引脚定义一个数码管由…...

如何获取网易云音乐永久链接:终极免费解决方案指南

如何获取网易云音乐永久链接&#xff1a;终极免费解决方案指南 【免费下载链接】netease-cloud-music-api 网易云音乐直链解析 API 项目地址: https://gitcode.com/gh_mirrors/ne/netease-cloud-music-api 你是否曾经遇到过这样的烦恼&#xff1a;好不容易找到一首喜欢的…...

软文SEO的常见指标有哪些_如何撰写有吸引力的软文标题

<h2>软文SEO的常见指标有哪些</h2> <p>在当今的数字营销领域&#xff0c;软文&#xff08;Soft Article&#xff09;已经成为推动网站流量和品牌知名度的重要工具。要让软文真正发挥作用&#xff0c;我们必须了解软文SEO的常见指标&#xff0c;这些指标可以帮…...

深度探索:开源工具OpenCore Legacy Patcher技术揭秘与完整指南

深度探索&#xff1a;开源工具OpenCore Legacy Patcher技术揭秘与完整指南 【免费下载链接】OpenCore-Legacy-Patcher Experience macOS just like before 项目地址: https://gitcode.com/GitHub_Trending/op/OpenCore-Legacy-Patcher 随着苹果系统持续演进&#xff0c;…...

AntdUI实战:用WinForm和.NET 6给老旧内部管理系统“换肤”的完整记录

AntdUI实战&#xff1a;用WinForm和.NET 6给老旧内部管理系统“换肤”的完整记录 当企业内部的WinForm系统运行超过十年&#xff0c;那些灰底蓝框的界面早已与现代审美格格不入。去年接手某制造业ERP系统改造时&#xff0c;我面对的是一个基于.NET Framework 4.0的"古董&q…...

FlowState Lab结合计算机网络概念:模拟智能网络配置助手

FlowState Lab结合计算机网络概念&#xff1a;模拟智能网络配置助手 1. 网络运维的痛点与AI解决方案 网络工程师每天都要面对复杂的网络环境和层出不穷的故障问题。传统排错流程往往需要工程师手动检查设备配置、分析日志信息、查阅技术文档&#xff0c;这个过程耗时耗力且容…...

Simulink仿真速度太慢?试试用C Mex S函数给模型“提提速”

Simulink性能优化实战&#xff1a;用C Mex S函数突破仿真速度瓶颈 当Simulink模型运行缓慢时&#xff0c;工程师们常常陷入漫长的等待。本文将揭示如何通过C Mex S函数这一利器&#xff0c;将仿真速度提升10倍以上&#xff0c;特别适合处理复杂算法、图像处理和大规模系统仿真等…...

Java函数计算部署被低估的致命风险:类加载冲突、内存泄漏、上下文丢失——3个真实P0故障复盘

第一章&#xff1a;Java函数计算部署被低估的致命风险&#xff1a;类加载冲突、内存泄漏、上下文丢失——3个真实P0故障复盘在Serverless架构下&#xff0c;Java函数计算因其启动慢、内存占用高而常被“降级使用”&#xff0c;但更隐蔽的风险来自运行时环境的不可见性。我们复盘…...