当前位置: 首页 > news >正文

解决 Python RabbitMQ/Pika 报错:pop from an empty deque

使用 python 的 pika 包连接rabbitmq,代码如下:

import pika
import threading
import timedef on_message(channel, method_frame, header_frame, body):print(f'on_message thread id: {threading.get_ident()}')delivery_tag = method_frame.delivery_tagprint(body, "start")for i in range(10):print(i)time.sleep(20)print(body, "end")channel.basic_ack(delivery_tag)credentials = pika.PlainCredentials('username', 'password')
parameters = pika.ConnectionParameters('test.webapi.username.com', credentials=credentials, heartbeat=5)
connection = pika.BlockingConnection(parameters)channel = connection.channel()
channel.queue_declare(queue="standard", durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume('standard', on_message)print(f'main thread id: {threading.get_ident()}')
try:channel.start_consuming()
except KeyboardInterrupt:channel.stop_consuming()
connection.close()

执行结果:

(Test) [user@user test]$ python mq1.py 
main thread id: 140682766104384
on_message thread id: 140682766104384
b'1' start
0
1
2
3
4
5
6
7
8
9
b'1' end
Traceback (most recent call last):File "mq1.py", line 38, in <module>channel.start_consuming()File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 1865, in start_consumingself._process_data_events(time_limit=None)File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 2026, in _process_data_eventsself.connection.process_data_events(time_limit=time_limit)File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 833, in process_data_eventsself._dispatch_channel_events()File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 567, in _dispatch_channel_eventsimpl_channel._get_cookie()._dispatch_events()File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 1492, in _dispatch_eventsconsumer_info.on_message_callback(self, evt.method,File "mq1.py", line 24, in on_messagechannel.basic_ack(delivery_tag)File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 2112, in basic_ackself._flush_output()File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 1335, in _flush_outputself._connection._flush_output(lambda: self.is_closed, *waiters)File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 523, in _flush_outputraise self._closed_result.value.error
pika.exceptions.StreamLostError: Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')

 从结果来看,异常发生在一次长时间的消费过程(200s)完成后报错,具体为调用channel.basic_ack(delivery_tag)发生报错;推测是此时与MQ Server的连接已经被重置ConnectionResetError(104, 'Connection reset by peer'),此时再主动确认就发生报错。

正确解决方案如下:

"""
@author: Zhigang Jiang
@date: 2022/1/16
@description:
"""
import functools
import pika
import threading
import timedef ack_message(channel, delivery_tag):print(f'ack_message thread id: {threading.get_ident()}')if channel.is_open:channel.basic_ack(delivery_tag)else:# Channel is already closed, so we can't ACK this message;# log and/or do something that makes sense for your app in this case.passdef do_work(channel, delivery_tag, body):print(f'do_work thread id: {threading.get_ident()}')print(body, "start")for i in range(10):print(i)time.sleep(20)print(body, "end")cb = functools.partial(ack_message, channel, delivery_tag)channel.connection.add_callback_threadsafe(cb)def on_message(channel, method_frame, header_frame, body):print(f'on_message thread id: {threading.get_ident()}')delivery_tag = method_frame.delivery_tagt = threading.Thread(target=do_work, args=(channel, delivery_tag, body))t.start()credentials = pika.PlainCredentials('username', 'password')
parameters = pika.ConnectionParameters('test.webapi.username.com', credentials=credentials, heartbeat=5)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue="standard", durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume('standard', on_message)print(f'main thread id: {threading.get_ident()}')
try:channel.start_consuming()
except KeyboardInterrupt:channel.stop_consuming()
connection.close()

思路是pika是线程不安全的,所以在接收消息和ACK响应消息时需要另外线程。

相关文章:

解决 Python RabbitMQ/Pika 报错:pop from an empty deque

使用 python 的 pika 包连接rabbitmq&#xff0c;代码如下&#xff1a; import pika import threading import timedef on_message(channel, method_frame, header_frame, body):print(fon_message thread id: {threading.get_ident()})delivery_tag method_frame.delivery_t…...

观察者模式实战

场景 假设创建订单后需要发短信、发邮件等其它的操作&#xff0c;放在业务逻辑会使代码非常臃肿&#xff0c;可以使用观察者模式优化代码 代码实现 自定义一个事件 发送邮件 发送短信 最后再创建订单的业务逻辑进行监听&#xff0c;创建订单 假设后面还需要做其它的…...

035_小驰私房菜_Qualcomm账号注册以及提case流程

全网最具价值的Android Camera开发学习系列资料~ 作者:8年Android Camera开发,从Camera app一直做到Hal和驱动~ 欢迎订阅,相信能扩展你的知识面,提升个人能力~ 一、账号注册 1)登陆高通网站Wireless Technology & Innovation | Mobile Technology | Qualcomm, 采用…...

uniapp input输入框placeholder文本右对齐

input输入框placeholder文本右对齐 给input标签加上placeholder-class&#xff0c;这个是给placeholder设置样式&#xff0c;右对齐这就是text-align:right;字体颜色之类依次编辑即可。...

分布式监控平台—zabbix

前言一、zabbix概述1.1 什么是zabbix1.2 zabbix的监控原理1.3 zabbix常见五个应用程序1.4 zabbix的监控模式1.5 监控架构1.5.1 C/S&#xff08;server—client&#xff09;1.5.2 server—proxy—client1.5.3 master—node—client 二、部署zabbix2.1 部署 zabbix server 端2.2 …...

【leetcode】第一章数组-2

977. 有序数组的平方 简单的方法是平方后使用排序方法第2种方法是双指针方法&#xff0c;从两边进行判断&#xff0c;将最大的从后往前放 public static int[] sortedSquares(int[] nums) {// 输入&#xff1a;nums [-4,-1,0,3,10]// 输出&#xff1a;[0,1,9,16,100]// 解释…...

程序使用Microsoft.XMLHTTP对象请求https时出错解决

程序中使用Microsoft.XMLHTTP组件请求https时出现如下错误&#xff1a; 出错程序代码示例&#xff1a; strUrl "https://www.xxx.com/xxx.asp?id11" dim objXmlHttp set objXmlHttp Server.CreateObject("Microsoft.XMLHTTP") objXmlHttp.open "…...

Linux安装配置nginx+php搭建

Linux安装配置nginxphp搭建 文章目录 Linux安装配置nginxphp搭建1.nginx源码包编译环境和安装相应的依赖1.1 安装编译环境1.2 安装pcre库、zlib库和openssl库 2.安装nginx2.1 在[nginx官网](https://nginx.org/en/download.html)上获取源码包并进行下载2.2 进行解压编译 3.启动…...

springboot的各种配置

1.AOP配置 <!-- AOP的依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aop</artifactId></dependency>package com.qf.HomeWork.aop;import lombok.extern.slf4j.Slf4j; im…...

OSI七层模型及TCP/IP四层模型

目录 OSI七层模型 TCP/IP四层模型 OIS七层模型和TCP/IP模型图 七层详解 两种模型比较 为什么OSI七层体系结构不常用 四层详解 网络为什么要分层&#xff1f; 说说 OSI 七层模型和 TCP/IP 四层模型的关系和区别 OSI七层模型 OSI&#xff08;Open System Interconnect&a…...

MDN-Web APIs

参考资料 文章目录 简介DOM APIXMLHttpRequestWeb Storage APIWebsockets API 简介 Web APIs&#xff08;Application Programming Interfaces&#xff09;是用于与浏览器环境中的 Web 功能进行交互的一组接口和方法集合。通过 Web APIs&#xff0c;开发人员可以访问浏览器提…...

2023国赛数学建模C题思路分析

文章目录 0 赛题思路1 竞赛信息2 竞赛时间3 建模常见问题类型3.1 分类问题3.2 优化问题3.3 预测问题3.4 评价问题 4 建模资料 0 赛题思路 &#xff08;赛题出来以后第一时间在CSDN分享&#xff09; https://blog.csdn.net/dc_sinor?typeblog 1 竞赛信息 全国大学生数学建模…...

暑假集训笔记

刷题刷的好累啊...不想刷题了...然后就来写题解了... 昨天晚上打了场div2..2000来名&#xff0c;加了155分....现在rating1281...我是菜鸡..暑假之前就打到了1200分以上了&#xff0c;结果暑假一掉再掉&#xff0c;直接掉到1100了...然后我就一直压力很大....... 昨天在机房打…...

【枚举+推式子】牛客小白月赛 63 E

登录—专业IT笔试面试备考平台_牛客网 题意&#xff1a; 思路&#xff1a; 首先是个计数问题&#xff0c;考虑组合数学 组合数学就是在考虑枚举所有包含1和n的区间 这个典中典就是枚举1和n的位置然后算贡献 双指针超时&#xff0c;考虑推式子&#xff1a; Code&#xff1a…...

Android多屏幕支持-Android12

Android多屏幕支持-Android12 1、概览及相关文章2、屏幕窗口配置2.1 配置xml文件2.2 DisplayInfo#uniqueId 屏幕标识2.3 adb查看信息 3、配置文件解析3.1 xml字段读取3.2 简要时序图 4、每屏幕焦点 android12-release 1、概览及相关文章 AOSP > 文档 > 心主题 > 多屏…...

python环境下载安装教程,python运行环境怎么下载

本篇文章给大家谈谈python安装步骤以及环境变量配置&#xff0c;以及下载python需要设置环境变量吗&#xff0c;希望对各位有所帮助&#xff0c;不要忘了收藏本站喔。 1.https://www.python.org/downloads/windows/ 下载适合自己电脑的python安装包 2.下载后安装即可 3.配置环…...

【0.2】lubancat鲁班猫4远程ubuntu22.04.2 无需任何安装

环境 lubancat4鲁班猫4 (4G0)不带emmc 系统镜像ubuntu-22.04.2-desktop-arm64-lubancat-4.img 网络环境:有线网络与本win10电脑同意环境 操作步骤ubuntu正常开机登陆用户&#xff0c;连接好网络进入设置>网络查看设备当前局域网IP 如192.168.199.159进入设置>共享>远程…...

Flutter 状态管理 Provider

状态管理必要性 Flutter基于声明式构建UI&#xff0c;原生则是命令式&#xff0c;状态管理是用于解决声明式开发带来的问题。 例&#xff1a;命令式的原生&#xff0c;数据更新需要拿到对应控件并更改其显示值&#xff1b;而声明式则需要更改数据值并通过setstate更新状态&am…...

【设计模式】观察者模式

什么是观察者模式&#xff1f; 观察者模式&#xff08;又被称为发布-订阅&#xff08;Publish/Subscribe&#xff09;模式&#xff0c;属于行为型模式的一种&#xff0c;它定义了一种一对多的依赖关系&#xff0c;让多个观察者对象同时监听某一个主题对象。这个主题对象在状态…...

ORCA优化器浅析——CDXLOperator Base class for operators in a DXL tree

如上图所示&#xff0c;CDXLOperator作为Base class for operators in a DXL tree&#xff0c;其子类CDXLLogical、CDXLScalar、CDXLPhysical作为逻辑节点、物理节点和Scalar节点的DXL表示类&#xff0c;因此其包含了这些类的共同部分特性&#xff0c;比如获取其DXL节点表示的函…...

前端交互设计实现

前端交互设计实现&#xff1a;打造流畅用户体验的艺术 在数字化时代&#xff0c;前端交互设计已成为用户体验的核心。无论是网页、移动应用还是智能设备界面&#xff0c;优秀的交互设计能显著提升用户满意度与留存率。前端交互设计不仅关乎视觉美观&#xff0c;更注重用户操作…...

01 微服务

一、认识微服务 1.1 微服务架构演变 单体架构&#xff1a; 将业务的所有功能集中在一个项目中开发&#xff0c;打成一个包部署&#xff08;简单方便&#xff0c;高度耦合&#xff0c;拓展性差&#xff0c;适合小型项目&#xff0c;如学生管理系统&#xff09;&#xff1b;分布式…...

别再手动标注了!用百度大脑EasyData的多人协同功能,3步搞定团队数据标注

高效团队数据标注实战&#xff1a;用协同工具提升3倍效率 当五个人围着一堆猫狗图片争论"这只算狸花猫还是虎斑猫"时&#xff0c;数据标注工作就变成了效率黑洞。我们实验室去年标注10万张医疗影像的经历让我深刻理解&#xff1a;团队标注的核心痛点从来不是工具操作…...

洛克王国世界T0精灵合集!配无线副屏看攻略丝滑开荒!

《洛克王国&#xff1a;世界》自3月26日公测以来&#xff0c;首日新进用户就超过1500万&#xff0c;无数小洛克一头扎进这片超大地图开始冒险。但面对首发实装的超400只精灵&#xff0c;很多萌新最大的困惑就是——我到底该养谁&#xff1f;开荒选谁最稳&#xff1f;别慌。开局…...

从Google Spanner到阿里OceanBase:拆解Paxos在万亿级数据库里是怎么‘打工’的

万亿级数据库背后的Paxos工程实践&#xff1a;从理论到工业级实现 在分布式数据库的世界里&#xff0c;Paxos协议就像一位默默无闻的"超级员工"&#xff0c;它不直接处理用户查询&#xff0c;不参与SQL解析&#xff0c;却在幕后确保每个数据变更都能在全球多个数据中…...

AI编程实战:从零到一搭建全栈项目潜

1. 核心概念 在 Antigravity 中&#xff0c;技能系统分为两层&#xff1a; Skills (全局库)&#xff1a;实际的代码、脚本和指南&#xff0c;存储在系统级目录&#xff08;如 ~/.gemini/antigravity/skills&#xff09;。它们是“能力”的本体。 Workflows (项目级)&#xff1a…...

VescUart库详解:嵌入式VESC UART通信协议与实时控制实践

1. VescUart库深度解析&#xff1a;面向嵌入式工程师的VESC UART通信全栈指南 1.1 库定位与工程价值 VescUart是一个专为嵌入式平台设计的轻量级UART通信库&#xff0c;核心目标是实现对VESC&#xff08; Vedder Electronic Speed Controller&#xff09;电调设备的可靠、低延…...

【毕业季求生帖】论文盲目降AI等于白送钱?10款降AI软件红黑榜揭秘

今年毕业季&#xff0c;降AI率最大的难点其实早就不仅是降不降得下来&#xff0c;还有降完之后还能不能看&#xff0c;随着知网、维普接连升级AIGC检测算法&#xff0c;靠简单同义词替换已经完全行不通了。 而且最让大家崩溃的往往是这三点&#xff1a;第一&#xff0c;降完之后…...

基于HFSS的侧馈矩形微带天线仿真与优化实战

1. 侧馈矩形微带天线设计基础 微带天线作为现代无线通信系统中的关键部件&#xff0c;因其体积小、重量轻、易于集成等优势被广泛应用。侧馈矩形微带天线是最基础也最具代表性的结构&#xff0c;特别适合2.45GHz这类常见频段的应用场景。我第一次接触这类天线设计时&#xff0c…...

Arduino设备控制项目实战:从Demo代码到量产固件

1. 项目概述Goldfish4Tech 并非一个标准开源嵌入式库&#xff0c;其 GitHub 或公开技术平台中未收录可检索的源码仓库、API 文档或硬件设计资料。根据所提供的唯一有效输入信息——项目标题 "Goldfish4Tech"、摘要 "Arduino demo code for project"、关键词…...