解决 Python RabbitMQ/Pika 报错:pop from an empty deque
使用 python 的 pika 包连接rabbitmq,代码如下:
import pika
import threading
import timedef on_message(channel, method_frame, header_frame, body):print(f'on_message thread id: {threading.get_ident()}')delivery_tag = method_frame.delivery_tagprint(body, "start")for i in range(10):print(i)time.sleep(20)print(body, "end")channel.basic_ack(delivery_tag)credentials = pika.PlainCredentials('username', 'password')
parameters = pika.ConnectionParameters('test.webapi.username.com', credentials=credentials, heartbeat=5)
connection = pika.BlockingConnection(parameters)channel = connection.channel()
channel.queue_declare(queue="standard", durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume('standard', on_message)print(f'main thread id: {threading.get_ident()}')
try:channel.start_consuming()
except KeyboardInterrupt:channel.stop_consuming()
connection.close()
执行结果:
(Test) [user@user test]$ python mq1.py
main thread id: 140682766104384
on_message thread id: 140682766104384
b'1' start
0
1
2
3
4
5
6
7
8
9
b'1' end
Traceback (most recent call last):File "mq1.py", line 38, in <module>channel.start_consuming()File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 1865, in start_consumingself._process_data_events(time_limit=None)File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 2026, in _process_data_eventsself.connection.process_data_events(time_limit=time_limit)File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 833, in process_data_eventsself._dispatch_channel_events()File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 567, in _dispatch_channel_eventsimpl_channel._get_cookie()._dispatch_events()File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 1492, in _dispatch_eventsconsumer_info.on_message_callback(self, evt.method,File "mq1.py", line 24, in on_messagechannel.basic_ack(delivery_tag)File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 2112, in basic_ackself._flush_output()File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 1335, in _flush_outputself._connection._flush_output(lambda: self.is_closed, *waiters)File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 523, in _flush_outputraise self._closed_result.value.error
pika.exceptions.StreamLostError: Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')
从结果来看,异常发生在一次长时间的消费过程(200s)完成后报错,具体为调用channel.basic_ack(delivery_tag)发生报错;推测是此时与MQ Server的连接已经被重置ConnectionResetError(104, 'Connection reset by peer'),此时再主动确认就发生报错。
正确解决方案如下:
"""
@author: Zhigang Jiang
@date: 2022/1/16
@description:
"""
import functools
import pika
import threading
import timedef ack_message(channel, delivery_tag):print(f'ack_message thread id: {threading.get_ident()}')if channel.is_open:channel.basic_ack(delivery_tag)else:# Channel is already closed, so we can't ACK this message;# log and/or do something that makes sense for your app in this case.passdef do_work(channel, delivery_tag, body):print(f'do_work thread id: {threading.get_ident()}')print(body, "start")for i in range(10):print(i)time.sleep(20)print(body, "end")cb = functools.partial(ack_message, channel, delivery_tag)channel.connection.add_callback_threadsafe(cb)def on_message(channel, method_frame, header_frame, body):print(f'on_message thread id: {threading.get_ident()}')delivery_tag = method_frame.delivery_tagt = threading.Thread(target=do_work, args=(channel, delivery_tag, body))t.start()credentials = pika.PlainCredentials('username', 'password')
parameters = pika.ConnectionParameters('test.webapi.username.com', credentials=credentials, heartbeat=5)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue="standard", durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume('standard', on_message)print(f'main thread id: {threading.get_ident()}')
try:channel.start_consuming()
except KeyboardInterrupt:channel.stop_consuming()
connection.close()
思路是pika
是线程不安全的,所以在接收消息和ACK响应消息时需要另外线程。
相关文章:
解决 Python RabbitMQ/Pika 报错:pop from an empty deque
使用 python 的 pika 包连接rabbitmq,代码如下: import pika import threading import timedef on_message(channel, method_frame, header_frame, body):print(fon_message thread id: {threading.get_ident()})delivery_tag method_frame.delivery_t…...

观察者模式实战
场景 假设创建订单后需要发短信、发邮件等其它的操作,放在业务逻辑会使代码非常臃肿,可以使用观察者模式优化代码 代码实现 自定义一个事件 发送邮件 发送短信 最后再创建订单的业务逻辑进行监听,创建订单 假设后面还需要做其它的…...

035_小驰私房菜_Qualcomm账号注册以及提case流程
全网最具价值的Android Camera开发学习系列资料~ 作者:8年Android Camera开发,从Camera app一直做到Hal和驱动~ 欢迎订阅,相信能扩展你的知识面,提升个人能力~ 一、账号注册 1)登陆高通网站Wireless Technology & Innovation | Mobile Technology | Qualcomm, 采用…...

uniapp input输入框placeholder文本右对齐
input输入框placeholder文本右对齐 给input标签加上placeholder-class,这个是给placeholder设置样式,右对齐这就是text-align:right;字体颜色之类依次编辑即可。...

分布式监控平台—zabbix
前言一、zabbix概述1.1 什么是zabbix1.2 zabbix的监控原理1.3 zabbix常见五个应用程序1.4 zabbix的监控模式1.5 监控架构1.5.1 C/S(server—client)1.5.2 server—proxy—client1.5.3 master—node—client 二、部署zabbix2.1 部署 zabbix server 端2.2 …...
【leetcode】第一章数组-2
977. 有序数组的平方 简单的方法是平方后使用排序方法第2种方法是双指针方法,从两边进行判断,将最大的从后往前放 public static int[] sortedSquares(int[] nums) {// 输入:nums [-4,-1,0,3,10]// 输出:[0,1,9,16,100]// 解释…...

程序使用Microsoft.XMLHTTP对象请求https时出错解决
程序中使用Microsoft.XMLHTTP组件请求https时出现如下错误: 出错程序代码示例: strUrl "https://www.xxx.com/xxx.asp?id11" dim objXmlHttp set objXmlHttp Server.CreateObject("Microsoft.XMLHTTP") objXmlHttp.open "…...

Linux安装配置nginx+php搭建
Linux安装配置nginxphp搭建 文章目录 Linux安装配置nginxphp搭建1.nginx源码包编译环境和安装相应的依赖1.1 安装编译环境1.2 安装pcre库、zlib库和openssl库 2.安装nginx2.1 在[nginx官网](https://nginx.org/en/download.html)上获取源码包并进行下载2.2 进行解压编译 3.启动…...
springboot的各种配置
1.AOP配置 <!-- AOP的依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aop</artifactId></dependency>package com.qf.HomeWork.aop;import lombok.extern.slf4j.Slf4j; im…...

OSI七层模型及TCP/IP四层模型
目录 OSI七层模型 TCP/IP四层模型 OIS七层模型和TCP/IP模型图 七层详解 两种模型比较 为什么OSI七层体系结构不常用 四层详解 网络为什么要分层? 说说 OSI 七层模型和 TCP/IP 四层模型的关系和区别 OSI七层模型 OSI(Open System Interconnect&a…...
MDN-Web APIs
参考资料 文章目录 简介DOM APIXMLHttpRequestWeb Storage APIWebsockets API 简介 Web APIs(Application Programming Interfaces)是用于与浏览器环境中的 Web 功能进行交互的一组接口和方法集合。通过 Web APIs,开发人员可以访问浏览器提…...

2023国赛数学建模C题思路分析
文章目录 0 赛题思路1 竞赛信息2 竞赛时间3 建模常见问题类型3.1 分类问题3.2 优化问题3.3 预测问题3.4 评价问题 4 建模资料 0 赛题思路 (赛题出来以后第一时间在CSDN分享) https://blog.csdn.net/dc_sinor?typeblog 1 竞赛信息 全国大学生数学建模…...
暑假集训笔记
刷题刷的好累啊...不想刷题了...然后就来写题解了... 昨天晚上打了场div2..2000来名,加了155分....现在rating1281...我是菜鸡..暑假之前就打到了1200分以上了,结果暑假一掉再掉,直接掉到1100了...然后我就一直压力很大....... 昨天在机房打…...

【枚举+推式子】牛客小白月赛 63 E
登录—专业IT笔试面试备考平台_牛客网 题意: 思路: 首先是个计数问题,考虑组合数学 组合数学就是在考虑枚举所有包含1和n的区间 这个典中典就是枚举1和n的位置然后算贡献 双指针超时,考虑推式子: Code:…...

Android多屏幕支持-Android12
Android多屏幕支持-Android12 1、概览及相关文章2、屏幕窗口配置2.1 配置xml文件2.2 DisplayInfo#uniqueId 屏幕标识2.3 adb查看信息 3、配置文件解析3.1 xml字段读取3.2 简要时序图 4、每屏幕焦点 android12-release 1、概览及相关文章 AOSP > 文档 > 心主题 > 多屏…...
python环境下载安装教程,python运行环境怎么下载
本篇文章给大家谈谈python安装步骤以及环境变量配置,以及下载python需要设置环境变量吗,希望对各位有所帮助,不要忘了收藏本站喔。 1.https://www.python.org/downloads/windows/ 下载适合自己电脑的python安装包 2.下载后安装即可 3.配置环…...

【0.2】lubancat鲁班猫4远程ubuntu22.04.2 无需任何安装
环境 lubancat4鲁班猫4 (4G0)不带emmc 系统镜像ubuntu-22.04.2-desktop-arm64-lubancat-4.img 网络环境:有线网络与本win10电脑同意环境 操作步骤ubuntu正常开机登陆用户,连接好网络进入设置>网络查看设备当前局域网IP 如192.168.199.159进入设置>共享>远程…...
Flutter 状态管理 Provider
状态管理必要性 Flutter基于声明式构建UI,原生则是命令式,状态管理是用于解决声明式开发带来的问题。 例:命令式的原生,数据更新需要拿到对应控件并更改其显示值;而声明式则需要更改数据值并通过setstate更新状态&am…...

【设计模式】观察者模式
什么是观察者模式? 观察者模式(又被称为发布-订阅(Publish/Subscribe)模式,属于行为型模式的一种,它定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象。这个主题对象在状态…...

ORCA优化器浅析——CDXLOperator Base class for operators in a DXL tree
如上图所示,CDXLOperator作为Base class for operators in a DXL tree,其子类CDXLLogical、CDXLScalar、CDXLPhysical作为逻辑节点、物理节点和Scalar节点的DXL表示类,因此其包含了这些类的共同部分特性,比如获取其DXL节点表示的函…...
打卡第47天
作业:对比不同卷积层热图可视化的结果 核心差异总结 浅层卷积层(如第 1-3 层) 关注细节:聚焦输入图像的边缘、纹理、颜色块等基础特征(例:猫脸的胡须边缘、树叶的脉络)。热图特点:区…...

agent 开发
什么是 agent? Agent智能体(又称AI Agent)是一种具备自主感知、决策与行动能力的智能系统,其核心在于模仿人类的认知过程来处理复杂任务。以下是其关键特性和发展现状的综合分析: 一、核心定义与特征 ### 自主决策…...
结构体和指针1
#include <iostream> using namespace std; #include <string> struct Student{ int age; string name; double score; }; int main() { //静态分配 Student s1 {18,"小明",88.5}; //cout << s1.name<<"的成绩为…...

STM32H562----------ADC外设详解
1、ADC 简介 STM32H5xx 系列有 2 个 ADC,都可以独立工作,其中 ADC1 和 ADC2 还可以组成双模式(提高采样率)。每个 ADC 最多可以有 20 个复用通道。这些 ADC 外设与 AHB 总线相连。 STM32H5xx 的 ADC 模块主要有如下几个特性: 1、可配置 12 位、10 位、8 位、6 位分辨率,…...
数据治理在制造业的实践案例
一、数据治理在制造业的重要性 随着工业4.0的到来,制造业正经历着前所未有的变革。数据治理作为制造业数字化转型的关键组成部分,对提升企业竞争力、优化生产流程、提高产品质量和客户满意度等方面起着至关重要的作用。在制造业中,数据治理不仅涉及到数据的收集、存…...

智能制造数字孪生全要素交付一张网:智造中枢,孪生领航,共建智造生态共同体
在制造业转型升级的浪潮中,数字孪生技术正成为推动行业变革的核心引擎。从特斯拉通过数字孪生体实现车辆全生命周期优化,到海尔卡奥斯工业互联网平台赋能千行百业,数字孪生技术已从概念验证走向规模化落地。通过构建覆盖全国的交付网络&#…...

Leetcode 1892. 页面推荐Ⅱ
1.题目基本信息 1.1.题目描述 表: Friendship ---------------------- | Column Name | Type | ---------------------- | user1_id | int | | user2_id | int | ---------------------- (user1_id,user2_id) 是 Friendship 表的主键(具有唯一值的列的组合…...
Python实现快速排序的三种经典写法及算法解析
今天想熟悉一下python的基础写法,那就从最经典的快速排序来开始吧: 1、经典分治写法(原地排序) 时间复杂度:平均O(nlogn),最坏O(n) 空间复杂度:O(logn)递归栈空间 特点:通过左右指针…...

v1.0.1版本更新·2025年5月22日发布-优雅草星云物联网AI智控系统
v1.0.1版本更新2025年5月22日发布-优雅草星云物联网AI智控系统 开源地址 星云智控官网: 优雅草星云物联网AI智控软件-移动端vue: 优雅草星云物联网AI智控软件-移动端vue 星云智控PC端开源: 优雅草星云物联网AI智控软件-PC端vue: 优雅草星云物联网AI…...

大量企业系统超龄服役!R²AIN SUITE 一体化企业提效解决方案重构零售数智化基因
《中国百货商业协会2024零售IT及数字化系统需求调查报告》为我们呈现了零售企业在数字化转型中的复杂图景。数据显示,82%的企业高管对AI改变行业未来充满信心 source:中国百货商业协会 ,零售IT及数字化系统需求调查报告 ,2024年 但…...