Flink电商实时数仓(三)
DIM层代码流程图
维度层的重点和难点在于实时电商数仓需要的维度信息一般是动态的变化的,并且由于实时数仓一般需要一直运行,无法使用常规的配置文件重启加载方式来修改需要读取的ODS层数据,因此需要通过Flink-cdc实时监控MySql中的维度数据配置信息表,实时动态的发布广播信息。主流数据根据广播数据及时调整处理逻辑,并自动在HBase中创建相应的维度表和写入相应的维度数据。
- 消费Kafka ods业务主题数据
- 数据清洗:是否为JSON格式
- 使用flink-cdc读取监控配置表数据
- 在HBase中创建维度表
- 做成广播流
- 连接主流和广播流
- 筛选出需要写出的字段
- 写出到Hbase
整体架构
- realtime-common模块
- base: 所有Flink程序的基类,负责搭建Flink运行环境和设置并行度和检查点等相关参数。其中我们的数据来源也确定为Kafka,故数据源代码也写在这里。每个Flink程序的具体处理逻辑由handle()函数来负责处理。
- bean:负责存放项目运行过程中需要用到的bean对象,比如当前flink-cdc程序中需要用到的TableProcessDim类,配置信息表对象。
- constant:负责存放程序中需要使用到常量参数
- function:负责存放一些通用的函数方法
- util:一般存放和数据连接相关的工具类
- test目录: 用来在写正式代码前测试连接是否通畅,数据是否可以正常发送。
- realtime-dim模块
- app:DimApp里面写的是dim层的具体实现,具体步骤如上述流程图所示。
- function:负责存放数据处理的实现类,一般会继承相应的父类,在dim层可以直接调用这里的子类来实现父类接口,让dim层的代码逻辑更加清晰。
- realtime-dwd模块:如上
- realtime-dws模块:如上
数据清洗ETL
数据清洗,简单来说就是对数据进行简单的转换筛选。首先如果在转换过程中出现异常,直接过滤掉。注意这里无需抛出异常,因为如果throw a exception会导致整个程序异常终止,而在数据处理过程中出现部分数据格式错误而无法正常进行格式转换是很常见的,只需将异常信息打印到控制台即可。如果转换正常,再判断是否满足以下三个条件:
- 数据库名为gmall
- 数据类型不是bootstrap-start或者bootstrap-complete
- data字段不是null且长度不为0
Flink-cdc读取配置表的数据
Flink中获取数据主要有两个步骤:
- 获取相应的数据源Source
- 注意:在构建Flink-cdc对应的MySQLSource时,tableList参数必须是库表.表名结构
- 调用env.fromSource()方法将数据源的发送过来的数据转换Ds数据流,在该方法中可以设置数据的水位线。
- 获取到数据后,建议先打印到控制台查看数据的具体结构。
- 注意读取配置信息表的并发度必须设置为1;如果不为1,只能读取r操作数据,其他更新数据无法读取。
public static MySqlSource<String> getMySqlSource(String databaseName, String tableName){MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname(Constant.MYSQL_HOST).port(Constant.MYSQL_PORT).username(Constant.MYSQL_USER_NAME).password(Constant.MYSQL_PASSWORD).databaseList(databaseName) // set captured database.tableList(databaseName+"."+tableName) // set captured table.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String.startupOptions(StartupOptions.initial()).build();return mySqlSource;}
在HBase中创建维度表
数据库中的配置表数据经过Flink-cdc处理后发送到这里是json格式的字符串,这里根据数据的四种类型op在HBase中进行不同的建表删表操作,同时对数json字符数据进行转换映射处理,转换为对应的bean对象数据流。这里一个数据产生一个处理后的对象,故使用Map算子或FlatMap算子都可以。
- op类型
- d 代表delete,需要删除before字段中对应的表
- c 代表create,r 代表 read,需要创建after字段中对应的表
- u 代表update,需要先删除掉旧表,然后根据新表的字段创建一个新表
- 创建HBase连接,创建连接是很耗费资源的行为,因此新建连接和关闭连接需要写在open和close方法中
- HBase中想要对表进行创建和删除等DDL操作,都由Admin对象管理;如果需要对数据进行插入删除等DML操作,需要创建Table对象。详细操作细节请看相应代码即可。
public static SingleOutputStreamOperator<TableProcessDim> createHbaseTable(DataStreamSource<String> mysqlSource) {SingleOutputStreamOperator<TableProcessDim> createHBaseTable = mysqlSource.flatMap(new RichFlatMapFunction<String, TableProcessDim>() {public Connection connection ;@Overridepublic void open(Configuration parameters) throws Exception {//获取连接connection = HBaseUtil.getHBaseConnection();}@Overridepublic void close() throws Exception {//关闭连接HBaseUtil.closeHBaseConn(connection);}@Overridepublic void flatMap(String s, Collector<TableProcessDim> out){//使用读取的配置表数据,到HBase中创建与之对应的表格try {JSONObject jsonObject = JSONObject.parseObject(s);String op = jsonObject.getString("op");TableProcessDim dim;//维度表if ("d".equals(op)) {dim = jsonObject.getObject("before", TableProcessDim.class);dim.setOp(op);//当配置表发送一个D类型的数据,对应的HBase需要删除一张维度表deleteTable(dim);} else if ("c".equals(op) || "r".equals(op)) {dim = jsonObject.getObject("after", TableProcessDim.class);createTable(dim);dim.setOp(op);} else {//op = 'u', 即修改dim = jsonObject.getObject("after", TableProcessDim.class);deleteTable(dim);createTable(dim);}dim.setOp(op);out.collect(dim);} catch (Exception e) {e.printStackTrace();}}private void createTable(TableProcessDim dim) {String sinkFamily = dim.getSinkFamily();String[] split = sinkFamily.split(",");try {HBaseUtil.createHBaseTable(connection,Constant.HBASE_NAMESPACE,dim.getSinkTable(),split);} catch (IOException e) {e.printStackTrace();}}private void deleteTable(TableProcessDim dim) {try {HBaseUtil.dropHBaseTable(connection, Constant.HBASE_NAMESPACE, dim.getSinkTable());} catch (IOException e) {e.printStackTrace();}}});return createHBaseTable;}
主流连接广播流
从Flink-cdc获取的数据(gmall2023_config)是作为一个参数来控制我们对于主流即ODS层数据(gmall数据库的业务数据)的处理逻辑。gmall2023)_config库中的Table_process_dim表决定了后续程序筛选哪个表作为维度信息,并且定义了表中有哪些字段。
- 转换为广播流只需要调用上述得到的TableProcessDimStream的broadcast方法
- 使用的主流(gmall业务数据)的connect方法,得到一个连接流,然后对连接流进行process处理。
- 创建BroadcastProcessFunction,在里面分别有两个函数
- processBroadcastElement():处理广播流数据
- processElement():处理主流数据
- 广播流处理逻辑:
- 读取广播状态
- 将配置表信息写到广播状态中
- 根据广播状态数据的op对状态做相应的修改
- 主流处理逻辑:
- 查询广播状态,判断当前数据对应的表是否存在于状态中
- 如果数据比状态来的更早,造成状态为空,需要对状态做预处理(提前从mysql中读取维表配置表信息)
- 如果根据当前表的表名查询的状态不为空,说明该表为维度数据,使用收集器收集起来。
筛选出需要的字段
在维度配置信息表中的sink_column字段里定义了维度表需要的字段,使用filter算子对JsonObj里面的data字段进行过滤即可获取到想要的字段数据。
写出到Hbase
过滤后的数据流调用它的addSink方法,方法中需要传入一个SinkFunction接口类。该接口需要实现三个方法分别是:
- open方法:获取HBase连接
- close方法:关闭HBase连接
- invoke方法:写入数据时调用的方法,根据jsonObj中的type做不同处理,如果是delete,需要删除对应的维度表数据;否则都是直接覆盖写入。
代码的Gitee仓库地址:https://gitee.com/langpaian/gmall2023-realtime.git
相关文章:

Flink电商实时数仓(三)
DIM层代码流程图 维度层的重点和难点在于实时电商数仓需要的维度信息一般是动态的变化的,并且由于实时数仓一般需要一直运行,无法使用常规的配置文件重启加载方式来修改需要读取的ODS层数据,因此需要通过Flink-cdc实时监控MySql中的维度数据…...

四种消息队列,如何选型
这篇文章,主要讲述 Kafka、RabbitMQ、RocketMQ 和 ActiveMQ 这 4 种消息队列的异同,无论是面试,还是用于技术选型,都有非常强的参考价值。 01 消息队列基础 1.1 什么是消息队列? 消息队列是在消息的传输过程中保存消…...

flutter开发windows应用的库
一、window_manager 这个插件允许 Flutter 桌面应用调整窗口的大小和位置 地址:https://github.com/leanflutter/window_manager二、win32 一个包,它使用FFI包装了一些最常见的Win32 API调用,使Dart代码可以访问这些调用,而不需…...

机器学习--线性回归
目录 监督学习算法 线性回归 损失函数 梯度下降 目标函数 更新参数 批量梯度下降 随机梯度下降 小批量梯度下降法 数据预处理 特征标准化 正弦函数特征 多项式特征的函数 数据预处理步骤 线性回归代码实现 初始化步骤 实现梯度下降优化模块 损失与预测模块 …...

【Spring Boot】面试题汇总,带答案的那种
继上次的文章【MySQL连环炮,你抗的住嘛?】爆火之后,越来越多的小伙伴后台留言,要求阿Q总结下其他的“连环炮”知识点,想在金九银十的面试黄金期轻松对线面试官。 同样为了节省大家的时间,阿Q最近对【Sprin…...

【大模型】快速体验百度智能云千帆AppBuilder搭建知识库与小助手
文章目录 前言千帆AppBuilder什么是千帆AppBuilderAppBuilder能做什么 体验千帆AppBuilderJava知识库高考作文小助手 总结 前言 前天,在【百度智能云智算大会】上,百度智能云千帆AppBuilder正式开放服务。这是一个AI原生应用开发工作台,可以…...

字符串压缩
...

MsSQL中的索引到底长啥样,查找过程怎么进行
参考文章一 参考文章二 建表 mysql> create table user(-> id int(10) auto_increment,-> name varchar(30),-> age tinyint(4),-> primary key (id),-> index idx_age (age)-> )engineinnodb charsetutf8mb4;insert into user(name,age) values(张三,…...
WPF 全局异常处理
在Application中存在三种异常事件EventHandler DispatcherUnhandledExceptionAppDomain.CurrentDomain.UnhandledExceptionTaskScheduler.UnobservedTaskException 其中 DispatcherUnhandledException 是在异常由应用程序引发但未进行处理时发生,但无法捕获多线程…...
Flink系列之:Elasticsearch SQL 连接器
Flink系列之:Elasticsearch SQL 连接器 一、Elasticsearch SQL 连接器二、创建 Elasticsearch表三、连接器参数四、Key 处理五、动态索引六、数据类型映射 一、Elasticsearch SQL 连接器 Sink: BatchSink: Streaming Append & Upsert ModeElasticsearch 连接器…...
java中将Map集合、对象、字符串转换为JSON对象
1、Map集合转JSON对象 创建一个Map集合; 新建json对象,并将Map引入json中。 public void demo1(){ //创建一个Map集合Map<String, String> map new HashMap<>();map.put("1729210001","zhangsan");map.put("17292…...

理解Spring中bean的作用域
singleton:Spring Ioc容器中只会存在一个共享的Bean实例,无论有多少个Bean引用它,始终指向同一个对象,作用域为Spring中的缺省(同一package)作用域 prototype:每次通过Spring容器获取prototype定义的bean时,…...
edge中以右键“打印”的方式“保存”当前页面的pdf形式,下载过程中卡进度的问题
目录 问题描述: 可能的问题: 解决: 问题描述: 特殊情况下需要保存网页的pdf形式,但页面没有类似“导出pdf”的功能按钮,可以通过页面右键“打印”的方式“保存”当前页面的pdf形式。在pdf文件下载过程中出…...
c# 使用OpenCV
C#和OpenCV的结合主要通过一个名为OpenCVSharp的库实现。OpenCVSharp是一个C#包装器,它提供了对OpenCV(一个开源的计算机视觉和机器学习库)功能的访问。 安装OpenCVSharp NuGet包: 在Visual Studio中,右键点击你的项目…...
数据库连接问题 - ChatGPT对自身的定位
1.一段关于数据库连接的技术性对话 sweetie,连接数据库的时候,需要在每次读写数据后就把连接释放吗? 亲爱的,连接数据库后,通常会在每次读写数据后将连接释放。这是为了确保数据库连接的及时释放和有效管理。如果不及…...

常见可视化大屏编辑器有哪些?
前言: 在当今数字化时代,可视化大屏编辑器成为了数据展示和决策支持的重要工具。大屏编辑器不仅仅是数据的呈现,更是数据背后的故事的讲述者。它通过图表、图形和实时数据的呈现,为用户提供了全面的信息视图,帮助用户更…...

利用ffmpeg cv2取h265码流视频(转换图片灰屏问题解决)
利用海康威视相机拍出来的视频是H265格式的,相比于常规的H264编码,压缩率更高,但因此如果直接用正常取流方法读取,会出现无法读取的情况 1. 如图h265码流取出图片为灰屏 2 、解决灰屏问题 import subprocess import cv2# 将h265流…...
Android Uri scheme协议file转content
一、Uri的介绍 在Android开发中,Uri(Uniform Resource Identifier)是用于标识和访问各种资源的核心概念。这些资源可能包括文件、网络URL、数据库记录等。在处理这些资源时,我们可能会遇到不同的Uri协议,如file和conte…...

【Jenkins】远程API接口:Java 包装接口使用示例
jenkins-rest 库是一个面向对象的 Java 项目,它通过编程方式提供对 Jenkins REST API 的访问,以访问 Jenkins 提供的一些远程 API。它使用 jclouds 工具包构建,可以轻松扩展以支持更多 REST 端点。其功能集不断发展,用户可以通过拉…...
未能加载工具箱项问题的解决
解决办法是项目属性要设置成any cpu 在解决方案里的所有项目上右键,属性,生成,看目标平台是不是都设置成了any cpu...

JavaSec-RCE
简介 RCE(Remote Code Execution),可以分为:命令注入(Command Injection)、代码注入(Code Injection) 代码注入 1.漏洞场景:Groovy代码注入 Groovy是一种基于JVM的动态语言,语法简洁,支持闭包、动态类型和Java互操作性,…...
React hook之useRef
React useRef 详解 useRef 是 React 提供的一个 Hook,用于在函数组件中创建可变的引用对象。它在 React 开发中有多种重要用途,下面我将全面详细地介绍它的特性和用法。 基本概念 1. 创建 ref const refContainer useRef(initialValue);initialValu…...
Golang 面试经典题:map 的 key 可以是什么类型?哪些不可以?
Golang 面试经典题:map 的 key 可以是什么类型?哪些不可以? 在 Golang 的面试中,map 类型的使用是一个常见的考点,其中对 key 类型的合法性 是一道常被提及的基础却很容易被忽视的问题。本文将带你深入理解 Golang 中…...
Oracle查询表空间大小
1 查询数据库中所有的表空间以及表空间所占空间的大小 SELECTtablespace_name,sum( bytes ) / 1024 / 1024 FROMdba_data_files GROUP BYtablespace_name; 2 Oracle查询表空间大小及每个表所占空间的大小 SELECTtablespace_name,file_id,file_name,round( bytes / ( 1024 …...
多模态商品数据接口:融合图像、语音与文字的下一代商品详情体验
一、多模态商品数据接口的技术架构 (一)多模态数据融合引擎 跨模态语义对齐 通过Transformer架构实现图像、语音、文字的语义关联。例如,当用户上传一张“蓝色连衣裙”的图片时,接口可自动提取图像中的颜色(RGB值&…...

《通信之道——从微积分到 5G》读书总结
第1章 绪 论 1.1 这是一本什么样的书 通信技术,说到底就是数学。 那些最基础、最本质的部分。 1.2 什么是通信 通信 发送方 接收方 承载信息的信号 解调出其中承载的信息 信息在发送方那里被加工成信号(调制) 把信息从信号中抽取出来&am…...
python爬虫:Newspaper3k 的详细使用(好用的新闻网站文章抓取和解析的Python库)
更多内容请见: 爬虫和逆向教程-专栏介绍和目录 文章目录 一、Newspaper3k 概述1.1 Newspaper3k 介绍1.2 主要功能1.3 典型应用场景1.4 安装二、基本用法2.2 提取单篇文章的内容2.2 处理多篇文档三、高级选项3.1 自定义配置3.2 分析文章情感四、实战案例4.1 构建新闻摘要聚合器…...

如何在最短时间内提升打ctf(web)的水平?
刚刚刷完2遍 bugku 的 web 题,前来答题。 每个人对刷题理解是不同,有的人是看了writeup就等于刷了,有的人是收藏了writeup就等于刷了,有的人是跟着writeup做了一遍就等于刷了,还有的人是独立思考做了一遍就等于刷了。…...

算法岗面试经验分享-大模型篇
文章目录 A 基础语言模型A.1 TransformerA.2 Bert B 大语言模型结构B.1 GPTB.2 LLamaB.3 ChatGLMB.4 Qwen C 大语言模型微调C.1 Fine-tuningC.2 Adapter-tuningC.3 Prefix-tuningC.4 P-tuningC.5 LoRA A 基础语言模型 A.1 Transformer (1)资源 论文&a…...

Git 3天2K星标:Datawhale 的 Happy-LLM 项目介绍(附教程)
引言 在人工智能飞速发展的今天,大语言模型(Large Language Models, LLMs)已成为技术领域的焦点。从智能写作到代码生成,LLM 的应用场景不断扩展,深刻改变了我们的工作和生活方式。然而,理解这些模型的内部…...