当前位置: 首页 > news >正文

【通用消息通知服务】0x3 - 发送我们第一条消息(Websocket)

【通用消息通知服务】0x3 - 发送我们第一条消息

项目地址: A generic message notification system[Github]

实现接收/发送Websocket消息

Websocket Connection Pool

import asyncio
from asyncio.queues import Queue
from asyncio.queues import QueueEmpty
from contextlib import suppress
from typing import Anyimport async_timeout
import orjson
from sanic.log import logger
from ulid import ULIDfrom common.depend import DependencyPING = "#ping"
PONG = "#pong"class WebsocketConnectionPoolDependency(Dependency, dependency_name="WebsocketPool", dependency_alias="ws_pool"
):def __init__(self, app) -> None:super().__init__(app)self.lock = asyncio.Lock()self.connections = {}	# 存储websocket connectionsself.send_queues = {}   # 各websocket发送队列self.recv_queues = {}   # 各websocket接收消息队列self.close_callbacks = {} # websocket销毁回调self.listeners = {} # 连接监听函数def _gen_id(self) -> str:return str(ULID())async def add_connection(self, connection) -> str:async with self.lock:id = self._gen_id()self.connections[id] = connectionself.send_queues[id] = Queue()self.app.add_task(self.send_task(self.send_queues[id], connection),name=f"websocket_{id}_send_task",)self.recv_queues[id] = Queue()self.app.add_task(self.recv_task(self.recv_queues[id], connection),name=f"websocket_{id}_recv_task",)self.app.add_task(self.notify_task(id), name=f"websocket_{id}_notify_task")self.app.add_task(self.is_alive_task(id), name=f"websocket_{id}_is_alive_task")setattr(connection, "_id", id)return connection._iddef get_connection(self, connection_id: str):return self.connections.get(connection_id)async def add_listener(self, connection_id, handler) -> str:async with self.lock:id = self._gen_id()self.listeners.setdefault(connection_id, {}).update({id: handler})return idasync def remove_listener(self, connection_id, listener_id):async with self.lock:self.listeners.get(connection_id, {}).pop(listener_id, None)async def add_close_callback(self, connection_id, callback):async with self.lock:self.close_callbacks.setdefault(connection_id, []).append(callback)def is_alive(self, connection_id: str):if hasattr(connection_id, "_id"):connection_id = connection_id._idreturn connection_id in self.connectionsasync def remove_connection(self, connection: Any):if hasattr(connection, "_id"):connection_id = connection._idelse:connection_id = connectionif connection_id not in self.connections:# removed alreadyreturnasync with self.lock:logger.info(f"remove connection: {connection_id}")with suppress(Exception):await self.app.cancel_task(f"websocket_{connection_id}_send_task")with suppress(Exception):await self.app.cancel_task(f"websocket_{connection_id}_recv_task")with suppress(Exception):await self.app.cancel_task(f"websocket_{connection_id}_notify_task")with suppress(Exception):await self.app.cancel_task(f"websocket_{connection_id}_is_alive_task")if connection_id in self.send_queues:del self.send_queues[connection_id]if connection_id in self.recv_queues:del self.recv_queues[connection_id]if connection_id in self.listeners:del self.listeners[connection_id]if connection_id in self.close_callbacks:await self.do_close_callbacks(connection_id)del self.close_callbacks[connection_id]if connection_id in self.connections:del self.connections[connection_id]async def do_close_callbacks(self, connection_id):for cb in self.close_callbacks.get(connection_id, []):self.app.add_task(cb(connection_id))async def prepare(self):self.is_prepared = Truelogger.info("dependency:WebsocketPool is prepared")return self.is_preparedasync def check(self):return Trueasync def send_task(self, queue, connection):while self.is_alive(connection):try:data = queue.get_nowait()except QueueEmpty:await asyncio.sleep(0)continuetry:if isinstance(data, (bytes, str, int)):await connection.send(data)else:await connection.send(orjson.dumps(data).decode())queue.task_done()except Exception as err:breakasync def recv_task(self, queue, connection):while self.is_alive(connection):try:data = await connection.recv()await queue.put(data)logger.info(f"recv message: {data} from connection: {connection._id}")except Exception as err:breakasync def notify_task(self, connection_id):while self.is_alive(connection_id):try:logger.info(f"notify connection: {connection_id}'s listeners")data = await self.recv_queues[connection_id].get()for listener in self.listeners.get(connection_id, {}).values():await listener(connection_id, data)except Exception as err:passasync def is_alive_task(self, connection_id: str):if hasattr(connection_id, "_id"):connection_id = connection_id._idget_pong = asyncio.Event()async def wait_pong(connection_id, data):if data != PONG:returnget_pong.set()while True:get_pong.clear()await self.send(connection_id, PING)listener_id = await self.add_listener(connection_id, wait_pong)with suppress(asyncio.TimeoutError):async with async_timeout.timeout(self.app.config.WEBSOCKET_PING_TIMEOUT):await get_pong.wait()await self.remove_listener(connection_id, listener_id)if get_pong.is_set():# this connection is closedawait asyncio.sleep(self.app.config.WEBSOCKET_PING_INTERVAL)else:await self.remove_connection(connection_id)async def wait_closed(self, connection_id: str):"""if negative=True, only release when client close this connection."""while self.is_alive(connection_id):await asyncio.sleep(0)return Falseasync def send(self, connection_id: str, data: Any) -> bool:if not self.is_alive(connection_id):return Falseif connection_id not in self.send_queues:return Falseawait self.send_queues[connection_id].put(data)return True

