Python websocket
@router.websocket('/chat/{flow_id}')
接口代码,并了解其工作流程、涉及的组件以及如何基于此实现你的新 WebSocket 接口。以下内容将分为几个部分进行讲解:
- 接口整体概述
- 代码逐行解析
- 关键组件和依赖关系
- 如何基于此实现新功能
- 示例:创建一个新的 WebSocket 接口
1. 接口整体概述
@router.websocket('/chat/{flow_id}')
是一个 WebSocket 端点,用于处理实时聊天会话。它的主要职责包括:
- 认证和授权:通过 JWT 令牌验证用户身份。
- 连接管理:建立和管理 WebSocket 连接。
- 消息处理:接收和发送消息,处理聊天逻辑。
- 异常处理:处理连接断开、认证失败和其他异常情况。
2. 代码逐行解析
以下是你提供的 WebSocket 端点代码:
@router.websocket('/chat/{flow_id}')
async def chat(*,flow_id: str,websocket: WebSocket,t: Optional[str] = None,chat_id: Optional[str] = None,version_id: Optional[int] = None,Authorize: AuthJWT = Depends(),
):"""Websocket endpoint for chat."""try:if t:Authorize.jwt_required(auth_from='websocket', token=t)Authorize._token = telse:Authorize.jwt_required(auth_from='websocket', websocket=websocket)login_user = await get_login_user(Authorize)user_id = login_user.user_idif chat_id:with session_getter() as session:db_flow = session.get(Flow, flow_id)if not db_flow:await websocket.accept()message = '该技能已被删除'await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason=message)if db_flow.status != 2:await websocket.accept()message = '当前技能未上线,无法直接对话'await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason=message)graph_data = db_flow.dataelse:flow_data_key = 'flow_data_' + flow_idif version_id:flow_data_key = flow_data_key + '_' + str(version_id)if not flow_data_store.exists(flow_data_key) or str(flow_data_store.hget(flow_data_key, 'status'),'utf-8') != BuildStatus.SUCCESS.value:await websocket.accept()message = '当前编译没通过'await websocket.close(code=status.WS_1013_TRY_AGAIN_LATER, reason=message)returngraph_data = json.loads(flow_data_store.hget(flow_data_key, 'graph_data'))if not chat_id:# 调试时,每次都初始化对象chat_manager.set_cache(get_cache_key(flow_id, chat_id), None)with logger.contextualize(trace_id=chat_id):logger.info('websocket_verify_ok begin=handle_websocket')await chat_manager.handle_websocket(flow_id,chat_id,websocket,user_id,gragh_data=graph_data)except WebSocketException as exc:logger.error(f'Websocket exrror: {str(exc)}')await websocket.close(code=status.WS_1011_INTERNAL_ERROR, reason=str(exc))except Exception as exc:logger.exception(f'Error in chat websocket: {str(exc)}')messsage = exc.detail if isinstance(exc, HTTPException) else str(exc)if 'Could not validate credentials' in str(exc):await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason='Unauthorized')else:await websocket.close(code=status.WS_1011_INTERNAL_ERROR, reason=messsage)
2.1. 函数签名和参数
@router.websocket('/chat/{flow_id}')
async def chat(*,flow_id: str,websocket: WebSocket,t: Optional[str] = None,chat_id: Optional[str] = None,version_id: Optional[int] = None,Authorize: AuthJWT = Depends(),
):
flow_id: str
:URL路径参数,用于标识聊天流程或技能。websocket: WebSocket
:WebSocket 连接对象。t: Optional[str]
:查询参数,可选,用于传递 JWT 令牌。chat_id: Optional[str]
:查询参数,可选,用于标识具体的聊天会话。version_id: Optional[int]
:查询参数,可选,用于标识流程的版本。Authorize: AuthJWT = Depends()
:依赖注入,用于处理 JWT 认证。
2.2. 认证和授权
if t:Authorize.jwt_required(auth_from='websocket', token=t)Authorize._token = t
else:Authorize.jwt_required(auth_from='websocket', websocket=websocket)
login_user = await get_login_user(Authorize)
user_id = login_user.user_id
-
条件判断
:
- 如果提供了
t
参数,使用它作为 JWT 令牌进行认证。 - 否则,尝试从 WebSocket 连接中提取 JWT 令牌进行认证。
- 如果提供了
-
获取用户信息
:
get_login_user(Authorize)
函数用于解析 JWT 令牌并获取登录用户的信息。- 获取
user_id
以供后续使用。
2.3. 流程或技能验证
if chat_id:with session_getter() as session:db_flow = session.get(Flow, flow_id)if not db_flow:await websocket.accept()message = '该技能已被删除'await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason=message)if db_flow.status != 2:await websocket.accept()message = '当前技能未上线,无法直接对话'await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason=message)graph_data = db_flow.data
else:flow_data_key = 'flow_data_' + flow_idif version_id:flow_data_key = flow_data_key + '_' + str(version_id)if not flow_data_store.exists(flow_data_key) or str(flow_data_store.hget(flow_data_key, 'status'),'utf-8') != BuildStatus.SUCCESS.value:await websocket.accept()message = '当前编译没通过'await websocket.close(code=status.WS_1013_TRY_AGAIN_LATER, reason=message)returngraph_data = json.loads(flow_data_store.hget(flow_data_key, 'graph_data'))
-
有
chat_id
:
- 从数据库获取
Flow
对象(流程或技能)。 - 如果
Flow
不存在,关闭连接并返回错误信息。 - 如果
Flow
状态不是2
(假设2
表示“已上线”),关闭连接并返回错误信息。 - 获取
graph_data
(流程图数据)用于后续处理。
- 从数据库获取
-
无
chat_id
:
- 根据
flow_id
和version_id
生成flow_data_key
。 - 从 Redis 缓存中检查该
flow_data_key
是否存在且状态为SUCCESS
。 - 如果条件不满足,关闭连接并返回错误信息。
- 获取
graph_data
(流程图数据)用于后续处理。
- 根据
2.4. 初始化聊天会话
if not chat_id:# 调试时,每次都初始化对象chat_manager.set_cache(get_cache_key(flow_id, chat_id), None)
-
无
chat_id
:
- 调用
chat_manager.set_cache
方法,初始化或重置聊天缓存。这在调试时可能会频繁初始化聊天对象。
- 调用
2.5. 处理聊天 WebSocket 会话
with logger.contextualize(trace_id=chat_id):logger.info('websocket_verify_ok begin=handle_websocket')await chat_manager.handle_websocket(flow_id,chat_id,websocket,user_id,gragh_data=graph_data)
-
上下文日志记录:使用
trace_id=chat_id
来上下文化日志,便于追踪特定聊天会话的日志。 -
调用
chat_manager.handle_websocket
:
- 传递
flow_id
、chat_id
、websocket
、user_id
和graph_data
。 handle_websocket
方法负责处理整个 WebSocket 会话,包括接收消息、发送响应、处理业务逻辑等。
- 传递
2.6. 异常处理
except WebSocketException as exc:logger.error(f'Websocket exrror: {str(exc)}')await websocket.close(code=status.WS_1011_INTERNAL_ERROR, reason=str(exc))
except Exception as exc:logger.exception(f'Error in chat websocket: {str(exc)}')messsage = exc.detail if isinstance(exc, HTTPException) else str(exc)if 'Could not validate credentials' in str(exc):await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason='Unauthorized')else:await websocket.close(code=status.WS_1011_INTERNAL_ERROR, reason=messsage)
-
WebSocketException
:
- 记录错误日志。
- 以
WS_1011_INTERNAL_ERROR
状态码关闭连接。
-
其他异常
:
- 记录异常日志。
- 如果异常与认证失败相关,使用
WS_1008_POLICY_VIOLATION
状态码关闭连接并返回“Unauthorized”。 - 否则,使用
WS_1011_INTERNAL_ERROR
状态码关闭连接并返回错误信息。
3. 关键组件和依赖关系
为了全面理解这个 WebSocket 接口,我们需要了解以下关键组件和它们的职责:
3.1. AuthJWT
和 get_login_user
-
AuthJWT
:
- 负责处理 JWT 认证,包括生成、验证和解析令牌。
-
get_login_user
:
- 解析
AuthJWT
提供的令牌,获取登录用户的信息。 - 返回一个
UserPayload
对象,包含用户的user_id
、user_name
等信息。
- 解析
3.2. session_getter
-
职责
:
- 提供数据库会话上下文管理,确保数据库操作的事务性和资源管理。
-
用法
:
- 使用
with session_getter() as session
来获取数据库会话,并在with
块结束时自动关闭会话。
- 使用
3.3. ChatManager
-
职责
:
- 管理 WebSocket 连接。
- 处理聊天消息的接收与发送。
- 维护聊天历史。
- 处理具体的聊天逻辑,如调用 LangChain 对象、生成响应等。
-
主要方法
:
handle_websocket
:处理 WebSocket 会话,包括接收消息、处理消息、发送响应等。- 其他辅助方法,如
connect
、send_message
、close_connection
等,用于管理连接和消息传输。
3.4. Redis 缓存
-
职责
:
- 存储流程图数据 (
graph_data
) 和构建状态 (status
)。 - 用于验证流程是否已成功构建,以及存储和检索相关数据。
- 存储流程图数据 (
3.5. 数据库模型和 DAO 类
-
Flow
:
- 表示一个流程或技能的数据库模型。
-
ChatMessage
和
ChatMessageDao
:
ChatMessage
表示聊天消息的数据库模型。ChatMessageDao
提供对ChatMessage
的数据库操作方法,如查询、插入、更新等。
4. 如何基于此实现新 WebSocket 功能
假设你的需求是实现一个新的 WebSocket 接口,例如 /chat/{flow_id}/new_feature
,你可以按照以下步骤进行:
4.1. 定位文件和结构
基于现有项目结构,新的 WebSocket 接口应添加到相同的路由文件中,即 src/backend/bisheng/api/v1/chat.py
。如果有多个 WebSocket 接口,考虑将它们逻辑上分离到不同的模块中,例如 chat_new_feature.py
,然后在路由文件中引入。
4.2. 编写新的 WebSocket 端点
在 chat.py
文件中新增一个 WebSocket 端点:
@router.websocket('/chat/{flow_id}/new_feature')
async def chat_new_feature(*,flow_id: str,websocket: WebSocket,t: Optional[str] = None,chat_id: Optional[str] = None,version_id: Optional[int] = None,Authorize: AuthJWT = Depends(),
):"""Websocket endpoint for new feature."""try:if t:Authorize.jwt_required(auth_from='websocket', token=t)Authorize._token = telse:Authorize.jwt_required(auth_from='websocket', websocket=websocket)login_user = await get_login_user(Authorize)user_id = login_user.user_id# 验证流程或技能if chat_id:with session_getter() as session:db_flow = session.get(Flow, flow_id)if not db_flow:await websocket.accept()message = '该技能已被删除'await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason=message)if db_flow.status != 2:await websocket.accept()message = '当前技能未上线,无法直接对话'await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason=message)graph_data = db_flow.dataelse:flow_data_key = 'flow_data_' + flow_idif version_id:flow_data_key = flow_data_key + '_' + str(version_id)if not flow_data_store.exists(flow_data_key) or str(flow_data_store.hget(flow_data_key, 'status'),'utf-8') != BuildStatus.SUCCESS.value:await websocket.accept()message = '当前编译没通过'await websocket.close(code=status.WS_1013_TRY_AGAIN_LATER, reason=message)returngraph_data = json.loads(flow_data_store.hget(flow_data_key, 'graph_data'))if not chat_id:# 初始化对象chat_manager.set_cache(get_cache_key(flow_id, chat_id), None)with logger.contextualize(trace_id=chat_id):logger.info('websocket_verify_ok begin=handle_new_feature_websocket')# 调用 ChatManager 的新方法来处理新的功能await chat_manager.handle_new_feature_websocket(flow_id,chat_id,websocket,user_id,gragh_data=graph_data)except WebSocketException as exc:logger.error(f'Websocket error: {str(exc)}')await websocket.close(code=status.WS_1011_INTERNAL_ERROR, reason=str(exc))except Exception as exc:logger.exception(f'Error in new feature websocket: {str(exc)}')message = exc.detail if isinstance(exc, HTTPException) else str(exc)if 'Could not validate credentials' in str(exc):await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason='Unauthorized')else:await websocket.close(code=status.WS_1011_INTERNAL_ERROR, reason=message)
4.3. 实现 ChatManager
的新方法
在 src/backend/bisheng/chat/manager.py
文件中,添加一个新的方法 handle_new_feature_websocket
,用于处理新功能的 WebSocket 会话。
# src/backend/bisheng/chat/manager.pyclass ChatManager:# 现有的 __init__ 和其他方法...async def handle_new_feature_websocket(self, flow_id: str, chat_id: str, websocket: WebSocket, user_id: int, gragh_data: dict):"""处理新功能的 WebSocket 会话。"""client_key = uuid.uuid4().hexchat_client = ChatClient(request=None, # 如果有 Request 对象,传递进去client_key=client_key,client_id=flow_id,chat_id=chat_id,user_id=user_id,login_user=UserPayload(user_id=user_id, user_name='username', role='user'), # 根据实际情况调整work_type=WorkType.NEW_FEATURE, # 假设有新的工作类型websocket=websocket,graph_data=gragh_data)await self.accept_client(client_key, chat_client, websocket)logger.debug(f'New feature websocket accepted: client_key={client_key}, flow_id={flow_id}, chat_id={chat_id}')try:while True:try:json_payload_receive = await asyncio.wait_for(websocket.receive_json(), timeout=2.0)except asyncio.TimeoutError:continuetry:payload = json.loads(json_payload_receive) if json_payload_receive else {}except TypeError:payload = json_payload_receive# 处理新功能的消息await chat_client.handle_new_feature_message(payload)except WebSocketDisconnect as e:logger.info(f'New feature websocket disconnect: {str(e)}')except IgnoreException:# 客户端内部自行关闭连接passexcept Exception as e:logger.exception(f'Error in new feature websocket: {str(e)}')await self.close_client(client_key, code=status.WS_1011_INTERNAL_ERROR, reason='未知错误')finally:await self.close_client(client_key, code=status.WS_1000_NORMAL_CLOSURE, reason='Client disconnected')self.clear_client(client_key)
解释:
- 创建
ChatClient
实例:- 新的
ChatClient
实例用于处理特定的聊天会话。 - 你可能需要扩展
ChatClient
类,添加处理新功能消息的方法,如handle_new_feature_message
。
- 新的
- 接收和处理消息:
- 持续接收来自客户端的 JSON 消息。
- 调用
handle_new_feature_message
方法处理消息。
- 异常处理:
- 处理连接断开、忽略的异常和其他未知错误,确保连接正确关闭。
4.4. 扩展 ChatClient
类
根据新功能的需求,扩展 ChatClient
类,添加处理新功能消息的方法。
# src/backend/bisheng/chat/client.pyclass ChatClient:# 现有的 __init__ 和其他方法...async def handle_new_feature_message(self, payload: dict):"""处理新功能的消息。"""# 根据 payload 的内容,执行相应的逻辑# 例如,执行某个特定的操作、调用服务、返回结果等# 示例:假设新功能是执行一个计算任务if 'action' in payload:action = payload['action']if action == 'compute':data = payload.get('data')result = self.perform_computation(data)response = {'result': result}await self.websocket.send_json(response)else:response = {'error': '未知的操作'}await self.websocket.send_json(response)def perform_computation(self, data):"""执行某个计算任务的示例方法。"""# 示例计算:计算数据的平方try:number = float(data)return number ** 2except (ValueError, TypeError):return 'Invalid input for computation'
解释:
handle_new_feature_message
方法:- 解析接收到的
payload
,根据内容执行相应的操作。 - 示例中,如果
action
为compute
,则执行一个计算任务并返回结果。
- 解析接收到的
perform_computation
方法:- 一个示例计算方法,接收数据并返回其平方。
4.5. 更新 ChatClient
类
确保 ChatClient
类中包含处理新功能消息的方法,如上面的 handle_new_feature_message
。如果需要处理更复杂的逻辑,可以进一步扩展。
5. 示例:创建一个新的 WebSocket 接口
假设你需要实现一个新的 WebSocket 接口 /chat/{flow_id}/translate
,用于实时翻译用户发送的消息。以下是具体的实现步骤:
5.1. 新增 WebSocket 端点
在 src/backend/bisheng/api/v1/chat.py
中新增 WebSocket 端点:
@router.websocket('/chat/{flow_id}/translate')
async def chat_translate(*,flow_id: str,websocket: WebSocket,t: Optional[str] = None,chat_id: Optional[str] = None,version_id: Optional[int] = None,Authorize: AuthJWT = Depends(),
):"""Websocket endpoint for translate feature."""try:# 认证和授权if t:Authorize.jwt_required(auth_from='websocket', token=t)Authorize._token = telse:Authorize.jwt_required(auth_from='websocket', websocket=websocket)login_user = await get_login_user(Authorize)user_id = login_user.user_id# 验证流程或技能if chat_id:with session_getter() as session:db_flow = session.get(Flow, flow_id)if not db_flow:await websocket.accept()message = '该技能已被删除'await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason=message)if db_flow.status != 2:await websocket.accept()message = '当前技能未上线,无法直接对话'await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason=message)graph_data = db_flow.dataelse:flow_data_key = 'flow_data_' + flow_idif version_id:flow_data_key = flow_data_key + '_' + str(version_id)if not flow_data_store.exists(flow_data_key) or str(flow_data_store.hget(flow_data_key, 'status'),'utf-8') != BuildStatus.SUCCESS.value:await websocket.accept()message = '当前编译没通过'await websocket.close(code=status.WS_1013_TRY_AGAIN_LATER, reason=message)returngraph_data = json.loads(flow_data_store.hget(flow_data_key, 'graph_data'))if not chat_id:# 初始化对象chat_manager.set_cache(get_cache_key(flow_id, chat_id), None)with logger.contextualize(trace_id=chat_id):logger.info('websocket_translate_verify_ok begin=handle_translate_websocket')# 调用 ChatManager 的新方法来处理翻译功能await chat_manager.handle_translate_websocket(flow_id,chat_id,websocket,user_id,gragh_data=graph_data)except WebSocketException as exc:logger.error(f'Websocket error: {str(exc)}')await websocket.close(code=status.WS_1011_INTERNAL_ERROR, reason=str(exc))except Exception as exc:logger.exception(f'Error in translate websocket: {str(exc)}')message = exc.detail if isinstance(exc, HTTPException) else str(exc)if 'Could not validate credentials' in str(exc):await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason='Unauthorized')else:await websocket.close(code=status.WS_1011_INTERNAL_ERROR, reason=message)
5.2. 实现 ChatManager
的新方法
在 src/backend/bisheng/chat/manager.py
中,添加 handle_translate_websocket
方法:
# src/backend/bisheng/chat/manager.pyclass ChatManager:# 现有的 __init__ 和其他方法...async def handle_translate_websocket(self, flow_id: str, chat_id: str, websocket: WebSocket, user_id: int, gragh_data: dict):"""处理翻译功能的 WebSocket 会话。"""client_key = uuid.uuid4().hexchat_client = ChatClient(request=None, # 如果有 Request 对象,传递进去client_key=client_key,client_id=flow_id,chat_id=chat_id,user_id=user_id,login_user=UserPayload(user_id=user_id, user_name='username', role='user'), # 根据实际情况调整work_type=WorkType.TRANSLATE, # 假设有翻译工作类型websocket=websocket,graph_data=gragh_data)await self.accept_client(client_key, chat_client, websocket)logger.debug(f'Translate websocket accepted: client_key={client_key}, flow_id={flow_id}, chat_id={chat_id}')try:while True:try:json_payload_receive = await asyncio.wait_for(websocket.receive_json(), timeout=2.0)except asyncio.TimeoutError:continuetry:payload = json.loads(json_payload_receive) if json_payload_receive else {}except TypeError:payload = json_payload_receive# 处理翻译功能的消息await chat_client.handle_translate_message(payload)except WebSocketDisconnect as e:logger.info(f'Translate websocket disconnect: {str(e)}')except IgnoreException:# 客户端内部自行关闭连接passexcept Exception as e:logger.exception(f'Error in translate websocket: {str(e)}')await self.close_client(client_key, code=status.WS_1011_INTERNAL_ERROR, reason='未知错误')finally:await self.close_client(client_key, code=status.WS_1000_NORMAL_CLOSURE, reason='Client disconnected')self.clear_client(client_key)
5.3. 扩展 ChatClient
类
在 src/backend/bisheng/chat/client.py
中,添加处理翻译消息的方法:
# src/backend/bisheng/chat/client.pyclass ChatClient:# 现有的 __init__ 和其他方法...async def handle_translate_message(self, payload: dict):"""处理翻译功能的消息。"""if 'text' in payload:text_to_translate = payload['text']target_language = payload.get('target_language', 'en') # 默认翻译为英文translated_text = self.translate_text(text_to_translate, target_language)response = {'translated_text': translated_text}await self.websocket.send_json(response)else:response = {'error': 'No text provided for translation'}await self.websocket.send_json(response)def translate_text(self, text: str, target_language: str) -> str:"""执行文本翻译的示例方法。实际实现中可以调用第三方翻译服务,如 Google Translate API。"""# 示例:简单的模拟翻译translations = {'en': lambda x: f'Translated to English: {x}','zh': lambda x: f'翻译为中文: {x}',# 添加更多语言的翻译逻辑}translate_func = translations.get(target_language, lambda x: 'Unsupported language')return translate_func(text)
解释:
-
handle_translate_message
方法:
- 检查
payload
中是否包含text
字段。 - 获取
target_language
(目标语言),默认翻译为英文。 - 调用
translate_text
方法进行翻译。 - 将翻译结果通过 WebSocket 发送给客户端。
- 检查
-
translate_text
方法:
- 示例中提供了一个简单的翻译逻辑,实际应用中应调用第三方翻译 API(如 Google Translate、DeepL 等)。
5.4. 测试新的 WebSocket 接口
在本地环境中启动服务器,并使用 WebSocket 客户端工具(如 websocat、Postman 或浏览器的开发者工具)进行测试。
示例测试流程:
-
建立连接:
websocat "ws://127.0.0.1:8000/api/v1/chat/<flow_id>/translate?t=<token>&chat_id=<chat_id>"
-
发送翻译请求:
{"text": "Hello, how are you?","target_language": "zh" }
-
接收翻译结果:
{"translated_text": "翻译为中文: Hello, how are you?" }
6. 关键步骤总结
基于现有的 WebSocket 接口实现新功能时,遵循以下步骤:
- 新增 WebSocket 端点:
- 在相应的路由文件中(如
chat.py
)新增一个 WebSocket 路由。 - 定义必要的参数和认证逻辑。
- 在相应的路由文件中(如
- 扩展
ChatManager
类:- 添加一个新的方法来处理特定功能的 WebSocket 会话(如
handle_translate_websocket
)。 - 在该方法中,创建和管理
ChatClient
实例,并处理消息接收和发送。
- 添加一个新的方法来处理特定功能的 WebSocket 会话(如
- 扩展
ChatClient
类:- 添加一个新的方法来处理特定功能的消息(如
handle_translate_message
)。 - 实现功能逻辑,如调用翻译服务,并通过 WebSocket 发送响应。
- 添加一个新的方法来处理特定功能的消息(如
- 测试和验证:
- 编写和运行单元测试,确保新功能正常工作。
- 使用 WebSocket 客户端工具进行手动测试,验证消息的发送和接收。
- 文档更新:
- 更新 API 文档,说明新的 WebSocket 接口的使用方法和参数。
7. 文件路径和结构概览
以下是相关文件的路径和主要职责:
- WebSocket 路由:
src/backend/bisheng/api/v1/chat.py
:定义所有聊天相关的 API 端点,包括 WebSocket 端点。
- 聊天管理器:
src/backend/bisheng/chat/manager.py
:管理 WebSocket 连接、聊天历史和消息处理。
- 聊天客户端:
src/backend/bisheng/chat/client.py
:处理具体的聊天会话,包括消息的接收和发送。
- 认证服务:
src/backend/bisheng/api/services/user_service.py
:处理用户认证和授权。
- 数据库模型和 DAO:
src/backend/bisheng/database/models/
:包含数据库模型和数据访问对象,用于操作数据库中的数据。
- 缓存管理:
src/backend/bisheng/cache/
:管理缓存数据,如 Redis 缓存。
8. 最佳实践建议
- 模块化设计:将不同功能的逻辑分离到不同的模块或类中,保持代码的清晰和可维护性。
- 错误处理:确保 WebSocket 端点具备全面的错误处理机制,能够优雅地处理各种异常情况,避免资源泄漏。
- 安全性:确保所有 WebSocket 连接都经过认证和授权,防止未授权的访问。
- 性能优化:
- 使用异步编程模型(如
asyncio
)处理高并发的 WebSocket 连接。 - 合理管理连接池和任务队列,避免阻塞主事件循环。
- 使用异步编程模型(如
- 测试覆盖:编写充分的单元测试和集成测试,确保新功能的稳定性和可靠性。
- 日志记录:在关键步骤和异常处记录详细的日志,便于调试和监控。
总结
通过详细解析现有的 WebSocket 接口,你可以了解其工作流程和关键组件,并基于此实现新的 WebSocket 功能。关键在于:
- 理解现有的架构和逻辑:了解如何管理 WebSocket 连接、处理消息和维护聊天历史。
- 遵循一致的编码风格和结构:确保新功能与现有代码保持一致,提高代码的可读性和可维护性。
- 注重安全性和性能:确保所有 WebSocket 连接都经过认证,合理管理资源,优化性能。
- 充分测试和验证:通过自动化测试和手动测试,确保新功能的稳定性和正确性。
相关文章:
Python websocket
router.websocket(/chat/{flow_id}) 接口代码,并了解其工作流程、涉及的组件以及如何基于此实现你的新 WebSocket 接口。以下内容将分为几个部分进行讲解: 接口整体概述代码逐行解析关键组件和依赖关系如何基于此实现新功能示例:创建一个新的…...

【MySQL-5】MySQL的内置函数
目录 1. 整体学习的思维导图 2. 日期函数 编辑 2.1 current_date() 2.2 current_time() 2.3 current_timestamp() 2.4 date(datetime) 2.5 now() 2.6 date_add() 2.7 date_sub() 2.8 datediff() 2.9 案例 2.9.1 创建一个出生日期登记簿 2.9.2 创建一个留言版 3…...

深度学习笔记之BERT(三)RoBERTa
深度学习笔记之RoBERTa 引言回顾:BERT的预训练策略RoBERTa训练过程分析静态掩码与动态掩码的比较模型输入模式与下一句预测使用大批量进行训练使用Byte-pair Encoding作为子词词元化算法更大的数据集和更多的训练步骤 RoBERTa配置 引言 本节将介绍一种基于 BERT \t…...
C++知识点总结(59):背包型动态规划
背包型动态规划 一、背包 dp1. 01 背包(限量)2. 完全背包(不限量)3. 口诀 二、例题1. 和是质数的子集数2. 黄金的太阳3. 负数子集和4. NASA的⻝物计划 一、背包 dp 1. 01 背包(限量) 假如有这几个物品&am…...

C++:反向迭代器的实现
反向迭代器的实现与 stack 、queue 相似,是通过适配器模式实现的。通过传入不同类型的迭代器来实现其反向迭代器。 正向迭代器中,begin() 指向第一个位置,end() 指向最后一个位置的下一个位置。 代码实现: template<class I…...
webGL入门教程_04vec3、vec4 和齐次坐标总结
vec3、vec4 和齐次坐标总结 1. vec3 和 vec4 1.1 什么是 vec3 和 vec4? vec3: GLSL 中的三维向量类型,包含 3 个浮点数:(x, y, z)。常用于表示三维坐标、RGB 颜色、法线、方向等。 vec4: GLSL 中的四维向量类型&…...

uniapp中父组件数组更新后与页面渲染数组不一致实战记录
简单描述一下业务场景方便理解: 商品设置功能,支持添加多组商品(点击添加按钮进行增加).可以对任意商品进行删除(点击减少按钮对选中的商品设置进行删除). 问题: 正常添加操作后,对已添加的任意商品删除后,控制台打印数组正常.但是与页面显示不一致.已上图为例,选中尾…...
优化 Conda 下载速度:详细的代理配置和网络管理策略
优化 Conda 下载速度:详细的代理配置和网络管理策略 为了彻底解决使用 Conda 下载 PyTorch 时遇到的速度问题,并确保下载过程稳定可靠,这需要一个详细、综合的技术方案。让我们更深入地分析问题原因,然后详尽地解释采取的解决策略…...

服务器遭受DDoS攻击后如何恢复运行?
当服务器遭受 DDoS(分布式拒绝服务)攻击 后,恢复运行需要快速采取应急措施来缓解攻击影响,并在恢复后加强防护以减少未来攻击的风险。以下是详细的分步指南: 一、应急处理步骤 1. 确认服务器是否正在遭受 DDoS 攻击 …...

MFC音视频播放器-支持电子放大等功能
前言 本播放器在VS2019下开发,使用ffmpegD3D实现视频播放渲染功能。同时本播放器支持录像功能、截图功能、音视频播放功能、码流信息显示、电子放大功能等。D3D的渲染同时支持surface和texture两种方式,电子放大功能是在D3D Texture方式下进行实现。以下…...
c语言编程1.17蓝桥杯历届试题-回文数字
题目描述 观察数字:12321,123321 都有一个共同的特征,无论从左到右读还是从右向左读,都是相同的。这样的数字叫做:回文数字。 本题要求你找到一些5位或6位的十进制数字。满足如下要求: 该数字的各个数位之…...
el-table 纵向 横向 多级表头
<el-table :data"tableData" class"diaTable":span-method"handleSpanMethod"border:header-cell-style"{background:#292929,color:#fff}"><!-- 纵向表头 --><el-table-column label"纵向表头" width"…...

uniapp开发微信小程序笔记8-uniapp使用vant框架
前言:其实用uni-app开发微信小程序的首选不应该是vant,因为vant没有专门给uni-app设置专栏,可以看到目前Vant 官方提供了 Vue 2 版本、Vue 3 版本和微信小程序版本,并由社区团队维护 React 版本和支付宝小程序版本。 但是vant的优…...
分布式项目使用Redis实现数据库对象自增主键ID
hello。大家好,我是灰小猿,一个超会写bug的程序猿! 在分布式项目中,数据表的主键ID一般可能存在于UUID或自增ID这两种形式,UUID好理解而且实现起来也最容易,但是缺点就是数据表中的主键ID是32位的字符串&a…...
npm-运行项目报错:A complete log of this run can be found .......npm-cache_logs\
1.问题 没有找到对应的某种依赖,node_modules出现问题。 2.解决 (1)查看对应依赖是否引入或者是由于合并分支错误 引入js或依赖不存在。谨慎删除依赖包 (2)查找对应引入依赖进行安装最后解决方法-删除依赖包清除缓存 npm cache clean --force (2)重新向同事引入…...

SolarCube: 高分辨率太阳辐照预测基准数据集
太阳能作为清洁能源在减缓气候变化中的作用日益凸显,其稳定的供应对电网管理至关重要。然而,太阳辐照受云层和天气变化的影响波动较大,给光伏电力的管理带来挑战,尤其是在调度、储能和备用系统管理方面。因此,精确的太…...

华为小米苹果三星移动设备访问windows共享文件夹windows11
如果移动设备和windows电脑都在同一个局域网内,可以用移动设备访问windows11的共享文件夹 1、设置共享文件夹 2、添加everyone用户即可 3、查看ip地址 4、在华为手机上点击文件管理,里面有个网上邻居 5、正常情况下,华为手机会扫描到同一局域…...
网络安全三防指南:只防病毒不安全
5月17日,瑞星全球反病毒监测网截获一个恶性病毒,由于该病毒的破坏能力和当年著名的CIH病毒几乎完全一样,因此瑞星将该病毒命名为“新CIH”病毒。被“新CIH”感染的电脑,主板和硬盘数据将被破坏,致使电脑无法启动&#…...

论文概览 |《Urban Analytics and City Science》2023.05 Vol.50 Issue.4
本次给大家整理的是《Environment and Planning B: Urban Analytics and City Science》杂志2023年5月第50卷第4期的论文的题目和摘要,一共包括19篇SCI论文! 论文1 Data analytics and sustainable urban development in global cities 全球城市的数据…...

【ROS2】ROS2 C++版本 与 Python版本比较
ROS 系列学习教程(总目录) ROS2 系列学习教程(总目录) 目录 一、功能包的构建方式二、功能包组织结构三、代码编写四、性能与效率五、兼容性六、应用场景 目前ROS开发主要使用 C 和 Python 语言,这里会分别实现并讲解。 相较于ROS1,ROS2的 C 和 Python …...
OpenLayers 可视化之热力图
注:当前使用的是 ol 5.3.0 版本,天地图使用的key请到天地图官网申请,并替换为自己的key 热力图(Heatmap)又叫热点图,是一种通过特殊高亮显示事物密度分布、变化趋势的数据可视化技术。采用颜色的深浅来显示…...
Ubuntu系统下交叉编译openssl
一、参考资料 OpenSSL&&libcurl库的交叉编译 - hesetone - 博客园 二、准备工作 1. 编译环境 宿主机:Ubuntu 20.04.6 LTSHost:ARM32位交叉编译器:arm-linux-gnueabihf-gcc-11.1.0 2. 设置交叉编译工具链 在交叉编译之前&#x…...

Debian系统简介
目录 Debian系统介绍 Debian版本介绍 Debian软件源介绍 软件包管理工具dpkg dpkg核心指令详解 安装软件包 卸载软件包 查询软件包状态 验证软件包完整性 手动处理依赖关系 dpkg vs apt Debian系统介绍 Debian 和 Ubuntu 都是基于 Debian内核 的 Linux 发行版ÿ…...

关于nvm与node.js
1 安装nvm 安装过程中手动修改 nvm的安装路径, 以及修改 通过nvm安装node后正在使用的node的存放目录【这句话可能难以理解,但接着往下看你就了然了】 2 修改nvm中settings.txt文件配置 nvm安装成功后,通常在该文件中会出现以下配置&…...

k8s业务程序联调工具-KtConnect
概述 原理 工具作用是建立了一个从本地到集群的单向VPN,根据VPN原理,打通两个内网必然需要借助一个公共中继节点,ktconnect工具巧妙的利用k8s原生的portforward能力,简化了建立连接的过程,apiserver间接起到了中继节…...

企业如何增强终端安全?
在数字化转型加速的今天,企业的业务运行越来越依赖于终端设备。从员工的笔记本电脑、智能手机,到工厂里的物联网设备、智能传感器,这些终端构成了企业与外部世界连接的 “神经末梢”。然而,随着远程办公的常态化和设备接入的爆炸式…...
Device Mapper 机制
Device Mapper 机制详解 Device Mapper(简称 DM)是 Linux 内核中的一套通用块设备映射框架,为 LVM、加密磁盘、RAID 等提供底层支持。本文将详细介绍 Device Mapper 的原理、实现、内核配置、常用工具、操作测试流程,并配以详细的…...

Mysql中select查询语句的执行过程
目录 1、介绍 1.1、组件介绍 1.2、Sql执行顺序 2、执行流程 2.1. 连接与认证 2.2. 查询缓存 2.3. 语法解析(Parser) 2.4、执行sql 1. 预处理(Preprocessor) 2. 查询优化器(Optimizer) 3. 执行器…...
【无标题】路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论
路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论 一、传统路径模型的根本缺陷 在经典正方形路径问题中(图1): mermaid graph LR A((A)) --- B((B)) B --- C((C)) C --- D((D)) D --- A A -.- C[无直接路径] B -…...

基于PHP的连锁酒店管理系统
有需要请加文章底部Q哦 可远程调试 基于PHP的连锁酒店管理系统 一 介绍 连锁酒店管理系统基于原生PHP开发,数据库mysql,前端bootstrap。系统角色分为用户和管理员。 技术栈 phpmysqlbootstrapphpstudyvscode 二 功能 用户 1 注册/登录/注销 2 个人中…...