当前位置: 首页 > news >正文

FlinkSQL之temporary join开发

2705c43b7f4f8013c53321f32a9a2fed.gif

在实时开发中,双流join获取目标对应时刻的属性时,经常使用temporary join。笔者在流量升级的实时迭代中,需要让流量日志精准的匹配上浏览时间里对应的商品属性,使用temporary join开发过程中踩坑不少,将一些经验沉淀在此文中,供各位同学参考与交流。

ffcd1b37f760b4430542917b5e7f8f77.png

背景介绍

关于实时flinkSQL的双流join的背景知识可以先阅读以下文章:

https://www.51cto.com/article/713922.html

目前我们有一条流量日志明细的TT流A,以及一条商品标签的TT流B,在flink中对A流和B流进行双流join类似于将A流关联一个hbase维表。temporary join有以下特点:

1. 单流驱动:虽然是双流join,但数据下发只由一条流驱动。

2. 需要定义versioned table,versioned table记录了每个时刻的属性信息,双流join时被动查询。类似于银行汇率表,在货币兑换的时候需要参考兑换时刻的汇率。

3. 查询携带时间版本信息:temporary join携带由两条流的watermark触发,因此查询到的属性是对应时间内的属性。

48804d99d95cf0aa49670aa4181b12e5.png

图片来源:孙金城, 《Blink 漫谈系列 - Temporal Table JOIN》

应用场景&实例分享

当需要根据实时汇率*货币金额计算总金额,实时商品价格*成交件数计算总成交金额时,经常会使用temporary join获取实时的汇率和价格信息。在笔者的流量升级业务迭代中,我们需要获取实时的商品标签,因此需要定义商品标签的versioned table,写法如下:

CREATE TEMPORARY TABLE `tag_ri` (`id` VARCHAR,`tag` VARCHAR,`time` VARCHAR,`ts` AS `TO_TIMESTAMP`(`time`, 'yyyy-MM-dd HH:mm:ss'),WATERMARK FOR `ts` AS `withOffset`(`ts`, 0) --定义watermark
) WITH ('connector' = 'tt','router' = '******','topic' = 'tag_ri','lineDelimiter' = '\n','fieldDelimiter' = '\u0001','encoding' = 'utf-8'
);--定义version table
CREATE TEMPORARY VIEW `tag`
AS
SELECT `id`, `tag`, `time`, `ts`
FROM (    SELECT `id`, `tag`, `time`, `ts`, ROW_NUMBER() OVER (PARTITION BY `id` --关联主键ORDER BY `time` DESC) AS `rownum`FROM `tag_ri`)
WHERE `rownum` = 1;

同上我们也需要定义流量日志明细流的watermark,并进行双流join

CREATE TEMPORARY TABLE `log_ri` (`id` VARCHAR,`time` VARCHAR,......`ts` AS `TO_TIMESTAMP`(`time`, 'yyyy-MM-dd HH:mm:ss'),WATERMARK FOR `ts` AS `withOffset`(`ts`, 0)
) WITH ('connector' = 'tt','router' = '******','topic' = 'log_ri','lineDelimiter' = '\n','fieldDelimiter' = '\u0001','encoding' = 'utf-8',
);select `a`.`id`,......,`b`.`tag`
from  (SELECT *FROM `log_ri`) AS `a`
LEFT JOIN `tag` FOR SYSTEM_TIME AS OF `a`.`ts` AS `b` ON `a`.`id` = `b`.`id`

结果如下:

--商品标签信息
12:00> SELECT * FROM tag_ri;id              tag(商品标签)
=======      =======================  t1                 A12:30> SELECT * FROM tag_ri;id              tag(商品标签)
=======      =======================t1                 B--流量明细日志查询 t1商品共三条明细
SELECT * FROM log_ri;id              time
=======          ========t1              12:00       t1              12:15       t1               12:30       --执行temporary join
select `a`.`id`,`a`.`time`,`b`.`tag`
from  (SELECT *FROM `log_ri`) AS `a`
LEFT JOIN `tag` FOR SYSTEM_TIME AS OF `a`.`ts` AS `b` ON `a`.`id` = `b`.`id`id               time              tag(商品标签)
=======          ========        =======================t1              12:00                   At1              12:15                   At1              12:30                   B
开发经验
  稀疏数据处理

由于temporary join是由两条流的watermark触发,如果versioned table是一条稀疏的流(在一段时间内无数据流入),那么join可能存在等待不下发数据的现象,可以通过设置参数 set table.exec.source.idle-timeout = 10s ,可以让A流数据不进行等待,具体参数介绍可以参考:

https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/config/#table-exec-source-idle-timeout

  数据延迟下发
  • 问题

在实际开发中,我们发现temporay join后数据一直等待不下发,整点才会进行下发的现象。

8f9f7e4845d6e74bf7cb6e93c76e09a0.jpeg

  • 原因分析

我们结合SQL语法,对TT日志进行回流分析:代码逻辑是四路source union后, join 定义的versioned table

select a.*,b.tag
from
(
select * from source_1 
union all 
select * from source_2
union all 
select * from source_3
union all 
select * from source_4
) a
temporay join 
b流

source_4会在整点流入少部分当前小时59分钟的数据,而temporay join 是由两边的watermark所触发,所以会有a流等待b流的时间到达当前小时59分钟后再触发的现象。

ccf9a4c7f6f2b304cbb642b493d40e34.jpeg

  • 解法

对source_4中log_time>当前时间的部分,做temporary join时将log_time置为当前时间,该问题就解决了。

7b3d2f9b26327c4027ac08d723febb8d.png

总结

1. 在单流驱动的双流join场景中,temporary join是一种常见的处理方式。

2. temporary join由两条流的watermark触发,需要对两条流的watermark进行预处理,防止数据稀疏和数据抢跑等现象影响数据下发。

5f2cad15473f0438ff03df1f6df8861d.png

参考资料

  • https://www.51cto.com/article/713922.html

  • https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/config/#table-exec-source-idle-timeout

af16dc18390c18a051935ea9c0e343b9.png

团队介绍

我们是淘天集团-业务技术-商家数据团队,专注于开发和维护生意参谋这一全渠道、全链路、一站式的数据平台,同时也负责品牌数据银行和策略中心两大产品。旨在为商家提供全面的数据服务,包括但不限于经营分析、市场洞察、客群洞察等,以帮助商家提高商业决策效率。

¤ 拓展阅读 ¤

3DXR技术 | 终端技术 | 音视频技术

服务端技术 | 技术质量 | 数据算法

相关文章:

FlinkSQL之temporary join开发

在实时开发中,双流join获取目标对应时刻的属性时,经常使用temporary join。笔者在流量升级的实时迭代中,需要让流量日志精准的匹配上浏览时间里对应的商品属性,使用temporary join开发过程中踩坑不少,将一些经验沉淀在…...

第二十六节 直方图均衡化

图像直方图均衡化 图像直方图均衡化可以增强图像增强,对输入图像进行直方图均衡化处理,提升后续对象检测的准确率在Opencv人脸检测的代码演示中已经很常见了,此外对医学影像图像与卫星遥感图像也经常通过直方图均衡化来提升图像质量 Opencv…...

工单管理用什么工具好?8款推荐清单

本文推荐的8款项目工单管理系统有:1. PingCode; 2.Worktile; 3.Teambition; 4.致远OA; 5.TAPD; 6.Gitee; 7.Wrike; 8.Trello。 很多企业在处理项目工单时,依然依赖电子邮件、Excel表格,甚至是手动记录。这样做不仅效率低下,还容易导致工单遗漏…...

工地安全新突破:AI视频监控提升巡检与防护水平

在建筑工地和其他劳动密集型行业,工人的安全一直是管理工作的重中之重。为了确保工地的安全管理更加高效和智能化,AI视频监控卫士。通过人工智能技术,系统不仅能实时监控,还能自动识别工地现场的安全隐患,为工地管理者…...

World of Warcraft [CLASSIC][80][the Ulduar]

Ulduar 奥杜尔副本介绍 奥杜尔共计14个BOSS,通常说的10H就是10个苦难模式就是全通,9H就是除了【观察者奥尔加隆】,特别说明开启【观察者奥尔加隆】,是需要打掉困难模式4个守护者的。 所以人们经常说的类似“10H 观察者”、“10H…...

python实现数据库的增删改查功能,图形化版本

import tkinter from tkinter import * import psycopg2 from tkinter import messagebox#连接信息 t_conn{"dbname": "d1","user": "u1","password": "123qqq...A","port": "15400","h…...

pipeline开发笔记

pipeline开发笔记 jenkins常用插件Build Authorization Token Root配置GitLab的webhooks(钩子)配置构建触发器--示例 piblish over sshBlue OceanWorkspace Cleanup PluginGit插件PipelineLocalization: Chinese (Simplified) --中文显示Build Environment Plugin 显示构建过程…...

spark读取parquet文件

