当前位置: 首页 > news >正文

Flink之Watermark策略代码模板

方式作用
WatermarkStrategy.noWatermarks()不生成watermark
WatermarkStrategy.forMonotonousTimestamps()紧跟最大事件时间watermark生成策略
WatermarkStrategy.forBoundedOutOfOrderness()允许乱序watermark生成策略
WatermarkStrategy.forGenerator()自定义watermark生成策略
  • noWatermarks
    public class FlinkWaterMark throws Exception {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 获取数据源DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);// 构造watermark生成策略,选择不生成watermarkWatermarkStrategy<UserEvent2> watermark = WatermarkStrategy.noWatermarks();// 将构造完成的watermark分配给数据流SingleOutputStreamOperator<UserEvent2> source = socketSource.assignTimestampsAndWatermarks(watermark);// ...env.execute();}
    }
    
    关于noWaterMarks()的使用没有太多内容.
  • forMonotonousTimestamps
    public class FlinkWaterMark throws Exception {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 获取数据源DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);// 构造watermark, 使用紧跟最大事件时间策略WatermarkStrategy<String> watermark = WatermarkStrategy.<String>forMonotonousTimestamps()// 抽取时间时间, 根据数据中实际情况选择.withTimestampAssigner(new SerializableTimestampAssigner<String>() {@Overridepublic long extractTimestamp(String element, long recordTimestamp) {/*** 这里是样例代码,实际情况根据具体业务具体数据特性抽取对应的时间**/String time = element.split(",")[0];long timestamp = Long.parseLong(time);return timestamp;}});// 将构造完成的watermark分配给数据流SingleOutputStreamOperator<UserEvent2> source = socketSource.assignTimestampsAndWatermarks(watermark);// ...env.execute();}
    }
    
    对于forMonotonousTimestamps()可说内容并不多,如果选择了forMonotonousTimestamps这种方式就必须保证事件时间严格有序,如果出现乱序的情况可能存在大量数据丢失的问题.
    通过源码内容可以看到forMonotonousTimestamps底层也是使用的forBoundedOutOfOrderness方式,只不过将容错时间设置为了0,源码如下:
    // 首先看这里,继承的BoundedOutOfOrdernessWatermarks
    public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {/** Creates a new watermark generator with for ascending timestamps. */public AscendingTimestampsWatermarks() {super(Duration.ofMillis(0)); // 这里将容错时间设置为了0}
    }
    
  • forBoundedOutOfOrderness
    public class FlinkWaterMark throws Exception {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 获取数据源DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);// 构造watermark, 使用允许水位线乱序策略,并设置最大容错时间为2sWatermarkStrategy<String> watermark = WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(2000))// 抽取时间时间, 根据数据中实际情况选择.withTimestampAssigner(new SerializableTimestampAssigner<String>() {@Overridepublic long extractTimestamp(String element, long recordTimestamp) {/*** 这里是样例代码,实际情况根据具体业务具体数据特性抽取对应的时间**/String time = element.split(",")[0];long timestamp = Long.parseLong(time);return timestamp;}});// 将构造完成的watermark分配给数据流SingleOutputStreamOperator<UserEvent2> source = socketSource.assignTimestampsAndWatermarks(watermark);// ...env.execute();}
    }
    
    对于允许乱序策略前面文章有介绍过其原理,比如代码中设置容错时间为2S,那么前后的数据差最大只能是2S,如果差值大于2S,后来的这条数据就会被抛弃.

相关文章:

Flink之Watermark策略代码模板

方式作用WatermarkStrategy.noWatermarks()不生成watermarkWatermarkStrategy.forMonotonousTimestamps()紧跟最大事件时间watermark生成策略WatermarkStrategy.forBoundedOutOfOrderness()允许乱序watermark生成策略WatermarkStrategy.forGenerator()自定义watermark生成策略 …...

ubuntu 安装postgresql,增加VECTOR向量数据库插件 踏坑详细流程

