【通用消息通知服务】0x3 - 发送我们第一条消息(Websocket)
【通用消息通知服务】0x3 - 发送我们第一条消息
项目地址: A generic message notification system[Github]
实现接收/发送Websocket消息
Websocket Connection Pool
import asyncio
from asyncio.queues import Queue
from asyncio.queues import QueueEmpty
from contextlib import suppress
from typing import Anyimport async_timeout
import orjson
from sanic.log import logger
from ulid import ULIDfrom common.depend import DependencyPING = "#ping"
PONG = "#pong"class WebsocketConnectionPoolDependency(Dependency, dependency_name="WebsocketPool", dependency_alias="ws_pool"
):def __init__(self, app) -> None:super().__init__(app)self.lock = asyncio.Lock()self.connections = {} # 存储websocket connectionsself.send_queues = {} # 各websocket发送队列self.recv_queues = {} # 各websocket接收消息队列self.close_callbacks = {} # websocket销毁回调self.listeners = {} # 连接监听函数def _gen_id(self) -> str:return str(ULID())async def add_connection(self, connection) -> str:async with self.lock:id = self._gen_id()self.connections[id] = connectionself.send_queues[id] = Queue()self.app.add_task(self.send_task(self.send_queues[id], connection),name=f"websocket_{id}_send_task",)self.recv_queues[id] = Queue()self.app.add_task(self.recv_task(self.recv_queues[id], connection),name=f"websocket_{id}_recv_task",)self.app.add_task(self.notify_task(id), name=f"websocket_{id}_notify_task")self.app.add_task(self.is_alive_task(id), name=f"websocket_{id}_is_alive_task")setattr(connection, "_id", id)return connection._iddef get_connection(self, connection_id: str):return self.connections.get(connection_id)async def add_listener(self, connection_id, handler) -> str:async with self.lock:id = self._gen_id()self.listeners.setdefault(connection_id, {}).update({id: handler})return idasync def remove_listener(self, connection_id, listener_id):async with self.lock:self.listeners.get(connection_id, {}).pop(listener_id, None)async def add_close_callback(self, connection_id, callback):async with self.lock:self.close_callbacks.setdefault(connection_id, []).append(callback)def is_alive(self, connection_id: str):if hasattr(connection_id, "_id"):connection_id = connection_id._idreturn connection_id in self.connectionsasync def remove_connection(self, connection: Any):if hasattr(connection, "_id"):connection_id = connection._idelse:connection_id = connectionif connection_id not in self.connections:# removed alreadyreturnasync with self.lock:logger.info(f"remove connection: {connection_id}")with suppress(Exception):await self.app.cancel_task(f"websocket_{connection_id}_send_task")with suppress(Exception):await self.app.cancel_task(f"websocket_{connection_id}_recv_task")with suppress(Exception):await self.app.cancel_task(f"websocket_{connection_id}_notify_task")with suppress(Exception):await self.app.cancel_task(f"websocket_{connection_id}_is_alive_task")if connection_id in self.send_queues:del self.send_queues[connection_id]if connection_id in self.recv_queues:del self.recv_queues[connection_id]if connection_id in self.listeners:del self.listeners[connection_id]if connection_id in self.close_callbacks:await self.do_close_callbacks(connection_id)del self.close_callbacks[connection_id]if connection_id in self.connections:del self.connections[connection_id]async def do_close_callbacks(self, connection_id):for cb in self.close_callbacks.get(connection_id, []):self.app.add_task(cb(connection_id))async def prepare(self):self.is_prepared = Truelogger.info("dependency:WebsocketPool is prepared")return self.is_preparedasync def check(self):return Trueasync def send_task(self, queue, connection):while self.is_alive(connection):try:data = queue.get_nowait()except QueueEmpty:await asyncio.sleep(0)continuetry:if isinstance(data, (bytes, str, int)):await connection.send(data)else:await connection.send(orjson.dumps(data).decode())queue.task_done()except Exception as err:breakasync def recv_task(self, queue, connection):while self.is_alive(connection):try:data = await connection.recv()await queue.put(data)logger.info(f"recv message: {data} from connection: {connection._id}")except Exception as err:breakasync def notify_task(self, connection_id):while self.is_alive(connection_id):try:logger.info(f"notify connection: {connection_id}'s listeners")data = await self.recv_queues[connection_id].get()for listener in self.listeners.get(connection_id, {}).values():await listener(connection_id, data)except Exception as err:passasync def is_alive_task(self, connection_id: str):if hasattr(connection_id, "_id"):connection_id = connection_id._idget_pong = asyncio.Event()async def wait_pong(connection_id, data):if data != PONG:returnget_pong.set()while True:get_pong.clear()await self.send(connection_id, PING)listener_id = await self.add_listener(connection_id, wait_pong)with suppress(asyncio.TimeoutError):async with async_timeout.timeout(self.app.config.WEBSOCKET_PING_TIMEOUT):await get_pong.wait()await self.remove_listener(connection_id, listener_id)if get_pong.is_set():# this connection is closedawait asyncio.sleep(self.app.config.WEBSOCKET_PING_INTERVAL)else:await self.remove_connection(connection_id)async def wait_closed(self, connection_id: str):"""if negative=True, only release when client close this connection."""while self.is_alive(connection_id):await asyncio.sleep(0)return Falseasync def send(self, connection_id: str, data: Any) -> bool:if not self.is_alive(connection_id):return Falseif connection_id not in self.send_queues:return Falseawait self.send_queues[connection_id].put(data)return True
Websocket Provider
from typing import Dict
from typing import List
from typing import Unionfrom pydantic import BaseModel
from pydantic import field_serializer
from sanic.log import loggerfrom apps.message.common.constants import MessageProviderType
from apps.message.common.constants import MessageStatus
from apps.message.common.interfaces import SendResult
from apps.message.providers.base import MessageProviderModel
from apps.message.validators.types import EndpointExID
from apps.message.validators.types import EndpointTag
from apps.message.validators.types import ETag
from apps.message.validators.types import ExID
from utils import get_appclass WebsocketMessageProviderModel(MessageProviderModel):class Info:name = "websocket"description = "Bio-Channel Communication"type = MessageProviderType.WEBSOCKETclass Capability:is_enabled = Truecan_send = Trueclass Message(BaseModel):connections: List[Union[EndpointTag, EndpointExID, str]]action: strpayload: Union[List, Dict, str, bytes]@field_serializer("connections")def serialize_connections(self, connections):return list(set(map(str, connections)))async def send(self, provider_id, message: Message) -> SendResult:app = get_app()websocket_pool = app.ctx.ws_poolsent_list = set()connections = []for connection in message.connections:if isinstance(connection, ETag):connections.extend([wfor c in await connection.decode()for w in c.get("websockets", [])])elif isinstance(connection, ExID):endpoint = await connection.decode()if endpoint:connections.extend(endpoint.get("websockets", []))else:connections.append(connection)connections = list(set(filter(lambda x: app.ctx.ws_pool.is_alive(connection), connections)))# logger.info(f"sending websocket message to {connections}")for connection in connections:if await websocket_pool.send(connection, data=message.model_dump_json(exclude=["connections"])):sent_list.add(connection)if sent_list:return SendResult(provider_id=provider_id, message=message, status=MessageStatus.SUCCEEDED)else:return SendResult(provider_id=provider_id, message=message, status=MessageStatus.FAILED)
websocket接口
@app.websocket("/websocket")
async def handle_websocket(request, ws):from apps.endpoint.listeners import register_websocket_endpointfrom apps.endpoint.listeners import unregister_websocket_endpointcon_id = Nonetry:ctx = request.app.ctxcon_id = await ctx.ws_pool.add_connection(ws)logger.info(f"new connection connected -> {con_id}")await ctx.ws_pool.add_listener(con_id, register_websocket_endpoint)await ctx.ws_pool.add_close_callback(con_id, unregister_websocket_endpoint)await ctx.ws_pool.send(con_id, data={"action": "on.connect", "payload": {"connection_id": con_id}})await ctx.ws_pool.wait_closed(con_id) # 等待连接断开finally:# 如果连接被客户端断开, handle_websocket将会被直接销毁, 所以销毁处理需要放在finally。request.app.add_task(request.app.ctx.ws_pool.remove_connection(con_id))
结果截图
相关文章:

