当前位置: 首页 > news >正文

Flink (六):DataStream API (三) 窗口

1. 窗口

窗口(Window)是处理无界流的关键所在。窗口可以将数据流装入大小有限的“桶”中,再对每个“桶”加以处理。 下面展示了 Flink 窗口在 keyed streams 和 non-keyed streams 上使用的基本结构。 我们可以看到,这两者唯一的区别仅在于:keyed streams 要调用 keyBy(...)后再调用 window(...) , 而 non-keyed streams 只用直接调用 windowAll(...)

1.1 窗口的生命周期

简单来说,一个窗口在第一个属于它的元素到达时就会被创建,然后在时间(event 或 processing time) 超过窗口的“结束时间戳 + 用户定义的 allowed lateness”时 被完全删除。Flink 仅保证删除基于时间的窗口,其他类型的窗口不做保证, 比如全局窗口。 例如,对于一个基于 event time 且范围互不重合(滚动)的窗口策略, 如果窗口设置的时长为五分钟、可容忍的迟到时间(allowed lateness)为 1 分钟, 那么第一个元素落入 12:00 至 12:05 这个区间时,Flink 就会为这个区间创建一个新的窗口。 当 watermark 越过 12:06 时,这个窗口将被摧毁。

另外,每个窗口会设置自己的 Trigger和 function (ProcessWindowFunctionReduceFunction、或 AggregateFunction)。该 function 决定如何计算窗口中的内容, 而 Trigger 决定何时窗口中的数据可以被 function 计算。 Trigger 的触发(fire)条件可能是“当窗口中有多于 4 条数据”或“当 watermark 越过窗口的结束时间”等。 Trigger 还可以在 window 被创建后、删除前的这段时间内定义何时清理(purge)窗口中的数据。 这里的数据仅指窗口内的元素,不包括窗口的 meta data。也就是说,窗口在 purge 后仍然可以加入新的数据。

除此之外,你也可以指定一个 Evictor,在 trigger 触发之后,Evictor 可以在窗口函数的前后删除数据。

1.2 Keyed Windows

stream.keyBy(...)               <-  仅 keyed 窗口需要.window(...)              <-  必填项:"assigner"[.trigger(...)]            <-  可选项:"trigger" (省略则使用默认 trigger)[.evictor(...)]            <-  可选项:"evictor" (省略则不使用 evictor)[.allowedLateness(...)]    <-  可选项:"lateness" (省略则为 0)[.sideOutputLateData(...)] <-  可选项:"output tag" (省略则不对迟到数据使用 side output).reduce/aggregate/process()      <-  必填项:"function"[.getSideOutput(...)]      <-  可选项:"output tag"

1.3 Non-Keyed Windows

stream.windowAll(...)           <-  必填项:"assigner"[.trigger(...)]            <-  可选项:"trigger" (else default trigger)[.evictor(...)]            <-  可选项:"evictor" (else no evictor)[.allowedLateness(...)]    <-  可选项:"lateness" (else zero)[.sideOutputLateData(...)] <-  可选项:"output tag" (else no side output for late data).reduce/aggregate/process()      <-  必填项:"function"[.getSideOutput(...)]      <-  可选项:"output tag"

1.4 Keyed 和 Non-Keyed Windows

首先必须要在定义窗口前确定的是你的 stream 是 keyed 还是 non-keyed。 keyBy(...) 会将你的无界 stream 分割为逻辑上的 keyed stream。 如果 keyBy(...) 没有被调用,你的 stream 就不是 keyed。对于 keyed stream,其中数据的任何属性都可以作为 key 。 使用 keyed stream 允许你的窗口计算由多个 task 并行,因为每个逻辑上的 keyed stream 都可以被单独处理。 属于同一个 key 的元素会被发送到同一个 task。对于 non-keyed stream,原始的 stream 不会被分割为多个逻辑上的 stream, 所以所有的窗口计算会被同一个 task 完成,也就是 parallelism 为 1

2. Window Assigners

指定了你的 stream 是否为 keyed 之后,下一步就是定义 window assigner。Window assigner 定义了 stream 中的元素如何被分发到各个窗口。 你可以在 window(...)(用于 keyed streams)或 windowAll(...) (用于 non-keyed streams)中指定一个 WindowAssigner。 

WindowAssigner 负责将 stream 中的每个数据分发到一个或多个窗口中。 Flink 为最常用的情况提供了一些定义好的 window assigner,也就是 tumbling windows、 sliding windows、 session windows 和 global windows。 你也可以继承 WindowAssigner 类来实现自定义的 window assigner。 所有内置的 window assigner(除了 global window)都是基于时间分发数据的,processing time 或 event time 均可。 

基于时间的窗口用 start timestamp(包含)和 end timestamp(不包含)描述窗口的大小。 在代码中,Flink 处理基于时间的窗口使用的是 TimeWindow, 它有查询开始和结束 timestamp 以及返回窗口所能储存的最大 timestamp 的方法 maxTimestamp()

