当前位置: 首页 > news >正文

flink 报错:Caused by: java.lang.RuntimeException: Assigned key must not be null!

问题描述

不同情况下需要找对应的解决方法,这里介绍的解决方法不能拓展到别的场景。

场景描述: flink job 的开发过程中遇到这样的需求,需要先 map 处理,然后把返回的 DataStream 作为输入,流入别的 map 中。这里我们遇到的场景是从原来的 map 流到 AsyncDataStream 中。
大概的 java 代码为:

        SingleOutputStreamOperator<xxxxx> task = source.rebalance().map(new XXXXMapFunction()).uid("xxxxxx").name("xxxx");DataStream<yyyyy> dataStreamHyKontrast = AsyncDataStream.unorderedWait(task, new YYYYRequestMapFunction(), defaultTimeoutMills, TimeUnit.SECONDS).name("yyyyyy").uid("yyyyyy");.......

问题描述: 开发完成本地运行flink入口 main 方法的时候,错误提示如下:

Caused by: java.lang.RuntimeException: Assigned key must not be null!at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:103)at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:87)at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:43)at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)at org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.emitResult(StreamRecordQueueEntry.java:64)at org.apache.flink.streaming.api.operators.async.queue.UnorderedStreamElementQueue$Segment.emitCompleted(UnorderedStreamElementQueue.java:272)at org.apache.flink.streaming.api.operators.async.queue.UnorderedStreamElementQueue.emitCompletedElement(UnorderedStreamElementQueue.java:159)at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.outputCompletedElement(AsyncWaitOperator.java:298)at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.access$100(AsyncWaitOperator.java:78)at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.processResults(AsyncWaitOperator.java:371)at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.lambda$processInMailbox$0(AsyncWaitOperator.java:352)at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)at java.lang.Thread.run(Thread.java:750)

解决方法

我们避免数据直接从第一个 map 过程后直接流向第二个 AsynMap,中间添加一个处理过程,尽管这个处理过程我们啥也不干。

        SingleOutputStreamOperator<xxxxx> task = source.rebalance().map(new XXXXMapFunction()).uid("xxxxxx").name("xxxx");// 这里添加一个处理过程,task -> process ,然后process到下一个 mapSingleOutputStreamOperator<KontrastAlgoTaskStated> process = task.process(new DefaultProcessFunction<>());DataStream<yyyyy> dataStreamHyKontrast = AsyncDataStream.unorderedWait(process, new YYYYRequestMapFunction(), defaultTimeoutMills, TimeUnit.SECONDS).name("yyyyyy").uid("yyyyyy");

其中的 DefaultProcessFunction 是我自己编写的,就是什么也不做。

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;/*** 默认的 collect,用于处理 map 后不做任何处理直接到下一个结点* @author smileyan* @param <IN> 对应的实体类*/
public final class DefaultProcessFunction<IN> extends ProcessFunction<IN, IN> {@Overridepublic void processElement(IN value, ProcessFunction<IN, IN>.Context ctx, Collector<IN> out) throws Exception {out.collect(value);}
}

总结

遇到问题后查了一下,都没有找到我这种情况的博客。后来折腾了一会儿发现如上方法可以解决问题,特此记录,希望可能帮到遇到相同问题的小伙伴 ~ 感谢阅览 ~

Smileyan
2023.07.18 10:12

相关文章:

flink 报错:Caused by: java.lang.RuntimeException: Assigned key must not be null!

问题描述 不同情况下需要找对应的解决方法&#xff0c;这里介绍的解决方法不能拓展到别的场景。 场景描述&#xff1a; flink job 的开发过程中遇到这样的需求&#xff0c;需要先 map 处理&#xff0c;然后把返回的 DataStream 作为输入&#xff0c;流入别的 map 中。这里我们遇…...

AN OVERVIEW OF LANGUAGE MODELS RECENT DEVELOPMENTS AND OUTLOOK

