当前位置: 首页 > news >正文

Flink之Watermark策略代码模板

方式作用
WatermarkStrategy.noWatermarks()不生成watermark
WatermarkStrategy.forMonotonousTimestamps()紧跟最大事件时间watermark生成策略
WatermarkStrategy.forBoundedOutOfOrderness()允许乱序watermark生成策略
WatermarkStrategy.forGenerator()自定义watermark生成策略
  • noWatermarks
    public class FlinkWaterMark throws Exception {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 获取数据源DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);// 构造watermark生成策略,选择不生成watermarkWatermarkStrategy<UserEvent2> watermark = WatermarkStrategy.noWatermarks();// 将构造完成的watermark分配给数据流SingleOutputStreamOperator<UserEvent2> source = socketSource.assignTimestampsAndWatermarks(watermark);// ...env.execute();}
    }
    
    关于noWaterMarks()的使用没有太多内容.
  • forMonotonousTimestamps
    public class FlinkWaterMark throws Exception {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 获取数据源DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);// 构造watermark, 使用紧跟最大事件时间策略WatermarkStrategy<String> watermark = WatermarkStrategy.<String>forMonotonousTimestamps()// 抽取时间时间, 根据数据中实际情况选择.withTimestampAssigner(new SerializableTimestampAssigner<String>() {@Overridepublic long extractTimestamp(String element, long recordTimestamp) {/*** 这里是样例代码,实际情况根据具体业务具体数据特性抽取对应的时间**/String time = element.split(",")[0];long timestamp = Long.parseLong(time);return timestamp;}});// 将构造完成的watermark分配给数据流SingleOutputStreamOperator<UserEvent2> source = socketSource.assignTimestampsAndWatermarks(watermark);// ...env.execute();}
    }
    
    对于forMonotonousTimestamps()可说内容并不多,如果选择了forMonotonousTimestamps这种方式就必须保证事件时间严格有序,如果出现乱序的情况可能存在大量数据丢失的问题.
    通过源码内容可以看到forMonotonousTimestamps底层也是使用的forBoundedOutOfOrderness方式,只不过将容错时间设置为了0,源码如下:
    // 首先看这里,继承的BoundedOutOfOrdernessWatermarks
    public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {/** Creates a new watermark generator with for ascending timestamps. */public AscendingTimestampsWatermarks() {super(Duration.ofMillis(0)); // 这里将容错时间设置为了0}
    }
    
  • forBoundedOutOfOrderness
    public class FlinkWaterMark throws Exception {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 获取数据源DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);// 构造watermark, 使用允许水位线乱序策略,并设置最大容错时间为2sWatermarkStrategy<String> watermark = WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(2000))// 抽取时间时间, 根据数据中实际情况选择.withTimestampAssigner(new SerializableTimestampAssigner<String>() {@Overridepublic long extractTimestamp(String element, long recordTimestamp) {/*** 这里是样例代码,实际情况根据具体业务具体数据特性抽取对应的时间**/String time = element.split(",")[0];long timestamp = Long.parseLong(time);return timestamp;}});// 将构造完成的watermark分配给数据流SingleOutputStreamOperator<UserEvent2> source = socketSource.assignTimestampsAndWatermarks(watermark);// ...env.execute();}
    }
    
    对于允许乱序策略前面文章有介绍过其原理,比如代码中设置容错时间为2S,那么前后的数据差最大只能是2S,如果差值大于2S,后来的这条数据就会被抛弃.

相关文章:

Flink之Watermark策略代码模板

方式作用WatermarkStrategy.noWatermarks()不生成watermarkWatermarkStrategy.forMonotonousTimestamps()紧跟最大事件时间watermark生成策略WatermarkStrategy.forBoundedOutOfOrderness()允许乱序watermark生成策略WatermarkStrategy.forGenerator()自定义watermark生成策略 …...

ubuntu 安装postgresql,增加VECTOR向量数据库插件 踏坑详细流程

PGSQL安装&#xff0c;删除&#xff0c;运行&#xff0c;修改密码流程 Ubuntu18.04安装与配置postgresql含远程连接教程&#xff08;含踩坑记录&#xff09;_sudo apt-get install postgresql-CSDN博客 详细安装流程以上博客&#xff0c;自己也记录下 安装vector扩展连接 声明…...

