当前位置: 首页 > article >正文

基于Scala实现Flink的三种基本时间窗口操作

目录

代码结构

代码解析

(1) 主程序入口

(2) 窗口联结(Window Join)

(3) 间隔联结(Interval Join)

(4) 窗口同组联结(CoGroup)

(5) 执行任务

代码优化

(1) 时间戳分配

(2) 窗口大小

(3) 输出格式

(4) 并行度

优化后的代码


 

这段代码展示了 Apache Flink 中三种不同的流联结操作:窗口联结(Window Join)间隔联结(Interval Join) 和 窗口同组联结(CoGroup)。以下是对代码的详细解析和说明: 

代码结构

  • 包声明package transformplus
    定义了代码所在的包。

  • 导入依赖
    导入了 Flink 相关类库,包括流处理 API、窗口分配器、时间语义等。

  • WindowJoin 对象
    主程序入口,包含三种流联结操作的实现。

package transformplusimport java.langimport org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
import source.Event/**** @PROJECT_NAME: flink1.13* @PACKAGE_NAME: transformplus* @author: 赵嘉盟-HONOR* @data: 2023-12-05 12:05* @DESCRIPTION**/
object WindowJoin {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)//TODO 窗口联结(join)val stream1 = env.fromElements(("a", 1000L),("b", 1000L),("a", 2000L),("b", 6000L)).assignAscendingTimestamps(_._2)val stream2 = env.fromElements(("a", 3000L),("b", 3000L),("a", 4000L),("b", 8000L)).assignAscendingTimestamps(_._2)stream1.join(stream2).where(_._1).equalTo(_._1).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply((e1,e2)=>e1+"->"+e2).print("Join")//TODO 间隔联结:用户行为事件联系(intervalJoin)// 订单事件流val orderStream: DataStream[(String, String, Long)] = env.fromElements(("Mary", "order-1", 5000L),("Alice", "order-2", 5000L),("Bob", "order-3", 20000L),("Alice", "order-4", 20000L),("Cary", "order-5", 51000L)).assignAscendingTimestamps(_._3)// 点击事件流val pvStream: DataStream[Event] = env.fromElements(Event("Bob", "./cart", 2000L),Event("Alice", "./prod?id=100", 3000L),Event("Alice", "./prod?id=200", 3500L),Event("Bob", "./prod?id=2", 2500L),Event("Alice", "./prod?id=300", 36000L),Event("Bob", "./home", 30000L),Event("Bob", "./prod?id=1", 23000L),Event("Bob", "./prod?id=3", 33000L)).assignAscendingTimestamps(_.timestamp)orderStream.keyBy(_._1).intervalJoin(pvStream.keyBy(_.user)).between(Time.seconds(-5),Time.seconds(10)).process(new ProcessJoinFunction[(String,String,Long),Event,String] {override def processElement(in1: (String, String, Long), in2: Event, context: ProcessJoinFunction[(String, String, Long), Event, String]#Context, collector: Collector[String]): Unit = {collector.collect(in1+"=>"+in2)}}).print("intervalJoin")//TODO 窗口同组联结: coGroup(iterable)stream1.coGroup(stream2).where(_._1).equalTo(_._1).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new CoGroupFunction[(String,Long),(String,Long),String] {override def coGroup(iterable: lang.Iterable[(String, Long)], iterable1: lang.Iterable[(String, Long)], collector: Collector[String]): Unit = {collector.collect(iterable+"=>"+iterable1)}}).print("coGroup")env.execute("windowJoin")}
}

代码解析

(1) 主程序入口
def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)
  • 创建 Flink 流处理环境 StreamExecutionEnvironment,并设置并行度为 1。
(2) 窗口联结(Window Join)
val stream1 = env.fromElements(("a", 1000L),("b", 1000L),("a", 2000L),("b", 6000L)
).assignAscendingTimestamps(_._2)val stream2 = env.fromElements(("a", 3000L),("b", 3000L),("a", 4000L),("b", 8000L)
).assignAscendingTimestamps(_._2)stream1.join(stream2).where(_._1).equalTo(_._1).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply((e1, e2) => e1 + "->" + e2).print("Join")
  • 数据流:定义了两个流 stream1 和 stream2,分别包含键值对 (String, Long)
  • 时间戳分配:使用 assignAscendingTimestamps 方法为事件分配时间戳。
  • 窗口联结
    • 使用 join 方法将两个流按键(_._1)联结。
    • 使用 TumblingEventTimeWindows 定义 5 秒的滚动窗口。
    • 使用 apply 方法将匹配的事件对拼接成字符串并输出。
