Flink1.17 DataStream API
目录
一.执行环境(Execution Environment)
1.1 创建执行环境
1.2 执行模式
1.3 触发程序执行
二.源算子(Source)
2.1 从集合中读取数据
2.2 从文件读取数据
2.3 从 RabbitMQ 中读取数据
2.4 从数据生成器读取数据
2.5 Flink支持的数据类型
2.5.1 Flink的类型系统
2.5.2 Flink支持的数据类型
2.5.3 类型提示(Type Hints)
三.转换算子(Transformation)
3.1 基本转换算子(map/ filter/ flatMap)
3.1.1 映射(map)
3.1.2 过滤(filter)
3.1.2 扁平映射(flatMap)
3.2 聚合算子(Aggregation)
3.2.1 按键分区(keyBy)
3.2.2 简单聚合(sum/min/max/minBy/maxBy)
3.2.3 归约聚合(reduce)
3.3 用户自定义函数(UDF)
3.3.1 函数类(Function Classes)
3.3.2 富函数类(Rich Function Classes)
3.4 物理分区算子(Physical Partitioning)
3.4.1 随机分区(shuffle)
3.4.2 轮询分区(Round-Robin)
3.4.3 重缩放分区(rescale)
3.4.4 广播(broadcast)
3.4.5 全局分区(global)
3.4.6 自定义分区(Custom)
3.5 分流
3.5.1 Filter 实现分流
3.5.2 使用侧输出流
3.6 基本合流操作
3.6.1 联合(Union)
3.6.2 连接(Connect)
3.6.2.1 连接流(ConnectedStreams)
3.6.2.2 CoProcessFunction
四.输出算子(Sink)
4.1 连接到外部系统
4.2 输出到文件
4.3 输出到RabbitMQ
4.4 输出到MySQL(JDBC)
4.5 自定义Sink输出
DataStream API是Flink的核心层API。一个Flink程序,其实就是对DataStream的各种转换。具体来说,代码基本上都由以下几部分构成:

一.执行环境(Execution Environment)
Flink程序可以在各种上下文环境中运行:既可以可以在本地JVM中执行程序,也可以提交到远程集群上运行。
1.1 创建执行环境
获取的执行环境是StreamExecutionEnvironment类的对象(流处理,批处理已经标记为过时),创建执行环境一般有以下三种方式:
// 创建一个本地执行环境并返回,可传入并行度,默认是本地CPU核心数
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();// 返回远程集群执行环境,需传入远程IobManager的主机名与端口、及在集群中需运行的Jar包
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("JobManager ip", "JobManager port","提交给JobManager的JAR包");// (推荐)根据当前环境自动选择执行环境,无脑选这个即可
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
当使用 getExecutionEnvironment() 创建环境时,可以传入 org.apache.flink.configuration.Configuration 类来手动指定默认的参数,例如端口等。
Configuration conf = new Configuration();
conf.set(RestOptions.PORT,8088);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
1.2 执行模式
从Flink 1.12开始,官方推荐的做法是直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理。不建议使用DataSet API(已过时)。
通过代码指定:
// 流 执行模式
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
// 批 执行模式
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
// 自动模式,根据数据源是否有界自动选择执行模式
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
提交任务时命令行指定(推荐):
bin/flink run -Dexecution.runtime-mode=BATCH ...
同一套代码/API,既可以指定流处理也可以指定批处理,这就是“流批一体”的其中一个解释。
1.3 触发程序执行
// 程序执行
env.execute();
写完输出(sink)操作并不代表程序已经结束。因为当 main() 方法被调用时,其实只是定义了作业的每个执行操作,然后添加到数据流图中;这时并没有真正处理数据——因为数据可能还没来。Flink是由事件驱动的,只有等到数据到来,才会触发真正的计算,这也被称为“延迟执行”或“懒执行”。
默认 main 方法的一个 env.execute() 会触发一个 Flink Job,并且一个 main 方法可以调用多个 env.execute() ,但无意义,因为第一个会阻塞住。可使用 env.executeAsync() 可以异步触发,而且不会产生阻塞。
在application模式下,代码中有多少个 env.executeAsync() ,就会有多少个Job,对应就会有多少个 JboManager。
二.源算子(Source)
Flink可以从各种来源获取数据,然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。所以,source就是我们整个处理程序的输入端。
从Flink1.12开始,主要使用流批统一的新Source架构:
DataStreamSource<String> stream = env.fromSource(…)
Flink直接提供了很多预实现的接口,此外还有很多外部连接工具也帮我们实现了对应的Source,通常情况下足以应对我们的实际需求。
2.1 从集合中读取数据
最简单的读取数据的方式,就是在代码中直接创建一个Java集合,然后调用执行环境的fromCollection方法进行读取。这相当于将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用,一般用于测试。
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 从集合中读取数据DataStreamSource<Integer> source = env.fromCollection(Arrays.asList(1, 10, 99, 53));source.print();env.execute();}
输出结果:
6> 1
8> 99
1> 53
7> 10
2.2 从文件读取数据
在实际场景中,可能要读取、处理日志文件这样的需求,这也是批处理最常见的读取方式。
读取文件,需要添加文件连接器依赖:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version>
</dependency>
代码如下:
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("input/words.txt")).build();// !使用官方推荐的新的 Source 架构 => env.fromSource(Source实现类,Watermark,资源名称)env.fromSource(fileSource,WatermarkStrategy.noWatermarks(),"file").print();env.execute();}
输出结果:
3> hello flink
3> hello world
3> hello java
2.3 从 RabbitMQ 中读取数据
导入相关依赖:
<!--RabbitMQ 连接器-->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq</artifactId><version>3.0.1-1.17</version>
</dependency>
<!--amqp 客户端-->
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.14.1</version>
</dependency>
相关代码:
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;/*** 从 RabbitMQ读取数据*/
public class RabbitMQSourceDemo {public static void main(String[] args) throws Exception {// 获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 配置 RabbitMQ 连接信息RMQConnectionConfig config = new RMQConnectionConfig.Builder().setHost("RabbitMQ服务器地址").setPort(RabbitMQ端口).setUserName(用户名).setPassword(密码).setVirtualHost(虚拟主机).build();// 添加 RabbitMQ 数据源(Flink 1.17 并不支持使用 env.fromSource 在 RabbitMQ 读取数据!)RMQSource<String> source = new RMQSource<>(config, // 连接配置"test_queue", // 队列名称new SimpleStringSchema()); // 反序列化器// 添加数据源DataStreamSource<String> rabbitMQStream = env.addSource(source);// 打印rabbitMQStream.print();// 执行env.execute("RabbitMQ job");}
}
进入 RabbitMQ Web 页面,在对应的虚拟主机下创建相关的队列,进入队列中,使用 Web 中的 Publish message给队列发送消息:

输出结果:

2.4 从数据生成器读取数据
Flink从1.11开始提供了一个内置的DataGen 连接器,主要是用于生成一些随机数,用于在没有数据源的时候,进行流任务的测试以及性能测试等。1.17提供了新的Source写法,需要导入依赖:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-datagen</artifactId><version>${flink.version}</version>
</dependency>
代码:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** Flink 数据生成器*/
public class DataGeneratorDemo {public static void main(String[] args) throws Exception {// 获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();/*** 数据生成器的四个参数:* 1、GeneratorFunction的map实现。重写返回值* 2、返回的个数 会从0开始依次返回(使用Long.MAX_VALUE可模拟出无界流)* 3、限速,每秒多少个数据* 4、返回值类型*/DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>((GeneratorFunction<Long, String>) num -> "Number:" + num,30,RateLimiterStrategy.perSecond(3),Types.STRING);env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(),"datagenerator-source").print();// 执行env.execute();}
}
输出:
1> Number:8
8> Number:12
4> Number:27
3> Number:0
2> Number:4
6> Number:24
5> Number:16
7> Number:20
3> Number:1
7> Number:21
5> Number:17
.
.
.
2.5 Flink支持的数据类型
2.5.1 Flink的类型系统
Flink使用“类型信息”(TypeInformation)来统一表示数据类型。TypeInformation类是Flink中所有类型描述符的基类。它涵盖了类型的一些基本属性,并为每个数据类型生成特定的序列化器、反序列化器和比较器。
2.5.2 Flink支持的数据类型
对于常见的Java和Scala数据类型,Flink都是支持的。Flink在内部,Flink对支持不同的类型进行了划分,这些类型可以在Types工具类中找到:
其中包含 Java 基本类型及包装类、数组类型、复合数据类型、辅助类型(List、Map等)、泛型类型(GENERIC)。
符合类型又包括:
- Java元组类型(TUPLE):这是Flink内置的元组类型,是Java API的一部分。最多25个字段,也就是从Tuple0~Tuple25,不支持空字段。
- Scala 样例类及Scala元组:不支持空字段。
- 行类型(ROW):可以认为是具有任意个字段的元组,并支持空字段。
- POJO:Flink自定义的类似于Java bean模式的类。(POJO的类和属性是公有的、有一个无参构造、属性可序列化)
2.5.3 类型提示(Type Hints)
Flink还具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。例如:
.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));
可写作:
.map(word -> Tuple2.of(word, 1L))
.returns(new TypeHint<Tuple2<String, Long>>(){})
三.转换算子(Transformation)
数据源读入数据之后,我们就可以使用各种转换算子,将一个或多个DataStream转换为新的DataStream。

