当前位置: 首页 > news >正文

Flink CEP(二) 运行源码解析

通过DemoApp学习一下,CEP的源码执行逻辑。为下一篇实现CEP动态Pattern奠定理论基础。

1. Pattern的定义

Pattern<Tuple3<String, Long, String>,?> pattern = Pattern.<Tuple3<String, Long, String>>begin("begin").where(new IterativeCondition<Tuple3<String, Long, String>>() {@Overridepublic boolean filter(Tuple3<String, Long, String> value, Context<Tuple3<String, Long, String>> ctx)throws Exception {return value.f2.equals("success");}}).followedByAny("middle").where(new IterativeCondition<Tuple3<String, Long, String>>() {@Overridepublic boolean filter(Tuple3<String, Long, String> value, Context<Tuple3<String, Long, String>> ctx)throws Exception {return value.f2.equals("fail");}}).followedBy("end").where(new IterativeCondition<Tuple3<String, Long, String>>() {@Overridepublic boolean filter(Tuple3<String, Long, String> value, Context<Tuple3<String, Long, String>> ctx)throws Exception {return value.f2.equals("end");}});

 在执行中,我们可以看到pattern的几个属性,进入Pattern类中查看。

public class Pattern<T, F extends T> {/** Name of the pattern. */private final String name;/** Previous pattern. */private final Pattern<T, ? extends T> previous;/** The condition an event has to satisfy to be considered a matched. */private IterativeCondition<F> condition;/** Window length in which the pattern match has to occur. */private final Map<WithinType, Time> windowTimes = new HashMap<>();/*** A quantifier for the pattern. By default set to {@link Quantifier#one(ConsumingStrategy)}.*/private Quantifier quantifier = Quantifier.one(ConsumingStrategy.STRICT);/** The condition an event has to satisfy to stop collecting events into looping state. */private IterativeCondition<F> untilCondition;/** Applicable to a {@code times} pattern, and holds the number of times it has to appear. */private Times times;private final AfterMatchSkipStrategy afterMatchSkipStrategy;
}

可以看到每一个Pattern都会存在以下属性:

  • Name:Pattern的Name
  • previous:之前的Pattern
  • condition:Pattern的匹配逻辑
  • windowTimes:限制窗口的时长
  • Quantifier:Pattern的属性,包括配置Pattern的模式可以发生的循环次数,或者这个模式是贪婪的还是可选的。
    • /*** A quantifier describing the Pattern. There are three main groups of {@link Quantifier}.** <ol>*   <li>Single*   <li>Looping*   <li>Times* </ol>** <p>Each {@link Pattern} can be optional and have a {@link ConsumingStrategy}. Looping and Times* also hava an additional inner consuming strategy that is applied between accepted events in the* pattern.*/
      public class Quantifier {private final EnumSet<QuantifierProperty> properties;private final ConsumingStrategy consumingStrategy;private ConsumingStrategy innerConsumingStrategy = ConsumingStrategy.SKIP_TILL_NEXT;
      }
  • untilCondition:Pattern的循环匹配的结束条件
  • times:连续匹配次数
  • afterMatchSkipStrategy:匹配后的跳过策略

2.PatternStream的构建

        对Pattern定义完成,会通过PatternStreamBuilder,将1中定义好的Pattern应用到输入流中,返回对应的PatternStream。

    static <IN> PatternStreamBuilder<IN> forStreamAndPattern(final DataStream<IN> inputStream, final Pattern<IN, ?> pattern) {return new PatternStreamBuilder<>(inputStream, pattern, TimeBehaviour.EventTime, null, null);}PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern) {this(PatternStreamBuilder.forStreamAndPattern(inputStream, pattern));}

继续执行代码,进入Select()。

    public <R> SingleOutputStreamOperator<R> select(final PatternSelectFunction<T, R> patternSelectFunction,final TypeInformation<R> outTypeInfo) {final PatternProcessFunction<T, R> processFunction =fromSelect(builder.clean(patternSelectFunction)).build();return process(processFunction, outTypeInfo);}

