当前位置: 首页 > news >正文

一个诡异的 Pulsar InterruptedException 异常

背景

今天收到业务团队反馈线上有个应用往 Pulsar 中发送消息失败了,经过日志查看得知是发送消息时候抛出了 java.lang.InterruptedException 异常。

和业务沟通后得知是在一个 gRPC 接口中触发的消息发送,大约持续了半个小时的异常后便恢复正常了,这是整个问题的背景。

前置排查

拿到该问题后首先排查下是否是共性问题,查看了其他的应用没有发现类似的异常;同时也查看了 Pulsar broker 的监控大盘,在这个时间段依然没有波动和异常;

这样可以初步排除是 Pulsar 服务端的问题。

接着便是查看应用那段时间的负载情况,从应用 QPS 到 JVM 的各个内存情况依然没发现有什么明显的变化。

Pulsar 源码排查

既然看起来应用本身和 Pulsar broker 都没有问题的话那就只能从异常本身来排查了。

首先第一步要得知具体使用的是 Pulsar-client 是版本是多少,因为业务使用的是内部基于官方 SDK 封装 springboot starter 所以第一步还得排查这个 starter 是否有影响。

通过查看源码基本排除了 starter 的嫌疑,里面只是简单的封装了 SDK 的功能而已。

org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: java.lang.InterruptedException at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1027) at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:91) at 
java.base/java.lang.Thread.run(Thread.java:834) Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: java.lang.InterruptedException at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) 
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) 
at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:89) ... 49 common frames omitted Caused by: org.apache.pulsar.client.api.PulsarClientException: java.lang.InterruptedException 
at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:775) 
at org.apache.pulsar.client.impl.ProducerImpl.sendAsync$original$BWm7PPlZ(ProducerImpl.java:393) 
at org.apache.pulsar.client.impl.ProducerImpl.sendAsync$original$BWm7PPlZ$accessor$i7NYMN6i(ProducerImpl.java) 
at org.apache.pulsar.client.impl.ProducerImpl$auxiliary$EfuVvJLT.call(Unknown Source) 
at org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstMethodsInter.intercept(InstMethodsInter.java:86) 
at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java) 
at org.apache.pulsar.client.impl.ProducerImpl.internalSendAsync(ProducerImpl.java:292) 
at org.apache.pulsar.client.impl.ProducerImpl.internalSendWithTxnAsync(ProducerImpl.java:363) 
at org.apache.pulsar.client.impl.PartitionedProducerImpl.internalSendWithTxnAsync(PartitionedProducerImpl.java:191) 
at org.apache.pulsar.client.impl.PartitionedProducerImpl.internalSendAsync(PartitionedProducerImpl.java:167) 
at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.sendAsync(TypedMessageBuilderImpl.java:103) 
at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:82) ... 49 common frames omitted Caused by: java.lang.InterruptedException: null
at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1343) 
at java.base/java.util.concurrent.Semaphore.acquire(Semaphore.java:318) 
at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:758)

接下来便只能是分析堆栈了,因为 Pulsar-client 的部分实现源码是没有直接打包到依赖中的,反编译的话许多代码行数对不上,所以需要将官方的源码拉到本地,切换到对于的分支进行查看。

这一步稍微有点麻烦,首先是代码库还挺大的,加上之前如果没有准备好 Pulsar 的开发环境的话估计会劝退一部分人;但其实大部分问题都是网络造成的,只要配置一些 Maven 镜像多试几次总会编译成功。

我这里直接将分支切换到 branch-2.8

从堆栈的顶部开始排查 TypedMessageBuilderImpl.java:91

看起来是内部异步发送消息的时候抛了异常。

接着往下看到这里:

java.lang.InterruptedException 
at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:775) at

看起来是这里没错,但是代码行数明显不对;因为 2.8 这个分支也是修复过几个版本,所以中间有修改导致代码行数与最新代码对不上也正常。

semaphore.get().acquire();

