当前位置: 首页 > news >正文

Flink_DataStreamAPI_输出算子Sink

Flink_DataStreamAPI_输出算子Sink

  • 1连接到外部系统
  • 2输出到文件
  • 3输出到Kafka
  • 4输出到MySQL(JDBC)
  • 5自定义Sink输出

Flink作为数据处理框架,最终还是要把计算处理的结果写入外部存储,为外部应用提供支持。

在这里插入图片描述

1连接到外部系统

Flink的DataStream API专门提供了向外部写入数据的方法:addSink。与addSource类似,addSink方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的;Flink程序中所有对外的输出操作,一般都是利用Sink算子完成的。
Flink1.12以前,Sink算子的创建是通过调用DataStream的.addSink()方法实现的。

stream.addSink(new SinkFunction());

addSink方法同样需要传入一个参数,实现的是SinkFunction接口。在这个接口中只需要重写一个方法invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用。
Flink1.12开始,同样重构了Sink架构,

stream.sinkTo()

当然,Sink多数情况下同样并不需要我们自己实现。之前我们一直在使用的print方法其实就是一种Sink,它表示将数据流写入标准控制台打印输出。Flink官方为我们提供了一部分的框架的Sink连接器。如下图所示,列出了Flink官方目前支持的第三方系统连接器:
在这里插入图片描述
我们可以看到,像Kafka之类流式系统,Flink提供了完美对接,source/sink两端都能连接,可读可写;而对于Elasticsearch、JDBC等数据存储系统,则只提供了输出写入的sink连接器。
除Flink官方之外,Apache Bahir框架,也实现了一些其他第三方系统与Flink的连接器。
在这里插入图片描述
除此以外,就需要用户自定义实现sink连接器了。

2输出到文件

Flink专门提供了一个流式文件系统的连接器:FileSink,为批处理和流处理提供了一个统一的Sink,它可以将分区文件写入Flink支持的文件系统。

FileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),可以直接调用FileSink的静态方法:
-行编码: FileSink.forRowFormat(basePath,rowEncoder)。
-批量编码: FileSink.forBulkFormat(basePath,bulkWriterFactory)。

示例:

public class SinkFile {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 每个目录中,都有 并行度个数的 文件在写入env.setParallelism(2);// 必须开启checkpoint,否则一直都是 .inprogressenv.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(new GeneratorFunction<Long, String>() {@Overridepublic String map(Long value) throws Exception {return "Number:" + value;}},Long.MAX_VALUE,RateLimiterStrategy.perSecond(1000),Types.STRING);DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator");// 输出到文件系统FileSink<String> fieSink = FileSink// 输出行式存储的文件,指定路径、指定编码.<String>forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<>("UTF-8"))// 输出文件的一些配置: 文件名的前缀、后缀.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("atguigu-").withPartSuffix(".log").build())// 按照目录分桶:如下,就是每个小时一个目录.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))// 文件滚动策略:  1分钟 或 1m.withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofMinutes(1)).withMaxPartSize(new MemorySize(1024*1024)).build()).build();dataGen.sinkTo(fieSink);env.execute();}
}

3输出到Kafka

(1)添加Kafka 连接器依赖
由于我们已经测试过从Kafka数据源读取数据,连接器相关依赖已经引入,这里就不重复介绍了。
(2)启动Kafka集群
(3)编写输出到Kafka的示例代码
输出无key的record:

public class SinkKafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 如果是精准一次,必须开启checkpoint(后续章节介绍)env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);SingleOutputStreamOperator<String> sensorDS = env.socketTextStream("hadoop102", 7777);/*** Kafka Sink:* TODO 注意:如果要使用 精准一次 写入Kafka,需要满足以下条件,缺一不可* 1、开启checkpoint(后续介绍)* 2、设置事务前缀* 3、设置事务超时时间:   checkpoint间隔 <  事务超时时间  < max的15分钟*/KafkaSink<String> kafkaSink = KafkaSink.<String>builder()// 指定 kafka 的地址和端口.setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")// 指定序列化器:指定Topic名称、具体的序列化.setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic("ws").setValueSerializationSchema(new SimpleStringSchema()).build())// 写到kafka的一致性级别: 精准一次、至少一次.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// 如果是精准一次,必须设置 事务的前缀.setTransactionalIdPrefix("atguigu-")// 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000+"").build();sensorDS.sinkTo(kafkaSink);env.execute();}
}

