基于Socketserver+ThreadPoolExecutor+Thread构造的TCP网络实时通信程序
目录
介绍:
源代码:
Socketserver-服务端代码
Socketserver客户端代码:
介绍:
socketserver是一种传统的传输层网络编程接口,相比WebSocket这种应用层的协议来说,socketserver比较底层,socketserver的网络通信逻辑与收发、传输的数据格式与都要由开发者自己来定义,适合用来学习网络底层通信逻辑。我采用Python脚本来编程Socketserver的接口,我在下面放出源代码。
源代码:
我先讲一下我实现的转发模型,是C/S架构,不是P2P,由服务端中转客户端的发送消息这样。
Socketserver-服务端代码
import json
import socketserver
import struct
from threading import Thread
from concurrent.futures import ThreadPoolExecutorfrom threading import Lockdef send_byte(conn,msg):msg__bs_len = len(msg)msg_bs_len_bs = struct.pack('i',msg__bs_len)conn.sendall(msg_bs_len_bs)conn.sendall(msg)def recv_byte(conn):msg_recv_len_bs = conn.recv(4)msg_recv_len = struct.unpack('i', msg_recv_len_bs)[0]msg_recv = conn.recv( msg_recv_len )return msg_recvdef send(conn,msg):msg_json = json.dumps(msg)msg_bs = msg_json.encode('utf-8')msg_bs_len = len(msg_bs)msg_bs_len_pack=(struct.pack('i', msg_bs_len))conn.sendall(msg_bs_len_pack)conn.sendall(msg_bs)def recv_name(conn):name_len_bs = conn.recv(4)name_len = struct.unpack('i', name_len_bs)[0]name_bs = conn.recv(name_len)name = name_bs.decode('utf-8')return namedef recv(conn):msg_len_bs = conn.recv(4)msg_len = struct.unpack('i', msg_len_bs)[0]msg_bs = conn.recv(msg_len)msg = msg_bs.decode('utf-8')msg = json.loads(msg)return msgclass MyRequestHandler(socketserver.BaseRequestHandler):client_dict = {} #{address_port:address_port,sk_conn:conn}name_list = []stor_user_list = []retr_user_list = []lock = Lock()def handle(self):conn = self.requestaddress_port = self.client_addressclient_name = recv_name(conn)try:with ThreadPoolExecutor() as t:future = t.submit(handle_is_newuser,address_port,conn,client_name)def broadcast_welcome(future):is_new = future.result()if is_new:for key,value in MyRequestHandler.client_dict.items():sk_conn = value['sk_conn']send(sk_conn, f"系统消息: 【{client_name}】 加入了群聊,输入/help获取命令")future.add_done_callback(broadcast_welcome)except Exception as e:print ('出现异常:',e)while 1:msg_dict = recv(conn)print (msg_dict)msg = msg_dict['msg']name = msg_dict['name']try:if msg.upper() == 'Q':MyRequestHandler.name_list.remove(client_name)del MyRequestHandler.client_dict[name]for key, value in MyRequestHandler.client_dict.items():sk_conn = value['sk_conn']print (f'【{name}】退出了群聊')send(sk_conn, f'【{name}】退出了群聊')conn.close()elif msg == 'client/all':send(conn,f'在线用户列表:{MyRequestHandler.name_list}')elif msg == '/help':text ='查看在线用户:client/all\n私聊:/chat [对方名字] [消息内容]\n退出群聊:[q] or [Q]\n向对方传输文件:/stor [对方名字] [本地文件路径]\n显示递归目录树:/tree [对方名字] [远端目录]'send(conn,text)elif msg.lstrip().startswith('/tree_content'):try:parts = msg.split(' ',2)ip_or_name = parts[1]if ip_or_name == name:send(conn,'请指定对方名字')continueif ip_or_name in MyRequestHandler.name_list:values = MyRequestHandler.client_dict[ip_or_name]pri_conn = values['sk_conn']send(pri_conn,msg_dict)except Exception as e:print ('命令执行错误',e)elif msg.lstrip().startswith('/tree'):parts = msg.split(' ',2)ip_or_name = parts[1]if ip_or_name == name:send(conn, '请指定对方名字')continueif ip_or_name in MyRequestHandler.name_list:values = MyRequestHandler.client_dict[ip_or_name]remote_conn = values['sk_conn']send(remote_conn,msg_dict)continueelif msg.lstrip().startswith('stor') or msg.lstrip().startswith('retr') :print ('第一次文件传输交互')msg_bytes = recv_byte(conn)parts = msg.split(' ',3)remote_name= parts[1]client_dict_value = MyRequestHandler.client_dict[remote_name]remote_conn = client_dict_value['sk_conn']cmd = parts[0]send(remote_conn,msg_dict)if cmd == '/stor':print('进来了')send_byte(remote_conn,msg_bytes)print (msg_bytes)print ('发送成功')continueelse:for key, value in MyRequestHandler.client_dict.items():sk_conn = value['sk_conn']send(sk_conn, msg_dict)except Exception as e:print ('意外报错:',e)def handle_is_newuser(address_port,conn,client_name):dict_addr_conn = {}with MyRequestHandler.lock:if client_name in MyRequestHandler.name_list:returnelse:dict_addr_conn['address_port'] = address_portdict_addr_conn['sk_conn'] = connMyRequestHandler.client_dict[client_name] = dict_addr_connMyRequestHandler.name_list.append(client_name)return Trueif __name__ == '__main__':server = socketserver.ThreadingTCPServer(('127.0.0.1', 12345), MyRequestHandler)print("服务器正在运行...")server.serve_forever()
Socketserver客户端代码:
import json
import os
import socket
import struct
from threading import Thread
import sys
import timename = ''
stor_user_list=[]def send_byte(conn,msg):msg__bs_len = len(msg)msg_bs_len_bs = struct.pack('i',msg__bs_len)conn.sendall(msg_bs_len_bs)conn.sendall(msg)def recv_byte(conn):msg_recv_len_bs = conn.recv(4)msg_recv_len = struct.unpack('i', msg_recv_len_bs)[0]msg_recv = conn.recv( msg_recv_len)return msg_recvdef send_name(conn):global namename = input('请取个名字吧:')name_bs = name.encode('utf-8')name_len = len(name_bs)conn.sendall(struct.pack('i', name_len))conn.sendall(name_bs)def send_handle(conn,name_msg):name_msg_json = json.dumps(name_msg)name_msg_json_bs = name_msg_json.encode('utf-8')name_msg_json_bs_len = len(name_msg_json_bs)name_msg_json_bs_len_pack = struct.pack('i', name_msg_json_bs_len)conn.sendall(name_msg_json_bs_len_pack)conn.sendall(name_msg_json_bs)def send(conn):global stor_user_listwhile True:name_msg = {}msg = input()name_msg['name'] = namename_msg['msg'] = msgtry:if msg.upper() == 'Q':# name_msg_json = json.dumps(name_msg)# msg_bs = name_msg_json.encode('utf-8')# msg_len = len(msg_bs)# conn.sendall(struct.pack('i', msg_len))# conn.sendall(msg_bs)send_handle(conn,name_msg)print ('我退出了群聊!')conn.close()sys.exit()if str(msg.lstrip()).startswith('/stor') or str(msg.lstrip()).startswith('/retr') :print('主动发起文件传输(A端)')parts = msg.split(' ', 3)command = parts[0]remote_name = parts[1]localpath = parts[2]# name_msg_json = json.dumps(name_msg)# msg_json_bs = name_msg_json.encode('utf-8')## msg_json_bs_len = len(msg_json_bs)# msg_json_bs_len_pack = struct.pack('i', msg_json_bs_len)## conn.sendall(msg_json_bs_len_pack)# conn.sendall(msg_json_bs )if '/stor' in command:name_byte = {}name_byte['name'] = namename_byte['msg'] = msgsend_handle(conn,name_byte)with open(localpath, mode='rb') as read_file:bytes = read_file.read()print('开始发送文件')send_byte(conn,bytes)print('文件发送成功')sys.stdout.write(f'{name}>>')sys.stdout.flush()continuesend_handle(conn,name_msg)sys.stdout.write(f'{name}>>')sys.stdout.flush()except Exception as e:print('异常报错:', e)sys.exit()def recv_handle(conn):msg_len_pack = conn.recv(4)msg_bs_len = struct.unpack('i', msg_len_pack)[0]msg_bs = conn.recv(msg_bs_len)msg_dict_json = msg_bs.decode('utf-8')msg_dict = json.loads(msg_dict_json)return msg_dictdef recv(conn):global stor_user_listwhile True:try:msg_dict = recv_handle(conn)sys.stdout.write('\r' + ' ' * 100 + '\r') # 覆盖当前行sys.stdout.flush()if isinstance(msg_dict,list):print (msg_dict)elif isinstance(msg_dict,str):#由服务器发送的消息,因此无需以字典格式传输print (msg_dict)elif isinstance(msg_dict,dict):msg = msg_dict['msg']name_msg = msg_dict['name']print (name_msg)if msg.lstrip().startswith('/chat'):parts = msg.split(' ', 2)pri_msg = parts[2]print(f'{name_msg}>>{name} {pri_msg}')if msg.lstrip().startswith('/tree'):parts = msg.split(' ', 2)local_path = parts[2]tree =''def list_tree(path,tree,depth=1):dir_name = os.path.basename(path)tree += str(depth * '|-----')+str(dir_name).strip() + '\n'file_list = os.listdir(path)for file in file_list:filepath = os.path.join(path,file)if os.path.isdir(filepath):tree = list_tree(filepath,tree,depth+1)if os.path.isfile(filepath):tree += str(depth * '|-----') + '|-----' + file + '\n'return treedir_tree =list_tree(local_path,tree)dir_tree_full = '\n' + dir_treeprint (dir_tree_full)msg_dir_tree = {}msg_dir_tree['name'] = namemsg_dir_tree['msg'] = dir_tree_fullsend_handle(conn,msg_dir_tree)if name_msg != name and msg.upper() != 'Q' and not msg.lstrip().startswith('/chat') and not msg.lstrip().startswith('stor') and not msg.lstrip().startswith('retr'):print(f'{name_msg}>> {msg}')if msg.lstrip().startswith('stor') or msg.lstrip().startswith('retr'):msg_bytes = recv_byte(conn)parts = msg_dict['msg'].split(' ',3)command = parts[0]local_path = parts[3]if '/stor' in command:with open(local_path, mode='wb') as writefile:print('开始文件传输(B端)',flush=True)writefile.write(msg_bytes)writefile.flush()os.fsync(writefile.fileno())print ('传输完毕', flush=True)sys.stdout.write(f'{name}>>')sys.stdout.flush()except Exception as e:print ('接收消息出错:',e)if __name__ == '__main__':try:sk = socket.socket()sk.connect(('127.0.0.1', 12345))except Exception as e:print ('socket连接失败',e)sys.exit()send_name(sk)receiver = Thread(target=recv, args=(sk,), daemon=True)receiver.start()send(sk)sk.close()
定义的功能:
查看在线用户:client/all
私聊:/chat [对方名字] [消息内容]
退出群聊:[q] or [Q]
向对方传输文件:/stor [对方名字] [本地文件路径]
显示递归目录树:/tree [对方名字] [远端目录]
PS:
有点bug未修,还有些逻辑未完善(如递归目录树没有单播传递),不过能运行,小问题,你们可以拿去优化一下,我感觉我多线程逻辑也有点狗市,后面了解到websocket就毅然弃坑socketserver了
相关文章:
基于Socketserver+ThreadPoolExecutor+Thread构造的TCP网络实时通信程序
目录 介绍: 源代码: Socketserver-服务端代码 Socketserver客户端代码: 介绍: socketserver是一种传统的传输层网络编程接口,相比WebSocket这种应用层的协议来说,socketserver比较底层,soc…...
[Java 基础]枚举
枚举是一种特殊的类,表示一组固定的常量。枚举跟普通类一样可以用自己的变量、方法和构造函数,构造函数只能使用 private 访问修饰符,所以外部无法调用。 现实生活中的例子: 一周七天(MONDAY ~ SUNDAY) …...

