当前位置: 首页 > article >正文

Flink 数据清洗与字段标准化最佳实践

—— 构建可配置、可扩展的实时标准化清洗链路

本文是「Flink + Kafka 构建实时数仓实战」专栏的第 4 篇,将围绕字段标准化这一核心问题,从业务痛点、技术架构、配置设计到完整代码工程,系统讲透标准化实践。


📌 一、为什么实时字段标准化是数仓基石?

在真实业务中,数据往往来源于多个系统,字段命名不一致、取值不规范是常态:

字段原始值问题影响
platformios、iOS、苹果、android、安卓命名不一致报表维度混乱
gender男、male、M、1、女、F表达混杂用户标签识别异常
channel官网、weixin、AppStore、appstore无归一化推广渠道归因失败

如果不清洗、标准化,上层的指标分析、推荐、风控等全部都「靠不住」。


✅ 二、我们要构建怎样的标准化系统?

目标:

  • 支持多主题、多字段标准化

  • 配置驱动、动态字典更新

  • 高性能:广播状态替代外部维表 Join

  • 低耦合,适配不同业务领域(营销/风控/运营)


🧱 三、系统架构设计图

我们采用 Kafka → Flink 清洗标准化 → Kafka/Hudi 的结构:

          Kafka 多主题(event_log / user_action 等)↓Flink 主流数据流↓广播维表流(配置映射、标准化字典广播)↓BroadcastProcessFunction 字段标准化处理↓输出至 Kafka / DWS / Hudi


🧩 四、工程结构与配置文件设计(完整模板)

📂 工程目录结构

flink-standardize-demo/
├── pom.xml
├── src/main/java/com/demo/
│   ├── MainJob.java                    # Flink Job 启动类
│   ├── model/EventLog.java             # 业务字段模型
│   ├── util/SourceBuilder.java         # Kafka Source 封装
│   ├── util/SinkBuilder.java           # Kafka Sink 封装
│   ├── util/DictLoader.java            # 字典文件加载工具
│   └── func/StandardizeFunction.java   # 标准化函数(Broadcast)
└── resources/├── dicts/│   ├── dict_platform.json│   ├── dict_gender.json│   └── dict_channel.json└── mapping-config.json             # 字段-字典配置

