当前位置: 首页 > news >正文

Spark的转换算子和操作算子

Transformation转换算子

1.1 Value类型

1)创建包名:com.shangjack.value

1.1.1 map()映射

参数f是一个函数可以写作匿名子类,它可以接收一个参数。当某个RDD执行map方法时,会遍历该RDD中的每一个数据项,并依次应用f函数,从而产生一个新的RDD。即,这个新RDD中的每一个元素都是原来RDD中每一个元素依次应用f函数而得到的。

1)具体实现

package com.shangjack.value;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

public class Test01_Map {

    public static void main(String[] args) {

        // 1.创建配置对象

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        JavaRDD<String> lineRDD = sc.textFile("input/1.txt");

        // 需求:每行结尾拼接||

        // 两种写法  lambda表达式写法(匿名函数) 

        JavaRDD<String> mapRDD = lineRDD.map(s -> s + "||");

        // 匿名函数写法 

        JavaRDD<String> mapRDD1 = lineRDD.map(new Function<String, String>() {

            @Override

            public String call(String v1) throws Exception {

                return v1 + "||";

            }

        });

        for (String s : mapRDD.collect()) {

            System.out.println(s);

        }

        // 输出数据的函数写法

        mapRDD1.collect().forEach(a -> System.out.println(a));

        mapRDD1.collect().forEach(System.out::println);

        // 4. 关闭sc

        sc.stop();

    }

}

1.1.2 flatMap()扁平化

1)功能说明

与map操作类似,将RDD中的每一个元素通过应用f函数依次转换为新的元素,并封装到RDD中。

区别:在flatMap操作中,f函数的返回值是一个集合,并且会将每一个该集合中的元素拆分出来放到新的RDD中。

2)需求说明:创建一个集合,集合里面存储的还是子集合,把所有子集合中数据取出放入到一个大的集合中。

4)具体实现:

package com.shangjack.value;

import org.apache.commons.collections.ListUtils;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.FlatMapFunction;

import java.util.ArrayList;

import java.util.Arrays;

import java.util.Iterator;

import java.util.List;

public class Test02_FlatMap {

    public static void main(String[] args) {

        // 1.创建配置对象

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        ArrayList<List<String>> arrayLists = new ArrayList<>();

        arrayLists.add(Arrays.asList("1","2","3"));

        arrayLists.add(Arrays.asList("4","5","6"));

        JavaRDD<List<String>> listJavaRDD = sc.parallelize(arrayLists,2);

        // 对于集合嵌套的RDD 可以将元素打散

        // 泛型为打散之后的元素类型

        JavaRDD<String> stringJavaRDD = listJavaRDD.flatMap(new FlatMapFunction<List<String>, String>() {

            @Override

            public Iterator<String> call(List<String> strings) throws Exception {

                return strings.iterator();

            }

        });

        stringJavaRDD. collect().forEach(System.out::println);

        // 通常情况下需要自己将元素转换为集合

        JavaRDD<String> lineRDD = sc.textFile("input/2.txt");

        JavaRDD<String> stringJavaRDD1 = lineRDD.flatMap(new FlatMapFunction<String, String>() {

            @Override

            public Iterator<String> call(String s) throws Exception {

                String[] s1 = s.split(" ");

                return Arrays.asList(s1).iterator();

            }

        });

        stringJavaRDD1. collect().forEach(System.out::println);

        // 4. 关闭sc

        sc.stop();

    }

}

1.1.3 groupBy()分组

1)功能说明:分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器。

2)需求说明:创建一个RDD,按照元素模以2的值进行分组。

3)具体实现

package com.shangjack.value;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import java.util.Arrays;

public class Test03_GroupBy {

    public static void main(String[] args) {

        // 1.创建配置对象

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2);

        // 泛型为分组标记的类型

        JavaPairRDD<Integer, Iterable<Integer>> groupByRDD = integerJavaRDD.groupBy(new Function<Integer, Integer>() {

            @Override

            public Integer call(Integer v1) throws Exception {

                return v1 % 2;

            }

        });

