kafka广播消费组停机后未删除优化
背景
kafka广播消息的时候为了保证groupId不重复,再创建的时间采用前缀+时间戳的形式,这样可以保证每次启动的时候是创建的新的,但是
会出现一个问题:就是每次停机或者重启都会新建一个应用实例,关闭应用后并不会删除kafka下面的消费组,导致消费组越来越多,目前
我们有promethes监控kafka消息偏移,一直没有消费的消费组就会进行报警;
解决思路
既然是没有删除消费组就通过优雅停机,应用关闭前采用java的api操作kafka消费组,进行删除
代码实现
1)编写类实现DisposableBean接口,实现destroy方法,注意每个项目定义的id会不一样,此例子中 id = “cfgs-broadcast”
package com.simo.vsim.cfgs.init;import com.alibaba.nacos.api.config.annotation.NacosValue;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DeleteConsumerGroupsResult;
import org.apache.kafka.common.KafkaFuture;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;@Data
@Component
@Slf4j
public class ApplicationListen implements InitializingBean, DisposableBean {@Resourceprivate KafkaListenerEndpointRegistry registry;@NacosValue(value = "${spring.kafka.bootstrap-servers}", autoRefreshed = true)private String servers;@Overridepublic void destroy() {MessageListenerContainer listenerContainer = registry.getListenerContainer("cfgs-broadcast");String groupId = listenerContainer.getGroupId();Map<String, Object> props = new HashMap<>(1);props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,servers);AdminClient adminClient = AdminClient.create(props);DeleteConsumerGroupsResult deleteConsumerGroupsResult = adminClient.deleteConsumerGroups(Arrays.asList(groupId));KafkaFuture resultFuture = deleteConsumerGroupsResult.all();try {resultFuture.get();log.info("kafka关闭消费组="+groupId);} catch (InterruptedException e) {e.printStackTrace();} catch (Exception e) {e.printStackTrace();}adminClient.close();}@Overridepublic void afterPropertiesSet() {}
}
2)接收kafka广播消息的时候指定容器id,用于第一步通过id进行删除,id = “cfgs-broadcast”
/*** groupId不一样代表广播模式,earliest 可能重复消费,latest可能漏消费* @param message* @param ack*/
@KafkaListener(containerFactory = "manualImmediateListenerContainerFactory" , topics = {"${kafka.topic.cfgs-broadcast}"},properties = {"auto.offset.reset=latest"},groupId = "cfgs-broadcast-" + "#{T(java.lang.System).currentTimeMillis()}",idIsGroup = false,id = "cfgs-broadcast")
public void onMessageManualBroadcast(List<Object> message, Acknowledgment ack){message.forEach(item -> handleMsg(2,item));//直接提交offsetack.acknowledge();
}
效果
1)正常启动有这个消费组:cfgs-broadcast-1696754926097
2)重新启动,通过日志显示已经删除(k8s默认是优雅停机)

