java 中 main 方法使用 KafkaConsumer 拉取 kafka 消息如何禁止输出 debug 日志
pom 依赖:
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.5.14.RELEASE</version>
</dependency>
或者
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.5.1</version>
</dependency>
ps:前面的 spring-kafka 依赖中已经包含了后面的 kafka-clients
KafkaConsumerDemo.java:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.LoggerFactory;import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.LoggerContext;import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.*;
import java.util.logging.Logger;public class KafkaConsumerDemo {static Map<String,Object> properties = new HashMap<String,Object>();private static KafkaConsumer kafkaConsumer = null;/**** windows 环境需要将下面 8 行添加到 "C:\Windows\System32\drivers\etc\hosts" 文件中:* xxx.xxx.xxx.xxx1 xxx-data01* xxx.xxx.xxx.xxx2 xxx-data02* xxx.xxx.xxx.xxx3 xxx-data03* xxx.xxx.xxx.xxx4 xxx-data04* xxx.xxx.xxx.xxx5 xxx-data05* xxx.xxx.xxx.xxx6 xxx-data06* xxx.xxx.xxx.xxx7 xxx-data07* xxx.xxx.xxx.xxx8 xxx-data08* @param args*/public static void main(String[] args) {// 禁止控制台输出一些 org.apache.kafka.xxx 相关的日志LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();loggerContext.getLogger("org.apache.kafka.clients.consumer.internals.ConsumerCoordinator").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.FetchSessionHandler").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.consumer.internals.Fetcher").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.consumer.internals.AbstractCoordinator").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.NetworkClient").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.common.network.Selector").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.Metadata").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.consumer.KafkaConsumer").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.common.utils.AppInfoParser").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.consumer.KafkaConsumer").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.consumer.ConsumerConfig").setLevel(Level.OFF);properties.put("bootstrap.servers","127.0.0.1:9192,127.0.0.1:9192,127.0.0.1:9192"); // 指定 Brokerproperties.put("group.id", "11111111111111111111111"); // 指定消费组群 ID,为防止自己启动拉取消息导致其他生产环境的消费者无法消费该消息,请设置一个绝对不重复的值,以起到隔离的作用properties.put("max.poll.records", "1000");// todo 设置可批量拉取???properties.put("enable.auto.commit", "false");properties.put("key.deserializer", StringDeserializer.class); // 将 key 的字节数组转成 Java 对象properties.put("value.deserializer", StringDeserializer.class); // 将 value 的字节数组转成 Java 对象kafkaConsumer = new KafkaConsumer<String, String>(properties);// List<String> topics = queryAllTopics( consumer );kafkaConsumer.subscribe( Collections.singletonList( "ods_carbon_rfid_device_record" ) ); // 订阅主题 order-eventsnew Thread(new Runnable() {@Overridepublic void run() {receiveMessage();}}).start();}/*** 查询全部的主题(topic)列表* @param kafkaConsumer* @return*/private static List<String> queryAllTopics(KafkaConsumer kafkaConsumer) {if( kafkaConsumer == null ){return null;}Map<String, List<PartitionInfo>> map = kafkaConsumer.listTopics();if( map == null ){return null;}return new ArrayList<String>( map.keySet() );}public static void receiveMessage() {try {while ( true ){synchronized (KafkaConsumerDemo.class) {// ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));// 30L 表示超时时间为 30秒,有消息立即返回,没消息最多等 30 秒后返回SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofSeconds(30L));String date = sdf.format(new Date());if( records == null ){System.out.println( date + " 本次未拉取到任何消息" );}else {System.out.println( date + " 本次拉取到 " + records.count() + " 条消息" );int i = 1;for (ConsumerRecord<String,String> record: records) {String info = String.format("[Topic: %s][Partition:%d][Offset:%d][Key:%s][Message:%s]", record.topic(), record.partition(), record.offset(), record.key(), record.value());System.out.println( "第" + i + "条消息:" + info );i++;}kafkaConsumer.commitSync();}/*** 当你用 KafkaConsumer从Kafka里读取消息并且处理完后,commitSync 方法会帮你把这些消息的处理进度(也就是偏移量 offset )同步地告诉 Kafka 服务器。* 这样,Kafka 就知道你已经处理到哪儿了。如果消费者(也就是读取消息的程序)突然崩溃或者重启,Kafka 就能根据最后一次提交的偏移量,让你从上一次处理* 完的地方继续开始,而不会漏掉或者重复处理消息。* 简单来说,commitSync 方 法就是用来“保存进度”的,确保消息处理的可靠性和顺序性。*/// Thread.sleep( 5000L );}}} catch (Exception e){e.printStackTrace();} finally {kafkaConsumer.close();}}
}
相关文章:

java 中 main 方法使用 KafkaConsumer 拉取 kafka 消息如何禁止输出 debug 日志
pom 依赖: <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.5.14.RELEASE</version> </dependency> 或者 <dependency><groupId>org.ap…...

【后端面试总结】Golang可能的内存泄漏场景及应对策略
Golang可能的内存泄漏场景及应对策略 一、引言 Golang作为一种高性能、并发友好的编程语言,其内置的垃圾回收机制极大地简化了内存管理。然而,这并不意味着开发者可以完全忽视内存泄漏问题。在实际开发中,由于不当的资源管理、循环引用、以…...

Java 反射机制详解
在 Java 编程世界中,反射机制犹如一把神奇的钥匙,它能够打开许多隐藏在代码深处的 “大门”,让开发者突破常规的限制,实现一些极具灵活性的功能。今天,就跟随我一同深入探究 Java 反射机制的奥秘。 一、什么是反射 反…...

【k8s】scc权限 restricted、anyuid、privileged
文章目录 概述1. 内置的scc2. OpenShift如何确定pod的scc2.1 Pod未带SCC标签的情况2.2. Pod带有SCC标签的情况 参考 概述 在OpenShift(后文简称OCP)中,很早就一个概念:Security Context Constraints ,简称SCC…...

2025华数杯国际赛A题完整论文讲解(含每一问python代码+数据+可视化图)
大家好呀,从发布赛题一直到现在,总算完成了2025“华数杯”国际大学生数学建模竞赛A题Can He Swim Faster的完整的成品论文。 本论文可以保证原创,保证高质量。绝不是随便引用一大堆模型和代码复制粘贴进来完全没有应用糊弄人的垃圾半成品论文…...

ThreadLocal 的使用场景
在现代电商平台中,ThreadLocal 常用于以下场景,特别是与线程隔离相关的业务中,以提高性能和简化上下文传递。 1. 用户上下文信息管理 场景:在用户发起的每次请求中,需要携带用户 ID、角色、权限等信息,而这…...

后端开发 Springboot整合Redis Spring Data Redis 模板
目录 redis 配置 RedisConfig 类 完整代码 代码讲解 1. 类定义和注解 2. 定义 RedisTemplate Bean 3. 配置 JSON 序列化 4. 配置 Redis 的 key 和 value 序列化方式 5. 完成配置并返回 RedisTemplate 总结 redis 服务接口实现类 类级别 注入 RedisTemplate 常用 Re…...

代码随想录算法训练营第 4 天(链表 2)| 24. 两两交换链表中的节点19.删除链表的倒数第N个节点 -
一、24. 两两交换链表中的节点 题目:24. 两两交换链表中的节点 - 力扣(LeetCode) 视频:帮你把链表细节学清楚! | LeetCode:24. 两两交换链表中的节点_哔哩哔哩_bilibili 讲解:代码随想录 dummy-…...

【RDMA学习笔记】1:RDMA(Remote Direct Memory Access)介绍
从帝国理工的PPT学习。 什么是RDMA Remote Direct Memory Access,也就是Remote的DMA,是一种硬件机制,能直接访问远端结点的内存,而不需要处理器介入。 其中: Remote:跨node进行数据传输Directÿ…...

网络安全常见的35个安全框架及模型
大家读完觉得有帮助记得关注和点赞!!! 01、概述 网络安全专业机构制定的一套标准、准则和程序,旨在帮助组织了解和管理面临的网络安全风险。优秀的安全框架及模型应该为用户提供一种可靠方法,帮助其实现网络安全建设…...

Elasticsearch介绍及使用
Elasticsearch 是一款基于 Lucene 库构建的开源、分布式、RESTful 风格的搜索引擎和分析引擎,具有强大的全文搜索、数据分析、机器学习等功能,广泛应用于日志分析、实时数据分析、全文检索等场景。 核心概念 索引(Index)…...

Leetocde516. 最长回文子序列 动态规划
原题链接:Leetocde516. 最长回文子序列 class Solution { public:int longestPalindromeSubseq(string s) {int n s.size();vector<vector<int>> dp(n, vector<int>(n, 1));for (int i 0; i < n; i) {dp[i][i] 1;if (i 1 < n &&…...

iOS 逆向学习 - Inter-Process Communication:进程间通信
iOS 逆向学习 - Inter-Process Communication:进程间通信 一、进程间通信概要二、iOS 进程间通信机制详解1. URL Schemes2. Pasteboard3. App Groups 和 Shared Containers4. XPC Services 三、不同进程间通信机制的差异四、总结 一、进程间通信概要 进程间通信&am…...

高级生化大纲
一,蛋白质化学: 蛋白质分离是生物化学和分子生物学研究中的一项基本技术,用于根据蛋白质的物理和化学特性将其从混合物中分离出来。 1. 离心分离法 离心分离法利用离心力来分离不同质量或密度的颗粒和分子。 差速离心:通过逐…...

YARN WebUI 服务
一、WebUI 使用 与HDFS一样,YARN也提供了一个WebUI服务,可以使用YARN Web用户界面监视群集、队列、应用程序、服务、流活动和节点信息。还可以查看集群详细配置的信息,检查各种应用程序和服务的日志。 1.1 首页 浏览器输入http://node2.itc…...

【Unity3D】利用IJob、Burst优化处理切割物体
参考文章: 【Unity】切割网格 【Unity3D】ECS入门学习(一)导入及基础学习_unity ecs教程-CSDN博客 【Unity3D】ECS入门学习(十二)IJob、IJobFor、IJobParallelFor_unity ijobparallelfor-CSDN博客 工程资源地址&…...

【大前端】Vue3 工程化项目使用详解
目录 一、前言 二、前置准备 2.1 环境准备 2.1.1 create-vue功能 2.1.2 nodejs环境 2.1.3 配置nodejs的环境变量 2.1.4 更换安装包的源 三、工程化项目创建与启动过程 3.1 创建工程化项目 3.2 项目初始化 3.3 项目启动 3.4 核心文件说明 四、VUE两种不同的API风格 …...

基于文件系统分布式锁原理
分布式锁:在一个公共的存储服务上打上一个标记,如Redis的setnx命令,是先到先得方式获得锁,ZooKeeper有点像下面的demo,比较大小的方式判决谁获得锁。 package com.ldj.mybatisflex.demo;import java.util.*; import java.util.co…...

简历整理YH
一,订单中心 1,调拨单 融通(Rocketmq)-订单中心:ECC_BMS123(已出单),125(分配),127(发货),129(收货) 通过RocketMq接入多场景订单数据 2,销售单 sap(FTP)-订单中心,下发1002,1003,…...

Kotlin 协程基础三 —— 结构化并发(二)
Kotlin 协程基础系列: Kotlin 协程基础一 —— 总体知识概述 Kotlin 协程基础二 —— 结构化并发(一) Kotlin 协程基础三 —— 结构化并发(二) Kotlin 协程基础四 —— CoroutineScope 与 CoroutineContext Kotlin 协程…...

微信小程序实现长按录音,点击播放等功能,CSS实现语音录制动画效果
有一个需求需要在微信小程序上实现一个长按时进行语音录制,录制时间最大为60秒,录制完成后,可点击播放,播放时再次点击停止播放,可以反复录制,新录制的语音把之前的语音覆盖掉,也可以主动长按删…...

校园跑腿小程序---轮播图,导航栏开发
hello hello~ ,这里是 code袁~💖💖 ,欢迎大家点赞🥳🥳关注💥💥收藏🌹🌹🌹 🦁作者简介:一名喜欢分享和记录学习的在校大学生…...

详细全面讲解C++中重载、隐藏、覆盖的区别
文章目录 总结1、重载示例代码特点1. 模板函数和非模板函数重载2. 重载示例与调用规则示例代码调用规则解释3. 特殊情况与注意事项二义性问题 函数特化与重载的交互 2. 函数隐藏(Function Hiding)概念示例代码特点 3. 函数覆盖(重写ÿ…...

一文读懂单片机的串口
目录 串口通信的基本概念 串口通信的关键参数 单片机串口的硬件连接 单片机串口的工作原理 数据发送过程 数据接收过程 单片机串口的编程实现 以51单片机为例 硬件连接 初始化串口 发送数据 接收数据 串口中断服务函数 代码示例 单片机串口的应用实例 单片机与…...

HTML5 网站模板
HTML5 网站模板 参考 HTML5 Website Templates...

mybatis分页插件:PageHelper、mybatis-plus-jsqlparser(解决SQL_SERVER2005连接分页查询OFFSET问题)
文章目录 引言I PageHelper坐标II mybatis-plus-jsqlparser坐标Spring Boot 添加分页插件自定义 Mapper 方法中使用分页注意事项解决SQL_SERVER2005连接分页查询OFFSET问题知识扩展MyBatis-Plus 框架结构mybatis-plus-jsqlparser的 Page 类引言 PageHelper import com.github.p…...

uniapp中rpx和upx的区别
在 UniApp 中,rpx 和 upx 是两种不同的单位,它们的主要区别在于适用的场景和计算方式。 ### rpx(Responsive Pixel) - **适用场景**:rpx 是一种响应式单位,主要用于小程序和移动端的布局。 - **计算方式**…...

什么是卷积网络中的平移不变性?平移shft在数据增强中的意义
今天来介绍一下数据增强中的平移shft操作和卷积网络中的平移不变性。 1、什么是平移 Shift 平移是指在数据增强(data augmentation)过程中,通过对输入图像或目标进行位置偏移(平移),让目标在图像中呈现出…...

java.net.SocketException: Connection reset 异常原因分析和解决方法
导致此异常的原因,总结下来有三种情况: 一、服务器端偶尔出现了异常,导致连接关闭 解决方法: 采用出错重试机制 二、 服务器端和客户端使用的连接方式不一致 解决方法: 服务器端和客户端使用相同的连接方式ÿ…...

Maven 仓库的分类
Maven 是一个广泛使用的项目构建和依赖管理工具,在 Java 开发生态中占据重要地位。作为 Maven 的核心概念之一,仓库(Repository)扮演着至关重要的角色,用于存储项目的依赖、插件以及构建所需的各种资源。 了解 Maven 仓…...