Celery(分布式任务队列)入门学习笔记
Celery 的简单介绍
用 Celery 官方的介绍:它是一个分布式任务队列; 简单,灵活,可靠的处理大量消息的分布式系统; 它专注于实时处理,并支持任务调度。
Celery 如果使用 RabbitMQ 作为消息系统的话,整个应用体系就是下面这张图
Celery 官方给出的 Hello World, 对于未接触它的人来说根本就不知道是什么
1 2 3 4 5 6 7 | from celery import Celery app = Celery('hello', broker='amqp://guest@localhost//') @app.task def hello(): return 'hello world' |
还是有必要按住上面那张图看 Celery 的组成部分
- Celery 自身实现的部分其实是 Producer 和 Consumer. Producer 创建任务,并发送消息到消息队列,我们称这个队列为 Broker。Consumer 从 Broker 中接收消息,完成计算任务,把结果存到 Backend
- Broker 就是那个消息队列,可选择的实现有 RabbitMQ, Redis, Amazon SQS
- 结果存储(Backend), 可选择 AMQP(像 RabbitMQ 就是它的一个实现), Redis, Memcached, Cassandra, Elasticsearch, MongoDB, CouchDB, DynamoDB, Amazon S3, File system 等等,看来它的定制性很强
- 消息和结果的存储还涉及到一个序列化的问题,可选择 pickle(Python 专用), json, yaml, msgpack. 消息可用 zlib, bzip2 进行压缩, 或加密存储
- Worker 的并发可采用 prefork(多进程), thread(多线程), Eventlet, gevent, solo(单线程)]
Celery 应用的基础选型
Celery 的 Broker 和 Backend 有非常多的选择组合,RabbitMQ 和 Redis 都是即可作为 Broker 又能用作 Backend。但 Celery 的推荐是用 RabbitMQ 作为 Broker, 小的结果这里选择用 Redis 作为 Backend, 所以这里的选型是
- Broker: RabbitMQ
- Backend: Redis
- 序列化:JSON -- 方便在学习中查到消息中的数据
准备 Redis
安装 Python 包
在需要运行 Producer 和 Consumer(worker) 的机器上创建一个 Python 虚拟环境,然后安装下面的包
$ pip install celery redis
实践中只需要安装 celery redis 就能运行后面的例子,没有安装 librabbitmq, "celery[librabbitmq]" 也行,安装了这两个库能使用更高效的 librabbitmq C 库。如果安装了 librabbitmq 库,broker='amqp://...' 默认使用 librabbitmq, 找不到 librabbitmq 的话就用 broker='pyamqp://...'
$ pip install librabbitmq
$ pip install "celery[librabbitmq]"
注:中括号中的是安装 Celery 提供的 bundle, 它定义在 setup.py 的 setup 函数中的 extras_require。
Celery 应用实战
我们不用 Celery 的 Hello World 实例,那不能帮助我们理解背后发生了什么。创建一个 tasks.py 文件
1 2 3 4 5 6 7 8 9 10 11 | from celery import Celery app = Celery('celery-demo', broker='amqp://celery:your-password@192.168.86.181:5672/', backend='redis://192.168.86.181:6379') @app.task def add(x, y): return x + y |
这里配置连接到 brocker 的 /
vhost, 如果连接到别的 vhost, 如 celery 的话, url 写成 amqp://celery:your-passoword@192.168.86.181:5672/celery
. backend 的 redis 如果要配置密码, 和 db 的话,写成 redis://:password@192.168.86.181:6379/2
暂且不在该脚本中直接执行 add.delay(15, 30)
, 而是放到 Python 控制台下方便测试
现在进到 Python 控制台
1 2 3 4 5 6 | >>> from tasks import add >>> task = add.delay(15, 30) >>> task.id 'c3552fa2-502a-450b-933b-19a1da65ba33' >>> task.status 'PENDING' |
由于 Worker 还没有启动,所以得到一个 task_id, 状态是 PENDING。趁这时候看看 Celery 目前做了什么,来查看到 RabbitMQ
7 | celery direct |
Celery 在 RabbitMQ 中创建了的资源有
- 一个 Exchange: celery direct
- 两个 binding: 送到
默认(空字符串)
或celery
exchange 的, routing-key 为 celery 的消息会转发到队列celery
中 - 一个队列
celery
查看队列 celery
中的消息
1 2 3 4 5 6 | vagrant@celery:~$ rabbitmqadmin get queue=celery ackmode=ack_requeue_true +-------------+----------+---------------+-------------------------------------------------------------------------------------+---------------+------------------+-------------+ | routing_key | exchange | message_count | payload | payload_bytes | payload_encoding | redelivered | +-------------+----------+---------------+-------------------------------------------------------------------------------------+---------------+------------------+-------------+ | celery | | 0 | [[15, 30], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}] | 83 | string | False | +-------------+----------+---------------+-------------------------------------------------------------------------------------+---------------+------------------+-------------+ |
ackmode=ack_requeue_true, 所以消息仍然在队列中, Redis 中什么也还没发生,接下来要
启动 Celery Worker
要用到 celery 命令,不过只要是 Python 的程序,命令行能做的事情总是能用 Python 代码来执行,用 celery --help
可看它的详细说明。
$ celery -A tasks worker -l INFO
tasks
是自己创建的模块文件 tasks.py
这时候显示出一条绿绿的芹菜出来了,所以得用屏幕截图来表现
取出消息并显示任务执行完成,这时候去看 RabbitMQ 的队列 celery 中的消息不见了,启动 Worker 后也会在 RabbitMQ 中创建 queue, 及对应的 binding, exchange。
再回到提交任务的 Python 控制台
1 2 3 4 | >>> task.status 'SUCCESS' >>> task.result 45 |
一个 Celery 全套服务圆满完成。结果存在了 Redis 中
192.168.86.181:6379> keys *
1) "celery-task-meta-c3552fa2-502a-450b-933b-19a1da65ba33"
192.168.86.181:6379> TTL celery-task-meta-c3552fa2-502a-450b-933b-19a1da65ba33
(integer) 85840
192.168.86.181:6379> get celery-task-meta-c3552fa2-502a-450b-933b-19a1da65ba33
"{\"status\": \"SUCCESS\", \"result\": 45, \"traceback\": null, \"children\": [], \"date_done\": \"2022-01-17T07:23:48.901999\", \"task_id\": \"c3552fa2-502a-450b-933b-19a1da65ba33\"}"
Redis 中的结果保存时长为 24 小时,失败的任务会记录下异常信息。
关于 Worker 的控制查看帮助 celery worker --help
, 比如
- -c, --concurrency: 并发数,默认为系统中 CPU 的内核数
- -P, --pool [prefork|eventlet|gevent|solo|processes|threads]: worker 池的实现方式
- --max-tasks-per-child INTEGER: worker 执行的最大任务数,达到最大数目后便重启当前 worker
- -Q, --queues: 指定处理任务的队列名称,逗号分隔
任务的状态变迁是:PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS
Celery 的配置
除了在声明 Celery 对象时可以指定 broker, backend 属性之外,我们可以用 py 配置文件的形式来配置更多的内容,配置文件 celeryconfig.py
, 内容是 Configuration and defautls 中列出的项目
比如 celeryconfig.py
1 2 3 4 5 6 7 | broker_url = 'amqp://celery:your-password@192.168.86.181:5672/' result_backend = 'redis://192.168.86.181:6379' task_serializer = 'json' result_serializer = 'json' accept_content = ['json'] timezone = 'America/Chicago' enable_utc = True |
新的格式是用小写的,旧格式用大写,如 BROKER_URL
, 但是同一个配置文件中不能混合大小写,同时写 BROKER_URL
和 result_backend
就不行了。
然后在 tasks.py 中加载配置文件
1 2 3 4 | from celery import Celery import celeryconfig app = Celery('celery-demo') app.config_from_object(celeryconfig) |
Celery 实时监控工具
Flower 是一个基于 Web 的监控 Celery 中任务的工具,安装和启动
$ pip install flower
$ celery -A tasks flower
打开链接 http://localhost:5555
其他剩下的问题,应该就是如何安排 Worker(比如结合 AutoScaling),从 Python 代码中启动 Worker, 怎么做灵活的配置, 调度任务的执行,其他的 backend 选择等等。
其他补充
backend rpc:// 的组合
如果配置中用
1 2 | broker_url = 'amqp://celery:password@192.168.86.50:5672/celery' result_backend = 'rpc://' |
amqp 和 rpc:// 的组合,任务和结果都会存在 RabbitMQ 中
1 2 | broker_url = 'redis://192.168.86.50' result_backend = 'rpc://' |
redis 和 rpc:// 的组合,任务和结果都保存在 Redis 中
为什么 Celery 推荐使用 RabbitMQ, 一说是它的一开发人员负责开发过 RabbitMQ, 所以即使使用 Redis 时,也会在 Redis 中写入有关 RabbitMQ 概念的数据,如 exchange, routing key 等。
常见问题
Celery ValueError: not enough values to unpack (expected 3, got 0)的解决方案
先安装eventlet
pip install eventlet
然后,启动worker的时候加一个参数,如下:
celery -A <moduleName> worker -l info -P eventlet
然后就可以正常运行worker执行任务了
相关文章:

Celery(分布式任务队列)入门学习笔记
Celery 的简单介绍 用 Celery 官方的介绍:它是一个分布式任务队列; 简单,灵活,可靠的处理大量消息的分布式系统; 它专注于实时处理,并支持任务调度。 Celery 如果使用 RabbitMQ 作为消息系统的话,整个应用体系就是下…...
【网络】tcp协议如何保证可靠性
TCP(Transmission Control Protocol)是一种面向连接的、可靠的传输层协议,为网络通信提供了可靠性和连接稳定性。本文将详细介绍 TCP 协议如何保证数据的可靠传输和连接的稳定性,并分析其优缺点。 可靠性保证 序号和确认机制&…...

select,poll,epoll
在 Linux Socket 服务器短编程时,为了处理大量客户的连接请求,需要使用非阻塞I/O和复用,select,poll 和 epoll 是 Linux API 提供的I/O复用方式。 \selectpollepoll操作方式遍历遍历回调底层实现数组链表哈希表IO效率每次调用都进…...
【48天笔试强训】day18
题目1 描述 有一种兔子,从出生后第3个月起每个月都生一只兔子,小兔子长到第三个月后每个月又生一只兔子。 例子:假设一只兔子第3个月出生,那么它第5个月开始会每个月生一只兔子。 一月的时候有一只兔子,假如兔子都…...

