Socket网络编程(五)——TCP数据发送与接收并行
目录
- 主要实现需求
- TCP 服务端收发并行重构
- 启动main方法重构
- 重构分离收发消息的操作
- 重构接收消息的操作
- 重构发送消息
- TCPServer调用发送消息的逻辑
- 监听客户端链接逻辑重构
- Socket、流的退出与关闭
- TCP 客户端收发并行重构
- 客户端 main函数重构
- 客户端接收消息重构
- 客户端发送消息重构
- 客户端 linkWith 主方法重构
- TCP 收发并行重构测试
- 服务端重构后执行日志
- 客户端重构后执行日志
- 源码下载
主要实现需求
多线程收发并行
TCP多线程收发协作
TCP 服务端收发并行重构
TCP 服务端收发并行重构
启动main方法重构
原有的main逻辑如下:

重构后如下:
public class Server {public static void main(String[] args) throws IOException {TCPServer tcpServer = new TCPServer(TCPConstants.PORT_SERVER);boolean isSucceed = tcpServer.start();if(!isSucceed){System.out.println("Start TCP server failed.");}UDPProvider.start(TCPConstants.PORT_SERVER);// 键盘输入:BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));String str;do {str = bufferedReader.readLine();tcpServer.broadcast(str);} while (!"00bye00".equalsIgnoreCase(str));UDPProvider.stop();tcpServer.stop();}
}
重构后,从while循环不断读取键盘输入信息,当输入“00bye00” 时退出读取。此处只读取键盘输入数据,客户端发送的数据在会重新拆分出来新的线程单独处理。
重构分离收发消息的操作
创建 ClientHandler.java 重构收发消息操作:
public class ClientHandler {private final Socket socket;private final ClientReadHandler readHandler;private final ClientWriteHandler writeHandler;private final CloseNotiry closeNotiry;public ClientHandler(Socket socket, CloseNotiry closeNotiry ) throws IOException {this.socket = socket;this.readHandler = new ClientReadHandler(socket.getInputStream());this.writeHandler = new ClientWriteHandler(socket.getOutputStream());this.closeNotiry = closeNotiry;System.out.println("新客户链接: " + socket.getInetAddress() + "\tP:" + socket.getPort());}
}
重构接收消息的操作
/*** 接收数据*/class ClientReadHandler extends Thread {private boolean done = false;private final InputStream inputStream;ClientReadHandler(InputStream inputStream){this.inputStream = inputStream;}@Overridepublic void run(){super.run();try {// 得到输入流,用于接收数据BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream));do {// 客户端拿到一条数据String str = socketInput.readLine();if(str == null){System.out.println("客户端已无法读取数据!");// 退出当前客户端ClientHandler.this.exitBySelf();break;}// 打印到屏幕System.out.println(str);}while (!done);socketInput.close();}catch (IOException e){if(!done){System.out.println("连接异常断开");ClientHandler.this.exitBySelf();}}finally {// 连接关闭CloseUtils.close(inputStream);}}void exit(){done = true;CloseUtils.close(inputStream);}}
创建一个单独的线程进行接收消息,该线程不需要关闭。
重构发送消息
/*** 发送数据*/class ClientWriteHandler {private boolean done = false;private final PrintStream printStream;private final ExecutorService executorService;ClientWriteHandler(OutputStream outputStream) {this.printStream = new PrintStream(outputStream);// 发送消息使用线程池来实现this.executorService = Executors.newSingleThreadExecutor();}void exit(){done = true;CloseUtils.close(printStream);executorService.shutdown();}void send(String str) {executorService.execute(new WriteRunnable(str));}class WriteRunnable implements Runnable{private final String msg;WriteRunnable(String msg){this.msg = msg;}@Overridepublic void run(){if(ClientWriteHandler.this.done){return;}try {ClientWriteHandler.this.printStream.println(msg);}catch (Exception e){e.printStackTrace();}}}}
TCPServer调用发送消息的逻辑
public void broadcast(String str) {for (ClientHandler client : clientHandlerList){// 发送消息client.send(str);}}
监听客户端链接逻辑重构
private List<ClientHandler> clientHandlerList = new ArrayList<>();/*** 监听客户端链接*/private class ClientListener extends Thread {private ServerSocket server;private boolean done = false;private ClientListener(int port) throws IOException {server = new ServerSocket(port);System.out.println("服务器信息: " + server.getInetAddress() + "\tP:" + server.getLocalPort());}@Overridepublic void run(){super.run();System.out.println("服务器准备就绪~");// 等待客户端连接do{// 得到客户端Socket client;try {client = server.accept();}catch (Exception e){continue;}try {// 客户端构建异步线程ClientHandler clientHandler = new ClientHandler(client, handler -> clientHandlerList.remove(handler));// 启动线程clientHandler.readToPrint();clientHandlerList.add(clientHandler);} catch (IOException e) {e.printStackTrace();System.out.println("客户端连接异常: " + e.getMessage());}}while (!done);System.out.println("服务器已关闭!");}void exit(){done = true;try {server.close();}catch (IOException e){e.printStackTrace();}}}
clientHandlerList作为已经建立了连接的客户端的集合,用于管理当前用户的信息。接收与发送都使用该集合。
Socket、流的退出与关闭
/*** 退出、关闭流*/public void exit(){readHandler.exit();writeHandler.exit();CloseUtils.close(socket);System.out.println("客户端已退出:" + socket.getInetAddress() + "\tP:" + socket.getPort());}/*** 发送消息* @param str*/public void send(String str){writeHandler.send(str);}/*** 接收消息*/public void readToPrint() {readHandler.exit();}/*** 接收、发送消息异常,自动关闭*/private void exitBySelf() {exit();closeNotiry.onSelfClosed(this);}/*** 关闭流*/public interface CloseNotiry{void onSelfClosed(ClientHandler handler);}
TCP 客户端收发并行重构
客户端 main函数重构
public static void main(String[] args) {// 定义10秒的搜索时间,如果超过10秒未搜索到,就认为服务器端没有开机ServerInfo info = UDPSearcher.searchServer(10000);System.out.println("Server:" + info);if( info != null){try {TCPClient.linkWith(info);}catch (IOException e){e.printStackTrace();}}}
客户端接收消息重构
static class ReadHandler extends Thread{private boolean done = false;private final InputStream inputStream;ReadHandler(InputStream inputStream){this.inputStream = inputStream;}@Overridepublic void run(){try {// 得到输入流,用于接收数据BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream));do {// 客户端拿到一条数据String str = null;try {str = socketInput.readLine();}catch (SocketTimeoutException e){}if(str == null){System.out.println("连接已关闭,无法读取数据!");break;}// 打印到屏幕System.out.println(str);}while (!done);socketInput.close();}catch (IOException e){if(!done){System.out.println("连接异常断开:" + e.getMessage());}}finally {// 连接关闭CloseUtils.close(inputStream);}}void exit(){done = true;CloseUtils.close(inputStream);}}
创建ReadHandler用单独的线程去接收服务端的消息。连接关闭则exit() 关闭客户端。
客户端发送消息重构
private static void write(Socket client) throws IOException {// 构建键盘输入流InputStream in = System.in;BufferedReader input = new BufferedReader(new InputStreamReader(in));// 得到Socket输出流,并转换为打印流OutputStream outputStream = client.getOutputStream();PrintStream socketPrintStream = new PrintStream(outputStream);boolean flag = true;do {// 键盘读取一行String str = input.readLine();// 发送到服务器socketPrintStream.println(str);// 从服务器读取一行if("00bye00".equalsIgnoreCase(str)){break;}}while(flag);// 资源释放socketPrintStream.close();}
在linkWith() 中调用write() 发送方法,由 do-while 循环读取本地键盘输入信息进行发送操作。当满足 “00bye00” 时,关闭循环,关闭socket连接,结束该线程。
客户端 linkWith 主方法重构
public static void linkWith(ServerInfo info) throws IOException {Socket socket = new Socket();// 超时时间socket.setSoTimeout(3000);// 端口2000;超时时间300mssocket.connect(new InetSocketAddress(Inet4Address.getByName(info.getAddress()),info.getPort()));//System.out.println("已发起服务器连接,并进入后续流程~");System.out.println("客户端信息: " + socket.getLocalAddress() + "\tP:" + socket.getLocalPort());System.out.println("服务器信息:" + socket.getInetAddress() + "\tP:" + socket.getPort());try {ReadHandler readHandler = new ReadHandler(socket.getInputStream());readHandler.start();// 发送接收数据write(socket);}catch (Exception e){System.out.println("异常关闭");}// 释放资源socket.close();System.out.println("客户端已退出~");}
原有的逻辑里,是调用 todo() 方法,在todo() 方法里同时进行收发操作。现在是进行读写分离。
TCP 收发并行重构测试
服务端重构后执行日志

客户端重构后执行日志

源码下载
下载地址:https://gitee.com/qkongtao/socket_study/tree/master/src/main/java/cn/kt/socket/SocketDemo_L5_TCP_Channel
相关文章:
Socket网络编程(五)——TCP数据发送与接收并行
目录 主要实现需求TCP 服务端收发并行重构启动main方法重构重构分离收发消息的操作重构接收消息的操作重构发送消息TCPServer调用发送消息的逻辑监听客户端链接逻辑重构Socket、流的退出与关闭 TCP 客户端收发并行重构客户端 main函数重构客户端接收消息重构客户端发送消息重构…...
2024最新-ubuntu22.04安装最新版QT6.6~6.8教程
1. 在官网下载 online_installer: https://download.qt.io/archive/online_installers/4.7/qt-unified-linux-x64-4.7.0-online.run 或者直接镜像站下载: http://mirrors.ustc.edu.cn/qtproject/archive/online_installers/4.7/qt-unified-linux-x6…...
STM32------分析GPIO寄存器
一、初始LED原理图 共阴极led LED发光二极管,需要有电流通过才能点亮,当有电压差就会产生电流 二极管两端的电压差超过2.7v就会有电流通过 电阻的作用 由于公式IV/R 不加电阻容易造成瞬间电流无穷大 发光二极管工作电流为10-20MA 3.3v / 1kΩ 3.…...
数学实验-Matlab使用(1)
使用方法以及笔记均在文件中 class1_func1.m function f class1_func1(x) % f为输出,输出有多个时需要用中括号以矩阵的方式包起来 % x为输入f sin(x)class1_func2.m function [a,b,u,v] class1_func2(x,y)[a,b] eig(x)[u,v] eig(y)class1.m % 当语句后有…...
kafka文件存储机制和消费者
1.broker文件存储机制 去查看真正的存储文件: 在/opt/module/kafka/datas/ 路径下 kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.index 如果是6415那么这个会存储在563的log文件之中,因为介于6410和10090之间。 2.…...
《汇编语言》- 读书笔记 - 第15章-外中断
《汇编语言》- 读书笔记 - 第15章-外中断 15.1 接口芯片和端口15.2 外中断信息1. 可屏蔽中断(Maskable Interrupt)2. 不可屏蔽中断(Non-Maskable Interrupt)设计思想 15.3 PC 机键盘的处理过程1. 键盘输入2. 引发 9 号中断3. 执行…...
【Vue3】CSS 新特性
:slotted <template> <!-- App.vue--><Son ><div class"a">我要插入了</div></Son> </template><script setup lang"ts"> import Son from ./components/Son.vue </script><style></sty…...
四信水电站泄洪预警方案,精准提升防汛应急水平
2022年5月水利部办公厅发布《关于开展水库泄洪设施专项排查整改的紧急通知》,为坚决贯彻落实关于水库大坝安全的重要指示批示精神、关于保障水库泄洪通道通畅的批示要求,全力防范水库可能出现的重大险情,确保水库安全度汛。 2023年3月国家能源…...
k8s中容器的调度与创建:CRI,cgroup
container调度与创建 选自:K8s、CRI与container - packy的文章 - 知乎 https://zhuanlan.zhihu.com/p/102897620 Cgroup创建: cgexec -g cpu,memory:$UUID \ > unshare -uinpUrf --mount-proc \ > sh -c "/bin/hostname $UUID &…...
Unity安装与简单设置
安装网址:https://unity.cn 设置语言: 设置安装位置:否则C盘就会爆了 获取一个个人的资格证: 开始安装: 安装完毕。 添加模块:例如简体中文 新建项目: 布局2*3、单栏布局、 设置…...
数据库的介绍、分类、作用和特点
数据库是用来存储、管理和检索数据的集合系统。根据数据处理模型的不同,数据库可以分为多种类型,主要包括: 1、关系型数据库(RDBMS): 介绍:关系型数据库使用表格形式来存储数据,并通…...
【Unity】机器人末端执行器仿真
机械手臂的末端执行器使用多项式来计算转动角度可能有几个原因: 精确控制:机械臂的运动通常需要高度的精确性,特别是在精密工作或复杂运动轨迹的情况下。多项式,特别是高阶的,可以很好地近似复杂的非线性关系和运动轨迹…...
更换个人开发环境后,pycharm连接服务器报错Authentication failed
原因:服务器中更换个人开发环境后,密码变了。 解决:在pycharm中修改服务器开发环境密码即可。 1 找到Tools-Depolyment-Configuration 2 点击SSH Configuration后的省略号 3 修改这里面的Password即可...
E - Bad Juice
解题思路 由于最后返回一个01字符串表示所选人的状态要求人数最少,即字符串长度最少而要用最少的字符,找出则返回的字符为二进制下的编号这样利用了所有的01字符号人表示二进制下位的情况注意对于2的次方项,只需要有位,可以用位均…...
用HTML5的<canvas>元素实现刮刮乐游戏
用HTML5的<canvas>元素实现刮刮乐游戏 用HTML5的<canvas>元素实现刮刮乐,要求:将上面的“图层”的图像可用鼠标刮去,露出下面的“图层”的图像。 示例从简单到复杂。 简单示例 准备两张图像,我这…...
TypeScript + react 中 TypeScript 的加入后 , 有哪些优化项目
在使用 TypeScript 结合 React 进行开发时,TypeScript 提供了许多优化和增强代码质量的方式。以下是一些关键的优化操作和最佳实践: 强类型组件属性(Props)和状态(State): 使用接口或类型别名定义组件的 pr…...
Redis学习路径(构建体系)
学习路径 掌握数据类型(分析底层数据结构)和缓存的基本使用 (理论使用) 掌握 redis 实现高性能,高可靠、高可用技术 (理论)学习redis源代码底层实现 (底层实现) 先来一个引言,比较宏观的角度…...
【README 小技巧】 展示gitee中开源项目start
【README 小技巧】 展示gitee中开源项目start <a target"_blank" hrefhttps://gitee.com/wujiawei1207537021/wu-framework-parent><img srchttps://gitee.com/wujiawei1207537021/wu-framework-parent/badge/star.svg altGitee star/></a>...
tcping实用小工具
Tcping实用小工具命令详解 一、tcping介绍 tcping:tcping命令基于tcp协议监控,可以从较低级别的协议获得简单的,可能不可靠的数据报服务。 原则上,TCP应该能够在从容硬线连接到分组交换或电路交换网络的各种通信系统之上操作。 …...
【Web】Java反序列化之CC2——commons-collections4的新链之一
目录 关于commons-collections4 一个重要的思维模型 触发Transform的关键类:TransformingComparator 反序列化的入口:PriorityQueue Exp 关于commons-collections4 commons-collections4 是 Apache Commons 组件库中的一个项目,它是对旧…...
浅谈 React Hooks
React Hooks 是 React 16.8 引入的一组 API,用于在函数组件中使用 state 和其他 React 特性(例如生命周期方法、context 等)。Hooks 通过简洁的函数接口,解决了状态与 UI 的高度解耦,通过函数式编程范式实现更灵活 Rea…...
谷歌浏览器插件
项目中有时候会用到插件 sync-cookie-extension1.0.0:开发环境同步测试 cookie 至 localhost,便于本地请求服务携带 cookie 参考地址:https://juejin.cn/post/7139354571712757767 里面有源码下载下来,加在到扩展即可使用FeHelp…...
反向工程与模型迁移:打造未来商品详情API的可持续创新体系
在电商行业蓬勃发展的当下,商品详情API作为连接电商平台与开发者、商家及用户的关键纽带,其重要性日益凸显。传统商品详情API主要聚焦于商品基本信息(如名称、价格、库存等)的获取与展示,已难以满足市场对个性化、智能…...
【人工智能】神经网络的优化器optimizer(二):Adagrad自适应学习率优化器
一.自适应梯度算法Adagrad概述 Adagrad(Adaptive Gradient Algorithm)是一种自适应学习率的优化算法,由Duchi等人在2011年提出。其核心思想是针对不同参数自动调整学习率,适合处理稀疏数据和不同参数梯度差异较大的场景。Adagrad通…...
MongoDB学习和应用(高效的非关系型数据库)
一丶 MongoDB简介 对于社交类软件的功能,我们需要对它的功能特点进行分析: 数据量会随着用户数增大而增大读多写少价值较低非好友看不到其动态信息地理位置的查询… 针对以上特点进行分析各大存储工具: mysql:关系型数据库&am…...
Java如何权衡是使用无序的数组还是有序的数组
在 Java 中,选择有序数组还是无序数组取决于具体场景的性能需求与操作特点。以下是关键权衡因素及决策指南: ⚖️ 核心权衡维度 维度有序数组无序数组查询性能二分查找 O(log n) ✅线性扫描 O(n) ❌插入/删除需移位维护顺序 O(n) ❌直接操作尾部 O(1) ✅内存开销与无序数组相…...
基于数字孪生的水厂可视化平台建设:架构与实践
分享大纲: 1、数字孪生水厂可视化平台建设背景 2、数字孪生水厂可视化平台建设架构 3、数字孪生水厂可视化平台建设成效 近几年,数字孪生水厂的建设开展的如火如荼。作为提升水厂管理效率、优化资源的调度手段,基于数字孪生的水厂可视化平台的…...
力扣-35.搜索插入位置
题目描述 给定一个排序数组和一个目标值,在数组中找到目标值,并返回其索引。如果目标值不存在于数组中,返回它将会被按顺序插入的位置。 请必须使用时间复杂度为 O(log n) 的算法。 class Solution {public int searchInsert(int[] nums, …...
Angular微前端架构:Module Federation + ngx-build-plus (Webpack)
以下是一个完整的 Angular 微前端示例,其中使用的是 Module Federation 和 npx-build-plus 实现了主应用(Shell)与子应用(Remote)的集成。 🛠️ 项目结构 angular-mf/ ├── shell-app/ # 主应用&…...
重启Eureka集群中的节点,对已经注册的服务有什么影响
先看答案,如果正确地操作,重启Eureka集群中的节点,对已经注册的服务影响非常小,甚至可以做到无感知。 但如果操作不当,可能会引发短暂的服务发现问题。 下面我们从Eureka的核心工作原理来详细分析这个问题。 Eureka的…...
