当前位置: 首页 > news >正文

2.3 如何使用FlinkSQL读取写入到JDBC(MySQL)

1、JDBC SQL 连接器

FlinkSQL允许使用 JDBC连接器,向任意类型的关系型数据库读取或者写入数据

添加Maven依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.0-1.17</version>
</dependency>

注意:如果使用 sql-client客户端,需保证 flink-1.17.1/lib 目录下 存在相应的jar包

 相关jar可以通过官网下载:JDBC SQL 连接器 


2、读取 MySQL

FlinkSQL读取MySQL表时,为批式处理,在流式计算任务中,通常被做维表来使用

-- 在FlinkSQL中创建 MySQL Source 表
drop table mysql_source_table;
CREATE TABLE mysql_source_table (`id` INT,`title` STRING,`author` STRING,`price` DOUBLE,`qty` INT
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://worker01/flink','driver' = 'com.mysql.jdbc.Driver',  -- 【可选】不设置时,将自动从url中推导'username' = 'xxxx','password' = 'xxxx','table-name' = 'books'
);-- 批式 sql,查看 JDBC 表中的数据
select * from mysql_source_table;

运行结果:


3、写入MySQL

3.1 何时批量写入MySQL呢?

FlinkSQL往MySQL写入数据时,默认会在客户端缓存数据,当触发设置的阈值后,才会向服务端发送数据

开启checkpoint :

# TODO 开启checkpoint,当checkpoint后,会触发jdbc的flush操作
set execution.checkpointing.interval=300sec;

设置 flush 前缓存记录的最大值 、flush 间隔时间:

-- TODO 创建sink mysql table
drop table mysql_sink_table;
CREATE TABLE mysql_sink_table (`id` INT,`title` STRING,`author` STRING,`price` DOUBLE,`qty` INT
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://worker01:3306/flink?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8','username' = 'xxxx','password' = 'xxxx','table-name' = 'books','sink.buffer-flush.max-rows' = '100', -- flush 前缓存记录的最大值,默认值为100,设置为0时,表示不缓存数据(来一条写入一条)'sink.buffer-flush.interval' = '50s' -- flush 间隔时间,超过该时间后异步线程将 flush 数据。默认为1s
);

使用说明:

FLinkSQL写入MySQL时,常通过 sink.buffer-flush.max-rows、sink.buffer-flush.interval 来控制写入数据的延迟程度

        当 对写入实时性要求较高时,可以将 sink.buffer-flush.max-rows = 0 ,表示到来一条数据后立即写入MySQL,但带来的后果是 长时间占有mysql连接

        当 数据量大且对实时要求不高时,可根据业务需求调大配置,可使实时行和性能最优


3.2 sink mysql table 中主键的作用

在FLinkSQL中创建sink mysql table时,如果表中定义了主键,则连接器将以 upsert 模式工作

否则连接器将以 append 模式工作

         upsert 模式:Flink 将根据主键判断插入新行或者更新已存在的行

                               使用这种模式时,确保MySQL中的底表定义主键和添加唯一性约束

       append 模式:对MySQL库中底表做insert操作

 upsert 模式:

-- TODO 创建MySQL 表
CREATE TABLE `books` (`id` int(11) NOT NULL,`title` varchar(99) DEFAULT NULL,`author` varchar(99) DEFAULT NULL,`price` double DEFAULT NULL,`qty` int(11) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;-- TODO 创建FLinkSQL表(sink mysql table)
drop table mysql_sink_table;
CREATE TABLE mysql_sink_table (`id` INT,`title` STRING,`author` STRING,`price` DOUBLE,`qty` INT,PRIMARY KEY (id) NOT ENFORCED -- 指定主键字段
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://worker01:3306/flink?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8','username' = 'root','password' = 'xxxx','table-name' = 'books','sink.buffer-flush.max-rows' = '0' -- 实时写入
);-- TODO 往 mysql中写入数据(相同key的数据写入后,会做upsert操作)
insert into mysql_sink_table
SELECT * FROM (VALUES(5,'A Dream in Red Mansions','y', 3.0,1)
, (6,'Journey to the West','y', 3.0,1)
, (7,'Water Margin','y', 3.0,1)
) AS books (id, title,author,price,qty);

append 模式:

-- TODO 创建FLinkSQL表(sink mysql table)
drop table mysql_sink_table;
CREATE TABLE mysql_sink_table (`id` INT,`title` STRING,`author` STRING,`price` DOUBLE,`qty` INT
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://worker01:3306/flink?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8','username' = 'root','password' = 'xxx','table-name' = 'books','sink.buffer-flush.max-rows' = '0' -- 实时写入
);-- TODO 往 mysql中写入数据(相同key的数据写入后,会做操作)
insert into mysql_sink_table
SELECT * FROM (VALUES(5,'A Dream in Red Mansions','y', 3.0,1)
, (6,'Journey to the West','y', 3.0,1)
, (7,'Water Margin','y', 3.0,1)
) AS books (id, title,author,price,qty);

注意:使用 append模式时,如果MySQL底表中存在主键或唯一性约束时,INSERT 插入可能会失败

insert into 失败:

相关文章:

2.3 如何使用FlinkSQL读取写入到JDBC(MySQL)

1、JDBC SQL 连接器 FlinkSQL允许使用 JDBC连接器&#xff0c;向任意类型的关系型数据库读取或者写入数据 添加Maven依赖 <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1…...

Flink日志收集到数据库/kafka

引言 我们做项目过程中发现flink日志不同模式启动&#xff0c;存放位置不同&#xff0c;查找任务日志很不方便&#xff0c;具体问题如下&#xff1a; 原始flink的日志配置文件log4j-cli.properties appender.file.append false&#xff0c;取消追加&#xff0c;直接覆盖掉上…...

Go项目踩坑:go get下载超时,goFrame框架下的go项目里将vue项目的dist同步打包发布,go项目打包并压缩

Go项目踩坑&#xff1a;go get下载超时&#xff0c;goFrame框架下的go项目里将vue项目的dist同步打包发布&#xff0c;go项目打包并压缩 go get下载超时goFrame打包静态资源vue项目打包gf pack生成go文件 静态资源使用打包发布go项目交叉编译&#xff0c;省略一些不必要的信息通…...

DataCon【签到题】挖矿流量检测

【签到题】挖矿流量检测 文章目录 答案【多选】1. 个人电脑中了挖矿病毒通常有以下哪些表现&#xff1f;【单选】2. 在典型挖矿场景中&#xff0c;矿工和矿池之间目前最常用的通信协议是哪一个&#xff1f;【单选】3. 目前的虚拟货币挖矿场景中&#xff0c;最常采用的是哪种共识…...

Vivado详细使用教程 | LED闪烁示例

文章目录 整体流程第一步&#xff1a;新建工程第二步&#xff1a;设计输入第三步&#xff1a;功能仿真第四步&#xff1a;分析与综合第五步&#xff1a;约束输入第六步&#xff1a;设计实现第七步&#xff1a;下载比特流 整体流程 打开软甲------>新建工程------->设计输…...

一些经典的神经网络(第17天)

1. 经典神经网络LeNet LeNet是早期成功的神经网络&#xff1b; 先使用卷积层来学习图片空间信息 然后使用全连接层来转到到类别空间 【通过在卷积层后加入激活函数&#xff0c;可以引入非线性、增加模型的表达能力、增强稀疏性和解决梯度消失等问题&#xff0c;从而提高卷积…...

Hadoop-HA-Hive-on-Spark 4台虚拟机安装配置文件

Hadoop-HA-Hive-on-Spark 4台虚拟机安装配置文件 版本号步骤hadoopcore-site.xmlhdfs-site.xmlmapred-site.xmlslavesworkersyarn-site.xml hivehive-site.xmlspark-defaults.conf sparkhdfs-site.xmlhive-site.xmlslavesyarn-site.xmlspark-env.sh 版本号 apache-hive-3.1.3-…...

Hutool工具类参考文章

Hutool工具类参考文章 日期&#xff1a; 身份证&#xff1a;...

【 Python ModuleNotFoundError: No module named ‘xxx‘可能的解决方案大全】

Python ModuleNotFoundError: No module named ‘xxx‘可能的解决方案大全 本文主要介绍了Python ModuleNotFoundError: No module named ‘xxx‘可能的解决方案大全&#xff0c;文中通过示例代码介绍的非常详细&#xff0c;对大家的学习或者工作具有一定的参考学习价值&#x…...

eclipse 配置selenium环境

eclipse环境 安装selenium的步骤 配置谷歌浏览器驱动 Selenium安装-如何在Java中安装Selenium chrome驱动下载 eclipse 启动配置java_home&#xff1a; 在eclipse.ini文件中加上一行 1 配置java环境&#xff0c;网上有很多教程 2 下载eclipse&#xff0c;网上有很多教程 ps&…...

数据挖掘(6)聚类分析

一、什么是聚类分析 1.1概述 无指导的&#xff0c;数据集中类别未知类的特征&#xff1a; 类不是事先给定的&#xff0c;而是根据数据的相似性、距离划分的聚类的数目和结构都没有事先假定。挖掘有价值的客户: 找到客户的黄金客户ATM的安装位置 1.2区别 二、距离和相似系数 …...

在启智平台上安装anconda

安装Anaconda3-5.0.1-Linux-x86_64.sh python版本是3.6 在下面的网站上找到要下载的anaconda版本&#xff0c;把对应的.sh文件下载下来 https://mirrors.tuna.tsinghua.edu.cn/anaconda/archive/ 把sh文件压缩成.zip文件&#xff0c;拖到启智平台的调试页面 上传到平台上 un…...

棒球省队建设实施办法·棒球1号位

棒球省队建设实施办法 1. 建设目标与原则 提升棒球省队整体竞技水平 为了提升棒球省队整体竞技水平&#xff0c;我们需要采取一系列有效的措施。 首先&#xff0c;我们应该加强对棒球运动的投入和关注。各级政府和相关部门应加大对棒球运动的经费投入&#xff0c;提高球队的…...

架构案例2017(五十二)

第5题 阅读以下关于Web系统架构设计的叙述&#xff0c;在答题纸上回答问题1至问题3.【说明】某电子商务企业因发展良好&#xff0c;客户量逐步增大&#xff0c;企业业务不断扩充&#xff0c;导致其原有的B2C商品交易平台己不能满足现有业务需求。因此&#xff0c;该企业委托某…...

给四个点坐标计算两条直线的交点

文章目录 1 chatgpt42、文心一言3、星火4、Bard总结 我使用Chatgpt4和文心一言、科大讯飞星火、google Bard 对该问题进行搜索&#xff0c;分别给出答案。先说结论&#xff0c;是chatgpt4和文心一言给对了答案&#xff0c; 另外两个部分正确。 问题是&#xff1a;python 给定四…...

从入门到进阶 之 ElasticSearch SpringData 继承篇

&#x1f339; 以上分享 从入门到进阶 之 ElasticSearch SpringData 继承篇&#xff0c;如有问题请指教写。&#x1f339;&#x1f339; 如你对技术也感兴趣&#xff0c;欢迎交流。&#x1f339;&#x1f339;&#x1f339; 如有需要&#xff0c;请&#x1f44d;点赞&#x1f…...

中文编程开发语言工具编程案例:计时计费管理系统软件连接灯控器编程案例

中文编程开发语言工具编程案例&#xff1a;计时计费管理系统软件连接灯控器编程案例 中文编程开发语言工具编程案例&#xff1a;计时计费管理系统软件连接灯控器编程案例 中文编程系统化教程&#xff0c;不需英语基础。学习链接 https://edu.csdn.net/course/detail/39036...

YOLOv7改进:动态蛇形卷积(Dynamic Snake Convolution),增强细微特征对小目标友好,实现涨点 | ICCV2023

💡💡💡本文独家改进:动态蛇形卷积(Dynamic Snake Convolution),增强细长微弱的局部结构特征与复杂多变的全局形态特征,对小目标检测很适用 Dynamic Snake Convolution | 亲测在多个数据集能够实现大幅涨点 收录: YOLOv7高阶自研专栏介绍: http://t.csdnimg.…...

从文心大模型4.0与FuncGPT:用AI为开发者打开新视界

今天&#xff0c;在百度2023世界大会上&#xff0c;文心大模型4.0正式发布&#xff0c;而在大洋的彼岸&#xff0c;因为大模型代表ChatGPT之类的AI编码工具来势汹汹&#xff0c;作为全世界每个开发者最爱的代码辅助网站&#xff0c;Stack Overflow的CEO Prashanth Chandrasekar…...

Nginx集群负载均衡配置完整流程

今天&#xff0c;良哥带你来做一个nginx集群的负载均衡配置的完整流程。 一、准备工作 本次搭建的操作系统环境是win11&#xff0c;linux可配置类同。 1&#xff09;首先&#xff0c;下载nginx。 下载地址为&#xff1a;http://nginx.org/en/download.html 良哥下载的是&am…...

变量 varablie 声明- Rust 变量 let mut 声明与 C/C++ 变量声明对比分析

一、变量声明设计&#xff1a;let 与 mut 的哲学解析 Rust 采用 let 声明变量并通过 mut 显式标记可变性&#xff0c;这种设计体现了语言的核心哲学。以下是深度解析&#xff1a; 1.1 设计理念剖析 安全优先原则&#xff1a;默认不可变强制开发者明确声明意图 let x 5; …...

【人工智能】神经网络的优化器optimizer(二):Adagrad自适应学习率优化器

一.自适应梯度算法Adagrad概述 Adagrad&#xff08;Adaptive Gradient Algorithm&#xff09;是一种自适应学习率的优化算法&#xff0c;由Duchi等人在2011年提出。其核心思想是针对不同参数自动调整学习率&#xff0c;适合处理稀疏数据和不同参数梯度差异较大的场景。Adagrad通…...

【位运算】消失的两个数字(hard)

消失的两个数字&#xff08;hard&#xff09; 题⽬描述&#xff1a;解法&#xff08;位运算&#xff09;&#xff1a;Java 算法代码&#xff1a;更简便代码 题⽬链接&#xff1a;⾯试题 17.19. 消失的两个数字 题⽬描述&#xff1a; 给定⼀个数组&#xff0c;包含从 1 到 N 所有…...

使用分级同态加密防御梯度泄漏

抽象 联邦学习 &#xff08;FL&#xff09; 支持跨分布式客户端进行协作模型训练&#xff0c;而无需共享原始数据&#xff0c;这使其成为在互联和自动驾驶汽车 &#xff08;CAV&#xff09; 等领域保护隐私的机器学习的一种很有前途的方法。然而&#xff0c;最近的研究表明&…...

条件运算符

C中的三目运算符&#xff08;也称条件运算符&#xff0c;英文&#xff1a;ternary operator&#xff09;是一种简洁的条件选择语句&#xff0c;语法如下&#xff1a; 条件表达式 ? 表达式1 : 表达式2• 如果“条件表达式”为true&#xff0c;则整个表达式的结果为“表达式1”…...

对WWDC 2025 Keynote 内容的预测

借助我们以往对苹果公司发展路径的深入研究经验&#xff0c;以及大语言模型的分析能力&#xff0c;我们系统梳理了多年来苹果 WWDC 主题演讲的规律。在 WWDC 2025 即将揭幕之际&#xff0c;我们让 ChatGPT 对今年的 Keynote 内容进行了一个初步预测&#xff0c;聊作存档。等到明…...

2025 后端自学UNIAPP【项目实战:旅游项目】6、我的收藏页面

代码框架视图 1、先添加一个获取收藏景点的列表请求 【在文件my_api.js文件中添加】 // 引入公共的请求封装 import http from ./my_http.js// 登录接口&#xff08;适配服务端返回 Token&#xff09; export const login async (code, avatar) > {const res await http…...

反射获取方法和属性

Java反射获取方法 在Java中&#xff0c;反射&#xff08;Reflection&#xff09;是一种强大的机制&#xff0c;允许程序在运行时访问和操作类的内部属性和方法。通过反射&#xff0c;可以动态地创建对象、调用方法、改变属性值&#xff0c;这在很多Java框架中如Spring和Hiberna…...

【碎碎念】宝可梦 Mesh GO : 基于MESH网络的口袋妖怪 宝可梦GO游戏自组网系统

目录 游戏说明《宝可梦 Mesh GO》 —— 局域宝可梦探索Pokmon GO 类游戏核心理念应用场景Mesh 特性 宝可梦玩法融合设计游戏构想要素1. 地图探索&#xff08;基于物理空间 广播范围&#xff09;2. 野生宝可梦生成与广播3. 对战系统4. 道具与通信5. 延伸玩法 安全性设计 技术选…...

智能分布式爬虫的数据处理流水线优化:基于深度强化学习的数据质量控制

在数字化浪潮席卷全球的今天&#xff0c;数据已成为企业和研究机构的核心资产。智能分布式爬虫作为高效的数据采集工具&#xff0c;在大规模数据获取中发挥着关键作用。然而&#xff0c;传统的数据处理流水线在面对复杂多变的网络环境和海量异构数据时&#xff0c;常出现数据质…...