当前位置: 首页 > news >正文

python的websocket方法教程

WebSocket是一种网络通信协议,它在单个TCP连接上提供全双工的通信信道。在本篇文章中,我们将探讨如何在Python中使用WebSocket实现实时通信。

websockets是Python中最常用的网络库之一,也是websocket协议的Python实现。它不仅作为基础组件在众多项目中发挥着重要作用,其源码也值得广大“Python玩家”研究。
官网:https://github.com/python-websockets/websockets

1. 什么是WebSocket?

WebSocket协议是在2008年由Web应用程序设计师和开发人员创建的,目的是为了在Web浏览器和服务器之间提供更高效、更低延迟的双向通信。它允许客户端和服务器在任何时候发送消息,无需重新建立TCP连接。WebSocket可以在Web浏览器和服务器之间传输文本和二进制数据,使得构建实时Web应用程序变得更加简单。

2. 在Python中使用WebSocket

Python中有多个库可以帮助我们使用WebSocket,如:websockets、aiohttp等。在本文中,我们将使用websockets库来演示WebSocket编程。

要安装websockets库,你可以使用pip:

pip install websockets

3. 创建WebSocket服务器

使用websockets库,我们可以轻松地创建一个WebSocket服务器。以下是一个简单的示例:

import asyncio
import websocketsasync def echo(websocket, path):async for message in websocket:print(f"Received message: {message}")await websocket.send(f"Echo: {message}")start_server = websockets.serve(echo, "localhost", 8765)asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

在这个示例中,我们定义了一个名为echo的协程函数,它接收两个参数:websocket和path。该函数使用async for循环读取客户端发送的消息,并将消息发送回客户端。

然后,我们使用websockets.serve()函数创建一个WebSocket服务器,监听本地主机的8765端口。最后,我们使用asyncio的事件循环启动服务器。

4. 创建WebSocket客户端

要创建一个WebSocket客户端,我们同样可以使用websockets库。以下是一个简单的客户端示例:

import asyncio
import websocketsasync def main():async with websockets.connect("ws://localhost:8765") as websocket:message = "Hello, server!"await websocket.send(message)print(f"Sent: {message}")response = await websocket.recv()print(f"Received: {response}")asyncio.run(main())

在这个示例中,我们使用websockets.connect()函数建立与WebSocket服务器的连接。然后,我们使用send()方法向服务器发送消息,并使用recv()方法接收服务器的响应。

5. 总结

WebSocket协议为Web浏览器和服务器之间提供了实时双向通信的能力,使得构建实时Web应用程序变得更加容易。在Python中,我们可以使用websockets库轻松地实现WebSocket编程。

6. 通过websockets这个项目,从大型开源项目中学习asyncio库。

一、asyncio.Transport
在官方文档中,Transport被描述成对socket的抽象,它控制着如何传输数据。除了websockets,uvicorn、daphne等ASGI实现都会用到Transport。

Transport继承于ReadTransport和WriteTransport,两者都继承于BaseTransport。顾名思义,Transport兼备读和写的功能,可以类比为读写socket对象。
在这里插入图片描述

Transport对象提供以下常用函数——

is_reading:判断该Transport是否在读。

set_write_buffer_limits:设置写入Transport的高和低水位。考虑到网络状况,有时不希望写入过多的数据。

write、write_eof、write_line:为当前Transport写入数据,分别表示写入二进制数据、eof和二进制行数据。其中eof写入后不会关闭Transport,但会flush数据。

abort:立刻关闭Transport,不接受新的数据。留在缓冲的数据也会丢失,后续调用Protocol的connection_lost函数。

在websockets中,Transport使用场景不多,一般都是通过Protocol对象的回调参数使用的。在websocket的初始化过程中,会设置Transport的最高水位。同样,在这种场景下,该对象也是作为回调参数使用的。
在这里插入图片描述

二、asyncio.Protocol
如果Transport是对socket的抽象,那么Protocol就是对协议的抽象。它提供了如何使用Transport的方式。
在这里插入图片描述

用户使用的Protocol直接继承自BaseProtocol,并提供了六个Unimplemented函数需要用户去实现——