PGSQL安装&#xff0c;删除&#xff0c;运行&#xff0c;修改密码流程 Ubuntu18.04安装与配置postgresql含远程连接教程&#xff08;含踩坑记录&#xff09;_sudo apt-get install postgresql-CSDN博客 详细安装流程以上博客&#xff0c;自己也记录下 安装vector扩展连接 声明…...

基于Springboot实现影视影院订票选座管理系统【项目源码+论文说明】分享

基于Springboot实现影视影院订票选座管理系统演示 摘要 本论文主要论述了如何使用JAVA语言开发一个影城管理系统 &#xff0c;本系统将严格按照软件开发流程进行各个阶段的工作&#xff0c;采用B/S架构&#xff0c;面向对象编程思想进行项目开发。在引言中&#xff0c;作者将论…...

mysql批量插入数据,跳过唯一索引报错

数据准备 DROP TABLE IF EXISTS user1; CREATE TABLE user1 ( id INT NOT NULL AUTO_INCREMENT, name VARCHAR(45) NULL, age INT(3) NOT NULL, PRIMARY KEY (id), UNIQUE INDEX u_name (name));insert into user1(name, age) values (zhangshan, 18), (lisi, 19);1. INSERT I…...

论文阅读--Energy efficiency in heterogeneous wireless access networks

异构无线接入网络的能源效率 论文信息&#xff1a;Navaratnarajah S, Saeed A, Dianati M, et al. Energy efficiency in heterogeneous wireless access networks[J]. IEEE wireless communications, 2013, 20(5): 37-43. I. ABSTRACT && INTRODUCTION 本文提出了无…...

Redis的C客户端(hiredis库)使用

文章目录 1、Ubuntu安装redis服务端2、hiredis库的安装3、同步API接口的使用3.1、连接redis数据库redisConnect3.2、发送需要执行的命令redisCommand3.3、redisCommandArgv函数3.4、redisAppendCommand*函数支持管道命令3.5、释放资源3.6、同步连接代码 3.7、异步连接4、redis连…...

光引擎、光模块、光器件之间的关系和区别

最近小编有收到一些用户问“光引擎、光模块、光器件之间的关系和区别&#xff1f;”&#xff0c;众所周知光通信技术一直在不断演进&#xff0c;为满足不断增长的数据传输需求提供了强大的解决方案。而光通信系统中&#xff0c;光引擎、光模块和光器件是关键的组成部分&#xf…...

【办公-excel】两个时间相减 (二) - 带毫秒的时间进行相减操作

