Flink SQL 基于Update流出现空值无法过滤问题
问题背景
- 问题描述
基于Flink-CDC ,Flink SQL的实时计算作业在运行一段时间后,突然发现插入数据库的计算结果发生部分主键属性发生失败,导致后续计算结果无法插入,
超过失败次数失败的情况
- 问题报错
Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO dm_hljy.dws_table_name (op_date, school_year, campus_name, school_name, depart_name, total_opfare, ids, update_time) VALUES ('2024-03-11 00:00:00+08', '2023', 'xxxx', 'xxxx学校', 'xxxx小学部', '203333300000', '57', '2024-03-21 09:31:08.47+08') ON DUPLICATE KEY UPDATE school_year=VALUES(school_year), total_opfare=VALUES(total_opfare), ids=VALUES(ids), update_time=VALUES(update_time) was aborted: ERROR: dn_6007_6008: null value in column "depart_name" violates not-null constraint Call getNextException to see other errors in the batch.at com.huawei.gauss200.jdbc.jdbc.BatchResultHandler.handleCompletion(BatchResultHandler.java:171) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.executeBatch(QueryExecutorImpl.java:586) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.jdbc.PgStatement.executeBatch(PgStatement.java:883) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1580) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:266) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:236) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.lambda$open$0(JdbcOutputFormat.java:159) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_332]at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[?:1.8.0_332]at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_332]at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[?:1.8.0_332]at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_332]at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_332]... 1 moreCaused by: com.huawei.gauss200.jdbc.util.PSQLException: ERROR: dn_6007_6008: null value in column "depart_name" violates not-null constraintat com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2856) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2587) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.executeBatch(QueryExecutorImpl.java:575) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.jdbc.PgStatement.executeBatch(PgStatement.java:883) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1580) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:266) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:236) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.lambda$open$0(JdbcOutputFormat.java:159) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_332]at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[?:1.8.0_332]at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_332]at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[?:1.8.0_332]at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_332]at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_332

定位
定位思路
1.方向一:怀疑数据库插入存在数据处理时,造成数据处理出现空值的情况,即数据本身不为空,但是数据插入却出现了空
2.方向二:Flink-SQL在消费kafka数据时存在了空值,故加工的数据计算结果存在空值
定位过程
- 因插入数据库定位比较麻烦,且数据库已经设置该字段为主属性,故出现插入时处理为空值的概率较小。故先从较为简单的Flink SQL查询数据
- 定位方法一,查询该字段为空的记录,待作业执行完成后,未查询到空值对应记录
select select * from table_name where depart_name is null or depart_name = '' or char_length(depart_name) = 0;
- 因考虑到使用Flink-CDC进行变更数据捕获,故对应的update流存在-U,+U,-D,+I记录,因此随着插入记录存在空值被记录进去的情况,故采用view的方式,先将宽表的加工、关联方式创建为view,然后进行空值的过滤。实施如下
create view view_prd as
select a.* ,b.* from a join b on a.id = b.idselect * from view_prd where depart_name is null or depart_name = '' or char_length(depart_name) = 0;
- 通过查询结果,发现存在最后一条记录存在空值的原因,往源头定位,发现该字段之前为空,后面进行更新填充到值出现-U记录,导致数据插入持续失败

原因
- 因为flink-SQL消费的数据时kafka topic,flink以upsert-kafka形式的connector进行写入,故存在changelog 流中数据更新存在-U,+U的记录(按照Key进行区分唯一条记录),value 为空(-U)的记录kafka也,导致出现空值,

