Flink SQL 基于Update流出现空值无法过滤问题
问题背景
- 问题描述
基于Flink-CDC ,Flink SQL的实时计算作业在运行一段时间后,突然发现插入数据库的计算结果发生部分主键属性发生失败,导致后续计算结果无法插入,
超过失败次数失败的情况
- 问题报错
Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO dm_hljy.dws_table_name (op_date, school_year, campus_name, school_name, depart_name, total_opfare, ids, update_time) VALUES ('2024-03-11 00:00:00+08', '2023', 'xxxx', 'xxxx学校', 'xxxx小学部', '203333300000', '57', '2024-03-21 09:31:08.47+08') ON DUPLICATE KEY UPDATE school_year=VALUES(school_year), total_opfare=VALUES(total_opfare), ids=VALUES(ids), update_time=VALUES(update_time) was aborted: ERROR: dn_6007_6008: null value in column "depart_name" violates not-null constraint Call getNextException to see other errors in the batch.at com.huawei.gauss200.jdbc.jdbc.BatchResultHandler.handleCompletion(BatchResultHandler.java:171) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.executeBatch(QueryExecutorImpl.java:586) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.jdbc.PgStatement.executeBatch(PgStatement.java:883) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1580) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:266) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:236) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.lambda$open$0(JdbcOutputFormat.java:159) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_332]at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[?:1.8.0_332]at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_332]at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[?:1.8.0_332]at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_332]at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_332]... 1 moreCaused by: com.huawei.gauss200.jdbc.util.PSQLException: ERROR: dn_6007_6008: null value in column "depart_name" violates not-null constraintat com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2856) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2587) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.executeBatch(QueryExecutorImpl.java:575) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.jdbc.PgStatement.executeBatch(PgStatement.java:883) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1580) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:266) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:236) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.lambda$open$0(JdbcOutputFormat.java:159) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_332]at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[?:1.8.0_332]at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_332]at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[?:1.8.0_332]at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_332]at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_332
定位
定位思路
1.方向一:怀疑数据库插入存在数据处理时,造成数据处理出现空值的情况,即数据本身不为空,但是数据插入却出现了空
2.方向二:Flink-SQL在消费kafka数据时存在了空值,故加工的数据计算结果存在空值
定位过程
- 因插入数据库定位比较麻烦,且数据库已经设置该字段为主属性,故出现插入时处理为空值的概率较小。故先从较为简单的Flink SQL查询数据
- 定位方法一,查询该字段为空的记录,待作业执行完成后,未查询到空值对应记录
select select * from table_name where depart_name is null or depart_name = '' or char_length(depart_name) = 0;
- 因考虑到使用Flink-CDC进行变更数据捕获,故对应的update流存在-U,+U,-D,+I记录,因此随着插入记录存在空值被记录进去的情况,故采用view的方式,先将宽表的加工、关联方式创建为view,然后进行空值的过滤。实施如下
create view view_prd as
select a.* ,b.* from a join b on a.id = b.idselect * from view_prd where depart_name is null or depart_name = '' or char_length(depart_name) = 0;
- 通过查询结果,发现存在最后一条记录存在空值的原因,往源头定位,发现该字段之前为空,后面进行更新填充到值出现-U记录,导致数据插入持续失败
原因
- 因为flink-SQL消费的数据时kafka topic,flink以upsert-kafka形式的connector进行写入,故存在changelog 流中数据更新存在-U,+U的记录(按照Key进行区分唯一条记录),value 为空(-U)的记录kafka也,导致出现空值,
解决
通过在DWS宽表创建一层View(如上),在写入DWS宽表的kafka topic之前,现将该字段空值过滤,即可排除空值涉及记录被纳入结果指标计算的范围中
相关文章:

Flink SQL 基于Update流出现空值无法过滤问题
问题背景 问题描述 基于Flink-CDC ,Flink SQL的实时计算作业在运行一段时间后,突然发现插入数据库的计算结果发生部分主键属性发生失败,导致后续计算结果无法插入, 超过失败次数失败的情况问题报错 Caused by: java.sql.BatchUp…...

git-怎样把连续的多个commit合并成一个?
Git怎样把连续的多个commit合并成一个? Git怎样把连续的多个commit合并成一个? 参考URL: https://www.jianshu.com/p/5b4054b5b29e 查看git日志 git log --graph比如下图的commit 历史,想要把bai “Second change” 和 “Third change” 这…...

2024年2月游戏手柄线上电商(京东天猫淘宝)综合热销排行榜
鲸参谋监测的线上电商(京东天猫淘宝)游戏手柄品牌销售数据已出炉!2月游戏手柄销售数据呈现出强劲的增长势头。 根据鲸参谋数据显示,今年2月游戏手柄月销售量累计约43万件,同比去年上涨了78%;销售额累计达1…...
Sass5分钟速通基础语法
前言 近来在项目中使用sass,想着学习一下,但官方写的教程太冗杂,所以就有了本文速通Sass的基础语法 Sass 是 CSS 的一种预编译语言。它提供了 变量(variables)、嵌套规则(nested rules)、 混合(mixins) 等…...

