当前位置: 首页 > news >正文

Flink流批一体计算(24):Flink SQL之mysql维表实时关联

目录

1.维表

2.数据准备

创建源数据

创建维度表

创建Sink表

3.配置任务

Flink SQL创建kafka源表

Flink SQL创建MySQL维表

Flink SQL创建MySQL结果表

编写计算任务

核验数据


1.维表

目前在实时计算的场景中,大多数都使用过MySQL、Hbase、redis作为维表引擎存储一些维度数据,然后在DataStream API中调用MySQL、Hbase、redis客户端去获取到维度数据进行维度扩充。

本案例采用MySQL创建维表,与创建MySQL sink表语法相同。

2.数据准备

创建源数据

重启kafka,创建Topic:  case_kafka_mysql

写入json格式的数据

  {"ts": "20201011","id": 8,"price_amt":211}

创建维度表

在MySQL中创建名为product_dim的表

CREATE TABLE `product_dim` (`id` bigint(11) NOT NULL,`coupon_price_amt` bigint(11) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

向数据表插入如下数据:

INSERT INTO `product_dim` VALUES (1, 1);
INSERT INTO `product_dim` VALUES (3, 1);
INSERT INTO `product_dim` VALUES (8, 1);
创建Sink表

在MySQL中创建名为sync_test_3的表

CREATE TABLE `sync_test_3` (`id` bigint(11) NOT NULL AUTO_INCREMENT,`ts` varchar(64) DEFAULT NULL,`total_gmv` bigint(11) DEFAULT NULL,PRIMARY KEY (`id`),UNIQUE KEY `uidx` (`ts`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;

3.配置任务

Flink SQL创建kafka源表
create table flink_test_3 (id BIGINT,ts VARCHAR,price_amt BIGINT,proctime AS PROCTIME ()
)with ('connector' = 'kafka','topic' = 'case_kafka_mysql','properties.bootstrap.servers' = '127.0.0.1:9092','properties.group.id' = 'flink_gp_test3','scan.startup.mode' = 'earliest-offset','format' = 'json','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true','properties.zookeeper.connect' = '127.0.0.1:2181/kafka');
Flink SQL创建MySQL维表
create table flink_test_3_dim (id BIGINT,coupon_price_amt BIGINT
)
WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://127.0.0.1:3306/db01?characterEncoding=UTF-8','table-name' = 'product_dim','username' = 'root','password' = 'Admin','lookup.max-retries' = '3','lookup.cache.max-rows' = 1000);

WITH参数

参数

说明

类型

备注

lookup.cache.max-rows

指定缓存的最大行数。如果超过该值,则最老的行记录将会过期,会被新的记录替换掉。

Integer

默认情况下,维表Cache是未开启的。

lookup.cache.ttl

指定缓存中每行记录的最大存活时间。如果某行记录超过该时间,则该行记录将会过期。

Duration

默认情况下,维表Cache是未开启的。你可以设置lookup.cache.max-rows lookup.cache.ttl参数来启用维表Cache。启用缓存时,采用的是LRU策略缓存。

lookup.cache.caching-missing-key

是否缓存空的查询结果。

Boolean

参数取值如下:

true(默认值):缓存空的查询结果。

false:不缓存空的查询结果。

lookup.max-retries

查询数据库失败的最大重试次数。

Integer

默认值为3

Flink SQL创建MySQL结果表
CREATE TABLE sync_test_3 (ts string,total_gmv bigint,PRIMARY KEY (ts) NOT ENFORCED) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://127.0.0.1:3306/db01?characterEncoding=UTF-8','table-name' = 'sync_test_3','username' = 'root','password' = 'Admin');
编写计算任务
INSERT INTO sync_test_3
SELECTts,SUM(price_amt - coupon_price_amt) AS total_gmv
FROM(SELECTa.ts as ts,a.price_amt as price_amt,b.coupon_price_amt as coupon_price_amtFROMflink_test_3 as aLEFT JOIN flink_test_3_dim  FOR SYSTEM_TIME AS OF  a.proctime  as bON b.id = a.id)
GROUP BY ts;
核验数据
SELECT id, ts, total_gmv FROM sync_test_3;

相关文章:

Flink流批一体计算(24):Flink SQL之mysql维表实时关联

目录 1.维表 2.数据准备 创建源数据 创建维度表 创建Sink表 3.配置任务 Flink SQL创建kafka源表 Flink SQL创建MySQL维表 Flink SQL创建MySQL结果表 编写计算任务 核验数据 1.维表 目前在实时计算的场景中,大多数都使用过MySQL、Hbase、redis作为维表引擎…...

鸿蒙(HarmonyOS)应用开发——从网络获取数据(题目答案)

判断题 1.在http模块中,多个请求可以使用同一个httpRequest对象,httpRequest对象可以复用。 错误(False) 2.使用http模块发起网络请求后,可以使用destroy方法中断网络请求。 正确(True) 3.Web组件onConfirm(callback: (event?: { url: …...

力扣:197. 上升的温度(Python3)

题目: 表: Weather ------------------------ | Column Name | Type | ------------------------ | id | int | | recordDate | date | | temperature | int | ------------------------ id 是该表具有唯一值的列。 该表…...

uniApp应用软件在运行时,不符合华为应用市场审核标准。解决方案合集!

(暂时用不到的也建议收藏一下,因为文章持续更新中) 最新更改时间:20023-12-10 第一次做App应用开发相信大家一定都遇到过华为应用市场审核的“驳回”! 有些问题一看就明白可以立马修改,而有一些问题修改意…...

c#编码技巧(十五):新语法糖record深入分析

c#编码技巧(十四):新语法糖record深入分析 从 C# 9 开始新增了一个关键字record,用于封装数据。 record实质是微软提供的一个语法糖,因很多开源项目都用到了这个关键字,说明这个语法糖比较实用。 那么这个record类型和普通class类…...

Java IO流(五)(字符集基础知识简介)

字符集 计算机的存储规则(英文字符) 常见字符集介绍 a.GB2312字符集:1980年发布,1981年5月1日实施的简体中文汉字编码国家标准。收录7445个图形字符,其中包括6763个简体汉字 b.BIG5字符集:台湾地区繁体中…...

周周爱学习之Redis重点总结

redis重点总结 在正常的业务流程中,用户发送请求,然后到缓存中查询数据。如果缓存中不存在数据的话,就会去数据库查询数据。数据库中有的话,就会更新缓存然后返回数据,数据库中也没有的话就会给用户返回一个空。 1.缓…...

免费的SEO外链发布工具,提升排名的利器

互联网已经成为信息传播和商业发展的重要平台。而对于拥有网站的个人、企业来说,如何让自己的网站在搜索引擎中脱颖而出?SEO(Search Engine Optimization)作为提高网站在搜索引擎中排名的关键手段. 什么是SEO外链? S…...

腾讯字节常考的linux命令

1 ps 1.1 ps -ef 有哪些字段 ps -ef 命令在Unix/Linux系统中用于显示当前运行的进程。输出的字段通常包括: UID:启动进程的用户ID。PID:进程ID。PPID:父进程ID。C:CPU利用率。STIME:进程启动时间。TTY&a…...

JAVA后端自学技能实操合集

JAVA后端自学技能实操 内容将会持续更新中,有需要添加什么内容可以再评论区留言,大家一起学习FastDFS使用docker安装FastDFS(linux)集成到springboot项目中 内容将会持续更新中,有需要添加什么内容可以再评论区留言,大家一起学习 FastDFS 组名:文件上传后所在的 st…...

C++ 关联容器

关联容器 关联容器支持高效的关键字查找和访问。 两个主要的关联容器(associative container)类型是 map 和 set。 map 中的元素是一些关键字——值对。 关键字起到索引的作用,值则表示与索引相关联的数据。 set 中的每个元素只包含一个关键…...

ES6之函数新增的扩展

参数 ES6允许为函数的参数设置默认值 function log(x, y World) {console.log(x, y); }console.log(Hello) // Hello World console.log(Hello, China) // Hello China console.log(Hello, ) // Hello函数的形参是默认声明的,不能使用let或const再次声明 functi…...

postgresql安装部署(docker版本)

1.在线部署 创建数据库存储目录 mkdir /home/pgdata创建容器 docker run --name postgresql --restartalways -d -p 5432:5432 -v /home/pgdata:/var/lib/postgresql/data --shm-size10g -e POSTGRES_PASSWORD密码 postgis/postgis:12-3.2-alpine–name为设置容器名称 -d表…...

【Python/Java/C++三种语言】20天拿下华为OD笔试之【位运算】2023B-出错的或电路【欧弟算法】全网注释最详细分类最全的华为OD真题

文章目录 题目描述与示例题目描述输入描述输出描述示例一输入输出说明 示例二输入输出说明 解题思路代码PythonJavaC时空复杂度 华为OD算法/大厂面试高频题算法练习冲刺训练 题目描述与示例 题目描述 某生产门电路的厂商发现某一批次的或门电路不稳定,具体现象为计…...

vscode 编译运行c++ 记录

一、打开文件夹,新建或打开一个cpp文件 二、ctrl shift p 进入 c/c配置 进行 IntelliSense 配置。主要是选择编译器、 c标准, 设置头文件路径等,配置好后会生成 c_cpp_properties.json; 二、编译运行: 1、选中ma…...

错题总结(四)

1.【一维数组】输入10个整数&#xff0c;求平均值 编写一个程序&#xff0c;从用户输入中读取10个整数并存储在一个数组中。然后&#xff0c;计算并输出这些整数的平均值。 int main() {int arr[10];int sum 0;for (int n 0; n < 10; n){scanf("%d", &arr…...

ORACLE使用Mybatis-plus批量插入

ORACLE使用mybatis-plus自带的iservice.saveBatch方法时&#xff0c;会报DML Returing cannot be batch错误&#xff1a; 推测原因是oracle不支持insert into table_name (,) values &#xff08;&#xff0c;&#xff09;,&#xff08;&#xff09;的写法。且oracle不会自动生…...

vue,uniapp的pdf等文件在线预览

vue&#xff0c;uniapp文件在线预览方案&#xff0c;用了个稍微偏门一点的方法实现了 通过后端生成文件查看页面&#xff0c;然后前端只要展示这个网页就行&#xff0c;uniapp就用web-view来展示&#xff0c;后台系统就直接window.open()打开就行 示例查看PDF文件&#xff0c;…...

SpringBoot 项目 Jar 包加密,防止反编译

1场景 最近项目要求部署到其他公司的服务器上&#xff0c;但是又不想将源码泄露出去。要求对正式环境的启动包进行安全性处理&#xff0c;防止客户直接通过反编译工具将代码反编译出来。 2方案 第一种方案使用代码混淆 采用proguard-maven-plugin插件 在单模块中此方案还算简…...

DockerFile中途执行出错的解决办法

DockerFile中途执行出错的解决办法 你们是否也曾经因为DockerFile中途执行出错,而对其束手无策?总是对docker避之不及! 但是当下载的源码运用到了docker,dockerFile 执行到一半,报错了怎么办? 现状 那么当DockerFile执行一半出错后,会产生什么结果呢? 如图可知,生成…...

Phi-3-mini-4k-instruct-gguf高算力适配:CUDA加速下RTX3090显存占用仅2.1GB实测

Phi-3-mini-4k-instruct-gguf高算力适配&#xff1a;CUDA加速下RTX3090显存占用仅2.1GB实测 1. 模型概述 Phi-3-mini-4k-instruct-gguf是微软Phi-3系列中的轻量级文本生成模型GGUF版本。这个经过优化的模型特别适合问答、文本改写、摘要整理和简短创作等场景。相比原始版本&a…...

基于Simulink的滞环电压控制(Bang-Bang)Buck仿真

目录 手把手教你学Simulink ——基于Simulink的滞环电压控制(Bang-Bang)Buck仿真 一、问题背景 二、滞环控制原理 1. 控制思想 三、系统架构 四、Simulink 建模步骤 第一步:搭建 Buck 主电路 第二步:实现滞环比较器 第三步:死区时间插入(防直通) 第四步:驱动…...

openclaw v2026.4.1 发布!16 大核心功能升级 + 28 项关键修复,AI 智能体网关全面进化,稳定性与安全性再攀高峰

一、前言&#xff1a;开源AI智能体标杆再升级&#xff0c;v2026.4.1引领本地自动化新潮流 2026年4月2日&#xff0c;开源AI智能体执行网关领域的标杆项目OpenClaw正式推出v2026.4.1最新版本。作为一款主打本地优先、自托管、全开源的AI智能体框架&#xff0c;OpenClaw自诞生以来…...

Qwen3-VL-2B-Instruct保姆级教程:视觉对话机器人部署

Qwen3-VL-2B-Instruct保姆级教程&#xff1a;视觉对话机器人部署 1. 环境准备与快速部署 想要体验AI视觉对话的神奇能力吗&#xff1f;Qwen3-VL-2B-Instruct让你不用昂贵的显卡也能拥有一个能"看懂"图片的智能助手。这个教程将手把手带你完成整个部署过程&#xff…...

PyTorch 2.8开源大模型镜像实操:HuggingFace模型本地化API服务封装

PyTorch 2.8开源大模型镜像实操&#xff1a;HuggingFace模型本地化API服务封装 1. 镜像环境概览 1.1 硬件与软件配置 这个基于PyTorch 2.8的深度学习镜像经过RTX 4090D显卡和CUDA 12.4的深度优化&#xff0c;为大型模型推理和训练提供了开箱即用的环境。主要配置包括&#x…...

YOLO X Layout在新闻行业的应用:版面自动排版

YOLO X Layout在新闻行业的应用&#xff1a;版面自动排版 每天清晨&#xff0c;当大多数人还在睡梦中时&#xff0c;新闻编辑部的排版编辑已经开始了一天中最紧张的工作&#xff1a;将记者们连夜赶制的稿件、摄影师捕捉的精彩瞬间、设计师制作的图表&#xff0c;精准地排列在有…...

OpenClaw移动办公:Phi-3-mini-128k-instruct通过钉钉审批电子合同

OpenClaw移动办公&#xff1a;Phi-3-mini-128k-instruct通过钉钉审批电子合同 1. 为什么需要移动审批电子合同&#xff1f; 上周三我在高铁上收到法务同事的紧急消息&#xff1a;"有个供应商合同今天必须签完&#xff0c;但关键条款需要你确认"。当时手边既没电脑也…...

复古计算机复兴:OpenClaw+Qwen3-14B驱动命令行工作流

复古计算机复兴&#xff1a;OpenClawQwen3-14B驱动命令行工作流 1. 当AI遇见Unix哲学 我的书桌上至今保留着一台1984年的IBM PC/AT&#xff0c;那厚重的机械键盘和闪烁的绿色光标总能唤起某种仪式感。最近在调试OpenClaw对接Qwen3-14B时&#xff0c;突然意识到&#xff1a;我…...

【独家首发】CPython内存管理策略白皮书(基于v3.9–v3.13源码比对):37处关键宏定义、12个GC阈值参数、8类对象内存布局差异

第一章&#xff1a;CPython内存管理策略全景概览CPython 作为 Python 官方解释器&#xff0c;其内存管理机制融合了引用计数、循环垃圾回收&#xff08;GC&#xff09;与分代回收策略&#xff0c;形成一套兼顾实时性与鲁棒性的综合体系。理解该机制对诊断内存泄漏、优化对象生命…...

方寸陶瓷藏乾坤:百能云板用陶瓷基板四大核心工艺,赋能万物互联时代

当你驾驶新能源汽车平稳穿梭在城市街巷&#xff0c;当深夜的 LED 路灯精准照亮回家的路&#xff0c;当手机人脸识别瞬间解锁生活便捷 —— 你或许不会想到&#xff0c;这些场景的背后&#xff0c;都离不开一块 “隐形基石”&#xff1a;陶瓷散热基板。作为电子设备的 “散热心脏…...