【通用消息通知服务】0x3 - 发送我们第一条消息(Websocket)
【通用消息通知服务】0x3 - 发送我们第一条消息 项目地址: A generic message notification system[Github] 实现接收/发送Websocket消息 Websocket Connection Pool import asyncio from asyncio.queues import Queue from asyncio.queues import QueueEmpty from contextli…...

Eclipse打jar包与JavaDOC文档的生成
补充知识点——Eclipse打jar包与JavaDOC文档的生成 1、Eclipse如何打jar包,如何运行jar包 Java当中编写的Java代码,Java类、方法、接口这些东西就是项目中相关内容,到时候我们需要把代码提供给甲方、或者是我们需要运行我们编写的代码&…...
力扣:80. 删除有序数组中的重复项 II(Python3)
题目: 给你一个有序数组 nums ,请你 原地 删除重复出现的元素,使得出现次数超过两次的元素只出现两次 ,返回删除后数组的新长度。 不要使用额外的数组空间,你必须在 原地 修改输入数组 并在使用 O(1) 额外空间的条件下…...
linux:需要注意docker和aws的rds的mysql默认是UTC而不是中国时区
问题: 如题 解决办法: docker参考: mysql时间不对,修改时区_set global time_zone 无效_《小书生》的博客-CSDN博客 aws参考: https://www.youtube.com/watch?vB-NaqV-A1BY mysql - AWS修改RDS时区 - 个人文章 - Segm…...

