当前位置: 首页 > news >正文

Flink CEP(二) 运行源码解析

通过DemoApp学习一下,CEP的源码执行逻辑。为下一篇实现CEP动态Pattern奠定理论基础。

1. Pattern的定义

Pattern<Tuple3<String, Long, String>,?> pattern = Pattern.<Tuple3<String, Long, String>>begin("begin").where(new IterativeCondition<Tuple3<String, Long, String>>() {@Overridepublic boolean filter(Tuple3<String, Long, String> value, Context<Tuple3<String, Long, String>> ctx)throws Exception {return value.f2.equals("success");}}).followedByAny("middle").where(new IterativeCondition<Tuple3<String, Long, String>>() {@Overridepublic boolean filter(Tuple3<String, Long, String> value, Context<Tuple3<String, Long, String>> ctx)throws Exception {return value.f2.equals("fail");}}).followedBy("end").where(new IterativeCondition<Tuple3<String, Long, String>>() {@Overridepublic boolean filter(Tuple3<String, Long, String> value, Context<Tuple3<String, Long, String>> ctx)throws Exception {return value.f2.equals("end");}});

 在执行中,我们可以看到pattern的几个属性,进入Pattern类中查看。

public class Pattern<T, F extends T> {/** Name of the pattern. */private final String name;/** Previous pattern. */private final Pattern<T, ? extends T> previous;/** The condition an event has to satisfy to be considered a matched. */private IterativeCondition<F> condition;/** Window length in which the pattern match has to occur. */private final Map<WithinType, Time> windowTimes = new HashMap<>();/*** A quantifier for the pattern. By default set to {@link Quantifier#one(ConsumingStrategy)}.*/private Quantifier quantifier = Quantifier.one(ConsumingStrategy.STRICT);/** The condition an event has to satisfy to stop collecting events into looping state. */private IterativeCondition<F> untilCondition;/** Applicable to a {@code times} pattern, and holds the number of times it has to appear. */private Times times;private final AfterMatchSkipStrategy afterMatchSkipStrategy;
}

可以看到每一个Pattern都会存在以下属性:

  • Name:Pattern的Name
  • previous:之前的Pattern
  • condition:Pattern的匹配逻辑
  • windowTimes:限制窗口的时长
  • Quantifier:Pattern的属性,包括配置Pattern的模式可以发生的循环次数,或者这个模式是贪婪的还是可选的。
    • /*** A quantifier describing the Pattern. There are three main groups of {@link Quantifier}.** <ol>*   <li>Single*   <li>Looping*   <li>Times* </ol>** <p>Each {@link Pattern} can be optional and have a {@link ConsumingStrategy}. Looping and Times* also hava an additional inner consuming strategy that is applied between accepted events in the* pattern.*/
      public class Quantifier {private final EnumSet<QuantifierProperty> properties;private final ConsumingStrategy consumingStrategy;private ConsumingStrategy innerConsumingStrategy = ConsumingStrategy.SKIP_TILL_NEXT;
      }
  • untilCondition:Pattern的循环匹配的结束条件
  • times:连续匹配次数
  • afterMatchSkipStrategy:匹配后的跳过策略

2.PatternStream的构建

        对Pattern定义完成,会通过PatternStreamBuilder,将1中定义好的Pattern应用到输入流中,返回对应的PatternStream。

    static <IN> PatternStreamBuilder<IN> forStreamAndPattern(final DataStream<IN> inputStream, final Pattern<IN, ?> pattern) {return new PatternStreamBuilder<>(inputStream, pattern, TimeBehaviour.EventTime, null, null);}PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern) {this(PatternStreamBuilder.forStreamAndPattern(inputStream, pattern));}

继续执行代码,进入Select()。

    public <R> SingleOutputStreamOperator<R> select(final PatternSelectFunction<T, R> patternSelectFunction,final TypeInformation<R> outTypeInfo) {final PatternProcessFunction<T, R> processFunction =fromSelect(builder.clean(patternSelectFunction)).build();return process(processFunction, outTypeInfo);}

