当前位置: 首页 > news >正文

ProcessWindowFunction 结合自定义触发器的陷阱

背景:

flink中常见的需求如下:统计某个页面一天内的点击率,每10秒输出一次,我们如果采用ProcessWindowFunction 结合自定义触发器如何实现呢?如果这样实现问题是什么呢?

ProcessWindowFunction 结合自定义触发器实现统计点击率

关键代码:
在这里插入图片描述
在这里插入图片描述
完整代码参见:

package wikiedits.func;import java.text.SimpleDateFormat;
import java.util.Date;import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import wikiedits.func.model.KeyCount;public class ProcessWindowFunctionAndTiggerDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 使用处理时间env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);env.setStateBackend(new FsStateBackend("file:///D:/tmp/flink/checkpoint/windowtrigger"));// 并行度为1env.setParallelism(1);// 设置数据源,一共三个元素DataStream<Tuple2<String, Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {@Overridepublic void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {int xxxNum = 0;int yyyNum = 0;for (int i = 1; i < Integer.MAX_VALUE; i++) {// 只有XXX和YYY两种nameString name = (0 == i % 2) ? "XXX" : "YYY";// 更新aaa和bbb元素的总数if (0 == i % 2) {xxxNum++;} else {yyyNum++;}// 使用当前时间作为时间戳long timeStamp = System.currentTimeMillis();// 将数据和时间戳打印出来,用来验证数据if(xxxNum % 2000==0){System.out.println(String.format("source,%s, %s,    XXX total : %d,    YYY total : %d\n", name,time(timeStamp), xxxNum, yyyNum));}// 发射一个元素,并且戴上了时间戳ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, 1), timeStamp);// 每发射一次就延时1秒Thread.sleep(1);}}@Overridepublic void cancel() {}});// 将数据用5秒的滚动窗口做划分,再用ProcessWindowFunctionSingleOutputStreamOperator<String> mainDataStream = dataStream// 以Tuple2的f0字段作为key,本例中实际上key只有aaa和bbb两种.keyBy(value -> value.f0)// 5秒一次的滚动窗口.timeWindow(Time.minutes(5))// 10s触发一次计算,更新统计结果.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))// 统计每个key当前窗口内的元素数量,然后把key、数量、窗口起止时间整理成字符串发送给下游算子.process(new ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {// 自定义状态private ValueState<KeyCount> state;@Overridepublic void open(Configuration parameters) throws Exception {// 初始化状态,name是myStatestate = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", KeyCount.class));}public void clear(Context context) {ValueState<KeyCount> contextWindowValueState = context.windowState().getState(new ValueStateDescriptor<>("myWindowState", KeyCount.class));contextWindowValueState.clear();}@Overridepublic void process(String s, Context context, Iterable<Tuple2<String, Integer>> iterable,Collector<String> collector) throws Exception {// 从backend取得当前单词的myState状态KeyCount current = state.value();// 如果myState还从未没有赋值过,就在此初始化if (current == null) {current = new KeyCount();current.key = s;current.count = 0;}int count = 0;// iterable可以访问该key当前窗口内的所有数据,// 这里简单处理,只统计了元素数量for (Tuple2<String, Integer> tuple2 : iterable) {count++;}// 更新当前key的元素总数current.count += count;// 更新状态到backendstate.update(current);ValueState<KeyCount> contextWindowValueState = context.windowState().getState(new ValueStateDescriptor<>("myWindowState", KeyCount.class));KeyCount windowValue = contextWindowValueState.value();if (windowValue == null) {windowValue = new KeyCount();windowValue.key = s;windowValue.count = 0;}windowValue.count += count;contextWindowValueState.update(windowValue);// 将当前key及其窗口的元素数量,还有窗口的起止时间整理成字符串String value = String.format("window, %s, %s - %s, %d, windowStateCount :%d,   total : %d",// 当前keys,// 当前窗口的起始时间time(context.window().getStart()),// 当前窗口的结束时间time(context.window().getEnd()),// 当前key在当前窗口内元素总数count,// 当前key所在窗口的总数contextWindowValueState.value().count,// 当前key出现的总数current.count);// 发射到下游算子collector.collect(value);}});// 打印结果,通过分析打印信息,检查ProcessWindowFunction中可以处理所有key的整个窗口的数据mainDataStream.print();env.execute("processfunction demo : processwindowfunction");}public static String time(long timeStamp) {return new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date(timeStamp));}}

