当前位置: 首页 > news >正文

Flink和Kafka连接时的精确一次保证

Flink写入Kafka两阶段提交

端到端的 exactly-once(精准一次)

kafka -> Flink -> kafka

1)输入端

输入数据源端的 Kafka 可以对数据进行持久化保存,并可以重置偏移量(offset)

2)Flink内部

Flink 内部可以通过检查点机制保证状态和处理结果的 exactly-once 语义

3)输出端

两阶段提交(2PC)

写入 Kafka 的过程实际上是一个两段式的提交:处理完毕得到结果,写入 Kafka 时是基于事务的“预提交”;等到检查点保存完毕,才会提交事务进行“正式提交”

如果中间出现故障,事务进行回滚,预提交就会被放弃;恢复状态之后,也只能恢复所有已经确认提交的操作。

必须的配置

1)必须启用检查点

2)指定 KafkaSink 的发送级别为 DeliveryGuarantee.EXACTLY_ONCE

3)配置 Kafka 读取数据的消费者的隔离级别【默认kafka消费者隔离级别是读未提交,2PC第一阶段预提交数据也会被读到,下游消费者需要设置为读已提交

4)事务超时配置

【配置的事务超时时间 transaction.timeout.ms 默认是1小时,而Kafka 集群配置的事务最大超时时间 transaction.max.timeout.ms 默认是15 分钟。在检查点保存时间很长时,有可能出现 Kafka 已经认为事务超时了,丢弃了预提交的数据;而Sink任务认为还可以继续等待。如果接下来检查点保存成功,发生故障后回滚到这个检查点的状态,这部分数据就被真正丢掉了。因此checkpoint 间隔 < 事务超时时间 < max的15分钟

代码实战

kafka -> Flink -> kafka【Flink处理kafka来源数据再输出到kafka】

public class KafkaEOSDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 【1】、启用检查点,设置为精准一次env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);CheckpointConfig checkpointConfig = env.getCheckpointConfig();checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/chk");checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 2.读取 kafkaKafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("hadoop102:9092").setGroupId("default").setTopics("topic_1").setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest()).build();DataStreamSource<String> kafkasource = env.fromSource(kafkaSource,WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource");/*3.写出到 Kafka精准一次 写入 Kafka,需要满足以下条件,【缺一不可】1、开启 checkpoint2、sink 设置保证级别为 精准一次3、sink 设置事务前缀4、sink 设置事务超时时间: checkpoint 间隔 < 事务超时时间 < max的15分钟*/KafkaSink<String> kafkaSink = KafkaSink.<String>builder()// 指定 kafka 的地址和端口.setBootstrapServers("hadoop102:9092")// 指定序列化器:指定 Topic 名称、具体的序列化.setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic("ws").setValueSerializationSchema(new SimpleStringSchema()).build())// 【3.1】 精准一次,开启 2pc.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// 【3.2】 精准一次,必须设置 事务的前缀.setTransactionalIdPrefix("li-")// 【3.3】 设置事务超时时间.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "").build();kafkasource.sinkTo(kafkaSink);env.execute();}
}

后续读取“ws”这个 topic 的消费者,要设置事务的隔离级别为“读已提交”

public class KafkaEOSConsumer {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 消费 在前面使用【两阶段提交】写入的 TopicKafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("hadoop102:9092").setGroupId("default").setTopics("ws").setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest())// 作为 下游的消费者,要设置事务的隔离级别为 【读已提交】.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed").build();env.fromSource(kafkaSource,WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource").print();env.execute();}
}

处理程序以及消费程序如上设置才能真正实现端到端精准一次的保证。

相关文章:

Flink和Kafka连接时的精确一次保证

Flink写入Kafka两阶段提交 端到端的 exactly-once&#xff08;精准一次&#xff09; kafka -> Flink -> kafka 1&#xff09;输入端 输入数据源端的 Kafka 可以对数据进行持久化保存&#xff0c;并可以重置偏移量&#xff08;offset&#xff09; 2&#xff09;Flink内…...

