当前位置: 首页 > news >正文

kafka动态监听主题

简单版本

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.stereotype.Service;@Service
public class DynamicKafkaListenerService {@Autowiredprivate ConsumerFactory<String, String> consumerFactory;public void registerListener(String topic) {ContainerProperties containerProperties = new ContainerProperties(topic);containerProperties.setMessageListener((MessageListener<String, String>) record -> {System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + record.value());});ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);container.setBeanName(topic + "-listener");container.start();}
}

手动ack版本

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Service;@Service
public class DynamicKafkaListenerService {@Autowiredprivate ConsumerFactory<String, String> consumerFactory;public void registerListener(String topic) {ContainerProperties containerProperties = new ContainerProperties(topic);// 设置消息监听器为 AcknowledgingMessageListenercontainerProperties.setMessageListener((AcknowledgingMessageListener<String, String>) (record, ack) -> {try {System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + record.value());// 模拟消息处理逻辑// 处理完成后手动确认消息if (ack != null) {ack.acknowledge();}} catch (Exception e) {// 处理异常情况,例如记录日志或重试等System.err.println("消息处理失败: " + e.getMessage());}});// 设置手动确认模式containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);container.setBeanName(topic + "-listener");container.start();}
}

批量处理版本 

    @Autowiredprivate ConsumerFactory<String, String> consumerFactory;public void registerListener(String topic) {ContainerProperties containerProperties = new ContainerProperties(topic);// 设置手动确认模式containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);// 设置批量消息监听器containerProperties.setMessageListener((BatchAcknowledgingMessageListener<String, String>) (records, ack) -> {try {for (ConsumerRecord<String, String> record : records) {System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + record.value());}// 模拟消息处理逻辑// 处理完成后手动批量确认消息ack.acknowledge();} catch (Exception e) {// 处理异常情况,例如记录日志或重试等System.err.println("消息处理失败: " + e.getMessage());}});ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);container.setBeanName(topic + "-listener");container.start();}

可关闭版本

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.BatchAcknowledgingMessageListener;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Service;import java.util.HashMap;
import java.util.Map;@Service
public class DynamicKafkaListenerService {@Autowiredprivate ConsumerFactory<String, String> consumerFactory;// 用于保存每个主题对应的监听器容器private final Map<String, ConcurrentMessageListenerContainer<String, String>> containerMap = new HashMap<>();/*** 开启一个监听*/public void registerListener(String topic) {ContainerProperties containerProperties = new ContainerProperties(topic);// 设置手动确认模式containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);// 设置批量消息监听器containerProperties.setMessageListener((BatchAcknowledgingMessageListener<String, String>) (records, ack) -> {try {for (ConsumerRecord<String, String> record : records) {System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + record.value());}// 模拟消息处理逻辑// 处理完成后手动批量确认消息ack.acknowledge();} catch (Exception e) {// 处理异常情况,例如记录日志或重试等System.err.println("消息处理失败: " + e.getMessage());}});ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);container.setBeanName(topic + "-listener");container.start();// 将监听器容器保存到 map 中containerMap.put(topic, container);}/*** 关闭一个监听*/public void stopListener(String topic) {ConcurrentMessageListenerContainer<String, String> container = containerMap.get(topic);if (container != null && container.isRunning()) {container.stop();// 从 map 中移除已停止的监听器容器containerMap.remove(topic);}}
}

调用添加监听

    /*** 配置详情*/@GetMapping("/getModelZdyConfInfo")public String getModelZdyConfInfo(String topic) {dynamicKafkaListenerService.registerListener(topic);return "添加" + topic + "监听成功";}

相关文章:

kafka动态监听主题

简单版本 import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.Containe…...

Python虚拟环境管理工具 pyenv

情景 我现在在部署一个python 项目&#xff0c;需要用到Python 3.10。但是我本地已经有了一个3.12解释器&#xff0c;有没有一种方法&#xff0c;可以管理python 环境&#xff0c;还可以随意切换。怎么做&#xff1f; window 安装pyenv-win 使用 PowerShell&#xff08;以管…...

网络安全产品架构图 网络安全相关产品

一、信息安全产品分类 背景 美国将网络和信息安全产品分了9类&#xff1a;鉴别、访问控制、入侵检测、防火墙、公钥基础设施、恶意程序代码防护、漏洞扫描、取证、介质清理或擦除。中国公安部将网络和信息安全产品分了7类&#xff1a;操作系统安全、数据库安全、网络安全、病毒…...

C++ 实践扩展(Qt Creator 联动 Visual Studio 2022)

​ 这里我们将在 VS 上实现 QT 编程&#xff0c;实现如下&#xff1a; 一、Vs 2022 配置&#xff08;若已安装&#xff0c;可直接跳过&#xff09; 点击链接&#xff1a;​​​​​Visual Studio 2022 我们先去 Vs 官网下载&#xff0c;如下&#xff1a; 等待程序安装完成之…...

如何实现Deepseek的本地部署并集成本地知识库?

1、下载并配置Deepseek环境 https://blog.csdn.net/kxg6666/article/details/145593346?spm1001.2014.3001.5501 2、安装AnythingLLM AnythingLLM | The all-in-one AI application for everyone 如官网下载较慢&#xff0c;本文最后提供夸克离线下载链接。下载后默认安装…...

vue学习笔记8

Pinia基础使用 - 计数器案例 定义Store&#xff08;state action&#xff09; 组件使用Store getters实现 Pinia中的 getters 直接使用 computed函数 进行模拟, 组件中需要使用需要把 getters return出去 action异步实现 编写方式&#xff1a;异步action函数的写法和组件…...

【自学笔记】Vue基础知识点总览-持续更新

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 Vue重点知识点总览一、Vue基础1. Vue简介2. MVVM设计思想3. 响应式数据绑定4. 组件化开发 二、Vue核心特性1. 虚拟DOM2. 模板语法3. 计算属性与监听属性 三、Vue高级…...

ETL的使用(sqoop):数据导入,导出

ETL ETL: 是数据抽取&#xff08;Extract&#xff09;、数据转换&#xff08;Transform&#xff09;和数据加载&#xff08;Load&#xff09;的整个过程 常用的ETL工具 sqoop 1.Apache Sqoop 是 Apache 软件基金会旗下的一个开源项目&#xff0c;旨在帮助用户高效地在 Hado…...

【核心特性】从鸭子类型到Go的io.Writer设计哲学

在编程语言的设计中&#xff0c;鸭子类型和接口设计是两种非常重要的理念。它们都强调了对象的行为和能力&#xff0c;而非其具体的类型或继承关系。Go 语言的io.Writer 接口是这种设计理念的典型代表&#xff0c;它通过简洁的接口定义&#xff0c;实现了强大的功能和灵活性。 …...

多模态模型详解

多模态模型是什么 多模态模型是一种能够处理和理解多种数据类型&#xff08;如文本、图像、音频、视频等&#xff09;的机器学习模型&#xff0c;通过融合不同模态的信息来提升任务的性能。其核心在于利用不同模态之间的互补性&#xff0c;增强模型的鲁棒性和准确性。 如何融合…...

Go 语言里中的堆与栈

在 Go 语言里&#xff0c;堆和栈是内存管理的两个重要概念&#xff0c;它们在多个方面存在明显差异&#xff1a; 1. 内存分配与回收方式 栈 分配&#xff1a;Go 语言中&#xff0c;栈内存主要用于存储函数的局部变量和调用信息。当一个函数被调用时&#xff0c;Go 会自动为其…...

八、OSG学习笔记-

前一章节&#xff1a; 七、OSG学习笔记-碰撞检测-CSDN博客https://blog.csdn.net/weixin_36323170/article/details/145558132?spm1001.2014.3001.5501 一、了解OSG图元加载显示流程 本章节代码&#xff1a; OsgStudy/wids CuiQingCheng/OsgStudy - 码云 - 开源中国https:…...

本地部署【LLM-deepseek】大模型 ollama+deepseek/conda(python)+openwebui/docker+openwebui

通过ollama本地部署deepseek 总共两步 1.模型部署 2.[web页面] 参考官网 ollama:模型部署 https://ollama.com/ open-webui:web页面 https://github.com/open-webui/open-webui 设备参考 Mac M 芯片 windows未知 蒸馏模型版本:deepseek-r1:14b 运行情况macminim2 24256 本地…...

网络分析工具—WireShark的安装及使用

Wireshark 是一个广泛使用的网络协议分析工具&#xff0c;常被网络管理员、开发人员和安全专家用来捕获和分析网络数据包。它支持多种网络协议&#xff0c;能够帮助用户深入理解网络流量、诊断网络问题以及进行安全分析。 Wireshark 的主要功能 数据包捕获与分析&#xff1a; …...

MobaXterm的图形化界面支持:原理与分辨率问题解决

1. 概述 MobaXterm 是一款功能强大的远程访问工具&#xff0c;支持SSH、RDP、X11、VNC等多种协议&#xff0c;并内置了强大的图形界面支持&#xff0c;让用户能够在远程操作Linux/Unix系统时&#xff0c;享受到类似本地桌面的流畅体验。 与传统的SSH客户端不同&#xff0c;Mo…...

Java JVM(Java Virtual Machine)解析

Java Virtual Machine&#xff08;JVM&#xff09;是Java平台的核心组成部分&#xff0c;它负责执行Java字节码&#xff0c;并提供了一个运行时环境。本文将深入探讨JVM的工作原理、组成部分以及其在Java开发中的重要性。 一、JVM的基本概念 JVM是一个虚拟的计算机&#xff0…...

pytest测试专题 - 1.2 如何获得美观的测试报告

<< 返回目录 1 pytest测试专题 - 1.2 如何获得美观的测试报告 1.1 背景 虽然pytest命令的报文很详细&#xff0c;用例在执行调试时还算比较方便阅读和提取失败信息&#xff0c; 但对于大量测试用例运行时&#xff0c;可能会存在以下不足 报文被冲掉测试日志没法归档 …...

现阶段股指期货交易保证金和费用多少?股指期货一手多少钱?

股指期货交易的保证金就是你在买卖股指期货合约时&#xff0c;需存入交易账户的一笔资金。 股指期货交易保证金是多少&#xff1f; 股指期货的交易保证金就像是租房时的押金&#xff0c;确保你能承担交易带来的风险。 一般来说&#xff0c;保证金的比例大概在合约价值的12-14…...

使用mermaid画流程图

本文介绍使用mermaid画流程图&#xff0c;并给出几个示例。 背景 目前&#xff0c;除有明确格式要求的文档外&#xff0c;笔者一般使用markdown写文档、笔记。当文档有图片时&#xff0c;使用Typora等软件可实时渲染&#xff0c;所见即所得。但如果文档接收方没有安装相关工具…...

大模型笔记:pytorch实现MOE

0 导入库 import torch import torch.nn as nn import torch.nn.functional as F 1 专家模型 #一个简单的专家模型&#xff0c;可以是任何神经网络架构 class Expert(nn.Module):def __init__(self, input_size, output_size):super(Expert, self).__init__()self.fc nn.L…...

中南大学无人机智能体的全面评估!BEDI:用于评估无人机上具身智能体的综合性基准测试

作者&#xff1a;Mingning Guo, Mengwei Wu, Jiarun He, Shaoxian Li, Haifeng Li, Chao Tao单位&#xff1a;中南大学地球科学与信息物理学院论文标题&#xff1a;BEDI: A Comprehensive Benchmark for Evaluating Embodied Agents on UAVs论文链接&#xff1a;https://arxiv.…...

JVM垃圾回收机制全解析

Java虚拟机&#xff08;JVM&#xff09;中的垃圾收集器&#xff08;Garbage Collector&#xff0c;简称GC&#xff09;是用于自动管理内存的机制。它负责识别和清除不再被程序使用的对象&#xff0c;从而释放内存空间&#xff0c;避免内存泄漏和内存溢出等问题。垃圾收集器在Ja…...

如何将联系人从 iPhone 转移到 Android

从 iPhone 换到 Android 手机时&#xff0c;你可能需要保留重要的数据&#xff0c;例如通讯录。好在&#xff0c;将通讯录从 iPhone 转移到 Android 手机非常简单&#xff0c;你可以从本文中学习 6 种可靠的方法&#xff0c;确保随时保持连接&#xff0c;不错过任何信息。 第 1…...

苍穹外卖--缓存菜品

1.问题说明 用户端小程序展示的菜品数据都是通过查询数据库获得&#xff0c;如果用户端访问量比较大&#xff0c;数据库访问压力随之增大 2.实现思路 通过Redis来缓存菜品数据&#xff0c;减少数据库查询操作。 缓存逻辑分析&#xff1a; ①每个分类下的菜品保持一份缓存数据…...

vue3 定时器-定义全局方法 vue+ts

1.创建ts文件 路径&#xff1a;src/utils/timer.ts 完整代码&#xff1a; import { onUnmounted } from vuetype TimerCallback (...args: any[]) > voidexport function useGlobalTimer() {const timers: Map<number, NodeJS.Timeout> new Map()// 创建定时器con…...

学校时钟系统,标准考场时钟系统,AI亮相2025高考,赛思时钟系统为教育公平筑起“精准防线”

2025年#高考 将在近日拉开帷幕&#xff0c;#AI 监考一度冲上热搜。当AI深度融入高考&#xff0c;#时间同步 不再是辅助功能&#xff0c;而是决定AI监考系统成败的“生命线”。 AI亮相2025高考&#xff0c;40种异常行为0.5秒精准识别 2025年高考即将拉开帷幕&#xff0c;江西、…...

SiFli 52把Imagie图片,Font字体资源放在指定位置,编译成指定img.bin和font.bin的问题

分区配置 (ptab.json) img 属性介绍&#xff1a; img 属性指定分区存放的 image 名称&#xff0c;指定的 image 名称必须是当前工程生成的 binary 。 如果 binary 有多个文件&#xff0c;则以 proj_name:binary_name 格式指定文件名&#xff0c; proj_name 为工程 名&…...

基于TurtleBot3在Gazebo地图实现机器人远程控制

1. TurtleBot3环境配置 # 下载TurtleBot3核心包 mkdir -p ~/catkin_ws/src cd ~/catkin_ws/src git clone -b noetic-devel https://github.com/ROBOTIS-GIT/turtlebot3.git git clone -b noetic https://github.com/ROBOTIS-GIT/turtlebot3_msgs.git git clone -b noetic-dev…...

vulnyx Blogger writeup

信息收集 arp-scan nmap 获取userFlag 上web看看 一个默认的页面&#xff0c;gobuster扫一下目录 可以看到扫出的目录中得到了一个有价值的目录/wordpress&#xff0c;说明目标所使用的cms是wordpress&#xff0c;访问http://192.168.43.213/wordpress/然后查看源码能看到 这…...

MyBatis-Plus 常用条件构造方法

1.常用条件方法 方法 说明eq等于 ne不等于 <>gt大于 >ge大于等于 >lt小于 <le小于等于 <betweenBETWEEN 值1 AND 值2notBetweenNOT BETWEEN 值1 AND 值2likeLIKE %值%notLikeNOT LIKE %值%likeLeftLIKE %值likeRightLIKE 值%isNull字段 IS NULLisNotNull字段…...