当前位置: 首页 > news >正文

Flink CEP(二) 运行源码解析

通过DemoApp学习一下,CEP的源码执行逻辑。为下一篇实现CEP动态Pattern奠定理论基础。

1. Pattern的定义

Pattern<Tuple3<String, Long, String>,?> pattern = Pattern.<Tuple3<String, Long, String>>begin("begin").where(new IterativeCondition<Tuple3<String, Long, String>>() {@Overridepublic boolean filter(Tuple3<String, Long, String> value, Context<Tuple3<String, Long, String>> ctx)throws Exception {return value.f2.equals("success");}}).followedByAny("middle").where(new IterativeCondition<Tuple3<String, Long, String>>() {@Overridepublic boolean filter(Tuple3<String, Long, String> value, Context<Tuple3<String, Long, String>> ctx)throws Exception {return value.f2.equals("fail");}}).followedBy("end").where(new IterativeCondition<Tuple3<String, Long, String>>() {@Overridepublic boolean filter(Tuple3<String, Long, String> value, Context<Tuple3<String, Long, String>> ctx)throws Exception {return value.f2.equals("end");}});

 在执行中,我们可以看到pattern的几个属性,进入Pattern类中查看。

public class Pattern<T, F extends T> {/** Name of the pattern. */private final String name;/** Previous pattern. */private final Pattern<T, ? extends T> previous;/** The condition an event has to satisfy to be considered a matched. */private IterativeCondition<F> condition;/** Window length in which the pattern match has to occur. */private final Map<WithinType, Time> windowTimes = new HashMap<>();/*** A quantifier for the pattern. By default set to {@link Quantifier#one(ConsumingStrategy)}.*/private Quantifier quantifier = Quantifier.one(ConsumingStrategy.STRICT);/** The condition an event has to satisfy to stop collecting events into looping state. */private IterativeCondition<F> untilCondition;/** Applicable to a {@code times} pattern, and holds the number of times it has to appear. */private Times times;private final AfterMatchSkipStrategy afterMatchSkipStrategy;
}

可以看到每一个Pattern都会存在以下属性:

  • Name:Pattern的Name
  • previous:之前的Pattern
  • condition:Pattern的匹配逻辑
  • windowTimes:限制窗口的时长
  • Quantifier:Pattern的属性,包括配置Pattern的模式可以发生的循环次数,或者这个模式是贪婪的还是可选的。
    • /*** A quantifier describing the Pattern. There are three main groups of {@link Quantifier}.** <ol>*   <li>Single*   <li>Looping*   <li>Times* </ol>** <p>Each {@link Pattern} can be optional and have a {@link ConsumingStrategy}. Looping and Times* also hava an additional inner consuming strategy that is applied between accepted events in the* pattern.*/
      public class Quantifier {private final EnumSet<QuantifierProperty> properties;private final ConsumingStrategy consumingStrategy;private ConsumingStrategy innerConsumingStrategy = ConsumingStrategy.SKIP_TILL_NEXT;
      }
  • untilCondition:Pattern的循环匹配的结束条件
  • times:连续匹配次数
  • afterMatchSkipStrategy:匹配后的跳过策略

2.PatternStream的构建

        对Pattern定义完成,会通过PatternStreamBuilder,将1中定义好的Pattern应用到输入流中,返回对应的PatternStream。

    static <IN> PatternStreamBuilder<IN> forStreamAndPattern(final DataStream<IN> inputStream, final Pattern<IN, ?> pattern) {return new PatternStreamBuilder<>(inputStream, pattern, TimeBehaviour.EventTime, null, null);}PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern) {this(PatternStreamBuilder.forStreamAndPattern(inputStream, pattern));}

继续执行代码,进入Select()。

    public <R> SingleOutputStreamOperator<R> select(final PatternSelectFunction<T, R> patternSelectFunction,final TypeInformation<R> outTypeInfo) {final PatternProcessFunction<T, R> processFunction =fromSelect(builder.clean(patternSelectFunction)).build();return process(processFunction, outTypeInfo);}

进入process可以看到PatternStream.select会调用builder.build函数。

    public <R> SingleOutputStreamOperator<R> process(final PatternProcessFunction<T, R> patternProcessFunction,final TypeInformation<R> outTypeInfo) {return builder.build(outTypeInfo, builder.clean(patternProcessFunction));}

