Python 中的 Kombu 类库
Kombu
是一个用于 Python 的消息队列库,提供了高效、灵活的消息传递机制。它是 Celery 的核心组件之一,但也可以单独使用。Kombu
支持多种消息代理(如 RabbitMQ、Redis、Amazon SQS 等),并提供了消息生产者和消费者的功能。安装命令 pip install kombu redis
。
一.主要功能
1.消息队列
提供可靠的消息传递和队列机制,允许将消息从生产者发送到消费者。
2.消息代理支持
支持多种消息代理,如 RabbitMQ、Redis、Amazon SQS、MongoDB 等。
3.异步任务
可以用来实现异步任务处理,配合 Celery 使用时,可以构建分布式任务队列。
4.消息格式
支持多种消息格式,包括 JSON、YAML、pickle 等。
5.路由和交换
提供了高级的消息路由和交换功能,可以实现复杂的消息分发逻辑。
二.基本使用
1. 创建消息生产者
生产者负责向消息队列发送消息。
(1)Redis 消息代理
from kombu import Connection, Exchange, Producer, Queue# 设置消息代理的连接URL(Redis)
broker_url = 'redis://localhost:6379/0'# 创建连接
with Connection(broker_url) as conn:# 创建交换机和队列exchange = Exchange('my_exchange', type='direct')queue = Queue('my_queue', exchange, routing_key='my_key')# 创建生产者with Producer(conn) as producer:# 发送消息producer.publish({'key': 'value'},exchange=exchange,routing_key='my_key',serializer='json')print("Message sent.")
(2)RabbitMQ 消息代理
from kombu import Connection, Exchange, Producer, Queue# 设置消息代理的连接URL
broker_url = 'amqp://guest:guest@localhost//'# 创建连接
with Connection(broker_url) as conn:# 创建交换机和队列exchange = Exchange('my_exchange', type='direct')queue = Queue('my_queue', exchange, routing_key='my_key')# 创建生产者with Producer(conn) as producer:# 发送消息producer.publish({'key': 'value'},exchange=exchange,routing_key='my_key',serializer='json')print("Message sent.")
2. 创建消息消费者
消费者从消息队列中接收和处理消息。
(1)Redis 消息代理
from kombu import Connection, Exchange, Queue, Consumer# 设置消息代理的连接URL(Redis)
broker_url = 'redis://localhost:6379/0'def callback(body, message):print(f"Received message: {body}")message.ack() # 确认消息已处理# 创建连接
with Connection(broker_url) as conn:# 创建交换机和队列exchange = Exchange('my_exchange', type='direct')queue = Queue('my_queue', exchange, routing_key='my_key')# 创建消费者with Consumer(conn, [queue], callback=callback) as consumer:print("Waiting for messages...")# 运行消费者,等待消息while True:conn.drain_events()
(2)RabbitMQ 消息代理
from kombu import Connection, Exchange, Queue, Consumer# 设置消息代理的连接URL
broker_url = 'amqp://guest:guest@localhost//'def callback(body, message):print(f"Received message: {body}")message.ack() # 确认消息已处理# 创建连接
with Connection(broker_url) as conn:# 创建交换机和队列exchange = Exchange('my_exchange', type='direct')queue = Queue('my_queue', exchange, routing_key='my_key')# 创建消费者with Consumer(conn, [queue], callback=callback) as consumer:print("Waiting for messages...")# 运行消费者,等待消息while True:conn.drain_events()
3. 高级用法:消息路由
Kombu
支持复杂的消息路由配置,以下示例展示了如何使用路由功能将消息发送到不同的队列。
(1)Redis 消息代理
from kombu import Connection, Exchange, Producer, Queue# 设置消息代理的连接URL(Redis)
broker_url = 'redis://localhost:6379/0'# 创建交换机和队列
exchange = Exchange('my_exchange', type='direct')
queue1 = Queue('queue1', exchange, routing_key='key1')
queue2 = Queue('queue2', exchange, routing_key='key2')def route_message(message):if message['type'] == 'type1':return 'key1'return 'key2'# 创建连接
with Connection(broker_url) as conn:with Producer(conn) as producer:# 发送消息producer.publish({'type': 'type1', 'data': 'value1'},exchange=exchange,routing_key=route_message({'type': 'type1'}),serializer='json')print("Message routed and sent.")
(2)RabbitMQ 消息代理
from kombu import Connection, Exchange, Producer, Queue# 设置消息代理的连接URL
broker_url = 'amqp://guest:guest@localhost//'# 创建交换机和队列
exchange = Exchange('my_exchange', type='direct')
queue1 = Queue('queue1', exchange, routing_key='key1')
queue2 = Queue('queue2', exchange, routing_key='key2')def route_message(message):if message['type'] == 'type1':return 'key1'return 'key2'# 创建连接
with Connection(broker_url) as conn:with Producer(conn) as producer:# 发送消息producer.publish({'type': 'type1', 'data': 'value1'},exchange=exchange,routing_key=route_message({'type': 'type1'}),serializer='json')print("Message routed and sent.")
4. 结合 Celery 使用
Kombu
通常与 Celery 一起使用来处理异步任务。简单理解,Kombu 是 Celery 的依赖库,Celery 需要 Kombu 来访问消息队列系统。同时 Celery 扩展了 Kombu 的功能,提供了一个高级的任务队列系统。Celery 使用 Kombu 来处理与消息代理之间的连接、消息发送、消息接收等操作。
(1)Redis 消息代理
from celery import Celery# 配置 Celery 使用 Redis 作为消息代理(通过 Kombu 处理)
app = Celery('tasks', broker='redis://localhost:6379/0')@app.task
def add(x, y):return x + y
在 Dify 中默认消息代理使用 Redis,如下所示:
(2)RabbitMQ 消息代理
from celery import Celery# 配置 Celery 使用 RabbitMQ 作为消息代理(通过 Kombu 处理)
app = Celery('tasks', broker='amqp://guest:guest@localhost//')@app.task
def add(x, y):return x + y
Kombu
是一个强大的消息传递库,提供了多种消息代理的支持,并能实现复杂的消息队列和路由功能。它支持多种消息格式和高级功能,如交换机、队列、路由等。基础用法 包括创建生产者和消费者,通过消息代理发送和接收消息。高级用法 包括消息路由、与 Celery 集成等,用于构建分布式系统和异步任务处理。
参考文献
[1] https://github.com/celery/kombu
[2] https://docs.celeryq.dev/projects/kombu/en/stable/
[3] 消息队列 Kombu 之 基本架构:https://www.cnblogs.com/rossiXYZ/p/14454761.html
[4] Kombu 库用法详解(连接、连接池、生产者、消费者):https://blog.csdn.net/weixin_44799217/article/details/128490325
NLP工程化(星球号)
相关文章:

