2.3 如何使用FlinkSQL读取写入到JDBC(MySQL)
1、JDBC SQL 连接器
FlinkSQL允许使用 JDBC连接器,向任意类型的关系型数据库读取或者写入数据
添加Maven依赖
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.0-1.17</version>
</dependency>
注意:如果使用 sql-client客户端,需保证 flink-1.17.1/lib 目录下 存在相应的jar包
相关jar可以通过官网下载:JDBC SQL 连接器

2、读取 MySQL
FlinkSQL读取MySQL表时,为批式处理,在流式计算任务中,通常被做维表来使用
-- 在FlinkSQL中创建 MySQL Source 表
drop table mysql_source_table;
CREATE TABLE mysql_source_table (`id` INT,`title` STRING,`author` STRING,`price` DOUBLE,`qty` INT
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://worker01/flink','driver' = 'com.mysql.jdbc.Driver', -- 【可选】不设置时,将自动从url中推导'username' = 'xxxx','password' = 'xxxx','table-name' = 'books'
);-- 批式 sql,查看 JDBC 表中的数据
select * from mysql_source_table;
运行结果:

3、写入MySQL
3.1 何时批量写入MySQL呢?
FlinkSQL往MySQL写入数据时,默认会在客户端缓存数据,当触发设置的阈值后,才会向服务端发送数据