        groupByRDD.collect().forEach(System.out::println);

        // 类型可以任意修改

        JavaPairRDD<Boolean, Iterable<Integer>> groupByRDD1 = integerJavaRDD.groupBy(new Function<Integer, Boolean>() {

            @Override

            public Boolean call(Integer v1) throws Exception {

                return v1 % 2 == 0;

            }

        });

        groupByRDD1. collect().forEach(System.out::println);

Thread.sleep(600000);

        // 4. 关闭sc

        sc.stop();

    }

}

  • groupBy会存在shuffle过程
  • shuffle:将不同的分区数据进行打乱重组的过程
  • shuffle一定会落盘。可以在local模式下执行程序,通过4040看效果。

1.1.4 filter()过滤

1)功能说明

接收一个返回值为布尔类型的函数作为参数。当某个RDD调用filter方法时,会对该RDD中每一个元素应用f函数,如果返回值类型为true,则该元素会被添加到新的RDD中。

2)需求说明:创建一个RDD,过滤出对2取余等于0的数据

3)代码实现

package com.shangjack.value;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import java.util.Arrays;

public class Test04_Filter {

    public static void main(String[] args) {

        // 1.创建配置对象

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2);

        JavaRDD<Integer> filterRDD = integerJavaRDD.filter(new Function<Integer, Boolean>() {

            @Override

            public Boolean call(Integer v1) throws Exception {

                return v1 % 2 == 0;

            }

        });

        filterRDD. collect().forEach(System.out::println);

        // 4. 关闭sc

        sc.stop();

    }

}

1.1.5 distinct()去重

1)功能说明:对内部的元素去重,并将去重后的元素放到新的RDD中。

2)代码实现

package com.shangjack.value;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;

public class Test05_Distinct {

    public static void main(String[] args) {

        // 1.创建配置对象

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);

        // 底层使用分布式分组去重  所有速度比较慢,但是不会OOM

        JavaRDD<Integer> distinct = integerJavaRDD.distinct();

        distinct. collect().forEach(System.out::println);

        // 4. 关闭sc

        sc.stop();

    }

}

注意:distinct会存在shuffle过程

1.1.6 sortBy()排序

1)功能说明

该操作用于排序数据。在排序之前,可以将数据通过f函数进行处理,之后按照f函数处理的结果进行排序,默认为正序排列。排序后新产生的RDD的分区数与原RDD的分区数一致。Spark的排序结果是全局有序。

2)需求说明:创建一个RDD,按照数字大小分别实现正序和倒序排序

3)代码实现:

package com.shangjack.value;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import java.util.Arrays;

public class Test6_SortBy {

    public static void main(String[] args) {

        // 1.创建配置对象

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(5, 8, 1, 11, 20), 2);

        // (1)泛型为以谁作为标准排序  (2) true为正序  (3) 排序之后的分区个数

        JavaRDD<Integer> sortByRDD = integerJavaRDD.sortBy(new Function<Integer, Integer>() {

            @Override

            public Integer call(Integer v1) throws Exception {

                return v1;

            }

        }, true, 2);

        sortByRDD. collect().forEach(System.out::println);

        // 4. 关闭sc

        sc.stop();

    }

}

1.2 Key-Value类型

1)创建包名:com.shangjack.keyvalue

要想使用Key-Value类型的算子首先需要使用特定的方法转换为PairRDD

package com.shangjack.keyValue;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

import java.util.Arrays;

public class Test01_pairRDD{

    public static void main(String[] args) {

        // 1.创建配置对象

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2);

        JavaPairRDD<Integer, Integer> pairRDD = integerJavaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {

            @Override

            public Tuple2<Integer, Integer> call(Integer integer) throws Exception {

                return new Tuple2<>(integer, integer);

            }

        });

        pairRDD. collect().forEach(System.out::println);

        // 4. 关闭sc

        sc.stop();

    }

}