源码 parquet文件读取的入口是FileSourceScanExec,用parquet文件生成对应的RDD 非bucket文件所以走createNonBucketedReadRDD方法。 createNonBucketedReadRDD 过程: 确定文件分割参数 openCostInBytes4M 相关参数spark.sql.files.openCostInBytes4M…...

redis详细教程(1.String类型)

Redis 的 String 类型内部使用了一种叫做 SDS(Simple Dynamic String)的结构。SDS 的设计比传统的 C 语言字符串更加高效和安全,主要特点如下: 头部信息:SDS 的头部包含了一些元数据,比如字符串的长度、剩…...

用友U8接口-库存管理(7)

概括 本文的操作需要正确部署U8API主要讲述库存管理接口的使用,以产成品入库单作为说明,其他单据接口都是大同小异的!许多时候先在ERP做个单,然后仿造ERP单据参数,构造接口JSON参数是不错的做法。 获取Token访问令牌…...

Spring Boot HikariCP数据库连接池入门

1. 概述 在我们的项目中,数据库连接池基本是必不可少的组件。在目前数据库连接池的选型中,主要是 Druid ,为监控而生的数据库连接池。HikariCP ,号称性能最好的数据库连接池。 至于怎么选择,两者都非常优秀&#x…...

Docker快速上手教程:MacOS系统【安装/配置/使用/原理】全链路速通

背景 最近换了个 Macbook Air M3, 写个人项目需要用到 Docker,配置过程有一点点坎坷,还是得记录下避免重蹈覆辙。 什么。为什么是买 Air 而不是 Pro Max? 因为码农的钱也是钱啊。 这里我不会先讲原理,我认为工程的事情都是先看到现象,有了概念的轮廓,才应该去研究原理,…...

【JavaSE】认识String类,了解,进阶到熟练掌握

#1024程序员节 | 征文# 下面就让博主带领大家一起解决心中关于String类的疑问吧~~~ 1.字符串构造: 第一种和第二种(有一定的区别,在常量池上) public static void main(String[] args) { // 使用常量串构造 String s1 "h…...

vue3 vben-admin 窗口大小更改后 echarts尺寸变为 100px的问题

问题描述: 当切换切换tab 并且窗口尺寸更改时, echarts的尺寸因为父元素为 0, 自动设置为 100px 网上查找资料的结果: 1,使用vue 中的 v-if 来重新设置dom树 缺点: 频繁操作dom树结构, 极其消耗性能 优点: 自适应展示 2,设置固定宽高 缺点: 不能自适应展示, 无需消耗额外…...

Web应用框架-Django应用基础(3)-Jinja2

1.创建姓名模板 username里的数据发生改变&#xff0c;页面中渲染的数据发生改变&#xff0c;该效果称为动态数据 #hello/views:def hello_user(request):username000html <!DOCTYPE html><html lang"en"><head><meta charset"UTF-8&quo…...

js(深浅拷贝,节流防抖,this指向,改变this指向的方法)

一、深浅拷贝 1.基本数据类型和引用数据类型的区别&#xff1a; 1. 基本数据类型的变量存储的是值 引用数据类型的变量存储的是地址值 2. 基本数据类型的变量存储的值在栈内存 引用数据类型的变量存储的值在堆内存 3. 基本数据类型的变量存储的是值和值之间相互不影响 引用数据…...

香橙派5(RK3588)使用npu加速yolov5推理的部署过程

香橙派5使用npu加速yolov5推理的部署过程 硬件环境 部署过程 模型训练(x86主机) 在带nvidia显卡(最好)的主机上进行yolo的配置与训练, 获取最终的best.pt模型文件, 详见另一篇文档 模型转换(x86主机) 下载airockchip提供的yolov5(从pt到onnx) 一定要下这个版本的yolov5, …...

基于MWORKS的蓝桥杯「智能装备数字化建模大赛」正式发布,首期培训本周六开启

为强化装备数字化人才培养&#xff0c;推动装备数字化技术快速发展&#xff0c;第十六届蓝桥杯全国软件和信息技术专业人才大赛设置专项赛暨智能装备数字化建模大赛&#xff0c;使用MWORKS作为参赛软件。关于参赛软件授权、技术支持与培训、教材与案例开发支持、成果转化培训及…...

021、深入解析前端请求拦截器

目录 深入解析前端请求拦截器&#xff1a; 1. 引言 2. 核心实现与基础概念 2.1 基础拦截器实现 2.2 响应拦截器配置 3. 实际应用场景 3.1 完整的用户认证系统 3.2 文件上传系统 3.3 API请求缓存系统 3.4 请求重试机制 3.5 国际化处理 4. 性能优化实践 4.1 请求合并…...

