KDP数据分析实战:从0到1完成数据实时采集处理到可视化
智领云自主研发的开源轻量级Kubernetes数据平台,即Kubernetes Data Platform (简称KDP),能够为用户提供在Kubernetes上的一站式云原生数据集成与开发平台。在最新的v1.1.0版本中,用户可借助 KDP 平台上开箱即用的 Airflow、AirByte、Flink、Kafka、MySQL、ClickHouse、Superset 等开源组件快速搭建实时、半实时或批量采集、处理、分析的数据流水线以及可视化报表展示,可视化展示效果如下:

以下我们将介绍一个实时订单数据流水线从数据采集到数据处理,最后到可视化展示的详细建设流程。
1.流水线设计
借助 KDP 平台的开源组件 Airflow、MySQL、Flink、Kafka、ClickHouse、Superset 完成数据实时采集处理及可视化分析,架构如下:

1.1 数据流
直接使用Flink构建实时数仓,由Flink进行清洗加工转换和聚合汇总,将各层结果集写入Kafka中;
ClickHouse从Kafka分别订阅各层数据,将各层数据持久化到ClickHouse中,用于之后的查询分析。
1.2 数据表
本次分析数据基于mock数据,包含数据实时采集处理及可视化分析:
消费者表:customers
字段 | 字段说明 |
id | 用户ID |
name | 姓名 |
age | 年龄 |
gender | 性别 |
订单表:orders
字段 | 字段说明 |
order_id | 订单ID |
order_revenue | 订单金额 |
order_region | 下单地区 |
customer_id | 用户ID |
create_time | 下单时间 |
1.3 环境说明
在 KDP 页面安装如下组件并完成组件的 QuickStart:
MySQL: 实时数据数据源及 Superset/Airflow 元数据库,安装时需要开启binlog
Kafka: 数据采集sink
Flink: 数据采集及数据处理
ClickHouse: 数据存储
Superset: 数据可视化
Airflow: 作业调度
2. 数据集成与处理
文中使用的账号密码信息请根据实际集群配置进行修改。
2.1 创建MySQL表
2.2 创建 Kafka Topic
进入Kafka broker pod,执行命令创建 Topic,也可以通过Kafka manager 页面创建,以下为进入pod并通过命令行创建的示例:
export BOOTSTRAP="kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092" bin/kafka-topics.sh --create \--topic ods-order \--replication-factor 3 \--partitions 10 \--bootstrap-server $BOOTSTRAP bin/kafka-topics.sh --create \--topic ods-customers \--replication-factor 3 \--partitions 10 \--bootstrap-server $BOOTSTRAPbin/kafka-topics.sh --create \--topic dwd-order-customer-valid \--replication-factor 3 \--partitions 10 \--bootstrap-server $BOOTSTRAPbin/kafka-topics.sh --create \--topic dws-agg-by-region \--replication-factor 3 \--partitions 10 \--bootstrap-server $BOOTSTRAP 2.3 创建 ClickHouse 表
进入clickhouse pod,使用`clickhouse-client`执行命令创建表,以下为建表语句:
CREATE DATABASE IF NOT EXISTS kdp_demo;
USE kdp_demo;-- kafka_dwd_order_customer_valid
CREATE TABLE IF NOT EXISTS kdp_demo.dwd_order_customer_valid (order_id Int32,order_revenue Float32,order_region String,create_time DateTime,customer_id Int32,customer_age Float32,customer_name String,customer_gender String
) ENGINE = MergeTree()
ORDER BY order_id;CREATE TABLE kdp_demo.kafka_dwd_order_customer_valid (order_id Int32,order_revenue Float32,order_region String,create_time DateTime,customer_id Int32,customer_age Float32,customer_name String,customer_gender String
) ENGINE = Kafka
SETTINGSkafka_broker_list = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092',kafka_topic_list = 'dwd-order-customer-valid',kafka_group_name = 'clickhouse_group',kafka_format = 'JSONEachRow',kafka_row_delimiter = '\n';CREATE MATERIALIZED VIEW kdp_demo.mv_dwd_order_customer_valid TO kdp_demo.dwd_order_customer_valid AS
SELECTorder_id,order_revenue,order_region,create_time,customer_id,customer_age,customer_name,customer_gender
FROM kdp_demo.kafka_dwd_order_customer_valid;-- kafka_dws_agg_by_region
CREATE TABLE IF NOT EXISTS kdp_demo.dws_agg_by_region (order_region String,order_cnt Int64,order_total_revenue Float32
) ENGINE = ReplacingMergeTree()
ORDER BY order_region;CREATE TABLE kdp_demo.kafka_dws_agg_by_region (order_region String,order_cnt Int64,order_total_revenue Float32
) ENGINE = Kafka
SETTINGSkafka_broker_list = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092',kafka_topic_list = 'dws-agg-by-region',kafka_group_name = 'clickhouse_group',kafka_format = 'JSONEachRow',kafka_row_delimiter = '\n';CREATE MATERIALIZED VIEW kdp_demo.mv_dws_agg_by_region TO kdp_demo.dws_agg_by_region AS
SELECTorder_region,order_cnt,order_total_revenue
FROM kdp_demo.kafka_dws_agg_by_region; 2.4 创建 Flink SQL 作业
2.4.1 SQL部分
CREATE DATABASE IF NOT EXISTS `default_catalog`.`kdp_demo`;-- create source tables
CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`orders_src`(`order_id` INT NOT NULL,`order_revenue` FLOAT NOT NULL,`order_region` STRING NOT NULL,`customer_id` INT NOT NULL,`create_time` TIMESTAMP,PRIMARY KEY(`order_id`) NOT ENFORCED
) with ('connector' = 'mysql-cdc','hostname' = 'kdp-data-mysql','port' = '3306','username' = 'bdos_dba','password' = 'KdpDba!mysql123','database-name' = 'kdp_demo','table-name' = 'orders'
);CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`customers_src` (`id` INT NOT NULL,`age` FLOAT NOT NULL,`name` STRING NOT NULL,`gender` STRING NOT NULL,PRIMARY KEY(`id`) NOT ENFORCED
) with ('connector' = 'mysql-cdc','hostname' = 'kdp-data-mysql','port' = '3306','username' = 'bdos_dba','password' = 'KdpDba!mysql123','database-name' = 'kdp_demo','table-name' = 'customers'
);-- create ods dwd and dws tables
CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`ods_order_table` (`order_id` INT,`order_revenue` FLOAT,`order_region` VARCHAR(40),`customer_id` INT,`create_time` TIMESTAMP,PRIMARY KEY (order_id) NOT ENFORCED
) WITH ('connector' = 'upsert-kafka','topic' = 'ods-order','properties.bootstrap.servers' = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092','key.format' = 'json','value.format' = 'json'
);CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`ods_customers_table` (`customer_id` INT,`customer_age` FLOAT,`customer_name` STRING,`gender` STRING,PRIMARY KEY (customer_id) NOT ENFORCED
) WITH ('connector' = 'upsert-kafka','topic' = 'ods-customers','properties.bootstrap.servers' = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092','key.format' = 'json','value.format' = 'json'
);CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`dwd_order_customer_valid` (`order_id` INT,`order_revenue` FLOAT,`order_region` STRING,`create_time` TIMESTAMP,`customer_id` INT,`customer_age` FLOAT,`customer_name` STRING,`customer_gender` STRING,PRIMARY KEY (order_id) NOT ENFORCED
) WITH ('connector' = 'upsert-kafka','topic' = 'dwd-order-customer-valid','properties.bootstrap.servers' = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092','key.format' = 'json','value.format' = 'json'
);CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`dws_agg_by_region` (`order_region` VARCHAR(40),`order_cnt` BIGINT,`order_total_revenue` FLOAT,PRIMARY KEY (order_region) NOT ENFORCED
) WITH ('connector' = 'upsert-kafka','topic' = 'dws-agg-by-region','properties.bootstrap.servers' = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092','key.format' = 'json','value.format' = 'json'
);USE kdp_demo;
-- EXECUTE STATEMENT SET
-- BEGIN
INSERT INTO ods_order_table SELECT * FROM orders_src;
INSERT INTO ods_customers_table SELECT * FROM customers_src;
INSERT INTOdwd_order_customer_valid
SELECTo.order_id,o.order_revenue,o.order_region,o.create_time,c.id as customer_id,c.age as customer_age,c.name as customer_name,c.gender as customer_gender
FROMcustomers_src cJOIN orders_src o ON c.id = o.customer_id
WHEREc.id <> -1;
INSERT INTOdws_agg_by_region
SELECTorder_region,count(*) as order_cnt,sum(order_revenue) as order_total_revenue
FROMdwd_order_customer_valid
GROUP BYorder_region;
-- END; 2.4.2 使用 StreamPark 创建 Flink SQL 作业
具体使用参考 StreamPark 文档。
maven 依赖:
<dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>3.0.1</version>
</dependency> 2.5 创建 Airflow DAG
2.5.1 DAG 文件部分
import random
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_agodefault_args = {'owner': 'admin','depends_on_past': False,'email_on_failure': False,'email_on_retry': False,'retries': 1,
}dag = DAG('kdp_demo_order_data_insert',description='Insert into orders by using random data',schedule_interval=timedelta(minutes=1),start_date=days_ago(1),catchup=False,tags=['kdp-example'],
)# MySQL connection info
mysql_host = 'kdp-data-mysql'
mysql_db = 'kdp_demo'
mysql_user = 'bdos_dba'
mysql_password = 'KdpDba!mysql123'
mysql_port = '3306'
cities = ["北京", "上海", "广州", "深圳", "成都", "杭州", "重庆", "武汉", "西安", "苏州", "天津", "南京", "郑州","长沙", "东莞", "青岛", "宁波", "沈阳", "昆明", "合肥", "大连", "厦门", "哈尔滨", "福州", "济南", "温州","佛山", "南昌", "长春", "贵阳", "南宁", "金华", "石家庄", "常州", "泉州", "南通", "太原", "徐州", "嘉兴","乌鲁木齐", "惠州", "珠海", "扬州", "兰州", "烟台", "汕头", "潍坊", "保定", "海口"]
city = random.choice(cities)
consumer_id = random.randint(1, 100)
order_revenue = random.randint(1, 100)
# 插入数据的 BashOperator
insert_data_orders = BashOperator(task_id='insert_data_orders',bash_command=f'''mysql -h {mysql_host} -P {mysql_port} -u {mysql_user} -p{mysql_password} {mysql_db} -e "INSERT INTO orders(order_revenue,order_region,customer_id) VALUES({order_revenue},'{city}',{consumer_id});"''',dag=dag,
)
insert_data_orders 2.5.2 DAG 说明及执行
当前Airflow安装时,需要指定可访问的git 仓库地址,因此需要将 Airflow DAG 提交到 Git 仓库中。每分钟向orders表插入一条数据。
2.6 数据验证
使用ClickHouse验证数据:
(1)进入ClickHouse客户端
clickhouse-client
# default pass: ckdba.123 (2)执行查询
SELECT * FROM kdp_demo.dwd_order_customer_valid;
SELECT count(*) FROM kdp_demo.dwd_order_customer_valid; (3)对比验证MySQL中数据是否一致
select count(*) from kdp_demo.orders; 3. 数据可视化
在2.6中数据验证通过后,可以通过Superset进行数据可视化展示。使用账号`admin/admin`登录Superset页面(注意添加本地 Host 解析):http://superset-kdp-data.kdp-e2e.io
3.1 创建图表
导入我们制作好的图表:
下载面板:https://gitee.com/linktime-cloud/example-datasets/raw/main/superset/dashboard_export_20240607T100739.zip
导入面板
(1)选择下载的文件导入

