当前位置: 首页 > news >正文

Flink之JDBC Sink

这里介绍一下Flink Sink中jdbc sink的使用方法,以mysql为例,这里代码分为两种,事务和非事务

  • 非事务代码
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;import java.sql.PreparedStatement;
import java.sql.SQLException;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/2* @Description: 测试**/
public class FlinkJdbcSink {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 这里使用的是自定义数据源CustomizeBean(name,age,gender,hobbit),为了方便测试,换成任何数据源都可,只要和最后的要写入的表结构匹配即可DataStreamSource<CustomizeBean> customizeSource = env.addSource(new CustomizeSource());// 构建jdbc sinkSinkFunction<CustomizeBean> jdbcSink = JdbcSink.sink("insert into t_user(`name`, `age`, `gender`, `hobbit`) values(?, ?, ?, ?)", // 数据插入sql语句new JdbcStatementBuilder<CustomizeBean>() {@Overridepublic void accept(PreparedStatement pStmt, CustomizeBean customizeBean) throws SQLException {pStmt.setString(1, customizeBean.getName());pStmt.setInt(2, customizeBean.getAge());pStmt.setString(3, customizeBean.getGender());pStmt.setString(4, customizeBean.getHobbit());}}, // 字段映射配置,这部分就和常规的java api差不多了JdbcExecutionOptions.builder().withBatchSize(10) // 批次大小,条数.withBatchIntervalMs(5000) // 批次最大等待时间.withMaxRetries(1) // 重复次数.build(), // 写入参数配置new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName("com.mysql.jdbc.Driver").withUrl("jdbc:mysql://lx01:3306/test_db?useSSL=false").withUsername("root").withPassword("password").build() // jdbc信息配置);// 添加jdbc sinkcustomizeSource.addSink(jdbcSink);env.execute();}
}
  • 事务代码
import com.mysql.cj.jdbc.MysqlXADataSource;
import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.function.SerializableSupplier;import javax.sql.XADataSource;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/2* @Description: 测试**/
public class FlinkJdbcSink {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 这里使用的是自定义数据源CustomizeBean(name,age,gender,hobbit),为了方便测试,换成任何数据源都可,只要和最后的要写入的表结构匹配即可DataStreamSource<CustomizeBean> customizeSource = env.addSource(new CustomizeSource());// 每20秒作为checkpoint的一个周期env.enableCheckpointing(20000);// 两次checkpoint间隔最少是10秒env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);// 程序取消或者停止时不删除checkpointenv.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// checkpoint必须在60秒结束,否则将丢弃env.getCheckpointConfig().setCheckpointTimeout(60000);// 同一时间只能有一个checkpointenv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 设置EXACTLY_ONCE语义,默认就是这个env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// checkpoint存储位置env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");// 构建ExactlyOne sink,要注意使用exactlyOnceSink需要开启checkpointSinkFunction<CustomizeBean> exactlyOneJdbcSink = JdbcSink.exactlyOnceSink("insert into t_user(`name`, `age`, `gender`, `hobbit`) values(?, ?, ?, ?)", // 数据插入sql语句(JdbcStatementBuilder<CustomizeBean>) (pStmt, customizeBean) -> {pStmt.setString(1, customizeBean.getName());pStmt.setInt(2, customizeBean.getAge());pStmt.setString(3, customizeBean.getGender());pStmt.setString(4, customizeBean.getHobbit());}, // 字段映射配置,这部分就和常规的java api差不多了JdbcExecutionOptions.builder().withMaxRetries(0) // 设置重复次数.withBatchSize(25) // 设置批次大小,数据条数.withBatchIntervalMs(1000) // 批次最大等待时间.build(),JdbcExactlyOnceOptions.builder()// 这里使用的mysql,所以要将这个参数设置为true,因为mysql不支持一个连接上开启多个事务,oracle是支持的.withTransactionPerConnection(true).build(),(SerializableSupplier<XADataSource>) () -> {// XADataSource 就是JDBC连接,不同的是它是支持分布式事务的连接MysqlXADataSource mysqlXADataSource = new MysqlXADataSource();mysqlXADataSource.setUrl("jdbc:mysql://lx01:3306/test_db?useSSL=false"); // 设置urlmysqlXADataSource.setUser("root"); // 设置用户mysqlXADataSource.setPassword("password"); // 设置密码return mysqlXADataSource;});// 添加jdbc sinkcustomizeSource.addSink(exactlyOneJdbcSink);env.execute();}
}
  • pom依赖
        <!-- 在原有的依赖中加入下面两个内容 --><!-- JDBC connector --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>${flink.version}</version></dependency><!-- mysql驱动 --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.28</version></dependency>
  • 结果
    在这里插入图片描述
    jdbc sink的具体使用方式大概就这些内容,还是比较简单的,具体应用还要结合实际业务场景.

