腾讯mini项目-【指标监控服务重构】2023-08-12
今日已办
Watermill
Handler
将 4 个阶段的逻辑处理定义为 Handler
测试发现,添加的 handler 会被覆盖掉,故考虑添加为 middleware 且 4 个阶段的处理逻辑针对不同 topic 是相同的。
参考https://watermill.io/docs/messages-router/实现不同topic(不同事件)走不同的Handler处理逻辑,相同处理逻辑则可以使用MiddleWare(https://watermill.io/docs/middlewares/)
Middleware
ProfileCtx实现 context.Context 接口
// Package consumer
// @Author xzx 2023/8/11 18:53:00
package consumerimport ("context""github.com/Shopify/sarama""github.com/ThreeDotsLabs/watermill""github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka""github.com/ThreeDotsLabs/watermill/message""github.com/ThreeDotsLabs/watermill/message/router/middleware""github.com/ThreeDotsLabs/watermill/message/router/plugin""go.uber.org/zap""profile/internal/config""profile/internal/log""profile/internal/schema""time"
)// ProfileContext
// @Description:
// @Author xzx 2023-08-11 22:21:41
type ProfileContext struct {// Properties that can be called by inherited subclassesStatus intCtx context.ContextRouter *message.RouterEvent schema.EventAppID string // API 上报FetchScenario string // API 上报
}// NewProfileContext
// @Description
// @Author xzx 2023-08-11 22:49:00
// @Return *ProfileContext
func NewProfileContext() *ProfileContext {profileCtx := &ProfileContext{Ctx: context.Background(),}profileCtx.init()return profileCtx
}// init
// @Description 初始化
// @Author xzx 2023-08-11 22:22:01
func (profileCtx *ProfileContext) init() {logger := watermill.NewStdLogger(false, false)saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldestsubscriber, err := kafka.NewSubscriber(kafka.SubscriberConfig{Brokers: []string{config.Profile.GetString("kafka.bootstrap")},Unmarshaler: kafka.DefaultMarshaler{},OverwriteSaramaConfig: saramaSubscriberConfig,ConsumerGroup: config.Profile.GetString("kafka.group"),},logger,)if err != nil {log.Logger.Error("creates a new Kafka Subscriber error", zap.Error(err))}router, err := message.NewRouter(message.RouterConfig{}, logger)if err != nil {log.Logger.Error("creates a new Router with given configuration error", zap.Error(err))}router.AddPlugin(plugin.SignalsHandler)router.AddMiddleware(middleware.CorrelationID,middleware.Retry{MaxRetries: 3,InitialInterval: time.Millisecond * 100,Logger: logger,}.Middleware,middleware.Recoverer,)topic := "to_analyzer__0.PERF_CRASH"router.AddNoPublisherHandler("WriteKafka", topic, subscriber, profileCtx.WriteKafka).AddMiddleware(profileCtx.UnpackKafkaMessage,profileCtx.InitPerformanceEvent,profileCtx.AnalyzeEvent,)profileCtx.Router = router
}// Run
// @Description
// @Author xzx 2023-08-12 13:52:53
func (profileCtx *ProfileContext) Run() {// router.Run contains defer cancel()if err := profileCtx.Router.Run(profileCtx.Ctx); err != nil {log.Logger.Error("runs all plugins and handlers and starts subscribing to provided topics error", zap.Error(err))}
}func (profileCtx *ProfileContext) Done() <-chan struct{} {return profileCtx.Ctx.Done()
}func (profileCtx *ProfileContext) Err() error {return profileCtx.Ctx.Err()
}func (profileCtx *ProfileContext) Deadline() (deadline time.Time, ok bool) {return profileCtx.Ctx.Deadline()
}func (profileCtx *ProfileContext) Value(key any) any {return profileCtx.Ctx.Value(key)
}
【测试】前 3 个阶段处理为 Middleware,最后一个阶段为 Handler
// Package consumer
// @Author xzx 2023/8/12 10:01:00
package consumerimport ("encoding/json""github.com/ThreeDotsLabs/watermill/message""github.com/segmentio/kafka-go""go.uber.org/zap""profile/internal/connector""profile/internal/log""profile/internal/schema/performance""profile/internal/state"
)// UnpackKafkaMessage
// @Description
// @Author xzx 2023-08-12 12:27:30
// @Param h
// @Return message.HandlerFunc
func (profileCtx *ProfileContext) UnpackKafkaMessage(h message.HandlerFunc) message.HandlerFunc {return func(message *message.Message) ([]*message.Message, error) {// 反序列化,存入通用结构体if contextErr := json.Unmarshal(message.Payload, &profileCtx.Event); contextErr != nil {profileCtx.Status = state.StatusUnmarshalErrorreturn h(message)}log.Logger.Info("[UnpackKafkaItem] unpack kafka item success", zap.Any("event", profileCtx.Event))message.SetContext(profileCtx)return h(message)}
}// InitPerformanceEvent
// @Description
// @Author xzx 2023-08-12 12:27:35
// @Param h
// @Return message.HandlerFunc
func (profileCtx *ProfileContext) InitPerformanceEvent(h message.HandlerFunc) message.HandlerFunc {return func(message *message.Message) ([]*message.Message, error) {profileCtx = message.Context().(*ProfileContext)event, contextErr := performance.EventFactory(profileCtx.Event.Category, profileCtx.Event.Dimensions, profileCtx.Event.Values)if contextErr != nil {profileCtx.Status = state.StatusEventFactoryErrorreturn h(message)}log.Logger.Info("[InitPerformanceEvent] init performance event success", zap.Any("event", profileCtx.Event))profileCtx.Event.ProfileData = eventmessage.SetContext(profileCtx)return h(message)}
}// AnalyzeEvent
// @Description
// @Author xzx 2023-08-12 12:27:38
// @Param h
// @Return message.HandlerFunc
func (profileCtx *ProfileContext) AnalyzeEvent(h message.HandlerFunc) message.HandlerFunc {return func(message *message.Message) ([]*message.Message, error) {profileCtx = message.Context().(*ProfileContext)contextErr := profileCtx.Event.ProfileData.Analyze()if contextErr != nil {profileCtx.Status = state.StatusAnalyzeErrorreturn h(message)}log.Logger.Info("[AnalyzeEvent] analyze event success", zap.Any("event", profileCtx.Event))// clear dimensions and valuesprofileCtx.Event.Dimensions = nilprofileCtx.Event.Values = nilmessage.SetContext(profileCtx)return h(message)}
}// WriteKafka
// @Description
// @Author xzx 2023-08-11 22:30:47
// @Param msg
// @Return contextErr
func (profileCtx *ProfileContext) WriteKafka(msg *message.Message) (contextErr error) {profileCtx = msg.Context().(*ProfileContext)toWriteBytes, contextErr := json.Marshal(profileCtx.Event)if contextErr != nil {profileCtx.Status = state.StatusUnmarshalErrorreturn}topic := connector.GetTopic(profileCtx.Event.Category)contextErr = connector.GetProducer().WriteMessages(profileCtx.Ctx, kafka.Message{Topic: topic,Key: []byte(profileCtx.Event.ID),Value: toWriteBytes,})if contextErr != nil {profileCtx.Status = state.StatusWriteKafkaErrorreturn}log.Logger.Info("[WriteKafka] write kafka success", zap.String("topic", topic), zap.String("id", profileCtx.Event.ID), zap.String("msg", string(toWriteBytes)))return
}
可以实现正常的效果
Router
- 目前的 topic 是固定写死的,要考虑正则表达式,将不同 topic 的共有逻辑抽出为 Middleware,特定逻辑抽出为 Handler
- 消息处理似乎不是并发的
pub/sub kafka-go
-
custom pub/sub
-
Kafka Pub/Sub for the Watermill project, based on Shopify’s Sarama
-
qiulin/watermill-kafkago: Kafka Pub/Sub for the Watermill project, based on segmentio/kafka-go (github.com)
明日待办
- 组内开会
- 继续开发需求
相关文章:

腾讯mini项目-【指标监控服务重构】2023-08-12
今日已办 Watermill Handler 将 4 个阶段的逻辑处理定义为 Handler 测试发现,添加的 handler 会被覆盖掉,故考虑添加为 middleware 且 4 个阶段的处理逻辑针对不同 topic 是相同的。 参考https://watermill.io/docs/messages-router/实现不同topic&am…...
kubeadm部署k8sv1.24使用cri-docker做为CRI
目的 测试使用cri-docker做为containerd和docker的中间层垫片。 规划 IP系统主机名10.0.6.5ubuntu 22.04.3 jammymaster01.kktb.org10.0.6.6ubuntu 22.04.3 jammymaster02.kktb.org10.0.6.7ubuntu 22.04.3 jammymaster03.kktb.org 配置 步骤: 系统优化 禁用sw…...

在c#中使用CancellationToken取消任务
目录 🚀介绍: 🐤简单举例 🚀IsCancellationRequested 🚀ThrowIfCancellationRequested 🐤在控制器中使用 🚀通过异步方法的参数使用cancellationToken 🚀api结合ThrowIfCancel…...

【项目经验】:elementui多选表格默认选中
一.需求 在页面刚打开就默认选中指定项。 二.方法Table Methods toggleRowSelection用于多选表格,切换某一行的选中状态,如果使用了第二个参数,则是设置这一行选中与否(selected 为 true 则选中)row, selected 详细…...

外星人入侵游戏-(创新版)
🌈write in front🌈 🧸大家好,我是Aileen🧸.希望你看完之后,能对你有所帮助,不足请指正!共同学习交流. 🆔本文由Aileen_0v0🧸 原创 CSDN首发🐒 如…...

HTML 学习笔记(基础)
它是超文本标记语言,由一大堆约定俗成的标签组成,而其标签里一般又有一些属性值可以设置。 W3C标准:网页主要三大部分 结构:HTML表现:CSS行为:JavaScript <!DOCTYPE html> <html lang"zh-…...

最小二乘法
Least Square Method 1、相关的矩阵公式2、线性回归3、最小二乘法3.1、损失函数(Loss Function)3.2、多维空间的损失函数3.3、解析法求解3.4、梯度下降法求解 1、相关的矩阵公式 P r e c o n d i t i o n : ξ ∈ R n , A ∈ R n ∗ n i : σ A ξ σ ξ…...

使用stelnet进行安全的远程管理
1. telnet有哪些不足? 2.ssh如何保证数据传输安全? 需求:远程telnet管理设备 用户定义需要在AAA模式下: 开启远程登录的服务:定义vty接口 然后从R2登录:是可以登录的 同理R3登录: 在R1也可以查…...

python 二手车数据分析以及价格预测
二手车交易信息爬取、数据分析以及交易价格预测 引言一、数据爬取1.1 解析数据1.2 编写代码爬1.2.1 获取详细信息1.2.2 数据处理 二、数据分析2.1 统计分析2.2 可视化分析 三、价格预测3.1 价格趋势分析(特征分析)3.2 价格预测 引言 本文着眼于车辆信息,结合当下较…...

JAVA医药进销存管理系统(附源码+调试)
JAVA医药进销存管理系统 功能描述 (1)登录模块:登录信息等存储在数据库中 (2)基本信息模块:分为药品信息模块、客户情况模块、供应商情况模块; (3)业务管理模块&#x…...
H5 <blockquote> 标签
主要应用于:内容引用 标签定义及使用说明 <blockquote> 标签定义摘自另一个源的块引用。 浏览器通常会对 <blockquote> 元素进行缩进。 提示和注释 提示:如果标记是不需要段落分隔的短引用,请使用 <q>。 HTML 4.01 与 H…...

nginx配置指南
nginx.conf配置 找到Nginx的安装目录下的nginx.conf文件,该文件负责Nginx的基础功能配置。 配置文件概述 Nginx的主配置文件(conf/nginx.conf)按以下结构组织: 配置块功能描述全局块与Nginx运行相关的全局设置events块与网络连接有关的设置http块代理…...

【数据结构】优先级队列(堆)
文章目录 💐1. 优先级队列1.1 概念 💐2.堆的概念及存储方式2.1 什么是堆2.2 为什么要用完全二叉树描述堆呢?2.3 为什么说堆是在完全二叉树的基础上进行的调整?2.4 使用数组还原完全二叉树 💐3. 堆的常用操作-模拟实现3…...

前端笔试2
1.下面哪一个是检验对象是否有一个以自身定义的属性? foo.hasOwnProperty("bar")bar in foo foo["bar"] ! undefinedfoo.bar ! null 解析: bar in foo 检查 foo 对象是否包含名为 bar 的属性,但是这个属性可以是从原型链继承来的&a…...
LeetCode:66.加一
66.加一 来源:力扣(LeetCode) 链接: https://leetcode.cn/problems/plus-one/description/ 给定一个由 整数 组成的 非空 数组所表示的非负整数,在该数的基础上加一。 最高位数字存放在数组的首位, 数组中每个元素只存储单个数字。 你可以假设除了整数 0 之外,这个整数…...

Redis 常用命令
目录 全局命令 1)keys 2)exists 3) del(delete) 4)expire 5)type SET命令 GET命令 MSET 和 MGET命令 其他SET命令 计数命令 redis-cli,进入redis 最核心的命令:我们这里只是先介绍 set 和 get 最简单的操作…...
Integer.valueOf()用于字符和字符串的区别
LeetCode 17 电话号码的字母组合 先贴代码 class Solution {List<String> result new ArrayList<>();String temp new String("");Integer num;public List<String> letterCombinations(String digits) {dfs(digits, 0);return result;} publi…...

机械寿命预测(基于NASA C-MAPSS数据的剩余使用寿命RUL预测,Python代码,CNN_LSTM模型,有详细中文注释)
1.效果视频:机械寿命预测(NASA涡轮风扇发动机剩余使用寿命RUL预测,Python代码,CNN_LSTM模型,有详细中文注释)_哔哩哔哩_bilibili 环境库版本: 2.数据来源:https://www.nasa.gov/int…...

ConfigMaps-1
文章目录 主要内容一.使用 YAML 文件创建1.在data节点创建了一些键值:代码如下(示例): 2.解释 二.使用命令行创建1.创建了一个名为 person 的键值:代码如下(示例): 2.解释3.创建了一个 index.html 文件&…...
docker上安装es
安装docker 1 安装docker依赖 yum install -y yum-utils2 设置docker仓库镜像地址 yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo3 安装制定版本的docker yum -y install docker-ce-20.10.17-3.el74 查看是否安装成功 y…...

日语AI面试高效通关秘籍:专业解读与青柚面试智能助攻
在如今就业市场竞争日益激烈的背景下,越来越多的求职者将目光投向了日本及中日双语岗位。但是,一场日语面试往往让许多人感到步履维艰。你是否也曾因为面试官抛出的“刁钻问题”而心生畏惧?面对生疏的日语交流环境,即便提前恶补了…...

调用支付宝接口响应40004 SYSTEM_ERROR问题排查
在对接支付宝API的时候,遇到了一些问题,记录一下排查过程。 Body:{"datadigital_fincloud_generalsaas_face_certify_initialize_response":{"msg":"Business Failed","code":"40004","sub_msg…...

SpringTask-03.入门案例
一.入门案例 启动类: package com.sky;import lombok.extern.slf4j.Slf4j; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cache.annotation.EnableCach…...
vue3 daterange正则踩坑
<el-form-item label"空置时间" prop"vacantTime"> <el-date-picker v-model"form.vacantTime" type"daterange" start-placeholder"开始日期" end-placeholder"结束日期" clearable :editable"fal…...
起重机起升机构的安全装置有哪些?
起重机起升机构的安全装置是保障吊装作业安全的关键部件,主要用于防止超载、失控、断绳等危险情况。以下是常见的安全装置及其功能和原理: 一、超载保护装置(核心安全装置) 1. 起重量限制器 功能:实时监测起升载荷&a…...

python可视化:俄乌战争时间线关键节点与深层原因
俄乌战争时间线可视化分析:关键节点与深层原因 俄乌战争是21世纪欧洲最具影响力的地缘政治冲突之一,自2022年2月爆发以来已持续超过3年。 本文将通过Python可视化工具,系统分析这场战争的时间线、关键节点及其背后的深层原因,全面…...
IP选择注意事项
IP选择注意事项 MTP、FTP、EFUSE、EMEMORY选择时,需要考虑以下参数,然后确定后选择IP。 容量工作电压范围温度范围擦除、烧写速度/耗时读取所有bit的时间待机功耗擦写、烧写功耗面积所需要的mask layer...
中国政务数据安全建设细化及市场需求分析
(基于新《政务数据共享条例》及相关法规) 一、引言 近年来,中国政府高度重视数字政府建设和数据要素市场化配置改革。《政务数据共享条例》(以下简称“《共享条例》”)的发布,与《中华人民共和国数据安全法》(以下简称“《数据安全法》”)、《中华人民共和国个人信息…...

使用VMware克隆功能快速搭建集群
自己搭建的虚拟机,后续不管是学习java还是大数据,都需要集群,java需要分布式的微服务,大数据Hadoop的计算集群,如果从头开始搭建虚拟机会比较费时费力,这里分享一下如何使用克隆功能快速搭建一个集群 先把…...
Python打卡训练营学习记录Day49
知识点回顾: 通道注意力模块复习空间注意力模块CBAM的定义 作业:尝试对今天的模型检查参数数目,并用tensorboard查看训练过程 import torch import torch.nn as nn# 定义通道注意力 class ChannelAttention(nn.Module):def __init__(self,…...