当前位置: 首页 > news >正文

Flink CEP实现10秒内连续登录失败用户分析

1、什么是CEP?

Flink CEP即 Flink Complex Event Processing,是基于DataStream流式数据提供的一套复杂事件处理编程模型。你可以把他理解为基于无界流的一套正则匹配模型,即对于无界流中的各种数据(称为事件),提供一种组合匹配的功能。

在这里插入图片描述
上图中,以不同形状代表一个DataStream中不同属性的事件。以一个圆圈和一个三角组成一个Pattern后,就可以快速过滤出原来的DataStream中符合规律的数据。举个例子,比如很多网站需要对恶意登录的用户进行屏蔽,如果用户连续三次输入错误的密码,那就要锁定当前用户。在这个场景下,所有用户的登录行为就构成了一个无界的数据流DataStream。而连续三次登录失败就是一个匹配模型Pattern。CEP编程模型的功能就是从用户登录行为这个无界数据流DataStream中,找出符合这个匹配模Pattern的所有数据。这种场景下,使用我们前面介绍的各种DataStream API其实也是可以实现的,不过相对就麻烦很多。而CEP编程模型则提供了非常简单灵活的功能实现方式。

2、代码实现

2.1 引入maven依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.roy</groupId><artifactId>FlinkDemo</artifactId><version>1.0</version><properties><flink.version>1.12.5</flink.version><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><log4j.version>2.12.1</log4j.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency><!-- CEP主要是下面这个依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop-2-uber</artifactId><version>2.8.3-10.0</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.14</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.1.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build></project>

2.2 基本流程

//1、获取原始事件流
DataStream<Event> input = ......; 
//2、定义匹配器
Pattern<Event,?> pattern = .......; 
//3、获取匹配流
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
//4、将匹配流中的数据处理形成结果数据流
DataStream<Result> resultStream = patternStream.process(new PatternProcessFunction<Event, Result>() {@Overridepublic void processMatch(Map<String, List<Event>> pattern,Context ctx,Collector<Result> out) throws Exception {}
});

2.3 完整代码

注意:代码运行前,先启动2.4 nlk socket服务

package com.roy.flink.project.userlogin;import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.List;
import java.util.Map;/*** @desc 十秒内连续登录失败的用户分析。使用Flink CEP进行快速模式匹配*/
public class MyUserLoginAna {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// //BoundedOutOfOrdernessWatermarks定时提交Watermark的间隔env.getConfig().setAutoWatermarkInterval(1000L);// 使用Socket测试env.setParallelism(1);// 1、获取原始事件流(10.86.97.206改为实际地址)final DataStreamSource<String> dataStreamSource = env.socketTextStream("10.86.97.206",7777);final SingleOutputStreamOperator<UserLoginRecord> userLoginRecordStream = dataStreamSource.map(new MapFunction<String, UserLoginRecord>() {@Overridepublic UserLoginRecord map(String s) throws Exception {final String[] splitVal = s.split(",");return new UserLoginRecord(splitVal[0], Integer.parseInt(splitVal[1]), Long.parseLong(splitVal[2]));}}).assignTimestampsAndWatermarks(WatermarkStrategy.<UserLoginRecord>forBoundedOutOfOrderness(Duration.ofSeconds(1))// 主要针对乱序流,由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间.withTimestampAssigner((SerializableTimestampAssigner<UserLoginRecord>) (element, recordTimestamp) -> element.getLoginTime()));// 2、定义匹配器// 2.1:10秒内出现3次登录失败的记录(不一定连续)// Flink CEP定义消息匹配器。
//        final Pattern<UserLoginRecord, UserLoginRecord> pattern = Pattern.<UserLoginRecord>begin("start").where(new SimpleCondition<UserLoginRecord>() {
//            @Override
//            public boolean filter(UserLoginRecord userLoginRecord) throws Exception {
//                return 1 == userLoginRecord.getLoginRes();
//            }
//        }).times(3).within(Time.seconds(10));// 2.2:连续三次登录失败。next表示连续匹配。 不连续匹配使用followByfinal Pattern<UserLoginRecord, UserLoginRecord> pattern = Pattern.<UserLoginRecord>begin("one").where(new SimpleCondition<UserLoginRecord>() {@Overridepublic boolean filter(UserLoginRecord value) throws Exception {return 1 == value.getLoginRes();}}).next("two").where(new SimpleCondition<UserLoginRecord>() {@Overridepublic boolean filter(UserLoginRecord value) throws Exception {return 1 == value.getLoginRes();}}).next("three").where(new SimpleCondition<UserLoginRecord>() {@Overridepublic boolean filter(UserLoginRecord value) throws Exception {return 1 == value.getLoginRes();}}).within(Time.seconds(10));// 3、获取匹配流final PatternStream<UserLoginRecord> badUser = CEP.pattern(userLoginRecordStream, pattern);final MyProcessFunction myProcessFunction = new MyProcessFunction();// 4、将匹配流中的数据处理成结果数据流final SingleOutputStreamOperator<UserLoginRecord> badUserStream = badUser.process(myProcessFunction);badUserStream.print("badUser");env.execute("UserLoginAna");}// mainpublic static class MyProcessFunction extends PatternProcessFunction<UserLoginRecord,UserLoginRecord>{@Overridepublic void processMatch(Map<String, List<UserLoginRecord>> match, Context ctx, Collector<UserLoginRecord> out) throws Exception {// 针对2.1 连续3次登录失败
//            final List<UserLoginRecord> records = match.get("start");
//            for(UserLoginRecord record : records){
//                out.collect(record);
//            }// 针对2.2 非连续3次登录失败final List<UserLoginRecord> records = match.get("three");for(UserLoginRecord record : records){out.collect(record);}}// processMarch}// MyProcessFunction
}