自定义序列化器,实现带key的record:

public class SinkKafkaWithKey {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);env.setRestartStrategy(RestartStrategies.noRestart());SingleOutputStreamOperator<String> sensorDS = env.socketTextStream("hadoop102", 7777);/*** 如果要指定写入kafka的key,可以自定义序列化器:* 1、实现 一个接口,重写 序列化 方法* 2、指定key,转成 字节数组* 3、指定value,转成 字节数组* 4、返回一个 ProducerRecord对象,把key、value放进去*/KafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092").setRecordSerializer(new KafkaRecordSerializationSchema<String>() {@Nullable@Overridepublic ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {String[] datas = element.split(",");byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);byte[] value = element.getBytes(StandardCharsets.UTF_8);return new ProducerRecord<>("ws", key, value);}}).setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setTransactionalIdPrefix("atguigu-").setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "").build();sensorDS.sinkTo(kafkaSink);env.execute();}
}

(4)运行代码,在Linux主机启动一个消费者,查看是否收到数据

[atguigu@hadoop102 ~]$ 
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ws

4输出到MySQL(JDBC)

写入数据的MySQL的测试步骤如下。
(1)添加依赖
添加MySQL驱动:

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.27</version>
</dependency>

官方还未提供flink-connector-jdbc的1.17.0的正式依赖,暂时从apache snapshot仓库下载,pom文件中指定仓库路径:

<repositories><repository><id>apache-snapshots</id><name>apache snapshots</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url></repository>
</repositories>

添加依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>1.17-SNAPSHOT</version>
</dependency>

如果不生效,还需要修改本地maven的配置文件,mirrorOf中添加如下标红内容:

	<mirror><id>aliyunmaven</id><mirrorOf>*,!apache-snapshots</mirrorOf><name>阿里云公共仓库</name><url>https://maven.aliyun.com/repository/public</url></mirror>

(2)启动MySQL,在test库下建表ws

