Flink流批一体计算(24):Flink SQL之mysql维表实时关联
目录
1.维表
2.数据准备
创建源数据
创建维度表
创建Sink表
3.配置任务
Flink SQL创建kafka源表
Flink SQL创建MySQL维表
Flink SQL创建MySQL结果表
编写计算任务
核验数据
1.维表
目前在实时计算的场景中,大多数都使用过MySQL、Hbase、redis作为维表引擎存储一些维度数据,然后在DataStream API中调用MySQL、Hbase、redis客户端去获取到维度数据进行维度扩充。
本案例采用MySQL创建维表,与创建MySQL sink表语法相同。
2.数据准备
创建源数据
重启kafka,创建Topic: case_kafka_mysql
写入json格式的数据
{"ts": "20201011","id": 8,"price_amt":211}
创建维度表
在MySQL中创建名为product_dim的表
CREATE TABLE `product_dim` (`id` bigint(11) NOT NULL,`coupon_price_amt` bigint(11) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
向数据表插入如下数据:
INSERT INTO `product_dim` VALUES (1, 1);
INSERT INTO `product_dim` VALUES (3, 1);
INSERT INTO `product_dim` VALUES (8, 1);
创建Sink表
在MySQL中创建名为sync_test_3的表
CREATE TABLE `sync_test_3` (`id` bigint(11) NOT NULL AUTO_INCREMENT,`ts` varchar(64) DEFAULT NULL,`total_gmv` bigint(11) DEFAULT NULL,PRIMARY KEY (`id`),UNIQUE KEY `uidx` (`ts`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;
3.配置任务
Flink SQL创建kafka源表
create table flink_test_3 (id BIGINT,ts VARCHAR,price_amt BIGINT,proctime AS PROCTIME ()
)with ('connector' = 'kafka','topic' = 'case_kafka_mysql','properties.bootstrap.servers' = '127.0.0.1:9092','properties.group.id' = 'flink_gp_test3','scan.startup.mode' = 'earliest-offset','format' = 'json','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true','properties.zookeeper.connect' = '127.0.0.1:2181/kafka');
Flink SQL创建MySQL维表
create table flink_test_3_dim (id BIGINT,coupon_price_amt BIGINT
)
WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://127.0.0.1:3306/db01?characterEncoding=UTF-8','table-name' = 'product_dim','username' = 'root','password' = 'Admin','lookup.max-retries' = '3','lookup.cache.max-rows' = 1000);
WITH参数
| 参数 | 说明 | 类型 | 备注 |
| lookup.cache.max-rows | 指定缓存的最大行数。如果超过该值,则最老的行记录将会过期,会被新的记录替换掉。 | Integer | 默认情况下,维表Cache是未开启的。 |
| lookup.cache.ttl | 指定缓存中每行记录的最大存活时间。如果某行记录超过该时间,则该行记录将会过期。 | Duration | 默认情况下,维表Cache是未开启的。你可以设置lookup.cache.max-rows和 lookup.cache.ttl参数来启用维表Cache。启用缓存时,采用的是LRU策略缓存。 |
| lookup.cache.caching-missing-key | 是否缓存空的查询结果。 | Boolean | 参数取值如下: true(默认值):缓存空的查询结果。 false:不缓存空的查询结果。 |
| lookup.max-retries | 查询数据库失败的最大重试次数。 | Integer | 默认值为3。 |
Flink SQL创建MySQL结果表
CREATE TABLE sync_test_3 (ts string,total_gmv bigint,PRIMARY KEY (ts) NOT ENFORCED) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://127.0.0.1:3306/db01?characterEncoding=UTF-8','table-name' = 'sync_test_3','username' = 'root','password' = 'Admin');
编写计算任务
INSERT INTO sync_test_3
SELECTts,SUM(price_amt - coupon_price_amt) AS total_gmv
FROM(SELECTa.ts as ts,a.price_amt as price_amt,b.coupon_price_amt as coupon_price_amtFROMflink_test_3 as aLEFT JOIN flink_test_3_dim FOR SYSTEM_TIME AS OF a.proctime as bON b.id = a.id)
GROUP BY ts;
核验数据
SELECT id, ts, total_gmv FROM sync_test_3;
相关文章:
Flink流批一体计算(24):Flink SQL之mysql维表实时关联
目录 1.维表 2.数据准备 创建源数据 创建维度表 创建Sink表 3.配置任务 Flink SQL创建kafka源表 Flink SQL创建MySQL维表 Flink SQL创建MySQL结果表 编写计算任务 核验数据 1.维表 目前在实时计算的场景中,大多数都使用过MySQL、Hbase、redis作为维表引擎…...
鸿蒙(HarmonyOS)应用开发——从网络获取数据(题目答案)
判断题 1.在http模块中,多个请求可以使用同一个httpRequest对象,httpRequest对象可以复用。 错误(False) 2.使用http模块发起网络请求后,可以使用destroy方法中断网络请求。 正确(True) 3.Web组件onConfirm(callback: (event?: { url: …...
力扣:197. 上升的温度(Python3)
题目: 表: Weather ------------------------ | Column Name | Type | ------------------------ | id | int | | recordDate | date | | temperature | int | ------------------------ id 是该表具有唯一值的列。 该表…...
uniApp应用软件在运行时,不符合华为应用市场审核标准。解决方案合集!
(暂时用不到的也建议收藏一下,因为文章持续更新中) 最新更改时间:20023-12-10 第一次做App应用开发相信大家一定都遇到过华为应用市场审核的“驳回”! 有些问题一看就明白可以立马修改,而有一些问题修改意…...
c#编码技巧(十五):新语法糖record深入分析
c#编码技巧(十四):新语法糖record深入分析 从 C# 9 开始新增了一个关键字record,用于封装数据。 record实质是微软提供的一个语法糖,因很多开源项目都用到了这个关键字,说明这个语法糖比较实用。 那么这个record类型和普通class类…...
Java IO流(五)(字符集基础知识简介)
字符集 计算机的存储规则(英文字符) 常见字符集介绍 a.GB2312字符集:1980年发布,1981年5月1日实施的简体中文汉字编码国家标准。收录7445个图形字符,其中包括6763个简体汉字 b.BIG5字符集:台湾地区繁体中…...
周周爱学习之Redis重点总结
redis重点总结 在正常的业务流程中,用户发送请求,然后到缓存中查询数据。如果缓存中不存在数据的话,就会去数据库查询数据。数据库中有的话,就会更新缓存然后返回数据,数据库中也没有的话就会给用户返回一个空。 1.缓…...
免费的SEO外链发布工具,提升排名的利器
互联网已经成为信息传播和商业发展的重要平台。而对于拥有网站的个人、企业来说,如何让自己的网站在搜索引擎中脱颖而出?SEO(Search Engine Optimization)作为提高网站在搜索引擎中排名的关键手段. 什么是SEO外链? S…...
腾讯字节常考的linux命令
1 ps 1.1 ps -ef 有哪些字段 ps -ef 命令在Unix/Linux系统中用于显示当前运行的进程。输出的字段通常包括: UID:启动进程的用户ID。PID:进程ID。PPID:父进程ID。C:CPU利用率。STIME:进程启动时间。TTY&a…...
JAVA后端自学技能实操合集
JAVA后端自学技能实操 内容将会持续更新中,有需要添加什么内容可以再评论区留言,大家一起学习FastDFS使用docker安装FastDFS(linux)集成到springboot项目中 内容将会持续更新中,有需要添加什么内容可以再评论区留言,大家一起学习 FastDFS 组名:文件上传后所在的 st…...
C++ 关联容器
关联容器 关联容器支持高效的关键字查找和访问。 两个主要的关联容器(associative container)类型是 map 和 set。 map 中的元素是一些关键字——值对。 关键字起到索引的作用,值则表示与索引相关联的数据。 set 中的每个元素只包含一个关键…...
ES6之函数新增的扩展
参数 ES6允许为函数的参数设置默认值 function log(x, y World) {console.log(x, y); }console.log(Hello) // Hello World console.log(Hello, China) // Hello China console.log(Hello, ) // Hello函数的形参是默认声明的,不能使用let或const再次声明 functi…...
postgresql安装部署(docker版本)
1.在线部署 创建数据库存储目录 mkdir /home/pgdata创建容器 docker run --name postgresql --restartalways -d -p 5432:5432 -v /home/pgdata:/var/lib/postgresql/data --shm-size10g -e POSTGRES_PASSWORD密码 postgis/postgis:12-3.2-alpine–name为设置容器名称 -d表…...
【Python/Java/C++三种语言】20天拿下华为OD笔试之【位运算】2023B-出错的或电路【欧弟算法】全网注释最详细分类最全的华为OD真题
文章目录 题目描述与示例题目描述输入描述输出描述示例一输入输出说明 示例二输入输出说明 解题思路代码PythonJavaC时空复杂度 华为OD算法/大厂面试高频题算法练习冲刺训练 题目描述与示例 题目描述 某生产门电路的厂商发现某一批次的或门电路不稳定,具体现象为计…...
vscode 编译运行c++ 记录
一、打开文件夹,新建或打开一个cpp文件 二、ctrl shift p 进入 c/c配置 进行 IntelliSense 配置。主要是选择编译器、 c标准, 设置头文件路径等,配置好后会生成 c_cpp_properties.json; 二、编译运行: 1、选中ma…...
错题总结(四)
1.【一维数组】输入10个整数,求平均值 编写一个程序,从用户输入中读取10个整数并存储在一个数组中。然后,计算并输出这些整数的平均值。 int main() {int arr[10];int sum 0;for (int n 0; n < 10; n){scanf("%d", &arr…...
ORACLE使用Mybatis-plus批量插入
ORACLE使用mybatis-plus自带的iservice.saveBatch方法时,会报DML Returing cannot be batch错误: 推测原因是oracle不支持insert into table_name (,) values (,),()的写法。且oracle不会自动生…...
vue,uniapp的pdf等文件在线预览
vue,uniapp文件在线预览方案,用了个稍微偏门一点的方法实现了 通过后端生成文件查看页面,然后前端只要展示这个网页就行,uniapp就用web-view来展示,后台系统就直接window.open()打开就行 示例查看PDF文件,…...
SpringBoot 项目 Jar 包加密,防止反编译
1场景 最近项目要求部署到其他公司的服务器上,但是又不想将源码泄露出去。要求对正式环境的启动包进行安全性处理,防止客户直接通过反编译工具将代码反编译出来。 2方案 第一种方案使用代码混淆 采用proguard-maven-plugin插件 在单模块中此方案还算简…...
DockerFile中途执行出错的解决办法
DockerFile中途执行出错的解决办法 你们是否也曾经因为DockerFile中途执行出错,而对其束手无策?总是对docker避之不及! 但是当下载的源码运用到了docker,dockerFile 执行到一半,报错了怎么办? 现状 那么当DockerFile执行一半出错后,会产生什么结果呢? 如图可知,生成…...
树莓派超全系列教程文档--(61)树莓派摄像头高级使用方法
树莓派摄像头高级使用方法 配置通过调谐文件来调整相机行为 使用多个摄像头安装 libcam 和 rpicam-apps依赖关系开发包 文章来源: http://raspberry.dns8844.cn/documentation 原文网址 配置 大多数用例自动工作,无需更改相机配置。但是,一…...
在鸿蒙HarmonyOS 5中实现抖音风格的点赞功能
下面我将详细介绍如何使用HarmonyOS SDK在HarmonyOS 5中实现类似抖音的点赞功能,包括动画效果、数据同步和交互优化。 1. 基础点赞功能实现 1.1 创建数据模型 // VideoModel.ets export class VideoModel {id: string "";title: string ""…...
基于uniapp+WebSocket实现聊天对话、消息监听、消息推送、聊天室等功能,多端兼容
基于 UniApp + WebSocket实现多端兼容的实时通讯系统,涵盖WebSocket连接建立、消息收发机制、多端兼容性配置、消息实时监听等功能,适配微信小程序、H5、Android、iOS等终端 目录 技术选型分析WebSocket协议优势UniApp跨平台特性WebSocket 基础实现连接管理消息收发连接…...
五年级数学知识边界总结思考-下册
目录 一、背景二、过程1.观察物体小学五年级下册“观察物体”知识点详解:由来、作用与意义**一、知识点核心内容****二、知识点的由来:从生活实践到数学抽象****三、知识的作用:解决实际问题的工具****四、学习的意义:培养核心素养…...
基于matlab策略迭代和值迭代法的动态规划
经典的基于策略迭代和值迭代法的动态规划matlab代码,实现机器人的最优运输 Dynamic-Programming-master/Environment.pdf , 104724 Dynamic-Programming-master/README.md , 506 Dynamic-Programming-master/generalizedPolicyIteration.m , 1970 Dynamic-Programm…...
【分享】推荐一些办公小工具
1、PDF 在线转换 https://smallpdf.com/cn/pdf-tools 推荐理由:大部分的转换软件需要收费,要么功能不齐全,而开会员又用不了几次浪费钱,借用别人的又不安全。 这个网站它不需要登录或下载安装。而且提供的免费功能就能满足日常…...
接口自动化测试:HttpRunner基础
相关文档 HttpRunner V3.x中文文档 HttpRunner 用户指南 使用HttpRunner 3.x实现接口自动化测试 HttpRunner介绍 HttpRunner 是一个开源的 API 测试工具,支持 HTTP(S)/HTTP2/WebSocket/RPC 等网络协议,涵盖接口测试、性能测试、数字体验监测等测试类型…...
提升移动端网页调试效率:WebDebugX 与常见工具组合实践
在日常移动端开发中,网页调试始终是一个高频但又极具挑战的环节。尤其在面对 iOS 与 Android 的混合技术栈、各种设备差异化行为时,开发者迫切需要一套高效、可靠且跨平台的调试方案。过去,我们或多或少使用过 Chrome DevTools、Remote Debug…...
Sklearn 机器学习 缺失值处理 获取填充失值的统计值
💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 使用 Scikit-learn 处理缺失值并提取填充统计信息的完整指南 在机器学习项目中,数据清…...
深入浅出JavaScript中的ArrayBuffer:二进制数据的“瑞士军刀”
深入浅出JavaScript中的ArrayBuffer:二进制数据的“瑞士军刀” 在JavaScript中,我们经常需要处理文本、数组、对象等数据类型。但当我们需要处理文件上传、图像处理、网络通信等场景时,单纯依赖字符串或数组就显得力不从心了。这时ÿ…...
