当前位置: 首页 > news >正文

非阻塞重试与 Spring Kafka 的集成测试

        如何为启用重试和死信发布的消费者的 Spring Kafka 实现编写集成测试。

Kafka 非阻塞重试

Kafka 中的非阻塞重试是通过为主主题配置重试主题来完成的。如果需要,还可以配置其他死信主题。如果所有重试均已用尽,事件将转发至 DLT。公共领域提供了大量资源来了解技术细节。 

要测试什么?

在代码中为重试机制编写集成测试时,这可能是一项具有挑战性的工作。 

  • 如何测试该事件是否已重试所需的次数? 
  • 如何测试仅在发生某些异常时才执行重试,而对于其他异常则不执行重试?
  • 如果上次重试中异常已解决,如何测试是否未进行另一次重试?
  • 在(n-1)次重试尝试失败后,如何测试重试中的第n次尝试是否成功?
  • 当所有重试尝试都用完后,如何测试事件是否已发送到死信队列?

让我们看一些代码。您可以找到很多很好的文章,展示如何使用 Spring Kafka 设置非阻塞重试。下面给出了一种这样的实现。这是使用Spring-Kafka 的@RetryableTopic@DltHandler  注释来完成的。

设置可重试消费者

@Slf4j
@Component
@RequiredArgsConstructor
public class CustomEventConsumer {private final CustomEventHandler handler;@RetryableTopic(attempts = "${retry.attempts}",backoff = @Backoff(delayExpression = "${retry.delay}",multiplierExpression = "${retry.delay.multiplier}"),topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,dltStrategy = FAIL_ON_ERROR,autoStartDltHandler = "true",autoCreateTopics = "false",include = {CustomRetryableException.class})@KafkaListener(topics = "${topic}", id = "${default-consumer-group:default}")public void consume(CustomEvent event, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {try {log.info("Received event on topic {}", topic);handler.handleEvent(event);} catch (Exception e) {log.error("Error occurred while processing event", e);throw e;}}@DltHandlerpublic void listenOnDlt(@Payload CustomEvent event) {log.error("Received event on dlt.");handler.handleEventFromDlt(event);}}

如果您注意到上面的代码片段,include参数包含CustomRetryableException.class. 这告诉使用者仅在该方法抛出 CustomRetryableException 时才重试CustomEventHandler#handleEvent。您可以根据需要添加任意数量。还有一个排除参数,但一次可以使用其中任何一个参数。

${retry.attempts}在发布到 DLT 之前,事件处理应重试最多次数。

设置测试基础设施

要编写集成测试,您需要确保拥有一个正常运行的 Kafka 代理(首选嵌入式)和一个功能齐全的发布者。让我们设置我们的基础设施:

@EnableKafka
@SpringBootTest
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
@EmbeddedKafka(partitions = 1,brokerProperties = {"listeners=" + "${kafka.broker.listeners}", "port=" + "${kafka.broker.port}"},controlledShutdown = true,topics = {"test", "test-retry-0", "test-retry-1", "test-dlt"}
)
@ActiveProfiles("test")
class DocumentEventConsumerIntegrationTest {@Autowiredprivate KafkaTemplate<String, CustomEvent> testKafkaTemplate;// tests}

** 配置是从 application-test.yml 文件导入的。

使用嵌入式 kafka 代理时,重要的是要提及要创建的主题。它们不会自动创建。在本例中,我们创建四个主题,即 

"test", "test-retry-0", "test-retry-1", "test-dlt"

我们已将最大重试尝试次数设置为 3 次。每个主题对应于每次重试尝试。因此,如果 3 次重试都用尽,则应将事件转发到 DLT。

测试用例

如果第一次尝试消费成功,则不应重试。

CustomEventHandler#handleEvent这可以通过该方法仅被调用一次的事实来测试。还可以添加对 Log 语句的进一步测试。