Websocket Provider

from typing import Dict
from typing import List
from typing import Unionfrom pydantic import BaseModel
from pydantic import field_serializer
from sanic.log import loggerfrom apps.message.common.constants import MessageProviderType
from apps.message.common.constants import MessageStatus
from apps.message.common.interfaces import SendResult
from apps.message.providers.base import MessageProviderModel
from apps.message.validators.types import EndpointExID
from apps.message.validators.types import EndpointTag
from apps.message.validators.types import ETag
from apps.message.validators.types import ExID
from utils import get_appclass WebsocketMessageProviderModel(MessageProviderModel):class Info:name = "websocket"description = "Bio-Channel Communication"type = MessageProviderType.WEBSOCKETclass Capability:is_enabled = Truecan_send = Trueclass Message(BaseModel):connections: List[Union[EndpointTag, EndpointExID, str]]action: strpayload: Union[List, Dict, str, bytes]@field_serializer("connections")def serialize_connections(self, connections):return list(set(map(str, connections)))async def send(self, provider_id, message: Message) -> SendResult:app = get_app()websocket_pool = app.ctx.ws_poolsent_list = set()connections = []for connection in message.connections:if isinstance(connection, ETag):connections.extend([wfor c in await connection.decode()for w in c.get("websockets", [])])elif isinstance(connection, ExID):endpoint = await connection.decode()if endpoint:connections.extend(endpoint.get("websockets", []))else:connections.append(connection)connections = list(set(filter(lambda x: app.ctx.ws_pool.is_alive(connection), connections)))# logger.info(f"sending websocket message to {connections}")for connection in connections:if await websocket_pool.send(connection, data=message.model_dump_json(exclude=["connections"])):sent_list.add(connection)if sent_list:return SendResult(provider_id=provider_id, message=message, status=MessageStatus.SUCCEEDED)else:return SendResult(provider_id=provider_id, message=message, status=MessageStatus.FAILED)

websocket接口


