ABP VNext + Webhook:订阅与异步回调
🚀 ABP VNext + Webhook:订阅与异步回调
📚 目录
- 🚀 ABP VNext + Webhook:订阅与异步回调
- 🎯 一、背景切入:如何优雅地支持第三方回调?
- 🏗 二、系统架构设计
- 🔍 三、核心能力实现
- 3.1 🔐 签名验证(防伪造)
- 接口定义
- 实现示例(Wxpay)
- 3.2 🔄 幂等控制(防重复处理)
- 接口与实现
- 🛠️ 3.3 多厂商处理策略
- 接口
- 策略工厂
- 🔁 四、关键流程图
- 4.1 请求处理流程
- 4.2 重试工作流程
- 🛠️ 3.5 接收控制器(统一入口)
- 📈 五、DevOps & 监控
- 1. Prometheus 指标
- 2. 健康检查
- ✅ 六、测试
- 6.1 单元测试
- 6.1.1 签名校验测试(xUnit)
- 6.1.2 幂等服务并发安全测试
- 6.1.3 重试 Worker 测试
- 6.2 集成测试(Testcontainers)
🎯 一、背景切入:如何优雅地支持第三方回调?
在现代分布式系统中,Webhook 是实现系统解耦和异步通知的重要手段,广泛用于支付通知、审核结果返回、消息推送等场景。但在实践中,我们需要同时解决以下挑战:
- 🔐 安全防护:如何防止伪造请求?
- 🔄 幂等控制:如何避免重复处理同一事件?
- ⚙️ 失败重试:如何确保最终一致性,并避免无限重试?
- 💼 多厂商 & 多通道:如何优雅地支持不同支付/消息通道?
- 📊 可观测 & 可运维:如何快速诊断、监控并手动补偿?
🏗 二、系统架构设计
🔍 三、核心能力实现
3.1 🔐 签名验证(防伪造)
接口定义
public interface ISignatureVerifier
{/// <summary>从安全配置中心获取 Secret</summary>string GetSecret(string provider);/// <summary>签名 Header 名</summary>string HeaderName { get; }bool Verify(string payload, string signature);
}
实现示例(Wxpay)
public class WxSignatureVerifier : ISignatureVerifier, ITransientDependency
{private readonly IDynamicParameterStore _paramStore;public string HeaderName { get; } = "X-Wxpay-Signature";public WxSignatureVerifier(IDynamicParameterStore paramStore)=> _paramStore = paramStore;public string GetSecret(string provider)=> _paramStore.GetOrNullAsync($"Webhook:Secret:{provider}").GetAwaiter().GetResult()?? throw new BusinessException("未配置签名 Secret");public bool Verify(string payload, string signature){var secret = GetSecret("Wxpay");var expected = ComputeHmac(payload, secret);return ConstantTimeEquals(expected, signature);}private static string ComputeHmac(string data, string key){using var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(key));return Convert.ToHexString(hmac.ComputeHash(Encoding.UTF8.GetBytes(data))).ToLowerInvariant();}private static bool ConstantTimeEquals(string a, string b){if (a.Length != b.Length) return false;int diff = 0;for (int i = 0; i < a.Length; i++)diff |= a[i] ^ b[i];return diff == 0;}
}
3.2 🔄 幂等控制(防重复处理)
接口与实现
public interface IIdempotencyService
{Task<bool> IsProcessedAsync(string eventId);Task<bool> TryProcessAsync(string eventId, Func<Task> handler);
}
public class IdempotencyService : IIdempotencyService, ITransientDependency
{private readonly IDistributedCache _cache;private readonly IDistributedLockProvider _lockProvider;public IdempotencyService(IDistributedCache cache,IDistributedLockProvider lockProvider){_cache = cache;_lockProvider = lockProvider;}public async Task<bool> IsProcessedAsync(string eventId)=> await _cache.GetStringAsync(Key(eventId)) != null;public async Task<bool> TryProcessAsync(string eventId, Func<Task> handler){var lockName = $"webhook:lock:{eventId}";var locker = _lockProvider.Create(lockName);using var handle = await locker.TryAcquireAsync(TimeSpan.FromSeconds(5));if (handle == null)return false; // 获取锁失败if (await IsProcessedAsync(eventId))return true; // 已处理// 真正执行业务await handler.Invoke();// 缓存标记await _cache.SetStringAsync(Key(eventId),"1",new DistributedCacheEntryOptions {AbsoluteExpirationRelativeToNow = TimeSpan.FromHours(2)});return true;}private static string Key(string id) => $"webhook:processed:{id}";
}
🛠️ 3.3 多厂商处理策略
接口
public interface IPaymentWebhookHandler : ITransientDependency
{string Provider { get; }Task<WebhookResult> HandleAsync(string payload, IDictionary<string, string> headers);
}
策略工厂
public class WebhookHandlerFactory : ITransientDependency
{private readonly IEnumerable<IPaymentWebhookHandler> _handlers;public WebhookHandlerFactory(IEnumerable<IPaymentWebhookHandler> handlers)=> _handlers = handlers;public IPaymentWebhookHandler Get(string provider)=> _handlers.FirstOrDefault(h =>h.Provider.Equals(provider, StringComparison.OrdinalIgnoreCase))?? throw new BusinessException($"不支持的厂商:{provider}");
}
🔁 四、关键流程图
4.1 请求处理流程
4.2 重试工作流程
🛠️ 3.5 接收控制器(统一入口)
[Route("api/webhooks/payments")]
public class WebhookController : AbpController
{private readonly ISignatureVerifier _verifier;private readonly WebhookHandlerFactory _factory;private readonly IIdempotencyService _idem;private readonly IRepository<WebhookLog, Guid> _logRepo;public WebhookController(ISignatureVerifier verifier,WebhookHandlerFactory factory,IIdempotencyService idem,IRepository<WebhookLog, Guid> logRepo){_verifier = verifier;_factory = factory;_idem = idem;_logRepo = logRepo;}[HttpPost("{provider}")]public async Task<IActionResult> HandleAsync(string provider){// 1️⃣ 读取原始 Bodyusing var sr = new StreamReader(Request.Body);var payload = await sr.ReadToEndAsync();// 2️⃣ 签名校验var signature = Request.Headers[_verifier.HeaderName].FirstOrDefault();if (signature == null || !_verifier.Verify(payload, signature))return Unauthorized(new { code = 1001, message = "Invalid signature" });// 3️⃣ 提取 EventIdstring eventId;try{var obj = JObject.Parse(payload);eventId = obj["eventId"]?.ToString() ?? throw new FormatException();}catch{return BadRequest(new { code = 1002, message = "Invalid payload" });}// 4️⃣ 幂等 & 业务处理var success = await _idem.TryProcessAsync(eventId, async () =>{var handler = _factory.Get(provider);var result = await handler.HandleAsync(payload,Request.Headers.ToDictionary(h => h.Key, h => h.Value.FirstOrDefault()));// 5️⃣ 持久化日志await _logRepo.InsertAsync(new WebhookLog{Provider = provider,Payload = payload,EventId = eventId,RetryCount = 0,Status = result.Success? WebhookStatus.Success: WebhookStatus.Failed});});// 6️⃣ 指标埋点Metrics.WebhookProcessed.WithLabels(provider, success ? "ok" : "duplicate").Inc();return Ok(new { code = 0, message = success ? "OK" : "Duplicate" });}
}
📈 五、DevOps & 监控
1. Prometheus 指标
public static class Metrics{public static readonly Counter WebhookProcessed =Metrics.CreateCounter("webhook_processed_total","Webhook 处理总数",new CounterConfiguration {LabelNames = new [] { "provider", "status" }});}
services.AddPrometheusMetrics();app.UseMetricServer(); // /metricsapp.UseHttpMetrics(); // HTTP 请求指标
2. 健康检查
services.AddHealthChecks().AddCheck<RedisHealthCheck>("redis").AddSqlServer(connStr, name: "sql").AddCheck<CustomWebhookHealthCheck>("webhook_receive");app.UseHealthChecks("/health");
- 容器部署(docker-compose.yml
version: '3.8'services:api:image: yourrepo/webhook-api:latestports:- "5000:80"healthcheck:test: ["CMD", "curl", "-f", "http://localhost/health"]interval: 30sretries: 3redis:image: redis:6db:image: mcr.microsoft.com/mssql/server:2019-latestenvironment:- ACCEPT_EULA=Y- SA_PASSWORD=Your_password123
✅ 六、测试
6.1 单元测试
6.1.1 签名校验测试(xUnit)
public class SignatureVerifierTests
{private readonly ISignatureVerifier _verifier;public SignatureVerifierTests(){// 这里用测试版本的 DynamicParameterStore 返回固定 secretvar paramStore = A.Fake<IDynamicParameterStore>();A.CallTo(() => paramStore.GetOrNullAsync("Webhook:Secret:Wxpay")).Returns(Task.FromResult<string>("test-secret"));_verifier = new WxSignatureVerifier(paramStore);}[Theory][InlineData("payload", "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855")] // 示例 hashpublic void Verify_ValidSignature_ReturnsTrue(string payload, string signature){var result = _verifier.Verify(payload, signature);Assert.True(result);}[Fact]public void Verify_InvalidSignature_ReturnsFalse(){var result = _verifier.Verify("payload", "bad-signature");Assert.False(result);}
}
6.1.2 幂等服务并发安全测试
public class IdempotencyServiceTests
{[Fact]public async Task TryProcessAsync_FirstConcurrency_OnlyOnceExecuted(){var cache = new MemoryDistributedCache(new OptionsWrapper<MemoryDistributedCacheOptions>(new MemoryDistributedCacheOptions()));var lockProvider = new DefaultDistributedLockProvider(); // 假设已实现var service = new IdempotencyService(cache, lockProvider);int executeCount = 0;Func<Task> handler = async () =>{await Task.Delay(50);Interlocked.Increment(ref executeCount);};// 并发 10 次调用var tasks = Enumerable.Range(0, 10).Select(_ => service.TryProcessAsync("evt-1", handler)).ToArray();await Task.WhenAll(tasks);// handler 只应执行一次Assert.Equal(1, executeCount);}
}
6.1.3 重试 Worker 测试
public class WebhookRetryWorkerTests
{[Fact]public async Task DoWorkAsync_FailedLogs_ExponentialBackoffAndDeadLetter(){// 准备内存仓库var logs = new List<WebhookLog>{new WebhookLog { Id = Guid.NewGuid(), Provider="Wxpay", Payload="p", EventId="1", RetryCount=5, Status=WebhookStatus.Failed }};var repo = new InMemoryRepository<WebhookLog, Guid>(logs);var fakeFactory = A.Fake<WebhookHandlerFactory>();// 模拟每次抛异常A.CallTo(() => fakeFactory.Get(A<string>._)).Returns(new FailingHandler());var worker = new WebhookRetryWorker(repo, fakeFactory);using var cts = new CancellationTokenSource();await worker.DoWorkAsync(cts.Token);var updated = logs.Single();Assert.Equal(WebhookStatus.Dead, updated.Status);Assert.Equal(6, updated.RetryCount);}private class FailingHandler : IPaymentWebhookHandler{public string Provider => "Wxpay";public Task<WebhookResult> HandleAsync(string payload, IDictionary<string, string> headers)=> throw new Exception("fail");}
}
6.2 集成测试(Testcontainers)
public class WebhookIntegrationTests : IAsyncLifetime
{private readonly TestcontainerDatabase _redisContainer;private readonly TestcontainerDatabase _sqlContainer;public WebhookIntegrationTests(){_redisContainer = new TestcontainersBuilder<TestcontainersDatabase>().WithDatabase(new RedisTestcontainerConfiguration()).Build();_sqlContainer = new TestcontainersBuilder<TestcontainersDatabase>().WithDatabase(new MsSqlTestcontainerConfiguration{Password = "Your_password123"}).Build();}public async Task InitializeAsync(){await _redisContainer.StartAsync();await _sqlContainer.StartAsync();// 这里可以动态构建 IConfiguration 并启动 TestServer}public async Task DisposeAsync(){await _redisContainer.DisposeAsync();await _sqlContainer.DisposeAsync();}[Fact]public async Task FullWebhookFlow_ReturnsOk(){// 使用 TestServer 调用 APIvar client = TestWebApplicationFactory.CreateClient(new Dictionary<string, string>{["ConnectionStrings:Redis"] = _redisContainer.ConnectionString,["ConnectionStrings:Default"] = _sqlContainer.ConnectionString});var payload = "{\"eventId\":\"evt-100\",\"data\":{}}";var signature = ComputeTestSignature(payload, "test-secret");var response = await client.PostAsync("/api/webhooks/payments/Wxpay",new StringContent(payload, Encoding.UTF8, "application/json"));Assert.Equal(HttpStatusCode.OK, response.StatusCode);// 再次调用应返回 Duplicateresponse = await client.PostAsync("/api/webhooks/payments/Wxpay",new StringContent(payload, Encoding.UTF8, "application/json"));var json = await response.Content.ReadAsStringAsync();Assert.Contains("Duplicate", json);}private static string ComputeTestSignature(string payload, string secret){using var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(secret));return Convert.ToHexString(hmac.ComputeHash(Encoding.UTF8.GetBytes(payload))).ToLowerInvariant();}
}
说明:
- 使用 DotNet.Testcontainers 启动 Redis 和 SQL Server;
- 通过
TestWebApplicationFactory
启动完整 ASP.NET Core 应用;- 验证首次处理和幂等结果。
相关文章:
ABP VNext + Webhook:订阅与异步回调
🚀 ABP VNext Webhook:订阅与异步回调 📚 目录 🚀 ABP VNext Webhook:订阅与异步回调🎯 一、背景切入:如何优雅地支持第三方回调?🏗 二、系统架构设计🔍 三…...

Docker(二):开机自启动与基础配置、镜像加速器优化与疑难排查指南
引言 docker 的快速部署与高效运行依赖于两大核心环节:基础环境搭建与镜像生态优化。本期博文从零开始,系统讲解 docker 服务的管理配置与镜像加速实践。第一部分聚焦 docker 服务的安装、权限控制与自启动设置,确保环境稳定可用;…...

Lua基础语法
文章目录 一、注释二、 数据类型1. 注意事项2. 全局/局部变量 三、 标识符1. 保留字2. 变量3. 动态类型 四、 运算符1. 算术运算符2. 关系运算符3. 逻辑运算符4. 其他运算符 五、 函数1. 固定参函数2. 可变参函数3. 可返回多个值4. 函数作为参数 六、循环控制语句1. while...do…...

2025年渗透测试面试题总结-匿名[实习]安全工程师(安全厂商)(题目+回答)
网络安全领域各种资源,学习文档,以及工具分享、前沿信息分享、POC、EXP分享。不定期分享各种好玩的项目及好用的工具,欢迎关注。 目录 一面技术问题 1. Burp插件原理 2. JavaWeb项目经验 3. CC1-7链原理(以CC6为例࿰…...

【node.js】实战项目
个人主页:Guiat 归属专栏:node.js 文章目录 1. 项目概览与架构设计1.1 实战项目:企业级电商管理系统1.2 技术栈选择 2. 项目初始化与基础架构2.1 项目结构设计2.2 基础配置管理 3. 用户服务实现3.1 用户服务架构3.2 用户模型设计3.3 用户服务…...
从AD9361 到 ADSY1100 ,中间的迭代产品历史
从 AD9361 到 ADSY1100 的演进,是 Analog Devices(ADI)在射频收发器(RF Transceiver)集成化、高性能、宽带宽、低功耗和波束赋形能力方面持续推进的一个路线。以下是其中的重要芯片节点和核心参数对比: 1. …...

免费插件集-illustrator插件-Ai插件-查找选中颜色与pantone中匹配颜色
文章目录 1.介绍2.安装3.通过窗口>扩展>知了插件4.功能解释5.总结 1.介绍 本文介绍一款免费插件,加强illustrator使用人员工作效率,实现查找选中颜色与pantone中匹配颜色。首先从下载网址下载这款插件https://download.csdn.net/download/m0_6731…...
redis集合类型
练习命令使用,具体如下: 练习无序集合类型命令 sadd smembers scard srem sinter sunion sdiff sismember srandmember spop 练习有序集合类型命令 无序集合中的每个元素都是不同的,且没有顺序 创建/追加/删除/查看 127.0.0.1:6379>…...

[爬虫实战] 爬微博图片:xpath的具体运用
博客配套代码发布于github:微博图片 相关知识点:图片懒加载 [爬虫知识] 数据解析 相关爬虫专栏:JS逆向爬虫实战 爬虫知识点合集 爬虫实战案例 这里我们以网页微博图片为例,尝试获取该页面下所有图片并保存。 一、分析网站 刷…...

MySQL中简单的操作
一.数据库 1.1数据库的建立: create database 库名; 1.2数据库的查看: show databases; 1.3数据库的删除: drop database 库名; 二.数据库中的表 2.1表的建立: create table 表名&…...
NNG和DDS
NNG (Nanomsg Next Generation) 和 DDS (Data Distribution Service) 是两种不同的通信协议,各自在不同场景下具有其优势。下面我将对这两种技术进行详细解释,并通过具体的例子来说明它们如何应用在实际场景中。 1. NNG (Nanomsg Next Generation) NNG简…...

防震基座在半导体晶圆制造设备抛光机详细应用案例-江苏泊苏系统集成有限公司
在半导体制造领域,晶圆抛光作为关键工序,对设备稳定性要求近乎苛刻。哪怕极其细微的振动,都可能对晶圆表面质量产生严重影响,进而左右芯片制造的成败。以下为您呈现一个防震基座在半导体晶圆制造设备抛光机上的经典应用案例。 企…...
框架开发与原生开发的权衡:React案例分析(原生JavaScript)
文章目录 框架开发与原生开发的权衡:React案例分析引言框架开发的优势开发效率提升状态管理的便捷性组件复用与生态系统团队协作与规范统一 原生开发的优势性能优化空间学习曲线平缓精细控制与定制化避免版本依赖与迁移成本 实际应用案例分析大型企业应用性能关键型…...

Lua5.4.2常用API整理记录
一、基础函数 1.type(value) 返回值的类型(如 "nil", "number", "string", "table", "function" 等)。 代码测试: a 0 print(type(a)) a nil print(type(a)) a "aaaaaaaa&…...

Python打卡训练营学习记录Day36
仔细回顾一下神经网络到目前的内容,没跟上进度的同学补一下进度。 作业:对之前的信贷项目,利用神经网络训练下,尝试用到目前的知识点让代码更加规范和美观。 import pandas as pd #用于数据处理和分析,可处理表格数…...
### Mac电脑推送文件至Gitee仓库步骤详解
**核心流程及命令说明:** #### 1. **配置全局Git用户信息** bash git config --global user.name "shenguanling" git config --global user.email "3259125968qq.com" - **作用**:设置提交代码时的作者信息࿰…...
官方SDK停更后的选择:开源维护的Bugly Unity SDK
腾讯Bugly,为移动开发者提供专业的异常上报和运营统计,帮助开发者快速发现并解决异常,同时掌握产品运营动态,及时跟进用户反馈。 但是,免费版的Unity SDK已经很久不更新了,会有一些问题和特性缺失ÿ…...

什么是智能体agent?
文章目录 什么是智能体agent?最基本的核心思想我们是如何走到今天以及为什么是现在如何从思维上剖析“一个智能体系统”痛苦的教训结论 什么是智能体agent? 原文链接:https://windsurf.com/blog/what-is-an-agent 本文探讨了AI智能体的核心概…...
【多线程】Java 实现方式及其优缺点
以下是 Java 多线程实现方式及其优缺点的详细说明: 一、Java 多线程核心实现方式 1. 继承 Thread 类 public class MyThread extends Thread {Overridepublic void run() {System.out.println("Thread running: " Thread.currentThread().getName());}…...

Obsidian 数据可视化深度实践:用 DataviewJS 与 Charts 插件构建智能日报系统
Obsidian 数据可视化深度实践:用 DataviewJS 与 Charts 插件构建智能日报系统 一、核心架构解析 本系统基于 Obsidian 的 DataviewJS 和 Charts 插件,实现日报数据的自动采集、可视化分析及智能回溯功能(系统架构原理见)。其技术…...
Three.js 海量模型加载性能优化指南
一、性能瓶颈分析 1.1 常见性能杀手 问题类型典型表现影响范围Draw Call 爆炸每帧渲染调用超过1000次GPU 渲染性能内存占用过高浏览器进程内存突破1GB加载速度/崩溃风险模型文件过大单个GLB文件超过50MB网络传输时间几何数据冗余重复模型独立加载CPU/GPU资源浪费 1.2 性能监…...

6.4.3_有向无环图描述表达式
有向无环图: 有向图中不存在环即为有向无环图DAG图,即如下V0->V4->v3->V0或者V4->V1->v4就存在环不是有向无环图,即在一个路径中一个顶点不能出现2次? DAG描述表达式: 算术表达式用树来表示࿰…...
力扣第157场双周赛
1. 最大质数子字符串之和 给定一个字符串 s,找出可以由其 子字符串 组成的 3个最大的不同质数 的和。 返回这些质数的 总和 ,如果少于 3 个不同的质数,则返回 所有 不同质数的和。 质数是大于 1 且只有两个因数的自然数:1和它本身…...
青少年编程与数学 02-019 Rust 编程基础 19课题、项目发布
青少年编程与数学 02-019 Rust 编程基础 19课题、项目发布 一、准备工作1. 创建和配置项目2. 编写代码和测试3. 文档注释 二、构建发布版本1. 构建优化后的可执行文件2. 静态链接(可选) 三、发布到 crates.io1. Crates.io核心功能使用方法特点最新动态 2…...

【HarmonyOS Next之旅】DevEco Studio使用指南(二十五) -> 端云一体化开发 -> 业务介绍(二)
目录 1 -> 工作原理 2 -> 约束与限制 2.1 -> 支持的设备 2.2 -> 支持的国家/地区 2.3 -> 支持的签名方式 3 -> 总结 3.1 -> 关键功能与工具 3.2 -> 开发流程 3.3 -> 典型场景与优化 3.4 -> 常见问题与解决 3.5 -> 总结 1 -> 工…...
LLaMA-Factory 微调模型与训练数据量对应关系
在使用LLaMA-Factory的LoRA技术微调1.5B和7B模型时,数据量需求主要取决于任务类型、数据质量以及模型规模。以下是基于现有研究和实践的具体分析: 一、数据量需求的核心影响因素 模型规模与数据量的关系 通常情况下,模型参数越多(…...
数据库与Redis数据一致性解决方案
在写数据时保证 Redis 和数据库数据一致,可采用以下方案,需根据业务场景权衡选择: 1. 先更新数据库,再更新 Redis 步骤: 写入 / 更新数据库数据。删除或更新 Redis 缓存。适用场景:读多写少,对缓存一致性要求不高(短暂不一致可接受)。风险:若第二步失败,导致缓存与…...

Spring Boot AI 之 Chat Client API 使用大全
ChatClient提供了一套流畅的API用于与AI模型交互,同时支持同步和流式两种编程模型。 流畅API包含构建Prompt组成元素的方法,这些Prompt将作为输入传递给AI模型。从API角度来看,Prompt由一系列消息组成,其中包含指导AI模型输出和行为的指令文本。 AI模型主要处理两类消息: …...

分身空间:手机分身多开工具,轻松实现多账号登录
分身空间是一款功能强大的手机分身多开工具APP,专为需要同时登录多个账号的用户设计。它支持多开各种游戏和软件,让用户可以轻松实现多账号同时在线,提升使用效率和体验。无论是社交软件、游戏还是办公应用,分身空间都能帮助你轻松…...

音视频之视频压缩及数字视频基础概念
系列文章: 1、音视频之视频压缩技术及数字视频综述 一、视频压缩编码技术综述: 1、信息化与视频通信: 什么是信息: 众所周知,人类社会的三大支柱是物质、能量和信息。具体而言,农业现代化的支柱是物质&…...