Celery,一个实时处理的 Python 分布式系统
大家好!我是爱摸鱼的小鸿,关注我,收看每期的编程干货。
一个简单的库,也许能够开启我们的智慧之门,
一个普通的方法,也许能在危急时刻挽救我们于水深火热,
一个新颖的思维方式,也许能激发我们无尽的创造力,
一个独特的技巧,也许能成为我们的隐形盾牌……
神奇的 Python 库之旅,第 9 章
目录
- 一、什么是 Celery?
 - 二、为什么选择 Celery?
 - 三、Celery 编程示例
 - 四、总结
 - 五、作者Info
 
一、什么是 Celery?
Celery 是一个强大的工具,它能够帮助我们管理和调度复杂的任务。无论是处理异步任务、计划任务,还是分布式任务,Celery 都能轻松胜任。在这篇文章中,我们将深入探讨 Celery 的魅力,通过多个代码示例,带你全面了解这个神器。
 Celery 是一个简单、灵活且可靠的分布式系统,用于处理大量消息,并提供了维护这些任务的工具。它的使用场景非常广泛,比如电子邮件发送、视频处理、数据分析、Web 爬虫等。
 Github 项目地址:
https://github.com/celery/celery

 …
二、为什么选择 Celery?
选择 Celery 的理由有很多,这里列出几个主要的优势:
- 异步任务处理:能让你在不阻塞主进程的情况下处理任务;
 - 定时任务:支持类似 cron 的定时任务调度;
 - 分布式执行:支持将任务分发到多个机器上执行,提升系统性能;
 - 高可用性:可以与消息队列(如 RabbitMQ, Redis)结合,实现高可用性和可靠性。
 

 
在开始之前,我们需要安装 Celery 和一个消息代理(这里我们选择 Redis):
pip install celery redis
 
…
三、Celery 编程示例
快速开始:一个简单的任务
 让我们从一个简单的例子开始,创建一个任务来演示 Celery 的基本用法。
# tasks.py
from celery import Celery# 创建 Celery 实例
app = Celery('tasks', broker='redis://localhost:6379/0')@app.task
def add(x, y):return x + y
 
在这个例子中,我们定义了一个名为 add 的任务,它接收两个参数并返回它们的和。接下来,我们可以启动一个 Celery worker 来处理这个任务:
celery -A tasks worker --loglevel=info
 
启动 Celery worker 后,我们可以在 Python 交互式环境中调用这个任务:
>>> from tasks import add
>>> result = add.delay(4, 6)
>>> result.get(timeout=10)
10
 
深入挖掘:任务链和组
 Celery 的强大之处在于它能够处理复杂的工作流,比如任务链和任务组。
 任务链
 任务链允许我们将多个任务串联起来,前一个任务的输出作为下一个任务的输入:
from celery import chain# 定义任务
@app.task
def multiply(x, y):return x * y@app.task
def add_and_multiply(x, y, z):return chain(add.s(x, y), multiply.s(z))()# 调用任务链
result = add_and_multiply(2, 3, 4)
print(result.get())  # 输出 20,因为 (2 + 3) * 4 = 20
 
任务组
 任务组允许我们并行执行一组任务,并在所有任务完成后获得结果:
from celery import group# 定义任务组
@app.task
def sum_list(numbers):return sum(numbers)@app.task
def process_groups():return group(sum_list.s([1, 2, 3]), sum_list.s([4, 5, 6]), sum_list.s([7, 8, 9]))().get()# 调用任务组
result = process_groups()
print(result)  # 输出 [6, 15, 24]
 
…
 定时任务
 Celery 还支持定时任务,这类似于 Unix 系统的 cron 作业:
from celery import Celery
from celery.schedules import crontabapp = Celery('periodic_tasks', broker='redis://localhost:6379/0')@app.task
def scheduled_task():print("This task runs every 10 seconds")app.conf.beat_schedule = {'run-every-10-seconds': {'task': 'periodic_tasks.scheduled_task','schedule': 10.0,},
}# 启动 Celery beat 进程
celery -A periodic_tasks beat --loglevel=info
 
…
 错误处理与重试机制
 在实际应用中,任务失败是不可避免的。Celery 提供了优雅的错误处理和重试机制:
@app.task(bind=True, max_retries=3)
def unreliable_task(self):try:# 可能失败的操作risky_operation()except Exception as exc:# 捕获异常并重试raise self.retry(exc=exc, countdown=60)
 
