python使用httpx_sse调用sse流式接口对响应格式为application/json的错误信息的处理
目录
- 问题描述
- 方案
问题描述
调用sse流式接口使用httpx_sse的方式
import httpxfrom httpx_sse import connect_sse# 省略无关代码try:with httpx.Client() as client:with connect_sse(client, "GET", url, params=param) as event_source:clear_textbox(response_textbox)# 把 iter_sse() 迭代完, 就相当于处理完了一次流式调用for sse in event_source.iter_sse():# 流式响应中,每次响应体的处理逻辑print(f"generated_answer的值是: '{sse.data}'")response = sse.dataif response != '':# self.response = responseappend_text(response_textbox, response)except httpx.RequestError as e:print(f"请求错误:{e}")except Exception as e:print(f"发生了一个错误:{e}")
httpx_sse的connet_sse源码:
@contextmanager
def connect_sse(client: httpx.Client, method: str, url: str, **kwargs: Any
) -> Iterator[EventSource]:headers = kwargs.pop("headers", {})headers["Accept"] = "text/event-stream"headers["Cache-Control"] = "no-store"with client.stream(method, url, headers=headers, **kwargs) as response:yield EventSource(response)
可以看到connect_sse源码中的headers的"Accept"设置了只接受"text/event-stream"流式结果,正常这么调用是没错的。但是当后端的流式接口因为401权限问题等报错返回了"application/json"格式,如
{ “code”:401, “msg”:“登录过期,请重新登录”, “data”:null} 这样的json格式结果时,以上代码就会报错,因为他不是"text/event-stream"流式响应结果头。那么该怎么办呢?
方案
重新写一个自定义的connect_sse。
import httpx
from httpx_sse import EventSource
from typing import Any, Iterator
from contextlib import contextmanager
import json
# 自定义调用sse接口@contextmanagerdef custom_connect_sse(self, client: httpx.Client, method: str, url: str, **kwargs: Any) -> Iterator[EventSource]:headers = kwargs.pop("headers", {})# 只有当没有指定Accept时才添加默认值headers["Accept"] = "*/*"headers["Cache-Control"] = "no-store"with client.stream(method, url, headers=headers, **kwargs) as response:content_type = response.headers.get('content-type', '').lower()json_flag = Falseif 'text/event-stream' in content_type:# 处理SSE流yield json_flag, EventSource(response)elif 'application/json' in content_type:# yield response # 在这里你可以决定如何进一步处理这个JSON响应# 读取并合并所有文本块text_data = ''.join([chunk for chunk in response.iter_text()])# 解析整个响应体为JSONjson_data = json.loads(text_data)json_flag = Trueyield json_flag, json_data
调用代码
# 使用自定义的connect_sse函数try:with httpx.Client() as client:with self.custom_connect_sse(client, "GET", url, params=param, headers=headers) as (json_flag, event_source):if json_flag:code = event_source.get("code")msg = event_source.get("msg")print(f"Code: {code}, Message: {msg}")else:full_answer = ""clear_textbox(response_textbox)for sse in event_source.iter_sse():print(f"generated_answer的值是: '{sse.data}'")response = sse.dataif response:append_text(response_textbox, response)full_answer += responseuser_record += reply + full_answer + "\n"print(f"user_record:{user_record}")except httpx.RequestError as e:print(f"请求错误:{e}")except Exception as e:print(f"发生了一个错误:{e}")
关键步骤:
1.设置headers[“Accept”] = “/”,所有响应头都可以接收
2.content_type = response.headers.get(‘content-type’, ‘’).lower() 判断响应头是流式还是json,并用json_flag记录是否json标识,返回不同的结果。如果是json,则循环合并处理chunk块,拼装完整json返回结果(实测第一次就返回完整json结构了,但是代码得这么写)。
3.使用自定义connect_sse方法时,根据json_flag来分别处理成功调用流式结果还是异常的json结果。
相关文章:
python使用httpx_sse调用sse流式接口对响应格式为application/json的错误信息的处理
目录 问题描述方案 问题描述 调用sse流式接口使用httpx_sse的方式 import httpxfrom httpx_sse import connect_sse# 省略无关代码try:with httpx.Client() as client:with connect_sse(client, "GET", url, paramsparam) as event_source:clear_textbox(response_t…...
在列线图上标记做为线性模型的局部解释
改造列线图做为线性模型的解释 除了使用列线图算法产生的meta数据和score数据进行线性模型的解释(全局性解释和局部性解释),另外一种做法是改造列线图来作为线性模型的解释。这里尝试改造列线图来对线性模型进行全局性和局部性解释。 全局…...
KubeKey一键安装部署k8s集群和KubeSphere详细教程
目录 一、KubeKey简介 二、k8s集群KubeSphere安装 集群规划 硬件要求 Kubernetes支持版本 操作系统要求 SSH免密登录 配置集群时钟 所有节点安装依赖 安装docker DNS要求 存储要求 下载 KubeKey 验证KubeKey 配置集群文件 安装集群 验证命令 登录页面 一、Ku…...
CPU多级缓存与缓存一致性协议
CPU多级缓存与缓存一致性协议 CPU多级缓存和缓存一致性协议是计算机体系结构中优化性能与保证数据正确性的核心机制。以下从缓存层级设计、工作原理、一致性协议(如MESI)及其实现细节展开说明。 一、为什么需要多级缓存? CPU的计算速度远高…...
Docker用户的困境:免费项目的减少与成本的增加
摘要 在生产环境中,Docker用户正面临新的挑战:免费项目逐渐减少,收费服务成为主流趋势。表面上免费的选项,由于缺乏必要的支持和及时更新,反而可能导致更高的隐性成本。对于依赖Docker进行开发和部署的企业而言&#x…...
车载诊断数据库 --- AUTOSAR诊断文件DEXT简介
我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 简单,单纯,喜欢独处,独来独往,不易合同频过着接地气的生活,除了生存温饱问题之外,没有什么过多的欲望,表面看起来很高冷,内心热情,如果你身…...
Linux lsblk 命令详解:查看磁盘和分区信息 (中英双语)
Linux lsblk 命令详解:查看磁盘和分区信息 在 Linux 系统中,管理磁盘设备和分区是日常运维工作的重要部分。而 lsblk 命令是一个强大的工具,它用于列出系统中的块设备(block devices)信息,可以帮助我们快速…...
庙算兵棋推演AI开发初探(5-数据处理)
碎碎念:这最近几个月过得那叫一个难受,研究生开题没过、需求评审会在4月和6月开了2次、7月紧接着软件设计评审会,加班干得都是文档的事情,还有开会前的会务和乱七八糟的琐事,我们干的还被规定弄的束手束脚,…...
【MyBatis】#{} 与 ${} 的区别(常见面试题)
目录 前言 预编译SQL和即时SQL 什么是预编译SQL? 什么是即时SQL? 区别 #{} 与 ${}的使用 防止SQL注入 什么是SQL注入? 原理 排序功能 模糊查询 总结#{}和${}的区别 前言 在前面的学习中,我们已经知道了如果SQL语句想…...
鸿蒙开发环境搭建-入门篇
本文章讲述如何搭建鸿蒙应用开发环境:新建工程、虚拟机运行、真机调试等。 开发工具: DevEco Studio 5.0.3.906 os系统: mac 参考文档:https://juejin.cn/post/7356143704699699227 官网鸿蒙应用开发学习文档:https://developer.huawei.com/c…...
力扣-贪心-53 最大子数组和
思路 先把每一个值都加到当前集合中,记录当前的和,直到当前记录和小于0了,再重置改记录,再次尝试累加 代码 class Solution { public:int maxSubArray(vector<int>& nums) {int res INT32_MIN;int curSum 0;for(in…...
iOS开发 网络安全
iOS开发中的网络安全 在当前的数字化时代,任何应用程序都需要重视网络安全。尤其是对于iOS应用开发者而言,确保应用与服务器之间的数据传输安全是至关重要的。接下来,我们将学习“iOS开发 网络安全”的实现过程。 流程步骤 以下是实现iOS网…...
MATLAB在投资组合优化中的应用:从基础理论到实践
引言 投资组合优化是现代金融理论中的核心问题之一,旨在通过合理配置资产,实现风险与收益的最佳平衡。MATLAB凭借其强大的数学计算能力和丰富的金融工具箱,成为投资组合优化的理想工具。本文将详细介绍如何使用MATLAB进行投资组合优化&#…...
银河麒麟系统安装mysql5.7【亲测可行】
一、安装环境 cpu:I5-10代; 主板:华硕; OS:银河麒麟V10(SP1)未激活 架构:Linux 5.10.0-9-generic x86_64 GNU/Linux mysql版本:mysql-5.7.34-linux-glibc2.12-x86_64.ta…...
自动创建spring boot应用(eclipse版本)
使用spring starter project创建项目 设置Service URL 把Service URL设置为 https://start.aliyun.com/ 如下图: 使用这个网址,创建项目更快。 选择Spring Web依赖 项目结构 mvnw和mvnw.cmd:这是maven包装器(wrapper)脚本&…...
基于Flask的第七次人口普查数据分析系统的设计与实现
【Flask】基于Flask的第七次人口普查数据分析系统的设计与实现(完整系统源码开发笔记详细部署教程)✅ 目录 一、项目简介二、项目界面展示三、项目视频展示 一、项目简介 基于Flask的人口普查可视化分析系统 二、项目界面展示 登录/注册 首页/详情 …...
DNS, domain name system
DNS 是一种应用层协议和http/https是同一等级的 其传输层主要用的是udp,也可能用tcp DNS协议完成的作用:查 域名对应的 ip DNS服务器完成的作用:存储 域名 -> ip 的映射 DNS服务器有三个等级:根DNS,顶级域DNS&…...
Linux:文件(三)
1. 磁盘 基本概念 机械磁盘在现在的计算机中基本是唯一的一个机械设备 速度较内存更慢,容量大价格便宜。 磁盘是永久性存储介质,断电后数据还在。 内存是易失性存储介质,断电后(未写入磁盘的)数据丢失。 物理存储结构 扇区:…...
DeepSeek 给我一个 DeepSeekUI 页面
接着上次分享内容 三步安装 DeepSeek 说,DeepSeek 下载好了,总不能是黑框框对话吧,总得找一个 UI 界面使用吧。 本地运行 DeepSeek 比安装 python、jdk 简单多了,本地还没装过的可以参考上次的文档安装。 于是找了几个开源的试了试…...
Java NIO与传统IO性能对比分析
Java NIO与传统IO性能对比分析 在Java中,I/O(输入输出)操作是开发中最常见的任务之一。传统的I/O方式基于阻塞模型,而Java NIO(New I/O)引入了非阻塞和基于通道(Channel)和缓冲区&a…...
054 redisson
文章目录 使用Redisson演示可重入锁读写锁信号量闭锁获取三级分类redisson分布式锁 package com.xd.cubemall.product.config;import org.redisson.Redisson; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.springframework.context…...
C#上位机--进程和线程的区别
引言 在 C# 上位机开发中,进程和线程是两个非常重要的概念,它们在程序的运行和性能优化方面起着关键作用。理解进程和线程的区别,能够帮助开发者更好地设计和实现高效、稳定的上位机程序。本文将深入探讨 C# 上位机中进程和线程的区别&#…...
小智机器人CMakeLists编译文件解析
编译完成后,成功烧录! 这段代码是一个CMake脚本,用于配置和构建一个嵌入式项目,特别是针对ESP32系列芯片的项目。CMake是一个跨平台的构建系统,用于管理项目的编译过程。 set(SOURCES "audio_codecs/audio_code…...
【科研绘图系列】R语言绘制SCI论文图合集
禁止商业或二改转载,仅供自学使用,侵权必究,如需截取部分内容请后台联系作者! 文章目录 介绍加载R包数据下载Load dataFigure 1Fig 1B: functional assays adhensionFIG 1C: Functional assays OPK Figure 2Fig 2C: Settings and function fo…...
Luckfox Pico Max运行RKNN-Toolkit2中的Yolov5 adb USB仿真
1:下载rknn-toolkit2 git clone https://github.com/rockchip-linux/rknn-toolkit2 2:修改onnx目录下的yolov5的test.py的代码 # pre-process config print(--> Config model) rknn.config(mean_values[[0, 0, 0]], std_values[[255, 255, …...
VSCode ssh远程连接内网服务器(不能上网的内网环境的Linux服务器)的终极解决方案
VSCode ssh远程连接内网服务器(不能上网的内网环境的Linux服务器) 离线下载vscode-server并安装: 如果远程端不能联网可以下载包离线安装,下载 vscode-server 的 url 需要和 vscode 客户端版本的 commit-id 对应.通过 vscode 面板的帮助->关于可以获…...
大语言模型:从开发到运行的深度解构
一、LLM开发训练的全流程解析 1. 数据工程的炼金术 数据采集:构建涵盖网页文本(Common Crawl)、书籍、论文、代码等领域的超大规模语料库,典型规模可达数十TB。例如GPT-4的训练数据包含超过13万亿token数据清洗:通过…...
支持向量机(SVM):算法讲解与原理推导
1 SVM介绍 SVM是一个二类分类器,它的全称是Support Vector Machine,即支持向量机。 SVM的目标是找到一个超平面,使用两类数据离这个超平面越远越好,从而对新的数据分类更准确,即使分类器更加健壮。比如上面的图中&am…...
Redis 缓存穿透、击穿、雪崩:问题与解决方案
在使用 Redis 作为缓存中间件时,系统可能会面临一些常见的问题,如 缓存穿透、缓存击穿 和 缓存雪崩。这些问题如果不加以解决,可能会导致数据库压力过大、系统响应变慢甚至崩溃。本文将详细分析这三种问题的起因,并提供有效的解决…...
macos sequoia 禁用 ctrl+enter 打开鼠标右键菜单功能
macos sequoia默认ctrlenter会打开鼠标右键菜单,使得很多软件有冲突。关闭方法: end...
