Celery(分布式任务队列)入门学习笔记
Celery 的简单介绍
用 Celery 官方的介绍:它是一个分布式任务队列; 简单,灵活,可靠的处理大量消息的分布式系统; 它专注于实时处理,并支持任务调度。
Celery 如果使用 RabbitMQ 作为消息系统的话,整个应用体系就是下面这张图

Celery 官方给出的 Hello World, 对于未接触它的人来说根本就不知道是什么
| 1 2 3 4 5 6 7 | from celery import Celery app = Celery('hello', broker='amqp://guest@localhost//') @app.task def hello(): return 'hello world' |
还是有必要按住上面那张图看 Celery 的组成部分
- Celery 自身实现的部分其实是 Producer 和 Consumer. Producer 创建任务,并发送消息到消息队列,我们称这个队列为 Broker。Consumer 从 Broker 中接收消息,完成计算任务,把结果存到 Backend
- Broker 就是那个消息队列,可选择的实现有 RabbitMQ, Redis, Amazon SQS
- 结果存储(Backend), 可选择 AMQP(像 RabbitMQ 就是它的一个实现), Redis, Memcached, Cassandra, Elasticsearch, MongoDB, CouchDB, DynamoDB, Amazon S3, File system 等等,看来它的定制性很强
- 消息和结果的存储还涉及到一个序列化的问题,可选择 pickle(Python 专用), json, yaml, msgpack. 消息可用 zlib, bzip2 进行压缩, 或加密存储
- Worker 的并发可采用 prefork(多进程), thread(多线程), Eventlet, gevent, solo(单线程)]
Celery 应用的基础选型
Celery 的 Broker 和 Backend 有非常多的选择组合,RabbitMQ 和 Redis 都是即可作为 Broker 又能用作 Backend。但 Celery 的推荐是用 RabbitMQ 作为 Broker, 小的结果这里选择用 Redis 作为 Backend, 所以这里的选型是
- Broker: RabbitMQ
- Backend: Redis
- 序列化:JSON -- 方便在学习中查到消息中的数据
准备 Redis
安装 Python 包
在需要运行 Producer 和 Consumer(worker) 的机器上创建一个 Python 虚拟环境,然后安装下面的包
$ pip install celery redis
实践中只需要安装 celery redis 就能运行后面的例子,没有安装 librabbitmq, "celery[librabbitmq]" 也行,安装了这两个库能使用更高效的 librabbitmq C 库。如果安装了 librabbitmq 库,broker='amqp://...' 默认使用 librabbitmq, 找不到 librabbitmq 的话就用 broker='pyamqp://...'
$ pip install librabbitmq
$ pip install "celery[librabbitmq]"
注:中括号中的是安装 Celery 提供的 bundle, 它定义在 setup.py 的 setup 函数中的 extras_require。
Celery 应用实战
我们不用 Celery 的 Hello World 实例,那不能帮助我们理解背后发生了什么。创建一个 tasks.py 文件
| 1 2 3 4 5 6 7 8 9 10 11 | from celery import Celery app = Celery('celery-demo', broker='amqp://celery:your-password@192.168.86.181:5672/', backend='redis://192.168.86.181:6379') @app.task def add(x, y): return x + y |
这里配置连接到 brocker 的 / vhost, 如果连接到别的 vhost, 如 celery 的话, url 写成 amqp://celery:your-passoword@192.168.86.181:5672/celery. backend 的 redis 如果要配置密码, 和 db 的话,写成 redis://:password@192.168.86.181:6379/2
暂且不在该脚本中直接执行 add.delay(15, 30), 而是放到 Python 控制台下方便测试
现在进到 Python 控制台
| 1 2 3 4 5 6 | >>> from tasks import add >>> task = add.delay(15, 30) >>> task.id 'c3552fa2-502a-450b-933b-19a1da65ba33' >>> task.status 'PENDING' |
由于 Worker 还没有启动,所以得到一个 task_id, 状态是 PENDING。趁这时候看看 Celery 目前做了什么,来查看到 RabbitMQ
| 7 | celery direct |
Celery 在 RabbitMQ 中创建了的资源有
- 一个 Exchange: celery direct
- 两个 binding: 送到
默认(空字符串)或celeryexchange 的, routing-key 为 celery 的消息会转发到队列celery中 - 一个队列
celery
查看队列 celery 中的消息
| 1 2 3 4 5 6 | vagrant@celery:~$ rabbitmqadmin get queue=celery ackmode=ack_requeue_true +-------------+----------+---------------+-------------------------------------------------------------------------------------+---------------+------------------+-------------+ | routing_key | exchange | message_count | payload | payload_bytes | payload_encoding | redelivered | +-------------+----------+---------------+-------------------------------------------------------------------------------------+---------------+------------------+-------------+ | celery | | 0 | [[15, 30], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}] | 83 | string | False | +-------------+----------+---------------+-------------------------------------------------------------------------------------+---------------+------------------+-------------+ |
ackmode=ack_requeue_true, 所以消息仍然在队列中, Redis 中什么也还没发生,接下来要
启动 Celery Worker
要用到 celery 命令,不过只要是 Python 的程序,命令行能做的事情总是能用 Python 代码来执行,用 celery --help 可看它的详细说明。
$ celery -A tasks worker -l INFO
tasks 是自己创建的模块文件 tasks.py
这时候显示出一条绿绿的芹菜出来了,所以得用屏幕截图来表现

