tp6 RabbitMQ
1、composer 安装 AMQP 扩展
composer require php-amqplib/php-amqplib
2、RabbitMQ 配置
在 config 目录下创建 rabbitmq.php 文件
<?php
return ['host'=>'','port'=>'5672','user'=>'','password'=>'','vhost'=>'','exchange_name' => '','queue_name' => '','route_key' => '','consumer_tag' => '',
];
3、生产者代码
app目录下创建Producer.php
<?phpnamespace app;use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;class Producer
{private $connection;private $channel;private $mq_config;public function __construct(){$this->mq_config = config('rabbit_mq');$this->connection = new AMQPStreamConnection($this->mq_config['host'],$this->mq_config['port'],$this->mq_config['user'],$this->mq_config['password'],'itcast');//创建通道$this->channel = $this->connection->channel();}public function send($data){/*** 创建队列(Queue)* name: hello // 队列名称* passive: false // 如果设置true存在则返回OK,否则就报错。设置false存在返回OK,不存在则自动创建* durable: true // 是否持久化,设置false是存放到内存中的,RabbitMQ重启后会丢失;设置true,则代表是一个持久化的队列,服务重启后也会存在,因为服务会把持久化的queue存放到磁盘上当服务重启的时候,会重新加载之前被持久化的queue* exclusive: false // 是否排他,指定该选项为true则队列只对当前连接有效,连接断开后自动删除* auto_delete: false // 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除**/$this->channel->queue_declare($this->mq_config['queue_name'], false, true, false, false);/*** 创建交换机(Exchange)* name: vckai_exchange// 交换机名称* type: direct // 交换机类型,分别为direct/fanout/topic,参考另外文章的Exchange Type说明。* passive: false // 如果设置true存在则返回OK,否则就报错。设置false存在返回OK,不存在则自动创建* durable: false // 是否持久化,设置false是存放到内存中的,RabbitMQ重启后会丢失* auto_delete: false // 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除*/$this->channel->exchange_declare($this->mq_config['exchange_name'], AMQPExchangeType::DIRECT, false, true, false);// 绑定消息交换机和队列$this->channel->queue_bind($this->mq_config['queue_name'], $this->mq_config['exchange_name'],$this->mq_config['route_key']);$messageBody = json_encode($data);//将要发送数据变为json字符串/*** 创建AMQP消息类型* delivery_mode 消息是否持久化* AMQPMessage::DELIVERY_MODE_NON_PERSISTENT 不持久化* AMQPMessage::DELIVERY_MODE_PERSISTENT 持久化*/$message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));/*** 发送消息* msg: $message // AMQP消息内容* exchange: vckai_exchange // 交换机名称* routing_key: hello // 路由key*/$this->channel->basic_publish($message, $this->mq_config['exchange_name'], $this->mq_config['route_key']);//关闭连接$this->stop();}//关闭进程public function stop(){$this->channel->close();$this->connection->close();}}
4、消费者代码
app目录下创建Consumer.php
<?phpnamespace app;use app\index\controller\ApiCommunity;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use think\db\exception\PDOException;
use think\facade\Log;class Consumer
{private $connection;private $channel;private $mq_config;public function __construct(){$this->mq_config = config('rabbit_mq');$this->connection = new AMQPStreamConnection($this->mq_config['host'],$this->mq_config['port'],$this->mq_config['user'],$this->mq_config['password'],$this->mq_config['vhost']);//创建通道$this->channel = $this->connection->channel();}/*** @param $channel* @param $connection* 关闭进程*/function shutdown($channel, $connection){$channel->close();$connection->close();}/*** @param $message* 消息处理*/function process_message($message){//消息处理逻辑echo $message->body . "\n";if ($message->body !== 'quit') {$obj = json_decode($message->body);if (!isset($obj->id)) {Log::write("error data:" . $message->body, 2);} else {try {Log::write("data:" . json_encode($message));//消息处理} catch (\Think\Exception $e) {Log::write($e->getMessage(), 2);Log::write(json_encode($message), 2);} catch (PDOException $pe) {Log::write($pe->getMessage(), 2);Log::write(json_encode($message), 2);}}}// 手动确认ack,确保消息已经处理$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);// Send a message with the string "quit" to cancel the consumer.if ($message->body === 'quit') {$message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']);}}/*** @throws \ErrorException* 启动** nohup php index.php index/Message_Consume/start &*/public function start(){// 设置消费者(Consumer)客户端同时只处理一条队列// 这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个消费者(Consumer),直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的消费者(Consumer)。//消费者端要把自动确认autoAck设置为false,basic_qos才有效。//$this->channel->basic_qos(0, 1, false);// 同样是创建路由和队列,以及绑定路由队列,注意要跟producer(生产者)的一致// 这里其实可以不用设置,但是为了防止队列没有被创建所以做的容错处理$this->channel->queue_declare($this->mq_config['queue_name'], false, true, false, false);$this->channel->exchange_declare($this->mq_config['exchange_name'], AMQPExchangeType::DIRECT, false, true, false);$this->channel->queue_bind($this->mq_config['queue_name'], $this->mq_config['exchange_name'], $this->mq_config['route_key']);/**** queue: queue_name // 被消费的队列名称* consumer_tag: consumer_tag // 消费者客户端身份标识,用于区分多个客户端* no_local: false // 这个功能属于AMQP的标准,但是RabbitMQ并没有做实现* no_ack: true // 收到消息后,是否不需要回复确认即被认为被消费* exclusive: false // 是否排他,即这个队列只能由一个消费者消费。适用于任务不允许进行并发处理的情况下* nowait: false // 不返回执行结果,但是如果排他开启的话,则必须需要等待结果的,如果两个一起开就会报错* callback: $callback // 回调逻辑处理函数**/$this->channel->basic_consume($this->mq_config['queue_name'], $this->mq_config['consumer_tag'], false, false, false, false, array($this, 'process_message'));register_shutdown_function(array($this, 'shutdown'), $this->channel, $this->connection);while (count($this->channel->callbacks)) {$this->channel->wait();}}
}
5、创建自定义命令
php think make:command Consumer
在项目跟目录执行以下命令,会自动生成 在 command 目录生成 Consumer 控制器
<?php
declare (strict_types = 1);namespace app\command;use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;class Consumer extends Command
{protected function configure(){// 指令配置$this->setName('consumer')->setDescription('the consumer command');}protected function execute(Input $input, Output $output){// 指令输出$output->writeln('consumer');$consumer = new \app\Consumer();
// $consumer->process_message(11)$consumer->start();}
}
config/console.php 代码增加如下:
// 指令定义
'commands' => ['consumer' => 'app\command\Consumer',
],
6、命令
消费者命令
php think consumer
生产者执行命令
$producer = new Producer();
$data = ['message' => "发送的消息内容"
];
$producer->send($data);
相关文章:
tp6 RabbitMQ
1、composer 安装 AMQP 扩展 composer require php-amqplib/php-amqplib 2、RabbitMQ 配置 在 config 目录下创建 rabbitmq.php 文件 <?php return [host>,port>5672,user>,password>,vhost>,exchange_name > ,queue_name > ,route_key > ,cons…...
java Spring Boot yml多环境拆分文件管理优化
上文 java Spring Boot yml多环境配置 我们讲了多环境开发 但这种东西都放在一起 还是非常容易暴露信息的 并且对维护来讲 也不是非常的友好 这里 我们在resources下创建三个文件 分别叫 application-pro.yml application-dev.yml application-test.yml 我们直接将三个环境 转…...
【设计模式——学习笔记】23种设计模式——状态模式State(原理讲解+应用场景介绍+案例介绍+Java代码实现)
文章目录 案例引入介绍基本介绍登场角色应用场景 案例实现案例一类图实现 案例二:借贷平台源码剖析传统方式实现分析状态修改流程类图实现 案例三:金库警报系统系统的运行逻辑伪代码传统实现方式使用状态模式 类图实现分析问题问题一问题二 总结文章说明…...
【LeetCode每日一题】——41.缺失的第一个正数
文章目录 一【题目类别】二【题目难度】三【题目编号】四【题目描述】五【题目示例】六【题目提示】七【解题思路】八【时间频度】九【代码实现】十【提交结果】 一【题目类别】 哈希表 二【题目难度】 困难 三【题目编号】 41.缺失的第一个正数 四【题目描述】 给你一个…...
typedef函数代码段解释以及部分Windows下的系统函数
文章目录 1、typedef int (WINAPI* LPSDOLInitialize)(const SDOLAppInfo* pAppInfo)2、typedef int (WINAPI* LPSDOLGetModule)(REFIID riid, void** intf)3、typedef int (WINAPI* LPSDOLTerminal)();4、GetProcAddress运行时获取一个动态链接库(DLL)中…...
Typora常用手册
常用快捷键 加粗: Ctrl B 标题: Ctrl H 插入链接: Ctrl K 插入代码: Ctrl Shift C – 无法执行 行内代码: Ctrl Shift K 插入图片: Ctrl Shift I 无序列表:Ctrl Shift L – 无法执行…...
互联网发展历程:从网线不够长到中继器的引入
互联网,这个如今贯穿我们生活的无所不在的网络,其发展历程充满了无数的创新和变革。有一项看似不太起眼的技术却在互联网的发展中发挥着至关重要的作用,那就是中继器。本文将带您深入了解互联网的发展历程,探讨在网线不够长的情况…...
【Java】异常处理 之 使用SLF4J 和 Logback
使用SLF4J和Logback 前面介绍了Commons Logging 和Log4j 这一对好基友,它们一个负责充当日志 API,一个负责实现日志底层,搭配使用非常便于开发。 有的童鞋可能还听说过SLF4J和Logback。这两个东东看上去也像日志,它们又是啥&…...
C++11并发与多线程笔记 (1)
C11并发与多线程笔记(1) 1、并发、进程、线程的基本概念和综述1.1 并发1.2 可执行程序1.3 进程1.4 线程1.5 学习心得 2、并发的实现方法2.1 多进程并发2.2 多线程并发 3、C11新标准线程库 1、并发、进程、线程的基本概念和综述 1.1 并发 指在一个时间段…...
07_Hudi案例实战、Flink CDC 实时数据采集、Presto、FineBI 报表可视化等
7.第七章 Hudi案例实战 7.1 案例架构 7.2 业务数据 7.2.1 客户信息表 7.2.2 客户意向表 7.2.3 客户线索表 7.2.4 线索申诉表 7.2.5 客户访问咨询记录表 7.3 Flink CDC 实时数据采集 7.3.1 开启MySQL binlog 7.3.2 环境准备 7.3.3 实时采集数据 7.3.3.1 客户信息表 7.3.3.2 客户…...
ceph相关概念和部署
Ceph 可用于向云提供 Ceph 对象存储 平台和 Ceph 可用于提供 Ceph 块设备服务 到云平台。Ceph 可用于部署 Ceph 文件 系统。所有 Ceph 存储集群部署都从设置 每个 Ceph 节点,然后设置网络。 Ceph 存储集群需要满足以下条件:至少一个 Ceph 监控器&#x…...
Android Jetpack Compose 中的分页与缓存展示
Android Jetpack Compose 中的分页与缓存展示 在几乎任何类型的移动项目中,移动开发人员在某个时候都会处理分页数据。如果数据列表太大,无法一次从服务器检索完毕,这就是必需的。因此,我们的后端同事为我们提供了一个端点&#…...
无名管道 / 有名管道(FIFO)
根据上节所讲就可以了解到:管道其实就是实现进程间通讯IPC中的一种类型方法 基本概念(无名管道) 管道是一种最基本的IPC机制,通常指无名管道,也是UNIX系统IPC最古老的形式。管道只能作用于有血缘关系的进程之间&…...
Three.js纹理贴图
目录 Three.js入门 Three.js光源 Three.js阴影 Three.js纹理贴图 纹理是一种图像或图像数据,用于为物体的材质提供颜色、纹理、法线、位移等信息,从而实现更加逼真的渲染结果。 纹理可以应用于Three.js中的材质类型,如MeshBasicMaterial…...
1+X Web前端开发职业技能等级证书建设方案
一 、系统概述 1X Web前端开发技术是计算机类专业重要的核心课程,课程所包含的教学内容多,实践性强,并且相关技术更新快。传统的课堂讲授模式以教师为中心,学生被动式接收,难以调动学生学习的积极性和主动性。混合式教…...
Rx.NET in Action 第二章学习笔记
2 Hello, Rx 本章节涵盖的内容: 不使用Rx的工作方式向项目中添加Rx创建你的第一个Rx应用程序 Rx 的目标是协调和统筹来自社交网络、传感器、用户界面事件等不同来源的基于事件的异步计算。例如,建筑物周围的监控摄像头和移动传感器会在有人靠近建筑物时触发…...
【软件工程 | 模块耦合】什么是模块耦合及分类
概念 耦合(coupling)是对两个模块之间联接程度的一种度量。模块间的依赖程度越大,则其耦合程度也就越大; 反之,模块间的依赖程度越小,则其耦合程度也就越小。 很显然,为了使软件具有较好的可维护性和可修改性…...
OCT介绍和分类
前言:研究方向和OCT有关,为了方便以后回顾,所以整理了OCT相关的一些内容。 OCT介绍和分类 OCT介绍分类时域OCT频域OCT扫频OCT谱域OCT OCT介绍 名称:OCT、光学相干层析成像术、Optical Coherence Tomography。 概念:O…...
07-2_Qt 5.9 C++开发指南_二进制文件读写(stm和dat格式)
文章目录 1. 实例功能概述2. Qt预定义编码文件的读写2.1 保存为stm文件2.2 stm文件格式2.3 读取stm文件 3. 标准编码文件的读写3.1 保存为dat文件3.2 dat文件格式3.3 读取dat文件 4. 框架及源码4.1 可视化UI设计4.2 mainwindow.cpp 1. 实例功能概述 除了文本文件之外ÿ…...
SpringBoot复习:(41)配置文件中配置的server开头的属性是怎么配置到Servlet容器中起作用的?
ServletWebServerFactoryAutoConfiguration类: 可以看到其中使用了EnableConfigurationProperties导入了ServerProperties 而ServerProperties通过使用ConfigurationProperties注解导入了配置文件中已server开头的那些配置项。 可以看到ServletWebServerFactory定…...
词云AI电话机器人在金融风控与合规通知的核心价值与应用场景-系列五
金融行业对风控与合规的要求极高:逾期提醒不能断,交易核实不能慢,授信通知不能错,续保提醒不能漏。词云AI电话机器人以自动化、可留痕、高并发的智能外呼能力,承担风险预警、交易核实、授信告知、还款与续保提醒等高频…...
星图GPU云体验OpenClaw:免安装调试Phi-3-mini-128k-instruct镜像
星图GPU云体验OpenClaw:免安装调试Phi-3-mini-128k-instruct镜像 1. 为什么选择云端体验OpenClaw 上周我尝试在本地笔记本上部署OpenClaw时,被各种环境依赖和权限问题折磨得够呛。正当我准备放弃时,偶然发现星图平台提供了预装OpenClaw的GP…...
OpenClaw稳定性提升:Qwen3-14B长时运行的内存泄漏排查
OpenClaw稳定性提升:Qwen3-14B长时运行的内存泄漏排查 1. 问题背景:72小时无人值守的意外崩溃 上周我尝试用OpenClawQwen3-14B搭建一个自动化内容处理流水线,期望它能724小时不间断工作。前48小时运行良好,但在第72小时突然发现…...
iSDIO库:嵌入式系统中FlashAir Wi-Fi卡的SDIO协议栈
1. iSDIO库概述:面向TOSHIBA FlashAir的嵌入式SDIO协议栈iSDIO(intelligent SDIO)库是一个专为东芝(TOSHIBA)FlashAir系列Wi-Fi SD卡设计的轻量级嵌入式驱动与通信中间件。该库并非通用SDIO主机控制器驱动,…...
leetcode 101.对称二叉树(不会做)
遇到递归就抓瞎 # Definition for a binary tree node. # class TreeNode: # def __init__(self, val0, leftNone, rightNone): # self.val val # self.left left # self.right right class Solution:def isSymmetric(self, root: Optional[Tre…...
Docker TLS 证书一键生成脚本(安全加密远程访问)
Docker TLS 证书一键生成脚本(安全加密远程访问) 这是一键自动生成 Docker TLS 加密证书的 Shell 脚本,无需手动输入复杂命令,自动生成 CA 证书、服务端证书、客户端证书,配置好权限,直接复制就能用&#x…...
FlinkX异构数据同步:从安装到实战的5个关键技巧
FlinkX异构数据同步:从安装到实战的5个关键技巧 在数据驱动的时代,企业常常面临不同数据源之间高效同步的挑战。FlinkX作为一款基于Apache Flink的分布式数据同步工具,凭借其强大的异构数据源支持能力和灵活的插件架构,正在成为技…...
华三MSR系列路由器单臂路由配置全记录:从实验环境搭建到真机部署避坑指南
华三MSR路由器单臂路由实战:从实验室到生产环境的全流程指南 单臂路由技术在企业网络架构中扮演着关键角色,特别是在需要实现多VLAN互通又要求流量隔离的中小型网络场景。华三MSR系列路由器(如MSR2600/3600)凭借其稳定的性能和灵活…...
2025年中国市场SCA工具深度评测:国产化浪潮下的安全新选择
随着数字化转型进入深水区,软件供应链安全已成为企业不可忽视的战略要地。2025年,在信创政策持续深化与国产化替代加速的双重背景下,软件成分分析(SCA)工具作为DevSecOps体系中的关键一环,正迎来前所未有的市场机遇与挑战。这场由…...
智慧医疗X光图像手骨骨折检测数据集VOC+YOLO格式20307张3类别
数据集格式:Pascal VOC格式YOLO格式(不包含分割路径的txt文件,仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件)图片数量(jpg文件个数):20307标注数量(xml文件个数):20307标注数量(txt文件个数):20307标注类…...
