Flink CEP(二) 运行源码解析
通过DemoApp学习一下,CEP的源码执行逻辑。为下一篇实现CEP动态Pattern奠定理论基础。
1. Pattern的定义
Pattern<Tuple3<String, Long, String>,?> pattern = Pattern.<Tuple3<String, Long, String>>begin("begin").where(new IterativeCondition<Tuple3<String, Long, String>>() {@Overridepublic boolean filter(Tuple3<String, Long, String> value, Context<Tuple3<String, Long, String>> ctx)throws Exception {return value.f2.equals("success");}}).followedByAny("middle").where(new IterativeCondition<Tuple3<String, Long, String>>() {@Overridepublic boolean filter(Tuple3<String, Long, String> value, Context<Tuple3<String, Long, String>> ctx)throws Exception {return value.f2.equals("fail");}}).followedBy("end").where(new IterativeCondition<Tuple3<String, Long, String>>() {@Overridepublic boolean filter(Tuple3<String, Long, String> value, Context<Tuple3<String, Long, String>> ctx)throws Exception {return value.f2.equals("end");}});
在执行中,我们可以看到pattern的几个属性,进入Pattern类中查看。
public class Pattern<T, F extends T> {/** Name of the pattern. */private final String name;/** Previous pattern. */private final Pattern<T, ? extends T> previous;/** The condition an event has to satisfy to be considered a matched. */private IterativeCondition<F> condition;/** Window length in which the pattern match has to occur. */private final Map<WithinType, Time> windowTimes = new HashMap<>();/*** A quantifier for the pattern. By default set to {@link Quantifier#one(ConsumingStrategy)}.*/private Quantifier quantifier = Quantifier.one(ConsumingStrategy.STRICT);/** The condition an event has to satisfy to stop collecting events into looping state. */private IterativeCondition<F> untilCondition;/** Applicable to a {@code times} pattern, and holds the number of times it has to appear. */private Times times;private final AfterMatchSkipStrategy afterMatchSkipStrategy;
}
可以看到每一个Pattern都会存在以下属性:
- Name:Pattern的Name
- previous:之前的Pattern
- condition:Pattern的匹配逻辑
- windowTimes:限制窗口的时长
- Quantifier:Pattern的属性,包括配置Pattern的模式可以发生的循环次数,或者这个模式是贪婪的还是可选的。
-
/*** A quantifier describing the Pattern. There are three main groups of {@link Quantifier}.** <ol>* <li>Single* <li>Looping* <li>Times* </ol>** <p>Each {@link Pattern} can be optional and have a {@link ConsumingStrategy}. Looping and Times* also hava an additional inner consuming strategy that is applied between accepted events in the* pattern.*/ public class Quantifier {private final EnumSet<QuantifierProperty> properties;private final ConsumingStrategy consumingStrategy;private ConsumingStrategy innerConsumingStrategy = ConsumingStrategy.SKIP_TILL_NEXT; }
-
-
untilCondition:Pattern的循环匹配的结束条件
-
times:连续匹配次数
-
afterMatchSkipStrategy:匹配后的跳过策略
2.PatternStream的构建
对Pattern定义完成,会通过PatternStreamBuilder,将1中定义好的Pattern应用到输入流中,返回对应的PatternStream。
static <IN> PatternStreamBuilder<IN> forStreamAndPattern(final DataStream<IN> inputStream, final Pattern<IN, ?> pattern) {return new PatternStreamBuilder<>(inputStream, pattern, TimeBehaviour.EventTime, null, null);}PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern) {this(PatternStreamBuilder.forStreamAndPattern(inputStream, pattern));}
继续执行代码,进入Select()。
public <R> SingleOutputStreamOperator<R> select(final PatternSelectFunction<T, R> patternSelectFunction,final TypeInformation<R> outTypeInfo) {final PatternProcessFunction<T, R> processFunction =fromSelect(builder.clean(patternSelectFunction)).build();return process(processFunction, outTypeInfo);}
进入process可以看到PatternStream.select会调用builder.build函数。
public <R> SingleOutputStreamOperator<R> process(final PatternProcessFunction<T, R> patternProcessFunction,final TypeInformation<R> outTypeInfo) {return builder.build(outTypeInfo, builder.clean(patternProcessFunction));}
在build函数中会完成NFAFactory的定义,随后构建CepOperator。inputstream随之运行CepOperator即pattern定义的处理逻辑,并返回结果流PatternStream。
<OUT, K> SingleOutputStreamOperator<OUT> build(final TypeInformation<OUT> outTypeInfo,final PatternProcessFunction<IN, OUT> processFunction) {checkNotNull(outTypeInfo);checkNotNull(processFunction);final TypeSerializer<IN> inputSerializer =inputStream.getType().createSerializer(inputStream.getExecutionConfig());final boolean isProcessingTime = timeBehaviour == TimeBehaviour.ProcessingTime;final boolean timeoutHandling = processFunction instanceof TimedOutPartialMatchHandler;final NFACompiler.NFAFactory<IN> nfaFactory =NFACompiler.compileFactory(pattern, timeoutHandling);CepOperator<IN, K, OUT> operator = new CepOperator<>(inputSerializer,isProcessingTime,nfaFactory,comparator,pattern.getAfterMatchSkipStrategy(),processFunction,lateDataOutputTag);final SingleOutputStreamOperator<OUT> patternStream;if (inputStream instanceof KeyedStream) {KeyedStream<IN, K> keyedStream = (KeyedStream<IN, K>) inputStream;patternStream = keyedStream.transform("CepOperator", outTypeInfo, operator);} else {KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();patternStream =inputStream.keyBy(keySelector).transform("GlobalCepOperator", outTypeInfo, operator).forceNonParallel();}return patternStream;}
3.CepOperator的执行
初始化。
@Overridepublic void open() throws Exception {super.open();timerService =getInternalTimerService("watermark-callbacks", VoidNamespaceSerializer.INSTANCE, this);nfa = nfaFactory.createNFA();nfa.open(cepRuntimeContext, new Configuration());context = new ContextFunctionImpl();collector = new TimestampedCollector<>(output);cepTimerService = new TimerServiceImpl();// metricsthis.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);}
可以看到,nfaFactory.createNFA();会解析pattern组合,并为每一个pattern创建一个state。
CepOperator会在processElement中处理流中的每条数据。
@Overridepublic void processElement(StreamRecord<IN> element) throws Exception {if (isProcessingTime) {if (comparator == null) {// there can be no out of order elements in processing timeNFAState nfaState = getNFAState();long timestamp = getProcessingTimeService().getCurrentProcessingTime();advanceTime(nfaState, timestamp);processEvent(nfaState, element.getValue(), timestamp);updateNFA(nfaState);} else {long currentTime = timerService.currentProcessingTime();bufferEvent(element.getValue(), currentTime);}} else {long timestamp = element.getTimestamp();IN value = element.getValue();// In event-time processing we assume correctness of the watermark.// Events with timestamp smaller than or equal with the last seen watermark are// considered late.// Late events are put in a dedicated side output, if the user has specified one.if (timestamp > timerService.currentWatermark()) {// we have an event with a valid timestamp, so// we buffer it until we receive the proper watermark.bufferEvent(value, timestamp);} else if (lateDataOutputTag != null) {output.collect(lateDataOutputTag, element);} else {numLateRecordsDropped.inc();}}}
可以看到,如果使用的是处理时间,需要先对数据根据当前处理时间将乱序的数据做一次处理,保证数据的有序。
如果使用的事件时间,如果事件时间戳小于等于watermark会被认为是迟到数据。
正常数据会先被缓存起来,等待处理。
private void bufferEvent(IN event, long currentTime) throws Exception {List<IN> elementsForTimestamp = elementQueueState.get(currentTime);if (elementsForTimestamp == null) {elementsForTimestamp = new ArrayList<>();registerTimer(currentTime);}elementsForTimestamp.add(event);elementQueueState.put(currentTime, elementsForTimestamp);}
elementQueueState 会以时间戳为key保存对应的数据。在onEventTime()函数中通过processEvent中处理缓存的匹配数据。
@Overridepublic void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {// 1) get the queue of pending elements for the key and the corresponding NFA,// 2) process the pending elements in event time order and custom comparator if exists// by feeding them in the NFA// 3) advance the time to the current watermark, so that expired patterns are discarded.// 4) update the stored state for the key, by only storing the new NFA and MapState iff they// have state to be used later.// 5) update the last seen watermark.// STEP 1PriorityQueue<Long> sortedTimestamps = getSortedTimestamps();NFAState nfaState = getNFAState();// STEP 2while (!sortedTimestamps.isEmpty()&& sortedTimestamps.peek() <= timerService.currentWatermark()) {long timestamp = sortedTimestamps.poll();advanceTime(nfaState, timestamp);// 对事件按时间进行排序try (Stream<IN> elements = sort(elementQueueState.get(timestamp))) {elements.forEachOrdered(event -> {try {processEvent(nfaState, event, timestamp);} catch (Exception e) {throw new RuntimeException(e);}});}elementQueueState.remove(timestamp);}// STEP 3advanceTime(nfaState, timerService.currentWatermark());// STEP 4updateNFA(nfaState);}
private void processEvent(NFAState nfaState, IN event, long timestamp) throws Exception {try (SharedBufferAccessor<IN> sharedBufferAccessor = partialMatches.getAccessor()) {Collection<Map<String, List<IN>>> patterns =nfa.process(sharedBufferAccessor,nfaState,event,timestamp,afterMatchSkipStrategy,cepTimerService);if (nfa.getWindowTime() > 0 && nfaState.isNewStartPartialMatch()) {registerTimer(timestamp + nfa.getWindowTime());}processMatchedSequences(patterns, timestamp);}}private void processMatchedSequences(Iterable<Map<String, List<IN>>> matchingSequences, long timestamp) throws Exception {PatternProcessFunction<IN, OUT> function = getUserFunction();setTimestamp(timestamp);for (Map<String, List<IN>> matchingSequence : matchingSequences) {function.processMatch(matchingSequence, context, collector);}}
nfa.process()最后会调用doProcess进行处理。
computer
可以看到每来一个新的Event,就会从上一个数据停留的状态开始遍历。判断新事件Event匹配之前已经匹配过的哪个状态,并为其版本号+1
前5条数据是success->fail->fail->success->fail,我们可以观察到partialMatches的变化如下:
success事件到达,因为之前没有事件,所以当前停留的状态是 begin。success匹配,预期会停留在middle状态
fail事件到达,可以看到上面的success事件停留在了middle状态,并且begin的版本+1.
判断这个fail事件可以匹配后续的patern,状态从middle转移到end。存在newComputationStates中。最终更新到partialMatch中。
第二个fail事件到达,只能匹配之前的middle状态,所以partialMatch中会新增一个end状态,并且middle的版本+1;
最后如果状态到达终态,输出到potentialMatches中存储。
打印结果,可以看到每个事件都会试图去匹配所有的历史状态,nfa会存储所有匹配上的历史状态,直到到达终态。
相关文章:

Flink CEP(二) 运行源码解析
通过DemoApp学习一下,CEP的源码执行逻辑。为下一篇实现CEP动态Pattern奠定理论基础。 1. Pattern的定义 Pattern<Tuple3<String, Long, String>,?> pattern Pattern.<Tuple3<String, Long, String>>begin("begin").where(new…...
剑指Offer-学习计划(四)双指针(下)
剑指 Offer 57. 和为s的两个数字 剑指 Offer 58 - I. 翻转单词顺序 剑指 Offer 21. 调整数组顺序使奇数位于偶数前面 题目一:调整数组顺序使奇数位于偶数前面 输入一个整数数组,实现一个函数来调整该数组中数字的顺序,使得所有奇数在数组的…...

深度学习——常见注意力机制
1.SENet SENet属于通道注意力机制。2017年提出,是imageNet最后的冠军 SENet采用的方法是对于特征层赋予权值。 重点在于如何赋权 1.将输入信息的所有通道平均池化。 2.平均池化后进行两次全连接,第一次全连接链接的神经元较少,第二次全连…...

Python 进阶(七):高级文件操作(shutil 模块)
❤️ 博客主页:水滴技术 🌸 订阅专栏:Python 入门核心技术 🚀 支持水滴:点赞👍 收藏⭐ 留言💬 文章目录 1. 简介2. 常用函数2.1 复制文件2.2 复制目录2.3 移动文件或目录2.4 删除文件或目录2.…...

保留网络:大型语言模型的Transformer继任者
原文信息 原文题目:《Retentive Network: A Successor to Transformer for Large Language Models》 原文引用:Sun Y, Dong L, Huang S, et al. Retentive Network: A Successor to Transformer for Large Language Models[J]. arXiv preprint arXiv:2…...
算法通关村第二关——反转链表青铜笔记
LeetCode 206.反转链表 建立虚拟结点辅助翻转 public ListNode reverseList(ListNode head) {ListNode ans new ListNode(-1);ListNode cur head;while(cur!null){ListNode curNext cur.next;cur.next ans.next;ans.next cur;cur curNext;}return ans.next; }不带虚拟头…...

【Linux】——线程安全
目录 关于线程进程的问题 可重入与线程安全 常见的线程安全的情况 常见的不可重入的情况 常见的可重入的情况 可重入与线程安全区别 可重入与线程安全联系 Linux线程互斥 进程线程间的互斥相关概念 互斥量mutex 互斥量mutex常用接口 互斥量改造抢票系统 互斥量的原…...

[React]生命周期
前言 学习React,生命周期很重要,我们了解完生命周期的各个组件,对写高性能组件会有很大的帮助. Ract生命周期 React 生命周期分为三种状态 1. 初始化 2.更新 3.销毁 初始化 1、getDefaultProps() 设置默认的props,也可以用duf…...

【2023】Redis实现消息队列的方式汇总以及代码实现
Redis实现消息队列的方式汇总以及代码实现 前言开始前准备1、添加依赖2、添加配置的Bean 具体实现一、从最简单的开始:List 队列代码实现 二、发布订阅模式:Pub/Sub1、使用RedisMessageListenerContainer实现订阅2、还可以使用redisTemplate实现订阅 三、…...
ARM裸机-10
1、X210开发板和光盘资料 1.1、配置信息 CPU:三星S5PV210 内存:512M DDR2 SDRAM Flash:4GB iBand LCD:7寸,分辨率800x480 触摸屏:电容触摸屏 2、X210开发板硬件手册 3、X210开发板刷系统 3.1、什么是刷…...

「C/C++」C/C++指针详解
✨博客主页何曾参静谧的博客📌文章专栏「C/C」C/C程序设计📚全部专栏「UG/NX」NX二次开发「UG/NX」BlockUI集合「VS」Visual Studio「QT」QT5程序设计「C/C」C/C程序设计「Win」Windows程序设计「算法」数据结构与算法「File」数据文件格式 目录 一、术语…...

提高电脑寿命的维护技巧与方法分享
在维护电脑运行方面,我有一些自己觉得非常有用的技巧和方法。下面我将分享一些我常用的维护技巧,并解释为什么我会选择这样做以及这样做的好处。 首先,我经常清理我的电脑内部的灰尘。电脑内部的灰尘会影响散热效果,导致电脑发热…...
React常见面试题
React常见面试题 一、React中的样式管理有哪些方法 内联样式:对象,作用于当前组件普通样式表: 作用于全局,文件名是:xxx.scssCSS模块:类似Vue的scoped, 文件名需是:xxx.module.scs…...

C++中数据的输入输出介绍
C中数据的输入输出介绍 C中数据的输入输出涉及到的文件 <iostream>:这是C标准库中最常用的头文件之一,包含了进行标准输入输出操作的类和对象,如std::cin、std::cout、std::endl等。 <iomanip>:该头文件提供了一些用…...

0101日志-运维-mysql
1 错误日志 错误日志(Error Log):错误日志记录了MySQL引擎在运行过程中出现的错误和异常情况。这些错误可能包括启动和关闭问题、数据库崩溃、权限问题等。错误日志对于排查和解决MySQL引擎问题非常有帮助。 改日志默认开启,默认存…...

LabVIEW使用灰度和边缘检测进行视频滤波
LabVIEW使用灰度和边缘检测进行视频滤波 数字图像处理(DIP)是真实和连续世界的离散表示。除此之外,这种数字图像在通信、医学、遥感、地震学、工业自动化、机器人、航空航天和教育等领域变得非常重要。计算机技术越来越需要视频图像的数字图…...

SpringBoot整合WebService
SpringBoot整合WebService WebService是一个比较旧的远程调用通信框架,现在企业项目中用的比较少,因为它逐步被SpringCloud所取代,它的优势就是能够跨语言平台通信,所以还有点价值,下面来看看如何在SpringBoot项目中使…...
【LangChain】向量存储之FAISS
LangChain学习文档 【LangChain】向量存储(Vector stores)【LangChain】向量存储之FAISS 概要 Facebook AI 相似性搜索(Faiss)是一个用于高效相似性搜索和密集向量聚类的库。它包含的算法可以搜索任意大小的向量集,甚至可能无法容纳在 RAM 中…...

小研究 - 主动式微服务细粒度弹性缩放算法研究(三)
微服务架构已成为云数据中心的基本服务架构。但目前关于微服务系统弹性缩放的研究大多是基于服务或实例级别的水平缩放,忽略了能够充分利用单台服务器资源的细粒度垂直缩放,从而导致资源浪费。为此,本文设计了主动式微服务细粒度弹性缩放算法…...
驱动开发相关内容复盘
并发与竞争 并发 多个“用户”同时访问同一个共享资源。 竞争 并发和竞争的处理方法 处理并发和竞争的机制:原子操作、自旋锁、信号量和互斥体。 1、原子操作 原子操作就是指不能再进一步分割的操作,一般原子操作用于变量或者位操作。 …...

C++_核心编程_多态案例二-制作饮品
#include <iostream> #include <string> using namespace std;/*制作饮品的大致流程为:煮水 - 冲泡 - 倒入杯中 - 加入辅料 利用多态技术实现本案例,提供抽象制作饮品基类,提供子类制作咖啡和茶叶*//*基类*/ class AbstractDr…...
CVPR 2025 MIMO: 支持视觉指代和像素grounding 的医学视觉语言模型
CVPR 2025 | MIMO:支持视觉指代和像素对齐的医学视觉语言模型 论文信息 标题:MIMO: A medical vision language model with visual referring multimodal input and pixel grounding multimodal output作者:Yanyuan Chen, Dexuan Xu, Yu Hu…...

PPT|230页| 制造集团企业供应链端到端的数字化解决方案:从需求到结算的全链路业务闭环构建
制造业采购供应链管理是企业运营的核心环节,供应链协同管理在供应链上下游企业之间建立紧密的合作关系,通过信息共享、资源整合、业务协同等方式,实现供应链的全面管理和优化,提高供应链的效率和透明度,降低供应链的成…...
【位运算】消失的两个数字(hard)
消失的两个数字(hard) 题⽬描述:解法(位运算):Java 算法代码:更简便代码 题⽬链接:⾯试题 17.19. 消失的两个数字 题⽬描述: 给定⼀个数组,包含从 1 到 N 所有…...

基于Flask实现的医疗保险欺诈识别监测模型
基于Flask实现的医疗保险欺诈识别监测模型 项目截图 项目简介 社会医疗保险是国家通过立法形式强制实施,由雇主和个人按一定比例缴纳保险费,建立社会医疗保险基金,支付雇员医疗费用的一种医疗保险制度, 它是促进社会文明和进步的…...

关于iview组件中使用 table , 绑定序号分页后序号从1开始的解决方案
问题描述:iview使用table 中type: "index",分页之后 ,索引还是从1开始,试过绑定后台返回数据的id, 这种方法可行,就是后台返回数据的每个页面id都不完全是按照从1开始的升序,因此百度了下,找到了…...
1688商品列表API与其他数据源的对接思路
将1688商品列表API与其他数据源对接时,需结合业务场景设计数据流转链路,重点关注数据格式兼容性、接口调用频率控制及数据一致性维护。以下是具体对接思路及关键技术点: 一、核心对接场景与目标 商品数据同步 场景:将1688商品信息…...

ESP32读取DHT11温湿度数据
芯片:ESP32 环境:Arduino 一、安装DHT11传感器库 红框的库,别安装错了 二、代码 注意,DATA口要连接在D15上 #include "DHT.h" // 包含DHT库#define DHTPIN 15 // 定义DHT11数据引脚连接到ESP32的GPIO15 #define D…...

2.Vue编写一个app
1.src中重要的组成 1.1main.ts // 引入createApp用于创建应用 import { createApp } from "vue"; // 引用App根组件 import App from ./App.vue;createApp(App).mount(#app)1.2 App.vue 其中要写三种标签 <template> <!--html--> </template>…...

成都鼎讯硬核科技!雷达目标与干扰模拟器,以卓越性能制胜电磁频谱战
在现代战争中,电磁频谱已成为继陆、海、空、天之后的 “第五维战场”,雷达作为电磁频谱领域的关键装备,其干扰与抗干扰能力的较量,直接影响着战争的胜负走向。由成都鼎讯科技匠心打造的雷达目标与干扰模拟器,凭借数字射…...