当前位置: 首页 > news >正文

Flink之JDBC Sink

这里介绍一下Flink Sink中jdbc sink的使用方法,以mysql为例,这里代码分为两种,事务和非事务

  • 非事务代码
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;import java.sql.PreparedStatement;
import java.sql.SQLException;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/2* @Description: 测试**/
public class FlinkJdbcSink {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 这里使用的是自定义数据源CustomizeBean(name,age,gender,hobbit),为了方便测试,换成任何数据源都可,只要和最后的要写入的表结构匹配即可DataStreamSource<CustomizeBean> customizeSource = env.addSource(new CustomizeSource());// 构建jdbc sinkSinkFunction<CustomizeBean> jdbcSink = JdbcSink.sink("insert into t_user(`name`, `age`, `gender`, `hobbit`) values(?, ?, ?, ?)", // 数据插入sql语句new JdbcStatementBuilder<CustomizeBean>() {@Overridepublic void accept(PreparedStatement pStmt, CustomizeBean customizeBean) throws SQLException {pStmt.setString(1, customizeBean.getName());pStmt.setInt(2, customizeBean.getAge());pStmt.setString(3, customizeBean.getGender());pStmt.setString(4, customizeBean.getHobbit());}}, // 字段映射配置,这部分就和常规的java api差不多了JdbcExecutionOptions.builder().withBatchSize(10) // 批次大小,条数.withBatchIntervalMs(5000) // 批次最大等待时间.withMaxRetries(1) // 重复次数.build(), // 写入参数配置new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName("com.mysql.jdbc.Driver").withUrl("jdbc:mysql://lx01:3306/test_db?useSSL=false").withUsername("root").withPassword("password").build() // jdbc信息配置);// 添加jdbc sinkcustomizeSource.addSink(jdbcSink);env.execute();}
}
  • 事务代码
import com.mysql.cj.jdbc.MysqlXADataSource;
import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.function.SerializableSupplier;import javax.sql.XADataSource;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/2* @Description: 测试**/
public class FlinkJdbcSink {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 这里使用的是自定义数据源CustomizeBean(name,age,gender,hobbit),为了方便测试,换成任何数据源都可,只要和最后的要写入的表结构匹配即可DataStreamSource<CustomizeBean> customizeSource = env.addSource(new CustomizeSource());// 每20秒作为checkpoint的一个周期env.enableCheckpointing(20000);// 两次checkpoint间隔最少是10秒env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);// 程序取消或者停止时不删除checkpointenv.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// checkpoint必须在60秒结束,否则将丢弃env.getCheckpointConfig().setCheckpointTimeout(60000);// 同一时间只能有一个checkpointenv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 设置EXACTLY_ONCE语义,默认就是这个env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// checkpoint存储位置env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");// 构建ExactlyOne sink,要注意使用exactlyOnceSink需要开启checkpointSinkFunction<CustomizeBean> exactlyOneJdbcSink = JdbcSink.exactlyOnceSink("insert into t_user(`name`, `age`, `gender`, `hobbit`) values(?, ?, ?, ?)", // 数据插入sql语句(JdbcStatementBuilder<CustomizeBean>) (pStmt, customizeBean) -> {pStmt.setString(1, customizeBean.getName());pStmt.setInt(2, customizeBean.getAge());pStmt.setString(3, customizeBean.getGender());pStmt.setString(4, customizeBean.getHobbit());}, // 字段映射配置,这部分就和常规的java api差不多了JdbcExecutionOptions.builder().withMaxRetries(0) // 设置重复次数.withBatchSize(25) // 设置批次大小,数据条数.withBatchIntervalMs(1000) // 批次最大等待时间.build(),JdbcExactlyOnceOptions.builder()// 这里使用的mysql,所以要将这个参数设置为true,因为mysql不支持一个连接上开启多个事务,oracle是支持的.withTransactionPerConnection(true).build(),(SerializableSupplier<XADataSource>) () -> {// XADataSource 就是JDBC连接,不同的是它是支持分布式事务的连接MysqlXADataSource mysqlXADataSource = new MysqlXADataSource();mysqlXADataSource.setUrl("jdbc:mysql://lx01:3306/test_db?useSSL=false"); // 设置urlmysqlXADataSource.setUser("root"); // 设置用户mysqlXADataSource.setPassword("password"); // 设置密码return mysqlXADataSource;});// 添加jdbc sinkcustomizeSource.addSink(exactlyOneJdbcSink);env.execute();}
}
  • pom依赖
        <!-- 在原有的依赖中加入下面两个内容 --><!-- JDBC connector --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>${flink.version}</version></dependency><!-- mysql驱动 --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.28</version></dependency>
  • 结果
    在这里插入图片描述
    jdbc sink的具体使用方式大概就这些内容,还是比较简单的,具体应用还要结合实际业务场景.

