当前位置: 首页 > news >正文

记一次Kafka warning排查过程

1、前因

在配合测试某个需求的时候,正好看到控制台打印了个报错,如下:

2023-03-06 17:05:58,565[325651ms][pool-28-thread-1][org.apache.kafka.common.utils.AppInfoParser][WARN] - Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=producer-1at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:426)at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:287)at org.springframework.kafka.core.DefaultKafkaProducerFactory.createKafkaProducer(DefaultKafkaProducerFactory.java:406)at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:392)at org.springframework.kafka.core.KafkaTemplate.getTheProducer(KafkaTemplate.java:463)at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:401)at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:216)

很明显是Kafka在创建Producer实例的时候重复了,正好趁着有空排查排查,不然谁知道后面会因为这个导致什么问题。

2、BUG定位

根据堆栈信息,找到与Kafka有关的报错代码,进到类 AppInfoParser 的 registerAppInfo方法中,代码如下:

public static synchronized void registerAppInfo(String prefix, String id, Metrics metrics, long nowMs) {try {ObjectName name = new ObjectName(prefix + ":type=app-info,id=" + Sanitizer.jmxSanitize(id));AppInfo mBean = new AppInfo(nowMs);ManagementFactory.getPlatformMBeanServer().registerMBean(mBean, name);registerMetrics(metrics, mBean); // prefix will be added later by JmxReporter} catch (JMException e) {log.warn("Error registering AppInfo mbean", e);}
}

从方法名可以推测,应当是 Kafka 在创建 Producer 实例时,会按 Producer 的 id 构造一个 AppInfo,并注册到一个公共的类似Map的东西中,而我们的代码创建了多个实例,并且 id 重复了,基于这个猜测来看Kafka的配置文件(已脱敏):

<!-- 定义producer1的参数 -->
<bean id="producerProperties1" class="java.util.HashMap"><constructor-arg><map><entry key="bootstrap.servers" value="localhost:9092"/><entry key="retries" value="3"/><entry key="batch.size" value="4096"/><entry key="linger.ms" value="10"/><entry key="buffer.memory" value="40960"/><entry key="acks" value="all"/><entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/><entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/></map></constructor-arg>
</bean><!-- 定义producer2的参数 -->
<bean id="producerProperties2" class="java.util.HashMap"><constructor-arg><map><entry key="bootstrap.servers" value="localhost:9092"/><entry key="retries" value="3"/><entry key="batch.size" value="4096"/><entry key="linger.ms" value="10"/><entry key="buffer.memory" value="40960"/><entry key="acks" value="all"/><entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/><entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/></map></constructor-arg>
</bean>

可以看到项目中配置了两个 Kafka 的 Producer,并且都未指定 Producer 的 id,符合我们的猜测,那么我们要怎么修复,如果我们指定了 id,Producer 在多线程的情况下,每个线程的 id 是否又会重复。
基于几个问题,进到类 KafkaProducer 的构造方法中,来看 AppInfoParser.registerAppInfo() 方法调用语句:

AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());

可以看到前面说的 Producer 的 id 实际上是 clientId,往前找到 clientId 的赋值语句:

this.clientId = buildClientId(config.getString(ProducerConfig.CLIENT_ID_CONFIG), transactionalId);

进到 buildClientId 里面:

private static String buildClientId(String configuredClientId, String transactionalId) {if (!configuredClientId.isEmpty())return configuredClientId;if (transactionalId != null)return "producer-" + transactionalId;return "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
}

可知如果 configuredClientId 和 transactionalId 都为空,那么clientId就会自动生成,继续往上追溯,来看 transactionalId 的赋值语句:

String transactionalId = userProvidedConfigs.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG) ?(String) userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG) : null;

其中 ProducerConfig.TRANSACTIONAL_ID_CONFIG 值为 transactional.id,可见 transactionalId 的值取得是用户配置(userProvidedConfigs)中的 transactional.id 的值,而 configuredClientId 值并不是直接获取的用户配置(userProvidedConfigs)的 client.id,而是拿的构造方法中传入的config中的 client.id 对应的值,说明 config 很有可能是在用户配置(userProvidedConfigs)的基础上进行了些许处理。
继续往上追溯,进到 DefaultKafkaProducerFactory.createKafkaProducer 方法中:

protected Producer<K, V> createKafkaProducer() {if (this.clientIdPrefix == null) {return new KafkaProducer<>(this.configs, this.keySerializerSupplier.get(),this.valueSerializerSupplier.get());}else {Map<String, Object> newConfigs = new HashMap<>(this.configs);newConfigs.put(ProducerConfig.CLIENT_ID_CONFIG,this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());return new KafkaProducer<>(newConfigs, this.keySerializerSupplier.get(),this.valueSerializerSupplier.get());}
}

