当前位置: 首页 > news >正文

33.5 remote实战项目之设计prometheus数据源的结构

本节重点介绍 :

  • 项目要求
    • 通过remote read读取prometheus中的数据
    • 通过remote write向prometheus中写入数据
  • 准备工作
    • 新建项目 prome_remote_read_write
    • 设计prometheus 数据源的结构
    • 初始化

项目要求

  • 通过remote read读取prometheus中的数据
  • 通过remote write向prometheus中写入数据

准备工作

新建项目 prome_remote_read_write

go mod init prome_remote_read_write

准备配置文件 prome_remote_read_write.yml

  • remoteWrite代表 支持remote_write的多个后端
  • remoteRead代表 支持remote_read的多个后端
remoteWrite:# m3db的配置#- name: m3db01#  url: http://localhost:7201/api/v1/prom/remote/write#  remoteTimeoutSecond: 5# prometheus的配置- name: prome01url: http://172.20.70.205:9090/api/v1/writeremoteTimeoutSecond: 5
remoteRead:- name: prome01url: http://172.20.70.205:9090/api/v1/readremoteTimeoutSecond: 5

配置文件解析

  • config/config.go
package configimport ("github.com/toolkits/pkg/logger""gopkg.in/yaml.v2""io/ioutil"
)type RemoteConfig struct {Name                string `yaml:"name"`Url                 string `yaml:"url"`RemoteTimeoutSecond int    `yaml:"remoteTimeoutSecond"`
}type PromeSection struct {RemoteWrite []RemoteConfig `yaml:"remoteWrite"`RemoteRead  []RemoteConfig `yaml:"remoteRead"`
}func Load(s string) (*PromeSection, error) {cfg := &PromeSection{}err := yaml.Unmarshal([]byte(s), cfg)if err != nil {return nil, err}return cfg, nil
}func LoadFile(filename string) (*PromeSection, error) {content, err := ioutil.ReadFile(filename)if err != nil {return nil, err}cfg, err := Load(string(content))if err != nil {logger.Errorf("[parsing YAML file errr...][error:%v]", err)return nil, err}return cfg, nil
}

main.go中解析配置

package mainimport ("flag""github.com/toolkits/pkg/logger""math/rand""prome_remote_read_write/config""prome_remote_read_write/datasource""time"
)func main() {rand.Seed(time.Now().UnixNano())configFile := flag.String("config", "prome_remote_read_write.yml","Address on which to expose metrics and web interface.")flag.Parse()sConfig, err := config.LoadFile(*configFile)if err != nil {logger.Infof("config.LoadFile Error,Exiting ...error:%v", err)return}
}

设计prometheus 数据源的结构

  • 位置 datasource/prome.go
package datasourceimport ("github.com/go-kit/kit/log""github.com/prometheus/client_golang/prometheus"config_util "github.com/prometheus/common/config""github.com/prometheus/common/model""github.com/prometheus/common/promlog"pc "github.com/prometheus/prometheus/config""github.com/prometheus/prometheus/prompb""github.com/prometheus/prometheus/promql""github.com/prometheus/prometheus/storage""github.com/prometheus/prometheus/storage/remote""github.com/toolkits/pkg/logger""go.uber.org/atomic""io/ioutil""net/http""net/url""prome_remote_read_write/config""time"
)type PromeDataSource struct {Section      *config.PromeSection            //配置PushQueue    chan []prompb.TimeSeries        // 数据推送的chanLocalTmpDir  string                          // 本地临时目录,存放queries.active文件Queryable    storage.SampleAndChunkQueryable // 除了promql的查询,需要后端存储,如查询seriesQueryEngine  *promql.Engine                  // promql相关查询WriteTargets []*HttpClient                   // remote_write写入的后端地址
}type HttpClient struct {remoteName string // Used to differentiate clients in metrics.url        *url.URLClient     *http.Clienttimeout    time.Duration
}

new函数

  • 根据传入的配置new