UE4动作游戏实例RPG Action解析三:实现效果,三连击Combo,射线检测,显示血条,火球术

一、三连Combo 实现武器三连击,要求: 1.下一段Combo可以随机选择, 2.在一定的时机才能再次检测输入 3. 等当前片段播放完才播放下一片段 1.1、蒙太奇设置 通过右键-新建蒙太奇片段,在蒙太奇里创建三个片段,并且移除相关连接,这样默认只会播放第一个片段 不同片段播…...

Linux/麒麟系统上部署Vue+SpringBoot前后端分离项目

目录 1. 前端准备工作 1.1 在项目根目录创建两份环境配置文件 1.2 环境配置 2. 后端准备工作 2.1 在项目resources目录创建两份环境配置文件 2.2 环境配置 3. 前后端打包 3.1 前端打包 3.2 后端打包 4、服务器前后端配置及部署 4.1 下载、安装、启动Nginx 4.2 前端项目部署…...

STM32在FreeRTOS下的us延时

STM32在FreeRTOS下的us延时 前言 freeRTOS下跑SPI时需要微秒级别的延时&#xff0c;但是freeRTOS只提供了毫秒级的&#xff0c;记录一下实现us延时的方法。 前期分析 最简单的方式就是开个定时器或者干脆直接计算一下用nop做都可以实现us延时&#xff0c;但是显然还是使用滴…...

软件测试/人工智能丨深入人工智能软件测试:PyTorch引领新时代

在人工智能的浪潮中&#xff0c;软件测试的角色变得愈发关键。本文将介绍在人工智能软件测试中的一些关键技术&#xff0c;以及如何借助PyTorch深度学习框架来推动测试的创新与升级。 PyTorch&#xff1a;深度学习的引擎 PyTorch作为一种开源的深度学习框架&#xff0c;为软件…...

Android 当中的 Fragment 协作解耦方式

Android 当中的 Fragment 协作解耦方式 文章目录 Android 当中的 Fragment 协作解耦方式第一章 前言介绍第01节 遇到的问题第02节 绘图说明 第二章 核心代码第01节 代理人接口第02节 中间人 Activity第03节 开发者A第04节 开发者B第05节 测试类 第一章 前言介绍 第01节 遇到的…...

城市网吧视频智能监控方案,实现视频远程集中监控

网吧环境较为复杂&#xff0c;电脑设备众多且人员流动性大&#xff0c;极易发生人员或消防事故&#xff0c;亟需改变&#xff0c;TSINGSEE青犀AI智能网吧视频监管方案可以帮助实现对网吧环境和用户活动的实时监控和管理。 1、视频监控系统 在网吧内部布置高清摄像头&#xff0…...

C#WPF视频播放器实例

本文实例演示C#WPF视频播放器 实例如下: 修改mainwindow的代码 <Windowx:Class="PlayerDemo.MainWindow"xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"xml…...

【uniapp】Google Maps