connection_made:当连接建立时会执行该函数,该函数包含一个Transport类型的参数。

connection_lost:当连接丢失或者关闭时会执行该函数,该函数包含一个Exception类型的参数。

pause_writing:当Transport对象写入的数据高于之前设置的高水位时被调用,一般会暂停数据的写入。

resume_writing:当Transport对象写入的数据低于之前设置的低水位时被调用,一般用于恢复数据写入。

data_received:当有数据被接受时回调,该函数包含一个二进制对象data,用来表示接受的数据。

eof_received:当被Transport对象被调用write_eof时被调用。

在websockets中,server端的connection_made实现截图如图所示。在该函数中,websockets将用户实现的handler封装成task对象,并和websocket的server绑定。
在这里插入图片描述

而在client端中实现如第一节截图所示,只是在reader中注册该Transport对象。

websockets的connection_lost函数实现方式如下。主要操作即更新状态、关闭pings、更新对应的waiter状态,以及维护reader对象。
在这里插入图片描述

在其他函数的实现中,websockets也主要用到了reader对象完成数据流的暂停和恢复,以及数据的写入。

从上面代码实现可以看出,websockets通过reader代理完成数据流的操作。这个reader是一个asyncio.StreamReader对象。这个对象具体如何使用将在下一篇介绍。

附录:进阶版本:

python使用websockets库
serve:在server端使用,等待客户端的连接。如果连接成功,返回一个websocket。

connect: 在client端使用,用于建立连接。

send:发送数据

recv:接收数据

close:关闭连接

服务端

#!/usr/bin/python3
# 主要功能:创建1个基本的websocket server, 符合asyncio 开发要求
import asyncio
import websockets
from datetime import datetimeasync def handler(websocket):data = await websocket.recv()reply = f"Data received as \"{data}\".  time: {datetime.now()}"print(reply)await websocket.send(reply)print("Send reply")async def main():async with websockets.serve(handler, "localhost", 9999):await asyncio.Future()  # run foreverif __name__ == "__main__":asyncio.run(main())

客户端

import asyncio
import websockets
import timeasync def ws_client(url):for i in range(1, 40):async with websockets.connect(url) as websocket:await websocket.send("Hello, I am PyPy.")response = await websocket.recv()print(response)time.sleep(1)asyncio.run(ws_client('ws://localhost:9999'))

服务端

import asyncio
import websocketsIP_ADDR = "127.0.0.1"
IP_PORT = "9090"# 握手,通过接收Hi,发送"success"来进行双方的握手。
async def serverHands(websocket):while True:recv_text = await websocket.recv()print("recv_text=" + recv_text)if recv_text == "Hi":print("connected success")await websocket.send("success")return Trueelse:await websocket.send("connected fail")# 接收从客户端发来的消息并处理,再返给客户端success
async def serverRecv(websocket):while True:recv_text = await websocket.recv()print("recv:", recv_text)await websocket.send("success,get mess:"+ recv_text)# 握手并且接收数据
async def serverRun(websocket, path):print(path)await serverHands(websocket)await serverRecv(websocket)# main function
if __name__ == '__main__':print("======server======")server = websockets.serve(serverRun, IP_ADDR, IP_PORT)asyncio.get_event_loop().run_until_complete(server)asyncio.get_event_loop().run_forever()

客户端

import asyncio
import websocketsIP_ADDR = "127.0.0.1"
IP_PORT = "9090"async def clientHands(websocket):while True:# 通过发送hello握手await websocket.send("Hi")response_str = await websocket.recv()# 接收"success"来进行双方的握手if "success" in response_str:print("握手成功")return True# 向服务器端发送消息
async def clientSend(websocket):while True:input_text = input("input text: ")if input_text == "exit":print(f'"exit", bye!')await websocket.close(reason="exit")return Falseawait websocket.send(input_text)recv_text = await websocket.recv()print(f"{recv_text}")# 进行websocket连接
async def clientRun():ipaddress = IP_ADDR + ":" + IP_PORTasync with websockets.connect("ws://" + ipaddress) as websocket:await clientHands(websocket)await clientSend(websocket)# main function
if __name__ == '__main__':print("======client======")asyncio.get_event_loop().run_until_complete(clientRun())