基于Springboot实现影视影院订票选座管理系统【项目源码+论文说明】分享

基于Springboot实现影视影院订票选座管理系统演示 摘要 本论文主要论述了如何使用JAVA语言开发一个影城管理系统 &#xff0c;本系统将严格按照软件开发流程进行各个阶段的工作&#xff0c;采用B/S架构&#xff0c;面向对象编程思想进行项目开发。在引言中&#xff0c;作者将论…...

mysql批量插入数据,跳过唯一索引报错

数据准备 DROP TABLE IF EXISTS user1; CREATE TABLE user1 ( id INT NOT NULL AUTO_INCREMENT, name VARCHAR(45) NULL, age INT(3) NOT NULL, PRIMARY KEY (id), UNIQUE INDEX u_name (name));insert into user1(name, age) values (zhangshan, 18), (lisi, 19);1. INSERT I…...

论文阅读--Energy efficiency in heterogeneous wireless access networks

异构无线接入网络的能源效率 论文信息&#xff1a;Navaratnarajah S, Saeed A, Dianati M, et al. Energy efficiency in heterogeneous wireless access networks[J]. IEEE wireless communications, 2013, 20(5): 37-43. I. ABSTRACT && INTRODUCTION 本文提出了无…...

Redis的C客户端(hiredis库)使用

文章目录 1、Ubuntu安装redis服务端2、hiredis库的安装3、同步API接口的使用3.1、连接redis数据库redisConnect3.2、发送需要执行的命令redisCommand3.3、redisCommandArgv函数3.4、redisAppendCommand*函数支持管道命令3.5、释放资源3.6、同步连接代码 3.7、异步连接4、redis连…...

光引擎、光模块、光器件之间的关系和区别

最近小编有收到一些用户问“光引擎、光模块、光器件之间的关系和区别&#xff1f;”&#xff0c;众所周知光通信技术一直在不断演进&#xff0c;为满足不断增长的数据传输需求提供了强大的解决方案。而光通信系统中&#xff0c;光引擎、光模块和光器件是关键的组成部分&#xf…...

【办公-excel】两个时间相减 (二) - 带毫秒的时间进行相减操作

