当前位置: 首页 > news >正文

Celery的任务流

在这里插入图片描述

Celery的任务流

在之前调用任务的时候只是使用delay()和apply_async()方法。但是有时我们并不想简单的执行单个异步任务,比如说需要将某个异步任务的结果作为另一个异步任务的参数或者需要将多个异步任务并行执行,返回一组返回值,为了实现此目标,Celery使用一种叫做signatures的东西

celery的简单使用

signature的引入

signature官方文档

可以简单理解为signature是将之前的异步任务以某种方式包装,包装后的异步任务仍可以使用之前的delay()和apply_async()方法,并且包装后的异步任务就可以以多种方式组合成复杂的工作流程

先创建一个tasks.py

import timeimport celerybroker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = celery.Celery('app', backend=backend, broker=broker)# 加
@app.task
def add_num(a, b):print(f'{a}+{b}')time.sleep(3)  # 做延时处理,方便后面查看任务执行顺序c = a + breturn c# 减
@app.task
def subtract_num(a, b):print(f'{a}-{b}')time.sleep(3)  # 做延时处理,方便后面查看任务执行顺序c = a - breturn c# 乘
@app.task
def multiply_num(a, b):print(f'{a}*{b}')time.sleep(3)  # 做延时处理,方便后面查看任务执行顺序c = a * breturn c# 除
@app.task
def divide_num(a, b):print(f'{a}/{b}')time.sleep(3)  # 做延时处理,方便后面查看任务执行顺序c = a / breturn c@app.task
def test(args):print(args)return args

运行celery消费者

# 格式为:celery -A app对象所在的文件 worker -l 日志级别 -Q 队列名称(也可以不指定,默认为celry)
celery -A tasks worker -l info -Q test

在这里插入图片描述

用signature对上面的add_num包装

from celery import signature
from tasks import add_num, subtract_num, multiply_num, divide_num# 方法1
sign = add_num.signature((1, 1), queue='test')
ret = sign.delay()
print(ret.get())# 方法2
sign = signature('tasks.add_num', (1, 1), queue='test')
ret = sign.delay()
print(ret.get())# 方法3
sign = signature(add_num, (1, 1), queue='test')
ret = sign.delay()
print(ret.get())

chain的使用

chain官方链接

chain可以将signature包装的任务函数一个一个执行,一个执行完将执行return结果传递给下一个任务函数

from celery import signature, chain
from tasks import add_num, subtract_num, multiply_num, divide_numadd_sign = signature(add_num, (6, 2), queue='test')
subtract_sign = signature(subtract_num, (2,), queue='test')
multiply_sign = signature(multiply_num, (2,), queue='test')
divide_sign = signature(divide_num, (2,), queue='test')# 对某个数依次做加减乘除处理
chain1 = chain(add_sign, subtract_sign, multiply_sign, divide_sign)
ret = chain1.delay()
print(ret.get())

在这里插入图片描述

可以看到异步任务依次执行,并将上一个异步任务的结果作为参数传递给下一个,形成一个链条

group的使用

group官方链接

group可以将signature包装的任务函数并行执行,返回一组返回值

from celery import signature, chain, group
from tasks import add_num, subtract_num, multiply_num, divide_numadd_sign = signature(add_num, (6, 2), queue='test')
subtract_sign = signature(subtract_num, (6, 2), queue='test')
multiply_sign = signature(multiply_num, (6, 2), queue='test')
divide_sign = signature(divide_num, (6, 2), queue='test')# 对某个数分别做加减乘除处理group1 = group(add_sign, subtract_sign, multiply_sign, divide_sign)
ret = group1.delay()
print(ret.get())
#[8, 4, 12, 3.0]

在这里插入图片描述
可以看到相比于chain,group里的任务是同时执行

chord的使用

chord官方链接
依赖一个group任务,group任务结束后,将所有子任务的返回值作为参数传递给chord的回调函数,即chord由group任务组与回调函数组成

上代码

from celery import signature, chain, group, chord
from tasks import add_num, subtract_num, multiply_num, divide_num, testadd_sign = signature(add_num, (6, 2), queue='test')
subtract_sign = signature(subtract_num, (6, 2), queue='test')
multiply_sign = signature(multiply_num, (6, 2), queue='test')
divide_sign = signature(divide_num, (6, 2), queue='test')group1 = group(add_sign, subtract_sign, multiply_sign, divide_sign)#包装test异步任务函数
test_sign = signature(test, queue='test')c1 = chord(group1, test_sign)
c1.delay()