接下来我们会说明 Flink 内置的 window assigner 如何工作,以及他们如何用在 DataStream 程序中。 下面的图片展示了每种 assigner 如何工作。 紫色的圆圈代表 stream 中按 key 划分的元素(本例中是按 user 1user 2 和 user 3 划分)。 x 轴表示时间的进展。

2.1 滚动窗口(Tumbling Windows)

滚动窗口的 assigner 分发元素到指定大小的窗口。滚动窗口的大小是固定的,且各自范围之间不重叠。 比如说,如果你指定了滚动窗口的大小为 5 分钟,那么每 5 分钟就会有一个窗口被计算,且一个新的窗口被创建(如下图所示)。

DataStream<T> input = ...;// 滚动 event-time 窗口
input.keyBy(<key selector>).window(TumblingEventTimeWindows.of(Time.seconds(5))).<windowed transformation>(<window function>);// 滚动 processing-time 窗口
input.keyBy(<key selector>).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).<windowed transformation>(<window function>);// 长度为一天的滚动 event-time 窗口, 偏移量为 -8 小时。
input.keyBy(<key selector>).window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))).<windowed transformation>(<window function>);

时间间隔可以用 Time.milliseconds(x)Time.seconds(x)Time.minutes(x) 等来指定。滚动窗口的 assigners 也可以传入可选的 offset 参数。这个参数可以用来对齐窗口。 比如说,不设置 offset 时,长度为一小时的滚动窗口会与 linux 的 epoch 对齐。 你会得到如 1:00:00.000 - 1:59:59.9992:00:00.000 - 2:59:59.999 等。 如果你想改变对齐方式,你可以设置一个 offset。如果设置了 15 分钟的 offset, 你会得到 1:15:00.000 - 2:14:59.9992:15:00.000 - 3:14:59.999 等。 一个重要的 offset 用例是根据 UTC-0 调整窗口的时差。比如说,在中国你可能会设置 offset 为 Time.hours(-8)

2.2 滑动窗口(Sliding Windows)

与滚动窗口类似,滑动窗口的 assigner 分发元素到指定大小的窗口,窗口大小通过 window size 参数设置。 滑动窗口需要一个额外的滑动距离(window slide)参数来控制生成新窗口的频率。 因此,如果 slide 小于窗口大小,滑动窗口可以允许窗口重叠。这种情况下,一个元素可能会被分发到多个窗口。比如说,你设置了大小为 10 分钟,滑动距离 5 分钟的窗口,你会在每 5 分钟得到一个新的窗口, 里面包含之前 10 分钟到达的数据(如下图所示)。

DataStream<T> input = ...;// 滑动 event-time 窗口
input.keyBy(<key selector>).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).<windowed transformation>(<window function>);// 滑动 processing-time 窗口
input.keyBy(<key selector>).window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).<windowed transformation>(<window function>);// 滑动 processing-time 窗口,偏移量为 -8 小时
input.keyBy(<key selector>).window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))).<windowed transformation>(<window function>);

时间间隔可以使用 Time.milliseconds(x)Time.seconds(x)Time.minutes(x) 等来指定。如上一个例子所示,滚动窗口的 assigners 也可以传入可选的 offset 参数。这个参数可以用来对齐窗口。 比如说,不设置 offset 时,长度为一小时、滑动距离为 30 分钟的滑动窗口会与 linux 的 epoch 对齐。 你会得到如 1:00:00.000 - 1:59:59.9991:30:00.000 - 2:29:59.999 等。 如果你想改变对齐方式,你可以设置一个 offset。 如果设置了 15 分钟的 offset,你会得到 1:15:00.000 - 2:14:59.9991:45:00.000 - 2:44:59.999 等。 一个重要的 offset 用例是根据 UTC-0 调整窗口的时差。比如说,在中国你可能会设置 offset 为 Time.hours(-8)

2.3 会话窗口(Session Windows)

会话窗口的 assigner 会把数据按活跃的会话分组。 与滚动窗口滑动窗口不同,会话窗口不会相互重叠,且没有固定的开始或结束时间。 会话窗口在一段时间没有收到数据之后会关闭,即在一段不活跃的间隔之后。 会话窗口的 assigner 可以设置固定的会话间隔(session gap)或 用 session gap extractor 函数来动态地定义多长时间算作不活跃。 当超出了不活跃的时间段,当前的会话就会关闭,并且将接下来的数据分发到新的会话窗口。

