当前位置: 首页 > news >正文

Flink CEP实现10秒内连续登录失败用户分析

1、什么是CEP?

Flink CEP即 Flink Complex Event Processing,是基于DataStream流式数据提供的一套复杂事件处理编程模型。你可以把他理解为基于无界流的一套正则匹配模型,即对于无界流中的各种数据(称为事件),提供一种组合匹配的功能。

在这里插入图片描述
上图中,以不同形状代表一个DataStream中不同属性的事件。以一个圆圈和一个三角组成一个Pattern后,就可以快速过滤出原来的DataStream中符合规律的数据。举个例子,比如很多网站需要对恶意登录的用户进行屏蔽,如果用户连续三次输入错误的密码,那就要锁定当前用户。在这个场景下,所有用户的登录行为就构成了一个无界的数据流DataStream。而连续三次登录失败就是一个匹配模型Pattern。CEP编程模型的功能就是从用户登录行为这个无界数据流DataStream中,找出符合这个匹配模Pattern的所有数据。这种场景下,使用我们前面介绍的各种DataStream API其实也是可以实现的,不过相对就麻烦很多。而CEP编程模型则提供了非常简单灵活的功能实现方式。

2、代码实现

2.1 引入maven依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.roy</groupId><artifactId>FlinkDemo</artifactId><version>1.0</version><properties><flink.version>1.12.5</flink.version><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><log4j.version>2.12.1</log4j.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency><!-- CEP主要是下面这个依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop-2-uber</artifactId><version>2.8.3-10.0</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.14</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.1.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build></project>

2.2 基本流程

//1、获取原始事件流
DataStream<Event> input = ......; 
//2、定义匹配器
Pattern<Event,?> pattern = .......; 
//3、获取匹配流
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
//4、将匹配流中的数据处理形成结果数据流
DataStream<Result> resultStream = patternStream.process(new PatternProcessFunction<Event, Result>() {@Overridepublic void processMatch(Map<String, List<Event>> pattern,Context ctx,Collector<Result> out) throws Exception {}
});

2.3 完整代码

注意:代码运行前,先启动2.4 nlk socket服务

package com.roy.flink.project.userlogin;import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.List;
import java.util.Map;/*** @desc 十秒内连续登录失败的用户分析。使用Flink CEP进行快速模式匹配*/
public class MyUserLoginAna {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// //BoundedOutOfOrdernessWatermarks定时提交Watermark的间隔env.getConfig().setAutoWatermarkInterval(1000L);// 使用Socket测试env.setParallelism(1);// 1、获取原始事件流(10.86.97.206改为实际地址)final DataStreamSource<String> dataStreamSource = env.socketTextStream("10.86.97.206",7777);final SingleOutputStreamOperator<UserLoginRecord> userLoginRecordStream = dataStreamSource.map(new MapFunction<String, UserLoginRecord>() {@Overridepublic UserLoginRecord map(String s) throws Exception {final String[] splitVal = s.split(",");return new UserLoginRecord(splitVal[0], Integer.parseInt(splitVal[1]), Long.parseLong(splitVal[2]));}}).assignTimestampsAndWatermarks(WatermarkStrategy.<UserLoginRecord>forBoundedOutOfOrderness(Duration.ofSeconds(1))// 主要针对乱序流,由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间.withTimestampAssigner((SerializableTimestampAssigner<UserLoginRecord>) (element, recordTimestamp) -> element.getLoginTime()));// 2、定义匹配器// 2.1:10秒内出现3次登录失败的记录(不一定连续)// Flink CEP定义消息匹配器。
//        final Pattern<UserLoginRecord, UserLoginRecord> pattern = Pattern.<UserLoginRecord>begin("start").where(new SimpleCondition<UserLoginRecord>() {
//            @Override
//            public boolean filter(UserLoginRecord userLoginRecord) throws Exception {
//                return 1 == userLoginRecord.getLoginRes();
//            }
//        }).times(3).within(Time.seconds(10));// 2.2:连续三次登录失败。next表示连续匹配。 不连续匹配使用followByfinal Pattern<UserLoginRecord, UserLoginRecord> pattern = Pattern.<UserLoginRecord>begin("one").where(new SimpleCondition<UserLoginRecord>() {@Overridepublic boolean filter(UserLoginRecord value) throws Exception {return 1 == value.getLoginRes();}}).next("two").where(new SimpleCondition<UserLoginRecord>() {@Overridepublic boolean filter(UserLoginRecord value) throws Exception {return 1 == value.getLoginRes();}}).next("three").where(new SimpleCondition<UserLoginRecord>() {@Overridepublic boolean filter(UserLoginRecord value) throws Exception {return 1 == value.getLoginRes();}}).within(Time.seconds(10));// 3、获取匹配流final PatternStream<UserLoginRecord> badUser = CEP.pattern(userLoginRecordStream, pattern);final MyProcessFunction myProcessFunction = new MyProcessFunction();// 4、将匹配流中的数据处理成结果数据流final SingleOutputStreamOperator<UserLoginRecord> badUserStream = badUser.process(myProcessFunction);badUserStream.print("badUser");env.execute("UserLoginAna");}// mainpublic static class MyProcessFunction extends PatternProcessFunction<UserLoginRecord,UserLoginRecord>{@Overridepublic void processMatch(Map<String, List<UserLoginRecord>> match, Context ctx, Collector<UserLoginRecord> out) throws Exception {// 针对2.1 连续3次登录失败
//            final List<UserLoginRecord> records = match.get("start");
//            for(UserLoginRecord record : records){
//                out.collect(record);
//            }// 针对2.2 非连续3次登录失败final List<UserLoginRecord> records = match.get("three");for(UserLoginRecord record : records){out.collect(record);}}// processMarch}// MyProcessFunction
}

