Python 实现的风控系统(使用了kafka、Faust、模拟drools、redis、分布式数据库)
以下是一个使用 Python 实现的风控系统示例,涵盖以下技术组件:
- Kafka 消息中间件:用于实时接收支付业务系统传递的交易数据。
- Faust(Kafka Streams 的 Python 等价):用于流式处理 Kafka 中的消息。
- 规则引擎:使用 Python 实现简单的规则评估逻辑,模拟 Drools 的功能。
- Redis 内存数据库:用于存储风险标签,快速获取账户的风险级别。
- 分布式数据库:使用 SQLite 模拟,从中获取风险标签数据(当 Redis 中没有时)。
我们将构建一个简单的风控系统,流程如下:
- 从 Kafka 中消费实时交易数据。
- 从 Redis 获取对应的风险标签,如果没有则从分布式数据库获取并更新到 Redis。
- 使用规则引擎对交易数据和风险标签进行评估。
- 将评估结果返回给支付业务系统或记录下来。
-
实时交易模块:接收交易数据 ——> 获取风险标签(Redis) ——> 调用规则引擎 ——> 评估结果返回↓ ↓ ↑ 规则引擎模块:交易数据 + 风险标签 ---> 规则执行 ----> 输出评估结果(通过/拒绝)
项目结构和依赖
1. 项目结构
risk_control_demo/
├── app.py # 主应用程序
├── models.py # 数据模型定义
├── rules.py # 规则引擎逻辑
├── database.py # 数据库服务类
├── redis_service.py # Redis 服务类
├── requirements.txt # 项目依赖
└── producer.py # Kafka 生产者,发送测试数据
2. 项目依赖(requirements.txt)
faust==1.10.4
redis==4.5.5
aiokafka==0.7.2
sqlite3==0.0.1
安装依赖
pip install -r requirements.txt
详细代码
1. models.py(数据模型定义)
# models.py
from dataclasses import dataclass@dataclass
class Transaction:transaction_id: straccount_id: stramount: floattimestamp: float@dataclass
class RiskTag:account_id: strrisk_level: int # 1-低风险, 2-中风险, 3-高风险
2. database.py(数据库服务类)
# database.py
import sqlite3
from models import RiskTagclass DatabaseService:def __init__(self):# 连接 SQLite 数据库,内存模式self.conn = sqlite3.connect(':memory:')self.initialize_database()def initialize_database(self):cursor = self.conn.cursor()# 创建风险标签表cursor.execute('''CREATE TABLE IF NOT EXISTS risk_tags (account_id TEXT PRIMARY KEY,risk_level INTEGER)''')# 插入示例数据cursor.execute('''INSERT INTO risk_tags (account_id, risk_level) VALUES ('account123', 2)''')self.conn.commit()def get_risk_tag(self, account_id):cursor = self.conn.cursor()cursor.execute('SELECT risk_level FROM risk_tags WHERE account_id = ?', (account_id,))result = cursor.fetchone()if result:return RiskTag(account_id, result[0])else:return Nonedef close(self):self.conn.close()
3. redis_service.py(Redis 服务类)
# redis_service.py
import redis
from models import RiskTagclass RedisService:def __init__(self, host='localhost', port=6379):self.redis_client = redis.Redis(host=host, port=port, decode_responses=True)def get_risk_tag(self, account_id):risk_level = self.redis_client.get(f'risk:{account_id}')if risk_level:return RiskTag(account_id, int(risk_level))return Nonedef set_risk_tag(self, risk_tag):self.redis_client.set(f'risk:{risk_tag.account_id}', risk_tag.risk_level)def close(self):self.redis_client.close()
4. rules.py(规则引擎逻辑)
# rules.py
from models import Transaction, RiskTagclass RiskEvaluator:def evaluate(self, transaction: Transaction, risk_tag: RiskTag) -> bool:"""返回 True 表示交易存在风险,需要阻止。返回 False 表示交易安全,可以通过。"""# 高风险交易规则if transaction.amount > 10000 and risk_tag.risk_level == 3:print(f"检测到高风险交易:{transaction}")return True # 阻止交易# 中风险交易规则if 5000 < transaction.amount <= 10000 and risk_tag.risk_level >= 2:print(f"检测到中风险交易:{transaction}")return True # 阻止交易# 低风险交易规则print(f"交易通过:{transaction}")return False # 允许交易
5. app.py(主应用程序)
# app.py
import faust
import asyncio
import json
from models import Transaction, RiskTag
from database.py import DatabaseService
from redis_service import RedisService
from rules import RiskEvaluator# 定义 Faust 应用
app = faust.App('risk_control_app',broker='kafka://localhost:9092',value_serializer='raw',
)# 定义 Kafka 主题
transaction_topic = app.topic('transaction_topic')# 初始化服务
redis_service = RedisService()
database_service = DatabaseService()
risk_evaluator = RiskEvaluator()@app.agent(transaction_topic)
async def process_transaction(stream):async for event in stream:try:# 解析交易数据data = json.loads(event)transaction = Transaction(transaction_id=data['transaction_id'],account_id=data['account_id'],amount=data['amount'],timestamp=data['timestamp'])# 从 Redis 获取风险标签risk_tag = redis_service.get_risk_tag(transaction.account_id)if not risk_tag:# 如果 Redis 中没有,从数据库获取并更新到 Redisrisk_tag = database_service.get_risk_tag(transaction.account_id)if risk_tag:redis_service.set_risk_tag(risk_tag)else:# 如果数据库中也没有,设定默认风险标签risk_tag = RiskTag(transaction.account_id, 1)# 使用规则引擎进行风险评估is_risky = risk_evaluator.evaluate(transaction, risk_tag)# 根据评估结果进行处理if is_risky:print(f"交易 {transaction.transaction_id} 存在风险,执行阻止操作")# TODO: 将结果返回给支付业务系统,阻止交易else:print(f"交易 {transaction.transaction_id} 安全,允许通过")# TODO: 将结果返回给支付业务系统,允许交易except Exception as e:print(f"处理交易时发生错误:{e}")if __name__ == '__main__':app.main()
注释:
- 使用 Faust 定义 Kafka Streams 应用程序,处理
transaction_topic中的消息。 - 在
process_transaction函数中,逐条处理交易数据。 - 从 Redis 获取风险标签,如果没有则从数据库获取并更新到 Redis。
- 使用自定义的
RiskEvaluator进行风险评估,根据评估结果执行相应的操作
6. producer.py(Kafka 生产者,发送测试数据)
# producer.py
from kafka import KafkaProducer
import json
import timeproducer = KafkaProducer(bootstrap_servers='localhost:9092',value_serializer=lambda v: json.dumps(v).encode('utf-8')
)# 创建示例交易数据
transaction_data = {'transaction_id': 'tx1001','account_id': 'account123','amount': 12000.0,'timestamp': time.time()
}# 发送交易数据到 Kafka
producer.send('transaction_topic', transaction_data)
producer.flush()
print(f"已发送交易数据:{transaction_data}")
producer.close()
运行示例
1. 启动必要的服务
注意事项
总结
上述示例提供了一个基本的 Python 程序框架,演示了如何将 Kafka、Faust、Redis、规则引擎和分布式数据库集成在一起,完成实时风控的基本功能。您可以根据具体的业务需求和技术环境,对程序进行扩展和优化。
扩展建议:
-
Redis:确保 Redis 服务在本地的
6379端口运行 -
redis-serverKafka:确保 Kafka 服务在本地的
9092端口运行,并创建主题transaction_topic。 -
# 启动 Zookeeper zookeeper-server-start.sh config/zookeeper.properties # 启动 Kafka kafka-server-start.sh config/server.properties # 创建主题 kafka-topics.sh --create --topic transaction_topic --bootstrap-server localhost:90922. 运行应用程序
-
启动风控系统(
app.py): -
python app.py worker -l info运行 Kafka 生产者,发送交易数据(
producer.py): -
python producer.py3. 预期输出
风控系统将处理交易数据,使用规则引擎进行评估,并根据规则打印评估结果。例如:
-
检测到高风险交易:Transaction(transaction_id='tx1001', account_id='account123', amount=12000.0, timestamp=...) 交易 tx1001 存在风险,执行阻止操作说明
- Faust:Python 的流式处理库,类似于 Kafka Streams,用于处理 Kafka 中的消息流。
- 规则引擎:使用 Python 自定义规则评估逻辑,模拟 Drools 的功能。
- Redis:作为缓存,存储风险标签,快速获取账户的风险级别。
- 分布式数据库(SQLite 模拟):当 Redis 中没有风险标签时,从数据库获取,并更新到 Redis。
- 风险标签:简单地使用风险级别(1-低风险,2-中风险,3-高风险)来表示。
- 异常处理:在实际应用中,需要更完善的异常处理机制,防止因异常导致程序崩溃。
- 引入异步 Redis 客户端:使用
aioredis提升 Redis 操作的性能。 - 使用真正的分布式数据库:替换 SQLite,使用例如 PostgreSQL、MySQL 等数据库,并配置集群模式。
- 完善规则引擎:使用现有的 Python 规则引擎库(如
durable_rules、experta)实现更复杂的规则逻辑。 - 添加日志和监控:集成日志系统和监控工具,便于维护和故障排查。
- 性能优化:对于高并发场景,需要考虑异步 I/O、连接池等技术优化性能。
- 配置管理:将硬编码的配置(如主机地址、端口、主题名)提取到配置文件或环境变量中,便于管理和修改。
- 安全性:在生产环境中,注意保护敏感信息,确保数据传输和存储的安全。
相关文章:
Python 实现的风控系统(使用了kafka、Faust、模拟drools、redis、分布式数据库)
以下是一个使用 Python 实现的风控系统示例,涵盖以下技术组件: Kafka 消息中间件:用于实时接收支付业务系统传递的交易数据。Faust(Kafka Streams 的 Python 等价):用于流式处理 Kafka 中的消息。规则引擎…...
Linux运维_Rocky8 安装配置Zabbix
Zabbix 是一个开源的监控解决方案,用于监控网络、服务器、应用程序和服务的性能。它提供实时监控、数据收集、告警通知以及图形化界面,方便用户查看和分析监控数据。Zabbix 支持多种数据收集方式,包括 SNMP、IPMI、JMX 和自定义脚本ÿ…...
jQuery Mobile 滚屏事件
jQuery Mobile 滚屏事件 在移动开发中,滚屏事件是一个非常重要的交互方式,它可以让用户通过滚动屏幕来浏览内容。jQuery Mobile 是一个流行的移动框架,它提供了一套丰富的组件和事件,使得在移动设备上实现滚屏效果变得简单。本文将详细介绍 jQuery Mobile 中的滚屏事件,包…...
3.1.1ReactOS系统中搜索给定长度的空间地址区间函数的实现
系列文章目录 //搜索给定长度的空间地址区间 MmFindGap(); PMADDRESS_SPACE AddressSpace,//该进程用户空间 ULONG_PTR Length,//寻找的空间间隔大小 ULONG_PTR Granularity,//粒度位,表明空间起点的对齐要求,注意是起…...
arm64系统不支持32位的解决armel armhf
初级代码游戏的专栏介绍与文章目录-CSDN博客 我的github:codetoys,所有代码都将会位于ctfc库中。已经放入库中我会指出在库中的位置。 这些代码大部分以Linux为目标但部分代码是纯C的,可以在任何平台上使用。 源码指引:github源…...
【毕业设计】工具大礼包之『Maven3.6.3安装与配置』
系统版本 电脑系统:Windows 10 一.Maven下载 🎯 统一版本 apache-maven-3.6.3,下面两种下载方式2选1即可 1.官网直下 官网下载地址 https://archive.apache.org/dist/maven/maven-3/3.6.3/binaries/ 找到apache-maven-3.6.3-bin.zip 云盘…...
gin入门教程(9):路由分组与路由版本控制
在使用 Gin 框架构建 RESTful API 时,路由分组与版本控制是一种常见的实践,可以帮助你更好地管理不同版本的 API。下面是如何在 Gin 中实现路由分组和版本控制的示例。 目录结构 /hello-gin │ ├── cmd/ │ └── main.go ├── api/ │ ├── v1/ │ │ └─…...
rt-thread移植SystemView中遇到的问题
源代码地址dujunqiu/SystemView 我使用的rt-thread版本是5.2.0,应该是rt-thread适配的还有点问题 报错处理 1:warning: #223-D: function “typeof” declared implicitly 如下 typedef 的warning是C99规范没有typedef的定义,需要在keii中…...
【C++STL】list的模拟实现
✨ Blog’s 主页: 白乐天_ξ( ✿>◡❛) 🌈 个人Motto:他强任他强,清风拂山冈! 🔥 所属专栏:C深入学习笔记 💫 欢迎来到我的学习笔记! 一、三个类与成员函数接口 在list.…...
以30个面试问题和案例为导向:全面解析 Java Servlet是什么?基本概念、实现原理、生命周期、类结构、请求与响应的处理机制,以及性能优化和安全性管理
Servlet 是 Java Web 开发的核心组件之一,负责处理客户端请求并生成动态响应。本文将深入探讨 Servlet 的基本概念、实现原理、生命周期、类结构、请求与响应的处理机制,以及性能优化和安全性管理,帮助开发者从多方面掌握 Servlet。 文章目录…...
MFC小游戏设计
框架: 各个界面: 用户: 登录注册:账号和密码(昵称) 主菜单:各种游戏,查看自己信息(积分,装备【游戏数据】),退出 游戏界面&#…...
[漏洞挖掘与防护] 04.Windows系统安全缺陷之5次Shift漏洞启动计算机机理分析
这是作者新开的一个专栏——“漏洞挖掘与防护”,前期会复现各种经典和最新漏洞,并总结防护技巧;后期尝试从零学习漏洞挖掘技术,包括Web漏洞和二进制及IOT相关漏洞,以及Fuzzing技术。新的征程,新的开启,漫漫长征路,偏向虎山行。享受过程,感谢您的陪伴,一起加油~ 欢迎关…...
手机极简待办app哪款好用?
在快节奏的现代生活中,我们常常需要处理大量的任务和信息,这时候一款好用的极简待办软件就显得尤为重要。它们不仅能帮助我们记录和管理待办事项,还能提高我们的工作效率和生活质量。 在众多的待办软件中,敬业签是一款非常受欢迎…...
SpringBoot高级-底层原理
目录 1 SpringBoot自动化配置原理 01-SpringBoot2高级-starter依赖管理机制 02-SpringBoot2高级-自动化配置初体验 03-SpringBoot2高级-底层原理-Configuration配置注解 04-SpringBoot2高级-底层原理-Import注解使用1 05-SpringBoot2高级-底层原理-Import注解使用2 06-S…...
LabVIEW提高开发效率技巧----插入式架构
随着LabVIEW项目规模的扩大和系统复杂性的增加,传统的单一代码架构难以应对后期维护和功能扩展的需求。插入式架构(Plug-In Architecture)作为一种模块化设计方式,通过动态加载和运行子VI,使系统功能更加灵活、模块化&…...
MySQL COUNT(*)、COUNT(1)、COUNT(id)、COUNT(字段)效果及性能
文章目录 前言COUNT(exper)COUNT(*)优化COUNT(*) 与COUNT(1) COUNT(1)COUNT(id)COUNT(字段)总结参考 前言 业务开发中,我们经常要使用count做一些数据统计。今天根据MySQL5.7官方文档及丁奇老师的MySQL45讲,介绍一下COUNT(*)、COUNT(1)、COUNT(id)、COU…...
webpack4 - 动态导入文件 dynamic-import 报错的解决方法
介绍 webpack4动态导入文件报错,按照错误提示安装了插件,但未果。。 最后查到一个可行方案,记录如下。 1.通过懒加载的方式动态引入文件 const router new Router({routes: [{path: /home,name: Home,component: () >import(./views/h…...
【NodeJS】NodeJS+mongoDB在线版开发简单RestfulAPI (四):状态码的使用
本项目旨在学习如何快速使用 nodejs 开发后端api,并为以后开展其他项目的开启提供简易的后端模版。(非后端工程师) 由于文档是代码写完之后,为了记录项目中需要注意的技术点,因此文档的叙述方式并非开发顺序࿰…...
springboot061基于B2B平台的医疗病历交互系统(论文+源码)_kaic
摘 要 进入21世纪,计算机技术迅速向着网络化的、集成化方向发展。传统的单机版应用软件正在逐渐退出舞台,取而代之的是支持网络、支持多种数据信息的新一代网络版应用软件,形成了信息化的社会。信息化社会的形成和微电子技术日新月异的发展&…...
基于FFT + CNN -Transformer时域、频域特征融合的电能质量扰动识别模型
往期精彩内容: Python-电能质量扰动信号数据介绍与分类-CSDN博客 Python电能质量扰动信号分类(一)基于LSTM模型的一维信号分类-CSDN博客 Python电能质量扰动信号分类(二)基于CNN模型的一维信号分类-CSDN博客 Python电能质量扰动信号分类(三)基于Transformer的一…...
23-Oracle 23 ai 区块链表(Blockchain Table)
小伙伴有没有在金融强合规的领域中遇见,必须要保持数据不可变,管理员都无法修改和留痕的要求。比如医疗的电子病历中,影像检查检验结果不可篡改行的,药品追溯过程中数据只可插入无法删除的特性需求;登录日志、修改日志…...
线程同步:确保多线程程序的安全与高效!
全文目录: 开篇语前序前言第一部分:线程同步的概念与问题1.1 线程同步的概念1.2 线程同步的问题1.3 线程同步的解决方案 第二部分:synchronized关键字的使用2.1 使用 synchronized修饰方法2.2 使用 synchronized修饰代码块 第三部分ÿ…...
高等数学(下)题型笔记(八)空间解析几何与向量代数
目录 0 前言 1 向量的点乘 1.1 基本公式 1.2 例题 2 向量的叉乘 2.1 基础知识 2.2 例题 3 空间平面方程 3.1 基础知识 3.2 例题 4 空间直线方程 4.1 基础知识 4.2 例题 5 旋转曲面及其方程 5.1 基础知识 5.2 例题 6 空间曲面的法线与切平面 6.1 基础知识 6.2…...
Nginx server_name 配置说明
Nginx 是一个高性能的反向代理和负载均衡服务器,其核心配置之一是 server 块中的 server_name 指令。server_name 决定了 Nginx 如何根据客户端请求的 Host 头匹配对应的虚拟主机(Virtual Host)。 1. 简介 Nginx 使用 server_name 指令来确定…...
2025季度云服务器排行榜
在全球云服务器市场,各厂商的排名和地位并非一成不变,而是由其独特的优势、战略布局和市场适应性共同决定的。以下是根据2025年市场趋势,对主要云服务器厂商在排行榜中占据重要位置的原因和优势进行深度分析: 一、全球“三巨头”…...
CSS设置元素的宽度根据其内容自动调整
width: fit-content 是 CSS 中的一个属性值,用于设置元素的宽度根据其内容自动调整,确保宽度刚好容纳内容而不会超出。 效果对比 默认情况(width: auto): 块级元素(如 <div>)会占满父容器…...
在QWebEngineView上实现鼠标、触摸等事件捕获的解决方案
这个问题我看其他博主也写了,要么要会员、要么写的乱七八糟。这里我整理一下,把问题说清楚并且给出代码,拿去用就行,照着葫芦画瓢。 问题 在继承QWebEngineView后,重写mousePressEvent或event函数无法捕获鼠标按下事…...
IP如何挑?2025年海外专线IP如何购买?
你花了时间和预算买了IP,结果IP质量不佳,项目效率低下不说,还可能带来莫名的网络问题,是不是太闹心了?尤其是在面对海外专线IP时,到底怎么才能买到适合自己的呢?所以,挑IP绝对是个技…...
【无标题】路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论
路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论 一、传统路径模型的根本缺陷 在经典正方形路径问题中(图1): mermaid graph LR A((A)) --- B((B)) B --- C((C)) C --- D((D)) D --- A A -.- C[无直接路径] B -…...
掌握 HTTP 请求:理解 cURL GET 语法
cURL 是一个强大的命令行工具,用于发送 HTTP 请求和与 Web 服务器交互。在 Web 开发和测试中,cURL 经常用于发送 GET 请求来获取服务器资源。本文将详细介绍 cURL GET 请求的语法和使用方法。 一、cURL 基本概念 cURL 是 "Client URL" 的缩写…...
