当前位置: 首页 > article >正文

flink判断两个事件之间有没有超时(不使用CEP)

1.为啥不使用cep呢,cep的超时时间设置不好配置化,无法满足扩展要求

2.超时怎么界定。A事件发生后,过了N时间,还没有收到B事件,算超时。

代码如下:


import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;@Slf4j
public class AsyncModelTimeoutHandler extends KeyedProcessFunction<String, JSONObject, JSONObject> {private static final long serialVersionUID = -61608451659272532L;private transient ValueState<Long> firstDataTime;private transient ValueState<Long> secondDataTime;private transient ValueState<String> eventType;@Overridepublic void open(Configuration parameters) throws Exception {ValueStateDescriptor<Long> firstDataDescriptor = new ValueStateDescriptor<>("firstDataTime", Long.class);firstDataTime = getRuntimeContext().getState(firstDataDescriptor);ValueStateDescriptor<Long> secondDataDescriptor = new ValueStateDescriptor<>("secondDataTime", Long.class);secondDataTime = getRuntimeContext().getState(secondDataDescriptor);ValueStateDescriptor<String> eventTypeDescriptor = new ValueStateDescriptor<>("eventType", String.class);eventType = getRuntimeContext().getState(eventTypeDescriptor);}@Overridepublic void processElement(JSONObject value, KeyedProcessFunction<String, JSONObject, JSONObject>.Context ctx, Collector<JSONObject> out) throws Exception {Long currentTimestamp = value.getLong("ts");if (value.containsKey("timeout")) {//异步请求消息long timeout = value.getLong("timeout");firstDataTime.update(currentTimestamp + timeout);eventType.update(value.getString("event"));ctx.timerService().registerProcessingTimeTimer(currentTimestamp + timeout);} else {secondDataTime.update(currentTimestamp);}}@Overridepublic void onTimer(long timestamp, KeyedProcessFunction<String, JSONObject, JSONObject>.OnTimerContext ctx, Collector<JSONObject> out) throws Exception {Long firstTime = firstDataTime.value();Long lastTime = secondDataTime.value();if (lastTime == null || (firstTime != null && lastTime >= firstTime)) {//超时了log.info("AsyncModelTimeoutHandler onTimer handle triggerTime={}, firstTime={}, secondTime={},key={}", timestamp, firstTime, lastTime, ctx.getCurrentKey());JSONObject r = new JSONObject();r.put("id", ctx.getCurrentKey());r.put("judgeTime", timestamp);r.put("event", eventType.value());out.collect(r);}firstDataTime.clear();secondDataTime.clear();eventType.clear();}
}

相关文章:

flink判断两个事件之间有没有超时(不使用CEP)

1.为啥不使用cep呢&#xff0c;cep的超时时间设置不好配置化&#xff0c;无法满足扩展要求 2.超时怎么界定。A事件发生后&#xff0c;过了N时间&#xff0c;还没有收到B事件&#xff0c;算超时。 代码如下&#xff1a; import com.alibaba.fastjson.JSONObject; import lombo…...

数学建模与MATLAB实现:稳定状态模型与资源管理策略

引言 在实际问题中&#xff0c;动态过程的瞬时性态往往难以直接分析&#xff0c;而研究其稳定状态的特征则更具实际意义。本章介绍如何通过微分方程稳定性理论&#xff0c;结合再生资源管理、种群竞争等案例&#xff0c;分析系统的平衡点及稳定性&#xff0c;为实际决策提供数…...

爬虫代码中如何设置请求间隔?

在爬虫代码中设置请求间隔是确保爬虫稳定运行并避免对目标服务器造成过大压力的重要措施。合理设置请求间隔可以有效降低被目标网站封禁IP的风险&#xff0c;同时也有助于爬虫程序的稳定运行。以下是几种常见的方法来设置请求间隔&#xff1a; 一、使用time.sleep() time.sle…...

基于Spring Security 6的OAuth2 系列之十五 - 高级特性--客户端认证方式

之所以想写这一系列&#xff0c;是因为之前工作过程中使用Spring Security OAuth2搭建了网关和授权服务器&#xff0c;但当时基于spring-boot 2.3.x&#xff0c;其默认的Spring Security是5.3.x。之后新项目升级到了spring-boot 3.3.0&#xff0c;结果一看Spring Security也升级…...

bug-ant下拉框解决下拉框跟随表单容器(指定下拉框挂载容器):getPopupContainer=“p=>p.parentNode“

1.前言 getPopupContainer是Ant Design Vue&#xff08;简称Antd&#xff09;的<a-select>组件的一个属性&#xff0c;用于指定下拉框的挂载容器。默认情况下&#xff0c;下拉框会挂载到body元素上&#xff0c;但有时你可能需要将下拉框挂载到其他元素上&#xff0c;例如…...

驱动开发系列35 - Linux Graphics GEM Buffer Object 介绍

一:概述 在 Linux 内核中,DRM(Direct Rendering Manager)模块 是用于管理显示硬件和图形渲染的核心框架。它负责协调用户空间应用程序(例如 X Server、Wayland Compositors、Mesa 等)和 GPU 硬件之间的通信,是 Linux 图形子系统的重要组成部分。 GEM (Graphics Executio…...

网络安全检测思路

对于主机的安全检测&#xff0c;我们通常直接采用nmap或者类似软件进行扫描&#xff0c;然后针对主机操作系统及其 开放端口判断主机的安全程度&#xff0c;这当然是一种方法&#xff0c;但这种方法往往失之粗糙&#xff0c;我仔细考虑了一下&#xff0c;觉 得按下面的流程进行…...

vue error Expected indentation of 2 spaces but found 4 indent

问题的原因在于eslint的风格样式缩进检测&#xff0c;eslint给出的规则是2个缩进&#xff0c;但我们通常是4个缩进&#xff0c;这就造成了报错。 关闭eslint的缩进不同报错&#xff1a;.eslintrc.js indent:off, 全部配置&#xff1a; module.exports {root: true,parserOpt…...

回环地址127.0.0.1跟自身IP有什么区别?

区别比较显著&#xff1a; 1.从定义上看&#xff1a; 127.0.0.1&#xff1a;这个地址被称为回环地址&#xff08;Loopback Address&#xff09;&#xff0c;是用于本地通信的特殊IP地址&#xff0c;指向计算机自身。它用于测试和调试网络应用程序&#xff0c;无论设备是否连接…...

SQL CASE表达式的用法

SQL CASE表达式的用法 一、CASE表达式的基础语法简单CASE表达式搜索CASE表达式 二、简单CASE表达式的应用示例三、搜索CASE表达式的应用示例四、CASE表达式在聚合函数中的应用五、嵌套CASE表达式的应用 今天在也无力用到了CASE表达式&#xff0c;于是有了这篇博客&#xff0c;C…...

排序合集之快排详解(二)

摘要&#xff1a;快速排序是一种在实践中广泛使用的高效排序算法。它基于分治策略&#xff0c;平均时间复杂度为O(n log n)&#xff0c;使其成为处理大型数据集的理想选择。本文将深入探讨快速排序的各种实现方式、优化技巧以及非递归实现&#xff0c;并通过C语言代码示例进行详…...

前缀树算法篇:前缀信息的巧妙获取

前缀树算法篇&#xff1a;前缀信息的巧妙获取 那么前缀树算法是一个非常常用的算法&#xff0c;那么在介绍我们前缀树具体的原理以及实现上&#xff0c;我们先来说一下我们前缀树所应用的一个场景&#xff0c;那么在一个字符串的数据集合当中&#xff0c;那么我们查询我们某个字…...

shell脚本自动安装MySQL8

环境&#xff1a;centos7版本&#xff1a;8.0.28安装包&#xff1a;mysql-8.0.28-linux-glibc2.12-x86_64.tar.xz 二进制包要求&#xff1a;安装包和shell脚本在同一目录下执行方式&#xff1a;sudo ./install_mysql8.sh #!/bin/bash# 定义MySQL安装目录和压缩包名称MYSQL_DIR…...

当没有OpenGL时,Skia如何绘制?

Skia 是可以在没有 OpenGL 的情况下进行图形绘制的&#xff0c;但是具体能否成功绘制图形&#xff0c;取决于 Skia 是如何配置的&#xff0c;以及平台上是否提供了其他的底层图形 API。 Skia 的底层依赖 Skia 的目标是提供一种跨平台的 2D 图形绘制接口。为了加速图形渲染&…...

大数据系列 | 白话讲解大数据技术生态中Hadoop、Hive、Spark的关系介绍

大数据属于数据管理系统的范畴&#xff0c;数据管理系统无非就两个问题&#xff1a;数据怎么存、数据怎么算    现在的信息爆炸时代&#xff0c;一台服务器数据存不下&#xff0c;可以找10台服务器存储&#xff0c;10台存储不下&#xff0c;可以再找100台服务器存储。但是这1…...

华为云函数计算FunctionGraph部署ollma+deepseek

1 概述 ollama和deepseek如果需要多实例&#xff0c;一种方式是部署在kubernetes集群中&#xff0c;一种是使用云厂商的云函数服务。云函数服务是按量付费&#xff0c;并且底层支持GPU&#xff0c;不需要维护kubernetes集群。本文介绍使用华为云函数计算FunctionGraph来部署ol…...

尚硅谷爬虫note001

一、模板设置 file——setting——editor——code style——file and code template——python script # _*_ coding : utf-8 _*_ # Time : ${DATE} ${TIME} # Author : 20250206-里奥 # File : ${NAME} # Project : ${PROJECT_NAME} 二、数据类型 2-1. 数字 整型int 浮点型f…...

35~37.ppt

目录 35.张秘书-《会计行业中长期人才发展规划》 题目​ 解析 36.颐和园公园&#xff08;25张PPT) 题目​ 解析 37.颐和园公园&#xff08;22张PPT) 题目 解析 35.张秘书-《会计行业中长期人才发展规划》 题目 解析 插入自定义的幻灯片&#xff1a;新建幻灯片→重用…...

FPGA简介|结构、组成和应用

Field Programmable Gate Arrays&#xff08;FPGA&#xff0c;现场可编程逻辑门阵列&#xff09;&#xff0c;是在PAL、GAL、CPLD等可编程器件的基础上进一步发展的产物&#xff0c; 是作为专用集成电路&#xff08;ASIC&#xff09;领域中的一种半定制电路而出现的&#xff0c…...

4. React 中的 CSS

用例中的干净的脚手架的创建可以参考另一篇文章&#xff1a;3.React 组件化开发React官方并没有给出在React中统一的样式风格&#xff1a; 由此&#xff0c;从普通的css&#xff0c;到css modules&#xff0c;再到css in js&#xff0c;有几十种不同的解决方案&#xff0c;上百…...

django中间件,中间件给下面传值

1、新建middleware.py文件 # myapp/middleware.py import time from django.http import HttpRequest import json from django.http import JsonResponse import urllib.parse from django.core.cache import cache from comm.Db import Db class RequestTimeMiddleware:def …...

【论文阅读】Revisiting the Assumption of Latent Separability for Backdoor Defenses

https://github.com/Unispac/Circumventing-Backdoor-Defenses 摘要和介绍 在各种后门毒化攻击中&#xff0c;来自目标类别的毒化样本和干净样本通常在潜在空间中形成两个分离的簇。 这种潜在的分离性非常普遍&#xff0c;甚至在防御研究中成为了一种默认假设&#xff0c;我…...

Python基于Django的微博热搜、微博舆论可视化系统(V3.0)【附源码】

博主介绍&#xff1a;✌Java老徐、7年大厂程序员经历。全网粉丝12w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;&…...

集成学习(一):从理论到实战(附代码)

一、引言 在机器学习领域&#xff0c;打造一个独立、强大的算法是解决问题的关键。然而&#xff0c;集成学习提供了一种不同的视角&#xff1a;通过组合多个“弱”学习器来创建一个更强大的模型。本文探讨集成学习的思想、方法及其应用。 二、机器学习 vs 集成学习思想 传统…...

不小心删除服务[null]后,git bash出现错误

不小心删除服务[null]后&#xff0c;git bash出现错误&#xff0c;如何解决&#xff1f; 错误描述&#xff1a;打开 git bash、msys2都会出现错误「bash: /dev/null: No such device or address」 问题定位&#xff1a; 1.使用搜索引擎搜索「bash: /dev/null: No such device o…...

【云安全】云原生- K8S kubeconfig 文件泄露

什么是 kubeconfig 文件&#xff1f; kubeconfig 文件是 Kubernetes 的配置文件&#xff0c;用于存储集群的访问凭证、API Server 的地址和认证信息&#xff0c;允许用户和 kubectl 等工具与 Kubernetes 集群进行交互。它通常包含多个集群的配置&#xff0c;支持通过上下文&am…...

【工业场景】用YOLOv8实现火灾识别

火灾识别任务是工业领域急需关注的重点安全事项,其应用场景和背景意义主要体现在以下几个方面: 应用场景:工业场所:在工厂、仓库等工业场所中,火灾是造成重大财产损失和人员伤亡的主要原因之一。利用火灾识别技术可以及时发现火灾迹象,采取相应的应急措施,保障人员安全和…...

(2025)深度分析DeepSeek-R1开源的6种蒸馏模型之间的逻辑处理和编写代码能力区别以及配置要求,并与ChatGPT进行对比(附本地部署教程)

(2025)通过Ollama光速部署本地DeepSeek-R1模型(支持Windows10/11)_deepseek猫娘咒语-CSDN博客文章浏览阅读1k次&#xff0c;点赞19次&#xff0c;收藏9次。通过Ollama光速部署本地DeepSeek-R1(支持Windows10/11)_deepseek猫娘咒语https://blog.csdn.net/m0_70478643/article/de…...

【自然语言处理】TextRank 算法提取关键词、短语、句(Python源码实现)

文章目录 一、TextRank 算法提取关键词 [工具包]二、TextRank 算法提取关键短语[工具包]三、TextRank 算法提取关键句[工具包]四、TextRank 算法提取关键句&#xff08;Python源码实现&#xff09; 一、TextRank 算法提取关键词 [工具包] 见链接 【自然语言处理】TextRank 算法…...

记一次Self XSS+CSRF组合利用

视频教程在我主页简介或专栏里 &#xff08;不懂都可以来问我 专栏找我哦&#xff09; 目录&#xff1a;  确认 XSS 漏洞 确认 CSRF 漏洞 这个漏洞是我在应用程序的订阅表单中发现的一个 XSS 漏洞&#xff0c;只能通过 POST 请求进行利用。通常情况下&#xff0c;基于 POST 的…...