当前位置: 首页 > news >正文

ProcessWindowFunction 结合自定义触发器的陷阱

背景:

flink中常见的需求如下:统计某个页面一天内的点击率,每10秒输出一次,我们如果采用ProcessWindowFunction 结合自定义触发器如何实现呢?如果这样实现问题是什么呢?

ProcessWindowFunction 结合自定义触发器实现统计点击率

关键代码:
在这里插入图片描述
在这里插入图片描述
完整代码参见:

package wikiedits.func;import java.text.SimpleDateFormat;
import java.util.Date;import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import wikiedits.func.model.KeyCount;public class ProcessWindowFunctionAndTiggerDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 使用处理时间env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);env.setStateBackend(new FsStateBackend("file:///D:/tmp/flink/checkpoint/windowtrigger"));// 并行度为1env.setParallelism(1);// 设置数据源,一共三个元素DataStream<Tuple2<String, Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {@Overridepublic void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {int xxxNum = 0;int yyyNum = 0;for (int i = 1; i < Integer.MAX_VALUE; i++) {// 只有XXX和YYY两种nameString name = (0 == i % 2) ? "XXX" : "YYY";// 更新aaa和bbb元素的总数if (0 == i % 2) {xxxNum++;} else {yyyNum++;}// 使用当前时间作为时间戳long timeStamp = System.currentTimeMillis();// 将数据和时间戳打印出来,用来验证数据if(xxxNum % 2000==0){System.out.println(String.format("source,%s, %s,    XXX total : %d,    YYY total : %d\n", name,time(timeStamp), xxxNum, yyyNum));}// 发射一个元素,并且戴上了时间戳ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, 1), timeStamp);// 每发射一次就延时1秒Thread.sleep(1);}}@Overridepublic void cancel() {}});// 将数据用5秒的滚动窗口做划分,再用ProcessWindowFunctionSingleOutputStreamOperator<String> mainDataStream = dataStream// 以Tuple2的f0字段作为key,本例中实际上key只有aaa和bbb两种.keyBy(value -> value.f0)// 5秒一次的滚动窗口.timeWindow(Time.minutes(5))// 10s触发一次计算,更新统计结果.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))// 统计每个key当前窗口内的元素数量,然后把key、数量、窗口起止时间整理成字符串发送给下游算子.process(new ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {// 自定义状态private ValueState<KeyCount> state;@Overridepublic void open(Configuration parameters) throws Exception {// 初始化状态,name是myStatestate = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", KeyCount.class));}public void clear(Context context) {ValueState<KeyCount> contextWindowValueState = context.windowState().getState(new ValueStateDescriptor<>("myWindowState", KeyCount.class));contextWindowValueState.clear();}@Overridepublic void process(String s, Context context, Iterable<Tuple2<String, Integer>> iterable,Collector<String> collector) throws Exception {// 从backend取得当前单词的myState状态KeyCount current = state.value();// 如果myState还从未没有赋值过,就在此初始化if (current == null) {current = new KeyCount();current.key = s;current.count = 0;}int count = 0;// iterable可以访问该key当前窗口内的所有数据,// 这里简单处理,只统计了元素数量for (Tuple2<String, Integer> tuple2 : iterable) {count++;}// 更新当前key的元素总数current.count += count;// 更新状态到backendstate.update(current);ValueState<KeyCount> contextWindowValueState = context.windowState().getState(new ValueStateDescriptor<>("myWindowState", KeyCount.class));KeyCount windowValue = contextWindowValueState.value();if (windowValue == null) {windowValue = new KeyCount();windowValue.key = s;windowValue.count = 0;}windowValue.count += count;contextWindowValueState.update(windowValue);// 将当前key及其窗口的元素数量,还有窗口的起止时间整理成字符串String value = String.format("window, %s, %s - %s, %d, windowStateCount :%d,   total : %d",// 当前keys,// 当前窗口的起始时间time(context.window().getStart()),// 当前窗口的结束时间time(context.window().getEnd()),// 当前key在当前窗口内元素总数count,// 当前key所在窗口的总数contextWindowValueState.value().count,// 当前key出现的总数current.count);// 发射到下游算子collector.collect(value);}});// 打印结果,通过分析打印信息,检查ProcessWindowFunction中可以处理所有key的整个窗口的数据mainDataStream.print();env.execute("processfunction demo : processwindowfunction");}public static String time(long timeStamp) {return new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date(timeStamp));}}

