当前位置: 首页 > news >正文

kafka消费堆积问题探索

背景

我们的商城项目用PHP写的,原本写日志方案用的是PHP的方案,但是,这个方案导致资源消耗一直降不下来,使用了20个CPU。后面考虑使用通过kafka的方案写日志,商城中把产生的日志丢到kafka中,在以go写的项目中消费kafka中的日志,并打印到控制台,最后,统一使用阿里sls抓取日志。我们kafka的分区有12个,go程序部署在k8s集群中,开启了弹性扩缩容,最多开启了8个pod进行消费,每秒产生的日志数量高峰在1500条左右,在这种情况下,依然产生了消息的堆积。

消费中执行的逻辑只有对象的映射和日志写控制台,所以,这种情况下产生了消息堆积,令我倍感困惑。

探索之路

第一步,确认一下每一步的执行时间。

func (s KafkaLogService) ReaderCreateLog(ctx context.Context, msg *customerkafka.CustomKafkaMsg) error {now := time.Now().UnixNano()var logEntry LogEntrydata, ok := msg.Data.(string)if !ok {global.GIN_LOG.Error(ctx, "消息数据类型错误", "data", msg.Data)return fmt.Errorf("消息数据类型错误: %v", msg.Data)}// 解析 JSON 数据if err := json.Unmarshal([]byte(data), &logEntry); err != nil {global.GIN_LOG.Error(ctx, "解析 JSON 数据失败:", "error", err, "message", data)return fmt.Errorf("解析消息失败: %w", err)}now2 := time.Now().UnixNano()fmt.Printf("*******************分隔符*******************Unmarshal logEntry 耗时:%d 纳秒\n", now2-now)var logMessage LogMessageif err := json.Unmarshal([]byte(logEntry.Message), &logMessage); err != nil {global.GIN_LOG.Error(ctx, "解析 JSON 数据失败:", "error", err, "message", logEntry.Message)return fmt.Errorf("解析消息失败: %w", err)}now3 := time.Now().UnixNano()fmt.Printf("*******************分隔符*******************Unmarshal LogMessage 耗时:%d 纳秒\n", now3-now2)// 日志等级判断switch logEntry.Level {case "ERROR":global.GIN_LOG.Error(ctx, "", zap.Any("data", logMessage), zap.String("project", logMessage.Project))case "WARN":global.GIN_LOG.Warn(ctx, "", zap.Any("data", logMessage), zap.String("project", logMessage.Project))case "INFO":global.GIN_LOG.Info(ctx, "", zap.Any("data", logMessage), zap.String("project", logMessage.Project))default:global.GIN_LOG.Warn(ctx, "", zap.Any("data", logMessage), zap.String("project", logMessage.Project))}now4 := time.Now().UnixNano()fmt.Printf("*******************分隔符*******************log 耗时:%d 纳秒\n", now4-now3)return nil
}

 

 json的Unmarshal的耗时,倒是符合我的认知,在预期之中。

但是,打印日志尽然需要耗时1.5毫秒,这个有点超出我的意料之外。这个时间似乎有点夸张啊。

但是,即便如此,一个消费者每秒也可以消费670条左右的消息,在起了8个实例的情况下,也不应该造成kafka消息的阻塞。

继续我们的探索之路

下面这一段是我对于kakfa消费者的封装。大概的逻辑就是每一个ActionType起一个协程进行消费。在这篇《基于kafka-go写的生产者和消费者》文章中写过这个封装背后的设计逻辑,有兴趣的可以移步过去一探究竟。

