当前位置: 首页 > news >正文

Reactor反应器模式

文章目录

    • 一、单线程Reactor反应器模式
    • 二、多线程Reactor反应器模式

在Java的OIO编程中,最初和最原始的网络服务器程序使用一个while循环,不断地监听端口是否有新的连接,如果有就调用一个处理函数来处理。这种方法最大的问题就是如果前一个网络连接的处理没有结束,那么后面的连接请求没法被接收,于是后面的请求统统会被阻塞住,服务器的吞吐量就太低了。

为了解决这个严重的连接阻塞问题,出现了一个即为经典模式:Connection Per Thread。即对于每一个新的网络连接都分配一个线程,每个线程都独自处理自己负责的输入和输出,任何socket连接的输入和输出处理不会阻塞到后面新socket连接的监听和建立。早期版本的Tomcat服务器就是这样实现的。

这种模式的优点是解决了前面的新连接被严重阻塞的问题,在一定程度上极大地提高了服务器的吞吐量。但是对于大量的连接,需要消耗大量的现成资源,如果线程数太多,系统无法承受。而且线程的反复创建、销毁、线程的切换也需要代价。因此高并发应用场景下多线程OIO的缺陷是致命的,因此引入了Reactor反应器模式。

反应器模式由Reactor反应器线程、Handlers处理器两大角色组成:

  1. Reactor反应器线程的职责:负责响应IO事件,并且分发到Handlers处理器
  2. Handlers处理器的职责:非阻塞的执行业务处理逻辑

一、单线程Reactor反应器模式

Reactor反应器模式有点儿类似事件驱动模式,当有事件触发时,事件源会将事件dispatch分发到handler处理器进行事件处理。反应器模式中的反应器角色类似于事件驱动模式中的dispatcher事件分发器角色。

  • Reactor反应器:负责查询IO事件,当检测到一个IO时间,将其发送给对应的Handler处理器处理,这里的IO事件就是NIO选择器监控的通道IO事件。
  • Handler处理器:与IO事件绑定,负责IO事件的处理,完成真正的连接建立、通道的读取、处理业务逻辑、负责将结果写出到通道等。

基于NIO实现单线程版本的反应器模式需要用到SelectionKey选择键的几个重要的成员方法:

  1. void attach(Object o):将任何的Java对象作为附件添加到SelectionKey实例,主要是将Handler处理器实例作为附件添加到SelectionKey实例
  2. Object attachment():取出之前通过attach添加到SelectionKey选择键实例的附件,一般用于取出绑定的Handler处理器实例。

Reactor实现示例:

package cn.ken.jredis;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;/*** <pre>** </pre>** @author <a href="https://github.com/Ken-Chy129">Ken-Chy129</a>* @since 2023/10/14 14:29*/
public class Reactor implements Runnable {final private Selector selector;final private ServerSocketChannel serverSocketChannel;public Reactor() {try {this.selector = Selector.open();this.serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.bind(new InetSocketAddress(8088));// 注册ServerSocket的accept事件SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);// 为事件绑定处理器sk.attach(new AcceptHandler());} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic void run() {try {while (!Thread.interrupted()) {selector.select();Set<SelectionKey> selectionKeys = selector.selectedKeys();for (SelectionKey selectedKey : selectionKeys) {dispatch(selectedKey);}selectionKeys.clear();}} catch (Exception e) {throw new RuntimeException(e);}}private void dispatch(SelectionKey selectedKey) {Runnable handler = (Runnable) selectedKey.attachment();// 此处返回的可能是AcceptHandler也可能是IOHandlerhandler.run();}class AcceptHandler implements Runnable {@Overridepublic void run() {try {SocketChannel socketChannel = serverSocketChannel.accept();if (socketChannel != null) {new IOHandler(selector, socketChannel); // 注册IO处理器,并将连接加入select列表}} catch (IOException e) {throw new RuntimeException(e);}}}public static void main(String[] args) {new Reactor().run();}
}

Handler实现示例:

package cn.ken.jredis;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;/*** <pre>** </pre>** @author <a href="https://github.com/Ken-Chy129">Ken-Chy129</a>* @since 2023/10/14 14:53*/
public class IOHandler implements Runnable {final private SocketChannel socketChannel;final private ByteBuffer buffer;public IOHandler(Selector selector, SocketChannel channel) {buffer = ByteBuffer.allocate(1024);socketChannel = channel;try {channel.configureBlocking(false);SelectionKey sk = channel.register(selector, 0); // 此处没有注册感兴趣的事件sk.attach(this);sk.interestOps(SelectionKey.OP_READ); // 注册感兴趣的事件,下一次调用select时才生效selector.wakeup(); // 立即唤醒当前阻塞select操作,使得迅速进入下次select,从而让上面注册的读事件监听可以立即生效} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic void run() {try {int length;while ((length = socketChannel.read(buffer)) > 0) {System.out.println(new String(buffer.array(), 0, length));}} catch (IOException e) {throw new RuntimeException(e);}}
}

在单线程反应器模式中,Reactor反应器和Handler处理器都执行在同一条线程上(dispatch方法是直接调用run方法,没有创建新的线程),因此当其中某个Handler阻塞时,会导致其他所有的Handler都得不到执行。

二、多线程Reactor反应器模式

既然Reactor反应器和Handler处理器在一个线程会造成非常严重的性能缺陷,那么可以使用多线程对基础的反应器模式进行改造。

