当前位置: 首页 > news >正文

【Flink银行反欺诈系统设计方案】1.短时间内多次大额交易场景的flink与cep的实现

【flink应用系列】1.Flink银行反欺诈系统设计方案

  • 1. 经典案例:短时间内多次大额交易
    • 1.1 场景描述
    • 1.2 风险判定逻辑
  • 2. 使用Flink实现
    • 2.1 实现思路
    • 2.2 代码实现
    • 2.3 使用Flink流处理
  • 3. 使用Flink CEP实现
    • 3.1 实现思路
    • 3.2 代码实现
  • 4. 总结

1. 经典案例:短时间内多次大额交易

1.1 场景描述

规则1:单笔交易金额超过10,000元。

规则2:同一用户在10分钟内进行了3次或更多次交易。

风险行为:同时满足规则1和规则2的交易行为。

1.2 风险判定逻辑

检测每笔交易是否满足“单笔交易金额超过10,000元”。

对同一用户,统计10分钟内的交易次数。

如果交易次数达到3次或更多,则判定为风险行为。

2. 使用Flink实现

2.1 实现思路

使用Flink的KeyedStream按用户分组。

使用ProcessFunction实现自定义窗口逻辑,统计10分钟内的交易次数。

结合规则1和规则2,判断是否为风险行为。

2.2 代码实现

