高级实时通信:基于 Python 的 WebSocket 实现与异步推送解决方案
高级实时通信:基于 Python 的 WebSocket 实现与异步推送解决方案
目录
- 🟢 WebSocket 协议概述
- 🔵 在 FastAPI 中实现 WebSocket
- 🟣 Django Channels 实现异步实时通信
- 🔴 使用 Redis 实现实时推送
🟢 1. WebSocket 协议概述
WebSocket 是 HTML5 规范中提出的一种新协议,旨在实现客户端与服务器之间的全双工通信。与传统的 HTTP 请求/响应模型不同,WebSocket 协议允许持久的、双向的连接,客户端和服务器可以在任意时刻相互发送数据,而无需重新发起 HTTP 请求。WebSocket 可以减少通信延迟和网络资源消耗,特别适用于实时应用程序,如聊天系统、股票交易、游戏、以及其他需要即时数据更新的场景。
🧩 WebSocket 的工作原理
WebSocket 的工作流程可以简单概括如下:
- 初始握手:客户端通过 HTTP 发起 WebSocket 连接请求,服务器通过特殊的响应头
Upgrade
升级协议,建立连接。 - 持久连接:一旦连接建立,客户端和服务器之间的通信通道就保持打开,直到一方主动关闭连接。双方可以同时发送消息,消息是基于帧的传输方式。
- 双向通信:与传统的 HTTP 轮询不同,WebSocket 是全双工的,意味着服务器和客户端可以随时相互发送数据。消息可以是文本帧或者二进制帧,且通过 WebSocket 协议传输的消息有较小的开销,适合频繁的小数据传输场景。
⚡️ WebSocket 和 HTTP 的区别
- 连接方式:HTTP 是短连接,每次请求/响应后连接断开,而 WebSocket 是持久化连接,只有在连接关闭时才断开。
- 通信方向:HTTP 是客户端发起请求,服务器响应;WebSocket 支持双向通信,客户端和服务器都可以主动发送消息。
- 协议开销:WebSocket 建立连接时会使用 HTTP 协议握手,但后续通信数据帧头信息非常小,开销比 HTTP 低。
WebSocket 的这些特性使其成为实时应用开发中的重要工具。
🔵 2. 在 FastAPI 中实现 WebSocket
FastAPI 是一个现代的、快速的 Web 框架,提供了对 WebSocket 支持的简便实现。FastAPI 内部集成了 ASGI(Asynchronous Server Gateway Interface),这意味着它能够轻松处理异步通信任务,比如 WebSocket。以下是如何在 FastAPI 中实现 WebSocket 通信的详细步骤。
📌 FastAPI 中的 WebSocket 代码实现
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Listapp = FastAPI()# 管理多个 WebSocket 连接的管理器
class ConnectionManager:def __init__(self):self.active_connections: List[WebSocket] = []# 添加新连接async def connect(self, websocket: WebSocket):await websocket.accept()self.active_connections.append(websocket)# 移除断开的连接async def disconnect(self, websocket: WebSocket):self.active_connections.remove(websocket)# 向所有连接广播消息async def broadcast(self, message: str):for connection in self.active_connections:await connection.send_text(message)manager = ConnectionManager()@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):await manager.connect(websocket)try:while True:# 接收客户端消息data = await websocket.receive_text()await manager.broadcast(f"Client #{client_id} says: {data}")except WebSocketDisconnect:manager.disconnect(websocket)await manager.broadcast(f"Client #{client_id} disconnected")
🧩 代码解析
- ConnectionManager 类:用于管理 WebSocket 连接。它可以接受新连接、断开旧连接以及广播消息给所有连接的客户端。
- websocket_endpoint:这个路径函数处理 WebSocket 连接。当客户端连接时,它将接收并广播消息给其他连接的客户端。它使用
client_id
来标识连接的客户端。 - 广播消息:通过
manager.broadcast()
方法,可以将来自某个客户端的消息发送给所有连接的客户端。
🚀 运行示例:
要运行这个示例,可以将代码保存为 main.py
,然后使用命令 uvicorn main:app --reload
启动 FastAPI 服务器。连接的客户端可以通过 /ws/{client_id}
连接,并且所有客户端都能实时接收消息。
此示例展示了如何通过 FastAPI 简洁地实现 WebSocket 服务,利用 Python 的异步特性来实现高效的实时通信。
🟣 3. Django Channels 实现异步实时通信
Django 默认是同步框架,但通过 Django Channels,可以为其添加异步功能,尤其是支持 WebSocket 的实时通信功能。Django Channels 通过将请求分配给适当的消费者来处理异步通信,而不需要重新设计整个 Django 应用。以下是如何使用 Django Channels 来实现 WebSocket 通信。
📌 安装 Django Channels
首先,需要安装 Django Channels:
pip install channels
然后,在 settings.py
中添加 Channels 配置:
INSTALLED_APPS = [# 其他应用'channels',
]# 指定 ASGI 应用
ASGI_APPLICATION = "myproject.asgi.application"
在项目根目录创建一个 asgi.py
文件:
import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from myapp import routingos.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')application = ProtocolTypeRouter({"http": get_asgi_application(),"websocket": AuthMiddlewareStack(URLRouter(routing.websocket_urlpatterns)),
})
📌 Django Channels WebSocket 代码实现
创建 consumers.py
来定义 WebSocket 消费者:
import json
from channels.generic.websocket import AsyncWebsocketConsumerclass ChatConsumer(AsyncWebsocketConsumer):async def connect(self):self.room_name = 'chat'await self.channel_layer.group_add(self.room_name, self.channel_name)await self.accept()async def disconnect(self, close_code):await self.channel_layer.group_discard(self.room_name, self.channel_name)async def receive(self, text_data):message = json.loads(text_data)['message']await self.channel_layer.group_send(self.room_name,{'type': 'chat_message','message': message})async def chat_message(self, event):message = event['message']await self.send(text_data=json.dumps({'message': message}))
在 routing.py
中定义 WebSocket 路由:
from django.urls import re_path
from . import consumerswebsocket_urlpatterns = [re_path(r'ws/chat/$', consumers.ChatConsumer.as_asgi()),
]
🧩 代码解析
- ChatConsumer:这是 WebSocket 消费者类,使用
AsyncWebsocketConsumer
来处理异步 WebSocket 连接。connect()
方法处理客户端连接,disconnect()
方法处理断开,receive()
用于接收并转发消息。 - channel_layer:Django Channels 使用频道层来在多个 WebSocket 连接之间共享数据。
group_send()
方法用于向一个组中的所有 WebSocket 客户端广播消息。
🚀 运行示例
通过配置 Django Channels 后,启动 Django 开发服务器并连接到 /ws/chat/
端点,即可开始实时通信。此实现展示了如何利用 Django 的扩展实现异步 WebSocket 通信,方便现有 Django 项目无缝添加实时通信功能。
🔴 4. 使用 Redis 实现实时推送
Redis 是一个强大的内存数据存储系统,支持发布/订阅(Pub/Sub)模式,这使得它非常适合实现实时推送功能。通过将 Redis 集成到 WebSocket 应用中,可以轻松地实现高效的实时数据推送服务。以下展示如何使用 Redis 结合 WebSocket 来实现消息的实时推送。
📌 安装 Redis 和依赖
首先,安装 Redis 和 aioredis
以在 Python 中使用异步 Redis 客户端:
pip install aioredis
📌 实现基于 Redis 的 WebSocket 推送
在 WebSocket 服务中,通过 Redis 的发布/订阅模式实现消息推送。
import aioredis
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Listapp = FastAPI()class ConnectionManager:def __init__(self):self.active_connections: List[WebSocket] = []async def connect(self, websocket: WebSocket):await websocket.accept()self.active_connections.append(websocket)async def disconnect(self, websocket: WebSocket):self.active_connections.remove(websocket)async def broadcast(self, message: str):for connection in self.active_connections:await connection.send_text(message)manager = ConnectionManager()@app.on_event("startup")
async def startup_event():global redisredis = await aioredis.create_redis_pool('redis://localhost')@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):await manager.connect(websocket)try:while True:# 接收消息data = await websocket.receive_text()await redis.publish("channel:1", data)except WebSocketDisconnect:manager.disconnect(websocket)@app.on_event("shutdown")
async def shutdown_event():redis.close()await redis.wait_closed()
📌 Redis 订阅消息并推送给 WebSocket 客户端
实现 Redis 消息订阅和推送:
import aioredis
from fastapi import FastAPIapp = FastAPI()@app.on_event("startup")
async def startup_event():redis = await aioredis.create_redis_pool('redis://localhost')pubsub = redis.pubsub()async def reader():await pubsub.subscribe('channel:1')async for message in pubsub.listen():print(f"Received message: {message['data']}")app.loop.create_task(reader())@app.on_event("shutdown")
async def shutdown_event():redis.close()await redis.wait_closed()
🧩 代码解析
- Redis 发布:通过
redis.publish()
方法,消息发布到指定的频道channel:1
,这些消息随后会被其他 Redis 客户端订阅。 - Redis 订阅:
pubsub.listen()
用于订阅 Redis 频道上的消息,收到消息后可以处理或转发给 WebSocket 客户端。
🚀 运行示例
启动 Redis 服务器并运行上述代码,客户端通过 WebSocket 连接,可以实时接收 Redis 频道中的推送消息。此示例展示了如何利用 Redis 高效的发布/订阅机制来实现实时消息推送,结合 WebSocket 实现真正的全双工通信。
相关文章:
高级实时通信:基于 Python 的 WebSocket 实现与异步推送解决方案
高级实时通信:基于 Python 的 WebSocket 实现与异步推送解决方案 目录 🟢 WebSocket 协议概述🔵 在 FastAPI 中实现 WebSocket🟣 Django Channels 实现异步实时通信🔴 使用 Redis 实现实时推送 🟢 1. WebS…...
大二上学期详细学习计划
本学习完成目标: 项目: 书籍:《mysql必知必会》《java核心技术卷》(暂时)加强JavaSE的学习,掌握Java核心Mysqlsql(把牛客上的那50道sql语句题写完)gitmaven完成springboot项目&…...