这里采用ProcessWindowFunction 结合ContinuousProcessingTimeTrigger的方式确实可以实现统计至今为止某个页面点击率的目的,不过这其中需要注意点的点是:
每隔10s触发public void process(String s, Context context, Iterable<Tuple2<String, Integer>> iterable, Collector<String> collector)方法时,iterable对象是包含了一天的窗口内收到的所有消息,也就是当前触发时iterable集合是前10s触发时iterable集合的超集,包含前10s触发时的所有的消息集合。
到这里所引起的问题也自然而然的出来了:对于ProcessWindowFunction 实现而言,flink内部是通过ListState的形式保存窗口内收到的所有消息的,注意这里flink内部会使用ListState保存每一条分配到以天为单位的窗口内的消息,这会导致状态膨胀,想一下,一天内所有的消息都会当成状态保存起来,这对于状态后端的压力是有多大!这些保存在ListState中的消息只有在窗口结束后才会清理:具体参见WindowOperator.clearAllState,那有解决方案吗?使用Agg/Reduce处理函数替ProcessWindowFunction作为处理函数可以实现吗?请看下一篇文章

参考文章:
https://www.cnblogs.com/Springmoon-venn/p/13667023.html

相关文章:

ProcessWindowFunction 结合自定义触发器的陷阱

背景&#xff1a; flink中常见的需求如下&#xff1a;统计某个页面一天内的点击率,每10秒输出一次&#xff0c;我们如果采用ProcessWindowFunction 结合自定义触发器如何实现呢&#xff1f;如果这样实现问题是什么呢&#xff1f; ProcessWindowFunction 结合自定义触发器实现…...

什么是jvm

一、初识JVM&#xff08;虚拟机&#xff09; JVM是Java Virtual Machine&#xff08;Java虚拟机&#xff09;的缩写&#xff0c;JVM是一种用于计算设备的规范&#xff0c;它是一个虚构出来的计算机&#xff0c;是通过在实际的计算机上仿真模拟各种计算机功能来实现的。 引入Jav…...

kettle通过java步骤获取汉字首拼

kettle通过java步骤获取汉字首拼 用途描述 一组数据&#xff0c;需要获取汉字首拼后&#xff0c;输出&#xff1b; 实现效果 添加jar包 pinyin4j-2.5.0.jar 自定义常量数据 Java代码 完整代码&#xff1a; import net.sourceforge.pinyin4j.PinyinHelper; import net.sou…...

Conformer: Local Features Coupling Global Representationsfor Visual Recognition

论文链接&#xff1a;https://arxiv.org/abs/2105.03889 代码链接&#xff1a;https://github.com/pengzhiliang/Conformer 参考博文&#xff1a;Conformer论文以及代码解析&#xff08;上&#xff09;_conformer代码_从现在开始壹并超的博客-CSDN博客 摘要 在卷积神经网络…...

java8-Stream流常用API

什么是 Stream Stream&#xff08;流&#xff09;是 Java 8 引入的一个新的抽象概念&#xff0c;它代表着一种处理数据的序列。简单来说&#xff0c;Stream 是一系列元素的集合&#xff0c;这些元素可以是集合、数组、I/O 资源或者其他数据源。 Stream API 提供了丰富的操作方…...

React 任务调度

React 任务池 不同的fiber任务有不同的优先级&#xff0c;为了用户体验&#xff0c;React需要先处理优先级高的任务。 为了存储这些任务&#xff0c;React中有两个任务池&#xff1a; // Tasks are stored on a min heap var taskQueue []; // 存储立即要执行的任务 var tim…...

小白开始学习C++

​​​​第一节&#xff1a;控制台输出hello word&#xff01; #include<iostream> //引入库文件 int main() { //控制台输出 hello word! 之后回车 std::cout << "hello word!\n"; #include<iostream> //引入库文件int main() {//控制…...

SpringMVC入门的注解、参数传递、返回值和页面跳转---超详细教学

前言&#xff1a; 欢迎阅读Spring MVC入门必读&#xff01;在这篇文章中&#xff0c;我们将探索这个令人兴奋的框架&#xff0c;它为您提供了一种高效、灵活且易于维护的方式来构建Web应用程序。通过使用Spring MVC&#xff0c;您将享受到以下好处&#xff1a;简洁的代码、强大…...

【复习socket】每天40min,我们一起用70天稳扎稳打学完《JavaEE初阶》——28/70 第二十八天