1.2.1 mapValues()只对V进行操作

1)功能说明:针对于(K,V)形式的类型只对V进行操作

2)需求说明:创建一个pairRDD,并将value添加字符串"|||"

4)代码实现:

package com.shangjack.keyValue;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import scala.Tuple2;

import java.util.Arrays;

public class Test02_MapValues {

    public static void main(String[] args) {

        // 1.创建配置对象

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        JavaPairRDD<String, String> javaPairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2<>("k", "v"), new Tuple2<>("k1", "v1"), new Tuple2<>("k2", "v2")));

        // 只修改value 不修改key

        JavaPairRDD<String, String> mapValuesRDD = javaPairRDD.mapValues(new Function<String, String>() {

            @Override

            public String call(String v1) throws Exception {

                return v1 + "|||";

            }

        });

        mapValuesRDD. collect().forEach(System.out::println);

        // 4. 关闭sc

        sc.stop();

    }

}

1.2.2 groupByKey()按照K重新分组

1)功能说明

groupByKey对每个key进行操作,但只生成一个seq,并不进行聚合。

该操作可以指定分区器或者分区数(默认使用HashPartitioner)

2)需求说明:统计单词出现次数

4)代码实现:

package com.shangjack.keyValue;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

import java.util.Arrays;

public class Test03_GroupByKey {

    public static void main(String[] args) {

        // 1.创建配置对象

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        JavaRDD<String> integerJavaRDD = sc.parallelize(Arrays.asList("hi","hi","hello","spark" ),2);

        // 统计单词出现次数

        JavaPairRDD<String, Integer> pairRDD = integerJavaRDD.mapToPair(new PairFunction<String, String, Integer>() {

            @Override

            public Tuple2<String, Integer> call(String s) throws Exception {

                return new Tuple2<>(s, 1);

            }

        });

        // 聚合相同的key

        JavaPairRDD<String, Iterable<Integer>> groupByKeyRDD = pairRDD.groupByKey();

        // 合并值

        JavaPairRDD<String, Integer> result = groupByKeyRDD.mapValues(new Function<Iterable<Integer>, Integer>() {

            @Override

            public Integer call(Iterable<Integer> v1) throws Exception {

                Integer sum = 0;

                for (Integer integer : v1) {

                    sum += integer;

                }

                return sum;

            }

        });

        result. collect().forEach(System.out::println);

        // 4. 关闭sc

        sc.stop();

    }

}}

1.2.3 reduceByKey()按照K聚合V

1)功能说明:该操作可以将RDD[K,V]中的元素按照相同的K对V进行聚合。其存在多种重载形式,还可以设置新RDD的分区数。

2)需求说明:统计单词出现次数

3)代码实现:

package com.shangjack.keyValue;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import org.apache.spark.api.java.function.Function2;

import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

import java.util.Arrays;

public class Test04_ReduceByKey {

    public static void main(String[] args) {

        // 1.创建配置对象

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        JavaRDD<String> integerJavaRDD = sc.parallelize(Arrays.asList("hi","hi","hello","spark" ),2);

        // 统计单词出现次数

        JavaPairRDD<String, Integer> pairRDD = integerJavaRDD.mapToPair(new PairFunction<String, String, Integer>() {

            @Override

            public Tuple2<String, Integer> call(String s) throws Exception {

                return new Tuple2<>(s, 1);

            }

        });

        // 聚合相同的key

        JavaPairRDD<String, Integer> result = pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {

            @Override

            public Integer call(Integer v1, Integer v2) throws Exception {

                return v1 + v2;

            }

        });

        result. collect().forEach(System.out::println);

        // 4. 关闭sc

        sc.stop();

    }

}

1.2.4 reduceByKey和groupByKey区别

1)reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[K,V]。

2)groupByKey:按照key进行分组,直接进行shuffle。

3)开发指导:在不影响业务逻辑的前提下,优先选用reduceByKey。求和操作不影响业务逻辑,求平均值影响业务逻辑。影响业务逻辑时建议先对数据类型进行转换再合并。

