tp6 RabbitMQ
1、composer 安装 AMQP 扩展
composer require php-amqplib/php-amqplib
2、RabbitMQ 配置
在 config 目录下创建 rabbitmq.php 文件
<?php
return ['host'=>'','port'=>'5672','user'=>'','password'=>'','vhost'=>'','exchange_name' => '','queue_name' => '','route_key' => '','consumer_tag' => '',
];
3、生产者代码
app目录下创建Producer.php
<?phpnamespace app;use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;class Producer
{private $connection;private $channel;private $mq_config;public function __construct(){$this->mq_config = config('rabbit_mq');$this->connection = new AMQPStreamConnection($this->mq_config['host'],$this->mq_config['port'],$this->mq_config['user'],$this->mq_config['password'],'itcast');//创建通道$this->channel = $this->connection->channel();}public function send($data){/*** 创建队列(Queue)* name: hello // 队列名称* passive: false // 如果设置true存在则返回OK,否则就报错。设置false存在返回OK,不存在则自动创建* durable: true // 是否持久化,设置false是存放到内存中的,RabbitMQ重启后会丢失;设置true,则代表是一个持久化的队列,服务重启后也会存在,因为服务会把持久化的queue存放到磁盘上当服务重启的时候,会重新加载之前被持久化的queue* exclusive: false // 是否排他,指定该选项为true则队列只对当前连接有效,连接断开后自动删除* auto_delete: false // 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除**/$this->channel->queue_declare($this->mq_config['queue_name'], false, true, false, false);/*** 创建交换机(Exchange)* name: vckai_exchange// 交换机名称* type: direct // 交换机类型,分别为direct/fanout/topic,参考另外文章的Exchange Type说明。* passive: false // 如果设置true存在则返回OK,否则就报错。设置false存在返回OK,不存在则自动创建* durable: false // 是否持久化,设置false是存放到内存中的,RabbitMQ重启后会丢失* auto_delete: false // 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除*/$this->channel->exchange_declare($this->mq_config['exchange_name'], AMQPExchangeType::DIRECT, false, true, false);// 绑定消息交换机和队列$this->channel->queue_bind($this->mq_config['queue_name'], $this->mq_config['exchange_name'],$this->mq_config['route_key']);$messageBody = json_encode($data);//将要发送数据变为json字符串/*** 创建AMQP消息类型* delivery_mode 消息是否持久化* AMQPMessage::DELIVERY_MODE_NON_PERSISTENT 不持久化* AMQPMessage::DELIVERY_MODE_PERSISTENT 持久化*/$message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));/*** 发送消息* msg: $message // AMQP消息内容* exchange: vckai_exchange // 交换机名称* routing_key: hello // 路由key*/$this->channel->basic_publish($message, $this->mq_config['exchange_name'], $this->mq_config['route_key']);//关闭连接$this->stop();}//关闭进程public function stop(){$this->channel->close();$this->connection->close();}}
4、消费者代码
app目录下创建Consumer.php
<?phpnamespace app;use app\index\controller\ApiCommunity;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use think\db\exception\PDOException;
use think\facade\Log;class Consumer
{private $connection;private $channel;private $mq_config;public function __construct(){$this->mq_config = config('rabbit_mq');$this->connection = new AMQPStreamConnection($this->mq_config['host'],$this->mq_config['port'],$this->mq_config['user'],$this->mq_config['password'],$this->mq_config['vhost']);//创建通道$this->channel = $this->connection->channel();}/*** @param $channel* @param $connection* 关闭进程*/function shutdown($channel, $connection){$channel->close();$connection->close();}/*** @param $message* 消息处理*/function process_message($message){//消息处理逻辑echo $message->body . "\n";if ($message->body !== 'quit') {$obj = json_decode($message->body);if (!isset($obj->id)) {Log::write("error data:" . $message->body, 2);} else {try {Log::write("data:" . json_encode($message));//消息处理} catch (\Think\Exception $e) {Log::write($e->getMessage(), 2);Log::write(json_encode($message), 2);} catch (PDOException $pe) {Log::write($pe->getMessage(), 2);Log::write(json_encode($message), 2);}}}// 手动确认ack,确保消息已经处理$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);// Send a message with the string "quit" to cancel the consumer.if ($message->body === 'quit') {$message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']);}}/*** @throws \ErrorException* 启动** nohup php index.php index/Message_Consume/start &*/public function start(){// 设置消费者(Consumer)客户端同时只处理一条队列// 这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个消费者(Consumer),直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的消费者(Consumer)。//消费者端要把自动确认autoAck设置为false,basic_qos才有效。//$this->channel->basic_qos(0, 1, false);// 同样是创建路由和队列,以及绑定路由队列,注意要跟producer(生产者)的一致// 这里其实可以不用设置,但是为了防止队列没有被创建所以做的容错处理$this->channel->queue_declare($this->mq_config['queue_name'], false, true, false, false);$this->channel->exchange_declare($this->mq_config['exchange_name'], AMQPExchangeType::DIRECT, false, true, false);$this->channel->queue_bind($this->mq_config['queue_name'], $this->mq_config['exchange_name'], $this->mq_config['route_key']);/**** queue: queue_name // 被消费的队列名称* consumer_tag: consumer_tag // 消费者客户端身份标识,用于区分多个客户端* no_local: false // 这个功能属于AMQP的标准,但是RabbitMQ并没有做实现* no_ack: true // 收到消息后,是否不需要回复确认即被认为被消费* exclusive: false // 是否排他,即这个队列只能由一个消费者消费。适用于任务不允许进行并发处理的情况下* nowait: false // 不返回执行结果,但是如果排他开启的话,则必须需要等待结果的,如果两个一起开就会报错* callback: $callback // 回调逻辑处理函数**/$this->channel->basic_consume($this->mq_config['queue_name'], $this->mq_config['consumer_tag'], false, false, false, false, array($this, 'process_message'));register_shutdown_function(array($this, 'shutdown'), $this->channel, $this->connection);while (count($this->channel->callbacks)) {$this->channel->wait();}}
}
5、创建自定义命令
php think make:command Consumer
在项目跟目录执行以下命令,会自动生成 在 command 目录生成 Consumer 控制器
<?php
declare (strict_types = 1);namespace app\command;use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;class Consumer extends Command
{protected function configure(){// 指令配置$this->setName('consumer')->setDescription('the consumer command');}protected function execute(Input $input, Output $output){// 指令输出$output->writeln('consumer');$consumer = new \app\Consumer();
// $consumer->process_message(11)$consumer->start();}
}
config/console.php 代码增加如下:
// 指令定义
'commands' => ['consumer' => 'app\command\Consumer',
],
6、命令
消费者命令
php think consumer
生产者执行命令
$producer = new Producer();
$data = ['message' => "发送的消息内容"
];
$producer->send($data);
相关文章:
tp6 RabbitMQ
1、composer 安装 AMQP 扩展 composer require php-amqplib/php-amqplib 2、RabbitMQ 配置 在 config 目录下创建 rabbitmq.php 文件 <?php return [host>,port>5672,user>,password>,vhost>,exchange_name > ,queue_name > ,route_key > ,cons…...

java Spring Boot yml多环境拆分文件管理优化
上文 java Spring Boot yml多环境配置 我们讲了多环境开发 但这种东西都放在一起 还是非常容易暴露信息的 并且对维护来讲 也不是非常的友好 这里 我们在resources下创建三个文件 分别叫 application-pro.yml application-dev.yml application-test.yml 我们直接将三个环境 转…...

【设计模式——学习笔记】23种设计模式——状态模式State(原理讲解+应用场景介绍+案例介绍+Java代码实现)
文章目录 案例引入介绍基本介绍登场角色应用场景 案例实现案例一类图实现 案例二:借贷平台源码剖析传统方式实现分析状态修改流程类图实现 案例三:金库警报系统系统的运行逻辑伪代码传统实现方式使用状态模式 类图实现分析问题问题一问题二 总结文章说明…...

【LeetCode每日一题】——41.缺失的第一个正数
文章目录 一【题目类别】二【题目难度】三【题目编号】四【题目描述】五【题目示例】六【题目提示】七【解题思路】八【时间频度】九【代码实现】十【提交结果】 一【题目类别】 哈希表 二【题目难度】 困难 三【题目编号】 41.缺失的第一个正数 四【题目描述】 给你一个…...
typedef函数代码段解释以及部分Windows下的系统函数
文章目录 1、typedef int (WINAPI* LPSDOLInitialize)(const SDOLAppInfo* pAppInfo)2、typedef int (WINAPI* LPSDOLGetModule)(REFIID riid, void** intf)3、typedef int (WINAPI* LPSDOLTerminal)();4、GetProcAddress运行时获取一个动态链接库(DLL)中…...

Typora常用手册
常用快捷键 加粗: Ctrl B 标题: Ctrl H 插入链接: Ctrl K 插入代码: Ctrl Shift C – 无法执行 行内代码: Ctrl Shift K 插入图片: Ctrl Shift I 无序列表:Ctrl Shift L – 无法执行…...

互联网发展历程:从网线不够长到中继器的引入
互联网,这个如今贯穿我们生活的无所不在的网络,其发展历程充满了无数的创新和变革。有一项看似不太起眼的技术却在互联网的发展中发挥着至关重要的作用,那就是中继器。本文将带您深入了解互联网的发展历程,探讨在网线不够长的情况…...
【Java】异常处理 之 使用SLF4J 和 Logback
使用SLF4J和Logback 前面介绍了Commons Logging 和Log4j 这一对好基友,它们一个负责充当日志 API,一个负责实现日志底层,搭配使用非常便于开发。 有的童鞋可能还听说过SLF4J和Logback。这两个东东看上去也像日志,它们又是啥&…...

C++11并发与多线程笔记 (1)
C11并发与多线程笔记(1) 1、并发、进程、线程的基本概念和综述1.1 并发1.2 可执行程序1.3 进程1.4 线程1.5 学习心得 2、并发的实现方法2.1 多进程并发2.2 多线程并发 3、C11新标准线程库 1、并发、进程、线程的基本概念和综述 1.1 并发 指在一个时间段…...

07_Hudi案例实战、Flink CDC 实时数据采集、Presto、FineBI 报表可视化等
7.第七章 Hudi案例实战 7.1 案例架构 7.2 业务数据 7.2.1 客户信息表 7.2.2 客户意向表 7.2.3 客户线索表 7.2.4 线索申诉表 7.2.5 客户访问咨询记录表 7.3 Flink CDC 实时数据采集 7.3.1 开启MySQL binlog 7.3.2 环境准备 7.3.3 实时采集数据 7.3.3.1 客户信息表 7.3.3.2 客户…...

ceph相关概念和部署
Ceph 可用于向云提供 Ceph 对象存储 平台和 Ceph 可用于提供 Ceph 块设备服务 到云平台。Ceph 可用于部署 Ceph 文件 系统。所有 Ceph 存储集群部署都从设置 每个 Ceph 节点,然后设置网络。 Ceph 存储集群需要满足以下条件:至少一个 Ceph 监控器&#x…...

Android Jetpack Compose 中的分页与缓存展示
Android Jetpack Compose 中的分页与缓存展示 在几乎任何类型的移动项目中,移动开发人员在某个时候都会处理分页数据。如果数据列表太大,无法一次从服务器检索完毕,这就是必需的。因此,我们的后端同事为我们提供了一个端点&#…...

无名管道 / 有名管道(FIFO)
根据上节所讲就可以了解到:管道其实就是实现进程间通讯IPC中的一种类型方法 基本概念(无名管道) 管道是一种最基本的IPC机制,通常指无名管道,也是UNIX系统IPC最古老的形式。管道只能作用于有血缘关系的进程之间&…...

Three.js纹理贴图
目录 Three.js入门 Three.js光源 Three.js阴影 Three.js纹理贴图 纹理是一种图像或图像数据,用于为物体的材质提供颜色、纹理、法线、位移等信息,从而实现更加逼真的渲染结果。 纹理可以应用于Three.js中的材质类型,如MeshBasicMaterial…...

1+X Web前端开发职业技能等级证书建设方案
一 、系统概述 1X Web前端开发技术是计算机类专业重要的核心课程,课程所包含的教学内容多,实践性强,并且相关技术更新快。传统的课堂讲授模式以教师为中心,学生被动式接收,难以调动学生学习的积极性和主动性。混合式教…...

Rx.NET in Action 第二章学习笔记
2 Hello, Rx 本章节涵盖的内容: 不使用Rx的工作方式向项目中添加Rx创建你的第一个Rx应用程序 Rx 的目标是协调和统筹来自社交网络、传感器、用户界面事件等不同来源的基于事件的异步计算。例如,建筑物周围的监控摄像头和移动传感器会在有人靠近建筑物时触发…...
【软件工程 | 模块耦合】什么是模块耦合及分类
概念 耦合(coupling)是对两个模块之间联接程度的一种度量。模块间的依赖程度越大,则其耦合程度也就越大; 反之,模块间的依赖程度越小,则其耦合程度也就越小。 很显然,为了使软件具有较好的可维护性和可修改性…...

OCT介绍和分类
前言:研究方向和OCT有关,为了方便以后回顾,所以整理了OCT相关的一些内容。 OCT介绍和分类 OCT介绍分类时域OCT频域OCT扫频OCT谱域OCT OCT介绍 名称:OCT、光学相干层析成像术、Optical Coherence Tomography。 概念:O…...

07-2_Qt 5.9 C++开发指南_二进制文件读写(stm和dat格式)
文章目录 1. 实例功能概述2. Qt预定义编码文件的读写2.1 保存为stm文件2.2 stm文件格式2.3 读取stm文件 3. 标准编码文件的读写3.1 保存为dat文件3.2 dat文件格式3.3 读取dat文件 4. 框架及源码4.1 可视化UI设计4.2 mainwindow.cpp 1. 实例功能概述 除了文本文件之外ÿ…...

SpringBoot复习:(41)配置文件中配置的server开头的属性是怎么配置到Servlet容器中起作用的?
ServletWebServerFactoryAutoConfiguration类: 可以看到其中使用了EnableConfigurationProperties导入了ServerProperties 而ServerProperties通过使用ConfigurationProperties注解导入了配置文件中已server开头的那些配置项。 可以看到ServletWebServerFactory定…...

Docker 离线安装指南
参考文章 1、确认操作系统类型及内核版本 Docker依赖于Linux内核的一些特性,不同版本的Docker对内核版本有不同要求。例如,Docker 17.06及之后的版本通常需要Linux内核3.10及以上版本,Docker17.09及更高版本对应Linux内核4.9.x及更高版本。…...
在HarmonyOS ArkTS ArkUI-X 5.0及以上版本中,手势开发全攻略:
在 HarmonyOS 应用开发中,手势交互是连接用户与设备的核心纽带。ArkTS 框架提供了丰富的手势处理能力,既支持点击、长按、拖拽等基础单一手势的精细控制,也能通过多种绑定策略解决父子组件的手势竞争问题。本文将结合官方开发文档,…...

Day131 | 灵神 | 回溯算法 | 子集型 子集
Day131 | 灵神 | 回溯算法 | 子集型 子集 78.子集 78. 子集 - 力扣(LeetCode) 思路: 笔者写过很多次这道题了,不想写题解了,大家看灵神讲解吧 回溯算法套路①子集型回溯【基础算法精讲 14】_哔哩哔哩_bilibili 完…...

c#开发AI模型对话
AI模型 前面已经介绍了一般AI模型本地部署,直接调用现成的模型数据。这里主要讲述讲接口集成到我们自己的程序中使用方式。 微软提供了ML.NET来开发和使用AI模型,但是目前国内可能使用不多,至少实践例子很少看见。开发训练模型就不介绍了&am…...
css3笔记 (1) 自用
outline: none 用于移除元素获得焦点时默认的轮廓线 broder:0 用于移除边框 font-size:0 用于设置字体不显示 list-style: none 消除<li> 标签默认样式 margin: xx auto 版心居中 width:100% 通栏 vertical-align 作用于行内元素 / 表格单元格ÿ…...
【Java学习笔记】BigInteger 和 BigDecimal 类
BigInteger 和 BigDecimal 类 二者共有的常见方法 方法功能add加subtract减multiply乘divide除 注意点:传参类型必须是类对象 一、BigInteger 1. 作用:适合保存比较大的整型数 2. 使用说明 创建BigInteger对象 传入字符串 3. 代码示例 import j…...

Python基于历史模拟方法实现投资组合风险管理的VaR与ES模型项目实战
说明:这是一个机器学习实战项目(附带数据代码文档),如需数据代码文档可以直接到文章最后关注获取。 1.项目背景 在金融市场日益复杂和波动加剧的背景下,风险管理成为金融机构和个人投资者关注的核心议题之一。VaR&…...

[免费]微信小程序问卷调查系统(SpringBoot后端+Vue管理端)【论文+源码+SQL脚本】
大家好,我是java1234_小锋老师,看到一个不错的微信小程序问卷调查系统(SpringBoot后端Vue管理端)【论文源码SQL脚本】,分享下哈。 项目视频演示 【免费】微信小程序问卷调查系统(SpringBoot后端Vue管理端) Java毕业设计_哔哩哔哩_bilibili 项…...
tomcat入门
1 tomcat 是什么 apache开发的web服务器可以为java web程序提供运行环境tomcat是一款高效,稳定,易于使用的web服务器tomcathttp服务器Servlet服务器 2 tomcat 目录介绍 -bin #存放tomcat的脚本 -conf #存放tomcat的配置文件 ---catalina.policy #to…...
Oracle11g安装包
Oracle 11g安装包 适用于windows系统,64位 下载路径 oracle 11g 安装包...