当前位置: 首页 > article >正文

php方案 Beanstalkd

安装 composer require pda/pheanstalk monolog/monolog---项目结构 src/├── Queue/│ ├── QueueManager.php # 队列管理器 │ ├── JobDispatcher.php # 任务派发 │ └── WorkerPool.php # Worker 管理 ├── Jobs/│ ├── JobInterface.php # 任务接口 │ ├── EmailJob.php # 发邮件任务 │ ├── ImageResizeJob.php # 图片处理任务 │ └── ReportJob.php # 报表任务 └── Worker/└── Worker.php # Worker 主进程---安装 Beanstalkd#Ubuntu/Debianapt install beanstalkd#macOSbrew install beanstalkd # 启动 beanstalkd-l0.0.0.0-p11300-z1048576#-z 最大 job 大小1MB---1.任务接口?php// src/Jobs/JobInterface.phpnamespaceApp\Jobs;interface JobInterface{publicfunctionhandle():void;publicfunctionfailed(\Throwable $e):void;}---2.具体任务?php// src/Jobs/EmailJob.phpnamespaceApp\Jobs;classEmailJobimplements JobInterface{publicfunction__construct(publicreadonly string $to,publicreadonly string $subject,publicreadonly string $body,){}publicfunctionhandle():void{// 发送邮件逻辑mail($this-to,$this-subject,$this-body);echo邮件已发送至: {$this-to}\n;}publicfunctionfailed(\Throwable $e):void{echo邮件发送失败: {$e-getMessage()}\n;// 记录到数据库或告警}}?php// src/Jobs/ImageResizeJob.phpnamespaceApp\Jobs;classImageResizeJobimplements JobInterface{publicfunction__construct(publicreadonly string $sourcePath,publicreadonlyint$width,publicreadonlyint$height,publicreadonly string $outputPath,){}publicfunctionhandle():void{$imageimagecreatefromjpeg($this-sourcePath);$resizedimagescale($image,$this-width,$this-height);imagejpeg($resized,$this-outputPath,90);imagedestroy($image);imagedestroy($resized);echo图片已处理: {$this-outputPath}\n;}publicfunctionfailed(\Throwable $e):void{echo图片处理失败: {$e-getMessage()}\n;}}---3.队列管理器?php// src/Queue/QueueManager.phpnamespaceApp\Queue;use Pheanstalk\Pheanstalk;use Pheanstalk\Values\TubeName;use Pheanstalk\Values\Timeout;use App\Jobs\JobInterface;classQueueManager{privatePheanstalk $conn;publicfunction__construct(string $host127.0.0.1,int$port11300,){$this-connPheanstalk::create($host,$port);}// ── 派发任务 ────────────────────────────────────────────publicfunctiondispatch(JobInterface $job,string $tubedefault,int$priority1024,// 0最高, 4294967295最低int$delay0,// 秒延迟执行int$ttr60,// 任务超时时间秒):int{$payloadjson_encode([classget_class($job),payloadserialize($job),attempts0,]);$jobId$this-conn-useTube(newTubeName($tube))-put(data:$payload,priority:$priority,delay:$delay,ttr:$ttr,);echo任务已入队 [{$tube}] ID: {$jobId-getId()}\n;return$jobId-getId();}// ── 延迟任务 ────────────────────────────────────────────publicfunctiondispatchDelayed(JobInterface $job,int$delaySeconds,string $tubedefault):int{return$this-dispatch($job,$tube,delay:$delaySeconds);}// ── 获取队列统计 ────────────────────────────────────────publicfunctionstats(string $tubedefault):array{$stats$this-conn-statsTube(newTubeName($tube));return[ready$stats[current-jobs-ready],delayed$stats[current-jobs-delayed],buried$stats[current-jobs-buried],reserved$stats[current-jobs-reserved],total$stats[total-jobs],];}// ── 清空管道 ────────────────────────────────────────────publicfunctionflush(string $tubedefault):int{$count0;$this-conn-watch(newTubeName($tube));while($job$this-conn-peekReady(newTubeName($tube))){$this-conn-delete($job);$count;}return$count;}publicfunctiongetConnection():Pheanstalk{return$this-conn;}}---4.Worker 主进程?php// src/Worker/Worker.phpnamespaceApp\Worker;use Pheanstalk\Pheanstalk;use Pheanstalk\Values\TubeName;use Pheanstalk\Values\Timeout;use Monolog\Logger;use Monolog\Handler\StreamHandler;classWorker{privatePheanstalk $conn;privateLogger $log;privatebool$runningtrue;privateint$maxRetries3;publicfunction__construct(privatearray $tubes[default],string $host127.0.0.1,int$port11300,){$this-connPheanstalk::create($host,$port);$this-lognewLogger(worker);$this-log-pushHandler(newStreamHandler(php://stdout));$this-log-pushHandler(newStreamHandler(/var/log/worker.log));}publicfunctionrun():void{// 监听多个管道foreach($this-tubes as $tube){$this-conn-watch(newTubeName($tube));}$this-conn-ignore(newTubeName(default));// 只处理指定管道// 捕获信号优雅停机if(function_exists(pcntl_signal)){pcntl_signal(SIGTERM,fn()$this-runningfalse);pcntl_signal(SIGINT,fn()$this-runningfalse);}$this-log-info(Worker 启动,[tubes$this-tubes]);while($this-running){if(function_exists(pcntl_signal_dispatch)){pcntl_signal_dispatch();}// 阻塞等待任务超时 5 秒重试$job$this-conn-reserveWithTimeout(5);if($jobnull){continue;}$this-processJob($job);}$this-log-info(Worker 已停止);}privatefunctionprocessJob(\Pheanstalk\Contract\JobIdInterface $job):void{$raw$this-conn-peek($job)-getData();$datajson_decode($raw,true);$this-log-info(处理任务,[id$job-getId(),class$data[class]??unknown,]);try{/** var \App\Jobs\JobInterface $jobInstance */$jobInstanceunserialize($data[payload]);$jobInstance-handle();// 成功 → 删除任务$this-conn-delete($job);$this-log-info(任务完成,[id$job-getId()]);}catch(\Throwable $e){$attempts($data[attempts]??0)1;$this-log-error(任务失败,[id$job-getId(),attempts$attempts,error$e-getMessage(),]);if($attempts$this-maxRetries){// 超过重试次数 → bury人工介入$this-conn-bury($job);$this-log-warning(任务已 bury,[id$job-getId()]);try{$jobInstanceunserialize($data[payload]);$jobInstance-failed($e);}catch(\Throwable){}}else{// 指数退避重新入队30s、60s、120s$delay30*(2**($attempts-1));$data[attempts]$attempts;$this-conn-delete($job);$this-conn-useTube(newTubeName(default))-put(data:json_encode($data),priority:1024,delay:$delay,ttr:60,);$this-log-info(任务重试中{$delay}s 后,[id$job-getId()]);}}}}---5.多管道优先级派发示例?php// dispatch.phprequirevendor/autoload.php;use App\Queue\QueueManager;use App\Jobs\EmailJob;use App\Jobs\ImageResizeJob;use App\Jobs\ReportJob;$queuenewQueueManager(127.0.0.1,11300);// 普通邮件默认优先级$queue-dispatch(newEmailJob(aliceexample.com,欢迎注册,感谢您的注册),tube:emails,);// 高优先级验证码邮件priority0 最高$queue-dispatch(newEmailJob(bobexample.com,验证码123456,您的验证码),tube:emails,priority:0,ttr:30,);// 图片处理延迟 5 秒$queue-dispatch(newImageResizeJob(/uploads/photo.jpg,800,600,/uploads/photo_800.jpg),tube:images,delay:5,ttr:120,);// 报表任务凌晨执行延迟到明天 0 点$secondsUntilMidnightstrtotime(tomorrow)-time();$queue-dispatchDelayed(newReportJob(daily,date(Y-m-d)),$secondsUntilMidnight,tube:reports,);// 查看队列状态print_r($queue-stats(emails));---6.启动多个专用 Worker?php// worker.php命令行入口requirevendor/autoload.php;use App\Worker\Worker;$tube$argv[1]??default;$workernewWorker(tubes:[$tube]);$worker-run();# 启动多个 Worker 进程处理不同管道 php worker.php emailsphp worker.php emails# 并发处理邮件 php worker.php imagesphp worker.php reports# 或用 Supervisor 管理---7.Supervisor 配置生产环境;/etc/supervisor/conf.d/workers.conf[program:worker-emails]commandphp/var/www/worker.php emails numprocs3autostarttrueautorestarttrueredirect_stderrtruestdout_logfile/var/log/worker-emails.log[program:worker-images]commandphp/var/www/worker.php images numprocs2autostarttrueautorestarttruestdout_logfile/var/log/worker-images.log[program:worker-reports]commandphp/var/www/worker.php reports numprocs1autostarttrueautorestarttruestdout_logfile/var/log/worker-reports.log supervisorctl rereadsupervisorctl update supervisorctl status---8.Buried 任务管理监控面板?php// src/Queue/BuriedManager.phpuse Pheanstalk\Pheanstalk;use Pheanstalk\Values\TubeName;classBuriedManager{publicfunction__construct(privatePheanstalk $conn){}// 查看 buried 任务publicfunctioninspect(string $tube):?array{$job$this-conn-peekBuried(newTubeName($tube));if(!$job)returnnull;return[id$job-getId(),datajson_decode($job-getData(),true),];}// 重新执行所有 buried 任务publicfunctionkickAll(string $tube,int$count100):int{return$this-conn-kick($count);}// 删除所有 buried 任务publicfunctiondeleteAllBuried(string $tube):int{$count0;while($job$this-conn-peekBuried(newTubeName($tube))){$this-conn-delete($job);$count;}return$count;}}---核心概念速查 tube → 队列管道按业务类型分 ready → 待处理 reserved → 处理中 delayed → 延迟等待 buried → 失败存档人工介入 ttr → 任务执行超时时间 priority →0最高优先级 kick → 将 buried 任务重新变为 ready

