当前位置: 首页 > news >正文

【Spark】Spark的DataFrame向Impala写入数据异常及源码解析

背景

事情是这样的,当前业务有一个场景: 从业务库的Mysql抽取数据到Hive
由于运行环境的网络限制,当前选择的方案:
使用spark抽取业务库的数据表,然后利用impala jdbc数据灌输到hive。(没有spark on hive 的条件)

问题

结果就出现问题了:
报错信息如下:

java.sql.SQLFeatureNotSupportedException: [Cloudera][JDBC](10220) Driver does not support this optional feature.at com.cloudera.impala.exceptions.ExceptionConverter.toSQLException(Unknown Source)at com.cloudera.impala.jdbc.common.SPreparedStatement.checkTypeSupported(Unknown Source)at com.cloudera.impala.jdbc.common.SPreparedStatement.setNull(Unknown Source)at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:658)at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)at org.apache.spark.scheduler.Task.run(Task.scala:121)at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)
23/03/04 23:24:51 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): java.sql.SQLFeatureNotSupportedException: [Cloudera][JDBC](10220) Driver does not support this optional feature.at com.cloudera.impala.exceptions.ExceptionConverter.toSQLException(Unknown Source)at com.cloudera.impala.jdbc.common.SPreparedStatement.checkTypeSupported(Unknown Source)at com.cloudera.impala.jdbc.common.SPreparedStatement.setNull(Unknown Source)at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:658)at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)at org.apache.spark.scheduler.Task.run(Task.scala:121)at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)

问题溯源

sparkmysql中读出来的数据中,存在字段有string的类型。
这个类型在使用DataFrame.write.jdbc()通过impala jdbcHive中写数据的时候,如果没有创建Impalajdbc Dialect的时候,此时这个String的类型,会被转换成
在这里插入图片描述
源自 org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
java.sql.Types.ClOB类型,戳进这个变量。可以看到它代表的值
在这里插入图片描述
接着,我们找到impala jdbccom.cloudera.impala.jdbc.common.SPreparedStatement#checkTypeSupported
方法,发现这个列表里面没有2005所以,程序代码会报错。
在这里插入图片描述
对应的数字编码:
com.cloudera.impala.dsi.dataengine.utilities.TypeUtilities#sqlTypeToString

    public static String sqlTypeToString(short var0) {switch(var0) {case -11:return "SQL_GUID";case -10:return "SQL_WLONGVARCHAR";case -9:return "SQL_WVARCHAR";case -8:return "SQL_WCHAR";case -7:return "SQL_BIT";case -6:return "SQL_TINYINT";case -5:return "SQL_BIGINT";case -4:return "SQL_LONGVARBINARY";case -3:return "SQL_VARBINARY";case -2:return "SQL_BINARY";case -1:return "SQL_LONGVARCHAR";case 0:return "NULL";case 1:return "SQL_CHAR";case 2:return "SQL_NUMERIC";case 3:return "SQL_DECIMAL";case 4:return "SQL_INTEGER";case 5:return "SQL_SMALLINT";case 6:return "SQL_FLOAT";case 7:return "SQL_REAL";case 8:return "SQL_DOUBLE";case 12:return "SQL_VARCHAR";case 16:return "SQL_BOOLEAN";case 91:return "SQL_TYPE_DATE";case 92:return "SQL_TYPE_TIME";case 93:return "SQL_TYPE_TIMESTAMP";case 101:return "SQL_INTERVAL_YEAR";case 102:return "SQL_INTERVAL_MONTH";case 103:return "SQL_INTERVAL_DAY";case 104:return "SQL_INTERVAL_HOUR";case 105:return "SQL_INTERVAL_MINUTE";case 106:return "SQL_INTERVAL_SECOND";case 107:return "SQL_INTERVAL_YEAR_TO_MONTH";case 108:return "SQL_INTERVAL_DAY_TO_HOUR";case 109:return "SQL_INTERVAL_DAY_TO_MINUTE";case 110:return "SQL_INTERVAL_DAY_TO_SECOND";case 111:return "SQL_INTERVAL_HOUR_TO_MINUTE";case 112:return "SQL_INTERVAL_HOUR_TO_SECOND";case 113:return "SQL_INTERVAL_MINUTE_TO_SECOND";case 2003:return "SQL_ARRAY";default:return null;}}

