当前位置: 首页 > news >正文

python的websocket方法教程

WebSocket是一种网络通信协议,它在单个TCP连接上提供全双工的通信信道。在本篇文章中,我们将探讨如何在Python中使用WebSocket实现实时通信。

websockets是Python中最常用的网络库之一,也是websocket协议的Python实现。它不仅作为基础组件在众多项目中发挥着重要作用,其源码也值得广大“Python玩家”研究。
官网:https://github.com/python-websockets/websockets

1. 什么是WebSocket?

WebSocket协议是在2008年由Web应用程序设计师和开发人员创建的,目的是为了在Web浏览器和服务器之间提供更高效、更低延迟的双向通信。它允许客户端和服务器在任何时候发送消息,无需重新建立TCP连接。WebSocket可以在Web浏览器和服务器之间传输文本和二进制数据,使得构建实时Web应用程序变得更加简单。

2. 在Python中使用WebSocket

Python中有多个库可以帮助我们使用WebSocket,如:websockets、aiohttp等。在本文中,我们将使用websockets库来演示WebSocket编程。

要安装websockets库,你可以使用pip:

pip install websockets

3. 创建WebSocket服务器

使用websockets库,我们可以轻松地创建一个WebSocket服务器。以下是一个简单的示例:

import asyncio
import websocketsasync def echo(websocket, path):async for message in websocket:print(f"Received message: {message}")await websocket.send(f"Echo: {message}")start_server = websockets.serve(echo, "localhost", 8765)asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

在这个示例中,我们定义了一个名为echo的协程函数,它接收两个参数:websocket和path。该函数使用async for循环读取客户端发送的消息,并将消息发送回客户端。

然后,我们使用websockets.serve()函数创建一个WebSocket服务器,监听本地主机的8765端口。最后,我们使用asyncio的事件循环启动服务器。

4. 创建WebSocket客户端

要创建一个WebSocket客户端,我们同样可以使用websockets库。以下是一个简单的客户端示例:

import asyncio
import websocketsasync def main():async with websockets.connect("ws://localhost:8765") as websocket:message = "Hello, server!"await websocket.send(message)print(f"Sent: {message}")response = await websocket.recv()print(f"Received: {response}")asyncio.run(main())

在这个示例中,我们使用websockets.connect()函数建立与WebSocket服务器的连接。然后,我们使用send()方法向服务器发送消息,并使用recv()方法接收服务器的响应。

5. 总结

WebSocket协议为Web浏览器和服务器之间提供了实时双向通信的能力,使得构建实时Web应用程序变得更加容易。在Python中,我们可以使用websockets库轻松地实现WebSocket编程。

6. 通过websockets这个项目,从大型开源项目中学习asyncio库。

一、asyncio.Transport
在官方文档中,Transport被描述成对socket的抽象,它控制着如何传输数据。除了websockets,uvicorn、daphne等ASGI实现都会用到Transport。

Transport继承于ReadTransport和WriteTransport,两者都继承于BaseTransport。顾名思义,Transport兼备读和写的功能,可以类比为读写socket对象。
在这里插入图片描述

Transport对象提供以下常用函数——

is_reading:判断该Transport是否在读。

set_write_buffer_limits:设置写入Transport的高和低水位。考虑到网络状况,有时不希望写入过多的数据。

write、write_eof、write_line:为当前Transport写入数据,分别表示写入二进制数据、eof和二进制行数据。其中eof写入后不会关闭Transport,但会flush数据。

abort:立刻关闭Transport,不接受新的数据。留在缓冲的数据也会丢失,后续调用Protocol的connection_lost函数。

在websockets中,Transport使用场景不多,一般都是通过Protocol对象的回调参数使用的。在websocket的初始化过程中,会设置Transport的最高水位。同样,在这种场景下,该对象也是作为回调参数使用的。
在这里插入图片描述

二、asyncio.Protocol
如果Transport是对socket的抽象,那么Protocol就是对协议的抽象。它提供了如何使用Transport的方式。
在这里插入图片描述

