当前位置: 首页 > news >正文

Flink -- window(窗口)

1、窗口主要分成三大种:
        1、Time Window (时间窗口):固定时间触发一次窗口

                a、SlidingEventTimeWindows: 滑动的事件时间窗口

public class Demo1TImeWindow {public static void main(String[] args) throws Exception {/*** 时间窗口:由时间触发的窗口* SlidingEventTimeWindows: 滑动的事件时间窗口* SlidingProcessingTimeWindows:滑动的处理时间窗口* TumblingEventTimeWindows:滚动的事件时间窗口* TumblingProcessingTimeWindows:滚动的处理时间窗口*/StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<String> linesDS = env.socketTextStream("master", 8888);//解析数据,取出时间字段DataStream<Tuple2<String, Long>> wordAndTsDS = linesDS.map(line -> {String[] split = line.split(",");String word = split[0];//将时间戳转换成long类型long ts = Long.parseLong(split[1]);return Tuple2.of(word, ts);}, Types.TUPLE(Types.STRING, Types.LONG));//设置时间字段和水位线DataStream<Tuple2<String, Long>> assDS = wordAndTsDS.assignTimestampsAndWatermarks(WatermarkStrategy//水位线前移1秒.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner((kv, ts) -> kv.f1));/*** 每隔5秒统计最近10秒单词的数量*/SingleOutputStreamOperator<Tuple2<String, Integer>> kvDS = assDS.map(kv -> Tuple2.of(kv.f0, 1), Types.TUPLE(Types.STRING, Types.INT));kvDS.keyBy(kv -> kv.f0)//滑动的事件时间窗口.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).sum(1).print();env.execute();}
}

                b、SlidingProcessingTimeWindows:滑动的处理时间窗口

public class Demo03ProcessingTime {public static void main(String[] args)  throws Exception{/*** 数据处理时间:一般会结合窗口使用,一般值的是接受数据后对数据操作的时间* 需求:每过5秒中统计15秒内的单词的数量*///构建Flink的环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//使用socket模拟实时的操作DataStreamSource<String> wordDS = env.socketTextStream("master", 8888);//将接受的数据的转换成kv的格式SingleOutputStreamOperator<Tuple2<String, Integer>> kvDS = wordDS.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING,Types.INT));//按照单词进行分组KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(key -> key.f0);//划分窗口,窗口的大小是10秒钟,滑动的时间是5秒钟WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));//对统计的单词进行求和SingleOutputStreamOperator<Tuple2<String, Integer>> countDS = windowDS.sum(1);countDS.print();//启动Flinkenv.execute();}
}

                c、TumblingEventTimeWindows:滚动的事件时间窗口

kvDS.keyBy(kv -> kv.f0) 
//滚动的事件时间窗口.window(TumblingEventTimeWindows.of(Time.seconds(5))).sum(1).print();env.execute();

                d、TumblingProcessingTimeWindows:滚动的处理时间窗口

kvDS.keyBy(kv -> kv.f0)//滚动的处理时间窗口: 
.window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum(1).print();env.execute();
        2、Session Window (会话窗口):如果一段时间没有数据就会生成一个窗口,将前面的数据拉去过来一起计算。

        1、 ProcessingTimeSessionWindows: 处理时间的会话窗口,是针对每一个key都会统计他的数量。

        2、EventTimeSessionWindows: 事件时间的会话窗口(需要由时间字段和水位线)(使用的比较少)

public class Demo3SessionWindow {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> linesDS = env.socketTextStream("master", 8888);/*** ProcessingTimeSessionWindows: 处理时间的会话窗口* EventTimeSessionWindows: 事件时间的会话窗口(需要由时间字段和水位线)**/linesDS.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT)).keyBy(kv -> kv.f0)//某一个key如果5秒没有数据产生,将前面的数据放一起进行计算.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))).sum(1).print();env.execute();}
}
        3、Count Window (统计窗口):固定的数据量计算一次

                1、countWindow(10): 滚动的统计窗口
                 2、countWindow(10,2):滑动的统计窗口

