当前位置: 首页 > news >正文

RisingWave最佳实践-利用Dynamic filters 和 Temporal filters 实现监控告警

心得的体会

刚过了年刚开工,闲暇之余调研了分布式SQL流处理数据库–RisingWave,本人是Flink(包括FlinkSQL和Flink DataStream API)的资深用户,但接触到RisingWave令我眼前一亮,并且拿我们生产上的监控告警场景在RisingWave上做了验证,以下是自己的心得体会:

RisingWave架构简单,运维成本底,基于云原生(可以分别基于计算和存储动态伸缩),同时在开发上屏蔽了Flink等实时处理框架底层需要处理的一些技术细节(状态存储,数据一致性,分布式集群扩展等)。提供了与PostgreSQL兼容的标准SQL接口,用户可以像使用 PostgreSQL 一样处理数据流。并且RisingWave不单单可以处理流式数据,还提供了数据其他流式处理框架(如:Flink、storm)所不具备的数据存储能力,基本可以完全取代FlinkSQL。相对于其他OLAP系统(如:apache doris,starrocks),RisingWave采用同步实时,可以保证实时的新鲜度;强一致性,而不是最终一致性。用户需要做的仅仅是通过开发SQL就可以处理流数据,当然首先需要具备流式数据处理思维(相对于离线)。

RisingWave当然也有自身的不足,相对于Flink可以通过DataStream API自定义灵活的处理流式数据,RisingWave只能解决一些特定的流式场景,无法做太多定制开发;相对于Apache Doris等OLAP实时分析性数据库,RisingWave不适合做分析型随机查询。另外RisingWave是个新事物,正在发展阶段,周边生态和相关文档还不健全,作为尝鲜者可能会踩很多坑。然而令人欣慰的是RisingWave的社区回复还是很及时的,RisingWave官方投入了很多精力在做RisingWave的布道和答疑。

至于争论比较厉害的RisingWav VS FLink的性能和吞吐量上孰优孰劣,针对不同应用场景可能有不同表现,因此没有亲自调研就没有发言权。但我认为在不同的场景下他们应该有各自的优势。无论如何RisingWave部署简单,上手容易,试错成本低是一个不争的事实。RisingWave可以应用在一些数据看版,监控,实时指标等场景。

利用动态和时间过滤器实现监控告警

FlinkSQL解决不了定时触发的问题,FlinkSQL的流处理逻辑只是按event触发,不能按时间条件触发,也就是没有触发器机制。FlinkSQL窗口的定时触发,归根结底也是基于event触发,event驱动的机制。因此需要触发器的场景就需要用到Flink DataStream API的KeyedProcessFunction等算子。但RisingWave利用Dynamic filters 和 Temporal filters 可以间接实现类似场景的触发器机制。

场景描述

现有如下群消息实时指标监控场景:
数据有初始化(init)、查询(query)、回调(callback:succ+fail)三种先后顺序状态。
数据是按预设时间批次分组的,例如:2024-01-01 08:00:00、2024-01-01 08:30:00,实时统计每一个批次内三种不同状态的数据count。

监控指标一:在某一个批次延迟指定的时间(query_timeout)之内(例如:2024-01-01 08:00:00延迟1小时触发时间为系统时间2024-01-01 09:00:00),该批次的query状态数据count没有达到init状态的数量count阀值(即query_count<init_count*query_threshold)就触发告警。
同时结束该批次数据统计,下发该批次数据的指标包括:批次时间、init_count、query_count等

监控指标二:如果指标一告警没有被触发,该批次在满足query状态数据count达到init状态数量count的阀值(即query_count>=init_countquery_threshold)以后,在指定的延迟时间内(callback_timeout),该批次的callback状态数据count没有达到query状态的数量count阀值(即callback_count<query_countcallback_threshold)就触发告警。
同时结束该批次数据统计,下发该批次数据的指标包括:批次时间、init_count、query_count、callback_count等

群消息实时指标监控流程图如下:
群消息实时指标监控流程图

实例demo

RisingWave部署可以参考:RisingWave分布式SQL流处理数据库调研

假设:
query_threshold=1, callback_count=1
query_timeout= ‘5 minute’, callback_timeout= ‘1 minute’
0->init,1->query,2->callback

