当前位置: 首页 > news >正文

解决Flink读取kafka主题数据无报错无数据打印的重大发现(问题已解决)



        亦菲、彦祖们,今天使用idea开发的时候,运行flink程序(读取kafka主题数据)的时候,发现操作台什么数据都没有只有满屏红色日志输出,关键干嘛?一点报错都没有,一开始我觉得应该执行程序的姿势有问题,然后我重新执行了一次还是不行,我就一直等待,发现等了好久都没有数据来到,我就开始察觉不对了。

        下面是我排查的思路:

        1.kafka broker有没有数据:因为我是读取kafka主题数据,所以我屁颠屁颠的去kakfa查看我的消费主题是否有数据,查看没有问题!

        2.读取的主题是否出现问题:经过切换其他主题读取数据,发现也是没有数据出现在操作台,所以不是主题的问题

        3.查看flink与kafka 连接器的配置是否有问题:我就回去查看构建kafka连接器的builder是否问题,我尝试把偏移量改为从最早的偏移量开始读取,也是无动于衷呀!

        通过以上思路之后,我就彻底无语了,那到底是什么问题?

因为我是从flink连接kafka读取数据的,所以我觉得直接连接kafka读取主题数据试一试,这样就可以排除是不是flink有问题了,所以我就写了以下代码进行测试:

 // 配置 Kafka 消费者属性Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,  "hadoop101:10092,hadoop102:10092,hadoop103:10092"); // Kafka 集群地址props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); // 消费者组 IDprops.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // Key 反序列化器props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // Value 反序列化器props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 从最早的偏移量开始读取// 创建消费者、KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("AllData_topic_ods"));//拉取超时时间ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(6000));for (ConsumerRecord<String, String> record : poll) {System.out.printf("offset = %d,key= %s.value=%s%n",record.offset(),record.key(),record.value());}consumer.close();

         一开始当拉取超时时间为100ms的时候,我也是消费不到数据的,但是我就想是不是我拉取超时时间太短了,因为我网络io和电脑性能匹配不上的话,它拉取时间是需要进行网络io的。所以我尝试修改拉取时间为6000ms,就是6s啦!
        然后突然就消费到数据了,我了个豆,搞定了我感觉我已经!

        100ms

        6000ms 

        于是我就回去把我那个builder的参数也修改了,一执行flink程序,这次不负众望,成功消费到数据了!!!

return KafkaSource.<String>builder().setProperty("max.poll.interval.ms","10000") // 设置拉取超时时间为10s.setProperty("partition.discovery.interval.ms", "10000").setProperty("commit.offsets.on.checkpoint", "true").setProperty("isolation.level", "read_committed")//read_committed 只会读取事务型成功提交事务写入的消息;  read_uncommitted 默认值,能够读取到 Kafka 写入的任何消息.setBootstrapServers(bootstrapServers).setTopics(topicName).setGroupId(groupId).setClientIdPrefix(clientIdPrefix).setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST));

        亦菲、彦祖们,搞定了!如果不是这个问题的话,也参照我上面的排查思路看看是哪里出现了问题!我能解决也是一个一个排查到,给点耐心。 

        如果帮到你,恭喜呀!如果解决不了,那当我没说,你去看别人的文章吧! 



 感谢各位的观看,创作不易,能不能给哥们来一个点赞呢!!!

好了,今天的分享就这么多了,有什么不清楚或者我写错的地方,请多多指教!

私信,评论我呗!!!!!! 

关注我下一篇不迷路哦!

相关文章:

解决Flink读取kafka主题数据无报错无数据打印的重大发现(问题已解决)

亦菲、彦祖们&#xff0c;今天使用idea开发的时候&#xff0c;运行flink程序&#xff08;读取kafka主题数据&#xff09;的时候&#xff0c;发现操作台什么数据都没有只有满屏红色日志输出&#xff0c;关键干嘛&#xff1f;一点报错都没有&#xff0c;一开始我觉得应该执行程序…...

python自动化测开面试题汇总(持续更新)

