当前位置: 首页 > news >正文

Flink之JDBC Sink

这里介绍一下Flink Sink中jdbc sink的使用方法,以mysql为例,这里代码分为两种,事务和非事务

  • 非事务代码
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;import java.sql.PreparedStatement;
import java.sql.SQLException;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/2* @Description: 测试**/
public class FlinkJdbcSink {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 这里使用的是自定义数据源CustomizeBean(name,age,gender,hobbit),为了方便测试,换成任何数据源都可,只要和最后的要写入的表结构匹配即可DataStreamSource<CustomizeBean> customizeSource = env.addSource(new CustomizeSource());// 构建jdbc sinkSinkFunction<CustomizeBean> jdbcSink = JdbcSink.sink("insert into t_user(`name`, `age`, `gender`, `hobbit`) values(?, ?, ?, ?)", // 数据插入sql语句new JdbcStatementBuilder<CustomizeBean>() {@Overridepublic void accept(PreparedStatement pStmt, CustomizeBean customizeBean) throws SQLException {pStmt.setString(1, customizeBean.getName());pStmt.setInt(2, customizeBean.getAge());pStmt.setString(3, customizeBean.getGender());pStmt.setString(4, customizeBean.getHobbit());}}, // 字段映射配置,这部分就和常规的java api差不多了JdbcExecutionOptions.builder().withBatchSize(10) // 批次大小,条数.withBatchIntervalMs(5000) // 批次最大等待时间.withMaxRetries(1) // 重复次数.build(), // 写入参数配置new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName("com.mysql.jdbc.Driver").withUrl("jdbc:mysql://lx01:3306/test_db?useSSL=false").withUsername("root").withPassword("password").build() // jdbc信息配置);// 添加jdbc sinkcustomizeSource.addSink(jdbcSink);env.execute();}
}
  • 事务代码
import com.mysql.cj.jdbc.MysqlXADataSource;
import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.function.SerializableSupplier;import javax.sql.XADataSource;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/2* @Description: 测试**/
public class FlinkJdbcSink {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 这里使用的是自定义数据源CustomizeBean(name,age,gender,hobbit),为了方便测试,换成任何数据源都可,只要和最后的要写入的表结构匹配即可DataStreamSource<CustomizeBean> customizeSource = env.addSource(new CustomizeSource());// 每20秒作为checkpoint的一个周期env.enableCheckpointing(20000);// 两次checkpoint间隔最少是10秒env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);// 程序取消或者停止时不删除checkpointenv.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// checkpoint必须在60秒结束,否则将丢弃env.getCheckpointConfig().setCheckpointTimeout(60000);// 同一时间只能有一个checkpointenv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 设置EXACTLY_ONCE语义,默认就是这个env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// checkpoint存储位置env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");// 构建ExactlyOne sink,要注意使用exactlyOnceSink需要开启checkpointSinkFunction<CustomizeBean> exactlyOneJdbcSink = JdbcSink.exactlyOnceSink("insert into t_user(`name`, `age`, `gender`, `hobbit`) values(?, ?, ?, ?)", // 数据插入sql语句(JdbcStatementBuilder<CustomizeBean>) (pStmt, customizeBean) -> {pStmt.setString(1, customizeBean.getName());pStmt.setInt(2, customizeBean.getAge());pStmt.setString(3, customizeBean.getGender());pStmt.setString(4, customizeBean.getHobbit());}, // 字段映射配置,这部分就和常规的java api差不多了JdbcExecutionOptions.builder().withMaxRetries(0) // 设置重复次数.withBatchSize(25) // 设置批次大小,数据条数.withBatchIntervalMs(1000) // 批次最大等待时间.build(),JdbcExactlyOnceOptions.builder()// 这里使用的mysql,所以要将这个参数设置为true,因为mysql不支持一个连接上开启多个事务,oracle是支持的.withTransactionPerConnection(true).build(),(SerializableSupplier<XADataSource>) () -> {// XADataSource 就是JDBC连接,不同的是它是支持分布式事务的连接MysqlXADataSource mysqlXADataSource = new MysqlXADataSource();mysqlXADataSource.setUrl("jdbc:mysql://lx01:3306/test_db?useSSL=false"); // 设置urlmysqlXADataSource.setUser("root"); // 设置用户mysqlXADataSource.setPassword("password"); // 设置密码return mysqlXADataSource;});// 添加jdbc sinkcustomizeSource.addSink(exactlyOneJdbcSink);env.execute();}
}
  • pom依赖
        <!-- 在原有的依赖中加入下面两个内容 --><!-- JDBC connector --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>${flink.version}</version></dependency><!-- mysql驱动 --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.28</version></dependency>
  • 结果
    在这里插入图片描述
    jdbc sink的具体使用方式大概就这些内容,还是比较简单的,具体应用还要结合实际业务场景.