@app.websocket("/websocket")
async def handle_websocket(request, ws):from apps.endpoint.listeners import register_websocket_endpointfrom apps.endpoint.listeners import unregister_websocket_endpointcon_id = Nonetry:ctx = request.app.ctxcon_id = await ctx.ws_pool.add_connection(ws)logger.info(f"new connection connected -> {con_id}")await ctx.ws_pool.add_listener(con_id, register_websocket_endpoint)await ctx.ws_pool.add_close_callback(con_id, unregister_websocket_endpoint)await ctx.ws_pool.send(con_id, data={"action": "on.connect", "payload": {"connection_id": con_id}})await ctx.ws_pool.wait_closed(con_id) # 等待连接断开finally:# 如果连接被客户端断开, handle_websocket将会被直接销毁, 所以销毁处理需要放在finally。request.app.add_task(request.app.ctx.ws_pool.remove_connection(con_id))

结果截图

websocket connected

相关文章:

【通用消息通知服务】0x3 - 发送我们第一条消息(Websocket)

【通用消息通知服务】0x3 - 发送我们第一条消息 项目地址: A generic message notification system[Github] 实现接收/发送Websocket消息 Websocket Connection Pool import asyncio from asyncio.queues import Queue from asyncio.queues import QueueEmpty from contextli…...

Eclipse打jar包与JavaDOC文档的生成

补充知识点——Eclipse打jar包与JavaDOC文档的生成 1、Eclipse如何打jar包,如何运行jar包 Java当中编写的Java代码,Java类、方法、接口这些东西就是项目中相关内容,到时候我们需要把代码提供给甲方、或者是我们需要运行我们编写的代码&…...

力扣:80. 删除有序数组中的重复项 II(Python3)

题目: 给你一个有序数组 nums ,请你 原地 删除重复出现的元素,使得出现次数超过两次的元素只出现两次 ,返回删除后数组的新长度。 不要使用额外的数组空间,你必须在 原地 修改输入数组 并在使用 O(1) 额外空间的条件下…...

linux:需要注意docker和aws的rds的mysql默认是UTC而不是中国时区

问题: 如题 解决办法: docker参考: mysql时间不对,修改时区_set global time_zone 无效_《小书生》的博客-CSDN博客 aws参考: https://www.youtube.com/watch?vB-NaqV-A1BY mysql - AWS修改RDS时区 - 个人文章 - Segm…...

访问 GitHub 方法

访问 GitHub 方法 方法一:最常见的就是 fq,但这个是违法的行为,自己私下搞可以,不能教你们。 方法二:利用加速器,这是正规合法操作。这里推荐一个免费的加速器,下载安装 Watt Toolkit加速器,原名…...

旅游APP外包开发注意事项

旅游类APP通常具有多种功能,以提供给用户更好的旅行体验。以下分享常见的旅游类APP功能以及在开发和使用这些APP时需要注意的问题,希望对大家有所帮助。北京木奇移动技术有限公司,专业的软件外包开发公司,欢迎交流合作。 常见功能…...

ROS机器人编程---------(二)ROS中的核心概念

ROS机器人编程 ROS中的核心概念 ROS的通信机制 在ROS中结点是最小单元,比如说机器人的遥控器可以作为一个控制结点,机器人上的摄像头也可以看作一个结点,ROS通过协调各个结点来实现 在启动任何ROS结点之前,都必须先启动ROS Mas…...

Python学习教程:进程的调度

前言 嗨喽~大家好呀,这里是魔王呐 ❤ ~! 要想多个进程交替运行,操作系统必须对这些进程进行调度, 这个调度也不是随即进行的,而是需要遵循一定的法则,由此就有了进程的调度算法。 python更多源码/资料/解答/教程等 …...

ElasticSearch第三讲:ES详解 - Elastic Stack生态和场景方案

ElasticSearch第三讲:ES详解 - Elastic Stack生态和场景方案 本文是ElasticSearch第三讲,在了解ElaticSearch之后,我们还要了解Elastic背后的生态 即我们常说的ELK;与此同时,还会给你展示ElasticSearch的案例场景&…...

基于Java+SpringBoot+Vue前后端分离农商对接系统设计和实现

博主介绍:✌全网粉丝30W,csdn特邀作者、博客专家、CSDN新星计划导师、Java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ 🍅文末获取源码联系🍅 👇🏻 精彩专…...

