当前位置: 首页 > news >正文

fastapi实现websocket在线聊天

最近要实现一个在线聊天功能,基于fastapi的websocket实现了这个功能。下面介绍一下遇到的技术问题

1.问题难点

在线上环境部署时,一般是多进程的方式进行部署启动fastapi服务,而每个启动的进程都有自己的独立存储空间。导致存储的连接对象分布在不同的进程中,当进行通信时,可能无法找到已连接的连接对象。

2.解决方案

使用使用redis的订阅发布机制,使所有的进程都能进行消息订阅。这样能保证每个进程收到消息后都会进行相关的信息处理了。

3.方案设计

  • 每个进程启动的时候都进行一个消息的订阅。
  • 通过http请求,进行消息发布。
  • 每个进程收到发布的消息后,进行判断是否由自己进行处理。

4.代码实现

①在服务启动时,进行消息订阅,并一直监听消息通道。当有消息发布时,进行消息处理。

# 初始化app
app = FastAPI(title="Ws Chat", description="测试", version="1.0.0")
app.openapi_version = "3.0.0"app.include_router(chat.app, prefix='/api/chat', tags=['Chat'])@app.on_event('startup')
async def on_startup():print(f"订阅初始化:{os.getpid()}")# 执行消息订阅机制https://aioredis.readthedocs.io/en/latest/examples/loop = asyncio.get_event_loop()loop.create_task(register_pubsub())async def reader(channel):# 进行消息的消费async for msg in channel.listen():  # 监听通道# print(msg)msg_data = msg.get("data")if msg_data and isinstance(msg_data, str):msg_data_dict = json.loads(msg_data)print(f"chat:{msg_data_dict}")sender = msg_data_dict.get("sender")# 进行消息处理await chat.cm.handle_websocket_message(msg_data_dict, sender)async def register_pubsub():pool = aioredis.from_url("redis://{}".format(host), db=db, password=password, port=port, encoding="utf-8", decode_responses=True)psub = pool.pubsub()async with psub as p:# 消息订阅await p.subscribe("chat")await reader(p)await p.unsubscribe("chat")

②websocket处理类

from fastapi import WebSocket, WebSocketDisconnectclass ConnectionManager:def __init__(self):# 保存当前所有的链接的websocket对象self.websocket_connections = {}async def connect(self, websocket: WebSocket, client_id):# 添加连接并发送欢迎消息await websocket.accept()self.websocket_connections[client_id] = websocketawait websocket.send_json({"type": "system","msg": "Welcome to the chat app!","sender": "system","recipient": client_id})try:# 处理消息while True:# 获取信息message = await websocket.receive_json()# 处理发送信息await self.handle_websocket_message(message, client_id)except WebSocketDisconnect:# 连接断开时移除连接del self.websocket_connections[client_id]async def handle_websocket_message(self, message: dict, client_id):# 处理私聊消息if message.get("type") == "private_message":recipient = message.get("recipient")msg = message.get("msg")recipient_conn = self.websocket_connections.get(recipient)if recipient_conn:# 在线await recipient_conn.send_json({"type": "private_message","sender": client_id,"msg": msg,"recipient": recipient})async def broadcast(self, message: dict):# 循环变量给所有在线激活的链接发送消息-全局广播for connection in self.websocket_connections:await connection.send_text(message)async def close(self, websocket: WebSocket, client_id):# 断开客户端的链接await websocket.close()del self.websocket_connections[client_id]async def disconnect(self, user_id):websocket: WebSocket = self.websocket_connections[user_id]await websocket.close()del self.websocket_connections[user_id]

③websocket连接

from app.chat_manager.server import ConnectionManagercm = ConnectionManager()@app.websocket("/connect_chat")
async def connect_chat(websocket: WebSocket, user_code: str):try:await cm.connect(websocket, user_code)except WebSocketDisconnect:# 连接断开时移除连接del cm.websocket_connections[user_code]

