当前位置: 首页 > news >正文

Python 中的 Kombu 类库

Kombu 是一个用于 Python 的消息队列库,提供了高效、灵活的消息传递机制。它是 Celery 的核心组件之一,但也可以单独使用。Kombu 支持多种消息代理(如 RabbitMQ、Redis、Amazon SQS 等),并提供了消息生产者和消费者的功能。安装命令 pip install kombu redis

一.主要功能

1.消息队列

提供可靠的消息传递和队列机制,允许将消息从生产者发送到消费者。

2.消息代理支持

支持多种消息代理,如 RabbitMQ、Redis、Amazon SQS、MongoDB 等。

3.异步任务

可以用来实现异步任务处理,配合 Celery 使用时,可以构建分布式任务队列。

4.消息格式

支持多种消息格式,包括 JSON、YAML、pickle 等。

5.路由和交换

提供了高级的消息路由和交换功能,可以实现复杂的消息分发逻辑。

二.基本使用

1. 创建消息生产者

生产者负责向消息队列发送消息。

(1)Redis 消息代理

from kombu import Connection, Exchange, Producer, Queue# 设置消息代理的连接URL(Redis)
broker_url = 'redis://localhost:6379/0'# 创建连接
with Connection(broker_url) as conn:# 创建交换机和队列exchange = Exchange('my_exchange', type='direct')queue = Queue('my_queue', exchange, routing_key='my_key')# 创建生产者with Producer(conn) as producer:# 发送消息producer.publish({'key': 'value'},exchange=exchange,routing_key='my_key',serializer='json')print("Message sent.")

(2)RabbitMQ 消息代理

from kombu import Connection, Exchange, Producer, Queue# 设置消息代理的连接URL
broker_url = 'amqp://guest:guest@localhost//'# 创建连接
with Connection(broker_url) as conn:# 创建交换机和队列exchange = Exchange('my_exchange', type='direct')queue = Queue('my_queue', exchange, routing_key='my_key')# 创建生产者with Producer(conn) as producer:# 发送消息producer.publish({'key': 'value'},exchange=exchange,routing_key='my_key',serializer='json')print("Message sent.")

2. 创建消息消费者

消费者从消息队列中接收和处理消息。

(1)Redis 消息代理

from kombu import Connection, Exchange, Queue, Consumer# 设置消息代理的连接URL(Redis)
broker_url = 'redis://localhost:6379/0'def callback(body, message):print(f"Received message: {body}")message.ack()  # 确认消息已处理# 创建连接
with Connection(broker_url) as conn:# 创建交换机和队列exchange = Exchange('my_exchange', type='direct')queue = Queue('my_queue', exchange, routing_key='my_key')# 创建消费者with Consumer(conn, [queue], callback=callback) as consumer:print("Waiting for messages...")# 运行消费者,等待消息while True:conn.drain_events()

(2)RabbitMQ 消息代理

from kombu import Connection, Exchange, Queue, Consumer# 设置消息代理的连接URL
broker_url = 'amqp://guest:guest@localhost//'def callback(body, message):print(f"Received message: {body}")message.ack()  # 确认消息已处理# 创建连接
with Connection(broker_url) as conn:# 创建交换机和队列exchange = Exchange('my_exchange', type='direct')queue = Queue('my_queue', exchange, routing_key='my_key')# 创建消费者with Consumer(conn, [queue], callback=callback) as consumer:print("Waiting for messages...")# 运行消费者,等待消息while True:conn.drain_events()

3. 高级用法:消息路由

Kombu 支持复杂的消息路由配置,以下示例展示了如何使用路由功能将消息发送到不同的队列。

(1)Redis 消息代理

from kombu import Connection, Exchange, Producer, Queue# 设置消息代理的连接URL(Redis)
broker_url = 'redis://localhost:6379/0'# 创建交换机和队列
exchange = Exchange('my_exchange', type='direct')
queue1 = Queue('queue1', exchange, routing_key='key1')
queue2 = Queue('queue2', exchange, routing_key='key2')def route_message(message):if message['type'] == 'type1':return 'key1'return 'key2'# 创建连接
with Connection(broker_url) as conn:with Producer(conn) as producer:# 发送消息producer.publish({'type': 'type1', 'data': 'value1'},exchange=exchange,routing_key=route_message({'type': 'type1'}),serializer='json')print("Message routed and sent.")

(2)RabbitMQ 消息代理