public class Demo2CountWindow {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> linesDS = env.socketTextStream("master", 8888);/*** 实时统计单词的数量,每10条数据统计一次*  .countWindow(10): 滚动的统计窗口*  .countWindow(10,2):滑动的统计窗口*/linesDS.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT)).keyBy(kv -> kv.f0)//每隔10条数据计算一次,同一个key每隔10条计算一次.countWindow(10).sum(1).print();env.execute();}
}

注意:对于事件时间,需要指定时间字段和水位线,处理时间不需要指定。

相关文章:

Flink -- window(窗口)

1、窗口主要分成三大种&#xff1a; 1、Time Window &#xff08;时间窗口&#xff09;&#xff1a;固定时间触发一次窗口 a、SlidingEventTimeWindows: 滑动的事件时间窗口 public class Demo1TImeWindow {public static void main(String[] args) throws Exception {/*** 时…...

原语:串并转换器

串并转换器OSERDESE2 可被Select IO IP核调用。 OSERDESE2允许DDR功能 参考&#xff1a; FPGA原语学习与整理第二弹&#xff0c;OSERDESE2串并转换器 - 知乎 (zhihu.com) 正点原子。 ISERDESE2原语和OSERDESE2原语是串并转换器&#xff0c;他的的功能都是实现串行数据和并行…...

没网络也能安装.Net 3.5!如何脱机安装.NET Framework 3.5

.NET框架是由微软制定的一个软件框架。它有助于在Windows上运行控制台、Web或移动应用程序。此有用的工具适用于Windows设备。 如何脱机安装.NET Framework 3.5 如果你拥有Windows 10、8、8.1或7,有时第三方软件可能会导致问题。你可能会在图片中看到这样的问题。 看这张照片…...

JVM运行时数据区-虚拟机栈

目录 一、内存中的栈 二、基本内容 三、优点 四、栈的存储单位 五、栈运行原理 六、栈的内部结构 &#xff08;一&#xff09;局部变量表 &#xff08;二&#xff09;操作数栈 &#xff08;三&#xff09;动态链接 &#xff08;四&#xff09;方法返回地址 &#xf…...

Java中介者模式

目录 定义 结构 案例 优点 缺点 使用场景 定义 又叫调停模式&#xff0c;定义一个中介角色来封装一系列对象之间的交互&#xff0c;使原有对象之间的耦合松散&#xff0c;且可以独立地改变它们之间的交互。 结构 中介者模式包含以下主要角色&#xff1a; 抽象中介者角…...

前端框架Vue学习 ——(五)前端工程化Vue-cli脚手架

文章目录 Vue-cliVue项目-创建Vue项目-目录结构Vue项目-启动Vue项目-配置端口Vue项目开发流程 Vue-cli 介绍&#xff1a;Vue-cli 是 Vue 官方提供的一个脚手架&#xff0c;用于快速生成一个 Vue 的项目模版 安装 NodeJS安装 Vue-cli npm install -g vue/cliVue项目-创建 图…...

App备案-iOS云管理式证书 Distribution Managed 公钥及证书SHA-1指纹的获取方法

根据近日工业和信息化部发布的《工业和信息化部关于开展移动互联网应用程序备案工作的通知》&#xff0c;相信不少要进行IOS平台App备案的朋友遇到了一个问题&#xff0c;就是apple不提供云管理式证书的下载&#xff0c;也就无法获取公钥及证书SHA-1指纹。 已经上架的应用不想重…...

Spring -Spring之依赖注入源码解析

依赖注入底层原理流程图&#xff1a;Spring中Bean的依赖注入原理| ProcessOn免费在线作图,在线流程图,在线思维导图 Spring中到底有几种依赖注入的方式&#xff1f; 首先分两种&#xff1a; 手动注入自动注入 手动注入 在XML中定义Bean时&#xff0c;就是手动注入&#xf…...

Spire.Office for .NET 8.10.2 同步更新-Crk

Spire.Office for .NET是 E-iceblue 提供的企业级 Office .NET API 的组合。它包括Spire.Doc、Spire.XLS、Spire.Spreadsheet、Spire.Presentation、Spire.PDF、Spire.DataExport、Spire.OfficeViewer、Spire.PDFViewer、Spire.DocViewer、Spire.Barcode和Spire.Email。Spire.O…...

MFC 基础篇(一)

