二百七十一、Kettle——ClickHouse增量导入数据清洗记录表
一、目的
在完成错误数据表任务后,需要对每条错误数据的错误字段及其字段值进行分析

Hive中原有SQL语句和ClickHouse现有SQL语句很大不同
二、Hive中原有代码
2.1 表结构
--31、静态排队数据清洗记录表 create table if not exists hurys_db.dwd_data_clean_record_queue(id string comment '唯一ID',data_type int comment '1:转向比,2:统计,3:评价,4:区域,5:过车,6:静态排队,7:动态排队,8:轨迹,9:事件数据,10:事件资源',device_no string comment '设备编号',create_time string comment '创建时间',field_name string comment '字段名',field_value string comment '字段值' ) comment '静态排队数据清洗记录表' partitioned by (day string) stored as orc ;
2.2 SQL代码
with t3 as(
selectid,device_no,case when device_no is null then CONCAT('device_no:','null') END AS device_no_value,create_time,case when lane_no < 0 or lane_no >255 then CONCAT('lane_no:', CAST(lane_no AS STRING)) END AS lane_no_value,case when queue_len < 0 or queue_len > 500 then CONCAT('queue_len:', CAST(queue_len AS STRING)) END AS queue_len_value,case when queue_head < 0 or queue_head > 500 then CONCAT('queue_head:', CAST(queue_head AS STRING)) END AS queue_head_value,case when queue_tail < 0 or queue_tail > 500 then CONCAT('queue_tail:', CAST(queue_tail AS STRING)) END AS queue_tail_value,case when queue_count < 0 or queue_count > 100 then CONCAT('queue_count:', CAST(queue_count AS STRING)) END AS queue_count_value,concat_ws(',',case when device_no is null then CONCAT('device_no:','null') end ,case when lane_no < 0 or lane_no >255 then CONCAT('lane_no:', CAST(lane_no AS STRING)) END ,case when queue_len < 0 or queue_len > 500 then CONCAT('queue_len:', CAST(queue_len AS STRING)) END,case when queue_head < 0 or queue_head > 500 then CONCAT('queue_head:', CAST(queue_head AS STRING)) END,case when queue_tail < 0 or queue_tail > 500 then CONCAT('queue_tail:', CAST(queue_tail AS STRING)) END,case when queue_count < 0 or queue_count > 100 then CONCAT('queue_count:', CAST(queue_count AS STRING)) END) AS kv_pairs ,day
from hurys_db.dwd_queue_errorwhere day='2024-09-10'
)
insert overwrite table hurys_db.dwd_data_clean_record_queue partition(day)
selectid,'6' data_type,t3.device_no,create_time,split(pair, ':')[0] AS field_name,split(pair, ':')[1] AS field_value,day
from t3
lateral view explode(split(t3.kv_pairs , ',')) exploded_table AS pair
where device_no_value is not null or queue_len_value is not null or lane_no_value is not null
or queue_head_value is not null or queue_tail_value is not null or queue_count_value is not null
;
三、ClickHouse中现有代码
3.1 表结构
--31、静态排队数据清洗记录表(长期存储)
create table if not exists hurys_jw.dwd_data_clean_record_queue(id String comment '唯一ID',data_type Nullable(Int32) comment '1:转向比,2:统计,3:评价,4:区域,5:过车,6:静态排队,7:动态排队,8:轨迹,9:事件数据,10:事件资源',device_no Nullable(String) comment '设备编号',create_time DateTime comment '创建时间',field_name Nullable(String) comment '字段名',field_value Nullable(String) comment '字段值',day Date comment '日期'
)
ENGINE = MergeTree
PARTITION BY day
PRIMARY KEY (day,id)
ORDER BY (day,id)
SETTINGS index_granularity = 8192;
3.2 SQL代码
SELECTid,'6' AS data_type,device_no,create_time,splitByString(':', pair)[1] AS field_name,splitByString(':', pair)[2] AS field_value,day
FROM (SELECTid,device_no,create_time,day,arrayConcat(if(device_no IS NULL, ['device_no:null'], []),if(lane_no < 0 OR lane_no > 255, [concat('lane_no:', toString(lane_no))], []),if(queue_len < 0 OR queue_len > 500, [concat('queue_len:', toString(queue_len))], []),if(queue_head < 0 OR queue_head > 500, [concat('queue_head:', toString(queue_head))], []),if(queue_tail < 0 OR queue_tail > 500, [concat('queue_tail:', toString(queue_tail))], []),if(queue_count < 0 OR queue_count > 100, [concat('queue_count:', toString(queue_count))], [])) AS pairsFROM hurys_jw.dwd_queue_errorWHERE device_no IS NULL ORlane_no < 0 OR lane_no > 255 OR queue_len < 0 OR queue_len > 500 ORqueue_head < 0 OR queue_head > 500 OR queue_tail < 0 OR queue_tail > 500 ORqueue_count < 0 OR queue_count > 100
) AS subquery
array join pairs AS pair
;
注意:1、错误数据表dwd_queue_error的清洗字段不能设置nullable,这是一大坑
2、如果错误数据表中的清洗字段是Decimal(10,1),那么相关字段就要调整
arrayConcat(if(device_no IS NULL, ['device_no:null'], []),if(lane_no < 0 OR lane_no > 255, [concat('lane_no:', toString(lane_no))], []),if(azimuth < 0 OR azimuth > toDecimal32(359.9,1), [concat('azimuth:', toString(azimuth))], []),if(rcs < -64 OR rcs > toDecimal32(63.5,1), [concat('rcs:', toString(rcs))], []),if(prob < 0 OR prob > 100, [concat('prob:', toString(prob))], [])
) AS pairs
3.3 Kettle任务
3.3.1 newtime