解决

我们在代码中添加一个这样的类:

import org.apache.spark.sql.jdbc.JdbcDialect;
import org.apache.spark.sql.jdbc.JdbcType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.StringType;
import scala.Option;import java.sql.Types;/*** @author wmh* @date 2021/1/12* impala的sql的方言,为了使impala sql能在spark中正确的执行*/
public class ImpalaDialect extends JdbcDialect {@Overridepublic boolean canHandle(String url) {return url.startsWith("jdbc:impala") || url.contains("impala");}@Overridepublic String quoteIdentifier(String colName) {return "`" + colName + "`";}@Overridepublic Option<DataType> getCatalystType(int sqlType, String typeName, int size, MetadataBuilder md) {return super.getCatalystType(sqlType, typeName, size, md);}@Overridepublic Option<JdbcType> getJDBCType(DataType dt) {if (dt instanceof StringType) {return Option.apply(new JdbcType("String", Types.VARCHAR));}return super.getJDBCType(dt);}
}

会出现这个问题:
在这里插入图片描述

	at com.cloudera.impala.hivecommon.api.HS2Client.executeStatementInternal(Unknown Source)at com.cloudera.impala.hivecommon.api.HS2Client.executeStatement(Unknown Source)at com.cloudera.impala.hivecommon.dataengine.HiveJDBCNativeQueryExecutor.executeHelper(Unknown Source)at com.cloudera.impala.hivecommon.dataengine.HiveJDBCNativeQueryExecutor.execute(Unknown Source)at com.cloudera.impala.jdbc.common.SPreparedStatement.executePreparedAnyBatch(Unknown Source)at com.cloudera.impala.jdbc.common.SPreparedStatement.executeBatch(Unknown Source)at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:667)at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)at org.apache.spark.scheduler.Task.run(Task.scala:121)at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