func NewPromeDataSource(cg *config.PromeSection) *PromeDataSource {pd := &PromeDataSource{Section:   cg,PushQueue: make(chan []prompb.TimeSeries, 10000),}return pd
}

Init初始化函数

  • 完整代码如下

type safePromQLNoStepSubqueryInterval struct {value atomic.Int64
}func durationToInt64Millis(d time.Duration) int64 {return int64(d / time.Millisecond)
}
func (i *safePromQLNoStepSubqueryInterval) Set(ev model.Duration) {i.value.Store(durationToInt64Millis(time.Duration(ev)))
}
func (i *safePromQLNoStepSubqueryInterval) Get(int64) int64 {return i.value.Load()
}func NewPromeDataSource(cg *config.PromeSection) *PromeDataSource {pd := &PromeDataSource{Section:   cg,PushQueue: make(chan []prompb.TimeSeries, 10000),}return pd
}func (pd *PromeDataSource) Init() {// 模拟创建本地存储目录dbDir, err := ioutil.TempDir("", "tsdb-api-ready")if err != nil {logger.Errorf("[error_create_local_tsdb_dir][err: %v]", err)return}pd.LocalTmpDir = dbDirpromlogConfig := promlog.Config{}// 使用本地目录创建remote-storageremoteS := remote.NewStorage(promlog.New(&promlogConfig), prometheus.DefaultRegisterer, func() (int64, error) {return 0, nil}, dbDir, 1*time.Minute, nil)// ApplyConfig 加载queryablesremoteReadC := make([]*pc.RemoteReadConfig, 0)for _, u := range pd.Section.RemoteRead {ur, err := url.Parse(u.Url)if err != nil {logger.Errorf("[prome_ds_init_error][parse_url_error][url:%+v][err:%+v]", u.Url, err)continue}remoteReadC = append(remoteReadC,&pc.RemoteReadConfig{URL:           &config_util.URL{URL: ur},RemoteTimeout: model.Duration(time.Duration(u.RemoteTimeoutSecond) * time.Second),ReadRecent:    true,},)}if len(remoteReadC) == 0 {logger.Errorf("[prome_ds_error_got_zero_remote_read_storage]")return}err = remoteS.ApplyConfig(&pc.Config{RemoteReadConfigs: remoteReadC})if err != nil {logger.Errorf("[error_load_remote_read_config][err: %v]", err)return}pLogger := log.NewNopLogger()noStepSubqueryInterval := &safePromQLNoStepSubqueryInterval{}queryQueueDir, err := ioutil.TempDir(dbDir, "prom_query_concurrency")opts := promql.EngineOpts{Logger:                   log.With(pLogger, "component", "query engine"),Reg:                      prometheus.DefaultRegisterer,MaxSamples:               50000000,Timeout:                  30 * time.Second,ActiveQueryTracker:       promql.NewActiveQueryTracker(queryQueueDir, 20, log.With(pLogger, "component", "activeQueryTracker")),LookbackDelta:            5 * time.Minute,NoStepSubqueryIntervalFn: noStepSubqueryInterval.Get,EnableAtModifier:         true,}queryEngine := promql.NewEngine(opts)pd.QueryEngine = queryEnginepd.Queryable = remoteS// 初始化writeClientsif len(pd.Section.RemoteWrite) == 0 {logger.Warningf("[prome_ds_init_with_zero_RemoteWrite_target]")logger.Infof("[successfully_init_prometheus_datasource][remote_read_num:%+v][remote_write_num:%+v]",len(pd.Section.RemoteRead),len(pd.Section.RemoteWrite),)return}writeTs := make([]*HttpClient, 0)for _, u := range pd.Section.RemoteWrite {ur, err := url.Parse(u.Url)if err != nil {logger.Errorf("[prome_ds_init_error][parse_url_error][url:%+v][err:%+v]", u.Url, err)continue}writeTs = append(writeTs,&HttpClient{remoteName: u.Name,url:        ur,Client:     &http.Client{},timeout:    time.Duration(u.RemoteTimeoutSecond) * time.Second,})}pd.WriteTargets = writeTs// 开启prometheus 队列消费协程go pd.remoteWrite()logger.Infof("[successfully_init_prometheus_datasource][remote_read_num:%+v][remote_write_num:%+v]",len(remoteReadC),len(writeTs),)
}

