java 中 main 方法使用 KafkaConsumer 拉取 kafka 消息如何禁止输出 debug 日志
pom 依赖:
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.5.14.RELEASE</version>
</dependency>
或者
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.5.1</version>
</dependency>
ps:前面的 spring-kafka 依赖中已经包含了后面的 kafka-clients
KafkaConsumerDemo.java:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.LoggerFactory;import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.LoggerContext;import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.*;
import java.util.logging.Logger;public class KafkaConsumerDemo {static Map<String,Object> properties = new HashMap<String,Object>();private static KafkaConsumer kafkaConsumer = null;/**** windows 环境需要将下面 8 行添加到 "C:\Windows\System32\drivers\etc\hosts" 文件中:* xxx.xxx.xxx.xxx1 xxx-data01* xxx.xxx.xxx.xxx2 xxx-data02* xxx.xxx.xxx.xxx3 xxx-data03* xxx.xxx.xxx.xxx4 xxx-data04* xxx.xxx.xxx.xxx5 xxx-data05* xxx.xxx.xxx.xxx6 xxx-data06* xxx.xxx.xxx.xxx7 xxx-data07* xxx.xxx.xxx.xxx8 xxx-data08* @param args*/public static void main(String[] args) {// 禁止控制台输出一些 org.apache.kafka.xxx 相关的日志LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();loggerContext.getLogger("org.apache.kafka.clients.consumer.internals.ConsumerCoordinator").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.FetchSessionHandler").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.consumer.internals.Fetcher").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.consumer.internals.AbstractCoordinator").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.NetworkClient").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.common.network.Selector").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.Metadata").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.consumer.KafkaConsumer").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.common.utils.AppInfoParser").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.consumer.KafkaConsumer").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.consumer.ConsumerConfig").setLevel(Level.OFF);properties.put("bootstrap.servers","127.0.0.1:9192,127.0.0.1:9192,127.0.0.1:9192"); // 指定 Brokerproperties.put("group.id", "11111111111111111111111"); // 指定消费组群 ID,为防止自己启动拉取消息导致其他生产环境的消费者无法消费该消息,请设置一个绝对不重复的值,以起到隔离的作用properties.put("max.poll.records", "1000");// todo 设置可批量拉取???properties.put("enable.auto.commit", "false");properties.put("key.deserializer", StringDeserializer.class); // 将 key 的字节数组转成 Java 对象properties.put("value.deserializer", StringDeserializer.class); // 将 value 的字节数组转成 Java 对象kafkaConsumer = new KafkaConsumer<String, String>(properties);// List<String> topics = queryAllTopics( consumer );kafkaConsumer.subscribe( Collections.singletonList( "ods_carbon_rfid_device_record" ) ); // 订阅主题 order-eventsnew Thread(new Runnable() {@Overridepublic void run() {receiveMessage();}}).start();}/*** 查询全部的主题(topic)列表* @param kafkaConsumer* @return*/private static List<String> queryAllTopics(KafkaConsumer kafkaConsumer) {if( kafkaConsumer == null ){return null;}Map<String, List<PartitionInfo>> map = kafkaConsumer.listTopics();if( map == null ){return null;}return new ArrayList<String>( map.keySet() );}public static void receiveMessage() {try {while ( true ){synchronized (KafkaConsumerDemo.class) {// ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));// 30L 表示超时时间为 30秒,有消息立即返回,没消息最多等 30 秒后返回SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofSeconds(30L));String date = sdf.format(new Date());if( records == null ){System.out.println( date + " 本次未拉取到任何消息" );}else {System.out.println( date + " 本次拉取到 " + records.count() + " 条消息" );int i = 1;for (ConsumerRecord<String,String> record: records) {String info = String.format("[Topic: %s][Partition:%d][Offset:%d][Key:%s][Message:%s]", record.topic(), record.partition(), record.offset(), record.key(), record.value());System.out.println( "第" + i + "条消息:" + info );i++;}kafkaConsumer.commitSync();}/*** 当你用 KafkaConsumer从Kafka里读取消息并且处理完后,commitSync 方法会帮你把这些消息的处理进度(也就是偏移量 offset )同步地告诉 Kafka 服务器。* 这样,Kafka 就知道你已经处理到哪儿了。如果消费者(也就是读取消息的程序)突然崩溃或者重启,Kafka 就能根据最后一次提交的偏移量,让你从上一次处理* 完的地方继续开始,而不会漏掉或者重复处理消息。* 简单来说,commitSync 方 法就是用来“保存进度”的,确保消息处理的可靠性和顺序性。*/// Thread.sleep( 5000L );}}} catch (Exception e){e.printStackTrace();} finally {kafkaConsumer.close();}}
}
相关文章:
java 中 main 方法使用 KafkaConsumer 拉取 kafka 消息如何禁止输出 debug 日志
pom 依赖: <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.5.14.RELEASE</version> </dependency> 或者 <dependency><groupId>org.ap…...
【后端面试总结】Golang可能的内存泄漏场景及应对策略
Golang可能的内存泄漏场景及应对策略 一、引言 Golang作为一种高性能、并发友好的编程语言,其内置的垃圾回收机制极大地简化了内存管理。然而,这并不意味着开发者可以完全忽视内存泄漏问题。在实际开发中,由于不当的资源管理、循环引用、以…...
Java 反射机制详解
在 Java 编程世界中,反射机制犹如一把神奇的钥匙,它能够打开许多隐藏在代码深处的 “大门”,让开发者突破常规的限制,实现一些极具灵活性的功能。今天,就跟随我一同深入探究 Java 反射机制的奥秘。 一、什么是反射 反…...
【k8s】scc权限 restricted、anyuid、privileged
文章目录 概述1. 内置的scc2. OpenShift如何确定pod的scc2.1 Pod未带SCC标签的情况2.2. Pod带有SCC标签的情况 参考 概述 在OpenShift(后文简称OCP)中,很早就一个概念:Security Context Constraints ,简称SCC…...
2025华数杯国际赛A题完整论文讲解(含每一问python代码+数据+可视化图)
大家好呀,从发布赛题一直到现在,总算完成了2025“华数杯”国际大学生数学建模竞赛A题Can He Swim Faster的完整的成品论文。 本论文可以保证原创,保证高质量。绝不是随便引用一大堆模型和代码复制粘贴进来完全没有应用糊弄人的垃圾半成品论文…...
ThreadLocal 的使用场景
在现代电商平台中,ThreadLocal 常用于以下场景,特别是与线程隔离相关的业务中,以提高性能和简化上下文传递。 1. 用户上下文信息管理 场景:在用户发起的每次请求中,需要携带用户 ID、角色、权限等信息,而这…...
后端开发 Springboot整合Redis Spring Data Redis 模板
目录 redis 配置 RedisConfig 类 完整代码 代码讲解 1. 类定义和注解 2. 定义 RedisTemplate Bean 3. 配置 JSON 序列化 4. 配置 Redis 的 key 和 value 序列化方式 5. 完成配置并返回 RedisTemplate 总结 redis 服务接口实现类 类级别 注入 RedisTemplate 常用 Re…...
代码随想录算法训练营第 4 天(链表 2)| 24. 两两交换链表中的节点19.删除链表的倒数第N个节点 -
一、24. 两两交换链表中的节点 题目:24. 两两交换链表中的节点 - 力扣(LeetCode) 视频:帮你把链表细节学清楚! | LeetCode:24. 两两交换链表中的节点_哔哩哔哩_bilibili 讲解:代码随想录 dummy-…...
【RDMA学习笔记】1:RDMA(Remote Direct Memory Access)介绍
从帝国理工的PPT学习。 什么是RDMA Remote Direct Memory Access,也就是Remote的DMA,是一种硬件机制,能直接访问远端结点的内存,而不需要处理器介入。 其中: Remote:跨node进行数据传输Directÿ…...
网络安全常见的35个安全框架及模型
大家读完觉得有帮助记得关注和点赞!!! 01、概述 网络安全专业机构制定的一套标准、准则和程序,旨在帮助组织了解和管理面临的网络安全风险。优秀的安全框架及模型应该为用户提供一种可靠方法,帮助其实现网络安全建设…...
Elasticsearch介绍及使用
Elasticsearch 是一款基于 Lucene 库构建的开源、分布式、RESTful 风格的搜索引擎和分析引擎,具有强大的全文搜索、数据分析、机器学习等功能,广泛应用于日志分析、实时数据分析、全文检索等场景。 核心概念 索引(Index)…...
Leetocde516. 最长回文子序列 动态规划
原题链接:Leetocde516. 最长回文子序列 class Solution { public:int longestPalindromeSubseq(string s) {int n s.size();vector<vector<int>> dp(n, vector<int>(n, 1));for (int i 0; i < n; i) {dp[i][i] 1;if (i 1 < n &&…...
iOS 逆向学习 - Inter-Process Communication:进程间通信
iOS 逆向学习 - Inter-Process Communication:进程间通信 一、进程间通信概要二、iOS 进程间通信机制详解1. URL Schemes2. Pasteboard3. App Groups 和 Shared Containers4. XPC Services 三、不同进程间通信机制的差异四、总结 一、进程间通信概要 进程间通信&am…...
高级生化大纲
一,蛋白质化学: 蛋白质分离是生物化学和分子生物学研究中的一项基本技术,用于根据蛋白质的物理和化学特性将其从混合物中分离出来。 1. 离心分离法 离心分离法利用离心力来分离不同质量或密度的颗粒和分子。 差速离心:通过逐…...
YARN WebUI 服务
一、WebUI 使用 与HDFS一样,YARN也提供了一个WebUI服务,可以使用YARN Web用户界面监视群集、队列、应用程序、服务、流活动和节点信息。还可以查看集群详细配置的信息,检查各种应用程序和服务的日志。 1.1 首页 浏览器输入http://node2.itc…...
【Unity3D】利用IJob、Burst优化处理切割物体
参考文章: 【Unity】切割网格 【Unity3D】ECS入门学习(一)导入及基础学习_unity ecs教程-CSDN博客 【Unity3D】ECS入门学习(十二)IJob、IJobFor、IJobParallelFor_unity ijobparallelfor-CSDN博客 工程资源地址&…...
【大前端】Vue3 工程化项目使用详解
目录 一、前言 二、前置准备 2.1 环境准备 2.1.1 create-vue功能 2.1.2 nodejs环境 2.1.3 配置nodejs的环境变量 2.1.4 更换安装包的源 三、工程化项目创建与启动过程 3.1 创建工程化项目 3.2 项目初始化 3.3 项目启动 3.4 核心文件说明 四、VUE两种不同的API风格 …...
基于文件系统分布式锁原理
分布式锁:在一个公共的存储服务上打上一个标记,如Redis的setnx命令,是先到先得方式获得锁,ZooKeeper有点像下面的demo,比较大小的方式判决谁获得锁。 package com.ldj.mybatisflex.demo;import java.util.*; import java.util.co…...
简历整理YH
一,订单中心 1,调拨单 融通(Rocketmq)-订单中心:ECC_BMS123(已出单),125(分配),127(发货),129(收货) 通过RocketMq接入多场景订单数据 2,销售单 sap(FTP)-订单中心,下发1002,1003,…...
Kotlin 协程基础三 —— 结构化并发(二)
Kotlin 协程基础系列: Kotlin 协程基础一 —— 总体知识概述 Kotlin 协程基础二 —— 结构化并发(一) Kotlin 协程基础三 —— 结构化并发(二) Kotlin 协程基础四 —— CoroutineScope 与 CoroutineContext Kotlin 协程…...
论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(二)
HoST框架核心实现方法详解 - 论文深度解读(第二部分) 《Learning Humanoid Standing-up Control across Diverse Postures》 系列文章: 论文深度解读 + 算法与代码分析(二) 作者机构: 上海AI Lab, 上海交通大学, 香港大学, 浙江大学, 香港中文大学 论文主题: 人形机器人…...
Spark 之 入门讲解详细版(1)
1、简介 1.1 Spark简介 Spark是加州大学伯克利分校AMP实验室(Algorithms, Machines, and People Lab)开发通用内存并行计算框架。Spark在2013年6月进入Apache成为孵化项目,8个月后成为Apache顶级项目,速度之快足见过人之处&…...
MySQL 隔离级别:脏读、幻读及不可重复读的原理与示例
一、MySQL 隔离级别 MySQL 提供了四种隔离级别,用于控制事务之间的并发访问以及数据的可见性,不同隔离级别对脏读、幻读、不可重复读这几种并发数据问题有着不同的处理方式,具体如下: 隔离级别脏读不可重复读幻读性能特点及锁机制读未提交(READ UNCOMMITTED)允许出现允许…...
【算法训练营Day07】字符串part1
文章目录 反转字符串反转字符串II替换数字 反转字符串 题目链接:344. 反转字符串 双指针法,两个指针的元素直接调转即可 class Solution {public void reverseString(char[] s) {int head 0;int end s.length - 1;while(head < end) {char temp …...
Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决
Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决 问题背景 在一个基于 Spring Cloud Gateway WebFlux 构建的微服务项目中,新增了一个本地验证码接口 /code,使用函数式路由(RouterFunction)和 Hutool 的 Circle…...
Linux 中如何提取压缩文件 ?
Linux 是一种流行的开源操作系统,它提供了许多工具来管理、压缩和解压缩文件。压缩文件有助于节省存储空间,使数据传输更快。本指南将向您展示如何在 Linux 中提取不同类型的压缩文件。 1. Unpacking ZIP Files ZIP 文件是非常常见的,要在 …...
JavaScript 数据类型详解
JavaScript 数据类型详解 JavaScript 数据类型分为 原始类型(Primitive) 和 对象类型(Object) 两大类,共 8 种(ES11): 一、原始类型(7种) 1. undefined 定…...
vue3 daterange正则踩坑
<el-form-item label"空置时间" prop"vacantTime"> <el-date-picker v-model"form.vacantTime" type"daterange" start-placeholder"开始日期" end-placeholder"结束日期" clearable :editable"fal…...
Python竞赛环境搭建全攻略
Python环境搭建竞赛技术文章大纲 竞赛背景与意义 竞赛的目的与价值Python在竞赛中的应用场景环境搭建对竞赛效率的影响 竞赛环境需求分析 常见竞赛类型(算法、数据分析、机器学习等)不同竞赛对Python版本及库的要求硬件与操作系统的兼容性问题 Pyth…...
Java 与 MySQL 性能优化:MySQL 慢 SQL 诊断与分析方法详解
文章目录 一、开启慢查询日志,定位耗时SQL1.1 查看慢查询日志是否开启1.2 临时开启慢查询日志1.3 永久开启慢查询日志1.4 分析慢查询日志 二、使用EXPLAIN分析SQL执行计划2.1 EXPLAIN的基本使用2.2 EXPLAIN分析案例2.3 根据EXPLAIN结果优化SQL 三、使用SHOW PROFILE…...