如果是iead直接关闭下,不要一下子点击两下停止,点击一次是优雅停机,连续点击2次就是kill -9的效果,就无法看到效果
查看kafka消费组,确实已经删除
相关文章:
kafka广播消费组停机后未删除优化
背景 kafka广播消息的时候为了保证groupId不重复,再创建的时间采用前缀时间戳的形式,这样可以保证每次启动的时候是创建的新的,但是 会出现一个问题:就是每次停机或者重启都会新建一个应用实例,关闭应用后并不会删除…...
深度学习自学笔记十三:unet网络详解和环境配置
一、unet网络详解 UNet(全名为 U-Net)是一种深度学习架构,最初由Olaf Ronneberger、Philipp Fischer和Thomas Brox于2015年提出,用于图像分割任务。该网络的名称来源于其U形状的架构,该架构使得网络在编码和解码过程中…...
如何给苹果ipa和安卓apk应用APP包体修改手机屏幕上logo图标iocn?
虽然修改应用文件图标是一个简单的事情,但是还是有很多小可爱是不明白的,你要是想要明白的话,那我就让你今天明白明白,我们今天采用的非常规打包方式,常规打包方式科技一下教程铺天盖地,既然小弟我出马&…...
复旦MBA魏文童:构建完备管理知识体系,助力企业数字化发展
日月光华,旦复旦兮!复旦MBA如同一个巨大的磁场,吸引了诸多来自五湖四海、各行各业的职场精英。从初入职场的青涩懵懂到如今的独当一面专业干练,他们逐渐成长为职场的中坚力量,在各自领域内发光发热。作为新时代的青年&…...
【算能】在Docker中调用PCIe卡
开发需求,需要在centos下开发对应的内容 首先拉取docker 镜像 docker pull centos:centos7 然后在空白的centos容器下使用PCIe卡,这个部分特别提醒,需要挂载/dev的这个目录,才能读到内容,故而创建docker的命令 dock…...
【MySQL】表的查询与连接
文章目录 预备工作一、表的基本查询1、简单基本查询2、分组聚合统计3、基本查询练习 二、表的复合查询1、多表查询2、子查询2.1 **单行子查询**2.2 **多行子查询**2.3 **多列子查询**2.4 在from子句中使用子查询 3、合并查询 三、表的连接1、自连接2、内连接3、外连接 预备工作…...
AtCoder Beginner Contest 324(F)
AtCoder Beginner Contest 324 F Beautiful Path 需要一点思维的转化,一时竟然没想到。 题意 给定大小为 n n n 的有向图, m m m 条边,每条边有 b i , c i b_i,c_i bi,ci 两个属性,需要找到一条从 1 ∼ n 1\sim n 1∼n…...
LuatOS-SOC接口文档(air780E)-- i2s - 数字音频
示例 -- 这个库属于底层适配库, 具体用法请查阅示例 -- demo/multimedia -- demo/tts -- demo/record常量 常量 类型 解释 i2s.MODE_I2S number I2S标准,比如ES7149 i2s.MODE_LSB number LSB格式 i2s.MODE_MSB number MSB格式,比如TM8211 …...
瑞芯微RK3568核心板在边缘服务器产品中的应用-迅为电子
迅为RK3568核心板在边缘服务器产品中可以发挥关键作用,为边缘计算应用提供高性能的计算和多媒体处理能力。边缘服务器通常用于处理和存储数据,执行本地计算任务,并支持与远程云服务的通信。以下是RK3568核心板在边缘服务器产品中的应用方案&a…...
pg ash自制版 pg_active_session_history
一、 实现功能 由于pgsentinel插件存在严重的内存占用问题,本篇改为自行实现,但其语句仍可以参考pgsentinel插件。PostgreSQL ash —— pgsentinel插件 学习与踩坑记录_CSDN博客 v1.0 根据pg 14版本设计及测试,仅支持收集主库信息。默认每10秒…...
Elasticsearch系列组件:Kibana无缝集成的数据可视化和探索平台
Elasticsearch 是一个开源的、基于 Lucene 的分布式搜索和分析引擎,设计用于云计算环境中,能够实现实时的、可扩展的搜索、分析和探索全文和结构化数据。它具有高度的可扩展性,可以在短时间内搜索和分析大量数据。 Elasticsearch 不仅仅是一个…...
phpcms_v9模板制作及二次开发常用代码
0:调用最新文章,带所在版块 {pc:get sql"SELECT a.title, a.catid, b.catid, b.catname, a.url as turl ,b.url as curl,a.id FROM v9_news a, v9_category b WHERE a.catid b.catid ORDER BY a.id DESC " num"15" cache"300"} {lo…...
自然语言处理(NLP)-概述
NLP 一、什么是自然语言处理(NLP)二、NLP的发展三、相关理论1 语言模型2 词向量表征和语义分析3 深度学习 一、什么是自然语言处理(NLP) 什么是自然语言处理 二、NLP的发展 三、相关理论 1 语言模型 序列数据形式多样…...
Python开发者的宝典:CSV和JSON数据处理技巧大公开!
更多资料获取 📚 个人网站:涛哥聊Python 在Python中处理CSV和JSON数据时,需要深入了解这两种数据格式的读取、写入、处理和转换方法。 下面将详细介绍如何在Python中处理CSV和JSON数据,并提供一些示例和最佳实践。 CSV数据处理…...
Unity中Commpont类获取子物体的示例
// 本脚本用于演示Component类 方法 //任何一个组件 都可以从游戏物体获取或者从其父对象哪里 子对象哪里获取,一个组件也可以拿到同一个物体上的其他组件 using System.Collections; using System.Collections.Generic; using UnityEngine; public class Component…...
【Vue面试题二十一】、Vue中的过滤器了解吗?过滤器的应用场景有哪些?
文章底部有个人公众号:热爱技术的小郑。主要分享开发知识、学习资料、毕业设计指导等。有兴趣的可以关注一下。为何分享? 踩过的坑没必要让别人在再踩,自己复盘也能加深记忆。利己利人、所谓双赢。 面试官:Vue中的过滤器了解吗&am…...
Unity 3D基础——缓动效果
1.在场景中新建两个 Cube 立方体,在 Scene 视图中将两个 Cude的位置错开。 2.新建 C# 脚本 MoveToTarget.cs(写完记得保存) using System.Collections; using System.Collections.Generic; using UnityEngine;public class MoveToTarget : M…...
高校教务系统登录页面JS分析——南京邮电大学
高校教务系统密码加密逻辑及JS逆向 本文将介绍南京邮电大学教务系统的密码加密逻辑以及使用JavaScript进行逆向分析的过程。通过本文,你将了解到密码加密的基本概念、常用加密算法以及如何通过逆向分析来破解密码。 本文仅供交流学习,勿用于非法用途。 一…...
css实现排行榜样式(vue组件)
先看效果图: <template><div class"lawyer-refund-wrap"><div class"content"><divv-for"(item, index) in dataList" :key"index":style"{width: calc(100% - ${(index 1) * 10}px)}"c…...
I2VGen-XL高清图像生成视频大模型
本项目I2VGen-XL旨在解决根据输入图像生成高清视频任务。I2VGen-XL由达摩院研发的高清视频生成基础模型之一,其核心部分包含两个阶段,分别解决语义一致性和清晰度的问题,参数量共计约37亿,模型经过在大规模视频和图像数据混合预训…...
多模态2025:技术路线“神仙打架”,视频生成冲上云霄
文|魏琳华 编|王一粟 一场大会,聚集了中国多模态大模型的“半壁江山”。 智源大会2025为期两天的论坛中,汇集了学界、创业公司和大厂等三方的热门选手,关于多模态的集中讨论达到了前所未有的热度。其中,…...
基于uniapp+WebSocket实现聊天对话、消息监听、消息推送、聊天室等功能,多端兼容
基于 UniApp + WebSocket实现多端兼容的实时通讯系统,涵盖WebSocket连接建立、消息收发机制、多端兼容性配置、消息实时监听等功能,适配微信小程序、H5、Android、iOS等终端 目录 技术选型分析WebSocket协议优势UniApp跨平台特性WebSocket 基础实现连接管理消息收发连接…...
基于Flask实现的医疗保险欺诈识别监测模型
基于Flask实现的医疗保险欺诈识别监测模型 项目截图 项目简介 社会医疗保险是国家通过立法形式强制实施,由雇主和个人按一定比例缴纳保险费,建立社会医疗保险基金,支付雇员医疗费用的一种医疗保险制度, 它是促进社会文明和进步的…...
Python爬虫(二):爬虫完整流程
爬虫完整流程详解(7大核心步骤实战技巧) 一、爬虫完整工作流程 以下是爬虫开发的完整流程,我将结合具体技术点和实战经验展开说明: 1. 目标分析与前期准备 网站技术分析: 使用浏览器开发者工具(F12&…...
【Zephyr 系列 10】实战项目:打造一个蓝牙传感器终端 + 网关系统(完整架构与全栈实现)
🧠关键词:Zephyr、BLE、终端、网关、广播、连接、传感器、数据采集、低功耗、系统集成 📌目标读者:希望基于 Zephyr 构建 BLE 系统架构、实现终端与网关协作、具备产品交付能力的开发者 📊篇幅字数:约 5200 字 ✨ 项目总览 在物联网实际项目中,**“终端 + 网关”**是…...
GC1808高性能24位立体声音频ADC芯片解析
1. 芯片概述 GC1808是一款24位立体声音频模数转换器(ADC),支持8kHz~96kHz采样率,集成Δ-Σ调制器、数字抗混叠滤波器和高通滤波器,适用于高保真音频采集场景。 2. 核心特性 高精度:24位分辨率,…...
使用 SymPy 进行向量和矩阵的高级操作
在科学计算和工程领域,向量和矩阵操作是解决问题的核心技能之一。Python 的 SymPy 库提供了强大的符号计算功能,能够高效地处理向量和矩阵的各种操作。本文将深入探讨如何使用 SymPy 进行向量和矩阵的创建、合并以及维度拓展等操作,并通过具体…...
laravel8+vue3.0+element-plus搭建方法
创建 laravel8 项目 composer create-project --prefer-dist laravel/laravel laravel8 8.* 安装 laravel/ui composer require laravel/ui 修改 package.json 文件 "devDependencies": {"vue/compiler-sfc": "^3.0.7","axios": …...
sipsak:SIP瑞士军刀!全参数详细教程!Kali Linux教程!
简介 sipsak 是一个面向会话初始协议 (SIP) 应用程序开发人员和管理员的小型命令行工具。它可以用于对 SIP 应用程序和设备进行一些简单的测试。 sipsak 是一款 SIP 压力和诊断实用程序。它通过 sip-uri 向服务器发送 SIP 请求,并检查收到的响应。它以以下模式之一…...
【Go语言基础【12】】指针:声明、取地址、解引用
文章目录 零、概述:指针 vs. 引用(类比其他语言)一、指针基础概念二、指针声明与初始化三、指针操作符1. &:取地址(拿到内存地址)2. *:解引用(拿到值) 四、空指针&am…...
