java 中 main 方法使用 KafkaConsumer 拉取 kafka 消息如何禁止输出 debug 日志
pom 依赖:
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.5.14.RELEASE</version>
</dependency>
或者
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.5.1</version>
</dependency>
ps:前面的 spring-kafka 依赖中已经包含了后面的 kafka-clients
KafkaConsumerDemo.java:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.LoggerFactory;import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.LoggerContext;import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.*;
import java.util.logging.Logger;public class KafkaConsumerDemo {static Map<String,Object> properties = new HashMap<String,Object>();private static KafkaConsumer kafkaConsumer = null;/**** windows 环境需要将下面 8 行添加到 "C:\Windows\System32\drivers\etc\hosts" 文件中:* xxx.xxx.xxx.xxx1 xxx-data01* xxx.xxx.xxx.xxx2 xxx-data02* xxx.xxx.xxx.xxx3 xxx-data03* xxx.xxx.xxx.xxx4 xxx-data04* xxx.xxx.xxx.xxx5 xxx-data05* xxx.xxx.xxx.xxx6 xxx-data06* xxx.xxx.xxx.xxx7 xxx-data07* xxx.xxx.xxx.xxx8 xxx-data08* @param args*/public static void main(String[] args) {// 禁止控制台输出一些 org.apache.kafka.xxx 相关的日志LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();loggerContext.getLogger("org.apache.kafka.clients.consumer.internals.ConsumerCoordinator").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.FetchSessionHandler").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.consumer.internals.Fetcher").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.consumer.internals.AbstractCoordinator").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.NetworkClient").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.common.network.Selector").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.Metadata").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.consumer.KafkaConsumer").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.common.utils.AppInfoParser").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.consumer.KafkaConsumer").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.consumer.ConsumerConfig").setLevel(Level.OFF);properties.put("bootstrap.servers","127.0.0.1:9192,127.0.0.1:9192,127.0.0.1:9192"); // 指定 Brokerproperties.put("group.id", "11111111111111111111111"); // 指定消费组群 ID,为防止自己启动拉取消息导致其他生产环境的消费者无法消费该消息,请设置一个绝对不重复的值,以起到隔离的作用properties.put("max.poll.records", "1000");// todo 设置可批量拉取???properties.put("enable.auto.commit", "false");properties.put("key.deserializer", StringDeserializer.class); // 将 key 的字节数组转成 Java 对象properties.put("value.deserializer", StringDeserializer.class); // 将 value 的字节数组转成 Java 对象kafkaConsumer = new KafkaConsumer<String, String>(properties);// List<String> topics = queryAllTopics( consumer );kafkaConsumer.subscribe( Collections.singletonList( "ods_carbon_rfid_device_record" ) ); // 订阅主题 order-eventsnew Thread(new Runnable() {@Overridepublic void run() {receiveMessage();}}).start();}/*** 查询全部的主题(topic)列表* @param kafkaConsumer* @return*/private static List<String> queryAllTopics(KafkaConsumer kafkaConsumer) {if( kafkaConsumer == null ){return null;}Map<String, List<PartitionInfo>> map = kafkaConsumer.listTopics();if( map == null ){return null;}return new ArrayList<String>( map.keySet() );}public static void receiveMessage() {try {while ( true ){synchronized (KafkaConsumerDemo.class) {// ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));// 30L 表示超时时间为 30秒,有消息立即返回,没消息最多等 30 秒后返回SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofSeconds(30L));String date = sdf.format(new Date());if( records == null ){System.out.println( date + " 本次未拉取到任何消息" );}else {System.out.println( date + " 本次拉取到 " + records.count() + " 条消息" );int i = 1;for (ConsumerRecord<String,String> record: records) {String info = String.format("[Topic: %s][Partition:%d][Offset:%d][Key:%s][Message:%s]", record.topic(), record.partition(), record.offset(), record.key(), record.value());System.out.println( "第" + i + "条消息:" + info );i++;}kafkaConsumer.commitSync();}/*** 当你用 KafkaConsumer从Kafka里读取消息并且处理完后,commitSync 方法会帮你把这些消息的处理进度(也就是偏移量 offset )同步地告诉 Kafka 服务器。* 这样,Kafka 就知道你已经处理到哪儿了。如果消费者(也就是读取消息的程序)突然崩溃或者重启,Kafka 就能根据最后一次提交的偏移量,让你从上一次处理* 完的地方继续开始,而不会漏掉或者重复处理消息。* 简单来说,commitSync 方 法就是用来“保存进度”的,确保消息处理的可靠性和顺序性。*/// Thread.sleep( 5000L );}}} catch (Exception e){e.printStackTrace();} finally {kafkaConsumer.close();}}
}
相关文章:
java 中 main 方法使用 KafkaConsumer 拉取 kafka 消息如何禁止输出 debug 日志
pom 依赖: <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.5.14.RELEASE</version> </dependency> 或者 <dependency><groupId>org.ap…...
【后端面试总结】Golang可能的内存泄漏场景及应对策略
Golang可能的内存泄漏场景及应对策略 一、引言 Golang作为一种高性能、并发友好的编程语言,其内置的垃圾回收机制极大地简化了内存管理。然而,这并不意味着开发者可以完全忽视内存泄漏问题。在实际开发中,由于不当的资源管理、循环引用、以…...
Java 反射机制详解
在 Java 编程世界中,反射机制犹如一把神奇的钥匙,它能够打开许多隐藏在代码深处的 “大门”,让开发者突破常规的限制,实现一些极具灵活性的功能。今天,就跟随我一同深入探究 Java 反射机制的奥秘。 一、什么是反射 反…...

【k8s】scc权限 restricted、anyuid、privileged
文章目录 概述1. 内置的scc2. OpenShift如何确定pod的scc2.1 Pod未带SCC标签的情况2.2. Pod带有SCC标签的情况 参考 概述 在OpenShift(后文简称OCP)中,很早就一个概念:Security Context Constraints ,简称SCC…...

2025华数杯国际赛A题完整论文讲解(含每一问python代码+数据+可视化图)
大家好呀,从发布赛题一直到现在,总算完成了2025“华数杯”国际大学生数学建模竞赛A题Can He Swim Faster的完整的成品论文。 本论文可以保证原创,保证高质量。绝不是随便引用一大堆模型和代码复制粘贴进来完全没有应用糊弄人的垃圾半成品论文…...
ThreadLocal 的使用场景
在现代电商平台中,ThreadLocal 常用于以下场景,特别是与线程隔离相关的业务中,以提高性能和简化上下文传递。 1. 用户上下文信息管理 场景:在用户发起的每次请求中,需要携带用户 ID、角色、权限等信息,而这…...
后端开发 Springboot整合Redis Spring Data Redis 模板
目录 redis 配置 RedisConfig 类 完整代码 代码讲解 1. 类定义和注解 2. 定义 RedisTemplate Bean 3. 配置 JSON 序列化 4. 配置 Redis 的 key 和 value 序列化方式 5. 完成配置并返回 RedisTemplate 总结 redis 服务接口实现类 类级别 注入 RedisTemplate 常用 Re…...

代码随想录算法训练营第 4 天(链表 2)| 24. 两两交换链表中的节点19.删除链表的倒数第N个节点 -
一、24. 两两交换链表中的节点 题目:24. 两两交换链表中的节点 - 力扣(LeetCode) 视频:帮你把链表细节学清楚! | LeetCode:24. 两两交换链表中的节点_哔哩哔哩_bilibili 讲解:代码随想录 dummy-…...

【RDMA学习笔记】1:RDMA(Remote Direct Memory Access)介绍
从帝国理工的PPT学习。 什么是RDMA Remote Direct Memory Access,也就是Remote的DMA,是一种硬件机制,能直接访问远端结点的内存,而不需要处理器介入。 其中: Remote:跨node进行数据传输Directÿ…...
网络安全常见的35个安全框架及模型
大家读完觉得有帮助记得关注和点赞!!! 01、概述 网络安全专业机构制定的一套标准、准则和程序,旨在帮助组织了解和管理面临的网络安全风险。优秀的安全框架及模型应该为用户提供一种可靠方法,帮助其实现网络安全建设…...

Elasticsearch介绍及使用
Elasticsearch 是一款基于 Lucene 库构建的开源、分布式、RESTful 风格的搜索引擎和分析引擎,具有强大的全文搜索、数据分析、机器学习等功能,广泛应用于日志分析、实时数据分析、全文检索等场景。 核心概念 索引(Index)…...

Leetocde516. 最长回文子序列 动态规划
原题链接:Leetocde516. 最长回文子序列 class Solution { public:int longestPalindromeSubseq(string s) {int n s.size();vector<vector<int>> dp(n, vector<int>(n, 1));for (int i 0; i < n; i) {dp[i][i] 1;if (i 1 < n &&…...

iOS 逆向学习 - Inter-Process Communication:进程间通信
iOS 逆向学习 - Inter-Process Communication:进程间通信 一、进程间通信概要二、iOS 进程间通信机制详解1. URL Schemes2. Pasteboard3. App Groups 和 Shared Containers4. XPC Services 三、不同进程间通信机制的差异四、总结 一、进程间通信概要 进程间通信&am…...

高级生化大纲
一,蛋白质化学: 蛋白质分离是生物化学和分子生物学研究中的一项基本技术,用于根据蛋白质的物理和化学特性将其从混合物中分离出来。 1. 离心分离法 离心分离法利用离心力来分离不同质量或密度的颗粒和分子。 差速离心:通过逐…...

YARN WebUI 服务
一、WebUI 使用 与HDFS一样,YARN也提供了一个WebUI服务,可以使用YARN Web用户界面监视群集、队列、应用程序、服务、流活动和节点信息。还可以查看集群详细配置的信息,检查各种应用程序和服务的日志。 1.1 首页 浏览器输入http://node2.itc…...

【Unity3D】利用IJob、Burst优化处理切割物体
参考文章: 【Unity】切割网格 【Unity3D】ECS入门学习(一)导入及基础学习_unity ecs教程-CSDN博客 【Unity3D】ECS入门学习(十二)IJob、IJobFor、IJobParallelFor_unity ijobparallelfor-CSDN博客 工程资源地址&…...

【大前端】Vue3 工程化项目使用详解
目录 一、前言 二、前置准备 2.1 环境准备 2.1.1 create-vue功能 2.1.2 nodejs环境 2.1.3 配置nodejs的环境变量 2.1.4 更换安装包的源 三、工程化项目创建与启动过程 3.1 创建工程化项目 3.2 项目初始化 3.3 项目启动 3.4 核心文件说明 四、VUE两种不同的API风格 …...
基于文件系统分布式锁原理
分布式锁:在一个公共的存储服务上打上一个标记,如Redis的setnx命令,是先到先得方式获得锁,ZooKeeper有点像下面的demo,比较大小的方式判决谁获得锁。 package com.ldj.mybatisflex.demo;import java.util.*; import java.util.co…...
简历整理YH
一,订单中心 1,调拨单 融通(Rocketmq)-订单中心:ECC_BMS123(已出单),125(分配),127(发货),129(收货) 通过RocketMq接入多场景订单数据 2,销售单 sap(FTP)-订单中心,下发1002,1003,…...
Kotlin 协程基础三 —— 结构化并发(二)
Kotlin 协程基础系列: Kotlin 协程基础一 —— 总体知识概述 Kotlin 协程基础二 —— 结构化并发(一) Kotlin 协程基础三 —— 结构化并发(二) Kotlin 协程基础四 —— CoroutineScope 与 CoroutineContext Kotlin 协程…...
Python爬虫实战:研究MechanicalSoup库相关技术
一、MechanicalSoup 库概述 1.1 库简介 MechanicalSoup 是一个 Python 库,专为自动化交互网站而设计。它结合了 requests 的 HTTP 请求能力和 BeautifulSoup 的 HTML 解析能力,提供了直观的 API,让我们可以像人类用户一样浏览网页、填写表单和提交请求。 1.2 主要功能特点…...
变量 varablie 声明- Rust 变量 let mut 声明与 C/C++ 变量声明对比分析
一、变量声明设计:let 与 mut 的哲学解析 Rust 采用 let 声明变量并通过 mut 显式标记可变性,这种设计体现了语言的核心哲学。以下是深度解析: 1.1 设计理念剖析 安全优先原则:默认不可变强制开发者明确声明意图 let x 5; …...

【JavaEE】-- HTTP
1. HTTP是什么? HTTP(全称为"超文本传输协议")是一种应用非常广泛的应用层协议,HTTP是基于TCP协议的一种应用层协议。 应用层协议:是计算机网络协议栈中最高层的协议,它定义了运行在不同主机上…...

【人工智能】神经网络的优化器optimizer(二):Adagrad自适应学习率优化器
一.自适应梯度算法Adagrad概述 Adagrad(Adaptive Gradient Algorithm)是一种自适应学习率的优化算法,由Duchi等人在2011年提出。其核心思想是针对不同参数自动调整学习率,适合处理稀疏数据和不同参数梯度差异较大的场景。Adagrad通…...
DockerHub与私有镜像仓库在容器化中的应用与管理
哈喽,大家好,我是左手python! Docker Hub的应用与管理 Docker Hub的基本概念与使用方法 Docker Hub是Docker官方提供的一个公共镜像仓库,用户可以在其中找到各种操作系统、软件和应用的镜像。开发者可以通过Docker Hub轻松获取所…...
镜像里切换为普通用户
如果你登录远程虚拟机默认就是 root 用户,但你不希望用 root 权限运行 ns-3(这是对的,ns3 工具会拒绝 root),你可以按以下方法创建一个 非 root 用户账号 并切换到它运行 ns-3。 一次性解决方案:创建非 roo…...
2023赣州旅游投资集团
单选题 1.“不登高山,不知天之高也;不临深溪,不知地之厚也。”这句话说明_____。 A、人的意识具有创造性 B、人的认识是独立于实践之外的 C、实践在认识过程中具有决定作用 D、人的一切知识都是从直接经验中获得的 参考答案: C 本题解…...
Fabric V2.5 通用溯源系统——增加图片上传与下载功能
fabric-trace项目在发布一年后,部署量已突破1000次,为支持更多场景,现新增支持图片信息上链,本文对图片上传、下载功能代码进行梳理,包含智能合约、后端、前端部分。 一、智能合约修改 为了增加图片信息上链溯源,需要对底层数据结构进行修改,在此对智能合约中的农产品数…...

20个超级好用的 CSS 动画库
分享 20 个最佳 CSS 动画库。 它们中的大多数将生成纯 CSS 代码,而不需要任何外部库。 1.Animate.css 一个开箱即用型的跨浏览器动画库,可供你在项目中使用。 2.Magic Animations CSS3 一组简单的动画,可以包含在你的网页或应用项目中。 3.An…...
Python Einops库:深度学习中的张量操作革命
Einops(爱因斯坦操作库)就像给张量操作戴上了一副"语义眼镜"——让你用人类能理解的方式告诉计算机如何操作多维数组。这个基于爱因斯坦求和约定的库,用类似自然语言的表达式替代了晦涩的API调用,彻底改变了深度学习工程…...