【Flink银行反欺诈系统设计方案】1.短时间内多次大额交易场景的flink与cep的实现
【flink应用系列】1.Flink银行反欺诈系统设计方案
- 1. 经典案例:短时间内多次大额交易
- 1.1 场景描述
- 1.2 风险判定逻辑
- 2. 使用Flink实现
- 2.1 实现思路
- 2.2 代码实现
- 2.3 使用Flink流处理
- 3. 使用Flink CEP实现
- 3.1 实现思路
- 3.2 代码实现
- 4. 总结
1. 经典案例:短时间内多次大额交易
1.1 场景描述
规则1:单笔交易金额超过10,000元。
规则2:同一用户在10分钟内进行了3次或更多次交易。
风险行为:同时满足规则1和规则2的交易行为。
1.2 风险判定逻辑
检测每笔交易是否满足“单笔交易金额超过10,000元”。
对同一用户,统计10分钟内的交易次数。
如果交易次数达到3次或更多,则判定为风险行为。
2. 使用Flink实现
2.1 实现思路
使用Flink的KeyedStream按用户分组。
使用ProcessFunction实现自定义窗口逻辑,统计10分钟内的交易次数。
结合规则1和规则2,判断是否为风险行为。
2.2 代码实现
// 定义交易数据POJO
public class Transaction {private String transactionId;private String userId;private Double amount;private Long timestamp;// getters and setters
}// 定义风控结果POJO
public class RiskResult {private String userId;private String transactionId;private String riskLevel;private String actionTaken;private Long createTime;// getters and setters
}// 实现风控逻辑
public class FraudDetectionProcessFunction extends KeyedProcessFunction<String, Transaction, RiskResult> {private transient ValueState<Integer> transactionCountState;private transient ValueState<Long> timerState;@Overridepublic void open(Configuration parameters) {// 初始化状态ValueStateDescriptor<Integer> countDescriptor = new ValueStateDescriptor<>("transactionCount", Types.INT);transactionCountState = getRuntimeContext().getState(countDescriptor);ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>("timerState", Types.LONG);timerState = getRuntimeContext().getState(timerDescriptor);}@Overridepublic void processElement(Transaction transaction,Context ctx,Collector<RiskResult> out) throws Exception {// 规则1:单笔交易金额超过10,000元if (transaction.getAmount() > 10000) {// 更新交易次数Integer count = transactionCountState.value();if (count == null) {count = 0;}count += 1;transactionCountState.update(count);// 如果是第一次满足规则1,设置10分钟的定时器if (count == 1) {long timer = ctx.timestamp() + 10 * 60 * 1000; // 10分钟ctx.timerService().registerEventTimeTimer(timer);timerState.update(timer);}// 规则2:10分钟内交易次数达到3次if (count >= 3) {RiskResult result = new RiskResult();result.setUserId(transaction.getUserId());result.setTransactionId(transaction.getTransactionId());result.setRiskLevel("HIGH");result.setActionTaken("ALERT");result.setCreateTime(System.currentTimeMillis());out.collect(result);}}}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<RiskResult> out) throws Exception {// 定时器触发时,重置状态transactionCountState.clear();timerState.clear();}
}
2.3 使用Flink流处理
java
DataStream<Transaction> transactionStream = env.addSource(transactionSource);DataStream<RiskResult> riskResultStream = transactionStream.keyBy(Transaction::getUserId).process(new FraudDetectionProcessFunction());riskResultStream.addSink(new AlertSink());
3. 使用Flink CEP实现
Flink CEP(Complex Event Processing)是Flink提供的复杂事件处理库,适合处理基于时间序列的模式匹配。以下是使用Flink CEP实现上述风控规则的示例。
3.1 实现思路
定义模式:检测10分钟内3次或更多次大额交易。
使用Flink CEP的模式匹配功能,匹配符合条件的事件序列。
3.2 代码实现
java
// 定义交易数据POJO
public class Transaction {private String transactionId;private String userId;private Double amount;private Long timestamp;// getters and setters
}// 定义风控结果POJO
public class RiskResult {private String userId;private List<String> transactionIds;private String riskLevel;private String actionTaken;private Long createTime;// getters and setters
}// 实现风控逻辑
public class FraudDetectionCEP {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 交易数据流DataStream<Transaction> transactionStream = env.addSource(transactionSource).assignTimestampsAndWatermarks(WatermarkStrategy.<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((event, timestamp) -> event.getTimestamp()));// 按用户分组KeyedStream<Transaction, String> keyedStream = transactionStream.keyBy(Transaction::getUserId);// 定义CEP模式:10分钟内3次或更多次大额交易Pattern<Transaction, ?> pattern = Pattern.<Transaction>begin("first").where(new SimpleCondition<Transaction>() {@Overridepublic boolean filter(Transaction transaction) {return transaction.getAmount() > 10000;}}).next("second").where(new SimpleCondition<Transaction>() {@Overridepublic boolean filter(Transaction transaction) {return transaction.getAmount() > 10000;}}).next("third").where(new SimpleCondition<Transaction>() {@Overridepublic boolean filter(Transaction transaction) {return transaction.getAmount() > 10000;}}).within(Time.minutes(10));// 应用模式PatternStream<Transaction> patternStream = CEP.pattern(keyedStream, pattern);// 生成风控结果DataStream<RiskResult> riskResultStream = patternStream.process(new PatternProcessFunction<Transaction, RiskResult>() {@Overridepublic void processMatch(Map<String, List<Transaction>> match,Context ctx,Collector<RiskResult> out) throws Exception {RiskResult result = new RiskResult();result.setUserId(match.get("first").get(0).getUserId());result.setTransactionIds(match.values().stream().flatMap(List::stream).map(Transaction::getTransactionId).collect(Collectors.toList()));result.setRiskLevel("HIGH");result.setActionTaken("ALERT");result.setCreateTime(System.currentTimeMillis());out.collect(result);}});// 输出结果riskResultStream.addSink(new AlertSink());env.execute("Fraud Detection with Flink CEP");}
}
4. 总结
Flink实现:通过KeyedProcessFunction和状态管理实现多规则匹配。
Flink CEP实现:通过定义复杂事件模式,简化多规则匹配的逻辑。
适用场景:
Flink适合需要自定义逻辑的场景。
Flink CEP适合基于时间序列的模式匹配场景。
通过以上实现,可以高效检测银行交易中的风险行为,并根据需要扩展更多规则
相关文章:
【Flink银行反欺诈系统设计方案】1.短时间内多次大额交易场景的flink与cep的实现
【flink应用系列】1.Flink银行反欺诈系统设计方案 1. 经典案例:短时间内多次大额交易1.1 场景描述1.2 风险判定逻辑 2. 使用Flink实现2.1 实现思路2.2 代码实现2.3 使用Flink流处理 3. 使用Flink CEP实现3.1 实现思路3.2 代码实现 4. 总结 1. 经典案例:短…...
HashMap的table数组何时初始化?默认容量和扩容阈值是多少?
HashMap 的 table 数组何时初始化? 答案: table 数组在第一次调用 put() 方法时初始化。 为什么? HashMap 为了节省内存,采用了“懒加载”机制。即使用 new HashMap() 创建对象时,只是计算了参数(如容量、…...
基于CURL命令封装的JAVA通用HTTP工具
文章目录 一、简要概述二、封装过程1. 引入依赖2. 定义脚本执行类 三、单元测试四、其他资源 一、简要概述 在Linux中curl是一个利用URL规则在命令行下工作的文件传输工具,可以说是一款很强大的http命令行工具。它支持文件的上传和下载,是综合传输工具&…...
docker学习笔记(1)从安装docker到使用Portainer部署容器
docker学习笔记第一课 先交代背景 docker宿主机系统:阿里云ubuntu22.04 开发机系统:win11 docker镜像仓库:阿里云,此阿里云与宿主机系统没有关系,是阿里云提供的一个免费的docker仓库 代码托管平台:github&…...
数据集/API 笔记:新加坡PSI(空气污染指数)API
data.gov.sg 数据范围:2016年2月 - 2025年3月 1 获取API方式 curl --request GET \--url https://api-open.data.gov.sg/v2/real-time/api/psi 2 返回数据 API 的数据结构可以分为 3 大部分: 区域元数据(regionMetadata) →…...
计算机网络数据传输探秘:包裹如何在数字世界旅行?
计算机网络数据传输探秘:包裹如何在数字世界旅行? 一、从快递网络看数据传输本质 想象你网购了一件商品: 打包:商家用纸箱包装,贴上地址标签(数据封装)运输:包裹经过网点→分拣中心→运输车(网络节点与链路)签收:快递员核对信息后交付(数据校验与接收)数据的网络…...
笔记:代码随想录算法训练营day36:LeetCode1049. 最后一块石头的重量 II、494. 目标和、474.一和零
学习资料:代码随想录 1049.最后一块石头的重量II 力扣题目链接 思路:如何讲该问题转化为背包问题:还是对半分去碰,对半分去碰碰剩下的就是最小的。然后背包容量就是一半儿,物品重量等于物品价值等于stones[i] 和上…...
Bitmap -> Bitmap安卓设备上的显示和内存
Android 屏幕显示与 Bitmap 内存详解 前言 在 Android 开发中,理解屏幕显示单位和 Bitmap 内存占用是构建高效应用的基础。本文将详细介绍相关概念、计算公式及单位转换,并通过实例分析 Bitmap 在内存中的表现。 一、屏幕显示单位基础 1.1 基本单位及…...
QT study DAY2
作业 代码 Widget.h class Widget : public QWidget {Q_OBJECTpublic:Widget(QWidget *parent nullptr);~Widget();void save_data(const QString& filename,const QString& data); private slots:void on_lineEdit_textChanged(); //账户栏void on_l…...
QT-自定义参数设计框架软件
QT-自定义参数设计框架软件 Chapter1 QT-自定义参数设计框架软件前言一、演示效果二、使用步骤1.应用进行参数注册2.数据库操作单例对象3.参数操作单例对象 三、下载链接 Chapter2 Qt中管理配置参数(QSettings、单例模式)1 前言2 QSettings类ini文件写in…...
VUE集成Live2d
VUE集成Live2d 目前基于大模型,可以实现一个桌面的3D动画小人,个人猜测可以简介这个项目进行实现 1-参考网址 试了很多项目,只有这个项目直观的把问题说清楚了 Live2D Vue3技术应用:https://blog.csdn.net/hh1233321/article/details/1406947…...
【CPP面经】科大讯飞 腾讯后端开发面经分享
文章目录 C 面试问题整理基础问题简答1. 内存对齐2. this 指针3. 在成员函数中删除 this4. 引用占用内存吗?5. C 越界访问场景6. 进程通信方式7. 无锁队列实现8. ping 在哪一层?实现原理?9. HTTPS 流程10. GDB 使用及 CPU 高使用定位11. 智能…...
el-card 结合 el-descriptions 作为信息展示
记录下el-card 组合 el-descriptions 实现动态展示信息 文章结构 实现效果1. el-descriptions 组件使用1.1 结合v-for实现列表渲染1.2 解析 2. 自定义 el-descriptions 样式2.1 修改背景色、字体颜色2.2 调整字体大小2.3 解析 3. el-card 结合 el-descriptions 作为信息展示3.…...
GaussDB自带诊断工具实战指南
一、引言 GaussDB是一种分布式的关系型数据库。在数据库运维中,快速定位性能瓶颈、诊断故障是保障业务连续性的关键。GaussDB内置了多种诊断工具,结合日志分析、执行计划解析和实时监控功能,帮助开发者与运维人员高效解决问题。本文深入讲解…...
LeetCode 链表章节
简单 21. 合并两个有序链表 将两个升序链表合并为一个新的 升序 链表并返回。新链表是通过拼接给定的两个链表的所有节点组成的。 示例 1: 输入:l1 [1,2,4], l2 [1,3,4] 输出:[1,1,2,3,4,4]示例 2: 输入:l1 [], l2…...
SSL证书和HTTPS:全面解析它们的功能与重要性
每当我们在互联网上输入个人信息、进行在线交易时,背后是否有一个安全的保障?这时,SSL证书和HTTPS便扮演了至关重要的角色。本文将全面分析SSL证书和HTTPS的含义、功能、重要性以及它们在网络安全中的作用。 一、SSL证书的定义与基本概念 S…...
正交投影与内积空间:机器学习的几何基础
前言 本文隶属于专栏《机器学习数学通关指南》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见《机器学习数学通关指南》 正文 🔍 1. 内积空间的…...
Qt中txt文件输出为PDF格式
main.cpp PdfReportGenerator pdfReportGenerator;// 加载中文字体if (QFontDatabase::addApplicationFont(":/new/prefix1/simsun.ttf") -1) {QMessageBox::warning(nullptr, "警告", "无法加载中文字体");}// 解析日志文件QVector<LogEntr…...
《HelloGitHub》第 107 期
兴趣是最好的老师,HelloGitHub 让你对编程感兴趣! 简介 HelloGitHub 分享 GitHub 上有趣、入门级的开源项目。 github.com/521xueweihan/HelloGitHub 这里有实战项目、入门教程、黑科技、开源书籍、大厂开源项目等,涵盖多种编程语言 Python、…...
Langchain解锁LLM大语言模型的结构化输出能力(多种实现方案)
在 LangChain解锁LLM大语言模型的结构化输出能力:调用 with_structured_output() 方法 这篇博客中,我们了解了格式化LLM输出内容的必要性以及如何通过调用langchain框架中提供的 with_structured_output() 方法对LLM输出进行格式化(三种可选方…...
龙虎榜——20250610
上证指数放量收阴线,个股多数下跌,盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型,指数短线有调整的需求,大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的:御银股份、雄帝科技 驱动…...
Python:操作 Excel 折叠
💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 Python 操作 Excel 系列 读取单元格数据按行写入设置行高和列宽自动调整行高和列宽水平…...
(二)TensorRT-LLM | 模型导出(v0.20.0rc3)
0. 概述 上一节 对安装和使用有个基本介绍。根据这个 issue 的描述,后续 TensorRT-LLM 团队可能更专注于更新和维护 pytorch backend。但 tensorrt backend 作为先前一直开发的工作,其中包含了大量可以学习的地方。本文主要看看它导出模型的部分&#x…...
Keil 中设置 STM32 Flash 和 RAM 地址详解
文章目录 Keil 中设置 STM32 Flash 和 RAM 地址详解一、Flash 和 RAM 配置界面(Target 选项卡)1. IROM1(用于配置 Flash)2. IRAM1(用于配置 RAM)二、链接器设置界面(Linker 选项卡)1. 勾选“Use Memory Layout from Target Dialog”2. 查看链接器参数(如果没有勾选上面…...
sqlserver 根据指定字符 解析拼接字符串
DECLARE LotNo NVARCHAR(50)A,B,C DECLARE xml XML ( SELECT <x> REPLACE(LotNo, ,, </x><x>) </x> ) DECLARE ErrorCode NVARCHAR(50) -- 提取 XML 中的值 SELECT value x.value(., VARCHAR(MAX))…...
uniapp微信小程序视频实时流+pc端预览方案
方案类型技术实现是否免费优点缺点适用场景延迟范围开发复杂度WebSocket图片帧定时拍照Base64传输✅ 完全免费无需服务器 纯前端实现高延迟高流量 帧率极低个人demo测试 超低频监控500ms-2s⭐⭐RTMP推流TRTC/即构SDK推流❌ 付费方案 (部分有免费额度&#x…...
高防服务器能够抵御哪些网络攻击呢?
高防服务器作为一种有着高度防御能力的服务器,可以帮助网站应对分布式拒绝服务攻击,有效识别和清理一些恶意的网络流量,为用户提供安全且稳定的网络环境,那么,高防服务器一般都可以抵御哪些网络攻击呢?下面…...
【学习笔记】深入理解Java虚拟机学习笔记——第4章 虚拟机性能监控,故障处理工具
第2章 虚拟机性能监控,故障处理工具 4.1 概述 略 4.2 基础故障处理工具 4.2.1 jps:虚拟机进程状况工具 命令:jps [options] [hostid] 功能:本地虚拟机进程显示进程ID(与ps相同),可同时显示主类&#x…...
rnn判断string中第一次出现a的下标
# coding:utf8 import torch import torch.nn as nn import numpy as np import random import json""" 基于pytorch的网络编写 实现一个RNN网络完成多分类任务 判断字符 a 第一次出现在字符串中的位置 """class TorchModel(nn.Module):def __in…...
探索Selenium:自动化测试的神奇钥匙
目录 一、Selenium 是什么1.1 定义与概念1.2 发展历程1.3 功能概述 二、Selenium 工作原理剖析2.1 架构组成2.2 工作流程2.3 通信机制 三、Selenium 的优势3.1 跨浏览器与平台支持3.2 丰富的语言支持3.3 强大的社区支持 四、Selenium 的应用场景4.1 Web 应用自动化测试4.2 数据…...