服务端

# -*- coding:utf8 -*-import json
import socket
import asyncio
import logging
import websockets
import multiprocessingIP = '127.0.0.1'
PORT_CHAT = 9090USERS ={}#提供聊天的后台
async def ServerWs(websocket,path):logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',filename="chat.log",level=logging.INFO)# 握手await websocket.send(json.dumps({"type": "handshake"}))async for message in websocket:data = json.loads(message)message = ''# 用户发信息if data["type"] == 'send':name = '404'for k, v in USERS.items():if v == websocket:name = kdata["from"] = nameif len(USERS) != 0:  # asyncio.wait doesn't accept an empty listmessage = json.dumps({"type": "user", "content": data["content"], "from": name})# 用户注册elif data["type"] == 'register':try:USERS[data["uuid"]] = websocketif len(USERS) != 0:  # asyncio.wait doesn't accept an empty listmessage = json.dumps({"type": "login", "content": data["content"], "user_list": list(USERS.keys())})except Exception as exp:print(exp)# 用户注销elif data["type"] == 'unregister':del USERS[data["uuid"]]if len(USERS) != 0:  # asyncio.wait doesn't accept an empty listmessage = json.dumps({"type": "logout", "content": data["content"], "user_list": list(USERS.keys())})#打印日志logging.info(data)# 群发await asyncio.wait([user.send(message) for user in USERS.values()])def server_run():print("server")start_server = websockets.serve(ServerWs, '0.0.0.0', PORT_CHAT)asyncio.get_event_loop().run_until_complete(start_server)asyncio.get_event_loop().run_forever()if __name__ == "__main__":from multiprocessing import Processmultiprocessing.freeze_support()server = Process(target=server_run, daemon=False)server.start()

服务端

import asyncio
import websockets
import time
import json
import threading
# 功能模块
class OutputHandler():async def run(self,message,send_ms,websocket):# 用户发信息await send_ms(message, websocket)# 单发消息# await send_ms(message, websocket)# 群发消息#await s('hi起来')# 存储所有的客户端
Clients = {}# 服务端
class WS_Server():def __init__(self):self.ip = "127.0.0.1"self.port = 9090# 回调函数(发消息给客户端)async def callback_send(self, msg, websocket=None):await self.sendMsg(msg, websocket)# 发送消息async def sendMsg(self, msg, websocket):print('sendMsg:', msg)# websocket不为空,单发,为空,群发消息if websocket != None:await websocket.send(msg)else:# 群发消息await self.broadcastMsg(msg)# 避免被卡线程await asyncio.sleep(0.2)# 群发消息async def broadcastMsg(self, msg):for user in Clients:await user.send(msg)# 针对不同的信息进行请求,可以考虑json文本async def runCaseX(self,jsonMsg,websocket):print('runCase')op = OutputHandler()# 参数:消息、方法、socketawait op.run(jsonMsg,self.callback_send,websocket)# 连接一个客户端,起一个循环监听async def echo(self,websocket, path):# 添加到客户端列表# Clients.append(websocket)# 握手await websocket.send(json.dumps({"type": "handshake"}))# 循环监听while True:# 接受信息try:# 接受文本recv_text = await websocket.recv()message = "Get message: {}".format(recv_text)# 返回客户端信息await websocket.send(message)# 转jsondata = json.loads(recv_text)# 用户发信息if data["type"] == 'send':name = '404'for k, v in Clients.items():if v == websocket:name = kdata["from"] = nameif len(Clients) != 0:  # asyncio.wait doesn't accept an empty listmessage = json.dumps({"type": "send", "content": data["content"], "from": name})await self.runCaseX(jsonMsg=message, websocket=websocket)# 用户注册elif data["type"] == 'register':try:Clients[data["uuid"]] = websocketif len(Clients) != 0:  # asyncio.wait doesn't accept an empty listmessage = json.dumps({"type": "register", "content": data["content"], "user_list": list(Clients.keys())})await self.runCaseX(jsonMsg=message, websocket=websocket)except Exception as exp:print(exp)# 用户注销elif data["type"] == 'unregister':del Clients[data["uuid"]]# 对message进行解析,跳进不同功能区# await self.runCaseX(jsonMsg=data,websocket=websocket)# 链接断开except websockets.ConnectionClosed:print("ConnectionClosed...", path)# del Clientsbreak# 无效状态except websockets.InvalidState:print("InvalidState...")# del Clientsbreak# 报错except Exception as e:print("ws连接报错",e)# del Clientsbreak# 启动服务器async def runServer(self):async with websockets.serve(self.echo, self.ip, self.port):await asyncio.Future()  # run forever# 多协程模式,防止阻塞主线程无法做其他事情def WebSocketServer(self):asyncio.run(self.runServer())# 多线程启动def startServer(self):# 多线程启动,否则会堵塞thread = threading.Thread(target=self.WebSocketServer)thread.start()# thread.join()if __name__=='__main__':print("server")s = WS_Server()s.startServer()