【模方ModelFun】实景三维建模和修模4.0.7最新版安装包以及图文安装教程

模方ModelFun 具有多种功能,旨在帮助用户进行实景三维建模和修模。以下是一些主要功能的简要介绍: 实景三维建模:【模方ModelFun】提供了自动化的实景三维重建功能,可以从实景图像中提取几何形状和纹理信息,生成高质量…...

介绍几个搜索引擎

Google:全球最大的搜索引擎,提供全面的搜索服务,包括网页、图片、视频、新闻、地图等。 Baidu:中国最大的搜索引擎,提供类似于Google的全面搜索服务,同时也有网盘、知道等功能。 Bing:微软公司…...

iPhone 隔空投送使用指南:详细教程

本文介绍了如何在iPhone上使用隔空投送,包括如何在iOS 11到iOS 14的iPhone上启用它、发送文件以及接受或拒绝AirDrop发送给你的文件。对于iOS 7以上的旧款iPhone,提供了另一种方法。 如何打开隔空投送 你可以通过以下两种方式之一启动隔空投送功能:在“设置”应用程序或控…...

百度文心一言GPT免费入口也来了!!!

文心一言入口地址:文心一言能力全面开放 文心一言是百度全新一代知识增强大语言模型,文心大模型家族的新成员,能够与人对话互动,回答问题,协助创作,高效便捷地帮助人们获取信息、知识和灵感。 文心一言的技…...

线程调度和线程控制

在Java中,线程调度和线程控制是多线程编程中重要的概念,它们用于管理和控制线程的执行。以下是关于线程调度和线程控制的一些重要概念和技术: **1. 线程调度(Thread Scheduling): ** 线程调度是操作系统或Java虚拟机决定哪个线程在何时执行的过程。Java提供了多种线程调度…...

laravel excel导入导出

