Flink CEP实现10秒内连续登录失败用户分析
1、什么是CEP?
Flink CEP即 Flink Complex Event Processing,是基于DataStream流式数据提供的一套复杂事件处理编程模型。你可以把他理解为基于无界流的一套正则匹配模型,即对于无界流中的各种数据(称为事件),提供一种组合匹配的功能。

上图中,以不同形状代表一个DataStream中不同属性的事件。以一个圆圈和一个三角组成一个Pattern后,就可以快速过滤出原来的DataStream中符合规律的数据。举个例子,比如很多网站需要对恶意登录的用户进行屏蔽,如果用户连续三次输入错误的密码,那就要锁定当前用户。在这个场景下,所有用户的登录行为就构成了一个无界的数据流DataStream。而连续三次登录失败就是一个匹配模型Pattern。CEP编程模型的功能就是从用户登录行为这个无界数据流DataStream中,找出符合这个匹配模Pattern的所有数据。这种场景下,使用我们前面介绍的各种DataStream API其实也是可以实现的,不过相对就麻烦很多。而CEP编程模型则提供了非常简单灵活的功能实现方式。
2、代码实现
2.1 引入maven依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.roy</groupId><artifactId>FlinkDemo</artifactId><version>1.0</version><properties><flink.version>1.12.5</flink.version><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><log4j.version>2.12.1</log4j.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency><!-- CEP主要是下面这个依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop-2-uber</artifactId><version>2.8.3-10.0</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.14</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.1.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build></project>
2.2 基本流程
//1、获取原始事件流
DataStream<Event> input = ......;
//2、定义匹配器
Pattern<Event,?> pattern = .......;
//3、获取匹配流
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
//4、将匹配流中的数据处理形成结果数据流
DataStream<Result> resultStream = patternStream.process(new PatternProcessFunction<Event, Result>() {@Overridepublic void processMatch(Map<String, List<Event>> pattern,Context ctx,Collector<Result> out) throws Exception {}
});
2.3 完整代码
注意:代码运行前,先启动2.4 nlk socket服务
package com.roy.flink.project.userlogin;import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.List;
import java.util.Map;/*** @desc 十秒内连续登录失败的用户分析。使用Flink CEP进行快速模式匹配*/
public class MyUserLoginAna {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// //BoundedOutOfOrdernessWatermarks定时提交Watermark的间隔env.getConfig().setAutoWatermarkInterval(1000L);// 使用Socket测试env.setParallelism(1);// 1、获取原始事件流(10.86.97.206改为实际地址)final DataStreamSource<String> dataStreamSource = env.socketTextStream("10.86.97.206",7777);final SingleOutputStreamOperator<UserLoginRecord> userLoginRecordStream = dataStreamSource.map(new MapFunction<String, UserLoginRecord>() {@Overridepublic UserLoginRecord map(String s) throws Exception {final String[] splitVal = s.split(",");return new UserLoginRecord(splitVal[0], Integer.parseInt(splitVal[1]), Long.parseLong(splitVal[2]));}}).assignTimestampsAndWatermarks(WatermarkStrategy.<UserLoginRecord>forBoundedOutOfOrderness(Duration.ofSeconds(1))// 主要针对乱序流,由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间.withTimestampAssigner((SerializableTimestampAssigner<UserLoginRecord>) (element, recordTimestamp) -> element.getLoginTime()));// 2、定义匹配器// 2.1:10秒内出现3次登录失败的记录(不一定连续)// Flink CEP定义消息匹配器。
// final Pattern<UserLoginRecord, UserLoginRecord> pattern = Pattern.<UserLoginRecord>begin("start").where(new SimpleCondition<UserLoginRecord>() {
// @Override
// public boolean filter(UserLoginRecord userLoginRecord) throws Exception {
// return 1 == userLoginRecord.getLoginRes();
// }
// }).times(3).within(Time.seconds(10));// 2.2:连续三次登录失败。next表示连续匹配。 不连续匹配使用followByfinal Pattern<UserLoginRecord, UserLoginRecord> pattern = Pattern.<UserLoginRecord>begin("one").where(new SimpleCondition<UserLoginRecord>() {@Overridepublic boolean filter(UserLoginRecord value) throws Exception {return 1 == value.getLoginRes();}}).next("two").where(new SimpleCondition<UserLoginRecord>() {@Overridepublic boolean filter(UserLoginRecord value) throws Exception {return 1 == value.getLoginRes();}}).next("three").where(new SimpleCondition<UserLoginRecord>() {@Overridepublic boolean filter(UserLoginRecord value) throws Exception {return 1 == value.getLoginRes();}}).within(Time.seconds(10));// 3、获取匹配流final PatternStream<UserLoginRecord> badUser = CEP.pattern(userLoginRecordStream, pattern);final MyProcessFunction myProcessFunction = new MyProcessFunction();// 4、将匹配流中的数据处理成结果数据流final SingleOutputStreamOperator<UserLoginRecord> badUserStream = badUser.process(myProcessFunction);badUserStream.print("badUser");env.execute("UserLoginAna");}// mainpublic static class MyProcessFunction extends PatternProcessFunction<UserLoginRecord,UserLoginRecord>{@Overridepublic void processMatch(Map<String, List<UserLoginRecord>> match, Context ctx, Collector<UserLoginRecord> out) throws Exception {// 针对2.1 连续3次登录失败
// final List<UserLoginRecord> records = match.get("start");
// for(UserLoginRecord record : records){
// out.collect(record);
// }// 针对2.2 非连续3次登录失败final List<UserLoginRecord> records = match.get("three");for(UserLoginRecord record : records){out.collect(record);}}// processMarch}// MyProcessFunction
}
UserLoginRecord对象,如下:
public class UserLoginRecord {private String userId;private int loginRes; // 0-成功, 1-失败private long loginTime;public UserLoginRecord() {}public UserLoginRecord(String userId, int loginRes, long loginTime) {this.userId = userId;this.loginRes = loginRes;this.loginTime = loginTime;}@Overridepublic String toString() {return "UserLoginRecord{" +"userId='" + userId + '\'' +", loginRes=" + loginRes +", loginTime=" + loginTime +'}';}public String getUserId() {return userId;}public void setUserId(String userId) {this.userId = userId;}public int getLoginRes() {return loginRes;}public void setLoginRes(int loginRes) {this.loginRes = loginRes;}public long getLoginTime() {return loginTime;}public void setLoginTime(long loginTime) {this.loginTime = loginTime;}
}
2.4 nlk模拟socket服务端

