kafka动态监听主题
简单版本
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.stereotype.Service;@Service
public class DynamicKafkaListenerService {@Autowiredprivate ConsumerFactory<String, String> consumerFactory;public void registerListener(String topic) {ContainerProperties containerProperties = new ContainerProperties(topic);containerProperties.setMessageListener((MessageListener<String, String>) record -> {System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + record.value());});ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);container.setBeanName(topic + "-listener");container.start();}
}
手动ack版本
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Service;@Service
public class DynamicKafkaListenerService {@Autowiredprivate ConsumerFactory<String, String> consumerFactory;public void registerListener(String topic) {ContainerProperties containerProperties = new ContainerProperties(topic);// 设置消息监听器为 AcknowledgingMessageListenercontainerProperties.setMessageListener((AcknowledgingMessageListener<String, String>) (record, ack) -> {try {System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + record.value());// 模拟消息处理逻辑// 处理完成后手动确认消息if (ack != null) {ack.acknowledge();}} catch (Exception e) {// 处理异常情况,例如记录日志或重试等System.err.println("消息处理失败: " + e.getMessage());}});// 设置手动确认模式containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);container.setBeanName(topic + "-listener");container.start();}
}
批量处理版本
@Autowiredprivate ConsumerFactory<String, String> consumerFactory;public void registerListener(String topic) {ContainerProperties containerProperties = new ContainerProperties(topic);// 设置手动确认模式containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);// 设置批量消息监听器containerProperties.setMessageListener((BatchAcknowledgingMessageListener<String, String>) (records, ack) -> {try {for (ConsumerRecord<String, String> record : records) {System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + record.value());}// 模拟消息处理逻辑// 处理完成后手动批量确认消息ack.acknowledge();} catch (Exception e) {// 处理异常情况,例如记录日志或重试等System.err.println("消息处理失败: " + e.getMessage());}});ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);container.setBeanName(topic + "-listener");container.start();}
可关闭版本
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.BatchAcknowledgingMessageListener;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Service;import java.util.HashMap;
import java.util.Map;@Service
public class DynamicKafkaListenerService {@Autowiredprivate ConsumerFactory<String, String> consumerFactory;// 用于保存每个主题对应的监听器容器private final Map<String, ConcurrentMessageListenerContainer<String, String>> containerMap = new HashMap<>();/*** 开启一个监听*/public void registerListener(String topic) {ContainerProperties containerProperties = new ContainerProperties(topic);// 设置手动确认模式containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);// 设置批量消息监听器containerProperties.setMessageListener((BatchAcknowledgingMessageListener<String, String>) (records, ack) -> {try {for (ConsumerRecord<String, String> record : records) {System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + record.value());}// 模拟消息处理逻辑// 处理完成后手动批量确认消息ack.acknowledge();} catch (Exception e) {// 处理异常情况,例如记录日志或重试等System.err.println("消息处理失败: " + e.getMessage());}});ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);container.setBeanName(topic + "-listener");container.start();// 将监听器容器保存到 map 中containerMap.put(topic, container);}/*** 关闭一个监听*/public void stopListener(String topic) {ConcurrentMessageListenerContainer<String, String> container = containerMap.get(topic);if (container != null && container.isRunning()) {container.stop();// 从 map 中移除已停止的监听器容器containerMap.remove(topic);}}
}
调用添加监听
/*** 配置详情*/@GetMapping("/getModelZdyConfInfo")public String getModelZdyConfInfo(String topic) {dynamicKafkaListenerService.registerListener(topic);return "添加" + topic + "监听成功";}
相关文章:
kafka动态监听主题
简单版本 import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.Containe…...
Python虚拟环境管理工具 pyenv
情景 我现在在部署一个python 项目,需要用到Python 3.10。但是我本地已经有了一个3.12解释器,有没有一种方法,可以管理python 环境,还可以随意切换。怎么做? window 安装pyenv-win 使用 PowerShell(以管…...
网络安全产品架构图 网络安全相关产品
一、信息安全产品分类 背景 美国将网络和信息安全产品分了9类:鉴别、访问控制、入侵检测、防火墙、公钥基础设施、恶意程序代码防护、漏洞扫描、取证、介质清理或擦除。中国公安部将网络和信息安全产品分了7类:操作系统安全、数据库安全、网络安全、病毒…...
C++ 实践扩展(Qt Creator 联动 Visual Studio 2022)
这里我们将在 VS 上实现 QT 编程,实现如下: 一、Vs 2022 配置(若已安装,可直接跳过) 点击链接:Visual Studio 2022 我们先去 Vs 官网下载,如下: 等待程序安装完成之…...
如何实现Deepseek的本地部署并集成本地知识库?
1、下载并配置Deepseek环境 https://blog.csdn.net/kxg6666/article/details/145593346?spm1001.2014.3001.5501 2、安装AnythingLLM AnythingLLM | The all-in-one AI application for everyone 如官网下载较慢,本文最后提供夸克离线下载链接。下载后默认安装…...
vue学习笔记8
Pinia基础使用 - 计数器案例 定义Store(state action) 组件使用Store getters实现 Pinia中的 getters 直接使用 computed函数 进行模拟, 组件中需要使用需要把 getters return出去 action异步实现 编写方式:异步action函数的写法和组件…...
【自学笔记】Vue基础知识点总览-持续更新
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 Vue重点知识点总览一、Vue基础1. Vue简介2. MVVM设计思想3. 响应式数据绑定4. 组件化开发 二、Vue核心特性1. 虚拟DOM2. 模板语法3. 计算属性与监听属性 三、Vue高级…...
ETL的使用(sqoop):数据导入,导出
ETL ETL: 是数据抽取(Extract)、数据转换(Transform)和数据加载(Load)的整个过程 常用的ETL工具 sqoop 1.Apache Sqoop 是 Apache 软件基金会旗下的一个开源项目,旨在帮助用户高效地在 Hado…...
【核心特性】从鸭子类型到Go的io.Writer设计哲学
在编程语言的设计中,鸭子类型和接口设计是两种非常重要的理念。它们都强调了对象的行为和能力,而非其具体的类型或继承关系。Go 语言的io.Writer 接口是这种设计理念的典型代表,它通过简洁的接口定义,实现了强大的功能和灵活性。 …...
多模态模型详解
多模态模型是什么 多模态模型是一种能够处理和理解多种数据类型(如文本、图像、音频、视频等)的机器学习模型,通过融合不同模态的信息来提升任务的性能。其核心在于利用不同模态之间的互补性,增强模型的鲁棒性和准确性。 如何融合…...
Go 语言里中的堆与栈
在 Go 语言里,堆和栈是内存管理的两个重要概念,它们在多个方面存在明显差异: 1. 内存分配与回收方式 栈 分配:Go 语言中,栈内存主要用于存储函数的局部变量和调用信息。当一个函数被调用时,Go 会自动为其…...
八、OSG学习笔记-
前一章节: 七、OSG学习笔记-碰撞检测-CSDN博客https://blog.csdn.net/weixin_36323170/article/details/145558132?spm1001.2014.3001.5501 一、了解OSG图元加载显示流程 本章节代码: OsgStudy/wids CuiQingCheng/OsgStudy - 码云 - 开源中国https:…...
本地部署【LLM-deepseek】大模型 ollama+deepseek/conda(python)+openwebui/docker+openwebui
通过ollama本地部署deepseek 总共两步 1.模型部署 2.[web页面] 参考官网 ollama:模型部署 https://ollama.com/ open-webui:web页面 https://github.com/open-webui/open-webui 设备参考 Mac M 芯片 windows未知 蒸馏模型版本:deepseek-r1:14b 运行情况macminim2 24256 本地…...
网络分析工具—WireShark的安装及使用
Wireshark 是一个广泛使用的网络协议分析工具,常被网络管理员、开发人员和安全专家用来捕获和分析网络数据包。它支持多种网络协议,能够帮助用户深入理解网络流量、诊断网络问题以及进行安全分析。 Wireshark 的主要功能 数据包捕获与分析: …...
MobaXterm的图形化界面支持:原理与分辨率问题解决
1. 概述 MobaXterm 是一款功能强大的远程访问工具,支持SSH、RDP、X11、VNC等多种协议,并内置了强大的图形界面支持,让用户能够在远程操作Linux/Unix系统时,享受到类似本地桌面的流畅体验。 与传统的SSH客户端不同,Mo…...
Java JVM(Java Virtual Machine)解析
Java Virtual Machine(JVM)是Java平台的核心组成部分,它负责执行Java字节码,并提供了一个运行时环境。本文将深入探讨JVM的工作原理、组成部分以及其在Java开发中的重要性。 一、JVM的基本概念 JVM是一个虚拟的计算机࿰…...
pytest测试专题 - 1.2 如何获得美观的测试报告
<< 返回目录 1 pytest测试专题 - 1.2 如何获得美观的测试报告 1.1 背景 虽然pytest命令的报文很详细,用例在执行调试时还算比较方便阅读和提取失败信息, 但对于大量测试用例运行时,可能会存在以下不足 报文被冲掉测试日志没法归档 …...
现阶段股指期货交易保证金和费用多少?股指期货一手多少钱?
股指期货交易的保证金就是你在买卖股指期货合约时,需存入交易账户的一笔资金。 股指期货交易保证金是多少? 股指期货的交易保证金就像是租房时的押金,确保你能承担交易带来的风险。 一般来说,保证金的比例大概在合约价值的12-14…...
使用mermaid画流程图
本文介绍使用mermaid画流程图,并给出几个示例。 背景 目前,除有明确格式要求的文档外,笔者一般使用markdown写文档、笔记。当文档有图片时,使用Typora等软件可实时渲染,所见即所得。但如果文档接收方没有安装相关工具…...
大模型笔记:pytorch实现MOE
0 导入库 import torch import torch.nn as nn import torch.nn.functional as F 1 专家模型 #一个简单的专家模型,可以是任何神经网络架构 class Expert(nn.Module):def __init__(self, input_size, output_size):super(Expert, self).__init__()self.fc nn.L…...
用docker来安装部署freeswitch记录
今天刚才测试一个callcenter的项目,所以尝试安装freeswitch 1、使用轩辕镜像 - 中国开发者首选的专业 Docker 镜像加速服务平台 编辑下面/etc/docker/daemon.json文件为 {"registry-mirrors": ["https://docker.xuanyuan.me"] }同时可以进入轩…...
蓝桥杯3498 01串的熵
问题描述 对于一个长度为 23333333的 01 串, 如果其信息熵为 11625907.5798, 且 0 出现次数比 1 少, 那么这个 01 串中 0 出现了多少次? #include<iostream> #include<cmath> using namespace std;int n 23333333;int main() {//枚举 0 出现的次数//因…...
html css js网页制作成品——HTML+CSS榴莲商城网页设计(4页)附源码
目录 一、👨🎓网站题目 二、✍️网站描述 三、📚网站介绍 四、🌐网站效果 五、🪓 代码实现 🧱HTML 六、🥇 如何让学习不再盲目 七、🎁更多干货 一、👨…...
C++.OpenGL (14/64)多光源(Multiple Lights)
多光源(Multiple Lights) 多光源渲染技术概览 #mermaid-svg-3L5e5gGn76TNh7Lq {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-3L5e5gGn76TNh7Lq .error-icon{fill:#552222;}#mermaid-svg-3L5e5gGn76TNh7Lq .erro…...
嵌入式学习笔记DAY33(网络编程——TCP)
一、网络架构 C/S (client/server 客户端/服务器):由客户端和服务器端两个部分组成。客户端通常是用户使用的应用程序,负责提供用户界面和交互逻辑 ,接收用户输入,向服务器发送请求,并展示服务…...
08. C#入门系列【类的基本概念】:开启编程世界的奇妙冒险
C#入门系列【类的基本概念】:开启编程世界的奇妙冒险 嘿,各位编程小白探险家!欢迎来到 C# 的奇幻大陆!今天咱们要深入探索这片大陆上至关重要的 “建筑”—— 类!别害怕,跟着我,保准让你轻松搞…...
【JavaSE】多线程基础学习笔记
多线程基础 -线程相关概念 程序(Program) 是为完成特定任务、用某种语言编写的一组指令的集合简单的说:就是我们写的代码 进程 进程是指运行中的程序,比如我们使用QQ,就启动了一个进程,操作系统就会为该进程分配内存…...
uniapp 字符包含的相关方法
在uniapp中,如果你想检查一个字符串是否包含另一个子字符串,你可以使用JavaScript中的includes()方法或者indexOf()方法。这两种方法都可以达到目的,但它们在处理方式和返回值上有所不同。 使用includes()方法 includes()方法用于判断一个字…...
【LeetCode】算法详解#6 ---除自身以外数组的乘积
1.题目介绍 给定一个整数数组 nums,返回 数组 answer ,其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请 不要使用除法,且在 O…...
ubuntu22.04有线网络无法连接,图标也没了
今天突然无法有线网络无法连接任何设备,并且图标都没了 错误案例 往上一顿搜索,试了很多博客都不行,比如 Ubuntu22.04右上角网络图标消失 最后解决的办法 下载网卡驱动,重新安装 操作步骤 查看自己网卡的型号 lspci | gre…...