Kafka【十四】生产者发送消息时的消息分区策略
【1】分区策略 Kafka中Topic是对数据逻辑上的分类,而Partition才是数据真正存储的物理位置。所以在生产数据时,如果只是指定Topic的名称,其实Kafka是不知道将数据发送到哪一个Broker节点的。我们可以在构建数据传递Topic参数的同时ÿ…...
SQL优化:执行计划详细分析
视频讲解:SQL优化:SQL执行计划详细分析_哔哩哔哩_bilibili 1.1 执行计划详解 id select_type table partitions type possible_keys key key_len ref rows filtered Extra 1.1.1 ID 【概…...

Android Studio -> Android Studio 获取release模式和debug模式的APK
Android Studio上鼠标修改构建类型 Release版本 激活路径:More tool windows->Build Variants->Active Build Variant->releaseAPK路径:Project\app\build\intermediates\apk\app-release.apk Debug版本 激活路径:More tool w…...

基于 SpringBoot 的实习管理系统
专业团队,咨询送免费开题报告,大家可以来留言。 摘 要 随着信息化时代的到来,管理系统都趋向于智能化、系统化,实习管理也不例外,但目前国内仍都使用人工管理,市场规模越来越大,同时信息量也越…...
vmware workstation 17 linux版
链接: https://pan.baidu.com/s/1F3kpNEi_2GZW0FHUO-8p-g?pwd6666 提取码: 6666 1 先安装虚拟机 不管什么错误 先安装vmware workstation 17 2 编译 覆盖安装vmware-host-modules-workstation-17.5.1 只需这样就可以 # sudo apt install dkms build-essential bc iw…...

