Flink-时间窗口
在流数据处理应用中,一个很重要、也很常见的操作就是窗口计算。所谓的“窗口”,一 般就是划定的一段时间范围,也就是“时间窗”;对在这范围内的数据进行处理,就是所谓的 窗口计算。所以窗口和时间往往是分不开的。
时间语义
1、处理时间(Processing Time)
处理时间的概念非常简单,就是指执行处理操作的机器的系统时间。 在这种时间语义下处理窗口非常简单粗暴,不需要各个节点之间进行协调同步,也不需要 考虑数据在流中的位置,简单来说就是“我的地盘听我的”。所以处理时间是最简单的时间语义。
2、事件时间(Event Time)
事件时间,是指每个事件在对应的设备上发生的时间,也就是数据生成的时间。 数据一旦产生,这个时间自然就确定了,所以它可以作为一个属性嵌入到数据中。这其实 就是这条数据记录的“时间戳”(Timestamp)。
水位线
在事件时间语义下,我们不依赖系统时间,而是基于数据自带的时间戳去定义了一个时钟, 用来表示当前时间的进展。于是每个并行子任务都会有一个自己的逻辑时钟,它的前进是靠数 据的时间戳来驱动的。
我们可以把时钟也以数据的形式传递出去,告诉下游任务当前时间的进展;而且这个时钟 的传递不会因为窗口聚合之类的运算而停滞。一种简单的想法是,在数据流中加入一个时钟标 记,记录当前的事件时间;这个标记可以直接广播到下游,当下游任务收到这个标记,就可以 更新自己的时钟了。由于类似于水流中用来做标志的记号,在 Flink 中,这种用来衡量事件时 间(Event Time)进展的标记,就被称作“水位线”(Watermark)。
具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点, 主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个 数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。
- 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据
- 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展
- 水位线是基于数据的时间戳生成的
- 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进
- 水位线可以通过设置延迟,来保证正确处理乱序数据
- 一个水位线 Watermark(t),表示在当前流中事件时间已经达到了时间戳 t, 这代表 t 之 前的所有数据都到齐了,之后流中不会出现时间戳 t’ ≤ t 的数据
水位线生成策略
在Flink 的 DataStream API 中 , 有 一 个 单 独 用 于 生 成 水 位 线 的 方 法: assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指 示事件时间。
有序流的水位线生成策略
object f1 {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment//设置全局并行度env.setParallelism(1)//获取当前的运行配置//setAutoWatermarkInterval(时间戳)自动生成水位线的时间间隔env.getConfig.setAutoWatermarkInterval(500L)//数据val stream = env.fromElements(Event(4, "aa", 1000L),Event(5, "bb", 2000L),Event(6, "cc", 2500L),Event(7, "dd", 4000L))//设置水位线//1、有序流的水位线生成策略stream.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps[Event]().withTimestampAssigner(new SerializableTimestampAssigner[Event] {override def extractTimestamp(t: Event, l: Long): Long = t.time //指定字段中的time为时间戳}))//执行env.execute()}case class Event(id: Int, name: String, time: Long )
}
相关文章:
Flink-时间窗口
在流数据处理应用中,一个很重要、也很常见的操作就是窗口计算。所谓的“窗口”,一 般就是划定的一段时间范围,也就是“时间窗”;对在这范围内的数据进行处理,就是所谓的 窗口计算。所以窗口和时间往往是分不开的。 时…...

软件设计模式原则(三)单一职责原则
单一职责原则(SRP)又称单一功能原则。它规定一个类应该只有一个发生变化的原因。所谓职责是指类变化的原因。如果一个类有多于一个的动机被改变,那么这个类就具有多于一个的职责。而单一职责原则就是指一个类或者模块应该有且只有一个改变的原…...

使用Postman创建Mock Server
这篇文章将教会大家如何利用 Postman,通过 Mock 的方式测试我们的 API。 什么是 Mock Mock 是一项特殊的测试技巧,可以在没有依赖项的情况下进行单元测试。通常情况下,Mock 与其他方法的主要区别就是,用于取代代码依赖项的模拟对…...

【古月居《ros入门21讲》学习笔记】15_ROS中的坐标系管理系统
目录 说明: 1. 机器人中的坐标变换 tf功能包能干什么? tf坐标变换如何实现 2. 小海龟跟随实验 安装 ros-melodic-turtle-tf 实验命令 运行效果 说明: 1. 本系列学习笔记基于B站:古月居《ROS入门21讲》课程,且使…...