链表经典面试题01
目录 引言 面试题01:返回倒数第k个节点 题目描述: 思路分析: 代码展示: 面试题02:链表的回文结构 题目描述: 描述 思路分析: 代码展示: 面试题03:相交链表 题目描述: 思路分析: 代码展示: 小结: 引言 这次的题均来自力扣和牛客有关链表的经典面试题,代码只会展示…...

基于java的CRM客户关系管理系统的设计与实现(论文 + 源码 )
【免费】基于Java的CRM客户关系管理系统的设计和实现.zip资源-CSDN文库https://download.csdn.net/download/JW_559/89273409 基于Java的CRM客户关系管理系统的设计与实现 摘 要 随着互联网的高速发展,市场经济的信息化,让企业之间的竞争变得࿰…...
【动态规划-最长上升子序列模型part2】:拦截导弹、导弹防御系统、最长公共上升子序列【已更新完成】
1、拦截导弹 某国为了防御敌国的导弹袭击,发展出一种导弹拦截系统。 但是这种导弹拦截系统有一个缺陷:虽然它的第一发炮弹能够到达任意的高度,但是以后每一发炮弹都不能高于前一发的高度。 某天,雷达捕捉到敌国的导弹来袭。 由于…...
Spring 如何解决 Bean 循环依赖
循环依赖解释 bean A 属性注入时依赖bean B ,并且bean B属性注入时也依赖bean A ,造成 bean A 和bean B 都无法完成初始化问题,形成了闭环。 注意 项目中存在Bean的循环依赖,是Bean对象职责划分不明确、代码质量不高的表现&#…...