用户使用的Protocol直接继承自BaseProtocol,并提供了六个Unimplemented函数需要用户去实现——

connection_made:当连接建立时会执行该函数,该函数包含一个Transport类型的参数。

connection_lost:当连接丢失或者关闭时会执行该函数,该函数包含一个Exception类型的参数。

pause_writing:当Transport对象写入的数据高于之前设置的高水位时被调用,一般会暂停数据的写入。

resume_writing:当Transport对象写入的数据低于之前设置的低水位时被调用,一般用于恢复数据写入。

data_received:当有数据被接受时回调,该函数包含一个二进制对象data,用来表示接受的数据。

eof_received:当被Transport对象被调用write_eof时被调用。

在websockets中,server端的connection_made实现截图如图所示。在该函数中,websockets将用户实现的handler封装成task对象,并和websocket的server绑定。
在这里插入图片描述

而在client端中实现如第一节截图所示,只是在reader中注册该Transport对象。

websockets的connection_lost函数实现方式如下。主要操作即更新状态、关闭pings、更新对应的waiter状态,以及维护reader对象。
在这里插入图片描述

在其他函数的实现中,websockets也主要用到了reader对象完成数据流的暂停和恢复,以及数据的写入。

从上面代码实现可以看出,websockets通过reader代理完成数据流的操作。这个reader是一个asyncio.StreamReader对象。这个对象具体如何使用将在下一篇介绍。

附录:进阶版本:

python使用websockets库
serve:在server端使用,等待客户端的连接。如果连接成功,返回一个websocket。

connect: 在client端使用,用于建立连接。

send:发送数据

recv:接收数据

close:关闭连接

服务端

#!/usr/bin/python3
# 主要功能:创建1个基本的websocket server, 符合asyncio 开发要求
import asyncio
import websockets
from datetime import datetimeasync def handler(websocket):data = await websocket.recv()reply = f"Data received as \"{data}\".  time: {datetime.now()}"print(reply)await websocket.send(reply)print("Send reply")async def main():async with websockets.serve(handler, "localhost", 9999):await asyncio.Future()  # run foreverif __name__ == "__main__":asyncio.run(main())

客户端

import asyncio
import websockets
import timeasync def ws_client(url):for i in range(1, 40):async with websockets.connect(url) as websocket:await websocket.send("Hello, I am PyPy.")response = await websocket.recv()print(response)time.sleep(1)asyncio.run(ws_client('ws://localhost:9999'))

服务端

import asyncio
import websocketsIP_ADDR = "127.0.0.1"
IP_PORT = "9090"# 握手,通过接收Hi,发送"success"来进行双方的握手。
async def serverHands(websocket):while True:recv_text = await websocket.recv()print("recv_text=" + recv_text)if recv_text == "Hi":print("connected success")await websocket.send("success")return Trueelse:await websocket.send("connected fail")# 接收从客户端发来的消息并处理,再返给客户端success
async def serverRecv(websocket):while True:recv_text = await websocket.recv()print("recv:", recv_text)await websocket.send("success,get mess:"+ recv_text)# 握手并且接收数据
async def serverRun(websocket, path):print(path)await serverHands(websocket)await serverRecv(websocket)# main function
if __name__ == '__main__':print("======server======")server = websockets.serve(serverRun, IP_ADDR, IP_PORT)asyncio.get_event_loop().run_until_complete(server)asyncio.get_event_loop().run_forever()

客户端