访问 GitHub 方法
访问 GitHub 方法 方法一:最常见的就是 fq,但这个是违法的行为,自己私下搞可以,不能教你们。 方法二:利用加速器,这是正规合法操作。这里推荐一个免费的加速器,下载安装 Watt Toolkit加速器,原名…...

旅游APP外包开发注意事项
旅游类APP通常具有多种功能,以提供给用户更好的旅行体验。以下分享常见的旅游类APP功能以及在开发和使用这些APP时需要注意的问题,希望对大家有所帮助。北京木奇移动技术有限公司,专业的软件外包开发公司,欢迎交流合作。 常见功能…...

ROS机器人编程---------(二)ROS中的核心概念
ROS机器人编程 ROS中的核心概念 ROS的通信机制 在ROS中结点是最小单元,比如说机器人的遥控器可以作为一个控制结点,机器人上的摄像头也可以看作一个结点,ROS通过协调各个结点来实现 在启动任何ROS结点之前,都必须先启动ROS Mas…...

Python学习教程:进程的调度
前言 嗨喽~大家好呀,这里是魔王呐 ❤ ~! 要想多个进程交替运行,操作系统必须对这些进程进行调度, 这个调度也不是随即进行的,而是需要遵循一定的法则,由此就有了进程的调度算法。 python更多源码/资料/解答/教程等 …...

ElasticSearch第三讲:ES详解 - Elastic Stack生态和场景方案
ElasticSearch第三讲:ES详解 - Elastic Stack生态和场景方案 本文是ElasticSearch第三讲,在了解ElaticSearch之后,我们还要了解Elastic背后的生态 即我们常说的ELK;与此同时,还会给你展示ElasticSearch的案例场景&…...

基于Java+SpringBoot+Vue前后端分离农商对接系统设计和实现
博主介绍:✌全网粉丝30W,csdn特邀作者、博客专家、CSDN新星计划导师、Java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ 🍅文末获取源码联系🍅 👇🏻 精彩专…...

【模方ModelFun】实景三维建模和修模4.0.7最新版安装包以及图文安装教程
模方ModelFun 具有多种功能,旨在帮助用户进行实景三维建模和修模。以下是一些主要功能的简要介绍: 实景三维建模:【模方ModelFun】提供了自动化的实景三维重建功能,可以从实景图像中提取几何形状和纹理信息,生成高质量…...

介绍几个搜索引擎
Google:全球最大的搜索引擎,提供全面的搜索服务,包括网页、图片、视频、新闻、地图等。 Baidu:中国最大的搜索引擎,提供类似于Google的全面搜索服务,同时也有网盘、知道等功能。 Bing:微软公司…...

iPhone 隔空投送使用指南:详细教程
本文介绍了如何在iPhone上使用隔空投送,包括如何在iOS 11到iOS 14的iPhone上启用它、发送文件以及接受或拒绝AirDrop发送给你的文件。对于iOS 7以上的旧款iPhone,提供了另一种方法。 如何打开隔空投送 你可以通过以下两种方式之一启动隔空投送功能:在“设置”应用程序或控…...

