解决 Python RabbitMQ/Pika 报错:pop from an empty deque
使用 python 的 pika 包连接rabbitmq,代码如下:
import pika
import threading
import timedef on_message(channel, method_frame, header_frame, body):print(f'on_message thread id: {threading.get_ident()}')delivery_tag = method_frame.delivery_tagprint(body, "start")for i in range(10):print(i)time.sleep(20)print(body, "end")channel.basic_ack(delivery_tag)credentials = pika.PlainCredentials('username', 'password')
parameters = pika.ConnectionParameters('test.webapi.username.com', credentials=credentials, heartbeat=5)
connection = pika.BlockingConnection(parameters)channel = connection.channel()
channel.queue_declare(queue="standard", durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume('standard', on_message)print(f'main thread id: {threading.get_ident()}')
try:channel.start_consuming()
except KeyboardInterrupt:channel.stop_consuming()
connection.close()
执行结果:
(Test) [user@user test]$ python mq1.py
main thread id: 140682766104384
on_message thread id: 140682766104384
b'1' start
0
1
2
3
4
5
6
7
8
9
b'1' end
Traceback (most recent call last):File "mq1.py", line 38, in <module>channel.start_consuming()File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 1865, in start_consumingself._process_data_events(time_limit=None)File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 2026, in _process_data_eventsself.connection.process_data_events(time_limit=time_limit)File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 833, in process_data_eventsself._dispatch_channel_events()File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 567, in _dispatch_channel_eventsimpl_channel._get_cookie()._dispatch_events()File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 1492, in _dispatch_eventsconsumer_info.on_message_callback(self, evt.method,File "mq1.py", line 24, in on_messagechannel.basic_ack(delivery_tag)File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 2112, in basic_ackself._flush_output()File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 1335, in _flush_outputself._connection._flush_output(lambda: self.is_closed, *waiters)File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 523, in _flush_outputraise self._closed_result.value.error
pika.exceptions.StreamLostError: Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')
从结果来看,异常发生在一次长时间的消费过程(200s)完成后报错,具体为调用channel.basic_ack(delivery_tag)发生报错;推测是此时与MQ Server的连接已经被重置ConnectionResetError(104, 'Connection reset by peer'),此时再主动确认就发生报错。
正确解决方案如下:
"""
@author: Zhigang Jiang
@date: 2022/1/16
@description:
"""
import functools
import pika
import threading
import timedef ack_message(channel, delivery_tag):print(f'ack_message thread id: {threading.get_ident()}')if channel.is_open:channel.basic_ack(delivery_tag)else:# Channel is already closed, so we can't ACK this message;# log and/or do something that makes sense for your app in this case.passdef do_work(channel, delivery_tag, body):print(f'do_work thread id: {threading.get_ident()}')print(body, "start")for i in range(10):print(i)time.sleep(20)print(body, "end")cb = functools.partial(ack_message, channel, delivery_tag)channel.connection.add_callback_threadsafe(cb)def on_message(channel, method_frame, header_frame, body):print(f'on_message thread id: {threading.get_ident()}')delivery_tag = method_frame.delivery_tagt = threading.Thread(target=do_work, args=(channel, delivery_tag, body))t.start()credentials = pika.PlainCredentials('username', 'password')
parameters = pika.ConnectionParameters('test.webapi.username.com', credentials=credentials, heartbeat=5)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue="standard", durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume('standard', on_message)print(f'main thread id: {threading.get_ident()}')
try:channel.start_consuming()
except KeyboardInterrupt:channel.stop_consuming()
connection.close()
思路是pika是线程不安全的,所以在接收消息和ACK响应消息时需要另外线程。
相关文章:
解决 Python RabbitMQ/Pika 报错:pop from an empty deque
使用 python 的 pika 包连接rabbitmq,代码如下: import pika import threading import timedef on_message(channel, method_frame, header_frame, body):print(fon_message thread id: {threading.get_ident()})delivery_tag method_frame.delivery_t…...
观察者模式实战
场景 假设创建订单后需要发短信、发邮件等其它的操作,放在业务逻辑会使代码非常臃肿,可以使用观察者模式优化代码 代码实现 自定义一个事件 发送邮件 发送短信 最后再创建订单的业务逻辑进行监听,创建订单 假设后面还需要做其它的…...
035_小驰私房菜_Qualcomm账号注册以及提case流程
全网最具价值的Android Camera开发学习系列资料~ 作者:8年Android Camera开发,从Camera app一直做到Hal和驱动~ 欢迎订阅,相信能扩展你的知识面,提升个人能力~ 一、账号注册 1)登陆高通网站Wireless Technology & Innovation | Mobile Technology | Qualcomm, 采用…...
uniapp input输入框placeholder文本右对齐
input输入框placeholder文本右对齐 给input标签加上placeholder-class,这个是给placeholder设置样式,右对齐这就是text-align:right;字体颜色之类依次编辑即可。...
分布式监控平台—zabbix
前言一、zabbix概述1.1 什么是zabbix1.2 zabbix的监控原理1.3 zabbix常见五个应用程序1.4 zabbix的监控模式1.5 监控架构1.5.1 C/S(server—client)1.5.2 server—proxy—client1.5.3 master—node—client 二、部署zabbix2.1 部署 zabbix server 端2.2 …...
【leetcode】第一章数组-2
977. 有序数组的平方 简单的方法是平方后使用排序方法第2种方法是双指针方法,从两边进行判断,将最大的从后往前放 public static int[] sortedSquares(int[] nums) {// 输入:nums [-4,-1,0,3,10]// 输出:[0,1,9,16,100]// 解释…...
程序使用Microsoft.XMLHTTP对象请求https时出错解决
程序中使用Microsoft.XMLHTTP组件请求https时出现如下错误: 出错程序代码示例: strUrl "https://www.xxx.com/xxx.asp?id11" dim objXmlHttp set objXmlHttp Server.CreateObject("Microsoft.XMLHTTP") objXmlHttp.open "…...
Linux安装配置nginx+php搭建
Linux安装配置nginxphp搭建 文章目录 Linux安装配置nginxphp搭建1.nginx源码包编译环境和安装相应的依赖1.1 安装编译环境1.2 安装pcre库、zlib库和openssl库 2.安装nginx2.1 在[nginx官网](https://nginx.org/en/download.html)上获取源码包并进行下载2.2 进行解压编译 3.启动…...
springboot的各种配置
1.AOP配置 <!-- AOP的依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aop</artifactId></dependency>package com.qf.HomeWork.aop;import lombok.extern.slf4j.Slf4j; im…...
OSI七层模型及TCP/IP四层模型
目录 OSI七层模型 TCP/IP四层模型 OIS七层模型和TCP/IP模型图 七层详解 两种模型比较 为什么OSI七层体系结构不常用 四层详解 网络为什么要分层? 说说 OSI 七层模型和 TCP/IP 四层模型的关系和区别 OSI七层模型 OSI(Open System Interconnect&a…...
MDN-Web APIs
参考资料 文章目录 简介DOM APIXMLHttpRequestWeb Storage APIWebsockets API 简介 Web APIs(Application Programming Interfaces)是用于与浏览器环境中的 Web 功能进行交互的一组接口和方法集合。通过 Web APIs,开发人员可以访问浏览器提…...
2023国赛数学建模C题思路分析
文章目录 0 赛题思路1 竞赛信息2 竞赛时间3 建模常见问题类型3.1 分类问题3.2 优化问题3.3 预测问题3.4 评价问题 4 建模资料 0 赛题思路 (赛题出来以后第一时间在CSDN分享) https://blog.csdn.net/dc_sinor?typeblog 1 竞赛信息 全国大学生数学建模…...
暑假集训笔记
刷题刷的好累啊...不想刷题了...然后就来写题解了... 昨天晚上打了场div2..2000来名,加了155分....现在rating1281...我是菜鸡..暑假之前就打到了1200分以上了,结果暑假一掉再掉,直接掉到1100了...然后我就一直压力很大....... 昨天在机房打…...
【枚举+推式子】牛客小白月赛 63 E
登录—专业IT笔试面试备考平台_牛客网 题意: 思路: 首先是个计数问题,考虑组合数学 组合数学就是在考虑枚举所有包含1和n的区间 这个典中典就是枚举1和n的位置然后算贡献 双指针超时,考虑推式子: Code:…...
Android多屏幕支持-Android12
Android多屏幕支持-Android12 1、概览及相关文章2、屏幕窗口配置2.1 配置xml文件2.2 DisplayInfo#uniqueId 屏幕标识2.3 adb查看信息 3、配置文件解析3.1 xml字段读取3.2 简要时序图 4、每屏幕焦点 android12-release 1、概览及相关文章 AOSP > 文档 > 心主题 > 多屏…...
python环境下载安装教程,python运行环境怎么下载
本篇文章给大家谈谈python安装步骤以及环境变量配置,以及下载python需要设置环境变量吗,希望对各位有所帮助,不要忘了收藏本站喔。 1.https://www.python.org/downloads/windows/ 下载适合自己电脑的python安装包 2.下载后安装即可 3.配置环…...
【0.2】lubancat鲁班猫4远程ubuntu22.04.2 无需任何安装
环境 lubancat4鲁班猫4 (4G0)不带emmc 系统镜像ubuntu-22.04.2-desktop-arm64-lubancat-4.img 网络环境:有线网络与本win10电脑同意环境 操作步骤ubuntu正常开机登陆用户,连接好网络进入设置>网络查看设备当前局域网IP 如192.168.199.159进入设置>共享>远程…...
Flutter 状态管理 Provider
状态管理必要性 Flutter基于声明式构建UI,原生则是命令式,状态管理是用于解决声明式开发带来的问题。 例:命令式的原生,数据更新需要拿到对应控件并更改其显示值;而声明式则需要更改数据值并通过setstate更新状态&am…...
【设计模式】观察者模式
什么是观察者模式? 观察者模式(又被称为发布-订阅(Publish/Subscribe)模式,属于行为型模式的一种,它定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象。这个主题对象在状态…...
ORCA优化器浅析——CDXLOperator Base class for operators in a DXL tree
如上图所示,CDXLOperator作为Base class for operators in a DXL tree,其子类CDXLLogical、CDXLScalar、CDXLPhysical作为逻辑节点、物理节点和Scalar节点的DXL表示类,因此其包含了这些类的共同部分特性,比如获取其DXL节点表示的函…...
【Java学习笔记】Arrays类
Arrays 类 1. 导入包:import java.util.Arrays 2. 常用方法一览表 方法描述Arrays.toString()返回数组的字符串形式Arrays.sort()排序(自然排序和定制排序)Arrays.binarySearch()通过二分搜索法进行查找(前提:数组是…...
【RockeMQ】第2节|RocketMQ快速实战以及核⼼概念详解(二)
升级Dledger高可用集群 一、主从架构的不足与Dledger的定位 主从架构缺陷 数据备份依赖Slave节点,但无自动故障转移能力,Master宕机后需人工切换,期间消息可能无法读取。Slave仅存储数据,无法主动升级为Master响应请求ÿ…...
技术栈RabbitMq的介绍和使用
目录 1. 什么是消息队列?2. 消息队列的优点3. RabbitMQ 消息队列概述4. RabbitMQ 安装5. Exchange 四种类型5.1 direct 精准匹配5.2 fanout 广播5.3 topic 正则匹配 6. RabbitMQ 队列模式6.1 简单队列模式6.2 工作队列模式6.3 发布/订阅模式6.4 路由模式6.5 主题模式…...
基于 TAPD 进行项目管理
起因 自己写了个小工具,仓库用的Github。之前在用markdown进行需求管理,现在随着功能的增加,感觉有点难以管理了,所以用TAPD这个工具进行需求、Bug管理。 操作流程 注册 TAPD,需要提供一个企业名新建一个项目&#…...
搭建DNS域名解析服务器(正向解析资源文件)
正向解析资源文件 1)准备工作 服务端及客户端都关闭安全软件 [rootlocalhost ~]# systemctl stop firewalld [rootlocalhost ~]# setenforce 0 2)服务端安装软件:bind 1.配置yum源 [rootlocalhost ~]# cat /etc/yum.repos.d/base.repo [Base…...
腾讯云V3签名
想要接入腾讯云的Api,必然先按其文档计算出所要求的签名。 之前也调用过腾讯云的接口,但总是卡在签名这一步,最后放弃选择SDK,这次终于自己代码实现。 可能腾讯云翻新了接口文档,现在阅读起来,清晰了很多&…...
关于easyexcel动态下拉选问题处理
前些日子突然碰到一个问题,说是客户的导入文件模版想支持部分导入内容的下拉选,于是我就找了easyexcel官网寻找解决方案,并没有找到合适的方案,没办法只能自己动手并分享出来,针对Java生成Excel下拉菜单时因选项过多导…...
实战三:开发网页端界面完成黑白视频转为彩色视频
一、需求描述 设计一个简单的视频上色应用,用户可以通过网页界面上传黑白视频,系统会自动将其转换为彩色视频。整个过程对用户来说非常简单直观,不需要了解技术细节。 效果图 二、实现思路 总体思路: 用户通过Gradio界面上…...
深度学习之模型压缩三驾马车:模型剪枝、模型量化、知识蒸馏
一、引言 在深度学习中,我们训练出的神经网络往往非常庞大(比如像 ResNet、YOLOv8、Vision Transformer),虽然精度很高,但“太重”了,运行起来很慢,占用内存大,不适合部署到手机、摄…...
Unity中的transform.up
2025年6月8日,周日下午 在Unity中,transform.up是Transform组件的一个属性,表示游戏对象在世界空间中的“上”方向(Y轴正方向),且会随对象旋转动态变化。以下是关键点解析: 基本定义 transfor…...
