当前位置: 首页 > news >正文

使用golang实现日志收集系统的logagent

整体架构

参考 七米老师的日志收集项目
在这里插入图片描述
主要用go实现logagent的部分,logagent的作用主要是实时监控日志追加的变化,并将变化发送到kafka中。
之前我们已经实现了 用go连接kafka并向其中发送数据,也实现了使用tail库监控日志追加操作。
我们把这两部分结合起来实现监控日志追加并发送到kafka。

使用github.com/go-ini/ini配置参数

// 读取配置参数cfg, err:=ini.Load("config/config.ini")if err!=nil {logrus.Error((" load config error"))return}
[kafka]
address = 127.0.0.1:9092
chan_size = 1000[collect]
logfile_path= D:/learn/go/log-collector-lmh/log_agent/config_version/log_file/xx.log

配置参数主要包括,kafka的启动端口,存储的数据大小限制,日志文件的路径。

初始化kafka

kafka.go

package kafkaimport ("github.com/Shopify/sarama""github.com/sirupsen/logrus"
)var (Client sarama.SyncProducerMsgChan chan *sarama.ProducerMessage //占用的字节数少,传递的指针
)func InitKafka(kafkaAddr string, chanSize int64) (err error){config:=sarama.NewConfig()// 生产者配置config.Producer.RequiredAcks=sarama.WaitForAllconfig.Producer.Partitioner=sarama.NewRandomPartitionerconfig.Producer.Return.Successes=true// 连接kafkaClient,err=sarama.NewSyncProducer([]string{kafkaAddr}, config)if err!=nil {logrus.Error("producer closed", err)return}// 从管道中读取日志并发送到kafkaMsgChan = make(chan *sarama.ProducerMessage, chanSize)go sendMsg()return
}func sendMsg(){for {select {case msg := <- MsgChan:pid, offset, err := Client.SendMessage(msg)if err != nil {logrus.Warning("send msg failed, err:", err)return}logrus.Infof("send msg to kafka success. pid:%v offset:%v", pid, offset)}}
}

这里实现了连接kafka,并使用协程不断地读取MsgChan,读取到数据后向kafka发送,这里MsgChan通道的数据由tail监控到的日志变化写入。
main.go中调用

// 初始化kafkakafkaAddr:=cfg.Section("kafka").Key("address").String()chanSize:=cfg.Section("kafka").Key("chan_size").MustInt64(0)err=kafka.InitKafka(kafkaAddr, chanSize)if err!=nil {logrus.Error("kafka init failed")}logrus.Info("Kafka init success")

初始化tailf,并将日志数据写入ChanMsg

tailF.go

package tailF
import ("github.com/hpcloud/tail""fmt"
)
var (TailObj *tail.Tail
)
func InitTail(filename string) (err error) {config := tail.Config{ReOpen: true,Follow: true,Location: &tail.SeekInfo{Offset: 0, Whence: 2},MustExist: false,Poll: true,}// 打开文件开始读取数据TailObj, err =  tail.TailFile(filename, config)if err != nil {fmt.Printf("create tail %s failed, err:%v\n", filename, err)return}return
}

main.go中对应

// 初始化tailffileName:=cfg.Section("collect").Key("logfile_path").String()err=tailF.InitTail(fileName)if err!=nil {logrus.Error(" tailf init failed")}logrus.Info("Init tail success")// 把读取的日志发往kafkaerr=run()if err!=nil {logrus.Error(" run error%s", err)return}logrus.Info("run success")

main.go中实现的run函数,读取tailF的数据,并写入ChanMsg

