fastapi实现websocket在线聊天
最近要实现一个在线聊天功能,基于fastapi的websocket实现了这个功能。下面介绍一下遇到的技术问题
1.问题难点
在线上环境部署时,一般是多进程的方式进行部署启动fastapi服务,而每个启动的进程都有自己的独立存储空间。导致存储的连接对象分布在不同的进程中,当进行通信时,可能无法找到已连接的连接对象。
2.解决方案
使用使用redis的订阅发布机制,使所有的进程都能进行消息订阅。这样能保证每个进程收到消息后都会进行相关的信息处理了。
3.方案设计
- 每个进程启动的时候都进行一个消息的订阅。
- 通过http请求,进行消息发布。
- 每个进程收到发布的消息后,进行判断是否由自己进行处理。
4.代码实现
①在服务启动时,进行消息订阅,并一直监听消息通道。当有消息发布时,进行消息处理。
# 初始化app
app = FastAPI(title="Ws Chat", description="测试", version="1.0.0")
app.openapi_version = "3.0.0"app.include_router(chat.app, prefix='/api/chat', tags=['Chat'])@app.on_event('startup')
async def on_startup():print(f"订阅初始化:{os.getpid()}")# 执行消息订阅机制https://aioredis.readthedocs.io/en/latest/examples/loop = asyncio.get_event_loop()loop.create_task(register_pubsub())async def reader(channel):# 进行消息的消费async for msg in channel.listen(): # 监听通道# print(msg)msg_data = msg.get("data")if msg_data and isinstance(msg_data, str):msg_data_dict = json.loads(msg_data)print(f"chat:{msg_data_dict}")sender = msg_data_dict.get("sender")# 进行消息处理await chat.cm.handle_websocket_message(msg_data_dict, sender)async def register_pubsub():pool = aioredis.from_url("redis://{}".format(host), db=db, password=password, port=port, encoding="utf-8", decode_responses=True)psub = pool.pubsub()async with psub as p:# 消息订阅await p.subscribe("chat")await reader(p)await p.unsubscribe("chat")
②websocket处理类
from fastapi import WebSocket, WebSocketDisconnectclass ConnectionManager:def __init__(self):# 保存当前所有的链接的websocket对象self.websocket_connections = {}async def connect(self, websocket: WebSocket, client_id):# 添加连接并发送欢迎消息await websocket.accept()self.websocket_connections[client_id] = websocketawait websocket.send_json({"type": "system","msg": "Welcome to the chat app!","sender": "system","recipient": client_id})try:# 处理消息while True:# 获取信息message = await websocket.receive_json()# 处理发送信息await self.handle_websocket_message(message, client_id)except WebSocketDisconnect:# 连接断开时移除连接del self.websocket_connections[client_id]async def handle_websocket_message(self, message: dict, client_id):# 处理私聊消息if message.get("type") == "private_message":recipient = message.get("recipient")msg = message.get("msg")recipient_conn = self.websocket_connections.get(recipient)if recipient_conn:# 在线await recipient_conn.send_json({"type": "private_message","sender": client_id,"msg": msg,"recipient": recipient})async def broadcast(self, message: dict):# 循环变量给所有在线激活的链接发送消息-全局广播for connection in self.websocket_connections:await connection.send_text(message)async def close(self, websocket: WebSocket, client_id):# 断开客户端的链接await websocket.close()del self.websocket_connections[client_id]async def disconnect(self, user_id):websocket: WebSocket = self.websocket_connections[user_id]await websocket.close()del self.websocket_connections[user_id]
③websocket连接
from app.chat_manager.server import ConnectionManagercm = ConnectionManager()@app.websocket("/connect_chat")
async def connect_chat(websocket: WebSocket, user_code: str):try:await cm.connect(websocket, user_code)except WebSocketDisconnect:# 连接断开时移除连接del cm.websocket_connections[user_code]
④http请求进行消息发布
@app.post("/create_chat", summary="发起聊天")
async def create_chat(param: DiagnosisChatSch, r=Depends(get_redis)):""""""ws_param = {"type": "private_message","msg": param.msg,"sender": param.sender,"recipient": param.recipient}# 进行消息发布await r.publish('diagnosis_chat', json.dumps(ws_param))return {'code': 200, 'msg': '成功', 'data': ''}
5.源码
github源码地址:https://github.com/zhangyukuo/fastapi_ws_chat
6.参考文章
https://www.cnblogs.com/a00ium/p/16931133.html
相关文章:
fastapi实现websocket在线聊天
最近要实现一个在线聊天功能,基于fastapi的websocket实现了这个功能。下面介绍一下遇到的技术问题 1.问题难点 在线上环境部署时,一般是多进程的方式进行部署启动fastapi服务,而每个启动的进程都有自己的独立存储空间。导致存储的连接对象分…...
LLM推理部署(六):TogetherAI推出世界上LLM最快推理引擎,性能超过vLLM和TGI三倍
LLM能有多快?答案在于LLM推理的最新突破。 TogetherAI声称,他们在CUDA上构建了世界上最快的LLM推理引擎,该引擎运行在NVIDIA Tensor Core GPU上。Together推理引擎可以支持100多个开源大模型,比如Llama-2,并在Llama-2–…...
Unity | 渡鸦避难所-2 | 搭建场景并添加碰撞器
1 规范项目结构 上期中在导入一系列的商店资源包后,Assets 目录已经变的混乱不堪 开发过程中,随着资源不断更新,遵循一定的项目结构和设计规范是非常必要的。这可以增加项目的可读性、维护性、扩展性以及提高团队协作效率 这里先做下简单的…...
展望2024年供应链安全
2023年是开展供应链安全,尤其是开源治理如火如荼的一年,开源治理是供应链安全最重要的一个方面,所以我们从开源治理谈起。我们先回顾一下2023的开源治理情况。我们从信通院《2023年中国企业开源治理全景观察》发布的信息。信通院调研了来自七…...
React 列表页实现
一、介绍 列表页是常用的功能,从后端获取列表数据,刷新到页面上。开发列表页需要考虑以下技术要点:1.如何翻页;2.如何进行内容搜索;3.何时进行页面刷新。 二、使用教程 1.user-service 根据用户id获取用户列表,返回…...
【程序人生】还记得当初自己为什么选择计算机?
✏️ 初识计算机: 还记得人生中第一次接触计算机编程是在高中,第一门编程语言是Python(很可惜由于条件限制的原因,当时没能坚持学下去......现在想来有点后悔,没能坚持,唉......)。但是…...
9-tornado-Template优化方法、个人信息案例、tornado中ORM的使用(peewee的使用、peewee_async)、WTForms的使用
在很多情况下,前端模板中在很多页面有都重复的内容可以使用,比如页头、页尾、甚至中间的内容都有可能重复。这时,为了提高开发效率,我们就可以考虑在共同的部分提取出来, 主要方法有如下: 1. 模板继承 2. U…...
IDEA中.java .class .jar的含义与联系
当使用IntelliJ IDEA这样的集成开发环境进行Java编程时,通常涉及.java源代码文件、.class编译后的字节码文件以及.jar可执行的Java存档文件。 1. .java 文件: 1.这些文件包含了Java源代码,以文本形式编写。它们通常位于项目中的源代码目录中…...
北斗三号短报文森林消防应急通信及天通野外图传综合方案
森林火灾突发性强、破坏性大、危险性高,是全球发生最频繁、处置最困难、危害最严重的自然灾害之一,是生态文明建设成果和森林资源安全的最大威胁,甚至可能引发生态灾难和社会危机。我国总体上是一个缺林少绿、生态脆弱的国家,是一…...
js Array.every()的使用
2023.12.13今天我学习了如何使用Array.every()的使用,这个方法是用于检测数组中所有存在的元素。 比如我们需要判断这个数组里面的全部元素是否都包含张三,可以这样写: let demo [{id: 1, name: 张三}, {id: 2, name: 张三五}, {id: 3, name…...
前端编码中快速填充内容--乱数假文
写前端页面的时候,如果要快速插入图片,可以使用 https://picsum.photos/ 详见笔者这篇博文: 工具网站:随机生成图片的网站-CSDN博客 可是,如果要快速填充文字内容该怎么做呢? 以前,我们都是…...
数据结构二维数组计算题,以行为主?以列为主?
1.假设以行序为主序存储二维数组Aarray[1..100,1..100],设每个数据元素占2个存储单元,基地址为10,则LOC[5,5]( )。 A.808 B.818 C.1010 D&…...
springboot(ssm电影院订票信息管理系统 影院购票系统Java系统
springboot(ssm电影院订票信息管理系统 影院购票系统Java系统 开发语言:Java 框架:ssm/springboot vue JDK版本:JDK1.8(或11) 服务器:tomcat 数据库:mysql 5.7(或8.0࿰…...
AI 问答-供应链管理-相关概念:SCM、SRM、MDM、DMS、ERP、OBS、CRM、WMS...
一、供应链管理是什么 供应链管理:理解供应链管理_snowli的博客-CSDN博客 二、SCM 供应链管理 SCM全称为“Supply Chain Management”,即供应链管理。 SCM是企业管理范畴中一个非常重要的概念,指的是企业与供应商、生产商、分销商等各方之…...
初学vue3与ts:vue3选项式api获取当前路由地址
vue2的获取方法 this.$route.pathvue3选项式api获取方法 import { useRouter } from vue-router; const router useRouter(); console.log(router) console.log(router.currentRoute.value.path)...
2023最新大模型实验室解决方案
人工智能是引领未来的新兴战略性技术,是驱动新一轮科技革命和产业变革的重要力量。近年来,人工智能相关技术持续演进,产业化和商业化进程不断提速,正在加快与千行百业深度融合。 大模型实验室架构图 大模型实验室建设内容 一、课…...
leetcode707.设计链表
题目描述 你可以选择使用单链表或者双链表,设计并实现自己的链表。 单链表中的节点应该具备两个属性:val 和 next 。val 是当前节点的值,next 是指向下一个节点的指针/引用。 如果是双向链表,则还需要属性 prev 以指示链表中的…...
【K8s】Kubernetes CRD 介绍(控制器)
文章目录 CRD 概述1. 操作CRD1.1 创建 CRD1.2 操作 CRD 2. 其他笔记2.1 Kubectl 发现机制2.2 校验 CR2.3 简称和属性 3. 架构设计3.1 控制器概览 参考 CRD 概述 CR(Custom Resource)其实就是在 Kubernetes 中定义一个自己的资源类型,是一个具…...
Python 小程序之PDF文档加解密
PDF文档的加密和解密 文章目录 PDF文档的加密和解密前言一、总体构思二、使用到的库三、PDF文档的加密1.用户输入模块2.打开并读取文档数据3.遍历保存数据到新文档4.新文档进行加密5.新文档命名生成路径6.保存新加密的文档 四、PDF文档的解密1.用户输入模块2.前提准备2.文件解密…...
使用python脚本一个简单的搭建ansible集群
1.环境说明: 角色主机名ip地址控制主机server192.168.174.150受控主机/被管节点client1192.168.174.151受控主机/被管节点client2192.168.174.152 2.安装python和pip包 yum install -y epel-release yum install -y python python-pip 3.pip安装依赖库 pip in…...
RocketMQ延迟消息机制
两种延迟消息 RocketMQ中提供了两种延迟消息机制 指定固定的延迟级别 通过在Message中设定一个MessageDelayLevel参数,对应18个预设的延迟级别指定时间点的延迟级别 通过在Message中设定一个DeliverTimeMS指定一个Long类型表示的具体时间点。到了时间点后…...
【JavaEE】-- HTTP
1. HTTP是什么? HTTP(全称为"超文本传输协议")是一种应用非常广泛的应用层协议,HTTP是基于TCP协议的一种应用层协议。 应用层协议:是计算机网络协议栈中最高层的协议,它定义了运行在不同主机上…...
shell脚本--常见案例
1、自动备份文件或目录 2、批量重命名文件 3、查找并删除指定名称的文件: 4、批量删除文件 5、查找并替换文件内容 6、批量创建文件 7、创建文件夹并移动文件 8、在文件夹中查找文件...
visual studio 2022更改主题为深色
visual studio 2022更改主题为深色 点击visual studio 上方的 工具-> 选项 在选项窗口中,选择 环境 -> 常规 ,将其中的颜色主题改成深色 点击确定,更改完成...
iPhone密码忘记了办?iPhoneUnlocker,iPhone解锁工具Aiseesoft iPhone Unlocker 高级注册版分享
平时用 iPhone 的时候,难免会碰到解锁的麻烦事。比如密码忘了、人脸识别 / 指纹识别突然不灵,或者买了二手 iPhone 却被原来的 iCloud 账号锁住,这时候就需要靠谱的解锁工具来帮忙了。Aiseesoft iPhone Unlocker 就是专门解决这些问题的软件&…...
相机从app启动流程
一、流程框架图 二、具体流程分析 1、得到cameralist和对应的静态信息 目录如下: 重点代码分析: 启动相机前,先要通过getCameraIdList获取camera的个数以及id,然后可以通过getCameraCharacteristics获取对应id camera的capabilities(静态信息)进行一些openCamera前的…...
Linux --进程控制
本文从以下五个方面来初步认识进程控制: 目录 进程创建 进程终止 进程等待 进程替换 模拟实现一个微型shell 进程创建 在Linux系统中我们可以在一个进程使用系统调用fork()来创建子进程,创建出来的进程就是子进程,原来的进程为父进程。…...
Mysql中select查询语句的执行过程
目录 1、介绍 1.1、组件介绍 1.2、Sql执行顺序 2、执行流程 2.1. 连接与认证 2.2. 查询缓存 2.3. 语法解析(Parser) 2.4、执行sql 1. 预处理(Preprocessor) 2. 查询优化器(Optimizer) 3. 执行器…...
云原生安全实战:API网关Kong的鉴权与限流详解
🔥「炎码工坊」技术弹药已装填! 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 一、基础概念 1. API网关(API Gateway) API网关是微服务架构中的核心组件,负责统一管理所有API的流量入口。它像一座…...
华为OD机考-机房布局
import java.util.*;public class DemoTest5 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseSystem.out.println(solve(in.nextLine()));}}priv…...
