当前位置: 首页 > news >正文

SpringBoot + Disruptor 实现特快高并发处理,使用Disruptor高速实现队列

1 前言

工作中遇到项目使用Disruptor做消息队列,对!你没看错,不是Kafka也不是rabbitmq。Disruptor有个最大的优点就是快,还有一点它是开源的哦,下面做个简单的记录。

2 Disruptor介绍

Disruptor 是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。

基于 Disruptor 开发的系统单线程能支撑每秒 600 万订单,2010 年在 QCon 演讲后,获得了业界关注。

Disruptor是一个开源的Java框架,它被设计用于在生产者—消费者(producer-consumer problem,简称PCP)问题上获得尽量高的吞吐量(TPS)和尽量低的延迟。

从功能上来看,Disruptor 是实现了“队列”的功能,而且是一个有界队列。那么它的应用场景自然就是“生产者-消费者”模型的应用场合了。

Disruptor是LMAX在线交易平台的关键组成部分,LMAX平台使用该框架对订单处理速度能达到600万TPS,除金融领域之外,其他一般的应用中都可以用到Disruptor,它可以带来显著的性能提升。

其实Disruptor与其说是一个框架,不如说是一种设计思路,这个设计思路对于存在“并发、缓冲区、生产者—消费者模型、事务处理”这些元素的程序来说,Disruptor提出了一种大幅提升性能(TPS)的方案。

Disruptor的github主页:

https://github.com/LMAX-Exchange/disruptor

3 Disruptor 的核心概念

先从了解 Disruptor 的核心概念开始,来了解它是如何运作的。下面介绍的概念模型,既是领域对象,也是映射到代码实现上的核心对象。

3.1 Ring Buffer

如其名,环形的缓冲区。曾经 RingBuffer 是 Disruptor 中的最主要的对象,但从3.0版本开始,其职责被简化为仅仅负责对通过 Disruptor 进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中,Ring Buffer 可以由用户的自定义实现来完全替代。

3.2 Sequence Disruptor

通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个 Sequence 用于跟踪标识某个特定的事件处理者( RingBuffer/Consumer )的处理进度。

虽然一个 AtomicLong 也可以用于标识进度,但定义 Sequence 来负责该问题还有另一个目的,那就是防止不同的 Sequence 之间的CPU缓存伪共享(Flase Sharing)问题。

注:这是 Disruptor 实现高性能的关键点之一,网上关于伪共享问题的介绍已经汗牛充栋,在此不再赘述。

3.3 Sequencer

Sequencer 是 Disruptor 的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。

3.4 Sequence Barrier

用于保持对RingBuffer的 main published Sequence 和Consumer依赖的其它Consumer的 Sequence 的引用。Sequence Barrier 还定义了决定 Consumer 是否还有可处理的事件的逻辑。

3.5 Wait Strategy

定义 Consumer 如何进行等待下一个事件的策略。(注:Disruptor 定义了多种不同的策略,针对不同的场景,提供了不一样的性能表现)

3.6 Event

在 Disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。

3.7 EventProcessor

EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。

3.8 EventHandler

Disruptor 定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现。

3.9 Producer

即生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型。

在这里插入图片描述

4 案例-demo

通过下面8个步骤,你就能将Disruptor Get回家啦:

4.1 添加pom.xml依赖

<dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>3.4.4</version>
</dependency>

4.2 消息体Model

/*** 消息体*/
@Data
public class MessageModel {private String message;
}

4.3 构造EventFactory

public class HelloEventFactory implements EventFactory<MessageModel> {@Overridepublic MessageModel newInstance() {return new MessageModel();}
}

4.4 构造EventHandler-消费者

