当前位置: 首页 > news >正文

SpringBoot + Disruptor 实现特快高并发处理,使用Disruptor高速实现队列

1 前言

工作中遇到项目使用Disruptor做消息队列,对!你没看错,不是Kafka也不是rabbitmq。Disruptor有个最大的优点就是快,还有一点它是开源的哦,下面做个简单的记录。

2 Disruptor介绍

Disruptor 是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。

基于 Disruptor 开发的系统单线程能支撑每秒 600 万订单,2010 年在 QCon 演讲后,获得了业界关注。

Disruptor是一个开源的Java框架,它被设计用于在生产者—消费者(producer-consumer problem,简称PCP)问题上获得尽量高的吞吐量(TPS)和尽量低的延迟。

从功能上来看,Disruptor 是实现了“队列”的功能,而且是一个有界队列。那么它的应用场景自然就是“生产者-消费者”模型的应用场合了。

Disruptor是LMAX在线交易平台的关键组成部分,LMAX平台使用该框架对订单处理速度能达到600万TPS,除金融领域之外,其他一般的应用中都可以用到Disruptor,它可以带来显著的性能提升。

其实Disruptor与其说是一个框架,不如说是一种设计思路,这个设计思路对于存在“并发、缓冲区、生产者—消费者模型、事务处理”这些元素的程序来说,Disruptor提出了一种大幅提升性能(TPS)的方案。

Disruptor的github主页:

https://github.com/LMAX-Exchange/disruptor

3 Disruptor 的核心概念

先从了解 Disruptor 的核心概念开始,来了解它是如何运作的。下面介绍的概念模型,既是领域对象,也是映射到代码实现上的核心对象。

3.1 Ring Buffer

如其名,环形的缓冲区。曾经 RingBuffer 是 Disruptor 中的最主要的对象,但从3.0版本开始,其职责被简化为仅仅负责对通过 Disruptor 进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中,Ring Buffer 可以由用户的自定义实现来完全替代。

3.2 Sequence Disruptor

通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个 Sequence 用于跟踪标识某个特定的事件处理者( RingBuffer/Consumer )的处理进度。

虽然一个 AtomicLong 也可以用于标识进度,但定义 Sequence 来负责该问题还有另一个目的,那就是防止不同的 Sequence 之间的CPU缓存伪共享(Flase Sharing)问题。

注:这是 Disruptor 实现高性能的关键点之一,网上关于伪共享问题的介绍已经汗牛充栋,在此不再赘述。

3.3 Sequencer

Sequencer 是 Disruptor 的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。

3.4 Sequence Barrier

用于保持对RingBuffer的 main published Sequence 和Consumer依赖的其它Consumer的 Sequence 的引用。Sequence Barrier 还定义了决定 Consumer 是否还有可处理的事件的逻辑。

3.5 Wait Strategy

定义 Consumer 如何进行等待下一个事件的策略。(注:Disruptor 定义了多种不同的策略,针对不同的场景,提供了不一样的性能表现)

3.6 Event

在 Disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。

3.7 EventProcessor

EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。

3.8 EventHandler

Disruptor 定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现。

3.9 Producer

即生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型。

在这里插入图片描述

4 案例-demo

通过下面8个步骤,你就能将Disruptor Get回家啦:

4.1 添加pom.xml依赖

<dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>3.4.4</version>
</dependency>

4.2 消息体Model

/*** 消息体*/
@Data
public class MessageModel {private String message;
}

4.3 构造EventFactory

public class HelloEventFactory implements EventFactory<MessageModel> {@Overridepublic MessageModel newInstance() {return new MessageModel();}
}

4.4 构造EventHandler-消费者

