当前位置: 首页 > news >正文

FlinkSql 如何实现数据去重?

摘要

很多时候flink消费上游kafka的数据是有重复的,因此有时候我们想数据在落盘之前进行去重,这在实际开发中具有广泛的应用场景,此处不说详细代码,只粘贴相应的flinksql

代码

--********************************************************************--
-- 创建临时表(只在当前sessoin生效的表称为临时表) DDL
CREATE TEMPORARY TABLE UserAttrSource ( `data` string,`kafkaMetaTimestamp` TIMESTAMP(3) METADATA FROM 'timestamp', -- kafka record携带的源数据时间戳,参考官网kafka connectorproctime as PROCTIME() -- 获取数据处理时间,这是flink内置支持的关键字
) WITH ('connector' = 'kafka','topic' = 'user_attri_ad_dirty_data','properties.bootstrap.servers' = 'kafka地址','scan.startup.mode' = 'timestamp', -- kafka扫描数据模式,参考官网kafka connector'scan.startup.timestamp-millis' ='1687305600000' , -- 2023-06-21 08:00:00'format' = 'raw' -- 意思是将kafka数据格式化为string
);-- 创建SINKCREATE TEMPORARY TABLE ADB (log_date DATE,`errorType` int,appId string,`errorCode` int,`errorReason` string,`deserialization` string,`originalData` string,kafkaMetaTimestamp TIMESTAMP,data_hash string,PRIMARY KEY (`data_hash`) NOT ENFORCED
)
WITH ('connector' = 'adb3.0','url' = 'jdbc:mysql://xxxx:3306/flink_data?rewriteBatchedStatements=true','tableName' = 'usr_attr_dirty', 'userName'='username','password'='password'
);
-- 去重视图, 这是关键(json_value是flink的内置函数,data_hash是数据本身的primary key)
-- 下述语句含义是:根据data_hash字段分组,按照处理时间排序,取出最新的一条数据,其他的重复数据将被抛弃
CREATE TEMPORARY VIEW quchong ASSELECT data,kafkaMetaTimestamp FROM (SELECT *,ROW_NUMBER() OVER (PARTITION BY json_value(data,'$.data_hash') ORDER BY proctime DESC) as row_numFROM UserAttrSource)WHERE row_num = 1;--  插入目标表
insert into ADB
select TO_DATE(DATE_FORMAT(kafkaMetaTimestamp,'yyyy-MM-dd') )AS log_date,json_value(data,'$.errorType' RETURNING INT) errorType,json_value(data,'$.appId' NULL ON EMPTY) appId,json_value(data,'$.errorCode'  RETURNING INT) errorCode,json_value(data,'$.errorReason' NULL ON EMPTY) errorReason,json_value(data,'$.deserialization' NULL ON EMPTY) deserialization,json_value(data,'$.originalData') originalData,kafkaMetaTimestamp,json_value(data,'$.data_hash') data_hash
from quchong;

相关文章:

FlinkSql 如何实现数据去重?

摘要 很多时候flink消费上游kafka的数据是有重复的,因此有时候我们想数据在落盘之前进行去重,这在实际开发中具有广泛的应用场景,此处不说详细代码,只粘贴相应的flinksql 代码 --********************************************…...

机器学习概念

目录 一、人工智能、机器学习、深度学习的关系 二、什么是深度学习? 2.1 深度学习常用算法 一、人工智能、机器学习、深度学习的关系 人工智能、机器学习和深度学习的关系如下所示。 二、什么是深度学习? 深度学习( DL, Deep Learning) 是机器学习 …...

【数据结构】排序(插入、选择、交换、归并) -- 详解

一、排序的概念及其运用 1、排序的概念 排序:所谓排序,就是使一串记录,按照其中的某个或某些关键字的大小,递增或递减的排列起来的操作。 稳定性:假定在待排序的记录序列中,存在多个具有相同的关键字的记…...

游戏中的图片打包流程,免费的png打包plist工具,一款把若干资源图片拼接为一张大图的免费工具

手机游戏开发中,为了提高图片渲染性能,经常需要将小图片合并成一张大图进行渲染。如果手工来做的话就非常耗时。TexturePacker就是一款非常不错方便的处理工具。TexturePacker虽然非常优秀,但不是免费的。 对于打包流程,做游戏的…...

Springboot实现ENC加密

Springboot实现ENC加密 1、导入依赖2、配置加密秘钥&#xff08;盐&#xff09;3、获取并配置密文4、重启项目测试5、自定义前缀、后缀6、自定义加密方式 1、导入依赖 关于版本&#xff0c;需要根据spring-boot版本&#xff0c;自行修改 <dependency><groupId>co…...

nginx 托管vue项目配置

