当前位置: 首页 > news >正文

Flink之Watermark策略代码模板

方式作用
WatermarkStrategy.noWatermarks()不生成watermark
WatermarkStrategy.forMonotonousTimestamps()紧跟最大事件时间watermark生成策略
WatermarkStrategy.forBoundedOutOfOrderness()允许乱序watermark生成策略
WatermarkStrategy.forGenerator()自定义watermark生成策略
  • noWatermarks
    public class FlinkWaterMark throws Exception {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 获取数据源DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);// 构造watermark生成策略,选择不生成watermarkWatermarkStrategy<UserEvent2> watermark = WatermarkStrategy.noWatermarks();// 将构造完成的watermark分配给数据流SingleOutputStreamOperator<UserEvent2> source = socketSource.assignTimestampsAndWatermarks(watermark);// ...env.execute();}
    }
    
    关于noWaterMarks()的使用没有太多内容.
  • forMonotonousTimestamps
    public class FlinkWaterMark throws Exception {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 获取数据源DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);// 构造watermark, 使用紧跟最大事件时间策略WatermarkStrategy<String> watermark = WatermarkStrategy.<String>forMonotonousTimestamps()// 抽取时间时间, 根据数据中实际情况选择.withTimestampAssigner(new SerializableTimestampAssigner<String>() {@Overridepublic long extractTimestamp(String element, long recordTimestamp) {/*** 这里是样例代码,实际情况根据具体业务具体数据特性抽取对应的时间**/String time = element.split(",")[0];long timestamp = Long.parseLong(time);return timestamp;}});// 将构造完成的watermark分配给数据流SingleOutputStreamOperator<UserEvent2> source = socketSource.assignTimestampsAndWatermarks(watermark);// ...env.execute();}
    }
    
    对于forMonotonousTimestamps()可说内容并不多,如果选择了forMonotonousTimestamps这种方式就必须保证事件时间严格有序,如果出现乱序的情况可能存在大量数据丢失的问题.
    通过源码内容可以看到forMonotonousTimestamps底层也是使用的forBoundedOutOfOrderness方式,只不过将容错时间设置为了0,源码如下:
    // 首先看这里,继承的BoundedOutOfOrdernessWatermarks
    public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {/** Creates a new watermark generator with for ascending timestamps. */public AscendingTimestampsWatermarks() {super(Duration.ofMillis(0)); // 这里将容错时间设置为了0}
    }
    
  • forBoundedOutOfOrderness
    public class FlinkWaterMark throws Exception {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 获取数据源DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);// 构造watermark, 使用允许水位线乱序策略,并设置最大容错时间为2sWatermarkStrategy<String> watermark = WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(2000))// 抽取时间时间, 根据数据中实际情况选择.withTimestampAssigner(new SerializableTimestampAssigner<String>() {@Overridepublic long extractTimestamp(String element, long recordTimestamp) {/*** 这里是样例代码,实际情况根据具体业务具体数据特性抽取对应的时间**/String time = element.split(",")[0];long timestamp = Long.parseLong(time);return timestamp;}});// 将构造完成的watermark分配给数据流SingleOutputStreamOperator<UserEvent2> source = socketSource.assignTimestampsAndWatermarks(watermark);// ...env.execute();}
    }
    
    对于允许乱序策略前面文章有介绍过其原理,比如代码中设置容错时间为2S,那么前后的数据差最大只能是2S,如果差值大于2S,后来的这条数据就会被抛弃.

相关文章:

Flink之Watermark策略代码模板

方式作用WatermarkStrategy.noWatermarks()不生成watermarkWatermarkStrategy.forMonotonousTimestamps()紧跟最大事件时间watermark生成策略WatermarkStrategy.forBoundedOutOfOrderness()允许乱序watermark生成策略WatermarkStrategy.forGenerator()自定义watermark生成策略 …...

ubuntu 安装postgresql,增加VECTOR向量数据库插件 踏坑详细流程