Windows环境本地部署Oracle 19c及卸载实操手册
前言: 一直在做其他测试,貌似都忘了Windows环境oracle 19c的部署,这是一个很早很早的安装记录了,放上来做个备录给到大家参考。 Oracle 19c:进一步增强了自动化功能,并提供了更好的性能和安全性。这个版本在自动化、性能和安全性方面进行了重大改进,以满足现代企业对数…...

MapStruct介绍
一、MapStruct 1.1何为MapStruct 要说这个东西,其实和我们刚刚讲到的Lombok相类似。其是由我们的源代码加上MapStruct经过编译后得到.class文件,文件中自动补全了代码。那么补全了什么代码?实现了什么功能? MapStruct的产生&…...

35天学习小结
距离上次纪念日,已经过去了35天咯 算算也有5周了,在这一个月里,收获的也挺多,在这个过程中认识的大佬也是越来越多了hh 学到的东西,其实也没有很多,这个暑假多多少少还是有遗憾的~ 第一周 学习了一些有…...

【iOS】UIViewController的生命周期
UIViewController的生命周期 文章目录 UIViewController的生命周期前言UIViewController的一个结构UIViewController的函数的执行顺序运行代码viewWillAppear && viewDidAppear多个视图控制器跳转时的生命周期pushpresent 小结 前言 之前对于有关于UIViewControlller的…...