相关文章:

Flink之JDBC Sink

这里介绍一下Flink Sink中jdbc sink的使用方法,以mysql为例,这里代码分为两种,事务和非事务 非事务代码 import org.apache.flink.connector.jdbc.JdbcConnectionOptions; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; import org.apache.flink.connector.…...

lifecycleScope Unresolved reference

描述 导入了lifecycle.lifecycleScope&#xff0c;但是在activity中使用lifecycleScope报错出现Unresolved reference找不到引用。 导包 import androidx.lifecycle.lifecycleScope使用 lifecycleScope.launch(Dispatchers.IO) {...}错误 方案 代码中的activity继承Activ…...

P5960 【模板】差分约束算法

【模板】差分约束算法 题目描述 给出一组包含 m m m 个不等式&#xff0c;有 n n n 个未知数的形如&#xff1a; { x c 1 − x c 1 ′ ≤ y 1 x c 2 − x c 2 ′ ≤ y 2 ⋯ x c m − x c m ′ ≤ y m \begin{cases} x_{c_1}-x_{c_1}\leq y_1 \\x_{c_2}-x_{c_2} \leq y_2 \\…...

VSCode---通过ctrl+鼠标滚动改变字体大小

打开设置然后在右边输editor.mouseWheelZoo勾选即可实现鼠标滚动改变字体大小 4.这种设置的字体大小是固定的...

视频监控汇聚平台EasyCVR视频分享页面WebRTC流地址播放不了是什么原因?

开源EasyDarwin视频监控TSINGSEE青犀视频平台EasyCVR能在复杂的网络环境中&#xff0c;将分散的各类视频资源进行统一汇聚、整合、集中管理&#xff0c;在视频监控播放上&#xff0c;TSINGSEE青犀视频安防监控汇聚平台可支持1、4、9、16个画面窗口播放&#xff0c;可同时播放多…...

Libevent开源库的介绍与应用

libeventhttps://libevent.org/ 一、初识 1、libevent介绍 Libevent 是一个用C语言编写的、轻量级的开源高性能事件通知库&#xff0c;主要有以下几个亮点&#xff1a;事件驱动&#xff08; event-driven&#xff09;&#xff0c;高性能;轻量级&#xff0c;专注于网络&#xff…...

【LNMP】LNMP

LNMP&#xff1a;是目前成熟的企业网站的应用模式之一&#xff0c;指的是一套协同工作的系统和相关软件&#xff1b;能够提供静态页面服务&#xff0c;也可以提供动态web服务 L Linux系统&#xff0c;操作系统N Nginx网站服务&#xff0c;前端&#xff0c;提供前端的静态…...

uniapp自定义头部导航栏

有时我们需要一些特殊的头部导航栏页面&#xff0c;取消传统的导航栏&#xff0c;来增加页面的美观度。 下面我就教大家如何配置&#xff1a; 一、效果图 二、实现 首先在uniapp中打开pages.json配置文件&#xff0c;在单个路由配置style里面设置导航栏样式​​​​​​nav…...

Django实现音乐网站 ⑹

使用Python Django框架制作一个音乐网站&#xff0c; 本篇主要是在添加编辑过程中对后台歌手功能优化及表模型名称修改、模型继承内容。 目录 表模型名称修改 模型继承 创建抽象基类 其他模型继承 更新表结构 歌手新增、编辑优化 表字段名称修改 隐藏单曲数和专辑数 姓…...

dubbo-helloworld示例

1、工程架构 2、创建模块 &#xff08;1&#xff09;创建父工程,引入公共依赖 pom.xml依赖 <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></depende…...

电脑ADB连接手机的方式通过网络无法adb连接手机的问题(已解决)