可以看到如果 clientIdPrefix 不为空的情况下,会在 config 中放入 client.id 的键值对,很明显这种情况下不会有我们所说的 clientId 重复的情况发生,因此我们只需要保证 clientIdPrefix 不为空即可。在 DefaultKafkaProducerFactory 构造方法中找到 clientIdPrefix 的赋值语句:

if (this.clientIdPrefix == null && configs.get(ProducerConfig.CLIENT_ID_CONFIG) instanceof String) {this.clientIdPrefix = (String) configs.get(ProducerConfig.CLIENT_ID_CONFIG);
}

其中 ProducerConfig.CLIENT_ID_CONFIG 值为 client.id,所以只需要在用户配置中添加 client.id 的值,那么 KafkaProducer 在创建时,就会在自动生成的 clientId 中添加前缀字符串,从而避免不同的 KafkaProducer 的 id 冲突。

3、BUG修复

将上述Kafka配置文件修改如下:

<!-- 定义producer1的参数 -->
<bean id="producerProperties1" class="java.util.HashMap"><constructor-arg><map><entry key="bootstrap.servers" value="localhost:9092"/><entry key="client.id" value="a"/><entry key="retries" value="3"/><entry key="batch.size" value="4096"/><entry key="linger.ms" value="10"/><entry key="buffer.memory" value="40960"/><entry key="acks" value="all"/><entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/><entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/></map></constructor-arg>
</bean><!-- 定义producer2的参数 -->
<bean id="producerProperties2" class="java.util.HashMap"><constructor-arg><map><entry key="bootstrap.servers" value="localhost:9092"/><entry key="client.id" value="b"/><entry key="retries" value="3"/><entry key="batch.size" value="4096"/><entry key="linger.ms" value="10"/><entry key="buffer.memory" value="40960"/><entry key="acks" value="all"/><entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/><entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/></map></constructor-arg>
</bean>

相关文章:

记一次Kafka warning排查过程

1、前因 在配合测试某个需求的时候&#xff0c;正好看到控制台打印了个报错&#xff0c;如下&#xff1a; 2023-03-06 17:05:58,565[325651ms][pool-28-thread-1][org.apache.kafka.common.utils.AppInfoParser][WARN] - Error registering AppInfo mbean javax.management.I…...

MySQL学习笔记(6.视图)

1. 视图作用 (1). 简化业务&#xff0c;将多个复杂条件&#xff0c;改为视图 (2). mysql对用户授权&#xff0c;只能控制表权限&#xff0c;通过视图可以控制用户字段权限。 (3). 可以避免基本表变更&#xff0c;影响业务。只需更改视图即可。 2. 视图&#xff08;创建&…...

java多线程与线程池-01多线程知识复习

多线程知识复习 文章目录 多线程知识复习第1章 多线程基础1.1.2 线程与进程的关系1.2 多线程启动1.2.1 线程标识1.2.2 Thread与Runnable1.2.3 run()与start()1.2.4 Thread源码分析1.3 线程状态1.3.1 NEW状态1.3.2 RUNNABLE状态1.3.3 BLOCKED状态1.3.4 WAITING状态1…...

Typescript - 将命名空间A导入另一个命名空间B作为B的子命名空间,并全局暴露命名空间B

前言 最近相统一管理 ts 中的类型声明&#xff0c;这就需要将各模块下的命名空间整合到全局的命名空间下&#xff0c;牵涉到从别的文件中引入命名空间并作为子命名空间在全局命名空间中统一暴露。 将命名空间A导入另一个命名空间B作为B的子命名空间 文件说明 assets.ts 文件中…...

Windows下实现Linux内核的Python开发(WSL2+Conda+Pycharm)

许多软件可以通过Python交互&#xff0c;但没有开发Windows版本&#xff0c;这个时候装双系统或虚拟机都很不方便&#xff0c;可以采取WSL2CondaPycharm的策略来进行基于Linux内核的Python开发。启动WSL2&#xff0c;安装Linux内核教程&#xff1a;旧版 WSL 的手动安装步骤 | M…...

新闻发布网站分析及适用场景

在当今数字时代&#xff0c;发布新闻的渠道已经不再局限于传统媒体&#xff0c;越来越多的企业、组织和个人开始使用互联网平台发布新闻稿&#xff0c;以提升品牌知名度和影响力。本文将介绍一些可以发布新闻的网站&#xff0c;并分析其特点和适用场景。一、新闻稿发布平台1.新…...

云原生时代顶流消息中间件Apache Pulsar部署实操之Pulsar IO与Pulsar SQL

文章目录Pulsar IO (Connector连接器)基础定义安装Pulsar和内置连接器连接Pulsar到Cassandra安装cassandra集群配置Cassandra接收器创建Cassandra Sink验证Cassandra Sink结果删除Cassandra Sink连接Pulsar到PostgreSQL安装PostgreSQL集群配置JDBC接收器创建JDBC Sink验证JDBC …...

