Python 中的 Kombu 类库
Kombu 是一个用于 Python 的消息队列库,提供了高效、灵活的消息传递机制。它是 Celery 的核心组件之一,但也可以单独使用。Kombu 支持多种消息代理(如 RabbitMQ、Redis、Amazon SQS 等),并提供了消息生产者和消费者的功能。安装命令 pip install kombu redis。
一.主要功能
1.消息队列
提供可靠的消息传递和队列机制,允许将消息从生产者发送到消费者。
2.消息代理支持
支持多种消息代理,如 RabbitMQ、Redis、Amazon SQS、MongoDB 等。
3.异步任务
可以用来实现异步任务处理,配合 Celery 使用时,可以构建分布式任务队列。
4.消息格式
支持多种消息格式,包括 JSON、YAML、pickle 等。
5.路由和交换
提供了高级的消息路由和交换功能,可以实现复杂的消息分发逻辑。
二.基本使用
1. 创建消息生产者
生产者负责向消息队列发送消息。
(1)Redis 消息代理
from kombu import Connection, Exchange, Producer, Queue# 设置消息代理的连接URL(Redis)
broker_url = 'redis://localhost:6379/0'# 创建连接
with Connection(broker_url) as conn:# 创建交换机和队列exchange = Exchange('my_exchange', type='direct')queue = Queue('my_queue', exchange, routing_key='my_key')# 创建生产者with Producer(conn) as producer:# 发送消息producer.publish({'key': 'value'},exchange=exchange,routing_key='my_key',serializer='json')print("Message sent.")
(2)RabbitMQ 消息代理
from kombu import Connection, Exchange, Producer, Queue# 设置消息代理的连接URL
broker_url = 'amqp://guest:guest@localhost//'# 创建连接
with Connection(broker_url) as conn:# 创建交换机和队列exchange = Exchange('my_exchange', type='direct')queue = Queue('my_queue', exchange, routing_key='my_key')# 创建生产者with Producer(conn) as producer:# 发送消息producer.publish({'key': 'value'},exchange=exchange,routing_key='my_key',serializer='json')print("Message sent.")
2. 创建消息消费者
消费者从消息队列中接收和处理消息。
(1)Redis 消息代理
from kombu import Connection, Exchange, Queue, Consumer# 设置消息代理的连接URL(Redis)
broker_url = 'redis://localhost:6379/0'def callback(body, message):print(f"Received message: {body}")message.ack() # 确认消息已处理# 创建连接
with Connection(broker_url) as conn:# 创建交换机和队列exchange = Exchange('my_exchange', type='direct')queue = Queue('my_queue', exchange, routing_key='my_key')# 创建消费者with Consumer(conn, [queue], callback=callback) as consumer:print("Waiting for messages...")# 运行消费者,等待消息while True:conn.drain_events()
(2)RabbitMQ 消息代理
from kombu import Connection, Exchange, Queue, Consumer# 设置消息代理的连接URL
broker_url = 'amqp://guest:guest@localhost//'def callback(body, message):print(f"Received message: {body}")message.ack() # 确认消息已处理# 创建连接
with Connection(broker_url) as conn:# 创建交换机和队列exchange = Exchange('my_exchange', type='direct')queue = Queue('my_queue', exchange, routing_key='my_key')# 创建消费者with Consumer(conn, [queue], callback=callback) as consumer:print("Waiting for messages...")# 运行消费者,等待消息while True:conn.drain_events()
3. 高级用法:消息路由
Kombu 支持复杂的消息路由配置,以下示例展示了如何使用路由功能将消息发送到不同的队列。
(1)Redis 消息代理
from kombu import Connection, Exchange, Producer, Queue# 设置消息代理的连接URL(Redis)
broker_url = 'redis://localhost:6379/0'# 创建交换机和队列
exchange = Exchange('my_exchange', type='direct')
queue1 = Queue('queue1', exchange, routing_key='key1')
queue2 = Queue('queue2', exchange, routing_key='key2')def route_message(message):if message['type'] == 'type1':return 'key1'return 'key2'# 创建连接
with Connection(broker_url) as conn:with Producer(conn) as producer:# 发送消息producer.publish({'type': 'type1', 'data': 'value1'},exchange=exchange,routing_key=route_message({'type': 'type1'}),serializer='json')print("Message routed and sent.")
(2)RabbitMQ 消息代理
from kombu import Connection, Exchange, Producer, Queue# 设置消息代理的连接URL
broker_url = 'amqp://guest:guest@localhost//'# 创建交换机和队列
exchange = Exchange('my_exchange', type='direct')
queue1 = Queue('queue1', exchange, routing_key='key1')
queue2 = Queue('queue2', exchange, routing_key='key2')def route_message(message):if message['type'] == 'type1':return 'key1'return 'key2'# 创建连接
with Connection(broker_url) as conn:with Producer(conn) as producer:# 发送消息producer.publish({'type': 'type1', 'data': 'value1'},exchange=exchange,routing_key=route_message({'type': 'type1'}),serializer='json')print("Message routed and sent.")
4. 结合 Celery 使用
Kombu 通常与 Celery 一起使用来处理异步任务。简单理解,Kombu 是 Celery 的依赖库,Celery 需要 Kombu 来访问消息队列系统。同时 Celery 扩展了 Kombu 的功能,提供了一个高级的任务队列系统。Celery 使用 Kombu 来处理与消息代理之间的连接、消息发送、消息接收等操作。
(1)Redis 消息代理
from celery import Celery# 配置 Celery 使用 Redis 作为消息代理(通过 Kombu 处理)
app = Celery('tasks', broker='redis://localhost:6379/0')@app.task
def add(x, y):return x + y
在 Dify 中默认消息代理使用 Redis,如下所示:


(2)RabbitMQ 消息代理
from celery import Celery# 配置 Celery 使用 RabbitMQ 作为消息代理(通过 Kombu 处理)
app = Celery('tasks', broker='amqp://guest:guest@localhost//')@app.task
def add(x, y):return x + y
Kombu 是一个强大的消息传递库,提供了多种消息代理的支持,并能实现复杂的消息队列和路由功能。它支持多种消息格式和高级功能,如交换机、队列、路由等。基础用法 包括创建生产者和消费者,通过消息代理发送和接收消息。高级用法 包括消息路由、与 Celery 集成等,用于构建分布式系统和异步任务处理。
参考文献
[1] https://github.com/celery/kombu
[2] https://docs.celeryq.dev/projects/kombu/en/stable/
[3] 消息队列 Kombu 之 基本架构:https://www.cnblogs.com/rossiXYZ/p/14454761.html
[4] Kombu 库用法详解(连接、连接池、生产者、消费者):https://blog.csdn.net/weixin_44799217/article/details/128490325
NLP工程化(星球号)

相关文章:
Python 中的 Kombu 类库
Kombu 是一个用于 Python 的消息队列库,提供了高效、灵活的消息传递机制。它是 Celery 的核心组件之一,但也可以单独使用。Kombu 支持多种消息代理(如 RabbitMQ、Redis、Amazon SQS 等),并提供了消息生产者和消费者的功…...
safepoint是什么?有什么用?
在JVM中,safepoint(安全点)是一个非常重要的概念,特别是在垃圾回收(GC)和其他需要暂停所有应用线程的操作中。 什么是safepoint Safepoint是JVM执行过程中一个特定的位置,在这个位置上&#x…...
axios相关知识点
一、基本概念 1、基于Promise:Axios通过Promise实现异步请求,避免了传统回调函数导致的“回调地狱”问题,使得代码更加清晰和易于维护。 2、跨平台:Axios既可以在浏览器中运行,也可以在Node.js环境中使用,为前后端开…...
LeetCode 面试经典150题 67.二进制求和
415.字符串相加 思路一模一样 题目:给你两个二进制字符串 a 和 b ,以二进制字符串的形式返回它们的和。 eg: 输入a“1010” b“1011” 输出“10101” 思路:从右开始遍历两个字符串,因为右边是低位先运算。如果…...
Dell PowerEdge 网络恢复笔记
我有一台Dell的PowerEdge服务器,之前安装了Ubuntu 20 桌面版。突然有一天不能开机了。 故障排查 Disk Error 首先是看一下机器的正面,有一个非常小的液晶显示器,只能显示一排字。 上面显示Disk Error,然后看挂载的硬盘仓&#…...
Java面试——集合篇
1.Java中常用的容器有哪些? 容器主要包括 Collection 和 Map 两种,Collection 存储着对象的集合,而 Map 存储着键值对(两个对象)的映射表。 如图: 面试官追问:说说集合有哪些类及他们各自的区别和特点? S…...
算法【双向广搜】
双向广搜常见用途 1:小优化。bfs的剪枝策略,分两侧展开分支,哪侧数量少就从哪侧展开。 2:用于解决特征很明显的一类问题。特征:全量样本不允许递归完全展开,但是半量样本可以完全展开。过程:把…...
javascript检测数据类型的方法
1. typeof 运算符 typeof是一个用来检测变量的基本数据类型的运算符。它可以返回以下几种类型的字符串:“undefined”、“boolean”、“number”、“string”、“object”、“function” 和 “symbol”(ES6)。需要注意的是,对于 n…...
生信初学者教程(五):R语言基础
文章目录 数据类型整型逻辑型字符型日期型数值型复杂数数据结构向量矩阵数组列表因子数据框ts特殊值缺失值 (NA)无穷大 (Inf)非数字 (NaN)安装R包学习材料R语言是一种用于统计计算和图形展示的编程语言和软件环境,广泛应用于数据分析、统计建模和数据可视化。1991年:R语言的最…...
深度学习计算
一、层和块 块可以描述单个层、多个层组成的组件或整个模型。 通过定义块,组装块,可以实现复杂的神经网络。 一个块可以由多个class组成。 其实就是 自己定义神经网络net,自己定义层的顺序和具体的init、 forward函数。 层和块的顺序由sequen…...
Hexo博客私有部署Twikoo评论系统并迁移评论记录(自定义邮件回复模板)
部署 之前一直使用的artalk,现在想改用Twikoo,采用私有部署的方式。 私有部署 (Docker) 端口可以根据实际情况进行修改 docker run --name twikoo -e TWIKOO_THROTTLE1000 -p 8100:8100 -v ${PWD}/data:/app/data -e TWIKOO_PORT8100 -d imaegoo/twi…...
Vue.js 与 Flask/Django 后端配合:构建现代 Web 应用的最佳实践
Vue.js 与 Flask/Django 后端配合:构建现代 Web 应用的最佳实践 在现代 Web 开发中,前后端分离的架构已经成为主流。Vue.js 作为一个渐进式 JavaScript 框架,因其灵活性和易用性而广受欢迎。而 Flask 和 Django 则是 Python 生态中两个非常流…...
【笔记】自动驾驶预测与决策规划_Part3_路径与轨迹规划
文章目录 0. 前言1. 基于搜索的路径规划1.1 A* 算法1.2 Hybrid A* 算法 2. 基于采样的路径规划2.1 Frent Frame方法2.2 Cartesian →Frent 1D ( x , y ) (x, y) (x,y) —> ( s , l ) (s, l) (s,l)2.3 Cartesian →Frent 3D2.4 贝尔曼Bellman最优性原理2.5 高速轨迹采样——…...
Shiro-721—漏洞分析(CVE-2019-12422)
文章目录 Padding Oracle Attack 原理PKCS5填充怎么爆破攻击 漏洞原理源码分析漏洞复现 本文基于shiro550漏洞基础上分析,建议先看上期内容: https://blog.csdn.net/weixin_60521036/article/details/142373353 Padding Oracle Attack 原理 网上看了很多…...
【Python语言初识(一)】
一、python简史 1.1、python的历史 1989年圣诞节:Guido von Rossum开始写Python语言的编译器。1991年2月:第一个Python编译器(同时也是解释器)诞生,它是用C语言实现的(后面),可以调…...
Python 中的方法解析顺序(MRO)
在 Python 中,MRO(Method Resolution Order,方法解析顺序)是指类继承体系中,Python 如何确定在调用方法时的解析顺序。MRO 决定了在多继承环境下,Python 如何寻找方法或属性,即它会根据一定规则…...
MySQL表的内外连接
内连接 其实就是from 两个表 把笛卡尔积的表 再用where 进行条件筛选 ——之前我们写的多表查询就是内连接 基本格式: 外链接 没有向内连接那样真的把两个表连接形式一个表显示,而只是建立关系 外连接分为左链接和右链接 左链接 联合查询时候&#…...
系统架构设计师:软件架构的演化和维护
简简单单 Online zuozuo: 简简单单 Online zuozuo 简简单单 Online zuozuo 简简单单 Online zuozuo 简简单单 Online zuozuo :本心、输入输出、结果 简简单单 Online zuozuo : 文章目录 系统架构设计师:软件架构的演化和维护前言软件架构演化的重要性面向对象的软件架构演…...
QT的dropEvent函数进入不了
在使用QT想实现拖拽功能的时候,发现了dropEvent没有调用运行,遂查找原因: 首先是网上都说要在dragEnterEvent里面使用event->accept(); 但我这边在出现问题之前就已经这样做了: void CanvasView::dragEnterEvent(QDragEnterEv…...
Spring Boot 入门
前言 Spring Boot 是一个开源的 Java 基础框架,用于创建独立、生产级的基于 Spring 的应用程序。它简化了基于 Spring 的应用开发,通过提供一系列的“起步依赖”来快速启动和运行 Spring 应用。本文将带你深入了解 Spring Boot 的核心概念、关键特性&am…...
Prompt Tuning、P-Tuning、Prefix Tuning的区别
一、Prompt Tuning、P-Tuning、Prefix Tuning的区别 1. Prompt Tuning(提示调优) 核心思想:固定预训练模型参数,仅学习额外的连续提示向量(通常是嵌入层的一部分)。实现方式:在输入文本前添加可训练的连续向量(软提示),模型只更新这些提示参数。优势:参数量少(仅提…...
为什么需要建设工程项目管理?工程项目管理有哪些亮点功能?
在建筑行业,项目管理的重要性不言而喻。随着工程规模的扩大、技术复杂度的提升,传统的管理模式已经难以满足现代工程的需求。过去,许多企业依赖手工记录、口头沟通和分散的信息管理,导致效率低下、成本失控、风险频发。例如&#…...
反射获取方法和属性
Java反射获取方法 在Java中,反射(Reflection)是一种强大的机制,允许程序在运行时访问和操作类的内部属性和方法。通过反射,可以动态地创建对象、调用方法、改变属性值,这在很多Java框架中如Spring和Hiberna…...
WEB3全栈开发——面试专业技能点P2智能合约开发(Solidity)
一、Solidity合约开发 下面是 Solidity 合约开发 的概念、代码示例及讲解,适合用作学习或写简历项目背景说明。 🧠 一、概念简介:Solidity 合约开发 Solidity 是一种专门为 以太坊(Ethereum)平台编写智能合约的高级编…...
3-11单元格区域边界定位(End属性)学习笔记
返回一个Range 对象,只读。该对象代表包含源区域的区域上端下端左端右端的最后一个单元格。等同于按键 End 向上键(End(xlUp))、End向下键(End(xlDown))、End向左键(End(xlToLeft)End向右键(End(xlToRight)) 注意:它移动的位置必须是相连的有内容的单元格…...
Python Ovito统计金刚石结构数量
大家好,我是小马老师。 本文介绍python ovito方法统计金刚石结构的方法。 Ovito Identify diamond structure命令可以识别和统计金刚石结构,但是无法直接输出结构的变化情况。 本文使用python调用ovito包的方法,可以持续统计各步的金刚石结构,具体代码如下: from ovito…...
【C++进阶篇】智能指针
C内存管理终极指南:智能指针从入门到源码剖析 一. 智能指针1.1 auto_ptr1.2 unique_ptr1.3 shared_ptr1.4 make_shared 二. 原理三. shared_ptr循环引用问题三. 线程安全问题四. 内存泄漏4.1 什么是内存泄漏4.2 危害4.3 避免内存泄漏 五. 最后 一. 智能指针 智能指…...
CSS | transition 和 transform的用处和区别
省流总结: transform用于变换/变形,transition是动画控制器 transform 用来对元素进行变形,常见的操作如下,它是立即生效的样式变形属性。 旋转 rotate(角度deg)、平移 translateX(像素px)、缩放 scale(倍数)、倾斜 skewX(角度…...
Linux nano命令的基本使用
参考资料 GNU nanoを使いこなすnano基础 目录 一. 简介二. 文件打开2.1 普通方式打开文件2.2 只读方式打开文件 三. 文件查看3.1 打开文件时,显示行号3.2 翻页查看 四. 文件编辑4.1 Ctrl K 复制 和 Ctrl U 粘贴4.2 Alt/Esc U 撤回 五. 文件保存与退出5.1 Ctrl …...
NPOI操作EXCEL文件 ——CAD C# 二次开发
缺点:dll.版本容易加载错误。CAD加载插件时,没有加载所有类库。插件运行过程中用到某个类库,会从CAD的安装目录找,找不到就报错了。 【方案2】让CAD在加载过程中把类库加载到内存 【方案3】是发现缺少了哪个库,就用插件程序加载进…...
