解决Flink读取kafka主题数据无报错无数据打印的重大发现(问题已解决)


亦菲、彦祖们,今天使用idea开发的时候,运行flink程序(读取kafka主题数据)的时候,发现操作台什么数据都没有只有满屏红色日志输出,关键干嘛?一点报错都没有,一开始我觉得应该执行程序的姿势有问题,然后我重新执行了一次还是不行,我就一直等待,发现等了好久都没有数据来到,我就开始察觉不对了。
下面是我排查的思路:
1.kafka broker有没有数据:因为我是读取kafka主题数据,所以我屁颠屁颠的去kakfa查看我的消费主题是否有数据,查看没有问题!
2.读取的主题是否出现问题:经过切换其他主题读取数据,发现也是没有数据出现在操作台,所以不是主题的问题
3.查看flink与kafka 连接器的配置是否有问题:我就回去查看构建kafka连接器的builder是否问题,我尝试把偏移量改为从最早的偏移量开始读取,也是无动于衷呀!
通过以上思路之后,我就彻底无语了,那到底是什么问题?
因为我是从flink连接kafka读取数据的,所以我觉得直接连接kafka读取主题数据试一试,这样就可以排除是不是flink有问题了,所以我就写了以下代码进行测试:
// 配置 Kafka 消费者属性Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop101:10092,hadoop102:10092,hadoop103:10092"); // Kafka 集群地址props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); // 消费者组 IDprops.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // Key 反序列化器props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // Value 反序列化器props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 从最早的偏移量开始读取// 创建消费者、KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("AllData_topic_ods"));//拉取超时时间ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(6000));for (ConsumerRecord<String, String> record : poll) {System.out.printf("offset = %d,key= %s.value=%s%n",record.offset(),record.key(),record.value());}consumer.close();
一开始当拉取超时时间为100ms的时候,我也是消费不到数据的,但是我就想是不是我拉取超时时间太短了,因为我网络io和电脑性能匹配不上的话,它拉取时间是需要进行网络io的。所以我尝试修改拉取时间为6000ms,就是6s啦!
然后突然就消费到数据了,我了个豆,搞定了我感觉我已经!
100ms

6000ms

于是我就回去把我那个builder的参数也修改了,一执行flink程序,这次不负众望,成功消费到数据了!!!
return KafkaSource.<String>builder().setProperty("max.poll.interval.ms","10000") // 设置拉取超时时间为10s.setProperty("partition.discovery.interval.ms", "10000").setProperty("commit.offsets.on.checkpoint", "true").setProperty("isolation.level", "read_committed")//read_committed 只会读取事务型成功提交事务写入的消息; read_uncommitted 默认值,能够读取到 Kafka 写入的任何消息.setBootstrapServers(bootstrapServers).setTopics(topicName).setGroupId(groupId).setClientIdPrefix(clientIdPrefix).setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST));
亦菲、彦祖们,搞定了!如果不是这个问题的话,也参照我上面的排查思路看看是哪里出现了问题!我能解决也是一个一个排查到,给点耐心。
如果帮到你,恭喜呀!如果解决不了,那当我没说,你去看别人的文章吧!