相关文章:

php方案 Beanstalkd

安装 composer require pda/pheanstalk monolog/monolog ---项目结构src/├──…...

批量PDF合并工具使用说明:批量合并与直接合并两种模式,拖拽排序/页面范围/遍历子目录/重名自动处理

【批量PDF合并工具】用于把多个 PDF 合并成一个 PDF,提供两种常用模式:批量合并:选择文件夹,让工具按规则自动收集并合并 PDF直接合并:把 PDF 拖到列表里,手动调整顺序后合并(更可控&#xff09…...

FreeRTOS 线程本地存储(TLS)实战指南:从原理到应用

1. 什么是FreeRTOS线程本地存储(TLS)? 想象一下你在办公室里工作,每个同事都有自己的抽屉存放私人物品。FreeRTOS的线程本地存储(Thread Local Storage,简称TLS)就是为每个任务(线程…...

VescUart库详解:嵌入式VESC UART通信协议与实时控制实践

1. VescUart库深度解析:面向嵌入式工程师的VESC UART通信全栈指南 1.1 库定位与工程价值 VescUart是一个专为嵌入式平台设计的轻量级UART通信库,核心目标是实现对VESC( Vedder Electronic Speed Controller)电调设备的可靠、低延…...

把 CTS 权限边界讲透,SAP 传输体系里的角色设计、授权对象与最小权限落地

很多团队在做 CTS 安全治理时,真正出问题的地方并不在 STMS 能不能打开,也不在 SE09 能不能看到请求,而是在权限边界画得太粗。开发、运维、项目负责人、Basis 管理员,本来承担的工作就不一样,结果大家都被塞进一套大而全的角色里,最后形成一种很典型的局面,开发能看不该…...

车辆三自由度运动学模型; Carsim_Simulink联合仿真; 无人驾驶车辆模型预测控制(2.1);

车辆三自由度运动学模型; Carsim/Simulink联合仿真; 无人驾驶车辆模型预测控制(2.1); 包括Carsim的设置、控制信号数据、PPT文件、cpar件、车辆运动分析图(适用于word两栏布局);Simu…...

华为OD机试真题 新系统2026-04-08 JavaGo 实现【直捣黄龙】

目录 题目 思路 Code 题目 小王在玩一款叫做直捣黄龙的小游戏,在该游戏中他需要从入口位置进入敌营,绕过哨兵的层层封锁,达到敌军司令部实施斩首行动。 敌军阵营是一个n*n的矩阵,入口在坐标(0,n/2),敌军司令部在坐标(n-1,n/2),每个哨兵警戒以自己为中心的9宫格,一旦被…...

Linux 进程控制(上):创建、终止、等待与程序替换

一. 进程控制概述进程是操作系统中的任务载体,而进程控制则是对其生命周期进行管理的完整机制在之前的博文中,我们已经窥探了进程的属性和地址空间,但进程并不会静止在那里。一个完善的操作系统必须能够解决以下问题:如何高效地克…...

An Introduction to RAID in Linux

1. Overview RAID stands for Redundant Array of Inexpensive/Independent Disks. We build our storage with redundancy — duplication of critical functions — so that no one part can fail and bring down our whole system. Because the data reads and writes are…...

数据结构-双向链表-基础

#include <iostream> #include <stdio.h> #include<stdlib.h>//双向链表存储结构 typedef int ElemType; typedef struct node {ElemType data;struct node* prev, * next; }Node;//初始化 Node* initList() {Node* head (Node*)malloc(sizeof(Node));head-…...

SCM 第二例|三大模型推理性能深度对比:InternLM 效率最高,Qwen 并发增益最强

SCM 第二例|三大模型推理性能深度对比:InternLM 效率最高,Qwen 并发增益最强 引言:从单模型验证到多模型对决 一个月前,我用自研的 叠合一致法(SCM) 完成了首例验证——在 Qwen2.5-7B 上,成功标定出并发增益函数和长度增益系数,实现了 0% 偏差的自洽检验。 但那篇文…...

为什么你的Function Calling在Qwen-3和Claude-4上表现差3倍?2026奇点大会现场压测对比结果首次公开

第一章&#xff1a;2026奇点智能技术大会&#xff1a;大模型FunctionCalling 2026奇点智能技术大会(https://ml-summit.org) Function Calling 已成为大模型与外部系统深度协同的核心范式&#xff0c;2026奇点智能技术大会将其列为关键议题&#xff0c;聚焦于语义理解精度、工…...

RelayModule:嵌入式继电器面向对象驱动库

1. RelayModule 库深度解析&#xff1a;面向嵌入式系统的数字继电器模块面向对象驱动设计继电器是嵌入式系统中实现强电控制与弱电隔离的核心执行器件&#xff0c;广泛应用于工业自动化、智能家居、电源管理及测试设备等场景。传统继电器驱动多采用裸机 GPIO 直接控制&#xff…...

《为什么只有镜像视界能做三维空间智能体?》——空间智能时代的技术门槛与体系壁垒解析

《为什么只有镜像视界能做三维空间智能体&#xff1f;》——空间智能时代的技术门槛与体系壁垒解析发布单位&#xff1a;镜像视界&#xff08;浙江&#xff09;科技有限公司一、引言&#xff1a;这是“能力问题”&#xff0c;不是“努力问题”在当前AI行业中&#xff0c;一个常…...

WiFiPixels:ESP32上轻量级Wi-Fi控制NeoPixel的固件框架

1. 项目概述WiFiPixels 是一个面向嵌入式 LED 控制场景的轻量级网络化固件框架&#xff0c;其核心设计目标是将 NeoPixel&#xff08;WS2812B 类型&#xff09;LED 阵列通过 Wi-Fi 接口暴露为可远程寻址、实时更新的像素资源。项目名称 “NeoPixel Wifi WifiPixels” 并非营销…...

编程基础(python)

由于我们的目标是学习人工智能&#xff0c;我们不需要特别精通这个编程。但掌握一些python必要的语法是十分必要的。我们没有必要只盯着语法&#xff0c;得将重点放在 数据处理 和 逻辑思维 上。毕竟&#xff0c;AI 的底层全是 矩陈运算和数据流转。我们得学会用代码把数学公式…...

从钓鱼邮件到Web后门:一次完整的攻击链流量分析复盘(基于BUUCTF案例)

从钓鱼邮件到Web后门&#xff1a;一次完整的攻击链流量分析实战 当企业内网突然出现异常流量时&#xff0c;安全团队往往需要像侦探一样从海量数据包中拼凑出攻击者的完整行动轨迹。这次我们以BUUCTF案例为蓝本&#xff0c;还原一个真实攻击场景&#xff1a;攻击者如何通过邮件…...

Alive2 如何对包含循环的 LLVM 优化进行有界验证

文本解读有界翻译验证&#xff1a;将循环展开指定次数&#xff08;例如 2 次&#xff09;&#xff0c;只检查在这些展开次数内可能触发的错误。如果错误需要更多迭代才能暴露&#xff0c;则可能漏报。这是一种工程权衡。循环分析&#xff1a;使用 Tarjan-Havlak 算法识别循环及…...

Galaxy平台在生物信息学工作流构建中的实战指南

1. Galaxy平台入门&#xff1a;零代码玩转生物信息学 第一次接触生物信息学分析的人&#xff0c;往往会被命令行和编程门槛劝退。我刚开始做基因组数据分析时&#xff0c;光是安装软件依赖就折腾了一周。直到发现了Galaxy这个神器——它把复杂的生信工具封装成可视化模块&#…...

使用OpenClaw的Skills对接本地系统勇

1. 流图&#xff1a;数据的河流 如果把传统的堆叠面积图想象成一块块整齐堆叠的积木&#xff0c;那么流图就像一条蜿蜒流淌的河流&#xff0c;河道的宽窄变化自然流畅&#xff0c;波峰波谷过渡平滑。 它特别适合展示多个类别数据随时间的变化趋势&#xff0c;尤其是当你想强调整…...

Spring IOC 源码学习 声明式事务的入口点氖

springboot自动配置 自动配置了大量组件&#xff0c;配置信息可以在application.properties文件中修改。 当添加了特定的Starter POM后&#xff0c;springboot会根据类路径上的jar包来自动配置bean&#xff08;比如&#xff1a;springboot发现类路径上的MyBatis相关类&#xff…...

Go Command 工作组成立:这几个用了十年的命令可能要被废!

大家好&#xff0c;我是Tony Bai。在这个技术浪潮汹涌的时代&#xff0c;Go 语言以其惊人的稳定性和向后兼容性著称。但稳定&#xff0c;并不代表停滞。就在最近&#xff0c;Go 核心团队内部悄然发生了一件大事&#xff1a;他们正式成立了一个全新的 “Go Command 工作组&#…...

从数据采集到回放验证:ADTF 适配 ROS 的 ADAS 测试实践俳

一、简化查询 1. 先看一下查询的例子 /// /// 账户获取服务 /// /// /// public class AccountGetService(AccountTable table, IShadowBuilder builder) {private readonly SqlSource _source new(builder.DataSource);private readonly IParamQuery _accountQuery build…...

避开这些坑,你的Multisim音频放大电路仿真才能一次成功

避开这些坑&#xff0c;你的Multisim音频放大电路仿真才能一次成功 在电子电路设计领域&#xff0c;音频放大电路仿真是许多工程师和爱好者的必经之路。然而&#xff0c;即使是最简单的三级放大电路&#xff0c;在Multisim仿真环境中也常常会遇到各种意想不到的问题。本文将聚焦…...

聊一聊 C# 中的闭包陷阱:foreach 循环的坑你还记得吗?藏

. GIF文件结构 相比于 WAV 文件的简单粗暴&#xff0c;GIF 的结构要精密得多&#xff0c;因为它天生是为了网络传输而设计的&#xff08;包含了压缩机制&#xff09;。 当我们用二进制视角观察 GIF 时&#xff0c;它是由一个个 数据块&#xff08;Block&#xff09; 组成的&…...

Android USB 驱动程序安装指南:从下载到调试的全流程解析

1. 为什么需要安装Android USB驱动程序&#xff1f; 当你第一次把Android手机通过USB线连接到电脑时&#xff0c;可能会遇到设备无法识别的情况。这时候系统通常会提示"驱动程序未安装"&#xff0c;导致你无法传输文件或者进行开发调试。我刚开始接触Android开发时就…...

Windows网络修复器

链接&#xff1a;https://pan.quark.cn/s/644d56bcec08Windows网络修复器是一款能够帮助用户恢复网络的工具&#xff0c;能够清理DNS本地缓存&#xff0c;并且能够帮助用户修复网络连接&#xff0c;让你能够更好的使用网络&#xff0c;有需要的用户不要错过了欢迎下载使用&…...

深度解析AI Agent的工具调用机制:注册发现、动态选择与执行链路设计

深度解析AI Agent的工具调用机制:注册发现、动态选择与执行链路设计 关键词 AI Agent, 工具调用, 注册发现, 动态选择, 执行链路, LLM, 函数调用 摘要 随着大型语言模型(LLM)的快速发展,AI Agent作为一种能够自主完成复杂任务的智能体正日益受到关注。本文将深度解析AI A…...

跨模态检索技术全景:从核心方法到前沿应用与挑战

1. 跨模态检索技术演进脉络 跨模态检索技术的发展可以追溯到早期的统计学习方法。最初的研究主要依赖**典型相关分析&#xff08;CCA&#xff09;**这类线性方法&#xff0c;通过寻找不同模态数据之间的线性关系来实现对齐。比如在2000年代初&#xff0c;研究者们用CCA处理文本…...

AI教育全面碾压传统教培:现状、挑战与转型路径

随着人工智能技术的爆发式发展&#xff0c;教育行业正经历一场前所未有的变革。AI教育培训正以惊人的速度重塑传统教育模式&#xff0c;从个性化学习到智能评估&#xff0c;从虚拟教师到自适应课程&#xff0c;AI正在全方位"碾压"传统教育培训体系。一、AI教育培训对…...