当前位置: 首页 > article >正文

mini-job极简分布式延迟任务队列 — 基于 Redis,支持 Cron 周期任务、异步协程和多执行器

mini-job极简分布式延迟任务队列 — 基于 Redis支持 Cron 周期任务、异步协程和多执行器。特性特性说明延迟任务设定延迟秒数到期自动执行Cron 周期调度支持标准 cron 表达式分 时 日 月 星期三种执行器async协程IO 密集、thread线程通用、process进程CPU 密集队列级执行器隔离Redis Key{ns}:ready:{executor}三队列隔离消费者只拉取专属队列零竞争死信队列失败的一次性任务自动进入死信队列可排查重试可见性超时回收消费者崩溃后任务自动回收重入队列不丢任务命名空间隔离多环境共用同一 Redis 实例Key 前缀隔离监控指标内置QueueMetrics统计各生命周期计数背压控制队列深度超阈值自动告警Pydantic 配置集中配置管理环境变量覆盖类型校验Lua 原子操作抢占和回收均为 Redis 端原子执行无竞态优雅关闭SIGTERM/SIGINT 信号处理等待任务完成安装pipinstallmini-job# 核心依赖pipinstallmini-job[script]# 含 pandas/numpy脚本执行模式依赖Python 3.12Redis 7.4croniterpydantic-settingspython-dotenv快速开始1. 确保 Redis 运行redis-cliping# PONG2. 生产者 — 发布任务frommini_jobimportDelayQueue dqDelayQueue(namespacemyapp)# 注册脚本动态执行模式dq.register_script(send_email, def handler(payload): to_email payload.get(to) print(f发送邮件到: {to_email}) return {status: sent, to: to_email} ,)# 发布任务 — executor 参数指定执行器类型dq.publish(send_email,{to:userexample.com,subject:欢迎,content:注册成功},executorasync,# async / thread / process)# 延迟 30 秒执行dq.publish(send_email,{...},delay_seconds30)# 每天凌晨 2 点执行cron 表达式分 时 日 月 星期dq.publish(send_email,{...},cron0 2 * * *)# 查询任务结果resultdq.get_task_result(task_id)3. 消费者 — 按类型独立启动frommini_jobimportDelayQueue# 注册本地函数defsend_sms(payload):print(f发送短信 -{payload[phone]})TASK_REGISTRY{send_sms:send_sms,}dqDelayQueue(namespacemyapp)dq.start(task_registryTASK_REGISTRY,executor_typeasync,# 本进程只消费 async 任务)启动不同执行器类型的消费者3 个终端python consumer.py async# 协程消费者 — IO 密集任务python consumer.py thread# 线程消费者 — 通用任务python consumer.py process# 进程消费者 — CPU 密集任务核心概念执行器类型类型适用场景实现推荐并发数asyncIO 密集发邮件、HTTP 请求、DB 操作asyncio协程100~500thread通用任务、阻塞操作ThreadPoolExecutor30~100processCPU 密集数据处理、报表生成ProcessPoolExecutorCPU 核数任务路由表TASK_REGISTRY{# 简单格式默认 async 执行器send_sms:send_sms,# 带配置格式指定执行器类型daily_report:(daily_report,{executor:thread}),}状态生命周期pending → running → completed ↘ failed → 死信队列一次性任务 下次重试周期任务Redis Key 设计{namespace}:ready:{executor} — 按执行器隔离的就绪 ZSetasync/thread/process {namespace}:processing:{id} — 消费者专属处理列表 {namespace}:processing:timeout — 全局超时追踪 ZSet {namespace}:dead_letter — 死信队列 {namespace}:dead_letter:detail — 死信详情 {namespace}:task:meta — 任务元数据 {namespace}:task:result:{id} — 任务结果独立 TTL {namespace}:scripts — 注册脚本API 参考DelayQueuedqDelayQueue(namespacemyapp)# 或使用配置对象frommini_jobimportQueueConfig dqDelayQueue(QueueConfig(namespacemyapp))生产者方法方法说明publish(func, payload, delay_seconds0, cronNone, executorasync)发布任务 → 返回 task_idregister_script(name, content, languagepython, use[])注册动态脚本get_script(name)获取脚本信息delete_script(name)删除脚本list_scripts()列出所有脚本get_task_result(task_id)查询任务状态和结果消费者方法方法说明start(task_registry, executor_typeasync, **kwargs)启动消费者stop()手动触发优雅关闭start()参数参数默认值说明task_registry(必填)任务路由表{name: func}executor_typeasync执行器类型async / thread / processpoll_interval0.5轮询间隔秒grab_limit80每次最多抢占任务数worker_threads50工作线程/协程/进程数task_timeout30单个任务超时秒visibility_timeout60可见性超时秒配置通过 Pydantic Settings 管理支持.env文件、环境变量覆盖、类型校验。队列配置DQ_*参数环境变量默认值类型说明namespaceDQ_NAMESPACEdqstrRedis Key 命名空间前缀多环境隔离consumer_idDQ_CONSUMER_ID自动生成str消费者唯一标识默认worker- 8 位 hexresult_ttlDQ_RESULT_TTL86400int任务结果保留时间秒默认 1 天reclaim_intervalDQ_RECLAIM_INTERVAL10int超时回收检查间隔轮询周期数每 N 轮检查一次Redis 连接配置DQ_REDIS_*参数环境变量默认值类型说明hostDQ_REDIS_HOSTlocalhoststrRedis 主机地址portDQ_REDIS_PORT6379intRedis 端口dbDQ_REDIS_DB0intRedis 数据库编号passwordDQ_REDIS_PASSWORDNonestrRedis 密码可选max_connectionsDQ_REDIS_MAX_CONNECTIONS50int连接池最大连接数socket_timeoutDQ_REDIS_SOCKET_TIMEOUT5.0float单次操作超时秒socket_connect_timeoutDQ_REDIS_SOCKET_CONNECT_TIMEOUT5.0float连接建立超时秒retry_on_timeoutDQ_REDIS_RETRY_ON_TIMEOUTTruebool超时是否自动重试health_check_intervalDQ_REDIS_HEALTH_CHECK_INTERVAL30int连接健康检查间隔秒消费者配置DQ_CONSUMER_*参数环境变量默认值类型说明poll_intervalDQ_CONSUMER_POLL_INTERVAL0.5float轮询间隔秒影响任务延迟精度grab_limitDQ_CONSUMER_GRAB_LIMIT80int每次最多抢占任务数建议 worker × 1.5~2worker_threadsDQ_CONSUMER_WORKER_THREADS50int工作协程/线程/进程数task_timeoutDQ_CONSUMER_TASK_TIMEOUT30int单个任务执行超时秒超时后标记失败visibility_timeoutDQ_CONSUMER_VISIBILITY_TIMEOUT60int可见性超时秒消费者需在此时间内完成任务shutdown_timeoutDQ_CONSUMER_SHUTDOWN_TIMEOUT30int优雅关闭最大等待时间秒max_queue_depthDQ_CONSUMER_MAX_QUEUE_DEPTH10000int队列深度告警阈值超阈值打印 WARNING示例.env# 队列DQ_NAMESPACEproductionDQ_CONSUMER_IDweb-server-01# RedisDQ_REDIS_HOSTredis.example.comDQ_REDIS_PORT6379DQ_REDIS_PASSWORDsecret# 消费者DQ_CONSUMER_POLL_INTERVAL0.3DQ_CONSUMER_GRAB_LIMIT100DQ_CONSUMER_WORKER_THREADS80DQ_CONSUMER_TASK_TIMEOUT60DQ_CONSUMER_VISIBILITY_TIMEOUT120监控# 获取监控指标快照snapshotdq.metrics.snapshot()# {published: 1000, completed: 980, failed: 15, timeout: 5, ...}指标说明指标含义published已发布任务总数completed成功完成数failed执行失败数timeout超时任务数dead_lettered进入死信队列数reclaimed超时回收重入队数项目结构mini_job/ ├── __init__.py # 公共导出 ├── config.py # Pydantic Settings 配置 ├── core/ │ ├── delay_queue.py # DelayQueue 核心 │ └── task.py # 任务模型 ├── executor/ │ ├── base.py # 执行器抽象基类 │ ├── async_io.py # 协程执行器 │ ├── thread.py # 线程执行器 │ └── process.py # 进程执行器 ├── redis/ │ ├── client.py # Redis 连接 Lua 脚本 │ └── scripts.lua # 原子 Lua 脚本 ├── utils/ │ ├── retry.py # 重试装饰器 │ ├── metrics.py # 监控指标 │ └── decorators.py # 任务装饰器 ├── consumer.py # 消费者示例 └── producer.py # 生产者示例LicenseMIT

