当前位置: 首页 > news >正文

Flink CEP(三)pattern动态更新

        线上运行的CEP中肯定经常遇到规则变更的情况,如果每次变更时都将任务重启、重新发布是非常不优雅的。尤其在营销或者风控这种对实时性要求比较高的场景,如果规则窗口过长(一两个星期),状态过大,就会导致重启时间延长,期间就会造成一些想要处理的异常行为不能及时发现。

1.实现分析

  • 外部加载:通常规则引擎会有专门的规则管理模块,提供用户去创建自己的规则,对于Flink任务来说需要到外部去加载规则
  • 动态更新:需要提供定时去检测规则是否变更
  • 历史状态清理:在模式匹配中是一系列NFAState 的不断变更,如果规则发生变更,需要清理历史状态
  • API:需要对外提供易用的API

2.代码实现

       首先实现一个用户API。

package cep.functions;import java.io.Serializable;import org.apache.flink.api.common.functions.Function;import cep.pattern.Pattern;/*** @author StephenYou* Created on 2023-07-23* Description: 动态Pattern接口(用户调用API)不区分key*/
public interface DynamicPatternFunction<T> extends Function, Serializable {/**** 初始化* @throws Exception*/public void init() throws Exception;/*** 注入新的pattern* @return*/public Pattern<T,T> inject() throws Exception;/*** 一个扫描周期:ms* @return*/public long getPeriod() throws Exception;/*** 规则是否发生变更* @return*/public boolean isChanged() throws Exception;
}

        希望上述API的调用方式如下。

//正常调用CEP.pattern(dataStream,pattern);//动态PatternCEP.injectionPattern(dataStream, new UserDynamicPatternFunction())

        所以需要修改CEP-Lib源码

        b.增加injectionPattern函数。

public class CEP {/**** Dynamic injection pattern function * @param input* @param dynamicPatternFunction* @return* @param <T>*/public static <T> PatternStream<T> injectionPattern throws Exception (DataStream<T> input,DynamicPatternFunction<T> dynamicPatternFunction){return new PatternStream<>(input, dynamicPatternFunction); }
}

        增加PatternStream构造函数,因为需要动态更新,所以有必要传进去整个函数。