在这个例子中,unreliable_task 如果失败,会在 60 秒后重试,总共重试 3 次。
 使用 Celery 在 Web 应用
 Celery 常用于 Web 应用中来处理后台任务。下面是一个简单的 Flask 应用,展示了如何集成 Celery:
from flask import Flask, request, jsonify
from celery import Celeryapp = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'def make_celery(app):celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])celery.conf.update(app.config)class ContextTask(celery.Task):def __call__(self, *args, **kwargs):with app.app_context():return self.run(*args, **kwargs)celery.Task = ContextTaskreturn celerycelery = make_celery(app)@celery.task
def long_task():import timetime.sleep(10)return 'Task completed!'@app.route('/start-task', methods=['POST'])
def start_task():task = long_task.apply_async()return jsonify({}), 202, {'Location': url_for('get_status', task_id=task.id)}@app.route('/status/<task_id>')
def get_status(task_id):task = long_task.AsyncResult(task_id)response = {'state': task.state,'result': task.result,}return jsonify(response)if __name__ == '__main__':app.run(debug=True)
 
在这个示例中,我们定义了一个 Flask 应用,并将 Celery 集成到其中。long_task 是一个模拟长时间运行的任务,客户端可以通过 start-task 路由启动任务,并通过 status/<task_id> 路由查询任务状态。
 更多功能、详细用法可参考官方文档:
https://docs.celeryq.dev/en/stable
…
四、总结
Celery 是一个强大的工具,它为我们处理异步任务、计划任务和分布式任务提供了极大的便利。在这篇文章中,我们通过多个代码示例,展示了 Celery 的基本用法、任务链和组、定时任务、错误处理和重试机制,以及如何在 Web 应用中集成 Celery。
 如果你在日常工作中需要处理复杂的任务调度和分布式任务,那么 Celery 绝对是一个值得深入学习和使用的工具。希望这篇文章能够帮助你更好地理解和使用 Celery,让你的工作更加高效、顺畅。
 试一试吧,Celery 会让你的任务调度变得简单又高效!
 
 …
