Flink和Kafka连接时的精确一次保证
Flink写入Kafka两阶段提交
端到端的 exactly-once(精准一次)
kafka -> Flink -> kafka
1)输入端
输入数据源端的 Kafka 可以对数据进行持久化保存,并可以重置偏移量(offset)
2)Flink内部
Flink 内部可以通过检查点机制保证状态和处理结果的 exactly-once 语义
3)输出端
两阶段提交(2PC)。
写入 Kafka 的过程实际上是一个两段式的提交:处理完毕得到结果,写入 Kafka 时是基于事务的“预提交”;等到检查点保存完毕,才会提交事务进行“正式提交”。
如果中间出现故障,事务进行回滚,预提交就会被放弃;恢复状态之后,也只能恢复所有已经确认提交的操作。
必须的配置
1)必须启用检查点
2)指定 KafkaSink 的发送级别为 DeliveryGuarantee.EXACTLY_ONCE
3)配置 Kafka 读取数据的消费者的隔离级别【默认kafka消费者隔离级别是读未提交,2PC第一阶段预提交数据也会被读到,下游消费者需要设置为读已提交】
4)事务超时配置
【配置的事务超时时间 transaction.timeout.ms 默认是1小时,而Kafka 集群配置的事务最大超时时间 transaction.max.timeout.ms 默认是15 分钟。在检查点保存时间很长时,有可能出现 Kafka 已经认为事务超时了,丢弃了预提交的数据;而Sink任务认为还可以继续等待。如果接下来检查点保存成功,发生故障后回滚到这个检查点的状态,这部分数据就被真正丢掉了。因此checkpoint 间隔 < 事务超时时间 < max的15分钟】
代码实战
kafka -> Flink -> kafka【Flink处理kafka来源数据再输出到kafka】
public class KafkaEOSDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 【1】、启用检查点,设置为精准一次env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);CheckpointConfig checkpointConfig = env.getCheckpointConfig();checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/chk");checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 2.读取 kafkaKafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("hadoop102:9092").setGroupId("default").setTopics("topic_1").setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest()).build();DataStreamSource<String> kafkasource = env.fromSource(kafkaSource,WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource");/*3.写出到 Kafka精准一次 写入 Kafka,需要满足以下条件,【缺一不可】1、开启 checkpoint2、sink 设置保证级别为 精准一次3、sink 设置事务前缀4、sink 设置事务超时时间: checkpoint 间隔 < 事务超时时间 < max的15分钟*/KafkaSink<String> kafkaSink = KafkaSink.<String>builder()// 指定 kafka 的地址和端口.setBootstrapServers("hadoop102:9092")// 指定序列化器:指定 Topic 名称、具体的序列化.setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic("ws").setValueSerializationSchema(new SimpleStringSchema()).build())// 【3.1】 精准一次,开启 2pc.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// 【3.2】 精准一次,必须设置 事务的前缀.setTransactionalIdPrefix("li-")// 【3.3】 设置事务超时时间.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "").build();kafkasource.sinkTo(kafkaSink);env.execute();}
}
后续读取“ws”这个 topic 的消费者,要设置事务的隔离级别为“读已提交”
public class KafkaEOSConsumer {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 消费 在前面使用【两阶段提交】写入的 TopicKafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("hadoop102:9092").setGroupId("default").setTopics("ws").setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest())// 作为 下游的消费者,要设置事务的隔离级别为 【读已提交】.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed").build();env.fromSource(kafkaSource,WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource").print();env.execute();}
}
处理程序以及消费程序如上设置才能真正实现端到端精准一次的保证。
相关文章:
Flink和Kafka连接时的精确一次保证
Flink写入Kafka两阶段提交 端到端的 exactly-once(精准一次) kafka -> Flink -> kafka 1)输入端 输入数据源端的 Kafka 可以对数据进行持久化保存,并可以重置偏移量(offset) 2)Flink内…...
UE4动作游戏实例RPG Action解析三:实现效果,三连击Combo,射线检测,显示血条,火球术
一、三连Combo 实现武器三连击,要求: 1.下一段Combo可以随机选择, 2.在一定的时机才能再次检测输入 3. 等当前片段播放完才播放下一片段 1.1、蒙太奇设置 通过右键-新建蒙太奇片段,在蒙太奇里创建三个片段,并且移除相关连接,这样默认只会播放第一个片段 不同片段播…...
Linux/麒麟系统上部署Vue+SpringBoot前后端分离项目
目录 1. 前端准备工作 1.1 在项目根目录创建两份环境配置文件 1.2 环境配置 2. 后端准备工作 2.1 在项目resources目录创建两份环境配置文件 2.2 环境配置 3. 前后端打包 3.1 前端打包 3.2 后端打包 4、服务器前后端配置及部署 4.1 下载、安装、启动Nginx 4.2 前端项目部署…...
STM32在FreeRTOS下的us延时
STM32在FreeRTOS下的us延时 前言 freeRTOS下跑SPI时需要微秒级别的延时,但是freeRTOS只提供了毫秒级的,记录一下实现us延时的方法。 前期分析 最简单的方式就是开个定时器或者干脆直接计算一下用nop做都可以实现us延时,但是显然还是使用滴…...
软件测试/人工智能丨深入人工智能软件测试:PyTorch引领新时代
在人工智能的浪潮中,软件测试的角色变得愈发关键。本文将介绍在人工智能软件测试中的一些关键技术,以及如何借助PyTorch深度学习框架来推动测试的创新与升级。 PyTorch:深度学习的引擎 PyTorch作为一种开源的深度学习框架,为软件…...
Android 当中的 Fragment 协作解耦方式
Android 当中的 Fragment 协作解耦方式 文章目录 Android 当中的 Fragment 协作解耦方式第一章 前言介绍第01节 遇到的问题第02节 绘图说明 第二章 核心代码第01节 代理人接口第02节 中间人 Activity第03节 开发者A第04节 开发者B第05节 测试类 第一章 前言介绍 第01节 遇到的…...
城市网吧视频智能监控方案,实现视频远程集中监控
网吧环境较为复杂,电脑设备众多且人员流动性大,极易发生人员或消防事故,亟需改变,TSINGSEE青犀AI智能网吧视频监管方案可以帮助实现对网吧环境和用户活动的实时监控和管理。 1、视频监控系统 在网吧内部布置高清摄像头࿰…...
C#WPF视频播放器实例
本文实例演示C#WPF视频播放器 实例如下: 修改mainwindow的代码 <Windowx:Class="PlayerDemo.MainWindow"xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"xml…...
【uniapp】Google Maps
话不多说 直接上干货 提前申请谷歌地图账号一、新建地图 使用h5获取当前定位或者使用三方uniapp插件 var coords ""navigator.geolocation.getCurrentPosition(function(position) {coords {lat: position.coords.latitude,lng: position.coords.longitude};lats …...
C语言变量与常量
跟着肯哥(不是我)学C语言的变量和常量、跨文件访问、栈空间 栈空间还不清楚,期待明天的课程内容 C变量 变量(Variable)是用于存储和表示数据值的名称。 主要包括四个环节:定义、初始化、声明、使用 在我刚…...
AI创作系统ChatGPT网站源码/支持DALL-E3文生图/支持最新GPT-4-Turbo模型+Prompt应用
一、AI创作系统 SparkAi创作系统是基于OpenAI很火的ChatGPT进行开发的Ai智能问答系统和Midjourney绘画系统,支持OpenAI-GPT全模型国内AI全模型。本期针对源码系统整体测试下来非常完美,可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如…...
二维码智慧门牌管理系统升级,异常门牌聚合解决方案助力高效管理
文章目录 前言一、异常门牌聚合解决方案 前言 在今天的数字化时代,智慧城市已成为发展趋势,其中二维码智慧门牌管理系统扮演着至关重要的角色。通过对门牌信息进行数字化管理,该系统极大提升了城市管理的效率和便捷性。然而,随着…...
【XTDrone Ubuntu20.04】XTDrone+ Ubuntu20.04 + PX4安装
XTDrone仿真平台配置 文章目录 XTDrone仿真平台配置依赖安装 ROS一键安装Marvos安装PX4 安装安装QTGroundControlXTDrone下载安装 环境: VMWare 16.0 Ubuntu 22.04 (因为没人配过)Ubuntu 20.04 参考文章: 仿真平台基础配置 (yuq…...
河北大学选择ZStack Cube超融合一体机打造实训云平台
河北大学通过云轴科技ZStack Cube超融合一体机构建校园实训云平台,部署测试仅耗时1天,该平台能够更快地为学生提供高性能、高可用的云主机、云存储和云网络服务;同时也能满足日常运维管理要求,为学生提供更好的实训环境。 河北省…...
IDEA远程一键部署SpringBoot到Docker
IDEA是Java开发利器,Spring Boot是Java生态中最流行的微服务框架,docker是时下最火的容器技术,那么它们结合在一起会产生什么化学反应呢? 一、开发前准备 1. Docker安装 可以参考:https://docs.docker.com/install/ 2…...
索引三星结构
三星索引的定义,可以先给我们对索引优化提供一个大概的思路: 满足第1颗星: 取出所有的等值谓词的列,作为索引最开头的列——以任意顺序都可以。 满足第2颗星: 将order by加入到索引列,不要改变这些列的顺…...
rust 笔记 高级错误处理
文章目录 错误处理组合器or() 和 and()or_else() 和 and_then()filtermap() 和 map_err()map_or() 和 map_or_else()ok_or() and ok_or_else() 自定义错误类型错误转换 From 特征 归一化不同的错误类型Box<dyn Error>自定义错误类型 简化错误处理thiserroranyhow 错误处理…...
python+Django 使用apscheduler实现定时任务 管理调度
apscheduler实现定时任务 管理调度 在Django 项目中经常会用到定时任务去处理一些业务处理 使用 APScheduler 可以轻松地实现定时任务的管理和调度。你可以通过以下步骤来创建、启动、停止和删除定时任务: 1.创建调度器对象: from apscheduler.schedu…...
Java编程中,异步操作流程中,最终一致性以及重试补偿的设计与实现
一、背景 微服务设计中,跨服务的调用,由于网络或程序故障等各种原因,经常会出现调用失败而需要重试。另外,在异步操作中,我们提供接口让外部服务回调。回调过程中,也可能出现故障。 这就要求我们主动向外…...
吴恩达《机器学习》8-7:多元分类
在机器学习领域,经常会遇到不止两个类别的分类问题。这时,需要使用多类分类技术。本文将深入探讨多类分类,并结合学习内容中的示例,了解神经网络在解决这类问题时的应用。 一、理解多类分类 多类分类问题是指当目标有多个类别时…...
MFCMouseEffect:把桌面输入反馈这件事,做成一个真正可扩展的引擎
MFCMouseEffect:把桌面输入反馈这件事,做成一个真正可扩展的引擎 很多录屏、教程、演示和桌面工具,功能本身已经足够好,但一到“用户看你怎么操作”这一步,体验就会突然掉下来。 为什么? 因为点击不够明…...
STM32从入门到实战:两周速成指南
STM32快速入门指南:从零基础到项目实战1. 项目概述1.1 STM32与8051的对比分析对于已经掌握8051和C语言的开发者而言,STM32的学习曲线并不陡峭。关键在于理解何时需要从8051迁移到STM32平台:计算能力需求:当8051的主频无法满足复杂…...
OpenProject全球化协作本地化策略指南:打破语言壁垒的实战方案
OpenProject全球化协作本地化策略指南:打破语言壁垒的实战方案 【免费下载链接】openproject OpenProject is the leading open source project management software. 项目地址: https://gitcode.com/GitHub_Trending/op/openproject OpenProject作为领先的开…...
SpringBoot 接口全维度性能优化指南
文章目录: 前言 一、背景 1.1 为什么必须做 SpringBoot 接口优化? 1.2 接口优化的核心目标 1.3 本文适用范围 二、核心原理 2.1 接口请求全流程(瓶颈定位核心) 2.2 核心优化原理总览 2.3 优化优先级(生产环境…...
LongCat-Image-Edit图片编辑神器:5分钟快速部署,一句话精准改图
LongCat-Image-Edit图片编辑神器:5分钟快速部署,一句话精准改图 1. 产品核心能力介绍 LongCat-Image-Edit是美团LongCat团队推出的开源图像编辑模型,它让复杂的图片编辑变得像说话一样简单。这个模型有三大杀手锏: 一句话精准编…...
F3D开发环境搭建:从零开始编译和构建这个开源3D项目
F3D开发环境搭建:从零开始编译和构建这个开源3D项目 【免费下载链接】f3d Fast and minimalist 3D viewer. 项目地址: https://gitcode.com/GitHub_Trending/f3/f3d F3D是一款快速且极简的3D查看器,本指南将带你从零开始搭建其开发环境࿰…...
告别卡顿闪烁!在Cesium 1.134中集成SOG格式,让400万高斯秒级加载
突破性能瓶颈:Cesium 1.134集成SOG格式实现400万高斯秒级渲染 在三维地理空间可视化领域,Cesium一直是开发者构建高精度场景的首选引擎。但当项目涉及数百万级高斯泼溅数据时,传统加载方式往往导致令人崩溃的卡顿和视角移动时的闪烁问题。最近…...
【Java】UTF-8变长编码及其3字节存储奥秘
UTF-8 是一种变长编码,一个字符可能由 1 到 4 个字节组成。 解码时(将字节数组转回 String),计算机并不需要“猜”或者去查表,因为长度信息本身就包含在字节的“头部”里。这就是 UTF-8 设计的精妙之处:它是…...
智能家居控制中心:OpenClaw桥接Qwen3-32B-Chat与HomeAssistant
智能家居控制中心:OpenClaw桥接Qwen3-32B-Chat与HomeAssistant 1. 为什么需要AI驱动的家居控制中心 去年冬天的一个深夜,我被空调异常制热的噪音惊醒。摸黑在手机APP上反复调整参数无果后,突然意识到:如果有个能理解自然语言的智…...
百川2-13B-4bits模型精调:解决OpenClaw复杂任务分解难题
百川2-13B-4bits模型精调:解决OpenClaw复杂任务分解难题 1. 问题背景:OpenClaw的复杂任务执行困境 去年冬天,当我第一次尝试用OpenClaw自动化处理一份市场调研报告时,遭遇了令人抓狂的体验。这个看似简单的任务需要完成网页数据…...
