当前位置: 首页 > article >正文

BullMQ:AI系统缺失的队列层

你是否曾经花了大量时间只是在等待等待 API 调用完成。看着请求超时。盯着加载中的转圈动画。在某个时刻你会意识到问题不在于代码而在于架构。我们不能只是调用一个慢的东西然后期望一切顺利。这就是人们发明任务队列的原因。1、什么是任务队列这是一种朴素的方法——在完美世界中可行的方法用户上传文档 → 服务器调用 AI API → 服务器等待 → 服务器响应用户很简单对吧直到 AI API 需要 30 秒。或者你的 HTTP 网关在 25 秒时超时。或者第三方服务在第 50 次请求后对你限流。或者三个用户同时上传你同时冲击同一个端点。同步方法假设一切都是快速和可靠的。没有什么是快速和可靠的。任务队列通过解耦请求和实际工作来解决这个问题。与其立即执行工作你将一个便签放入队列上面写着嘿有人需要做这件事。一个独立的进程——一个Worker——拾取这个便签并在自己的时间以可控的速度完成实际工作。把它想象成邮局。你不需要在柜台等待邮递员亲自把包裹送到目的地。你交出包裹拿到一个追踪号然后回家。投递在后台进行。你可以稍后查看追踪号以确认是否到达。那个追踪号就是你的任务 ID。2、如果你来自 RabbitMQ 或 Kafka 的快速说明在这个入门教程中我们将使用 BullMQ。如果你之前用过 RabbitMQ、Apache Kafka甚至 Amazon Simple Queue ServiceBullMQ 可能会感觉太简单了。那是因为 BullMQ 并不试图成为通用的消息代理或事件流平台。它是一个任务队列一个用来表达请稍后做这项工作的工具。使用 RabbitMQ 时你可能会想到交换机、路由键、发布者、消费者以及多个服务订阅同一条消息。使用 Kafka 时你可能会想到主题、分区、偏移量和可重放的事件流。BullMQ 并不做这些事情。没有交换机。没有分区。没有扇出式发布/订阅。一个任务通常有一个生产者、一个队列和一个最终处理它的 Worker。这是有意为之的。对于本文所讨论的问题——后台任务例如文档分块生成嵌入向量发送电子邮件调整图片大小调用慢速 AI API——BullMQ 通常更容易理解也更快上手。特别是如果你的技术栈已经使用了 Node.js 和 Redis。如果 RabbitMQ 是一个邮件分拣中心Kafka 是整个货运铁路网络那么 BullMQ 就是熟食店柜台上的取号机取一个号码排队等候最终轮到你时有人会叫你。权衡之处在于 BullMQ 的范围更窄。如果你以后需要多个独立的服务消费同一个事件、高级路由规则或重放历史事件那通常就是你升级到 RabbitMQ 或 Kafka 的时候。3、 BullMQ的三个主要角色让我们先建立正确的思维模型。系统中有三个角色生产者Producer你的应用中创建任务的部分。通常是你的 API 处理程序或控制器。当用户触发一些可能很耗资源的操作时你不会立即执行工作而是将其入队。队列Queue等待区域。它是一个数据结构在 BullMQ 中由 Redis 支持用于存储任务并跟踪它们的状态。任务在这里等待直到有 Worker 准备好。Worker实际执行工作的进程。它监视队列逐个或批量拾取任务运行你的处理逻辑并将任务标记为完成或失败。生产者和 Worker 不需要知道彼此。它们只是都与队列通信。这就是全部的诀窍。4、前后对比队列实际改变了什么让我用一个真实的例子来说明——将文档导入 RAG 系统。这意味着从 PDF 中提取文本、将其分成块、评估质量、生成嵌入向量并将所有内容写入向量数据库。没有队列时这在你的 HTTP 请求处理程序中同步运行这是消息队列系统在我们的例子中任务队列能极大改善用户体验的典型案例。使用队列后HTTP 处理程序只做一件事入队。然后立即返回如下面的时序图所示差异不仅仅是外观上的。有了队列API 保持快速无论后台工作需要多长时间系统可以处理 100 个并发上传而不会有 100 个被阻塞的线程嵌入 API 的速率限制由 Worker 遵守而不是由 100 个竞争请求管理如果嵌入 API 宕机任务会在队列中排队并自动重试。不会丢失任何工作。5、代码示例下面是一些代码示例如果你想尝试使用 BullMQ 创建任务队列。5.1 设置 RedisBullMQ 将所有内容存储在 Redis 中任务负载、状态、重试计数所有东西。Redis 很快能在重启后存活而且 BullMQ 专门围绕其数据结构构建。在本地运行 Redis 最快的方式是 Dockerdocker run -d -p 6379:6379 redis:7就这样。Redis 现在运行在localhost:6379。本地开发无需额外配置。5.2 安装 BullMQ如果你已经在使用 NodeJSBullMQ 非常适合。我们需要安装这两个库bullmq队列库和ioredis它底层使用的 Redis 客户端。npm install bullmq ioredis5.3 你的第一个队列和生产者假设我们正在构建一个文档处理功能。当用户上传文件时我们想要启动一个后台任务而不是阻塞 HTTP 响应。首先设置一个共享的 Redis 连接并创建一个队列import { Queue } from bullmq; import IORedis from ioredis; // create a connection to Redis const connection new IORedis({ maxRetriesPerRequest: null }); // create a queue with the name document-processing const document_queue new Queue(document-processing, { connection });现在在我们的 API 处理程序中通常我们会同步执行工作的地方我们只需入队// create a new job and // enqueue it in the document-processing queue const job await document_queue.add( process-document, { documentId: doc_abc123, userId: user_xyz, fileUrl: https://storage.example.com/uploads/abc123.pdf, } ); console.log(Job enqueued with ID: ${job.id}); // return job.id to the client so they can poll for status laterqueue.add()接受一个任务名称和一个负载。负载是我们的 Worker 需要的任何数据。该方法返回一个带有 ID 的任务对象请保存好它。5.4 我们的第一个 Worker现在我们需要一些东西来实际拾取这些任务。下面是一个基本的 Workerimport { Worker } from bullmq; import IORedis from ioredis; // create a connection to Redis const connection new IORedis({ maxRetriesPerRequest: null }); // create the worker for // the document-processing queue we have created previously const worker new Worker( document-processing, // - the name of the queue async (job) { console.log(Processing job ${job.id}); console.log(Document ID: ${job.data.documentId}); // This is where your actual processing logic goes // e.g., download file, call AI API, save results. // Before using job queue, an API handler would just // immediately call this function. await process_document(job.data); return { status: done, processedAt: new Date().toISOString() }; }, { connection } ); // event listener 1 worker.on(completed, (job) { console.log(Job ${job.id} finished.); }); // event listener 2 worker.on(failed, (job, err) { console.error(Job ${job?.id} failed:, err.message); });Worker 通过名称连接到同一个队列。BullMQ 处理所有的轮询——你的处理函数只在有任务需要处理时运行。你从处理器中return的任何内容都会被保存为任务的结果数据。6、任务状态入队后会发生什么每个任务都会经历一个生命周期。了解这一点会让调试容易得多。还有waiting-children用于任务依赖和paused状态但这些是高级主题。对于上面的例子waiting → active → completed/failed是核心循环。6.1 重试和退避因为事情会失败外部 API 会失败。网络会出问题。你的 AI 提供商在凌晨 3 点返回 503 错误。我们希望任务能在不需要人工干预的情况下自动重试。我们可以在入队时配置重试。前面的示例代码可以这样写const job await document_queue.add( process-document, { documentId: doc_abc123, userId: user_xyz, fileUrl: https://storage.example.com/uploads/abc123.pdf, }, // retry configuration { attempts: 5, backoff: { type: exponential, delay: 2000, // start with 2 seconds }, } );指数退避意味着等待 2 秒然后 4 秒然后 8 秒然后 16 秒……每次重试大约等待上一次的两倍时间。这很重要因为如果一个服务正在挣扎中每秒都去冲击它只会让情况更糟。给它恢复的空间。使用attempts: 5BullMQ 在第一次失败后最多会再重试四次之后才会最终将任务标记为failed。7、限流这是我最初在 RAG 系统中使用队列的原因之一。假设我们正在调用一个允许每分钟 100 次请求的外部 API。在并发用户足够多的情况下我们会超出限制并开始到处收到429错误。BullMQ 有一个内置的限流器。我们在 Worker 上配置它// adjusting the previous worker by adding a limiter configuration const worker new Worker( document-processing, ... { connection, // adding limiter limiter: { max: 100, // max jobs processed duration: 60000 // per 60 seconds (in ms) }, } );现在 BullMQ 自动将处理速度限制为每分钟 100 个任务无论队列中有多少任务或有多少 Worker 实例在运行。任务只是停留在delayed状态直到有处理能力。没有429错误不需要手动限流逻辑。仅这一点就值得采用队列系统。8、 检查任务状态轮询还记得我们从queue.add()获得的那个任务 ID 吗以下是我们如何使用它。这就是我们的前端在显示处理中…状态时轮询的内容import { Queue } from bullmq; const document_queue new Queue(document-processing, { connection }); // check job status async function getJobStatus(job_id: string) { const job await document_queue.getJob(job_id); if (!job) { return { status: not_found }; } const state await job.getState(); const result job.returnvalue; // only populated when completed return { id: job.id, state, // waiting | active | completed | failed | delayed result, failedReason: job.failedReason, }; }我们的 API 端点暴露了这个功能我们的前端每隔几秒轮询一次我们在不保持开放连接的情况下显示进度。干净简洁。9、常见问题解答为什么选 BullMQ 而不是 RabbitMQ这个问题经常被问到。两者都是合法的队列系统但它们解决的是略有不同的问题。RabbitMQ 是一个完整的消息代理一个独立的服务有自己的协议AMQP、自己的概念交换机、绑定、路由键和自己的运维开销。它功能强大且久经考验特别适用于需要根据复杂规则在许多不同消费者之间路由消息的微服务。BullMQ 是一个运行在 Redis 之上的任务队列库而 Redis 我们可能已经有了。它专注于一个用例将任务入队、可靠地处理它们、跟踪它们的状态。简而言之BullMQ 最适合后台任务和任务队列。RabbitMQ 最适合事件流和微服务消息传递。何时切换到 RabbitMQ 或 Kafka在某些场景下你确实会想要迁移到 RabbitMQ 或 Kafka。切换到 RabbitMQ 的时机你需要扇出——一个事件需要同时触发多个独立的消费者例如文档已上传应该同时触发导入流水线、通知服务和分析事件你正在构建一个微服务网格其中许多服务需要通过消息以复杂的路由规则进行通信你需要协议级别的互操作性——你的一些服务不是 Node.js需要使用 AMQP 协议切换到 Kafka 的时机你需要一个事件日志——你想要重放过去的事件而不仅仅是处理一次你正在处理极高的吞吐量——想想每秒数百万事件社交媒体信息流、IoT 遥测、点击流你需要流处理——实时聚合、转换或连接事件流你的保留要求意味着你需要将事件保存数天或数周以便下游消费者赶上10、结束语我们涵盖了以下内容为什么同步 API 调用在现实条件下会崩溃生产者/队列/Worker的思维模型一个可以处理重试、限流和任务状态跟踪的 BullMQ 工作设置以及最后什么时候坚持使用 BullMQ什么时候该使用更重的工具。BullMQ 确实足以处理大多数用例的后台处理。从你的 API 处理程序中投入任务让 Worker 以可控的速度处理它们然后从客户端轮询状态。后续方向Webhook 替代轮询——不再每隔几秒轮询一次检查状态我们可以让 Worker 在任务完成时调用一个 webhook URL。更高效更少噪音。优先级队列——有些任务比其他任务更紧急。BullMQ 支持queue.add()上的priority字段。BullMQ 仪表盘——bull-board是一个用于监控队列、检查失败任务和手动重试的仪表盘 UI。在生产环境中非常有用。任务流——BullMQ 有父/子任务的概念父任务会等待所有子任务完成。适用于多步骤流水线。如果你是消息队列系统的新手队列模式可能需要一些思维上的开销来设置。但一旦它在那里添加新的后台任务类型就是小事一桩。而且你将为未来更复杂的消息队列系统做好准备。添加队列是那些能快速获得回报的架构决策之一。一如既往持续学习快乐编码原文链接BullMQAI系统缺失的队列层 - 汇智网

