记一次hyperf框架封装swoole自定义进程
背景
公司准备引入swoole和rabbitmq来处理公司业务。因此,我引入hyperf框架,想用swoole的多进程来实现。
自定义启动服务封装
<?php
/*** 进程启动服务【manager】*/
declare(strict_types=1);namespace App\Command;use Swoole;
use Swoole\Process;
use Swoole\Process\Pool;
use App\Process\BaseProcess;
use Hyperf\Command\Command as HyperfCommand;
use Psr\Container\ContainerInterface;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputOption;/*** @Command*/
#[Command]
class TaskProcessCommand extends HyperfCommand
{const MANAGER_PROCESS_PID_PATH = BASE_PATH . '/runtime/taskProcess.pid';/*** @var ContainerInterface*/protected $container;protected $coroutine = false;public function __construct(ContainerInterface $container){$this->container = $container;parent::__construct('task');}public function configure(){parent::configure();$this->setDescription('自定义进程任务');$this->addOption('daemonize', 'd', InputOption::VALUE_NONE, '守护进程化');$this->addArgument('action', InputArgument::REQUIRED, 'start/stop/restart 启动/关闭/重启');}public function handle(){$action = $this->input->getArgument('action');if ($action === 'start') {$this->start();} elseif ($action === 'stop') {$this->stop();} elseif ($action === 'restart') {$this->restart();} else {echo "不支持的action, 请输入 -h 参数查看" . PHP_EOL;}}/*** 重启:php bin/hyperf.php task restart*/protected function restart(){$this->stop();$this->start();}/*** 停止:php bin/hyperf.php task stop*/protected function stop(){if (file_exists(self::MANAGER_PROCESS_PID_PATH)) {//后期可以写入数据表,根据状态进行重启$managerPid = file_get_contents(self::MANAGER_PROCESS_PID_PATH);echo "stopping...\n";echo "kill pid $managerPid \n";$managerPid = intval($managerPid);$startTime = time();$timeout = config('server.settings.max_wait_time', 10);@Process::kill($managerPid);//等待主进程结束while (@Process::kill($managerPid, 0)) {//waiting process stopecho "waiting...\r";usleep(100000);echo " \r";echo "waiting.\r";usleep(100000);echo " \r";//超时 强杀所有子进程if ($managerPid > 0 && time() - $startTime >= $timeout) {echo "wait timeout, kill -9 child process, pid: $managerPid \n";echo shell_exec("ps -ef|awk '$3~/^{$managerPid}$/'") . PHP_EOL;echo shell_exec("ps -ef|awk '$3~/^{$managerPid}$/ {print $2}'|xargs kill -9") . PHP_EOL;}}unlink(self::MANAGER_PROCESS_PID_PATH);echo "stopped. \n";} else {echo "找不到manager pid, path: " . self::MANAGER_PROCESS_PID_PATH;}}/*** 启动:php bin/hyperf.php task start* 守护进程启动:php bin/hyperf.php task start -d*/protected function start(){$processConfig = config('processes');if ($processConfig) {echo "start now.\n";$daemonize = $this->input->getOption('daemonize');if ($daemonize) {//重定向标准输出到指定日志文件fclose(STDOUT);fclose(STDERR);$STDOUT = fopen(BASE_PATH . '/runtime/logs/taskProcess_output.log', 'ab');$STDERR = fopen(BASE_PATH . '/runtime/logs/taskProcess_error.log', 'ab');Process::daemon(true, true);}//save pidfile_put_contents(self::MANAGER_PROCESS_PID_PATH, getmypid());//TODO 后期可以根据需要写入配置或者数据表,开启多个主进程、挂载多个子进程BaseProcess::setProcessName('manager');//主进程$startFuncMap = [];foreach ($processConfig as $processClass) {$processObj = new $processClass;if ($processObj->isEnable && ($processObj instanceof BaseProcess) && isset($processObj->nums) && $processObj->nums > 0) {for ($i = 0; $i < $processObj->nums; $i++) {$startFuncMap[] = [[$processObj, 'handle'],$processObj->enableCoroutine ?? false,$i,];}}}$pool = new Pool(count($startFuncMap), SWOOLE_IPC_UNIXSOCK, 0, false);$pool->on('workerStart', function (Pool $pool, int $workerId) use ($startFuncMap) {[$func, $enableCoroutine, $idx] = $startFuncMap[$workerId];if ($enableCoroutine) {run(function () use ($func, $pool, $workerId, $idx) {$pm = $func[0];//process下的类$idx += 1;BaseProcess::setProcessName($pm->name . "[{$idx}/{$pm->nums}]");//多个子进程call_user_func($func, $pool, $workerId);});} else {$func($pool, $workerId);//baseProcess下的handle}});$pool->on('Message', function (Swoole\Process\Pool $pool, string $data) {echo 'process Message,data=' .json_encode($data). PHP_EOL;});//进程关闭$pool->on("WorkerStop", function (Swoole\Process\Pool $pool, int $workerId) {echo "process WorkerId={$workerId} is stopped". PHP_EOL;});$pool->start();} else {printf("没有可启动的自定义进程, 请在配置task_process中声明,且继承%s\n", BaseProcess::class);}}/*** 查看运行状态:php bin/hyperf.php task status*/protected function status(){//TODO 查看任务执行状态}public function getProcess($pid = -1){if ($pid === -1) {$pid = getmypid();}return static::$process[$pid] ?? null;}public function getAllProcess(){return static::$process;}
}
基础process封装
此处可以用hyperf框架自带的,也可以自己封装
<?phpdeclare (strict_types = 1);namespace App\Process;use Swoole;
use Swoole\Process\Pool;abstract class BaseProcess {/*** 进程数* @var integer*/public $nums = 0;/*** 进程名称* @var string*/public $name = '';/*** 是否启用协程* @var bool*/public $enableCoroutine = true;/*** 是否随进程启动服务* @var bool*/public $isEnable = true;protected $isRunning = true;protected $process;static $signal = 0;function __construct() {//进程自动命名if (empty($this->name)) {$this->name = trim(str_replace('\\', '.', str_replace(__NAMESPACE__, '', get_called_class())), '.');}}final public function handle(Pool $pool, int $workerId): void {try {$this->processInit($pool->getProcess());$this->beforeRun();while (true) {//进程结束信号if (BaseProcess::$signal === SIGTERM) {$this->onProcessExit();break;}$this->run();}} catch (\Throwable $e) {throw $e;}}protected function onProcessExit() {$this->isRunning = false;}protected function processInit($process) {$this->process = $process;echo "process {$this->name} start, pid: " . getmypid().PHP_EOL;//注册信号处理器,实现优雅重启(等待任务执行完后或者等待超时)pcntl_signal(SIGTERM, function () {BaseProcess::$signal = SIGTERM;$maxWaitTime = config('server.settings.max_wait_time', 5);$sTime = time();//检查进程任务状态Swoole\Timer::tick(500, function () use ($sTime, $maxWaitTime) {$coStat = \Swoole\Coroutine::stats();//如果主循环结束,且其它协程任务执行完,清理定时器以退出进程if (!$this->isRunning && $coStat['coroutine_num'] <= 1) {Swoole\Timer::clearAll();$this->process->exit();}//等待超时,强制结束进程elseif (time() - $sTime >= $maxWaitTime) {Swoole\Timer::clearAll();if ($this->isRunning) {$this->onProcessExit();}$this->process->exit();}});});}public static function setProcessName(string $name) {swoole_set_process_name(env('APP_NAME', 'app') . '.taskProcess.' . $name);}/*** 事件循环前调用* @return [type] [description]*/abstract function beforeRun();/*** 事件循环,注意这里不能使用死循环* @return [type] [description]*/abstract function run();}
使用demo
demo1
<?phpdeclare (strict_types = 1);namespace App\Process;/*** test*/
class TestProcess extends BaseProcess {/*** 进程数* @var integer*/public $nums = 5;public $enableCoroutine = true;/*** 不随服务启动进程* @var bool */public $isEnable = false;public function beforeRun() {//事件循环前执行,比如一些初始化工作}public function run() {//事件循环主体echo date('Y-m-d H:i:s').PHP_EOL;usleep(1000);}}
demo2
<?phpnamespace App\Process;use App\Amqp\Producer\JbtyProducer;
use App\Amqp\Producer\UpdateZeroStockProducer;
use App\Library\Jbchip\JbchipRequest;
use App\Model\HqchipGoodsModel;
use App\Model\IcbaseGoodsModel;
use App\Model\JbtyGoodsModel;
use App\Model\LcscGoodsModel;
use App\Model\OneyacGoodsModel;
use Hyperf\Amqp\Producer;
use Hyperf\Redis\Redis;
use Hyperf\Utils\ApplicationContext;class UpdateZeroStock extends BaseProcess
{const ZERO_STOCK_KEY = 'platform_zero_stock_cache_key';/*** 进程数* @var integer*/public $nums = 1;public $enableCoroutine = true;/*** 随服务启动进程* @var bool */public $isEnable=true;public function beforeRun() {//事件循环前执行,比如一些初始化工作}public function run() {//事件循环主体$this->updateZeroStock();echo date('Y-m-d H:i:s').PHP_EOL;sleep(300);}public function updateZeroStock(){// 1.全量更新$list_hq = HqchipGoodsModel::select(['id','spu','stock','manufacturer'])->where('excute_time','<',8)->limit(1000)->get();$container = ApplicationContext::getContainer();$redis = $container->get(Redis::class);$producer = ApplicationContext::getContainer()->get(Producer::class);$today = date('Y-m-d');if($list_hq){foreach ($list_hq as $item){$spu = trim($item['spu']);$zeroStockKey = $this->getZeroStockKey($today,'hqchip',$item['manufacturer']);if($redis->exists($zeroStockKey) && !$redis->hGet($zeroStockKey,$spu)){$sendData = $item;$sendData['appKey'] = $this->appSecretKey();$sendData['platform'] = 'hqchip';$message = new UpdateZeroStockProducer($sendData);$res = $producer->produce($message);echo date('Y-m-d H:i:s') . 'rabbitmq hqchip sendMq: ' .$res . PHP_EOL;}}}}/*** 零库存缓存KEY* @param $brand* @param $sku* @return string*/private function getZeroStockKey($day,$platfrom,$brand){return self::ZERO_STOCK_KEY .":". $platfrom .":" . $day . ":" . $brand;}/*** 密钥生产* @return string*/private function appSecretKey(){$a = 'chipmall-spider&V2&' . date('Y-m-d');$appKey = base64_encode(md5($a) .'||'. base64_encode(time() . '|' . $a));return $appKey;}
}
在配置中进程需要执行的服务

以守护进程方式启动服务
php bin/hyperf.php task start -d

查看进程命令
ps -ef|grep taskProcess

疑惑
这次封装还存在两个点需要完善!!!
1.重复执行:
php bin/hyperf.php task start -d
会启动多个manager进程
2、没有封装查看进程状态的status方法
相关文章:
记一次hyperf框架封装swoole自定义进程
背景 公司准备引入swoole和rabbitmq来处理公司业务。因此,我引入hyperf框架,想用swoole的多进程来实现。 自定义启动服务封装 <?php /*** 进程启动服务【manager】*/ declare(strict_types1);namespace App\Command;use Swoole; use Swoole\Proce…...
多输入多输出 | MATLAB实现GA-BP遗传算法优化BP神经网络多输入多输出
多输入多输出 | MATLAB实现GA-BP遗传算法优化BP神经网络多输入多输出 目录 多输入多输出 | MATLAB实现GA-BP遗传算法优化BP神经网络多输入多输出预测效果基本介绍程序设计往期精彩参考资料 预测效果 基本介绍 多输入多输出 | MATLAB实现GA-BP遗传算法优化BP神经网络多输入多输出…...
李宏毅机器学习笔记-transformer
transformer是什么呢?是一个seq2seq的model。具体应用如上图所示,输入和输出的序列长度不固定,由model自己决定。 语音翻译指的是,直接输入一段语音信号,例如英文,输出的直接是翻译之后的中文。 seq2seq如…...
基于Java的酒店管理系统
博主主页:猫头鹰源码 博主简介:Java领域优质创作者、CSDN博客专家、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战 主要内容:毕业设计(Javaweb项目|小程序等)、简历模板、学习资料、面试题库、技术咨询 文末联系获取 项目介绍…...
Go语言的单元测试与基准测试详解
文章目录 单元测试基准测试 单元测试 以一个加法函数为例,对其进行单元测试。 首先编写add.go文件: //add.go package mainfunc add(a, b int) int {return a b }其次编写add_test.go文件,在go语言中,测试文件均已_test结尾&a…...
【多态】为什么析构函数的名称统一处理为destructor?
析构函数的名称统一处理为destructor的目的是为了解决析构函数的重写。 而这又引出了一个问题:为什么要进行析构函数的重写? 是为了下面这种情况: class Person { public:~Person() { cout << "~Person" << endl; } }…...
6.4 Case Studies - A Simple Logging Archive Class
下面这段内容介绍了一个示例,目的是帮助澄清"归档概念(Archive Concept)"的用法,以便用户可以实现自己的归档类。simple_log_archive.hpp 实现了一个简单但实用的归档类,用于将任何可序列化类型以可读的格式…...
【深度学习实验】前馈神经网络(九):整合训练、评估、预测过程(Runner)
目录 一、实验介绍 二、实验环境 1. 配置虚拟环境 2. 库版本介绍 三、实验内容 0. 导入必要的工具包 1. __init__(初始化) 2. train(训练) 3. evaluate(评估) 4. predict(预测) 5. save_model 6. load_model 7. 代码整合 一、实验介绍 二、实验环境 本系列实验使用…...
002-第一代硬件系统架构确立及产品选型
第一代硬件系统架构确立及产品选型 文章目录 第一代硬件系统架构确立及产品选型项目介绍摘要硬件架构硬件结构选型及设计单片机选型上位机选型扯点别的 关键字: Qt、 Qml、 信号采集机、 数据处理、 上位机 项目介绍 欢迎来到我们的 QML & C 项目ÿ…...
Go基础语法:指针和make和new
8 指针、make、new 8.1 指针(pointer) Go 语言中没有指针操作,只需要记住两个符号即可: & 取内存地址* 根据地址取值 package mainimport "fmt"func main() {a : 18// 获取 a 的地址值并复制给 pp : &a// …...
039_小驰私房菜_Camera perfermance debug
全网最具价值的Android Camera开发学习系列资料~ 作者:8年Android Camera开发,从Camera app一直做到Hal和驱动~ 欢迎订阅,相信能扩展你的知识面,提升个人能力~ 一、抓取trace 1. adb shell "echo vendor.debug.trace.perf=1 >> /system/build.prop" 2. …...
Caché for Windows安装及配置
本文介绍在Windows上安装Cach的操作步骤。本文假设用户熟悉Windows目录结构、实用程序和命令。本文包含如下主要部分: 1)Cach安装...
代码随想录算法训练营20期|第四十六天|动态规划part08|● 139.单词拆分 ● 关于多重背包,你该了解这些! ● 背包问题总结篇!
139.单词拆分 感觉这个板块要重新刷,完全没有印象 class Solution {public boolean wordBreak(String s, List<String> wordDict) {Set<String> set new HashSet<>(wordDict);boolean[] dp new boolean[s.length() 1];dp[0] true;for (int i…...
系统安装(一)CentOS 7 本地安装
CentOS与Ubuntu并称为Linux最著名的两个发行版,但由于笔者主要从事深度学习图像算法工作,Ubuntu作为谷歌和多数依赖库的亲儿子占据着最高生态位。但最近接手的一个项目里,甲方指定需要在CentOS7上运行项目代码,笔者被迫小小cos了一…...
obsidian使用指南
插入代码块快捷键设置 插入代码块 用英文搜索快捷键名字 英文搜索的【Insert code block】对应的是 (6个点) 中文搜索的【代码块】对应的是 (2个点) 查看word、excel等非md文件设置 电脑端obsidian->设置->文件与链接->检测所有类型文件->…...
【ardunio】青少年机器人四级实操代码(2023年9月)
目录 一、题目 二、示意图 三、流程图 四、硬件连接 1、舵机 2、超声波 3、LED灯 五、程序 一、题目 实操考题(共1题,共100分) 1. 主题: 迎宾机器人 器件:Atmega328P主控板1块,舵机1个,超声波传感器1个&…...
MYSQL的存储过程
存储过程 存储过程是事先经过编译并存储在数据库中的一段 SQL 语句的集合,调用存储过程可以简化应用开发人员的很多工作,减少数据在数据库和应用服务器之间的传输,对于提高数据处理的效率是有好处的。存储过程思想上很简单,就是…...
[kubernetes/docker] failed to resolve reference ...:latest: not found
问题描述: pod一直pending, kubectl describe pod ... 显示: Warning Failed 9s (x3 over 63s) kubelet Failed to pull image "mathemagics/my-kube-scheduler": rpc error: code NotFound desc failed to pull and unpack image "docker…...
彻底解决win11系统0x80070032
经过各种尝试,终于找到原因。第一个是电脑加密软件,第二个是需要的部分功能没有开启,第三个BIOS设置。个人觉得第三个不重要。 解决方法 笔记本型号 笔记本型号是Thinkpad T14 gen2。进入BIOS的按键是按住Enter键。 1、关闭山丽防水墙服务…...
解决因为修改SELINUX配置文件出错导致Faild to load SELinux poilcy无法进入CentOS7系统的问题
一、问题 最近学习Kubernetes,需要设置永久关闭SELINUX,结果修改错了一个SELINUX配置参数,关机重新启动后导致无法进入CentOS7系统,卡在启动进度条界面。 二、解决 多次重启后,在启动日志中发现 Faild to load SELinux poilcy…...
TDengine 快速体验(Docker 镜像方式)
简介 TDengine 可以通过安装包、Docker 镜像 及云服务快速体验 TDengine 的功能,本节首先介绍如何通过 Docker 快速体验 TDengine,然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉 Docker,请使用 安装包的方式快…...
UDP(Echoserver)
网络命令 Ping 命令 检测网络是否连通 使用方法: ping -c 次数 网址ping -c 3 www.baidu.comnetstat 命令 netstat 是一个用来查看网络状态的重要工具. 语法:netstat [选项] 功能:查看网络状态 常用选项: n 拒绝显示别名&#…...
linux arm系统烧录
1、打开瑞芯微程序 2、按住linux arm 的 recover按键 插入电源 3、当瑞芯微检测到有设备 4、松开recover按键 5、选择升级固件 6、点击固件选择本地刷机的linux arm 镜像 7、点击升级 (忘了有没有这步了 估计有) 刷机程序 和 镜像 就不提供了。要刷的时…...
Cinnamon修改面板小工具图标
Cinnamon开始菜单-CSDN博客 设置模块都是做好的,比GNOME简单得多! 在 applet.js 里增加 const Settings imports.ui.settings;this.settings new Settings.AppletSettings(this, HTYMenusonichy, instance_id); this.settings.bind(menu-icon, menu…...
CMake控制VS2022项目文件分组
我们可以通过 CMake 控制源文件的组织结构,使它们在 VS 解决方案资源管理器中以“组”(Filter)的形式进行分类展示。 🎯 目标 通过 CMake 脚本将 .cpp、.h 等源文件分组显示在 Visual Studio 2022 的解决方案资源管理器中。 ✅ 支持的方法汇总(共4种) 方法描述是否推荐…...
音视频——I2S 协议详解
I2S 协议详解 I2S (Inter-IC Sound) 协议是一种串行总线协议,专门用于在数字音频设备之间传输数字音频数据。它由飞利浦(Philips)公司开发,以其简单、高效和广泛的兼容性而闻名。 1. 信号线 I2S 协议通常使用三根或四根信号线&a…...
RSS 2025|从说明书学习复杂机器人操作任务:NUS邵林团队提出全新机器人装配技能学习框架Manual2Skill
视觉语言模型(Vision-Language Models, VLMs),为真实环境中的机器人操作任务提供了极具潜力的解决方案。 尽管 VLMs 取得了显著进展,机器人仍难以胜任复杂的长时程任务(如家具装配),主要受限于人…...
MySQL JOIN 表过多的优化思路
当 MySQL 查询涉及大量表 JOIN 时,性能会显著下降。以下是优化思路和简易实现方法: 一、核心优化思路 减少 JOIN 数量 数据冗余:添加必要的冗余字段(如订单表直接存储用户名)合并表:将频繁关联的小表合并成…...
接口自动化测试:HttpRunner基础
相关文档 HttpRunner V3.x中文文档 HttpRunner 用户指南 使用HttpRunner 3.x实现接口自动化测试 HttpRunner介绍 HttpRunner 是一个开源的 API 测试工具,支持 HTTP(S)/HTTP2/WebSocket/RPC 等网络协议,涵盖接口测试、性能测试、数字体验监测等测试类型…...
【学习笔记】erase 删除顺序迭代器后迭代器失效的解决方案
目录 使用 erase 返回值继续迭代使用索引进行遍历 我们知道类似 vector 的顺序迭代器被删除后,迭代器会失效,因为顺序迭代器在内存中是连续存储的,元素删除后,后续元素会前移。 但一些场景中,我们又需要在执行删除操作…...
