当前位置: 首页 > news >正文

解决 Python RabbitMQ/Pika 报错:pop from an empty deque

使用 python 的 pika 包连接rabbitmq,代码如下:

import pika
import threading
import timedef on_message(channel, method_frame, header_frame, body):print(f'on_message thread id: {threading.get_ident()}')delivery_tag = method_frame.delivery_tagprint(body, "start")for i in range(10):print(i)time.sleep(20)print(body, "end")channel.basic_ack(delivery_tag)credentials = pika.PlainCredentials('username', 'password')
parameters = pika.ConnectionParameters('test.webapi.username.com', credentials=credentials, heartbeat=5)
connection = pika.BlockingConnection(parameters)channel = connection.channel()
channel.queue_declare(queue="standard", durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume('standard', on_message)print(f'main thread id: {threading.get_ident()}')
try:channel.start_consuming()
except KeyboardInterrupt:channel.stop_consuming()
connection.close()

执行结果:

(Test) [user@user test]$ python mq1.py 
main thread id: 140682766104384
on_message thread id: 140682766104384
b'1' start
0
1
2
3
4
5
6
7
8
9
b'1' end
Traceback (most recent call last):File "mq1.py", line 38, in <module>channel.start_consuming()File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 1865, in start_consumingself._process_data_events(time_limit=None)File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 2026, in _process_data_eventsself.connection.process_data_events(time_limit=time_limit)File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 833, in process_data_eventsself._dispatch_channel_events()File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 567, in _dispatch_channel_eventsimpl_channel._get_cookie()._dispatch_events()File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 1492, in _dispatch_eventsconsumer_info.on_message_callback(self, evt.method,File "mq1.py", line 24, in on_messagechannel.basic_ack(delivery_tag)File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 2112, in basic_ackself._flush_output()File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 1335, in _flush_outputself._connection._flush_output(lambda: self.is_closed, *waiters)File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 523, in _flush_outputraise self._closed_result.value.error
pika.exceptions.StreamLostError: Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')

 从结果来看,异常发生在一次长时间的消费过程(200s)完成后报错,具体为调用channel.basic_ack(delivery_tag)发生报错;推测是此时与MQ Server的连接已经被重置ConnectionResetError(104, 'Connection reset by peer'),此时再主动确认就发生报错。

正确解决方案如下:

"""
@author: Zhigang Jiang
@date: 2022/1/16
@description:
"""
import functools
import pika
import threading
import timedef ack_message(channel, delivery_tag):print(f'ack_message thread id: {threading.get_ident()}')if channel.is_open:channel.basic_ack(delivery_tag)else:# Channel is already closed, so we can't ACK this message;# log and/or do something that makes sense for your app in this case.passdef do_work(channel, delivery_tag, body):print(f'do_work thread id: {threading.get_ident()}')print(body, "start")for i in range(10):print(i)time.sleep(20)print(body, "end")cb = functools.partial(ack_message, channel, delivery_tag)channel.connection.add_callback_threadsafe(cb)def on_message(channel, method_frame, header_frame, body):print(f'on_message thread id: {threading.get_ident()}')delivery_tag = method_frame.delivery_tagt = threading.Thread(target=do_work, args=(channel, delivery_tag, body))t.start()credentials = pika.PlainCredentials('username', 'password')
parameters = pika.ConnectionParameters('test.webapi.username.com', credentials=credentials, heartbeat=5)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue="standard", durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume('standard', on_message)print(f'main thread id: {threading.get_ident()}')
try:channel.start_consuming()
except KeyboardInterrupt:channel.stop_consuming()
connection.close()

思路是pika是线程不安全的,所以在接收消息和ACK响应消息时需要另外线程。

相关文章:

解决 Python RabbitMQ/Pika 报错:pop from an empty deque

使用 python 的 pika 包连接rabbitmq&#xff0c;代码如下&#xff1a; import pika import threading import timedef on_message(channel, method_frame, header_frame, body):print(fon_message thread id: {threading.get_ident()})delivery_tag method_frame.delivery_t…...

观察者模式实战

场景 假设创建订单后需要发短信、发邮件等其它的操作&#xff0c;放在业务逻辑会使代码非常臃肿&#xff0c;可以使用观察者模式优化代码 代码实现 自定义一个事件 发送邮件 发送短信 最后再创建订单的业务逻辑进行监听&#xff0c;创建订单 假设后面还需要做其它的…...

035_小驰私房菜_Qualcomm账号注册以及提case流程

全网最具价值的Android Camera开发学习系列资料~ 作者:8年Android Camera开发,从Camera app一直做到Hal和驱动~ 欢迎订阅,相信能扩展你的知识面,提升个人能力~ 一、账号注册 1)登陆高通网站Wireless Technology & Innovation | Mobile Technology | Qualcomm, 采用…...

uniapp input输入框placeholder文本右对齐

input输入框placeholder文本右对齐 给input标签加上placeholder-class&#xff0c;这个是给placeholder设置样式&#xff0c;右对齐这就是text-align:right;字体颜色之类依次编辑即可。...

分布式监控平台—zabbix

前言一、zabbix概述1.1 什么是zabbix1.2 zabbix的监控原理1.3 zabbix常见五个应用程序1.4 zabbix的监控模式1.5 监控架构1.5.1 C/S&#xff08;server—client&#xff09;1.5.2 server—proxy—client1.5.3 master—node—client 二、部署zabbix2.1 部署 zabbix server 端2.2 …...

【leetcode】第一章数组-2

977. 有序数组的平方 简单的方法是平方后使用排序方法第2种方法是双指针方法&#xff0c;从两边进行判断&#xff0c;将最大的从后往前放 public static int[] sortedSquares(int[] nums) {// 输入&#xff1a;nums [-4,-1,0,3,10]// 输出&#xff1a;[0,1,9,16,100]// 解释…...

程序使用Microsoft.XMLHTTP对象请求https时出错解决

程序中使用Microsoft.XMLHTTP组件请求https时出现如下错误&#xff1a; 出错程序代码示例&#xff1a; strUrl "https://www.xxx.com/xxx.asp?id11" dim objXmlHttp set objXmlHttp Server.CreateObject("Microsoft.XMLHTTP") objXmlHttp.open "…...

Linux安装配置nginx+php搭建

Linux安装配置nginxphp搭建 文章目录 Linux安装配置nginxphp搭建1.nginx源码包编译环境和安装相应的依赖1.1 安装编译环境1.2 安装pcre库、zlib库和openssl库 2.安装nginx2.1 在[nginx官网](https://nginx.org/en/download.html)上获取源码包并进行下载2.2 进行解压编译 3.启动…...

springboot的各种配置

1.AOP配置 <!-- AOP的依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aop</artifactId></dependency>package com.qf.HomeWork.aop;import lombok.extern.slf4j.Slf4j; im…...

OSI七层模型及TCP/IP四层模型

目录 OSI七层模型 TCP/IP四层模型 OIS七层模型和TCP/IP模型图 七层详解 两种模型比较 为什么OSI七层体系结构不常用 四层详解 网络为什么要分层&#xff1f; 说说 OSI 七层模型和 TCP/IP 四层模型的关系和区别 OSI七层模型 OSI&#xff08;Open System Interconnect&a…...

MDN-Web APIs

参考资料 文章目录 简介DOM APIXMLHttpRequestWeb Storage APIWebsockets API 简介 Web APIs&#xff08;Application Programming Interfaces&#xff09;是用于与浏览器环境中的 Web 功能进行交互的一组接口和方法集合。通过 Web APIs&#xff0c;开发人员可以访问浏览器提…...

2023国赛数学建模C题思路分析

文章目录 0 赛题思路1 竞赛信息2 竞赛时间3 建模常见问题类型3.1 分类问题3.2 优化问题3.3 预测问题3.4 评价问题 4 建模资料 0 赛题思路 &#xff08;赛题出来以后第一时间在CSDN分享&#xff09; https://blog.csdn.net/dc_sinor?typeblog 1 竞赛信息 全国大学生数学建模…...

暑假集训笔记

刷题刷的好累啊...不想刷题了...然后就来写题解了... 昨天晚上打了场div2..2000来名&#xff0c;加了155分....现在rating1281...我是菜鸡..暑假之前就打到了1200分以上了&#xff0c;结果暑假一掉再掉&#xff0c;直接掉到1100了...然后我就一直压力很大....... 昨天在机房打…...

【枚举+推式子】牛客小白月赛 63 E

登录—专业IT笔试面试备考平台_牛客网 题意&#xff1a; 思路&#xff1a; 首先是个计数问题&#xff0c;考虑组合数学 组合数学就是在考虑枚举所有包含1和n的区间 这个典中典就是枚举1和n的位置然后算贡献 双指针超时&#xff0c;考虑推式子&#xff1a; Code&#xff1a…...

Android多屏幕支持-Android12

Android多屏幕支持-Android12 1、概览及相关文章2、屏幕窗口配置2.1 配置xml文件2.2 DisplayInfo#uniqueId 屏幕标识2.3 adb查看信息 3、配置文件解析3.1 xml字段读取3.2 简要时序图 4、每屏幕焦点 android12-release 1、概览及相关文章 AOSP > 文档 > 心主题 > 多屏…...

python环境下载安装教程,python运行环境怎么下载

本篇文章给大家谈谈python安装步骤以及环境变量配置&#xff0c;以及下载python需要设置环境变量吗&#xff0c;希望对各位有所帮助&#xff0c;不要忘了收藏本站喔。 1.https://www.python.org/downloads/windows/ 下载适合自己电脑的python安装包 2.下载后安装即可 3.配置环…...

【0.2】lubancat鲁班猫4远程ubuntu22.04.2 无需任何安装

环境 lubancat4鲁班猫4 (4G0)不带emmc 系统镜像ubuntu-22.04.2-desktop-arm64-lubancat-4.img 网络环境:有线网络与本win10电脑同意环境 操作步骤ubuntu正常开机登陆用户&#xff0c;连接好网络进入设置>网络查看设备当前局域网IP 如192.168.199.159进入设置>共享>远程…...

Flutter 状态管理 Provider

状态管理必要性 Flutter基于声明式构建UI&#xff0c;原生则是命令式&#xff0c;状态管理是用于解决声明式开发带来的问题。 例&#xff1a;命令式的原生&#xff0c;数据更新需要拿到对应控件并更改其显示值&#xff1b;而声明式则需要更改数据值并通过setstate更新状态&am…...

【设计模式】观察者模式

什么是观察者模式&#xff1f; 观察者模式&#xff08;又被称为发布-订阅&#xff08;Publish/Subscribe&#xff09;模式&#xff0c;属于行为型模式的一种&#xff0c;它定义了一种一对多的依赖关系&#xff0c;让多个观察者对象同时监听某一个主题对象。这个主题对象在状态…...

ORCA优化器浅析——CDXLOperator Base class for operators in a DXL tree

如上图所示&#xff0c;CDXLOperator作为Base class for operators in a DXL tree&#xff0c;其子类CDXLLogical、CDXLScalar、CDXLPhysical作为逻辑节点、物理节点和Scalar节点的DXL表示类&#xff0c;因此其包含了这些类的共同部分特性&#xff0c;比如获取其DXL节点表示的函…...

打卡第47天

作业&#xff1a;对比不同卷积层热图可视化的结果 核心差异总结 浅层卷积层&#xff08;如第 1-3 层&#xff09; 关注细节&#xff1a;聚焦输入图像的边缘、纹理、颜色块等基础特征&#xff08;例&#xff1a;猫脸的胡须边缘、树叶的脉络&#xff09;。热图特点&#xff1a;区…...

agent 开发

什么是 agent&#xff1f; Agent智能体&#xff08;又称AI Agent&#xff09;是一种具备自主感知、决策与行动能力的智能系统&#xff0c;其核心在于模仿人类的认知过程来处理复杂任务。以下是其关键特性和发展现状的综合分析&#xff1a; 一、核心定义与特征 #‌## 自主决策…...

结构体和指针1

#include <iostream> using namespace std; #include <string> struct Student{ int age; string name; double score; }; int main() { //静态分配 Student s1 {18,"小明",88.5}; //cout << s1.name<<"的成绩为…...

STM32H562----------ADC外设详解

1、ADC 简介 STM32H5xx 系列有 2 个 ADC,都可以独立工作,其中 ADC1 和 ADC2 还可以组成双模式(提高采样率)。每个 ADC 最多可以有 20 个复用通道。这些 ADC 外设与 AHB 总线相连。 STM32H5xx 的 ADC 模块主要有如下几个特性: 1、可配置 12 位、10 位、8 位、6 位分辨率,…...

数据治理在制造业的实践案例

一、数据治理在制造业的重要性 随着工业4.0的到来,制造业正经历着前所未有的变革。数据治理作为制造业数字化转型的关键组成部分,对提升企业竞争力、优化生产流程、提高产品质量和客户满意度等方面起着至关重要的作用。在制造业中,数据治理不仅涉及到数据的收集、存…...

智能制造数字孪生全要素交付一张网:智造中枢,孪生领航,共建智造生态共同体

在制造业转型升级的浪潮中&#xff0c;数字孪生技术正成为推动行业变革的核心引擎。从特斯拉通过数字孪生体实现车辆全生命周期优化&#xff0c;到海尔卡奥斯工业互联网平台赋能千行百业&#xff0c;数字孪生技术已从概念验证走向规模化落地。通过构建覆盖全国的交付网络&#…...

Leetcode 1892. 页面推荐Ⅱ

1.题目基本信息 1.1.题目描述 表&#xff1a; Friendship ---------------------- | Column Name | Type | ---------------------- | user1_id | int | | user2_id | int | ---------------------- (user1_id,user2_id) 是 Friendship 表的主键(具有唯一值的列的组合…...

Python实现快速排序的三种经典写法及算法解析

今天想熟悉一下python的基础写法&#xff0c;那就从最经典的快速排序来开始吧&#xff1a; 1、经典分治写法&#xff08;原地排序&#xff09; 时间复杂度&#xff1a;平均O(nlogn)&#xff0c;最坏O(n) 空间复杂度&#xff1a;O(logn)递归栈空间 特点&#xff1a;通过左右指针…...

v1.0.1版本更新·2025年5月22日发布-优雅草星云物联网AI智控系统

v1.0.1版本更新2025年5月22日发布-优雅草星云物联网AI智控系统 开源地址 星云智控官网&#xff1a; 优雅草星云物联网AI智控软件-移动端vue: 优雅草星云物联网AI智控软件-移动端vue 星云智控PC端开源&#xff1a; 优雅草星云物联网AI智控软件-PC端vue: 优雅草星云物联网AI…...

大量企业系统超龄服役!R²AIN SUITE 一体化企业提效解决方案重构零售数智化基因

《中国百货商业协会2024零售IT及数字化系统需求调查报告》为我们呈现了零售企业在数字化转型中的复杂图景。数据显示&#xff0c;82%的企业高管对AI改变行业未来充满信心 source&#xff1a;中国百货商业协会 &#xff0c;零售IT及数字化系统需求调查报告 &#xff0c;2024年 但…...