当前位置: 首页 > news >正文

Spring Boot 使用 Disruptor 做内部高性能消息队列

这里写自定义目录标题

  • 一 、背景
  • 二 、Disruptor介绍
  • 三 、Disruptor 的核心概念
    • 3.1 Ring Buffer
    • 3.2 Sequence Disruptor
    • 3.3 Sequencer
    • 3.4 Sequence Barrier
    • 3.5 Wait Strategy
    • 3.6 Event
    • 3.7 EventProcessor
    • 3.8 EventHandler
    • 3.9 Producer
  • 四、案例-demo
  • 五、总结

一 、背景

工作中遇到项目使用Disruptor做消息队列,对你没看错,不是Kafka也不是rabbitmq。Disruptor有个最大的优点就是快,还有一点它是开源的哦,下面做个简单的记录。

二 、Disruptor介绍

  1. Disruptor 是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于 Disruptor 开发的系统单线程能支撑每秒 600 万订单,2010 年在 QCon 演讲后,获得了业界关注。;
  2. Disruptor是一个开源的Java框架,它被设计用于在生产者—消费者(producer-consumer problem,简称PCP)问题上获得尽量高的吞吐量(TPS)和尽量低的延迟;
  3. 从功能上来看,Disruptor 是实现了“队列”的功能,而且是一个有界队列。那么它的应用场景自然就是“生产者-消费者”模型的应用场合了;
  4. Disruptor是LMAX在线交易平台的关键组成部分,LMAX平台使用该框架对订单处理速度能达到600万TPS,除金融领域之外,其他一般的应用中都可以用到Disruptor,它可以带来显著的性能提升;
  5. 其实Disruptor与其说是一个框架,不如说是一种设计思路,这个设计思路对于存在“并发、缓冲区、生产者—消费者模型、事务处理”这些元素的程序来说,Disruptor提出了一种大幅提升性能(TPS)的方案;
    Disruptor的github主页

三 、Disruptor 的核心概念

先从了解 Disruptor 的核心概念开始,来了解它是如何运作的。下面介绍的概念模型,既是领域对象,也是映射到代码实现上的核心对象。

3.1 Ring Buffer

如其名,环形的缓冲区。曾经 RingBuffer 是 Disruptor 中的最主要的对象,但从3.0版本开始,其职责被简化为仅仅负责对通过 Disruptor 进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中,Ring Buffer 可以由用户的自定义实现来完全替代。

3.2 Sequence Disruptor

通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个 Sequence 用于跟踪标识某个特定的事件处理者( RingBuffer/Consumer )的处理进度。虽然一个 AtomicLong 也可以用于标识进度,但定义 Sequence 来负责该问题还有另一个目的,那就是防止不同的 Sequence 之间的CPU缓存伪共享(Flase Sharing)问题。

3.3 Sequencer

Sequencer 是 Disruptor 的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。

3.4 Sequence Barrier

用于保持对RingBuffer的 main published Sequence 和Consumer依赖的其它Consumer的 Sequence 的引用。Sequence Barrier 还定义了决定 Consumer 是否还有可处理的事件的逻辑。

3.5 Wait Strategy

定义 Consumer 如何进行等待下一个事件的策略。(注:Disruptor 定义了多种不同的策略,针对不同的场景,提供了不一样的性能表现)

3.6 Event

在 Disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。

3.7 EventProcessor

EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。

3.8 EventHandler

Disruptor 定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现。

3.9 Producer

即生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型。
在这里插入图片描述

四、案例-demo

通过下面8个步骤,你就能将Disruptor Get回家啦:
1、添加pom.xml依赖

<dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>3.3.4</version>
</dependency>

2、消息体Model

/*** 消息体*/
@Data
public class MessageModel {private String message;
}

3、构造EventFactory

public class HelloEventFactory implements EventFactory<MessageModel> {@Overridepublic MessageModel newInstance() {return new MessageModel();}
}

4、构造EventHandler-消费者

