当前位置: 首页 > article >正文

Celery 核心概念详解及示例

Celery 核心概念详解及示例

Celery 是一个简单、灵活且可靠的分布式系统,用于处理大量消息,提供对任务队列的操作,并支持任务的调度和异步执行。它常用于深度优化 Web 应用的性能和响应速度,通过将耗时的操作移到后台异步执行,使主程序不被阻塞。

本文将统一介绍 Celery 中的核心概念,并通过示例进行说明,帮助您深入理解并应用 Celery 来构建高性能的异步任务处理系统。


目录

  • Celery 核心概念详解及示例
    • 任务(Task)
      • 示例
    • 工作者(Worker)
      • 启动 Worker
    • 消息代理(Broker)
      • 配置示例
    • 客户端(Client)
      • 任务调用示例
    • 结果后端(Result Backend)
      • 配置示例
      • 获取任务结果
    • 任务队列(Queue)
      • 定义队列中的任务
      • 启动 Worker 监听指定队列
    • 任务路由(Task Routing)
      • 配置路由
    • 并发(Concurrency)
      • 启动 Worker 指定并发方式
    • 定时任务(Periodic Tasks)
      • 配置定时任务
      • 启动 Celery Beat
    • 任务重试与超时(Task Retry and Timeout)
      • 设置任务重试
      • 设置任务超时
    • 任务组与工作流(Task Groups and Workflows)
      • 组任务(Group)
      • 链任务(Chain)
    • 示例项目结构
      • tasks.py
      • app.py
      • worker.sh
    • 总结
    • 参考资料


任务(Task)

任务(Task) 是 Celery 的基本执行单元,表示要执行的具体函数或操作。在 Celery 中,任务通常由装饰器 @task@shared_task 声明。

示例

# tasks.py
from celery import Celeryapp = Celery('tasks', broker='redis://localhost:6379/0')@app.task
def add(x, y):return x + y
  • 说明
    • 使用 @app.task 装饰器将函数 add 声明为一个 Celery 任务。
    • 任务可以被工作者执行,并可以异步调用。

工作者(Worker)

工作者(Worker) 是实际执行任务的进程或节点。工作者从消息代理中获取任务,执行任务函数,并返回结果。

启动 Worker

celery -A tasks worker --loglevel=INFO
  • 说明
    • -A tasks:指定 Celery 应用实例的位置。
    • worker:启动一个工作者进程。
    • --loglevel=INFO:设置日志级别。

消息代理(Broker)

消息代理(Broker) 是任务队列的中介,负责接收来自客户端的任务,并将任务分发给工作者。常用的消息代理包括 RedisRabbitMQAmazon SQS 等。

配置示例

# 使用 Redis 作为消息代理
app = Celery('tasks', broker='redis://localhost:6379/0')
  • 说明
    • broker 参数指定消息代理的连接 URL。

客户端(Client)

客户端(Client) 是发送任务到消息代理的应用程序。它可以是 Web 应用、脚本等,使用 Celery 的 API 将任务异步地发送到 Broker。

任务调用示例

# 调用加法任务
from tasks import addresult = add.delay(4, 6)
  • 说明
    • 使用 delay() 方法将任务发送到消息代理,立即返回一个 AsyncResult 对象。
    • 任务将在后台执行,客户端无需等待任务完成。

结果后端(Result Backend)

结果后端(Result Backend) 用于存储任务执行的结果,允许客户端在任务执行完成后获取结果。常用的结果后端包括 RedisRabbitMQ数据库 等。

配置示例

# 使用 Redis 作为结果后端
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

获取任务结果

# 获取任务结果
result = add.delay(4, 6)
print(result.get(timeout=10))  # 输出 10
  • 说明
    • result.get(timeout=10):等待任务完成并获取结果,超时设置为 10 秒。

任务队列(Queue)

任务队列(Queue) 是存储任务的队列,允许将任务分类、路由和分配。队列的使用可以实现任务的隔离和资源的优化。

定义队列中的任务

@app.task(queue='priority_high')
def high_priority_task():pass

启动 Worker 监听指定队列

celery -A tasks worker -Q priority_high --loglevel=INFO
  • 说明
    • -Q priority_high:指定 Worker 只监听名为 priority_high 的队列。