from kombu import Connection, Exchange, Producer, Queue# 设置消息代理的连接URL
broker_url = 'amqp://guest:guest@localhost//'# 创建交换机和队列
exchange = Exchange('my_exchange', type='direct')
queue1 = Queue('queue1', exchange, routing_key='key1')
queue2 = Queue('queue2', exchange, routing_key='key2')def route_message(message):if message['type'] == 'type1':return 'key1'return 'key2'# 创建连接
with Connection(broker_url) as conn:with Producer(conn) as producer:# 发送消息producer.publish({'type': 'type1', 'data': 'value1'},exchange=exchange,routing_key=route_message({'type': 'type1'}),serializer='json')print("Message routed and sent.")

4. 结合 Celery 使用

Kombu 通常与 Celery 一起使用来处理异步任务。简单理解,Kombu 是 Celery 的依赖库,Celery 需要 Kombu 来访问消息队列系统。同时 Celery 扩展了 Kombu 的功能,提供了一个高级的任务队列系统。Celery 使用 Kombu 来处理与消息代理之间的连接、消息发送、消息接收等操作。

(1)Redis 消息代理

from celery import Celery# 配置 Celery 使用 Redis 作为消息代理(通过 Kombu 处理)
app = Celery('tasks', broker='redis://localhost:6379/0')@app.task
def add(x, y):return x + y

在 Dify 中默认消息代理使用 Redis,如下所示:

(2)RabbitMQ 消息代理

from celery import Celery# 配置 Celery 使用 RabbitMQ 作为消息代理(通过 Kombu 处理)
app = Celery('tasks', broker='amqp://guest:guest@localhost//')@app.task
def add(x, y):return x + y

Kombu 是一个强大的消息传递库,提供了多种消息代理的支持,并能实现复杂的消息队列和路由功能。它支持多种消息格式和高级功能,如交换机、队列、路由等。基础用法 包括创建生产者和消费者,通过消息代理发送和接收消息。高级用法 包括消息路由、与 Celery 集成等,用于构建分布式系统和异步任务处理。

参考文献

[1] https://github.com/celery/kombu

[2] https://docs.celeryq.dev/projects/kombu/en/stable/

[3] 消息队列 Kombu 之 基本架构:https://www.cnblogs.com/rossiXYZ/p/14454761.html

[4] Kombu 库用法详解(连接、连接池、生产者、消费者):https://blog.csdn.net/weixin_44799217/article/details/128490325

NLP工程化(星球号)

相关文章:

Python 中的 Kombu 类库

Kombu 是一个用于 Python 的消息队列库,提供了高效、灵活的消息传递机制。它是 Celery 的核心组件之一,但也可以单独使用。Kombu 支持多种消息代理(如 RabbitMQ、Redis、Amazon SQS 等),并提供了消息生产者和消费者的功…...

safepoint是什么?有什么用?

在JVM中,safepoint(安全点)是一个非常重要的概念,特别是在垃圾回收(GC)和其他需要暂停所有应用线程的操作中。 什么是safepoint Safepoint是JVM执行过程中一个特定的位置,在这个位置上&#x…...

axios相关知识点

一、基本概念 1、基于Promise:Axios通过Promise实现异步请求,避免了传统回调函数导致的“回调地狱”问题,使得代码更加清晰和易于维护。 2、跨平台:Axios既可以在浏览器中运行,也可以在Node.js环境中使用,为前后端开…...

LeetCode 面试经典150题 67.二进制求和

415.字符串相加 思路一模一样 题目:给你两个二进制字符串 a 和 b ,以二进制字符串的形式返回它们的和。 eg: 输入a“1010” b“1011” 输出“10101” 思路:从右开始遍历两个字符串,因为右边是低位先运算。如果…...

Dell PowerEdge 网络恢复笔记

我有一台Dell的PowerEdge服务器,之前安装了Ubuntu 20 桌面版。突然有一天不能开机了。 故障排查 Disk Error 首先是看一下机器的正面,有一个非常小的液晶显示器,只能显示一排字。 上面显示Disk Error,然后看挂载的硬盘仓&#…...

Java面试——集合篇

1.Java中常用的容器有哪些? 容器主要包括 Collection 和 Map 两种,Collection 存储着对象的集合,而 Map 存储着键值对(两个对象)的映射表。 如图: 面试官追问:说说集合有哪些类及他们各自的区别和特点? S…...

算法【双向广搜】