windows中的tracert命令

在 Windows 操作系统中&#xff0c;tracert&#xff08;全称 Trace Route&#xff09;是一个用于确定 IP 数据包到达目标主机所经过的路径的命令行工具。它通过发送具有不同生存时间&#xff08;TTL&#xff09;的 ICMP&#xff08;Internet Control Message Protocol&#xff…...

RestClient

什么是RestClient RestClient 是 Elasticsearch 官方提供的 Java 低级 REST 客户端&#xff0c;它允许HTTP与Elasticsearch 集群通信&#xff0c;而无需处理 JSON 序列化/反序列化等底层细节。它是 Elasticsearch Java API 客户端的基础。 RestClient 主要特点 轻量级&#xff…...

前端导出带有合并单元格的列表

// 导出async function exportExcel(fileName "共识调整.xlsx") {// 所有数据const exportData await getAllMainData();// 表头内容let fitstTitleList [];const secondTitleList [];allColumns.value.forEach(column > {if (!column.children) {fitstTitleL…...

el-switch文字内置

el-switch文字内置 效果 vue <div style"color:#ffffff;font-size:14px;float:left;margin-bottom:5px;margin-right:5px;">自动加载</div> <el-switch v-model"value" active-color"#3E99FB" inactive-color"#DCDFE6"…...

质量体系的重要

质量体系是为确保产品、服务或过程质量满足规定要求&#xff0c;由相互关联的要素构成的有机整体。其核心内容可归纳为以下五个方面&#xff1a; &#x1f3db;️ 一、组织架构与职责 质量体系明确组织内各部门、岗位的职责与权限&#xff0c;形成层级清晰的管理网络&#xf…...

高等数学(下)题型笔记(八)空间解析几何与向量代数

目录 0 前言 1 向量的点乘 1.1 基本公式 1.2 例题 2 向量的叉乘 2.1 基础知识 2.2 例题 3 空间平面方程 3.1 基础知识 3.2 例题 4 空间直线方程 4.1 基础知识 4.2 例题 5 旋转曲面及其方程 5.1 基础知识 5.2 例题 6 空间曲面的法线与切平面 6.1 基础知识 6.2…...

ETLCloud可能遇到的问题有哪些?常见坑位解析

数据集成平台ETLCloud&#xff0c;主要用于支持数据的抽取&#xff08;Extract&#xff09;、转换&#xff08;Transform&#xff09;和加载&#xff08;Load&#xff09;过程。提供了一个简洁直观的界面&#xff0c;以便用户可以在不同的数据源之间轻松地进行数据迁移和转换。…...

智能分布式爬虫的数据处理流水线优化:基于深度强化学习的数据质量控制

在数字化浪潮席卷全球的今天&#xff0c;数据已成为企业和研究机构的核心资产。智能分布式爬虫作为高效的数据采集工具&#xff0c;在大规模数据获取中发挥着关键作用。然而&#xff0c;传统的数据处理流水线在面对复杂多变的网络环境和海量异构数据时&#xff0c;常出现数据质…...

Web 架构之 CDN 加速原理与落地实践

文章目录 一、思维导图二、正文内容&#xff08;一&#xff09;CDN 基础概念1. 定义2. 组成部分 &#xff08;二&#xff09;CDN 加速原理1. 请求路由2. 内容缓存3. 内容更新 &#xff08;三&#xff09;CDN 落地实践1. 选择 CDN 服务商2. 配置 CDN3. 集成到 Web 架构 &#xf…...

在Mathematica中实现Newton-Raphson迭代的收敛时间算法(一般三次多项式)

考察一般的三次多项式&#xff0c;以r为参数&#xff1a; p[z_, r_] : z^3 (r - 1) z - r; roots[r_] : z /. Solve[p[z, r] 0, z]&#xff1b; 此多项式的根为&#xff1a; 尽管看起来这个多项式是特殊的&#xff0c;其实一般的三次多项式都是可以通过线性变换化为这个形式…...

【SpringBoot自动化部署】

SpringBoot自动化部署方法 使用Jenkins进行持续集成与部署 Jenkins是最常用的自动化部署工具之一&#xff0c;能够实现代码拉取、构建、测试和部署的全流程自动化。 配置Jenkins任务时&#xff0c;需要添加Git仓库地址和凭证&#xff0c;设置构建触发器&#xff08;如GitHub…...