Flink(九)【时间语义与水位线】
前言
2023-12-02-20:05,终于写完啦,最近状态不错。刚写完又收到了她的消息哈哈哈哈,开心。
再去全力打拼一次,奋战一场,就算最后打了败仗也无所谓,至少你留下了足迹。 《解忧杂货店》
1、时间语义
Flink 中的时间语义有两个:事件时间和处理时间。事件时间也就是数据产生的时间,通常都是数据自带的一个属性。处理时间则是指数据传输到我们集群被处理的时间。然而,由于在我们分布式系统中,数据在网络中有延迟,以及不同机器的时钟可能不一致,所以处理时间通常都要比事件时间滞后一些。
比如我们在 8:59:59 产生了一条数据,只考虑网络延迟为 2s,窗口的起始时间为 [8:00:00,9:00:00)。如果以事件时间作为默认的时间语义的话,那么我们的集群一定得等到数据在 9:00:01 才会开始计算输出;而如果以处理时间作为默认的时间语义的话,那么当集群机器的时间达达 9:00:00 时立即进行计算输出。所以,不难发现,使用事件时间会牺牲一定的实时性,而使用处理时间则会失去一定的准确性。
在实际应用中,事件时间更加常见。一般情况下,业务日志数据都会记录数据生成的时间戳,它就可以作为事件时间的判断基础。
在 Flink 的早期版本中是以处理时间作为基本语义的,但在 Flink 1.12 之后,考虑到事件时间在实际中更加广泛,所以 Flink 就以事件时间作为默认的时间语义了。
2、水位线(Watermark)
2.1、事件时间和窗口
我们的水位线正是基于事件时间提出来的,所以先梳理一下事件时间和窗口的关系。
在这个窗口的处理过程中,我们是基于数据的时间戳(数据自带时间戳属性),自定义了一个“逻辑时钟”。这个时钟的时间不会自动流逝;它的时间进展,就是靠着新到数据的时间戳来推动的。
事件时间完全依赖数据本身,这样可以保证数据的结果绝对准确。也就是说,不管机器时间是多少,我们只以新来数据的时间戳更新时钟。一般的流处理场景中,事件时间可以基本与处理时间保持同步,只是略微有点延迟。
2.2、水位线概念
在事件时间语义下,我们不依赖系统时间,而是基于数据自带的时间戳去定义了一个时钟,用来表示当前时间的进展。于是每个并行子任务都会有一个自己的逻辑时钟,它的前进是靠数据的时间戳来驱动的。
具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置是在某个数据到来之后;这样就可以把这个数据的时间戳抽出来,作为当前水位线的时间戳了。
上图是理想状态下,数据量小,数据按照有序的状态进入流中,每条数据产生一个水位线。
1)有序流中的水位线
然而,实际应用中,数据量非常大,并且数据之间的时间差非常小(几毫秒),如果依然在每条数据后面标记一个水位线,这样的代价是非常大的。所以为了提高效率,一般会每隔一段时间生产一个水位线。这时的水位线就像是一个周期性出现的时间标记。
2)无序流中的水位线
我们知道在分布式系统中,数据在节点间传输,会因为网络传输延迟的不确定性,导致顺序发生改变(比如我们多个 Source 的情况下,数据通过不同的节点发送给下游,而由于不同节点网络性能或硬件的差异,3s 产生的数据可能在 1s 产生的数据之前被发送给下游被处理),这就是所谓的“乱序数据”。
上图中,很明显有很多乱序的数据,所以有可能新的时间戳比之前的还小,如果直接将这个时间的水位线再插入,我们的“时钟”就回退了。所以,当我们插入新的水位线时,要先判断一下时间戳是否比之前的大,否则就不再生成新的水位线。也就是说,只有数据的时间戳比当前时钟大,才能推动时钟前进,这时才插入水位线。但是这样的代价就是,每来一条数据,就去判断一下事件时间是否大于当前水位线时间。
如果考虑到大量数据同时到来的处理效率,明显每个数据比较一次是不可行的。我们同样可以周期性地生成水位线。这时只需要周期性地保存一下该周期内所有数据中的最大时间戳,需要插入水位线时,就直接以它作为时间戳生成新的水位线 。
但是上面的这种方法依然存在问题:我们无法正确处理“迟到”的数据。在上面的例子中,当 9 秒产生的数据到来之后,我们就直接将时钟推进到了9 秒;如果有一个窗口结束时间就是 9 秒(比如,要统计 0-9 秒的所有数据),那么这时窗口就应该关闭、将收集到的所有数据计算输出结果了。但事实上,由于数据是乱序的,还可能有时间戳为 7 秒、8 秒的数据在 9 秒的数据之后才到来,这就是“迟到数据”(late data)。它们本来也应该属于 0~9 秒这个窗口,但此时窗口已经关闭,于是这些数据就被遗漏了,这会导致统计结果不正确。而解决这种问题的方法也比较简单,就是等一下,也就是说,为了让窗口能够正确的收集迟到的数据,我们可以让窗口等上一段时间,比如 2s。
同样,我们一般都是周期性地生成水位线:
这里需要特别注意的是,一个窗口所收集的数据,并不是之前所有已经到达的数据,而是真正数据的事件时间在该窗口范围内的。我们需要了解一下水位线和窗口的工作原理:
水位线和窗口的工作原理 (重点)
我们之前把窗口理解为一个桶,处理完一个范围内的数据后就清空,然后继续下一个窗口。这在处理时间语义下是没有问题的,因为我们并不关心数据的是什么时候产生的,我们只关心数据是什么时候来的,我只保证来一个处理一个,在处理时间范围内处理并输出就好了。但是在事件时间语义下,这种理解是错误的,因为数据属于哪个窗口,是由数据本身的时间戳决定的,一个窗口只会收集真正属于它的那些数据。比如上图中,尽管水位线 W(20)之前有时间戳为 22 的数据到来,10~20 秒的窗口中也不会收集这个数据,进行计算依然可以得到正确的结果。
所以我们的每个窗口都是一个桶,每次收集数据时它只会取走属于自己窗口内的数据,当达到窗口的结束时间(比如等待 2s 的情况下,窗口 [0,10)的结束时间就是 12,也就是说当来一条 事件时间为 11s 的数据时,我们认为当前的时间达到了 w(11-2)=9,当来一条事件时间为 12s 的数据时 w(12-2)=10 ,10 已经达到了我们的窗口关闭时间,这事就说明事件时间在 10 之前的数据都已经到齐了,窗口[0,10) 也就会关闭了)时,就对桶内的数据进行计算处理。
注意:窗口是我们属于窗口范围内的第一条数据到来的时候现 new 的,也就是动态创建的,而不是静态创建好的。
3)水位线的特性
- 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据
- 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展
- 水位线是基于数据的时间戳生成的
- 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进
- 水位线可以通过设置延迟,来保证正确处理乱序数据
- 一个水位线 Watermark(t),表示在当前流中事件时间已经达到了时间戳 t, 这代表 t 之前的所有数据都到齐了,之后流中不会出现时间戳 t’ ≤ t 的数据
水位线是 Flink 流处理中保证结果正确性的核心机制,它往往会跟窗口一起配合,完成对乱序数据的正确处理。
2.3、生成水位线
1)生成水位线的总体原则
完美的水位线是“绝对正确”的,也就是一个水位线一旦出现,就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。而完美的东西总是可望不可即,我们只能尽量去保证水位线的正确。如果对结果正确性要求很高、想要让窗口收集到所有数据,我们该怎么做呢?由于网络传输的延迟不确定(节点挂了,网络异常),为了获取所有迟到数据,保证计算结果完全正确,必须等待足够长的时间,但这会带来更高的延迟。
如果我们希望计算结果能更加准确,那可以将水位线的延迟设置得更高一些,等待的时间越长,自然也就越不容易漏掉数据。不过这样做的代价是处理的实时性降低了,我们可能为极少数的迟到数据增加了很多不必要的延迟。如果我们希望处理得更快、实时性更强,那么可以将水位线延迟设得低一些。这种情况下,可能很多迟到数据会在水位线之后才到达,就会导致窗口遗漏数据,计算结果不准确。
当然,如果我们对准确性完全不考虑、一味地追求处理速度,可以直接使用处理时间语义(毕竟不在乎数据准确性也就无所谓迟到),这在理论上可以得到最低的延迟。
所以 Flink 中的水位线,其实是流处理中对低延迟和结果正确性的一个权衡机制,而且把控制的权力交给了程序员,我们可以在代码中定义水位线的生成策略。接下来我们就具体了解一下水位线在代码中的使用。
2)水位线生成策略
在 Flink 的 DataStream API 中 , 有 一 个 单 独 用 于 生 成 水 位 线 的 方法:assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间:
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks( WatermarkStrategy<T> watermarkStrategy)
这里的 WatermarkStrategy 是一个接口,它包含了一个 “时间戳分配器” 和一个“水位线生成器”。
DataStream<Event> stream = env.addSource(new ClickSource());
DataStream<Event> withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(<watermark strategy>);
至于为什么要有时间戳分配器,这是因为原始数据中的时间戳只是写入日志数据的一个字段,如果不提取出来并明确把它分配给数据,Flink 是无法知道数据真正产生的时间的。当然,有些时候数据源本身就提供了时间戳信息,比如读取 Kafka 时,我们就可以从 Kafka 数据中直接获取时间戳,而不需要单独提取字段分配了。
public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>,WatermarkGeneratorSupplier<T>{// 时间戳分配器@OverrideTimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);// 水位线生成器@OverrideWatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}
3)Flink 内置水位线策略
1、有序流中内置水位线设置
我们来演示一个水位线驱动的滚动窗口(注意:这里的水位线是事件时间语义下的),这里演示的是有序流。
对于有序流,主要特点就是时间戳单调增长(Monotonously Increasing Timestamps),所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,直接调用WatermarkStrategy.forMonotonousTimestamps()方法就可以实现。简单来说,就是直接拿当前最大的时间戳作为水位线就可以了。
public class WaterMarkDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("localhost", 9999).map(new WaterSensorFunction())// todo 指定 watermark 策略,我们直接使用实现好的.assignTimestampsAndWatermarks(WatermarkStrategy// 指定watermark的生成:泛型方法,需要指定数据类型,升序的watermark 没有等待时间.<WaterSensor>forMonotonousTimestamps()// 指定如何从数据中提取事件时间.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { //函数接口 可以用lambda表达式@Overridepublic long extractTimestamp(WaterSensor sensor, long recordTimestamp) {System.out.println("数据=" + sensor + ",recordTs=" + recordTimestamp);return sensor.getTs() * 1000; // 返回的时间戳单位是 ms}}));KeyedStream<WaterSensor, String> sensorKs = sensorDS.keyBy(WaterSensor::getId);// todo 1. 指定窗口分配器:基于事件时间的滚动窗口 watermark 才能起作用WindowedStream<WaterSensor, String, TimeWindow> tumblingWindow = sensorKs.window(TumblingEventTimeWindows.of(Time.seconds(10)));// todo 2. 指定窗口函数:增量聚合的规约函数SingleOutputStreamOperator<String> process = tumblingWindow.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String start = sdf.format(new Date(startTs));String end = sdf.format(new Date(endTs));long size = elements.spliterator().estimateSize();out.collect("key=" + key + " 的窗口[" + start + "," + end + "]包含" + size + "条数据===>" + elements.toString());}});process.print();env.execute();}
}
上面的代码中,我们把 WaterSensor 的 ts 属性当做数据自带的事件时间,因为单位是毫秒,所以我们 *1000。withTimestampAssigner()中的参数里的 recordTimeStamp 的默认值为 Long.MIN_VALUE,一般场景用不到。
测试输入:
s1,1,1
s1,2,2
s1,3,3
s1,5,5
s1,9,9
s1,10,10
s1,20,20
输出结果:
数据=WaterSensor{id='s1', ts=1, vc=1},recordTs=-9223372036854775808
数据=WaterSensor{id='s1', ts=2, vc=2},recordTs=-9223372036854775808
数据=WaterSensor{id='s1', ts=3, vc=3},recordTs=-9223372036854775808
数据=WaterSensor{id='s1', ts=5, vc=5},recordTs=-9223372036854775808
数据=WaterSensor{id='s1', ts=9, vc=9},recordTs=-9223372036854775808
数据=WaterSensor{id='s1', ts=10, vc=10},recordTs=-9223372036854775808
key=s1 的窗口[1970-01-01 08:00:00,1970-01-01 08:00:10]包含5条数据===>[WaterSensor{id='s1', ts=1, vc=1}, WaterSensor{id='s1', ts=2, vc=2}, WaterSensor{id='s1', ts=3, vc=3}, WaterSensor{id='s1', ts=5, vc=5}, WaterSensor{id='s1', ts=9, vc=9}]
数据=WaterSensor{id='s1', ts=20, vc=20},recordTs=-9223372036854775808
key=s1 的窗口[1970-01-01 08:00:10,1970-01-01 08:00:20]包含1条数据===>[WaterSensor{id='s1', ts=10, vc=10}]
可以看到,我们设置的窗口大小为 10 s,所以当WaterSensor{id='s1',ts=10,vc=10}来的时候才触发窗口计算输出并关闭。我们的窗口是左闭右开的。而且窗口并不会把不属于该窗口的数据包含进去。
2、乱序流中内置水位线设置
由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间(Fixed Amount of Lateness)。这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。调用 WatermarkStrategy. forBoundedOutOfOrderness()方法就可以实现。这个方法需要传入一个 maxOutOfOrderness 参数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值;如果我们能确定乱序程度,那么设置对应时间长度的延迟,就可以等到所有的乱序数据了。
这里我们继续使用滚动窗口来演示:
我们只需在上面代码的基础上修改:
SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("localhost", 9999).map(new WaterSensorFunction())// todo 指定 watermark 策略,我们直接使用实现好的.assignTimestampsAndWatermarks(WatermarkStrategy// 指定watermark的生成: 泛型方法,需要指定数据类型,乱序的watermark 需要设置等待时间.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2)) // 等待2s// 指定如何从数据中提取事件时间.withTimestampAssigner((WaterSensor sensor, long recordTimestamp)-> {System.out.println("数据=" + sensor + ",recordTs=" + recordTimestamp);return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms}));
这里我们设置等待时间为 2s。
测试输入:
s1,1,1
s1,2,2
s1,5,5
s1,7,7
s1,9,9
s1,10,10
s1,3,3
s1,11,11
s1,12,12
输出结果:
数据=WaterSensor{id='s1', ts=1, vc=1},recordTs=-9223372036854775808
数据=WaterSensor{id='s1', ts=2, vc=2},recordTs=-9223372036854775808
数据=WaterSensor{id='s1', ts=5, vc=5},recordTs=-9223372036854775808
数据=WaterSensor{id='s1', ts=7, vc=7},recordTs=-9223372036854775808
数据=WaterSensor{id='s1', ts=9, vc=9},recordTs=-9223372036854775808
数据=WaterSensor{id='s1', ts=10, vc=10},recordTs=-9223372036854775808
数据=WaterSensor{id='s1', ts=3, vc=3},recordTs=-9223372036854775808
数据=WaterSensor{id='s1', ts=11, vc=11},recordTs=-9223372036854775808
数据=WaterSensor{id='s1', ts=12, vc=12},recordTs=-9223372036854775808
key=s1 的窗口[1970-01-01 08:00:00,1970-01-01 08:00:10]包含6条数据===>[WaterSensor{id='s1', ts=1, vc=1}, WaterSensor{id='s1', ts=2, vc=2}, WaterSensor{id='s1', ts=5, vc=5}, WaterSensor{id='s1', ts=7, vc=7}, WaterSensor{id='s1', ts=9, vc=9}, WaterSensor{id='s1', ts=3, vc=3}]
key=s1 的窗口[1970-01-01 08:00:10,1970-01-01 08:00:20]包含3条数据===>[WaterSensor{id='s1', ts=10, vc=10}, WaterSensor{id='s1', ts=11, vc=11}, WaterSensor{id='s1', ts=12, vc=12}]
可以看到我们数据的事件时间达到10s时,窗口仍然没有关闭,此时依然可以接受迟到的数据,直到大于(等待时间+窗口关闭时间 = 12s)的数据来的时候,才会触发窗口计算关闭。
3、内置水位线原理
1)乱序流中水位线的生成原理
对于我们上面的乱序流中生成水位线原理,我们可以查看 <WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2)) 的底层源码:
2)有序流中水位线的生成原理
同样,我们查看 <WaterSensor>forMonotonousTimestamps() 方法的源码:
它也是返回一个对象,我们继续查看:
我们发现,有序水位线它的底层仍然是乱序水位线,只不过它的等待时间为 0ms 。
总结
内置水位线的生成原理:
- 都是周期性生产的:默认是 200ms(可以通过 env.getConfig().setAutoWatermarkInterval() 查看默认的水位线生成周期)
- 有序流:watermark = 当前最大事件时间 - 0 ms
- 乱序流:watermark = 当前最大事件时间 - 等待时间(也叫乱序程度) -1 ms
4、自定义水位线策略
1)周期性水位线生产策略
周期时间我们一般是不去随便修改的,默认为 200 ms。
下面我们模仿 Flink 的内置乱序流水位线策略来自定义一个水位线生成器:
public class MyPeriodWatermarkGenerator<T> implements WatermarkGenerator<T> {private long maxTs; // 保存到当前为止最大的事件时间private long delayTs; // 保存等待时间public MyPeriodWatermarkGenerator(long delayTs) {this.maxTs = Long.MIN_VALUE + this.delayTs + 1;this.delayTs = delayTs;}/*** 每条数据来都会调用一次,用来提取最大的事件时间* @param event* @param eventTimestamp 提取到的事件时间* @param output*/@Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {maxTs = Math.max(maxTs,eventTimestamp);System.out.println("调用 onEvent 方法,获取当前最大的时间戳="+maxTs);}/*** 周期性调用: 生成 watermark* @param output*/@Overridepublic void onPeriodicEmit(WatermarkOutput output) {output.emitWatermark(new Watermark(maxTs - delayTs - 1));System.out.println("调用onPeriodicEmit方法,生成watermark="+(maxTs - delayTs - 1));}
}
测试:
// 这里为了测试 一般不去修改水位线生成的周期时间
env.getConfig().setAutoWatermarkInterval(2000);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("localhost", 9999).map(new WaterSensorFunction())// todo 指定 watermark 策略,我们直接使用实现好的.assignTimestampsAndWatermarks(WatermarkStrategy// 指定自定义的watermark生成器.<WaterSensor>forGenerator(ctx -> new MyPeriodWatermarkGenerator<>(3000))// 指定如何从数据中提取事件时间.withTimestampAssigner((WaterSensor sensor, long recordTimestamp)-> {System.out.println("数据=" + sensor + ",recordTs=" + recordTimestamp);return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms}));
我们可以发现,onPeriodEmit()方法是每周期执行一次。
2)断点式水位线生成器
断点式和周期式唯一的不同就是发送水位线的方法,上面的周期式中,我们使用 onPeriodicEmit()方法来周期性地发送水位线,而断电式则由 onEvent() 来发送水位线,也就是只要有新的一条数据来,它就会更新水位线。具体代码只需要修改以下部分:
@Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {maxTs = Math.max(maxTs,eventTimestamp);output.emitWatermark(new Watermark(maxTs - delayTs -1));System.out.println("调用 onEvent 方法,获取当前最大的时间戳="+maxTs+"生成watermark="+(maxTs - delayTs - 1));}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 啥也不用干}
对于我们之前的 Kafka 数据源,我们现在可以指定它的水位线生产策略了:
env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2000L)),"kafkaSource").print();
注意:水位线策略的设置只需要设置一次!Kafka 数据源不需要设置时间戳读取器(也就是如何从数据源读取事件时间),因为对于 Kafka 数据源,框架可以直接从 Source 中获取事件时间。
2.4、水位线的传递
我们知道,水位线是数据流中插入的一个标记,用来表示事件时间的进展。它随着数据一起在任务间传递。
在直通式(forward)传输的情况下,数据和水位线都是按照本身的顺序依次传递、依次处理的。一旦水位线到达了算子任务,该任务就会将它内部的时钟设为这个水位线的时间戳。
然而,实际应用中往往上下游都有多个并行子任务,为了统一推进事件时间的进展,要求上游任务处理完水位线、时钟改变之后,要把当前的水位线再次发出,广播给所有的下游子任务。这样,后续任务就不需要依赖原始数据中的时间戳(避免数据经过转化处理后发生改变),也可以知道当前事件时间了。
还有一个问题就是,在“重分区”(redistributing)的传输模式下,一个任务有可能会收到来自不同分区上游子任务的数据。而不同分区的子任务时钟并不同步(有的子任务处理的数据的事件时间早,有的任务处理的的数据的事件时间晚,所以也就使得每个子任务的水位线时间戳有的快有的慢,也就使得不同子任务的逻辑时钟不同步),所以同一时刻发给下游任务的水位线可能并不相同。这个时候下游就要确定到底按照谁发来的水位线来确定为当前事件的最新进展,答案是最小的水位线,因为我们水位线的本质就是 “保证当前时间之前的数据,都已经到齐了”。
此外,多并行度情况下,我们的一条数据通常只会去往一个分区(分区就是子任务),但是我们的水位线是特殊的,它会广播到所有下游节点,来推进整个事件的进展。还需要注意的是,多并行度的情况下往往会对我们的水位线有影响,比如我们设置的等待时间为 3s,但当事件时间为 13 的数据到来后,它并不会立即关闭窗口,因为在多并行度下,水位线的更新是取最小的(取的是两个上游任务中的最小),比如:
上游并行任务(等待3s) 水位线
map1 -> 1-> 一条数据无法取最小
map2 -> 3-> 取最小=1 -2
map1 -> 5 -> 取最小=3 0
map2 -> 7-> 取最小=5 2
map1 -> 13 -> 取最小=7 4
map2 -> 14-> 取最小=13 10
2.5、设置空闲等待(Idleness)
在多个并行度的情况下,我们知道,水位线的更新需要至少通过两个上游并行任务的数据的事件时间来比较。而加入一个上游中只有一条数据会出现什么情况呢:
上游任务(等待3s) 事件时间 水位线
map1 -> 1-> 一条数据无法取最小
map2 -> 2-> 取最小=1 -2
map1 -> 3 -> 取最小=2 -1
map1 -> 5-> 一条数据无法取最小(还需要一条map2的数据)
map1 -> 7-> 一条数据无法取最小(还需要一条map2的数据)
map1 -> 13-> 一条数据无法取最小(还需要一条map2的数据)
可以看到,这样就会造成我们的逻辑时钟(水位线)迟迟无法推进,怎么解决呢?就是当我们的一个上游并行任务不再有数据到来时,我们下游任务不再等待。
public class WatermarkIdlenessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 自定义分区器 把奇数和偶数分区到两个不同的map子任务// 输入的数字就是事件时间*1000msSingleOutputStreamOperator<Integer> socketDS = env.socketTextStream("localhost", 9999).partitionCustom(new MyPartitioner(), num -> num) //根据自己来进行分区.map(Integer::parseInt)// todo 指定 watermark 策略.assignTimestampsAndWatermarks(WatermarkStrategy// 使用有序流的watermark生成器 升序.<Integer>forMonotonousTimestamps()// 指定如何从数据中提取事件时间.withTimestampAssigner((num, ts) -> num * 1000L)// 空闲等待时间 5s.withIdleness(Duration.ofSeconds(5)));SingleOutputStreamOperator<String> process = socketDS.keyBy(num -> num % 2).window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunction<Integer, String, Integer, TimeWindow>() {@Overridepublic void process(Integer key, Context context, Iterable<Integer> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String start = sdf.format(new Date(startTs));String end = sdf.format(new Date(endTs));long size = elements.spliterator().estimateSize();out.collect("key=" + key + " 的窗口[" + start + "," + end + "]包含" + size + "条数据===>" + elements.toString());}});process.print();env.execute();}
}
上面的代码中,我们的并行度为2,由于数据源是 Socket ,所以 Source算子并行度只能为 1;而 输入的数据由于我们指定了 MyPartitioner 所以它会按照把奇数和偶数分到不同的 map算子;
在水位线传递的过程中,当上游没有偶数传递时,处理奇数的process算子需要等待偶数数据到来才能确定窗口的关闭时间。这是因为水位线的生成是基于事件时间的,而事件时间是根据数据本身的时间戳来计算的。处理奇数的process算子虽然只处理奇数数据,但是它需要等待偶数数据到来以便根据偶数数据的时间戳来确定窗口的关闭时间。如果处理奇数的process算子不等待偶数数据到来就关闭窗口,那么可能会出现数据丢失或计算结果不正确的情况。
2.6、迟到数据的处理
之前我们说,通过设置等待时间可以解决一定的数据乱序问题,但并不是 100% 的解决,因为往往不会把等待时间设置的太久(会造成计算的延迟),所以考虑到一些数据乱序程度无法预知,光靠等待时间是不行的(会造成结果不准确)。解决数据乱序问题我们除了设置等待时间,其实还有两招:设置窗口延迟关闭 和 使用侧输出流接收延迟数据。
2.6.1、设置窗口延迟关闭
我们可以在 window() 方法之后 .allowedLateness(Time.seconds(2)) 来设置关窗时间为 2s。窗口的触发计算和关闭是两码事,我们之前都是触发计算后直接关闭,这里我们设置延迟关闭 2s,也就是说,当有数据的事件时间达到窗口最大值,窗口被触发计算一次,但不会立即关闭,而是允许再多等一会,但是如果出现有比窗口最大关闭时间还要大2s的数据来时,窗口直接关闭。
public class WaterMarkAllowLaterDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("localhost", 9999).map(new WaterSensorFunction())// todo 指定 watermark 策略,我们直接使用实现好的.assignTimestampsAndWatermarks(WatermarkStrategy// 指定watermark的生成: 泛型方法,需要指定数据类型,乱序的watermark 需要设置等待时间.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2)) // 等待2s// 指定如何从数据中提取事件时间.withTimestampAssigner((WaterSensor sensor, long recordTimestamp)-> {System.out.println("数据=" + sensor + ",recordTs=" + recordTimestamp);return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms}));SingleOutputStreamOperator<String> process = sensorDS.keyBy(WaterSensor::getId).window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.seconds(2)) //设置运行窗口延迟关闭2s.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String start = sdf.format(new Date(startTs));String end = sdf.format(new Date(endTs));long size = elements.spliterator().estimateSize();out.collect("key=" + key + " 的窗口[" + start + "," + end + "]包含" + size + "条数据===>" + elements.toString());}});process.print();env.execute();}
}
测试输入:
s1,1,1
s1,2,2
s1,10,10
s1,12,12
s1,6,6
s1,3,3
s1,14,14
s1,5,5
s1,3,3
运行结果:
key=s1 的窗口[1970-01-01 08:00:00,1970-01-01 08:00:10]包含2条数据===>[WaterSensor{id='s1', ts=1, vc=1}, WaterSensor{id='s1', ts=2, vc=2}]
key=s1 的窗口[1970-01-01 08:00:00,1970-01-01 08:00:10]包含3条数据===>[WaterSensor{id='s1', ts=1, vc=1}, WaterSensor{id='s1', ts=2, vc=2}, WaterSensor{id='s1', ts=6, vc=6}]
key=s1 的窗口[1970-01-01 08:00:00,1970-01-01 08:00:10]包含4条数据===>[WaterSensor{id='s1', ts=1, vc=1}, WaterSensor{id='s1', ts=2, vc=2}, WaterSensor{id='s1', ts=6, vc=6}, WaterSensor{id='s1', ts=3, vc=3}]
key=s1 的窗口[1970-01-01 08:00:10,1970-01-01 08:00:20]包含3条数据===>[WaterSensor{id='s1', ts=10, vc=10}, WaterSensor{id='s1', ts=12, vc=12}, WaterSensor{id='s1', ts=14, vc=14}]
可以看到,当数据 "s1,12,12" 到来时,窗口触发计算一次,但没有立即关闭,所以之后迟到的 "s1,6,6," 和 "s1,3,3" 仍然可以触发计算,但是当大于窗口最大关闭时间+2s(允许迟到的时间)的数据 "s1,14,14" 到来后,窗口彻底关闭,之后到来的 "s1,5,5" 和 "s1,3,3" 无法进行计算。
2.6.2、使用侧输出流接收延迟数据
流式数据没有 100% 的完美,数据迟到不可能彻底解决,为了尽可能让结果正确,让极端迟到的数据仍然能够计算,我们还可以使用侧输出流。
public class WaterMarkAllowLaterDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("localhost", 9999).map(new WaterSensorFunction())// todo 指定 watermark 策略,我们直接使用实现好的.assignTimestampsAndWatermarks(WatermarkStrategy// 指定watermark的生成: 泛型方法,需要指定数据类型,乱序的watermark 需要设置等待时间.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2)) // 等待2s// 指定如何从数据中提取事件时间.withTimestampAssigner((WaterSensor sensor, long recordTimestamp)-> {System.out.println("数据=" + sensor + ",recordTs=" + recordTimestamp);return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms}));// 定义侧输出流OutputTag<WaterSensor> lateData = new OutputTag<>("lateData", Types.POJO(WaterSensor.class));SingleOutputStreamOperator<String> process = sensorDS.keyBy(WaterSensor::getId).window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.seconds(2)) //设置运行窗口延迟关闭2s.sideOutputLateData(lateData) // 关窗后的迟到数据放到侧输出流.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String start = sdf.format(new Date(startTs));String end = sdf.format(new Date(endTs));long size = elements.spliterator().estimateSize();out.collect("key=" + key + " 的窗口[" + start + "," + end + "]包含" + size + "条数据===>" + elements.toString());}});process.print();// 从主流获取侧输出流并打印process.getSideOutput(lateData).printToErr();env.execute();}
}
测试输入:
s1,1,1
s1,2,2
s1,12,12
s1,5,5
s1,7,7
s1,14,14
s1,1,1
s1,2,2
运行结果:
2.7、迟到数据总结
2.7.1、乱序和迟到的区别
- 乱序:数据的顺序乱了,事件时间小的数据 比 事件时间大的数据 晚来
- 迟到:数据的事件时间 < 水位线时间,窗口关闭了才来
2.7.2、迟到数据的处理
- 设置乱序等待时间
- 如果开窗,设置窗口允许迟到,延迟关闭窗口
- 关窗后的数据放到侧输出流
对数据的延迟时间要做到心中有数
- 等待时间,设置一个不是特别大的,一般都是秒级,在 乱序和延迟中做取舍
- 允许迟到时间(窗口延迟关闭时间),置考虑大部分的迟到数据
- 极端迟到数据放到侧输出流,最后单独拿出来合并一下就好了
耗费三四天时间终于把这一块学完了,时间语义是非常重要的内容,需要好好理解记忆,也要知道怎么通过代码实现。
相关文章:

Flink(九)【时间语义与水位线】
前言 2023-12-02-20:05,终于写完啦,最近状态不错。刚写完又收到了她的消息哈哈哈哈,开心。 再去全力打拼一次,奋战一场,就算最后打了败仗也无所谓,至少你留下了足迹。 《解忧杂货店》 1、时间语义 …...

torch中的随机数种子
如何在torch生成随机数时,设置随机种子,要求每次调用生成的随机数都一样 在 PyTorch 中,可以使用 torch.manual_seed(seed) 函数设置随机种子,以确保每次运行代码时生成的随机数都一样。 以下是一个示例代码,展示了如…...

C 标准库 <math.h>
C 标准库 <math.h> C <math.h>头文件声明了一组函数来执行数学运算,例如:sqrt()计算平方根,log()查找数字的自然对数,等等。 math.h 头文件定义了各种数学函数和一个宏。在这个库中所有可用的函数都带有一个 double…...

一篇带你串通数据结构
文章目录 导论数据结构的定义数据结构在计算机科学中的重要性为什么学习数据结构很重要 1、基本概念1.1、数据、数据元素和数据项的概念1.2、数据对象与数据结构的关系1.3、逻辑结构与物理结构 2、线性结构2.1、数组2.2、链表2.3、栈2.4、队列 3、非线性结构3.1、树3.2、图 4、…...