取出消息并显示任务执行完成,这时候去看 RabbitMQ 的队列 celery 中的消息不见了,启动 Worker 后也会在 RabbitMQ 中创建 queue, 及对应的 binding, exchange。
再回到提交任务的 Python 控制台
| 1 2 3 4 | >>> task.status 'SUCCESS' >>> task.result 45 |
一个 Celery 全套服务圆满完成。结果存在了 Redis 中
192.168.86.181:6379> keys *
1) "celery-task-meta-c3552fa2-502a-450b-933b-19a1da65ba33"
192.168.86.181:6379> TTL celery-task-meta-c3552fa2-502a-450b-933b-19a1da65ba33
(integer) 85840
192.168.86.181:6379> get celery-task-meta-c3552fa2-502a-450b-933b-19a1da65ba33
"{\"status\": \"SUCCESS\", \"result\": 45, \"traceback\": null, \"children\": [], \"date_done\": \"2022-01-17T07:23:48.901999\", \"task_id\": \"c3552fa2-502a-450b-933b-19a1da65ba33\"}"
Redis 中的结果保存时长为 24 小时,失败的任务会记录下异常信息。
关于 Worker 的控制查看帮助 celery worker --help, 比如
- -c, --concurrency: 并发数,默认为系统中 CPU 的内核数
- -P, --pool [prefork|eventlet|gevent|solo|processes|threads]: worker 池的实现方式
- --max-tasks-per-child INTEGER: worker 执行的最大任务数,达到最大数目后便重启当前 worker
- -Q, --queues: 指定处理任务的队列名称,逗号分隔
任务的状态变迁是:PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS
Celery 的配置
除了在声明 Celery 对象时可以指定 broker, backend 属性之外,我们可以用 py 配置文件的形式来配置更多的内容,配置文件 celeryconfig.py, 内容是 Configuration and defautls 中列出的项目
比如 celeryconfig.py
| 1 2 3 4 5 6 7 | broker_url = 'amqp://celery:your-password@192.168.86.181:5672/' result_backend = 'redis://192.168.86.181:6379' task_serializer = 'json' result_serializer = 'json' accept_content = ['json'] timezone = 'America/Chicago' enable_utc = True |
新的格式是用小写的,旧格式用大写,如 BROKER_URL, 但是同一个配置文件中不能混合大小写,同时写 BROKER_URL 和 result_backend 就不行了。
然后在 tasks.py 中加载配置文件
| 1 2 3 4 | from celery import Celery import celeryconfig app = Celery('celery-demo') app.config_from_object(celeryconfig) |
Celery 实时监控工具
Flower 是一个基于 Web 的监控 Celery 中任务的工具,安装和启动
$ pip install flower
$ celery -A tasks flower

