2.3 如何使用FlinkSQL读取写入到JDBC(MySQL)
1、JDBC SQL 连接器
FlinkSQL允许使用 JDBC连接器,向任意类型的关系型数据库读取或者写入数据
添加Maven依赖
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.0-1.17</version>
</dependency>
注意:如果使用 sql-client客户端,需保证 flink-1.17.1/lib 目录下 存在相应的jar包
相关jar可以通过官网下载:JDBC SQL 连接器

2、读取 MySQL
FlinkSQL读取MySQL表时,为批式处理,在流式计算任务中,通常被做维表来使用
-- 在FlinkSQL中创建 MySQL Source 表
drop table mysql_source_table;
CREATE TABLE mysql_source_table (`id` INT,`title` STRING,`author` STRING,`price` DOUBLE,`qty` INT
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://worker01/flink','driver' = 'com.mysql.jdbc.Driver', -- 【可选】不设置时,将自动从url中推导'username' = 'xxxx','password' = 'xxxx','table-name' = 'books'
);-- 批式 sql,查看 JDBC 表中的数据
select * from mysql_source_table;
运行结果:

3、写入MySQL
3.1 何时批量写入MySQL呢?
FlinkSQL往MySQL写入数据时,默认会在客户端缓存数据,当触发设置的阈值后,才会向服务端发送数据