相关文章:

BullMQ:AI系统缺失的队列层

你是否曾经花了大量时间只是在等待?等待 API 调用完成。看着请求超时。盯着加载中的转圈动画。在某个时刻你会意识到:问题不在于代码,而在于架构。我们不能只是调用一个慢的东西然后期望一切顺利。这就是人们发明任务队列的原因。 1、什么是…...

收藏!2026年大模型红利爆发|程序员+小白必看,阿里跳槽案例+薪资表

昨天和一位杭州的老友闲聊,意外得知她成功跳槽至阿里,任职产品设计岗,谈妥的年包直接给到35万,如今已经顺利入职一个月,彻底站稳了脚跟。 熟悉她的人都清楚,这份收获绝非偶然——在此之前,她在二…...

JSON 小传:从 JavaScript 捡来的“数据网红”

先花一秒钟点个关注,今天的内容保证让你有收获。 JSON: JavaScript Object Notation(JavaScript 对象表示法) JSON 是存储和交换文本信息的语法,类似 XML。 JSON 比 XML 更小、更快,更易解析。 JSON 易于人阅读和编写。 C、Python、C、J…...

DESIGN.md:一个正在重塑AI开发美学的纯文本文件

DESIGN.md:一个正在重塑AI开发美学的纯文本文件 如果用一句话概括2026年AI开发圈最让人兴奋的变化,那一定是:AI终于开始“懂设计”了。 几个月前,你让Cursor或Claude Code帮你生成一个落地页,功能都没问题,…...