创建本地存储目录和remote-storage

  • 模拟创建本地存储目录
	// 模拟创建本地存储目录dbDir, err := ioutil.TempDir("", "tsdb-api-ready")if err != nil {logger.Errorf("[error_create_local_tsdb_dir][err: %v]", err)return}pd.LocalTmpDir = dbDir
  • 使用本地目录创建remote-storage
	// 使用本地目录创建remote-storageremoteS := remote.NewStorage(promlog.New(&promlogConfig), prometheus.DefaultRegisterer, func() (int64, error) {return 0, nil}, dbDir, 1*time.Minute, nil)

创建remote_read对象

  • 遍历配置中的remote_read,构造RemoteReadConfig
  • 使用RemoteReadConfig.ApplyConfig 生效配置
	// ApplyConfig 加载queryablesremoteReadC := make([]*pc.RemoteReadConfig, 0)for _, u := range pd.Section.RemoteRead {ur, err := url.Parse(u.Url)if err != nil {logger.Errorf("[prome_ds_init_error][parse_url_error][url:%+v][err:%+v]", u.Url, err)continue}remoteReadC = append(remoteReadC,&pc.RemoteReadConfig{URL:           &config_util.URL{URL: ur},RemoteTimeout: model.Duration(time.Duration(u.RemoteTimeoutSecond) * time.Second),ReadRecent:    true,},)}if len(remoteReadC) == 0 {logger.Errorf("[prome_ds_error_got_zero_remote_read_storage]")return}err = remoteS.ApplyConfig(&pc.Config{RemoteReadConfigs: remoteReadC})if err != nil {logger.Errorf("[error_load_remote_read_config][err: %v]", err)return}

创建QueryEngine并赋值

	noStepSubqueryInterval := &safePromQLNoStepSubqueryInterval{}queryQueueDir, err := ioutil.TempDir(dbDir, "prom_query_concurrency")opts := promql.EngineOpts{Logger:                   log.With(pLogger, "component", "query engine"),Reg:                      prometheus.DefaultRegisterer,MaxSamples:               50000000,Timeout:                  30 * time.Second,ActiveQueryTracker:       promql.NewActiveQueryTracker(queryQueueDir, 20, log.With(pLogger, "component", "activeQueryTracker")),LookbackDelta:            5 * time.Minute,NoStepSubqueryIntervalFn: noStepSubqueryInterval.Get,EnableAtModifier:         true,}queryEngine := promql.NewEngine(opts)pd.QueryEngine = queryEnginepd.Queryable = remoteS

初始化writeClients创建RemoteWrite对象

  • 遍历RemoteWrite配置创建
  • 开启prometheus 队列消费协程
	// 初始化writeClientsif len(pd.Section.RemoteWrite) == 0 {logger.Warningf("[prome_ds_init_with_zero_RemoteWrite_target]")logger.Infof("[successfully_init_prometheus_datasource][remote_read_num:%+v][remote_write_num:%+v]",len(pd.Section.RemoteRead),len(pd.Section.RemoteWrite),)return}writeTs := make([]*HttpClient, 0)for _, u := range pd.Section.RemoteWrite {ur, err := url.Parse(u.Url)if err != nil {logger.Errorf("[prome_ds_init_error][parse_url_error][url:%+v][err:%+v]", u.Url, err)continue}writeTs = append(writeTs,&HttpClient{remoteName: u.Name,url:        ur,Client:     &http.Client{},timeout:    time.Duration(u.RemoteTimeoutSecond) * time.Second,})}pd.WriteTargets = writeTs// 开启prometheus 队列消费协程go pd.remoteWrite()logger.Infof("[successfully_init_prometheus_datasource][remote_read_num:%+v][remote_write_num:%+v]",len(remoteReadC),len(writeTs),)

