Flink之JDBC Sink
这里介绍一下Flink Sink中jdbc sink的使用方法,以mysql为例,这里代码分为两种,事务和非事务
- 非事务代码
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;import java.sql.PreparedStatement;
import java.sql.SQLException;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/2* @Description: 测试**/
public class FlinkJdbcSink {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 这里使用的是自定义数据源CustomizeBean(name,age,gender,hobbit),为了方便测试,换成任何数据源都可,只要和最后的要写入的表结构匹配即可DataStreamSource<CustomizeBean> customizeSource = env.addSource(new CustomizeSource());// 构建jdbc sinkSinkFunction<CustomizeBean> jdbcSink = JdbcSink.sink("insert into t_user(`name`, `age`, `gender`, `hobbit`) values(?, ?, ?, ?)", // 数据插入sql语句new JdbcStatementBuilder<CustomizeBean>() {@Overridepublic void accept(PreparedStatement pStmt, CustomizeBean customizeBean) throws SQLException {pStmt.setString(1, customizeBean.getName());pStmt.setInt(2, customizeBean.getAge());pStmt.setString(3, customizeBean.getGender());pStmt.setString(4, customizeBean.getHobbit());}}, // 字段映射配置,这部分就和常规的java api差不多了JdbcExecutionOptions.builder().withBatchSize(10) // 批次大小,条数.withBatchIntervalMs(5000) // 批次最大等待时间.withMaxRetries(1) // 重复次数.build(), // 写入参数配置new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName("com.mysql.jdbc.Driver").withUrl("jdbc:mysql://lx01:3306/test_db?useSSL=false").withUsername("root").withPassword("password").build() // jdbc信息配置);// 添加jdbc sinkcustomizeSource.addSink(jdbcSink);env.execute();}
}
- 事务代码
import com.mysql.cj.jdbc.MysqlXADataSource;
import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.function.SerializableSupplier;import javax.sql.XADataSource;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/2* @Description: 测试**/
public class FlinkJdbcSink {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 这里使用的是自定义数据源CustomizeBean(name,age,gender,hobbit),为了方便测试,换成任何数据源都可,只要和最后的要写入的表结构匹配即可DataStreamSource<CustomizeBean> customizeSource = env.addSource(new CustomizeSource());// 每20秒作为checkpoint的一个周期env.enableCheckpointing(20000);// 两次checkpoint间隔最少是10秒env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);// 程序取消或者停止时不删除checkpointenv.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// checkpoint必须在60秒结束,否则将丢弃env.getCheckpointConfig().setCheckpointTimeout(60000);// 同一时间只能有一个checkpointenv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 设置EXACTLY_ONCE语义,默认就是这个env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// checkpoint存储位置env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");// 构建ExactlyOne sink,要注意使用exactlyOnceSink需要开启checkpointSinkFunction<CustomizeBean> exactlyOneJdbcSink = JdbcSink.exactlyOnceSink("insert into t_user(`name`, `age`, `gender`, `hobbit`) values(?, ?, ?, ?)", // 数据插入sql语句(JdbcStatementBuilder<CustomizeBean>) (pStmt, customizeBean) -> {pStmt.setString(1, customizeBean.getName());pStmt.setInt(2, customizeBean.getAge());pStmt.setString(3, customizeBean.getGender());pStmt.setString(4, customizeBean.getHobbit());}, // 字段映射配置,这部分就和常规的java api差不多了JdbcExecutionOptions.builder().withMaxRetries(0) // 设置重复次数.withBatchSize(25) // 设置批次大小,数据条数.withBatchIntervalMs(1000) // 批次最大等待时间.build(),JdbcExactlyOnceOptions.builder()// 这里使用的mysql,所以要将这个参数设置为true,因为mysql不支持一个连接上开启多个事务,oracle是支持的.withTransactionPerConnection(true).build(),(SerializableSupplier<XADataSource>) () -> {// XADataSource 就是JDBC连接,不同的是它是支持分布式事务的连接MysqlXADataSource mysqlXADataSource = new MysqlXADataSource();mysqlXADataSource.setUrl("jdbc:mysql://lx01:3306/test_db?useSSL=false"); // 设置urlmysqlXADataSource.setUser("root"); // 设置用户mysqlXADataSource.setPassword("password"); // 设置密码return mysqlXADataSource;});// 添加jdbc sinkcustomizeSource.addSink(exactlyOneJdbcSink);env.execute();}
}
- pom依赖
<!-- 在原有的依赖中加入下面两个内容 --><!-- JDBC connector --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>${flink.version}</version></dependency><!-- mysql驱动 --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.28</version></dependency>
- 结果

