当前位置: 首页 > news >正文

php:使用Ratchet类实现分布式websocket服务

一、前言

        最近需要做一个有关聊天的小程序,逻辑很简单,所以不打算用Swoole和workerman之类的,最后选择了Ratchet,因为简单易用,适合小型websocket服务。

二、问题

        但是目前我的项目是分布式环境,统一通过Nginx的反向代理分配到多个不同服务器,那么在其中一个服务器建立了WebSocket连接的用户如何给在另外一个服务器上建立了WebSocket连接的用户发送消息呢?这就涉及到了分布式websocket服务,但是我不希望太复杂,所以采用了消息队列的方式实现效果。

三、安装Ratchet类

直接使用composer安装,我用版本是"cboden/ratchet": "^0.2.8",比较老了。

composer require cboden/ratchet

四、创建websocket服务

因为php是同步阻塞型语言,通常每次请求都会从头到尾执行完成,在PHP中,所有的代码都按照顺序执行,直到脚本结束为止。但是我们要在一个进程中启动websocket服务还要监听消息队列,就需要用到ratchet中的事件循环机制,实现异步非阻塞通信效果。

<?php
//载入Ratchet类库
require_once APP_PATH.'vendor/autoload.php';
use Ratchet\Server\IoServer;
use Ratchet\WebSocket\WsServer;
use React\EventLoop\Factory as LoopFactory;
use React\Socket\Server as ReactSocket;set_time_limit(0);
ini_set('default_socket_timeout', -1);
/*** Websocket_Server*/
class ControllerWebsocket_Server
{public function indexAction(){try {$port = 8083;// 创建事件循环(使用该机制实现异步非阻塞通信)$loop = LoopFactory::create();// 创建 React Socket 服务器$socket = new ReactSocket($loop);$socket->listen($port, '0.0.0.0'); // 指定监听的端口和地址// 启动 WebSocket 服务器$server = new IoServer(new WsServer(new \ModelWebsocket_Handler($loop)),$socket,$loop);// 启动事件循环$loop->run();} catch (\Exception $e) {echo $e->getMessage();}}
}

其中ModelWebsocket_Handler是封装好的websocket操作类

<?php
//载入Ratchet类库
require_once APP_PATH.'vendor/autoload.php';
use Ratchet\MessageComponentInterface;
use Ratchet\ConnectionInterface;/*** websocket服务端-相关操作*/
class ModelWebsocket_Handler implements MessageComponentInterface {//数据缓存const REDIS_KEY_RESOURCE_DATA_MAP = 'h:websocket:resource:data:map';//客户端public $clients;public function __construct($loop) {$this->clients = new \SplObjectStorage();$this->subscribeMessage($loop);}/*** 连接建立时的逻辑*/public function onOpen(ConnectionInterface $conn) {$this->clients->attach($conn);echo "New connection! ({$conn->resourceId})\n";//获取连接请求的参数$params = [];$queryString = $conn->WebSocket->request->getQuery();parse_str($queryString, $params);//存储资源id相关数据$this->setResourceDataMap($conn->resourceId, $params);}/*** 收到消息时的逻辑*/public function onMessage(ConnectionInterface $from, $msg) {echo "Received message: {$msg}\n";foreach ($this->clients as $client) {if ($client === $from) {continue;}//发送消息$client->send($msg);}}/*** 连接关闭时的逻辑*/public function onClose(ConnectionInterface $conn) {$this->delResourceDataMap($conn->resourceId);$this->clients->detach($conn);echo "Connection {$conn->resourceId} has disconnected\n";}/*** 错误处理逻辑*/public function onError(ConnectionInterface $conn, \Exception $e) {echo "An error occurred: {$e->getMessage()}\n";$this->delResourceDataMap($conn->resourceId);$conn->close();}/*** 存储资源id相关数据* * @param  string  $resourceId* @param  array   $data* @return bool*/public function setResourceDataMap($resourceId, $data) {$redis = Comm_Redis::init(Comm_Redis::REDIS_TVDB, true);$rs = $redis->hSet(self::REDIS_KEY_RESOURCE_DATA_MAP, $resourceId, json_encode($data));return $rs;}/*** 获取资源id相关数据* * @param  string  $resourceId* @return array*/public function getResourceDataMap($resourceId) {$redis = Comm_Redis::init(true);$rs = $redis->hGet(self::REDIS_KEY_RESOURCE_DATA_MAP, $resourceId);return json_decode($rs, true) ?: [];}/*** 删除资源id相关数据* * @param  string  $resourceId* @return bool*/public function delResourceDataMap($resourceId) {$redis = Comm_Redis::init(true);$rs = $redis->hDel(self::REDIS_KEY_RESOURCE_DATA_MAP, $resourceId);return $rs;}/*** 订阅消息*/public function subscribeMessage($loop){$loop->addPeriodicTimer(1, function () {//在这里可以使用redis订阅消息、也可以使用kafka消费消息,然后再比对自身是否存在相应用户的连接,如果存在则发送,不存在则过滤,达到分布式webSocket服务的作用foreach ($this->clients as $client) {$client->send("测试");} });}
}

其中:subscribeMessage方法监听消息队列,收到消息之后比对自身是否存在相应用户的连接,如果存在则发送,不存在则过滤,达到分布式webSocket服务的作用。

当然如果你能直接找到用户所连接的服务器,并且可以直接推给相应的服务器,那更好,可以节省流量开销和一些额外的逻辑处理。

相关文章:

php:使用Ratchet类实现分布式websocket服务

一、前言 最近需要做一个有关聊天的小程序&#xff0c;逻辑很简单&#xff0c;所以不打算用Swoole和workerman之类的&#xff0c;最后选择了Ratchet&#xff0c;因为简单易用&#xff0c;适合小型websocket服务。 二、问题 但是目前我的项目是分布式环境&#xff0c;统一通过Ng…...

储能场站安全风险挑战

电化学储能目前最大的痛点问题就是安全问题&#xff0c;制约了储能行业的发展。 首先&#xff1a;锂作为最活泼的金属加上有机溶剂的电解液&#xff0c;安全性天生就差。基因不行。 其次储能系统的BMS对电池管理相对粗放&#xff0c;不足以保证锂电池的安全运行。 当前储能产业…...

Ubuntu系统为同一逻辑网口配置不同网段的IP

近期遇到一个问题&#xff1a;机载计算机的载版上有两个网口&#xff0c;但是这两个网口本质上是一个独立网口一个交换机&#xff0c;即对于机载计算机而言这两个物理网口是同一个逻辑网口。但是我需要将这两个网口分别连接到两个设备&#xff0c;并配置不同网段的IP&#xff0…...

MySQL出现Waiting for table metadata lock的原因以及解决方法(已亲测)

参考&#xff1a;MySQL出现Waiting for table metadata lock的原因以及解决方法 - digdeep - 博客园 当对表执行truncate\drop 操作时&#xff0c;会出现一直处于等待的状态&#xff0c;通过show processlist可以看到TableA停滞在Waiting for table metadata lock的状态。kill…...

学会Lambda,让程序Pythonic一点

Lambda是Python里的高阶用法&#xff0c;要把代码写得Pythonic&#xff0c;就需要了解这些高阶用法&#xff0c;想说自己是一名真正的Python程序员&#xff0c;先要把代码写得Pythonic。 今天聊下Lambda的用法&#xff0c;写篇简短的用法说明。 Lambda是匿名函数的意思&#…...

GDPU 信息安全 期末复习

文章目录 第一章 绪论✅ 单选题✅ 简答题6. 假定你是单位的安全主管&#xff0c;为了提高单位的网络安全性&#xff0c;在制定单位的安全保障方案时&#xff0c;有哪些措施&#xff08;包括技术和非技术的&#xff09;&#xff1f;9. 有人说只要我有足够多的钱&#xff0c;就可…...

Python 使用 Token 认证方案连接 Kubernetes (k8s) 的详细过程

在 Kubernetes 中&#xff0c;使用 Token 认证是一种常见的客户端身份验证方式&#xff0c;尤其适用于 ServiceAccount。以下是详细的步骤&#xff0c;包括如何查看 Token、获取 API 服务地址、配置远程连接&#xff0c;以及如何在 Python 中连接 k8s。 1. 获取 Token 首先&a…...

【C++】ReadFile概述,及实践使用时ReadFile的速率影响研究

ReadFile 函数概述 ReadFile 是 Windows API 函数&#xff0c;用于从文件或设备&#xff08;如串口、硬盘等&#xff09;中读取数据。它是同步和异步 I/O 操作的基础函数。 函数原型 BOOL ReadFile(_In_ HANDLE hFile, // 文件或设备句柄_Out_write…...

Mysql的UPDATE(更新数据)详解

MySQL的UPDATE语句是用于修改数据库表中已存在的记录&#xff0c;本文将详细介绍UPDATE语句的基本语法、高级用法、性能优化策略以及注意事项&#xff0c;帮助您更好地理解和应用这一重要的SQL命令。 1. 基本语法 单表更新 单表更新的基本语法如下&#xff1a; UPDATE [LOW…...

基于Java Springboot高校奖助学金系统

一、作品包含 源码数据库设计文档万字PPT全套环境和工具资源部署教程 二、项目技术 前端技术&#xff1a;Html、Css、Js、Vue、Element-ui 数据库&#xff1a;MySQL 后端技术&#xff1a;Java、Spring Boot、MyBatis 三、运行环境 开发工具&#xff1a;IDEA/eclipse 数据…...

如何在 Ubuntu 22.04 上安装带有 Nginx 的 ELK Stack

今天我们来聊聊如何在 Ubuntu 22.04 服务器上安装 ELK Stack&#xff0c;并集成 Nginx 作为 Web 服务器&#xff0c;同时使用 Let’s Encrypt Certbot 进行 SSL 认证。ELK Stack&#xff0c;包括 Elasticsearch、Logstash 和 Kibana&#xff0c;是一套强大的工具&#xff0c;用…...

Python爬虫:深入探索1688关键词接口获取之道

在数字化经济的浪潮中&#xff0c;数据的价值愈发凸显&#xff0c;尤其是在电商领域。对于电商平台而言&#xff0c;关键词不仅是搜索流量的入口&#xff0c;也是洞察市场趋势、优化营销策略的重要工具。1688作为中国领先的B2B电商平台&#xff0c;其关键词接口的获取对于商家来…...

Let‘s Encrypt SSL证书:acmessl.cn申请免费3个月证书

目录 一、CA机构 二、Lets Encrypt特点 三、申请SSL 一、CA机构 ‌Lets Encrypt‌是一个由非营利组织Internet Security Research Group (ISRG)运营的证书颁发机构&#xff08;CA&#xff09;&#xff0c;旨在通过自动化和开放的方式为全球网站提供免费、可靠的SSL/TLS证书。…...

JSON Web Token (JWT)的简单介绍、验证过程及令牌刷新思路

目录 一、JWT 1、什么是Jwt 2、为什么要使用Jwt 3、应用场景 4.Jwt的组成 4.1、Header 4.2、Payload 4.3、signature 二、Jwt验证过程 1、生成Jwt令牌 2、解析旧的Jwt 3、复制Jwt 4、Jwt有效时间测试 三、Jwt令牌刷新思路 1、配置JwtFilter过滤器 2、登录生成Jwt令…...

xxl-job入门

xxl-job , 定时任务 分布式 &#xff0c; 带来的问题的 解决方案 像之前 很多项目都用到定时任务&#xff0c; 但是如果要改为 分布式&#xff0c; 那么定时任务 就要用到 xxl-job 1.用户画像 拼多多&#xff0c;看了某个东西后&#xff0c;推荐类似东西&#xff0c; 做埋…...

100.【C语言】数据结构之二叉树的堆实现(顺序结构) 1

目录 1.顺序结构 2.示意图 ​编辑 从物理结构还原为逻辑结构的方法 3.父子节点编号的规律 4.顺序存储的前提条件 5.堆的简介 堆的定义 堆的两个重要性质 小根堆和大根堆 6.堆的插入 7.堆的实现及操作堆的函数 堆的结构体定义 堆初始化函数HeapInit 堆插入元素函…...

大模型 VS 大语言模型

最近很多朋友搞不懂大模型和大预言模型的区别&#xff0c;总是把大模型就认为是大语言模型。 今天就用这篇帖子做一个科普。 大模型 概念&#xff1a;大模型是指拥有超大规模参数&#xff08;通常在十亿个以上&#xff09;、复杂计算结构的机器学习模型。它通常能够处理海量数…...

Linux高阶——1117—TCP客户端服务端

目录 1、sock.h socket常用函数 网络初始化函数 首次响应函数 测试IO处理函数 获取时间函数 总代码 2、sock.c SOCKET() ACCEPT()——服务端使用这个函数等待客户端连接 CONNECT()——客户端使用这个函数连接服务端 BIND()——一般只有服务端使用 LISTEN()——服务端…...

【Qt】Qt 在main.cpp中使用tr()函数报错

1. 问题 Qt 在main.cpp中使用tr()报错。 error: tr was not declared in this scope2. 解决方法 main.cpp中注意如下&#xff1a; //添加头文件 #include <QObject>//添加QObject QObject::tr("Hello")3. 参考 Qt tr()函数不起效的小问题...

面向对象高级(5)接口

面向对象高级&#xff08;5&#xff09; 接口 接口就是规范&#xff0c;定义的是一组规则&#xff0c;体现了现实世界中“如果是...则必须能...”的思想。继承是一个"是不是"的is-a关系&#xff0c;而接口实现则是 "能不能"的has-a关系。 1、接口的定义格…...

XML Group端口详解

在XML数据映射过程中&#xff0c;经常需要对数据进行分组聚合操作。例如&#xff0c;当处理包含多个物料明细的XML文件时&#xff0c;可能需要将相同物料号的明细归为一组&#xff0c;或对相同物料号的数量进行求和计算。传统实现方式通常需要编写脚本代码&#xff0c;增加了开…...

深入剖析AI大模型:大模型时代的 Prompt 工程全解析

今天聊的内容&#xff0c;我认为是AI开发里面非常重要的内容。它在AI开发里无处不在&#xff0c;当你对 AI 助手说 "用李白的风格写一首关于人工智能的诗"&#xff0c;或者让翻译模型 "将这段合同翻译成商务日语" 时&#xff0c;输入的这句话就是 Prompt。…...

SciencePlots——绘制论文中的图片

文章目录 安装一、风格二、1 资源 安装 # 安装最新版 pip install githttps://github.com/garrettj403/SciencePlots.git# 安装稳定版 pip install SciencePlots一、风格 简单好用的深度学习论文绘图专用工具包–Science Plot 二、 1 资源 论文绘图神器来了&#xff1a;一行…...

React第五十七节 Router中RouterProvider使用详解及注意事项

前言 在 React Router v6.4 中&#xff0c;RouterProvider 是一个核心组件&#xff0c;用于提供基于数据路由&#xff08;data routers&#xff09;的新型路由方案。 它替代了传统的 <BrowserRouter>&#xff0c;支持更强大的数据加载和操作功能&#xff08;如 loader 和…...

测试markdown--肇兴

day1&#xff1a; 1、去程&#xff1a;7:04 --11:32高铁 高铁右转上售票大厅2楼&#xff0c;穿过候车厅下一楼&#xff0c;上大巴车 &#xffe5;10/人 **2、到达&#xff1a;**12点多到达寨子&#xff0c;买门票&#xff0c;美团/抖音&#xff1a;&#xffe5;78人 3、中饭&a…...

家政维修平台实战20:权限设计

目录 1 获取工人信息2 搭建工人入口3 权限判断总结 目前我们已经搭建好了基础的用户体系&#xff0c;主要是分成几个表&#xff0c;用户表我们是记录用户的基础信息&#xff0c;包括手机、昵称、头像。而工人和员工各有各的表。那么就有一个问题&#xff0c;不同的角色&#xf…...

AspectJ 在 Android 中的完整使用指南

一、环境配置&#xff08;Gradle 7.0 适配&#xff09; 1. 项目级 build.gradle // 注意&#xff1a;沪江插件已停更&#xff0c;推荐官方兼容方案 buildscript {dependencies {classpath org.aspectj:aspectjtools:1.9.9.1 // AspectJ 工具} } 2. 模块级 build.gradle plu…...

html-<abbr> 缩写或首字母缩略词

定义与作用 <abbr> 标签用于表示缩写或首字母缩略词&#xff0c;它可以帮助用户更好地理解缩写的含义&#xff0c;尤其是对于那些不熟悉该缩写的用户。 title 属性的内容提供了缩写的详细说明。当用户将鼠标悬停在缩写上时&#xff0c;会显示一个提示框。 示例&#x…...

Android第十三次面试总结(四大 组件基础)

Activity生命周期和四大启动模式详解 一、Activity 生命周期 Activity 的生命周期由一系列回调方法组成&#xff0c;用于管理其创建、可见性、焦点和销毁过程。以下是核心方法及其调用时机&#xff1a; ​onCreate()​​ ​调用时机​&#xff1a;Activity 首次创建时调用。​…...

初探Service服务发现机制

1.Service简介 Service是将运行在一组Pod上的应用程序发布为网络服务的抽象方法。 主要功能&#xff1a;服务发现和负载均衡。 Service类型的包括ClusterIP类型、NodePort类型、LoadBalancer类型、ExternalName类型 2.Endpoints简介 Endpoints是一种Kubernetes资源&#xf…...