当前位置: 首页 > news >正文

Python 中的 Kombu 类库

Kombu 是一个用于 Python 的消息队列库,提供了高效、灵活的消息传递机制。它是 Celery 的核心组件之一,但也可以单独使用。Kombu 支持多种消息代理(如 RabbitMQ、Redis、Amazon SQS 等),并提供了消息生产者和消费者的功能。安装命令 pip install kombu redis

一.主要功能

1.消息队列

提供可靠的消息传递和队列机制,允许将消息从生产者发送到消费者。

2.消息代理支持

支持多种消息代理,如 RabbitMQ、Redis、Amazon SQS、MongoDB 等。

3.异步任务

可以用来实现异步任务处理,配合 Celery 使用时,可以构建分布式任务队列。

4.消息格式

支持多种消息格式,包括 JSON、YAML、pickle 等。

5.路由和交换

提供了高级的消息路由和交换功能,可以实现复杂的消息分发逻辑。

二.基本使用

1. 创建消息生产者

生产者负责向消息队列发送消息。

(1)Redis 消息代理

from kombu import Connection, Exchange, Producer, Queue# 设置消息代理的连接URL(Redis)
broker_url = 'redis://localhost:6379/0'# 创建连接
with Connection(broker_url) as conn:# 创建交换机和队列exchange = Exchange('my_exchange', type='direct')queue = Queue('my_queue', exchange, routing_key='my_key')# 创建生产者with Producer(conn) as producer:# 发送消息producer.publish({'key': 'value'},exchange=exchange,routing_key='my_key',serializer='json')print("Message sent.")

(2)RabbitMQ 消息代理

from kombu import Connection, Exchange, Producer, Queue# 设置消息代理的连接URL
broker_url = 'amqp://guest:guest@localhost//'# 创建连接
with Connection(broker_url) as conn:# 创建交换机和队列exchange = Exchange('my_exchange', type='direct')queue = Queue('my_queue', exchange, routing_key='my_key')# 创建生产者with Producer(conn) as producer:# 发送消息producer.publish({'key': 'value'},exchange=exchange,routing_key='my_key',serializer='json')print("Message sent.")

2. 创建消息消费者

消费者从消息队列中接收和处理消息。

(1)Redis 消息代理

from kombu import Connection, Exchange, Queue, Consumer# 设置消息代理的连接URL(Redis)
broker_url = 'redis://localhost:6379/0'def callback(body, message):print(f"Received message: {body}")message.ack()  # 确认消息已处理# 创建连接
with Connection(broker_url) as conn:# 创建交换机和队列exchange = Exchange('my_exchange', type='direct')queue = Queue('my_queue', exchange, routing_key='my_key')# 创建消费者with Consumer(conn, [queue], callback=callback) as consumer:print("Waiting for messages...")# 运行消费者,等待消息while True:conn.drain_events()

(2)RabbitMQ 消息代理

from kombu import Connection, Exchange, Queue, Consumer# 设置消息代理的连接URL
broker_url = 'amqp://guest:guest@localhost//'def callback(body, message):print(f"Received message: {body}")message.ack()  # 确认消息已处理# 创建连接
with Connection(broker_url) as conn:# 创建交换机和队列exchange = Exchange('my_exchange', type='direct')queue = Queue('my_queue', exchange, routing_key='my_key')# 创建消费者with Consumer(conn, [queue], callback=callback) as consumer:print("Waiting for messages...")# 运行消费者,等待消息while True:conn.drain_events()

3. 高级用法:消息路由

Kombu 支持复杂的消息路由配置,以下示例展示了如何使用路由功能将消息发送到不同的队列。

(1)Redis 消息代理

from kombu import Connection, Exchange, Producer, Queue# 设置消息代理的连接URL(Redis)
broker_url = 'redis://localhost:6379/0'# 创建交换机和队列
exchange = Exchange('my_exchange', type='direct')
queue1 = Queue('queue1', exchange, routing_key='key1')
queue2 = Queue('queue2', exchange, routing_key='key2')def route_message(message):if message['type'] == 'type1':return 'key1'return 'key2'# 创建连接
with Connection(broker_url) as conn:with Producer(conn) as producer:# 发送消息producer.publish({'type': 'type1', 'data': 'value1'},exchange=exchange,routing_key=route_message({'type': 'type1'}),serializer='json')print("Message routed and sent.")