(2)输入 ClickHouse 的用户`default`的默认密码`ckdba.123`:

3.2 效果展示
最终的实时订单数据图表展示如下,随着订单数据的更新,图表中的数据也会实时更新:

快速体验
🚀GitHub项目:
https://github.com/linktimecloud/kubernetes-data-platform
欢迎您参与开源社区的建设🤝
- FIN -

更多精彩推荐
我们开源啦!一键部署免费使用!Kubernetes上直接运行大数据平台!
开源 KDP v1.1.0 版本正式发布,新增数据集成开发应用场景
在 KubeSphere 上快速安装和使用 KDP 云原生数据平台
在 Rancher 上快速安装和使用 KDP 云原生数据平台
相关文章:
KDP数据分析实战:从0到1完成数据实时采集处理到可视化
智领云自主研发的开源轻量级Kubernetes数据平台,即Kubernetes Data Platform (简称KDP),能够为用户提供在Kubernetes上的一站式云原生数据集成与开发平台。在最新的v1.1.0版本中,用户可借助 KDP 平台上开箱即用的 Airflow、AirByte、Flink、K…...
【人工智能】-- 智能机器人
个人主页:欢迎来到 Papicatch的博客 课设专栏 :学生成绩管理系统 专业知识专栏: 专业知识 文章目录 🍉引言 🍉机器人介绍 🍈机器人硬件 🍍机械结构 🍍传感器 🍍控…...
Android广播机制
简介 某个网络的IP范围是192.168.0.XXX,子网 掩码是255.255.255.0,那么这个网络的广播地址就是192.168.0.255。广播数据包会被发送到同一 网络上的所有端口,这样在该网络中的每台主机都将会收到这条广播。为了便于进行系统级别的消息通知&…...
SQL FOREIGN KEY
SQL FOREIGN KEY 简介 SQL(Structured Query Language)是用于管理关系数据库管理系统(RDBMS)的标准编程语言。在SQL中,FOREIGN KEY是一个重要的概念,用于建立和维护数据库中不同表之间的关系。本文将详细介绍SQL FOREIGN KEY的概念、用途、以及如何在SQL中实现和使用FO…...
绘唐3最新版本哪里下载
绘唐3最新版本哪里下载 绘唐最新版本下载地址 推文视频创作设计是一种通过视频和文字的形式来进行推广的方式,可以通过一些专业的工具来进行制作。 以下是一些常用的小说推文视频创作设计工具: 视频剪辑软件:如Adobe Premiere Pro、Fina…...
[ES6] 箭头函数
JavaScript 是一种广泛使用的编程语言,随着其发展和演变,引入了很多新的特性来提高代码的可读性和开发效率。其中一个重要的特性就是 ES6(ECMAScript 2015)中引入的箭头函数(Arrow Function)。箭头函数不仅…...
BiLSTM模型实现
# 本段代码构建类BiLSTM, 完成初始化和网络结构的搭建 # 总共3层: 词嵌入层, 双向LSTM层, 全连接线性层 # 本段代码构建类BiLSTM, 完成初始化和网络结构的搭建 # 总共3层: 词嵌入层, 双向LSTM层, 全连接线性层 import torch import torch.nn as nn# 本函数实现将中文文本映射为…...
linux内核源码学习所需基础
1.面向对象的思想,尤其是oopc的实现方式。 2.设计模式。 这两点需要内核源码学习者不仅要会c和汇编,还要接触一门面向对象的语言,比如c++/java/python等等任意一门都行,起码要了解面向对象的思想。 另外li…...
Java并发编程-AQS详解及案例实战(上篇)
文章目录 AQS概述AQS 的核心概念AQS 的工作原理AQS 的灵活性使用场景使用指南使用示例AQS的本质:为啥叫做异步队列同步器AQS的核心机制“异步队列”的含义“同步器”的含义总结加锁失败的时候如何借助AQS异步入队阻塞等待AQS的锁队列加锁失败时的处理流程异步入队的机制总结Ree…...
第11章 规划过程组(二)(11.8排列活动顺序)
第11章 规划过程组(二)11.8排列活动顺序,在第三版教材第391页; 文字图片音频方式 第一个知识点:主要输出 1、项目进度网络图 如图11-20 项目进度网络图示例 带有多个紧前活动的活动代表路径汇聚,而带有…...
DP学习——观察者模式
学而时习之,温故而知新。 敌人出招(使用场景) 多个对象依赖一个对象的状态改变,当业务中有这样的关系时你出什么招? 你出招 这个时候就要用观察者模式这招了! 2个角色 分为啥主题和观察者角色。 我觉…...
如何利用GPT-4o生成有趣的梗图
文章目录 如何利用GPT-4o生成有趣的梗图一、引言二、使用GPT-4o生成梗图1. 提供主题2. 调用工具3. 获取图片实际案例输入输出 三、更多功能1. 创意和灵感2. 梗图知识 四、总结 如何利用GPT-4o生成有趣的梗图 梗图,作为互联网文化的一部分,已经成为了我们…...
深入理解 KVO
在 iOS 中,KVO(Key-Value Observing)是一个强大的观察机制,它的底层实现相对复杂。KVO 利用 Objective-C 的动态特性,为对象的属性提供观察能力。 KVO 的底层实现 1. 动态子类化 当一个对象的属性被添加观察者时&am…...
当需要对大量数据进行排序操作时,怎样优化内存使用和性能?
文章目录 一、选择合适的排序算法1. 快速排序2. 归并排序3. 堆排序 二、数据结构优化1. 使用索引2. 压缩数据3. 分块排序 三、外部排序1. 多路归并排序 四、利用多核和并行计算1. 多线程排序2. 使用并行流 五、性能调优技巧1. 避免不必要的内存复制2. 缓存友好性3. 基准测试和性…...
kubernetes集群部署:node节点部署和cri-docker运行时安装(四)
安装前准备 同《kubernetes集群部署:环境准备及master节点部署(二)》 安装cri-docker 在 Kubernetes 1.20 版本之前,Docker 是 Kubernetes 默认的容器运行时。然而,Kubernetes 社区决定在 Kubernetes 1.20 及以后的…...
第五十章 Web Service URL 汇总
文章目录 第五十章 Web Service URL 汇总Web 服务 URLWeb 服务的端点WSDL 使用受密码保护的 WSDL URL 第五十章 Web Service URL 汇总 本主题总结了与 IRIS 数据平台 Web 服务相关的 URL。 Web 服务 URL 与 IRIS Web 服务相关的 URL 如下: Web 服务的端点 http…...
动态白色小幽灵404网站源码
动态白色小幽灵404网站源码,页面时单页HTML源码,将代码放到空白的html里面,鼠标双击html即可查看效果,或者上传到服务器,错误页重定向这个界面即可,喜欢的朋友可以拿去使用 <!DOCTYPE html> <ht…...
axios的使用,处理请求和响应,axios拦截器
1、axios官网 https://www.axios-http.cn/docs/interceptors 2、安装 npm install axios 3、在onMouunted钩子函数中使用axios来发送请求,接受响应 4.出现的问题: (1) 但是如果发送请求请求时间过长,回出现请求待处…...
visual studio 2017增加.cu文件
右击项目名称,选择生成依赖项>生成自定义把CUDA11.3target勾选上; 把带有cuda代码的.cpp文件和.cu文件右击属性>项类型>选择CUDA C/C 右击项目名称,C/C>命令行添加/D _CRT_SECURE_NO_WARNINGS; 选择CUDA C/C>命…...
linux 管道符 |
在Linux中,管道符(|)是一个非常重要的概念,它允许你将一个命令的输出作为另一个命令的输入。这种机制使得Linux命令可以非常灵活地进行组合,从而执行复杂的任务。 管道符的基本用法 假设你有两个命令:com…...
SpringBoot-17-MyBatis动态SQL标签之常用标签
文章目录 1 代码1.1 实体User.java1.2 接口UserMapper.java1.3 映射UserMapper.xml1.3.1 标签if1.3.2 标签if和where1.3.3 标签choose和when和otherwise1.4 UserController.java2 常用动态SQL标签2.1 标签set2.1.1 UserMapper.java2.1.2 UserMapper.xml2.1.3 UserController.ja…...
零门槛NAS搭建:WinNAS如何让普通电脑秒变私有云?
一、核心优势:专为Windows用户设计的极简NAS WinNAS由深圳耘想存储科技开发,是一款收费低廉但功能全面的Windows NAS工具,主打“无学习成本部署” 。与其他NAS软件相比,其优势在于: 无需硬件改造:将任意W…...
51c自动驾驶~合集58
我自己的原文哦~ https://blog.51cto.com/whaosoft/13967107 #CCA-Attention 全局池化局部保留,CCA-Attention为LLM长文本建模带来突破性进展 琶洲实验室、华南理工大学联合推出关键上下文感知注意力机制(CCA-Attention),…...
基于uniapp+WebSocket实现聊天对话、消息监听、消息推送、聊天室等功能,多端兼容
基于 UniApp + WebSocket实现多端兼容的实时通讯系统,涵盖WebSocket连接建立、消息收发机制、多端兼容性配置、消息实时监听等功能,适配微信小程序、H5、Android、iOS等终端 目录 技术选型分析WebSocket协议优势UniApp跨平台特性WebSocket 基础实现连接管理消息收发连接…...
【SpringBoot】100、SpringBoot中使用自定义注解+AOP实现参数自动解密
在实际项目中,用户注册、登录、修改密码等操作,都涉及到参数传输安全问题。所以我们需要在前端对账户、密码等敏感信息加密传输,在后端接收到数据后能自动解密。 1、引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId...
JVM垃圾回收机制全解析
Java虚拟机(JVM)中的垃圾收集器(Garbage Collector,简称GC)是用于自动管理内存的机制。它负责识别和清除不再被程序使用的对象,从而释放内存空间,避免内存泄漏和内存溢出等问题。垃圾收集器在Ja…...
全球首个30米分辨率湿地数据集(2000—2022)
数据简介 今天我们分享的数据是全球30米分辨率湿地数据集,包含8种湿地亚类,该数据以0.5X0.5的瓦片存储,我们整理了所有属于中国的瓦片名称与其对应省份,方便大家研究使用。 该数据集作为全球首个30米分辨率、覆盖2000–2022年时间…...
Vue2 第一节_Vue2上手_插值表达式{{}}_访问数据和修改数据_Vue开发者工具
文章目录 1.Vue2上手-如何创建一个Vue实例,进行初始化渲染2. 插值表达式{{}}3. 访问数据和修改数据4. vue响应式5. Vue开发者工具--方便调试 1.Vue2上手-如何创建一个Vue实例,进行初始化渲染 准备容器引包创建Vue实例 new Vue()指定配置项 ->渲染数据 准备一个容器,例如: …...
页面渲染流程与性能优化
页面渲染流程与性能优化详解(完整版) 一、现代浏览器渲染流程(详细说明) 1. 构建DOM树 浏览器接收到HTML文档后,会逐步解析并构建DOM(Document Object Model)树。具体过程如下: (…...
从零开始打造 OpenSTLinux 6.6 Yocto 系统(基于STM32CubeMX)(九)
设备树移植 和uboot设备树修改的内容同步到kernel将设备树stm32mp157d-stm32mp157daa1-mx.dts复制到内核源码目录下 源码修改及编译 修改arch/arm/boot/dts/st/Makefile,新增设备树编译 stm32mp157f-ev1-m4-examples.dtb \stm32mp157d-stm32mp157daa1-mx.dtb修改…...
