Flink 统计接入的数据量-滚动窗口和状态的使用
1、概述
在生产场景值,经常需要和上游、下游对数,离线场景可以直接 group by 再 count ,但是实时场景中,如果使用 kafka 作为中间件,中间经过几个 job 的过滤转化后,再对照像 Doris 或 Clickhouse 中最终层的数据,如果出现缺失,很难判断是哪一层缺失的。
2、使用 侧流输出+处理时间的滚动窗口+状态进行数据量级统计
package com.flink.feature.windowcount;import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;/*** 1、先输入数据* 1* 1* 1* 1* 输出结果* main=>:8> 业务处理=>1* main=>:1> 业务处理=>1* main=>:2> 业务处理=>1* main=>:3> 业务处理=>1* 每10秒每个key接受到的数据量=>:2> (1698913020000,1698913030000,窗口统计=>1,4)** 2、再输入数据* 1* 2* 2* 3* 3* 4* 4* 输出结果* main=>:4> 业务处理=>1* main=>:5> 业务处理=>2* main=>:6> 业务处理=>2* main=>:7> 业务处理=>3* main=>:8> 业务处理=>3* main=>:1> 业务处理=>4* main=>:2> 业务处理=>4* 每10秒每个key接受到的数据量=>:2> (1698913030000,1698913040000,窗口统计=>1,1)* 每10秒每个key接受到的数据量=>:7> (1698913030000,1698913040000,窗口统计=>4,2)* 每10秒每个key接受到的数据量=>:6> (1698913030000,1698913040000,窗口统计=>2,2)* 每10秒每个key接受到的数据量=>:6> (1698913030000,1698913040000,窗口统计=>3,2)*/public class UseWindowValidateData {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();OutputTag<Tuple2<String,Integer>> windowCountTag = new OutputTag<Tuple2<String,Integer>>("window_count"){};DataStreamSource<String> source = env.socketTextStream("localhost", 8888);SingleOutputStreamOperator<String> process = source.process(new ProcessFunction<String, String>() {@Overridepublic void processElement(String input, ProcessFunction<String, String>.Context ctx, Collector<String> collector) throws Exception {ctx.output(windowCountTag,new Tuple2<>("窗口统计=>"+input,1));collector.collect("业务处理=>" + input);}});process.getSideOutput(windowCountTag).keyBy(new KeySelector<Tuple2<String,Integer>, String>() {@Overridepublic String getKey(Tuple2<String,Integer> tp) throws Exception {return tp.f0;}}).window(TumblingProcessingTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple4<String,String,String,Integer>, String, TimeWindow>() {private MapState<String, Integer> mapState;@Overridepublic void open(Configuration parameters) throws Exception {MapStateDescriptor<String, Integer> stateDescriptor = new MapStateDescriptor<>("map-state", String.class, Integer.class);mapState = getRuntimeContext().getMapState(stateDescriptor);}@Overridepublic void process(String key, ProcessWindowFunction<Tuple2<String, Integer>, Tuple4<String, String, String, Integer>, String, TimeWindow>.Context ctx, Iterable<Tuple2<String, Integer>> elements, Collector<Tuple4<String, String, String, Integer>> out) throws Exception {for (Tuple2<String, Integer> tp : elements) {Integer res = mapState.get(tp.f0);if (res == null) {res = 0;}res += 1;mapState.put(tp.f0, res);}out.collect(new Tuple4<>(String.valueOf(ctx.window().getStart()),String.valueOf(ctx.window().getEnd()),key,mapState.get(key)));// 每个窗口计算后清空状态mapState.clear();}}).print("每10秒每个key接受到的数据量=>");process.print("main=>");env.execute();}
}
3、测试结果
1)先输入数据
1
1
1
1
输出结果
main=>:8> 业务处理=>1
main=>:1> 业务处理=>1
main=>:2> 业务处理=>1
main=>:3> 业务处理=>1
每10秒每个key接受到的数据量=>:2> (1698913020000,1698913030000,窗口统计=>1,4)
2)再输入数据
1
2
2
3
3
4
4
输出结果
main=>:4> 业务处理=>1
main=>:5> 业务处理=>2
main=>:6> 业务处理=>2
main=>:7> 业务处理=>3
main=>:8> 业务处理=>3
main=>:1> 业务处理=>4
main=>:2> 业务处理=>4
每10秒每个key接受到的数据量=>:2> (1698913030000,1698913040000,窗口统计=>1,1)
每10秒每个key接受到的数据量=>:7> (1698913030000,1698913040000,窗口统计=>4,2)
每10秒每个key接受到的数据量=>:6> (1698913030000,1698913040000,窗口统计=>2,2)
每10秒每个key接受到的数据量=>:6> (1698913030000,1698913040000,窗口统计=>3,2)
相关文章:
Flink 统计接入的数据量-滚动窗口和状态的使用
1、概述 在生产场景值,经常需要和上游、下游对数,离线场景可以直接 group by 再 count ,但是实时场景中,如果使用 kafka 作为中间件,中间经过几个 job 的过滤转化后,再对照像 Doris 或 Clickhouse 中最终层…...
SpringBoot快速整合canal1.1.5(TCP模式)
SpringBoot快速整合canal1.1.5(TCP模式) 安装并配置MySQL主从⭐ 1:Docker安装MySQL8.0.28 docker pull mysql:8.0.282:创建目录: mkdir -p /usr/local/mysql8/data mkdir -p /usr/local/mysql8/log mkdir -p /usr/…...
docker打包container成image,然后将image上传到docker hub
第一步:停止正在运行的容器 docker stop <container_name> eg: docker stop xuanjie_mlir 第二步:将对应的container打包成image docker commit <container_id> <镜像名:版本> eg:docker commit 005672e6d97a…...
设计模式—创建型模式之原型模式
设计模式—创建型模式之原型模式 原型模式(Prototype Pattern)用于创建重复的对象,同时又能保证性能。 本体给外部提供一个克隆体进行使用。 比如我们做一个SjdwzMybatis,用来操作数据库,从数据库里面查出很多记录&…...
Zygote进程通信为什么用Socket而不是Binder?
Zygote进程是Android系统中的一个特殊进程,它在系统启动时被创建,并负责孵化其他应用进程。它的主要作用是预加载和共享应用进程的资源,以提高应用启动的速度。 在Android系统中,常用的进程通信方式有以下几种: Intent…...
API接口加密,解决自动化中登录问题
一、加密方式 AES:对称加密,快RAS:非对称加密,慢AESRAS:安全高效 加密过程:字符串》字节流》加密的字节流(算法),解密有可能出现乱码,所以不能直接转成字符…...
COCOS2DX3.17.2 Android升级targetSDK30问题解决方案
一、luajit不兼容问题 不兼容版本:【2.1.0-bate2、2.1.0-bate3都存在异常】 出问题系统:Android11;Android10的系统部分机型有问题,部分机型正常 异常点1:c调用lua接口,pushObjiect的时候crash 异常点2…...
HarmonyOS鸿蒙原生应用开发设计- 隐私声明
HarmonyOS设计文档中,为大家提供了独特的隐私声明,开发者可以根据需要直接引用。 开发者直接使用官方提供的隐私声明内容,既可以符合HarmonyOS原生应用的开发上架运营规范,又可以防止使用别人的内容产生的侵权意外情况等ÿ…...
【面试精选】00后卷王带你三天刷完软件测试面试八股文
前言 本人普通本科计算机专业,做测试也有3年的时间了,讲下我的经历,我刚毕业就进了一个小自研薪资还不错,有10.5k(个人觉得我很优秀),在里面呆了两年,积累了一些的经验和技能&#…...
k-means算法c++实现
计算数据集中的元素与各个簇的中心的距离,将它赋给最近的簇,然后重新计算每个簇的平均值,再将元素按离平均值点最近的原则重新分配直到没有出现重新分配 该算法要事先给出k的值,即划分为几个簇。 vector<int> datoclu(dat…...
oracle查询哪些用户下有表
oracle查询哪些用户下有表,排除系统用户。 在实际业务中 oracle数据库中创建了很多的用户 但实际都是无表的,利用SQL语句将这些有表的用户查询出来 并显示用户名、表名、创建表的时间等信息。 select * from dba_objects where object_type = TABLE and owner not in ( AN…...
机器人连杆惯量参数辨识(估计)
杆的转动惯量的计算公式是Imr^2。在经典力学中,转动惯量(又称质量惯性矩,简称惯矩)通常以I 或J表示,SI 单位为 kgm。对于一个质点,I mr,其中 m 是其质量,r 是质点和转轴的垂直距离。…...
一座 “数智桥梁”,华为助力“天堑变通途”
《水调歌头游泳》中的一句话,“一桥飞架南北,天堑变通途”,广为人们所熟知,其中展现出的,是中国人对美好出行的无限向往。 天堑变通途从来不易。 中国是当今世界上交通运输最繁忙、最快捷的国家之一,交通行…...
C#知识总结 基础篇(上)
本篇内容参考C#图解教程 本篇内容偏向基础,适合0基础的朋友快速上手,也适合有一定C语言(或其他语言如C,java)基础的人快速上手C#。同时适合unity引擎的初学者,更加详细的了解C#语言。 本文内容基本涵盖C#基础内容&am…...
照片编辑软件Affinity Photo 2 for Mac v2.1.1中文激活版 2024年最新中文版下载
照片编辑软件Affinity Photo 2 for Mac v2.1.1中文激活版是一款功能强大的专业级图像编辑软件,由Serif公司开发。它提供了广泛的工具和功能,适用于摄影师、设计师和艺术家。 照片编辑软件Affinity Photo 2 for Mac v2.1.1中文激活版软件介绍 TIFF&#…...
TPAMI 2023 | Temporal Perceiver:通用时序边界检测方法
本文介绍一下今年我们组被T-PAMI 2023收录的时序边界检测工作 Temporal Perceiver: A General Architecture for Arbitrary Boundary Detection。 论文名称: Temporal Perceiver: A General Architecture for Arbitrary Boundary Detection 论文链接&…...
Unity-UV展开工具
using System.Collections; using System.Collections.Generic; using UnityEngine; using UnityEditor;public class unfold : EditorWindow {[MenuItem("Gq_Tools/展开")]public static void ShowWin(){EditorWindow.CreateInstance<unfold>().Show();}priva…...
springboot actuator jvm监控丢失
1、背景 系统接入了监控prometheus和grafana,某天grafana突然发现只有几台机器可以看到指标。 随便点击一个地址http://192.168.0.76:8681/lms/actuator/prometheus访问指标,发现JVM相关指标全部丢失 2、解决方法 从网上查找相关资料,逐一…...
UDP服务端和客户端通信代码开发流程
一、UDP通信 TCP:传输控制协议,面向连接的,稳定的,可靠的,安全的数据集流传递 稳定和可靠:丢包重传 数据有序:序号和确认序号 流量控制:稳定窗口 UDP:用户数据报协议 面向无连接的,不稳定的,不可靠,不安…...
数据库实验:SQL的数据定义与单表查询
目录 实验目的实验内容实验要求实验过程实验步骤实例代码结果示意 数据库的实验,对关系型数据库MySQL进行一些实际的操作 实验目的 (1) 掌握DBMS的数据定义功能 (2) 掌握SQL语言的数据定义语句 (3) 掌握RDBMS的数据单表查询功能 (4) 掌握SQL语言的数据单表查询语句…...
JoyCon-Driver:Windows平台上的Switch手柄完美解决方案
JoyCon-Driver:Windows平台上的Switch手柄完美解决方案 【免费下载链接】JoyCon-Driver A vJoy feeder for the Nintendo Switch JoyCons and Pro Controller 项目地址: https://gitcode.com/gh_mirrors/jo/JoyCon-Driver 还在为Nintendo Switch JoyCon控制器…...
8051单片机sbit位操作失效问题与volatile解决方案
1. 问题现象与背景解析在8051单片机开发中,我们经常需要对寄存器或内存中的特定位进行操作。Keil C51编译器提供了sbit关键字来实现位寻址功能,这是一种非常高效的位操作方式。但在实际开发中,不少工程师遇到过这样的困扰:明明在代…...
别再死记硬背了!用Python+Simulink仿真液压系统,帮你彻底搞懂帕斯卡原理和伯努利方程
用Python和Simulink仿真液压系统:从理论到实践的沉浸式学习 液压传动作为现代机械工程的核心技术之一,其理论基础往往让初学者望而生畏。帕斯卡原理、伯努利方程这些看似简单的公式背后,隐藏着复杂的物理现象和工程应用。传统的死记硬背方式不…...
Windows HEIC缩略图终极解决方案:3步解锁苹果照片完美预览
Windows HEIC缩略图终极解决方案:3步解锁苹果照片完美预览 【免费下载链接】windows-heic-thumbnails Enable Windows Explorer to display thumbnails for HEIC/HEIF files 项目地址: https://gitcode.com/gh_mirrors/wi/windows-heic-thumbnails 还在为iPh…...
从“寄生二极管”入手:用万用表二极管档快速判别NMOS/PMOS管脚与好坏
从“寄生二极管”入手:用万用表二极管档快速判别NMOS/PMOS管脚与好坏 当你面对一个没有任何标识的MOS管,或者怀疑电路板上的MOS管损坏时,如何快速准确地判断它是NMOS还是PMOS,并识别出D、S、G三个引脚?本文将详细介绍一…...
枚举进阶:从常量集合到业务逻辑承载者的实战扩展技巧
1. 项目概述:从“能用”到“好用”的枚举进阶之路在软件开发中,枚举(Enum)是我们再熟悉不过的基础工具了。它把一组有限的、具名的常量组织在一起,让代码意图更清晰,避免“魔法数字”满天飞。但不知道你有没…...
Windows HEIC缩略图解决方案:告别格式壁垒,实现跨平台无缝浏览
Windows HEIC缩略图解决方案:告别格式壁垒,实现跨平台无缝浏览 【免费下载链接】windows-heic-thumbnails Enable Windows Explorer to display thumbnails for HEIC/HEIF files 项目地址: https://gitcode.com/gh_mirrors/wi/windows-heic-thumbnails…...
嵌入式C语言单元测试实战:Unity框架入门与工程实践
1. 项目概述:为什么嵌入式开发也需要单元测试?在嵌入式开发领域,尤其是使用C语言进行单片机、RTOS或裸机程序开发时,我们常常陷入一种“烧录-看灯-调串口”的循环。代码逻辑稍微复杂一点,比如一个状态机或者一个协议解…...
前沿:小目标检测,YOLOv11n 再进化!
点击蓝字 关注我们 关注并星标 从此不迷路 计算机视觉研究院 公众号ID|计算机视觉研究院 学习群|扫码在主页获取加入方式 https://sensors.myu-group.co.jp/sm_pdf/SM4311.pdf 计算机视觉研究院专栏 Column of Computer Vision Institute 基于最新 YOLOv…...
G-Helper:华硕笔记本用户的终极轻量级硬件控制方案
G-Helper:华硕笔记本用户的终极轻量级硬件控制方案 【免费下载链接】g-helper Lightweight Armoury Crate alternative for Asus laptops with nearly the same functionality. Works with ROG Zephyrus, Flow, TUF, Strix, Scar, ProArt, Vivobook, Zenbook, Exper…...