进入process可以看到PatternStream.select会调用builder.build函数。

    public <R> SingleOutputStreamOperator<R> process(final PatternProcessFunction<T, R> patternProcessFunction,final TypeInformation<R> outTypeInfo) {return builder.build(outTypeInfo, builder.clean(patternProcessFunction));}

在build函数中会完成NFAFactory的定义,随后构建CepOperator。inputstream随之运行CepOperator即pattern定义的处理逻辑,并返回结果流PatternStream。

    <OUT, K> SingleOutputStreamOperator<OUT> build(final TypeInformation<OUT> outTypeInfo,final PatternProcessFunction<IN, OUT> processFunction) {checkNotNull(outTypeInfo);checkNotNull(processFunction);final TypeSerializer<IN> inputSerializer =inputStream.getType().createSerializer(inputStream.getExecutionConfig());final boolean isProcessingTime = timeBehaviour == TimeBehaviour.ProcessingTime;final boolean timeoutHandling = processFunction instanceof TimedOutPartialMatchHandler;final NFACompiler.NFAFactory<IN> nfaFactory =NFACompiler.compileFactory(pattern, timeoutHandling);CepOperator<IN, K, OUT> operator = new CepOperator<>(inputSerializer,isProcessingTime,nfaFactory,comparator,pattern.getAfterMatchSkipStrategy(),processFunction,lateDataOutputTag);final SingleOutputStreamOperator<OUT> patternStream;if (inputStream instanceof KeyedStream) {KeyedStream<IN, K> keyedStream = (KeyedStream<IN, K>) inputStream;patternStream = keyedStream.transform("CepOperator", outTypeInfo, operator);} else {KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();patternStream =inputStream.keyBy(keySelector).transform("GlobalCepOperator", outTypeInfo, operator).forceNonParallel();}return patternStream;}