server {listen 80;server_name your_domain.com;location / {root /path/to/your/vue/project;index index.html;try_files $uri $uri/ /index.html;} }奇怪的现象,在vue路由中/会跳转到/abc/def&#xff0c;但如果直接输入/abc/def会显示404&#xff0c;添加 try_files $uri…...

Vue3中如何进行封装?—组件之间的传值

用了很久一段时间Vue3Ts了&#xff0c;工作中对一些常用的组件也进行了一些封装&#xff0c;这里对封装的一些方法进行一些简单的总结。 1.props传递 首先在主组件进行定义传值 <template><div>这里是主组件<common :first"first"></common&…...

实训笔记8.25

实训笔记8.25 8.25笔记一、Flume数据采集技术1.1 Flume实现数据采集主要借助Flume的组成架构1.2 Flume采集数据的时候&#xff0c;核心是编写Flume的采集脚本xxx.conf1.2.1 脚本文件主要由五部分组成 二、Flume案例实操2.1 采集一个网络端口的数据到控制台2.1.1 分析案例的组件…...

vue自定义监听元素宽高指令

在 main.js 中添加 // 自定义监听元素高度变化指令 const resizerMap new WeakMap() const resizeObserver new ResizeObserver((entries) > {for (const entry of entries) {const handle resizerMap.get(entry.target)if (handle) {handle({width: entry.borderBoxSiz…...

网络爬虫到底是个啥?

网络爬虫到底是个啥&#xff1f; 当涉及到网络爬虫技术时&#xff0c;需要考虑多个方面&#xff0c;从网页获取到最终的数据处理和分析&#xff0c;每个阶段都有不同的算法和策略。以下是这些方面的详细解释&#xff1a; 网页获取&#xff08;Web Crawling&#xff09;&#x…...

前端行级元素和块级元素的基本区别

块级元素和行内元素的基本区别是&#xff0c; 行内元素可以与其他行内元素并排&#xff1b;块级元素独占一行&#xff0c;不能与其他任何元素并列&#xff1b; 下面看一下&#xff1b; <!DOCTYPE html> <html> <head> <meta charset"utf-8"&…...

CentOS 7用二进制安装MySQL5.7

[rootlocalhost ~]# [rootlocalhost ~]# ll 总用量 662116 -rw-------. 1 root root 1401 8月 29 19:29 anaconda-ks.cfg -rw-r--r--. 1 root root 678001736 8月 29 19:44 mysql-5.7.40-linux-glibc2.12-x86_64.tar.gz [rootlocalhost ~]# tar xf mysql-5.7.40-linux-…...

华为加速回归Mate 60发布, 7nm全自研工艺芯片

华为于今天12:08推出“HUAWEI Mate 60 Pro先锋计划”&#xff0c;让部分消费者提前体验。在华为商城看到&#xff0c;华为Mate 60 pro手机已上架&#xff0c;售价6999元&#xff0c;提供雅川青、白沙银、南糯紫、雅丹黑四种配色供选择。 据介绍&#xff0c;华为在卫星通信领域…...

Linux系列讲解 —— 【systemd】下载及编译记录

Ubuntu18.04的init程序合并到了systemd中&#xff0c;本篇文章记录一下systemd的下载和编译。 1. 下载systemd源码 (1) 查看systemd版本号&#xff0c;用来确定需要下载的分支 sunsun-pc:~$ systemd --version systemd 237 PAM AUDIT SELINUX IMA APPARMOR SMACK SYSVINIT UT…...

u-view 的u-calendar 组件设置默认日期后,多次点击后,就不滚动到默认日期的位置

场景&#xff1a;uniapp开发微信小程序 vue2 uview版本&#xff1a;2.0.36 &#xff1b; u-calendar 组件设置默认日期后 我打开弹窗&#xff0c;再关闭弹窗&#xff0c; 重复两次 就不显示默认日期了 在源码中找到这个位置进行打印值&#xff0c;根据出bug前后的值进行…...

vue naive ui 按钮绑定按键

使用vue (naive ui) 绑定Enter 按键 知识点: 按键绑定Button全局挂载使得message,notification, dialog, loadingBar 等NaiveUI 生效UMD方式使用vue 与 naive ui将vue默认的 分隔符大括号 替换 为 [[ ]] <!DOCTYPE html> <html lang"en"> <head>…...

Viobot基本功能使用及介绍

设备拿到手当然是要先试一下效果的&#xff0c;这部分可以参考本专栏的第一篇 Viobot开机指南。 接下来我们就从UI开始熟悉这个产品吧&#xff01; 1.状态 设备上电会自动运行它的程序&#xff0c;开启了一个服务器&#xff0c;上位机通过连接这个服务器连接到设备&#xff0c…...

《PMBOK指南》第七版12大原则和8大绩效域

