当前位置: 首页 > news >正文

Flink Flink数据写入Kafka

一、环境准备

官网地址

flink官方集成了通用的 Kafka 连接器,使用时需要根据生产环境的版本引入相应的依赖

    <properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.14.6</flink.version><spark.version>2.4.3</spark.version><hadoop.version>2.8.5</hadoop.version><hbase.version>1.4.9</hbase.version><hive.version>2.3.5</hive.version><java.version>1.8</java.version><scala.version>2.11.8</scala.version><mysql.version>8.0.22</mysql.version><scala.binary.version>2.11</scala.binary.version><maven.compiler.source>${java.version}</maven.compiler.source><maven.compiler.target>${java.version}</maven.compiler.target></properties>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.12</artifactId><version>${flink.version}</version></dependency>

二、KafkaSink介绍

在这里插入图片描述
在这里插入图片描述

三、正确理解序列化器

什么叫序列化和反序列化?
1.序列化:把对象转换为字节序列的过程称为对象的序列化.
2.反序列化:把字节序列恢复为对象的过程称为对象的反序列化.

序列化器的作用是将flink数据转换成 kafka的ProducerRecord
Flink Kafka Consumer 需要知道如何将 Kafka 中的二进制数据转换为 Java 或者 Scala 对象;
那么,Flink Kafka Producer 需要知道如何将 Java/Scala 对象转化为二进制数据。
在这里插入图片描述
使用预定义的序列化器
将 DataStream 数据转换为 Kafka消息中的value,key为默认值null,timestamp为默认值

        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()// 指定 kafka 的地址和端口.setBootstrapServers("localhost:9092")// 指定序列化器:指定Topic名称、具体的序列化(产生方需要序列化,接收方需要反序列化).setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic("testtopic01")// 指定value的序列化器.setValueSerializationSchema(new SimpleStringSchema()).build())

源码解析

    public <T extends IN> KafkaRecordSerializationSchemaBuilder<T> setValueSerializationSchema(SerializationSchema<T> valueSerializationSchema) {this.checkValueSerializerNotSet();KafkaRecordSerializationSchemaBuilder<T> self = this.self();self.valueSerializationSchema = (SerializationSchema)Preconditions.checkNotNull(valueSerializationSchema);return self;}

使用自定义的序列化器

KafkaSink<String> kafkaSink = KafkaSink.<String>builder()// TODO 必填项:配置 kafka 的地址和端口.setBootstrapServers("localhost:9092")// TODO 必填项:配置消息序列化器信息 Topic名称、消息序列化器类型.setRecordSerializer(new KafkaRecordSerializationSchema<String>() {...............}).build();

四、容错保证级别

KafkaSink 总共支持三种不同的语义保证(DeliveryGuarantee)

启用 Flink 的 checkpointing 后,FlinkKafkaProducer 可以提供精确一次的语义保证。

除了启用 Flink 的 checkpointing,你也可以通过将适当的 semantic 参数传递给 FlinkKafkaProducer 来选择三种不同的操作模式:

Semantic.NONE:Flink 不会有任何语义的保证,产生的记录可能会丢失或重复。
Semantic.AT_LEAST_ONCE(默认设置):可以保证不会丢失任何记录(但是记录可能会重复)
Semantic.EXACTLY_ONCE:使用 Kafka 事务提供精确一次语义。无论何时,在使用事务写入 Kafka 时,都要记得为所有消费 Kafka 消息的应用程序设置所需的 isolation.level(read_committed 或 read_uncommitted - 后者是默认值)。

注意事项
Semantic.EXACTLY_ONCE 模式依赖于事务提交的能力。事务提交发生于触发 checkpoint 之前,以及从 checkpoint 恢复之后。如果从 Flink 应用程序崩溃到完全重启的时间超过了 Kafka 的事务超时时间,那么将会有数据丢失(Kafka 会自动丢弃超出超时时间的事务)。考虑到这一点,请根据预期的宕机时间来合理地配置事务超时时间。

默认情况下,Kafka broker 将 transaction.max.timeout.ms 设置为 15 分钟。此属性不允许为大于其值的 producer 设置事务超时时间。 默认情况下,FlinkKafkaProducer 将 producer config 中的 transaction.timeout.ms 属性设置为 1 小时,因此在使用 Semantic.EXACTLY_ONCE 模式之前应该增加 transaction.max.timeout.ms 的值。