在build函数中会完成NFAFactory的定义,随后构建CepOperator。inputstream随之运行CepOperator即pattern定义的处理逻辑,并返回结果流PatternStream。

    <OUT, K> SingleOutputStreamOperator<OUT> build(final TypeInformation<OUT> outTypeInfo,final PatternProcessFunction<IN, OUT> processFunction) {checkNotNull(outTypeInfo);checkNotNull(processFunction);final TypeSerializer<IN> inputSerializer =inputStream.getType().createSerializer(inputStream.getExecutionConfig());final boolean isProcessingTime = timeBehaviour == TimeBehaviour.ProcessingTime;final boolean timeoutHandling = processFunction instanceof TimedOutPartialMatchHandler;final NFACompiler.NFAFactory<IN> nfaFactory =NFACompiler.compileFactory(pattern, timeoutHandling);CepOperator<IN, K, OUT> operator = new CepOperator<>(inputSerializer,isProcessingTime,nfaFactory,comparator,pattern.getAfterMatchSkipStrategy(),processFunction,lateDataOutputTag);final SingleOutputStreamOperator<OUT> patternStream;if (inputStream instanceof KeyedStream) {KeyedStream<IN, K> keyedStream = (KeyedStream<IN, K>) inputStream;patternStream = keyedStream.transform("CepOperator", outTypeInfo, operator);} else {KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();patternStream =inputStream.keyBy(keySelector).transform("GlobalCepOperator", outTypeInfo, operator).forceNonParallel();}return patternStream;}

