当前位置: 首页 > article >正文

ABP VNext + Apache Flink 实时流计算:打造高可用“交易风控”系统

ABP VNext + Apache Flink 实时流计算:打造高可用“交易风控”系统 🌐


📚 目录

  • ABP VNext + Apache Flink 实时流计算:打造高可用“交易风控”系统 🌐
    • 一、背景🚀
    • 二、系统整体架构 🏗️
    • 三、实战展示 🛠️:交易行为告警系统
      • 3.1 ABP 采集交易事件 📝
        • CAP + Outbox 配置示例 💼
      • 3.2 Flink CEP 模式与 Exactly-Once ⚡
      • 3.3 Redis Stream + SignalR 实时推送 🔔
    • 四、生产级部署和监控 📈
    • 五、自动化测试 🧪


一、背景🚀

在金融 💰、电商 🛒、IoT 🌐 等高频交互系统中,越来越多的场景需要“实时发现问题并响应”。


二、系统整体架构 🏗️

Publish Event
消费 Transaction
写入警报
推送警报
读取警报
实时推送
ABP VNext API
Kafka: transactions
Flink CEP Job
PostgreSQL Sink
Redis Stream
RiskAlertWorker
SignalR Hub

💡 图示展示了各组件之间的数据流向,实现消息解耦和高可用。


三、实战展示 🛠️:交易行为告警系统

3.1 ABP 采集交易事件 📝

using System;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Volo.Abp.EventBus;
using Volo.Abp.EventBus.Distributed;public class TransactionCreatedDomainEvent : DomainEvent
{public Guid UserId { get; set; }public decimal Amount { get; set; }public string Location { get; set; }
}public class TransactionCreatedHandler : IDistributedEventHandler<TransactionCreatedDomainEvent>
{private readonly IDistributedEventBus _eventBus;private readonly ILogger<TransactionCreatedHandler> _logger;public TransactionCreatedHandler(IDistributedEventBus eventBus,ILogger<TransactionCreatedHandler> logger){_eventBus = eventBus;_logger = logger;}public async Task HandleEventAsync(TransactionCreatedDomainEvent eventData){var eto = new TransactionCreatedEto{UserId = eventData.UserId,Amount = eventData.Amount,Location = eventData.Location,OccurredAt = Clock.Now};try{await _eventBus.PublishAsync(eto);}catch (Exception ex){_logger.LogError(ex, "发布交易事件失败:{UserId}", eventData.UserId);throw;}}
}
CAP + Outbox 配置示例 💼
// appsettings.json
"Cap": {"UseEntityFramework": true,"UseDashboard": true,"Producer": {"Kafka": { "Servers": "localhost:9092" }},"Outbox": { "TableName": "CapOutboxMessages" }
}

