
Big Data with Flink (Part 3)

9.3 Transformation Operators
9.3.1 Basic Transformation Operators
9.3.1.1 Map

A one-to-one mapping: each input element produces exactly one output element.

```java
package transform;

import bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MapDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<WaterSensor> sensorDataStreamSource = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s2", 2L, 2),
                new WaterSensor("s3", 3L, 3));
        // one in, one out: extract the sensor id from each record
        SingleOutputStreamOperator<String> map = sensorDataStreamSource.map(v -> v.getId());
        map.print();
        env.execute();
    }
}
```
9.3.1.2 Filter

A filter transformation evaluates a boolean condition on each element of the stream: elements for which the condition returns true pass through, and those returning false are dropped.

```java
package transform;

import bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FilterDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<WaterSensor> sensorDataStreamSource = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s2", 2L, 2),
                new WaterSensor("s3", 3L, 3));
        // keep only records whose id is "s1"
        SingleOutputStreamOperator<WaterSensor> filter = sensorDataStreamSource.filter(
                v -> "s1".equals(v.getId()));
        filter.print();
        env.execute();
    }
}
```
9.3.1.3 FlatMap

flatMap splits a whole into its individual parts: consuming one element can produce zero, one, or many output elements (one in, many out). flatMap is flatten + map: the data is first broken apart according to some rule, and each resulting element is then transformed.

```java
package transform;

import bean.WaterSensor;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * s1 records: one in, one out
 * s2 records: one in, two out
 * s3 records: one in, zero out
 */
public class FlatMapDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<WaterSensor> sensorDataStreamSource = env.fromElements(
                new WaterSensor("s1", 1L, 11),
                new WaterSensor("s2", 2L, 22),
                new WaterSensor("s3", 3L, 3));
        SingleOutputStreamOperator<String> flatmap = sensorDataStreamSource.flatMap(
                new FlatMapFunction<WaterSensor, String>() {
                    @Override
                    public void flatMap(WaterSensor value, Collector<String> out) throws Exception {
                        if ("s1".equals(value.getId())) {
                            out.collect(String.valueOf(value.getVc()));
                        } else if ("s2".equals(value.getId())) {
                            out.collect(String.valueOf(value.getTs()));
                            out.collect(String.valueOf(value.getVc()));
                        }
                    }
                });
        flatmap.print();
        env.execute();
    }
}
```

map controls its one-in-one-out behavior through the return value; flatMap uses a Collector instead, and calling the collector multiple times yields one-in-many-out.

9.3.1.4 Aggregation Operators

The result of an aggregation depends not only on the current record but also on the records that came before it.

  1. Keyed partitioning with keyBy

    DataStream has no direct aggregation API. In Flink, a stream must first be partitioned with keyBy before it can be aggregated. keyBy divides one stream into different partitions by the specified key; a partition corresponds to a parallel subtask.

    ```java
    package aggreagte;

    import bean.WaterSensor;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class KeybyDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(2);
            DataStreamSource<WaterSensor> sensorDataStreamSource = env.fromElements(
                    new WaterSensor("s1", 1L, 11),
                    new WaterSensor("s1", 11L, 11),
                    new WaterSensor("s2", 2L, 22),
                    new WaterSensor("s3", 3L, 3));
            /*
             * Group by id. keyBy returns a KeyedStream; keyBy itself is not an operator.
             * Grouping vs. partitioning:
             * 1) keyBy groups the data and guarantees that records with the same key
             *    end up in the same partition.
             * 2) A partition corresponds to one subtask, and one partition (subtask)
             *    may hold several groups (keys).
             */
            KeyedStream<WaterSensor, String> keyBy = sensorDataStreamSource.keyBy(
                    new KeySelector<WaterSensor, String>() {
                        @Override
                        public String getKey(WaterSensor value) throws Exception {
                            return value.getId();
                        }
                    });
            keyBy.print();
            env.execute();
        }
    }
    ```
  2. Simple aggregations

    After keyBy, the stream can be aggregated. The basic APIs are: sum, min, max, minBy, maxBy.

    sum

    ```java
    package aggreagte;

    import bean.WaterSensor;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SimpleAggDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            DataStreamSource<WaterSensor> sensorDataStreamSource = env.fromElements(
                    new WaterSensor("s1", 1L, 11),
                    new WaterSensor("s1", 11L, 11),
                    new WaterSensor("s2", 2L, 22),
                    new WaterSensor("s3", 3L, 3));
            KeyedStream<WaterSensor, String> keyBy = sensorDataStreamSource.keyBy(
                    new KeySelector<WaterSensor, String>() {
                        @Override
                        public String getKey(WaterSensor value) throws Exception {
                            return value.getId();
                        }
                    });
            // a positional index (e.g. sum(2)) only works for tuple types, not for POJOs
            SingleOutputStreamOperator<WaterSensor> sum = keyBy.sum("vc");
            sum.print();
            env.execute();
        }
    }
    ```

    max

    ```java
    package aggreagte;

    import bean.WaterSensor;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SimpleAggDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            DataStreamSource<WaterSensor> sensorDataStreamSource = env.fromElements(
                    new WaterSensor("s1", 1L, 1),
                    new WaterSensor("s1", 11L, 11),
                    new WaterSensor("s2", 2L, 22),
                    new WaterSensor("s3", 3L, 3));
            KeyedStream<WaterSensor, String> keyBy = sensorDataStreamSource.keyBy(
                    new KeySelector<WaterSensor, String>() {
                        @Override
                        public String getKey(WaterSensor value) throws Exception {
                            return value.getId();
                        }
                    });
            SingleOutputStreamOperator<WaterSensor> max = keyBy.max("vc");
            max.print();
            env.execute();
        }
    }
    ```

    Output:

    WaterSensor{id='s1', ts=1, vc=1}
    WaterSensor{id='s1', ts=1, vc=11}
    WaterSensor{id='s2', ts=2, vc=22}
    WaterSensor{id='s3', ts=3, vc=3}
    

    maxBy

    ```java
    package aggreagte;

    import bean.WaterSensor;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SimpleAggDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            DataStreamSource<WaterSensor> sensorDataStreamSource = env.fromElements(
                    new WaterSensor("s1", 1L, 1),
                    new WaterSensor("s1", 11L, 11),
                    new WaterSensor("s2", 2L, 22),
                    new WaterSensor("s3", 3L, 3));
            KeyedStream<WaterSensor, String> keyBy = sensorDataStreamSource.keyBy(
                    new KeySelector<WaterSensor, String>() {
                        @Override
                        public String getKey(WaterSensor value) throws Exception {
                            return value.getId();
                        }
                    });
            SingleOutputStreamOperator<WaterSensor> maxBy = keyBy.maxBy("vc");
            maxBy.print();
            env.execute();
        }
    }
    ```

    Output:

    WaterSensor{id='s1', ts=1, vc=1}
    WaterSensor{id='s1', ts=11, vc=11}
    WaterSensor{id='s2', ts=2, vc=22}
    WaterSensor{id='s3', ts=3, vc=3}
    

    max vs. maxBy (min and minBy behave analogously):

    max takes only the maximum of the comparison field and keeps the first-seen values for every other field.

    maxBy returns the whole record that holds the maximum of the comparison field.

  3. Reduce

    reduce aggregates pairwise: for each key, the first record is stored and emitted as-is (reduce is not called for it); from then on, each incoming record is combined with the running result, and that result becomes the first argument of the next call.

    ```java
    package aggreagte;

    import bean.WaterSensor;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ReduceDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            DataStreamSource<WaterSensor> sensorDataStreamSource = env.fromElements(
                    new WaterSensor("s1", 1L, 1),
                    new WaterSensor("s1", 11L, 11),
                    new WaterSensor("s1", 22L, 22),
                    new WaterSensor("s2", 2L, 22),
                    new WaterSensor("s3", 3L, 3));
            KeyedStream<WaterSensor, String> keyBy = sensorDataStreamSource.keyBy(
                    new KeySelector<WaterSensor, String>() {
                        @Override
                        public String getKey(WaterSensor value) throws Exception {
                            return value.getId();
                        }
                    });
            SingleOutputStreamOperator<WaterSensor> reduce = keyBy.reduce(
                    new ReduceFunction<WaterSensor>() {
                        @Override
                        public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
                            // value1 is the running aggregate, value2 the incoming record
                            System.out.println("value1=" + value1);
                            System.out.println("value2=" + value2);
                            return new WaterSensor(value1.id, value2.ts, value1.vc + value2.vc);
                        }
                    });
            reduce.print();
            env.execute();
        }
    }
    ```
9.3.1.5 User-Defined Functions and Physical Partitioning

User-defined functions come in three forms: function classes, anonymous functions (lambdas), and rich function classes.

Physical partitioning determines which of the parallel threads a record is sent to. Common strategies: random (shuffle), round-robin (rebalance), rescale, and broadcast.

Round-robin (rebalance): typically one source subtask maps to one Kafka partition; if the partitions are uneven (data skew), round-robin redistribution balances the load.

Rescale: round-robin within local groups, which is more efficient than rebalance.

Broadcast: sends each record to every downstream subtask. A minimal sketch of these APIs follows below.
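
The strategies above are exposed directly as methods on DataStream. A minimal sketch (not from the original post; it reuses the hypothetical socket source address from the other demos, and only one strategy is active at a time):

```java
package partition;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        // hypothetical host/port, matching the other demos in this post
        DataStreamSource<String> source = env.socketTextStream("192.168.132.101", 7777);

        // source.shuffle().print();    // random
        source.rebalance().print();     // round-robin over all downstream subtasks
        // source.rescale().print();    // round-robin within local groups
        // source.broadcast().print();  // every record to every downstream subtask

        env.execute();
    }
}
```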

9.3.1.6 Splitting a Stream

Splitting turns one stream into two or more completely independent streams: starting from a single DataStream, records matching a condition are routed to a corresponding output stream.

Calling filter() independently several times on the same stream does produce the split streams, but it is inefficient, because every record is evaluated by every filter:

```java
package split;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SplitByFilterDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> dataStreamSource = env.socketTextStream("192.168.132.101", 7777);
        // every record passes through both filters
        dataStreamSource.filter(value -> Integer.parseInt(value) % 2 == 0).print("even stream");
        dataStreamSource.filter(value -> Integer.parseInt(value) % 2 == 1).print("odd stream");
        env.execute();
    }
}
```

The better approach is side outputs, which also support use cases such as data screening and alerting:

  1. Use the process operator.
  2. Define OutputTag objects.
  3. Call ctx.output(...).
  4. Retrieve each side output from the main stream.
```java
package split;

import bean.WaterSensor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // s1 records go to side output s1
        OutputTag<WaterSensor> s1 = new OutputTag<>("s1", Types.POJO(WaterSensor.class));
        // s2 records go to side output s2
        OutputTag<WaterSensor> s2 = new OutputTag<>("s2", Types.POJO(WaterSensor.class));
        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                });
        SingleOutputStreamOperator<WaterSensor> process = dataStreamSource.process(
                new ProcessFunction<WaterSensor, WaterSensor>() {
                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {
                        String id = value.getId();
                        if (id.equals("s1")) {
                            ctx.output(s1, value);
                        } else if (id.equals("s2")) {
                            ctx.output(s2, value);
                        } else {
                            // everything else stays in the main stream
                            out.collect(value);
                        }
                    }
                });
        // print the main stream
        process.print("main stream");
        // print the side outputs
        process.getSideOutput(s1).print("s1 side output");
        process.getSideOutput(s2).print("s2 side output");
        env.execute();
    }
}
```
9.3.1.7 Merging Streams

1. Union

The simplest merge is union, which combines multiple streams into one. All input streams must carry the same data type; the merged stream contains every element from every input with the type unchanged, and union can merge more than two streams in one call.

```java
package combineDemo;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3, 4, 5);
        DataStreamSource<Integer> source2 = env.fromElements(11, 22, 33, 44, 55);
        DataStreamSource<String> source3 = env.fromElements("111", "222", "333", "444", "555");
        // all unioned streams must share one type, so source3 is mapped to Integer first
        DataStream<Integer> union = source1.union(source2).union(source3.map(value -> Integer.parseInt(value)));
        union.print();
        env.execute();
    }
}
```

2. Connect

To merge streams with different data types, Flink provides the connect operation. The result is a ConnectedStreams: unified in form, but internally each stream keeps its own data type and remains independent of the other. To get a regular DataStream back, apply a "co-processing" transformation (CoMap, CoFlatMap, and so on), in which each input stream is handled by its own callback.

connect can only join two streams at a time.

```java
package combineDemo;

import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

public class ConnectDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3, 4, 5);
        DataStreamSource<String> source2 = env.fromElements("111", "222", "333", "444", "555");
        ConnectedStreams<Integer, String> connect = source1.connect(source2);
        // each side keeps its own type; map1 handles the first stream, map2 the second
        SingleOutputStreamOperator<String> map = connect.map(new CoMapFunction<Integer, String, String>() {
            @Override
            public String map1(Integer value) throws Exception {
                return value.toString();
            }

            @Override
            public String map2(String value) throws Exception {
                return value;
            }
        });
        map.print();
        env.execute();
    }
}
```

ConnectedStreams can call keyBy() directly; the result is still a ConnectedStreams, but records with the same key from both streams are routed to the same subtask, after which each stream is processed by its own callback with access to the other's cached data (similar to an inner join).

```java
package combineDemo;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConnectKeybyDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        DataStreamSource<Tuple2<Integer, String>> source1 = env.fromElements(
                Tuple2.of(1, "a1"),
                Tuple2.of(1, "a2"),
                Tuple2.of(2, "b"),
                Tuple2.of(3, "c"));
        DataStreamSource<Tuple3<Integer, String, Integer>> source2 = env.fromElements(
                Tuple3.of(1, "aa1", 1),
                Tuple3.of(1, "aa2", 2),
                Tuple3.of(2, "bb", 1),
                Tuple3.of(3, "cc", 1));
        ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connect =
                source1.connect(source2);
        // With parallelism > 1, keyBy on the join key is required so that records with
        // the same key land in the same subtask (thread) and can be matched.
        ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> keyBy =
                connect.keyBy(s1 -> s1.f0, s2 -> s2.f0);
        SingleOutputStreamOperator<String> process = keyBy.process(
                new CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>() {
                    // one HashMap per stream to cache the records seen so far
                    Map<Integer, List<Tuple2<Integer, String>>> s1Cache = new HashMap<>();
                    Map<Integer, List<Tuple3<Integer, String, Integer>>> s2Cache = new HashMap<>();

                    @Override
                    public void processElement1(Tuple2<Integer, String> value, Context ctx, Collector<String> out) throws Exception {
                        Integer id = value.f0;
                        // cache the s1 record first
                        if (!s1Cache.containsKey(id)) {
                            List<Tuple2<Integer, String>> s1Value = new ArrayList<>();
                            s1Value.add(value);
                            s1Cache.put(id, s1Value);
                        } else {
                            s1Cache.get(id).add(value);
                        }
                        // then emit a match for every cached s2 record with the same key
                        if (s2Cache.containsKey(id)) {
                            for (Tuple3<Integer, String, Integer> s2Element : s2Cache.get(id)) {
                                out.collect("s1" + value + "---------" + "s2" + s2Element);
                            }
                        }
                    }

                    @Override
                    public void processElement2(Tuple3<Integer, String, Integer> value, Context ctx, Collector<String> out) throws Exception {
                        Integer id = value.f0;
                        // cache the s2 record first
                        if (!s2Cache.containsKey(id)) {
                            List<Tuple3<Integer, String, Integer>> s2Value = new ArrayList<>();
                            s2Value.add(value);
                            s2Cache.put(id, s2Value);
                        } else {
                            s2Cache.get(id).add(value);
                        }
                        // then emit a match for every cached s1 record with the same key
                        if (s1Cache.containsKey(id)) {
                            for (Tuple2<Integer, String> s1Element : s1Cache.get(id)) {
                                out.collect("s1" + s1Element + "---------" + "s2" + value);
                            }
                        }
                    }
                });
        process.print();
        env.execute();
    }
}
```
9.4 Sink Operators

Sinks write the computed results out to external storage.

See the official documentation for the connectors to specific external systems.

9.4.1 Writing to Files with FileSink
```java
package sink;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.ZoneId;
import java.util.concurrent.TimeUnit;

public class SinkFile {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // checkpointing must be enabled, otherwise files stay in the .inprogress state forever
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
        DataStreamSource<String> source = env.fromElements("111", "222", "333", "444", "555");
        FileSink<String> sink = FileSink
                // row-format output, following the official example
                .forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        .withRolloverInterval(TimeUnit.MINUTES.toMillis(5)) // interval is in milliseconds
//                        .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                        .withMaxPartSize(1024L)
                        .build())
                .build();
//        // alternative configuration: file naming and date-based bucketing
//        FileSink<String> fileSink = FileSink
//                // row-format output
//                .<String>forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<>())
//                // output file naming
//                .withOutputFileConfig(OutputFileConfig.builder()
//                        .withPartPrefix("test")
//                        .withPartSuffix(".log")
//                        .build())
//                // bucket files by date
//                .withBucketAssigner(new DateTimeBucketAssigner<>("yy-MM-dd", ZoneId.systemDefault()))
//                // rolling policy
//                .withRollingPolicy(DefaultRollingPolicy.builder()
//                        .withRolloverInterval(5L * 1000L)
//                        .withMaxPartSize(1L * 1024L)
//                        .build())
//                .build();
        source.sinkTo(sink);
        env.execute();
    }
}
```
9.4.2 Writing to Kafka

See the official documentation: https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/

```java
package sink;

import bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class SinkKafkaDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers",
                "192.168.132.100:9092,192.168.132.101:9092,192.168.132.102:9092");
        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                });
        KafkaSerializationSchema<WaterSensor> serializationSchema = new KafkaSerializationSchema<WaterSensor>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(WaterSensor s, Long timestamp) {
                return new ProducerRecord<>(
                        "test", // target topic
                        s.toString().getBytes(StandardCharsets.UTF_8)); // record contents
            }
        };
        dataStreamSource.addSink(new FlinkKafkaProducer<WaterSensor>(
                "test",
                serializationSchema,
                properties,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
        env.execute();
    }
}
```

9.4.3 Writing to MySQL

See the official documentation: https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/jdbc/
```java
package sink;

import bean.WaterSensor;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.sql.PreparedStatement;
import java.sql.SQLException;

public class SinkMysqlDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                });
        dataStreamSource.print();
        SinkFunction<WaterSensor> waterSensorSinkFunction = JdbcSink.sink(
                "insert into ws (id, ts, vc) values (?, ?, ?)",  // mandatory
                new JdbcStatementBuilder<WaterSensor>() {        // mandatory
                    @Override
                    public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
                        preparedStatement.setString(1, waterSensor.id);
                        preparedStatement.setLong(2, waterSensor.ts);
                        preparedStatement.setInt(3, waterSensor.vc);
                    }
                },
                JdbcExecutionOptions.builder()                   // optional
                        .withBatchSize(1000)
                        .withBatchIntervalMs(200)
                        .withMaxRetries(5)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() // mandatory
                        .withUrl("jdbc:mysql://localhost:3306/testflink?" +
                                "autoReconnect=true&useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai")
//                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("123")
                        .withConnectionCheckTimeoutSeconds(60)
                        .build());
        dataStreamSource.addSink(waterSensorSinkFunction);
        env.execute();
    }
}
```

10. Time and Windows

A window marks out a range, usually of time (a time window). Windows are essentially a way of slicing bounded chunks out of a stream so that the data inside each range can be processed.

10.1 Window Classification
  1. By driving condition: time windows ("the bus departs on schedule") and count windows ("the bus departs once it is full").
  2. By how records are assigned to windows: tumbling, sliding, session, and global windows.
10.2 Window API Overview

Keyed and non-keyed streams expose different window APIs.

1. Keyed

After keyBy, the stream is split by key into multiple logical streams (a KeyedStream), and window computations run in parallel across multiple subtasks. Records with the same key are handled by the same subtask, so effectively each key gets its own independent set of windows, aggregated separately.

2. Non-keyed

The original stream is not split into logical streams; the window logic runs in a single task, i.e. with an effective parallelism of 1. A sketch contrasting the two entry points follows below.
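
A minimal sketch of the two entry points, reusing the WaterSensor POJO from the demos above (this snippet only builds the topology; a window function and env.execute() would be needed to actually run it):

```java
package window;

import bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class KeyedVsNonKeyedWindowDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<WaterSensor> source = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s2", 2L, 2));

        // keyed: one logical set of windows per key, computed in parallel subtasks
        WindowedStream<WaterSensor, String, TimeWindow> keyedWindows = source
                .keyBy(WaterSensor::getId)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)));

        // non-keyed: all records flow through a single window task (parallelism 1)
        AllWindowedStream<WaterSensor, TimeWindow> allWindows = source
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)));
    }
}
```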

10.3 Window Assigners

Window assigners are the first step in building a window operator: they define which window each record is assigned to, i.e. they fix the window type. On a keyed stream, pass the assigner to the .window() method, which returns a WindowedStream; on a non-keyed stream, use .windowAll(), which returns an AllWindowedStream.

Time-based:

  • Keyed tumbling window, window length 2 s

    keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(2)));

  • Keyed sliding time window, window length 10 s, slide 2 s

    keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2)));

  • Keyed session window, gap 2 s

    keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(2)));

Count-based:

  • Keyed tumbling count window, window size 5 elements

    keyedStream.countWindow(5);

  • Keyed sliding count window, window size 5 elements, slide 2 elements

    keyedStream.countWindow(5, 2);
10.4 Window Functions

The window assigner only collects data; the actual computation is done by the window function (Window Function).

(Figure: how the various stream types relate to each other.)

  • Incremental aggregation: each arriving record is folded into the aggregate immediately; the result is emitted when the window fires.
  • Full-window functions: arriving records are buffered without computation; everything is computed and emitted at once when the window fires.
10.4.1 Incremental Aggregation Functions

The aggregate is updated once per arriving record.

1. ReduceFunction

For each key, the first record does not invoke reduce; it is simply stored. Every subsequent record is combined with the running result as it arrives, and the result is emitted when the window fires.

```java
package window;

import bean.WaterSensor;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class WindowAPIDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                });
        KeyedStream<WaterSensor, String> keyedStream = dataStreamSource.keyBy(value -> value.getId());
        WindowedStream<WaterSensor, String, TimeWindow> windowStream =
                keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        SingleOutputStreamOperator<WaterSensor> reduce = windowStream.reduce(
                new ReduceFunction<WaterSensor>() {
                    @Override
                    public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
                        return new WaterSensor(value1.id, value2.ts, value1.vc + value2.vc);
                    }
                });
        reduce.print();
        env.execute();
    }
}
```

2. AggregateFunction

ReduceFunction covers most reduction-style aggregations, but its accumulator type, output type, and input type must all be the same. AggregateFunction is more flexible and has three type parameters: input IN, accumulator ACC, and output OUT. IN is the type of the stream's elements; ACC is the type of the intermediate aggregation state; OUT is the type of the final result.

  • The first record for a key creates the window and the accumulator.
  • Incremental aggregation: add() is called once per record.
  • When the window fires, getResult() is called once.
  • Input, accumulator, and output types may all differ.
```java
package window;

import bean.WaterSensor;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class WindowAggregateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                });
        KeyedStream<WaterSensor, String> keyedStream = dataStreamSource.keyBy(value -> value.getId());
        WindowedStream<WaterSensor, String, TimeWindow> window =
                keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        SingleOutputStreamOperator<String> aggregate = window.aggregate(
                new AggregateFunction<WaterSensor, Integer, String>() {
                    @Override
                    public Integer createAccumulator() {
                        System.out.println("creating accumulator");
                        return 0;
                    }

                    @Override
                    public Integer add(WaterSensor value, Integer accumulator) {
                        System.out.println("calling add");
                        return value.vc + accumulator;
                    }

                    @Override
                    public String getResult(Integer accumulator) {
                        System.out.println("emitting result");
                        return accumulator.toString();
                    }

                    @Override
                    public Integer merge(Integer a, Integer b) {
                        // only needed for session windows
                        return null;
                    }
                });
        aggregate.print();
        env.execute();
    }
}
```
10.4.2 Full-Window Functions

1. WindowFunction

.apply() exposes relatively little context information and has been fully superseded by ProcessWindowFunction:

```java
window.apply(new WindowFunction<WaterSensor, String, String, TimeWindow>() {
    @Override
    public void apply(String key, TimeWindow window, Iterable<WaterSensor> input, Collector<String> out) throws Exception {
    }
});
```

2. ProcessWindowFunction

Besides the window's data, ProcessWindowFunction can access a context object that holds window metadata, the current time (processing time and event-time watermark), and state.

It is invoked only once, when the window fires, and processes all records in the window together.

```java
package window;

import bean.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowProcessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                });
        KeyedStream<WaterSensor, String> keyedStream = dataStreamSource.keyBy(value -> value.getId());
        // for a non-keyed stream, use dataStreamSource.windowAll(...) instead
        WindowedStream<WaterSensor, String, TimeWindow> window =
                keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        SingleOutputStreamOperator<String> process = window.process(
                new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                        long start = context.window().getStart();
                        long end = context.window().getEnd();
                        String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
                        String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
                        long count = elements.spliterator().estimateSize();
                        out.collect("key=" + key + " window [" + winStart + "," + winEnd
                                + ") contains " + count + " records: " + elements);
                    }
                });
        process.print();
        env.execute();
    }
}
```
10.4.3 Combining Incremental Aggregation with a Full-Window Function

Incremental AggregateFunction + full-window ProcessWindowFunction:

  1. The incremental function processes records as they arrive: one in, one update.
  2. When the window fires, the incremental result (a single value) is handed to the full-window function.
  3. The full-window function post-processes that value and emits the output.

This combines the advantages of both (reduce() can also take a full-window function as a second argument):

  1. Incremental aggregation stores only the intermediate result, so it uses little space.
  2. The full-window function can use the context for richer functionality.
```java
package window;

import bean.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowAggregateAndProcessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                });
        KeyedStream<WaterSensor, String> keyedStream = dataStreamSource.keyBy(value -> value.getId());
        WindowedStream<WaterSensor, String, TimeWindow> window =
                keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        // the aggregate runs incrementally; its single result is handed to the process function
        SingleOutputStreamOperator<String> outputStreamOperator =
                window.aggregate(new MyAgg(), new MyProcess());
        outputStreamOperator.print();
        env.execute();
    }

    public static class MyAgg implements AggregateFunction<WaterSensor, Integer, String> {
        @Override
        public Integer createAccumulator() {
            System.out.println("creating accumulator");
            return 0;
        }

        @Override
        public Integer add(WaterSensor value, Integer accumulator) {
            System.out.println("calling add");
            return value.vc + accumulator;
        }

        @Override
        public String getResult(Integer accumulator) {
            System.out.println("emitting result");
            return accumulator.toString();
        }

        @Override
        public Integer merge(Integer a, Integer b) {
            // only needed for session windows
            return null;
        }
    }

    public static class MyProcess extends ProcessWindowFunction<String, String, String, TimeWindow> {
        @Override
        public void process(String key, Context context, Iterable<String> elements, Collector<String> out) throws Exception {
            long start = context.window().getStart();
            long end = context.window().getEnd();
            String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
            String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
            long count = elements.spliterator().estimateSize();
            out.collect("key=" + key + " window [" + winStart + "," + winEnd
                    + ") contains " + count + " records: " + elements);
        }
    }
}
```
10.5 Summary

Triggers and evictors: the built-in window types come with default implementations; custom ones are rarely needed.

Taking tumbling time windows as an example:

  • When a window fires its output: when the time passes the window's maximal timestamp (end - 1 ms).

  • How windows are divided: start = the timestamp rounded down to a multiple of the window length; end = start + window length; a window is left-closed, right-open: [start, end). See the sketch after this list.

  • Window lifecycle:

    Creation: a window is created lazily, when the first record belonging to it arrives, and is kept in a singleton collection.

    Destruction (closing): when the time passes the window's maximal timestamp (end - 1 ms) plus the allowed lateness (0 by default).
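
A minimal sketch of that division rule; the helper below mirrors the formula used by Flink's TimeWindow.getWindowStartWithOffset, and the sample values are illustrative:

```java
// Illustration of window division (mirrors
// org.apache.flink.streaming.api.windowing.windows.TimeWindow#getWindowStartWithOffset).
public class WindowStartDemo {
    static long windowStart(long timestamp, long offset, long windowSize) {
        // round the timestamp down to a multiple of the window size (plus offset)
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        long size = 10_000L;      // 10 s tumbling window
        long ts = 1_000_123L;     // an arbitrary record timestamp in ms
        long start = windowStart(ts, 0L, size);
        long end = start + size;
        // the window is [start, end) and fires when time reaches end - 1 ms
        System.out.println("[" + start + "," + end + ")"); // prints [1000000,1010000)
    }
}
```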
