Flink 中 JDBC Connector 使用详解
1. 背景
在实时计算或离线任务中,往往需要与关系型数据库交互,例如 MySQL、PostgreSQL 等。Apache Flink 提供了 JDBC Connector,可以方便地将流式数据写入或读取数据库。
本文将介绍 Flink JDBC Connector 的基础用法、配置方法以及注意事项,帮助开发者更好地集成数据库操作。
2. JDBC Connector 的基础概念
JDBC Connector 是 Flink 官方提供的一个用于连接关系型数据库的工具包,支持:
- Source:从数据库读取数据。
- Sink:将数据写入数据库。
使用 JDBC Connector 可以实现对数据库的实时写入,也可以用作批量操作的工具。
3. Maven 依赖
在项目中添加 Flink JDBC 依赖:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.12</artifactId><version>1.17.0</version> <!-- 根据实际使用的 Flink 版本调整 -->
</dependency>
如果使用 MySQL 数据库,还需添加 MySQL 驱动:
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version> <!-- MySQL 驱动版本 -->
</dependency>
4. JDBC Connector 的使用
4.1 写入数据库(Sink)
以下是一个将流式数据写入 MySQL 的示例:
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.jdbc.JdbcSink;public class JdbcSinkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 模拟输入数据env.fromElements(Tuple2.of(1, "Alice"),Tuple2.of(2, "Bob"),Tuple2.of(3, "Charlie")).addSink(JdbcSink.sink("INSERT INTO users (id, name) VALUES (?, ?)", // SQL 语句(ps, t) -> {ps.setInt(1, t.f0); // 设置第一个参数为 IDps.setString(2, t.f1); // 设置第二个参数为 Name},JdbcSink.DefaultJdbcExecutionOptions.builder().withBatchSize(100) // 批量写入大小.build(),() -> JdbcSink.defaultJdbcConnectionProvider("jdbc:mysql://localhost:3306/testdb", // 数据库 URL"root", // 用户名"password" // 密码)));env.execute("Flink JDBC Sink Example");}
}
关键点解析
- SQL 语句:支持动态参数
?占位符,适合批量插入。 - 参数绑定:通过 Lambda 表达式绑定输入数据与 SQL 参数。
- 批量写入:通过
JdbcExecutionOptions配置批量写入策略。
4.2 从数据库读取数据(Source)
以下是一个从 MySQL 读取数据并打印的示例:
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.JdbcInputFormat;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;public class JdbcSourceExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Tuple2<Integer, String>> sourceStream = env.createInput(JdbcInputFormat.buildJdbcInputFormat().setDrivername("com.mysql.cj.jdbc.Driver") // JDBC 驱动.setDBUrl("jdbc:mysql://localhost:3306/testdb") // 数据库 URL.setUsername("root") // 用户名.setPassword("password") // 密码.setQuery("SELECT id, name FROM users") // SQL 查询.setRowTypeInfo(Types.TUPLE(Types.INT, Types.STRING)) // 结果类型.finish());sourceStream.print();env.execute("Flink JDBC Source Example");}
}
关键点解析
- SQL 查询:需要提供完整的查询语句。
- 结果类型:通过
RowTypeInfo显式定义数据库返回的数据结构。
5. JDBC Connector 的配置选项
5.1 批量写入配置
通过 JdbcExecutionOptions 可调整写入策略:
withBatchSize(int):设置批量写入大小(默认为 500)。withBatchIntervalMs(long):设置批量写入的时间间隔。withMaxRetries(int):设置写入失败后的最大重试次数。
5.2 数据库连接池
Flink JDBC Connector 默认使用单个连接执行操作。对于高并发需求,可以结合 HikariCP 等连接池框架优化性能。
6. 注意事项
-
事务支持:
- 默认情况下,JDBC Sink 使用批量提交,未显式开启事务。如果需要事务一致性,可以通过 JDBC 驱动自行管理事务。
-
数据库性能瓶颈:
- 数据库可能成为瓶颈,建议使用批量写入和合适的索引优化性能。
- 高写入场景可考虑切换到 Kafka、HBase 等专为实时写入设计的存储系统。
-
错误处理:
- 可通过
withMaxRetries设置重试次数。 - 对于未能成功写入的数据,可考虑使用侧输出流保存以供后续处理。
- 可通过
-
分布式读取:
- 默认情况下,Flink JDBC Source 在单线程上运行,性能可能有限。可以使用分片或其他工具提升读取性能。
7. 总结
Flink JDBC Connector 是一个简单而高效的工具,适用于实时计算场景下与关系型数据库的交互。无论是数据写入还是读取,都可以通过简单配置快速实现。但对于高并发和大规模数据场景,需要根据业务需求调整策略。
相关文章:
Flink 中 JDBC Connector 使用详解
1. 背景 在实时计算或离线任务中,往往需要与关系型数据库交互,例如 MySQL、PostgreSQL 等。Apache Flink 提供了 JDBC Connector,可以方便地将流式数据写入或读取数据库。 本文将介绍 Flink JDBC Connector 的基础用法、配置方法以及注意事…...
【Linux打怪升级记 | 报错02】-bash: 警告:setlocale: LC_TIME: 无法改变区域选项 (zh_CN.UTF-8)
🗺️博客地图 📍1、报错发现 📍2、原因分析 📍3、解决办法 📍4、测试结果 1、报错发现 装好了CentOS操作系统,使用ssh远程登陆CentOS,出现如下告警信息: bash: 警告:setlocale…...
未来已来?AI技术革新改变我们的生活
在21世纪的今天,人工智能(AI)不再是一个遥远的概念,而是逐渐渗透到我们生活的方方面面。从智能家居到自动驾驶汽车,从个性化推荐系统到医疗诊断辅助,AI技术正在以惊人的速度发展,并深刻地影响着…...
【Linux】进程的生命之旅——诞生、消逝与守候(fork/exit/wait)
🎬 个人主页:谁在夜里看海. 📖 个人专栏:《C系列》《Linux系列》《算法系列》 ⛰️ 一念既出,万山无阻 目录 📖一、进程创建 1.fork函数 📚高层封装特性 📚fork返回值 2.写时拷…...
使用vcpkg自动链接tinyxml2时莫名链接其他库(例如boost)
使用vcpkg自动链接tinyxml2时莫名链接其他库(例如boost) vcpkg的自动链接功能非常方便,但在某些情况下会出现过度链接的问题。 链接错误症状 以tinyxml2为例,程序中调用tinyxml2的函数后,若vcpkg中同时存在opencv和…...
【去毛刺】OpenCV图像处理基础:腐蚀与膨胀操作入门
在数字图像处理中,形态学操作是一种常用的技术,用于提取图像中的特定形状或特征。其中,腐蚀(Erosion)和膨胀(Dilation)是两种基本的形态学运算。本文将通过一个简单的例子来演示如何使用Python中…...
道可云人工智能元宇宙每日资讯|第三届京西地区发展论坛成功召开
道可云元宇宙每日简报(2024年11月27日)讯,今日元宇宙新鲜事有: 工信部等十二部门印发《5G规模化应用“扬帆”行动升级方案》 11月25日,工业和信息化部等十二部门印发《5G规模化应用“扬帆”行动升级方案》。《方案》…...
若依框架部署在网站一个子目录下(/admin)问题(
部署在子目录下首先修改vue.config.js文件: 问题一:登陆之后跳转到了404页面问题,解决办法如下: src/router/index.js 把404页面直接变成了首页(大佬有啥优雅的解决办法求告知) 问题二:退出登录…...
【ue5】UE5运行时下载视频/UE5 runtime download video(MP4)
插件还是老朋友。 节点的content type要打对。 (参照表:MIME 类型(MIME Type)完整对照表 - 免费在线工具) 结果展示:...
对比C++,Rust在内存安全上做的努力
简介 近年来,越来越多的组织表示,如果新项目在技术选型时需要使用系统级开发语言,那么不要选择使用C/C这种内存不安全的系统语言,推荐使用内存安全的Rust作为替代。 谷歌也声称,Android 的安全漏洞,从 20…...
如何利用 Qt 的模块化架构组织大型项目
目录 1. 大型项目的架构设计 1.1 分层架构 1.2 事件驱动与异步架构 2. 模块划分与职责分离 2.1 功能模块划分 2.2 模块之间的依赖管理 3. 跨平台开发与模块复用 在大型软件项目中,随着代码量的增加和功能的扩展,项目的复杂度会显著提升。没有良好…...
探索Python词云库WordCloud的奥秘
文章目录 探索Python词云库WordCloud的奥秘1. 背景介绍:为何选择WordCloud?2. WordCloud库简介3. 安装WordCloud库4. 简单函数使用方法5. 应用场景示例6. 常见Bug及解决方案7. 总结 探索Python词云库WordCloud的奥秘 1. 背景介绍:为何选择Wo…...
MySQL根据idb文件恢复数据
首先得有对应表的idb文件以及建表语句 1.首先在新数据库建表 CREATE TABLE sys_menu (id bigint(20) NOT NULL,parent_id bigint(20) NULL DEFAULT NULL,name varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,type int(11) NULL DEFAULT …...
hadoop-mapreduce词频统计
一、Map Reduce主要阶段 二、词频统计示例 0.MapReduce 词频统计(Word Count)示例图 1. Input 阶段(输入阶段) 输入数据是一段文本,如下: Hadoop is a big data framework. Hadoop can store vast data. Hadoop processes big …...
精心修炼Java并发编程(JUC)-volatile与synchronized关键字
volatile volatile 是 JVM 提供的 最轻量级的同步机制,中文意思是不稳定的,易变的,用 volatile 修饰变量是为了保证变量在多线程中的可见性,它表达的含义是:告诉编译器,对这个变量的读写,需要基…...
【ROS2】ROS2 与 ROS1 编码方式对比(Python实现)
目录 一、初始化和关闭节点二、发布者三、订阅者四、服务端五、客户端六、参数管理七、日志记录八、生命周期管理 ROS2 在 Python 编程中引入了一些新的概念和 API,这些变化使得代码更加模块化和易于维护。特别是 rclpy 库提供了更丰富的功能和更好的错误处理机制&a…...
ElasticSearch的下载和基本使用(通过apifox)
1.概述 一个开源的高扩展的分布式全文检索引擎,近乎实时的存储,检索数据 2.安装路径 Elasticsearch 7.8.0 | Elastic 安装后启动elasticsearch-7.8.0\bin里的elasticsearch.bat文件, 启动后就可以访问本地的es库http://localhost:9200/ …...
城市轨道交通运营控制指挥中心设计方案
为某城市轨道交通运营控制指挥中心(OCC)的设计提供方案时,我们需要考虑到多个方面的需求,包括系统架构、设备选择、功能实现、数据流与监控、通信管理等。以下是一个综合性的设计方案,涉及系统硬件和软件的选择、布局规划、安全性等方面,以确保指挥中心的高效运作、实时监…...
多目标优化算法:多目标河马优化算法(MOHOA)求解ZDT1、ZDT2、ZDT3、ZDT4、ZDT6,提供完整MATLAB代码
一、河马优化算法 河马优化算法(Hippopotamus optimization algorithm,HO)由Amiri等人于2024年提出的一种模拟自然界中河马觅食行为的新型群体智能优化算法。该算法由Mohammad Hussein Amiri等人于2024年2月发表在Nature旗下子刊《Scientifi…...
线程与进程的个人理解
进程(Process): 一个程序在执行时,操作系统为其分配的资源(如内存、CPU 时间等)构成了一个进程。每个进程都有自己的独立的地址空间、堆栈和局部变量,它们之间不共享内存(除非通过特…...
Swift 协议扩展精进之路:解决 CoreData 托管实体子类的类型不匹配问题(下)
概述 在 Swift 开发语言中,各位秃头小码农们可以充分利用语法本身所带来的便利去劈荆斩棘。我们还可以恣意利用泛型、协议关联类型和协议扩展来进一步简化和优化我们复杂的代码需求。 不过,在涉及到多个子类派生于基类进行多态模拟的场景下,…...
线程与协程
1. 线程与协程 1.1. “函数调用级别”的切换、上下文切换 1. 函数调用级别的切换 “函数调用级别的切换”是指:像函数调用/返回一样轻量地完成任务切换。 举例说明: 当你在程序中写一个函数调用: funcA() 然后 funcA 执行完后返回&…...
大语言模型(LLM)中的KV缓存压缩与动态稀疏注意力机制设计
随着大语言模型(LLM)参数规模的增长,推理阶段的内存占用和计算复杂度成为核心挑战。传统注意力机制的计算复杂度随序列长度呈二次方增长,而KV缓存的内存消耗可能高达数十GB(例如Llama2-7B处理100K token时需50GB内存&a…...
基于 TAPD 进行项目管理
起因 自己写了个小工具,仓库用的Github。之前在用markdown进行需求管理,现在随着功能的增加,感觉有点难以管理了,所以用TAPD这个工具进行需求、Bug管理。 操作流程 注册 TAPD,需要提供一个企业名新建一个项目&#…...
android13 app的触摸问题定位分析流程
一、知识点 一般来说,触摸问题都是app层面出问题,我们可以在ViewRootImpl.java添加log的方式定位;如果是touchableRegion的计算问题,就会相对比较麻烦了,需要通过adb shell dumpsys input > input.log指令,且通过打印堆栈的方式,逐步定位问题,并找到修改方案。 问题…...
LCTF液晶可调谐滤波器在多光谱相机捕捉无人机目标检测中的作用
中达瑞和自2005年成立以来,一直在光谱成像领域深度钻研和发展,始终致力于研发高性能、高可靠性的光谱成像相机,为科研院校提供更优的产品和服务。在《低空背景下无人机目标的光谱特征研究及目标检测应用》这篇论文中提到中达瑞和 LCTF 作为多…...
Python 高级应用10:在python 大型项目中 FastAPI 和 Django 的相互配合
无论是python,或者java 的大型项目中,都会涉及到 自身平台微服务之间的相互调用,以及和第三发平台的 接口对接,那在python 中是怎么实现的呢? 在 Python Web 开发中,FastAPI 和 Django 是两个重要但定位不…...
docker 部署redis集群 配置
docker的网络模式 网桥模式每次重启容器都有可能导致容器ip地址变化,需要固定ip的自己自定义网络,这里介绍的是默认网络模式 docker创建容器 docker run --name redis6379 -p 6379:6379 -p 16379:16379 -v /etc/redis/redis6379:/etc/redis -d --r…...
十、【ESP32开发全栈指南: TCP客户端】
一、TCP协议核心特性回顾 TCP与UDP关键差异 特性TCPUDP连接方式面向连接 (三次握手)无连接可靠性可靠传输 (重传/排序/校验)尽力交付数据顺序保证数据按序到达不保证顺序流控制滑动窗口机制无流控制传输效率协议开销大头部开销小适用场景文件传输、网页浏览实时音视频、广播通…...
Flask与Celery 项目应用(shared_task使用)
目录 1. 项目概述主要功能技术栈 2. 项目结构3. 环境设置创建虚拟环境并安装依赖主要依赖 4. 应用配置Flask应用初始化 (__init__.py)Celery应用初始化 (make_celery.py) 5. 定义Celery任务 (tasks.py)任务说明 6. 创建API端点 (views.py)API端点说明 7. 前端界面 (index.html)…...