这里采用ProcessWindowFunction 结合ContinuousProcessingTimeTrigger的方式确实可以实现统计至今为止某个页面点击率的目的,不过这其中需要注意点的点是:
每隔10s触发public void process(String s, Context context, Iterable<Tuple2<String, Integer>> iterable, Collector<String> collector)方法时,iterable对象是包含了一天的窗口内收到的所有消息,也就是当前触发时iterable集合是前10s触发时iterable集合的超集,包含前10s触发时的所有的消息集合。
到这里所引起的问题也自然而然的出来了:对于ProcessWindowFunction 实现而言,flink内部是通过ListState的形式保存窗口内收到的所有消息的,注意这里flink内部会使用ListState保存每一条分配到以天为单位的窗口内的消息,这会导致状态膨胀,想一下,一天内所有的消息都会当成状态保存起来,这对于状态后端的压力是有多大!这些保存在ListState中的消息只有在窗口结束后才会清理:具体参见WindowOperator.clearAllState,那有解决方案吗?使用Agg/Reduce处理函数替ProcessWindowFunction作为处理函数可以实现吗?请看下一篇文章

参考文章:
https://www.cnblogs.com/Springmoon-venn/p/13667023.html

相关文章:

ProcessWindowFunction 结合自定义触发器的陷阱

背景&#xff1a; flink中常见的需求如下&#xff1a;统计某个页面一天内的点击率,每10秒输出一次&#xff0c;我们如果采用ProcessWindowFunction 结合自定义触发器如何实现呢&#xff1f;如果这样实现问题是什么呢&#xff1f; ProcessWindowFunction 结合自定义触发器实现…...

什么是jvm

一、初识JVM&#xff08;虚拟机&#xff09; JVM是Java Virtual Machine&#xff08;Java虚拟机&#xff09;的缩写&#xff0c;JVM是一种用于计算设备的规范&#xff0c;它是一个虚构出来的计算机&#xff0c;是通过在实际的计算机上仿真模拟各种计算机功能来实现的。 引入Jav…...

kettle通过java步骤获取汉字首拼

kettle通过java步骤获取汉字首拼 用途描述 一组数据&#xff0c;需要获取汉字首拼后&#xff0c;输出&#xff1b; 实现效果 添加jar包 pinyin4j-2.5.0.jar 自定义常量数据 Java代码 完整代码&#xff1a; import net.sourceforge.pinyin4j.PinyinHelper; import net.sou…...

Conformer: Local Features Coupling Global Representationsfor Visual Recognition

论文链接&#xff1a;https://arxiv.org/abs/2105.03889 代码链接&#xff1a;https://github.com/pengzhiliang/Conformer 参考博文&#xff1a;Conformer论文以及代码解析&#xff08;上&#xff09;_conformer代码_从现在开始壹并超的博客-CSDN博客 摘要 在卷积神经网络…...

java8-Stream流常用API

什么是 Stream Stream&#xff08;流&#xff09;是 Java 8 引入的一个新的抽象概念&#xff0c;它代表着一种处理数据的序列。简单来说&#xff0c;Stream 是一系列元素的集合&#xff0c;这些元素可以是集合、数组、I/O 资源或者其他数据源。 Stream API 提供了丰富的操作方…...

React 任务调度

React 任务池 不同的fiber任务有不同的优先级&#xff0c;为了用户体验&#xff0c;React需要先处理优先级高的任务。 为了存储这些任务&#xff0c;React中有两个任务池&#xff1a; // Tasks are stored on a min heap var taskQueue []; // 存储立即要执行的任务 var tim…...

小白开始学习C++

​​​​第一节&#xff1a;控制台输出hello word&#xff01; #include<iostream> //引入库文件 int main() { //控制台输出 hello word! 之后回车 std::cout << "hello word!\n"; #include<iostream> //引入库文件int main() {//控制…...

SpringMVC入门的注解、参数传递、返回值和页面跳转---超详细教学

前言&#xff1a; 欢迎阅读Spring MVC入门必读&#xff01;在这篇文章中&#xff0c;我们将探索这个令人兴奋的框架&#xff0c;它为您提供了一种高效、灵活且易于维护的方式来构建Web应用程序。通过使用Spring MVC&#xff0c;您将享受到以下好处&#xff1a;简洁的代码、强大…...

【复习socket】每天40min,我们一起用70天稳扎稳打学完《JavaEE初阶》——28/70 第二十八天