在这里插入图片描述
可以看出,在执行完加减乘除所有异步任务后,chord会将任务组的结果作为list交给test函数,这里的test有点像回调函数

PS:根据我的观察,chain,group,chord在执行完后都会返回一个任务id,其中chain的任务id为任务链里最后一个任务的id,group的任务id是一个临时的任务id(group任务都结束后就会消失),chord的任务id是回调函数的任务id。因此chain和chord在任务结束后,任务结果还是可以查到的,而group则查询不到,因此group的任务结果可能无法用AsyncResult查询到

最后附上celery关于任务工作流的官方链接
celery工作流

PS

有的时候我们可能需要在celery的task函数中调用其他的celery函数,并且需要同步的获取结果(其实着本质上就是把异步的celery函数变成同步运行),具体如下,先创建一个tasks.py

import timeimport celerybroker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = celery.Celery('app', backend=backend, broker=broker)# 加
@app.task
def add_num(a, b):print(f'{a}+{b}')time.sleep(3)  # 做延时处理,方便后面查看任务执行顺序c = a + breturn c@app.task
def test():#在test函数里调用add_num函数,并且同步获取结果,将结果作为test函数的返回值ret = add_num.delay(1,2)ret = ret.get()return ret
#启动消费者
celery -A tasks worker -l info

调用test异步函数

from tasks import test
ret = test.delay()

在这里插入图片描述

结果就是出错了,因为官方不建议在一个异步任务中区等待另一个异步任务的返回结果,所以这个时候就可以通过上面的chain方法实现这个需求。当然还有一种不建议的方法就是在同步获取celery任务结果的get方法中添加参数disable_sync_subtasks=False,具体如下

import timeimport celerybroker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = celery.Celery('app', backend=backend, broker=broker)# 加
@app.task
def add_num(a, b):print(f'{a}+{b}')time.sleep(3)  # 做延时处理,方便后面查看任务执行顺序c = a + breturn c@app.task
def test():ret = add_num.delay(1, 2)ret = ret.get(disable_sync_subtasks=False)#在这添加disable_sync_subtasks=Falsereturn ret

再调用一次test方法

在这里插入图片描述
成功调用

详见celery官方链接
链接传送门

结语

写这些,仅记录自己学习使用celery的过程。如果有什么错误的地方,还请大家批评指正。最后,希望小伙伴们都能有所收获。

在这里插入图片描述

相关文章:

Celery的任务流

Celery的任务流 在之前调用任务的时候只是使用delay()和apply_async()方法。但是有时我们并不想简单的执行单个异步任务,比如说需要将某个异步任务的结果作为另一个异步任务的参数或者需要将多个异步任务并行执行,返回一组返回值,为了实现此…...

使用Arcpy进行数据批处理-批量裁剪

