fastapi实现websocket在线聊天
最近要实现一个在线聊天功能,基于fastapi的websocket实现了这个功能。下面介绍一下遇到的技术问题
1.问题难点
在线上环境部署时,一般是多进程的方式进行部署启动fastapi服务,而每个启动的进程都有自己的独立存储空间。导致存储的连接对象分布在不同的进程中,当进行通信时,可能无法找到已连接的连接对象。
2.解决方案
使用使用redis的订阅发布机制,使所有的进程都能进行消息订阅。这样能保证每个进程收到消息后都会进行相关的信息处理了。
3.方案设计
- 每个进程启动的时候都进行一个消息的订阅。
- 通过http请求,进行消息发布。
- 每个进程收到发布的消息后,进行判断是否由自己进行处理。
4.代码实现
①在服务启动时,进行消息订阅,并一直监听消息通道。当有消息发布时,进行消息处理。
# 初始化app
app = FastAPI(title="Ws Chat", description="测试", version="1.0.0")
app.openapi_version = "3.0.0"app.include_router(chat.app, prefix='/api/chat', tags=['Chat'])@app.on_event('startup')
async def on_startup():print(f"订阅初始化:{os.getpid()}")# 执行消息订阅机制https://aioredis.readthedocs.io/en/latest/examples/loop = asyncio.get_event_loop()loop.create_task(register_pubsub())async def reader(channel):# 进行消息的消费async for msg in channel.listen(): # 监听通道# print(msg)msg_data = msg.get("data")if msg_data and isinstance(msg_data, str):msg_data_dict = json.loads(msg_data)print(f"chat:{msg_data_dict}")sender = msg_data_dict.get("sender")# 进行消息处理await chat.cm.handle_websocket_message(msg_data_dict, sender)async def register_pubsub():pool = aioredis.from_url("redis://{}".format(host), db=db, password=password, port=port, encoding="utf-8", decode_responses=True)psub = pool.pubsub()async with psub as p:# 消息订阅await p.subscribe("chat")await reader(p)await p.unsubscribe("chat")
②websocket处理类
from fastapi import WebSocket, WebSocketDisconnectclass ConnectionManager:def __init__(self):# 保存当前所有的链接的websocket对象self.websocket_connections = {}async def connect(self, websocket: WebSocket, client_id):# 添加连接并发送欢迎消息await websocket.accept()self.websocket_connections[client_id] = websocketawait websocket.send_json({"type": "system","msg": "Welcome to the chat app!","sender": "system","recipient": client_id})try:# 处理消息while True:# 获取信息message = await websocket.receive_json()# 处理发送信息await self.handle_websocket_message(message, client_id)except WebSocketDisconnect:# 连接断开时移除连接del self.websocket_connections[client_id]async def handle_websocket_message(self, message: dict, client_id):# 处理私聊消息if message.get("type") == "private_message":recipient = message.get("recipient")msg = message.get("msg")recipient_conn = self.websocket_connections.get(recipient)if recipient_conn:# 在线await recipient_conn.send_json({"type": "private_message","sender": client_id,"msg": msg,"recipient": recipient})async def broadcast(self, message: dict):# 循环变量给所有在线激活的链接发送消息-全局广播for connection in self.websocket_connections:await connection.send_text(message)async def close(self, websocket: WebSocket, client_id):# 断开客户端的链接await websocket.close()del self.websocket_connections[client_id]async def disconnect(self, user_id):websocket: WebSocket = self.websocket_connections[user_id]await websocket.close()del self.websocket_connections[user_id]
③websocket连接
from app.chat_manager.server import ConnectionManagercm = ConnectionManager()@app.websocket("/connect_chat")
async def connect_chat(websocket: WebSocket, user_code: str):try:await cm.connect(websocket, user_code)except WebSocketDisconnect:# 连接断开时移除连接del cm.websocket_connections[user_code]
④http请求进行消息发布
@app.post("/create_chat", summary="发起聊天")
async def create_chat(param: DiagnosisChatSch, r=Depends(get_redis)):""""""ws_param = {"type": "private_message","msg": param.msg,"sender": param.sender,"recipient": param.recipient}# 进行消息发布await r.publish('diagnosis_chat', json.dumps(ws_param))return {'code': 200, 'msg': '成功', 'data': ''}
5.源码
github源码地址:https://github.com/zhangyukuo/fastapi_ws_chat
6.参考文章
https://www.cnblogs.com/a00ium/p/16931133.html
相关文章:
fastapi实现websocket在线聊天
最近要实现一个在线聊天功能,基于fastapi的websocket实现了这个功能。下面介绍一下遇到的技术问题 1.问题难点 在线上环境部署时,一般是多进程的方式进行部署启动fastapi服务,而每个启动的进程都有自己的独立存储空间。导致存储的连接对象分…...