相关文章:

Flink之JDBC Sink

这里介绍一下Flink Sink中jdbc sink的使用方法,以mysql为例,这里代码分为两种,事务和非事务 非事务代码 import org.apache.flink.connector.jdbc.JdbcConnectionOptions; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; import org.apache.flink.connector.…...

lifecycleScope Unresolved reference

描述 导入了lifecycle.lifecycleScope&#xff0c;但是在activity中使用lifecycleScope报错出现Unresolved reference找不到引用。 导包 import androidx.lifecycle.lifecycleScope使用 lifecycleScope.launch(Dispatchers.IO) {...}错误 方案 代码中的activity继承Activ…...

P5960 【模板】差分约束算法

【模板】差分约束算法 题目描述 给出一组包含 m m m 个不等式&#xff0c;有 n n n 个未知数的形如&#xff1a; { x c 1 − x c 1 ′ ≤ y 1 x c 2 − x c 2 ′ ≤ y 2 ⋯ x c m − x c m ′ ≤ y m \begin{cases} x_{c_1}-x_{c_1}\leq y_1 \\x_{c_2}-x_{c_2} \leq y_2 \\…...

VSCode---通过ctrl+鼠标滚动改变字体大小

打开设置然后在右边输editor.mouseWheelZoo勾选即可实现鼠标滚动改变字体大小 4.这种设置的字体大小是固定的...

视频监控汇聚平台EasyCVR视频分享页面WebRTC流地址播放不了是什么原因?

开源EasyDarwin视频监控TSINGSEE青犀视频平台EasyCVR能在复杂的网络环境中&#xff0c;将分散的各类视频资源进行统一汇聚、整合、集中管理&#xff0c;在视频监控播放上&#xff0c;TSINGSEE青犀视频安防监控汇聚平台可支持1、4、9、16个画面窗口播放&#xff0c;可同时播放多…...

Libevent开源库的介绍与应用

libeventhttps://libevent.org/ 一、初识 1、libevent介绍 Libevent 是一个用C语言编写的、轻量级的开源高性能事件通知库&#xff0c;主要有以下几个亮点&#xff1a;事件驱动&#xff08; event-driven&#xff09;&#xff0c;高性能;轻量级&#xff0c;专注于网络&#xff…...

【LNMP】LNMP

LNMP&#xff1a;是目前成熟的企业网站的应用模式之一&#xff0c;指的是一套协同工作的系统和相关软件&#xff1b;能够提供静态页面服务&#xff0c;也可以提供动态web服务 L Linux系统&#xff0c;操作系统N Nginx网站服务&#xff0c;前端&#xff0c;提供前端的静态…...

uniapp自定义头部导航栏

有时我们需要一些特殊的头部导航栏页面&#xff0c;取消传统的导航栏&#xff0c;来增加页面的美观度。 下面我就教大家如何配置&#xff1a; 一、效果图 二、实现 首先在uniapp中打开pages.json配置文件&#xff0c;在单个路由配置style里面设置导航栏样式​​​​​​nav…...

Django实现音乐网站 ⑹

使用Python Django框架制作一个音乐网站&#xff0c; 本篇主要是在添加编辑过程中对后台歌手功能优化及表模型名称修改、模型继承内容。 目录 表模型名称修改 模型继承 创建抽象基类 其他模型继承 更新表结构 歌手新增、编辑优化 表字段名称修改 隐藏单曲数和专辑数 姓…...

dubbo-helloworld示例

1、工程架构 2、创建模块 &#xff08;1&#xff09;创建父工程,引入公共依赖 pom.xml依赖 <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></depende…...

电脑ADB连接手机的方式通过网络无法adb连接手机的问题(已解决)

