当前位置: 首页 > news >正文

kafka动态监听主题

简单版本

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.stereotype.Service;@Service
public class DynamicKafkaListenerService {@Autowiredprivate ConsumerFactory<String, String> consumerFactory;public void registerListener(String topic) {ContainerProperties containerProperties = new ContainerProperties(topic);containerProperties.setMessageListener((MessageListener<String, String>) record -> {System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + record.value());});ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);container.setBeanName(topic + "-listener");container.start();}
}

手动ack版本

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Service;@Service
public class DynamicKafkaListenerService {@Autowiredprivate ConsumerFactory<String, String> consumerFactory;public void registerListener(String topic) {ContainerProperties containerProperties = new ContainerProperties(topic);// 设置消息监听器为 AcknowledgingMessageListenercontainerProperties.setMessageListener((AcknowledgingMessageListener<String, String>) (record, ack) -> {try {System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + record.value());// 模拟消息处理逻辑// 处理完成后手动确认消息if (ack != null) {ack.acknowledge();}} catch (Exception e) {// 处理异常情况,例如记录日志或重试等System.err.println("消息处理失败: " + e.getMessage());}});// 设置手动确认模式containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);container.setBeanName(topic + "-listener");container.start();}
}

批量处理版本 

    @Autowiredprivate ConsumerFactory<String, String> consumerFactory;public void registerListener(String topic) {ContainerProperties containerProperties = new ContainerProperties(topic);// 设置手动确认模式containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);// 设置批量消息监听器containerProperties.setMessageListener((BatchAcknowledgingMessageListener<String, String>) (records, ack) -> {try {for (ConsumerRecord<String, String> record : records) {System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + record.value());}// 模拟消息处理逻辑// 处理完成后手动批量确认消息ack.acknowledge();} catch (Exception e) {// 处理异常情况,例如记录日志或重试等System.err.println("消息处理失败: " + e.getMessage());}});ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);container.setBeanName(topic + "-listener");container.start();}

可关闭版本

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.BatchAcknowledgingMessageListener;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Service;import java.util.HashMap;
import java.util.Map;@Service
public class DynamicKafkaListenerService {@Autowiredprivate ConsumerFactory<String, String> consumerFactory;// 用于保存每个主题对应的监听器容器private final Map<String, ConcurrentMessageListenerContainer<String, String>> containerMap = new HashMap<>();/*** 开启一个监听*/public void registerListener(String topic) {ContainerProperties containerProperties = new ContainerProperties(topic);// 设置手动确认模式containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);// 设置批量消息监听器containerProperties.setMessageListener((BatchAcknowledgingMessageListener<String, String>) (records, ack) -> {try {for (ConsumerRecord<String, String> record : records) {System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + record.value());}// 模拟消息处理逻辑// 处理完成后手动批量确认消息ack.acknowledge();} catch (Exception e) {// 处理异常情况,例如记录日志或重试等System.err.println("消息处理失败: " + e.getMessage());}});ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);container.setBeanName(topic + "-listener");container.start();// 将监听器容器保存到 map 中containerMap.put(topic, container);}/*** 关闭一个监听*/public void stopListener(String topic) {ConcurrentMessageListenerContainer<String, String> container = containerMap.get(topic);if (container != null && container.isRunning()) {container.stop();// 从 map 中移除已停止的监听器容器containerMap.remove(topic);}}
}

调用添加监听

    /*** 配置详情*/@GetMapping("/getModelZdyConfInfo")public String getModelZdyConfInfo(String topic) {dynamicKafkaListenerService.registerListener(topic);return "添加" + topic + "监听成功";}

相关文章:

kafka动态监听主题

简单版本 import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.Containe…...

Python虚拟环境管理工具 pyenv

情景 我现在在部署一个python 项目&#xff0c;需要用到Python 3.10。但是我本地已经有了一个3.12解释器&#xff0c;有没有一种方法&#xff0c;可以管理python 环境&#xff0c;还可以随意切换。怎么做&#xff1f; window 安装pyenv-win 使用 PowerShell&#xff08;以管…...

网络安全产品架构图 网络安全相关产品

一、信息安全产品分类 背景 美国将网络和信息安全产品分了9类&#xff1a;鉴别、访问控制、入侵检测、防火墙、公钥基础设施、恶意程序代码防护、漏洞扫描、取证、介质清理或擦除。中国公安部将网络和信息安全产品分了7类&#xff1a;操作系统安全、数据库安全、网络安全、病毒…...

C++ 实践扩展(Qt Creator 联动 Visual Studio 2022)

​ 这里我们将在 VS 上实现 QT 编程&#xff0c;实现如下&#xff1a; 一、Vs 2022 配置&#xff08;若已安装&#xff0c;可直接跳过&#xff09; 点击链接&#xff1a;​​​​​Visual Studio 2022 我们先去 Vs 官网下载&#xff0c;如下&#xff1a; 等待程序安装完成之…...

如何实现Deepseek的本地部署并集成本地知识库?

1、下载并配置Deepseek环境 https://blog.csdn.net/kxg6666/article/details/145593346?spm1001.2014.3001.5501 2、安装AnythingLLM AnythingLLM | The all-in-one AI application for everyone 如官网下载较慢&#xff0c;本文最后提供夸克离线下载链接。下载后默认安装…...

vue学习笔记8

Pinia基础使用 - 计数器案例 定义Store&#xff08;state action&#xff09; 组件使用Store getters实现 Pinia中的 getters 直接使用 computed函数 进行模拟, 组件中需要使用需要把 getters return出去 action异步实现 编写方式&#xff1a;异步action函数的写法和组件…...

【自学笔记】Vue基础知识点总览-持续更新

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 Vue重点知识点总览一、Vue基础1. Vue简介2. MVVM设计思想3. 响应式数据绑定4. 组件化开发 二、Vue核心特性1. 虚拟DOM2. 模板语法3. 计算属性与监听属性 三、Vue高级…...

ETL的使用(sqoop):数据导入,导出

ETL ETL: 是数据抽取&#xff08;Extract&#xff09;、数据转换&#xff08;Transform&#xff09;和数据加载&#xff08;Load&#xff09;的整个过程 常用的ETL工具 sqoop 1.Apache Sqoop 是 Apache 软件基金会旗下的一个开源项目&#xff0c;旨在帮助用户高效地在 Hado…...

【核心特性】从鸭子类型到Go的io.Writer设计哲学

在编程语言的设计中&#xff0c;鸭子类型和接口设计是两种非常重要的理念。它们都强调了对象的行为和能力&#xff0c;而非其具体的类型或继承关系。Go 语言的io.Writer 接口是这种设计理念的典型代表&#xff0c;它通过简洁的接口定义&#xff0c;实现了强大的功能和灵活性。 …...

多模态模型详解

多模态模型是什么 多模态模型是一种能够处理和理解多种数据类型&#xff08;如文本、图像、音频、视频等&#xff09;的机器学习模型&#xff0c;通过融合不同模态的信息来提升任务的性能。其核心在于利用不同模态之间的互补性&#xff0c;增强模型的鲁棒性和准确性。 如何融合…...

Go 语言里中的堆与栈

在 Go 语言里&#xff0c;堆和栈是内存管理的两个重要概念&#xff0c;它们在多个方面存在明显差异&#xff1a; 1. 内存分配与回收方式 栈 分配&#xff1a;Go 语言中&#xff0c;栈内存主要用于存储函数的局部变量和调用信息。当一个函数被调用时&#xff0c;Go 会自动为其…...

八、OSG学习笔记-

前一章节&#xff1a; 七、OSG学习笔记-碰撞检测-CSDN博客https://blog.csdn.net/weixin_36323170/article/details/145558132?spm1001.2014.3001.5501 一、了解OSG图元加载显示流程 本章节代码&#xff1a; OsgStudy/wids CuiQingCheng/OsgStudy - 码云 - 开源中国https:…...

本地部署【LLM-deepseek】大模型 ollama+deepseek/conda(python)+openwebui/docker+openwebui

通过ollama本地部署deepseek 总共两步 1.模型部署 2.[web页面] 参考官网 ollama:模型部署 https://ollama.com/ open-webui:web页面 https://github.com/open-webui/open-webui 设备参考 Mac M 芯片 windows未知 蒸馏模型版本:deepseek-r1:14b 运行情况macminim2 24256 本地…...

网络分析工具—WireShark的安装及使用

Wireshark 是一个广泛使用的网络协议分析工具&#xff0c;常被网络管理员、开发人员和安全专家用来捕获和分析网络数据包。它支持多种网络协议&#xff0c;能够帮助用户深入理解网络流量、诊断网络问题以及进行安全分析。 Wireshark 的主要功能 数据包捕获与分析&#xff1a; …...

MobaXterm的图形化界面支持:原理与分辨率问题解决

1. 概述 MobaXterm 是一款功能强大的远程访问工具&#xff0c;支持SSH、RDP、X11、VNC等多种协议&#xff0c;并内置了强大的图形界面支持&#xff0c;让用户能够在远程操作Linux/Unix系统时&#xff0c;享受到类似本地桌面的流畅体验。 与传统的SSH客户端不同&#xff0c;Mo…...

Java JVM(Java Virtual Machine)解析

Java Virtual Machine&#xff08;JVM&#xff09;是Java平台的核心组成部分&#xff0c;它负责执行Java字节码&#xff0c;并提供了一个运行时环境。本文将深入探讨JVM的工作原理、组成部分以及其在Java开发中的重要性。 一、JVM的基本概念 JVM是一个虚拟的计算机&#xff0…...

pytest测试专题 - 1.2 如何获得美观的测试报告

<< 返回目录 1 pytest测试专题 - 1.2 如何获得美观的测试报告 1.1 背景 虽然pytest命令的报文很详细&#xff0c;用例在执行调试时还算比较方便阅读和提取失败信息&#xff0c; 但对于大量测试用例运行时&#xff0c;可能会存在以下不足 报文被冲掉测试日志没法归档 …...

现阶段股指期货交易保证金和费用多少?股指期货一手多少钱?

股指期货交易的保证金就是你在买卖股指期货合约时&#xff0c;需存入交易账户的一笔资金。 股指期货交易保证金是多少&#xff1f; 股指期货的交易保证金就像是租房时的押金&#xff0c;确保你能承担交易带来的风险。 一般来说&#xff0c;保证金的比例大概在合约价值的12-14…...

使用mermaid画流程图

本文介绍使用mermaid画流程图&#xff0c;并给出几个示例。 背景 目前&#xff0c;除有明确格式要求的文档外&#xff0c;笔者一般使用markdown写文档、笔记。当文档有图片时&#xff0c;使用Typora等软件可实时渲染&#xff0c;所见即所得。但如果文档接收方没有安装相关工具…...

大模型笔记:pytorch实现MOE

0 导入库 import torch import torch.nn as nn import torch.nn.functional as F 1 专家模型 #一个简单的专家模型&#xff0c;可以是任何神经网络架构 class Expert(nn.Module):def __init__(self, input_size, output_size):super(Expert, self).__init__()self.fc nn.L…...

黑马Mybatis

Mybatis 表现层&#xff1a;页面展示 业务层&#xff1a;逻辑处理 持久层&#xff1a;持久数据化保存 在这里插入图片描述 Mybatis快速入门 ![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/6501c2109c4442118ceb6014725e48e4.png //logback.xml <?xml ver…...

在鸿蒙HarmonyOS 5中使用DevEco Studio实现录音机应用

1. 项目配置与权限设置 1.1 配置module.json5 {"module": {"requestPermissions": [{"name": "ohos.permission.MICROPHONE","reason": "录音需要麦克风权限"},{"name": "ohos.permission.WRITE…...

用docker来安装部署freeswitch记录

今天刚才测试一个callcenter的项目&#xff0c;所以尝试安装freeswitch 1、使用轩辕镜像 - 中国开发者首选的专业 Docker 镜像加速服务平台 编辑下面/etc/docker/daemon.json文件为 {"registry-mirrors": ["https://docker.xuanyuan.me"] }同时可以进入轩…...

【学习笔记】深入理解Java虚拟机学习笔记——第4章 虚拟机性能监控,故障处理工具

第2章 虚拟机性能监控&#xff0c;故障处理工具 4.1 概述 略 4.2 基础故障处理工具 4.2.1 jps:虚拟机进程状况工具 命令&#xff1a;jps [options] [hostid] 功能&#xff1a;本地虚拟机进程显示进程ID&#xff08;与ps相同&#xff09;&#xff0c;可同时显示主类&#x…...

学习STC51单片机32(芯片为STC89C52RCRC)OLED显示屏2

每日一言 今天的每一份坚持&#xff0c;都是在为未来积攒底气。 案例&#xff1a;OLED显示一个A 这边观察到一个点&#xff0c;怎么雪花了就是都是乱七八糟的占满了屏幕。。 解释 &#xff1a; 如果代码里信号切换太快&#xff08;比如 SDA 刚变&#xff0c;SCL 立刻变&#…...

安全突围:重塑内生安全体系:齐向东在2025年BCS大会的演讲

文章目录 前言第一部分&#xff1a;体系力量是突围之钥第一重困境是体系思想落地不畅。第二重困境是大小体系融合瓶颈。第三重困境是“小体系”运营梗阻。 第二部分&#xff1a;体系矛盾是突围之障一是数据孤岛的障碍。二是投入不足的障碍。三是新旧兼容难的障碍。 第三部分&am…...

腾讯云V3签名

想要接入腾讯云的Api&#xff0c;必然先按其文档计算出所要求的签名。 之前也调用过腾讯云的接口&#xff0c;但总是卡在签名这一步&#xff0c;最后放弃选择SDK&#xff0c;这次终于自己代码实现。 可能腾讯云翻新了接口文档&#xff0c;现在阅读起来&#xff0c;清晰了很多&…...

jmeter聚合报告中参数详解

sample、average、min、max、90%line、95%line,99%line、Error错误率、吞吐量Thoughput、KB/sec每秒传输的数据量 sample&#xff08;样本数&#xff09; 表示测试中发送的请求数量&#xff0c;即测试执行了多少次请求。 单位&#xff0c;以个或者次数表示。 示例&#xff1a;…...

React从基础入门到高级实战:React 实战项目 - 项目五:微前端与模块化架构

React 实战项目&#xff1a;微前端与模块化架构 欢迎来到 React 开发教程专栏 的第 30 篇&#xff01;在前 29 篇文章中&#xff0c;我们从 React 的基础概念逐步深入到高级技巧&#xff0c;涵盖了组件设计、状态管理、路由配置、性能优化和企业级应用等核心内容。这一次&…...

DeepSeek越强,Kimi越慌?

被DeepSeek吊打的Kimi&#xff0c;还有多少人在用&#xff1f; 去年&#xff0c;月之暗面创始人杨植麟别提有多风光了。90后清华学霸&#xff0c;国产大模型六小虎之一&#xff0c;手握十几亿美金的融资。旗下的AI助手Kimi烧钱如流水&#xff0c;单月光是投流就花费2个亿。 疯…...