当前位置: 首页 > news >正文

FlinkSql 如何实现数据去重?

摘要

很多时候flink消费上游kafka的数据是有重复的,因此有时候我们想数据在落盘之前进行去重,这在实际开发中具有广泛的应用场景,此处不说详细代码,只粘贴相应的flinksql

代码

--********************************************************************--
-- 创建临时表(只在当前sessoin生效的表称为临时表) DDL
CREATE TEMPORARY TABLE UserAttrSource ( `data` string,`kafkaMetaTimestamp` TIMESTAMP(3) METADATA FROM 'timestamp', -- kafka record携带的源数据时间戳,参考官网kafka connectorproctime as PROCTIME() -- 获取数据处理时间,这是flink内置支持的关键字
) WITH ('connector' = 'kafka','topic' = 'user_attri_ad_dirty_data','properties.bootstrap.servers' = 'kafka地址','scan.startup.mode' = 'timestamp', -- kafka扫描数据模式,参考官网kafka connector'scan.startup.timestamp-millis' ='1687305600000' , -- 2023-06-21 08:00:00'format' = 'raw' -- 意思是将kafka数据格式化为string
);-- 创建SINKCREATE TEMPORARY TABLE ADB (log_date DATE,`errorType` int,appId string,`errorCode` int,`errorReason` string,`deserialization` string,`originalData` string,kafkaMetaTimestamp TIMESTAMP,data_hash string,PRIMARY KEY (`data_hash`) NOT ENFORCED
)
WITH ('connector' = 'adb3.0','url' = 'jdbc:mysql://xxxx:3306/flink_data?rewriteBatchedStatements=true','tableName' = 'usr_attr_dirty', 'userName'='username','password'='password'
);
-- 去重视图, 这是关键(json_value是flink的内置函数,data_hash是数据本身的primary key)
-- 下述语句含义是:根据data_hash字段分组,按照处理时间排序,取出最新的一条数据,其他的重复数据将被抛弃
CREATE TEMPORARY VIEW quchong ASSELECT data,kafkaMetaTimestamp FROM (SELECT *,ROW_NUMBER() OVER (PARTITION BY json_value(data,'$.data_hash') ORDER BY proctime DESC) as row_numFROM UserAttrSource)WHERE row_num = 1;--  插入目标表
insert into ADB
select TO_DATE(DATE_FORMAT(kafkaMetaTimestamp,'yyyy-MM-dd') )AS log_date,json_value(data,'$.errorType' RETURNING INT) errorType,json_value(data,'$.appId' NULL ON EMPTY) appId,json_value(data,'$.errorCode'  RETURNING INT) errorCode,json_value(data,'$.errorReason' NULL ON EMPTY) errorReason,json_value(data,'$.deserialization' NULL ON EMPTY) deserialization,json_value(data,'$.originalData') originalData,kafkaMetaTimestamp,json_value(data,'$.data_hash') data_hash
from quchong;

相关文章:

FlinkSql 如何实现数据去重?

摘要 很多时候flink消费上游kafka的数据是有重复的,因此有时候我们想数据在落盘之前进行去重,这在实际开发中具有广泛的应用场景,此处不说详细代码,只粘贴相应的flinksql 代码 --********************************************…...

机器学习概念

目录 一、人工智能、机器学习、深度学习的关系 二、什么是深度学习? 2.1 深度学习常用算法 一、人工智能、机器学习、深度学习的关系 人工智能、机器学习和深度学习的关系如下所示。 二、什么是深度学习? 深度学习( DL, Deep Learning) 是机器学习 …...

【数据结构】排序(插入、选择、交换、归并) -- 详解

一、排序的概念及其运用 1、排序的概念 排序:所谓排序,就是使一串记录,按照其中的某个或某些关键字的大小,递增或递减的排列起来的操作。 稳定性:假定在待排序的记录序列中,存在多个具有相同的关键字的记…...

游戏中的图片打包流程,免费的png打包plist工具,一款把若干资源图片拼接为一张大图的免费工具

手机游戏开发中,为了提高图片渲染性能,经常需要将小图片合并成一张大图进行渲染。如果手工来做的话就非常耗时。TexturePacker就是一款非常不错方便的处理工具。TexturePacker虽然非常优秀,但不是免费的。 对于打包流程,做游戏的…...

Springboot实现ENC加密

Springboot实现ENC加密 1、导入依赖2、配置加密秘钥&#xff08;盐&#xff09;3、获取并配置密文4、重启项目测试5、自定义前缀、后缀6、自定义加密方式 1、导入依赖 关于版本&#xff0c;需要根据spring-boot版本&#xff0c;自行修改 <dependency><groupId>co…...

nginx 托管vue项目配置

server {listen 80;server_name your_domain.com;location / {root /path/to/your/vue/project;index index.html;try_files $uri $uri/ /index.html;} }奇怪的现象,在vue路由中/会跳转到/abc/def&#xff0c;但如果直接输入/abc/def会显示404&#xff0c;添加 try_files $uri…...

