腾讯mini项目-【指标监控服务重构】2023-08-12
今日已办
Watermill
Handler
将 4 个阶段的逻辑处理定义为 Handler
测试发现,添加的 handler 会被覆盖掉,故考虑添加为 middleware 且 4 个阶段的处理逻辑针对不同 topic 是相同的。
参考https://watermill.io/docs/messages-router/实现不同topic(不同事件)走不同的Handler处理逻辑,相同处理逻辑则可以使用MiddleWare(https://watermill.io/docs/middlewares/)
Middleware
ProfileCtx实现 context.Context 接口
// Package consumer
// @Author xzx 2023/8/11 18:53:00
package consumerimport ("context""github.com/Shopify/sarama""github.com/ThreeDotsLabs/watermill""github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka""github.com/ThreeDotsLabs/watermill/message""github.com/ThreeDotsLabs/watermill/message/router/middleware""github.com/ThreeDotsLabs/watermill/message/router/plugin""go.uber.org/zap""profile/internal/config""profile/internal/log""profile/internal/schema""time"
)// ProfileContext
// @Description:
// @Author xzx 2023-08-11 22:21:41
type ProfileContext struct {// Properties that can be called by inherited subclassesStatus intCtx context.ContextRouter *message.RouterEvent schema.EventAppID string // API 上报FetchScenario string // API 上报
}// NewProfileContext
// @Description
// @Author xzx 2023-08-11 22:49:00
// @Return *ProfileContext
func NewProfileContext() *ProfileContext {profileCtx := &ProfileContext{Ctx: context.Background(),}profileCtx.init()return profileCtx
}// init
// @Description 初始化
// @Author xzx 2023-08-11 22:22:01
func (profileCtx *ProfileContext) init() {logger := watermill.NewStdLogger(false, false)saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldestsubscriber, err := kafka.NewSubscriber(kafka.SubscriberConfig{Brokers: []string{config.Profile.GetString("kafka.bootstrap")},Unmarshaler: kafka.DefaultMarshaler{},OverwriteSaramaConfig: saramaSubscriberConfig,ConsumerGroup: config.Profile.GetString("kafka.group"),},logger,)if err != nil {log.Logger.Error("creates a new Kafka Subscriber error", zap.Error(err))}router, err := message.NewRouter(message.RouterConfig{}, logger)if err != nil {log.Logger.Error("creates a new Router with given configuration error", zap.Error(err))}router.AddPlugin(plugin.SignalsHandler)router.AddMiddleware(middleware.CorrelationID,middleware.Retry{MaxRetries: 3,InitialInterval: time.Millisecond * 100,Logger: logger,}.Middleware,middleware.Recoverer,)topic := "to_analyzer__0.PERF_CRASH"router.AddNoPublisherHandler("WriteKafka", topic, subscriber, profileCtx.WriteKafka).AddMiddleware(profileCtx.UnpackKafkaMessage,profileCtx.InitPerformanceEvent,profileCtx.AnalyzeEvent,)profileCtx.Router = router
}// Run
// @Description
// @Author xzx 2023-08-12 13:52:53
func (profileCtx *ProfileContext) Run() {// router.Run contains defer cancel()if err := profileCtx.Router.Run(profileCtx.Ctx); err != nil {log.Logger.Error("runs all plugins and handlers and starts subscribing to provided topics error", zap.Error(err))}
}func (profileCtx *ProfileContext) Done() <-chan struct{} {return profileCtx.Ctx.Done()
}func (profileCtx *ProfileContext) Err() error {return profileCtx.Ctx.Err()
}func (profileCtx *ProfileContext) Deadline() (deadline time.Time, ok bool) {return profileCtx.Ctx.Deadline()
}func (profileCtx *ProfileContext) Value(key any) any {return profileCtx.Ctx.Value(key)
}
【测试】前 3 个阶段处理为 Middleware,最后一个阶段为 Handler
// Package consumer
// @Author xzx 2023/8/12 10:01:00
package consumerimport ("encoding/json""github.com/ThreeDotsLabs/watermill/message""github.com/segmentio/kafka-go""go.uber.org/zap""profile/internal/connector""profile/internal/log""profile/internal/schema/performance""profile/internal/state"
)// UnpackKafkaMessage
// @Description
// @Author xzx 2023-08-12 12:27:30
// @Param h
// @Return message.HandlerFunc
func (profileCtx *ProfileContext) UnpackKafkaMessage(h message.HandlerFunc) message.HandlerFunc {return func(message *message.Message) ([]*message.Message, error) {// 反序列化,存入通用结构体if contextErr := json.Unmarshal(message.Payload, &profileCtx.Event); contextErr != nil {profileCtx.Status = state.StatusUnmarshalErrorreturn h(message)}log.Logger.Info("[UnpackKafkaItem] unpack kafka item success", zap.Any("event", profileCtx.Event))message.SetContext(profileCtx)return h(message)}
}// InitPerformanceEvent
// @Description
// @Author xzx 2023-08-12 12:27:35
// @Param h
// @Return message.HandlerFunc
func (profileCtx *ProfileContext) InitPerformanceEvent(h message.HandlerFunc) message.HandlerFunc {return func(message *message.Message) ([]*message.Message, error) {profileCtx = message.Context().(*ProfileContext)event, contextErr := performance.EventFactory(profileCtx.Event.Category, profileCtx.Event.Dimensions, profileCtx.Event.Values)if contextErr != nil {profileCtx.Status = state.StatusEventFactoryErrorreturn h(message)}log.Logger.Info("[InitPerformanceEvent] init performance event success", zap.Any("event", profileCtx.Event))profileCtx.Event.ProfileData = eventmessage.SetContext(profileCtx)return h(message)}
}// AnalyzeEvent
// @Description
// @Author xzx 2023-08-12 12:27:38
// @Param h
// @Return message.HandlerFunc
func (profileCtx *ProfileContext) AnalyzeEvent(h message.HandlerFunc) message.HandlerFunc {return func(message *message.Message) ([]*message.Message, error) {profileCtx = message.Context().(*ProfileContext)contextErr := profileCtx.Event.ProfileData.Analyze()if contextErr != nil {profileCtx.Status = state.StatusAnalyzeErrorreturn h(message)}log.Logger.Info("[AnalyzeEvent] analyze event success", zap.Any("event", profileCtx.Event))// clear dimensions and valuesprofileCtx.Event.Dimensions = nilprofileCtx.Event.Values = nilmessage.SetContext(profileCtx)return h(message)}
}// WriteKafka
// @Description
// @Author xzx 2023-08-11 22:30:47
// @Param msg
// @Return contextErr
func (profileCtx *ProfileContext) WriteKafka(msg *message.Message) (contextErr error) {profileCtx = msg.Context().(*ProfileContext)toWriteBytes, contextErr := json.Marshal(profileCtx.Event)if contextErr != nil {profileCtx.Status = state.StatusUnmarshalErrorreturn}topic := connector.GetTopic(profileCtx.Event.Category)contextErr = connector.GetProducer().WriteMessages(profileCtx.Ctx, kafka.Message{Topic: topic,Key: []byte(profileCtx.Event.ID),Value: toWriteBytes,})if contextErr != nil {profileCtx.Status = state.StatusWriteKafkaErrorreturn}log.Logger.Info("[WriteKafka] write kafka success", zap.String("topic", topic), zap.String("id", profileCtx.Event.ID), zap.String("msg", string(toWriteBytes)))return
}
可以实现正常的效果
Router
- 目前的 topic 是固定写死的,要考虑正则表达式,将不同 topic 的共有逻辑抽出为 Middleware,特定逻辑抽出为 Handler
- 消息处理似乎不是并发的
pub/sub kafka-go
-
custom pub/sub
-
Kafka Pub/Sub for the Watermill project, based on Shopify’s Sarama
-
qiulin/watermill-kafkago: Kafka Pub/Sub for the Watermill project, based on segmentio/kafka-go (github.com)
明日待办
- 组内开会
- 继续开发需求
相关文章:

腾讯mini项目-【指标监控服务重构】2023-08-12
今日已办 Watermill Handler 将 4 个阶段的逻辑处理定义为 Handler 测试发现,添加的 handler 会被覆盖掉,故考虑添加为 middleware 且 4 个阶段的处理逻辑针对不同 topic 是相同的。 参考https://watermill.io/docs/messages-router/实现不同topic&am…...
kubeadm部署k8sv1.24使用cri-docker做为CRI
目的 测试使用cri-docker做为containerd和docker的中间层垫片。 规划 IP系统主机名10.0.6.5ubuntu 22.04.3 jammymaster01.kktb.org10.0.6.6ubuntu 22.04.3 jammymaster02.kktb.org10.0.6.7ubuntu 22.04.3 jammymaster03.kktb.org 配置 步骤: 系统优化 禁用sw…...

在c#中使用CancellationToken取消任务
目录 🚀介绍: 🐤简单举例 🚀IsCancellationRequested 🚀ThrowIfCancellationRequested 🐤在控制器中使用 🚀通过异步方法的参数使用cancellationToken 🚀api结合ThrowIfCancel…...

【项目经验】:elementui多选表格默认选中
一.需求 在页面刚打开就默认选中指定项。 二.方法Table Methods toggleRowSelection用于多选表格,切换某一行的选中状态,如果使用了第二个参数,则是设置这一行选中与否(selected 为 true 则选中)row, selected 详细…...

外星人入侵游戏-(创新版)
🌈write in front🌈 🧸大家好,我是Aileen🧸.希望你看完之后,能对你有所帮助,不足请指正!共同学习交流. 🆔本文由Aileen_0v0🧸 原创 CSDN首发🐒 如…...

HTML 学习笔记(基础)
它是超文本标记语言,由一大堆约定俗成的标签组成,而其标签里一般又有一些属性值可以设置。 W3C标准:网页主要三大部分 结构:HTML表现:CSS行为:JavaScript <!DOCTYPE html> <html lang"zh-…...

最小二乘法
Least Square Method 1、相关的矩阵公式2、线性回归3、最小二乘法3.1、损失函数(Loss Function)3.2、多维空间的损失函数3.3、解析法求解3.4、梯度下降法求解 1、相关的矩阵公式 P r e c o n d i t i o n : ξ ∈ R n , A ∈ R n ∗ n i : σ A ξ σ ξ…...

使用stelnet进行安全的远程管理
1. telnet有哪些不足? 2.ssh如何保证数据传输安全? 需求:远程telnet管理设备 用户定义需要在AAA模式下: 开启远程登录的服务:定义vty接口 然后从R2登录:是可以登录的 同理R3登录: 在R1也可以查…...

python 二手车数据分析以及价格预测
二手车交易信息爬取、数据分析以及交易价格预测 引言一、数据爬取1.1 解析数据1.2 编写代码爬1.2.1 获取详细信息1.2.2 数据处理 二、数据分析2.1 统计分析2.2 可视化分析 三、价格预测3.1 价格趋势分析(特征分析)3.2 价格预测 引言 本文着眼于车辆信息,结合当下较…...

JAVA医药进销存管理系统(附源码+调试)
JAVA医药进销存管理系统 功能描述 (1)登录模块:登录信息等存储在数据库中 (2)基本信息模块:分为药品信息模块、客户情况模块、供应商情况模块; (3)业务管理模块&#x…...
H5 <blockquote> 标签
主要应用于:内容引用 标签定义及使用说明 <blockquote> 标签定义摘自另一个源的块引用。 浏览器通常会对 <blockquote> 元素进行缩进。 提示和注释 提示:如果标记是不需要段落分隔的短引用,请使用 <q>。 HTML 4.01 与 H…...

nginx配置指南
nginx.conf配置 找到Nginx的安装目录下的nginx.conf文件,该文件负责Nginx的基础功能配置。 配置文件概述 Nginx的主配置文件(conf/nginx.conf)按以下结构组织: 配置块功能描述全局块与Nginx运行相关的全局设置events块与网络连接有关的设置http块代理…...

【数据结构】优先级队列(堆)
文章目录 💐1. 优先级队列1.1 概念 💐2.堆的概念及存储方式2.1 什么是堆2.2 为什么要用完全二叉树描述堆呢?2.3 为什么说堆是在完全二叉树的基础上进行的调整?2.4 使用数组还原完全二叉树 💐3. 堆的常用操作-模拟实现3…...

前端笔试2
1.下面哪一个是检验对象是否有一个以自身定义的属性? foo.hasOwnProperty("bar")bar in foo foo["bar"] ! undefinedfoo.bar ! null 解析: bar in foo 检查 foo 对象是否包含名为 bar 的属性,但是这个属性可以是从原型链继承来的&a…...
LeetCode:66.加一
66.加一 来源:力扣(LeetCode) 链接: https://leetcode.cn/problems/plus-one/description/ 给定一个由 整数 组成的 非空 数组所表示的非负整数,在该数的基础上加一。 最高位数字存放在数组的首位, 数组中每个元素只存储单个数字。 你可以假设除了整数 0 之外,这个整数…...

Redis 常用命令
目录 全局命令 1)keys 2)exists 3) del(delete) 4)expire 5)type SET命令 GET命令 MSET 和 MGET命令 其他SET命令 计数命令 redis-cli,进入redis 最核心的命令:我们这里只是先介绍 set 和 get 最简单的操作…...
Integer.valueOf()用于字符和字符串的区别
LeetCode 17 电话号码的字母组合 先贴代码 class Solution {List<String> result new ArrayList<>();String temp new String("");Integer num;public List<String> letterCombinations(String digits) {dfs(digits, 0);return result;} publi…...

机械寿命预测(基于NASA C-MAPSS数据的剩余使用寿命RUL预测,Python代码,CNN_LSTM模型,有详细中文注释)
1.效果视频:机械寿命预测(NASA涡轮风扇发动机剩余使用寿命RUL预测,Python代码,CNN_LSTM模型,有详细中文注释)_哔哩哔哩_bilibili 环境库版本: 2.数据来源:https://www.nasa.gov/int…...

ConfigMaps-1
文章目录 主要内容一.使用 YAML 文件创建1.在data节点创建了一些键值:代码如下(示例): 2.解释 二.使用命令行创建1.创建了一个名为 person 的键值:代码如下(示例): 2.解释3.创建了一个 index.html 文件&…...
docker上安装es
安装docker 1 安装docker依赖 yum install -y yum-utils2 设置docker仓库镜像地址 yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo3 安装制定版本的docker yum -y install docker-ce-20.10.17-3.el74 查看是否安装成功 y…...
ubuntu搭建nfs服务centos挂载访问
在Ubuntu上设置NFS服务器 在Ubuntu上,你可以使用apt包管理器来安装NFS服务器。打开终端并运行: sudo apt update sudo apt install nfs-kernel-server创建共享目录 创建一个目录用于共享,例如/shared: sudo mkdir /shared sud…...

centos 7 部署awstats 网站访问检测
一、基础环境准备(两种安装方式都要做) bash # 安装必要依赖 yum install -y httpd perl mod_perl perl-Time-HiRes perl-DateTime systemctl enable httpd # 设置 Apache 开机自启 systemctl start httpd # 启动 Apache二、安装 AWStats࿰…...
OkHttp 中实现断点续传 demo
在 OkHttp 中实现断点续传主要通过以下步骤完成,核心是利用 HTTP 协议的 Range 请求头指定下载范围: 实现原理 Range 请求头:向服务器请求文件的特定字节范围(如 Range: bytes1024-) 本地文件记录:保存已…...

srs linux
下载编译运行 git clone https:///ossrs/srs.git ./configure --h265on make 编译完成后即可启动SRS # 启动 ./objs/srs -c conf/srs.conf # 查看日志 tail -n 30 -f ./objs/srs.log 开放端口 默认RTMP接收推流端口是1935,SRS管理页面端口是8080,可…...
【论文笔记】若干矿井粉尘检测算法概述
总的来说,传统机器学习、传统机器学习与深度学习的结合、LSTM等算法所需要的数据集来源于矿井传感器测量的粉尘浓度,通过建立回归模型来预测未来矿井的粉尘浓度。传统机器学习算法性能易受数据中极端值的影响。YOLO等计算机视觉算法所需要的数据集来源于…...

论文浅尝 | 基于判别指令微调生成式大语言模型的知识图谱补全方法(ISWC2024)
笔记整理:刘治强,浙江大学硕士生,研究方向为知识图谱表示学习,大语言模型 论文链接:http://arxiv.org/abs/2407.16127 发表会议:ISWC 2024 1. 动机 传统的知识图谱补全(KGC)模型通过…...

从零实现STL哈希容器:unordered_map/unordered_set封装详解
本篇文章是对C学习的STL哈希容器自主实现部分的学习分享 希望也能为你带来些帮助~ 那咱们废话不多说,直接开始吧! 一、源码结构分析 1. SGISTL30实现剖析 // hash_set核心结构 template <class Value, class HashFcn, ...> class hash_set {ty…...

k8s业务程序联调工具-KtConnect
概述 原理 工具作用是建立了一个从本地到集群的单向VPN,根据VPN原理,打通两个内网必然需要借助一个公共中继节点,ktconnect工具巧妙的利用k8s原生的portforward能力,简化了建立连接的过程,apiserver间接起到了中继节…...

SpringCloudGateway 自定义局部过滤器
场景: 将所有请求转化为同一路径请求(方便穿网配置)在请求头内标识原来路径,然后在将请求分发给不同服务 AllToOneGatewayFilterFactory import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; impor…...
聊一聊接口测试的意义有哪些?
目录 一、隔离性 & 早期测试 二、保障系统集成质量 三、验证业务逻辑的核心层 四、提升测试效率与覆盖度 五、系统稳定性的守护者 六、驱动团队协作与契约管理 七、性能与扩展性的前置评估 八、持续交付的核心支撑 接口测试的意义可以从四个维度展开,首…...