package com.shangjack.keyValue;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import org.apache.spark.api.java.function.Function2;

import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

import java.util.Arrays;

public class Test06_ReduceByKeyAvg {

    public static void main(String[] args) throws InterruptedException {

        // 1.创建配置对象

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        JavaPairRDD<String, Integer> javaPairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2<>("hi", 96), new Tuple2<>("hi", 97), new Tuple2<>("hello", 95), new Tuple2<>("hello", 195)));

        // ("hi",(96,1))

        JavaPairRDD<String, Tuple2<Integer, Integer>> tuple2JavaPairRDD = javaPairRDD.mapValues(new Function<Integer, Tuple2<Integer, Integer>>() {

            @Override

            public Tuple2<Integer, Integer> call(Integer v1) throws Exception {

                return new Tuple2<>(v1, 1);

            }

        });

        // 聚合RDD

        JavaPairRDD<String, Tuple2<Integer, Integer>> reduceRDD = tuple2JavaPairRDD.reduceByKey(new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {

            @Override

            public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> v1, Tuple2<Integer, Integer> v2) throws Exception {

                return new Tuple2<>(v1._1 + v2._1, v1._2 + v2._2);

            }

        });

        // 相除

        JavaPairRDD<String, Double> result = reduceRDD.mapValues(new Function<Tuple2<Integer, Integer>, Double>() {

            @Override

            public Double call(Tuple2<Integer, Integer> v1) throws Exception {

                return (new Double(v1._1) / v1._2);

            }

        });

        result. collect().forEach(System.out::println);

        // 4. 关闭sc

        sc.stop();

    }

}

1.2.5 sortByKey()按照K进行排序

1)功能说明

在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD

2)需求说明:创建一个pairRDD,按照key的正序和倒序进行排序

3)代码实现:

package com.shangjack.keyValue;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

import java.util.Arrays;

public class Test05_SortByKey {

    public static void main(String[] args) {

        // 1.创建配置对象

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        JavaPairRDD<Integer, String> javaPairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2<>(4, "a"), new Tuple2<>(3, "c"), new Tuple2<>(2, "d")));

        // 填写布尔类型选择正序倒序

        JavaPairRDD<Integer, String> pairRDD = javaPairRDD.sortByKey(false);

        pairRDD. collect().forEach(System.out::println);

        // 4. 关闭sc

        sc.stop();

    }

}

2 Action行动算子

行动算子是触发了整个作业的执行。因为转换算子都是懒加载,并不会立即执行。

1)创建包名:com.shangjack.action

2.1 collect()以数组的形式返回数据集

1)功能说明:在驱动程序中,以数组Array的形式返回数据集的所有元素。

注意:所有的数据都会被拉取到Driver端,慎用

2)需求说明:创建一个RDD,并将RDD内容收集到Driver端打印

package com.shangjack.action;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;

import java.util.List;

public class Test01_Collect {

    public static void main(String[] args) {

        // 1.创建配置对象

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2);

        List<Integer> collect = integerJavaRDD.collect();

        for (Integer integer : collect) {

            System.out.println(integer);

        }

        // 4. 关闭sc

        sc.stop();

    }

}

2.2 count()返回RDD中元素个数

1)功能说明:返回RDD中元素的个数

3)需求说明:创建一个RDD,统计该RDD的条数

package com.shangjack.action;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;

public class Test02_Count {

    public static void main(String[] args) {

        // 1.创建配置对象

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2);

        long count = integerJavaRDD.count();

        System.out.println(count);

        // 4. 关闭sc

        sc.stop();

    }

}

2.3 first()返回RDD中的第一个元素

1)功能说明:返回RDD中的第一个元素

2)需求说明:创建一个RDD,返回该RDD中的第一个元素

package com.shangjack.action;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;

public class Test03_First {

    public static void main(String[] args) {

        // 1.创建配置对象

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2);