不过初步来看应该是这行代码抛出的线程终端异常,这里看起来只有他最有可能了。

为了确认是否是真的是这行代码,这个文件再往前翻了几个版本最终确认了就是这行代码没错了。

我们点开java.util.concurrent.Semaphore#acquire()的源码,

    /*** <li>has its interrupted status set on entry to this method; or* <li>is {@linkplain Thread#interrupt interrupted} while waiting* for a permit,* </ul>* then {@link InterruptedException} is thrown and the current thread's* interrupted status is cleared.** @throws InterruptedException if the current thread is interrupted*/public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);}public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted() ||(tryAcquireShared(arg) < 0 &&acquire(null, arg, true, true, false, 0L) < 0))throw new InterruptedException();}    

通过源码会发现 acquire() 函数确实会响应中断,一旦检测到当前线程被中断后便会抛出 InterruptedException 异常。

定位问题

所以问题的原因基本确定了,就是在 Pulsar 的发送消息线程被中断了导致的,但为啥会被中断还需要继续排查。

我们知道线程中断是需要调用 Thread.currentThread().interrupt(); API的,首先猜测是否 Pulsar 客户端内部有个线程中断了这个发送线程。

于是我在 pulsar-client 这个模块中搜索了相关代码:

排除掉和 producer 不相关的地方,其余所有中断线程的代码都是在有了该异常之后继续传递而已;所以初步来看 pulsar-client 内部没有主动中断的操作。

既然 Pulsar 自己没有做,那就只可能是业务做的了?

于是我在业务代码中搜索了一下:

果然在业务代码中搜到了唯一一处中断的地方,而且通过调用关系得知这段代码是在消息发送前执行的,并且和 Pulsar 发送函数处于同一线程。

大概的伪代码如下:

        List.of(1, 2, 3).stream().map(e -> {return CompletableFuture.supplyAsync(() -> {try {TimeUnit.MILLISECONDS.sleep(10);} catch (InterruptedException ex) {throw new RuntimeException(ex);}return e;});}).collect(Collectors.toList()).forEach(f -> {try {Integer integer = f.get();log.info("====" + integer);if (integer==3){TimeUnit.SECONDS.sleep(10);Thread.currentThread().interrupt();}} catch (InterruptedException e) {throw new RuntimeException(e);} catch (ExecutionException e) {throw new RuntimeException(e);}});MessageId send = producer.newMessage().value(msg.getBytes()).send();

执行这段代码可以完全复现同样的堆栈。

幸好中断这里还打得有日志:

通过日志搜索发现异常的时间和这个中断的日志时间点完全重合,这样也就知道根本原因了。

因为业务线程和消息发送线程是同一个,在某些情况下会执行 Thread.currentThread().interrupt();,其实单纯执行这行函数并不会发生什么,只要没有去响应这个中断,也就是 Semaphore 源码中的判断了线程中断的标记:

    public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted() ||(tryAcquireShared(arg) < 0 &&acquire(null, arg, true, true, false, 0L) < 0))throw new InterruptedException();}

但恰好这里业务中断后自己并没有去判断这个标记,导致 Pulsar 内部去判断了,最终抛出了这个异常。

总结

所以归根结底还是这里的代码不合理导致的,首先是自己中断了线程但也没使用,从而导致有被其他基础库使用的可能,所以会造成了一些不可预知的后果。

再一个是不建议在业务代码中使用 Thread.currentThread().interrupt(); 这类代码,第一眼根本不知道是要干啥,也不易维护。

其实本质上线程中断也是线程间通信的一种手段,有这类需求完全可以换为内置的 BlockQueue 这类函数来实现。

相关文章:

一个诡异的 Pulsar InterruptedException 异常

背景 今天收到业务团队反馈线上有个应用往 Pulsar 中发送消息失败了&#xff0c;经过日志查看得知是发送消息时候抛出了 java.lang.InterruptedException 异常。 和业务沟通后得知是在一个 gRPC 接口中触发的消息发送&#xff0c;大约持续了半个小时的异常后便恢复正常了&…...