首先电脑要下载adb工具&#xff0c;将压缩包解压到C盘&#xff1a;https://download.csdn.net/download/qq_43445867/87975072 1、使用USB线连接 打开手机USB调试&#xff1b;PC端安装手机USB驱动。 1.打开DOS命令窗口&#xff0c;进入adb文件夹,输入adb.exe devices回车列出设…...

79 | Python数据分析篇 —— Pandas中groupby聚合操作和透视表基础

Pandas是Python中最常用的数据处理库之一,它提供了高效的数据结构和数据分析工具。在进行数据分析和机器学习等领域的工作时,Pandas是必不可少的库之一。本文将介绍Pandas中的groupby聚合操作和透视表,包括groupby操作、透视表的基础知识、练习题和答案。 文章目录 Pandas中…...

iOS 搭建组件化私有库

一、创建私有库索引 步骤1是在没有索引库的情况下或者是新增索引的时候才需要用到&#xff08;创建基础组件库&#xff09; 首先在码云上建立一个私有库索引&#xff0c;起名为SYComponentSpec 二、本地添加私有库索引 添加私有库索引 pod repo add SYComponentSpec https:/…...

迅为全国产龙芯3A5000电脑运行统信UOS、银河麒麟、loongnix系统

iTOP-3A5000开发板采用全国产龙芯3A5000处理器&#xff0c;基于龙芯自主指令系统 (LoongArch) 的LA464微结构&#xff0c;并进一步提升频率&#xff0c;降低功耗&#xff0c;优化性能。在与龙芯3A4000处理器保持引脚兼容的基础上&#xff0c;频率提升至2.5GHZ&#xff0c;功耗降…...

枫叶时代:打造中国特色的传统文化IP

近年来&#xff0c;取材于传统文化的影视作品在文化产业市场受到前所未有的关注。作为一种兼具辨识度、影响力和流量变现能力的文化符号&#xff0c;影视IP既是文化产业的一个重要环节&#xff0c;也是国家文化软实力的直接体现。优秀的影视IP可以超越文字、语言、民族的障碍&a…...

一条sql语句在mysql中如何执行(查询+更新)

文章目录 一 MySQL 基础架构1.1 MySQL 基本架构1.2 Server 层基本组件介绍1) 连接器2) 查询缓存(MySQL 8.0 版本后移除)3) 分析器4) 优化器5) 执行器 二 语句分析2.1 查询语句2.2 更新语句为什么要用两个日志模块&#xff0c;用一个日志模块不行吗?为什么必须有“两阶段提交”…...

漫画 | TCP/IP之大明邮差

后记&#xff1a; 1973年&#xff0c;卡恩与瑟夫开发出了网络中最核心的两个协议&#xff1a;TCP协议和IP协议&#xff0c;随后为了验证两个协议的可用性&#xff0c;他们做了一个实验&#xff0c;在多个异构网络中进行数据传输&#xff0c;数据包在经过近10万公里的旅程后到达…...

Zookeeper和Nacos的区别

Zookeeper和Nacos的区别 在分布式系统中&#xff0c;注册中心充当着重要角色&#xff0c;是服务发现、客户端负载均衡中不可缺少的一员。注册中心除了能够实现基本的功能外&#xff0c;他的稳定性、可用性和健壮性对整个分布式系统的流畅运行影响重大。zookeeper和nacos可能是…...

O3DE的Pass

Pass介绍 Pass是具有输入和输出的渲染过程。 在最终渲染帧中看到的每个细节都是通过一系列Pass&#xff08;前一个Pass的输出是下一个Pass的输入&#xff09;计算出来的。Pass可以生成图像&#xff08;作为纹理、缓冲区或渲染目标&#xff09;。每个图像都包含关于场景的特定…...

如何建立含有逻辑删除字段的唯一索引

业务场景 在实际工作当中&#xff0c;遇到一个场景&#xff0c;就是在用户注册时&#xff0c;名字要全局唯一&#xff0c;当然&#xff0c;我们是可以对用户进行删除的&#xff0c;你会怎么去做&#xff1f; 分析 一般来说&#xff0c;我们可以在用户注册请求时&#xff0c…...

Python|GIF 解析与构建(5):手搓截屏和帧率控制