DataStream<T> input = ...;// 设置了固定间隔的 event-time 会话窗口
input.keyBy(<key selector>).window(EventTimeSessionWindows.withGap(Time.minutes(10))).<windowed transformation>(<window function>);// 设置了动态间隔的 event-time 会话窗口
input.keyBy(<key selector>).window(EventTimeSessionWindows.withDynamicGap((element) -> {// 决定并返回会话间隔})).<windowed transformation>(<window function>);// 设置了固定间隔的 processing-time session 窗口
input.keyBy(<key selector>).window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))).<windowed transformation>(<window function>);// 设置了动态间隔的 processing-time 会话窗口
input.keyBy(<key selector>).window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {// 决定并返回会话间隔})).<windowed transformation>(<window function>);

固定间隔可以使用 Time.milliseconds(x)Time.seconds(x)Time.minutes(x) 等来设置。动态间隔可以通过实现 SessionWindowTimeGapExtractor 接口来指定。

会话窗口并没有固定的开始或结束时间,所以它的计算方法与滑动窗口和滚动窗口不同。在 Flink 内部,会话窗口的算子会为每一条数据创建一个窗口, 然后将距离不超过预设间隔的窗口合并。 想要让窗口可以被合并,会话窗口需要拥有支持合并的 Triggers 和 Window Functions, 比如说 ReduceFunctionAggregateFunction 或 ProcessWindowFunction

2.4 全局窗口(Global Windows)

全局窗口的 assigner 将拥有相同 key 的所有数据分发到一个全局窗口。 这样的窗口模式仅在你指定了自定义 Triggers 时有用。 否则,计算不会发生,因为全局窗口没有天然的终点去触发其中积累的数据。

DataStream<T> input = ...;input.keyBy(<key selector>).window(GlobalWindows.create()).<windowed transformation>(<window function>);

3. 窗口函数(Window Functions)

定义了 window assigner 之后,我们需要指定当窗口触发之后,我们如何计算每个窗口中的数据, 这就是 window function 的职责了。

窗口函数有三种:ReduceFunctionAggregateFunction 或 ProcessWindowFunction。 前两者执行起来更高效,因为 Flink 可以在每条数据到达窗口后 进行增量聚合(incrementally aggregate)。 而 ProcessWindowFunction 会得到能够遍历当前窗口内所有数据的 Iterable,以及关于这个窗口的 meta-information。

使用 ProcessWindowFunction 的窗口转换操作没有其他两种函数高效,因为 Flink 在窗口触发前必须缓存里面的所有数据。 

ProcessWindowFunction 可以与 ReduceFunction 或 AggregateFunction 合并来提高效率。 这样做既可以增量聚合窗口内的数据,又可以从 ProcessWindowFunction 接收窗口的 metadata。 我们接下来看看每种函数的例子。

3.1 ReduceFunction

ReduceFunction 指定两条输入数据如何合并起来产生一条输出数据,输入和输出数据的类型必须相同。 Flink 使用 ReduceFunction 对窗口中的数据进行增量聚合。

ReduceFunction 可以像下面这样定义:

DataStream<Tuple2<String, Long>> input = ...;input.keyBy(<key selector>).window(<window assigner>).reduce(new ReduceFunction<Tuple2<String, Long>>() {public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {return new Tuple2<>(v1.f0, v1.f1 + v2.f1);}});

上面的例子是对窗口内元组的第二个属性求和。

3.2 AggregateFunction

ReduceFunction 是 AggregateFunction 的特殊情况。 AggregateFunction 接收三个类型:输入数据的类型(IN)、累加器的类型(ACC)和输出数据的类型(OUT)。 输入数据的类型是输入流的元素类型,AggregateFunction 接口有如下几个方法: 把每一条元素加进累加器、创建初始累加器、合并两个累加器、从累加器中提取输出(OUT 类型)。我们通过下例说明。

与 ReduceFunction 相同,Flink 会在输入数据到达窗口时直接进行增量聚合。

AggregateFunction 可以像下面这样定义:


/*** The accumulator is used to keep a running sum and a count. The {@code getResult} method* computes the average.*/
private static class AverageAggregateimplements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {@Overridepublic Tuple2<Long, Long> createAccumulator() {return new Tuple2<>(0L, 0L);}@Overridepublic Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);}@Overridepublic Double getResult(Tuple2<Long, Long> accumulator) {return ((double) accumulator.f0) / accumulator.f1;}@Overridepublic Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);}
}DataStream<Tuple2<String, Long>> input = ...;input.keyBy(<key selector>).window(<window assigner>).aggregate(new AverageAggregate());

上例计算了窗口内所有元素第二个属性的平均值。

3.3 ProcessWindowFunction

ProcessWindowFunction 有能获取包含窗口内所有元素的 Iterable, 以及用来获取时间和状态信息的 Context 对象,比其他窗口函数更加灵活。 ProcessWindowFunction 的灵活性是以性能和资源消耗为代价的, 因为窗口中的数据无法被增量聚合,而需要在窗口触发前缓存所有数据。

ProcessWindowFunction 的代码如下:

public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {/*** Evaluates the window and outputs none or several elements.** @param key The key for which this window is evaluated.* @param context The context in which the window is being evaluated.* @param elements The elements in the window being evaluated.* @param out A collector for emitting elements.** @throws Exception The function may throw exceptions to fail the program and trigger recovery.*/public abstract void process(KEY key,Context context,Iterable<IN> elements,Collector<OUT> out) throws Exception;/*** Deletes any state in the {@code Context} when the Window expires (the watermark passes its* {@code maxTimestamp} + {@code allowedLateness}).** @param context The context to which the window is being evaluated* @throws Exception The function may throw exceptions to fail the program and trigger recovery.*/public void clear(Context context) throws Exception {}/*** The context holding window metadata.*/public abstract class Context implements java.io.Serializable {/*** Returns the window that is being evaluated.*/public abstract W window();/** Returns the current processing time. */public abstract long currentProcessingTime();/** Returns the current event-time watermark. */public abstract long currentWatermark();/*** State accessor for per-key and per-window state.** <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up* by implementing {@link ProcessWindowFunction#clear(Context)}.*/public abstract KeyedStateStore windowState();/*** State accessor for per-key global state.*/public abstract KeyedStateStore globalState();}}