        Integer first = integerJavaRDD.first();

        System.out.println(first);

        // 4. 关闭sc

        sc.stop();

    }

}

2.4 take()返回由RDD前n个元素组成的数组

1)功能说明:返回一个由RDD的前n个元素组成的数组

2)需求说明:创建一个RDD,取出前两个元素

package com.shangjack.action;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;

import java.util.List;

public class Test04_Take {

    public static void main(String[] args) {

        // 1.创建配置对象

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2);

        List<Integer> list = integerJavaRDD.take(3);

        list.forEach(System.out::println);

        // 4. 关闭sc

        sc.stop();

    }

}

2.5 countByKey()统计每种key的个数

1)功能说明:统计每种key的个数

2)需求说明:创建一个PairRDD,统计每种key的个数

package com.shangjack.action;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

import java.util.Arrays;

import java.util.Map;

public class Test05_CountByKey {

    public static void main(String[] args) {

        // 1.创建配置对象

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        JavaPairRDD<String, Integer> pairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2<>("a", 8), new Tuple2<>("b", 8), new Tuple2<>("a", 8), new Tuple2<>("d", 8)));

        Map<String, Long> map = pairRDD.countByKey();

        System.out.println(map);

        // 4. 关闭sc

        sc.stop();

    }

}

2.6 save相关算子

1)saveAsTextFile(path)保存成Text文件

功能说明:将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本

2)saveAsObjectFile(path) 序列化成对象保存到文件

功能说明:用于将RDD中的元素序列化成对象,存储到文件中。

3)代码实现

package com.shangjack.action;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

import java.util.Arrays;

public class Test06_Save {

    public static void main(String[] args) {

        // 1.创建配置对象

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2);

        integerJavaRDD.saveAsTextFile("output");

        integerJavaRDD.saveAsObjectFile("output1");

        // 4. 关闭sc

        sc.stop();

    }

}

2.7 foreach()遍历RDD中每一个元素

2)需求说明:创建一个RDD,对每个元素进行打印

package com.shangjack.action;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.VoidFunction;

import java.util.Arrays;

public class Test07_Foreach {

    public static void main(String[] args) {

        // 1.创建配置对象

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),4);

        integerJavaRDD.foreach(new VoidFunction<Integer>() {

            @Override

            public void call(Integer integer) throws Exception {

                System.out.println(integer);

            }

        });

        // 4. 关闭sc

        sc.stop();

    }

}

2.8 foreachPartition ()遍历RDD中每一个分区

package com.shangjack.spark.action;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.VoidFunction;

import java.util.Arrays;

import java.util.Iterator;

public class Test08_ForeachPartition {

    public static void main(String[] args) {

        // 1. 创建配置对象

        SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");

        // 2. 创建sc环境

        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. 编写代码

        JavaRDD<Integer> parallelize = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);

        // 多线程一起计算   分区间无序  单个分区有序

        parallelize.foreachPartition(new VoidFunction<Iterator<Integer>>() {

            @Override

            public void call(Iterator<Integer> integerIterator) throws Exception {

                // 一次处理一个分区的数据

                while (integerIterator.hasNext()) {

                    Integer next = integerIterator.next();

                    System.out.println(next);

                }

            }

        });

        // 4. 关闭sc

        sc.stop();

    }

}

相关文章:

Spark的转换算子和操作算子

1 Transformation转换算子 1.1 Value类型 1&#xff09;创建包名&#xff1a;com.shangjack.value 1.1.1 map()映射 参数f是一个函数可以写作匿名子类&#xff0c;它可以接收一个参数。当某个RDD执行map方法时&#xff0c;会遍历该RDD中的每一个数据项&#xff0c;并依次应用f函…...

传奇手游天花板赤月【盛世遮天】【可做底版】服务端+自主授权+详细教程

搭建资源下载地址&#xff1a;传奇手游天花板赤月【盛世遮天】【可做底版】服务端自主授权详细教程-海盗空间...