(2)RabbitMQ 消息代理

from kombu import Connection, Exchange, Producer, Queue# 设置消息代理的连接URL
broker_url = 'amqp://guest:guest@localhost//'# 创建交换机和队列
exchange = Exchange('my_exchange', type='direct')
queue1 = Queue('queue1', exchange, routing_key='key1')
queue2 = Queue('queue2', exchange, routing_key='key2')def route_message(message):if message['type'] == 'type1':return 'key1'return 'key2'# 创建连接
with Connection(broker_url) as conn:with Producer(conn) as producer:# 发送消息producer.publish({'type': 'type1', 'data': 'value1'},exchange=exchange,routing_key=route_message({'type': 'type1'}),serializer='json')print("Message routed and sent.")

4. 结合 Celery 使用

Kombu 通常与 Celery 一起使用来处理异步任务。简单理解,Kombu 是 Celery 的依赖库,Celery 需要 Kombu 来访问消息队列系统。同时 Celery 扩展了 Kombu 的功能,提供了一个高级的任务队列系统。Celery 使用 Kombu 来处理与消息代理之间的连接、消息发送、消息接收等操作。

(1)Redis 消息代理

from celery import Celery# 配置 Celery 使用 Redis 作为消息代理(通过 Kombu 处理)
app = Celery('tasks', broker='redis://localhost:6379/0')@app.task
def add(x, y):return x + y

在 Dify 中默认消息代理使用 Redis,如下所示:

(2)RabbitMQ 消息代理

from celery import Celery# 配置 Celery 使用 RabbitMQ 作为消息代理(通过 Kombu 处理)
app = Celery('tasks', broker='amqp://guest:guest@localhost//')@app.task
def add(x, y):return x + y

Kombu 是一个强大的消息传递库,提供了多种消息代理的支持,并能实现复杂的消息队列和路由功能。它支持多种消息格式和高级功能,如交换机、队列、路由等。基础用法 包括创建生产者和消费者,通过消息代理发送和接收消息。高级用法 包括消息路由、与 Celery 集成等,用于构建分布式系统和异步任务处理。

参考文献

[1] https://github.com/celery/kombu

[2] https://docs.celeryq.dev/projects/kombu/en/stable/

[3] 消息队列 Kombu 之 基本架构:https://www.cnblogs.com/rossiXYZ/p/14454761.html

[4] Kombu 库用法详解(连接、连接池、生产者、消费者):https://blog.csdn.net/weixin_44799217/article/details/128490325

NLP工程化(星球号)

相关文章:

Python 中的 Kombu 类库

Kombu 是一个用于 Python 的消息队列库,提供了高效、灵活的消息传递机制。它是 Celery 的核心组件之一,但也可以单独使用。Kombu 支持多种消息代理(如 RabbitMQ、Redis、Amazon SQS 等),并提供了消息生产者和消费者的功…...

safepoint是什么?有什么用?

在JVM中,safepoint(安全点)是一个非常重要的概念,特别是在垃圾回收(GC)和其他需要暂停所有应用线程的操作中。 什么是safepoint Safepoint是JVM执行过程中一个特定的位置,在这个位置上&#x…...

axios相关知识点

一、基本概念 1、基于Promise:Axios通过Promise实现异步请求,避免了传统回调函数导致的“回调地狱”问题,使得代码更加清晰和易于维护。 2、跨平台:Axios既可以在浏览器中运行,也可以在Node.js环境中使用,为前后端开…...

LeetCode 面试经典150题 67.二进制求和

415.字符串相加 思路一模一样 题目:给你两个二进制字符串 a 和 b ,以二进制字符串的形式返回它们的和。 eg: 输入a“1010” b“1011” 输出“10101” 思路:从右开始遍历两个字符串,因为右边是低位先运算。如果…...

Dell PowerEdge 网络恢复笔记

我有一台Dell的PowerEdge服务器,之前安装了Ubuntu 20 桌面版。突然有一天不能开机了。 故障排查 Disk Error 首先是看一下机器的正面,有一个非常小的液晶显示器,只能显示一排字。 上面显示Disk Error,然后看挂载的硬盘仓&#…...

