当前位置: 首页 > news >正文

Flink 统计接入的数据量-滚动窗口和状态的使用

1、概述

在生产场景值,经常需要和上游、下游对数,离线场景可以直接 group by 再 count ,但是实时场景中,如果使用 kafka 作为中间件,中间经过几个 job 的过滤转化后,再对照像 Doris 或 Clickhouse 中最终层的数据,如果出现缺失,很难判断是哪一层缺失的。

2、使用 侧流输出+处理时间的滚动窗口+状态进行数据量级统计

package com.flink.feature.windowcount;import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;/*** 1、先输入数据* 1* 1* 1* 1* 输出结果* main=>:8> 业务处理=>1* main=>:1> 业务处理=>1* main=>:2> 业务处理=>1* main=>:3> 业务处理=>1* 每10秒每个key接受到的数据量=>:2> (1698913020000,1698913030000,窗口统计=>1,4)** 2、再输入数据* 1* 2* 2* 3* 3* 4* 4* 输出结果* main=>:4> 业务处理=>1* main=>:5> 业务处理=>2* main=>:6> 业务处理=>2* main=>:7> 业务处理=>3* main=>:8> 业务处理=>3* main=>:1> 业务处理=>4* main=>:2> 业务处理=>4* 每10秒每个key接受到的数据量=>:2> (1698913030000,1698913040000,窗口统计=>1,1)* 每10秒每个key接受到的数据量=>:7> (1698913030000,1698913040000,窗口统计=>4,2)* 每10秒每个key接受到的数据量=>:6> (1698913030000,1698913040000,窗口统计=>2,2)* 每10秒每个key接受到的数据量=>:6> (1698913030000,1698913040000,窗口统计=>3,2)*/public class UseWindowValidateData {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();OutputTag<Tuple2<String,Integer>> windowCountTag = new OutputTag<Tuple2<String,Integer>>("window_count"){};DataStreamSource<String> source = env.socketTextStream("localhost", 8888);SingleOutputStreamOperator<String> process = source.process(new ProcessFunction<String, String>() {@Overridepublic void processElement(String input, ProcessFunction<String, String>.Context ctx, Collector<String> collector) throws Exception {ctx.output(windowCountTag,new Tuple2<>("窗口统计=>"+input,1));collector.collect("业务处理=>" + input);}});process.getSideOutput(windowCountTag).keyBy(new KeySelector<Tuple2<String,Integer>, String>() {@Overridepublic String getKey(Tuple2<String,Integer> tp) throws Exception {return tp.f0;}}).window(TumblingProcessingTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple4<String,String,String,Integer>, String, TimeWindow>() {private MapState<String, Integer> mapState;@Overridepublic void open(Configuration parameters) throws Exception {MapStateDescriptor<String, Integer> stateDescriptor = new MapStateDescriptor<>("map-state", String.class, Integer.class);mapState = getRuntimeContext().getMapState(stateDescriptor);}@Overridepublic void process(String key, ProcessWindowFunction<Tuple2<String, Integer>, Tuple4<String, String, String, Integer>, String, TimeWindow>.Context ctx, Iterable<Tuple2<String, Integer>> elements, Collector<Tuple4<String, String, String, Integer>> out) throws Exception {for (Tuple2<String, Integer> tp : elements) {Integer res = mapState.get(tp.f0);if (res == null) {res = 0;}res += 1;mapState.put(tp.f0, res);}out.collect(new Tuple4<>(String.valueOf(ctx.window().getStart()),String.valueOf(ctx.window().getEnd()),key,mapState.get(key)));// 每个窗口计算后清空状态mapState.clear();}}).print("每10秒每个key接受到的数据量=>");process.print("main=>");env.execute();}
}

3、测试结果

1)先输入数据

1
1
1
1

输出结果

main=>:8> 业务处理=>1
main=>:1> 业务处理=>1
main=>:2> 业务处理=>1
main=>:3> 业务处理=>1

每10秒每个key接受到的数据量=>:2> (1698913020000,1698913030000,窗口统计=>1,4)

2)再输入数据

1
2
2
3
3
4
4

输出结果

main=>:4> 业务处理=>1
main=>:5> 业务处理=>2
main=>:6> 业务处理=>2
main=>:7> 业务处理=>3
main=>:8> 业务处理=>3
main=>:1> 业务处理=>4
main=>:2> 业务处理=>4

每10秒每个key接受到的数据量=>:2> (1698913030000,1698913040000,窗口统计=>1,1)
每10秒每个key接受到的数据量=>:7> (1698913030000,1698913040000,窗口统计=>4,2)
每10秒每个key接受到的数据量=>:6> (1698913030000,1698913040000,窗口统计=>2,2)
每10秒每个key接受到的数据量=>:6> (1698913030000,1698913040000,窗口统计=>3,2)

相关文章:

Flink 统计接入的数据量-滚动窗口和状态的使用

1、概述 在生产场景值&#xff0c;经常需要和上游、下游对数&#xff0c;离线场景可以直接 group by 再 count &#xff0c;但是实时场景中&#xff0c;如果使用 kafka 作为中间件&#xff0c;中间经过几个 job 的过滤转化后&#xff0c;再对照像 Doris 或 Clickhouse 中最终层…...

SpringBoot快速整合canal1.1.5(TCP模式)

SpringBoot快速整合canal1.1.5&#xff08;TCP模式&#xff09; 安装并配置MySQL主从⭐ 1&#xff1a;Docker安装MySQL8.0.28 docker pull mysql:8.0.282&#xff1a;创建目录&#xff1a; mkdir -p /usr/local/mysql8/data mkdir -p /usr/local/mysql8/log mkdir -p /usr/…...

docker打包container成image,然后将image上传到docker hub

第一步&#xff1a;停止正在运行的容器 docker stop <container_name> eg: docker stop xuanjie_mlir 第二步&#xff1a;将对应的container打包成image docker commit <container_id> <镜像名&#xff1a;版本> eg&#xff1a;docker commit 005672e6d97a…...

设计模式—创建型模式之原型模式

设计模式—创建型模式之原型模式 原型模式&#xff08;Prototype Pattern&#xff09;用于创建重复的对象&#xff0c;同时又能保证性能。 本体给外部提供一个克隆体进行使用。 比如我们做一个SjdwzMybatis&#xff0c;用来操作数据库&#xff0c;从数据库里面查出很多记录&…...

Zygote进程通信为什么用Socket而不是Binder?

Zygote进程是Android系统中的一个特殊进程&#xff0c;它在系统启动时被创建&#xff0c;并负责孵化其他应用进程。它的主要作用是预加载和共享应用进程的资源&#xff0c;以提高应用启动的速度。 在Android系统中&#xff0c;常用的进程通信方式有以下几种&#xff1a; Intent…...

API接口加密,解决自动化中登录问题

一、加密方式 AES&#xff1a;对称加密&#xff0c;快RAS&#xff1a;非对称加密&#xff0c;慢AESRAS&#xff1a;安全高效 加密过程&#xff1a;字符串》字节流》加密的字节流&#xff08;算法&#xff09;&#xff0c;解密有可能出现乱码&#xff0c;所以不能直接转成字符…...

COCOS2DX3.17.2 Android升级targetSDK30问题解决方案

一、luajit不兼容问题 不兼容版本&#xff1a;【2.1.0-bate2、2.1.0-bate3都存在异常】 出问题系统&#xff1a;Android11&#xff1b;Android10的系统部分机型有问题&#xff0c;部分机型正常 异常点1&#xff1a;c调用lua接口&#xff0c;pushObjiect的时候crash 异常点2…...

HarmonyOS鸿蒙原生应用开发设计- 隐私声明

HarmonyOS设计文档中&#xff0c;为大家提供了独特的隐私声明&#xff0c;开发者可以根据需要直接引用。 开发者直接使用官方提供的隐私声明内容&#xff0c;既可以符合HarmonyOS原生应用的开发上架运营规范&#xff0c;又可以防止使用别人的内容产生的侵权意外情况等&#xff…...

【面试精选】00后卷王带你三天刷完软件测试面试八股文

前言 本人普通本科计算机专业&#xff0c;做测试也有3年的时间了&#xff0c;讲下我的经历&#xff0c;我刚毕业就进了一个小自研薪资还不错&#xff0c;有10.5k&#xff08;个人觉得我很优秀&#xff09;&#xff0c;在里面呆了两年&#xff0c;积累了一些的经验和技能&#…...

k-means算法c++实现

计算数据集中的元素与各个簇的中心的距离&#xff0c;将它赋给最近的簇&#xff0c;然后重新计算每个簇的平均值&#xff0c;再将元素按离平均值点最近的原则重新分配直到没有出现重新分配 该算法要事先给出k的值&#xff0c;即划分为几个簇。 vector<int> datoclu(dat…...

oracle查询哪些用户下有表

oracle查询哪些用户下有表,排除系统用户。 在实际业务中 oracle数据库中创建了很多的用户 但实际都是无表的,利用SQL语句将这些有表的用户查询出来 并显示用户名、表名、创建表的时间等信息。 select * from dba_objects where object_type = TABLE and owner not in ( AN…...

机器人连杆惯量参数辨识(估计)

杆的转动惯量的计算公式是Imr^2。在经典力学中&#xff0c;转动惯量&#xff08;又称质量惯性矩&#xff0c;简称惯矩&#xff09;通常以I 或J表示&#xff0c;SI 单位为 kgm。对于一个质点&#xff0c;I mr&#xff0c;其中 m 是其质量&#xff0c;r 是质点和转轴的垂直距离。…...

一座 “数智桥梁”,华为助力“天堑变通途”

《水调歌头游泳》中的一句话&#xff0c;“一桥飞架南北&#xff0c;天堑变通途”&#xff0c;广为人们所熟知&#xff0c;其中展现出的&#xff0c;是中国人对美好出行的无限向往。 天堑变通途从来不易。 中国是当今世界上交通运输最繁忙、最快捷的国家之一&#xff0c;交通行…...

C#知识总结 基础篇(上)

本篇内容参考C#图解教程 本篇内容偏向基础&#xff0c;适合0基础的朋友快速上手&#xff0c;也适合有一定C语言&#xff08;或其他语言如C,java&#xff09;基础的人快速上手C#。同时适合unity引擎的初学者&#xff0c;更加详细的了解C#语言。 本文内容基本涵盖C#基础内容&am…...

照片编辑软件Affinity Photo 2 for Mac v2.1.1中文激活版 2024年最新中文版下载

照片编辑软件Affinity Photo 2 for Mac v2.1.1中文激活版是一款功能强大的专业级图像编辑软件&#xff0c;由Serif公司开发。它提供了广泛的工具和功能&#xff0c;适用于摄影师、设计师和艺术家。 照片编辑软件Affinity Photo 2 for Mac v2.1.1中文激活版软件介绍 TIFF&#…...

TPAMI 2023 | Temporal Perceiver:通用时序边界检测方法

本文介绍一下今年我们组被T-PAMI 2023收录的时序边界检测工作 Temporal Perceiver: A General Architecture for Arbitrary Boundary Detection。​​​​​​​ 论文名称&#xff1a; Temporal Perceiver: A General Architecture for Arbitrary Boundary Detection 论文链接&…...

Unity-UV展开工具

using System.Collections; using System.Collections.Generic; using UnityEngine; using UnityEditor;public class unfold : EditorWindow {[MenuItem("Gq_Tools/展开")]public static void ShowWin(){EditorWindow.CreateInstance<unfold>().Show();}priva…...

springboot actuator jvm监控丢失

1、背景 系统接入了监控prometheus和grafana&#xff0c;某天grafana突然发现只有几台机器可以看到指标。 随便点击一个地址http://192.168.0.76:8681/lms/actuator/prometheus访问指标&#xff0c;发现JVM相关指标全部丢失 2、解决方法 从网上查找相关资料&#xff0c;逐一…...

UDP服务端和客户端通信代码开发流程

一、UDP通信 TCP&#xff1a;传输控制协议&#xff0c;面向连接的&#xff0c;稳定的&#xff0c;可靠的&#xff0c;安全的数据集流传递 稳定和可靠:丢包重传 数据有序:序号和确认序号 流量控制:稳定窗口 UDP&#xff1a;用户数据报协议 面向无连接的,不稳定的,不可靠,不安…...

数据库实验:SQL的数据定义与单表查询

目录 实验目的实验内容实验要求实验过程实验步骤实例代码结果示意 数据库的实验&#xff0c;对关系型数据库MySQL进行一些实际的操作 实验目的 (1) 掌握DBMS的数据定义功能 (2) 掌握SQL语言的数据定义语句 (3) 掌握RDBMS的数据单表查询功能 (4) 掌握SQL语言的数据单表查询语句…...

大数据领域Hive与Spark的结合使用案例

大数据领域Hive与Spark的结合使用案例 关键词:Hive、Spark、大数据处理、数据仓库、分布式计算、ETL、数据分析 摘要:在大数据技术栈中,Hive作为基于Hadoop的数据仓库工具,擅长海量数据的存储与离线分析;Spark作为高性能分布式计算引擎,在复杂数据处理和实时计算领域表现…...

StructuredTaskScope配置不生效?揭秘ClassLoader隔离、虚拟线程绑定与作用域传播的3层断点排查法

第一章&#xff1a;StructuredTaskScope配置不生效&#xff1f;揭秘ClassLoader隔离、虚拟线程绑定与作用域传播的3层断点排查法当使用 Java 21 的 StructuredTaskScope 时&#xff0c;常见现象是&#xff1a;明明调用了 scope.fork() 并设置了自定义上下文&#xff08;如 MDC、…...

无损视频剪辑终极指南:如何用LosslessCut保持原始画质快速编辑

无损视频剪辑终极指南&#xff1a;如何用LosslessCut保持原始画质快速编辑 【免费下载链接】lossless-cut The swiss army knife of lossless video/audio editing 项目地址: https://gitcode.com/gh_mirrors/lo/lossless-cut 在视频编辑的世界里&#xff0c;质量与速度…...

Wan2.2-I2V-A14B惊艳案例:动态水墨山水+古风人物行走10秒视频生成

Wan2.2-I2V-A14B惊艳案例&#xff1a;动态水墨山水古风人物行走10秒视频生成 1. 开篇&#xff1a;当AI遇见传统水墨艺术 想象一下&#xff0c;你只需要输入一段文字描述&#xff0c;就能让AI生成一段10秒的动态水墨山水视频&#xff0c;画中还有古风人物悠然行走。这不是科幻…...

EcomGPT-中英文-7B电商模型实战:基于YOLOv8的商品图像识别与文案生成联动

EcomGPT-中英文-7B电商模型实战&#xff1a;基于YOLOv8的商品图像识别与文案生成联动 1. 引言 想象一下这个场景&#xff1a;你正在看一场电商直播&#xff0c;主播语速飞快地介绍着几十款商品。你刚对其中一款水杯产生兴趣&#xff0c;还没来得及问材质和容量&#xff0c;画…...

RWKV7-1.5B-g1a部署教程:CSDN平台外网域名(gpu-guyeohq1so-7860)配置要点

RWKV7-1.5B-g1a部署教程&#xff1a;CSDN平台外网域名&#xff08;gpu-guyeohq1so-7860&#xff09;配置要点 1. 模型简介 rwkv7-1.5B-g1a是基于RWKV-7架构的多语言文本生成模型&#xff0c;特别适合以下应用场景&#xff1a; 基础问答文案续写简短总结轻量中文对话 2. 环境…...

4步完成Axure本地化设置:让新手轻松上手的中文界面方案

4步完成Axure本地化设置&#xff1a;让新手轻松上手的中文界面方案 【免费下载链接】axure-cn Chinese language file for Axure RP. Axure RP 简体中文语言包&#xff0c;不定期更新。支持 Axure 9、Axure 10。 项目地址: https://gitcode.com/gh_mirrors/ax/axure-cn …...

实战分享:如何用Altium Designer高效搞定PCB的定位孔、散热孔和屏蔽孔?

Altium Designer实战&#xff1a;PCB定位孔、散热孔与屏蔽孔的高效设计指南 在PCB设计领域&#xff0c;机械孔的设计往往被工程师视为"简单任务"而草率处理&#xff0c;直到量产时才发现定位偏差、散热不足或EMI超标等问题。作为从业十年的硬件设计师&#xff0c;我曾…...

从‘梯度裁剪’到‘权重初始化’:一份预防梯度爆炸的PyTorch/TensorFlow实操清单

从‘梯度裁剪’到‘权重初始化’&#xff1a;一份预防梯度爆炸的PyTorch/TensorFlow实操清单 训练深度神经网络时&#xff0c;梯度爆炸问题就像一颗定时炸弹——它可能在你最意想不到的时候突然引爆&#xff0c;导致损失函数值瞬间变为NaN&#xff0c;或者权重更新出现剧烈震荡…...

3分钟夺回你的数字音乐资产:Unlock Music浏览器解密全攻略 [特殊字符]

3分钟夺回你的数字音乐资产&#xff1a;Unlock Music浏览器解密全攻略 &#x1f3b5; 【免费下载链接】unlock-music 在浏览器中解锁加密的音乐文件。原仓库&#xff1a; 1. https://github.com/unlock-music/unlock-music &#xff1b;2. https://git.unlock-music.dev/um/web…...