任务路由(Task Routing)

任务路由(Task Routing) 用于将任务分配到特定的队列,可以在任务定义、配置文件或运行时指定。

配置路由

# 配置路由规则
app.conf.task_routes = {'tasks.add': {'queue': 'math_queue'},'tasks.mul': {'queue': 'math_queue'},'tasks.email': {'queue': 'email_queue'},
}
  • 说明
    • addmul 任务路由到 math_queue 队列,email 任务路由到 email_queue 队列。

并发(Concurrency)

Celery 支持多种并发方式,包括多进程、多线程和协程。常用的并发池有 prefork(默认,多进程)、eventlet(协程)、gevent(协程)。

启动 Worker 指定并发方式

# 使用多进程并发,设置并发数为 4
celery -A tasks worker --concurrency=4# 使用 gevent 并发
celery -A tasks worker -P gevent --concurrency=1000
  • 说明
    • --concurrency=4:设置并发的工作进程数。
    • -P gevent:指定并发池为 gevent,适用于 I/O 密集型任务。

定时任务(Periodic Tasks)

定时任务(Periodic Tasks) 是指按照预设的时间间隔或特定时间点自动执行的任务。Celery 提供了 Celery Beat 调度器来管理定时任务。

配置定时任务

from celery.schedules import crontabapp.conf.beat_schedule = {'add-every-30-seconds': {'task': 'tasks.add','schedule': 30.0,'args': (16, 16)},'multiply-at-midnight': {'task': 'tasks.mul','schedule': crontab(hour=0, minute=0),'args': (4, 5),},
}

启动 Celery Beat

celery -A tasks beat --loglevel=INFO
  • 说明
    • Celery Beat 会按照配置的计划周期性地发送任务到 Broker。

任务重试与超时(Task Retry and Timeout)

任务在执行过程中可能失败,Celery 提供了任务重试和超时机制,增强任务的可靠性。

设置任务重试

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def unstable_task(self, *args):try:# 可能出现异常的代码passexcept Exception as exc:# 任务失败后重试raise self.retry(exc=exc)
  • 说明
    • max_retries=3:最大重试次数为 3 次。
    • default_retry_delay=60:每次重试之间的间隔为 60 秒。
    • self.retry(exc=exc):任务失败时抛出 retry 异常,实现重试。

设置任务超时

@app.task(time_limit=300, soft_time_limit=200)
def long_running_task():pass
  • 说明
    • time_limit=300:任务的硬超时时间为 300 秒,超时后工作进程会被杀死。
    • soft_time_limit=200:任务的软超时时间为 200 秒,超时后会抛出 SoftTimeLimitExceeded 异常,可以在任务中捕获并处理。

任务组与工作流(Task Groups and Workflows)

Celery 支持任务的组合执行,包括组任务(Group Tasks)链任务(Chain Tasks)回调(Callbacks),用于构建复杂的工作流。

组任务(Group)

同时执行一组独立的任务,所有任务完成后返回结果列表。

from celery import group# 定义任务组
job = group(add.s(i, i) for i in range(10))
result = job.apply_async()# 获取所有任务的结果
print(result.get())
  • 说明
    • add.s(i, i):生成任务的签名。
    • group():创建任务组。
    • apply_async():异步执行任务组。

链任务(Chain)

顺序执行一系列任务,每个任务的结果作为下一个任务的输入。

from celery import chain# 定义任务链
job = chain(add.s(2, 2) | mul.s(4))result = job.apply_async()# 获取最终结果
print(result.get())  # 输出 (2 + 2) * 4 = 16
  • 说明
    • | 运算符连接任务,表示顺序执行。

示例项目结构

一个简单的 Celery 项目通常包含以下结构:

project/
│
├── tasks.py          # 定义任务
├── app.py            # 定义 Celery 应用
└── worker.sh         # 启动 Worker 的脚本

tasks.py

from app import app@app.task
def add(x, y):return x + y@app.task
def mul(x, y):return x * y

app.py

from celery import Celeryapp = Celery('project',broker='redis://localhost:6379/0',backend='redis://localhost:6379/0')# 可在此处配置任务路由、定时任务等

worker.sh