ELK在Linux服务器下使用docker快速部署(超详细)
ELK是什么? 首先说说什么是ELK ELK 是一个开源的日志管理和分析平台,由三个主要组件组成: Elasticsearch:一个分布式搜索和分析引擎,能够快速存储、搜索和分析大量数据。它是 ELK 堆栈的核心,负责数据的…...

unity导入半透明webm + AE合成半透明视频
有些webm的文件导入unity后无法正常播报,踩坑好久才知道需要webm中的:VP8 标准 现在手上有几条mp4双通道的视频,当然unity中有插件是可以支持这种视频的,为了省事和代码洁癖,毅然决然要webm走到黑。 mp4导入AE合成半透…...

力扣: 四数相加II
文章目录 需求代码结尾 需求 给你四个整数数组 nums1、nums2、nums3 和 nums4 ,数组长度都是 n ,请你计算有多少个元组 (i, j, k, l) 能满足: 0 < i, j, k, l < n nums1[i] nums2[j] nums3[k] nums4[l] 0 示例 1: 输入…...
径向基函数神经网络RBFNN案例实操
简介 (来自ChatGPT的介绍,如有更正建议请指出) 径向基函数神经网络(Radial Basis Function Neural Network, RBFNN)是一种特殊的前馈神经网络,其结构和特点与其他常见的神经网络有所不同,主要表现在以下几个方面: 网络结构三层结构:RBF神经网络通常由三层组成:输入层…...