Java岗面试题--Java并发(volatile 专题)

目录1. 面试题一&#xff1a;谈谈 volatile 的使用及其原理补充&#xff1a;内存屏障volatile 的原理2. 面试题二&#xff1a;volatile 为什么不能保证原子性3. 面试题三&#xff1a;volatile 的内存语义4. 面试题四&#xff1a;volatile 的实现机制5. 面试题五&#xff1a;vol…...

Java---打家劫舍ⅠⅡ

目录 打家劫舍Ⅰ 题目分析 代码一 代码二 打家劫舍Ⅱ 打家劫舍Ⅰ 你是一个专业的小偷&#xff0c;计划偷窃沿街的房屋。每间房内都藏有一定的现金&#xff0c;影响你偷窃的唯一制约因素就是相邻的房屋装有相互连通的防盗系统&#xff0c;如果两间相邻的房屋在同一晚上被…...

MySQL Lesson4

1&#xff1a;关于查询结果集的去重&#xff08;distinct&#xff09; select distinct job from emp; **distinct只能出现在所有字段的最前面。所表示的含有是所有的结果联合起来去重。 select distinct deptno,job from emp order by deptno; select count(distinct job)from…...

浅谈权限获取方法之文件上传

概述 文件上传漏洞是发生在有上传功能的应用中&#xff0c;如果应用程序对用户的上传文件没有控制或者存在缺陷&#xff0c;攻击者可以利用应用上传功能存在的缺陷&#xff0c;上传木马、病毒等有危害的文件到服务器上面&#xff0c;控制服务器。 漏洞成因及危害 文件上传漏…...

资产设备防拆标签安全防护和资产定位解决方案

随着社会经济的发展和高新技术的日新月异&#xff0c;对各方面的安全要求也在不断地提高&#xff0c;以物联网安防、入侵报警和出入口控制、应急系统等为主的安全防范系统日益成为各类文物场所智能化弱电工程不可或缺的组成部分&#xff0c;是重点资产管理场所内加强管理和安全…...

企业电子招标采购源码之电子招标投标全流程!

随着各级政府部门的大力推进&#xff0c;以及国内互联网的建设&#xff0c;电子招投标已经逐渐成为国内主流的招标投标方式&#xff0c;但是依然有很多人对电子招投标的流程不够了解&#xff0c;在具体操作上存在困难。虽然各个交易平台的招标投标在线操作会略有不同&#xff0…...

【考研408】计算机网络笔记

文章目录计算机网络体系结构计算机网络概述计算机网络的组成计算机网络的功能计算机网络的分类计算机网络的性能指标课后习题计算机网络体系结构与参考模型计算机网络协议、接口、服务的概念ISO/OSI参考模型和TCP/IP模型课后习题物理层通信基础基本概念奈奎斯特定理与香农定理编…...

[C++]继承

&#x1f941;作者&#xff1a; 华丞臧 &#x1f4d5;​​​​专栏&#xff1a;【C】 各位读者老爷如果觉得博主写的不错&#xff0c;请诸位多多支持(点赞收藏关注)。如果有错误的地方&#xff0c;欢迎在评论区指出。 推荐一款刷题网站 &#x1f449;LeetCode 文章目录一、继承…...

优化知识管理方法丨整理零碎信息,提高数据价值

信息流时代&#xff0c;知识成集合倍数增长&#xff0c;看似我们学习了很多知识&#xff0c;但知识零碎无系统&#xff0c;知识之间缺乏联系&#xff0c;没有深度&#xff0c;所以虽然你很努力&#xff0c;但你发现自己的能力增长特别缓慢&#xff0c;你需要整理知识将零散的知…...

Windows操作系统的体系结构、运行环境和运行状态