import asyncio
import websocketsIP_ADDR = "127.0.0.1"
IP_PORT = "9090"async def clientHands(websocket):while True:# 通过发送hello握手await websocket.send("Hi")response_str = await websocket.recv()# 接收"success"来进行双方的握手if "success" in response_str:print("握手成功")return True# 向服务器端发送消息
async def clientSend(websocket):while True:input_text = input("input text: ")if input_text == "exit":print(f'"exit", bye!')await websocket.close(reason="exit")return Falseawait websocket.send(input_text)recv_text = await websocket.recv()print(f"{recv_text}")# 进行websocket连接
async def clientRun():ipaddress = IP_ADDR + ":" + IP_PORTasync with websockets.connect("ws://" + ipaddress) as websocket:await clientHands(websocket)await clientSend(websocket)# main function
if __name__ == '__main__':print("======client======")asyncio.get_event_loop().run_until_complete(clientRun())

服务端

# -*- coding:utf8 -*-import json
import socket
import asyncio
import logging
import websockets
import multiprocessingIP = '127.0.0.1'
PORT_CHAT = 9090USERS ={}#提供聊天的后台
async def ServerWs(websocket,path):logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',filename="chat.log",level=logging.INFO)# 握手await websocket.send(json.dumps({"type": "handshake"}))async for message in websocket:data = json.loads(message)message = ''# 用户发信息if data["type"] == 'send':name = '404'for k, v in USERS.items():if v == websocket:name = kdata["from"] = nameif len(USERS) != 0:  # asyncio.wait doesn't accept an empty listmessage = json.dumps({"type": "user", "content": data["content"], "from": name})# 用户注册elif data["type"] == 'register':try:USERS[data["uuid"]] = websocketif len(USERS) != 0:  # asyncio.wait doesn't accept an empty listmessage = json.dumps({"type": "login", "content": data["content"], "user_list": list(USERS.keys())})except Exception as exp:print(exp)# 用户注销elif data["type"] == 'unregister':del USERS[data["uuid"]]if len(USERS) != 0:  # asyncio.wait doesn't accept an empty listmessage = json.dumps({"type": "logout", "content": data["content"], "user_list": list(USERS.keys())})#打印日志logging.info(data)# 群发await asyncio.wait([user.send(message) for user in USERS.values()])def server_run():print("server")start_server = websockets.serve(ServerWs, '0.0.0.0', PORT_CHAT)asyncio.get_event_loop().run_until_complete(start_server)asyncio.get_event_loop().run_forever()if __name__ == "__main__":from multiprocessing import Processmultiprocessing.freeze_support()server = Process(target=server_run, daemon=False)server.start()

服务端

import asyncio
import websockets
import time
import json
import threading
# 功能模块
class OutputHandler():async def run(self,message,send_ms,websocket):# 用户发信息await send_ms(message, websocket)# 单发消息# await send_ms(message, websocket)# 群发消息#await s('hi起来')# 存储所有的客户端
Clients = {}# 服务端
class WS_Server():def __init__(self):self.ip = "127.0.0.1"self.port = 9090# 回调函数(发消息给客户端)async def callback_send(self, msg, websocket=None):await self.sendMsg(msg, websocket)# 发送消息async def sendMsg(self, msg, websocket):print('sendMsg:', msg)# websocket不为空,单发,为空,群发消息if websocket != None:await websocket.send(msg)else:# 群发消息await self.broadcastMsg(msg)# 避免被卡线程await asyncio.sleep(0.2)# 群发消息async def broadcastMsg(self, msg):for user in Clients:await user.send(msg)# 针对不同的信息进行请求,可以考虑json文本async def runCaseX(self,jsonMsg,websocket):print('runCase')op = OutputHandler()# 参数:消息、方法、socketawait op.run(jsonMsg,self.callback_send,websocket)# 连接一个客户端,起一个循环监听async def echo(self,websocket, path):# 添加到客户端列表# Clients.append(websocket)# 握手await websocket.send(json.dumps({"type": "handshake"}))# 循环监听while True:# 接受信息try:# 接受文本recv_text = await websocket.recv()message = "Get message: {}".format(recv_text)# 返回客户端信息await websocket.send(message)# 转jsondata = json.loads(recv_text)# 用户发信息if data["type"] == 'send':name = '404'for k, v in Clients.items():if v == websocket:name = kdata["from"] = nameif len(Clients) != 0:  # asyncio.wait doesn't accept an empty listmessage = json.dumps({"type": "send", "content": data["content"], "from": name})await self.runCaseX(jsonMsg=message, websocket=websocket)# 用户注册elif data["type"] == 'register':try:Clients[data["uuid"]] = websocketif len(Clients) != 0:  # asyncio.wait doesn't accept an empty listmessage = json.dumps({"type": "register", "content": data["content"], "user_list": list(Clients.keys())})await self.runCaseX(jsonMsg=message, websocket=websocket)except Exception as exp:print(exp)# 用户注销elif data["type"] == 'unregister':del Clients[data["uuid"]]# 对message进行解析,跳进不同功能区# await self.runCaseX(jsonMsg=data,websocket=websocket)# 链接断开except websockets.ConnectionClosed:print("ConnectionClosed...", path)# del Clientsbreak# 无效状态except websockets.InvalidState:print("InvalidState...")# del Clientsbreak# 报错except Exception as e:print("ws连接报错",e)# del Clientsbreak# 启动服务器async def runServer(self):async with websockets.serve(self.echo, self.ip, self.port):await asyncio.Future()  # run forever# 多协程模式,防止阻塞主线程无法做其他事情def WebSocketServer(self):asyncio.run(self.runServer())# 多线程启动def startServer(self):# 多线程启动,否则会堵塞thread = threading.Thread(target=self.WebSocketServer)thread.start()# thread.join()if __name__=='__main__':print("server")s = WS_Server()s.startServer()

