腾讯mini项目-【指标监控服务重构】2023-08-16
今日已办
v1
验证 StageHandler 在处理消息时是否为单例,【错误尝试】
type StageHandler struct {
}func (s StageHandler) Middleware1(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {log.Logger.Info("StageHandler Middleware 1")fmt.Printf("%p\n", &s)return h(msg)}
}func (s StageHandler) Middleware2(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {log.Logger.Info("StageHandler Middleware 2")fmt.Printf("%p\n", &s)return h(msg)}
}func (s StageHandler) Middleware3(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {log.Logger.Info("StageHandler Middleware 3")fmt.Printf("%p\n", &s)return h(msg)}
}func (s StageHandler) Handler1(msg *message.Message) error {log.Logger.Info("StageHandler Handler 1")fmt.Printf("%p\n", &s)return nil
}

v2
- 定义不同 Handler
type CrashHandler struct {Topic string
}func (s CrashHandler) Handler1(msg *message.Message) error {log.Logger.Info(s.Topic + ": CrashHandler Handler 1 start")fmt.Printf("%p\n", &s)time.Sleep(1 * time.Second)log.Logger.Info(s.Topic + ": CrashHandler Handler 1 end")return nil
}type LagHandler struct {Topic string
}func (s LagHandler) Handler1(msg *message.Message) error {log.Logger.Info(s.Topic + ": LagHandler Handler 1 start")fmt.Printf("%p\n", &s)time.Sleep(1 * time.Second)log.Logger.Info(s.Topic + ": LagHandler Handler 1 end")return nil
}
- 添加到router中
for _, topic := range topics {var category stringvar handlerFunc message.NoPublishHandlerFuncif strings.Contains(topic, performance.CategoryCrash) {category = performance.CategoryCrashhandlerFunc = CrashHandler{Topic: category}.Handler1} else if strings.Contains(topic, performance.CategoryLag) {category = performance.CategoryLaghandlerFunc = LagHandler{Topic: category}.Handler1} else {continue}handler := router.AddNoPublisherHandler(topic+"test-handler", topic, subscriber, handlerFunc)}

- 结论
- handler 实例会不断创建
- 不同的 handler 可以并行处理不同主题的消息
- 相同的 handler 在处理该主题的消息时是顺序的
官方文档: Message Router (watermill.io)
订阅者可以一次消费一条消息,也可以并行消费多条消息
- Single stream of messages 是最简单的方法,这意味着在调用
msg.Ack()之前,订阅者将不会收到任何新消息 - Multiple message streams 仅部分订阅者支持。通过一次订阅多个主题分区,可以并行消费多条消息,甚至是之前未确认的消息(例如,Kafka 订阅者就是这样工作的) Router 通过运行并发 HandlerFuncs(每个分区一个)来处理此模型
v3
存在并发安全问题
- 公用一个上下文
- 频繁的修改上下文中的字段值
- 不同Handler和MiddleWare存在并发
解决思路
- 将一次消息处理会使用到的数据集合定义为一个结构体
type ContextData struct {Status intEvent schema.EventAppID string // API 上报FetchScenario string // API 上报
}
- 使用message的Context来传递这个数据