相关文章:

mini-job极简分布式延迟任务队列 — 基于 Redis,支持 Cron 周期任务、异步协程和多执行器

mini-job 极简分布式延迟任务队列 — 基于 Redis,支持 Cron 周期任务、异步协程和多执行器。 特性特性说明延迟任务设定延迟秒数,到期自动执行Cron 周期调度支持标准 cron 表达式(分 时 日 月 星期)三种执行器async 协程&#xff…...

内网IP如何申请SSL证书?

一、为什么需要内网IP证书? 很多企业有一个误区:认为“只有域名才能做HTTPS”,或者“内网用HTTP没关系”。现实恰恰相反: 合规硬指标:《数据安全法》等法规明确要求数据传输必须加密,内网明文传输在等保测…...

FastAPI + PostgreSL 实战:给应用装上“缓存”和“日志”翅膀

1. 哑铃图是什么? 哑铃图(Dumbbell Plot),有时也称为DNA图或杠铃图,是一种用于比较两个相关数据点的可视化图表。 它源于人们对更有效数据比较方式的持续探索。 在传统的时间序列比较中,我们通常使用两条折…...

PMC Organometallix宣布所有产品提价

鉴于市场环境发生重大变化,PMC Organometallix, Inc. 宣布,自2026年5月1日起(或根据合同条款允许的时间),全球所有产品线的价格将上调10%至25%。此次调整源于关键投入成本的持续压力,包括原材料成本上涨以及…...