打开链接 http://localhost:5555

其他剩下的问题,应该就是如何安排 Worker(比如结合 AutoScaling),从 Python 代码中启动 Worker, 怎么做灵活的配置, 调度任务的执行,其他的 backend 选择等等。
其他补充
backend rpc:// 的组合
如果配置中用
| 1 2 | broker_url = 'amqp://celery:password@192.168.86.50:5672/celery' result_backend = 'rpc://' |
amqp 和 rpc:// 的组合,任务和结果都会存在 RabbitMQ 中
| 1 2 | broker_url = 'redis://192.168.86.50' result_backend = 'rpc://' |
redis 和 rpc:// 的组合,任务和结果都保存在 Redis 中
为什么 Celery 推荐使用 RabbitMQ, 一说是它的一开发人员负责开发过 RabbitMQ, 所以即使使用 Redis 时,也会在 Redis 中写入有关 RabbitMQ 概念的数据,如 exchange, routing key 等。
常见问题
Celery ValueError: not enough values to unpack (expected 3, got 0)的解决方案
先安装eventlet
pip install eventlet
然后,启动worker的时候加一个参数,如下:
celery -A <moduleName> worker -l info -P eventlet
然后就可以正常运行worker执行任务了
相关文章:
Celery(分布式任务队列)入门学习笔记
Celery 的简单介绍 用 Celery 官方的介绍:它是一个分布式任务队列; 简单,灵活,可靠的处理大量消息的分布式系统; 它专注于实时处理,并支持任务调度。 Celery 如果使用 RabbitMQ 作为消息系统的话,整个应用体系就是下…...
【网络】tcp协议如何保证可靠性
TCP(Transmission Control Protocol)是一种面向连接的、可靠的传输层协议,为网络通信提供了可靠性和连接稳定性。本文将详细介绍 TCP 协议如何保证数据的可靠传输和连接的稳定性,并分析其优缺点。 可靠性保证 序号和确认机制&…...
select,poll,epoll
在 Linux Socket 服务器短编程时,为了处理大量客户的连接请求,需要使用非阻塞I/O和复用,select,poll 和 epoll 是 Linux API 提供的I/O复用方式。 \selectpollepoll操作方式遍历遍历回调底层实现数组链表哈希表IO效率每次调用都进…...
【48天笔试强训】day18
题目1 描述 有一种兔子,从出生后第3个月起每个月都生一只兔子,小兔子长到第三个月后每个月又生一只兔子。 例子:假设一只兔子第3个月出生,那么它第5个月开始会每个月生一只兔子。 一月的时候有一只兔子,假如兔子都…...
链表经典面试题01
目录 引言 面试题01:返回倒数第k个节点 题目描述: 思路分析: 代码展示: 面试题02:链表的回文结构 题目描述: 描述 思路分析: 代码展示: 面试题03:相交链表 题目描述: 思路分析: 代码展示: 小结: 引言 这次的题均来自力扣和牛客有关链表的经典面试题,代码只会展示…...
基于java的CRM客户关系管理系统的设计与实现(论文 + 源码 )
【免费】基于Java的CRM客户关系管理系统的设计和实现.zip资源-CSDN文库https://download.csdn.net/download/JW_559/89273409 基于Java的CRM客户关系管理系统的设计与实现 摘 要 随着互联网的高速发展,市场经济的信息化,让企业之间的竞争变得࿰…...
【动态规划-最长上升子序列模型part2】:拦截导弹、导弹防御系统、最长公共上升子序列【已更新完成】
1、拦截导弹 某国为了防御敌国的导弹袭击,发展出一种导弹拦截系统。 但是这种导弹拦截系统有一个缺陷:虽然它的第一发炮弹能够到达任意的高度,但是以后每一发炮弹都不能高于前一发的高度。 某天,雷达捕捉到敌国的导弹来袭。 由于…...
Spring 如何解决 Bean 循环依赖
循环依赖解释 bean A 属性注入时依赖bean B ,并且bean B属性注入时也依赖bean A ,造成 bean A 和bean B 都无法完成初始化问题,形成了闭环。 注意 项目中存在Bean的循环依赖,是Bean对象职责划分不明确、代码质量不高的表现&#…...
【driver4】锁,错误码,休眠唤醒,中断,虚拟内存,tasklet
文章目录 1.互斥锁和自旋锁选择:自旋锁(开销少)的自旋时间和被锁住的代码执行时间成正比关系2.linux错误码:64位系统内核空间最后一页地址为0xfffffffffffff000~0xffffffffffffffff,这段地址是被保留的,如果…...
python之 函数相关知识解析
01 函数的注释与嵌套 1.函数的注释 函数的注释与普通注释的区别:用来说明当前函数的参数含义 param 参数名: 参数的注释信息 return: 函数的返回值 例如: def fun1(name):""":param name: 参数的注释信息:return: 函数的返回值"…...
监视器和显示器的区别,普通硬盘和监控硬盘的区别
监视器与显示器的区别,你真的知道吗? 中小型视频监控系统中,显示系统是最能展现效果的一个重要环节,显示系统的优劣将直接影响视频监控系统的用户体验满意度。 中小型视频监控系统中,显示系统是最能展现效果的一个重要…...
Linux:升级OpenSSL和OpenSSH
原因是现有版本存在安全漏洞,需要升级到新版本 原有版本和升级后的版本 OpenSSL 1.0.2k-fips 26 Jan 2017 -> OpenSSL 1.1.1w 11 Sep 2023OpenSSH_7.4p1, OpenSSL 1.0.2k-fips 26 Jan 2017 -> OpenSSH_9.5p1, OpenSSL 1.1.1w 11 Sep 2023目录 查看现有版…...
方法的入栈和出栈
一.作用域问题 1.全局作用域 在全局都能进行访问的变量 var a 10;function fn() {var b 20;return a b;}console.log(fn()); 2.局部的作用域 只能在限定的范围内进行访问 function fn() {var b 20;}console.log(b); b is not defined 打印的结果是b这个变量没用定义 3…...
PHP介绍
🐌博主主页:🐌倔强的大蜗牛🐌 📚专栏分类:PHP❤️感谢大家点赞👍收藏⭐评论✍️ 目录 一、PHP是什么? 二、 PHP 文件是什么? 三、PHP能做什么? 四、P…...
接口自动化测试之-requests模块详解
一、requests背景 Requests 继承了urllib2的所有特性。Requests支持HTTP连接保持和连接池,支持使用cookie保持会话,支持文件上传,支持自动确定响应内容的编码,支持国际化的 URL 和 POST 数据自动编码。 二、requests安装 利用p…...
低代码+定制物资管理:创新解决方案探析
引言 在当今快速变化的商业环境中,企业面临着不断增长的挑战,如提高效率、降低成本、满足客户需求等。为了应对这些挑战,企业需要不断创新并采用先进的技术解决方案。在这样的背景下,低代码开发和定制化物资管理成为了引领企业变…...
13 【PS作图】人物绘画理论-脸型
三庭五眼 三庭:脸的长度比例 (1)发际线到眉毛 (2)眉毛到鼻底 (3)鼻底到下巴 三个部分大致为三等分 五眼:脸的宽度比例 以眼睛长度为单位,把脸的宽度分成五等分&#x…...
Python批量修改图片文件名中的指定名称
批量处理图像时,图片名有时需要统一,本教程仅针对图片中名如:0001x4.png,批量将图片名中的x4去除,只留下0001.png的情况。 如果想要按照原图片顺序批量修改图片名,参考其它博文:按照原顺序批量…...
STM32点灯大师(点了一颗LED灯,轮询法)
配置操作: 一、使用CubeMX配置到大致的操作 1.1 选择芯片 1.2 选择引脚(根据电路图) 1.3 配置gpio口 1.4 配置系统 1.5文件项目操作 最后就是点击 二、点击CubeMX生成的代码,并且修改代码 2.1 看看效果 2.2 写代码...
动态规划算法:路径问题
例题一 解法(动态规划): 算法思路: 1. 状态表⽰: 对于这种「路径类」的问题,我们的状态表⽰⼀般有两种形式: i. 从 [i, j] 位置出发,巴拉巴拉; ii. 从起始位置出…...
day52 ResNet18 CBAM
在深度学习的旅程中,我们不断探索如何提升模型的性能。今天,我将分享我在 ResNet18 模型中插入 CBAM(Convolutional Block Attention Module)模块,并采用分阶段微调策略的实践过程。通过这个过程,我不仅提升…...
dedecms 织梦自定义表单留言增加ajax验证码功能
增加ajax功能模块,用户不点击提交按钮,只要输入框失去焦点,就会提前提示验证码是否正确。 一,模板上增加验证码 <input name"vdcode"id"vdcode" placeholder"请输入验证码" type"text&quo…...
工业自动化时代的精准装配革新:迁移科技3D视觉系统如何重塑机器人定位装配
AI3D视觉的工业赋能者 迁移科技成立于2017年,作为行业领先的3D工业相机及视觉系统供应商,累计完成数亿元融资。其核心技术覆盖硬件设计、算法优化及软件集成,通过稳定、易用、高回报的AI3D视觉系统,为汽车、新能源、金属制造等行…...
精益数据分析(97/126):邮件营销与用户参与度的关键指标优化指南
精益数据分析(97/126):邮件营销与用户参与度的关键指标优化指南 在数字化营销时代,邮件列表效度、用户参与度和网站性能等指标往往决定着创业公司的增长成败。今天,我们将深入解析邮件打开率、网站可用性、页面参与时…...
【7色560页】职场可视化逻辑图高级数据分析PPT模版
7种色调职场工作汇报PPT,橙蓝、黑红、红蓝、蓝橙灰、浅蓝、浅绿、深蓝七种色调模版 【7色560页】职场可视化逻辑图高级数据分析PPT模版:职场可视化逻辑图分析PPT模版https://pan.quark.cn/s/78aeabbd92d1...
RabbitMQ入门4.1.0版本(基于java、SpringBoot操作)
RabbitMQ 一、RabbitMQ概述 RabbitMQ RabbitMQ最初由LShift和CohesiveFT于2007年开发,后来由Pivotal Software Inc.(现为VMware子公司)接管。RabbitMQ 是一个开源的消息代理和队列服务器,用 Erlang 语言编写。广泛应用于各种分布…...
多模态图像修复系统:基于深度学习的图片修复实现
多模态图像修复系统:基于深度学习的图片修复实现 1. 系统概述 本系统使用多模态大模型(Stable Diffusion Inpainting)实现图像修复功能,结合文本描述和图片输入,对指定区域进行内容修复。系统包含完整的数据处理、模型训练、推理部署流程。 import torch import numpy …...
MacOS下Homebrew国内镜像加速指南(2025最新国内镜像加速)
macos brew国内镜像加速方法 brew install 加速formula.jws.json下载慢加速 🍺 最新版brew安装慢到怀疑人生?别怕,教你轻松起飞! 最近Homebrew更新至最新版,每次执行 brew 命令时都会自动从官方地址 https://formulae.…...
认识CMake并使用CMake构建自己的第一个项目
1.CMake的作用和优势 跨平台支持:CMake支持多种操作系统和编译器,使用同一份构建配置可以在不同的环境中使用 简化配置:通过CMakeLists.txt文件,用户可以定义项目结构、依赖项、编译选项等,无需手动编写复杂的构建脚本…...
【FTP】ftp文件传输会丢包吗?批量几百个文件传输,有一些文件没有传输完整,如何解决?
FTP(File Transfer Protocol)本身是一个基于 TCP 的协议,理论上不会丢包。但 FTP 文件传输过程中仍可能出现文件不完整、丢失或损坏的情况,主要原因包括: ✅ 一、FTP传输可能“丢包”或文件不完整的原因 原因描述网络…...