目录 Python&#xff5c;GIF 解析与构建&#xff08;5&#xff09;&#xff1a;手搓截屏和帧率控制 一、引言 二、技术实现&#xff1a;手搓截屏模块 2.1 核心原理 2.2 代码解析&#xff1a;ScreenshotData类 2.2.1 截图函数&#xff1a;capture_screen 三、技术实现&…...

Redis相关知识总结(缓存雪崩,缓存穿透,缓存击穿,Redis实现分布式锁,如何保持数据库和缓存一致)

文章目录 1.什么是Redis&#xff1f;2.为什么要使用redis作为mysql的缓存&#xff1f;3.什么是缓存雪崩、缓存穿透、缓存击穿&#xff1f;3.1缓存雪崩3.1.1 大量缓存同时过期3.1.2 Redis宕机 3.2 缓存击穿3.3 缓存穿透3.4 总结 4. 数据库和缓存如何保持一致性5. Redis实现分布式…...

pam_env.so模块配置解析

在PAM&#xff08;Pluggable Authentication Modules&#xff09;配置中&#xff0c; /etc/pam.d/su 文件相关配置含义如下&#xff1a; 配置解析 auth required pam_env.so1. 字段分解 字段值说明模块类型auth认证类模块&#xff0c;负责验证用户身份&am…...

unix/linux,sudo,其发展历程详细时间线、由来、历史背景

sudo 的诞生和演化,本身就是一部 Unix/Linux 系统管理哲学变迁的微缩史。来,让我们拨开时间的迷雾,一同探寻 sudo 那波澜壮阔(也颇为实用主义)的发展历程。 历史背景:su的时代与困境 ( 20 世纪 70 年代 - 80 年代初) 在 sudo 出现之前,Unix 系统管理员和需要特权操作的…...

【开发技术】.Net使用FFmpeg视频特定帧上绘制内容

目录 一、目的 二、解决方案 2.1 什么是FFmpeg 2.2 FFmpeg主要功能 2.3 使用Xabe.FFmpeg调用FFmpeg功能 2.4 使用 FFmpeg 的 drawbox 滤镜来绘制 ROI 三、总结 一、目的 当前市场上有很多目标检测智能识别的相关算法&#xff0c;当前调用一个医疗行业的AI识别算法后返回…...

面向无人机海岸带生态系统监测的语义分割基准数据集

描述&#xff1a;海岸带生态系统的监测是维护生态平衡和可持续发展的重要任务。语义分割技术在遥感影像中的应用为海岸带生态系统的精准监测提供了有效手段。然而&#xff0c;目前该领域仍面临一个挑战&#xff0c;即缺乏公开的专门面向海岸带生态系统的语义分割基准数据集。受…...

沙箱虚拟化技术虚拟机容器之间的关系详解

问题 沙箱、虚拟化、容器三者分开一一介绍的话我知道他们各自都是什么东西&#xff0c;但是如果把三者放在一起&#xff0c;它们之间到底什么关系&#xff1f;又有什么联系呢&#xff1f;我不是很明白&#xff01;&#xff01;&#xff01; 就比如说&#xff1a; 沙箱&#…...

Unity VR/MR开发-VR开发与传统3D开发的差异

视频讲解链接&#xff1a;【XR马斯维】VR/MR开发与传统3D开发的差异【UnityVR/MR开发教程--入门】_哔哩哔哩_bilibili...

云原生时代的系统设计:架构转型的战略支点

&#x1f4dd;个人主页&#x1f339;&#xff1a;一ge科研小菜鸡-CSDN博客 &#x1f339;&#x1f339;期待您的关注 &#x1f339;&#x1f339; 一、云原生的崛起&#xff1a;技术趋势与现实需求的交汇 随着企业业务的互联网化、全球化、智能化持续加深&#xff0c;传统的 I…...

P10909 [蓝桥杯 2024 国 B] 立定跳远

# P10909 [蓝桥杯 2024 国 B] 立定跳远 ## 题目描述 在运动会上&#xff0c;小明从数轴的原点开始向正方向立定跳远。项目设置了 $n$ 个检查点 $a_1, a_2, \cdots , a_n$ 且 $a_i \ge a_{i−1} > 0$。小明必须先后跳跃到每个检查点上且只能跳跃到检查点上。同时&#xff0…...