当前位置: 首页 > news >正文

【通用消息通知服务】0x3 - 发送我们第一条消息(Websocket)

【通用消息通知服务】0x3 - 发送我们第一条消息

项目地址: A generic message notification system[Github]

实现接收/发送Websocket消息

Websocket Connection Pool

import asyncio
from asyncio.queues import Queue
from asyncio.queues import QueueEmpty
from contextlib import suppress
from typing import Anyimport async_timeout
import orjson
from sanic.log import logger
from ulid import ULIDfrom common.depend import DependencyPING = "#ping"
PONG = "#pong"class WebsocketConnectionPoolDependency(Dependency, dependency_name="WebsocketPool", dependency_alias="ws_pool"
):def __init__(self, app) -> None:super().__init__(app)self.lock = asyncio.Lock()self.connections = {}	# 存储websocket connectionsself.send_queues = {}   # 各websocket发送队列self.recv_queues = {}   # 各websocket接收消息队列self.close_callbacks = {} # websocket销毁回调self.listeners = {} # 连接监听函数def _gen_id(self) -> str:return str(ULID())async def add_connection(self, connection) -> str:async with self.lock:id = self._gen_id()self.connections[id] = connectionself.send_queues[id] = Queue()self.app.add_task(self.send_task(self.send_queues[id], connection),name=f"websocket_{id}_send_task",)self.recv_queues[id] = Queue()self.app.add_task(self.recv_task(self.recv_queues[id], connection),name=f"websocket_{id}_recv_task",)self.app.add_task(self.notify_task(id), name=f"websocket_{id}_notify_task")self.app.add_task(self.is_alive_task(id), name=f"websocket_{id}_is_alive_task")setattr(connection, "_id", id)return connection._iddef get_connection(self, connection_id: str):return self.connections.get(connection_id)async def add_listener(self, connection_id, handler) -> str:async with self.lock:id = self._gen_id()self.listeners.setdefault(connection_id, {}).update({id: handler})return idasync def remove_listener(self, connection_id, listener_id):async with self.lock:self.listeners.get(connection_id, {}).pop(listener_id, None)async def add_close_callback(self, connection_id, callback):async with self.lock:self.close_callbacks.setdefault(connection_id, []).append(callback)def is_alive(self, connection_id: str):if hasattr(connection_id, "_id"):connection_id = connection_id._idreturn connection_id in self.connectionsasync def remove_connection(self, connection: Any):if hasattr(connection, "_id"):connection_id = connection._idelse:connection_id = connectionif connection_id not in self.connections:# removed alreadyreturnasync with self.lock:logger.info(f"remove connection: {connection_id}")with suppress(Exception):await self.app.cancel_task(f"websocket_{connection_id}_send_task")with suppress(Exception):await self.app.cancel_task(f"websocket_{connection_id}_recv_task")with suppress(Exception):await self.app.cancel_task(f"websocket_{connection_id}_notify_task")with suppress(Exception):await self.app.cancel_task(f"websocket_{connection_id}_is_alive_task")if connection_id in self.send_queues:del self.send_queues[connection_id]if connection_id in self.recv_queues:del self.recv_queues[connection_id]if connection_id in self.listeners:del self.listeners[connection_id]if connection_id in self.close_callbacks:await self.do_close_callbacks(connection_id)del self.close_callbacks[connection_id]if connection_id in self.connections:del self.connections[connection_id]async def do_close_callbacks(self, connection_id):for cb in self.close_callbacks.get(connection_id, []):self.app.add_task(cb(connection_id))async def prepare(self):self.is_prepared = Truelogger.info("dependency:WebsocketPool is prepared")return self.is_preparedasync def check(self):return Trueasync def send_task(self, queue, connection):while self.is_alive(connection):try:data = queue.get_nowait()except QueueEmpty:await asyncio.sleep(0)continuetry:if isinstance(data, (bytes, str, int)):await connection.send(data)else:await connection.send(orjson.dumps(data).decode())queue.task_done()except Exception as err:breakasync def recv_task(self, queue, connection):while self.is_alive(connection):try:data = await connection.recv()await queue.put(data)logger.info(f"recv message: {data} from connection: {connection._id}")except Exception as err:breakasync def notify_task(self, connection_id):while self.is_alive(connection_id):try:logger.info(f"notify connection: {connection_id}'s listeners")data = await self.recv_queues[connection_id].get()for listener in self.listeners.get(connection_id, {}).values():await listener(connection_id, data)except Exception as err:passasync def is_alive_task(self, connection_id: str):if hasattr(connection_id, "_id"):connection_id = connection_id._idget_pong = asyncio.Event()async def wait_pong(connection_id, data):if data != PONG:returnget_pong.set()while True:get_pong.clear()await self.send(connection_id, PING)listener_id = await self.add_listener(connection_id, wait_pong)with suppress(asyncio.TimeoutError):async with async_timeout.timeout(self.app.config.WEBSOCKET_PING_TIMEOUT):await get_pong.wait()await self.remove_listener(connection_id, listener_id)if get_pong.is_set():# this connection is closedawait asyncio.sleep(self.app.config.WEBSOCKET_PING_INTERVAL)else:await self.remove_connection(connection_id)async def wait_closed(self, connection_id: str):"""if negative=True, only release when client close this connection."""while self.is_alive(connection_id):await asyncio.sleep(0)return Falseasync def send(self, connection_id: str, data: Any) -> bool:if not self.is_alive(connection_id):return Falseif connection_id not in self.send_queues:return Falseawait self.send_queues[connection_id].put(data)return True