时空大数据使我们面临前所未有的机遇和挑战,尤其在地学、遥感或空间技术等专业领域,无疑是一个全新的时代。 伴随着时空大数据的到来,海量数据的处理是一个所有科研工作者都无法忽视的重要问题。传统的数据(主要指空间数据&#x…...

【攻防世界】ics-05

php://filter 伪协议查看源码 preg_replace 函数漏洞 1.获取网页源代码。多点点界面,发现点云平台设备维护中心时,页面发生变化。 /?pageindex 输入什么显示什么,有回显。 用php://filter读取网页源代码 ?pagephp://filter/readconvert.…...

VTK的交互器

VTK中鼠标消息是在交互类型对象(interactorstyle)中响应,因此通过为交互类型对象(interactorstyle)添加观察者(observer)来监听相应的消息,当消息触发时,由命令模式执行相…...

ChatGPT(3.5版本)开放无需注册:算力背后的数据之战悄然打响

✨✨ 欢迎大家来访Srlua的博文(づ ̄3 ̄)づ╭❤~✨✨ 🌟🌟 欢迎各位亲爱的读者,感谢你们抽出宝贵的时间来阅读我的文章。 我是Srlua小谢,在这里我会分享我的知识和经验。&am…...

python项目练习——14.学生管理系统

这个项目可以让用户管理学生的信息,包括学生的姓名、年龄、成绩等,并提供添加、编辑、删除、查询等功能。这个项目涉及到数据库操作、用户界面设计、数据验证等方面的技术。 代码示例: import tkinter as tk # 导入 Tkinter 库 import sqli…...

基于SpringBoot的公益慈善平台

一、项目背景介绍: 基于SpringBoot的公益慈善平台是一款致力于为社会所有人带来便利服务的B/S架构的应用程序。随着网络技术的发展,公益慈善网站已经逐渐成为公益行动的主要信息载体。在这个平台上,主要有管理员、捐赠者和志愿者三种角色&…...

Python网络爬虫(一):HTML/CSS/JavaScript介绍

1 HTML语言 1.1 HTML简介 HTML指的是超文本标记语言:HyperText Markup Language,它不是一门编程语言,而是一种标记语言,即一套标记标签。HTML是纯文本类型的语言,使用HTML编写的网页文件也是标准的文本文件,可以使用任意的文本编辑器例如记事本打开HTML文件,查看并修改H…...

机器学习每周挑战——旅游景点数据分析

数据的截图,数据的说明: # 字段 数据类型 # 城市 string # 名称 string # 星级 string # 评分 float # 价格 float # 销量 int # 省/市/区 string # 坐标 string # 简介 string # 是否免费 bool # 具体地址 string拿到数据…...

开发语言漫谈-C语言

个人认为C语言是最伟大的开发语言(没有之一)。C语言开创了高级语言的新时代。比C更低级的是汇编语言,这个东西就是反人类的玩意。之后的语言或多或少都受C语言的影响。更神奇的是直到现在,C语言还有生命力。C语言的发明人丹尼斯里…...

vue3导入excel并解析excel数据渲染到表格中,纯前端实现。

需求 用户将已有的excel上传到系统,并将excel数据同步到页面的表格中进行二次编辑,由于excel数据不是最终数据,只是批量的一个初始模板,后端不需要存储,所以该功能由前端独立完成。 吐槽 系统中文件上传下载预览三部…...

Java常用API之Encoders类解读

写在开头:本文用于作者学习Java常用API 我将官方文档中Encoders类中所有API全测了一遍并打印了结果,日拱一卒,常看常新 在Spark中,Encoders类提供了一些静态方法用于创建不同数据类型的编码器。 首先,我遇到这样一个…...

java中大型医院HIS系统源码 Angular+Nginx+SpringBoot云HIS运维平台源码

java中大型医院HIS系统源码 AngularNginxSpringBoot云HIS运维平台源码 云HIS系统是一款满足基层医院各类业务需要的健康云产品。该产品能帮助基层医院完成日常各类业务,提供病患预约挂号支持、病患问诊、电子病历、开药发药、会员管理、统计查询、医生工作站和护士工…...

windows部署Jenkins并远程部署tomcat

目录 1、Jenkins官网下载Jenkins 2、安装Jenkins 3、修改Home directory 4、插件安装及系统配置 5、Tomcat安装及配置 5.1、修改配置文件,屏蔽以下代码 5.2、新增登录用户 5.3、编码格式修改 5.4、启动tomcat 6、Jenkins远程部署war包 6.1、General配置 6.2、Sourc…...

设计模式|责任链模式(Chain of Responsibility Pattern)

文章目录 结构优点缺点使用责任链的步骤示例有哪些知名框架采用了责任链模式责任链模式和链表有什么关联常见面试题 责任链模式(Chain of Responsibility Pattern)是一种行为设计模式,它允许你创建一个对象链。请求将沿着这个链传递&#xff…...

文件服务器之二:SAMBA服务器

文章目录 什么是SAMBASAMBA的发展历史与名称的由来SAMBA常见的应用 SAMBA服务器基础配置配置共享资源Windows挂载共享Linux挂载共享 什么是SAMBA 下图来自百度百科 SAMBA的发展历史与名称的由来 Samba是一款开源的文件共享软件,它基于SMB(Server Messa…...

20.安全性测试与评估

每年都会涉及;可能会考大题;多记!!! 典型考点:sql注入、xss; 从2个方面记: 1、测试对象的功能、性能; 2、相关设备的工作原理; 如防火墙,要了解防…...

阿里巴巴实习面经

本人bg:浙江大学,计算机研二,本科也是浙大计算机专业的。 在阿里巴巴达摩院实习,算法岗,我是去年拿到的阿里巴巴达摩院的实习offer,这个过程还是比较惊心动魄,所以我称之为惊心动魄版本&#xf…...

javaweb学习(day11-监听器Listener过滤器Filter)

一、监听器Listener 1 Listener介绍 Listener 监听器它是 JavaWeb 的三大组件之一。JavaWeb 的三大组件分别是:Servlet 程 序、Listener 监听器、Filter 过滤器 Listener 是 JavaEE 的规范,就是接口 监听器的作用是,监听某种变化(一般就是对…...

教你快速认识Java中的抽象类和接口

目录 引言 抽象类(Abstract Class) 抽象类的概念 抽象类的图标 抽象类的语法 抽象类的特点 接口(Interface) 接口的概念 接口的图标 接口的语法 接口的特点 接口的使用 接口的意义 抽象类与接口的区别 Object类 结…...

UE5 学习系列(二)用户操作界面及介绍

这篇博客是 UE5 学习系列博客的第二篇,在第一篇的基础上展开这篇内容。博客参考的 B 站视频资料和第一篇的链接如下: 【Note】:如果你已经完成安装等操作,可以只执行第一篇博客中 2. 新建一个空白游戏项目 章节操作,重…...

地震勘探——干扰波识别、井中地震时距曲线特点

目录 干扰波识别反射波地震勘探的干扰波 井中地震时距曲线特点 干扰波识别 有效波:可以用来解决所提出的地质任务的波;干扰波:所有妨碍辨认、追踪有效波的其他波。 地震勘探中,有效波和干扰波是相对的。例如,在反射波…...

51c自动驾驶~合集58

我自己的原文哦~ https://blog.51cto.com/whaosoft/13967107 #CCA-Attention 全局池化局部保留,CCA-Attention为LLM长文本建模带来突破性进展 琶洲实验室、华南理工大学联合推出关键上下文感知注意力机制(CCA-Attention),…...

Appium+python自动化(十六)- ADB命令

简介 Android 调试桥(adb)是多种用途的工具,该工具可以帮助你你管理设备或模拟器 的状态。 adb ( Android Debug Bridge)是一个通用命令行工具,其允许您与模拟器实例或连接的 Android 设备进行通信。它可为各种设备操作提供便利,如安装和调试…...

FastAPI 教程:从入门到实践

FastAPI 是一个现代、快速(高性能)的 Web 框架,用于构建 API,支持 Python 3.6。它基于标准 Python 类型提示,易于学习且功能强大。以下是一个完整的 FastAPI 入门教程,涵盖从环境搭建到创建并运行一个简单的…...

渗透实战PortSwigger靶场-XSS Lab 14:大多数标签和属性被阻止

<script>标签被拦截 我们需要把全部可用的 tag 和 event 进行暴力破解 XSS cheat sheet&#xff1a; https://portswigger.net/web-security/cross-site-scripting/cheat-sheet 通过爆破发现body可以用 再把全部 events 放进去爆破 这些 event 全部可用 <body onres…...

c++ 面试题(1)-----深度优先搜索(DFS)实现

操作系统&#xff1a;ubuntu22.04 IDE:Visual Studio Code 编程语言&#xff1a;C11 题目描述 地上有一个 m 行 n 列的方格&#xff0c;从坐标 [0,0] 起始。一个机器人可以从某一格移动到上下左右四个格子&#xff0c;但不能进入行坐标和列坐标的数位之和大于 k 的格子。 例…...

以光量子为例,详解量子获取方式

光量子技术获取量子比特可在室温下进行。该方式有望通过与名为硅光子学&#xff08;silicon photonics&#xff09;的光波导&#xff08;optical waveguide&#xff09;芯片制造技术和光纤等光通信技术相结合来实现量子计算机。量子力学中&#xff0c;光既是波又是粒子。光子本…...

基于Java Swing的电子通讯录设计与实现:附系统托盘功能代码详解

JAVASQL电子通讯录带系统托盘 一、系统概述 本电子通讯录系统采用Java Swing开发桌面应用&#xff0c;结合SQLite数据库实现联系人管理功能&#xff0c;并集成系统托盘功能提升用户体验。系统支持联系人的增删改查、分组管理、搜索过滤等功能&#xff0c;同时可以最小化到系统…...

嵌入式学习笔记DAY33(网络编程——TCP)

一、网络架构 C/S &#xff08;client/server 客户端/服务器&#xff09;&#xff1a;由客户端和服务器端两个部分组成。客户端通常是用户使用的应用程序&#xff0c;负责提供用户界面和交互逻辑 &#xff0c;接收用户输入&#xff0c;向服务器发送请求&#xff0c;并展示服务…...