本节重点总结 :

  • 项目要求
    • 通过remote read读取prometheus中的数据
    • 通过remote write向prometheus中写入数据
  • 准备工作
    • 新建项目 prome_remote_read_write
    • 设计prometheus 数据源的结构
    • 初始化

相关文章:

33.5 remote实战项目之设计prometheus数据源的结构

本节重点介绍 : 项目要求 通过remote read读取prometheus中的数据通过remote write向prometheus中写入数据 准备工作 新建项目 prome_remote_read_write设计prometheus 数据源的结构初始化 项目要求 通过remote read读取prometheus中的数据通过remote write向prometheus中写…...

微服务springboot详细解析(一)

目录 1.Spring概述 2.什么是SpringBoot? 3.第一个SpringBoot程序 4.配置参数优先级 5.springboot自动装配原理 6.SpringBootApplication&SpringApplication.run 7.ConfigurationProperties(prefix "") 8.Validated数据校验 29、聊聊该如何写一…...

深入探讨Go语言中的双向链表

简介 双向链表是链表家族中的一种高级结构,每个节点不仅指向下一个节点,还指向上一个节点。今天,我们将学习如何在Go语言中实现和操作这种灵活的数据结构。 双向链表的优缺点 优点: 可以从任一方向遍历链表,灵活性高…...

Fastapi + vue3 自动化测试平台---移动端App自动化篇

概述 好久写文章了,专注于新框架,新UI界面的实践,废话不多说,开搞 技术架构 后端: Fastapi Airtest multiprocessing 前端: 基于 Vue3、Vite、TypeScript、Pinia、Pinia持久化插件、Unocss 和 Elemen…...

ElasticSearch easy-es 聚合函数 group by 混合写法求Top N 词云 分词

1.将用户访问记录表数据同步到ES&#xff0c;并且分词&#xff0c;获取用户访问最多前十条词语。 Elasticsearch、Easy-es 快速入门 SearchAfterPage分页 若依前后端分离 Ruoyi-Vue SpringBoot 使用结巴分词器 <!-- 分词器--><dependency><groupId>com.hua…...

在 ASP.NET C# Web API 中实现 Serilog 以增强请求和响应的日志记录

介绍 日志记录是任何 Web 应用程序的关键方面。它有助于调试、性能监控和了解用户交互。在 ASP.NET C# 中&#xff0c;集成 Serilog 作为记录请求和响应&#xff08;包括传入和传出的数据&#xff09;的中间件可以显著提高 Web API 的可观察性和故障排除能力。 在过去的几周里&…...

2024年顶级小型语言模型前15名

本文&#xff0c;我们将深入了解2024年备受瞩目的十五款小型语言模型&#xff08;SLMs&#xff09;&#xff0c;它们分别是Llama 3.1 8B、Gemma2、Qwen 2、Mistral Nemo、Phi-3.5等。这些SLMs以其精巧的体积和高效率著称&#xff0c;它们不需要依赖庞大的服务器资源&#xff0c…...

精通 Python 网络安全(一)

前言 最近&#xff0c;Python 开始受到越来越多的关注&#xff0c;最新的 Python 更新添加了许多可用于执行关键任务的包。我们的主要目标是帮助您利用 Python 包来检测和利用漏洞&#xff0c;并解决网络挑战。 本书将首先带您了解与网络和安全相关的 Python 脚本和库。然后&…...

【python自动化二】pytest集成allure生成测试报告

pytest本身不会直接生成测试报告&#xff0c;而allure是一种生成测试报告的公共插件&#xff0c;可与多种测试框架配合生成测试报告&#xff0c;本文介绍下如何集成allure生成测试报告。 1.allure安装 1.安装allure-pytest 先安装allure的pytest插件&#xff0c;用于在pytes…...

网络版本的通讯录青春版(protobuf)