Websocket Provider

from typing import Dict
from typing import List
from typing import Unionfrom pydantic import BaseModel
from pydantic import field_serializer
from sanic.log import loggerfrom apps.message.common.constants import MessageProviderType
from apps.message.common.constants import MessageStatus
from apps.message.common.interfaces import SendResult
from apps.message.providers.base import MessageProviderModel
from apps.message.validators.types import EndpointExID
from apps.message.validators.types import EndpointTag
from apps.message.validators.types import ETag
from apps.message.validators.types import ExID
from utils import get_appclass WebsocketMessageProviderModel(MessageProviderModel):class Info:name = "websocket"description = "Bio-Channel Communication"type = MessageProviderType.WEBSOCKETclass Capability:is_enabled = Truecan_send = Trueclass Message(BaseModel):connections: List[Union[EndpointTag, EndpointExID, str]]action: strpayload: Union[List, Dict, str, bytes]@field_serializer("connections")def serialize_connections(self, connections):return list(set(map(str, connections)))async def send(self, provider_id, message: Message) -> SendResult:app = get_app()websocket_pool = app.ctx.ws_poolsent_list = set()connections = []for connection in message.connections:if isinstance(connection, ETag):connections.extend([wfor c in await connection.decode()for w in c.get("websockets", [])])elif isinstance(connection, ExID):endpoint = await connection.decode()if endpoint:connections.extend(endpoint.get("websockets", []))else:connections.append(connection)connections = list(set(filter(lambda x: app.ctx.ws_pool.is_alive(connection), connections)))# logger.info(f"sending websocket message to {connections}")for connection in connections:if await websocket_pool.send(connection, data=message.model_dump_json(exclude=["connections"])):sent_list.add(connection)if sent_list:return SendResult(provider_id=provider_id, message=message, status=MessageStatus.SUCCEEDED)else:return SendResult(provider_id=provider_id, message=message, status=MessageStatus.FAILED)

websocket接口


