当前位置: 首页 > news >正文

flink的带状态的RichFlatMapFunction函数使用

背景

使用RichFlatMapFunction可以带状态来决定如何对数据流进行转换,而且这种用法非常常见,根据之前遇到过的某个key的状态来决定再次遇到同样的key时要如何进行数据转换,本文就来简单举个例子说明下RichFlatMapFunction的使用方法

RichFlatMapFunction使用示例

下面的例子的输入是不用name下的count数量值,当本次name的数量和前一次name的数量相差超过配置的阈值100时,打印出来一条告警日志,详细代码如下:

package wikiedits.func.state;import java.util.Objects;import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;/*** Tuple2<String, Integer> 是输入的数据类型 String 是监控到异常值后的输出数据类型*/
public class MyRichFlatMapFunction extends RichFlatMapFunction<Tuple2<String, Integer>, String> {// 键值分区状态,对应每个name一个值ValueState<StateEntity> nameState;@Overridepublic void open(Configuration parameters) throws Exception {// 创建一个键值分区状态ValueStateDescriptor<StateEntity> state = new ValueStateDescriptor<>("nameState", StateEntity.class);nameState = getRuntimeContext().getState(state);}@Overridepublic void flatMap(Tuple2<String, Integer> input, Collector<String> collector) throws Exception {// 判断状态值是否为空(状态默认值是空)if (Objects.isNull(nameState.value())) {StateEntity sFalg = new StateEntity(input.f0, input.f1);nameState.update(sFalg);return;}// 和上一次的状态值比较StateEntity value = nameState.value();if (Math.abs(value.count - input.f1) > 100) {collector.collect(new String("监控到异常值,名称: " + input.f0 + " 上次的值:" + value + " 本次的值:" + input));}value.setName(input.f0);value.setCount(input.f1);// 更新状态值nameState.update(value);}}package wikiedits.func.state;import java.text.SimpleDateFormat;
import java.util.Date;import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;public class RichFlatMapFunctionTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置数据源,一共三个元素DataStream<Tuple2<String, Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {@Overridepublic void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {for (int i = 1; i < Integer.MAX_VALUE; i++) {// 只有XXX,YYY,ZZZ三种nameString name = (0 == i % 3) ? "XXX" : ((i % 3 == 1) ? "YYY" : "ZZZ");int count = RandomUtils.nextInt(0, 1000);// 使用当前时间作为时间戳long timeStamp = System.currentTimeMillis();// 发射一个元素,并且戴上了时间戳ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, count), timeStamp);// 每发射一次就延时1秒Thread.sleep(5000);}}@Overridepublic void cancel() {}});dataStream.keyBy((f) -> {return f.f0;}).flatMap(new MyRichFlatMapFunction()).print();env.execute();}public static String time(long timeStamp) {return new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date(timeStamp));}}

结果
在这里插入图片描述

相关文章:

flink的带状态的RichFlatMapFunction函数使用

背景 使用RichFlatMapFunction可以带状态来决定如何对数据流进行转换&#xff0c;而且这种用法非常常见&#xff0c;根据之前遇到过的某个key的状态来决定再次遇到同样的key时要如何进行数据转换&#xff0c;本文就来简单举个例子说明下RichFlatMapFunction的使用方法 RichFl…...

MySQL的安装使用(入学篇)

目录 1 MySQL安装 1.1 安装epel源 1.2 安装MySQL Repository 1.3 安装MySQL官方yum源 1.4 安装服务端、客户端 1.5 启动MySQL服务 2 MySQL 使用 2.1 获取初始登录密码 2.2 登录MySQL数据库 2.3 修改密码 2.4 退出数据库 2.5 使用新密码登录数据库 2.6 重启数据库 2.7 创建数据…...

面试复习整理

redis持久化方式和原理 Redis持久化是指将Redis内存中的数据以某种形式保存到磁盘上&#xff0c;以保证在Redis重启后数据不会丢失。Redis支持两种持久化方式&#xff1a;RDB&#xff08;Redis DataBase&#xff09;和AOF&#xff08;Append Only File&#xff09;。 RDB持久…...

第四章 :Spring Boot 配置文件指南

第四章 :Spring Boot 配置文件 前言 本章知识重点:作者结合开发实际经验与应用场景结合,整理了5种获取配置属性的方式。配置文件中获取属性应该是SpringBoot开发中最为常用的功能之一,但是常用的功能,仍然有很多开发者在这个方面踩坑。通过本章节学习在实际中避免一些坑,…...

常用中间件分类

常见的中间件包括&#xff1a; 消息中间件&#xff1a;用于处理应用程序之间的异步消息传递&#xff0c;常见的消息中间件包括 RabbitMQ、Apache Kafka、ActiveMQ 等。 缓存中间件&#xff1a;用于缓存数据以加快访问速度&#xff0c;常见的缓存中间件包括 Redis、Memcached 等…...

中文编程软件视频推荐,自学编程电脑推荐,中文编程开发语言工具下载

中文编程软件视频推荐&#xff0c;自学编程电脑推荐&#xff0c;中文编程开发语言工具下载 给大家分享一款中文编程工具&#xff0c;零基础轻松学编程&#xff0c;不需英语基础&#xff0c;编程工具可下载。 这款工具不但可以连接部分硬件&#xff0c;而且可以开发大型的软件…...

Spring Boot 启动加速

一、简介 本文将带你了解如何通过调整 Spring 应用的配置、JVM 参数和使用 GraalVM 原生镜像来缩短 Spring Boot 的启动时间。 二、调整 Spring 应用 首先&#xff0c;创建一个 Spring Boot&#xff08;2.5.4&#xff09;应用&#xff0c;添加 Spring Web、Spring Actuator …...

UDP数据报文格式

...

软考-系统架构-2023-反思

2023年11月4日&#xff0c;参加了软考的高级架构设计考试。针对于这次考试做一些总结和反思。 我的考试准备周期非常长&#xff0c;但是实际的时间非常少。差不多一年前我就开始有这个计划和想法准备考试了&#xff0c;但是前期基本上就是翻翻书&#xff0c;跟没有开始区别并不…...

day52

今日内容概要 web应用程序 手写web框架(帮助我们理解别人写好的成熟框架、重点在于思路的理解、代码无需掌握) Django框架的学习 Python中得主流框架 框架的下载、安装、版本、怎么启动、怎么使用等 三板斧问题 web应用程序 Django框架是一款专门用来开发web应用的框架 …...

Mysql关联查询

Mysql关联查询 1、数据准备 # 班级表 create table class(id int primary key auto_increment,name varchar(20),description varchar(100) );# 学生表 create table student(id int primary key auto_increment,sn varchar(20),name varchar(20),email varchar(20),class_id…...

MOSFET和IGBT栅极驱动器TLP250H(D4-TP1,F)电路的基本原理

TLP250H&#xff0c;TLP250H(D4-TP1,F)是SOP8封装中的光电耦合器&#xff0c;由GaA组成ℓ作为红外发光二极管&#xff08;LED&#xff09;光学耦合到集成的高增益、高速光电探测器IC芯片。它在高达125℃的温度下提供有保证的性能和规格. TLP250H具有内部法拉第屏蔽&#xff0c;…...

Vue - Syntax Error: TypeError: this.getOptions is not a function 项目运行时报错,详细解决方案

报错问题 关于此问题网上的教程都无法解决,如果您的报错与本文相似,本文即可 100% 完美解决。 在 vue2.js 项目中,执行 npm run serve 运行时出现如下报错信息, Syntax Error: TypeError: this.getOptions is not a function 解决方案 按照以下步骤,即可完美解决。 这个错…...

C 语言类型转换

C 语言类型转换 类型转换允许我们将一种数据类型转换为另一种数据类型。在C语言中&#xff0c;我们使用强制转换运算符进行类型转换&#xff0c;用&#xff08;type&#xff09;表示。 语法&#xff1a; (type)value;注意&#xff1a;始终建议将较低的值转换为较高的值&…...

数据结构-链表的简单操作实现

目录 0.链表前序工作 1.构建出一个链表 2.展示链表中的所有存储数据 3.查找关键字key是否在链表中 4.求链表的长度 5.头插法 6.尾插法 7.插入任意位置&#xff08;规定第一个元素位置为0下标&#xff09; 8.删除第一次出现的值为key的关键字 9.删除所有值为key的关键字…...

竞赛选题 深度学习手势识别 - yolo python opencv cnn 机器视觉

文章目录 0 前言1 课题背景2 卷积神经网络2.1卷积层2.2 池化层2.3 激活函数2.4 全连接层2.5 使用tensorflow中keras模块实现卷积神经网络 3 YOLOV53.1 网络架构图3.2 输入端3.3 基准网络3.4 Neck网络3.5 Head输出层 4 数据集准备4.1 数据标注简介4.2 数据保存 5 模型训练5.1 修…...

【算法练习Day42】买卖股票的最佳时机 III买卖股票的最佳时机 IV

​&#x1f4dd;个人主页&#xff1a;Sherry的成长之路 &#x1f3e0;学习社区&#xff1a;Sherry的成长之路&#xff08;个人社区&#xff09; &#x1f4d6;专栏链接&#xff1a;练题 &#x1f3af;长路漫漫浩浩&#xff0c;万事皆有期待 文章目录 买卖股票的最佳时机 III买卖…...

苹果手机如何备份通讯录?看完这篇就懂了!

如果遇到手机丢失或者出现故障的情况&#xff0c;通讯录备份可以避免联系人信息丢失。另外&#xff0c;当用户更换手机或者进行数据迁移时&#xff0c;提前备份好的通讯录数据可以快速还原到新设备上&#xff0c;避免了手动输入联系人的麻烦。苹果手机如何备份通讯录&#xff1…...

[yarn]yarn异常

一、运行一下算圆周率的测试代码&#xff0c;看下报错 cd /home/data_warehouse/module/hadoop-3.1.3/share/hadoop/mapreduce hadoop jar hadoop-mapreduce-examples-3.1.3.jar pi 1000 1000 后面2个数字参数的含义&#xff1a; 第1个1000指的是要运行1000次map任务 …...

C++ NULL 与nullptr 区别

在编写C程序的时候只看到过NULL&#xff0c;而在C的编程中&#xff0c;我们可以看到NULL和nullptr两种关键字&#xff0c;其实nullptr是C11版本中新加入的&#xff0c;它的出现是为了解决NULL表示空指针在C中具有二义性的问题。 一、C程序中的NULL 在C语言中&#xff0c;NULL…...

别再只用SMOD了!SAP采购订单屏幕增强:BADI与函数组MEPOBADIEX的深度解析与应用选择

SAP采购订单屏幕增强技术选型&#xff1a;BADI与SMOD的深度对比与实践指南 在SAP系统实施过程中&#xff0c;采购订单屏幕增强几乎是每个企业都会遇到的定制化需求。当标准功能无法满足业务需求时&#xff0c;开发者通常面临两种主流技术路径的选择&#xff1a;传统的SMOD用户出…...

GDPR+等保2.0双压之下,医疗PHP脱敏算法必须重构的7个信号,你中了几个?

更多请点击&#xff1a; https://intelliparadigm.com 第一章&#xff1a;GDPR与等保2.0双合规框架下的医疗数据脱敏新范式 在跨境医疗协作与多中心临床研究日益频繁的背景下&#xff0c;同时满足欧盟《通用数据保护条例》&#xff08;GDPR&#xff09;的“数据最小化”原则与…...

AI助手插件生态库:构建企业级AI编码助手工具箱

1. 项目概述&#xff1a;一个为AI编码助手打造的插件生态库如果你和我一样&#xff0c;每天都在和Claude Code、Cursor或者Gemini这类AI编码助手打交道&#xff0c;那你肯定也遇到过这样的时刻&#xff1a;助手很聪明&#xff0c;但总感觉它离你的日常工作流还差那么一点“默契…...

算完这笔账,我失眠了:单收入线 vs 双收入线,十年后差距100万

为什么“多一条收入线”是职场人最该掌握的技能不是让你辞职&#xff0c;是让你不怕被辞去年年底&#xff0c;我一个朋友被裁了。 他在一家互联网中厂做了五年&#xff0c;技术骨干&#xff0c;绩效一直不错。裁员的理由是“业务调整”&#xff0c;整个部门端掉。N1拿了大几万&…...

“薪资open”“不设上限”:谈薪资时HR的5种套路及反杀话术

“薪资open”“不设上限”&#xff1a;谈薪资时HR的5种套路及反杀话术亲身踩坑总结&#xff0c;学会至少多拿30%这几天好几个朋友找我吐槽&#xff1a;面试聊得挺好&#xff0c;一到谈薪就被HR拿捏得死死的。 “你期望多少&#xff1f;” “我们预算有限。” “先进来&#xff…...

ParroT框架实战:用指令与反馈数据驯化开源大模型,打造可控翻译助手

1. 项目概述&#xff1a;用“提示”与“反馈”驯化大语言模型&#xff0c;打造专属翻译助手 在机器翻译领域&#xff0c;我们正处在一个激动人心的十字路口。以ChatGPT、GPT-4为代表的大语言模型&#xff08;LLMs&#xff09;展现出了令人惊叹的对话和翻译能力&#xff0c;但它…...

SSV6155/6255 WiFi模块调试日记:手把手解决‘驱动装了但搜不到网’的问题

SSV6x5x WiFi模块深度排障指南&#xff1a;从硬件信号到软件配置的完整解决方案 当你在Linux环境下成功加载了SSV6155/6255 WiFi模块驱动&#xff0c;dmesg显示一切正常&#xff0c;但执行ifconfig wlan0 up后却搜不到任何网络——这种看似简单的问题背后往往隐藏着硬件、驱动、…...

PyTorch训练中断后恢复?手把手教你修复‘optimizer group size mismatch‘错误

PyTorch训练中断恢复实战&#xff1a;彻底解决优化器参数组不匹配问题 深夜的实验室里&#xff0c;显示器蓝光映照着你疲惫的脸庞——连续运行72小时的模型训练突然中断&#xff0c;而当你尝试从检查点恢复时&#xff0c;屏幕上赫然出现"optimizer group size mismatch&qu…...

Verl v0.2终极发布:无Critic强化学习框架如何让训练效率飙升300%?

Verl v0.2终极发布&#xff1a;无Critic强化学习框架如何让训练效率飙升300%&#xff1f; 【免费下载链接】verl verl/HybridFlow: A Flexible and Efficient RL Post-Training Framework 项目地址: https://gitcode.com/GitHub_Trending/ve/verl Verl作为一款灵活高效…...

WechatDecrypt:微信聊天记录解密技术全解析

WechatDecrypt&#xff1a;微信聊天记录解密技术全解析 【免费下载链接】WechatDecrypt 微信消息解密工具 项目地址: https://gitcode.com/gh_mirrors/we/WechatDecrypt 你是否曾经因为误删了重要的微信聊天记录而懊恼不已&#xff1f;或者想要备份那些珍贵的对话却无从…...