双向广搜常见用途 1:小优化。bfs的剪枝策略,分两侧展开分支,哪侧数量少就从哪侧展开。 2:用于解决特征很明显的一类问题。特征:全量样本不允许递归完全展开,但是半量样本可以完全展开。过程:把…...

javascript检测数据类型的方法

1. typeof 运算符 typeof是一个用来检测变量的基本数据类型的运算符。它可以返回以下几种类型的字符串:“undefined”、“boolean”、“number”、“string”、“object”、“function” 和 “symbol”(ES6)。需要注意的是,对于 n…...

生信初学者教程(五):R语言基础

文章目录 数据类型整型逻辑型字符型日期型数值型复杂数数据结构向量矩阵数组列表因子数据框ts特殊值缺失值 (NA)无穷大 (Inf)非数字 (NaN)安装R包学习材料R语言是一种用于统计计算和图形展示的编程语言和软件环境,广泛应用于数据分析、统计建模和数据可视化。1991年:R语言的最…...

深度学习计算

一、层和块 块可以描述单个层、多个层组成的组件或整个模型。 通过定义块,组装块,可以实现复杂的神经网络。 一个块可以由多个class组成。 其实就是 自己定义神经网络net,自己定义层的顺序和具体的init、 forward函数。 层和块的顺序由sequen…...

Hexo博客私有部署Twikoo评论系统并迁移评论记录(自定义邮件回复模板)

部署 之前一直使用的artalk,现在想改用Twikoo,采用私有部署的方式。 私有部署 (Docker) 端口可以根据实际情况进行修改 docker run --name twikoo -e TWIKOO_THROTTLE1000 -p 8100:8100 -v ${PWD}/data:/app/data -e TWIKOO_PORT8100 -d imaegoo/twi…...

Vue.js 与 Flask/Django 后端配合:构建现代 Web 应用的最佳实践

Vue.js 与 Flask/Django 后端配合:构建现代 Web 应用的最佳实践 在现代 Web 开发中,前后端分离的架构已经成为主流。Vue.js 作为一个渐进式 JavaScript 框架,因其灵活性和易用性而广受欢迎。而 Flask 和 Django 则是 Python 生态中两个非常流…...

【笔记】自动驾驶预测与决策规划_Part3_路径与轨迹规划

文章目录 0. 前言1. 基于搜索的路径规划1.1 A* 算法1.2 Hybrid A* 算法 2. 基于采样的路径规划2.1 Frent Frame方法2.2 Cartesian →Frent 1D ( x , y ) (x, y) (x,y) —> ( s , l ) (s, l) (s,l)2.3 Cartesian →Frent 3D2.4 贝尔曼Bellman最优性原理2.5 高速轨迹采样——…...

Shiro-721—漏洞分析(CVE-2019-12422)

文章目录 Padding Oracle Attack 原理PKCS5填充怎么爆破攻击 漏洞原理源码分析漏洞复现 本文基于shiro550漏洞基础上分析,建议先看上期内容: https://blog.csdn.net/weixin_60521036/article/details/142373353 Padding Oracle Attack 原理 网上看了很多…...

【Python语言初识(一)】

一、python简史 1.1、python的历史 1989年圣诞节:Guido von Rossum开始写Python语言的编译器。1991年2月:第一个Python编译器(同时也是解释器)诞生,它是用C语言实现的(后面),可以调…...

Python 中的方法解析顺序(MRO)

在 Python 中,MRO(Method Resolution Order,方法解析顺序)是指类继承体系中,Python 如何确定在调用方法时的解析顺序。MRO 决定了在多继承环境下,Python 如何寻找方法或属性,即它会根据一定规则…...

MySQL表的内外连接

内连接 其实就是from 两个表 把笛卡尔积的表 再用where 进行条件筛选 ——之前我们写的多表查询就是内连接 基本格式: 外链接 没有向内连接那样真的把两个表连接形式一个表显示,而只是建立关系 外连接分为左链接和右链接 左链接 联合查询时候&#…...

系统架构设计师:软件架构的演化和维护

简简单单 Online zuozuo: 简简单单 Online zuozuo 简简单单 Online zuozuo 简简单单 Online zuozuo 简简单单 Online zuozuo :本心、输入输出、结果 简简单单 Online zuozuo : 文章目录 系统架构设计师:软件架构的演化和维护前言软件架构演化的重要性面向对象的软件架构演…...

QT的dropEvent函数进入不了

