当前位置: 首页 > news >正文

Reactor反应器模式

文章目录

    • 一、单线程Reactor反应器模式
    • 二、多线程Reactor反应器模式

在Java的OIO编程中,最初和最原始的网络服务器程序使用一个while循环,不断地监听端口是否有新的连接,如果有就调用一个处理函数来处理。这种方法最大的问题就是如果前一个网络连接的处理没有结束,那么后面的连接请求没法被接收,于是后面的请求统统会被阻塞住,服务器的吞吐量就太低了。

为了解决这个严重的连接阻塞问题,出现了一个即为经典模式:Connection Per Thread。即对于每一个新的网络连接都分配一个线程,每个线程都独自处理自己负责的输入和输出,任何socket连接的输入和输出处理不会阻塞到后面新socket连接的监听和建立。早期版本的Tomcat服务器就是这样实现的。

这种模式的优点是解决了前面的新连接被严重阻塞的问题,在一定程度上极大地提高了服务器的吞吐量。但是对于大量的连接,需要消耗大量的现成资源,如果线程数太多,系统无法承受。而且线程的反复创建、销毁、线程的切换也需要代价。因此高并发应用场景下多线程OIO的缺陷是致命的,因此引入了Reactor反应器模式。

反应器模式由Reactor反应器线程、Handlers处理器两大角色组成:

  1. Reactor反应器线程的职责:负责响应IO事件,并且分发到Handlers处理器
  2. Handlers处理器的职责:非阻塞的执行业务处理逻辑

一、单线程Reactor反应器模式

Reactor反应器模式有点儿类似事件驱动模式,当有事件触发时,事件源会将事件dispatch分发到handler处理器进行事件处理。反应器模式中的反应器角色类似于事件驱动模式中的dispatcher事件分发器角色。

  • Reactor反应器:负责查询IO事件,当检测到一个IO时间,将其发送给对应的Handler处理器处理,这里的IO事件就是NIO选择器监控的通道IO事件。
  • Handler处理器:与IO事件绑定,负责IO事件的处理,完成真正的连接建立、通道的读取、处理业务逻辑、负责将结果写出到通道等。

基于NIO实现单线程版本的反应器模式需要用到SelectionKey选择键的几个重要的成员方法:

  1. void attach(Object o):将任何的Java对象作为附件添加到SelectionKey实例,主要是将Handler处理器实例作为附件添加到SelectionKey实例
  2. Object attachment():取出之前通过attach添加到SelectionKey选择键实例的附件,一般用于取出绑定的Handler处理器实例。

Reactor实现示例:

package cn.ken.jredis;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;/*** <pre>** </pre>** @author <a href="https://github.com/Ken-Chy129">Ken-Chy129</a>* @since 2023/10/14 14:29*/
public class Reactor implements Runnable {final private Selector selector;final private ServerSocketChannel serverSocketChannel;public Reactor() {try {this.selector = Selector.open();this.serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.bind(new InetSocketAddress(8088));// 注册ServerSocket的accept事件SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);// 为事件绑定处理器sk.attach(new AcceptHandler());} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic void run() {try {while (!Thread.interrupted()) {selector.select();Set<SelectionKey> selectionKeys = selector.selectedKeys();for (SelectionKey selectedKey : selectionKeys) {dispatch(selectedKey);}selectionKeys.clear();}} catch (Exception e) {throw new RuntimeException(e);}}private void dispatch(SelectionKey selectedKey) {Runnable handler = (Runnable) selectedKey.attachment();// 此处返回的可能是AcceptHandler也可能是IOHandlerhandler.run();}class AcceptHandler implements Runnable {@Overridepublic void run() {try {SocketChannel socketChannel = serverSocketChannel.accept();if (socketChannel != null) {new IOHandler(selector, socketChannel); // 注册IO处理器,并将连接加入select列表}} catch (IOException e) {throw new RuntimeException(e);}}}public static void main(String[] args) {new Reactor().run();}
}

Handler实现示例:

package cn.ken.jredis;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;/*** <pre>** </pre>** @author <a href="https://github.com/Ken-Chy129">Ken-Chy129</a>* @since 2023/10/14 14:53*/
public class IOHandler implements Runnable {final private SocketChannel socketChannel;final private ByteBuffer buffer;public IOHandler(Selector selector, SocketChannel channel) {buffer = ByteBuffer.allocate(1024);socketChannel = channel;try {channel.configureBlocking(false);SelectionKey sk = channel.register(selector, 0); // 此处没有注册感兴趣的事件sk.attach(this);sk.interestOps(SelectionKey.OP_READ); // 注册感兴趣的事件,下一次调用select时才生效selector.wakeup(); // 立即唤醒当前阻塞select操作,使得迅速进入下次select,从而让上面注册的读事件监听可以立即生效} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic void run() {try {int length;while ((length = socketChannel.read(buffer)) > 0) {System.out.println(new String(buffer.array(), 0, length));}} catch (IOException e) {throw new RuntimeException(e);}}
}

在单线程反应器模式中,Reactor反应器和Handler处理器都执行在同一条线程上(dispatch方法是直接调用run方法,没有创建新的线程),因此当其中某个Handler阻塞时,会导致其他所有的Handler都得不到执行。

二、多线程Reactor反应器模式

既然Reactor反应器和Handler处理器在一个线程会造成非常严重的性能缺陷,那么可以使用多线程对基础的反应器模式进行改造。