@app.websocket("/websocket")
async def handle_websocket(request, ws):from apps.endpoint.listeners import register_websocket_endpointfrom apps.endpoint.listeners import unregister_websocket_endpointcon_id = Nonetry:ctx = request.app.ctxcon_id = await ctx.ws_pool.add_connection(ws)logger.info(f"new connection connected -> {con_id}")await ctx.ws_pool.add_listener(con_id, register_websocket_endpoint)await ctx.ws_pool.add_close_callback(con_id, unregister_websocket_endpoint)await ctx.ws_pool.send(con_id, data={"action": "on.connect", "payload": {"connection_id": con_id}})await ctx.ws_pool.wait_closed(con_id) # 等待连接断开finally:# 如果连接被客户端断开, handle_websocket将会被直接销毁, 所以销毁处理需要放在finally。request.app.add_task(request.app.ctx.ws_pool.remove_connection(con_id))

结果截图

websocket connected

相关文章:

【通用消息通知服务】0x3 - 发送我们第一条消息(Websocket)

【通用消息通知服务】0x3 - 发送我们第一条消息 项目地址: A generic message notification system[Github] 实现接收/发送Websocket消息 Websocket Connection Pool import asyncio from asyncio.queues import Queue from asyncio.queues import QueueEmpty from contextli…...

Eclipse打jar包与JavaDOC文档的生成

补充知识点——Eclipse打jar包与JavaDOC文档的生成 1、Eclipse如何打jar包,如何运行jar包 Java当中编写的Java代码,Java类、方法、接口这些东西就是项目中相关内容,到时候我们需要把代码提供给甲方、或者是我们需要运行我们编写的代码&…...

力扣:80. 删除有序数组中的重复项 II(Python3)

题目: 给你一个有序数组 nums ,请你 原地 删除重复出现的元素,使得出现次数超过两次的元素只出现两次 ,返回删除后数组的新长度。 不要使用额外的数组空间,你必须在 原地 修改输入数组 并在使用 O(1) 额外空间的条件下…...

linux:需要注意docker和aws的rds的mysql默认是UTC而不是中国时区

问题: 如题 解决办法: docker参考: mysql时间不对,修改时区_set global time_zone 无效_《小书生》的博客-CSDN博客 aws参考: https://www.youtube.com/watch?vB-NaqV-A1BY mysql - AWS修改RDS时区 - 个人文章 - Segm…...

访问 GitHub 方法

访问 GitHub 方法 方法一:最常见的就是 fq,但这个是违法的行为,自己私下搞可以,不能教你们。 方法二:利用加速器,这是正规合法操作。这里推荐一个免费的加速器,下载安装 Watt Toolkit加速器,原名…...

旅游APP外包开发注意事项

旅游类APP通常具有多种功能,以提供给用户更好的旅行体验。以下分享常见的旅游类APP功能以及在开发和使用这些APP时需要注意的问题,希望对大家有所帮助。北京木奇移动技术有限公司,专业的软件外包开发公司,欢迎交流合作。 常见功能…...

ROS机器人编程---------(二)ROS中的核心概念

ROS机器人编程 ROS中的核心概念 ROS的通信机制 在ROS中结点是最小单元,比如说机器人的遥控器可以作为一个控制结点,机器人上的摄像头也可以看作一个结点,ROS通过协调各个结点来实现 在启动任何ROS结点之前,都必须先启动ROS Mas…...

Python学习教程:进程的调度

前言 嗨喽~大家好呀,这里是魔王呐 ❤ ~! 要想多个进程交替运行,操作系统必须对这些进程进行调度, 这个调度也不是随即进行的,而是需要遵循一定的法则,由此就有了进程的调度算法。 python更多源码/资料/解答/教程等 …...

ElasticSearch第三讲:ES详解 - Elastic Stack生态和场景方案

ElasticSearch第三讲:ES详解 - Elastic Stack生态和场景方案 本文是ElasticSearch第三讲,在了解ElaticSearch之后,我们还要了解Elastic背后的生态 即我们常说的ELK;与此同时,还会给你展示ElasticSearch的案例场景&…...

基于Java+SpringBoot+Vue前后端分离农商对接系统设计和实现

博主介绍:✌全网粉丝30W,csdn特邀作者、博客专家、CSDN新星计划导师、Java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ 🍅文末获取源码联系🍅 👇🏻 精彩专…...