话不多说 直接上干货 提前申请谷歌地图账号一、新建地图 使用h5获取当前定位或者使用三方uniapp插件 var coords ""navigator.geolocation.getCurrentPosition(function(position) {coords {lat: position.coords.latitude,lng: position.coords.longitude};lats …...

C语言变量与常量

跟着肯哥&#xff08;不是我&#xff09;学C语言的变量和常量、跨文件访问、栈空间 栈空间还不清楚&#xff0c;期待明天的课程内容 C变量 变量&#xff08;Variable&#xff09;是用于存储和表示数据值的名称。 主要包括四个环节&#xff1a;定义、初始化、声明、使用 在我刚…...

AI创作系统ChatGPT网站源码/支持DALL-E3文生图/支持最新GPT-4-Turbo模型+Prompt应用

一、AI创作系统 SparkAi创作系统是基于OpenAI很火的ChatGPT进行开发的Ai智能问答系统和Midjourney绘画系统&#xff0c;支持OpenAI-GPT全模型国内AI全模型。本期针对源码系统整体测试下来非常完美&#xff0c;可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如…...

二维码智慧门牌管理系统升级,异常门牌聚合解决方案助力高效管理

文章目录 前言一、异常门牌聚合解决方案 前言 在今天的数字化时代&#xff0c;智慧城市已成为发展趋势&#xff0c;其中二维码智慧门牌管理系统扮演着至关重要的角色。通过对门牌信息进行数字化管理&#xff0c;该系统极大提升了城市管理的效率和便捷性。然而&#xff0c;随着…...

【XTDrone Ubuntu20.04】XTDrone+ Ubuntu20.04 + PX4安装

XTDrone仿真平台配置 文章目录 XTDrone仿真平台配置依赖安装 ROS一键安装Marvos安装PX4 安装安装QTGroundControlXTDrone下载安装 环境&#xff1a; VMWare 16.0 Ubuntu 22.04 &#xff08;因为没人配过&#xff09;Ubuntu 20.04 参考文章&#xff1a; 仿真平台基础配置 (yuq…...

河北大学选择ZStack Cube超融合一体机打造实训云平台

河北大学通过云轴科技ZStack Cube超融合一体机构建校园实训云平台&#xff0c;部署测试仅耗时1天&#xff0c;该平台能够更快地为学生提供高性能、高可用的云主机、云存储和云网络服务&#xff1b;同时也能满足日常运维管理要求&#xff0c;为学生提供更好的实训环境。 河北省…...

IDEA远程一键部署SpringBoot到Docker

IDEA是Java开发利器&#xff0c;Spring Boot是Java生态中最流行的微服务框架&#xff0c;docker是时下最火的容器技术&#xff0c;那么它们结合在一起会产生什么化学反应呢&#xff1f; 一、开发前准备 1. Docker安装 可以参考&#xff1a;https://docs.docker.com/install/ 2…...

索引三星结构

三星索引的定义&#xff0c;可以先给我们对索引优化提供一个大概的思路&#xff1a; 满足第1颗星&#xff1a; 取出所有的等值谓词的列&#xff0c;作为索引最开头的列——以任意顺序都可以。 满足第2颗星&#xff1a; 将order by加入到索引列&#xff0c;不要改变这些列的顺…...

rust 笔记 高级错误处理

文章目录 错误处理组合器or() 和 and()or_else() 和 and_then()filtermap() 和 map_err()map_or() 和 map_or_else()ok_or() and ok_or_else() 自定义错误类型错误转换 From 特征 归一化不同的错误类型Box<dyn Error>自定义错误类型 简化错误处理thiserroranyhow 错误处理…...

python+Django 使用apscheduler实现定时任务 管理调度

apscheduler实现定时任务 管理调度 在Django 项目中经常会用到定时任务去处理一些业务处理 使用 APScheduler 可以轻松地实现定时任务的管理和调度。你可以通过以下步骤来创建、启动、停止和删除定时任务&#xff1a; 1.创建调度器对象&#xff1a; from apscheduler.schedu…...

Java编程中,异步操作流程中,最终一致性以及重试补偿的设计与实现

一、背景 微服务设计中&#xff0c;跨服务的调用&#xff0c;由于网络或程序故障等各种原因&#xff0c;经常会出现调用失败而需要重试。另外&#xff0c;在异步操作中&#xff0c;我们提供接口让外部服务回调。回调过程中&#xff0c;也可能出现故障。 这就要求我们主动向外…...

吴恩达《机器学习》8-7:多元分类

在机器学习领域&#xff0c;经常会遇到不止两个类别的分类问题。这时&#xff0c;需要使用多类分类技术。本文将深入探讨多类分类&#xff0c;并结合学习内容中的示例&#xff0c;了解神经网络在解决这类问题时的应用。 一、理解多类分类 多类分类问题是指当目标有多个类别时…...

应用升级/灾备测试时使用guarantee 闪回点迅速回退

1.场景 应用要升级,当升级失败时,数据库回退到升级前. 要测试系统,测试完成后,数据库要回退到测试前。 相对于RMAN恢复需要很长时间&#xff0c; 数据库闪回只需要几分钟。 2.技术实现 数据库设置 2个db_recovery参数 创建guarantee闪回点&#xff0c;不需要开启数据库闪回。…...

【OSG学习笔记】Day 18: 碰撞检测与物理交互

物理引擎&#xff08;Physics Engine&#xff09; 物理引擎 是一种通过计算机模拟物理规律&#xff08;如力学、碰撞、重力、流体动力学等&#xff09;的软件工具或库。 它的核心目标是在虚拟环境中逼真地模拟物体的运动和交互&#xff0c;广泛应用于 游戏开发、动画制作、虚…...

遍历 Map 类型集合的方法汇总

1 方法一 先用方法 keySet() 获取集合中的所有键。再通过 gey(key) 方法用对应键获取值 import java.util.HashMap; import java.util.Set;public class Test {public static void main(String[] args) {HashMap hashMap new HashMap();hashMap.put("语文",99);has…...

《从零掌握MIPI CSI-2: 协议精解与FPGA摄像头开发实战》-- CSI-2 协议详细解析 (一)

CSI-2 协议详细解析 (一&#xff09; 1. CSI-2层定义&#xff08;CSI-2 Layer Definitions&#xff09; 分层结构 &#xff1a;CSI-2协议分为6层&#xff1a; 物理层&#xff08;PHY Layer&#xff09; &#xff1a; 定义电气特性、时钟机制和传输介质&#xff08;导线&#…...

理解 MCP 工作流:使用 Ollama 和 LangChain 构建本地 MCP 客户端

&#x1f31f; 什么是 MCP&#xff1f; 模型控制协议 (MCP) 是一种创新的协议&#xff0c;旨在无缝连接 AI 模型与应用程序。 MCP 是一个开源协议&#xff0c;它标准化了我们的 LLM 应用程序连接所需工具和数据源并与之协作的方式。 可以把它想象成你的 AI 模型 和想要使用它…...

Linux-07 ubuntu 的 chrome 启动不了

文章目录 问题原因解决步骤一、卸载旧版chrome二、重新安装chorme三、启动不了&#xff0c;报错如下四、启动不了&#xff0c;解决如下 总结 问题原因 在应用中可以看到chrome&#xff0c;但是打不开(说明&#xff1a;原来的ubuntu系统出问题了&#xff0c;这个是备用的硬盘&a…...

华为云Flexus+DeepSeek征文|DeepSeek-V3/R1 商用服务开通全流程与本地部署搭建

华为云FlexusDeepSeek征文&#xff5c;DeepSeek-V3/R1 商用服务开通全流程与本地部署搭建 前言 如今大模型其性能出色&#xff0c;华为云 ModelArts Studio_MaaS大模型即服务平台华为云内置了大模型&#xff0c;能助力我们轻松驾驭 DeepSeek-V3/R1&#xff0c;本文中将分享如何…...

RNN避坑指南:从数学推导到LSTM/GRU工业级部署实战流程

本文较长&#xff0c;建议点赞收藏&#xff0c;以免遗失。更多AI大模型应用开发学习视频及资料&#xff0c;尽在聚客AI学院。 本文全面剖析RNN核心原理&#xff0c;深入讲解梯度消失/爆炸问题&#xff0c;并通过LSTM/GRU结构实现解决方案&#xff0c;提供时间序列预测和文本生成…...

七、数据库的完整性

七、数据库的完整性 主要内容 7.1 数据库的完整性概述 7.2 实体完整性 7.3 参照完整性 7.4 用户定义的完整性 7.5 触发器 7.6 SQL Server中数据库完整性的实现 7.7 小结 7.1 数据库的完整性概述 数据库完整性的含义 正确性 指数据的合法性 有效性 指数据是否属于所定…...

打手机检测算法AI智能分析网关V4守护公共/工业/医疗等多场景安全应用

一、方案背景​ 在现代生产与生活场景中&#xff0c;如工厂高危作业区、医院手术室、公共场景等&#xff0c;人员违规打手机的行为潜藏着巨大风险。传统依靠人工巡查的监管方式&#xff0c;存在效率低、覆盖面不足、判断主观性强等问题&#xff0c;难以满足对人员打手机行为精…...