RisingWave最佳实践-利用Dynamic filters 和 Temporal filters 实现监控告警
心得的体会
刚过了年刚开工,闲暇之余调研了分布式SQL流处理数据库–RisingWave,本人是Flink(包括FlinkSQL和Flink DataStream API)的资深用户,但接触到RisingWave令我眼前一亮,并且拿我们生产上的监控告警场景在RisingWave上做了验证,以下是自己的心得体会:
RisingWave架构简单,运维成本底,基于云原生(可以分别基于计算和存储动态伸缩),同时在开发上屏蔽了Flink等实时处理框架底层需要处理的一些技术细节(状态存储,数据一致性,分布式集群扩展等)。提供了与PostgreSQL兼容的标准SQL接口,用户可以像使用 PostgreSQL 一样处理数据流。并且RisingWave不单单可以处理流式数据,还提供了数据其他流式处理框架(如:Flink、storm)所不具备的数据存储能力,基本可以完全取代FlinkSQL。相对于其他OLAP系统(如:apache doris,starrocks),RisingWave采用同步实时,可以保证实时的新鲜度;强一致性,而不是最终一致性。用户需要做的仅仅是通过开发SQL就可以处理流数据,当然首先需要具备流式数据处理思维(相对于离线)。
RisingWave当然也有自身的不足,相对于Flink可以通过DataStream API自定义灵活的处理流式数据,RisingWave只能解决一些特定的流式场景,无法做太多定制开发;相对于Apache Doris等OLAP实时分析性数据库,RisingWave不适合做分析型随机查询。另外RisingWave是个新事物,正在发展阶段,周边生态和相关文档还不健全,作为尝鲜者可能会踩很多坑。然而令人欣慰的是RisingWave的社区回复还是很及时的,RisingWave官方投入了很多精力在做RisingWave的布道和答疑。
至于争论比较厉害的RisingWav VS FLink的性能和吞吐量上孰优孰劣,针对不同应用场景可能有不同表现,因此没有亲自调研就没有发言权。但我认为在不同的场景下他们应该有各自的优势。无论如何RisingWave部署简单,上手容易,试错成本低是一个不争的事实。RisingWave可以应用在一些数据看版,监控,实时指标等场景。
利用动态和时间过滤器实现监控告警
FlinkSQL解决不了定时触发的问题,FlinkSQL的流处理逻辑只是按event触发,不能按时间条件触发,也就是没有触发器机制。FlinkSQL窗口的定时触发,归根结底也是基于event触发,event驱动的机制。因此需要触发器的场景就需要用到Flink DataStream API的KeyedProcessFunction等算子。但RisingWave利用Dynamic filters 和 Temporal filters 可以间接实现类似场景的触发器机制。
场景描述
现有如下群消息实时指标监控场景:
数据有初始化(init)、查询(query)、回调(callback:succ+fail)三种先后顺序状态。
数据是按预设时间批次分组的,例如:2024-01-01 08:00:00、2024-01-01 08:30:00,实时统计每一个批次内三种不同状态的数据count。
监控指标一:在某一个批次延迟指定的时间(query_timeout)之内(例如:2024-01-01 08:00:00延迟1小时触发时间为系统时间2024-01-01 09:00:00),该批次的query状态数据count没有达到init状态的数量count阀值(即query_count<init_count*query_threshold)就触发告警。
同时结束该批次数据统计,下发该批次数据的指标包括:批次时间、init_count、query_count等
监控指标二:如果指标一告警没有被触发,该批次在满足query状态数据count达到init状态数量count的阀值(即query_count>=init_countquery_threshold)以后,在指定的延迟时间内(callback_timeout),该批次的callback状态数据count没有达到query状态的数量count阀值(即callback_count<query_countcallback_threshold)就触发告警。
同时结束该批次数据统计,下发该批次数据的指标包括:批次时间、init_count、query_count、callback_count等
群消息实时指标监控流程图如下:

实例demo
RisingWave部署可以参考:RisingWave分布式SQL流处理数据库调研
假设:
query_threshold=1, callback_count=1
query_timeout= ‘5 minute’, callback_timeout= ‘1 minute’
0->init,1->query,2->callback
RisingWave SQL:
DROP TABLE t_msg;
CREATE TABLE t_msg(msg_id int,status smallint ,public_time timestamp,process_time timestamp as proctime()
) APPEND ONLY;set timezone = 'PRC';--PRC(People’s Republic of China)
show timezone;select * from t_msg;--统计不同状态的count
DROP MATERIALIZED VIEW mv_t_msg_groupby;
CREATE MATERIALIZED VIEW mv_t_msg_groupby AS
SELECT public_time
,sum(case when status = 0 then 1 else 0 end) AS init_count
,sum(case when status = 1 then 1 else 0 end) AS query_count
,sum(case when status = 2 then 1 else 0 end) AS callback_count
,max(process_time) as process_time
FROM t_msg
group by public_time;select * from mv_t_msg_groupby;--sink_query_alarm
DROP SINK sink_query_alarm;
CREATE SINK sink_query_alarm AS
SELECT public_time
,init_count
,query_count
,process_time
FROM mv_t_msg_groupby
where public_time + INTERVAL '5 minute' <= now() --query_timeout=1 minute, public_time相当于当前时间延迟1分钟触发,【Delay table changes】
and init_count*1 > query_count --query_threshold=1
WITH (connector='kafka',properties.bootstrap.server='192.168.1.100:8092',topic='t_sink_query_alarm'
)
FORMAT PLAIN ENCODE JSON(force_append_only='true'
);--由于RisingWave不支持在【MATERIALIZED VIEW】和【SINK】等【可伸缩流】中指定处理时间字段,因此需要借助外部存储kafka周转
--RisngWave官方给的解释:support a proctime on an append only stream might be easier but on retractable stream could take extra cost. We must think it carefully to introduce such a feature.--sink_query_succ
DROP SINK sink_query_succ;
CREATE SINK sink_query_succ AS
SELECT public_time
,init_count
,query_count
,callback_count
,process_time as query_succ_process_time
FROM mv_t_msg_groupby
where public_time + INTERVAL '5 minute' >= now() --query_timeout=1 minute, 在指定的时间内,【Delete and clean expired data】
and init_count*1 <= query_count --query_threshold=1,query_count达到了指定值
WITH (connector='kafka',properties.bootstrap.server='192.168.1.100:8092',topic='t_sink_query_succ'
)
FORMAT PLAIN ENCODE JSON(force_append_only='true'
);--source连接器
DROP SOURCE source_query_succ;
CREATE SOURCE IF NOT EXISTS source_query_succ (init_count int,query_count int ,callback_count int ,public_time timestamp,query_succ_process_time timestamp
)
WITH (connector='kafka',topic='t_sink_query_succ',properties.bootstrap.server='192.168.1.100:8092',scan.startup.mode='earliest', -- earliest ,latest,default:earliest
) FORMAT PLAIN ENCODE JSON;select * from source_query_succ;--sink_callback_alarm,用到动态过滤器和时间过滤器
DROP SINK sink_callback_alarm;
CREATE SINK sink_callback_alarm AS
WITH tmp AS (
select public_time, min(query_succ_process_time) as query_succ_process_time -- 动态过滤器
FROM source_query_succ
group by public_time
)
SELECT b.public_time
,b.init_count
,b.query_count
,b.callback_count
,b.process_time
,a.query_succ_process_time
FROM tmp a
JOIN mv_t_msg_groupby b ON a.public_time=b.public_time
where a.query_succ_process_time + INTERVAL '1 minute' <= now() --query_timeout=1 minute, public_time相当于当前时间延迟1分钟触发,【Delay table changes】
and b.query_count*1 > b.callback_count --callback_threshold=1
WITH (connector='kafka',properties.bootstrap.server='192.168.1.100:8092',topic='t_sink_callback_alarm'
)
FORMAT PLAIN ENCODE JSON(force_append_only='true'
);-- 模拟数据
--init
INSERT INTO t_msg values(1,0,'2024-02-23 15:55:00'::TIMESTAMP); --比当前系统时间早
INSERT INTO t_msg values(2,0,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(3,0,'2024-02-23 15:55:00'::TIMESTAMP);
--query
INSERT INTO t_msg values(1,1,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(2,1,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(3,1,'2024-02-23 15:55:00'::TIMESTAMP);
--callback
INSERT INTO t_msg values(1,2,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(2,2,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(3,2,'2024-02-23 15:55:00'::TIMESTAMP);
查看监控结果:
docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t t_sink_query_alarm -C
docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t t_sink_callback_alarm -C
相关文章:
RisingWave最佳实践-利用Dynamic filters 和 Temporal filters 实现监控告警
心得的体会 刚过了年刚开工,闲暇之余调研了分布式SQL流处理数据库–RisingWave,本人是Flink(包括FlinkSQL和Flink DataStream API)的资深用户,但接触到RisingWave令我眼前一亮,并且拿我们生产上的监控告警…...
【Qt学习】QRadioButton 的介绍与使用(性别选择、模拟点餐)
文章目录 介绍实例使用实例1(性别选择 - 单选 隐藏)实例2(模拟点餐,多组单选) 相关资源文件 介绍 这里简单对QRadioButton类 进行介绍: QRadioButton 继承自 QAbstractButton ,用于创建单选按…...
基于java springboot的图书管理系统设计和实现
基于java springboot的图书管理系统设计和实现 博主介绍:5年java开发经验,专注Java开发、定制、远程、文档编写指导等,csdn特邀作者、专注于Java技术领域 作者主页 央顺技术团队 Java毕设项目精品实战案例《1000套》 欢迎点赞 收藏 ⭐留言 文末获取源码联…...
自定义类型:联合和枚举
目录 1. 联合体 1.1 联合体类型的声明及特点 1.2 相同成员的结构体和联合体对比 1.3 联合体大小的计算 1.4 联合体的应用举例 2. 枚举类型 2.1 枚举类型的声明 2.2 枚举类型的优点 1. 联合体 1.1 联合体类型的声明及特点 像结构体一样,联合体也是由一个或…...
每日一学—由面试题“Redis 是否为单线程”引发的思考
文章目录 📋 前言🌰 举个例子🎯 什么是 Redis(知识点补充)🎯 Redis 中的多线程🎯 I/O 多线程🎯 Redis 中的多进程📝 结论🎯书籍推荐🔥参与方式 &a…...
chatGPT PLUS 绑卡提示信用卡被拒的解决办法
chatGPT PLUS 绑卡提示信用卡被拒的解决办法 一、 ChatGPT Plus介绍 作为人工智能领域的一项重要革新,ChatGPT Plus的上线引起了众多用户的关注,其背后的OpenAI表现出傲娇的态度,被誉为下一个GTP 4.0。总的来说,ChatGPT Plus的火…...
opencv鼠标操作与响应
//鼠标事件 Point sp(-1, -1); Point ep(-1, -1); Mat temp; static void on_draw(int event, int x, int y, int flags, void *userdata) {Mat image *((Mat*)userdata);if (event EVENT_LBUTTONDOWN) {sp.x x;sp.y y;std::cout << "start point:"<<…...
vue里echarts的使用:画饼图和面积折线图
vue里echarts的使用,我们要先安装echarts,然后在main.js里引入: //命令安装echarts npm i echarts//main.js里引入挂载到原型上 import echarts from echarts Vue.prototype.$echarts = echarts最终我们实现的效果如下: 头部标题这里我们封装了一个全局公共组件common-he…...
个人建站前端篇(六)插件unplugin-auto-import的使用
vue3日常项目中定义变量需要引入ref,reactive等等比较麻烦,可以通过unplugin-auto-import给我们自动引入 * unplugin-auto-import 解决了vue3-hook、vue-router、useVue等多个插件的自动导入,也支持自定义插件的自动导入,是一个功能强大的typ…...
【Python】 剪辑法欠采样 CNN压缩近邻法欠采样
借鉴:关于K近邻(KNN),看这一篇就够了!算法原理,kd树,球树,KNN解决样本不平衡,剪辑法,压缩近邻法 - 知乎 但是不要看他里面的代码,因为作者把代码…...
springmvc+ssm+springboot房屋中介服务平台的设计与实现 i174z
本论文拟采用计算机技术设计并开发的房屋中介服务平台,主要是为用户提供服务。使得用户可以在系统上查看房屋出租、房屋出售、房屋求购、房屋求租,管理员对信息进行统一管理,与此同时可以筛选出符合的信息,给笔者提供更符合实际的…...
挑战30天学完Python:Day19 文件处理
📘 Day 19 🎉 本系列为Python基础学习,原稿来源于 30-Days-Of-Python 英文项目,大奇主要是对其本地化翻译、逐条验证和补充,想通过30天完成正儿八经的系统化实践。此系列适合零基础同学,或仅了解Python一点…...
Spring Boot application.properties和application.yml文件的配置
在Spring Boot中,application.properties 和 application.yml 文件用于配置应用程序的各个方面,如服务器端口、数据库连接、日志级别等。这两个文件是Spring Boot的配置文件,位于 src/main/resources 目录下。 application.properties 示例 …...
Unity单元测试
Unity单元测试是一个专门用于嵌入式单元测试的库, 现在简单讲下移植以及代码结构. 源码地址: GitHub - ThrowTheSwitch/Unity: Simple Unit Testing for C 1.我们只需要移植三个文件即可: unity.c, unity.h, unity_internals.h 2.然后添加需要测试的函数. 3.在main.c中添加…...
Spring Bean 的生命周期了解么?
Spring Bean 的生命周期基本流程 一个Spring的Bean从出生到销毁的全过程就是他的整个生命周期, 整个生命周期可以大致分为3个大的阶段 : 创建 使用 销毁 还可以分为5个小步骤 : 实例化(Bean的创建) , 初始化赋值, 注册Destruction回调 , Bean的正常使用 以及 Bean的销毁 …...
.ryabina勒索病毒数据怎么处理|数据解密恢复
导言: 随着网络安全威胁的不断增加,勒索软件已成为严重的威胁之一,.ryabina勒索病毒是其中之一。本文将介绍.ryabina勒索病毒的特点、数据恢复方法和预防措施,以帮助用户更好地应对这一威胁。当面对被勒索病毒攻击导致的数据文件…...
上网行为监控软件能够看到聊天内容吗
随着信息技术的不断发展,上网行为监控软件在企业网络安全管理中扮演着越来越重要的角色。 这类软件主要用于监控员工的上网行为,以确保工作效率和网络安全。 而在这其中,域智盾软件作为一款知名的上网行为监控软件,其功能和使用…...
Java知识点一
hello,大家好!我们今天开启Java语言的学习之路,与C语言的学习内容有些许异同,今天我们来简单了解一下Java的基础知识。 一、数据类型 分两种:基本数据类型 引用数据类型 (1)整型 八种基本数…...
Django学习笔记-forms使用
1.创建forms.py文件,导入包 from django import forms from django.forms import fields from django.forms import widgets2. 创建EmployeeForm,继承forms.Form 3.创建testform.html文件 4.urls.py添加路由 5.views中导入forms 创建testform,编写代码 1).如果请求方式为GET,…...
BM100 设计LRU缓存结构(java实现)
一、题目 设计LRU(最近最少使用)缓存结构,该结构在构造时确定大小,假设大小为 capacity ,操作次数是 n ,并有如下功能: Solution(int capacity) 以正整数作为容量 capacity 初始化 LRU 缓存get(key):如果关键字 key …...
STM32标准库-DMA直接存储器存取
文章目录 一、DMA1.1简介1.2存储器映像1.3DMA框图1.4DMA基本结构1.5DMA请求1.6数据宽度与对齐1.7数据转运DMA1.8ADC扫描模式DMA 二、数据转运DMA2.1接线图2.2代码2.3相关API 一、DMA 1.1简介 DMA(Direct Memory Access)直接存储器存取 DMA可以提供外设…...
从零实现STL哈希容器:unordered_map/unordered_set封装详解
本篇文章是对C学习的STL哈希容器自主实现部分的学习分享 希望也能为你带来些帮助~ 那咱们废话不多说,直接开始吧! 一、源码结构分析 1. SGISTL30实现剖析 // hash_set核心结构 template <class Value, class HashFcn, ...> class hash_set {ty…...
C++.OpenGL (10/64)基础光照(Basic Lighting)
基础光照(Basic Lighting) 冯氏光照模型(Phong Lighting Model) #mermaid-svg-GLdskXwWINxNGHso {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-GLdskXwWINxNGHso .error-icon{fill:#552222;}#mermaid-svg-GLd…...
Web 架构之 CDN 加速原理与落地实践
文章目录 一、思维导图二、正文内容(一)CDN 基础概念1. 定义2. 组成部分 (二)CDN 加速原理1. 请求路由2. 内容缓存3. 内容更新 (三)CDN 落地实践1. 选择 CDN 服务商2. 配置 CDN3. 集成到 Web 架构 …...
深度学习习题2
1.如果增加神经网络的宽度,精确度会增加到一个特定阈值后,便开始降低。造成这一现象的可能原因是什么? A、即使增加卷积核的数量,只有少部分的核会被用作预测 B、当卷积核数量增加时,神经网络的预测能力会降低 C、当卷…...
【生成模型】视频生成论文调研
工作清单 上游应用方向:控制、速度、时长、高动态、多主体驱动 类型工作基础模型WAN / WAN-VACE / HunyuanVideo控制条件轨迹控制ATI~镜头控制ReCamMaster~多主体驱动Phantom~音频驱动Let Them Talk: Audio-Driven Multi-Person Conversational Video Generation速…...
QT3D学习笔记——圆台、圆锥
类名作用Qt3DWindow3D渲染窗口容器QEntity场景中的实体(对象或容器)QCamera控制观察视角QPointLight点光源QConeMesh圆锥几何网格QTransform控制实体的位置/旋转/缩放QPhongMaterialPhong光照材质(定义颜色、反光等)QFirstPersonC…...
Netty从入门到进阶(二)
二、Netty入门 1. 概述 1.1 Netty是什么 Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients. Netty是一个异步的、基于事件驱动的网络应用框架,用于…...
基于Springboot+Vue的办公管理系统
角色: 管理员、员工 技术: 后端: SpringBoot, Vue2, MySQL, Mybatis-Plus 前端: Vue2, Element-UI, Axios, Echarts, Vue-Router 核心功能: 该办公管理系统是一个综合性的企业内部管理平台,旨在提升企业运营效率和员工管理水…...
【p2p、分布式,区块链笔记 MESH】Bluetooth蓝牙通信 BLE Mesh协议的拓扑结构 定向转发机制
目录 节点的功能承载层(GATT/Adv)局限性: 拓扑关系定向转发机制定向转发意义 CG 节点的功能 节点的功能由节点支持的特性和功能决定。所有节点都能够发送和接收网格消息。节点还可以选择支持一个或多个附加功能,如 Configuration …...