首先电脑要下载adb工具&#xff0c;将压缩包解压到C盘&#xff1a;https://download.csdn.net/download/qq_43445867/87975072 1、使用USB线连接 打开手机USB调试&#xff1b;PC端安装手机USB驱动。 1.打开DOS命令窗口&#xff0c;进入adb文件夹,输入adb.exe devices回车列出设…...

79 | Python数据分析篇 —— Pandas中groupby聚合操作和透视表基础

Pandas是Python中最常用的数据处理库之一,它提供了高效的数据结构和数据分析工具。在进行数据分析和机器学习等领域的工作时,Pandas是必不可少的库之一。本文将介绍Pandas中的groupby聚合操作和透视表,包括groupby操作、透视表的基础知识、练习题和答案。 文章目录 Pandas中…...

iOS 搭建组件化私有库

一、创建私有库索引 步骤1是在没有索引库的情况下或者是新增索引的时候才需要用到&#xff08;创建基础组件库&#xff09; 首先在码云上建立一个私有库索引&#xff0c;起名为SYComponentSpec 二、本地添加私有库索引 添加私有库索引 pod repo add SYComponentSpec https:/…...

迅为全国产龙芯3A5000电脑运行统信UOS、银河麒麟、loongnix系统

iTOP-3A5000开发板采用全国产龙芯3A5000处理器&#xff0c;基于龙芯自主指令系统 (LoongArch) 的LA464微结构&#xff0c;并进一步提升频率&#xff0c;降低功耗&#xff0c;优化性能。在与龙芯3A4000处理器保持引脚兼容的基础上&#xff0c;频率提升至2.5GHZ&#xff0c;功耗降…...

枫叶时代:打造中国特色的传统文化IP

近年来&#xff0c;取材于传统文化的影视作品在文化产业市场受到前所未有的关注。作为一种兼具辨识度、影响力和流量变现能力的文化符号&#xff0c;影视IP既是文化产业的一个重要环节&#xff0c;也是国家文化软实力的直接体现。优秀的影视IP可以超越文字、语言、民族的障碍&a…...

一条sql语句在mysql中如何执行(查询+更新)

文章目录 一 MySQL 基础架构1.1 MySQL 基本架构1.2 Server 层基本组件介绍1) 连接器2) 查询缓存(MySQL 8.0 版本后移除)3) 分析器4) 优化器5) 执行器 二 语句分析2.1 查询语句2.2 更新语句为什么要用两个日志模块&#xff0c;用一个日志模块不行吗?为什么必须有“两阶段提交”…...

漫画 | TCP/IP之大明邮差

后记&#xff1a; 1973年&#xff0c;卡恩与瑟夫开发出了网络中最核心的两个协议&#xff1a;TCP协议和IP协议&#xff0c;随后为了验证两个协议的可用性&#xff0c;他们做了一个实验&#xff0c;在多个异构网络中进行数据传输&#xff0c;数据包在经过近10万公里的旅程后到达…...

Zookeeper和Nacos的区别

Zookeeper和Nacos的区别 在分布式系统中&#xff0c;注册中心充当着重要角色&#xff0c;是服务发现、客户端负载均衡中不可缺少的一员。注册中心除了能够实现基本的功能外&#xff0c;他的稳定性、可用性和健壮性对整个分布式系统的流畅运行影响重大。zookeeper和nacos可能是…...

O3DE的Pass

Pass介绍 Pass是具有输入和输出的渲染过程。 在最终渲染帧中看到的每个细节都是通过一系列Pass&#xff08;前一个Pass的输出是下一个Pass的输入&#xff09;计算出来的。Pass可以生成图像&#xff08;作为纹理、缓冲区或渲染目标&#xff09;。每个图像都包含关于场景的特定…...

如何建立含有逻辑删除字段的唯一索引

业务场景 在实际工作当中&#xff0c;遇到一个场景&#xff0c;就是在用户注册时&#xff0c;名字要全局唯一&#xff0c;当然&#xff0c;我们是可以对用户进行删除的&#xff0c;你会怎么去做&#xff1f; 分析 一般来说&#xff0c;我们可以在用户注册请求时&#xff0c…...

springboot 百货中心供应链管理系统小程序