【模方ModelFun】实景三维建模和修模4.0.7最新版安装包以及图文安装教程

模方ModelFun 具有多种功能,旨在帮助用户进行实景三维建模和修模。以下是一些主要功能的简要介绍: 实景三维建模:【模方ModelFun】提供了自动化的实景三维重建功能,可以从实景图像中提取几何形状和纹理信息,生成高质量…...

介绍几个搜索引擎

Google:全球最大的搜索引擎,提供全面的搜索服务,包括网页、图片、视频、新闻、地图等。 Baidu:中国最大的搜索引擎,提供类似于Google的全面搜索服务,同时也有网盘、知道等功能。 Bing:微软公司…...

iPhone 隔空投送使用指南:详细教程

本文介绍了如何在iPhone上使用隔空投送,包括如何在iOS 11到iOS 14的iPhone上启用它、发送文件以及接受或拒绝AirDrop发送给你的文件。对于iOS 7以上的旧款iPhone,提供了另一种方法。 如何打开隔空投送 你可以通过以下两种方式之一启动隔空投送功能:在“设置”应用程序或控…...

百度文心一言GPT免费入口也来了!!!

文心一言入口地址:文心一言能力全面开放 文心一言是百度全新一代知识增强大语言模型,文心大模型家族的新成员,能够与人对话互动,回答问题,协助创作,高效便捷地帮助人们获取信息、知识和灵感。 文心一言的技…...

线程调度和线程控制

在Java中,线程调度和线程控制是多线程编程中重要的概念,它们用于管理和控制线程的执行。以下是关于线程调度和线程控制的一些重要概念和技术: **1. 线程调度(Thread Scheduling): ** 线程调度是操作系统或Java虚拟机决定哪个线程在何时执行的过程。Java提供了多种线程调度…...

laravel excel导入导出