感谢各位的观看,创作不易,能不能给哥们来一个点赞呢!!!
好了,今天的分享就这么多了,有什么不清楚或者我写错的地方,请多多指教!
私信,评论我呗!!!!!!
关注我下一篇不迷路哦!
相关文章:
解决Flink读取kafka主题数据无报错无数据打印的重大发现(问题已解决)
亦菲、彦祖们,今天使用idea开发的时候,运行flink程序(读取kafka主题数据)的时候,发现操作台什么数据都没有只有满屏红色日志输出,关键干嘛?一点报错都没有,一开始我觉得应该执行程序…...
python自动化测开面试题汇总(持续更新)
介绍他们测某云,底层是linux可以挂多个磁盘,有现有的接口,用python实现热插拔,查看它的功能,项目目前用到的是python,linux和虚拟云,结合你之前的项目介绍下三者(3min之内) 列表判断是否有重复元素 求1-9的…...
1-1 Gerrit实用指南
注:学习gerrit需要拥有git相关知识,如果没有学习过git请先回顾git相关知识点 黑马程序员git教程 一小时学会git git参考博客 git 实操博客 1.0 定义 Gerrit 是一个基于 Web 的代码审查系统,它使用 Git 作为底层版本控制系统。Gerrit 的主要功…...
docker如何安装redis
第一步 如果未指定redis,则安装的是最新版的 docker pull redis 创建一个目录 mkdir /usr/local/docker/redis 然后直接可以下载redis,这是方式确实不怎么好,应该找在官网上找对应的redis配置文件 wget http://download.redis.io/redis-stab…...
省级新质生产力数据(蔡湘杰版本)2012-2022年
测算方式:参考《当代经济管理》蔡湘杰(2024)老师研究的做法,本文以劳动者、劳动对象和劳动资料为准则层,从新质生产力“量的积累、质的提升、新的拓展”三维目标出发,构建新质生产力综合评价指标体系&#…...
【游资悟道】-作手新一悟道心法
作手新一经典语录节选: 乔帮主传完整版:做股票5年,炼成18式,成为A股低吸大神!从小白到大神,散户炒股的六个过程,不看不知道自己水平 围着主线做,多研究龙头,研究涨停&am…...
Diffusion中的Unet (DIMP)
针对UNet2DConditionModel模型 查看Unet的源码,得知Unet的down,mid,up blocks的类型分别是: down_block_types: Tuple[str] ("CrossAttnDownBlock2D","CrossAttnDownBlock2D","CrossAttnDownBlock2D","DownBlock2…...
编译以前项目更改在x64下面时报错:函数“PVOID GetCurrentFiber(void)”已有主体
win32下面编译成功,但是x64报错 1>GetWord.c 1>md5.c 这两个文件无法编译 1>C:\Program Files (x86)\Windows Kits\10\Include\10.0.22000.0\um\winnt.h(24125,1): error C2084: 函数“PVOID GetCurrentFiber(void)”已有主体 1>C:\Program Files (x…...
【AIGC】大模型面试高频考点-数据清洗篇
【AIGC】大模型面试高频考点-数据清洗篇 (一)常用文本清洗方法1.去除无用的符号2.去除表情符号3.文本只保留汉字4.中文繁体、简体转换5.删除 HTML 标签和特殊字符6.标记化7.小写8.停用词删除9.词干提取和词形还原10.处理缺失数据11.删除重复文本12.处理嘈…...
当测试时间与测试资源有限时,你会如何优化测试策略?
1.优先级排序:根据项目的需求和紧急程度进行优先级排序,将测试用例用例划分优先级,合理安排测试资源 和时间。这样能够保障在有限的时间内测试到最关键的功能 2.提前介入测试:在开发过程中提前进行测试,可以迅速发现问…...
基于R语言森林生态系统结构、功能与稳定性分析与可视化
在生态学研究中,森林生态系统的结构、功能与稳定性是核心研究内容之一。这些方面不仅关系到森林动态变化和物种多样性,还直接影响森林提供的生态服务功能及其应对环境变化的能力。森林生态系统的结构主要包括物种组成、树种多样性、树木的空间分布与密度…...
如何使用 Python 实现插件式架构
使用 Python 实现插件式架构可以通过动态加载和调用模块或类,构建一个易于扩展和维护的系统。以下是实现插件式架构的步骤和核心思想。 1. 插件式架构核心概念 主程序:负责加载、管理插件,并调用插件的功能。插件:独立的模块或类…...
【北京迅为】iTOP-4412全能版使用手册-第二十章 搭建和测试NFS服务器
iTOP-4412全能版采用四核Cortex-A9,主频为1.4GHz-1.6GHz,配备S5M8767 电源管理,集成USB HUB,选用高品质板对板连接器稳定可靠,大厂生产,做工精良。接口一应俱全,开发更简单,搭载全网通4G、支持WIFI、蓝牙、…...
【纯原生js】原生实现h5落地页面中的单选组件按钮及功能
h5端的按钮系统自带的一般都很丑,需要我们进行二次美化,比如单选按钮复选框之类的,那怎么对其进行html和css的改造? 实现效果 实现代码 <section id"tags"><h2>给景区添加标题</h2><label><…...
深入浅出:开发者如何快速上手Web3生态系统
Web3作为互联网的未来发展方向,正在逐步改变传统互联网架构,推动去中心化技术的发展。对于开发者而言,Web3代表着一个充满机遇与挑战的新领域,学习和掌握Web3的基本技术和工具,将为未来的项目开发提供强大的支持。那么…...
通过深度点图表示的隐式场实现肺树结构的高效解剖标注文献速递-生成式模型与transformer在医学影像中的应用
Title 题目 Efficient anatomical labeling of pulmonary tree structures via deeppoint-graph representation-based implicit fields 通过深度点图表示的隐式场实现肺树结构的高效解剖标注 01 文献速递介绍 近年来,肺部疾病(Decramer等ÿ…...
数据结构 (17)广义表
前言 数据结构中的广义表(Generalized List,又称列表Lists)是一种重要的数据结构,它是对线性表的一种推广,放松了对表元素的原子限制,容许它们具有其自身的结构。 一、定义与表示 定义:广义表是…...
论文笔记 SliceGPT: Compress Large Language Models By Deleting Rows And Columns
欲买桂花同载酒,终不似,少年游。 数学知识 秩: 矩阵中最大线性无关的行/列向量数。行秩与列秩相等。 线性无关:对于N个向量而言,如果任取一个向量 v \textbf{v} v,不能被剩下的N-1个向量通过线性组合的方式…...
前端工具的选择和安装
选择和安装前端工具是前端开发过程中的重要步骤。现代前端开发需要一些工具来提高效率和协作能力。以下是一些常用的前端工具及其选择和安装指南。 1. 代码编辑器 选择一个好的代码编辑器可以显著提高开发效率。以下是几款流行的代码编辑器: Visual Studio Code (…...
Fantasy中定时器得驱动原理
一、服务器框架启动 public static async FTask Start(){// 启动ProcessStartProcess().Coroutine();await FTask.CompletedTask;while (true){ThreadScheduler.Update();Thread.Sleep(1);}} 二、主线程 Fantasy.ThreadScheduler.Update internal static void Update(){MainS…...
LangGraph实战:5分钟给你的AI助手装上‘对话记忆’,告别每轮都是新朋友
LangGraph实战:5分钟为AI助手构建对话记忆系统 每次和AI对话都像初次见面?这个问题困扰着许多开发者。想象一下,你告诉助手"我叫Alex",下一句问"你知道我的名字吗?",它却一脸茫然地回答…...
轨迹规划实战:用多项式插值+粒子群玩转机械臂运动优化
轨迹规划 路径规划 matlab 353多项式插值 基于改进粒子群算法 时间最优 针对六自由度 四自由度都可以,轨迹规划,多项式插值,更改轨迹点位置就可以搞机器人轨迹规划最头疼的就是既要轨迹丝滑又要时间最短。今天咱们用Matlab整点狠活—…...
正点原子IMX6ULL史诗级新内核Linux7.0移植教程(5)梭哈配置主线设备树
正点原子IMX6ULL史诗级新内核Linux7.0移植教程(5)梭哈配置主线设备树 仓库已经开源,可以研究补丁和直接看完整教程:https://github.com/Awesome-Embedded-Learning-Studio/imx-forge 有任何意见欢迎提出 PR!会第一时间…...
UniApp+Vue3避坑指南:为什么getAppWebview会失效?从原理到解决方案
UniAppVue3深度解析:getAppWebview失效的底层逻辑与工程化解决方案 在UniApp与Vue3的技术栈组合中,不少开发者遭遇过getAppWebview神秘失效的困境。这个看似简单的API调用问题,背后却隐藏着Vue3响应式系统变革与UniApp多端渲染机制的深层交互…...
Redis 的核心机制
Redis 作为高性能内存数据库,在现代架构中早已超越了单纯的“缓存”角色,成为了支撑高并发、分布式系统的基石。深入理解其核心场景、持久化机制、内存管理及集群原理,是构建稳定、高效系统的关键。 以下结合具体业务场景,深度解析…...
ES启动失败:深入解析No buffer space available错误及连接数优化策略
1. 当ES启动失败时发生了什么 第一次看到"No buffer space available"这个报错时,我也是一头雾水。那天凌晨三点,线上监控突然报警,ES集群集体罢工,整个搜索服务直接瘫痪。查看日志发现满屏都是"java.net.SocketE…...
Solidity 智能合约入门:从 0 到 1 编写第一个区块链合约
一、什么是 Solidity? Solidity 是一门面向以太坊虚拟机(EVM)、静态类型的高级编程语言,专门用于编写区块链上的智能合约。 简单来说: 智能合约 运行在区块链上的自动执行代码(无需第三方,代…...
用腾讯云轻量锐驰和对象存储,手把手教你30分钟搞定私人不限速网盘(附SSL证书配置)
零基础30分钟搭建高性能私人网盘:腾讯云轻量锐驰对象存储实战指南 你是否也受够了公有网盘动辄几百KB的下载速度?每次分享文件给朋友,对方总要忍受龟速下载的煎熬。更别提那些突然消失的文件和频繁弹出的会员广告——是时候拥有一个完全自主掌…...
低查重不是梦!AI写教材工具,让教材生成轻松又高效!
借助AI工具,开启教材创作新纪元 谁没有在编写教材框架时陷入困境呢?面对一张空白的文档,足足坐在那里半小时却不知道该从哪里开始——究竟是先介绍概念,还是先提供案例?章节划分该遵循逻辑还是按课时来的?…...
【AI重塑科研】无需通读全文,三步教你用大模型高效产出文献综述
1. 为什么你需要AI辅助文献综述? 每次打开文献库看到上百篇待读论文就头皮发麻?我完全理解这种感受。去年准备开题报告时,导师要求我两周内完成50篇核心文献的综述,当时差点崩溃。直到我发现用大模型处理文献可以节省90%的时间&am…...
