高级实时通信:基于 Python 的 WebSocket 实现与异步推送解决方案
高级实时通信:基于 Python 的 WebSocket 实现与异步推送解决方案
目录
- 🟢 WebSocket 协议概述
- 🔵 在 FastAPI 中实现 WebSocket
- 🟣 Django Channels 实现异步实时通信
- 🔴 使用 Redis 实现实时推送
🟢 1. WebSocket 协议概述
WebSocket 是 HTML5 规范中提出的一种新协议,旨在实现客户端与服务器之间的全双工通信。与传统的 HTTP 请求/响应模型不同,WebSocket 协议允许持久的、双向的连接,客户端和服务器可以在任意时刻相互发送数据,而无需重新发起 HTTP 请求。WebSocket 可以减少通信延迟和网络资源消耗,特别适用于实时应用程序,如聊天系统、股票交易、游戏、以及其他需要即时数据更新的场景。
🧩 WebSocket 的工作原理
WebSocket 的工作流程可以简单概括如下:
- 初始握手:客户端通过 HTTP 发起 WebSocket 连接请求,服务器通过特殊的响应头
Upgrade升级协议,建立连接。 - 持久连接:一旦连接建立,客户端和服务器之间的通信通道就保持打开,直到一方主动关闭连接。双方可以同时发送消息,消息是基于帧的传输方式。
- 双向通信:与传统的 HTTP 轮询不同,WebSocket 是全双工的,意味着服务器和客户端可以随时相互发送数据。消息可以是文本帧或者二进制帧,且通过 WebSocket 协议传输的消息有较小的开销,适合频繁的小数据传输场景。
⚡️ WebSocket 和 HTTP 的区别
- 连接方式:HTTP 是短连接,每次请求/响应后连接断开,而 WebSocket 是持久化连接,只有在连接关闭时才断开。
- 通信方向:HTTP 是客户端发起请求,服务器响应;WebSocket 支持双向通信,客户端和服务器都可以主动发送消息。
- 协议开销:WebSocket 建立连接时会使用 HTTP 协议握手,但后续通信数据帧头信息非常小,开销比 HTTP 低。
WebSocket 的这些特性使其成为实时应用开发中的重要工具。
🔵 2. 在 FastAPI 中实现 WebSocket
FastAPI 是一个现代的、快速的 Web 框架,提供了对 WebSocket 支持的简便实现。FastAPI 内部集成了 ASGI(Asynchronous Server Gateway Interface),这意味着它能够轻松处理异步通信任务,比如 WebSocket。以下是如何在 FastAPI 中实现 WebSocket 通信的详细步骤。
📌 FastAPI 中的 WebSocket 代码实现
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Listapp = FastAPI()# 管理多个 WebSocket 连接的管理器
class ConnectionManager:def __init__(self):self.active_connections: List[WebSocket] = []# 添加新连接async def connect(self, websocket: WebSocket):await websocket.accept()self.active_connections.append(websocket)# 移除断开的连接async def disconnect(self, websocket: WebSocket):self.active_connections.remove(websocket)# 向所有连接广播消息async def broadcast(self, message: str):for connection in self.active_connections:await connection.send_text(message)manager = ConnectionManager()@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):await manager.connect(websocket)try:while True:# 接收客户端消息data = await websocket.receive_text()await manager.broadcast(f"Client #{client_id} says: {data}")except WebSocketDisconnect:manager.disconnect(websocket)await manager.broadcast(f"Client #{client_id} disconnected")
🧩 代码解析
- ConnectionManager 类:用于管理 WebSocket 连接。它可以接受新连接、断开旧连接以及广播消息给所有连接的客户端。
- websocket_endpoint:这个路径函数处理 WebSocket 连接。当客户端连接时,它将接收并广播消息给其他连接的客户端。它使用
client_id来标识连接的客户端。 - 广播消息:通过
manager.broadcast()方法,可以将来自某个客户端的消息发送给所有连接的客户端。
🚀 运行示例:
要运行这个示例,可以将代码保存为 main.py,然后使用命令 uvicorn main:app --reload 启动 FastAPI 服务器。连接的客户端可以通过 /ws/{client_id} 连接,并且所有客户端都能实时接收消息。
此示例展示了如何通过 FastAPI 简洁地实现 WebSocket 服务,利用 Python 的异步特性来实现高效的实时通信。
🟣 3. Django Channels 实现异步实时通信
Django 默认是同步框架,但通过 Django Channels,可以为其添加异步功能,尤其是支持 WebSocket 的实时通信功能。Django Channels 通过将请求分配给适当的消费者来处理异步通信,而不需要重新设计整个 Django 应用。以下是如何使用 Django Channels 来实现 WebSocket 通信。
📌 安装 Django Channels
首先,需要安装 Django Channels:
pip install channels
然后,在 settings.py 中添加 Channels 配置:
INSTALLED_APPS = [# 其他应用'channels',
]# 指定 ASGI 应用
ASGI_APPLICATION = "myproject.asgi.application"
在项目根目录创建一个 asgi.py 文件:
import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from myapp import routingos.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')application = ProtocolTypeRouter({"http": get_asgi_application(),"websocket": AuthMiddlewareStack(URLRouter(routing.websocket_urlpatterns)),
})
📌 Django Channels WebSocket 代码实现
创建 consumers.py 来定义 WebSocket 消费者:
import json
from channels.generic.websocket import AsyncWebsocketConsumerclass ChatConsumer(AsyncWebsocketConsumer):async def connect(self):self.room_name = 'chat'await self.channel_layer.group_add(self.room_name, self.channel_name)await self.accept()async def disconnect(self, close_code):await self.channel_layer.group_discard(self.room_name, self.channel_name)async def receive(self, text_data):message = json.loads(text_data)['message']await self.channel_layer.group_send(self.room_name,{'type': 'chat_message','message': message})async def chat_message(self, event):message = event['message']await self.send(text_data=json.dumps({'message': message}))
在 routing.py 中定义 WebSocket 路由:
from django.urls import re_path
from . import consumerswebsocket_urlpatterns = [re_path(r'ws/chat/$', consumers.ChatConsumer.as_asgi()),
]
🧩 代码解析
- ChatConsumer:这是 WebSocket 消费者类,使用
AsyncWebsocketConsumer来处理异步 WebSocket 连接。connect()方法处理客户端连接,disconnect()方法处理断开,receive()用于接收并转发消息。 - channel_layer:Django Channels 使用频道层来在多个 WebSocket 连接之间共享数据。
group_send()方法用于向一个组中的所有 WebSocket 客户端广播消息。
🚀 运行示例
通过配置 Django Channels 后,启动 Django 开发服务器并连接到 /ws/chat/ 端点,即可开始实时通信。此实现展示了如何利用 Django 的扩展实现异步 WebSocket 通信,方便现有 Django 项目无缝添加实时通信功能。
🔴 4. 使用 Redis 实现实时推送
Redis 是一个强大的内存数据存储系统,支持发布/订阅(Pub/Sub)模式,这使得它非常适合实现实时推送功能。通过将 Redis 集成到 WebSocket 应用中,可以轻松地实现高效的实时数据推送服务。以下展示如何使用 Redis 结合 WebSocket 来实现消息的实时推送。
📌 安装 Redis 和依赖
首先,安装 Redis 和 aioredis 以在 Python 中使用异步 Redis 客户端:
pip install aioredis
📌 实现基于 Redis 的 WebSocket 推送
在 WebSocket 服务中,通过 Redis 的发布/订阅模式实现消息推送。
import aioredis
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Listapp = FastAPI()class ConnectionManager:def __init__(self):self.active_connections: List[WebSocket] = []async def connect(self, websocket: WebSocket):await websocket.accept()self.active_connections.append(websocket)async def disconnect(self, websocket: WebSocket):self.active_connections.remove(websocket)async def broadcast(self, message: str):for connection in self.active_connections:await connection.send_text(message)manager = ConnectionManager()@app.on_event("startup")
async def startup_event():global redisredis = await aioredis.create_redis_pool('redis://localhost')@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):await manager.connect(websocket)try:while True:# 接收消息data = await websocket.receive_text()await redis.publish("channel:1", data)except WebSocketDisconnect:manager.disconnect(websocket)@app.on_event("shutdown")
async def shutdown_event():redis.close()await redis.wait_closed()
📌 Redis 订阅消息并推送给 WebSocket 客户端
实现 Redis 消息订阅和推送:
import aioredis
from fastapi import FastAPIapp = FastAPI()@app.on_event("startup")
async def startup_event():redis = await aioredis.create_redis_pool('redis://localhost')pubsub = redis.pubsub()async def reader():await pubsub.subscribe('channel:1')async for message in pubsub.listen():print(f"Received message: {message['data']}")app.loop.create_task(reader())@app.on_event("shutdown")
async def shutdown_event():redis.close()await redis.wait_closed()
🧩 代码解析
- Redis 发布:通过
redis.publish()方法,消息发布到指定的频道channel:1,这些消息随后会被其他 Redis 客户端订阅。 - Redis 订阅:
pubsub.listen()用于订阅 Redis 频道上的消息,收到消息后可以处理或转发给 WebSocket 客户端。
🚀 运行示例
启动 Redis 服务器并运行上述代码,客户端通过 WebSocket 连接,可以实时接收 Redis 频道中的推送消息。此示例展示了如何利用 Redis 高效的发布/订阅机制来实现实时消息推送,结合 WebSocket 实现真正的全双工通信。
相关文章:
高级实时通信:基于 Python 的 WebSocket 实现与异步推送解决方案
高级实时通信:基于 Python 的 WebSocket 实现与异步推送解决方案 目录 🟢 WebSocket 协议概述🔵 在 FastAPI 中实现 WebSocket🟣 Django Channels 实现异步实时通信🔴 使用 Redis 实现实时推送 🟢 1. WebS…...
大二上学期详细学习计划
本学习完成目标: 项目: 书籍:《mysql必知必会》《java核心技术卷》(暂时)加强JavaSE的学习,掌握Java核心Mysqlsql(把牛客上的那50道sql语句题写完)gitmaven完成springboot项目&…...
Kafka【十四】生产者发送消息时的消息分区策略
【1】分区策略 Kafka中Topic是对数据逻辑上的分类,而Partition才是数据真正存储的物理位置。所以在生产数据时,如果只是指定Topic的名称,其实Kafka是不知道将数据发送到哪一个Broker节点的。我们可以在构建数据传递Topic参数的同时ÿ…...
SQL优化:执行计划详细分析
视频讲解:SQL优化:SQL执行计划详细分析_哔哩哔哩_bilibili 1.1 执行计划详解 id select_type table partitions type possible_keys key key_len ref rows filtered Extra 1.1.1 ID 【概…...
Android Studio -> Android Studio 获取release模式和debug模式的APK
Android Studio上鼠标修改构建类型 Release版本 激活路径:More tool windows->Build Variants->Active Build Variant->releaseAPK路径:Project\app\build\intermediates\apk\app-release.apk Debug版本 激活路径:More tool w…...
基于 SpringBoot 的实习管理系统
专业团队,咨询送免费开题报告,大家可以来留言。 摘 要 随着信息化时代的到来,管理系统都趋向于智能化、系统化,实习管理也不例外,但目前国内仍都使用人工管理,市场规模越来越大,同时信息量也越…...
vmware workstation 17 linux版
链接: https://pan.baidu.com/s/1F3kpNEi_2GZW0FHUO-8p-g?pwd6666 提取码: 6666 1 先安装虚拟机 不管什么错误 先安装vmware workstation 17 2 编译 覆盖安装vmware-host-modules-workstation-17.5.1 只需这样就可以 # sudo apt install dkms build-essential bc iw…...
Windows环境本地部署Oracle 19c及卸载实操手册
前言: 一直在做其他测试,貌似都忘了Windows环境oracle 19c的部署,这是一个很早很早的安装记录了,放上来做个备录给到大家参考。 Oracle 19c:进一步增强了自动化功能,并提供了更好的性能和安全性。这个版本在自动化、性能和安全性方面进行了重大改进,以满足现代企业对数…...
MapStruct介绍
一、MapStruct 1.1何为MapStruct 要说这个东西,其实和我们刚刚讲到的Lombok相类似。其是由我们的源代码加上MapStruct经过编译后得到.class文件,文件中自动补全了代码。那么补全了什么代码?实现了什么功能? MapStruct的产生&…...
35天学习小结
距离上次纪念日,已经过去了35天咯 算算也有5周了,在这一个月里,收获的也挺多,在这个过程中认识的大佬也是越来越多了hh 学到的东西,其实也没有很多,这个暑假多多少少还是有遗憾的~ 第一周 学习了一些有…...
【iOS】UIViewController的生命周期
UIViewController的生命周期 文章目录 UIViewController的生命周期前言UIViewController的一个结构UIViewController的函数的执行顺序运行代码viewWillAppear && viewDidAppear多个视图控制器跳转时的生命周期pushpresent 小结 前言 之前对于有关于UIViewControlller的…...
ELK在Linux服务器下使用docker快速部署(超详细)
ELK是什么? 首先说说什么是ELK ELK 是一个开源的日志管理和分析平台,由三个主要组件组成: Elasticsearch:一个分布式搜索和分析引擎,能够快速存储、搜索和分析大量数据。它是 ELK 堆栈的核心,负责数据的…...
unity导入半透明webm + AE合成半透明视频
有些webm的文件导入unity后无法正常播报,踩坑好久才知道需要webm中的:VP8 标准 现在手上有几条mp4双通道的视频,当然unity中有插件是可以支持这种视频的,为了省事和代码洁癖,毅然决然要webm走到黑。 mp4导入AE合成半透…...
力扣: 四数相加II
文章目录 需求代码结尾 需求 给你四个整数数组 nums1、nums2、nums3 和 nums4 ,数组长度都是 n ,请你计算有多少个元组 (i, j, k, l) 能满足: 0 < i, j, k, l < n nums1[i] nums2[j] nums3[k] nums4[l] 0 示例 1: 输入…...
径向基函数神经网络RBFNN案例实操
简介 (来自ChatGPT的介绍,如有更正建议请指出) 径向基函数神经网络(Radial Basis Function Neural Network, RBFNN)是一种特殊的前馈神经网络,其结构和特点与其他常见的神经网络有所不同,主要表现在以下几个方面: 网络结构三层结构:RBF神经网络通常由三层组成:输入层…...
Java-数据结构-二叉树-习题(一) (✪ω✪)
文本目录: ❄️一、习题一(检查两颗树是否相同): ▶ 思路: ▶ 代码: ❄️二、习题二(另一棵树的子树): ▶ 思路: ▶ 代码: ❄️三、习题三(翻转二叉树): ▶ 思路: ▶ 代…...
js 时间戳转日期格式
timestampToDate(obj.project_time), import moment from “moment”; const timestampToDate (timestamp: any) > { const date new Date(timestamp * 1000); const newDate moment(date).format(“YYYY-MM-DD”); return newDate; // 使用Intl.DateTimeFormat进行格式…...
基于人工智能的自动驾驶系统项目教学指南
自动驾驶系统是人工智能的一个核心应用领域,涉及多个学科的交叉:从计算机视觉、深度学习、传感器融合到控制系统,自动驾驶项目可以提供高度的挑战性和实践意义。在这篇文章中,我们将构建一个基于深度学习的自动驾驶系统的简化版本…...
[Linux#49][UDP] 2w字详解 | socketaddr | 常用API | 实操:实现简易Udp传输
目录 套接字地址结构(sockaddr) 1.Socket API 2.sockaddr结构 3. sockaddr、sockaddr_in 和 sockaddr_un 的关系 sockaddr 结构体 sockaddr_in 结构体(IPv4 套接字地址) sockaddr_un 结构体(Unix域套接字地址&a…...
期权组合策略有什么风险?期权组合策略是什么?
今天期权懂带你了解期权组合策略有什么风险?期权组合策略是什么?期权组合策略是通过结合不同期权合约(如看涨期权和看跌期权),以及标的资产(如股票)来实现特定投资目标的策略。 期权组合策略市…...
python打卡day49
知识点回顾: 通道注意力模块复习空间注意力模块CBAM的定义 作业:尝试对今天的模型检查参数数目,并用tensorboard查看训练过程 import torch import torch.nn as nn# 定义通道注意力 class ChannelAttention(nn.Module):def __init__(self,…...
Python实现prophet 理论及参数优化
文章目录 Prophet理论及模型参数介绍Python代码完整实现prophet 添加外部数据进行模型优化 之前初步学习prophet的时候,写过一篇简单实现,后期随着对该模型的深入研究,本次记录涉及到prophet 的公式以及参数调优,从公式可以更直观…...
Spring Boot+Neo4j知识图谱实战:3步搭建智能关系网络!
一、引言 在数据驱动的背景下,知识图谱凭借其高效的信息组织能力,正逐步成为各行业应用的关键技术。本文聚焦 Spring Boot与Neo4j图数据库的技术结合,探讨知识图谱开发的实现细节,帮助读者掌握该技术栈在实际项目中的落地方法。 …...
【HTML-16】深入理解HTML中的块元素与行内元素
HTML元素根据其显示特性可以分为两大类:块元素(Block-level Elements)和行内元素(Inline Elements)。理解这两者的区别对于构建良好的网页布局至关重要。本文将全面解析这两种元素的特性、区别以及实际应用场景。 1. 块元素(Block-level Elements) 1.1 基本特性 …...
AI编程--插件对比分析:CodeRider、GitHub Copilot及其他
AI编程插件对比分析:CodeRider、GitHub Copilot及其他 随着人工智能技术的快速发展,AI编程插件已成为提升开发者生产力的重要工具。CodeRider和GitHub Copilot作为市场上的领先者,分别以其独特的特性和生态系统吸引了大量开发者。本文将从功…...
3-11单元格区域边界定位(End属性)学习笔记
返回一个Range 对象,只读。该对象代表包含源区域的区域上端下端左端右端的最后一个单元格。等同于按键 End 向上键(End(xlUp))、End向下键(End(xlDown))、End向左键(End(xlToLeft)End向右键(End(xlToRight)) 注意:它移动的位置必须是相连的有内容的单元格…...
蓝桥杯 冶炼金属
原题目链接 🔧 冶炼金属转换率推测题解 📜 原题描述 小蓝有一个神奇的炉子用于将普通金属 O O O 冶炼成为一种特殊金属 X X X。这个炉子有一个属性叫转换率 V V V,是一个正整数,表示每 V V V 个普通金属 O O O 可以冶炼出 …...
AI+无人机如何守护濒危物种?YOLOv8实现95%精准识别
【导读】 野生动物监测在理解和保护生态系统中发挥着至关重要的作用。然而,传统的野生动物观察方法往往耗时耗力、成本高昂且范围有限。无人机的出现为野生动物监测提供了有前景的替代方案,能够实现大范围覆盖并远程采集数据。尽管具备这些优势…...
在鸿蒙HarmonyOS 5中使用DevEco Studio实现企业微信功能
1. 开发环境准备 安装DevEco Studio 3.1: 从华为开发者官网下载最新版DevEco Studio安装HarmonyOS 5.0 SDK 项目配置: // module.json5 {"module": {"requestPermissions": [{"name": "ohos.permis…...
掌握 HTTP 请求:理解 cURL GET 语法
cURL 是一个强大的命令行工具,用于发送 HTTP 请求和与 Web 服务器交互。在 Web 开发和测试中,cURL 经常用于发送 GET 请求来获取服务器资源。本文将详细介绍 cURL GET 请求的语法和使用方法。 一、cURL 基本概念 cURL 是 "Client URL" 的缩写…...
