当前位置: 首页 > news >正文

FlinkSql 如何实现数据去重?

摘要

很多时候flink消费上游kafka的数据是有重复的,因此有时候我们想数据在落盘之前进行去重,这在实际开发中具有广泛的应用场景,此处不说详细代码,只粘贴相应的flinksql

代码

--********************************************************************--
-- 创建临时表(只在当前sessoin生效的表称为临时表) DDL
CREATE TEMPORARY TABLE UserAttrSource ( `data` string,`kafkaMetaTimestamp` TIMESTAMP(3) METADATA FROM 'timestamp', -- kafka record携带的源数据时间戳,参考官网kafka connectorproctime as PROCTIME() -- 获取数据处理时间,这是flink内置支持的关键字
) WITH ('connector' = 'kafka','topic' = 'user_attri_ad_dirty_data','properties.bootstrap.servers' = 'kafka地址','scan.startup.mode' = 'timestamp', -- kafka扫描数据模式,参考官网kafka connector'scan.startup.timestamp-millis' ='1687305600000' , -- 2023-06-21 08:00:00'format' = 'raw' -- 意思是将kafka数据格式化为string
);-- 创建SINKCREATE TEMPORARY TABLE ADB (log_date DATE,`errorType` int,appId string,`errorCode` int,`errorReason` string,`deserialization` string,`originalData` string,kafkaMetaTimestamp TIMESTAMP,data_hash string,PRIMARY KEY (`data_hash`) NOT ENFORCED
)
WITH ('connector' = 'adb3.0','url' = 'jdbc:mysql://xxxx:3306/flink_data?rewriteBatchedStatements=true','tableName' = 'usr_attr_dirty', 'userName'='username','password'='password'
);
-- 去重视图, 这是关键(json_value是flink的内置函数,data_hash是数据本身的primary key)
-- 下述语句含义是:根据data_hash字段分组,按照处理时间排序,取出最新的一条数据,其他的重复数据将被抛弃
CREATE TEMPORARY VIEW quchong ASSELECT data,kafkaMetaTimestamp FROM (SELECT *,ROW_NUMBER() OVER (PARTITION BY json_value(data,'$.data_hash') ORDER BY proctime DESC) as row_numFROM UserAttrSource)WHERE row_num = 1;--  插入目标表
insert into ADB
select TO_DATE(DATE_FORMAT(kafkaMetaTimestamp,'yyyy-MM-dd') )AS log_date,json_value(data,'$.errorType' RETURNING INT) errorType,json_value(data,'$.appId' NULL ON EMPTY) appId,json_value(data,'$.errorCode'  RETURNING INT) errorCode,json_value(data,'$.errorReason' NULL ON EMPTY) errorReason,json_value(data,'$.deserialization' NULL ON EMPTY) deserialization,json_value(data,'$.originalData') originalData,kafkaMetaTimestamp,json_value(data,'$.data_hash') data_hash
from quchong;

相关文章:

FlinkSql 如何实现数据去重?

摘要 很多时候flink消费上游kafka的数据是有重复的,因此有时候我们想数据在落盘之前进行去重,这在实际开发中具有广泛的应用场景,此处不说详细代码,只粘贴相应的flinksql 代码 --********************************************…...

机器学习概念

目录 一、人工智能、机器学习、深度学习的关系 二、什么是深度学习? 2.1 深度学习常用算法 一、人工智能、机器学习、深度学习的关系 人工智能、机器学习和深度学习的关系如下所示。 二、什么是深度学习? 深度学习( DL, Deep Learning) 是机器学习 …...

【数据结构】排序(插入、选择、交换、归并) -- 详解

一、排序的概念及其运用 1、排序的概念 排序:所谓排序,就是使一串记录,按照其中的某个或某些关键字的大小,递增或递减的排列起来的操作。 稳定性:假定在待排序的记录序列中,存在多个具有相同的关键字的记…...

游戏中的图片打包流程,免费的png打包plist工具,一款把若干资源图片拼接为一张大图的免费工具

手机游戏开发中,为了提高图片渲染性能,经常需要将小图片合并成一张大图进行渲染。如果手工来做的话就非常耗时。TexturePacker就是一款非常不错方便的处理工具。TexturePacker虽然非常优秀,但不是免费的。 对于打包流程,做游戏的…...

Springboot实现ENC加密

Springboot实现ENC加密 1、导入依赖2、配置加密秘钥&#xff08;盐&#xff09;3、获取并配置密文4、重启项目测试5、自定义前缀、后缀6、自定义加密方式 1、导入依赖 关于版本&#xff0c;需要根据spring-boot版本&#xff0c;自行修改 <dependency><groupId>co…...

nginx 托管vue项目配置

server {listen 80;server_name your_domain.com;location / {root /path/to/your/vue/project;index index.html;try_files $uri $uri/ /index.html;} }奇怪的现象,在vue路由中/会跳转到/abc/def&#xff0c;但如果直接输入/abc/def会显示404&#xff0c;添加 try_files $uri…...

