Celery 核心概念详解及示例
Celery 核心概念详解及示例
Celery 是一个简单、灵活且可靠的分布式系统,用于处理大量消息,提供对任务队列的操作,并支持任务的调度和异步执行。它常用于深度优化 Web 应用的性能和响应速度,通过将耗时的操作移到后台异步执行,使主程序不被阻塞。
本文将统一介绍 Celery 中的核心概念,并通过示例进行说明,帮助您深入理解并应用 Celery 来构建高性能的异步任务处理系统。
目录
- Celery 核心概念详解及示例
- 任务(Task)
- 示例
- 工作者(Worker)
- 启动 Worker
- 消息代理(Broker)
- 配置示例
- 客户端(Client)
- 任务调用示例
- 结果后端(Result Backend)
- 配置示例
- 获取任务结果
- 任务队列(Queue)
- 定义队列中的任务
- 启动 Worker 监听指定队列
- 任务路由(Task Routing)
- 配置路由
- 并发(Concurrency)
- 启动 Worker 指定并发方式
- 定时任务(Periodic Tasks)
- 配置定时任务
- 启动 Celery Beat
- 任务重试与超时(Task Retry and Timeout)
- 设置任务重试
- 设置任务超时
- 任务组与工作流(Task Groups and Workflows)
- 组任务(Group)
- 链任务(Chain)
- 示例项目结构
- tasks.py
- app.py
- worker.sh
- 总结
- 参考资料
任务(Task)
任务(Task) 是 Celery 的基本执行单元,表示要执行的具体函数或操作。在 Celery 中,任务通常由装饰器 @task
或 @shared_task
声明。
示例
# tasks.py
from celery import Celeryapp = Celery('tasks', broker='redis://localhost:6379/0')@app.task
def add(x, y):return x + y
- 说明:
- 使用
@app.task
装饰器将函数add
声明为一个 Celery 任务。 - 任务可以被工作者执行,并可以异步调用。
- 使用
工作者(Worker)
工作者(Worker) 是实际执行任务的进程或节点。工作者从消息代理中获取任务,执行任务函数,并返回结果。
启动 Worker
celery -A tasks worker --loglevel=INFO
- 说明:
-A tasks
:指定 Celery 应用实例的位置。worker
:启动一个工作者进程。--loglevel=INFO
:设置日志级别。
消息代理(Broker)
消息代理(Broker) 是任务队列的中介,负责接收来自客户端的任务,并将任务分发给工作者。常用的消息代理包括 Redis、RabbitMQ、Amazon SQS 等。
配置示例
# 使用 Redis 作为消息代理
app = Celery('tasks', broker='redis://localhost:6379/0')
- 说明:
broker
参数指定消息代理的连接 URL。
客户端(Client)
客户端(Client) 是发送任务到消息代理的应用程序。它可以是 Web 应用、脚本等,使用 Celery 的 API 将任务异步地发送到 Broker。
任务调用示例
# 调用加法任务
from tasks import addresult = add.delay(4, 6)
- 说明:
- 使用
delay()
方法将任务发送到消息代理,立即返回一个AsyncResult
对象。 - 任务将在后台执行,客户端无需等待任务完成。
- 使用
结果后端(Result Backend)
结果后端(Result Backend) 用于存储任务执行的结果,允许客户端在任务执行完成后获取结果。常用的结果后端包括 Redis、RabbitMQ、数据库 等。
配置示例
# 使用 Redis 作为结果后端
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
获取任务结果
# 获取任务结果
result = add.delay(4, 6)
print(result.get(timeout=10)) # 输出 10
- 说明:
result.get(timeout=10)
:等待任务完成并获取结果,超时设置为 10 秒。
任务队列(Queue)
任务队列(Queue) 是存储任务的队列,允许将任务分类、路由和分配。队列的使用可以实现任务的隔离和资源的优化。
定义队列中的任务
@app.task(queue='priority_high')
def high_priority_task():pass
启动 Worker 监听指定队列
celery -A tasks worker -Q priority_high --loglevel=INFO
- 说明:
-Q priority_high
:指定 Worker 只监听名为priority_high
的队列。
任务路由(Task Routing)
任务路由(Task Routing) 用于将任务分配到特定的队列,可以在任务定义、配置文件或运行时指定。
配置路由
# 配置路由规则
app.conf.task_routes = {'tasks.add': {'queue': 'math_queue'},'tasks.mul': {'queue': 'math_queue'},'tasks.email': {'queue': 'email_queue'},
}
- 说明:
- 将
add
和mul
任务路由到math_queue
队列,email
任务路由到email_queue
队列。
- 将
并发(Concurrency)
Celery 支持多种并发方式,包括多进程、多线程和协程。常用的并发池有 prefork(默认,多进程)、eventlet(协程)、gevent(协程)。
启动 Worker 指定并发方式
# 使用多进程并发,设置并发数为 4
celery -A tasks worker --concurrency=4# 使用 gevent 并发
celery -A tasks worker -P gevent --concurrency=1000
- 说明:
--concurrency=4
:设置并发的工作进程数。-P gevent
:指定并发池为gevent
,适用于 I/O 密集型任务。
定时任务(Periodic Tasks)
定时任务(Periodic Tasks) 是指按照预设的时间间隔或特定时间点自动执行的任务。Celery 提供了 Celery Beat 调度器来管理定时任务。
配置定时任务
from celery.schedules import crontabapp.conf.beat_schedule = {'add-every-30-seconds': {'task': 'tasks.add','schedule': 30.0,'args': (16, 16)},'multiply-at-midnight': {'task': 'tasks.mul','schedule': crontab(hour=0, minute=0),'args': (4, 5),},
}
启动 Celery Beat
celery -A tasks beat --loglevel=INFO
- 说明:
- Celery Beat 会按照配置的计划周期性地发送任务到 Broker。
任务重试与超时(Task Retry and Timeout)
任务在执行过程中可能失败,Celery 提供了任务重试和超时机制,增强任务的可靠性。
设置任务重试
@app.task(bind=True, max_retries=3, default_retry_delay=60)
def unstable_task(self, *args):try:# 可能出现异常的代码passexcept Exception as exc:# 任务失败后重试raise self.retry(exc=exc)
- 说明:
max_retries=3
:最大重试次数为 3 次。default_retry_delay=60
:每次重试之间的间隔为 60 秒。self.retry(exc=exc)
:任务失败时抛出retry
异常,实现重试。
设置任务超时
@app.task(time_limit=300, soft_time_limit=200)
def long_running_task():pass
- 说明:
time_limit=300
:任务的硬超时时间为 300 秒,超时后工作进程会被杀死。soft_time_limit=200
:任务的软超时时间为 200 秒,超时后会抛出SoftTimeLimitExceeded
异常,可以在任务中捕获并处理。
任务组与工作流(Task Groups and Workflows)
Celery 支持任务的组合执行,包括组任务(Group Tasks)、链任务(Chain Tasks) 和 回调(Callbacks),用于构建复杂的工作流。
组任务(Group)
同时执行一组独立的任务,所有任务完成后返回结果列表。
from celery import group# 定义任务组
job = group(add.s(i, i) for i in range(10))
result = job.apply_async()# 获取所有任务的结果
print(result.get())
- 说明:
add.s(i, i)
:生成任务的签名。group()
:创建任务组。apply_async()
:异步执行任务组。
链任务(Chain)
顺序执行一系列任务,每个任务的结果作为下一个任务的输入。
from celery import chain# 定义任务链
job = chain(add.s(2, 2) | mul.s(4))result = job.apply_async()# 获取最终结果
print(result.get()) # 输出 (2 + 2) * 4 = 16
- 说明:
|
运算符连接任务,表示顺序执行。
示例项目结构
一个简单的 Celery 项目通常包含以下结构:
project/
│
├── tasks.py # 定义任务
├── app.py # 定义 Celery 应用
└── worker.sh # 启动 Worker 的脚本
tasks.py
from app import app@app.task
def add(x, y):return x + y@app.task
def mul(x, y):return x * y
app.py
from celery import Celeryapp = Celery('project',broker='redis://localhost:6379/0',backend='redis://localhost:6379/0')# 可在此处配置任务路由、定时任务等
worker.sh
#!/bin/bash
celery -A app worker --loglevel=INFO
总结
Celery 通过引入 任务、工作者、消息代理、结果后端、任务队列 等核心概念,构建了一个强大的异步任务处理框架。通过合理地配置和使用这些概念,您可以:
- 异步执行耗时任务,提高应用程序的响应速度。
- 实现任务的调度和重试,增强系统的可靠性。
- 优化资源利用率,通过并发和队列管理提升性能。
- 构建复杂的任务工作流,满足多样化的业务需求。
希望通过本文对 Celery 核心概念的统一介绍和示例说明,您可以更好地理解和应用 Celery,在实际项目中构建高性能的异步任务处理系统。
参考资料
- Celery 官方文档
- Celery 入门教程
- Celery 最佳实践
- 使用 Celery 构建异步任务队列
相关文章:
Celery 核心概念详解及示例
Celery 核心概念详解及示例 Celery 是一个简单、灵活且可靠的分布式系统,用于处理大量消息,提供对任务队列的操作,并支持任务的调度和异步执行。它常用于深度优化 Web 应用的性能和响应速度,通过将耗时的操作移到后台异步执行&am…...

