当前位置: 首页 > news >正文

关于flink重新提交任务,重复消费kafka的坑

异常现象1

按照以下方式设置backend目录和checkpoint目录,fsbackend目录有数据,checkpoint目录没数据

env.getCheckpointConfig().setCheckpointStorage(PropUtils.getValueStr(Constant.ENV_FLINK_CHECKPOINT_PATH));
env.setStateBackend(new FsStateBackend(PropUtils.getValueStr(Constant.ENV_FLINK_STATEBACKEND_PATH)));

原因

我以为checkpoint和fsbackend要同时设置,其实,1.14.3版本,setCheckpointStorage和stateBackend改成了分着设置

我上边代码这样设置,相当于首先指定了以下checkpoint按照默认的backend存储,然后又指定了按照fsbackend存储,因此首先指定的checkpoint目录没有数据。

正解

env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(PropUtils.getValueStr(Constant.ENV_FLINK_CHECKPOINT_PATH));

State Backends | Apache Flink

异常现象2

开启checkpoint eos,开启容错,每次任务重新提交都会重新消费kafka已经完成了checkpoint的数据

原因

我以为只要开启这两个配置就可以保证已经checkpoint的kafka数据不会被重复消费,其实不然

checkpoint是flink内部的容错机制,他能保证在设置了失败重启策略之后(setRestartStrategy),如果发生异常导致失败重试之后自动从最新checkpoint恢复。不是手动重启。。。手动重启默认不会进行加载状态数据,所以每次都会从头消费

正解

flink任务 -s 指定恢复点提交,这个恢复点可以是checkpoint也可以时savepoint。

# 启动
/home/cuadmin/flink-1.14.3/bin/flink run -d  \
-c cn.flink.ApplicationMaster \/home/cuadmin/portal-flink-2021.0.1-SNAPSHOT-shaded.jar# 备份,创建savepoint
/home/cuadmin/flink-1.14.3/bin/flink savepoint 19f4bb5d103ea8695712d4d1a797893f /home/cuadmin/flink-1.14.3/savepoint# 指定savepoint启动
/home/cuadmin/flink-1.14.3/bin/flink run -d  \
-c cn.flink.ApplicationMaster \
-s  file:/home/cuadmin/flink-1.14.3/savepoint/savepoint-033556-251a9e55ed25  \
/home/cuadmin/portal-flink-2021.0.1-SNAPSHOT-shaded.jar

异常现象4

这是错误的

# 指定savepoint启动
/home/cuadmin/flink-1.14.3/bin/flink run -d  \
-c cn.flink.ApplicationMaster \

/home/cuadmin/portal-flink-2021.0.1-SNAPSHOT-shaded.jar
-s  file:/home/cuadmin/flink-1.14.3/savepoint/savepoint-033556-251a9e55ed25  \

按照上述命令执行,这个地方显示恢复点的加载情况,这里没显示,代表恢复点没有执行成功

原因

-s的位置有问题,我之前以为没有顺序,把-s 放到了命令最后,结果没报错,也没识别。。

正解

-s 位置要正确

# 指定savepoint启动
/home/cuadmin/flink-1.14.3/bin/flink run -d  \
-c cn.flink.ApplicationMaster \
-s  file:/home/cuadmin/flink-1.14.3/savepoint/savepoint-033556-251a9e55ed25  \
/home/cuadmin/portal-flink-2021.0.1-SNAPSHOT-shaded.jar

异常现象3

我记得savepoint和checkpoint是都可以用来flink -s 进行恢复点恢复的。但是每次都提示恢复失败,提示文件找不到,savepoint就可以。。。

原因

cancel job会将 checkpoint的数据删掉。。。

正解

测试的时候,直接stop-cluster,这样checkpoint数据就不会被删除了

保留 Checkpoint 

Checkpoint 在默认的情况下仅用于恢复失败的作业,并不保留,当程序取消时 checkpoint 就会被删除。当然,你可以通过配置来保留 checkpoint,这些被保留的 checkpoint 在作业失败或取消时不会被清除。这样,你就可以使用该 checkpoint 来恢复失败的作业。

CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

ExternalizedCheckpointCleanup 配置项定义了当作业取消时,对作业 checkpoint 的操作:

  • ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:当作业取消时,保留作业的 checkpoint。注意,这种情况下,需要手动清除该作业保留的 checkpoint。
  • ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:当作业取消时,删除作业的 checkpoint。仅当作业失败时,作业的 checkpoint 才会被保留。