介绍他们测某云&#xff0c;底层是linux可以挂多个磁盘&#xff0c;有现有的接口&#xff0c;用python实现热插拔&#xff0c;查看它的功能&#xff0c;项目目前用到的是python,linux和虚拟云&#xff0c;结合你之前的项目介绍下三者(3min之内) 列表判断是否有重复元素 求1-9的…...

1-1 Gerrit实用指南

注&#xff1a;学习gerrit需要拥有git相关知识&#xff0c;如果没有学习过git请先回顾git相关知识点 黑马程序员git教程 一小时学会git git参考博客 git 实操博客 1.0 定义 Gerrit 是一个基于 Web 的代码审查系统&#xff0c;它使用 Git 作为底层版本控制系统。Gerrit 的主要功…...

docker如何安装redis

第一步 如果未指定redis&#xff0c;则安装的是最新版的 docker pull redis 创建一个目录 mkdir /usr/local/docker/redis 然后直接可以下载redis&#xff0c;这是方式确实不怎么好&#xff0c;应该找在官网上找对应的redis配置文件 wget http://download.redis.io/redis-stab…...

省级新质生产力数据(蔡湘杰版本)2012-2022年

测算方式&#xff1a;参考《当代经济管理》蔡湘杰&#xff08;2024&#xff09;老师研究的做法&#xff0c;本文以劳动者、劳动对象和劳动资料为准则层&#xff0c;从新质生产力“量的积累、质的提升、新的拓展”三维目标出发&#xff0c;构建新质生产力综合评价指标体系&#…...

【游资悟道】-作手新一悟道心法

作手新一经典语录节选&#xff1a; 乔帮主传完整版&#xff1a;做股票5年&#xff0c;炼成18式&#xff0c;成为A股低吸大神&#xff01;从小白到大神&#xff0c;散户炒股的六个过程&#xff0c;不看不知道自己水平 围着主线做&#xff0c;多研究龙头&#xff0c;研究涨停&am…...

Diffusion中的Unet (DIMP)

针对UNet2DConditionModel模型 查看Unet的源码&#xff0c;得知Unet的down,mid,up blocks的类型分别是&#xff1a; down_block_types: Tuple[str] ("CrossAttnDownBlock2D","CrossAttnDownBlock2D","CrossAttnDownBlock2D","DownBlock2…...

编译以前项目更改在x64下面时报错:函数“PVOID GetCurrentFiber(void)”已有主体

win32下面编译成功&#xff0c;但是x64报错 1>GetWord.c 1>md5.c 这两个文件无法编译 1>C:\Program Files (x86)\Windows Kits\10\Include\10.0.22000.0\um\winnt.h(24125,1): error C2084: 函数“PVOID GetCurrentFiber(void)”已有主体 1>C:\Program Files (x…...

【AIGC】大模型面试高频考点-数据清洗篇

【AIGC】大模型面试高频考点-数据清洗篇 &#xff08;一&#xff09;常用文本清洗方法1.去除无用的符号2.去除表情符号3.文本只保留汉字4.中文繁体、简体转换5.删除 HTML 标签和特殊字符6.标记化7.小写8.停用词删除9.词干提取和词形还原10.处理缺失数据11.删除重复文本12.处理嘈…...

当测试时间与测试资源有限时,你会如何优化测试策略?

1.优先级排序&#xff1a;根据项目的需求和紧急程度进行优先级排序&#xff0c;将测试用例用例划分优先级&#xff0c;合理安排测试资源 和时间。这样能够保障在有限的时间内测试到最关键的功能 2.提前介入测试&#xff1a;在开发过程中提前进行测试&#xff0c;可以迅速发现问…...

基于R语言森林生态系统结构、功能与稳定性分析与可视化

在生态学研究中&#xff0c;森林生态系统的结构、功能与稳定性是核心研究内容之一。这些方面不仅关系到森林动态变化和物种多样性&#xff0c;还直接影响森林提供的生态服务功能及其应对环境变化的能力。森林生态系统的结构主要包括物种组成、树种多样性、树木的空间分布与密度…...