- 移除掉 ProfileCtx 的相关设计
- 使用
watermillzap.Logger来替换本身的LoggerAdapter,更加直观且与原项目适配
完整代码
profile/internal/watermill/consumer/consumer_context.go
// Package consumer
// @Author xzx 2023/8/11 18:53:00
package consumerimport ("context"kc "github.com/Kevinello/kafka-client""github.com/ThreeDotsLabs/watermill""github.com/ThreeDotsLabs/watermill/message""github.com/ThreeDotsLabs/watermill/message/router/middleware""github.com/ThreeDotsLabs/watermill/message/router/plugin""github.com/garsue/watermillzap""github.com/qiulin/watermill-kafkago/pkg/kafkago""go.uber.org/zap""profile/internal/config""profile/internal/connector""profile/internal/log""profile/internal/schema/performance""strings""time"
)// Consume
// @Description
// @Author xzx 2023-08-16 22:52:52
func Consume() {logger := watermillzap.NewLogger(log.Logger)publisher, subscriber := newPubSub(logger)router, err := message.NewRouter(message.RouterConfig{}, logger)if err != nil {log.Logger.Fatal("creates a new Router with given configuration error", zap.Error(err))}router.AddPlugin(plugin.SignalsHandler)router.AddMiddleware(middleware.Retry{MaxRetries: 3,InitialInterval: time.Millisecond * 100,Logger: logger,}.Middleware,middleware.Recoverer,)getTopics := kc.GetTopicReMatch(strings.Split(config.Profile.GetString("kafka.topicRE"), ","))topics, err := getTopics(config.Profile.GetString("kafka.bootstrap"))if err != nil {log.Logger.Fatal("get topics failed", zap.Error(err))return}for _, topic := range topics {var category stringvar handlerFunc message.HandlerFuncif strings.Contains(topic, performance.CategoryCrash) {category = performance.CategoryCrashhandlerFunc = CrashWriteKafka} else if strings.Contains(topic, performance.CategoryLag) {category = performance.CategoryLaghandlerFunc = LagWriteKafka} else {continue}router.AddHandler(category, topic, subscriber, connector.GetTopic(category), publisher, handlerFunc).AddMiddleware(UnpackKafkaMessage,InitPerformanceEvent,AnalyzeEvent)}if err = router.Run(context.Background()); err != nil {log.Logger.Error("runs all plugins and handlers and starts subscribing to provided topics error", zap.Error(err))}
}// newPubSub
// @Description
// @Author xzx 2023-08-16 22:52:45
// @Param logger
// @Return message.Publisher
// @Return message.Subscriber
func newPubSub(logger watermill.LoggerAdapter) (message.Publisher, message.Subscriber) {marshaler := kafkago.DefaultMarshaler{}publisher := kafkago.NewPublisher(kafkago.PublisherConfig{Brokers: []string{config.Profile.GetString("kafka.bootstrap")},Async: false,Marshaler: marshaler,OTELEnabled: false,Ipv4Only: true,Timeout: 100 * time.Second,}, logger)subscriber, err := kafkago.NewSubscriber(kafkago.SubscriberConfig{Brokers: []string{config.Profile.GetString("kafka.bootstrap")},Unmarshaler: marshaler,ConsumerGroup: config.Profile.GetString("kafka.group"),OTELEnabled: false,}, logger)if err != nil {log.Logger.Fatal("Unable to create subscriber", zap.Error(err))}return publisher, subscriber
}
profile/internal/watermill/consumer/consumer_stage.go
// Package consumer
// @Author xzx 2023/8/12 10:01:00
package consumerimport ("context""encoding/json""github.com/ThreeDotsLabs/watermill/message""go.uber.org/zap""profile/internal/connector""profile/internal/log""profile/internal/schema""profile/internal/schema/performance""profile/internal/state"
)type ContextData struct {Status intEvent schema.EventAppID string // API 上报FetchScenario string // API 上报
}// UnpackKafkaMessage
// @Description
// @Author xzx 2023-08-12 12:27:30
// @Param h
// @Return message.HandlerFunc
func UnpackKafkaMessage(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {var data ContextData// 反序列化,存入通用结构体if contextErr := json.Unmarshal(msg.Payload, &data.Event); contextErr != nil {data.Status = state.StatusUnmarshalErrorreturn nil, contextErr}log.Logger.Info("[1-UnpackKafkaItem] unpack kafka item success", zap.Any("event", data.Event))msg.SetContext(context.WithValue(msg.Context(), "data", data))return h(msg)}
}// InitPerformanceEvent
// @Description
// @Author xzx 2023-08-12 12:27:35
// @Param h
// @Return message.HandlerFunc
func InitPerformanceEvent(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {data := msg.Context().Value("data").(ContextData)event, contextErr := performance.EventFactory(data.Event.Category, data.Event.Dimensions, data.Event.Values)if contextErr != nil {data.Status = state.StatusEventFactoryErrorreturn nil, contextErr}log.Logger.Info("[2-InitPerformanceEvent] Consume performance event success", zap.Any("event", data.Event))data.Event.ProfileData = eventmsg.SetContext(context.WithValue(msg.Context(), "data", data))return h(msg)}
}// AnalyzeEvent
// @Description
// @Author xzx 2023-08-12 12:27:38
// @Param h
// @Return message.HandlerFunc
func AnalyzeEvent(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {data := msg.Context().Value("data").(ContextData)contextErr := data.Event.ProfileData.Analyze()if contextErr != nil {data.Status = state.StatusAnalyzeErrorreturn nil, contextErr}log.Logger.Info("[3-AnalyzeEvent] analyze event success", zap.Any("event", data.Event))// clear dimensions and valuesdata.Event.Dimensions = nildata.Event.Values = nilmsg.SetContext(context.WithValue(msg.Context(), "data", data))return h(msg)}
}// CrashWriteKafka
// @Description
// @Author xzx 2023-08-12 15:09:15
// @Param msg
// @Return []*message.Message
// @Return error
func CrashWriteKafka(msg *message.Message) ([]*message.Message, error) {data := msg.Context().Value("data").(ContextData)toWriteBytes, contextErr := json.Marshal(data.Event)if contextErr != nil {data.Status = state.StatusUnmarshalErrorreturn nil, contextErr}msg = message.NewMessage(data.Event.ID, toWriteBytes)log.Logger.Info("[4-CrashWriteKafka] write kafka success", zap.String("topic", connector.GetTopic(data.Event.Category)), zap.String("id", data.Event.ID), zap.String("msg", string(toWriteBytes)))return message.Messages{msg}, nil
}func LagWriteKafka(msg *message.Message) ([]*message.Message, error) {data := msg.Context().Value("data").(ContextData)toWriteBytes, contextErr := json.Marshal(data.Event)if contextErr != nil {data.Status = state.StatusUnmarshalErrorreturn nil, contextErr}msg = message.NewMessage(data.Event.ID, toWriteBytes)log.Logger.Info("[4-LagWriteKafka] write kafka success", zap.String("topic", connector.GetTopic(data.Event.Category)), zap.String("id", data.Event.ID), zap.String("msg", string(toWriteBytes)))return message.Messages{msg}, nil
}
测试
上报PERF_LAG Event可以并发处理 2 条消息,不必等待上一条消息处理完


多次测试发现是由于两条消息走了不同的 Handler

暂未修复,明明是同一主题的两条消息却都走了两条不同的链路,而且 publisher 最后写回的主题也是写到了不同的主题上,并且上报另一个类型的事件,即另一个主题的消息却无法触发消费者消费!
暂定先写死两个主题名称测试是否正常
明日待办
- 开会讨论项目规划和任务分工
- 继续完成需求
相关文章:
腾讯mini项目-【指标监控服务重构】2023-08-16
今日已办 v1 验证 StageHandler 在处理消息时是否为单例,【错误尝试】 type StageHandler struct { }func (s StageHandler) Middleware1(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {log.Log…...
PTA:7-3 两个递增链表的差集
^两个递增链表的差集 题目输入样例输出样例 代码 题目 输入样例 5 1 3 5 7 9 3 2 3 5输出样例 3 1 7 9代码 #include <iostream> #include <list> #include <unordered_set> using namespace std; int main() {int n1, n2;cin >> n1;list<int&g…...
智能合约漏洞案例,DEI 漏洞复现
智能合约漏洞案例,DEI 漏洞复现 1. 漏洞简介 https://twitter.com/eugenioclrc/status/1654576296507088906 2. 相关地址或交易 https://explorer.phalcon.xyz/tx/arbitrum/0xb1141785b7b94eb37c39c37f0272744c6e79ca1517529fec3f4af59d4c3c37ef 攻击交易 3. …...
Attention is all you need 论文笔记
该论文引入Transformer,主要核心是自注意力机制,自注意力(Self-Attention)机制是一种可以考虑输入序列中所有位置信息的机制。 RNN介绍 引入RNN为了更好的处理序列信息,比如我 吃 苹果,前后的输入之间是有…...
Hdoop伪分布式集群搭建
文章目录 Hadoop安装部署前言1.环境2.步骤3.效果图 具体步骤(一)前期准备(1)ping外网(2)配置主机名(3)配置时钟同步(4)关闭防火墙 (二)…...
java临时文件
临时文件 有时候,我们程序运行时需要产生中间文件,但是这些文件只是临时用途,并不做长久保存。 我们可以使用临时文件,不需要长久保存。 public static File createTempFile(String prefix, String suffix)prefix 前缀 suffix …...
C++中的<string>头文件 和 <cstring>头文件简介
C中的<string>头文件 和 <cstring>头文件简介 在C中<string> 和 <cstring> 是两个不同的头文件。 <string> 是C标准库中的头文件,定义了一个名为std::string的类,提供了对字符串的操作如size()、length()、empty() 及字…...
安装MySQL
Centos7下安装MySQL详细步骤_centos7安装mysql教程_欢欢李的博客-CSDN博客...
输入学生成绩,函数返回最大元素的数组下标,求最高分学生成绩(输入负数表示输入结束)
scanfscore()函数用于输入学生的成绩 int scanfscore(int score[N])//输入学生的成绩 {int i -1;do {i;printf("输入学生成绩:");scanf("%d", &score[i]);} while (score[i] > 0);return i; } findmax()用于寻找最大值 int findmax(int score[N…...
常用音频接口:TDM,PDM,I2S,PCM
常用音频接口:TDM,PDM,I2S,PCM_tdm音频_沙漠的甲壳虫的博客-CSDN博客 I2S/PCM接口及音频codec_音频pcm接口模块设计-CSDN博客 2个TDM8功放调试ing_周龙(AI湖湘学派)的博客-CSDN博客 数字音频接口时序----IIS、TDM、PCM、PDM_td…...
git clone报错Failed to connect to github.com port 443 after 21055 ms:
git 设置代理端口号 git config --global http.proxy http://127.0.0.1:10085 和 git config --global https.proxy http://127.0.0.1:10085 然后就可以成功git clone hugging face的数据集了 如果是https://huggingface.co/datasets/shibing624/medical/tree/main 那么…...
【操作系统】深入浅出死锁问题
死锁的概念 在多线程编程中,我们为了防止多线程竞争共享资源而导致数据错乱,都会在操作共享资源而导致数据错乱,都会在操作共享资源之前加上互斥锁,只有成功获得到锁的线程,才能操作共享资源,获取不到锁的…...
springboot实现webSocket服务端和客户端demo
1:pom导入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId><version>2.2.7.RELEASE</version></dependency>2:myWebSocketClien…...
代码走读: FFMPEG-ffplayer02
AVFrame int attribute_align_arg avcodec_receive_frame(AVCodecContext *avctx, AVFrame *frame) 选取一个音频解码器 和 一个视频解码器分别介绍该解码器功能 音频G722 g722dec.c -> g722_decode_frame 通过 ff_get_buffer 给 传入的 frame 指针分配内存 g722_decode_…...
【数据结构】——排序算法的相关习题
目录 一、选择题题型一 (插入排序)1、直接插入排序2、折半插入排序3、希尔排序 题型二(交换排序)1、冒泡排序2、快速排序 题型三(选择排序)1、简单选择排序~2、堆排序 ~题型四(归并排序…...
C高级day5(Makefile)
一、Xmind整理: 二、上课笔记整理: 1.#----->把带参宏的参数替换成字符串 #include <stdio.h> #include <stdlib.h> #include <string.h> #define MAX(a,b) a>b?a:b #define STR(n) #n int main(int argc, const char *argv…...
Android 系统中适配OAID获取
一、OAID概念 OAID(Open Anonymous Identification)是一种匿名身份识别标识符, 用于在移动设备上进行广告追踪和个性化广告投放。它是由中国移动通信集 团、中国电信集团和中国联通集团共同推出的一项行业标准 OAID值为一个64位的数字 二、…...
差分数组leetcode 2770 数组的最大美丽值
什么是差分数组 差分数组是一种数据结构,它存储的是一个数组每个相邻元素的差值。换句话说,给定一个数组arr[],其对应的差分数组diff[]将满足: diff[i] arr[i1] - arr[i] 对于所有 0 < i < n-1 差分数组的作用 用于高效…...
请求响应状态码
请求与响应&状态码 Requests部分 请求行、消息报头、请求正文。 Header解释示例Accept指定客户端能够接收的内容类型Accept: text/plain, text/htmlAccept-Chars et浏览器可以接受的字符编码集。Accept-Charset: iso-8859-5Accept-Encodi ng指定浏览器可以支持的web服务…...
安卓机型系统美化 Color.xml文件必备常识 自定义颜色资源
color.xml文件是Android工程中用来进行颜色资源管理的文件.可以在color.xml文件中通过<color>标签来定义颜色资源.我们在布局文件中、代码中、style定义中或者其他资源文件中,都可以引用之前在color.xml文件中定义的颜色资源。 将color.xml文件拷到res/value…...
Lattice FPGA开发实战:Diamond与ModelSim协同仿真环境搭建全攻略
1. 环境准备:软件安装与基础配置 第一次接触Lattice FPGA开发时,最头疼的就是仿真环境的搭建。我清楚地记得去年接手MachXO2项目时,光是让Diamond和ModelSim这两个"老伙计"协同工作就折腾了整整两天。不过别担心,跟着我…...
轻量级AI写作工坊:OpenClaw+nanobot内容创作流
轻量级AI写作工坊:OpenClawnanobot内容创作流 1. 为什么需要自动化写作助手 作为一名技术博主兼自媒体运营者,我每天都要面对内容创作的"三重压力":选题焦虑、写作耗时、发布繁琐。最痛苦的是,当我花两小时写完一篇技…...
微信无法登录时的恢复操作
本文记录 OpenClaw 中 openclaw-weixin 插件在登录态丢失、微信链接不可用、扫码登录失败时的恢复流程。2026-03-23 版本 OpenClaw 更新后曾出现微信插件失效,但在 2026-03-24 版本中已恢复。本文目标是先判断问题类型,再选择最小影响的修复方式,避免不必要的全量重装。 一、…...
用MediaPipe和Python做个隔空切水果游戏:从手势骨架提取到简单游戏逻辑实现
用MediaPipe和Python打造体感切水果游戏:从手势识别到游戏逻辑全解析 还记得小时候在街机厅玩《水果忍者》的畅快感吗?现在,我们完全可以用Python和MediaPipe技术,在电脑前通过手势隔空切水果!本文将带你从零开始&…...
3大核心步骤打造专属翻译引擎:Zotero PDF Translate高级扩展指南
3大核心步骤打造专属翻译引擎:Zotero PDF Translate高级扩展指南 【免费下载链接】zotero-pdf-translate 支持将PDF、EPub、网页内容、元数据、注释和笔记翻译为目标语言,并且兼容20多种翻译服务。 项目地址: https://gitcode.com/gh_mirrors/zo/zoter…...
从零搭建SRS流媒体服务器:实现RTMP推拉流的实战部署指南
1. 为什么选择SRS搭建流媒体服务器? 最近几年直播和实时视频的需求爆发式增长,很多开发者都在寻找轻量高效的流媒体服务器方案。我测试过不少开源方案,最终发现SRS(Simple Realtime Server)是最适合个人和小团队自建的…...
Qt串口通信实战:用QSerialPort从零搭建一个串口调试助手(附完整源码)
Qt串口通信实战:从零构建工业级调试助手 在嵌入式开发和工业控制领域,串口通信作为最基础也最可靠的通信方式之一,至今仍发挥着不可替代的作用。无论是单片机与上位机的数据交换,还是工业设备的参数配置,一个稳定高效的…...
Spring Boot 与 Serverless 集成最佳实践
Spring Boot 与 Serverless 集成最佳实践 引言 大家好,今天想和大家聊聊 Spring Boot 与 Serverless 的集成。Serverless 是一种云原生的计算模型,它允许开发者专注于代码开发,而不需要管理服务器基础设施。在 Spring Boot 应用中,…...
毕业论文党必看!用MathType实现Word公式自动编号的3种隐藏技巧
毕业论文公式排版终极指南:MathType高效编号技巧全解析 在撰写理工科毕业论文或学术论文时,公式排版往往是让研究者头疼的环节。传统手动编号不仅效率低下,更会在修改文档时引发连锁灾难——一个公式的增删可能导致全篇编号错乱。MathType作为…...
解锁汽车ECU诊断新可能:ECUBus-Pro开源工具的全场景应用指南
解锁汽车ECU诊断新可能:ECUBus-Pro开源工具的全场景应用指南 【免费下载链接】ECUBus ECU bus tool, UDS over CAN, CAN-FD, Ethernet and so on. 项目地址: https://gitcode.com/gh_mirrors/ec/ECUBus ECUBus-Pro是一款功能强大的开源汽车ECU开发工具&#…...