UserLoginRecord对象,如下:


public class UserLoginRecord {private String userId;private int loginRes; // 0-成功, 1-失败private long loginTime;public UserLoginRecord() {}public UserLoginRecord(String userId, int loginRes, long loginTime) {this.userId = userId;this.loginRes = loginRes;this.loginTime = loginTime;}@Overridepublic String toString() {return "UserLoginRecord{" +"userId='" + userId + '\'' +", loginRes=" + loginRes +", loginTime=" + loginTime +'}';}public String getUserId() {return userId;}public void setUserId(String userId) {this.userId = userId;}public int getLoginRes() {return loginRes;}public void setLoginRes(int loginRes) {this.loginRes = loginRes;}public long getLoginTime() {return loginTime;}public void setLoginTime(long loginTime) {this.loginTime = loginTime;}
}

2.4 nlk模拟socket服务端

在这里插入图片描述

2.5 IDEA控制台打印

在这里插入图片描述

相关文章:

Flink CEP实现10秒内连续登录失败用户分析

1、什么是CEP&#xff1f; Flink CEP即 Flink Complex Event Processing&#xff0c;是基于DataStream流式数据提供的一套复杂事件处理编程模型。你可以把他理解为基于无界流的一套正则匹配模型&#xff0c;即对于无界流中的各种数据(称为事件)&#xff0c;提供一种组合匹配的…...

QSqlRelationalTableModel 关系表格模型

一、 1.1 QSqlRelationalTableModel继承自QSqlTableModel&#xff0c;并且对其进行了扩展&#xff0c;提供了对外键的支持。一个外键就是一个表中的一个字段 和 其他表中的主键字段之间的一对一的映射。例如&#xff0c;“studInfo”表中的departID字段对应的是“departments…...

JS和CSS实现的原生轮播图

JSCSS实现滑动轮播图 使用JS加CSS来实现的幻灯片&#xff0c;主要使用的是CSS的transform属性中的translate来实现&#xff0c;适合与用户交互的轮播图&#xff0c;展现轮播图的数量&#xff0c;用户可自由进行选择。 <!DOCTYPE html> <html lang"en">&…...

【微服务】skywalking自定义链路追踪与日志采集

目录 一、前言 二、自定义链路追踪简介 2.1 自定义链路追踪应用场景 2.2 链路追踪几个关键概念 三、skywalking 自定义链路追踪实现 3.1 环境准备 3.2 集成过程 3.2.1 导入核心依赖 3.2.2 几个常用注解 3.2.3 方法集成 3.2.4 上报追踪信息 四、skywalking 自定义日志…...

MYSQL基础问题

一&#xff0e;DBMS 是什么 DBMS&#xff08;Database Management System&#xff09;,数据库管理系统&#xff0c;是一种操纵和管理 数据库的大型软件&#xff0c;用于建立、使用和维护数据库。对数据库进行统一的管理和 控制&#xff0c;以保证数据库的安全性和完整性。 二…...

SpringBoot使用Guava实现日志脱敏(含源码)

点击下载《SpringBoot使用Guava实现日志脱敏&#xff08;含源码&#xff09;》 1. 摘要 本文将介绍如何使用Google Guava库进行日志脱敏&#xff0c;保护敏感数据的安全。我们将详细解释脱敏的必要性&#xff0c;然后介绍如何使用Guava中的Strings、Maps和CharMatcher类来进行…...

数据结构—动态查找