@Slf4j
public class HelloEventHandler implements EventHandler<MessageModel> {@Overridepublic void onEvent(MessageModel event, long sequence, boolean endOfBatch) {try {//这里停止1000ms是为了确定消费消息是异步的Thread.sleep(1000);log.info("消费者处理消息开始");if (event != null) {log.info("消费者消费的信息是:{}",event);}} catch (Exception e) {log.info("消费者处理消息失败");}log.info("消费者处理消息结束");}
}

4.5 构造BeanManager

/*** 获取实例化对象*/
@Component
public class BeanManager implements ApplicationContextAware {private static ApplicationContext applicationContext = null;@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}public static ApplicationContext getApplicationContext() { return applicationContext; }public static Object getBean(String name) {return applicationContext.getBean(name);}public static <T> T getBean(Class<T> clazz) {return applicationContext.getBean(clazz);}
}

4.6 构造MQManager

@Configuration
public class MQManager {@Bean("messageModel")public RingBuffer<MessageModel> messageModelRingBuffer() {//定义用于事件处理的线程池, Disruptor通过java.util.concurrent.ExecutorSerivce提供的线程来触发consumer的事件处理ExecutorService executor = Executors.newFixedThreadPool(2);//指定事件工厂HelloEventFactory factory = new HelloEventFactory();//指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率int bufferSize = 1024 * 256;//单线程模式,获取额外的性能Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor,ProducerType.SINGLE, new BlockingWaitStrategy());//设置事件业务处理器---消费者disruptor.handleEventsWith(new HelloEventHandler());// 启动disruptor线程disruptor.start();//获取ringbuffer环,用于接取生产者生产的事件RingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();return ringBuffer;}
}

4.7 构造Mqservice和实现类-生产者

public interface DisruptorMqService {/*** 消息* @param message*/void sayHelloMq(String message);
}@Slf4j
@Component
@Service
public class DisruptorMqServiceImpl implements DisruptorMqService {@Autowiredprivate RingBuffer<MessageModel> messageModelRingBuffer;@Overridepublic void sayHelloMq(String message) {log.info("record the message: {}",message);//获取下一个Event槽的下标long sequence = messageModelRingBuffer.next();try {//给Event填充数据MessageModel event = messageModelRingBuffer.get(sequence);event.setMessage(message);log.info("往消息队列中添加消息:{}", event);} catch (Exception e) {log.error("failed to add event to messageModelRingBuffer for : e = {},{}",e,e.getMessage());} finally {//发布Event,激活观察者去消费,将sequence传递给改消费者//注意最后的publish方法必须放在finally中以确保必须得到调用;如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producermessageModelRingBuffer.publish(sequence);}}
}

4.8 构造测试类及方法

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = DemoApplication.class)
public class DemoApplicationTests {@Autowiredprivate DisruptorMqService disruptorMqService;/*** 项目内部使用Disruptor做消息队列* @throws Exception*/@Testpublic void sayHelloMqTest() throws Exception{disruptorMqService.sayHelloMq("消息到了,Hello world!");log.info("消息队列已发送完毕");//这里停止2000ms是为了确定是处理消息是异步的Thread.sleep(2000);}
}

4.9 测试运行结果

2023-04-05 14:31:18.543  INFO 7274 --- [           main] c.e.u.d.d.s.Impl.DisruptorMqServiceImpl  : record the message: 消息到了,Hello world!
2023-04-05 14:31:18.545  INFO 7274 --- [           main] c.e.u.d.d.s.Impl.DisruptorMqServiceImpl  : 往消息队列中添加消息:MessageModel(message=消息到了,Hello world!)
2023-04-05 14:31:18.545  INFO 7274 --- [           main] c.e.utils.demo.DemoApplicationTests      : 消息队列已发送完毕
2023-04-05 14:31:19.547  INFO 7274 --- [pool-1-thread-1] c.e.u.d.disrupMq.mq.HelloEventHandler    : 消费者处理消息开始
2023-04-05 14:31:19.547  INFO 7274 --- [pool-1-thread-1] c.e.u.d.disrupMq.mq.HelloEventHandler    : 消费者消费的信息是:MessageModel(message=消息到了,Hello world!)
2023-04-05 14:31:19.547  INFO 7274 --- [pool-1-thread-1] c.e.u.d.disrupMq.mq.HelloEventHandler    : 消费者处理消息结束

5 总结

其实 生成者 -> 消费者 模式是很常见的,通过一些消息队列也可以轻松做到上述的效果。不同的地方在于,Disruptor 是在内存中以队列的方式去实现的,而且是无锁的。这也是 Disruptor 为什么高效的原因。

相关文章:

SpringBoot + Disruptor 实现特快高并发处理,使用Disruptor高速实现队列

1 前言 工作中遇到项目使用Disruptor做消息队列&#xff0c;对&#xff01;你没看错&#xff0c;不是Kafka也不是rabbitmq。Disruptor有个最大的优点就是快&#xff0c;还有一点它是开源的哦&#xff0c;下面做个简单的记录。 2 Disruptor介绍 Disruptor 是英国外汇交易公司…...

git push origin HEAD:refs/for/master

git push <远程主机名> <本地分支名> : <远程分支名> 例如 git push origin master&#xff1a;refs/for/master 是将本地的master分支推送到远程主机origin上的对应master分支 origin 是远程主机名&#xff0c; 第一个master是本地分支名&#xff0c; 第二…...

S25FL256S介绍及FPGA实现思路

本文介绍 S25FL256S 这款 FLASH 芯片&#xff0c;并进行 FPGA 读写控制的实现&#xff08;编程思路及注意事项&#xff09;。 文章目录 S25FL-S 介绍管脚功能说明SPI 时钟模式SDRDDR 工作模式FLASH存储阵列&#xff08;地址空间映射&#xff09;常用寄存器及相关指令Status Reg…...

淘宝客APP源码/社交电商自营商城源码/前端基于Uniapp开发

淘宝客APP源码&#xff0c;前端基于Uniapp开发的社交电商自营商城源码。Thinkphp的后台&#xff0c;不是很标准&#xff0c;感兴趣的可以自行研究。 商城功能 1、首页基础装修&#xff1b;2、丰富选品库&#xff1b;3、淘口令解析&#xff1b;4、支持京东&#xff1b;5、支持…...

Oracle 服务器日常巡检

文章目录 1、数据库基本状况检查2、数据库相关资源使用情况检查3、检查Oracle数据库性能4、数据库服务器CPU、MEM、I/O性能5、数据库服务器安全检查 Oracle数据库的日常巡检内容包括&#xff1a; &#xff08;1&#xff09;Oracle数据库基本状况检查&#xff1b; &#xff08…...

【轨道机器人】实现Windows与下位机串口通信(未完成)

方案一&#xff1a;QT&#xff0c;编写类似串口调试助手的APP&#xff0c;连接上硬件&#xff0c;qt有个好像是串口缓存函数&#xff0c;可以防止占用CPU。&#xff08;缺点qt估计要时间学&#xff09; 方案二&#xff1a;利用vscode、C&#xff0c;编写一个可执行exe文件&…...

无人机内存卡数据恢复

1.插入内存卡 2.选择对应的品牌 3.点击恢复 建议&#xff1a;发现数据打不开或者丢失情况&#xff0c;建议及时断电&#xff0c;以免影响数据的正常恢复&#xff01; #无人机##数据恢复##储存卡#...

基于SSM的校园二手物品交易市场设计与实现

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;Vue 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#xff1a;是 目录…...

Android14 Beta 5

Beta 5&#xff0c;这是 Android 14 Beta 计划中的最后一次计划更新。这是确保您的应用程序已准备就绪并在非 Beta 用户开始获取 Android 14 之前提供反馈的最后机会。为了使您能够在跨多种外形尺寸的设备上测试您的应用程序&#xff0c;Beta 5 适用于 Pixel Tablet 和 Pixel F…...

力扣labuladong——一刷day32

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、力扣654. 最大二叉树二、力扣105. 从前序与中序遍历序列构造二叉树三、力扣106. 从中序与后序遍历序列构造二叉树四、力扣889. 根据前序和后序遍历构造二叉…...

Day01_《MySQL索引与性能优化》摘要

一、资料 视频&#xff1a;《尚硅谷MySQL数据库高级&#xff0c;mysql优化&#xff0c;数据库优化》—周阳 其他博主的完整笔记&#xff1a;MySQL 我的笔记&#xff1a;我的笔记只总结了视频p14-p46部分&#xff0c;因为只有这部分是讲解了MySQL的索引与explain语句分析优化…...

BMS系统项目

1、通过电压监测是否冲满&#xff0c;通过电压可以监测是否放完电 电池得参数 单体过压&#xff08;充满电&#xff09; 过压恢复&#xff08;百分之90多&#xff09; 欠压保护&#xff08;百分之几得电&#xff0c;快关机了&#xff09; 欠压恢复&#xff08;就是欠压之上…...

sql server 多行数据合并一行显示

在 SQL Server 中&#xff0c;可以使用 STUFF 和 FOR XML PATH 进行多行合并成一行。例如&#xff0c;假设有一个表名为 orders &#xff0c;其中包含订单号和产品名称&#xff1a; order_idproduct_name1Product A1Product B2Product C2Product D 以下查询将在 order_id 列上…...

「我的AIGC咒语库:分享和AI对话交流的秘诀——如何利用Prompt和AI进行高效交流?」

文章目录 每日一句正能量前言基础介绍什么是Prompt?什么是 Prompt Engineering&#xff1f;为什么需要 Prompt Engineering&#xff1f;如何进行 Prompt Engineering&#xff1f;Prompt的基本原则Prompt的编写模式AI 可以帮助程序员做什么&#xff1f;技术知识总结拆解任务阅读…...

强国有我助力苔花绽放 | 爱心捐赠仪式在西安顺利举办

2023年11月2日&#xff0c;由中国儿童中心、全国少年儿童“双有”主题教育活动组委会、中华少年儿童慈善救助基金会强国有我项目主办&#xff0c;陕西省青少年宫协会、陕西省妇女儿童活动中心、陕西回归儿童救助中心承办的“苔花绽放”事实无人抚养儿童关爱计划捐赠仪式在陕西回…...

Flink SQL -- CheckPoint

1、开启CheckPoint checkpoint可以定时将flink任务的状态持久化到hdfs中&#xff0c;任务执行失败重启可以保证中间结果不丢失 # 修改flink配置文件 vim flink-conf.yaml# checkppint 间隔时间 execution.checkpointing.interval: 1min # 任务手动取消时保存checkpoint execu…...

Load-balanced-online-OJ-system 负载均衡的OJ系统项目

前言 那么这里博主先安利一些干货满满的专栏了&#xff01; 首先是博主的高质量博客的汇总&#xff0c;这个专栏里面的博客&#xff0c;都是博主最最用心写的一部分&#xff0c;干货满满&#xff0c;希望对大家有帮助。 高质量博客汇总 本项目Github地址 - Load-balanced-o…...

ES6 导入导出

ES6 导入导出 ES6引入了原生的模块化支持&#xff0c;使得JavaScript代码可以被划分为可重用的模块。这些模块可以导出部分代码&#xff08;如函数、对象、类等&#xff09;&#xff0c;并被其他模块导入使用。 export 命名导出&#xff08;Named Exports&#xff09; 可以从…...

【Liunx】部署Ansible自动化运维工具

Ansible自动化运维工具 概述安装部署1.通过yum下载Ansible2.对自己做免密配置3.修改ansiable host配置对服务器进行分组4.测试&#xff1a;对所有服务器进行ping命令5.写playbook6.执行我们写的playbook脚本7.验证 概述 ansible是新出现的自动化运维工具&#xff0c;基于Pytho…...

Python的基础语法

1. 注释&#xff1a;在Python中&#xff0c;使用井号&#xff08;#&#xff09;表示单行注释&#xff0c;三个单引号&#xff08;&#xff09;或三个双引号&#xff08;"""&#xff09;表示多行注释。 2. 变量&#xff1a;在Python中&#xff0c;不需要声明变量…...

React hook之useRef

React useRef 详解 useRef 是 React 提供的一个 Hook&#xff0c;用于在函数组件中创建可变的引用对象。它在 React 开发中有多种重要用途&#xff0c;下面我将全面详细地介绍它的特性和用法。 基本概念 1. 创建 ref const refContainer useRef(initialValue);initialValu…...

基于数字孪生的水厂可视化平台建设:架构与实践

分享大纲&#xff1a; 1、数字孪生水厂可视化平台建设背景 2、数字孪生水厂可视化平台建设架构 3、数字孪生水厂可视化平台建设成效 近几年&#xff0c;数字孪生水厂的建设开展的如火如荼。作为提升水厂管理效率、优化资源的调度手段&#xff0c;基于数字孪生的水厂可视化平台的…...

相机从app启动流程

一、流程框架图 二、具体流程分析 1、得到cameralist和对应的静态信息 目录如下: 重点代码分析: 启动相机前,先要通过getCameraIdList获取camera的个数以及id,然后可以通过getCameraCharacteristics获取对应id camera的capabilities(静态信息)进行一些openCamera前的…...

鱼香ros docker配置镜像报错:https://registry-1.docker.io/v2/

使用鱼香ros一件安装docker时的https://registry-1.docker.io/v2/问题 一键安装指令 wget http://fishros.com/install -O fishros && . fishros出现问题&#xff1a;docker pull 失败 网络不同&#xff0c;需要使用镜像源 按照如下步骤操作 sudo vi /etc/docker/dae…...

Caliper 配置文件解析:config.yaml

Caliper 是一个区块链性能基准测试工具,用于评估不同区块链平台的性能。下面我将详细解释你提供的 fisco-bcos.json 文件结构,并说明它与 config.yaml 文件的关系。 fisco-bcos.json 文件解析 这个文件是针对 FISCO-BCOS 区块链网络的 Caliper 配置文件,主要包含以下几个部…...

初学 pytest 记录

安装 pip install pytest用例可以是函数也可以是类中的方法 def test_func():print()class TestAdd: # def __init__(self): 在 pytest 中不可以使用__init__方法 # self.cc 12345 pytest.mark.api def test_str(self):res add(1, 2)assert res 12def test_int(self):r…...

jmeter聚合报告中参数详解

sample、average、min、max、90%line、95%line,99%line、Error错误率、吞吐量Thoughput、KB/sec每秒传输的数据量 sample&#xff08;样本数&#xff09; 表示测试中发送的请求数量&#xff0c;即测试执行了多少次请求。 单位&#xff0c;以个或者次数表示。 示例&#xff1a;…...

基于PHP的连锁酒店管理系统

有需要请加文章底部Q哦 可远程调试 基于PHP的连锁酒店管理系统 一 介绍 连锁酒店管理系统基于原生PHP开发&#xff0c;数据库mysql&#xff0c;前端bootstrap。系统角色分为用户和管理员。 技术栈 phpmysqlbootstrapphpstudyvscode 二 功能 用户 1 注册/登录/注销 2 个人中…...

Linux系统部署KES

1、安装准备 1.版本说明V008R006C009B0014 V008&#xff1a;是version产品的大版本。 R006&#xff1a;是release产品特性版本。 C009&#xff1a;是通用版 B0014&#xff1a;是build开发过程中的构建版本2.硬件要求 #安全版和企业版 内存&#xff1a;1GB 以上 硬盘&#xf…...

LLaMA-Factory 微调 Qwen2-VL 进行人脸情感识别(二)

在上一篇文章中,我们详细介绍了如何使用LLaMA-Factory框架对Qwen2-VL大模型进行微调,以实现人脸情感识别的功能。本篇文章将聚焦于微调完成后,如何调用这个模型进行人脸情感识别的具体代码实现,包括详细的步骤和注释。 模型调用步骤 环境准备:确保安装了必要的Python库。…...