一个诡异的 Pulsar InterruptedException 异常
背景

今天收到业务团队反馈线上有个应用往 Pulsar 中发送消息失败了,经过日志查看得知是发送消息时候抛出了 java.lang.InterruptedException 异常。
和业务沟通后得知是在一个 gRPC 接口中触发的消息发送,大约持续了半个小时的异常后便恢复正常了,这是整个问题的背景。
前置排查
拿到该问题后首先排查下是否是共性问题,查看了其他的应用没有发现类似的异常;同时也查看了 Pulsar broker 的监控大盘,在这个时间段依然没有波动和异常;
这样可以初步排除是 Pulsar 服务端的问题。
接着便是查看应用那段时间的负载情况,从应用 QPS 到 JVM 的各个内存情况依然没发现有什么明显的变化。
Pulsar 源码排查
既然看起来应用本身和 Pulsar broker 都没有问题的话那就只能从异常本身来排查了。
首先第一步要得知具体使用的是 Pulsar-client 是版本是多少,因为业务使用的是内部基于官方 SDK 封装 springboot starter 所以第一步还得排查这个 starter 是否有影响。
通过查看源码基本排除了 starter 的嫌疑,里面只是简单的封装了 SDK 的功能而已。
org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: java.lang.InterruptedException at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1027) at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:91) at
java.base/java.lang.Thread.run(Thread.java:834) Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: java.lang.InterruptedException at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:89) ... 49 common frames omitted Caused by: org.apache.pulsar.client.api.PulsarClientException: java.lang.InterruptedException
at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:775)
at org.apache.pulsar.client.impl.ProducerImpl.sendAsync$original$BWm7PPlZ(ProducerImpl.java:393)
at org.apache.pulsar.client.impl.ProducerImpl.sendAsync$original$BWm7PPlZ$accessor$i7NYMN6i(ProducerImpl.java)
at org.apache.pulsar.client.impl.ProducerImpl$auxiliary$EfuVvJLT.call(Unknown Source)
at org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstMethodsInter.intercept(InstMethodsInter.java:86)
at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java)
at org.apache.pulsar.client.impl.ProducerImpl.internalSendAsync(ProducerImpl.java:292)
at org.apache.pulsar.client.impl.ProducerImpl.internalSendWithTxnAsync(ProducerImpl.java:363)
at org.apache.pulsar.client.impl.PartitionedProducerImpl.internalSendWithTxnAsync(PartitionedProducerImpl.java:191)
at org.apache.pulsar.client.impl.PartitionedProducerImpl.internalSendAsync(PartitionedProducerImpl.java:167)
at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.sendAsync(TypedMessageBuilderImpl.java:103)
at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:82) ... 49 common frames omitted Caused by: java.lang.InterruptedException: null
at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1343)
at java.base/java.util.concurrent.Semaphore.acquire(Semaphore.java:318)
at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:758)
接下来便只能是分析堆栈了,因为 Pulsar-client 的部分实现源码是没有直接打包到依赖中的,反编译的话许多代码行数对不上,所以需要将官方的源码拉到本地,切换到对于的分支进行查看。
这一步稍微有点麻烦,首先是代码库还挺大的,加上之前如果没有准备好 Pulsar 的开发环境的话估计会劝退一部分人;但其实大部分问题都是网络造成的,只要配置一些 Maven 镜像多试几次总会编译成功。
我这里直接将分支切换到 branch-2.8。
从堆栈的顶部开始排查 TypedMessageBuilderImpl.java:91:

看起来是内部异步发送消息的时候抛了异常。
接着往下看到这里:
java.lang.InterruptedException
at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:775) at

看起来是这里没错,但是代码行数明显不对;因为 2.8 这个分支也是修复过几个版本,所以中间有修改导致代码行数与最新代码对不上也正常。
semaphore.get().acquire();
不过初步来看应该是这行代码抛出的线程终端异常,这里看起来只有他最有可能了。

为了确认是否是真的是这行代码,这个文件再往前翻了几个版本最终确认了就是这行代码没错了。
我们点开java.util.concurrent.Semaphore#acquire()的源码,
/*** <li>has its interrupted status set on entry to this method; or* <li>is {@linkplain Thread#interrupt interrupted} while waiting* for a permit,* </ul>* then {@link InterruptedException} is thrown and the current thread's* interrupted status is cleared.** @throws InterruptedException if the current thread is interrupted*/public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);}public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted() ||(tryAcquireShared(arg) < 0 &&acquire(null, arg, true, true, false, 0L) < 0))throw new InterruptedException();}
通过源码会发现 acquire() 函数确实会响应中断,一旦检测到当前线程被中断后便会抛出 InterruptedException 异常。
定位问题
所以问题的原因基本确定了,就是在 Pulsar 的发送消息线程被中断了导致的,但为啥会被中断还需要继续排查。
我们知道线程中断是需要调用 Thread.currentThread().interrupt(); API的,首先猜测是否 Pulsar 客户端内部有个线程中断了这个发送线程。
于是我在 pulsar-client 这个模块中搜索了相关代码:

排除掉和 producer 不相关的地方,其余所有中断线程的代码都是在有了该异常之后继续传递而已;所以初步来看 pulsar-client 内部没有主动中断的操作。
既然 Pulsar 自己没有做,那就只可能是业务做的了?
于是我在业务代码中搜索了一下:

果然在业务代码中搜到了唯一一处中断的地方,而且通过调用关系得知这段代码是在消息发送前执行的,并且和 Pulsar 发送函数处于同一线程。
大概的伪代码如下:
List.of(1, 2, 3).stream().map(e -> {return CompletableFuture.supplyAsync(() -> {try {TimeUnit.MILLISECONDS.sleep(10);} catch (InterruptedException ex) {throw new RuntimeException(ex);}return e;});}).collect(Collectors.toList()).forEach(f -> {try {Integer integer = f.get();log.info("====" + integer);if (integer==3){TimeUnit.SECONDS.sleep(10);Thread.currentThread().interrupt();}} catch (InterruptedException e) {throw new RuntimeException(e);} catch (ExecutionException e) {throw new RuntimeException(e);}});MessageId send = producer.newMessage().value(msg.getBytes()).send();
执行这段代码可以完全复现同样的堆栈。
幸好中断这里还打得有日志:


通过日志搜索发现异常的时间和这个中断的日志时间点完全重合,这样也就知道根本原因了。
因为业务线程和消息发送线程是同一个,在某些情况下会执行 Thread.currentThread().interrupt();,其实单纯执行这行函数并不会发生什么,只要没有去响应这个中断,也就是 Semaphore 源码中的判断了线程中断的标记:
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted() ||(tryAcquireShared(arg) < 0 &&acquire(null, arg, true, true, false, 0L) < 0))throw new InterruptedException();}
但恰好这里业务中断后自己并没有去判断这个标记,导致 Pulsar 内部去判断了,最终抛出了这个异常。
总结
所以归根结底还是这里的代码不合理导致的,首先是自己中断了线程但也没使用,从而导致有被其他基础库使用的可能,所以会造成了一些不可预知的后果。
再一个是不建议在业务代码中使用 Thread.currentThread().interrupt(); 这类代码,第一眼根本不知道是要干啥,也不易维护。
其实本质上线程中断也是线程间通信的一种手段,有这类需求完全可以换为内置的 BlockQueue 这类函数来实现。
相关文章:
一个诡异的 Pulsar InterruptedException 异常
背景 今天收到业务团队反馈线上有个应用往 Pulsar 中发送消息失败了,经过日志查看得知是发送消息时候抛出了 java.lang.InterruptedException 异常。 和业务沟通后得知是在一个 gRPC 接口中触发的消息发送,大约持续了半个小时的异常后便恢复正常了&…...
Java岗面试题--Java并发(volatile 专题)
目录1. 面试题一:谈谈 volatile 的使用及其原理补充:内存屏障volatile 的原理2. 面试题二:volatile 为什么不能保证原子性3. 面试题三:volatile 的内存语义4. 面试题四:volatile 的实现机制5. 面试题五:vol…...
Java---打家劫舍ⅠⅡ
目录 打家劫舍Ⅰ 题目分析 代码一 代码二 打家劫舍Ⅱ 打家劫舍Ⅰ 你是一个专业的小偷,计划偷窃沿街的房屋。每间房内都藏有一定的现金,影响你偷窃的唯一制约因素就是相邻的房屋装有相互连通的防盗系统,如果两间相邻的房屋在同一晚上被…...
MySQL Lesson4
1:关于查询结果集的去重(distinct) select distinct job from emp; **distinct只能出现在所有字段的最前面。所表示的含有是所有的结果联合起来去重。 select distinct deptno,job from emp order by deptno; select count(distinct job)from…...
浅谈权限获取方法之文件上传
概述 文件上传漏洞是发生在有上传功能的应用中,如果应用程序对用户的上传文件没有控制或者存在缺陷,攻击者可以利用应用上传功能存在的缺陷,上传木马、病毒等有危害的文件到服务器上面,控制服务器。 漏洞成因及危害 文件上传漏…...
资产设备防拆标签安全防护和资产定位解决方案
随着社会经济的发展和高新技术的日新月异,对各方面的安全要求也在不断地提高,以物联网安防、入侵报警和出入口控制、应急系统等为主的安全防范系统日益成为各类文物场所智能化弱电工程不可或缺的组成部分,是重点资产管理场所内加强管理和安全…...
企业电子招标采购源码之电子招标投标全流程!
随着各级政府部门的大力推进,以及国内互联网的建设,电子招投标已经逐渐成为国内主流的招标投标方式,但是依然有很多人对电子招投标的流程不够了解,在具体操作上存在困难。虽然各个交易平台的招标投标在线操作会略有不同࿰…...
【考研408】计算机网络笔记
文章目录计算机网络体系结构计算机网络概述计算机网络的组成计算机网络的功能计算机网络的分类计算机网络的性能指标课后习题计算机网络体系结构与参考模型计算机网络协议、接口、服务的概念ISO/OSI参考模型和TCP/IP模型课后习题物理层通信基础基本概念奈奎斯特定理与香农定理编…...
[C++]继承
🥁作者: 华丞臧 📕专栏:【C】 各位读者老爷如果觉得博主写的不错,请诸位多多支持(点赞收藏关注)。如果有错误的地方,欢迎在评论区指出。 推荐一款刷题网站 👉LeetCode 文章目录一、继承…...
优化知识管理方法丨整理零碎信息,提高数据价值
信息流时代,知识成集合倍数增长,看似我们学习了很多知识,但知识零碎无系统,知识之间缺乏联系,没有深度,所以虽然你很努力,但你发现自己的能力增长特别缓慢,你需要整理知识将零散的知…...
Windows操作系统的体系结构、运行环境和运行状态
我是荔园微风,作为一名在IT界整整25年的老兵,今天我们来重新审视一下Windows这个我们熟悉的不能再熟悉的系统。说Windows操作系统的运行环境和运行状态,首先要介绍一下Windows操作系统的体系结构,然后再要说到最重要的两个概念:核…...
【工作笔记】Http响应头过长
起因 突然有测试小伙伴反馈进公司官网主页会白屏,但只是个例不是普遍现象 查监控发现没监控到异常问题 查了很久(这个很久单指对于线上问题来说)才定位是请求的异常,因为这套系统的异常用的是 ExceptionHandler,这也导…...
hive建分区表,分桶表,内部表,外部表
hive建分区表,分桶表,内部表,外部表 一、概念介绍 Hive是基于Hadoop的一个工具,用来帮助不熟悉 MapReduce的人使用SQL对存储在Hadoop中的大规模数据进行数据提取、转化、加载。Hive数据仓库工具能将结构化的数据文件映射为一张数…...
【分享】灌溉制度设计小程序VB源代码
说明 根据作物需水特性和当地气候、土壤、农业技术及灌水技术等因素制定的灌水方案。主要内容包括灌水次数、灌水时间、灌水定额和灌溉定额。灌溉制度是规划、设计灌溉工程和进行灌区运行管理的基本资料,是编制和执行灌区用水计划的重要依据。 1—计划湿润土层允…...
PR9268/300-000库存现货振动传感器 雄霸工控
PR9268/300-000库存现货振动传感器 雄霸工控PR9268/300-000库存现货振动传感器 雄霸工控SDM010PR9670/110-100PR9670/010-100PR9670/003-000PR9670/002-000PR9670/001-000PR9670/000-000PR9600/014-000PR9600/011-000PR9376/010-021PR9376/010-011PR9376/010-011PR9376/010-001…...
浅谈模型评估选择及重要性
作者:王同学 来源:投稿 编辑:学姐 模型评估作为机器学习领域一项不可分割的部分,却常常被大家忽略,其实在机器学习领域中重要的不仅仅是模型结构和参数量,对模型的评估也是至关重要的,只有选择那…...
多线程的初识和创建
✨个人主页:bit me👇 ✨当前专栏:Java EE初阶👇 ✨每日一语:知不足而奋进,望远山而前行。 目 录💤一. 认识线程(Thread)🍎1. 线程的引入🍏2. 线程…...
一句话设计模式3:工厂模式
工厂模式:new多种对象的简单方式。 文章目录 工厂模式:new多种对象的简单方式。前言一、两种工厂模式二、如何实现工厂模式1. 简单工厂2. 抽象工厂总结前言 工厂模式可以说比较常见的设计模式,仔细观察在很多源码中都有此种模式的应用;用来解决创建对象的创建问题; 一、两种工…...
【Codeforces Round #853 (Div. 2)】C. Serval and Toxel‘s Arrays【题解】
题目 Toxel likes arrays. Before traveling to the Paldea region, Serval gave him an array aaa as a gift. This array has nnn pairwise distinct elements. In order to get more arrays, Toxel performed mmm operations with the initial array. In the iii-th opera…...
100天精通Python(数据可视化篇)——第77天:数据可视化入门基础大全(万字总结+含常用图表动图展示)
文章目录1. 什么是数据可视化?2. 为什么会用数据可视化?3. 数据可视化的好处?4. 如何使用数据可视化?5. Python数据可视化常用工具1)Matplotlib绘图2)Seaborn绘图3)Bokeh绘图6. 常用图表介绍及其…...
【入坑系列】TiDB 强制索引在不同库下不生效问题
文章目录 背景SQL 优化情况线上SQL运行情况分析怀疑1:执行计划绑定问题?尝试:SHOW WARNINGS 查看警告探索 TiDB 的 USE_INDEX 写法Hint 不生效问题排查解决参考背景 项目中使用 TiDB 数据库,并对 SQL 进行优化了,添加了强制索引。 UAT 环境已经生效,但 PROD 环境强制索…...
DAY 47
三、通道注意力 3.1 通道注意力的定义 # 新增:通道注意力模块(SE模块) class ChannelAttention(nn.Module):"""通道注意力模块(Squeeze-and-Excitation)"""def __init__(self, in_channels, reduction_rat…...
(二)原型模式
原型的功能是将一个已经存在的对象作为源目标,其余对象都是通过这个源目标创建。发挥复制的作用就是原型模式的核心思想。 一、源型模式的定义 原型模式是指第二次创建对象可以通过复制已经存在的原型对象来实现,忽略对象创建过程中的其它细节。 📌 核心特点: 避免重复初…...
基于数字孪生的水厂可视化平台建设:架构与实践
分享大纲: 1、数字孪生水厂可视化平台建设背景 2、数字孪生水厂可视化平台建设架构 3、数字孪生水厂可视化平台建设成效 近几年,数字孪生水厂的建设开展的如火如荼。作为提升水厂管理效率、优化资源的调度手段,基于数字孪生的水厂可视化平台的…...
是否存在路径(FIFOBB算法)
题目描述 一个具有 n 个顶点e条边的无向图,该图顶点的编号依次为0到n-1且不存在顶点与自身相连的边。请使用FIFOBB算法编写程序,确定是否存在从顶点 source到顶点 destination的路径。 输入 第一行两个整数,分别表示n 和 e 的值(1…...
【电力电子】基于STM32F103C8T6单片机双极性SPWM逆变(硬件篇)
本项目是基于 STM32F103C8T6 微控制器的 SPWM(正弦脉宽调制)电源模块,能够生成可调频率和幅值的正弦波交流电源输出。该项目适用于逆变器、UPS电源、变频器等应用场景。 供电电源 输入电压采集 上图为本设计的电源电路,图中 D1 为二极管, 其目的是防止正负极电源反接, …...
Netty从入门到进阶(二)
二、Netty入门 1. 概述 1.1 Netty是什么 Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients. Netty是一个异步的、基于事件驱动的网络应用框架,用于…...
Windows安装Miniconda
一、下载 https://www.anaconda.com/download/success 二、安装 三、配置镜像源 Anaconda/Miniconda pip 配置清华镜像源_anaconda配置清华源-CSDN博客 四、常用操作命令 Anaconda/Miniconda 基本操作命令_miniconda创建环境命令-CSDN博客...
Python 实现 Web 静态服务器(HTTP 协议)
目录 一、在本地启动 HTTP 服务器1. Windows 下安装 node.js1)下载安装包2)配置环境变量3)安装镜像4)node.js 的常用命令 2. 安装 http-server 服务3. 使用 http-server 开启服务1)使用 http-server2)详解 …...
Python竞赛环境搭建全攻略
Python环境搭建竞赛技术文章大纲 竞赛背景与意义 竞赛的目的与价值Python在竞赛中的应用场景环境搭建对竞赛效率的影响 竞赛环境需求分析 常见竞赛类型(算法、数据分析、机器学习等)不同竞赛对Python版本及库的要求硬件与操作系统的兼容性问题 Pyth…...
