Flink CEP(二) 运行源码解析
通过DemoApp学习一下,CEP的源码执行逻辑。为下一篇实现CEP动态Pattern奠定理论基础。
1. Pattern的定义
Pattern<Tuple3<String, Long, String>,?> pattern = Pattern.<Tuple3<String, Long, String>>begin("begin").where(new IterativeCondition<Tuple3<String, Long, String>>() {@Overridepublic boolean filter(Tuple3<String, Long, String> value, Context<Tuple3<String, Long, String>> ctx)throws Exception {return value.f2.equals("success");}}).followedByAny("middle").where(new IterativeCondition<Tuple3<String, Long, String>>() {@Overridepublic boolean filter(Tuple3<String, Long, String> value, Context<Tuple3<String, Long, String>> ctx)throws Exception {return value.f2.equals("fail");}}).followedBy("end").where(new IterativeCondition<Tuple3<String, Long, String>>() {@Overridepublic boolean filter(Tuple3<String, Long, String> value, Context<Tuple3<String, Long, String>> ctx)throws Exception {return value.f2.equals("end");}});

在执行中,我们可以看到pattern的几个属性,进入Pattern类中查看。
public class Pattern<T, F extends T> {/** Name of the pattern. */private final String name;/** Previous pattern. */private final Pattern<T, ? extends T> previous;/** The condition an event has to satisfy to be considered a matched. */private IterativeCondition<F> condition;/** Window length in which the pattern match has to occur. */private final Map<WithinType, Time> windowTimes = new HashMap<>();/*** A quantifier for the pattern. By default set to {@link Quantifier#one(ConsumingStrategy)}.*/private Quantifier quantifier = Quantifier.one(ConsumingStrategy.STRICT);/** The condition an event has to satisfy to stop collecting events into looping state. */private IterativeCondition<F> untilCondition;/** Applicable to a {@code times} pattern, and holds the number of times it has to appear. */private Times times;private final AfterMatchSkipStrategy afterMatchSkipStrategy;
}
可以看到每一个Pattern都会存在以下属性:
- Name:Pattern的Name
- previous:之前的Pattern
- condition:Pattern的匹配逻辑
- windowTimes:限制窗口的时长
- Quantifier:Pattern的属性,包括配置Pattern的模式可以发生的循环次数,或者这个模式是贪婪的还是可选的。
-
/*** A quantifier describing the Pattern. There are three main groups of {@link Quantifier}.** <ol>* <li>Single* <li>Looping* <li>Times* </ol>** <p>Each {@link Pattern} can be optional and have a {@link ConsumingStrategy}. Looping and Times* also hava an additional inner consuming strategy that is applied between accepted events in the* pattern.*/ public class Quantifier {private final EnumSet<QuantifierProperty> properties;private final ConsumingStrategy consumingStrategy;private ConsumingStrategy innerConsumingStrategy = ConsumingStrategy.SKIP_TILL_NEXT; }
-
-
untilCondition:Pattern的循环匹配的结束条件
-
times:连续匹配次数
-
afterMatchSkipStrategy:匹配后的跳过策略
2.PatternStream的构建
对Pattern定义完成,会通过PatternStreamBuilder,将1中定义好的Pattern应用到输入流中,返回对应的PatternStream。
static <IN> PatternStreamBuilder<IN> forStreamAndPattern(final DataStream<IN> inputStream, final Pattern<IN, ?> pattern) {return new PatternStreamBuilder<>(inputStream, pattern, TimeBehaviour.EventTime, null, null);}PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern) {this(PatternStreamBuilder.forStreamAndPattern(inputStream, pattern));}
继续执行代码,进入Select()。
public <R> SingleOutputStreamOperator<R> select(final PatternSelectFunction<T, R> patternSelectFunction,final TypeInformation<R> outTypeInfo) {final PatternProcessFunction<T, R> processFunction =fromSelect(builder.clean(patternSelectFunction)).build();return process(processFunction, outTypeInfo);}
进入process可以看到PatternStream.select会调用builder.build函数。
public <R> SingleOutputStreamOperator<R> process(final PatternProcessFunction<T, R> patternProcessFunction,final TypeInformation<R> outTypeInfo) {return builder.build(outTypeInfo, builder.clean(patternProcessFunction));}
在build函数中会完成NFAFactory的定义,随后构建CepOperator。inputstream随之运行CepOperator即pattern定义的处理逻辑,并返回结果流PatternStream。
<OUT, K> SingleOutputStreamOperator<OUT> build(final TypeInformation<OUT> outTypeInfo,final PatternProcessFunction<IN, OUT> processFunction) {checkNotNull(outTypeInfo);checkNotNull(processFunction);final TypeSerializer<IN> inputSerializer =inputStream.getType().createSerializer(inputStream.getExecutionConfig());final boolean isProcessingTime = timeBehaviour == TimeBehaviour.ProcessingTime;final boolean timeoutHandling = processFunction instanceof TimedOutPartialMatchHandler;final NFACompiler.NFAFactory<IN> nfaFactory =NFACompiler.compileFactory(pattern, timeoutHandling);CepOperator<IN, K, OUT> operator = new CepOperator<>(inputSerializer,isProcessingTime,nfaFactory,comparator,pattern.getAfterMatchSkipStrategy(),processFunction,lateDataOutputTag);final SingleOutputStreamOperator<OUT> patternStream;if (inputStream instanceof KeyedStream) {KeyedStream<IN, K> keyedStream = (KeyedStream<IN, K>) inputStream;patternStream = keyedStream.transform("CepOperator", outTypeInfo, operator);} else {KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();patternStream =inputStream.keyBy(keySelector).transform("GlobalCepOperator", outTypeInfo, operator).forceNonParallel();}return patternStream;}
3.CepOperator的执行
初始化。
@Overridepublic void open() throws Exception {super.open();timerService =getInternalTimerService("watermark-callbacks", VoidNamespaceSerializer.INSTANCE, this);nfa = nfaFactory.createNFA();nfa.open(cepRuntimeContext, new Configuration());context = new ContextFunctionImpl();collector = new TimestampedCollector<>(output);cepTimerService = new TimerServiceImpl();// metricsthis.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);}

可以看到,nfaFactory.createNFA();会解析pattern组合,并为每一个pattern创建一个state。
CepOperator会在processElement中处理流中的每条数据。
@Overridepublic void processElement(StreamRecord<IN> element) throws Exception {if (isProcessingTime) {if (comparator == null) {// there can be no out of order elements in processing timeNFAState nfaState = getNFAState();long timestamp = getProcessingTimeService().getCurrentProcessingTime();advanceTime(nfaState, timestamp);processEvent(nfaState, element.getValue(), timestamp);updateNFA(nfaState);} else {long currentTime = timerService.currentProcessingTime();bufferEvent(element.getValue(), currentTime);}} else {long timestamp = element.getTimestamp();IN value = element.getValue();// In event-time processing we assume correctness of the watermark.// Events with timestamp smaller than or equal with the last seen watermark are// considered late.// Late events are put in a dedicated side output, if the user has specified one.if (timestamp > timerService.currentWatermark()) {// we have an event with a valid timestamp, so// we buffer it until we receive the proper watermark.bufferEvent(value, timestamp);} else if (lateDataOutputTag != null) {output.collect(lateDataOutputTag, element);} else {numLateRecordsDropped.inc();}}}
可以看到,如果使用的是处理时间,需要先对数据根据当前处理时间将乱序的数据做一次处理,保证数据的有序。
如果使用的事件时间,如果事件时间戳小于等于watermark会被认为是迟到数据。
正常数据会先被缓存起来,等待处理。
private void bufferEvent(IN event, long currentTime) throws Exception {List<IN> elementsForTimestamp = elementQueueState.get(currentTime);if (elementsForTimestamp == null) {elementsForTimestamp = new ArrayList<>();registerTimer(currentTime);}elementsForTimestamp.add(event);elementQueueState.put(currentTime, elementsForTimestamp);}
elementQueueState 会以时间戳为key保存对应的数据。在onEventTime()函数中通过processEvent中处理缓存的匹配数据。
@Overridepublic void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {// 1) get the queue of pending elements for the key and the corresponding NFA,// 2) process the pending elements in event time order and custom comparator if exists// by feeding them in the NFA// 3) advance the time to the current watermark, so that expired patterns are discarded.// 4) update the stored state for the key, by only storing the new NFA and MapState iff they// have state to be used later.// 5) update the last seen watermark.// STEP 1PriorityQueue<Long> sortedTimestamps = getSortedTimestamps();NFAState nfaState = getNFAState();// STEP 2while (!sortedTimestamps.isEmpty()&& sortedTimestamps.peek() <= timerService.currentWatermark()) {long timestamp = sortedTimestamps.poll();advanceTime(nfaState, timestamp);// 对事件按时间进行排序try (Stream<IN> elements = sort(elementQueueState.get(timestamp))) {elements.forEachOrdered(event -> {try {processEvent(nfaState, event, timestamp);} catch (Exception e) {throw new RuntimeException(e);}});}elementQueueState.remove(timestamp);}// STEP 3advanceTime(nfaState, timerService.currentWatermark());// STEP 4updateNFA(nfaState);}
private void processEvent(NFAState nfaState, IN event, long timestamp) throws Exception {try (SharedBufferAccessor<IN> sharedBufferAccessor = partialMatches.getAccessor()) {Collection<Map<String, List<IN>>> patterns =nfa.process(sharedBufferAccessor,nfaState,event,timestamp,afterMatchSkipStrategy,cepTimerService);if (nfa.getWindowTime() > 0 && nfaState.isNewStartPartialMatch()) {registerTimer(timestamp + nfa.getWindowTime());}processMatchedSequences(patterns, timestamp);}}private void processMatchedSequences(Iterable<Map<String, List<IN>>> matchingSequences, long timestamp) throws Exception {PatternProcessFunction<IN, OUT> function = getUserFunction();setTimestamp(timestamp);for (Map<String, List<IN>> matchingSequence : matchingSequences) {function.processMatch(matchingSequence, context, collector);}}
nfa.process()最后会调用doProcess进行处理。
computer

可以看到每来一个新的Event,就会从上一个数据停留的状态开始遍历。判断新事件Event匹配之前已经匹配过的哪个状态,并为其版本号+1
前5条数据是success->fail->fail->success->fail,我们可以观察到partialMatches的变化如下:
success事件到达,因为之前没有事件,所以当前停留的状态是 begin。success匹配,预期会停留在middle状态

fail事件到达,可以看到上面的success事件停留在了middle状态,并且begin的版本+1.

判断这个fail事件可以匹配后续的patern,状态从middle转移到end。存在newComputationStates中。最终更新到partialMatch中。

第二个fail事件到达,只能匹配之前的middle状态,所以partialMatch中会新增一个end状态,并且middle的版本+1;


最后如果状态到达终态,输出到potentialMatches中存储。
打印结果,可以看到每个事件都会试图去匹配所有的历史状态,nfa会存储所有匹配上的历史状态,直到到达终态。

相关文章:
Flink CEP(二) 运行源码解析
通过DemoApp学习一下,CEP的源码执行逻辑。为下一篇实现CEP动态Pattern奠定理论基础。 1. Pattern的定义 Pattern<Tuple3<String, Long, String>,?> pattern Pattern.<Tuple3<String, Long, String>>begin("begin").where(new…...
剑指Offer-学习计划(四)双指针(下)
剑指 Offer 57. 和为s的两个数字 剑指 Offer 58 - I. 翻转单词顺序 剑指 Offer 21. 调整数组顺序使奇数位于偶数前面 题目一:调整数组顺序使奇数位于偶数前面 输入一个整数数组,实现一个函数来调整该数组中数字的顺序,使得所有奇数在数组的…...
深度学习——常见注意力机制
1.SENet SENet属于通道注意力机制。2017年提出,是imageNet最后的冠军 SENet采用的方法是对于特征层赋予权值。 重点在于如何赋权 1.将输入信息的所有通道平均池化。 2.平均池化后进行两次全连接,第一次全连接链接的神经元较少,第二次全连…...
Python 进阶(七):高级文件操作(shutil 模块)
❤️ 博客主页:水滴技术 🌸 订阅专栏:Python 入门核心技术 🚀 支持水滴:点赞👍 收藏⭐ 留言💬 文章目录 1. 简介2. 常用函数2.1 复制文件2.2 复制目录2.3 移动文件或目录2.4 删除文件或目录2.…...
保留网络:大型语言模型的Transformer继任者
原文信息 原文题目:《Retentive Network: A Successor to Transformer for Large Language Models》 原文引用:Sun Y, Dong L, Huang S, et al. Retentive Network: A Successor to Transformer for Large Language Models[J]. arXiv preprint arXiv:2…...
算法通关村第二关——反转链表青铜笔记
LeetCode 206.反转链表 建立虚拟结点辅助翻转 public ListNode reverseList(ListNode head) {ListNode ans new ListNode(-1);ListNode cur head;while(cur!null){ListNode curNext cur.next;cur.next ans.next;ans.next cur;cur curNext;}return ans.next; }不带虚拟头…...
【Linux】——线程安全
目录 关于线程进程的问题 可重入与线程安全 常见的线程安全的情况 常见的不可重入的情况 常见的可重入的情况 可重入与线程安全区别 可重入与线程安全联系 Linux线程互斥 进程线程间的互斥相关概念 互斥量mutex 互斥量mutex常用接口 互斥量改造抢票系统 互斥量的原…...
[React]生命周期
前言 学习React,生命周期很重要,我们了解完生命周期的各个组件,对写高性能组件会有很大的帮助. Ract生命周期 React 生命周期分为三种状态 1. 初始化 2.更新 3.销毁 初始化 1、getDefaultProps() 设置默认的props,也可以用duf…...
【2023】Redis实现消息队列的方式汇总以及代码实现
Redis实现消息队列的方式汇总以及代码实现 前言开始前准备1、添加依赖2、添加配置的Bean 具体实现一、从最简单的开始:List 队列代码实现 二、发布订阅模式:Pub/Sub1、使用RedisMessageListenerContainer实现订阅2、还可以使用redisTemplate实现订阅 三、…...
ARM裸机-10
1、X210开发板和光盘资料 1.1、配置信息 CPU:三星S5PV210 内存:512M DDR2 SDRAM Flash:4GB iBand LCD:7寸,分辨率800x480 触摸屏:电容触摸屏 2、X210开发板硬件手册 3、X210开发板刷系统 3.1、什么是刷…...
「C/C++」C/C++指针详解
✨博客主页何曾参静谧的博客📌文章专栏「C/C」C/C程序设计📚全部专栏「UG/NX」NX二次开发「UG/NX」BlockUI集合「VS」Visual Studio「QT」QT5程序设计「C/C」C/C程序设计「Win」Windows程序设计「算法」数据结构与算法「File」数据文件格式 目录 一、术语…...
提高电脑寿命的维护技巧与方法分享
在维护电脑运行方面,我有一些自己觉得非常有用的技巧和方法。下面我将分享一些我常用的维护技巧,并解释为什么我会选择这样做以及这样做的好处。 首先,我经常清理我的电脑内部的灰尘。电脑内部的灰尘会影响散热效果,导致电脑发热…...
React常见面试题
React常见面试题 一、React中的样式管理有哪些方法 内联样式:对象,作用于当前组件普通样式表: 作用于全局,文件名是:xxx.scssCSS模块:类似Vue的scoped, 文件名需是:xxx.module.scs…...
C++中数据的输入输出介绍
C中数据的输入输出介绍 C中数据的输入输出涉及到的文件 <iostream>:这是C标准库中最常用的头文件之一,包含了进行标准输入输出操作的类和对象,如std::cin、std::cout、std::endl等。 <iomanip>:该头文件提供了一些用…...
0101日志-运维-mysql
1 错误日志 错误日志(Error Log):错误日志记录了MySQL引擎在运行过程中出现的错误和异常情况。这些错误可能包括启动和关闭问题、数据库崩溃、权限问题等。错误日志对于排查和解决MySQL引擎问题非常有帮助。 改日志默认开启,默认存…...
LabVIEW使用灰度和边缘检测进行视频滤波
LabVIEW使用灰度和边缘检测进行视频滤波 数字图像处理(DIP)是真实和连续世界的离散表示。除此之外,这种数字图像在通信、医学、遥感、地震学、工业自动化、机器人、航空航天和教育等领域变得非常重要。计算机技术越来越需要视频图像的数字图…...
SpringBoot整合WebService
SpringBoot整合WebService WebService是一个比较旧的远程调用通信框架,现在企业项目中用的比较少,因为它逐步被SpringCloud所取代,它的优势就是能够跨语言平台通信,所以还有点价值,下面来看看如何在SpringBoot项目中使…...
【LangChain】向量存储之FAISS
LangChain学习文档 【LangChain】向量存储(Vector stores)【LangChain】向量存储之FAISS 概要 Facebook AI 相似性搜索(Faiss)是一个用于高效相似性搜索和密集向量聚类的库。它包含的算法可以搜索任意大小的向量集,甚至可能无法容纳在 RAM 中…...
小研究 - 主动式微服务细粒度弹性缩放算法研究(三)
微服务架构已成为云数据中心的基本服务架构。但目前关于微服务系统弹性缩放的研究大多是基于服务或实例级别的水平缩放,忽略了能够充分利用单台服务器资源的细粒度垂直缩放,从而导致资源浪费。为此,本文设计了主动式微服务细粒度弹性缩放算法…...
驱动开发相关内容复盘
并发与竞争 并发 多个“用户”同时访问同一个共享资源。 竞争 并发和竞争的处理方法 处理并发和竞争的机制:原子操作、自旋锁、信号量和互斥体。 1、原子操作 原子操作就是指不能再进一步分割的操作,一般原子操作用于变量或者位操作。 …...
90% 的代码交给 AI 后,人还剩什么本事?
问题定义、架构决策、结果取舍。 Cognition AI 及其研发的智能体 Devin 如何重塑软件工程的未来。作者指出,AI 已经能够接管 90% 的底层执行工作,包括编写代码和修复漏洞,使人类工程师从琐碎的实现细节中解放出来。在这一范式转变下ÿ…...
OpenClaw学术助手:Qwen2.5-VL-7B自动解析论文图表数据
OpenClaw学术助手:Qwen2.5-VL-7B自动解析论文图表数据 1. 为什么需要自动化论文图表解析 作为一名经常需要阅读大量学术论文的研究者,我发现自己花费了太多时间在手动转录图表数据上。每当遇到一篇包含复杂实验数据的论文,就需要对着PDF截图…...
Qt多线程数据库操作:安全分离连接,彻底解决段错误
在 Qt 开发中,数据库操作与多线程的搭配是一个经典难题。许多开发者都曾遇到过这样的诡异现象:程序运行一段时间后突然崩溃,堆栈指向数据库操作,但代码逻辑明明正确。真相只有一个——数据库连接被多个线程共享了。本文结合真实项…...
JAVA重点基础、进阶知识及易错点总结(17)线程安全 synchronized 同步锁
🚀 Java 巩固进阶 第17天 主题:线程安全 & synchronized 同步锁 —— 并发编程的第一道防线📅 进度概览:今天攻克 多线程最核心难题:线程安全。这是面试必考、生产环境必用的知识点,直接决定你的代码能…...
ECharts折线图入门学习:从基础到实战的完整指南
引言 折线图是数据可视化中最常用的图表类型之一,特别适合展示数据随时间变化的趋势。ECharts作为一款功能强大的JavaScript可视化库,提供了丰富的配置选项和交互功能,能够轻松创建出专业、美观的折线图。本文将带领大家从零开始学习ECharts折…...
Cursor Pro功能突破解决方案:基于cursor-free-vip的完整技术指南
Cursor Pro功能突破解决方案:基于cursor-free-vip的完整技术指南 【免费下载链接】cursor-free-vip [Support 0.45](Multi Language 多语言)自动注册 Cursor Ai ,自动重置机器ID , 免费升级使用Pro 功能: Youve reache…...
3步突破Navicat试用期限制:让数据库管理工具持续为你服务
3步突破Navicat试用期限制:让数据库管理工具持续为你服务 【免费下载链接】navicat_reset_mac navicat16 mac版无限重置试用期脚本 项目地址: https://gitcode.com/gh_mirrors/na/navicat_reset_mac 作为数据库开发者的日常伴侣,Navicat以其直观的…...
【深度剖析】从libgomp TLS内存分配冲突到scikit-learn在ARM平台的兼容性优化
1. ARM架构下TLS内存分配的底层原理 当你在ARM服务器上跑scikit-learn模型时,突然蹦出"cannot allocate memory in static TLS block"错误,这背后其实是线程本地存储(TLS)在作祟。想象每个线程都有自己专属的储物柜&…...
告别CANoe依赖:手把手教你用Visual Studio 2019为UDS $27服务开发通用DLL(附Python调用脚本)
从零构建UDS安全访问DLL:Visual Studio 2019实战指南与Python无缝集成 在汽车电子诊断领域,UDS(Unified Diagnostic Services)协议的安全访问服务($27服务)是保护ECU敏感操作的核心机制。传统方案往往依赖C…...
从CVE-2025-65112到NPM投毒:手把手教你搭建安全的私有包仓库(以PubNet为例)
从CVE-2025-65112到NPM投毒:手把手教你搭建安全的私有包仓库(以PubNet为例) 最近几年,软件供应链攻击事件频发,从SolarWinds事件到Log4j漏洞,再到最近的NPM投毒事件,每一次都让开发者们心惊胆战…...
