当前位置: 首页 > news >正文

Flink电商实时数仓(六)

交易域支付成功事务事实表

  1. 从topic_db业务数据中筛选支付成功的数据
  2. 从dwd_trade_order_detail主题中读取订单事实数据、LookUp字典表
  3. 关联三张表形成支付成功宽表
  4. 写入 Kafka 支付成功主题
执行步骤
  1. 设置ttl,通过Interval join实现左右流的状态管理
  2. 获取下单明细数据:用户必然要先下单才有可能支付成功,因此支付成功明细数据集必然是订单明细数据集的子集。要注意:Interval Join要求表中均为Append数据,即“只能新增,不能修改”,订单明细表数据生成过程中用到了left join,生成了回撤流,看似不满足Interval Join的条件。但是,回撤数据进入Kafka会以null值形式存在,如果用Kafka Connector将订单明细封装为动态表,null值会被过滤,最终得到的是相同主键存在重复数据的Append流(动态表本质上就是流),满足Interval Join的条件。
    • Interval join只支持事件时间,因此数据必须携带水位线;建表时水位线的相关语法为 water for order_time as order_time - interval '5' second,这里要求数据是timestamp(3)
    • 原有的时间数据类型是bigint类型的ts,使用row_time as TO_TIMESTAMP_LTZ(ts,3)这个函数即可将原有的时间数据转换为水位线所需的数据类型
  3. 筛选支付数据:
    • 支付状态为支付成功
    • 操作类型为update
  4. 构建 LookUp 字典表
  5. 联上述三张表形成支付成功宽表,写入 Kafka 支付成功主题