环境搭建 Protobuf 还常⽤于通讯协议、服务端数据交换场景。 因为我们主要目的只是为了学习protobuf&#xff0c;因此对于客户端&#xff0c;原本应该具备&#xff1a; 新增⼀个联系⼈ ◦ 删除⼀个联系⼈ ◦ 查询通讯录列表 ◦ 查询⼀个联系⼈的详细信息 这样四个功能。 …...

开源模型应用落地-安全合规篇-用户输入价值观判断(三)

一、前言 在深度合规功能中,对用户输入内容的价值观判断具有重要意义。这一功能不仅仅是对信息合法性和合规性的简单审核,更是对信息背后隐含的伦理道德和社会责任的深刻洞察。通过对价值观的判断,系统能够识别可能引发不当影响或冲突的内容,从而为用户提供更安全、更和谐的…...

神经网络入门实战:(十四)pytorch 官网内置的 CIFAR10 数据集,及其网络模型

(一) pytorch 官网内置的网络模型 图像处理&#xff1a; Models and pre-trained weights — Torchvision 0.20 documentation (二) CIFAR10数据集的分类网络模型&#xff08;仅前向传播&#xff09;&#xff1a; 下方的网络模型图片有误&#xff0c;已做修改&#xff0c;具…...

【Rust在WASM中实现pdf文件的生成】

Rust在WASM中实现pdf文件的生成 前言概念和依赖问题描述分步实现pdf转Blob生成URL两种方式利用localstorage传递参数处理图片Vec<u8>到pdf格式的Vec<u8>使用rust创建iframe显示pdf的Blob最后 前言 实现了一个通用的前端jpg转pdf的wasm,因为动态响应框架无法直接打…...

在MySQL中执行sum case when报错:SUM does not exist

1. 报错 在pgsql中能正常运行的一段SQL在MySQL中运行的时候报错了&#xff1a; SELECT DATE( hr.handle_time ) AS statsDate,SUM ( CASE WHEN hma.app_type IN ( 2, 5 ) THEN ch_money ELSE 0 END ) AS aliPayAmt,SUM ( CASE WHEN hma.app_type IN ( 1, 4 ) THEN ch_money EL…...

【openssl】相关指令

熟悉下相关概念 x509&#xff1a;证书标准pem和der&#xff1a;两种&#xff08;包括公私钥、证书签名请求、证书等内容的&#xff09;的格式&#xff0c;前者是文本形式&#xff0c;linux常用&#xff0c;后者是二进制形式&#xff0c;windows常用&#xff0c;仅仅是格式&…...

实例分割详解

实例分割详解 引言 实例分割是计算机视觉领域的一项复杂任务&#xff0c;它要求模型能够识别图像中不同类别的对象&#xff0c;并对每个单独的对象进行像素级别的分类。与语义分割不同的是&#xff0c;实例分割不仅要区分不同的类别&#xff0c;还要识别同一类别中的不同个体…...

D87【python 接口自动化学习】- pytest基础用法

day87 pytest运行参数 -m -k 学习日期&#xff1a;20241203 学习目标&#xff1a;pytest基础用法 -- pytest运行参数-m -k 学习笔记&#xff1a; 常用运行参数 pytest运行参数-m -k pytest -m 执行特定的测试用例&#xff0c;markers最好使用英文 [pytest] testpaths./te…...

浅谈MySQL路由

华子目录 mysql-router介绍下载mysql-router安装mysql-router实验 mysql-router介绍 mysql-router是一个对应用程序透明的InnoDB Cluster连接路由服务&#xff0c;提供负载均衡、应用连接故障转移和客户端路由利用路由器的连接路由特性&#xff0c;用户可以编写应用程序来连接到…...

matlab中disp,fprintf,sprintf,display,dlmwrite输出函数之间的区别

下面是他们之间的区别&#xff1a; disp函数与fprintf函数的区别 输出格式的灵活性 disp函数&#xff1a;输出格式相对固定。它会自动将变量以一种比较直接的方式显示出来。对于数组&#xff0c;会按照行列形式展示&#xff1b;对于字符串&#xff0c;直接原样输出并换行。例如…...

30.100ASK_T113-PRO 用QT编写视频播放器(一)