UserLoginRecord对象,如下:


public class UserLoginRecord {private String userId;private int loginRes; // 0-成功, 1-失败private long loginTime;public UserLoginRecord() {}public UserLoginRecord(String userId, int loginRes, long loginTime) {this.userId = userId;this.loginRes = loginRes;this.loginTime = loginTime;}@Overridepublic String toString() {return "UserLoginRecord{" +"userId='" + userId + '\'' +", loginRes=" + loginRes +", loginTime=" + loginTime +'}';}public String getUserId() {return userId;}public void setUserId(String userId) {this.userId = userId;}public int getLoginRes() {return loginRes;}public void setLoginRes(int loginRes) {this.loginRes = loginRes;}public long getLoginTime() {return loginTime;}public void setLoginTime(long loginTime) {this.loginTime = loginTime;}
}

2.4 nlk模拟socket服务端

在这里插入图片描述

2.5 IDEA控制台打印

在这里插入图片描述

相关文章:

Flink CEP实现10秒内连续登录失败用户分析

1、什么是CEP&#xff1f; Flink CEP即 Flink Complex Event Processing&#xff0c;是基于DataStream流式数据提供的一套复杂事件处理编程模型。你可以把他理解为基于无界流的一套正则匹配模型&#xff0c;即对于无界流中的各种数据(称为事件)&#xff0c;提供一种组合匹配的…...

QSqlRelationalTableModel 关系表格模型

一、 1.1 QSqlRelationalTableModel继承自QSqlTableModel&#xff0c;并且对其进行了扩展&#xff0c;提供了对外键的支持。一个外键就是一个表中的一个字段 和 其他表中的主键字段之间的一对一的映射。例如&#xff0c;“studInfo”表中的departID字段对应的是“departments…...

JS和CSS实现的原生轮播图

JSCSS实现滑动轮播图 使用JS加CSS来实现的幻灯片&#xff0c;主要使用的是CSS的transform属性中的translate来实现&#xff0c;适合与用户交互的轮播图&#xff0c;展现轮播图的数量&#xff0c;用户可自由进行选择。 <!DOCTYPE html> <html lang"en">&…...

【微服务】skywalking自定义链路追踪与日志采集

目录 一、前言 二、自定义链路追踪简介 2.1 自定义链路追踪应用场景 2.2 链路追踪几个关键概念 三、skywalking 自定义链路追踪实现 3.1 环境准备 3.2 集成过程 3.2.1 导入核心依赖 3.2.2 几个常用注解 3.2.3 方法集成 3.2.4 上报追踪信息 四、skywalking 自定义日志…...

MYSQL基础问题

一&#xff0e;DBMS 是什么 DBMS&#xff08;Database Management System&#xff09;,数据库管理系统&#xff0c;是一种操纵和管理 数据库的大型软件&#xff0c;用于建立、使用和维护数据库。对数据库进行统一的管理和 控制&#xff0c;以保证数据库的安全性和完整性。 二…...

SpringBoot使用Guava实现日志脱敏(含源码)

点击下载《SpringBoot使用Guava实现日志脱敏&#xff08;含源码&#xff09;》 1. 摘要 本文将介绍如何使用Google Guava库进行日志脱敏&#xff0c;保护敏感数据的安全。我们将详细解释脱敏的必要性&#xff0c;然后介绍如何使用Guava中的Strings、Maps和CharMatcher类来进行…...

数据结构—动态查找

动态查找介绍 1. 动态查找的引入&#xff1a;当查找表以线性表的形式组织时&#xff0c;若对查找表进行插入、删除或排序操作&#xff0c;就必须移动大量的记录&#xff0c;当记录数很多时&#xff0c;这种移动的代价很大。 2. 动态查找表的设计思想&#xff1a;表结构本身是…...