Python 中的 Kombu 类库
Kombu 是一个用于 Python 的消息队列库,提供了高效、灵活的消息传递机制。它是 Celery 的核心组件之一,但也可以单独使用。Kombu 支持多种消息代理(如 RabbitMQ、Redis、Amazon SQS 等),并提供了消息生产者和消费者的功…...
safepoint是什么?有什么用?
在JVM中,safepoint(安全点)是一个非常重要的概念,特别是在垃圾回收(GC)和其他需要暂停所有应用线程的操作中。 什么是safepoint Safepoint是JVM执行过程中一个特定的位置,在这个位置上&#x…...
axios相关知识点
一、基本概念 1、基于Promise:Axios通过Promise实现异步请求,避免了传统回调函数导致的“回调地狱”问题,使得代码更加清晰和易于维护。 2、跨平台:Axios既可以在浏览器中运行,也可以在Node.js环境中使用,为前后端开…...
LeetCode 面试经典150题 67.二进制求和
415.字符串相加 思路一模一样 题目:给你两个二进制字符串 a 和 b ,以二进制字符串的形式返回它们的和。 eg: 输入a“1010” b“1011” 输出“10101” 思路:从右开始遍历两个字符串,因为右边是低位先运算。如果…...

Dell PowerEdge 网络恢复笔记
我有一台Dell的PowerEdge服务器,之前安装了Ubuntu 20 桌面版。突然有一天不能开机了。 故障排查 Disk Error 首先是看一下机器的正面,有一个非常小的液晶显示器,只能显示一排字。 上面显示Disk Error,然后看挂载的硬盘仓&#…...