Vue3中如何进行封装?—组件之间的传值

用了很久一段时间Vue3Ts了&#xff0c;工作中对一些常用的组件也进行了一些封装&#xff0c;这里对封装的一些方法进行一些简单的总结。 1.props传递 首先在主组件进行定义传值 <template><div>这里是主组件<common :first"first"></common&…...

实训笔记8.25

实训笔记8.25 8.25笔记一、Flume数据采集技术1.1 Flume实现数据采集主要借助Flume的组成架构1.2 Flume采集数据的时候&#xff0c;核心是编写Flume的采集脚本xxx.conf1.2.1 脚本文件主要由五部分组成 二、Flume案例实操2.1 采集一个网络端口的数据到控制台2.1.1 分析案例的组件…...

vue自定义监听元素宽高指令

在 main.js 中添加 // 自定义监听元素高度变化指令 const resizerMap new WeakMap() const resizeObserver new ResizeObserver((entries) > {for (const entry of entries) {const handle resizerMap.get(entry.target)if (handle) {handle({width: entry.borderBoxSiz…...

网络爬虫到底是个啥?

网络爬虫到底是个啥&#xff1f; 当涉及到网络爬虫技术时&#xff0c;需要考虑多个方面&#xff0c;从网页获取到最终的数据处理和分析&#xff0c;每个阶段都有不同的算法和策略。以下是这些方面的详细解释&#xff1a; 网页获取&#xff08;Web Crawling&#xff09;&#x…...

前端行级元素和块级元素的基本区别

块级元素和行内元素的基本区别是&#xff0c; 行内元素可以与其他行内元素并排&#xff1b;块级元素独占一行&#xff0c;不能与其他任何元素并列&#xff1b; 下面看一下&#xff1b; <!DOCTYPE html> <html> <head> <meta charset"utf-8"&…...

CentOS 7用二进制安装MySQL5.7

[rootlocalhost ~]# [rootlocalhost ~]# ll 总用量 662116 -rw-------. 1 root root 1401 8月 29 19:29 anaconda-ks.cfg -rw-r--r--. 1 root root 678001736 8月 29 19:44 mysql-5.7.40-linux-glibc2.12-x86_64.tar.gz [rootlocalhost ~]# tar xf mysql-5.7.40-linux-…...

华为加速回归Mate 60发布, 7nm全自研工艺芯片

华为于今天12:08推出“HUAWEI Mate 60 Pro先锋计划”&#xff0c;让部分消费者提前体验。在华为商城看到&#xff0c;华为Mate 60 pro手机已上架&#xff0c;售价6999元&#xff0c;提供雅川青、白沙银、南糯紫、雅丹黑四种配色供选择。 据介绍&#xff0c;华为在卫星通信领域…...

Linux系列讲解 —— 【systemd】下载及编译记录

Ubuntu18.04的init程序合并到了systemd中&#xff0c;本篇文章记录一下systemd的下载和编译。 1. 下载systemd源码 (1) 查看systemd版本号&#xff0c;用来确定需要下载的分支 sunsun-pc:~$ systemd --version systemd 237 PAM AUDIT SELINUX IMA APPARMOR SMACK SYSVINIT UT…...

u-view 的u-calendar 组件设置默认日期后,多次点击后,就不滚动到默认日期的位置

场景&#xff1a;uniapp开发微信小程序 vue2 uview版本&#xff1a;2.0.36 &#xff1b; u-calendar 组件设置默认日期后 我打开弹窗&#xff0c;再关闭弹窗&#xff0c; 重复两次 就不显示默认日期了 在源码中找到这个位置进行打印值&#xff0c;根据出bug前后的值进行…...

vue naive ui 按钮绑定按键

使用vue (naive ui) 绑定Enter 按键 知识点: 按键绑定Button全局挂载使得message,notification, dialog, loadingBar 等NaiveUI 生效UMD方式使用vue 与 naive ui将vue默认的 分隔符大括号 替换 为 [[ ]] <!DOCTYPE html> <html lang"en"> <head>…...

Viobot基本功能使用及介绍

设备拿到手当然是要先试一下效果的&#xff0c;这部分可以参考本专栏的第一篇 Viobot开机指南。 接下来我们就从UI开始熟悉这个产品吧&#xff01; 1.状态 设备上电会自动运行它的程序&#xff0c;开启了一个服务器&#xff0c;上位机通过连接这个服务器连接到设备&#xff0c…...

《PMBOK指南》第七版12大原则和8大绩效域

《PMBOK指南》第七版12大原则 原则1&#xff1a;成为勤勉、尊重和关心他人的管家 原则2&#xff1a;营造协作的项目团队环境 原则3&#xff1a;有效地干系人参与 原则4&#xff1a;聚焦于价值 原则5&#xff1a;识别、评估和响应系统交互 原则6&#xff1a;展现领导力行为…...