开启checkpoint :
# TODO 开启checkpoint,当checkpoint后,会触发jdbc的flush操作
set execution.checkpointing.interval=300sec;
设置 flush 前缓存记录的最大值 、flush 间隔时间:
-- TODO 创建sink mysql table
drop table mysql_sink_table;
CREATE TABLE mysql_sink_table (`id` INT,`title` STRING,`author` STRING,`price` DOUBLE,`qty` INT
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://worker01:3306/flink?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8','username' = 'xxxx','password' = 'xxxx','table-name' = 'books','sink.buffer-flush.max-rows' = '100', -- flush 前缓存记录的最大值,默认值为100,设置为0时,表示不缓存数据(来一条写入一条)'sink.buffer-flush.interval' = '50s' -- flush 间隔时间,超过该时间后异步线程将 flush 数据。默认为1s
);
使用说明:
FLinkSQL写入MySQL时,常通过 sink.buffer-flush.max-rows、sink.buffer-flush.interval 来控制写入数据的延迟程度
当 对写入实时性要求较高时,可以将 sink.buffer-flush.max-rows = 0 ,表示到来一条数据后立即写入MySQL,但带来的后果是 长时间占有mysql连接
当 数据量大且对实时要求不高时,可根据业务需求调大配置,可使实时行和性能最优
3.2 sink mysql table 中主键的作用
在FLinkSQL中创建sink mysql table时,如果表中定义了主键,则连接器将以 upsert 模式工作
否则连接器将以 append 模式工作
upsert 模式:Flink 将根据主键判断插入新行或者更新已存在的行
使用这种模式时,确保MySQL中的底表定义主键和添加唯一性约束
append 模式:对MySQL库中底表做insert操作
upsert 模式:
-- TODO 创建MySQL 表
CREATE TABLE `books` (`id` int(11) NOT NULL,`title` varchar(99) DEFAULT NULL,`author` varchar(99) DEFAULT NULL,`price` double DEFAULT NULL,`qty` int(11) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;-- TODO 创建FLinkSQL表(sink mysql table)
drop table mysql_sink_table;
CREATE TABLE mysql_sink_table (`id` INT,`title` STRING,`author` STRING,`price` DOUBLE,`qty` INT,PRIMARY KEY (id) NOT ENFORCED -- 指定主键字段
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://worker01:3306/flink?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8','username' = 'root','password' = 'xxxx','table-name' = 'books','sink.buffer-flush.max-rows' = '0' -- 实时写入
);-- TODO 往 mysql中写入数据(相同key的数据写入后,会做upsert操作)
insert into mysql_sink_table
SELECT * FROM (VALUES(5,'A Dream in Red Mansions','y', 3.0,1)
, (6,'Journey to the West','y', 3.0,1)
, (7,'Water Margin','y', 3.0,1)
) AS books (id, title,author,price,qty);
append 模式:
-- TODO 创建FLinkSQL表(sink mysql table)
drop table mysql_sink_table;
CREATE TABLE mysql_sink_table (`id` INT,`title` STRING,`author` STRING,`price` DOUBLE,`qty` INT
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://worker01:3306/flink?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8','username' = 'root','password' = 'xxx','table-name' = 'books','sink.buffer-flush.max-rows' = '0' -- 实时写入
);-- TODO 往 mysql中写入数据(相同key的数据写入后,会做操作)
insert into mysql_sink_table
SELECT * FROM (VALUES(5,'A Dream in Red Mansions','y', 3.0,1)
, (6,'Journey to the West','y', 3.0,1)
, (7,'Water Margin','y', 3.0,1)
) AS books (id, title,author,price,qty);
注意:使用 append模式时,如果MySQL底表中存在主键或唯一性约束时,INSERT 插入可能会失败
insert into 失败:

相关文章:
2.3 如何使用FlinkSQL读取写入到JDBC(MySQL)
1、JDBC SQL 连接器 FlinkSQL允许使用 JDBC连接器,向任意类型的关系型数据库读取或者写入数据 添加Maven依赖 <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1…...
Flink日志收集到数据库/kafka
引言 我们做项目过程中发现flink日志不同模式启动,存放位置不同,查找任务日志很不方便,具体问题如下: 原始flink的日志配置文件log4j-cli.properties appender.file.append false,取消追加,直接覆盖掉上…...
Go项目踩坑:go get下载超时,goFrame框架下的go项目里将vue项目的dist同步打包发布,go项目打包并压缩
Go项目踩坑:go get下载超时,goFrame框架下的go项目里将vue项目的dist同步打包发布,go项目打包并压缩 go get下载超时goFrame打包静态资源vue项目打包gf pack生成go文件 静态资源使用打包发布go项目交叉编译,省略一些不必要的信息通…...
DataCon【签到题】挖矿流量检测
【签到题】挖矿流量检测 文章目录 答案【多选】1. 个人电脑中了挖矿病毒通常有以下哪些表现?【单选】2. 在典型挖矿场景中,矿工和矿池之间目前最常用的通信协议是哪一个?【单选】3. 目前的虚拟货币挖矿场景中,最常采用的是哪种共识…...
Vivado详细使用教程 | LED闪烁示例
文章目录 整体流程第一步:新建工程第二步:设计输入第三步:功能仿真第四步:分析与综合第五步:约束输入第六步:设计实现第七步:下载比特流 整体流程 打开软甲------>新建工程------->设计输…...
一些经典的神经网络(第17天)
1. 经典神经网络LeNet LeNet是早期成功的神经网络; 先使用卷积层来学习图片空间信息 然后使用全连接层来转到到类别空间 【通过在卷积层后加入激活函数,可以引入非线性、增加模型的表达能力、增强稀疏性和解决梯度消失等问题,从而提高卷积…...
Hadoop-HA-Hive-on-Spark 4台虚拟机安装配置文件
Hadoop-HA-Hive-on-Spark 4台虚拟机安装配置文件 版本号步骤hadoopcore-site.xmlhdfs-site.xmlmapred-site.xmlslavesworkersyarn-site.xml hivehive-site.xmlspark-defaults.conf sparkhdfs-site.xmlhive-site.xmlslavesyarn-site.xmlspark-env.sh 版本号 apache-hive-3.1.3-…...
Hutool工具类参考文章
Hutool工具类参考文章 日期: 身份证:...
【 Python ModuleNotFoundError: No module named ‘xxx‘可能的解决方案大全】
Python ModuleNotFoundError: No module named ‘xxx‘可能的解决方案大全 本文主要介绍了Python ModuleNotFoundError: No module named ‘xxx‘可能的解决方案大全,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值&#x…...
eclipse 配置selenium环境
eclipse环境 安装selenium的步骤 配置谷歌浏览器驱动 Selenium安装-如何在Java中安装Selenium chrome驱动下载 eclipse 启动配置java_home: 在eclipse.ini文件中加上一行 1 配置java环境,网上有很多教程 2 下载eclipse,网上有很多教程 ps&…...
数据挖掘(6)聚类分析
一、什么是聚类分析 1.1概述 无指导的,数据集中类别未知类的特征: 类不是事先给定的,而是根据数据的相似性、距离划分的聚类的数目和结构都没有事先假定。挖掘有价值的客户: 找到客户的黄金客户ATM的安装位置 1.2区别 二、距离和相似系数 …...
在启智平台上安装anconda
安装Anaconda3-5.0.1-Linux-x86_64.sh python版本是3.6 在下面的网站上找到要下载的anaconda版本,把对应的.sh文件下载下来 https://mirrors.tuna.tsinghua.edu.cn/anaconda/archive/ 把sh文件压缩成.zip文件,拖到启智平台的调试页面 上传到平台上 un…...
棒球省队建设实施办法·棒球1号位
棒球省队建设实施办法 1. 建设目标与原则 提升棒球省队整体竞技水平 为了提升棒球省队整体竞技水平,我们需要采取一系列有效的措施。 首先,我们应该加强对棒球运动的投入和关注。各级政府和相关部门应加大对棒球运动的经费投入,提高球队的…...
架构案例2017(五十二)
第5题 阅读以下关于Web系统架构设计的叙述,在答题纸上回答问题1至问题3.【说明】某电子商务企业因发展良好,客户量逐步增大,企业业务不断扩充,导致其原有的B2C商品交易平台己不能满足现有业务需求。因此,该企业委托某…...
给四个点坐标计算两条直线的交点
文章目录 1 chatgpt42、文心一言3、星火4、Bard总结 我使用Chatgpt4和文心一言、科大讯飞星火、google Bard 对该问题进行搜索,分别给出答案。先说结论,是chatgpt4和文心一言给对了答案, 另外两个部分正确。 问题是:python 给定四…...
从入门到进阶 之 ElasticSearch SpringData 继承篇
🌹 以上分享 从入门到进阶 之 ElasticSearch SpringData 继承篇,如有问题请指教写。🌹🌹 如你对技术也感兴趣,欢迎交流。🌹🌹🌹 如有需要,请👍点赞…...
中文编程开发语言工具编程案例:计时计费管理系统软件连接灯控器编程案例
中文编程开发语言工具编程案例:计时计费管理系统软件连接灯控器编程案例 中文编程开发语言工具编程案例:计时计费管理系统软件连接灯控器编程案例 中文编程系统化教程,不需英语基础。学习链接 https://edu.csdn.net/course/detail/39036...
YOLOv7改进:动态蛇形卷积(Dynamic Snake Convolution),增强细微特征对小目标友好,实现涨点 | ICCV2023
💡💡💡本文独家改进:动态蛇形卷积(Dynamic Snake Convolution),增强细长微弱的局部结构特征与复杂多变的全局形态特征,对小目标检测很适用 Dynamic Snake Convolution | 亲测在多个数据集能够实现大幅涨点 收录: YOLOv7高阶自研专栏介绍: http://t.csdnimg.…...
从文心大模型4.0与FuncGPT:用AI为开发者打开新视界
今天,在百度2023世界大会上,文心大模型4.0正式发布,而在大洋的彼岸,因为大模型代表ChatGPT之类的AI编码工具来势汹汹,作为全世界每个开发者最爱的代码辅助网站,Stack Overflow的CEO Prashanth Chandrasekar…...
Nginx集群负载均衡配置完整流程
今天,良哥带你来做一个nginx集群的负载均衡配置的完整流程。 一、准备工作 本次搭建的操作系统环境是win11,linux可配置类同。 1)首先,下载nginx。 下载地址为:http://nginx.org/en/download.html 良哥下载的是&am…...
深入浅出Asp.Net Core MVC应用开发系列-AspNetCore中的日志记录
ASP.NET Core 是一个跨平台的开源框架,用于在 Windows、macOS 或 Linux 上生成基于云的新式 Web 应用。 ASP.NET Core 中的日志记录 .NET 通过 ILogger API 支持高性能结构化日志记录,以帮助监视应用程序行为和诊断问题。 可以通过配置不同的记录提供程…...
简易版抽奖活动的设计技术方案
1.前言 本技术方案旨在设计一套完整且可靠的抽奖活动逻辑,确保抽奖活动能够公平、公正、公开地进行,同时满足高并发访问、数据安全存储与高效处理等需求,为用户提供流畅的抽奖体验,助力业务顺利开展。本方案将涵盖抽奖活动的整体架构设计、核心流程逻辑、关键功能实现以及…...
Mysql8 忘记密码重置,以及问题解决
1.使用免密登录 找到配置MySQL文件,我的文件路径是/etc/mysql/my.cnf,有的人的是/etc/mysql/mysql.cnf 在里最后加入 skip-grant-tables重启MySQL服务 service mysql restartShutting down MySQL… SUCCESS! Starting MySQL… SUCCESS! 重启成功 2.登…...
08. C#入门系列【类的基本概念】:开启编程世界的奇妙冒险
C#入门系列【类的基本概念】:开启编程世界的奇妙冒险 嘿,各位编程小白探险家!欢迎来到 C# 的奇幻大陆!今天咱们要深入探索这片大陆上至关重要的 “建筑”—— 类!别害怕,跟着我,保准让你轻松搞…...
阿里云Ubuntu 22.04 64位搭建Flask流程(亲测)
cd /home 进入home盘 安装虚拟环境: 1、安装virtualenv pip install virtualenv 2.创建新的虚拟环境: virtualenv myenv 3、激活虚拟环境(激活环境可以在当前环境下安装包) source myenv/bin/activate 此时,终端…...
2025年低延迟业务DDoS防护全攻略:高可用架构与实战方案
一、延迟敏感行业面临的DDoS攻击新挑战 2025年,金融交易、实时竞技游戏、工业物联网等低延迟业务成为DDoS攻击的首要目标。攻击呈现三大特征: AI驱动的自适应攻击:攻击流量模拟真实用户行为,差异率低至0.5%,传统规则引…...
【1】跨越技术栈鸿沟:字节跳动开源TRAE AI编程IDE的实战体验
2024年初,人工智能编程工具领域发生了一次静默的变革。当字节跳动宣布退出其TRAE项目(一款融合大型语言模型能力的云端AI编程IDE)时,技术社区曾短暂叹息。然而这一退场并非终点——通过开源社区的接力,TRAE在WayToAGI等…...
Python异步编程:深入理解协程的原理与实践指南
💝💝💝欢迎莅临我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 持续学习,不断…...
基于小程序老人监护管理系统源码数据库文档
摘 要 近年来,随着我国人口老龄化问题日益严重,独居和居住养老机构的的老年人数量越来越多。而随着老年人数量的逐步增长,随之而来的是日益突出的老年人问题,尤其是老年人的健康问题,尤其是老年人产生健康问题后&…...
Android Framework预装traceroute执行文件到system/bin下
文章目录 Android SDK中寻找traceroute代码内置traceroute到SDK中traceroute参数说明-I 参数(使用 ICMP Echo 请求)-T 参数(使用 TCP SYN 包) 相关文章 Android SDK中寻找traceroute代码 设备使用的是Android 11,在/s…...