    @Testvoid test_should_not_retry_if_consumption_is_successful() throws ExecutionException, InterruptedException {CustomEvent event = new CustomEvent("Hello");// GIVENdoNothing().when(customEventHandler).handleEvent(any(CustomEvent.class));// WHENtestKafkaTemplate.send("test", event).get();// THENverify(customEventHandler, timeout(2000).times(1)).handleEvent(any(CustomEvent.class));verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));}

如果引发不可重试的异常,则不应重试。

在这种情况下,该CustomEventHandler#handleEvent方法应该只调用一次:

    @Testvoid test_should_not_retry_if_non_retryable_exception_raised() throws ExecutionException, InterruptedException {CustomEvent event = new CustomEvent("Hello");// GIVENdoThrow(CustomNonRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));// WHENtestKafkaTemplate.send("test", event).get();// THENverify(customEventHandler, timeout(2000).times(1)).handleEvent(any(CustomEvent.class));verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));}

如果抛出 a,则重试配置的最大次数RetryableException,并在重试用完后将其发布到死信主题。

在这种情况下,该CustomEventHandler#handleEvent方法应被调用三次(maxRetries)次,并且CustomEventHandler#handleEventFromDlt该方法应被调用一次。

    @Testvoid test_should_retry_maximum_times_and_publish_to_dlt_if_retryable_exception_raised() throws ExecutionException, InterruptedException {CustomEvent event = new CustomEvent("Hello");// GIVENdoThrow(CustomRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));// WHENtestKafkaTemplate.send("test", event).get();// THENverify(customEventHandler, timeout(10000).times(maxRetries)).handleEvent(any(CustomEvent.class));verify(customEventHandler, timeout(2000).times(1)).handleEventFromDlt(any(CustomEvent.class));}

**在验证阶段添加了相当大的超时,以便在测试完成之前可以考虑指数退避延迟。这很重要,如果设置不当可能会导致断言失败。

应该重试直到RetryableException解决,并且如果引发不可重试的异常或消费最终成功,则不应继续重试。

测试已设置为RetryableException先抛出 a 然后再抛出 a NonRetryable exception,以便重试一次。

    @Testvoid test_should_retry_until_retryable_exception_is_resolved_by_non_retryable_exception() throws ExecutionException,InterruptedException {CustomEvent event = new CustomEvent("Hello");// GIVENdoThrow(CustomRetryableException.class).doThrow(CustomNonRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));// WHENtestKafkaTemplate.send("test", event).get();// THENverify(customEventHandler, timeout(10000).times(2)).handleEvent(any(CustomEvent.class));verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));}@Testvoid test_should_retry_until_retryable_exception_is_resolved_by_successful_consumption() throws ExecutionException,InterruptedException {CustomEvent event = new CustomEvent("Hello");// GIVENdoThrow(CustomRetryableException.class).doNothing().when(customEventHandler).handleEvent(any(CustomEvent.class));// WHENtestKafkaTemplate.send("test", event).get();// THENverify(customEventHandler, timeout(10000).times(2)).handleEvent(any(CustomEvent.class));verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));}

结论

因此,您可以看到集成测试是策略、超时、延迟和验证的混合和匹配,以确保 Kafka 事件驱动架构的重试机制万无一失。

相关文章:

非阻塞重试与 Spring Kafka 的集成测试

如何为启用重试和死信发布的消费者的 Spring Kafka 实现编写集成测试。 Kafka 非阻塞重试 Kafka 中的非阻塞重试是通过为主主题配置重试主题来完成的。如果需要&#xff0c;还可以配置其他死信主题。如果所有重试均已用尽&#xff0c;事件将转发至 DLT。公共领域提供了大量资…...

基于 Debian 12 的MX Linux 23 正式发布!

导读MX Linux 是基于 Debian 稳定分支的面向桌面的 Linux 发行&#xff0c;它是 antiX 及早先的 MEPIS Linux 社区合作的产物。它采用 Xfce 作为默认桌面环境&#xff0c;是一份中量级操作系统&#xff0c;并被设计为优雅而高效的桌面与如下特性的结合&#xff1a;配置简单、高…...

Nginx代理功能与负载均衡详解

序言 Nginx的代理功能与负载均衡功能是最常被用到的&#xff0c;关于nginx的基本语法常识与配置已在上篇文章中有说明&#xff0c;这篇就开门见山&#xff0c;先描述一些关于代理功能的配置&#xff0c;再说明负载均衡详细。 Nginx代理服务的配置说明 1、上一篇中我们在http…...

部署问题集合(特辑)虚拟机常用命令

基础 查看ip&#xff1a;ip addr或ipconfig压缩&#xff1a;tar -zcvf redis-3.2.8.tar.gz redis-3.2.8/ 注意&#xff1a;-zcvf对应gz&#xff0c;-vcf对应tar 解压&#xff1a;tar -zxvf redis-3.2.8.tar.gz压缩zip&#xff1a;zip nginx.zip nginx.txt nginx2.txt解压zip&a…...

【Git】如何将本地文件进行Git仓库归档

Git 全局设置 git config --global user.name "mcihael" git config --global user.email "michael520.com"创建新版本库 git clone gitcode.xxxxxx.git cd branch-name touch README.md git add README.md git commit -m "add README" git pu…...

uniapp 使用腾讯视频 的 坑

1. 版本号的问题 注意 1.X.X不维护了 &#xff0c; 需要升级要 2.X.X 2. 官网的 组件事件 调用需要去掉bind 才能调用 官网地址&#xff1a;腾讯视频 | 小程序插件 | 微信公众平台...

LinkedList

LinkedList的模拟实现&#xff08;底层是一个双向链表&#xff09;LinkedList使用 LinkedList的模拟实现&#xff08;底层是一个双向链表&#xff09; 无头双向链表&#xff1a;有两个指针&#xff1b;一个指向前一个节点的地址&#xff1b;一个指向后一个节点的地址。 节点定…...

创作新纪元:知乎、阅文加码AI大模型,撬动创作者经济

输入几个关键词就能生成一篇文章、一篇新闻、一篇小说&#xff0c;ChatGPT自诞生以来文本生成能力一直备受赞誉&#xff0c;ChatGPT要替代记者、编辑、作家的言论愈演愈烈&#xff0c;甚至有一些互联网企业宣布砍掉记者、编辑、文案等岗位全面拥抱AIGC。 目前ChatGPT是否会全面…...

PAT(Advanced Level) Practice(with python)——1067 Sort with Swap(0, i)

Code # 输入有毒&#xff0c;需避坑 # N int(input()) L list(map(int,input().split())) N L[0] L L[1:] res 0 for i in range(1,N):while L[0]!0:# 把所有不在正常位置下的数换到正常t L[0]L[0],L[t] L[t],L[0]res1if L[i]!i:# 换完全后如果对应位置下的数不是目标…...

Python爬取斗罗大陆全集

打开网址http://www.luoxu.cc/dmplay/C888H-1-265.html F12打开Fetch/XHR&#xff0c;看到m3u8&#xff0c;ts&#xff0c;一眼顶真&#xff0c;打开index.m3u8 由第一个包含第二个index.m3u8的地址&#xff0c;ctrlf在源代码中一查index&#xff0c;果然有&#xff0c;不过/…...

前馈神经网络解密:深入理解人工智能的基石

目录 一、前馈神经网络概述什么是前馈神经网络前馈神经网络的工作原理应用场景及优缺点 二、前馈神经网络的基本结构输入层、隐藏层和输出层激活函数的选择与作用网络权重和偏置 三、前馈神经网络的训练方法损失函数与优化算法反向传播算法详解避免过拟合的策略 四、使用Python…...

顺序栈Sequential-stack

0、节点结构体定义 typedef struct SqStack{int *base;int *top; } SqStack; 1、初始化 bool InitStack(SqStack &S) {S.base new int[Maxsize]; //eg. #define Maxsize 100if(!S.base){return false;}S.top S.base;return true; } 2、入栈 bool Push(SqStack &…...

关于工牌(必须5-10个字)

今天蹲坑&#xff0c;低头看了下工牌觉得挺有意思&#xff1a;我从啥时候起也不排斥将工牌挂在脖子上了&#xff1f; 工牌&#xff0c;一个标识。不仅标识了你&#xff0c;也标识了你所在的群体。如果你认可这个群体&#xff0c;佩戴它那是一种荣誉、荣耀&#xff1b;如果你不…...

PHP混淆加密以及常用的一些加密工具

PHP混淆加密是一种将源代码转换为难以理解和阅读的方式&#xff0c;以保护代码的安全性。以下是一些常见的PHP混淆加密方法&#xff1a; 代码压缩&#xff1a;使用代码压缩工具&#xff08;如UglifyJS&#xff09;将PHP代码压缩为一行&#xff0c;去除空格、换行符等可读性的字…...

无涯教程-PHP - ereg()函数

ereg() - 语法 int ereg(string pattern, string originalstring, [array regs]); ereg()函数在string指定的字符串中搜索pattern指定的字符串&#xff0c;如果找到pattern&#xff0c;则返回true&#xff0c;否则返回false。搜索对于字母字符区分大小写。 可选的输入参数re…...

【Ubuntu】简洁高效企业级日志平台后起之秀Graylog

简介 Graylog 是一个用于集中式日志管理的开源平台。在现代数据驱动的环境中&#xff0c;我们需要处理来自各种设备、应用程序和操作系统的大量数据。Graylog提供了一种方法来聚合、组织和理解所有这些数据。它的核心功能包括流式标记、实时搜索、仪表板可视化、告警触发、内容…...

TCP特点UDP编程

目录 1、tcp协议和udp协议 2、多线程并发和多进程并发&#xff1a; &#xff08;1&#xff09;多进程并发服务端 &#xff08;2&#xff09;多进程并发客户端&#xff1a; 3、tcp: 4、粘包 5、UDP协议编程流程 (1)服务器端&#xff1a; (2)客户端&#xff1a; 6、tcp状…...

超级计算机

超级计算机是一种高性能计算机&#xff0c;它能够以极高的速度执行大规模的计算任务。超级计算机通常由数千个甚至数百万个处理器组成&#xff0c;这些处理器能够同时处理大量的数据&#xff0c;从而实现高效的计算。超级计算机广泛应用于科学、工程、金融、天气预报等领域&…...

LeetCode863. 二叉树中所有距离为 K 的结点(相关话题:深度遍历,广度遍历)

题目描述 给定一个二叉树(具有根结点 root), 一个目标结点 target ,和一个整数值 k 。 返回到目标结点 target 距离为 k 的所有结点的值的列表。 答案可以以 任何顺序 返回。 示例 1: 输入:root = [3,5,1,6,2,0,8,null,null,7,4], target = 5, k = 2 输出:[7,4,1] 解释…...

Kotlin 基础学习

NULL检查机制 Kotlin的空安全设计对于声明可为空的参数&#xff0c;在使用是进行空判断处理&#xff0c;有两种处理方式&#xff0c;字段后加 !! 像 java 一样抛出空异常&#xff0c;另外字段后面加 ? 可不做处理返回值为 null 或者配合 ?: 做空判断处理。 //类型后面加 ? 表…...

Qwen3-0.6B-FP8入门必看:6亿参数如何做到≤2GB显存?FP8量化压缩深度解析

Qwen3-0.6B-FP8入门必看&#xff1a;6亿参数如何做到≤2GB显存&#xff1f;FP8量化压缩深度解析 你是不是也遇到过这种情况&#xff1a;想在自己的电脑上跑个大模型试试&#xff0c;结果一看显存要求&#xff0c;动辄十几GB&#xff0c;直接劝退&#xff1f;或者好不容易找到一…...

手把手教你用4G Cat.1 bis开发智能硬件:从电路设计到低功耗优化的完整实战

4G Cat.1 bis智能硬件开发实战&#xff1a;从电路设计到低功耗优化的全流程指南 在共享充电宝扫码即用的便利背后&#xff0c;隐藏着一场关于低功耗通信的技术革命。当传统4G模块因高功耗让硬件开发者束手无策时&#xff0c;4G Cat.1 bis以单天线设计、10Mbps传输速率和μA级待…...

电工必看:正弦交流电路中的相量法实战技巧(附计算示例)

电工必看&#xff1a;正弦交流电路中的相量法实战技巧&#xff08;附计算示例&#xff09; 在电气工程领域&#xff0c;正弦交流电路的分析是每位电工和电气工程师必须掌握的核心技能。面对复杂的电路计算&#xff0c;传统的三角函数解析法往往让从业者陷入繁琐的运算泥潭。相量…...

用格子玻尔兹曼方法 - 浸没边界法模拟圆柱绕流(LBM - IBM in C++)

格子玻尔兹曼方法-浸没边界法模拟圆柱绕流 LBM- IBM &#xff08;C&#xff09;在计算流体力学&#xff08;CFD&#xff09;的领域里&#xff0c;格子玻尔兹曼方法&#xff08;Lattice Boltzmann Method, LBM&#xff09;和浸没边界法&#xff08;Immersed Boundary Method, IB…...

苹果内购Java后端避坑指南:沙盒测试、凭据验证与订单防重的那些事儿

苹果内购Java后端避坑指南&#xff1a;沙盒测试、凭据验证与订单防重的那些事儿 第一次对接苹果应用内购&#xff08;IAP&#xff09;时&#xff0c;我以为按照官方文档走完流程就万事大吉了。直到凌晨三点收到服务器告警——重复充值、验证超时、沙盒环境漏测等问题接踵而至。…...

从零开始:SpaCy安装与模型下载的完整流程(含版本查询技巧)

从零开始&#xff1a;SpaCy安装与模型下载的完整流程&#xff08;含版本查询技巧&#xff09; 自然语言处理&#xff08;NLP&#xff09;正在改变我们与计算机交互的方式&#xff0c;而SpaCy作为这一领域的明星工具库&#xff0c;以其高效性和易用性赢得了众多开发者的青睐。无…...

FFXIV国际服中文补丁解决方案:零基础上手实战指南

FFXIV国际服中文补丁解决方案&#xff1a;零基础上手实战指南 【免费下载链接】FFXIVChnTextPatch 项目地址: https://gitcode.com/gh_mirrors/ff/FFXIVChnTextPatch 你是否曾在《最终幻想XIV》国际服中因语言障碍错失关键剧情&#xff1f;是否因英文界面降低了游戏沉浸…...

pyNastran:工程仿真领域的Python变革者——打破商业软件垄断的技术突围

pyNastran&#xff1a;工程仿真领域的Python变革者——打破商业软件垄断的技术突围 【免费下载链接】pyNastran A Python-based interface tool for Nastrans file formats 项目地址: https://gitcode.com/gh_mirrors/py/pyNastran 价值定位&#xff1a;重新定义工程仿真…...

Android10音频系统实战:如何自定义音量曲线(附default_volume_tables.xml修改指南)

Android 10音频系统深度定制&#xff1a;音量曲线调优实战手册 在移动设备音频体验的精细打磨中&#xff0c;音量曲线的定制往往是最容易被忽视却至关重要的环节。作为一名长期从事Android系统定制的开发者&#xff0c;我曾为多款旗舰设备调整过音频参数&#xff0c;发现原厂音…...

Phi-4-Reasoning-Vision简单调用:Python API封装与REST接口调用示例

Phi-4-Reasoning-Vision简单调用&#xff1a;Python API封装与REST接口调用示例 1. 项目概述 Phi-4-Reasoning-Vision是基于微软Phi-4-reasoning-vision-15B多模态大模型开发的高性能推理工具&#xff0c;专为双卡4090环境优化。该工具严格遵循官方SYSTEM PROMPT规范&#xf…...