进入process可以看到PatternStream.select会调用builder.build函数。

    public <R> SingleOutputStreamOperator<R> process(final PatternProcessFunction<T, R> patternProcessFunction,final TypeInformation<R> outTypeInfo) {return builder.build(outTypeInfo, builder.clean(patternProcessFunction));}

在build函数中会完成NFAFactory的定义,随后构建CepOperator。inputstream随之运行CepOperator即pattern定义的处理逻辑,并返回结果流PatternStream。

    <OUT, K> SingleOutputStreamOperator<OUT> build(final TypeInformation<OUT> outTypeInfo,final PatternProcessFunction<IN, OUT> processFunction) {checkNotNull(outTypeInfo);checkNotNull(processFunction);final TypeSerializer<IN> inputSerializer =inputStream.getType().createSerializer(inputStream.getExecutionConfig());final boolean isProcessingTime = timeBehaviour == TimeBehaviour.ProcessingTime;final boolean timeoutHandling = processFunction instanceof TimedOutPartialMatchHandler;final NFACompiler.NFAFactory<IN> nfaFactory =NFACompiler.compileFactory(pattern, timeoutHandling);CepOperator<IN, K, OUT> operator = new CepOperator<>(inputSerializer,isProcessingTime,nfaFactory,comparator,pattern.getAfterMatchSkipStrategy(),processFunction,lateDataOutputTag);final SingleOutputStreamOperator<OUT> patternStream;if (inputStream instanceof KeyedStream) {KeyedStream<IN, K> keyedStream = (KeyedStream<IN, K>) inputStream;patternStream = keyedStream.transform("CepOperator", outTypeInfo, operator);} else {KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();patternStream =inputStream.keyBy(keySelector).transform("GlobalCepOperator", outTypeInfo, operator).forceNonParallel();}return patternStream;}

