当前位置: 首页 > news >正文

腾讯mini项目-【指标监控服务重构】2023-08-12

今日已办

Watermill

Handler

将 4 个阶段的逻辑处理定义为 Handler

image-20230812100758947

image-20230812100744775

测试发现,添加的 handler 会被覆盖掉,故考虑添加为 middleware 且 4 个阶段的处理逻辑针对不同 topic 是相同的。

参考https://watermill.io/docs/messages-router/实现不同topic(不同事件)走不同的Handler处理逻辑,相同处理逻辑则可以使用MiddleWare(https://watermill.io/docs/middlewares/)

Middleware

ProfileCtx实现 context.Context 接口

// Package consumer
// @Author xzx 2023/8/11 18:53:00
package consumerimport ("context""github.com/Shopify/sarama""github.com/ThreeDotsLabs/watermill""github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka""github.com/ThreeDotsLabs/watermill/message""github.com/ThreeDotsLabs/watermill/message/router/middleware""github.com/ThreeDotsLabs/watermill/message/router/plugin""go.uber.org/zap""profile/internal/config""profile/internal/log""profile/internal/schema""time"
)// ProfileContext
// @Description:
// @Author xzx 2023-08-11 22:21:41
type ProfileContext struct {// Properties that can be called by inherited subclassesStatus intCtx    context.ContextRouter *message.RouterEvent  schema.EventAppID         string // API 上报FetchScenario string // API 上报
}// NewProfileContext
// @Description
// @Author xzx 2023-08-11 22:49:00
// @Return *ProfileContext
func NewProfileContext() *ProfileContext {profileCtx := &ProfileContext{Ctx: context.Background(),}profileCtx.init()return profileCtx
}// init
// @Description 初始化
// @Author xzx 2023-08-11 22:22:01
func (profileCtx *ProfileContext) init() {logger := watermill.NewStdLogger(false, false)saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldestsubscriber, err := kafka.NewSubscriber(kafka.SubscriberConfig{Brokers:               []string{config.Profile.GetString("kafka.bootstrap")},Unmarshaler:           kafka.DefaultMarshaler{},OverwriteSaramaConfig: saramaSubscriberConfig,ConsumerGroup:         config.Profile.GetString("kafka.group"),},logger,)if err != nil {log.Logger.Error("creates a new Kafka Subscriber error", zap.Error(err))}router, err := message.NewRouter(message.RouterConfig{}, logger)if err != nil {log.Logger.Error("creates a new Router with given configuration error", zap.Error(err))}router.AddPlugin(plugin.SignalsHandler)router.AddMiddleware(middleware.CorrelationID,middleware.Retry{MaxRetries:      3,InitialInterval: time.Millisecond * 100,Logger:          logger,}.Middleware,middleware.Recoverer,)topic := "to_analyzer__0.PERF_CRASH"router.AddNoPublisherHandler("WriteKafka", topic, subscriber, profileCtx.WriteKafka).AddMiddleware(profileCtx.UnpackKafkaMessage,profileCtx.InitPerformanceEvent,profileCtx.AnalyzeEvent,)profileCtx.Router = router
}// Run
// @Description
// @Author xzx 2023-08-12 13:52:53
func (profileCtx *ProfileContext) Run() {// router.Run contains defer cancel()if err := profileCtx.Router.Run(profileCtx.Ctx); err != nil {log.Logger.Error("runs all plugins and handlers and starts subscribing to provided topics error", zap.Error(err))}
}func (profileCtx *ProfileContext) Done() <-chan struct{} {return profileCtx.Ctx.Done()
}func (profileCtx *ProfileContext) Err() error {return profileCtx.Ctx.Err()
}func (profileCtx *ProfileContext) Deadline() (deadline time.Time, ok bool) {return profileCtx.Ctx.Deadline()
}func (profileCtx *ProfileContext) Value(key any) any {return profileCtx.Ctx.Value(key)
}

【测试】前 3 个阶段处理为 Middleware,最后一个阶段为 Handler

