当前位置: 首页 > news >正文

Go实现LogCollect:海量日志收集系统【上篇——LogAgent实现】

Go实现LogCollect:海量日志收集系统【上篇——LogAgent实现】

下篇:Go实现LogCollect:海量日志收集系统【下篇——开发LogTransfer】

项目架构图:
在这里插入图片描述

0 项目背景与方案选择

背景

当公司发展的越来越大,业务越来越复杂时,每个业务系统都有自己的日志。此时我们就应该将不同业务线的日志进行实时收集,存储到一个日志收集中心,最后再通过web页面展示出来。

  • 解决方案:
  1. 把机器上的日志实时收集,统一的存储到中心系统
  2. 对这些日志建立索引,通过搜索即可以找到对应日志
  3. 提供界面友好的web界面,通过web即可以完成日志搜索

该系统可能会出现的问题:

  • 实时日志量非常大,每天几十亿条
  • 日志准实时收集 ,延迟控制在分钟级别
  • 能够水平可扩展

方案选择与设计

①方案选择:

  • 早期的ELK(Elasticsearch,Logstash, Kibana)到现在的EFK(Elasticsearch,FilebeatorFluentd, Kibana)。ELK在每台服务器上部署logstash,比较重量级,所以演化成客户端部署filebeat的EFK,由filebeat收集向logstash中写数据,最后落地到elasticsearch,通过kibana界面进行日志检索。其中Logstash主要用于收集、解析、转换
    • 优:现成的解决方案,可以直接拿来使用
    • 缺:运维成本高,每增加一个日志收集项都需要手动修改配置;无法准确获取logstash的状态,无法做到定制化开发与维护

方案设计:
在这里插入图片描述

各个组件说明:

  • Log Agent:日志收集客户端,用来收集服务器上的日志
  • Kafka:高吞吐量的分布式消息队列
  • Elasticsearch:开源搜索引擎框架,提供基于http RESTFul的web接口
  • Flink、Spark:分布式计算框架,能够对大量数据进行分布式处理

1 开发

1.1 收集日志信息到Kafka

①docker-compose搭建kafka

 vim docker-compose.yml

docker-compose.yml:

version: '3'
services:zookeeper:image: confluentinc/cp-zookeeper:6.2.0ports:- "2181:2181"environment:ZOOKEEPER_CLIENT_PORT: 2181ZOOKEEPER_TICK_TIME: 2000kafka:image: confluentinc/cp-kafka:6.2.0ports:- "9092:9092"environment:KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181#KAFKA_ADVERTISED_LISTENERS后面改为自己本地宿主机的ip,例如我本地mac的ip为192.168.0.101KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.101:9092KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1depends_on:- zookeeper
# 进入到docker-compose.yml所在目录,执行下面命令
docker-compose up -d
# 查看部署结果,状态为up表明部署成功
docker-compose ps 

在这里插入图片描述

②创建topic并通过golang消费数据

# 1. 创建对应topic
docker-compose exec kafka kafka-topics --create --topic nginx_log --partitions 1 --replication-factor 1 --bootstrap-server 192.168.0.101:9092# 2. 查看topic列表
docker-compose exec kafka kafka-topics --list --zookeeper zookeeper:2181
//golang中操作kafka的库
go get github.com/IBM/sarama
package mainimport ("fmt""time""github.com/IBM/sarama"
)func main() {config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follow都确认config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出⼀个partitionconfig.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回// 连接kafkaclient, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)if err != nil {fmt.Println("producer close, err:", err)return}defer client.Close()for {// 构造⼀个消息msg := &sarama.ProducerMessage{}msg.Topic = "nginx_log"msg.Value = sarama.StringEncoder("this is a good test, my message is good")// 发送消息pid, offset, err := client.SendMessage(msg)if err != nil {fmt.Println("send message failed,", err)return}fmt.Printf("pid:%v offset:%v\n", pid, offset)time.Sleep(10 * time.Millisecond)}
}

1.2 简单版本LogAgent的实现

  1. 根据log_agent.conf的LogAgent配置,初始化LogAgent参数,确认LogAgent工作日志(log_agent.log)的存放位置
  2. tail读取nginx_log.log日志信息,将读取到的信息通过kafka连接发送到kafka中
  3. kafka消费对应的信息

①代码结构

	.├─conf│      log_agent.conf│├─kafka│ 		kafka.go	│		├─consumer│      		consumer.go│├─logs│      log_agent.log│├─main│      config.go│      log.go│      main.go│      server.go│├─tailf│      tail.gogo.mod└─ go.sum

在这里插入图片描述

②代码

1. conf/log_agent.conf:LogAgent的配置文件
[logs]
log_level = debug
log_path = /Users/xxx/GolandProjects/LogAgent/log/log_agent.log[collect]
log_path = /Users/xxx/GolandProjects/LogAgent/nginx_log.log
topic = nginx_log
chan_size = 100[kafka]
server_addr = localhost:9092
2. kafka/consumer/consumer.go:创建kafka消费者

用于消费发送到kafka分区中的数据