多线程环境中,如果多个线程同时尝试向同一个TCP客户端发送数据,添加同步机制
原代码 public async Task SendToClientAsync(TcpClient targetClient, byte[] data, int offset, int length) {try{// 1. 检查客户端是否有效if (targetClient null || !targetClient.Connected){Console.WriteLine("Cannot send: client is not connected");ret…...

【含文档+PPT+源码】基于微信小程序的旅游论坛系统的设计与实现
项目介绍 本课程演示的是一款基于微信小程序的旅游论坛系统的设计与实现,主要针对计算机相关专业的正在做毕设的学生与需要项目实战练习的 Java 学习者。 1.包含:项目源码、项目文档、数据库脚本、软件工具等所有资料 2.带你从零开始部署运行本套系统 …...

贝叶斯优化+LSTM+时序预测=Nature子刊!
贝叶斯优化与LSTM的融合在时间序列预测领域取得了显著成效,特别是在处理那些涉及众多超参数调整的复杂问题时。 1.这种结合不仅极大提高了预测的精确度,还优化了模型训练流程,提升了效率和成本效益。超参数优化的新篇章:LSTM因其…...
NodeJS全栈WEB3面试题——P3Web3.js / Ethers.js 使用
3.1 Ethers.js 和 Web3.js 的主要区别是什么? 比较点Ethers.jsWeb3.js体积更轻量,适合前端较大,加载慢,适合 Node文档文档简洁、现代化,支持 TypeScript文档丰富,但不够现代化模块化设计高度模块化&#x…...
Quick UI 组件加载到 Axure
将 Quick UI 组件加载到 Axure 的完整指南 Axure 支持通过自定义元件库加载外部 UI 组件库(如 Quick UI),以下是详细的操作流程: 一、准备工作 获取 Quick UI 组件库文件: 下载 .rplib 格式的 Quick UI 元件库文件&a…...

Vue3(ref与reactive)
一,ref创建_基本类型的响应式数据 在 Vue 3 中,ref是创建响应式数据的核心 API 之一 ** ref的基本概念** ref用于创建一个可变的响应式数据引用,适用于任何类型的值(基本类型、对象、数组等)。通过ref包装的值会被转…...

Starrocks中RoaringBitmap杂谈
背景 最近在阅读Starrocks源码的时候,遇到ColumnRefSet的RoaringBitmap使用,所以借此来讨论一下RoaringBitmap这个数据结构,这种思想是很值得借鉴的。 对于的实现可以参考一下 <dependency><groupId>org.roaringbitmap</groupId><…...
通过ca证书的方式设置允许远程访问Docker服务
设置允许远程访问Docker服务 使用场景 环境 系统:anolis7.9 修改Docker服务配置,配置安全证书 生成ca证书到/etc/docker目录中,后续会要用到 #该步骤需要设置密码,后面步骤会要用到,此处设置密码为123456 openss…...

涂胶协作机器人解决方案 | Kinova Link 6 Cobot在涂胶工业的方案应用与价值
涂胶工业现状背景: 涂胶工艺在汽车制造、电子组装、航空航天等工业领域极为关键,关乎产品密封、防水、绝缘性能及外观质量。 然而,传统涂胶作业问题频发。人工操作重复性强易疲劳,涂胶质量波动大;大型涂胶器使用增加工…...
理解继承与组合的本质:Qt 项目中的设计选择指南
文章目录 理解继承与组合的本质:Qt 项目中的设计选择指南一、继承与组合的本质区别1. 继承(Inheritance)2. 组合(Composition) 二、继承的适用场景三、组合的适用场景四、错误使用继承的后果五、判断继承或组合的三问法…...

新手小白使用VMware创建虚拟机安装Linux
新手小白想要练习linux,找不到合适的地方,可以先创建一个虚拟机,在自己创建的虚拟机里面进行练习,接下来我给大家接受一下创建虚拟机的步骤。 VMware选择创建新的虚拟机 选择自定义 硬件兼容性选择第一个,不同的版本&a…...
使用 PHP 和 Guzzle 对接印度股票数据源API
对接 StockTV API 可能涉及获取实时或历史的金融市场数据,如股票价格、交易量、市场新闻等。为了帮助你更好地理解如何使用 PHP 对接 StockTV API,下面我将提供一个通用指南和示例代码。 前提条件 注册并获取API密钥:首先你需要在 StockTV …...

EscapeX:去中心化游戏,开启极限娱乐新体验
VEX 平台推出全新去中心化游戏 EscapeX(数字逃脫),创新性地将大逃杀玩法与区块链技术相融合。用户不仅能畅享紧张刺激的解谜过程,更能在去中心化、公正透明的环境中参与游戏。EscapeX 的上线,为 VEX 生态注入全新活力&…...

使用PyQt5的图形用户界面(GUI)开发教程
文章目录 写在前面一、PyQt5的安装1.1 使用Conda管理环境1.1.1 新建环境1.1.2 conda list和pip list的区别1.1.3 conda install和pip install的区别 1.2 安装PyQt5和Qt Designer1.3 VsCode中配置Qt Designer 二、PyQt5的UI设计2.1 .ui文件设计2.2 .qrc文件建立2.3 qss设计 三、…...
STM32实战:智能环境监测站设计方案
下面是一个基于STM32的智能环境监测站设计方案,使用Keil MDK-ARM开发环境。这个系统集成了多种传感器,并通过OLED显示数据,同时具备数据存储和报警功能。 [STM32F4系列MCU] ├── I2C总线 │ ├── SHT30温湿度传感器 │ ├──…...
猎板硬金镀层厚度:新能源汽车高压系统的可靠性基石
在新能源汽车的电池管理系统(BMS)和电机控制器中,硬金镀层厚度直接关系到高压环境下的电气稳定性与使用寿命。猎板针对车载场景开发的耐电迁移方案(金层 2.5μm,镍层 8μm),经 150℃/85% RH 高压…...
KEYSIGHT是德科技 E5063A 18G ENA系列网络分析仪
KEYSIGHT是德科技 E5063A 18G ENA系列网络分析仪 E5063A ENA 矢量网络分析仪 18GHz 2端口 降低无源射频元器件的测试成本 Keysight E5063A ENA 是一款经济适用的台式矢量网络分析仪,可用于测试简单的无源元器件,例如频率最高达到 18 GHz 的天线、滤…...
VR 虚拟仿真工器具:开启医学新视界的智慧钥匙
VR 虚拟仿真工器具在医疗领域的应用,为医疗行业的发展带来了新的机遇。在手术模拟训练中,它让医生提前熟悉手术流程和操作技巧。对于一些复杂的手术,如心脏搭桥手术、神经外科手术等,手术难度大、风险高,对医生的操作技…...
webshell管理工具、C2远控服务器流量分析
文章目录 一、Webshell管理工具流量分析1. 蚁剑(AntSword)2. 冰蝎(Behinder)3. 哥斯拉(Godzilla)二、常见C2远控服务器流量分析1. Metasploit2. CobaltStrike 三、防御对抗策略总结 一、Webshell管理工具流…...

JavaWeb:前端工程化-TS(TypeScript)
概述 快速入门 常用类型 基础类型 联合类型 函数类型 对象类型 接口Interface Interface和type区别 典型推论...

unity+ spine切换武器不换皮肤解决方案
1.在spine编辑中获取到角色武器插槽名称 这里的武器插槽名称为“zj_22”。角色的spine正常导出到unity中。 2.将需要替换的武器图片单独放在一个spine项目里面,并为每个武器单独建立一个插槽。 而且全部放在根骨骼Root下。 3.将武器的spine动画导出,会…...

[java八股文][MySQL面试篇]SQL基础
NOSQL和SQL的区别? SQL数据库,指关系型数据库 - 主要代表:SQL Server,Oracle,MySQL(开源),PostgreSQL(开源)。 关系型数据库存储结构化数据。这些数据逻辑上以行列二维表的形式存在,每一列代表…...
Ubuntu中SSH服务器安装使用
SSH服务安装 1. 安装 OpenSSH 安装 SSH 服务端(允许远程登录) sudo apt update sudo apt install openssh-server安装 SSH 客户端(用于连接其他服务器) sudo apt install openssh-client2. 检查 SSH 服务状态 sudo systemctl…...

【AI论文】SWE-rebench:一个用于软件工程代理的任务收集和净化评估的自动化管道
摘要:基于LLM的代理在越来越多的软件工程(SWE)任务中显示出有前景的能力。 然而,推进这一领域面临着两个关键挑战。 首先,高质量的训练数据稀缺,尤其是反映现实世界软件工程场景的数据,在这些场…...

Flask文件处理全攻略:安全上传下载与异常处理实战
💝💝💝欢迎莅临我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:「storms…...

【算法深练】分组循环:“分”出条理,化繁为简
目录 引言 分组循环 2760. 最长奇偶子数组 1446. 连续字符 1869. 哪种连续子字符串更长 2414. 最长的字母序连续子字符串的长度 3456. 找出长度为 K 的特殊子字符串 1957. 删除字符使字符串变好 674. 最长连续递增序列 978. 最长湍流子数组 2110. 股票平滑下跌阶段的…...

焊缝缺陷焊接缺陷识别分割数据集labelme格式5543张4类别
数据集中有超过一半为增强图片,请认真观察图片预览 数据集格式:labelme格式(不包含mask文件,仅仅包含jpg图片和对应的json文件) 图片数量(jpg文件个数):5543 标注数量(json文件个数):5543 标注类别数:4…...

关于scrapy在pycharm中run可以运行,但是debug不行的问题
关于scrapy在pycharm中run模式可以运行,但是debug模式不行的问题 文章目录 关于scrapy在pycharm中run模式可以运行,但是debug模式不行的问题查了下原因 点击run就可以运行,但是debug就是运行不了 一点击debug就报这个错,也不知道啥…...