一、安装第三方 composer require maatwebsite/excel版本2.1和现在版本 有所不一样 二、导入 <?php namespace App\Import; use Maatwebsite\Excel\Concerns\ToCollection;class TestImport implements ToCollection {public function __construct(){}public function c…...

Windows无法删除分区怎么办?

我们知道Windows系统内置的磁盘管理工具是一个很实用的程序&#xff0c;可以帮助我们完成很多磁盘分区相关的基础操作&#xff0c;比如当我们想要删除硬盘上的某一个分区时&#xff0c;先想到的可能会是磁盘管理工具。但是当我们准备在磁盘管理工具中删除某个分区时&#xff0c…...

【请求报错:javax.net.ssl.SSLHandshakeException: No appropriate protocol】

1、问题描述 在请求服务时报错说SSL握手异常协议禁用啥的&#xff0c;而且我的连接数据库的url也加了useSSLfalse javax.net.ssl.SSLHandshakeException: No appropriate protocol (protocol is disabled or cipher suites are inappropriate)2、解决方法 在网上查找了方法…...

elementUI textarea可自适应文本高度的文本域

效果图; 通过设置 autosize 属性可以使得文本域的高度能够根据文本内容自动进行调整&#xff0c;并且 autosize 还可以设定为一个对象&#xff0c;指定最小行数和最大行数。 <el-inputtype"textarea"autosizeplaceholder"请输入内容"v-model"te…...

WebRTC-Streamer交叉编译

WebRTC-Streamer交叉编译 flyfish 文章目录 WebRTC-Streamer交叉编译零、前言一、提前准备工作1 安装需要的工具2 可选的交叉编译工具3 默认执行python是python34 获取源码5 使用其他版本的方法 二、非交叉编译编译1 在 src目录执行 安装所需的依赖2 执行命令 三、 交叉编译1 …...

SCAU期末笔记 - 数据分析与数据挖掘题库解析

这门怎么题库答案不全啊日 来简单学一下子来 一、选择题&#xff08;可多选&#xff09; 将原始数据进行集成、变换、维度规约、数值规约是在以下哪个步骤的任务?(C) A. 频繁模式挖掘 B.分类和预测 C.数据预处理 D.数据流挖掘 A. 频繁模式挖掘&#xff1a;专注于发现数据中…...

基础测试工具使用经验

背景 vtune&#xff0c;perf, nsight system等基础测试工具&#xff0c;都是用过的&#xff0c;但是没有记录&#xff0c;都逐渐忘了。所以写这篇博客总结记录一下&#xff0c;只要以后发现新的用法&#xff0c;就记得来编辑补充一下 perf 比较基础的用法&#xff1a; 先改这…...

【ROS】Nav2源码之nav2_behavior_tree-行为树节点列表

1、行为树节点分类 在 Nav2(Navigation2)的行为树框架中,行为树节点插件按照功能分为 Action(动作节点)、Condition(条件节点)、Control(控制节点) 和 Decorator(装饰节点) 四类。 1.1 动作节点 Action 执行具体的机器人操作或任务,直接与硬件、传感器或外部系统…...

Java 加密常用的各种算法及其选择

在数字化时代&#xff0c;数据安全至关重要&#xff0c;Java 作为广泛应用的编程语言&#xff0c;提供了丰富的加密算法来保障数据的保密性、完整性和真实性。了解这些常用加密算法及其适用场景&#xff0c;有助于开发者在不同的业务需求中做出正确的选择。​ 一、对称加密算法…...

(转)什么是DockerCompose?它有什么作用?

一、什么是DockerCompose? DockerCompose可以基于Compose文件帮我们快速的部署分布式应用&#xff0c;而无需手动一个个创建和运行容器。 Compose文件是一个文本文件&#xff0c;通过指令定义集群中的每个容器如何运行。 DockerCompose就是把DockerFile转换成指令去运行。 …...

【碎碎念】宝可梦 Mesh GO : 基于MESH网络的口袋妖怪 宝可梦GO游戏自组网系统

目录 游戏说明《宝可梦 Mesh GO》 —— 局域宝可梦探索Pokmon GO 类游戏核心理念应用场景Mesh 特性 宝可梦玩法融合设计游戏构想要素1. 地图探索&#xff08;基于物理空间 广播范围&#xff09;2. 野生宝可梦生成与广播3. 对战系统4. 道具与通信5. 延伸玩法 安全性设计 技术选…...

Spring是如何解决Bean的循环依赖:三级缓存机制

1、什么是 Bean 的循环依赖 在 Spring框架中,Bean 的循环依赖是指多个 Bean 之间‌互相持有对方引用‌,形成闭环依赖关系的现象。 多个 Bean 的依赖关系构成环形链路,例如: 双向依赖:Bean A 依赖 Bean B,同时 Bean B 也依赖 Bean A(A↔B)。链条循环: Bean A → Bean…...

IP如何挑?2025年海外专线IP如何购买?

你花了时间和预算买了IP&#xff0c;结果IP质量不佳&#xff0c;项目效率低下不说&#xff0c;还可能带来莫名的网络问题&#xff0c;是不是太闹心了&#xff1f;尤其是在面对海外专线IP时&#xff0c;到底怎么才能买到适合自己的呢&#xff1f;所以&#xff0c;挑IP绝对是个技…...

GitFlow 工作模式(详解)

今天再学项目的过程中遇到使用gitflow模式管理代码&#xff0c;因此进行学习并且发布关于gitflow的一些思考 Git与GitFlow模式 我们在写代码的时候通常会进行网上保存&#xff0c;无论是github还是gittee&#xff0c;都是一种基于git去保存代码的形式&#xff0c;这样保存代码…...

手机平板能效生态设计指令EU 2023/1670标准解读

手机平板能效生态设计指令EU 2023/1670标准解读 以下是针对欧盟《手机和平板电脑生态设计法规》(EU) 2023/1670 的核心解读&#xff0c;综合法规核心要求、最新修正及企业合规要点&#xff1a; 一、法规背景与目标 生效与强制时间 发布于2023年8月31日&#xff08;OJ公报&…...