相关文章:

python的websocket方法教程

WebSocket是一种网络通信协议,它在单个TCP连接上提供全双工的通信信道。在本篇文章中,我们将探讨如何在Python中使用WebSocket实现实时通信。 websockets是Python中最常用的网络库之一,也是websocket协议的Python实现。它不仅作为基础组件在…...

Qt处理焦点事件(获得焦点,失去焦点)

背景: 我只是想处理焦点动作,由于懒,上网一搜,排名靠前的一位朋友,使用重写部件的方式实现。还是因为懒,所以感觉复杂了。于是又花了一分钟解决了一下。 所以记录下来,以免以后忘了。 思路&a…...

SiteGround如何设置WordPress网站自动更新

SiteGround Autoupdate功能会自动帮我们更新在他们这里托管的所有WordPress网站,这样做是为了保证网站安全,并且让它们一直保持最新状态。他们会根据我们选择的设置自动更新不同版本的WordPress,包括主要版本和次要版本。在每次自动更新之前&…...

http代理和SOCK5代理谁更安全?

在这个网络化的时代,我们常常听到HTTP代理和SOCKS5代理这两个名词,不过很多人并不了解是什么意思。今天,我们将揭开这两种代理的神秘面纱,看看到底HTTP代理和SOCKS5代理哪个更安全? HTTP代理:高效通信的“枢…...

Kotlin关键字二——constructor和init

在关键字一——var和val中最后提到了构造函数,这里就学习下构造函数相关的关键字: constructor和init。 主要构造(primary constructor) kotlin和java一样,在定义类时就自动生成了无参构造 // 会生成默认的无参构造函数 class Person{ }与java不同的是…...

java的long类型超过9位报错:the literal 987654321000 of type int is out of range

java的long类型超过9位报错 1、报错提示2、报错截图3、解决办法4、参考文章 1、报错提示 the literal 987654321000 of type int is out of range 2、报错截图 3、解决办法 long类型是一种用于表示较大整数的数据类型,范围比int类型更广泛。然而,即使…...

【Java期末复习资料】(4)模拟卷

有不会的题可以后台问我的哦,看见了就会回。 本文章主要是模拟卷,知识点例题简答题已经发过了,可以在主页专栏Java中找一下 一、单项选择题 1. 编译 Java Application 源程序文件将产生相应的字节码文件,这些字节码文件的扩展名为…...

【计算机网络】UDP报文详解

