当前位置: 首页 > news >正文

php:使用Ratchet类实现分布式websocket服务

一、前言

        最近需要做一个有关聊天的小程序,逻辑很简单,所以不打算用Swoole和workerman之类的,最后选择了Ratchet,因为简单易用,适合小型websocket服务。

二、问题

        但是目前我的项目是分布式环境,统一通过Nginx的反向代理分配到多个不同服务器,那么在其中一个服务器建立了WebSocket连接的用户如何给在另外一个服务器上建立了WebSocket连接的用户发送消息呢?这就涉及到了分布式websocket服务,但是我不希望太复杂,所以采用了消息队列的方式实现效果。

三、安装Ratchet类

直接使用composer安装,我用版本是"cboden/ratchet": "^0.2.8",比较老了。

composer require cboden/ratchet

四、创建websocket服务

因为php是同步阻塞型语言,通常每次请求都会从头到尾执行完成,在PHP中,所有的代码都按照顺序执行,直到脚本结束为止。但是我们要在一个进程中启动websocket服务还要监听消息队列,就需要用到ratchet中的事件循环机制,实现异步非阻塞通信效果。

<?php
//载入Ratchet类库
require_once APP_PATH.'vendor/autoload.php';
use Ratchet\Server\IoServer;
use Ratchet\WebSocket\WsServer;
use React\EventLoop\Factory as LoopFactory;
use React\Socket\Server as ReactSocket;set_time_limit(0);
ini_set('default_socket_timeout', -1);
/*** Websocket_Server*/
class ControllerWebsocket_Server
{public function indexAction(){try {$port = 8083;// 创建事件循环(使用该机制实现异步非阻塞通信)$loop = LoopFactory::create();// 创建 React Socket 服务器$socket = new ReactSocket($loop);$socket->listen($port, '0.0.0.0'); // 指定监听的端口和地址// 启动 WebSocket 服务器$server = new IoServer(new WsServer(new \ModelWebsocket_Handler($loop)),$socket,$loop);// 启动事件循环$loop->run();} catch (\Exception $e) {echo $e->getMessage();}}
}

其中ModelWebsocket_Handler是封装好的websocket操作类

<?php
//载入Ratchet类库
require_once APP_PATH.'vendor/autoload.php';
use Ratchet\MessageComponentInterface;
use Ratchet\ConnectionInterface;/*** websocket服务端-相关操作*/
class ModelWebsocket_Handler implements MessageComponentInterface {//数据缓存const REDIS_KEY_RESOURCE_DATA_MAP = 'h:websocket:resource:data:map';//客户端public $clients;public function __construct($loop) {$this->clients = new \SplObjectStorage();$this->subscribeMessage($loop);}/*** 连接建立时的逻辑*/public function onOpen(ConnectionInterface $conn) {$this->clients->attach($conn);echo "New connection! ({$conn->resourceId})\n";//获取连接请求的参数$params = [];$queryString = $conn->WebSocket->request->getQuery();parse_str($queryString, $params);//存储资源id相关数据$this->setResourceDataMap($conn->resourceId, $params);}/*** 收到消息时的逻辑*/public function onMessage(ConnectionInterface $from, $msg) {echo "Received message: {$msg}\n";foreach ($this->clients as $client) {if ($client === $from) {continue;}//发送消息$client->send($msg);}}/*** 连接关闭时的逻辑*/public function onClose(ConnectionInterface $conn) {$this->delResourceDataMap($conn->resourceId);$this->clients->detach($conn);echo "Connection {$conn->resourceId} has disconnected\n";}/*** 错误处理逻辑*/public function onError(ConnectionInterface $conn, \Exception $e) {echo "An error occurred: {$e->getMessage()}\n";$this->delResourceDataMap($conn->resourceId);$conn->close();}/*** 存储资源id相关数据* * @param  string  $resourceId* @param  array   $data* @return bool*/public function setResourceDataMap($resourceId, $data) {$redis = Comm_Redis::init(Comm_Redis::REDIS_TVDB, true);$rs = $redis->hSet(self::REDIS_KEY_RESOURCE_DATA_MAP, $resourceId, json_encode($data));return $rs;}/*** 获取资源id相关数据* * @param  string  $resourceId* @return array*/public function getResourceDataMap($resourceId) {$redis = Comm_Redis::init(true);$rs = $redis->hGet(self::REDIS_KEY_RESOURCE_DATA_MAP, $resourceId);return json_decode($rs, true) ?: [];}/*** 删除资源id相关数据* * @param  string  $resourceId* @return bool*/public function delResourceDataMap($resourceId) {$redis = Comm_Redis::init(true);$rs = $redis->hDel(self::REDIS_KEY_RESOURCE_DATA_MAP, $resourceId);return $rs;}/*** 订阅消息*/public function subscribeMessage($loop){$loop->addPeriodicTimer(1, function () {//在这里可以使用redis订阅消息、也可以使用kafka消费消息,然后再比对自身是否存在相应用户的连接,如果存在则发送,不存在则过滤,达到分布式webSocket服务的作用foreach ($this->clients as $client) {$client->send("测试");} });}
}

其中:subscribeMessage方法监听消息队列,收到消息之后比对自身是否存在相应用户的连接,如果存在则发送,不存在则过滤,达到分布式webSocket服务的作用。

当然如果你能直接找到用户所连接的服务器,并且可以直接推给相应的服务器,那更好,可以节省流量开销和一些额外的逻辑处理。

相关文章:

php:使用Ratchet类实现分布式websocket服务

一、前言 最近需要做一个有关聊天的小程序&#xff0c;逻辑很简单&#xff0c;所以不打算用Swoole和workerman之类的&#xff0c;最后选择了Ratchet&#xff0c;因为简单易用&#xff0c;适合小型websocket服务。 二、问题 但是目前我的项目是分布式环境&#xff0c;统一通过Ng…...

储能场站安全风险挑战

电化学储能目前最大的痛点问题就是安全问题&#xff0c;制约了储能行业的发展。 首先&#xff1a;锂作为最活泼的金属加上有机溶剂的电解液&#xff0c;安全性天生就差。基因不行。 其次储能系统的BMS对电池管理相对粗放&#xff0c;不足以保证锂电池的安全运行。 当前储能产业…...

Ubuntu系统为同一逻辑网口配置不同网段的IP

近期遇到一个问题&#xff1a;机载计算机的载版上有两个网口&#xff0c;但是这两个网口本质上是一个独立网口一个交换机&#xff0c;即对于机载计算机而言这两个物理网口是同一个逻辑网口。但是我需要将这两个网口分别连接到两个设备&#xff0c;并配置不同网段的IP&#xff0…...

MySQL出现Waiting for table metadata lock的原因以及解决方法(已亲测)

参考&#xff1a;MySQL出现Waiting for table metadata lock的原因以及解决方法 - digdeep - 博客园 当对表执行truncate\drop 操作时&#xff0c;会出现一直处于等待的状态&#xff0c;通过show processlist可以看到TableA停滞在Waiting for table metadata lock的状态。kill…...

学会Lambda,让程序Pythonic一点

Lambda是Python里的高阶用法&#xff0c;要把代码写得Pythonic&#xff0c;就需要了解这些高阶用法&#xff0c;想说自己是一名真正的Python程序员&#xff0c;先要把代码写得Pythonic。 今天聊下Lambda的用法&#xff0c;写篇简短的用法说明。 Lambda是匿名函数的意思&#…...

GDPU 信息安全 期末复习

文章目录 第一章 绪论✅ 单选题✅ 简答题6. 假定你是单位的安全主管&#xff0c;为了提高单位的网络安全性&#xff0c;在制定单位的安全保障方案时&#xff0c;有哪些措施&#xff08;包括技术和非技术的&#xff09;&#xff1f;9. 有人说只要我有足够多的钱&#xff0c;就可…...

Python 使用 Token 认证方案连接 Kubernetes (k8s) 的详细过程

在 Kubernetes 中&#xff0c;使用 Token 认证是一种常见的客户端身份验证方式&#xff0c;尤其适用于 ServiceAccount。以下是详细的步骤&#xff0c;包括如何查看 Token、获取 API 服务地址、配置远程连接&#xff0c;以及如何在 Python 中连接 k8s。 1. 获取 Token 首先&a…...

【C++】ReadFile概述,及实践使用时ReadFile的速率影响研究

ReadFile 函数概述 ReadFile 是 Windows API 函数&#xff0c;用于从文件或设备&#xff08;如串口、硬盘等&#xff09;中读取数据。它是同步和异步 I/O 操作的基础函数。 函数原型 BOOL ReadFile(_In_ HANDLE hFile, // 文件或设备句柄_Out_write…...

Mysql的UPDATE(更新数据)详解

MySQL的UPDATE语句是用于修改数据库表中已存在的记录&#xff0c;本文将详细介绍UPDATE语句的基本语法、高级用法、性能优化策略以及注意事项&#xff0c;帮助您更好地理解和应用这一重要的SQL命令。 1. 基本语法 单表更新 单表更新的基本语法如下&#xff1a; UPDATE [LOW…...

基于Java Springboot高校奖助学金系统

一、作品包含 源码数据库设计文档万字PPT全套环境和工具资源部署教程 二、项目技术 前端技术&#xff1a;Html、Css、Js、Vue、Element-ui 数据库&#xff1a;MySQL 后端技术&#xff1a;Java、Spring Boot、MyBatis 三、运行环境 开发工具&#xff1a;IDEA/eclipse 数据…...

如何在 Ubuntu 22.04 上安装带有 Nginx 的 ELK Stack

今天我们来聊聊如何在 Ubuntu 22.04 服务器上安装 ELK Stack&#xff0c;并集成 Nginx 作为 Web 服务器&#xff0c;同时使用 Let’s Encrypt Certbot 进行 SSL 认证。ELK Stack&#xff0c;包括 Elasticsearch、Logstash 和 Kibana&#xff0c;是一套强大的工具&#xff0c;用…...

Python爬虫:深入探索1688关键词接口获取之道

在数字化经济的浪潮中&#xff0c;数据的价值愈发凸显&#xff0c;尤其是在电商领域。对于电商平台而言&#xff0c;关键词不仅是搜索流量的入口&#xff0c;也是洞察市场趋势、优化营销策略的重要工具。1688作为中国领先的B2B电商平台&#xff0c;其关键词接口的获取对于商家来…...

Let‘s Encrypt SSL证书:acmessl.cn申请免费3个月证书

目录 一、CA机构 二、Lets Encrypt特点 三、申请SSL 一、CA机构 ‌Lets Encrypt‌是一个由非营利组织Internet Security Research Group (ISRG)运营的证书颁发机构&#xff08;CA&#xff09;&#xff0c;旨在通过自动化和开放的方式为全球网站提供免费、可靠的SSL/TLS证书。…...

JSON Web Token (JWT)的简单介绍、验证过程及令牌刷新思路

目录 一、JWT 1、什么是Jwt 2、为什么要使用Jwt 3、应用场景 4.Jwt的组成 4.1、Header 4.2、Payload 4.3、signature 二、Jwt验证过程 1、生成Jwt令牌 2、解析旧的Jwt 3、复制Jwt 4、Jwt有效时间测试 三、Jwt令牌刷新思路 1、配置JwtFilter过滤器 2、登录生成Jwt令…...

xxl-job入门

xxl-job , 定时任务 分布式 &#xff0c; 带来的问题的 解决方案 像之前 很多项目都用到定时任务&#xff0c; 但是如果要改为 分布式&#xff0c; 那么定时任务 就要用到 xxl-job 1.用户画像 拼多多&#xff0c;看了某个东西后&#xff0c;推荐类似东西&#xff0c; 做埋…...

100.【C语言】数据结构之二叉树的堆实现(顺序结构) 1

目录 1.顺序结构 2.示意图 ​编辑 从物理结构还原为逻辑结构的方法 3.父子节点编号的规律 4.顺序存储的前提条件 5.堆的简介 堆的定义 堆的两个重要性质 小根堆和大根堆 6.堆的插入 7.堆的实现及操作堆的函数 堆的结构体定义 堆初始化函数HeapInit 堆插入元素函…...

大模型 VS 大语言模型

最近很多朋友搞不懂大模型和大预言模型的区别&#xff0c;总是把大模型就认为是大语言模型。 今天就用这篇帖子做一个科普。 大模型 概念&#xff1a;大模型是指拥有超大规模参数&#xff08;通常在十亿个以上&#xff09;、复杂计算结构的机器学习模型。它通常能够处理海量数…...

Linux高阶——1117—TCP客户端服务端

目录 1、sock.h socket常用函数 网络初始化函数 首次响应函数 测试IO处理函数 获取时间函数 总代码 2、sock.c SOCKET() ACCEPT()——服务端使用这个函数等待客户端连接 CONNECT()——客户端使用这个函数连接服务端 BIND()——一般只有服务端使用 LISTEN()——服务端…...

【Qt】Qt 在main.cpp中使用tr()函数报错

1. 问题 Qt 在main.cpp中使用tr()报错。 error: tr was not declared in this scope2. 解决方法 main.cpp中注意如下&#xff1a; //添加头文件 #include <QObject>//添加QObject QObject::tr("Hello")3. 参考 Qt tr()函数不起效的小问题...

面向对象高级(5)接口

面向对象高级&#xff08;5&#xff09; 接口 接口就是规范&#xff0c;定义的是一组规则&#xff0c;体现了现实世界中“如果是...则必须能...”的思想。继承是一个"是不是"的is-a关系&#xff0c;而接口实现则是 "能不能"的has-a关系。 1、接口的定义格…...

在软件开发中正确使用MySQL日期时间类型的深度解析

在日常软件开发场景中&#xff0c;时间信息的存储是底层且核心的需求。从金融交易的精确记账时间、用户操作的行为日志&#xff0c;到供应链系统的物流节点时间戳&#xff0c;时间数据的准确性直接决定业务逻辑的可靠性。MySQL作为主流关系型数据库&#xff0c;其日期时间类型的…...

docker详细操作--未完待续

docker介绍 docker官网: Docker&#xff1a;加速容器应用程序开发 harbor官网&#xff1a;Harbor - Harbor 中文 使用docker加速器: Docker镜像极速下载服务 - 毫秒镜像 是什么 Docker 是一种开源的容器化平台&#xff0c;用于将应用程序及其依赖项&#xff08;如库、运行时环…...

java 实现excel文件转pdf | 无水印 | 无限制

文章目录 目录 文章目录 前言 1.项目远程仓库配置 2.pom文件引入相关依赖 3.代码破解 二、Excel转PDF 1.代码实现 2.Aspose.License.xml 授权文件 总结 前言 java处理excel转pdf一直没找到什么好用的免费jar包工具,自己手写的难度,恐怕高级程序员花费一年的事件,也…...

论文浅尝 | 基于判别指令微调生成式大语言模型的知识图谱补全方法(ISWC2024)

笔记整理&#xff1a;刘治强&#xff0c;浙江大学硕士生&#xff0c;研究方向为知识图谱表示学习&#xff0c;大语言模型 论文链接&#xff1a;http://arxiv.org/abs/2407.16127 发表会议&#xff1a;ISWC 2024 1. 动机 传统的知识图谱补全&#xff08;KGC&#xff09;模型通过…...

QT: `long long` 类型转换为 `QString` 2025.6.5

在 Qt 中&#xff0c;将 long long 类型转换为 QString 可以通过以下两种常用方法实现&#xff1a; 方法 1&#xff1a;使用 QString::number() 直接调用 QString 的静态方法 number()&#xff0c;将数值转换为字符串&#xff1a; long long value 1234567890123456789LL; …...

AI,如何重构理解、匹配与决策?

AI 时代&#xff0c;我们如何理解消费&#xff1f; 作者&#xff5c;王彬 封面&#xff5c;Unplash 人们通过信息理解世界。 曾几何时&#xff0c;PC 与移动互联网重塑了人们的购物路径&#xff1a;信息变得唾手可得&#xff0c;商品决策变得高度依赖内容。 但 AI 时代的来…...

JAVA后端开发——多租户

数据隔离是多租户系统中的核心概念&#xff0c;确保一个租户&#xff08;在这个系统中可能是一个公司或一个独立的客户&#xff09;的数据对其他租户是不可见的。在 RuoYi 框架&#xff08;您当前项目所使用的基础框架&#xff09;中&#xff0c;这通常是通过在数据表中增加一个…...

Webpack性能优化:构建速度与体积优化策略

一、构建速度优化 1、​​升级Webpack和Node.js​​ ​​优化效果​​&#xff1a;Webpack 4比Webpack 3构建时间降低60%-98%。​​原因​​&#xff1a; V8引擎优化&#xff08;for of替代forEach、Map/Set替代Object&#xff09;。默认使用更快的md4哈希算法。AST直接从Loa…...

比较数据迁移后MySQL数据库和OceanBase数据仓库中的表

设计一个MySQL数据库和OceanBase数据仓库的表数据比较的详细程序流程,两张表是相同的结构,都有整型主键id字段,需要每次从数据库分批取得2000条数据,用于比较,比较操作的同时可以再取2000条数据,等上一次比较完成之后,开始比较,直到比较完所有的数据。比较操作需要比较…...

python爬虫——气象数据爬取

一、导入库与全局配置 python 运行 import json import datetime import time import requests from sqlalchemy import create_engine import csv import pandas as pd作用&#xff1a; 引入数据解析、网络请求、时间处理、数据库操作等所需库。requests&#xff1a;发送 …...