Flink CEP实现10秒内连续登录失败用户分析
1、什么是CEP?
Flink CEP即 Flink Complex Event Processing,是基于DataStream流式数据提供的一套复杂事件处理编程模型。你可以把他理解为基于无界流的一套正则匹配模型,即对于无界流中的各种数据(称为事件),提供一种组合匹配的功能。
上图中,以不同形状代表一个DataStream中不同属性的事件。以一个圆圈和一个三角组成一个Pattern后,就可以快速过滤出原来的DataStream中符合规律的数据。举个例子,比如很多网站需要对恶意登录的用户进行屏蔽,如果用户连续三次输入错误的密码,那就要锁定当前用户。在这个场景下,所有用户的登录行为就构成了一个无界的数据流DataStream。而连续三次登录失败就是一个匹配模型Pattern。CEP编程模型的功能就是从用户登录行为这个无界数据流DataStream中,找出符合这个匹配模Pattern的所有数据。这种场景下,使用我们前面介绍的各种DataStream API其实也是可以实现的,不过相对就麻烦很多。而CEP编程模型则提供了非常简单灵活的功能实现方式。
2、代码实现
2.1 引入maven依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.roy</groupId><artifactId>FlinkDemo</artifactId><version>1.0</version><properties><flink.version>1.12.5</flink.version><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><log4j.version>2.12.1</log4j.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency><!-- CEP主要是下面这个依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop-2-uber</artifactId><version>2.8.3-10.0</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.14</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.1.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build></project>
2.2 基本流程
//1、获取原始事件流
DataStream<Event> input = ......;
//2、定义匹配器
Pattern<Event,?> pattern = .......;
//3、获取匹配流
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
//4、将匹配流中的数据处理形成结果数据流
DataStream<Result> resultStream = patternStream.process(new PatternProcessFunction<Event, Result>() {@Overridepublic void processMatch(Map<String, List<Event>> pattern,Context ctx,Collector<Result> out) throws Exception {}
});
2.3 完整代码
注意:代码运行前,先启动2.4 nlk socket服务
package com.roy.flink.project.userlogin;import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.List;
import java.util.Map;/*** @desc 十秒内连续登录失败的用户分析。使用Flink CEP进行快速模式匹配*/
public class MyUserLoginAna {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// //BoundedOutOfOrdernessWatermarks定时提交Watermark的间隔env.getConfig().setAutoWatermarkInterval(1000L);// 使用Socket测试env.setParallelism(1);// 1、获取原始事件流(10.86.97.206改为实际地址)final DataStreamSource<String> dataStreamSource = env.socketTextStream("10.86.97.206",7777);final SingleOutputStreamOperator<UserLoginRecord> userLoginRecordStream = dataStreamSource.map(new MapFunction<String, UserLoginRecord>() {@Overridepublic UserLoginRecord map(String s) throws Exception {final String[] splitVal = s.split(",");return new UserLoginRecord(splitVal[0], Integer.parseInt(splitVal[1]), Long.parseLong(splitVal[2]));}}).assignTimestampsAndWatermarks(WatermarkStrategy.<UserLoginRecord>forBoundedOutOfOrderness(Duration.ofSeconds(1))// 主要针对乱序流,由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间.withTimestampAssigner((SerializableTimestampAssigner<UserLoginRecord>) (element, recordTimestamp) -> element.getLoginTime()));// 2、定义匹配器// 2.1:10秒内出现3次登录失败的记录(不一定连续)// Flink CEP定义消息匹配器。
// final Pattern<UserLoginRecord, UserLoginRecord> pattern = Pattern.<UserLoginRecord>begin("start").where(new SimpleCondition<UserLoginRecord>() {
// @Override
// public boolean filter(UserLoginRecord userLoginRecord) throws Exception {
// return 1 == userLoginRecord.getLoginRes();
// }
// }).times(3).within(Time.seconds(10));// 2.2:连续三次登录失败。next表示连续匹配。 不连续匹配使用followByfinal Pattern<UserLoginRecord, UserLoginRecord> pattern = Pattern.<UserLoginRecord>begin("one").where(new SimpleCondition<UserLoginRecord>() {@Overridepublic boolean filter(UserLoginRecord value) throws Exception {return 1 == value.getLoginRes();}}).next("two").where(new SimpleCondition<UserLoginRecord>() {@Overridepublic boolean filter(UserLoginRecord value) throws Exception {return 1 == value.getLoginRes();}}).next("three").where(new SimpleCondition<UserLoginRecord>() {@Overridepublic boolean filter(UserLoginRecord value) throws Exception {return 1 == value.getLoginRes();}}).within(Time.seconds(10));// 3、获取匹配流final PatternStream<UserLoginRecord> badUser = CEP.pattern(userLoginRecordStream, pattern);final MyProcessFunction myProcessFunction = new MyProcessFunction();// 4、将匹配流中的数据处理成结果数据流final SingleOutputStreamOperator<UserLoginRecord> badUserStream = badUser.process(myProcessFunction);badUserStream.print("badUser");env.execute("UserLoginAna");}// mainpublic static class MyProcessFunction extends PatternProcessFunction<UserLoginRecord,UserLoginRecord>{@Overridepublic void processMatch(Map<String, List<UserLoginRecord>> match, Context ctx, Collector<UserLoginRecord> out) throws Exception {// 针对2.1 连续3次登录失败
// final List<UserLoginRecord> records = match.get("start");
// for(UserLoginRecord record : records){
// out.collect(record);
// }// 针对2.2 非连续3次登录失败final List<UserLoginRecord> records = match.get("three");for(UserLoginRecord record : records){out.collect(record);}}// processMarch}// MyProcessFunction
}
UserLoginRecord对象,如下:
public class UserLoginRecord {private String userId;private int loginRes; // 0-成功, 1-失败private long loginTime;public UserLoginRecord() {}public UserLoginRecord(String userId, int loginRes, long loginTime) {this.userId = userId;this.loginRes = loginRes;this.loginTime = loginTime;}@Overridepublic String toString() {return "UserLoginRecord{" +"userId='" + userId + '\'' +", loginRes=" + loginRes +", loginTime=" + loginTime +'}';}public String getUserId() {return userId;}public void setUserId(String userId) {this.userId = userId;}public int getLoginRes() {return loginRes;}public void setLoginRes(int loginRes) {this.loginRes = loginRes;}public long getLoginTime() {return loginTime;}public void setLoginTime(long loginTime) {this.loginTime = loginTime;}
}
2.4 nlk模拟socket服务端
2.5 IDEA控制台打印
相关文章:

Flink CEP实现10秒内连续登录失败用户分析
1、什么是CEP? Flink CEP即 Flink Complex Event Processing,是基于DataStream流式数据提供的一套复杂事件处理编程模型。你可以把他理解为基于无界流的一套正则匹配模型,即对于无界流中的各种数据(称为事件),提供一种组合匹配的…...

QSqlRelationalTableModel 关系表格模型
一、 1.1 QSqlRelationalTableModel继承自QSqlTableModel,并且对其进行了扩展,提供了对外键的支持。一个外键就是一个表中的一个字段 和 其他表中的主键字段之间的一对一的映射。例如,“studInfo”表中的departID字段对应的是“departments…...

JS和CSS实现的原生轮播图
JSCSS实现滑动轮播图 使用JS加CSS来实现的幻灯片,主要使用的是CSS的transform属性中的translate来实现,适合与用户交互的轮播图,展现轮播图的数量,用户可自由进行选择。 <!DOCTYPE html> <html lang"en">&…...

【微服务】skywalking自定义链路追踪与日志采集
目录 一、前言 二、自定义链路追踪简介 2.1 自定义链路追踪应用场景 2.2 链路追踪几个关键概念 三、skywalking 自定义链路追踪实现 3.1 环境准备 3.2 集成过程 3.2.1 导入核心依赖 3.2.2 几个常用注解 3.2.3 方法集成 3.2.4 上报追踪信息 四、skywalking 自定义日志…...
MYSQL基础问题
一.DBMS 是什么 DBMS(Database Management System),数据库管理系统,是一种操纵和管理 数据库的大型软件,用于建立、使用和维护数据库。对数据库进行统一的管理和 控制,以保证数据库的安全性和完整性。 二…...
SpringBoot使用Guava实现日志脱敏(含源码)
点击下载《SpringBoot使用Guava实现日志脱敏(含源码)》 1. 摘要 本文将介绍如何使用Google Guava库进行日志脱敏,保护敏感数据的安全。我们将详细解释脱敏的必要性,然后介绍如何使用Guava中的Strings、Maps和CharMatcher类来进行…...

