当前位置: 首页 > news >正文

33.5 remote实战项目之设计prometheus数据源的结构

本节重点介绍 :

  • 项目要求
    • 通过remote read读取prometheus中的数据
    • 通过remote write向prometheus中写入数据
  • 准备工作
    • 新建项目 prome_remote_read_write
    • 设计prometheus 数据源的结构
    • 初始化

项目要求

  • 通过remote read读取prometheus中的数据
  • 通过remote write向prometheus中写入数据

准备工作

新建项目 prome_remote_read_write

go mod init prome_remote_read_write

准备配置文件 prome_remote_read_write.yml

  • remoteWrite代表 支持remote_write的多个后端
  • remoteRead代表 支持remote_read的多个后端
remoteWrite:# m3db的配置#- name: m3db01#  url: http://localhost:7201/api/v1/prom/remote/write#  remoteTimeoutSecond: 5# prometheus的配置- name: prome01url: http://172.20.70.205:9090/api/v1/writeremoteTimeoutSecond: 5
remoteRead:- name: prome01url: http://172.20.70.205:9090/api/v1/readremoteTimeoutSecond: 5

配置文件解析

  • config/config.go
package configimport ("github.com/toolkits/pkg/logger""gopkg.in/yaml.v2""io/ioutil"
)type RemoteConfig struct {Name                string `yaml:"name"`Url                 string `yaml:"url"`RemoteTimeoutSecond int    `yaml:"remoteTimeoutSecond"`
}type PromeSection struct {RemoteWrite []RemoteConfig `yaml:"remoteWrite"`RemoteRead  []RemoteConfig `yaml:"remoteRead"`
}func Load(s string) (*PromeSection, error) {cfg := &PromeSection{}err := yaml.Unmarshal([]byte(s), cfg)if err != nil {return nil, err}return cfg, nil
}func LoadFile(filename string) (*PromeSection, error) {content, err := ioutil.ReadFile(filename)if err != nil {return nil, err}cfg, err := Load(string(content))if err != nil {logger.Errorf("[parsing YAML file errr...][error:%v]", err)return nil, err}return cfg, nil
}

main.go中解析配置

package mainimport ("flag""github.com/toolkits/pkg/logger""math/rand""prome_remote_read_write/config""prome_remote_read_write/datasource""time"
)func main() {rand.Seed(time.Now().UnixNano())configFile := flag.String("config", "prome_remote_read_write.yml","Address on which to expose metrics and web interface.")flag.Parse()sConfig, err := config.LoadFile(*configFile)if err != nil {logger.Infof("config.LoadFile Error,Exiting ...error:%v", err)return}
}

设计prometheus 数据源的结构

  • 位置 datasource/prome.go
package datasourceimport ("github.com/go-kit/kit/log""github.com/prometheus/client_golang/prometheus"config_util "github.com/prometheus/common/config""github.com/prometheus/common/model""github.com/prometheus/common/promlog"pc "github.com/prometheus/prometheus/config""github.com/prometheus/prometheus/prompb""github.com/prometheus/prometheus/promql""github.com/prometheus/prometheus/storage""github.com/prometheus/prometheus/storage/remote""github.com/toolkits/pkg/logger""go.uber.org/atomic""io/ioutil""net/http""net/url""prome_remote_read_write/config""time"
)type PromeDataSource struct {Section      *config.PromeSection            //配置PushQueue    chan []prompb.TimeSeries        // 数据推送的chanLocalTmpDir  string                          // 本地临时目录,存放queries.active文件Queryable    storage.SampleAndChunkQueryable // 除了promql的查询,需要后端存储,如查询seriesQueryEngine  *promql.Engine                  // promql相关查询WriteTargets []*HttpClient                   // remote_write写入的后端地址
}type HttpClient struct {remoteName string // Used to differentiate clients in metrics.url        *url.URLClient     *http.Clienttimeout    time.Duration
}

new函数

  • 根据传入的配置new
func NewPromeDataSource(cg *config.PromeSection) *PromeDataSource {pd := &PromeDataSource{Section:   cg,PushQueue: make(chan []prompb.TimeSeries, 10000),}return pd
}

