当前位置: 首页 > news >正文

Flink消费kafka出现空指针异常

文章目录

      • 出现场景:
      • 表现:
      • 问题:
      • 解决:

tombstone : Kafka中提供了一个墓碑消息(tombstone)的概念,如果一条消息的key不为null,但是其value为null,那么此消息就是墓碑消息.

出现场景:

双流join时,采用的是left join的方式,众所周知该方式会产生回撤流,下游kafka连接器使用的是upsert-kafka,在产生回撤流时,kafka会删除未join上的消息,填充join后的消息进去。

表现:

在这里插入图片描述

问题:

此时消费该topic的flink程序会出现,空指针异常

DataStream Api会出现,Table Api 未发现

解决:

自定义kafka反序列化器过滤Null值,flink1.14.4
代码:

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();KafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("").setTopics("test").setGroupId("gid").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new MySimpleStringSchema()).setProperty("auto.offset.commit", "false").build();DataStreamSource<String> kfkDs = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kfk");kfkDs.print();env.execute();}// 自定义反序列化器static class MySimpleStringSchema implements DeserializationSchema<String>, SerializationSchema<String>{@Overridepublic String deserialize(byte[] message) {if (message != null) return new String(message, StandardCharsets.UTF_8);else{return deserialize(new byte[1]); // 返回空 不是Null}}@Overridepublic boolean isEndOfStream(String nextElement) {return false;}@Overridepublic byte[] serialize(String element) {return element.getBytes(StandardCharsets.UTF_8);}@Overridepublic TypeInformation<String> getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}}

相关文章:

Flink消费kafka出现空指针异常

文章目录 出现场景&#xff1a;表现&#xff1a;问题&#xff1a;解决&#xff1a; tombstone : Kafka中提供了一个墓碑消息&#xff08;tombstone&#xff09;的概念&#xff0c;如果一条消息的key不为null&#xff0c;但是其value为null&#xff0c;那么此消息就是墓碑消息. …...

【探索 Kubernetes|作业管理篇 系列 9】Pod 的服务对象

前言 大家好&#xff0c;我是秋意零。 在上一篇中&#xff0c;我们介绍了 Pod 的生命周期以及区分 Pod 字段的层次级别&#xff0c;相信你对此有了充分的认识。 今天&#xff0c;我们还会接着以 Pod 展开&#xff0c;说说它的 “服务对象”&#xff0c;一听就知道是对 Pod 提…...

多种拖拽= =自用留档

<template> <div class"main-drag"> <div v-if"stencil 0" class"mapped-fields"> <el-form ref"mapped" :model"mapped" class"demo-fieldsForm"> <el-form-item label"切换数…...

贝叶斯与认知——读《贝叶斯的博弈》有感

关于对贝叶斯与认知问题的相关思考 一、贝叶斯定理二、贝叶斯与认知的本质三、经验的偏见四、总结 自古以来&#xff0c;人们就在思考知识来自何处&#xff0c;“冯翼惟象&#xff0c;何以识之&#xff1f;”&#xff0c;对此的思考逐渐发展成哲学的认识论分支。德国哲学家康德…...

MySQL安装失败starting the sever

MySQL安装失败starting the sever 如果电脑是第一次安装MySQL&#xff0c;一般不会出现这样的报错。starting the sever失败&#xff0c;通常是因为上次安装该软件没有清除干净。 第一种解决方法&#xff1a;完全卸载mysql&#xff0c;重新安装 完全卸载该软件的办法&#…...

合并文件夹中所有文件,并输出重复的条形码值

文章目录 一、需求二、处理方式三、代码实现 一、需求 每天会生成一个记录文件&#xff08;文件名按日期yyyyMMdd格式命名&#xff09;&#xff0c;记录文件中记录有条形码的内容&#xff0c;需要合并最近20次的数据&#xff0c;并提取出有重复的条形码。 也可以进行最近30天数…...

P3089 [USACO13NOV] Pogo-Cow S 弹簧踩高跷

P3089 [USACO13NOV] Pogo-Cow S 弹簧踩高跷 洛谷题目传送门 文章目录 P3089 [USACO13NOV] Pogo-Cow S 弹簧踩高跷题目描述输入格式输出格式样例 #1样例输入 #1样例输出 #1 提示题目大意方法一&#xff08;线段树维护dp&#xff09;code 方法二 &#xff08;单调队列维护dp&…...

计算机网络 - 第一章(下)