动态查找介绍 1. 动态查找的引入&#xff1a;当查找表以线性表的形式组织时&#xff0c;若对查找表进行插入、删除或排序操作&#xff0c;就必须移动大量的记录&#xff0c;当记录数很多时&#xff0c;这种移动的代价很大。 2. 动态查找表的设计思想&#xff1a;表结构本身是…...

Tarjan算法学习笔记

目录 无向图的割点与桥 时间戳&#xff1a; 搜索树&#xff1a; 追溯值&#xff1a; 割边判定法则&#xff1a; 割点判定法则&#xff1a; 无向图的双连通分量 定理&#xff1a; 边双连通分量(e-DCC)的求法&#xff1a; e-DCC的缩点&#xff1a; 有向图的连通性 追…...

vue 项目涉及的焦点聚焦、格式化日期、判断是否为对象或数组、判断是否为空、深拷贝、节流、防抖

焦点聚焦 import Vue from vue // 插件对象(必须有 install 方法, 才可以注入到 Vue.use 中) export default {install () {Vue.directive(fofo, {inserted (el) {el el.querySelector(input)el.focus()}})} }格式化日期格式 export const formatDate (time) > {// 将xx…...

软件工程知识梳理6-运行和维护

软件维护需要的工作量很大&#xff0c;大型软件的维护成本高达开发成本的4倍左右。所以&#xff0c;软件工程的主要目的就是要提高软件的可维护性&#xff0c;减少软件维护所需要的工作量&#xff0c;降低软件系统的总成本。 定义&#xff1a;软件已经交付使用之后&#xff0c;…...

docker- php7.4

安装 gd拓展 anzhuanga在Dockerfile里面安装php7.4的GD库 - 知乎 apt update apt install -y libwebp-dev libjpeg-dev libpng-dev libfreetype6-devdocker-php-source extractdocker-php-ext-configure gd \ --with-jpeg/usr/include \ --with-freetype/usr/include/docker-…...

开发一个Android App,在项目中完成添加联系人的功能,通过ContentResolver向系统中添加联系人信息。

实现步骤&#xff1a; &#xff08;1&#xff09;添加动态联系人的权限。 &#xff08;2&#xff09;创建Activity和布局文件&#xff0c;添加输入框和按钮等控件。 &#xff08;3&#xff09;完成添加联系人的功能。 代码文件如下&#xff1a; activity_main.xml文件 <!…...

Flume搭建

压缩包版本&#xff1a;apache-flume-1.9.0-bin.tar 百度盘链接&#xff1a;https://pan.baidu.com/s/1ZhSiePUye9ax7TW5XbfWdw 提取码&#xff1a;ieks 1.解压 tar -zxvf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/ 2. 修改文件名 [rootbigdata1 opt]…...

Web APIs 1 DOM操作

Web APIs 1 引入&#xff1a;const优先Web API 基本认知01 作用和分类02 什么是DOM03 DOM树04 DOM对象 获取DOM对象01 根据CSS选择器获取02 其他获取DOM元素方法 操作元素内容01 innerText 属性02 innerHTML 属性 操作元素属性操作元素的常用属性操作元素的样式属性操作表单元素…...

dvwa,xss反射型lowmedium

xss&#xff0c;反射型&#xff0c;low&&medium low发现xss本地搭建实操 medium作为初学者的我第一次接触比较浅的绕过思路high low 发现xss 本关无过滤 <script>alert(/xss/)</script> //或 <script>confirm(/xss/)</script> //或 <scr…...

从云计算到物联网:虚拟化技术的演变与嵌入式系统的融合

文章目录 一、硬件性能提升&#xff1a;摩尔定律与嵌入式虚拟化二、CPU多核技术&#xff1a;为嵌入式虚拟化提供支持三、业务负载整合&#xff1a;嵌入式虚拟化的核心需求四、降低硬件成本&#xff1a;虚拟化技术的经济效益五、软件重用与移植&#xff1a;虚拟化技术的优势六、…...

linux 文件查看 head 、 cat 、 less 、tail 、grep

查看文件详细信息 stat 文件 cat 》》适合显示小文件【行数比较少】&#xff0c;如果行数较多&#xff0c;屏幕显示不完整&#xff08;如果虚拟操作&#xff0c;是无法上下键的&#xff0c;或者滚动鼠标的&#xff0c;第三方 xsheel&#xff0c;crt 可以方向键查看&#xf…...

13.2 Web与Servlet进阶(❤❤)

13.2 Web与Servlet进阶 1. 请求与响应1.1 URL与URI1.2 HTTP请求的结构1. 结构2.后端获取访问工具类型:getHeader().toLowerCase方法1.3 响应的结构1. 结构2. 响应常见状态码3. 后端设置响应参数4. 响应的ContentType作用1.4 请求转发与响应重定向应用1. 请求转发:getRequestDis…...