key 参数由 keyBy() 中指定的 KeySelector 选出。 如果是给出 key 在 tuple 中的 index 或用属性名的字符串形式指定 key,这个 key 的类型将总是 Tuple, 并且你需要手动将它转换为正确大小的 tuple 才能提取 key。

ProcessWindowFunction 可以像下面这样定义:

DataStream<Tuple2<String, Long>> input = ...;input.keyBy(t -> t.f0).window(TumblingEventTimeWindows.of(Time.minutes(5))).process(new MyProcessWindowFunction());/* ... */public class MyProcessWindowFunction extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {@Overridepublic void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {long count = 0;for (Tuple2<String, Long> in: input) {count++;}out.collect("Window: " + context.window() + "count: " + count);}
}

上例使用 ProcessWindowFunction 对窗口中的元素计数,并且将窗口本身的信息一同输出。

注意,使用 ProcessWindowFunction 完成简单的聚合任务是非常低效的。 接下来会说明如何将 ReduceFunction 或 AggregateFunction 与 ProcessWindowFunction 组合成既能 增量聚合又能获得窗口额外信息的窗口函数。

3.4 增量聚合的 ProcessWindowFunction

ProcessWindowFunction 可以与 ReduceFunction 或 AggregateFunction 搭配使用, 使其能够在数据到达窗口的时候进行增量聚合。当窗口关闭时,ProcessWindowFunction 将会得到聚合的结果。 这样它就可以增量聚合窗口的元素并且从 ProcessWindowFunction` 中获得窗口的元数据。

3.4.1 使用 ReduceFunction 增量聚合

下例展示了如何将 ReduceFunction 与 ProcessWindowFunction 组合,返回窗口中的最小元素和窗口的开始时间。

DataStream<SensorReading> input = ...;input.keyBy(<key selector>).window(<window assigner>).reduce(new MyReduceFunction(), new MyProcessWindowFunction());// Function definitionsprivate static class MyReduceFunction implements ReduceFunction<SensorReading> {public SensorReading reduce(SensorReading r1, SensorReading r2) {return r1.value() > r2.value() ? r2 : r1;}
}private static class MyProcessWindowFunctionextends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {public void process(String key,Context context,Iterable<SensorReading> minReadings,Collector<Tuple2<Long, SensorReading>> out) {SensorReading min = minReadings.iterator().next();out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));}
}

3.4.2 使用 AggregateFunction 增量聚合

下例展示了如何将 AggregateFunction 与 ProcessWindowFunction 组合,计算平均值并与窗口对应的 key 一同输出。

DataStream<Tuple2<String, Long>> input = ...;input.keyBy(<key selector>).window(<window assigner>).aggregate(new AverageAggregate(), new MyProcessWindowFunction());// Function definitions/*** The accumulator is used to keep a running sum and a count. The {@code getResult} method* computes the average.*/
private static class AverageAggregateimplements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {@Overridepublic Tuple2<Long, Long> createAccumulator() {return new Tuple2<>(0L, 0L);}@Overridepublic Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);}@Overridepublic Double getResult(Tuple2<Long, Long> accumulator) {return ((double) accumulator.f0) / accumulator.f1;}@Overridepublic Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);}
}private static class MyProcessWindowFunctionextends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {public void process(String key,Context context,Iterable<Double> averages,Collector<Tuple2<String, Double>> out) {Double average = averages.iterator().next();out.collect(new Tuple2<>(key, average));}
}

3.5 在 ProcessWindowFunction 中使用 per-window state

除了访问 keyed state (任何富函数都可以),ProcessWindowFunction 还可以使用作用域仅为 “当前正在处理的窗口”的 keyed state。在这种情况下,理解 per-window 中的 window 指的是什么非常重要。 总共有以下几种窗口的理解:

  • 在窗口操作中定义的窗口:比如定义了长一小时的滚动窗口长两小时、滑动一小时的滑动窗口
  • 对应某个 key 的窗口实例:比如 以 user-id xyz 为 key,从 12:00 到 13:00 的时间窗口。 具体情况取决于窗口的定义,根据具体的 key 和时间段会产生诸多不同的窗口实例。

Per-window state 作用于后者。也就是说,如果我们处理有 1000 种不同 key 的事件, 并且目前所有事件都处于 [12:00, 13:00) 时间窗口内,那么我们将会得到 1000 个窗口实例, 且每个实例都有自己的 keyed per-window state。

process() 接收到的 Context 对象中有两个方法允许我们访问以下两种 state:

  • globalState(),访问全局的 keyed state
  • windowState(), 访问作用域仅限于当前窗口的 keyed state

如果你可能将一个 window 触发多次(比如当你的迟到数据会再次触发窗口计算, 或你自定义了根据推测提前触发窗口的 trigger),那么这个功能将非常有用。 这时你可能需要在 per-window state 中储存关于之前触发的信息或触发的总次数。

当使用窗口状态时,一定记得在删除窗口时清除这些状态。他们应该定义在 clear() 方法中。

4. Triggers

Trigger 决定了一个窗口(由 window assigner 定义)何时可以被 window function 处理。 每个 WindowAssigner 都有一个默认的 Trigger。 如果默认 trigger 无法满足你的需要,你可以在 trigger(...) 调用中指定自定义的 trigger。

Trigger 接口提供了五个方法来响应不同的事件:

  • onElement() 方法在每个元素被加入窗口时调用。
  • onEventTime() 方法在注册的 event-time timer 触发时调用。
  • onProcessingTime() 方法在注册的 processing-time timer 触发时调用。
  • onMerge() 方法与有状态的 trigger 相关。该方法会在两个窗口合并时, 将窗口对应 trigger 的状态进行合并,比如使用会话窗口时。
  • 最后,clear() 方法处理在对应窗口被移除时所需的逻辑。

有两点需要注意:

1. 前三个方法通过返回 TriggerResult 来决定 trigger 如何应对到达窗口的事件。应对方案有以下几种:

  • CONTINUE: 什么也不做
  • FIRE: 触发计算
  • PURGE: 清空窗口内的元素
  • FIRE_AND_PURGE: 触发计算,计算结束后清空窗口内的元素

2. 上面的任意方法都可以用来注册 processing-time 或 event-time timer。

4.1 触发(Fire)与清除(Purge)

当 trigger 认定一个窗口可以被计算时,它就会触发,也就是返回 FIRE 或 FIRE_AND_PURGE。 这是让窗口算子发送当前窗口计算结果的信号。 如果一个窗口指定了 ProcessWindowFunction,所有的元素都会传给 ProcessWindowFunction。 如果是 ReduceFunction 或 AggregateFunction,则直接发送聚合的结果。

当 trigger 触发时,它可以返回 FIRE 或 FIRE_AND_PURGE。 FIRE 会保留被触发的窗口中的内容,而 FIRE_AND_PURGE 会删除这些内容。 Flink 内置的 trigger 默认使用 FIRE,不会清除窗口的状态。

注意:Purge 只会移除窗口的内容, 不会移除关于窗口的 meta-information 和 trigger 的状态。

4.2 WindowAssigner 默认的 Triggers

WindowAssigner 默认的 Trigger 足以应付诸多情况。 比如说,所有的 event-time window assigner 都默认使用 EventTimeTrigger。 这个 trigger 会在 watermark 越过窗口结束时间后直接触发。

GlobalWindow 的默认 trigger 是永远不会触发的 NeverTrigger。因此,使用 GlobalWindow 时,你必须自己定义一个 trigger。

当你在 trigger() 中指定了一个 trigger 时, 你实际上覆盖了当前 WindowAssigner 默认的 trigger。 比如说,如果你指定了一个 CountTrigger 给 TumblingEventTimeWindows,你的窗口将不再根据时间触发, 而是根据元素数量触发。如果你希望即响应时间,又响应数量,就需要自定义 trigger 了。

4.3 内置 Triggers 和自定义 Triggers

Flink 包含一些内置 trigger。

  • 之前提到过的 EventTimeTrigger 根据 watermark 测量的 event time 触发。
  • ProcessingTimeTrigger 根据 processing time 触发。
  • CountTrigger 在窗口中的元素超过预设的限制时触发。
  • PurgingTrigger 接收另一个 trigger 并将它转换成一个会清理数据的 trigger。

如果你需要实现自定义的 trigger,可以参考官方文档抽象类 Trigger 。 

5. Evictors

Flink 的窗口模型允许在 WindowAssigner 和 Trigger 之外指定可选的 Evictor。 如本文开篇的代码中所示,通过 evictor(...) 方法传入 Evictor。 Evictor 可以在 trigger 触发后、调用窗口函数之前或之后从窗口中删除元素。 Evictor 接口提供了两个方法实现此功能:

/*** Optionally evicts elements. Called before windowing function.** @param elements The elements currently in the pane.* @param size The current number of elements in the pane.* @param window The {@link Window}* @param evictorContext The context for the Evictor*/
void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);/*** Optionally evicts elements. Called after windowing function.** @param elements The elements currently in the pane.* @param size The current number of elements in the pane.* @param window The {@link Window}* @param evictorContext The context for the Evictor*/
void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

evictBefore() 包含在调用窗口函数前的逻辑,而 evictAfter() 包含在窗口函数调用之后的逻辑。 在调用窗口函数之前被移除的元素不会被窗口函数计算。

Flink 内置有三个 evictor:

  • CountEvictor: 仅记录用户指定数量的元素,一旦窗口中的元素超过这个数量,多余的元素会从窗口缓存的开头移除
  • DeltaEvictor: 接收 DeltaFunction 和 threshold 参数,计算最后一个元素与窗口缓存中所有元素的差值, 并移除差值大于或等于 threshold 的元素。
  • TimeEvictor: 接收 interval 参数,以毫秒表示。 它会找到窗口中元素的最大 timestamp max_ts 并移除比 max_ts - interval 小的所有元素。

默认情况下,所有内置的 evictor 逻辑都在调用窗口函数前执行。指定一个 evictor 可以避免预聚合,因为窗口中的所有元素在计算前都必须经过 evictor。

Flink 不对窗口中元素的顺序做任何保证。也就是说,即使 evictor 从窗口缓存的开头移除一个元素,这个元素也不一定是最先或者最后到达窗口的。

6. Allowed Lateness

在使用 event-time 窗口时,数据可能会迟到,即 Flink 用来追踪 event-time 进展的 watermark 已经 越过了窗口结束的 timestamp 后,数据才到达。

默认情况下,watermark 一旦越过窗口结束的 timestamp,迟到的数据就会被直接丢弃。 但是 Flink 允许指定窗口算子最大的 allowed lateness。 Allowed lateness 定义了一个元素可以在迟到多长时间的情况下不被丢弃,这个参数默认是 0。 在 watermark 超过窗口末端、到达窗口末端加上 allowed lateness 之前的这段时间内到达的元素, 依旧会被加入窗口。取决于窗口的 trigger,一个迟到但没有被丢弃的元素可能会再次触发窗口,比如 EventTimeTrigger

为了实现这个功能,Flink 会将窗口状态保存到 allowed lateness 超时才会将窗口及其状态删除 

默认情况下,allowed lateness 被设为 0。即 watermark 之后到达的元素会被丢弃。

你可以像下面这样指定 allowed lateness:

DataStream<T> input = ...;input.keyBy(<key selector>).window(<window assigner>).allowedLateness(<time>).<windowed transformation>(<window function>);

使用 GlobalWindows 时,没有数据会被视作迟到,因为全局窗口的结束 timestamp 是 Long.MAX_VALUE

6.1 从旁路输出(side output)获取迟到数据

首先,你需要在开窗后的 stream 上使用 sideOutputLateData(OutputTag) 表明你需要获取迟到数据。 然后,你就可以从窗口操作的结果中获取旁路输出流了。

final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};DataStream<T> input = ...;SingleOutputStreamOperator<T> result = input.keyBy(<key selector>).window(<window assigner>).allowedLateness(<time>).sideOutputLateData(lateOutputTag).<windowed transformation>(<window function>);DataStream<T> lateStream = result.getSideOutput(lateOutputTag);

6.2 迟到数据的一些考虑

当指定了大于 0 的 allowed lateness 时,窗口本身以及其中的内容仍会在 watermark 越过窗口末端后保留。 这时,如果一个迟到但未被丢弃的数据到达,它可能会再次触发这个窗口。 这种触发被称作 late firing,与表示第一次触发窗口的 main firing 相区别。 如果是使用会话窗口的情况,late firing 可能会进一步合并已有的窗口,因为他们可能会连接现有的、未被合并的窗口。

你应该注意:late firing 发出的元素应该被视作对之前计算结果的更新,即你的数据流中会包含一个相同计算任务的多个结果。你的应用需要考虑到这些重复的结果,或去除重复的部分。

相关文章:

Flink (六):DataStream API (三) 窗口

1. 窗口 窗口&#xff08;Window&#xff09;是处理无界流的关键所在。窗口可以将数据流装入大小有限的“桶”中&#xff0c;再对每个“桶”加以处理。 下面展示了 Flink 窗口在 keyed streams 和 non-keyed streams 上使用的基本结构。 我们可以看到&#xff0c;这两者唯一的…...

MYSQL学习笔记(二):基本的SELECT语句使用(基本、条件、聚合函数查询)

前言&#xff1a; 学习和使用数据库可以说是程序员必须具备能力&#xff0c;这里将更新关于MYSQL的使用讲解&#xff0c;大概应该会更新30篇&#xff0c;涵盖入门、进阶、高级(一些原理分析);这一篇是讲解SELECT语句使用&#xff0c;包括基本、条件、聚合函数查询&#xff0c;…...

PCL 点到面的ICP算法实现点云配准(C++详细过程版)

ICP算法 一、算法原理1、算法概述2、实现流程3、参考文献二、代码实现三、结果展示四、相关链接一、算法原理 1、算法概述 实现的算法与 PCL 点到面的ICP精配准(线性最小二乘优化)一文相同,使用C++代码复现线性优化的求解过程,求解过程如下所示,由于原版英文文献的计算过…...

MarsCode青训营打卡Day1(2025年1月14日)|稀土掘金-16.最大矩形面积问题

资源引用&#xff1a; 最大矩形面积问题 - MarsCode 打卡小记录&#xff1a; 今天是开营第一天&#xff0c;和小伙伴们组成了8人的团队&#xff0c;在接下来的数十天里相互监督&#xff0c;打卡刷题&#xff01; 稀土掘金-16.最大矩形面积问题&#xff08;16.最大矩形面积问题…...

我的世界-与门、或门、非门等基本门电路实现

一、红石比较器 (1) 红石比较器结构 红石比较器有前端单火把、后端双火把以及两个侧端 其中后端和侧端是输入信号,前端是输出信号 (2) 红石比较器的两种模式 比较模式 前端火把未点亮时处于比较模式 侧端>后端 → 0 当任一侧端强度大于后端强度时,输出…...

【FISCO BCOS】二十三、部署WeBASE-Node-Manager

WeBASE-Node-Manager是WeBASE的子组件之一,可以处理前端页面所有web请求,管理各个节点的状态,管理链上所有智能合约,对区块链的数据进行统计、分析,对异常交易的审计,私钥管理等,今天我们来部署WeBASE-Node-Manager。 环境:ubuntu 22 、已搭建单机四节点(节点已启动)…...

app版本控制java后端接口版本管理

java api version 版本控制 java接口版本管理 1 自定义 AppVersionHandleMapping 自定义AppVersionHandleMapping实现RequestMappingHandlerMapping里面的方法 public class AppVersionHandleMapping extends RequestMappingHandlerMapping {Overrideprotected RequestCondit…...

Go语言strings包与字符串操作:从基础到高级的全面解析

Go语言strings包与字符串操作:从基础到高级的全面解析 引言 Go语言以其简洁、高效和强大的标准库而闻名,其中strings包是处理字符串操作的核心工具。本文将深入探讨Go语言中strings包的功能及其在实际开发中的应用,帮助开发者更好地理解和使用这一工具。 1. strings包概述…...

使用redis-cli命令实现redis crud操作

项目场景&#xff1a; 线上环境上redis中的key影响数据展示&#xff0c;需要删除。但环境特殊没办法通过 redis客户端工具直连。只能使用redis-cli命令来实现。 操作步骤&#xff1a; 1、确定redis安装的服务器&#xff1b; 2、找到redis的安装目录下 ##找到redis安装目…...

Ubuntu升级Linux内核教程

本文作者CVE-柠檬i: CVE-柠檬i-CSDN博客 本文使用的方法是dpkg安装&#xff0c;目前版本为5.4.0-204&#xff0c;要升级成5.8.5版本 下载 下载网站&#xff1a;https://kernel.ubuntu.com/mainline/ 在该网站下载deb包&#xff0c;选择自己想要升级的版本&#xff0c;这里是5…...

5、docker-compose和docker-harbor

安装部署docker-compose 自动编排工具&#xff0c;可以根据dockerfile自动化的部署docker容器。是yaml文件格式&#xff0c;注意缩进。 1、安装docker-compose 2、配置compose配置文件docker-compose.yml 3、运行docker-compose.yml -f&#xff1a;指定文件&#xff0c;up&…...

Leetcode3097:或值至少为 K 的最短子数组 II

题目描述&#xff1a; 给你一个 非负 整数数组 nums 和一个整数 k 。 如果一个数组中所有元素的按位或运算 OR 的值 至少 为 k &#xff0c;那么我们称这个数组是 特别的 。 请你返回 nums 中 最短特别非空 子数组的长度&#xff0c;如果特别子数组不存在&#xff0c;那么返…...

HTML应用指南:利用GET请求获取全国特斯拉充电桩位置

随着电动汽车的普及&#xff0c;充电基础设施的建设变得至关重要。作为电动汽车领域的先驱&#xff0c;特斯拉不仅在车辆技术创新上持续领先&#xff0c;还积极构建广泛的充电网络&#xff0c;以支持其不断增长的用户群体。为了提升用户体验和服务质量&#xff0c;开发人员和数…...

阿里云通义实验室自然语言处理方向负责人黄非:通义灵码2.0,迈入 Agentic AI

通义灵码是基于阿里巴巴通义大模型研发的AI 智能编码助手&#xff0c;在通义灵码 1.0 时代&#xff0c;我们针对代码的生成、补全和问答&#xff0c;通过高效果、低时延&#xff0c;研发出了国内最受欢迎的编码助手。 在通义灵码 2.0 发布会上&#xff0c;阿里云通义实验室自然…...

第8篇:从入门到精通:掌握Python异常处理

第8篇&#xff1a;异常处理 内容简介 本篇文章将深入探讨Python中的异常处理机制。您将学习异常的基本概念与类型&#xff0c;掌握使用try-except块处理异常的方法&#xff0c;了解finally语句的作用&#xff0c;以及如何抛出和定义自定义异常。通过丰富的代码示例&#xff0…...

设计模式-结构型-装饰器模式

装饰器模式&#xff08;Decorator Pattern&#xff09;是结构型设计模式中的一种&#xff0c;它允许你通过将对象封装在一个新的对象中&#xff0c;来动态地添加新的功能&#xff0c;而无需改变原对象的结构。装饰器模式的核心思想是“将功能附加到对象上”&#xff0c;它是一种…...

git详细使用教程

文章目录 一、 git介绍与安装1、git介绍2、git的安装3、git使用前的说明 二、git的基础使用1、走进git之前2、git基础使用1、git init 项目初始化&#xff08;init&#xff09;成仓库&#xff08;repository&#xff09;2、git add 管理文件3、git commit 把文件提交到仓库&…...

java实现word转html(支持docx及doc文件)

private final static String tempPath "C:\\Users\\xxx\\Desktop\\Word2Html\\src\\test\\";//图片及相关文件保存的路径public static void main(String argv[]) {try {JFileChooser fileChooser new JFileChooser();fileChooser.setDialogTitle("Select a …...

搜维尔科技:Xsens人形机器人解决方案的优势

Xsens 致力于推动人形机器人技术的发展&#xff0c;塑造机器人与人类环境无缝融合的未来&#xff0c;通过创新精确和协作&#xff0c;协助生产和服务&#xff0c;改善人类生活和产业。 Xsens通过人形跟随捕捉详细的人体运动数据&#xff0c;使机器人能够学习类人的动作&#x…...

【王树森搜索引擎技术】概要01:搜索引擎的基本概念

1. 基本名词 query&#xff1a;查询词SUG&#xff1a;搜索建议文档&#xff1a;搜索结果标签/筛选项 文档单列曝光 文档双列曝光 2. 曝光与点击 曝光&#xff1a;用户在搜索结果页上看到文档&#xff0c;就算曝光文档点击&#xff1a;在曝光后&#xff0c;用户点击文档&…...

地震勘探——干扰波识别、井中地震时距曲线特点

目录 干扰波识别反射波地震勘探的干扰波 井中地震时距曲线特点 干扰波识别 有效波&#xff1a;可以用来解决所提出的地质任务的波&#xff1b;干扰波&#xff1a;所有妨碍辨认、追踪有效波的其他波。 地震勘探中&#xff0c;有效波和干扰波是相对的。例如&#xff0c;在反射波…...

逻辑回归:给不确定性划界的分类大师

想象你是一名医生。面对患者的检查报告&#xff08;肿瘤大小、血液指标&#xff09;&#xff0c;你需要做出一个**决定性判断**&#xff1a;恶性还是良性&#xff1f;这种“非黑即白”的抉择&#xff0c;正是**逻辑回归&#xff08;Logistic Regression&#xff09;** 的战场&a…...

Cesium1.95中高性能加载1500个点

一、基本方式&#xff1a; 图标使用.png比.svg性能要好 <template><div id"cesiumContainer"></div><div class"toolbar"><button id"resetButton">重新生成点</button><span id"countDisplay&qu…...

MVC 数据库

MVC 数据库 引言 在软件开发领域,Model-View-Controller(MVC)是一种流行的软件架构模式,它将应用程序分为三个核心组件:模型(Model)、视图(View)和控制器(Controller)。这种模式有助于提高代码的可维护性和可扩展性。本文将深入探讨MVC架构与数据库之间的关系,以…...

Psychopy音频的使用

Psychopy音频的使用 本文主要解决以下问题&#xff1a; 指定音频引擎与设备&#xff1b;播放音频文件 本文所使用的环境&#xff1a; Python3.10 numpy2.2.6 psychopy2025.1.1 psychtoolbox3.0.19.14 一、音频配置 Psychopy文档链接为Sound - for audio playback — Psy…...

《基于Apache Flink的流处理》笔记

思维导图 1-3 章 4-7章 8-11 章 参考资料 源码&#xff1a; https://github.com/streaming-with-flink 博客 https://flink.apache.org/bloghttps://www.ververica.com/blog 聚会及会议 https://flink-forward.orghttps://www.meetup.com/topics/apache-flink https://n…...

面向无人机海岸带生态系统监测的语义分割基准数据集

描述&#xff1a;海岸带生态系统的监测是维护生态平衡和可持续发展的重要任务。语义分割技术在遥感影像中的应用为海岸带生态系统的精准监测提供了有效手段。然而&#xff0c;目前该领域仍面临一个挑战&#xff0c;即缺乏公开的专门面向海岸带生态系统的语义分割基准数据集。受…...

GruntJS-前端自动化任务运行器从入门到实战

Grunt 完全指南&#xff1a;从入门到实战 一、Grunt 是什么&#xff1f; Grunt是一个基于 Node.js 的前端自动化任务运行器&#xff0c;主要用于自动化执行项目开发中重复性高的任务&#xff0c;例如文件压缩、代码编译、语法检查、单元测试、文件合并等。通过配置简洁的任务…...

免费数学几何作图web平台

光锐软件免费数学工具&#xff0c;maths,数学制图&#xff0c;数学作图&#xff0c;几何作图&#xff0c;几何&#xff0c;AR开发,AR教育,增强现实,软件公司,XR,MR,VR,虚拟仿真,虚拟现实,混合现实,教育科技产品,职业模拟培训,高保真VR场景,结构互动课件,元宇宙http://xaglare.c…...

计算机基础知识解析:从应用到架构的全面拆解

目录 前言 1、 计算机的应用领域&#xff1a;无处不在的数字助手 2、 计算机的进化史&#xff1a;从算盘到量子计算 3、计算机的分类&#xff1a;不止 “台式机和笔记本” 4、计算机的组件&#xff1a;硬件与软件的协同 4.1 硬件&#xff1a;五大核心部件 4.2 软件&#…...