百度蜘蛛池平台在线发外链-原理以及搭建教程
蜘蛛池平台是一款非常实用的SEO优化工具,它可以帮助网站管理员提高网站的排名和流量。百度蜘蛛池原理是基于百度搜索引擎的搜索算法,通过对网页的内容、结构、链接等方面进行分析和评估,从而判断网页的质量和重要性,从而对网页进行…...
Android_ android使用原生蓝牙协议_连接设备以后,给设备发送指令触发数据传输---Android原生开发工作笔记167
之前通过蓝牙连接设备的时候,直接就是连接上蓝牙以后,设备会自动发送数据,有数据的时候,会自动发送,但是,有一个设备就不会,奇怪了很久,设备启动了也连接上了,但是就是没有数据过来. 是因为,这个设备有几种模式是握力球,在设备连接到蓝牙以后,需要,给设备通过蓝牙发送一个指令…...

【Java面试题】操作系统
文章目录 1.进程/线程/协程1.1辨别进程和线程的异同1.2优缺点1.2.1进程1.2.2线程 1.3进程/线程之间通信的方法1.3.1进程之间通信的方法1.3.2线程之间通信的方法 1.4什么是线程上下文切换1.5协程1.5.1协程的定义?1.5.2使用协程的原因?1.5.3协程的优缺点&a…...

SQLite数据库成为内存中数据库(三)
返回:SQLite—系列文章目录 上一篇:SQLite使用的临时文件(二) 下一篇:SQLite中的原子提交(四) SQLite数据库通常存储在单个普通磁盘中文件。但是,在某些情况下,数据库可能…...

多张图片怎么合成一张gif?快来试试这个方法
将多张图片合成一张gif动图是现在常见的图像处理的方式,适合制作一些简单的动态图片。通过使用在线图片合成网站制作的gif动图不仅体积小画面丰富,画质还很清晰。不需要下载任何软件小白也能轻松上手,支持上传jpg、png以及gif格式图片&#x…...

爬取b站音频和视频数据,未合成一个视频
一、首先找到含有音频和视频的url地址 打开一个视频,刷新后,找到这个包,里面有我们所需要的数据 访问这个数据包后,获取字符串数据,用正则提取,再转为json字符串方便提取。 二、获得标题和音频数据后&…...

mysql进阶知识总结
1.存储引擎 1.1MySQL体系结构 1).连接层 最上层是一些客户端和链接服务,包含本地sock通信和大多数基于客户端/服务端工具实现的类似于TCP/IP的通信。主要完成一些类似于连接处理、授权认证、及相关的安全方案。在该层上引入了线程池的概念,为通过认证…...

量化交易入门(二十五)什么是RSI,原理和炒股实操
前面我们了解了KDJ,MACD,MTM三个技术指标,也进行了回测,结果有好有坏,今天我们来学习第四个指标RSI。RSI指标全称是相对强弱指标(Relative Strength Index),是通过比较一段时期内的平均收盘涨数和平均收盘跌数来分析市…...

快速上手Spring Cloud 九:服务间通信与消息队列
快速上手Spring Cloud 一:Spring Cloud 简介 快速上手Spring Cloud 二:核心组件解析 快速上手Spring Cloud 三:API网关深入探索与实战应用 快速上手Spring Cloud 四:微服务治理与安全 快速上手Spring Cloud 五:Spring …...
python——遍历网卡并禁用/启用
一、遍历网卡 注意:只能遍历到启用状态的网卡,如果网卡是禁止状态,则遍历不到!!! import os import time import psutil import loggingdef get_multi_physical_network_card():physical_nic_list []try:…...
初识 51
keil的使用: 具体细节请移步我上一篇博客:创建第一个51文件-CSDN博客 hex -- 汇编语言实现的文件 -- 直接与单片机对接的文件 单片机 -- 一个集成电脑芯片 单片机开发版 -- 基于单片机的集成电路 stc 89 c52RC / RD 系列单片机 命名规则: 89 -- 版本号? C --…...

【回溯与邻里交换】纸牌三角
1.回溯算法 旋转有3种可能,镜像有2种 所以最后次数:counts/3/2 #include<iostream> using namespace std;int num[9]; int counts0; bool bools[9];//默认为false int dfs(int step){if(step9){//索引 if((num[0]num[1]num[2]num[3]num[3]num[4]n…...

微服务(基础篇-004-Feign)
目录 http客户端Feign Feign替代RestTemplate(1) Feign的介绍(1.1) 使用Feign的步骤(1.2) 自定义配置(2) 配置Feign日志的两种方式(2.1) Feign使用优化…...

