Reactor反应器模式
文章目录
- 一、单线程Reactor反应器模式
- 二、多线程Reactor反应器模式
在Java的OIO编程中,最初和最原始的网络服务器程序使用一个while循环,不断地监听端口是否有新的连接,如果有就调用一个处理函数来处理。这种方法最大的问题就是如果前一个网络连接的处理没有结束,那么后面的连接请求没法被接收,于是后面的请求统统会被阻塞住,服务器的吞吐量就太低了。
为了解决这个严重的连接阻塞问题,出现了一个即为经典模式:Connection Per Thread。即对于每一个新的网络连接都分配一个线程,每个线程都独自处理自己负责的输入和输出,任何socket连接的输入和输出处理不会阻塞到后面新socket连接的监听和建立。早期版本的Tomcat服务器就是这样实现的。
这种模式的优点是解决了前面的新连接被严重阻塞的问题,在一定程度上极大地提高了服务器的吞吐量。但是对于大量的连接,需要消耗大量的现成资源,如果线程数太多,系统无法承受。而且线程的反复创建、销毁、线程的切换也需要代价。因此高并发应用场景下多线程OIO的缺陷是致命的,因此引入了Reactor反应器模式。
反应器模式由Reactor反应器线程、Handlers处理器两大角色组成:
- Reactor反应器线程的职责:负责响应IO事件,并且分发到Handlers处理器
- Handlers处理器的职责:非阻塞的执行业务处理逻辑
一、单线程Reactor反应器模式
Reactor反应器模式有点儿类似事件驱动模式,当有事件触发时,事件源会将事件dispatch分发到handler处理器进行事件处理。反应器模式中的反应器角色类似于事件驱动模式中的dispatcher事件分发器角色。
- Reactor反应器:负责查询IO事件,当检测到一个IO时间,将其发送给对应的Handler处理器处理,这里的IO事件就是NIO选择器监控的通道IO事件。
- Handler处理器:与IO事件绑定,负责IO事件的处理,完成真正的连接建立、通道的读取、处理业务逻辑、负责将结果写出到通道等。
基于NIO实现单线程版本的反应器模式需要用到SelectionKey选择键的几个重要的成员方法:
- void attach(Object o):将任何的Java对象作为附件添加到SelectionKey实例,主要是将Handler处理器实例作为附件添加到SelectionKey实例
- Object attachment():取出之前通过attach添加到SelectionKey选择键实例的附件,一般用于取出绑定的Handler处理器实例。
Reactor实现示例:
package cn.ken.jredis;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;/*** <pre>** </pre>** @author <a href="https://github.com/Ken-Chy129">Ken-Chy129</a>* @since 2023/10/14 14:29*/
public class Reactor implements Runnable {final private Selector selector;final private ServerSocketChannel serverSocketChannel;public Reactor() {try {this.selector = Selector.open();this.serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.bind(new InetSocketAddress(8088));// 注册ServerSocket的accept事件SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);// 为事件绑定处理器sk.attach(new AcceptHandler());} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic void run() {try {while (!Thread.interrupted()) {selector.select();Set<SelectionKey> selectionKeys = selector.selectedKeys();for (SelectionKey selectedKey : selectionKeys) {dispatch(selectedKey);}selectionKeys.clear();}} catch (Exception e) {throw new RuntimeException(e);}}private void dispatch(SelectionKey selectedKey) {Runnable handler = (Runnable) selectedKey.attachment();// 此处返回的可能是AcceptHandler也可能是IOHandlerhandler.run();}class AcceptHandler implements Runnable {@Overridepublic void run() {try {SocketChannel socketChannel = serverSocketChannel.accept();if (socketChannel != null) {new IOHandler(selector, socketChannel); // 注册IO处理器,并将连接加入select列表}} catch (IOException e) {throw new RuntimeException(e);}}}public static void main(String[] args) {new Reactor().run();}
}
Handler实现示例:
package cn.ken.jredis;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;/*** <pre>** </pre>** @author <a href="https://github.com/Ken-Chy129">Ken-Chy129</a>* @since 2023/10/14 14:53*/
public class IOHandler implements Runnable {final private SocketChannel socketChannel;final private ByteBuffer buffer;public IOHandler(Selector selector, SocketChannel channel) {buffer = ByteBuffer.allocate(1024);socketChannel = channel;try {channel.configureBlocking(false);SelectionKey sk = channel.register(selector, 0); // 此处没有注册感兴趣的事件sk.attach(this);sk.interestOps(SelectionKey.OP_READ); // 注册感兴趣的事件,下一次调用select时才生效selector.wakeup(); // 立即唤醒当前阻塞select操作,使得迅速进入下次select,从而让上面注册的读事件监听可以立即生效} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic void run() {try {int length;while ((length = socketChannel.read(buffer)) > 0) {System.out.println(new String(buffer.array(), 0, length));}} catch (IOException e) {throw new RuntimeException(e);}}
}
在单线程反应器模式中,Reactor反应器和Handler处理器都执行在同一条线程上(dispatch方法是直接调用run方法,没有创建新的线程),因此当其中某个Handler阻塞时,会导致其他所有的Handler都得不到执行。
二、多线程Reactor反应器模式
既然Reactor反应器和Handler处理器在一个线程会造成非常严重的性能缺陷,那么可以使用多线程对基础的反应器模式进行改造。
- 将负责输入输出处理的IOHandler处理器的执行,放入独立的线程池中。这样业务处理线程与负责服务监听和IO时间查询的反应器线程相隔离,避免服务器的连接监听收到阻塞。
- 如果服务器为多核的CPU,可以将反应器线程拆分为多个子反应器线程,同时引入多个选择器,每一个SubReactor子线程负责一个选择器。
MultiReactor:
package cn.ken.jredis;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;/*** <pre>** </pre>** @author <a href="https://github.com/Ken-Chy129">Ken-Chy129</a>* @since 2023/10/14 16:51*/
public class MultiReactor {private final ServerSocketChannel server;private final Selector[] selectors = new Selector[2];private final SubReactor[] reactors = new SubReactor[2];private final AtomicInteger index = new AtomicInteger(0);public MultiReactor() {try {server = ServerSocketChannel.open();selectors[0] = Selector.open();selectors[1] = Selector.open();server.bind(new InetSocketAddress(8080));server.configureBlocking(false);SelectionKey register = server.register(selectors[0], SelectionKey.OP_ACCEPT);register.attach(new AcceptHandler());reactors[0] = new SubReactor(selectors[0]);reactors[1] = new SubReactor(selectors[1]);} catch (IOException e) {throw new RuntimeException(e);}}private void startService() {new Thread(reactors[0]).start();new Thread(reactors[1]).start();}class SubReactor implements Runnable {final private Selector selector;public SubReactor(Selector selector) {this.selector = selector;}@Overridepublic void run() {while (!Thread.interrupted()) {try {selector.select();Set<SelectionKey> selectionKeys = selector.selectedKeys();for (SelectionKey selectionKey : selectionKeys) {dispatch(selectionKey);}selectionKeys.clear();} catch (IOException e) {throw new RuntimeException(e);}}}}private void dispatch(SelectionKey selectionKey) {Runnable attachment = (Runnable) selectionKey.attachment();if (attachment != null) {attachment.run();}}class AcceptHandler implements Runnable {@Overridepublic void run() {try {SocketChannel socketChannel = server.accept();new MultiHandler(selectors[index.getAndIncrement()], socketChannel);if (index.get() == selectors.length) {index.set(0);}} catch (IOException e) {throw new RuntimeException(e);}}}
}
MultiHandler:
package cn.ken.jredis;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** <pre>** </pre>** @author <a href="https://github.com/Ken-Chy129">Ken-Chy129</a>* @since 2023/10/14 17:28*/
public class MultiHandler implements Runnable {final private Selector selector;final private SocketChannel channel;final ByteBuffer buffer = ByteBuffer.allocate(1024);static ExecutorService pool = Executors.newFixedThreadPool(4);public MultiHandler(Selector selector, SocketChannel channel) {this.selector = selector;this.channel = channel;try {channel.configureBlocking(false);SelectionKey register = channel.register(selector, SelectionKey.OP_READ);register.attach(this);selector.wakeup();} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic void run() {pool.execute(() -> {synchronized (this) {int length;try {while ((length = channel.read(buffer)) > 0) {System.out.println(new String(buffer.array(), 0, length));buffer.clear();}} catch (IOException e) {throw new RuntimeException(e);}} });}
}
相关文章:
Reactor反应器模式
文章目录 一、单线程Reactor反应器模式二、多线程Reactor反应器模式 在Java的OIO编程中,最初和最原始的网络服务器程序使用一个while循环,不断地监听端口是否有新的连接,如果有就调用一个处理函数来处理。这种方法最大的问题就是如果前一个网…...
alibaba.fastjson的使用(六) -- JavaBean==》Json字符串、JSONObject、JSONArray
目录 1. JavaBean转 Json字符串 2. JavaBean转 JSONObject 3. List转JSONArray 在pom文件中引入依赖: <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.14</version></dependency&…...

uniapp 自定义导航栏
自定义导航栏 修改 pages.json 在 pages.json 中将 navigateionStyle 设为 custom 新建 systemInfo.js systemInfo.js 用来获取当前设备的机型系统信息,放在 common 目录下 /*** 此 js 文件管理关于当前设备的机型系统信息*/ const systemInfo function() {/***…...

查分小程序:一键查询成绩,班主任和家长的得力助手
作为一名老师,是否曾经为了让学生能够方便地查询成绩而烦恼?担心学生忘记密码?还是手动输入成绩太繁琐?今天,给大家分享一个超级实用的查分小程序,让成绩查询变得更轻松! 什么是成绩查询系统&am…...
Linux内核驱动开发的步骤
Linux操作系统的内核是一个强大的、开源的操作系统内核,它为各种硬件设备提供支持。为了让硬件设备能够与Linux系统无缝协作,需要编写相应的内核驱动程序。本文将介绍Linux内核驱动开发的一般步骤,以帮助开发者了解如何创建自己的内核驱动。 …...

【Java 进阶篇】HTML DOM 事件详解
当用户在网页上点击按钮、输入文本、鼠标移动到某个区域或执行其他互动操作时,这些动作都可以触发事件。HTML DOM(文档对象模型)允许我们使用JavaScript来捕获、处理和响应这些事件,以实现网页的交互和动态性。本篇博客将围绕HTML…...

redis 从小白到大师系列
字符串 Redis 字符串数据类型 set 字符串 /*** 设置字符串*/ $t $redis->set(o1,o1); //返回true or false var_dump($t);get字符串 /*** 获取字符串*/ $t $redis->get(o1); //返回true or false var_dump($t);结果: string(2) “o1” 返回 key 中字符串…...
vue使用.filter方法检索数组中指定时间段内的数据
假设你有一个名为dataArray的数组,其中包含了你要筛选的数据。那么,你可以按照以下步骤进行筛选: 创建一个名为filteredArray的新数组,用于存储筛选后的结果。使用数组的filter方法遍历dataArray,并对每个元素应用筛选…...

Ubuntu 安装 npm 和 node
前言 最近学习VUE,在ubuntu 2204 上配置开发环境,涉及到npm node nodejs vue-Cli脚手架等内容,做以记录。 一、node nodejs npm nvm 区别 ? node 是框架,类似python的解释器。nodejs 是编程语言,是js语言的…...

Matlab论文插图绘制模板第122期—函数折线图(fplot)
本期分享的是函数折线图的绘制模板。 所谓函数折线图,就是将自定义线函数进行可视化表达。 先来看一下成品效果: 特别提示:本期内容『数据代码』已上传资源群中,加群的朋友请自行下载。有需要的朋友可以关注同名公号【阿昆的…...

IK分词器如何修改支持跨版本ES
一、问题描述:IK分词器版本和ES版本不一致,无法找到和自己ES版本匹配的分词器。 IK分词器,提供的插件版本,远赶不上ES的更新版本,在使用过程中,不一定能顺利的找到与自己使用的ES版本相对应。在ES集群中使用…...

Spring MVC常用十大注解
Spring MVC常用十大注解 一,什么要使用注解 使用注解可以简化配置,提高代码的可读性和可维护性。通过注解可以实现依赖注入,减少手动管理对象的代码量。注解还支持面向切面编程,实现切面、切入点和通知等。此外,注解提…...
二、【MyBatis】 MyBatis入门与简单使用
二、【MyBatis】 MyBatis入门与简单使用 二、【MyBatis】 MyBatis入门与简单使用一、什么是ORM二、为什么mybatis是半自动的ORM框架2.1 Hibernate优点2.2 Hibernate缺点2.3 MyBatis与Hibernate区别三、Mybatis快速入门3.1 项目引入Maven相关依赖3.2 创建测试数据库3.3 编写数据…...

基于DF模式的协作通信技术matlab性能仿真
目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 4.1、DF概述 4.2、DF基本原理 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版本 matlab2013b 3.部分核心程序 clc; clear; close all; warning off; addpath(genpath(pwd))…...
Angular-01:基本架构
各种学习后的知识点整理归纳,非原创! ① 概述 angular是一个使用HTML、CSS、TypeScript构建的客户端应用的框架,用来构建单页面应用程序。是一个重量级的框架,内部集成了大量开箱即用的功能模块。是为大型应用开发而设计…...
字符串划分
题目描述 给定一个小写字母组成的字符串s,请找出字符串中两个不同位置的字符作为分割点,使得字符串分成的三个连续子串且子串权重相等,注意子串不包含分割点。 若能找到满足条件的两个分割点,请输出这两个分割点在字符串中的位置…...
ImportError: /lib64/libstdc++.so.6: version `CXXABI_1.3.9‘ not found的解决方法
导致该错误的原因:gcc动态库版本太老了 解决方法: 1、编辑~/.bash_profile vim ~/.bash_profile 2、将anaconda3/lib的路径加入库文件的路径 LD_LIBRARY_PATH/your_path/anaconda3/lib:$LD_LIBRARY_PATH export LD_LIBRARY_PATH 3、重载~/.bash_pr…...

华为云全新上线Serverless应用中心,支持一键构建文生图应用
近日,华为云全新上线Serverless应用中心,提供了大量应用模板,让用户能够一键部署函数和周边依赖资源,节省部署时间,快速上手将应用部署到华为云函数计算FunctionGraph,并一键开通周边依赖资源。 本次Serve…...

scrapy的安装和使用
一、scrapy是什么:Scrapy是一个为了爬取网站数据,提取结构性数据而编写的应用框架,可以应用在包括数据挖掘,信息处理或存储历史数据等一系列的程序 二、scrapy的安装:pip install scrapy -i https://pypi.douban.com/…...
Kotlin中的异常处理
异常是在程序执行过程中出现的错误或意外情况,可以干扰程序的正常流程。在Kotlin中,我们可以通过异常处理机制来捕获和处理异常,以保证程序的稳定性。本篇博客将介绍异常的产生、捕获、定义、受检异常和finally关键字,并提供相应的…...

Spark 之 入门讲解详细版(1)
1、简介 1.1 Spark简介 Spark是加州大学伯克利分校AMP实验室(Algorithms, Machines, and People Lab)开发通用内存并行计算框架。Spark在2013年6月进入Apache成为孵化项目,8个月后成为Apache顶级项目,速度之快足见过人之处&…...

深入理解JavaScript设计模式之单例模式
目录 什么是单例模式为什么需要单例模式常见应用场景包括 单例模式实现透明单例模式实现不透明单例模式用代理实现单例模式javaScript中的单例模式使用命名空间使用闭包封装私有变量 惰性单例通用的惰性单例 结语 什么是单例模式 单例模式(Singleton Pattern&#…...

STM32标准库-DMA直接存储器存取
文章目录 一、DMA1.1简介1.2存储器映像1.3DMA框图1.4DMA基本结构1.5DMA请求1.6数据宽度与对齐1.7数据转运DMA1.8ADC扫描模式DMA 二、数据转运DMA2.1接线图2.2代码2.3相关API 一、DMA 1.1简介 DMA(Direct Memory Access)直接存储器存取 DMA可以提供外设…...
五年级数学知识边界总结思考-下册
目录 一、背景二、过程1.观察物体小学五年级下册“观察物体”知识点详解:由来、作用与意义**一、知识点核心内容****二、知识点的由来:从生活实践到数学抽象****三、知识的作用:解决实际问题的工具****四、学习的意义:培养核心素养…...

从零开始打造 OpenSTLinux 6.6 Yocto 系统(基于STM32CubeMX)(九)
设备树移植 和uboot设备树修改的内容同步到kernel将设备树stm32mp157d-stm32mp157daa1-mx.dts复制到内核源码目录下 源码修改及编译 修改arch/arm/boot/dts/st/Makefile,新增设备树编译 stm32mp157f-ev1-m4-examples.dtb \stm32mp157d-stm32mp157daa1-mx.dtb修改…...
vue3 定时器-定义全局方法 vue+ts
1.创建ts文件 路径:src/utils/timer.ts 完整代码: import { onUnmounted } from vuetype TimerCallback (...args: any[]) > voidexport function useGlobalTimer() {const timers: Map<number, NodeJS.Timeout> new Map()// 创建定时器con…...
相机Camera日志分析之三十一:高通Camx HAL十种流程基础分析关键字汇总(后续持续更新中)
【关注我,后续持续新增专题博文,谢谢!!!】 上一篇我们讲了:有对最普通的场景进行各个日志注释讲解,但相机场景太多,日志差异也巨大。后面将展示各种场景下的日志。 通过notepad++打开场景下的日志,通过下列分类关键字搜索,即可清晰的分析不同场景的相机运行流程差异…...
工业自动化时代的精准装配革新:迁移科技3D视觉系统如何重塑机器人定位装配
AI3D视觉的工业赋能者 迁移科技成立于2017年,作为行业领先的3D工业相机及视觉系统供应商,累计完成数亿元融资。其核心技术覆盖硬件设计、算法优化及软件集成,通过稳定、易用、高回报的AI3D视觉系统,为汽车、新能源、金属制造等行…...

全志A40i android7.1 调试信息打印串口由uart0改为uart3
一,概述 1. 目的 将调试信息打印串口由uart0改为uart3。 2. 版本信息 Uboot版本:2014.07; Kernel版本:Linux-3.10; 二,Uboot 1. sys_config.fex改动 使能uart3(TX:PH00 RX:PH01),并让boo…...

华为OD机考-机房布局
import java.util.*;public class DemoTest5 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseSystem.out.println(solve(in.nextLine()));}}priv…...