目录 一. UDP协议概述 二. UDP报文格式 首部 三. UDP的缓冲区 一. UDP协议概述 UDP——用户数据报协议,是传输层的一个重要协议 基于UDP的应用层协议有:DNS,TFTP,SNMP,NTP 协议全称默认端口号DNSDomain Name Se…...

排序算法——归并排序

归并排序(Merge Sort)是计算机科学中非常重要的排序算法之一。它不仅高效、稳定,而且是许多高级排序技术和算法思想的基础。在本文中,我们将深入探讨归并排序的原理、实现方法,以及它的优缺点。 1. 归并排序的原理 归…...

2023 年安徽省职业院校技能大赛高职组“软件测试”赛项样题

2023 年安徽省职业院校技能大赛 高职组“软件测试”赛项样题 目录 任务一:功能测试(45 分) 1、测试计划(5 分) 2、测试用例(15 分) 3、Bug 清单(20 分) 4、测试报告&…...

Mysql8和Oracle实际项目中递归查询树形结构

背景: 项目升级,引入MySQL数据库,之前一直用的是Oracle数据,在做用户登录单位维护的时候,需要返回该用户所属单位下的所有子单位。下边是模拟项目数据实践的过程。 数据准备: 准备一张单位表&#xff0c…...

docker mysql8 设置不区分大小写

docker安装Mysql8.0的坑之lower_case_table_names_docker mysql lower_case_table_names-CSDN博客https://blog.csdn.net/p793049488/article/details/108365929 docker run ‐di ‐‐nametensquare_mysql ‐p 33306:3306 ‐e MYSQL_ROOT_PASSWORD123456 mysql...

Audio Siganl (MATLAB) 代码学习—常见问题3

问题描述 生成信号y1: 8000个样本,1000个周期,幅度为0.85的余弦信号。若信号的持续时间为1s,则采样频率和信号频率为多少。生成信号y2: 持续时间为1s,幅度为0.7,频率为500Hz,相位为 π / 4 \pi/4 π/4生成信号y:y_1+y_2绘制前200ms的y信号示意图计算y的DFT绘制频域示意图…...

【PTA题目】7-8 矩阵运算 分数 10