3.1 基本转换算子(map/ filter/ flatMap)
准备工作
为了方便练习,这里使用WaterSensor作为数据模型。
| 字段名 | 数据类型 | 说明 |
| id | String | 水位传感器类型 |
| ts | Long | 传感器记录时间戳 |
| vc | Integer | 水位记录 |
代码如下:
public class WaterSensor {public String id;public Long ts;public Integer vc;// 省略getter、setter、构造器、toString
}
3.1.1 映射(map)
与 JDK1.8 中的Stream中的 Map 类似。Map 就是将一个元素映射成另一个元素。基于DataStream调用map()方法就可以进行转换处理。
例子:需要提取 WaterSensor 中的 id 属性:
public class MapDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 构造数据DataStreamSource<WaterSensor> source = env.fromElements(new WaterSensor("id_1", 1l, 1),new WaterSensor("id_2", 2l, 2),new WaterSensor("id_3", 3l, 3));// 方法1:实现匿名内部类source.map(new MapFunction<WaterSensor, String>() {@Overridepublic String map(WaterSensor waterSensor) throws Exception {return waterSensor.getId();}}).print();// 方法2:Lambda 表达式source.map(WaterSensor::getId).print();// 方法三:定义 MapFunction 实现类source.map(new MyMapFunction()).print();env.execute();}// 实现 MapFunction , 可以复用static class MyMapFunction implements MapFunction<WaterSensor , String>{@Overridepublic String map(WaterSensor waterSensor) throws Exception {return waterSensor.getId();}}
}
结果输出:
2> id_3
8> id_1
1> id_2
3.1.2 过滤(filter)
与 JDK1.8 中的Stream中的 Fliter类似。对数据流进行过滤,满足条件的元素则会被输出,不满足则被过滤。
例子:过滤掉 WaterSensor 中 id 不为 “id_1” 的元素。
public class FilterDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 构造数据DataStreamSource<WaterSensor> source = env.fromElements(new WaterSensor("id_1", 1l, 1),new WaterSensor("id_1", 12l, 2),new WaterSensor("id_2", 2l, 2),new WaterSensor("id_3", 3l, 3));// 过滤数据中 id 不为 id_1 的元素source.filter(new FilterFunction<WaterSensor>() {@Overridepublic boolean filter(WaterSensor waterSensor) throws Exception {return "id_1".equals(waterSensor.getId());}}).print();env.execute();}}
结果输出:
3> WaterSensor{id='id_1', ts=1, vc=1}
4> WaterSensor{id='id_1', ts=12, vc=2}
3.1.2 扁平映射(flatMap)
flatMap操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。消费一个元素,可以产生0到多个元素。flatMap可以认为是“扁平化”(flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理。
例子:如果 id 为 id_1 则输出 vc 属性,如果 id 为 id_2 则输出 ts、vc 属性。
public class FlatmapDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 构造数据DataStreamSource<WaterSensor> source = env.fromElements(new WaterSensor("id_1", 1l, 1),new WaterSensor("id_1", 12l, 2),new WaterSensor("id_2", 2l, 2),new WaterSensor("id_3", 3l, 3));/*** 如果 id 为 id_1 则输出 vc 属性* 如果 id 为 id_2 则输出 ts、vc 属性*/source.flatMap(new FlatMapFunction<WaterSensor, String>() {@Overridepublic void flatMap(WaterSensor waterSensor, Collector<String> collector) throws Exception {if("id_1".equals(waterSensor.getId())){// 将 vc 放入采集器collector.collect(waterSensor.getVc().toString());} else if ("id_2".equals(waterSensor.getId())) {// 将 ts、vc 放入采集器collector.collect(waterSensor.getVc().toString());collector.collect(waterSensor.getTs().toString());}}}).print();env.execute();}
}
结果输出:
2> 1
3> 2
4> 2
4> 2
3.2 聚合算子(Aggregation)
计算的结果不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并——这就是所谓的“聚合”(Aggregation),类似于MapReduce中的reduce操作。
3.2.1 按键分区(keyBy)
在Flink中,要做聚合,需要先进行分区;这个操作就是通过keyBy来完成的。keyBy通过指定键(key),可以将一条流从逻辑上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务。
- 基于不同的key,流中的数据将被分配到不同的分区中去;这样一来,所有具有相同的key的数据,都将被发往同一个分区。一个子任务就可以理解为一个分区。
- KeyBy 返回的是 KeyedStream 键控流。
- KeyBy 不是转换算子,只是对数据做重分区,不能设置并行度。
- 分区是通过对 Key 进行 Hash 再对分区数取模来实现的。
例子:以 id 作为 Key 进行分区:
public class KeyByDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 构造数据DataStreamSource<WaterSensor> source = env.fromElements(new WaterSensor("id_1", 1l, 1),new WaterSensor("id_1", 12l, 2),new WaterSensor("id_2", 2l, 2),new WaterSensor("id_3", 3l, 3));// 以 id 为 Key 进行分区source.keyBy(new KeySelector<WaterSensor, String>() {@Overridepublic String getKey(WaterSensor waterSensor) throws Exception {return waterSensor.getId();}}).print();env.execute();}}
结果输出:
2> WaterSensor{id='id_2', ts=2, vc=2}
3> WaterSensor{id='id_1', ts=1, vc=1}
3> WaterSensor{id='id_1', ts=12, vc=2}
3> WaterSensor{id='id_3', ts=3, vc=3}
3.2.2 简单聚合(sum/min/max/minBy/maxBy)
所有的聚合操作都要基于按键分区的数据流KeyedStream。 Flink为我们内置实现了一些最基本、最简单的聚合API,主要有以下几种:
- sum():在输入流上,对指定的字段做叠加求和的操作。
- min():在输入流上,对指定的字段求最小值。
- max():在输入流上,对指定的字段求最大值。
- sumBy()、minBy()、maxBy():功能类似,xxxBy() 会返回包含符合要求的整条数据。而不加 By 只会保留第一次的非比较字段。
例子:
public class AggrDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 构造数据DataStreamSource<WaterSensor> source = env.fromElements(new WaterSensor("id_1", 1l, 1),new WaterSensor("id_1", 12l, 22),new WaterSensor("id_2", 2l, 2),new WaterSensor("id_3", 3l, 3));// 以 id 为 Key 进行分区KeyedStream<WaterSensor, String> keyBySource = source.keyBy(new KeySelector<WaterSensor, String>() {@Overridepublic String getKey(WaterSensor waterSensor) throws Exception {return waterSensor.getId();}});keyBySource.sum("vc").print();keyBySource.min("vc").print();keyBySource.max("vc").print();/*** max结果:* 1> WaterSensor{id='id_1', ts=1, vc=1}* 1> WaterSensor{id='id_1', ts=1, vc=22}* 1> WaterSensor{id='id_2', ts=2, vc=2}* 1> WaterSensor{id='id_3', ts=3, vc=3}* ts 还是 第一次的值*/keyBySource.maxBy("vc").print();/*** max结果:* 1> WaterSensor{id='id_1', ts=1, vc=1}* 1> WaterSensor{id='id_1', ts=12, vc=22}* 1> WaterSensor{id='id_2', ts=2, vc=2}* 1> WaterSensor{id='id_3', ts=3, vc=3}* 取当前整列值 */env.execute();}}
3.2.3 归约聚合(reduce)
reduce可以对已有的数据进行归约处理,把每一个新输入的数据和当前已经归约出来的值,再做一个聚合计算。
ReduceFunction接口里需要实现reduce()方法,这个方法接收两个输入事件,经过转换处理之后输出一个相同类型的事件。在流处理的底层实现过程中,实际上是将中间“合并的结果”作为任务的一个状态保存起来的;之后每来一个新的数据,就和之前的聚合状态进一步做归约。
例子:只保存每个分组中 VC 最大的那条数据
public class ReduceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 构造数据DataStreamSource<WaterSensor> source = env.fromElements(new WaterSensor("id_1", 1l, 1),new WaterSensor("id_1", 21l, 21),new WaterSensor("id_1", 31l, 31),new WaterSensor("id_2", 2l, 2),new WaterSensor("id_3", 3l, 3));// 以 id 为 Key 进行分区KeyedStream<WaterSensor, String> sensorKs = source.keyBy(new KeySelector<WaterSensor, String>() {@Overridepublic String getKey(WaterSensor waterSensor) throws Exception {return waterSensor.getId();}});/*** reduce:* 1.必须在KeyBy后调用* 2.输入类型 = 输出类型* 3.每个分区的第一条数据来的时候不会执行reduce,但是会存起来保存状态,直接输出,,“Flink有状态的体现”* 4.reduce( value1, value2)* a.value1 是上一次的计算结果* b.value2 是当前进入的数据*/SingleOutputStreamOperator<WaterSensor> reduce = sensorKs.reduce(new ReduceFunction<WaterSensor>() {@Overridepublic WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {if(value2.getVc() > value1.getVc()){return new WaterSensor(value2.getId(), value2.getTs(), value2.getVc());}else {return value1;}}});reduce.print();env.execute();}
}
结果输出:
WaterSensor{id='id_1', ts=1, vc=1} // 分组的第一条数据直接返回
WaterSensor{id='id_1', ts=21, vc=21}
WaterSensor{id='id_1', ts=31, vc=31}
WaterSensor{id='id_2', ts=2, vc=2} // 分组的第一条数据直接返回
WaterSensor{id='id_3', ts=3, vc=3} // 分组的第一条数据直接返回
reduce同简单聚合算子一样,也要针对每一个key保存状态。因为状态不会清空,所以我们需要将reduce算子作用在一个有限key的流上。
3.3 用户自定义函数(UDF)
用户自定义函数(user-defined function,UDF),即用户可以根据自身需求,重新实现算子的逻辑。
用户自定义函数分为:函数类、匿名函数、富函数类。
3.3.1 函数类(Function Classes)
Flink暴露了所有UDF函数的接口,具体实现方式为接口或者抽象类,例如MapFunction、FilterFunction、ReduceFunction等。所以用户可以自定义一个函数类,实现对应的接口。
匿名内部类实现:
source.filter(new FilterFunction<WaterSensor>() {@Overridepublic boolean filter(WaterSensor waterSensor) throws Exception {return "id_1".equals(waterSensor.getId());}}).print();
Lambda表达式实现:
source.filter((FilterFunction<WaterSensor>) waterSensor -> "id_1".equals(waterSensor.getId())).print();
实现 XxxFunction 接口:
public class FilterFunctionImpl implements FilterFunction<WaterSensor> {public String id ;public FilterFunctionImpl(String id) {this.id = id;}@Overridepublic boolean filter(WaterSensor waterSensor) throws Exception {return this.id.equals(waterSensor.getId());}
}
3.3.2 富函数类(Rich Function Classes)
“富函数类”也是DataStream API提供的一个函数类的接口,所有的Flink函数类都有其Rich版本。
RichXxxFunction 与 XxxFunction 的区别是可以获取到任务运行时的一些上下文信息、环境信息以及对任务生命周期的管理。
典型的生命周期方法有:
- 重写open()方法,每个子任务在启动时,会调用一次。
- 重写close()方法,每个子任务在结束时会调用一次。
- 程序异常退出不会调用 close() 方法。
- 手动取消任务会调用 close() 方法。
在open、close中可以使用 getRuntimeContext() 来获取运行时上下文信息。
public class RichMapFunctionDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4);SingleOutputStreamOperator<Integer> map = source.map(new RichMapFunction<Integer, Integer>() {@Overridepublic void open(Configuration parameters) throws Exception {System.out.println("open:子任务名称"+getRuntimeContext().getTaskNameWithSubtasks());System.out.println("open:子任务编号"+getRuntimeContext().getIndexOfThisSubtask());super.open(parameters);}@Overridepublic void close() throws Exception {System.out.println("close:子任务名称"+getRuntimeContext().getTaskNameWithSubtasks());System.out.println("close:子任务编号"+getRuntimeContext().getIndexOfThisSubtask());super.close();}@Overridepublic Integer map(Integer value) throws Exception {return value + 1;}});map.print();env.execute();}
}
输出结果:
open:子任务名称Source: Collection Source -> Map -> Sink: Print to Std. Out (1/1)#0
open:子任务编号0
2
3
4
5
close:子任务名称Source: Collection Source -> Map -> Sink: Print to Std. Out (1/1)#0
close:子任务编号0
3.4 物理分区算子(Physical Partitioning)
Flink 为我们提供了7种分区策略和一个用户自定义分区器。常见的物理分区策略有:随机分配(Random)、轮询分配(Round-Robin)、重缩放(Rescale)和广播(Broadcast)。
分区算子就是将数据按照某种策略分配到下游算子的子任务分区中。
3.4.1 随机分区(shuffle)
通过调用DataStream的.shuffle()方法,将数据随机地分配到下游算子的并行任务分区中去。
shuffle底层实现采用的是 生成随机数
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<String> source = env.socketTextStream("ip", 端口);// 随机分区 random.nextInt(下游算子并行度)source.shuffle().print();env.execute();}
输入:
[root@VM-55-24-centos ~]# nc -lk 1234
1
2
3
4
5输出:
1> 1
2> 2
2> 3
1> 4
1> 5
从控制台输出的左侧子任务编号可以看出子任务分区是随机分配的。
3.4.2 轮询分区(Round-Robin)
通过调用DataStream的.rebalance()方法,就可以实现轮询重分区。rebalance采用的是对并行度取模,可以将输入流数据平均分配到下游的并行任务中去。可以解决 数据源数据倾斜 的问题。
// 轮询重分区source.rebalance().print();
输入:
[root@VM-55-24-centos ~]# nc -lk 1234
1
2
3
4
5输出:
1> 1
2> 2
1> 3
2> 4
1> 5
3.4.3 重缩放分区(rescale)
与 rebalance 类似,也是轮询的效果,不过比轮询更加高效。rescale的做法是将数据在固定的几个分区中进行轮询,而不是轮询所有分区。