专注 效率 记忆 预习 笔记 复习 做题 欢迎观看我的博客,如有问题交流,欢迎评论区留言,一定尽快回复!(大家可以去看我的专栏,是所有文章的目录)   文章字体风格: 红色文字表示:重难点★✔ 蓝色文字表示:思路以及想法★✔   如果大家觉得有帮助的话,感谢大家帮忙 点…...

vue2踩坑之项目:生成二维码使用vue-print-nb打印二维码

1. vue2安装 npm install vue-print-nb --save vue3安装 npm install vue3-print-nb --save 2. //vue2 引入方式 全局 main.js import Print from vue-print-nb Vue.use(Print) ------------------------------------------------------------------------------------ //vue2 …...

【iVX】十五分钟制作一款小游戏,iVX真有怎么神?

个人主页&#xff1a;【&#x1f60a;个人主页】 新人博主&#xff0c;喜欢就关注一下呗~ 文章目录 前言iVX介绍初上手布置背景制作可移动物体总结&#xff08;完善步骤&#xff09; 前言 在上篇文章中&#xff0c;我向大家介绍了一种打破常规的编程方式——iVX&#xff0c;可…...

SpringMVC常用注解、参数传递、返回值

目录 前言 一、常用注解 二、参数传递 ​编辑 1. 基础类型String类型 2. 复杂类型 3. RequestParam 4. PathVariable 5.RequestBody 6. RequestHeader 三、方法返回值 一&#xff1a;void 二&#xff1a;String 三&#xff1a;Stringmodel 四&#xff1a;ModelAndVi…...

新公司第一次上架新APP需要提前准备哪些材料?

目录 前言一、需要上架的应用市场二、需要准备的资料总结 前言 前不久&#xff0c;使用一家新公司刚刚上架了一款新的APP项目。特此记录一下&#xff0c;现在第一次上架一款APP需要提前准备的各项材料。 一、需要上架的应用市场 现在&#xff0c;上架一款新的APP主流的应用市…...

『C语言进阶』指针进阶(一)

&#x1f525;博客主页&#xff1a; 小羊失眠啦 &#x1f516;系列专栏&#xff1a; C语言 &#x1f325;️每日语录&#xff1a;无论你怎么选&#xff0c;都难免会有遗憾。 ❤️感谢大家点赞&#x1f44d;收藏⭐评论✍️ 前言 在C语言初阶中&#xff0c;我们对指针有了一定的…...

2605. 从两个数字数组里生成最小数字(Java)

给你两个只包含 1 到 9 之间数字的数组 nums1 和 nums2 &#xff0c;每个数组中的元素 互不相同 &#xff0c;请你返回 最小 的数字&#xff0c;两个数组都 至少 包含这个数字的某个数位。 示例 1&#xff1a; 输入&#xff1a;nums1 [4,1,3], nums2 [5,7] 输出&#xff1a;1…...

深度解析 PostgreSQL Protocol v3.0(一)

引言 PostgreSQL 使用基于消息的协议在前端&#xff08;也可以称为客户端&#xff09;和后端&#xff08;也可以称为服务器&#xff09;之间进行通信。该协议通过 TCP/IP 和 Unix 域套接字支持。 《深度解析 PostgreSQL Protocol v3.0》系列技术贴&#xff0c;将带大家深度了…...

Mysql中having语句与where语句的用法与区别

分析&回答 我们在写sql语句的时候,经常会使用where语句,很少会用到having,其实在mysql中having子句也是设定条件的语句与where有相似之处但也有区别。having子句在查询过程中慢于聚合语句(sum,min,max,avg,count)。而where子句在查询过程中则快于聚合语句(sum,min,max,avg…...

基于qt软件的网上聊天室软件

1.服务器: 1).功能: 用于创建一个客户端&#xff0c;通过文本编辑器来获得端口号&#xff0c;根据获得的端口号创建服务器&#xff0c;等待客户端连接 创建成功会提示服务器创建成功 在收到客户端发送的信息时&#xff0c;把这条信息发送给其他所有客户端&#xff0c;实现群…...

本是同根生-双数据库集群keepalived virtual_route_id冲突导致连接故障

项目场景&#xff1a; 一企业近期陆续开始升级办公与大数据系统&#xff0c;新的承包商。原有的数据库是某国内大品牌A&#xff0c;现在新的功能准备陆续迁移到大品牌B上。系统部署后&#xff0c;A依旧承担比较轻松的财务、仓库管理&#xff0c;B承担实时的线上业务。项目验收…...

『力扣每日一题06』字符串中的第一个唯一字符