网络篇---第九篇
系列文章目录 文章目录 系列文章目录前言一、说说TCP/IP四层网络模型二、说说域名解析详细过程?三、 IP 地址分为几类,每类都代表什么,私网是哪些?四、说说TCP 如何保证可靠性的?前言 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家…...

Python基础学习快速入门
文章目录 Number变量String字符串Def函数Class类List列表Tuple元组Dictionary字典Set集合值与引用类型if条件控制Loop循环 Number变量 python直接赋值,不需要定义变量类型。不需要**,逗号结尾符 使用print**直接进行输出 #赋值 a 1.0 print(a)a 7 print(a)p…...

C语言-预处理与库
预处理、动态库、静态库 1. 声明与定义分离 一个源文件对应一个头文件 注意: 头文件名以 .h 作为后缀头文件名要与对应的原文件名 一致 例: 源文件:01_code.c #include <stdio.h> int num01 10; int num02 20; void add(int a, in…...

王道数据结构课后代码题p40 9.给定一个带表头结点的单链表,写出算法 : 按递增次序输出单链表中各结点的数据元素并释放结点 (c语言代码实现)
本题代码如下(有注释) void delete_min(linklist* head) {while ((*head)->next ! NULL)//循环到只剩下头节点{lnode* pre *head;//pre为元素最小结点的前驱结点指针lnode* p (*head)->next;//p为工作指针lnode* q;//指向被删除的结点while (p-…...

对系统的 Go 版本进行升级
方法一 直接升级系统的 Go 版本 注意以下操作仅适用于:amd64 架构的 Centos 系统。如果需要适配其他架构,需要自行编写代码实现。 手动执行: # 显示当前版本 go version # 查看环境变量 cat /etc/profile # 进入 go 的安装目录,…...

【从删库到跑路 | MySQL总结篇】事务详细介绍
个人主页:兜里有颗棉花糖 欢迎 点赞👍 收藏✨ 留言✉ 加关注💓本文由 兜里有颗棉花糖 原创 收录于专栏【MySQL学习专栏】🎈 本专栏旨在分享学习MySQL的一点学习心得,欢迎大家在评论区讨论💌 目录 一、事务…...

七牛云1024创建节-赛后有感
距离比赛结束已经过去快半个月时间,七牛云又发起了有奖征文的活动,正好借此机会记录一下自己参加这次比赛的经历,感受和一些比赛的心得。 如何了解到的比赛信息 其实我很早就开始关注七牛云了,最早是在今年二三月的时候…...

CSS 选择器优先级,!important 也会被覆盖?
目录 1,重要性2,专用性3,源代码顺序 CSS 属性值的计算过程中。其中第2步层叠冲突只是简单说明了下,这篇文章来详细介绍。 层叠冲突更广泛的被称为 CSS选择器优先级计算。 为什么叫层叠冲突,可以理解为 CSS 是 Cascadi…...

关于src别名的配置之tsconfig.json配置
tsconfig.json {"compilerOptions": {"baseUrl": "./", // 解析非相对模块的基地址,默认是当前目录"paths": { //路径映射,相对于baseUrl"/*": ["src/*"] }} } ① "baseUrl": &…...

Mybatis如何执行批量操作
文章目录 Mybatis如何执行批量操作使用foreach标签 使用ExecutorType.BATCH如何获取生成的主键 Mybatis如何执行批量操作 使用foreach标签 foreach的主要用在构建in条件中,它可以在SQL语句中进行迭代一个集合。foreach标签的属性主要有item,index&…...

LeetCode 1094. 拼车:优先队列
【LetMeFly】1094.拼车:优先队列 力扣题目链接:https://leetcode.cn/problems/car-pooling/ 车上最初有 capacity 个空座位。车 只能 向一个方向行驶(也就是说,不允许掉头或改变方向) 给定整数 capacity 和一个数组…...

项目开发维护技术文档(总结梳理)
目录 一、项目背景 二、架构设计 1.技术栈 2.架构图 3.代码结构 三、模块划分 1.用户模块 2.商品模块 四、开发规范 1.命名规范 2.代码格式 3.版本控制 五、部署流程 1.环境要求 2.部署流程 六、问题解决 1.数据库连接异常 2.Redis缓存失效 七、参考资料 项…...

01_学习使用javax_ws_rs_上传文件
文章目录 1 前言2 Maven 依赖3 上传接口4 如何解析 MultipartFormDataInput5 结语 1 前言 使用 Spring MVC 来处理文件上传,想必是大家耳熟能详的了,如下代码: ResponseBody PostMapping("/upload") public String upload(Request…...

MFC 发布CLXHHandleEngine动态库1.0.0.0版本
第一版发布以下功能,此项目使用VS2013创建,项目配置包括Unicode的Mdd,md与多字节版本: //MFC Grid表格 #include "../MFCGridCtrl/GridCtrl.h" //使用AES与Base64加密解密可以与java中的AES加解密衔接 //AES加密解密 #include &q…...

MicroPython 基于microdot框架搭建网页服务器
MicroPython 基于microdot框架搭建网页服务器 简介简单demo 简介 Microdot是一个极简的Python web框架,灵感来自于Flask,它被设计用来运行在资源有限的系统上,如微控制器。它运行在标准的Python和MicroPython上。 API参考microdot 资源下载m…...

FL Studio21.2汉化永久中文语言包
FL Studio21.2这款软件在国内被广泛使用,因此又被称为"水果"。它提供音符编辑器,可以针对作曲者的要求编辑出不同音律的节奏,例如鼓、镲、锣、钢琴、笛、大提琴、筝、扬琴等等任何乐器的节奏律动。此外,它还提供了方便快…...

Glide结合OkHttp保证短信验证接口携带图形验证码接口返回Cookie值去做网络请求
一、实现效果 二、步骤 注意:仅展示核心部分代码 1、导入依赖 api com.github.bumptech.glide:glide:4.10.0 kapt com.github.bumptech.glide:compiler:4.10.0 api com.squareup.okhttp3:okhttp:3.11.0 api com.squareup.okhttp3:logging-interceptor:3.11.02、自…...

怎样用Ajax提交from表单并接收其中的json数据
怎样用Ajax提交表单并接收其中的json数据 需求:实现点击按钮后,数据以表单形式提交至服务器,并接收来自服务器的返回数据。过程中页面不刷新。 AJAX 不是新的编程语言,而是一种使用现有标准的新方法。AJAX 是与服务器交换数据并…...

【动态规划】LeetCode-746LCR 088.使用最小花费爬楼梯
🎈算法那些事专栏说明:这是一个记录刷题日常的专栏,每个文章标题前都会写明这道题使用的算法。专栏每日计划至少更新1道题目,在这立下Flag🚩 🏠个人主页:Jammingpro 📕专栏链接&…...

Unity 接入TapADN播放广告时闪退 LZ4JavaSafeCompressor
通过跟踪安卓日志,发现报如下错误 Didnt find class "com.tapadn.lz4.LZ4JavaSafeCompressor" 解决方案: 去掉Minify这边的勾选,再打包即可。...

【九】linux下部署frp客户端服务端实践(内网穿透)
linux下部署frp客户端服务端实践 简介: 今天有一个这样的需求,部署在公司内部局域网虚拟机上的服务需要在外网能够访问到,这不就是内网穿透的需求吗,之前通过路由器实现过,现在公司这块路由器不具备这个功能了&#x…...

华为1+x网络系统建设与运维(中级)-练习题2
一.设备命令 LSW1 [Huawei]sys LSW1 同理可得,给所有设备改名 二.VLAN LSW1 [LSW1]vlan ba 10 20 [LSW1]int g0/0/1 [LSW1-GigabitEthernet0/0/1]port link-type trunk [LSW1-GigabitEthernet0/0/1]port trunk allow-pass vlan 10 20 [LSW1-GigabitEthernet0/0/1]in…...

自定义类型-结构体,联合体和枚举-C语言
引言 能看到结构体,说明C语言想必学习的时间也不少了,在之前肯定也学习过基本数据类型,包括整型int,浮点型float等等。可是在日常生活中,想要描述一个事物并没有那么简单。比如,你要描述一本书,…...

Windows 安装redis,设置开机自启动
Windows 安装redis,设置开机自启动 文章目录 Windows 安装redis,设置开机自启动下载, 解压到指定目录设置redis密码启动redis服务端停止redis服务端设置自启动 下载, 解压到指定目录 官网地址: https://redis.io/ 安装包下载地址: https://github.com/tporadowski/redis/relea…...

Windows安装Mysql Workbench及常用操作
Mysql Workbench是mysql自带的可视化操作界面,功能是强大的,但界面和navicat比,就是觉得别扭,但其实用惯了也还好,各有特色吧。这里记录一下常用的操作。 官方手册:MySQL Workbench 一、安装 1. 下载 官方…...

【计算机网络】15、NAT、NAPT 网络地址转换、打洞
文章目录 一、概念二、分类(主要是传统 NAT)2.1 基本 NAT2.2 NAPT 三、访问NAT下的内网设备的方式3.1 多拨3.2 端口转发、DMZ3.3 UPnP IGD、NAT-PMP3.4 服务器中转:frp 内网穿透3.4.1 NAT 打洞3.4.2 NAT 类型与打洞成功率3.4.2.1 完全圆锥形 …...