1.2_1 分层结构、协议、接口、服务_哔哩哔哩_bilibili1.2_1 分层结构、协议、接口、服务是王道计算机考研 计算机网络的第7集视频&#xff0c;该合集共计76集&#xff0c;视频收藏或关注UP主&#xff0c;及时了解更多相关视频内容。https://www.bilibili.com/video/BV19E411D78…...

【Uniapp】小程序携带Token请求接口+无感知登录方案2.0

本次改进原文《【Uniapp】小程序携带Token请求接口无感知登录方案》&#xff0c;在实际使用过程中我发现以下bug&#xff1a; 若token恰好在用户访问接口时到期&#xff0c;就会直接查询为空&#xff0c;不反映token过期问题&#xff08;例如&#xff1a;弹窗显示订单查询记录…...

Ubuntu常用命令

文章目录 1&#xff1a;文件管理2&#xff1a;文档编辑3&#xff1a;系统管理4&#xff1a;磁盘管理5&#xff1a;文件传输6&#xff1a;网络通讯7&#xff1a;设备管理8&#xff1a;备份压缩9&#xff1a;其他命令扩展&#xff1a;知识干货 1&#xff1a;文件管理 ls命令 –…...

ERP重构-SLA子分类账-分布式实现方案

背景 ERP中的GL总账模块&#xff0c;明细数据来源于各个业务模块如库存、成本、应收、应付、费控、资产等&#xff0c;统称为子模块&#xff0c;生成的账叫做子分类账。然而记账的业务逻辑各式各样&#xff0c;但是最终输出都是来源、类型、期间、科目、借贷金额等等关键信息。…...

IP路由协议(RIP、IGRP、OSPF、IS-IS、BGP)

文章目录 1、路由分类2、RIP协议1&#xff09;RIP的工作原理2&#xff09;RIP路由表的更新过程3&#xff09;RIP路由表的更新原则4&#xff09;RIP的特性5&#xff09;RIP协议的版本 4、IGRP协议1&#xff09;IGRP路由表的更新2&#xff09;IGRP的度量标准 5、OSPF协议1&#x…...

互斥锁、自旋锁、读写锁、悲观锁、乐观锁的应用场景

多线程访问共享资源的时候&#xff0c;避免不了资源竞争而导致数据错乱的问题&#xff0c;所以我们通常为了解决这一问题&#xff0c;都会在访问共享资源之前加锁。 最常用的就是互斥锁&#xff0c;当然还有很多种不同的锁&#xff0c;比如自旋锁、读写锁、乐观锁等&#xff0…...

Python WSGI 与 Web 开发框架

目录 文章目录 目录WSGIWSGI 的工作原理environ 参数start_resposne 参数 WSGI 的中间件 WSGI Web 开发框架OpenStack 中的应用案例进程入口WSGI Application 加载Paste/PasteDeployRoutesWebOb WSGI Server 启动 WSGI WSGI&#xff08;Web Server Gateway Interface&#xff…...