3.2 Flink CEP 模式与 Exactly-Once ⚡

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.common.eventtime._
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.cep.scala.CEP
import java.time.Durationval env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(10000)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.setStateBackend(new RocksDBStateBackend("file:///flink-checkpoints"))
env.getCheckpointConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)val watermarkStrategy = WatermarkStrategy.forBoundedOutOfOrderness[Transaction](Duration.ofSeconds(5)).withTimestampAssigner((event, _) => event.timestamp.toEpochMilli)val stream = env.addSource(new FlinkKafkaConsumer[Transaction]("transactions", deserializer, props)).assignTimestampsAndWatermarks(watermarkStrategy)val pattern = Pattern.begin[Transaction]("first").where(_.amount > 10000).next("second").where(new IterativeCondition[Transaction] {override def filter(event: Transaction, ctx: IterativeCondition.Context[Transaction]) = {val first = ctx.getEventsForPattern("first").iterator().next()event.location != first.location}}).within(Time.minutes(5))pattern.handleTimeout(new PatternTimeoutFunction[Transaction, Unit] {override def timeout(map: java.util.Map[String, java.util.List[Transaction]], timestamp: Long, out: Collector[Unit]): Unit = {// 超时清理逻辑}
}, Time.minutes(5))
ABP API Kafka Flink Redis Worker SignalR 发布交易事件 消费并处理流 推送警报 StreamReadGroup SignalR 推送 ABP API Kafka Flink Redis Worker SignalR

💡 建议全链路使用 Schema Registry 管理消息格式,防止兼容性问题。


3.3 Redis Stream + SignalR 实时推送 🔔

using System;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using StackExchange.Redis;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.SignalR;public class RiskAlertWorker : BackgroundService
{private readonly IConnectionMultiplexer _redis;private readonly IHubContext<RiskAlertHub> _hubContext;private readonly ILogger<RiskAlertWorker> _logger;public RiskAlertWorker(IConnectionMultiplexer redis,IHubContext<RiskAlertHub> hubContext,ILogger<RiskAlertWorker> logger){_redis = redis;_hubContext = hubContext;_logger = logger;}protected override async Task ExecuteAsync(CancellationToken stoppingToken){var db = _redis.GetDatabase();try { await db.StreamCreateConsumerGroupAsync("risk-alerts", "alert-group", "$", true); }catch { /* 忽略 BUSYGROUP */ }int backoff = 1000;while (!stoppingToken.IsCancellationRequested){try{var entries = await db.StreamReadGroupAsync("risk-alerts", "alert-group", "consumer-1",count: 10, flags: CommandFlags.Block(5000));foreach (var entry in entries){var alert = JsonSerializer.Deserialize<RiskEventDto>(entry["data"]!);await _hubContext.Clients.Group(alert.UserId.ToString()).SendAsync("ReceiveAlert", alert, stoppingToken);await db.StreamAcknowledgeAsync("risk-alerts", "alert-group", entry.Id);}backoff = 1000;}catch (Exception ex){_logger.LogError(ex, "处理 Redis 告警失败");await Task.Delay(backoff, stoppingToken);backoff = Math.Min(backoff * 2, 16000);}}}
}[Authorize]
public class RiskAlertHub : Hub { }

四、生产级部署和监控 📈

组件推荐配置
ABP 后端Pod 存活/就绪探针 ✅ + HTTPS 🔒 + Serilog→Elasticsearch Sink 📝 + CAP Outbox
Kafkaenable.idempotence=true 🔁, acks=all ✅, TLS/SASL 🔐
FlinkRocksDBStateBackend ⚙️ + EXACTLY_ONCE ⚡ + State TTL 🕒 + HA 🌟
RedisRedis Cluster 🔄 + AOF 📝 + ACL 🔑 + 阻塞消费 ⏳
PostgreSQL主从流复制 🛠️ + WAL 日志 📜 + TimescaleDB 插件 📊
SignalRAzure SignalR ☁️ / Redis Backplane 🔄 + JWT 鉴权 🔏
# Flink YAML 示例
state.backend: rocksdb
checkpointing:interval: 10smode: EXACTLY_ONCEexternalized-checkpoint-retention: RETAIN_ON_CANCELLATION
# Flink Prometheus Reporter
metrics.reporters: prom
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9250

📊 在 Grafana 中可视化:Kafka TPS、Flink 延迟分位、Redis 消费速率、ABP 请求成功率/错误率。


五、自动化测试 🧪

// Testcontainers 启动依赖
var kafka = new KafkaContainer().StartAsync().GetAwaiter().GetResult();
var redis = new RedisContainer().StartAsync().GetAwaiter().GetResult();
var postgres = new PostgreSqlContainer().StartAsync().GetAwaiter().GetResult();// 注入到 ABP 测试模块
context.Services.Configure<CapOptions>(opts => {opts.ProducerConnectionString = kafka.GetBootstrapAddress();opts.OutboxTableName = "CapOutboxMessages";
});// Flink MiniCluster
var flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().Build());
flinkCluster.Start();

相关文章:

ABP VNext + Apache Flink 实时流计算:打造高可用“交易风控”系统