// 缩放轮询
source.rescale().print();
3.4.4 广播(broadcast)
通过调用DataStream的broadcast()方法,会将数据发送到下游算子的所有并行任务中去。慎用!
// 广播source.broadcast().print();
3.4.5 全局分区(global)
全局分区做法非常极端,通过调用.global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了1。慎用,可能对程序造成很大的压力!
// 全局分区source.global().print();
3.4.6 自定义分区(Custom)
当Flink提供的所有分区策略都不能满足用户的需求时,我们可以通过使用partitionCustom()方法来自定义分区策略。
例子:实现将奇数与偶数分配到不同的分区
自定义分区器实现 Partitioner:
public class MyPartitioner implements Partitioner<String> {@Overridepublic int partition(String key, int numPartitions) {// key 为当前数据,numPartitions 为下游并行度return Integer.parseInt(key) % numPartitions;}
}
使用自定义分区
public class PartitionCustomDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<String> source = env.socketTextStream("IP", 端口);source.partitionCustom(new MyPartitioner(), v -> v).print();env.execute();}
}
结果输出:
输入:
[root@VM-55-24-centos ~]# nc -lk 1234
2
3
4
6
8
10输出:
2> 1
1> 2
2> 3
1> 4
1> 6
1> 8
1> 10
3.5 分流
所谓“分流”,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。
与分区不同的是,分流是是将一条数据流拆分成多条流。而分区是将数据分配到下游算子的子任务中。
3.5.1 Filter 实现分流
例子:读取一个整数数字流,将数据流划分为奇数流和偶数流。
public class SplitByFilterDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<String> source = env.socketTextStream("IP", 端口);source.filter(num -> Integer.parseInt(num) % 2 == 0).print("偶数流:");source.filter(num -> Integer.parseInt(num) % 2 == 1).print("奇数流:");env.execute();}
}
输入输出结果:
输入:
[root@VM-55-24-centos ~]# nc -lk 1234
1
2
45
324321
234325235
12312
11412输出:
奇数流::1> 1
偶数流::2> 2
奇数流::1> 45
偶数流::2> 324321
奇数流::1> 234325235
偶数流::2> 12312
偶数流::2> 11412
用 Filter 实现虽然简单但不够高效,因为每次数据流都会经过两次 Filter 过滤 。
3.5.2 使用侧输出流
一条未被分类操作的流被称为“主流”,经过分流操作后,侧输出流可以理解为“主流”的“支流”。
需求:id 为 s1 、s2 的数据被到另外两条侧流 ,非 s1 、s2不受影响,放在主流:
public class SideOutputDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<String> source = env.socketTextStream("IP", 端口);SingleOutputStreamOperator<WaterSensor> sensorDs = source.map(new MapFunction<String, WaterSensor>() {@Overridepublic WaterSensor map(String value) throws Exception {String[] data = value.split(",");return new WaterSensor(data[0], Long.valueOf(data[1]), Integer.valueOf(data[2]));}});// 侧输出流的标记OutputTag<WaterSensor> s1Tag = new OutputTag<>("s1", Types.POJO(WaterSensor.class));OutputTag<WaterSensor> s2Tag = new OutputTag<>("s2", Types.POJO(WaterSensor.class));SingleOutputStreamOperator<WaterSensor> process = sensorDs.process(new ProcessFunction<WaterSensor, WaterSensor>() {@Overridepublic void processElement(WaterSensor value, ProcessFunction<WaterSensor, WaterSensor>.Context ctx, Collector<WaterSensor> out) throws Exception {if ("s1".equals(value.getId())) {ctx.output(s1Tag, value);} else if ("s2".equals(value.getId())) {ctx.output(s2Tag, value);} else {out.collect(value);}}});// process 默认只会返回主流数据process.print("主流");// 根据输出标签(流的标签)找到 s1 这条支流斌输出process.getSideOutput(s1Tag).printToErr("测输出流S1");// 根据输出标签(流的标签)找到 s2 这条支流斌输出process.getSideOutput(s2Tag).printToErr("测输出流S2");env.execute();}
}
输入与输出结果:
输入:[root@VM-55-24-centos ~]# nc -lk 1234
s1,1,1
s3,3,3
s2,2,2
s9,9,9
s1,33,1134输出:测输出流S1:2> WaterSensor{id='s1', ts=1, vc=1}主流:1> WaterSensor{id='s3', ts=3, vc=3}测输出流S2:2> WaterSensor{id='s2', ts=2, vc=2}主流:1> WaterSensor{id='s9', ts=9, vc=9}测输出流S1:2> WaterSensor{id='s1', ts=33, vc=1134}
- Process 算子非常灵活,基础算子底层都是调用 Process 来实现的。
- OutputTag 可以理解为侧输出流的名称以流的数据类型。
- 将数据放入侧输出流中需要使用 ctx.output()传入输出流标签和数据 ;
- process 返回的流是主流,想获取侧输出流必须通过 process.getSideOutput()传入输出流标签来获取。
3.6 基本合流操作
在实际应用中,我们经常会遇到来源不同的多条流,需要将它们的数据进行联合处理。所以Flink中合流的操作会更加普遍,对应的API也更加丰富。
3.6.1 联合(Union)
通过调用数据源的 Union() 就可以将一条或者多条流进行合并。联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。
public class UnionDemo {/*** Union : 合并一条或多条相同数据类型的流*/public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3);DataStreamSource<Integer> source2 = env.fromElements(44, 55, 66);DataStreamSource<String> source3 = env.fromElements("777", "888", "999");// 写法1 : 一次合并一个流DataStream<Integer> union = source1.union(source2).union(source3.map(Integer::parseInt));// 写法2 : 一次合并多个流source1.union(source2,source3.map(Integer::parseInt));union.print();env.execute();}
}
结果输出:
1
2
3
44
55
66
777
888
999
3.6.2 连接(Connect)
Union 虽然使用简单,但是受限于只能合并相同类型的流,不太灵活。Flink 提供了另一个更方便的河流操作:连接(Connect)。
3.6.2.1 连接流(ConnectedStreams)
通过 Connect 可以将两条不同类型的流进行连接,但是不再返回 DataStream ,而是返回 ConnectedStreams(连接流)。
且两条流连接后只是形式上的“合并”,对这条流进行处理转换则需要对原本的两条流单独处理。
public class ConnectDemo {/*** Connect : 连接(合并)两条流* 返回的是 ConnectedStreams(连接流) 而不是 DataStream* 只是名义上的统一,处理逻辑需要每条流单独处理*/public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3);DataStreamSource<String> source2 = env.fromElements("777", "888", "999");ConnectedStreams<Integer, String> connect = source1.connect(source2);// 需要对两条单独处理 CoMapFunction(第一条流的类型,第二条流的类型,输出的类型)SingleOutputStreamOperator<String> map = connect.map(new CoMapFunction<Integer, String, String>() {@Overridepublic String map1(Integer value) throws Exception {return value.toString();}@Overridepublic String map2(String value) throws Exception {return value;}});map.print();env.execute();}
}
结果输出:
1
777
2
888
3
999
3.6.2.2 CoProcessFunction
与CoMapFunction类似,如果是调用.map()就需要传入一个CoMapFunction,需要实现map1()、map2()两个方法;而调用.process()时,传入的则是一个CoProcessFunction。它也是“处理函数”家族中的一员,用法非常相似。它需要实现的就是processElement1()、processElement2()两个方法,在每个数据到来时,会根据来源的流调用其中的一个方法进行处理。
例子:有两条数据类型不同的流,需要根据各自数据的第一个字段进行匹配。类似于 MySQL中的 Inner Join。
public class ConnectKeyByDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Tuple2<Integer, String>> source1 = env.fromElements(Tuple2.of(1, "a1"),Tuple2.of(1, "a2"),Tuple2.of(2, "b"),Tuple2.of(3, "c"));DataStreamSource<Tuple3<Integer, String, Integer>> source2 = env.fromElements(Tuple3.of(1, "aa1", 1),Tuple3.of(1, "aa2", 2),Tuple3.of(2, "bb", 1),Tuple3.of(3, "cc", 1));ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connect = source1.connect(source2);SingleOutputStreamOperator<String> process = connect.process(new CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>() {// 为各自两条流定义中间变量用于存储匹配时的数据Map<Integer, List<Tuple2<Integer, String>>> s1Cache = new HashMap<>();Map<Integer, List<Tuple3<Integer, String, Integer>>> s2Cache = new HashMap<>();@Overridepublic void processElement1(Tuple2<Integer, String> value, CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>.Context ctx, Collector<String> out) throws Exception {Integer id = value.f0;// 第一次出现该 Key 则直接将数据put进s1的数据集合中if (!s1Cache.containsKey(id)) {ArrayList<Tuple2<Integer, String>> s1Values = new ArrayList<>();s1Values.add(value);s1Cache.put(id, s1Values);} else {// 不是第一次出现该 Key ,直接添加进该 Key 的数组中s1Cache.get(id).add(value);}// 去另外一条流的数据中寻找有没有 id 相匹配的,有则放入采集器if (s2Cache.containsKey(id)) {for (Tuple3<Integer, String, Integer> s2Element : s2Cache.get(id)) {out.collect("S1:" + value + "<---->" + "s2:" + s2Element);}}}@Overridepublic void processElement2(Tuple3<Integer, String, Integer> value, CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>.Context ctx, Collector<String> out) throws Exception {Integer id = value.f0;// 第一次出现该 Key 则直接将数据put进s2的数据集合中if (!s2Cache.containsKey(id)) {ArrayList<Tuple3<Integer, String, Integer>> s2Values = new ArrayList<>();s2Values.add(value);s2Cache.put(id, s2Values);} else {// 不是第一次出现该 Key ,直接添加进该 Key 的数组中s2Cache.get(id).add(value);}// 去另外一条流的数据中寻找有没有 id 相匹配的,有则放入采集器if (s1Cache.containsKey(id)) {for (Tuple2<Integer, String> s1Element : s1Cache.get(id)) {out.collect("S2:" + value + "<---->" + "s1:" + s1Element);}}}});process.print();env.execute();}
}
结果:
S2:(1,aa1,1)<---->s1:(1,a1)
S1:(1,a2)<---->s2:(1,aa1,1)
S2:(1,aa2,2)<---->s1:(1,a1)
S2:(1,aa2,2)<---->s1:(1,a2)
S2:(2,bb,1)<---->s1:(2,b)
S2:(3,cc,1)<---->s1:(3,c)
注意:在多并行度下,以上匹配会出错,因为多并行度下,数据会被发往 Process 不同的子任务中(Slot),而不同的子任务间数据无法共享,导致读取不到另一个子任务的数组,从而匹配错误。所以需要在连接流后对要匹配的字段进行 KeyBy 操作,确保同一个 Key 被分配到同一个子任务中。
四.输出算子(Sink)
Flink作为数据处理框架,最终还是要把计算处理的结果写入外部存储,为外部应用提供支持。

4.1 连接到外部系统
Flink 1.17 中的DataStream API专门提供了向外部写入数据的方法:sinkTo,对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的;Flink程序中所有对外的输出操作,一般都是利用Sink算子完成的。
stream.sinkTo(…)
在大部分情况下,Sink 并不需要我们手动实现,Flink官方为我们提供了一部分的框架的Sink连接器。如下图所示,列出了Flink官方目前支持的第三方系统连接器:

及第三方提供的连接器:
地址:Overview | Apache Flink
4.2 输出到文件
Flink专门提供了一个流式文件系统的连接器:FileSink,为批处理和流处理提供了一个统一的Sink,它可以将分区文件写入Flink支持的文件系统。
FileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),可以直接调用FileSink的静态方法:
例子:使用数据生成器源源不断生成数据,并输出到文件夹的文本文件中,每隔一个小时生成一个新的文件夹,且每隔20秒或者文件大小达到 3KB 则新建一个文本文件。
导入依赖:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version>
</dependency>
public class SinkFileDemo {public static void main(String[] args) throws Exception {// 获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 全局并行度设置为 2env.setParallelism(2);// 开启checkpointenv.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);/*** 数据生成器:无限生成数字,一秒生成 1000 条*/DataGeneratorSource<String> dataGenSource = new DataGeneratorSource<>((GeneratorFunction<Long, String>) num -> "Number:" + num,Long.MIN_VALUE,RateLimiterStrategy.perSecond(1000),Types.STRING);DataStreamSource<String> streamSource = env.fromSource(dataGenSource, WatermarkStrategy.noWatermarks(),"data-generator");/*** 输出到文件系统* Sink 算子同样会受到 并行度 的影响:例如会同时有 并行度个 个文件被写入*/FileSink<String> fileSink = FileSink// 指定要输出的 文件目录 及 文件编码.<String>forRowFormat(new Path("D:/tmp"), new SimpleStringEncoder<>("UTF-8"))// 指定要生成文件的 前后缀.withOutputFileConfig(OutputFileConfig.builder() // 建造者模式// 文件的前缀.withPartPrefix("flink-file-test")// 文件的后缀.withPartSuffix(".txt").build())// 指定目录分桶:按照小时进行分桶(一小时生成一个新的目录),并设置时区为 Asia/Shanghai.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH", ZoneId.of("Asia/Shanghai")))// 文件滚动策略:每隔多少秒 或 文件超过多大 就生成新的文件.withRollingPolicy(DefaultRollingPolicy.builder() // 建造者模式// 每隔 20S 生成一个新的文件.withRolloverInterval(Duration.ofSeconds(20))// 文件大小超过大于 3KB 则生成一个新的文件.withMaxPartSize(new MemorySize(1024 * 3)).build()).build();// 输出streamSource.sinkTo(fileSink);// 执行env.execute();}}
结果:

FileSink
.forRowFormat:指定要输出的文件目录及文件编码
.withOutputFileConfig:指定要生成文件的前后缀
.withBucketAssigner:指定目录分桶
.withRollingPolicy:文件滚动策略
4.3 输出到RabbitMQ
想要输出到 RabbitMQ,也需要调用对应的 Sink 算子--RMQSink 。
例子:从 Socket 读数据,写入到 RabbitMQ 中,作为一条消息。
添加Kafka 连接器依赖:
<!--RabbitMQ 连接器-->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq</artifactId><version>3.0.1-1.17</version>
</dependency>
public class SinkRabbitMqDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);DataStreamSource<String> streamSource = env.socketTextStream("ip", 1234);// 配置 RabbitMQ 连接信息RMQConnectionConfig mqConfig = new RMQConnectionConfig.Builder().setHost("xxx.xxx.xxx.xxx") // RabbitMQ 服务地址.setPort(5379) // RabbitMQ 服务端口.setUserName("用户名") // 用户名.setPassword("密码") // 密码.setVirtualHost("/") // 虚拟主机名.build();// 创建一个RMQSink,用于将数据发送到RabbitMQ队列// mq配置信息,队列名称,序列化器RMQSink rmqSink = new RMQSink(mqConfig, "test_queue", new SimpleStringSchema());// 将数据流写入RabbitMQ队列 Flink 1.17 并不支持使用 sinkTo 对第三方系统进行输出streamSource.addSink(rmqSink);env.execute("flink connectors rabbitmq");}
}
输入:
[root@VM-55-24-centos ~]# nc -lk 1234
hello flink
hello rabbitmq
结果:

4.4 输出到MySQL(JDBC)
同样的,要输出到 MySQL ,需要调用 JdbcSink.sink() 算子,且也只能使用 addSink 来添加输出。
例子: 在 Socket 中写入数据,写入MySQL中。
在 MySQL 中新建表:
CREATE TABLE `ws` (`id` varchar(100) NOT NULL,`ts` bigint(20) DEFAULT NULL,`vc` int(11) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
导入 MySQL 驱动:
<!-- MySQL 驱动-->
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.47</version>
</dependency>
导入 Flink - MySQL 连接器:
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.1-1.17</version>
</dependency>
代码:
public class SinkMySQLDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<String> streamSource = env.socketTextStream("xxx.xxx.xxx.xxx", 1234);// 将从 Socket 中读到的字符串转成实体类SingleOutputStreamOperator<WaterSensor> map = streamSource.map((MapFunction<String, WaterSensor>) s -> {String[] data = s.split(",");return new WaterSensor(data[0], Long.valueOf(data[1]), Integer.valueOf(data[2]));});/*** jdbcSink 四大参数:* 1、要执行的 SQL 语句* 2、为占位符填充值* 3、执行选项:重试次数,攒批* 4、MySQL 连接信息*/SinkFunction<WaterSensor> jdbcSink = JdbcSink.sink("insert into ws values( ? , ? , ?)",new JdbcStatementBuilder<WaterSensor>() {@Overridepublic void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {preparedStatement.setString(1, waterSensor.getId());preparedStatement.setLong(2, waterSensor.getTs());preparedStatement.setInt(3, waterSensor.getVc());}},JdbcExecutionOptions.builder().withBatchIntervalMs(3000) // 批次的时间.withBatchSize(100) // 批次的大小:条数.withMaxRetries(3) // 重试次数.build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://localhost:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8").withUsername("Username").withPassword("Password").withConnectionCheckTimeoutSeconds(60) // 重试的超时时间.build());map.addSink(jdbcSink);env.execute("flink connectors MySQL");}
}
输入:
[root@VM-55-24-centos ~]# nc -lk 1234
hello,1,1
flink,2,2
mysql,3,3
输出:

4.5 自定义Sink输出
Flink 为我们提供很多常用的连接器,一般不推荐自定义Sink,因为需要自行处理连接逻辑及错误逻辑。
如果要自定义Sink,Flink 为我们提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类,只要实现它,通过简单地调用DataStream的.addSink()方法就可以自定义写入任何外部存储。
streamSource.addSink(new MySink());
推荐继承RichSinkDunction,实现其中的三个方法open()、close()、invoke(String value, Context context)。
public class MySink extends RichSinkFunction<String>{@Overridepublic void open(Configuration parameters) throws Exception {// 启动时会被调用一次// 可以在这里创建连接}@Overridepublic void close() throws Exception {// 销毁时会被调用一次// 可以在这里销毁连接}// Sink 的核心逻辑@Overridepublic void invoke(String value, Context context) throws Exception {// 每条数据来都会调用一次// 具体的写入逻辑...}
}相关文章:
Flink1.17 DataStream API
目录 一.执行环境(Execution Environment) 1.1 创建执行环境 1.2 执行模式 1.3 触发程序执行 二.源算子(Source) 2.1 从集合中读取数据 2.2 从文件读取数据 2.3 从 RabbitMQ 中读取数据 2.4 从数据生成器读取数据 2.5 …...
数据结构中树、森林 与 二叉树的转换
1 树转换为 二叉树 将树转换成二叉树的步骤是: 加线。在所有的兄弟结点之间加一条线。去线。对于树中的每个结点,只保留它与第一个孩子结点的连线,删除该结点其他孩子结点之间的连线。调整。以树的根结点为轴心,将整个树顺时针旋…...
力扣labuladong——一刷day43
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、力扣257. 二叉树的所有路径二、力扣129. 求根节点到叶节点数字之和三、力扣199. 二叉树的右视图四、力扣662. 二叉树最大宽度 前言 一般来说,如…...
MapApp 地图应用
1. 简述 1.1 重点 1)更好地理解 MVVM 架构 2)更轻松地使用 SwiftUI 框架、对齐、动画和转换 1.2 资源下载地址: Swiftful-Thinking:https://www.swiftful-thinking.com/downloads 1.3 项目结构图: 1.4 图片、颜色资源文件图: 1.5 启动图片配置图: 2. Mo…...
Java之反射获取和赋值字段
在Java中,反射能够使得代码更加通用,往往用于工具类中。 但常常我们在获取或者赋值反射的属性时,会出现没有赋值成功的结果,往往是由于这个属性在父级中而导致的,这个时候我们就不能用getDeclaredField方法单独获取字段…...
ckplayer自己定义风格播放器的开发记录
CKplayer是一款基于Flash和HTML5技术的开源视频播放器,支持多种格式的音视频播放,并且具有优秀的兼容性和扩展性。 它不仅可以在网页上播放本地或者网络上的视频,还可以通过代码嵌入到网页中,实现更加个性化的播放效果。CKplayer…...
全网最全Django面试题整理(一)
Django 中的 MTV 是什么意思? 在Django中,MTV指的是“Model-Template-View”,而不是常见的MVC(Model-View-Controller)架构。Django的设计理念是基于MTV的 Model(模型) 模型代表数据存取层&am…...
vue统一登录
说明: 统一登录其实就是前端去判断Url地址的token 之后如果有token未过期就直接跳转到首页。 说到浏览器输入url地址,那从浏览器输入地址一共发生了几件事大致如下: DNS解析域名,获取IP地址 --》 建立TCP连接(三次握…...
MVSNet论文笔记
MVSNet论文笔记 摘要1 引言2 相关基础2.1 多视图立体视觉重建(MVS Reconstruction)2.2 基于学习的立体视觉(Learned Stereo)2.3 基于学习的多视图的立体视觉(Learned MVS) 3 MVSNet3.1 网络架构3.2 提取图片…...
大型 APP 的性能优化思路
做客户端开发都基本都做过性能优化,比如提升自己所负责的业务的速度或流畅性,优化内存占用等等。但是大部分开发者所做的性能优化可能都是针对中小型 APP 的,大型 APP 的性能优化经验并不会太多,毕竟大型 APP 就只有那么几个&…...
K8S配置资源管理
这里写目录标题 K8S配置资源管理一.Secret1.介绍2.Secret 有四种类型3.创建 Secret4.使用方式 二.ConfigMap1.介绍2.创建 ConfigMap3.Pod 中使用 ConfigMap4.用 ConfigMap 设置命令行参数5.通过数据卷插件使用ConfigMap6.ConfigMap 的热更新7.ConfigMap 更新后滚动更新 Pod K8S…...
Redis 的集群模式实现高可用
来源:Redis高可用:武林秘籍存在集群里,那稳了~ (qq.com) 1. 引言 前面我们已经聊过 Redis 的主从同步(复制)和哨兵机制,这期我们来聊 Redis 的集群模式。 但是在超大规模的互联网应用中,业务规…...
21、嵌套路由实战操作
1、创建内嵌子路由,你需要添加一个vue文件,同时添加一个与该文件同名的目录用来存放子视图组件。 2、在父组件(.vue)内增加用于显示子视图内容 新建文件 pages\index_id.vue 生成的对应路由 {path: "/",component: _…...
WPF 控件的缩放和移动
WPF 控件的缩放和移动 1.页面代码 <ContentControl ClipToBounds"True" Cursor"SizeAll"><Viewboxx:Name"viewbox"MouseDown"viewbox_MouseDown"MouseMove"viewbox_MouseMove"MouseWheel"Viewbox_MouseWhee…...
Python and和or的优先级实例比较
Python and和or的优先级 and和or都是Python的逻辑运算符,都为保留字。通常情况下,在没有括号影响,and和or的优先级中and在代码的逻辑运算过程中会相对优先一些,及在同一行的Python代码中,and会优先与or执行。下面将通…...
数据结构与算法编程题2
逆置线性表,使空间复杂度为 O(1) #include <iostream> using namespace std;typedef int ElemType; #define Maxsize 100 #define OK 1 #define ERROR 0 typedef struct SqList {ElemType data[Maxsize];int length; }SqList;void Init_SqList(SqList& …...
Java开发者的Python快速进修指南:控制之if-else和循环技巧
简单介绍 在我们今天的学习中,让我们简要了解一下Python的控制流程。考虑到我们作为有着丰富Java开发经验的程序员,我们将跳过一些基础概念,如变量和数据类型。如果遇到不熟悉的内容,可以随时查阅文档。但在编写程序或逻辑时&…...
二进制部署k8s集群-过程中的问题总结(接上篇的部署)
1、kube-apiserver部署过程中的问题 kube-apiserver.conf配置文件更改 2、calico的下载地址 curl https://docs.projectcalico.org/v3.20/manifests/calico.yaml -O 这里如果kubernetes的节点服务器为多网卡配置会产生报错 修改calino.yaml配置文件 解决方法: 调…...
IOS 关于CoreText的笔记
放大 一.CoreText计算attributeString显示所占区域 百度搜索有三种方法: 1.方法 - (CGRect)boundingRectWithSize:(CGSize)size options:(NSStringDrawingOptions)options context:(nullable NSStringDrawingContext *)context 2.使用CTFrameRef 的 CTFrameGetLin…...
基础课6——开放领域对话系统架构
开放领域对话系统是指针对非特定领域或行业的对话系统,它可以与用户进行自由的对话,不受特定领域或行业的知识和规则的限制。开放领域对话系统需要具备更广泛的语言理解和生成能力,以便与用户进行自然、流畅的对话。 与垂直领域对话系统相比…...
安宝特方案丨XRSOP人员作业标准化管理平台:AR智慧点检验收套件
在选煤厂、化工厂、钢铁厂等过程生产型企业,其生产设备的运行效率和非计划停机对工业制造效益有较大影响。 随着企业自动化和智能化建设的推进,需提前预防假检、错检、漏检,推动智慧生产运维系统数据的流动和现场赋能应用。同时,…...
在WSL2的Ubuntu镜像中安装Docker
Docker官网链接: https://docs.docker.com/engine/install/ubuntu/ 1、运行以下命令卸载所有冲突的软件包: for pkg in docker.io docker-doc docker-compose docker-compose-v2 podman-docker containerd runc; do sudo apt-get remove $pkg; done2、设置Docker…...
【开发技术】.Net使用FFmpeg视频特定帧上绘制内容
目录 一、目的 二、解决方案 2.1 什么是FFmpeg 2.2 FFmpeg主要功能 2.3 使用Xabe.FFmpeg调用FFmpeg功能 2.4 使用 FFmpeg 的 drawbox 滤镜来绘制 ROI 三、总结 一、目的 当前市场上有很多目标检测智能识别的相关算法,当前调用一个医疗行业的AI识别算法后返回…...
Unity | AmplifyShaderEditor插件基础(第七集:平面波动shader)
目录 一、👋🏻前言 二、😈sinx波动的基本原理 三、😈波动起来 1.sinx节点介绍 2.vertexPosition 3.集成Vector3 a.节点Append b.连起来 4.波动起来 a.波动的原理 b.时间节点 c.sinx的处理 四、🌊波动优化…...
面向无人机海岸带生态系统监测的语义分割基准数据集
描述:海岸带生态系统的监测是维护生态平衡和可持续发展的重要任务。语义分割技术在遥感影像中的应用为海岸带生态系统的精准监测提供了有效手段。然而,目前该领域仍面临一个挑战,即缺乏公开的专门面向海岸带生态系统的语义分割基准数据集。受…...
纯 Java 项目(非 SpringBoot)集成 Mybatis-Plus 和 Mybatis-Plus-Join
纯 Java 项目(非 SpringBoot)集成 Mybatis-Plus 和 Mybatis-Plus-Join 1、依赖1.1、依赖版本1.2、pom.xml 2、代码2.1、SqlSession 构造器2.2、MybatisPlus代码生成器2.3、获取 config.yml 配置2.3.1、config.yml2.3.2、项目配置类 2.4、ftl 模板2.4.1、…...
08. C#入门系列【类的基本概念】:开启编程世界的奇妙冒险
C#入门系列【类的基本概念】:开启编程世界的奇妙冒险 嘿,各位编程小白探险家!欢迎来到 C# 的奇幻大陆!今天咱们要深入探索这片大陆上至关重要的 “建筑”—— 类!别害怕,跟着我,保准让你轻松搞…...
打手机检测算法AI智能分析网关V4守护公共/工业/医疗等多场景安全应用
一、方案背景 在现代生产与生活场景中,如工厂高危作业区、医院手术室、公共场景等,人员违规打手机的行为潜藏着巨大风险。传统依靠人工巡查的监管方式,存在效率低、覆盖面不足、判断主观性强等问题,难以满足对人员打手机行为精…...
HTML前端开发:JavaScript 获取元素方法详解
作为前端开发者,高效获取 DOM 元素是必备技能。以下是 JS 中核心的获取元素方法,分为两大系列: 一、getElementBy... 系列 传统方法,直接通过 DOM 接口访问,返回动态集合(元素变化会实时更新)。…...
【Linux】Linux安装并配置RabbitMQ
目录 1. 安装 Erlang 2. 安装 RabbitMQ 2.1.添加 RabbitMQ 仓库 2.2.安装 RabbitMQ 3.配置 3.1.启动和管理服务 4. 访问管理界面 5.安装问题 6.修改密码 7.修改端口 7.1.找到文件 7.2.修改文件 1. 安装 Erlang 由于 RabbitMQ 是用 Erlang 编写的,需要先安…...