一、安装第三方 composer require maatwebsite/excel版本2.1和现在版本 有所不一样 二、导入 <?php namespace App\Import; use Maatwebsite\Excel\Concerns\ToCollection;class TestImport implements ToCollection {public function __construct(){}public function c…...

Windows无法删除分区怎么办?

我们知道Windows系统内置的磁盘管理工具是一个很实用的程序&#xff0c;可以帮助我们完成很多磁盘分区相关的基础操作&#xff0c;比如当我们想要删除硬盘上的某一个分区时&#xff0c;先想到的可能会是磁盘管理工具。但是当我们准备在磁盘管理工具中删除某个分区时&#xff0c…...

【请求报错:javax.net.ssl.SSLHandshakeException: No appropriate protocol】

1、问题描述 在请求服务时报错说SSL握手异常协议禁用啥的&#xff0c;而且我的连接数据库的url也加了useSSLfalse javax.net.ssl.SSLHandshakeException: No appropriate protocol (protocol is disabled or cipher suites are inappropriate)2、解决方法 在网上查找了方法…...

elementUI textarea可自适应文本高度的文本域

效果图; 通过设置 autosize 属性可以使得文本域的高度能够根据文本内容自动进行调整&#xff0c;并且 autosize 还可以设定为一个对象&#xff0c;指定最小行数和最大行数。 <el-inputtype"textarea"autosizeplaceholder"请输入内容"v-model"te…...

WebRTC-Streamer交叉编译

WebRTC-Streamer交叉编译 flyfish 文章目录 WebRTC-Streamer交叉编译零、前言一、提前准备工作1 安装需要的工具2 可选的交叉编译工具3 默认执行python是python34 获取源码5 使用其他版本的方法 二、非交叉编译编译1 在 src目录执行 安装所需的依赖2 执行命令 三、 交叉编译1 …...

label-studio的使用教程(导入本地路径)

文章目录 1. 准备环境2. 脚本启动2.1 Windows2.2 Linux 3. 安装label-studio机器学习后端3.1 pip安装(推荐)3.2 GitHub仓库安装 4. 后端配置4.1 yolo环境4.2 引入后端模型4.3 修改脚本4.4 启动后端 5. 标注工程5.1 创建工程5.2 配置图片路径5.3 配置工程类型标签5.4 配置模型5.…...

《Qt C++ 与 OpenCV:解锁视频播放程序设计的奥秘》

引言:探索视频播放程序设计之旅 在当今数字化时代,多媒体应用已渗透到我们生活的方方面面,从日常的视频娱乐到专业的视频监控、视频会议系统,视频播放程序作为多媒体应用的核心组成部分,扮演着至关重要的角色。无论是在个人电脑、移动设备还是智能电视等平台上,用户都期望…...

服务器硬防的应用场景都有哪些?

服务器硬防是指一种通过硬件设备层面的安全措施来防御服务器系统受到网络攻击的方式&#xff0c;避免服务器受到各种恶意攻击和网络威胁&#xff0c;那么&#xff0c;服务器硬防通常都会应用在哪些场景当中呢&#xff1f; 硬防服务器中一般会配备入侵检测系统和预防系统&#x…...

家政维修平台实战20:权限设计

目录 1 获取工人信息2 搭建工人入口3 权限判断总结 目前我们已经搭建好了基础的用户体系&#xff0c;主要是分成几个表&#xff0c;用户表我们是记录用户的基础信息&#xff0c;包括手机、昵称、头像。而工人和员工各有各的表。那么就有一个问题&#xff0c;不同的角色&#xf…...

【Java_EE】Spring MVC

目录 Spring Web MVC ​编辑注解 RestController RequestMapping RequestParam RequestParam RequestBody PathVariable RequestPart 参数传递 注意事项 ​编辑参数重命名 RequestParam ​编辑​编辑传递集合 RequestParam 传递JSON数据 ​编辑RequestBody ​…...

【C语言练习】080. 使用C语言实现简单的数据库操作

080. 使用C语言实现简单的数据库操作 080. 使用C语言实现简单的数据库操作使用原生APIODBC接口第三方库ORM框架文件模拟1. 安装SQLite2. 示例代码:使用SQLite创建数据库、表和插入数据3. 编译和运行4. 示例运行输出:5. 注意事项6. 总结080. 使用C语言实现简单的数据库操作 在…...

【开发技术】.Net使用FFmpeg视频特定帧上绘制内容

目录 一、目的 二、解决方案 2.1 什么是FFmpeg 2.2 FFmpeg主要功能 2.3 使用Xabe.FFmpeg调用FFmpeg功能 2.4 使用 FFmpeg 的 drawbox 滤镜来绘制 ROI 三、总结 一、目的 当前市场上有很多目标检测智能识别的相关算法&#xff0c;当前调用一个医疗行业的AI识别算法后返回…...

springboot整合VUE之在线教育管理系统简介

可以学习到的技能 学会常用技术栈的使用 独立开发项目 学会前端的开发流程 学会后端的开发流程 学会数据库的设计 学会前后端接口调用方式 学会多模块之间的关联 学会数据的处理 适用人群 在校学生&#xff0c;小白用户&#xff0c;想学习知识的 有点基础&#xff0c;想要通过项…...

音视频——I2S 协议详解

I2S 协议详解 I2S (Inter-IC Sound) 协议是一种串行总线协议&#xff0c;专门用于在数字音频设备之间传输数字音频数据。它由飞利浦&#xff08;Philips&#xff09;公司开发&#xff0c;以其简单、高效和广泛的兼容性而闻名。 1. 信号线 I2S 协议通常使用三根或四根信号线&a…...

PHP 8.5 即将发布:管道操作符、强力调试

前不久&#xff0c;PHP宣布了即将在 2025 年 11 月 20 日 正式发布的 PHP 8.5&#xff01;作为 PHP 语言的又一次重要迭代&#xff0c;PHP 8.5 承诺带来一系列旨在提升代码可读性、健壮性以及开发者效率的改进。而更令人兴奋的是&#xff0c;借助强大的本地开发环境 ServBay&am…...