ABP VNext Apache Flink 实时流计算&#xff1a;打造高可用“交易风控”系统 &#x1f310; &#x1f4da; 目录 ABP VNext Apache Flink 实时流计算&#xff1a;打造高可用“交易风控”系统 &#x1f310;一、背景&#x1f680;二、系统整体架构 &#x1f3d7;️三、实战展示…...

前端面试题-HTML篇

1. 请谈谈你对 Web 标准以及 W3C 的理解和认识。 我对 Web 标准 的理解是&#xff0c;它就像是互联网世界的“交通规则”&#xff0c;由 W3C&#xff08;World Wide Web Consortium&#xff0c;万维网联盟&#xff09; 这样一个国际性组织制定。这些规则规范了我们在编写 HTML…...

JS数组 concat() 与扩展运算符的深度解析与最佳实践

文章目录 前言一、语法对比1. Array.prototype.concat()2. 扩展运算符&#xff08;解构赋值&#xff09; 二、性能差异&#xff08;大规模数组&#xff09;关键差异原因 三、适用场景建议总结 前言 最近工作中遇到了一个大规模数组合并相关的问题&#xff0c;在数据合并时有些…...

人工智能与机器学习从理论、技术与实践的多维对比

人工智能(Artificial Intelligence, AI)提出“让机器像人类一样思考”的目标,其核心理论围绕符号系统假设展开——认为智能行为可通过逻辑符号系统(如谓词逻辑、产生式规则)建模。 机器学习(Machine Learning, ML)是人工智能的子集,聚焦于通过数据自动改进算法性能的理…...

Netty 实战篇:手写一个轻量级 RPC 框架原型

本文将基于前文实现的编解码与心跳机制&#xff0c;构建一个简单的 RPC 框架&#xff0c;包括请求封装、响应解析、动态代理调用。为打造微服务通信基础打下基础。 一、什么是 RPC&#xff1f; RPC&#xff08;Remote Procedure Call&#xff0c;远程过程调用&#xff09;允许…...

什么是 WPF 技术?什么是 WPF 样式?下载、安装、配置、基本语法简介教程

什么是 WPF 技术&#xff1f;什么是 WPF 样式&#xff1f;下载、安装、配置、基本语法简介教程 摘要 WPF教程、WPF开发、.NET 8 WPF、Visual Studio 2022 WPF、WPF下载、WPF安装、WPF配置、WPF样式、WPF样式详解、XAML语法、XAML基础、MVVM架构、数据绑定、依赖属性、资源字典…...

亚远景-ISO 21434标准:汽车网络安全实践的落地指南

一、ISO 21434标准概述 ISO 21434是针对道路车辆网络安全的国际标准&#xff0c;旨在确保汽车组织在车辆的整个生命周期内采用结构化方法进行网络安全风险管理。 该标准适用于参与车辆开发的所有利益相关者&#xff0c;包括OEM、一级和二级供应商、汽车软件供应商以及网络安全…...

【动手学深度学习】2.4. 微积分

目录 2.4. 微积分1&#xff09;导数和微分2&#xff09;偏导数3&#xff09;梯度4&#xff09;链式法则5&#xff09;小结 . 2.4. 微积分 微积分的起源&#xff1a; 古希腊人通过逼近法&#xff08;多边形边数↑ → 面积逼近圆&#xff09;发展出积分的思想。 微分&#xff…...

流程自动化引擎:让业务自己奔跑

在当今竞争激烈的商业环境中&#xff0c;企业面临着快速变化的市场需求、日益复杂的业务流程以及不断增长的运营成本。如何优化业务流程、提升效率并降低成本&#xff0c;成为企业持续发展的关键问题。 流程自动化引擎&#xff08;Process Automation Engine&#xff09;作为一…...

AI炼丹日志-23 - MCP 自动操作 自动进行联网检索 扩展MCP能力

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; Java篇&#xff1a; MyBatis 更新完毕目前开始更新 Spring&#xff0c;一起深入浅出&#xff01; 大数据篇 300&#xff1a; Hadoop&…...