@Slf4j
public class HelloEventHandler implements EventHandler<MessageModel> {@Overridepublic void onEvent(MessageModel event, long sequence, boolean endOfBatch) {try {//这里停止1000ms是为了确定消费消息是异步的Thread.sleep(1000);log.info("消费者处理消息开始");if (event != null) {log.info("消费者消费的信息是:{}",event);}} catch (Exception e) {log.info("消费者处理消息失败");}log.info("消费者处理消息结束");}
}

5、构造BeanManager

/*** 获取实例化对象*/
@Component
public class BeanManager implements ApplicationContextAware {private static ApplicationContext applicationContext = null;@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}public static ApplicationContext getApplicationContext() { return applicationContext; }public static Object getBean(String name) {return applicationContext.getBean(name);}public static <T> T getBean(Class<T> clazz) {return applicationContext.getBean(clazz);}
}

6、构造MQManager

@Configuration
public class MQManager {@Bean("messageModel")public RingBuffer<MessageModel> messageModelRingBuffer() {//定义用于事件处理的线程池, Disruptor通过java.util.concurrent.ExecutorSerivce提供的线程来触发consumer的事件处理ExecutorService executor = Executors.newFixedThreadPool(2);//指定事件工厂HelloEventFactory factory = new HelloEventFactory();//指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率int bufferSize = 1024 * 256;//单线程模式,获取额外的性能Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor,ProducerType.SINGLE, new BlockingWaitStrategy());//设置事件业务处理器---消费者disruptor.handleEventsWith(new HelloEventHandler());// 启动disruptor线程disruptor.start();//获取ringbuffer环,用于接取生产者生产的事件RingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();return ringBuffer;}

7、构造Mqservice和实现类-生产者

public interface DisruptorMqService {/*** 消息* @param message*/void sayHelloMq(String message);
}@Slf4j
@Component
@Service
public class DisruptorMqServiceImpl implements DisruptorMqService {@Autowiredprivate RingBuffer<MessageModel> messageModelRingBuffer;@Overridepublic void sayHelloMq(String message) {log.info("record the message: {}",message);//获取下一个Event槽的下标long sequence = messageModelRingBuffer.next();try {//给Event填充数据MessageModel event = messageModelRingBuffer.get(sequence);event.setMessage(message);log.info("往消息队列中添加消息:{}", event);} catch (Exception e) {log.error("failed to add event to messageModelRingBuffer for : e = {},{}",e,e.getMessage());} finally {//发布Event,激活观察者去消费,将sequence传递给改消费者//注意最后的publish方法必须放在finally中以确保必须得到调用;如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producermessageModelRingBuffer.publish(sequence);}}
}

8、构造测试类及方法

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = DemoApplication.class)
public class DemoApplicationTests {@Autowiredprivate DisruptorMqService disruptorMqService;/*** 项目内部使用Disruptor做消息队列* @throws Exception*/@Testpublic void sayHelloMqTest() throws Exception{disruptorMqService.sayHelloMq("消息到了,Hello world!");log.info("消息队列已发送完毕");//这里停止2000ms是为了确定是处理消息是异步的Thread.sleep(2000);}
}

测试运行结果

2020-04-05 14:31:18.543  INFO 7274 --- [           main] c.e.u.d.d.s.Impl.DisruptorMqServiceImpl  : record the message: 消息到了,Hello world!
2020-04-05 14:31:18.545  INFO 7274 --- [           main] c.e.u.d.d.s.Impl.DisruptorMqServiceImpl  : 往消息队列中添加消息:MessageModel(message=消息到了,Hello world!)
2020-04-05 14:31:18.545  INFO 7274 --- [           main] c.e.utils.demo.DemoApplicationTests      : 消息队列已发送完毕
2020-04-05 14:31:19.547  INFO 7274 --- [pool-1-thread-1] c.e.u.d.disrupMq.mq.HelloEventHandler    : 消费者处理消息开始
2020-04-05 14:31:19.547  INFO 7274 --- [pool-1-thread-1] c.e.u.d.disrupMq.mq.HelloEventHandler    : 消费者消费的信息是:MessageModel(message=消息到了,Hello world!)
2020-04-05 14:31:19.547  INFO 7274 --- [pool-1-thread-1] c.e.u.d.disrupMq.mq.HelloEventHandler    : 消费者处理消息结束

五、总结

其实 生成者 -> 消费者 模式是很常见的,通过一些消息队列也可以轻松做到上述的效果。不同的地方在于,Disruptor 是在内存中以队列的方式去实现的,而且是无锁的。这也是 Disruptor 为什么高效的原因。

相关文章:

Spring Boot 使用 Disruptor 做内部高性能消息队列

这里写自定义目录标题 一 、背景二 、Disruptor介绍三 、Disruptor 的核心概念3.1 Ring Buffer3.2 Sequence Disruptor3.3 Sequencer3.4 Sequence Barrier3.5 Wait Strategy3.6 Event3.7 EventProcessor3.8 EventHandler3.9 Producer 四、案例-demo五、总结 一 、背景 工作中遇…...

一、灵动mm32单片机_开发环境的搭建(Keil)

1、安装Keil MDK。 略。 2、安装芯片对应的Pack包。 (1)这里以MM32F0130单片机为例。 (2)进入灵动微电子官网。上海灵动微电子股份有限公司 (3)点击“支持”→“KEILPacl”。 (3)点击下载Pack包。 (4)下载后&#xff0c;解压下载的压缩包&#xff0c;找到对应的Pack包&…...

【5G PHY】5G SS/PBCH块介绍(二)

博主未授权任何人或组织机构转载博主任何原创文章&#xff0c;感谢各位对原创的支持&#xff01; 博主链接 本人就职于国际知名终端厂商&#xff0c;负责modem芯片研发。 在5G早期负责终端数据业务层、核心网相关的开发工作&#xff0c;目前牵头6G算力网络技术标准研究。 博客…...

简单而高效:使用PHP爬虫从网易音乐获取音频的方法

概述 网易音乐是一个流行的在线音乐平台&#xff0c;提供了海量的音乐资源和服务。如果你想从网易音乐下载音频文件&#xff0c;你可能会遇到一些困难&#xff0c;因为网易音乐对其音频资源进行了加密和防盗链的处理。本文将介绍一种使用PHP爬虫从网易音乐获取音频的方法&…...

渗透测试工具-sqlmap使用

sqlmap是一个开源渗透测试的自动化工具&#xff0c;可以自动检测和利用SQL注入漏洞并接管数据库服务器。它配备了一个强大的检测引擎&#xff0c;许多用于终极渗透测试的利基功能&#xff0c;以及广泛的开关&#xff0c;包括数据库指纹识别、从数据库中获取数据、访问底层文件系…...

C# WPF: Imag图片填充方式有哪些?

C#和WPF中的图像填充方式 在WPF中&#xff0c;你可以使用Image控件来显示图像&#xff0c;并使用不同的填充方式来控制图像在控件中的显示方式。以下是一些常见的图像填充方式&#xff1a; Stretch&#xff08;拉伸&#xff09;&#xff1a;这是默认的填充方式&#xff0c;它…...

uniapp开发小程序—根据生日日期计算年龄 周岁

0、需求 在UniApp开发小程序中&#xff0c;将接口返回的出生日期转化为年龄&#xff1b;判断接口返回的年龄是否是周岁 可以使用JavaScript的日期处理方法来实现。 一、第一种方式&#xff08;示例代码&#xff09;&#xff1a; //javascript // 假设接口返回的年龄为生日的…...

windows下基于vscode的ssh服务远程连接ubuntu服务器

Ubuntu端配置 1.确保ubuntu端已启用ssh服务 首先&#xff0c;安装ssh服务 sudo apt-get install openssh-server 安装后&#xff0c;打开ssh服务 sudo service ssh start 如果显示有sshd就说明成功了。 判断是否成功打开 ps -e|grep ssh 同时也可以通过如下方式确保ss…...

OpenCV学习(二)——OpenCV中绘图功能

2. OpenCV中绘图功能2.1 画线2.2 画矩形2.3 画圆2.4 画多边形2.5 添加文本 2. OpenCV中绘图功能 绘图可以实现画线、画矩形、画圆、画多边形和添加文本等操作。 import cv2 import numpy as np# 读取图像 img cv2.imread(lena.jpg)# 画直线 cv2.line(img, (0, 0), (512, 512…...

业务架构、应用架构、技术架构、数据架构

架构规划的重要性 如果没有进行合理的架构规划&#xff0c;将会引发一系列的问题。为了避免这些问题的发生&#xff0c;企业需要进行业务架构、应用架构、技术架构和数据架构的全面规划和设计&#xff0c;以构建一个清晰、可持续发展的企业架构。 https://www.zhihu.com/que…...

独创改进 | RT-DETR 引入 Asymptotic Hybrid Encoder | 渐进混合特征解码结构

本专栏内容均为博主独家全网首发,未经授权,任何形式的复制、转载、洗稿或传播行为均属违法侵权行为,一经发现将采取法律手段维护合法权益。我们对所有未经授权传播行为保留追究责任的权利。请尊重原创,支持创作者的努力,共同维护网络知识产权。 文章目录 网络结构实验结果…...

SpringCloudAlibaba实战-nacos集群部署

写在前面&#xff1a;在学习阶段&#xff0c;我们想快速学习SpringCloudAlibaba功能&#xff0c;但总是花费大量时间跟着视频或博客做组件配置。由于版本的更迭&#xff0c;我们学习时的组件版本很可能和作者的不一致&#xff0c;又或者是各自环境不一&#xff0c;只能一坑又一…...

Elasticsearch安装IK分词器

ik分词包 参考博客、参考博客 将下载好的zip包解压&#xff0c;生成一个ik文件夹 将ik文件夹移动到ES安装目录下的plugins文件夹下&#xff08;每台ES节点都要执行相同的操作&#xff09; 重启ES集群 坑...

『51单片机』 DS1302时钟

&#x1f6a9; WRITE IN FRONT &#x1f6a9; &#x1f50e; 介绍&#xff1a;"謓泽"正在路上朝着"攻城狮"方向"前进四" &#x1f50e;&#x1f3c5; 荣誉&#xff1a;2021|2022年度博客之星物联网与嵌入式开发TOP5|TOP4、2021|2222年获评百大…...

ubuntu部署个人网盘nextCloud使用docker-compose方式

概述 当下各大网盘的容量都是有限制的&#xff0c;而且xx云不开会员网速就拉跨。 所以就想搭建一个自己的盘&#xff0c;并且可以控制用户的权限分组&#xff1b; nextCloud就很合适 我这边都是自己用偶尔给其他人使用下&#xff0c;所以直接docker部署了。 ubuntu版本&…...

【ChatGPT 01】ChatGPT基础科普

1. 从图灵测试到ChatGPT 1950年&#xff0c;艾伦•图灵(Alan Turing)发表论文**《计算机器与智能》&#xff08; Computing Machinery and Intelligence&#xff09;&#xff0c;提出并尝试回答“机器能否思考”这一关键问题。在论文中&#xff0c;图灵提出了“模仿游戏”&…...

2317.操作后的最大异或和

非常好的一个位运算推公式题目 首先num[i]^x可以知道 这里可以变成任意一个数字 又有num[i]&上上面的数字 所以我们可以扣掉任意位的1把它变成0 答案让我们求异或和 所以只要这一位有1 答案的这一位就有1 我们发现这就是一个按位或运算 class Solution { public:int maxi…...

Python爬虫-经典案例详解

爬虫一般指从网络资源的抓取&#xff0c;通过Python语言的脚本特性&#xff0c;配置字符的处理非常灵活&#xff0c;Python有丰富的网络抓取模块&#xff0c;因而两者经常联系在一起Python就被叫作爬虫。爬虫可以抓取某个网站或者某个应用的内容提取有用的价值信息。有时还可以…...

【信创】银河麒麟V10 安装postgis

安装postGis步骤 1、安装 proj4 #tar -zxvf proj-4.8.0.tar.gz #cd proj-4.8.0 #mkdir -p /opt/proj-4.8.0 #./configure --prefix=/opt/proj-4.8.0 #make && make install #vi /etc/ld.so.conf.d/proj-4.8.0.conf #ldconfig 2、安装 geos #tar -xjf geos-3.6.1.tar.b…...

OpenCV常用功能——灰度处理和图像二值化处理

文章目录 一、灰度处理1.1 cvtColor函数 二、图像二值化处理2.1 全局阈值2.2 自适应阈值 一、灰度处理 1.1 cvtColor函数 函数原型&#xff1a; cv2.cvtColor(src, code[, dst[, dstCn]]) -> dst功能&#xff1a;转换图像颜色空间。 参数&#xff1a; src: 输入图像。co…...

Android Wi-Fi 连接失败日志分析

1. Android wifi 关键日志总结 (1) Wi-Fi 断开 (CTRL-EVENT-DISCONNECTED reason3) 日志相关部分&#xff1a; 06-05 10:48:40.987 943 943 I wpa_supplicant: wlan0: CTRL-EVENT-DISCONNECTED bssid44:9b:c1:57:a8:90 reason3 locally_generated1解析&#xff1a; CTR…...

synchronized 学习

学习源&#xff1a; https://www.bilibili.com/video/BV1aJ411V763?spm_id_from333.788.videopod.episodes&vd_source32e1c41a9370911ab06d12fbc36c4ebc 1.应用场景 不超卖&#xff0c;也要考虑性能问题&#xff08;场景&#xff09; 2.常见面试问题&#xff1a; sync出…...

day52 ResNet18 CBAM

在深度学习的旅程中&#xff0c;我们不断探索如何提升模型的性能。今天&#xff0c;我将分享我在 ResNet18 模型中插入 CBAM&#xff08;Convolutional Block Attention Module&#xff09;模块&#xff0c;并采用分阶段微调策略的实践过程。通过这个过程&#xff0c;我不仅提升…...

基于ASP.NET+ SQL Server实现(Web)医院信息管理系统

医院信息管理系统 1. 课程设计内容 在 visual studio 2017 平台上&#xff0c;开发一个“医院信息管理系统”Web 程序。 2. 课程设计目的 综合运用 c#.net 知识&#xff0c;在 vs 2017 平台上&#xff0c;进行 ASP.NET 应用程序和简易网站的开发&#xff1b;初步熟悉开发一…...

基于Flask实现的医疗保险欺诈识别监测模型

基于Flask实现的医疗保险欺诈识别监测模型 项目截图 项目简介 社会医疗保险是国家通过立法形式强制实施&#xff0c;由雇主和个人按一定比例缴纳保险费&#xff0c;建立社会医疗保险基金&#xff0c;支付雇员医疗费用的一种医疗保险制度&#xff0c; 它是促进社会文明和进步的…...

Python如何给视频添加音频和字幕

在Python中&#xff0c;给视频添加音频和字幕可以使用电影文件处理库MoviePy和字幕处理库Subtitles。下面将详细介绍如何使用这些库来实现视频的音频和字幕添加&#xff0c;包括必要的代码示例和详细解释。 环境准备 在开始之前&#xff0c;需要安装以下Python库&#xff1a;…...

浅谈不同二分算法的查找情况

二分算法原理比较简单&#xff0c;但是实际的算法模板却有很多&#xff0c;这一切都源于二分查找问题中的复杂情况和二分算法的边界处理&#xff0c;以下是博主对一些二分算法查找的情况分析。 需要说明的是&#xff0c;以下二分算法都是基于有序序列为升序有序的情况&#xf…...

安宝特方案丨船舶智造的“AR+AI+作业标准化管理解决方案”(装配)

船舶制造装配管理现状&#xff1a;装配工作依赖人工经验&#xff0c;装配工人凭借长期实践积累的操作技巧完成零部件组装。企业通常制定了装配作业指导书&#xff0c;但在实际执行中&#xff0c;工人对指导书的理解和遵循程度参差不齐。 船舶装配过程中的挑战与需求 挑战 (1…...

使用LangGraph和LangSmith构建多智能体人工智能系统

现在&#xff0c;通过组合几个较小的子智能体来创建一个强大的人工智能智能体正成为一种趋势。但这也带来了一些挑战&#xff0c;比如减少幻觉、管理对话流程、在测试期间留意智能体的工作方式、允许人工介入以及评估其性能。你需要进行大量的反复试验。 在这篇博客〔原作者&a…...

【C++特殊工具与技术】优化内存分配(一):C++中的内存分配

目录 一、C 内存的基本概念​ 1.1 内存的物理与逻辑结构​ 1.2 C 程序的内存区域划分​ 二、栈内存分配​ 2.1 栈内存的特点​ 2.2 栈内存分配示例​ 三、堆内存分配​ 3.1 new和delete操作符​ 4.2 内存泄漏与悬空指针问题​ 4.3 new和delete的重载​ 四、智能指针…...