相关文章:

Flink之JDBC Sink

这里介绍一下Flink Sink中jdbc sink的使用方法,以mysql为例,这里代码分为两种,事务和非事务 非事务代码 import org.apache.flink.connector.jdbc.JdbcConnectionOptions; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; import org.apache.flink.connector.…...

lifecycleScope Unresolved reference

描述 导入了lifecycle.lifecycleScope&#xff0c;但是在activity中使用lifecycleScope报错出现Unresolved reference找不到引用。 导包 import androidx.lifecycle.lifecycleScope使用 lifecycleScope.launch(Dispatchers.IO) {...}错误 方案 代码中的activity继承Activ…...

P5960 【模板】差分约束算法

【模板】差分约束算法 题目描述 给出一组包含 m m m 个不等式&#xff0c;有 n n n 个未知数的形如&#xff1a; { x c 1 − x c 1 ′ ≤ y 1 x c 2 − x c 2 ′ ≤ y 2 ⋯ x c m − x c m ′ ≤ y m \begin{cases} x_{c_1}-x_{c_1}\leq y_1 \\x_{c_2}-x_{c_2} \leq y_2 \\…...

VSCode---通过ctrl+鼠标滚动改变字体大小

打开设置然后在右边输editor.mouseWheelZoo勾选即可实现鼠标滚动改变字体大小 4.这种设置的字体大小是固定的...

视频监控汇聚平台EasyCVR视频分享页面WebRTC流地址播放不了是什么原因?

开源EasyDarwin视频监控TSINGSEE青犀视频平台EasyCVR能在复杂的网络环境中&#xff0c;将分散的各类视频资源进行统一汇聚、整合、集中管理&#xff0c;在视频监控播放上&#xff0c;TSINGSEE青犀视频安防监控汇聚平台可支持1、4、9、16个画面窗口播放&#xff0c;可同时播放多…...

Libevent开源库的介绍与应用

libeventhttps://libevent.org/ 一、初识 1、libevent介绍 Libevent 是一个用C语言编写的、轻量级的开源高性能事件通知库&#xff0c;主要有以下几个亮点&#xff1a;事件驱动&#xff08; event-driven&#xff09;&#xff0c;高性能;轻量级&#xff0c;专注于网络&#xff…...

【LNMP】LNMP

LNMP&#xff1a;是目前成熟的企业网站的应用模式之一&#xff0c;指的是一套协同工作的系统和相关软件&#xff1b;能够提供静态页面服务&#xff0c;也可以提供动态web服务 L Linux系统&#xff0c;操作系统N Nginx网站服务&#xff0c;前端&#xff0c;提供前端的静态…...

uniapp自定义头部导航栏

有时我们需要一些特殊的头部导航栏页面&#xff0c;取消传统的导航栏&#xff0c;来增加页面的美观度。 下面我就教大家如何配置&#xff1a; 一、效果图 二、实现 首先在uniapp中打开pages.json配置文件&#xff0c;在单个路由配置style里面设置导航栏样式​​​​​​nav…...

Django实现音乐网站 ⑹

使用Python Django框架制作一个音乐网站&#xff0c; 本篇主要是在添加编辑过程中对后台歌手功能优化及表模型名称修改、模型继承内容。 目录 表模型名称修改 模型继承 创建抽象基类 其他模型继承 更新表结构 歌手新增、编辑优化 表字段名称修改 隐藏单曲数和专辑数 姓…...

dubbo-helloworld示例