LLM推理部署(六):TogetherAI推出世界上LLM最快推理引擎,性能超过vLLM和TGI三倍
LLM能有多快?答案在于LLM推理的最新突破。 TogetherAI声称,他们在CUDA上构建了世界上最快的LLM推理引擎,该引擎运行在NVIDIA Tensor Core GPU上。Together推理引擎可以支持100多个开源大模型,比如Llama-2,并在Llama-2–…...

Unity | 渡鸦避难所-2 | 搭建场景并添加碰撞器
1 规范项目结构 上期中在导入一系列的商店资源包后,Assets 目录已经变的混乱不堪 开发过程中,随着资源不断更新,遵循一定的项目结构和设计规范是非常必要的。这可以增加项目的可读性、维护性、扩展性以及提高团队协作效率 这里先做下简单的…...

展望2024年供应链安全
2023年是开展供应链安全,尤其是开源治理如火如荼的一年,开源治理是供应链安全最重要的一个方面,所以我们从开源治理谈起。我们先回顾一下2023的开源治理情况。我们从信通院《2023年中国企业开源治理全景观察》发布的信息。信通院调研了来自七…...
React 列表页实现
一、介绍 列表页是常用的功能,从后端获取列表数据,刷新到页面上。开发列表页需要考虑以下技术要点:1.如何翻页;2.如何进行内容搜索;3.何时进行页面刷新。 二、使用教程 1.user-service 根据用户id获取用户列表,返回…...

【程序人生】还记得当初自己为什么选择计算机?
✏️ 初识计算机: 还记得人生中第一次接触计算机编程是在高中,第一门编程语言是Python(很可惜由于条件限制的原因,当时没能坚持学下去......现在想来有点后悔,没能坚持,唉......)。但是…...
9-tornado-Template优化方法、个人信息案例、tornado中ORM的使用(peewee的使用、peewee_async)、WTForms的使用
在很多情况下,前端模板中在很多页面有都重复的内容可以使用,比如页头、页尾、甚至中间的内容都有可能重复。这时,为了提高开发效率,我们就可以考虑在共同的部分提取出来, 主要方法有如下: 1. 模板继承 2. U…...
IDEA中.java .class .jar的含义与联系
当使用IntelliJ IDEA这样的集成开发环境进行Java编程时,通常涉及.java源代码文件、.class编译后的字节码文件以及.jar可执行的Java存档文件。 1. .java 文件: 1.这些文件包含了Java源代码,以文本形式编写。它们通常位于项目中的源代码目录中…...

北斗三号短报文森林消防应急通信及天通野外图传综合方案
森林火灾突发性强、破坏性大、危险性高,是全球发生最频繁、处置最困难、危害最严重的自然灾害之一,是生态文明建设成果和森林资源安全的最大威胁,甚至可能引发生态灾难和社会危机。我国总体上是一个缺林少绿、生态脆弱的国家,是一…...