数据结构—动态查找
动态查找介绍 1. 动态查找的引入:当查找表以线性表的形式组织时,若对查找表进行插入、删除或排序操作,就必须移动大量的记录,当记录数很多时,这种移动的代价很大。 2. 动态查找表的设计思想:表结构本身是…...
Tarjan算法学习笔记
目录 无向图的割点与桥 时间戳: 搜索树: 追溯值: 割边判定法则: 割点判定法则: 无向图的双连通分量 定理: 边双连通分量(e-DCC)的求法: e-DCC的缩点: 有向图的连通性 追…...
vue 项目涉及的焦点聚焦、格式化日期、判断是否为对象或数组、判断是否为空、深拷贝、节流、防抖
焦点聚焦 import Vue from vue // 插件对象(必须有 install 方法, 才可以注入到 Vue.use 中) export default {install () {Vue.directive(fofo, {inserted (el) {el el.querySelector(input)el.focus()}})} }格式化日期格式 export const formatDate (time) > {// 将xx…...

软件工程知识梳理6-运行和维护
软件维护需要的工作量很大,大型软件的维护成本高达开发成本的4倍左右。所以,软件工程的主要目的就是要提高软件的可维护性,减少软件维护所需要的工作量,降低软件系统的总成本。 定义:软件已经交付使用之后,…...
docker- php7.4
安装 gd拓展 anzhuanga在Dockerfile里面安装php7.4的GD库 - 知乎 apt update apt install -y libwebp-dev libjpeg-dev libpng-dev libfreetype6-devdocker-php-source extractdocker-php-ext-configure gd \ --with-jpeg/usr/include \ --with-freetype/usr/include/docker-…...
开发一个Android App,在项目中完成添加联系人的功能,通过ContentResolver向系统中添加联系人信息。
实现步骤: (1)添加动态联系人的权限。 (2)创建Activity和布局文件,添加输入框和按钮等控件。 (3)完成添加联系人的功能。 代码文件如下: activity_main.xml文件 <!…...

Flume搭建
压缩包版本:apache-flume-1.9.0-bin.tar 百度盘链接:https://pan.baidu.com/s/1ZhSiePUye9ax7TW5XbfWdw 提取码:ieks 1.解压 tar -zxvf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/ 2. 修改文件名 [rootbigdata1 opt]…...

Web APIs 1 DOM操作
Web APIs 1 引入:const优先Web API 基本认知01 作用和分类02 什么是DOM03 DOM树04 DOM对象 获取DOM对象01 根据CSS选择器获取02 其他获取DOM元素方法 操作元素内容01 innerText 属性02 innerHTML 属性 操作元素属性操作元素的常用属性操作元素的样式属性操作表单元素…...

dvwa,xss反射型lowmedium
xss,反射型,low&&medium low发现xss本地搭建实操 medium作为初学者的我第一次接触比较浅的绕过思路high low 发现xss 本关无过滤 <script>alert(/xss/)</script> //或 <script>confirm(/xss/)</script> //或 <scr…...

从云计算到物联网:虚拟化技术的演变与嵌入式系统的融合
文章目录 一、硬件性能提升:摩尔定律与嵌入式虚拟化二、CPU多核技术:为嵌入式虚拟化提供支持三、业务负载整合:嵌入式虚拟化的核心需求四、降低硬件成本:虚拟化技术的经济效益五、软件重用与移植:虚拟化技术的优势六、…...