如何使用 Python 实现插件式架构

使用 Python 实现插件式架构可以通过动态加载和调用模块或类&#xff0c;构建一个易于扩展和维护的系统。以下是实现插件式架构的步骤和核心思想。 1. 插件式架构核心概念 主程序&#xff1a;负责加载、管理插件&#xff0c;并调用插件的功能。插件&#xff1a;独立的模块或类…...

【北京迅为】iTOP-4412全能版使用手册-第二十章 搭建和测试NFS服务器

iTOP-4412全能版采用四核Cortex-A9&#xff0c;主频为1.4GHz-1.6GHz&#xff0c;配备S5M8767 电源管理&#xff0c;集成USB HUB,选用高品质板对板连接器稳定可靠&#xff0c;大厂生产&#xff0c;做工精良。接口一应俱全&#xff0c;开发更简单,搭载全网通4G、支持WIFI、蓝牙、…...

【纯原生js】原生实现h5落地页面中的单选组件按钮及功能

h5端的按钮系统自带的一般都很丑&#xff0c;需要我们进行二次美化&#xff0c;比如单选按钮复选框之类的&#xff0c;那怎么对其进行html和css的改造&#xff1f; 实现效果 实现代码 <section id"tags"><h2>给景区添加标题</h2><label><…...

深入浅出:开发者如何快速上手Web3生态系统

Web3作为互联网的未来发展方向&#xff0c;正在逐步改变传统互联网架构&#xff0c;推动去中心化技术的发展。对于开发者而言&#xff0c;Web3代表着一个充满机遇与挑战的新领域&#xff0c;学习和掌握Web3的基本技术和工具&#xff0c;将为未来的项目开发提供强大的支持。那么…...

通过深度点图表示的隐式场实现肺树结构的高效解剖标注文献速递-生成式模型与transformer在医学影像中的应用

Title 题目 Efficient anatomical labeling of pulmonary tree structures via deeppoint-graph representation-based implicit fields 通过深度点图表示的隐式场实现肺树结构的高效解剖标注 01 文献速递介绍 近年来&#xff0c;肺部疾病&#xff08;Decramer等&#xff…...

数据结构 (17)广义表

前言 数据结构中的广义表&#xff08;Generalized List&#xff0c;又称列表Lists&#xff09;是一种重要的数据结构&#xff0c;它是对线性表的一种推广&#xff0c;放松了对表元素的原子限制&#xff0c;容许它们具有其自身的结构。 一、定义与表示 定义&#xff1a;广义表是…...

论文笔记 SliceGPT: Compress Large Language Models By Deleting Rows And Columns

欲买桂花同载酒&#xff0c;终不似&#xff0c;少年游。 数学知识 秩&#xff1a; 矩阵中最大线性无关的行/列向量数。行秩与列秩相等。 线性无关&#xff1a;对于N个向量而言&#xff0c;如果任取一个向量 v \textbf{v} v&#xff0c;不能被剩下的N-1个向量通过线性组合的方式…...

前端工具的选择和安装

选择和安装前端工具是前端开发过程中的重要步骤。现代前端开发需要一些工具来提高效率和协作能力。以下是一些常用的前端工具及其选择和安装指南。 1. 代码编辑器 选择一个好的代码编辑器可以显著提高开发效率。以下是几款流行的代码编辑器&#xff1a; Visual Studio Code (…...

Fantasy中定时器得驱动原理