相关文章:

python的websocket方法教程

WebSocket是一种网络通信协议,它在单个TCP连接上提供全双工的通信信道。在本篇文章中,我们将探讨如何在Python中使用WebSocket实现实时通信。 websockets是Python中最常用的网络库之一,也是websocket协议的Python实现。它不仅作为基础组件在…...

Qt处理焦点事件(获得焦点,失去焦点)

背景: 我只是想处理焦点动作,由于懒,上网一搜,排名靠前的一位朋友,使用重写部件的方式实现。还是因为懒,所以感觉复杂了。于是又花了一分钟解决了一下。 所以记录下来,以免以后忘了。 思路&a…...

SiteGround如何设置WordPress网站自动更新

SiteGround Autoupdate功能会自动帮我们更新在他们这里托管的所有WordPress网站,这样做是为了保证网站安全,并且让它们一直保持最新状态。他们会根据我们选择的设置自动更新不同版本的WordPress,包括主要版本和次要版本。在每次自动更新之前&…...

http代理和SOCK5代理谁更安全?

在这个网络化的时代,我们常常听到HTTP代理和SOCKS5代理这两个名词,不过很多人并不了解是什么意思。今天,我们将揭开这两种代理的神秘面纱,看看到底HTTP代理和SOCKS5代理哪个更安全? HTTP代理:高效通信的“枢…...

Kotlin关键字二——constructor和init

在关键字一——var和val中最后提到了构造函数,这里就学习下构造函数相关的关键字: constructor和init。 主要构造(primary constructor) kotlin和java一样,在定义类时就自动生成了无参构造 // 会生成默认的无参构造函数 class Person{ }与java不同的是…...

java的long类型超过9位报错:the literal 987654321000 of type int is out of range

java的long类型超过9位报错 1、报错提示2、报错截图3、解决办法4、参考文章 1、报错提示 the literal 987654321000 of type int is out of range 2、报错截图 3、解决办法 long类型是一种用于表示较大整数的数据类型,范围比int类型更广泛。然而,即使…...

【Java期末复习资料】(4)模拟卷

有不会的题可以后台问我的哦,看见了就会回。 本文章主要是模拟卷,知识点例题简答题已经发过了,可以在主页专栏Java中找一下 一、单项选择题 1. 编译 Java Application 源程序文件将产生相应的字节码文件,这些字节码文件的扩展名为…...

【计算机网络】UDP报文详解

目录 一. UDP协议概述 二. UDP报文格式 首部 三. UDP的缓冲区 一. UDP协议概述 UDP——用户数据报协议,是传输层的一个重要协议 基于UDP的应用层协议有:DNS,TFTP,SNMP,NTP 协议全称默认端口号DNSDomain Name Se…...

排序算法——归并排序

归并排序(Merge Sort)是计算机科学中非常重要的排序算法之一。它不仅高效、稳定,而且是许多高级排序技术和算法思想的基础。在本文中,我们将深入探讨归并排序的原理、实现方法,以及它的优缺点。 1. 归并排序的原理 归…...

