php:使用Ratchet类实现分布式websocket服务
一、前言
最近需要做一个有关聊天的小程序,逻辑很简单,所以不打算用Swoole和workerman之类的,最后选择了Ratchet,因为简单易用,适合小型websocket服务。
二、问题
但是目前我的项目是分布式环境,统一通过Nginx的反向代理分配到多个不同服务器,那么在其中一个服务器建立了WebSocket连接的用户如何给在另外一个服务器上建立了WebSocket连接的用户发送消息呢?这就涉及到了分布式websocket服务,但是我不希望太复杂,所以采用了消息队列的方式实现效果。
三、安装Ratchet类
直接使用composer安装,我用版本是"cboden/ratchet": "^0.2.8",比较老了。
composer require cboden/ratchet
四、创建websocket服务
因为php是同步阻塞型语言,通常每次请求都会从头到尾执行完成,在PHP中,所有的代码都按照顺序执行,直到脚本结束为止。但是我们要在一个进程中启动websocket服务还要监听消息队列,就需要用到ratchet中的事件循环机制,实现异步非阻塞通信效果。
<?php
//载入Ratchet类库
require_once APP_PATH.'vendor/autoload.php';
use Ratchet\Server\IoServer;
use Ratchet\WebSocket\WsServer;
use React\EventLoop\Factory as LoopFactory;
use React\Socket\Server as ReactSocket;set_time_limit(0);
ini_set('default_socket_timeout', -1);
/*** Websocket_Server*/
class ControllerWebsocket_Server
{public function indexAction(){try {$port = 8083;// 创建事件循环(使用该机制实现异步非阻塞通信)$loop = LoopFactory::create();// 创建 React Socket 服务器$socket = new ReactSocket($loop);$socket->listen($port, '0.0.0.0'); // 指定监听的端口和地址// 启动 WebSocket 服务器$server = new IoServer(new WsServer(new \ModelWebsocket_Handler($loop)),$socket,$loop);// 启动事件循环$loop->run();} catch (\Exception $e) {echo $e->getMessage();}}
}
其中ModelWebsocket_Handler是封装好的websocket操作类
<?php
//载入Ratchet类库
require_once APP_PATH.'vendor/autoload.php';
use Ratchet\MessageComponentInterface;
use Ratchet\ConnectionInterface;/*** websocket服务端-相关操作*/
class ModelWebsocket_Handler implements MessageComponentInterface {//数据缓存const REDIS_KEY_RESOURCE_DATA_MAP = 'h:websocket:resource:data:map';//客户端public $clients;public function __construct($loop) {$this->clients = new \SplObjectStorage();$this->subscribeMessage($loop);}/*** 连接建立时的逻辑*/public function onOpen(ConnectionInterface $conn) {$this->clients->attach($conn);echo "New connection! ({$conn->resourceId})\n";//获取连接请求的参数$params = [];$queryString = $conn->WebSocket->request->getQuery();parse_str($queryString, $params);//存储资源id相关数据$this->setResourceDataMap($conn->resourceId, $params);}/*** 收到消息时的逻辑*/public function onMessage(ConnectionInterface $from, $msg) {echo "Received message: {$msg}\n";foreach ($this->clients as $client) {if ($client === $from) {continue;}//发送消息$client->send($msg);}}/*** 连接关闭时的逻辑*/public function onClose(ConnectionInterface $conn) {$this->delResourceDataMap($conn->resourceId);$this->clients->detach($conn);echo "Connection {$conn->resourceId} has disconnected\n";}/*** 错误处理逻辑*/public function onError(ConnectionInterface $conn, \Exception $e) {echo "An error occurred: {$e->getMessage()}\n";$this->delResourceDataMap($conn->resourceId);$conn->close();}/*** 存储资源id相关数据* * @param string $resourceId* @param array $data* @return bool*/public function setResourceDataMap($resourceId, $data) {$redis = Comm_Redis::init(Comm_Redis::REDIS_TVDB, true);$rs = $redis->hSet(self::REDIS_KEY_RESOURCE_DATA_MAP, $resourceId, json_encode($data));return $rs;}/*** 获取资源id相关数据* * @param string $resourceId* @return array*/public function getResourceDataMap($resourceId) {$redis = Comm_Redis::init(true);$rs = $redis->hGet(self::REDIS_KEY_RESOURCE_DATA_MAP, $resourceId);return json_decode($rs, true) ?: [];}/*** 删除资源id相关数据* * @param string $resourceId* @return bool*/public function delResourceDataMap($resourceId) {$redis = Comm_Redis::init(true);$rs = $redis->hDel(self::REDIS_KEY_RESOURCE_DATA_MAP, $resourceId);return $rs;}/*** 订阅消息*/public function subscribeMessage($loop){$loop->addPeriodicTimer(1, function () {//在这里可以使用redis订阅消息、也可以使用kafka消费消息,然后再比对自身是否存在相应用户的连接,如果存在则发送,不存在则过滤,达到分布式webSocket服务的作用foreach ($this->clients as $client) {$client->send("测试");} });}
}
其中:subscribeMessage方法监听消息队列,收到消息之后比对自身是否存在相应用户的连接,如果存在则发送,不存在则过滤,达到分布式webSocket服务的作用。
当然如果你能直接找到用户所连接的服务器,并且可以直接推给相应的服务器,那更好,可以节省流量开销和一些额外的逻辑处理。
相关文章:
php:使用Ratchet类实现分布式websocket服务
一、前言 最近需要做一个有关聊天的小程序,逻辑很简单,所以不打算用Swoole和workerman之类的,最后选择了Ratchet,因为简单易用,适合小型websocket服务。 二、问题 但是目前我的项目是分布式环境,统一通过Ng…...
储能场站安全风险挑战
电化学储能目前最大的痛点问题就是安全问题,制约了储能行业的发展。 首先:锂作为最活泼的金属加上有机溶剂的电解液,安全性天生就差。基因不行。 其次储能系统的BMS对电池管理相对粗放,不足以保证锂电池的安全运行。 当前储能产业…...
Ubuntu系统为同一逻辑网口配置不同网段的IP
近期遇到一个问题:机载计算机的载版上有两个网口,但是这两个网口本质上是一个独立网口一个交换机,即对于机载计算机而言这两个物理网口是同一个逻辑网口。但是我需要将这两个网口分别连接到两个设备,并配置不同网段的IP࿰…...
MySQL出现Waiting for table metadata lock的原因以及解决方法(已亲测)
参考:MySQL出现Waiting for table metadata lock的原因以及解决方法 - digdeep - 博客园 当对表执行truncate\drop 操作时,会出现一直处于等待的状态,通过show processlist可以看到TableA停滞在Waiting for table metadata lock的状态。kill…...
学会Lambda,让程序Pythonic一点
Lambda是Python里的高阶用法,要把代码写得Pythonic,就需要了解这些高阶用法,想说自己是一名真正的Python程序员,先要把代码写得Pythonic。 今天聊下Lambda的用法,写篇简短的用法说明。 Lambda是匿名函数的意思&#…...
GDPU 信息安全 期末复习
文章目录 第一章 绪论✅ 单选题✅ 简答题6. 假定你是单位的安全主管,为了提高单位的网络安全性,在制定单位的安全保障方案时,有哪些措施(包括技术和非技术的)?9. 有人说只要我有足够多的钱,就可…...
Python 使用 Token 认证方案连接 Kubernetes (k8s) 的详细过程
在 Kubernetes 中,使用 Token 认证是一种常见的客户端身份验证方式,尤其适用于 ServiceAccount。以下是详细的步骤,包括如何查看 Token、获取 API 服务地址、配置远程连接,以及如何在 Python 中连接 k8s。 1. 获取 Token 首先&a…...
【C++】ReadFile概述,及实践使用时ReadFile的速率影响研究
ReadFile 函数概述 ReadFile 是 Windows API 函数,用于从文件或设备(如串口、硬盘等)中读取数据。它是同步和异步 I/O 操作的基础函数。 函数原型 BOOL ReadFile(_In_ HANDLE hFile, // 文件或设备句柄_Out_write…...
Mysql的UPDATE(更新数据)详解
MySQL的UPDATE语句是用于修改数据库表中已存在的记录,本文将详细介绍UPDATE语句的基本语法、高级用法、性能优化策略以及注意事项,帮助您更好地理解和应用这一重要的SQL命令。 1. 基本语法 单表更新 单表更新的基本语法如下: UPDATE [LOW…...
基于Java Springboot高校奖助学金系统
一、作品包含 源码数据库设计文档万字PPT全套环境和工具资源部署教程 二、项目技术 前端技术:Html、Css、Js、Vue、Element-ui 数据库:MySQL 后端技术:Java、Spring Boot、MyBatis 三、运行环境 开发工具:IDEA/eclipse 数据…...
如何在 Ubuntu 22.04 上安装带有 Nginx 的 ELK Stack
今天我们来聊聊如何在 Ubuntu 22.04 服务器上安装 ELK Stack,并集成 Nginx 作为 Web 服务器,同时使用 Let’s Encrypt Certbot 进行 SSL 认证。ELK Stack,包括 Elasticsearch、Logstash 和 Kibana,是一套强大的工具,用…...
Python爬虫:深入探索1688关键词接口获取之道
在数字化经济的浪潮中,数据的价值愈发凸显,尤其是在电商领域。对于电商平台而言,关键词不仅是搜索流量的入口,也是洞察市场趋势、优化营销策略的重要工具。1688作为中国领先的B2B电商平台,其关键词接口的获取对于商家来…...
Let‘s Encrypt SSL证书:acmessl.cn申请免费3个月证书
目录 一、CA机构 二、Lets Encrypt特点 三、申请SSL 一、CA机构 Lets Encrypt是一个由非营利组织Internet Security Research Group (ISRG)运营的证书颁发机构(CA),旨在通过自动化和开放的方式为全球网站提供免费、可靠的SSL/TLS证书。…...
JSON Web Token (JWT)的简单介绍、验证过程及令牌刷新思路
目录 一、JWT 1、什么是Jwt 2、为什么要使用Jwt 3、应用场景 4.Jwt的组成 4.1、Header 4.2、Payload 4.3、signature 二、Jwt验证过程 1、生成Jwt令牌 2、解析旧的Jwt 3、复制Jwt 4、Jwt有效时间测试 三、Jwt令牌刷新思路 1、配置JwtFilter过滤器 2、登录生成Jwt令…...
xxl-job入门
xxl-job , 定时任务 分布式 , 带来的问题的 解决方案 像之前 很多项目都用到定时任务, 但是如果要改为 分布式, 那么定时任务 就要用到 xxl-job 1.用户画像 拼多多,看了某个东西后,推荐类似东西, 做埋…...
100.【C语言】数据结构之二叉树的堆实现(顺序结构) 1
目录 1.顺序结构 2.示意图 编辑 从物理结构还原为逻辑结构的方法 3.父子节点编号的规律 4.顺序存储的前提条件 5.堆的简介 堆的定义 堆的两个重要性质 小根堆和大根堆 6.堆的插入 7.堆的实现及操作堆的函数 堆的结构体定义 堆初始化函数HeapInit 堆插入元素函…...
大模型 VS 大语言模型
最近很多朋友搞不懂大模型和大预言模型的区别,总是把大模型就认为是大语言模型。 今天就用这篇帖子做一个科普。 大模型 概念:大模型是指拥有超大规模参数(通常在十亿个以上)、复杂计算结构的机器学习模型。它通常能够处理海量数…...
Linux高阶——1117—TCP客户端服务端
目录 1、sock.h socket常用函数 网络初始化函数 首次响应函数 测试IO处理函数 获取时间函数 总代码 2、sock.c SOCKET() ACCEPT()——服务端使用这个函数等待客户端连接 CONNECT()——客户端使用这个函数连接服务端 BIND()——一般只有服务端使用 LISTEN()——服务端…...
【Qt】Qt 在main.cpp中使用tr()函数报错
1. 问题 Qt 在main.cpp中使用tr()报错。 error: tr was not declared in this scope2. 解决方法 main.cpp中注意如下: //添加头文件 #include <QObject>//添加QObject QObject::tr("Hello")3. 参考 Qt tr()函数不起效的小问题...
面向对象高级(5)接口
面向对象高级(5) 接口 接口就是规范,定义的是一组规则,体现了现实世界中“如果是...则必须能...”的思想。继承是一个"是不是"的is-a关系,而接口实现则是 "能不能"的has-a关系。 1、接口的定义格…...
SciencePlots——绘制论文中的图片
文章目录 安装一、风格二、1 资源 安装 # 安装最新版 pip install githttps://github.com/garrettj403/SciencePlots.git# 安装稳定版 pip install SciencePlots一、风格 简单好用的深度学习论文绘图专用工具包–Science Plot 二、 1 资源 论文绘图神器来了:一行…...
线程与协程
1. 线程与协程 1.1. “函数调用级别”的切换、上下文切换 1. 函数调用级别的切换 “函数调用级别的切换”是指:像函数调用/返回一样轻量地完成任务切换。 举例说明: 当你在程序中写一个函数调用: funcA() 然后 funcA 执行完后返回&…...
OkHttp 中实现断点续传 demo
在 OkHttp 中实现断点续传主要通过以下步骤完成,核心是利用 HTTP 协议的 Range 请求头指定下载范围: 实现原理 Range 请求头:向服务器请求文件的特定字节范围(如 Range: bytes1024-) 本地文件记录:保存已…...
高危文件识别的常用算法:原理、应用与企业场景
高危文件识别的常用算法:原理、应用与企业场景 高危文件识别旨在检测可能导致安全威胁的文件,如包含恶意代码、敏感数据或欺诈内容的文档,在企业协同办公环境中(如Teams、Google Workspace)尤为重要。结合大模型技术&…...
Neo4j 集群管理:原理、技术与最佳实践深度解析
Neo4j 的集群技术是其企业级高可用性、可扩展性和容错能力的核心。通过深入分析官方文档,本文将系统阐述其集群管理的核心原理、关键技术、实用技巧和行业最佳实践。 Neo4j 的 Causal Clustering 架构提供了一个强大而灵活的基石,用于构建高可用、可扩展且一致的图数据库服务…...
Linux --进程控制
本文从以下五个方面来初步认识进程控制: 目录 进程创建 进程终止 进程等待 进程替换 模拟实现一个微型shell 进程创建 在Linux系统中我们可以在一个进程使用系统调用fork()来创建子进程,创建出来的进程就是子进程,原来的进程为父进程。…...
算法:模拟
1.替换所有的问号 1576. 替换所有的问号 - 力扣(LeetCode) 遍历字符串:通过外层循环逐一检查每个字符。遇到 ? 时处理: 内层循环遍历小写字母(a 到 z)。对每个字母检查是否满足: 与…...
RabbitMQ入门4.1.0版本(基于java、SpringBoot操作)
RabbitMQ 一、RabbitMQ概述 RabbitMQ RabbitMQ最初由LShift和CohesiveFT于2007年开发,后来由Pivotal Software Inc.(现为VMware子公司)接管。RabbitMQ 是一个开源的消息代理和队列服务器,用 Erlang 语言编写。广泛应用于各种分布…...
Golang——6、指针和结构体
指针和结构体 1、指针1.1、指针地址和指针类型1.2、指针取值1.3、new和make 2、结构体2.1、type关键字的使用2.2、结构体的定义和初始化2.3、结构体方法和接收者2.4、给任意类型添加方法2.5、结构体的匿名字段2.6、嵌套结构体2.7、嵌套匿名结构体2.8、结构体的继承 3、结构体与…...
深入理解Optional:处理空指针异常
1. 使用Optional处理可能为空的集合 在Java开发中,集合判空是一个常见但容易出错的场景。传统方式虽然可行,但存在一些潜在问题: // 传统判空方式 if (!CollectionUtils.isEmpty(userInfoList)) {for (UserInfo userInfo : userInfoList) {…...
