029-从零搭建微服务-消息队列(一)
写在最前
如果这个项目让你有所收获,记得 Star 关注哦,这对我是非常不错的鼓励与支持。
源码地址(后端):mingyue: 🎉 基于 Spring Boot、Spring Cloud & Alibaba 的分布式微服务架构基础服务中心
源码地址(前端):mingyue-ui: 🎉 基于 Vue3 + TS + Vite + Element plus 等技术,适配 MingYue 后台微服务
文档地址:Wiki - Gitee.com
消息队列
消息队列(Message Queue)是一种用于在分布式系统中进行异步通信的通信模式和技术。它允许不同的组件或服务之间通过发送和接收消息来进行通信,而无需直接耦合它们的实现细节。消息队列通常用于解耦系统的不同部分,提高系统的可伸缩性、可靠性和灵活性。
以下是消息队列的一些关键特点和概念:
-
消息生产者(Producer): 这是向消息队列发送消息的组件或应用程序。生产者将消息发送到队列中,通常包括一些有关消息内容的元数据。
-
消息队列(Queue): 这是用于存储消息的中间件组件,消息在这里排队等待被处理。消息队列通常支持不同的消息传递模式,例如先进先出(FIFO)或发布/订阅模式。
-
消息消费者(Consumer): 这是从消息队列接收消息并进行处理的组件或应用程序。消费者订阅特定队列,并在有新消息可用时接收并处理它们。
-
消息代理(Message Broker): 这是协调消息的发送和接收的中间件服务。消息代理通常负责消息的路由、传递和确保消息的可靠性。
-
消息确认(Acknowledgment): 消费者在成功处理消息后,通常会向消息队列发送确认,以告知队列消息已被处理。这确保了消息不会被重复处理。
-
消息持久性(Message Durability): 消息队列通常支持消息的持久性,这意味着即使在消息被传递给消费者之后,消息仍然会在系统中存储,以确保不会丢失。
-
消息超时(Message Timeout): 有时候,消息队列会设置消息的超时时间,以确保消息在一定时间内被处理,否则可能会被认为是过期消息。
-
发布/订阅模式(Publish/Subscribe): 这是一种消息传递模式,其中生产者将消息发布到一个主题(topic),而不是特定的队列,然后多个消费者订阅该主题以接收消息。这种模式支持广播消息。
使用场景
-
异步通信:允许不同的系统组件异步通信,提高系统的响应性能。
-
解耦组件:降低系统中不同组件之间的耦合,使得系统更容易维护和扩展。
-
负载均衡:通过分发消息给多个消费者来平衡工作负载。
-
消息传递可靠性:确保消息的可靠传递,即使在系统中的故障情况下也能保证不丢失消息。
-
日志和审计:用于记录和审计系统活动,以便后续分析和故障排除。
技术选型
一些常见的消息队列实现包括 RabbitMQ、RocketMQ、Kafka等,选择适合特定应用场景的消息队列是关键,因为它会影响系统的性能、可靠性和可扩展性。不同的场景可能更适合不同的消息队列系统。
基础对比
| RabbitMQ | RocketMQ | Kafka | |
|---|---|---|---|
| 推出时间 | 2007年 | 2012年 | 2012年 |
| 所属 | Pivotal开源,Mozilla | 阿里开源,Apache | Linkin开源,Apache |
| 社区活跃度 | 高 | 高 | 高 |
| 开发语言 | Erlang | Java | Scala、Java |
| 支持的协议 | AMQP | 自己定义一套 | 自行定义一套(基于TCP) |
| 吞吐量 | 万级(5.95w/s) | 十万级(11.6w/s) | 十万级(17.3w/s) |
| topic数量对吞吐量的影响 | topic达到几百,几千个时,吞吐量会有较小幅度的下降 | topic达到几十,几百个时,吞吐量会大幅度下降 | |
| 时效性 | 微秒级 | 毫秒级 | 毫秒级 |
| 可用性 | 高(主从架构) | 非常高(分布式架构) | 非常高(分布式架构) |
| 使用场景 | 适用于各种规模的应用程序,尤其适合需要多语言支持的场景。 | 适用于大规模的企业应用和互联网场景,尤其在阿里巴巴等大型公司中得到广泛应用。 | 适用于大数据处理、实时数据流分析、事件溯源等高吞吐量场景。 |
功能对比
| RabbitMQ | RocketMQ | Kafka | |
|---|---|---|---|
| 延迟队列 | ✅ | ✅ | ❌ |
| 死信队列 | ✅ | ✅ | ❌ |
| 优先级队列 | ✅ | ❌ | ❌ |
| 消息回溯 | ❌ | ✅ | ✅ |
| 消焦持久化 | ✅ | ✅ | ✅ |
| 消魚确认机制 | 单条 | Offset | Offset |
| 消息TTL | ✅ | ✅ | ❌ |
| 消息重复 | 支持at least once、at most once | 支持at least once | 支持at least once、at most once |
| 消息顺序性 | ❌ | 消费者加锁 | 分区有序 |
| 消息事务 | ❌ | ✅ | ❌ |
| 消息过滤 | ❌ | ✅ | ❌ |
| 消息查询 | ✅ | ✅ | ❌ |
| 消息重新消费 | ❌ | ✅ | ✅ |
| 消费模式 | 队列模式 | 广播模式+集群模式 | 流模式 |
| 消费推拉模式 | Pull、Push | Pull、Push | Pull |
| 批量发送 | ❌ | ✅ | ✅ |
选型总结
通过对RabbitMQ、RocketMQ、Kafka 基础与功能两个维度对比,本项目将采用 RocketMQ、Kafka 两个消息队列。
RocketMQ 适用场景
-
高性能、高可用性的消息传递场景,例如实时数据分析、电商秒杀等。
-
需要强大的消息过滤和消息追踪功能的场景,例如广告投放、用户推送等。
-
需要分布式事务支持的场景,RocketMQ提供了分布式事务消息特性。
Kafka 适用场景
-
需要高吞吐量和低延迟的实时数据处理场景,例如用户行为日志分析、实时监控等。
-
需要保留大量历史数据并支持数据回溯的场景,例如大数据分析、数据仓库等。
-
需要构建事件驱动架构的场景,Kafka可以作为事件源和消息总线。
Docker 安装 RocketMQ
创建目录结构
具体内容可以参考:mingyue/docker/rocketmq
rocketmq/broker1/confbroker.conf/logsREADME.md/storeREADME.md/namesrv/logsREADME.md docker-compose.yml
编写 docker-compose rocketmq 服务
version: '3.8' services:mingyue-mqnamesrv:image: apache/rocketmq:4.9.4container_name: mingyue-mqnamesrvports:- "9876:9876"environment:JAVA_OPT: -server -Xms512m -Xmx512mcommand: sh mqnamesrvvolumes:- ./rocketmq/namesrv/logs:/home/rocketmq/logs/rocketmqlogs mingyue-mqbroker1:image: apache/rocketmq:4.9.4container_name: mingyue-mqbroker1ports:- "10911:10911"- "10909:10909"- "10912:10912"environment:JAVA_OPT_EXT: -server -Xms512M -Xmx512M -Xmn256mcommand: sh mqbroker -c /home/rocketmq/rocketmq-4.9.4/conf/broker.confdepends_on:- mingyue-mqnamesrvvolumes:- ./rocketmq/broker1/conf/broker.conf:/home/rocketmq/rocketmq-4.9.4/conf/broker.conf- ./rocketmq/broker1/logs:/home/rocketmq/logs/rocketmqlogs- ./rocketmq/broker1/store:/home/rocketmq/store mingyue-mqconsole:image: styletang/rocketmq-console-ngcontainer_name: mingyue-mqconsoleports:- "19876:19876"links:- mingyue-mqnamesrv:mqnamesrv #可以用mqnamesrv这个域名访问rocketmq服务environment:JAVA_OPTS: -Dserver.port=19876 -Drocketmq.namesrv.addr=mqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=falsedepends_on:- mingyue-mqnamesrv
启动测试
启动前先执行部分目录赋予读写权限,例:
chmod 777 /docker/rocketmq/broker1/logs
访问 mingyue-mqconsole 可以打开 Dashboard 页面即可:http://ip:19876/#/
Docker 安装 Kafka
创建目录结构
具体内容可以参考:mingyue/docker/kafka
kafka/dataREADME.md docker-compose.yml
编写 docker-compose kafka 服务
version: '3.8' services:mingyue-zookeeper:image: 'bitnami/zookeeper:3.8.0'container_name: mingyue-zookeeperports:- "2181:2181"environment:TZ: Asia/ShanghaiALLOW_ANONYMOUS_LOGIN: "yes"ZOO_SERVER_ID: 1ZOO_PORT_NUMBER: 2181# 自带的控制台 一般用不上可自行开启ZOO_ENABLE_ADMIN_SERVER: "no"# 自带控制台的端口ZOO_ADMIN_SERVER_PORT_NUMBER: 8080 mingyue-kafka:image: 'bitnami/kafka:3.2.0'container_name: mingyue-kafkaports:- "9092:9092"environment:TZ: Asia/Shanghai# 更多变量 查看文档 https://github.com/bitnami/bitnami-docker-kafka/blob/master/README.mdKAFKA_BROKER_ID: 1# 监听端口KAFKA_CFG_LISTENERS: PLAINTEXT://:9092# 实际访问ip 本地用 127 内网用 192 外网用 外网ipKAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://宿主机IP:9092KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181ALLOW_PLAINTEXT_LISTENER: "yes"volumes:- /docker/kafka/data:/bitnami/kafka/datadepends_on:- mingyue-zookeeperlinks:- mingyue-zookeeper:zookeeper #可以用zookeeper这个域名访问zookeeper服务 mingyue-kafka-manager:image: sheepkiller/kafka-manager:latestcontainer_name: mingyue-kafka-managerports:- "19092:19092"environment:ZK_HOSTS: mingyue-zookeeper:2181APPLICATION_SECRET: letmeinKAFKA_MANAGER_USERNAME: mingyueKAFKA_MANAGER_PASSWORD: mingyue123KM_ARGS: -Dhttp.port=19092depends_on:- mingyue-kafkalinks:- mingyue-zookeeper:zookeeper #可以用zookeeper这个域名访问zookeeper服务
启动测试
启动前先执行部分目录赋予读写权限,例:chmod 777 /docker/kafka/data`
访问 mingyue-kafka-manager 可以打开 Clusters 页面即可:http://mingyue-mq:19092/
Spring Cloud Stream
Spring Cloud Stream 是一个用于构建与共享消息系统连接的高度可扩展的事件驱动微服务的框架。该框架提供了一个基于已经建立和熟悉的 Spring 成语和最佳实践的灵活编程模型,包括支持持久的 pub/sub 语义、消费者组和有状态分区。
说人话:Spring Cloud Stream 是 Spring 用来整合各种 MQ 中间件的框架。
Spring Cloud Stream的核心构建块
-
Destination Binders(目标绑定器):目标指的是 Kafka 还是 RabbitMQ,绑定器就是封装了目标中间件的包。如果操作的是 kafka 就使用 kafka binder ,如果操作的是 RabbitMQ 就使用 rabbitmq binder。
-
Destination Bindings(目标绑定):MQ 中间件与最终用户提供的应用程序代码(生产者/消费者)之间的桥梁,提供消息的“生产者”和“消费者”(由目标绑定器创建)
-
Message(消息):一种规范化的数据结构,生产者和消费者基于这个数据结构通过外部消息系统与目标绑定器和其他应用程序通信。
Spring Cloud Stream 架构图
Spring Cloud Stream 应用程序由中间件中立的核心组成。该应用程序通过在外部代理暴露的目的地和代码中的输入/输出参数之间建立绑定,与外部世界进行通信。建立绑定所需的经纪人特定细节由特定于中间件的 Binder 实现处理。

-
Middleware:消息中间件,如RabbitMQ、Kafka、RocketMQ等。
-
Binder:可以认为是适配器,用来将Stream与中间件连接起来,不同的Binder对应不同的中间件,需要我们配置。
-
Application:由Stream封装的消息机制,很少自定义开发。
-
Inputs:输入,可以自定义开发。
-
Outputs:输出,可以自定义开发。
小结
本节介绍了什么是消息队列、以及选择什么样的消息队列,如何对比,最终选择了 Kafka 与 RocketMQ。然后给出了 Docker 一件部署 Kafka 与 RocketMQ 的 docker-compose 脚本。阐述了什么是 Spring Cloud Stream,未来将会使用 Spring Cloud Stream 作为 MQ 中间价的框架。
下面我们就使用 Spring Cloud Stream 来搭建代码与 MQ 之间的桥梁~~~
相关文章:
029-从零搭建微服务-消息队列(一)
写在最前 如果这个项目让你有所收获,记得 Star 关注哦,这对我是非常不错的鼓励与支持。 源码地址(后端):mingyue: 🎉 基于 Spring Boot、Spring Cloud & Alibaba 的分布式微服务架构基础服务中心 源…...
Python2020年06月Python二级 -- 编程题解析
题目一 数字转汉字 用户输入一个1~9(包含1和9)之间的任一数字,程序输出对应的汉字。 如输入2,程序输出“二”。可重复查询。 答案: 方法一 list1[一,二,三,四,五,六,七,八,九] while True:n int(input(请输入1~9之间任意一个数字…...
差分放大器的精髓:放大差模信号 抑制共模信号
参考如图基本的差分放大电路,在R1R2 R3R4的条件下,其输出与输入的关系为 : 具体推导过程参考:差分运算放大器的放大倍数的计算及结论_正在黑化的KS的博客-CSDN博客 由这个式子我们可以发现,差分放大器放大的是同相端与…...
蓝桥等考Python组别九级006
第一部分:选择题 1、Python L9 (15分) 运行下面程序,可以输出几行“*”?( ) for i in range(6): for j in range(7): print(*, end ) print() 5678 正确答案:B 2、Python …...
初级篇—第五章子查询
文章目录 什么是子查询需求分析与问题解决子查询的基本语法结构子查询的分类 单行子查询单行比较操作符代码示例HAVING 中的子查询CASE中的子查询子查询中的空值问题非法使用子查询 多行子查询多行比较操作符代码示例空值问题 相关子查询代码示例在ORDER BY 中使用子查询EXISTS…...
【AntDesign】封装全局异常处理-全局拦截器
[toc] 场景 本文前端用的是阿里的Ant-Design框架,其他框架也有全局拦截器,思路是相同,具体实现自行百度下吧 因为每次都需要调接口,都需要单独处理异常情况(code !0),因此前端需要对后端返回的…...
Visual Studio 代码显示空格等空白符
1.VS2010: 快捷键:CtrlR,W 2.VS2017、VS2019、VS2022: 工具 -> 选项 -> 文本编辑器 -> 显示 -> 勾选查看空白...
紫光同创FPGA图像视频采集系统,基于OV7725实现,提供工程源码和技术支持
目录 1、前言免责声明 2、设计思路框架视频源选择OV7725摄像头配置及采集动态彩条HDMA图像缓存输入输出视频HDMA缓冲FIFOHDMA控制模块HDMI输出 3、PDS工程详解4、上板调试验证并演示准备工作静态演示动态演示 5、福利:工程源码获取 紫光同创FPGA图像视频采集系统&am…...
京东大型API网关实践之路
概述 1、背景 京东作为电商平台,近几年用户、业务持续增长,访问量持续上升,随着这些业务的发展,API网关应运而生。 API网关,就是为了解放客户端与服务端而存在的。对于客户端,使开放给客户端的接口标准统…...
图像处理: 马赛克艺术
马赛克 第一章 马赛克的历史渊源 1.1 马赛克 艺术中的一种表面装饰,由紧密排列的、通常颜色各异的小块材料(如石头、矿物、玻璃、瓷砖或贝壳)组成。与镶嵌不同的是,镶嵌是将要应用的部件放置在已挖空以容纳设计的表面中࿰…...
postgresql-管理数据表
postgresql-管理数据表 创建表数据类型字段约束表级约束模式搜索路径 修改表添加字段删除字段添加约束删除约束修改字段默认值修改字段数据类型重命名字段重命名表 删除表 创建表 在 PostgreSQL 中,使用 CREATE TABLE 语句创建一个新表: CREATE TABLE …...
Llama2-Chinese项目:3.1-全量参数微调
提供LoRA微调和全量参数微调代码,训练数据为data/train_sft.csv,验证数据为data/dev_sft.csv,数据格式如下所示: "<s>Human: "问题"\n</s><s>Assistant: "答案举个例子,如下所…...
蓝桥等考Python组别十级001
第一部分:选择题 1、Python L10 (15分) 已知s = Hello!,下列说法正确的是( )。 s[1]对应的字符是Hs[2]对应的字符是ls[-1]对应的字符是os[3]对应的字符是o正确答案:B 2、Python L10 (15分) 运行下面程序,输入字符串“Banana”,输出的结果是&#x...
记录 Git 操作时遇到的问题及解决方案
目录 问题:git pull 时报错报错内容: ! [rejected] v1.0.3 -> v1.0.3 (would clobber existing tag)原因:本地 Git 仓库中已经存在名为 v1.0.3 和 v1.0.6 的标签了,而尝试从远程仓库(GitHub)拉取这些标签…...
第一届“龙信杯”电子数据取证竞赛Writeup
目录 移动终端取证 请分析涉案手机的设备标识是_______。(标准格式:12345678) 请确认嫌疑人首次安装目标APP的安装时间是______。(标准格式:2023-09-13.11:32:23) 此检材共连接过______个WiFi。&#x…...
Vue与React//双绑问题
Vue和React是两个目前最流行的前端框架,它们有一些区别主要区别如下: 响应式原理:Vue使用基于模板的方式进行双向绑定,其中使用了Vue自己实现的响应式系统。Vue能够通过追踪数据的依赖关系,自动更新DOM元素。而React采…...
信息安全第四周
社会工程学 社会工程学主要研究如何操纵人的心理和情感来获取机密信息或其他目标。它主要不是通过技术手段攻击计算机系统,而是通过心理学和人际交往技巧来欺骗人,使他们泄露密码、安全代码或其他敏感信息。社会工程学主要是一种安全风险,主要…...
机器学习基础概念与常见算法入门【机器学习、常见模型】
机器学习基础概念与算法 机器学习是计算机科学领域的一个分支,它致力于让计算机系统具备从数据中学习和改进的能力,而不需要显式地进行编程。与传统编程相比,机器学习有着根本性的不同之处。 机器学习与传统编程的不同 传统编程࿱…...
移动端 [Android iOS] 压缩 ECDSA PublicKey
移动端 [Android & iOS] 压缩 ECDSA PublicKey AndroidiOS 使用 Android KeyStore 和 iOS 的 Secure Enclave 提供的安全能力使用 P-256 来对 API 请求进行签名,服务器端再进行验证。 但是发现不论是 iOS 还是安卓都没有提供一个便捷的方式从 iOS 的SecKeyCopyE…...
Spring的配置Bean的方式
在Spring框架中,配置Bean有三种主要方式:自动装配、基于Java的显式配置和基于XML的显式配置。 1、自动装配: 自动装配是Spring容器根据Bean之间的依赖关系,自动将需要的Bean注入到目标Bean中。这是一种非常简便和快捷的配置方式&…...
脑机新手指南(八):OpenBCI_GUI:从环境搭建到数据可视化(下)
一、数据处理与分析实战 (一)实时滤波与参数调整 基础滤波操作 60Hz 工频滤波:勾选界面右侧 “60Hz” 复选框,可有效抑制电网干扰(适用于北美地区,欧洲用户可调整为 50Hz)。 平滑处理&…...
阿里云ACP云计算备考笔记 (5)——弹性伸缩
目录 第一章 概述 第二章 弹性伸缩简介 1、弹性伸缩 2、垂直伸缩 3、优势 4、应用场景 ① 无规律的业务量波动 ② 有规律的业务量波动 ③ 无明显业务量波动 ④ 混合型业务 ⑤ 消息通知 ⑥ 生命周期挂钩 ⑦ 自定义方式 ⑧ 滚的升级 5、使用限制 第三章 主要定义 …...
Java如何权衡是使用无序的数组还是有序的数组
在 Java 中,选择有序数组还是无序数组取决于具体场景的性能需求与操作特点。以下是关键权衡因素及决策指南: ⚖️ 核心权衡维度 维度有序数组无序数组查询性能二分查找 O(log n) ✅线性扫描 O(n) ❌插入/删除需移位维护顺序 O(n) ❌直接操作尾部 O(1) ✅内存开销与无序数组相…...
《用户共鸣指数(E)驱动品牌大模型种草:如何抢占大模型搜索结果情感高地》
在注意力分散、内容高度同质化的时代,情感连接已成为品牌破圈的关键通道。我们在服务大量品牌客户的过程中发现,消费者对内容的“有感”程度,正日益成为影响品牌传播效率与转化率的核心变量。在生成式AI驱动的内容生成与推荐环境中࿰…...
Spring Boot面试题精选汇总
🤟致敬读者 🟩感谢阅读🟦笑口常开🟪生日快乐⬛早点睡觉 📘博主相关 🟧博主信息🟨博客首页🟫专栏推荐🟥活动信息 文章目录 Spring Boot面试题精选汇总⚙️ **一、核心概…...
C# 类和继承(抽象类)
抽象类 抽象类是指设计为被继承的类。抽象类只能被用作其他类的基类。 不能创建抽象类的实例。抽象类使用abstract修饰符声明。 抽象类可以包含抽象成员或普通的非抽象成员。抽象类的成员可以是抽象成员和普通带 实现的成员的任意组合。抽象类自己可以派生自另一个抽象类。例…...
EtherNet/IP转DeviceNet协议网关详解
一,设备主要功能 疆鸿智能JH-DVN-EIP本产品是自主研发的一款EtherNet/IP从站功能的通讯网关。该产品主要功能是连接DeviceNet总线和EtherNet/IP网络,本网关连接到EtherNet/IP总线中做为从站使用,连接到DeviceNet总线中做为从站使用。 在自动…...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...
如何在最短时间内提升打ctf(web)的水平?
刚刚刷完2遍 bugku 的 web 题,前来答题。 每个人对刷题理解是不同,有的人是看了writeup就等于刷了,有的人是收藏了writeup就等于刷了,有的人是跟着writeup做了一遍就等于刷了,还有的人是独立思考做了一遍就等于刷了。…...
Angular微前端架构:Module Federation + ngx-build-plus (Webpack)
以下是一个完整的 Angular 微前端示例,其中使用的是 Module Federation 和 npx-build-plus 实现了主应用(Shell)与子应用(Remote)的集成。 🛠️ 项目结构 angular-mf/ ├── shell-app/ # 主应用&…...