LLM系列相关文章&#xff0c;针对《AN OVERVIEW OF LANGUAGE MODELS: RECENT DEVELOPMENTS AND OUTLOOK》的翻译。 语言模型综述&#xff1a;近年来的发展与展望 摘要1 引言2 语言模型的类型2.1 结构化LM2.2 双向LM2.3 置换LM 3 语言单元3.1 字符3.2 单词和子单词3.2.1 基于统…...

ArcGIS、ENVI、InVEST、FRAGSTATS等多技术融合提升

专题一 空间数据获取与制图 1.1 软件安装与应用讲解 1.2 空间数据介绍 1.3海量空间数据下载 1.4 ArcGIS软件快速入门 1.5 Geodatabase地理数据库 专题二 ArcGIS专题地图制作 2.1专题地图制作规范 2.2 空间数据的准备与处理 2.3 空间数据可视化&#xff1a;地图符号与注…...

fastapi初使用,构建自己的api

文章目录 1、安装2、api实现2.1、 app.get("/1")2.2、app.get("/{a}")2.3、app.get("/{a}{b}")2.4、函数和api分离 3、运行 原文链接&#xff1a;https://wangguo.site/posts/d98bb3c9.html fastapi 是一个基于 Python 的 API 构建框架&#xff…...

Html基础知识学习——圣杯布局、margin负值、等高布局(十七)

文章目录 圣杯布局margin负值等高布局 圣杯布局 两边页面固定中间页面宽度随着浏览器大小自适应 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-widt…...

从一长串字符串中找出图片,查看是否符合md5要求