jdbc sink的具体使用方式大概就这些内容,还是比较简单的,具体应用还要结合实际业务场景.
相关文章:
Flink之JDBC Sink
这里介绍一下Flink Sink中jdbc sink的使用方法,以mysql为例,这里代码分为两种,事务和非事务 非事务代码 import org.apache.flink.connector.jdbc.JdbcConnectionOptions; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; import org.apache.flink.connector.…...
lifecycleScope Unresolved reference
描述 导入了lifecycle.lifecycleScope,但是在activity中使用lifecycleScope报错出现Unresolved reference找不到引用。 导包 import androidx.lifecycle.lifecycleScope使用 lifecycleScope.launch(Dispatchers.IO) {...}错误 方案 代码中的activity继承Activ…...
P5960 【模板】差分约束算法
【模板】差分约束算法 题目描述 给出一组包含 m m m 个不等式,有 n n n 个未知数的形如: { x c 1 − x c 1 ′ ≤ y 1 x c 2 − x c 2 ′ ≤ y 2 ⋯ x c m − x c m ′ ≤ y m \begin{cases} x_{c_1}-x_{c_1}\leq y_1 \\x_{c_2}-x_{c_2} \leq y_2 \\…...
VSCode---通过ctrl+鼠标滚动改变字体大小
打开设置然后在右边输editor.mouseWheelZoo勾选即可实现鼠标滚动改变字体大小 4.这种设置的字体大小是固定的...
视频监控汇聚平台EasyCVR视频分享页面WebRTC流地址播放不了是什么原因?
开源EasyDarwin视频监控TSINGSEE青犀视频平台EasyCVR能在复杂的网络环境中,将分散的各类视频资源进行统一汇聚、整合、集中管理,在视频监控播放上,TSINGSEE青犀视频安防监控汇聚平台可支持1、4、9、16个画面窗口播放,可同时播放多…...
Libevent开源库的介绍与应用
libeventhttps://libevent.org/ 一、初识 1、libevent介绍 Libevent 是一个用C语言编写的、轻量级的开源高性能事件通知库,主要有以下几个亮点:事件驱动( event-driven),高性能;轻量级,专注于网络ÿ…...
【LNMP】LNMP
LNMP:是目前成熟的企业网站的应用模式之一,指的是一套协同工作的系统和相关软件;能够提供静态页面服务,也可以提供动态web服务 L Linux系统,操作系统N Nginx网站服务,前端,提供前端的静态…...
uniapp自定义头部导航栏
有时我们需要一些特殊的头部导航栏页面,取消传统的导航栏,来增加页面的美观度。 下面我就教大家如何配置: 一、效果图 二、实现 首先在uniapp中打开pages.json配置文件,在单个路由配置style里面设置导航栏样式nav…...
Django实现音乐网站 ⑹
使用Python Django框架制作一个音乐网站, 本篇主要是在添加编辑过程中对后台歌手功能优化及表模型名称修改、模型继承内容。 目录 表模型名称修改 模型继承 创建抽象基类 其他模型继承 更新表结构 歌手新增、编辑优化 表字段名称修改 隐藏单曲数和专辑数 姓…...
dubbo-helloworld示例
1、工程架构 2、创建模块 (1)创建父工程,引入公共依赖 pom.xml依赖 <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></depende…...
电脑ADB连接手机的方式通过网络无法adb连接手机的问题(已解决)
首先电脑要下载adb工具,将压缩包解压到C盘:https://download.csdn.net/download/qq_43445867/87975072 1、使用USB线连接 打开手机USB调试;PC端安装手机USB驱动。 1.打开DOS命令窗口,进入adb文件夹,输入adb.exe devices回车列出设…...
79 | Python数据分析篇 —— Pandas中groupby聚合操作和透视表基础
Pandas是Python中最常用的数据处理库之一,它提供了高效的数据结构和数据分析工具。在进行数据分析和机器学习等领域的工作时,Pandas是必不可少的库之一。本文将介绍Pandas中的groupby聚合操作和透视表,包括groupby操作、透视表的基础知识、练习题和答案。 文章目录 Pandas中…...
iOS 搭建组件化私有库
一、创建私有库索引 步骤1是在没有索引库的情况下或者是新增索引的时候才需要用到(创建基础组件库) 首先在码云上建立一个私有库索引,起名为SYComponentSpec 二、本地添加私有库索引 添加私有库索引 pod repo add SYComponentSpec https:/…...
迅为全国产龙芯3A5000电脑运行统信UOS、银河麒麟、loongnix系统
iTOP-3A5000开发板采用全国产龙芯3A5000处理器,基于龙芯自主指令系统 (LoongArch) 的LA464微结构,并进一步提升频率,降低功耗,优化性能。在与龙芯3A4000处理器保持引脚兼容的基础上,频率提升至2.5GHZ,功耗降…...
枫叶时代:打造中国特色的传统文化IP
近年来,取材于传统文化的影视作品在文化产业市场受到前所未有的关注。作为一种兼具辨识度、影响力和流量变现能力的文化符号,影视IP既是文化产业的一个重要环节,也是国家文化软实力的直接体现。优秀的影视IP可以超越文字、语言、民族的障碍&a…...
一条sql语句在mysql中如何执行(查询+更新)
文章目录 一 MySQL 基础架构1.1 MySQL 基本架构1.2 Server 层基本组件介绍1) 连接器2) 查询缓存(MySQL 8.0 版本后移除)3) 分析器4) 优化器5) 执行器 二 语句分析2.1 查询语句2.2 更新语句为什么要用两个日志模块,用一个日志模块不行吗?为什么必须有“两阶段提交”…...
漫画 | TCP/IP之大明邮差
后记: 1973年,卡恩与瑟夫开发出了网络中最核心的两个协议:TCP协议和IP协议,随后为了验证两个协议的可用性,他们做了一个实验,在多个异构网络中进行数据传输,数据包在经过近10万公里的旅程后到达…...
Zookeeper和Nacos的区别
Zookeeper和Nacos的区别 在分布式系统中,注册中心充当着重要角色,是服务发现、客户端负载均衡中不可缺少的一员。注册中心除了能够实现基本的功能外,他的稳定性、可用性和健壮性对整个分布式系统的流畅运行影响重大。zookeeper和nacos可能是…...
O3DE的Pass
Pass介绍 Pass是具有输入和输出的渲染过程。 在最终渲染帧中看到的每个细节都是通过一系列Pass(前一个Pass的输出是下一个Pass的输入)计算出来的。Pass可以生成图像(作为纹理、缓冲区或渲染目标)。每个图像都包含关于场景的特定…...
如何建立含有逻辑删除字段的唯一索引
业务场景 在实际工作当中,遇到一个场景,就是在用户注册时,名字要全局唯一,当然,我们是可以对用户进行删除的,你会怎么去做? 分析 一般来说,我们可以在用户注册请求时,…...
避开CASA模型NPP估算的那些坑:我的IDL代码调试与参数优化心得
避开CASA模型NPP估算的那些坑:我的IDL代码调试与参数优化心得 第一次用CASA模型估算NPP时,我对着屏幕上的异常结果发呆了半小时——明明按照教程一步步操作,为什么输出的NPP值会出现大面积负值?后来才发现,温度胁迫因子…...
三星固件下载终极指南:Bifrost跨平台工具完整使用手册
三星固件下载终极指南:Bifrost跨平台工具完整使用手册 【免费下载链接】Bifrost Cross-platform tool for downloading Samsung mobile device firmware. 项目地址: https://gitcode.com/gh_mirrors/sa/Bifrost 还在为三星设备找不到官方固件而烦恼吗&#x…...
WCH RISC-V MCU开发:在MounRiver Studio里一键切换GCC8和GCC12工具链(附内存占用对比)
WCH RISC-V MCU开发实战:MounRiver Studio工具链切换与性能优化指南 对于嵌入式开发者而言,选择合适的编译器工具链往往能在资源受限的MCU环境中带来显著性能提升。WCH基于RISC-V架构的微控制器凭借其高性价比和丰富外设资源,正逐渐成为物联网…...
Spring Validation嵌套校验踩坑实录:用@Valid搞定订单里商品列表的深度验证
Spring Validation嵌套校验实战:用Valid解决订单商品列表的深度验证难题 电商系统中订单创建接口的复杂性往往体现在数据结构的嵌套层级上。一个典型的订单对象不仅包含基础订单信息,还会内嵌商品列表、优惠券、收货地址等多个子对象。当后端接收到这样的…...
手把手教你搭建低成本雷达测试环境:从暗室搭建到模拟器参数设置(基于国产设备实战)
低成本雷达测试环境搭建实战:国产设备方案与操作指南 在车载毫米波雷达研发领域,测试环节往往占据着项目预算的显著部分。传统方案依赖进口设备和专业暗室,动辄数百万元的投入让许多中小型团队望而却步。本文将揭示一个行业内的真实情况&…...
保姆级教程:在Qt 6.5桌面应用中集成WebRTC实现一对一视频通话(附完整源码)
Qt 6.5与WebRTC深度整合实战:构建企业级视频通话解决方案 1. 环境配置与依赖管理 在开始Qt 6.5与WebRTC的集成之旅前,我们需要搭建一个稳定的开发环境。不同于普通的Qt项目,这种集成对工具链和系统配置有特殊要求。 推荐开发环境配置&…...
2026年制造业员工入转调离全流程自动化趋势?——从“系统孤岛”到“Agent全闭环”的效能革命
2026年,全球制造业正处于从“设备自动化”向“组织智能化”跨越的关键拐点。 随着人口红利消退与用工结构性矛盾加剧,工厂对于人力资源的精准配置已不仅是行政命题,而是直接影响产线柔性与交付周期的核心生产力命题。 传统的HR管理模式在面对…...
别再只用默认模型了!手把手教你用SnowNLP训练专属情感分析模型(附完整代码)
突破SnowNLP默认模型局限:打造高精度领域情感分析系统的实战指南 从"水土不服"到精准预测:为什么你需要自定义情感模型 去年夏天,我们的产品团队在分析用户反馈时遇到了一个诡异现象:明明用户留言中充斥着"卡顿严重…...
为什么顶尖教研团队已弃用传统搜索引擎?Perplexity教育搜索的3个颠覆性能力,今天必须掌握
更多请点击: https://intelliparadigm.com 第一章:为什么顶尖教研团队已弃用传统搜索引擎? 当清华大学智能教育实验室在2023年构建AI辅助备课系统时,其技术白皮书明确指出:“Google Scholar 和通用搜索引擎的召回率在…...
【免费下载】 MySQL Connector/Java 8.0.29 驱动包
MySQL Connector/Java 8.0.29 驱动包 【下载地址】MySQLConnectorJava8.0.29驱动包 本仓库提供了一个用于Java应用程序连接MySQL数据库的JDBC驱动包。具体文件为 mysql-connector-java-8.0.29.jar,适用于MySQL数据库版本8.0.29。 项目地址: https://gitcode.com/o…...
