当前位置: 首页 > news >正文

Dataset 的一些 Java api 操作

文章目录

      • 一、使用 Java API 和 JavaRDD<Row> 在 Spark SQL 中向数据帧添加新列
      • 二、foreachPartition 遍历 Dataset
      • 三、Dataset 自定义 Partitioner
      • 四、Dataset 重分区并且获取分区数

一、使用 Java API 和 JavaRDD 在 Spark SQL 中向数据帧添加新列

  在应用 mapPartition 函数后创建一个新的数据框:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;public class Handler implements Serializable {public void handler(Dataset<Row> sourceData) {Dataset<Row> rowDataset = sourceData.where("rowKey = 'abcdefg_123'").selectExpr("split(rowKey, '_')[0] as id","name","time").where("name = '小强'").orderBy(functions.col("id").asc(), functions.col("time").desc());FlatMapFunction<Iterator<Row>,Row> mapPartitonstoTime = rows->{Int count = 0; // 只能在每个分区内自增,不能保证全局自增String startTime = "";String endTime = "";List<Row> mappedRows=new ArrayList<Row>();while(rows.hasNext()){count++;Row next = rows.next();String id = next.getAs("id");if (count == 2) {startTime = next.getAs("time");endTime = next.getAs("time");}Row mappedRow= RowFactory.create(next.getString(0), next.getString(1), next.getString(2), endTime, startTime);mappedRows.add(mappedRow);}return mappedRows.iterator();};JavaRDD<Row> sensorDataDoubleRDD=rowDataset.toJavaRDD().mapPartitions(mapPartitonstoTime);StructType oldSchema=rowDataset.schema();StructType newSchema =oldSchema.add("startTime",DataTypes.StringType,false).add("endTime",DataTypes.StringType,false);System.out.println("The new schema is: ");newSchema.printTreeString();System.out.println("The old schema is: ");oldSchema.printTreeString();Dataset<Row> sensorDataDoubleDF=spark.createDataFrame(sensorDataDoubleRDD, newSchema);sensorDataDoubleDF.show(100, false);}
}

打印结果:

The new schema is: 
root|-- id: string (nullable = true)|-- name: string (nullable = true)|-- time: string (nullable = true)The old schema is: 
root|-- id: string (nullable = true)|-- name: string (nullable = true)|-- time: string (nullable = true)|-- startTime: string (nullable = true)|-- endTime: string (nullable = true)+-----------+---------+----------+----------+----------+
|id         |name     |time      |startTime |endTime   |
+-----------+---------+----------+----------+----------+
|abcdefg_123|xiaoqiang|1693462023|1693462023|1693462023|
|abcdefg_321|xiaoliu  |1693462028|1693462028|1693462028|
+-----------+---------+----------+----------+----------+

参考:
java - 使用 Java API 和 JavaRDD 在 Spark SQL 中向数据帧添加新列
java.util.Arrays$ArrayList cannot be cast to java.util.Iterator

二、foreachPartition 遍历 Dataset

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;import java.io.IOException;
import java.io.Serializable;
import java.util.Iterator;public class Handler implements Serializable {public void handler(Dataset<Row> sourceData) {JavaRDD<Row> dataRDD = rowDataset.toJavaRDD();dataRDD.foreachPartition(new VoidFunction<Iterator<Row>>() {@Overridepublic void call(Iterator<Row> rowIterator) throws Exception {while (rowIterator.hasNext()) {Row next = rowIterator.next();String id = next.getAs("id");if (id.equals("123")) {String startTime = next.getAs("time");// 其他业务逻辑}}}});// 转换为 lambda 表达式dataRDD.foreachPartition((VoidFunction<Iterator<Row>>) rowIterator -> {while (rowIterator.hasNext()) {Row next = rowIterator.next();String id = next.getAs("id");if (id.equals("123")) {String startTime = next.getAs("time");// 其他业务逻辑}}});}
}

三、Dataset 自定义 Partitioner

参考:spark 自定义 partitioner 分区 java 版

import org.apache.commons.collections.CollectionUtils;
import org.apache.spark.Partitioner;
import org.junit.Assert;import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** Created by lesly.lai on 2018/7/25.*/
public class CuxGroupPartitioner extends Partitioner {private int partitions;/*** map<key, partitionIndex>* 主要为了区分不同分区*/private Map<Object, Integer> hashCodePartitionIndexMap = new ConcurrentHashMap<>();public CuxGroupPartitioner(List<Object> groupList) {int size = groupList.size();this.partitions = size;initMap(partitions, groupList);}private void initMap(int size, List<Object> groupList) {Assert.assertTrue(CollectionUtils.isNotEmpty(groupList));for (int i=0; i<size; i++) {hashCodePartitionIndexMap.put(groupList.get(i), i);}}@Overridepublic int numPartitions() {return partitions;}@Overridepublic int getPartition(Object key) {return hashCodePartitionIndexMap.get(key);}public boolean equals(Object obj) {if (obj instanceof CuxGroupPartitioner) {return ((CuxGroupPartitioner) obj).partitions == partitions;}return false;}
}

查看分区分布情况工具类:
(1)Scala:

import org.apache.spark.sql.{Dataset, Row}/*** Created by lesly.lai on 2017/12FeeTask/25.*/
class SparkRddTaskInfo {def getTask(dataSet: Dataset[Row]) {val size = dataSet.rdd.partitions.lengthprintln(s"==> partition size: $size " )import scala.collection.Iteratorval showElements = (it: Iterator[Row]) => {val ns = it.toSeqimport org.apache.spark.TaskContextval pid = TaskContext.get.partitionIdprintln(s"[partition: $pid][size: ${ns.size}] ${ns.mkString(" ")}")}dataSet.foreachPartition(showElements)}
}

(2)Java:

import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;public class SparkRddTaskInfo {public static void getTask(Dataset<Row> dataSet) {int size = dataSet.rdd().partitions().length;System.out.println("==> partition size:" + size);JavaRDD<Row> dataRDD = dataSet.toJavaRDD();dataRDD.foreachPartition((VoidFunction<Iterator<Row>>) rowIterator -> {List<String> mappedRows = new ArrayList<String>();int count = 0;while (rowIterator.hasNext()) {Row next = rowIterator.next();String id = next.getAs("id");String partitionKey = next.getAs("partition_key");String name = next.getAs("name");mappedRows.add(id + "/" + partitionKey+ "/" + name);}int pid = TaskContext.get().partitionId();System.out.println("[partition: " + pid + "][size: " + mappedRows.size() + "]" + mappedRows);});}
}

调用方式:

import com.vip.spark.db.ConnectionInfos;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;import java.util.List;
import java.util.stream.Collectors;/*** Created by lesly.lai on 2018/7/23.*/
public class SparkSimpleTestPartition {public static void main(String[] args) throws InterruptedException {SparkSession sparkSession = SparkSession.builder().appName("Java Spark SQL basic example").getOrCreate();// 原始数据集Dataset<Row> originSet = sparkSession.read().jdbc(ConnectionInfos.TEST_MYSQL_CONNECTION_URL, "people", ConnectionInfos.getTestUserAndPasswordProperties());originSet.selectExpr("split(rowKey, '_')[0] as id","concat(split(rowKey, '_')[0],'_',split(rowKey, '_')[1]) as partition_key","split(rowKey, '_')[1] as name".createOrReplaceTempView("people");// 获取分区分布情况工具类SparkRddTaskInfo taskInfo = new SparkRddTaskInfo();Dataset<Row> groupSet = sparkSession.sql(" select partition_key from people group by partition_key");List<Object> groupList = groupSet.javaRDD().collect().stream().map(row -> row.getAs("partition_key")).collect(Collectors.toList());// 创建pairRDD 目前只有pairRdd支持自定义partitioner,所以需要先转成pairRddJavaPairRDD pairRDD = originSet.javaRDD().mapToPair(row -> {return new Tuple2(row.getAs("partition_key"), row);});// 指定自定义partitionerJavaRDD javaRdd = pairRDD.partitionBy(new CuxGroupPartitioner(groupList)).map(new Function<Tuple2<String, Row>, Row>(){@Overridepublic Row call(Tuple2<String, Row> v1) throws Exception {return v1._2;}});Dataset<Row> result = sparkSession.createDataFrame(javaRdd, originSet.schema());// 打印分区分布情况taskInfo.getTask(result);}
}

四、Dataset 重分区并且获取分区数

        System.out.println("1-->"+rowDataset.rdd().partitions().length);System.out.println("1-->"+rowDataset.rdd().getNumPartitions());Dataset<Row> hehe = rowDataset.coalesce(1);System.out.println("2-->"+hehe.rdd().partitions().length);System.out.println("2-->"+hehe.rdd().getNumPartitions());

运行结果:

1-->29
1-->29
2-->2
2-->2

注意:在使用 repartition() 时两次打印的结果相同:

print(rdd.getNumPartitions())
rdd.repartition(100)
print(rdd.getNumPartitions())

产生上述问题的原因有两个:
  首先 repartition() 是惰性求值操作,需要执行一个 action 操作才可以使其执行。
  其次,repartition() 操作会返回一个新的 rdd,并且新的 rdd 的分区已经修改为新的分区数,因此必须使用返回的 rdd,否则将仍在使用旧的分区。
  修改为:rdd2 = rdd.repartition(100)

参考:repartition() is not affecting RDD partition size

相关文章:

Dataset 的一些 Java api 操作

文章目录 一、使用 Java API 和 JavaRDD<Row> 在 Spark SQL 中向数据帧添加新列二、foreachPartition 遍历 Dataset三、Dataset 自定义 Partitioner四、Dataset 重分区并且获取分区数 一、使用 Java API 和 JavaRDD 在 Spark SQL 中向数据帧添加新列 在应用 mapPartition…...

Vue + Element UI 前端篇(十一):第三方图标库

Vue Element UI 实现权限管理系统 前端篇&#xff08;十一&#xff09;&#xff1a;第三方图标库 使用第三方图标库 用过Elment的同鞋都知道&#xff0c;Element UI提供的字体图符少之又少&#xff0c;实在是不够用啊&#xff0c;幸好现在有不少丰富的第三方图标库可用&…...

HDFS:Hadoop文件系统(HDFS)

Hadoop文件系统&#xff08;HDFS&#xff09;是一个分布式文件系统&#xff0c;主要用于存储和处理大规模的数据集。HDFS是Apache Hadoop的核心组件之一&#xff0c;能够支持上千个节点的集群&#xff0c;并能够处理PB级别的数据。 HDFS将大文件切割成小的数据块&#xff08;默…...

SpringMvc--综合案例

目录 1.SpringMvc的常用注解 2.参数传递 基础类型&#xff08;String&#xff09; 创建一个paramController类&#xff1a; 创建一个index.jsp 测试结果 复杂方式 ​编辑 测试结果 RequestParam 测试结果 PathVariable 测试结果 RequestBody pom.xml依赖导入 输…...

工业4.0时代生产系统对接集成优势,MES和ERP专业一体化管理-亿发

在现代制造业中&#xff0c;市场变化都在不断加速。企业面临着不断加强生产效率、生产质量和快速适应市场需求的挑战。在制造行业&#xff0c;日常管理中的ERP系统、MES系统就显得尤为重要。越来越多的企业正在采用MES系统和ERP管理系统的融合&#xff0c;以实现智能化生产管理…...

IT运维监控系统和网络运维一样吗

IT运维监控系统和网络运维不是一样的。IT运维监控系统是一系列IT管理产品的统称&#xff0c;它所包含的产品功能强大、易于使用、解决方案齐全&#xff0c;可一站式满足用户的各种IT管理需求。而网络运维是指对网络设备进行监控、维护和管理&#xff0c;包括硬件故障的排除、软…...

c语言flag的使用

flag在c语言中标识某种状态或记录某种信息&#xff0c;可以通过修改flag中来控制程序流程,判断某种状态是否存在或记录某种信息 操作:(1)初始化 (2)赋值 (3)判断 (4)修改 (5)去初始化 #include <stdlib.h>int power_state_check;int main() {int i 0;power_state_check…...

docker push image harbor http 镜像

前言 搭建的 harbor 仓库为 http 协议&#xff0c;在本地登录后&#xff0c;推送镜像发生如下报错&#xff1a; docker push 192.168.xx.xx/test/grafana:v10.1.1 The push refers to repository [192.168.xx.xx/test/grafana] Get "https://192.168.xx.xx/v2/": dia…...

羊城杯2023 部分wp

目录 D0nt pl4y g4m3!!!(php7.4.21源码泄露&pop链构造) Serpent(pickle反序列化&python提权) ArkNights(环境变量泄露) Ez_misc(win10sinpping_tools恢复) D0nt pl4y g4m3!!!(php7.4.21源码泄露&pop链构造) 访问/p0p.php 跳转到了游戏界面 应该是存在302跳转…...

解读Java对Execl读取数据

1.读取execl文件路径,或者打开execl // 初始化文件流FileInputStream in = null;in = new FileInputStream(new File(path));workbook = new XSSFWorkbook(in);sheet = workbook.getSheetAt(0);rows = sheet.getPhysicalNumberOfRows(); 2.读取execl中sheet页数,即获取当前E…...

RHCE——十七、文本搜索工具-grep、正则表达式

RHCE 一、文本搜索工具--grep1、作用2、格式3、参数4、注意5、示例5.1 操作对象文件&#xff1a;/etc/passwd5.2 grep过滤命令示例 二、正则表达式1、概念2、基本正则表达式2.1 常见元字符2.2 POSIX字符类2.3 示例 3、扩展正则表达式3.1 概念3.2 示例 三、作业1、作业一2、作业…...

小程序实现摄像头拍照 + 水印绘制

文章标题 01 功能说明02 使用方式 & 效果图2.1 基础用法2.2 拍照 底部定点水印 预览2.3 拍照 整体背景水印 预览 03 全部代码3.1 页面布局 html3.2 业务核心 js3.3 基础样式 css 01 功能说明 需求&#xff1a;小程序端需要调用前置摄像头进行拍照&#xff0c;并且将拍…...

SpringMVC:从入门到精通,7篇系列篇带你全面掌握--三.使用SpringMVC完成增删改查

&#x1f973;&#x1f973;Welcome Huihuis Code World ! !&#x1f973;&#x1f973; 接下来看看由辉辉所写的关于SpringMVC的相关操作吧 目录 &#x1f973;&#x1f973;Welcome Huihuis Code World ! !&#x1f973;&#x1f973; 效果演示 一.导入项目的相关依赖 二.…...

ABAP GN_DELIVERY_CREATE 报错 VL 561

GN_DELIVERY_CREATE 去创建内向交货单的时候。 报错 VL 561 Essential transfer parameters are missing in record 表示一些必输字段没输入 诸如一些&#xff0c;物料号。单位。等一些字段 输入之后即可 DATA: ls_return TYPE bapireturn.DATA: lt_return TYPE STANDARD T…...

AWS-数据库迁移工具DMS-场景:单账号跨区域迁移RDS for Mysql

参考文档&#xff1a; 分为几个环节&#xff1a; 要使用 AWS DMS 迁移至 Amazon RDS 数据库实例&#xff1a; 1.创建复制实例 有坑内存必须8g或者以上&#xff0c;我测试空库 都提示内存不足 2.创建目标和源终端节点 目标空库也得自己创建哈 3.刷新源终端节点架构 4.创建迁…...

【漏洞复现】E-office文件包含漏洞

漏洞描述 Weaver E-Office是中国泛微科技(Weaver)公司的一个协同办公系统。泛微 E-Office 是一款标准化的协同 OA 办公软件,实行通用化产品设计,充分贴合企业管理需求,本着简洁易用、高效智能的原则,为企业快速打造移动化、无纸化、数字化的办公平台。 该漏洞是由于存在…...

Linux 系统常用命令总结

目录 提示一、文件和目录操作二、文件查看和编辑三、文件权限管理四、文件压缩和解压缩五、查找文件六、系统信息和状态七、用户和权限管理八、网络相关操作九、包管理十、进程管理十一、时间和日期十二、系统关机和重启十三、文件传输十四、其他常用命令 提示 [ ]&#xff1a…...

【数据结构】树的基础入门

文章目录 什么是树树的常见术语树的表示树的应用 什么是树 相信大家刚学数据结构的时候最先接触的就是顺序表,栈,队列等线性结构. 而树则是一种非线性存储结构,存储的是具有“一对多”关系的数据元素的集合 非线性 体现在它是由n个有限结点(可以是零个结点)组成一个具有层次关…...

【多线程】Thread的常用方法

Thread的常用方法 1.构造器 Thread提供的常见构造器说明public Thread(String name)可以为当前线程指定名称public Thread(Runnable target)封装Runnable对象成为线程对象public Thread(Runnable target,String name)封装Runnable对象成为线程对象&#xff0c;并指定线程名称…...

windows 下docker安装宝塔镜像 宝塔docker获取镜像

1. docker 安装宝塔 打开链接&#xff1a;https://www.docker.com/get-started&#xff0c;找对应的版本下载docker&#xff0c;安装docker打开百度云盘&#xff1a;链接&#xff1a;https://pan.baidu.com/s/1DGIjpKkNDAmy4roaKGLA_w 提取码&#xff1a;u8bi 2. 设置镜像 点…...

蓝牙 BLE 扫描面试题大全(2):进阶面试题与实战演练

前文覆盖了 BLE 扫描的基础概念与经典问题蓝牙 BLE 扫描面试题大全(1)&#xff1a;从基础到实战的深度解析-CSDN博客&#xff0c;但实际面试中&#xff0c;企业更关注候选人对复杂场景的应对能力&#xff08;如多设备并发扫描、低功耗与高发现率的平衡&#xff09;和前沿技术的…...

企业如何增强终端安全?

在数字化转型加速的今天&#xff0c;企业的业务运行越来越依赖于终端设备。从员工的笔记本电脑、智能手机&#xff0c;到工厂里的物联网设备、智能传感器&#xff0c;这些终端构成了企业与外部世界连接的 “神经末梢”。然而&#xff0c;随着远程办公的常态化和设备接入的爆炸式…...

Java线上CPU飙高问题排查全指南

一、引言 在Java应用的线上运行环境中&#xff0c;CPU飙高是一个常见且棘手的性能问题。当系统出现CPU飙高时&#xff0c;通常会导致应用响应缓慢&#xff0c;甚至服务不可用&#xff0c;严重影响用户体验和业务运行。因此&#xff0c;掌握一套科学有效的CPU飙高问题排查方法&…...

C# 表达式和运算符(求值顺序)

求值顺序 表达式可以由许多嵌套的子表达式构成。子表达式的求值顺序可以使表达式的最终值发生 变化。 例如&#xff0c;已知表达式3*52&#xff0c;依照子表达式的求值顺序&#xff0c;有两种可能的结果&#xff0c;如图9-3所示。 如果乘法先执行&#xff0c;结果是17。如果5…...

深入浅出Diffusion模型:从原理到实践的全方位教程

I. 引言&#xff1a;生成式AI的黎明 – Diffusion模型是什么&#xff1f; 近年来&#xff0c;生成式人工智能&#xff08;Generative AI&#xff09;领域取得了爆炸性的进展&#xff0c;模型能够根据简单的文本提示创作出逼真的图像、连贯的文本&#xff0c;乃至更多令人惊叹的…...

【C++】纯虚函数类外可以写实现吗?

1. 答案 先说答案&#xff0c;可以。 2.代码测试 .h头文件 #include <iostream> #include <string>// 抽象基类 class AbstractBase { public:AbstractBase() default;virtual ~AbstractBase() default; // 默认析构函数public:virtual int PureVirtualFunct…...

Python实现简单音频数据压缩与解压算法

Python实现简单音频数据压缩与解压算法 引言 在音频数据处理中&#xff0c;压缩算法是降低存储成本和传输效率的关键技术。Python作为一门灵活且功能强大的编程语言&#xff0c;提供了丰富的库和工具来实现音频数据的压缩与解压。本文将通过一个简单的音频数据压缩与解压算法…...

UE5 音效系统

一.音效管理 音乐一般都是WAV,创建一个背景音乐类SoudClass,一个音效类SoundClass。所有的音乐都分为这两个类。再创建一个总音乐类&#xff0c;将上述两个作为它的子类。 接着我们创建一个音乐混合类SoundMix&#xff0c;将上述三个类翻入其中&#xff0c;通过它管理每个音乐…...

【版本控制】GitHub Desktop 入门教程与开源协作全流程解析

目录 0 引言1 GitHub Desktop 入门教程1.1 安装与基础配置1.2 核心功能使用指南仓库管理日常开发流程分支管理 2 GitHub 开源协作流程详解2.1 Fork & Pull Request 模型2.2 完整协作流程步骤步骤 1: Fork&#xff08;创建个人副本&#xff09;步骤 2: Clone&#xff08;克隆…...

深入浅出JavaScript中的ArrayBuffer:二进制数据的“瑞士军刀”

深入浅出JavaScript中的ArrayBuffer&#xff1a;二进制数据的“瑞士军刀” 在JavaScript中&#xff0c;我们经常需要处理文本、数组、对象等数据类型。但当我们需要处理文件上传、图像处理、网络通信等场景时&#xff0c;单纯依赖字符串或数组就显得力不从心了。这时&#xff…...