linux 文件查看 head 、 cat 、 less 、tail 、grep
查看文件详细信息 stat 文件 cat 》》适合显示小文件【行数比较少】,如果行数较多,屏幕显示不完整(如果虚拟操作,是无法上下键的,或者滚动鼠标的,第三方 xsheel,crt 可以方向键查看…...
13.2 Web与Servlet进阶(❤❤)
13.2 Web与Servlet进阶 1. 请求与响应1.1 URL与URI1.2 HTTP请求的结构1. 结构2.后端获取访问工具类型:getHeader().toLowerCase方法1.3 响应的结构1. 结构2. 响应常见状态码3. 后端设置响应参数4. 响应的ContentType作用1.4 请求转发与响应重定向应用1. 请求转发:getRequestDis…...
记录解决报错--vue前后端分离,接口401(Unauthorized)
1.场景 前端访问不了后端接口。报错401。 2.解决步骤 ①在页面console.log(111)查看走到代码的位置没有。(走到了,没问题) ②查看vue.config.js配置。这段配置就是vue访问api的url。(没问题) devServer: {port: 80…...

【笔记】Android 常用编译模块和输出产物路径
模块&产物路径 具体编译到软件的路径要看编译规则的分区,代码中模块编译输出的产物基本对应。 Android 代码模块 编译产物路径设备adb路径Comment 模块device/mediatek/system/common/ 资源overlay/telephony/frameworks/base/core 文件举例res/res/values-m…...

记录一个用了很久的git提交到github和gitee比较方便的方法
在当前git init后,在隐藏的git文件夹中找到config文件 [user]name thels [remote "github"]url your github repository urlfetch refs/heads/*:refs/remotes/origin/* [remote "gitee"]url your gitee repository urlfetch refs/heads/*:…...

Spark 之 DataFrame 开发
foreachPartition val data = spark.sparkContext.parallelize(1 to 100)// 使用 foreachPartition 批量处理分区 data.foreachPartition {partitionIterator =...
白银6月想法
一、市场回顾 2025年5月,SHFE白银主力合约总体呈现出震荡偏强的运行格局。从2025年5月1日至2025年5月30日,白银期货价格整体运行在7944元至8342元区间内,最高价出现在5月22日的8342.0元,最低价则为5月15日的7944元。最终在5月30日…...
Android第十四次面试总结
OkHttp中获取数据与操作数据 一、数据获取核心机制 1. 同步请求(阻塞式) // 1. 创建HTTP客户端(全局应复用实例) OkHttpClient client new OkHttpClient();// 2. 构建请求对象(GET示例) Request r…...
DeepSeek 赋能智能养老:情感陪伴机器人的温暖革新
目录 一、引言二、智能养老情感陪伴机器人的市场现状与需求2.1 市场现状2.2 老年人情感陪伴需求分析 三、DeepSeek 技术详解3.1 DeepSeek 的技术特点3.2 与其他类似技术的对比优势 四、DeepSeek 在智能养老情感陪伴机器人中的具体应用4.1 自然语言处理与对话交互4.2 情感识别与…...

浏览器工作原理05 [#] 渲染流程(上):HTML、CSS和JavaScript是如何变成页面的
引用 浏览器工作原理与实践 一、提出问题 在上一篇文章中我们介绍了导航相关的流程,那导航被提交后又会怎么样呢?就进入了渲染阶段。这个阶段很重要,了解其相关流程能让你“看透”页面是如何工作的,有了这些知识,你可…...
React 中 HTML 插入的全场景实践与安全指南
在 React 开发过程中,我们常常会遇到需要插入 HTML 内容的场景。比如将服务端返回的富文本渲染到页面,还有处理复杂的 UI 结构,正确的 HTML 插入方式不仅影响页面展示效果,更关乎应用的安全性。 本文将详细探讨 React 中插入 HTM…...

如何在没有 iTunes 的情况下备份 iPhone
我可以在没有 iTunes 的情况下将 iPhone 备份到电脑吗?虽然 iTunes 曾经是备份 iPhone 的主要方法,但它并不是 iOS 用户唯一的备份选项。您可以选择多种方便的替代方案来备份 iPhone,无需使用 iTunes。您可以在这里获得更灵活、更人性化的备份…...
【LeetCode】3170. 删除星号以后字典序最小的字符串(贪心 | 优先队列)
LeetCode 3170. 删除星号以后字典序最小的字符串(中等) 题目描述解题思路java代码 题目描述 题目链接:3170. 删除星号以后字典序最小的字符串 给你一个字符串 s 。它可能包含任意数量的 * 字符。你的任务是删除所有的 * 字符。 当字符串还…...
RTOS学习之重难点
📢:如果你也对机器人、人工智能感兴趣,看来我们志同道合✨ 📢:不妨浏览一下我的博客主页【https://blog.csdn.net/weixin_51244852】 📢:文章若有幸对你有帮助,可点赞 👍…...