欢乐熊大话蓝牙知识14:用 STM32 或 EFR32 实现 BLE 通信模块:从0到蓝牙,你也能搞!
🚀 用 STM32 或 EFR32 实现 BLE 通信模块:从0到蓝牙,你也能搞! “我能不能自己用 STM32 或 EFR32 实现一个 BLE 模块?” 答案当然是:能!还能很帅! 👨🏭 前…...

IDEA 在公司内网配置gitlab
赋值项目链接 HTTPS 将HTTP的链接 ip地址换成 内网地址 例如:https:172.16.100.18/...... 如果出现需要需要Token验证的情况: 参考:Idea2024中拉取代码时GitLab提示输入token的问题_gitlab token-CSDN博客...

黑马Java面试笔记之 微服务篇(业务)
一. 限流 你们项目中有没有做过限流?怎么做的? 为什么要限流呢? 一是并发的确大(突发流量) 二是防止用户恶意刷接口 限流的实现方式: Tomcat:可以设置最大连接数 可以通过maxThreads设置最大Tomcat连接数,实现限流,但是适用于单体架构 Nginx:漏桶算法网关,令牌桶算法自定…...

通过WiFi无线连接小米手机摄像头到电脑的方法
通过WiFi无线连接小米手机摄像头到电脑的方法 以下是基于Scrcpy和DroidCam两种工具的无线连接方案,需提前完成开发者模式与USB调试的开启(参考原教程步骤): 方法一:Scrcpy无线投屏(无需手机端安装…...

长短期记忆(LSTM)网络模型
一、概述 长短期记忆(Long Short-Term Memory,LSTM)网络是一种特殊的循环神经网络(RNN),专门设计用于解决传统 RNN 在处理长序列数据时面临的梯度消失 / 爆炸问题,能够有效捕捉长距离依赖关系。…...
深入理解 Linux 文件系统与日志文件分析
一、Linux 文件系统概述 1. 文件系统的基本概念 文件系统(File System)是操作系统用于管理和组织存储设备上数据的机制。它提供了一种结构,使得用户和应用程序能够方便地存储和访问数据。 2. Linux 文件系统结构 Linux 文件系统采用树状目…...

CSS3美化页面元素
1. 字体 <span>标签 字体样式⭐ 字体类型(font-family) 字体大小(font-size) 字体风格(font-style) 字体粗细(font-weight) 字体属性(font) 2. 文本 文…...
网络安全-等级保护(等保)3-0 等级保护测评要求现行技术标准
################################################################################ 第三章:测评要求、测评机构要求,最终目的是通过测评,所以我们将等保要求和测评相关要求一一对应形成表格。 GB/T 28448-2019 《信息安全技术 网络安全等…...

WPS 利用 宏 脚本拆分 Excel 多行文本到多行
文章目录 WPS 利用 宏 脚本拆分 Excel 多行文本到多行效果需求背景🛠 操作步骤代码实现代码详解使用场景注意事项总结 WPS 利用 宏 脚本拆分 Excel 多行文本到多行 在 Excel 工作表中,我们经常遇到一列中包含多行文本(用换行符分隔ÿ…...
R语言错误处理方法大全
在R语言的批量运行中,常需要自动跳过错误,继续向下运行。 1、使用 tryCatch() 捕获错误并返回占位符 # 示例:循环中跳过错误继续执行 results <- numeric(5) # 预分配结果向量for(i in 1:5) {# 用 tryCatch 包裹可能出错的代码results[…...

AI“实体化”革命:具身智能如何重构体育、工业与未来生活
近年来,人工智能(AI)技术的飞速发展正在重塑各行各业,而具身智能(Embodied AI)作为AI领域的重要分支,正逐渐从实验室走向现实应用。具身智能的核心在于让AI系统具备物理实体,能够与环…...
Opencv4 c++ 自用笔记 05 形态学操作
图像形态学主要获取物体的形状与位置信息。利用具有一定形态的结构元素度量和提取图像中的对应形状,达到对图像分析和识别的目的。操作主要包括腐蚀、膨胀、开运算和闭运算。 像素距离与连通域 图像形态学中,将不与其他区域链接的独立区域称为集合或者…...
DrissionPage 数据提取技巧全解析:从入门到实战
在当今数据驱动的时代,网页数据提取已成为自动化办公、市场分析和爬虫开发的核心技能。作为新一代网页自动化工具,DrissionPage 以其独特的双模式融合设计(Selenium Requests)脱颖而出。本文将结合官方文档与实战案例,…...
如何构建自适应架构的镜像
目标 我有一个服务叫xxx,一开始它运行在x86架构的机器上,所以最开始有个xxx:stable-amd64的镜像,后来它又需要运行在arm64架构的机器上,所以又重新打了个xxx:stable-arm64的镜像 但是对于安装脚本来说,我不希望我在拉…...

R语言基础| 创建数据集
在R语言中,有多种数据类型,用以存储和处理数据。每种数据类型都有其特定的用途和操作函数,使得R语言在处理各种数据分析任务时非常灵活和强大: 向量(Vector): 向量是R语言中最基本的数据类型,它…...
剑指offer15_数值的整数次方
数值的整数次方 实现函数 double Power(double base, int exponent) 题目要求 计算 base exponent \text{base}^{\text{exponent}} baseexponent: 不得使用库函数不需要考虑大数问题,绝对误差不超过 10 − 2 10^{-2} 10−2不会出现底数和指数同为 0…...

Centos7搭建zabbix6.0
此方法适用于zabbix6以上版本zabbix6.0前期环境准备:Lamp(linux httpd mysql8.0 php)mysql官网下载位置:https://dev.mysql.com/downloads/mysql/Zabbix源码包地址:https://www.zabbix.com/cn/download_sourcesZabbix6…...
使用Redis的四个常见问题及其解决方案
Redis 缓存穿透 定义:redis查询一个不存在的数据,导致每次都查询数据库 解决方案: 如果查询的数据为空,在redis对应的key缓存空数据,并设置短TTL。 因为缓存穿透通常是因为被恶意用不存在的查询参数进行压测攻击&…...

Docker 部署前后端分离项目
1.Docker 1.1 什么是 Docker ? Docker 是一种开源的 容器化平台,用于开发、部署和运行应用程序。它通过 容器(Container) 技术,将应用程序及其依赖项打包在一个轻量级、可移植的环境中,确保应用在不同计算…...

云游戏混合架构
云游戏混合架构通过整合本地计算资源与云端能力,形成了灵活且高性能的技术体系,其核心架构及技术特征可概括如下: 一、混合架构的典型模式 分层混合模式 前端应用部署于公有云(如渲染流化服务),后端逻辑…...

【小红书】API接口,获取笔记核心数据
小红书笔记核心数据API接口详解 - 深圳小于科技提供专业数据服务 深圳小于科技(官网:https://www.szlessthan.com)推出的小红书笔记核心数据API接口,为开发者提供精准的笔记互动数据分析能力,助力内容运营与商业决策。…...

会议室钥匙总丢失?换预约功能的智能门锁更安全
在企业日常运营中,会议室作为重要的沟通与协作场所,其管理效率与安全性直接影响着企业的运作顺畅度。然而,传统会议室管理方式中钥匙丢失、管理不便等问题频发,给企业带来了不少困扰。近期,某企业引入了启辰智慧预约系…...

Redis底层数据结构之跳表(SkipList)
SkipList是Redis有序结合ZSet底层的数据结构,也是ZSet的灵魂所在。与之相应的,Redis还有一个无序集合Set,这两个在底层的实现是不一样的。 标准的SkipList: 跳表的本质是一个链表。链表这种结构虽然简单清晰,但是在查…...
跨架构镜像打包问题及解决方案
问题背景: 需求: 有一个镜像是 docker.io 的,是 docker.io/aquasec/kube-bench:v0.10.6,我想把该镜像在本地电脑(可翻墙)下载下来,然后 docker save 打包成一个 tar 包,传输到服务器…...
云原生时代 Kafka 深度实践:05性能调优与场景实战
5.1 性能调优全攻略 Producer调优 批量发送与延迟发送 通过调整batch.size和linger.ms参数提升吞吐量: props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 默认16KB props.put(ProducerConfig.LINGER_MS_CONFIG, 10); // 等待10ms以积累更多消息ba…...

Ubuntu安装Docker命令清单(以20.04为例)
在你虚拟机上完成Ubuntu的下载后打开终端!!! Ubuntu安装Docker终极命令清单(以20.04为例) # 1. 卸载旧版本(全新系统可跳过) sudo apt-get remove docker docker-engine docker.io containerd …...
使用 Python 制作 GIF 动图,并打包为 EXE 可执行程序
文章目录 成品百度网盘下载🎬 使用 Python 制作 GIF 动图,并打包为 EXE 可执行程序(含图形界面)🧰 环境准备💻 功能预览🧑💻 完整代码(图形界面 功能)如何…...

HarmonyOS Next 弹窗系列教程(2)
HarmonyOS Next 弹窗系列教程(2) 上一章节我们讲了自定义弹出框 (openCustomDialog),那对于一些简单的业务场景,不一定需要都是自定义,也可以使用 HarmonyOS Next 内置的一些弹窗效果。比如: 名称描述不依…...
Ubuntu 18.04 上源码安装 protobuf 3.7.0
🔧 1️⃣ 安装依赖 sudo apt update sudo apt install -y autoconf automake libtool curl make g unzip📥 2️⃣ 下载源码 cd ~ git clone https://github.com/protocolbuffers/protobuf.git cd protobuf git checkout v3.7.0⚙️ 3️⃣ 编译 & 安…...