初始linux:文件操作
目录 提示:以下指令均在Xshell 7 中进行 linux的理念 一、echo echo "字符串" 二、输出重定向 > > [文件] echo "字符串" > [文件] echo "字符串" > > [文件] 制作大文件 三、< 输入重定向与ca…...

iOS上传ipa使用可视化工具Transporter
文章目录 前言一、Transporter二、Appuploader三、iTMSTransporter总结 前言 最近为了让非开发人员上传IPA文件,特意找了一些方法,至于以前的ApplicationUploader已经不能用了,下面介绍两个工具可以上传IPA包。 一、Transporter 1、操作简单…...

解读《陆奇最新演讲实录—我的大模型世界观》
腾讯科技频道记者张小珺一篇《陆奇最新演讲实录—我的大模型世界观》刷爆朋友圈。文章知识点丰富、字里行间处处流淌着创业方法论和AI应用商机,含金量极高! PS:一家之言、不求苟同。如有不爽之处,欢迎来 找我。 腾讯新闻原文&am…...

ChatGPT到底是如何运作?
自从2022年11月30日发布以来,ChatGPT一直占据着科技届的头条位置,随着苹果的创新能力下降,ChatGPT不断给大家带来震撼,2023年11月7日,首届OpenAI开发者大会在洛杉矶举行,业界普遍认为,OpenAI的开…...
学习Java第57天,Servlet的基本使用步骤
步骤1 开发一个web类型的module 步骤2 开发一个UserServlet public class UserServlet extends HttpServlet {Overrideprotected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {// 获取请求中的参数String usern…...

解决:ValueError: must have exactly one of create/read/write/append mode
解决:ValueError: must have exactly one of create/read/write/append mode 文章目录 解决:ValueError: must have exactly one of create/read/write/append mode背景报错问题报错翻译报错位置代码报错原因解决方法今天的分享就到此结束了 背景 在使用…...
大数据-之LibrA数据库系统告警处理(ALM-37014 Gaussdb进程锁文件已经存在)
告警解释 当集群中的CN实例或者DN实例锁文件创建失败时,产生该告警。 告警属性 告警ID 告警级别 可自动清除 37014 严重 是 告警参数 参数名称 参数含义 ServiceName 产生告警的服务名称 RoleName 产生告警的角色名称 HostName 产生告警的主机名 I…...
STM32 基础知识
1. STM32微控制器的核心特性是什么? STM32微控制器是基于ARM Cortex-M 处理器 , 它具有高性能处理能力和低功耗的特性 , 适合用于嵌入式系统STM32系列具有多种多样的内存大小和丰富的内置外设选项,包括 多通道ADC , 定…...

JVM——产生内存溢出原因
目录 1.产生内存溢出原因一 :代码中的内存泄漏1.案例1:equals()和hashCode()导致的内存泄漏问题:**正常情况**:**异常情况:**解决方案: 2.案例2:内部类引用外部类问题:解决方案&…...
关于X86机器上运行GnuCobol的研究
1.安装GnuCobol 当前的稳定版本是 3.1.2,已经在各种平台上进行了广泛测试,并已投入商用。 下载地址为: https://phoenixnap.dl.sourceforge.net/project/gnucobol/gnucobol/3.1/gnucobol- 3.1.2.tar.bz2 1)上传压缩包至x86服务器; 2)通过tar -xvf gnucobol-3.1.2.tar.bz2…...
open与openat的区别
Linux 中的 open 和 openat 系统调用都用于打开文件,但它们有一些区别。 一、函数原型 open 系统调用的原型 #include <fcntl.h>int open(const char *pathname, int flags, mode_t mode);pathname 是要打开的文件路径flags 是打开文件的标志mode 是文件的…...

人工智能与供应链行业融合:预测算法的通用化与实战化
前言 「作者主页」:雪碧有白泡泡 「个人网站」:雪碧的个人网站 让我们一起深入探索人工智能与供应链的融合,以及预测算法在实际应用中的价值!🔍🚀 文章目录 前言供应链预测算法的基本流程统计学习模型与机…...

Cytoscape学习教程
写在前面 今天分享的内容是自己遇到问题后,咨询社群里面的同学,帮忙解决的总结。 关于Cytoscape,对于做组学或生物信息学的同学基本是陌生的,可能有的同学用这个软件作图是非常溜的,做出来的网络图也是十分的好看,“可玩性”很高,就像前面分享的aPEAR包一样aPEAR包绘制…...
computed和watch相关
Computed本质是一个具备缓存的watcher,依赖的属性发生变化就会更新视图。 适用于计算比较消耗性能的计算场景。当表达式过于复杂时,在模板中放入过多逻辑会让模板难以维护,可以将复杂的逻辑放入计算属性中处理 computed擅长处理:一…...

反思一次效能提升
前天与一个大佬交流。想起自己在6年多前在团队里做的一次小小的效能提升。 改进前 在同一个产品团队,同时有前端工程师和后端工程师。他们经常需要共同协作完成features。 前端是一个传统的多页应用。前端渲染是由后端的velocity模板引擎实现的。 打包后,…...
ElasticSearch之cat indices API
命令样例如下: curl -X GET "https://localhost:9200/_cat/indices?vtrue&pretty" --cacert $ES_HOME/config/certs/http_ca.crt -u "elastic:ohCxPHQBEs5*lo7F9"执行结果输出如下: health status index uuid …...
线程与协程
1. 线程与协程 1.1. “函数调用级别”的切换、上下文切换 1. 函数调用级别的切换 “函数调用级别的切换”是指:像函数调用/返回一样轻量地完成任务切换。 举例说明: 当你在程序中写一个函数调用: funcA() 然后 funcA 执行完后返回&…...

LeetCode - 394. 字符串解码
题目 394. 字符串解码 - 力扣(LeetCode) 思路 使用两个栈:一个存储重复次数,一个存储字符串 遍历输入字符串: 数字处理:遇到数字时,累积计算重复次数左括号处理:保存当前状态&a…...

【Redis技术进阶之路】「原理分析系列开篇」分析客户端和服务端网络诵信交互实现(服务端执行命令请求的过程 - 初始化服务器)
服务端执行命令请求的过程 【专栏简介】【技术大纲】【专栏目标】【目标人群】1. Redis爱好者与社区成员2. 后端开发和系统架构师3. 计算机专业的本科生及研究生 初始化服务器1. 初始化服务器状态结构初始化RedisServer变量 2. 加载相关系统配置和用户配置参数定制化配置参数案…...

el-switch文字内置
el-switch文字内置 效果 vue <div style"color:#ffffff;font-size:14px;float:left;margin-bottom:5px;margin-right:5px;">自动加载</div> <el-switch v-model"value" active-color"#3E99FB" inactive-color"#DCDFE6"…...
oracle与MySQL数据库之间数据同步的技术要点
Oracle与MySQL数据库之间的数据同步是一个涉及多个技术要点的复杂任务。由于Oracle和MySQL的架构差异,它们的数据同步要求既要保持数据的准确性和一致性,又要处理好性能问题。以下是一些主要的技术要点: 数据结构差异 数据类型差异ÿ…...
Qt Http Server模块功能及架构
Qt Http Server 是 Qt 6.0 中引入的一个新模块,它提供了一个轻量级的 HTTP 服务器实现,主要用于构建基于 HTTP 的应用程序和服务。 功能介绍: 主要功能 HTTP服务器功能: 支持 HTTP/1.1 协议 简单的请求/响应处理模型 支持 GET…...

微服务商城-商品微服务
数据表 CREATE TABLE product (id bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 商品id,cateid smallint(6) UNSIGNED NOT NULL DEFAULT 0 COMMENT 类别Id,name varchar(100) NOT NULL DEFAULT COMMENT 商品名称,subtitle varchar(200) NOT NULL DEFAULT COMMENT 商…...

ardupilot 开发环境eclipse 中import 缺少C++
目录 文章目录 目录摘要1.修复过程摘要 本节主要解决ardupilot 开发环境eclipse 中import 缺少C++,无法导入ardupilot代码,会引起查看不方便的问题。如下图所示 1.修复过程 0.安装ubuntu 软件中自带的eclipse 1.打开eclipse—Help—install new software 2.在 Work with中…...

基于Java+MySQL实现(GUI)客户管理系统
客户资料管理系统的设计与实现 第一章 需求分析 1.1 需求总体介绍 本项目为了方便维护客户信息为了方便维护客户信息,对客户进行统一管理,可以把所有客户信息录入系统,进行维护和统计功能。可通过文件的方式保存相关录入数据,对…...

如何应对敏捷转型中的团队阻力
应对敏捷转型中的团队阻力需要明确沟通敏捷转型目的、提升团队参与感、提供充分的培训与支持、逐步推进敏捷实践、建立清晰的奖励和反馈机制。其中,明确沟通敏捷转型目的尤为关键,团队成员只有清晰理解转型背后的原因和利益,才能降低对变化的…...