我是荔园微风&#xff0c;作为一名在IT界整整25年的老兵&#xff0c;今天我们来重新审视一下Windows这个我们熟悉的不能再熟悉的系统。说Windows操作系统的运行环境和运行状态&#xff0c;首先要介绍一下Windows操作系统的体系结构&#xff0c;然后再要说到最重要的两个概念:核…...

【工作笔记】Http响应头过长

起因 突然有测试小伙伴反馈进公司官网主页会白屏&#xff0c;但只是个例不是普遍现象 查监控发现没监控到异常问题 查了很久&#xff08;这个很久单指对于线上问题来说&#xff09;才定位是请求的异常&#xff0c;因为这套系统的异常用的是 ExceptionHandler&#xff0c;这也导…...

hive建分区表,分桶表,内部表,外部表

hive建分区表&#xff0c;分桶表&#xff0c;内部表&#xff0c;外部表 一、概念介绍 Hive是基于Hadoop的一个工具&#xff0c;用来帮助不熟悉 MapReduce的人使用SQL对存储在Hadoop中的大规模数据进行数据提取、转化、加载。Hive数据仓库工具能将结构化的数据文件映射为一张数…...

【分享】灌溉制度设计小程序VB源代码

说明 根据作物需水特性和当地气候、土壤、农业技术及灌水技术等因素制定的灌水方案。主要内容包括灌水次数、灌水时间、灌水定额和灌溉定额。灌溉制度是规划、设计灌溉工程和进行灌区运行管理的基本资料&#xff0c;是编制和执行灌区用水计划的重要依据。 1—计划湿润土层允…...

PR9268/300-000库存现货振动传感器 雄霸工控

PR9268/300-000库存现货振动传感器 雄霸工控PR9268/300-000库存现货振动传感器 雄霸工控SDM010PR9670/110-100PR9670/010-100PR9670/003-000PR9670/002-000PR9670/001-000PR9670/000-000PR9600/014-000PR9600/011-000PR9376/010-021PR9376/010-011PR9376/010-011PR9376/010-001…...

浅谈模型评估选择及重要性

作者&#xff1a;王同学 来源&#xff1a;投稿 编辑&#xff1a;学姐 模型评估作为机器学习领域一项不可分割的部分&#xff0c;却常常被大家忽略&#xff0c;其实在机器学习领域中重要的不仅仅是模型结构和参数量&#xff0c;对模型的评估也是至关重要的&#xff0c;只有选择那…...

多线程的初识和创建

✨个人主页&#xff1a;bit me&#x1f447; ✨当前专栏&#xff1a;Java EE初阶&#x1f447; ✨每日一语&#xff1a;知不足而奋进&#xff0c;望远山而前行。 目 录&#x1f4a4;一. 认识线程&#xff08;Thread&#xff09;&#x1f34e;1. 线程的引入&#x1f34f;2. 线程…...

一句话设计模式3:工厂模式

工厂模式:new多种对象的简单方式。 文章目录 工厂模式:new多种对象的简单方式。前言一、两种工厂模式二、如何实现工厂模式1. 简单工厂2. 抽象工厂总结前言 工厂模式可以说比较常见的设计模式,仔细观察在很多源码中都有此种模式的应用;用来解决创建对象的创建问题; 一、两种工…...

【Codeforces Round #853 (Div. 2)】C. Serval and Toxel‘s Arrays【题解】

题目 Toxel likes arrays. Before traveling to the Paldea region, Serval gave him an array aaa as a gift. This array has nnn pairwise distinct elements. In order to get more arrays, Toxel performed mmm operations with the initial array. In the iii-th opera…...

100天精通Python(数据可视化篇)——第77天:数据可视化入门基础大全(万字总结+含常用图表动图展示)

文章目录1. 什么是数据可视化&#xff1f;2. 为什么会用数据可视化&#xff1f;3. 数据可视化的好处&#xff1f;4. 如何使用数据可视化&#xff1f;5. Python数据可视化常用工具1&#xff09;Matplotlib绘图2&#xff09;Seaborn绘图3&#xff09;Bokeh绘图6. 常用图表介绍及其…...