3.CepOperator的执行

        初始化。

    @Overridepublic void open() throws Exception {super.open();timerService =getInternalTimerService("watermark-callbacks", VoidNamespaceSerializer.INSTANCE, this);nfa = nfaFactory.createNFA();nfa.open(cepRuntimeContext, new Configuration());context = new ContextFunctionImpl();collector = new TimestampedCollector<>(output);cepTimerService = new TimerServiceImpl();// metricsthis.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);}

 可以看到,nfaFactory.createNFA();会解析pattern组合,并为每一个pattern创建一个state。

CepOperator会在processElement中处理流中的每条数据。

    @Overridepublic void processElement(StreamRecord<IN> element) throws Exception {if (isProcessingTime) {if (comparator == null) {// there can be no out of order elements in processing timeNFAState nfaState = getNFAState();long timestamp = getProcessingTimeService().getCurrentProcessingTime();advanceTime(nfaState, timestamp);processEvent(nfaState, element.getValue(), timestamp);updateNFA(nfaState);} else {long currentTime = timerService.currentProcessingTime();bufferEvent(element.getValue(), currentTime);}} else {long timestamp = element.getTimestamp();IN value = element.getValue();// In event-time processing we assume correctness of the watermark.// Events with timestamp smaller than or equal with the last seen watermark are// considered late.// Late events are put in a dedicated side output, if the user has specified one.if (timestamp > timerService.currentWatermark()) {// we have an event with a valid timestamp, so// we buffer it until we receive the proper watermark.bufferEvent(value, timestamp);} else if (lateDataOutputTag != null) {output.collect(lateDataOutputTag, element);} else {numLateRecordsDropped.inc();}}}

        可以看到,如果使用的是处理时间,需要先对数据根据当前处理时间将乱序的数据做一次处理,保证数据的有序。

        如果使用的事件时间,如果事件时间戳小于等于watermark会被认为是迟到数据。

        正常数据会先被缓存起来,等待处理。

    private void bufferEvent(IN event, long currentTime) throws Exception {List<IN> elementsForTimestamp = elementQueueState.get(currentTime);if (elementsForTimestamp == null) {elementsForTimestamp = new ArrayList<>();registerTimer(currentTime);}elementsForTimestamp.add(event);elementQueueState.put(currentTime, elementsForTimestamp);}

       elementQueueState 会以时间戳为key保存对应的数据。在onEventTime()函数中通过processEvent中处理缓存的匹配数据。

    @Overridepublic void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {// 1) get the queue of pending elements for the key and the corresponding NFA,// 2) process the pending elements in event time order and custom comparator if exists//		by feeding them in the NFA// 3) advance the time to the current watermark, so that expired patterns are discarded.// 4) update the stored state for the key, by only storing the new NFA and MapState iff they//		have state to be used later.// 5) update the last seen watermark.// STEP 1PriorityQueue<Long> sortedTimestamps = getSortedTimestamps();NFAState nfaState = getNFAState();// STEP 2while (!sortedTimestamps.isEmpty()&& sortedTimestamps.peek() <= timerService.currentWatermark()) {long timestamp = sortedTimestamps.poll();advanceTime(nfaState, timestamp);// 对事件按时间进行排序try (Stream<IN> elements = sort(elementQueueState.get(timestamp))) {elements.forEachOrdered(event -> {try {processEvent(nfaState, event, timestamp);} catch (Exception e) {throw new RuntimeException(e);}});}elementQueueState.remove(timestamp);}// STEP 3advanceTime(nfaState, timerService.currentWatermark());// STEP 4updateNFA(nfaState);}
   private void processEvent(NFAState nfaState, IN event, long timestamp) throws Exception {try (SharedBufferAccessor<IN> sharedBufferAccessor = partialMatches.getAccessor()) {Collection<Map<String, List<IN>>> patterns =nfa.process(sharedBufferAccessor,nfaState,event,timestamp,afterMatchSkipStrategy,cepTimerService);if (nfa.getWindowTime() > 0 && nfaState.isNewStartPartialMatch()) {registerTimer(timestamp + nfa.getWindowTime());}processMatchedSequences(patterns, timestamp);}}private void processMatchedSequences(Iterable<Map<String, List<IN>>> matchingSequences, long timestamp) throws Exception {PatternProcessFunction<IN, OUT> function = getUserFunction();setTimestamp(timestamp);for (Map<String, List<IN>> matchingSequence : matchingSequences) {function.processMatch(matchingSequence, context, collector);}}

        nfa.process()最后会调用doProcess进行处理。

        computer

         可以看到每来一个新的Event,就会从上一个数据停留的状态开始遍历。判断新事件Event匹配之前已经匹配过的哪个状态,并为其版本号+1

前5条数据是success->fail->fail->success->fail,我们可以观察到partialMatches的变化如下:

success事件到达,因为之前没有事件,所以当前停留的状态是 begin。success匹配,预期会停留在middle状态

 fail事件到达,可以看到上面的success事件停留在了middle状态,并且begin的版本+1.

判断这个fail事件可以匹配后续的patern,状态从middle转移到end。存在newComputationStates中。最终更新到partialMatch中。

 第二个fail事件到达,只能匹配之前的middle状态,所以partialMatch中会新增一个end状态,并且middle的版本+1;

 

 最后如果状态到达终态,输出到potentialMatches中存储。

打印结果,可以看到每个事件都会试图去匹配所有的历史状态,nfa会存储所有匹配上的历史状态,直到到达终态。 

相关文章:

Flink CEP(二) 运行源码解析

通过DemoApp学习一下&#xff0c;CEP的源码执行逻辑。为下一篇实现CEP动态Pattern奠定理论基础。 1. Pattern的定义 Pattern<Tuple3<String, Long, String>,?> pattern Pattern.<Tuple3<String, Long, String>>begin("begin").where(new…...

剑指Offer-学习计划(四)双指针(下)

剑指 Offer 57. 和为s的两个数字 剑指 Offer 58 - I. 翻转单词顺序 剑指 Offer 21. 调整数组顺序使奇数位于偶数前面 题目一&#xff1a;调整数组顺序使奇数位于偶数前面 输入一个整数数组&#xff0c;实现一个函数来调整该数组中数字的顺序&#xff0c;使得所有奇数在数组的…...

深度学习——常见注意力机制

1.SENet SENet属于通道注意力机制。2017年提出&#xff0c;是imageNet最后的冠军 SENet采用的方法是对于特征层赋予权值。 重点在于如何赋权 1.将输入信息的所有通道平均池化。 2.平均池化后进行两次全连接&#xff0c;第一次全连接链接的神经元较少&#xff0c;第二次全连…...

Python 进阶(七):高级文件操作(shutil 模块)

❤️ 博客主页&#xff1a;水滴技术 &#x1f338; 订阅专栏&#xff1a;Python 入门核心技术 &#x1f680; 支持水滴&#xff1a;点赞&#x1f44d; 收藏⭐ 留言&#x1f4ac; 文章目录 1. 简介2. 常用函数2.1 复制文件2.2 复制目录2.3 移动文件或目录2.4 删除文件或目录2.…...

保留网络:大型语言模型的Transformer继任者

原文信息 原文题目&#xff1a;《Retentive Network: A Successor to Transformer for Large Language Models》 原文引用&#xff1a;Sun Y, Dong L, Huang S, et al. Retentive Network: A Successor to Transformer for Large Language Models[J]. arXiv preprint arXiv:2…...

算法通关村第二关——反转链表青铜笔记

LeetCode 206.反转链表 建立虚拟结点辅助翻转 public ListNode reverseList(ListNode head) {ListNode ans new ListNode(-1);ListNode cur head;while(cur!null){ListNode curNext cur.next;cur.next ans.next;ans.next cur;cur curNext;}return ans.next; }不带虚拟头…...

【Linux】——线程安全

目录 关于线程进程的问题 可重入与线程安全 常见的线程安全的情况 常见的不可重入的情况 常见的可重入的情况 可重入与线程安全区别 可重入与线程安全联系 Linux线程互斥 进程线程间的互斥相关概念 互斥量mutex 互斥量mutex常用接口 互斥量改造抢票系统 互斥量的原…...

[React]生命周期

前言 学习React&#xff0c;生命周期很重要&#xff0c;我们了解完生命周期的各个组件&#xff0c;对写高性能组件会有很大的帮助. Ract生命周期 React 生命周期分为三种状态 1. 初始化 2.更新 3.销毁 初始化 1、getDefaultProps() 设置默认的props&#xff0c;也可以用duf…...

【2023】Redis实现消息队列的方式汇总以及代码实现

Redis实现消息队列的方式汇总以及代码实现 前言开始前准备1、添加依赖2、添加配置的Bean 具体实现一、从最简单的开始&#xff1a;List 队列代码实现 二、发布订阅模式&#xff1a;Pub/Sub1、使用RedisMessageListenerContainer实现订阅2、还可以使用redisTemplate实现订阅 三、…...

ARM裸机-10

1、X210开发板和光盘资料 1.1、配置信息 CPU&#xff1a;三星S5PV210 内存&#xff1a;512M DDR2 SDRAM Flash&#xff1a;4GB iBand LCD&#xff1a;7寸&#xff0c;分辨率800x480 触摸屏&#xff1a;电容触摸屏 2、X210开发板硬件手册 3、X210开发板刷系统 3.1、什么是刷…...

「C/C++」C/C++指针详解

✨博客主页何曾参静谧的博客&#x1f4cc;文章专栏「C/C」C/C程序设计&#x1f4da;全部专栏「UG/NX」NX二次开发「UG/NX」BlockUI集合「VS」Visual Studio「QT」QT5程序设计「C/C」C/C程序设计「Win」Windows程序设计「算法」数据结构与算法「File」数据文件格式 目录 一、术语…...

提高电脑寿命的维护技巧与方法分享

在维护电脑运行方面&#xff0c;我有一些自己觉得非常有用的技巧和方法。下面我将分享一些我常用的维护技巧&#xff0c;并解释为什么我会选择这样做以及这样做的好处。 首先&#xff0c;我经常清理我的电脑内部的灰尘。电脑内部的灰尘会影响散热效果&#xff0c;导致电脑发热…...

React常见面试题

React常见面试题 一、React中的样式管理有哪些方法 内联样式&#xff1a;对象&#xff0c;作用于当前组件普通样式表&#xff1a; 作用于全局&#xff0c;文件名是&#xff1a;xxx.scssCSS模块&#xff1a;类似Vue的scoped&#xff0c; 文件名需是&#xff1a;xxx.module.scs…...

C++中数据的输入输出介绍

C中数据的输入输出介绍 C中数据的输入输出涉及到的文件 <iostream>&#xff1a;这是C标准库中最常用的头文件之一&#xff0c;包含了进行标准输入输出操作的类和对象&#xff0c;如std::cin、std::cout、std::endl等。 <iomanip>&#xff1a;该头文件提供了一些用…...

0101日志-运维-mysql

1 错误日志 错误日志&#xff08;Error Log&#xff09;&#xff1a;错误日志记录了MySQL引擎在运行过程中出现的错误和异常情况。这些错误可能包括启动和关闭问题、数据库崩溃、权限问题等。错误日志对于排查和解决MySQL引擎问题非常有帮助。 改日志默认开启&#xff0c;默认存…...

LabVIEW使用灰度和边缘检测进行视频滤波

LabVIEW使用灰度和边缘检测进行视频滤波 数字图像处理&#xff08;DIP&#xff09;是真实和连续世界的离散表示。除此之外&#xff0c;这种数字图像在通信、医学、遥感、地震学、工业自动化、机器人、航空航天和教育等领域变得非常重要。计算机技术越来越需要视频图像的数字图…...

SpringBoot整合WebService

SpringBoot整合WebService WebService是一个比较旧的远程调用通信框架&#xff0c;现在企业项目中用的比较少&#xff0c;因为它逐步被SpringCloud所取代&#xff0c;它的优势就是能够跨语言平台通信&#xff0c;所以还有点价值&#xff0c;下面来看看如何在SpringBoot项目中使…...

【LangChain】向量存储之FAISS

LangChain学习文档 【LangChain】向量存储(Vector stores)【LangChain】向量存储之FAISS 概要 Facebook AI 相似性搜索&#xff08;Faiss&#xff09;是一个用于高效相似性搜索和密集向量聚类的库。它包含的算法可以搜索任意大小的向量集&#xff0c;甚至可能无法容纳在 RAM 中…...

小研究 - 主动式微服务细粒度弹性缩放算法研究(三)

微服务架构已成为云数据中心的基本服务架构。但目前关于微服务系统弹性缩放的研究大多是基于服务或实例级别的水平缩放&#xff0c;忽略了能够充分利用单台服务器资源的细粒度垂直缩放&#xff0c;从而导致资源浪费。为此&#xff0c;本文设计了主动式微服务细粒度弹性缩放算法…...

驱动开发相关内容复盘

并发与竞争 并发 ​ 多个“用户”同时访问同一个共享资源。 竞争 并发和竞争的处理方法 处理并发和竞争的机制&#xff1a;原子操作、自旋锁、信号量和互斥体。 1、原子操作 ​ 原子操作就是指不能再进一步分割的操作&#xff0c;一般原子操作用于变量或者位操作。 ​ …...

人工智能导论:模型与算法(未来发展与趋势)

9 人工智能未来发展和趋势 人工智能作为引领新一轮科技革命和产业变革的战略性技术&#xff0c;正在深刻改变人类社会。本章从类脑计算、自动化机器学习、神经网络压缩、人工智能芯片、量子机器学习、人工智能伦理与治理、人工智能算法开发框架等方面&#xff0c;简要总结人工智…...

Taotoken在应对大模型API服务波动时的路由与容灾机制体验

&#x1f680; 告别海外账号与网络限制&#xff01;稳定直连全球优质大模型&#xff0c;限时半价接入中。 &#x1f449; 点击领取海量免费额度 Taotoken在应对大模型API服务波动时的路由与容灾机制体验 1. 背景与观测场景 在开发实践中&#xff0c;我们时常会遇到依赖的某个…...

用Arduino Uno和8个舵机,我让这个并联腿机器狗走起来了(附完整代码)

用Arduino Uno和8个舵机打造会走路的并联腿机器狗 第一次看到机器狗灵活地迈步时&#xff0c;那种成就感至今难忘。作为创客爱好者&#xff0c;我决定用最基础的Arduino Uno和8个舵机&#xff0c;从零开始搭建一个能自主行走的并联腿机器狗。这个项目不仅考验机械结构设计&…...

当A*算法遇上真实山地DEM:一份给无人机/机器人路径规划者的Python避坑指南

当A*算法遇上真实山地DEM&#xff1a;无人机路径规划的Python实战与优化 山地路径规划的独特挑战 在无人机和机器人导航领域&#xff0c;山地地形带来了传统路径规划算法难以应对的复杂性。与平坦城市环境不同&#xff0c;山地DEM&#xff08;数字高程模型&#xff09;数据包含…...

别再为路径报错头疼了!手把手教你将Robei工程无缝迁移到Quartus II(附文件整理技巧)

从Robei到Quartus II&#xff1a;工程迁移的完整避坑指南 第一次把Robei工程导入Quartus II时&#xff0c;我盯着满屏的路径报错和未定义模块提示&#xff0c;差点把键盘摔了。这种挫败感想必每个FPGA初学者都经历过——明明在Robei里运行完美的设计&#xff0c;换到Quartus II…...

i.MX6ULL LCD驱动适配实战:从设备树到时序调试全解析

1. 项目概述与核心价值最近在搞一个基于i.MX6ULL的工控HMI项目&#xff0c;屏幕显示是绕不开的一环。市面上很多教程要么只讲Framebuffer应用&#xff0c;要么直接给个现成的设备树文件让你照着改&#xff0c;至于里面的参数怎么来的、屏幕初始化序列怎么配&#xff0c;往往一笔…...

从源头到输出:开关电源纹波与噪声的精准抑制策略

1. 开关电源纹波与噪声的本质解析 第一次拆解开关电源时&#xff0c;我被电路板上密集的元器件和错综复杂的走线震撼到了。作为电源工程师&#xff0c;我们每天都在和这些看不见的"电脉冲"打交道——纹波就像电源的心跳&#xff0c;而噪声则是它偶尔的"咳嗽&qu…...

ROS Topic通讯实战:拆解`/turtle1/cmd_vel`,理解速度指令如何驱动小乌龟运动

ROS Topic通讯实战&#xff1a;拆解/turtle1/cmd_vel&#xff0c;理解速度指令如何驱动小乌龟运动 在机器人操作系统&#xff08;ROS&#xff09;的学习过程中&#xff0c;控制小乌龟&#xff08;turtlesim&#xff09;画圆是一个经典案例。这个看似简单的任务背后&#xff0c;…...

iOS 18.1 5G功能深度解析:从智能省电到SA网络优化

1. 项目概述&#xff1a;一次聚焦于连接体验的深度更新作为一名长期跟踪移动操作系统生态的从业者&#xff0c;每次苹果发布新的iOS版本&#xff0c;我都会习惯性地去拆解其更新日志&#xff0c;看看哪些是“面子工程”&#xff0c;哪些是真正触及用户体验核心的“里子升级”。…...

RK3576嵌入式平台Weston配置实战:从显示校准到性能调优

1. 项目概述&#xff1a;为什么Weston配置值得深挖&#xff1f;如果你正在基于RK3576这类高性能嵌入式平台进行产品开发&#xff0c;尤其是涉及图形化人机交互界面的项目&#xff0c;那么你大概率已经接触或正在使用Wayland/Weston这套显示协议栈。RK3576作为一款集成了强大GPU…...