(3) 间隔联结(Interval Join)
val orderStream: DataStream[(String, String, Long)] = env.fromElements(("Mary", "order-1", 5000L),("Alice", "order-2", 5000L),("Bob", "order-3", 20000L),("Alice", "order-4", 20000L),("Cary", "order-5", 51000L)).assignAscendingTimestamps(_._3)val pvStream: DataStream[Event] = env.fromElements(Event("Bob", "./cart", 2000L),Event("Alice", "./prod?id=100", 3000L),Event("Alice", "./prod?id=200", 3500L),Event("Bob", "./prod?id=2", 2500L),Event("Alice", "./prod?id=300", 36000L),Event("Bob", "./home", 30000L),Event("Bob", "./prod?id=1", 23000L),Event("Bob", "./prod?id=3", 33000L)).assignAscendingTimestamps(_.timestamp)orderStream.keyBy(_._1).intervalJoin(pvStream.keyBy(_.user)).between(Time.seconds(-5), Time.seconds(10)).process(new ProcessJoinFunction[(String, String, Long), Event, String] {override def processElement(in1: (String, String, Long), in2: Event, context: ProcessJoinFunction[(String, String, Long), Event, String]#Context, collector: Collector[String]): Unit = {collector.collect(in1 + "=>" + in2)}}).print("intervalJoin")
  • 数据流:定义了两个流 orderStream(订单事件)和 pvStream(点击事件)。
  • 时间戳分配:为事件分配时间戳。
  • 间隔联结
    • 使用 intervalJoin 方法将两个流按键(_._1 和 user)联结。
    • 使用 between 方法定义时间间隔(前 5 秒到后 10 秒)。
    • 使用 process 方法将匹配的事件对拼接成字符串并输出。
(4) 窗口同组联结(CoGroup)
stream1.coGroup(stream2).where(_._1).equalTo(_._1).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new CoGroupFunction[(String, Long), (String, Long), String] {override def coGroup(iterable: lang.Iterable[(String, Long)], iterable1: lang.Iterable[(String, Long)], collector: Collector[String]): Unit = {collector.collect(iterable + "=>" + iterable1)}}).print("coGroup")
  • 窗口同组联结
    • 使用 coGroup 方法将两个流按键(_._1)联结。
    • 使用 TumblingEventTimeWindows 定义 5 秒的滚动窗口。
    • 使用 apply 方法将匹配的事件集合拼接成字符串并输出。
(5) 执行任务
env.execute("windowJoin")
  • 启动 Flink 流处理任务,任务名称为 windowJoin

代码优化

(1) 时间戳分配
  • assignAscendingTimestamps 方法假设事件时间戳是严格递增的。如果时间戳可能乱序,应使用 assignTimestampsAndWatermarks 方法:

    java

    stream1.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((event: (String, Long), timestamp: Long) => event._2)
    )
(2) 窗口大小
  • 窗口大小(5 秒)可能不适合所有场景。应根据实际需求调整窗口大小。
(3) 输出格式
  • 输出格式较为简单,可以优化为更易读的形式:

    java

    collector.collect(s"Order: ${in1._2}, Click: ${in2.url}")
(4) 并行度
  • 并行度设置为 1,可能影响性能。可以根据集群资源调整并行度:

    java

    env.setParallelism(4)

优化后的代码

以下是优化后的完整代码:

package transformplusimport java.lang
import java.time.Durationimport org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
import source.Eventobject WindowJoin {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(4)// 窗口联结val stream1 = env.fromElements(("a", 1000L),("b", 1000L),("a", 2000L),("b", 6000L)).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((event: (String, Long), timestamp: Long) => event._2))val stream2 = env.fromElements(("a", 3000L),("b", 3000L),("a", 4000L),("b", 8000L)).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((event: (String, Long), timestamp: Long) => event._2))stream1.join(stream2).where(_._1).equalTo(_._1).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply((e1, e2) => s"${e1._1} (${e1._2}) -> ${e2._1} (${e2._2})").print("Join")// 间隔联结val orderStream: DataStream[(String, String, Long)] = env.fromElements(("Mary", "order-1", 5000L),("Alice", "order-2", 5000L),("Bob", "order-3", 20000L),("Alice", "order-4", 20000L),("Cary", "order-5", 51000L)).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((event: (String, String, Long), timestamp: Long) => event._3))val pvStream: DataStream[Event] = env.fromElements(Event("Bob", "./cart", 2000L),Event("Alice", "./prod?id=100", 3000L),Event("Alice", "./prod?id=200", 3500L),Event("Bob", "./prod?id=2", 2500L),Event("Alice", "./prod?id=300", 36000L),Event("Bob", "./home", 30000L),Event("Bob", "./prod?id=1", 23000L),Event("Bob", "./prod?id=3", 33000L)).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((event: Event, timestamp: Long) => event.timestamp))orderStream.keyBy(_._1).intervalJoin(pvStream.keyBy(_.user)).between(Time.seconds(-5), Time.seconds(10)).process(new ProcessJoinFunction[(String, String, Long), Event, String] {override def processElement(in1: (String, String, Long), in2: Event, context: ProcessJoinFunction[(String, String, Long), Event, String]#Context, collector: Collector[String]): Unit = {collector.collect(s"Order: ${in1._2}, Click: ${in2.url}")}}).print("intervalJoin")// 窗口同组联结stream1.coGroup(stream2).where(_._1).equalTo(_._1).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new CoGroupFunction[(String, Long), (String, Long), String] {override def coGroup(iterable: lang.Iterable[(String, Long)], iterable1: lang.Iterable[(String, Long)], collector: Collector[String]): Unit = {collector.collect(s"Stream1: ${iterable.toString}, Stream2: ${iterable1.toString}")}}).print("coGroup")env.execute("windowJoin")}
}

相关文章:

基于Scala实现Flink的三种基本时间窗口操作

目录 代码结构 代码解析 (1) 主程序入口 (2) 窗口联结(Window Join) (3) 间隔联结(Interval Join) (4) 窗口同组联结(CoGroup) (5) 执行任务 代码优化 (1) 时间戳分配 (2) 窗口大小 (3) 输出格式…...

c++对halcon的动态链接库dll封装及调用(细细讲)

七个部分(是个大工程) 一,halcon封装函数导出cpp的内容介绍 二,c++中对halcon环境的配置 三,在配置环境下验证halcon代码 四,dll项目创建+环境配置 五,编辑dll及导出 六,调用打包好的动态链接库的配置 七,进行测试 一,halcon的封装及导出cpp的介绍 1,我这里…...

【优选算法】分治

一&#xff1a;颜色分类 class Solution { public:void sortColors(vector<int>& nums) {// 三指针法int n nums.size();int left -1, right n, i 0;while(i < right){if(nums[i] 0) swap(nums[left], nums[i]);else if(nums[i] 2) swap(nums[--right], num…...

QGraphicsView中鼠标点击与移动事件传递给MainWindow

在Qt图形应用程序开发中,QGraphicsView和QGraphicsScene框架提供了强大的2D图形显示功能。然而,当我们需要在主窗口(MainWindow)中处理这些视图中的鼠标事件。 问题背景 在典型的Qt图形应用程序架构中: MainWindow └── QGraphicsView└── QGraphicsScene└── QGra…...

【图片识别改名】如何批量将图片按图片上文字重命名?自动批量识别图片文字并命名,基于图片文字内容改名,WPF和京东ocr识别的解决方案

应用场景 在日常工作和生活中&#xff0c;我们经常会遇到需要对大量图片进行重命名的情况。例如&#xff0c;设计师可能需要根据图片内容为设计素材命名&#xff0c;文档管理人员可能需要根据扫描文档中的文字对图片进行分类命名。传统的手动重命名方式效率低下且容易出错&…...

RabbitMQ 的高可用性

RabbitMQ 是比较有代表性的&#xff0c;因为是基于主从&#xff08;非分布式&#xff09;做高可用的RabbitMQ 有三种模式&#xff1a;单机模式、普通集群模式、镜像集群模式。 单机模式 单机模式,生产几乎不用。 普通集群模式&#xff08;无高可用性&#xff09; 普通集群模…...

DAY 48 随机函数与广播机制

知识点回顾&#xff1a; 随机张量的生成&#xff1a;torch.randn函数卷积和池化的计算公式&#xff08;可以不掌握&#xff0c;会自动计算的&#xff09;pytorch的广播机制&#xff1a;加法和乘法的广播机制 ps&#xff1a;numpy运算也有类似的广播机制&#xff0c;基本一致 作…...

计算机基础知识(第五篇)

计算机基础知识&#xff08;第五篇&#xff09; 架构演化与维护 软件架构的演化和定义 软件架构的演化和维护就是对架构进行修改和完善的过程&#xff0c;目的就是为了使软件能够适应环境的变化而进行的纠错性修改和完善性修改等&#xff0c;是一个不断迭代的过程&#xff0…...

从零开始制作小程序简单概述

以下是结合案例的“从零制作小红书风格小程序”的全流程指南&#xff0c;采用小红书爆款笔记的结构呈现&#xff0c;并附CSDN参考资源&#x1f447;&#xff1a; 一、核心开发步骤&#xff08;附工具推荐&#xff09; 账号与定位 ✅ 注册类型选择&#xff1a;个人店&#xff08…...

AI架构师修炼之道

1 AI时代的架构革命 与传统软件开发和软件架构师相比&#xff0c;AI架构师面临着三重范式转换&#xff1a; 1.1 技术维度&#xff0c;需处理异构算力调度与模型生命周期管理的复杂性&#xff1b; 1.2 系统维度&#xff0c;需平衡实时性与资源约束的矛盾&#xff1b; 1.3 价…...

三十五、面向对象底层逻辑-Spring MVC中AbstractXlsxStreamingView的设计

在Web应用开发中&#xff0c;大数据量的Excel导出功能是常见需求。传统Apache POI的XSSF实现方式在处理超大数据集时&#xff0c;会因全量加载到内存导致OOM&#xff08;内存溢出&#xff09;问题。Spring MVC提供的AbstractXlsxStreamingView通过流式处理机制&#xff0c;有效…...

Unity的日志管理类

脚本功能&#xff1a; 1&#xff0c;打印日志到控制台 2&#xff0c;显示日志到UI Text 3&#xff0c;将日志写入本地文件 这对unity开发安卓平台来说很有用 using System; using System.IO; using System.Text; using UnityEngine; using UnityEngine.UI;public class FileLo…...

【PhysUnits】17.2 配套变量结构体 Var(variable.rs)

一、源码 这段代码定义了一个泛型结构体 Var&#xff0c;用于封装数值类型并提供各种运算操作。 /** 变量结构体 Var* 该结构体泛型参数 T 需满足 Numeric 约束*/use core::ops::{Neg, Add, Sub, Mul, Div, AddAssign, SubAssign, MulAssign}; use crate::constant::Integer;…...

iview组件库:当后台返回到的数据与使用官网组件指定的字段不匹配时,进行修改某个属性名再将response数据渲染到页面上的处理

1、需求导入 当存在前端需要的数据的字段渲染到表格或者是一些公共的表格组件展示数据时的某个字段名与后台返回的字段不一致时&#xff0c;那么需要前端进行稍加处理&#xff0c;而不能直接this.list res.data;这样数据是渲染不出来的。 2、后台返回的数据类型 Datalist(pn) …...

服务器 | Centos 9 系统中,如何部署SpringBoot后端项目?

系列文章目录 虚拟机 | Ubuntu 安装流程以及界面太小问题解决 虚拟机 | Ubuntu图形化系统&#xff1a; open-vm-tools安装失败以及实现文件拖放 虚拟机 | Ubuntu操作系统&#xff1a;su和sudo理解及如何处理忘记root密码 文章目录 系列文章目录前言一、环境介绍二、 使用syst…...

qt network 整体框架

以下是 Qt 网络模块中 QNetworkInterface、TCP、UDP 及相关类的层次关系图及说明&#xff1a; 一、Qt 网络模块层次结构 ┌─────────────────────────────────────────────────────────────┐ │ QtNetwork 模…...

C++ map基础概念、map对象创建、map赋值操作、map大小操作、map数据插入、map数据删除、map数据修改、map数据统计

map的使用频率很高&#xff0c;仅次于vector&#xff0c;先了解下pair的概念&#xff1a; pair 概念&#xff1a; template<class _Ty1, class Ty2> struct pair{ _Ty1 first; // 这两个可以是任意的类型 _Ty2 second; }; eg&#xff1a;pair<int, int> p(13,…...

(2025)Windows修改JupyterNotebook的字体,使用JetBrains Mono

(JetBrains Mono字体未下载就配置,这种情况我不知道能不能行,没做过实验,因为我电脑已经下载了,不可能删了那么多字体做实验,我的建议是下载JetBrains Mono字体,当你使用VsCode配置里面的JetBrains字体也很有用) 首先参考该文章下载字体到电脑上 VSCode 修改字体为JetBrains …...

小番茄C盘清理:专业高效的电脑磁盘清理工具

在使用电脑的过程中&#xff0c;我们常常会遇到系统盘空间不足、磁盘碎片过多、垃圾文件堆积等问题&#xff0c;这些问题不仅会导致电脑运行缓慢&#xff0c;还可能引发系统崩溃。为了解决这些问题&#xff0c;小番茄C盘清理应运而生。它是一款专业的C盘清理软件&#xff0c;能…...

CSS 预处理器与工具

目录 CSS 预处理器与工具1. Less主要特性 2. Sass/SCSS主要特性 3. Tailwind CSS主要特性 4. 其他工具PostCSSCSS Modules 5. 选择建议 CSS 预处理器与工具 1. Less Less 是一个 CSS 预处理器&#xff0c;它扩展了 CSS 语言&#xff0c;添加了变量、嵌套规则、混合&#xff0…...

AUTOSAR实战教程--标准协议栈实现DoIP转DoCAN的方法

目录 软件架构 关键知识点 第一:PDUR的缓存作用 第二:CANTP的组包拆包功能 第三:流控帧的意义 配置过程 步骤0:ECUC模块中PDU创建 步骤1:SoAD模块维持不变 步骤2:DoIP模块为Gateway功能添加Connection ​步骤3:DoIP模块为Gateway新增LA/TA/SA ​步骤4:PDUR模…...

【MySQL系列】MySQL 导出表数据到文件

博客目录 一、使用 SELECT INTO OUTFILE 语句基本语法参数详解注意事项实际示例 二、使用 mysqldump 工具基本语法常用选项实际示例 三、使用 MySQL Workbench 导出导出步骤高级选项 四、其他导出方法1. 使用 mysql 命令行客户端2. 使用 LOAD DATA INFILE 的逆向操作3. 使用编程…...

vue3:十五、管理员管理-页面搭建

一、页面效果 实现管理员页面,完成管理员对应角色的中文名称显示,实现搜索栏,表格基本增删改查,分页等功能 二、修改问题 1、修改搜索框传递参数问题 (1)问题图示 如下图,之前搜索后,传递的数据不直接是一个value值,而是如下图的格式 查询可知这里传递的数据定义的是…...

学习使用YOLO的predict函数使用

YOLO的 result.py #2025.1.3 """ https://docs.ultralytics.com/zh/modes/predict/#inference-arguments 对yolo 目标检测、实例分割、关键点检测结果进行说明https://docs.ultralytics.com/reference/engine/results/#ultralytics.engine.results.Masks.xy 对…...

零基础在实践中学习网络安全-皮卡丘靶场(第十四期-XXE模块)

本期内容涉及到很多前面的内容&#xff0c;因此复习后可以更好的了解本期内容 介绍 XXE -"xml external entity injection"即"xml外部实体注入漏洞"。 概括一下就是"攻击者通过向服务器注入指定的xml实体内容,从而让服务器按照指定的配置进行执行,导…...

深入浅出Spring Security

一、Spring Security基本组件 Spring Security的设计理念是提供一种可插拔的、高度可定制的安全服务。其核心功能依赖于以下几个关键组件&#xff1a; Authentication (认证): 概念: 确认用户身份的过程&#xff0c;即验证“你是谁”。核心类: Authentication 接口&#xff0c…...

基于51单片机的红外防盗及万年历仿真

目录 具体实现功能 设计介绍 资料内容 全部内容 资料获取 具体实现功能 具体功能&#xff1a; &#xff08;1&#xff09;实时显示年、月、日、时、分、秒、星期信息&#xff1b; &#xff08;2&#xff09;红外传感器&#xff08;仿真中用按键模拟&#xff09;检测是否有…...

Doris 数据库深度解析:架构、原理与实战应用

一、Doris 的架构与原理 1. 架构组成 Doris 是一个分布式 MPP&#xff08;大规模并行处理&#xff09;数据库&#xff0c;它的架构主要由以下几部分组成&#xff1a; FE&#xff08;Frontend&#xff09;&#xff1a;负责管理元数据、解析 SQL 查询、优化查询计划&#xff0…...

【飞腾AI加固服务器】全国产化飞腾+昇腾310+PCIe Switch的AI大模型服务器解决方案

以下是全国产化飞腾AI加固服务器采用飞腾昇腾PCIe Switch解决方案&#xff1a; &#x1f5a5;️ 一、硬件架构亮点 ‌国产算力双擎‌ ‌飞腾处理器‌&#xff1a;搭载飞腾FT2000/64核服务器级CPU&#xff08;主频1.8-2.2GHz&#xff09;&#xff0c;支持高并发任务与复杂计算&a…...

【术语扫盲】评估指标Precision、Recall、F1-score、Support是什么含义?

一、背景 Precision、Recall、F1-score、Support 是分类问题中最常用的评估指标&#xff0c;它们是机器学习、深度学习、数据挖掘中非常基础也非常重要的术语。 二、 详细解释 指标含义公式Precision&#xff08;精准率&#xff09;预测为某类的样本中&#xff0c;有多少是真…...