Reactor反应器模式
文章目录
- 一、单线程Reactor反应器模式
- 二、多线程Reactor反应器模式
在Java的OIO编程中,最初和最原始的网络服务器程序使用一个while循环,不断地监听端口是否有新的连接,如果有就调用一个处理函数来处理。这种方法最大的问题就是如果前一个网络连接的处理没有结束,那么后面的连接请求没法被接收,于是后面的请求统统会被阻塞住,服务器的吞吐量就太低了。
为了解决这个严重的连接阻塞问题,出现了一个即为经典模式:Connection Per Thread。即对于每一个新的网络连接都分配一个线程,每个线程都独自处理自己负责的输入和输出,任何socket连接的输入和输出处理不会阻塞到后面新socket连接的监听和建立。早期版本的Tomcat服务器就是这样实现的。
这种模式的优点是解决了前面的新连接被严重阻塞的问题,在一定程度上极大地提高了服务器的吞吐量。但是对于大量的连接,需要消耗大量的现成资源,如果线程数太多,系统无法承受。而且线程的反复创建、销毁、线程的切换也需要代价。因此高并发应用场景下多线程OIO的缺陷是致命的,因此引入了Reactor反应器模式。
反应器模式由Reactor反应器线程、Handlers处理器两大角色组成:
- Reactor反应器线程的职责:负责响应IO事件,并且分发到Handlers处理器
- Handlers处理器的职责:非阻塞的执行业务处理逻辑
一、单线程Reactor反应器模式
Reactor反应器模式有点儿类似事件驱动模式,当有事件触发时,事件源会将事件dispatch分发到handler处理器进行事件处理。反应器模式中的反应器角色类似于事件驱动模式中的dispatcher事件分发器角色。
- Reactor反应器:负责查询IO事件,当检测到一个IO时间,将其发送给对应的Handler处理器处理,这里的IO事件就是NIO选择器监控的通道IO事件。
- Handler处理器:与IO事件绑定,负责IO事件的处理,完成真正的连接建立、通道的读取、处理业务逻辑、负责将结果写出到通道等。
基于NIO实现单线程版本的反应器模式需要用到SelectionKey选择键的几个重要的成员方法:
- void attach(Object o):将任何的Java对象作为附件添加到SelectionKey实例,主要是将Handler处理器实例作为附件添加到SelectionKey实例
- Object attachment():取出之前通过attach添加到SelectionKey选择键实例的附件,一般用于取出绑定的Handler处理器实例。
Reactor实现示例:
package cn.ken.jredis;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;/*** <pre>** </pre>** @author <a href="https://github.com/Ken-Chy129">Ken-Chy129</a>* @since 2023/10/14 14:29*/
public class Reactor implements Runnable {final private Selector selector;final private ServerSocketChannel serverSocketChannel;public Reactor() {try {this.selector = Selector.open();this.serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.bind(new InetSocketAddress(8088));// 注册ServerSocket的accept事件SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);// 为事件绑定处理器sk.attach(new AcceptHandler());} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic void run() {try {while (!Thread.interrupted()) {selector.select();Set<SelectionKey> selectionKeys = selector.selectedKeys();for (SelectionKey selectedKey : selectionKeys) {dispatch(selectedKey);}selectionKeys.clear();}} catch (Exception e) {throw new RuntimeException(e);}}private void dispatch(SelectionKey selectedKey) {Runnable handler = (Runnable) selectedKey.attachment();// 此处返回的可能是AcceptHandler也可能是IOHandlerhandler.run();}class AcceptHandler implements Runnable {@Overridepublic void run() {try {SocketChannel socketChannel = serverSocketChannel.accept();if (socketChannel != null) {new IOHandler(selector, socketChannel); // 注册IO处理器,并将连接加入select列表}} catch (IOException e) {throw new RuntimeException(e);}}}public static void main(String[] args) {new Reactor().run();}
}
Handler实现示例:
package cn.ken.jredis;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;/*** <pre>** </pre>** @author <a href="https://github.com/Ken-Chy129">Ken-Chy129</a>* @since 2023/10/14 14:53*/
public class IOHandler implements Runnable {final private SocketChannel socketChannel;final private ByteBuffer buffer;public IOHandler(Selector selector, SocketChannel channel) {buffer = ByteBuffer.allocate(1024);socketChannel = channel;try {channel.configureBlocking(false);SelectionKey sk = channel.register(selector, 0); // 此处没有注册感兴趣的事件sk.attach(this);sk.interestOps(SelectionKey.OP_READ); // 注册感兴趣的事件,下一次调用select时才生效selector.wakeup(); // 立即唤醒当前阻塞select操作,使得迅速进入下次select,从而让上面注册的读事件监听可以立即生效} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic void run() {try {int length;while ((length = socketChannel.read(buffer)) > 0) {System.out.println(new String(buffer.array(), 0, length));}} catch (IOException e) {throw new RuntimeException(e);}}
}
在单线程反应器模式中,Reactor反应器和Handler处理器都执行在同一条线程上(dispatch方法是直接调用run方法,没有创建新的线程),因此当其中某个Handler阻塞时,会导致其他所有的Handler都得不到执行。
二、多线程Reactor反应器模式
既然Reactor反应器和Handler处理器在一个线程会造成非常严重的性能缺陷,那么可以使用多线程对基础的反应器模式进行改造。
- 将负责输入输出处理的IOHandler处理器的执行,放入独立的线程池中。这样业务处理线程与负责服务监听和IO时间查询的反应器线程相隔离,避免服务器的连接监听收到阻塞。
- 如果服务器为多核的CPU,可以将反应器线程拆分为多个子反应器线程,同时引入多个选择器,每一个SubReactor子线程负责一个选择器。
MultiReactor:
package cn.ken.jredis;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;/*** <pre>** </pre>** @author <a href="https://github.com/Ken-Chy129">Ken-Chy129</a>* @since 2023/10/14 16:51*/
public class MultiReactor {private final ServerSocketChannel server;private final Selector[] selectors = new Selector[2];private final SubReactor[] reactors = new SubReactor[2];private final AtomicInteger index = new AtomicInteger(0);public MultiReactor() {try {server = ServerSocketChannel.open();selectors[0] = Selector.open();selectors[1] = Selector.open();server.bind(new InetSocketAddress(8080));server.configureBlocking(false);SelectionKey register = server.register(selectors[0], SelectionKey.OP_ACCEPT);register.attach(new AcceptHandler());reactors[0] = new SubReactor(selectors[0]);reactors[1] = new SubReactor(selectors[1]);} catch (IOException e) {throw new RuntimeException(e);}}private void startService() {new Thread(reactors[0]).start();new Thread(reactors[1]).start();}class SubReactor implements Runnable {final private Selector selector;public SubReactor(Selector selector) {this.selector = selector;}@Overridepublic void run() {while (!Thread.interrupted()) {try {selector.select();Set<SelectionKey> selectionKeys = selector.selectedKeys();for (SelectionKey selectionKey : selectionKeys) {dispatch(selectionKey);}selectionKeys.clear();} catch (IOException e) {throw new RuntimeException(e);}}}}private void dispatch(SelectionKey selectionKey) {Runnable attachment = (Runnable) selectionKey.attachment();if (attachment != null) {attachment.run();}}class AcceptHandler implements Runnable {@Overridepublic void run() {try {SocketChannel socketChannel = server.accept();new MultiHandler(selectors[index.getAndIncrement()], socketChannel);if (index.get() == selectors.length) {index.set(0);}} catch (IOException e) {throw new RuntimeException(e);}}}
}
MultiHandler:
package cn.ken.jredis;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** <pre>** </pre>** @author <a href="https://github.com/Ken-Chy129">Ken-Chy129</a>* @since 2023/10/14 17:28*/
public class MultiHandler implements Runnable {final private Selector selector;final private SocketChannel channel;final ByteBuffer buffer = ByteBuffer.allocate(1024);static ExecutorService pool = Executors.newFixedThreadPool(4);public MultiHandler(Selector selector, SocketChannel channel) {this.selector = selector;this.channel = channel;try {channel.configureBlocking(false);SelectionKey register = channel.register(selector, SelectionKey.OP_READ);register.attach(this);selector.wakeup();} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic void run() {pool.execute(() -> {synchronized (this) {int length;try {while ((length = channel.read(buffer)) > 0) {System.out.println(new String(buffer.array(), 0, length));buffer.clear();}} catch (IOException e) {throw new RuntimeException(e);}} });}
}
相关文章:
Reactor反应器模式
文章目录 一、单线程Reactor反应器模式二、多线程Reactor反应器模式 在Java的OIO编程中,最初和最原始的网络服务器程序使用一个while循环,不断地监听端口是否有新的连接,如果有就调用一个处理函数来处理。这种方法最大的问题就是如果前一个网…...
alibaba.fastjson的使用(六) -- JavaBean==》Json字符串、JSONObject、JSONArray
目录 1. JavaBean转 Json字符串 2. JavaBean转 JSONObject 3. List转JSONArray 在pom文件中引入依赖: <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.14</version></dependency&…...
uniapp 自定义导航栏
自定义导航栏 修改 pages.json 在 pages.json 中将 navigateionStyle 设为 custom 新建 systemInfo.js systemInfo.js 用来获取当前设备的机型系统信息,放在 common 目录下 /*** 此 js 文件管理关于当前设备的机型系统信息*/ const systemInfo function() {/***…...
查分小程序:一键查询成绩,班主任和家长的得力助手
作为一名老师,是否曾经为了让学生能够方便地查询成绩而烦恼?担心学生忘记密码?还是手动输入成绩太繁琐?今天,给大家分享一个超级实用的查分小程序,让成绩查询变得更轻松! 什么是成绩查询系统&am…...
Linux内核驱动开发的步骤
Linux操作系统的内核是一个强大的、开源的操作系统内核,它为各种硬件设备提供支持。为了让硬件设备能够与Linux系统无缝协作,需要编写相应的内核驱动程序。本文将介绍Linux内核驱动开发的一般步骤,以帮助开发者了解如何创建自己的内核驱动。 …...
【Java 进阶篇】HTML DOM 事件详解
当用户在网页上点击按钮、输入文本、鼠标移动到某个区域或执行其他互动操作时,这些动作都可以触发事件。HTML DOM(文档对象模型)允许我们使用JavaScript来捕获、处理和响应这些事件,以实现网页的交互和动态性。本篇博客将围绕HTML…...
redis 从小白到大师系列
字符串 Redis 字符串数据类型 set 字符串 /*** 设置字符串*/ $t $redis->set(o1,o1); //返回true or false var_dump($t);get字符串 /*** 获取字符串*/ $t $redis->get(o1); //返回true or false var_dump($t);结果: string(2) “o1” 返回 key 中字符串…...
vue使用.filter方法检索数组中指定时间段内的数据
假设你有一个名为dataArray的数组,其中包含了你要筛选的数据。那么,你可以按照以下步骤进行筛选: 创建一个名为filteredArray的新数组,用于存储筛选后的结果。使用数组的filter方法遍历dataArray,并对每个元素应用筛选…...
Ubuntu 安装 npm 和 node
前言 最近学习VUE,在ubuntu 2204 上配置开发环境,涉及到npm node nodejs vue-Cli脚手架等内容,做以记录。 一、node nodejs npm nvm 区别 ? node 是框架,类似python的解释器。nodejs 是编程语言,是js语言的…...
Matlab论文插图绘制模板第122期—函数折线图(fplot)
本期分享的是函数折线图的绘制模板。 所谓函数折线图,就是将自定义线函数进行可视化表达。 先来看一下成品效果: 特别提示:本期内容『数据代码』已上传资源群中,加群的朋友请自行下载。有需要的朋友可以关注同名公号【阿昆的…...
IK分词器如何修改支持跨版本ES
一、问题描述:IK分词器版本和ES版本不一致,无法找到和自己ES版本匹配的分词器。 IK分词器,提供的插件版本,远赶不上ES的更新版本,在使用过程中,不一定能顺利的找到与自己使用的ES版本相对应。在ES集群中使用…...
Spring MVC常用十大注解
Spring MVC常用十大注解 一,什么要使用注解 使用注解可以简化配置,提高代码的可读性和可维护性。通过注解可以实现依赖注入,减少手动管理对象的代码量。注解还支持面向切面编程,实现切面、切入点和通知等。此外,注解提…...
二、【MyBatis】 MyBatis入门与简单使用
二、【MyBatis】 MyBatis入门与简单使用 二、【MyBatis】 MyBatis入门与简单使用一、什么是ORM二、为什么mybatis是半自动的ORM框架2.1 Hibernate优点2.2 Hibernate缺点2.3 MyBatis与Hibernate区别三、Mybatis快速入门3.1 项目引入Maven相关依赖3.2 创建测试数据库3.3 编写数据…...
基于DF模式的协作通信技术matlab性能仿真
目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 4.1、DF概述 4.2、DF基本原理 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版本 matlab2013b 3.部分核心程序 clc; clear; close all; warning off; addpath(genpath(pwd))…...
Angular-01:基本架构
各种学习后的知识点整理归纳,非原创! ① 概述 angular是一个使用HTML、CSS、TypeScript构建的客户端应用的框架,用来构建单页面应用程序。是一个重量级的框架,内部集成了大量开箱即用的功能模块。是为大型应用开发而设计…...
字符串划分
题目描述 给定一个小写字母组成的字符串s,请找出字符串中两个不同位置的字符作为分割点,使得字符串分成的三个连续子串且子串权重相等,注意子串不包含分割点。 若能找到满足条件的两个分割点,请输出这两个分割点在字符串中的位置…...
ImportError: /lib64/libstdc++.so.6: version `CXXABI_1.3.9‘ not found的解决方法
导致该错误的原因:gcc动态库版本太老了 解决方法: 1、编辑~/.bash_profile vim ~/.bash_profile 2、将anaconda3/lib的路径加入库文件的路径 LD_LIBRARY_PATH/your_path/anaconda3/lib:$LD_LIBRARY_PATH export LD_LIBRARY_PATH 3、重载~/.bash_pr…...
华为云全新上线Serverless应用中心,支持一键构建文生图应用
近日,华为云全新上线Serverless应用中心,提供了大量应用模板,让用户能够一键部署函数和周边依赖资源,节省部署时间,快速上手将应用部署到华为云函数计算FunctionGraph,并一键开通周边依赖资源。 本次Serve…...
scrapy的安装和使用
一、scrapy是什么:Scrapy是一个为了爬取网站数据,提取结构性数据而编写的应用框架,可以应用在包括数据挖掘,信息处理或存储历史数据等一系列的程序 二、scrapy的安装:pip install scrapy -i https://pypi.douban.com/…...
Kotlin中的异常处理
异常是在程序执行过程中出现的错误或意外情况,可以干扰程序的正常流程。在Kotlin中,我们可以通过异常处理机制来捕获和处理异常,以保证程序的稳定性。本篇博客将介绍异常的产生、捕获、定义、受检异常和finally关键字,并提供相应的…...
Python爬虫实战:研究MechanicalSoup库相关技术
一、MechanicalSoup 库概述 1.1 库简介 MechanicalSoup 是一个 Python 库,专为自动化交互网站而设计。它结合了 requests 的 HTTP 请求能力和 BeautifulSoup 的 HTML 解析能力,提供了直观的 API,让我们可以像人类用户一样浏览网页、填写表单和提交请求。 1.2 主要功能特点…...
在软件开发中正确使用MySQL日期时间类型的深度解析
在日常软件开发场景中,时间信息的存储是底层且核心的需求。从金融交易的精确记账时间、用户操作的行为日志,到供应链系统的物流节点时间戳,时间数据的准确性直接决定业务逻辑的可靠性。MySQL作为主流关系型数据库,其日期时间类型的…...
【论文笔记】若干矿井粉尘检测算法概述
总的来说,传统机器学习、传统机器学习与深度学习的结合、LSTM等算法所需要的数据集来源于矿井传感器测量的粉尘浓度,通过建立回归模型来预测未来矿井的粉尘浓度。传统机器学习算法性能易受数据中极端值的影响。YOLO等计算机视觉算法所需要的数据集来源于…...
Java-41 深入浅出 Spring - 声明式事务的支持 事务配置 XML模式 XML+注解模式
点一下关注吧!!!非常感谢!!持续更新!!! 🚀 AI篇持续更新中!(长期更新) 目前2025年06月05日更新到: AI炼丹日志-28 - Aud…...
零基础设计模式——行为型模式 - 责任链模式
第四部分:行为型模式 - 责任链模式 (Chain of Responsibility Pattern) 欢迎来到行为型模式的学习!行为型模式关注对象之间的职责分配、算法封装和对象间的交互。我们将学习的第一个行为型模式是责任链模式。 核心思想:使多个对象都有机会处…...
基于Java Swing的电子通讯录设计与实现:附系统托盘功能代码详解
JAVASQL电子通讯录带系统托盘 一、系统概述 本电子通讯录系统采用Java Swing开发桌面应用,结合SQLite数据库实现联系人管理功能,并集成系统托盘功能提升用户体验。系统支持联系人的增删改查、分组管理、搜索过滤等功能,同时可以最小化到系统…...
动态 Web 开发技术入门篇
一、HTTP 协议核心 1.1 HTTP 基础 协议全称 :HyperText Transfer Protocol(超文本传输协议) 默认端口 :HTTP 使用 80 端口,HTTPS 使用 443 端口。 请求方法 : GET :用于获取资源,…...
并发编程 - go版
1.并发编程基础概念 进程和线程 A. 进程是程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位。B. 线程是进程的一个执行实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。C.一个进程可以创建和撤销多个线程;同一个进程中…...
Webpack性能优化:构建速度与体积优化策略
一、构建速度优化 1、升级Webpack和Node.js 优化效果:Webpack 4比Webpack 3构建时间降低60%-98%。原因: V8引擎优化(for of替代forEach、Map/Set替代Object)。默认使用更快的md4哈希算法。AST直接从Loa…...
Git常用命令完全指南:从入门到精通
Git常用命令完全指南:从入门到精通 一、基础配置命令 1. 用户信息配置 # 设置全局用户名 git config --global user.name "你的名字"# 设置全局邮箱 git config --global user.email "你的邮箱example.com"# 查看所有配置 git config --list…...