(LeetCode 每日一题) 3442. 奇偶频次间的最大差值 I (哈希、字符串)

题目&#xff1a;3442. 奇偶频次间的最大差值 I 思路 &#xff1a;哈希&#xff0c;时间复杂度0(n)。 用哈希表来记录每个字符串中字符的分布情况&#xff0c;哈希表这里用数组即可实现。 C版本&#xff1a; class Solution { public:int maxDifference(string s) {int a[26]…...

业务系统对接大模型的基础方案:架构设计与关键步骤

业务系统对接大模型&#xff1a;架构设计与关键步骤 在当今数字化转型的浪潮中&#xff0c;大语言模型&#xff08;LLM&#xff09;已成为企业提升业务效率和创新能力的关键技术之一。将大模型集成到业务系统中&#xff0c;不仅可以优化用户体验&#xff0c;还能为业务决策提供…...

SkyWalking 10.2.0 SWCK 配置过程

SkyWalking 10.2.0 & SWCK 配置过程 skywalking oap-server & ui 使用Docker安装在K8S集群以外&#xff0c;K8S集群中的微服务使用initContainer按命名空间将skywalking-java-agent注入到业务容器中。 SWCK有整套的解决方案&#xff0c;全安装在K8S群集中。 具体可参…...

脑机新手指南(八):OpenBCI_GUI:从环境搭建到数据可视化(下)

一、数据处理与分析实战 &#xff08;一&#xff09;实时滤波与参数调整 基础滤波操作 60Hz 工频滤波&#xff1a;勾选界面右侧 “60Hz” 复选框&#xff0c;可有效抑制电网干扰&#xff08;适用于北美地区&#xff0c;欧洲用户可调整为 50Hz&#xff09;。 平滑处理&…...

反向工程与模型迁移:打造未来商品详情API的可持续创新体系

在电商行业蓬勃发展的当下&#xff0c;商品详情API作为连接电商平台与开发者、商家及用户的关键纽带&#xff0c;其重要性日益凸显。传统商品详情API主要聚焦于商品基本信息&#xff08;如名称、价格、库存等&#xff09;的获取与展示&#xff0c;已难以满足市场对个性化、智能…...

解锁数据库简洁之道:FastAPI与SQLModel实战指南

在构建现代Web应用程序时&#xff0c;与数据库的交互无疑是核心环节。虽然传统的数据库操作方式&#xff08;如直接编写SQL语句与psycopg2交互&#xff09;赋予了我们精细的控制权&#xff0c;但在面对日益复杂的业务逻辑和快速迭代的需求时&#xff0c;这种方式的开发效率和可…...

Linux简单的操作

ls ls 查看当前目录 ll 查看详细内容 ls -a 查看所有的内容 ls --help 查看方法文档 pwd pwd 查看当前路径 cd cd 转路径 cd .. 转上一级路径 cd 名 转换路径 …...

[10-3]软件I2C读写MPU6050 江协科技学习笔记(16个知识点)

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16...

成都鼎讯硬核科技!雷达目标与干扰模拟器,以卓越性能制胜电磁频谱战

在现代战争中&#xff0c;电磁频谱已成为继陆、海、空、天之后的 “第五维战场”&#xff0c;雷达作为电磁频谱领域的关键装备&#xff0c;其干扰与抗干扰能力的较量&#xff0c;直接影响着战争的胜负走向。由成都鼎讯科技匠心打造的雷达目标与干扰模拟器&#xff0c;凭借数字射…...

什么?连接服务器也能可视化显示界面?:基于X11 Forwarding + CentOS + MobaXterm实战指南

文章目录 什么是X11?环境准备实战步骤1️⃣ 服务器端配置(CentOS)2️⃣ 客户端配置(MobaXterm)3️⃣ 验证X11 Forwarding4️⃣ 运行自定义GUI程序(Python示例)5️⃣ 成功效果![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/55aefaea8a9f477e86d065227851fe3d.pn…...