2023 年安徽省职业院校技能大赛高职组“软件测试”赛项样题

2023 年安徽省职业院校技能大赛 高职组“软件测试”赛项样题 目录 任务一:功能测试(45 分) 1、测试计划(5 分) 2、测试用例(15 分) 3、Bug 清单(20 分) 4、测试报告&…...

Mysql8和Oracle实际项目中递归查询树形结构

背景: 项目升级,引入MySQL数据库,之前一直用的是Oracle数据,在做用户登录单位维护的时候,需要返回该用户所属单位下的所有子单位。下边是模拟项目数据实践的过程。 数据准备: 准备一张单位表&#xff0c…...

docker mysql8 设置不区分大小写

docker安装Mysql8.0的坑之lower_case_table_names_docker mysql lower_case_table_names-CSDN博客https://blog.csdn.net/p793049488/article/details/108365929 docker run ‐di ‐‐nametensquare_mysql ‐p 33306:3306 ‐e MYSQL_ROOT_PASSWORD123456 mysql...

Audio Siganl (MATLAB) 代码学习—常见问题3

问题描述 生成信号y1: 8000个样本,1000个周期,幅度为0.85的余弦信号。若信号的持续时间为1s,则采样频率和信号频率为多少。生成信号y2: 持续时间为1s,幅度为0.7,频率为500Hz,相位为 π / 4 \pi/4 π/4生成信号y:y_1+y_2绘制前200ms的y信号示意图计算y的DFT绘制频域示意图…...

【PTA题目】7-8 矩阵运算 分数 10