在 KafkaConsumer 的 read_committed 模式中,任何未结束(既未中止也未完成)的事务将阻塞来自给定 Kafka topic 的未结束事务之后的所有读取数据。 换句话说,在遵循如下一系列事件之后:

用户启动了 transaction1 并使用它写了一些记录
用户启动了 transaction2 并使用它编写了一些其他记录
用户提交了 transaction2
即使 transaction2 中的记录已提交,在提交或中止 transaction1 之前,消费者也不会看到这些记录。这有 2 层含义:

首先,在 Flink 应用程序的正常工作期间,用户可以预料 Kafka 主题中生成的记录的可见性会延迟,相当于已完成 checkpoint 之间的平均时间。
其次,在 Flink 应用程序失败的情况下,此应用程序正在写入的供消费者读取的主题将被阻塞,直到应用程序重新启动或配置的事务超时时间过去后,才恢复正常。此标注仅适用于有多个 agent 或者应用程序写入同一 Kafka 主题的情况。
注意:Semantic.EXACTLY_ONCE 模式为每个 FlinkKafkaProducer 实例使用固定大小的 KafkaProducer 池。每个 checkpoint 使用其中一个 producer。如果并发 checkpoint 的数量超过池的大小,FlinkKafkaProducer 将抛出异常,并导致整个应用程序失败。请合理地配置最大池大小和最大并发 checkpoint 数量。

注意:Semantic.EXACTLY_ONCE 会尽一切可能不留下任何逗留的事务,否则会阻塞其他消费者从这个 Kafka topic 中读取数据。但是,如果 Flink 应用程序在第一次 checkpoint 之前就失败了,那么在重新启动此类应用程序后,系统中不会有先前池大小(pool size)相关的信息。因此,在第一次 checkpoint 完成前对 Flink 应用程序进行缩容,且并发数缩容倍数大于安全系数 FlinkKafkaProducer.SAFE_SCALE_DOWN_FACTOR 的值的话,是不安全的。同样,在这种情况使用 setTransactionalIdPrefix() 改变 transactional.id 也是不安全的,因为系统也不知道先前使用的 transactional.id 前缀。

五、案例—Flink将Socket数据写入Kafka(精准一次)

注意:如果要使用 精准一次 写入 Kafka,需要满足以下条件,缺一不可
1、开启 checkpoint
2、设置事务前缀
3、设置事务超时时间: checkpoint 间隔 < 事务超时时间 < max 的 15 分钟

package com.flink.DataStream.Sink;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.Properties;public class flinkSinkKafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();streamExecutionEnvironment.setParallelism(1);// 如果是精准一次,必须开启 checkpointstreamExecutionEnvironment.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);DataStreamSource<String> streamSource = streamExecutionEnvironment.socketTextStream("localhost", 8888);/*** TODO Kafka Sink* TODO 注意:如果要使用 精准一次 写入 Kafka,需要满足以下条件,缺一不可* 1、开启 checkpoint* 2、设置事务前缀* 3、设置事务超时时间: checkpoint 间隔 < 事务超时时间 < max 的 15 分钟*/Properties properties=new Properties();properties.put("transaction.timeout.ms",10 * 60 * 1000 + "");KafkaSink<String> kafkaSink = KafkaSink.<String>builder()// 指定 kafka 的地址和端口.setBootstrapServers("localhost:9092")// 指定序列化器:指定Topic名称、具体的序列化(产生方需要序列化,接收方需要反序列化).setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic("testtopic01")// 指定value的序列化器.setValueSerializationSchema(new SimpleStringSchema()).build())// 写到 kafka 的一致性级别: 精准一次、至少一次.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// 如果是精准一次,必须设置 事务的前缀.setTransactionalIdPrefix("flinkkafkasink-")// 如果是精准一次,必须设置 事务超时时间: 大于 checkpoint间隔,小于 max 15 分钟.setKafkaProducerConfig(properties)//.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "").build();streamSource.sinkTo(kafkaSink);streamExecutionEnvironment.execute();}
}

理解ProduceerConfig配置源码
在这里插入图片描述

六、启动Zookeeper、Kafka

