基于Flink SQL的实时指标多维分析模型
数据流程介绍
1.创建源表kafka接入消息队列数据,定义字段映射规则;
2.创建目标表es_sink配置Elasticsearch输出;
3.通过多级视图(tmp→tmp_dedup→tmp1/tmp2→tmp3→tmp_groupby)实现数据清洗、去重、状态计算;
4.使用ROLLUP进行多维聚合统计;
5.最终计算结果写入ES,包含成功率等衍生指标。

Flink SQL 逻辑
SET table.exec.state.ttl=2592000s; --30 days,默认: 0 ms
--MiniBatch 聚合
SET table.exec.mini-batch.enabled = true;
SET table.exec.mini-batch.allow-latency = 1s;
SET table.exec.mini-batch.size = 10000;
--Local-Global 聚合
SET table.optimizer.agg-phase-strategy = TWO_PHASE;
-- 单位:ms, 10天
--SET table.exec.state.ttl = 864000000CREATE TABLE kafkaTable (mid bigint,db string,sch string,tab string,opt string,ts bigint,ddl string,err string,src map<string,string>,cur map<string,string>,cus map<string,string>,id AS IF(cur['id'] IS NOT NULL , cur['id'], src ['id']),task_id AS IF(cur['task_id'] IS NOT NULL , cur['task_id'], src ['task_id']),account_id AS IF(cur['account_id'] IS NOT NULL , cur['account_id'], src ['account_id']),publish_time AS IF(cur['publish_time'] IS NOT NULL , cur['publish_time'], src ['publish_time']),msg_status AS IF(cur['msg_status'] IS NOT NULL , cur['msg_status'], src ['msg_status']),send_type AS IF(cur['send_type'] IS NOT NULL , cur['send_type'], src ['send_type']),retry_status AS IF(cur['retry_status'] IS NOT NULL , cur['retry_status'], src ['retry_status']),update_time as IF(cur['update_time'] IS NOT NULL , cur['update_time'], src ['update_time']),event_time as cast(IF(cur['update_time'] IS NOT NULL , cur['update_time'], src ['update_time']) AS TIMESTAMP(3)), -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)proctime AS PROCTIME()
-- WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE --SECOND
) WITH ('connector' = 'kafka','topic' = 'xxx','jdq.client.id' = 'xxx','jdq.password' = 'xxx','jdq.domain' = 'xxx','scan.startup.mode' = 'group-offsets', -- default: group-offsets,other: latest-offset,earliest-offset-- 'properties.enable.auto.commit',= 'true' -- default:false, 如果为false,则在发生checkpoint时触发offset提交'format' = 'binlog');CREATE TABLE es_sink(send_type STRING,task_id STRING,month_dim STRING,day_dim STRING,grouping_id INTEGER,init INTEGER,cancel INTEGER,succ INTEGER,fail INTEGER,cancel_rate float,succ_rate float,fail_rate float,update_date STRING,PRIMARY KEY (grouping_id,send_type,month_dim,day_dim,task_id) NOT ENFORCED
)with ('connector' = 'elasticsearch-6','index' = 'index01','document-type' = 'type01','hosts' = 'xx','format' = 'json','filter.null-value'='true','sink.bulk-flush.max-actions' = '1000','sink.bulk-flush.max-size' = '10mb');
-- 维度:
-- - send_type, 发送类型
-- - month_dim,月份维度
-- - day_dim,天维度
-- - task_id,任务IDCREATE view tmp as
selectsend_type,task_id,publish_time,msg_status,case when UPPER(opt) = 'INSERT' and msg_status='0' then 1 else 0 end AS init,case when UPPER(opt) = 'UPDATE' and msg_status='4' then 1 else 0 end AS cancel,case when UPPER(opt) = 'UPDATE' and msg_status='1' then 1 else 0 end AS succ,case when UPPER(opt) = 'UPDATE' and msg_status='2' then 1 else 0 end AS fail,update_time,opt,ts,id,proctime,SUBSTRING(publish_time,1,7) as month_dim,SUBSTRING(publish_time,1,10) as day_dim
FROM kafkaTable
where trim(retry_status) = '0'and publish_time >= '2025-01-01 00:00:00'and( (UPPER(opt) = 'INSERT' and msg_status='0' and position( '_R' in task_id) = 0)or (UPPER(opt) = 'UPDATE' and msg_status in ('1','2','3','4') and position( '_R' in task_id) = 0)or (UPPER(opt) = 'UPDATE' and msg_status='1' and position( '_R' in task_id) > 0));--去重模式,去重是指对在列的集合内重复的行进行删除,只保留第一行或最后一行数据。在聚合sum或count时,Flink回撤流会对数据进行回撤处理
create view tmp_dedup as
select * from(select *,row_number() over(partition by id,msg_status order by proctime desc) as rnfrom tmp) t
where rn=1;CREATE view tmp1 as
selectsend_type,task_id,month_dim,day_dim,init,case when cancel = 1 and update_time <= publish_time then 1 else 0 end AS cancel,succ,case when cancel = 1 and update_time > publish_time then 1 else fail end AS fail,update_time
from tmp_dedup
where position( '_R' in task_id) = 0;CREATE view tmp2 as
selectsend_type,SPLIT_INDEX(task_id,'_R',0) AS task_id,month_dim,day_dim,init,cancel,succ,-1 AS fail,update_time
from tmp_dedup
where position( '_R' in task_id) > 0
and succ = 1 ;CREATE view tmp3 as
selectsend_type,task_id,month_dim,day_dim,init,cancel,succ,fail
from tmp1
UNION ALL
selectsend_type,task_id,month_dim,day_dim,init,cancel,succ,fail
from tmp2;CREATE view tmp_groupby as
select
--/*+ STATE_TTL('tmp' = '10d') */COALESCE(send_type,'N') AS send_type,COALESCE(month_dim,'N') AS month_dim,COALESCE(day_dim,'N') AS day_dim,COALESCE(task_id,'N') AS task_id,case when send_type is null and month_dim is null and day_dim is null and task_id is null then 1when send_type is not null and month_dim is null and day_dim is null and task_id is null then 2when send_type is not null and month_dim is not null and day_dim is null and task_id is null then 3when send_type is not null and month_dim is not null and day_dim is not null and task_id is null then 4when send_type is not null and month_dim is not null and day_dim is not null and task_id is not null then 5end grouping_id,sum(init) as init,sum(cancel) as cancel,sum(succ) as succ,sum(fail) as fail
from tmp3
--GROUP BY GROUPING SETS ((send_type,account_id,publish_time), (send_type,account_id),(send_type), ())
GROUP BY ROLLUP (send_type,month_dim,day_dim,task_id); --等同于以上INSERT INTO es_sink
selectcase when trim(send_type) = '1' then '发送类型1'when trim(send_type) = '2' then '发送类型2'else send_type end AS send_type,task_id,month_dim,day_dim,grouping_id,init,cancel,succ,fail,ROUND(cancel*100.0/init,2) AS cancel_rate,ROUND(succ*100.0/(init - cancel),2) AS succ_rate,ROUND(fail*100.0/(init - cancel),2) AS fail_rate,CAST(LOCALTIMESTAMP AS STRING) as update_date
from tmp_groupby
where init > 0
and (init - cancel) > 0;
es mapping
#POST index01/type01/_mapping
{"type01": {"properties": {"grouping_id": {"type": "byte"},"send_type": {"type": "keyword","ignore_above": 256},"month_dim": {"type": "keyword","fields": {"text": {"type": "keyword"},"date": {"type": "date","format": "yyyy-MM","ignore_malformed":"true" --忽略错误的各式}}},"day_dim": {"type": "keyword","fields": {"text": {"type": "keyword"},"date": {"type": "date","format": "yyyy-MM-dd","ignore_malformed":"true"}}},"task_id": {"type": "keyword"},"init": {"type": "integer"},"cancel": {"type": "integer"},"succ": {"type": "integer"},"fail": {"type": "integer"},"cancel_rate": {"type": "float"},"succ_rate": {"type": "float"},"fail_rate": {"type": "float"},"update_date": {"type": "date","format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"}}}
}
相关文章:
基于Flink SQL的实时指标多维分析模型
数据流程介绍 1.创建源表kafka接入消息队列数据,定义字段映射规则; 2.创建目标表es_sink配置Elasticsearch输出; 3.通过多级视图(tmp→tmp_dedup→tmp1/tmp2→tmp3→tmp_groupby)实现数据清洗、去重、状态计算&#x…...
算法刷题整理合集(一)
算法刷题整理合集(一) 本篇博客旨在记录自已的算法刷题练习成长,里面注有详细的代码注释以及和个人的思路想法,希望可以给同道之人些许帮助。本人也是算法小白,水平有限,如果文章中有什么错误或遗漏之处&am…...
C++ STL—— String库
在C编程中,字符串操作是几乎每个项目都会涉及的基础功能。C标准模板库(STL)中的string类为我们提供了强大而灵活的工具,使得字符串的处理变得简单高效。无论是字符串的创建、修改、查找,还是复杂的文本处理,…...
【从零开始学习计算机科学】数据库系统(二)关系数据库 与 关系代数
【从零开始学习计算机科学】数据库系统(二)关系数据库 与 关系代数 关系数据库结构化查询语言SQL数据定义语言(DDL)数据查询语言(Data Query Language, DQL)数据操纵语言(Data Manipulation Language, DML)数据控制语言(Data Control Language, DCL)关系型数据库的优…...
DoS攻击防范
一、网络架构优化 使用CDN或反向代理 通过内容分发网络(CDN)或反向代理(如Nginx)分散流量,将请求分发到多个服务器节点,减轻单点压力,同时过滤异常请求。 负载均衡技术 部署负载均衡设备&#…...
Linux驱动开发实战(四):设备树点RGB灯
Linux驱动开发实战(四):设备树点RGB灯 文章目录 Linux驱动开发实战(四):设备树点RGB灯前言一、驱动实现1.1 驱动设计思路1.2 关键数据结构1.3 字符设备操作函数1.4 平台驱动探测函数1.5 匹配表和平台驱动结…...
vue中,watch里,this为undefined的两种解决办法
提示:vue中,watch里,this为undefined的两种解决办法 文章目录 [TOC](文章目录) 前言一、问题二、方法1——使用function函数代替箭头函数()>{}三、方法2——使用that总结 前言 尽量使用方法1——使用function函数代替箭头函数()…...
设计模式C++
针对一些经典的常见的场景, 给定了一些对应的解决方案,这个就叫设计模式。 设计模式的作用:使代码的可重用性高,可读性强,灵活性好,可维护性强。 设计原则: 单一职责原则:一个类只做一方面的…...
前端构建工具进化论:从Grunt到Turbopack的十年征程
前端构建工具进化论:从Grunt到Turbopack的十年征程 一、石器时代:任务自动化工具(2012-2014) 1.1 Grunt:首个主流构建工具 // Gruntfile.js 典型配置 module.exports function(grunt) {grunt.initConfig({concat: {…...
设备预测性维护:企业降本增效的关键密码
在当今竞争激烈的商业战场中,企业犹如一艘在波涛汹涌大海上航行的巨轮,要想乘风破浪、稳步前行,降本增效便是那至关重要的 “船锚”,帮助企业在复杂的市场环境中站稳脚跟。而设备预测性维护,正是开启企业降本增效大门的…...
css基本功
为什么 ::first-letter 是伪元素? ::first-letter 的作用是选择并样式化元素的第一个字母,它创建了一个虚拟的元素来包裹这个字母,因此属于伪元素。 grid布局 案例一 <!DOCTYPE html> <html lang"zh-CN"><head&…...
信号处理抽取多项滤波的数学推导与仿真
昨天的《信号处理之插值、抽取与多项滤波》,已经介绍了插值抽取的多项滤率,今天详细介绍多项滤波的数学推导,并附上实战仿真代码。 一、数学变换推导 1. 多相分解的核心思想 将FIR滤波器的系数 h ( n ) h(n) h(n)按相位分组,每…...
C++双端队列知识点+习题
在C中,双端队列(Deque,发音为“deck”)是标准模板库(STL)中的一种容器适配器,其全称为Double-Ended Queue。它结合了队列和栈的特点,允许在容器的两端(前端和后端&#x…...
【递归、搜索和回溯算法】专题二 :二叉树中的深搜
二叉树中的深搜 深度优先遍历(DFS):一种沿着树或图的深度遍历节点的算法,尽可能深地搜索树或图的分支,如果一条路径上的所有结点都被遍历完毕,就会回溯到上一层,继续找一条路遍历。 在二叉树中…...
Vue3计算属性深度解析:经典场景与Vue2对比
一、计算属性的核心价值 计算属性(Computed Properties)是Vue响应式系统的核心特性之一,它通过依赖追踪和缓存机制优雅地解决模板中复杂逻辑的问题。当我们需要基于现有响应式数据进行派生计算时,计算属性总能保持高效的性能表现…...
UE5与U3D引擎对比分析
Unreal Engine 5(UE5)和Unity 3D(U3D)是两款主流的游戏引擎,适用于不同类型的项目开发。以下是它们的主要区别,分点整理: 1. 核心定位 UE5: 主打3A级高画质项目(如主机/P…...
【vue3学习笔记】(第150-151节)computed计算属性;watch监视ref定义的数据
尚硅谷Vue2.0Vue3.0全套教程丨vuejs从入门到精通 本篇内容对应课程第150-151节 课程 P150节 《computed计算属性》笔记 写一个简单的 姓、名输入框效果: 用vue2的形式定义一个计算属性 fullName: 测试页面展示无问题: 但是,在vue…...
JavaScript如何实现复制图片功能?
最近开发中遇到一个需求,就是用户希望能通过直接点击按钮复制图片,然后就可以很方便的把图片发送到班群中,于是就有了复制图片的需求。 那么如何通过JavaScript来实现复制图片呢? 一、前置知识:如何实现复制…...
MySQL 8 设置允许远程连接(Windows环境)
🌟 MySQL 8 设置允许远程连接(Windows环境) 在开发和部署应用时,经常需要从远程主机连接到MySQL数据库。默认情况下,MySQL仅允许本地连接,因此需要进行一些配置才能允许远程访问。今天,我将详细…...
我又又又又又又更新了~~纯手工编写C++画图,有注释~~~
再再再次感谢Ttcofee提的问题 本次更新内容: 鼠标图案(切换),版本号获取,输入框复制剪切板 提前申明:如果运行不了,请到主页查看RedpandaDevc下载,若还是不行就卸了重装。 版本号&…...
Python控制语句——循环语句-for
1.下面的语句哪个会无限循环下去()。 A、 for a in range(10): time.sleep(10) B、 while 1<10: time.sleep(10) C、 while True: break D、 a = [3,-1,2] for i in a: if i==-1: break 答案:B。1<10始终为True,循环体中又没有break的条件,故B会无限循环。 2.for s i…...
全面解析:将采购入库单数据集成到MySQL的技术实施
旺店通旗舰版-采购入库单集成到MySQL的技术案例分享 在数据驱动的业务环境中,如何高效、准确地实现系统间的数据对接是企业面临的重要挑战。本文将聚焦于一个具体的系统对接集成案例:将旺店通旗舰奇门平台上的采购入库单数据集成到MySQL数据库中&#x…...
12. Pandas :使用pandas读Excel文件的常用方法
一 read_excel 函数 其他参数根据实际需要进行查找。 1.接受一个工作表 在 11 案例用到的 Excel 工作簿中,数据是从第一张工作表的 A1 单元格开始的。但在实际场景中, Excel 文件可能并没有这么规整。所以 panda 提供了一些参数来优化读取过程。 比如 s…...
记录致远OA服务器硬盘升级过程
前言 日常使用中OA系统突然卡死,刷新访问进不去系统,ping服务器地址正常,立马登录服务器检查,一看磁盘爆了。 我大脑直接萎缩了,谁家OA系统配400G的空间啊,过我手的服务器没有50也是30台,还是…...
Java网络多线程
网络相关概念: 关于访问: IP端口 因为一个主机上可能有多个服务, 一个服务监听一个端口,当你访问的时候主机通过端口号就能知道要和哪个端口发生通讯.因此一个主机上不能有两个及以上的服务监听同一个端口. 协议简单来说就是数据的组织形式 好像是两个人交流一样,要保证自己说…...
【H2O2 | 软件开发】Axios发送Http请求
目录 前言 开篇语 准备工作 正文 概念 封装工具包 示例 结束语 前言 开篇语 本系列为短篇,每次讲述少量知识点,无需一次性灌输太多的新知识点。该主题文章主要是围绕前端、全栈开发相关面试常见问题撰写的,希望对诸位有所帮助。 如…...
VScode 运行LVGL
下载vscode解压 环境安装 安装mingw64,gcc 版本必须8.3以上 安装cmak 系统环境变量Path中添加(以实际安装目录为准) C:\Program Files\mingw64\bin C:\Program Files\CMake\bin 将GUI-Guider生成的代码目录拷贝一份放到vscode项目目录…...
AIP-165 按条件删除
编号165原文链接https://google.aip.dev/165状态批准创建日期2019-12-18更新日期2019-12-18 有时API需要提供一种机制,按照一些过滤参数删除大量资源,而非提供待删除的各资源名字。 这是一个稀有的场景,用于用户一次性删除数千或更多资源的…...
React Next项目中导入Echart世界航线图 并配置中文
公司业务要求做世界航线图,跑了三个ai未果,主要是引入world.json失败,echart包中并不携带该文件,源码的world.json文件页面404找不到。需要自己寻找。这是整个问题卡壳的关键点,特此贴出资源网址。 目录 一、安装 二…...
QT与网页显示数据公式的方法
一.网页中显示数学公式通常有三种主要方法 1.图片方式 原理:将公式转换为图片(如 PNG、SVG),通过 <img> 标签嵌入网页。 实现步骤: 使用工具(如 LaTeX dvipng、在线生成工具)将公式渲…...