#!/bin/bash
celery -A app worker --loglevel=INFO

总结

Celery 通过引入 任务工作者消息代理结果后端任务队列 等核心概念,构建了一个强大的异步任务处理框架。通过合理地配置和使用这些概念,您可以:

  • 异步执行耗时任务,提高应用程序的响应速度。
  • 实现任务的调度和重试,增强系统的可靠性。
  • 优化资源利用率,通过并发和队列管理提升性能。
  • 构建复杂的任务工作流,满足多样化的业务需求。

希望通过本文对 Celery 核心概念的统一介绍和示例说明,您可以更好地理解和应用 Celery,在实际项目中构建高性能的异步任务处理系统。


参考资料

  • Celery 官方文档
  • Celery 入门教程
  • Celery 最佳实践
  • 使用 Celery 构建异步任务队列

相关文章:

Celery 核心概念详解及示例

Celery 核心概念详解及示例 Celery 是一个简单、灵活且可靠的分布式系统,用于处理大量消息,提供对任务队列的操作,并支持任务的调度和异步执行。它常用于深度优化 Web 应用的性能和响应速度,通过将耗时的操作移到后台异步执行&am…...

欢乐熊大话蓝牙知识14:用 STM32 或 EFR32 实现 BLE 通信模块:从0到蓝牙,你也能搞!

🚀 用 STM32 或 EFR32 实现 BLE 通信模块:从0到蓝牙,你也能搞! “我能不能自己用 STM32 或 EFR32 实现一个 BLE 模块?” 答案当然是:能!还能很帅! 👨‍🏭 前…...

IDEA 在公司内网配置gitlab

赋值项目链接 HTTPS 将HTTP的链接 ip地址换成 内网地址 例如:https:172.16.100.18/...... 如果出现需要需要Token验证的情况: 参考:Idea2024中拉取代码时GitLab提示输入token的问题_gitlab token-CSDN博客...

黑马Java面试笔记之 微服务篇(业务)

一. 限流 你们项目中有没有做过限流?怎么做的? 为什么要限流呢? 一是并发的确大(突发流量) 二是防止用户恶意刷接口 限流的实现方式: Tomcat:可以设置最大连接数 可以通过maxThreads设置最大Tomcat连接数,实现限流,但是适用于单体架构 Nginx:漏桶算法网关,令牌桶算法自定…...

通过WiFi无线连接小米手机摄像头到电脑的方法