生物科研工作者的终极图标库:Bioicons 如何彻底改变你的科学绘图体验

生物科研工作者的终极图标库:Bioicons 如何彻底改变你的科学绘图体验 【免费下载链接】bioicons A library of free open source icons for science illustrations in biology and chemistry 项目地址: https://gitcode.com/gh_mirrors/bi/bioicons 作为一名…...

Java+AI<AI的使用与Java的基础学习4>

今天通过学习了解了隐式转换和强制转换隐式转换也叫自动类型提升。就是把一个取值范围小的数据或者变量,赋值给另一个取值范围大的变量。此时不需要我们额外写代码单独实现,是程序自动帮我们完成的。有两个需要记忆的规则规则一:如有byte sho…...

2026五一出行运动扭伤,五种常用止痛药怎么选?

五一假期户外活动增多,爬山、打球、跑步时脚踝扭伤或肌肉拉伤并不少见。很多人第一时间想到吃止痛药,但市面上的选择众多:布洛芬、对乙酰氨基酚、塞来昔布、双氯芬酸口服缓释片、双氯芬酸外用凝胶,到底哪个更适合急性扭伤&#xf…...

【踩坑】你以为在过人机验证,实际上正亲手把木马装进电脑 | ClickFix攻击

转载请注明出处:小锋学长生活大爆炸[xfxuezhagn.cn] 如果本文帮助到了你,欢迎[点赞、收藏、关注]哦~学长今天在日常逛软件分享网站时候,跳出来一个谷歌人机验证,一开始没在意,但跟选图片的验证不一样的是,这…...