目录 一.SDK编程 二.为什么要学MFC&#xff1f; 三.MFC能做什么&#xff1f; 四.MFC开发环境搭建 五.MFC项目创建 六.消息映射机制 一.SDK编程 Application Programming Interface 应用程序编程接口。 Software Development Kit 软件开发工具包&#xff0c;一般会包括A…...

Android技术-修改SO导出符号

背景 经常在使用第三方SDK的时候会莫名其妙报错&#xff0c;其中最常见的一种就是SO符号冲突&#xff0c;比如libA.so静态链接了libC.a,而libB.so动态链接了libC.so。这样便会导致符号冲突。又或者在使用不同版本的动态库&#xff0c;也会造成符号冲突。 报错案例 案例1 DEB…...

flutter 打包apk

Flutter项目打包生成APK_flutter打包apk_文阿花的博客-CSDN博客 关于iconData可能出现的错误&#xff1a; flutter build apk 打包报错调试过程 - 掘金 (juejin.cn) 使用命令行&#xff1a;flutter build apk --no-tree-shake-icons...

Halcon如何使用SaperaLT库连接dalsa相机

halcon安装好的时候&#xff0c;没有带SaperaLT的采集库&#xff0c;需要额外在Halcon官网下载此库。 以下是halcon官网下载此库的链接。官网需要注册才可以下载。 https://www.mvtec.com/downloads/interfaces?tx_mvtecproduct_extensiondownloadlist%5Bfilter%5D%5B0%5Dma…...

Vue 嵌套路由 多级路由规则

套娃路由 routes:[{path: /login,component: Login},{path: /user,component: User,children:[{ path: test, component: Test },{ path: test2, component: Test2 },]}]子路由不需要加/ 在父组件 子路由不需要加/ 需要带上父亲的路由路径 <router-link to"user/test…...

pandas教程:Introduction to pandas Data Structures pandas的数据结构

文章目录 Chapter 5 Getting Started with pandas5.1 Introduction to pandas Data Structures1 Series2 DataFrame3 Index Objects (索引对象) Chapter 5 Getting Started with pandas 这样导入pandas&#xff1a; import pandas as pde:\python3.7\lib\site-packages\numpy…...

MinIO 分布式文件(对象)存储

简介 MinIO是高性能、可扩展、云原生支持、操作简单、开源的分布式对象存储产品。 在中国&#xff1a;阿里巴巴、腾讯、百度、中国联通、华为、中国移动等等9000多家企业也都在使用MinIO产品 官网地址&#xff1a;http://www.minio.org.cn/ 下载 官网下载(8.4.3版本)&#x…...

HTML表单标签

## HTML标签&#xff1a;表单标签 * 表单&#xff1a; * 概念&#xff1a;用于采集用户输入的数据的。用于和服务器进行交互。 * form&#xff1a;用于定义表单的。可以定义一个范围&#xff0c;范围代表采集用户数据的范围 * 属性&#xff1…...

【黑马程序员】SpringCloud——Eureka

文章目录 前言一、提供者与消费者1. 服务调用关系 二、远程调用的问题三、eureka 原理分析1. eureka 的作用 四、Eureka 案例1. 搭建 eureka 服务1. 服务注册1.1 注册 user-service1.2 启动 user-service3. order-service 完成服务注册 3. 服务发现1. 在 order-service 完成服务…...

目标跟踪(DeepSORT)

本文首先将介绍在目标跟踪任务中常用的匈牙利算法&#xff08;Hungarian Algorithm&#xff09;和卡尔曼滤波&#xff08;Kalman Filter&#xff09;&#xff0c;然后介绍经典算法DeepSORT的工作流程以及对相关源码进行解析。 目前主流的目标跟踪算法都是基于Tracking-by-Detec…...

2 任务2: 使用趋动云GPU进行猫狗识别实践

使用趋动云GPU进行猫狗识别实践 1 创建项目2 初始化开发环境3 调试代码4 提交离线任务5 结果集存储与下载 使用趋动云提供的免费GPU&#xff0c;进行猫狗识别实践。 虽然例程里面提供的是基于tensorflow的&#xff0c;但是你也可以使用pytorch的代码 使用这个平台的一个优点就是…...