Java面试——集合篇

1.Java中常用的容器有哪些? 容器主要包括 Collection 和 Map 两种,Collection 存储着对象的集合,而 Map 存储着键值对(两个对象)的映射表。 如图: 面试官追问:说说集合有哪些类及他们各自的区别和特点? S…...

算法【双向广搜】

双向广搜常见用途 1:小优化。bfs的剪枝策略,分两侧展开分支,哪侧数量少就从哪侧展开。 2:用于解决特征很明显的一类问题。特征:全量样本不允许递归完全展开,但是半量样本可以完全展开。过程:把…...

javascript检测数据类型的方法

1. typeof 运算符 typeof是一个用来检测变量的基本数据类型的运算符。它可以返回以下几种类型的字符串:“undefined”、“boolean”、“number”、“string”、“object”、“function” 和 “symbol”(ES6)。需要注意的是,对于 n…...

生信初学者教程(五):R语言基础

文章目录 数据类型整型逻辑型字符型日期型数值型复杂数数据结构向量矩阵数组列表因子数据框ts特殊值缺失值 (NA)无穷大 (Inf)非数字 (NaN)安装R包学习材料R语言是一种用于统计计算和图形展示的编程语言和软件环境,广泛应用于数据分析、统计建模和数据可视化。1991年:R语言的最…...

深度学习计算

一、层和块 块可以描述单个层、多个层组成的组件或整个模型。 通过定义块,组装块,可以实现复杂的神经网络。 一个块可以由多个class组成。 其实就是 自己定义神经网络net,自己定义层的顺序和具体的init、 forward函数。 层和块的顺序由sequen…...

Hexo博客私有部署Twikoo评论系统并迁移评论记录(自定义邮件回复模板)

部署 之前一直使用的artalk,现在想改用Twikoo,采用私有部署的方式。 私有部署 (Docker) 端口可以根据实际情况进行修改 docker run --name twikoo -e TWIKOO_THROTTLE1000 -p 8100:8100 -v ${PWD}/data:/app/data -e TWIKOO_PORT8100 -d imaegoo/twi…...

Vue.js 与 Flask/Django 后端配合:构建现代 Web 应用的最佳实践

Vue.js 与 Flask/Django 后端配合:构建现代 Web 应用的最佳实践 在现代 Web 开发中,前后端分离的架构已经成为主流。Vue.js 作为一个渐进式 JavaScript 框架,因其灵活性和易用性而广受欢迎。而 Flask 和 Django 则是 Python 生态中两个非常流…...

【笔记】自动驾驶预测与决策规划_Part3_路径与轨迹规划

文章目录 0. 前言1. 基于搜索的路径规划1.1 A* 算法1.2 Hybrid A* 算法 2. 基于采样的路径规划2.1 Frent Frame方法2.2 Cartesian →Frent 1D ( x , y ) (x, y) (x,y) —> ( s , l ) (s, l) (s,l)2.3 Cartesian →Frent 3D2.4 贝尔曼Bellman最优性原理2.5 高速轨迹采样——…...

Shiro-721—漏洞分析(CVE-2019-12422)

文章目录 Padding Oracle Attack 原理PKCS5填充怎么爆破攻击 漏洞原理源码分析漏洞复现 本文基于shiro550漏洞基础上分析,建议先看上期内容: https://blog.csdn.net/weixin_60521036/article/details/142373353 Padding Oracle Attack 原理 网上看了很多…...

【Python语言初识(一)】

一、python简史 1.1、python的历史 1989年圣诞节:Guido von Rossum开始写Python语言的编译器。1991年2月:第一个Python编译器(同时也是解释器)诞生,它是用C语言实现的(后面),可以调…...

Python 中的方法解析顺序(MRO)

在 Python 中,MRO(Method Resolution Order,方法解析顺序)是指类继承体系中,Python 如何确定在调用方法时的解析顺序。MRO 决定了在多继承环境下,Python 如何寻找方法或属性,即它会根据一定规则…...

MySQL表的内外连接

