当前位置: 首页 > news >正文

flink判断两个事件之间有没有超时(不使用CEP)

1.为啥不使用cep呢,cep的超时时间设置不好配置化,无法满足扩展要求

2.超时怎么界定。A事件发生后,过了N时间,还没有收到B事件,算超时。

代码如下:


import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;@Slf4j
public class AsyncModelTimeoutHandler extends KeyedProcessFunction<String, JSONObject, JSONObject> {private static final long serialVersionUID = -61608451659272532L;private transient ValueState<Long> firstDataTime;private transient ValueState<Long> secondDataTime;private transient ValueState<String> eventType;@Overridepublic void open(Configuration parameters) throws Exception {ValueStateDescriptor<Long> firstDataDescriptor = new ValueStateDescriptor<>("firstDataTime", Long.class);firstDataTime = getRuntimeContext().getState(firstDataDescriptor);ValueStateDescriptor<Long> secondDataDescriptor = new ValueStateDescriptor<>("secondDataTime", Long.class);secondDataTime = getRuntimeContext().getState(secondDataDescriptor);ValueStateDescriptor<String> eventTypeDescriptor = new ValueStateDescriptor<>("eventType", String.class);eventType = getRuntimeContext().getState(eventTypeDescriptor);}@Overridepublic void processElement(JSONObject value, KeyedProcessFunction<String, JSONObject, JSONObject>.Context ctx, Collector<JSONObject> out) throws Exception {Long currentTimestamp = value.getLong("ts");if (value.containsKey("timeout")) {//异步请求消息long timeout = value.getLong("timeout");firstDataTime.update(currentTimestamp + timeout);eventType.update(value.getString("event"));ctx.timerService().registerProcessingTimeTimer(currentTimestamp + timeout);} else {secondDataTime.update(currentTimestamp);}}@Overridepublic void onTimer(long timestamp, KeyedProcessFunction<String, JSONObject, JSONObject>.OnTimerContext ctx, Collector<JSONObject> out) throws Exception {Long firstTime = firstDataTime.value();Long lastTime = secondDataTime.value();if (lastTime == null || (firstTime != null && lastTime >= firstTime)) {//超时了log.info("AsyncModelTimeoutHandler onTimer handle triggerTime={}, firstTime={}, secondTime={},key={}", timestamp, firstTime, lastTime, ctx.getCurrentKey());JSONObject r = new JSONObject();r.put("id", ctx.getCurrentKey());r.put("judgeTime", timestamp);r.put("event", eventType.value());out.collect(r);}firstDataTime.clear();secondDataTime.clear();eventType.clear();}
}

相关文章:

flink判断两个事件之间有没有超时(不使用CEP)

1.为啥不使用cep呢&#xff0c;cep的超时时间设置不好配置化&#xff0c;无法满足扩展要求 2.超时怎么界定。A事件发生后&#xff0c;过了N时间&#xff0c;还没有收到B事件&#xff0c;算超时。 代码如下&#xff1a; import com.alibaba.fastjson.JSONObject; import lombo…...

二级C语言题解:十进制转其他进制、非素数求和、重复数统计

目录 一、程序填空&#x1f4dd; --- 十进制转其他进制 题目&#x1f4c3; 分析&#x1f9d0; 二、程序修改&#x1f6e0;️ --- 非素数求和 题目&#x1f4c3; 分析&#x1f9d0; 三、程序设计&#x1f4bb; --- 重复数统计 题目&#x1f4c3; 分析&#x1f9d0; 前言…...

打家劫舍3

今天和打家讲一下打家劫舍3 题目&#xff1a; 题目链接&#xff1a;337. 打家劫舍 III - 力扣&#xff08;LeetCode&#xff09; 小偷又发现了一个新的可行窃的地区。这个地区只有一个入口&#xff0c;我们称之为root。 除了 root 之外&#xff0c;每栋房子有且只有一个“父“…...

练习题(2025.2.9)

题目背景 “咚咚咚……”“查水表&#xff01;”原来是查水表来了&#xff0c;现在哪里找这么热心上门的查表员啊&#xff01;小明感动得热泪盈眶&#xff0c;开起了门…… 题目描述 妈妈下班回家&#xff0c;街坊邻居说小明被一群陌生人强行押上了警车&#xff01;妈妈丰富…...

【练习】PAT 乙 1074 宇宙无敌加法器

题目 地球人习惯使用十进制数&#xff0c;并且默认一个数字的每一位都是十进制的。而在PAT星人开挂的世界里&#xff0c;每个数字的每一位都是不同进制的&#xff0c;这种神奇的数字称为“PAT数”。每个PAT星人都必须熟记各位数字的进制表&#xff0c;例如“……0527”就表示最…...

网络防御高级02-综合实验

web页面&#xff1a; [FW]interface GigabitEthernet 0/0/0 [FW-GigabitEthernet0/0/0]service-manage all permit 需求一&#xff0c;接口配置&#xff1a; SW2: [Huawei]sysname SW2 1.创建vlan [sw2]vlan 10 [sw2]vlan 20 2.接口配置 [sw2]interface GigabitEther…...

UITableView的复用原理

UITableView复用的基本原理是Cell复用机制&#xff0c;它通过重用已经创建的Cell来减少内存开始并提高性能&#xff0c;避免频繁创建和销毁Cell。 复用的流程 1.队列管理 UITableView维护一个可复用队列&#xff08;reuse queue&#xff09;&#xff0c;存储离屏的UITableVi…...

SQL条件分支中的大讲究

在SQL中&#xff0c;条件分支用于根据不同的条件执行不同的操作&#xff0c;适用于数据查询、数据更新以及存储过程等场景。合理使用SQL条件分支&#xff0c;可以优化数据操作流程&#xff0c;提高代码的可读性和可维护性。 目录 1. 逻辑判断的基本概念 2. CASE 语句&#xf…...

Cherry Studio:一站式多模型AI交互平台深度解析 可配合大模型搭建私有知识库问答系统

Cherry Studio&#xff1a;一站式多模型AI交互平台深度解析 可配合大模型搭建私有知识库问答系统 大模型本地化部署流程可查看文章 3分钟教你搭建属于自己的本地大模型 DeepSeek Cherry Studio地址&#xff1a;https://cherry-ai.com/download Cherry Studio 简介 Cherry S…...

工业相机,镜头的选型及实战

工业相机和镜头的选型是机器视觉系统中的关键步骤&#xff0c;选型不当可能导致成像质量差或系统性能不达标。&#xff08;用于个人的学习和记录&#xff09; 一、工业相机选型方法 确定分辨率 分辨率需求&#xff1a;根据被测物体的尺寸和检测精度要求计算所需分辨率。 公式…...

C++模板学习从专家到入门:关键字typename与class

文章目录 共同点typename特性class特性 共同点 在定义类模板或者函数模板时&#xff0c;typename 和 class 关键字都可以用于指定模板参数中的类型。 template <class T> template <typename T>typename特性 C 允许在类内定义类型别名&#xff0c;且其使用方法与…...

BFS算法篇——FloodFill问题的高效解决之道(下)

文章目录 前言一. 图像渲染1.1 题目链接&#xff1a;https://leetcode.cn/problems/flood-fill/description/1.2 题目分析&#xff1a;1.3 思路讲解&#xff1a;1.4 代码实现&#xff1a; 二. 岛屿数量2.1 题目链接&#xff1a;https://leetcode.cn/problems/number-of-islands…...

Android性能优化

Android性能优化 如何优化一个包含大量图片加载的Android应用&#xff0c;以提高性能和用户体验&#xff1f; 优化一个包含大量图片加载的Android应用&#xff0c;可以从以下几个方面入手&#xff0c;以提高性能和用户体验&#xff1a; 选择合适的图片加载库 使用成熟的图片…...

1、http介绍

一、HTTP 和 HTTPS 简介 HTTP&#xff08;HyperText Transfer Protocol&#xff09; 用途&#xff1a;用于网页数据传输&#xff08;不加密&#xff09;。协议特性&#xff1a;以明文形式传输数据&#xff0c;默认端口 80&#xff0c;无身份验证和完整性保护。典型场景&#xf…...

2.6 寒假训练营补题

C Tokitsukaze and Balance String (hard) 题目描述 本题为《Tokitsukaze and Balance String (easy)》的困难版本&#xff0c;两题的唯一区别在于 n n n 的范围。 一个字符串是平衡的&#xff0c;当且仅当字符串中 "01" 连续子串的个数与 "10" 连续子…...

kafka生产者之发送模式与ACK

文章目录 Kafka的发送模式Kafka的ack机制发送模式与ack的关联重试次数总结 在Kafka中&#xff0c;发送模式与ack机制紧密相关&#xff0c;它们共同影响着消息发送的可靠性和性能。 Kafka的发送模式 发后即忘&#xff08;Fire and Forget&#xff09;&#xff1a;生产者发送消息…...

笔记:蓝桥杯python搜索(3-2)——DFS剪支和记忆化搜索

目录 一、DFS剪支 二、例题 P2942 数字王国之军训军队 P3075 特殊的多边形 三、记忆化搜索 四、例题 例题 P3820 混境之地 P216 地宫取宝 一、DFS剪支 在搜索过程中&#xff0c;如果需要完全遍历所有情况可能需要很多时间在搜索到某种状态时&#xff0c;根据当前状态判断…...

ChatBox+硅基流动Deepseek_R1开源API 满血(671B)部署教程,全程干货无废话

DeepSeek开源深度推理模型火爆发布&#xff0c;网络流量过大经常导致服务器崩溃&#xff0c;所以一般有两种方法解决这个问题 如果你的硬件支持&#xff0c;或者保密文档&#xff0c;保密单位&#xff0c;那么可以部署在本地端。但是再好的电脑也不能让DS满血复活&#xff0c;…...

35~37.ppt

目录 35.张秘书-《会计行业中长期人才发展规划》 题目​ 解析 36.颐和园公园&#xff08;25张PPT) 题目​ 解析 37.颐和园公园&#xff08;22张PPT) 题目 解析 35.张秘书-《会计行业中长期人才发展规划》 题目 解析 插入自定义的幻灯片&#xff1a;新建幻灯片→重用…...

畅快使用DeepSeek-R1的方法

腾讯云API接入Cherry Studio简明指南-畅快使用DeepSeek-R1 注意&#xff1a;腾讯云API针对deepseek限时免费&#xff08;后续即使收费也较为便宜&#xff0c;可以作为长期使用的方法&#xff09;&#xff0c;并且比华为的API要快不少。 一、获取腾讯云API密钥 登录并进入腾讯…...

循环冗余码校验CRC码 算法步骤+详细实例计算

通信过程&#xff1a;&#xff08;白话解释&#xff09; 我们将原始待发送的消息称为 M M M&#xff0c;依据发送接收消息双方约定的生成多项式 G ( x ) G(x) G(x)&#xff08;意思就是 G &#xff08; x ) G&#xff08;x) G&#xff08;x) 是已知的&#xff09;&#xff0…...

SCAU期末笔记 - 数据分析与数据挖掘题库解析

这门怎么题库答案不全啊日 来简单学一下子来 一、选择题&#xff08;可多选&#xff09; 将原始数据进行集成、变换、维度规约、数值规约是在以下哪个步骤的任务?(C) A. 频繁模式挖掘 B.分类和预测 C.数据预处理 D.数据流挖掘 A. 频繁模式挖掘&#xff1a;专注于发现数据中…...

【解密LSTM、GRU如何解决传统RNN梯度消失问题】

解密LSTM与GRU&#xff1a;如何让RNN变得更聪明&#xff1f; 在深度学习的世界里&#xff0c;循环神经网络&#xff08;RNN&#xff09;以其卓越的序列数据处理能力广泛应用于自然语言处理、时间序列预测等领域。然而&#xff0c;传统RNN存在的一个严重问题——梯度消失&#…...

如何理解 IP 数据报中的 TTL?

目录 前言理解 前言 面试灵魂一问&#xff1a;说说对 IP 数据报中 TTL 的理解&#xff1f;我们都知道&#xff0c;IP 数据报由首部和数据两部分组成&#xff0c;首部又分为两部分&#xff1a;固定部分和可变部分&#xff0c;共占 20 字节&#xff0c;而即将讨论的 TTL 就位于首…...

OPenCV CUDA模块图像处理-----对图像执行 均值漂移滤波(Mean Shift Filtering)函数meanShiftFiltering()

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 在 GPU 上对图像执行 均值漂移滤波&#xff08;Mean Shift Filtering&#xff09;&#xff0c;用于图像分割或平滑处理。 该函数将输入图像中的…...

Fabric V2.5 通用溯源系统——增加图片上传与下载功能

fabric-trace项目在发布一年后,部署量已突破1000次,为支持更多场景,现新增支持图片信息上链,本文对图片上传、下载功能代码进行梳理,包含智能合约、后端、前端部分。 一、智能合约修改 为了增加图片信息上链溯源,需要对底层数据结构进行修改,在此对智能合约中的农产品数…...

【深度学习新浪潮】什么是credit assignment problem?

Credit Assignment Problem(信用分配问题) 是机器学习,尤其是强化学习(RL)中的核心挑战之一,指的是如何将最终的奖励或惩罚准确地分配给导致该结果的各个中间动作或决策。在序列决策任务中,智能体执行一系列动作后获得一个最终奖励,但每个动作对最终结果的贡献程度往往…...

【把数组变成一棵树】有序数组秒变平衡BST,原来可以这么优雅!

【把数组变成一棵树】有序数组秒变平衡BST,原来可以这么优雅! 🌱 前言:一棵树的浪漫,从数组开始说起 程序员的世界里,数组是最常见的基本结构之一,几乎每种语言、每种算法都少不了它。可你有没有想过,一组看似“线性排列”的有序数组,竟然可以**“长”成一棵平衡的二…...

车载诊断架构 --- ZEVonUDS(J1979-3)简介第一篇

我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 做到欲望极简,了解自己的真实欲望,不受外在潮流的影响,不盲从,不跟风。把自己的精力全部用在自己。一是去掉多余,凡事找规律,基础是诚信;二是…...

C++中vector类型的介绍和使用

文章目录 一、vector 类型的简介1.1 基本介绍1.2 常见用法示例1.3 常见成员函数简表 二、vector 数据的插入2.1 push_back() —— 在尾部插入一个元素2.2 emplace_back() —— 在尾部“就地”构造对象2.3 insert() —— 在任意位置插入一个或多个元素2.4 emplace() —— 在任意…...