SpringBoot + Disruptor 实现特快高并发处理,使用Disruptor高速实现队列
1 前言
工作中遇到项目使用Disruptor做消息队列,对!你没看错,不是Kafka也不是rabbitmq。Disruptor有个最大的优点就是快,还有一点它是开源的哦,下面做个简单的记录。
2 Disruptor介绍
Disruptor 是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。
基于 Disruptor 开发的系统单线程能支撑每秒 600 万订单,2010 年在 QCon 演讲后,获得了业界关注。
Disruptor是一个开源的Java框架,它被设计用于在生产者—消费者(producer-consumer problem,简称PCP)问题上获得尽量高的吞吐量(TPS)和尽量低的延迟。
从功能上来看,Disruptor 是实现了“队列”的功能,而且是一个有界队列。那么它的应用场景自然就是“生产者-消费者”模型的应用场合了。
Disruptor是LMAX在线交易平台的关键组成部分,LMAX平台使用该框架对订单处理速度能达到600万TPS,除金融领域之外,其他一般的应用中都可以用到Disruptor,它可以带来显著的性能提升。
其实Disruptor与其说是一个框架,不如说是一种设计思路,这个设计思路对于存在“并发、缓冲区、生产者—消费者模型、事务处理”这些元素的程序来说,Disruptor提出了一种大幅提升性能(TPS)的方案。
Disruptor的github主页:
https://github.com/LMAX-Exchange/disruptor
3 Disruptor 的核心概念
先从了解 Disruptor 的核心概念开始,来了解它是如何运作的。下面介绍的概念模型,既是领域对象,也是映射到代码实现上的核心对象。
3.1 Ring Buffer
如其名,环形的缓冲区。曾经 RingBuffer 是 Disruptor 中的最主要的对象,但从3.0版本开始,其职责被简化为仅仅负责对通过 Disruptor 进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中,Ring Buffer 可以由用户的自定义实现来完全替代。
3.2 Sequence Disruptor
通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个 Sequence 用于跟踪标识某个特定的事件处理者( RingBuffer/Consumer )的处理进度。
虽然一个 AtomicLong 也可以用于标识进度,但定义 Sequence 来负责该问题还有另一个目的,那就是防止不同的 Sequence 之间的CPU缓存伪共享(Flase Sharing)问题。
注:这是 Disruptor 实现高性能的关键点之一,网上关于伪共享问题的介绍已经汗牛充栋,在此不再赘述。
3.3 Sequencer
Sequencer 是 Disruptor 的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。
3.4 Sequence Barrier
用于保持对RingBuffer的 main published Sequence 和Consumer依赖的其它Consumer的 Sequence 的引用。Sequence Barrier 还定义了决定 Consumer 是否还有可处理的事件的逻辑。
3.5 Wait Strategy
定义 Consumer 如何进行等待下一个事件的策略。(注:Disruptor 定义了多种不同的策略,针对不同的场景,提供了不一样的性能表现)
3.6 Event
在 Disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。
3.7 EventProcessor
EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。
3.8 EventHandler
Disruptor 定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现。
3.9 Producer
即生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型。
4 案例-demo
通过下面8个步骤,你就能将Disruptor Get回家啦:
4.1 添加pom.xml依赖
<dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>3.4.4</version>
</dependency>
4.2 消息体Model
/*** 消息体*/
@Data
public class MessageModel {private String message;
}
4.3 构造EventFactory
public class HelloEventFactory implements EventFactory<MessageModel> {@Overridepublic MessageModel newInstance() {return new MessageModel();}
}
4.4 构造EventHandler-消费者
@Slf4j
public class HelloEventHandler implements EventHandler<MessageModel> {@Overridepublic void onEvent(MessageModel event, long sequence, boolean endOfBatch) {try {//这里停止1000ms是为了确定消费消息是异步的Thread.sleep(1000);log.info("消费者处理消息开始");if (event != null) {log.info("消费者消费的信息是:{}",event);}} catch (Exception e) {log.info("消费者处理消息失败");}log.info("消费者处理消息结束");}
}
4.5 构造BeanManager
/*** 获取实例化对象*/
@Component
public class BeanManager implements ApplicationContextAware {private static ApplicationContext applicationContext = null;@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}public static ApplicationContext getApplicationContext() { return applicationContext; }public static Object getBean(String name) {return applicationContext.getBean(name);}public static <T> T getBean(Class<T> clazz) {return applicationContext.getBean(clazz);}
}
4.6 构造MQManager
@Configuration
public class MQManager {@Bean("messageModel")public RingBuffer<MessageModel> messageModelRingBuffer() {//定义用于事件处理的线程池, Disruptor通过java.util.concurrent.ExecutorSerivce提供的线程来触发consumer的事件处理ExecutorService executor = Executors.newFixedThreadPool(2);//指定事件工厂HelloEventFactory factory = new HelloEventFactory();//指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率int bufferSize = 1024 * 256;//单线程模式,获取额外的性能Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor,ProducerType.SINGLE, new BlockingWaitStrategy());//设置事件业务处理器---消费者disruptor.handleEventsWith(new HelloEventHandler());// 启动disruptor线程disruptor.start();//获取ringbuffer环,用于接取生产者生产的事件RingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();return ringBuffer;}
}
4.7 构造Mqservice和实现类-生产者
public interface DisruptorMqService {/*** 消息* @param message*/void sayHelloMq(String message);
}@Slf4j
@Component
@Service
public class DisruptorMqServiceImpl implements DisruptorMqService {@Autowiredprivate RingBuffer<MessageModel> messageModelRingBuffer;@Overridepublic void sayHelloMq(String message) {log.info("record the message: {}",message);//获取下一个Event槽的下标long sequence = messageModelRingBuffer.next();try {//给Event填充数据MessageModel event = messageModelRingBuffer.get(sequence);event.setMessage(message);log.info("往消息队列中添加消息:{}", event);} catch (Exception e) {log.error("failed to add event to messageModelRingBuffer for : e = {},{}",e,e.getMessage());} finally {//发布Event,激活观察者去消费,将sequence传递给改消费者//注意最后的publish方法必须放在finally中以确保必须得到调用;如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producermessageModelRingBuffer.publish(sequence);}}
}
4.8 构造测试类及方法
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = DemoApplication.class)
public class DemoApplicationTests {@Autowiredprivate DisruptorMqService disruptorMqService;/*** 项目内部使用Disruptor做消息队列* @throws Exception*/@Testpublic void sayHelloMqTest() throws Exception{disruptorMqService.sayHelloMq("消息到了,Hello world!");log.info("消息队列已发送完毕");//这里停止2000ms是为了确定是处理消息是异步的Thread.sleep(2000);}
}
4.9 测试运行结果
2023-04-05 14:31:18.543 INFO 7274 --- [ main] c.e.u.d.d.s.Impl.DisruptorMqServiceImpl : record the message: 消息到了,Hello world!
2023-04-05 14:31:18.545 INFO 7274 --- [ main] c.e.u.d.d.s.Impl.DisruptorMqServiceImpl : 往消息队列中添加消息:MessageModel(message=消息到了,Hello world!)
2023-04-05 14:31:18.545 INFO 7274 --- [ main] c.e.utils.demo.DemoApplicationTests : 消息队列已发送完毕
2023-04-05 14:31:19.547 INFO 7274 --- [pool-1-thread-1] c.e.u.d.disrupMq.mq.HelloEventHandler : 消费者处理消息开始
2023-04-05 14:31:19.547 INFO 7274 --- [pool-1-thread-1] c.e.u.d.disrupMq.mq.HelloEventHandler : 消费者消费的信息是:MessageModel(message=消息到了,Hello world!)
2023-04-05 14:31:19.547 INFO 7274 --- [pool-1-thread-1] c.e.u.d.disrupMq.mq.HelloEventHandler : 消费者处理消息结束
5 总结
其实 生成者 -> 消费者 模式是很常见的,通过一些消息队列也可以轻松做到上述的效果。不同的地方在于,Disruptor 是在内存中以队列的方式去实现的,而且是无锁的。这也是 Disruptor 为什么高效的原因。
相关文章:

SpringBoot + Disruptor 实现特快高并发处理,使用Disruptor高速实现队列
1 前言 工作中遇到项目使用Disruptor做消息队列,对!你没看错,不是Kafka也不是rabbitmq。Disruptor有个最大的优点就是快,还有一点它是开源的哦,下面做个简单的记录。 2 Disruptor介绍 Disruptor 是英国外汇交易公司…...

git push origin HEAD:refs/for/master
git push <远程主机名> <本地分支名> : <远程分支名> 例如 git push origin master:refs/for/master 是将本地的master分支推送到远程主机origin上的对应master分支 origin 是远程主机名, 第一个master是本地分支名, 第二…...

S25FL256S介绍及FPGA实现思路
本文介绍 S25FL256S 这款 FLASH 芯片,并进行 FPGA 读写控制的实现(编程思路及注意事项)。 文章目录 S25FL-S 介绍管脚功能说明SPI 时钟模式SDRDDR 工作模式FLASH存储阵列(地址空间映射)常用寄存器及相关指令Status Reg…...

淘宝客APP源码/社交电商自营商城源码/前端基于Uniapp开发
淘宝客APP源码,前端基于Uniapp开发的社交电商自营商城源码。Thinkphp的后台,不是很标准,感兴趣的可以自行研究。 商城功能 1、首页基础装修;2、丰富选品库;3、淘口令解析;4、支持京东;5、支持…...

Oracle 服务器日常巡检
文章目录 1、数据库基本状况检查2、数据库相关资源使用情况检查3、检查Oracle数据库性能4、数据库服务器CPU、MEM、I/O性能5、数据库服务器安全检查 Oracle数据库的日常巡检内容包括: (1)Oracle数据库基本状况检查; (…...

【轨道机器人】实现Windows与下位机串口通信(未完成)
方案一:QT,编写类似串口调试助手的APP,连接上硬件,qt有个好像是串口缓存函数,可以防止占用CPU。(缺点qt估计要时间学) 方案二:利用vscode、C,编写一个可执行exe文件&…...

无人机内存卡数据恢复
1.插入内存卡 2.选择对应的品牌 3.点击恢复 建议:发现数据打不开或者丢失情况,建议及时断电,以免影响数据的正常恢复! #无人机##数据恢复##储存卡#...

基于SSM的校园二手物品交易市场设计与实现
末尾获取源码 开发语言:Java Java开发工具:JDK1.8 后端框架:SSM 前端:Vue 数据库:MySQL5.7和Navicat管理工具结合 服务器:Tomcat8.5 开发软件:IDEA / Eclipse 是否Maven项目:是 目录…...

Android14 Beta 5
Beta 5,这是 Android 14 Beta 计划中的最后一次计划更新。这是确保您的应用程序已准备就绪并在非 Beta 用户开始获取 Android 14 之前提供反馈的最后机会。为了使您能够在跨多种外形尺寸的设备上测试您的应用程序,Beta 5 适用于 Pixel Tablet 和 Pixel F…...

力扣labuladong——一刷day32
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、力扣654. 最大二叉树二、力扣105. 从前序与中序遍历序列构造二叉树三、力扣106. 从中序与后序遍历序列构造二叉树四、力扣889. 根据前序和后序遍历构造二叉…...

Day01_《MySQL索引与性能优化》摘要
一、资料 视频:《尚硅谷MySQL数据库高级,mysql优化,数据库优化》—周阳 其他博主的完整笔记:MySQL 我的笔记:我的笔记只总结了视频p14-p46部分,因为只有这部分是讲解了MySQL的索引与explain语句分析优化…...

BMS系统项目
1、通过电压监测是否冲满,通过电压可以监测是否放完电 电池得参数 单体过压(充满电) 过压恢复(百分之90多) 欠压保护(百分之几得电,快关机了) 欠压恢复(就是欠压之上…...

sql server 多行数据合并一行显示
在 SQL Server 中,可以使用 STUFF 和 FOR XML PATH 进行多行合并成一行。例如,假设有一个表名为 orders ,其中包含订单号和产品名称: order_idproduct_name1Product A1Product B2Product C2Product D 以下查询将在 order_id 列上…...

「我的AIGC咒语库:分享和AI对话交流的秘诀——如何利用Prompt和AI进行高效交流?」
文章目录 每日一句正能量前言基础介绍什么是Prompt?什么是 Prompt Engineering?为什么需要 Prompt Engineering?如何进行 Prompt Engineering?Prompt的基本原则Prompt的编写模式AI 可以帮助程序员做什么?技术知识总结拆解任务阅读…...

强国有我助力苔花绽放 | 爱心捐赠仪式在西安顺利举办
2023年11月2日,由中国儿童中心、全国少年儿童“双有”主题教育活动组委会、中华少年儿童慈善救助基金会强国有我项目主办,陕西省青少年宫协会、陕西省妇女儿童活动中心、陕西回归儿童救助中心承办的“苔花绽放”事实无人抚养儿童关爱计划捐赠仪式在陕西回…...

Flink SQL -- CheckPoint
1、开启CheckPoint checkpoint可以定时将flink任务的状态持久化到hdfs中,任务执行失败重启可以保证中间结果不丢失 # 修改flink配置文件 vim flink-conf.yaml# checkppint 间隔时间 execution.checkpointing.interval: 1min # 任务手动取消时保存checkpoint execu…...

Load-balanced-online-OJ-system 负载均衡的OJ系统项目
前言 那么这里博主先安利一些干货满满的专栏了! 首先是博主的高质量博客的汇总,这个专栏里面的博客,都是博主最最用心写的一部分,干货满满,希望对大家有帮助。 高质量博客汇总 本项目Github地址 - Load-balanced-o…...

ES6 导入导出
ES6 导入导出 ES6引入了原生的模块化支持,使得JavaScript代码可以被划分为可重用的模块。这些模块可以导出部分代码(如函数、对象、类等),并被其他模块导入使用。 export 命名导出(Named Exports) 可以从…...

【Liunx】部署Ansible自动化运维工具
Ansible自动化运维工具 概述安装部署1.通过yum下载Ansible2.对自己做免密配置3.修改ansiable host配置对服务器进行分组4.测试:对所有服务器进行ping命令5.写playbook6.执行我们写的playbook脚本7.验证 概述 ansible是新出现的自动化运维工具,基于Pytho…...

Python的基础语法
1. 注释:在Python中,使用井号(#)表示单行注释,三个单引号()或三个双引号(""")表示多行注释。 2. 变量:在Python中,不需要声明变量…...

Skywalking流程分析_8(拦截器插件的加载)
前言 在之前的文章中我们将,静态方法、构造方法、实例方法的增强逻辑都分析完毕,但在增强前,对于拦截类的加载是至关重要的,下面我们就来详细的分析 增强插件的加载 静态方法增强前的加载 //clazz 要修改的字节码的原生类 Sta…...

智能AI系统ChatGPT网站源码+支持OpenAI DALL-E3文生图+支持ai绘画(Midjourney)/支持GPT全模型+国内AI全模型
一、AI创作系统 SparkAi创作系统是基于OpenAI很火的ChatGPT进行开发的Ai智能问答系统和Midjourney绘画系统,支持OpenAI-GPT全模型国内AI全模型。本期针对源码系统整体测试下来非常完美,可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如…...

腾讯云服务器可用区是什么意思?可用区选择方法
腾讯云服务器可用区是什么意思?云服务器可用区如何选择?可用区是指在同一个地域内电力和网络相互独立的区域,可用区可以做到故障隔离,所以可用区存在的意义在于构建高可用、高容灾应用,将应用部署在不同可用区内&#…...

Jupyter运行显存爆炸,明明上一个单元格已经运行完毕为什么还是会炸?
问题再现 上一个单元格运行完了train(),我想要用模型输出做点东西,可是提醒我显存不够; 在命令行中查看显存占用情况,发现4张卡都占满了,可真是太厉害了! 解决方案 查看原来写的validate(),发…...

【ICE】webrtc lite 1:cmake构建
p2ptransportchannel 是 ICE 实现基于此实现了DTLTransport而前者是独立的模块。依赖库较少主要是ssl absl OpenSSL Protobuf 可选 absl webrtc 不支持大端 :big endian architectures defined in WebRTC’s arch.h D_WINSOCKAPI_ 用来做啥? 以下编译选项: add_compile_opti…...

国内最受欢迎电商API接口调用淘宝商品详情API接口数据
国内实用的API接口 国内最受欢迎的7大API供应平台对比和介绍 本文将介绍7款API供应平台:聚合数据、百度APIStore、Apix、数说聚合、通联数据、HaoService、datasift 。排名不分先后! 免费实用的API接口 第一部分 1、电商数据(API数据接口_开…...

第五篇 基于JSP 技术的网上购书系统——主页面和登录页面实现(网上商城、仿淘宝、当当、亚马逊)
目录 1.系统主界面 1.1功能说明 1.2界面设计 1.3处理流程 1.4 数据来源和算法 1.4.1数据来源 1.4.2查询条件 1.4.3表间关系 1.4.4相关sql实例 2.系统登陆后界面 2.1功能说明 2.2界面设计 2.3处理流程 2.4数据来源和算法 2.4.1数据来源 2.4.2查询条件 2.4.…...

【 云原生 | K8S 】kubeadm 部署Kubernetes集群
目录 1 环境准备 2 所有节点安装docker 3 所有节点安装kubeadm,kubelet和kubectl 4 部署K8S集群 4.1 查看初始化需要的镜像 4.2 初始化kubeadm 4.3 设定kubectl 4.4 所有节点部署网络插件flannel master(2C/4G,cpu核心数要求大于2&am…...

微信小程序rich-text 文本首行缩进和图片居中和富文本rich-text 解析多个空格不成功 nbsp
微信小程序开发使用rich-text组件渲染html格式的代码,常常因为不能自定义css导致文本不能缩进,以及图片不能居中等问题,这里可以考虑使用js的replace方法,替换字符串,然后在渲染的同时加载行内样式。 //获取字符串的图…...

uniapp 设置重写uni-body-page样式,输入字母转大写,条形码扫描
uniapp 设置重写uni-body-page样式,输入字母转大写 一、重写uni-body-page样式 page{ }二、输入字母转大写 input标签设置样式: style"text-transform: uppercase;"绑定的值通过.toUpperCase()转大写 三、条形码扫描 // 调起条码扫描uni…...