flink的带状态的RichFlatMapFunction函数使用
背景
使用RichFlatMapFunction可以带状态来决定如何对数据流进行转换,而且这种用法非常常见,根据之前遇到过的某个key的状态来决定再次遇到同样的key时要如何进行数据转换,本文就来简单举个例子说明下RichFlatMapFunction的使用方法
RichFlatMapFunction使用示例
下面的例子的输入是不用name下的count数量值,当本次name的数量和前一次name的数量相差超过配置的阈值100时,打印出来一条告警日志,详细代码如下:
package wikiedits.func.state;import java.util.Objects;import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;/*** Tuple2<String, Integer> 是输入的数据类型 String 是监控到异常值后的输出数据类型*/
public class MyRichFlatMapFunction extends RichFlatMapFunction<Tuple2<String, Integer>, String> {// 键值分区状态,对应每个name一个值ValueState<StateEntity> nameState;@Overridepublic void open(Configuration parameters) throws Exception {// 创建一个键值分区状态ValueStateDescriptor<StateEntity> state = new ValueStateDescriptor<>("nameState", StateEntity.class);nameState = getRuntimeContext().getState(state);}@Overridepublic void flatMap(Tuple2<String, Integer> input, Collector<String> collector) throws Exception {// 判断状态值是否为空(状态默认值是空)if (Objects.isNull(nameState.value())) {StateEntity sFalg = new StateEntity(input.f0, input.f1);nameState.update(sFalg);return;}// 和上一次的状态值比较StateEntity value = nameState.value();if (Math.abs(value.count - input.f1) > 100) {collector.collect(new String("监控到异常值,名称: " + input.f0 + " 上次的值:" + value + " 本次的值:" + input));}value.setName(input.f0);value.setCount(input.f1);// 更新状态值nameState.update(value);}}package wikiedits.func.state;import java.text.SimpleDateFormat;
import java.util.Date;import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;public class RichFlatMapFunctionTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置数据源,一共三个元素DataStream<Tuple2<String, Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {@Overridepublic void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {for (int i = 1; i < Integer.MAX_VALUE; i++) {// 只有XXX,YYY,ZZZ三种nameString name = (0 == i % 3) ? "XXX" : ((i % 3 == 1) ? "YYY" : "ZZZ");int count = RandomUtils.nextInt(0, 1000);// 使用当前时间作为时间戳long timeStamp = System.currentTimeMillis();// 发射一个元素,并且戴上了时间戳ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, count), timeStamp);// 每发射一次就延时1秒Thread.sleep(5000);}}@Overridepublic void cancel() {}});dataStream.keyBy((f) -> {return f.f0;}).flatMap(new MyRichFlatMapFunction()).print();env.execute();}public static String time(long timeStamp) {return new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date(timeStamp));}}
结果
相关文章:

flink的带状态的RichFlatMapFunction函数使用
背景 使用RichFlatMapFunction可以带状态来决定如何对数据流进行转换,而且这种用法非常常见,根据之前遇到过的某个key的状态来决定再次遇到同样的key时要如何进行数据转换,本文就来简单举个例子说明下RichFlatMapFunction的使用方法 RichFl…...

MySQL的安装使用(入学篇)
目录 1 MySQL安装 1.1 安装epel源 1.2 安装MySQL Repository 1.3 安装MySQL官方yum源 1.4 安装服务端、客户端 1.5 启动MySQL服务 2 MySQL 使用 2.1 获取初始登录密码 2.2 登录MySQL数据库 2.3 修改密码 2.4 退出数据库 2.5 使用新密码登录数据库 2.6 重启数据库 2.7 创建数据…...

面试复习整理
redis持久化方式和原理 Redis持久化是指将Redis内存中的数据以某种形式保存到磁盘上,以保证在Redis重启后数据不会丢失。Redis支持两种持久化方式:RDB(Redis DataBase)和AOF(Append Only File)。 RDB持久…...
第四章 :Spring Boot 配置文件指南
第四章 :Spring Boot 配置文件 前言 本章知识重点:作者结合开发实际经验与应用场景结合,整理了5种获取配置属性的方式。配置文件中获取属性应该是SpringBoot开发中最为常用的功能之一,但是常用的功能,仍然有很多开发者在这个方面踩坑。通过本章节学习在实际中避免一些坑,…...
常用中间件分类
常见的中间件包括: 消息中间件:用于处理应用程序之间的异步消息传递,常见的消息中间件包括 RabbitMQ、Apache Kafka、ActiveMQ 等。 缓存中间件:用于缓存数据以加快访问速度,常见的缓存中间件包括 Redis、Memcached 等…...

中文编程软件视频推荐,自学编程电脑推荐,中文编程开发语言工具下载
中文编程软件视频推荐,自学编程电脑推荐,中文编程开发语言工具下载 给大家分享一款中文编程工具,零基础轻松学编程,不需英语基础,编程工具可下载。 这款工具不但可以连接部分硬件,而且可以开发大型的软件…...
Spring Boot 启动加速
一、简介 本文将带你了解如何通过调整 Spring 应用的配置、JVM 参数和使用 GraalVM 原生镜像来缩短 Spring Boot 的启动时间。 二、调整 Spring 应用 首先,创建一个 Spring Boot(2.5.4)应用,添加 Spring Web、Spring Actuator …...

UDP数据报文格式
...
软考-系统架构-2023-反思
2023年11月4日,参加了软考的高级架构设计考试。针对于这次考试做一些总结和反思。 我的考试准备周期非常长,但是实际的时间非常少。差不多一年前我就开始有这个计划和想法准备考试了,但是前期基本上就是翻翻书,跟没有开始区别并不…...

day52
今日内容概要 web应用程序 手写web框架(帮助我们理解别人写好的成熟框架、重点在于思路的理解、代码无需掌握) Django框架的学习 Python中得主流框架 框架的下载、安装、版本、怎么启动、怎么使用等 三板斧问题 web应用程序 Django框架是一款专门用来开发web应用的框架 …...

Mysql关联查询
Mysql关联查询 1、数据准备 # 班级表 create table class(id int primary key auto_increment,name varchar(20),description varchar(100) );# 学生表 create table student(id int primary key auto_increment,sn varchar(20),name varchar(20),email varchar(20),class_id…...

MOSFET和IGBT栅极驱动器TLP250H(D4-TP1,F)电路的基本原理
TLP250H,TLP250H(D4-TP1,F)是SOP8封装中的光电耦合器,由GaA组成ℓ作为红外发光二极管(LED)光学耦合到集成的高增益、高速光电探测器IC芯片。它在高达125℃的温度下提供有保证的性能和规格. TLP250H具有内部法拉第屏蔽,…...

Vue - Syntax Error: TypeError: this.getOptions is not a function 项目运行时报错,详细解决方案
报错问题 关于此问题网上的教程都无法解决,如果您的报错与本文相似,本文即可 100% 完美解决。 在 vue2.js 项目中,执行 npm run serve 运行时出现如下报错信息, Syntax Error: TypeError: this.getOptions is not a function 解决方案 按照以下步骤,即可完美解决。 这个错…...
C 语言类型转换
C 语言类型转换 类型转换允许我们将一种数据类型转换为另一种数据类型。在C语言中,我们使用强制转换运算符进行类型转换,用(type)表示。 语法: (type)value;注意:始终建议将较低的值转换为较高的值&…...

数据结构-链表的简单操作实现
目录 0.链表前序工作 1.构建出一个链表 2.展示链表中的所有存储数据 3.查找关键字key是否在链表中 4.求链表的长度 5.头插法 6.尾插法 7.插入任意位置(规定第一个元素位置为0下标) 8.删除第一次出现的值为key的关键字 9.删除所有值为key的关键字…...

竞赛选题 深度学习手势识别 - yolo python opencv cnn 机器视觉
文章目录 0 前言1 课题背景2 卷积神经网络2.1卷积层2.2 池化层2.3 激活函数2.4 全连接层2.5 使用tensorflow中keras模块实现卷积神经网络 3 YOLOV53.1 网络架构图3.2 输入端3.3 基准网络3.4 Neck网络3.5 Head输出层 4 数据集准备4.1 数据标注简介4.2 数据保存 5 模型训练5.1 修…...

【算法练习Day42】买卖股票的最佳时机 III买卖股票的最佳时机 IV
📝个人主页:Sherry的成长之路 🏠学习社区:Sherry的成长之路(个人社区) 📖专栏链接:练题 🎯长路漫漫浩浩,万事皆有期待 文章目录 买卖股票的最佳时机 III买卖…...

苹果手机如何备份通讯录?看完这篇就懂了!
如果遇到手机丢失或者出现故障的情况,通讯录备份可以避免联系人信息丢失。另外,当用户更换手机或者进行数据迁移时,提前备份好的通讯录数据可以快速还原到新设备上,避免了手动输入联系人的麻烦。苹果手机如何备份通讯录࿱…...

[yarn]yarn异常
一、运行一下算圆周率的测试代码,看下报错 cd /home/data_warehouse/module/hadoop-3.1.3/share/hadoop/mapreduce hadoop jar hadoop-mapreduce-examples-3.1.3.jar pi 1000 1000 后面2个数字参数的含义: 第1个1000指的是要运行1000次map任务 …...
C++ NULL 与nullptr 区别
在编写C程序的时候只看到过NULL,而在C的编程中,我们可以看到NULL和nullptr两种关键字,其实nullptr是C11版本中新加入的,它的出现是为了解决NULL表示空指针在C中具有二义性的问题。 一、C程序中的NULL 在C语言中,NULL…...

JavaSec-RCE
简介 RCE(Remote Code Execution),可以分为:命令注入(Command Injection)、代码注入(Code Injection) 代码注入 1.漏洞场景:Groovy代码注入 Groovy是一种基于JVM的动态语言,语法简洁,支持闭包、动态类型和Java互操作性,…...

iOS 26 携众系统重磅更新,但“苹果智能”仍与国行无缘
美国西海岸的夏天,再次被苹果点燃。一年一度的全球开发者大会 WWDC25 如期而至,这不仅是开发者的盛宴,更是全球数亿苹果用户翘首以盼的科技春晚。今年,苹果依旧为我们带来了全家桶式的系统更新,包括 iOS 26、iPadOS 26…...

23-Oracle 23 ai 区块链表(Blockchain Table)
小伙伴有没有在金融强合规的领域中遇见,必须要保持数据不可变,管理员都无法修改和留痕的要求。比如医疗的电子病历中,影像检查检验结果不可篡改行的,药品追溯过程中数据只可插入无法删除的特性需求;登录日志、修改日志…...
QMC5883L的驱动
简介 本篇文章的代码已经上传到了github上面,开源代码 作为一个电子罗盘模块,我们可以通过I2C从中获取偏航角yaw,相对于六轴陀螺仪的yaw,qmc5883l几乎不会零飘并且成本较低。 参考资料 QMC5883L磁场传感器驱动 QMC5883L磁力计…...

12.找到字符串中所有字母异位词
🧠 题目解析 题目描述: 给定两个字符串 s 和 p,找出 s 中所有 p 的字母异位词的起始索引。 返回的答案以数组形式表示。 字母异位词定义: 若两个字符串包含的字符种类和出现次数完全相同,顺序无所谓,则互为…...
【RockeMQ】第2节|RocketMQ快速实战以及核⼼概念详解(二)
升级Dledger高可用集群 一、主从架构的不足与Dledger的定位 主从架构缺陷 数据备份依赖Slave节点,但无自动故障转移能力,Master宕机后需人工切换,期间消息可能无法读取。Slave仅存储数据,无法主动升级为Master响应请求ÿ…...
Device Mapper 机制
Device Mapper 机制详解 Device Mapper(简称 DM)是 Linux 内核中的一套通用块设备映射框架,为 LVM、加密磁盘、RAID 等提供底层支持。本文将详细介绍 Device Mapper 的原理、实现、内核配置、常用工具、操作测试流程,并配以详细的…...
Spring是如何解决Bean的循环依赖:三级缓存机制
1、什么是 Bean 的循环依赖 在 Spring框架中,Bean 的循环依赖是指多个 Bean 之间互相持有对方引用,形成闭环依赖关系的现象。 多个 Bean 的依赖关系构成环形链路,例如: 双向依赖:Bean A 依赖 Bean B,同时 Bean B 也依赖 Bean A(A↔B)。链条循环: Bean A → Bean…...

SiFli 52把Imagie图片,Font字体资源放在指定位置,编译成指定img.bin和font.bin的问题
分区配置 (ptab.json) img 属性介绍: img 属性指定分区存放的 image 名称,指定的 image 名称必须是当前工程生成的 binary 。 如果 binary 有多个文件,则以 proj_name:binary_name 格式指定文件名, proj_name 为工程 名&…...

人机融合智能 | “人智交互”跨学科新领域
本文系统地提出基于“以人为中心AI(HCAI)”理念的人-人工智能交互(人智交互)这一跨学科新领域及框架,定义人智交互领域的理念、基本理论和关键问题、方法、开发流程和参与团队等,阐述提出人智交互新领域的意义。然后,提出人智交互研究的三种新范式取向以及它们的意义。最后,总结…...