记录解决报错--vue前后端分离,接口401(Unauthorized)

1.场景 前端访问不了后端接口。报错401。 2.解决步骤 ①在页面console.log(111)查看走到代码的位置没有。&#xff08;走到了&#xff0c;没问题&#xff09; ②查看vue.config.js配置。这段配置就是vue访问api的url。&#xff08;没问题&#xff09; devServer: {port: 80…...

【笔记】Android 常用编译模块和输出产物路径

模块&产物路径 具体编译到软件的路径要看编译规则的分区&#xff0c;代码中模块编译输出的产物基本对应。 Android 代码模块 编译产物路径设备adb路径Comment 模块device/mediatek/system/common/ 资源overlay/telephony/frameworks/base/core 文件举例res/res/values-m…...

龙虎榜——20250610

上证指数放量收阴线&#xff0c;个股多数下跌&#xff0c;盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型&#xff0c;指数短线有调整的需求&#xff0c;大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的&#xff1a;御银股份、雄帝科技 驱动…...

深度学习在微纳光子学中的应用

深度学习在微纳光子学中的主要应用方向 深度学习与微纳光子学的结合主要集中在以下几个方向&#xff1a; 逆向设计 通过神经网络快速预测微纳结构的光学响应&#xff0c;替代传统耗时的数值模拟方法。例如设计超表面、光子晶体等结构。 特征提取与优化 从复杂的光学数据中自…...

零门槛NAS搭建:WinNAS如何让普通电脑秒变私有云?

一、核心优势&#xff1a;专为Windows用户设计的极简NAS WinNAS由深圳耘想存储科技开发&#xff0c;是一款收费低廉但功能全面的Windows NAS工具&#xff0c;主打“无学习成本部署” 。与其他NAS软件相比&#xff0c;其优势在于&#xff1a; 无需硬件改造&#xff1a;将任意W…...

可靠性+灵活性:电力载波技术在楼宇自控中的核心价值

可靠性灵活性&#xff1a;电力载波技术在楼宇自控中的核心价值 在智能楼宇的自动化控制中&#xff0c;电力载波技术&#xff08;PLC&#xff09;凭借其独特的优势&#xff0c;正成为构建高效、稳定、灵活系统的核心解决方案。它利用现有电力线路传输数据&#xff0c;无需额外布…...

Leetcode 3577. Count the Number of Computer Unlocking Permutations

Leetcode 3577. Count the Number of Computer Unlocking Permutations 1. 解题思路2. 代码实现 题目链接&#xff1a;3577. Count the Number of Computer Unlocking Permutations 1. 解题思路 这一题其实就是一个脑筋急转弯&#xff0c;要想要能够将所有的电脑解锁&#x…...

Module Federation 和 Native Federation 的比较

前言 Module Federation 是 Webpack 5 引入的微前端架构方案&#xff0c;允许不同独立构建的应用在运行时动态共享模块。 Native Federation 是 Angular 官方基于 Module Federation 理念实现的专为 Angular 优化的微前端方案。 概念解析 Module Federation (模块联邦) Modul…...

鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个生活电费的缴纳和查询小程序

一、项目初始化与配置 1. 创建项目 ohpm init harmony/utility-payment-app 2. 配置权限 // module.json5 {"requestPermissions": [{"name": "ohos.permission.INTERNET"},{"name": "ohos.permission.GET_NETWORK_INFO"…...

BCS 2025|百度副总裁陈洋:智能体在安全领域的应用实践

6月5日&#xff0c;2025全球数字经济大会数字安全主论坛暨北京网络安全大会在国家会议中心隆重开幕。百度副总裁陈洋受邀出席&#xff0c;并作《智能体在安全领域的应用实践》主题演讲&#xff0c;分享了在智能体在安全领域的突破性实践。他指出&#xff0c;百度通过将安全能力…...

多模态大语言模型arxiv论文略读(108)

CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文标题&#xff1a;CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文作者&#xff1a;Sayna Ebrahimi, Sercan O. Arik, Tejas Nama, Tomas Pfister ➡️ 研究机构: Google Cloud AI Re…...

安卓基础(aar)

重新设置java21的环境&#xff0c;临时设置 $env:JAVA_HOME "D:\Android Studio\jbr" 查看当前环境变量 JAVA_HOME 的值 echo $env:JAVA_HOME 构建ARR文件 ./gradlew :private-lib:assembleRelease 目录是这样的&#xff1a; MyApp/ ├── app/ …...