通过WiFi无线连接小米手机摄像头到电脑的方法 以下是基于Scrcpy和DroidCam两种工具的无线连接方案,需提前完成开发者模式与USB调试的开启(参考原教程步骤): 方法一:Scrcpy无线投屏(无需手机端安装&#xf…...

长短期记忆(LSTM)网络模型

一、概述 长短期记忆(Long Short-Term Memory,LSTM)网络是一种特殊的循环神经网络(RNN),专门设计用于解决传统 RNN 在处理长序列数据时面临的梯度消失 / 爆炸问题,能够有效捕捉长距离依赖关系。…...

深入理解 Linux 文件系统与日志文件分析

一、Linux 文件系统概述 1. 文件系统的基本概念 文件系统(File System)是操作系统用于管理和组织存储设备上数据的机制。它提供了一种结构,使得用户和应用程序能够方便地存储和访问数据。 2. Linux 文件系统结构 Linux 文件系统采用树状目…...

CSS3美化页面元素

1. 字体 <span>标签 字体样式⭐ 字体类型&#xff08;font-family&#xff09; 字体大小&#xff08;font-size&#xff09; 字体风格&#xff08;font-style&#xff09; 字体粗细&#xff08;font-weight&#xff09; 字体属性&#xff08;font&#xff09; 2. 文本 文…...

网络安全-等级保护(等保)3-0 等级保护测评要求现行技术标准

################################################################################ 第三章&#xff1a;测评要求、测评机构要求&#xff0c;最终目的是通过测评&#xff0c;所以我们将等保要求和测评相关要求一一对应形成表格。 GB/T 28448-2019 《信息安全技术 网络安全等…...

WPS 利用 宏 脚本拆分 Excel 多行文本到多行

文章目录 WPS 利用 宏 脚本拆分 Excel 多行文本到多行效果需求背景&#x1f6e0; 操作步骤代码实现代码详解使用场景注意事项总结 WPS 利用 宏 脚本拆分 Excel 多行文本到多行 在 Excel 工作表中&#xff0c;我们经常遇到一列中包含多行文本&#xff08;用换行符分隔&#xff…...

R语言错误处理方法大全

在R语言的批量运行中&#xff0c;常需要自动跳过错误&#xff0c;继续向下运行。 1、使用 tryCatch() 捕获错误并返回占位符 # 示例&#xff1a;循环中跳过错误继续执行 results <- numeric(5) # 预分配结果向量for(i in 1:5) {# 用 tryCatch 包裹可能出错的代码results[…...

AI“实体化”革命:具身智能如何重构体育、工业与未来生活

近年来&#xff0c;人工智能&#xff08;AI&#xff09;技术的飞速发展正在重塑各行各业&#xff0c;而具身智能&#xff08;Embodied AI&#xff09;作为AI领域的重要分支&#xff0c;正逐渐从实验室走向现实应用。具身智能的核心在于让AI系统具备物理实体&#xff0c;能够与环…...

Opencv4 c++ 自用笔记 05 形态学操作

图像形态学主要获取物体的形状与位置信息。利用具有一定形态的结构元素度量和提取图像中的对应形状&#xff0c;达到对图像分析和识别的目的。操作主要包括腐蚀、膨胀、开运算和闭运算。 像素距离与连通域 图像形态学中&#xff0c;将不与其他区域链接的独立区域称为集合或者…...

DrissionPage 数据提取技巧全解析:从入门到实战

在当今数据驱动的时代&#xff0c;网页数据提取已成为自动化办公、市场分析和爬虫开发的核心技能。作为新一代网页自动化工具&#xff0c;DrissionPage 以其独特的双模式融合设计&#xff08;Selenium Requests&#xff09;脱颖而出。本文将结合官方文档与实战案例&#xff0c…...

如何构建自适应架构的镜像

目标 我有一个服务叫xxx&#xff0c;一开始它运行在x86架构的机器上&#xff0c;所以最开始有个xxx:stable-amd64的镜像&#xff0c;后来它又需要运行在arm64架构的机器上&#xff0c;所以又重新打了个xxx:stable-arm64的镜像 但是对于安装脚本来说&#xff0c;我不希望我在拉…...

R语言基础| 创建数据集

在R语言中&#xff0c;有多种数据类型&#xff0c;用以存储和处理数据。每种数据类型都有其特定的用途和操作函数&#xff0c;使得R语言在处理各种数据分析任务时非常灵活和强大&#xff1a; 向量&#xff08;Vector&#xff09;: 向量是R语言中最基本的数据类型&#xff0c;它…...

剑指offer15_数值的整数次方

数值的整数次方 实现函数 double Power(double base, int exponent) 题目要求 计算 base exponent \text{base}^{\text{exponent}} baseexponent&#xff1a; 不得使用库函数不需要考虑大数问题&#xff0c;绝对误差不超过 10 − 2 10^{-2} 10−2不会出现底数和指数同为 0…...

Centos7搭建zabbix6.0

此方法适用于zabbix6以上版本zabbix6.0前期环境准备&#xff1a;Lamp&#xff08;linux httpd mysql8.0 php&#xff09;mysql官网下载位置&#xff1a;https://dev.mysql.com/downloads/mysql/Zabbix源码包地址&#xff1a;https://www.zabbix.com/cn/download_sourcesZabbix6…...

使用Redis的四个常见问题及其解决方案

Redis 缓存穿透 定义&#xff1a;redis查询一个不存在的数据&#xff0c;导致每次都查询数据库 解决方案&#xff1a; 如果查询的数据为空&#xff0c;在redis对应的key缓存空数据&#xff0c;并设置短TTL。 因为缓存穿透通常是因为被恶意用不存在的查询参数进行压测攻击&…...

Docker 部署前后端分离项目

1.Docker 1.1 什么是 Docker &#xff1f; Docker 是一种开源的 容器化平台&#xff0c;用于开发、部署和运行应用程序。它通过 容器&#xff08;Container&#xff09; 技术&#xff0c;将应用程序及其依赖项打包在一个轻量级、可移植的环境中&#xff0c;确保应用在不同计算…...

云游戏混合架构

云游戏混合架构通过整合本地计算资源与云端能力&#xff0c;形成了灵活且高性能的技术体系&#xff0c;其核心架构及技术特征可概括如下&#xff1a; 一、混合架构的典型模式 分层混合模式‌ 前端应用部署于公有云&#xff08;如渲染流化服务&#xff09;&#xff0c;后端逻辑…...

【小红书】API接口,获取笔记核心数据

小红书笔记核心数据API接口详解 - 深圳小于科技提供专业数据服务 深圳小于科技&#xff08;官网&#xff1a;https://www.szlessthan.com&#xff09;推出的小红书笔记核心数据API接口&#xff0c;为开发者提供精准的笔记互动数据分析能力&#xff0c;助力内容运营与商业决策。…...

会议室钥匙总丢失?换预约功能的智能门锁更安全

在企业日常运营中&#xff0c;会议室作为重要的沟通与协作场所&#xff0c;其管理效率与安全性直接影响着企业的运作顺畅度。然而&#xff0c;传统会议室管理方式中钥匙丢失、管理不便等问题频发&#xff0c;给企业带来了不少困扰。近期&#xff0c;某企业引入了启辰智慧预约系…...

Redis底层数据结构之跳表(SkipList)

SkipList是Redis有序结合ZSet底层的数据结构&#xff0c;也是ZSet的灵魂所在。与之相应的&#xff0c;Redis还有一个无序集合Set&#xff0c;这两个在底层的实现是不一样的。 标准的SkipList&#xff1a; 跳表的本质是一个链表。链表这种结构虽然简单清晰&#xff0c;但是在查…...

跨架构镜像打包问题及解决方案

问题背景&#xff1a; 需求&#xff1a; 有一个镜像是 docker.io 的&#xff0c;是 docker.io/aquasec/kube-bench:v0.10.6&#xff0c;我想把该镜像在本地电脑&#xff08;可翻墙&#xff09;下载下来&#xff0c;然后 docker save 打包成一个 tar 包&#xff0c;传输到服务器…...

云原生时代 Kafka 深度实践:05性能调优与场景实战

5.1 性能调优全攻略 Producer调优 批量发送与延迟发送 通过调整batch.size和linger.ms参数提升吞吐量&#xff1a; props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 默认16KB props.put(ProducerConfig.LINGER_MS_CONFIG, 10); // 等待10ms以积累更多消息ba…...

Ubuntu安装Docker命令清单(以20.04为例)

在你虚拟机上完成Ubuntu的下载后打开终端&#xff01;&#xff01;&#xff01; Ubuntu安装Docker终极命令清单&#xff08;以20.04为例&#xff09; # 1. 卸载旧版本&#xff08;全新系统可跳过&#xff09; sudo apt-get remove docker docker-engine docker.io containerd …...

使用 Python 制作 GIF 动图,并打包为 EXE 可执行程序

文章目录 成品百度网盘下载&#x1f3ac; 使用 Python 制作 GIF 动图&#xff0c;并打包为 EXE 可执行程序&#xff08;含图形界面&#xff09;&#x1f9f0; 环境准备&#x1f4bb; 功能预览&#x1f9d1;‍&#x1f4bb; 完整代码&#xff08;图形界面 功能&#xff09;如何…...

HarmonyOS Next 弹窗系列教程(2)

HarmonyOS Next 弹窗系列教程&#xff08;2&#xff09; 上一章节我们讲了自定义弹出框 (openCustomDialog)&#xff0c;那对于一些简单的业务场景&#xff0c;不一定需要都是自定义&#xff0c;也可以使用 HarmonyOS Next 内置的一些弹窗效果。比如&#xff1a; 名称描述不依…...

Ubuntu 18.04 上源码安装 protobuf 3.7.0

&#x1f527; 1️⃣ 安装依赖 sudo apt update sudo apt install -y autoconf automake libtool curl make g unzip&#x1f4e5; 2️⃣ 下载源码 cd ~ git clone https://github.com/protocolbuffers/protobuf.git cd protobuf git checkout v3.7.0⚙️ 3️⃣ 编译 & 安…...