总结

1、savepoint的数据要比checkpoint更加稳定,比如你可以通过移动(拷贝)savepoint 目录到任意地方,然后再进行恢复。checkpoint就不可以,因为他有很多相对路径配置。

2、savepoint和checkpoint一般都能作为恢复点使用,例外情况是使用 RocksDB 状态后端的增量 Checkpoint。他们使用了一些 RocksDB 内部格式,而不是 Flink 的本机 Savepoint 格式。这使他们成为了与 Savepoint 相比,更轻量级的 Checkpoint 机制的第一个实例。

3、任务因为偶然原因内部重启(task级别),通过失败重试机制+checkpoint自动进行重放,任务因重启、断电、死机等外部因素(cluster级别),通过-s 指定checkpoint/savepoint恢复点进行手动重放。这样就可以保证状态数据的稳定

相关文章:

关于flink重新提交任务,重复消费kafka的坑

异常现象1 按照以下方式设置backend目录和checkpoint目录,fsbackend目录有数据,checkpoint目录没数据 env.getCheckpointConfig().setCheckpointStorage(PropUtils.getValueStr(Constant.ENV_FLINK_CHECKPOINT_PATH)); env.setStateBackend(new FsStat…...

Win11右键恢复Win10老版本

Win11右键恢复Win10老版本 最近自己更新了windows11的OS,整体感觉都是不错的,但是就是每次右键菜单我都要再次点击下展开更多选项,这对追求极简主义的我,就是不爽, 手动恢复win10操作吧! 第一种:创建文件(简单快速) 1.新建一个resoreRightKey.reg文件,并在里面填入如下代码 W…...

ur机械臂30003端口socket通信踩坑(double类型数据怎么解析)

坑的由来 都知道在网络通信时要把网络字节序转化为主机字节序才行,但是c里的标准库函数ntohl默认是转换32位字节序的数据,也就是说默认是转换float类型的数据;而ur机械臂30003端口发送的是double类型的数据,没法直接用ntohl进行转…...

代理IP与Socks5代理的技术奇妙之旅

随着数字化时代的崛起,网络工程师们日益承担着维护网络稳定性和保护数据安全的重任。在这个充满挑战的世界里,代理IP与Socks5代理技术成为了他们的秘密武器,本文将带您踏上一段技术奇妙之旅,深入了解这两项技术在不同领域中的应用…...

自动化测试定位不到元素?可能是 frame 在搞鬼

很多人在用Splinter或Selenium定位页面元素的时候会遇到定位不到的问题,明明元素就在那儿,就是定位不到,这种情况很有可能是frame在搞鬼。 说白了就是网站上的网页A,又嵌入了其他网页B。你访问了网页A,在里面可以看到…...

uni-app 开发中,监听 input 键盘事件获取不到按下按键值怎么办?

uniapp 开发 H5 时&#xff0c;无法监听按钮键盘事件的原因以及解决方法。 问题描述&#xff1a; 不少 uni-app 开发者在使用 input 组件时&#xff0c;监听 keyup 事件时&#xff0c;获取不到键盘的 keyCode。编写的代码如下&#xff1a; <template><input keyup&…...

【juc】countdownlatch实现并发网络请求

目录 一、截图示例二、代码示例2.1 测试代码2.2 接口代码 一、截图示例 二、代码示例 2.1 测试代码 package com.learning.countdownlatch;import lombok.extern.slf4j.Slf4j; import org.springframework.web.client.RestTemplate;import java.util.Arrays; import java.uti…...

在供应链管理中,如何做好库存分析?库存分析有哪些监控指标?

在供应链管理中&#xff0c;库存分析是其重要的一环。库存分析的方法繁杂且广泛&#xff0c;选择正确的方法才能更好的进行库存分析&#xff0c;下面就为大家盘点一些常用的库存分析方法和监控指标&#xff0c;全程干货&#xff0c;建议收藏&#xff01; 01 如何进行库存分析&…...

黑豹程序员-架构师学习路线图-百科:Database数据库

文章目录 1、什么是Database2、发展历史3、数据库排行网4、总结 1、什么是Database 当今世界是一个充满着数据的互联网世界&#xff0c;各处都充斥着大量的数据。即这个互联网世界就是数据世界。 支撑这个数据世界的基石就是数据库&#xff0c;数据库也可以称为数据的仓库。 …...

你相信光吗?黑灯工厂重新相信“光”

你知道“黑灯工厂”吗&#xff1f;望文生义&#xff0c;所谓黑灯工厂&#xff0c;就是可以不需要照明的工厂。全程流水线自动化生产&#xff0c;无人干预、无人值守&#xff0c;工厂变成黑匣子&#xff0c;原材料进去&#xff0c;成品出来&#xff0c;流水线上百分百自动化。 完…...

Ubuntu 20.04使用源码安装nginx 1.14.0

nginx安装及使用&#xff08;详细版&#xff09;是一篇参考博文。 http://nginx.org/download/可以选择下载源码的版本。 sudo wget http://nginx.org/download/nginx-1.14.0.tar.gz下载源代码。 sudo tar xzf nginx-1.14.0.tar.gz进行解压。 cd nginx-1.14.0进入到源代码…...

springboot框架拦截器中HttpServletRequest 请求如何区分是图片上传流还是普通的字符流?

在Spring Boot框架中的拦截器&#xff08;Interceptor&#xff09;中&#xff0c;可以通过检查Content-Type请求头来区分图片上传流和普通的字符流。 当客户端发送POST请求并携带文件时&#xff0c;Content-Type请求头通常会包含multipart/form-data或者类似的值。这表明该请求…...

简单聊聊 TCP 协议

简单聊聊 TCP 协议 如何实现可靠传输 ?完全可靠存在比特差错存在丢包流水线可靠数据传输协议回退N步 (GBN)选择重传 (ARQ) 小结 TCPTCP 连接报文段结构序号和确认号 可靠数据传输避免重传超时时间加倍快速重传回退N步还是选择重传 流量控制连接管理拥塞控制拥塞原因拥塞控制方…...

钡铼BL124PN:简单快速转换Profinet到Ethernet/IP

钡铼技术BL124PN是一款高性能的Profinet转Ethernet/IP网关设备。该网关专为工业自动化领域设计&#xff0c;用于实现不同协议之间的互连和通信。BL124PN采用可靠稳定的硬件和先进的通信技术&#xff0c;具有以下主要特点&#xff1a; 协议转换能力&#xff1a;BL124PN能够将Pr…...

【golang】go 空结构体 详解 空结构体内容占用及大小

一、空结构体基础 空结构实例 和 空结构体变量 本质是一样的 1、所有空结构体地址都是一样的2、大小都为0&#xff08;最独特的&#xff09; package mainimport ("fmt""time""unsafe" )type EST struct { }func main() {// 一、基础// 空结构…...

身为产品经理该如何向客户推广API商品数据接口

在当今数字化的时代&#xff0c;API&#xff08;Application Programming Interface&#xff0c;应用程序编程接口&#xff09;已成为各种软件应用程序之间交互数据的主要方式。API商品数据接口作为一种特殊类型的API&#xff0c;能够让不同的系统之间共享商品数据&#xff0c;…...

【数据结构】460. LFU 缓存

460. LFU 缓存 解题思路 get操作 返回key对应的val 然后增加对应的freq插入操作 如果key已经存在 直接进行更新 如果不存在 但是容器已经满了 直接进行删除freq最小的Key 之后进行插入 class LFUCache {// key到 val的映射 KVHashMap<Integer,Integer> keyToVal;// …...

文字转语音播报模块(一):阿里云nls服务使用示例

一、业务场景 最近笔者在业务中涉及到语音告警的模块&#xff0c;需要讲告警内容以文件或流形式返回给前端进行语音播报&#xff0c;具体的分析与处理如下 二、业务分析 首先告警内容提示信息这里做的处理是通过专门字段去存储、编辑&#xff0c;根据拟定好的代码逻辑判断是…...

Vscode配置C#编程环境(win10)

目录 1、安装好Vscode 2、下载安装.NetCore SDK 3、配置C#环境 3.1 打开Vscode并下载扩展 3.2 Vscode中打开文件夹并配置环境 3.3 调试运行 1、安装好Vscode 2、下载安装.NetCore SDK 官网如下&#xff0c;下载完成后双击打开一路走到底就行.NetCore SDK官网 软件显示安…...

python:xlrd 读取 Excel文件,显示在 tkinterTable 表格中

pip install xlrd xlrd-1.2.0-py2.py3-none-any.whl (103 kB) 摘要: Library for developers to extract data from Microsoft Excel (tm) spreadsheet files pip install tkinterTable tkintertable-1.3.3.tar.gz (58 kB) 摘要: Extendable table class for Tkinter 源代…...

Java 语言特性(面试系列1)

一、面向对象编程 1. 封装&#xff08;Encapsulation&#xff09; 定义&#xff1a;将数据&#xff08;属性&#xff09;和操作数据的方法绑定在一起&#xff0c;通过访问控制符&#xff08;private、protected、public&#xff09;隐藏内部实现细节。示例&#xff1a; public …...

SciencePlots——绘制论文中的图片

文章目录 安装一、风格二、1 资源 安装 # 安装最新版 pip install githttps://github.com/garrettj403/SciencePlots.git# 安装稳定版 pip install SciencePlots一、风格 简单好用的深度学习论文绘图专用工具包–Science Plot 二、 1 资源 论文绘图神器来了&#xff1a;一行…...

前端倒计时误差!

提示:记录工作中遇到的需求及解决办法 文章目录 前言一、误差从何而来?二、五大解决方案1. 动态校准法(基础版)2. Web Worker 计时3. 服务器时间同步4. Performance API 高精度计时5. 页面可见性API优化三、生产环境最佳实践四、终极解决方案架构前言 前几天听说公司某个项…...

安宝特方案丨XRSOP人员作业标准化管理平台:AR智慧点检验收套件

在选煤厂、化工厂、钢铁厂等过程生产型企业&#xff0c;其生产设备的运行效率和非计划停机对工业制造效益有较大影响。 随着企业自动化和智能化建设的推进&#xff0c;需提前预防假检、错检、漏检&#xff0c;推动智慧生产运维系统数据的流动和现场赋能应用。同时&#xff0c;…...

在 Nginx Stream 层“改写”MQTT ngx_stream_mqtt_filter_module

1、为什么要修改 CONNECT 报文&#xff1f; 多租户隔离&#xff1a;自动为接入设备追加租户前缀&#xff0c;后端按 ClientID 拆分队列。零代码鉴权&#xff1a;将入站用户名替换为 OAuth Access-Token&#xff0c;后端 Broker 统一校验。灰度发布&#xff1a;根据 IP/地理位写…...

使用van-uploader 的UI组件,结合vue2如何实现图片上传组件的封装

以下是基于 vant-ui&#xff08;适配 Vue2 版本 &#xff09;实现截图中照片上传预览、删除功能&#xff0c;并封装成可复用组件的完整代码&#xff0c;包含样式和逻辑实现&#xff0c;可直接在 Vue2 项目中使用&#xff1a; 1. 封装的图片上传组件 ImageUploader.vue <te…...

在Ubuntu中设置开机自动运行(sudo)指令的指南

在Ubuntu系统中&#xff0c;有时需要在系统启动时自动执行某些命令&#xff0c;特别是需要 sudo权限的指令。为了实现这一功能&#xff0c;可以使用多种方法&#xff0c;包括编写Systemd服务、配置 rc.local文件或使用 cron任务计划。本文将详细介绍这些方法&#xff0c;并提供…...

基于Docker Compose部署Java微服务项目

一. 创建根项目 根项目&#xff08;父项目&#xff09;主要用于依赖管理 一些需要注意的点&#xff1a; 打包方式需要为 pom<modules>里需要注册子模块不要引入maven的打包插件&#xff0c;否则打包时会出问题 <?xml version"1.0" encoding"UTF-8…...

是否存在路径(FIFOBB算法)

题目描述 一个具有 n 个顶点e条边的无向图&#xff0c;该图顶点的编号依次为0到n-1且不存在顶点与自身相连的边。请使用FIFOBB算法编写程序&#xff0c;确定是否存在从顶点 source到顶点 destination的路径。 输入 第一行两个整数&#xff0c;分别表示n 和 e 的值&#xff08;1…...

基于Java+VUE+MariaDB实现(Web)仿小米商城

仿小米商城 环境安装 nodejs maven JDK11 运行 mvn clean install -DskipTestscd adminmvn spring-boot:runcd ../webmvn spring-boot:runcd ../xiaomi-store-admin-vuenpm installnpm run servecd ../xiaomi-store-vuenpm installnpm run serve 注意&#xff1a;运行前…...