Init初始化函数

  • 完整代码如下

type safePromQLNoStepSubqueryInterval struct {value atomic.Int64
}func durationToInt64Millis(d time.Duration) int64 {return int64(d / time.Millisecond)
}
func (i *safePromQLNoStepSubqueryInterval) Set(ev model.Duration) {i.value.Store(durationToInt64Millis(time.Duration(ev)))
}
func (i *safePromQLNoStepSubqueryInterval) Get(int64) int64 {return i.value.Load()
}func NewPromeDataSource(cg *config.PromeSection) *PromeDataSource {pd := &PromeDataSource{Section:   cg,PushQueue: make(chan []prompb.TimeSeries, 10000),}return pd
}func (pd *PromeDataSource) Init() {// 模拟创建本地存储目录dbDir, err := ioutil.TempDir("", "tsdb-api-ready")if err != nil {logger.Errorf("[error_create_local_tsdb_dir][err: %v]", err)return}pd.LocalTmpDir = dbDirpromlogConfig := promlog.Config{}// 使用本地目录创建remote-storageremoteS := remote.NewStorage(promlog.New(&promlogConfig), prometheus.DefaultRegisterer, func() (int64, error) {return 0, nil}, dbDir, 1*time.Minute, nil)// ApplyConfig 加载queryablesremoteReadC := make([]*pc.RemoteReadConfig, 0)for _, u := range pd.Section.RemoteRead {ur, err := url.Parse(u.Url)if err != nil {logger.Errorf("[prome_ds_init_error][parse_url_error][url:%+v][err:%+v]", u.Url, err)continue}remoteReadC = append(remoteReadC,&pc.RemoteReadConfig{URL:           &config_util.URL{URL: ur},RemoteTimeout: model.Duration(time.Duration(u.RemoteTimeoutSecond) * time.Second),ReadRecent:    true,},)}if len(remoteReadC) == 0 {logger.Errorf("[prome_ds_error_got_zero_remote_read_storage]")return}err = remoteS.ApplyConfig(&pc.Config{RemoteReadConfigs: remoteReadC})if err != nil {logger.Errorf("[error_load_remote_read_config][err: %v]", err)return}pLogger := log.NewNopLogger()noStepSubqueryInterval := &safePromQLNoStepSubqueryInterval{}queryQueueDir, err := ioutil.TempDir(dbDir, "prom_query_concurrency")opts := promql.EngineOpts{Logger:                   log.With(pLogger, "component", "query engine"),Reg:                      prometheus.DefaultRegisterer,MaxSamples:               50000000,Timeout:                  30 * time.Second,ActiveQueryTracker:       promql.NewActiveQueryTracker(queryQueueDir, 20, log.With(pLogger, "component", "activeQueryTracker")),LookbackDelta:            5 * time.Minute,NoStepSubqueryIntervalFn: noStepSubqueryInterval.Get,EnableAtModifier:         true,}queryEngine := promql.NewEngine(opts)pd.QueryEngine = queryEnginepd.Queryable = remoteS// 初始化writeClientsif len(pd.Section.RemoteWrite) == 0 {logger.Warningf("[prome_ds_init_with_zero_RemoteWrite_target]")logger.Infof("[successfully_init_prometheus_datasource][remote_read_num:%+v][remote_write_num:%+v]",len(pd.Section.RemoteRead),len(pd.Section.RemoteWrite),)return}writeTs := make([]*HttpClient, 0)for _, u := range pd.Section.RemoteWrite {ur, err := url.Parse(u.Url)if err != nil {logger.Errorf("[prome_ds_init_error][parse_url_error][url:%+v][err:%+v]", u.Url, err)continue}writeTs = append(writeTs,&HttpClient{remoteName: u.Name,url:        ur,Client:     &http.Client{},timeout:    time.Duration(u.RemoteTimeoutSecond) * time.Second,})}pd.WriteTargets = writeTs// 开启prometheus 队列消费协程go pd.remoteWrite()logger.Infof("[successfully_init_prometheus_datasource][remote_read_num:%+v][remote_write_num:%+v]",len(remoteReadC),len(writeTs),)
}