7-8 矩阵运算 分数 10 全屏浏览题目 切换布局 作者 C课程组 单位 浙江大学 给定一个nn的方阵,本题要求计算该矩阵除副对角线、最后一列和最后一行以外的所有元素之和。副对角线为从矩阵的右上角至左下角的连线。 输入格式: 输入第一行给出正整数n(…...

Ubuntu20.04创建并挂在zfs池

Ubuntu 下使用 ZFS [适用于中高级用户] 主磁盘上清洁安装带有ZFS的Ubuntu后,可以开始体验其特性。 所有ZFS配置过程都需要命令行。 我不知道有GUI工具。 创建一个 ZFS 池 本节仅适用于具有多个磁盘的系统。 如果只有一个磁盘,Ubuntu会在安装时自动创建…...

x的平方根算法(leetcode第69题)

题目描述: 给你一个非负整数 x ,计算并返回 x 的 算术平方根 。由于返回类型是整数,结果只保留 整数部分 ,小数部分将被 舍去 。注意:不允许使用任何内置指数函数和算符,例如 pow(x, 0.5) 或者 x ** 0.5 。…...

打破空间限制,畅享真实生活

直播已经成为了当今社会中非常流行的一种娱乐方式,也是人们获取信息和互动的重要渠道之一。而无绿幕直播,则是近年来兴起的一种特殊形式,它打破了以往直播的空间限制,让观众们能够更贴近主播,更真实地感受到直播背后的…...

Python基础期末复习 新手 2

虽然age 10在__init__方法中定义了一个局部变量age,但这个局部变量并不会影响类属性age的值。类属性是在类级别上定义的,不属于任何一个实例。因此,在创建实例s1和s2时,它们的age属性值都为类属性的初始值0。 尽管对类的属性值进…...

Java接入ChatGPT接口简单示例

我们定义了一个名为ChartGPTConfig的类,它有两个私有成员变量apiKey和apiUrl,分别表示ChartGPT的API密钥和API URL。 public class ChartGPTConfig {private final String apiKey;private final String apiUrl;public ChartGPTConfig(String apiKey, St…...

解决夜神模拟器与Android studio自动断开的问题

原因:夜神模拟器的adb版本和Android sdk的adb版本不一致 解决办法: 1.找到android的sdk (1)File--->Project Structure (2)SDK Location:记下sdk的位置 2.找到sdk中的adb文件 SDK-->platform-tools-->adb.exe 3.复制…...

深度解析 Claude Code v2.1.88 源码:技术栈与底层实现全揭秘(基于流出架构资料)

深度解析 Claude Code v2.1.88 源码:技术栈与底层实现全揭秘(基于流出架构资料) 摘要:2026年3月31日,Claude Code v2.1.88 相关技术资料(含TypeScript工程架构、核心模块实现逻辑,合计51.2万行代码量级)公开流出,包含其核心架构、工具系统、安全机制等全部实现细节。…...

光伏储能并网仿真实战手记:PQ控制与扰动观察法的那些事儿

光伏储能三相PQ恒功率并网控制仿真(附参考文献及文档)①网侧 光伏储能三相PQ恒功率并网控制仿真(附参考文献及文档)①网侧:采用PQ恒功率控制,参考文献《微电网及其逆变器控制技术的研究》②储能控制:直流母线电压外环,电池电流内环双闭环控制策略直流母线…...

javaweb驾校考试车辆预约系统

目录同行可拿货,招校园代理 ,本人源头供货商功能模块划分预约功能设计考试管理模块系统辅助功能技术实现参考项目技术支持源码获取详细视频演示 :文章底部获取博主联系方式!同行可合作同行可拿货,招校园代理 ,本人源头供货商 功能模块划分 用户管理模块…...

OpenClaw技能市场:Qwen3.5-9B增强的自动化模块扩展

OpenClaw技能市场:Qwen3.5-9B增强的自动化模块扩展 1. 为什么需要技能市场? 去年我接手了一个内容运营项目,每天要处理大量重复性工作:从多个渠道收集资料、整理成Markdown格式、发布到不同平台。手动操作不仅耗时,还…...

3大核心突破:解密m4s-converter如何实现B站缓存视频的智能重生

3大核心突破:解密m4s-converter如何实现B站缓存视频的智能重生 【免费下载链接】m4s-converter 一个跨平台小工具,将bilibili缓存的m4s格式音视频文件合并成mp4 项目地址: https://gitcode.com/gh_mirrors/m4/m4s-converter 你是否曾面对B站缓存目…...

MediaCrawler:社交媒体数据采集的全方位解决方案

MediaCrawler:社交媒体数据采集的全方位解决方案 【免费下载链接】MediaCrawler-new 项目地址: https://gitcode.com/GitHub_Trending/me/MediaCrawler-new 在信息爆炸的数字时代,社交媒体平台成为数据的富矿。无论是市场分析、学术研究还是内容…...

Kubernetes与网络管理最佳实践

Kubernetes与网络管理最佳实践 1. Kubernetes网络模型 Kubernetes网络模型定义了集群中Pod、Service和外部网络之间的通信规则,是集群网络管理的基础。 1.1 网络模型核心原则 Pod间通信:所有Pod可以直接通信,无需NATPod与Service通信&#xf…...

模型蒸馏与量化:为什么大厂急需能把大模型跑在边缘端的SDE?

在2026年的北美科技求职市场中,人工智能的下半场战役已经悄然转移了阵地。当行业内绝大多数求职者还在简历上堆砌“熟练调用大语言模型API”或“基于LangChain构建应用”时,北美头部科技公司(如Apple、Google、Meta)的招聘重心已经…...

深度学习框架YOLOV8模型如何训练水下生物检测数据集 构建基于YOLOv8➕pyqt5的水下生物检测系统 海胆‘, ‘海参‘, ‘扇贝‘, ‘海星‘, ‘水草

享基于YOLOv8➕pyqt5的水下生物检测系统内含7600张水下生物数据集 包括[‘海胆’, ‘海参’, ‘扇贝’, ‘海星’, ‘水草’],5类也可自行替换模型,使用该界面做其他检测 这是一个非常经典的计算机视觉应用项目,结合了深度学习的目标检测&…...

树莓派与STM32串口通信实战:从配置到调试全流程解析

1. 硬件准备与环境搭建 第一次尝试用树莓派和STM32做串口通信时,我对着桌上堆满的零件发愁:到底哪些线该接哪里?后来发现其实核心部件就三样:树莓派(推荐4B型号)、STM32开发板(我用的是F103C8T6…...