Flink -- window(窗口)
1、窗口主要分成三大种:
1、Time Window (时间窗口):固定时间触发一次窗口
a、SlidingEventTimeWindows: 滑动的事件时间窗口
public class Demo1TImeWindow {public static void main(String[] args) throws Exception {/*** 时间窗口:由时间触发的窗口* SlidingEventTimeWindows: 滑动的事件时间窗口* SlidingProcessingTimeWindows:滑动的处理时间窗口* TumblingEventTimeWindows:滚动的事件时间窗口* TumblingProcessingTimeWindows:滚动的处理时间窗口*/StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<String> linesDS = env.socketTextStream("master", 8888);//解析数据,取出时间字段DataStream<Tuple2<String, Long>> wordAndTsDS = linesDS.map(line -> {String[] split = line.split(",");String word = split[0];//将时间戳转换成long类型long ts = Long.parseLong(split[1]);return Tuple2.of(word, ts);}, Types.TUPLE(Types.STRING, Types.LONG));//设置时间字段和水位线DataStream<Tuple2<String, Long>> assDS = wordAndTsDS.assignTimestampsAndWatermarks(WatermarkStrategy//水位线前移1秒.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner((kv, ts) -> kv.f1));/*** 每隔5秒统计最近10秒单词的数量*/SingleOutputStreamOperator<Tuple2<String, Integer>> kvDS = assDS.map(kv -> Tuple2.of(kv.f0, 1), Types.TUPLE(Types.STRING, Types.INT));kvDS.keyBy(kv -> kv.f0)//滑动的事件时间窗口.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).sum(1).print();env.execute();}
}
b、SlidingProcessingTimeWindows:滑动的处理时间窗口
public class Demo03ProcessingTime {public static void main(String[] args) throws Exception{/*** 数据处理时间:一般会结合窗口使用,一般值的是接受数据后对数据操作的时间* 需求:每过5秒中统计15秒内的单词的数量*///构建Flink的环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//使用socket模拟实时的操作DataStreamSource<String> wordDS = env.socketTextStream("master", 8888);//将接受的数据的转换成kv的格式SingleOutputStreamOperator<Tuple2<String, Integer>> kvDS = wordDS.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING,Types.INT));//按照单词进行分组KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(key -> key.f0);//划分窗口,窗口的大小是10秒钟,滑动的时间是5秒钟WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));//对统计的单词进行求和SingleOutputStreamOperator<Tuple2<String, Integer>> countDS = windowDS.sum(1);countDS.print();//启动Flinkenv.execute();}
}
c、TumblingEventTimeWindows:滚动的事件时间窗口
kvDS.keyBy(kv -> kv.f0)
//滚动的事件时间窗口.window(TumblingEventTimeWindows.of(Time.seconds(5))).sum(1).print();env.execute();
d、TumblingProcessingTimeWindows:滚动的处理时间窗口
kvDS.keyBy(kv -> kv.f0)//滚动的处理时间窗口:
.window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum(1).print();env.execute();
2、Session Window (会话窗口):如果一段时间没有数据就会生成一个窗口,将前面的数据拉去过来一起计算。
1、 ProcessingTimeSessionWindows: 处理时间的会话窗口,是针对每一个key都会统计他的数量。
2、EventTimeSessionWindows: 事件时间的会话窗口(需要由时间字段和水位线)(使用的比较少)
public class Demo3SessionWindow {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> linesDS = env.socketTextStream("master", 8888);/*** ProcessingTimeSessionWindows: 处理时间的会话窗口* EventTimeSessionWindows: 事件时间的会话窗口(需要由时间字段和水位线)**/linesDS.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT)).keyBy(kv -> kv.f0)//某一个key如果5秒没有数据产生,将前面的数据放一起进行计算.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))).sum(1).print();env.execute();}
}
3、Count Window (统计窗口):固定的数据量计算一次
1、countWindow(10): 滚动的统计窗口
2、countWindow(10,2):滑动的统计窗口
public class Demo2CountWindow {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> linesDS = env.socketTextStream("master", 8888);/*** 实时统计单词的数量,每10条数据统计一次* .countWindow(10): 滚动的统计窗口* .countWindow(10,2):滑动的统计窗口*/linesDS.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT)).keyBy(kv -> kv.f0)//每隔10条数据计算一次,同一个key每隔10条计算一次.countWindow(10).sum(1).print();env.execute();}
}
注意:对于事件时间,需要指定时间字段和水位线,处理时间不需要指定。
相关文章:
Flink -- window(窗口)
1、窗口主要分成三大种: 1、Time Window (时间窗口):固定时间触发一次窗口 a、SlidingEventTimeWindows: 滑动的事件时间窗口 public class Demo1TImeWindow {public static void main(String[] args) throws Exception {/*** 时…...
原语:串并转换器
串并转换器OSERDESE2 可被Select IO IP核调用。 OSERDESE2允许DDR功能 参考: FPGA原语学习与整理第二弹,OSERDESE2串并转换器 - 知乎 (zhihu.com) 正点原子。 ISERDESE2原语和OSERDESE2原语是串并转换器,他的的功能都是实现串行数据和并行…...
没网络也能安装.Net 3.5!如何脱机安装.NET Framework 3.5
.NET框架是由微软制定的一个软件框架。它有助于在Windows上运行控制台、Web或移动应用程序。此有用的工具适用于Windows设备。 如何脱机安装.NET Framework 3.5 如果你拥有Windows 10、8、8.1或7,有时第三方软件可能会导致问题。你可能会在图片中看到这样的问题。 看这张照片…...
JVM运行时数据区-虚拟机栈
目录 一、内存中的栈 二、基本内容 三、优点 四、栈的存储单位 五、栈运行原理 六、栈的内部结构 (一)局部变量表 (二)操作数栈 (三)动态链接 (四)方法返回地址 …...
Java中介者模式
目录 定义 结构 案例 优点 缺点 使用场景 定义 又叫调停模式,定义一个中介角色来封装一系列对象之间的交互,使原有对象之间的耦合松散,且可以独立地改变它们之间的交互。 结构 中介者模式包含以下主要角色: 抽象中介者角…...
前端框架Vue学习 ——(五)前端工程化Vue-cli脚手架
文章目录 Vue-cliVue项目-创建Vue项目-目录结构Vue项目-启动Vue项目-配置端口Vue项目开发流程 Vue-cli 介绍:Vue-cli 是 Vue 官方提供的一个脚手架,用于快速生成一个 Vue 的项目模版 安装 NodeJS安装 Vue-cli npm install -g vue/cliVue项目-创建 图…...
App备案-iOS云管理式证书 Distribution Managed 公钥及证书SHA-1指纹的获取方法
根据近日工业和信息化部发布的《工业和信息化部关于开展移动互联网应用程序备案工作的通知》,相信不少要进行IOS平台App备案的朋友遇到了一个问题,就是apple不提供云管理式证书的下载,也就无法获取公钥及证书SHA-1指纹。 已经上架的应用不想重…...
Spring -Spring之依赖注入源码解析
依赖注入底层原理流程图:Spring中Bean的依赖注入原理| ProcessOn免费在线作图,在线流程图,在线思维导图 Spring中到底有几种依赖注入的方式? 首先分两种: 手动注入自动注入 手动注入 在XML中定义Bean时,就是手动注入…...
Spire.Office for .NET 8.10.2 同步更新-Crk
Spire.Office for .NET是 E-iceblue 提供的企业级 Office .NET API 的组合。它包括Spire.Doc、Spire.XLS、Spire.Spreadsheet、Spire.Presentation、Spire.PDF、Spire.DataExport、Spire.OfficeViewer、Spire.PDFViewer、Spire.DocViewer、Spire.Barcode和Spire.Email。Spire.O…...
MFC 基础篇(一)
目录 一.SDK编程 二.为什么要学MFC? 三.MFC能做什么? 四.MFC开发环境搭建 五.MFC项目创建 六.消息映射机制 一.SDK编程 Application Programming Interface 应用程序编程接口。 Software Development Kit 软件开发工具包,一般会包括A…...
Android技术-修改SO导出符号
背景 经常在使用第三方SDK的时候会莫名其妙报错,其中最常见的一种就是SO符号冲突,比如libA.so静态链接了libC.a,而libB.so动态链接了libC.so。这样便会导致符号冲突。又或者在使用不同版本的动态库,也会造成符号冲突。 报错案例 案例1 DEB…...
flutter 打包apk
Flutter项目打包生成APK_flutter打包apk_文阿花的博客-CSDN博客 关于iconData可能出现的错误: flutter build apk 打包报错调试过程 - 掘金 (juejin.cn) 使用命令行:flutter build apk --no-tree-shake-icons...
Halcon如何使用SaperaLT库连接dalsa相机
halcon安装好的时候,没有带SaperaLT的采集库,需要额外在Halcon官网下载此库。 以下是halcon官网下载此库的链接。官网需要注册才可以下载。 https://www.mvtec.com/downloads/interfaces?tx_mvtecproduct_extensiondownloadlist%5Bfilter%5D%5B0%5Dma…...
Vue 嵌套路由 多级路由规则
套娃路由 routes:[{path: /login,component: Login},{path: /user,component: User,children:[{ path: test, component: Test },{ path: test2, component: Test2 },]}]子路由不需要加/ 在父组件 子路由不需要加/ 需要带上父亲的路由路径 <router-link to"user/test…...
pandas教程:Introduction to pandas Data Structures pandas的数据结构
文章目录 Chapter 5 Getting Started with pandas5.1 Introduction to pandas Data Structures1 Series2 DataFrame3 Index Objects (索引对象) Chapter 5 Getting Started with pandas 这样导入pandas: import pandas as pde:\python3.7\lib\site-packages\numpy…...
MinIO 分布式文件(对象)存储
简介 MinIO是高性能、可扩展、云原生支持、操作简单、开源的分布式对象存储产品。 在中国:阿里巴巴、腾讯、百度、中国联通、华为、中国移动等等9000多家企业也都在使用MinIO产品 官网地址:http://www.minio.org.cn/ 下载 官网下载(8.4.3版本)&#x…...
HTML表单标签
## HTML标签:表单标签 * 表单: * 概念:用于采集用户输入的数据的。用于和服务器进行交互。 * form:用于定义表单的。可以定义一个范围,范围代表采集用户数据的范围 * 属性࿱…...
【黑马程序员】SpringCloud——Eureka
文章目录 前言一、提供者与消费者1. 服务调用关系 二、远程调用的问题三、eureka 原理分析1. eureka 的作用 四、Eureka 案例1. 搭建 eureka 服务1. 服务注册1.1 注册 user-service1.2 启动 user-service3. order-service 完成服务注册 3. 服务发现1. 在 order-service 完成服务…...
目标跟踪(DeepSORT)
本文首先将介绍在目标跟踪任务中常用的匈牙利算法(Hungarian Algorithm)和卡尔曼滤波(Kalman Filter),然后介绍经典算法DeepSORT的工作流程以及对相关源码进行解析。 目前主流的目标跟踪算法都是基于Tracking-by-Detec…...
2 任务2: 使用趋动云GPU进行猫狗识别实践
使用趋动云GPU进行猫狗识别实践 1 创建项目2 初始化开发环境3 调试代码4 提交离线任务5 结果集存储与下载 使用趋动云提供的免费GPU,进行猫狗识别实践。 虽然例程里面提供的是基于tensorflow的,但是你也可以使用pytorch的代码 使用这个平台的一个优点就是…...
多车环境下车载毫米波雷达是否会相互干扰?
在汽车工业迈向智能化与自动化的进程中,毫米波雷达已然成为了车辆感知体系中不可或缺的一部分。这种波长介于1毫米至10毫米之间的电磁波进行探测的装置,凭借其能够穿透雨雪、浓雾及强光直射的全天候工作能力,为高级驾驶辅助系统提供了关键的距…...
计算机毕业设计:Python汽车销量智能可视化与预测系统 Flask框架 可视化 机器学习 AI 大模型 大数据(建议收藏)✅
博主介绍:✌全网粉丝50W,前互联网大厂软件研发、集结硕博英豪成立软件开发工作室,专注于计算机相关专业项目实战6年之久,累计开发项目作品上万套。凭借丰富的经验与专业实力,已帮助成千上万的学生顺利毕业,…...
Java验证数组中的字符串是否对称,只判断字母和数字,忽略大小写
1、Java验证数组中的字符串是否对称,忽略大小写public class Main {public static void main(String[] args) {String[] strings {"A manm, a plan, a canal, Panama", "Madam", "12321", "12345"};findPalindromicAlphan…...
微软老员工称部分“被更新损坏“的电脑实际早已注定失败
据微软资深工程师雷蒙德陈表示,微软的系统更新并非总是客户设备损坏的罪魁祸首。有时这些设备早已存在问题,只是客户在补丁星期二重启尝试导致系统无法启动之前没有注意到。更新背后的真相陈在文章中写道:"我在企业产品支持部门的同事们…...
终极TypeScript类型安全指南:LiveTerm接口定义与类型检查最佳实践
终极TypeScript类型安全指南:LiveTerm接口定义与类型检查最佳实践 【免费下载链接】LiveTerm 💻 Build terminal styled websites in minutes! 项目地址: https://gitcode.com/gh_mirrors/li/LiveTerm LiveTerm是一个基于Next.js的终端风格网站构…...
3分钟上手Hysteria2:从安装到连接的超简单教程
3分钟上手Hysteria2:从安装到连接的超简单教程 Hysteria2是一款高效的网络加速工具,通过一键安装脚本即可快速部署,特别适合新手用户。本教程将带你在3分钟内完成从安装到连接的全过程,让你轻松享受高速网络体验。 准备工作&#…...
你的Bootloader安全吗?给STM32F103的Ymodem升级加上AES加密和CRC32校验(附完整代码)
STM32F103 Bootloader安全加固实战:AES加密与CRC32校验的Ymodem升级方案 在物联网设备快速普及的今天,固件升级已成为设备维护的常规操作。然而,传统Ymodem协议在传输安全性方面的不足,使得固件在传输过程中面临被窃取或篡改的风险…...
EVA-01应用实战:5个场景教你用Qwen2.5-VL处理工作学习中的图片难题
EVA-01应用实战:5个场景教你用Qwen2.5-VL处理工作学习中的图片难题 1. 引言:当视觉理解遇上机甲美学 想象一下,你正在处理一份满是手写笔记的文档照片,或者需要快速理解一张复杂的数据图表。传统方法可能需要你手动输入文字、反…...
Keil魔术棒设置详解:为什么你的printf在STM32上不工作?
Keil魔术棒设置详解:为什么你的printf在STM32上不工作? 调试STM32项目时,printf输出功能突然失效是许多开发者遇到的经典问题。明明代码逻辑正确,串口硬件也正常,为什么控制台就是一片寂静?这通常与Keil开…...
AI辅助开发C语言项目,让快马平台智能生成学生成绩管理系统
最近尝试用AI辅助开发一个C语言的学生成绩管理系统,整个过程比想象中顺利很多。这个项目虽然不算复杂,但涉及模块化设计、文件操作、指针管理等知识点,正好可以验证AI在辅助开发中的实际效果。下面分享我的具体实践过程: 需求分析…...