7-8 矩阵运算 分数 10 全屏浏览题目 切换布局 作者 C课程组 单位 浙江大学 给定一个nn的方阵,本题要求计算该矩阵除副对角线、最后一列和最后一行以外的所有元素之和。副对角线为从矩阵的右上角至左下角的连线。 输入格式: 输入第一行给出正整数n(…...

Ubuntu20.04创建并挂在zfs池

Ubuntu 下使用 ZFS [适用于中高级用户] 主磁盘上清洁安装带有ZFS的Ubuntu后,可以开始体验其特性。 所有ZFS配置过程都需要命令行。 我不知道有GUI工具。 创建一个 ZFS 池 本节仅适用于具有多个磁盘的系统。 如果只有一个磁盘,Ubuntu会在安装时自动创建…...

x的平方根算法(leetcode第69题)

题目描述: 给你一个非负整数 x ,计算并返回 x 的 算术平方根 。由于返回类型是整数,结果只保留 整数部分 ,小数部分将被 舍去 。注意:不允许使用任何内置指数函数和算符,例如 pow(x, 0.5) 或者 x ** 0.5 。…...

打破空间限制,畅享真实生活

直播已经成为了当今社会中非常流行的一种娱乐方式,也是人们获取信息和互动的重要渠道之一。而无绿幕直播,则是近年来兴起的一种特殊形式,它打破了以往直播的空间限制,让观众们能够更贴近主播,更真实地感受到直播背后的…...

Python基础期末复习 新手 2

虽然age 10在__init__方法中定义了一个局部变量age,但这个局部变量并不会影响类属性age的值。类属性是在类级别上定义的,不属于任何一个实例。因此,在创建实例s1和s2时,它们的age属性值都为类属性的初始值0。 尽管对类的属性值进…...

Java接入ChatGPT接口简单示例

我们定义了一个名为ChartGPTConfig的类,它有两个私有成员变量apiKey和apiUrl,分别表示ChartGPT的API密钥和API URL。 public class ChartGPTConfig {private final String apiKey;private final String apiUrl;public ChartGPTConfig(String apiKey, St…...

解决夜神模拟器与Android studio自动断开的问题

原因:夜神模拟器的adb版本和Android sdk的adb版本不一致 解决办法: 1.找到android的sdk (1)File--->Project Structure (2)SDK Location:记下sdk的位置 2.找到sdk中的adb文件 SDK-->platform-tools-->adb.exe 3.复制…...

利用C语言模拟实现堆的基本操作和调堆算法

利用C语言模拟实现堆的基本操作和调堆算法 文章目录 利用C语言模拟实现堆的基本操作和调堆算法前言一、堆的基本原理大根堆和小根堆的比较 二、实现堆的基本操作1)结构定义2)初始化堆(HeapInit)3)销毁堆(He…...

react hooks之useRef和useImperativeHandle

为什么这两个一起写,是因为这两个关联性很大,逐一介绍。 一:useRef 1、作用:用于在函数组件中创建一个持久化的引用变量。这个引用变量可以在组件的多次渲染之间保持不变,并且可以访问和修改 DOM 元素或其他组件实例…...

scala方法与函数

定义方法定义函数方法和函数的区别scala的方法函数操作 1.9 方法与函数 1.9.1 定义方法 定义方法的基本格式是: def 方法名称(参数列表):返回值类型 方法体 def add(x: Int, y: Int): Int x y println(add(1, 2)) // 3 //也…...

前端框架(Front-end Framework)和库(Library)的区别

聚沙成塔每天进步一点点 ⭐ 专栏简介 前端入门之旅:探索Web开发的奇妙世界 欢迎来到前端入门之旅!感兴趣的可以订阅本专栏哦!这个专栏是为那些对Web开发感兴趣、刚刚踏入前端领域的朋友们量身打造的。无论你是完全的新手还是有一些基础的开发…...

mysql原理--B+树索引的使用

1.索引的代价 在介绍如何更好的使用索引之前先要了解一下使用这玩意儿的代价,它在空间和时间上都会拖后腿: (1). 空间上的代价 这个是显而易见的,每建立一个索引都要为它建立一棵 B 树,每一棵 B 树的每一个节点都是一个数据页&…...

Android : Room 数据库的基本用法 —简单应用_三_版本

在实体类中添加了新字段: Entity(tableName "people") public class People {//新添加的字段private String email;public String getEmail() {return email;}public void setEmail(String email) {this.email email;}} 再次编译启动时会报错&#xf…...

微服务网关组件Gateway实战

1. 需求背景 在微服务架构中,通常一个系统会被拆分为多个微服务,面对这么多微服务客户端应该如何去调用呢?如果根据每个微服务的地址发起调用,存在如下问题: 客户端多次请求不同的微服务,会增加客户端代码…...

目标检测YOLO系列从入门到精通技术详解100篇-【目标检测】三维重建(补充篇)

目录 前言 算法原理 三维重建意义 三维重建定义 常见的三维重建表达方式...

关于uniapp X 的最新消息

uni-app x 是什么? uni-app x,是下一代 uni-app,是一个跨平台应用开发引擎。 uni-app x 没有使用js和webview,它基于 uts 语言。在App端,uts在iOS编译为swift、在Android编译为kotlin,完全达到了原生应用的…...

spark从表中采样(随机选取)一定数量的行

在Spark SQL中,你可以使用TABLESAMPLE来按行数对表进行采样。以下是使用TABLESAMPLE的示例: SELECT * FROM table_name TABLESAMPLE (1000 ROWS);在这个示例中,table_name是你要查询的表名。TABLESAMPLE子句后面的(1000 ROWS)表示采样的行数…...