创建本地存储目录和remote-storage

  • 模拟创建本地存储目录
	// 模拟创建本地存储目录dbDir, err := ioutil.TempDir("", "tsdb-api-ready")if err != nil {logger.Errorf("[error_create_local_tsdb_dir][err: %v]", err)return}pd.LocalTmpDir = dbDir
  • 使用本地目录创建remote-storage
	// 使用本地目录创建remote-storageremoteS := remote.NewStorage(promlog.New(&promlogConfig), prometheus.DefaultRegisterer, func() (int64, error) {return 0, nil}, dbDir, 1*time.Minute, nil)

创建remote_read对象

  • 遍历配置中的remote_read,构造RemoteReadConfig
  • 使用RemoteReadConfig.ApplyConfig 生效配置
	// ApplyConfig 加载queryablesremoteReadC := make([]*pc.RemoteReadConfig, 0)for _, u := range pd.Section.RemoteRead {ur, err := url.Parse(u.Url)if err != nil {logger.Errorf("[prome_ds_init_error][parse_url_error][url:%+v][err:%+v]", u.Url, err)continue}remoteReadC = append(remoteReadC,&pc.RemoteReadConfig{URL:           &config_util.URL{URL: ur},RemoteTimeout: model.Duration(time.Duration(u.RemoteTimeoutSecond) * time.Second),ReadRecent:    true,},)}if len(remoteReadC) == 0 {logger.Errorf("[prome_ds_error_got_zero_remote_read_storage]")return}err = remoteS.ApplyConfig(&pc.Config{RemoteReadConfigs: remoteReadC})if err != nil {logger.Errorf("[error_load_remote_read_config][err: %v]", err)return}

创建QueryEngine并赋值

	noStepSubqueryInterval := &safePromQLNoStepSubqueryInterval{}queryQueueDir, err := ioutil.TempDir(dbDir, "prom_query_concurrency")opts := promql.EngineOpts{Logger:                   log.With(pLogger, "component", "query engine"),Reg:                      prometheus.DefaultRegisterer,MaxSamples:               50000000,Timeout:                  30 * time.Second,ActiveQueryTracker:       promql.NewActiveQueryTracker(queryQueueDir, 20, log.With(pLogger, "component", "activeQueryTracker")),LookbackDelta:            5 * time.Minute,NoStepSubqueryIntervalFn: noStepSubqueryInterval.Get,EnableAtModifier:         true,}queryEngine := promql.NewEngine(opts)pd.QueryEngine = queryEnginepd.Queryable = remoteS

初始化writeClients创建RemoteWrite对象

  • 遍历RemoteWrite配置创建
  • 开启prometheus 队列消费协程
	// 初始化writeClientsif len(pd.Section.RemoteWrite) == 0 {logger.Warningf("[prome_ds_init_with_zero_RemoteWrite_target]")logger.Infof("[successfully_init_prometheus_datasource][remote_read_num:%+v][remote_write_num:%+v]",len(pd.Section.RemoteRead),len(pd.Section.RemoteWrite),)return}writeTs := make([]*HttpClient, 0)for _, u := range pd.Section.RemoteWrite {ur, err := url.Parse(u.Url)if err != nil {logger.Errorf("[prome_ds_init_error][parse_url_error][url:%+v][err:%+v]", u.Url, err)continue}writeTs = append(writeTs,&HttpClient{remoteName: u.Name,url:        ur,Client:     &http.Client{},timeout:    time.Duration(u.RemoteTimeoutSecond) * time.Second,})}pd.WriteTargets = writeTs// 开启prometheus 队列消费协程go pd.remoteWrite()logger.Infof("[successfully_init_prometheus_datasource][remote_read_num:%+v][remote_write_num:%+v]",len(remoteReadC),len(writeTs),)

本节重点总结 :

  • 项目要求
    • 通过remote read读取prometheus中的数据
    • 通过remote write向prometheus中写入数据
  • 准备工作
    • 新建项目 prome_remote_read_write
    • 设计prometheus 数据源的结构
    • 初始化

相关文章:

33.5 remote实战项目之设计prometheus数据源的结构

本节重点介绍 : 项目要求 通过remote read读取prometheus中的数据通过remote write向prometheus中写入数据 准备工作 新建项目 prome_remote_read_write设计prometheus 数据源的结构初始化 项目要求 通过remote read读取prometheus中的数据通过remote write向prometheus中写…...

微服务springboot详细解析(一)

目录 1.Spring概述 2.什么是SpringBoot? 3.第一个SpringBoot程序 4.配置参数优先级 5.springboot自动装配原理 6.SpringBootApplication&SpringApplication.run 7.ConfigurationProperties(prefix "") 8.Validated数据校验 29、聊聊该如何写一…...

深入探讨Go语言中的双向链表

简介 双向链表是链表家族中的一种高级结构,每个节点不仅指向下一个节点,还指向上一个节点。今天,我们将学习如何在Go语言中实现和操作这种灵活的数据结构。 双向链表的优缺点 优点: 可以从任一方向遍历链表,灵活性高…...

Fastapi + vue3 自动化测试平台---移动端App自动化篇

概述 好久写文章了,专注于新框架,新UI界面的实践,废话不多说,开搞 技术架构 后端: Fastapi Airtest multiprocessing 前端: 基于 Vue3、Vite、TypeScript、Pinia、Pinia持久化插件、Unocss 和 Elemen…...

ElasticSearch easy-es 聚合函数 group by 混合写法求Top N 词云 分词

1.将用户访问记录表数据同步到ES&#xff0c;并且分词&#xff0c;获取用户访问最多前十条词语。 Elasticsearch、Easy-es 快速入门 SearchAfterPage分页 若依前后端分离 Ruoyi-Vue SpringBoot 使用结巴分词器 <!-- 分词器--><dependency><groupId>com.hua…...

在 ASP.NET C# Web API 中实现 Serilog 以增强请求和响应的日志记录

介绍 日志记录是任何 Web 应用程序的关键方面。它有助于调试、性能监控和了解用户交互。在 ASP.NET C# 中&#xff0c;集成 Serilog 作为记录请求和响应&#xff08;包括传入和传出的数据&#xff09;的中间件可以显著提高 Web API 的可观察性和故障排除能力。 在过去的几周里&…...

2024年顶级小型语言模型前15名

本文&#xff0c;我们将深入了解2024年备受瞩目的十五款小型语言模型&#xff08;SLMs&#xff09;&#xff0c;它们分别是Llama 3.1 8B、Gemma2、Qwen 2、Mistral Nemo、Phi-3.5等。这些SLMs以其精巧的体积和高效率著称&#xff0c;它们不需要依赖庞大的服务器资源&#xff0c…...

精通 Python 网络安全(一)

前言 最近&#xff0c;Python 开始受到越来越多的关注&#xff0c;最新的 Python 更新添加了许多可用于执行关键任务的包。我们的主要目标是帮助您利用 Python 包来检测和利用漏洞&#xff0c;并解决网络挑战。 本书将首先带您了解与网络和安全相关的 Python 脚本和库。然后&…...

【python自动化二】pytest集成allure生成测试报告

pytest本身不会直接生成测试报告&#xff0c;而allure是一种生成测试报告的公共插件&#xff0c;可与多种测试框架配合生成测试报告&#xff0c;本文介绍下如何集成allure生成测试报告。 1.allure安装 1.安装allure-pytest 先安装allure的pytest插件&#xff0c;用于在pytes…...

网络版本的通讯录青春版(protobuf)

环境搭建 Protobuf 还常⽤于通讯协议、服务端数据交换场景。 因为我们主要目的只是为了学习protobuf&#xff0c;因此对于客户端&#xff0c;原本应该具备&#xff1a; 新增⼀个联系⼈ ◦ 删除⼀个联系⼈ ◦ 查询通讯录列表 ◦ 查询⼀个联系⼈的详细信息 这样四个功能。 …...

开源模型应用落地-安全合规篇-用户输入价值观判断(三)