Caused by: com.cloudera.impala.support.exceptions.GeneralException: [Cloudera][ImpalaJDBCDriver](500051) ERROR processing query/statement. Error Code: 0, SQL state: TStatus(statusCode:ERROR_STATUS, sqlState:HY000, errorMessage:AnalysisException: Char size must be > 0: 0

上述问题解释一下:
注意最后一句:errorMessage:AnalysisException: Char size must be > 0: 0
是因为在DataFrame里面存在’'没有长度的空字符串,这样的空字符串会导致如上报错
因为在spark构建insert into xx table values(cast('' as char(0)) ,因为这个char(0)的数字不能等于0,所以会出现如上错误。所以字符串中不能为
‘’
源代码路径:impalajdbc41/2.6.4/impalajdbc41-2.6.4.jar!/com/cloudera/impala/impala/querytranslation/ImpalaInsertQueryGenerator.class
在这里插入图片描述
那么针对这个问题,我们要在impala的jdbc的参数上面加上一个UseNativeQuery=1, 即可解决该问题。
这个UseNativeQuery=1参数含义是:
在这里插入图片描述
上图来自impala jdbc的官方文档
我这里来翻译一下:
此属性指定驱动程序是否转换应用程序发出的查询。
1:驱动程序不会转换应用程序发出的查询,直接使用sql查询。
0:驱动程序将应用程序发出的查询转换为Impala SQL中的等效形式。

也就是说,如果查询sql本来就是impala查询sql,那么就不用进行转换了。

总结

如果有什么更好的方法,请在下方评论区留言,谢谢大哥们了!

相关文章:

【Spark】Spark的DataFrame向Impala写入数据异常及源码解析

背景 事情是这样的&#xff0c;当前业务有一个场景: 从业务库的Mysql抽取数据到Hive 由于运行环境的网络限制&#xff0c;当前选择的方案&#xff1a; 使用spark抽取业务库的数据表&#xff0c;然后利用impala jdbc数据灌输到hive。&#xff08;没有spark on hive 的条件&…...

学习笔记-架构的演进之限流-3月day03

文章目录前言限流的目标流量统计指标限流设计模式流量计数器模式滑动时间窗模式漏桶模式令牌桶模式分布式限流总结附前言 任何一个系统的运算、存储、网络资源都不是无限的&#xff0c;当系统资源不足以支撑外部超过预期的突发流量时&#xff0c;就应该要有取舍&#xff0c;建…...

动态规划 背包问题

动态规划 背包问题 问题描述&#xff1a; 有一个背包&#xff0c;总容量为12。有6件物品&#xff0c;每件物品的重量和价值不同&#xff0c;求在背包总容量12的前提下&#xff0c;装进物品的最大价值以及装进物品的编号 单个物品重量和价值&#xff1a; 为方便进行思考&#…...

C++ Primer Plus 学习笔记(四)—— 内存模型和名称空间

1 单独编译 C允许将组件函数放在独立的文件即头文件中&#xff0c;头文件中可以包含以下内容&#xff1a; 函数原型&#xff1b;使用#define或const定义的符号常量&#xff1b;结构声明&#xff1b;类声明&#xff1b;模板声明&#xff1b;内联函数。 注意&#xff0c;在包含…...

详解基于 Celestia、Eclipse 构建的首个Layer3 链 Nautilus Chain

以流支付为主要概念的Zebec生态&#xff0c;正在推动流支付这种新兴的支付方式向更远的方向发展&#xff0c;该生态最初以Zebec Protocol的形态发展&#xff0c;并从初期的Solana进一步拓展至BNB Chian以及Near上。与此同时&#xff0c;Zebec生态也在积极的寻求从协议形态向公链…...

列表与数组的转化

目录用np.array(a)将列表转换为数组列表转数组的特殊情况(一)列表转数组的特殊情况(二)针对子元素个数不一致的解决办法用a.tolist()函数将数组转化为列表在python的学习中&#xff0c;经常会用到数组与列表的相互转化&#xff0c;本文主要介绍下关于数组与列表转化的问题。用n…...

docker 运行花生壳实现内外网穿透

环境&#xff1a;centos 7 ,64位 1、创建一个指定的文件夹作为安装示例所用&#xff0c;该示例文件夹为“hsk-nwct”。“hsk-nwct”内创建“app”文件夹作为docker容器挂载出来的文件。 2、在“app”内下载花生壳linux安装包&#xff0c;下载花生壳应用&#xff1a;花生壳客户…...

操作系统——16.时间片轮转、优先级、多级反馈队列算法

这篇文章我们来看一下进程调度算法中的时间片轮转、优先级、多级反馈队列算法 目录 1.概述 2.时间片轮转调度算法&#xff08;RR&#xff0c;Round-Robin&#xff09; 3.优先级调度算法 4.多级反馈队列调度算法 5.分析对比 1.概述 首先&#xff0c;我们来看一下这篇文章…...

Python3.8.8-Django3.2-Redis-连接池-数据类型-字符串-list-hashmap-命令行操作

文章目录1.认识Redis1.1.优点1.2.缺点2.在Django中Redis的连接3.Redis的基础用法3.1.hashmap结构3.2.list结构4.命令行查看数据库5.作者答疑1.认识Redis Remote DIctionary Server(Redis) 是一个key-value 存储系统&#xff0c;是跨平台的非关系型数据库。是一个开源的使用 AN…...

Android kotlin 系列讲解(进阶篇)高级项目架构模式 - MVVM

<<返回总目录 1、MVVM是什么 MVVM是Model-View-ViewModel的缩写&#xff0c;是一种高级项目架构模式。 MVVM架构可以将程序结构主要分成三个部分&#xff1a; Model&#xff1a;数据模型部分&#xff0c;包括从服务端获取的json数据或者从本地获取的数据等等View&…...

8. 查找

1 题目描述 查找成绩10开启时间2021年09月24日 星期五 18:00折扣0.8折扣时间2021年11月15日 星期一 00:00允许迟交否关闭时间2021年11月23日 星期二 00:00 输入 n(n ≤ 10^6)个不超过 10^9的单调不减的&#xff08;就是后面的数字不小于前面的数字&#xff09;非负整数 &#…...

二分查找算法

感谢“五点七边”工作室的算法讲解&#xff0c;详细内容可以参考视频讲解 二分查找为什么总是写错&#xff1f;_哔哩哔哩_bilibili 此处仅是个人学习总结 以target等于5为例&#xff0c;输入: 1 2 3 5 5 5 8 9 1. 找到第一个 > target 的元素 判断条件 < target&am…...

Git(3)之远程服务器

Git基础之远程服务器 Author&#xff1a;onceday date&#xff1a;2023年3月5日 满满长路有人对你微笑过嘛… windows安装可参考文章&#xff1a;git简易配置_onceday_CSDN博客 參考文档&#xff1a; 《progit2.pdf》&#xff0c;Progit2 Github。《git-book.pdf》 文章目…...

Javalin解构

Javalin Javalin是一个轻量级http框架&#xff0c;我们可以很容易的了解请求的处理过程及其设计&#xff0c;具有较高的学习意义。 从demo说起 public static void main(String[] args) {Javalin app Javalin.create(config -> {System.out.println("用户配置"…...

yolov5算法,训练模型,模型检测

嘟嘟嘟嘟&#xff01;工作需要&#xff0c;所以学习了下yolov5算法。是干什么的呢&#xff1f; 通俗来说&#xff0c;可以将它看做是一个小孩儿&#xff0c;通过成年人&#xff08;开发人员&#xff09;提供的大量图片的学习&#xff0c;让自己知道我看到的哪些场景需要提醒给成…...

linux系统防火墙开放端口

linux系统防火墙开放端口 在外部访问CentOS中部署应用时&#xff0c;需要通过防火墙管理软件,开端口,或者直接关闭防火墙进行解决(不建议) 加粗样式 常用命令&#xff1a; systemctl start firewalld #启动 systemctl stop firewalld #停止 systemctl status firewalld #查看…...

CSAPP第九章 虚拟内存

理解虚拟内存的原因 本章前部分描述虚拟内存是如何工作的&#xff0c;后一部分描述应用程序如何使用和管理虚拟内存 物理和虚拟寻址 虚拟内存作为缓存的工具 页表 页命中 缺页 虚拟内存作为内存管理的工具 简化链接&#xff0c;简化加载&#xff0c;简化共享&#xff0c;简化…...

numpy数组与矩阵运算(二)

文章目录矩阵生成与常用操作矩阵生成矩阵转置查看矩阵特性矩阵乘法计算相关系数矩阵计算方差、协方差、标准差计算特征值与特征向量计算逆矩阵求解线性方程组奇异值分解函数向量化矩阵生成与常用操作 矩阵生成 扩展库numpy中提供的matrix()函数可以用来把列表、元组、range对…...

Dubbo 中 Zookeeper 注册中心原理分析

Dubbo 中 Zookeeper 注册中心原理分析 文章目录Dubbo 中 Zookeeper 注册中心原理分析一、ZooKeeper注册中心1.1 ZooKeeper数据结构1.2 ZooKeeper的Watcher机制1.3 ZooKeeper会话机制1.4 使用ZooKeeper作为注册中心二、源码分析2.1 AbstractRegistry2.2 FailbackRegistry2.2.1 核…...

素数产生新的算法(由筛法减法改为增加法)--哥德巴赫猜想的第一次实际应用

素数产生新的算法&#xff08;由筛法减法改为增加法&#xff09;--哥德巴赫猜想的第一次实际应用 摘要&#xff1a;长期以来&#xff0c;人们认为哥德巴赫猜想没有什么实际应用的。 现在&#xff0c;我假设这个不是猜想&#xff0c;而是定理或公理&#xff0c;就产生了新的应用…...

IDEA运行Tomcat出现乱码问题解决汇总

最近正值期末周&#xff0c;有很多同学在写期末Java web作业时&#xff0c;运行tomcat出现乱码问题&#xff0c;经过多次解决与研究&#xff0c;我做了如下整理&#xff1a; 原因&#xff1a; IDEA本身编码与tomcat的编码与Windows编码不同导致&#xff0c;Windows 系统控制台…...

K8S认证|CKS题库+答案| 11. AppArmor

目录 11. AppArmor 免费获取并激活 CKA_v1.31_模拟系统 题目 开始操作&#xff1a; 1&#xff09;、切换集群 2&#xff09;、切换节点 3&#xff09;、切换到 apparmor 的目录 4&#xff09;、执行 apparmor 策略模块 5&#xff09;、修改 pod 文件 6&#xff09;、…...

逻辑回归:给不确定性划界的分类大师

想象你是一名医生。面对患者的检查报告&#xff08;肿瘤大小、血液指标&#xff09;&#xff0c;你需要做出一个**决定性判断**&#xff1a;恶性还是良性&#xff1f;这种“非黑即白”的抉择&#xff0c;正是**逻辑回归&#xff08;Logistic Regression&#xff09;** 的战场&a…...

转转集团旗下首家二手多品类循环仓店“超级转转”开业

6月9日&#xff0c;国内领先的循环经济企业转转集团旗下首家二手多品类循环仓店“超级转转”正式开业。 转转集团创始人兼CEO黄炜、转转循环时尚发起人朱珠、转转集团COO兼红布林CEO胡伟琨、王府井集团副总裁祝捷等出席了开业剪彩仪式。 据「TMT星球」了解&#xff0c;“超级…...

ESP32 I2S音频总线学习笔记(四): INMP441采集音频并实时播放

简介 前面两期文章我们介绍了I2S的读取和写入&#xff0c;一个是通过INMP441麦克风模块采集音频&#xff0c;一个是通过PCM5102A模块播放音频&#xff0c;那如果我们将两者结合起来&#xff0c;将麦克风采集到的音频通过PCM5102A播放&#xff0c;是不是就可以做一个扩音器了呢…...

SpringCloudGateway 自定义局部过滤器

场景&#xff1a; 将所有请求转化为同一路径请求&#xff08;方便穿网配置&#xff09;在请求头内标识原来路径&#xff0c;然后在将请求分发给不同服务 AllToOneGatewayFilterFactory import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; impor…...

算法岗面试经验分享-大模型篇

文章目录 A 基础语言模型A.1 TransformerA.2 Bert B 大语言模型结构B.1 GPTB.2 LLamaB.3 ChatGLMB.4 Qwen C 大语言模型微调C.1 Fine-tuningC.2 Adapter-tuningC.3 Prefix-tuningC.4 P-tuningC.5 LoRA A 基础语言模型 A.1 Transformer &#xff08;1&#xff09;资源 论文&a…...

A2A JS SDK 完整教程:快速入门指南

目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库&#xff…...

Java编程之桥接模式

定义 桥接模式&#xff08;Bridge Pattern&#xff09;属于结构型设计模式&#xff0c;它的核心意图是将抽象部分与实现部分分离&#xff0c;使它们可以独立地变化。这种模式通过组合关系来替代继承关系&#xff0c;从而降低了抽象和实现这两个可变维度之间的耦合度。 用例子…...

纯 Java 项目(非 SpringBoot)集成 Mybatis-Plus 和 Mybatis-Plus-Join

纯 Java 项目&#xff08;非 SpringBoot&#xff09;集成 Mybatis-Plus 和 Mybatis-Plus-Join 1、依赖1.1、依赖版本1.2、pom.xml 2、代码2.1、SqlSession 构造器2.2、MybatisPlus代码生成器2.3、获取 config.yml 配置2.3.1、config.yml2.3.2、项目配置类 2.4、ftl 模板2.4.1、…...