1.再buildroot中添加视频解码库 X264, 执行 make menuconfig Target packages -->Libraries --> Multimedia --> X264 CLI 还需要添加 FFmpeg 2. 保存,重新编译 make all 3.将镜像下载开发板...

【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型

摘要 拍照搜题系统采用“三层管道&#xff08;多模态 OCR → 语义检索 → 答案渲染&#xff09;、两级检索&#xff08;倒排 BM25 向量 HNSW&#xff09;并以大语言模型兜底”的整体框架&#xff1a; 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后&#xff0c;分别用…...

工业安全零事故的智能守护者:一体化AI智能安防平台

前言&#xff1a; 通过AI视觉技术&#xff0c;为船厂提供全面的安全监控解决方案&#xff0c;涵盖交通违规检测、起重机轨道安全、非法入侵检测、盗窃防范、安全规范执行监控等多个方面&#xff0c;能够实现对应负责人反馈机制&#xff0c;并最终实现数据的统计报表。提升船厂…...

Swift 协议扩展精进之路:解决 CoreData 托管实体子类的类型不匹配问题(下)

概述 在 Swift 开发语言中&#xff0c;各位秃头小码农们可以充分利用语法本身所带来的便利去劈荆斩棘。我们还可以恣意利用泛型、协议关联类型和协议扩展来进一步简化和优化我们复杂的代码需求。 不过&#xff0c;在涉及到多个子类派生于基类进行多态模拟的场景下&#xff0c;…...

macOS多出来了:Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用

文章目录 问题现象问题原因解决办法 问题现象 macOS启动台&#xff08;Launchpad&#xff09;多出来了&#xff1a;Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用。 问题原因 很明显&#xff0c;都是Google家的办公全家桶。这些应用并不是通过独立安装的…...

OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别

OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别 直接训练提示词嵌入向量的核心区别 您提到的代码: prompt_embedding = initial_embedding.clone().requires_grad_(True) optimizer = torch.optim.Adam([prompt_embedding...

Redis数据倾斜问题解决

Redis 数据倾斜问题解析与解决方案 什么是 Redis 数据倾斜 Redis 数据倾斜指的是在 Redis 集群中&#xff0c;部分节点存储的数据量或访问量远高于其他节点&#xff0c;导致这些节点负载过高&#xff0c;影响整体性能。 数据倾斜的主要表现 部分节点内存使用率远高于其他节…...

2023赣州旅游投资集团

单选题 1.“不登高山&#xff0c;不知天之高也&#xff1b;不临深溪&#xff0c;不知地之厚也。”这句话说明_____。 A、人的意识具有创造性 B、人的认识是独立于实践之外的 C、实践在认识过程中具有决定作用 D、人的一切知识都是从直接经验中获得的 参考答案: C 本题解…...

人机融合智能 | “人智交互”跨学科新领域

本文系统地提出基于“以人为中心AI(HCAI)”理念的人-人工智能交互(人智交互)这一跨学科新领域及框架,定义人智交互领域的理念、基本理论和关键问题、方法、开发流程和参与团队等,阐述提出人智交互新领域的意义。然后,提出人智交互研究的三种新范式取向以及它们的意义。最后,总结…...

【JVM】Java虚拟机(二)——垃圾回收

目录 一、如何判断对象可以回收 &#xff08;一&#xff09;引用计数法 &#xff08;二&#xff09;可达性分析算法 二、垃圾回收算法 &#xff08;一&#xff09;标记清除 &#xff08;二&#xff09;标记整理 &#xff08;三&#xff09;复制 &#xff08;四&#xff…...

【Linux系统】Linux环境变量:系统配置的隐形指挥官

。# Linux系列 文章目录 前言一、环境变量的概念二、常见的环境变量三、环境变量特点及其相关指令3.1 环境变量的全局性3.2、环境变量的生命周期 四、环境变量的组织方式五、C语言对环境变量的操作5.1 设置环境变量&#xff1a;setenv5.2 删除环境变量:unsetenv5.3 遍历所有环境…...