tp6 RabbitMQ
1、composer 安装 AMQP 扩展
composer require php-amqplib/php-amqplib
2、RabbitMQ 配置
在 config 目录下创建 rabbitmq.php 文件
<?php
return ['host'=>'','port'=>'5672','user'=>'','password'=>'','vhost'=>'','exchange_name' => '','queue_name' => '','route_key' => '','consumer_tag' => '',
];
3、生产者代码
app目录下创建Producer.php
<?phpnamespace app;use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;class Producer
{private $connection;private $channel;private $mq_config;public function __construct(){$this->mq_config = config('rabbit_mq');$this->connection = new AMQPStreamConnection($this->mq_config['host'],$this->mq_config['port'],$this->mq_config['user'],$this->mq_config['password'],'itcast');//创建通道$this->channel = $this->connection->channel();}public function send($data){/*** 创建队列(Queue)* name: hello // 队列名称* passive: false // 如果设置true存在则返回OK,否则就报错。设置false存在返回OK,不存在则自动创建* durable: true // 是否持久化,设置false是存放到内存中的,RabbitMQ重启后会丢失;设置true,则代表是一个持久化的队列,服务重启后也会存在,因为服务会把持久化的queue存放到磁盘上当服务重启的时候,会重新加载之前被持久化的queue* exclusive: false // 是否排他,指定该选项为true则队列只对当前连接有效,连接断开后自动删除* auto_delete: false // 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除**/$this->channel->queue_declare($this->mq_config['queue_name'], false, true, false, false);/*** 创建交换机(Exchange)* name: vckai_exchange// 交换机名称* type: direct // 交换机类型,分别为direct/fanout/topic,参考另外文章的Exchange Type说明。* passive: false // 如果设置true存在则返回OK,否则就报错。设置false存在返回OK,不存在则自动创建* durable: false // 是否持久化,设置false是存放到内存中的,RabbitMQ重启后会丢失* auto_delete: false // 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除*/$this->channel->exchange_declare($this->mq_config['exchange_name'], AMQPExchangeType::DIRECT, false, true, false);// 绑定消息交换机和队列$this->channel->queue_bind($this->mq_config['queue_name'], $this->mq_config['exchange_name'],$this->mq_config['route_key']);$messageBody = json_encode($data);//将要发送数据变为json字符串/*** 创建AMQP消息类型* delivery_mode 消息是否持久化* AMQPMessage::DELIVERY_MODE_NON_PERSISTENT 不持久化* AMQPMessage::DELIVERY_MODE_PERSISTENT 持久化*/$message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));/*** 发送消息* msg: $message // AMQP消息内容* exchange: vckai_exchange // 交换机名称* routing_key: hello // 路由key*/$this->channel->basic_publish($message, $this->mq_config['exchange_name'], $this->mq_config['route_key']);//关闭连接$this->stop();}//关闭进程public function stop(){$this->channel->close();$this->connection->close();}}
4、消费者代码
app目录下创建Consumer.php
<?phpnamespace app;use app\index\controller\ApiCommunity;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use think\db\exception\PDOException;
use think\facade\Log;class Consumer
{private $connection;private $channel;private $mq_config;public function __construct(){$this->mq_config = config('rabbit_mq');$this->connection = new AMQPStreamConnection($this->mq_config['host'],$this->mq_config['port'],$this->mq_config['user'],$this->mq_config['password'],$this->mq_config['vhost']);//创建通道$this->channel = $this->connection->channel();}/*** @param $channel* @param $connection* 关闭进程*/function shutdown($channel, $connection){$channel->close();$connection->close();}/*** @param $message* 消息处理*/function process_message($message){//消息处理逻辑echo $message->body . "\n";if ($message->body !== 'quit') {$obj = json_decode($message->body);if (!isset($obj->id)) {Log::write("error data:" . $message->body, 2);} else {try {Log::write("data:" . json_encode($message));//消息处理} catch (\Think\Exception $e) {Log::write($e->getMessage(), 2);Log::write(json_encode($message), 2);} catch (PDOException $pe) {Log::write($pe->getMessage(), 2);Log::write(json_encode($message), 2);}}}// 手动确认ack,确保消息已经处理$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);// Send a message with the string "quit" to cancel the consumer.if ($message->body === 'quit') {$message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']);}}/*** @throws \ErrorException* 启动** nohup php index.php index/Message_Consume/start &*/public function start(){// 设置消费者(Consumer)客户端同时只处理一条队列// 这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个消费者(Consumer),直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的消费者(Consumer)。//消费者端要把自动确认autoAck设置为false,basic_qos才有效。//$this->channel->basic_qos(0, 1, false);// 同样是创建路由和队列,以及绑定路由队列,注意要跟producer(生产者)的一致// 这里其实可以不用设置,但是为了防止队列没有被创建所以做的容错处理$this->channel->queue_declare($this->mq_config['queue_name'], false, true, false, false);$this->channel->exchange_declare($this->mq_config['exchange_name'], AMQPExchangeType::DIRECT, false, true, false);$this->channel->queue_bind($this->mq_config['queue_name'], $this->mq_config['exchange_name'], $this->mq_config['route_key']);/**** queue: queue_name // 被消费的队列名称* consumer_tag: consumer_tag // 消费者客户端身份标识,用于区分多个客户端* no_local: false // 这个功能属于AMQP的标准,但是RabbitMQ并没有做实现* no_ack: true // 收到消息后,是否不需要回复确认即被认为被消费* exclusive: false // 是否排他,即这个队列只能由一个消费者消费。适用于任务不允许进行并发处理的情况下* nowait: false // 不返回执行结果,但是如果排他开启的话,则必须需要等待结果的,如果两个一起开就会报错* callback: $callback // 回调逻辑处理函数**/$this->channel->basic_consume($this->mq_config['queue_name'], $this->mq_config['consumer_tag'], false, false, false, false, array($this, 'process_message'));register_shutdown_function(array($this, 'shutdown'), $this->channel, $this->connection);while (count($this->channel->callbacks)) {$this->channel->wait();}}
}
5、创建自定义命令
php think make:command Consumer
在项目跟目录执行以下命令,会自动生成 在 command 目录生成 Consumer 控制器
<?php
declare (strict_types = 1);namespace app\command;use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;class Consumer extends Command
{protected function configure(){// 指令配置$this->setName('consumer')->setDescription('the consumer command');}protected function execute(Input $input, Output $output){// 指令输出$output->writeln('consumer');$consumer = new \app\Consumer();
// $consumer->process_message(11)$consumer->start();}
}
config/console.php 代码增加如下:
// 指令定义
'commands' => ['consumer' => 'app\command\Consumer',
],
6、命令
消费者命令
php think consumer
生产者执行命令
$producer = new Producer();
$data = ['message' => "发送的消息内容"
];
$producer->send($data);
相关文章:
tp6 RabbitMQ
1、composer 安装 AMQP 扩展 composer require php-amqplib/php-amqplib 2、RabbitMQ 配置 在 config 目录下创建 rabbitmq.php 文件 <?php return [host>,port>5672,user>,password>,vhost>,exchange_name > ,queue_name > ,route_key > ,cons…...
java Spring Boot yml多环境拆分文件管理优化
上文 java Spring Boot yml多环境配置 我们讲了多环境开发 但这种东西都放在一起 还是非常容易暴露信息的 并且对维护来讲 也不是非常的友好 这里 我们在resources下创建三个文件 分别叫 application-pro.yml application-dev.yml application-test.yml 我们直接将三个环境 转…...
【设计模式——学习笔记】23种设计模式——状态模式State(原理讲解+应用场景介绍+案例介绍+Java代码实现)
文章目录 案例引入介绍基本介绍登场角色应用场景 案例实现案例一类图实现 案例二:借贷平台源码剖析传统方式实现分析状态修改流程类图实现 案例三:金库警报系统系统的运行逻辑伪代码传统实现方式使用状态模式 类图实现分析问题问题一问题二 总结文章说明…...
【LeetCode每日一题】——41.缺失的第一个正数
文章目录 一【题目类别】二【题目难度】三【题目编号】四【题目描述】五【题目示例】六【题目提示】七【解题思路】八【时间频度】九【代码实现】十【提交结果】 一【题目类别】 哈希表 二【题目难度】 困难 三【题目编号】 41.缺失的第一个正数 四【题目描述】 给你一个…...
typedef函数代码段解释以及部分Windows下的系统函数
文章目录 1、typedef int (WINAPI* LPSDOLInitialize)(const SDOLAppInfo* pAppInfo)2、typedef int (WINAPI* LPSDOLGetModule)(REFIID riid, void** intf)3、typedef int (WINAPI* LPSDOLTerminal)();4、GetProcAddress运行时获取一个动态链接库(DLL)中…...
Typora常用手册
常用快捷键 加粗: Ctrl B 标题: Ctrl H 插入链接: Ctrl K 插入代码: Ctrl Shift C – 无法执行 行内代码: Ctrl Shift K 插入图片: Ctrl Shift I 无序列表:Ctrl Shift L – 无法执行…...
互联网发展历程:从网线不够长到中继器的引入
互联网,这个如今贯穿我们生活的无所不在的网络,其发展历程充满了无数的创新和变革。有一项看似不太起眼的技术却在互联网的发展中发挥着至关重要的作用,那就是中继器。本文将带您深入了解互联网的发展历程,探讨在网线不够长的情况…...
【Java】异常处理 之 使用SLF4J 和 Logback
使用SLF4J和Logback 前面介绍了Commons Logging 和Log4j 这一对好基友,它们一个负责充当日志 API,一个负责实现日志底层,搭配使用非常便于开发。 有的童鞋可能还听说过SLF4J和Logback。这两个东东看上去也像日志,它们又是啥&…...
C++11并发与多线程笔记 (1)
C11并发与多线程笔记(1) 1、并发、进程、线程的基本概念和综述1.1 并发1.2 可执行程序1.3 进程1.4 线程1.5 学习心得 2、并发的实现方法2.1 多进程并发2.2 多线程并发 3、C11新标准线程库 1、并发、进程、线程的基本概念和综述 1.1 并发 指在一个时间段…...
07_Hudi案例实战、Flink CDC 实时数据采集、Presto、FineBI 报表可视化等
7.第七章 Hudi案例实战 7.1 案例架构 7.2 业务数据 7.2.1 客户信息表 7.2.2 客户意向表 7.2.3 客户线索表 7.2.4 线索申诉表 7.2.5 客户访问咨询记录表 7.3 Flink CDC 实时数据采集 7.3.1 开启MySQL binlog 7.3.2 环境准备 7.3.3 实时采集数据 7.3.3.1 客户信息表 7.3.3.2 客户…...
ceph相关概念和部署
Ceph 可用于向云提供 Ceph 对象存储 平台和 Ceph 可用于提供 Ceph 块设备服务 到云平台。Ceph 可用于部署 Ceph 文件 系统。所有 Ceph 存储集群部署都从设置 每个 Ceph 节点,然后设置网络。 Ceph 存储集群需要满足以下条件:至少一个 Ceph 监控器&#x…...
Android Jetpack Compose 中的分页与缓存展示
Android Jetpack Compose 中的分页与缓存展示 在几乎任何类型的移动项目中,移动开发人员在某个时候都会处理分页数据。如果数据列表太大,无法一次从服务器检索完毕,这就是必需的。因此,我们的后端同事为我们提供了一个端点&#…...
无名管道 / 有名管道(FIFO)
根据上节所讲就可以了解到:管道其实就是实现进程间通讯IPC中的一种类型方法 基本概念(无名管道) 管道是一种最基本的IPC机制,通常指无名管道,也是UNIX系统IPC最古老的形式。管道只能作用于有血缘关系的进程之间&…...
Three.js纹理贴图
目录 Three.js入门 Three.js光源 Three.js阴影 Three.js纹理贴图 纹理是一种图像或图像数据,用于为物体的材质提供颜色、纹理、法线、位移等信息,从而实现更加逼真的渲染结果。 纹理可以应用于Three.js中的材质类型,如MeshBasicMaterial…...
1+X Web前端开发职业技能等级证书建设方案
一 、系统概述 1X Web前端开发技术是计算机类专业重要的核心课程,课程所包含的教学内容多,实践性强,并且相关技术更新快。传统的课堂讲授模式以教师为中心,学生被动式接收,难以调动学生学习的积极性和主动性。混合式教…...
Rx.NET in Action 第二章学习笔记
2 Hello, Rx 本章节涵盖的内容: 不使用Rx的工作方式向项目中添加Rx创建你的第一个Rx应用程序 Rx 的目标是协调和统筹来自社交网络、传感器、用户界面事件等不同来源的基于事件的异步计算。例如,建筑物周围的监控摄像头和移动传感器会在有人靠近建筑物时触发…...
【软件工程 | 模块耦合】什么是模块耦合及分类
概念 耦合(coupling)是对两个模块之间联接程度的一种度量。模块间的依赖程度越大,则其耦合程度也就越大; 反之,模块间的依赖程度越小,则其耦合程度也就越小。 很显然,为了使软件具有较好的可维护性和可修改性…...
OCT介绍和分类
前言:研究方向和OCT有关,为了方便以后回顾,所以整理了OCT相关的一些内容。 OCT介绍和分类 OCT介绍分类时域OCT频域OCT扫频OCT谱域OCT OCT介绍 名称:OCT、光学相干层析成像术、Optical Coherence Tomography。 概念:O…...
07-2_Qt 5.9 C++开发指南_二进制文件读写(stm和dat格式)
文章目录 1. 实例功能概述2. Qt预定义编码文件的读写2.1 保存为stm文件2.2 stm文件格式2.3 读取stm文件 3. 标准编码文件的读写3.1 保存为dat文件3.2 dat文件格式3.3 读取dat文件 4. 框架及源码4.1 可视化UI设计4.2 mainwindow.cpp 1. 实例功能概述 除了文本文件之外ÿ…...
SpringBoot复习:(41)配置文件中配置的server开头的属性是怎么配置到Servlet容器中起作用的?
ServletWebServerFactoryAutoConfiguration类: 可以看到其中使用了EnableConfigurationProperties导入了ServerProperties 而ServerProperties通过使用ConfigurationProperties注解导入了配置文件中已server开头的那些配置项。 可以看到ServletWebServerFactory定…...
GNU C扩展语法在嵌入式开发中的实战应用
1. GNU C扩展语法概述在嵌入式开发领域,GNU C编译器因其强大的扩展功能而广受欢迎。作为一名长期从事嵌入式开发的工程师,我发现这些扩展语法不仅能提高代码效率,还能解决许多标准C语言难以处理的场景问题。GNU C扩展主要包括以下几个重要特性…...
YOLOE开源镜像生产环境部署:YOLOE-v8m-seg在Docker Swarm集群实践
YOLOE开源镜像生产环境部署:YOLOE-v8m-seg在Docker Swarm集群实践 1. 引言:从单机到集群的跨越 如果你已经体验过YOLOE官版镜像在单台服务器上的强大能力,比如用文本描述就能识别图片里的任何物体,或者用一张示例图就能完成精准…...
OpenClaw多模态实践:千问3.5-27B图片理解+文件整理自动化
OpenClaw多模态实践:千问3.5-27B图片理解文件整理自动化 1. 为什么需要自动化图片管理 上周整理项目资料时,我发现桌面上散落着237张截图——有会议纪要片段、代码报错提示、参考文档关键页,甚至还有随手截的灵感草图。手动分类这些文件花了…...
未发表!25年顶级SCI算法SOO优化CNN-LSTM-Attention一键实现多步预测!多步预测全家桶更新啦!
目录 多步预测案例 多步预测教程 创新点与原理 ①创新点一:基于CNN-LSTM的多尺度特征联合提取架构 ②创新点二:融合SE通道注意力机制的自适应特征重标定策略 ③创新点三:基于SOO智能算法的超参数自适应寻优 结果展示 全家桶目录 获取…...
结构调整法降AI怎么做?4步把AI率从80%降到30%以内
结构调整法是把AI生成的“标准段落结构“打散重组,通过改变逻辑顺序来消除AI检测特征。原理上可行,但操作比翻译大法更复杂。 我用一篇8000字论文测试了完整流程,结论是:结构调整法效果不如专业工具稳定,但作为人工辅…...
Flutter鸿蒙应用开发:数据分享功能实现
🔥Flutter鸿蒙应用开发:数据分享功能实现(macOSDevEco Studio) 欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.csdn.net 📄 文章摘要 本文为Flutter for OpenHarmony跨平台应用开发系列实…...
嵌入式开发中段错误的成因分析与GDB调试实战
1. 嵌入式软件段错误概述段错误(Segmentation Fault)是嵌入式开发中最令人头疼的运行时错误之一。作为一名在嵌入式领域摸爬滚打多年的工程师,我处理过的段错误案例不下百例。每次遇到这种错误,就像在漆黑的迷宫里寻找出口&#x…...
AI显微镜-Swin2SR惊艳效果展示:JPG噪点去除+边缘重构真实案例
AI显微镜-Swin2SR惊艳效果展示:JPG噪点去除边缘重构真实案例 1. 引言:当模糊图片遇见AI“脑补” 你有没有遇到过这种情况?翻出多年前的老照片,却发现它模糊不清,布满了马赛克和噪点;或者从网上下载了一张…...
别再只盯着输入了!时间序列预测中,被忽视的‘标签自相关’问题与FreDF解法
时间序列预测的盲区:标签自相关性如何悄悄破坏你的模型精度 想象一下,你花费数周时间调整模型架构、优化超参数,甚至尝试了最新的Transformer变体,但预测结果始终差强人意。问题可能并不出在你精心设计的输入特征工程上࿰…...
瑞斯康达Raisecom交换机VLAN与ERPS实战配置指南
1. 瑞斯康达交换机基础配置入门 第一次接触瑞斯康达交换机的朋友可能会被命令行界面吓到,其实它的操作逻辑和主流厂商设备非常相似。以Gazelle系列交换机为例,默认登录账号密码都是raisecom,这个设计对新手特别友好——至少不用像某些品牌设备…...