PGSQL安装&#xff0c;删除&#xff0c;运行&#xff0c;修改密码流程 Ubuntu18.04安装与配置postgresql含远程连接教程&#xff08;含踩坑记录&#xff09;_sudo apt-get install postgresql-CSDN博客 详细安装流程以上博客&#xff0c;自己也记录下 安装vector扩展连接 声明…...

基于Springboot实现影视影院订票选座管理系统【项目源码+论文说明】分享

基于Springboot实现影视影院订票选座管理系统演示 摘要 本论文主要论述了如何使用JAVA语言开发一个影城管理系统 &#xff0c;本系统将严格按照软件开发流程进行各个阶段的工作&#xff0c;采用B/S架构&#xff0c;面向对象编程思想进行项目开发。在引言中&#xff0c;作者将论…...

mysql批量插入数据,跳过唯一索引报错

数据准备 DROP TABLE IF EXISTS user1; CREATE TABLE user1 ( id INT NOT NULL AUTO_INCREMENT, name VARCHAR(45) NULL, age INT(3) NOT NULL, PRIMARY KEY (id), UNIQUE INDEX u_name (name));insert into user1(name, age) values (zhangshan, 18), (lisi, 19);1. INSERT I…...

论文阅读--Energy efficiency in heterogeneous wireless access networks

异构无线接入网络的能源效率 论文信息&#xff1a;Navaratnarajah S, Saeed A, Dianati M, et al. Energy efficiency in heterogeneous wireless access networks[J]. IEEE wireless communications, 2013, 20(5): 37-43. I. ABSTRACT && INTRODUCTION 本文提出了无…...

Redis的C客户端(hiredis库)使用

文章目录 1、Ubuntu安装redis服务端2、hiredis库的安装3、同步API接口的使用3.1、连接redis数据库redisConnect3.2、发送需要执行的命令redisCommand3.3、redisCommandArgv函数3.4、redisAppendCommand*函数支持管道命令3.5、释放资源3.6、同步连接代码 3.7、异步连接4、redis连…...

光引擎、光模块、光器件之间的关系和区别

最近小编有收到一些用户问“光引擎、光模块、光器件之间的关系和区别&#xff1f;”&#xff0c;众所周知光通信技术一直在不断演进&#xff0c;为满足不断增长的数据传输需求提供了强大的解决方案。而光通信系统中&#xff0c;光引擎、光模块和光器件是关键的组成部分&#xf…...

【办公-excel】两个时间相减 (二) - 带毫秒的时间进行相减操作