一、前言 随着我国经济迅速发展&#xff0c;人们对手机的需求越来越大&#xff0c;各种手机软件也都在被广泛应用&#xff0c;但是对于手机进行数据信息管理&#xff0c;对于手机的各种软件也是备受用户的喜爱&#xff0c;百货中心供应链管理系统被用户普遍使用&#xff0c;为方…...

DockerHub与私有镜像仓库在容器化中的应用与管理

哈喽&#xff0c;大家好&#xff0c;我是左手python&#xff01; Docker Hub的应用与管理 Docker Hub的基本概念与使用方法 Docker Hub是Docker官方提供的一个公共镜像仓库&#xff0c;用户可以在其中找到各种操作系统、软件和应用的镜像。开发者可以通过Docker Hub轻松获取所…...

基于ASP.NET+ SQL Server实现(Web)医院信息管理系统

医院信息管理系统 1. 课程设计内容 在 visual studio 2017 平台上&#xff0c;开发一个“医院信息管理系统”Web 程序。 2. 课程设计目的 综合运用 c#.net 知识&#xff0c;在 vs 2017 平台上&#xff0c;进行 ASP.NET 应用程序和简易网站的开发&#xff1b;初步熟悉开发一…...

解决Ubuntu22.04 VMware失败的问题 ubuntu入门之二十八

现象1 打开VMware失败 Ubuntu升级之后打开VMware上报需要安装vmmon和vmnet&#xff0c;点击确认后如下提示 最终上报fail 解决方法 内核升级导致&#xff0c;需要在新内核下重新下载编译安装 查看版本 $ vmware -v VMware Workstation 17.5.1 build-23298084$ lsb_release…...

【CSS position 属性】static、relative、fixed、absolute 、sticky详细介绍,多层嵌套定位示例

文章目录 ★ position 的五种类型及基本用法 ★ 一、position 属性概述 二、position 的五种类型详解(初学者版) 1. static(默认值) 2. relative(相对定位) 3. absolute(绝对定位) 4. fixed(固定定位) 5. sticky(粘性定位) 三、定位元素的层级关系(z-i…...

智能分布式爬虫的数据处理流水线优化:基于深度强化学习的数据质量控制

在数字化浪潮席卷全球的今天&#xff0c;数据已成为企业和研究机构的核心资产。智能分布式爬虫作为高效的数据采集工具&#xff0c;在大规模数据获取中发挥着关键作用。然而&#xff0c;传统的数据处理流水线在面对复杂多变的网络环境和海量异构数据时&#xff0c;常出现数据质…...

JS设计模式(4):观察者模式

JS设计模式(4):观察者模式 一、引入 在开发中&#xff0c;我们经常会遇到这样的场景&#xff1a;一个对象的状态变化需要自动通知其他对象&#xff0c;比如&#xff1a; 电商平台中&#xff0c;商品库存变化时需要通知所有订阅该商品的用户&#xff1b;新闻网站中&#xff0…...

CVE-2020-17519源码分析与漏洞复现(Flink 任意文件读取)

漏洞概览 漏洞名称&#xff1a;Apache Flink REST API 任意文件读取漏洞CVE编号&#xff1a;CVE-2020-17519CVSS评分&#xff1a;7.5影响版本&#xff1a;Apache Flink 1.11.0、1.11.1、1.11.2修复版本&#xff1a;≥ 1.11.3 或 ≥ 1.12.0漏洞类型&#xff1a;路径遍历&#x…...

【Nginx】使用 Nginx+Lua 实现基于 IP 的访问频率限制

使用 NginxLua 实现基于 IP 的访问频率限制 在高并发场景下&#xff0c;限制某个 IP 的访问频率是非常重要的&#xff0c;可以有效防止恶意攻击或错误配置导致的服务宕机。以下是一个详细的实现方案&#xff0c;使用 Nginx 和 Lua 脚本结合 Redis 来实现基于 IP 的访问频率限制…...

MySQL 8.0 事务全面讲解

以下是一个结合两次回答的 MySQL 8.0 事务全面讲解&#xff0c;涵盖了事务的核心概念、操作示例、失败回滚、隔离级别、事务性 DDL 和 XA 事务等内容&#xff0c;并修正了查看隔离级别的命令。 MySQL 8.0 事务全面讲解 一、事务的核心概念&#xff08;ACID&#xff09; 事务是…...