Input子系统(一)启动篇

代码路径 基于AndroidS&#xff08;12.0&#xff09;代码 system/core/libutils/Threads.cppframeworks/base/services- java/com/android/server/SystemServer.java- core- java/com/android/server/input/InputManagerService.java- jni/com_android_server_input_InputMan…...

WuThreat身份安全云-TVD每日漏洞情报-2023-03-08

漏洞名称:Agilebio Lab Collector 远程命令执行 漏洞级别:高危 漏洞编号:CVE-2023-24217,CNNVD-202303-375 相关涉及:Agilebio Lab Collector 4.234 漏洞状态:EXP 参考链接:https://tvd.wuthreat.com/#/listDetail?TVD_IDTVD-2023-05536 漏洞名称:PrestaShop “Xen Forum”模…...

ABP IStringLocalizer部分场景不生效的问题

问题描述&#xff1a; 本地项目依赖注入本地化服务时候生效&#xff0c;第三方项目调用本地接口时候出现本地化失效的问题。 解决方案&#xff1a; 第三方服务封装的 GetHttp 请求的请求头中添加 语言相关信息 request.Headers.Add("accept-language", "zh-C…...

数组(四)-- LC[167] 两数之和-有序数组

1 两数之和 1.1 题目描述 题目链接&#xff1a;https://leetcode.cn/problems/two-sum/description/ 1.2 求解思路 1. 暴力枚举 最容易想到的方法是枚举数组中的每一个数 x&#xff0c;寻找数组中是否存在 target - x 参考代码 class Solution(object):def twoSum(self, n…...

Mac电脑,python+appium+安卓模拟器使用步骤

1、第一步&#xff0c;环境搭建&#xff0c;参考这位博主的文章&#xff0c;很齐全 https://blog.csdn.net/qq_44757414/article/details/128142859 我在最后一步安装appium-doctor的时候&#xff0c;提示权限不足&#xff0c;换成sudo appium-doctor即可 2、第二步&#xff0…...

Linux命令·find进阶

find是我们很常用的一个Linux命令&#xff0c;但是我们一般查找出来的并不仅仅是看看而已&#xff0c;还会有进一步的操作&#xff0c;这个时候exec的作用就显现出来了。 exec解释&#xff1a;-exec 参数后面跟的是command命令&#xff0c;它的终止是以;为结束标志的&#xff0…...

R语言ggplot2 | 用百分比格式表示数值

&#x1f4cb;文章目录Percent() 函数介绍例子1&#xff0c;在向量中格式化百分比&#xff1a;例子2&#xff0c;格式化数据框列中的百分比&#xff1a;例子3&#xff0c;格式化多个数据框列中的百分比&#xff1a;如何使用percent()函数在绘图过程展示通常在绘图时&#xff0c…...

【代码训练营】day53 | 1143.最长公共子序列 1035.不相交的线 53. 最大子序和

所用代码 java 最长公告子序列 LeetCode 1143 题目链接&#xff1a;最长公告子序列 LeetCode 1143 - 中等 思路 这个相等于上一题的不连续状态 dp[i] [j]&#xff1a;以[0, i-1]text1和以[0, j-1]text2 的最长公共子序列的长度为dp[i] [j]递推公式&#xff1a; 相同&#x…...

消息队列理解

为什么使用消息队列 使⽤消息队列主要是为了&#xff1a; 减少响应所需时间和削峰。降低系统耦合性&#xff08;解耦/提升系统可扩展性&#xff09;。 当我们不使⽤消息队列的时候&#xff0c;所有的⽤户的请求会直接落到服务器&#xff0c;然后通过数据库或者 缓存响应。假…...

【Linux内核一】在Linux系统下网口数据收发包的具体流向是什么?

在TCP/IP网络分层模型里&#xff0c;整个协议栈被分成了物理层、链路层、网络层&#xff0c;传输层和应用层。物理层对应的是网卡和网线&#xff0c;应用层对应的是我们常见的Nginx&#xff0c;FTP等等各种应用。Linux实现的是链路层、网络层和传输层这三层。 在Linux内核实现中…...

南京、西安集成电路企业和高校分布一览(附产业链主要厂商及高校名录)

前言 3月2日&#xff0c;国务院副总理刘鹤在北京调研集成电路企业发展&#xff0c;并主持召开座谈会。刘鹤指出&#xff0c;集成电路是现代化产业体系的核心枢纽&#xff0c;关系国家安全和中国式现代化进程。他表示&#xff0c;我国已形成较完整的集成电路产业链&#xff0c;也…...

后端Java随机比大小游戏实战讲解

