当前位置: 首页 > news >正文

Netty入门指南之Reactor模型

作者简介:☕️大家好,我是Aomsir,一个爱折腾的开发者!
个人主页:Aomsir_Spring5应用专栏,Netty应用专栏,RPC应用专栏-CSDN博客
当前专栏:Netty应用专栏_Aomsir的博客-CSDN博客

文章目录

  • 参考文献
  • 前言
  • 单线程Reactor模型
  • 主从式Reactor模型
    • 多线程知识扫盲
    • Worker线程
    • Boss线程
    • 客户端
  • 总结

参考文献

  • 孙哥suns说Netty
  • Netty官方文档

前言

在我们之前的文章中,我们详细地探讨了Java NIO和Selector的相关内容,这为我们进一步的学习打下了坚实的基础。从本篇文章开始,我们将深入学习并理解Reactor模型

单线程Reactor模型

在前两篇文章,我们使用Selector去监控Channel的ACCEPT事件、WRITE事件、READ事件等等,监听到以后就在当前线程进行处理,这已经是一个单线程的Reactor的模型,Selector来进行分发,起到一个多路复用器的作用,但是这个还远远不够,怎么能只让一个线程来同时处理ACCEPT、WRITE和READ,所以就有了我们后面的主从式
在这里插入图片描述

主从式Reactor模型

谈起主从架构,也就是master-slave,它是主节点做一部分内容,从节点做另外一部分的内容,主从都干活,但是干的活内容不一样,比如我们常见的MySQLRedis,它们的读写分离就是主从式架构。
还有一种架构是主备架构(Master-Backup),这种架构就是主挂了以后,从起作用,干的活是一样的,比如Redis中的哨兵机制

结合如下图例更为详细的理解主从式Reactor模型,我们的Boss和Worker都是不同的线程,甚至在实战过程中会是不同的服务器。Boss线程主要用于接收Accept请求,去与客户端建立SocketChannel连接,Worker线程主要去处理实际的读写操作。我们需要把单线程Reactor模型中的sc.register(selector, SelectionKey.OP_READ)转移到Worker线程中去。
在这里插入图片描述

多线程知识扫盲

在接下来的学习中,我们将使用NIO和Selector来实现一个主从Reactor模型。这需要我们具备一定的多线程知识,因此这里我会为你简单介绍一下Java中的多线程。

在Java中,我们通常通过Thread类来创建和管理新的线程。在实际开发中,我们可以创建一个新的类,让它继承Thread类,并重写其run方法。在这个run方法中,我们可以编写自己的多线程任务逻辑。但是,Java的类只能单继承,这在某些情况下可能会对我们的系统设计造成限制。

因此,Java还为我们提供了另一种创建线程的方式,即通过实现Runnable接口。我们可以自定义一个类,让它实现Runnable接口,并重写其run方法,在这个方法中编写我们的多线程任务逻辑。然后,我们可以将这个Runnable实现类的对象传递给Thread类的构造方法,从而创建Thread类的对象。这种方式的优点是,我们不再需要直接继承Thread类来实现多线程任务,而是可以将任务逻辑封装在实现了Runnable接口的类中。这样,我们的类就可以在保持多线程功能的同时,也能继承其他类,从而提供更大的设计灵活性。

由于Java的Thread类实现了Runnable接口,我们可以在设计系统时,采用以下策略:在Runnable的实现类中,添加一个Thread类型的属性,并提供一个register方法。在这个register方法中,我们可以初始化Thread属性,直接将当前类对象(Runnable实现类)传入Thread构造方法进行初始化,然后启动线程。这样我们就可以直接在Runnable内部直接进行线程任务逻辑等,而外部只需要提供一个Runnable接口实现类,线程的创建和启动等都在Runnable接口内部进行操作,封装度更高也更灵活