内连接 其实就是from 两个表 把笛卡尔积的表 再用where 进行条件筛选 ——之前我们写的多表查询就是内连接 基本格式: 外链接 没有向内连接那样真的把两个表连接形式一个表显示,而只是建立关系 外连接分为左链接和右链接 左链接 联合查询时候&#…...

系统架构设计师:软件架构的演化和维护

简简单单 Online zuozuo: 简简单单 Online zuozuo 简简单单 Online zuozuo 简简单单 Online zuozuo 简简单单 Online zuozuo :本心、输入输出、结果 简简单单 Online zuozuo : 文章目录 系统架构设计师:软件架构的演化和维护前言软件架构演化的重要性面向对象的软件架构演…...

QT的dropEvent函数进入不了

在使用QT想实现拖拽功能的时候,发现了dropEvent没有调用运行,遂查找原因: 首先是网上都说要在dragEnterEvent里面使用event->accept(); 但我这边在出现问题之前就已经这样做了: void CanvasView::dragEnterEvent(QDragEnterEv…...

Spring Boot 入门

前言 Spring Boot 是一个开源的 Java 基础框架,用于创建独立、生产级的基于 Spring 的应用程序。它简化了基于 Spring 的应用开发,通过提供一系列的“起步依赖”来快速启动和运行 Spring 应用。本文将带你深入了解 Spring Boot 的核心概念、关键特性&am…...

应用升级/灾备测试时使用guarantee 闪回点迅速回退

1.场景 应用要升级,当升级失败时,数据库回退到升级前. 要测试系统,测试完成后,数据库要回退到测试前。 相对于RMAN恢复需要很长时间, 数据库闪回只需要几分钟。 2.技术实现 数据库设置 2个db_recovery参数 创建guarantee闪回点,不需要开启数据库闪回。…...

51c自动驾驶~合集58

我自己的原文哦~ https://blog.51cto.com/whaosoft/13967107 #CCA-Attention 全局池化局部保留,CCA-Attention为LLM长文本建模带来突破性进展 琶洲实验室、华南理工大学联合推出关键上下文感知注意力机制(CCA-Attention),…...

Java 语言特性(面试系列1)

一、面向对象编程 1. 封装(Encapsulation) 定义:将数据(属性)和操作数据的方法绑定在一起,通过访问控制符(private、protected、public)隐藏内部实现细节。示例: public …...

在rocky linux 9.5上在线安装 docker

前面是指南,后面是日志 sudo dnf config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo sudo dnf install docker-ce docker-ce-cli containerd.io -y docker version sudo systemctl start docker sudo systemctl status docker …...

【Go】3、Go语言进阶与依赖管理

前言 本系列文章参考自稀土掘金上的 【字节内部课】公开课,做自我学习总结整理。 Go语言并发编程 Go语言原生支持并发编程,它的核心机制是 Goroutine 协程、Channel 通道,并基于CSP(Communicating Sequential Processes&#xff0…...

WordPress插件:AI多语言写作与智能配图、免费AI模型、SEO文章生成

厌倦手动写WordPress文章?AI自动生成,效率提升10倍! 支持多语言、自动配图、定时发布,让内容创作更轻松! AI内容生成 → 不想每天写文章?AI一键生成高质量内容!多语言支持 → 跨境电商必备&am…...

Java入门学习详细版(一)

大家好,Java 学习是一个系统学习的过程,核心原则就是“理论 实践 坚持”,并且需循序渐进,不可过于着急,本篇文章推出的这份详细入门学习资料将带大家从零基础开始,逐步掌握 Java 的核心概念和编程技能。 …...

Java面试专项一-准备篇

一、企业简历筛选规则 一般企业的简历筛选流程:首先由HR先筛选一部分简历后,在将简历给到对应的项目负责人后再进行下一步的操作。 HR如何筛选简历 例如:Boss直聘(招聘方平台) 直接按照条件进行筛选 例如&#xff1a…...

全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比

目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...

怎么让Comfyui导出的图像不包含工作流信息,

为了数据安全,让Comfyui导出的图像不包含工作流信息,导出的图像就不会拖到comfyui中加载出来工作流。 ComfyUI的目录下node.py 直接移除 pnginfo(推荐)​​ 在 save_images 方法中,​​删除或注释掉所有与 metadata …...