一、使用内部函数 1.1 效果展示 TEXT(((RIGHT(TEXT(B2,"yyyy-mm-dd hh:mm:ss.000"),LEN(TEXT(B2,"yyyy-mm-dd hh:mm:ss.000"))-FIND(".",TEXT(B2,"yyyy-mm-dd hh:mm:ss.000")))-RIGHT(TEXT(A2,"yyyy-mm-dd hh:mm:ss.000"),…...

二次封装View Design的table组件,实现宽度自适应,内容在一行展示

由于table组件本身并不支持宽度自适应&#xff0c;但实际项目需要&#xff0c;而且多处有用到table组件&#xff0c;所以尝试着自己来二次封装一下组件 想法 刚开始的想法很简单&#xff0c;就是获取每一列中数据和标题在表格中的长度&#xff0c;然后将当中最大的长度作为该列…...

Node.js代码漏洞扫描工具介绍——npm audit

npm audit 运行安全检查 主要作用&#xff1a;检查命令将项目中配置的依赖项的描述提交到默认注册中心&#xff0c;并要求报告已知漏洞。如果发现任何漏洞&#xff0c;则将计算影响和适当的补救措施。如果 fix 提供了参数&#xff0c;则将对包树应用补救措施。 具体参考&#x…...

node.js知识系列(3)-每天了解一点

目录 1. Express.js 中的中间件2. 处理路由和请求3. RESTful 路由4. 身份验证和授权5. 视图引擎6. 错误处理中间件7. 文件上传处理8. Cookie 和 Session 管理9. 路由参数和查询参数10. 处理跨域请求&#xff08;CORS&#xff09; &#x1f44d; 点赞&#xff0c;你的认可是我创…...

Zabbix监控系统 自定义监控项、自动发现与自动注册

Zabbix监控系统 自定义监控项、自动发现与自动注册 一、自定义监控内容部署实例二、zabbix 自动发现与自动注册部署实例2.1 部署zabbix自动发现 一、自定义监控内容部署实例 案列&#xff1a;自定义监控客户端服务器登录的人数 需求&#xff1a;限制登录人数不超过 3 个&#…...

Python信号之分享

在了解了Linux的信号基础之后&#xff0c;Python标准库中的signal包就很容易学习和理解。signal包负责在Python程序内部处理信号&#xff0c;典型的操作包括预设信号处理函数&#xff0c;暂停并等待信号&#xff0c;以及定时发出SIGALRM等。要注意&#xff0c;signal包主要是针…...

环信web、uniapp、微信小程序SDK报错详解---登录篇

项目场景&#xff1a; 记录对接环信sdk时遇到的一系列问题&#xff0c;总结一下避免大家再次踩坑。这里主要针对于web、uniapp、微信小程序在对接环信sdk时遇到的问题。主要针对报错400、404、401、40 (一) 登录用户报400 原因分析&#xff1a; 从console控制台输出及networ…...

DAZ To UMA⭐五.模型在Blender中的配置教程

文章目录 🟥 创建符合UMA的材质球属性1️⃣ 合并材质球🎁 选择材质球🎁 合并材质球🎁 删除多余材质球2️⃣ 将身体按材质球拆分🎁 进入身体编辑模式🎁 全选身体🎁 按材质分割身体🎁 重命名不同部位3️⃣ 将其余部位进行拆分🟧 更正选择缩放🟩 更新骨骼结构…...

网络安全工具汇总

网络安全工具汇总 1. 前言1.1. 工具提供 2. 漏洞库3. 杂项3.1. topology-scanner3.2. MDUT3.3. 404 4. 插件工具4.1. 浏览器插件4.1.1. Heimdallr4.1.2. HackTools4.1.3. SwitchyOmega4.1.4. fofa_view4.1.5. mitaka 4.2. CS插件4.2.1. taowu-cobalt_strike4.2.2. OLa4.2.3. Z1…...

day-65 代码随想录算法训练营(19)图论 part 04

463.岛屿的周长 分析&#xff1a; 1.陆地的旁边是海面&#xff0c;存在周长2.陆地在边界上&#xff0c;存在周长 思路一&#xff1a;深度优先遍历 1.通过记录访问情况来访问数据 class Solution { public:int direct[4][2]{{0,1},{0,-1},{1,0},{-1,0}};int res0;void dfs(…...

C++ - 完美语义(右值引用的中篇) - lambda表达式

前言 之前对右值引用的理解&#xff0c;用使用场景做了详细说明&#xff0c;具体看博客&#xff1a;C - 右值引用 和 移动拷贝-CSDN博客 在 有值引用 当中还有一个 完美转发&#xff0c;请看本篇博客。 完美转发 我们现在看这个例子&#xff1a; void Fun(int& x) { …...

常见排序算法详解

目录 排序的相关概念 排序&#xff1a; 稳定性&#xff1a; 内部排序&#xff1a; 外部排序&#xff1a; 常见的排序&#xff1a; 常见排序算法的实现 插入排序&#xff1a; 基本思想&#xff1a; 直…...

监控搭建-Prometheus

监控搭建-Prometheus 1、背景2、目标3、选型4、Prometheus4.1、介绍4.2、架构4.3、构件4.4、运行机制4.5、环境介绍4.6、数据准备4.7、网络策略4.7.1、主机端口放行4.7.2、设备端口放行 4.8、部署4.9、验证4.10、配置 1、背景 随着项目信息化进程的推进&#xff0c;操作系统、…...

别再只会用BurpSuite了!手把手教你用ZAP(Zed Attack Proxy)给Web应用做免费安全体检

从零开始掌握ZAP&#xff1a;开源Web安全测试实战指南 在当今快速迭代的Web开发领域&#xff0c;安全测试早已不是可选项而是必选项。当大多数开发者习惯性地打开BurpSuite时&#xff0c;他们可能忽略了开源世界中同样强大的替代方案——Zed Attack Proxy&#xff08;ZAP&#…...

从GLIBCXX_3.4.29缺失到系统库兼容性:一次深度排错与修复实践

1. 当你的程序突然罢工&#xff1a;GLIBCXX_3.4.29缺失的背后故事 那天我正在部署一个机器学习模型服务&#xff0c;突然终端弹出鲜红的报错&#xff1a;"libstdc.so.6: version GLIBCXX_3.4.29 not found"。这个错误看似简单&#xff0c;却让我花了整整一个下午才彻…...

PADS VX2.7 光绘文件实战:从CAM配置到Gerber输出的全链路解析

1. PADS VX2.7光绘文件生成的核心逻辑 第一次用PADS VX2.7输出Gerber文件时&#xff0c;我被它和其他EDA软件的区别惊到了。不像某些软件一键导出所有层&#xff0c;PADS需要像搭积木一样逐层配置&#xff0c;这种看似繁琐的设计其实暗藏玄机——它让工程师对每层光绘文件的生成…...

别再手动reshape了!用einops.rearrange优雅处理PyTorch张量维度(附实战代码)

用einops.rearrange重塑PyTorch张量&#xff1a;告别混乱的维度操作 深度学习开发中最令人头疼的莫过于张量维度的变换。你是否曾在凌晨三点盯着屏幕&#xff0c;试图理解自己昨天写的permute和reshape组合到底在做什么&#xff1f;或者花费半小时调试一个维度不匹配的错误&…...

MLT框架的“Producer”到底有多智能?深入loader.dict与avformat揭秘媒体文件自动解析

MLT框架的“Producer”智能解析机制&#xff1a;从loader.dict到avformat的深度探索 当你在MLT框架中写下Producer(profile, nullptr, "video.mp4")这样一行看似简单的代码时&#xff0c;背后其实隐藏着一套精妙的媒体文件自动解析系统。这个系统能够根据文件扩展名、…...

C++ TinyWebServer项目实战:手把手教你用阻塞队列实现高性能异步日志(附完整代码)

C TinyWebServer项目实战&#xff1a;手把手教你用阻塞队列实现高性能异步日志&#xff08;附完整代码&#xff09; 在构建高并发服务器时&#xff0c;日志系统往往成为容易被忽视却至关重要的组件。想象这样一个场景&#xff1a;当服务器每秒处理上万请求时&#xff0c;如果每…...

B站视频转文字终极指南:3步快速提取视频字幕和文案

B站视频转文字终极指南&#xff1a;3步快速提取视频字幕和文案 【免费下载链接】bili2text Bilibili视频转文字&#xff0c;一步到位&#xff0c;输入链接即可使用 项目地址: https://gitcode.com/gh_mirrors/bi/bili2text 还在为B站视频内容无法搜索而烦恼吗&#xff1…...

告别重复劳动:用这个Maya Mel脚本插件,5分钟搞定Arnold材质批量调节

告别重复劳动&#xff1a;Maya Mel脚本插件在Arnold材质批量调节中的高效应用 在三维动画和视觉特效制作中&#xff0c;材质调节往往是项目后期最耗时的环节之一。当导演皱着眉头说"这个场景的金属感太强了"或者客户反馈"整体色调需要更暖一些"时&#xf…...

ESJsonFormat-Xcode与MJExtension完美结合:构建高效iOS数据模型

ESJsonFormat-Xcode与MJExtension完美结合&#xff1a;构建高效iOS数据模型 【免费下载链接】ESJsonFormat-Xcode 将JSON格式化输出为模型的属性 项目地址: https://gitcode.com/gh_mirrors/es/ESJsonFormat-Xcode ESJsonFormat-Xcode是一款专为iOS开发者打造的JSON转模…...

Altium Designer 22 导出嘉立创SMT文件保姆级教程(附BOM/坐标文件避坑指南)

Altium Designer 22 导出嘉立创SMT文件全流程解析与实战技巧 在电子设计领域&#xff0c;从手工焊接转向SMT贴片生产是一个关键的进阶步骤。对于使用Altium Designer&#xff08;简称AD&#xff09;的设计师来说&#xff0c;掌握正确的文件导出方法不仅能节省大量时间&#xff…...