## - 利用print打印输出提示用户 ## - 利用Scanner函数抓取数据 ## - 利用Math方法实现随机数 #### 1.首先用到的是print函数&#xff0c;对用户进行提醒进一步的操作 通过System.out.print();提示用户进行选择买大买小。 #### 2.然后利用Scanner函数&#xff0c;对用户输出…...

dolphinschedule使用shell任务结束状态研究

背景&#xff1a;配置的dolphin任务&#xff0c;使用的是shell&#xff0c;shell里包含了spark-submit 如下截图。 dolphin shell 介绍完毕&#xff0c;开始说明现象。 有天有人调整了集群的cdp配置&#xff0c;executor-cores max1 我之前这里写的是2&#xff0c;所以spark任…...

Python爬虫(一):爬虫伪装

一、网站防爬机制概述 在当今互联网环境中&#xff0c;具有一定规模或盈利性质的网站几乎都实施了各种防爬措施。这些措施主要分为两大类&#xff1a; 身份验证机制&#xff1a;直接将未经授权的爬虫阻挡在外反爬技术体系&#xff1a;通过各种技术手段增加爬虫获取数据的难度…...

html-<abbr> 缩写或首字母缩略词

定义与作用 <abbr> 标签用于表示缩写或首字母缩略词&#xff0c;它可以帮助用户更好地理解缩写的含义&#xff0c;尤其是对于那些不熟悉该缩写的用户。 title 属性的内容提供了缩写的详细说明。当用户将鼠标悬停在缩写上时&#xff0c;会显示一个提示框。 示例&#x…...

论文笔记——相干体技术在裂缝预测中的应用研究

目录 相关地震知识补充地震数据的认识地震几何属性 相干体算法定义基本原理第一代相干体技术&#xff1a;基于互相关的相干体技术&#xff08;Correlation&#xff09;第二代相干体技术&#xff1a;基于相似的相干体技术&#xff08;Semblance&#xff09;基于多道相似的相干体…...

基于SpringBoot在线拍卖系统的设计和实现

摘 要 随着社会的发展&#xff0c;社会的各行各业都在利用信息化时代的优势。计算机的优势和普及使得各种信息系统的开发成为必需。 在线拍卖系统&#xff0c;主要的模块包括管理员&#xff1b;首页、个人中心、用户管理、商品类型管理、拍卖商品管理、历史竞拍管理、竞拍订单…...

深度学习水论文:mamba+图像增强

&#x1f9c0;当前视觉领域对高效长序列建模需求激增&#xff0c;对Mamba图像增强这方向的研究自然也逐渐火热。原因在于其高效长程建模&#xff0c;以及动态计算优势&#xff0c;在图像质量提升和细节恢复方面有难以替代的作用。 &#x1f9c0;因此短时间内&#xff0c;就有不…...

02.运算符

目录 什么是运算符 算术运算符 1.基本四则运算符 2.增量运算符 3.自增/自减运算符 关系运算符 逻辑运算符 &&&#xff1a;逻辑与 ||&#xff1a;逻辑或 &#xff01;&#xff1a;逻辑非 短路求值 位运算符 按位与&&#xff1a; 按位或 | 按位取反~ …...

Linux-进程间的通信

1、IPC&#xff1a; Inter Process Communication&#xff08;进程间通信&#xff09;&#xff1a; 由于每个进程在操作系统中有独立的地址空间&#xff0c;它们不能像线程那样直接访问彼此的内存&#xff0c;所以必须通过某种方式进行通信。 常见的 IPC 方式包括&#…...

深度解析云存储:概念、架构与应用实践

在数据爆炸式增长的时代&#xff0c;传统本地存储因容量限制、管理复杂等问题&#xff0c;已难以满足企业和个人的需求。云存储凭借灵活扩展、便捷访问等特性&#xff0c;成为数据存储领域的主流解决方案。从个人照片备份到企业核心数据管理&#xff0c;云存储正重塑数据存储与…...

二叉树-144.二叉树的前序遍历-力扣(LeetCode)

一、题目解析 对于递归方法的前序遍历十分简单&#xff0c;但对于一位合格的程序猿而言&#xff0c;需要掌握将递归转化为非递归的能力&#xff0c;毕竟递归调用的时候会调用大量的栈帧&#xff0c;存在栈溢出风险。 二、算法原理 递归调用本质是系统建立栈帧&#xff0c;而非…...

Cursor AI 账号纯净度维护与高效注册指南

Cursor AI 账号纯净度维护与高效注册指南&#xff1a;解决限制问题的实战方案 风车无限免费邮箱系统网页端使用说明|快速获取邮箱|cursor|windsurf|augment 问题背景 在成功解决 Cursor 环境配置问题后&#xff0c;许多开发者仍面临账号纯净度不足导致的限制问题。无论使用 16…...