解决
通过在DWS宽表创建一层View(如上),在写入DWS宽表的kafka topic之前,现将该字段空值过滤,即可排除空值涉及记录被纳入结果指标计算的范围中
相关文章:
Flink SQL 基于Update流出现空值无法过滤问题
问题背景 问题描述 基于Flink-CDC ,Flink SQL的实时计算作业在运行一段时间后,突然发现插入数据库的计算结果发生部分主键属性发生失败,导致后续计算结果无法插入, 超过失败次数失败的情况问题报错 Caused by: java.sql.BatchUp…...
git-怎样把连续的多个commit合并成一个?
Git怎样把连续的多个commit合并成一个? Git怎样把连续的多个commit合并成一个? 参考URL: https://www.jianshu.com/p/5b4054b5b29e 查看git日志 git log --graph比如下图的commit 历史,想要把bai “Second change” 和 “Third change” 这…...
2024年2月游戏手柄线上电商(京东天猫淘宝)综合热销排行榜
鲸参谋监测的线上电商(京东天猫淘宝)游戏手柄品牌销售数据已出炉!2月游戏手柄销售数据呈现出强劲的增长势头。 根据鲸参谋数据显示,今年2月游戏手柄月销售量累计约43万件,同比去年上涨了78%;销售额累计达1…...
Sass5分钟速通基础语法
前言 近来在项目中使用sass,想着学习一下,但官方写的教程太冗杂,所以就有了本文速通Sass的基础语法 Sass 是 CSS 的一种预编译语言。它提供了 变量(variables)、嵌套规则(nested rules)、 混合(mixins) 等…...
百度蜘蛛池平台在线发外链-原理以及搭建教程
蜘蛛池平台是一款非常实用的SEO优化工具,它可以帮助网站管理员提高网站的排名和流量。百度蜘蛛池原理是基于百度搜索引擎的搜索算法,通过对网页的内容、结构、链接等方面进行分析和评估,从而判断网页的质量和重要性,从而对网页进行…...
Android_ android使用原生蓝牙协议_连接设备以后,给设备发送指令触发数据传输---Android原生开发工作笔记167
之前通过蓝牙连接设备的时候,直接就是连接上蓝牙以后,设备会自动发送数据,有数据的时候,会自动发送,但是,有一个设备就不会,奇怪了很久,设备启动了也连接上了,但是就是没有数据过来. 是因为,这个设备有几种模式是握力球,在设备连接到蓝牙以后,需要,给设备通过蓝牙发送一个指令…...
【Java面试题】操作系统
文章目录 1.进程/线程/协程1.1辨别进程和线程的异同1.2优缺点1.2.1进程1.2.2线程 1.3进程/线程之间通信的方法1.3.1进程之间通信的方法1.3.2线程之间通信的方法 1.4什么是线程上下文切换1.5协程1.5.1协程的定义?1.5.2使用协程的原因?1.5.3协程的优缺点&a…...
SQLite数据库成为内存中数据库(三)
返回:SQLite—系列文章目录 上一篇:SQLite使用的临时文件(二) 下一篇:SQLite中的原子提交(四) SQLite数据库通常存储在单个普通磁盘中文件。但是,在某些情况下,数据库可能…...
多张图片怎么合成一张gif?快来试试这个方法
将多张图片合成一张gif动图是现在常见的图像处理的方式,适合制作一些简单的动态图片。通过使用在线图片合成网站制作的gif动图不仅体积小画面丰富,画质还很清晰。不需要下载任何软件小白也能轻松上手,支持上传jpg、png以及gif格式图片&#x…...
爬取b站音频和视频数据,未合成一个视频
一、首先找到含有音频和视频的url地址 打开一个视频,刷新后,找到这个包,里面有我们所需要的数据 访问这个数据包后,获取字符串数据,用正则提取,再转为json字符串方便提取。 二、获得标题和音频数据后&…...
mysql进阶知识总结
1.存储引擎 1.1MySQL体系结构 1).连接层 最上层是一些客户端和链接服务,包含本地sock通信和大多数基于客户端/服务端工具实现的类似于TCP/IP的通信。主要完成一些类似于连接处理、授权认证、及相关的安全方案。在该层上引入了线程池的概念,为通过认证…...
量化交易入门(二十五)什么是RSI,原理和炒股实操
前面我们了解了KDJ,MACD,MTM三个技术指标,也进行了回测,结果有好有坏,今天我们来学习第四个指标RSI。RSI指标全称是相对强弱指标(Relative Strength Index),是通过比较一段时期内的平均收盘涨数和平均收盘跌数来分析市…...
快速上手Spring Cloud 九:服务间通信与消息队列
快速上手Spring Cloud 一:Spring Cloud 简介 快速上手Spring Cloud 二:核心组件解析 快速上手Spring Cloud 三:API网关深入探索与实战应用 快速上手Spring Cloud 四:微服务治理与安全 快速上手Spring Cloud 五:Spring …...
python——遍历网卡并禁用/启用
一、遍历网卡 注意:只能遍历到启用状态的网卡,如果网卡是禁止状态,则遍历不到!!! import os import time import psutil import loggingdef get_multi_physical_network_card():physical_nic_list []try:…...
初识 51
keil的使用: 具体细节请移步我上一篇博客:创建第一个51文件-CSDN博客 hex -- 汇编语言实现的文件 -- 直接与单片机对接的文件 单片机 -- 一个集成电脑芯片 单片机开发版 -- 基于单片机的集成电路 stc 89 c52RC / RD 系列单片机 命名规则: 89 -- 版本号? C --…...
【回溯与邻里交换】纸牌三角
1.回溯算法 旋转有3种可能,镜像有2种 所以最后次数:counts/3/2 #include<iostream> using namespace std;int num[9]; int counts0; bool bools[9];//默认为false int dfs(int step){if(step9){//索引 if((num[0]num[1]num[2]num[3]num[3]num[4]n…...
微服务(基础篇-004-Feign)
目录 http客户端Feign Feign替代RestTemplate(1) Feign的介绍(1.1) 使用Feign的步骤(1.2) 自定义配置(2) 配置Feign日志的两种方式(2.1) Feign使用优化…...
Linux IRC
目录 入侵框架检测 检测流程图 账号安全 查找账号中的危险信息 查看保存的历史命令 检测异常端口 入侵框架检测 1、系统安全检查(进程、开放端口、连接、日志) 这一块是目前个人该脚本所实现的功能 2、Rootkit 建议使用rootkit专杀工具来检查&#…...
五、Elasticsearch 集成
目录 5.1 Spring Data 框架集成5.1.1 Spring Data 框架介绍5.1.2 Spring Data Elasticsearch 介绍5.1.3 Spring Data Elasticsearch 版本对比5.1.4 集成步骤 5.1 Spring Data 框架集成 5.1.1 Spring Data 框架介绍 Spring Data 是一个用于简化数据库开发的开源框架。其主要目…...
Qt 完成图片的缩放拖动
1. 事件和函数 主要使用事件paintEvent(QPaintEvent *event)和drawTiledPixmap函数实现绘图。 paintEvent事件在改变窗口大小、移动窗口、手动调用update等情形下会被调用。需先了解下绘图该函数的用法。 - QPainter::drawTiledPixmap(int x, int y, int w, int h, const QPi…...
毕业设计:基于SpringBoot+Vue大学生租房平台 (源码)
目录 一、项目背景 二、技术介绍 三、功能介绍 四、代码设计 五、系统实现 一、项目背景 近年来,随着我国高等教育事业的持续发展,在校大学生及刚步入社会的毕业生数量逐年攀升。据统计,2024年全国高校毕业生规模已突破1100万人&#x…...
Nihonga风格AI生成稀缺资源包泄露:含17世纪狩野派笔触扫描集、200+古籍《本朝画史》描述性Prompt语料库、及唯一通过日本文化厅AI伦理审查的商用授权协议范本
更多请点击: https://intelliparadigm.com 第一章:Nihonga风格AI生成资源包的伦理边界与文化权重 文化符号的不可压缩性 Nihonga(日本画)并非仅由矿物颜料、金箔或桑皮纸构成的技术集合,其内嵌着神道自然观、物哀美学…...
Cognize-Agent™空间智能体,98.5%故障预警准确率,终结非计划停机
Cognize-Agent™空间智能体,98.5%故障预警准确率,终结非计划停机工业制造领域,设备非计划停机始终是制约生产效率、拉高运维成本的核心痛点。传统设备运维依赖定期检修、事后抢修,依赖人工巡检与单一数据监测,无法提前…...
【JVM】面试题-有哪些垃圾回收器
【JVM】面试题-有哪些垃圾回收器 在JVM的内存管理中,垃圾收集算法是内存回收的核心逻辑与方法论,而垃圾收集器则是将这套方法论落地实现的具体工具。 不同的垃圾收集器针对JVM堆的不同分代(新生代、老年代)设计,具备不…...
全网没人敢说,关于中小企业AI营销一体机到底是卖硬件还是卖落地闭环的屎盆子,我先扣为敬。
[实话] 干这行十年,我拍着桌子定过一条死规矩。三个不做:不做只卖盒子不管结果的,不做签完合同就消失的,不做让你自己研究三个月才能用的。[实话] 现在的“AI营销一体机”,90%都是在收智商税。我见过太多老板ÿ…...
2026年国民技术数字IC笔试试卷带答案
满分:100分 时间:90分钟 一、单选题(每题3分,共30分) 1. 在静态时序分析(STA)中,建立时间检查的公式为( ) A. Tclk + Tskew ≥ Tck-q + Tlogic + Tsetup B. Tclk - Tskew ≥ Tck-q + Tlogic + Tsetup C. Tclk ≥ Tck-q + Tlogic - Tsetup D. Tlogic ≥ Tsetup + Tho…...
利用Taotoken的API兼容性将现有基于OpenAI的应用快速迁移上线
🚀 告别海外账号与网络限制!稳定直连全球优质大模型,限时半价接入中。 👉 点击领取海量免费额度 利用Taotoken的API兼容性将现有基于OpenAI的应用快速迁移上线 对于已经投入开发并依赖OpenAI官方API的应用,切换到新的…...
ARM CoreSight DAP-Lite调试架构与双协议切换技术
1. ARM CoreSight DAP-Lite技术架构解析作为ARM调试体系的核心组件,DAP-Lite(Debug Access Port Lite)是嵌入式系统开发中连接调试工具与片上资源的桥梁。我在实际芯片调试中发现,这个仅约2mm面积的IP模块,却能实现传统…...
离线AI教育工具开发实战:模型轻量化、边缘计算与五大应用场景
1. 项目概述:当AI导师走进离线课堂“每个学生都值得拥有一位AI导师”——这个想法听起来很美好,但在全球范围内,一个残酷的现实是:稳定、高速的网络连接并非理所当然。在许多乡村学校、资源匮乏的地区,甚至在城市里信号…...
Android系统开发避坑:为什么你改了config.xml,导航栏还是不显示?
Android系统导航栏显示失效的深度排查指南 当你熬夜修改了config.xml文件,满怀期待地刷入系统,却发现导航栏依然不见踪影——这种挫败感我太熟悉了。导航栏显示问题看似简单,实则涉及Android资源覆盖机制的复杂层级。本文将带你深入AOSP的底层…...