🧾 配置样例一:字段映射(mapping-config.json

{"event_log": {"mappings": {"platform": "dict_platform","gender": "dict_gender","channel": "dict_channel"}},"user_action": {"mappings": {"os": "dict_platform","sex": "dict_gender"}}
}

🔍 每个 Kafka 主题可定义自己要标准化的字段,以及所使用的字典。


📁 配置样例二:标准化字典(如 dict_gender.json

{"男": "1","male": "1","M": "1","女": "2","female": "2","F": "2"
}

更多如 dict_platform.jsondict_channel.json 可类比定义。


🔧 五、核心实现:Flink Broadcast 标准化函数

1. 状态描述器初始化

MapStateDescriptor<String, Map<String, String>> dictStateDescriptor =new MapStateDescriptor<>("dictState", Types.STRING, Types.MAP(Types.STRING, Types.STRING));

2. 广播字典解析与更新

@Override
public void processBroadcastElement(Map<String, Map<String, String>> value, Context ctx, Collector<EventLog> out) throws Exception {BroadcastState<String, Map<String, String>> dictState = ctx.getBroadcastState(dictStateDescriptor);for (Map.Entry<String, Map<String, String>> entry : value.entrySet()) {dictState.put(entry.getKey(), entry.getValue());}
}

3. 主数据流字段标准化逻辑

@Override
public void processElement(EventLog value, ReadOnlyContext ctx, Collector<EventLog> out) throws Exception {ReadOnlyBroadcastState<String, Map<String, String>> dicts = ctx.getBroadcastState(dictStateDescriptor);Map<String, String> genderDict = dicts.get("dict_gender");if (genderDict != null && genderDict.containsKey(value.getGender())) {value.setGender(genderDict.get(value.getGender()));}out.collect(value);
}


🔄 六、字典热更新机制设计

更新方式实现推荐特点
Kafka 广播 Topic每天定时推送字典 JSON✅ 推荐,自动同步
外部 API 拉取Flink 自定义 Source适合高频更新字典
本地配置轮询FileSource + Map 更新简单、适合 PoC 测试

💼 七、真实业务落地建议

场景建议
多系统数据集成每个系统字段映射集中管理
跨业务复用字段字典可复用,映射配置拆分维护
字典频繁变动推荐 Kafka 热更新或外部 API 拉取
性能优化使用 Broadcast State 缓存,避免外部 Join

🧭 下一篇预告

第五篇:Flink 时态维度表 Join 与缓存机制实战

将聚焦实时数据与维度数据如何进行:

  • 广播状态 Join

  • Temporal Join 实现

  • 缓存刷新策略优化

相关文章:

Flink 数据清洗与字段标准化最佳实践

—— 构建可配置、可扩展的实时标准化清洗链路 本文是「Flink Kafka 构建实时数仓实战」专栏的第 4 篇&#xff0c;将围绕字段标准化这一核心问题&#xff0c;从业务痛点、技术架构、配置设计到完整代码工程&#xff0c;系统讲透标准化实践。 &#x1f4cc; 一、为什么实时字段…...

哈工大李治军《操作系统》进程同步与信号量笔记

1.什么是信号量&#xff1f; 定义&#xff1a;记录一些信息&#xff08;即量&#xff09;&#xff0c;并根据这个信息决定睡眠还是唤醒&#xff08;即信号&#xff09;。睡眠和唤醒只是一个信号&#xff08;相当于0和1&#xff09;。 2.问题&#xff1a;一种资源的数量是8&am…...

EasyRTC音视频实时通话在线教育解决方案:打造沉浸式互动教学新体验

一、方案概述 EasyRTC是一款基于WebRTC技术的实时音视频通信平台&#xff0c;为在线教育行业提供了高效、稳定、低延迟的互动教学解决方案。本方案将EasyRTC技术深度整合到在线教育场景中&#xff0c;实现师生间的实时音视频互动等核心功能&#xff0c;打造沉浸式的远程学习体…...

常见网络安全攻击类型深度剖析(三):DDoS攻击——分类、攻击机制及企业级防御策略

常见网络安全攻击类型深度剖析&#xff08;三&#xff09;&#xff1a;DDoS攻击——分类、攻击机制及企业级防御策略 在网络安全威胁中&#xff0c;分布式拒绝服务攻击&#xff08;Distributed Denial of Service, DDoS&#xff09;堪称“网络流量炸弹”。攻击者通过控制成百上…...

【分布式系统中的“瑞士军刀”_ Zookeeper】一、Zookeeper 快速入门和核心概念

在分布式系统的复杂世界里&#xff0c;协调与同步是确保系统稳定运行的关键所在。Zookeeper 作为分布式协调服务的 “瑞士军刀”&#xff0c;为众多分布式项目提供了高效、可靠的协调解决方案。无论是在分布式锁的实现、配置管理&#xff0c;还是在服务注册与发现等场景中&…...

Libconfig 修改配置文件里的某个节点

THCommandStatus ( { Status "1"; index 5; }, { Status "2"; index 8; }, { Status "3"; index 7; }, { Status "4"; index 0; } ); 比如这是配置文件的内容&#xff…...

从FP32到BF16,再到混合精度的全景解析

笔者做过目标检测模型、超分模型以及扩散生成模型。其中最常使用的是单精度FP32、半精度FP16、BF16。 双精度"FP64"就不说了&#xff0c;不太会用到。 #1. 单精度、半精度和混合精度 单精度&#xff08;FP32&#xff09;、半精度&#xff08;FP16&#xff09;和混合…...

Electron从入门到入门

项目说明 项目地址 项目地址&#xff1a;https://gitee.com/ruirui-study/electron-demo 本项目为示例项目&#xff0c;代码注释非常清晰&#xff0c;给大家当做入门项目吧。 其实很多东西都可以在我这基础上添加或修改、市面上有些已开源的项目&#xff0c;但是太臃肿了&am…...

优化提示词方面可以使用的数学方法理论:信息熵,概率论 ,最优化理论

优化提示词方面可以使用的数学方法理论:信息熵,概率论 ,最优化理论 目录 优化提示词方面可以使用的数学方法理论:信息熵,概率论 ,最优化理论信息论信息熵明确问题主题提供具体细节限定回答方向规范语言表达概率论最优化理论信息论 原理:信息论中的熵可以衡量信息的不确定性。…...

腾讯一面面经:总结一下

1. Java 中的 和 equals 有什么区别&#xff1f;比较对象时使用哪一个 1. 操作符&#xff1a; 用于比较对象的内存地址&#xff08;引用是否相同&#xff09;。 对于基本数据类型、 比较的是值。&#xff08;8种基本数据类型&#xff09;对于引用数据类型、 比较的是两个引…...

Golang | 倒排索引

文章目录 倒排索引的设计倒排索引v0版实现 倒排索引的设计 通用搜索引擎 v.s. 垂直搜索引擎&#xff1a; 通用搜索引擎&#xff1a;什么都可以搜索&#xff0c;更加智能化垂直搜索引擎&#xff1a;只能搜自家数据库里面的内容&#xff0c;一般都带着搜索条件&#xff0c;搜索一…...

大模型驱动智能服务变革:从全流程赋能到行业纵深落地

大模型技术的快速发展&#xff0c;正深刻改变着人工智能的研发与应用模式。作为"软硬协同、开箱即用"的智能化基础设施&#xff0c;大模型一体机通过整合计算硬件、部署平台和预置模型&#xff0c;重构了传统AI部署方式&#xff0c;成为推动AI普惠化和行业落地的重要…...

【Python-Day 5】Python 格式化输出实战:%、format()、f-string 对比与最佳实践

Langchain系列文章目录 01-玩转LangChain&#xff1a;从模型调用到Prompt模板与输出解析的完整指南 02-玩转 LangChain Memory 模块&#xff1a;四种记忆类型详解及应用场景全覆盖 03-全面掌握 LangChain&#xff1a;从核心链条构建到动态任务分配的实战指南 04-玩转 LangChai…...

【初识Trae】字节跳动推出的下一代AI原生IDE,重新定义智能编程

​ 初识官网文档 从官网可以看到有两个大标签页&#xff0c;即Trae IDE CN和Trae插件&#xff0c;这就说明Trae在发布Trae IDE的同时考虑到对主流IDE的插件支持&#xff0c;这一点非常有心&#xff0c;但是我估测Trae IDE的体验更好&#xff08;就是AI IDE出生&#xff0c;毕…...

装备制造企业选型:什么样的项目管理系统最合适?

个性化定制需求日益增加、项目周期长、供应链协同复杂、成本控制难度大、以及设计、生产、安装、售后等环节协同不畅。这些挑战使得装备制造企业在传统的管理方式捉襟见肘&#xff0c;迫切需要一套高效、智能的项目管理系统来提升运营效率和盈利能力。 那么&#xff0c;对于装…...

技术面试一面标准流程

0. 自我介绍 ...... 1. 拷打项目 项目干了啥&#xff1f; 难点是啥&#xff1f; 问项目中用到的东西&#xff1f; 扩展&#xff1f; ...... 2. 基础知识 数据结构、C基础、设计模式 数据结构&#xff1a; 堆&#xff1f; unordered_map 和 布隆过滤器 都是用于查找…...

【playwright】 page.wait_for_timeout() 和time.sleep()区别

page.wait_for_timeout() 和 time.sleep() 都可以用于在代码中引入延迟&#xff0c;但它们的实现方式和效果有一些关键区别。以下是两者的详细对比&#xff1a; 1. 实现方式 page.wait_for_timeout()&#xff1a; 是 Playwright 提供的一个内置方法&#xff0c;专门用于在 Play…...

常见网络安全攻击类型深度剖析(四):跨站脚本攻击(XSS)——分类、漏洞利用与前端安全防护

常见网络安全攻击类型深度剖析&#xff08;四&#xff09;&#xff1a;跨站脚本攻击&#xff08;XSS&#xff09;——分类、漏洞利用与前端安全防护 在Web应用安全中&#xff0c;跨站脚本攻击&#xff08;Cross-Site Scripting, XSS&#xff09;是攻击者利用浏览器漏洞&#x…...

QT多元素控件及其属性

Qt中提供的多元素控件有&#xff1a; QListWidget QListView QTableWidget QTableView QTreeWidget QTreeView widget和view多元素控件的区别&#xff1a; view是更底层的实现&#xff0c;widget是基于view封装而来&#xff0c;view是MVC结构的一种典型实现 MVC结构&am…...

如何快速高效学习Python?

如何快速高效学习Python&#xff1f; How to Fastly and Effectively Learn Python Programming? By JacksonML 1. Python年轻吗&#xff1f; Python自1991年诞生到现在&#xff0c;已经经历了三十四年或者更长时间了。毕竟&#xff0c;Python之父 – 吉多范罗苏姆先生(Gu…...

【网络原理】TCP提升效率机制(二):流量控制和拥塞控制

目录 一. 前言 二. 流量控制 三. 拥塞控制 一. 前言 TCP的可靠传输依靠确认应答机制&#xff0c;超时重传机制是对确认应答的一种补充&#xff0c;解决了丢包问题 为了提高传输效率&#xff0c;避免大量的时间都浪费在等待应答的过程&#xff0c;故引入了滑动窗口机制&…...

语音合成之六端到端TTS模型的演进

端到端TTS模型的演进 引言Tacotron&#xff1a;奠基之作FastSpeech&#xff1a;解决效率瓶颈VITS&#xff1a;实现高保真和富有表现力的语音SparkTTS&#xff1a;利用LLM实现高效可控的TTSCosyvoice&#xff1a;一种可扩展的多语种TTS方法端到端TTS模型的演进与未来方向 引言 …...

Properties配置文件

Properties(是一个特殊的Map)默认键值都是String类型 备注:Properties能调用Map中的所有方法,但由于放入Properties中的key-value都是String类型,Properties中提供了特殊的存值和取值的方法,所以尽量不要用Map中的方法,如下 Properties的作用 A、将内存中的数据写入到…...

C#高级语法--接口

先引用一些通俗一点的话语说明 1. 接口就像“插座标准”(解耦) 🧩 场景: 你家的手机充电器(USB-C、Lightning)必须插进匹配的插座才能充电。问题:如果每个手机品牌插座都不一样,你换手机就得换充电器,太麻烦了!💡 接口的作用: 定义一个通用的充电口标准(比如U…...

5.6 Microsoft Semantic Kernel:专注于将LLM集成到现有应用中的框架

5.6.1 Semantic Kernel概述 Microsoft Semantic Kernel&#xff08;以下简称SK&#xff09;是一个开源的软件开发工具包&#xff08;SDK&#xff09;&#xff0c;旨在帮助开发者将大型语言模型&#xff08;LLM&#xff09;无缝集成到现有的应用程序中。它支持C#、Python和Java…...

【尚硅谷Redis6】自用学习笔记

Redis介绍 Redis是单线程 多路IO复用技术&#xff08;类似黄牛买票&#xff09; 默认有16个库&#xff0c;用select进行切换 默认端口号为6379 Memcached&#xff1a;多线程 锁&#xff08;数据类型单一&#xff0c;不支持持久化&#xff09; 五大常用数据类型 Redis key …...

Vue里面elementUi-aside 和el-main不垂直排列

先说解决方法 main.js少导包 import element-ui/lib/theme-chalk/index.css; //加入此行即可 问题复现 排查了一个小时终于找出来问题了&#xff0c;建议导包去看官方的文档&#xff0c;作者就是因为看了别人的导包流程导致的问题 导包官网地址Element UI导包快速入门...

VS Code搭建C/C++开发环境

文章目录 一、VScode 是什么?二、VScode的下载和安装1、下载2、安装 三、环境介绍1、安装中文插件 四、VScode配置 C/C开发环境1、下载MinGW-w64 编译器套件2、配置MingGW643、验证4、安装C/C插件 五、在VSCode上编写C语言代码并编译成功1、打开文件夹2、新建C语言文件&#x…...

6.ArkUI Row的介绍和使用

ArkUI Row 组件介绍与使用指南 什么是 Row 组件&#xff1f; Row 是 ArkUI 中的基础布局容器组件&#xff0c;用于水平&#xff08;横向&#xff09;排列子组件。它与 Column 组件相对应&#xff0c;是构建用户界面最常用的布局方式之一&#xff0c;类似于其他UI框架中的水平…...

mysql 在 dbeaver中下载驱动失败处理

直接上解决方法 1. 在mysql官网下载驱动 2. 引入dbeaver中即可 3. 最后再双击即可...