Tarjan算法学习笔记

目录 无向图的割点与桥 时间戳&#xff1a; 搜索树&#xff1a; 追溯值&#xff1a; 割边判定法则&#xff1a; 割点判定法则&#xff1a; 无向图的双连通分量 定理&#xff1a; 边双连通分量(e-DCC)的求法&#xff1a; e-DCC的缩点&#xff1a; 有向图的连通性 追…...

vue 项目涉及的焦点聚焦、格式化日期、判断是否为对象或数组、判断是否为空、深拷贝、节流、防抖

焦点聚焦 import Vue from vue // 插件对象(必须有 install 方法, 才可以注入到 Vue.use 中) export default {install () {Vue.directive(fofo, {inserted (el) {el el.querySelector(input)el.focus()}})} }格式化日期格式 export const formatDate (time) > {// 将xx…...

软件工程知识梳理6-运行和维护

软件维护需要的工作量很大&#xff0c;大型软件的维护成本高达开发成本的4倍左右。所以&#xff0c;软件工程的主要目的就是要提高软件的可维护性&#xff0c;减少软件维护所需要的工作量&#xff0c;降低软件系统的总成本。 定义&#xff1a;软件已经交付使用之后&#xff0c;…...

docker- php7.4

安装 gd拓展 anzhuanga在Dockerfile里面安装php7.4的GD库 - 知乎 apt update apt install -y libwebp-dev libjpeg-dev libpng-dev libfreetype6-devdocker-php-source extractdocker-php-ext-configure gd \ --with-jpeg/usr/include \ --with-freetype/usr/include/docker-…...

开发一个Android App,在项目中完成添加联系人的功能,通过ContentResolver向系统中添加联系人信息。

实现步骤&#xff1a; &#xff08;1&#xff09;添加动态联系人的权限。 &#xff08;2&#xff09;创建Activity和布局文件&#xff0c;添加输入框和按钮等控件。 &#xff08;3&#xff09;完成添加联系人的功能。 代码文件如下&#xff1a; activity_main.xml文件 <!…...

Flume搭建

压缩包版本&#xff1a;apache-flume-1.9.0-bin.tar 百度盘链接&#xff1a;https://pan.baidu.com/s/1ZhSiePUye9ax7TW5XbfWdw 提取码&#xff1a;ieks 1.解压 tar -zxvf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/ 2. 修改文件名 [rootbigdata1 opt]…...

Web APIs 1 DOM操作

Web APIs 1 引入&#xff1a;const优先Web API 基本认知01 作用和分类02 什么是DOM03 DOM树04 DOM对象 获取DOM对象01 根据CSS选择器获取02 其他获取DOM元素方法 操作元素内容01 innerText 属性02 innerHTML 属性 操作元素属性操作元素的常用属性操作元素的样式属性操作表单元素…...

dvwa,xss反射型lowmedium

xss&#xff0c;反射型&#xff0c;low&&medium low发现xss本地搭建实操 medium作为初学者的我第一次接触比较浅的绕过思路high low 发现xss 本关无过滤 <script>alert(/xss/)</script> //或 <script>confirm(/xss/)</script> //或 <scr…...

从云计算到物联网:虚拟化技术的演变与嵌入式系统的融合

文章目录 一、硬件性能提升&#xff1a;摩尔定律与嵌入式虚拟化二、CPU多核技术&#xff1a;为嵌入式虚拟化提供支持三、业务负载整合&#xff1a;嵌入式虚拟化的核心需求四、降低硬件成本&#xff1a;虚拟化技术的经济效益五、软件重用与移植&#xff1a;虚拟化技术的优势六、…...

linux 文件查看 head 、 cat 、 less 、tail 、grep

查看文件详细信息 stat 文件 cat 》》适合显示小文件【行数比较少】&#xff0c;如果行数较多&#xff0c;屏幕显示不完整&#xff08;如果虚拟操作&#xff0c;是无法上下键的&#xff0c;或者滚动鼠标的&#xff0c;第三方 xsheel&#xff0c;crt 可以方向键查看&#xf…...

13.2 Web与Servlet进阶(❤❤)

13.2 Web与Servlet进阶 1. 请求与响应1.1 URL与URI1.2 HTTP请求的结构1. 结构2.后端获取访问工具类型:getHeader().toLowerCase方法1.3 响应的结构1. 结构2. 响应常见状态码3. 后端设置响应参数4. 响应的ContentType作用1.4 请求转发与响应重定向应用1. 请求转发:getRequestDis…...

记录解决报错--vue前后端分离,接口401(Unauthorized)

