flink的带状态的RichFlatMapFunction函数使用
背景
使用RichFlatMapFunction可以带状态来决定如何对数据流进行转换,而且这种用法非常常见,根据之前遇到过的某个key的状态来决定再次遇到同样的key时要如何进行数据转换,本文就来简单举个例子说明下RichFlatMapFunction的使用方法
RichFlatMapFunction使用示例
下面的例子的输入是不用name下的count数量值,当本次name的数量和前一次name的数量相差超过配置的阈值100时,打印出来一条告警日志,详细代码如下:
package wikiedits.func.state;import java.util.Objects;import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;/*** Tuple2<String, Integer> 是输入的数据类型 String 是监控到异常值后的输出数据类型*/
public class MyRichFlatMapFunction extends RichFlatMapFunction<Tuple2<String, Integer>, String> {// 键值分区状态,对应每个name一个值ValueState<StateEntity> nameState;@Overridepublic void open(Configuration parameters) throws Exception {// 创建一个键值分区状态ValueStateDescriptor<StateEntity> state = new ValueStateDescriptor<>("nameState", StateEntity.class);nameState = getRuntimeContext().getState(state);}@Overridepublic void flatMap(Tuple2<String, Integer> input, Collector<String> collector) throws Exception {// 判断状态值是否为空(状态默认值是空)if (Objects.isNull(nameState.value())) {StateEntity sFalg = new StateEntity(input.f0, input.f1);nameState.update(sFalg);return;}// 和上一次的状态值比较StateEntity value = nameState.value();if (Math.abs(value.count - input.f1) > 100) {collector.collect(new String("监控到异常值,名称: " + input.f0 + " 上次的值:" + value + " 本次的值:" + input));}value.setName(input.f0);value.setCount(input.f1);// 更新状态值nameState.update(value);}}package wikiedits.func.state;import java.text.SimpleDateFormat;
import java.util.Date;import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;public class RichFlatMapFunctionTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置数据源,一共三个元素DataStream<Tuple2<String, Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {@Overridepublic void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {for (int i = 1; i < Integer.MAX_VALUE; i++) {// 只有XXX,YYY,ZZZ三种nameString name = (0 == i % 3) ? "XXX" : ((i % 3 == 1) ? "YYY" : "ZZZ");int count = RandomUtils.nextInt(0, 1000);// 使用当前时间作为时间戳long timeStamp = System.currentTimeMillis();// 发射一个元素,并且戴上了时间戳ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, count), timeStamp);// 每发射一次就延时1秒Thread.sleep(5000);}}@Overridepublic void cancel() {}});dataStream.keyBy((f) -> {return f.f0;}).flatMap(new MyRichFlatMapFunction()).print();env.execute();}public static String time(long timeStamp) {return new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date(timeStamp));}}
结果

相关文章:
flink的带状态的RichFlatMapFunction函数使用
背景 使用RichFlatMapFunction可以带状态来决定如何对数据流进行转换,而且这种用法非常常见,根据之前遇到过的某个key的状态来决定再次遇到同样的key时要如何进行数据转换,本文就来简单举个例子说明下RichFlatMapFunction的使用方法 RichFl…...
MySQL的安装使用(入学篇)
目录 1 MySQL安装 1.1 安装epel源 1.2 安装MySQL Repository 1.3 安装MySQL官方yum源 1.4 安装服务端、客户端 1.5 启动MySQL服务 2 MySQL 使用 2.1 获取初始登录密码 2.2 登录MySQL数据库 2.3 修改密码 2.4 退出数据库 2.5 使用新密码登录数据库 2.6 重启数据库 2.7 创建数据…...
面试复习整理
redis持久化方式和原理 Redis持久化是指将Redis内存中的数据以某种形式保存到磁盘上,以保证在Redis重启后数据不会丢失。Redis支持两种持久化方式:RDB(Redis DataBase)和AOF(Append Only File)。 RDB持久…...
第四章 :Spring Boot 配置文件指南
第四章 :Spring Boot 配置文件 前言 本章知识重点:作者结合开发实际经验与应用场景结合,整理了5种获取配置属性的方式。配置文件中获取属性应该是SpringBoot开发中最为常用的功能之一,但是常用的功能,仍然有很多开发者在这个方面踩坑。通过本章节学习在实际中避免一些坑,…...
常用中间件分类
常见的中间件包括: 消息中间件:用于处理应用程序之间的异步消息传递,常见的消息中间件包括 RabbitMQ、Apache Kafka、ActiveMQ 等。 缓存中间件:用于缓存数据以加快访问速度,常见的缓存中间件包括 Redis、Memcached 等…...
中文编程软件视频推荐,自学编程电脑推荐,中文编程开发语言工具下载
中文编程软件视频推荐,自学编程电脑推荐,中文编程开发语言工具下载 给大家分享一款中文编程工具,零基础轻松学编程,不需英语基础,编程工具可下载。 这款工具不但可以连接部分硬件,而且可以开发大型的软件…...
Spring Boot 启动加速
一、简介 本文将带你了解如何通过调整 Spring 应用的配置、JVM 参数和使用 GraalVM 原生镜像来缩短 Spring Boot 的启动时间。 二、调整 Spring 应用 首先,创建一个 Spring Boot(2.5.4)应用,添加 Spring Web、Spring Actuator …...
UDP数据报文格式
...
软考-系统架构-2023-反思
2023年11月4日,参加了软考的高级架构设计考试。针对于这次考试做一些总结和反思。 我的考试准备周期非常长,但是实际的时间非常少。差不多一年前我就开始有这个计划和想法准备考试了,但是前期基本上就是翻翻书,跟没有开始区别并不…...
day52
今日内容概要 web应用程序 手写web框架(帮助我们理解别人写好的成熟框架、重点在于思路的理解、代码无需掌握) Django框架的学习 Python中得主流框架 框架的下载、安装、版本、怎么启动、怎么使用等 三板斧问题 web应用程序 Django框架是一款专门用来开发web应用的框架 …...
Mysql关联查询
Mysql关联查询 1、数据准备 # 班级表 create table class(id int primary key auto_increment,name varchar(20),description varchar(100) );# 学生表 create table student(id int primary key auto_increment,sn varchar(20),name varchar(20),email varchar(20),class_id…...
MOSFET和IGBT栅极驱动器TLP250H(D4-TP1,F)电路的基本原理
TLP250H,TLP250H(D4-TP1,F)是SOP8封装中的光电耦合器,由GaA组成ℓ作为红外发光二极管(LED)光学耦合到集成的高增益、高速光电探测器IC芯片。它在高达125℃的温度下提供有保证的性能和规格. TLP250H具有内部法拉第屏蔽,…...
Vue - Syntax Error: TypeError: this.getOptions is not a function 项目运行时报错,详细解决方案
报错问题 关于此问题网上的教程都无法解决,如果您的报错与本文相似,本文即可 100% 完美解决。 在 vue2.js 项目中,执行 npm run serve 运行时出现如下报错信息, Syntax Error: TypeError: this.getOptions is not a function 解决方案 按照以下步骤,即可完美解决。 这个错…...
C 语言类型转换
C 语言类型转换 类型转换允许我们将一种数据类型转换为另一种数据类型。在C语言中,我们使用强制转换运算符进行类型转换,用(type)表示。 语法: (type)value;注意:始终建议将较低的值转换为较高的值&…...
数据结构-链表的简单操作实现
目录 0.链表前序工作 1.构建出一个链表 2.展示链表中的所有存储数据 3.查找关键字key是否在链表中 4.求链表的长度 5.头插法 6.尾插法 7.插入任意位置(规定第一个元素位置为0下标) 8.删除第一次出现的值为key的关键字 9.删除所有值为key的关键字…...
竞赛选题 深度学习手势识别 - yolo python opencv cnn 机器视觉
文章目录 0 前言1 课题背景2 卷积神经网络2.1卷积层2.2 池化层2.3 激活函数2.4 全连接层2.5 使用tensorflow中keras模块实现卷积神经网络 3 YOLOV53.1 网络架构图3.2 输入端3.3 基准网络3.4 Neck网络3.5 Head输出层 4 数据集准备4.1 数据标注简介4.2 数据保存 5 模型训练5.1 修…...
【算法练习Day42】买卖股票的最佳时机 III买卖股票的最佳时机 IV
📝个人主页:Sherry的成长之路 🏠学习社区:Sherry的成长之路(个人社区) 📖专栏链接:练题 🎯长路漫漫浩浩,万事皆有期待 文章目录 买卖股票的最佳时机 III买卖…...
苹果手机如何备份通讯录?看完这篇就懂了!
如果遇到手机丢失或者出现故障的情况,通讯录备份可以避免联系人信息丢失。另外,当用户更换手机或者进行数据迁移时,提前备份好的通讯录数据可以快速还原到新设备上,避免了手动输入联系人的麻烦。苹果手机如何备份通讯录࿱…...
[yarn]yarn异常
一、运行一下算圆周率的测试代码,看下报错 cd /home/data_warehouse/module/hadoop-3.1.3/share/hadoop/mapreduce hadoop jar hadoop-mapreduce-examples-3.1.3.jar pi 1000 1000 后面2个数字参数的含义: 第1个1000指的是要运行1000次map任务 …...
C++ NULL 与nullptr 区别
在编写C程序的时候只看到过NULL,而在C的编程中,我们可以看到NULL和nullptr两种关键字,其实nullptr是C11版本中新加入的,它的出现是为了解决NULL表示空指针在C中具有二义性的问题。 一、C程序中的NULL 在C语言中,NULL…...
别再只用SMOD了!SAP采购订单屏幕增强:BADI与函数组MEPOBADIEX的深度解析与应用选择
SAP采购订单屏幕增强技术选型:BADI与SMOD的深度对比与实践指南 在SAP系统实施过程中,采购订单屏幕增强几乎是每个企业都会遇到的定制化需求。当标准功能无法满足业务需求时,开发者通常面临两种主流技术路径的选择:传统的SMOD用户出…...
GDPR+等保2.0双压之下,医疗PHP脱敏算法必须重构的7个信号,你中了几个?
更多请点击: https://intelliparadigm.com 第一章:GDPR与等保2.0双合规框架下的医疗数据脱敏新范式 在跨境医疗协作与多中心临床研究日益频繁的背景下,同时满足欧盟《通用数据保护条例》(GDPR)的“数据最小化”原则与…...
AI助手插件生态库:构建企业级AI编码助手工具箱
1. 项目概述:一个为AI编码助手打造的插件生态库如果你和我一样,每天都在和Claude Code、Cursor或者Gemini这类AI编码助手打交道,那你肯定也遇到过这样的时刻:助手很聪明,但总感觉它离你的日常工作流还差那么一点“默契…...
算完这笔账,我失眠了:单收入线 vs 双收入线,十年后差距100万
为什么“多一条收入线”是职场人最该掌握的技能不是让你辞职,是让你不怕被辞去年年底,我一个朋友被裁了。 他在一家互联网中厂做了五年,技术骨干,绩效一直不错。裁员的理由是“业务调整”,整个部门端掉。N1拿了大几万&…...
“薪资open”“不设上限”:谈薪资时HR的5种套路及反杀话术
“薪资open”“不设上限”:谈薪资时HR的5种套路及反杀话术亲身踩坑总结,学会至少多拿30%这几天好几个朋友找我吐槽:面试聊得挺好,一到谈薪就被HR拿捏得死死的。 “你期望多少?” “我们预算有限。” “先进来ÿ…...
ParroT框架实战:用指令与反馈数据驯化开源大模型,打造可控翻译助手
1. 项目概述:用“提示”与“反馈”驯化大语言模型,打造专属翻译助手 在机器翻译领域,我们正处在一个激动人心的十字路口。以ChatGPT、GPT-4为代表的大语言模型(LLMs)展现出了令人惊叹的对话和翻译能力,但它…...
SSV6155/6255 WiFi模块调试日记:手把手解决‘驱动装了但搜不到网’的问题
SSV6x5x WiFi模块深度排障指南:从硬件信号到软件配置的完整解决方案 当你在Linux环境下成功加载了SSV6155/6255 WiFi模块驱动,dmesg显示一切正常,但执行ifconfig wlan0 up后却搜不到任何网络——这种看似简单的问题背后往往隐藏着硬件、驱动、…...
PyTorch训练中断后恢复?手把手教你修复‘optimizer group size mismatch‘错误
PyTorch训练中断恢复实战:彻底解决优化器参数组不匹配问题 深夜的实验室里,显示器蓝光映照着你疲惫的脸庞——连续运行72小时的模型训练突然中断,而当你尝试从检查点恢复时,屏幕上赫然出现"optimizer group size mismatch&qu…...
Verl v0.2终极发布:无Critic强化学习框架如何让训练效率飙升300%?
Verl v0.2终极发布:无Critic强化学习框架如何让训练效率飙升300%? 【免费下载链接】verl verl/HybridFlow: A Flexible and Efficient RL Post-Training Framework 项目地址: https://gitcode.com/GitHub_Trending/ve/verl Verl作为一款灵活高效…...
WechatDecrypt:微信聊天记录解密技术全解析
WechatDecrypt:微信聊天记录解密技术全解析 【免费下载链接】WechatDecrypt 微信消息解密工具 项目地址: https://gitcode.com/gh_mirrors/we/WechatDecrypt 你是否曾经因为误删了重要的微信聊天记录而懊恼不已?或者想要备份那些珍贵的对话却无从…...
