Flink流批一体计算(24):Flink SQL之mysql维表实时关联
目录
1.维表
2.数据准备
创建源数据
创建维度表
创建Sink表
3.配置任务
Flink SQL创建kafka源表
Flink SQL创建MySQL维表
Flink SQL创建MySQL结果表
编写计算任务
核验数据
1.维表
目前在实时计算的场景中,大多数都使用过MySQL、Hbase、redis作为维表引擎存储一些维度数据,然后在DataStream API中调用MySQL、Hbase、redis客户端去获取到维度数据进行维度扩充。
本案例采用MySQL创建维表,与创建MySQL sink表语法相同。
2.数据准备
创建源数据
重启kafka,创建Topic: case_kafka_mysql
写入json格式的数据
{"ts": "20201011","id": 8,"price_amt":211}
创建维度表
在MySQL中创建名为product_dim的表
CREATE TABLE `product_dim` (`id` bigint(11) NOT NULL,`coupon_price_amt` bigint(11) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
向数据表插入如下数据:
INSERT INTO `product_dim` VALUES (1, 1);
INSERT INTO `product_dim` VALUES (3, 1);
INSERT INTO `product_dim` VALUES (8, 1);
创建Sink表
在MySQL中创建名为sync_test_3的表
CREATE TABLE `sync_test_3` (`id` bigint(11) NOT NULL AUTO_INCREMENT,`ts` varchar(64) DEFAULT NULL,`total_gmv` bigint(11) DEFAULT NULL,PRIMARY KEY (`id`),UNIQUE KEY `uidx` (`ts`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;
3.配置任务
Flink SQL创建kafka源表
create table flink_test_3 (id BIGINT,ts VARCHAR,price_amt BIGINT,proctime AS PROCTIME ()
)with ('connector' = 'kafka','topic' = 'case_kafka_mysql','properties.bootstrap.servers' = '127.0.0.1:9092','properties.group.id' = 'flink_gp_test3','scan.startup.mode' = 'earliest-offset','format' = 'json','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true','properties.zookeeper.connect' = '127.0.0.1:2181/kafka');
Flink SQL创建MySQL维表
create table flink_test_3_dim (id BIGINT,coupon_price_amt BIGINT
)
WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://127.0.0.1:3306/db01?characterEncoding=UTF-8','table-name' = 'product_dim','username' = 'root','password' = 'Admin','lookup.max-retries' = '3','lookup.cache.max-rows' = 1000);
WITH参数
参数 | 说明 | 类型 | 备注 |
lookup.cache.max-rows | 指定缓存的最大行数。如果超过该值,则最老的行记录将会过期,会被新的记录替换掉。 | Integer | 默认情况下,维表Cache是未开启的。 |
lookup.cache.ttl | 指定缓存中每行记录的最大存活时间。如果某行记录超过该时间,则该行记录将会过期。 | Duration | 默认情况下,维表Cache是未开启的。你可以设置lookup.cache.max-rows和 lookup.cache.ttl参数来启用维表Cache。启用缓存时,采用的是LRU策略缓存。 |
lookup.cache.caching-missing-key | 是否缓存空的查询结果。 | Boolean | 参数取值如下: true(默认值):缓存空的查询结果。 false:不缓存空的查询结果。 |
lookup.max-retries | 查询数据库失败的最大重试次数。 | Integer | 默认值为3。 |
Flink SQL创建MySQL结果表
CREATE TABLE sync_test_3 (ts string,total_gmv bigint,PRIMARY KEY (ts) NOT ENFORCED) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://127.0.0.1:3306/db01?characterEncoding=UTF-8','table-name' = 'sync_test_3','username' = 'root','password' = 'Admin');
编写计算任务
INSERT INTO sync_test_3
SELECTts,SUM(price_amt - coupon_price_amt) AS total_gmv
FROM(SELECTa.ts as ts,a.price_amt as price_amt,b.coupon_price_amt as coupon_price_amtFROMflink_test_3 as aLEFT JOIN flink_test_3_dim FOR SYSTEM_TIME AS OF a.proctime as bON b.id = a.id)
GROUP BY ts;
核验数据
SELECT id, ts, total_gmv FROM sync_test_3;
相关文章:
Flink流批一体计算(24):Flink SQL之mysql维表实时关联
目录 1.维表 2.数据准备 创建源数据 创建维度表 创建Sink表 3.配置任务 Flink SQL创建kafka源表 Flink SQL创建MySQL维表 Flink SQL创建MySQL结果表 编写计算任务 核验数据 1.维表 目前在实时计算的场景中,大多数都使用过MySQL、Hbase、redis作为维表引擎…...
鸿蒙(HarmonyOS)应用开发——从网络获取数据(题目答案)
判断题 1.在http模块中,多个请求可以使用同一个httpRequest对象,httpRequest对象可以复用。 错误(False) 2.使用http模块发起网络请求后,可以使用destroy方法中断网络请求。 正确(True) 3.Web组件onConfirm(callback: (event?: { url: …...
力扣:197. 上升的温度(Python3)
题目: 表: Weather ------------------------ | Column Name | Type | ------------------------ | id | int | | recordDate | date | | temperature | int | ------------------------ id 是该表具有唯一值的列。 该表…...

uniApp应用软件在运行时,不符合华为应用市场审核标准。解决方案合集!
(暂时用不到的也建议收藏一下,因为文章持续更新中) 最新更改时间:20023-12-10 第一次做App应用开发相信大家一定都遇到过华为应用市场审核的“驳回”! 有些问题一看就明白可以立马修改,而有一些问题修改意…...
c#编码技巧(十五):新语法糖record深入分析
c#编码技巧(十四):新语法糖record深入分析 从 C# 9 开始新增了一个关键字record,用于封装数据。 record实质是微软提供的一个语法糖,因很多开源项目都用到了这个关键字,说明这个语法糖比较实用。 那么这个record类型和普通class类…...

Java IO流(五)(字符集基础知识简介)
字符集 计算机的存储规则(英文字符) 常见字符集介绍 a.GB2312字符集:1980年发布,1981年5月1日实施的简体中文汉字编码国家标准。收录7445个图形字符,其中包括6763个简体汉字 b.BIG5字符集:台湾地区繁体中…...

周周爱学习之Redis重点总结
redis重点总结 在正常的业务流程中,用户发送请求,然后到缓存中查询数据。如果缓存中不存在数据的话,就会去数据库查询数据。数据库中有的话,就会更新缓存然后返回数据,数据库中也没有的话就会给用户返回一个空。 1.缓…...

免费的SEO外链发布工具,提升排名的利器
互联网已经成为信息传播和商业发展的重要平台。而对于拥有网站的个人、企业来说,如何让自己的网站在搜索引擎中脱颖而出?SEO(Search Engine Optimization)作为提高网站在搜索引擎中排名的关键手段. 什么是SEO外链? S…...
腾讯字节常考的linux命令
1 ps 1.1 ps -ef 有哪些字段 ps -ef 命令在Unix/Linux系统中用于显示当前运行的进程。输出的字段通常包括: UID:启动进程的用户ID。PID:进程ID。PPID:父进程ID。C:CPU利用率。STIME:进程启动时间。TTY&a…...

JAVA后端自学技能实操合集
JAVA后端自学技能实操 内容将会持续更新中,有需要添加什么内容可以再评论区留言,大家一起学习FastDFS使用docker安装FastDFS(linux)集成到springboot项目中 内容将会持续更新中,有需要添加什么内容可以再评论区留言,大家一起学习 FastDFS 组名:文件上传后所在的 st…...
C++ 关联容器
关联容器 关联容器支持高效的关键字查找和访问。 两个主要的关联容器(associative container)类型是 map 和 set。 map 中的元素是一些关键字——值对。 关键字起到索引的作用,值则表示与索引相关联的数据。 set 中的每个元素只包含一个关键…...
ES6之函数新增的扩展
参数 ES6允许为函数的参数设置默认值 function log(x, y World) {console.log(x, y); }console.log(Hello) // Hello World console.log(Hello, China) // Hello China console.log(Hello, ) // Hello函数的形参是默认声明的,不能使用let或const再次声明 functi…...

postgresql安装部署(docker版本)
1.在线部署 创建数据库存储目录 mkdir /home/pgdata创建容器 docker run --name postgresql --restartalways -d -p 5432:5432 -v /home/pgdata:/var/lib/postgresql/data --shm-size10g -e POSTGRES_PASSWORD密码 postgis/postgis:12-3.2-alpine–name为设置容器名称 -d表…...
【Python/Java/C++三种语言】20天拿下华为OD笔试之【位运算】2023B-出错的或电路【欧弟算法】全网注释最详细分类最全的华为OD真题
文章目录 题目描述与示例题目描述输入描述输出描述示例一输入输出说明 示例二输入输出说明 解题思路代码PythonJavaC时空复杂度 华为OD算法/大厂面试高频题算法练习冲刺训练 题目描述与示例 题目描述 某生产门电路的厂商发现某一批次的或门电路不稳定,具体现象为计…...

vscode 编译运行c++ 记录
一、打开文件夹,新建或打开一个cpp文件 二、ctrl shift p 进入 c/c配置 进行 IntelliSense 配置。主要是选择编译器、 c标准, 设置头文件路径等,配置好后会生成 c_cpp_properties.json; 二、编译运行: 1、选中ma…...

错题总结(四)
1.【一维数组】输入10个整数,求平均值 编写一个程序,从用户输入中读取10个整数并存储在一个数组中。然后,计算并输出这些整数的平均值。 int main() {int arr[10];int sum 0;for (int n 0; n < 10; n){scanf("%d", &arr…...

ORACLE使用Mybatis-plus批量插入
ORACLE使用mybatis-plus自带的iservice.saveBatch方法时,会报DML Returing cannot be batch错误: 推测原因是oracle不支持insert into table_name (,) values (,),()的写法。且oracle不会自动生…...
vue,uniapp的pdf等文件在线预览
vue,uniapp文件在线预览方案,用了个稍微偏门一点的方法实现了 通过后端生成文件查看页面,然后前端只要展示这个网页就行,uniapp就用web-view来展示,后台系统就直接window.open()打开就行 示例查看PDF文件,…...

SpringBoot 项目 Jar 包加密,防止反编译
1场景 最近项目要求部署到其他公司的服务器上,但是又不想将源码泄露出去。要求对正式环境的启动包进行安全性处理,防止客户直接通过反编译工具将代码反编译出来。 2方案 第一种方案使用代码混淆 采用proguard-maven-plugin插件 在单模块中此方案还算简…...
DockerFile中途执行出错的解决办法
DockerFile中途执行出错的解决办法 你们是否也曾经因为DockerFile中途执行出错,而对其束手无策?总是对docker避之不及! 但是当下载的源码运用到了docker,dockerFile 执行到一半,报错了怎么办? 现状 那么当DockerFile执行一半出错后,会产生什么结果呢? 如图可知,生成…...
挑战杯推荐项目
“人工智能”创意赛 - 智能艺术创作助手:借助大模型技术,开发能根据用户输入的主题、风格等要求,生成绘画、音乐、文学作品等多种形式艺术创作灵感或初稿的应用,帮助艺术家和创意爱好者激发创意、提高创作效率。 - 个性化梦境…...

MPNet:旋转机械轻量化故障诊断模型详解python代码复现
目录 一、问题背景与挑战 二、MPNet核心架构 2.1 多分支特征融合模块(MBFM) 2.2 残差注意力金字塔模块(RAPM) 2.2.1 空间金字塔注意力(SPA) 2.2.2 金字塔残差块(PRBlock) 2.3 分类器设计 三、关键技术突破 3.1 多尺度特征融合 3.2 轻量化设计策略 3.3 抗噪声…...

19c补丁后oracle属主变化,导致不能识别磁盘组
补丁后服务器重启,数据库再次无法启动 ORA01017: invalid username/password; logon denied Oracle 19c 在打上 19.23 或以上补丁版本后,存在与用户组权限相关的问题。具体表现为,Oracle 实例的运行用户(oracle)和集…...
Java如何权衡是使用无序的数组还是有序的数组
在 Java 中,选择有序数组还是无序数组取决于具体场景的性能需求与操作特点。以下是关键权衡因素及决策指南: ⚖️ 核心权衡维度 维度有序数组无序数组查询性能二分查找 O(log n) ✅线性扫描 O(n) ❌插入/删除需移位维护顺序 O(n) ❌直接操作尾部 O(1) ✅内存开销与无序数组相…...

Swift 协议扩展精进之路:解决 CoreData 托管实体子类的类型不匹配问题(下)
概述 在 Swift 开发语言中,各位秃头小码农们可以充分利用语法本身所带来的便利去劈荆斩棘。我们还可以恣意利用泛型、协议关联类型和协议扩展来进一步简化和优化我们复杂的代码需求。 不过,在涉及到多个子类派生于基类进行多态模拟的场景下,…...
C++ 基础特性深度解析
目录 引言 一、命名空间(namespace) C 中的命名空间 与 C 语言的对比 二、缺省参数 C 中的缺省参数 与 C 语言的对比 三、引用(reference) C 中的引用 与 C 语言的对比 四、inline(内联函数…...

2025盘古石杯决赛【手机取证】
前言 第三届盘古石杯国际电子数据取证大赛决赛 最后一题没有解出来,实在找不到,希望有大佬教一下我。 还有就会议时间,我感觉不是图片时间,因为在电脑看到是其他时间用老会议系统开的会。 手机取证 1、分析鸿蒙手机检材&#x…...

项目部署到Linux上时遇到的错误(Redis,MySQL,无法正确连接,地址占用问题)
Redis无法正确连接 在运行jar包时出现了这样的错误 查询得知问题核心在于Redis连接失败,具体原因是客户端发送了密码认证请求,但Redis服务器未设置密码 1.为Redis设置密码(匹配客户端配置) 步骤: 1).修…...

dify打造数据可视化图表
一、概述 在日常工作和学习中,我们经常需要和数据打交道。无论是分析报告、项目展示,还是简单的数据洞察,一个清晰直观的图表,往往能胜过千言万语。 一款能让数据可视化变得超级简单的 MCP Server,由蚂蚁集团 AntV 团队…...

sipsak:SIP瑞士军刀!全参数详细教程!Kali Linux教程!
简介 sipsak 是一个面向会话初始协议 (SIP) 应用程序开发人员和管理员的小型命令行工具。它可以用于对 SIP 应用程序和设备进行一些简单的测试。 sipsak 是一款 SIP 压力和诊断实用程序。它通过 sip-uri 向服务器发送 SIP 请求,并检查收到的响应。它以以下模式之一…...