// 定义交易数据POJO
public class Transaction {private String transactionId;private String userId;private Double amount;private Long timestamp;// getters and setters
}// 定义风控结果POJO
public class RiskResult {private String userId;private String transactionId;private String riskLevel;private String actionTaken;private Long createTime;// getters and setters
}// 实现风控逻辑
public class FraudDetectionProcessFunction extends KeyedProcessFunction<String, Transaction, RiskResult> {private transient ValueState<Integer> transactionCountState;private transient ValueState<Long> timerState;@Overridepublic void open(Configuration parameters) {// 初始化状态ValueStateDescriptor<Integer> countDescriptor = new ValueStateDescriptor<>("transactionCount", Types.INT);transactionCountState = getRuntimeContext().getState(countDescriptor);ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>("timerState", Types.LONG);timerState = getRuntimeContext().getState(timerDescriptor);}@Overridepublic void processElement(Transaction transaction,Context ctx,Collector<RiskResult> out) throws Exception {// 规则1:单笔交易金额超过10,000元if (transaction.getAmount() > 10000) {// 更新交易次数Integer count = transactionCountState.value();if (count == null) {count = 0;}count += 1;transactionCountState.update(count);// 如果是第一次满足规则1,设置10分钟的定时器if (count == 1) {long timer = ctx.timestamp() + 10 * 60 * 1000; // 10分钟ctx.timerService().registerEventTimeTimer(timer);timerState.update(timer);}// 规则2:10分钟内交易次数达到3次if (count >= 3) {RiskResult result = new RiskResult();result.setUserId(transaction.getUserId());result.setTransactionId(transaction.getTransactionId());result.setRiskLevel("HIGH");result.setActionTaken("ALERT");result.setCreateTime(System.currentTimeMillis());out.collect(result);}}}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<RiskResult> out) throws Exception {// 定时器触发时,重置状态transactionCountState.clear();timerState.clear();}
}

2.3 使用Flink流处理

java

DataStream<Transaction> transactionStream = env.addSource(transactionSource);DataStream<RiskResult> riskResultStream = transactionStream.keyBy(Transaction::getUserId).process(new FraudDetectionProcessFunction());riskResultStream.addSink(new AlertSink());

3. 使用Flink CEP实现

Flink CEP(Complex Event Processing)是Flink提供的复杂事件处理库,适合处理基于时间序列的模式匹配。以下是使用Flink CEP实现上述风控规则的示例。

3.1 实现思路

定义模式:检测10分钟内3次或更多次大额交易。

使用Flink CEP的模式匹配功能,匹配符合条件的事件序列。

3.2 代码实现

java

// 定义交易数据POJO
public class Transaction {private String transactionId;private String userId;private Double amount;private Long timestamp;// getters and setters
}// 定义风控结果POJO
public class RiskResult {private String userId;private List<String> transactionIds;private String riskLevel;private String actionTaken;private Long createTime;// getters and setters
}// 实现风控逻辑
public class FraudDetectionCEP {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 交易数据流DataStream<Transaction> transactionStream = env.addSource(transactionSource).assignTimestampsAndWatermarks(WatermarkStrategy.<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((event, timestamp) -> event.getTimestamp()));// 按用户分组KeyedStream<Transaction, String> keyedStream = transactionStream.keyBy(Transaction::getUserId);// 定义CEP模式:10分钟内3次或更多次大额交易Pattern<Transaction, ?> pattern = Pattern.<Transaction>begin("first").where(new SimpleCondition<Transaction>() {@Overridepublic boolean filter(Transaction transaction) {return transaction.getAmount() > 10000;}}).next("second").where(new SimpleCondition<Transaction>() {@Overridepublic boolean filter(Transaction transaction) {return transaction.getAmount() > 10000;}}).next("third").where(new SimpleCondition<Transaction>() {@Overridepublic boolean filter(Transaction transaction) {return transaction.getAmount() > 10000;}}).within(Time.minutes(10));// 应用模式PatternStream<Transaction> patternStream = CEP.pattern(keyedStream, pattern);// 生成风控结果DataStream<RiskResult> riskResultStream = patternStream.process(new PatternProcessFunction<Transaction, RiskResult>() {@Overridepublic void processMatch(Map<String, List<Transaction>> match,Context ctx,Collector<RiskResult> out) throws Exception {RiskResult result = new RiskResult();result.setUserId(match.get("first").get(0).getUserId());result.setTransactionIds(match.values().stream().flatMap(List::stream).map(Transaction::getTransactionId).collect(Collectors.toList()));result.setRiskLevel("HIGH");result.setActionTaken("ALERT");result.setCreateTime(System.currentTimeMillis());out.collect(result);}});// 输出结果riskResultStream.addSink(new AlertSink());env.execute("Fraud Detection with Flink CEP");}
}

4. 总结

Flink实现:通过KeyedProcessFunction和状态管理实现多规则匹配。

Flink CEP实现:通过定义复杂事件模式,简化多规则匹配的逻辑。

适用场景:

Flink适合需要自定义逻辑的场景。

Flink CEP适合基于时间序列的模式匹配场景。

通过以上实现,可以高效检测银行交易中的风险行为,并根据需要扩展更多规则

相关文章:

【Flink银行反欺诈系统设计方案】1.短时间内多次大额交易场景的flink与cep的实现

【flink应用系列】1.Flink银行反欺诈系统设计方案 1. 经典案例&#xff1a;短时间内多次大额交易1.1 场景描述1.2 风险判定逻辑 2. 使用Flink实现2.1 实现思路2.2 代码实现2.3 使用Flink流处理 3. 使用Flink CEP实现3.1 实现思路3.2 代码实现 4. 总结 1. 经典案例&#xff1a;短…...

HashMap的table数组何时初始化?默认容量和扩容阈值是多少?

HashMap 的 table 数组何时初始化&#xff1f; 答案&#xff1a; table 数组在第一次调用 put() 方法时初始化。 为什么&#xff1f; HashMap 为了节省内存&#xff0c;采用了“懒加载”机制。即使用 new HashMap() 创建对象时&#xff0c;只是计算了参数&#xff08;如容量、…...

基于CURL命令封装的JAVA通用HTTP工具

文章目录 一、简要概述二、封装过程1. 引入依赖2. 定义脚本执行类 三、单元测试四、其他资源 一、简要概述 在Linux中curl是一个利用URL规则在命令行下工作的文件传输工具&#xff0c;可以说是一款很强大的http命令行工具。它支持文件的上传和下载&#xff0c;是综合传输工具&…...

docker学习笔记(1)从安装docker到使用Portainer部署容器

docker学习笔记第一课 先交代背景 docker宿主机系统&#xff1a;阿里云ubuntu22.04 开发机系统&#xff1a;win11 docker镜像仓库&#xff1a;阿里云&#xff0c;此阿里云与宿主机系统没有关系&#xff0c;是阿里云提供的一个免费的docker仓库 代码托管平台&#xff1a;github&…...

数据集/API 笔记:新加坡PSI(空气污染指数)API

data.gov.sg 数据范围&#xff1a;2016年2月 - 2025年3月 1 获取API方式 curl --request GET \--url https://api-open.data.gov.sg/v2/real-time/api/psi 2 返回数据 API 的数据结构可以分为 3 大部分&#xff1a; 区域元数据&#xff08;regionMetadata&#xff09; →…...

计算机网络数据传输探秘:包裹如何在数字世界旅行?

计算机网络数据传输探秘:包裹如何在数字世界旅行? 一、从快递网络看数据传输本质 想象你网购了一件商品: 打包:商家用纸箱包装,贴上地址标签(数据封装)运输:包裹经过网点→分拣中心→运输车(网络节点与链路)签收:快递员核对信息后交付(数据校验与接收)数据的网络…...

笔记:代码随想录算法训练营day36:LeetCode1049. 最后一块石头的重量 II、494. 目标和、474.一和零

学习资料&#xff1a;代码随想录 1049.最后一块石头的重量II 力扣题目链接 思路&#xff1a;如何讲该问题转化为背包问题&#xff1a;还是对半分去碰&#xff0c;对半分去碰碰剩下的就是最小的。然后背包容量就是一半儿&#xff0c;物品重量等于物品价值等于stones[i] 和上…...

Bitmap -> Bitmap安卓设备上的显示和内存

Android 屏幕显示与 Bitmap 内存详解 前言 在 Android 开发中&#xff0c;理解屏幕显示单位和 Bitmap 内存占用是构建高效应用的基础。本文将详细介绍相关概念、计算公式及单位转换&#xff0c;并通过实例分析 Bitmap 在内存中的表现。 一、屏幕显示单位基础 1.1 基本单位及…...

QT study DAY2

作业 代码 Widget.h class Widget : public QWidget {Q_OBJECTpublic:Widget(QWidget *parent nullptr);~Widget();void save_data(const QString& filename,const QString& data); private slots:void on_lineEdit_textChanged(); //账户栏void on_l…...

QT-自定义参数设计框架软件

QT-自定义参数设计框架软件 Chapter1 QT-自定义参数设计框架软件前言一、演示效果二、使用步骤1.应用进行参数注册2.数据库操作单例对象3.参数操作单例对象 三、下载链接 Chapter2 Qt中管理配置参数&#xff08;QSettings、单例模式&#xff09;1 前言2 QSettings类ini文件写in…...

VUE集成Live2d

VUE集成Live2d 目前基于大模型&#xff0c;可以实现一个桌面的3D动画小人&#xff0c;个人猜测可以简介这个项目进行实现 1-参考网址 试了很多项目&#xff0c;只有这个项目直观的把问题说清楚了 Live2D Vue3技术应用:https://blog.csdn.net/hh1233321/article/details/1406947…...

【CPP面经】科大讯飞 腾讯后端开发面经分享

文章目录 C 面试问题整理基础问题简答1. 内存对齐2. this 指针3. 在成员函数中删除 this4. 引用占用内存吗&#xff1f;5. C 越界访问场景6. 进程通信方式7. 无锁队列实现8. ping 在哪一层&#xff1f;实现原理&#xff1f;9. HTTPS 流程10. GDB 使用及 CPU 高使用定位11. 智能…...

el-card 结合 el-descriptions 作为信息展示

记录下el-card 组合 el-descriptions 实现动态展示信息 文章结构 实现效果1. el-descriptions 组件使用1.1 结合v-for实现列表渲染1.2 解析 2. 自定义 el-descriptions 样式2.1 修改背景色、字体颜色2.2 调整字体大小2.3 解析 3. el-card 结合 el-descriptions 作为信息展示3.…...

GaussDB自带诊断工具实战指南

一、引言 GaussDB是一种分布式的关系型数据库。在数据库运维中&#xff0c;快速定位性能瓶颈、诊断故障是保障业务连续性的关键。GaussDB内置了多种诊断工具&#xff0c;结合日志分析、执行计划解析和实时监控功能&#xff0c;帮助开发者与运维人员高效解决问题。本文深入讲解…...

LeetCode 链表章节

简单 21. 合并两个有序链表 将两个升序链表合并为一个新的 升序 链表并返回。新链表是通过拼接给定的两个链表的所有节点组成的。 示例 1&#xff1a; 输入&#xff1a;l1 [1,2,4], l2 [1,3,4] 输出&#xff1a;[1,1,2,3,4,4]示例 2&#xff1a; 输入&#xff1a;l1 [], l2…...

SSL证书和HTTPS:全面解析它们的功能与重要性

每当我们在互联网上输入个人信息、进行在线交易时&#xff0c;背后是否有一个安全的保障&#xff1f;这时&#xff0c;SSL证书和HTTPS便扮演了至关重要的角色。本文将全面分析SSL证书和HTTPS的含义、功能、重要性以及它们在网络安全中的作用。 一、SSL证书的定义与基本概念 S…...

正交投影与内积空间:机器学习的几何基础

前言 本文隶属于专栏《机器学习数学通关指南》&#xff0c;该专栏为笔者原创&#xff0c;引用请注明来源&#xff0c;不足和错误之处请在评论区帮忙指出&#xff0c;谢谢&#xff01; 本专栏目录结构和参考文献请见《机器学习数学通关指南》 正文 &#x1f50d; 1. 内积空间的…...

Qt中txt文件输出为PDF格式

main.cpp PdfReportGenerator pdfReportGenerator;// 加载中文字体if (QFontDatabase::addApplicationFont(":/new/prefix1/simsun.ttf") -1) {QMessageBox::warning(nullptr, "警告", "无法加载中文字体");}// 解析日志文件QVector<LogEntr…...

《HelloGitHub》第 107 期

兴趣是最好的老师&#xff0c;HelloGitHub 让你对编程感兴趣&#xff01; 简介 HelloGitHub 分享 GitHub 上有趣、入门级的开源项目。 github.com/521xueweihan/HelloGitHub 这里有实战项目、入门教程、黑科技、开源书籍、大厂开源项目等&#xff0c;涵盖多种编程语言 Python、…...

Langchain解锁LLM大语言模型的结构化输出能力(多种实现方案)

在 LangChain解锁LLM大语言模型的结构化输出能力&#xff1a;调用 with_structured_output() 方法 这篇博客中&#xff0c;我们了解了格式化LLM输出内容的必要性以及如何通过调用langchain框架中提供的 with_structured_output() 方法对LLM输出进行格式化&#xff08;三种可选方…...

龙虎榜——20250610

上证指数放量收阴线&#xff0c;个股多数下跌&#xff0c;盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型&#xff0c;指数短线有调整的需求&#xff0c;大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的&#xff1a;御银股份、雄帝科技 驱动…...

Python:操作 Excel 折叠

💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 Python 操作 Excel 系列 读取单元格数据按行写入设置行高和列宽自动调整行高和列宽水平…...

(二)TensorRT-LLM | 模型导出(v0.20.0rc3)

0. 概述 上一节 对安装和使用有个基本介绍。根据这个 issue 的描述&#xff0c;后续 TensorRT-LLM 团队可能更专注于更新和维护 pytorch backend。但 tensorrt backend 作为先前一直开发的工作&#xff0c;其中包含了大量可以学习的地方。本文主要看看它导出模型的部分&#x…...

Keil 中设置 STM32 Flash 和 RAM 地址详解

文章目录 Keil 中设置 STM32 Flash 和 RAM 地址详解一、Flash 和 RAM 配置界面(Target 选项卡)1. IROM1(用于配置 Flash)2. IRAM1(用于配置 RAM)二、链接器设置界面(Linker 选项卡)1. 勾选“Use Memory Layout from Target Dialog”2. 查看链接器参数(如果没有勾选上面…...

sqlserver 根据指定字符 解析拼接字符串

DECLARE LotNo NVARCHAR(50)A,B,C DECLARE xml XML ( SELECT <x> REPLACE(LotNo, ,, </x><x>) </x> ) DECLARE ErrorCode NVARCHAR(50) -- 提取 XML 中的值 SELECT value x.value(., VARCHAR(MAX))…...

uniapp微信小程序视频实时流+pc端预览方案

方案类型技术实现是否免费优点缺点适用场景延迟范围开发复杂度​WebSocket图片帧​定时拍照Base64传输✅ 完全免费无需服务器 纯前端实现高延迟高流量 帧率极低个人demo测试 超低频监控500ms-2s⭐⭐​RTMP推流​TRTC/即构SDK推流❌ 付费方案 &#xff08;部分有免费额度&#x…...

高防服务器能够抵御哪些网络攻击呢?

高防服务器作为一种有着高度防御能力的服务器&#xff0c;可以帮助网站应对分布式拒绝服务攻击&#xff0c;有效识别和清理一些恶意的网络流量&#xff0c;为用户提供安全且稳定的网络环境&#xff0c;那么&#xff0c;高防服务器一般都可以抵御哪些网络攻击呢&#xff1f;下面…...

【学习笔记】深入理解Java虚拟机学习笔记——第4章 虚拟机性能监控,故障处理工具

第2章 虚拟机性能监控&#xff0c;故障处理工具 4.1 概述 略 4.2 基础故障处理工具 4.2.1 jps:虚拟机进程状况工具 命令&#xff1a;jps [options] [hostid] 功能&#xff1a;本地虚拟机进程显示进程ID&#xff08;与ps相同&#xff09;&#xff0c;可同时显示主类&#x…...

rnn判断string中第一次出现a的下标

# coding:utf8 import torch import torch.nn as nn import numpy as np import random import json""" 基于pytorch的网络编写 实现一个RNN网络完成多分类任务 判断字符 a 第一次出现在字符串中的位置 """class TorchModel(nn.Module):def __in…...

探索Selenium:自动化测试的神奇钥匙

目录 一、Selenium 是什么1.1 定义与概念1.2 发展历程1.3 功能概述 二、Selenium 工作原理剖析2.1 架构组成2.2 工作流程2.3 通信机制 三、Selenium 的优势3.1 跨浏览器与平台支持3.2 丰富的语言支持3.3 强大的社区支持 四、Selenium 的应用场景4.1 Web 应用自动化测试4.2 数据…...