用 Python 模拟雪花飘落效果

用 Python 模拟雪花飘落效果 雪花轻轻飘落&#xff0c;给冬日带来一份浪漫与宁静。本文将带你用一份简单的 Python 脚本&#xff0c;手把手实现「雪花飘落效果」动画。文章深入浅出&#xff0c;零基础也能快速上手&#xff0c;完整代码仅需一个脚本文件即可运行。 目录 前言…...

基于定制开发开源AI智能名片S2B2C商城小程序的大零售渗透策略研究

摘要&#xff1a;本文聚焦“一切皆零售”理念下的大零售渗透趋势&#xff0c;提出以定制开发开源AI智能名片S2B2C商城小程序为核心工具的渗透策略。通过分析该小程序在需求感应、场景融合、数据驱动等方面的技术优势&#xff0c;结合零售渗透率提升的关键路径&#xff0c;揭示其…...

重拾Scrapy框架

基于Scrapy框架实现 舔狗语录百度翻译 输出结果到txt文档 爬虫脚本 from typing import Iterable, Any, AsyncIteratorimport scrapy import json from post.items import PostItemclass BaidufanyiSpider(scrapy.Spider):name "baidufanyi"allowed_domains [&quo…...

Day 40

单通道图片的规范写法 import torch import torch.nn as nn import torch.optim as optim from torch.utils.data import DataLoader , Dataset from torchvision import datasets, transforms import matplotlib.pyplot as plt import warnings warnings.filterwarnings(&q…...

XPlifeapp:高效打印,便捷生活

在数字化时代&#xff0c;虽然电子设备的使用越来越普遍&#xff0c;但打印的需求依然存在。无论是学生需要打印课表、资料&#xff0c;还是职场人士需要打印名片、报告&#xff0c;一个高效便捷的打印软件都能大大提高工作效率。XPlifeapp就是这样一款超级好用的手机打印软件&…...

等保测评-Mysql数据库测评篇

Mysql数据库测评 0x01 前言 "没有网络安全、就没有国家安全" 等保测评是什么&#xff1f; 等保测评&#xff08;网络安全等级保护测评&#xff09;是根据中国《网络安全法》及相关标准&#xff0c;对信息系统安全防护能力进行检测评估的法定流程。其核心依据《信…...

CSS篇-2

4. position 的值分别是相对于哪个位置定位的&#xff1f; position 属性是 CSS 布局中一个非常核心的概念&#xff0c;它允许我们精确控制元素在文档中的定位方式&#xff0c;从而脱离或部分脱离正常的文档流。理解 position 的不同值以及它们各自的定位基准&#xff0c;是实…...

02.K8S核心概念

服务的分类 有状态服务&#xff1a;会对本地环境产生依赖&#xff0c;例如需要把数据存储到本地磁盘&#xff0c;如mysql、redis&#xff1b; 无状态服务&#xff1a;不会对本地环境产生任何依赖&#xff0c;例如不会存储数据到本地磁盘&#xff0c;如nginx、apache&#xff…...

一套qt c++的串口通信

实现了创建线程使用串口的功能 具备功能: 1.线程使用串口 2.定时发送队列内容&#xff0c;防止粘包 3.没处理接收粘包&#xff0c;根据你的需求来&#xff0c;handleReadyRead函数中&#xff0c;可以通过m_receiveBuffer来缓存接收&#xff0c;然后拆分数据来处理 源码 seri…...

【高频面试题】数组中的第K个最大元素(堆、快排进阶)

文章目录 数组中的第K个最大元素题目描述示例1示例2提示&#xff1a; 解法1&#xff08;堆维护前k大元素&#xff09;解法2 手写堆维护解法3&#xff08;快速选择算法&#xff09;例题&#xff1a;P1923 【深基9.例4】求第 k 小的数参考 数组中的第K个最大元素 题目描述 给定…...

Java互联网大厂面试:从Spring Boot到Kafka的技术深度探索

Java互联网大厂面试&#xff1a;从Spring Boot到Kafka的技术深度探索 在某家互联网大厂的面试中&#xff0c;面试官A是一位技术老兵&#xff0c;而被面试者谢飞机&#xff0c;号称有丰富的Java开发经验。以下是他们的面试情景&#xff1a; 场景&#xff1a;电商平台的后端开发…...

基于Python的单斜式ADC建模与仿真分析

基于Python的单斜式ADC建模与仿真分析 1 引言 CMOS图像传感器的读出电路中,列级ADC因其面积效率高(每列共享ADC)、功耗低(并行工作降低频率需求)和固定模式噪声小(结构对称性高)等优势成为大像素阵列的首选方案。本文针对50KS/s采样率、10位分辨率的单斜式ADC进行系统…...

笔记本电脑右下角wifi不显示,连不上网怎么办?

解决思路&#xff1a;设备管理器--先禁用wifi6硬件-再启用wifi6硬件&#xff08;20秒搞定&#xff09; 笔记本电脑右下角的wifi经常莫名其妙的不显示&#xff0c;连不上网&#xff0c;感觉应该是与什么程序不兼容&#xff0c;导致wifi模块被办掉了&#xff0c;怎么这种情况出现…...

一篇文章玩转CAP原理

CAP 原理是分布式系统设计的核心理论之一&#xff0c;揭示了系统设计中的 根本性权衡。 一、CAP 的定义 CAP 由三个核心属性组成&#xff0c;任何分布式系统最多只能同时满足其中两个&#xff1a; 一致性&#xff08;Consistency&#xff09; 所有节点在同一时刻看到的数据完全…...

Vue-收集表单信息

收集表单信息 Input label for 和 input id 关联, 点击账号标签 也能聚焦 input 代码 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><title>表单数据</title><!-- 引入Vue --><scrip…...

私服 nexus 之间迁移 npm 仓库

本文介绍如何将一个 Nexus 特定仓库中的 npm 包内容迁移到另一个 Nexus 特定仓库。此过程适用于需要重构仓库结构或合并仓库的场景。 迁移脚本 以下是完整的迁移脚本&#xff0c;它会自动完成以下操作&#xff1a; 从源仓库获取所有 npm 包列表下载每个包的 .tgz 文件解压并…...

微服务及容器化设计--可扩展的架构设计

引言 在当今快速发展的技术环境中&#xff0c;企业需要构建能够适应变化、支持快速迭代且可靠的软件系统。传统的单体应用架构在面对高并发、大规模部署和复杂业务逻辑时往往力不从心。微服务架构结合容器化技术应运而生&#xff0c;成为现代可扩展系统设计的主流选择。本文将…...

vscode开发stm32,main.c文件中出现很多报错影响开发解决日志

本质上为 .vscode/c_cpp_properties.json文件和Makefile文件中冲突&#xff0c;两者没有同步。 将makefile文件中的内容同步过来即可&#xff0c;下面给出一个json文件的模板&#xff0c;每个人的情况不同&#xff0c;针对性修改即可 {"configurations": [{"na…...

嵌入式鸿蒙系统中水平和垂直以及图片调用方法

利用openharmony操作的具体现象: 第一:Column 作用:沿垂直方向布局的容器。 第二:常用接口 Column(value?: {space?: string | number}) 参数: 参数名参数类型必填参数描述spacestring | number否纵向布局元素垂直方向间距。 从API version 9开始,space为负数或者ju…...

【海康USB相机被HALCON助手连接过后,MVS显示无法连接故障。】

在Halcon里使用助手调用海康USB相机时&#xff0c;如果这个界面点击了【是】 那么恭喜你&#xff0c;相机只能被HALCON调用使用&#xff0c;使用MVS或者海康开发库&#xff0c;将查找不到相机 解决方式&#xff1a; 右键桌面【此电脑】图标 ->选择【管理】 ->选择【设备…...