func run () (err error){for {line,ok:=<-tailF.TailObj.Linesif !ok {logrus.Warn("tail file %s close reopen\n", tailF.TailObj.Filename)// 读取出错等一秒time.Sleep(time.Second)continue}// 使用通道将传输日志改为异步// 读取的日志封装为ProducerMessagemsg:=&sarama.ProducerMessage{}msg.Topic="web_log"msg.Value=sarama.StringEncoder(line.Text)// 放到channel中kafka.MsgChan<-msg}
}

完整main.go

package mainimport ("config_version/kafka""config_version/tailF""time""github.com/Shopify/sarama""github.com/go-ini/ini""github.com/sirupsen/logrus"
)func main() {// 读取配置参数cfg, err:=ini.Load("config/config.ini")if err!=nil {logrus.Error((" load config error"))return}// 初始化kafkakafkaAddr:=cfg.Section("kafka").Key("address").String()chanSize:=cfg.Section("kafka").Key("chan_size").MustInt64(0)err=kafka.InitKafka(kafkaAddr, chanSize)if err!=nil {logrus.Error("kafka init failed")}logrus.Info("Kafka init success")// 初始化tailffileName:=cfg.Section("collect").Key("logfile_path").String()err=tailF.InitTail(fileName)if err!=nil {logrus.Error(" tailf init failed")}logrus.Info("Init tail success")// 把读取的日志发往kafkaerr=run()if err!=nil {logrus.Error(" run error%s", err)return}logrus.Info("run success")}func run () (err error){for {line,ok:=<-tailF.TailObj.Linesif !ok {logrus.Warn("tail file %s close reopen\n", tailF.TailObj.Filename)// 读取出错等一秒time.Sleep(time.Second)continue}// 使用通道将传输日志改为异步// 读取的日志封装为ProducerMessagemsg:=&sarama.ProducerMessage{}msg.Topic="web_log"msg.Value=sarama.StringEncoder(line.Text)// 放到channel中kafka.MsgChan<-msg}
}

至此, 我们实现了简化版的日志收集系统的logagent功能,目前日志的路径还需要手动写入配置文件中,修改的话还需重启项目,之后可以使用ETCD实现日志路径的自动配置。

相关文章:

使用golang实现日志收集系统的logagent

整体架构 参考 七米老师的日志收集项目 主要用go实现logagent的部分&#xff0c;logagent的作用主要是实时监控日志追加的变化&#xff0c;并将变化发送到kafka中。 之前我们已经实现了 用go连接kafka并向其中发送数据&#xff0c;也实现了使用tail库监控日志追加操作。 我们…...

小红书点赞不显示怎么回事?小红书笔记评论被吞怎么办

小红书作为一个互联网产品&#xff0c;是一个软件。既然是软件就会有一定的程序漏洞&#xff0c;这是无法避免的。但是很多时候其实并不一定是漏洞的问题。今天就来和大家谈谈小红书点赞不显示怎么回事&#xff0c;小红书评论被吞又是怎么一回事&#xff0c;这些难道都是程序性…...

地址变换和缺页置换习题

1.设某进程页面的访问序列为4,3,2,1,4,3,5,4,3&#xff0c;2,1,5&#xff0c;当分配给该进程的内存页框数分别为3和4时&#xff0c;对于先进先出&#xff0c;最近最少使用&#xff0c;最佳页面置换算法&#xff0c;分别发生多少次缺页中断&#xff1f; 答&#xff1a; 分配的…...

PAT 乙级 1010 一元多项式求导(解题思路+AC代码)

题目&#xff1a; 设计函数求一元多项式的导数。&#xff08;注&#xff1a;xn&#xff08;n为整数&#xff09;的一阶导数为nxn−1。&#xff09; 输入格式: 以指数递降方式输入多项式非零项系数和指数&#xff08;绝对值均为不超过 1000 的整数&#xff09;。数字间以空格分…...

一维河流污染持续排放模拟(水污染扩散)

一、处理河道转换为geojson数据 以淮河为例处理示例数据&#xff1a; {"type": "FeatureCollection","features": [{"geometry": {"coordinates": [[[115.5803,34.4982],[115.5922,34.498],[115.6061,34.4994],[115.6203,…...

数据优化 | CnOpenDataA股上市公司招聘数据

就业是经济的“晴雨表”&#xff0c;更是社会的“稳定器”。稳定和扩大就业一直是国家宏观调控的重要目标&#xff0c;2021年中央经济工作会议八次提到“就业”这一关键词。在新冠肺炎疫情蔓延、世界经济下行及人口老龄化加快等多重因素的叠加之下&#xff0c;稳就业保民生成为…...

nacos和eureka的区别

nacos和eureka的区别 Eureka是什么 Eureka详解Nacos是什么 Nacos详解Nacos和Eureka的区别 CAP理论连接方式服务异常剔除操作实例方式自我保护机制 Eureka是什么 Eureka 是Spring Cloud 微服务框架默认的也是推荐的服务注册中心,由Netflix公司与2012将其开源出来,Eureka基于RE…...

canvas.toDataURL生成图片报错的解决方案

问题原因&#xff1a; toDataURL方法存在跨域限制&#xff0c;如果执行时dom内含有跨域的图片则浏览器执行时会报错。 这个根据不同的系统有不同的表现&#xff0c;例如&#xff1a;生成完毕但控制台有warning类型的警告&#xff0c;或者直接异常报error。 解决思路&#xff…...

电容笔和Apple pencil的区别是什么?好用电容笔推荐

Apple Pencil与目前市场上常见的电容笔最大的不同之处在于&#xff0c;普通电容笔并不具备苹果Pencil特有的重力压感&#xff0c;而仅仅是一种倾斜的压感。不过&#xff0c;其在其它方面的表现也很出色&#xff0c;与Apple Pencil相似&#xff0c;而且价格仅为200元。现在&…...

关于onnx 转ncnn 的问题

文章目录修改模型Detect层设计转换后处理优质文章由于有些操作是没法支持的 如5维的操作&#xff1a; Unsupported slice axes ! Unsupported slice axes ! Unsupported slice axes ! Unsupported slice axes ! Unsupported slice axes ! Unsupported slice axes !参考&#…...

设计模式之《责任链模式》

------《责任链模式》责任链模式的概念为什么用责任链模式工作中用在哪里设计思路代码实现总结责任链模式的概念 责任链模式是一种行为型设计模式&#xff0c;它允许你将请求沿着处理链传递&#xff0c;直到有一个处理者能够处理该请求为止。 在责任链模式中&#xff0c;每个…...

Android Studio实现多功能日记本

项目目录一、项目概述二、系统特点三、开发环境四、详细设计1、E-R图2、数据库3、系统设置五、运行演示一、项目概述 本次实现了功能实用且齐全的日记本&#xff0c;界面友好&#xff0c;使用便捷&#xff0c;采用MVC架构设计。使用SQLite数据库存储数据&#xff0c;数据表有主…...

只依赖Tensorrt和opencv的yolov5源代码

simple_yolo.hpp #ifndef SIMPLE_YOLO_HPP #define SIMPLE_YOLO_HPP/*简单的yolo接口&#xff0c;容易集成但是高性能 */#include <vector> #include <memory> #include <string> #include <future> #include <opencv2/opencv.hpp>namespace Si…...

多路I/O转接 poll(了解)

poll() 的机制与 select() 类似&#xff0c;与 select() 在本质上没有多大差别&#xff0c;管理多个描述符也是进行轮询&#xff0c;根据描述符的状态进行处理&#xff0c;但是 poll() 没有最大文件描述符数量的限制&#xff08;但是数量过大后性能也是会下降&#xff09;。 p…...

听说你也在为配置tomcat server而烦恼,看我这一篇,让你醍醐灌顶!

一.通过maven创建项目 二.下载tomcat服务器 我们一般在tomcat官网中进行tomcat的下载 Apache Tomcat - Welcome! 三.添加配置&#xff1a;我们点击下图中的文件配置 四.测试配置的tomcat 我们在文件的body中输入 测试内容&#xff1a; 在控制台中显式tomcat运行的信息&#…...

【从零开始学Skynet】工具篇(二):虚拟机文件的复制粘贴

大家在Linux系统下开发的时候肯定会遇到虚拟机与主机间无法复制粘贴的问题&#xff0c;现在我们就来解决这样的问题&#xff0c;方便我们的开发。 1、打开设置 我们可以系统界面的菜单栏点击“控制”&#xff0c;然后打开“设置”&#xff1b; 也可以在VirtualBox界面打开“设…...

全球自动驾驶竞争力最新排行榜,4家中国企业上榜

发展至今&#xff0c;自动驾驶技术不仅是汽车行业的一个主战场&#xff0c;更是全球科技领域中备受关注和充满竞争的一个重要领域。近年来&#xff0c;各大汽车制造商和科技公司都在投入大量财力物力人力进行自动驾驶技术的研发&#xff0c;并进一步争夺市场份额。 当然&#…...

APP启动流程分析

1、要分析的问题 1、与正常trace比对&#xff0c;确认过耗时在哪个步骤&#xff08;am create/pause/stop/start/doframe)&#xff1f; 2、与正常trace比对&#xff0c;确认过耗时在哪个cpu state(Running/Runnable/Sleep/Uninterruptible Sleep)&#xff1f; 2、启动分析 …...

IIR数字滤波器简介与实现

一、简介&#xff1a; IIR是一种数字滤波器&#xff0c;其输出是输入信号和过去输出的某些加权和。IIR滤波器由反馈和前馈组成&#xff0c;可以用于滤除或增强信号的特定频率成分。 IIR滤波器的输出表示为&#xff1a; y[n] b0 * x[n] b1 * x[n-1] b2 * x[n-2] … - a1 * …...

3.5 函数的极值与最大值和最小值

学习目标&#xff1a; 我要学习函数的极值、最大值和最小值&#xff0c;我会采取以下几个步骤&#xff1a; 理解基本概念&#xff1a;首先&#xff0c;我会理解函数的极值、最大值和最小值的概念。例如&#xff0c;我会学习函数在特定区间内的最高点和最低点&#xff0c;并且理…...

IDEA运行Tomcat出现乱码问题解决汇总

最近正值期末周&#xff0c;有很多同学在写期末Java web作业时&#xff0c;运行tomcat出现乱码问题&#xff0c;经过多次解决与研究&#xff0c;我做了如下整理&#xff1a; 原因&#xff1a; IDEA本身编码与tomcat的编码与Windows编码不同导致&#xff0c;Windows 系统控制台…...

龙虎榜——20250610

上证指数放量收阴线&#xff0c;个股多数下跌&#xff0c;盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型&#xff0c;指数短线有调整的需求&#xff0c;大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的&#xff1a;御银股份、雄帝科技 驱动…...

基于FPGA的PID算法学习———实现PID比例控制算法

基于FPGA的PID算法学习 前言一、PID算法分析二、PID仿真分析1. PID代码2.PI代码3.P代码4.顶层5.测试文件6.仿真波形 总结 前言 学习内容&#xff1a;参考网站&#xff1a; PID算法控制 PID即&#xff1a;Proportional&#xff08;比例&#xff09;、Integral&#xff08;积分&…...

Java 语言特性(面试系列1)

一、面向对象编程 1. 封装&#xff08;Encapsulation&#xff09; 定义&#xff1a;将数据&#xff08;属性&#xff09;和操作数据的方法绑定在一起&#xff0c;通过访问控制符&#xff08;private、protected、public&#xff09;隐藏内部实现细节。示例&#xff1a; public …...

CMake基础:构建流程详解

目录 1.CMake构建过程的基本流程 2.CMake构建的具体步骤 2.1.创建构建目录 2.2.使用 CMake 生成构建文件 2.3.编译和构建 2.4.清理构建文件 2.5.重新配置和构建 3.跨平台构建示例 4.工具链与交叉编译 5.CMake构建后的项目结构解析 5.1.CMake构建后的目录结构 5.2.构…...

ffmpeg(四):滤镜命令

FFmpeg 的滤镜命令是用于音视频处理中的强大工具&#xff0c;可以完成剪裁、缩放、加水印、调色、合成、旋转、模糊、叠加字幕等复杂的操作。其核心语法格式一般如下&#xff1a; ffmpeg -i input.mp4 -vf "滤镜参数" output.mp4或者带音频滤镜&#xff1a; ffmpeg…...

【Oracle】分区表

个人主页&#xff1a;Guiat 归属专栏&#xff1a;Oracle 文章目录 1. 分区表基础概述1.1 分区表的概念与优势1.2 分区类型概览1.3 分区表的工作原理 2. 范围分区 (RANGE Partitioning)2.1 基础范围分区2.1.1 按日期范围分区2.1.2 按数值范围分区 2.2 间隔分区 (INTERVAL Partit…...

是否存在路径(FIFOBB算法)

题目描述 一个具有 n 个顶点e条边的无向图&#xff0c;该图顶点的编号依次为0到n-1且不存在顶点与自身相连的边。请使用FIFOBB算法编写程序&#xff0c;确定是否存在从顶点 source到顶点 destination的路径。 输入 第一行两个整数&#xff0c;分别表示n 和 e 的值&#xff08;1…...

python报错No module named ‘tensorflow.keras‘

是由于不同版本的tensorflow下的keras所在的路径不同&#xff0c;结合所安装的tensorflow的目录结构修改from语句即可。 原语句&#xff1a; from tensorflow.keras.layers import Conv1D, MaxPooling1D, LSTM, Dense 修改后&#xff1a; from tensorflow.python.keras.lay…...

基于TurtleBot3在Gazebo地图实现机器人远程控制

1. TurtleBot3环境配置 # 下载TurtleBot3核心包 mkdir -p ~/catkin_ws/src cd ~/catkin_ws/src git clone -b noetic-devel https://github.com/ROBOTIS-GIT/turtlebot3.git git clone -b noetic https://github.com/ROBOTIS-GIT/turtlebot3_msgs.git git clone -b noetic-dev…...