当前位置: 首页 > news >正文

Thinkphp6使用RabbitMQ消息队列

Thinkphp6连接使用RabbitMQ(不止tp6,其他框架对应改下也一样),如何使用Docker部署RabbitMQ,在上一篇已经讲了->传送门<-。

部署环境

开始前先进入RabbitMQ的web管理界面,选择Queues菜单,点击底部的Add a new queue,新建一个test的队列。

安装thinkphp6框架

composer create-project topthink/think tp 

安装workerman扩展

composer require topthink/think-worker

安装rabbitmq扩展

composer require workerman/rabbitmq

代码编写

生产者

  • 在app目录下新建workerman目录,并在其下创建Send.php文件,$options数组中的host地址改成你的rabbitmq地址。
<?phpnamespace app\workerman;
use Bunny\Channel;
use Workerman\RabbitMQ\Client;
use think\worker\Server;
class Send extends Server
{//websocket地址,一会用于测试。protected $socket = 'websocket://127.0.0.1:2345';/*** 收到信息* @param $connection* @param $data*/public function onMessage($connection, $data){//websocket发送过来的消息$connection->send('我收到你的信息了:'.$data);//rabbitMQ配置$options = ['host'=>'127.0.0.1',//rabbitMQ IP'port'=>5672,//rabbitMQ 通讯端口'user'=>'admin',//rabbitMQ 账号'password'=>'123456'//rabbitMQ 密码];(new Client($options))->connect()->then(function (Client $client) {return $client->channel();})->then(function (Channel $channel) {/*** 创建队列(Queue)* name: ceshi         // 队列名称* passive: false      // 如果设置true存在则返回OK,否则就报错。设置false存在返回OK,不存在则自动创建* durable: true       // 是否持久化,设置false是存放到内存中RabbitMQ重启后会丢失,*                        设置true则代表是一个持久的队列,服务重启之后也会存在,因为服务会把持久化的Queue存放在硬盘上,当服务重启的时候,会重新加载之前被持久化的Queue* exclusive: false    // 是否排他,指定该选项为true则队列只对当前连接有效,连接断开后自动删除*  auto_delete: false // 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除*/return $channel->queueDeclare('ceshi', false, true, false, false)->then(function () use ($channel) {return $channel;});})->then(function (Channel $channel) use($data){echo "发送消息内容:".$data."\n";/*** 发送消息* body 发送的数据* headers 数据头,建议 ['content_type' => 'text/plain'],这样消费端是springboot注解接收直接是字符串类型* exchange 交换器名称* routingKey 路由key* mandatory* immediate* @return bool|PromiseInterface|int*/return $channel->publish($data, ['content_type' => 'text/plain'], '', 'ceshi')->then(function () use ($channel) {return $channel;});})->then(function (Channel $channel) {//echo " [x] Sent 'Hello World!'\n";$client = $channel->getClient();return $channel->close()->then(function () use ($client) {return $client;});})->then(function (Client $client) {$client->disconnect();});}/*** 当连接建立时触发的回调函数* @param $connection*/public function onConnect($connection){}/*** 当连接断开时触发的回调函数* @param $connection*/public function onClose($connection){}/*** 当客户端的连接上发生错误时触发* @param $connection* @param $code* @param $msg*/public function onError($connection, $code, $msg){echo "error $code $msg\n";}/*** 每个进程启动* @param $worker*/public function onWorkerStart($worker){}
}
  • 在config/worker_server.php中设置worker_class值为'app\workerman\Send'

  • 启动这个生产者

php think worker:server
方式1:通过tcp发送数据
  • 发送数据
    通过在线网址发送数据(websocket方式),->传送门<-
    输入【ws://127.0.0.1:2345】后点击发送数据!
    在这里插入图片描述
  • 前往rabbitMQ控制台查看
    在这里插入图片描述
    至此,生产这一步就走完了,那么如果我不想通过websocket方式,想用tcp方式生产数据怎么办?
方式2:通过tcp发送数据