操作系统(四)

一、调度算法的评价标准 1.cpu利用率: cpu利用率cpu忙碌时间 / 总时间 2.系统吞吐量: 单位时间内完成的作业数量; 系统吞吐量完成的作业数量/总时间3.周转时间: 从作业被提交给系统开始,到作业完成为止的整个时间周期…...

从C++老手到Python新手:用你熟悉的CLion无缝切换,配置Python开发环境保姆级教程

从C老手到Python新手:用CLion无缝切换的Python开发环境配置指南 作为一名长期使用CLion进行C开发的程序员,当你决定探索Python世界时,最明智的选择不是抛弃熟悉的工具,而是让CLion成为你学习新语言的跳板。JetBrains系列IDE的强大…...

26.单调栈

三种双层循环 排列 可以出现(0,1),(1,0) 包含自己的组合 严格组合 739. 每日温度 暴力解法 单调栈解法 思路 将原来的数组中找比自己的温度,放到了栈中。单调性,用的…...

【C++入门】命名空间、缺省参数、函数重载

这里我就不过多的进行描述了,有兴趣的可以去网络搜索一番。总而言之,从名称上面我们也可以看得出来,C是在C的基础上进行不断地优化发展。事实上确实是这样,C语言中90%以上的语法在C中都适用。同时我们还要知道C作为众多…...

