Celery的任务流

Celery的任务流
在之前调用任务的时候只是使用delay()和apply_async()方法。但是有时我们并不想简单的执行单个异步任务,比如说需要将某个异步任务的结果作为另一个异步任务的参数或者需要将多个异步任务并行执行,返回一组返回值,为了实现此目标,Celery使用一种叫做signatures的东西
celery的简单使用
signature的引入
signature官方文档
可以简单理解为signature是将之前的异步任务以某种方式包装,包装后的异步任务仍可以使用之前的delay()和apply_async()方法,并且包装后的异步任务就可以以多种方式组合成复杂的工作流程
先创建一个tasks.py
import timeimport celerybroker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = celery.Celery('app', backend=backend, broker=broker)# 加
@app.task
def add_num(a, b):print(f'{a}+{b}')time.sleep(3) # 做延时处理,方便后面查看任务执行顺序c = a + breturn c# 减
@app.task
def subtract_num(a, b):print(f'{a}-{b}')time.sleep(3) # 做延时处理,方便后面查看任务执行顺序c = a - breturn c# 乘
@app.task
def multiply_num(a, b):print(f'{a}*{b}')time.sleep(3) # 做延时处理,方便后面查看任务执行顺序c = a * breturn c# 除
@app.task
def divide_num(a, b):print(f'{a}/{b}')time.sleep(3) # 做延时处理,方便后面查看任务执行顺序c = a / breturn c@app.task
def test(args):print(args)return args
运行celery消费者
# 格式为:celery -A app对象所在的文件 worker -l 日志级别 -Q 队列名称(也可以不指定,默认为celry)
celery -A tasks worker -l info -Q test

用signature对上面的add_num包装
from celery import signature
from tasks import add_num, subtract_num, multiply_num, divide_num# 方法1
sign = add_num.signature((1, 1), queue='test')
ret = sign.delay()
print(ret.get())# 方法2
sign = signature('tasks.add_num', (1, 1), queue='test')
ret = sign.delay()
print(ret.get())# 方法3
sign = signature(add_num, (1, 1), queue='test')
ret = sign.delay()
print(ret.get())
chain的使用
chain官方链接
chain可以将signature包装的任务函数一个一个执行,一个执行完将执行return结果传递给下一个任务函数
from celery import signature, chain
from tasks import add_num, subtract_num, multiply_num, divide_numadd_sign = signature(add_num, (6, 2), queue='test')
subtract_sign = signature(subtract_num, (2,), queue='test')
multiply_sign = signature(multiply_num, (2,), queue='test')
divide_sign = signature(divide_num, (2,), queue='test')# 对某个数依次做加减乘除处理
chain1 = chain(add_sign, subtract_sign, multiply_sign, divide_sign)
ret = chain1.delay()
print(ret.get())

可以看到异步任务依次执行,并将上一个异步任务的结果作为参数传递给下一个,形成一个链条
group的使用
group官方链接
group可以将signature包装的任务函数并行执行,返回一组返回值
from celery import signature, chain, group
from tasks import add_num, subtract_num, multiply_num, divide_numadd_sign = signature(add_num, (6, 2), queue='test')
subtract_sign = signature(subtract_num, (6, 2), queue='test')
multiply_sign = signature(multiply_num, (6, 2), queue='test')
divide_sign = signature(divide_num, (6, 2), queue='test')# 对某个数分别做加减乘除处理group1 = group(add_sign, subtract_sign, multiply_sign, divide_sign)
ret = group1.delay()
print(ret.get())
#[8, 4, 12, 3.0]

可以看到相比于chain,group里的任务是同时执行
chord的使用
chord官方链接
依赖一个group任务,group任务结束后,将所有子任务的返回值作为参数传递给chord的回调函数,即chord由group任务组与回调函数组成
上代码
from celery import signature, chain, group, chord
from tasks import add_num, subtract_num, multiply_num, divide_num, testadd_sign = signature(add_num, (6, 2), queue='test')
subtract_sign = signature(subtract_num, (6, 2), queue='test')
multiply_sign = signature(multiply_num, (6, 2), queue='test')
divide_sign = signature(divide_num, (6, 2), queue='test')group1 = group(add_sign, subtract_sign, multiply_sign, divide_sign)#包装test异步任务函数
test_sign = signature(test, queue='test')c1 = chord(group1, test_sign)
c1.delay()

