FlinkSql 如何实现数据去重?
摘要
很多时候flink消费上游kafka的数据是有重复的,因此有时候我们想数据在落盘之前进行去重,这在实际开发中具有广泛的应用场景,此处不说详细代码,只粘贴相应的flinksql
代码
--********************************************************************--
-- 创建临时表(只在当前sessoin生效的表称为临时表) DDL
CREATE TEMPORARY TABLE UserAttrSource ( `data` string,`kafkaMetaTimestamp` TIMESTAMP(3) METADATA FROM 'timestamp', -- kafka record携带的源数据时间戳,参考官网kafka connectorproctime as PROCTIME() -- 获取数据处理时间,这是flink内置支持的关键字
) WITH ('connector' = 'kafka','topic' = 'user_attri_ad_dirty_data','properties.bootstrap.servers' = 'kafka地址','scan.startup.mode' = 'timestamp', -- kafka扫描数据模式,参考官网kafka connector'scan.startup.timestamp-millis' ='1687305600000' , -- 2023-06-21 08:00:00'format' = 'raw' -- 意思是将kafka数据格式化为string
);-- 创建SINK 表
CREATE TEMPORARY TABLE ADB (log_date DATE,`errorType` int,appId string,`errorCode` int,`errorReason` string,`deserialization` string,`originalData` string,kafkaMetaTimestamp TIMESTAMP,data_hash string,PRIMARY KEY (`data_hash`) NOT ENFORCED
)
WITH ('connector' = 'adb3.0','url' = 'jdbc:mysql://xxxx:3306/flink_data?rewriteBatchedStatements=true','tableName' = 'usr_attr_dirty', 'userName'='username','password'='password'
);
-- 去重视图, 这是关键(json_value是flink的内置函数,data_hash是数据本身的primary key)
-- 下述语句含义是:根据data_hash字段分组,按照处理时间排序,取出最新的一条数据,其他的重复数据将被抛弃
CREATE TEMPORARY VIEW quchong ASSELECT data,kafkaMetaTimestamp FROM (SELECT *,ROW_NUMBER() OVER (PARTITION BY json_value(data,'$.data_hash') ORDER BY proctime DESC) as row_numFROM UserAttrSource)WHERE row_num = 1;-- 插入目标表
insert into ADB
select TO_DATE(DATE_FORMAT(kafkaMetaTimestamp,'yyyy-MM-dd') )AS log_date,json_value(data,'$.errorType' RETURNING INT) errorType,json_value(data,'$.appId' NULL ON EMPTY) appId,json_value(data,'$.errorCode' RETURNING INT) errorCode,json_value(data,'$.errorReason' NULL ON EMPTY) errorReason,json_value(data,'$.deserialization' NULL ON EMPTY) deserialization,json_value(data,'$.originalData') originalData,kafkaMetaTimestamp,json_value(data,'$.data_hash') data_hash
from quchong;相关文章:
FlinkSql 如何实现数据去重?
摘要 很多时候flink消费上游kafka的数据是有重复的,因此有时候我们想数据在落盘之前进行去重,这在实际开发中具有广泛的应用场景,此处不说详细代码,只粘贴相应的flinksql 代码 --********************************************…...
机器学习概念
目录 一、人工智能、机器学习、深度学习的关系 二、什么是深度学习? 2.1 深度学习常用算法 一、人工智能、机器学习、深度学习的关系 人工智能、机器学习和深度学习的关系如下所示。 二、什么是深度学习? 深度学习( DL, Deep Learning) 是机器学习 …...
【数据结构】排序(插入、选择、交换、归并) -- 详解
一、排序的概念及其运用 1、排序的概念 排序:所谓排序,就是使一串记录,按照其中的某个或某些关键字的大小,递增或递减的排列起来的操作。 稳定性:假定在待排序的记录序列中,存在多个具有相同的关键字的记…...
游戏中的图片打包流程,免费的png打包plist工具,一款把若干资源图片拼接为一张大图的免费工具
手机游戏开发中,为了提高图片渲染性能,经常需要将小图片合并成一张大图进行渲染。如果手工来做的话就非常耗时。TexturePacker就是一款非常不错方便的处理工具。TexturePacker虽然非常优秀,但不是免费的。 对于打包流程,做游戏的…...
Springboot实现ENC加密
Springboot实现ENC加密 1、导入依赖2、配置加密秘钥(盐)3、获取并配置密文4、重启项目测试5、自定义前缀、后缀6、自定义加密方式 1、导入依赖 关于版本,需要根据spring-boot版本,自行修改 <dependency><groupId>co…...
nginx 托管vue项目配置
server {listen 80;server_name your_domain.com;location / {root /path/to/your/vue/project;index index.html;try_files $uri $uri/ /index.html;} }奇怪的现象,在vue路由中/会跳转到/abc/def,但如果直接输入/abc/def会显示404,添加 try_files $uri…...
Vue3中如何进行封装?—组件之间的传值
用了很久一段时间Vue3Ts了,工作中对一些常用的组件也进行了一些封装,这里对封装的一些方法进行一些简单的总结。 1.props传递 首先在主组件进行定义传值 <template><div>这里是主组件<common :first"first"></common&…...
实训笔记8.25
实训笔记8.25 8.25笔记一、Flume数据采集技术1.1 Flume实现数据采集主要借助Flume的组成架构1.2 Flume采集数据的时候,核心是编写Flume的采集脚本xxx.conf1.2.1 脚本文件主要由五部分组成 二、Flume案例实操2.1 采集一个网络端口的数据到控制台2.1.1 分析案例的组件…...
vue自定义监听元素宽高指令
在 main.js 中添加 // 自定义监听元素高度变化指令 const resizerMap new WeakMap() const resizeObserver new ResizeObserver((entries) > {for (const entry of entries) {const handle resizerMap.get(entry.target)if (handle) {handle({width: entry.borderBoxSiz…...
网络爬虫到底是个啥?
网络爬虫到底是个啥? 当涉及到网络爬虫技术时,需要考虑多个方面,从网页获取到最终的数据处理和分析,每个阶段都有不同的算法和策略。以下是这些方面的详细解释: 网页获取(Web Crawling)&#x…...
前端行级元素和块级元素的基本区别
块级元素和行内元素的基本区别是, 行内元素可以与其他行内元素并排;块级元素独占一行,不能与其他任何元素并列; 下面看一下; <!DOCTYPE html> <html> <head> <meta charset"utf-8"&…...
CentOS 7用二进制安装MySQL5.7
[rootlocalhost ~]# [rootlocalhost ~]# ll 总用量 662116 -rw-------. 1 root root 1401 8月 29 19:29 anaconda-ks.cfg -rw-r--r--. 1 root root 678001736 8月 29 19:44 mysql-5.7.40-linux-glibc2.12-x86_64.tar.gz [rootlocalhost ~]# tar xf mysql-5.7.40-linux-…...
华为加速回归Mate 60发布, 7nm全自研工艺芯片
华为于今天12:08推出“HUAWEI Mate 60 Pro先锋计划”,让部分消费者提前体验。在华为商城看到,华为Mate 60 pro手机已上架,售价6999元,提供雅川青、白沙银、南糯紫、雅丹黑四种配色供选择。 据介绍,华为在卫星通信领域…...
Linux系列讲解 —— 【systemd】下载及编译记录
Ubuntu18.04的init程序合并到了systemd中,本篇文章记录一下systemd的下载和编译。 1. 下载systemd源码 (1) 查看systemd版本号,用来确定需要下载的分支 sunsun-pc:~$ systemd --version systemd 237 PAM AUDIT SELINUX IMA APPARMOR SMACK SYSVINIT UT…...
u-view 的u-calendar 组件设置默认日期后,多次点击后,就不滚动到默认日期的位置
场景:uniapp开发微信小程序 vue2 uview版本:2.0.36 ; u-calendar 组件设置默认日期后 我打开弹窗,再关闭弹窗, 重复两次 就不显示默认日期了 在源码中找到这个位置进行打印值,根据出bug前后的值进行…...
vue naive ui 按钮绑定按键
使用vue (naive ui) 绑定Enter 按键 知识点: 按键绑定Button全局挂载使得message,notification, dialog, loadingBar 等NaiveUI 生效UMD方式使用vue 与 naive ui将vue默认的 分隔符大括号 替换 为 [[ ]] <!DOCTYPE html> <html lang"en"> <head>…...
Viobot基本功能使用及介绍
设备拿到手当然是要先试一下效果的,这部分可以参考本专栏的第一篇 Viobot开机指南。 接下来我们就从UI开始熟悉这个产品吧! 1.状态 设备上电会自动运行它的程序,开启了一个服务器,上位机通过连接这个服务器连接到设备,…...
《PMBOK指南》第七版12大原则和8大绩效域
《PMBOK指南》第七版12大原则 原则1:成为勤勉、尊重和关心他人的管家 原则2:营造协作的项目团队环境 原则3:有效地干系人参与 原则4:聚焦于价值 原则5:识别、评估和响应系统交互 原则6:展现领导力行为…...
docker 启动命令
cd /ycw/docker docker build -f DockerFile -t jshepr:1.0 . #前面测试docker已经介绍过该命令下面就不再介绍了 docker images docker run -it -p 7003:9999 --name yyy -d jshepr:1.0 #上面运行报错 用这个 不报错就不用 docker rm yyy docker ps #查看项目日志 docker …...
C++ DAY7
一、类模板 建立一个通用的类,其类中的类型不确定,用一个虚拟类型替代 template<typename T> 类template ----->表示开始创建模板 typename -->表明后面的符号是数据类型,typename 也可以用class代替 T ----->表示数据类型…...
LuaJIT字节码逆向难题:LJD如何帮你恢复可读源码
LuaJIT字节码逆向难题:LJD如何帮你恢复可读源码 【免费下载链接】luajit-decompiler https://gitlab.com/znixian/luajit-decompiler 项目地址: https://gitcode.com/gh_mirrors/lu/luajit-decompiler 面对编译后的LuaJIT字节码文件,你是否曾困惑…...
如何在GTA5在线模式中保护自己?YimMenu安全增强菜单完整指南
如何在GTA5在线模式中保护自己?YimMenu安全增强菜单完整指南 【免费下载链接】YimMenu YimMenu, a GTA V menu protecting against a wide ranges of the public crashes and improving the overall experience. 项目地址: https://gitcode.com/GitHub_Trending/y…...
破冰总结:写给 QA 的一份 30 天 AI 技术转型学习路线图
写在前面:一个不得不面对的现实 打开招聘网站,搜索“高级QA工程师”,你会发现薪资最高的一批岗位都有同一个关键词:AI。不是指“用AI写测试用例”那种浮于表面的用法,而是要求你真正理解AI系统的工作原理、能评估模型输出质量、能设计对抗性测试方案、能把RAG管线部署到生…...
KMS_VL_ALL_AIO:一键激活Windows与Office的完整解决方案
KMS_VL_ALL_AIO:一键激活Windows与Office的完整解决方案 【免费下载链接】KMS_VL_ALL_AIO Smart Activation Script 项目地址: https://gitcode.com/gh_mirrors/km/KMS_VL_ALL_AIO 你是否曾经为Windows或Office的激活问题而烦恼?每次重装系统后都…...
3个企业级验证码识别架构设计:DdddOcr技术选型与性能优化策略
3个企业级验证码识别架构设计:DdddOcr技术选型与性能优化策略 【免费下载链接】ddddocr 带带弟弟 通用验证码识别OCR pypi版 项目地址: https://gitcode.com/gh_mirrors/dd/ddddocr 引言:验证码识别在企业自动化系统中的战略价值 在当今数字化时…...
ElevenLabs波兰语语音突然失真?3类高频报错代码+实时调试日志解析(含波兰语IPA音素校验表)
更多请点击: https://codechina.net 第一章:ElevenLabs波兰语语音失真现象的系统性定位 ElevenLabs 的 TTS 服务在处理波兰语时,部分用户报告出现音素断裂、重音偏移及辅音簇(如 szcz、 żdź)发音模糊等失真现象。此…...
ElevenLabs越南文TTS落地全链路:从API密钥配置、SSML控制到本地化韵律校准(含实测MOS评分对比)
更多请点击: https://codechina.net 第一章:ElevenLabs越南文TTS落地全链路概览 ElevenLabs 作为当前高保真语音合成领域的领先平台,其对越南语(vi-VN)的支持已进入生产就绪阶段。尽管官方文档未单独设立越南语专区&a…...
收藏! Harness 让你轻松驾驭大模型,小白也能写出高效代码
本文探讨了 AI 编程 Agent 的核心要素,强调 Harness(工具、流程和反馈系统)的重要性远超单纯依赖模型。通过实例说明,优化编辑格式等 Harness 设计可显著提升 Agent 成功率。文章提出,为 AI 准备更好的工作台ÿ…...
如何将Scrapeless MCP服务器集成到ZeroClaw中:逐步指南
关键要点: 一个TOML块将云浏览器连接到本地Rust代理。 ZeroClaw是一个单一二进制AI代理运行时,它与LLM提供者通信,监听30多个频道,并通过工具进行操作。只需在~/.zeroclaw/config.toml中添加四行[mcp]块即可添加Scrapeless MCP服…...
配置Hermes Agent使用自定义Taotoken作为模型供应商的步骤
🚀 告别海外账号与网络限制!稳定直连全球优质大模型,限时半价接入中。 👉 点击领取海量免费额度 配置Hermes Agent使用自定义Taotoken作为模型供应商的步骤 1. 准备工作:获取必要的凭证 在开始配置之前,你…...