/**检查内容中的图片否含有外部链接*/ function checkExternalLinks(content){var pattern /<img[^>]src["]([^"])["][^>]*>/g;var match;var index 0;while ((match pattern.exec(content)) ! null) {var imageUrl match[1];var regex /\/sto…...

新手小白如何学好UI设计?一般学多久? 优漫动游

学习UI设计首先就是软件&#xff1a;PS、AI、CDR等但是掌握了软件不等于就掌握了UI设计&#xff0c;设计的思维也是很重要的网上很多关于UI设计的教程视频&#xff0c;可以多去看看 广州平面设计培训 要多久这个看个人的学习能力吧&#xff0c;有些人天资聪慧&#xff0c;很快…...

实现 Rollup 插件alias 并使用vitest提高开发效率

本篇文章是对 实现 Rollup 插件 alias | 使用 TypeScript 实现库的基本流程 | 使用单元测试提高开发效率 的总结。其中涉及到开发一个组件库的诸多知识点。 实现一个经常用的 rollup 插件 alias 首先执行npm init命令初始化一个package.json文件&#xff0c;因为插件使用了ty…...

【DSL】ES+DSL 查询语法

【DSL】ESDSL 查询语法 一、前言二、定义1.基本介绍2.语法说明&#xff08;1&#xff09;关键字(Keywords)&#xff08;2&#xff09;标识符(Identifiers)&#xff08;3&#xff09;表达式(Expressions)&#xff08;4&#xff09;运算符(Operators)&#xff08;5&#xff09;函…...

Vue第三篇:最简单的vue购物车示例

本文参考&#xff1a;Vue Cli&#xff08;脚手架&#xff09;实现购物车小案例 - - php中文网博客 效果图&#xff1a; 编写流程&#xff1a; 1、首先通过vue/cli创建工程 vue create totalprice 2、改写App.vue代码如下&#xff1a; <template><div><div v…...

MFC 基于数据库的管理系统

文章目录 初始化设置菜单 添加数据库类创建数据库配置数据库 全部代码 初始化 创建文件选择基于CListView 初始化数据 public:CListCtrl& m_list;CSQLView::CSQLView() noexcept:m_list(GetListCtrl()) {// TODO: 在此处添加构造代码}void CSQLView::OnInitialUpdate() {C…...

EfficientNet论文笔记

EfficientNet论文笔记 通过NAS平衡了channel&#xff0c;depth&#xff0c;resolution&#xff0c;发现在相同的FLOPs下&#xff0c;同时增加 depth和 resolution的效果最好。 数据集效果小于resolution怎么办&#xff1f; EfficientNet—b0框架 表格中每个MBConv后会跟一个…...

系统学习Linux-SSH远程服务(二)

概念 安全外壳协议&#xff0c;提供安全可靠的远程连接 特点 ssh是工作在传输层和应用层的协议 ssh提供了一组管理命令 ssh 远程登陆 scp 远程拷贝 sftp 远程上传下载 ssh-copy-id ssh keygen 生成 提供了多种身份验证机制 身份验证机制 密码验证 需要提供密码 密…...

PyTorch训练RNN, GRU, LSTM:手写数字识别

文章目录 pytorch 神经网络训练demoResult参考来源 pytorch 神经网络训练demo 数据集&#xff1a;MNIST 该数据集的内容是手写数字识别&#xff0c;其分为两部分&#xff0c;分别含有60000张训练图片和10000张测试图片 图片来源&#xff1a;https://tensornews.cn/mnist_intr…...

基于深度学习的高精度道路瑕疵检测系统(PyTorch+Pyside6+YOLOv5模型)

摘要&#xff1a;基于深度学习的高精度道路瑕疵&#xff08;裂纹&#xff08;Crack&#xff09;、检查井&#xff08;Manhole&#xff09;、网&#xff08;Net&#xff09;、裂纹块&#xff08;Patch-Crack&#xff09;、网块&#xff08;Patch-Net&#xff09;、坑洼块&#x…...

【裸辞转行】是告别,也是新的开始

一年多了没有更新&#xff0c;是因为去年身体加心理因素辞职了&#xff0c;并且大概率不会再做程序员了&#xff0c;嗯。本来觉得可能再也不会打开 CSDN 了&#xff0c;想了想&#xff0c;还是来做个告别吧&#xff0c;任何事情都该有始有终才对。 回忆碎碎念 是在去年的 11 …...

了解交换机接口的链路类型(access、trunk、hybrid)

上一个章节中讲到了vlan的作用及使用&#xff0c;这篇了解一下交换机接口的链路类型和什么情况下使用 vlan在数据包中是如何体现的&#xff0c;在上一篇的时候提到测试了一下&#xff0c;从PC1去访问PC4的时候&#xff0c;只从E0/0/2发送给了E0/0/3这是&#xff0c;因为两个接…...

Android系统启动流程分析

当按下Android系统的开机电源按键时候&#xff0c;硬件会触发引导芯片&#xff0c;执行预定义的代码&#xff0c;然后加载引导程序(BootLoader)到RAM&#xff0c;Bootloader是Android系统起来前第一个程序&#xff0c;主要用来拉起Android系统程序&#xff0c;Android系统被拉起…...

如何在Ubuntu上安装OpenneBula

OpenNebula是一个开源云计算平台&#xff0c;允许我们在完全虚拟化云中组合和管理VMware和KVM虚拟机 第1步&#xff1a;安装MariaDB数据库服务器 OpenNebula还需要一个数据库服务器来存储其内容。 安装MariaDB&#xff1a; 1 2 sudo apt update sudo apt install mariadb-s…...

解决MySQL中分页查询时多页有重复数据,实际只有一条数据的问题

0 前言 有一个离奇的BUG&#xff0c;在查询时&#xff0c;第一页跟第二页有一个共同的数据。有的数据却不显示。 后来发现是在SQL排序时没用主键排序。 解决&#xff1a;使用主键排序 以下是我准备的举例&#xff0c;可以自己试试。 1 数据准备 SET NAMES utf8mb4; SET FORE…...

为什么需要建设工程项目管理?工程项目管理有哪些亮点功能?

在建筑行业&#xff0c;项目管理的重要性不言而喻。随着工程规模的扩大、技术复杂度的提升&#xff0c;传统的管理模式已经难以满足现代工程的需求。过去&#xff0c;许多企业依赖手工记录、口头沟通和分散的信息管理&#xff0c;导致效率低下、成本失控、风险频发。例如&#…...

Neo4j 集群管理:原理、技术与最佳实践深度解析

Neo4j 的集群技术是其企业级高可用性、可扩展性和容错能力的核心。通过深入分析官方文档,本文将系统阐述其集群管理的核心原理、关键技术、实用技巧和行业最佳实践。 Neo4j 的 Causal Clustering 架构提供了一个强大而灵活的基石,用于构建高可用、可扩展且一致的图数据库服务…...

Python基于历史模拟方法实现投资组合风险管理的VaR与ES模型项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档&#xff09;&#xff0c;如需数据代码文档可以直接到文章最后关注获取。 1.项目背景 在金融市场日益复杂和波动加剧的背景下&#xff0c;风险管理成为金融机构和个人投资者关注的核心议题之一。VaR&…...

C/C++ 中附加包含目录、附加库目录与附加依赖项详解

在 C/C 编程的编译和链接过程中&#xff0c;附加包含目录、附加库目录和附加依赖项是三个至关重要的设置&#xff0c;它们相互配合&#xff0c;确保程序能够正确引用外部资源并顺利构建。虽然在学习过程中&#xff0c;这些概念容易让人混淆&#xff0c;但深入理解它们的作用和联…...

【p2p、分布式,区块链笔记 MESH】Bluetooth蓝牙通信 BLE Mesh协议的拓扑结构 定向转发机制

目录 节点的功能承载层&#xff08;GATT/Adv&#xff09;局限性&#xff1a; 拓扑关系定向转发机制定向转发意义 CG 节点的功能 节点的功能由节点支持的特性和功能决定。所有节点都能够发送和接收网格消息。节点还可以选择支持一个或多个附加功能&#xff0c;如 Configuration …...

Docker拉取MySQL后数据库连接失败的解决方案

在使用Docker部署MySQL时&#xff0c;拉取并启动容器后&#xff0c;有时可能会遇到数据库连接失败的问题。这种问题可能由多种原因导致&#xff0c;包括配置错误、网络设置问题、权限问题等。本文将分析可能的原因&#xff0c;并提供解决方案。 一、确认MySQL容器的运行状态 …...

Vue 3 + WebSocket 实战:公司通知实时推送功能详解

&#x1f4e2; Vue 3 WebSocket 实战&#xff1a;公司通知实时推送功能详解 &#x1f4cc; 收藏 点赞 关注&#xff0c;项目中要用到推送功能时就不怕找不到了&#xff01; 实时通知是企业系统中常见的功能&#xff0c;比如&#xff1a;管理员发布通知后&#xff0c;所有用户…...

Spring事务传播机制有哪些?

导语&#xff1a; Spring事务传播机制是后端面试中的必考知识点&#xff0c;特别容易出现在“项目细节挖掘”阶段。面试官通过它来判断你是否真正理解事务控制的本质与异常传播机制。本文将从实战与源码角度出发&#xff0c;全面剖析Spring事务传播机制&#xff0c;帮助你答得有…...

Spring是如何实现无代理对象的循环依赖

无代理对象的循环依赖 什么是循环依赖解决方案实现方式测试验证 引入代理对象的影响创建代理对象问题分析 源码见&#xff1a;mini-spring 什么是循环依赖 循环依赖是指在对象创建过程中&#xff0c;两个或多个对象相互依赖&#xff0c;导致创建过程陷入死循环。以下通过一个简…...

后端下载限速(redis记录实时并发,bucket4j动态限速)

✅ 使用 Redis 记录 所有用户的实时并发下载数✅ 使用 Bucket4j 实现 全局下载速率限制&#xff08;动态&#xff09;✅ 支持 动态调整限速策略✅ 下载接口安全、稳定、可监控 &#x1f9e9; 整体架构概览 模块功能Redis存储全局并发数和带宽令牌桶状态Bucket4j Redis分布式限…...