网络安全渗透测试入门|无线安全渗透与防御完整教程

前言 这是给粉丝盆友们整理的网络安全渗透测试入门阶段无线安全渗透与防御教程 喜欢的朋友们,记得给我点赞支持和收藏一下,关注我,学习黑客技术。 1.Aircrack-ng简介 Aircrack- NG是一个完整的工具来评估Wi-Fi网络安全套件。 捕获&#x…...

告别Swagger默认丑界面!.NET Core 6项目集成Knife4jUI保姆级教程

.NET Core 6项目集成Knife4jUI:打造专业级API文档体验 在当今快节奏的开发环境中,API文档的质量直接影响着团队协作效率。许多.NET Core开发者虽然已经使用Swagger生成基础文档,却常常面临界面简陋、功能单一的问题。Knife4jUI作为Swagger UI…...

Qt项目拆分之术:如何用SUBDIRS把大工程拆成小模块(从app到lib的实战)

Qt项目模块化实战:用SUBDIRS构建可扩展工程架构 当你的Qt项目从几百行代码膨胀到数万行时,编译时间开始以分钟计算,团队协作频繁出现文件冲突,新成员面对庞杂的目录结构不知所措——这就是我们需要模块化拆分的临界点。上周我接手…...

5分钟搭建家庭电视直播系统:Kodi IPTV Simple完全指南

5分钟搭建家庭电视直播系统:Kodi IPTV Simple完全指南 【免费下载链接】pvr.iptvsimple IPTV Simple client for Kodi PVR 项目地址: https://gitcode.com/gh_mirrors/pv/pvr.iptvsimple 还在为电视直播体验烦恼吗?想用最简单的方式把网络直播源整…...

Python程序打包为EXE