RisingWave SQL:

 
DROP TABLE t_msg;
CREATE TABLE t_msg(msg_id int,status smallint ,public_time timestamp,process_time timestamp as proctime()
) APPEND ONLY;set timezone = 'PRC';--PRC(People’s Republic of China)
show timezone;select * from t_msg;--统计不同状态的count
DROP MATERIALIZED VIEW mv_t_msg_groupby; 
CREATE MATERIALIZED VIEW mv_t_msg_groupby AS
SELECT public_time
,sum(case when status = 0  then 1 else 0 end) AS init_count
,sum(case when status = 1  then 1 else 0 end) AS query_count
,sum(case when status = 2  then 1 else 0 end) AS callback_count
,max(process_time) as process_time
FROM t_msg 
group by public_time;select * from mv_t_msg_groupby;--sink_query_alarm 
DROP SINK sink_query_alarm;
CREATE SINK sink_query_alarm AS 
SELECT public_time
,init_count
,query_count
,process_time
FROM mv_t_msg_groupby
where public_time + INTERVAL '5 minute' <= now()   --query_timeout=1 minute, public_time相当于当前时间延迟1分钟触发,【Delay table changes】
and   init_count*1 > query_count --query_threshold=1
WITH (connector='kafka',properties.bootstrap.server='192.168.1.100:8092',topic='t_sink_query_alarm'
)
FORMAT PLAIN ENCODE JSON(force_append_only='true'
);--由于RisingWave不支持在【MATERIALIZED VIEW】和【SINK】等【可伸缩流】中指定处理时间字段,因此需要借助外部存储kafka周转
--RisngWave官方给的解释:support a proctime on an append only stream might be easier but on retractable stream could take extra cost. We must think it carefully to introduce such a feature.--sink_query_succ 
DROP SINK sink_query_succ;
CREATE SINK sink_query_succ AS 
SELECT public_time
,init_count
,query_count
,callback_count
,process_time as query_succ_process_time
FROM mv_t_msg_groupby
where public_time + INTERVAL '5 minute' >= now()   --query_timeout=1 minute, 在指定的时间内,【Delete and clean expired data】
and   init_count*1 <= query_count --query_threshold=1,query_count达到了指定值
WITH (connector='kafka',properties.bootstrap.server='192.168.1.100:8092',topic='t_sink_query_succ'
)
FORMAT PLAIN ENCODE JSON(force_append_only='true'
);--source连接器
DROP SOURCE source_query_succ;
CREATE SOURCE IF NOT EXISTS source_query_succ (init_count int,query_count int ,callback_count int ,public_time timestamp,query_succ_process_time timestamp
)
WITH (connector='kafka',topic='t_sink_query_succ',properties.bootstrap.server='192.168.1.100:8092',scan.startup.mode='earliest', -- earliest ,latest,default:earliest 
) FORMAT PLAIN ENCODE JSON;select * from source_query_succ;--sink_callback_alarm,用到动态过滤器和时间过滤器
DROP SINK sink_callback_alarm;
CREATE SINK sink_callback_alarm AS 
WITH tmp AS ( 
select public_time, min(query_succ_process_time) as query_succ_process_time  -- 动态过滤器
FROM source_query_succ 
group by public_time
)
SELECT b.public_time
,b.init_count
,b.query_count
,b.callback_count
,b.process_time
,a.query_succ_process_time
FROM  tmp a
JOIN  mv_t_msg_groupby b ON a.public_time=b.public_time
where a.query_succ_process_time + INTERVAL '1 minute' <= now()   --query_timeout=1 minute, public_time相当于当前时间延迟1分钟触发,【Delay table changes】
and   b.query_count*1 > b.callback_count --callback_threshold=1
WITH (connector='kafka',properties.bootstrap.server='192.168.1.100:8092',topic='t_sink_callback_alarm'
)
FORMAT PLAIN ENCODE JSON(force_append_only='true'
);-- 模拟数据
--init
INSERT INTO t_msg values(1,0,'2024-02-23 15:55:00'::TIMESTAMP); --比当前系统时间早
INSERT INTO t_msg values(2,0,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(3,0,'2024-02-23 15:55:00'::TIMESTAMP);
--query                                
INSERT INTO t_msg values(1,1,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(2,1,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(3,1,'2024-02-23 15:55:00'::TIMESTAMP);
--callback                             
INSERT INTO t_msg values(1,2,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(2,2,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(3,2,'2024-02-23 15:55:00'::TIMESTAMP);

查看监控结果:

docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t t_sink_query_alarm -C 
docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t t_sink_callback_alarm -C 

相关文章:

RisingWave最佳实践-利用Dynamic filters 和 Temporal filters 实现监控告警

心得的体会 刚过了年刚开工&#xff0c;闲暇之余调研了分布式SQL流处理数据库–RisingWave&#xff0c;本人是Flink&#xff08;包括FlinkSQL和Flink DataStream API&#xff09;的资深用户&#xff0c;但接触到RisingWave令我眼前一亮&#xff0c;并且拿我们生产上的监控告警…...

【Qt学习】QRadioButton 的介绍与使用(性别选择、模拟点餐)

文章目录 介绍实例使用实例1&#xff08;性别选择 - 单选 隐藏&#xff09;实例2&#xff08;模拟点餐&#xff0c;多组单选&#xff09; 相关资源文件 介绍 这里简单对QRadioButton类 进行介绍&#xff1a; QRadioButton 继承自 QAbstractButton &#xff0c;用于创建单选按…...

基于java springboot的图书管理系统设计和实现

基于java springboot的图书管理系统设计和实现 博主介绍&#xff1a;5年java开发经验&#xff0c;专注Java开发、定制、远程、文档编写指导等,csdn特邀作者、专注于Java技术领域 作者主页 央顺技术团队 Java毕设项目精品实战案例《1000套》 欢迎点赞 收藏 ⭐留言 文末获取源码联…...

自定义类型:联合和枚举

目录 1. 联合体 1.1 联合体类型的声明及特点 1.2 相同成员的结构体和联合体对比 1.3 联合体大小的计算 1.4 联合体的应用举例 2. 枚举类型 2.1 枚举类型的声明 2.2 枚举类型的优点 1. 联合体 1.1 联合体类型的声明及特点 像结构体一样&#xff0c;联合体也是由一个或…...

每日一学—由面试题“Redis 是否为单线程”引发的思考

文章目录 &#x1f4cb; 前言&#x1f330; 举个例子&#x1f3af; 什么是 Redis&#xff08;知识点补充&#xff09;&#x1f3af; Redis 中的多线程&#x1f3af; I/O 多线程&#x1f3af; Redis 中的多进程&#x1f4dd; 结论&#x1f3af;书籍推荐&#x1f525;参与方式 &a…...

chatGPT PLUS 绑卡提示信用卡被拒的解决办法

chatGPT PLUS 绑卡提示信用卡被拒的解决办法 一、 ChatGPT Plus介绍 作为人工智能领域的一项重要革新&#xff0c;ChatGPT Plus的上线引起了众多用户的关注&#xff0c;其背后的OpenAI表现出傲娇的态度&#xff0c;被誉为下一个GTP 4.0。总的来说&#xff0c;ChatGPT Plus的火…...

opencv鼠标操作与响应

//鼠标事件 Point sp(-1, -1); Point ep(-1, -1); Mat temp; static void on_draw(int event, int x, int y, int flags, void *userdata) {Mat image *((Mat*)userdata);if (event EVENT_LBUTTONDOWN) {sp.x x;sp.y y;std::cout << "start point:"<<…...

vue里echarts的使用:画饼图和面积折线图

vue里echarts的使用,我们要先安装echarts,然后在main.js里引入: //命令安装echarts npm i echarts//main.js里引入挂载到原型上 import echarts from echarts Vue.prototype.$echarts = echarts最终我们实现的效果如下: 头部标题这里我们封装了一个全局公共组件common-he…...

个人建站前端篇(六)插件unplugin-auto-import的使用

vue3日常项目中定义变量需要引入ref,reactive等等比较麻烦&#xff0c;可以通过unplugin-auto-import给我们自动引入 * unplugin-auto-import 解决了vue3-hook、vue-router、useVue等多个插件的自动导入&#xff0c;也支持自定义插件的自动导入&#xff0c;是一个功能强大的typ…...

【Python】 剪辑法欠采样 CNN压缩近邻法欠采样

借鉴&#xff1a;关于K近邻&#xff08;KNN&#xff09;&#xff0c;看这一篇就够了&#xff01;算法原理&#xff0c;kd树&#xff0c;球树&#xff0c;KNN解决样本不平衡&#xff0c;剪辑法&#xff0c;压缩近邻法 - 知乎 但是不要看他里面的代码&#xff0c;因为作者把代码…...

springmvc+ssm+springboot房屋中介服务平台的设计与实现 i174z

本论文拟采用计算机技术设计并开发的房屋中介服务平台&#xff0c;主要是为用户提供服务。使得用户可以在系统上查看房屋出租、房屋出售、房屋求购、房屋求租&#xff0c;管理员对信息进行统一管理&#xff0c;与此同时可以筛选出符合的信息&#xff0c;给笔者提供更符合实际的…...

挑战30天学完Python:Day19 文件处理

&#x1f4d8; Day 19 &#x1f389; 本系列为Python基础学习&#xff0c;原稿来源于 30-Days-Of-Python 英文项目&#xff0c;大奇主要是对其本地化翻译、逐条验证和补充&#xff0c;想通过30天完成正儿八经的系统化实践。此系列适合零基础同学&#xff0c;或仅了解Python一点…...

Spring Boot application.properties和application.yml文件的配置

在Spring Boot中&#xff0c;application.properties 和 application.yml 文件用于配置应用程序的各个方面&#xff0c;如服务器端口、数据库连接、日志级别等。这两个文件是Spring Boot的配置文件&#xff0c;位于 src/main/resources 目录下。 application.properties 示例 …...

Unity单元测试

Unity单元测试是一个专门用于嵌入式单元测试的库, 现在简单讲下移植以及代码结构. 源码地址: GitHub - ThrowTheSwitch/Unity: Simple Unit Testing for C 1.我们只需要移植三个文件即可: unity.c, unity.h, unity_internals.h 2.然后添加需要测试的函数. 3.在main.c中添加…...

Spring Bean 的生命周期了解么?

Spring Bean 的生命周期基本流程 一个Spring的Bean从出生到销毁的全过程就是他的整个生命周期, 整个生命周期可以大致分为3个大的阶段 : 创建 使用 销毁 还可以分为5个小步骤 : 实例化(Bean的创建) , 初始化赋值, 注册Destruction回调 , Bean的正常使用 以及 Bean的销毁 …...

.ryabina勒索病毒数据怎么处理|数据解密恢复

导言&#xff1a; 随着网络安全威胁的不断增加&#xff0c;勒索软件已成为严重的威胁之一&#xff0c;.ryabina勒索病毒是其中之一。本文将介绍.ryabina勒索病毒的特点、数据恢复方法和预防措施&#xff0c;以帮助用户更好地应对这一威胁。当面对被勒索病毒攻击导致的数据文件…...

上网行为监控软件能够看到聊天内容吗

随着信息技术的不断发展&#xff0c;上网行为监控软件在企业网络安全管理中扮演着越来越重要的角色。 这类软件主要用于监控员工的上网行为&#xff0c;以确保工作效率和网络安全。 而在这其中&#xff0c;域智盾软件作为一款知名的上网行为监控软件&#xff0c;其功能和使用…...

Java知识点一

hello&#xff0c;大家好&#xff01;我们今天开启Java语言的学习之路&#xff0c;与C语言的学习内容有些许异同&#xff0c;今天我们来简单了解一下Java的基础知识。 一、数据类型 分两种&#xff1a;基本数据类型 引用数据类型 &#xff08;1&#xff09;整型 八种基本数…...

Django学习笔记-forms使用

1.创建forms.py文件,导入包 from django import forms from django.forms import fields from django.forms import widgets2. 创建EmployeeForm,继承forms.Form 3.创建testform.html文件 4.urls.py添加路由 5.views中导入forms 创建testform,编写代码 1).如果请求方式为GET,…...

BM100 设计LRU缓存结构(java实现)

一、题目 设计LRU(最近最少使用)缓存结构&#xff0c;该结构在构造时确定大小&#xff0c;假设大小为 capacity &#xff0c;操作次数是 n &#xff0c;并有如下功能: Solution(int capacity) 以正整数作为容量 capacity 初始化 LRU 缓存get(key)&#xff1a;如果关键字 key …...

深度学习在微纳光子学中的应用

深度学习在微纳光子学中的主要应用方向 深度学习与微纳光子学的结合主要集中在以下几个方向&#xff1a; 逆向设计 通过神经网络快速预测微纳结构的光学响应&#xff0c;替代传统耗时的数值模拟方法。例如设计超表面、光子晶体等结构。 特征提取与优化 从复杂的光学数据中自…...

Linux链表操作全解析

Linux C语言链表深度解析与实战技巧 一、链表基础概念与内核链表优势1.1 为什么使用链表&#xff1f;1.2 Linux 内核链表与用户态链表的区别 二、内核链表结构与宏解析常用宏/函数 三、内核链表的优点四、用户态链表示例五、双向循环链表在内核中的实现优势5.1 插入效率5.2 安全…...

Oracle查询表空间大小

1 查询数据库中所有的表空间以及表空间所占空间的大小 SELECTtablespace_name,sum( bytes ) / 1024 / 1024 FROMdba_data_files GROUP BYtablespace_name; 2 Oracle查询表空间大小及每个表所占空间的大小 SELECTtablespace_name,file_id,file_name,round( bytes / ( 1024 …...

理解 MCP 工作流:使用 Ollama 和 LangChain 构建本地 MCP 客户端

&#x1f31f; 什么是 MCP&#xff1f; 模型控制协议 (MCP) 是一种创新的协议&#xff0c;旨在无缝连接 AI 模型与应用程序。 MCP 是一个开源协议&#xff0c;它标准化了我们的 LLM 应用程序连接所需工具和数据源并与之协作的方式。 可以把它想象成你的 AI 模型 和想要使用它…...

【配置 YOLOX 用于按目录分类的图片数据集】

现在的图标点选越来越多&#xff0c;如何一步解决&#xff0c;采用 YOLOX 目标检测模式则可以轻松解决 要在 YOLOX 中使用按目录分类的图片数据集&#xff08;每个目录代表一个类别&#xff0c;目录下是该类别的所有图片&#xff09;&#xff0c;你需要进行以下配置步骤&#x…...

相机Camera日志分析之三十一:高通Camx HAL十种流程基础分析关键字汇总(后续持续更新中)

【关注我,后续持续新增专题博文,谢谢!!!】 上一篇我们讲了:有对最普通的场景进行各个日志注释讲解,但相机场景太多,日志差异也巨大。后面将展示各种场景下的日志。 通过notepad++打开场景下的日志,通过下列分类关键字搜索,即可清晰的分析不同场景的相机运行流程差异…...

Unit 1 深度强化学习简介

Deep RL Course ——Unit 1 Introduction 从理论和实践层面深入学习深度强化学习。学会使用知名的深度强化学习库&#xff0c;例如 Stable Baselines3、RL Baselines3 Zoo、Sample Factory 和 CleanRL。在独特的环境中训练智能体&#xff0c;比如 SnowballFight、Huggy the Do…...

多种风格导航菜单 HTML 实现(附源码)

下面我将为您展示 6 种不同风格的导航菜单实现&#xff0c;每种都包含完整 HTML、CSS 和 JavaScript 代码。 1. 简约水平导航栏 <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><meta name"viewport&qu…...

图表类系列各种样式PPT模版分享

图标图表系列PPT模版&#xff0c;柱状图PPT模版&#xff0c;线状图PPT模版&#xff0c;折线图PPT模版&#xff0c;饼状图PPT模版&#xff0c;雷达图PPT模版&#xff0c;树状图PPT模版 图表类系列各种样式PPT模版分享&#xff1a;图表系列PPT模板https://pan.quark.cn/s/20d40aa…...

【从零学习JVM|第三篇】类的生命周期(高频面试题)

前言&#xff1a; 在Java编程中&#xff0c;类的生命周期是指类从被加载到内存中开始&#xff0c;到被卸载出内存为止的整个过程。了解类的生命周期对于理解Java程序的运行机制以及性能优化非常重要。本文会深入探寻类的生命周期&#xff0c;让读者对此有深刻印象。 目录 ​…...