高级 Python Web 开发:基于 FastAPI 构建高效实时聊天系统与并发控制
高级 Python Web 开发:基于 FastAPI 构建高效实时聊天系统与并发控制
目录
- 🌐 WebSocket 实时通讯概述
- 💬 FastAPI 中实现 WebSocket 聊天系统
- 🔧 WebSocket 并发控制与性能优化
- 🔒 WebSocket 安全性与认证机制
- 🚀 WebSocket 在生产环境的优化与部署
1. 🌐 WebSocket 实时通讯概述
WebSocket 是一种网络协议,旨在提供在客户端与服务器之间持久化、全双工的通信通道。与传统的 HTTP 请求-响应模型不同,WebSocket 允许客户端和服务器之间建立一个持续的连接,这使得它非常适合用于构建实时应用,如聊天系统、在线游戏和实时数据流等。
在传统的 HTTP 通信中,每次请求都需要重新建立连接,而 WebSocket 通过单一的握手过程就可以保持连接的持久性。这种技术可以显著减少延迟,使得数据交换更为迅速与高效。特别是在需要实时交互的应用场景中,WebSocket 可以显著提升用户体验。
FastAPI 是一个现代化、快速(高性能)的 Web 框架,支持构建高效的 RESTful API,同时也具备了对 WebSocket 协议的原生支持。通过 FastAPI,我们可以快速构建一个强大且高效的实时通讯系统。接下来将详细讲解如何在 FastAPI 中实现 WebSocket 协议并构建一个简单的聊天系统。
2. 💬 FastAPI 中实现 WebSocket 聊天系统
WebSocket 连接管理
在构建聊天系统时,我们需要处理多个用户之间的消息传递。为了支持多用户之间的消息广播,我们需要管理每个 WebSocket 连接,并确保消息能够正确地从发送者转发到接收者。
在 FastAPI 中,WebSocket 连接的管理非常简单。通过 WebSocket 对象,可以进行连接的建立、接收和发送消息。下面是一个简单的 WebSocket 聊天系统的实现:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Listapp = FastAPI()# 存储所有的 WebSocket 连接
active_connections: List[WebSocket] = []# 处理 WebSocket 连接
@app.websocket("/chat/{username}")
async def chat(websocket: WebSocket, username: str):# 等待连接await websocket.accept()active_connections.append(websocket)try:while True:# 接收客户端消息message = await websocket.receive_text()# 广播消息给所有连接for connection in active_connections:if connection != websocket:await connection.send_text(f"{username}: {message}")except WebSocketDisconnect:# 断开连接时移除 WebSocketactive_connections.remove(websocket)await websocket.close()
代码解析
- 连接管理:通过
active_connections列表保存当前所有活跃的 WebSocket 连接。每当一个新的客户端连接时,系统会将其 WebSocket 对象添加到这个列表中。 - 接受连接:
websocket.accept()允许客户端与服务器建立 WebSocket 连接。 - 消息接收与广播:通过
await websocket.receive_text()来接收客户端发送的文本消息。然后,系统将该消息广播给所有其他连接的客户端(除了发送者)。 - 断开连接处理:当客户端断开连接时,抛出
WebSocketDisconnect异常,系统会从active_connections列表中移除该 WebSocket 对象,并关闭连接。
通过这种方式,聊天系统能够支持多个客户端之间的实时消息传递。
3. 🔧 WebSocket 并发控制与性能优化
在实际应用中,尤其是高并发的场景下,如何有效地管理 WebSocket 连接并确保系统的高性能是至关重要的。虽然 FastAPI 本身就具备了异步处理能力,但仍然需要一些策略来优化性能,处理大量并发连接。
并发管理
- 异步操作:FastAPI 利用 Python 的异步 I/O(
asyncio)模型,可以在不阻塞的情况下高效处理多个并发的 WebSocket 连接。每个 WebSocket 的处理都是独立的异步任务,不会阻塞其他连接。 - 连接数控制:在高并发情况下,可以根据需要限制最大连接数,防止过多连接带来的资源耗尽。
MAX_CONNECTIONS = 1000 # 最大连接数限制@app.websocket("/chat/{username}")
async def chat(websocket: WebSocket, username: str):if len(active_connections) >= MAX_CONNECTIONS:await websocket.close(code=1000) # 关闭新连接returnawait websocket.accept()active_connections.append(websocket)...
- 心跳检测:为了确保连接的健康性,可以定期发送心跳包,检测客户端是否仍然在线。如果客户端断开连接但没有显式关闭 WebSocket,心跳包可以帮助检测并及时清理无效连接。
import asyncioasync def send_heartbeat(websocket: WebSocket):while True:await asyncio.sleep(30) # 每30秒发送一次心跳await websocket.send_text("heartbeat")
性能优化
- 消息压缩:在大量消息交换的场景下,采用压缩算法(如 gzip)可以减少数据传输量,提高性能。
- 负载均衡:在高并发的情况下,可以采用负载均衡技术将 WebSocket 请求分发到多个 FastAPI 实例上,从而分担流量压力。
4. 🔒 WebSocket 安全性与认证机制
在实时通讯系统中,安全性是一个不可忽视的问题,尤其是 WebSocket 连接的认证与授权。以下是常见的安全措施:
WebSocket 身份验证
通过 HTTP 头部或 Cookie 传递认证信息,是一种常见的做法。在 FastAPI 中,可以通过 Depends 依赖注入机制来实现 WebSocket 的身份验证。
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBeareroauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")async def get_current_user(token: str = Depends(oauth2_scheme)):# 校验 token 并获取用户信息user = get_user_from_token(token)if user is None:raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")return user
通过这种方式,每当用户尝试连接 WebSocket 时,可以在连接之前通过 OAuth2 或 JWT token 校验其身份。
数据加密与防篡改
在 WebSocket 数据传输过程中,建议使用 WSS(WebSocket Secure)协议来确保数据传输的加密性。此外,为了防止数据篡改,可以在传输数据时使用 HMAC 等签名机制进行数据验证。
import hmac
import hashlibdef verify_message_signature(message: str, signature: str, secret: str) -> bool:computed_signature = hmac.new(secret.encode(), message.encode(), hashlib.sha256).hexdigest()return hmac.compare_digest(computed_signature, signature)
5. 🚀 WebSocket 在生产环境的优化与部署
在生产环境中,WebSocket 服务需要处理更多的请求,并且通常需要与其他微服务进行协作。为了确保 WebSocket 服务的高效运行,下面是一些常见的优化措施:
服务部署
- 水平扩展:通过多个 FastAPI 实例并行运行 WebSocket 服务,可以有效地分担负载。
- WebSocket 与负载均衡器:在大规模部署时,采用支持 WebSocket 的负载均衡器(如 NGINX、HAProxy)可以将客户端请求合理地分发到多个 FastAPI 实例。
持久化与状态管理
对于聊天系统,用户的聊天记录和会话状态可能需要持久化。可以使用 Redis 或数据库来存储聊天历史记录,并且可以使用 Redis 进行会话的共享。
import redisr = redis.Redis(host='localhost', port=6379, db=0)def store_message(message: str, channel: str):r.publish(channel, message) # 将消息存入 Redis
高可用性与容错
为了保证 WebSocket 服务的高可用性,采用分布式系统设计、故障转移机制(如 Kubernetes 自动扩容、健康检查等)是必要的。
通过这些部署和优化策略,可以确保 WebSocket 服务在生产环境中具有良好的性能和稳定性。
相关文章:
高级 Python Web 开发:基于 FastAPI 构建高效实时聊天系统与并发控制
高级 Python Web 开发:基于 FastAPI 构建高效实时聊天系统与并发控制 目录 🌐 WebSocket 实时通讯概述💬 FastAPI 中实现 WebSocket 聊天系统🔧 WebSocket 并发控制与性能优化🔒 WebSocket 安全性与认证机制…...
深入理解Java虚拟机(JVM)
JVM概述 JVM作用 java虚拟机负责装载字节码到其内部,解释/编译为对应平台上的机器码指令执行,通俗说就是将字节码转换为机器码 JVM内部构造 1、类加载部分:负责把硬盘上的字节码加载到内存中(运行时数据区) 2、运…...
笔试面试——逻辑题
1.n从1开始,每个操作可以选择对n加1或者对n加倍,若想获得整数2014,最少需要多少个操作。 2.一个池塘,养龙虾若干,请想一个办法尽量准确的估算其中有多少龙虾? 3. S先生,P先生,Q先生他们知道桌子…...
【深度学习入门实战】基于Keras的手写数字识别实战(附完整可视化分析)
本人主页:机器学习司猫白 ok,话不多说,我们进入正题吧 项目概述 本案例使用经典的MNIST手写数字数据集,通过Keras构建全连接神经网络,实现0-9数字的分类识别。文章将包含: 关键概念图解完整实现代码训练过程可视化模型效果深度分析环境准备 import numpy as np impo…...
软考高级《系统架构设计师》知识点(一)
计算机硬件 校验码 码距:就单个编码A:00而言,其码距为1,因为其只需要改变一位就变成另一个编码。在两个编码中,从A码到B码转换所需要改变的位数称为码距,如A:00要转换为B:11,码距为2。一般来说,…...
用大模型学大模型01-制定学习计划
提示词:我想学习大模型,需要AI制定一个完整的学习计划,并给出学习路径和学习资料。以教科书目录的方式给出学习路线 第1章:数学与编程基础(4-6周) 1.1 数学基础 线性代数(矩阵运算、特征值分…...
lvs的DR模式
基于Linux的负载均衡集群软件 LVS 全称为Linux Virtual Server,是一款开源的四层(传输层)负载均衡软件 Nginx 支持四层和七层(应用层)负载均衡 HAProxy 和Nginx一样,也可同时支持四层和七层(应用层)负载均衡 基于Linux的高可用集群软件 Keepalived Keepalived是Linux…...
mysql读写分离与proxysql的结合
上一篇文章介绍了mysql如何设置成主从复制模式,而主从复制的目的,是为了读写分离。 读写分离,拿spring boot项目来说,可以有2种方式: 1)设置2个数据源,读和写分开使用 2)使用中间件…...
【C++学习篇】C++11第二期学习
目录 1. 可变参数模板 1.1 基本语法及原理 1.2 包扩展 1.3empalce系列接⼝ 2. lamba 2.1 lambda的语法表达式 2.2 捕捉列表 2.3 lamba的原理 1. 可变参数模板 1.1 基本语法及原理 1. C11⽀持可变参数模板,也就是说⽀持可变数量参数的函数模板和类模板&…...
TextWebSocketHandler 和 @ServerEndpoint 各自实现 WebSocket 服务器
TextWebSocketHandler 和 ServerEndpoint 都可以用于实现 WebSocket 服务器,但它们属于不同的技术栈,使用方式和功能有一些区别。以下是它们的对比: 1. 技术栈对比 特性TextWebSocketHandler (Spring)ServerEndpoint (Java EE/JSR-356)所属框…...
【C++高并发服务器WebServer】-18:事件处理模式与线程池
本文目录 一、事件处理模式1.1 Reactor模式1.2 Proactor模式1.3 同步IO模拟Proactor模式 二、线程池 一、事件处理模式 服务器程序通常需要处理三类事件:I/O事件、信号、定时事件。 对应的有两种高效的事件处理模式:Reactor和Proactor,同步…...
23种设计模式的定义和应用场景-02-结构型模式-C#代码
23种设计模式的定义和应用场景: 1. 创建型模式(共5种): 单例模式(Singleton)、工厂方法模式(Factory Method)、抽象工厂模式(Abstract Factory)、建造者模式…...
数据脱敏方案总结
什么是数据脱敏 数据脱敏的定义 数据脱敏百度百科中是这样定义的: 数据脱敏,指对某些敏感信息通过脱敏规则进行数据的变形,实现敏感隐私数据的可靠保护。这样就可以在开发、测试和其它非生产环境以及外包环境中安全地使用脱敏后的真实数据集…...
自然语言处理NLP入门 -- 第二节预处理文本数据
在自然语言处理(NLP)中,数据的质量直接影响模型的表现。文本预处理的目标是清理和标准化文本数据,使其适合机器学习或深度学习模型处理。本章介绍几种常见的文本预处理方法,并通过 Python 代码进行示例。 2.1 文本清理…...
02.10 TCP之文件传输
1.思维导图 2.作业 服务器代码: #include <stdio.h> #include <string.h> #include <unistd.h> #include <stdlib.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <pthread.h> …...
基于STM32的ADS1230驱动例程
自己在练手项目中用到了ADS1230,根据芯片手册自写的驱动代码,已测可用,希望对将要用到ADS1230芯片的人有所帮助。 芯片:STM32系列任意芯片、ADS1230 环境:使用STM32CubeMX配置引脚、KEIL 部分电路: 代码…...
Bro想要玩github api
Bro想要在vscode 和 rest client插件的帮助下,修改我的github个人信息 ### 先安装REST client插件 ### 文件名test-github.http ### bro需要自己在github develop setting 获得token ### ref link: https://docs.github.com/en/authentication/keeping-your-accoun…...
idea插件开发,如何获取idea设置的系统语言
手打不易,如果转摘,请注明出处! 注明原文:https://zhangxiaofan.blog.csdn.net/article/details/145578160 版本要求 大于 2024.3 错误用法 网上有的说使用:UIUtil com.intellij.util.ui.UIUtil 代码示例…...
怎麼使用靜態住宅IP進行多社媒帳號管理
隨著社交媒體平臺的多樣化,很多人發現一個社媒帳號已經無法滿足需求。以下是幾個常見場景: 企業需求:企業可能需要在不同平臺上運營多個品牌帳號,為每個市場地區單獨設立帳號。個人需求:一些自由職業者或內容創作者可…...
InfiniBand与IP over InfiniBand(IPOIB):实现高性能网络通信的底层机制
在现代高性能计算(HPC)和数据中心环境中,网络通信的效率和性能至关重要。InfiniBand(IB)作为一种高性能的串行计算机总线架构,以其低延迟、高带宽和高可靠性而广泛应用于集群计算和数据中心。IP over InfiniBand(IPOIB)则是在InfiniBand网络上实现IP协议的一种方式,它…...
应用升级/灾备测试时使用guarantee 闪回点迅速回退
1.场景 应用要升级,当升级失败时,数据库回退到升级前. 要测试系统,测试完成后,数据库要回退到测试前。 相对于RMAN恢复需要很长时间, 数据库闪回只需要几分钟。 2.技术实现 数据库设置 2个db_recovery参数 创建guarantee闪回点,不需要开启数据库闪回。…...
高等数学(下)题型笔记(八)空间解析几何与向量代数
目录 0 前言 1 向量的点乘 1.1 基本公式 1.2 例题 2 向量的叉乘 2.1 基础知识 2.2 例题 3 空间平面方程 3.1 基础知识 3.2 例题 4 空间直线方程 4.1 基础知识 4.2 例题 5 旋转曲面及其方程 5.1 基础知识 5.2 例题 6 空间曲面的法线与切平面 6.1 基础知识 6.2…...
ElasticSearch搜索引擎之倒排索引及其底层算法
文章目录 一、搜索引擎1、什么是搜索引擎?2、搜索引擎的分类3、常用的搜索引擎4、搜索引擎的特点二、倒排索引1、简介2、为什么倒排索引不用B+树1.创建时间长,文件大。2.其次,树深,IO次数可怕。3.索引可能会失效。4.精准度差。三. 倒排索引四、算法1、Term Index的算法2、 …...
【服务器压力测试】本地PC电脑作为服务器运行时出现卡顿和资源紧张(Windows/Linux)
要让本地PC电脑作为服务器运行时出现卡顿和资源紧张的情况,可以通过以下几种方式模拟或触发: 1. 增加CPU负载 运行大量计算密集型任务,例如: 使用多线程循环执行复杂计算(如数学运算、加密解密等)。运行图…...
数据库分批入库
今天在工作中,遇到一个问题,就是分批查询的时候,由于批次过大导致出现了一些问题,一下是问题描述和解决方案: 示例: // 假设已有数据列表 dataList 和 PreparedStatement pstmt int batchSize 1000; // …...
Java入门学习详细版(一)
大家好,Java 学习是一个系统学习的过程,核心原则就是“理论 实践 坚持”,并且需循序渐进,不可过于着急,本篇文章推出的这份详细入门学习资料将带大家从零基础开始,逐步掌握 Java 的核心概念和编程技能。 …...
图表类系列各种样式PPT模版分享
图标图表系列PPT模版,柱状图PPT模版,线状图PPT模版,折线图PPT模版,饼状图PPT模版,雷达图PPT模版,树状图PPT模版 图表类系列各种样式PPT模版分享:图表系列PPT模板https://pan.quark.cn/s/20d40aa…...
RNN避坑指南:从数学推导到LSTM/GRU工业级部署实战流程
本文较长,建议点赞收藏,以免遗失。更多AI大模型应用开发学习视频及资料,尽在聚客AI学院。 本文全面剖析RNN核心原理,深入讲解梯度消失/爆炸问题,并通过LSTM/GRU结构实现解决方案,提供时间序列预测和文本生成…...
浪潮交换机配置track检测实现高速公路收费网络主备切换NQA
浪潮交换机track配置 项目背景高速网络拓扑网络情况分析通信线路收费网络路由 收费汇聚交换机相应配置收费汇聚track配置 项目背景 在实施省内一条高速公路时遇到的需求,本次涉及的主要是收费汇聚交换机的配置,浪潮网络设备在高速项目很少,通…...
Qemu arm操作系统开发环境
使用qemu虚拟arm硬件比较合适。 步骤如下: 安装qemu apt install qemu-system安装aarch64-none-elf-gcc 需要手动下载,下载地址:https://developer.arm.com/-/media/Files/downloads/gnu/13.2.rel1/binrel/arm-gnu-toolchain-13.2.rel1-x…...