五、作者Info
Author:小鸿的摸鱼日常
Goal:让编程更有趣! 专注于 Web 开发、爬虫,游戏开发,数据分析、自然语言处理,AI 等,期待你的关注,让我们一起成长、一起Coding!
版权说明:本文禁止抄袭、转载,侵权必究!
相关文章:
Celery,一个实时处理的 Python 分布式系统
大家好!我是爱摸鱼的小鸿,关注我,收看每期的编程干货。 一个简单的库,也许能够开启我们的智慧之门, 一个普通的方法,也许能在危急时刻挽救我们于水深火热, 一个新颖的思维方式,也许能…...
源码编译安装 LAMP
源码编译安装 LAMP Apache 网站服务基础Apache 简介安装 httpd 服务器 httpd 服务器的基本配置Web 站点的部署过程httpd.conf 配置文件 构建虚拟 Web 主机基于域名的虚拟主机基于IP 地址、基于端口的虚拟主机 MySQL 的编译安装构建 PHP 运行环境安装PHP软件包设置 LAMP 组件环境…...
PostgreSQL的pg_filedump工具
PostgreSQL的pg_filedump工具 基础信息 OS版本:Red Hat Enterprise Linux Server release 7.9 (Maipo) DB版本:16.2 pg软件目录:/home/pg16/soft pg数据目录:/home/pg16/data 端口:5777pg_filedump 是一个工具&#x…...
Java语言+后端+前端Vue,ElementUI 数字化产科管理平台 产科电子病历系统源码
Java语言后端前端Vue,ElementUI 数字化产科管理平台 产科电子病历系统源码 Java开发的数字化产科管理系统,已在多家医院实施,支持直接部署。系统涵盖孕产全程,包括门诊、住院、统计和移动服务,整合高危管理、智能提醒、档案追踪等…...
Linux 服务器环境搭建
一、安装 JDK 官网下载地址:https://www.oracle.com/java/technologies/downloads # 创建目录 mkdir /usr/local/java/# 解压 tar -zxvf jdk-8u333-linux-x64.tar.gz -C /usr/local/java/# 配置环境变量 vim /etc/profileexport export JAVA_HOME/usr/local/java/…...
RabbitMQ 更改服务端口号
需求 windows环境下,将RabbitMQ默认的端口号 5672 改为 11001 实现 本机RabbitMQ版本为3.8.16,找到配置文件位置,路径为:C:\Users\%USERNAME%\AppData\Roaming\RabbitMQ\advanced.config 配置文件默认内容为空 填写修改端口号…...
16:9横屏短视频素材库有哪些?横屏短视频素材网站分享
在这个视觉内容至关重要的时代,16:9横屏视频因其宽广的画面和优越的观赏体验,已经成为无数创作者和营销专家的首选格式。但要创造出吸引人的横屏视频,高质量的视频素材库是不可或缺的。不管你是资深视频制作人还是刚入行的新手,下…...
在Java中,创建一个实现了Callable接口的类可以提供强大的灵活性,特别是当你需要在多线程环境中执行任务并获取返回结果时。
在Java中,创建一个实现了Callable接口的类可以提供强大的灵活性,特别是当你需要在多线程环境中执行任务并获取返回结果时。以下是一个简单的案例,演示了如何创建一个实现了Callable接口的类,并在线程池中执行它。 首先࿰…...
Vuforia AR篇(八)— AR塔防上篇
目录 前言一、设置Vuforia AR环境1. 添加AR Camera2. 设置目标图像 二、创建塔防游戏基础1. 导入素材2. 搭建场景3. 创建敌人4. 创建脚本 前言 在增强现实(AR)技术快速发展的今天,Vuforia作为一个强大的AR开发平台,为开发者提供了…...
Spring AOP源码篇四之 数据库事务
了解了Spring AOP执行过程,再看Spring事务源码其实非常简单。 首先从简单使用开始, 演示Spring事务使用过程 Xml配置: <?xml version"1.0" encoding"UTF-8"?> <beans xmlns"http://www.springframework.org/schema…...
小波与傅里叶变换的对比(Python)
直接上代码,理论可以去知乎看。 #Import necessary libraries %matplotlib inline import numpy as np import matplotlib.pyplot as plt import seaborn as snsimport pywt from scipy.ndimage import gaussian_filter1d from scipy.signal import chirp import m…...
Linux-sqlplus安装
1.下载安装包 下载入口:安装包 下载对应版本: oracle-instantclient-sqlplus-21.14.0.0.0-1.x86_64.rpm oracle-instantclient-basic-21.14.0.0.0-1.x86_64.rpm oracle-instantclient-devel-21.14.0.0.0-1.x86_64.rpm 2.安装 [rootpromethues-01 tmp…...
LeetCode 算法:课程表 c++
原题链接🔗:课程表 难度:中等⭐️⭐️ 题目 你这个学期必须选修 numCourses 门课程,记为 0 到 numCourses - 1 。 在选修某些课程之前需要一些先修课程。 先修课程按数组 prerequisites 给出,其中 prerequisites[i]…...
前端面试题30(闭包和作用域链的关系)
闭包和作用域链在JavaScript中是紧密相关的两个概念,理解它们之间的关系对于深入掌握JavaScript的执行机制至关重要。 作用域链 作用域链是一个链接列表,它包含了当前执行上下文的所有父级执行上下文的变量对象。每当函数被调用时,JavaScri…...
A股本周在3000点以下继续筑底,本周依然继续探底?
夜已深,市场传来了3个浓烈的消息,炸锅了,恐有大事发生,马上告诉所有人: 消息面: 1、中国经济周刊首席评论员钮文新称:不要等中小投资者都彻底希望,销户离场了,才发现该…...
Javadoc介绍
Javadoc 是用于生成 Java 代码文档的工具。它利用特定的注释格式,将 Java 源代码中的注释提取出来,并生成 HTML 文档。Javadoc 注释通常位于类、接口、构造函数、方法和字段的声明之前,以 /** 开始,以 */ 结束。以下是 Javadoc 注释的一些主要元素和使用方法: 基本语法 …...
C# Application.DoEvents()的作用
文章目录 1、详解 Application.DoEvents()2、示例处理用户事件响应系统事件控制台输出游戏和多媒体应用与操作系统的交互 3、注意事项总结 Application.DoEvents() 是 .NET 框架中的一个方法,它主要用于处理消息队列中的事件。在 Windows 应用程序中,当一…...
IDEA如何创建原生maven子模块
文件 -> 新建 -> 新模块 -> Maven ArcheTypeMaven ArcheType界面中的输入框介绍 名称:子模块的名称位置:子模块存放的路径名创建Git仓库:子模块不单独作为一个git仓库,无需勾选JDK:JDK版本号父项:…...
LCD EMC 辐射 测试随想
最近做几个产品过认证。 有带2.8寸 MCU8080接口的小屏(320 X 240),也有RGB接口的10.1寸的大屏(800*600). 以下为个人随想,不知道是否正确,仅作记录。 测试发现辐射的核心问题还是在于时钟及其倍频所产生的尖峰。 记得读…...
Docker安装遇到问题:curl: (7) Failed to connect to download.docker.com port 443: 拒绝连接
问题描述 首先,完全按照Docker官方文档进行安装: Install Docker Engine on Ubuntu | Docker Docs 在第1步:Set up Dockers apt repository,执行如下指令: sudo curl -fsSL https://download.docker.com/linux/ubu…...
XML Group端口详解
在XML数据映射过程中,经常需要对数据进行分组聚合操作。例如,当处理包含多个物料明细的XML文件时,可能需要将相同物料号的明细归为一组,或对相同物料号的数量进行求和计算。传统实现方式通常需要编写脚本代码,增加了开…...
CTF show Web 红包题第六弹
提示 1.不是SQL注入 2.需要找关键源码 思路 进入页面发现是一个登录框,很难让人不联想到SQL注入,但提示都说了不是SQL注入,所以就不往这方面想了  先查看一下网页源码,发现一段JavaScript代码,有一个关键类ctfs…...
MMaDA: Multimodal Large Diffusion Language Models
CODE : https://github.com/Gen-Verse/MMaDA Abstract 我们介绍了一种新型的多模态扩散基础模型MMaDA,它被设计用于在文本推理、多模态理解和文本到图像生成等不同领域实现卓越的性能。该方法的特点是三个关键创新:(i) MMaDA采用统一的扩散架构…...
oracle与MySQL数据库之间数据同步的技术要点
Oracle与MySQL数据库之间的数据同步是一个涉及多个技术要点的复杂任务。由于Oracle和MySQL的架构差异,它们的数据同步要求既要保持数据的准确性和一致性,又要处理好性能问题。以下是一些主要的技术要点: 数据结构差异 数据类型差异ÿ…...
跨链模式:多链互操作架构与性能扩展方案
跨链模式:多链互操作架构与性能扩展方案 ——构建下一代区块链互联网的技术基石 一、跨链架构的核心范式演进 1. 分层协议栈:模块化解耦设计 现代跨链系统采用分层协议栈实现灵活扩展(H2Cross架构): 适配层…...
2025 后端自学UNIAPP【项目实战:旅游项目】6、我的收藏页面
代码框架视图 1、先添加一个获取收藏景点的列表请求 【在文件my_api.js文件中添加】 // 引入公共的请求封装 import http from ./my_http.js// 登录接口(适配服务端返回 Token) export const login async (code, avatar) > {const res await http…...
VTK如何让部分单位不可见
最近遇到一个需求,需要让一个vtkDataSet中的部分单元不可见,查阅了一些资料大概有以下几种方式 1.通过颜色映射表来进行,是最正规的做法 vtkNew<vtkLookupTable> lut; //值为0不显示,主要是最后一个参数,透明度…...
什么是EULA和DPA
文章目录 EULA(End User License Agreement)DPA(Data Protection Agreement)一、定义与背景二、核心内容三、法律效力与责任四、实际应用与意义 EULA(End User License Agreement) 定义: EULA即…...
CMake控制VS2022项目文件分组
我们可以通过 CMake 控制源文件的组织结构,使它们在 VS 解决方案资源管理器中以“组”(Filter)的形式进行分类展示。 🎯 目标 通过 CMake 脚本将 .cpp、.h 等源文件分组显示在 Visual Studio 2022 的解决方案资源管理器中。 ✅ 支持的方法汇总(共4种) 方法描述是否推荐…...
解读《网络安全法》最新修订,把握网络安全新趋势
《网络安全法》自2017年施行以来,在维护网络空间安全方面发挥了重要作用。但随着网络环境的日益复杂,网络攻击、数据泄露等事件频发,现行法律已难以完全适应新的风险挑战。 2025年3月28日,国家网信办会同相关部门起草了《网络安全…...