Vue3中如何进行封装?—组件之间的传值

用了很久一段时间Vue3Ts了&#xff0c;工作中对一些常用的组件也进行了一些封装&#xff0c;这里对封装的一些方法进行一些简单的总结。 1.props传递 首先在主组件进行定义传值 <template><div>这里是主组件<common :first"first"></common&…...

实训笔记8.25

实训笔记8.25 8.25笔记一、Flume数据采集技术1.1 Flume实现数据采集主要借助Flume的组成架构1.2 Flume采集数据的时候&#xff0c;核心是编写Flume的采集脚本xxx.conf1.2.1 脚本文件主要由五部分组成 二、Flume案例实操2.1 采集一个网络端口的数据到控制台2.1.1 分析案例的组件…...

vue自定义监听元素宽高指令

在 main.js 中添加 // 自定义监听元素高度变化指令 const resizerMap new WeakMap() const resizeObserver new ResizeObserver((entries) > {for (const entry of entries) {const handle resizerMap.get(entry.target)if (handle) {handle({width: entry.borderBoxSiz…...

网络爬虫到底是个啥?

网络爬虫到底是个啥&#xff1f; 当涉及到网络爬虫技术时&#xff0c;需要考虑多个方面&#xff0c;从网页获取到最终的数据处理和分析&#xff0c;每个阶段都有不同的算法和策略。以下是这些方面的详细解释&#xff1a; 网页获取&#xff08;Web Crawling&#xff09;&#x…...

前端行级元素和块级元素的基本区别

块级元素和行内元素的基本区别是&#xff0c; 行内元素可以与其他行内元素并排&#xff1b;块级元素独占一行&#xff0c;不能与其他任何元素并列&#xff1b; 下面看一下&#xff1b; <!DOCTYPE html> <html> <head> <meta charset"utf-8"&…...

CentOS 7用二进制安装MySQL5.7

[rootlocalhost ~]# [rootlocalhost ~]# ll 总用量 662116 -rw-------. 1 root root 1401 8月 29 19:29 anaconda-ks.cfg -rw-r--r--. 1 root root 678001736 8月 29 19:44 mysql-5.7.40-linux-glibc2.12-x86_64.tar.gz [rootlocalhost ~]# tar xf mysql-5.7.40-linux-…...

华为加速回归Mate 60发布, 7nm全自研工艺芯片

华为于今天12:08推出“HUAWEI Mate 60 Pro先锋计划”&#xff0c;让部分消费者提前体验。在华为商城看到&#xff0c;华为Mate 60 pro手机已上架&#xff0c;售价6999元&#xff0c;提供雅川青、白沙银、南糯紫、雅丹黑四种配色供选择。 据介绍&#xff0c;华为在卫星通信领域…...

Linux系列讲解 —— 【systemd】下载及编译记录

Ubuntu18.04的init程序合并到了systemd中&#xff0c;本篇文章记录一下systemd的下载和编译。 1. 下载systemd源码 (1) 查看systemd版本号&#xff0c;用来确定需要下载的分支 sunsun-pc:~$ systemd --version systemd 237 PAM AUDIT SELINUX IMA APPARMOR SMACK SYSVINIT UT…...

u-view 的u-calendar 组件设置默认日期后,多次点击后,就不滚动到默认日期的位置

场景&#xff1a;uniapp开发微信小程序 vue2 uview版本&#xff1a;2.0.36 &#xff1b; u-calendar 组件设置默认日期后 我打开弹窗&#xff0c;再关闭弹窗&#xff0c; 重复两次 就不显示默认日期了 在源码中找到这个位置进行打印值&#xff0c;根据出bug前后的值进行…...

vue naive ui 按钮绑定按键

使用vue (naive ui) 绑定Enter 按键 知识点: 按键绑定Button全局挂载使得message,notification, dialog, loadingBar 等NaiveUI 生效UMD方式使用vue 与 naive ui将vue默认的 分隔符大括号 替换 为 [[ ]] <!DOCTYPE html> <html lang"en"> <head>…...

Viobot基本功能使用及介绍

设备拿到手当然是要先试一下效果的&#xff0c;这部分可以参考本专栏的第一篇 Viobot开机指南。 接下来我们就从UI开始熟悉这个产品吧&#xff01; 1.状态 设备上电会自动运行它的程序&#xff0c;开启了一个服务器&#xff0c;上位机通过连接这个服务器连接到设备&#xff0c…...

《PMBOK指南》第七版12大原则和8大绩效域

《PMBOK指南》第七版12大原则 原则1&#xff1a;成为勤勉、尊重和关心他人的管家 原则2&#xff1a;营造协作的项目团队环境 原则3&#xff1a;有效地干系人参与 原则4&#xff1a;聚焦于价值 原则5&#xff1a;识别、评估和响应系统交互 原则6&#xff1a;展现领导力行为…...

docker 启动命令