public class PatternStream<T> {PatternStream(final DataStream<T> inputStream, DynamicPatternFunction<T> dynamicPatternFunction) throws Exception {this(PatternStreamBuilder.forStreamAndPatternFunction(inputStream, dynamicPatternFunction));}
}

        修改PatternStreamBuilder.build, 增加调用函数的过程。

        final CepOperator<IN, K, OUT> operator = null;if (patternFunction == null ) {operator = new CepOperator<>(inputSerializer,isProcessingTime,nfaFactory,comparator,pattern.getAfterMatchSkipStrategy(),processFunction,lateDataOutputTag);} else {operator = new CepOperator<>(inputSerializer,isProcessingTime,patternFunction,comparator,null,processFunction,lateDataOutputTag);}

        增加对应的CepOperator构造函数。

    public CepOperator(final TypeSerializer<IN> inputSerializer,final boolean isProcessingTime,final DynamicPatternFunction patternFunction,@Nullable final EventComparator<IN> comparator,@Nullable final AfterMatchSkipStrategy afterMatchSkipStrategy,final PatternProcessFunction<IN, OUT> function,@Nullable final OutputTag<IN> lateDataOutputTag) {super(function);this.inputSerializer = Preconditions.checkNotNull(inputSerializer);this.patternFunction = patternFunction;this.isProcessingTime = isProcessingTime;this.comparator = comparator;this.lateDataOutputTag = lateDataOutputTag;if (afterMatchSkipStrategy == null) {this.afterMatchSkipStrategy = AfterMatchSkipStrategy.noSkip();} else {this.afterMatchSkipStrategy = afterMatchSkipStrategy;}this.nfaFactory = null;}

        加载Pattern,构造NFA

    @Overridepublic void open() throws Exception {super.open();timerService =getInternalTimerService("watermark-callbacks", VoidNamespaceSerializer.INSTANCE, this);//初始化if (patternFunction != null) {patternFunction.init();Pattern pattern = patternFunction.inject();afterMatchSkipStrategy = pattern.getAfterMatchSkipStrategy();boolean timeoutHandling = getUserFunction() instanceof TimedOutPartialMatchHandler;nfaFactory = NFACompiler.compileFactory(pattern, timeoutHandling);long period = patternFunction.getPeriod();// 注册定时器检测规则是否变更if (period > 0) {getProcessingTimeService().registerTimer(timerService.currentProcessingTime() + period, this::onProcessingTime);}}nfa = nfaFactory.createNFA();nfa.open(cepRuntimeContext, new Configuration());context = new ContextFunctionImpl();collector = new TimestampedCollector<>(output);cepTimerService = new TimerServiceImpl();// metricsthis.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);}

        状态清理一共分为两块: 匹配状态数据清理、定时器清理;

        进行状态清理:

    @Overridepublic void processElement(StreamRecord<IN> element) throws Exception {if (patternFunction != null) {// 规则版本更新if (needRefresh.value() < refreshVersion.get()) {//清除状态computationStates.clear();elementQueueState.clear();partialMatches.releaseCacheStatisticsTimer();//清除定时器Iterable<Long> registerTime = registerTimeState.get();if (registerTime != null) {Iterator<Long> iterator = registerTime.iterator();while (iterator.hasNext()) {Long l = iterator.next();//删除定时器timerService.deleteEventTimeTimer(VoidNamespace.INSTANCE, l);timerService.deleteProcessingTimeTimer(VoidNamespace.INSTANCE, l);//状态清理iterator.remove();}}//更新当前的版本needRefresh.update(refreshVersion.get());}}
}

        上面是在处理每条数据时,清除状态和版本。接下来要进行状态和版本的初始化。

    @Overridepublic void initializeState(StateInitializationContext context) throws Exception {super.initializeState(context);//初始化状态if (patternFunction != null) {/*** 两个标识位状态*/refreshFlagState = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<Integer>("refreshFlagState", Integer.class));if (context.isRestored()) {if (refreshFlagState.get().iterator().hasNext()) {refreshVersion = new AtomicInteger(refreshFlagState.get().iterator().next());}} else {refreshVersion = new AtomicInteger(0);}needRefresh = context.getKeyedStateStore().getState(new ValueStateDescriptor<Integer>("needRefreshState", Integer.class, 0));}
}

3.测试验证

        设置每10s变更一次Pattern。

 PatternStream patternStream = CEP.injectionPattern(source, new TestDynamicPatternFunction());patternStream.select(new PatternSelectFunction<Tuple3<String, Long, String>, Map>() {@Overridepublic Map select(Map map) throws Exception {map.put("processingTime", System.currentTimeMillis());return map;}}).print();env.execute("SyCep");}public static class TestDynamicPatternFunction implements DynamicPatternFunction<Tuple3<String, Long, String>> {public TestDynamicPatternFunction() {this.flag = true;}boolean flag;int time = 0;@Overridepublic void init() throws Exception {flag = true;}@Overridepublic Pattern<Tuple3<String, Long, String>, Tuple3<String, Long, String>> inject()throws Exception {// 2种patternif (flag) {Pattern pattern = Pattern.<Tuple3<String, Long, String>>begin("start").where(new IterativeCondition<Tuple3<String, Long, String>>() {@Overridepublic boolean filter(Tuple3<String, Long, String> value,Context<Tuple3<String, Long, String>> ctx) throws Exception {return value.f2.equals("success");}}).times(1).followedBy("middle").where(new IterativeCondition<Tuple3<String, Long, String>>() {@Overridepublic boolean filter(Tuple3<String, Long, String> value,Context<Tuple3<String, Long, String>> ctx) throws Exception {return value.f2.equals("fail");}}).times(1).next("end");return pattern;} else {Pattern pattern = Pattern.<Tuple3<String, Long, String>>begin("start2").where(new IterativeCondition<Tuple3<String, Long, String>>() {@Overridepublic boolean filter(Tuple3<String, Long, String> value,Context<Tuple3<String, Long, String>> ctx) throws Exception {return value.f2.equals("success2");}}).times(2).next("middle2").where(new IterativeCondition<Tuple3<String, Long, String>>() {@Overridepublic boolean filter(Tuple3<String, Long, String> value,Context<Tuple3<String, Long, String>> ctx) throws Exception {return value.f2.equals("fail2");}}).times(2).next("end2");return pattern;}}@Overridepublic long getPeriod() throws Exception {return 10000;}@Overridepublic boolean isChanged() throws Exception {flag = !flag ;time += getPeriod();System.out.println("change pattern : " + time);return true;}}

打印结果:符合预期

4.源码地址

感觉有用的话,帮忙点个小星星。^_^

 GitHub - StephenYou520/SyCep: CEP 动态Pattern

相关文章:

Flink CEP(三)pattern动态更新

线上运行的CEP中肯定经常遇到规则变更的情况&#xff0c;如果每次变更时都将任务重启、重新发布是非常不优雅的。尤其在营销或者风控这种对实时性要求比较高的场景&#xff0c;如果规则窗口过长&#xff08;一两个星期&#xff09;&#xff0c;状态过大&#xff0c;就会导致重启…...

抽象工厂模式(C++)

定义 提供一个接口,让该接口负责创建一系列“相关或者相互依赖的对象”&#xff0c;无需指定它们具体的类。 使用场景 在软件系统中&#xff0c;经常面临着“一系列相互依赖的对象”的创建工作;同时,由于需求的变化&#xff0c;往往存在更多系列对象的创建工作。如何应对这种…...

程序员面试金典17.*

文章目录 17.01 不用加号的加法17.04 消失的数字17.05字母与数字17.06 2出现的次数17.07 婴儿名字17.08 马戏团人塔17.09 第k个数17.10 主要元素17.11 单词距离17.12 BiNode17.13 恢复空格&#xff08;未做&#xff0c;字典树dp&#xff09;17.14 最小K个数17.15 最长单词17.16…...

【瑞吉外卖项目复写】基本部分复写笔记

Day1 瑞吉外卖项目概述 mysql的数据源配置 spring:datasource:druid:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/regie?serverTimezoneAsia/Shanghai&useUnicodetrue&characterEncodingutf-8&zeroDateTimeBehaviorconvertTo…...

用html+javascript打造公文一键排版系统15:一键删除所有空格

现在我们来实现一键删除所有空格的功能。 一、使用原有的代码来实现&#xff0c;测试效果并不理想 在这之前我们已经为String对象编写了一个使用正则表达式来删除所有空格的方法&#xff1a; //功能&#xff1a;删除字符串中的所有空格 //记录&#xff1a;20230726创建 Stri…...

苍穹外卖day12(完结撒花)——工作台+Spring_Apche_POI+导出运营数据Excel报表

工作台——需求分析与设计 产品原型 接口设计 工作台——代码导入 将提供的代码导入对应的位置。 工作台——功能测试 Apache POI_介绍 应用场景 Apache POI_入门案例 导入坐标 <!-- poi --><dependency><groupId>org.apache.poi</groupId><ar…...

SQL与NoSQL概念(详细介绍!!)

先搞清楚全称 SQL全称为Structured query language &#xff0c;即结构化查询语言&#xff0c;可以把他理解为一门特殊的编程语言。 那么nosql是什么意思呢&#xff1f;这里的no并不仅是not&#xff0c;而是not only的意思&#xff0c;所以nosql全称应该是Not Only Structure…...

node debian 镜像 new Date 获取时间少 8 小时问题

问题 在 node debian 镜像中&#xff0c;用 (new Date()).getHours() 与系统时间&#xff08;东 8 区&#xff09;少了 8 小时 系统时间 $ node > (new Date()).getHours() 11容器中的时间 $ node > (new Date()).getHours() 3原 Dockerfile FROM node:20.5-bullsey…...

【N32L40X】学习笔记13-软件IIC读写EEPROM AT24C02

AT24C02 8个字节每页,累计32个页 通讯频率MAX 400K AT24C02大小 2K 芯片地址 对于at24c02 A2A1A0 这三个引脚没有使用 写时序 由于设备在写周期中不会产生ACK恢复&#xff0c;因此这可用于确定周期何时完成&#xff08;此特性可用于最大限度地提高总线吞吐量&#xff09;…...

JVM 调优

点击下方关注我&#xff0c;然后右上角点击...“设为星标”&#xff0c;就能第一时间收到更新推送啦~~~ JVM调优是一项重要的任务&#xff0c;可以提高Java应用程序的性能和稳定性。掌握JVM调优需要深入了解JVM的工作原理、参数和配置选项&#xff0c;以及历史JVM参数的调整和优…...

DP-GAN剩余代码

在前面计算完损失后&#xff0c;该进行更新&#xff1a; 1&#xff1a;netEMA是模型的生成器&#xff1a; 遍历生成器的state_dict&#xff0c;将每一个键对应的值乘以EMA_decay。 接着根据当前迭代步数计算num_upd&#xff0c;每1000,2500,10000代倍数就执行一次。 当num…...

在word的文本框内使用Endnote引用文献,如何保证引文编号按照上下文排序

问题 如下图所示&#xff0c;我在word中插入了一个文本框&#xff08;为了插图&#xff09;&#xff0c;然后文本框内有引用&#xff0c;结果endnote自动将文本框内的引用优先排序&#xff0c;变成文献[1]了&#xff0c;而事实上应该是[31]。请问如何能让文本框内的排序也自动…...

SpringBoot项目上传至服务器

1.服务器安装JDK1.8 通过包管理器安装 2.服务器安装数据库 参考链接&#xff1a; CentOS 7 通过 yum 安装 MariaDB - 知乎 1. 安装之后没有密码&#xff0c;所以需要设置密码&#xff0c;使用下面的语句 set password for rootlocalhost password(111111); 2.在数据库中建…...

C++中实现多线程的三种方式

目录 1 背景2 方法 1 背景 力扣1116题 打印零和奇偶数。 2 方法 方法1&#xff1a;原子操作 class ZeroEvenOdd { private:int n;atomic<int> flag 0; public:ZeroEvenOdd(int n) {this->n n;}// printNumber(x) outputs "x", where x is an integer.…...

程序员副业指南:怎样实现年入10w+的目标?

大家好&#xff0c;这里是程序员晚枫&#xff0c;全网同名。 今天给大家分享一个大家都感兴趣的话题&#xff1a;程序员可以做什么副业&#xff0c;年入十万&#xff1f; 01 推荐 程序员可以从事以下副业&#xff0c;以获得一年收入10w&#xff1a; 兼职编程&#xff1a;可…...

excel 计算 分位值

_XLFN.QUARTILE.EXC(Result 1!G:G,2) 和 PERCENTILE 都可以用来计算一组数据的分位数&#xff0c;但是它们的计算方式略有不同。 _XLFN.QUARTILE.EXC(Result 1!G:G,2) 是 Excel 中的一个函数&#xff0c;在计算一个数据集的四分位数时使用。其中&#xff0c;第一个参数 Result…...

mongodb-windows-x86_64-4.4.23-signed.msi

...

一个SpringBoot 项目能处理多少请求?

这篇文章带大家盘一个读者遇到的面试题哈。 根据读者转述&#xff0c;面试官的原问题就是&#xff1a;一个 SpringBoot 项目能同时处理多少请求&#xff1f; 不知道你听到这个问题之后的第一反应是什么。 我大概知道他要问的是哪个方向&#xff0c;但是对于这种只有一句话的…...

Shell编程基础(十)读取多行文本到数组 写入多行文本到文件

读取多行文本到数组 & 写入多行文本到文件 读取多行文本到数组写入多行文本到文件 读取多行文本到数组 创建一个文本文件&#xff0c;内容如下 1 zhangsan 男 10 2 liis 女 12 3 wangwu 男 17读取这个文件中所有人的信息 #!/bin/bash while read u do echo $u done <…...

MyBatis学习笔记2

CRUD 1.namespace namespace中的包名要和mapper接口的包名一致&#xff01; 2.select 选择查询语句 id&#xff1a;就是对应的namespace中的方法名&#xff1b; resultType:Sql语句执行的返回值&#xff01; parameterType&#xff1a;参数类型 增删改必须提交事务&…...

OpenPnP玩家必看:深度解析松下DP102传感器与贴片机真空系统的联动原理与调优

OpenPnP系统集成实战&#xff1a;DP102负压传感器与真空控制回路的科学调优 在DIY贴片机的世界里&#xff0c;OpenPnP系统就像一位不知疲倦的指挥家&#xff0c;而DP102负压传感器则是这支精密乐队中的关键乐手。当吸嘴与元器件相遇的瞬间&#xff0c;背后是一场由气压数据驱动…...

Cadence Virtuoso新手避坑指南:手把手教你画反相器原理图(附3.3V工艺库设置)

Cadence Virtuoso新手避坑指南&#xff1a;3.3V工艺库反相器设计全流程解析 第一次打开Cadence Virtuoso时&#xff0c;那个充满专业术语的界面就像面对一架航天飞机的控制台——每个按钮都暗藏玄机&#xff0c;每次点击都可能引发未知错误。作为模拟IC设计的行业标准工具&…...

C#上位机实战:手把手教你用WinForm控制艾德克斯IT6322B程控电源(附完整源码)

C#工业级程控电源上位机开发实战&#xff1a;从协议解析到多线程安全控制 在工业自动化测试领域&#xff0c;程控电源作为核心供电设备&#xff0c;其精确控制能力直接影响测试结果的可靠性。传统的手动调节方式早已无法满足现代生产线对效率和一致性的要求。以艾德克斯IT6322…...

Overleaf实战:手把手教你用LaTeX制作符合A4排版要求的跨页长表格(含完整代码)

Overleaf实战&#xff1a;LaTeX跨页长表格的终极解决方案 当你正在撰写一篇包含大量数据的学术论文或技术手册时&#xff0c;那些横跨多页的表格往往会成为格式噩梦。表格在页面底部被生硬截断&#xff0c;表头在后续页面消失&#xff0c;页码引用混乱——这些问题不仅影响阅读…...

STC32G单片机开发实战:GPIO模式配置与寄存器详解

1. STC32G单片机GPIO基础认知 第一次拿到STC32G开发板时&#xff0c;我习惯性地想用STM32那套HAL库来操作GPIO&#xff0c;结果发现根本行不通。这就像拿着汽车钥匙去开保险箱&#xff0c;虽然都是"开锁"&#xff0c;但机制完全不同。STC32G作为增强型8051架构单片机…...

B站视频转文字终极指南:如何快速将B站视频转换为可搜索文本

B站视频转文字终极指南&#xff1a;如何快速将B站视频转换为可搜索文本 【免费下载链接】bili2text Bilibili视频转文字&#xff0c;一步到位&#xff0c;输入链接即可使用 项目地址: https://gitcode.com/gh_mirrors/bi/bili2text Bili2Text是一款开源的B站视频转文字工…...

RISC-V开发板结合Python实现B站消息监测:硬件极客的IoT实践

1. 项目概述&#xff1a;当硬件极客遇上日常痛点前几天在极客社区里看到一个挺有意思的分享&#xff0c;一位开发者朋友用一块高性能的RISC-V开发板&#xff0c;结合自己写的Python脚本&#xff0c;做了一个B站未读消息的实时监测器。这项目乍一听有点“杀鸡用牛刀”的感觉——…...

MoneyPrinterPlus智能视频创作工具实战指南:从零到批量生产的完整流程

MoneyPrinterPlus智能视频创作工具实战指南&#xff1a;从零到批量生产的完整流程 【免费下载链接】MoneyPrinterPlus AI一键批量生成各类短视频,自动批量混剪短视频,自动把视频发布到抖音,快手,小红书,视频号上,赚钱从来没有这么容易过! 支持本地语音模型chatTTS,fasterwhispe…...

为每日代码评审接入Claude Code并配置Taotoken作为后备模型

&#x1f680; 告别海外账号与网络限制&#xff01;稳定直连全球优质大模型&#xff0c;限时半价接入中。 &#x1f449; 点击领取海量免费额度 为每日代码评审接入Claude Code并配置Taotoken作为后备模型 作为团队技术负责人&#xff0c;将AI工具引入日常开发流程&#xff0c…...

信步SV1a-13714P嵌入式主板拆解:工业边缘计算硬件选型与实战部署指南

1. 项目概述&#xff1a;一块嵌入式主板的深度拆解最近在整理一个工业边缘计算的项目资料&#xff0c;翻出了几块之前用过的“信步科技SV1a-13714P”嵌入式主板。这块板子虽然不是什么新潮的玩意儿&#xff0c;但在特定的工业场景里&#xff0c;它就像一颗“定心丸”&#xff0…...