TP触摸屏调试

此处以MT6739 1g版本敦泰TP为例(kernel 4.19),主要修改点如下: 1. 两个配置文件defconfig: kernel-4.19\arch\arm\configs\k39tv1_bsp_1g_k419_debug_defconfig: kernel-4.19\arch\arm\configs\k39tv1_bsp_1g_k419_defconfig: CONFIG_INPUT_TOUCHSCREEN=y CONFIG_TOUCHSCRE…...

11-13 spring整合web

spring注解 把properties文件中的key注入到属性当中去 xml配置文件拆分 -> import标签 注解开发中 import 实现 搞一个主配置类&#xff0c;其他配置类全部导入进来这个这个主配置类 而且其他配置类不需要 加上configuration注解 之前这个注解用于表示这是一个配置文件 …...

基于C#开发的任天堂 Switch 开源模拟器

今天给大家推荐一款基于C#开发的任天堂 Switch 开源模拟器&#xff0c;可方便开发人员来测试游戏&#xff0c;也用于娱乐。 01 项目简介 Ryujinx 是一个开源的任天堂 Switch 模拟器&#xff0c;可以在 PC 上模拟运行 Switch 游戏。采用C#开发&#xff0c;基于 .NET Core技术框…...

做一个Sprngboot文件上传-阿里云

概述 这个模块是用来上传头像以及文章封面的&#xff0c;图片的值是一个地址字符串&#xff0c;一般存放在本地或阿里云服务中 1、本地文件上传 我们将文件保存在一个本地的文件夹下&#xff0c;由于可能两个人上传不同图片但是却同名的图片&#xff0c;那么就会一个人的图片就…...

k8s ----对外暴露

目录 一、Ingress 简介 1、Ingress 组成 2、Ingress 工作原理 二、部署Ingress 1、部署 nginx-ingress-controller 2、暴露ingress 4.1 DaemonSetHostNetworknodeSelector模式的service 4.2 DeploymentNodePort模式的Service 三、Ingress HTTP 代理访问 四、Ingress …...

每日一题(LeetCode)----数组--长度最小的子数组

每日一题(LeetCode)----数组–长度最小的子数组 1.题目&#xff08; 209.长度最小的子数组&#xff09; 给定一个含有 n 个正整数的数组和一个正整数 target 。 找出该数组中满足其总和大于等于 target 的长度最小的 连续子数组 [numsl, numsl1, ..., numsr-1, numsr] &…...

TCP与UDP

文章目录 TCP与UDP传输层的作用端口号UDPTCPUDP首部的格式TCP首部格式 TCP与UDP TCP/IP中有两个具有代表性的传输层协议&#xff0c;它们分别是TCP和UDP。TCP提供可靠的通信传输&#xff0c;而UDP则常被用于让广播和细节控制交给应用的通信传输。总之&#xff0c;根据通信的具…...

js实现对象数组去重

数组去重&#xff0c;一般会在面试的时候才会碰到&#xff0c;要求手写数组去重方法的代码。如果是被提问到&#xff0c;数组去重的方法有哪些&#xff1f;你能答出其中的10种&#xff0c;面试官很有可能对你刮目相看。 在实际项目中碰到的数组去重&#xff0c;一般都是后台去处…...

2023 极术通讯-安谋科技发布“山海”S20F安全解决方案,持续加码智能汽车“芯”赛道

导读&#xff1a;极术社区推出极术通讯&#xff0c;引入行业媒体和技术社区、咨询机构优质内容&#xff0c;定期分享产业技术趋势与市场应用热点。 芯方向 AMBA向多芯片和CHI C2C进发 Arm的Advanced Microcontroller Bus Architecture&#xff08;AMBA&#xff09;在与生态系…...

GRPC学习

GRPC元数据 在gRPC中&#xff0c;元数据&#xff08;metadata&#xff09;是用于在gRPC请求和响应中传递附加信息的一种机制。元数据是以键值对&#xff08;key-value pairs&#xff09;的形式组织的信息&#xff0c;它可以包含请求的上下文信息、安全凭证、消息传输相关的信息…...