PowerShell 用anaconda创建虚拟环境 conda -n create XXXconda initconda activate xxx进入要打包的文件夹中安装依赖pip install -r requirements.txt 打包pyinstaller -F -w main.py --clean --noconfirm...

软件产品负责人管理中的需求决策者

在软件开发领域,产品负责人(Product Owner)是决定产品成败的关键角色之一,而需求决策者则是这一角色的核心职能。他们不仅需要理解市场和用户需求,还要在资源有限的情况下,权衡优先级,确保团队交…...

【基于 macOS 虚拟机的 iMessage 批量消息处理技术实践】

一、研究背景与技术意义iMessage 作为苹果生态内置的原生通讯服务,依托系统底层优势,具备端到端加密、无运营商拦截、原生展示等特性,常用于企业内部事务提醒、授权用户服务告知等合规场景。在技术研究过程中,手动单条发送消息效率…...

从ArrayList到VectorSpecies:Java向量化开发全流程拆解,含GraalVM AOT+Linux perf火焰图调优实战

更多请点击: https://intelliparadigm.com 第一章:Java 25 向量 API 硬件加速概览 Java 25 正式将 jdk.incubator.vector 模块升级为标准 API(java.util.vector),标志着 JVM 首次原生支持跨平台向量化计算&#xff0c…...

Live Avatar数字人模型保姆级部署教程:4步搞定AI视频生成

Live Avatar数字人模型保姆级部署教程:4步搞定AI视频生成 1. 准备工作:硬件与软件环境检查 1.1 硬件要求详解 Live Avatar对硬件有明确要求,这是确保模型正常运行的基础: 显卡要求: 最低配置:单卡NVIDIA…...

如何提升域名价值——评估标准

