当前位置: 首页 > news >正文

tp6 RabbitMQ

1、composer 安装 AMQP 扩展
composer require php-amqplib/php-amqplib
2、RabbitMQ 配置

 在 config 目录下创建 rabbitmq.php 文件

<?php
return ['host'=>'','port'=>'5672','user'=>'','password'=>'','vhost'=>'','exchange_name' => '','queue_name' => '','route_key' => '','consumer_tag' => '',
];
3、生产者代码

app目录下创建Producer.php

<?phpnamespace app;use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;class Producer
{private $connection;private $channel;private $mq_config;public function __construct(){$this->mq_config = config('rabbit_mq');$this->connection = new AMQPStreamConnection($this->mq_config['host'],$this->mq_config['port'],$this->mq_config['user'],$this->mq_config['password'],'itcast');//创建通道$this->channel = $this->connection->channel();}public function send($data){/*** 创建队列(Queue)* name: hello         // 队列名称* passive: false      // 如果设置true存在则返回OK,否则就报错。设置false存在返回OK,不存在则自动创建* durable: true       // 是否持久化,设置false是存放到内存中的,RabbitMQ重启后会丢失;设置true,则代表是一个持久化的队列,服务重启后也会存在,因为服务会把持久化的queue存放到磁盘上当服务重启的时候,会重新加载之前被持久化的queue* exclusive: false    // 是否排他,指定该选项为true则队列只对当前连接有效,连接断开后自动删除* auto_delete: false // 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除**/$this->channel->queue_declare($this->mq_config['queue_name'], false, true, false, false);/*** 创建交换机(Exchange)* name: vckai_exchange// 交换机名称* type: direct        // 交换机类型,分别为direct/fanout/topic,参考另外文章的Exchange Type说明。* passive: false      // 如果设置true存在则返回OK,否则就报错。设置false存在返回OK,不存在则自动创建* durable: false      // 是否持久化,设置false是存放到内存中的,RabbitMQ重启后会丢失* auto_delete: false  // 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除*/$this->channel->exchange_declare($this->mq_config['exchange_name'], AMQPExchangeType::DIRECT, false, true, false);// 绑定消息交换机和队列$this->channel->queue_bind($this->mq_config['queue_name'], $this->mq_config['exchange_name'],$this->mq_config['route_key']);$messageBody = json_encode($data);//将要发送数据变为json字符串/*** 创建AMQP消息类型* delivery_mode 消息是否持久化* AMQPMessage::DELIVERY_MODE_NON_PERSISTENT  不持久化* AMQPMessage::DELIVERY_MODE_PERSISTENT      持久化*/$message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));/*** 发送消息* msg: $message            // AMQP消息内容* exchange: vckai_exchange // 交换机名称* routing_key: hello       // 路由key*/$this->channel->basic_publish($message, $this->mq_config['exchange_name'], $this->mq_config['route_key']);//关闭连接$this->stop();}//关闭进程public function stop(){$this->channel->close();$this->connection->close();}}
4、消费者代码

app目录下创建Consumer.php

<?phpnamespace app;use app\index\controller\ApiCommunity;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use think\db\exception\PDOException;
use think\facade\Log;class Consumer
{private $connection;private $channel;private $mq_config;public function __construct(){$this->mq_config = config('rabbit_mq');$this->connection = new AMQPStreamConnection($this->mq_config['host'],$this->mq_config['port'],$this->mq_config['user'],$this->mq_config['password'],$this->mq_config['vhost']);//创建通道$this->channel = $this->connection->channel();}/*** @param $channel* @param $connection* 关闭进程*/function shutdown($channel, $connection){$channel->close();$connection->close();}/*** @param $message* 消息处理*/function process_message($message){//消息处理逻辑echo $message->body . "\n";if ($message->body !== 'quit') {$obj = json_decode($message->body);if (!isset($obj->id)) {Log::write("error data:" . $message->body, 2);} else {try {Log::write("data:" . json_encode($message));//消息处理} catch (\Think\Exception  $e) {Log::write($e->getMessage(), 2);Log::write(json_encode($message), 2);} catch (PDOException $pe) {Log::write($pe->getMessage(), 2);Log::write(json_encode($message), 2);}}}// 手动确认ack,确保消息已经处理$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);// Send a message with the string "quit" to cancel the consumer.if ($message->body === 'quit') {$message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']);}}/*** @throws \ErrorException* 启动** nohup php index.php index/Message_Consume/start &*/public function start(){// 设置消费者(Consumer)客户端同时只处理一条队列// 这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个消费者(Consumer),直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的消费者(Consumer)。//消费者端要把自动确认autoAck设置为false,basic_qos才有效。//$this->channel->basic_qos(0, 1, false);// 同样是创建路由和队列,以及绑定路由队列,注意要跟producer(生产者)的一致// 这里其实可以不用设置,但是为了防止队列没有被创建所以做的容错处理$this->channel->queue_declare($this->mq_config['queue_name'], false, true, false, false);$this->channel->exchange_declare($this->mq_config['exchange_name'], AMQPExchangeType::DIRECT, false, true, false);$this->channel->queue_bind($this->mq_config['queue_name'], $this->mq_config['exchange_name'], $this->mq_config['route_key']);/**** queue: queue_name    // 被消费的队列名称* consumer_tag: consumer_tag // 消费者客户端身份标识,用于区分多个客户端* no_local: false      // 这个功能属于AMQP的标准,但是RabbitMQ并没有做实现* no_ack: true         // 收到消息后,是否不需要回复确认即被认为被消费* exclusive: false     // 是否排他,即这个队列只能由一个消费者消费。适用于任务不允许进行并发处理的情况下* nowait: false        // 不返回执行结果,但是如果排他开启的话,则必须需要等待结果的,如果两个一起开就会报错* callback: $callback  // 回调逻辑处理函数**/$this->channel->basic_consume($this->mq_config['queue_name'], $this->mq_config['consumer_tag'], false, false, false, false, array($this, 'process_message'));register_shutdown_function(array($this, 'shutdown'), $this->channel, $this->connection);while (count($this->channel->callbacks)) {$this->channel->wait();}}
}
5、创建自定义命令
php think make:command Consumer

在项目跟目录执行以下命令,会自动生成 在 command 目录生成 Consumer 控制器 

<?php
declare (strict_types = 1);namespace app\command;use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;class Consumer extends Command
{protected function configure(){// 指令配置$this->setName('consumer')->setDescription('the consumer command');}protected function execute(Input $input, Output $output){// 指令输出$output->writeln('consumer');$consumer = new \app\Consumer();
//        $consumer->process_message(11)$consumer->start();}
}

config/console.php 代码增加如下:

// 指令定义
'commands' => ['consumer' => 'app\command\Consumer',
],
6、命令

消费者命令

php think consumer

 生产者执行命令

$producer = new Producer();
$data = ['message' => "发送的消息内容"
];
$producer->send($data);

相关文章:

tp6 RabbitMQ

1、composer 安装 AMQP 扩展 composer require php-amqplib/php-amqplib 2、RabbitMQ 配置 在 config 目录下创建 rabbitmq.php 文件 <?php return [host>,port>5672,user>,password>,vhost>,exchange_name > ,queue_name > ,route_key > ,cons…...

java Spring Boot yml多环境拆分文件管理优化

上文 java Spring Boot yml多环境配置 我们讲了多环境开发 但这种东西都放在一起 还是非常容易暴露信息的 并且对维护来讲 也不是非常的友好 这里 我们在resources下创建三个文件 分别叫 application-pro.yml application-dev.yml application-test.yml 我们直接将三个环境 转…...

【设计模式——学习笔记】23种设计模式——状态模式State(原理讲解+应用场景介绍+案例介绍+Java代码实现)

文章目录 案例引入介绍基本介绍登场角色应用场景 案例实现案例一类图实现 案例二&#xff1a;借贷平台源码剖析传统方式实现分析状态修改流程类图实现 案例三&#xff1a;金库警报系统系统的运行逻辑伪代码传统实现方式使用状态模式 类图实现分析问题问题一问题二 总结文章说明…...

【LeetCode每日一题】——41.缺失的第一个正数

文章目录 一【题目类别】二【题目难度】三【题目编号】四【题目描述】五【题目示例】六【题目提示】七【解题思路】八【时间频度】九【代码实现】十【提交结果】 一【题目类别】 哈希表 二【题目难度】 困难 三【题目编号】 41.缺失的第一个正数 四【题目描述】 给你一个…...

typedef函数代码段解释以及部分Windows下的系统函数

文章目录 1、typedef int (WINAPI* LPSDOLInitialize)(const SDOLAppInfo* pAppInfo)2、typedef int (WINAPI* LPSDOLGetModule)(REFIID riid, void** intf)3、typedef int (WINAPI* LPSDOLTerminal)();4、GetProcAddress运行时获取一个动态链接库&#xff08;DLL&#xff09;中…...

Typora常用手册

常用快捷键 加粗&#xff1a; Ctrl B 标题&#xff1a; Ctrl H 插入链接&#xff1a; Ctrl K 插入代码&#xff1a; Ctrl Shift C – 无法执行 行内代码&#xff1a; Ctrl Shift K 插入图片&#xff1a; Ctrl Shift I 无序列表&#xff1a;Ctrl Shift L – 无法执行…...

互联网发展历程:从网线不够长到中继器的引入

互联网&#xff0c;这个如今贯穿我们生活的无所不在的网络&#xff0c;其发展历程充满了无数的创新和变革。有一项看似不太起眼的技术却在互联网的发展中发挥着至关重要的作用&#xff0c;那就是中继器。本文将带您深入了解互联网的发展历程&#xff0c;探讨在网线不够长的情况…...

【Java】异常处理 之 使用SLF4J 和 Logback

使用SLF4J和Logback 前面介绍了Commons Logging 和Log4j 这一对好基友&#xff0c;它们一个负责充当日志 API&#xff0c;一个负责实现日志底层&#xff0c;搭配使用非常便于开发。 有的童鞋可能还听说过SLF4J和Logback。这两个东东看上去也像日志&#xff0c;它们又是啥&…...

C++11并发与多线程笔记 (1)

C11并发与多线程笔记&#xff08;1&#xff09; 1、并发、进程、线程的基本概念和综述1.1 并发1.2 可执行程序1.3 进程1.4 线程1.5 学习心得 2、并发的实现方法2.1 多进程并发2.2 多线程并发 3、C11新标准线程库 1、并发、进程、线程的基本概念和综述 1.1 并发 指在一个时间段…...

07_Hudi案例实战、Flink CDC 实时数据采集、Presto、FineBI 报表可视化等

7.第七章 Hudi案例实战 7.1 案例架构 7.2 业务数据 7.2.1 客户信息表 7.2.2 客户意向表 7.2.3 客户线索表 7.2.4 线索申诉表 7.2.5 客户访问咨询记录表 7.3 Flink CDC 实时数据采集 7.3.1 开启MySQL binlog 7.3.2 环境准备 7.3.3 实时采集数据 7.3.3.1 客户信息表 7.3.3.2 客户…...

ceph相关概念和部署

Ceph 可用于向云提供 Ceph 对象存储 平台和 Ceph 可用于提供 Ceph 块设备服务 到云平台。Ceph 可用于部署 Ceph 文件 系统。所有 Ceph 存储集群部署都从设置 每个 Ceph 节点&#xff0c;然后设置网络。 Ceph 存储集群需要满足以下条件&#xff1a;至少一个 Ceph 监控器&#x…...

Android Jetpack Compose 中的分页与缓存展示

Android Jetpack Compose 中的分页与缓存展示 在几乎任何类型的移动项目中&#xff0c;移动开发人员在某个时候都会处理分页数据。如果数据列表太大&#xff0c;无法一次从服务器检索完毕&#xff0c;这就是必需的。因此&#xff0c;我们的后端同事为我们提供了一个端点&#…...

无名管道 / 有名管道(FIFO)

根据上节所讲就可以了解到&#xff1a;管道其实就是实现进程间通讯IPC中的一种类型方法 基本概念&#xff08;无名管道&#xff09; 管道是一种最基本的IPC机制&#xff0c;通常指无名管道&#xff0c;也是UNIX系统IPC最古老的形式。管道只能作用于有血缘关系的进程之间&…...

Three.js纹理贴图

目录 Three.js入门 Three.js光源 Three.js阴影 Three.js纹理贴图 纹理是一种图像或图像数据&#xff0c;用于为物体的材质提供颜色、纹理、法线、位移等信息&#xff0c;从而实现更加逼真的渲染结果。 纹理可以应用于Three.js中的材质类型&#xff0c;如MeshBasicMaterial…...

1+X Web前端开发职业技能等级证书建设方案

一 、系统概述 1X Web前端开发技术是计算机类专业重要的核心课程&#xff0c;课程所包含的教学内容多&#xff0c;实践性强&#xff0c;并且相关技术更新快。传统的课堂讲授模式以教师为中心&#xff0c;学生被动式接收&#xff0c;难以调动学生学习的积极性和主动性。混合式教…...

Rx.NET in Action 第二章学习笔记

2 Hello, Rx 本章节涵盖的内容: 不使用Rx的工作方式向项目中添加Rx创建你的第一个Rx应用程序 Rx 的目标是协调和统筹来自社交网络、传感器、用户界面事件等不同来源的基于事件的异步计算。例如&#xff0c;建筑物周围的监控摄像头和移动传感器会在有人靠近建筑物时触发&#xf…...

【软件工程 | 模块耦合】什么是模块耦合及分类

概念 耦合(coupling)是对两个模块之间联接程度的一种度量。模块间的依赖程度越大&#xff0c;则其耦合程度也就越大&#xff1b; 反之&#xff0c;模块间的依赖程度越小&#xff0c;则其耦合程度也就越小。 很显然&#xff0c;为了使软件具有较好的可维护性和可修改性&#xf…...

OCT介绍和分类

前言&#xff1a;研究方向和OCT有关&#xff0c;为了方便以后回顾&#xff0c;所以整理了OCT相关的一些内容。 OCT介绍和分类 OCT介绍分类时域OCT频域OCT扫频OCT谱域OCT OCT介绍 名称&#xff1a;OCT、光学相干层析成像术、Optical Coherence Tomography。 概念&#xff1a;O…...

07-2_Qt 5.9 C++开发指南_二进制文件读写(stm和dat格式)

文章目录 1. 实例功能概述2. Qt预定义编码文件的读写2.1 保存为stm文件2.2 stm文件格式2.3 读取stm文件 3. 标准编码文件的读写3.1 保存为dat文件3.2 dat文件格式3.3 读取dat文件 4. 框架及源码4.1 可视化UI设计4.2 mainwindow.cpp 1. 实例功能概述 除了文本文件之外&#xff…...

SpringBoot复习:(41)配置文件中配置的server开头的属性是怎么配置到Servlet容器中起作用的?

ServletWebServerFactoryAutoConfiguration类&#xff1a; 可以看到其中使用了EnableConfigurationProperties导入了ServerProperties 而ServerProperties通过使用ConfigurationProperties注解导入了配置文件中已server开头的那些配置项。 可以看到ServletWebServerFactory定…...

【网络】每天掌握一个Linux命令 - iftop

在Linux系统中&#xff0c;iftop是网络管理的得力助手&#xff0c;能实时监控网络流量、连接情况等&#xff0c;帮助排查网络异常。接下来从多方面详细介绍它。 目录 【网络】每天掌握一个Linux命令 - iftop工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景…...

智慧医疗能源事业线深度画像分析(上)

引言 医疗行业作为现代社会的关键基础设施,其能源消耗与环境影响正日益受到关注。随着全球"双碳"目标的推进和可持续发展理念的深入,智慧医疗能源事业线应运而生,致力于通过创新技术与管理方案,重构医疗领域的能源使用模式。这一事业线融合了能源管理、可持续发…...

Appium+python自动化(十六)- ADB命令

简介 Android 调试桥(adb)是多种用途的工具&#xff0c;该工具可以帮助你你管理设备或模拟器 的状态。 adb ( Android Debug Bridge)是一个通用命令行工具&#xff0c;其允许您与模拟器实例或连接的 Android 设备进行通信。它可为各种设备操作提供便利&#xff0c;如安装和调试…...

基于uniapp+WebSocket实现聊天对话、消息监听、消息推送、聊天室等功能,多端兼容

基于 ​UniApp + WebSocket​实现多端兼容的实时通讯系统,涵盖WebSocket连接建立、消息收发机制、多端兼容性配置、消息实时监听等功能,适配​微信小程序、H5、Android、iOS等终端 目录 技术选型分析WebSocket协议优势UniApp跨平台特性WebSocket 基础实现连接管理消息收发连接…...

【机器视觉】单目测距——运动结构恢复

ps&#xff1a;图是随便找的&#xff0c;为了凑个封面 前言 在前面对光流法进行进一步改进&#xff0c;希望将2D光流推广至3D场景流时&#xff0c;发现2D转3D过程中存在尺度歧义问题&#xff0c;需要补全摄像头拍摄图像中缺失的深度信息&#xff0c;否则解空间不收敛&#xf…...

关于uniapp展示PDF的解决方案

在 UniApp 的 H5 环境中使用 pdf-vue3 组件可以实现完整的 PDF 预览功能。以下是详细实现步骤和注意事项&#xff1a; 一、安装依赖 安装 pdf-vue3 和 PDF.js 核心库&#xff1a; npm install pdf-vue3 pdfjs-dist二、基本使用示例 <template><view class"con…...

MySQL:分区的基本使用

目录 一、什么是分区二、有什么作用三、分类四、创建分区五、删除分区 一、什么是分区 MySQL 分区&#xff08;Partitioning&#xff09;是一种将单张表的数据逻辑上拆分成多个物理部分的技术。这些物理部分&#xff08;分区&#xff09;可以独立存储、管理和优化&#xff0c;…...

手机平板能效生态设计指令EU 2023/1670标准解读

手机平板能效生态设计指令EU 2023/1670标准解读 以下是针对欧盟《手机和平板电脑生态设计法规》(EU) 2023/1670 的核心解读&#xff0c;综合法规核心要求、最新修正及企业合规要点&#xff1a; 一、法规背景与目标 生效与强制时间 发布于2023年8月31日&#xff08;OJ公报&…...

MySQL的pymysql操作

本章是MySQL的最后一章&#xff0c;MySQL到此完结&#xff0c;下一站Hadoop&#xff01;&#xff01;&#xff01; 这章很简单&#xff0c;完整代码在最后&#xff0c;详细讲解之前python课程里面也有&#xff0c;感兴趣的可以往前找一下 一、查询操作 我们需要打开pycharm …...

《Docker》架构

文章目录 架构模式单机架构应用数据分离架构应用服务器集群架构读写分离/主从分离架构冷热分离架构垂直分库架构微服务架构容器编排架构什么是容器&#xff0c;docker&#xff0c;镜像&#xff0c;k8s 架构模式 单机架构 单机架构其实就是应用服务器和单机服务器都部署在同一…...