  1. 将负责输入输出处理的IOHandler处理器的执行,放入独立的线程池中。这样业务处理线程与负责服务监听和IO时间查询的反应器线程相隔离,避免服务器的连接监听收到阻塞。
  2. 如果服务器为多核的CPU,可以将反应器线程拆分为多个子反应器线程,同时引入多个选择器,每一个SubReactor子线程负责一个选择器。

MultiReactor:

package cn.ken.jredis;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;/*** <pre>** </pre>** @author <a href="https://github.com/Ken-Chy129">Ken-Chy129</a>* @since 2023/10/14 16:51*/
public class MultiReactor {private final ServerSocketChannel server;private final Selector[] selectors = new Selector[2];private final SubReactor[] reactors = new SubReactor[2];private final AtomicInteger index = new AtomicInteger(0);public MultiReactor() {try {server = ServerSocketChannel.open();selectors[0] = Selector.open();selectors[1] = Selector.open();server.bind(new InetSocketAddress(8080));server.configureBlocking(false);SelectionKey register = server.register(selectors[0], SelectionKey.OP_ACCEPT);register.attach(new AcceptHandler());reactors[0] = new SubReactor(selectors[0]);reactors[1] = new SubReactor(selectors[1]);} catch (IOException e) {throw new RuntimeException(e);}}private void startService() {new Thread(reactors[0]).start();new Thread(reactors[1]).start();}class SubReactor implements Runnable {final private Selector selector;public SubReactor(Selector selector) {this.selector = selector;}@Overridepublic void run() {while (!Thread.interrupted()) {try {selector.select();Set<SelectionKey> selectionKeys = selector.selectedKeys();for (SelectionKey selectionKey : selectionKeys) {dispatch(selectionKey);}selectionKeys.clear();} catch (IOException e) {throw new RuntimeException(e);}}}}private void dispatch(SelectionKey selectionKey) {Runnable attachment = (Runnable) selectionKey.attachment();if (attachment != null) {attachment.run();}}class AcceptHandler implements Runnable {@Overridepublic void run() {try {SocketChannel socketChannel = server.accept();new MultiHandler(selectors[index.getAndIncrement()], socketChannel);if (index.get() == selectors.length) {index.set(0);}} catch (IOException e) {throw new RuntimeException(e);}}}
}

MultiHandler:

package cn.ken.jredis;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** <pre>** </pre>** @author <a href="https://github.com/Ken-Chy129">Ken-Chy129</a>* @since 2023/10/14 17:28*/
public class MultiHandler implements Runnable {final private Selector selector;final private SocketChannel channel;final ByteBuffer buffer = ByteBuffer.allocate(1024);static ExecutorService pool = Executors.newFixedThreadPool(4);public MultiHandler(Selector selector, SocketChannel channel) {this.selector = selector;this.channel = channel;try {channel.configureBlocking(false);SelectionKey register = channel.register(selector, SelectionKey.OP_READ);register.attach(this);selector.wakeup();} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic void run() {pool.execute(() -> {synchronized (this) {int length;try {while ((length = channel.read(buffer)) > 0) {System.out.println(new String(buffer.array(), 0, length));buffer.clear();}} catch (IOException e) {throw new RuntimeException(e);}}   });}
}

相关文章:

Reactor反应器模式

文章目录 一、单线程Reactor反应器模式二、多线程Reactor反应器模式 在Java的OIO编程中&#xff0c;最初和最原始的网络服务器程序使用一个while循环&#xff0c;不断地监听端口是否有新的连接&#xff0c;如果有就调用一个处理函数来处理。这种方法最大的问题就是如果前一个网…...

alibaba.fastjson的使用(六) -- JavaBean==》Json字符串、JSONObject、JSONArray

目录 1. JavaBean转 Json字符串 2. JavaBean转 JSONObject 3. List转JSONArray 在pom文件中引入依赖: <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.14</version></dependency&…...

uniapp 自定义导航栏

自定义导航栏 修改 pages.json 在 pages.json 中将 navigateionStyle 设为 custom 新建 systemInfo.js systemInfo.js 用来获取当前设备的机型系统信息&#xff0c;放在 common 目录下 /*** 此 js 文件管理关于当前设备的机型系统信息*/ const systemInfo function() {/***…...

查分小程序:一键查询成绩,班主任和家长的得力助手

作为一名老师&#xff0c;是否曾经为了让学生能够方便地查询成绩而烦恼&#xff1f;担心学生忘记密码&#xff1f;还是手动输入成绩太繁琐&#xff1f;今天&#xff0c;给大家分享一个超级实用的查分小程序&#xff0c;让成绩查询变得更轻松&#xff01; 什么是成绩查询系统&am…...

Linux内核驱动开发的步骤

Linux操作系统的内核是一个强大的、开源的操作系统内核&#xff0c;它为各种硬件设备提供支持。为了让硬件设备能够与Linux系统无缝协作&#xff0c;需要编写相应的内核驱动程序。本文将介绍Linux内核驱动开发的一般步骤&#xff0c;以帮助开发者了解如何创建自己的内核驱动。 …...

【Java 进阶篇】HTML DOM 事件详解

当用户在网页上点击按钮、输入文本、鼠标移动到某个区域或执行其他互动操作时&#xff0c;这些动作都可以触发事件。HTML DOM&#xff08;文档对象模型&#xff09;允许我们使用JavaScript来捕获、处理和响应这些事件&#xff0c;以实现网页的交互和动态性。本篇博客将围绕HTML…...

redis 从小白到大师系列

字符串 Redis 字符串数据类型 set 字符串 /*** 设置字符串*/ $t $redis->set(o1,o1); //返回true or false var_dump($t);get字符串 /*** 获取字符串*/ $t $redis->get(o1); //返回true or false var_dump($t);结果&#xff1a; string(2) “o1” 返回 key 中字符串…...

vue使用.filter方法检索数组中指定时间段内的数据

假设你有一个名为dataArray的数组&#xff0c;其中包含了你要筛选的数据。那么&#xff0c;你可以按照以下步骤进行筛选&#xff1a; 创建一个名为filteredArray的新数组&#xff0c;用于存储筛选后的结果。使用数组的filter方法遍历dataArray&#xff0c;并对每个元素应用筛选…...

Ubuntu 安装 npm 和 node

前言 最近学习VUE&#xff0c;在ubuntu 2204 上配置开发环境&#xff0c;涉及到npm node nodejs vue-Cli脚手架等内容&#xff0c;做以记录。 一、node nodejs npm nvm 区别 &#xff1f; node 是框架&#xff0c;类似python的解释器。nodejs 是编程语言&#xff0c;是js语言的…...

Matlab论文插图绘制模板第122期—函数折线图(fplot)

本期分享的是函数折线图的绘制模板。​ 所谓函数折线图&#xff0c;就是将自定义线函数进行可视化表达​。 先来看一下成品效果&#xff1a; 特别提示&#xff1a;本期内容『数据代码』已上传资源群中&#xff0c;加群的朋友请自行下载。有需要的朋友可以关注同名公号【阿昆的…...

IK分词器如何修改支持跨版本ES

一、问题描述&#xff1a;IK分词器版本和ES版本不一致&#xff0c;无法找到和自己ES版本匹配的分词器。 IK分词器&#xff0c;提供的插件版本&#xff0c;远赶不上ES的更新版本&#xff0c;在使用过程中&#xff0c;不一定能顺利的找到与自己使用的ES版本相对应。在ES集群中使用…...

Spring MVC常用十大注解

Spring MVC常用十大注解 一&#xff0c;什么要使用注解 使用注解可以简化配置&#xff0c;提高代码的可读性和可维护性。通过注解可以实现依赖注入&#xff0c;减少手动管理对象的代码量。注解还支持面向切面编程&#xff0c;实现切面、切入点和通知等。此外&#xff0c;注解提…...

二、【MyBatis】 MyBatis入门与简单使用

二、【MyBatis】 MyBatis入门与简单使用 二、【MyBatis】 MyBatis入门与简单使用一、什么是ORM二、为什么mybatis是半自动的ORM框架2.1 Hibernate优点2.2 Hibernate缺点2.3 MyBatis与Hibernate区别三、Mybatis快速入门3.1 项目引入Maven相关依赖3.2 创建测试数据库3.3 编写数据…...

基于DF模式的协作通信技术matlab性能仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 4.1、DF概述 4.2、DF基本原理 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版本 matlab2013b 3.部分核心程序 clc; clear; close all; warning off; addpath(genpath(pwd))…...

Angular-01:基本架构

各种学习后的知识点整理归纳&#xff0c;非原创&#xff01; ① 概述 angular是一个使用HTML、CSS、TypeScript构建的客户端应用的框架&#xff0c;用来构建单页面应用程序。是一个重量级的框架&#xff0c;内部集成了大量开箱即用的功能模块。是为大型应用开发而设计&#xf…...

字符串划分

题目描述 给定一个小写字母组成的字符串s&#xff0c;请找出字符串中两个不同位置的字符作为分割点&#xff0c;使得字符串分成的三个连续子串且子串权重相等&#xff0c;注意子串不包含分割点。 若能找到满足条件的两个分割点&#xff0c;请输出这两个分割点在字符串中的位置…...

ImportError: /lib64/libstdc++.so.6: version `CXXABI_1.3.9‘ not found的解决方法

导致该错误的原因&#xff1a;gcc动态库版本太老了 解决方法&#xff1a; 1、编辑~/.bash_profile vim ~/.bash_profile 2、将anaconda3/lib的路径加入库文件的路径 LD_LIBRARY_PATH/your_path/anaconda3/lib:$LD_LIBRARY_PATH export LD_LIBRARY_PATH 3、重载~/.bash_pr…...

华为云全新上线Serverless应用中心,支持一键构建文生图应用

近日&#xff0c;华为云全新上线Serverless应用中心&#xff0c;提供了大量应用模板&#xff0c;让用户能够一键部署函数和周边依赖资源&#xff0c;节省部署时间&#xff0c;快速上手将应用部署到华为云函数计算FunctionGraph&#xff0c;并一键开通周边依赖资源。 本次Serve…...

scrapy的安装和使用

一、scrapy是什么&#xff1a;Scrapy是一个为了爬取网站数据&#xff0c;提取结构性数据而编写的应用框架&#xff0c;可以应用在包括数据挖掘&#xff0c;信息处理或存储历史数据等一系列的程序 二、scrapy的安装&#xff1a;pip install scrapy -i https://pypi.douban.com/…...

Kotlin中的异常处理

异常是在程序执行过程中出现的错误或意外情况&#xff0c;可以干扰程序的正常流程。在Kotlin中&#xff0c;我们可以通过异常处理机制来捕获和处理异常&#xff0c;以保证程序的稳定性。本篇博客将介绍异常的产生、捕获、定义、受检异常和finally关键字&#xff0c;并提供相应的…...

AI-调查研究-01-正念冥想有用吗?对健康的影响及科学指南

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; &#x1f680; AI篇持续更新中&#xff01;&#xff08;长期更新&#xff09; 目前2025年06月05日更新到&#xff1a; AI炼丹日志-28 - Aud…...

深入剖析AI大模型:大模型时代的 Prompt 工程全解析

今天聊的内容&#xff0c;我认为是AI开发里面非常重要的内容。它在AI开发里无处不在&#xff0c;当你对 AI 助手说 "用李白的风格写一首关于人工智能的诗"&#xff0c;或者让翻译模型 "将这段合同翻译成商务日语" 时&#xff0c;输入的这句话就是 Prompt。…...

云计算——弹性云计算器(ECS)

弹性云服务器&#xff1a;ECS 概述 云计算重构了ICT系统&#xff0c;云计算平台厂商推出使得厂家能够主要关注应用管理而非平台管理的云平台&#xff0c;包含如下主要概念。 ECS&#xff08;Elastic Cloud Server&#xff09;&#xff1a;即弹性云服务器&#xff0c;是云计算…...

【JavaEE】-- HTTP

1. HTTP是什么&#xff1f; HTTP&#xff08;全称为"超文本传输协议"&#xff09;是一种应用非常广泛的应用层协议&#xff0c;HTTP是基于TCP协议的一种应用层协议。 应用层协议&#xff1a;是计算机网络协议栈中最高层的协议&#xff0c;它定义了运行在不同主机上…...

Python:操作 Excel 折叠

💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 Python 操作 Excel 系列 读取单元格数据按行写入设置行高和列宽自动调整行高和列宽水平…...

《Playwright:微软的自动化测试工具详解》

Playwright 简介:声明内容来自网络&#xff0c;将内容拼接整理出来的文档 Playwright 是微软开发的自动化测试工具&#xff0c;支持 Chrome、Firefox、Safari 等主流浏览器&#xff0c;提供多语言 API&#xff08;Python、JavaScript、Java、.NET&#xff09;。它的特点包括&a…...

聊聊 Pulsar:Producer 源码解析

一、前言 Apache Pulsar 是一个企业级的开源分布式消息传递平台&#xff0c;以其高性能、可扩展性和存储计算分离架构在消息队列和流处理领域独树一帜。在 Pulsar 的核心架构中&#xff0c;Producer&#xff08;生产者&#xff09; 是连接客户端应用与消息队列的第一步。生产者…...

渲染学进阶内容——模型

最近在写模组的时候发现渲染器里面离不开模型的定义,在渲染的第二篇文章中简单的讲解了一下关于模型部分的内容,其实不管是方块还是方块实体,都离不开模型的内容 🧱 一、CubeListBuilder 功能解析 CubeListBuilder 是 Minecraft Java 版模型系统的核心构建器,用于动态创…...

ElasticSearch搜索引擎之倒排索引及其底层算法

文章目录 一、搜索引擎1、什么是搜索引擎?2、搜索引擎的分类3、常用的搜索引擎4、搜索引擎的特点二、倒排索引1、简介2、为什么倒排索引不用B+树1.创建时间长,文件大。2.其次,树深,IO次数可怕。3.索引可能会失效。4.精准度差。三. 倒排索引四、算法1、Term Index的算法2、 …...

大数据学习(132)-HIve数据分析

​​​​&#x1f34b;&#x1f34b;大数据学习&#x1f34b;&#x1f34b; &#x1f525;系列专栏&#xff1a; &#x1f451;哲学语录: 用力所能及&#xff0c;改变世界。 &#x1f496;如果觉得博主的文章还不错的话&#xff0c;请点赞&#x1f44d;收藏⭐️留言&#x1f4…...