用 Python 从零构建异步回显服务器
简介
让我们从 0 开始,搭建一个异步服务输出服务器。
套接字
套接字(socket),是不同计算机中实现通信的一种方式,你可以理解成一个接口,它会在客户端和服务端建立连接,一台发送数据,一台接收数据,靠的就是套接字。
阻塞套接字服务器
import socket# socket.AF_INET: IPv4 主机号 + 端口号
# socket.SOCK_STREAM: TCP 连接
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)# socket.SOL_SOCKET: 套接字选项
# socket.SO_REUSEADDR: 允许重用本地地址,避免端口被占用
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)server_address = ('localhost', 8000)
server_socket.bind(server_address) # 绑定地址和端口号
# socket.listen(): 监听连接请求
server_socket.listen()connection, client_address = server_socket.accept()
print(f'我获取了一个连接地址: {client_address}')
启动后,尝试连接服务器
# 启动服务器python server.py# 连接服务器
nc localhost 8000# 输出
我获取了一个连接地址: ('127.0.0.1', 60525)
从套接字读取和写入数据
前面的示例只能接收一次连接,而且无法读取和写入数据,让我们修改一下
import socket# socket.AF_INET: IPv4 主机号 + 端口号
# socket.SOCK_STREAM: TCP 连接
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)# socket.SOL_SOCKET: 套接字选项
# socket.SO_REUSEADDR: 允许重用本地地址,避免端口被占用
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)server_address = ('localhost', 8000)
server_socket.bind(server_address) # 绑定地址和端口号
# socket.listen(): 监听连接请求
server_socket.listen()try:connection, client_address = server_socket.accept()print(f'我获取了一个连接地址: {client_address}')buffer = b''while buffer[-5:] != b'quit\n':data = connection.recv(1024)if not data:breakelse:buffer += dataprint(f'接收数据: {data}')print(f'接收到的所有数据: {buffer}')connection.sendall(buffer)finally:server_socket.close()
这里最重要的就是 recv 方法,它可以从套接字中接收数据,并且写入服务端,然后使用 sendall 方法将接收到的数据,最后全部发送给客户端。
允许多个连接的服务器
如果你尝试使用多个客户端连接上面的服务器,你会发现只有第一个会生效,让我们改写一下,让它可以支持连接多个客户端。
import socket# socket.AF_INET: IPv4 主机号 + 端口号
# socket.SOCK_STREAM: TCP 连接
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)# socket.SOL_SOCKET: 套接字选项
# socket.SO_REUSEADDR: 允许重用本地地址,避免端口被占用
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)server_address = ('localhost', 8000)
server_socket.bind(server_address) # 绑定地址和端口号
# socket.listen(): 监听连接请求
server_socket.listen()connections = []try:while True:connection, client_address = server_socket.accept()print(f'我获取了一个连接地址: {client_address}')buffer = b''while buffer[-5:] != b'quit\n':data = connection.recv(1024)if not data:breakelse:buffer += dataprint(f'接收数据: {data}')print(f'接收到的所有数据: {buffer}')connection.sendall(buffer)finally:server_socket.close()
对比一下代码,你会发现,只不过是把每一个连接放入一个列表中,然后循环遍历接收数据,但是这段示例依然不够优秀,当运行时你会发现,每次第二次连接都会阻塞,都会要第一次连接断开后,第二次连接才会生效。
原因是因为我们使用的是阻塞套接字,想要允许多个客户端连接,我们需要使用非阻塞套接字。
使用非阻塞套接字
python 套接字中设置非阻塞套接字比较简单,一行代码就搞定了setblocking(False)
import socket# socket.AF_INET: IPv4 主机号 + 端口号
# socket.SOCK_STREAM: TCP 连接
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)# socket.SOL_SOCKET: 套接字选项
# socket.SO_REUSEADDR: 允许重用本地地址,避免端口被占用
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)server_address = ('localhost', 8000)
server_socket.bind(server_address) # 绑定地址和端口号
# socket.listen(): 监听连接请求
server_socket.listen()
server_socket.setblocking(False) # 设置为非阻塞模式connections = []try:while True:try:connection, client_address = server_socket.accept()connection.setblocking(False)print(f'我获取了一个连接地址: {client_address}')connections.append(connection)print(f'当前连接数: {len(connections)}')except BlockingIOError:passfor connection in connections:try:buffer = b''while buffer[-5:] != b'quit\n':data = connection.recv(1024)if not data:breakelse:buffer += dataprint(f'接收数据: {data}')print(f'接收到的所有数据: {buffer}')connection.send(buffer)except BlockingIOError:passfinally:server_socket.close()
尝试运行,会发现 cpu 飙到 100% ,我们需要更好的方法。
使用 selectors 模块优化
Python 的选择器模块是连接操作系统的低层接口,cpu 消耗很少。
import selectors
import socketselector = selectors.DefaultSelector()server_socket = socket.socket()
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)server_address = ('127.0.0.1', 8000)
server_socket.setblocking(False)
server_socket.bind(server_address)
server_socket.listen()selector.register(server_socket, selectors.EVENT_READ)while True:events = selector.select(timeout=1)if len(events) == 0:print('没有事件,等待一会吧')for event, _ in events:event_socket = event.fileobjif event_socket == server_socket:client_socket, client_address = server_socket.accept()print(f'连接来自 {client_address}')client_socket.setblocking(False)selector.register(client_socket, selectors.EVENT_READ)else:data = event_socket.recv(1024)if data:print(f'我获取了数据 {data}')event_socket.send(b'You Got ' + data)else:print('客户端断开连接')selector.unregister(event_socket)event_socket.close()
代码解释
# 创建选择器(自动选择适合当前操作系统的机制)
selector = selectors.DefaultSelector()# 创建服务器 socket 并设置参数
server_socket = socket.socket()
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)# 注册服务器 socket,监听“可读事件” (EVENT_READ)
# 意思是:当有客户端连接进来,我们就会收到事件通知。
selector.register(server_socket, selectors.EVENT_READ)# 返回所有“发生了事件的 socket 对象”,等待最多 1 秒;
events = selector.select(timeout=1)
因为 selectors 模块是操作系统级别的高效事件通知系统,所以使用这个程序它的 CPU 使用率很低。
Python 自带的 asyncio 默认事件循环基于 selectors 模块
使用 asyncio 构建
回显服务器
import asyncio
import socketasync def echo(connection, loop):while data := await loop.sock_recv(connection, 1024):await loop.sock_sendall(connection, data)async def listen_for_connection(server_socket, loop):while True:connection, address = await loop.sock_accept(server_socket)connection.setblocking(False)print(f'获取到连接: {address}')asyncio.create_task(echo(connection, loop))async def main():server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)server_address = ('localhost', 8000)server_socket.setblocking(False)server_socket.bind(server_address)server_socket.listen()await listen_for_connection(server_socket, asyncio.get_event_loop())asyncio.run(main())
:=是 python3.8 新增的海象运算符
while data := await ...相当于:data = await ...
while data:
解决服务器的错误任务
import asyncio
import socket
import loggingasync def echo(connection, loop):try:while data := await loop.sock_recv(connection, 1024):if data == b'boom\n':raise Exception('Boom!')await loop.sock_sendall(connection, data)except Exception as e:logging.exception(f'服务器报错啦: {e}')finally:connection.close()print('关闭连接')async def listen_for_connection(server_socket, loop):while True:connection, address = await loop.sock_accept(server_socket)connection.setblocking(False)print(f'获取到连接: {address}')asyncio.create_task(echo(connection, loop))async def main():server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)server_address = ('localhost', 8000)server_socket.setblocking(False)server_socket.bind(server_address)server_socket.listen()await listen_for_connection(server_socket, asyncio.get_event_loop())asyncio.run(main())
相关文章:
用 Python 从零构建异步回显服务器
简介 让我们从 0 开始,搭建一个异步服务输出服务器。 套接字 套接字(socket),是不同计算机中实现通信的一种方式,你可以理解成一个接口,它会在客户端和服务端建立连接,一台发送数据ÿ…...
mybatis--多对一处理/一对多处理
多对一处理(association) 多个学生对一个老师 对于学生这边,关联:多个学生,关联一个老师[多对一] 对于老师而言,集合,一个老师有多个学生【一对多】 SQL: 测试环境搭建 1.导入依…...
QT Sqlite数据库-教程002 查询数据-上
【1】DQL语句: DQL语句(数据查询语言),用来查询数据记录。DQL 基本结构由 SELECT FROM、WHERE、JOIN 等子句构成。DQL 语句并不会改变数据库,而是让数据库将查询结果发送结果集给客户端,返回的结果是一张虚…...
Kubernetes Operator 是什么,以及它们的用途
最近一个朋友问我关于EKS的升级问题: 场景: 如果我有 100 个 EKS 集群需要升级,那么所有集群都安装了安全插件。由于我不想在升级后手动在每个EKS中重复安装此插件,如何实现EKS升级后自动安装这个安全插件? 答案有多…...
鸿蒙开发08-泛型类型和函数
泛型类型和函数允许创建的代码在各种类型上运行,而不仅支持单一类型,类型参数是一种特殊的变量,它代表类型而不是值。 泛型函数的基本语法如下: function 函数名<类型参数>(参数: 类型参数): 类型参数 {// 函数体return 参…...
C语言十大经典数学应用
C语言在解决数学问题方面非常有用,因为它提供了丰富的数学函数和运算符。以下是一些经典的C语言数学题,这些题目可以帮助你提高编程和数学能力。 1. 计算圆的面积 给定圆的半径,计算圆的面积。 #include <stdio.h> #include <mat…...
计算机视觉——图像金字塔与目标图像边缘检测原理与实践
一、两个图像块之间的相似性或距离度量 1.1 平方差和(SSD) 平方差和(SSD) 是一种常用的图像相似性度量方法。它通过计算两个图像在每个对应位置的像素值差的平方和来衡量两个图像之间的整体差异。如果两个图像在每个位置的像素值…...
VRoid-Blender-Unity个人工作流笔记
流程 VRoid 选配模型>减面、减材质>导出vrm Blender(先有CATS、vrm插件) 导入vrm>Fix model>修骨骼>导出fbx Unity 找回贴图、改着色器、调着色器参数…… VRoid 减面 以模型不出现明显棱角为准。脸好像减面100也问题不大。 下…...
Domain Adaptation领域自适应
背景与问题定义 传统监督学习假设:训练集与测试集数据分布一致。 Domain Shift:测试数据分布与训练数据不同,模型泛化性能骤降 。 例如在黑白图像上训练数字分类器,测试时用彩色图像,准确率骤降。 Domain Adaptatio…...
从自动测量、8D响应到供应链协同的全链路质量管理数字化方案——全星QMS如何破解汽车行业质量困局
全星QMS如何破解汽车行业质量困局:从自动测量、8D响应到供应链协同的全链路数字化方案 在当今竞争激烈的市场环境中,企业要想脱颖而出,必须确保产品质量的稳定性和可靠性。 全星质量QMS软件系统凭借其强大的功能和灵活的架构,为企…...
UNITY 屏幕UI自适应
1.主要就是根据屏幕的选择根据尺寸 和UI的锚点和中心点来选择,也可以通过代码来动态修改 2.参考视频:Unity UGUI屏幕自适应看这个就够了_哔哩哔哩_bilibili...
【信息系统项目管理师】高分论文:论信息系统项目的范围管理(投资信息化全流程管理项目)
更多内容请见: 备考信息系统项目管理师-专栏介绍和目录 文章目录 1、规划范围管理2、收集需求3、定义范围4、创建wbs5、确认范围6、控制范围2018年2月,我有幸参加了 XX省自贸区财政投资信息化全流程管理项目的假设,作为项目发起单位,省自贸办经过审时度势,及时响应国家自贸…...
联想电脑开机出现Defalut Boot Device Missing or Boot Failed怎么办
目录 一、恢复bios默认设置 二、关机重启 三、“物理”方法 在图书馆敲代码时,去吃了午饭回来发现刚开机就出现了下图的问题(崩溃),想起之前也发生过一次 这样的问题,现在把我用到的方法写在下面,可能对…...
newbee商城购物车模块mapper.xml
1.浏览代码 1)表 自定义 DROP TABLE IF EXISTS tb_newbee_mall_shopping_cart_item; CREATE TABLE tb_newbee_mall_shopping_cart_item (cart_item_id bigint(20) NOT NULL AUTO_INCREMENT COMMENT 购物项主键id,user_id bigint(20) NOT NULL COMMENT 用户主键id…...
SQL学习笔记-聚合查询
非聚合查询和聚合查询的概念及差别 1. 非聚合查询 非聚合查询(Non-Aggregate Query)是指不使用聚合函数的查询。这类查询通常用于从表中检索具体的行和列数据,返回的结果是表中的原始数据。 示例 假设有一个名为 employees 的表ÿ…...
【Vue 3 + Element Plus 实现产品标签的动态添加、删除与回显】
🚀Vue 3 Element Plus 实现产品标签的动态添加、删除与回显 在后台管理系统中,我们经常需要对表单数据进行动态处理,尤其是类似“产品标签”这样的字段,它需要用户能够灵活添加、删除,并在编辑时自动回显。今天我们就…...
【NLP】23.小结:选择60题
Question 1: What does the fixed lookup table in traditional NLP represent? A. A table of one‐hot vectors B. A table of pre‐trained dense word embeddings C. A dictionary of word definitions D. A table of n-gram counts Answer (中文): 答案选 B。传统NLP中“…...
IntelliJ 配置(二)配置相关类库(2)LineMarkerProvider
一、介绍 LineMarkerProvider 是 IntelliJ 平台插件开发中的一个接口,它的作用是在编辑器左侧的“行标记区域”(就是代码行号左边那一栏)添加各种图标、标记或导航链接。比如Java 类中看到的: 小绿色三角形(可以点击运…...
从零开始学java--线性表
数据结构基础 目录 数据结构基础 线性表 顺序表 链表 顺序表和链表的区别: 栈 队列 线性表 线性表(linear list)是n个具有相同特性的数据元素的有限序列。 线性表中的元素个数就是线性表的长度,表的起始位置称为表头&am…...
AD917X系列JESD204B MODE7使用
MODE7特殊在F8,M4使用2个复数通道 CH0_NCO10MHz CH1_NCO30MHZ DP_NCO50MHz DDS1偏移20MHz DDS2偏移40MHz...
Spring Cloud之远程调用OpenFeign最佳实践
目录 OpenFeign最佳实践 问题引入 Feign 继承方式 创建Module 引入依赖 编写接口 打Jar包 服务提供方 服务消费方 启动服务并访问 Feign 抽取方式 创建Module 引入依赖 编写接口 打Jar包 服务消费方 启动服务并访问 服务部署 修改pom.xml文件 观察Nacos控制…...
【Python爬虫】详细入门指南
目录 一、简单介绍 二、详细工作流程以及组成部分 三、 简单案例实现 一、简单介绍 在当今数字化信息飞速发展的时代,数据的获取与分析变得愈发重要,而网络爬虫技术作为一种能够从互联网海量信息中自动抓取所需数据的有效手段,正逐渐走入…...
Win11系统 VMware虚拟机 安装教程
Win11系统 VMware虚拟机 安装教程 一、介绍 Windows 11是由微软公司(Microsoft)开发的操作系统,应用于计算机和平板电脑等设备 。于2021年6月24日发布 ,2021年10月5日发行 。 Windows 11提供了许多创新功能,增加了新…...
打造AI应用基础设施:Milvus向量数据库部署与运维
目录 打造AI应用基础设施:Milvus向量数据库部署与运维1. Milvus介绍1.1 什么是向量数据库?1.2 Milvus主要特点 2. Milvus部署方案对比2.1 Milvus Lite2.2 Milvus Standalone2.3 Milvus Distributed2.4 部署方案对比表 3. Milvus部署操作命令实战3.1 Milv…...
对于客户端数据存储方案——SQLite的思考
SQLite 比较适合进行本地小型数据的存储,在功能丰富性和并发能力上不如 MySQL。 数据类型差异 SQLite 使用动态类型系统:只有 5 种基本存储类 (NULL, INTEGER, REAL, TEXT, BLOB) 类型亲和性:SQLite 会将声明的列类型映射到最接近的存储类 …...
【深度学习与大模型基础】第11章-Bernoulli分布,Multinoulli分布
一、Bernoulli分布 1. 基本概念 想象你抛一枚硬币: 正面朝上(记为 1)概率是 p(比如 0.6)。 反面朝上(记为 0)概率是 1-p(比如 0.4)。 这就是一个Bernoulli分布&…...
基于Windows通过nginx代理访问Oracle数据库
基于Windows通过nginx代理访问Oracle数据库 环境说明: 生产环境是一套一主一备的ADG架构服务器,用户需要访问生产数据,基于安全考虑,生产IP地址不能直接对外服务,所以需要在DMZ部署一个前置机,并在该前置机…...
北斗和GPS信号频率重叠-兼容与互操作
越来越多的同学们发现北斗三代信号的B1C,B2a信号居然和美国GPS L1,L5处在同样频率上? 为什么美国会允许这样的事情发生?同频率难道不干扰彼此的信号吗? 思博伦卫星导航技术支持文章TED 这事得从2006年联合国成立全球卫星导航系统…...
python爬虫:喜马拉雅案例(破解sign值)
声明: 本文章中所有内容仅供学习交流使用,不用于其他任何目的,严禁用于商业用途和非法用途,否则由此产生的一切后果均与作者无关! 根据上一篇文章,我们破解了本网站的,手机号和密码验证&#x…...
如何高效查询订单销售情况与售罄率:从SQL到架构优化的全流程设计
在电商平台、SaaS多租户系统中,订单数据作为核心数据之一,承载了关键的运营指标,如销售额、商品售罄率、订单转化等。随着数据量的持续增长,如何在大数据量条件下快速、稳定地获取统计信息,成为系统设计的重点之一。 本文将从查询目标分析入手,结合数据库设计优化与典型…...
