Flink 1.20 Kafka Connector:新旧 API 深度解析与迁移指南
Flink Kafka Connector 新旧 API 深度解析与迁移指南
一、Flink Kafka Connector 演进背景
Apache Flink 作为实时计算领域的标杆框架,其 Kafka 连接器的迭代始终围绕性能优化、语义增强和API 统一展开。Flink 1.20 版本将彻底弃用基于 FlinkKafkaConsumer/FlinkKafkaProducer 的旧 API(标记为 @Deprecated),全面转向基于新 Source/Sink API(FLIP-27/FLIP-143)的 KafkaSource/KafkaSink。这一变革不仅带来了架构上的革新,更通过流批统一、精确一次语义和动态分区管理等特性,显著提升了用户体验。
二、新旧 API 核心差异对比
Apache Flink 在 1.13+ 版本逐步引入了全新的 Source/Sink API(也称为 FLIP-27 架构),取代了旧的 SourceFunction/SinkFunction 架构。这一变化旨在解决旧架构在扩展性、批流统一、状态管理等方面的局限性。
1. 新架构实现
这里我们从新 Source 举例,来了解新架构及实现原理:
开始了解 - 新 Source 原理 :
Flink 原先数据源一直使用的是 SourceFunction。实现它的 run 方法,使用 SourceContextcollect 数据或者发送 watermark 就实现了一个数据源。但是它有如下问题(来源于FLIP-27: Refactor Source Interface - Apache Flink - Apache Software Foundation翻译):
-
同一类型数据源的批和流模式需要两套不同实现。
-
“work发现”(分片、分区等)和实际 “读取” 数据的逻辑混杂在 SourceFunction 接口和 DataStream API 中,导致实现非常复杂,如 Kafka 和 Kinesis 源等。
-
分区/分片/拆分在接口中不是明确的。这使得以与 source 无关的方式实现某些功能变得困难,例如 event time 对齐、每个分区水印、动态拆分分配、工作窃取。例如,Kafka 和 Kinesis consumer 支持每个分区的 watermark,但从 Flink 1.8.1 开始,只有 Kinesis 消费者支持 event time 对齐(选择性地从拆分中读取以确保我们在事件时间上均匀推进)。
-
Checkpoint 锁由 source function “拥有”。实现必须确保进行元素发送和 state 更新时加锁。 Flink 无法优化它处理该锁的方式。
锁不是公平锁。在锁竞争下,一些线程可能无法获得锁(checkpoint线程)。这也妨碍使用 actor/mailbox 无锁线程模型。 -
没有通用的构建块,这意味着每个源都自己实现了一个复杂的线程模型。这使得实施和测试新 source 变得困难,并增加了对现有 source 的开发贡献的标准。
为了解决这些问题,Flink 引入了新的 Source 架构。
一个数据 source 包括三个核心组件:分片(Splits)、分片枚举器(SplitEnumerator) 以及 源阅读器(SourceReader)。
-
分片(Split) 是对一部分 source 数据的包装,如一个文件或者日志分区。分片是 source 进行任务分配和数据并行读取的基本粒度。
-
源阅读器(SourceReader) 会请求分片并进行处理,例如读取分片所表示的文件或日志分区。SourceReader 在 TaskManagers 上的 SourceOperators 并行运行,并产生并行的事件流/记录流。
-
分片枚举器(SplitEnumerator) 会生成分片并将它们分配给 SourceReader。该组件在 JobManager 上以单并行度运行,负责对未分配的分片进行维护,并以均衡的方式将其分配给 reader。
Source 类作为API入口,将上述三个组件结合在了一起。

