当前位置: 首页 > article >正文

基于Flink SQL实现7天用户行为风险识别,结合滚动窗口预聚合与CEP复杂事件处理技术,根据用户7天的动作,包括交易,支付,评价等行为,识别用户的风险等级

一、数据建模与预聚合

1. 数据源定义
CREATE TABLE user_actions (user_id STRING,event_time TIMESTAMP(3),action_type STRING, -- 交易/支付/评价amount DOUBLE,status STRING,      -- 交易状态(成功/失败)review_score INT,   -- 评价分数(1-5分)WATERMARK FOR event_time AS event_time - INTERVAL '5' MINUTE
) WITH ('connector' = 'kafka','topic' = 'user_behavior','format' = 'json'
);
2. 日维度滚动窗口预聚合
CREATE VIEW daily_metrics AS
SELECT user_id,TUMBLE_START(event_time, INTERVAL '1' DAY) AS window_start,COUNT_IF(action_type = 'transaction' AND status = 'failed') AS daily_failed_trans,SUM_IF(amount, action_type = 'payment' AND amount > 10000) AS daily_high_payment,COUNT_IF(action_type = 'review' AND review_score <= 2) AS daily_negative_review
FROM user_actions
GROUP BY user_id, TUMBLE(event_time, INTERVAL '1' DAY); -- 按日滚动窗口聚合

关键优化

  • 使用COUNT_IF/SUM_IF过滤无效数据,减少后续处理量
  • 预聚合结果写入Redis/HBase,支持快速合并计算 

二、CEP规则定义(7天风险模式检测)

1. CEP模式语法
SELECT *
FROM daily_metrics
MATCH_RECOGNIZE (PARTITION BY user_idORDER BY window_startMEASURESSUM(A.daily_failed_trans) AS total_failed,SUM(B.daily_high_payment) AS total_high_payment,LAST(C.daily_negative_review) AS last_negative_review,CASE WHEN SUM(A.daily_failed_trans) >=1 AND SUM(B.daily_high_payment) >=1 AND LAST(C.daily_negative_review) >=1 THEN 'HIGH'ELSE 'LOW'END AS risk_levelPATTERN (A+ B+ C) WITHIN INTERVAL '7' DAY  -- 7天内模式匹配DEFINEA AS daily_failed_trans >= 1,    -- 至少1次失败交易B AS daily_high_payment >= 1,    -- 至少1次大额支付(金额>1万)C AS daily_negative_review >= 1  -- 至少1次差评(评分≤2)
);

模式详解

  • A+:匹配连续多日(≥1天)的失败交易
  • B+:匹配连续多日(≥1天)的大额支付
  • C:匹配最后1次差评事件
  • WITHIN限制整体时间窗口为7天 
2. 动态规则管理
-- 外部规则表(MySQL)
CREATE TABLE risk_rules (rule_id STRING,condition STRING, -- 如 'total_failed>=1 AND total_high_payment>=1'risk_level STRING,PRIMARY KEY (rule_id)
) WITH ('connector'='jdbc', ... );-- 动态关联规则
SELECT r.risk_level, c.* 
FROM cep_results c
JOIN risk_rules FOR SYSTEM_TIME AS OF c.window_start AS r
ON c.risk_condition = r.condition;

优势

  • 规则热更新:修改MySQL规则后,通过PatternProcessorDiscoverer动态加载 
  • 支持多级风险(如增加MEDIUM级别)

三、性能优化策略

1. 状态管理
  • 窗口状态TTL:设置14天过期(2倍窗口周期)
  • RocksDB状态后端:支持TB级状态存储 
  • 增量检查点:减少Checkpoint数据量 
2. 计算优化
  • Local-Global聚合:先本地预聚合再全局合并 
  • 水位线对齐:配置table.exec.source.idle-timeout防止窗口卡住 

四、风险处置联动

1. 告警输出
INSERT INTO risk_alert
SELECT user_id, risk_level,PROCTIME() AS alert_time 
FROM cep_results 
WHERE risk_level = 'HIGH';
2. 实时阻断
// 自定义UDF调用风控API
@FunctionHint(output = @DataTypeHint("BOOLEAN"))
public class BlockUserFunction extends ScalarFunction {public boolean eval(String userId) {return RiskService.block(userId); // 调用外部风控系统}
}

五、案例验证

测试数据示例

user_id日期失败交易大额支付差评
U0012025-02-16100
U0012025-02-18010
U0012025-02-20001

输出结果

user_id: U001, risk_level: HIGH 
window_start: 2025-02-16, window_end: 2025-02-23

总结

该方案通过FlinkSQL实现特征矩阵实时计算CEP动态规则引擎结合,解决了传统风控模型规则更新滞后的问题。关键技术点包括:

  1. 时态表关联(Temporal Table Join)实现实时-维度数据融合
  2. MATCH_RECOGNIZE语法定义复杂事件模式 
  3. 动态规则加载避免作业重启[[2][5]]

落地时可参考电商/金融行业案例,通过AB测试验证规则有效性(如误报率降低30%+)

相关文章:

基于Flink SQL实现7天用户行为风险识别,结合滚动窗口预聚合与CEP复杂事件处理技术,根据用户7天的动作,包括交易,支付,评价等行为,识别用户的风险等级

一、数据建模与预聚合 1. 数据源定义 CREATE TABLE user_actions (user_id STRING,event_time TIMESTAMP(3),action_type STRING, -- 交易/支付/评价amount DOUBLE,status STRING, -- 交易状态&#xff08;成功/失败&#xff09;review_score INT, -- 评价分数&#x…...

【无标题】网络安全公钥密码体制

第一节 网络安全 概述 一、基本概念 网络安全通信所需要的基本属性“ 机密性&#xff1b;消息完整性&#xff1b;可访问性与可用性&#xff1b;身份认证。 二、网络安全威胁 窃听&#xff1b;插入&#xff1b;假冒&#xff1b;劫持&#xff1b;拒绝服务Dos和分布式拒绝服务…...

【含开题报告+文档+PPT+源码】基于SpringBoot的进销存管理系统的设计与实现

开题报告 本文提出并研发了一款基于Spring Boot框架构建的进销存管理系统&#xff0c;该系统集成了全方位的企业运营管理功能&#xff0c;涵盖了用户登录验证、系统公告管理、员工信息与权限管理、物料全流程&#xff08;采购入库、销售出库、退货处理&#xff09;控制、部门组…...

Linux-SaltStack配置

文章目录 SaltStack配置 &#x1f3e1;作者主页&#xff1a;点击&#xff01; &#x1f916;Linux专栏&#xff1a;点击&#xff01; ⏰️创作时间&#xff1a;2025年02月24日20点51分 SaltStack配置 SaltStack 中既支持SSH协议也支持我们的一个客户端 #获取公钥&#xff08;…...

DeepSeek-OpenSourceWeek-第二天-DeepEP

DeepSeek正在进行“开源周”活动,在第二天推出了DeepEP,这是一个面向混合专家(MoE)模型训练和推理的开源通信库。DeepSeek此前的成果已令人印象深刻,此次开源关键组件,体现了其对透明度、社区合作以及推动AI进步的决心,通过5个代码库(已发布2个)来展示这一承诺。 第一…...

事务的4个特性和4个隔离级别

事务的4个特性和4个隔离级别 1. 什么是事务2. 事务的ACID特性2.1 原子性2.2 一致性2.3 持久性2.4 隔离性 3. 事务的创建4. 事务并发时出现的问题4.1 DIRTY READ 脏读4.2 NON - REPEATABLR READ 不可重复读4.3 PHANTOM READ 幻读 5. 事务的隔离级别5.1 READ UNCOMMITTED 读未提交…...

对计算机中缓存的理解和使用Redis作为缓存

使用Redis作为缓存缓存例子缓存的引入 Redis缓存的实现 使用Redis作为缓存 缓存 ​什么是缓存&#xff0c;第一次接触这个东西是在考研学习408的时候&#xff0c;计算机组成原理里面学习到Cache缓存&#xff0c;用于降低由于内存和CPU的速度的差异带来的延迟。它是在CPU和内存…...

vue3 下载文件 responseType-blob 或者 a标签

在 Vue 3 中&#xff0c;你可以使用 axios 或 fetch 来下载文件&#xff0c;并将 responseType 设置为 blob 以处理二进制数据。以下是一个使用 axios 的示例&#xff1a; 使用 axios 下载文件 首先&#xff0c;确保你已经安装了 axios&#xff1a; npm install axios然后在你…...

SOME/IP-SD -- 协议英文原文讲解5

前言 SOME/IP协议越来越多的用于汽车电子行业中&#xff0c;关于协议详细完全的中文资料却没有&#xff0c;所以我将结合工作经验并对照英文原版协议做一系列的文章。基本分三大块&#xff1a; 1. SOME/IP协议讲解 2. SOME/IP-SD协议讲解 3. python/C举例调试讲解 5.1.2.5 S…...

lowagie(itext)老版本手绘PDF,包含页码、水印、图片、复选框、复杂行列合并等。

入口类&#xff1a;exportPdf ​ package xcsy.qms.webapi.service;import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.alibaba.nacos.common.utils.StringUtils; import com.ibm.icu.text.RuleBasedNumberFormat; import com.lowa…...

MySQL undo log,redo log和bin log日志文件的生成时间点、层级归属、存储位置及生命周期详解

问题&#xff1a; 假如库名叫做A库&#xff0c;表名叫B表&#xff0c;undo log,redo log和bin log&#xff0c;这些日志文件的生成的时间点是什么&#xff1f;是在mysql的哪一层生成的&#xff1f;哪些文件是有buffer的&#xff1f;哪些日志文件是存在磁盘上的&#xff1f;哪些…...

吃一堑长一智

工作中经历&#xff0c;有感触记录下 故事一 以前在一家公司时&#xff0c;自己是一名开发人员&#xff0c;遇到问题请教领导解决方案&#xff0c;当时领导给了建议&#xff0c;后来上线后出问题了&#xff0c;背了锅。心里想的是领导说这样做的呀&#xff0c;为什么出问题还…...

DeepSeek基础之机器学习

文章目录 一、核心概念总结&#xff08;一&#xff09;机器学习基本定义&#xff08;二&#xff09;基本术语&#xff08;三&#xff09;假设空间&#xff08;四&#xff09;归纳偏好&#xff08;五&#xff09;“没有免费的午餐”定理&#xff08;NFL 定理&#xff09; 二、重…...

达梦有没有类似oerr的功能

在oracle 23ai的sqlplus中&#xff0c;直接看异常信息说明&#xff1a; 达梦没有此功能&#xff0c;但是可以造一个 cd /home/dmdba cat >err.sql<<eof set echo off set ver off set timing off set lineshow off set feedback off select * from V\$ERR_INFO wher…...

实战-网安

面试感受:网安公司前端实习 今天我有幸面试了一家网络安全公司的前端开发实习岗位,整个过程让我受益匪浅,也让我对未来的职业发展有了更清晰的认识。 首先,面试官非常专业且友好,整个面试氛围轻松但不失严谨。面试一开始,面试官简单介绍了公司背景和团队文化,让我对公…...

一文掌握Splash的详细使用

文章目录 1. 安装与启动 Splash1.1 使用 Docker 安装1.2 直接安装 2. 基本用法2.1 访问 Splash 界面2.2 使用 Splash 渲染页面2.3 使用 Lua 脚本 3. 高级用法3.1 处理 JavaScript3.2 截图与 PDF3.3 处理 AJAX 请求3.4 设置请求头3.5 处理 Cookies 4. 与 Scrapy 集成4.1 安装 Sc…...

从 Linux 服务器到前端到网关到后端业务逻辑的分析

前言 在现代 Web 应用程序的架构中&#xff0c;一个完整的请求处理流程涉及多个组件&#xff0c;涵盖了用户界面、服务器环境、网关层和后端业务逻辑。理解这一过程有助于优化系统性能、提高用户体验&#xff0c;并确保系统的可维护性和可扩展性。本文将详细分析从 Linux 服务…...

Java中的Stream API:从入门到实战

引言 在现代Java开发中&#xff0c;Stream API 是处理集合数据的强大工具。它不仅让代码更加简洁易读&#xff0c;还能通过并行处理提升性能。本文将带你从基础概念入手&#xff0c;逐步深入Stream API的使用&#xff0c;并通过实战案例展示其强大功能。 1. 什么是Stream API…...

【python随手记】——读取文本文件内容转换为json格式

文章目录 前言一、TXT文件转换为JSON数组1.txt文件内容2.python代码3.输出结果 二、TXT文件转换为JSON对象1.txt文件2.python代码3.输出结果 前言 场景&#xff1a;用于读取包含空格分隔数据的TXT文件&#xff0c;并将其转换为结构化JSON文件 一、TXT文件转换为JSON数组 1.tx…...

【蓝桥杯】第十五届省赛大学真题组真题解析

【蓝桥杯】第十五届省赛大学真题组真题解析 一、智能停车系统 1、知识点 &#xff08;1&#xff09;flex-wrap 控制子元素的换行方式 属性值有&#xff1a; no-wrap不换行wrap伸缩容器不够则自动往下换行wrap-reverse伸缩容器不够则自动往上换行 &#xff08;2&#xff0…...

MybatisPlus-扩展功能-枚举处理器

在Mybatis里有一个叫TypeHandler的类型处理器&#xff0c;我们常见的PO当中的这些成员变量的数据类型&#xff0c;它都有对应的处理器&#xff0c;因此它就能自动实现这些Java数据类型与数据库类型的相互转换。 它里面还有一个叫EnumOrdinalTypeHandler的枚举处理器&#xff0…...

力扣2454. 下一个更大元素 IV

力扣2454. 下一个更大元素 IV 题目 题目解析及思路 题目要求对于每个数&#xff0c;找到右边比它大的第二个数&#xff0c;并记录在ans数组中 如果是右边第一个大的&#xff0c;就用一个递减栈即可&#xff0c;栈顶元素如果<当前元素则弹出 第二个大数就要利用弹出的栈顶…...

unity学习51:所有UI的父物体:canvas画布

目录 1 下载资源 1.1 在window / Asset store下下载一套免费的UI资源 1.2 下载&#xff0c;导入import 1.3 导入后在 project / Asset下面可以看到 2 画布canvas&#xff0c;UI的父物体 2.1 创建canvas 2.1.1 画布的下面是 event system是UI相关的事件系统 2.2 canvas…...

Ollama部署与常用命令

Ollama是一款开源工具&#xff0c;其目标是简化大语言模型在本地环境的部署和使用。它支持多种流行的开源大语言模型&#xff0c;如 Llama 2、Qwen2.5等。 通过Ollama&#xff0c;用户无需具备深厚的技术背景&#xff0c;就能在普通的消费级硬件上快速搭建一个强大的语言处理环…...

Visual Studio Code 远程开发方法

方法1 共享屏幕远程控制&#xff0c;如 to desk, 向日葵 &#xff0c;像素太差&#xff0c;放弃 方法2 内网穿透 ssh 第二个方法又很麻烦&#xff0c;尤其是对于 windows 电脑&#xff0c;要使用 ssh 还需要额外安装杂七杂八的东西&#xff1b;并且内网穿透服务提供商提供的…...

C语言预编译

大家好&#xff0c;这里是小编的博客频道 小编的博客&#xff1a;就爱学编程 很高兴在CSDN这个大家庭与大家相识&#xff0c;希望能在这里与大家共同进步&#xff0c;共同收获更好的自己&#xff01;&#xff01;&#xff01; 本文目录 引言正文一、预处理的作用与流程&#xf…...

汽车智能制造企业数字化转型SAP解决方案总结

一、项目实施概述 项目阶段划分&#xff1a; 蓝图设计阶段主数据管理方案各模块蓝图设计方案下一阶段工作计划 关键里程碑&#xff1a; 2022年6月6日&#xff1a;项目启动会2022年12月1日&#xff1a;系统上线 二、总体目标 通过SAP实施&#xff0c;构建研产供销协同、业财一…...

flowable-ui 的会签功能实现

场景&#xff1a;在进行智慧保时通开发时&#xff0c;有个协作合同入围功能&#xff0c;这个功能的流程图里有个评审小组&#xff0c;这个评审小组就需要进行会签操作&#xff0c;会签完成后&#xff0c;需要依据是否有不通过的情况选择下一步走的流程 思考步骤&#xff1a; 首…...

Spring Boot 与 MyBatis 数据库操作

一、核心原理 Spring Boot 的自动配置 通过 mybatis-spring-boot-starter 自动配置 DataSource&#xff08;连接池&#xff09;、SqlSessionFactory 和 SqlSessionTemplate。 扫描 Mapper 接口或指定包路径&#xff0c;生成动态代理实现类。 MyBatis 的核心组件 SqlSessionF…...

大连指令数据集的创建--数据收集与预处理_02

1.去哪儿爬虫 编程语言&#xff1a;Python爬虫框架&#xff1a;Selenium&#xff08;用于浏览器自动化&#xff09;解析库&#xff1a;BeautifulSoup&#xff08;用于解析HTML&#xff09; 2.爬虫策略 目标网站&#xff1a;去哪儿&#xff08;https://travel.qunar.com/trav…...