[洛谷]P6464 [传智杯 #2 决赛] 传送门

看到数据范围&#xff1a;n<100&#xff0c;嗯......脑子闪过&#xff1a;还在想什么呢&#xff01;Floyd啊。哈哈哈 思路&#xff1a; 详细注释&#xff1a; 话不多说&#xff0c;上ACcode&#xff01;: #include<bits/stdc.h> using namespace std; #define int lo…...

Http协议和RestTemplate协议有什么区别?

目录 一、功能不同 二、技术不同 三、使用场景不同 四、总结 RestTemplate 是一个 Spring 框架提供的用于发送 HTTP请求的客户端工具&#xff0c;它封装了 Java 原生的 HTTP 客户端库&#xff0c;并提供了一组简洁易用的 API 来发送 HTTP 请求和处理响应。而 HTTP&#xff…...

基于SpringBoot+微信小程序的医院预约叫号小程序

✌全网粉丝20W,csdn特邀作者、博客专家、CSDN新星计划导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取项目下载方式&#x1f345; 一、项目背景介绍&#xff1a; 该项目是基于uniappWe…...

springboot整合RabbitMQ 消费端处理数据

pom 依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>写一个rabbitmq配置文件 import org.springframework.amqp.core.Binding; import org.springframewo…...

计算机中CPU、内存、缓存的关系

CPU&#xff08;Central Processing Unit&#xff0c;中央处理器&#xff09; 内存&#xff08;Random Access Memory&#xff0c;随机存取存储器&#xff09; 缓存&#xff08;Cache&#xff09; CPU、内存和缓存之间有着密切的关系&#xff0c;它们共同构成了计算机系统的核…...

【Linux实验】构造一个简单的 shell

一、实验目的 l 用 C/C++构造一个简单的 shell; l 理解 shell 程序的功能; l 学会 shell 的使用;...

Docker 离线安装指南

参考文章 1、确认操作系统类型及内核版本 Docker依赖于Linux内核的一些特性&#xff0c;不同版本的Docker对内核版本有不同要求。例如&#xff0c;Docker 17.06及之后的版本通常需要Linux内核3.10及以上版本&#xff0c;Docker17.09及更高版本对应Linux内核4.9.x及更高版本。…...

K8S认证|CKS题库+答案| 11. AppArmor

目录 11. AppArmor 免费获取并激活 CKA_v1.31_模拟系统 题目 开始操作&#xff1a; 1&#xff09;、切换集群 2&#xff09;、切换节点 3&#xff09;、切换到 apparmor 的目录 4&#xff09;、执行 apparmor 策略模块 5&#xff09;、修改 pod 文件 6&#xff09;、…...

Unity3D中Gfx.WaitForPresent优化方案

前言 在Unity中&#xff0c;Gfx.WaitForPresent占用CPU过高通常表示主线程在等待GPU完成渲染&#xff08;即CPU被阻塞&#xff09;&#xff0c;这表明存在GPU瓶颈或垂直同步/帧率设置问题。以下是系统的优化方案&#xff1a; 对惹&#xff0c;这里有一个游戏开发交流小组&…...

.Net框架,除了EF还有很多很多......

文章目录 1. 引言2. Dapper2.1 概述与设计原理2.2 核心功能与代码示例基本查询多映射查询存储过程调用 2.3 性能优化原理2.4 适用场景 3. NHibernate3.1 概述与架构设计3.2 映射配置示例Fluent映射XML映射 3.3 查询示例HQL查询Criteria APILINQ提供程序 3.4 高级特性3.5 适用场…...

NFT模式:数字资产确权与链游经济系统构建

NFT模式&#xff1a;数字资产确权与链游经济系统构建 ——从技术架构到可持续生态的范式革命 一、确权技术革新&#xff1a;构建可信数字资产基石 1. 区块链底层架构的进化 跨链互操作协议&#xff1a;基于LayerZero协议实现以太坊、Solana等公链资产互通&#xff0c;通过零知…...

精益数据分析(97/126):邮件营销与用户参与度的关键指标优化指南

精益数据分析&#xff08;97/126&#xff09;&#xff1a;邮件营销与用户参与度的关键指标优化指南 在数字化营销时代&#xff0c;邮件列表效度、用户参与度和网站性能等指标往往决定着创业公司的增长成败。今天&#xff0c;我们将深入解析邮件打开率、网站可用性、页面参与时…...

是否存在路径(FIFOBB算法)

题目描述 一个具有 n 个顶点e条边的无向图&#xff0c;该图顶点的编号依次为0到n-1且不存在顶点与自身相连的边。请使用FIFOBB算法编写程序&#xff0c;确定是否存在从顶点 source到顶点 destination的路径。 输入 第一行两个整数&#xff0c;分别表示n 和 e 的值&#xff08;1…...

基于IDIG-GAN的小样本电机轴承故障诊断

目录 🔍 核心问题 一、IDIG-GAN模型原理 1. 整体架构 2. 核心创新点 (1) ​梯度归一化(Gradient Normalization)​​ (2) ​判别器梯度间隙正则化(Discriminator Gradient Gap Regularization)​​ (3) ​自注意力机制(Self-Attention)​​ 3. 完整损失函数 二…...

Golang——6、指针和结构体

指针和结构体 1、指针1.1、指针地址和指针类型1.2、指针取值1.3、new和make 2、结构体2.1、type关键字的使用2.2、结构体的定义和初始化2.3、结构体方法和接收者2.4、给任意类型添加方法2.5、结构体的匿名字段2.6、嵌套结构体2.7、嵌套匿名结构体2.8、结构体的继承 3、结构体与…...

CTF show 数学不及格

拿到题目先查一下壳&#xff0c;看一下信息 发现是一个ELF文件&#xff0c;64位的 ​ 用IDA Pro 64 打开这个文件 ​ 然后点击F5进行伪代码转换 可以看到有五个if判断&#xff0c;第一个argc ! 5这个判断并没有起太大作用&#xff0c;主要是下面四个if判断 ​ 根据题目…...