#启动zookeeper
${ZK_HOME}/bin/zkServer.sh start
#查看zookeeper状态
${ZK_HOME}/bin/zkServer.sh status
#启动kafka
${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties
#查看topic
${KAFKA_HOME}/bin/kafka-topics.sh --list --zookeeper localhost:2181
#创建topic
${KAFKA_HOME}/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic testtopic02 --partitions 2 --replication-factor 1
#删除topic
${KAFKA_HOME}/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic testtopic02
#生产消息
${KAFKA_HOME}/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testtopic01
#消费消息
${KAFKA_HOME}/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testtopic01 --from-beginning

通过socket模拟数据写入Flink之后,Flink将数据写入Kafka
在这里插入图片描述

相关文章:

Flink Flink数据写入Kafka

一、环境准备 官网地址 flink官方集成了通用的 Kafka 连接器&#xff0c;使用时需要根据生产环境的版本引入相应的依赖 <properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.14.6</flink.version&g…...

《论文阅读》用于情绪回复生成的情绪正则化条件变分自动编码器 Affective Computing 2021

《论文阅读》用于情绪回复生成的情绪正则化条件变分自动编码器 前言简介模型结构实验结果总结前言 今天为大家带来的是《Emotion-Regularized Conditional Variational Autoencoder for Emotional Response Generation》 出版:IEEE Transactions on Affective Computing 时间…...

Pytorch CIFAR10图像分类 Swin Transformer篇

Pytorch CIFAR10图像分类 Swin Transformer篇 文章目录 Pytorch CIFAR10图像分类 Swin Transformer篇4. 定义网络&#xff08;Swin Transformer&#xff09;Swin Transformer整体架构Patch MergingW-MSASW-MSARelative position biasSwin Transformer 网络结构Patch EmbeddingP…...

【vim】常用操作

用的时候看看&#xff0c;记太多也没用&#xff0c;下面都是最常用的&#xff0c;更多去查文档vim指令集。 以下均为正常模式下面操作&#xff0c;正在编辑的&#xff0c;先etc一下. 1/拷贝当前行 yy&#xff0c;5yy为拷贝包含当前行往下五行 2/p将拷贝的东西粘贴到当前行下…...

oracle、误操作删除数据库 数据恢复。

–查询 执行 delete 的语句 &#xff0c;拿到删除的时间 FIRST_LOAD_TIME &#xff0c;删除行数可参考 ROWS_PROCESSED select t.FIRST_LOAD_TIME,t.ROWS_PROCESSED,t.* from v$sql t where t.sql_text like %delete from trade% ;select *from trade as of timestamp to_time…...

【Angular开发】Angular在2023年之前不是很好

做一个简单介绍&#xff0c;年近48 &#xff0c;有20多年IT工作经历&#xff0c;目前在一家500强做企业架构&#xff0e;因为工作需要&#xff0c;另外也因为兴趣涉猎比较广&#xff0c;为了自己学习建立了三个博客&#xff0c;分别是【全球IT瞭望】&#xff0c;【架构师酒馆】…...

记录 | 报错:libssl-dev : 依赖: libssl3 (= 3.0.8-1ubuntu1.1) 但是 3.0.8-1ubuntu1.2 正要被安装

ubuntu 上安装 libssl-dev 失败的报错解决 报错&#xff1a; 下列软件包有未满足的依赖关系&#xff1a; libssl-dev : 依赖: libssl3 ( 3.0.8-1ubuntu1.1) 但是 3.0.8-1ubuntu1.2 正要被安装 E: 无法修正错误&#xff0c;因为您要求某些软件包保持现状&#xff0c;就是它们破…...

MySQL联合查询、最左匹配、范围查询导致失效

服务器版本 客户端&#xff1a;navicat premium16.0.11 联合索引 假设有如下表 联合索引就是同时把多列设成索引&#xff0c;如(empno&#xff0c;ename)在查询的时候就会先按照empno进行查询&#xff0c;再按照ename进行查询其中empno是全局有序&#xff0c;ename是局部有…...

部署zabbix

源码下载地址&#xff1a; Download Zabbix sources nginx: download 防火墙和selinux都需要关闭 1、部署监控服务器 1&#xff09;安装LNMP环境 Zabbix监控管理控制台需要通过Web页面展示出来&#xff0c;并且还需要使用MySQL来存储数据&#xff0c;因此需要先为Zabbix准备基础…...

服务器感染了.locked、.locked1勒索病毒,如何确保数据文件完整恢复?

尊敬的读者&#xff1a; locked、.locked1勒索病毒的威胁如影随形&#xff0c;深刻影响着数字世界的安全。本文将深入揭示locked、.locked1的狡猾特征&#xff0c;为您提供实用的数据恢复方法&#xff0c;并分享一系列特别定制的预防措施&#xff0c;旨在使您的数字生活摆脱勒…...

【Linux系统化学习】命令行参数 | 环境变量的再次理解

个人主页点击直达&#xff1a;小白不是程序媛 Linux专栏&#xff1a;Linux系统化学习 代码仓库&#xff1a;Gitee 目录 mian函数传参获取环境变量 手动添加环境变量 导出环境变量 environ获取环境变量 本地变量和环境变量的区别 Linux的命令分类 常规命令 内建命令 …...

【STM32】TIM定时器编码器

1 编码器接口简介 Encoder Interface 编码器接口 编码器接口可接收增量&#xff08;正交&#xff09;编码器的信号&#xff0c;根据编码器旋转产生的正交信号脉冲&#xff0c;自动控制CNT自增或自减&#xff0c;从而指示编码器的位置、旋转方向和旋转速度 接收正交信号&#…...

力扣44题通配符匹配题解

44. 通配符匹配 - 力扣&#xff08;LeetCode&#xff09; 给你一个输入字符串 (s) 和一个字符模式 (p) &#xff0c;请你实现一个支持 ? 和 * 匹配规则的通配符匹配&#xff1a; ? 可以匹配任何单个字符。* 可以匹配任意字符序列&#xff08;包括空字符序列&#xff09;。 …...

windows系统安装RocketMQ_dashboard

1.下载源码 按照官网说明下载源码 官网 官网文档 2.源码安装 2.1.① 编译rocketmq-dashboard 注释掉报错的maven插件frontend-maven-plugin、maven-antrun-plugin mvn clean package -Dmaven.test.skiptrue2.2.② 运行rocketmq-dashboard java -jar target/rocketmq-…...

ATECLOUD电源自动测试系统打破传统 助力新能源汽车电源测试

随着新能源汽车市场的逐步扩大&#xff0c;技术不断完善提升&#xff0c;新能源汽车测试变得越来越复杂&#xff0c;测试要求也越来越严格。作为新能源汽车的关键部件之一&#xff0c;电源为各个器件和整个电路提供稳定的电源&#xff0c;满足需求&#xff0c;确保新能源汽车的…...

如何教会小白使用淘宝API接口获取商品数据

随着互联网的普及&#xff0c;越来越多的人开始接触网络购物&#xff0c;而淘宝作为中国最大的电商平台之一&#xff0c;成为了众多消费者首选的购物平台。然而&#xff0c;对于一些小白用户来说&#xff0c;如何通过淘宝API接口获取商品数据可能是一个难题。本文将详细介绍如何…...

Redis有序集合对象

一.编码 有序集合的编码可以是ziplist或者skiplist。 ziplist编码的有序集合对象使用压缩列表作为底层实现&#xff0c;每一个集合元素使用紧挨在一起的两个压缩列表节点来保存。第一个节点保存元素的成员(member)&#xff0c;而第二个元素则保存元素的分值(score)。 127.0.0.…...

【C++数据结构 | 字符串速通】10分钟秒杀字符串相关操作 | 字符串的增删改查 | 字符串与数组相互转换

字符串 by.Qin3Yu 文中所有代码默认已使用std命名空间且已导入部分头文件&#xff1a; #include <iostream> #include <string> using namespace std;概念速览 字符串是一种非常好理解的数据类型&#xff0c;它用于存储和操作文本数据。字符串可以包含任意字符…...

运动重定向:C-3PO

C-3PO: Cyclic-Three-Phase Optimization for Human-Robot Motion Retargeting based on Reinforcement Learning解析 摘要1. 简介2. 相关工作2.1 运动重定向&#xff08;Motion Retargeting&#xff09;2.2 强化学习&#xff08;Reinforcement Learning&#xff09; 3. 预备知…...

天池SQL训练营(四)-集合运算-表的加减法和join等

-天池龙珠计划SQL训练营 4.1表的加减法 4.1.1 什么是集合运算 集合在数学领域表示“各种各样的事物的总和”, 在数据库领域表示记录的集合. 具体来说,表、视图和查询的执行结果都是记录的集合, 其中的元素为表或者查询结果中的每一行。 在标准 SQL 中, 分别对检索结果使用 U…...

大数据学习栈记——Neo4j的安装与使用

本文介绍图数据库Neofj的安装与使用&#xff0c;操作系统&#xff1a;Ubuntu24.04&#xff0c;Neofj版本&#xff1a;2025.04.0。 Apt安装 Neofj可以进行官网安装&#xff1a;Neo4j Deployment Center - Graph Database & Analytics 我这里安装是添加软件源的方法 最新版…...

进程地址空间(比特课总结)

一、进程地址空间 1. 环境变量 1 &#xff09;⽤户级环境变量与系统级环境变量 全局属性&#xff1a;环境变量具有全局属性&#xff0c;会被⼦进程继承。例如当bash启动⼦进程时&#xff0c;环 境变量会⾃动传递给⼦进程。 本地变量限制&#xff1a;本地变量只在当前进程(ba…...

STM32+rt-thread判断是否联网

一、根据NETDEV_FLAG_INTERNET_UP位判断 static bool is_conncected(void) {struct netdev *dev RT_NULL;dev netdev_get_first_by_flags(NETDEV_FLAG_INTERNET_UP);if (dev RT_NULL){printf("wait netdev internet up...");return false;}else{printf("loc…...

关于nvm与node.js

1 安装nvm 安装过程中手动修改 nvm的安装路径&#xff0c; 以及修改 通过nvm安装node后正在使用的node的存放目录【这句话可能难以理解&#xff0c;但接着往下看你就了然了】 2 修改nvm中settings.txt文件配置 nvm安装成功后&#xff0c;通常在该文件中会出现以下配置&…...

【磁盘】每天掌握一个Linux命令 - iostat

目录 【磁盘】每天掌握一个Linux命令 - iostat工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景 注意事项 【磁盘】每天掌握一个Linux命令 - iostat 工具概述 iostat&#xff08;I/O Statistics&#xff09;是Linux系统下用于监视系统输入输出设备和CPU使…...

Java - Mysql数据类型对应

Mysql数据类型java数据类型备注整型INT/INTEGERint / java.lang.Integer–BIGINTlong/java.lang.Long–––浮点型FLOATfloat/java.lang.FloatDOUBLEdouble/java.lang.Double–DECIMAL/NUMERICjava.math.BigDecimal字符串型CHARjava.lang.String固定长度字符串VARCHARjava.lang…...

在Ubuntu中设置开机自动运行(sudo)指令的指南

在Ubuntu系统中&#xff0c;有时需要在系统启动时自动执行某些命令&#xff0c;特别是需要 sudo权限的指令。为了实现这一功能&#xff0c;可以使用多种方法&#xff0c;包括编写Systemd服务、配置 rc.local文件或使用 cron任务计划。本文将详细介绍这些方法&#xff0c;并提供…...

高防服务器能够抵御哪些网络攻击呢?

高防服务器作为一种有着高度防御能力的服务器&#xff0c;可以帮助网站应对分布式拒绝服务攻击&#xff0c;有效识别和清理一些恶意的网络流量&#xff0c;为用户提供安全且稳定的网络环境&#xff0c;那么&#xff0c;高防服务器一般都可以抵御哪些网络攻击呢&#xff1f;下面…...

全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比

目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec&#xff1f; IPsec VPN 5.1 IPsec传输模式&#xff08;Transport Mode&#xff09; 5.2 IPsec隧道模式&#xff08;Tunne…...

深入浅出深度学习基础:从感知机到全连接神经网络的核心原理与应用

文章目录 前言一、感知机 (Perceptron)1.1 基础介绍1.1.1 感知机是什么&#xff1f;1.1.2 感知机的工作原理 1.2 感知机的简单应用&#xff1a;基本逻辑门1.2.1 逻辑与 (Logic AND)1.2.2 逻辑或 (Logic OR)1.2.3 逻辑与非 (Logic NAND) 1.3 感知机的实现1.3.1 简单实现 (基于阈…...