一、使用内部函数 1.1 效果展示 TEXT(((RIGHT(TEXT(B2,"yyyy-mm-dd hh:mm:ss.000"),LEN(TEXT(B2,"yyyy-mm-dd hh:mm:ss.000"))-FIND(".",TEXT(B2,"yyyy-mm-dd hh:mm:ss.000")))-RIGHT(TEXT(A2,"yyyy-mm-dd hh:mm:ss.000"),…...

二次封装View Design的table组件,实现宽度自适应,内容在一行展示

由于table组件本身并不支持宽度自适应&#xff0c;但实际项目需要&#xff0c;而且多处有用到table组件&#xff0c;所以尝试着自己来二次封装一下组件 想法 刚开始的想法很简单&#xff0c;就是获取每一列中数据和标题在表格中的长度&#xff0c;然后将当中最大的长度作为该列…...

Node.js代码漏洞扫描工具介绍——npm audit

npm audit 运行安全检查 主要作用&#xff1a;检查命令将项目中配置的依赖项的描述提交到默认注册中心&#xff0c;并要求报告已知漏洞。如果发现任何漏洞&#xff0c;则将计算影响和适当的补救措施。如果 fix 提供了参数&#xff0c;则将对包树应用补救措施。 具体参考&#x…...

node.js知识系列(3)-每天了解一点

目录 1. Express.js 中的中间件2. 处理路由和请求3. RESTful 路由4. 身份验证和授权5. 视图引擎6. 错误处理中间件7. 文件上传处理8. Cookie 和 Session 管理9. 路由参数和查询参数10. 处理跨域请求&#xff08;CORS&#xff09; &#x1f44d; 点赞&#xff0c;你的认可是我创…...

Zabbix监控系统 自定义监控项、自动发现与自动注册

Zabbix监控系统 自定义监控项、自动发现与自动注册 一、自定义监控内容部署实例二、zabbix 自动发现与自动注册部署实例2.1 部署zabbix自动发现 一、自定义监控内容部署实例 案列&#xff1a;自定义监控客户端服务器登录的人数 需求&#xff1a;限制登录人数不超过 3 个&#…...

Python信号之分享

在了解了Linux的信号基础之后&#xff0c;Python标准库中的signal包就很容易学习和理解。signal包负责在Python程序内部处理信号&#xff0c;典型的操作包括预设信号处理函数&#xff0c;暂停并等待信号&#xff0c;以及定时发出SIGALRM等。要注意&#xff0c;signal包主要是针…...

环信web、uniapp、微信小程序SDK报错详解---登录篇

项目场景&#xff1a; 记录对接环信sdk时遇到的一系列问题&#xff0c;总结一下避免大家再次踩坑。这里主要针对于web、uniapp、微信小程序在对接环信sdk时遇到的问题。主要针对报错400、404、401、40 (一) 登录用户报400 原因分析&#xff1a; 从console控制台输出及networ…...

DAZ To UMA⭐五.模型在Blender中的配置教程

文章目录 🟥 创建符合UMA的材质球属性1️⃣ 合并材质球🎁 选择材质球🎁 合并材质球🎁 删除多余材质球2️⃣ 将身体按材质球拆分🎁 进入身体编辑模式🎁 全选身体🎁 按材质分割身体🎁 重命名不同部位3️⃣ 将其余部位进行拆分🟧 更正选择缩放🟩 更新骨骼结构…...

网络安全工具汇总

网络安全工具汇总 1. 前言1.1. 工具提供 2. 漏洞库3. 杂项3.1. topology-scanner3.2. MDUT3.3. 404 4. 插件工具4.1. 浏览器插件4.1.1. Heimdallr4.1.2. HackTools4.1.3. SwitchyOmega4.1.4. fofa_view4.1.5. mitaka 4.2. CS插件4.2.1. taowu-cobalt_strike4.2.2. OLa4.2.3. Z1…...

day-65 代码随想录算法训练营(19)图论 part 04

463.岛屿的周长 分析&#xff1a; 1.陆地的旁边是海面&#xff0c;存在周长2.陆地在边界上&#xff0c;存在周长 思路一&#xff1a;深度优先遍历 1.通过记录访问情况来访问数据 class Solution { public:int direct[4][2]{{0,1},{0,-1},{1,0},{-1,0}};int res0;void dfs(…...

C++ - 完美语义(右值引用的中篇) - lambda表达式

前言 之前对右值引用的理解&#xff0c;用使用场景做了详细说明&#xff0c;具体看博客&#xff1a;C - 右值引用 和 移动拷贝-CSDN博客 在 有值引用 当中还有一个 完美转发&#xff0c;请看本篇博客。 完美转发 我们现在看这个例子&#xff1a; void Fun(int& x) { …...

常见排序算法详解

目录 排序的相关概念 排序&#xff1a; 稳定性&#xff1a; 内部排序&#xff1a; 外部排序&#xff1a; 常见的排序&#xff1a; 常见排序算法的实现 插入排序&#xff1a; 基本思想&#xff1a; 直…...

监控搭建-Prometheus

监控搭建-Prometheus 1、背景2、目标3、选型4、Prometheus4.1、介绍4.2、架构4.3、构件4.4、运行机制4.5、环境介绍4.6、数据准备4.7、网络策略4.7.1、主机端口放行4.7.2、设备端口放行 4.8、部署4.9、验证4.10、配置 1、背景 随着项目信息化进程的推进&#xff0c;操作系统、…...

Linux 文件类型,目录与路径,文件与目录管理

文件类型 后面的字符表示文件类型标志 普通文件&#xff1a;-&#xff08;纯文本文件&#xff0c;二进制文件&#xff0c;数据格式文件&#xff09; 如文本文件、图片、程序文件等。 目录文件&#xff1a;d&#xff08;directory&#xff09; 用来存放其他文件或子目录。 设备…...

Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility

Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility 1. 实验室环境1.1 实验室环境1.2 小测试 2. The Endor System2.1 部署应用2.2 检查现有策略 3. Cilium 策略实体3.1 创建 allow-all 网络策略3.2 在 Hubble CLI 中验证网络策略源3.3 …...

P3 QT项目----记事本(3.8)

3.8 记事本项目总结 项目源码 1.main.cpp #include "widget.h" #include <QApplication> int main(int argc, char *argv[]) {QApplication a(argc, argv);Widget w;w.show();return a.exec(); } 2.widget.cpp #include "widget.h" #include &q…...

Keil 中设置 STM32 Flash 和 RAM 地址详解

文章目录 Keil 中设置 STM32 Flash 和 RAM 地址详解一、Flash 和 RAM 配置界面(Target 选项卡)1. IROM1(用于配置 Flash)2. IRAM1(用于配置 RAM)二、链接器设置界面(Linker 选项卡)1. 勾选“Use Memory Layout from Target Dialog”2. 查看链接器参数(如果没有勾选上面…...

Cloudflare 从 Nginx 到 Pingora:性能、效率与安全的全面升级

在互联网的快速发展中&#xff0c;高性能、高效率和高安全性的网络服务成为了各大互联网基础设施提供商的核心追求。Cloudflare 作为全球领先的互联网安全和基础设施公司&#xff0c;近期做出了一个重大技术决策&#xff1a;弃用长期使用的 Nginx&#xff0c;转而采用其内部开发…...

Java 加密常用的各种算法及其选择

在数字化时代&#xff0c;数据安全至关重要&#xff0c;Java 作为广泛应用的编程语言&#xff0c;提供了丰富的加密算法来保障数据的保密性、完整性和真实性。了解这些常用加密算法及其适用场景&#xff0c;有助于开发者在不同的业务需求中做出正确的选择。​ 一、对称加密算法…...

涂鸦T5AI手搓语音、emoji、otto机器人从入门到实战

“&#x1f916;手搓TuyaAI语音指令 &#x1f60d;秒变表情包大师&#xff0c;让萌系Otto机器人&#x1f525;玩出智能新花样&#xff01;开整&#xff01;” &#x1f916; Otto机器人 → 直接点明主体 手搓TuyaAI语音 → 强调 自主编程/自定义 语音控制&#xff08;TuyaAI…...

多模态大语言模型arxiv论文略读(108)

CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文标题&#xff1a;CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文作者&#xff1a;Sayna Ebrahimi, Sercan O. Arik, Tejas Nama, Tomas Pfister ➡️ 研究机构: Google Cloud AI Re…...

MySQL 部分重点知识篇

一、数据库对象 1. 主键 定义 &#xff1a;主键是用于唯一标识表中每一行记录的字段或字段组合。它具有唯一性和非空性特点。 作用 &#xff1a;确保数据的完整性&#xff0c;便于数据的查询和管理。 示例 &#xff1a;在学生信息表中&#xff0c;学号可以作为主键&#xff…...

关于easyexcel动态下拉选问题处理

前些日子突然碰到一个问题&#xff0c;说是客户的导入文件模版想支持部分导入内容的下拉选&#xff0c;于是我就找了easyexcel官网寻找解决方案&#xff0c;并没有找到合适的方案&#xff0c;没办法只能自己动手并分享出来&#xff0c;针对Java生成Excel下拉菜单时因选项过多导…...