Java面试——集合篇
1.Java中常用的容器有哪些? 容器主要包括 Collection 和 Map 两种,Collection 存储着对象的集合,而 Map 存储着键值对(两个对象)的映射表。 如图: 面试官追问:说说集合有哪些类及他们各自的区别和特点? S…...
算法【双向广搜】
双向广搜常见用途 1:小优化。bfs的剪枝策略,分两侧展开分支,哪侧数量少就从哪侧展开。 2:用于解决特征很明显的一类问题。特征:全量样本不允许递归完全展开,但是半量样本可以完全展开。过程:把…...
javascript检测数据类型的方法
1. typeof 运算符 typeof是一个用来检测变量的基本数据类型的运算符。它可以返回以下几种类型的字符串:“undefined”、“boolean”、“number”、“string”、“object”、“function” 和 “symbol”(ES6)。需要注意的是,对于 n…...

生信初学者教程(五):R语言基础
文章目录 数据类型整型逻辑型字符型日期型数值型复杂数数据结构向量矩阵数组列表因子数据框ts特殊值缺失值 (NA)无穷大 (Inf)非数字 (NaN)安装R包学习材料R语言是一种用于统计计算和图形展示的编程语言和软件环境,广泛应用于数据分析、统计建模和数据可视化。1991年:R语言的最…...
深度学习计算
一、层和块 块可以描述单个层、多个层组成的组件或整个模型。 通过定义块,组装块,可以实现复杂的神经网络。 一个块可以由多个class组成。 其实就是 自己定义神经网络net,自己定义层的顺序和具体的init、 forward函数。 层和块的顺序由sequen…...

Hexo博客私有部署Twikoo评论系统并迁移评论记录(自定义邮件回复模板)
部署 之前一直使用的artalk,现在想改用Twikoo,采用私有部署的方式。 私有部署 (Docker) 端口可以根据实际情况进行修改 docker run --name twikoo -e TWIKOO_THROTTLE1000 -p 8100:8100 -v ${PWD}/data:/app/data -e TWIKOO_PORT8100 -d imaegoo/twi…...
Vue.js 与 Flask/Django 后端配合:构建现代 Web 应用的最佳实践
Vue.js 与 Flask/Django 后端配合:构建现代 Web 应用的最佳实践 在现代 Web 开发中,前后端分离的架构已经成为主流。Vue.js 作为一个渐进式 JavaScript 框架,因其灵活性和易用性而广受欢迎。而 Flask 和 Django 则是 Python 生态中两个非常流…...

【笔记】自动驾驶预测与决策规划_Part3_路径与轨迹规划
文章目录 0. 前言1. 基于搜索的路径规划1.1 A* 算法1.2 Hybrid A* 算法 2. 基于采样的路径规划2.1 Frent Frame方法2.2 Cartesian →Frent 1D ( x , y ) (x, y) (x,y) —> ( s , l ) (s, l) (s,l)2.3 Cartesian →Frent 3D2.4 贝尔曼Bellman最优性原理2.5 高速轨迹采样——…...

Shiro-721—漏洞分析(CVE-2019-12422)
文章目录 Padding Oracle Attack 原理PKCS5填充怎么爆破攻击 漏洞原理源码分析漏洞复现 本文基于shiro550漏洞基础上分析,建议先看上期内容: https://blog.csdn.net/weixin_60521036/article/details/142373353 Padding Oracle Attack 原理 网上看了很多…...

【Python语言初识(一)】
一、python简史 1.1、python的历史 1989年圣诞节:Guido von Rossum开始写Python语言的编译器。1991年2月:第一个Python编译器(同时也是解释器)诞生,它是用C语言实现的(后面),可以调…...

Python 中的方法解析顺序(MRO)
在 Python 中,MRO(Method Resolution Order,方法解析顺序)是指类继承体系中,Python 如何确定在调用方法时的解析顺序。MRO 决定了在多继承环境下,Python 如何寻找方法或属性,即它会根据一定规则…...