关于Dynadot Dynadot是通过ICANN认证的域名注册商,自2002年成立以来,服务于全球108个国家和地区的客户,为数以万计的客户提供简洁,优惠,安全的域名注册以及管理服务。 Dynadot平台操作教程索引(包括域名邮…...

深度对比:瑞芯微RK3588边缘盒子 vs 其他方案,在智慧油站车牌识别场景下的真实表现

智慧油站车牌识别实战:RK3588边缘计算盒子的性能突围战 当加油站开始拥抱智能化转型,车牌识别系统便成了连接物理世界与数字服务的"第一道闸机"。在华北某连锁油站的改造案例中,技术团队曾面临这样的困境:传统工控机处理…...

告别zipfile!用PyZipper给你的Python压缩文件加上AES-256密码锁(附中文乱码解决方案)

用PyZipper实现AES-256加密压缩:Python开发者的安全实践指南 在数据交换和备份场景中,ZIP文件是最常见的归档格式之一。但标准库zipfile提供的加密功能实际上非常脆弱——它使用的传统加密算法早在1999年就被证明可以在极短时间内被暴力破解。当我们需要…...

DownKyi:你的B站视频下载管家,从新手到高手的完整指南 [特殊字符]

DownKyi:你的B站视频下载管家,从新手到高手的完整指南 🎬 【免费下载链接】downkyi 哔哩下载姬downkyi,哔哩哔哩网站视频下载工具,支持批量下载,支持8K、HDR、杜比视界,提供工具箱(音…...

Hypnos-i1-8B开发环境配置:VSCode远程连接与调试教程

Hypnos-i1-8B开发环境配置:VSCode远程连接与调试教程 1. 引言 如果你正在使用Hypnos-i1-8B这样的开源大模型,可能会遇到一个常见问题:本地电脑配置不够,而远程服务器上的开发体验又不够友好。今天我们就来解决这个痛点&#xff…...

DC‑1 靶机完整渗透思路 + 详细步骤(可直接复现)

核心思路:信息收集 → Drupal 远程代码执行 → 拿 Webshell → 数据库信息利用 → SUID 提权 → 拿 Root 与全部 Flag一、环境准备攻击机:Kali Linux(NAT 模式)靶机:DC‑1(VulnHub 下载,NAT 模式…...

从自动驾驶到机器人:双目视差生成点云在实际项目里怎么用?

从自动驾驶到机器人:双目视差生成点云在实际项目中的工程化实践 当机器人需要在未知环境中自主导航,或是自动驾驶汽车试图理解周围的三维空间时,双目视觉系统往往扮演着关键角色。不同于激光雷达的高成本,双目相机以相对经济的硬件…...

告别鬼影!用PyTorch复现动态场景HDR融合论文(附数据集构建与训练代码)

告别鬼影!用PyTorch复现动态场景HDR融合论文(附数据集构建与训练代码) 在计算机视觉领域,高动态范围(HDR)成像技术一直是研究热点。当面对动态场景时,传统HDR融合方法往往会产生令人头疼的"…...

2026年热门会议纪要神器实测对比转写整理全维度比拼,差距竟然这么大

做自媒体这几年,每天要转访谈录音、剪视频做字幕、整理线上分享内容,踩过不下10款转写工具的坑。这次特意把市面上主流的工具全部实测了一遍,负责任说:对比了多款工具,听脑AI是综合体验最好的,没有之一。 直…...

AWPortrait-Z问题解决:常见生成问题排查与优化技巧

AWPortrait-Z问题解决:常见生成问题排查与优化技巧 1. 常见生成问题诊断与修复 1.1 图像质量不理想的五大原因 当生成的人像效果不符合预期时,通常由以下原因导致: 提示词过于简单:仅使用"a beautiful woman"这类泛…...

如何3分钟搞定B站视频下载:DownKyi哔哩下载姬的终极免费方案

如何3分钟搞定B站视频下载:DownKyi哔哩下载姬的终极免费方案 【免费下载链接】downkyi 哔哩下载姬downkyi,哔哩哔哩网站视频下载工具,支持批量下载,支持8K、HDR、杜比视界,提供工具箱(音视频提取、去水印等…...

如何一键下载百度文库等30+文档平台?kill-doc脚本全攻略

如何一键下载百度文库等30文档平台?kill-doc脚本全攻略 【免费下载链接】kill-doc 看到经常有小伙伴们需要下载一些免费文档,但是相关网站浏览体验不好各种广告,各种登录验证,需要很多步骤才能下载文档,该脚本就是为了…...

Oumuamua-7b-RP实战教程:将自定义角色导出为JSON并在多端复用

Oumuamua-7b-RP实战教程:将自定义角色导出为JSON并在多端复用 1. 项目概述 Oumuamua-7b-RP 是一个基于Mistral-7B架构的日语角色扮演专用大语言模型Web界面,专为沉浸式角色对话体验设计。这个工具让用户能够创建、保存和复用自定义角色设定&#xff0c…...

如何从零打造你的第一把模块化机械键盘:HelloWord-Keyboard终极指南

如何从零打造你的第一把模块化机械键盘:HelloWord-Keyboard终极指南 【免费下载链接】HelloWord-Keyboard 项目地址: https://gitcode.com/gh_mirrors/he/HelloWord-Keyboard 你是否厌倦了千篇一律的机械键盘设计?是否曾幻想拥有一把能完全按照自…...

为什么禁止我请求别的网站的接口?——跨域与CORS _

你有没有遇到过这种情况:在自己的网页上想请求别人的API,结果浏览器直接报错:Access-Control-Allow-Origin header is missing。为什么浏览器要阻止你?服务器不响应不就完了吗? 今天,用小区门禁的故事&…...

记忆碎片化测试标准:软件测试领域的新兴挑战与应对框架

在数字化与信息爆炸的时代,人类的认知模式正经历着深刻的变革,记忆碎片化现象已从心理学概念渗透至日常工作和专业实践。对于软件测试从业者而言,这一现象不仅关乎个人效率,更对测试活动的严谨性、系统性和有效性构成了潜在威胁。…...

通过受管控的控制平面加速商品陈列优化

作者:来自 Elastic Alexander Marquardt, Honza Krl 及 Taylor Roy 搜索行为的变化不应该需要一个工程工单。了解受管控的控制平面如何让业务团队在数小时内更新搜索策略,而无需部署,也无需承担风险。 Elasticsearch 新手?参加我…...