参考原文内容:https://blog.csdn.net/bigdatakenan/article/details/141064429
总的来说,Flink 新 Source/Sink 架构的本质是通过组件解耦和动态分片机制,以实现更加灵活、精细化的资源管理。
2. 核心类与依赖
| 特性 | 旧 API(1.12 之前) | 新 API(1.13+) |
|---|---|---|
| Source 实现类 | FlinkKafkaConsumer | KafkaSource |
| Sink 实现类 | FlinkKafkaProducer | KafkaSink |
| 依赖坐标 示例 | flink-connector-kafka_2.11:1.12 | flink-connector-kafka:1.20 |
| 批流统一 | 需要不同 API | 同一 API 支持流批 |
| 资源效率 | 静态并行度 | 动态分片分配 |
| 并发与锁管理 | 全局锁导致高竞争 | 分片级并发 |
3. 关键功能介绍
(1)动态分区发现
旧 API 通过 flink.partition-discovery.interval-millis 配置分区发现间隔。
新 API 通过 partition.discovery.interval.ms 配置分区发现间隔。
// 设置动态分区发现,间隔为 10 秒
// 旧 API
Properties props = new Properties();
...
props.setProperty("flink.partition-discovery.interval-millis", "10000");
FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>("topic-name",new SimpleStringSchema(),props
);// 新 API
KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(Config.KAFKA_SERVER).setValueOnlyDeserializer(new SimpleStringSchema())....setProperty("partition.discovery.interval.ms", "10000") .build();
(2)起始偏移量控制
旧 API 通过 auto.offset.reset 参数 或 flinkKafkaConsumer.setStartFromEarliest()的方法配置 offset 。
新 API 提供更细粒度的 offset 控制:
// 旧 API
Properties props = new Properties();
props.setProperty("auto.offset.reset", "earliest");
FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>("topic-name",new SimpleStringSchema(),props
);
// 或
flinkKafkaConsumer.setStartFromEarliest();// 新 API
// 从最早可用偏移量(earliest offset)开始消费,忽略消费者组已提交的偏移量。
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.earliest()).build();// 优先使用消费者组已提交的偏移量(若存在),如果无提交的偏移量(如首次启动消费者组),则回退到 EARLIEST 偏移量
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)).build();// 从大于或等于指定时间戳(Unix 毫秒时间戳)的 Kafka 消息开始消费
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.timestamp(1657256257000L)).build();
(3)事务性写入
相较于旧 API ,新 API 配置事务方式更加简洁和易读。
Properties props = new Properties();
...
props.setProperty("enable.idempotence", true);
props.setProperty("transaction.timeout.ms", "900000"); // 旧 API Exactly-Once Sink
FlinkKafkaProducer<String> flinkKafkaProducer = new FlinkKafkaProducer<>("topic",new SimpleStringSchema(),props,FlinkKafkaProducer.Semantic.EXACTLY_ONCE // 旧 API 语义配置)
flinkKafkaProducer.setTransactionalIdPrefix("flink-transactional-id-");// 新 API Exactly-Once Sink
KafkaSink<String> sink = KafkaSink.<String>builder().setBootstrapServers(Config.KAFKA_SERVER).setRecordSerializer(...).setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // 新 API 语义配置.setTransactionalIdPrefix("flink-transactional-id-").setKafkaProducerConfig(props).build();
三、迁移实战指南
1. 依赖升级
<!-- 旧 API 依赖 -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.12.x</version>
</dependency><!-- 新 API 依赖 -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.20.x</version>
</dependency>
2. Source 迁移示例(ConsumerRecord 版)
(1)旧 API 实现
Properties properties = new Properties();
properties.setProperty("consumer.topic", "test_123");
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "group_id_001");
properties.setProperty("auto.offset.reset", "earliest");FlinkKafkaConsumer<ConsumerRecord<byte[], byte[]>> flinkKafkaConsumer = new FlinkKafkaConsumer<>(properties.getProperty("consumer.topic"),new KafkaDeserializationSchema<ConsumerRecord<byte[], byte[]>>(){@Overridepublic TypeInformation getProducedType() {return TypeInformation.of(new TypeHint<ConsumerRecord<byte[], byte[]>>() {});}@Overridepublic ConsumerRecord<byte[], byte[]> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {return new ConsumerRecord<>(record.topic(),record.partition(),record.offset(),record.timestamp(),record.timestampType(),record.checksum(),record.serializedKeySize(),record.serializedValueSize(),record.key(),record.value());}@Overridepublic boolean isEndOfStream(ConsumerRecord<byte[], byte[]> nextElement) {return false;}},properties
);env.addSource(flinkKafkaConsumer);
(2)新 API 实现
KafkaSource<ConsumerRecord<String, String>> source = KafkaSource.<ConsumerRecord<String, String>>builder().setBootstrapServers(Config.KAFKA_SERVER).setTopics(Config.KAFKA_TOPIC).setGroupId(Config.KAFKA_GROUP_ID).setStartingOffsets(OffsetsInitializer.earliest()) .setDeserializer(KafkaRecordDeserializationSchema.of(new KafkaDeserializationSchema<ConsumerRecord<String, String>>() {@Overridepublic TypeInformation<ConsumerRecord<String, String>> getProducedType() {return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>() {});}@Overridepublic boolean isEndOfStream(ConsumerRecord<String, String> nextElement) {return false;}@Overridepublic ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {return new ConsumerRecord<String, String>(record.topic(),record.partition(),record.offset(),record.timestamp(),record.timestampType(),record.checksum(),record.serializedKeySize(),record.serializedValueSize(),record.key() == null ? "" : new String(record.key(), StandardCharsets.UTF_8),record.value() == null ? "" : new String(record.value(), StandardCharsets.UTF_8));}})).build();env.fromSource(source, WatermarkStrategy.noWatermarks(), "New Flink Kafka Source");
3. Sink 迁移示例(ProducerRecord 版)
(1)旧 API 实现
Properties props = new Properties();
properties.setProperty("producer.topic", "test_123");
props.setProperty("bootstrap.servers", "localhost:9092");FlinkKafkaProducer<byte[]> flinkKafkaProducer = new FlinkKafkaProducer<>(properties.getProperty("producer.topic"),new KafkaSerializationSchema<byte[]>() {@Overridepublic void open(SerializationSchema.InitializationContext context) throws Exception {KafkaSerializationSchema.super.open(context);}@Overridepublic ProducerRecord<byte[], byte[]> serialize(byte[] element, @Nullable Long timestamp) {return new ProducerRecord<>(properties.getProperty("producer.topic"),"my_key_id".getBytes(StandardCharsets.UTF_8),element);}},properties,FlinkKafkaProducer.Semantic.AT_LEAST_ONCE
);env.fromElements("apple", "banana", "orange").map(i -> i.getBytes(StandardCharsets.UTF_8)).addSink(flinkKafkaProducer);
(2)新 API 实现
KafkaSink<String> sink = KafkaSink.<String>builder().setBootstrapServers(Config.KAFKA_SERVER).setRecordSerializer((KafkaRecordSerializationSchema<String>) (element, context, timestamp) -> {String keyId = "my_key_id";byte[] key = keyId.getBytes(StandardCharsets.UTF_8); // 指定 keybyte[] value = element.getBytes(StandardCharsets.UTF_8); // 指定 valuereturn new ProducerRecord<>(Config.KAFKA_TOPIC, key, value);}).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();env.fromElements("apple", "banana", "orange").sinkTo(sink);
四、总结与展望
通过Flink KafkaSource/KafkaSink 的连接器 API,用户在使用时不仅获得了更简洁的编程模型,还享受到更便捷的动态分区管理、精确一次语义和性能优化等高级特性。建议用户尽早迁移至新 API,以充分利用 Flink 1.20 的增强功能。
Flink 1.20 版本的发布标志着 Flink 在流批一体和云原生架构方面迈出了重要的一步,进一步巩固了其作为实时流数据处理平台的地位。这一版本在功能上进行了诸多增强,还在性能和易用性方面做出了显著改进,为用户提供了更强大的工具来处理复杂的流批任务。
Flink 1.20 核心改进总结【官宣|Apache Flink 1.20 发布公告】:
-
Flink SQL 物化表(Materialized Table)
- 功能特性
- 通过声明式 SQL 定义动态表结构与数据新鲜度(如
FRESHNESS = INTERVAL '3' MINUTE),引擎自动构建流批统一的数据加工链路。 - 支持流式持续刷新、批式全量刷新、增量刷新三种模式,用户可根据成本灵活切换(如大促时秒级实时,日常天级批处理)。
- 简化运维操作:暂停/恢复数据刷新(
SUSPEND/RESUME)、手动回刷历史分区(REFRESH PARTITION)。
- 通过声明式 SQL 定义动态表结构与数据新鲜度(如
- 价值
- 降低 ETL 开发复杂度,无需分别维护流/批作业,提升实时数仓构建效率。
- 功能特性
-
状态与检查点优化
- 统一检查点文件合并机制
- 将零散小文件合并为大文件,减少元数据压力,需配置
execution.checkpointing.file-merging.enabled=true。 - 支持跨检查点合并(
execution.checkpointing.file-merging.across-checkpoint-boundary=true)及文件池模式选择(阻塞/非阻塞)。
- 将零散小文件合并为大文件,减少元数据压力,需配置
- RocksDB 优化
- 后台自动合并小 SST 文件,避免因文件数量膨胀导致检查点失败。
- 统一检查点文件合并机制
-
批处理容错能力增强
- JobMaster 故障恢复机制
- 通过
JobEventStore持久化执行状态(如任务进度、算子协调器状态),故障后从外部存储恢复进度,避免重跑已完成任务。 - 需启用集群高可用(HA)并配置
execution.batch.job-recovery.enabled=true。
- 通过
- JobMaster 故障恢复机制
-
API 演进
- DataSet API 弃用
- 推荐迁移至 DataStream API 或 Table API/SQL,实现流批统一编程模型。
- DataStream API 增强
- 支持全量分区数据处理(
fullWindowPartition),补齐批处理能力。
- 支持全量分区数据处理(
- DataSet API 弃用
相关文章:
Flink 1.20 Kafka Connector:新旧 API 深度解析与迁移指南
Flink Kafka Connector 新旧 API 深度解析与迁移指南 一、Flink Kafka Connector 演进背景 Apache Flink 作为实时计算领域的标杆框架,其 Kafka 连接器的迭代始终围绕性能优化、语义增强和API 统一展开。Flink 1.20 版本将彻底弃用基于 FlinkKafkaConsumer/FlinkK…...
2025年渗透测试面试题总结- 某四字大厂面试复盘扩展 一面(题目+回答)
网络安全领域各种资源,学习文档,以及工具分享、前沿信息分享、POC、EXP分享。不定期分享各种好玩的项目及好用的工具,欢迎关注。 目录 某四字大厂面试复盘扩展 一面 一、Java内存马原理与查杀 二、冰蝎与哥斯拉原理对比(技术演…...
批量压缩 jpg/png 等格式照片|批量调整图片的宽高尺寸
图片格式种类非常的多,并且不同的图片由于像素、尺寸不一样,可能占用的空间也会不一样。文件太大会占用较多的磁盘空间,传输及上传系统都非常不方便,可能会收到限制,因此我们经常会碰到需要对图片进行压缩的需求。如何…...
目录穿越 + pickle反序列化 -- xyctf Signin WP
源代码 # -*- encoding: utf-8 -*-File : main.py Time : 2025/03/28 22:20:49 Author : LamentXUflag in /flag_{uuid4}from bottle import Bottle, request, response, redirect, static_file, run, route secret aapp Bottle() route(/) def index():return…...
Spring Boot 框架注解:@ConfigurationProperties
ConfigurationProperties(prefix "sky.jwt") 是 Spring Boot 框架里的一个注解,其主要功能是把配置文件(像 application.properties 或者 application.yml)里的属性值绑定到一个 Java 类的字段上。下面详细阐述其作用:…...
【动手学深度学习】卷积神经网络(CNN)入门
【动手学深度学习】卷积神经网络(CNN)入门 1,卷积神经网络简介2,卷积层2.1,互相关运算原理2.2,互相关运算实现2.3,实现卷积层 3,卷积层的简单应用:边缘检测3.1࿰…...
在huggingface上制作小demo
在huggingface上制作小demo 今天好兄弟让我帮他搞一个模型,他有小样本的化学数据,想让我根据这些数据训练一个小模型,他想用这个模型预测一些值 最终我简单训练了一个小模型,起初想把这个模型和GUI界面打包成exe发给他࿰…...
集合学习内容总结
集合简介 1、Scala 的集合有三大类:序列 Seq、集Set、映射 Map,所有的集合都扩展自 Iterable 特质。 2、对于几乎所有的集合类,Scala 都同时提供了可变和不可变的版本,分别位于以下两个包 不可变集合:scala.collect…...
51.评论日记
千万不能再挖了,否则整个华夏文明将被改写。_哔哩哔哩_bilibili 2025年4月7日22:13:42...
SpringCloud第二篇:注册中心Eureka
注册中心的意义 注册中心 管理各种服务功能包括服务的注册、发现、熔断、负载、降级等,比如dubbo admin后台的各种功能。 有了注册中心,调用关系的变化,画几个简图来看一下。(了解源码可求求: 1791743380) 服务A调用服务B 有了注册中心之后&a…...
ES 参数调优
1、refresh_interval 控制索引刷新的时间间隔。增大这个值可以减少I/O操作,从而提升写入性能,但会延迟新文档的可见性 查看 GET /content_erp_nlp_help_202503191453/_settings?include_defaultstrue 动态修改:refresh_interval 是一个动态…...
用claude3.7,不到1天写了一个工具小程序(11个工具6个游戏)
一、功能概览和本文核心 本次开发,不是1天干撸,而是在下班后或早起搞的,总体加和计算了一下,大概1天的时间(12个小时),平常下班都是9点的衰仔,好在还有双休,谢天谢地。 …...
【GeoDa使用】空间自相关分析操作
使用 GeoDa 软件进行空间自相关分析 双击打开 GeoDa 软件 选择 .shp 文件 导入文件 空间权重矩阵(*.gal / *.gwt)是进行任何空间分析的前提 构建空间权重矩阵 空间权重矩阵(Spatial Weights Matrix) 是一个用来描述空间对象之间…...
什么是数据
一、数据的本质定义 哲学视角 亚里士多德《形而上学》中"未加工的观察记录"现代认知科学:人类感知系统接收的原始刺激信号(如视网膜光信号、听觉神经电信号)信息论奠基人香农:消除不确定性的度量载体 …...
C++基于rapidjson的Json与结构体互相转换
简介 使用rapidjson库进行封装,实现了使用C对结构体数据和json字符串进行互相转换的功能。最短只需要使用两行代码即可无痛完成结构体数据转换为Json字符串。 支持std::string、数组、POD数据(int,float,double等)、std::vector、嵌套结构体…...
OpenStack Yoga版安装笔记(十七)安全组笔记
一、安全组与iptables的关系 OpenStack的安全组(Security Group)默认是通过Linux的iptables实现的。以下是其主要实现原理和机制: 安全组与iptables的关系 OpenStack的安全组规则通过iptables的规则链实现。每条安全组规则会被转换为相应的i…...
通义万相2.1 图生视频:为AI绘梦插上翅膀,开启ALGC算力领域新纪元
通义万相2.1图生视频大模型 通义万相2.1图生视频技术架构万相2.1的功能特点性能优势与其他工具的集成方案 蓝耘平台部署万相2.1核心目标典型应用场景未来发展方向 通义万相2.1ALGC实战应用操作说明功能测试 为什么选择蓝耘智算蓝耘智算平台的优势如何通过API调用万相2.1 写在最…...
Debezium日常分享系列之:Debezium3.1版本之增量快照
Debezium日常分享系列之:Debezium3.1版本之增量快照 按需快照触发一次临时增量快照触发临时阻塞快照增量快照增量快照过程如何 Debezium 解决具有相同主键的记录之间的冲突快照窗口触发增量快照使用附加条件运行临时增量快照使用 Kafka 信号通道触发增量快照临时增量…...
聊聊Spring AI的RedisVectorStore
序 本文主要研究一下Spring AI的RedisVectorStore 示例 pom.xml <dependency><groupId>org.springframework.ai</groupId><artifactId>spring-ai-starter-vector-store-redis</artifactId> </dependency>配置 spring:ai:vectorstore:…...
Diffusion Policy Visuomotor Policy Learning via Action Diffusion官方项目解读(二)(4)
运行官方代码库中提供的Colab代码:vision-based environment(二)(4) 十六、函数unnormalize_data,继承自torch.utils.data.Dataset十六.1 def __init__()十六.2 def __len__ ()十六.3 def __getitem__()总体…...
52.个人健康管理系统小程序(基于springbootvue)
目录 1.系统的受众说明 2.开发环境与技术 2.1 MYSQL数据库 2.2 Java语言 2.3 微信小程序技术 2.4 SpringBoot框架 2.5 B/S架构 2.6 Tomcat 介绍 2.7 HTML简介 2.8 MyEclipse开发工具 3.系统分析 3.1 可行性分析 3.1.1 技术可行性 3.1.2 经济可行性 3.1.3 操作…...
学习比较JVM篇(六):解读GC日志
一、前言 在之前的文章中,我们对JVM的结构、垃圾回收算法、垃圾回收器做了一些列的讲解,同时也使用了JVM自带的命令行工具进行了实际操作。今天我们继续讲解JVM。 我们学习JVM的目的是为了了解JVM,然后优化对应的参数。那么如何了解JVM运行…...
I²S协议概述与信号线说明
IIS协议概述 IS(Inter-IC Sound)协议,又称 IIS(Inter-IC Sound),是一种专门用于数字音频数据传输的串行总线标准,由飞利浦(Philips)公司提出。该协议通常用于微控制器…...
b4a安卓开发技术和建议,VB6开发Android APK
b4a功能建议实现方法想法创意Wait For可以在参数中直接返回结果吗?Wait For (cam.OpenCamera(front)) Complete (TaskIndex As Int) Wait For B4XPage_PermissionResult (Permission As String, Result As Boolean) 函数别名,减少代码,通用函…...
计算机网络-子网划分试题七
计算机网络中IP地址为172.16.20.60、172.16.30.60、172.16.80.60,子网掩码为255.255.192.0的三台计算机的网络号,子网号及主机号,并确定三台计算机是否处于同一个子网,如果不是请指出哪些在同一个子网,哪些不是&#x…...
免费Deepseek-v3接口实现Browser-Use Web UI:浏览器自动化本地模拟抓取数据实录
源码 https://github.com/browser-use/web-ui 我们按照官方教程,修订几个环节,更快地部署 步骤 1:克隆存储库 git clone https://github.com/browser-use/web-ui.git cd web-ui Step 2: Set Up Python Environment 第 2 步:设置…...
[蓝桥杯] 求和
题目链接 P8772 [蓝桥杯 2022 省 A] 求和 - 洛谷 题目理解 这道题就是公式题,我们模拟出公式后,输出最终结果即可。 本题不难,相信很多同学第一次见到这道题都是直接暴力解题。 两个for循环,测试样例,直接拿下。 #in…...
大数据学习(100)-kafka详解
🍋🍋大数据学习🍋🍋 🔥系列专栏: 👑哲学语录: 用力所能及,改变世界。 💖如果觉得博主的文章还不错的话,请点赞👍收藏⭐️留言📝支持一…...
通过Ollama本地部署DeepSeek R1模型(Windows版)
嗨,大家好,我是心海 以下是一份详细的Windows系统下通过Ollama本地部署DeepSeek R1模型的教程,内容简洁易懂,适合新手用户参考 本地部署大模型,就有点像在你自己的电脑或者服务器上,安装并运行这样一个“私…...
【C++】vector的底层封装和实现
目录 目录前言基本框架迭代器容量第一个测试,野指针异常第二轮测试,浅拷贝的问题 元素访问修改操作push_backinsert迭代器失效问题 erase 默认成员函数构造函数双重构造引发调用歧义 拷贝构造赋值重载析构函数 源码end 目录 前言 废话不多说࿰…...
