kafka动态监听主题
简单版本
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.stereotype.Service;@Service
public class DynamicKafkaListenerService {@Autowiredprivate ConsumerFactory<String, String> consumerFactory;public void registerListener(String topic) {ContainerProperties containerProperties = new ContainerProperties(topic);containerProperties.setMessageListener((MessageListener<String, String>) record -> {System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + record.value());});ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);container.setBeanName(topic + "-listener");container.start();}
}
手动ack版本
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Service;@Service
public class DynamicKafkaListenerService {@Autowiredprivate ConsumerFactory<String, String> consumerFactory;public void registerListener(String topic) {ContainerProperties containerProperties = new ContainerProperties(topic);// 设置消息监听器为 AcknowledgingMessageListenercontainerProperties.setMessageListener((AcknowledgingMessageListener<String, String>) (record, ack) -> {try {System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + record.value());// 模拟消息处理逻辑// 处理完成后手动确认消息if (ack != null) {ack.acknowledge();}} catch (Exception e) {// 处理异常情况,例如记录日志或重试等System.err.println("消息处理失败: " + e.getMessage());}});// 设置手动确认模式containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);container.setBeanName(topic + "-listener");container.start();}
}
批量处理版本
@Autowiredprivate ConsumerFactory<String, String> consumerFactory;public void registerListener(String topic) {ContainerProperties containerProperties = new ContainerProperties(topic);// 设置手动确认模式containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);// 设置批量消息监听器containerProperties.setMessageListener((BatchAcknowledgingMessageListener<String, String>) (records, ack) -> {try {for (ConsumerRecord<String, String> record : records) {System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + record.value());}// 模拟消息处理逻辑// 处理完成后手动批量确认消息ack.acknowledge();} catch (Exception e) {// 处理异常情况,例如记录日志或重试等System.err.println("消息处理失败: " + e.getMessage());}});ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);container.setBeanName(topic + "-listener");container.start();}
可关闭版本
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.BatchAcknowledgingMessageListener;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Service;import java.util.HashMap;
import java.util.Map;@Service
public class DynamicKafkaListenerService {@Autowiredprivate ConsumerFactory<String, String> consumerFactory;// 用于保存每个主题对应的监听器容器private final Map<String, ConcurrentMessageListenerContainer<String, String>> containerMap = new HashMap<>();/*** 开启一个监听*/public void registerListener(String topic) {ContainerProperties containerProperties = new ContainerProperties(topic);// 设置手动确认模式containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);// 设置批量消息监听器containerProperties.setMessageListener((BatchAcknowledgingMessageListener<String, String>) (records, ack) -> {try {for (ConsumerRecord<String, String> record : records) {System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + record.value());}// 模拟消息处理逻辑// 处理完成后手动批量确认消息ack.acknowledge();} catch (Exception e) {// 处理异常情况,例如记录日志或重试等System.err.println("消息处理失败: " + e.getMessage());}});ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);container.setBeanName(topic + "-listener");container.start();// 将监听器容器保存到 map 中containerMap.put(topic, container);}/*** 关闭一个监听*/public void stopListener(String topic) {ConcurrentMessageListenerContainer<String, String> container = containerMap.get(topic);if (container != null && container.isRunning()) {container.stop();// 从 map 中移除已停止的监听器容器containerMap.remove(topic);}}
}
调用添加监听
/*** 配置详情*/@GetMapping("/getModelZdyConfInfo")public String getModelZdyConfInfo(String topic) {dynamicKafkaListenerService.registerListener(topic);return "添加" + topic + "监听成功";}
相关文章:
kafka动态监听主题
简单版本 import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.Containe…...
Python虚拟环境管理工具 pyenv
情景 我现在在部署一个python 项目,需要用到Python 3.10。但是我本地已经有了一个3.12解释器,有没有一种方法,可以管理python 环境,还可以随意切换。怎么做? window 安装pyenv-win 使用 PowerShell(以管…...
网络安全产品架构图 网络安全相关产品
一、信息安全产品分类 背景 美国将网络和信息安全产品分了9类:鉴别、访问控制、入侵检测、防火墙、公钥基础设施、恶意程序代码防护、漏洞扫描、取证、介质清理或擦除。中国公安部将网络和信息安全产品分了7类:操作系统安全、数据库安全、网络安全、病毒…...
C++ 实践扩展(Qt Creator 联动 Visual Studio 2022)
这里我们将在 VS 上实现 QT 编程,实现如下: 一、Vs 2022 配置(若已安装,可直接跳过) 点击链接:Visual Studio 2022 我们先去 Vs 官网下载,如下: 等待程序安装完成之…...
如何实现Deepseek的本地部署并集成本地知识库?
1、下载并配置Deepseek环境 https://blog.csdn.net/kxg6666/article/details/145593346?spm1001.2014.3001.5501 2、安装AnythingLLM AnythingLLM | The all-in-one AI application for everyone 如官网下载较慢,本文最后提供夸克离线下载链接。下载后默认安装…...
vue学习笔记8
Pinia基础使用 - 计数器案例 定义Store(state action) 组件使用Store getters实现 Pinia中的 getters 直接使用 computed函数 进行模拟, 组件中需要使用需要把 getters return出去 action异步实现 编写方式:异步action函数的写法和组件…...
【自学笔记】Vue基础知识点总览-持续更新
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 Vue重点知识点总览一、Vue基础1. Vue简介2. MVVM设计思想3. 响应式数据绑定4. 组件化开发 二、Vue核心特性1. 虚拟DOM2. 模板语法3. 计算属性与监听属性 三、Vue高级…...
ETL的使用(sqoop):数据导入,导出
ETL ETL: 是数据抽取(Extract)、数据转换(Transform)和数据加载(Load)的整个过程 常用的ETL工具 sqoop 1.Apache Sqoop 是 Apache 软件基金会旗下的一个开源项目,旨在帮助用户高效地在 Hado…...
【核心特性】从鸭子类型到Go的io.Writer设计哲学
在编程语言的设计中,鸭子类型和接口设计是两种非常重要的理念。它们都强调了对象的行为和能力,而非其具体的类型或继承关系。Go 语言的io.Writer 接口是这种设计理念的典型代表,它通过简洁的接口定义,实现了强大的功能和灵活性。 …...
多模态模型详解
多模态模型是什么 多模态模型是一种能够处理和理解多种数据类型(如文本、图像、音频、视频等)的机器学习模型,通过融合不同模态的信息来提升任务的性能。其核心在于利用不同模态之间的互补性,增强模型的鲁棒性和准确性。 如何融合…...
Go 语言里中的堆与栈
在 Go 语言里,堆和栈是内存管理的两个重要概念,它们在多个方面存在明显差异: 1. 内存分配与回收方式 栈 分配:Go 语言中,栈内存主要用于存储函数的局部变量和调用信息。当一个函数被调用时,Go 会自动为其…...
八、OSG学习笔记-
前一章节: 七、OSG学习笔记-碰撞检测-CSDN博客https://blog.csdn.net/weixin_36323170/article/details/145558132?spm1001.2014.3001.5501 一、了解OSG图元加载显示流程 本章节代码: OsgStudy/wids CuiQingCheng/OsgStudy - 码云 - 开源中国https:…...
本地部署【LLM-deepseek】大模型 ollama+deepseek/conda(python)+openwebui/docker+openwebui
通过ollama本地部署deepseek 总共两步 1.模型部署 2.[web页面] 参考官网 ollama:模型部署 https://ollama.com/ open-webui:web页面 https://github.com/open-webui/open-webui 设备参考 Mac M 芯片 windows未知 蒸馏模型版本:deepseek-r1:14b 运行情况macminim2 24256 本地…...
网络分析工具—WireShark的安装及使用
Wireshark 是一个广泛使用的网络协议分析工具,常被网络管理员、开发人员和安全专家用来捕获和分析网络数据包。它支持多种网络协议,能够帮助用户深入理解网络流量、诊断网络问题以及进行安全分析。 Wireshark 的主要功能 数据包捕获与分析: …...
MobaXterm的图形化界面支持:原理与分辨率问题解决
1. 概述 MobaXterm 是一款功能强大的远程访问工具,支持SSH、RDP、X11、VNC等多种协议,并内置了强大的图形界面支持,让用户能够在远程操作Linux/Unix系统时,享受到类似本地桌面的流畅体验。 与传统的SSH客户端不同,Mo…...
Java JVM(Java Virtual Machine)解析
Java Virtual Machine(JVM)是Java平台的核心组成部分,它负责执行Java字节码,并提供了一个运行时环境。本文将深入探讨JVM的工作原理、组成部分以及其在Java开发中的重要性。 一、JVM的基本概念 JVM是一个虚拟的计算机࿰…...
pytest测试专题 - 1.2 如何获得美观的测试报告
<< 返回目录 1 pytest测试专题 - 1.2 如何获得美观的测试报告 1.1 背景 虽然pytest命令的报文很详细,用例在执行调试时还算比较方便阅读和提取失败信息, 但对于大量测试用例运行时,可能会存在以下不足 报文被冲掉测试日志没法归档 …...
现阶段股指期货交易保证金和费用多少?股指期货一手多少钱?
股指期货交易的保证金就是你在买卖股指期货合约时,需存入交易账户的一笔资金。 股指期货交易保证金是多少? 股指期货的交易保证金就像是租房时的押金,确保你能承担交易带来的风险。 一般来说,保证金的比例大概在合约价值的12-14…...
使用mermaid画流程图
本文介绍使用mermaid画流程图,并给出几个示例。 背景 目前,除有明确格式要求的文档外,笔者一般使用markdown写文档、笔记。当文档有图片时,使用Typora等软件可实时渲染,所见即所得。但如果文档接收方没有安装相关工具…...
大模型笔记:pytorch实现MOE
0 导入库 import torch import torch.nn as nn import torch.nn.functional as F 1 专家模型 #一个简单的专家模型,可以是任何神经网络架构 class Expert(nn.Module):def __init__(self, input_size, output_size):super(Expert, self).__init__()self.fc nn.L…...
基于FPGA的PID算法学习———实现PID比例控制算法
基于FPGA的PID算法学习 前言一、PID算法分析二、PID仿真分析1. PID代码2.PI代码3.P代码4.顶层5.测试文件6.仿真波形 总结 前言 学习内容:参考网站: PID算法控制 PID即:Proportional(比例)、Integral(积分&…...
Unity3D中Gfx.WaitForPresent优化方案
前言 在Unity中,Gfx.WaitForPresent占用CPU过高通常表示主线程在等待GPU完成渲染(即CPU被阻塞),这表明存在GPU瓶颈或垂直同步/帧率设置问题。以下是系统的优化方案: 对惹,这里有一个游戏开发交流小组&…...
【SQL学习笔记1】增删改查+多表连接全解析(内附SQL免费在线练习工具)
可以使用Sqliteviz这个网站免费编写sql语句,它能够让用户直接在浏览器内练习SQL的语法,不需要安装任何软件。 链接如下: sqliteviz 注意: 在转写SQL语法时,关键字之间有一个特定的顺序,这个顺序会影响到…...
学习STC51单片机31(芯片为STC89C52RCRC)OLED显示屏1
每日一言 生活的美好,总是藏在那些你咬牙坚持的日子里。 硬件:OLED 以后要用到OLED的时候找到这个文件 OLED的设备地址 SSD1306"SSD" 是品牌缩写,"1306" 是产品编号。 驱动 OLED 屏幕的 IIC 总线数据传输格式 示意图 …...
python爬虫:Newspaper3k 的详细使用(好用的新闻网站文章抓取和解析的Python库)
更多内容请见: 爬虫和逆向教程-专栏介绍和目录 文章目录 一、Newspaper3k 概述1.1 Newspaper3k 介绍1.2 主要功能1.3 典型应用场景1.4 安装二、基本用法2.2 提取单篇文章的内容2.2 处理多篇文档三、高级选项3.1 自定义配置3.2 分析文章情感四、实战案例4.1 构建新闻摘要聚合器…...
Springcloud:Eureka 高可用集群搭建实战(服务注册与发现的底层原理与避坑指南)
引言:为什么 Eureka 依然是存量系统的核心? 尽管 Nacos 等新注册中心崛起,但金融、电力等保守行业仍有大量系统运行在 Eureka 上。理解其高可用设计与自我保护机制,是保障分布式系统稳定的必修课。本文将手把手带你搭建生产级 Eur…...
CMake 从 GitHub 下载第三方库并使用
有时我们希望直接使用 GitHub 上的开源库,而不想手动下载、编译和安装。 可以利用 CMake 提供的 FetchContent 模块来实现自动下载、构建和链接第三方库。 FetchContent 命令官方文档✅ 示例代码 我们将以 fmt 这个流行的格式化库为例,演示如何: 使用 FetchContent 从 GitH…...
Rapidio门铃消息FIFO溢出机制
关于RapidIO门铃消息FIFO的溢出机制及其与中断抖动的关系,以下是深入解析: 门铃FIFO溢出的本质 在RapidIO系统中,门铃消息FIFO是硬件控制器内部的缓冲区,用于临时存储接收到的门铃消息(Doorbell Message)。…...
JVM虚拟机:内存结构、垃圾回收、性能优化
1、JVM虚拟机的简介 Java 虚拟机(Java Virtual Machine 简称:JVM)是运行所有 Java 程序的抽象计算机,是 Java 语言的运行环境,实现了 Java 程序的跨平台特性。JVM 屏蔽了与具体操作系统平台相关的信息,使得 Java 程序只需生成在 JVM 上运行的目标代码(字节码),就可以…...
DiscuzX3.5发帖json api
参考文章:PHP实现独立Discuz站外发帖(直连操作数据库)_discuz 发帖api-CSDN博客 简单改造了一下,适配我自己的需求 有一个站点存在多个采集站,我想通过主站拿标题,采集站拿内容 使用到的sql如下 CREATE TABLE pre_forum_post_…...
