kafka动态监听主题
简单版本
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.stereotype.Service;@Service
public class DynamicKafkaListenerService {@Autowiredprivate ConsumerFactory<String, String> consumerFactory;public void registerListener(String topic) {ContainerProperties containerProperties = new ContainerProperties(topic);containerProperties.setMessageListener((MessageListener<String, String>) record -> {System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + record.value());});ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);container.setBeanName(topic + "-listener");container.start();}
}
手动ack版本
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Service;@Service
public class DynamicKafkaListenerService {@Autowiredprivate ConsumerFactory<String, String> consumerFactory;public void registerListener(String topic) {ContainerProperties containerProperties = new ContainerProperties(topic);// 设置消息监听器为 AcknowledgingMessageListenercontainerProperties.setMessageListener((AcknowledgingMessageListener<String, String>) (record, ack) -> {try {System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + record.value());// 模拟消息处理逻辑// 处理完成后手动确认消息if (ack != null) {ack.acknowledge();}} catch (Exception e) {// 处理异常情况,例如记录日志或重试等System.err.println("消息处理失败: " + e.getMessage());}});// 设置手动确认模式containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);container.setBeanName(topic + "-listener");container.start();}
}
批量处理版本
@Autowiredprivate ConsumerFactory<String, String> consumerFactory;public void registerListener(String topic) {ContainerProperties containerProperties = new ContainerProperties(topic);// 设置手动确认模式containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);// 设置批量消息监听器containerProperties.setMessageListener((BatchAcknowledgingMessageListener<String, String>) (records, ack) -> {try {for (ConsumerRecord<String, String> record : records) {System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + record.value());}// 模拟消息处理逻辑// 处理完成后手动批量确认消息ack.acknowledge();} catch (Exception e) {// 处理异常情况,例如记录日志或重试等System.err.println("消息处理失败: " + e.getMessage());}});ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);container.setBeanName(topic + "-listener");container.start();}
可关闭版本
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.BatchAcknowledgingMessageListener;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Service;import java.util.HashMap;
import java.util.Map;@Service
public class DynamicKafkaListenerService {@Autowiredprivate ConsumerFactory<String, String> consumerFactory;// 用于保存每个主题对应的监听器容器private final Map<String, ConcurrentMessageListenerContainer<String, String>> containerMap = new HashMap<>();/*** 开启一个监听*/public void registerListener(String topic) {ContainerProperties containerProperties = new ContainerProperties(topic);// 设置手动确认模式containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);// 设置批量消息监听器containerProperties.setMessageListener((BatchAcknowledgingMessageListener<String, String>) (records, ack) -> {try {for (ConsumerRecord<String, String> record : records) {System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + record.value());}// 模拟消息处理逻辑// 处理完成后手动批量确认消息ack.acknowledge();} catch (Exception e) {// 处理异常情况,例如记录日志或重试等System.err.println("消息处理失败: " + e.getMessage());}});ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);container.setBeanName(topic + "-listener");container.start();// 将监听器容器保存到 map 中containerMap.put(topic, container);}/*** 关闭一个监听*/public void stopListener(String topic) {ConcurrentMessageListenerContainer<String, String> container = containerMap.get(topic);if (container != null && container.isRunning()) {container.stop();// 从 map 中移除已停止的监听器容器containerMap.remove(topic);}}
}
调用添加监听
/*** 配置详情*/@GetMapping("/getModelZdyConfInfo")public String getModelZdyConfInfo(String topic) {dynamicKafkaListenerService.registerListener(topic);return "添加" + topic + "监听成功";}
相关文章:
kafka动态监听主题
简单版本 import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.Containe…...
【PHP的static】
关于静态属性 最简单直接:静态方法也是一样 看了很多关于静态和动态的说法,无非是从 调用方式, 类访问实例变量, 访问静态变量, 需不要实例化这几个方向,太空了。问使用场景,好一点的 能说个…...
国产编辑器EverEdit - 光标位置跳转
1 光标位置跳转 1.1 应用场景 某些场景下,用户从当前编辑位置跳转到别的位置查阅信息,如果要快速跳转回之前编辑位置,则可以使用光标跳转相关功能。 1.2 使用方法 1.2.1 上一个编辑位置 跳转到上一个编辑位置,即文本修改过的位…...
cv2.Sobel
1. Sobel 算子简介 Sobel 算子是一种 边缘检测算子,通过对图像做梯度计算,可以突出边缘。 Sobel X 方向卷积核: 用于计算 水平方向(x 方向) 的梯度。 2. 输入图像示例 假设我们有一个 55 的灰度图像,像素…...
51单片机俄罗斯方块整行消除函数
/************************************************************************************************************** * 名称:flash * 功能:行清除动画 * 参数:NULL * 返回:NULL * 备注: * 采用非阻塞延时࿰…...
鸿蒙HarmonyOS NEXT开发:优化用户界面性能——组件复用(@Reusable装饰器)
文章目录 一、概述二、原理介绍三、使用规则四、复用类型详解1、标准型2、有限变化型2.1、类型1和类型2布局不同,业务逻辑不同2.2、类型1和类型2布局不同,但是很多业务逻辑公用 3、组合型4、全局型5、嵌套型 一、概述 组件复用是优化用户界面性能&#…...
langchain系列(二)- 提示词以及模板
导读 环境:OpenEuler、Windows 11、WSL 2、Python 3.12.3 langchain 0.3 背景:前期忙碌的开发阶段结束,需要沉淀自己的应用知识,过一遍LangChain 时间:20250212 说明:技术梳理 提示词模板理论说明 提…...
Openssl的使用,CA证书,中间证书,服务器证书的生成与使用
证书教程 1、Openssl相关文档2、生成证书命令初步解释3、准备openssl的配置文件 openssl.cnf4、证书生成4.1、生成根证书、CA根证书、自签名证书4.2、生成服务器证书4.3、生成中间证书4.3、使用中间证书生成服务器证书5、使用openssl操作证书5.1 查看证书内容5.2 进行证书测试5…...
深入浅出:Python 中的异步编程与协程
引言 大家好,今天我们来聊聊 异步编程 和 协程,这是近年来编程语言领域中的热点话题之一,尤其在 Python 中,它作为一种全新的编程模型,已经成为处理 IO密集型 任务的强力工具。尽管很多人对异步编程望而却步࿰…...
Windows中使用Docker安装Anythingllm,基于deepseek构建自己的本地知识库问答大模型,可局域网内多用户访问、离线运行
文章目录 Windows中使用Docker安装Anythingllm,基于deepseek构建自己的知识库问答大模型1. 安装 Docker Desktop2. 使用Docker拉取Anythingllm镜像2. 设置 STORAGE_LOCATION 路径3. 创建存储目录和 .env 文件.env 文件的作用关键配置项 4. 运行 Docker 命令docker r…...
Unity使用iTextSharp导出PDF-04图形
坐标系 pdf文档页面的原点(0,0)在左下角,向上为y,向右为x。 文档的PageSize可获取页面的宽高数值 单位:像素 绘制矢量图形 使用PdfContentByte类进行绘制,注意文档打开后才有此对象的实例。 绘制方法 …...
[SAP ABAP] OO ALV报表练习1
销售订单明细查询报表 业务目的:根据选择屏幕的筛选条件,使用 ALV 报表,显示销售订单详情 效果展示 用户的输入条件界面 用户的查询结果界面 涉及的主要功能点: 1.当在销售订单明细查询页面取不到任何数据时,在选择…...
安卓基础(第一集)
SharedPreferences(本地存储简单数据) 在 Android 中,SharedPreferences 用于存储小型数据。 (1)存储数据 // 获取 SharedPreferences 对象 SharedPreferences sharedPreferences getSharedPreferences("MyPre…...
数据库高安全—数据保护:数据动态脱敏
书接上文数据库高安全—审计追踪:传统审计&统一审计,从传统审计和统一审计两方面对高斯数据库的审计追踪技术进行解读,本篇将从数据动态脱敏方面对高斯数据库的数据保护技术进行解读。 5.1 数据动态脱敏 数据脱敏,顾名思义就…...
Datawhale 数学建模导论二 2025年2月
第6章 数据处理与拟合模型 本章主要涉及到的知识点有: 数据与大数据Python数据预处理常见的统计分析模型随机过程与随机模拟数据可视化 本章内容涉及到基础的概率论与数理统计理论,如果对这部分内容不熟悉,可以参考相关概率论与数理统计的…...
ArcGIS Enterprise 与 ArcGIS Online 的关系
ArcGIS Enterprise 和 ArcGIS Online 是 Esri 提供的两款核心产品,它们在功能、部署方式和使用场景上存在显著差异,但同时也有一定的联系和互补性。以下是关于这两款产品的详细关系说明: 1. 产品定位与功能 ArcGIS Enterprise 是一款企业级解决方案,支持在组织的基础设施上…...
ASP.NET Core SignalR实践指南
Hub类的生命周期是瞬态的,每次调用集线器的时候都会创建一个新的Hub类实例,因此不要在Hub类中通过属性、成员变量等方式保存状态。如果服务器的压力比较大,建议把ASP.NET Core程序和SignalR服务器端部署到不同服务器上,以免它们互…...
【力扣 - 简单题】88. 合并两个有序数组
题目:88. 合并两个有序数组 - 力扣(LeetCode) 解题: class Solution { public:void merge(vector<int>& nums1, int m, vector<int>& nums2, int n) {for (int i m; i < n m; i ){nums1[i] nums2[i -…...
【密评】 | 商用密码应用安全性评估从业人员考核题库(23)
在GM/T0048《智能密码钥匙密码检测规范》中,产品的对称算法性能应满足哪个标准中的要求()。 A.GM/T 0016《智能密码钥匙密码应用接口规范》 B.GM/T 0017《智能密码钥匙密码应用接口数据格式规范》 C.GM/T 0027《智能密码钥匙技术规范》 D.GM/T 0028《密码模块安全技术要求》…...
记录 | WPF基础学习MVVM例子讲解1
目录 前言一、NotificationObject与数据属性创建个类,声明NotificationObject 二、DelegateCommand与命令属性三、View与ViewModel的交互(难点)在ViewModel文件下创建MainWindowViewModel数据和方法绑定资源指定 代码下载四、优势体现代码下载…...
PyTorch 中 `torch.cuda.amp` 相关警告的解决方法
在最近的写代码过程中,遇到了两个与 PyTorch 的混合精度训练相关的警告信息。这里随手记录一下。 警告内容 警告 1: torch.cuda.amp.autocast FutureWarning: torch.cuda.amp.autocast(args...) is deprecated. Please use torch.amp.autocast(cuda, args...) i…...
实验7 路由器之间IPsec VPN配置
实验7 路由器之间IPsec VPN配置 1.实验目的 通过在两台路由器之间配置IPsec VPN连接,掌握IPsec VPN配置方法,加深对IPsec协议的理解。 2.实验内容 (1)按照实验拓扑搭建实验环境。 (2)在路由器R1和R4配置IP…...
Unity中快速制作2D沙雕动画:流程编
Unity中快速制作2D沙雕动画(搞笑/无厘头风格),通过以下方案实现低成本、高成效的开发流程,结合夸张的动作、滑稽的物理效果和魔性音效: 1. 角色与素材设计 核心原则:丑萌即正义,越怪越好&#…...
小白零基础如何搭建CNN
1.卷积层 在PyTorch中针对卷积操作的对象和使用的场景不同,如有1维卷积、2维卷积、 3维卷积与转置卷积(可以简单理解为卷积操作的逆操作),但它们的使用方法比较相似,都可以从torch.nn模块中调用,需要调用的…...
【Java八股文】01-Java基础面试篇
【Java八股文】01-Java基础面试篇 概念Java特点Java为什么跨平台JVM、JDK、JRE关系 面向对象什么是面向对象,什么是封装继承多态?多态体现的方面面向对象设计原则重载重写的区别抽象类和实体类区别Java抽象类和接口的区别抽象类可以被实例化吗 深拷贝浅拷…...
k8s部署logstash
1. 编写logstash.yaml配置文件 --- apiVersion: v1 kind: Service metadata:name: logstash spec:type: ClusterIPclusterIP: Noneports:- name: logstash-tcpport: 5000targetPort: 5000- name: logstash-beatsport: 5044targetPort: 5044- name: logstash-apiport: 9600targ…...
Arcgis/GeoScene API for JavaScript 三维场景底图网格设为透明
项目场景: 有时候加载的地图服务白色区域会露底,导致在三维场景时,露出了三维网格,影响效果,自此,我们需要将三维场景的底图设为白色或透明。 问题描述 如图所示: 解决方案: 提示…...
《qt open3d网格拉普拉斯平滑》
qt open3d网格拉普拉斯平滑 效果展示二、流程三、代码效果展示 二、流程 创建动作,链接到槽函数,并把动作放置菜单栏 参照前文 三、代码 1、槽函数实现 void on_actionFilterLaplacian_triggered();void MainWindow::on_actionFil...
怎么选择免费的SEO排名工具
随着2025年互联网的迅猛发展,越来越多的企业意识到,拥有一个高排名的网站对于品牌曝光和吸引客户至关重要。尤其是通过SEO(搜索引擎优化),可以提高网站在搜索引擎中的排名,进而带来更多的自然流量ÿ…...
缓存技术介绍
缓存技术是一种用于提高数据访问速度的技术,通过在快速存储介质(如内存)中保存频繁访问的数据,从而减少对较慢存储介质(如硬盘)的访问次数。缓存可以显著提高系统性能,尤其是在处理大量数据或进…...