js Array.every()的使用
2023.12.13今天我学习了如何使用Array.every()的使用,这个方法是用于检测数组中所有存在的元素。 比如我们需要判断这个数组里面的全部元素是否都包含张三,可以这样写: let demo [{id: 1, name: 张三}, {id: 2, name: 张三五}, {id: 3, name…...

前端编码中快速填充内容--乱数假文
写前端页面的时候,如果要快速插入图片,可以使用 https://picsum.photos/ 详见笔者这篇博文: 工具网站:随机生成图片的网站-CSDN博客 可是,如果要快速填充文字内容该怎么做呢? 以前,我们都是…...

数据结构二维数组计算题,以行为主?以列为主?
1.假设以行序为主序存储二维数组Aarray[1..100,1..100],设每个数据元素占2个存储单元,基地址为10,则LOC[5,5]( )。 A.808 B.818 C.1010 D&…...
springboot(ssm电影院订票信息管理系统 影院购票系统Java系统
springboot(ssm电影院订票信息管理系统 影院购票系统Java系统 开发语言:Java 框架:ssm/springboot vue JDK版本:JDK1.8(或11) 服务器:tomcat 数据库:mysql 5.7(或8.0࿰…...
AI 问答-供应链管理-相关概念:SCM、SRM、MDM、DMS、ERP、OBS、CRM、WMS...
一、供应链管理是什么 供应链管理:理解供应链管理_snowli的博客-CSDN博客 二、SCM 供应链管理 SCM全称为“Supply Chain Management”,即供应链管理。 SCM是企业管理范畴中一个非常重要的概念,指的是企业与供应商、生产商、分销商等各方之…...

初学vue3与ts:vue3选项式api获取当前路由地址
vue2的获取方法 this.$route.pathvue3选项式api获取方法 import { useRouter } from vue-router; const router useRouter(); console.log(router) console.log(router.currentRoute.value.path)...

2023最新大模型实验室解决方案
人工智能是引领未来的新兴战略性技术,是驱动新一轮科技革命和产业变革的重要力量。近年来,人工智能相关技术持续演进,产业化和商业化进程不断提速,正在加快与千行百业深度融合。 大模型实验室架构图 大模型实验室建设内容 一、课…...
leetcode707.设计链表
题目描述 你可以选择使用单链表或者双链表,设计并实现自己的链表。 单链表中的节点应该具备两个属性:val 和 next 。val 是当前节点的值,next 是指向下一个节点的指针/引用。 如果是双向链表,则还需要属性 prev 以指示链表中的…...

【K8s】Kubernetes CRD 介绍(控制器)
文章目录 CRD 概述1. 操作CRD1.1 创建 CRD1.2 操作 CRD 2. 其他笔记2.1 Kubectl 发现机制2.2 校验 CR2.3 简称和属性 3. 架构设计3.1 控制器概览 参考 CRD 概述 CR(Custom Resource)其实就是在 Kubernetes 中定义一个自己的资源类型,是一个具…...

Python 小程序之PDF文档加解密
PDF文档的加密和解密 文章目录 PDF文档的加密和解密前言一、总体构思二、使用到的库三、PDF文档的加密1.用户输入模块2.打开并读取文档数据3.遍历保存数据到新文档4.新文档进行加密5.新文档命名生成路径6.保存新加密的文档 四、PDF文档的解密1.用户输入模块2.前提准备2.文件解密…...
使用python脚本一个简单的搭建ansible集群
1.环境说明: 角色主机名ip地址控制主机server192.168.174.150受控主机/被管节点client1192.168.174.151受控主机/被管节点client2192.168.174.152 2.安装python和pip包 yum install -y epel-release yum install -y python python-pip 3.pip安装依赖库 pip in…...

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明
LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造,完美适配AGV和无人叉车。同时,集成以太网与语音合成技术,为各类高级系统(如MES、调度系统、库位管理、立库等)提供高效便捷的语音交互体验。 L…...

docker详细操作--未完待续
docker介绍 docker官网: Docker:加速容器应用程序开发 harbor官网:Harbor - Harbor 中文 使用docker加速器: Docker镜像极速下载服务 - 毫秒镜像 是什么 Docker 是一种开源的容器化平台,用于将应用程序及其依赖项(如库、运行时环…...

Vue3 + Element Plus + TypeScript中el-transfer穿梭框组件使用详解及示例
使用详解 Element Plus 的 el-transfer 组件是一个强大的穿梭框组件,常用于在两个集合之间进行数据转移,如权限分配、数据选择等场景。下面我将详细介绍其用法并提供一个完整示例。 核心特性与用法 基本属性 v-model:绑定右侧列表的值&…...
【SpringBoot】100、SpringBoot中使用自定义注解+AOP实现参数自动解密
在实际项目中,用户注册、登录、修改密码等操作,都涉及到参数传输安全问题。所以我们需要在前端对账户、密码等敏感信息加密传输,在后端接收到数据后能自动解密。 1、引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId...
2024年赣州旅游投资集团社会招聘笔试真
2024年赣州旅游投资集团社会招聘笔试真 题 ( 满 分 1 0 0 分 时 间 1 2 0 分 钟 ) 一、单选题(每题只有一个正确答案,答错、不答或多答均不得分) 1.纪要的特点不包括()。 A.概括重点 B.指导传达 C. 客观纪实 D.有言必录 【答案】: D 2.1864年,()预言了电磁波的存在,并指出…...
【git】把本地更改提交远程新分支feature_g
创建并切换新分支 git checkout -b feature_g 添加并提交更改 git add . git commit -m “实现图片上传功能” 推送到远程 git push -u origin feature_g...

令牌桶 滑动窗口->限流 分布式信号量->限并发的原理 lua脚本分析介绍
文章目录 前言限流限制并发的实际理解限流令牌桶代码实现结果分析令牌桶lua的模拟实现原理总结: 滑动窗口代码实现结果分析lua脚本原理解析 限并发分布式信号量代码实现结果分析lua脚本实现原理 双注解去实现限流 并发结果分析: 实际业务去理解体会统一注…...
汇编常见指令
汇编常见指令 一、数据传送指令 指令功能示例说明MOV数据传送MOV EAX, 10将立即数 10 送入 EAXMOV [EBX], EAX将 EAX 值存入 EBX 指向的内存LEA加载有效地址LEA EAX, [EBX4]将 EBX4 的地址存入 EAX(不访问内存)XCHG交换数据XCHG EAX, EBX交换 EAX 和 EB…...

多模态大语言模型arxiv论文略读(108)
CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文标题:CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文作者:Sayna Ebrahimi, Sercan O. Arik, Tejas Nama, Tomas Pfister ➡️ 研究机构: Google Cloud AI Re…...
2023赣州旅游投资集团
单选题 1.“不登高山,不知天之高也;不临深溪,不知地之厚也。”这句话说明_____。 A、人的意识具有创造性 B、人的认识是独立于实践之外的 C、实践在认识过程中具有决定作用 D、人的一切知识都是从直接经验中获得的 参考答案: C 本题解…...