Flink之Watermark水印、水位线
Watermark水印、水位线
- 水位线
- 概述
- 水印本质
- 生成Watermark
- Watermark策略
- WatermarkStrategy工具类
- 使用Watermark策略
- 内置Watermark生成器
- 单调递增时间戳分配器
- 固定延迟的时间戳分配器
- 自定义WatermarkGenerator
- 周期性Watermark生成器
- 标记Watermark生成器
- Watermark策略与Kafka连接器
- 其他
- 处理空闲数据源
- 并行度下的水位线传递
- 迟到数据的处理
水位线
概述
在Apache Flink中,Watermark(水印)是一种用于处理事件时间(eventtime)的时间指示器。它模拟了事件流中事件时间进展的概念。
事件时间是指事件实际发生的时间,在分布式流处理中经常用于处理无序事件流。然而,由于网络延迟、乱序事件的到达以及分布式处理的特点,事件时间可能不按顺序到达处理器。在这种情况下,处理程序需要一种机制来标识它们已经处理过的事件时间,并据此生成或更新水印。
水印是一个特殊的事件,包含了一个时间戳。它表示截至到该时间戳的事件已经全部到达(或预期已到达),并且可以被认为是完整的。水印告知系统在事件时间维度上处理事件的进展情况,并在触发窗口计算、事件乱序处理等方面提供辅助。
水印的生成通常基于事件数据中的时间戳,通过一些策略来推断出未到达的事件的时间戳。简单的策略可以是事件时间减去一个固定的延迟值,例如,如果我们有一个事件的时间戳,我们可以生成一个比该事件时间戳小一定固定时间的水印。
Flink通过处理数据流中的时间戳和水印来衡量事件时间进展,并通过水印来驱动事件时间的处理。可以根据应用程序的需要自定义水印生成的策略。
水印本质
Watermark是水印、水位线的意思,水印的出现是为了解决实时计算中的数据乱序问题,它的本质是DataStream中一个带有时间戳的元素。
水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。通过使用水位线机制,Flink能够动态地处理乱序事件,并在保证准确性的同时提供低延迟的数据处理。
如果Flink系统中出现了一个WaterMarkT,那么就意味着EventTime<T的数据都已经到达,窗口的结束时间和T相同的那个窗口被触发进行计算了。因此,水印是Flink判断迟到数据的标准,同时也是窗口触发的标记。
在程序并行度大于1的情况下,会有多个流产生水印和窗口,这时候Flink会选取时间戳最小的水印。
生成Watermark
生成水位线使用assignTimestampsAndWatermarks()方法,它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间。
dataStream.assignTimestampsAndWatermarks(WatermarkStrategy<T> watermarkStrategy);
需要传入一个WatermarkStrategy作为参数,也就是所谓的水位线生成策略
Watermark策略
Flink程序需要知道事件时间戳对应的字段,意味着数据流中的每个元素都需要拥有可分配的事件时间戳。通过使用TimestampAssigner API从元素中的某个字段去访问/提取时间戳。
时间戳的分配与 watermark 的生成是齐头并进的,其可以告诉Flink应用程序事件时间的进度。其可以通过指定WatermarkGenerator 来配置watermark的生成方式。
需要设置一个同时包含TimestampAssigner 和WatermarkGenerator的WatermarkStrateg
WatermarkStrategy是一个接口,该接口中包含了一个时间戳分配器TimestampAssigner和一个水位线生成器WatermarkGenerator。
WatermarkStrategy接口如下:
public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T> {/*** 根据策略实例化一个 watermark 生成器* 主要负责按照既定的方式,基于时间戳生成水位线*/WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context var1);/*** 负责从流中数据元素的某个字段中提取时间戳,并分配给元素* 时间戳的分配是生成水位线的基础*/ default TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context) {return new RecordTimestampAssigner();}
}
WatermarkStrategy工具类
工具类WatermarkStrategy中也提供了几个常用的watermark策略,并且可以在某些必要场景下构建自己的 watermark策略。
/*** 为时间戳单调递增的情况创建水印策略,适用于有序流*/static <T > WatermarkStrategy < T > forMonotonousTimestamps() {return (ctx) -> new AscendingTimestampsWatermarks<>();}/*** 为记录无序流的情况创建水印策略,但可以设置事件无序程度的上限。*/static <T > WatermarkStrategy < T > forBoundedOutOfOrderness(Duration maxOutOfOrderness) {return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);}/*** 基于watermarkgeneratorsupper自定义创建水印策略 */static <T > WatermarkStrategy < T > forGenerator(WatermarkGeneratorSupplier < T > generatorSupplier) {return generatorSupplier::createWatermarkGenerator;}/*** 创建完全不生成水印的水印策略。这在进行纯基于处理时间的流处理的场景中可能是有用*/static <T > WatermarkStrategy < T > noWatermarks() {return (ctx) -> new NoWatermarksGenerator<>();}
使用forBoundedOutOfOrderness watermark
生成器和一个lambda表达式作为时间戳分配器
DataStreamSource<Tuple2<String, Integer>> dataStreamSource = env.fromElements(Tuple2.of("a", 1),Tuple2.of("a", 2),Tuple2.of("b", 3),Tuple2.of("c", 4));SingleOutputStreamOperator<Tuple2<String, Integer>> assignTimestampsAndWatermarks = dataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts) -> value.f1 * 1000L));
注意:
时间戳和 watermark 都是从 1970-01-01T00:00:00Z 起的 Java 纪元开始,并以毫秒为单位。
使用Watermark策略
WatermarkStrategy在哪里使用?
1.直接在数据源上使用2.直接在非数据源的操作之后使用
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<MyEvent> stream = env.readFile(myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,FilePathFilter.createDefaultFilter(), typeInfo);DataStream<MyEvent> withTimestampsAndWatermarks = stream.filter( event -> event.severity() == WARNING ).assignTimestampsAndWatermarks(<watermark strategy>);withTimestampsAndWatermarks.keyBy( (event) -> event.getGroup() ).window(TumblingEventTimeWindows.of(Time.seconds(10))).reduce( (a, b) -> a.add(b) ).addSink(...)
注意:
使用 WatermarkStrategy 去获取流并生成带有时间戳的元素和 watermark 的新流时,如果原始流已经具有时间戳或 watermark,则新指定的时间戳分配器将覆盖原有的时间戳和 watermark。
内置Watermark生成器
Flink内置了两个WaterMark生成器
1.forMonotonousTimestamps
: 时间戳单调增长:其实就是允许的延迟为0
WatermarkStrategy.forMonotonousTimestamps();
2.forBoundedOutOfOrderness
: 允许固定时间的延迟
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));
单调递增时间戳分配器
对于有序流,主要特点就是时间戳单调增长,永远不会出现迟到数据的问题。因此当前时间戳就可以充当 watermark,因为后续到达数据的时间戳不会比当前的小。这是周期性生成水位线的最简单的场景,直接调用
WatermarkStrategy.forMonotonousTimestamps()
方法就可以实现
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket接收数据流SingleOutputStreamOperator<String> source = env.socketTextStream("IP", 8888);// 将输入数据转换为IntegerDataStream<Integer> dataStream = source.map(str -> Integer.parseInt(str));// 定义Watermark策略WatermarkStrategy<Integer> watermarkStrategy = WatermarkStrategy// 升序的watermark,没有等待时间,即当 数字转时间戳 达到 滚动处理时间窗口10s 就触发窗口执行.<Integer>forMonotonousTimestamps()// TimestampAssigner是一个可以从事件数据中提取时间戳字段的简单函数// 指定时间戳分配器,从数据中提取.withTimestampAssigner(new SerializableTimestampAssigner<Integer>() {@Overridepublic long extractTimestamp(Integer element, long recordTimestamp) {// 将输入数字转时间戳,单位毫秒,当作数据的时间戳System.out.println("数据 " + element);return element * 1000L;}});// 指定watermark策略SingleOutputStreamOperator<Integer> singleOutputStreamOperator = dataStream.assignTimestampsAndWatermarks(watermarkStrategy);singleOutputStreamOperator// 事件时间语义窗口.windowAll(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessAllWindowFunction<Integer, String, TimeWindow>() {@Overridepublic void process(Context context, Iterable<Integer> input, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss");long count = input.spliterator().estimateSize();out.collect("窗口在时间区间: " + windowStart + "-" + windowEnd + " 产生" + count + "条数据,具体数据:" + input.toString());}}).print();env.execute();}
> nc -lk 8888
1
2
5
8
9
10
15
18
20
21
数据 1
数据 2
数据 5
数据 8
数据 9
数据 10
窗口在时间区间: 1970-01-01 08:00:00-1970-01-01 08:00:10 产生5条数据,具体数据:[1, 2, 5, 8, 9]
数据 15
数据 18
数据 20
窗口在时间区间: 1970-01-01 08:00:10-1970-01-01 08:00:20 产生3条数据,具体数据:[10, 15, 18]
数据 21
固定延迟的时间戳分配器
乱序流中需要等待迟到数据到齐,必须设置一个固定量的延迟时间(数据流中的数据可能遇到的最大延迟)。此时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟时间的结果。
调用WatermarkStrategy.forBoundedOutOfOrderness()方法可以实现,方法传入一个maxOutOfOrderness参数,表示最大乱序程度,它表示数据流中乱序数据时间戳的最大差值,如果能确定乱序程度,那么设置对应时间长度的延迟,就可以等到所有的乱序数据
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket接收数据流SingleOutputStreamOperator<String> source = env.socketTextStream("IP", 8888);// 将输入数据转换为IntegerDataStream<Integer> dataStream = source.map(str -> Integer.parseInt(str));// 定义Watermark策略WatermarkStrategy<Integer> watermarkStrategy = WatermarkStrategy// 最大容忍的延迟时间: 定watermark生成 乱序 等待3s 即当输入 (数字转时间戳 - 3) 达到 滚动处理时间窗口10s 就触发窗口执行.<Integer>forBoundedOutOfOrderness(Duration.ofSeconds(3))// 指定时间戳分配器 从数据中提取.withTimestampAssigner((element, recordTimestamp) -> {// 将输入数字转时间戳,单位毫秒,当作数据的时间戳System.out.println("数据 " + element);return element * 1000L;});// 指定 watermark策略SingleOutputStreamOperator<Integer> singleOutputStreamOperator = dataStream.assignTimestampsAndWatermarks(watermarkStrategy);singleOutputStreamOperator// 使用事件时间语义窗口.windowAll(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessAllWindowFunction<Integer, String, TimeWindow>() {@Overridepublic void process(Context context, Iterable<Integer> input, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = input.spliterator().estimateSize();out.collect("窗口在时间区间: " + windowStart + "-" + windowEnd + " 产生" + count + "条数据,具体数据:" + input.toString());}}).print();env.execute();}
nc -lk 8888
1
5
8
6
7
11
4
13
15
20
19
23
26
数据 1
数据 5
数据 8
数据 6
数据 7
数据 11
数据 4
数据 13
窗口在时间区间: 1970-01-01 08:00:00.000-1970-01-01 08:00:10.000 产生6条数据,具体数据:[1, 5, 8, 6, 7, 4]
数据 15
数据 20
数据 19
数据 23
窗口在时间区间: 1970-01-01 08:00:10.000-1970-01-01 08:00:20.000 产生4条数据,具体数据:[11, 13, 15, 19]
数据 26
自定义WatermarkGenerator
TimestampAssigner是一个可以从事件数据中提取时间戳字段的简单函数
watermark 的生成方式本质上是有两种:
1.周期性生成
周期性生成器通常通过 onEvent() 观察传入的事件数据,然后在框架调用 onPeriodicEmit() 时发出 watermark。
2.标记生成
标记生成器将查看 onEvent() 中的事件数据,并等待检查在流中携带 watermark 的特殊标记事件或打点数据。当获取到这些事件数据时,它将立即发出 watermark。通常情况下,标记生成器不会通过 onPeriodicEmit() 发出 watermark。
都需要继承接口WatermarkGenerator,接口如下:
/*** {@code WatermarkGenerator} 可以基于事件或者周期性的生成 watermark。** <p><b>注意:</b> WatermarkGenerator 将以前互相独立的 {@code AssignerWithPunctuatedWatermarks} * 和 {@code AssignerWithPeriodicWatermarks} 一同包含了进来。*/
@Public
public interface WatermarkGenerator<T> {/*** 每来一条事件数据调用一次,可以检查或者记录事件的时间戳,或者也可以基于事件数据本身去生成 watermark*/void onEvent(T event, long eventTimestamp, WatermarkOutput output);/*** 周期性的调用,也许会生成新的 watermark,也许不会** <p>调用此方法生成 watermark 的间隔时间由 {@link ExecutionConfig#getAutoWatermarkInterval()} 决定*/void onPeriodicEmit(WatermarkOutput output);
}
周期性Watermark生成器
周期性生成器通常通过 onEvent() 观察传入的事件数据,然后在框架调用onPeriodicEmit()时发出watermark
生成watermark的时间间隔(每 n 毫秒)可以通过ExecutionConfig.setAutoWatermarkInterval(…) 指定。每次都会调用生成器的onPeriodicEmit()方法,如果返回的watermark非空且值大于前一个watermark,则将发出新的watermark
示例1:
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket接收数据流SingleOutputStreamOperator<String> source = env.socketTextStream("IP", 8888);// 将输入数据转换为IntegerDataStream<Integer> dataStream = source.map(str -> Integer.parseInt(str));// 默认周期 200ms 修改默认周期时间为1000msenv.getConfig().setAutoWatermarkInterval(1000);WatermarkStrategy<Integer> watermarkStrategy = WatermarkStrategy// 自定义 周期性生成器 3000L:延迟时间.<Integer>forGenerator(ctx -> new MyWatermarkGenerator<>(3000L)).withTimestampAssigner((element, recordTimestamp) -> {// 将输入数字转时间戳,单位毫秒,当作数据的时间戳System.out.println("数据 " + element);return element * 1000L;});SingleOutputStreamOperator<Integer> singleOutputStreamOperator = dataStream.assignTimestampsAndWatermarks(watermarkStrategy);singleOutputStreamOperator// 使用事件时间语义窗口.windowAll(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessAllWindowFunction<Integer, String, TimeWindow>() {@Overridepublic void process(Context context, Iterable<Integer> input, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = input.spliterator().estimateSize();out.collect("窗口在时间区间: " + windowStart + "-" + windowEnd + " 产生" + count + "条数据,具体数据:" + input.toString());}}).print();env.execute();}/*** 该 watermark 生成器可以覆盖的场景是:数据源在一定程度上乱序。* 即某个最新到达的时间戳为 t 的元素将在最早到达的时间戳为 t 的元素之后最多 n 毫秒到达。*/public static class MyWatermarkGenerator<T> implements WatermarkGenerator<T> {/*** 乱序等待时间* 允许的最大延迟时间 ms*/private long maxOutOfOrderness;/*** 保存 当前为止 最大的事件时间*/private long currentMaxTimestamp;public MyWatermarkGenerator(long maxOutOfOrderness) {this.maxOutOfOrderness = maxOutOfOrderness;}/*** 每条数据来,都会调用一次: 用来生产WaterMark中的时间戳* 为每个事件调用,允许水印生成器检查和记住事件时间戳,或根据事件本身发出水印。** @param event* @param eventTimestamp 提取到数据的事件时间* @param output*/@Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);System.out.println("调用onEvent 目前为止最大时间戳 " + currentMaxTimestamp);}/*** 周期性调用: 发送watermark 默认200ms调用一次* <p>* 调用此方法和生成水印的时间间隔取决于ExecutionConfig.getAutoWatermarkInterval()** @param output*/@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 发出的 watermark = 当前最大时间戳 - 最大乱序时间output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));System.out.println("调用onPeriodicEmit 生成watermark " + (currentMaxTimestamp - maxOutOfOrderness - 1));}}}
调用onPeriodicEmit 生成watermark -3001
调用onPeriodicEmit 生成watermark -3001
数据 5
调用onEvent 目前为止最大时间戳 5000
调用onPeriodicEmit 生成watermark 1999
数据 6
调用onEvent 目前为止最大时间戳 6000
调用onPeriodicEmit 生成watermark 2999
调用onPeriodicEmit 生成watermark 2999
数据 3
调用onEvent 目前为止最大时间戳 6000
调用onPeriodicEmit 生成watermark 2999
调用onPeriodicEmit 生成watermark 2999
数据 13
调用onEvent 目前为止最大时间戳 13000
调用onPeriodicEmit 生成watermark 9999
窗口在时间区间: 1970-01-01 08:00:00.000-1970-01-01 08:00:10.000 产生3条数据,具体数据:[5, 6, 3]
调用onPeriodicEmit 生成watermark 9999
调用onPeriodicEmit 生成watermark 9999
调用onPeriodicEmit 生成watermark 9999
数据 10
调用onEvent 目前为止最大时间戳 13000
调用onPeriodicEmit 生成watermark 9999
调用onPeriodicEmit 生成watermark 9999
示例2:
/*** 该生成器生成的 watermark 滞后于处理时间固定量。它假定元素会在有限延迟后到达 Flink。*/
public class TimeLagWatermarkGenerator implements WatermarkGenerator<MyEvent> {private final long maxTimeLag = 5000; // 5 秒@Overridepublic void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {// 处理时间场景下不需要实现}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));}
}
标记Watermark生成器
标记watermark生成器会不停地检测onEvent()中的事件,当发现带有水位线信息的事件时,就立即发出水位线。把发送水位线的逻辑写在onEvent方法当中即可
标记生成器将查看onEvent()中的事件数据,并等待检查在流中携带watermark的特殊标记事件或打点数据。当获取到这些事件数据时,它将立即发出watermark。通常情况下,标记生成器不会通过onPeriodicEmit()发出 watermark。
WatermarkStrategy<Integer> watermarkStrategy = WatermarkStrategy// 自定义间歇性生成器.<Integer>forGenerator(ctx -> new MyWatermarkGenerator<>(3000L)).withTimestampAssigner((element, recordTimestamp) -> {// 将输入数字转时间戳,单位毫秒,当作数据的时间戳System.out.println("数据 " + element);return element * 1000L;});
public static class MyWatermarkGenerator<T> implements WatermarkGenerator<T> {/*** 乱序等待时间* 允许的最大延迟时间 ms*/private long maxOutOfOrderness;/*** 保存 当前为止 最大的事件时间*/private long currentMaxTimestamp;public MyWatermarkGenerator(long maxOutOfOrderness) {this.maxOutOfOrderness = maxOutOfOrderness;}/*** 每条数据来,都会调用一次: 用来提取最大的事件时间,保存下来,并发送watermark** @param event* @param eventTimestamp 提取到的数据的 事件时间* @param output*/@Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));System.out.println("调用onEvent 目前为止最大时间戳 " + currentMaxTimestamp + " 生成watermark " + (currentMaxTimestamp - maxOutOfOrderness - 1));}/*** 周期性调用: 不需要** @param output*/@Overridepublic void onPeriodicEmit(WatermarkOutput output) {}}
数据 5
调用onEvent 目前为止最大时间戳 5000 生成watermark 1999
数据 6
调用onEvent 目前为止最大时间戳 6000 生成watermark 2999
数据 3
调用onEvent 目前为止最大时间戳 6000 生成watermark 2999
数据 13
调用onEvent 目前为止最大时间戳 13000 生成watermark 9999
窗口在时间区间: 1970-01-01 08:00:00.000-1970-01-01 08:00:10.000 产生3条数据,具体数据:[5, 6, 3]
Watermark策略与Kafka连接器
使用 Apache Kafka 连接器作为数据源时,每个Kafka分区可能有一个简单的事件时间模式(递增的时间戳或有界无序)当使用Kafka数据源时,多个分区常常并行使用,因此交错来自各个分区的事件数据就会破坏每个分区的事件时间模式在这种情况下,可以使用Flink中可识别Kafka分区的watermark生成机制。使用此特性,将在Kafka消费端内部针对每个Kafka分区生成watermark,并且不同分区watermark的合并方式与在数据流shuffle时的合并方式相同。
注意:
在自定义数据源中发送水位线以后,就不能再在程序中使用assignTimestampsAndWatermarks方法来生成水位线。
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);KafkaSource<String> kafkaSource = KafkaSource.<String>builder()// 指定kafka节点的地址和端口.setBootstrapServers("node01:9092,node02:9092,node03:9092")// 指定消费者组的id.setGroupId("flink_group")// 指定消费的 Topic.setTopics("flink_topic")// 指定反序列化器,反序列化value.setValueOnlyDeserializer(new SimpleStringSchema())// flink消费kafka的策略.setStartingOffsets(OffsetsInitializer.latest()).build();DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka_source");DataStreamSink<String> kafka_source = env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafka_source").print("Kafka");stream.print("Kafka");env.execute();}
其他
处理空闲数据源
如果数据源中的某一个分区/分片在一段时间内未发送事件数据,则意味着WatermarkGenerator也不会获得任何新数据去生成watermark。我们称这类数据源为空闲输入或空闲源。
在这种情况下,当某些其他分区仍然发送事件数据的时候就会出现问题。由于下游算子watermark的计算方式是取所有不同的上游并行数据源watermark的最小值,则其watermark将不会发生变化。
为了解决这个问题,可以使用WatermarkStrategy来检测空闲输入并将其标记为空闲状态。WatermarkStrategy为此提供了一个工具接口:
WatermarkStrategy.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withIdleness(Duration.ofMinutes(1));
并行度下的水位线传递
在多并行度下,当一个任务接收到多个上游并行任务传递来的水位线时,应该以最小的那个作为当前任务的事件时钟。
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 从socket接收数据流SingleOutputStreamOperator<String> source = env.socketTextStream("IP", 8888);// 将输入数据转换为IntegerDataStream<Integer> dataStream = source.map(str -> Integer.parseInt(str));// 将数据合理地分发到不同的分区中DataStream<Integer> partitionCustom = dataStream.partitionCustom(new MyPartitioner(), value -> value);// 定义Watermark策略WatermarkStrategy<Integer> watermarkStrategy = WatermarkStrategy// 时间序列递增,没有等待时间,即当输入 数字转时间戳 达到 滚动处理时间窗口10s 就触发窗口执行.<Integer>forMonotonousTimestamps()// 将输入数字转时间戳,单位毫秒,当作数据的时间戳.withTimestampAssigner((r, ts) -> r * 1000L);// 指定 watermark策略SingleOutputStreamOperator<Integer> singleOutputStreamOperator = partitionCustom.assignTimestampsAndWatermarks(watermarkStrategy);// 分2组窗口 数据%分区数,分成两组: 奇数一组,偶数一组SingleOutputStreamOperator<String> process = singleOutputStreamOperator.keyBy(a -> a % 2)// 使用事件时间语义窗.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunction<Integer, String, Integer, TimeWindow>() {@Overridepublic void process(Integer key, Context context, Iterable<Integer> input, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = input.spliterator().estimateSize();out.collect("分组 " + key + " 的窗口在时间区间: " + windowStart + "-" + windowEnd + " 产生" + count + "条数据,具体数据:" + input.toString());}});process.print();env.execute();}public static class MyPartitioner implements Partitioner<Integer> {@Overridepublic int partition(Integer key, int numPartitions) {if (key % 2 == 0) {// 将偶数分配到第一个分区return 0;} else {// 将奇数分配到第二个分区return 1;}}}
发送测试数据
> nc -lk 8888
1
3
5
7
9
11
13
15
17
此时,控制台不会有任何输出,原因如下:
偶数窗口中,没有任何数据,由于当前Task是以最小的那个作为当前任务的事件时钟,就会导致当前Task的水位线无法推进,就导致窗口无法触发。
因此,这里可以使用上面提到的处理空闲数据源
,设置空闲等待即可解决
// 定义Watermark策略WatermarkStrategy<Integer> watermarkStrategy = WatermarkStrategy// 升序的watermark,没有等待时间,即当输入 数字 达到 滚动处理时间窗口10s 就触发窗口执行.<Integer>forMonotonousTimestamps()// 指定时间戳分配器 从数据中提取.withTimestampAssigner((r, ts) -> r * 1000L)//空闲等待5s.withIdleness(Duration.ofSeconds(5));
2> 分组 1 的窗口在时间区间: 1970-01-01 08:00:00.000-1970-01-01 08:00:10.000 产生5条数据,具体数据:[1, 3, 5, 7, 9]
迟到数据的处理
设置窗口推迟关窗时间,在关窗之前,迟到数据来了,还能被窗口计算,来一条迟到数据触发一次计算。关窗后,迟到数据不会被计算,放入侧输出流
在设置一定的窗口允许迟到时间时,只考虑大部分的迟到数据,忽略不考虑极端小部分迟到很久的数据
极端小部分迟到的数据, 放到侧输出流。 获取到之后可以做各种处理
1.推迟水印推进
在水印产生时,设置一个乱序容忍度,推迟系统时间的推进,保证窗口计算被延迟执行,为乱序的数据争取更多的时间进入窗口。
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));
2.设置窗口延迟关闭
Flink的窗口,也允许迟到数据。当触发了窗口计算后,会先计算当前的结果,但是此时并不会关闭窗口。以后每来一条迟到数据,就触发一次这条数据所在窗口计算(增量计算)。直到wartermark 超过了窗口结束时间+推迟时间,此时窗口会真正关闭。
.window(TumblingEventTimeWindows.of(Time.seconds(5))).allowedLateness(Time.seconds(3))
3.使用侧流接收迟到的数据
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))).allowedLateness(Time.seconds(3)).sideOutputLateData(lateWS)
实现示例
接收窗口关闭之后的迟到数据
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket接收数据流SingleOutputStreamOperator<String> source = env.socketTextStream("IP", 8888);// 将输入数据转换为IntegerDataStream<Integer> dataStream = source.map(str -> Integer.parseInt(str));// 定义Watermark策略WatermarkStrategy<Integer> watermarkStrategy = WatermarkStrategy.<Integer>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, recordTimestamp) -> element * 1000L);// 指定 watermark策略SingleOutputStreamOperator<Integer> sensorDSwithWatermark = dataStream.assignTimestampsAndWatermarks(watermarkStrategy);OutputTag<Integer> lateTag = new OutputTag<>("late-data", Types.POJO(Integer.class));SingleOutputStreamOperator<String> process = sensorDSwithWatermark.keyBy(sensor -> sensor % 2).window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.seconds(2)) // 推迟2s关窗.sideOutputLateData(lateTag) // 关窗后的迟到数据,放入侧输出流.process(new ProcessWindowFunction<Integer, String, Integer, TimeWindow>() {@Overridepublic void process(Integer key, Context context, Iterable<Integer> input, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = input.spliterator().estimateSize();out.collect("分组 " + key + " 的窗口在时间区间: " + windowStart + "-" + windowEnd + " 产生" + count + "条数据,具体数据:" + input.toString());}});process.print();// 从主流获取侧输出流,打印process.getSideOutput(lateTag).printToErr("关窗后的迟到数据");env.execute();}
相关文章:

Flink之Watermark水印、水位线
Watermark水印、水位线 水位线概述水印本质生成WatermarkWatermark策略WatermarkStrategy工具类使用Watermark策略 内置Watermark生成器单调递增时间戳分配器固定延迟的时间戳分配器 自定义WatermarkGenerator周期性Watermark生成器标记Watermark生成器Watermark策略与Kafka连接…...

uni-app:对数组对象进行以具体某一项的分类处理
一、原始数据 这里定义为五个数组,种类product有aaa,bbb两种 原始数据在data中进行定义 res: {"success": true,"devices": [{no: 0,product: aaa,alias: "设备1",assign: [["a1", "a2", "a3"],[&q…...

顺序队列----数据结构
队列的概念 队列,符合先进先出特点的一种数据结构,是一种特殊的线性表,但它不像线性表一样可以任意插入和删除操作,而是只允许在表的一端插入,也就是在队列的尾部进行插入;只允许在表的另一端进行删除&…...

【Python学习笔记】字符串格式化
1. printf 风格 这种格式化语法 和 传统的C语言printf函数 一样 。 salary input(请输入薪资:)# 计算出缴税额,存入变量tax tax int(salary) *25/100 # 计算出税后工资,存入变量aftertax aftertax int(salary) *75/100 print(税前薪资&…...

RIP,EIGRP,OSPF区别
1. 动态路由协议的作用是什么? 2. 路由协议都有哪些种类? 3. 如何判断路由协议的优劣? -- RIP,EIGRP,OSPF - 动态路由协议 -- 路由协议 - 路由器上的软件 -- 帮助路由器彼此之间同步路由表 -- 相互的传递…...

驱动day2作业
编写应用程序控制三盏灯亮灭 head.h #ifndef __HEAD_H__ #define __HEAD_H__ #define PHY_LED1_MODER 0x50006000 #define PHY_LED2_MODER 0x50007000 #define PHY_LED1_ODR 0x50006014 #define PHY_LED2_ODR 0x50007014 #define PHY_RCC 0x50000A28#endif demo1.c #includ…...

MySQL基本操作之创建数据表
设计表: 学生表(Student): 学号(StudentID)- 主键,用于唯一标识每个学生姓名(Name)性别(Gender)年龄(Age)出生日期(BirthDate)地址(Address)电话(Phone)邮箱(Email)课程表(Course): 课程号(CourseID)- 主键,用于唯一标识每门课程课程名(CourseNam…...

rk平台android12修改dp和喇叭同时输出声音
客户的rk3588主板android12系统,要求接上type-c 进行dp输出显示以后,dp端和主板端都有声音。rk原有系统默认是接上dp显示以后,主板的喇叭声音会被切掉,导致没有声音。要让喇叭和dp同时输出声音需要做如下修改: --- a/…...

经典网络模型
Alexnet VGG VGG的启示 VGGNet采用了多次堆叠3x3的卷积核,这样做的目的是减少参数的数量。 例如,2个3x3的卷积核效果相当于1个5x5的卷积核效果,因为它们的感受野(输入图像上映射区域的大小)相同。但2个3x3卷积核的参数…...

SystemVerilog Assertions应用指南 Chapter1.29“ disable iff构造
在某些设计情况中,如果一些条件为真,则我们不想执行检验。换句话说,这就像是一个异步的复位,使得检验在当前时刻不工作。SVA提供了关键词“ disable iff来实现这种检验器的异步复位。“ disable iff”的基本语法如下。 disable iff (expression) <property definition> …...

C++设计模式之MVC
MVC(Model-View-Controller)是一种经典的软件架构模式,用于组织和分离应用程序的不同部分,以提高代码的可维护性、可扩展性和重用性。MVC模式将应用程序分为三个主要组成部分: Model(模型)&…...

Windows 下Tomcat监测重启
echo off setlocal enabledelayedexpansion rem 链接 set URL"localhost:8080/XXX.jsp" rem tomcat目录 set TOMCAT_HOMED:\apache-tomcat-7.0.100-windows-x64\apache-tomcat-7.0.100 rem 关闭tomcat命令的路径 set CLOSE_CMD%TOMCAT_HOME%\bin\shutdown.bat rem 启…...

数据库管理-第112期 Oracle Exadata 03-网络与ILOM(20231020)
数据库管理-第112期 Oracle Exadata 03-网络与ILOM(202301020) 在Exadata中,除了对外网络以外,其余网络都是服务于一体机内部各组件的网络,本期对这些网络的具体情况和硬件管理相关做一个讲解。 1 网络分类 1.1 生产…...

Kubeadm部署k8s集群 kuboard
目录 主机准备 主机配置 修改主机名(三个节点分别执行) 配置hosts(所有节点) 关闭防火墙、selinux、swap、dnsmasq(所有节点) 安装依赖包(所有节点) 系统参数设置(所有节点) 时间同步(所有节点) 配…...

虚拟机如何联网【NAT】
查看VMWARE的IP地址 #进入root用户 su -#更改虚拟网卡设置界面 vi /etc/sysconfig/network-scripts/ifcfg-ens33 修改ONBOOT为yes BOOTPROTO为static IPADDR为前面的网段 192.168.211.xx (xx为自己设置的,可以随意设置,前面的为前面查看的IP地址的前…...

机器学习,神经网络中,自注意力跟卷积神经网络之间有什么样的差异或者关联?
如图 6.38a 所示,如果用自注意力来处理一张图像,假设红色框内的“1”是要考虑的像素,它会产生查询,其他像素产生 图 6.37 使用自注意力处理图像 键。在做内积的时候,考虑的不是一个小的范围,而是整张图像的…...

这件事,准备考PMP的都必须知道
大家好,我是老原。 新的一月,新的困惑。最近接到的咨询很多,但的确出现了差异化的特质。 以前的粉丝朋友上来就问,我现在是项目经理,主要负责产品研发,我是考PMP还是NPDP好? 现在的粉丝朋友会…...

elasticsearch常用命令
Elasticsearch概念 ElasticsearchmysqlIndex(索引)数据库Type(类型)表Documents(文档)行Fields列 常用命令 索引 # 索引初始化,number_of_shards:分片数,不可修改;number_of_replicas:副本数,可修改 PUT lagou {"settings…...

2000-2021年上市公司MA并购溢价计算数据(含原始数据+Stata代码)
2000-2021年上市公司M&A并购溢价计算(原始数据Stata代码) 1、时间:2000-2021年 2、范围:沪深A股上市公司 3、指标: 原始数据指标:事件ID、公司ID、证券代码、业务编码、上市公司交易地位编码、首次公…...

移动端1px-从基本原理到开源解决方案介绍
1px 不够准确,应该说成 1 物理像素 为什么有 1px 这个问题?实现 1px 有哪些方法?这些方法分别有哪些优缺点?开源项目中使用的哪些解决方案?如何在项目中处理 1px 的相关问题? 基本概念 首先,我们…...

Linux——shell外壳程序
shell外壳程序 1. 什么是shell外壳程序 Linux严格意义上说的是一个操作系统,我们称之为“核心 “ ,但我们一般用户,不能直接使用核心。 而是通过核心的“外壳”程序,也就是所谓的shell。 shell是所有外壳程序的统称 平时程序员…...

攻防世界web篇-Training-WWW-Robots
直接点击给出的地址,然后会转到另一个网页界面,在这个界面,已经给出了提示,robots.txt 在浏览器中,直接在地址的后面加上robots.txt,会进到下面这个界面 因为对php语言一窍不通,所以这里纯粹就…...

Docker是一个流行的容器化平台,用于构建、部署和运行应用程序。
文章目录 Web应用程序数据库服务器微服务应用开发环境持续集成和持续部署 (CI/CD)应用程序依赖项云原生应用程序研究和教育 🎈个人主页:程序员 小侯 🎐CSDN新晋作者 🎉欢迎 👍点赞✍评论⭐收藏 ✨收录专栏:…...

如何压缩ppt文件的大小?
如何压缩ppt文件的大小?要知道现在很多课件都是使用ppt文件,那么就导致ppt文件过大,我们很多时候电脑的存储空间就不够了。为了能够更好的存储这些ppt文件,我们通常会选择压缩ppt文件。怎么压缩ppt文件更快更好,没有损…...

8个视频剪辑素材网站,免费下载
找视频剪辑素材就上这8个网站,免费下载,可商用,赶紧收藏起来~ 免费视频素材 1、菜鸟图库 https://www.sucai999.com/video.html?vNTYxMjky 菜鸟图库网素材非常丰富,网站主要还是以设计类素材为主,高清视频素材也很多…...

常用的二十种设计模式(上)-C++
C中常用的设计模式有很多,设计模式是解决常见问题的经过验证的最佳实践。以下是一些常用的设计模式: 单例模式(Singleton):确保一个类只有一个实例,并提供一个全局访问点。工厂模式(Factory&am…...

JS中var和let和const的区别
在我很早之前,我还在用着var,直到接触到了let与const,我才知道var造成的影响很多,我果断的抛弃了var,哈哈 让我为大家介绍一下它们的区别吧! 1.块级作用域 块作用域由 { }包括,let和const具有…...

如何利用IP定位技术进行反欺诈?
网络欺诈风险是指在互联网和数字领域中,存在各种类型的欺诈活动,旨在欺骗个人、组织或系统以获得非法获益。以下是一些常见的网络欺诈风险类型: 身份盗用:这是一种欺诈行为,涉及盗取他人的个人身份信息,如姓…...

wireshark抓包本地IDEA xml格式报文教程以及postman调用接口
1、选择 2、筛选...

MySQL学习(六)——视图和触发器
文章目录 1. 视图1.1 视图语法1.2 检查选项1.3 视图的更新1.4 视图的作用 2. 触发器2.1 介绍2.2 语法介绍2.3 触发器示例2.3.1 插入数据触发器2.3.2 修改数据触发器2.3.3 删除数据触发器 1. 视图 视图(View)是一种虚拟存在的表。视图中的数据并不在数据…...