专注 效率 记忆 预习 笔记 复习 做题 欢迎观看我的博客,如有问题交流,欢迎评论区留言,一定尽快回复!(大家可以去看我的专栏,是所有文章的目录)   文章字体风格: 红色文字表示:重难点★✔ 蓝色文字表示:思路以及想法★✔   如果大家觉得有帮助的话,感谢大家帮忙 点…...

vue2踩坑之项目:生成二维码使用vue-print-nb打印二维码

1. vue2安装 npm install vue-print-nb --save vue3安装 npm install vue3-print-nb --save 2. //vue2 引入方式 全局 main.js import Print from vue-print-nb Vue.use(Print) ------------------------------------------------------------------------------------ //vue2 …...

【iVX】十五分钟制作一款小游戏,iVX真有怎么神?

个人主页&#xff1a;【&#x1f60a;个人主页】 新人博主&#xff0c;喜欢就关注一下呗~ 文章目录 前言iVX介绍初上手布置背景制作可移动物体总结&#xff08;完善步骤&#xff09; 前言 在上篇文章中&#xff0c;我向大家介绍了一种打破常规的编程方式——iVX&#xff0c;可…...

SpringMVC常用注解、参数传递、返回值

目录 前言 一、常用注解 二、参数传递 ​编辑 1. 基础类型String类型 2. 复杂类型 3. RequestParam 4. PathVariable 5.RequestBody 6. RequestHeader 三、方法返回值 一&#xff1a;void 二&#xff1a;String 三&#xff1a;Stringmodel 四&#xff1a;ModelAndVi…...

新公司第一次上架新APP需要提前准备哪些材料?

目录 前言一、需要上架的应用市场二、需要准备的资料总结 前言 前不久&#xff0c;使用一家新公司刚刚上架了一款新的APP项目。特此记录一下&#xff0c;现在第一次上架一款APP需要提前准备的各项材料。 一、需要上架的应用市场 现在&#xff0c;上架一款新的APP主流的应用市…...

『C语言进阶』指针进阶(一)

&#x1f525;博客主页&#xff1a; 小羊失眠啦 &#x1f516;系列专栏&#xff1a; C语言 &#x1f325;️每日语录&#xff1a;无论你怎么选&#xff0c;都难免会有遗憾。 ❤️感谢大家点赞&#x1f44d;收藏⭐评论✍️ 前言 在C语言初阶中&#xff0c;我们对指针有了一定的…...

2605. 从两个数字数组里生成最小数字(Java)

给你两个只包含 1 到 9 之间数字的数组 nums1 和 nums2 &#xff0c;每个数组中的元素 互不相同 &#xff0c;请你返回 最小 的数字&#xff0c;两个数组都 至少 包含这个数字的某个数位。 示例 1&#xff1a; 输入&#xff1a;nums1 [4,1,3], nums2 [5,7] 输出&#xff1a;1…...

深度解析 PostgreSQL Protocol v3.0(一)

引言 PostgreSQL 使用基于消息的协议在前端&#xff08;也可以称为客户端&#xff09;和后端&#xff08;也可以称为服务器&#xff09;之间进行通信。该协议通过 TCP/IP 和 Unix 域套接字支持。 《深度解析 PostgreSQL Protocol v3.0》系列技术贴&#xff0c;将带大家深度了…...

Mysql中having语句与where语句的用法与区别

分析&回答 我们在写sql语句的时候,经常会使用where语句,很少会用到having,其实在mysql中having子句也是设定条件的语句与where有相似之处但也有区别。having子句在查询过程中慢于聚合语句(sum,min,max,avg,count)。而where子句在查询过程中则快于聚合语句(sum,min,max,avg…...

基于qt软件的网上聊天室软件

1.服务器: 1).功能: 用于创建一个客户端&#xff0c;通过文本编辑器来获得端口号&#xff0c;根据获得的端口号创建服务器&#xff0c;等待客户端连接 创建成功会提示服务器创建成功 在收到客户端发送的信息时&#xff0c;把这条信息发送给其他所有客户端&#xff0c;实现群…...

本是同根生-双数据库集群keepalived virtual_route_id冲突导致连接故障

项目场景&#xff1a; 一企业近期陆续开始升级办公与大数据系统&#xff0c;新的承包商。原有的数据库是某国内大品牌A&#xff0c;现在新的功能准备陆续迁移到大品牌B上。系统部署后&#xff0c;A依旧承担比较轻松的财务、仓库管理&#xff0c;B承担实时的线上业务。项目验收…...