mysql>     
CREATE TABLE `ws` (`id` varchar(100) NOT NULL,`ts` bigint(20) DEFAULT NULL,`vc` int(11) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

(3)编写输出到MySQL的示例代码

public class SinkMySQL {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction());/*** TODO 写入mysql* 1、只能用老的sink写法: addsink* 2、JDBCSink的4个参数:*    第一个参数: 执行的sql,一般就是 insert into*    第二个参数: 预编译sql, 对占位符填充值*    第三个参数: 执行选项 ---》 攒批、重试*    第四个参数: 连接选项 ---》 url、用户名、密码*/SinkFunction<WaterSensor> jdbcSink = JdbcSink.sink("insert into ws values(?,?,?)",new JdbcStatementBuilder<WaterSensor>() {@Overridepublic void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {//每收到一条WaterSensor,如何去填充占位符preparedStatement.setString(1, waterSensor.getId());preparedStatement.setLong(2, waterSensor.getTs());preparedStatement.setInt(3, waterSensor.getVc());}},JdbcExecutionOptions.builder().withMaxRetries(3) // 重试次数.withBatchSize(100) // 批次的大小:条数.withBatchIntervalMs(3000) // 批次的时间.build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://hadoop102:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8").withUsername("root").withPassword("000000").withConnectionCheckTimeoutSeconds(60) // 重试的超时时间.build());sensorDS.addSink(jdbcSink);env.execute();}
}

(4)运行代码,用客户端连接MySQL,查看是否成功写入数据。

5自定义Sink输出

如果我们想将数据存储到我们自己的存储设备中,而Flink并没有提供可以直接使用的连接器,就只能自定义Sink进行输出了。与Source类似,Flink为我们提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类,只要实现它,通过简单地调用DataStream的.addSink()方法就可以自定义写入任何外部存储。

stream.addSink(new MySinkFunction<String>());

在实现SinkFunction的时候,需要重写的一个关键方法invoke(),在这个方法中我们就可以实现将流里的数据发送出去的逻辑。
这种方式比较通用,对于任何外部存储系统都有效;不过自定义Sink想要实现状态一致性并不容易,所以一般只在没有其它选择时使用。实际项目中用到的外部连接器Flink官方基本都已实现,而且在不断地扩充,因此自定义的场景并不常见。

相关文章:

Flink_DataStreamAPI_输出算子Sink

Flink_DataStreamAPI_输出算子Sink 1连接到外部系统2输出到文件3输出到Kafka4输出到MySQL&#xff08;JDBC&#xff09;5自定义Sink输出 Flink作为数据处理框架&#xff0c;最终还是要把计算处理的结果写入外部存储&#xff0c;为外部应用提供支持。 1连接到外部系统 Flink的D…...

标准C++ 字符串

一、标准库中的字符串类型 在C中&#xff0c;字符串是一个非常重要的数据类型&#xff0c;用于表示和处理文本信息。C提供了多种方式来处理字符串&#xff0c;每种方式都有其特点和适用场景。以下是几种常见的字符串类型及其用法&#xff1a; 1. C 风格字符串 (char* 或 char…...

时序预测:多头注意力+宽度学习

✨✨ 欢迎大家来访Srlua的博文&#xff08;づ&#xffe3;3&#xffe3;&#xff09;づ╭❤&#xff5e;✨✨ &#x1f31f;&#x1f31f; 欢迎各位亲爱的读者&#xff0c;感谢你们抽出宝贵的时间来阅读我的文章。 我是Srlua小谢&#xff0c;在这里我会分享我的知识和经验。&am…...

day06(单片机)IIC+STH20

目录 IICSHT20 I2C基础简介 为什么I2C需要使用上拉电阻&#xff1f; I2C特点 时序图分析 起始信号与终止信号 数据传输时序 字节传输和应答信号 I2C寻址 主机给从机发送一个字节 主机给从机发送多个字节 主机从从机接收一个字节 主机从从机接收多个字节 I2C寄存器 I2C_RXDR&…...

Bugku CTF_Web——文件上传

Bugku CTF_Web——文件上传 进入靶场 My name is margin,give me a image file not a php抓个包上传试试 改成png也上传失败 应该校验了文件头 增加了文件头也不行 试了一下 把文件类型改成gif可以上传 但是还是不能连接 将Content-Type改大小写 再把文件后缀名改成php4 成…...

C#版使用融合通信API发送手机短信息

目录 功能实现 范例运行环境 实现范例 类设计 类代码实现 调用范例 总结 功能实现 融合云通信服务平台&#xff0c;为企业提供全方位通信服务&#xff0c;发送手机短信是其一项核心功能&#xff0c;本文将讲述如何使用融合云服务API为终端手机用户发送短信信息&#xf…...

人工智能:重塑医疗、企业与生活的未来知识管理——以HelpLook为例

一、医疗行业&#xff1a;AI引领的医疗革新 随着人工智能&#xff08;AI&#xff09;技术的持续飞跃&#xff0c;我们正身处一场跨行业的深刻变革之中。在医疗健康的广阔舞台上&#xff0c;人工智能技术正扮演着日益重要的角色。它不仅能够辅助医生进行病例的精准诊断&#xf…...

MVVM(Model-View-ViewModel)模型

MVVM&#xff08;ModelViewViewModel&#xff09;模型是一种常用于软件开发中的架构模式&#xff0c;尤其在前端框架&#xff08;如 Vue.js、React、Angular&#xff09;中被广泛应用。它将程序的用户界面与业务逻辑分离&#xff0c;便于维护和扩展。 MVVM 的三个组成部分 1. …...

权限系统:权限应用服务设计

今天聊聊权限系统的应用服务设计。 从业务需求的角度来看&#xff0c;权限系统需要解决两个核心问题&#xff1a; 1、菜单渲染与动态展示 当用户成功登录并接入系统后&#xff0c;系统需要动态获取并展示该用户有权限访问的菜单项。 这一过程涉及前端系统与权限系统的交互。前端…...

Android音频架构

音频基础知识 声音有哪些重要属性呢&#xff1f; 响度(Loudness) 响度就是人类可以感知到的各种声音的大小&#xff0c;也就是音量。响度与声波的振幅有直接关系。 音调(Pitch) 音调与声音的频率有关系&#xff0c;当声音的频率越大时&#xff0c;人耳所感知到的音调就越高&a…...

AI 智享直播:开启直播新篇,引领未来互动新趋势!

在数字化浪潮席卷全球的今天&#xff0c;AI技术正以不可阻挡之势渗透进我们生活的方方面面&#xff0c;从智能家居到自动驾驶&#xff0c;从医疗健康到金融服务&#xff0c;无一不彰显着其强大的影响力和无限的潜力。而在这一波科技革新的洪流中&#xff0c;直播行业也迎来了前…...

【AIGC】国内AI工具复现GPTs效果详解

博客主页&#xff1a; [小ᶻZ࿆] 本文专栏: AIGC | GPTs应用实例 文章目录 &#x1f4af;前言&#x1f4af;本文所要复现的GPTs介绍&#x1f4af;GPTs指令作为提示词在ChatGPT实现类似效果&#x1f4af;国内AI工具复现GPTs效果可能出现的问题解决方法解决后的效果 &#x1…...

Charles抓https包-配置系统证书(雷电)

1、导出证书 2、下载 主页上传资源中有安装包&#xff0c;免费的 openssl 安装教程自己搜 openssl x509 -subject_hash_old -in charles.pem 3、修改证书名、后缀改成点0 雷电打开root和磁盘写入 4、导入雷电证书根目录 证书拖进去&#xff0c;基本就完成了&#xff…...

在卷积神经网络中真正占用内存的是什么

在卷积神经网络&#xff08;CNN&#xff09;中&#xff0c;占用内存的主要部分包括以下几个方面&#xff1a; 1. 模型参数&#xff08;Weights and Biases&#xff09; CNN 中的权重和偏置&#xff08;即模型的参数&#xff09;通常是占用内存的最大部分。具体来说&#xff1…...

2024 ECCV | DualDn: 通过可微ISP进行双域去噪

文章标题&#xff1a;《DualDn: Dual-domain Denoising via Differentiable ISP》 论文链接&#xff1a; DualDn 代码链接&#xff1a; https://openimaginglab.github.io/DualDn/ 本文收录于2024ECCV&#xff0c;是上海AI Lab、浙江大学、香港中文大学&#xff08;薛天帆等…...

Elasticsearch 和 Kibana 8.16:Kibana 获得上下文和 BBQ 速度并节省开支!

作者&#xff1a;来自 Elastic Platform Product Team Elastic Search AI 平台&#xff08;Elasticsearch、Kibana 和机器学习&#xff09;的 8.16 版本包含大量新功能&#xff0c;可提高性能、优化工作流程和简化数据管理。 使用更好的二进制量化 (Better Binary Quantizatio…...

Linux 抓包工具 --- tcpdump

序言 在传输层 Tcp 的学习中&#xff0c;我们了解了 三次握手和四次挥手 的概念&#xff0c;但是看了这么多篇文章&#xff0c;我们也只是停留在 纸上谈兵。  欲知事情如何&#xff0c;我们其实可以尝试去看一下具体的网络包的信息。在这篇文章中将向大家介绍&#xff0c;在 L…...

Vector Optimization – Stride

文章目录 Vector优化 – stride跳跃Vector优化 – stride跳跃 This distance between memory locations that separates the elements to be gathered into a single register is called the stride. A stride of one unit is called a unit-stride. This is equivalent to se…...

git config是做什么的?

git config是做什么的&#xff1f; git config作用配置级别三种配置级别的介绍及使用&#xff0c;配置文件说明 使用说明git confi查看参数 默认/不使用这个参数 情况下 Git 使用哪个配置等级&#xff1f; 一些常见的行为查看配置信息设置配置信息删除配置信息 一些常用的配置信…...

计算机网络(7) 数据链路层

数据链路层的内容不学不知道&#xff0c;一学真的是吓一跳哦&#xff0c;内容真的挺多的&#xff0c;但是大家不要害怕&#xff0c;总会学完的。 还有由于数据链路层的内容太多&#xff0c;一篇肯定是讲不完的所以我决定把它分为好几个部分进行学习与讲解。大家可以关注以后文…...

2024年秋国开电大《建筑结构试验》形考任务1-4

形考作业一 1.下列选项中,( )项不属于科学研究性试验。 答案:检验结构的质量,说明工程的可靠性 2.下列各项,( )项不属于工程鉴定性试验。 答案:验证结构计算理论的假定 3.按试验目的进行分类,可将结构试验分成( )。 答案:工程鉴定性试验和科学研究性试验…...

【MySQL】explain之type类型

explain的type共有以下几种类型&#xff0c;system、const、eq_ref、ref、range、index、all。 system&#xff1a;当表中只有一条记录并且该表使用的存储引擎的统计数据是精确的&#xff0c;比如MyISAM、Memory&#xff0c;那么对该表的访问方法就是system。 const&#xff…...

Llama架构及代码详解

Llama的框架图如图&#xff1a; 源码中含有大量分布式训练相关的代码&#xff0c;读起来比较晦涩难懂&#xff0c;所以我们对llama自顶向下进行了解析及复现&#xff0c;我们对其划分成三层&#xff0c;分别是顶层、中层、和底层&#xff0c;如下&#xff1a; Llama的整体组成…...

Android onConfigurationChanged 基础配置

onConfigurationChanged 代替重建 0. **定义与基本用途**1. **具体使用场景 - 屏幕方向改变**2. **具体使用场景 - 键盘可用性改变**3. **具体使用场景 - 语言设置变更**4. **具体使用场景 - 屏幕密度变化**5. **具体使用场景 - 字体大小改变**6. **具体使用场景 - 屏幕尺寸变化…...

3. Sharding-Jdbc核⼼流 程+多种分⽚策略

1. Sharding-Jdbc 分库分表执⾏核⼼流程 Sharding-JDBC执行流程 1. SQL解析 -> SQL优化 -> SQL路由 -> SQL改写 -> SQL执⾏-> 结果归并 ->返回结果简写为&#xff1a;解析->路由->改写->执⾏->结果归并1.1 SQL解析 1. SQL解析过程分为词法解析…...

为什么财富的蓝图如此重要

我们生活在一个二元对立的世界里&#xff1a;上与下、明与暗、冷与热内与外、快与慢、左与右。这些还只是千百种对立之中的几个例子而已。 有了一个极端&#xff0c;表示一定同时有相对的另一端存在。有了右边不可能没有左边。 所以&#xff0c;在钱这件事上&#xff0c;有外…...

【云计算解决方案面试整理】1-2云计算基础概念及云计算技术原理

准备面云计算解决方案的岗位,整理了一些,也请大佬们指点。 文档分为 云计算基础概念、云计算技术原理、主流云计算平台(以天翼云为例)、云计算架构(弹性设计、高可用设计、高性能设计)、安全防护几个方面。 一、云计算基础概念 1.请简要解释一下什么是云计算? 简单说呢…...

循环语句 while()... 与 for()...(day11)

一、while()与do...while()... 循环语句&#xff1a; 通过循环语句可以反复执行一段代码多次 1、while循环&#xff1a; - 语法&#xff1a; while(①条件表达式){ ②语句... } - while语句在执行时&#xff0c; 先对条件表达式进行求值判断&#xff0c; 如果值为true&#…...

Mysql篇-三大日志

概述 undo log&#xff08;回滚日志&#xff09;&#xff1a;是 Innodb 存储引擎层生成的日志&#xff0c;实现了事务中的原子性&#xff0c;主要用于事务回滚和 MVCC。 redo log&#xff08;重做日志&#xff09;&#xff1a;是 Innodb 存储引擎层生成的日志&#xff0c;实现…...

MySQL的SQL书写顺序和执行顺序

老是忘记执行顺序&#xff0c;记录一下&#xff1a; 1. SQL语句的书写顺序 书写顺序通常是我们编写SQL查询时的顺序&#xff0c;主要包括以下关键字&#xff1a; SELECT&#xff1a;选择要查询的字段。FROM&#xff1a;指定数据来源表。JOIN&#xff08;可选&#xff09;&am…...