《PMBOK指南》第七版12大原则 原则1&#xff1a;成为勤勉、尊重和关心他人的管家 原则2&#xff1a;营造协作的项目团队环境 原则3&#xff1a;有效地干系人参与 原则4&#xff1a;聚焦于价值 原则5&#xff1a;识别、评估和响应系统交互 原则6&#xff1a;展现领导力行为…...

docker 启动命令

cd /ycw/docker docker build -f DockerFile -t jshepr:1.0 . #前面测试docker已经介绍过该命令下面就不再介绍了 docker images docker run -it -p 7003:9999 --name yyy -d jshepr:1.0 #上面运行报错 用这个 不报错就不用 docker rm yyy docker ps #查看项目日志 docker …...

C++ DAY7

一、类模板 建立一个通用的类&#xff0c;其类中的类型不确定&#xff0c;用一个虚拟类型替代 template<typename T> 类template ----->表示开始创建模板 typename -->表明后面的符号是数据类型&#xff0c;typename 也可以用class代替 T ----->表示数据类型…...

XML Group端口详解

在XML数据映射过程中&#xff0c;经常需要对数据进行分组聚合操作。例如&#xff0c;当处理包含多个物料明细的XML文件时&#xff0c;可能需要将相同物料号的明细归为一组&#xff0c;或对相同物料号的数量进行求和计算。传统实现方式通常需要编写脚本代码&#xff0c;增加了开…...

龙虎榜——20250610

上证指数放量收阴线&#xff0c;个股多数下跌&#xff0c;盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型&#xff0c;指数短线有调整的需求&#xff0c;大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的&#xff1a;御银股份、雄帝科技 驱动…...

深度学习在微纳光子学中的应用

深度学习在微纳光子学中的主要应用方向 深度学习与微纳光子学的结合主要集中在以下几个方向&#xff1a; 逆向设计 通过神经网络快速预测微纳结构的光学响应&#xff0c;替代传统耗时的数值模拟方法。例如设计超表面、光子晶体等结构。 特征提取与优化 从复杂的光学数据中自…...

Cursor实现用excel数据填充word模版的方法

cursor主页&#xff1a;https://www.cursor.com/ 任务目标&#xff1a;把excel格式的数据里的单元格&#xff0c;按照某一个固定模版填充到word中 文章目录 注意事项逐步生成程序1. 确定格式2. 调试程序 注意事项 直接给一个excel文件和最终呈现的word文件的示例&#xff0c;…...

椭圆曲线密码学(ECC)

一、ECC算法概述 椭圆曲线密码学&#xff08;Elliptic Curve Cryptography&#xff09;是基于椭圆曲线数学理论的公钥密码系统&#xff0c;由Neal Koblitz和Victor Miller在1985年独立提出。相比RSA&#xff0c;ECC在相同安全强度下密钥更短&#xff08;256位ECC ≈ 3072位RSA…...

中南大学无人机智能体的全面评估!BEDI:用于评估无人机上具身智能体的综合性基准测试

作者&#xff1a;Mingning Guo, Mengwei Wu, Jiarun He, Shaoxian Li, Haifeng Li, Chao Tao单位&#xff1a;中南大学地球科学与信息物理学院论文标题&#xff1a;BEDI: A Comprehensive Benchmark for Evaluating Embodied Agents on UAVs论文链接&#xff1a;https://arxiv.…...

遍历 Map 类型集合的方法汇总

1 方法一 先用方法 keySet() 获取集合中的所有键。再通过 gey(key) 方法用对应键获取值 import java.util.HashMap; import java.util.Set;public class Test {public static void main(String[] args) {HashMap hashMap new HashMap();hashMap.put("语文",99);has…...

蓝桥杯 2024 15届国赛 A组 儿童节快乐

P10576 [蓝桥杯 2024 国 A] 儿童节快乐 题目描述 五彩斑斓的气球在蓝天下悠然飘荡&#xff0c;轻快的音乐在耳边持续回荡&#xff0c;小朋友们手牵着手一同畅快欢笑。在这样一片安乐祥和的氛围下&#xff0c;六一来了。 今天是六一儿童节&#xff0c;小蓝老师为了让大家在节…...

【大模型RAG】Docker 一键部署 Milvus 完整攻略

本文概要 Milvus 2.5 Stand-alone 版可通过 Docker 在几分钟内完成安装&#xff1b;只需暴露 19530&#xff08;gRPC&#xff09;与 9091&#xff08;HTTP/WebUI&#xff09;两个端口&#xff0c;即可让本地电脑通过 PyMilvus 或浏览器访问远程 Linux 服务器上的 Milvus。下面…...

c#开发AI模型对话

AI模型 前面已经介绍了一般AI模型本地部署&#xff0c;直接调用现成的模型数据。这里主要讲述讲接口集成到我们自己的程序中使用方式。 微软提供了ML.NET来开发和使用AI模型&#xff0c;但是目前国内可能使用不多&#xff0c;至少实践例子很少看见。开发训练模型就不介绍了&am…...