3.CepOperator的执行

        初始化。

    @Overridepublic void open() throws Exception {super.open();timerService =getInternalTimerService("watermark-callbacks", VoidNamespaceSerializer.INSTANCE, this);nfa = nfaFactory.createNFA();nfa.open(cepRuntimeContext, new Configuration());context = new ContextFunctionImpl();collector = new TimestampedCollector<>(output);cepTimerService = new TimerServiceImpl();// metricsthis.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);}

 可以看到,nfaFactory.createNFA();会解析pattern组合,并为每一个pattern创建一个state。

CepOperator会在processElement中处理流中的每条数据。

    @Overridepublic void processElement(StreamRecord<IN> element) throws Exception {if (isProcessingTime) {if (comparator == null) {// there can be no out of order elements in processing timeNFAState nfaState = getNFAState();long timestamp = getProcessingTimeService().getCurrentProcessingTime();advanceTime(nfaState, timestamp);processEvent(nfaState, element.getValue(), timestamp);updateNFA(nfaState);} else {long currentTime = timerService.currentProcessingTime();bufferEvent(element.getValue(), currentTime);}} else {long timestamp = element.getTimestamp();IN value = element.getValue();// In event-time processing we assume correctness of the watermark.// Events with timestamp smaller than or equal with the last seen watermark are// considered late.// Late events are put in a dedicated side output, if the user has specified one.if (timestamp > timerService.currentWatermark()) {// we have an event with a valid timestamp, so// we buffer it until we receive the proper watermark.bufferEvent(value, timestamp);} else if (lateDataOutputTag != null) {output.collect(lateDataOutputTag, element);} else {numLateRecordsDropped.inc();}}}

        可以看到,如果使用的是处理时间,需要先对数据根据当前处理时间将乱序的数据做一次处理,保证数据的有序。

        如果使用的事件时间,如果事件时间戳小于等于watermark会被认为是迟到数据。

        正常数据会先被缓存起来,等待处理。

    private void bufferEvent(IN event, long currentTime) throws Exception {List<IN> elementsForTimestamp = elementQueueState.get(currentTime);if (elementsForTimestamp == null) {elementsForTimestamp = new ArrayList<>();registerTimer(currentTime);}elementsForTimestamp.add(event);elementQueueState.put(currentTime, elementsForTimestamp);}

       elementQueueState 会以时间戳为key保存对应的数据。在onEventTime()函数中通过processEvent中处理缓存的匹配数据。

    @Overridepublic void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {// 1) get the queue of pending elements for the key and the corresponding NFA,// 2) process the pending elements in event time order and custom comparator if exists//		by feeding them in the NFA// 3) advance the time to the current watermark, so that expired patterns are discarded.// 4) update the stored state for the key, by only storing the new NFA and MapState iff they//		have state to be used later.// 5) update the last seen watermark.// STEP 1PriorityQueue<Long> sortedTimestamps = getSortedTimestamps();NFAState nfaState = getNFAState();// STEP 2while (!sortedTimestamps.isEmpty()&& sortedTimestamps.peek() <= timerService.currentWatermark()) {long timestamp = sortedTimestamps.poll();advanceTime(nfaState, timestamp);// 对事件按时间进行排序try (Stream<IN> elements = sort(elementQueueState.get(timestamp))) {elements.forEachOrdered(event -> {try {processEvent(nfaState, event, timestamp);} catch (Exception e) {throw new RuntimeException(e);}});}elementQueueState.remove(timestamp);}// STEP 3advanceTime(nfaState, timerService.currentWatermark());// STEP 4updateNFA(nfaState);}
   private void processEvent(NFAState nfaState, IN event, long timestamp) throws Exception {try (SharedBufferAccessor<IN> sharedBufferAccessor = partialMatches.getAccessor()) {Collection<Map<String, List<IN>>> patterns =nfa.process(sharedBufferAccessor,nfaState,event,timestamp,afterMatchSkipStrategy,cepTimerService);if (nfa.getWindowTime() > 0 && nfaState.isNewStartPartialMatch()) {registerTimer(timestamp + nfa.getWindowTime());}processMatchedSequences(patterns, timestamp);}}private void processMatchedSequences(Iterable<Map<String, List<IN>>> matchingSequences, long timestamp) throws Exception {PatternProcessFunction<IN, OUT> function = getUserFunction();setTimestamp(timestamp);for (Map<String, List<IN>> matchingSequence : matchingSequences) {function.processMatch(matchingSequence, context, collector);}}

        nfa.process()最后会调用doProcess进行处理。

        computer

         可以看到每来一个新的Event,就会从上一个数据停留的状态开始遍历。判断新事件Event匹配之前已经匹配过的哪个状态,并为其版本号+1

前5条数据是success->fail->fail->success->fail,我们可以观察到partialMatches的变化如下:

success事件到达,因为之前没有事件,所以当前停留的状态是 begin。success匹配,预期会停留在middle状态

 fail事件到达,可以看到上面的success事件停留在了middle状态,并且begin的版本+1.

判断这个fail事件可以匹配后续的patern,状态从middle转移到end。存在newComputationStates中。最终更新到partialMatch中。

 第二个fail事件到达,只能匹配之前的middle状态,所以partialMatch中会新增一个end状态,并且middle的版本+1;

 

 最后如果状态到达终态,输出到potentialMatches中存储。

打印结果,可以看到每个事件都会试图去匹配所有的历史状态,nfa会存储所有匹配上的历史状态,直到到达终态。 

相关文章:

Flink CEP(二) 运行源码解析

通过DemoApp学习一下&#xff0c;CEP的源码执行逻辑。为下一篇实现CEP动态Pattern奠定理论基础。 1. Pattern的定义 Pattern<Tuple3<String, Long, String>,?> pattern Pattern.<Tuple3<String, Long, String>>begin("begin").where(new…...

剑指Offer-学习计划(四)双指针(下)

剑指 Offer 57. 和为s的两个数字 剑指 Offer 58 - I. 翻转单词顺序 剑指 Offer 21. 调整数组顺序使奇数位于偶数前面 题目一&#xff1a;调整数组顺序使奇数位于偶数前面 输入一个整数数组&#xff0c;实现一个函数来调整该数组中数字的顺序&#xff0c;使得所有奇数在数组的…...

深度学习——常见注意力机制

1.SENet SENet属于通道注意力机制。2017年提出&#xff0c;是imageNet最后的冠军 SENet采用的方法是对于特征层赋予权值。 重点在于如何赋权 1.将输入信息的所有通道平均池化。 2.平均池化后进行两次全连接&#xff0c;第一次全连接链接的神经元较少&#xff0c;第二次全连…...

Python 进阶(七):高级文件操作(shutil 模块)

❤️ 博客主页&#xff1a;水滴技术 &#x1f338; 订阅专栏&#xff1a;Python 入门核心技术 &#x1f680; 支持水滴&#xff1a;点赞&#x1f44d; 收藏⭐ 留言&#x1f4ac; 文章目录 1. 简介2. 常用函数2.1 复制文件2.2 复制目录2.3 移动文件或目录2.4 删除文件或目录2.…...

保留网络:大型语言模型的Transformer继任者

原文信息 原文题目&#xff1a;《Retentive Network: A Successor to Transformer for Large Language Models》 原文引用&#xff1a;Sun Y, Dong L, Huang S, et al. Retentive Network: A Successor to Transformer for Large Language Models[J]. arXiv preprint arXiv:2…...

算法通关村第二关——反转链表青铜笔记

LeetCode 206.反转链表 建立虚拟结点辅助翻转 public ListNode reverseList(ListNode head) {ListNode ans new ListNode(-1);ListNode cur head;while(cur!null){ListNode curNext cur.next;cur.next ans.next;ans.next cur;cur curNext;}return ans.next; }不带虚拟头…...

【Linux】——线程安全

目录 关于线程进程的问题 可重入与线程安全 常见的线程安全的情况 常见的不可重入的情况 常见的可重入的情况 可重入与线程安全区别 可重入与线程安全联系 Linux线程互斥 进程线程间的互斥相关概念 互斥量mutex 互斥量mutex常用接口 互斥量改造抢票系统 互斥量的原…...

[React]生命周期

前言 学习React&#xff0c;生命周期很重要&#xff0c;我们了解完生命周期的各个组件&#xff0c;对写高性能组件会有很大的帮助. Ract生命周期 React 生命周期分为三种状态 1. 初始化 2.更新 3.销毁 初始化 1、getDefaultProps() 设置默认的props&#xff0c;也可以用duf…...

【2023】Redis实现消息队列的方式汇总以及代码实现

Redis实现消息队列的方式汇总以及代码实现 前言开始前准备1、添加依赖2、添加配置的Bean 具体实现一、从最简单的开始&#xff1a;List 队列代码实现 二、发布订阅模式&#xff1a;Pub/Sub1、使用RedisMessageListenerContainer实现订阅2、还可以使用redisTemplate实现订阅 三、…...

ARM裸机-10

1、X210开发板和光盘资料 1.1、配置信息 CPU&#xff1a;三星S5PV210 内存&#xff1a;512M DDR2 SDRAM Flash&#xff1a;4GB iBand LCD&#xff1a;7寸&#xff0c;分辨率800x480 触摸屏&#xff1a;电容触摸屏 2、X210开发板硬件手册 3、X210开发板刷系统 3.1、什么是刷…...

「C/C++」C/C++指针详解

✨博客主页何曾参静谧的博客&#x1f4cc;文章专栏「C/C」C/C程序设计&#x1f4da;全部专栏「UG/NX」NX二次开发「UG/NX」BlockUI集合「VS」Visual Studio「QT」QT5程序设计「C/C」C/C程序设计「Win」Windows程序设计「算法」数据结构与算法「File」数据文件格式 目录 一、术语…...

提高电脑寿命的维护技巧与方法分享

在维护电脑运行方面&#xff0c;我有一些自己觉得非常有用的技巧和方法。下面我将分享一些我常用的维护技巧&#xff0c;并解释为什么我会选择这样做以及这样做的好处。 首先&#xff0c;我经常清理我的电脑内部的灰尘。电脑内部的灰尘会影响散热效果&#xff0c;导致电脑发热…...

React常见面试题

React常见面试题 一、React中的样式管理有哪些方法 内联样式&#xff1a;对象&#xff0c;作用于当前组件普通样式表&#xff1a; 作用于全局&#xff0c;文件名是&#xff1a;xxx.scssCSS模块&#xff1a;类似Vue的scoped&#xff0c; 文件名需是&#xff1a;xxx.module.scs…...

C++中数据的输入输出介绍

C中数据的输入输出介绍 C中数据的输入输出涉及到的文件 <iostream>&#xff1a;这是C标准库中最常用的头文件之一&#xff0c;包含了进行标准输入输出操作的类和对象&#xff0c;如std::cin、std::cout、std::endl等。 <iomanip>&#xff1a;该头文件提供了一些用…...

0101日志-运维-mysql

1 错误日志 错误日志&#xff08;Error Log&#xff09;&#xff1a;错误日志记录了MySQL引擎在运行过程中出现的错误和异常情况。这些错误可能包括启动和关闭问题、数据库崩溃、权限问题等。错误日志对于排查和解决MySQL引擎问题非常有帮助。 改日志默认开启&#xff0c;默认存…...

LabVIEW使用灰度和边缘检测进行视频滤波

LabVIEW使用灰度和边缘检测进行视频滤波 数字图像处理&#xff08;DIP&#xff09;是真实和连续世界的离散表示。除此之外&#xff0c;这种数字图像在通信、医学、遥感、地震学、工业自动化、机器人、航空航天和教育等领域变得非常重要。计算机技术越来越需要视频图像的数字图…...

SpringBoot整合WebService

SpringBoot整合WebService WebService是一个比较旧的远程调用通信框架&#xff0c;现在企业项目中用的比较少&#xff0c;因为它逐步被SpringCloud所取代&#xff0c;它的优势就是能够跨语言平台通信&#xff0c;所以还有点价值&#xff0c;下面来看看如何在SpringBoot项目中使…...

【LangChain】向量存储之FAISS

LangChain学习文档 【LangChain】向量存储(Vector stores)【LangChain】向量存储之FAISS 概要 Facebook AI 相似性搜索&#xff08;Faiss&#xff09;是一个用于高效相似性搜索和密集向量聚类的库。它包含的算法可以搜索任意大小的向量集&#xff0c;甚至可能无法容纳在 RAM 中…...

小研究 - 主动式微服务细粒度弹性缩放算法研究(三)

微服务架构已成为云数据中心的基本服务架构。但目前关于微服务系统弹性缩放的研究大多是基于服务或实例级别的水平缩放&#xff0c;忽略了能够充分利用单台服务器资源的细粒度垂直缩放&#xff0c;从而导致资源浪费。为此&#xff0c;本文设计了主动式微服务细粒度弹性缩放算法…...

驱动开发相关内容复盘

并发与竞争 并发 ​ 多个“用户”同时访问同一个共享资源。 竞争 并发和竞争的处理方法 处理并发和竞争的机制&#xff1a;原子操作、自旋锁、信号量和互斥体。 1、原子操作 ​ 原子操作就是指不能再进一步分割的操作&#xff0c;一般原子操作用于变量或者位操作。 ​ …...

2.2 身份鉴别与访问控制

数据参考&#xff1a;CISP官方 目录 身份鉴别基础基于实体所知的鉴别基于实体所有的鉴别基于实体特征的鉴别访问控制基础访问控制模型 一、身份鉴别基础 1、身份鉴别的概念 标识 实体身份的一种计算机表达每个实体与计算机内部的一个身份表达绑定信息系统在执行操作时&a…...

C++ 注释

程序的注释是解释性语句&#xff0c;您可以在 C 代码中包含注释&#xff0c;这将提高源代码的可读性。所有的编程语言都允许某种形式的注释。 C 支持单行注释和多行注释。注释中的所有字符会被 C 编译器忽略。 C 注释一般有两种&#xff1a; // - 一般用于单行注释。 /* … …...

Spring事务(声明式事务)(Spring的事务,Spring隔离级别,事务传播机制)

目录 一、什么是事务&#xff0c;为什么要用事务 二、Spring声明式事务 &#x1f345; 1、Transactional的使用 &#x1f388; 事务回滚 &#x1f388;注意&#xff1a;异常被捕获&#xff0c;不会发生事务回滚 &#x1f345; 2、Transactional 作⽤范围 &#x1f345; …...

Linux运维面试题(四)之Linux服务管理

Linux运维面试题&#xff08;四&#xff09;之Linux服务管理 4.1 SSHSSH的登录验证方式SSH的登陆端口&#xff08;默认22&#xff09;和监听设置&#xff08;/etc/ssh/sshd_config&#xff09;SSH的登录用户限制(/etc/ssh/sshd_config PermitRootLogin)SSH的登录超时设置(/etc/…...

ChatGPT能否撰写科研论文?

ChatGPT&#xff0c;这款被许多人誉为语言处理领域的“黑马”&#xff0c;究竟能否应用于撰写科研论文&#xff1f;近期&#xff0c;以色列理工学院生物学家兼数据科学家Roy Kishony带领的团队&#xff0c;针对这一问题进行了系列研究&#xff0c;其结果已在《Nature》杂志上发…...

2023 电赛 E 题 K210方案

第一章&#xff1a;K210 介绍 K210芯片是一款基于RISC-V架构的嵌入式人工智能芯片&#xff0c;具备低功耗、高性能的特点。它拥有强大的图像处理和机器学习能力&#xff0c;适用于边缘计算设备和物联网应用。为了方便开发者&#xff0c;K210芯片提供了丰富的外设接口&#xff…...

网络知识介绍

一、TCP 传输控制协议&#xff0c;Transmission Control Protocol。 面向广域网的通信协议&#xff0c;跨域多个网络通信时&#xff0c;为两个通信端点之间提供一条具有如下特点的通信方式&#xff1a; 基于流、面向连接、可靠通信方式、网络状况不佳时尽量降低系统由于重传带…...

MapStruct设置全局的ComponentModel

在mapStruct上边&#xff0c;如果我们要切换成非默认的组件模式&#xff0c;常常要在Mapper注释中添加componentModel "spring"&#xff0c;如果类太多的了的话&#xff0c;非常麻烦&#xff0c;有没有更好的方式呢&#xff0c;有的&#xff0c;可以在pom中添加一个…...

LinearAlgebraMIT_6_ColumnSpaceAndNullSpace

这节课的两个重点是column space列空间和null space零空间。 x.1 pre-multiply/left multiply and post-multiply/right multiply 对于pre-multiply/left multiply左乘和post-multiply/right multiply右乘&#xff0c;如果用英文的pre-和post-是比较容易理解的&#xff0c; A…...

出版物经营许可办理 出版物许可地址变更 出版物零售延期

一、出版物零售单位设立所需材料 1、申请书 2、营业执照 3、租赁合同 4、主要负责人身 份证 5、出版物经营许可申请表 二、办理出版物经营许可证所要符合的条件 1、有确定的企业名称和经营范围; 2、有出版物业务的经营场地; 3、有出版物业务的组织机构和发行人员。 三、…...