一、前言 在深度合规功能中,对用户输入内容的价值观判断具有重要意义。这一功能不仅仅是对信息合法性和合规性的简单审核,更是对信息背后隐含的伦理道德和社会责任的深刻洞察。通过对价值观的判断,系统能够识别可能引发不当影响或冲突的内容,从而为用户提供更安全、更和谐的…...

神经网络入门实战:(十四)pytorch 官网内置的 CIFAR10 数据集,及其网络模型

(一) pytorch 官网内置的网络模型 图像处理&#xff1a; Models and pre-trained weights — Torchvision 0.20 documentation (二) CIFAR10数据集的分类网络模型&#xff08;仅前向传播&#xff09;&#xff1a; 下方的网络模型图片有误&#xff0c;已做修改&#xff0c;具…...

【Rust在WASM中实现pdf文件的生成】

Rust在WASM中实现pdf文件的生成 前言概念和依赖问题描述分步实现pdf转Blob生成URL两种方式利用localstorage传递参数处理图片Vec<u8>到pdf格式的Vec<u8>使用rust创建iframe显示pdf的Blob最后 前言 实现了一个通用的前端jpg转pdf的wasm,因为动态响应框架无法直接打…...

在MySQL中执行sum case when报错:SUM does not exist

1. 报错 在pgsql中能正常运行的一段SQL在MySQL中运行的时候报错了&#xff1a; SELECT DATE( hr.handle_time ) AS statsDate,SUM ( CASE WHEN hma.app_type IN ( 2, 5 ) THEN ch_money ELSE 0 END ) AS aliPayAmt,SUM ( CASE WHEN hma.app_type IN ( 1, 4 ) THEN ch_money EL…...

【openssl】相关指令

熟悉下相关概念 x509&#xff1a;证书标准pem和der&#xff1a;两种&#xff08;包括公私钥、证书签名请求、证书等内容的&#xff09;的格式&#xff0c;前者是文本形式&#xff0c;linux常用&#xff0c;后者是二进制形式&#xff0c;windows常用&#xff0c;仅仅是格式&…...

实例分割详解

实例分割详解 引言 实例分割是计算机视觉领域的一项复杂任务&#xff0c;它要求模型能够识别图像中不同类别的对象&#xff0c;并对每个单独的对象进行像素级别的分类。与语义分割不同的是&#xff0c;实例分割不仅要区分不同的类别&#xff0c;还要识别同一类别中的不同个体…...

D87【python 接口自动化学习】- pytest基础用法

day87 pytest运行参数 -m -k 学习日期&#xff1a;20241203 学习目标&#xff1a;pytest基础用法 -- pytest运行参数-m -k 学习笔记&#xff1a; 常用运行参数 pytest运行参数-m -k pytest -m 执行特定的测试用例&#xff0c;markers最好使用英文 [pytest] testpaths./te…...

浅谈MySQL路由

华子目录 mysql-router介绍下载mysql-router安装mysql-router实验 mysql-router介绍 mysql-router是一个对应用程序透明的InnoDB Cluster连接路由服务&#xff0c;提供负载均衡、应用连接故障转移和客户端路由利用路由器的连接路由特性&#xff0c;用户可以编写应用程序来连接到…...

matlab中disp,fprintf,sprintf,display,dlmwrite输出函数之间的区别

下面是他们之间的区别&#xff1a; disp函数与fprintf函数的区别 输出格式的灵活性 disp函数&#xff1a;输出格式相对固定。它会自动将变量以一种比较直接的方式显示出来。对于数组&#xff0c;会按照行列形式展示&#xff1b;对于字符串&#xff0c;直接原样输出并换行。例如…...

30.100ASK_T113-PRO 用QT编写视频播放器(一)

1.再buildroot中添加视频解码库 X264, 执行 make menuconfig Target packages -->Libraries --> Multimedia --> X264 CLI 还需要添加 FFmpeg 2. 保存,重新编译 make all 3.将镜像下载开发板...

地震勘探——干扰波识别、井中地震时距曲线特点

目录 干扰波识别反射波地震勘探的干扰波 井中地震时距曲线特点 干扰波识别 有效波&#xff1a;可以用来解决所提出的地质任务的波&#xff1b;干扰波&#xff1a;所有妨碍辨认、追踪有效波的其他波。 地震勘探中&#xff0c;有效波和干扰波是相对的。例如&#xff0c;在反射波…...

