tp6 RabbitMQ
1、composer 安装 AMQP 扩展
composer require php-amqplib/php-amqplib
2、RabbitMQ 配置
在 config 目录下创建 rabbitmq.php 文件
<?php
return ['host'=>'','port'=>'5672','user'=>'','password'=>'','vhost'=>'','exchange_name' => '','queue_name' => '','route_key' => '','consumer_tag' => '',
];
3、生产者代码
app目录下创建Producer.php
<?phpnamespace app;use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;class Producer
{private $connection;private $channel;private $mq_config;public function __construct(){$this->mq_config = config('rabbit_mq');$this->connection = new AMQPStreamConnection($this->mq_config['host'],$this->mq_config['port'],$this->mq_config['user'],$this->mq_config['password'],'itcast');//创建通道$this->channel = $this->connection->channel();}public function send($data){/*** 创建队列(Queue)* name: hello // 队列名称* passive: false // 如果设置true存在则返回OK,否则就报错。设置false存在返回OK,不存在则自动创建* durable: true // 是否持久化,设置false是存放到内存中的,RabbitMQ重启后会丢失;设置true,则代表是一个持久化的队列,服务重启后也会存在,因为服务会把持久化的queue存放到磁盘上当服务重启的时候,会重新加载之前被持久化的queue* exclusive: false // 是否排他,指定该选项为true则队列只对当前连接有效,连接断开后自动删除* auto_delete: false // 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除**/$this->channel->queue_declare($this->mq_config['queue_name'], false, true, false, false);/*** 创建交换机(Exchange)* name: vckai_exchange// 交换机名称* type: direct // 交换机类型,分别为direct/fanout/topic,参考另外文章的Exchange Type说明。* passive: false // 如果设置true存在则返回OK,否则就报错。设置false存在返回OK,不存在则自动创建* durable: false // 是否持久化,设置false是存放到内存中的,RabbitMQ重启后会丢失* auto_delete: false // 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除*/$this->channel->exchange_declare($this->mq_config['exchange_name'], AMQPExchangeType::DIRECT, false, true, false);// 绑定消息交换机和队列$this->channel->queue_bind($this->mq_config['queue_name'], $this->mq_config['exchange_name'],$this->mq_config['route_key']);$messageBody = json_encode($data);//将要发送数据变为json字符串/*** 创建AMQP消息类型* delivery_mode 消息是否持久化* AMQPMessage::DELIVERY_MODE_NON_PERSISTENT 不持久化* AMQPMessage::DELIVERY_MODE_PERSISTENT 持久化*/$message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));/*** 发送消息* msg: $message // AMQP消息内容* exchange: vckai_exchange // 交换机名称* routing_key: hello // 路由key*/$this->channel->basic_publish($message, $this->mq_config['exchange_name'], $this->mq_config['route_key']);//关闭连接$this->stop();}//关闭进程public function stop(){$this->channel->close();$this->connection->close();}}
4、消费者代码
app目录下创建Consumer.php
<?phpnamespace app;use app\index\controller\ApiCommunity;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use think\db\exception\PDOException;
use think\facade\Log;class Consumer
{private $connection;private $channel;private $mq_config;public function __construct(){$this->mq_config = config('rabbit_mq');$this->connection = new AMQPStreamConnection($this->mq_config['host'],$this->mq_config['port'],$this->mq_config['user'],$this->mq_config['password'],$this->mq_config['vhost']);//创建通道$this->channel = $this->connection->channel();}/*** @param $channel* @param $connection* 关闭进程*/function shutdown($channel, $connection){$channel->close();$connection->close();}/*** @param $message* 消息处理*/function process_message($message){//消息处理逻辑echo $message->body . "\n";if ($message->body !== 'quit') {$obj = json_decode($message->body);if (!isset($obj->id)) {Log::write("error data:" . $message->body, 2);} else {try {Log::write("data:" . json_encode($message));//消息处理} catch (\Think\Exception $e) {Log::write($e->getMessage(), 2);Log::write(json_encode($message), 2);} catch (PDOException $pe) {Log::write($pe->getMessage(), 2);Log::write(json_encode($message), 2);}}}// 手动确认ack,确保消息已经处理$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);// Send a message with the string "quit" to cancel the consumer.if ($message->body === 'quit') {$message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']);}}/*** @throws \ErrorException* 启动** nohup php index.php index/Message_Consume/start &*/public function start(){// 设置消费者(Consumer)客户端同时只处理一条队列// 这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个消费者(Consumer),直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的消费者(Consumer)。//消费者端要把自动确认autoAck设置为false,basic_qos才有效。//$this->channel->basic_qos(0, 1, false);// 同样是创建路由和队列,以及绑定路由队列,注意要跟producer(生产者)的一致// 这里其实可以不用设置,但是为了防止队列没有被创建所以做的容错处理$this->channel->queue_declare($this->mq_config['queue_name'], false, true, false, false);$this->channel->exchange_declare($this->mq_config['exchange_name'], AMQPExchangeType::DIRECT, false, true, false);$this->channel->queue_bind($this->mq_config['queue_name'], $this->mq_config['exchange_name'], $this->mq_config['route_key']);/**** queue: queue_name // 被消费的队列名称* consumer_tag: consumer_tag // 消费者客户端身份标识,用于区分多个客户端* no_local: false // 这个功能属于AMQP的标准,但是RabbitMQ并没有做实现* no_ack: true // 收到消息后,是否不需要回复确认即被认为被消费* exclusive: false // 是否排他,即这个队列只能由一个消费者消费。适用于任务不允许进行并发处理的情况下* nowait: false // 不返回执行结果,但是如果排他开启的话,则必须需要等待结果的,如果两个一起开就会报错* callback: $callback // 回调逻辑处理函数**/$this->channel->basic_consume($this->mq_config['queue_name'], $this->mq_config['consumer_tag'], false, false, false, false, array($this, 'process_message'));register_shutdown_function(array($this, 'shutdown'), $this->channel, $this->connection);while (count($this->channel->callbacks)) {$this->channel->wait();}}
}
5、创建自定义命令
php think make:command Consumer
在项目跟目录执行以下命令,会自动生成 在 command 目录生成 Consumer 控制器
<?php
declare (strict_types = 1);namespace app\command;use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;class Consumer extends Command
{protected function configure(){// 指令配置$this->setName('consumer')->setDescription('the consumer command');}protected function execute(Input $input, Output $output){// 指令输出$output->writeln('consumer');$consumer = new \app\Consumer();
// $consumer->process_message(11)$consumer->start();}
}
config/console.php 代码增加如下:
// 指令定义
'commands' => ['consumer' => 'app\command\Consumer',
],
6、命令
消费者命令
php think consumer
生产者执行命令
$producer = new Producer();
$data = ['message' => "发送的消息内容"
];
$producer->send($data);
相关文章:
tp6 RabbitMQ
1、composer 安装 AMQP 扩展 composer require php-amqplib/php-amqplib 2、RabbitMQ 配置 在 config 目录下创建 rabbitmq.php 文件 <?php return [host>,port>5672,user>,password>,vhost>,exchange_name > ,queue_name > ,route_key > ,cons…...
java Spring Boot yml多环境拆分文件管理优化
上文 java Spring Boot yml多环境配置 我们讲了多环境开发 但这种东西都放在一起 还是非常容易暴露信息的 并且对维护来讲 也不是非常的友好 这里 我们在resources下创建三个文件 分别叫 application-pro.yml application-dev.yml application-test.yml 我们直接将三个环境 转…...
【设计模式——学习笔记】23种设计模式——状态模式State(原理讲解+应用场景介绍+案例介绍+Java代码实现)
文章目录 案例引入介绍基本介绍登场角色应用场景 案例实现案例一类图实现 案例二:借贷平台源码剖析传统方式实现分析状态修改流程类图实现 案例三:金库警报系统系统的运行逻辑伪代码传统实现方式使用状态模式 类图实现分析问题问题一问题二 总结文章说明…...
【LeetCode每日一题】——41.缺失的第一个正数
文章目录 一【题目类别】二【题目难度】三【题目编号】四【题目描述】五【题目示例】六【题目提示】七【解题思路】八【时间频度】九【代码实现】十【提交结果】 一【题目类别】 哈希表 二【题目难度】 困难 三【题目编号】 41.缺失的第一个正数 四【题目描述】 给你一个…...
typedef函数代码段解释以及部分Windows下的系统函数
文章目录 1、typedef int (WINAPI* LPSDOLInitialize)(const SDOLAppInfo* pAppInfo)2、typedef int (WINAPI* LPSDOLGetModule)(REFIID riid, void** intf)3、typedef int (WINAPI* LPSDOLTerminal)();4、GetProcAddress运行时获取一个动态链接库(DLL)中…...
Typora常用手册
常用快捷键 加粗: Ctrl B 标题: Ctrl H 插入链接: Ctrl K 插入代码: Ctrl Shift C – 无法执行 行内代码: Ctrl Shift K 插入图片: Ctrl Shift I 无序列表:Ctrl Shift L – 无法执行…...
互联网发展历程:从网线不够长到中继器的引入
互联网,这个如今贯穿我们生活的无所不在的网络,其发展历程充满了无数的创新和变革。有一项看似不太起眼的技术却在互联网的发展中发挥着至关重要的作用,那就是中继器。本文将带您深入了解互联网的发展历程,探讨在网线不够长的情况…...
【Java】异常处理 之 使用SLF4J 和 Logback
使用SLF4J和Logback 前面介绍了Commons Logging 和Log4j 这一对好基友,它们一个负责充当日志 API,一个负责实现日志底层,搭配使用非常便于开发。 有的童鞋可能还听说过SLF4J和Logback。这两个东东看上去也像日志,它们又是啥&…...
C++11并发与多线程笔记 (1)
C11并发与多线程笔记(1) 1、并发、进程、线程的基本概念和综述1.1 并发1.2 可执行程序1.3 进程1.4 线程1.5 学习心得 2、并发的实现方法2.1 多进程并发2.2 多线程并发 3、C11新标准线程库 1、并发、进程、线程的基本概念和综述 1.1 并发 指在一个时间段…...
07_Hudi案例实战、Flink CDC 实时数据采集、Presto、FineBI 报表可视化等
7.第七章 Hudi案例实战 7.1 案例架构 7.2 业务数据 7.2.1 客户信息表 7.2.2 客户意向表 7.2.3 客户线索表 7.2.4 线索申诉表 7.2.5 客户访问咨询记录表 7.3 Flink CDC 实时数据采集 7.3.1 开启MySQL binlog 7.3.2 环境准备 7.3.3 实时采集数据 7.3.3.1 客户信息表 7.3.3.2 客户…...
ceph相关概念和部署
Ceph 可用于向云提供 Ceph 对象存储 平台和 Ceph 可用于提供 Ceph 块设备服务 到云平台。Ceph 可用于部署 Ceph 文件 系统。所有 Ceph 存储集群部署都从设置 每个 Ceph 节点,然后设置网络。 Ceph 存储集群需要满足以下条件:至少一个 Ceph 监控器&#x…...
Android Jetpack Compose 中的分页与缓存展示
Android Jetpack Compose 中的分页与缓存展示 在几乎任何类型的移动项目中,移动开发人员在某个时候都会处理分页数据。如果数据列表太大,无法一次从服务器检索完毕,这就是必需的。因此,我们的后端同事为我们提供了一个端点&#…...
无名管道 / 有名管道(FIFO)
根据上节所讲就可以了解到:管道其实就是实现进程间通讯IPC中的一种类型方法 基本概念(无名管道) 管道是一种最基本的IPC机制,通常指无名管道,也是UNIX系统IPC最古老的形式。管道只能作用于有血缘关系的进程之间&…...
Three.js纹理贴图
目录 Three.js入门 Three.js光源 Three.js阴影 Three.js纹理贴图 纹理是一种图像或图像数据,用于为物体的材质提供颜色、纹理、法线、位移等信息,从而实现更加逼真的渲染结果。 纹理可以应用于Three.js中的材质类型,如MeshBasicMaterial…...
1+X Web前端开发职业技能等级证书建设方案
一 、系统概述 1X Web前端开发技术是计算机类专业重要的核心课程,课程所包含的教学内容多,实践性强,并且相关技术更新快。传统的课堂讲授模式以教师为中心,学生被动式接收,难以调动学生学习的积极性和主动性。混合式教…...
Rx.NET in Action 第二章学习笔记
2 Hello, Rx 本章节涵盖的内容: 不使用Rx的工作方式向项目中添加Rx创建你的第一个Rx应用程序 Rx 的目标是协调和统筹来自社交网络、传感器、用户界面事件等不同来源的基于事件的异步计算。例如,建筑物周围的监控摄像头和移动传感器会在有人靠近建筑物时触发…...
【软件工程 | 模块耦合】什么是模块耦合及分类
概念 耦合(coupling)是对两个模块之间联接程度的一种度量。模块间的依赖程度越大,则其耦合程度也就越大; 反之,模块间的依赖程度越小,则其耦合程度也就越小。 很显然,为了使软件具有较好的可维护性和可修改性…...
OCT介绍和分类
前言:研究方向和OCT有关,为了方便以后回顾,所以整理了OCT相关的一些内容。 OCT介绍和分类 OCT介绍分类时域OCT频域OCT扫频OCT谱域OCT OCT介绍 名称:OCT、光学相干层析成像术、Optical Coherence Tomography。 概念:O…...
07-2_Qt 5.9 C++开发指南_二进制文件读写(stm和dat格式)
文章目录 1. 实例功能概述2. Qt预定义编码文件的读写2.1 保存为stm文件2.2 stm文件格式2.3 读取stm文件 3. 标准编码文件的读写3.1 保存为dat文件3.2 dat文件格式3.3 读取dat文件 4. 框架及源码4.1 可视化UI设计4.2 mainwindow.cpp 1. 实例功能概述 除了文本文件之外ÿ…...
SpringBoot复习:(41)配置文件中配置的server开头的属性是怎么配置到Servlet容器中起作用的?
ServletWebServerFactoryAutoConfiguration类: 可以看到其中使用了EnableConfigurationProperties导入了ServerProperties 而ServerProperties通过使用ConfigurationProperties注解导入了配置文件中已server开头的那些配置项。 可以看到ServletWebServerFactory定…...
【JavaEE】-- HTTP
1. HTTP是什么? HTTP(全称为"超文本传输协议")是一种应用非常广泛的应用层协议,HTTP是基于TCP协议的一种应用层协议。 应用层协议:是计算机网络协议栈中最高层的协议,它定义了运行在不同主机上…...
2021-03-15 iview一些问题
1.iview 在使用tree组件时,发现没有set类的方法,只有get,那么要改变tree值,只能遍历treeData,递归修改treeData的checked,发现无法更改,原因在于check模式下,子元素的勾选状态跟父节…...
C# 类和继承(抽象类)
抽象类 抽象类是指设计为被继承的类。抽象类只能被用作其他类的基类。 不能创建抽象类的实例。抽象类使用abstract修饰符声明。 抽象类可以包含抽象成员或普通的非抽象成员。抽象类的成员可以是抽象成员和普通带 实现的成员的任意组合。抽象类自己可以派生自另一个抽象类。例…...
Matlab | matlab常用命令总结
常用命令 一、 基础操作与环境二、 矩阵与数组操作(核心)三、 绘图与可视化四、 编程与控制流五、 符号计算 (Symbolic Math Toolbox)六、 文件与数据 I/O七、 常用函数类别重要提示这是一份 MATLAB 常用命令和功能的总结,涵盖了基础操作、矩阵运算、绘图、编程和文件处理等…...
【HTML-16】深入理解HTML中的块元素与行内元素
HTML元素根据其显示特性可以分为两大类:块元素(Block-level Elements)和行内元素(Inline Elements)。理解这两者的区别对于构建良好的网页布局至关重要。本文将全面解析这两种元素的特性、区别以及实际应用场景。 1. 块元素(Block-level Elements) 1.1 基本特性 …...
Ascend NPU上适配Step-Audio模型
1 概述 1.1 简述 Step-Audio 是业界首个集语音理解与生成控制一体化的产品级开源实时语音对话系统,支持多语言对话(如 中文,英文,日语),语音情感(如 开心,悲伤)&#x…...
中医有效性探讨
文章目录 西医是如何发展到以生物化学为药理基础的现代医学?传统医学奠基期(远古 - 17 世纪)近代医学转型期(17 世纪 - 19 世纪末)现代医学成熟期(20世纪至今) 中医的源远流长和一脉相承远古至…...
安宝特案例丨Vuzix AR智能眼镜集成专业软件,助力卢森堡医院药房转型,赢得辉瑞创新奖
在Vuzix M400 AR智能眼镜的助力下,卢森堡罗伯特舒曼医院(the Robert Schuman Hospitals, HRS)凭借在无菌制剂生产流程中引入增强现实技术(AR)创新项目,荣获了2024年6月7日由卢森堡医院药剂师协会࿰…...
MinIO Docker 部署:仅开放一个端口
MinIO Docker 部署:仅开放一个端口 在实际的服务器部署中,出于安全和管理的考虑,我们可能只能开放一个端口。MinIO 是一个高性能的对象存储服务,支持 Docker 部署,但默认情况下它需要两个端口:一个是 API 端口(用于存储和访问数据),另一个是控制台端口(用于管理界面…...
提升移动端网页调试效率:WebDebugX 与常见工具组合实践
在日常移动端开发中,网页调试始终是一个高频但又极具挑战的环节。尤其在面对 iOS 与 Android 的混合技术栈、各种设备差异化行为时,开发者迫切需要一套高效、可靠且跨平台的调试方案。过去,我们或多或少使用过 Chrome DevTools、Remote Debug…...