package mainimport ("fmt""github.com/IBM/sarama"
)// kafka consumerfunc main() {consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)if err != nil {fmt.Printf("fail to start consumer, err:%v\n", err)return}partitionList, err := consumer.Partitions("nginx_log") // 根据topic取到所有的分区if err != nil {fmt.Printf("fail to get list of partition:err%v\n", err)return}fmt.Println(partitionList)for partition := range partitionList { // 遍历所有的分区// 针对每个分区创建一个对应的分区消费者pc, err := consumer.ConsumePartition("nginx_log", int32(partition), sarama.OffsetNewest)if err != nil {fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)return}defer pc.AsyncClose()// 异步从每个分区消费信息go func(sarama.PartitionConsumer) {for msg := range pc.Messages() {fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v\n", msg.Partition, msg.Offset, msg.Key, string(msg.Value))}}(pc)}//演示时使用select {}
}
3. kafka/kafka.go:初始化kafka,向kafka中发送数据
package kafkaimport ("github.com/IBM/sarama""github.com/astaxie/beego/logs"
)var (client sarama.SyncProducer
)func InitKafka(addr string) (err error) {// Kafka生产者配置config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follow都确认config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出⼀个partitionconfig.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回// 新建一个生产者对象client, err = sarama.NewSyncProducer([]string{addr}, config)if err != nil {logs.Error("初识化Kafka producer失败:", err)return}logs.Debug("初始化Kafka producer成功,地址为:", addr)return
}func SendToKafka(data, topic string) (err error) {msg := &sarama.ProducerMessage{}msg.Topic = topicmsg.Value = sarama.StringEncoder(data)pid, offset, err := client.SendMessage(msg)if err != nil {logs.Error("发送信息失败, err:%v, data:%v, topic:%v", err, data, topic)return}logs.Debug("read success, pid:%v, offset:%v, topic:%v\n", pid, offset, topic)return
}
4. main/config.go:用于解析log_agent.conf文件
package mainimport ("LogAgent/tailf""errors""fmt""github.com/astaxie/beego/config"
)var (logConfig *Config
)// 日志配置
type Config struct {logLevel    stringlogPath     stringchanSize    intKafkaAddr   stringCollectConf []tailf.CollectConf
}// 日志收集配置
func loadCollectConf(conf config.Configer) (err error) {var c tailf.CollectConfc.LogPath = conf.String("collect::log_path")if len(c.LogPath) == 0 {err = errors.New("无效的 collect::log_path ")return}c.Topic = conf.String("collect::topic")if len(c.Topic) == 0 {err = errors.New("无效的 collect::topic ")return}logConfig.CollectConf = append(logConfig.CollectConf, c)return
}// 导入解析LogAgent初始化配置
func loadInitConf(confType, filename string) (err error) {conf, err := config.NewConfig(confType, filename)if err != nil {fmt.Printf("初始化配置文件出错:%v\n", err)return}// 导入配置信息logConfig = &Config{}// 日志级别logConfig.logLevel = conf.String("logs::log_level")if len(logConfig.logLevel) == 0 {logConfig.logLevel = "debug"}// 日志输出路径logConfig.logPath = conf.String("logs::log_path")if len(logConfig.logPath) == 0 {logConfig.logPath = "/Users/xxx/GolandProjects/LogAgent/log/log_agent.log"}// 管道大小logConfig.chanSize, err = conf.Int("collect::chan_size")if err != nil {logConfig.chanSize = 100}// KafkalogConfig.KafkaAddr = conf.String("kafka::server_addr")if len(logConfig.KafkaAddr) == 0 {err = fmt.Errorf("初识化Kafka失败")return}err = loadCollectConf(conf)if err != nil {fmt.Printf("导入日志收集配置错误:%v", err)return}return
}
5. main/log.go:初始化LogAgent的日志打印
package mainimport ("encoding/json""fmt""github.com/astaxie/beego/logs"
)func convertLogLevel(level string) int {switch level {case "debug":return logs.LevelDebugcase "warn":return logs.LevelWarncase "info":return logs.LevelInfocase "trace":return logs.LevelTrace}return logs.LevelDebug
}func initLogger() (err error) {config := make(map[string]interface{})config["filename"] = logConfig.logPathconfig["level"] = convertLogLevel(logConfig.logLevel)configStr, err := json.Marshal(config)if err != nil {fmt.Println("初始化日志, 序列化失败:", err)return}_ = logs.SetLogger(logs.AdapterFile, string(configStr))return
}
6. main/main.go:服务入口
package mainimport ("LogAgent/kafka""LogAgent/tailf""fmt""github.com/astaxie/beego/logs"
)func main() {fmt.Println("开始")// 读取logAgent配置文件filename := "/Users/xxx/GolandProjects/LogAgent/conf/log_agent.conf"err := loadInitConf("ini", filename)if err != nil {fmt.Printf("导入配置文件错误:%v\n", err)panic("导入配置文件错误")return}// 初始化日志信息err = initLogger()if err != nil {fmt.Printf("导入日志文件错误:%v\n", err)panic("导入日志文件错误")return}// 输出成功信息logs.Debug("导入日志成功%v", logConfig)// 初始化tailf(解析nginx_log日志文件所在路径等,管道大小)err = tailf.InitTail(logConfig.CollectConf, logConfig.chanSize)if err != nil {logs.Error("初始化tailf失败:", err)return}logs.Debug("初始化tailf成功!")// 初始化Kafkaerr = kafka.InitKafka(logConfig.KafkaAddr)if err != nil {logs.Error("初识化kafka producer失败:", err)return}logs.Debug("初始化Kafka成功!")// 运行err = serverRun()if err != nil {logs.Error("serverRun failed:", err)}logs.Info("程序退出")
}
7. main/server.go:向kafka发送数据
package mainimport ("LogAgent/kafka""LogAgent/tailf""fmt""github.com/astaxie/beego/logs""time"
)func serverRun() (err error) {for {msg := tailf.GetOneLine()err = sendToKafka(msg)if err != nil {logs.Error("发送消息到Kafka 失败, err:%v", err)time.Sleep(time.Second)continue}}}func sendToKafka(msg *tailf.TextMsg) (err error) {fmt.Printf("读取 msg:%s, topic:%s\n", msg.Msg, msg.Topic) // 将消息打印在终端_ = kafka.SendToKafka(msg.Msg, msg.Topic)return
}
8. tailf/tail.go:用于读取nginx_log.log中的日志信息,并将信息发送到kafka
package tailfimport ("fmt""github.com/astaxie/beego/logs""github.com/hpcloud/tail""time"
)// 将日志收集配置放在tailf包下,方便其他包引用
type CollectConf struct {LogPath stringTopic   string
}// 存入Collect
type TailObj struct {tail *tail.Tailconf CollectConf
}// 定义Message信息
type TextMsg struct {Msg   stringTopic string
}// 管理系统所有tail对象
type TailObjMgr struct {tailsObjs []*TailObjmsgChan   chan *TextMsg
}// 定义全局变量
var (tailObjMgr *TailObjMgr
)func GetOneLine() (msg *TextMsg) {msg = <-tailObjMgr.msgChanreturn
}func InitTail(conf []CollectConf, chanSize int) (err error) {// 加载配置项if len(conf) == 0 {err = fmt.Errorf("无效的log collect conf:%v", conf)return}tailObjMgr = &TailObjMgr{msgChan: make(chan *TextMsg, chanSize), // 定义Chan管道}// 循环导入for _, v := range conf {// 初始化Tailfmt.Println(v)tails, errTail := tail.TailFile(v.LogPath, tail.Config{ReOpen:    true,Follow:    true,Location:  &tail.SeekInfo{Offset: 0, Whence: 0},MustExist: false,Poll:      true,})if errTail != nil {err = errTailfmt.Println("tail 操作文件错误:", err)return}// 导入配置项obj := &TailObj{conf: v,tail: tails,}tailObjMgr.tailsObjs = append(tailObjMgr.tailsObjs, obj)go readFromTail(obj)}return
}// 读入日志数据
func readFromTail(tailObj *TailObj) {for true {msg, ok := <-tailObj.tail.Linesif !ok {logs.Warn("Tail file close reopen, filename:%s\n", tailObj.tail.Filename)time.Sleep(100 * time.Millisecond)continue}textMsg := &TextMsg{Msg:   msg.Text,Topic: tailObj.conf.Topic,}// 放入chan里面tailObjMgr.msgChan <- textMsg}
}

③效果

在这里插入图片描述

消费结果:
在这里插入图片描述

tailf读取nginx_log.log文件中的日志信息,并发送到kafka,由kakfa的消费者来进行消费
在这里插入图片描述

如果发现无法访问到docker中的kafka了,可能是因为你物理主机的ip更换了。docker-compose down暂停部署,然后重新修改docker-compose.yml中kafka绑定的物理主机IP即可,然后docker-compose up -d 重新部署。

1.3 引入etcd,创建多个tailtask

①环境准备:docker启动etcd与项目结构

1. docker启动etcd:搭建etcd集群
  1. 新建一个docker网络,方便etcd集群内部通信
docker network create etcd-network
  1. 启动etcd1,etcd第一个节点
docker run -d --name etcd1 --network etcd-network -p 2379:2379 -p 2380:2380 quay.io/coreos/etcd:v3.4.13 etcd \
--name etcd1 \
--advertise-client-urls http://0.0.0.0:2379 \
--listen-client-urls http://0.0.0.0:2379 \
--initial-advertise-peer-urls http://0.0.0.0:2380 \
--listen-peer-urls http://0.0.0.0:2380 \
--initial-cluster-token etcd-cluster-1 \
--initial-cluster etcd1=http://0.0.0.0:2380 \
--initial-cluster-state new
  1. 启动etcd2
docker run -d --name etcd2 --network etcd-network -p 22379:2379 -p 22380:2380 quay.io/coreos/etcd:v3.4.13 etcd \
--name etcd2 \
--advertise-client-urls http://0.0.0.0:22379 \
--listen-client-urls http://0.0.0.0:22379 \
--initial-advertise-peer-urls http://0.0.0.0:22380 \
--listen-peer-urls http://0.0.0.0:22380 \
--initial-cluster-token etcd-cluster-1 \
--initial-cluster etcd1=http://etcd1:2380,etcd2=http://0.0.0.0:22380 \
--initial-cluster-state existing
  1. 启动etcd3
docker run -d --name etcd3 --network etcd-network -p 32379:2379 -p 32380:2380 quay.io/coreos/etcd:v3.4.13 etcd \
--name etcd3 \
--advertise-client-urls http://0.0.0.0:32379 \
--listen-client-urls http://0.0.0.0:32379 \
--initial-advertise-peer-urls http://0.0.0.0:32380 \
--listen-peer-urls http://0.0.0.0:32380 \
--initial-cluster-token etcd-cluster-1 \
--initial-cluster etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://0.0.0.0:32380 \
--initial-cluster-state existing

这样,我们就成功在Docker中搭建了一个由3个etcd节点组成的集群,并分别暴露了端口2379、22379和32379。您可以使用docker ps命令来查看正在运行的容器,使用docker logs <container_name>命令来查看每个etcd容器的日志

2. 项目结构
.
│  go.mod
│  go.sum
│
│
├─conf
│      log_agent.conf
│
├─kafka
│      kafka.go
│
├─logs
│      log_agent.log
│
├─main
│      config.go
│      etcd.go
│      ip.go
│      log.go
│      main.go
│      server.go
│
├─tailf
│      tail.go
│
└─tools└─SetConfmain.go

②代码

1. tools/SetConf/main.go:将配置信息存入etcd
package mainimport ("LogAgent/tailf""context""encoding/json""fmt""go.etcd.io/etcd/client/v3""time"
)// 定义etcd的前缀key
const (EtcdKey = "/backend/logagent/config/192.168.0.101"
)func SetLogConfToEtcd() {cli, err := clientv3.New(clientv3.Config{Endpoints:   []string{"localhost:2379", "localhost:22379", "localhost:32379"},DialTimeout: 5 * time.Second,})if err != nil {fmt.Println("connect failed, err:", err)return}fmt.Println("connect succ")defer cli.Close()var logConfArr []tailf.CollectConflogConfArr = append(logConfArr,tailf.CollectConf{LogPath: "/Users/xxx/GolandProjects/LogAgent/mysql_log.log",Topic:   "mysql_log",},)logConfArr = append(logConfArr,tailf.CollectConf{LogPath: "/Users/xxx/GolandProjects/LogAgent/nginx_log.log",Topic:   "nginx_log",},)// Json打包data, err := json.Marshal(logConfArr)if err != nil {fmt.Println("json failed, ", err)return}ctx, cancel := context.WithTimeout(context.Background(), time.Second)_, err = cli.Put(ctx, EtcdKey, string(data))cancel()if err != nil {fmt.Println("put failed, err:", err)return}ctx, cancel = context.WithTimeout(context.Background(), time.Second)resp, err := cli.Get(ctx, EtcdKey)cancel()if err != nil {fmt.Println("get failed, err:", err)return}for _, ev := range resp.Kvs {fmt.Printf("%s : %s\n", ev.Key, ev.Value)}
}func main() {SetLogConfToEtcd()
}

注意📢:编写完之后,要先运行该代码,将对应的k-v存入etcd,然后再启动LogAgent,因为我们的LogAgent会从etcd中获取对应配置

2. main/etcd.go

用于初始化连接etcd、从etcd中取出配置信息

package mainimport ("LogAgent/tailf""context""encoding/json""fmt""github.com/astaxie/beego/logs"clientv3 "go.etcd.io/etcd/client/v3""strings""time"
)type EtcdClient struct {client *clientv3.Client
}var (etcdClient *EtcdClient
)func initEtcd(addr string, key string) (collectConf []tailf.CollectConf, err error) {// 初始化连接etcdcli, err := clientv3.New(clientv3.Config{//Endpoints:   []string{"localhost:2379", "localhost:22379", "localhost:32379"},Endpoints:   []string{addr},DialTimeout: 5 * time.Second,})if err != nil {logs.Error("连接etcd失败:", err)return}etcdClient = &EtcdClient{client: cli,}// 如果Key不是以"/"结尾, 则自动加上"/"if strings.HasSuffix(key, "/") == false {key = key + "/"}for _, ip := range localIPArray {etcdKey := fmt.Sprintf("%s%s", key, ip)ctx, cancel := context.WithTimeout(context.Background(), time.Second)resp, err := cli.Get(ctx, etcdKey)if err != nil {logs.Error("etcd get请求失败:", err)continue}cancel()logs.Debug("resp from etcd:%v", resp.Kvs)for _, v := range resp.Kvs {if string(v.Key) == etcdKey {// 将从etcd中取出来的json格式反序列化为结构体err = json.Unmarshal(v.Value, &collectConf)if err != nil {logs.Error("反序列化失败:", err)continue}logs.Debug("日志设置为%v", collectConf)}}}logs.Debug("连接etcd成功")return
}
3. main/ip.go

获取本机所有网卡ip去连接etcd

  • 考虑到以后添加新服务器时,不需要手动添加ip,这里将ip信息全部存入localIPArray中
package mainimport ("fmt""net"
)var (localIPArray []string
)func init() {addrs, err := net.InterfaceAddrs()if err != nil {panic(fmt.Sprintf("获取网卡ip失败, %v", err))}for _, addr := range addrs {if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {if ipnet.IP.To4() != nil {localIPArray = append(localIPArray, ipnet.IP.String())}}}fmt.Println(localIPArray)
}
4. main/config.go
package mainimport ("LogAgent/tailf""errors""fmt""github.com/astaxie/beego/config"
)var (logConfig *Config
)// 日志配置
type Config struct {logLevel    stringlogPath     stringchanSize    intKafkaAddr   stringCollectConf []tailf.CollectConfetcdAddr    stringetcdKey     string
}// 日志收集配置
func loadCollectConf(conf config.Configer) (err error) {var c tailf.CollectConfc.LogPath = conf.String("collect::log_path")if len(c.LogPath) == 0 {err = errors.New("无效的 collect::log_path ")return}c.Topic = conf.String("collect::topic")if len(c.Topic) == 0 {err = errors.New("无效的 collect::topic ")return}logConfig.CollectConf = append(logConfig.CollectConf, c)return
}// 导入解析LogAgent初始化配置
func loadInitConf(confType, filename string) (err error) {conf, err := config.NewConfig(confType, filename)if err != nil {fmt.Printf("初始化配置文件出错:%v\n", err)return}// 导入配置信息logConfig = &Config{}// 日志级别logConfig.logLevel = conf.String("logs::log_level")if len(logConfig.logLevel) == 0 {logConfig.logLevel = "debug"}// 日志输出路径logConfig.logPath = conf.String("logs::log_path")if len(logConfig.logPath) == 0 {logConfig.logPath = "/Users/xxx/GolandProjects/LogAgent/log/log_agent.log"}// 管道大小logConfig.chanSize, err = conf.Int("collect::chan_size")if err != nil {logConfig.chanSize = 100}// KafkalogConfig.KafkaAddr = conf.String("kafka::server_addr")if len(logConfig.KafkaAddr) == 0 {err = fmt.Errorf("初识化Kafka失败")return}err = loadCollectConf(conf)if err != nil {fmt.Printf("导入日志收集配置错误:%v", err)return}// etcdlogConfig.etcdAddr = conf.String("etcd::addr")if len(logConfig.etcdAddr) == 0 {err = fmt.Errorf("初识化etcd addr失败")return}logConfig.etcdKey = conf.String("etcd::configKey")if len(logConfig.etcdKey) == 0 {err = fmt.Errorf("初识化etcd configKey失败")return}return
}
5. tailf/tail.go

修改tail.go文件:添加json标签,用于反序列化

package tailfimport ("fmt""github.com/astaxie/beego/logs""github.com/hpcloud/tail""time"
)// 将日志收集配置放在tailf包下,方便其他包引用
type CollectConf struct {LogPath string `json:"logpath"`Topic   string `json:"topic"`
}// 存入Collect
type TailObj struct {tail *tail.Tailconf CollectConf
}// 定义Message信息
type TextMsg struct {Msg   stringTopic string
}// 管理系统所有tail对象
type TailObjMgr struct {tailsObjs []*TailObjmsgChan   chan *TextMsg
}// 定义全局变量
var (tailObjMgr *TailObjMgr
)func GetOneLine() (msg *TextMsg) {msg = <-tailObjMgr.msgChanreturn
}func InitTail(conf []CollectConf, chanSize int) (err error) {// 加载配置项if len(conf) == 0 {err = fmt.Errorf("无效的log collect conf:%v", conf)return}tailObjMgr = &TailObjMgr{msgChan: make(chan *TextMsg, chanSize), // 定义Chan管道}// 循环导入for _, v := range conf {// 初始化Tailfmt.Println(v)tails, errTail := tail.TailFile(v.LogPath, tail.Config{ReOpen:    true,Follow:    true,Location:  &tail.SeekInfo{Offset: 0, Whence: 0},MustExist: false,Poll:      true,})if errTail != nil {err = errTailfmt.Println("tail 操作文件错误:", err)return}// 导入配置项obj := &TailObj{conf: v,tail: tails,}tailObjMgr.tailsObjs = append(tailObjMgr.tailsObjs, obj)go readFromTail(obj)}return
}// 读入日志数据
func readFromTail(tailObj *TailObj) {for true {msg, ok := <-tailObj.tail.Linesif !ok {logs.Warn("Tail file close reopen, filename:%s\n", tailObj.tail.Filename)time.Sleep(100 * time.Millisecond)continue}textMsg := &TextMsg{Msg:   msg.Text,Topic: tailObj.conf.Topic,}// 放入chan里面tailObjMgr.msgChan <- textMsg}
}
6. main/main.go

将initEtcd放到InitTail函数之前,不然无法从etcd中获取值

package mainimport ("LogAgent/kafka""LogAgent/tailf""fmt""github.com/astaxie/beego/logs"
)func main() {fmt.Println("开始")// 读取初始化配置文件filename := "/Users/xxx/GolandProjects/LogAgent/conf/log_agent.conf"err := loadInitConf("ini", filename)if err != nil {fmt.Printf("导入配置文件错误:%v\n", err)panic("导入配置文件错误")return}// 初始化日志信息err = initLogger()if err != nil {fmt.Printf("导入日志文件错误:%v\n", err)panic("导入日志文件错误")return}// 输出成功信息logs.Debug("导入日志成功%v", logConfig)// 初识化etcdcollectConf, err := initEtcd(logConfig.etcdAddr, logConfig.etcdKey)if err != nil {logs.Error("初始化etcd失败", err)}logs.Debug("初始化etcd成功!")// 初始化tailferr = tailf.InitTail(collectConf, logConfig.chanSize)if err != nil {logs.Error("初始化tailf失败:", err)return}logs.Debug("初始化tailf成功!")// 初始化Kafkaerr = kafka.InitKafka(logConfig.KafkaAddr)if err != nil {logs.Error("初识化Kafka producer失败:", err)return}logs.Debug("初始化Kafka成功!")// 运行err = serverRun()if err != nil {logs.Error("serverRun failed:", err)}logs.Info("程序退出")
}
效果

在这里插入图片描述

  • 当没有对应日志文件存在时:
    在这里插入图片描述
  • 当对应日志文件存在并有对应内容时:
    在这里插入图片描述

1.4 监听etcd配置项的变更

在真实生产环境中时会常常添加新的服务器, 这时我们需要借助之前的ip.go获取所有ip节点, 并且实时监控,修改EtcdClient结构体增加keys

①修改main/etcd.go

在main/etcd.go中添加initEtcdWatcher与watchKey函数并且在函数initEtcd中调用

package mainimport ("LogAgent/tailf""context""encoding/json""fmt""github.com/astaxie/beego/logs"clientv3 "go.etcd.io/etcd/client/v3""strings""time"
)type EtcdClient struct {client *clientv3.Clientkeys   []string
}var (etcdClient *EtcdClient
)func initEtcd(addr string, key string) (collectConf []tailf.CollectConf, err error) {// 初始化连接etcdcli, err := clientv3.New(clientv3.Config{//Endpoints:   []string{"localhost:2379", "localhost:22379", "localhost:32379"},Endpoints:   []string{addr},DialTimeout: 5 * time.Second,})if err != nil {logs.Error("连接etcd失败:", err)return}etcdClient = &EtcdClient{client: cli,}// 如果Key不是以"/"结尾, 则自动加上"/"if strings.HasSuffix(key, "/") == false {key = key + "/"}for _, ip := range localIPArray {etcdKey := fmt.Sprintf("%s%s", key, ip)ctx, cancel := context.WithTimeout(context.Background(), time.Second)resp, err := cli.Get(ctx, etcdKey)if err != nil {logs.Error("etcd get请求失败:", err)continue}cancel()logs.Debug("resp from etcd:%v", resp.Kvs)for _, v := range resp.Kvs {if string(v.Key) == etcdKey {// 将从etcd中取出来的json格式反序列化为结构体err = json.Unmarshal(v.Value, &collectConf)if err != nil {logs.Error("反序列化失败:", err)continue}logs.Debug("日志设置为%v", collectConf)}}}logs.Debug("连接etcd成功")initEtcdWatcher(addr)return
}// 初始化多个watch监控etcd中配置节点
func initEtcdWatcher(addr string) {for _, key := range etcdClient.keys {go watchKey(addr, key)}
}func watchKey(addr string, key string) {// 初始化连接etcdcli, err := clientv3.New(clientv3.Config{//Endpoints:   []string{"localhost:2379", "localhost:22379", "localhost:32379"},Endpoints:   []string{addr},DialTimeout: 5 * time.Second,})if err != nil {logs.Error("连接etcd失败:", err)return}logs.Debug("开始监控key:", key)// Watch操作wch := cli.Watch(context.Background(), key)for resp := range wch {for _, ev := range resp.Events {fmt.Printf("Type: %v, Key:%v, Value:%v\n", ev.Type, string(ev.Kv.Key), string(ev.Kv.Value))}}
}

②修改tailf/tail.go

package tailfimport ("github.com/astaxie/beego/logs""github.com/hpcloud/tail""time"
)// 定义常量
const (StatusNormal = 1 // 正常状态StatusDelete = 2 // 删除状态
)// 将日志收集配置放在tailf包下,方便其他包引用
type CollectConf struct {LogPath string `json:"logpath"`Topic   string `json:"topic"`
}// 存入Collect
type TailObj struct {tail     *tail.Tailconf     CollectConfstatus   intexitChan chan int
}// 定义Message信息
type TextMsg struct {Msg   stringTopic string
}// 管理系统所有tail对象
type TailObjMgr struct {tailsObjs []*TailObjmsgChan   chan *TextMsg
}// 定义全局变量
var (tailObjMgr *TailObjMgr
)func GetOneLine() (msg *TextMsg) {msg = <-tailObjMgr.msgChanreturn
}// 初始化tail
func InitTail(conf []CollectConf, chanSize int) (err error) {tailObjMgr = &TailObjMgr{msgChan: make(chan *TextMsg, chanSize), // 定义Chan管道}// 加载配置项if len(conf) == 0 {logs.Error("无效的日志collect配置: ", conf)}// 循环导入for _, v := range conf {createNewTask(v)}return
}// 读入日志数据
func readFromTail(tailObj *TailObj) {for true {select {case msg, ok := <-tailObj.tail.Lines:if !ok {logs.Warn("Tail file close reopen, filename:%s\n", tailObj.tail.Filename)time.Sleep(100 * time.Millisecond)continue}textMsg := &TextMsg{Msg:   msg.Text,Topic: tailObj.conf.Topic,}// 放入chan里tailObjMgr.msgChan <- textMsg// 如果exitChan为1, 则删除对应配置项case <-tailObj.exitChan:logs.Warn("tail obj 退出, 配置项为conf:%v", tailObj.conf)return}}
}// 新增etcd配置项
func UpdateConfig(confs []CollectConf) (err error) {// 创建新的tailtaskfor _, oneConf := range confs {// 对于已经运行的所有实例, 路径是否一样var isRuning = falsefor _, obj := range tailObjMgr.tailsObjs {// 路径一样则证明是同一实例if oneConf.LogPath == obj.conf.LogPath {isRuning = trueobj.status = StatusNormalbreak}}// 检查是否已经存在if isRuning {continue}// 如果不存在该配置项 新建一个tailtask任务createNewTask(oneConf)}// 遍历所有查看是否存在删除操作var tailObjs []*TailObjfor _, obj := range tailObjMgr.tailsObjs {obj.status = StatusDeletefor _, oneConf := range confs {if oneConf.LogPath == obj.conf.LogPath {obj.status = StatusNormalbreak}}// 如果status为删除, 则将exitChan置为1if obj.status == StatusDelete {obj.exitChan <- 1}// 将obj存入临时的数组中tailObjs = append(tailObjs, obj)}// 将临时数组传入tailsObjs中tailObjMgr.tailsObjs = tailObjsreturn
}func createNewTask(conf CollectConf) {// 初始化Tailf实例tails, errTail := tail.TailFile(conf.LogPath, tail.Config{ReOpen:    true,Follow:    true,Location:  &tail.SeekInfo{Offset: 0, Whence: 2},MustExist: false,Poll:      true,})if errTail != nil {logs.Error("收集文件[%s]错误: %v", conf.LogPath, errTail)return}// 导入配置项obj := &TailObj{conf:     conf,exitChan: make(chan int, 1),}obj.tail = tailstailObjMgr.tailsObjs = append(tailObjMgr.tailsObjs, obj)go readFromTail(obj)
}

③测试etcd的watch机制

执行下面命令,将下面的key1换成自己真实的key,将value换成自己真实想要配置的value,比如:docker exec etcd1 etcdctl put /backend/logagent/config/192.168.0.103 "[{\"logpath\":\"/Users/xxx/GolandProjects/LogCollect/LogAgent/mysql_log.log\",\"topic\":\"mysql_log\"},{\"logpath\":\"/Users/xxx/GolandProjects/LogCollect/LogAgent/nginx_log.log\",\"topic\":\"nginx_log\"}]"

  • 该命令是操作docker中的etcd,向etcd中新增一个key:/backend/logagent/config/192.168.0.101
    value(注意转义): “[{“logpath”:”/Users/xxx/GolandProjects/LogCollect/LogAgent/mysql_log.log",“topic”:“mysql_log”},{“logpath”:“/Users/xxx/GolandProjects/LogCollect/LogAgent/nginx_log.log”,“topic”:“nginx_log”}]"
# 查看etcd中所有key
docker exec etcd1 etcdctl get "" --prefix --keys-only# 向etcd中添加key-value对:
docker exec etcd1 etcdctl put key1 value1#从etcd中删除指定的key:
docker exec etcd1 etcdctl del key1#从etcd中获取指定的key的值:
docker exec etcd1 etcdctl get key1

执行对应操作后,观察日志信息:

在这里插入图片描述
在这里插入图片描述

可以从LogAgent的日志中发现已经,成功监听到了etcd的变化

参考:https://blog.csdn.net/qq_43442524/article/details/105024906

相关文章:

Go实现LogCollect:海量日志收集系统【上篇——LogAgent实现】

Go实现LogCollect&#xff1a;海量日志收集系统【上篇——LogAgent实现】 下篇&#xff1a;Go实现LogCollect&#xff1a;海量日志收集系统【下篇——开发LogTransfer】 项目架构图&#xff1a; 0 项目背景与方案选择 背景 当公司发展的越来越大&#xff0c;业务越来越复杂…...

MySQL (1)

目录 操作须知 数据类型 1 DDL 1.1 操作库 1.2 操作表 1.3 操作字段(ALTER TABLE 表名) 2 DML 3 DQL(见下章) 操作须知 ※ MySQL在windows环境不区分大小写,但在Linux环境严格区分大小写 ※ 不同的数据库可能存在同名的表,可以给表前加"数据库前缀" //例:…...

MR混合现实汽车维修情景实训教学演示

MR混合现实技术应用于汽车维修课堂中&#xff0c;能够赋予学生更加真实&#xff0c;逼真地学习环境&#xff0c;让学生在情景体验中不断提高自己的专业能力。 MR混合现实汽车维修情景实训教学演示具体体现在&#xff1a; 1. 虚拟维修指导&#xff1a;利用MR技术&#xff0c;可…...

ChatGPT在航空航天工程和太空探索中的潜在应用如何?

ChatGPT在航空航天工程和太空探索领域具有广泛的潜在应用。这些应用可以涵盖从设计和模拟到任务控制和数据分析的多个方面。本文将探讨ChatGPT在航空航天和太空探索中的各种可能应用&#xff0c;包括设计优化、任务规划、智能导航、卫星通信、数据分析和太空探测器运行。 ### …...

算法基础第三章

算法基础第三章 1、dfs(深度搜索)1.1、 递归回溯1.2、递归剪枝&#xff08;剪枝就是判断接下来的递归都不会满足条件&#xff0c;直接回溯&#xff0c;不再继续往下无意义的递归&#xff09; 2、bfs(广度搜索)2.1、最优路径&#xff08;只适合于边权都相等的题&#xff09; 3、…...

ElementUI浅尝辄止20:Pagination 分页

分页组件常见于管理系统的列表查询页面&#xff0c;数据量巨大时需要分页的操作。 当数据量过多时&#xff0c;使用分页分解数据。 1.如何使用&#xff1f; /*设置layout&#xff0c;表示需要显示的内容&#xff0c;用逗号分隔&#xff0c;布局元素会依次显示。prev表示上一页…...

Docker从认识到实践再到底层原理(二-1)|容器技术发展史+虚拟化容器概念和简介

前言 那么这里博主先安利一些干货满满的专栏了&#xff01; 首先是博主的高质量博客的汇总&#xff0c;这个专栏里面的博客&#xff0c;都是博主最最用心写的一部分&#xff0c;干货满满&#xff0c;希望对大家有帮助。 高质量博客汇总 然后就是博主最近最花时间的一个专栏…...

什么是大模型?1750亿、700GB的GPT大模型大在哪?

文章目录 什么是大模型&#xff1f;1750亿、700GB的GPT大模型大在哪&#xff1f; 什么是大模型&#xff1f; 在人工智能领域&#xff0c;模型是指一种对数据进行处理和分析的数学结构。模型越复杂&#xff0c;能够处理的数据量和处理的准确性都会得到提高。 随着人工智能技术…...

剑指 Offer 10- II. 青蛙跳台阶问题

剑指 Offer 10- II. 青蛙跳台阶问题 和 剑指 Offer 10- I. 斐波那契数列 很像&#xff0c;改一下初始值就行了。 方法一 class Solution {int mod (int) 1e9 7;public int numWays(int n) {if(n < 1) return 1;int[] dp new int[n 1];dp[1] 1;dp[2] 2;for(int i 3…...

oracle10和11功能说明比较

Oracle 10g/11g的特点和优势 首先&#xff0c;Oracle 10g/11g具有以下几个特点&#xff1a; 1. 可靠性和稳定性&#xff1a;Oracle 10g采用了多种技术来确保数据的可靠性和稳定性&#xff0c;如ACID事务处理和数据备份与恢复机制。它还提供了高可用性的解决方案&#xff0c;如…...

golang-bufio 缓冲写

1. 缓冲写 在阅读这篇博客之前&#xff0c;请先阅读上一篇&#xff1a;golang-bufio 缓冲读 // buffered output// Writer implements buffering for an io.Writer object. // If an error occurs writing to a Writer, no more data will be // accepted and all subsequent…...

Windows修改电脑DNS

访问浏览器出现无法访问此页面&#xff0c;找不到DNS地址&#xff0c;则可以通过如下方式修改DNS 按下windows键R键(两个键一起按) 出现下面窗口 输入control按回车键(Enter键)就会出现下面的窗口 DNS可以填下面这些&#xff1a; 114.114.114.114 和 114.114.115.115 阿里DNS&a…...

Linux驱动之Linux自带的LED灯驱动

目录 一、简介 二、使能Linux自带LED驱动 三、Linux内核自带LED驱动框架 四、设备树节点编写 五、运行测试 一、简介 前面我们都是自己编写 LED 灯驱动&#xff0c;其实像 LED 灯这样非常基础的设备驱动&#xff0c; Linux 内核已经集成了。 Linux 内核的 LED 灯驱动采用 …...

C盘清理 “ProgramData\Microsoft\Search“ 文件夹过大

修改索引存放位置 进入控制面板->查找方式改成大图标&#xff0c; 选择索引选项 进入高级 填写新的索引位置 删除C盘索引信息 删除C:\ProgramData\Microsoft\Search\Data\Applications 下面的文件夹 如果报索引正在使用&#xff0c;参照第一步替换索引位置。关闭索引...

深入了解字符串处理算法与文本操作技巧

深入了解字符串处理算法与文本操作技巧 引言 字符串处理是计算机科学和数据处理的核心领域之一。本博客将深入介绍一些常见的字符串处理算法和文本操作技巧&#xff0c;包括字符串匹配、搜索、正则表达式、字符串操作和文本标准化等。 暴力匹配算法 什么是暴力匹配&#xf…...

Python爬虫:打开盈利大门的利器

导言&#xff1a; 随着互联网的迅速发展&#xff0c;越来越多的企业和个人开始意识到数据的重要性。而Python爬虫作为一种自动化获取互联网信息的技术&#xff0c;为人们提供了更便捷、高效的数据获取方式。本文将介绍基于Python爬虫的五种盈利模式&#xff0c;并提供实际案例…...

17.CSS发光按钮悬停特效

效果 源码 <!DOCTYPE html> <html> <head><title>CSS Modern Button</title><link rel="stylesheet" type="text/css" href="style.css"> </head> <body><a href="#" style=&quo…...

CSS中如何实现弹性盒子布局(Flexbox)的换行和排序功能?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 换行&#xff08;Flexbox Wrapping&#xff09;⭐ 示例&#xff1a;实现换行⭐ 排序&#xff08;Flexbox Ordering&#xff09;⭐ 示例&#xff1a;实现排序⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 记得…...

spark底层为什么选择使用scala语言开发

Spark 底层使用 Scala 开发有以下几个原因&#xff1a; 基于Scala的语言特性 集成性&#xff1a;Scala 是一种运行在 Java 虚拟机&#xff08;JVM&#xff09;上的静态类型编程语言&#xff0c;可以与 Java 代码无缝集成。由于 Spark 涉及到与大量 Java 生态系统的交互&#x…...

基于RabbitMQ的模拟消息队列之三——硬盘数据管理

文章目录 一、数据库管理1.设计数据库2.添加sqlite依赖3.配置application.properties文件4.创建接口MetaMapper5.创建MetaMapper.xml文件6.数据库操作7.封装数据库操作 二、文件管理1.消息持久化2.消息文件格式3.序列化/反序列化4.创建文件管理类MessageFileManager5.垃圾回收 …...

DHorse v1.3.2 发布,基于 k8s 的发布平台

版本说明 新增特性 构建版本、部署应用时的线程池可配置化&#xff1b; 优化特性 构建版本跳过单元测试&#xff1b; 解决问题 解决Vue应用详情页面报错的问题&#xff1b;解决Linux环境下脚本运行失败的问题&#xff1b;解决下载Maven安装文件失败的问题&#xff1b; 升…...

在vue.config.js中配置文件路径代理名

今天在公司项目中看到一个非常有趣的导入路径 crud 先是一蒙 这是个啥 突然想起一个被自己遗漏的知识点 在vue.config.js中配置路径指向 这里 我们随便找一个vue项目 在src下找到 components 目录 如果没有就创建一个 下面找到HelloWorld.vue 如果没有也是自己创建一个就好 然…...

深度学习优化算法相关文章

综述性文章 一个框架看懂优化算法之异同 SGD/AdaGrad/Adam 从 SGD 到 Adam —— 深度学习优化算法概览(一)...

echarts自定义Y轴刻度及其颜色

yAxis: [{min:0,max:5,axisLabel: {color: "#999",textStyle: {fontSize: 14,fontWeight: 400,// 设置分段颜色color: function (value) {console.log("试试", value);if (value 1) {return "rgba(140,198,63,1)";} else if (value 2) {return…...

【云原生进阶之PaaS中间件】第一章Redis-1.3Redis配置

1 Redis配置概述 Redis支持采用其内置默认配置的方式来进行启动&#xff0c;而不需要提前配置任何文件&#xff0c;但是这种启动方式只推荐在测试和开发环境中使用&#xff0c;但更好的方式是通过提供一个Redis的配置文件来对Redis进行配置&#xff0c; 这个配置文件一般命名为…...

C++ 动态内存

C 程序中的内存分为栈和堆两个部分&#xff1a; 栈&#xff1a;在函数内部声明的所有变量都将占用栈内存&#xff1b;堆&#xff1a;这是程序中未使用的内存&#xff0c;在程序运行时可用于动态分配内存。 堆与栈的详细请参考&#xff1a;一文读懂堆与栈的区别_堆和栈的区别_恋…...

swagger 接口测试,用 python 写自动化时该如何处理?

在使用Python进行Swagger接口测试时&#xff0c;可以使用requests库来发送HTTP请求&#xff0c;并使用json库和yaml库来处理响应数据。以下是一个简单的示例代码&#xff1a; import requests import json import yaml# Swagger API文档地址和需要测试的接口路径 swagger_url …...

QT使用QXlsx实现Excel图片与图表操作 QT基础入门【Excel的操作】

构建图表数据 /// 构建图表数据for (int i = 1; i < 10; ++i) {mxlsx.write(i, 1, i * i * i); // A1:A9mxlsx.write(i, 2, i * i); // B1:B9mxlsx.write(i, 3, i * i - 1); // C1:C9} 需要包含头文件 #include "xlsxchart.h" 1. 饼状图 Chart *pieChart = mxlsx.…...

【Python常用函数】一文让你彻底掌握Python中的numpy.clip函数

大数据时代的到来,使得很多工作都需要进行数据挖掘,从而发现更多有利的规律,或规避风险,或发现商业价值。而大数据分析的基础是学好编程语言。本文和你一起来探索Python中的clip函数,让你以最短的时间明白这个函数的原理。也可以利用碎片化的时间巩固这个函数,让你在处理…...

Matlab(GUI程式设计)

目录 1.MatlabGUI 1.1 坐标区普通按钮 1.1.1 对齐组件 1.1.2 按钮属性 1.1.3 脚本说明 1.1.4 选择呈现 1.3 编译GUI程序 在以前的时候&#xff0c;我们的电脑还是这样的 随着科技的不断进步&#xff0c;我们的电脑也发生着翻天覆地的改变1990s&#xff1a; 在未来&#xff0c…...