④http请求进行消息发布

@app.post("/create_chat", summary="发起聊天")
async def create_chat(param: DiagnosisChatSch, r=Depends(get_redis)):""""""ws_param = {"type": "private_message","msg": param.msg,"sender": param.sender,"recipient": param.recipient}# 进行消息发布await r.publish('diagnosis_chat', json.dumps(ws_param))return {'code': 200, 'msg': '成功', 'data': ''}

5.源码

github源码地址:https://github.com/zhangyukuo/fastapi_ws_chat

6.参考文章

https://www.cnblogs.com/a00ium/p/16931133.html

相关文章:

fastapi实现websocket在线聊天

最近要实现一个在线聊天功能,基于fastapi的websocket实现了这个功能。下面介绍一下遇到的技术问题 1.问题难点 在线上环境部署时,一般是多进程的方式进行部署启动fastapi服务,而每个启动的进程都有自己的独立存储空间。导致存储的连接对象分…...

LLM推理部署(六):TogetherAI推出世界上LLM最快推理引擎,性能超过vLLM和TGI三倍

LLM能有多快?答案在于LLM推理的最新突破。 TogetherAI声称,他们在CUDA上构建了世界上最快的LLM推理引擎,该引擎运行在NVIDIA Tensor Core GPU上。Together推理引擎可以支持100多个开源大模型,比如Llama-2,并在Llama-2–…...

Unity | 渡鸦避难所-2 | 搭建场景并添加碰撞器

1 规范项目结构 上期中在导入一系列的商店资源包后,Assets 目录已经变的混乱不堪 开发过程中,随着资源不断更新,遵循一定的项目结构和设计规范是非常必要的。这可以增加项目的可读性、维护性、扩展性以及提高团队协作效率 这里先做下简单的…...

展望2024年供应链安全

2023年是开展供应链安全,尤其是开源治理如火如荼的一年,开源治理是供应链安全最重要的一个方面,所以我们从开源治理谈起。我们先回顾一下2023的开源治理情况。我们从信通院《2023年中国企业开源治理全景观察》发布的信息。信通院调研了来自七…...

React 列表页实现

一、介绍 列表页是常用的功能,从后端获取列表数据,刷新到页面上。开发列表页需要考虑以下技术要点:1.如何翻页;2.如何进行内容搜索;3.何时进行页面刷新。 二、使用教程 1.user-service 根据用户id获取用户列表,返回…...

【程序人生】还记得当初自己为什么选择计算机?

✏️ 初识计算机: 还记得人生中第一次接触计算机编程是在高中,第一门编程语言是Python(很可惜由于条件限制的原因,当时没能坚持学下去......现在想来有点后悔,没能坚持,唉......)。但是&#xf…...

9-tornado-Template优化方法、个人信息案例、tornado中ORM的使用(peewee的使用、peewee_async)、WTForms的使用

在很多情况下,前端模板中在很多页面有都重复的内容可以使用,比如页头、页尾、甚至中间的内容都有可能重复。这时,为了提高开发效率,我们就可以考虑在共同的部分提取出来, 主要方法有如下: 1. 模板继承 2. U…...

IDEA中.java .class .jar的含义与联系

当使用IntelliJ IDEA这样的集成开发环境进行Java编程时,通常涉及.java源代码文件、.class编译后的字节码文件以及.jar可执行的Java存档文件。 1. .java 文件: 1.这些文件包含了Java源代码,以文本形式编写。它们通常位于项目中的源代码目录中…...

北斗三号短报文森林消防应急通信及天通野外图传综合方案

森林火灾突发性强、破坏性大、危险性高,是全球发生最频繁、处置最困难、危害最严重的自然灾害之一,是生态文明建设成果和森林资源安全的最大威胁,甚至可能引发生态灾难和社会危机。我国总体上是一个缺林少绿、生态脆弱的国家,是一…...