cd /ycw/docker docker build -f DockerFile -t jshepr:1.0 . #前面测试docker已经介绍过该命令下面就不再介绍了 docker images docker run -it -p 7003:9999 --name yyy -d jshepr:1.0 #上面运行报错 用这个 不报错就不用 docker rm yyy docker ps #查看项目日志 docker …...

C++ DAY7

一、类模板 建立一个通用的类&#xff0c;其类中的类型不确定&#xff0c;用一个虚拟类型替代 template<typename T> 类template ----->表示开始创建模板 typename -->表明后面的符号是数据类型&#xff0c;typename 也可以用class代替 T ----->表示数据类型…...

工业自动化时代的精准装配革新:迁移科技3D视觉系统如何重塑机器人定位装配

AI3D视觉的工业赋能者 迁移科技成立于2017年&#xff0c;作为行业领先的3D工业相机及视觉系统供应商&#xff0c;累计完成数亿元融资。其核心技术覆盖硬件设计、算法优化及软件集成&#xff0c;通过稳定、易用、高回报的AI3D视觉系统&#xff0c;为汽车、新能源、金属制造等行…...

智能仓储的未来:自动化、AI与数据分析如何重塑物流中心

当仓库学会“思考”&#xff0c;物流的终极形态正在诞生 想象这样的场景&#xff1a; 凌晨3点&#xff0c;某物流中心灯火通明却空无一人。AGV机器人集群根据实时订单动态规划路径&#xff1b;AI视觉系统在0.1秒内扫描包裹信息&#xff1b;数字孪生平台正模拟次日峰值流量压力…...

高防服务器能够抵御哪些网络攻击呢?

高防服务器作为一种有着高度防御能力的服务器&#xff0c;可以帮助网站应对分布式拒绝服务攻击&#xff0c;有效识别和清理一些恶意的网络流量&#xff0c;为用户提供安全且稳定的网络环境&#xff0c;那么&#xff0c;高防服务器一般都可以抵御哪些网络攻击呢&#xff1f;下面…...

OpenLayers 分屏对比(地图联动)

注&#xff1a;当前使用的是 ol 5.3.0 版本&#xff0c;天地图使用的key请到天地图官网申请&#xff0c;并替换为自己的key 地图分屏对比在WebGIS开发中是很常见的功能&#xff0c;和卷帘图层不一样的是&#xff0c;分屏对比是在各个地图中添加相同或者不同的图层进行对比查看。…...

Spring数据访问模块设计

前面我们已经完成了IoC和web模块的设计&#xff0c;聪明的码友立马就知道了&#xff0c;该到数据访问模块了&#xff0c;要不就这俩玩个6啊&#xff0c;查库势在必行&#xff0c;至此&#xff0c;它来了。 一、核心设计理念 1、痛点在哪 应用离不开数据&#xff08;数据库、No…...

安全突围:重塑内生安全体系:齐向东在2025年BCS大会的演讲

文章目录 前言第一部分&#xff1a;体系力量是突围之钥第一重困境是体系思想落地不畅。第二重困境是大小体系融合瓶颈。第三重困境是“小体系”运营梗阻。 第二部分&#xff1a;体系矛盾是突围之障一是数据孤岛的障碍。二是投入不足的障碍。三是新旧兼容难的障碍。 第三部分&am…...

打手机检测算法AI智能分析网关V4守护公共/工业/医疗等多场景安全应用

一、方案背景​ 在现代生产与生活场景中&#xff0c;如工厂高危作业区、医院手术室、公共场景等&#xff0c;人员违规打手机的行为潜藏着巨大风险。传统依靠人工巡查的监管方式&#xff0c;存在效率低、覆盖面不足、判断主观性强等问题&#xff0c;难以满足对人员打手机行为精…...

day36-多路IO复用

一、基本概念 &#xff08;服务器多客户端模型&#xff09; 定义&#xff1a;单线程或单进程同时监测若干个文件描述符是否可以执行IO操作的能力 作用&#xff1a;应用程序通常需要处理来自多条事件流中的事件&#xff0c;比如我现在用的电脑&#xff0c;需要同时处理键盘鼠标…...

第7篇:中间件全链路监控与 SQL 性能分析实践

7.1 章节导读 在构建数据库中间件的过程中&#xff0c;可观测性 和 性能分析 是保障系统稳定性与可维护性的核心能力。 特别是在复杂分布式场景中&#xff0c;必须做到&#xff1a; &#x1f50d; 追踪每一条 SQL 的生命周期&#xff08;从入口到数据库执行&#xff09;&#…...

LOOI机器人的技术实现解析:从手势识别到边缘检测

LOOI机器人作为一款创新的AI硬件产品&#xff0c;通过将智能手机转变为具有情感交互能力的桌面机器人&#xff0c;展示了前沿AI技术与传统硬件设计的完美结合。作为AI与玩具领域的专家&#xff0c;我将全面解析LOOI的技术实现架构&#xff0c;特别是其手势识别、物体识别和环境…...