百度文心一言GPT免费入口也来了!!!
文心一言入口地址:文心一言能力全面开放 文心一言是百度全新一代知识增强大语言模型,文心大模型家族的新成员,能够与人对话互动,回答问题,协助创作,高效便捷地帮助人们获取信息、知识和灵感。 文心一言的技…...
线程调度和线程控制
在Java中,线程调度和线程控制是多线程编程中重要的概念,它们用于管理和控制线程的执行。以下是关于线程调度和线程控制的一些重要概念和技术: **1. 线程调度(Thread Scheduling): ** 线程调度是操作系统或Java虚拟机决定哪个线程在何时执行的过程。Java提供了多种线程调度…...
laravel excel导入导出
一、安装第三方 composer require maatwebsite/excel版本2.1和现在版本 有所不一样 二、导入 <?php namespace App\Import; use Maatwebsite\Excel\Concerns\ToCollection;class TestImport implements ToCollection {public function __construct(){}public function c…...

Windows无法删除分区怎么办?
我们知道Windows系统内置的磁盘管理工具是一个很实用的程序,可以帮助我们完成很多磁盘分区相关的基础操作,比如当我们想要删除硬盘上的某一个分区时,先想到的可能会是磁盘管理工具。但是当我们准备在磁盘管理工具中删除某个分区时,…...

【请求报错:javax.net.ssl.SSLHandshakeException: No appropriate protocol】
1、问题描述 在请求服务时报错说SSL握手异常协议禁用啥的,而且我的连接数据库的url也加了useSSLfalse javax.net.ssl.SSLHandshakeException: No appropriate protocol (protocol is disabled or cipher suites are inappropriate)2、解决方法 在网上查找了方法…...

elementUI textarea可自适应文本高度的文本域
效果图; 通过设置 autosize 属性可以使得文本域的高度能够根据文本内容自动进行调整,并且 autosize 还可以设定为一个对象,指定最小行数和最大行数。 <el-inputtype"textarea"autosizeplaceholder"请输入内容"v-model"te…...

WebRTC-Streamer交叉编译
WebRTC-Streamer交叉编译 flyfish 文章目录 WebRTC-Streamer交叉编译零、前言一、提前准备工作1 安装需要的工具2 可选的交叉编译工具3 默认执行python是python34 获取源码5 使用其他版本的方法 二、非交叉编译编译1 在 src目录执行 安装所需的依赖2 执行命令 三、 交叉编译1 …...

华为云AI开发平台ModelArts
华为云ModelArts:重塑AI开发流程的“智能引擎”与“创新加速器”! 在人工智能浪潮席卷全球的2025年,企业拥抱AI的意愿空前高涨,但技术门槛高、流程复杂、资源投入巨大的现实,却让许多创新构想止步于实验室。数据科学家…...

JavaSec-RCE
简介 RCE(Remote Code Execution),可以分为:命令注入(Command Injection)、代码注入(Code Injection) 代码注入 1.漏洞场景:Groovy代码注入 Groovy是一种基于JVM的动态语言,语法简洁,支持闭包、动态类型和Java互操作性,…...

树莓派超全系列教程文档--(61)树莓派摄像头高级使用方法
树莓派摄像头高级使用方法 配置通过调谐文件来调整相机行为 使用多个摄像头安装 libcam 和 rpicam-apps依赖关系开发包 文章来源: http://raspberry.dns8844.cn/documentation 原文网址 配置 大多数用例自动工作,无需更改相机配置。但是,一…...

大型活动交通拥堵治理的视觉算法应用
大型活动下智慧交通的视觉分析应用 一、背景与挑战 大型活动(如演唱会、马拉松赛事、高考中考等)期间,城市交通面临瞬时人流车流激增、传统摄像头模糊、交通拥堵识别滞后等问题。以演唱会为例,暖城商圈曾因观众集中离场导致周边…...

【Redis技术进阶之路】「原理分析系列开篇」分析客户端和服务端网络诵信交互实现(服务端执行命令请求的过程 - 初始化服务器)
服务端执行命令请求的过程 【专栏简介】【技术大纲】【专栏目标】【目标人群】1. Redis爱好者与社区成员2. 后端开发和系统架构师3. 计算机专业的本科生及研究生 初始化服务器1. 初始化服务器状态结构初始化RedisServer变量 2. 加载相关系统配置和用户配置参数定制化配置参数案…...

高危文件识别的常用算法:原理、应用与企业场景
高危文件识别的常用算法:原理、应用与企业场景 高危文件识别旨在检测可能导致安全威胁的文件,如包含恶意代码、敏感数据或欺诈内容的文档,在企业协同办公环境中(如Teams、Google Workspace)尤为重要。结合大模型技术&…...

九天毕昇深度学习平台 | 如何安装库?
pip install 库名 -i https://pypi.tuna.tsinghua.edu.cn/simple --user 举个例子: 报错 ModuleNotFoundError: No module named torch 那么我需要安装 torch pip install torch -i https://pypi.tuna.tsinghua.edu.cn/simple --user pip install 库名&#x…...

C++使用 new 来创建动态数组
问题: 不能使用变量定义数组大小 原因: 这是因为数组在内存中是连续存储的,编译器需要在编译阶段就确定数组的大小,以便正确地分配内存空间。如果允许使用变量来定义数组的大小,那么编译器就无法在编译时确定数组的大…...

算法岗面试经验分享-大模型篇
文章目录 A 基础语言模型A.1 TransformerA.2 Bert B 大语言模型结构B.1 GPTB.2 LLamaB.3 ChatGLMB.4 Qwen C 大语言模型微调C.1 Fine-tuningC.2 Adapter-tuningC.3 Prefix-tuningC.4 P-tuningC.5 LoRA A 基础语言模型 A.1 Transformer (1)资源 论文&a…...

基于TurtleBot3在Gazebo地图实现机器人远程控制
1. TurtleBot3环境配置 # 下载TurtleBot3核心包 mkdir -p ~/catkin_ws/src cd ~/catkin_ws/src git clone -b noetic-devel https://github.com/ROBOTIS-GIT/turtlebot3.git git clone -b noetic https://github.com/ROBOTIS-GIT/turtlebot3_msgs.git git clone -b noetic-dev…...