核心代码如下

 public void handle(StreamExecutionEnvironment env, TableEnvironment tableEnv, String groupId) {//核心业务逻辑//1. 读取TopicDB主题数据createTopicDb(groupId,tableEnv);//2. 筛选支付成功的数据,从业务数据topic_db中filterPaymentTable(tableEnv);//3. 读取下单详情表数据, 从kafka读取数据createOrderDetailTable(tableEnv, groupId);//4. 创建base.dic字典表,从HBase维度数据中读取createBaseDic(tableEnv);//tableEnv.executeSql("select * from order_detail").print();//tableEnv.executeSql("select * from base_dic").print();//tableEnv.executeSql("select to_timestamp_ltz(ts,3) from order_detail");//5. 使用interval join 完成支付成功流和订单详情数据关联intervalJoin(tableEnv);//6. 使用lookup join完成维度退化Table resultTable = lookupJoin(tableEnv);//7. 创建upsert kafka连接器写出createKafkaSink(tableEnv);resultTable.insertInto(Constant.TOPIC_DWD_TRADE_ORDER_PAYMENT_SUCCESS).execute();}

事实表动态分流

在这里插入图片描述

dwd层其他的事实表都是从topic_db中去业务数据库一张表的变更数据,按照某些过滤后写入kafka的对应主题,它们处理逻辑相似且较为简单,可以结合配置表动态分流在同一个程序中处理。有点类似我们前面实现DIM层的动态配置。

  1. 清洗过滤和转换:判断是否满足json格式,如果满足转换为jsonObj对象
  2. 读取配置表数据,使用flink-cdc读取
  3. 转换数据格式,转换到对应bean对象中
  4. 配置信息广播话,然后跟主流数据进行连接
  5. 筛选出需要的字段
  6. 根据表中的sink table字段来动态写出到对应的kafka主题中

核心代码如下

public static void main(String[] args) {new DwdBaseDb().start(10019, 4, "dwd_base_db", Constant.TOPIC_DB);}@Overridepublic void handle(StreamExecutionEnvironment env, DataStreamSource<String> stream) {//核心业务逻辑//1. 读取topic_db数据//stream.print();//2. 清洗过滤和转换, jsonObjStream是主流数据SingleOutputStreamOperator<JSONObject> jsonObjStream = filterJson(stream);//jsonObjStream.print();//3. 读取配置表数据,使用flink-cdc读取,读取配置文件时并发度最好为1DataStreamSource<String> tableProcessDwd = getTableProcessDwd(env);//tableProcessDwd.print();4. 转换数据格式 string -> TableProcessDwd -> broadcastStream,广播流数据SingleOutputStreamOperator<TableProcessDwd> processDwdStream = getProcessDwdStream(tableProcessDwd);MapStateDescriptor<String, TableProcessDwd> mapStateDescriptor = new MapStateDescriptor<>("process_state", String.class, TableProcessDwd.class);BroadcastStream<TableProcessDwd> broadcastStream = processDwdStream.broadcast(mapStateDescriptor);//5. 连接主流和广播流,对主流数据进行判断是否需要保留SingleOutputStreamOperator<Tuple2<JSONObject, TableProcessDwd>> processStream = processBaseDb(jsonObjStream, broadcastStream, mapStateDescriptor);//processStream.print();//6. 筛选最后需要写出的字段SingleOutputStreamOperator<JSONObject> dataStream = filterColumns(processStream);//7. 通过sink_table的表名来动态写出到对应kafka主题//在setRecordSerializer()设置dataStream.sinkTo(FlinkSinkUtil.getKafkaSinkWithTopicName());}

gitee地址 :https://gitee.com/langpaian/gmall2023-realtime

相关文章:

Flink电商实时数仓(六)

交易域支付成功事务事实表 从topic_db业务数据中筛选支付成功的数据从dwd_trade_order_detail主题中读取订单事实数据、LookUp字典表关联三张表形成支付成功宽表写入 Kafka 支付成功主题 执行步骤 设置ttl&#xff0c;通过Interval join实现左右流的状态管理获取下单明细数据…...

本地部署Jellyfin影音服务器并实现远程访问内网影音库

文章目录 1. 前言2. Jellyfin服务网站搭建2.1. Jellyfin下载和安装2.2. Jellyfin网页测试 3.本地网页发布3.1 cpolar的安装和注册3.2 Cpolar云端设置3.3 Cpolar本地设置 4.公网访问测试5. 结语 1. 前言 随着移动智能设备的普及&#xff0c;各种各样的使用需求也被开发出来&…...

【React Native】第一个Android应用

第一个Android应用 环境TIP开发工具环境及版本要求建议官方建议 安装 Android Studio首次安装模板选择安装 Android SDK配置 ANDROID_HOME 环境变量把一些工具目录添加到环境变量 Path[可选参数] 指定版本或项目模板 运行使用 Android 模拟器编译并运行 React Native 应用修改项…...

解决IOS transform rotate后文字无法显示,backface-visibility导致@click事件失效

问题一&#xff1a;IOS transform rotate后文字无法显示 网上搜到可以用backface-visibility:hidden来解决&#xff0c;这样做文字是出来了&#xff0c;但是click事件无效了。 问题二&#xff1a;backface-visibility导致click事件失效 在Vue中使用backface-visibility和cli…...

Nature | 大型语言模型(LLM)能够产生和发现新知识吗?

大型语言模型&#xff08;LLM&#xff09;是基于大量数据进行预训练的超大型深度学习模型。底层转换器是一组神经网络&#xff0c;这些神经网络由具有自注意力功能的编码器和解码器组成。编码器和解码器从一系列文本中提取含义&#xff0c;并理解其中的单词和短语之间的关系。通…...

多维时序 | MATLAB实CNN-Mutilhead-Attention卷积神经网络融合多头注意力机制多变量时间序列预测

多维时序 | MATLAB实CNN-Mutilhead-Attention卷积神经网络融合多头注意力机制多变量时间序列预测 目录 多维时序 | MATLAB实CNN-Mutilhead-Attention卷积神经网络融合多头注意力机制多变量时间序列预测预测效果基本介绍模型描述程序设计参考资料 预测效果 基本介绍 多维时序 | …...

Nature 新研究发布,GPT 驱动的机器人化学家能够自行设计和进行实验,这对科研意味着什么?

文章目录 前言揭秘Coscientist不到四分钟&#xff0c;设计并改进了程序能力越大&#xff0c;责任越大 前言 有消息称&#xff0c;AI 大模型 “化学家” 登 Nature 能够自制阿司匹林、对乙酰氨基酚、布洛芬&#xff0c;甚至连复杂的钯催化交叉偶联反应&#xff0c;也能完成。 …...

Ai画板原理

在创建时画板可以选择数量和排列方式 也可以采用这个图片左上的画板工具&#xff0c;选择画板在其他地方画框即可生成&#xff0c;同时可以在属性框中可以修改尺寸大小 选择全部重新排列可以进行创建时的布局...

【hacker送书第11期】Python数据分析从入门到精通

探索数据世界&#xff0c;揭示未来趋势 《Python数据分析从入门到精通》是你掌握Python数据分析的理想选择。本书深入讲解核心工具如pandas、matplotlib和numpy&#xff0c;助您轻松处理和理解复杂数据。 通过matplotlib、seaborn和创新的pyecharts&#xff0c;本书呈现生动直…...

华为OD机试 - 精准核酸检测(Java JS Python C)

在线OJ刷题 题目详情 - 精准核酸检测 - Hydro 题目描述 为了达到新冠疫情精准防控的需要,为了避免全员核酸检测带来的浪费,需要精准圈定可能被感染的人群。 现在根据传染病流调以及大数据分析,得到了每个人之间在时间、空间上是否存在轨迹交叉。 现在给定一组确诊人员编…...

智能优化算法应用:基于材料生成算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于材料生成算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于材料生成算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.材料生成算法4.实验参数设定5.算法结果6.…...

【MySQL】:超详细MySQL完整安装和配置教程

&#x1f3a5; 屿小夏 &#xff1a; 个人主页 &#x1f525;个人专栏 &#xff1a; MySQL从入门到进阶 &#x1f304; 莫道桑榆晚&#xff0c;为霞尚满天&#xff01; 文章目录 &#x1f4d1;前言一. MySQL数据库1.1 版本1.2 下载1.3 安装1.4 客户端连接 &#x1f324;️全篇总…...

OpenAI亲授ChatGPT “屠龙术”!官方Prompt 工程指南来啦

应该如何形容 Prompt 工程呢&#xff1f;对于一个最开始使用 ChatGPT 的新人小白&#xff0c;面对据说参数量千亿万亿的庞然巨兽&#xff0c;Prompt 神秘的似乎像某种献祭&#xff1a;我扔进去几句话&#xff0c;等待聊天窗口后的“智慧生命”给我以神谕。 然而&#xff0c;上…...

最新ChatGPT商业运营网站程序源码,支持Midjourney绘画,GPT语音对话+DALL-E3文生图+文档对话总结

一、前言 SparkAi创作系统是基于ChatGPT进行开发的Ai智能问答系统和Midjourney绘画系统&#xff0c;支持OpenAI-GPT全模型国内AI全模型。本期针对源码系统整体测试下来非常完美&#xff0c;可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如何搭建部署AI创作Ch…...

经验 | IDEA常用快捷键

1、编辑&#xff08;Editing&#xff09; Ctrl Space 基本的代码完成&#xff08;类、方法、属性&#xff09; Ctrl Alt Space 快速导入任意类 Ctrl Shift Enter 语句完成 Ctrl P 参数信息&#xff08;在方法中调用参数&#xff09; Ctrl Q 快速查看文档 Shift F…...

spark中 write.csv时, 添加第一行的标题title

在 Spark 中使用 write.csv 写入 CSV 文件时&#xff0c;默认情况下是不会在文件中添加标题行的。但是&#xff0c;你可以通过设置 header 选项来控制是否包含标题行。 下面是一个示例&#xff1a; val data Seq((1, "John", 28),(2, "Alice", 22),(3, …...

HTML美化网页

使用CSS3美化的原因 用css美化页面文本,使页面漂亮、美观、吸引用户 可以更好的突出页面的主题内容,使用户第一眼可以看到页面主要内容 具有良好的用户体验 <span>标签 作用 能让某几个文字或者某个词语凸显出来 有效的传递页面信息用css美化页面文本&#xff0c;使页面漂…...

nn.LSTM个人记录

简介 nn.LSTM参数 torch.nn.lstm(input_size, "输入的嵌入向量维度&#xff0c;例如每个单词用50维向量表示&#xff0c;input_size就是50"hidden_size, "隐藏层节点数量,也是输出的嵌入向量维度"num_layers, "lstm 隐层的层数&#xff0c;默认…...

vr虚拟高压电器三维仿真展示更立体全面

VR工业虚拟仿真软件的应用价值主要体现在以下几个方面&#xff1a; 降低成本&#xff1a;通过VR技术进行产品设计和开发&#xff0c;可以在虚拟环境中进行&#xff0c;从而减少对物理样机的依赖&#xff0c;降低试错成本和时间。此外&#xff0c;利用VR技术构建的模拟场景使用方…...

轮廓平滑方法

目录 1. 形态学操作 2. 边缘平滑化 3. 轮廓近似 python 有回归线平滑 2D 轮廓 1. 形态学操作 利用形态学操作&#xff08;例如腐蚀、膨胀、开运算、闭运算等&#xff09;可以使分割边界更加平滑和连续。腐蚀可以消除小的不连续区域&#xff0c;膨胀可以填充空洞&#xff0…...

5G工程师的日常:一次由OFDM边带EVM异常引发的‘破案’经历

5G工程师手记&#xff1a;解码OFDM边带EVM异常之谜 那天清晨&#xff0c;实验室的频谱分析仪上跳动的波形让我停下了手中的咖啡杯——在5G NR信号的边带区域&#xff0c;一个诡异的周期性EVM波动像心电图般规律闪烁。这不是教科书上的理想OFDM波形&#xff0c;而是一个活生生的…...

Cube Studio:革命性云原生AI平台,一站式解决机器学习全流程难题

Cube Studio&#xff1a;革命性云原生AI平台&#xff0c;一站式解决机器学习全流程难题 【免费下载链接】cube-studio cube studio开源云原生一站式机器学习/深度学习/大模型AI平台/MaaS/mlops/人工智能平台/训推平台&#xff0c;算法全链路流程&#xff0c;多租户&#xff0c;…...

基于RAG的智能招聘引擎:技术原理、实现与应用

1. 项目概述&#xff1a;一个面向人才招聘的智能RAG引擎最近在GitHub上看到一个挺有意思的项目&#xff0c;叫talent-rag-engine。光看名字&#xff0c;就能猜到个大概——这是一个专门为人才招聘场景设计的检索增强生成引擎。RAG&#xff08;Retrieval-Augmented Generation&a…...

基于MCP协议与RAG技术构建智能聊天应用:架构解析与实战指南

1. 项目概述&#xff1a;一个基于MCP协议的RAG聊天应用最近在开源社区里&#xff0c;一个名为gogabrielordonez/mcp-ragchat的项目引起了我的注意。乍一看标题&#xff0c;它融合了当下两个非常热门的技术概念&#xff1a;MCP和RAG。对于从事AI应用开发&#xff0c;特别是希望构…...

对抗测试框架:用字节码增强与混沌工程提升系统韧性

1. 项目概述&#xff1a;一个对抗测试的“剧院”最近在开源社区里&#xff0c;我注意到一个名字挺有意思的项目&#xff0c;叫nanami7777777/anti-test-theater。乍一看&#xff0c;这个标题有点让人摸不着头脑——“反测试剧院”&#xff1f;测试和剧院能扯上什么关系&#xf…...

赣州威视智投GEO优化服务

在数字化浪潮席卷的当下&#xff0c;赣州本地商家面临着线上曝光不足、流量少、排名靠后的经营难题。如何在激烈的市场竞争中脱颖而出&#xff0c;实现精准获客与稳定引流&#xff0c;成为众多商家亟待解决的问题。赣州威视智投科技有限公司&#xff08;以下简称“威视智投”&a…...

Swift集成飞书API:使用feishu-swift SDK构建高效机器人

1. 项目概述&#xff1a;一个连接飞书与Swift生态的桥梁 最近在折腾一个内部工具&#xff0c;需要把服务端的一些数据变动实时同步到飞书群里&#xff0c;方便团队同学及时跟进。服务端是用Swift写的&#xff0c;而飞书官方虽然有开放的API&#xff0c;但直接上手去调&#xf…...

告别玄学:给STM32/CH32V的SD卡SPI驱动加上超时、重试与状态机

从零构建工业级SD卡SPI驱动&#xff1a;超时重试与状态机设计实战 在嵌入式系统中&#xff0c;SD卡作为可靠的大容量存储介质被广泛应用。然而许多开发者都经历过这样的困境&#xff1a;实验室测试完美的SD卡驱动&#xff0c;一旦部署到真实环境中就频繁出现读写失败、卡死甚至…...

AI驱动编辑预设:智能调色与音频处理实战指南

1. 项目概述&#xff1a;AI驱动的编辑预设库最近在折腾视频和图片后期的时候&#xff0c;发现一个挺有意思的项目&#xff0c;叫kaushalrao/ai-editor-presets。光看名字&#xff0c;你可能觉得这又是一个普通的滤镜包或者调色预设合集。但深入用下来&#xff0c;我发现它的核心…...

微软UFO项目:基于视觉大模型的GUI自动化智能体实战解析

1. 项目概述&#xff1a;当“全能”AI助手遇见复杂任务编排 最近在AI应用开发圈里&#xff0c;一个来自微软研究院的项目“UFO”引起了我的注意。这名字听起来挺科幻&#xff0c;全称是“UI-Focused Agent”&#xff0c;直译过来是“专注于用户界面的智能体”。但别被这个直白的…...