3分钟终极指南:用KMS智能激活脚本永久激活Windows和Office

3分钟终极指南:用KMS智能激活脚本永久激活Windows和Office 【免费下载链接】KMS_VL_ALL_AIO Smart Activation Script 项目地址: https://gitcode.com/gh_mirrors/km/KMS_VL_ALL_AIO 还在为Windows系统激活弹窗而烦恼吗?或者Office突然变成只读模…...

必知必会:奖励模型训练与PPO稳定训练方法详解

必知必会:奖励模型训练与PPO稳定训练方法详解 AI-Compass 致力于构建最全面、最实用、最前沿的AI技术学习和实践生态,通过六大核心模块的系统化组织,为不同层次的学习者和开发者提供从完整学习路径。 github地址:AI-Compass👈:https://github.com/tingaicompass/AI-Com…...

必知必会:大模型对齐数据构造与PPO算法详解

必知必会:大模型对齐数据构造与PPO算法详解 AI-Compass 致力于构建最全面、最实用、最前沿的AI技术学习和实践生态,通过六大核心模块的系统化组织,为不同层次的学习者和开发者提供完整学习路径。 github地址:AI-Compass👈:https://github.com/tingaicompass/AI-Compass…...

ToastFish:如何在工作间隙悄无声息地提升英语词汇量?

ToastFish:如何在工作间隙悄无声息地提升英语词汇量? 【免费下载链接】ToastFish 一个利用摸鱼时间背单词的软件。 项目地址: https://gitcode.com/GitHub_Trending/to/ToastFish 你是否曾经想过,那些在等待会议开始、代码编译或文件下…...

不止画板子:用嘉立创EDA专业版搞定面板打印,从设计到下单全流程解析

不止画板子:用嘉立创EDA专业版搞定面板打印,从设计到下单全流程解析 当硬件产品经理或工业设计师完成PCB设计后,如何为产品打造专业的外观面板?嘉立创EDA专业版的面板打印功能,让您无需切换软件就能实现从电路设计到外…...

基于Foundation Models框架的AI应用开发实战指南

1. 项目概述:一个面向基础模型应用开发的实战框架最近在GitHub上看到一个挺有意思的项目,叫rudrankriyam/Foundation-Models-Framework-Example。光看名字,可能有点抽象,但如果你正在尝试将像GPT、Claude、Llama这类大语言模型&am…...

树莓派PICO的板载LED还能这么玩?用MicroPython做个呼吸灯和SOS求救信号

树莓派PICO的创意灯光秀:从呼吸灯到SOS信号的MicroPython实战 第一次看到树莓派PICO板载的那颗蓝色LED时,你可能觉得它只是个简单的状态指示灯。但在这个小小的发光二极管背后,隐藏着无限的可能性。今天,我们就来解锁这颗LED的创意…...

基于MCP协议构建AI趋势分析工具:连接Google Trends与智能助手

1. 项目概述:一个连接趋势数据与AI的桥梁如果你正在构建一个需要实时洞察市场动态、追踪社交媒体热点或分析行业趋势的AI应用,那么你很可能面临一个核心痛点:如何让AI模型(比如ChatGPT、Claude等)直接、可靠地获取到这…...