一、使用内部函数 1.1 效果展示 TEXT(((RIGHT(TEXT(B2,"yyyy-mm-dd hh:mm:ss.000"),LEN(TEXT(B2,"yyyy-mm-dd hh:mm:ss.000"))-FIND(".",TEXT(B2,"yyyy-mm-dd hh:mm:ss.000")))-RIGHT(TEXT(A2,"yyyy-mm-dd hh:mm:ss.000"),…...

二次封装View Design的table组件,实现宽度自适应,内容在一行展示

由于table组件本身并不支持宽度自适应&#xff0c;但实际项目需要&#xff0c;而且多处有用到table组件&#xff0c;所以尝试着自己来二次封装一下组件 想法 刚开始的想法很简单&#xff0c;就是获取每一列中数据和标题在表格中的长度&#xff0c;然后将当中最大的长度作为该列…...

Node.js代码漏洞扫描工具介绍——npm audit

npm audit 运行安全检查 主要作用&#xff1a;检查命令将项目中配置的依赖项的描述提交到默认注册中心&#xff0c;并要求报告已知漏洞。如果发现任何漏洞&#xff0c;则将计算影响和适当的补救措施。如果 fix 提供了参数&#xff0c;则将对包树应用补救措施。 具体参考&#x…...

node.js知识系列(3)-每天了解一点

目录 1. Express.js 中的中间件2. 处理路由和请求3. RESTful 路由4. 身份验证和授权5. 视图引擎6. 错误处理中间件7. 文件上传处理8. Cookie 和 Session 管理9. 路由参数和查询参数10. 处理跨域请求&#xff08;CORS&#xff09; &#x1f44d; 点赞&#xff0c;你的认可是我创…...

Zabbix监控系统 自定义监控项、自动发现与自动注册

Zabbix监控系统 自定义监控项、自动发现与自动注册 一、自定义监控内容部署实例二、zabbix 自动发现与自动注册部署实例2.1 部署zabbix自动发现 一、自定义监控内容部署实例 案列&#xff1a;自定义监控客户端服务器登录的人数 需求&#xff1a;限制登录人数不超过 3 个&#…...

Python信号之分享

在了解了Linux的信号基础之后&#xff0c;Python标准库中的signal包就很容易学习和理解。signal包负责在Python程序内部处理信号&#xff0c;典型的操作包括预设信号处理函数&#xff0c;暂停并等待信号&#xff0c;以及定时发出SIGALRM等。要注意&#xff0c;signal包主要是针…...

环信web、uniapp、微信小程序SDK报错详解---登录篇

项目场景&#xff1a; 记录对接环信sdk时遇到的一系列问题&#xff0c;总结一下避免大家再次踩坑。这里主要针对于web、uniapp、微信小程序在对接环信sdk时遇到的问题。主要针对报错400、404、401、40 (一) 登录用户报400 原因分析&#xff1a; 从console控制台输出及networ…...

DAZ To UMA⭐五.模型在Blender中的配置教程

文章目录 🟥 创建符合UMA的材质球属性1️⃣ 合并材质球🎁 选择材质球🎁 合并材质球🎁 删除多余材质球2️⃣ 将身体按材质球拆分🎁 进入身体编辑模式🎁 全选身体🎁 按材质分割身体🎁 重命名不同部位3️⃣ 将其余部位进行拆分🟧 更正选择缩放🟩 更新骨骼结构…...

网络安全工具汇总

网络安全工具汇总 1. 前言1.1. 工具提供 2. 漏洞库3. 杂项3.1. topology-scanner3.2. MDUT3.3. 404 4. 插件工具4.1. 浏览器插件4.1.1. Heimdallr4.1.2. HackTools4.1.3. SwitchyOmega4.1.4. fofa_view4.1.5. mitaka 4.2. CS插件4.2.1. taowu-cobalt_strike4.2.2. OLa4.2.3. Z1…...

day-65 代码随想录算法训练营(19)图论 part 04

463.岛屿的周长 分析&#xff1a; 1.陆地的旁边是海面&#xff0c;存在周长2.陆地在边界上&#xff0c;存在周长 思路一&#xff1a;深度优先遍历 1.通过记录访问情况来访问数据 class Solution { public:int direct[4][2]{{0,1},{0,-1},{1,0},{-1,0}};int res0;void dfs(…...

C++ - 完美语义(右值引用的中篇) - lambda表达式

前言 之前对右值引用的理解&#xff0c;用使用场景做了详细说明&#xff0c;具体看博客&#xff1a;C - 右值引用 和 移动拷贝-CSDN博客 在 有值引用 当中还有一个 完美转发&#xff0c;请看本篇博客。 完美转发 我们现在看这个例子&#xff1a; void Fun(int& x) { …...

常见排序算法详解

目录 排序的相关概念 排序&#xff1a; 稳定性&#xff1a; 内部排序&#xff1a; 外部排序&#xff1a; 常见的排序&#xff1a; 常见排序算法的实现 插入排序&#xff1a; 基本思想&#xff1a; 直…...

监控搭建-Prometheus

监控搭建-Prometheus 1、背景2、目标3、选型4、Prometheus4.1、介绍4.2、架构4.3、构件4.4、运行机制4.5、环境介绍4.6、数据准备4.7、网络策略4.7.1、主机端口放行4.7.2、设备端口放行 4.8、部署4.9、验证4.10、配置 1、背景 随着项目信息化进程的推进&#xff0c;操作系统、…...

Golang dig框架与GraphQL的完美结合

将 Go 的 Dig 依赖注入框架与 GraphQL 结合使用&#xff0c;可以显著提升应用程序的可维护性、可测试性以及灵活性。 Dig 是一个强大的依赖注入容器&#xff0c;能够帮助开发者更好地管理复杂的依赖关系&#xff0c;而 GraphQL 则是一种用于 API 的查询语言&#xff0c;能够提…...

[ICLR 2022]How Much Can CLIP Benefit Vision-and-Language Tasks?

论文网址&#xff1a;pdf 英文是纯手打的&#xff01;论文原文的summarizing and paraphrasing。可能会出现难以避免的拼写错误和语法错误&#xff0c;若有发现欢迎评论指正&#xff01;文章偏向于笔记&#xff0c;谨慎食用 目录 1. 心得 2. 论文逐段精读 2.1. Abstract 2…...

【Zephyr 系列 10】实战项目:打造一个蓝牙传感器终端 + 网关系统(完整架构与全栈实现)

🧠关键词:Zephyr、BLE、终端、网关、广播、连接、传感器、数据采集、低功耗、系统集成 📌目标读者:希望基于 Zephyr 构建 BLE 系统架构、实现终端与网关协作、具备产品交付能力的开发者 📊篇幅字数:约 5200 字 ✨ 项目总览 在物联网实际项目中,**“终端 + 网关”**是…...

Redis数据倾斜问题解决

Redis 数据倾斜问题解析与解决方案 什么是 Redis 数据倾斜 Redis 数据倾斜指的是在 Redis 集群中&#xff0c;部分节点存储的数据量或访问量远高于其他节点&#xff0c;导致这些节点负载过高&#xff0c;影响整体性能。 数据倾斜的主要表现 部分节点内存使用率远高于其他节…...

2025季度云服务器排行榜

在全球云服务器市场&#xff0c;各厂商的排名和地位并非一成不变&#xff0c;而是由其独特的优势、战略布局和市场适应性共同决定的。以下是根据2025年市场趋势&#xff0c;对主要云服务器厂商在排行榜中占据重要位置的原因和优势进行深度分析&#xff1a; 一、全球“三巨头”…...

论文笔记——相干体技术在裂缝预测中的应用研究

目录 相关地震知识补充地震数据的认识地震几何属性 相干体算法定义基本原理第一代相干体技术&#xff1a;基于互相关的相干体技术&#xff08;Correlation&#xff09;第二代相干体技术&#xff1a;基于相似的相干体技术&#xff08;Semblance&#xff09;基于多道相似的相干体…...

NXP S32K146 T-Box 携手 SD NAND(贴片式TF卡):驱动汽车智能革新的黄金组合

在汽车智能化的汹涌浪潮中&#xff0c;车辆不再仅仅是传统的交通工具&#xff0c;而是逐步演变为高度智能的移动终端。这一转变的核心支撑&#xff0c;来自于车内关键技术的深度融合与协同创新。车载远程信息处理盒&#xff08;T-Box&#xff09;方案&#xff1a;NXP S32K146 与…...

20个超级好用的 CSS 动画库

分享 20 个最佳 CSS 动画库。 它们中的大多数将生成纯 CSS 代码&#xff0c;而不需要任何外部库。 1.Animate.css 一个开箱即用型的跨浏览器动画库&#xff0c;可供你在项目中使用。 2.Magic Animations CSS3 一组简单的动画&#xff0c;可以包含在你的网页或应用项目中。 3.An…...

深入浅出Diffusion模型:从原理到实践的全方位教程

I. 引言&#xff1a;生成式AI的黎明 – Diffusion模型是什么&#xff1f; 近年来&#xff0c;生成式人工智能&#xff08;Generative AI&#xff09;领域取得了爆炸性的进展&#xff0c;模型能够根据简单的文本提示创作出逼真的图像、连贯的文本&#xff0c;乃至更多令人惊叹的…...

ArcGIS Pro+ArcGIS给你的地图加上北回归线!

今天来看ArcGIS Pro和ArcGIS中如何给制作的中国地图或者其他大范围地图加上北回归线。 我们将在ArcGIS Pro和ArcGIS中一同介绍。 1 ArcGIS Pro中设置北回归线 1、在ArcGIS Pro中初步设置好经纬格网等&#xff0c;设置经线、纬线都以10间隔显示。 2、需要插入背会归线&#xf…...