Java-数据结构-二叉树-习题(一) (✪ω✪)
文本目录: ❄️一、习题一(检查两颗树是否相同): ▶ 思路: ▶ 代码: ❄️二、习题二(另一棵树的子树): ▶ 思路: ▶ 代码: ❄️三、习题三(翻转二叉树): ▶ 思路: ▶ 代…...
js 时间戳转日期格式
timestampToDate(obj.project_time), import moment from “moment”; const timestampToDate (timestamp: any) > { const date new Date(timestamp * 1000); const newDate moment(date).format(“YYYY-MM-DD”); return newDate; // 使用Intl.DateTimeFormat进行格式…...
基于人工智能的自动驾驶系统项目教学指南
自动驾驶系统是人工智能的一个核心应用领域,涉及多个学科的交叉:从计算机视觉、深度学习、传感器融合到控制系统,自动驾驶项目可以提供高度的挑战性和实践意义。在这篇文章中,我们将构建一个基于深度学习的自动驾驶系统的简化版本…...

[Linux#49][UDP] 2w字详解 | socketaddr | 常用API | 实操:实现简易Udp传输
目录 套接字地址结构(sockaddr) 1.Socket API 2.sockaddr结构 3. sockaddr、sockaddr_in 和 sockaddr_un 的关系 sockaddr 结构体 sockaddr_in 结构体(IPv4 套接字地址) sockaddr_un 结构体(Unix域套接字地址&a…...

期权组合策略有什么风险?期权组合策略是什么?
今天期权懂带你了解期权组合策略有什么风险?期权组合策略是什么?期权组合策略是通过结合不同期权合约(如看涨期权和看跌期权),以及标的资产(如股票)来实现特定投资目标的策略。 期权组合策略市…...

linux之kylin系统nginx的安装
一、nginx的作用 1.可做高性能的web服务器 直接处理静态资源(HTML/CSS/图片等),响应速度远超传统服务器类似apache支持高并发连接 2.反向代理服务器 隐藏后端服务器IP地址,提高安全性 3.负载均衡服务器 支持多种策略分发流量…...

C++_核心编程_多态案例二-制作饮品
#include <iostream> #include <string> using namespace std;/*制作饮品的大致流程为:煮水 - 冲泡 - 倒入杯中 - 加入辅料 利用多态技术实现本案例,提供抽象制作饮品基类,提供子类制作咖啡和茶叶*//*基类*/ class AbstractDr…...
DeepSeek 赋能智慧能源:微电网优化调度的智能革新路径
目录 一、智慧能源微电网优化调度概述1.1 智慧能源微电网概念1.2 优化调度的重要性1.3 目前面临的挑战 二、DeepSeek 技术探秘2.1 DeepSeek 技术原理2.2 DeepSeek 独特优势2.3 DeepSeek 在 AI 领域地位 三、DeepSeek 在微电网优化调度中的应用剖析3.1 数据处理与分析3.2 预测与…...

Day131 | 灵神 | 回溯算法 | 子集型 子集
Day131 | 灵神 | 回溯算法 | 子集型 子集 78.子集 78. 子集 - 力扣(LeetCode) 思路: 笔者写过很多次这道题了,不想写题解了,大家看灵神讲解吧 回溯算法套路①子集型回溯【基础算法精讲 14】_哔哩哔哩_bilibili 完…...
1688商品列表API与其他数据源的对接思路
将1688商品列表API与其他数据源对接时,需结合业务场景设计数据流转链路,重点关注数据格式兼容性、接口调用频率控制及数据一致性维护。以下是具体对接思路及关键技术点: 一、核心对接场景与目标 商品数据同步 场景:将1688商品信息…...
sqlserver 根据指定字符 解析拼接字符串
DECLARE LotNo NVARCHAR(50)A,B,C DECLARE xml XML ( SELECT <x> REPLACE(LotNo, ,, </x><x>) </x> ) DECLARE ErrorCode NVARCHAR(50) -- 提取 XML 中的值 SELECT value x.value(., VARCHAR(MAX))…...
Spring AI与Spring Modulith核心技术解析
Spring AI核心架构解析 Spring AI(https://spring.io/projects/spring-ai)作为Spring生态中的AI集成框架,其核心设计理念是通过模块化架构降低AI应用的开发复杂度。与Python生态中的LangChain/LlamaIndex等工具类似,但特别为多语…...

Android 之 kotlin 语言学习笔记三(Kotlin-Java 互操作)
参考官方文档:https://developer.android.google.cn/kotlin/interop?hlzh-cn 一、Java(供 Kotlin 使用) 1、不得使用硬关键字 不要使用 Kotlin 的任何硬关键字作为方法的名称 或字段。允许使用 Kotlin 的软关键字、修饰符关键字和特殊标识…...
laravel8+vue3.0+element-plus搭建方法
创建 laravel8 项目 composer create-project --prefer-dist laravel/laravel laravel8 8.* 安装 laravel/ui composer require laravel/ui 修改 package.json 文件 "devDependencies": {"vue/compiler-sfc": "^3.0.7","axios": …...

Yolov8 目标检测蒸馏学习记录
yolov8系列模型蒸馏基本流程,代码下载:这里本人提交了一个demo:djdll/Yolov8_Distillation: Yolov8轻量化_蒸馏代码实现 在轻量化模型设计中,**知识蒸馏(Knowledge Distillation)**被广泛应用,作为提升模型…...