⚠️注意

  • 启动多线程任务是通过Thread#start()方法,而不是通过Thread#run()方法,调用start方法以后,CPU的时间片也不会立马分配给这个线程
  • 除了Thread#run()和Runnable#run()方法内的代码是属于多线程的,其余的都是main线程,包括Runnable实现类中的自定义方法
  • CPU时间片不一定会等待主线程某个方法完全执行才切换给别的线程,但它一定会等一个代码块执行完,比如if
public class MyThread extends Thread{@Overridepublic void run() {for (int i = 0; i < 100; i++) {System.out.println("线程任务逻辑" + i);}}
}
public class MyRunnable implements Runnable{@Overridepublic void run() {for (int i = 0; i < 100; i++) {System.out.println("线程任务逻辑" + " " + i);}}
}public class RunnableTest {public static void main(String[] args) {// 创建任务对象MyRunnable myRunnable = new MyRunnable();// 创建线程对象,并将任务传递进去Thread t1 = new Thread(myRunnable);// 启动线程t1.start();for (int i = 0; i < 100; i++) {System.out.println("main线程" + " " + i);}}
}

Worker线程

这是我们的Worker线程,用于处理客户端与服务端的读写,在我们这个案例中,所有的读写都交给这些worker线程,主线程不管具体的写。如果是单核CPU,那么时间片会不停的在这些时间片时间轮转,而如果是多核CPU,那直接主线程用于处理连接,多个worker线程用于处理具体的读写

在下面这个Worker模型中,我们Worker是一个Runnable实现类,其中包含一个Thread类型的属性,在register方法中对它进行初始化,将实现类本类对象传入,代表后面这个thread对象调用的任务是实现类中重写的run方法的逻辑。

⚠️注意

  • 主线程和Worker线程维护不同的selector,以免出现污染,主线程的selector监控ServerSocketChannel的ACCEPT事件,Worker线程的selector监控注册在对应线程上的SocketChannel的READ/WRITE事件
  • register方法属于主线程,如果等初始化完还没将SocketChannel注册到这个线程的Selector上,就去执行Worker线程的run方法,那selector就会成为阻塞状态,当CPU时间片切换回主线程,就会注册不上,成为一个死锁状态。
  • 为了解决上面这个问题,我们需要将注册这部分的代码放在任务队列里进行传递,但是阻塞问题还是存在,所以我们将selector唤醒,不让其阻塞。当时间片切换到Worker线程,select方法就不会阻塞,运行循环下面的代码,将注册的代码取出来运行,然后处理读写

☹️难点

  • 时间片在sc未注册到selector时就切换给worker线程导致selector阻塞,然后导致阻塞
  • 使用任务队列传递代码同时需要唤醒selector
  • 思路的转变
public class Worker implements Runnable {private static final Logger log = LoggerFactory.getLogger(Worker.class);// 一个线程对应一个selector,以免污染private Selector selector;// 线程Thread对象private Thread thread;// 线程名private String name;// 通过volatile进行线程同步private volatile boolean isCreated;// 任务队列private ConcurrentLinkedQueue<Runnable> runnables = new ConcurrentLinkedQueue();// 构造器public Worker(String name) {this.name = name;}// 线程任务(此段还属于主线程!!!)public void register(SocketChannel sc) throws IOException,InterruptedException {log.debug("worker register invoke...");// 对于一个Runnable对象,被调用register后// isCreated标志位就会被置为true// 注意:CPU等这个if代码块执行结束才有可能被调度到worker线程if (!isCreated) {thread = new Thread(this, name);// 调了start,不会立马分配资源(除非抢夺)thread.start();selector = Selector.open();isCreated = true;}// 模拟此处时间片分给worker线程// worker线程进入run方法,开始阻塞监听// selector就会一直阻塞在select方法上,时间片切换回主线程也无法注册// Thread.sleep(1000);// 任务队列:将main线程中注册的代码传递给worker线程runnables.add(() -> {try {sc.register(selector, SelectionKey.OP_READ);} catch (ClosedChannelException e) {throw new RuntimeException(e);}});// 唤醒阻塞在select方法上的worker线程// 这样时间片切换到worker线程就直接跳过select方法selector.wakeup();}/*** 线程任务:实际处理读写操作*/@Overridepublic void run() {while (true) {log.debug("worker run method invoke...");try {// 阻塞监听SocketChannel的OP_READselector.select();// 从任务队列中取出任务执行Runnable poll = runnables.poll();if (poll != null) {// 执行注册的步骤poll.run();}Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey scKey = iterator.next();iterator.remove();if (scKey.isReadable()) {SocketChannel sc = (SocketChannel) scKey.channel();ByteBuffer buffer = ByteBuffer.allocate(30);int read = sc.read(buffer);if (read == -1) {scKey.cancel();break;}buffer.flip();String result = Charset.defaultCharset().decode(buffer).toString();System.out.println("result = " + result);}}} catch (IOException e) {throw new RuntimeException(e);}}}
}

Boss线程

Boss线程用来监听ServerSocketChannel的ACCEPT事件,监听到了以后将其传递给Worker线程去注册和监听处理,注意线程池只有两个Worker线程,为了保证每一个新进来的SocketChannel都被注册到与前一个线程不同的线程上,这里使用AtomicInteger原子操作类来处理

public class ReactorBossServer {private static final Logger log = LoggerFactory.getLogger(ReactorBossServer.class);public static void main(String[] args) throws Exception{log.debug("boss thread start...");ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);ssc.bind(new InetSocketAddress(8000));Selector selector = Selector.open();ssc.register(selector, SelectionKey.OP_ACCEPT);// 模拟任务池,将任务对象进行创建Worker[] workers = new Worker[2];for (int i = 0; i < workers.length; i++) {// Worker worker = new Worker("worker1");workers[i] = new Worker("worker - " + i);}// 原子操作类AtomicInteger index = new AtomicInteger();while (true) {// 阻塞等待Channel的事件的触发selector.select();Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey sscSelectionKey = iterator.next();iterator.remove();// 如果是ACCEPT请求则进行处理,交给worker线程处理if (sscSelectionKey.isAcceptable()) {SocketChannel sc = ssc.accept();sc.configureBlocking(false);log.debug("boss invoke worker register...");// hash取模  x%2 = 0|1// 通过原子类确保sc每次进来注册给不同的workerworkers[index.getAndIncrement() % workers.length].register(sc);log.debug("boss invoke worker register...");}}}}
}

客户端

public class MyClient {private static final Logger log = LoggerFactory.getLogger(MyClient.class);public static void main(String[] args) throws Exception{// 1、创建客户端channel,并连接服务端SocketChannel socketChannel = SocketChannel.open();socketChannel.connect(new InetSocketAddress(8000));socketChannel.write(Charset.defaultCharset().encode("hello\n"));System.out.println("-------------------------------------");}
}

总结

这篇文章的阅读绝对值得你的精心研读。Netty的基础建立在NIO之上,如果单纯的学习Netty,你可能只会看到一堆的API,而无法深入理解其背后的设计原则和工作机制。

然而,本篇文章从设计理念到编码实践,详细剖析了Reactor模型,为Netty的学习铺平了道路。这不仅帮助你理解Netty的运作方式,更能让你洞察其背后的设计哲学,使你在学习时不仅知其然,更能知其所以然。因此,这篇文章对于深化你对Netty的理解,研究其内部工作原理,无疑具有极大的价值

相关文章:

Netty入门指南之Reactor模型

作者简介&#xff1a;☕️大家好&#xff0c;我是Aomsir&#xff0c;一个爱折腾的开发者&#xff01; 个人主页&#xff1a;Aomsir_Spring5应用专栏,Netty应用专栏,RPC应用专栏-CSDN博客 当前专栏&#xff1a;Netty应用专栏_Aomsir的博客-CSDN博客 文章目录 参考文献前言单线程…...

Ubuntu20.04软件安装顺序

目录 0.网卡驱动1. sogoupinyin2. terminator3.1zsh3.2升级Cmake&#xff08;有些后面的软件需要高版本Cmake&#xff09;4.显卡驱动(在cuda之前)5.CUDA与cudnn,TensorRT6.OpenCV(在ROS之前)6.1先安装各种依赖6.2安装Ceres-1.14.06.3安装Pangolin6.4安装Sophus6.5安装VTK6.5编译…...

适配器模式 ( Adapter Pattern )(6)

适配器模式 ( Adapter Pattern ) 适配器模式&#xff08;Adapter Pattern&#xff09;是作为两个不兼容的接口之间的桥梁 适配器模式涉及到一个单一的类&#xff0c;该类负责加入独立的或不兼容的接口功能 举个真实的例子&#xff0c;读卡器是作为内存卡和笔记本之间的适配器…...

JAVA G1垃圾收集器介绍

为解决CMS算法产生空间碎片和其它一系列的问题缺陷&#xff0c;HotSpot提供了另外一种垃圾回收策略&#xff0c;G1&#xff08;Garbage First&#xff09;算法&#xff0c;通过参数-XX:UseG1GC来启用&#xff0c;该算法在JDK 7u4版本被正式推出&#xff0c;官网对此描述如下&am…...

十方影视后期“领进门”,成长与成就还得靠自身

在这个充满视觉冲击的时代&#xff0c;影视后期制作已经成为了一种炙手可热的艺术形式。而在这个领域&#xff0c;Adobe After Effects&#xff08;AE&#xff09;这款软件无疑是王者之一。十方影视后期作为十方教育科技旗下的艺术设计学科&#xff0c;不仅培养了数万名优秀的后…...

Golang之火爆原因

引言 在计算机编程领域&#xff0c;有很多种编程语言可供选择。然而&#xff0c;近年来&#xff0c;Golang&#xff08;Go&#xff09;这门相对年轻的编程语言却越来越受欢迎&#xff0c;备受推崇。那么&#xff0c;为什么Golang如此火爆&#xff1f;本文将探讨Golang之火爆原…...

WPF中Dispatcher对象的用途是什么

在WPF (Windows Presentation Foundation) 中&#xff0c;Dispatcher 对象的主要用途是提供一个与UI线程关联的消息循环系统&#xff0c;这允许开发者在UI线程上安排和执行任务。由于WPF的UI元素不是线程安全的&#xff0c;因此任何对UI元素的访问都必须从创建该元素的线程&…...

图论17-有向图的强联通分量-Kosaraju算法

文章目录 1 概念2 Kosaraju算法2.1 在图类中设计反图2.2 强连通分量的判断和普通联通分量的区别2.3 代码实现 1 概念 2 Kosaraju算法 对原图的反图进行DFS的后序遍历。 2.1 在图类中设计反图 // 重写图的构造函数public Graph(TreeSet<Integer>[] adj, boolean dire…...

ubuntu中使用 vscode 连接docker开发环境

文章目录 ubuntu中使用 vscode 连接docker开发环境步骤一&#xff1a;安装 Remote Development 插件步骤二&#xff1a;连接远程环境步骤三&#xff1a;开发 问题解决参考连接 ubuntu中使用 vscode 连接docker开发环境 Remote Development 是一个 Visual Studio Code 插件&…...

【广州华锐视点】海外制片人VR虚拟情景教学带来全新的学习体验

虚拟现实&#xff08;Virtual Reality&#xff0c;简称VR&#xff09;是一种利用电脑模拟产生一个三维的虚拟世界&#xff0c;提供用户关于视觉、听觉、触觉等感官的模拟体验的技术。随着科技的进步&#xff0c;VR已经被广泛应用到许多领域&#xff0c;包括游戏、教育、医疗、房…...

龙芯loongarch64麒麟服务器配置yum源

服务器信息&#xff1a; uname -a # 命令 Linux bogon 4.19.90-52.22.v2207.a.ky10.loongarch64 #1 SMP Tue Mar 14 11:18:26 CST 2023 loongarch64 loongarch64 loongarch64 GNU/Linux yum源配置&#xff1a; cd /etc/yum.repos.d/ vim kylin_loongarch64.repo 将下面内容拷贝…...

Centos7 单用户模式修改密码 3步搞定 666 (百分比成功)

1.第一步重新服务器 2.进入这个页面按e进入单用户模式 3.找到linux16这行 在后面添加 init/bin/bash 按ctrlx进入 4.注意是事项直接修改是报错passud: Authentication token manipulation error 需要执行权限&#xff1a;mount -o remount,rw /...

深度学习 机器视觉 车位识别车道线检测 - python opencv 计算机竞赛

0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 深度学习 机器视觉 车位识别车道线检测 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff0c;学长非常推荐&#xff01; &#x1f947;学长这里给一个题目综合评分(每项满分5分) …...

Java主流分布式解决方案多场景设计与实战

Java的主流分布式解决方案的设计和实战涉及到多个场景&#xff0c;包括但不限于以下几点&#xff1a; 分布式缓存&#xff1a;在Java的分布式系统中&#xff0c;缓存是非常重要的一部分。常用的分布式缓存技术包括Redis、EhCache等。这些缓存技术可以用来提高系统的性能和响应…...

docker安装MongoDB数据库,并且进行密码配置

很美的一首小诗> 我在外面流浪&#xff0c;回来时 故乡瘦了一圈—— 墩子叔走了&#xff0c;门前的池水 干了一半。 屋后驼背的柳树 头发散落了一地&#xff0c; 老房子蹲在坟边&#xff0c;屋顶的白云 仍在风中奔跑。 安装配置 要在Docker中安装MongoDB并启用远程连接&…...

ssh脚本找不到命令或者执行无效的解决办法

如图&#xff1a;今天在编写脚本时发现的这个问题&#xff0c; 在排除脚本语法错误、编码格式等情况下&#xff0c;仍然出现“bash 。。未找到命令”的字样 解决办法&#xff1a; 给每台虚拟机的环境变量source一下&#xff1a; 命令如下 source /etc/profile或者输入 vim ~…...

2023年11月18日(星期六)骑行海囗林场公园

2023年11月18日 (星期六) 骑行海囗林场公园(赏枫树林&#xff09;&#xff0c;早8:30到9:00&#xff0c; 大观公园门囗集合&#xff0c;9:30准时出发 【因迟到者&#xff0c;骑行速度快者&#xff0c;可自行追赶偶遇。】 偶遇地点:大观公园门口集合 &#xff0c;家住东&#x…...

xss 漏洞

1、XSS类型 XSS攻击大致上分为3类&#xff1a; 反射型xss&#xff0c;DOM型xss&#xff0c;存储型xss。前两类为非持久性xss&#xff0c;后者为持久型xss。 1.1 非持久型xss&#xff1a; 1&#xff09;反射型 XSS 攻击相对于访问者而言是一次性的&#xff0c;具体表现在恶意…...

一文图解爬虫_姊妹篇(spider)

—引导语 爬虫&#xff0c;没有一个时代比当前更重视它。一个好的爬虫似乎可以洞穿整个互联网&#xff0c;“来装满自己的胃”。 接上一篇&#xff1a;一文图解爬虫&#xff08;spider&#xff09; 博主已初步对爬虫的“五脏六腑”进行了解剖。虽然俗称“爬虫”&#xff0c;但窃…...

【vue实战项目】通用管理系统:api封装、404页

前言 本文为博主的vue实战小项目系列中的第三篇&#xff0c;很适合后端或者才入门的小伙伴看&#xff0c;一个前端项目从0到1的保姆级教学。前面的内容&#xff1a; 【vue实战项目】通用管理系统&#xff1a;登录页-CSDN博客 【vue实战项目】通用管理系统&#xff1a;封装to…...

conda相比python好处

Conda 作为 Python 的环境和包管理工具&#xff0c;相比原生 Python 生态&#xff08;如 pip 虚拟环境&#xff09;有许多独特优势&#xff0c;尤其在多项目管理、依赖处理和跨平台兼容性等方面表现更优。以下是 Conda 的核心好处&#xff1a; 一、一站式环境管理&#xff1a…...

突破不可导策略的训练难题:零阶优化与强化学习的深度嵌合

强化学习&#xff08;Reinforcement Learning, RL&#xff09;是工业领域智能控制的重要方法。它的基本原理是将最优控制问题建模为马尔可夫决策过程&#xff0c;然后使用强化学习的Actor-Critic机制&#xff08;中文译作“知行互动”机制&#xff09;&#xff0c;逐步迭代求解…...

DeepSeek 赋能智慧能源:微电网优化调度的智能革新路径

目录 一、智慧能源微电网优化调度概述1.1 智慧能源微电网概念1.2 优化调度的重要性1.3 目前面临的挑战 二、DeepSeek 技术探秘2.1 DeepSeek 技术原理2.2 DeepSeek 独特优势2.3 DeepSeek 在 AI 领域地位 三、DeepSeek 在微电网优化调度中的应用剖析3.1 数据处理与分析3.2 预测与…...

k8s从入门到放弃之Ingress七层负载

k8s从入门到放弃之Ingress七层负载 在Kubernetes&#xff08;简称K8s&#xff09;中&#xff0c;Ingress是一个API对象&#xff0c;它允许你定义如何从集群外部访问集群内部的服务。Ingress可以提供负载均衡、SSL终结和基于名称的虚拟主机等功能。通过Ingress&#xff0c;你可…...

376. Wiggle Subsequence

376. Wiggle Subsequence 代码 class Solution { public:int wiggleMaxLength(vector<int>& nums) {int n nums.size();int res 1;int prediff 0;int curdiff 0;for(int i 0;i < n-1;i){curdiff nums[i1] - nums[i];if( (prediff > 0 && curdif…...

初学 pytest 记录

安装 pip install pytest用例可以是函数也可以是类中的方法 def test_func():print()class TestAdd: # def __init__(self): 在 pytest 中不可以使用__init__方法 # self.cc 12345 pytest.mark.api def test_str(self):res add(1, 2)assert res 12def test_int(self):r…...

Unsafe Fileupload篇补充-木马的详细教程与木马分享(中国蚁剑方式)

在之前的皮卡丘靶场第九期Unsafe Fileupload篇中我们学习了木马的原理并且学了一个简单的木马文件 本期内容是为了更好的为大家解释木马&#xff08;服务器方面的&#xff09;的原理&#xff0c;连接&#xff0c;以及各种木马及连接工具的分享 文件木马&#xff1a;https://w…...

Proxmox Mail Gateway安装指南:从零开始配置高效邮件过滤系统

&#x1f49d;&#x1f49d;&#x1f49d;欢迎莅临我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐&#xff1a;「storms…...

从“安全密码”到测试体系:Gitee Test 赋能关键领域软件质量保障

关键领域软件测试的"安全密码"&#xff1a;Gitee Test如何破解行业痛点 在数字化浪潮席卷全球的今天&#xff0c;软件系统已成为国家关键领域的"神经中枢"。从国防军工到能源电力&#xff0c;从金融交易到交通管控&#xff0c;这些关乎国计民生的关键领域…...

wpf在image控件上快速显示内存图像

wpf在image控件上快速显示内存图像https://www.cnblogs.com/haodafeng/p/10431387.html 如果你在寻找能够快速在image控件刷新大图像&#xff08;比如分辨率3000*3000的图像&#xff09;的办法&#xff0c;尤其是想把内存中的裸数据&#xff08;只有图像的数据&#xff0c;不包…...