Linux IRC
目录 入侵框架检测 检测流程图 账号安全 查找账号中的危险信息 查看保存的历史命令 检测异常端口 入侵框架检测 1、系统安全检查(进程、开放端口、连接、日志) 这一块是目前个人该脚本所实现的功能 2、Rootkit 建议使用rootkit专杀工具来检查&#…...

五、Elasticsearch 集成
目录 5.1 Spring Data 框架集成5.1.1 Spring Data 框架介绍5.1.2 Spring Data Elasticsearch 介绍5.1.3 Spring Data Elasticsearch 版本对比5.1.4 集成步骤 5.1 Spring Data 框架集成 5.1.1 Spring Data 框架介绍 Spring Data 是一个用于简化数据库开发的开源框架。其主要目…...

Qt 完成图片的缩放拖动
1. 事件和函数 主要使用事件paintEvent(QPaintEvent *event)和drawTiledPixmap函数实现绘图。 paintEvent事件在改变窗口大小、移动窗口、手动调用update等情形下会被调用。需先了解下绘图该函数的用法。 - QPainter::drawTiledPixmap(int x, int y, int w, int h, const QPi…...

IDEA运行Tomcat出现乱码问题解决汇总
最近正值期末周,有很多同学在写期末Java web作业时,运行tomcat出现乱码问题,经过多次解决与研究,我做了如下整理: 原因: IDEA本身编码与tomcat的编码与Windows编码不同导致,Windows 系统控制台…...

深入浅出Asp.Net Core MVC应用开发系列-AspNetCore中的日志记录
ASP.NET Core 是一个跨平台的开源框架,用于在 Windows、macOS 或 Linux 上生成基于云的新式 Web 应用。 ASP.NET Core 中的日志记录 .NET 通过 ILogger API 支持高性能结构化日志记录,以帮助监视应用程序行为和诊断问题。 可以通过配置不同的记录提供程…...

Linux 文件类型,目录与路径,文件与目录管理
文件类型 后面的字符表示文件类型标志 普通文件:-(纯文本文件,二进制文件,数据格式文件) 如文本文件、图片、程序文件等。 目录文件:d(directory) 用来存放其他文件或子目录。 设备…...
【ROS】Nav2源码之nav2_behavior_tree-行为树节点列表
1、行为树节点分类 在 Nav2(Navigation2)的行为树框架中,行为树节点插件按照功能分为 Action(动作节点)、Condition(条件节点)、Control(控制节点) 和 Decorator(装饰节点) 四类。 1.1 动作节点 Action 执行具体的机器人操作或任务,直接与硬件、传感器或外部系统…...

React19源码系列之 事件插件系统
事件类别 事件类型 定义 文档 Event Event 接口表示在 EventTarget 上出现的事件。 Event - Web API | MDN UIEvent UIEvent 接口表示简单的用户界面事件。 UIEvent - Web API | MDN KeyboardEvent KeyboardEvent 对象描述了用户与键盘的交互。 KeyboardEvent - Web…...

Python爬虫(一):爬虫伪装
一、网站防爬机制概述 在当今互联网环境中,具有一定规模或盈利性质的网站几乎都实施了各种防爬措施。这些措施主要分为两大类: 身份验证机制:直接将未经授权的爬虫阻挡在外反爬技术体系:通过各种技术手段增加爬虫获取数据的难度…...

uniapp微信小程序视频实时流+pc端预览方案
方案类型技术实现是否免费优点缺点适用场景延迟范围开发复杂度WebSocket图片帧定时拍照Base64传输✅ 完全免费无需服务器 纯前端实现高延迟高流量 帧率极低个人demo测试 超低频监控500ms-2s⭐⭐RTMP推流TRTC/即构SDK推流❌ 付费方案 (部分有免费额度&#x…...

涂鸦T5AI手搓语音、emoji、otto机器人从入门到实战
“🤖手搓TuyaAI语音指令 😍秒变表情包大师,让萌系Otto机器人🔥玩出智能新花样!开整!” 🤖 Otto机器人 → 直接点明主体 手搓TuyaAI语音 → 强调 自主编程/自定义 语音控制(TuyaAI…...
在鸿蒙HarmonyOS 5中使用DevEco Studio实现录音机应用
1. 项目配置与权限设置 1.1 配置module.json5 {"module": {"requestPermissions": [{"name": "ohos.permission.MICROPHONE","reason": "录音需要麦克风权限"},{"name": "ohos.permission.WRITE…...

【论文阅读28】-CNN-BiLSTM-Attention-(2024)
本文把滑坡位移序列拆开、筛优质因子,再用 CNN-BiLSTM-Attention 来动态预测每个子序列,最后重构出总位移,预测效果超越传统模型。 文章目录 1 引言2 方法2.1 位移时间序列加性模型2.2 变分模态分解 (VMD) 具体步骤2.3.1 样本熵(S…...