// Start 方法启动消费者并开始读取消息,根据actionType调用不同的处理函数
func (c *ConsumerClient) Start(ctx context.Context, handlers map[string]ActionHandler) error {for {select {case <-ctx.Done():return nil // 上下文取消,直接返回default:msg, err := c.reader.ReadMessage(ctx)if err != nil {c.logger.Error(ctx, "Failed to read message from Kafka", "error", err)continue}c.logger.Info(ctx, fmt.Sprintf("Message on topic: %s value: %s partion:%d offset:%d", msg.Topic, string(msg.Value), msg.Partition, msg.Offset))var kafkaMsg CustomKafkaMsgif err := json.Unmarshal(msg.Value, &kafkaMsg); err != nil {c.logger.Error(ctx, "Failed to unmarshal Kafka message", "error", err)continue}channel := make(chan *CustomKafkaMsg)// 使用 sync.Map 来管理 workerworker, loaded := c.workerMap.LoadOrStore(kafkaMsg.ActionType, channel)if !loaded {c.wg.Add(1) // 增加 WaitGroup 计数if ch, ok := worker.(chan *CustomKafkaMsg); ok {go c.startWorker(ctx, kafkaMsg.ActionType, handlers, ch)}}// 发送消息到对应的通道,避免阻塞其他消息消费// 只有在 handlers 中存在对应的 actionType 时才发送消息到对应的通道if _, ok := handlers[kafkaMsg.ActionType]; ok {if ch, ok := worker.(chan *CustomKafkaMsg); ok {ch <- &kafkaMsg}}}}
}

 由于我的这个写法,让我产生了一点担忧,虽然,我想的是每个ActionType只起一个协程进行消费,难道,实际情况并不是如我预期一样运行,而是,一条kafka消息就起了一个协程进行消费,如果是这种情况的话,那么,会导致大量的垃圾回收,程序的性能就会下降,那么,消息阻塞的问题也就可以解释了。

为了,验证我的这一想法,基于pprof工具看一下实际情况。

实际验证,排除我的担忧,符合我的预期,不是有一条kafka消息就开一个协程进行消费,而是,一个ActionType就只有一个协程进行消费。 

模拟生产环境测试

上述的探索,依然不能够完美解释文章开头提到的现象,起了8个消费者,依然导致消息堆积的现象。为了进一步探究其背后的原因,我模拟生产环境的状态,每秒钟往kafka中丢了1000条消息,再观察,我发现,在这种情况下,有时json.Unmarshal也有比较长的耗时,会出现1.5毫秒的耗时,另外,而写日志需要5毫秒左右,如此,每秒只能消费140条消息,消息堆积的现象也就能够解释了。

结论 

消息堆积的主要原因是日志打印操作耗时较长,最差时每秒只能消费140条消息。此外,有时JSON解析的时间也较长,这也是一个需要关注的问题。

接下来的目标是找出JSON解析耗时较长和日志打印慢的具体原因,并进行优化。通过解决这些问题,我们有望提高日志处理的效率,从而解决消息堆积的问题。

相关文章:

kafka消费堆积问题探索

背景 我们的商城项目用PHP写的&#xff0c;原本写日志方案用的是PHP的方案&#xff0c;但是&#xff0c;这个方案导致资源消耗一直降不下来&#xff0c;使用了20个CPU。后面考虑使用通过kafka的方案写日志&#xff0c;商城中把产生的日志丢到kafka中&#xff0c;在以go写的项目…...

Vue.js 使用插槽(Slots)优化组件结构

Vue.js 使用插槽&#xff08;Slots&#xff09;优化组件结构 今天我们聊聊 Vue.js 的一个超实用功能——插槽&#xff08;Slots&#xff09;。插槽是 Vue 组件开发中的神器&#xff0c;用好它&#xff0c;你可以让组件变得更灵活、更可复用&#xff0c;还能写出优雅的代码结构…...

Broker如何进行定时心跳发送和故障感知

1.前言 此文章是在儒猿课程中的学习笔记&#xff0c;感兴趣的想看原来的课程可以去咨询儒猿课堂《从0开始带你成为RocketMQ高手》&#xff0c;我本人觉得这个作者还是不错&#xff0c;都是从场景来进行分析&#xff0c;感觉还是挺适合我这种小白的。这块主要都是我自己的学习笔…...

网络安全设备主要有什么

网络安全设备指的肯定是硬件设备了&#xff0c;国内卖安全硬件的没几家&#xff0c;天融信&#xff0c;启明星辰&#xff0c;绿盟&#xff0c;深信服&#xff0c;就这四家卖的比较齐全吧&#xff0c;上它们官网看一下&#xff0c;就知道市面上主要的网络安全设备有哪些了。分类…...

Android Framework WMS全面概述和知识要点

一、概述 定义与作用 在 Android 系统中&#xff0c;WindowManagerService&#xff08;WMS&#xff09;就像是一个大管家&#xff0c;负责管理整个系统的窗口界面。它是 Android Framework 的核心组件之一&#xff0c;处于 system_server 进程内&#xff0c;在 Framework 层占…...

记一次某红蓝演练经历

在某天接到任务&#xff0c;对xxx进行一次红蓝演练&#xff0c;于是把自己渗透过程给记录下来&#xff0c;漏洞关键地方也会打码&#xff0c;希望各位大佬理解&#xff0c;菜鸡一枚&#xff0c;勿喷/(ㄒoㄒ)/~~ 概述 拿到目标域名第一件事就是信息收集&#xff0c;曾经一位大…...

一个运行在浏览器中的开源Web操作系统Puter本地部署与远程访问

文章目录 前言1.关于Puter2.本地部署Puter3.Puter简单使用4. 安装内网穿透5.配置puter公网地址6. 配置固定公网地址 &#x1f4a1; 推荐 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。【点击跳转到网站…...

【零基础入门Go语言】struct 和 interface:Go语言是如何实现继承的?

提到面向对象编程中的继承&#xff0c;许多人脑海中可能会浮现出 Java、C 等语言中那一套熟悉的类继承体系。然而&#xff0c;Go 语言作为一门别具一格的编程语言&#xff0c;并没有遵循传统的继承模式。那么&#xff0c;在 Go 语言的世界里&#xff0c;它是怎样实现类似于继承…...

麦田物语学习笔记:实现拖拽物品交换数据和在地图上生成物品

基本流程 1.代码思路 (1)InventoryUI的PlayerSlots与PlayerBag里一一对应,所以想要实现交换数据实际上是,先拿到被拖拽的物体所对的Slot的序号和目标的Slot序号,然后将这两个序号对调一下 (2)物品交换的数据逻辑应该在InventoryManager里去调用,因为InventoryManager里管理了p…...

一些计算机零碎知识随写(25年1月)-1

我原以为世界上有技术的那批人不会那么闲&#xff0c;我错了&#xff0c;被脚本真实了。 今天正隔着画画呢&#xff0c;手机突然弹出几条安全告警通知。 急忙打开服务器&#xff0c;发现问题不简单&#xff0c;直接关服务器重装系统..... 首先&#xff0c;不要认为小网站&…...

Qt学习笔记第81到90讲

第81讲 串口调试助手实现自动发送 为这个名叫“定时发送”的QCheckBox编写槽函数。 想要做出定时发送的效果&#xff0c;必须引入QT框架下的毫秒级定时器QTimer&#xff0c;查阅手册了解详情。 在widget.h内添加新的私有成员变量&#xff1a; QTimer *timer; 在widget类的构造…...

Centos9 + Docker 安装 MySQL8.4.0 + 定时备份数据库到本地

Centos9 Docker 安装 MySQL8.4.0 定时备份数据库到本地 创建目录&#xff0c;创建配置文件启动容器命令定时备份MySQL执行脚本Linux每日定时任务命令文件内参数其他时间参数 AT一次性定时任务 创建目录&#xff0c;创建配置文件 $ mkdir -p /opt/mysql/conf$ vim /opt/mysql/…...

网络原理一>UDP协议详解

UDP和TCP都是应用层中的重要协议&#xff0c;如果做基础架构开发&#xff0c;会用得多一些。 这一篇我们先简单聊一下的UDP TCP格式呈现&#xff1a; 我们知道UDP是一种无连接&#xff0c;面向数据报&#xff0c;全双工&#xff0c;不可靠传输特性的网络协议。 基本格式如图…...

MySQL的小问题

编码问题 不管官方使用什么编码&#xff1a;latin1、gbk、utf8、utfmb4。统一使用utfmb4 MySQL中的utf8并不是utf-8&#xff0c;它省略了一个字节&#xff0c;只是用三个字节存储所有的符号&#xff0c;utfmb4才是utf-8 远程登录问题&#xff1a; MySQL官方默认没有启动远程…...

Mac——Docker desktop安装与使用教程

摘要 本文是一篇关于Mac系统下Docker Desktop安装与使用教程的博文。首先介绍连接WiFi网络&#xff0c;然后详细阐述了如何在Mac上安装Docker&#xff0c;包括下载地址以及不同芯片版本的选择。接着讲解了如何下载基础镜像和指定版本镜像&#xff0c;旨在帮助用户在Mac上高效使…...

FastApi Swagger 序列化问题

问题 错误现象&#xff1a; fastapi的 swagger 界面无法正常打开控制台报错&#xff1a;raise PydanticInvalidForJsonSchema(fCannot generate a JsonSchema for {error_info}) 详细报错&#xff1a; File "d:\Envs\miniconda3\envs\xdagent\lib\site-packages\pydan…...

《机器学习》——sklearn库中CountVectorizer方法(词频矩阵)

CountVectorizer方法介绍 CountVectorizer 是 scikit-learn 库中的一个工具&#xff0c;它主要用于将文本数据转换为词频矩阵&#xff0c;而不是传统意义上的词向量转换&#xff0c;但可以作为词向量转换的一种基础形式。用于将文本数据转换为词频矩阵&#xff0c;它是文本特征…...

UML系列之Rational Rose笔记三:活动图(泳道图)

一、新建活动图&#xff08;泳道图&#xff09; 依旧在用例视图里面&#xff0c;新建一个activity diagram&#xff1b;新建好之后&#xff0c;就可以绘制活动图了&#xff1a; 正常每个活动需要一个开始&#xff0c;点击黑点&#xff0c;然后在图中某个位置安放&#xff0c;接…...

Java面向对象面经总结

目录 面向对象基础 面向对象与面向过程的区别 创建一个对象用什么运算符&#xff0c;对象实体与对象引用的区别 对象相等和引用相等的区别 构造方法的特点&#xff0c;是否可被重写&#xff1f; 面向对象三大特征 封装 继承 多态 接口和抽象类的共同点和区别 深拷贝…...

红队工具使用全解析:揭开网络安全神秘面纱一角

红队工具使用全解析&#xff1a;揭开网络安全神秘面纱一角 B站红队公益课&#xff1a;https://space.bilibili.com/350329294 学习网盘资源链接&#xff1a;https://pan.quark.cn/s/4079487939e8 嘿&#xff0c;各位网络安全爱好者们&#xff01;在风云变幻的网络安全战场上&am…...

突破不可导策略的训练难题:零阶优化与强化学习的深度嵌合

强化学习&#xff08;Reinforcement Learning, RL&#xff09;是工业领域智能控制的重要方法。它的基本原理是将最优控制问题建模为马尔可夫决策过程&#xff0c;然后使用强化学习的Actor-Critic机制&#xff08;中文译作“知行互动”机制&#xff09;&#xff0c;逐步迭代求解…...

相机Camera日志实例分析之二:相机Camx【专业模式开启直方图拍照】单帧流程日志详解

【关注我&#xff0c;后续持续新增专题博文&#xff0c;谢谢&#xff01;&#xff01;&#xff01;】 上一篇我们讲了&#xff1a; 这一篇我们开始讲&#xff1a; 目录 一、场景操作步骤 二、日志基础关键字分级如下 三、场景日志如下&#xff1a; 一、场景操作步骤 操作步…...

2.Vue编写一个app

1.src中重要的组成 1.1main.ts // 引入createApp用于创建应用 import { createApp } from "vue"; // 引用App根组件 import App from ./App.vue;createApp(App).mount(#app)1.2 App.vue 其中要写三种标签 <template> <!--html--> </template>…...

高等数学(下)题型笔记(八)空间解析几何与向量代数

目录 0 前言 1 向量的点乘 1.1 基本公式 1.2 例题 2 向量的叉乘 2.1 基础知识 2.2 例题 3 空间平面方程 3.1 基础知识 3.2 例题 4 空间直线方程 4.1 基础知识 4.2 例题 5 旋转曲面及其方程 5.1 基础知识 5.2 例题 6 空间曲面的法线与切平面 6.1 基础知识 6.2…...

C++.OpenGL (10/64)基础光照(Basic Lighting)

基础光照(Basic Lighting) 冯氏光照模型(Phong Lighting Model) #mermaid-svg-GLdskXwWINxNGHso {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-GLdskXwWINxNGHso .error-icon{fill:#552222;}#mermaid-svg-GLd…...

[Java恶补day16] 238.除自身以外数组的乘积

给你一个整数数组 nums&#xff0c;返回 数组 answer &#xff0c;其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请 不要使用除法&#xff0c;且在 O(n) 时间复杂度…...

#Uniapp篇:chrome调试unapp适配

chrome调试设备----使用Android模拟机开发调试移动端页面 Chrome://inspect/#devices MuMu模拟器Edge浏览器&#xff1a;Android原生APP嵌入的H5页面元素定位 chrome://inspect/#devices uniapp单位适配 根路径下 postcss.config.js 需要装这些插件 “postcss”: “^8.5.…...

Linux离线(zip方式)安装docker

目录 基础信息操作系统信息docker信息 安装实例安装步骤示例 遇到的问题问题1&#xff1a;修改默认工作路径启动失败问题2 找不到对应组 基础信息 操作系统信息 OS版本&#xff1a;CentOS 7 64位 内核版本&#xff1a;3.10.0 相关命令&#xff1a; uname -rcat /etc/os-rele…...

R 语言科研绘图第 55 期 --- 网络图-聚类

在发表科研论文的过程中&#xff0c;科研绘图是必不可少的&#xff0c;一张好看的图形会是文章很大的加分项。 为了便于使用&#xff0c;本系列文章介绍的所有绘图都已收录到了 sciRplot 项目中&#xff0c;获取方式&#xff1a; R 语言科研绘图模板 --- sciRplothttps://mp.…...

elementUI点击浏览table所选行数据查看文档

项目场景&#xff1a; table按照要求特定的数据变成按钮可以点击 解决方案&#xff1a; <el-table-columnprop"mlname"label"名称"align"center"width"180"><template slot-scope"scope"><el-buttonv-if&qu…...