3.CepOperator的执行

        初始化。

    @Overridepublic void open() throws Exception {super.open();timerService =getInternalTimerService("watermark-callbacks", VoidNamespaceSerializer.INSTANCE, this);nfa = nfaFactory.createNFA();nfa.open(cepRuntimeContext, new Configuration());context = new ContextFunctionImpl();collector = new TimestampedCollector<>(output);cepTimerService = new TimerServiceImpl();// metricsthis.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);}

 可以看到,nfaFactory.createNFA();会解析pattern组合,并为每一个pattern创建一个state。

CepOperator会在processElement中处理流中的每条数据。

    @Overridepublic void processElement(StreamRecord<IN> element) throws Exception {if (isProcessingTime) {if (comparator == null) {// there can be no out of order elements in processing timeNFAState nfaState = getNFAState();long timestamp = getProcessingTimeService().getCurrentProcessingTime();advanceTime(nfaState, timestamp);processEvent(nfaState, element.getValue(), timestamp);updateNFA(nfaState);} else {long currentTime = timerService.currentProcessingTime();bufferEvent(element.getValue(), currentTime);}} else {long timestamp = element.getTimestamp();IN value = element.getValue();// In event-time processing we assume correctness of the watermark.// Events with timestamp smaller than or equal with the last seen watermark are// considered late.// Late events are put in a dedicated side output, if the user has specified one.if (timestamp > timerService.currentWatermark()) {// we have an event with a valid timestamp, so// we buffer it until we receive the proper watermark.bufferEvent(value, timestamp);} else if (lateDataOutputTag != null) {output.collect(lateDataOutputTag, element);} else {numLateRecordsDropped.inc();}}}

        可以看到,如果使用的是处理时间,需要先对数据根据当前处理时间将乱序的数据做一次处理,保证数据的有序。

        如果使用的事件时间,如果事件时间戳小于等于watermark会被认为是迟到数据。

        正常数据会先被缓存起来,等待处理。

    private void bufferEvent(IN event, long currentTime) throws Exception {List<IN> elementsForTimestamp = elementQueueState.get(currentTime);if (elementsForTimestamp == null) {elementsForTimestamp = new ArrayList<>();registerTimer(currentTime);}elementsForTimestamp.add(event);elementQueueState.put(currentTime, elementsForTimestamp);}

       elementQueueState 会以时间戳为key保存对应的数据。在onEventTime()函数中通过processEvent中处理缓存的匹配数据。

    @Overridepublic void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {// 1) get the queue of pending elements for the key and the corresponding NFA,// 2) process the pending elements in event time order and custom comparator if exists//		by feeding them in the NFA// 3) advance the time to the current watermark, so that expired patterns are discarded.// 4) update the stored state for the key, by only storing the new NFA and MapState iff they//		have state to be used later.// 5) update the last seen watermark.// STEP 1PriorityQueue<Long> sortedTimestamps = getSortedTimestamps();NFAState nfaState = getNFAState();// STEP 2while (!sortedTimestamps.isEmpty()&& sortedTimestamps.peek() <= timerService.currentWatermark()) {long timestamp = sortedTimestamps.poll();advanceTime(nfaState, timestamp);// 对事件按时间进行排序try (Stream<IN> elements = sort(elementQueueState.get(timestamp))) {elements.forEachOrdered(event -> {try {processEvent(nfaState, event, timestamp);} catch (Exception e) {throw new RuntimeException(e);}});}elementQueueState.remove(timestamp);}// STEP 3advanceTime(nfaState, timerService.currentWatermark());// STEP 4updateNFA(nfaState);}
   private void processEvent(NFAState nfaState, IN event, long timestamp) throws Exception {try (SharedBufferAccessor<IN> sharedBufferAccessor = partialMatches.getAccessor()) {Collection<Map<String, List<IN>>> patterns =nfa.process(sharedBufferAccessor,nfaState,event,timestamp,afterMatchSkipStrategy,cepTimerService);if (nfa.getWindowTime() > 0 && nfaState.isNewStartPartialMatch()) {registerTimer(timestamp + nfa.getWindowTime());}processMatchedSequences(patterns, timestamp);}}private void processMatchedSequences(Iterable<Map<String, List<IN>>> matchingSequences, long timestamp) throws Exception {PatternProcessFunction<IN, OUT> function = getUserFunction();setTimestamp(timestamp);for (Map<String, List<IN>> matchingSequence : matchingSequences) {function.processMatch(matchingSequence, context, collector);}}

        nfa.process()最后会调用doProcess进行处理。

        computer

         可以看到每来一个新的Event,就会从上一个数据停留的状态开始遍历。判断新事件Event匹配之前已经匹配过的哪个状态,并为其版本号+1

前5条数据是success->fail->fail->success->fail,我们可以观察到partialMatches的变化如下:

success事件到达,因为之前没有事件,所以当前停留的状态是 begin。success匹配,预期会停留在middle状态

 fail事件到达,可以看到上面的success事件停留在了middle状态,并且begin的版本+1.

判断这个fail事件可以匹配后续的patern,状态从middle转移到end。存在newComputationStates中。最终更新到partialMatch中。

 第二个fail事件到达,只能匹配之前的middle状态,所以partialMatch中会新增一个end状态,并且middle的版本+1;

 

 最后如果状态到达终态,输出到potentialMatches中存储。

打印结果,可以看到每个事件都会试图去匹配所有的历史状态,nfa会存储所有匹配上的历史状态,直到到达终态。 

相关文章:

Flink CEP(二) 运行源码解析

通过DemoApp学习一下&#xff0c;CEP的源码执行逻辑。为下一篇实现CEP动态Pattern奠定理论基础。 1. Pattern的定义 Pattern<Tuple3<String, Long, String>,?> pattern Pattern.<Tuple3<String, Long, String>>begin("begin").where(new…...

剑指Offer-学习计划(四)双指针(下)

剑指 Offer 57. 和为s的两个数字 剑指 Offer 58 - I. 翻转单词顺序 剑指 Offer 21. 调整数组顺序使奇数位于偶数前面 题目一&#xff1a;调整数组顺序使奇数位于偶数前面 输入一个整数数组&#xff0c;实现一个函数来调整该数组中数字的顺序&#xff0c;使得所有奇数在数组的…...

深度学习——常见注意力机制

1.SENet SENet属于通道注意力机制。2017年提出&#xff0c;是imageNet最后的冠军 SENet采用的方法是对于特征层赋予权值。 重点在于如何赋权 1.将输入信息的所有通道平均池化。 2.平均池化后进行两次全连接&#xff0c;第一次全连接链接的神经元较少&#xff0c;第二次全连…...

Python 进阶(七):高级文件操作(shutil 模块)

❤️ 博客主页&#xff1a;水滴技术 &#x1f338; 订阅专栏&#xff1a;Python 入门核心技术 &#x1f680; 支持水滴&#xff1a;点赞&#x1f44d; 收藏⭐ 留言&#x1f4ac; 文章目录 1. 简介2. 常用函数2.1 复制文件2.2 复制目录2.3 移动文件或目录2.4 删除文件或目录2.…...

保留网络:大型语言模型的Transformer继任者

原文信息 原文题目&#xff1a;《Retentive Network: A Successor to Transformer for Large Language Models》 原文引用&#xff1a;Sun Y, Dong L, Huang S, et al. Retentive Network: A Successor to Transformer for Large Language Models[J]. arXiv preprint arXiv:2…...

算法通关村第二关——反转链表青铜笔记

LeetCode 206.反转链表 建立虚拟结点辅助翻转 public ListNode reverseList(ListNode head) {ListNode ans new ListNode(-1);ListNode cur head;while(cur!null){ListNode curNext cur.next;cur.next ans.next;ans.next cur;cur curNext;}return ans.next; }不带虚拟头…...

【Linux】——线程安全

目录 关于线程进程的问题 可重入与线程安全 常见的线程安全的情况 常见的不可重入的情况 常见的可重入的情况 可重入与线程安全区别 可重入与线程安全联系 Linux线程互斥 进程线程间的互斥相关概念 互斥量mutex 互斥量mutex常用接口 互斥量改造抢票系统 互斥量的原…...

[React]生命周期

前言 学习React&#xff0c;生命周期很重要&#xff0c;我们了解完生命周期的各个组件&#xff0c;对写高性能组件会有很大的帮助. Ract生命周期 React 生命周期分为三种状态 1. 初始化 2.更新 3.销毁 初始化 1、getDefaultProps() 设置默认的props&#xff0c;也可以用duf…...

【2023】Redis实现消息队列的方式汇总以及代码实现

Redis实现消息队列的方式汇总以及代码实现 前言开始前准备1、添加依赖2、添加配置的Bean 具体实现一、从最简单的开始&#xff1a;List 队列代码实现 二、发布订阅模式&#xff1a;Pub/Sub1、使用RedisMessageListenerContainer实现订阅2、还可以使用redisTemplate实现订阅 三、…...

ARM裸机-10

1、X210开发板和光盘资料 1.1、配置信息 CPU&#xff1a;三星S5PV210 内存&#xff1a;512M DDR2 SDRAM Flash&#xff1a;4GB iBand LCD&#xff1a;7寸&#xff0c;分辨率800x480 触摸屏&#xff1a;电容触摸屏 2、X210开发板硬件手册 3、X210开发板刷系统 3.1、什么是刷…...

「C/C++」C/C++指针详解

✨博客主页何曾参静谧的博客&#x1f4cc;文章专栏「C/C」C/C程序设计&#x1f4da;全部专栏「UG/NX」NX二次开发「UG/NX」BlockUI集合「VS」Visual Studio「QT」QT5程序设计「C/C」C/C程序设计「Win」Windows程序设计「算法」数据结构与算法「File」数据文件格式 目录 一、术语…...

提高电脑寿命的维护技巧与方法分享

在维护电脑运行方面&#xff0c;我有一些自己觉得非常有用的技巧和方法。下面我将分享一些我常用的维护技巧&#xff0c;并解释为什么我会选择这样做以及这样做的好处。 首先&#xff0c;我经常清理我的电脑内部的灰尘。电脑内部的灰尘会影响散热效果&#xff0c;导致电脑发热…...

React常见面试题

React常见面试题 一、React中的样式管理有哪些方法 内联样式&#xff1a;对象&#xff0c;作用于当前组件普通样式表&#xff1a; 作用于全局&#xff0c;文件名是&#xff1a;xxx.scssCSS模块&#xff1a;类似Vue的scoped&#xff0c; 文件名需是&#xff1a;xxx.module.scs…...

C++中数据的输入输出介绍

C中数据的输入输出介绍 C中数据的输入输出涉及到的文件 <iostream>&#xff1a;这是C标准库中最常用的头文件之一&#xff0c;包含了进行标准输入输出操作的类和对象&#xff0c;如std::cin、std::cout、std::endl等。 <iomanip>&#xff1a;该头文件提供了一些用…...

0101日志-运维-mysql

1 错误日志 错误日志&#xff08;Error Log&#xff09;&#xff1a;错误日志记录了MySQL引擎在运行过程中出现的错误和异常情况。这些错误可能包括启动和关闭问题、数据库崩溃、权限问题等。错误日志对于排查和解决MySQL引擎问题非常有帮助。 改日志默认开启&#xff0c;默认存…...

LabVIEW使用灰度和边缘检测进行视频滤波

LabVIEW使用灰度和边缘检测进行视频滤波 数字图像处理&#xff08;DIP&#xff09;是真实和连续世界的离散表示。除此之外&#xff0c;这种数字图像在通信、医学、遥感、地震学、工业自动化、机器人、航空航天和教育等领域变得非常重要。计算机技术越来越需要视频图像的数字图…...

SpringBoot整合WebService

SpringBoot整合WebService WebService是一个比较旧的远程调用通信框架&#xff0c;现在企业项目中用的比较少&#xff0c;因为它逐步被SpringCloud所取代&#xff0c;它的优势就是能够跨语言平台通信&#xff0c;所以还有点价值&#xff0c;下面来看看如何在SpringBoot项目中使…...

【LangChain】向量存储之FAISS

LangChain学习文档 【LangChain】向量存储(Vector stores)【LangChain】向量存储之FAISS 概要 Facebook AI 相似性搜索&#xff08;Faiss&#xff09;是一个用于高效相似性搜索和密集向量聚类的库。它包含的算法可以搜索任意大小的向量集&#xff0c;甚至可能无法容纳在 RAM 中…...

小研究 - 主动式微服务细粒度弹性缩放算法研究(三)

微服务架构已成为云数据中心的基本服务架构。但目前关于微服务系统弹性缩放的研究大多是基于服务或实例级别的水平缩放&#xff0c;忽略了能够充分利用单台服务器资源的细粒度垂直缩放&#xff0c;从而导致资源浪费。为此&#xff0c;本文设计了主动式微服务细粒度弹性缩放算法…...

驱动开发相关内容复盘

并发与竞争 并发 ​ 多个“用户”同时访问同一个共享资源。 竞争 并发和竞争的处理方法 处理并发和竞争的机制&#xff1a;原子操作、自旋锁、信号量和互斥体。 1、原子操作 ​ 原子操作就是指不能再进一步分割的操作&#xff0c;一般原子操作用于变量或者位操作。 ​ …...

使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式

一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明&#xff1a;假设每台服务器已…...

如何在看板中有效管理突发紧急任务

在看板中有效管理突发紧急任务需要&#xff1a;设立专门的紧急任务通道、重新调整任务优先级、保持适度的WIP&#xff08;Work-in-Progress&#xff09;弹性、优化任务处理流程、提高团队应对突发情况的敏捷性。其中&#xff0c;设立专门的紧急任务通道尤为重要&#xff0c;这能…...

屋顶变身“发电站” ,中天合创屋面分布式光伏发电项目顺利并网!

5月28日&#xff0c;中天合创屋面分布式光伏发电项目顺利并网发电&#xff0c;该项目位于内蒙古自治区鄂尔多斯市乌审旗&#xff0c;项目利用中天合创聚乙烯、聚丙烯仓库屋面作为场地建设光伏电站&#xff0c;总装机容量为9.96MWp。 项目投运后&#xff0c;每年可节约标煤3670…...

css3笔记 (1) 自用

outline: none 用于移除元素获得焦点时默认的轮廓线 broder:0 用于移除边框 font-size&#xff1a;0 用于设置字体不显示 list-style: none 消除<li> 标签默认样式 margin: xx auto 版心居中 width:100% 通栏 vertical-align 作用于行内元素 / 表格单元格&#xff…...

return this;返回的是谁

一个审批系统的示例来演示责任链模式的实现。假设公司需要处理不同金额的采购申请&#xff0c;不同级别的经理有不同的审批权限&#xff1a; // 抽象处理者&#xff1a;审批者 abstract class Approver {protected Approver successor; // 下一个处理者// 设置下一个处理者pub…...

快刀集(1): 一刀斩断视频片头广告

一刀流&#xff1a;用一个简单脚本&#xff0c;秒杀视频片头广告&#xff0c;还你清爽观影体验。 1. 引子 作为一个爱生活、爱学习、爱收藏高清资源的老码农&#xff0c;平时写代码之余看看电影、补补片&#xff0c;是再正常不过的事。 电影嘛&#xff0c;要沉浸&#xff0c;…...

在鸿蒙HarmonyOS 5中使用DevEco Studio实现企业微信功能

1. 开发环境准备 ​​安装DevEco Studio 3.1​​&#xff1a; 从华为开发者官网下载最新版DevEco Studio安装HarmonyOS 5.0 SDK ​​项目配置​​&#xff1a; // module.json5 {"module": {"requestPermissions": [{"name": "ohos.permis…...

基于鸿蒙(HarmonyOS5)的打车小程序

1. 开发环境准备 安装DevEco Studio (鸿蒙官方IDE)配置HarmonyOS SDK申请开发者账号和必要的API密钥 2. 项目结构设计 ├── entry │ ├── src │ │ ├── main │ │ │ ├── ets │ │ │ │ ├── pages │ │ │ │ │ ├── H…...

拟合问题处理

在机器学习中&#xff0c;核心任务通常围绕模型训练和性能提升展开&#xff0c;但你提到的 “优化训练数据解决过拟合” 和 “提升泛化性能解决欠拟合” 需要结合更准确的概念进行梳理。以下是对机器学习核心任务的系统复习和修正&#xff1a; 一、机器学习的核心任务框架 机…...

轻量安全的密码管理工具Vaultwarden

一、Vaultwarden概述 Vaultwarden主要作用是提供一个自托管的密码管理器服务。它是Bitwarden密码管理器的第三方轻量版&#xff0c;由国外开发者在Bitwarden的基础上&#xff0c;采用Rust语言重写而成。 &#xff08;一&#xff09;Vaultwarden镜像的作用及特点 轻量级与高性…...