flink 报错:Caused by: java.lang.RuntimeException: Assigned key must not be null!
问题描述
不同情况下需要找对应的解决方法,这里介绍的解决方法不能拓展到别的场景。
场景描述: flink job 的开发过程中遇到这样的需求,需要先 map 处理,然后把返回的 DataStream 作为输入,流入别的 map 中。这里我们遇到的场景是从原来的 map 流到 AsyncDataStream 中。
大概的 java 代码为:
SingleOutputStreamOperator<xxxxx> task = source.rebalance().map(new XXXXMapFunction()).uid("xxxxxx").name("xxxx");DataStream<yyyyy> dataStreamHyKontrast = AsyncDataStream.unorderedWait(task, new YYYYRequestMapFunction(), defaultTimeoutMills, TimeUnit.SECONDS).name("yyyyyy").uid("yyyyyy");.......
问题描述: 开发完成本地运行flink入口 main 方法的时候,错误提示如下:
Caused by: java.lang.RuntimeException: Assigned key must not be null!at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:103)at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:87)at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:43)at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)at org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.emitResult(StreamRecordQueueEntry.java:64)at org.apache.flink.streaming.api.operators.async.queue.UnorderedStreamElementQueue$Segment.emitCompleted(UnorderedStreamElementQueue.java:272)at org.apache.flink.streaming.api.operators.async.queue.UnorderedStreamElementQueue.emitCompletedElement(UnorderedStreamElementQueue.java:159)at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.outputCompletedElement(AsyncWaitOperator.java:298)at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.access$100(AsyncWaitOperator.java:78)at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.processResults(AsyncWaitOperator.java:371)at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.lambda$processInMailbox$0(AsyncWaitOperator.java:352)at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)at java.lang.Thread.run(Thread.java:750)
解决方法
我们避免数据直接从第一个 map 过程后直接流向第二个 AsynMap,中间添加一个处理过程,尽管这个处理过程我们啥也不干。
SingleOutputStreamOperator<xxxxx> task = source.rebalance().map(new XXXXMapFunction()).uid("xxxxxx").name("xxxx");// 这里添加一个处理过程,task -> process ,然后process到下一个 mapSingleOutputStreamOperator<KontrastAlgoTaskStated> process = task.process(new DefaultProcessFunction<>());DataStream<yyyyy> dataStreamHyKontrast = AsyncDataStream.unorderedWait(process, new YYYYRequestMapFunction(), defaultTimeoutMills, TimeUnit.SECONDS).name("yyyyyy").uid("yyyyyy");
其中的 DefaultProcessFunction 是我自己编写的,就是什么也不做。
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;/*** 默认的 collect,用于处理 map 后不做任何处理直接到下一个结点* @author smileyan* @param <IN> 对应的实体类*/
public final class DefaultProcessFunction<IN> extends ProcessFunction<IN, IN> {@Overridepublic void processElement(IN value, ProcessFunction<IN, IN>.Context ctx, Collector<IN> out) throws Exception {out.collect(value);}
}
总结
遇到问题后查了一下,都没有找到我这种情况的博客。后来折腾了一会儿发现如上方法可以解决问题,特此记录,希望可能帮到遇到相同问题的小伙伴 ~ 感谢阅览 ~
Smileyan
2023.07.18 10:12
相关文章:
flink 报错:Caused by: java.lang.RuntimeException: Assigned key must not be null!
问题描述 不同情况下需要找对应的解决方法,这里介绍的解决方法不能拓展到别的场景。 场景描述: flink job 的开发过程中遇到这样的需求,需要先 map 处理,然后把返回的 DataStream 作为输入,流入别的 map 中。这里我们遇…...
AN OVERVIEW OF LANGUAGE MODELS RECENT DEVELOPMENTS AND OUTLOOK
LLM系列相关文章,针对《AN OVERVIEW OF LANGUAGE MODELS: RECENT DEVELOPMENTS AND OUTLOOK》的翻译。 语言模型综述:近年来的发展与展望 摘要1 引言2 语言模型的类型2.1 结构化LM2.2 双向LM2.3 置换LM 3 语言单元3.1 字符3.2 单词和子单词3.2.1 基于统…...
ArcGIS、ENVI、InVEST、FRAGSTATS等多技术融合提升
专题一 空间数据获取与制图 1.1 软件安装与应用讲解 1.2 空间数据介绍 1.3海量空间数据下载 1.4 ArcGIS软件快速入门 1.5 Geodatabase地理数据库 专题二 ArcGIS专题地图制作 2.1专题地图制作规范 2.2 空间数据的准备与处理 2.3 空间数据可视化:地图符号与注…...
fastapi初使用,构建自己的api
文章目录 1、安装2、api实现2.1、 app.get("/1")2.2、app.get("/{a}")2.3、app.get("/{a}{b}")2.4、函数和api分离 3、运行 原文链接:https://wangguo.site/posts/d98bb3c9.html fastapi 是一个基于 Python 的 API 构建框架ÿ…...
Html基础知识学习——圣杯布局、margin负值、等高布局(十七)
文章目录 圣杯布局margin负值等高布局 圣杯布局 两边页面固定中间页面宽度随着浏览器大小自适应 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-widt…...
从一长串字符串中找出图片,查看是否符合md5要求
/**检查内容中的图片否含有外部链接*/ function checkExternalLinks(content){var pattern /<img[^>]src["]([^"])["][^>]*>/g;var match;var index 0;while ((match pattern.exec(content)) ! null) {var imageUrl match[1];var regex /\/sto…...
新手小白如何学好UI设计?一般学多久? 优漫动游
学习UI设计首先就是软件:PS、AI、CDR等但是掌握了软件不等于就掌握了UI设计,设计的思维也是很重要的网上很多关于UI设计的教程视频,可以多去看看 广州平面设计培训 要多久这个看个人的学习能力吧,有些人天资聪慧,很快…...
实现 Rollup 插件alias 并使用vitest提高开发效率
本篇文章是对 实现 Rollup 插件 alias | 使用 TypeScript 实现库的基本流程 | 使用单元测试提高开发效率 的总结。其中涉及到开发一个组件库的诸多知识点。 实现一个经常用的 rollup 插件 alias 首先执行npm init命令初始化一个package.json文件,因为插件使用了ty…...
【DSL】ES+DSL 查询语法
【DSL】ESDSL 查询语法 一、前言二、定义1.基本介绍2.语法说明(1)关键字(Keywords)(2)标识符(Identifiers)(3)表达式(Expressions)(4)运算符(Operators)(5)函…...
Vue第三篇:最简单的vue购物车示例
本文参考:Vue Cli(脚手架)实现购物车小案例 - - php中文网博客 效果图: 编写流程: 1、首先通过vue/cli创建工程 vue create totalprice 2、改写App.vue代码如下: <template><div><div v…...
MFC 基于数据库的管理系统
文章目录 初始化设置菜单 添加数据库类创建数据库配置数据库 全部代码 初始化 创建文件选择基于CListView 初始化数据 public:CListCtrl& m_list;CSQLView::CSQLView() noexcept:m_list(GetListCtrl()) {// TODO: 在此处添加构造代码}void CSQLView::OnInitialUpdate() {C…...
EfficientNet论文笔记
EfficientNet论文笔记 通过NAS平衡了channel,depth,resolution,发现在相同的FLOPs下,同时增加 depth和 resolution的效果最好。 数据集效果小于resolution怎么办? EfficientNet—b0框架 表格中每个MBConv后会跟一个…...
系统学习Linux-SSH远程服务(二)
概念 安全外壳协议,提供安全可靠的远程连接 特点 ssh是工作在传输层和应用层的协议 ssh提供了一组管理命令 ssh 远程登陆 scp 远程拷贝 sftp 远程上传下载 ssh-copy-id ssh keygen 生成 提供了多种身份验证机制 身份验证机制 密码验证 需要提供密码 密…...
PyTorch训练RNN, GRU, LSTM:手写数字识别
文章目录 pytorch 神经网络训练demoResult参考来源 pytorch 神经网络训练demo 数据集:MNIST 该数据集的内容是手写数字识别,其分为两部分,分别含有60000张训练图片和10000张测试图片 图片来源:https://tensornews.cn/mnist_intr…...
基于深度学习的高精度道路瑕疵检测系统(PyTorch+Pyside6+YOLOv5模型)
摘要:基于深度学习的高精度道路瑕疵(裂纹(Crack)、检查井(Manhole)、网(Net)、裂纹块(Patch-Crack)、网块(Patch-Net)、坑洼块&#x…...
【裸辞转行】是告别,也是新的开始
一年多了没有更新,是因为去年身体加心理因素辞职了,并且大概率不会再做程序员了,嗯。本来觉得可能再也不会打开 CSDN 了,想了想,还是来做个告别吧,任何事情都该有始有终才对。 回忆碎碎念 是在去年的 11 …...
了解交换机接口的链路类型(access、trunk、hybrid)
上一个章节中讲到了vlan的作用及使用,这篇了解一下交换机接口的链路类型和什么情况下使用 vlan在数据包中是如何体现的,在上一篇的时候提到测试了一下,从PC1去访问PC4的时候,只从E0/0/2发送给了E0/0/3这是,因为两个接…...
Android系统启动流程分析
当按下Android系统的开机电源按键时候,硬件会触发引导芯片,执行预定义的代码,然后加载引导程序(BootLoader)到RAM,Bootloader是Android系统起来前第一个程序,主要用来拉起Android系统程序,Android系统被拉起…...
如何在Ubuntu上安装OpenneBula
OpenNebula是一个开源云计算平台,允许我们在完全虚拟化云中组合和管理VMware和KVM虚拟机 第1步:安装MariaDB数据库服务器 OpenNebula还需要一个数据库服务器来存储其内容。 安装MariaDB: 1 2 sudo apt update sudo apt install mariadb-s…...
解决MySQL中分页查询时多页有重复数据,实际只有一条数据的问题
0 前言 有一个离奇的BUG,在查询时,第一页跟第二页有一个共同的数据。有的数据却不显示。 后来发现是在SQL排序时没用主键排序。 解决:使用主键排序 以下是我准备的举例,可以自己试试。 1 数据准备 SET NAMES utf8mb4; SET FORE…...
Polars 2.0清洗架构解密(含完整数据流拓扑图):为什么92%的团队还在用Pandas硬扛TB级脏数据?
第一章:Polars 2.0清洗架构解密:从设计哲学到性能跃迁Polars 2.0 的清洗架构并非简单功能叠加,而是以“零拷贝流式处理”与“惰性执行图优化”为双核驱动的范式重构。其设计哲学根植于两个核心信条:数据不应在内存中被无谓复制&am…...
鸿蒙应用开发全景解析与高阶面试指南
第一章 鸿蒙生态技术演进与开发环境鸿蒙操作系统(HarmonyOS)的分布式架构实现了跨设备算力调度,其核心设计思想可抽象为: $$ \text{Device}i \xrightarrow{\text{IDMS}} \text{Pool}{\text{compute}} \xrightarrow{\text{DistSche…...
**发散创新:用Python + ROS2实现多机器人协同路径规划与避障控制**在现代机器人系统中,**
发散创新:用Python ROS2实现多机器人协同路径规划与避障控制 在现代机器人系统中,多机器人协同控制已成为智能仓储、物流配送和工业自动化的核心技术之一。本文将带你深入一个真实可运行的案例——使用 Python 语言结合ROS2(Robot Operating…...
HunyuanVideo-Foley部署案例:混合精度(FP16/AMP)推理性能实测报告
HunyuanVideo-Foley部署案例:混合精度(FP16/AMP)推理性能实测报告 1. 测试环境与配置 1.1 硬件配置 显卡:RTX 4090D 24GB显存(驱动550.90.07)CPU:10核心处理器内存:120GB DDR4存储…...
精益生产方式的核心功能拆解:精益生产方式如何解决多品种小批量场景下的库存积压难题
在当前制造业从“少品种大批量”向“多品种小批量”急剧转型的背景下,精益生产方式已成为企业打破库存僵局的唯一出路,它通过准时化拉动和消除浪费的核心逻辑,精准解决了传统模式下因预测失效导致的严重库存积压问题;面对多变的订…...
开源工具gInk:高效标注从入门到精通
开源工具gInk:高效标注从入门到精通 【免费下载链接】gInk An easy to use on-screen annotation software inspired by Epic Pen. 项目地址: https://gitcode.com/gh_mirrors/gi/gInk 在数字化协作与远程沟通日益频繁的今天,屏幕标注工具已成为提…...
HunyuanVideo-Foley效果展示:AI生成ASMR触发音、白噪音与专注背景音
HunyuanVideo-Foley效果展示:AI生成ASMR触发音、白噪音与专注背景音 1. 核心能力概览 HunyuanVideo-Foley是一款专为音效生成优化的AI模型,能够根据文字描述自动生成高质量的音频内容。基于RTX 4090D 24GB显存深度优化,该镜像提供了开箱即用…...
京东云GPU服务器省钱攻略:如何根据业务需求灵活选择计费模式和虚拟化方案
京东云GPU服务器成本优化实战指南:精准匹配业务需求的选型策略 在AI与高性能计算领域,GPU服务器已成为企业技术基础设施的核心组件。然而,面对复杂的计费模式、多样的硬件配置以及差异化的虚拟化方案,许多技术决策者常常陷入"…...
6个高效突破内容访问限制的开源工具使用指南
6个高效突破内容访问限制的开源工具使用指南 【免费下载链接】bypass-paywalls-chrome-clean 项目地址: https://gitcode.com/GitHub_Trending/by/bypass-paywalls-chrome-clean 在信息爆炸的数字时代,优质内容常常被付费墙限制访问。本文将系统介绍基于开源…...
OpenClaw性能优化:降低GLM-4.7-Flash任务Token消耗的5个技巧
OpenClaw性能优化:降低GLM-4.7-Flash任务Token消耗的5个技巧 1. 为什么需要关注Token消耗 当我第一次在本地部署OpenClaw并接入GLM-4.7-Flash模型时,最让我震惊的不是它的自动化能力,而是执行简单任务后查看账单时的Token消耗数字。一个看似…...