  1. 将负责输入输出处理的IOHandler处理器的执行,放入独立的线程池中。这样业务处理线程与负责服务监听和IO时间查询的反应器线程相隔离,避免服务器的连接监听收到阻塞。
  2. 如果服务器为多核的CPU,可以将反应器线程拆分为多个子反应器线程,同时引入多个选择器,每一个SubReactor子线程负责一个选择器。

MultiReactor:

package cn.ken.jredis;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;/*** <pre>** </pre>** @author <a href="https://github.com/Ken-Chy129">Ken-Chy129</a>* @since 2023/10/14 16:51*/
public class MultiReactor {private final ServerSocketChannel server;private final Selector[] selectors = new Selector[2];private final SubReactor[] reactors = new SubReactor[2];private final AtomicInteger index = new AtomicInteger(0);public MultiReactor() {try {server = ServerSocketChannel.open();selectors[0] = Selector.open();selectors[1] = Selector.open();server.bind(new InetSocketAddress(8080));server.configureBlocking(false);SelectionKey register = server.register(selectors[0], SelectionKey.OP_ACCEPT);register.attach(new AcceptHandler());reactors[0] = new SubReactor(selectors[0]);reactors[1] = new SubReactor(selectors[1]);} catch (IOException e) {throw new RuntimeException(e);}}private void startService() {new Thread(reactors[0]).start();new Thread(reactors[1]).start();}class SubReactor implements Runnable {final private Selector selector;public SubReactor(Selector selector) {this.selector = selector;}@Overridepublic void run() {while (!Thread.interrupted()) {try {selector.select();Set<SelectionKey> selectionKeys = selector.selectedKeys();for (SelectionKey selectionKey : selectionKeys) {dispatch(selectionKey);}selectionKeys.clear();} catch (IOException e) {throw new RuntimeException(e);}}}}private void dispatch(SelectionKey selectionKey) {Runnable attachment = (Runnable) selectionKey.attachment();if (attachment != null) {attachment.run();}}class AcceptHandler implements Runnable {@Overridepublic void run() {try {SocketChannel socketChannel = server.accept();new MultiHandler(selectors[index.getAndIncrement()], socketChannel);if (index.get() == selectors.length) {index.set(0);}} catch (IOException e) {throw new RuntimeException(e);}}}
}

MultiHandler:

package cn.ken.jredis;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** <pre>** </pre>** @author <a href="https://github.com/Ken-Chy129">Ken-Chy129</a>* @since 2023/10/14 17:28*/
public class MultiHandler implements Runnable {final private Selector selector;final private SocketChannel channel;final ByteBuffer buffer = ByteBuffer.allocate(1024);static ExecutorService pool = Executors.newFixedThreadPool(4);public MultiHandler(Selector selector, SocketChannel channel) {this.selector = selector;this.channel = channel;try {channel.configureBlocking(false);SelectionKey register = channel.register(selector, SelectionKey.OP_READ);register.attach(this);selector.wakeup();} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic void run() {pool.execute(() -> {synchronized (this) {int length;try {while ((length = channel.read(buffer)) > 0) {System.out.println(new String(buffer.array(), 0, length));buffer.clear();}} catch (IOException e) {throw new RuntimeException(e);}}   });}
}

相关文章:

Reactor反应器模式

文章目录 一、单线程Reactor反应器模式二、多线程Reactor反应器模式 在Java的OIO编程中&#xff0c;最初和最原始的网络服务器程序使用一个while循环&#xff0c;不断地监听端口是否有新的连接&#xff0c;如果有就调用一个处理函数来处理。这种方法最大的问题就是如果前一个网…...

alibaba.fastjson的使用(六) -- JavaBean==》Json字符串、JSONObject、JSONArray

目录 1. JavaBean转 Json字符串 2. JavaBean转 JSONObject 3. List转JSONArray 在pom文件中引入依赖: <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.14</version></dependency&…...

uniapp 自定义导航栏

自定义导航栏 修改 pages.json 在 pages.json 中将 navigateionStyle 设为 custom 新建 systemInfo.js systemInfo.js 用来获取当前设备的机型系统信息&#xff0c;放在 common 目录下 /*** 此 js 文件管理关于当前设备的机型系统信息*/ const systemInfo function() {/***…...

查分小程序:一键查询成绩,班主任和家长的得力助手

作为一名老师&#xff0c;是否曾经为了让学生能够方便地查询成绩而烦恼&#xff1f;担心学生忘记密码&#xff1f;还是手动输入成绩太繁琐&#xff1f;今天&#xff0c;给大家分享一个超级实用的查分小程序&#xff0c;让成绩查询变得更轻松&#xff01; 什么是成绩查询系统&am…...

Linux内核驱动开发的步骤

Linux操作系统的内核是一个强大的、开源的操作系统内核&#xff0c;它为各种硬件设备提供支持。为了让硬件设备能够与Linux系统无缝协作&#xff0c;需要编写相应的内核驱动程序。本文将介绍Linux内核驱动开发的一般步骤&#xff0c;以帮助开发者了解如何创建自己的内核驱动。 …...

【Java 进阶篇】HTML DOM 事件详解

当用户在网页上点击按钮、输入文本、鼠标移动到某个区域或执行其他互动操作时&#xff0c;这些动作都可以触发事件。HTML DOM&#xff08;文档对象模型&#xff09;允许我们使用JavaScript来捕获、处理和响应这些事件&#xff0c;以实现网页的交互和动态性。本篇博客将围绕HTML…...

redis 从小白到大师系列

字符串 Redis 字符串数据类型 set 字符串 /*** 设置字符串*/ $t $redis->set(o1,o1); //返回true or false var_dump($t);get字符串 /*** 获取字符串*/ $t $redis->get(o1); //返回true or false var_dump($t);结果&#xff1a; string(2) “o1” 返回 key 中字符串…...

vue使用.filter方法检索数组中指定时间段内的数据

假设你有一个名为dataArray的数组&#xff0c;其中包含了你要筛选的数据。那么&#xff0c;你可以按照以下步骤进行筛选&#xff1a; 创建一个名为filteredArray的新数组&#xff0c;用于存储筛选后的结果。使用数组的filter方法遍历dataArray&#xff0c;并对每个元素应用筛选…...

Ubuntu 安装 npm 和 node

前言 最近学习VUE&#xff0c;在ubuntu 2204 上配置开发环境&#xff0c;涉及到npm node nodejs vue-Cli脚手架等内容&#xff0c;做以记录。 一、node nodejs npm nvm 区别 &#xff1f; node 是框架&#xff0c;类似python的解释器。nodejs 是编程语言&#xff0c;是js语言的…...

Matlab论文插图绘制模板第122期—函数折线图(fplot)

本期分享的是函数折线图的绘制模板。​ 所谓函数折线图&#xff0c;就是将自定义线函数进行可视化表达​。 先来看一下成品效果&#xff1a; 特别提示&#xff1a;本期内容『数据代码』已上传资源群中&#xff0c;加群的朋友请自行下载。有需要的朋友可以关注同名公号【阿昆的…...

IK分词器如何修改支持跨版本ES

一、问题描述&#xff1a;IK分词器版本和ES版本不一致&#xff0c;无法找到和自己ES版本匹配的分词器。 IK分词器&#xff0c;提供的插件版本&#xff0c;远赶不上ES的更新版本&#xff0c;在使用过程中&#xff0c;不一定能顺利的找到与自己使用的ES版本相对应。在ES集群中使用…...

Spring MVC常用十大注解

Spring MVC常用十大注解 一&#xff0c;什么要使用注解 使用注解可以简化配置&#xff0c;提高代码的可读性和可维护性。通过注解可以实现依赖注入&#xff0c;减少手动管理对象的代码量。注解还支持面向切面编程&#xff0c;实现切面、切入点和通知等。此外&#xff0c;注解提…...

二、【MyBatis】 MyBatis入门与简单使用

二、【MyBatis】 MyBatis入门与简单使用 二、【MyBatis】 MyBatis入门与简单使用一、什么是ORM二、为什么mybatis是半自动的ORM框架2.1 Hibernate优点2.2 Hibernate缺点2.3 MyBatis与Hibernate区别三、Mybatis快速入门3.1 项目引入Maven相关依赖3.2 创建测试数据库3.3 编写数据…...

基于DF模式的协作通信技术matlab性能仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 4.1、DF概述 4.2、DF基本原理 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版本 matlab2013b 3.部分核心程序 clc; clear; close all; warning off; addpath(genpath(pwd))…...

Angular-01:基本架构

各种学习后的知识点整理归纳&#xff0c;非原创&#xff01; ① 概述 angular是一个使用HTML、CSS、TypeScript构建的客户端应用的框架&#xff0c;用来构建单页面应用程序。是一个重量级的框架&#xff0c;内部集成了大量开箱即用的功能模块。是为大型应用开发而设计&#xf…...

字符串划分

题目描述 给定一个小写字母组成的字符串s&#xff0c;请找出字符串中两个不同位置的字符作为分割点&#xff0c;使得字符串分成的三个连续子串且子串权重相等&#xff0c;注意子串不包含分割点。 若能找到满足条件的两个分割点&#xff0c;请输出这两个分割点在字符串中的位置…...

ImportError: /lib64/libstdc++.so.6: version `CXXABI_1.3.9‘ not found的解决方法

导致该错误的原因&#xff1a;gcc动态库版本太老了 解决方法&#xff1a; 1、编辑~/.bash_profile vim ~/.bash_profile 2、将anaconda3/lib的路径加入库文件的路径 LD_LIBRARY_PATH/your_path/anaconda3/lib:$LD_LIBRARY_PATH export LD_LIBRARY_PATH 3、重载~/.bash_pr…...

华为云全新上线Serverless应用中心,支持一键构建文生图应用

近日&#xff0c;华为云全新上线Serverless应用中心&#xff0c;提供了大量应用模板&#xff0c;让用户能够一键部署函数和周边依赖资源&#xff0c;节省部署时间&#xff0c;快速上手将应用部署到华为云函数计算FunctionGraph&#xff0c;并一键开通周边依赖资源。 本次Serve…...

scrapy的安装和使用

一、scrapy是什么&#xff1a;Scrapy是一个为了爬取网站数据&#xff0c;提取结构性数据而编写的应用框架&#xff0c;可以应用在包括数据挖掘&#xff0c;信息处理或存储历史数据等一系列的程序 二、scrapy的安装&#xff1a;pip install scrapy -i https://pypi.douban.com/…...

Kotlin中的异常处理

异常是在程序执行过程中出现的错误或意外情况&#xff0c;可以干扰程序的正常流程。在Kotlin中&#xff0c;我们可以通过异常处理机制来捕获和处理异常&#xff0c;以保证程序的稳定性。本篇博客将介绍异常的产生、捕获、定义、受检异常和finally关键字&#xff0c;并提供相应的…...

日语AI面试高效通关秘籍:专业解读与青柚面试智能助攻

在如今就业市场竞争日益激烈的背景下&#xff0c;越来越多的求职者将目光投向了日本及中日双语岗位。但是&#xff0c;一场日语面试往往让许多人感到步履维艰。你是否也曾因为面试官抛出的“刁钻问题”而心生畏惧&#xff1f;面对生疏的日语交流环境&#xff0c;即便提前恶补了…...

Flask RESTful 示例

目录 1. 环境准备2. 安装依赖3. 修改main.py4. 运行应用5. API使用示例获取所有任务获取单个任务创建新任务更新任务删除任务 中文乱码问题&#xff1a; 下面创建一个简单的Flask RESTful API示例。首先&#xff0c;我们需要创建环境&#xff0c;安装必要的依赖&#xff0c;然后…...

反向工程与模型迁移:打造未来商品详情API的可持续创新体系

在电商行业蓬勃发展的当下&#xff0c;商品详情API作为连接电商平台与开发者、商家及用户的关键纽带&#xff0c;其重要性日益凸显。传统商品详情API主要聚焦于商品基本信息&#xff08;如名称、价格、库存等&#xff09;的获取与展示&#xff0c;已难以满足市场对个性化、智能…...

通过Wrangler CLI在worker中创建数据库和表

官方使用文档&#xff1a;Getting started Cloudflare D1 docs 创建数据库 在命令行中执行完成之后&#xff0c;会在本地和远程创建数据库&#xff1a; npx wranglerlatest d1 create prod-d1-tutorial 在cf中就可以看到数据库&#xff1a; 现在&#xff0c;您的Cloudfla…...

Mac下Android Studio扫描根目录卡死问题记录

环境信息 操作系统: macOS 15.5 (Apple M2芯片)Android Studio版本: Meerkat Feature Drop | 2024.3.2 Patch 1 (Build #AI-243.26053.27.2432.13536105, 2025年5月22日构建) 问题现象 在项目开发过程中&#xff0c;提示一个依赖外部头文件的cpp源文件需要同步&#xff0c;点…...

九天毕昇深度学习平台 | 如何安装库?

pip install 库名 -i https://pypi.tuna.tsinghua.edu.cn/simple --user 举个例子&#xff1a; 报错 ModuleNotFoundError: No module named torch 那么我需要安装 torch pip install torch -i https://pypi.tuna.tsinghua.edu.cn/simple --user pip install 库名&#x…...

Cilium动手实验室: 精通之旅---13.Cilium LoadBalancer IPAM and L2 Service Announcement

Cilium动手实验室: 精通之旅---13.Cilium LoadBalancer IPAM and L2 Service Announcement 1. LAB环境2. L2公告策略2.1 部署Death Star2.2 访问服务2.3 部署L2公告策略2.4 服务宣告 3. 可视化 ARP 流量3.1 部署新服务3.2 准备可视化3.3 再次请求 4. 自动IPAM4.1 IPAM Pool4.2 …...

【Post-process】【VBA】ETABS VBA FrameObj.GetNameList and write to EXCEL

ETABS API实战:导出框架元素数据到Excel 在结构工程师的日常工作中,经常需要从ETABS模型中提取框架元素信息进行后续分析。手动复制粘贴不仅耗时,还容易出错。今天我们来用简单的VBA代码实现自动化导出。 🎯 我们要实现什么? 一键点击,就能将ETABS中所有框架元素的基…...

热烈祝贺埃文科技正式加入可信数据空间发展联盟

2025年4月29日&#xff0c;在福州举办的第八届数字中国建设峰会“可信数据空间分论坛”上&#xff0c;可信数据空间发展联盟正式宣告成立。国家数据局党组书记、局长刘烈宏出席并致辞&#xff0c;强调该联盟是推进全国一体化数据市场建设的关键抓手。 郑州埃文科技有限公司&am…...

LangChain 中的文档加载器(Loader)与文本切分器(Splitter)详解《二》

&#x1f9e0; LangChain 中 TextSplitter 的使用详解&#xff1a;从基础到进阶&#xff08;附代码&#xff09; 一、前言 在处理大规模文本数据时&#xff0c;特别是在构建知识库或进行大模型训练与推理时&#xff0c;文本切分&#xff08;Text Splitting&#xff09; 是一个…...