可以看出,在执行完加减乘除所有异步任务后,chord会将任务组的结果作为list交给test函数,这里的test有点像回调函数
PS:根据我的观察,chain,group,chord在执行完后都会返回一个任务id,其中chain的任务id为任务链里最后一个任务的id,group的任务id是一个临时的任务id(group任务都结束后就会消失),chord的任务id是回调函数的任务id。因此chain和chord在任务结束后,任务结果还是可以查到的,而group则查询不到,因此group的任务结果可能无法用AsyncResult查询到
最后附上celery关于任务工作流的官方链接
celery工作流
PS
有的时候我们可能需要在celery的task函数中调用其他的celery函数,并且需要同步的获取结果(其实着本质上就是把异步的celery函数变成同步运行),具体如下,先创建一个tasks.py
import timeimport celerybroker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = celery.Celery('app', backend=backend, broker=broker)# 加
@app.task
def add_num(a, b):print(f'{a}+{b}')time.sleep(3) # 做延时处理,方便后面查看任务执行顺序c = a + breturn c@app.task
def test():#在test函数里调用add_num函数,并且同步获取结果,将结果作为test函数的返回值ret = add_num.delay(1,2)ret = ret.get()return ret
#启动消费者
celery -A tasks worker -l info
调用test异步函数
from tasks import test
ret = test.delay()

结果就是出错了,因为官方不建议在一个异步任务中区等待另一个异步任务的返回结果,所以这个时候就可以通过上面的chain方法实现这个需求。当然还有一种不建议的方法就是在同步获取celery任务结果的get方法中添加参数disable_sync_subtasks=False,具体如下
import timeimport celerybroker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = celery.Celery('app', backend=backend, broker=broker)# 加
@app.task
def add_num(a, b):print(f'{a}+{b}')time.sleep(3) # 做延时处理,方便后面查看任务执行顺序c = a + breturn c@app.task
def test():ret = add_num.delay(1, 2)ret = ret.get(disable_sync_subtasks=False)#在这添加disable_sync_subtasks=Falsereturn ret
再调用一次test方法

成功调用
详见celery官方链接
链接传送门
结语
写这些,仅记录自己学习使用celery的过程。如果有什么错误的地方,还请大家批评指正。最后,希望小伙伴们都能有所收获。