利用最小二乘法找圆心和半径

#include <iostream> #include <vector> #include <cmath> #include <Eigen/Dense> // 需安装Eigen库用于矩阵运算 // 定义点结构 struct Point { double x, y; Point(double x_, double y_) : x(x_), y(y_) {} }; // 最小二乘法求圆心和半径 …...

React hook之useRef

React useRef 详解 useRef 是 React 提供的一个 Hook&#xff0c;用于在函数组件中创建可变的引用对象。它在 React 开发中有多种重要用途&#xff0c;下面我将全面详细地介绍它的特性和用法。 基本概念 1. 创建 ref const refContainer useRef(initialValue);initialValu…...

R语言AI模型部署方案:精准离线运行详解

R语言AI模型部署方案:精准离线运行详解 一、项目概述 本文将构建一个完整的R语言AI部署解决方案,实现鸢尾花分类模型的训练、保存、离线部署和预测功能。核心特点: 100%离线运行能力自包含环境依赖生产级错误处理跨平台兼容性模型版本管理# 文件结构说明 Iris_AI_Deployme…...

ESP32读取DHT11温湿度数据

芯片&#xff1a;ESP32 环境&#xff1a;Arduino 一、安装DHT11传感器库 红框的库&#xff0c;别安装错了 二、代码 注意&#xff0c;DATA口要连接在D15上 #include "DHT.h" // 包含DHT库#define DHTPIN 15 // 定义DHT11数据引脚连接到ESP32的GPIO15 #define D…...

在四层代理中还原真实客户端ngx_stream_realip_module

一、模块原理与价值 PROXY Protocol 回溯 第三方负载均衡&#xff08;如 HAProxy、AWS NLB、阿里 SLB&#xff09;发起上游连接时&#xff0c;将真实客户端 IP/Port 写入 PROXY Protocol v1/v2 头。Stream 层接收到头部后&#xff0c;ngx_stream_realip_module 从中提取原始信息…...

让AI看见世界:MCP协议与服务器的工作原理

让AI看见世界&#xff1a;MCP协议与服务器的工作原理 MCP&#xff08;Model Context Protocol&#xff09;是一种创新的通信协议&#xff0c;旨在让大型语言模型能够安全、高效地与外部资源进行交互。在AI技术快速发展的今天&#xff0c;MCP正成为连接AI与现实世界的重要桥梁。…...

【C++从零实现Json-Rpc框架】第六弹 —— 服务端模块划分

一、项目背景回顾 前五弹完成了Json-Rpc协议解析、请求处理、客户端调用等基础模块搭建。 本弹重点聚焦于服务端的模块划分与架构设计&#xff0c;提升代码结构的可维护性与扩展性。 二、服务端模块设计目标 高内聚低耦合&#xff1a;各模块职责清晰&#xff0c;便于独立开发…...

SiFli 52把Imagie图片,Font字体资源放在指定位置,编译成指定img.bin和font.bin的问题

分区配置 (ptab.json) img 属性介绍&#xff1a; img 属性指定分区存放的 image 名称&#xff0c;指定的 image 名称必须是当前工程生成的 binary 。 如果 binary 有多个文件&#xff0c;则以 proj_name:binary_name 格式指定文件名&#xff0c; proj_name 为工程 名&…...

打手机检测算法AI智能分析网关V4守护公共/工业/医疗等多场景安全应用

一、方案背景​ 在现代生产与生活场景中&#xff0c;如工厂高危作业区、医院手术室、公共场景等&#xff0c;人员违规打手机的行为潜藏着巨大风险。传统依靠人工巡查的监管方式&#xff0c;存在效率低、覆盖面不足、判断主观性强等问题&#xff0c;难以满足对人员打手机行为精…...

HubSpot推出与ChatGPT的深度集成引发兴奋与担忧

上周三&#xff0c;HubSpot宣布已构建与ChatGPT的深度集成&#xff0c;这一消息在HubSpot用户和营销技术观察者中引发了极大的兴奋&#xff0c;但同时也存在一些关于数据安全的担忧。 许多网络声音声称&#xff0c;这对SaaS应用程序和人工智能而言是一场范式转变。 但向任何技…...