一、服务器框架启动 public static async FTask Start(){// 启动ProcessStartProcess().Coroutine();await FTask.CompletedTask;while (true){ThreadScheduler.Update();Thread.Sleep(1);}} 二、主线程 Fantasy.ThreadScheduler.Update internal static void Update(){MainS…...

基于大模型的 UI 自动化系统

基于大模型的 UI 自动化系统 下面是一个完整的 Python 系统,利用大模型实现智能 UI 自动化,结合计算机视觉和自然语言处理技术,实现"看屏操作"的能力。 系统架构设计 #mermaid-svg-2gn2GRvh5WCP2ktF {font-family:"trebuchet ms",verdana,arial,sans-…...

label-studio的使用教程(导入本地路径)

文章目录 1. 准备环境2. 脚本启动2.1 Windows2.2 Linux 3. 安装label-studio机器学习后端3.1 pip安装(推荐)3.2 GitHub仓库安装 4. 后端配置4.1 yolo环境4.2 引入后端模型4.3 修改脚本4.4 启动后端 5. 标注工程5.1 创建工程5.2 配置图片路径5.3 配置工程类型标签5.4 配置模型5.…...

CVPR 2025 MIMO: 支持视觉指代和像素grounding 的医学视觉语言模型

CVPR 2025 | MIMO&#xff1a;支持视觉指代和像素对齐的医学视觉语言模型 论文信息 标题&#xff1a;MIMO: A medical vision language model with visual referring multimodal input and pixel grounding multimodal output作者&#xff1a;Yanyuan Chen, Dexuan Xu, Yu Hu…...

反向工程与模型迁移:打造未来商品详情API的可持续创新体系

在电商行业蓬勃发展的当下&#xff0c;商品详情API作为连接电商平台与开发者、商家及用户的关键纽带&#xff0c;其重要性日益凸显。传统商品详情API主要聚焦于商品基本信息&#xff08;如名称、价格、库存等&#xff09;的获取与展示&#xff0c;已难以满足市场对个性化、智能…...

盘古信息PCB行业解决方案:以全域场景重构,激活智造新未来

一、破局&#xff1a;PCB行业的时代之问 在数字经济蓬勃发展的浪潮中&#xff0c;PCB&#xff08;印制电路板&#xff09;作为 “电子产品之母”&#xff0c;其重要性愈发凸显。随着 5G、人工智能等新兴技术的加速渗透&#xff0c;PCB行业面临着前所未有的挑战与机遇。产品迭代…...

centos 7 部署awstats 网站访问检测

一、基础环境准备&#xff08;两种安装方式都要做&#xff09; bash # 安装必要依赖 yum install -y httpd perl mod_perl perl-Time-HiRes perl-DateTime systemctl enable httpd # 设置 Apache 开机自启 systemctl start httpd # 启动 Apache二、安装 AWStats&#xff0…...

【Redis技术进阶之路】「原理分析系列开篇」分析客户端和服务端网络诵信交互实现(服务端执行命令请求的过程 - 初始化服务器)

服务端执行命令请求的过程 【专栏简介】【技术大纲】【专栏目标】【目标人群】1. Redis爱好者与社区成员2. 后端开发和系统架构师3. 计算机专业的本科生及研究生 初始化服务器1. 初始化服务器状态结构初始化RedisServer变量 2. 加载相关系统配置和用户配置参数定制化配置参数案…...

在四层代理中还原真实客户端ngx_stream_realip_module

一、模块原理与价值 PROXY Protocol 回溯 第三方负载均衡&#xff08;如 HAProxy、AWS NLB、阿里 SLB&#xff09;发起上游连接时&#xff0c;将真实客户端 IP/Port 写入 PROXY Protocol v1/v2 头。Stream 层接收到头部后&#xff0c;ngx_stream_realip_module 从中提取原始信息…...

SpringCloudGateway 自定义局部过滤器

场景&#xff1a; 将所有请求转化为同一路径请求&#xff08;方便穿网配置&#xff09;在请求头内标识原来路径&#xff0c;然后在将请求分发给不同服务 AllToOneGatewayFilterFactory import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; impor…...

高防服务器能够抵御哪些网络攻击呢?

高防服务器作为一种有着高度防御能力的服务器&#xff0c;可以帮助网站应对分布式拒绝服务攻击&#xff0c;有效识别和清理一些恶意的网络流量&#xff0c;为用户提供安全且稳定的网络环境&#xff0c;那么&#xff0c;高防服务器一般都可以抵御哪些网络攻击呢&#xff1f;下面…...