1、工程架构 2、创建模块 &#xff08;1&#xff09;创建父工程,引入公共依赖 pom.xml依赖 <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></depende…...

电脑ADB连接手机的方式通过网络无法adb连接手机的问题(已解决)

首先电脑要下载adb工具&#xff0c;将压缩包解压到C盘&#xff1a;https://download.csdn.net/download/qq_43445867/87975072 1、使用USB线连接 打开手机USB调试&#xff1b;PC端安装手机USB驱动。 1.打开DOS命令窗口&#xff0c;进入adb文件夹,输入adb.exe devices回车列出设…...

79 | Python数据分析篇 —— Pandas中groupby聚合操作和透视表基础

Pandas是Python中最常用的数据处理库之一,它提供了高效的数据结构和数据分析工具。在进行数据分析和机器学习等领域的工作时,Pandas是必不可少的库之一。本文将介绍Pandas中的groupby聚合操作和透视表,包括groupby操作、透视表的基础知识、练习题和答案。 文章目录 Pandas中…...

iOS 搭建组件化私有库

一、创建私有库索引 步骤1是在没有索引库的情况下或者是新增索引的时候才需要用到&#xff08;创建基础组件库&#xff09; 首先在码云上建立一个私有库索引&#xff0c;起名为SYComponentSpec 二、本地添加私有库索引 添加私有库索引 pod repo add SYComponentSpec https:/…...

迅为全国产龙芯3A5000电脑运行统信UOS、银河麒麟、loongnix系统

iTOP-3A5000开发板采用全国产龙芯3A5000处理器&#xff0c;基于龙芯自主指令系统 (LoongArch) 的LA464微结构&#xff0c;并进一步提升频率&#xff0c;降低功耗&#xff0c;优化性能。在与龙芯3A4000处理器保持引脚兼容的基础上&#xff0c;频率提升至2.5GHZ&#xff0c;功耗降…...

枫叶时代:打造中国特色的传统文化IP

近年来&#xff0c;取材于传统文化的影视作品在文化产业市场受到前所未有的关注。作为一种兼具辨识度、影响力和流量变现能力的文化符号&#xff0c;影视IP既是文化产业的一个重要环节&#xff0c;也是国家文化软实力的直接体现。优秀的影视IP可以超越文字、语言、民族的障碍&a…...

一条sql语句在mysql中如何执行(查询+更新)

文章目录 一 MySQL 基础架构1.1 MySQL 基本架构1.2 Server 层基本组件介绍1) 连接器2) 查询缓存(MySQL 8.0 版本后移除)3) 分析器4) 优化器5) 执行器 二 语句分析2.1 查询语句2.2 更新语句为什么要用两个日志模块&#xff0c;用一个日志模块不行吗?为什么必须有“两阶段提交”…...

漫画 | TCP/IP之大明邮差

后记&#xff1a; 1973年&#xff0c;卡恩与瑟夫开发出了网络中最核心的两个协议&#xff1a;TCP协议和IP协议&#xff0c;随后为了验证两个协议的可用性&#xff0c;他们做了一个实验&#xff0c;在多个异构网络中进行数据传输&#xff0c;数据包在经过近10万公里的旅程后到达…...

Zookeeper和Nacos的区别

Zookeeper和Nacos的区别 在分布式系统中&#xff0c;注册中心充当着重要角色&#xff0c;是服务发现、客户端负载均衡中不可缺少的一员。注册中心除了能够实现基本的功能外&#xff0c;他的稳定性、可用性和健壮性对整个分布式系统的流畅运行影响重大。zookeeper和nacos可能是…...

O3DE的Pass

Pass介绍 Pass是具有输入和输出的渲染过程。 在最终渲染帧中看到的每个细节都是通过一系列Pass&#xff08;前一个Pass的输出是下一个Pass的输入&#xff09;计算出来的。Pass可以生成图像&#xff08;作为纹理、缓冲区或渲染目标&#xff09;。每个图像都包含关于场景的特定…...

如何建立含有逻辑删除字段的唯一索引