接口给内置服务器发消息->内置服务去发消息给rabbitMQ

  • 将Send.php中websocket:127.0.0.1改成tcp:127.0.0.1
  • 重启服务
  • 把controller目录中Index.php修改为以下内容
<?php
namespace app\controller;use app\BaseController;class Index extends BaseController
{public function index(string $msg){//连接本地tcp服务$client = stream_socket_client('tcp://127.0.0.1:2345', $errno, $errmsg, 1);//发送字符串fwrite($client, $msg."\n");//断开服务fclose($client);return 'OK';}}
  • 用Postman访问对应接口就好,也会有数据进入队列

消费者

同生产者一样新创建一个thinkphp6项目,注意端口别和生产者冲突!这里我设置的是2346端口

  • 在app目录下新建workerman目录,并在其下创建Receive.php文件,$options数组中的host地址改成你的rabbitmq地址。
<?phpnamespace app\workerman;
use Bunny\Channel;
use Bunny\Message;
use Workerman\RabbitMQ\Client;
use think\worker\Server;
class Receive extends Server
{protected $socket = 'tcp://127.0.0.1:2346';/*** 收到信息* @param $connection* @param $data*/public function onMessage($connection, $data){}/*** 当连接建立时触发的回调函数* @param $connection*/public function onConnect($connection){}/*** 当连接断开时触发的回调函数* @param $connection*/public function onClose($connection){}/*** 当客户端的连接上发生错误时触发* @param $connection* @param $code* @param $msg*/public function onError($connection, $code, $msg){echo "error $code $msg\n";}/*** 每个进程启动* @param $worker*/public function onWorkerStart($worker){//rabbitMQ配置$options = ['host'=>'127.0.0.1',//rabbitMQ IP'port'=>5672,//rabbitMQ 通讯端口'user'=>'admin',//rabbitMQ 账号'password'=>'123456'//rabbitMQ 密码];(new Client($options))->connect()->then(function (Client $client) {return $client->channel();})->then(function (Channel $channel) {/*** 创建队列(Queue)* name: ceshi         // 队列名称* passive: false      // 如果设置true存在则返回OK,否则就报错。设置false存在返回OK,不存在则自动创建* durable: true       // 是否持久化,设置false是存放到内存中RabbitMQ重启后会丢失,*                        设置true则代表是一个持久的队列,服务重启之后也会存在,因为服务会把持久化的Queue存放在硬盘上,当服务重启的时候,会重新加载之前被持久化的Queue* exclusive: false    // 是否排他,指定该选项为true则队列只对当前连接有效,连接断开后自动删除*  auto_delete: false // 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除*/return $channel->queueDeclare('ceshi', false, true, false, false)->then(function () use ($channel) {return $channel;});})->then(function (Channel $channel) {echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";$channel->consume(function (Message $message, Channel $channel, Client $client) {echo "接收消息内容:", $message->content, "\n";},'ceshi','',false,true);});}
}
  • 在config/worker_server.php中设置worker_class值为'app\workerman\Receive',并将端口改为2346

  • 启动这个消费者

php think worker:server

到这里消费者也就结束啦!

使用

接下来我用cmd来启动两个服务,然后用接口发送消息和消费测试!
在这里插入图片描述




部分参考自:
https://www.workerman.net/doc/workerman/components/workerman-rabbitmq.html
https://blog.csdn.net/weixin_47723549/article/details/124493059

相关文章:

Thinkphp6使用RabbitMQ消息队列

Thinkphp6连接使用RabbitMQ&#xff08;不止tp6&#xff0c;其他框架对应改下也一样&#xff09;&#xff0c;如何使用Docker部署RabbitMQ&#xff0c;在上一篇已经讲了->传送门<-。 部署环境 开始前先进入RabbitMQ的web管理界面&#xff0c;选择Queues菜单&#xff0c;点…...

小成本互联网创业怎么做?低成本创业的方法分享

多数人都会有想法创业&#xff0c;尤其是在互联网上面创业&#xff0c;很多人看到了商机&#xff0c;但是因为成本的原因又放弃了&#xff0c;实际上&#xff0c;小成本也可以互联网创业&#xff01;那么&#xff0c;小成本互联网创业怎么做&#xff1f;低成本创业的方法在这里…...

六、栈、栈的相关问题

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 目录 前言 一、栈 1.栈概述 2.栈的实现 2.1 栈的API 2.2 栈的实现 二、栈的括号匹配问题 1.问题描述 2.代码实现 三、逆波兰表达式求值问题 1.问题描述 2.代码 总结 前言 提…...

Java安全停止线程

Thread 类虽提供了一个 stop() 方法&#xff08;已经被废弃&#xff09;&#xff0c;但由于 stop() 方法强制终止一个正在执行的线程&#xff0c;可能会造成数据不一致的问题&#xff0c;所以在生产环境中最好不要使用。 场景&#xff1a; 由于一些操作需要轮询处理&#xff…...

12 readdir 函数

前言 在之前 ls 命令 中我们可以看到, ls 命令的执行也是依赖于 opendir, readdir, stat, lstat 等相关操作系统提供的相关系统调用来处理业务 因此 我们这里来进一步看一下 更细节的这些 系统调用 我们这里关注的是 readdir 这个函数, 入口系统调用是 getdents 如下调试…...

Windows环境搭建Android开发环境-Android Studio/Git/JDK

Windows环境搭建Android开发环境-Android Studio/Git/JDK 因为休假回来后公司的开发环境由Ubuntu变为了Windows&#xff0c;所以需要重新配置一下开发环境。 工作多年第一次使用Windows环境进行开发工作&#xff0c;作次记录下来。 一、 Git安装 1.1git 标题软件下载 网址&…...

全国爱耳日丨听力受损严重有哪些解决办法

——【科学爱耳护耳&#xff0c;实现主动健康】随着数码电子设备使用越来越方便、日常使用时间越来越长&#xff0c;听力障碍、患上耳道疾病一系列问题也接踵而至&#xff0c;在当下我们必须重视听力健康&#xff0c;采取更科学的听音方式&#xff0c;保护听力健康&#xff0c;…...

【抽水蓄能电站】基于粒子群优化算法的抽水蓄能电站的最佳调度方案研究(Matlab代码实现)

&#x1f468;‍&#x1f393;个人主页&#xff1a;研学社的博客&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5;&#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密…...

【异常】因多租户字段缺少导致Error updating database. Column ‘tenant_id‘ cannot be null

一、报错内容 org.springframework.dao.DataIntegrityViolationException: ### Error updating database. Cause: java.sql.SQLIntegrityConstraintViolationException: Column tenant_id cannot be null ### The error may exist in com/xxx/cloud/mall/admin/mapper/Goods…...

类和对象(上)

文章目录 面向对象的初步认知类的实例化this引用对象的构造及初始化封装static成员代码块内部类 对象的打印一、面向对象的初步认知 Java是一门纯面向对象的语言(Object Oriented Program&#xff0c;简称OOP)&#xff0c;在面向对象的世界里&#xff0c;一切皆为对象。在java中…...

Java经典面试题——谈谈 Java 反射机制,动态代理是基于什么原理?

典型回答 反射机制是 Java 语言提供的一种基本功能&#xff0c;赋予程序在运行时 自省&#xff08;introspect&#xff0c;官方用语&#xff09;的能力。通过反射我们可以直接操作类或者对象&#xff0c;比如获取某个对象的类定义&#xff0c;获取类声明的属性和方法&#xff…...

19 客户端服务订阅机制的核心流程

Nacos客户端服务订阅机制的核心流程 说起Nacos的服务订阅机制&#xff0c;大家会觉得比较难理解&#xff0c;那我们就来详细分析一下&#xff0c;那我们先从Nacos订阅的概述说起 Nacos订阅概述 Nacos的订阅机制&#xff0c;如果用一句话来描述就是&#xff1a;Nacos客户端通…...

教师论文|科技专著管理系统

技术&#xff1a;Java、JSP等摘要&#xff1a;随着计算机和互联网技术的发展&#xff0c;社会的信息化程度越来越高&#xff0c;各行各业只有适应这种发展趋势&#xff0c;才能增强自己的适应能力和竞争能力&#xff0c;不断发展壮大。大学作为教育的基地&#xff0c;是社会进步…...

骨传导耳机是什么意思,骨传导耳机的好处具体有哪些

​在这个全民都是手机的时代&#xff0c;各种蓝牙耳机&#xff0c;入耳式耳机&#xff0c;真无线耳机等各种款式琳琅满目。而骨传导耳机是一种全新的科技产物&#xff0c;顾名思义就是通过头骨振动将声音传至外耳内的耳机。由于无需入耳&#xff0c;不会对耳朵造成任何影响。那…...

elasticsearch—使用汇总

文档结构1、概念简介2、使用创景3、核心组件4、环境部署5、操作实践官方网站&#xff1a;https://www.elastic.co/cn/elasticsearch/ 官方手册&#xff1a;https://www.elastic.co/guide/en/elasticsearch/reference/8.6/getting-started.html 参考教程&#xff1a; A&#xff…...

聊一聊代码重构——我们为什么要代码重构

代码重构 事情的起因是在去年下半年&#xff0c;我们终于无法承受往年的历史包袱而决定开始进行代码重构。 在以前我们尝试过进行代码重构但是从来没有系统性的考虑过如何重构。在对代码重构的过程中很多经验都是来自《重构&#xff1a;改善既有代码的设计》这本书&#xff0c;…...

【Python学习笔记】第二十九节 Python2 和Python3发生了哪些变化

Python 版本分为两大流派&#xff0c;一个是 Python 2.x 版本&#xff0c;另外一个是 Python 3.x 版本&#xff0c;Python 官方同时提供了对这两个版本的支持和维护。2020 年 1 月 1 日&#xff0c;Python 官方终止了对 Python 2.7 版本&#xff08;最后一个 Python 2.x 版本&a…...

[oeasy]python0099_雅达利大崩溃_IBM的开放架构_兼容机_oem

雅达利大崩溃 回忆上次内容 个人计算机浪潮已经来临 苹果公司迅速发展微软公司脱离mits准备做纯软件公司IBM用大型机思路制作的5100惨败 Commodore 64 既做计算机又做游戏机 计算机行业和游戏行业 跟随着底层技术不断迭代已经进入了战乱纷纷的年代最终又会如何呢&#xff1f…...

学术论文投稿之同行评审过程中可能会遭遇哪些偏见?

同行评审过程的顺利进行&#xff0c;在很大程度上取决于学术界的积极参与和相互信任&#xff0c;以及需要参与各方都以负责任的态度行事。作为审稿专家&#xff0c;向作者提供公正、客观的评价是至关重要的。同行评审过程中&#xff0c;若有任何偏离客观性的行为&#xff0c;均…...

Python写一个自动发送直播弹幕的工具,非常简单

哈喽大家好&#xff0c;今天给大家用Python整一个可以在直播间自动发弹幕的工具&#xff0c;来为喜欢的主播疯狂扣6 &#xff01; 事情原由昨晚回家&#xff0c;表弟在看LOL直播&#xff0c;看得我气不打一处来&#xff0c;差点就想锤他。 身为程序员的表弟&#xff0c;看直…...

做定制开发的定制软件开发公司平台

在数字化转型浪潮下&#xff0c;“定制软件开发”几乎成了每一家力图通过技术构建壁垒的企业的必选项。然而&#xff0c;一个令人尴尬的现实是&#xff1a;很多企业在数字化上砸了重金&#xff0c;不仅没换来效率&#xff0c;反而陷入了“开发超预算、交付总延期、上线全是坑”…...

DeepSeek Clean Code终极阈值(v2.3.1正式版):超出3个指标即触发强制重构——你达标了吗?

更多请点击&#xff1a; https://intelliparadigm.com 第一章&#xff1a;DeepSeek Clean Code终极阈值的演进与哲学内核 DeepSeek Clean Code 的“终极阈值”并非静态指标&#xff0c;而是代码可维护性、语义清晰度与执行确定性三者动态收敛的临界点。它源于对 LLM 推理链中 …...

如何在3分钟内完成Windows与Office智能激活:KMS_VL_ALL_AIO完全指南

如何在3分钟内完成Windows与Office智能激活&#xff1a;KMS_VL_ALL_AIO完全指南 【免费下载链接】KMS_VL_ALL_AIO Smart Activation Script 项目地址: https://gitcode.com/gh_mirrors/km/KMS_VL_ALL_AIO 还在为Windows操作系统和Office办公软件的正版激活而烦恼吗&…...

基于Helm与Kubernetes的以太坊节点自动化部署与运维实战

1. 项目概述&#xff1a;当以太坊遇见Kubernetes如果你和我一样&#xff0c;在区块链基础设施领域摸爬滚打多年&#xff0c;从早期手动编译客户端、配置systemd服务&#xff0c;到后来用Docker Compose编排节点&#xff0c;每一步都伴随着大量的重复劳动和运维痛点。当节点数量…...

【Oracle数据库指南】第06篇:Oracle DML语句与事务控制——数据操作与ACID特性深度解析

上一篇【第05篇】Oracle子查询与集合操作——嵌套查询与结果合并全解析 下一篇【第07篇】SQL*Plus基础——登录、环境设置与缓冲区操作 摘要 本文全面讲解Oracle DML&#xff08;数据操作语言&#xff09;语句&#xff0c;包括INSERT、UPDATE、DELETE和MERGE的详细用法&#x…...

大模型选型生死局(企业CTO私藏对比清单):Claude在长文档法律分析胜出32%,Gemini在实时多跳检索快4.8倍——你的业务该选谁?

更多请点击&#xff1a; https://intelliparadigm.com 第一章&#xff1a;大模型选型生死局&#xff1a;Claude vs Gemini核心能力全景图 在企业级AI应用落地的关键阶段&#xff0c;模型选型已远非单纯比拼参数量或基准分数&#xff0c;而是对推理鲁棒性、上下文工程适配度、多…...

基于WebSocket的Web即时通讯后端架构设计与实战部署指南

1. 项目概述&#xff1a;一个面向开发者的Web即时通讯解决方案最近在折腾一个内部协作工具&#xff0c;需要集成一个稳定、可控且能深度定制的即时通讯模块。市面上成熟的IM SDK很多&#xff0c;但要么是黑盒&#xff0c;出了问题排查困难&#xff1b;要么是功能臃肿&#xff0…...

欢迎来到Marp世界

欢迎来到Marp世界 【免费下载链接】marp The entrance repository of Markdown presentation ecosystem 项目地址: https://gitcode.com/gh_mirrors/mar/marp 用Markdown创建专业演示文稿从未如此简单&#xff01; 第二张幻灯片 列表项1列表项2列表项3 第三张幻灯片&am…...

如何用DdddOcr在3分钟内构建离线验证码识别系统

如何用DdddOcr在3分钟内构建离线验证码识别系统 【免费下载链接】ddddocr 带带弟弟 通用验证码识别OCR pypi版 项目地址: https://gitcode.com/gh_mirrors/dd/ddddocr 在当今的自动化测试、数据采集和网络安全领域&#xff0c;验证码识别是绕不开的技术难题。传统的在线…...

中兴860A四川电信高安版救砖记:遥控失效后,我是如何通过修改init.rc寄生脚本让遥控器起死回生的

中兴860A四川电信高安版遥控失效深度修复指南 当你的中兴860A四川电信高安版机顶盒突然"罢工"&#xff0c;遥控器怎么按都没反应&#xff0c;那种感觉就像电视突然变成了哑巴。这不是简单的配对问题&#xff0c;而是一场与系统底层限制的较量。本文将带你深入Android…...