1.场景 前端访问不了后端接口。报错401。 2.解决步骤 ①在页面console.log(111)查看走到代码的位置没有。&#xff08;走到了&#xff0c;没问题&#xff09; ②查看vue.config.js配置。这段配置就是vue访问api的url。&#xff08;没问题&#xff09; devServer: {port: 80…...

【笔记】Android 常用编译模块和输出产物路径

模块&产物路径 具体编译到软件的路径要看编译规则的分区&#xff0c;代码中模块编译输出的产物基本对应。 Android 代码模块 编译产物路径设备adb路径Comment 模块device/mediatek/system/common/ 资源overlay/telephony/frameworks/base/core 文件举例res/res/values-m…...

第一篇:Agent2Agent (A2A) 协议——协作式人工智能的黎明

AI 领域的快速发展正在催生一个新时代&#xff0c;智能代理&#xff08;agents&#xff09;不再是孤立的个体&#xff0c;而是能够像一个数字团队一样协作。然而&#xff0c;当前 AI 生态系统的碎片化阻碍了这一愿景的实现&#xff0c;导致了“AI 巴别塔问题”——不同代理之间…...

vue3 定时器-定义全局方法 vue+ts

1.创建ts文件 路径&#xff1a;src/utils/timer.ts 完整代码&#xff1a; import { onUnmounted } from vuetype TimerCallback (...args: any[]) > voidexport function useGlobalTimer() {const timers: Map<number, NodeJS.Timeout> new Map()// 创建定时器con…...

NXP S32K146 T-Box 携手 SD NAND(贴片式TF卡):驱动汽车智能革新的黄金组合

在汽车智能化的汹涌浪潮中&#xff0c;车辆不再仅仅是传统的交通工具&#xff0c;而是逐步演变为高度智能的移动终端。这一转变的核心支撑&#xff0c;来自于车内关键技术的深度融合与协同创新。车载远程信息处理盒&#xff08;T-Box&#xff09;方案&#xff1a;NXP S32K146 与…...

【笔记】WSL 中 Rust 安装与测试完整记录

#工作记录 WSL 中 Rust 安装与测试完整记录 1. 运行环境 系统&#xff1a;Ubuntu 24.04 LTS (WSL2)架构&#xff1a;x86_64 (GNU/Linux)Rust 版本&#xff1a;rustc 1.87.0 (2025-05-09)Cargo 版本&#xff1a;cargo 1.87.0 (2025-05-06) 2. 安装 Rust 2.1 使用 Rust 官方安…...

MySQL 索引底层结构揭秘:B-Tree 与 B+Tree 的区别与应用

文章目录 一、背景知识&#xff1a;什么是 B-Tree 和 BTree&#xff1f; B-Tree&#xff08;平衡多路查找树&#xff09; BTree&#xff08;B-Tree 的变种&#xff09; 二、结构对比&#xff1a;一张图看懂 三、为什么 MySQL InnoDB 选择 BTree&#xff1f; 1. 范围查询更快 2…...

Axure 下拉框联动

实现选省、选完省之后选对应省份下的市区...

OCR MLLM Evaluation

为什么需要评测体系&#xff1f;——背景与矛盾 ​​ 能干的事&#xff1a;​​ 看清楚发票、身份证上的字&#xff08;准确率>90%&#xff09;&#xff0c;速度飞快&#xff08;眨眼间完成&#xff09;。​​干不了的事&#xff1a;​​ 碰到复杂表格&#xff08;合并单元…...

Redis上篇--知识点总结

Redis上篇–解析 本文大部分知识整理自网上&#xff0c;在正文结束后都会附上参考地址。如果想要深入或者详细学习可以通过文末链接跳转学习。 1. 基本介绍 Redis 是一个开源的、高性能的 内存键值数据库&#xff0c;Redis 的键值对中的 key 就是字符串对象&#xff0c;而 val…...

el-amap-bezier-curve运用及线弧度设置

文章目录 简介示例线弧度属性主要弧度相关属性其他相关样式属性完整示例链接简介 ‌el-amap-bezier-curve 是 Vue-Amap 组件库中的一个组件,用于在 高德地图 上绘制贝塞尔曲线。‌ 基本用法属性path定义曲线的路径,可以是多个弧线段的组合。stroke-weight线条的宽度。stroke…...

使用 uv 工具快速部署并管理 vLLM 推理环境

uv&#xff1a;现代 Python 项目管理的高效助手 uv&#xff1a;Rust 驱动的 Python 包管理新时代 在部署大语言模型&#xff08;LLM&#xff09;推理服务时&#xff0c;vLLM 是一个备受关注的方案&#xff0c;具备高吞吐、低延迟和对 OpenAI API 的良好兼容性。为了提高部署效…...