c++ latch 使用详解

c latch 使用详解 std::latch c20 头文件 #include <latch>。作用&#xff1a;提供了一种机制&#xff0c;可以让一个或多个线程等待&#xff0c;直到计数器减为零。注意事项&#xff1a; latch 为向下计数器&#xff0c;即只能减少不能增加或者重置。这也使得其只能单…...

linux 下正确使用cp命令复制目录

linux下复制目录时&#xff0c;cp -r 没有 cp -a 好&#xff1a; 使用cp -r 拷贝数据&#xff0c;拷贝的结果是生成新的时间戳等信息 使用cp -a 相当于将原数据原封不动的拷贝过来&#xff0c;不改变里面的任何信息 指定目录时&#xff0c; 源目录/* 【说明&#xff1a;斜…...

CTF----Web真零基础入门

目录 前置知识导图&#xff1a; ​TCP/IP体系结构&#xff08;IP和端口&#xff09;&#xff1a; IP是什么&#xff1a;是计算机在互联网上的唯一标识&#xff08;坐标&#xff0c;代号&#xff09;&#xff0c;用于在互联网中寻找计算机。 内网&#xff08;局域网&#xf…...

css实现元素四周阴影

前言 首先确定的是需要使用box-shadow这一属性 语法如下&#xff1a; box-shadow: h-shadow v-shadow blur spread color inset; h-shadow&#xff1a;表示水平方向上的阴影偏移量&#xff0c;必须指明&#xff0c;可以是正数、负数、0&#xff0c;如果为正数左方有阴影&…...

《QT从基础到进阶·二十五》界面假死处理

假如有这样一种情况&#xff0c;我们在主线程写了一个死循环&#xff0c;当程序运行到主线程的死循环代码后界面便卡死点了没有反应&#xff0c;这里提供几种方法处理界面假死的情况&#xff0c;保证比如主线程在执行死循环没有退出的时候点击界面不会卡死能继续执行其他功能。…...

卷积神经网络(1)

目录 卷积 1 自定义二维卷积算子 2 自定义带步长和零填充的二维卷积算子 3 实现图像边缘检测 4 自定义卷积层算子和汇聚层算子 4.1 卷积算子 4.2 汇聚层算子 5 学习torch.nn.Conv2d()、torch.nn.MaxPool2d()&#xff1b;torch.nn.avg_pool2d()&#xff0c;简要介绍使用方…...

Mysql中名叫infomaiton_schema的数据库是什么东西?

在 MySQL 中&#xff0c;information_schema 是一个系统数据库&#xff0c;用于存储关于数据库服务器元数据的信息。它并不存储用户数据&#xff0c;而是包含有关数据库、表、列、索引、权限等方面的元数据信息。这些信息可以通过 SQL 查询来获取&#xff0c;用于了解和管理数据…...

Django(复习篇)

项目创建 1. 虚拟环境 python -m venv my_env ​ cd my_env activate/deactivate ​ pip install django ​2. 项目和app创建 cd mypros django-admin startproject Pro1 django-admin startapp app1 ​3. settings配置INSTALLED_APPS【app1"】TEMPLATES【 DIRS: [os.pat…...

提升Python编码效率:ptpython语法高亮与自动补全的终极指南

提升Python编码效率&#xff1a;ptpython语法高亮与自动补全的终极指南 【免费下载链接】ptpython A better Python REPL 项目地址: https://gitcode.com/gh_mirrors/pt/ptpython ptpython是一款功能强大的Python REPL工具&#xff0c;它通过语法高亮、智能自动补全和丰…...

5大理由选择Blueman:Linux蓝牙管理工具的最优解