今天是学习新知识的一天&#xff0c;String 类中有太多细枝末节&#xff0c;需要我去学习跟掌握了。 话不多说&#xff0c;今天给大家带来一道字符串的题目~ 一、题目 给定一个字符串 s &#xff0c;找到 它的第一个不重复的字符&#xff0c;并返回它的索引 。如果不存在&…...

【网络】每天掌握一个Linux命令 - iftop

在Linux系统中&#xff0c;iftop是网络管理的得力助手&#xff0c;能实时监控网络流量、连接情况等&#xff0c;帮助排查网络异常。接下来从多方面详细介绍它。 目录 【网络】每天掌握一个Linux命令 - iftop工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景…...

K8S认证|CKS题库+答案| 11. AppArmor

目录 11. AppArmor 免费获取并激活 CKA_v1.31_模拟系统 题目 开始操作&#xff1a; 1&#xff09;、切换集群 2&#xff09;、切换节点 3&#xff09;、切换到 apparmor 的目录 4&#xff09;、执行 apparmor 策略模块 5&#xff09;、修改 pod 文件 6&#xff09;、…...

【Oracle APEX开发小技巧12】

有如下需求&#xff1a; 有一个问题反馈页面&#xff0c;要实现在apex页面展示能直观看到反馈时间超过7天未处理的数据&#xff0c;方便管理员及时处理反馈。 我的方法&#xff1a;直接将逻辑写在SQL中&#xff0c;这样可以直接在页面展示 完整代码&#xff1a; SELECTSF.FE…...

k8s从入门到放弃之Ingress七层负载

k8s从入门到放弃之Ingress七层负载 在Kubernetes&#xff08;简称K8s&#xff09;中&#xff0c;Ingress是一个API对象&#xff0c;它允许你定义如何从集群外部访问集群内部的服务。Ingress可以提供负载均衡、SSL终结和基于名称的虚拟主机等功能。通过Ingress&#xff0c;你可…...

如何将联系人从 iPhone 转移到 Android

从 iPhone 换到 Android 手机时&#xff0c;你可能需要保留重要的数据&#xff0c;例如通讯录。好在&#xff0c;将通讯录从 iPhone 转移到 Android 手机非常简单&#xff0c;你可以从本文中学习 6 种可靠的方法&#xff0c;确保随时保持连接&#xff0c;不错过任何信息。 第 1…...

在鸿蒙HarmonyOS 5中使用DevEco Studio实现录音机应用

1. 项目配置与权限设置 1.1 配置module.json5 {"module": {"requestPermissions": [{"name": "ohos.permission.MICROPHONE","reason": "录音需要麦克风权限"},{"name": "ohos.permission.WRITE…...

聊一聊接口测试的意义有哪些?

目录 一、隔离性 & 早期测试 二、保障系统集成质量 三、验证业务逻辑的核心层 四、提升测试效率与覆盖度 五、系统稳定性的守护者 六、驱动团队协作与契约管理 七、性能与扩展性的前置评估 八、持续交付的核心支撑 接口测试的意义可以从四个维度展开&#xff0c;首…...

【碎碎念】宝可梦 Mesh GO : 基于MESH网络的口袋妖怪 宝可梦GO游戏自组网系统

目录 游戏说明《宝可梦 Mesh GO》 —— 局域宝可梦探索Pokmon GO 类游戏核心理念应用场景Mesh 特性 宝可梦玩法融合设计游戏构想要素1. 地图探索&#xff08;基于物理空间 广播范围&#xff09;2. 野生宝可梦生成与广播3. 对战系统4. 道具与通信5. 延伸玩法 安全性设计 技术选…...

【Java学习笔记】BigInteger 和 BigDecimal 类

BigInteger 和 BigDecimal 类 二者共有的常见方法 方法功能add加subtract减multiply乘divide除 注意点&#xff1a;传参类型必须是类对象 一、BigInteger 1. 作用&#xff1a;适合保存比较大的整型数 2. 使用说明 创建BigInteger对象 传入字符串 3. 代码示例 import j…...

基于Java Swing的电子通讯录设计与实现:附系统托盘功能代码详解

JAVASQL电子通讯录带系统托盘 一、系统概述 本电子通讯录系统采用Java Swing开发桌面应用&#xff0c;结合SQLite数据库实现联系人管理功能&#xff0c;并集成系统托盘功能提升用户体验。系统支持联系人的增删改查、分组管理、搜索过滤等功能&#xff0c;同时可以最小化到系统…...