2.5 IDEA控制台打印

相关文章:
Flink CEP实现10秒内连续登录失败用户分析
1、什么是CEP? Flink CEP即 Flink Complex Event Processing,是基于DataStream流式数据提供的一套复杂事件处理编程模型。你可以把他理解为基于无界流的一套正则匹配模型,即对于无界流中的各种数据(称为事件),提供一种组合匹配的…...
QSqlRelationalTableModel 关系表格模型
一、 1.1 QSqlRelationalTableModel继承自QSqlTableModel,并且对其进行了扩展,提供了对外键的支持。一个外键就是一个表中的一个字段 和 其他表中的主键字段之间的一对一的映射。例如,“studInfo”表中的departID字段对应的是“departments…...
JS和CSS实现的原生轮播图
JSCSS实现滑动轮播图 使用JS加CSS来实现的幻灯片,主要使用的是CSS的transform属性中的translate来实现,适合与用户交互的轮播图,展现轮播图的数量,用户可自由进行选择。 <!DOCTYPE html> <html lang"en">&…...
【微服务】skywalking自定义链路追踪与日志采集
目录 一、前言 二、自定义链路追踪简介 2.1 自定义链路追踪应用场景 2.2 链路追踪几个关键概念 三、skywalking 自定义链路追踪实现 3.1 环境准备 3.2 集成过程 3.2.1 导入核心依赖 3.2.2 几个常用注解 3.2.3 方法集成 3.2.4 上报追踪信息 四、skywalking 自定义日志…...
MYSQL基础问题
一.DBMS 是什么 DBMS(Database Management System),数据库管理系统,是一种操纵和管理 数据库的大型软件,用于建立、使用和维护数据库。对数据库进行统一的管理和 控制,以保证数据库的安全性和完整性。 二…...
SpringBoot使用Guava实现日志脱敏(含源码)
点击下载《SpringBoot使用Guava实现日志脱敏(含源码)》 1. 摘要 本文将介绍如何使用Google Guava库进行日志脱敏,保护敏感数据的安全。我们将详细解释脱敏的必要性,然后介绍如何使用Guava中的Strings、Maps和CharMatcher类来进行…...
数据结构—动态查找
动态查找介绍 1. 动态查找的引入:当查找表以线性表的形式组织时,若对查找表进行插入、删除或排序操作,就必须移动大量的记录,当记录数很多时,这种移动的代价很大。 2. 动态查找表的设计思想:表结构本身是…...
Tarjan算法学习笔记
目录 无向图的割点与桥 时间戳: 搜索树: 追溯值: 割边判定法则: 割点判定法则: 无向图的双连通分量 定理: 边双连通分量(e-DCC)的求法: e-DCC的缩点: 有向图的连通性 追…...
vue 项目涉及的焦点聚焦、格式化日期、判断是否为对象或数组、判断是否为空、深拷贝、节流、防抖
焦点聚焦 import Vue from vue // 插件对象(必须有 install 方法, 才可以注入到 Vue.use 中) export default {install () {Vue.directive(fofo, {inserted (el) {el el.querySelector(input)el.focus()}})} }格式化日期格式 export const formatDate (time) > {// 将xx…...
软件工程知识梳理6-运行和维护
软件维护需要的工作量很大,大型软件的维护成本高达开发成本的4倍左右。所以,软件工程的主要目的就是要提高软件的可维护性,减少软件维护所需要的工作量,降低软件系统的总成本。 定义:软件已经交付使用之后,…...
docker- php7.4
安装 gd拓展 anzhuanga在Dockerfile里面安装php7.4的GD库 - 知乎 apt update apt install -y libwebp-dev libjpeg-dev libpng-dev libfreetype6-devdocker-php-source extractdocker-php-ext-configure gd \ --with-jpeg/usr/include \ --with-freetype/usr/include/docker-…...
开发一个Android App,在项目中完成添加联系人的功能,通过ContentResolver向系统中添加联系人信息。
实现步骤: (1)添加动态联系人的权限。 (2)创建Activity和布局文件,添加输入框和按钮等控件。 (3)完成添加联系人的功能。 代码文件如下: activity_main.xml文件 <!…...
Flume搭建
压缩包版本:apache-flume-1.9.0-bin.tar 百度盘链接:https://pan.baidu.com/s/1ZhSiePUye9ax7TW5XbfWdw 提取码:ieks 1.解压 tar -zxvf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/ 2. 修改文件名 [rootbigdata1 opt]…...
Web APIs 1 DOM操作
Web APIs 1 引入:const优先Web API 基本认知01 作用和分类02 什么是DOM03 DOM树04 DOM对象 获取DOM对象01 根据CSS选择器获取02 其他获取DOM元素方法 操作元素内容01 innerText 属性02 innerHTML 属性 操作元素属性操作元素的常用属性操作元素的样式属性操作表单元素…...
dvwa,xss反射型lowmedium
xss,反射型,low&&medium low发现xss本地搭建实操 medium作为初学者的我第一次接触比较浅的绕过思路high low 发现xss 本关无过滤 <script>alert(/xss/)</script> //或 <script>confirm(/xss/)</script> //或 <scr…...
从云计算到物联网:虚拟化技术的演变与嵌入式系统的融合
文章目录 一、硬件性能提升:摩尔定律与嵌入式虚拟化二、CPU多核技术:为嵌入式虚拟化提供支持三、业务负载整合:嵌入式虚拟化的核心需求四、降低硬件成本:虚拟化技术的经济效益五、软件重用与移植:虚拟化技术的优势六、…...
linux 文件查看 head 、 cat 、 less 、tail 、grep
查看文件详细信息 stat 文件 cat 》》适合显示小文件【行数比较少】,如果行数较多,屏幕显示不完整(如果虚拟操作,是无法上下键的,或者滚动鼠标的,第三方 xsheel,crt 可以方向键查看…...
13.2 Web与Servlet进阶(❤❤)
13.2 Web与Servlet进阶 1. 请求与响应1.1 URL与URI1.2 HTTP请求的结构1. 结构2.后端获取访问工具类型:getHeader().toLowerCase方法1.3 响应的结构1. 结构2. 响应常见状态码3. 后端设置响应参数4. 响应的ContentType作用1.4 请求转发与响应重定向应用1. 请求转发:getRequestDis…...
记录解决报错--vue前后端分离,接口401(Unauthorized)
1.场景 前端访问不了后端接口。报错401。 2.解决步骤 ①在页面console.log(111)查看走到代码的位置没有。(走到了,没问题) ②查看vue.config.js配置。这段配置就是vue访问api的url。(没问题) devServer: {port: 80…...
【笔记】Android 常用编译模块和输出产物路径
模块&产物路径 具体编译到软件的路径要看编译规则的分区,代码中模块编译输出的产物基本对应。 Android 代码模块 编译产物路径设备adb路径Comment 模块device/mediatek/system/common/ 资源overlay/telephony/frameworks/base/core 文件举例res/res/values-m…...
uniapp 对接腾讯云IM群组成员管理(增删改查)
UniApp 实战:腾讯云IM群组成员管理(增删改查) 一、前言 在社交类App开发中,群组成员管理是核心功能之一。本文将基于UniApp框架,结合腾讯云IM SDK,详细讲解如何实现群组成员的增删改查全流程。 权限校验…...
Python爬虫实战:研究MechanicalSoup库相关技术
一、MechanicalSoup 库概述 1.1 库简介 MechanicalSoup 是一个 Python 库,专为自动化交互网站而设计。它结合了 requests 的 HTTP 请求能力和 BeautifulSoup 的 HTML 解析能力,提供了直观的 API,让我们可以像人类用户一样浏览网页、填写表单和提交请求。 1.2 主要功能特点…...
变量 varablie 声明- Rust 变量 let mut 声明与 C/C++ 变量声明对比分析
一、变量声明设计:let 与 mut 的哲学解析 Rust 采用 let 声明变量并通过 mut 显式标记可变性,这种设计体现了语言的核心哲学。以下是深度解析: 1.1 设计理念剖析 安全优先原则:默认不可变强制开发者明确声明意图 let x 5; …...
挑战杯推荐项目
“人工智能”创意赛 - 智能艺术创作助手:借助大模型技术,开发能根据用户输入的主题、风格等要求,生成绘画、音乐、文学作品等多种形式艺术创作灵感或初稿的应用,帮助艺术家和创意爱好者激发创意、提高创作效率。 - 个性化梦境…...
逻辑回归:给不确定性划界的分类大师
想象你是一名医生。面对患者的检查报告(肿瘤大小、血液指标),你需要做出一个**决定性判断**:恶性还是良性?这种“非黑即白”的抉择,正是**逻辑回归(Logistic Regression)** 的战场&a…...
《从零掌握MIPI CSI-2: 协议精解与FPGA摄像头开发实战》-- CSI-2 协议详细解析 (一)
CSI-2 协议详细解析 (一) 1. CSI-2层定义(CSI-2 Layer Definitions) 分层结构 :CSI-2协议分为6层: 物理层(PHY Layer) : 定义电气特性、时钟机制和传输介质(导线&#…...
PL0语法,分析器实现!
简介 PL/0 是一种简单的编程语言,通常用于教学编译原理。它的语法结构清晰,功能包括常量定义、变量声明、过程(子程序)定义以及基本的控制结构(如条件语句和循环语句)。 PL/0 语法规范 PL/0 是一种教学用的小型编程语言,由 Niklaus Wirth 设计,用于展示编译原理的核…...
全志A40i android7.1 调试信息打印串口由uart0改为uart3
一,概述 1. 目的 将调试信息打印串口由uart0改为uart3。 2. 版本信息 Uboot版本:2014.07; Kernel版本:Linux-3.10; 二,Uboot 1. sys_config.fex改动 使能uart3(TX:PH00 RX:PH01),并让boo…...
RNN避坑指南:从数学推导到LSTM/GRU工业级部署实战流程
本文较长,建议点赞收藏,以免遗失。更多AI大模型应用开发学习视频及资料,尽在聚客AI学院。 本文全面剖析RNN核心原理,深入讲解梯度消失/爆炸问题,并通过LSTM/GRU结构实现解决方案,提供时间序列预测和文本生成…...
Linux 内存管理实战精讲:核心原理与面试常考点全解析
Linux 内存管理实战精讲:核心原理与面试常考点全解析 Linux 内核内存管理是系统设计中最复杂但也最核心的模块之一。它不仅支撑着虚拟内存机制、物理内存分配、进程隔离与资源复用,还直接决定系统运行的性能与稳定性。无论你是嵌入式开发者、内核调试工…...