开启checkpoint :
# TODO 开启checkpoint,当checkpoint后,会触发jdbc的flush操作
set execution.checkpointing.interval=300sec;
设置 flush 前缓存记录的最大值 、flush 间隔时间:
-- TODO 创建sink mysql table
drop table mysql_sink_table;
CREATE TABLE mysql_sink_table (`id` INT,`title` STRING,`author` STRING,`price` DOUBLE,`qty` INT
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://worker01:3306/flink?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8','username' = 'xxxx','password' = 'xxxx','table-name' = 'books','sink.buffer-flush.max-rows' = '100', -- flush 前缓存记录的最大值,默认值为100,设置为0时,表示不缓存数据(来一条写入一条)'sink.buffer-flush.interval' = '50s' -- flush 间隔时间,超过该时间后异步线程将 flush 数据。默认为1s
);
使用说明:
FLinkSQL写入MySQL时,常通过 sink.buffer-flush.max-rows、sink.buffer-flush.interval 来控制写入数据的延迟程度
当 对写入实时性要求较高时,可以将 sink.buffer-flush.max-rows = 0 ,表示到来一条数据后立即写入MySQL,但带来的后果是 长时间占有mysql连接
当 数据量大且对实时要求不高时,可根据业务需求调大配置,可使实时行和性能最优
3.2 sink mysql table 中主键的作用
在FLinkSQL中创建sink mysql table时,如果表中定义了主键,则连接器将以 upsert 模式工作
否则连接器将以 append 模式工作
upsert 模式:Flink 将根据主键判断插入新行或者更新已存在的行
使用这种模式时,确保MySQL中的底表定义主键和添加唯一性约束
append 模式:对MySQL库中底表做insert操作
upsert 模式:
-- TODO 创建MySQL 表
CREATE TABLE `books` (`id` int(11) NOT NULL,`title` varchar(99) DEFAULT NULL,`author` varchar(99) DEFAULT NULL,`price` double DEFAULT NULL,`qty` int(11) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;-- TODO 创建FLinkSQL表(sink mysql table)
drop table mysql_sink_table;
CREATE TABLE mysql_sink_table (`id` INT,`title` STRING,`author` STRING,`price` DOUBLE,`qty` INT,PRIMARY KEY (id) NOT ENFORCED -- 指定主键字段
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://worker01:3306/flink?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8','username' = 'root','password' = 'xxxx','table-name' = 'books','sink.buffer-flush.max-rows' = '0' -- 实时写入
);-- TODO 往 mysql中写入数据(相同key的数据写入后,会做upsert操作)
insert into mysql_sink_table
SELECT * FROM (VALUES(5,'A Dream in Red Mansions','y', 3.0,1)
, (6,'Journey to the West','y', 3.0,1)
, (7,'Water Margin','y', 3.0,1)
) AS books (id, title,author,price,qty);
append 模式:
-- TODO 创建FLinkSQL表(sink mysql table)
drop table mysql_sink_table;
CREATE TABLE mysql_sink_table (`id` INT,`title` STRING,`author` STRING,`price` DOUBLE,`qty` INT
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://worker01:3306/flink?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8','username' = 'root','password' = 'xxx','table-name' = 'books','sink.buffer-flush.max-rows' = '0' -- 实时写入
);-- TODO 往 mysql中写入数据(相同key的数据写入后,会做操作)
insert into mysql_sink_table
SELECT * FROM (VALUES(5,'A Dream in Red Mansions','y', 3.0,1)
, (6,'Journey to the West','y', 3.0,1)
, (7,'Water Margin','y', 3.0,1)
) AS books (id, title,author,price,qty);
注意:使用 append模式时,如果MySQL底表中存在主键或唯一性约束时,INSERT 插入可能会失败
insert into 失败:

相关文章:
2.3 如何使用FlinkSQL读取写入到JDBC(MySQL)
1、JDBC SQL 连接器 FlinkSQL允许使用 JDBC连接器,向任意类型的关系型数据库读取或者写入数据 添加Maven依赖 <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1…...
Flink日志收集到数据库/kafka
引言 我们做项目过程中发现flink日志不同模式启动,存放位置不同,查找任务日志很不方便,具体问题如下: 原始flink的日志配置文件log4j-cli.properties appender.file.append false,取消追加,直接覆盖掉上…...
Go项目踩坑:go get下载超时,goFrame框架下的go项目里将vue项目的dist同步打包发布,go项目打包并压缩
Go项目踩坑:go get下载超时,goFrame框架下的go项目里将vue项目的dist同步打包发布,go项目打包并压缩 go get下载超时goFrame打包静态资源vue项目打包gf pack生成go文件 静态资源使用打包发布go项目交叉编译,省略一些不必要的信息通…...
DataCon【签到题】挖矿流量检测
【签到题】挖矿流量检测 文章目录 答案【多选】1. 个人电脑中了挖矿病毒通常有以下哪些表现?【单选】2. 在典型挖矿场景中,矿工和矿池之间目前最常用的通信协议是哪一个?【单选】3. 目前的虚拟货币挖矿场景中,最常采用的是哪种共识…...
Vivado详细使用教程 | LED闪烁示例
文章目录 整体流程第一步:新建工程第二步:设计输入第三步:功能仿真第四步:分析与综合第五步:约束输入第六步:设计实现第七步:下载比特流 整体流程 打开软甲------>新建工程------->设计输…...
一些经典的神经网络(第17天)
1. 经典神经网络LeNet LeNet是早期成功的神经网络; 先使用卷积层来学习图片空间信息 然后使用全连接层来转到到类别空间 【通过在卷积层后加入激活函数,可以引入非线性、增加模型的表达能力、增强稀疏性和解决梯度消失等问题,从而提高卷积…...
Hadoop-HA-Hive-on-Spark 4台虚拟机安装配置文件
Hadoop-HA-Hive-on-Spark 4台虚拟机安装配置文件 版本号步骤hadoopcore-site.xmlhdfs-site.xmlmapred-site.xmlslavesworkersyarn-site.xml hivehive-site.xmlspark-defaults.conf sparkhdfs-site.xmlhive-site.xmlslavesyarn-site.xmlspark-env.sh 版本号 apache-hive-3.1.3-…...
Hutool工具类参考文章
Hutool工具类参考文章 日期: 身份证:...
【 Python ModuleNotFoundError: No module named ‘xxx‘可能的解决方案大全】
Python ModuleNotFoundError: No module named ‘xxx‘可能的解决方案大全 本文主要介绍了Python ModuleNotFoundError: No module named ‘xxx‘可能的解决方案大全,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值&#x…...
eclipse 配置selenium环境
eclipse环境 安装selenium的步骤 配置谷歌浏览器驱动 Selenium安装-如何在Java中安装Selenium chrome驱动下载 eclipse 启动配置java_home: 在eclipse.ini文件中加上一行 1 配置java环境,网上有很多教程 2 下载eclipse,网上有很多教程 ps&…...
数据挖掘(6)聚类分析
一、什么是聚类分析 1.1概述 无指导的,数据集中类别未知类的特征: 类不是事先给定的,而是根据数据的相似性、距离划分的聚类的数目和结构都没有事先假定。挖掘有价值的客户: 找到客户的黄金客户ATM的安装位置 1.2区别 二、距离和相似系数 …...
在启智平台上安装anconda
安装Anaconda3-5.0.1-Linux-x86_64.sh python版本是3.6 在下面的网站上找到要下载的anaconda版本,把对应的.sh文件下载下来 https://mirrors.tuna.tsinghua.edu.cn/anaconda/archive/ 把sh文件压缩成.zip文件,拖到启智平台的调试页面 上传到平台上 un…...
棒球省队建设实施办法·棒球1号位
棒球省队建设实施办法 1. 建设目标与原则 提升棒球省队整体竞技水平 为了提升棒球省队整体竞技水平,我们需要采取一系列有效的措施。 首先,我们应该加强对棒球运动的投入和关注。各级政府和相关部门应加大对棒球运动的经费投入,提高球队的…...
架构案例2017(五十二)
第5题 阅读以下关于Web系统架构设计的叙述,在答题纸上回答问题1至问题3.【说明】某电子商务企业因发展良好,客户量逐步增大,企业业务不断扩充,导致其原有的B2C商品交易平台己不能满足现有业务需求。因此,该企业委托某…...
给四个点坐标计算两条直线的交点
文章目录 1 chatgpt42、文心一言3、星火4、Bard总结 我使用Chatgpt4和文心一言、科大讯飞星火、google Bard 对该问题进行搜索,分别给出答案。先说结论,是chatgpt4和文心一言给对了答案, 另外两个部分正确。 问题是:python 给定四…...
从入门到进阶 之 ElasticSearch SpringData 继承篇
🌹 以上分享 从入门到进阶 之 ElasticSearch SpringData 继承篇,如有问题请指教写。🌹🌹 如你对技术也感兴趣,欢迎交流。🌹🌹🌹 如有需要,请👍点赞…...
中文编程开发语言工具编程案例:计时计费管理系统软件连接灯控器编程案例
中文编程开发语言工具编程案例:计时计费管理系统软件连接灯控器编程案例 中文编程开发语言工具编程案例:计时计费管理系统软件连接灯控器编程案例 中文编程系统化教程,不需英语基础。学习链接 https://edu.csdn.net/course/detail/39036...
YOLOv7改进:动态蛇形卷积(Dynamic Snake Convolution),增强细微特征对小目标友好,实现涨点 | ICCV2023
💡💡💡本文独家改进:动态蛇形卷积(Dynamic Snake Convolution),增强细长微弱的局部结构特征与复杂多变的全局形态特征,对小目标检测很适用 Dynamic Snake Convolution | 亲测在多个数据集能够实现大幅涨点 收录: YOLOv7高阶自研专栏介绍: http://t.csdnimg.…...
从文心大模型4.0与FuncGPT:用AI为开发者打开新视界
今天,在百度2023世界大会上,文心大模型4.0正式发布,而在大洋的彼岸,因为大模型代表ChatGPT之类的AI编码工具来势汹汹,作为全世界每个开发者最爱的代码辅助网站,Stack Overflow的CEO Prashanth Chandrasekar…...
Nginx集群负载均衡配置完整流程
今天,良哥带你来做一个nginx集群的负载均衡配置的完整流程。 一、准备工作 本次搭建的操作系统环境是win11,linux可配置类同。 1)首先,下载nginx。 下载地址为:http://nginx.org/en/download.html 良哥下载的是&am…...
终极指南:使用 crypto-js 测试套件确保你的加密功能100%可靠
终极指南:使用 crypto-js 测试套件确保你的加密功能100%可靠 【免费下载链接】crypto-js JavaScript library of crypto standards. 项目地址: https://gitcode.com/gh_mirrors/cr/crypto-js 在Web开发中,你有没有遇到过这样的场景:你…...
SenseVoice-Small模型在运维监控中的语音告警应用
SenseVoice-Small模型在运维监控中的语音告警应用 1. 运维人员每天都在和告警“搏斗” 你有没有经历过这样的场景:凌晨三点,手机突然震动,一条告警短信跳出来——“数据库连接池使用率98%”。你立刻爬起来打开电脑,连上跳板机&a…...
Python AOT编译迎来分水岭:2026年3大工业级工具实测对比(启动提速8.7×,内存降63%,兼容CPython 3.13+)
第一章:Python AOT编译的范式跃迁与工业落地元年定义长期以来,Python 以解释执行和动态特性见长,但其运行时开销、启动延迟与内存 footprint 成为云原生服务、边缘设备与实时系统规模化部署的关键瓶颈。2024 年,随着 Nuitka 14.x、…...
终极zsh语法高亮插件版本兼容性测试:Zsh 5.0到5.9全面支持指南
终极zsh语法高亮插件版本兼容性测试:Zsh 5.0到5.9全面支持指南 【免费下载链接】zsh-syntax-highlighting Fish shell like syntax highlighting for Zsh. 项目地址: https://gitcode.com/gh_mirrors/zs/zsh-syntax-highlighting zsh-syntax-highlighting是Z…...
英雄联盟段位修改完整解决方案:LeaguePrank免费工具终极指南
英雄联盟段位修改完整解决方案:LeaguePrank免费工具终极指南 【免费下载链接】LeaguePrank 项目地址: https://gitcode.com/gh_mirrors/le/LeaguePrank 还在为单调的游戏段位显示感到乏味吗?LeaguePrank这款创新的免费工具将彻底改变你的英雄联盟…...
用MNN实现手机端AI绘画:Android Studio集成与模型量化实战
用MNN实现手机端AI绘画:Android Studio集成与模型量化实战 移动端AI应用正在经历爆发式增长,其中AI绘画因其创意性和实用性成为开发者关注的热点。本文将手把手教你如何通过阿里开源的MNN框架,在Android应用中实现高性能的AI绘画功能。不同于…...
MAD与标准差:鲁棒统计中的抗噪利器
1. 为什么我们需要抗噪统计量? 在日常数据分析中,我们经常会遇到一些"不听话"的数据点。比如分析员工薪资时突然冒出几个高管的天价年薪,或者测量温度时混入几个明显错误的极端值。这时候如果直接用传统的标准差来计算离散程度&…...
虚幻引擎PicoVR开发避坑指南:PicoXR与PicoOpenXR插件选型与实战解析
1. PicoXR与PicoOpenXR插件核心差异解析 第一次接触PicoVR开发时,很多开发者都会被两个相似的插件名称搞懵——PicoXR和PicoOpenXR。这两个插件虽然名字相近,但在功能特性和适用场景上存在本质区别。我在去年开发健身类VR应用时就因为选错插件࿰…...
ComfyUI-VideoHelperSuite终极指南:掌握视频合成与AI工作流的核心技巧
ComfyUI-VideoHelperSuite终极指南:掌握视频合成与AI工作流的核心技巧 【免费下载链接】ComfyUI-VideoHelperSuite Nodes related to video workflows 项目地址: https://gitcode.com/gh_mirrors/co/ComfyUI-VideoHelperSuite 想要将AI生成的图像序列转化为流…...
深度解析CloverBootloader内存管理:AptioMemoryFix原理与实现详解
深度解析CloverBootloader内存管理:AptioMemoryFix原理与实现详解 【免费下载链接】CloverBootloader Bootloader for macOS, Windows and Linux in UEFI and in legacy mode 项目地址: https://gitcode.com/gh_mirrors/cl/CloverBootloader CloverBootloade…...