在使用QT想实现拖拽功能的时候,发现了dropEvent没有调用运行,遂查找原因: 首先是网上都说要在dragEnterEvent里面使用event->accept(); 但我这边在出现问题之前就已经这样做了: void CanvasView::dragEnterEvent(QDragEnterEv…...

Spring Boot 入门

前言 Spring Boot 是一个开源的 Java 基础框架,用于创建独立、生产级的基于 Spring 的应用程序。它简化了基于 Spring 的应用开发,通过提供一系列的“起步依赖”来快速启动和运行 Spring 应用。本文将带你深入了解 Spring Boot 的核心概念、关键特性&am…...

从多路复用到三维光阵:Arduino驱动8x8x8 LED立方体全解析

1. 项目概述:用Arduino点亮一个三维世界几年前,我第一次在创客展上看到一个8x8x8的LED立方体,那种由数百个光点构成的、在三维空间中流动的动画效果,瞬间就把我吸引住了。它不像普通的平面LED屏,而是真正有“深度”的光…...

潮州东方轻奢风全屋高定找哪家

开篇引言根据《2026年中国全屋定制行业发展报告》,潮州市全屋定制市场规模同比增长38%,其中全屋高端定制细分市场同比增长52%。目前,潮州市家庭全屋定制需求占比72%,高端定制需求占比45%。为了帮助潮州市消费者选择合规、靠谱、差…...

16个分片+2副本:pg_shard的master_create_worker_shards最佳实践

16个分片2副本:pg_shard的master_create_worker_shards最佳实践 【免费下载链接】pg_shard ATTENTION: pg_shard is superseded by Citus, its more powerful replacement 项目地址: https://gitcode.com/gh_mirrors/pg/pg_shard pg_shard作为PostgreSQL的分…...

Adobe-GenP 3.0:轻松激活Adobe全家桶的完整指南

Adobe-GenP 3.0:轻松激活Adobe全家桶的完整指南 【免费下载链接】Adobe-GenP Adobe CC 2019/2020/2021/2022/2023 GenP Universal Patch 3.0 项目地址: https://gitcode.com/gh_mirrors/ad/Adobe-GenP Adobe-GenP 3.0是一款专为Adobe Creative Cloud系列软件…...

终极STL到STEP转换指南:如何实现3D打印模型到CAD设计的无缝衔接

终极STL到STEP转换指南:如何实现3D打印模型到CAD设计的无缝衔接 【免费下载链接】stltostp Convert stl files to STEP brep files 项目地址: https://gitcode.com/gh_mirrors/st/stltostp 在数字化制造和工程设计领域,STL到STEP转换已成为连接3D…...

收藏|2026年大模型算法岗崛起!程序员小白入门高薪赛道全攻略

前些年,算法岗位一直稳居技术圈高薪行列,无数程序员争相入局,也成为计算机专业毕业生求职首选方向。 伴随大模型技术飞速迭代落地,行业就业格局迎来重大变革。如今含金量最高、人才缺口最大、长期发展潜力顶尖的岗位,已…...

Win11Debloat:Windows系统精简与隐私保护的专业解决方案

Win11Debloat:Windows系统精简与隐私保护的专业解决方案 【免费下载链接】Win11Debloat A simple, lightweight PowerShell script that allows you to remove pre-installed apps, disable telemetry, as well as perform various other changes to declutter and …...

CANN runtime:昇腾NPU 运行时的职责边界

个人主页:ujainu 文章目录前言为什么需要运行时这一层runtime管什么,不管什么Stream:并行的基本调度单位Event:跨Stream的同步锚点内存池化:少一次malloc就少一次卡顿任务队列:从计算图到硬件指令的最后一跳…...

【小白快速上手】 OpenClaw 安装部署全流程(含安装包)

OpenClaw 一键安装包|一键部署,告别复杂环境配置 适配系统:Windows10/11 64 位当前版本:v2.7.5(虾壳云版)核心优势:全程可视化操作,无需命令行、无需手动配置 Python/Node.js&#…...

完整解决方案:PL2303 Windows 10驱动快速安装指南

完整解决方案:PL2303 Windows 10驱动快速安装指南 【免费下载链接】pl2303-win10 Windows 10 driver for end-of-life PL-2303 chipsets. 项目地址: https://gitcode.com/gh_mirrors/pl/pl2303-win10 如果你正在Windows 10系统上使用PL-2303HXA或PL-2303XA芯…...