Celery的任务流
Celery的任务流
在之前调用任务的时候只是使用delay()和apply_async()方法。但是有时我们并不想简单的执行单个异步任务,比如说需要将某个异步任务的结果作为另一个异步任务的参数或者需要将多个异步任务并行执行,返回一组返回值,为了实现此目标,Celery使用一种叫做signatures的东西
celery的简单使用
signature的引入
signature官方文档
可以简单理解为signature是将之前的异步任务以某种方式包装,包装后的异步任务仍可以使用之前的delay()和apply_async()方法,并且包装后的异步任务就可以以多种方式组合成复杂的工作流程
先创建一个tasks.py
import timeimport celerybroker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = celery.Celery('app', backend=backend, broker=broker)# 加
@app.task
def add_num(a, b):print(f'{a}+{b}')time.sleep(3) # 做延时处理,方便后面查看任务执行顺序c = a + breturn c# 减
@app.task
def subtract_num(a, b):print(f'{a}-{b}')time.sleep(3) # 做延时处理,方便后面查看任务执行顺序c = a - breturn c# 乘
@app.task
def multiply_num(a, b):print(f'{a}*{b}')time.sleep(3) # 做延时处理,方便后面查看任务执行顺序c = a * breturn c# 除
@app.task
def divide_num(a, b):print(f'{a}/{b}')time.sleep(3) # 做延时处理,方便后面查看任务执行顺序c = a / breturn c@app.task
def test(args):print(args)return args
运行celery消费者
# 格式为:celery -A app对象所在的文件 worker -l 日志级别 -Q 队列名称(也可以不指定,默认为celry)
celery -A tasks worker -l info -Q test
用signature对上面的add_num包装
from celery import signature
from tasks import add_num, subtract_num, multiply_num, divide_num# 方法1
sign = add_num.signature((1, 1), queue='test')
ret = sign.delay()
print(ret.get())# 方法2
sign = signature('tasks.add_num', (1, 1), queue='test')
ret = sign.delay()
print(ret.get())# 方法3
sign = signature(add_num, (1, 1), queue='test')
ret = sign.delay()
print(ret.get())
chain的使用
chain官方链接
chain可以将signature包装的任务函数一个一个执行,一个执行完将执行return结果传递给下一个任务函数
from celery import signature, chain
from tasks import add_num, subtract_num, multiply_num, divide_numadd_sign = signature(add_num, (6, 2), queue='test')
subtract_sign = signature(subtract_num, (2,), queue='test')
multiply_sign = signature(multiply_num, (2,), queue='test')
divide_sign = signature(divide_num, (2,), queue='test')# 对某个数依次做加减乘除处理
chain1 = chain(add_sign, subtract_sign, multiply_sign, divide_sign)
ret = chain1.delay()
print(ret.get())
可以看到异步任务依次执行,并将上一个异步任务的结果作为参数传递给下一个,形成一个链条
group的使用
group官方链接
group可以将signature包装的任务函数并行执行,返回一组返回值
from celery import signature, chain, group
from tasks import add_num, subtract_num, multiply_num, divide_numadd_sign = signature(add_num, (6, 2), queue='test')
subtract_sign = signature(subtract_num, (6, 2), queue='test')
multiply_sign = signature(multiply_num, (6, 2), queue='test')
divide_sign = signature(divide_num, (6, 2), queue='test')# 对某个数分别做加减乘除处理group1 = group(add_sign, subtract_sign, multiply_sign, divide_sign)
ret = group1.delay()
print(ret.get())
#[8, 4, 12, 3.0]
可以看到相比于chain,group里的任务是同时执行
chord的使用
chord官方链接
依赖一个group任务,group任务结束后,将所有子任务的返回值作为参数传递给chord的回调函数,即chord由group任务组与回调函数组成
上代码
from celery import signature, chain, group, chord
from tasks import add_num, subtract_num, multiply_num, divide_num, testadd_sign = signature(add_num, (6, 2), queue='test')
subtract_sign = signature(subtract_num, (6, 2), queue='test')
multiply_sign = signature(multiply_num, (6, 2), queue='test')
divide_sign = signature(divide_num, (6, 2), queue='test')group1 = group(add_sign, subtract_sign, multiply_sign, divide_sign)#包装test异步任务函数
test_sign = signature(test, queue='test')c1 = chord(group1, test_sign)
c1.delay()
可以看出,在执行完加减乘除所有异步任务后,chord会将任务组的结果作为list交给test函数,这里的test有点像回调函数
PS:根据我的观察,chain,group,chord在执行完后都会返回一个任务id,其中chain的任务id为任务链里最后一个任务的id,group的任务id是一个临时的任务id(group任务都结束后就会消失),chord的任务id是回调函数的任务id。因此chain和chord在任务结束后,任务结果还是可以查到的,而group则查询不到,因此group的任务结果可能无法用AsyncResult查询到
最后附上celery关于任务工作流的官方链接
celery工作流
PS
有的时候我们可能需要在celery的task函数中调用其他的celery函数,并且需要同步的获取结果(其实着本质上就是把异步的celery函数变成同步运行),具体如下,先创建一个tasks.py
import timeimport celerybroker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = celery.Celery('app', backend=backend, broker=broker)# 加
@app.task
def add_num(a, b):print(f'{a}+{b}')time.sleep(3) # 做延时处理,方便后面查看任务执行顺序c = a + breturn c@app.task
def test():#在test函数里调用add_num函数,并且同步获取结果,将结果作为test函数的返回值ret = add_num.delay(1,2)ret = ret.get()return ret
#启动消费者
celery -A tasks worker -l info
调用test异步函数
from tasks import test
ret = test.delay()
结果就是出错了,因为官方不建议在一个异步任务中区等待另一个异步任务的返回结果,所以这个时候就可以通过上面的chain方法实现这个需求。当然还有一种不建议的方法就是在同步获取celery任务结果的get方法中添加参数disable_sync_subtasks=False,具体如下
import timeimport celerybroker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = celery.Celery('app', backend=backend, broker=broker)# 加
@app.task
def add_num(a, b):print(f'{a}+{b}')time.sleep(3) # 做延时处理,方便后面查看任务执行顺序c = a + breturn c@app.task
def test():ret = add_num.delay(1, 2)ret = ret.get(disable_sync_subtasks=False)#在这添加disable_sync_subtasks=Falsereturn ret
再调用一次test方法
成功调用
详见celery官方链接
链接传送门
结语
写这些,仅记录自己学习使用celery的过程。如果有什么错误的地方,还请大家批评指正。最后,希望小伙伴们都能有所收获。
相关文章:

Celery的任务流
Celery的任务流 在之前调用任务的时候只是使用delay()和apply_async()方法。但是有时我们并不想简单的执行单个异步任务,比如说需要将某个异步任务的结果作为另一个异步任务的参数或者需要将多个异步任务并行执行,返回一组返回值,为了实现此…...

使用Arcpy进行数据批处理-批量裁剪
时空大数据使我们面临前所未有的机遇和挑战,尤其在地学、遥感或空间技术等专业领域,无疑是一个全新的时代。 伴随着时空大数据的到来,海量数据的处理是一个所有科研工作者都无法忽视的重要问题。传统的数据(主要指空间数据&#x…...

【攻防世界】ics-05
php://filter 伪协议查看源码 preg_replace 函数漏洞 1.获取网页源代码。多点点界面,发现点云平台设备维护中心时,页面发生变化。 /?pageindex 输入什么显示什么,有回显。 用php://filter读取网页源代码 ?pagephp://filter/readconvert.…...
VTK的交互器
VTK中鼠标消息是在交互类型对象(interactorstyle)中响应,因此通过为交互类型对象(interactorstyle)添加观察者(observer)来监听相应的消息,当消息触发时,由命令模式执行相…...

ChatGPT(3.5版本)开放无需注册:算力背后的数据之战悄然打响
✨✨ 欢迎大家来访Srlua的博文(づ ̄3 ̄)づ╭❤~✨✨ 🌟🌟 欢迎各位亲爱的读者,感谢你们抽出宝贵的时间来阅读我的文章。 我是Srlua小谢,在这里我会分享我的知识和经验。&am…...
python项目练习——14.学生管理系统
这个项目可以让用户管理学生的信息,包括学生的姓名、年龄、成绩等,并提供添加、编辑、删除、查询等功能。这个项目涉及到数据库操作、用户界面设计、数据验证等方面的技术。 代码示例: import tkinter as tk # 导入 Tkinter 库 import sqli…...

基于SpringBoot的公益慈善平台
一、项目背景介绍: 基于SpringBoot的公益慈善平台是一款致力于为社会所有人带来便利服务的B/S架构的应用程序。随着网络技术的发展,公益慈善网站已经逐渐成为公益行动的主要信息载体。在这个平台上,主要有管理员、捐赠者和志愿者三种角色&…...

Python网络爬虫(一):HTML/CSS/JavaScript介绍
1 HTML语言 1.1 HTML简介 HTML指的是超文本标记语言:HyperText Markup Language,它不是一门编程语言,而是一种标记语言,即一套标记标签。HTML是纯文本类型的语言,使用HTML编写的网页文件也是标准的文本文件,可以使用任意的文本编辑器例如记事本打开HTML文件,查看并修改H…...

机器学习每周挑战——旅游景点数据分析
数据的截图,数据的说明: # 字段 数据类型 # 城市 string # 名称 string # 星级 string # 评分 float # 价格 float # 销量 int # 省/市/区 string # 坐标 string # 简介 string # 是否免费 bool # 具体地址 string拿到数据…...
开发语言漫谈-C语言
个人认为C语言是最伟大的开发语言(没有之一)。C语言开创了高级语言的新时代。比C更低级的是汇编语言,这个东西就是反人类的玩意。之后的语言或多或少都受C语言的影响。更神奇的是直到现在,C语言还有生命力。C语言的发明人丹尼斯里…...
vue3导入excel并解析excel数据渲染到表格中,纯前端实现。
需求 用户将已有的excel上传到系统,并将excel数据同步到页面的表格中进行二次编辑,由于excel数据不是最终数据,只是批量的一个初始模板,后端不需要存储,所以该功能由前端独立完成。 吐槽 系统中文件上传下载预览三部…...
Java常用API之Encoders类解读
写在开头:本文用于作者学习Java常用API 我将官方文档中Encoders类中所有API全测了一遍并打印了结果,日拱一卒,常看常新 在Spark中,Encoders类提供了一些静态方法用于创建不同数据类型的编码器。 首先,我遇到这样一个…...

java中大型医院HIS系统源码 Angular+Nginx+SpringBoot云HIS运维平台源码
java中大型医院HIS系统源码 AngularNginxSpringBoot云HIS运维平台源码 云HIS系统是一款满足基层医院各类业务需要的健康云产品。该产品能帮助基层医院完成日常各类业务,提供病患预约挂号支持、病患问诊、电子病历、开药发药、会员管理、统计查询、医生工作站和护士工…...

windows部署Jenkins并远程部署tomcat
目录 1、Jenkins官网下载Jenkins 2、安装Jenkins 3、修改Home directory 4、插件安装及系统配置 5、Tomcat安装及配置 5.1、修改配置文件,屏蔽以下代码 5.2、新增登录用户 5.3、编码格式修改 5.4、启动tomcat 6、Jenkins远程部署war包 6.1、General配置 6.2、Sourc…...
设计模式|责任链模式(Chain of Responsibility Pattern)
文章目录 结构优点缺点使用责任链的步骤示例有哪些知名框架采用了责任链模式责任链模式和链表有什么关联常见面试题 责任链模式(Chain of Responsibility Pattern)是一种行为设计模式,它允许你创建一个对象链。请求将沿着这个链传递ÿ…...

文件服务器之二:SAMBA服务器
文章目录 什么是SAMBASAMBA的发展历史与名称的由来SAMBA常见的应用 SAMBA服务器基础配置配置共享资源Windows挂载共享Linux挂载共享 什么是SAMBA 下图来自百度百科 SAMBA的发展历史与名称的由来 Samba是一款开源的文件共享软件,它基于SMB(Server Messa…...

20.安全性测试与评估
每年都会涉及;可能会考大题;多记!!! 典型考点:sql注入、xss; 从2个方面记: 1、测试对象的功能、性能; 2、相关设备的工作原理; 如防火墙,要了解防…...
阿里巴巴实习面经
本人bg:浙江大学,计算机研二,本科也是浙大计算机专业的。 在阿里巴巴达摩院实习,算法岗,我是去年拿到的阿里巴巴达摩院的实习offer,这个过程还是比较惊心动魄,所以我称之为惊心动魄版本…...

javaweb学习(day11-监听器Listener过滤器Filter)
一、监听器Listener 1 Listener介绍 Listener 监听器它是 JavaWeb 的三大组件之一。JavaWeb 的三大组件分别是:Servlet 程 序、Listener 监听器、Filter 过滤器 Listener 是 JavaEE 的规范,就是接口 监听器的作用是,监听某种变化(一般就是对…...

教你快速认识Java中的抽象类和接口
目录 引言 抽象类(Abstract Class) 抽象类的概念 抽象类的图标 抽象类的语法 抽象类的特点 接口(Interface) 接口的概念 接口的图标 接口的语法 接口的特点 接口的使用 接口的意义 抽象类与接口的区别 Object类 结…...
CVPR 2025 MIMO: 支持视觉指代和像素grounding 的医学视觉语言模型
CVPR 2025 | MIMO:支持视觉指代和像素对齐的医学视觉语言模型 论文信息 标题:MIMO: A medical vision language model with visual referring multimodal input and pixel grounding multimodal output作者:Yanyuan Chen, Dexuan Xu, Yu Hu…...
服务器硬防的应用场景都有哪些?
服务器硬防是指一种通过硬件设备层面的安全措施来防御服务器系统受到网络攻击的方式,避免服务器受到各种恶意攻击和网络威胁,那么,服务器硬防通常都会应用在哪些场景当中呢? 硬防服务器中一般会配备入侵检测系统和预防系统&#x…...

最新SpringBoot+SpringCloud+Nacos微服务框架分享
文章目录 前言一、服务规划二、架构核心1.cloud的pom2.gateway的异常handler3.gateway的filter4、admin的pom5、admin的登录核心 三、code-helper分享总结 前言 最近有个活蛮赶的,根据Excel列的需求预估的工时直接打骨折,不要问我为什么,主要…...
MVC 数据库
MVC 数据库 引言 在软件开发领域,Model-View-Controller(MVC)是一种流行的软件架构模式,它将应用程序分为三个核心组件:模型(Model)、视图(View)和控制器(Controller)。这种模式有助于提高代码的可维护性和可扩展性。本文将深入探讨MVC架构与数据库之间的关系,以…...
深入理解Optional:处理空指针异常
1. 使用Optional处理可能为空的集合 在Java开发中,集合判空是一个常见但容易出错的场景。传统方式虽然可行,但存在一些潜在问题: // 传统判空方式 if (!CollectionUtils.isEmpty(userInfoList)) {for (UserInfo userInfo : userInfoList) {…...
在鸿蒙HarmonyOS 5中使用DevEco Studio实现指南针功能
指南针功能是许多位置服务应用的基础功能之一。下面我将详细介绍如何在HarmonyOS 5中使用DevEco Studio实现指南针功能。 1. 开发环境准备 确保已安装DevEco Studio 3.1或更高版本确保项目使用的是HarmonyOS 5.0 SDK在项目的module.json5中配置必要的权限 2. 权限配置 在mo…...

sshd代码修改banner
sshd服务连接之后会收到字符串: SSH-2.0-OpenSSH_9.5 容易被hacker识别此服务为sshd服务。 是否可以通过修改此banner达到让人无法识别此服务的目的呢? 不能。因为这是写的SSH的协议中的。 也就是协议规定了banner必须这么写。 SSH- 开头,…...
[特殊字符] 手撸 Redis 互斥锁那些坑
📖 手撸 Redis 互斥锁那些坑 最近搞业务遇到高并发下同一个 key 的互斥操作,想实现分布式环境下的互斥锁。于是私下顺手手撸了个基于 Redis 的简单互斥锁,也顺便跟 Redisson 的 RLock 机制对比了下,记录一波,别踩我踩过…...

内窥镜检查中基于提示的息肉分割|文献速递-深度学习医疗AI最新文献
Title 题目 Prompt-based polyp segmentation during endoscopy 内窥镜检查中基于提示的息肉分割 01 文献速递介绍 以下是对这段英文内容的中文翻译: ### 胃肠道癌症的发病率呈上升趋势,且有年轻化倾向(Bray等人,2018&#x…...
HTML中各种标签的作用
一、HTML文件主要标签结构及说明 1. <!DOCTYPE html> 作用:声明文档类型,告知浏览器这是 HTML5 文档。 必须:是。 2. <html lang“zh”>. </html> 作用:包裹整个网页内容,lang"z…...