flink的带状态的RichFlatMapFunction函数使用
背景
使用RichFlatMapFunction可以带状态来决定如何对数据流进行转换,而且这种用法非常常见,根据之前遇到过的某个key的状态来决定再次遇到同样的key时要如何进行数据转换,本文就来简单举个例子说明下RichFlatMapFunction的使用方法
RichFlatMapFunction使用示例
下面的例子的输入是不用name下的count数量值,当本次name的数量和前一次name的数量相差超过配置的阈值100时,打印出来一条告警日志,详细代码如下:
package wikiedits.func.state;import java.util.Objects;import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;/*** Tuple2<String, Integer> 是输入的数据类型 String 是监控到异常值后的输出数据类型*/
public class MyRichFlatMapFunction extends RichFlatMapFunction<Tuple2<String, Integer>, String> {// 键值分区状态,对应每个name一个值ValueState<StateEntity> nameState;@Overridepublic void open(Configuration parameters) throws Exception {// 创建一个键值分区状态ValueStateDescriptor<StateEntity> state = new ValueStateDescriptor<>("nameState", StateEntity.class);nameState = getRuntimeContext().getState(state);}@Overridepublic void flatMap(Tuple2<String, Integer> input, Collector<String> collector) throws Exception {// 判断状态值是否为空(状态默认值是空)if (Objects.isNull(nameState.value())) {StateEntity sFalg = new StateEntity(input.f0, input.f1);nameState.update(sFalg);return;}// 和上一次的状态值比较StateEntity value = nameState.value();if (Math.abs(value.count - input.f1) > 100) {collector.collect(new String("监控到异常值,名称: " + input.f0 + " 上次的值:" + value + " 本次的值:" + input));}value.setName(input.f0);value.setCount(input.f1);// 更新状态值nameState.update(value);}}package wikiedits.func.state;import java.text.SimpleDateFormat;
import java.util.Date;import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;public class RichFlatMapFunctionTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置数据源,一共三个元素DataStream<Tuple2<String, Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {@Overridepublic void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {for (int i = 1; i < Integer.MAX_VALUE; i++) {// 只有XXX,YYY,ZZZ三种nameString name = (0 == i % 3) ? "XXX" : ((i % 3 == 1) ? "YYY" : "ZZZ");int count = RandomUtils.nextInt(0, 1000);// 使用当前时间作为时间戳long timeStamp = System.currentTimeMillis();// 发射一个元素,并且戴上了时间戳ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, count), timeStamp);// 每发射一次就延时1秒Thread.sleep(5000);}}@Overridepublic void cancel() {}});dataStream.keyBy((f) -> {return f.f0;}).flatMap(new MyRichFlatMapFunction()).print();env.execute();}public static String time(long timeStamp) {return new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date(timeStamp));}}
结果

相关文章:
flink的带状态的RichFlatMapFunction函数使用
背景 使用RichFlatMapFunction可以带状态来决定如何对数据流进行转换,而且这种用法非常常见,根据之前遇到过的某个key的状态来决定再次遇到同样的key时要如何进行数据转换,本文就来简单举个例子说明下RichFlatMapFunction的使用方法 RichFl…...
MySQL的安装使用(入学篇)
目录 1 MySQL安装 1.1 安装epel源 1.2 安装MySQL Repository 1.3 安装MySQL官方yum源 1.4 安装服务端、客户端 1.5 启动MySQL服务 2 MySQL 使用 2.1 获取初始登录密码 2.2 登录MySQL数据库 2.3 修改密码 2.4 退出数据库 2.5 使用新密码登录数据库 2.6 重启数据库 2.7 创建数据…...
面试复习整理
redis持久化方式和原理 Redis持久化是指将Redis内存中的数据以某种形式保存到磁盘上,以保证在Redis重启后数据不会丢失。Redis支持两种持久化方式:RDB(Redis DataBase)和AOF(Append Only File)。 RDB持久…...
第四章 :Spring Boot 配置文件指南
第四章 :Spring Boot 配置文件 前言 本章知识重点:作者结合开发实际经验与应用场景结合,整理了5种获取配置属性的方式。配置文件中获取属性应该是SpringBoot开发中最为常用的功能之一,但是常用的功能,仍然有很多开发者在这个方面踩坑。通过本章节学习在实际中避免一些坑,…...
常用中间件分类
常见的中间件包括: 消息中间件:用于处理应用程序之间的异步消息传递,常见的消息中间件包括 RabbitMQ、Apache Kafka、ActiveMQ 等。 缓存中间件:用于缓存数据以加快访问速度,常见的缓存中间件包括 Redis、Memcached 等…...
中文编程软件视频推荐,自学编程电脑推荐,中文编程开发语言工具下载
中文编程软件视频推荐,自学编程电脑推荐,中文编程开发语言工具下载 给大家分享一款中文编程工具,零基础轻松学编程,不需英语基础,编程工具可下载。 这款工具不但可以连接部分硬件,而且可以开发大型的软件…...
Spring Boot 启动加速
一、简介 本文将带你了解如何通过调整 Spring 应用的配置、JVM 参数和使用 GraalVM 原生镜像来缩短 Spring Boot 的启动时间。 二、调整 Spring 应用 首先,创建一个 Spring Boot(2.5.4)应用,添加 Spring Web、Spring Actuator …...
UDP数据报文格式
...
软考-系统架构-2023-反思
2023年11月4日,参加了软考的高级架构设计考试。针对于这次考试做一些总结和反思。 我的考试准备周期非常长,但是实际的时间非常少。差不多一年前我就开始有这个计划和想法准备考试了,但是前期基本上就是翻翻书,跟没有开始区别并不…...
day52
今日内容概要 web应用程序 手写web框架(帮助我们理解别人写好的成熟框架、重点在于思路的理解、代码无需掌握) Django框架的学习 Python中得主流框架 框架的下载、安装、版本、怎么启动、怎么使用等 三板斧问题 web应用程序 Django框架是一款专门用来开发web应用的框架 …...
Mysql关联查询
Mysql关联查询 1、数据准备 # 班级表 create table class(id int primary key auto_increment,name varchar(20),description varchar(100) );# 学生表 create table student(id int primary key auto_increment,sn varchar(20),name varchar(20),email varchar(20),class_id…...
MOSFET和IGBT栅极驱动器TLP250H(D4-TP1,F)电路的基本原理
TLP250H,TLP250H(D4-TP1,F)是SOP8封装中的光电耦合器,由GaA组成ℓ作为红外发光二极管(LED)光学耦合到集成的高增益、高速光电探测器IC芯片。它在高达125℃的温度下提供有保证的性能和规格. TLP250H具有内部法拉第屏蔽,…...
Vue - Syntax Error: TypeError: this.getOptions is not a function 项目运行时报错,详细解决方案
报错问题 关于此问题网上的教程都无法解决,如果您的报错与本文相似,本文即可 100% 完美解决。 在 vue2.js 项目中,执行 npm run serve 运行时出现如下报错信息, Syntax Error: TypeError: this.getOptions is not a function 解决方案 按照以下步骤,即可完美解决。 这个错…...
C 语言类型转换
C 语言类型转换 类型转换允许我们将一种数据类型转换为另一种数据类型。在C语言中,我们使用强制转换运算符进行类型转换,用(type)表示。 语法: (type)value;注意:始终建议将较低的值转换为较高的值&…...
数据结构-链表的简单操作实现
目录 0.链表前序工作 1.构建出一个链表 2.展示链表中的所有存储数据 3.查找关键字key是否在链表中 4.求链表的长度 5.头插法 6.尾插法 7.插入任意位置(规定第一个元素位置为0下标) 8.删除第一次出现的值为key的关键字 9.删除所有值为key的关键字…...
竞赛选题 深度学习手势识别 - yolo python opencv cnn 机器视觉
文章目录 0 前言1 课题背景2 卷积神经网络2.1卷积层2.2 池化层2.3 激活函数2.4 全连接层2.5 使用tensorflow中keras模块实现卷积神经网络 3 YOLOV53.1 网络架构图3.2 输入端3.3 基准网络3.4 Neck网络3.5 Head输出层 4 数据集准备4.1 数据标注简介4.2 数据保存 5 模型训练5.1 修…...
【算法练习Day42】买卖股票的最佳时机 III买卖股票的最佳时机 IV
📝个人主页:Sherry的成长之路 🏠学习社区:Sherry的成长之路(个人社区) 📖专栏链接:练题 🎯长路漫漫浩浩,万事皆有期待 文章目录 买卖股票的最佳时机 III买卖…...
苹果手机如何备份通讯录?看完这篇就懂了!
如果遇到手机丢失或者出现故障的情况,通讯录备份可以避免联系人信息丢失。另外,当用户更换手机或者进行数据迁移时,提前备份好的通讯录数据可以快速还原到新设备上,避免了手动输入联系人的麻烦。苹果手机如何备份通讯录࿱…...
[yarn]yarn异常
一、运行一下算圆周率的测试代码,看下报错 cd /home/data_warehouse/module/hadoop-3.1.3/share/hadoop/mapreduce hadoop jar hadoop-mapreduce-examples-3.1.3.jar pi 1000 1000 后面2个数字参数的含义: 第1个1000指的是要运行1000次map任务 …...
C++ NULL 与nullptr 区别
在编写C程序的时候只看到过NULL,而在C的编程中,我们可以看到NULL和nullptr两种关键字,其实nullptr是C11版本中新加入的,它的出现是为了解决NULL表示空指针在C中具有二义性的问题。 一、C程序中的NULL 在C语言中,NULL…...
Xshell远程连接Kali(默认 | 私钥)Note版
前言:xshell远程连接,私钥连接和常规默认连接 任务一 开启ssh服务 service ssh status //查看ssh服务状态 service ssh start //开启ssh服务 update-rc.d ssh enable //开启自启动ssh服务 任务二 修改配置文件 vi /etc/ssh/ssh_config //第一…...
【解密LSTM、GRU如何解决传统RNN梯度消失问题】
解密LSTM与GRU:如何让RNN变得更聪明? 在深度学习的世界里,循环神经网络(RNN)以其卓越的序列数据处理能力广泛应用于自然语言处理、时间序列预测等领域。然而,传统RNN存在的一个严重问题——梯度消失&#…...
vue3 字体颜色设置的多种方式
在Vue 3中设置字体颜色可以通过多种方式实现,这取决于你是想在组件内部直接设置,还是在CSS/SCSS/LESS等样式文件中定义。以下是几种常见的方法: 1. 内联样式 你可以直接在模板中使用style绑定来设置字体颜色。 <template><div :s…...
HTML前端开发:JavaScript 常用事件详解
作为前端开发的核心,JavaScript 事件是用户与网页交互的基础。以下是常见事件的详细说明和用法示例: 1. onclick - 点击事件 当元素被单击时触发(左键点击) button.onclick function() {alert("按钮被点击了!&…...
ios苹果系统,js 滑动屏幕、锚定无效
现象:window.addEventListener监听touch无效,划不动屏幕,但是代码逻辑都有执行到。 scrollIntoView也无效。 原因:这是因为 iOS 的触摸事件处理机制和 touch-action: none 的设置有关。ios有太多得交互动作,从而会影响…...
NXP S32K146 T-Box 携手 SD NAND(贴片式TF卡):驱动汽车智能革新的黄金组合
在汽车智能化的汹涌浪潮中,车辆不再仅仅是传统的交通工具,而是逐步演变为高度智能的移动终端。这一转变的核心支撑,来自于车内关键技术的深度融合与协同创新。车载远程信息处理盒(T-Box)方案:NXP S32K146 与…...
DiscuzX3.5发帖json api
参考文章:PHP实现独立Discuz站外发帖(直连操作数据库)_discuz 发帖api-CSDN博客 简单改造了一下,适配我自己的需求 有一个站点存在多个采集站,我想通过主站拿标题,采集站拿内容 使用到的sql如下 CREATE TABLE pre_forum_post_…...
基于stm32F10x 系列微控制器的智能电子琴(附完整项目源码、详细接线及讲解视频)
注:文章末尾网盘链接中自取成品使用演示视频、项目源码、项目文档 所用硬件:STM32F103C8T6、无源蜂鸣器、44矩阵键盘、flash存储模块、OLED显示屏、RGB三色灯、面包板、杜邦线、usb转ttl串口 stm32f103c8t6 面包板 …...
基于 HTTP 的单向流式通信协议SSE详解
SSE(Server-Sent Events)详解 🧠 什么是 SSE? SSE(Server-Sent Events) 是 HTML5 标准中定义的一种通信机制,它允许服务器主动将事件推送给客户端(浏览器)。与传统的 H…...
Git 命令全流程总结
以下是从初始化到版本控制、查看记录、撤回操作的 Git 命令全流程总结,按操作场景分类整理: 一、初始化与基础操作 操作命令初始化仓库git init添加所有文件到暂存区git add .提交到本地仓库git commit -m "提交描述"首次提交需配置身份git c…...