3.3.2 替换NULL值

3.3.3 clickhouse输入

3.3.4 字段选择

3.3.5 clickhouse输出

3.3.6 执行任务

3.3.7 海豚调度

由于不需要实时记录,因为把所有数据的清洗记录任务放在一个海豚工作流里面,T+1执行即可!
相关文章:
二百七十一、Kettle——ClickHouse增量导入数据清洗记录表
一、目的 在完成错误数据表任务后,需要对每条错误数据的错误字段及其字段值进行分析 Hive中原有SQL语句和ClickHouse现有SQL语句很大不同 二、Hive中原有代码 2.1 表结构 --31、静态排队数据清洗记录表 create table if not exists hurys_db.dwd_data_clean_…...
为什么说Tcp是面向字节流的以及(Tcp粘包问题、TCP/UDP对比、listen函数的backlog参数的意义)
为什么说Tcp是面向字节流的: Tcp通信的本质是创建一个tcp的socket,同时就会对应的创建一个发送缓冲区和接收缓冲区。 调用write时, 数据会先写入发送缓冲区中;如果发送的字节数太长, 会被拆分成多个TCP的数据包发出如果发送的字节数太短, 就会先在缓冲…...
Flink PostgreSQL CDC源码解读:深入理解数据流同步
目录 一、PostgreSQL的数据捕获和复制机制 二、WAL日志格式 三、Debezium部署架构 3.1 Kafka Connect With Debezium 3.2 Debezium Server 编辑3.3 作为嵌入式引擎 四、Flink Postgres CDC源码解读 4.1. 如何捕捉数据和更新快照 4.2. 捕获的数据怎么从Postgres SQL…...
系统架构设计师 软件架构的定义与生命周期
软件架构的定义 通过一系列的设计活动,以满足系统的功能性需求和符合一定的非功能性需求与质量属性有相似含义的软件系统框架模式。在软件体系结构设计过程中,主要考虑的是系统的非功能性需求 软件体系结构设计经验的总结与重用是软件工程的重要目标之一…...
从零开始使用Surya-OCR最新版本0.6.1——最强文本检测模型:新添表单表格检测识别
目录 一、更新概述 二、环境安装 1.基础环境配置 2.模型参数下载 3.参数地址配置——settings.py 三、指令使用 1.命令指令运行 一、更新概述 surya项目Github地址:https://github.com/VikParuchuri/surya 号称今年最强OCR的surya近期迎来新的更新,Vik…...
linux中级wed服务器(https搭建加密服务器)
一。非对称加密算法: 公钥:公共密钥,开放 私钥:私有密钥,保密 1.发送方用自己的公钥加密,接受方用发送方的私钥解密:不可行 2.发送方用接受方的公钥加密,接受方用自己的私钥解密…...
聊一聊为什么企业数字化转型总是三天热度
听到“数字化转型”,是不是脑子里立马蹦出各种炫酷词汇:AI、大数据、物联网、区块链……瞬间觉得公司马上就要起飞?可惜,现实往往是:转型刚刚起步时大家热血沸腾,结果没过多久一哄而散。最终,这…...
2025年NPDP产品经理认证考试时间和报考条件
在报考2025年NPDP认证考试前,了解NPDP相关考试信息是非常重要的,可以帮助我们更好地制定备考计划,提高学习效率。 NPDP考试时间 NPDP考试每年举办两次,分别在5月和11月进行,且考试一般安排在周末,以便在职的专业人士…...
微信小程序文字转语音播报案例
插件申请 在小程序官方申请同声传译插件,地址: mp.weixin.qq.com 引入插件 在app.json中加入 "plugins": {"WechatSI": {"version": "0.3.6","provider": "wx069ba97219f66d99"}},封装…...
QT SSDP 局域网检测支持扫描通信
一. 什么是SSDP? 简单服务发现协议(SSDP,Simple Service Discovery Protocol)是一种应用层协议,简单服务发现协议是在HTTPU和HTTPMU的基础上实现的协议。简单服务发现协议(SSDP)提供了在局域网里面发现设备的机制。客户端可以通过使用SSDP,根据自己的需要,在局域网查找特…...
python_学习2(仅为本人学习记录)
二、变量与字符串 1、变量的声明和赋值 a.变量在使用前必须要先赋值 b.删除变量,可以通过del语句删除。 a123 del a c.链式赋值 xy123 相当于 x123;y123 d.解包赋值 a,b,c1,2,3 相当于 a1 b2 c3 使用解包赋值给变量交换值:a,b3,4 a,bb,a 2、基本…...
手动将python的flask程序打包成exe在windows上执行
1、安装pyinstaller工具 (venv) PS D:\django\locallibrary> pip install pyinstaller Collecting pyinstallerDownloading pyinstaller-6.11.0-py3-none-win_amd64.whl.metadata (8.4 kB) Requirement already satisfied: setuptools>42.0.0 in d:\django\locallibrary…...
老生常谈,MySQL事务隔离级别
在 MySQL 关系型数据库中,事务隔离级别主要有以下四种: 1)读未提交(READ UNCOMMITTED): 这是最低的隔离级别,在该级别下,一个事务可以看到另一个事务尚未提交的数据修改。这可能会…...
百度翻译以及另外三款翻译工具推荐!!!
在这个全球化的时代,翻译工具已经成为我们生活中不可或缺的一部分。我们需要使用翻译工具来克服语言障碍,无论是出国旅行、商务谈判还是学术研究。那么,市场上有各种各样的翻译工具。有哪些好用的在线翻译软件呢?别担心࿰…...
Redis JSON介绍和命令大全
Redis JSON介绍和命令大全 Redis JSON先说说JSON是什么再说说JSON Path先推荐两个网站JSONPath JAVA clents Redis JSON 安装内存json命令语法命令url命令解释JSON.ARRAPPENDJSON.ARRINDEXJSON.ARRINSERTJSON.ARRLENJSON.ARRPOPJSON.ARRTRIMJSON.CLEARJSON.DEBUG MEMORYJSON.DE…...
yolo自动化项目实例解析(八)自建UI-键鼠录制回放
项目中关于键鼠的操作,不像我们之前自动化那样一步一步去定义的,而是用C写了一个记录键鼠的操作,通过回放的方法来实现的 一、通讯系统 1、创建websocket服务器 首先通过事件循环asyncio 和websockets,创建一个持久化的服务端进程…...
C++ 面向对象知识汇总(超详细)
学习交流:0voice GitHub 1.什么是类? 在C中,类(Class) 是一种用户定义的数据类型,用来描述具有相同特征和行为的一组对象。类是面向对象编程(OOP)的核心概念,它通过将…...
stm32使用SIM900A模块实现MQTT对接远程服务器
SIM900A模块是一种GSM/GPRS无线通信模块,它可以通过SIM卡连接移动通信网络,并通过串口或USB接口与微控制器或计算机进行通信。 SIM900A驱动代码如下: #include "stm32f10x.h" #include "stdio.h" #include "stdlib.h" #include "sim900a…...
MATLAB Simulink (一)直接序列扩频通信系统
MATLAB & Simulink (一)直接序列扩频通信系统 写在前面1 系统原理1.1 扩频通信系统理论基础1.1.1 基本原理1.1.2 扩频通信系统处理增益和干扰容限1.1.3 各种干扰模式下抗干扰性能 1.2 直接序列扩频通信系统理论基础1.2.1 基本原理1.2.2 物理模型 2 方…...
标准数字隔离器主要特性和应用---腾恩科技
在现代电子系统中,不同电路部分之间需要可靠的隔离,尤其是在高压环境或必须保持敏感信号完整性的情况下。一种这样的解决方案是使用标准数字隔离器。这些组件在电路的不同部分之间提供电气隔离,确保安全、降噪和可靠的信号传输。本文深入探讨…...
Java 语言特性(面试系列1)
一、面向对象编程 1. 封装(Encapsulation) 定义:将数据(属性)和操作数据的方法绑定在一起,通过访问控制符(private、protected、public)隐藏内部实现细节。示例: public …...
.Net框架,除了EF还有很多很多......
文章目录 1. 引言2. Dapper2.1 概述与设计原理2.2 核心功能与代码示例基本查询多映射查询存储过程调用 2.3 性能优化原理2.4 适用场景 3. NHibernate3.1 概述与架构设计3.2 映射配置示例Fluent映射XML映射 3.3 查询示例HQL查询Criteria APILINQ提供程序 3.4 高级特性3.5 适用场…...
Debian系统简介
目录 Debian系统介绍 Debian版本介绍 Debian软件源介绍 软件包管理工具dpkg dpkg核心指令详解 安装软件包 卸载软件包 查询软件包状态 验证软件包完整性 手动处理依赖关系 dpkg vs apt Debian系统介绍 Debian 和 Ubuntu 都是基于 Debian内核 的 Linux 发行版ÿ…...
循环冗余码校验CRC码 算法步骤+详细实例计算
通信过程:(白话解释) 我们将原始待发送的消息称为 M M M,依据发送接收消息双方约定的生成多项式 G ( x ) G(x) G(x)(意思就是 G ( x ) G(x) G(x) 是已知的)࿰…...
NFT模式:数字资产确权与链游经济系统构建
NFT模式:数字资产确权与链游经济系统构建 ——从技术架构到可持续生态的范式革命 一、确权技术革新:构建可信数字资产基石 1. 区块链底层架构的进化 跨链互操作协议:基于LayerZero协议实现以太坊、Solana等公链资产互通,通过零知…...
群晖NAS如何在虚拟机创建飞牛NAS
套件中心下载安装Virtual Machine Manager 创建虚拟机 配置虚拟机 飞牛官网下载 https://iso.liveupdate.fnnas.com/x86_64/trim/fnos-0.9.2-863.iso 群晖NAS如何在虚拟机创建飞牛NAS - 个人信息分享...
渗透实战PortSwigger靶场:lab13存储型DOM XSS详解
进来是需要留言的,先用做简单的 html 标签测试 发现面的</h1>不见了 数据包中找到了一个loadCommentsWithVulnerableEscapeHtml.js 他是把用户输入的<>进行 html 编码,输入的<>当成字符串处理回显到页面中,看来只是把用户输…...
ZYNQ学习记录FPGA(一)ZYNQ简介
一、知识准备 1.一些术语,缩写和概念: 1)ZYNQ全称:ZYNQ7000 All Pgrammable SoC 2)SoC:system on chips(片上系统),对比集成电路的SoB(system on board) 3)ARM:处理器…...
Linux 内存管理调试分析:ftrace、perf、crash 的系统化使用
Linux 内存管理调试分析:ftrace、perf、crash 的系统化使用 Linux 内核内存管理是构成整个内核性能和系统稳定性的基础,但这一子系统结构复杂,常常有设置失败、性能展示不良、OOM 杀进程等问题。要分析这些问题,需要一套工具化、…...
持续交付的进化:从DevOps到AI驱动的IT新动能
文章目录 一、持续交付的本质:从手动到自动的交付飞跃关键特性案例:电商平台的高效部署 二、持续交付的演进:从CI到AI驱动的未来发展历程 中国…...