相关文章:
Celery的任务流
Celery的任务流 在之前调用任务的时候只是使用delay()和apply_async()方法。但是有时我们并不想简单的执行单个异步任务,比如说需要将某个异步任务的结果作为另一个异步任务的参数或者需要将多个异步任务并行执行,返回一组返回值,为了实现此…...
使用Arcpy进行数据批处理-批量裁剪
时空大数据使我们面临前所未有的机遇和挑战,尤其在地学、遥感或空间技术等专业领域,无疑是一个全新的时代。 伴随着时空大数据的到来,海量数据的处理是一个所有科研工作者都无法忽视的重要问题。传统的数据(主要指空间数据&#x…...
【攻防世界】ics-05
php://filter 伪协议查看源码 preg_replace 函数漏洞 1.获取网页源代码。多点点界面,发现点云平台设备维护中心时,页面发生变化。 /?pageindex 输入什么显示什么,有回显。 用php://filter读取网页源代码 ?pagephp://filter/readconvert.…...
VTK的交互器
VTK中鼠标消息是在交互类型对象(interactorstyle)中响应,因此通过为交互类型对象(interactorstyle)添加观察者(observer)来监听相应的消息,当消息触发时,由命令模式执行相…...
ChatGPT(3.5版本)开放无需注册:算力背后的数据之战悄然打响
✨✨ 欢迎大家来访Srlua的博文(づ ̄3 ̄)づ╭❤~✨✨ 🌟🌟 欢迎各位亲爱的读者,感谢你们抽出宝贵的时间来阅读我的文章。 我是Srlua小谢,在这里我会分享我的知识和经验。&am…...
python项目练习——14.学生管理系统
这个项目可以让用户管理学生的信息,包括学生的姓名、年龄、成绩等,并提供添加、编辑、删除、查询等功能。这个项目涉及到数据库操作、用户界面设计、数据验证等方面的技术。 代码示例: import tkinter as tk # 导入 Tkinter 库 import sqli…...
基于SpringBoot的公益慈善平台
一、项目背景介绍: 基于SpringBoot的公益慈善平台是一款致力于为社会所有人带来便利服务的B/S架构的应用程序。随着网络技术的发展,公益慈善网站已经逐渐成为公益行动的主要信息载体。在这个平台上,主要有管理员、捐赠者和志愿者三种角色&…...
Python网络爬虫(一):HTML/CSS/JavaScript介绍
1 HTML语言 1.1 HTML简介 HTML指的是超文本标记语言:HyperText Markup Language,它不是一门编程语言,而是一种标记语言,即一套标记标签。HTML是纯文本类型的语言,使用HTML编写的网页文件也是标准的文本文件,可以使用任意的文本编辑器例如记事本打开HTML文件,查看并修改H…...
机器学习每周挑战——旅游景点数据分析
数据的截图,数据的说明: # 字段 数据类型 # 城市 string # 名称 string # 星级 string # 评分 float # 价格 float # 销量 int # 省/市/区 string # 坐标 string # 简介 string # 是否免费 bool # 具体地址 string拿到数据…...
开发语言漫谈-C语言
个人认为C语言是最伟大的开发语言(没有之一)。C语言开创了高级语言的新时代。比C更低级的是汇编语言,这个东西就是反人类的玩意。之后的语言或多或少都受C语言的影响。更神奇的是直到现在,C语言还有生命力。C语言的发明人丹尼斯里…...
vue3导入excel并解析excel数据渲染到表格中,纯前端实现。
需求 用户将已有的excel上传到系统,并将excel数据同步到页面的表格中进行二次编辑,由于excel数据不是最终数据,只是批量的一个初始模板,后端不需要存储,所以该功能由前端独立完成。 吐槽 系统中文件上传下载预览三部…...
Java常用API之Encoders类解读
写在开头:本文用于作者学习Java常用API 我将官方文档中Encoders类中所有API全测了一遍并打印了结果,日拱一卒,常看常新 在Spark中,Encoders类提供了一些静态方法用于创建不同数据类型的编码器。 首先,我遇到这样一个…...
java中大型医院HIS系统源码 Angular+Nginx+SpringBoot云HIS运维平台源码
java中大型医院HIS系统源码 AngularNginxSpringBoot云HIS运维平台源码 云HIS系统是一款满足基层医院各类业务需要的健康云产品。该产品能帮助基层医院完成日常各类业务,提供病患预约挂号支持、病患问诊、电子病历、开药发药、会员管理、统计查询、医生工作站和护士工…...
windows部署Jenkins并远程部署tomcat
目录 1、Jenkins官网下载Jenkins 2、安装Jenkins 3、修改Home directory 4、插件安装及系统配置 5、Tomcat安装及配置 5.1、修改配置文件,屏蔽以下代码 5.2、新增登录用户 5.3、编码格式修改 5.4、启动tomcat 6、Jenkins远程部署war包 6.1、General配置 6.2、Sourc…...
设计模式|责任链模式(Chain of Responsibility Pattern)
文章目录 结构优点缺点使用责任链的步骤示例有哪些知名框架采用了责任链模式责任链模式和链表有什么关联常见面试题 责任链模式(Chain of Responsibility Pattern)是一种行为设计模式,它允许你创建一个对象链。请求将沿着这个链传递ÿ…...
文件服务器之二:SAMBA服务器
文章目录 什么是SAMBASAMBA的发展历史与名称的由来SAMBA常见的应用 SAMBA服务器基础配置配置共享资源Windows挂载共享Linux挂载共享 什么是SAMBA 下图来自百度百科 SAMBA的发展历史与名称的由来 Samba是一款开源的文件共享软件,它基于SMB(Server Messa…...
20.安全性测试与评估
每年都会涉及;可能会考大题;多记!!! 典型考点:sql注入、xss; 从2个方面记: 1、测试对象的功能、性能; 2、相关设备的工作原理; 如防火墙,要了解防…...
阿里巴巴实习面经
本人bg:浙江大学,计算机研二,本科也是浙大计算机专业的。 在阿里巴巴达摩院实习,算法岗,我是去年拿到的阿里巴巴达摩院的实习offer,这个过程还是比较惊心动魄,所以我称之为惊心动魄版本…...
javaweb学习(day11-监听器Listener过滤器Filter)
一、监听器Listener 1 Listener介绍 Listener 监听器它是 JavaWeb 的三大组件之一。JavaWeb 的三大组件分别是:Servlet 程 序、Listener 监听器、Filter 过滤器 Listener 是 JavaEE 的规范,就是接口 监听器的作用是,监听某种变化(一般就是对…...
教你快速认识Java中的抽象类和接口
目录 引言 抽象类(Abstract Class) 抽象类的概念 抽象类的图标 抽象类的语法 抽象类的特点 接口(Interface) 接口的概念 接口的图标 接口的语法 接口的特点 接口的使用 接口的意义 抽象类与接口的区别 Object类 结…...
【Axure高保真原型】引导弹窗
今天和大家中分享引导弹窗的原型模板,载入页面后,会显示引导弹窗,适用于引导用户使用页面,点击完成后,会显示下一个引导弹窗,直至最后一个引导弹窗完成后进入首页。具体效果可以点击下方视频观看或打开下方…...
Java 8 Stream API 入门到实践详解
一、告别 for 循环! 传统痛点: Java 8 之前,集合操作离不开冗长的 for 循环和匿名类。例如,过滤列表中的偶数: List<Integer> list Arrays.asList(1, 2, 3, 4, 5); List<Integer> evens new ArrayList…...
3.3.1_1 检错编码(奇偶校验码)
从这节课开始,我们会探讨数据链路层的差错控制功能,差错控制功能的主要目标是要发现并且解决一个帧内部的位错误,我们需要使用特殊的编码技术去发现帧内部的位错误,当我们发现位错误之后,通常来说有两种解决方案。第一…...
2024年赣州旅游投资集团社会招聘笔试真
2024年赣州旅游投资集团社会招聘笔试真 题 ( 满 分 1 0 0 分 时 间 1 2 0 分 钟 ) 一、单选题(每题只有一个正确答案,答错、不答或多答均不得分) 1.纪要的特点不包括()。 A.概括重点 B.指导传达 C. 客观纪实 D.有言必录 【答案】: D 2.1864年,()预言了电磁波的存在,并指出…...
12.找到字符串中所有字母异位词
🧠 题目解析 题目描述: 给定两个字符串 s 和 p,找出 s 中所有 p 的字母异位词的起始索引。 返回的答案以数组形式表示。 字母异位词定义: 若两个字符串包含的字符种类和出现次数完全相同,顺序无所谓,则互为…...
优选算法第十二讲:队列 + 宽搜 优先级队列
优选算法第十二讲:队列 宽搜 && 优先级队列 1.N叉树的层序遍历2.二叉树的锯齿型层序遍历3.二叉树最大宽度4.在每个树行中找最大值5.优先级队列 -- 最后一块石头的重量6.数据流中的第K大元素7.前K个高频单词8.数据流的中位数 1.N叉树的层序遍历 2.二叉树的锯…...
使用 Streamlit 构建支持主流大模型与 Ollama 的轻量级统一平台
🎯 使用 Streamlit 构建支持主流大模型与 Ollama 的轻量级统一平台 📌 项目背景 随着大语言模型(LLM)的广泛应用,开发者常面临多个挑战: 各大模型(OpenAI、Claude、Gemini、Ollama)接口风格不统一;缺乏一个统一平台进行模型调用与测试;本地模型 Ollama 的集成与前…...
中医有效性探讨
文章目录 西医是如何发展到以生物化学为药理基础的现代医学?传统医学奠基期(远古 - 17 世纪)近代医学转型期(17 世纪 - 19 世纪末)现代医学成熟期(20世纪至今) 中医的源远流长和一脉相承远古至…...
如何更改默认 Crontab 编辑器 ?
在 Linux 领域中,crontab 是您可能经常遇到的一个术语。这个实用程序在类 unix 操作系统上可用,用于调度在预定义时间和间隔自动执行的任务。这对管理员和高级用户非常有益,允许他们自动执行各种系统任务。 编辑 Crontab 文件通常使用文本编…...
学习一下用鸿蒙DevEco Studio HarmonyOS5实现百度地图
在鸿蒙(HarmonyOS5)中集成百度地图,可以通过以下步骤和技术方案实现。结合鸿蒙的分布式能力和百度地图的API,可以构建跨设备的定位、导航和地图展示功能。 1. 鸿蒙环境准备 开发工具:下载安装 De…...