@Slf4j
public class HelloEventHandler implements EventHandler<MessageModel> {@Overridepublic void onEvent(MessageModel event, long sequence, boolean endOfBatch) {try {//这里停止1000ms是为了确定消费消息是异步的Thread.sleep(1000);log.info("消费者处理消息开始");if (event != null) {log.info("消费者消费的信息是:{}",event);}} catch (Exception e) {log.info("消费者处理消息失败");}log.info("消费者处理消息结束");}
}

4.5 构造BeanManager

/*** 获取实例化对象*/
@Component
public class BeanManager implements ApplicationContextAware {private static ApplicationContext applicationContext = null;@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}public static ApplicationContext getApplicationContext() { return applicationContext; }public static Object getBean(String name) {return applicationContext.getBean(name);}public static <T> T getBean(Class<T> clazz) {return applicationContext.getBean(clazz);}
}

4.6 构造MQManager

@Configuration
public class MQManager {@Bean("messageModel")public RingBuffer<MessageModel> messageModelRingBuffer() {//定义用于事件处理的线程池, Disruptor通过java.util.concurrent.ExecutorSerivce提供的线程来触发consumer的事件处理ExecutorService executor = Executors.newFixedThreadPool(2);//指定事件工厂HelloEventFactory factory = new HelloEventFactory();//指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率int bufferSize = 1024 * 256;//单线程模式,获取额外的性能Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor,ProducerType.SINGLE, new BlockingWaitStrategy());//设置事件业务处理器---消费者disruptor.handleEventsWith(new HelloEventHandler());// 启动disruptor线程disruptor.start();//获取ringbuffer环,用于接取生产者生产的事件RingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();return ringBuffer;}
}

4.7 构造Mqservice和实现类-生产者

public interface DisruptorMqService {/*** 消息* @param message*/void sayHelloMq(String message);
}@Slf4j
@Component
@Service
public class DisruptorMqServiceImpl implements DisruptorMqService {@Autowiredprivate RingBuffer<MessageModel> messageModelRingBuffer;@Overridepublic void sayHelloMq(String message) {log.info("record the message: {}",message);//获取下一个Event槽的下标long sequence = messageModelRingBuffer.next();try {//给Event填充数据MessageModel event = messageModelRingBuffer.get(sequence);event.setMessage(message);log.info("往消息队列中添加消息:{}", event);} catch (Exception e) {log.error("failed to add event to messageModelRingBuffer for : e = {},{}",e,e.getMessage());} finally {//发布Event,激活观察者去消费,将sequence传递给改消费者//注意最后的publish方法必须放在finally中以确保必须得到调用;如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producermessageModelRingBuffer.publish(sequence);}}
}

4.8 构造测试类及方法

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = DemoApplication.class)
public class DemoApplicationTests {@Autowiredprivate DisruptorMqService disruptorMqService;/*** 项目内部使用Disruptor做消息队列* @throws Exception*/@Testpublic void sayHelloMqTest() throws Exception{disruptorMqService.sayHelloMq("消息到了,Hello world!");log.info("消息队列已发送完毕");//这里停止2000ms是为了确定是处理消息是异步的Thread.sleep(2000);}
}

4.9 测试运行结果

2023-04-05 14:31:18.543  INFO 7274 --- [           main] c.e.u.d.d.s.Impl.DisruptorMqServiceImpl  : record the message: 消息到了,Hello world!
2023-04-05 14:31:18.545  INFO 7274 --- [           main] c.e.u.d.d.s.Impl.DisruptorMqServiceImpl  : 往消息队列中添加消息:MessageModel(message=消息到了,Hello world!)
2023-04-05 14:31:18.545  INFO 7274 --- [           main] c.e.utils.demo.DemoApplicationTests      : 消息队列已发送完毕
2023-04-05 14:31:19.547  INFO 7274 --- [pool-1-thread-1] c.e.u.d.disrupMq.mq.HelloEventHandler    : 消费者处理消息开始
2023-04-05 14:31:19.547  INFO 7274 --- [pool-1-thread-1] c.e.u.d.disrupMq.mq.HelloEventHandler    : 消费者消费的信息是:MessageModel(message=消息到了,Hello world!)
2023-04-05 14:31:19.547  INFO 7274 --- [pool-1-thread-1] c.e.u.d.disrupMq.mq.HelloEventHandler    : 消费者处理消息结束

5 总结

其实 生成者 -> 消费者 模式是很常见的,通过一些消息队列也可以轻松做到上述的效果。不同的地方在于,Disruptor 是在内存中以队列的方式去实现的,而且是无锁的。这也是 Disruptor 为什么高效的原因。

相关文章:

SpringBoot + Disruptor 实现特快高并发处理,使用Disruptor高速实现队列

1 前言 工作中遇到项目使用Disruptor做消息队列&#xff0c;对&#xff01;你没看错&#xff0c;不是Kafka也不是rabbitmq。Disruptor有个最大的优点就是快&#xff0c;还有一点它是开源的哦&#xff0c;下面做个简单的记录。 2 Disruptor介绍 Disruptor 是英国外汇交易公司…...

git push origin HEAD:refs/for/master

git push <远程主机名> <本地分支名> : <远程分支名> 例如 git push origin master&#xff1a;refs/for/master 是将本地的master分支推送到远程主机origin上的对应master分支 origin 是远程主机名&#xff0c; 第一个master是本地分支名&#xff0c; 第二…...

S25FL256S介绍及FPGA实现思路

本文介绍 S25FL256S 这款 FLASH 芯片&#xff0c;并进行 FPGA 读写控制的实现&#xff08;编程思路及注意事项&#xff09;。 文章目录 S25FL-S 介绍管脚功能说明SPI 时钟模式SDRDDR 工作模式FLASH存储阵列&#xff08;地址空间映射&#xff09;常用寄存器及相关指令Status Reg…...

淘宝客APP源码/社交电商自营商城源码/前端基于Uniapp开发

淘宝客APP源码&#xff0c;前端基于Uniapp开发的社交电商自营商城源码。Thinkphp的后台&#xff0c;不是很标准&#xff0c;感兴趣的可以自行研究。 商城功能 1、首页基础装修&#xff1b;2、丰富选品库&#xff1b;3、淘口令解析&#xff1b;4、支持京东&#xff1b;5、支持…...

Oracle 服务器日常巡检

文章目录 1、数据库基本状况检查2、数据库相关资源使用情况检查3、检查Oracle数据库性能4、数据库服务器CPU、MEM、I/O性能5、数据库服务器安全检查 Oracle数据库的日常巡检内容包括&#xff1a; &#xff08;1&#xff09;Oracle数据库基本状况检查&#xff1b; &#xff08…...

【轨道机器人】实现Windows与下位机串口通信(未完成)

方案一&#xff1a;QT&#xff0c;编写类似串口调试助手的APP&#xff0c;连接上硬件&#xff0c;qt有个好像是串口缓存函数&#xff0c;可以防止占用CPU。&#xff08;缺点qt估计要时间学&#xff09; 方案二&#xff1a;利用vscode、C&#xff0c;编写一个可执行exe文件&…...

无人机内存卡数据恢复

1.插入内存卡 2.选择对应的品牌 3.点击恢复 建议&#xff1a;发现数据打不开或者丢失情况&#xff0c;建议及时断电&#xff0c;以免影响数据的正常恢复&#xff01; #无人机##数据恢复##储存卡#...

基于SSM的校园二手物品交易市场设计与实现

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;Vue 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#xff1a;是 目录…...

Android14 Beta 5

Beta 5&#xff0c;这是 Android 14 Beta 计划中的最后一次计划更新。这是确保您的应用程序已准备就绪并在非 Beta 用户开始获取 Android 14 之前提供反馈的最后机会。为了使您能够在跨多种外形尺寸的设备上测试您的应用程序&#xff0c;Beta 5 适用于 Pixel Tablet 和 Pixel F…...

力扣labuladong——一刷day32

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、力扣654. 最大二叉树二、力扣105. 从前序与中序遍历序列构造二叉树三、力扣106. 从中序与后序遍历序列构造二叉树四、力扣889. 根据前序和后序遍历构造二叉…...

Day01_《MySQL索引与性能优化》摘要

一、资料 视频&#xff1a;《尚硅谷MySQL数据库高级&#xff0c;mysql优化&#xff0c;数据库优化》—周阳 其他博主的完整笔记&#xff1a;MySQL 我的笔记&#xff1a;我的笔记只总结了视频p14-p46部分&#xff0c;因为只有这部分是讲解了MySQL的索引与explain语句分析优化…...

BMS系统项目

1、通过电压监测是否冲满&#xff0c;通过电压可以监测是否放完电 电池得参数 单体过压&#xff08;充满电&#xff09; 过压恢复&#xff08;百分之90多&#xff09; 欠压保护&#xff08;百分之几得电&#xff0c;快关机了&#xff09; 欠压恢复&#xff08;就是欠压之上…...

sql server 多行数据合并一行显示

在 SQL Server 中&#xff0c;可以使用 STUFF 和 FOR XML PATH 进行多行合并成一行。例如&#xff0c;假设有一个表名为 orders &#xff0c;其中包含订单号和产品名称&#xff1a; order_idproduct_name1Product A1Product B2Product C2Product D 以下查询将在 order_id 列上…...

「我的AIGC咒语库:分享和AI对话交流的秘诀——如何利用Prompt和AI进行高效交流?」

文章目录 每日一句正能量前言基础介绍什么是Prompt?什么是 Prompt Engineering&#xff1f;为什么需要 Prompt Engineering&#xff1f;如何进行 Prompt Engineering&#xff1f;Prompt的基本原则Prompt的编写模式AI 可以帮助程序员做什么&#xff1f;技术知识总结拆解任务阅读…...

强国有我助力苔花绽放 | 爱心捐赠仪式在西安顺利举办

2023年11月2日&#xff0c;由中国儿童中心、全国少年儿童“双有”主题教育活动组委会、中华少年儿童慈善救助基金会强国有我项目主办&#xff0c;陕西省青少年宫协会、陕西省妇女儿童活动中心、陕西回归儿童救助中心承办的“苔花绽放”事实无人抚养儿童关爱计划捐赠仪式在陕西回…...

Flink SQL -- CheckPoint

1、开启CheckPoint checkpoint可以定时将flink任务的状态持久化到hdfs中&#xff0c;任务执行失败重启可以保证中间结果不丢失 # 修改flink配置文件 vim flink-conf.yaml# checkppint 间隔时间 execution.checkpointing.interval: 1min # 任务手动取消时保存checkpoint execu…...

Load-balanced-online-OJ-system 负载均衡的OJ系统项目

前言 那么这里博主先安利一些干货满满的专栏了&#xff01; 首先是博主的高质量博客的汇总&#xff0c;这个专栏里面的博客&#xff0c;都是博主最最用心写的一部分&#xff0c;干货满满&#xff0c;希望对大家有帮助。 高质量博客汇总 本项目Github地址 - Load-balanced-o…...

ES6 导入导出

ES6 导入导出 ES6引入了原生的模块化支持&#xff0c;使得JavaScript代码可以被划分为可重用的模块。这些模块可以导出部分代码&#xff08;如函数、对象、类等&#xff09;&#xff0c;并被其他模块导入使用。 export 命名导出&#xff08;Named Exports&#xff09; 可以从…...

【Liunx】部署Ansible自动化运维工具

Ansible自动化运维工具 概述安装部署1.通过yum下载Ansible2.对自己做免密配置3.修改ansiable host配置对服务器进行分组4.测试&#xff1a;对所有服务器进行ping命令5.写playbook6.执行我们写的playbook脚本7.验证 概述 ansible是新出现的自动化运维工具&#xff0c;基于Pytho…...

Python的基础语法

1. 注释&#xff1a;在Python中&#xff0c;使用井号&#xff08;#&#xff09;表示单行注释&#xff0c;三个单引号&#xff08;&#xff09;或三个双引号&#xff08;"""&#xff09;表示多行注释。 2. 变量&#xff1a;在Python中&#xff0c;不需要声明变量…...

Skywalking流程分析_8(拦截器插件的加载)

前言 在之前的文章中我们将&#xff0c;静态方法、构造方法、实例方法的增强逻辑都分析完毕&#xff0c;但在增强前&#xff0c;对于拦截类的加载是至关重要的&#xff0c;下面我们就来详细的分析 增强插件的加载 静态方法增强前的加载 //clazz 要修改的字节码的原生类 Sta…...

智能AI系统ChatGPT网站源码+支持OpenAI DALL-E3文生图+支持ai绘画(Midjourney)/支持GPT全模型+国内AI全模型

一、AI创作系统 SparkAi创作系统是基于OpenAI很火的ChatGPT进行开发的Ai智能问答系统和Midjourney绘画系统&#xff0c;支持OpenAI-GPT全模型国内AI全模型。本期针对源码系统整体测试下来非常完美&#xff0c;可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如…...

腾讯云服务器可用区是什么意思?可用区选择方法

腾讯云服务器可用区是什么意思&#xff1f;云服务器可用区如何选择&#xff1f;可用区是指在同一个地域内电力和网络相互独立的区域&#xff0c;可用区可以做到故障隔离&#xff0c;所以可用区存在的意义在于构建高可用、高容灾应用&#xff0c;将应用部署在不同可用区内&#…...

Jupyter运行显存爆炸,明明上一个单元格已经运行完毕为什么还是会炸?

问题再现 上一个单元格运行完了train()&#xff0c;我想要用模型输出做点东西&#xff0c;可是提醒我显存不够&#xff1b; 在命令行中查看显存占用情况&#xff0c;发现4张卡都占满了&#xff0c;可真是太厉害了&#xff01; 解决方案 查看原来写的validate()&#xff0c;发…...

【ICE】webrtc lite 1:cmake构建

p2ptransportchannel 是 ICE 实现基于此实现了DTLTransport而前者是独立的模块。依赖库较少主要是ssl absl OpenSSL Protobuf 可选 absl webrtc 不支持大端 :big endian architectures defined in WebRTC’s arch.h D_WINSOCKAPI_ 用来做啥? 以下编译选项: add_compile_opti…...

国内最受欢迎电商API接口调用淘宝商品详情API接口数据

国内实用的API接口 国内最受欢迎的7大API供应平台对比和介绍 本文将介绍7款API供应平台&#xff1a;聚合数据、百度APIStore、Apix、数说聚合、通联数据、HaoService、datasift 。排名不分先后&#xff01; 免费实用的API接口 第一部分 1、电商数据&#xff08;API数据接口_开…...

第五篇 基于JSP 技术的网上购书系统——主页面和登录页面实现(网上商城、仿淘宝、当当、亚马逊)

目录 1.系统主界面 1.1功能说明 1.2界面设计 1.3处理流程 1.4 数据来源和算法 1.4.1数据来源 1.4.2查询条件 1.4.3表间关系 1.4.4相关sql实例 2.系统登陆后界面 2.1功能说明 2.2界面设计 2.3处理流程 2.4数据来源和算法 2.4.1数据来源 2.4.2查询条件 2.4.…...

【 云原生 | K8S 】kubeadm 部署Kubernetes集群

目录 1 环境准备 2 所有节点安装docker 3 所有节点安装kubeadm&#xff0c;kubelet和kubectl 4 部署K8S集群 4.1 查看初始化需要的镜像 4.2 初始化kubeadm 4.3 设定kubectl 4.4 所有节点部署网络插件flannel master&#xff08;2C/4G&#xff0c;cpu核心数要求大于2&am…...

微信小程序rich-text 文本首行缩进和图片居中和富文本rich-text 解析多个空格不成功 nbsp

微信小程序开发使用rich-text组件渲染html格式的代码&#xff0c;常常因为不能自定义css导致文本不能缩进&#xff0c;以及图片不能居中等问题&#xff0c;这里可以考虑使用js的replace方法&#xff0c;替换字符串&#xff0c;然后在渲染的同时加载行内样式。 //获取字符串的图…...

uniapp 设置重写uni-body-page样式,输入字母转大写,条形码扫描

uniapp 设置重写uni-body-page样式&#xff0c;输入字母转大写 一、重写uni-body-page样式 page{ }二、输入字母转大写 input标签设置样式&#xff1a; style"text-transform: uppercase;"绑定的值通过.toUpperCase()转大写 三、条形码扫描 // 调起条码扫描uni…...