业务场景 在实际工作当中&#xff0c;遇到一个场景&#xff0c;就是在用户注册时&#xff0c;名字要全局唯一&#xff0c;当然&#xff0c;我们是可以对用户进行删除的&#xff0c;你会怎么去做&#xff1f; 分析 一般来说&#xff0c;我们可以在用户注册请求时&#xff0c…...

关联分析——从购物篮到推荐引擎的算法演进

1. 从购物篮到推荐引擎的关联分析演进 记得我第一次接触关联分析是在2015年&#xff0c;当时在一家零售企业做数据分析。老板扔给我一堆购物小票数据&#xff0c;让我找出"像啤酒和尿布那样的神奇组合"。那时候我才明白&#xff0c;原来数据里藏着这么多有趣的秘密。…...

计算机专业四类毕业生就业全景对比:数据背后的残酷真相与报考抉择

数据来源&#xff1a;麦可思研究院《2025中国本科生就业报告》、教育部《2025年全国普通高校毕业生就业质量年度报告》、工信部《2025网络安全产业人才发展报告》、牛客Moka《2025春季校园招聘白皮书》、代码随想录星球薪资报告、知乎/B站等平台校招实况、CSDN/虎嗅/21经济网等…...

系统整体设计方案

业务架构设计项目架构图业务流程设计文档向量整个流程从用户上传文档开始&#xff0c;用户通过前端页面选择文档并设置相关的组织标签和可见信后系统开始接收文档。这个阶段的关键是建立文档的基本记录信息&#xff0c;包括文件的Md5哈希值文件原始名文件大小上传用户等信息。系…...

灵活创建Windows安装介质:MediaCreationTool.bat的实用指南

灵活创建Windows安装介质&#xff1a;MediaCreationTool.bat的实用指南 【免费下载链接】MediaCreationTool.bat Universal MCT wrapper script for all Windows 10/11 versions from 1507 to 21H2! 项目地址: https://gitcode.com/gh_mirrors/me/MediaCreationTool.bat …...

一款实用汉化工具快速安装使用指南 -- cheat-engine中文版安装教程入口

文章目录安装方式安装后在哪里找到&#xff1f;&#xff08;重点补全&#xff09;使用说明温馨提示首先呢&#xff0c;大家可能在用 cheat engine (CE修改器&#xff09;的时候呢&#xff0c;可能总是使用的是英文版&#xff0c;用的不太舒服啊&#xff0c;这个时候呢&#xff…...

高效文件元数据管理:让Windows文件属性编辑变得简单直观

高效文件元数据管理&#xff1a;让Windows文件属性编辑变得简单直观 【免费下载链接】FileMeta Enable Explorer in Vista, Windows 7 and later to see, edit and search on tags and other metadata for any file type 项目地址: https://gitcode.com/gh_mirrors/fi/FileMe…...

OpenClaw技能扩展:安装Qwen3-4B专用插件实现代码生成

OpenClaw技能扩展&#xff1a;安装Qwen3-4B专用插件实现代码生成 1. 为什么需要Qwen3-4B专用技能 作为一个长期与代码打交道的开发者&#xff0c;我一直在寻找能够提升编码效率的工具。当我第一次接触OpenClaw时&#xff0c;最吸引我的不是它的基础自动化能力&#xff0c;而是…...

告别系统卡顿:RyTuneX全方位性能优化指南

告别系统卡顿&#xff1a;RyTuneX全方位性能优化指南 【免费下载链接】RyTuneX RyTuneX is a cutting-edge optimizer built with the WinUI 3 framework, designed to amplify the performance of Windows devices. Crafted for both Windows 10 and 11. 项目地址: https://…...

CW32L012FOC开源项目推进

作为一枚合格的“职场摸鱼学”实践者&#xff08;手动狗头&#xff09;&#xff0c;我坚决不建议在长假结束后立刻全身心扎进任务清单。那太不“可持续发展”了。 所以&#xff0c;今天上午&#xff0c;我可以理直气壮地把“整理工位”作为最高优先级。说得具体点&#xff0c;…...

ssh远程登录的时候同一个秘钥可以用于多个不同服务器

可以看到&#xff1a;这2台服务器使用了同一个秘钥&#xff0c;现在都可以正常登录&#xff1a;可以看出来第二个云服务器有安全更新没有激活赶快要更新了。...