MySQL表的内外连接
内连接 其实就是from 两个表 把笛卡尔积的表 再用where 进行条件筛选 ——之前我们写的多表查询就是内连接 基本格式: 外链接 没有向内连接那样真的把两个表连接形式一个表显示,而只是建立关系 外连接分为左链接和右链接 左链接 联合查询时候&#…...
系统架构设计师:软件架构的演化和维护
简简单单 Online zuozuo: 简简单单 Online zuozuo 简简单单 Online zuozuo 简简单单 Online zuozuo 简简单单 Online zuozuo :本心、输入输出、结果 简简单单 Online zuozuo : 文章目录 系统架构设计师:软件架构的演化和维护前言软件架构演化的重要性面向对象的软件架构演…...
QT的dropEvent函数进入不了
在使用QT想实现拖拽功能的时候,发现了dropEvent没有调用运行,遂查找原因: 首先是网上都说要在dragEnterEvent里面使用event->accept(); 但我这边在出现问题之前就已经这样做了: void CanvasView::dragEnterEvent(QDragEnterEv…...

Spring Boot 入门
前言 Spring Boot 是一个开源的 Java 基础框架,用于创建独立、生产级的基于 Spring 的应用程序。它简化了基于 Spring 的应用开发,通过提供一系列的“起步依赖”来快速启动和运行 Spring 应用。本文将带你深入了解 Spring Boot 的核心概念、关键特性&am…...
LDD学习2--Scull(TODO)
《Linux Device Drivers》(LDD)书籍中的 scull(Simple Character Utility for Loading Localities)是一个用于演示 Linux 字符设备驱动程序编写的示例代码。它为理解 Linux 内核模块和字符设备驱动程序的编写提供了基础实践平台&a…...
【算法-堆排序】
堆排序是一种基于堆这种数据结构的比较排序算法,它是一种原地、不稳定的排序算法,时间复杂度为 O(n log n)。堆排序的基本思想是将数组构建成一个二叉堆,并通过反复调整堆顶和未排序部分之间的关系来实现排序。 堆的定义 堆是一种特殊的完全…...

音视频入门基础:AAC专题(4)——ADTS格式的AAC裸流实例分析
音视频入门基础:AAC专题系列文章: 音视频入门基础:AAC专题(1)——AAC官方文档下载 音视频入门基础:AAC专题(2)——使用FFmpeg命令生成AAC裸流文件 音视频入门基础:AAC…...

【第33章】Spring Cloud之SkyWalking服务链路追踪
文章目录 前言一、介绍1. 架构图2. SkyWalking APM 二、服务端和控制台1. 下载2. 解压3. 初始化数据库4. 增加驱动5. 修改后端配置6. 启动7. 访问控制台8. 数据库表 三、客户端1. 下载2. 设置java代理3. idea配置3.1 环境变量3.2 JVM参数3.3 启动日志 4. 启用网关插件 四、链路…...

如何选择OS--Linux不同Distribution的选用
写在前言: 刚写了Windows PC的不同editions的选用,趁热,把Linux不同的Distribution选用也介绍下,希望童鞋们可以了解-->理解-->深入了解-->深入理解--...以致于能掌握特定版本的Linux的使用甚者精通。……^.^…… so&a…...
cesium效果不酷炫怎么办--增加渲染器
DrawCommand 可以发挥 WebGL 全部潜力吗? 回答: Cesium 的 DrawCommand 是一个用于表示 WebGL 渲染管线中单个绘制调用的低级抽象。它封装了执行 WebGL 绘制所需的所有信息,包括着色器程序、顶点数组、渲染状态、统一变量(unifo…...

计算机网络:概述 --- 体系结构
目录 一. 体系结构总览 1.1 OSI七层协议体系结构 1.2 TCP/IP四层(或五层)模型结构 二. 数据传输过程 2.1 同网段传输 2.2 跨网段传输 三. 体系结构相关概念 3.1 实体 3.2 协议 3.3 服务 这里我们专门来讲一下计算机网络中的体系结构。其实我们之前…...

DEPLOT: One-shot visual language reasoning by plot-to-table translation论文阅读
文章链接:https://arxiv.org/abs/2308.01979http://arxiv.org/abs/2212.10505https://arxiv.org/abs/2308.01979 源码链接:https://github.com/cse-ai-lab/RealCQA 启发:two-stage方法可能是未来主要研究方向,能够增强模型可解释…...

从 HDFS 迁移到 MinIO 企业对象存储
云原生、面向 Kubernetes 、基于微服务的架构推动了对 MinIO 等网络存储的需求。在云原生环境中,对象存储的优势很多 - 它允许独立于存储硬件对计算硬件进行弹性扩展。它使应用程序无状态,因为状态是通过网络存储的,并且通过降低操作复杂性&a…...
Rust 常见问题汇总
问题1: cargo build 一直提示Blocking waiting for file lock on package cache。 在 cargo.toml 文件中添加了依赖之后,运行 cargo build 命令时,如果卡在 blocking waiting for file lock on package cache lock 这里, 后来发…...