docker 启动命令

cd /ycw/docker docker build -f DockerFile -t jshepr:1.0 . #前面测试docker已经介绍过该命令下面就不再介绍了 docker images docker run -it -p 7003:9999 --name yyy -d jshepr:1.0 #上面运行报错 用这个 不报错就不用 docker rm yyy docker ps #查看项目日志 docker …...

C++ DAY7

一、类模板 建立一个通用的类&#xff0c;其类中的类型不确定&#xff0c;用一个虚拟类型替代 template<typename T> 类template ----->表示开始创建模板 typename -->表明后面的符号是数据类型&#xff0c;typename 也可以用class代替 T ----->表示数据类型…...

Python爬虫实战:研究MechanicalSoup库相关技术

一、MechanicalSoup 库概述 1.1 库简介 MechanicalSoup 是一个 Python 库,专为自动化交互网站而设计。它结合了 requests 的 HTTP 请求能力和 BeautifulSoup 的 HTML 解析能力,提供了直观的 API,让我们可以像人类用户一样浏览网页、填写表单和提交请求。 1.2 主要功能特点…...

(十)学生端搭建

本次旨在将之前的已完成的部分功能进行拼装到学生端&#xff0c;同时完善学生端的构建。本次工作主要包括&#xff1a; 1.学生端整体界面布局 2.模拟考场与部分个人画像流程的串联 3.整体学生端逻辑 一、学生端 在主界面可以选择自己的用户角色 选择学生则进入学生登录界面…...

golang循环变量捕获问题​​

在 Go 语言中&#xff0c;当在循环中启动协程&#xff08;goroutine&#xff09;时&#xff0c;如果在协程闭包中直接引用循环变量&#xff0c;可能会遇到一个常见的陷阱 - ​​循环变量捕获问题​​。让我详细解释一下&#xff1a; 问题背景 看这个代码片段&#xff1a; fo…...

逻辑回归:给不确定性划界的分类大师

想象你是一名医生。面对患者的检查报告&#xff08;肿瘤大小、血液指标&#xff09;&#xff0c;你需要做出一个**决定性判断**&#xff1a;恶性还是良性&#xff1f;这种“非黑即白”的抉择&#xff0c;正是**逻辑回归&#xff08;Logistic Regression&#xff09;** 的战场&a…...

【力扣数据库知识手册笔记】索引

索引 索引的优缺点 优点1. 通过创建唯一性索引&#xff0c;可以保证数据库表中每一行数据的唯一性。2. 可以加快数据的检索速度&#xff08;创建索引的主要原因&#xff09;。3. 可以加速表和表之间的连接&#xff0c;实现数据的参考完整性。4. 可以在查询过程中&#xff0c;…...

多场景 OkHttpClient 管理器 - Android 网络通信解决方案

下面是一个完整的 Android 实现&#xff0c;展示如何创建和管理多个 OkHttpClient 实例&#xff0c;分别用于长连接、普通 HTTP 请求和文件下载场景。 <?xml version"1.0" encoding"utf-8"?> <LinearLayout xmlns:android"http://schemas…...

Mybatis逆向工程,动态创建实体类、条件扩展类、Mapper接口、Mapper.xml映射文件

今天呢&#xff0c;博主的学习进度也是步入了Java Mybatis 框架&#xff0c;目前正在逐步杨帆旗航。 那么接下来就给大家出一期有关 Mybatis 逆向工程的教学&#xff0c;希望能对大家有所帮助&#xff0c;也特别欢迎大家指点不足之处&#xff0c;小生很乐意接受正确的建议&…...

Vue2 第一节_Vue2上手_插值表达式{{}}_访问数据和修改数据_Vue开发者工具

文章目录 1.Vue2上手-如何创建一个Vue实例,进行初始化渲染2. 插值表达式{{}}3. 访问数据和修改数据4. vue响应式5. Vue开发者工具--方便调试 1.Vue2上手-如何创建一个Vue实例,进行初始化渲染 准备容器引包创建Vue实例 new Vue()指定配置项 ->渲染数据 准备一个容器,例如: …...

Nuxt.js 中的路由配置详解

Nuxt.js 通过其内置的路由系统简化了应用的路由配置&#xff0c;使得开发者可以轻松地管理页面导航和 URL 结构。路由配置主要涉及页面组件的组织、动态路由的设置以及路由元信息的配置。 自动路由生成 Nuxt.js 会根据 pages 目录下的文件结构自动生成路由配置。每个文件都会对…...

CocosCreator 之 JavaScript/TypeScript和Java的相互交互

引擎版本&#xff1a; 3.8.1 语言&#xff1a; JavaScript/TypeScript、C、Java 环境&#xff1a;Window 参考&#xff1a;Java原生反射机制 您好&#xff0c;我是鹤九日&#xff01; 回顾 在上篇文章中&#xff1a;CocosCreator Android项目接入UnityAds 广告SDK。 我们简单讲…...