MCP 2026医疗数据跨境传输新规生效在即:三甲医院已启动紧急审计,你还在用传统API网关?

更多请点击: https://intelliparadigm.com 第一章:MCP 2026医疗数据跨境传输新规的核心要义与合规边界 监管框架的结构性跃迁 MCP 2026(Medical Cross-border Protocol 2026)并非对既有《个人信息出境标准合同办法》的简单修订&…...

OpenClaw时空之锚——从离散指令到硅基时空连续体的本体论坍缩(第二十二篇)

OpenClaw时空之锚——从离散指令到硅基时空连续体的本体论坍缩(第二十二篇)导言:当龙虾挣脱离散的钟摆,时间便有了肉体在4月26日实时传输协议赋予Agent“感觉运动通路”后,4月29日的更新以一种近乎暴烈的方式&#xff…...

Provision CLI:将AI工作流转化为可复用技能,破解团队知识孤岛

1. 项目概述:从零散经验到可复用的AI技能在AI工具深度融入日常工作的今天,一个普遍且令人头疼的现象是:团队里总有人能摸索出一套高效的工作流,比如用Claude Code快速生成特定业务场景的代码,或者用Cursor精准地重构某…...

LILYGO 7.5英寸电子墨水屏与ESP32开发实战指南

1. 项目概述:LILYGO 7.5英寸电子墨水屏与ESP32开发板组合方案作为一名长期关注嵌入式显示技术的开发者,最近LILYGO推出的7.5英寸电子墨水屏(E-Paper)引起了我的注意。这款售价52美元的大尺寸显示屏完美适配该品牌多款T5系列ESP32开…...

SOCD Cleaner终极指南:5分钟解决游戏按键冲突的免费方案

SOCD Cleaner终极指南:5分钟解决游戏按键冲突的免费方案 【免费下载链接】socd Key remapper for epic gamers 项目地址: https://gitcode.com/gh_mirrors/so/socd 在竞技游戏的巅峰对决中,毫秒级的操作延迟可能决定胜负。当玩家同时按下W和S键时…...

GEM框架下的强化学习环境设计与多智能体交互实践

1. 为什么需要GEM框架下的强化学习环境在强化学习领域,环境模拟一直是个头疼的问题。我刚开始做多智能体研究时,最痛苦的就是每个项目都要从头搭建测试环境。不同论文的环境接口五花八门,有的用OpenAI Gym标准,有的自定义协议&…...

Ex-Omni框架:用自然语言生成3D面部动画的实战指南

1. 项目背景与核心价值去年在参与一个虚拟数字人项目时,我们团队曾为如何让AI生成的面部动画更自然真实而头疼。传统方案要么依赖复杂的动作捕捉设备,要么需要美术师逐帧调整,成本高且效率低下。直到接触到Ex-Omni这个开源框架,才…...

多模态AI技术助力听障沟通:HI-TransPA系统解析

1. 项目背景与核心价值作为一名长期关注无障碍技术发展的从业者,我见证了太多听障人士在语音沟通场景中面临的困境。传统的手语翻译服务存在人力成本高、响应延迟大等问题,而市面上大多数语音转文字工具又难以处理复杂的环境音和方言口音。这就是我们团队…...

从账单明细看 Taotoken 按 token 计费如何帮助项目厘清成本

从账单明细看 Taotoken 按 token 计费如何帮助项目厘清成本 1. 账单明细的核心价值 在项目管理中,资源消耗的透明化是成本控制的基础。Taotoken 提供的账单明细功能将每个 API Key 的调用记录按模型分类统计,精确到 token 粒度的计费方式让团队能够追溯…...

qapyq:AI模型训练数据集的图像管理与标注工作站实战指南

1. 项目概述:一个为AI模型训练而生的图像管理与标注工作站 如果你正在为Stable Diffusion、LoRA或者任何生成式AI模型准备训练数据集,那你一定体会过那种在成千上万张图片和文本标签之间反复横跳的痛苦。传统的看图软件和文本编辑器在这种高强度、高精度…...