iPhone密码忘记了办?iPhoneUnlocker,iPhone解锁工具Aiseesoft iPhone Unlocker 高级注册版​分享

平时用 iPhone 的时候&#xff0c;难免会碰到解锁的麻烦事。比如密码忘了、人脸识别 / 指纹识别突然不灵&#xff0c;或者买了二手 iPhone 却被原来的 iCloud 账号锁住&#xff0c;这时候就需要靠谱的解锁工具来帮忙了。Aiseesoft iPhone Unlocker 就是专门解决这些问题的软件&…...

UE5 学习系列(三)创建和移动物体

这篇博客是该系列的第三篇&#xff0c;是在之前两篇博客的基础上展开&#xff0c;主要介绍如何在操作界面中创建和拖动物体&#xff0c;这篇博客跟随的视频链接如下&#xff1a; B 站视频&#xff1a;s03-创建和移动物体 如果你不打算开之前的博客并且对UE5 比较熟的话按照以…...

从零开始打造 OpenSTLinux 6.6 Yocto 系统(基于STM32CubeMX)(九)

设备树移植 和uboot设备树修改的内容同步到kernel将设备树stm32mp157d-stm32mp157daa1-mx.dts复制到内核源码目录下 源码修改及编译 修改arch/arm/boot/dts/st/Makefile&#xff0c;新增设备树编译 stm32mp157f-ev1-m4-examples.dtb \stm32mp157d-stm32mp157daa1-mx.dtb修改…...

TRS收益互换:跨境资本流动的金融创新工具与系统化解决方案

一、TRS收益互换的本质与业务逻辑 &#xff08;一&#xff09;概念解析 TRS&#xff08;Total Return Swap&#xff09;收益互换是一种金融衍生工具&#xff0c;指交易双方约定在未来一定期限内&#xff0c;基于特定资产或指数的表现进行现金流交换的协议。其核心特征包括&am…...

聊一聊接口测试的意义有哪些?

目录 一、隔离性 & 早期测试 二、保障系统集成质量 三、验证业务逻辑的核心层 四、提升测试效率与覆盖度 五、系统稳定性的守护者 六、驱动团队协作与契约管理 七、性能与扩展性的前置评估 八、持续交付的核心支撑 接口测试的意义可以从四个维度展开&#xff0c;首…...

听写流程自动化实践,轻量级教育辅助

随着智能教育工具的发展&#xff0c;越来越多的传统学习方式正在被数字化、自动化所优化。听写作为语文、英语等学科中重要的基础训练形式&#xff0c;也迎来了更高效的解决方案。 这是一款轻量但功能强大的听写辅助工具。它是基于本地词库与可选在线语音引擎构建&#xff0c;…...

【分享】推荐一些办公小工具

1、PDF 在线转换 https://smallpdf.com/cn/pdf-tools 推荐理由&#xff1a;大部分的转换软件需要收费&#xff0c;要么功能不齐全&#xff0c;而开会员又用不了几次浪费钱&#xff0c;借用别人的又不安全。 这个网站它不需要登录或下载安装。而且提供的免费功能就能满足日常…...

无人机侦测与反制技术的进展与应用

国家电网无人机侦测与反制技术的进展与应用 引言 随着无人机&#xff08;无人驾驶飞行器&#xff0c;UAV&#xff09;技术的快速发展&#xff0c;其在商业、娱乐和军事领域的广泛应用带来了新的安全挑战。特别是对于关键基础设施如电力系统&#xff0c;无人机的“黑飞”&…...

AirSim/Cosys-AirSim 游戏开发(四)外部固定位置监控相机

这个博客介绍了如何通过 settings.json 文件添加一个无人机外的 固定位置监控相机&#xff0c;因为在使用过程中发现 Airsim 对外部监控相机的描述模糊&#xff0c;而 Cosys-Airsim 在官方文档中没有提供外部监控相机设置&#xff0c;最后在源码示例中找到了&#xff0c;所以感…...