高级实时通信:基于 Python 的 WebSocket 实现与异步推送解决方案
高级实时通信:基于 Python 的 WebSocket 实现与异步推送解决方案
目录
- 🟢 WebSocket 协议概述
- 🔵 在 FastAPI 中实现 WebSocket
- 🟣 Django Channels 实现异步实时通信
- 🔴 使用 Redis 实现实时推送
🟢 1. WebSocket 协议概述
WebSocket 是 HTML5 规范中提出的一种新协议,旨在实现客户端与服务器之间的全双工通信。与传统的 HTTP 请求/响应模型不同,WebSocket 协议允许持久的、双向的连接,客户端和服务器可以在任意时刻相互发送数据,而无需重新发起 HTTP 请求。WebSocket 可以减少通信延迟和网络资源消耗,特别适用于实时应用程序,如聊天系统、股票交易、游戏、以及其他需要即时数据更新的场景。
🧩 WebSocket 的工作原理
WebSocket 的工作流程可以简单概括如下:
- 初始握手:客户端通过 HTTP 发起 WebSocket 连接请求,服务器通过特殊的响应头
Upgrade升级协议,建立连接。 - 持久连接:一旦连接建立,客户端和服务器之间的通信通道就保持打开,直到一方主动关闭连接。双方可以同时发送消息,消息是基于帧的传输方式。
- 双向通信:与传统的 HTTP 轮询不同,WebSocket 是全双工的,意味着服务器和客户端可以随时相互发送数据。消息可以是文本帧或者二进制帧,且通过 WebSocket 协议传输的消息有较小的开销,适合频繁的小数据传输场景。
⚡️ WebSocket 和 HTTP 的区别
- 连接方式:HTTP 是短连接,每次请求/响应后连接断开,而 WebSocket 是持久化连接,只有在连接关闭时才断开。
- 通信方向:HTTP 是客户端发起请求,服务器响应;WebSocket 支持双向通信,客户端和服务器都可以主动发送消息。
- 协议开销:WebSocket 建立连接时会使用 HTTP 协议握手,但后续通信数据帧头信息非常小,开销比 HTTP 低。
WebSocket 的这些特性使其成为实时应用开发中的重要工具。
🔵 2. 在 FastAPI 中实现 WebSocket
FastAPI 是一个现代的、快速的 Web 框架,提供了对 WebSocket 支持的简便实现。FastAPI 内部集成了 ASGI(Asynchronous Server Gateway Interface),这意味着它能够轻松处理异步通信任务,比如 WebSocket。以下是如何在 FastAPI 中实现 WebSocket 通信的详细步骤。
📌 FastAPI 中的 WebSocket 代码实现
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Listapp = FastAPI()# 管理多个 WebSocket 连接的管理器
class ConnectionManager:def __init__(self):self.active_connections: List[WebSocket] = []# 添加新连接async def connect(self, websocket: WebSocket):await websocket.accept()self.active_connections.append(websocket)# 移除断开的连接async def disconnect(self, websocket: WebSocket):self.active_connections.remove(websocket)# 向所有连接广播消息async def broadcast(self, message: str):for connection in self.active_connections:await connection.send_text(message)manager = ConnectionManager()@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):await manager.connect(websocket)try:while True:# 接收客户端消息data = await websocket.receive_text()await manager.broadcast(f"Client #{client_id} says: {data}")except WebSocketDisconnect:manager.disconnect(websocket)await manager.broadcast(f"Client #{client_id} disconnected")
🧩 代码解析
- ConnectionManager 类:用于管理 WebSocket 连接。它可以接受新连接、断开旧连接以及广播消息给所有连接的客户端。
- websocket_endpoint:这个路径函数处理 WebSocket 连接。当客户端连接时,它将接收并广播消息给其他连接的客户端。它使用
client_id来标识连接的客户端。 - 广播消息:通过
manager.broadcast()方法,可以将来自某个客户端的消息发送给所有连接的客户端。
🚀 运行示例:
要运行这个示例,可以将代码保存为 main.py,然后使用命令 uvicorn main:app --reload 启动 FastAPI 服务器。连接的客户端可以通过 /ws/{client_id} 连接,并且所有客户端都能实时接收消息。
此示例展示了如何通过 FastAPI 简洁地实现 WebSocket 服务,利用 Python 的异步特性来实现高效的实时通信。
🟣 3. Django Channels 实现异步实时通信
Django 默认是同步框架,但通过 Django Channels,可以为其添加异步功能,尤其是支持 WebSocket 的实时通信功能。Django Channels 通过将请求分配给适当的消费者来处理异步通信,而不需要重新设计整个 Django 应用。以下是如何使用 Django Channels 来实现 WebSocket 通信。
📌 安装 Django Channels
首先,需要安装 Django Channels:
pip install channels
然后,在 settings.py 中添加 Channels 配置:
INSTALLED_APPS = [# 其他应用'channels',
]# 指定 ASGI 应用
ASGI_APPLICATION = "myproject.asgi.application"
在项目根目录创建一个 asgi.py 文件:
import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from myapp import routingos.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')application = ProtocolTypeRouter({"http": get_asgi_application(),"websocket": AuthMiddlewareStack(URLRouter(routing.websocket_urlpatterns)),
})
📌 Django Channels WebSocket 代码实现
创建 consumers.py 来定义 WebSocket 消费者:
import json
from channels.generic.websocket import AsyncWebsocketConsumerclass ChatConsumer(AsyncWebsocketConsumer):async def connect(self):self.room_name = 'chat'await self.channel_layer.group_add(self.room_name, self.channel_name)await self.accept()async def disconnect(self, close_code):await self.channel_layer.group_discard(self.room_name, self.channel_name)async def receive(self, text_data):message = json.loads(text_data)['message']await self.channel_layer.group_send(self.room_name,{'type': 'chat_message','message': message})async def chat_message(self, event):message = event['message']await self.send(text_data=json.dumps({'message': message}))
在 routing.py 中定义 WebSocket 路由:
from django.urls import re_path
from . import consumerswebsocket_urlpatterns = [re_path(r'ws/chat/$', consumers.ChatConsumer.as_asgi()),
]
🧩 代码解析
- ChatConsumer:这是 WebSocket 消费者类,使用
AsyncWebsocketConsumer来处理异步 WebSocket 连接。connect()方法处理客户端连接,disconnect()方法处理断开,receive()用于接收并转发消息。 - channel_layer:Django Channels 使用频道层来在多个 WebSocket 连接之间共享数据。
group_send()方法用于向一个组中的所有 WebSocket 客户端广播消息。
🚀 运行示例
通过配置 Django Channels 后,启动 Django 开发服务器并连接到 /ws/chat/ 端点,即可开始实时通信。此实现展示了如何利用 Django 的扩展实现异步 WebSocket 通信,方便现有 Django 项目无缝添加实时通信功能。
🔴 4. 使用 Redis 实现实时推送
Redis 是一个强大的内存数据存储系统,支持发布/订阅(Pub/Sub)模式,这使得它非常适合实现实时推送功能。通过将 Redis 集成到 WebSocket 应用中,可以轻松地实现高效的实时数据推送服务。以下展示如何使用 Redis 结合 WebSocket 来实现消息的实时推送。
📌 安装 Redis 和依赖
首先,安装 Redis 和 aioredis 以在 Python 中使用异步 Redis 客户端:
pip install aioredis
📌 实现基于 Redis 的 WebSocket 推送
在 WebSocket 服务中,通过 Redis 的发布/订阅模式实现消息推送。
import aioredis
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Listapp = FastAPI()class ConnectionManager:def __init__(self):self.active_connections: List[WebSocket] = []async def connect(self, websocket: WebSocket):await websocket.accept()self.active_connections.append(websocket)async def disconnect(self, websocket: WebSocket):self.active_connections.remove(websocket)async def broadcast(self, message: str):for connection in self.active_connections:await connection.send_text(message)manager = ConnectionManager()@app.on_event("startup")
async def startup_event():global redisredis = await aioredis.create_redis_pool('redis://localhost')@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):await manager.connect(websocket)try:while True:# 接收消息data = await websocket.receive_text()await redis.publish("channel:1", data)except WebSocketDisconnect:manager.disconnect(websocket)@app.on_event("shutdown")
async def shutdown_event():redis.close()await redis.wait_closed()
📌 Redis 订阅消息并推送给 WebSocket 客户端
实现 Redis 消息订阅和推送:
import aioredis
from fastapi import FastAPIapp = FastAPI()@app.on_event("startup")
async def startup_event():redis = await aioredis.create_redis_pool('redis://localhost')pubsub = redis.pubsub()async def reader():await pubsub.subscribe('channel:1')async for message in pubsub.listen():print(f"Received message: {message['data']}")app.loop.create_task(reader())@app.on_event("shutdown")
async def shutdown_event():redis.close()await redis.wait_closed()
🧩 代码解析
- Redis 发布:通过
redis.publish()方法,消息发布到指定的频道channel:1,这些消息随后会被其他 Redis 客户端订阅。 - Redis 订阅:
pubsub.listen()用于订阅 Redis 频道上的消息,收到消息后可以处理或转发给 WebSocket 客户端。
🚀 运行示例
启动 Redis 服务器并运行上述代码,客户端通过 WebSocket 连接,可以实时接收 Redis 频道中的推送消息。此示例展示了如何利用 Redis 高效的发布/订阅机制来实现实时消息推送,结合 WebSocket 实现真正的全双工通信。
相关文章:
高级实时通信:基于 Python 的 WebSocket 实现与异步推送解决方案
高级实时通信:基于 Python 的 WebSocket 实现与异步推送解决方案 目录 🟢 WebSocket 协议概述🔵 在 FastAPI 中实现 WebSocket🟣 Django Channels 实现异步实时通信🔴 使用 Redis 实现实时推送 🟢 1. WebS…...
大二上学期详细学习计划
本学习完成目标: 项目: 书籍:《mysql必知必会》《java核心技术卷》(暂时)加强JavaSE的学习,掌握Java核心Mysqlsql(把牛客上的那50道sql语句题写完)gitmaven完成springboot项目&…...
Kafka【十四】生产者发送消息时的消息分区策略
【1】分区策略 Kafka中Topic是对数据逻辑上的分类,而Partition才是数据真正存储的物理位置。所以在生产数据时,如果只是指定Topic的名称,其实Kafka是不知道将数据发送到哪一个Broker节点的。我们可以在构建数据传递Topic参数的同时ÿ…...
SQL优化:执行计划详细分析
视频讲解:SQL优化:SQL执行计划详细分析_哔哩哔哩_bilibili 1.1 执行计划详解 id select_type table partitions type possible_keys key key_len ref rows filtered Extra 1.1.1 ID 【概…...
Android Studio -> Android Studio 获取release模式和debug模式的APK
Android Studio上鼠标修改构建类型 Release版本 激活路径:More tool windows->Build Variants->Active Build Variant->releaseAPK路径:Project\app\build\intermediates\apk\app-release.apk Debug版本 激活路径:More tool w…...
基于 SpringBoot 的实习管理系统
专业团队,咨询送免费开题报告,大家可以来留言。 摘 要 随着信息化时代的到来,管理系统都趋向于智能化、系统化,实习管理也不例外,但目前国内仍都使用人工管理,市场规模越来越大,同时信息量也越…...
vmware workstation 17 linux版
链接: https://pan.baidu.com/s/1F3kpNEi_2GZW0FHUO-8p-g?pwd6666 提取码: 6666 1 先安装虚拟机 不管什么错误 先安装vmware workstation 17 2 编译 覆盖安装vmware-host-modules-workstation-17.5.1 只需这样就可以 # sudo apt install dkms build-essential bc iw…...
Windows环境本地部署Oracle 19c及卸载实操手册
前言: 一直在做其他测试,貌似都忘了Windows环境oracle 19c的部署,这是一个很早很早的安装记录了,放上来做个备录给到大家参考。 Oracle 19c:进一步增强了自动化功能,并提供了更好的性能和安全性。这个版本在自动化、性能和安全性方面进行了重大改进,以满足现代企业对数…...
MapStruct介绍
一、MapStruct 1.1何为MapStruct 要说这个东西,其实和我们刚刚讲到的Lombok相类似。其是由我们的源代码加上MapStruct经过编译后得到.class文件,文件中自动补全了代码。那么补全了什么代码?实现了什么功能? MapStruct的产生&…...
35天学习小结
距离上次纪念日,已经过去了35天咯 算算也有5周了,在这一个月里,收获的也挺多,在这个过程中认识的大佬也是越来越多了hh 学到的东西,其实也没有很多,这个暑假多多少少还是有遗憾的~ 第一周 学习了一些有…...
【iOS】UIViewController的生命周期
UIViewController的生命周期 文章目录 UIViewController的生命周期前言UIViewController的一个结构UIViewController的函数的执行顺序运行代码viewWillAppear && viewDidAppear多个视图控制器跳转时的生命周期pushpresent 小结 前言 之前对于有关于UIViewControlller的…...
ELK在Linux服务器下使用docker快速部署(超详细)
ELK是什么? 首先说说什么是ELK ELK 是一个开源的日志管理和分析平台,由三个主要组件组成: Elasticsearch:一个分布式搜索和分析引擎,能够快速存储、搜索和分析大量数据。它是 ELK 堆栈的核心,负责数据的…...
unity导入半透明webm + AE合成半透明视频
有些webm的文件导入unity后无法正常播报,踩坑好久才知道需要webm中的:VP8 标准 现在手上有几条mp4双通道的视频,当然unity中有插件是可以支持这种视频的,为了省事和代码洁癖,毅然决然要webm走到黑。 mp4导入AE合成半透…...
力扣: 四数相加II
文章目录 需求代码结尾 需求 给你四个整数数组 nums1、nums2、nums3 和 nums4 ,数组长度都是 n ,请你计算有多少个元组 (i, j, k, l) 能满足: 0 < i, j, k, l < n nums1[i] nums2[j] nums3[k] nums4[l] 0 示例 1: 输入…...
径向基函数神经网络RBFNN案例实操
简介 (来自ChatGPT的介绍,如有更正建议请指出) 径向基函数神经网络(Radial Basis Function Neural Network, RBFNN)是一种特殊的前馈神经网络,其结构和特点与其他常见的神经网络有所不同,主要表现在以下几个方面: 网络结构三层结构:RBF神经网络通常由三层组成:输入层…...
Java-数据结构-二叉树-习题(一) (✪ω✪)
文本目录: ❄️一、习题一(检查两颗树是否相同): ▶ 思路: ▶ 代码: ❄️二、习题二(另一棵树的子树): ▶ 思路: ▶ 代码: ❄️三、习题三(翻转二叉树): ▶ 思路: ▶ 代…...
js 时间戳转日期格式
timestampToDate(obj.project_time), import moment from “moment”; const timestampToDate (timestamp: any) > { const date new Date(timestamp * 1000); const newDate moment(date).format(“YYYY-MM-DD”); return newDate; // 使用Intl.DateTimeFormat进行格式…...
基于人工智能的自动驾驶系统项目教学指南
自动驾驶系统是人工智能的一个核心应用领域,涉及多个学科的交叉:从计算机视觉、深度学习、传感器融合到控制系统,自动驾驶项目可以提供高度的挑战性和实践意义。在这篇文章中,我们将构建一个基于深度学习的自动驾驶系统的简化版本…...
[Linux#49][UDP] 2w字详解 | socketaddr | 常用API | 实操:实现简易Udp传输
目录 套接字地址结构(sockaddr) 1.Socket API 2.sockaddr结构 3. sockaddr、sockaddr_in 和 sockaddr_un 的关系 sockaddr 结构体 sockaddr_in 结构体(IPv4 套接字地址) sockaddr_un 结构体(Unix域套接字地址&a…...
期权组合策略有什么风险?期权组合策略是什么?
今天期权懂带你了解期权组合策略有什么风险?期权组合策略是什么?期权组合策略是通过结合不同期权合约(如看涨期权和看跌期权),以及标的资产(如股票)来实现特定投资目标的策略。 期权组合策略市…...
TDengine 快速体验(Docker 镜像方式)
简介 TDengine 可以通过安装包、Docker 镜像 及云服务快速体验 TDengine 的功能,本节首先介绍如何通过 Docker 快速体验 TDengine,然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉 Docker,请使用 安装包的方式快…...
渗透实战PortSwigger靶场-XSS Lab 14:大多数标签和属性被阻止
<script>标签被拦截 我们需要把全部可用的 tag 和 event 进行暴力破解 XSS cheat sheet: https://portswigger.net/web-security/cross-site-scripting/cheat-sheet 通过爆破发现body可以用 再把全部 events 放进去爆破 这些 event 全部可用 <body onres…...
04-初识css
一、css样式引入 1.1.内部样式 <div style"width: 100px;"></div>1.2.外部样式 1.2.1.外部样式1 <style>.aa {width: 100px;} </style> <div class"aa"></div>1.2.2.外部样式2 <!-- rel内表面引入的是style样…...
AI书签管理工具开发全记录(十九):嵌入资源处理
1.前言 📝 在上一篇文章中,我们完成了书签的导入导出功能。本篇文章我们研究如何处理嵌入资源,方便后续将资源打包到一个可执行文件中。 2.embed介绍 🎯 Go 1.16 引入了革命性的 embed 包,彻底改变了静态资源管理的…...
Unity | AmplifyShaderEditor插件基础(第七集:平面波动shader)
目录 一、👋🏻前言 二、😈sinx波动的基本原理 三、😈波动起来 1.sinx节点介绍 2.vertexPosition 3.集成Vector3 a.节点Append b.连起来 4.波动起来 a.波动的原理 b.时间节点 c.sinx的处理 四、🌊波动优化…...
今日学习:Spring线程池|并发修改异常|链路丢失|登录续期|VIP过期策略|数值类缓存
文章目录 优雅版线程池ThreadPoolTaskExecutor和ThreadPoolTaskExecutor的装饰器并发修改异常并发修改异常简介实现机制设计原因及意义 使用线程池造成的链路丢失问题线程池导致的链路丢失问题发生原因 常见解决方法更好的解决方法设计精妙之处 登录续期登录续期常见实现方式特…...
零基础在实践中学习网络安全-皮卡丘靶场(第九期-Unsafe Fileupload模块)(yakit方式)
本期内容并不是很难,相信大家会学的很愉快,当然对于有后端基础的朋友来说,本期内容更加容易了解,当然没有基础的也别担心,本期内容会详细解释有关内容 本期用到的软件:yakit(因为经过之前好多期…...
Python 训练营打卡 Day 47
注意力热力图可视化 在day 46代码的基础上,对比不同卷积层热力图可视化的结果 import torch import torch.nn as nn import torch.optim as optim from torchvision import datasets, transforms from torch.utils.data import DataLoader import matplotlib.pypl…...
uniapp 实现腾讯云IM群文件上传下载功能
UniApp 集成腾讯云IM实现群文件上传下载功能全攻略 一、功能背景与技术选型 在团队协作场景中,群文件共享是核心需求之一。本文将介绍如何基于腾讯云IMCOS,在uniapp中实现: 群内文件上传/下载文件元数据管理下载进度追踪跨平台文件预览 二…...
水泥厂自动化升级利器:Devicenet转Modbus rtu协议转换网关
在水泥厂的生产流程中,工业自动化网关起着至关重要的作用,尤其是JH-DVN-RTU疆鸿智能Devicenet转Modbus rtu协议转换网关,为水泥厂实现高效生产与精准控制提供了有力支持。 水泥厂设备众多,其中不少设备采用Devicenet协议。Devicen…...
