多人聊天室 NIO模型实现
NIO编程模型
- Selector监听客户端不同的zhuangtai
- 不同客户端触发不同的状态后,交由相应的handles处理
- Selector和对应的处理handles都是在同一线程上实现的
I/O多路复用
在Java中,I/O多路复用是一种技术,它允许单个线程处理多个输入/输出(I/O)源,而不需要为每个I/O源创建一个线程。这种技术可以显著提高性能,因为它减少了线程创建和上下文切换的开销。I/O多路复用的核心思想是使用一个机制来监控多个I/O通道,一旦某个通道有数据可读或可写,就通知应用程序进行相应的操作。
NIO模型 + Selector监听通道 == 经典的I/O多路复用
同步式I/O和异步I/O概念及分类
概念:
- 同步式I/O(Synchronous I/O)
定义:在同步I/O模型中,当一个线程发起一个I/O请求时,它会阻塞,直到I/O操作完成。也就是说,线程会一直等待直到数据被读取或写入完毕。
特点:阻塞性:线程在I/O操作完成之前不能执行其他任务。
资源消耗:每个I/O操作都需要一个线程或进程,可能导致资源消耗较大,特别是在高并发场景下。 - 异步I/O(Asynchronous I/O)
定义:在异步I/O模型中,当一个线程发起一个I/O请求后,它不会被阻塞,而是可以继续执行其他任务。I/O操作在后台进行,当操作完成时,系统会通知发起请求的线程。
特点:非阻塞性:线程不需要等待I/O操作完成,可以继续执行其他任务。
并发性:可以提高系统的并发处理能力,适用于高并发场景。
分类:
- BIO(Blocking I/O):
类型:同步I/O。
特点:在BIO模型中,当线程执行I/O操作时,如果数据还没有准备好,它会一直等待直到数据准备完成。在这个过程中,线程被阻塞,不能执行其他任务。 - NIO(Non-blocking I/O):
类型:非阻塞I/O,可以用于同步或异步操作。
特点:NIO模型中的I/O操作是非阻塞的,这意味着当数据没有准备好时,线程可以立即返回,去做其他事情。NIO本身提供了非阻塞的能力,但是它既可以用于同步编程(通过在while循环中检查并处理I/O事件),也可以与异步I/O(如Java 7引入的NIO.2,也称为Asynchronous I/O)结合使用。 - I/O多路复用(I/O Multiplexing):
类型:同步I/O。
特点:I/O多路复用模型允许单个线程监控多个I/O通道,但是当线程执行I/O操作时,如果数据没有准备好,线程仍然会被阻塞。最常见的I/O多路复用技术是select/poll系统调用。在Java中,可以通过Selector和Channel实现I/O多路复用。
总结:
- NIO模型+Selector实现的I/O多路复用是同步式I/O,因为服务器端需要多次调用selector.select()来查看是否有新的事件发生。如果服务器端不通过多次调用selector.select(),也没有其他线程会通知主线程有新的事件发生,主线程就会持续阻塞。
- AIO异步I/O则是当主线程查看发现没有新事件发生时立刻返回处理其他事件,当有新事件发生时主线程会被通知,并来处理。
ChatServer实现
package server;import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Set;public class ChatServer {private static final int DEFAULT_PORT = 8888;private static final String QUIT = "quit";private static final int BUFFER = 1024;private ServerSocketChannel server;private Selector selector;private ByteBuffer rBuffer = ByteBuffer.allocate(BUFFER);private ByteBuffer wBuffer = ByteBuffer.allocate(BUFFER);//统一编码,解码方法private Charset charset = Charset.forName("UTF-8");//可以自定义服务器端的端口private int port;public ChatServer() {this(DEFAULT_PORT);}public ChatServer(int port) {this.port = port;}private void start() {try {server = ServerSocketChannel.open();server.configureBlocking(false);server.socket().bind(new InetSocketAddress(port));selector = Selector.open();//将ServerSocketChannel的Accept事件注册到selector上//一旦ServerSocketChannel接收到了客户端的连接请求,selector就会得知server.register(selector, SelectionKey.OP_ACCEPT);System.out.println("启动服务器, 监听端口:" + port + "...");while (true) {//有事件被触发了select()函数才会有返回selector.select();Set<SelectionKey> selectionKeys = selector.selectedKeys();for (SelectionKey key : selectionKeys) {// 处理被触发的事件handles(key);}//清空集合,防止重复处理selectionKeys.clear();}} catch (IOException e) {e.printStackTrace();} finally {close(selector);}}private void handles(SelectionKey key) throws IOException {// ACCEPT事件 - 和客户端建立了连接if (key.isAcceptable()) {ServerSocketChannel server = (ServerSocketChannel) key.channel();SocketChannel client = server.accept();client.configureBlocking(false);// 在selector上注册可能发生的Read事件client.register(selector, SelectionKey.OP_READ);System.out.println(getClientName(client) + "已连接");}// READ事件 - 客户端发送了消息else if (key.isReadable()) {SocketChannel client = (SocketChannel) key.channel();String fwdMsg = receive(client);if (fwdMsg.isEmpty()) {// 客户端异常// 取消掉key的注册,以后不再响应Read的事件// selector的key注销掉以后通常搭配selector.wakeup(); (是个好习惯)立刻唤醒selector,判断当前发生的事件key.cancel();selector.wakeup();} else {System.out.println(getClientName(client) + ":" + fwdMsg);forwardMessage(client, fwdMsg);// 检查用户是否退出if (readyToQuit(fwdMsg)) {key.cancel();selector.wakeup();System.out.println(getClientName(client) + "已断开");}}}}private void forwardMessage(SocketChannel client, String fwdMsg) throws IOException {for (SelectionKey key: selector.keys()) {Channel connectedClient = key.channel();//如果遍历到了服务器的监听Socket,则跳过(不需要将消息转发给服务器)if (connectedClient instanceof ServerSocketChannel) {continue;}// key是否有效(key对应的Channel和selector都在运行没有关闭) && 不是发消息的客户端他自己if (key.isValid() && !client.equals(connectedClient)) {// 写Buffer前先将Buffer清空wBuffer.clear();// 写入Buffer消息wBuffer.put(charset.encode(getClientName(client) + ":" + fwdMsg));// 将Buffer从写状态反转成读状态wBuffer.flip();// 将Buffer中的数据写入通道中while (wBuffer.hasRemaining()) {((SocketChannel)connectedClient).write(wBuffer);}}}}private String receive(SocketChannel client) throws IOException {// 在每次新的读取前先把buffer清空rBuffer.clear();// 将channel中的信息读入rBuffer中,直到读不出文件while(client.read(rBuffer) > 0);// 将rBuffer从写模式从转换成读模式rBuffer.flip();return String.valueOf(charset.decode(rBuffer));}private String getClientName(SocketChannel client) {return "客户端[" + client.socket().getPort() + "]";}private boolean readyToQuit(String msg) {return QUIT.equals(msg);}private void close(Closeable closable) {if (closable != null) {try {closable.close();} catch (IOException e) {e.printStackTrace();}}}public static void main(String[] args) {ChatServer chatServer = new ChatServer(7777);chatServer.start();}
}
- 将Channel中的事件注册在Selector
- 用Selector监控事件的发生,实现了在同一线程处理多个客户端输入
- 极大提高了线程的使用效率,使得服务器端能够处理大量的客户端连接
实现ChatClient
ChatClient
package client;import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;public class ChatClient {private static final String DEFAULT_SERVER_HOST = "127.0.0.1";private static final int DEFAULT_SERVER_PORT = 8888;private static final String QUIT = "quit";private static final int BUFFER = 1024;private String host;private int port;private SocketChannel client;private ByteBuffer rBuffer = ByteBuffer.allocate(BUFFER);private ByteBuffer wBuffer = ByteBuffer.allocate(BUFFER);private Selector selector;private Charset charset = Charset.forName("UTF-8");public ChatClient() {this(DEFAULT_SERVER_HOST, DEFAULT_SERVER_PORT);}public ChatClient(String host, int port) {this.host = host;this.port = port;}public boolean readyToQuit(String msg) {return QUIT.equals(msg);}private void close(Closeable closable) {if (closable != null) {try {closable.close();} catch (IOException e) {e.printStackTrace();}}}private void start() {try {client = SocketChannel.open();client.configureBlocking(false);selector = Selector.open();client.register(selector, SelectionKey.OP_CONNECT);client.connect(new InetSocketAddress(host, port));while (true) {selector.select();Set<SelectionKey> selectionKeys = selector.selectedKeys();for (SelectionKey key : selectionKeys) {handles(key);}selectionKeys.clear();}} catch (IOException e) {e.printStackTrace();} catch (ClosedSelectorException e) {// 用户正常退出} finally {close(selector);}}private void handles(SelectionKey key) throws IOException {// CONNECT事件 - 连接就绪事件if (key.isConnectable()) {SocketChannel client = (SocketChannel) key.channel();// 判断连接是否建立完全if (client.isConnectionPending()) {// 调用finishConnect方法正式建立连接client.finishConnect();// 创建一个新的线程来处理用户的输入new Thread(new UserInputHandler(this)).start();}// 将Read事件注册在selector上面client.register(selector, SelectionKey.OP_READ);}// READ事件 - 服务器转发消息else if (key.isReadable()) {SocketChannel client = (SocketChannel) key.channel();String msg = receive(client);if (msg.isEmpty()) {// 服务器异常close(selector);} else {System.out.println(msg);}}}public void send(String msg) throws IOException {if (msg.isEmpty()) {return;}wBuffer.clear();wBuffer.put(charset.encode(msg));wBuffer.flip();while (wBuffer.hasRemaining()) {client.write(wBuffer);}// 检查用户是否准备退出if (readyToQuit(msg)) {close(selector);}}private String receive(SocketChannel client) throws IOException {rBuffer.clear();while (client.read(rBuffer) > 0);rBuffer.flip();return String.valueOf(charset.decode(rBuffer));}public static void main(String[] args) {ChatClient client = new ChatClient("127.0.0.1", 7777);client.start();}
}
- ChatClient仍然需要通过创建新的线程来处理用户输入
UserInputHanlder
package client;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;public class UserInputHandler implements Runnable {private ChatClient chatClient;public UserInputHandler(ChatClient chatClient) {this.chatClient = chatClient;}@Overridepublic void run() {try {// 等待用户输入消息BufferedReader consoleReader =new BufferedReader(new InputStreamReader(System.in));while (true) {String input = consoleReader.readLine();// 向服务器发送消息chatClient.send(input);// 检查用户是否准备退出if (chatClient.readyToQuit(input)) {break;}}} catch (IOException e) {e.printStackTrace();}}
}
- UserInputHandler仍然需要阻塞式的等待用户的输入
- 用户的输入延迟应非常小,所以线程必须时刻等待着用户的输入以便第一时间处理
总结
与用BIO模型实现的多人聊天室有什么区别
- 使用Channel代替Stream
- 使用Selector监控多条Channel
- 可以在一个线程里处理多个Channel I/O
相关文章:

多人聊天室 NIO模型实现
NIO编程模型 Selector监听客户端不同的zhuangtai不同客户端触发不同的状态后,交由相应的handles处理Selector和对应的处理handles都是在同一线程上实现的 I/O多路复用 在Java中,I/O多路复用是一种技术,它允许单个线程处理多个输入/输出&…...

三、使用 Maven:命令行环境
文章目录 1. 第一节 实验一:根据坐标创建 Maven 工程1.1 Maven 核心概念:坐标1.2 实验操作1.3 Maven核心概念:POM1.4 Maven核心概念:约定的目录结构 2. 实验二:在 Maven 工程中编写代码2.1 主体程序2.2 测试程序 3. 执…...

Blender导入下载好的fbx模型像的骨骼像针戳/像刺猬
为什么我下载下来的骨骼模型和我自己绑定的模型骨骼朝向完全不一样 左边是下载的模型 右边是我自己绑定的模型 左边的模型刚刚感觉都是像针一样往外戳的,像刺猬一样那种。 解决方法勾选自动骨骼坐标系...

如何高效搭建智能BI数据分析系统
作为当今信息化时代,数据资产已经成为企业最为核心倚重的,自然企业也就面临来自于对内部这些数据的处理和分析。如何在大批量的数据当中提取有用信息,帮助企业做出智慧决策,是不少企业面临的问题。作为国内知名的BI数据分析系统服…...

第 6 章 Java 并发包中锁原理剖析Part one
目录 6.1 LockSupport 工具类 6.2 独占锁 ReentrantLock 的原理 获取锁 1.void lock() 方法 2.void lockInterruptibly() 方法 3.boolean tryLock() 方法 4.boolean tryLock(long timeout, TimeUnit unit) 方法 释放锁 6.1 Lo…...

使用 Canvas 绘制一个镂空的圆形区域
如果要实现一个类似人脸识别的界面,要求使用 canvas 进行绘制,中间镂空透明区域,背景是白色的画布。 技术方案: 首先,使用 canvas 绘制一个白色画布其次,使用 context.globalCompositeOperation 合成属性进…...

【Notepad++】---设置背景为护眼色(豆沙绿)最新最详细
在编程的艺术世界里,代码和灵感需要寻找到最佳的交融点,才能打造出令人为之惊叹的作品。而在这座秋知叶i博客的殿堂里,我们将共同追寻这种完美结合,为未来的世界留下属于我们的独特印记。 【Notepad】---设置背景为护眼色…...

2024 数学建模国一经验分享
2024 数学建模国一经验分享 背景:武汉某211,专业:计算机科学 心血来潮,就从学习和组队两个方面指点下后来者,帮新人避坑吧 2024年我在数学建模比赛中获得了国一(教练说论文的分数是湖北省B组第一࿰…...

安全见闻2
安全见闻,犹如一座庞大而深邃的知识宝库,其中涵盖了形形色色的网络安全知识与错综复杂的网络技术体系。在当今数字化时代,这些领域的重要性不言而喻,它们不仅关乎个人信息的保护与隐私安全,更是支撑着整个互联网世界以…...

Web游戏开发指南:在 Phaser.js 中读取和管理游戏手柄输入
前言 Phaser.js 是一个广受欢迎的 HTML5 游戏框架,为开发者提供了创建跨平台 2D 游戏的强大工具。在现代游戏开发中,支持游戏手柄已成为提升玩家体验的重要方面。本文将详细介绍如何在 Phaser.js 中监听和处理游戏手柄的输入,帮助开发者为他…...

代码随想录32 动态规划理论基础,509. 斐波那契数,70. 爬楼梯,746. 使用最小花费爬楼梯。
1.动态规划理论基础 动态规划刷题大纲 什么是动态规划 动态规划,英文:Dynamic Programming,简称DP,如果某一问题有很多重叠子问题,使用动态规划是最有效的。 所以动态规划中每一个状态一定是由上一个状态推导出来的…...

记录一个Flutter 3.24单元测试点击事件bug
哈喽,我是老刘 这两天发现一个Flutter 3.24版本的单元测试的一个小bug,提醒大家注意一下。 老刘自己写代码十多年了,写Flutter也6年多了,没想到前两天在一个小小的BottomNavigationBar 组件上翻了车。 给大家分享一下事件的经过。…...

使用Python将 word文档转pdf文档
第一步:我们需要导入支持包 >pip install pywin32 如果下载速度比较慢的话,可以考虑使用国内镜像源。 第二步:我们需要导入文件,这里采用 input,用户填入路径后,直接获取路径下的word文档,实现批量转换…...

基于C#+SQLite开发数据库应用的示例
SQLite数据库,小巧但功能强大;并且是基于文件型的数据库,驱动库就是一个dll文件,有些开发工具 甚至不需要带这个dll,比如用Delphi开发,用一些三方组件;数据库也是一个文件,虽然是个文…...

Vue基本语法
Options API 选项式/配置式api 需要在script中的export default一个对象对象中可以包含data、method、components等keydata是数据,数据必须是一个方法(如果是对象,会导致多组件的时候,数据互相影响,因为对象赋值后&…...

芯片发展史
芯片的发展史可分为几个重要的阶段,从早期的真空管到现代的集成电路,反映了技术进步和创新的历程: 1. 真空管时代 (1904 - 1950年代) 真空管是20世纪初的电子元件,用于放大信号和开关,广泛应用在早期的收音机、电视机…...

我的知识图谱和Neo4j数据库的使用
知识图谱概述 知识图谱的含义 RDF与RDFS RDF(Resource Description Framework,资源描述框架)和RDFS(RDF Schema,RDF模式)是构建知识图谱的基础技术之一。它们提供了一种标准的方式来表示信息,…...

ASP.NET CORE API 解决跨域问题
环境 vs2022 .net 8 创建ASP.net Core API项目 配置跨域 编写ApiController 启动项目 得到服务器运行的 地址 在Hbuiler中创建web项目,编写代码 【运行】-【运行到浏览器】-选择一个浏览器,查看结果 正常显示 问题 如果允许所有源访问,有安全风险方…...

sram测试注意讨论
常规测试首先是mbist测试,原理不用多说,自己看,主要是注意点和考虑点: 1、明确测试用的到func_clk的频率的大小,根据经验值一般大于800M的时钟需要特别考虑Timing的问题:由于pr摆放的位置原因,…...

Mybatis 支持延迟加载的详细内容
延迟加载的概念深入 延迟加载是一种在处理复杂对象关系时非常有用的策略。在企业级应用开发中,数据库中的表之间往往存在着各种关联关系,如一对多(一个用户有多个订单)、多对多(一个学生可以选多门课程,一门…...

word文档使用技巧笔记
中文和数字断开到第二行 word一串数字断开_百度知道 下划线对齐 word下划线怎么固定长度一致-百度经验...

使用docker-compose部署搜索引擎ElasticSearch6.8.10
背景 Elasticsearch 是一个开源的分布式搜索和分析引擎,基于 Apache Lucene 构建。它被广泛用于实时数据搜索、日志分析、全文检索等应用场景。 Elasticsearch 支持高效的全文搜索,并提供了强大的聚合功能,可以处理大规模的数据集并进行快速…...

bugku-web-login2
不知道为啥用bp始终登不上hackbar可以 随便输入个账号密码bp抓包,发现个小tip是base64加密的解密 $sql"SELECT username,password FROM admin WHERE username".$username.""; if (!empty($row) && $row[password]md5($password)){ } …...

【 AI技术赋能有限元分析与材料科学应用实践】Neo-Hookean 材料与深度学习结合的有限元分析
Neo-Hookean 材料模型是用于描述非线性弹性材料(如软组织和橡胶等)的经典模型,特别适用于大变形问题。其基本思想是通过应变能密度函数来描述材料的弹性行为。在该模型中,材料的应力-应变关系不仅依赖于应变能,还通过变…...

StarRocks关于ConcurrentModificationException 问题的解决
背景 本文基于 StarRocks 3.1.7 目前在基于Starrocks做一些数据分析的操作(主要是做一些简单的查询),同事遇到了一些并发的问题: ontent:2024-11-27 07:04:34,048 WARN (starrocks-mysql-nio-pool-214933|3593819) [StmtExecutor.execute():643] execute Exceptio…...

网络安全防护指南:筑牢网络安全防线(5/10)
一、网络安全的基本概念 (一)网络的定义 网络是指由计算机或者其他信息终端及相关设备组成的按照一定的规则和程序对信息收集、存储、传输、交换、处理的系统。在当今数字化时代,网络已经成为人们生活和工作中不可或缺的一部分。它连接了世…...

替代FTP最佳跨网文件传输解决方案——FileLink
在传统的企业文件传输中,FTP(文件传输协议)曾因其便捷性和高效性被广泛应用。然而,其固有的安全漏洞、对大文件传输支持的局限性、易受网络攻击等问题,已逐渐暴露出FTP在现代企业环境下的不足。针对这一问题࿰…...

Cesium在vue2中的引入和注意事项
在Vue2中,可以使用npm包管理工具来安装Cesium,并通过import语句将其引入到项目中。下面是在Vue2中引入Cesium的步骤和注意事项: 步骤: 首先,打开终端并进入Vue项目的根目录。 运行以下命令来安装Cesium: …...

CentOS 9 配置静态IP
文章目录 1_问题原因2_nmcli 配置静态IP3_使用配置文件固定IP4_重启后存在的问题5_nmcli 补充 1_问题原因 CentOS 7 于 2014年6月发布,基于 RHEL 7,并在 2024年6月30日 结束维护。 CentOS 9 作为目前的最新版本,今天闲来闲来无事下载下来后…...

深入解析 Webhook:从原理到实践的全面指南
1. 引言 1.1 什么是 Webhook? Webhook 是一种基于 HTTP 回调的轻量级通信机制,它允许一个系统实时向另一个系统发送数据。当特定事件发生时,Webhook 会主动向指定的 URL 发送 HTTP 请求,通常携带事件相关的数据。这种被动接收通…...