Flink 统计接入的数据量-滚动窗口和状态的使用
1、概述
在生产场景值,经常需要和上游、下游对数,离线场景可以直接 group by 再 count ,但是实时场景中,如果使用 kafka 作为中间件,中间经过几个 job 的过滤转化后,再对照像 Doris 或 Clickhouse 中最终层的数据,如果出现缺失,很难判断是哪一层缺失的。
2、使用 侧流输出+处理时间的滚动窗口+状态进行数据量级统计
package com.flink.feature.windowcount;import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;/*** 1、先输入数据* 1* 1* 1* 1* 输出结果* main=>:8> 业务处理=>1* main=>:1> 业务处理=>1* main=>:2> 业务处理=>1* main=>:3> 业务处理=>1* 每10秒每个key接受到的数据量=>:2> (1698913020000,1698913030000,窗口统计=>1,4)** 2、再输入数据* 1* 2* 2* 3* 3* 4* 4* 输出结果* main=>:4> 业务处理=>1* main=>:5> 业务处理=>2* main=>:6> 业务处理=>2* main=>:7> 业务处理=>3* main=>:8> 业务处理=>3* main=>:1> 业务处理=>4* main=>:2> 业务处理=>4* 每10秒每个key接受到的数据量=>:2> (1698913030000,1698913040000,窗口统计=>1,1)* 每10秒每个key接受到的数据量=>:7> (1698913030000,1698913040000,窗口统计=>4,2)* 每10秒每个key接受到的数据量=>:6> (1698913030000,1698913040000,窗口统计=>2,2)* 每10秒每个key接受到的数据量=>:6> (1698913030000,1698913040000,窗口统计=>3,2)*/public class UseWindowValidateData {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();OutputTag<Tuple2<String,Integer>> windowCountTag = new OutputTag<Tuple2<String,Integer>>("window_count"){};DataStreamSource<String> source = env.socketTextStream("localhost", 8888);SingleOutputStreamOperator<String> process = source.process(new ProcessFunction<String, String>() {@Overridepublic void processElement(String input, ProcessFunction<String, String>.Context ctx, Collector<String> collector) throws Exception {ctx.output(windowCountTag,new Tuple2<>("窗口统计=>"+input,1));collector.collect("业务处理=>" + input);}});process.getSideOutput(windowCountTag).keyBy(new KeySelector<Tuple2<String,Integer>, String>() {@Overridepublic String getKey(Tuple2<String,Integer> tp) throws Exception {return tp.f0;}}).window(TumblingProcessingTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple4<String,String,String,Integer>, String, TimeWindow>() {private MapState<String, Integer> mapState;@Overridepublic void open(Configuration parameters) throws Exception {MapStateDescriptor<String, Integer> stateDescriptor = new MapStateDescriptor<>("map-state", String.class, Integer.class);mapState = getRuntimeContext().getMapState(stateDescriptor);}@Overridepublic void process(String key, ProcessWindowFunction<Tuple2<String, Integer>, Tuple4<String, String, String, Integer>, String, TimeWindow>.Context ctx, Iterable<Tuple2<String, Integer>> elements, Collector<Tuple4<String, String, String, Integer>> out) throws Exception {for (Tuple2<String, Integer> tp : elements) {Integer res = mapState.get(tp.f0);if (res == null) {res = 0;}res += 1;mapState.put(tp.f0, res);}out.collect(new Tuple4<>(String.valueOf(ctx.window().getStart()),String.valueOf(ctx.window().getEnd()),key,mapState.get(key)));// 每个窗口计算后清空状态mapState.clear();}}).print("每10秒每个key接受到的数据量=>");process.print("main=>");env.execute();}
}
3、测试结果
1)先输入数据
1
1
1
1
输出结果
main=>:8> 业务处理=>1
main=>:1> 业务处理=>1
main=>:2> 业务处理=>1
main=>:3> 业务处理=>1
每10秒每个key接受到的数据量=>:2> (1698913020000,1698913030000,窗口统计=>1,4)
2)再输入数据
1
2
2
3
3
4
4
输出结果
main=>:4> 业务处理=>1
main=>:5> 业务处理=>2
main=>:6> 业务处理=>2
main=>:7> 业务处理=>3
main=>:8> 业务处理=>3
main=>:1> 业务处理=>4
main=>:2> 业务处理=>4
每10秒每个key接受到的数据量=>:2> (1698913030000,1698913040000,窗口统计=>1,1)
每10秒每个key接受到的数据量=>:7> (1698913030000,1698913040000,窗口统计=>4,2)
每10秒每个key接受到的数据量=>:6> (1698913030000,1698913040000,窗口统计=>2,2)
每10秒每个key接受到的数据量=>:6> (1698913030000,1698913040000,窗口统计=>3,2)
相关文章:
Flink 统计接入的数据量-滚动窗口和状态的使用
1、概述 在生产场景值,经常需要和上游、下游对数,离线场景可以直接 group by 再 count ,但是实时场景中,如果使用 kafka 作为中间件,中间经过几个 job 的过滤转化后,再对照像 Doris 或 Clickhouse 中最终层…...
SpringBoot快速整合canal1.1.5(TCP模式)
SpringBoot快速整合canal1.1.5(TCP模式) 安装并配置MySQL主从⭐ 1:Docker安装MySQL8.0.28 docker pull mysql:8.0.282:创建目录: mkdir -p /usr/local/mysql8/data mkdir -p /usr/local/mysql8/log mkdir -p /usr/…...

docker打包container成image,然后将image上传到docker hub
第一步:停止正在运行的容器 docker stop <container_name> eg: docker stop xuanjie_mlir 第二步:将对应的container打包成image docker commit <container_id> <镜像名:版本> eg:docker commit 005672e6d97a…...

设计模式—创建型模式之原型模式
设计模式—创建型模式之原型模式 原型模式(Prototype Pattern)用于创建重复的对象,同时又能保证性能。 本体给外部提供一个克隆体进行使用。 比如我们做一个SjdwzMybatis,用来操作数据库,从数据库里面查出很多记录&…...

Zygote进程通信为什么用Socket而不是Binder?
Zygote进程是Android系统中的一个特殊进程,它在系统启动时被创建,并负责孵化其他应用进程。它的主要作用是预加载和共享应用进程的资源,以提高应用启动的速度。 在Android系统中,常用的进程通信方式有以下几种: Intent…...

API接口加密,解决自动化中登录问题
一、加密方式 AES:对称加密,快RAS:非对称加密,慢AESRAS:安全高效 加密过程:字符串》字节流》加密的字节流(算法),解密有可能出现乱码,所以不能直接转成字符…...

COCOS2DX3.17.2 Android升级targetSDK30问题解决方案
一、luajit不兼容问题 不兼容版本:【2.1.0-bate2、2.1.0-bate3都存在异常】 出问题系统:Android11;Android10的系统部分机型有问题,部分机型正常 异常点1:c调用lua接口,pushObjiect的时候crash 异常点2…...

HarmonyOS鸿蒙原生应用开发设计- 隐私声明
HarmonyOS设计文档中,为大家提供了独特的隐私声明,开发者可以根据需要直接引用。 开发者直接使用官方提供的隐私声明内容,既可以符合HarmonyOS原生应用的开发上架运营规范,又可以防止使用别人的内容产生的侵权意外情况等ÿ…...

【面试精选】00后卷王带你三天刷完软件测试面试八股文
前言 本人普通本科计算机专业,做测试也有3年的时间了,讲下我的经历,我刚毕业就进了一个小自研薪资还不错,有10.5k(个人觉得我很优秀),在里面呆了两年,积累了一些的经验和技能&#…...

k-means算法c++实现
计算数据集中的元素与各个簇的中心的距离,将它赋给最近的簇,然后重新计算每个簇的平均值,再将元素按离平均值点最近的原则重新分配直到没有出现重新分配 该算法要事先给出k的值,即划分为几个簇。 vector<int> datoclu(dat…...
oracle查询哪些用户下有表
oracle查询哪些用户下有表,排除系统用户。 在实际业务中 oracle数据库中创建了很多的用户 但实际都是无表的,利用SQL语句将这些有表的用户查询出来 并显示用户名、表名、创建表的时间等信息。 select * from dba_objects where object_type = TABLE and owner not in ( AN…...

机器人连杆惯量参数辨识(估计)
杆的转动惯量的计算公式是Imr^2。在经典力学中,转动惯量(又称质量惯性矩,简称惯矩)通常以I 或J表示,SI 单位为 kgm。对于一个质点,I mr,其中 m 是其质量,r 是质点和转轴的垂直距离。…...

一座 “数智桥梁”,华为助力“天堑变通途”
《水调歌头游泳》中的一句话,“一桥飞架南北,天堑变通途”,广为人们所熟知,其中展现出的,是中国人对美好出行的无限向往。 天堑变通途从来不易。 中国是当今世界上交通运输最繁忙、最快捷的国家之一,交通行…...

C#知识总结 基础篇(上)
本篇内容参考C#图解教程 本篇内容偏向基础,适合0基础的朋友快速上手,也适合有一定C语言(或其他语言如C,java)基础的人快速上手C#。同时适合unity引擎的初学者,更加详细的了解C#语言。 本文内容基本涵盖C#基础内容&am…...

照片编辑软件Affinity Photo 2 for Mac v2.1.1中文激活版 2024年最新中文版下载
照片编辑软件Affinity Photo 2 for Mac v2.1.1中文激活版是一款功能强大的专业级图像编辑软件,由Serif公司开发。它提供了广泛的工具和功能,适用于摄影师、设计师和艺术家。 照片编辑软件Affinity Photo 2 for Mac v2.1.1中文激活版软件介绍 TIFF&#…...

TPAMI 2023 | Temporal Perceiver:通用时序边界检测方法
本文介绍一下今年我们组被T-PAMI 2023收录的时序边界检测工作 Temporal Perceiver: A General Architecture for Arbitrary Boundary Detection。 论文名称: Temporal Perceiver: A General Architecture for Arbitrary Boundary Detection 论文链接&…...
Unity-UV展开工具
using System.Collections; using System.Collections.Generic; using UnityEngine; using UnityEditor;public class unfold : EditorWindow {[MenuItem("Gq_Tools/展开")]public static void ShowWin(){EditorWindow.CreateInstance<unfold>().Show();}priva…...
springboot actuator jvm监控丢失
1、背景 系统接入了监控prometheus和grafana,某天grafana突然发现只有几台机器可以看到指标。 随便点击一个地址http://192.168.0.76:8681/lms/actuator/prometheus访问指标,发现JVM相关指标全部丢失 2、解决方法 从网上查找相关资料,逐一…...

UDP服务端和客户端通信代码开发流程
一、UDP通信 TCP:传输控制协议,面向连接的,稳定的,可靠的,安全的数据集流传递 稳定和可靠:丢包重传 数据有序:序号和确认序号 流量控制:稳定窗口 UDP:用户数据报协议 面向无连接的,不稳定的,不可靠,不安…...

数据库实验:SQL的数据定义与单表查询
目录 实验目的实验内容实验要求实验过程实验步骤实例代码结果示意 数据库的实验,对关系型数据库MySQL进行一些实际的操作 实验目的 (1) 掌握DBMS的数据定义功能 (2) 掌握SQL语言的数据定义语句 (3) 掌握RDBMS的数据单表查询功能 (4) 掌握SQL语言的数据单表查询语句…...

苍穹外卖--缓存菜品
1.问题说明 用户端小程序展示的菜品数据都是通过查询数据库获得,如果用户端访问量比较大,数据库访问压力随之增大 2.实现思路 通过Redis来缓存菜品数据,减少数据库查询操作。 缓存逻辑分析: ①每个分类下的菜品保持一份缓存数据…...

视频字幕质量评估的大规模细粒度基准
大家读完觉得有帮助记得关注和点赞!!! 摘要 视频字幕在文本到视频生成任务中起着至关重要的作用,因为它们的质量直接影响所生成视频的语义连贯性和视觉保真度。尽管大型视觉-语言模型(VLMs)在字幕生成方面…...

select、poll、epoll 与 Reactor 模式
在高并发网络编程领域,高效处理大量连接和 I/O 事件是系统性能的关键。select、poll、epoll 作为 I/O 多路复用技术的代表,以及基于它们实现的 Reactor 模式,为开发者提供了强大的工具。本文将深入探讨这些技术的底层原理、优缺点。 一、I…...

tree 树组件大数据卡顿问题优化
问题背景 项目中有用到树组件用来做文件目录,但是由于这个树组件的节点越来越多,导致页面在滚动这个树组件的时候浏览器就很容易卡死。这种问题基本上都是因为dom节点太多,导致的浏览器卡顿,这里很明显就需要用到虚拟列表的技术&…...

学习STC51单片机32(芯片为STC89C52RCRC)OLED显示屏2
每日一言 今天的每一份坚持,都是在为未来积攒底气。 案例:OLED显示一个A 这边观察到一个点,怎么雪花了就是都是乱七八糟的占满了屏幕。。 解释 : 如果代码里信号切换太快(比如 SDA 刚变,SCL 立刻变&#…...

有限自动机到正规文法转换器v1.0
1 项目简介 这是一个功能强大的有限自动机(Finite Automaton, FA)到正规文法(Regular Grammar)转换器,它配备了一个直观且完整的图形用户界面,使用户能够轻松地进行操作和观察。该程序基于编译原理中的经典…...
Python ROS2【机器人中间件框架】 简介
销量过万TEEIS德国护膝夏天用薄款 优惠券冠生园 百花蜂蜜428g 挤压瓶纯蜂蜜巨奇严选 鞋子除臭剂360ml 多芬身体磨砂膏280g健70%-75%酒精消毒棉片湿巾1418cm 80片/袋3袋大包清洁食品用消毒 优惠券AIMORNY52朵红玫瑰永生香皂花同城配送非鲜花七夕情人节生日礼物送女友 热卖妙洁棉…...
Python+ZeroMQ实战:智能车辆状态监控与模拟模式自动切换
目录 关键点 技术实现1 技术实现2 摘要: 本文将介绍如何利用Python和ZeroMQ消息队列构建一个智能车辆状态监控系统。系统能够根据时间策略自动切换驾驶模式(自动驾驶、人工驾驶、远程驾驶、主动安全),并通过实时消息推送更新车…...

Axure 下拉框联动
实现选省、选完省之后选对应省份下的市区...

高考志愿填报管理系统---开发介绍
高考志愿填报管理系统是一款专为教育机构、学校和教师设计的学生信息管理和志愿填报辅助平台。系统基于Django框架开发,采用现代化的Web技术,为教育工作者提供高效、安全、便捷的学生管理解决方案。 ## 📋 系统概述 ### 🎯 系统定…...