js Array.every()的使用

2023.12.13今天我学习了如何使用Array.every()的使用,这个方法是用于检测数组中所有存在的元素。 比如我们需要判断这个数组里面的全部元素是否都包含张三,可以这样写: let demo [{id: 1, name: 张三}, {id: 2, name: 张三五}, {id: 3, name…...

前端编码中快速填充内容--乱数假文

写前端页面的时候,如果要快速插入图片,可以使用 https://picsum.photos/ 详见笔者这篇博文: 工具网站:随机生成图片的网站-CSDN博客 可是,如果要快速填充文字内容该怎么做呢? 以前,我们都是…...

数据结构二维数组计算题,以行为主?以列为主?

1.假设以行序为主序存储二维数组Aarray[1..100,1..100],设每个数据元素占2个存储单元,基地址为10,则LOC[5,5]( )。 A.808 B.818 C.1010 D&…...

springboot(ssm电影院订票信息管理系统 影院购票系统Java系统

springboot(ssm电影院订票信息管理系统 影院购票系统Java系统 开发语言:Java 框架:ssm/springboot vue JDK版本:JDK1.8(或11) 服务器:tomcat 数据库:mysql 5.7(或8.0&#xff0…...

AI 问答-供应链管理-相关概念:SCM、SRM、MDM、DMS、ERP、OBS、CRM、WMS...

一、供应链管理是什么 供应链管理:理解供应链管理_snowli的博客-CSDN博客 二、SCM 供应链管理 SCM全称为“Supply Chain Management”,即供应链管理。 SCM是企业管理范畴中一个非常重要的概念,指的是企业与供应商、生产商、分销商等各方之…...

初学vue3与ts:vue3选项式api获取当前路由地址

vue2的获取方法 this.$route.pathvue3选项式api获取方法 import { useRouter } from vue-router; const router useRouter(); console.log(router) console.log(router.currentRoute.value.path)...

2023最新大模型实验室解决方案

人工智能是引领未来的新兴战略性技术,是驱动新一轮科技革命和产业变革的重要力量。近年来,人工智能相关技术持续演进,产业化和商业化进程不断提速,正在加快与千行百业深度融合。 大模型实验室架构图 大模型实验室建设内容 一、课…...

leetcode707.设计链表

题目描述 你可以选择使用单链表或者双链表,设计并实现自己的链表。 单链表中的节点应该具备两个属性:val 和 next 。val 是当前节点的值,next 是指向下一个节点的指针/引用。 如果是双向链表,则还需要属性 prev 以指示链表中的…...

【K8s】Kubernetes CRD 介绍(控制器)

文章目录 CRD 概述1. 操作CRD1.1 创建 CRD1.2 操作 CRD 2. 其他笔记2.1 Kubectl 发现机制2.2 校验 CR2.3 简称和属性 3. 架构设计3.1 控制器概览 参考 CRD 概述 CR(Custom Resource)其实就是在 Kubernetes 中定义一个自己的资源类型,是一个具…...

Python 小程序之PDF文档加解密

PDF文档的加密和解密 文章目录 PDF文档的加密和解密前言一、总体构思二、使用到的库三、PDF文档的加密1.用户输入模块2.打开并读取文档数据3.遍历保存数据到新文档4.新文档进行加密5.新文档命名生成路径6.保存新加密的文档 四、PDF文档的解密1.用户输入模块2.前提准备2.文件解密…...

使用python脚本一个简单的搭建ansible集群

1.环境说明: 角色主机名ip地址控制主机server192.168.174.150受控主机/被管节点client1192.168.174.151受控主机/被管节点client2192.168.174.152 2.安装python和pip包 yum install -y epel-release yum install -y python python-pip 3.pip安装依赖库 pip in…...

Phi-4-mini-reasoning效果展示:中文长文本多跳推理与隐含前提挖掘

Phi-4-mini-reasoning效果展示:中文长文本多跳推理与隐含前提挖掘 1. 模型核心能力概览 Phi-4-mini-reasoning是一款专注于推理任务的文本生成模型,在数学推导、逻辑分析和多步推理等场景下展现出独特优势。与通用聊天模型不同,它专为"…...

EVA-01效果展示:多场景图文问答案例,看AI如何精准识别与深度分析

EVA-01效果展示:多场景图文问答案例,看AI如何精准识别与深度分析 1. 视觉神经同步系统初体验 当你第一次打开EVA-01视觉神经同步系统,最直观的感受就是它独特的"暴走白昼"界面设计。与传统AI工具常见的深色背景不同,这…...

OpenClaw对话日志分析:Qwen3-14B挖掘用户真实需求

OpenClaw对话日志分析:Qwen3-14B挖掘用户真实需求 1. 为什么需要分析对话日志? 作为一个长期使用OpenClaw的开发者,我发现自己陷入了一个典型的技术陷阱:花大量时间开发新功能,却很少回头审视用户实际如何使用这些功…...

【Polars 2.0企业级数据清洗黄金法则】:5大生产环境避坑指南+实测性能提升3.7倍基准报告

第一章:Polars 2.0企业级数据清洗黄金法则总览Polars 2.0 以零拷贝语义、并行执行引擎与原生 Arrow 内存布局为核心,重构了企业级数据清洗的性能边界与工程可靠性。其惰性 API 与 eager 模式无缝协同,使复杂清洗流水线既可交互调试&#xff0…...

寒冬降临:当资本撤出AI测试赛道

2026年初,全球资本市场对AI技术的狂热投资骤然降温。随着VC基金转向更保守的资产配置,依赖融资的AI测试工具开发商面临生存危机:初创公司批量裁员,开源项目停止维护,企业采购的智能测试平台因无法续约沦为“断线木偶”…...

向量化计算落地难?揭秘阿里/腾讯内部正在用的7个Java Vector API高危避坑场景

第一章:Java Vector API向量化计算落地的现实困境Java Vector API(JEP 338、414、426、448)虽在JDK 16起逐步成熟,但实际工程化部署仍面临多重结构性约束。其核心矛盾在于:API设计高度抽象,而底层硬件适配、…...

Agent在零售行业能解决哪些痛点?——深度解析零售企业智能自动化转型路径

在2026年零售行业加速迈向智能化的背景下,AI Agent(人工智能智能体)已不再仅仅是技术实验室的产物,而是演变为重构行业价值链的核心驱动力。传统的零售运营长期受困于人力密集型模式,面临着全球化运营复杂度高、数据孤…...

01_第一篇:到底什么是嵌入式芯片?与通用CPU_GPU_DSP的核心区别

嵌入式芯片入门:到底什么是嵌入式芯片?与通用CPU/GPU/DSP的核心区别 引言:智能时代的核心基石,嵌入式芯片的无处不在 在万物互联的智能时代,我们的生活早已被无数“隐形大脑”环绕:清晨唤醒你的智能手环、出…...

2026届必备的五大AI辅助论文助手实际效果

Ai论文网站排名(开题报告、文献综述、降aigc率、降重综合对比) TOP1. 千笔AI TOP2. aipasspaper TOP3. 清北论文 TOP4. 豆包 TOP5. kimi TOP6. deepseek 基于大语言模型与自然语言处理技术的 AI 写作软件,是内容生产领域新兴工具&…...

Phi-4-mini-reasoning助力Java安装与环境配置:从JDK到IDE的智能指引

Phi-4-mini-reasoning助力Java安装与环境配置:从JDK到IDE的智能指引 1. 为什么需要智能指引来安装Java? 刚接触Java开发的朋友们,十有八九会在环境配置这一步卡壳。我见过太多初学者在JDK版本选择、环境变量配置这些环节反复折腾&#xff0…...