// Package consumer
// @Author xzx 2023/8/12 10:01:00
package consumerimport ("encoding/json""github.com/ThreeDotsLabs/watermill/message""github.com/segmentio/kafka-go""go.uber.org/zap""profile/internal/connector""profile/internal/log""profile/internal/schema/performance""profile/internal/state"
)// UnpackKafkaMessage
// @Description
// @Author xzx 2023-08-12 12:27:30
// @Param h
// @Return message.HandlerFunc
func (profileCtx *ProfileContext) UnpackKafkaMessage(h message.HandlerFunc) message.HandlerFunc {return func(message *message.Message) ([]*message.Message, error) {// 反序列化,存入通用结构体if contextErr := json.Unmarshal(message.Payload, &profileCtx.Event); contextErr != nil {profileCtx.Status = state.StatusUnmarshalErrorreturn h(message)}log.Logger.Info("[UnpackKafkaItem] unpack kafka item success", zap.Any("event", profileCtx.Event))message.SetContext(profileCtx)return h(message)}
}// InitPerformanceEvent
// @Description
// @Author xzx 2023-08-12 12:27:35
// @Param h
// @Return message.HandlerFunc
func (profileCtx *ProfileContext) InitPerformanceEvent(h message.HandlerFunc) message.HandlerFunc {return func(message *message.Message) ([]*message.Message, error) {profileCtx = message.Context().(*ProfileContext)event, contextErr := performance.EventFactory(profileCtx.Event.Category, profileCtx.Event.Dimensions, profileCtx.Event.Values)if contextErr != nil {profileCtx.Status = state.StatusEventFactoryErrorreturn h(message)}log.Logger.Info("[InitPerformanceEvent] init performance event success", zap.Any("event", profileCtx.Event))profileCtx.Event.ProfileData = eventmessage.SetContext(profileCtx)return h(message)}
}// AnalyzeEvent
// @Description
// @Author xzx 2023-08-12 12:27:38
// @Param h
// @Return message.HandlerFunc
func (profileCtx *ProfileContext) AnalyzeEvent(h message.HandlerFunc) message.HandlerFunc {return func(message *message.Message) ([]*message.Message, error) {profileCtx = message.Context().(*ProfileContext)contextErr := profileCtx.Event.ProfileData.Analyze()if contextErr != nil {profileCtx.Status = state.StatusAnalyzeErrorreturn h(message)}log.Logger.Info("[AnalyzeEvent] analyze event success", zap.Any("event", profileCtx.Event))// clear dimensions and valuesprofileCtx.Event.Dimensions = nilprofileCtx.Event.Values = nilmessage.SetContext(profileCtx)return h(message)}
}// WriteKafka
// @Description
// @Author xzx 2023-08-11 22:30:47
// @Param msg
// @Return contextErr
func (profileCtx *ProfileContext) WriteKafka(msg *message.Message) (contextErr error) {profileCtx = msg.Context().(*ProfileContext)toWriteBytes, contextErr := json.Marshal(profileCtx.Event)if contextErr != nil {profileCtx.Status = state.StatusUnmarshalErrorreturn}topic := connector.GetTopic(profileCtx.Event.Category)contextErr = connector.GetProducer().WriteMessages(profileCtx.Ctx, kafka.Message{Topic: topic,Key:   []byte(profileCtx.Event.ID),Value: toWriteBytes,})if contextErr != nil {profileCtx.Status = state.StatusWriteKafkaErrorreturn}log.Logger.Info("[WriteKafka] write kafka success", zap.String("topic", topic), zap.String("id", profileCtx.Event.ID), zap.String("msg", string(toWriteBytes)))return
}

可以实现正常的效果

image-20230812130912792

Router

  • 目前的 topic 是固定写死的,要考虑正则表达式,将不同 topic 的共有逻辑抽出为 Middleware,特定逻辑抽出为 Handler
  • 消息处理似乎不是并发的

pub/sub kafka-go

  • custom pub/sub

  • Kafka Pub/Sub for the Watermill project, based on Shopify’s Sarama

  • qiulin/watermill-kafkago: Kafka Pub/Sub for the Watermill project, based on segmentio/kafka-go (github.com)

明日待办

  • 组内开会
  • 继续开发需求

相关文章:

腾讯mini项目-【指标监控服务重构】2023-08-12

今日已办 Watermill Handler 将 4 个阶段的逻辑处理定义为 Handler 测试发现&#xff0c;添加的 handler 会被覆盖掉&#xff0c;故考虑添加为 middleware 且 4 个阶段的处理逻辑针对不同 topic 是相同的。 参考https://watermill.io/docs/messages-router/实现不同topic&am…...

kubeadm部署k8sv1.24使用cri-docker做为CRI

目的 测试使用cri-docker做为containerd和docker的中间层垫片。 规划 IP系统主机名10.0.6.5ubuntu 22.04.3 jammymaster01.kktb.org10.0.6.6ubuntu 22.04.3 jammymaster02.kktb.org10.0.6.7ubuntu 22.04.3 jammymaster03.kktb.org 配置 步骤&#xff1a; 系统优化 禁用sw…...

在c#中使用CancellationToken取消任务

目录 &#x1f680;介绍&#xff1a; &#x1f424;简单举例 &#x1f680;IsCancellationRequested &#x1f680;ThrowIfCancellationRequested &#x1f424;在控制器中使用 &#x1f680;通过异步方法的参数使用cancellationToken &#x1f680;api结合ThrowIfCancel…...

【项目经验】:elementui多选表格默认选中

一.需求 在页面刚打开就默认选中指定项。 二.方法Table Methods toggleRowSelection用于多选表格&#xff0c;切换某一行的选中状态&#xff0c;如果使用了第二个参数&#xff0c;则是设置这一行选中与否&#xff08;selected 为 true 则选中&#xff09;row, selected 详细…...

外星人入侵游戏-(创新版)

&#x1f308;write in front&#x1f308; &#x1f9f8;大家好&#xff0c;我是Aileen&#x1f9f8;.希望你看完之后&#xff0c;能对你有所帮助&#xff0c;不足请指正&#xff01;共同学习交流. &#x1f194;本文由Aileen_0v0&#x1f9f8; 原创 CSDN首发&#x1f412; 如…...

HTML 学习笔记(基础)

它是超文本标记语言&#xff0c;由一大堆约定俗成的标签组成&#xff0c;而其标签里一般又有一些属性值可以设置。 W3C标准&#xff1a;网页主要三大部分 结构&#xff1a;HTML表现&#xff1a;CSS行为&#xff1a;JavaScript <!DOCTYPE html> <html lang"zh-…...

最小二乘法

Least Square Method 1、相关的矩阵公式2、线性回归3、最小二乘法3.1、损失函数&#xff08;Loss Function&#xff09;3.2、多维空间的损失函数3.3、解析法求解3.4、梯度下降法求解 1、相关的矩阵公式 P r e c o n d i t i o n : ξ ∈ R n , A ∈ R n ∗ n i : σ A ξ σ ξ…...

使用stelnet进行安全的远程管理

1. telnet有哪些不足&#xff1f; 2.ssh如何保证数据传输安全&#xff1f; 需求&#xff1a;远程telnet管理设备 用户定义需要在AAA模式下&#xff1a; 开启远程登录的服务&#xff1a;定义vty接口 然后从R2登录&#xff1a;是可以登录的 同理R3登录&#xff1a; 在R1也可以查…...

python 二手车数据分析以及价格预测

二手车交易信息爬取、数据分析以及交易价格预测 引言一、数据爬取1.1 解析数据1.2 编写代码爬1.2.1 获取详细信息1.2.2 数据处理 二、数据分析2.1 统计分析2.2 可视化分析 三、价格预测3.1 价格趋势分析(特征分析)3.2 价格预测 引言 本文着眼于车辆信息&#xff0c;结合当下较…...

JAVA医药进销存管理系统(附源码+调试)

JAVA医药进销存管理系统 功能描述 &#xff08;1&#xff09;登录模块&#xff1a;登录信息等存储在数据库中 &#xff08;2&#xff09;基本信息模块&#xff1a;分为药品信息模块、客户情况模块、供应商情况模块&#xff1b; &#xff08;3&#xff09;业务管理模块&#x…...

H5 <blockquote> 标签

主要应用于&#xff1a;内容引用 标签定义及使用说明 <blockquote> 标签定义摘自另一个源的块引用。 浏览器通常会对 <blockquote> 元素进行缩进。 提示和注释 提示&#xff1a;如果标记是不需要段落分隔的短引用&#xff0c;请使用 <q>。 HTML 4.01 与 H…...

nginx配置指南

nginx.conf配置 找到Nginx的安装目录下的nginx.conf文件&#xff0c;该文件负责Nginx的基础功能配置。 配置文件概述 Nginx的主配置文件(conf/nginx.conf)按以下结构组织&#xff1a; 配置块功能描述全局块与Nginx运行相关的全局设置events块与网络连接有关的设置http块代理…...

【数据结构】优先级队列(堆)

文章目录 &#x1f490;1. 优先级队列1.1 概念 &#x1f490;2.堆的概念及存储方式2.1 什么是堆2.2 为什么要用完全二叉树描述堆呢&#xff1f;2.3 为什么说堆是在完全二叉树的基础上进行的调整&#xff1f;2.4 使用数组还原完全二叉树 &#x1f490;3. 堆的常用操作-模拟实现3…...

前端笔试2

1.下面哪一个是检验对象是否有一个以自身定义的属性? foo.hasOwnProperty("bar")bar in foo foo["bar"] ! undefinedfoo.bar ! null 解析&#xff1a; bar in foo 检查 foo 对象是否包含名为 bar 的属性&#xff0c;但是这个属性可以是从原型链继承来的&a…...

LeetCode:66.加一

66.加一 来源:力扣(LeetCode) 链接: https://leetcode.cn/problems/plus-one/description/ 给定一个由 整数 组成的 非空 数组所表示的非负整数,在该数的基础上加一。 最高位数字存放在数组的首位, 数组中每个元素只存储单个数字。 你可以假设除了整数 0 之外,这个整数…...

Redis 常用命令

目录 全局命令 1&#xff09;keys 2&#xff09;exists 3) del(delete) 4&#xff09;expire 5&#xff09;type SET命令 GET命令 MSET 和 MGET命令 其他SET命令 计数命令 redis-cli&#xff0c;进入redis 最核心的命令&#xff1a;我们这里只是先介绍 set 和 get 最简单的操作…...

Integer.valueOf()用于字符和字符串的区别

LeetCode 17 电话号码的字母组合 先贴代码 class Solution {List<String> result new ArrayList<>();String temp new String("");Integer num;public List<String> letterCombinations(String digits) {dfs(digits, 0);return result;} publi…...

机械寿命预测(基于NASA C-MAPSS数据的剩余使用寿命RUL预测,Python代码,CNN_LSTM模型,有详细中文注释)

1.效果视频&#xff1a;机械寿命预测&#xff08;NASA涡轮风扇发动机剩余使用寿命RUL预测&#xff0c;Python代码&#xff0c;CNN_LSTM模型&#xff0c;有详细中文注释&#xff09;_哔哩哔哩_bilibili 环境库版本&#xff1a; 2.数据来源&#xff1a;https://www.nasa.gov/int…...

ConfigMaps-1

文章目录 主要内容一.使用 YAML 文件创建1.在data节点创建了一些键值&#xff1a;代码如下&#xff08;示例&#xff09;: 2.解释 二.使用命令行创建1.创建了一个名为 person 的键值&#xff1a;代码如下&#xff08;示例&#xff09;: 2.解释3.创建了一个 index.html 文件&…...

docker上安装es

安装docker 1 安装docker依赖 yum install -y yum-utils2 设置docker仓库镜像地址 yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo3 安装制定版本的docker yum -y install docker-ce-20.10.17-3.el74 查看是否安装成功 y…...

机器学习在芯片电容提取中的应用与CapBench数据集

1. 电容提取与机器学习结合的背景与挑战在芯片设计流程中&#xff0c;电容提取是决定最终产品性能的关键环节。当设计进入物理实现阶段&#xff0c;工程师需要精确计算互连线之间的寄生电容&#xff0c;这些数据直接影响时序分析和功耗估算的准确性。传统基于场求解器的方法&am…...

AI辅助构建复古像素风Hacker News聚合器:全栈开发实战

1. 项目概述&#xff1a;一个AI驱动的复古风Hacker News聚合器最近在逛Hacker News的时候&#xff0c;我总感觉“Show HN”板块里那些有趣的个人项目像流星一样&#xff0c;刷一下就过去了&#xff0c;想回头再找特别费劲。作为一个喜欢折腾的开发者&#xff0c;我就在想&#…...

卷积运算:数字信号处理的核心原理与实践

1. 卷积在数字信号处理中的核心地位第一次接触卷积这个概念时&#xff0c;我正坐在实验室里调试一个音频滤波器。示波器上的波形始终无法达到预期效果&#xff0c;直到导师走过来画了那个著名的"翻转滑动"示意图。那一刻我突然明白&#xff0c;卷积不是抽象的数学运算…...

量子计算中CV-DV混合门集原理与应用

1. 量子计算中的CV-DV门集基础在混合量子系统中&#xff0c;连续变量(CV)和离散变量(DV)门集的协同工作为量子算法设计提供了独特优势。CV系统通常由量子谐振荡器实现&#xff0c;其状态存在于无限维希尔伯特空间中&#xff0c;而DV系统则以量子比特为基本单元。这两类系统的结…...

智慧工地起重机吊钩检测数据集VOC+YOLO格式1138张1类别

数据集格式&#xff1a;Pascal VOC格式YOLO格式(不包含分割路径的txt文件&#xff0c;仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件)图片数量(jpg文件个数)&#xff1a;1138标注数量(xml文件个数)&#xff1a;1138标注数量(txt文件个数)&#xff1a;1138标注类别…...

ARM架构CNTHP_CTL_EL2寄存器详解与虚拟化应用

1. ARM架构中的CNTHP_CTL_EL2寄存器深度解析在ARMv8-A架构的虚拟化环境中&#xff0c;定时器管理是Hypervisor实现高效资源调度和时间隔离的关键组件。作为EL2特权级的物理定时器控制寄存器&#xff0c;CNTHP_CTL_EL2为虚拟化软件提供了精确的计时控制能力。本文将深入剖析该寄…...

基于MCP协议与向量检索,为AI编程助手构建跨会话持久记忆

1. 项目概述&#xff1a;为AI编程助手构建持久记忆如果你和我一样&#xff0c;日常重度依赖Cursor、Claude Code、Windsurf这类AI编程助手&#xff0c;那你一定遇到过这个让人头疼的场景&#xff1a;昨天在Cursor里花了半小时跟AI解释清楚了一个复杂模块的业务逻辑和设计思路&a…...

一键部署Obsidian环境:自动化脚本实现跨设备配置同步

1. 项目概述&#xff1a;为什么我们需要一个“一键式”的 Obsidian 安装脚本&#xff1f;如果你是一个深度依赖 Obsidian 进行知识管理、笔记写作或项目规划的从业者&#xff0c;无论是程序员、作家、学生还是研究员&#xff0c;大概率都经历过这样的场景&#xff1a;换了一台新…...

从好奇号火星着陆看复杂系统工程:天空起重机方案与工程管理启示

1. 项目概述&#xff1a;从“不可能”到“火星新地标”的工程壮举2012年8月6日&#xff0c;当“好奇号”火星车在盖尔陨石坑成功着陆&#xff0c;传回第一张火星地表照片时&#xff0c;整个喷气推进实验室&#xff08;JPL&#xff09;控制中心沸腾了。这不仅仅是一次成功的行星…...

OpenClaw Memory启动器:快速构建AI记忆系统的开源脚手架

1. 项目概述&#xff1a;一个为AI记忆系统设计的开源启动器最近在折腾AI应用开发&#xff0c;特别是那些需要长期记忆和上下文管理的项目时&#xff0c;发现了一个挺有意思的GitHub仓库&#xff1a;christiancaviedes/openclaw-memory-starter。这本质上是一个为“OpenClaw Mem…...