5大理由选择Blueman&#xff1a;Linux蓝牙管理工具的最优解 【免费下载链接】blueman Blueman is a GTK Bluetooth Manager 项目地址: https://gitcode.com/gh_mirrors/bl/blueman Blueman作为基于GTK框架的Linux蓝牙管理工具&#xff0c;以其深度的桌面环境整合能力、完…...

从‘古董’协议到云存储桥梁:聊聊FTP在现代开发中的那些‘真香’应用场景

从‘古董’协议到云存储桥梁&#xff1a;聊聊FTP在现代开发中的那些‘真香’应用场景 当谈到文件传输协议时&#xff0c;很多人第一反应可能是"这不是上个世纪的技术吗&#xff1f;"。确实&#xff0c;FTP(File Transfer Protocol)诞生于1971年&#xff0c;比大多数程…...

[TI板]MSPM0G3507开发全攻略:从环境搭建到实战应用

1. 环境配置&#xff1a;从零搭建MSPM0G3507开发环境 第一次接触TI的MSPM0G3507开发板时&#xff0c;最头疼的就是环境搭建。我花了整整两天时间踩遍了所有坑&#xff0c;现在把最顺滑的配置流程分享给你。这个芯片支持Keil、IAR和CCS三大主流IDE&#xff0c;但实测下来Keil的兼…...

怎么样辨别生活中遇到的那些理财平台的真假?

怎么样辨别生活中遇到的那些理财平台的真假&#xff1f;凡是声称高息保本的投资理财平台极有可能是黑平台。尝试用手机官方应用商城搜索理财软件&#xff0c;如果是别人通过聊天软件发链接给你安装的&#xff0c;不是正规手机应用商城下载的&#xff0c;且在应用商城无法搜索到…...

哔咔漫画下载器:多线程极速下载完整指南

哔咔漫画下载器&#xff1a;多线程极速下载完整指南 【免费下载链接】picacomic-downloader 哔咔漫画 picacomic pica漫画 bika漫画 PicACG 多线程下载器&#xff0c;带图形界面 带收藏夹&#xff0c;已打包exe 下载速度飞快 项目地址: https://gitcode.com/gh_mirrors/pi/pi…...

3个核心技巧:PS手柄无缝适配PC完全指南

3个核心技巧&#xff1a;PS手柄无缝适配PC完全指南 【免费下载链接】DS4Windows Like those other ds4tools, but sexier 项目地址: https://gitcode.com/gh_mirrors/ds/DS4Windows 对于拥有PS4/PS5手柄的玩家而言&#xff0c;在PC上实现完美适配一直是提升游戏体验的关…...

最近在折腾语音端点检测的时候发现个有意思的方法——频带方差检测。这玩意儿特别适合对付环境噪声,原理简单粗暴但有效。今天咱们就手撕代码看看它怎么玩转语音段定位

基于matlab的频带方差端点检测&#xff0c;噪声频谱中&#xff0c;各频带之间变化很平缓&#xff0c;语音各频带之间变化较激烈。 据此特征&#xff0c;语音和噪声就极易区分。 计算短时频带方差&#xff0c;实质就是计算某一帧信号的各频带能量之间的方差。 这种以短时频带方差…...

【BUUCTF】MISC 弱口令实战:从安装Python库到LSB隐写破解全流程

1. 弱口令与LSB隐写技术入门 第一次接触CTF比赛时&#xff0c;我被各种隐写术搞得晕头转向。特别是遇到需要破解弱口令和LSB隐写的题目时&#xff0c;简直就像在黑暗中摸索。后来经过多次实战&#xff0c;终于总结出一套行之有效的方法。今天我就来分享从安装Python库到最终破解…...

AI专著生成新玩法!掌握这些工具,快速产出高质量专业专著

学术专著写作挑战与 AI 辅助工具介绍 学术专著的根本价值在于其内容的系统性与内部逻辑的完整性&#xff0c;但这往往是写作中的一个重大挑战。相较于期刊论文专注于某个特定问题&#xff0c;学术专著必须构建一个包括绪论、理论框架、核心研究、应用拓展&#xff0c;以及结论…...