『力扣每日一题06』字符串中的第一个唯一字符

今天是学习新知识的一天&#xff0c;String 类中有太多细枝末节&#xff0c;需要我去学习跟掌握了。 话不多说&#xff0c;今天给大家带来一道字符串的题目~ 一、题目 给定一个字符串 s &#xff0c;找到 它的第一个不重复的字符&#xff0c;并返回它的索引 。如果不存在&…...

多模态2025:技术路线“神仙打架”,视频生成冲上云霄

文&#xff5c;魏琳华 编&#xff5c;王一粟 一场大会&#xff0c;聚集了中国多模态大模型的“半壁江山”。 智源大会2025为期两天的论坛中&#xff0c;汇集了学界、创业公司和大厂等三方的热门选手&#xff0c;关于多模态的集中讨论达到了前所未有的热度。其中&#xff0c;…...

【Linux】shell脚本忽略错误继续执行

在 shell 脚本中&#xff0c;可以使用 set -e 命令来设置脚本在遇到错误时退出执行。如果你希望脚本忽略错误并继续执行&#xff0c;可以在脚本开头添加 set e 命令来取消该设置。 举例1 #!/bin/bash# 取消 set -e 的设置 set e# 执行命令&#xff0c;并忽略错误 rm somefile…...

centos 7 部署awstats 网站访问检测

一、基础环境准备&#xff08;两种安装方式都要做&#xff09; bash # 安装必要依赖 yum install -y httpd perl mod_perl perl-Time-HiRes perl-DateTime systemctl enable httpd # 设置 Apache 开机自启 systemctl start httpd # 启动 Apache二、安装 AWStats&#xff0…...

蓝桥杯 2024 15届国赛 A组 儿童节快乐

P10576 [蓝桥杯 2024 国 A] 儿童节快乐 题目描述 五彩斑斓的气球在蓝天下悠然飘荡&#xff0c;轻快的音乐在耳边持续回荡&#xff0c;小朋友们手牵着手一同畅快欢笑。在这样一片安乐祥和的氛围下&#xff0c;六一来了。 今天是六一儿童节&#xff0c;小蓝老师为了让大家在节…...

Java多线程实现之Callable接口深度解析

Java多线程实现之Callable接口深度解析 一、Callable接口概述1.1 接口定义1.2 与Runnable接口的对比1.3 Future接口与FutureTask类 二、Callable接口的基本使用方法2.1 传统方式实现Callable接口2.2 使用Lambda表达式简化Callable实现2.3 使用FutureTask类执行Callable任务 三、…...

Unit 1 深度强化学习简介

Deep RL Course ——Unit 1 Introduction 从理论和实践层面深入学习深度强化学习。学会使用知名的深度强化学习库&#xff0c;例如 Stable Baselines3、RL Baselines3 Zoo、Sample Factory 和 CleanRL。在独特的环境中训练智能体&#xff0c;比如 SnowballFight、Huggy the Do…...

SpringTask-03.入门案例

一.入门案例 启动类&#xff1a; package com.sky;import lombok.extern.slf4j.Slf4j; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cache.annotation.EnableCach…...

Mac下Android Studio扫描根目录卡死问题记录

环境信息 操作系统: macOS 15.5 (Apple M2芯片)Android Studio版本: Meerkat Feature Drop | 2024.3.2 Patch 1 (Build #AI-243.26053.27.2432.13536105, 2025年5月22日构建) 问题现象 在项目开发过程中&#xff0c;提示一个依赖外部头文件的cpp源文件需要同步&#xff0c;点…...

安宝特方案丨船舶智造的“AR+AI+作业标准化管理解决方案”(装配)

船舶制造装配管理现状&#xff1a;装配工作依赖人工经验&#xff0c;装配工人凭借长期实践积累的操作技巧完成零部件组装。企业通常制定了装配作业指导书&#xff0c;但在实际执行中&#xff0c;工人对指导书的理解和遵循程度参差不齐。 船舶装配过程中的挑战与需求 挑战 (1…...

WebRTC从入门到实践 - 零基础教程

WebRTC从入门到实践 - 零基础教程 目录 WebRTC简介 基础概念 工作原理 开发环境搭建 基础实践 三个实战案例 常见问题解答 1. WebRTC简介 1.1 什么是WebRTC&#xff1f; WebRTC&#xff08;Web Real-Time Communication&#xff09;是一个支持网页浏览器进行实时语音…...