【driver4】锁,错误码,休眠唤醒,中断,虚拟内存,tasklet
文章目录 1.互斥锁和自旋锁选择:自旋锁(开销少)的自旋时间和被锁住的代码执行时间成正比关系2.linux错误码:64位系统内核空间最后一页地址为0xfffffffffffff000~0xffffffffffffffff,这段地址是被保留的,如果…...
python之 函数相关知识解析
01 函数的注释与嵌套 1.函数的注释 函数的注释与普通注释的区别:用来说明当前函数的参数含义 param 参数名: 参数的注释信息 return: 函数的返回值 例如: def fun1(name):""":param name: 参数的注释信息:return: 函数的返回值"…...

监视器和显示器的区别,普通硬盘和监控硬盘的区别
监视器与显示器的区别,你真的知道吗? 中小型视频监控系统中,显示系统是最能展现效果的一个重要环节,显示系统的优劣将直接影响视频监控系统的用户体验满意度。 中小型视频监控系统中,显示系统是最能展现效果的一个重要…...
Linux:升级OpenSSL和OpenSSH
原因是现有版本存在安全漏洞,需要升级到新版本 原有版本和升级后的版本 OpenSSL 1.0.2k-fips 26 Jan 2017 -> OpenSSL 1.1.1w 11 Sep 2023OpenSSH_7.4p1, OpenSSL 1.0.2k-fips 26 Jan 2017 -> OpenSSH_9.5p1, OpenSSL 1.1.1w 11 Sep 2023目录 查看现有版…...

方法的入栈和出栈
一.作用域问题 1.全局作用域 在全局都能进行访问的变量 var a 10;function fn() {var b 20;return a b;}console.log(fn()); 2.局部的作用域 只能在限定的范围内进行访问 function fn() {var b 20;}console.log(b); b is not defined 打印的结果是b这个变量没用定义 3…...
PHP介绍
🐌博主主页:🐌倔强的大蜗牛🐌 📚专栏分类:PHP❤️感谢大家点赞👍收藏⭐评论✍️ 目录 一、PHP是什么? 二、 PHP 文件是什么? 三、PHP能做什么? 四、P…...

接口自动化测试之-requests模块详解
一、requests背景 Requests 继承了urllib2的所有特性。Requests支持HTTP连接保持和连接池,支持使用cookie保持会话,支持文件上传,支持自动确定响应内容的编码,支持国际化的 URL 和 POST 数据自动编码。 二、requests安装 利用p…...

低代码+定制物资管理:创新解决方案探析
引言 在当今快速变化的商业环境中,企业面临着不断增长的挑战,如提高效率、降低成本、满足客户需求等。为了应对这些挑战,企业需要不断创新并采用先进的技术解决方案。在这样的背景下,低代码开发和定制化物资管理成为了引领企业变…...

13 【PS作图】人物绘画理论-脸型
三庭五眼 三庭:脸的长度比例 (1)发际线到眉毛 (2)眉毛到鼻底 (3)鼻底到下巴 三个部分大致为三等分 五眼:脸的宽度比例 以眼睛长度为单位,把脸的宽度分成五等分&#x…...

Python批量修改图片文件名中的指定名称
批量处理图像时,图片名有时需要统一,本教程仅针对图片中名如:0001x4.png,批量将图片名中的x4去除,只留下0001.png的情况。 如果想要按照原图片顺序批量修改图片名,参考其它博文:按照原顺序批量…...

STM32点灯大师(点了一颗LED灯,轮询法)
配置操作: 一、使用CubeMX配置到大致的操作 1.1 选择芯片 1.2 选择引脚(根据电路图) 1.3 配置gpio口 1.4 配置系统 1.5文件项目操作 最后就是点击 二、点击CubeMX生成的代码,并且修改代码 2.1 看看效果 2.2 写代码...

动态规划算法:路径问题
例题一 解法(动态规划): 算法思路: 1. 状态表⽰: 对于这种「路径类」的问题,我们的状态表⽰⼀般有两种形式: i. 从 [i, j] 位置出发,巴拉巴拉; ii. 从起始位置出…...
前端倒计时误差!
提示:记录工作中遇到的需求及解决办法 文章目录 前言一、误差从何而来?二、五大解决方案1. 动态校准法(基础版)2. Web Worker 计时3. 服务器时间同步4. Performance API 高精度计时5. 页面可见性API优化三、生产环境最佳实践四、终极解决方案架构前言 前几天听说公司某个项…...
Java多线程实现之Callable接口深度解析
Java多线程实现之Callable接口深度解析 一、Callable接口概述1.1 接口定义1.2 与Runnable接口的对比1.3 Future接口与FutureTask类 二、Callable接口的基本使用方法2.1 传统方式实现Callable接口2.2 使用Lambda表达式简化Callable实现2.3 使用FutureTask类执行Callable任务 三、…...

Java-41 深入浅出 Spring - 声明式事务的支持 事务配置 XML模式 XML+注解模式
点一下关注吧!!!非常感谢!!持续更新!!! 🚀 AI篇持续更新中!(长期更新) 目前2025年06月05日更新到: AI炼丹日志-28 - Aud…...

全志A40i android7.1 调试信息打印串口由uart0改为uart3
一,概述 1. 目的 将调试信息打印串口由uart0改为uart3。 2. 版本信息 Uboot版本:2014.07; Kernel版本:Linux-3.10; 二,Uboot 1. sys_config.fex改动 使能uart3(TX:PH00 RX:PH01),并让boo…...
在Ubuntu24上采用Wine打开SourceInsight
1. 安装wine sudo apt install wine 2. 安装32位库支持,SourceInsight是32位程序 sudo dpkg --add-architecture i386 sudo apt update sudo apt install wine32:i386 3. 验证安装 wine --version 4. 安装必要的字体和库(解决显示问题) sudo apt install fonts-wqy…...

基于Java+MySQL实现(GUI)客户管理系统
客户资料管理系统的设计与实现 第一章 需求分析 1.1 需求总体介绍 本项目为了方便维护客户信息为了方便维护客户信息,对客户进行统一管理,可以把所有客户信息录入系统,进行维护和统计功能。可通过文件的方式保存相关录入数据,对…...
【前端异常】JavaScript错误处理:分析 Uncaught (in promise) error
在前端开发中,JavaScript 异常是不可避免的。随着现代前端应用越来越多地使用异步操作(如 Promise、async/await 等),开发者常常会遇到 Uncaught (in promise) error 错误。这个错误是由于未正确处理 Promise 的拒绝(r…...
Vue 模板语句的数据来源
🧩 Vue 模板语句的数据来源:全方位解析 Vue 模板(<template> 部分)中的表达式、指令绑定(如 v-bind, v-on)和插值({{ }})都在一个特定的作用域内求值。这个作用域由当前 组件…...

CSS3相关知识点
CSS3相关知识点 CSS3私有前缀私有前缀私有前缀存在的意义常见浏览器的私有前缀 CSS3基本语法CSS3 新增长度单位CSS3 新增颜色设置方式CSS3 新增选择器CSS3 新增盒模型相关属性box-sizing 怪异盒模型resize调整盒子大小box-shadow 盒子阴影opacity 不透明度 CSS3 新增背景属性ba…...
node.js的初步学习
那什么是node.js呢? 和JavaScript又是什么关系呢? node.js 提供了 JavaScript的运行环境。当JavaScript作为后端开发语言来说, 需要在node.js的环境上进行当JavaScript作为前端开发语言来说,需要在浏览器的环境上进行 Node.js 可…...