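Big Data with Flink (Part 4)
11. Watermarks
11.1 The watermark concept
In typical real-time stream processing, event time stays roughly in sync with processing time, possibly lagging slightly behind.
In Flink, the marker used to measure the progress of event time is the watermark. A watermark can be regarded as a special record: a marker inserted into the data stream whose main content is a timestamp indicating the current event time. The timestamp of some data record is usually used as the watermark's timestamp.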
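Watermark properties:
- A watermark is a marker inserted into the data stream
- Its main content is a timestamp that indicates how far event time has progressed
- Watermarks are generated from the timestamps of the data
- Watermark timestamps increase monotonically
- A watermark can be given a delay so that out-of-order data is handled correctly
- A watermark Watermark(t) states that event time in the current stream has reached timestamp t: all data up to t is assumed to have arrived, and no record with a timestamp less than or equal to t should appear in the stream afterwards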
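Take a watermark that waits 2 s as an example:
**Note:** Flink windows are not prepared statically in advance; they are created dynamically. A window is created only when the first record that falls into its time range arrives. Once the watermark reaches the window's end time, the window fires its computation and closes; firing the computation and closing the window are nevertheless two separate actions.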
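To make "a record falls into a window's range" concrete, here is a minimal plain-Java sketch of how a timestamp can be mapped to its tumbling window. The class and method names are hypothetical (this is not Flink API), and it assumes windows with no offset and half-open ranges [start, end).

```java
// Plain-Java sketch; WindowAssignSketch and its methods are illustrative names, not Flink API.
class WindowAssignSketch {
    // Start of the tumbling window of width sizeMs that contains timestampMs (offset 0 assumed).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // Windows are half-open: [start, start + size).
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long ts = 12_000L; // a record with event time 12 s
        System.out.println("[" + windowStart(ts, 10_000L) + "," + windowEnd(ts, 10_000L) + ")");
    }
}
```

A record at 12 s therefore belongs to [10 s, 20 s), and that window would only come into existence when such a record first arrives.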
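11.2 Generating watermarks
11.2.1 Principles
For performance, use a small watermark delay or no watermarks at all: processing-time semantics give the lowest latency, but data may be missed.
To make sure all data has arrived, use a large watermark delay; this costs performance because results are delayed.
11.2.2 Built-in watermarks
1. Built-in watermarks for ordered streams
Call WatermarkStrategy.forMonotonousTimestamps() directly: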
package waterMark;

import bean.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WatermarkMonoDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(
                        // Specify the watermark strategy
                        WatermarkStrategy
                                // Ascending timestamps, no wait time
                                .<WaterSensor>forMonotonousTimestamps()
                                // Specify the timestamp assigner
                                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                                    @Override
                                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                                        System.out.println("数据=" + element + "recordTS=" + recordTimestamp);
                                        // The returned timestamp must be in milliseconds
                                        return element.getTs() * 1000L;
                                    }
                                }));

        SingleOutputStreamOperator<String> process = dataStreamSource
                .keyBy(value -> value.getId())
                // Watermarks only take effect with event-time windows (TumblingEventTimeWindows),
                // not with TumblingProcessingTimeWindows
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                        long start = context.window().getStart();
                        long end = context.window().getEnd();
                        String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
                        String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
                        long l = elements.spliterator().estimateSize();
                        out.collect("key=" + key + "的窗口[" + winStart + "," + winEnd + "]包含" + l + "条数据" + elements.toString());
                    }
                });

        process.print();
        env.execute();
    }
}
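2. Built-in watermarks for out-of-order streams
Set the wait time to 2 s, so the [0,10) window closes once a record with timestamp 12 s arrives.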
package waterMark;

import bean.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Title:
 * @Author lizhe
 * @Package sink
 * @Date 2024/6/5 21:57
 * @description:
 */
public class WatermarkOutOfOrdernessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(
                        // Specify the watermark strategy
                        WatermarkStrategy
                                // Out-of-order watermark with a 2 s wait time
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                // Specify the timestamp assigner
                                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                                    @Override
                                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                                        System.out.println("数据=" + element + "recordTS=" + recordTimestamp);
                                        // The returned timestamp must be in milliseconds
                                        return element.getTs() * 1000L;
                                    }
                                }));

        SingleOutputStreamOperator<String> process = dataStreamSource
                .keyBy(value -> value.getId())
                // Watermarks only take effect with event-time windows (TumblingEventTimeWindows),
                // not with TumblingProcessingTimeWindows
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                        long start = context.window().getStart();
                        long end = context.window().getEnd();
                        String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
                        String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
                        long l = elements.spliterator().estimateSize();
                        out.collect("key=" + key + "的窗口[" + winStart + "," + winEnd + "]包含" + l + "条数据" + elements.toString());
                    }
                });

        process.print();
        env.execute();
    }
}
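Result:
Sending the record that is logged as 数据=WaterSensor{id='s1', ts=12, vc=12}recordTS=-9223372036854775808 closes the [0,10) window. The record WaterSensor{id='s1', ts=12, vc=12} itself does not belong to [0,10); it goes into the [10,20) window. (The recordTS value is Long.MIN_VALUE, meaning the record carried no timestamp before the assigner ran.)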
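11.2.3 How the built-in watermarks are generated
- Both are generated periodically; the default interval is 200 ms
- Ordered stream: watermark = current maximum event time - 1 ms
- Out-of-order stream: watermark = current maximum event time - delay - 1 ms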
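The formulas above can be sketched in plain Java. This is a simplified stand-in for a periodic generator, not Flink's own class: `BoundedWatermarkSketch`, `onEvent` and `currentWatermark` are hypothetical names, and the 200 ms emission timer is left out. A delay of 0 gives the ordered-stream case.

```java
// Plain-Java sketch of "watermark = max event time - delay - 1 ms"; names are illustrative.
class BoundedWatermarkSketch {
    private final long delayMs;
    private long maxTimestampMs;

    BoundedWatermarkSketch(long delayMs) {
        this.delayMs = delayMs;
        // Start low enough that the subtraction in currentWatermark() cannot underflow.
        this.maxTimestampMs = Long.MIN_VALUE + delayMs + 1;
    }

    // Called for every record: only remember the largest timestamp seen so far.
    void onEvent(long eventTimestampMs) {
        maxTimestampMs = Math.max(maxTimestampMs, eventTimestampMs);
    }

    // What a periodic emission would send downstream at this moment.
    long currentWatermark() {
        return maxTimestampMs - delayMs - 1;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch generator = new BoundedWatermarkSketch(2_000L);
        generator.onEvent(12_000L);
        System.out.println(generator.currentWatermark());
    }
}
```

Because only the maximum timestamp is tracked, an out-of-order record with a smaller timestamp never moves the watermark backwards, which is what keeps it monotonic.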
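11.3 Watermark propagation
11.3.1 Watermark propagation with multiple parallel subtasks
A task takes the smallest watermark among its upstream inputs as its own; otherwise windows would fire too early and data would be lost.
Demonstration of watermark propagation with parallelism greater than 1: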
package waterMark;

import bean.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Title:
 * @Author lizhe
 * @Package sink
 * @Date 2024/6/5 21:57
 * @description:
 */
public class WatermarkOutOfOrdernessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Demonstrate watermark propagation with multiple parallel subtasks
        env.setParallelism(2);

        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(
                        // Specify the watermark strategy
                        WatermarkStrategy
                                // Out-of-order watermark with a 2 s wait time
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                // Specify the timestamp assigner
                                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                                    @Override
                                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                                        System.out.println("数据=" + element + "recordTS=" + recordTimestamp);
                                        // The returned timestamp must be in milliseconds
                                        return element.getTs() * 1000L;
                                    }
                                }));

        SingleOutputStreamOperator<String> process = dataStreamSource
                .keyBy(value -> value.getId())
                // Watermarks only take effect with event-time windows (TumblingEventTimeWindows),
                // not with TumblingProcessingTimeWindows
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                        long start = context.window().getStart();
                        long end = context.window().getEnd();
                        String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
                        String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
                        long l = elements.spliterator().estimateSize();
                        out.collect("key=" + key + "的窗口[" + winStart + "," + winEnd + "]包含" + l + "条数据" + elements.toString());
                    }
                });

        process.print();
        env.execute();
    }
}
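Result:
With multiple parallel subtasks there is an extra watermark-update step. When WaterSensor{id='s1', ts=12, vc=12} arrives, one subtask's watermark is 5-2=3 and the other's is 12-2=10 (ignoring the 1 ms); because the smallest watermark wins, the downstream watermark stays at 3 and does not advance to 10. When WaterSensor{id='s1', ts=13, vc=13} arrives, the watermark advances to 10 and the window closes.
Conclusion: in this demo with parallelism 2, the window can close only after the record following the one that would have triggered it has arrived.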
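The "smallest watermark wins" rule can be sketched in plain Java as follows. `MinWatermarkSketch` and `onWatermark` are hypothetical names, and the sketch only tracks one value per upstream channel; it is not how Flink implements the operator internally.

```java
import java.util.Arrays;

// Plain-Java sketch of combining per-channel watermarks by taking the minimum; names are illustrative.
class MinWatermarkSketch {
    private final long[] channelWatermarks;
    private long combined = Long.MIN_VALUE;

    MinWatermarkSketch(int channels) {
        channelWatermarks = new long[channels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Record a watermark from one upstream channel and return the task's resulting watermark.
    long onWatermark(int channel, long watermarkMs) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermarkMs);
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        // The combined watermark never moves backwards.
        combined = Math.max(combined, min);
        return combined;
    }

    public static void main(String[] args) {
        MinWatermarkSketch task = new MinWatermarkSketch(2);
        task.onWatermark(0, 2_999L);  // subtask 0 has seen ts=5 with a 2 s delay
        task.onWatermark(1, 9_999L);  // subtask 1 has seen ts=12
        System.out.println(task.onWatermark(0, 10_999L)); // subtask 0 then sees ts=13
    }
}
```

With the demo's numbers, the downstream watermark only reaches 9999 ms (enough to fire the [0,10) window) after the second subtask has also moved past 12 s.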
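11.3.2 Configuring watermark idleness
When a task has several upstream parallel subtasks and one of them receives no data, the task's watermark cannot advance, because the task uses the smallest upstream watermark as its event-time clock; as a result, windows never fire. In that case an idleness timeout can be configured.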
package waterMark;

import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Title:
 * @Author lizhe
 * @Package
 * @Date 2024/6/5 21:57
 * @description:
 */
public class WatermarkIdlenessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        SingleOutputStreamOperator<Integer> streamOperator = env
                .socketTextStream("192.168.132.101", 7777)
                // Custom partitioner: value % number of partitions.
                // If only odd numbers are entered, they all go to the same subtask.
                .partitionCustom(new Partitioner<String>() {
                    @Override
                    public int partition(String key, int numPartitions) {
                        return Integer.parseInt(key) % numPartitions;
                    }
                }, value -> value)
                .map(value -> Integer.parseInt(value))
                .assignTimestampsAndWatermarks(
                        // Specify the watermark strategy
                        WatermarkStrategy
                                .<Integer>forMonotonousTimestamps()
                                // 1000L avoids int overflow when converting seconds to milliseconds
                                .withTimestampAssigner((r, ts) -> r * 1000L)
                                // Mark a subtask as idle after 5 s without data
                                .withIdleness(Duration.ofSeconds(5)));

        // Two groups (odd and even numbers), each with a 2 s tumbling event-time window
        streamOperator
                .keyBy(value -> value % 2)
                .window(TumblingEventTimeWindows.of(Time.seconds(2)))
                .process(new ProcessWindowFunction<Integer, String, Integer, TimeWindow>() {
                    @Override
                    public void process(Integer integer, Context context, Iterable<Integer> elements, Collector<String> out) throws Exception {
                        long start = context.window().getStart();
                        long end = context.window().getEnd();
                        String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
                        String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
                        long l = elements.spliterator().estimateSize();
                        out.collect("key=" + integer + "的窗口[" + winStart + "," + winEnd + "]包含" + l + "条数据" + elements.toString());
                    }
                })
                .print();

        env.execute();
    }
}
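The effect of an idleness timeout on the minimum rule can be sketched in plain Java. This is a simplification under stated assumptions: `IdleAwareMinSketch` and `combine` are hypothetical names, idleness is passed in as a flag instead of being detected by a timer, and returning `Long.MIN_VALUE` when every input is idle is only a placeholder for "no watermark to emit".

```java
// Plain-Java sketch: idle inputs are skipped when taking the minimum watermark; names are illustrative.
class IdleAwareMinSketch {
    static long combine(long[] watermarksMs, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarksMs.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarksMs[i]);
            }
        }
        // Placeholder choice: with no active input there is nothing to advance to.
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        long[] watermarks = {Long.MIN_VALUE, 4_999L}; // subtask 0 never received data
        System.out.println(combine(watermarks, new boolean[]{false, false}));
        System.out.println(combine(watermarks, new boolean[]{true, false}));
    }
}
```

Without the idle flag, the silent subtask pins the result at its initial value; once it is marked idle, the active subtask's watermark is used and windows can fire.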
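11.4 Handling late data
11.4.1 Holding back the watermark
Set an out-of-orderness tolerance when the watermark is generated. This holds back the event-time clock so that window computation is delayed, giving out-of-order data more time to get into its window.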
forBoundedOutOfOrderness(Duration.ofSeconds(2))
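11.4.2 Delaying window close
Flink windows can accept late data. When the window's computation is first triggered, the current result is computed and emitted, but the window is not closed yet. From then on, every late record that arrives for the window triggers the computation again (an incremental update). The window closes only once the watermark passes the window end time plus the allowed lateness.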
.allowedLateness(Time.seconds(2))
package waterMark;

import bean.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Title:
 * @Author lizhe
 * @Package
 * @Date 2024/6/5 21:57
 * @description:
 */
public class WatermarkAllowLatenessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(
                        // Specify the watermark strategy
                        WatermarkStrategy
                                // Out-of-order watermark with a 2 s wait time
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                // Specify the timestamp assigner
                                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                                    @Override
                                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                                        System.out.println("数据=" + element + "recordTS=" + recordTimestamp);
                                        // The returned timestamp must be in milliseconds
                                        return element.getTs() * 1000L;
                                    }
                                }));

        SingleOutputStreamOperator<String> process = dataStreamSource
                .keyBy(value -> value.getId())
                // Watermarks only take effect with event-time windows (TumblingEventTimeWindows),
                // not with TumblingProcessingTimeWindows
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                // Keep the window open for 2 more seconds after it first fires
                .allowedLateness(Time.seconds(2))
                .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                        long start = context.window().getStart();
                        long end = context.window().getEnd();
                        String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
                        String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
                        long l = elements.spliterator().estimateSize();
                        out.collect("key=" + key + "的窗口[" + winStart + "," + winEnd + "]包含" + l + "条数据" + elements.toString());
                    }
                });

        process.print();
        env.execute();
    }
}
11.4.3 Receiving late data in a side output
Use .sideOutputLateData() to send late data to a side output stream.
package waterMark;

import bean.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;

/**
 * @Title:
 * @Author lizhe
 * @Package sink
 * @Date 2024/6/5 21:57
 * @description:
 */
public class WatermarkAllowLatenessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(
                        // Specify the watermark strategy
                        WatermarkStrategy
                                // Out-of-order watermark with a 2 s wait time
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                // Specify the timestamp assigner
                                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                                    @Override
                                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                                        System.out.println("数据=" + element + "recordTS=" + recordTimestamp);
                                        // The returned timestamp must be in milliseconds
                                        return element.getTs() * 1000L;
                                    }
                                }));

        OutputTag<WaterSensor> outputTag = new OutputTag<>("late-data", Types.POJO(WaterSensor.class));

        SingleOutputStreamOperator<String> process = dataStreamSource
                .keyBy(value -> value.getId())
                // Watermarks only take effect with event-time windows (TumblingEventTimeWindows),
                // not with TumblingProcessingTimeWindows
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                // Keep the window open for 2 more seconds after it first fires
                .allowedLateness(Time.seconds(2))
                // Data that is still late after the window has closed goes to the side output
                .sideOutputLateData(outputTag)
                .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                        long start = context.window().getStart();
                        long end = context.window().getEnd();
                        String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
                        String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
                        long l = elements.spliterator().estimateSize();
                        out.collect("key=" + key + "的窗口[" + winStart + "," + winEnd + "]包含" + l + "条数据" + elements.toString());
                    }
                });

        process.print();
        process.getSideOutput(outputTag).print("侧输出流");
        env.execute();
    }
}
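11.4.4 Summary
- Out-of-order vs. late:
  **Out-of-order:** the records arrive in the wrong order: a record with an earlier timestamp arrives after one with a later timestamp
  **Late:** the record's timestamp < the current watermark
- Handling out-of-order and late data
  - Specify an out-of-orderness wait time in the watermark strategy
  - If a window is used, configure allowed lateness on the window
  - Send data that is still late after the window has closed to a side output
- The watermark wait time and the window's allowed lateness are not equivalent and cannot replace each other
  The watermark determines when the window is first computed, so a long watermark wait time increases result latency.
  Allowed lateness only makes the result more accurate and should not affect result latency.
  The two therefore cannot substitute for each other.
- Rules of thumb for choosing the watermark wait time and the allowed lateness
  The watermark wait time should not be too large, typically on the order of seconds. The allowed lateness only needs to cover most of the late data. For the few extreme stragglers, use a side output.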
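The three tiers in this summary can be written down as a small decision function. This is a plain-Java sketch of my understanding of the event-time window rules (a window fires when the watermark reaches window end - 1 ms and is cleaned up when it reaches window end - 1 ms + allowed lateness); `LatenessSketch`, `Fate` and `classify` are hypothetical names.

```java
// Plain-Java sketch of what happens to a record arriving for a given window; names are illustrative.
class LatenessSketch {
    enum Fate { ON_TIME, LATE_BUT_ACCEPTED, SIDE_OUTPUT }

    static Fate classify(long windowEndMs, long allowedLatenessMs, long currentWatermarkMs) {
        long windowMaxTimestamp = windowEndMs - 1; // last timestamp that belongs to the window
        if (windowMaxTimestamp + allowedLatenessMs <= currentWatermarkMs) {
            return Fate.SIDE_OUTPUT;               // window already closed (dropped if no side output is set)
        }
        if (windowMaxTimestamp <= currentWatermarkMs) {
            return Fate.LATE_BUT_ACCEPTED;         // window already fired, the record triggers it again
        }
        return Fate.ON_TIME;                       // window has not fired yet
    }

    public static void main(String[] args) {
        // Window [0 s, 10 s), allowed lateness 2 s, watermark delay 2 s as in the demos above.
        System.out.println(classify(10_000L, 2_000L, 2_999L));  // after a record with ts=5
        System.out.println(classify(10_000L, 2_000L, 9_999L));  // after a record with ts=12
        System.out.println(classify(10_000L, 2_000L, 11_999L)); // after a record with ts=14
    }
}
```

With the demo settings, the [0,10) window first fires when a 12 s record arrives and stops accepting records once a 14 s record has arrived.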
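12. Time-based stream joins
The connect operator mentioned earlier covers most needs. However, to find how the records of two streams match within a fixed time span, connect requires custom logic, whereas windows express this more simply, and Flink has a built-in API for it.
12.1 Window Join
- Records match only if they fall into the same time window
- Matching is done on the keyBy key
- Only matched records are returned, similar to an inner join over a fixed time range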
package waterMark;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @Title: WindowJoinDemo
 * @Author lizhe
 * @Package Window Join
 * @Date 2024/6/8 21:11
 * @description:
 */
public class WindowJoinDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env
                .fromElements(
                        Tuple2.of("a", 1),
                        Tuple2.of("a", 2),
                        Tuple2.of("b", 3),
                        Tuple2.of("c", 4))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Integer>>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Integer>>() {
                                    @Override
                                    public long extractTimestamp(Tuple2<String, Integer> element, long recordTimestamp) {
                                        return element.f1 * 1000L;
                                    }
                                }));

        SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env
                .fromElements(
                        Tuple3.of("a", 1, 1),
                        Tuple3.of("a", 11, 1),
                        Tuple3.of("b", 2, 1),
                        Tuple3.of("b", 12, 1),
                        Tuple3.of("c", 14, 1),
                        Tuple3.of("d", 15, 1))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple3<String, Integer, Integer>>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, Integer>>() {
                                    @Override
                                    public long extractTimestamp(Tuple3<String, Integer, Integer> element, long recordTimestamp) {
                                        return element.f1 * 1000L;
                                    }
                                }));

        DataStream<String> join = ds1.join(ds2)
                .where(r1 -> r1.f0)
                .equalTo(r2 -> r2.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
                    /**
                     * Called for every pair of records that was joined.
                     * @param first record from ds1
                     * @param second record from ds2
                     * @return the joined result
                     * @throws Exception
                     */
                    @Override
                    public String join(Tuple2<String, Integer> first, Tuple3<String, Integer, Integer> second) throws Exception {
                        return first + "----" + second;
                    }
                });

        join.print();
        env.execute();
    }
}
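The matching rule of a tumbling window join (same key, same window) can be sketched in plain Java without Flink. `WindowJoinSketch`, `Event` and `demo` are hypothetical names; the data mirrors the two streams of the demo above with the second tuple field used as the timestamp in seconds, and a nested loop stands in for the windowed operator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of an inner join restricted to records in the same tumbling window.
class WindowJoinSketch {
    static final class Event {
        final String key;
        final long tsMs;
        Event(String key, long tsMs) { this.key = key; this.tsMs = tsMs; }
        @Override public String toString() { return "(" + key + "," + tsMs + ")"; }
    }

    static long windowStart(long tsMs, long sizeMs) {
        return tsMs - Math.floorMod(tsMs, sizeMs);
    }

    // Emit a pair when both records share the key and fall into the same window.
    static List<String> join(List<Event> left, List<Event> right, long sizeMs) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                if (l.key.equals(r.key) && windowStart(l.tsMs, sizeMs) == windowStart(r.tsMs, sizeMs)) {
                    out.add(l + "----" + r);
                }
            }
        }
        return out;
    }

    // Same keys and timestamps as ds1 and ds2 in the demo, joined over 5 s windows.
    static List<String> demo() {
        List<Event> ds1 = Arrays.asList(new Event("a", 1_000), new Event("a", 2_000),
                new Event("b", 3_000), new Event("c", 4_000));
        List<Event> ds2 = Arrays.asList(new Event("a", 1_000), new Event("a", 11_000),
                new Event("b", 2_000), new Event("b", 12_000),
                new Event("c", 14_000), new Event("d", 15_000));
        return join(ds1, ds2, 5_000L);
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Key "c" produces nothing because its two records land in different windows, and key "d" has no partner at all; that is the inner-join behaviour described in the list above.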
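12.2 Interval Join
Sometimes the time span to consider is not fixed, and records that should match can land just on either side of a window boundary and fail to match. In such cases a window join is not enough.
An interval join takes each record of one stream and opens a time interval around its timestamp, specified by a lower-bound and an upper-bound offset (a negative offset reaches back in time, a positive one forward), and checks whether matching records from the other stream fall into it. (Only event-time semantics are supported.)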
package waterMark;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * @Title:
 * @Author lizhe
 * @Package
 * @Date 2024/6/8 21:11
 * @description:
 */
public class IntervalJoinDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env
                .fromElements(
                        Tuple2.of("a", 1),
                        Tuple2.of("a", 2),
                        Tuple2.of("b", 3),
                        Tuple2.of("c", 4))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Integer>>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Integer>>() {
                                    @Override
                                    public long extractTimestamp(Tuple2<String, Integer> element, long recordTimestamp) {
                                        return element.f1 * 1000L;
                                    }
                                }));

        SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env
                .fromElements(
                        Tuple3.of("a", 1, 1),
                        Tuple3.of("a", 11, 1),
                        Tuple3.of("b", 2, 1),
                        Tuple3.of("b", 12, 1),
                        Tuple3.of("c", 14, 1),
                        Tuple3.of("d", 15, 1))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple3<String, Integer, Integer>>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, Integer>>() {
                                    @Override
                                    public long extractTimestamp(Tuple3<String, Integer, Integer> element, long recordTimestamp) {
                                        return element.f1 * 1000L;
                                    }
                                }));

        KeyedStream<Tuple2<String, Integer>, String> stream1 = ds1.keyBy(value -> value.f0);
        KeyedStream<Tuple3<String, Integer, Integer>, String> stream2 = ds2.keyBy(value -> value.f0);

        stream1.intervalJoin(stream2)
                .between(Time.seconds(-2), Time.seconds(2))
                .process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
                    /**
                     * Called only when records from the two streams match.
                     * @param left record from stream1
                     * @param right record from stream2
                     * @param ctx context
                     * @param out collector
                     * @throws Exception
                     */
                    @Override
                    public void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, Context ctx, Collector<String> out) throws Exception {
                        // Only joined pairs reach this method
                        out.collect(left + "----" + right);
                    }
                })
                .print();

        env.execute();
    }
}
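The interval condition itself is a one-line check. The following plain-Java sketch assumes both bounds are inclusive, which is my understanding of the default for `between(...)`; `IntervalJoinSketch` and `matches` are hypothetical names, and key matching is left out.

```java
// Plain-Java sketch of the interval-join time condition; names are illustrative.
class IntervalJoinSketch {
    // A right record matches a left record when
    // leftTs + lowerBound <= rightTs <= leftTs + upperBound (both bounds assumed inclusive).
    static boolean matches(long leftTsMs, long rightTsMs, long lowerBoundMs, long upperBoundMs) {
        return rightTsMs >= leftTsMs + lowerBoundMs && rightTsMs <= leftTsMs + upperBoundMs;
    }

    public static void main(String[] args) {
        // between(-2 s, +2 s), as in the demo above
        System.out.println(matches(3_000L, 2_000L, -2_000L, 2_000L));  // ("b",3) vs ("b",2,1)
        System.out.println(matches(3_000L, 12_000L, -2_000L, 2_000L)); // ("b",3) vs ("b",12,1)
    }
}
```

Unlike a window join, the interval moves with each left record, so two records 1 s apart can match even if a window boundary lies between them.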
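Since version 1.17, late data that can no longer be joined can be emitted through a side output.
A record whose event time < the current watermark is late data, and the main stream's process function does not handle it.
Calling sideOutputLeftLateData() and sideOutputRightLateData() after between() sends this late data to side output streams.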
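13. Process functions
The lower-level API beneath DataStream is collectively referred to as the process operator; its interface is the process function.
13.1 Basic process functions
A process function provides a timer service (TimerService) through which it can access the timestamp of the current event and the watermark, and even register timers. Process functions extend AbstractRichFunction, so they have all the features of rich functions and can access state and other runtime information. A process function can also emit data directly to a side output. Process functions are the most flexible way of processing and can implement any custom logic.
Categories: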
- ProcessFunction
- KeyedProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
- CoProcessFunction
- ProcessJoinFunction
- BroadcastProcessFunction
- KeyedBroadcastProcessFunction
13.2 Keyed process function: KeyedProcessFunction
Timers can only be set through the TimerService on a KeyedStream.
13.2.1 Timers and the timer service
keyedStream.process(new KeyedProcessFunction<String, WaterSensor, String>() {
    /**
     * Called once for every record.
     * @param value
     * @param ctx
     * @param out
     * @throws Exception
     */
    @Override
    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
        // Event time extracted from the record; null if there is none
        Long timestamp = ctx.timestamp();
        // Timer service
        TimerService timerService = ctx.timerService();
        // Register a timer: event time
        timerService.registerEventTimeTimer(10L);
        // Register a timer: processing time
        timerService.registerProcessingTimeTimer(10L);
        // Delete a timer: event time
        timerService.deleteEventTimeTimer(10L);
        // Delete a timer: processing time
        timerService.deleteProcessingTimeTimer(10L);
        // Current processing time, i.e. system time
        timerService.currentProcessingTime();
        // Current watermark
        timerService.currentWatermark();
    }
});
Event-time timer:
package process;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Title: KeyedProcessTimerDemo
 * @Author lizhe
 * @Package process
 * @Date 2024/6/9 12:29
 * @description:
 */
public class KeyedProcessTimerDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> singleOutputStreamOperator = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                });

        WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy = WatermarkStrategy
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner((element, recordTime) -> element.getTs() * 1000L);

        SingleOutputStreamOperator<WaterSensor> operator =
                singleOutputStreamOperator.assignTimestampsAndWatermarks(waterSensorWatermarkStrategy);
        KeyedStream<WaterSensor, String> keyedStream = operator.keyBy(value -> value.getId());

        keyedStream.process(new KeyedProcessFunction<String, WaterSensor, String>() {
            /**
             * Called once for every record.
             * @param value
             * @param ctx
             * @param out
             * @throws Exception
             */
            @Override
            public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                // Event time extracted from the record; null if there is none
                Long timestamp = ctx.timestamp();
                // Timer service
                TimerService timerService = ctx.timerService();
                // Register an event-time timer at 5 s
                timerService.registerEventTimeTimer(5000L);
                System.out.println("当前时间" + timestamp + ",注册了一个5s的定时器");
            }

            /**
             * Called when time has advanced to the time the timer was registered for.
             * @param timestamp the time the timer was registered for
             * @param ctx context
             * @param out collector
             * @throws Exception
             */
            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                super.onTimer(timestamp, ctx, out);
                System.out.println("现在时间" + timestamp + "定时器触发,key为" + ctx.getCurrentKey());
            }
        }).print();

        env.execute();
    }
}
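Output:
The TimerService deduplicates timers by key and timestamp: for each key and timestamp there is at most one timer, so even if it is registered several times, onTimer() is called only once.
Processing-time timer: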
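The deduplication rule can be pictured as a set keyed by (key, timestamp). The plain-Java sketch below only illustrates that idea; `TimerDedupSketch`, `register` and `pending` are hypothetical names and say nothing about how Flink stores timers internally.

```java
import java.util.HashSet;
import java.util.Set;

// Plain-Java sketch: one pending timer per (key, timestamp) pair; names are illustrative.
class TimerDedupSketch {
    private final Set<String> timers = new HashSet<>();

    // Returns true if a new timer was added, false if the same (key, timestamp) already existed.
    boolean register(String key, long timestampMs) {
        return timers.add(key + "@" + timestampMs);
    }

    int pending() {
        return timers.size();
    }

    public static void main(String[] args) {
        TimerDedupSketch service = new TimerDedupSketch();
        service.register("s1", 5_000L);
        service.register("s1", 5_000L); // same key and timestamp: no second timer
        service.register("s2", 5_000L); // different key: separate timer
        System.out.println(service.pending());
    }
}
```

This is why the event-time demo above, which registers a 5 s timer for every record, still fires only once per key.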
package process;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Title: KeyedProcessTimerDemo
 * @Author lizhe
 * @Package process
 * @Date 2024/6/9 12:29
 * @description:
 */
public class KeyedProcessTimerDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> singleOutputStreamOperator = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                });

        WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy = WatermarkStrategy
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner((element, recordTime) -> element.getTs() * 1000L);

        SingleOutputStreamOperator<WaterSensor> operator =
                singleOutputStreamOperator.assignTimestampsAndWatermarks(waterSensorWatermarkStrategy);
        KeyedStream<WaterSensor, String> keyedStream = operator.keyBy(value -> value.getId());

        keyedStream.process(new KeyedProcessFunction<String, WaterSensor, String>() {
            /**
             * Called once for every record.
             * @param value
             * @param ctx
             * @param out
             * @throws Exception
             */
            @Override
            public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                // Timer service
                TimerService timerService = ctx.timerService();
                // Register a processing-time timer 5 s after the current system time
                long currentProcessingTime = timerService.currentProcessingTime();
                timerService.registerProcessingTimeTimer(currentProcessingTime + 5000L);
                System.out.println("当前时间" + currentProcessingTime + ",注册了一个5后的定时器,key为" + ctx.getCurrentKey());
            }

            /**
             * Called when time has advanced to the time the timer was registered for.
             * @param timestamp the time the timer was registered for
             * @param ctx context
             * @param out collector
             * @throws Exception
             */
            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                super.onTimer(timestamp, ctx, out);
                System.out.println("现在时间" + timestamp + "定时器触发,key为" + ctx.getCurrentKey());
            }
        }).print();

        env.execute();
    }
}
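Summary:
- An event-time timer is triggered by the watermark: it fires once watermark >= the registered time.
  Note: watermark = current maximum event time - wait time - 1 ms, and the -1 ms pushes the trigger back by one record. For example, with a 5 s timer and a wait time of 3 s, a record at 8 s gives watermark = 8 s - 3 s - 1 ms = 4999 ms, which does not fire the timer. With whole-second timestamps, it takes a record at 9 s (watermark = 9 s - 3 s - 1 ms = 5999 ms) to fire the 5 s timer.
- The current watermark read inside the process function is the previous watermark (the process function has not yet received the new watermark generated from the current record)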
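The arithmetic in the note can be checked with a few lines of plain Java. `TimerFiringSketch`, `watermark` and `fires` are hypothetical helper names that simply encode the two rules stated above.

```java
// Plain-Java sketch of the firing condition for event-time timers; names are illustrative.
class TimerFiringSketch {
    // watermark = current maximum event time - wait time - 1 ms
    static long watermark(long maxEventTimeMs, long waitMs) {
        return maxEventTimeMs - waitMs - 1;
    }

    // An event-time timer fires once the watermark has reached its timestamp.
    static boolean fires(long timerMs, long watermarkMs) {
        return watermarkMs >= timerMs;
    }

    public static void main(String[] args) {
        long timer = 5_000L;
        long wait = 3_000L;
        System.out.println(fires(timer, watermark(8_000L, wait))); // record at 8 s
        System.out.println(fires(timer, watermark(9_000L, wait))); // record at 9 s
    }
}
```

In millisecond terms any record with a timestamp of at least 8001 ms would do; the 9 s in the note follows from the demo data using whole seconds.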
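13.3 Application example
Find the water levels that occur most often within a period of time: the two most frequent water levels of the last 10 s, updated every 5 s.
A sliding window can be used to count the occurrences of each water level.
Author's note: review this code carefully later; it may contain problems.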
package process;import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.*;

/**
 * @Title:
 * @Author lizhe
 * @Package process
 * @Date 2024/6/9 12:29
 * @description:
 */
public class TopNDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> operator = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner((element, recordTime) -> {
                            return element.getTs() * 1000L;
                        }));
        // 1. Key by vc, then window and aggregate (incremental count + full-window function that adds a label).
        // After window aggregation the result is an ordinary stream that has lost its window information,
        // so each record is tagged with its window end time (windowEnd).
        // Note: the window and the timer below both use processing time, so the watermark strategy above does not drive them.
        SingleOutputStreamOperator<Tuple3<Integer, Integer, Long>> aggregate = operator
                .keyBy(value -> value.getVc())
                .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .aggregate(new VcCountAgg(), new WindowResult());
        // 2. Key by the window label so that results of the same window end up together, then sort and take the top N.
        aggregate.keyBy(value -> value.f2).process(new TopN(2)).print();
        env.execute();
    }

    public static class VcCountAgg implements AggregateFunction<WaterSensor, Integer, Integer> {
        @Override
        public Integer createAccumulator() {
            return 0;
        }

        @Override
        public Integer add(WaterSensor value, Integer accumulator) {
            return ++accumulator;
        }

        @Override
        public Integer getResult(Integer accumulator) {
            return accumulator;
        }

        @Override
        public Integer merge(Integer a, Integer b) {
            return null;
        }
    }

    /**
     * Type parameters:
     * 1st: input type = output of the incremental function (the count)
     * 2nd: output type = Tuple3(vc, count, windowEnd), labelled with the window end time
     * 3rd: key type (vc, Integer)
     * 4th: window type
     */
    public static class WindowResult extends ProcessWindowFunction<Integer, Tuple3<Integer, Integer, Long>, Integer, TimeWindow> {
        @Override
        public void process(Integer key, Context context, Iterable<Integer> elements, Collector<Tuple3<Integer, Integer, Long>> out) throws Exception {
            Integer count = elements.iterator().next();
            long windowsEnd = context.window().getEnd();
            out.collect(Tuple3.of(key, count, windowsEnd));
        }
    }

    public static class TopN extends KeyedProcessFunction<Long, Tuple3<Integer, Integer, Long>, String> {
        // Results of the different windows: key = windowEnd, value = list of records
        private Map<Long, List<Tuple3<Integer, Integer, Long>>> dataListMap;
        // Number of top entries to output
        private int threshold;

        public TopN(int threshold) {
            dataListMap = new HashMap<>();
            this.threshold = threshold;
        }

        @Override
        public void processElement(Tuple3<Integer, Integer, Long> value, Context ctx, Collector<String> out) throws Exception {
            // Each call delivers a single record. To sort, records must be buffered, separately per window.
            // 1. Store the record in the HashMap
            Long windowEnd = value.f2;
            if (dataListMap.containsKey(windowEnd)) {
                // 1.1 This window already has a list: append to it
                List<Tuple3<Integer, Integer, Long>> tuple3List = dataListMap.get(windowEnd);
                tuple3List.add(value);
            } else {
                // 1.2 First record of this window: create the list
                List<Tuple3<Integer, Integer, Long>> dataList = new ArrayList<>();
                dataList.add(value);
                dataListMap.put(windowEnd, dataList);
            }
            // 2. Register a timer at windowEnd + 1 ms. All results of one window are emitted at the same time,
            // but processElement is called once per record, so a 1 ms delay is enough.
            ctx.timerService().registerProcessingTimeTimer(windowEnd + 1);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            super.onTimer(timestamp, ctx, out);
            // The timer fired: all results of this window have been collected, so sort and take the top N.
            Long windowEnd = ctx.getCurrentKey();
            // 1. Sort by count, descending
            List<Tuple3<Integer, Integer, Long>> dataList = dataListMap.get(windowEnd);
            dataList.sort(new Comparator<Tuple3<Integer, Integer, Long>>() {
                @Override
                public int compare(Tuple3<Integer, Integer, Long> o1, Tuple3<Integer, Integer, Long> o2) {
                    return o2.f1 - o1.f1;
                }
            });
            // 2. Take the top N
            StringBuilder outStr = new StringBuilder();
            outStr.append("==========\n");
            // Walk the sorted list and take the first `threshold` entries, or all of them if the list is shorter
            for (int i = 0; i < Math.min(threshold, dataList.size()); i++) {
                Tuple3<Integer, Integer, Long> vcCount = dataList.get(i);
                outStr.append("Top" + (i + 1) + "\n");
                outStr.append("vc=" + vcCount.f0 + "\n");
                outStr.append("count=" + vcCount.f1 + "\n");
                outStr.append("窗口结束时间" + vcCount.f2 + "\n");
            }
            // Clear the list once it has been used
            dataList.clear();
            out.collect(outStr.toString());
        }
    }
}
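The sort-and-truncate step inside `onTimer` can be looked at in isolation. The following is a minimal plain-Java sketch without any Flink dependency; the class `TopNSketch` and the `{vc, count}` array layout are assumptions made for illustration and are not part of the demo above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical helper, not part of Flink: picks the n largest counts of one window. */
final class TopNSketch {
    /** Each int[] is {vc, count}. Returns at most n entries, highest count first. */
    static List<int[]> topN(List<int[]> windowResults, int n) {
        // Copy first so the caller's list is left untouched
        List<int[]> sorted = new ArrayList<>(windowResults);
        sorted.sort(Comparator.comparingInt((int[] r) -> r[1]).reversed());
        // If the window produced fewer than n results, return all of them
        return new ArrayList<>(sorted.subList(0, Math.min(n, sorted.size())));
    }
}
```

Using `Math.min(n, size)` mirrors the loop bound in the demo, so a window with fewer than N distinct `vc` values does not cause an index error.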
13.4. Side outputs
Use a side output to emit water-level alarms.
package process;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;

/**
 * @Title:
 * @Author lizhe
 * @Package process
 * @Date 2024/6/9 12:29
 * @description:
 */
public class SideOutputDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy = WatermarkStrategy
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner((element, recordTime) -> {
                    return element.getTs() * 1000L;
                });
        SingleOutputStreamOperator<WaterSensor> singleOutputStreamOperator = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(waterSensorWatermarkStrategy);
        final OutputTag<String> warnTag = new OutputTag<>("warn", Types.STRING);
        SingleOutputStreamOperator<WaterSensor> process = singleOutputStreamOperator
                .keyBy(value -> value.getId())
                .process(new KeyedProcessFunction<String, WaterSensor, WaterSensor>() {
                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {
                        // Send the alarm to the side output
                        if (value.getVc() > 10) {
                            ctx.output(warnTag, "当前水位=" + value.getVc() + ",大于阈值10!");
                        }
                        // Every record still goes to the main output
                        out.collect(value);
                    }
                });
        process.print();
        process.getSideOutput(warnTag).printToErr("warn");
        env.execute();
    }
}
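The routing rule of the demo above (every record goes to the main output, and records above the threshold additionally produce a warning on a separate channel) can be sketched in plain Java. The class `SideOutputSketch` and its two lists are hypothetical stand-ins for the main stream and the tagged side output; they are not Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a main output plus one tagged side output. */
final class SideOutputSketch {
    final List<Integer> mainOutput = new ArrayList<>();
    final List<String> warnOutput = new ArrayList<>();

    /** Every record reaches the main output; records above the threshold also emit a warning. */
    void process(int vc, int threshold) {
        if (vc > threshold) {
            warnOutput.add("vc=" + vc + " exceeds threshold " + threshold);
        }
        mainOutput.add(vc);
    }
}
```

The point of the design is that the warning does not replace the record: downstream consumers of the main stream still see all data, while alarm handling reads only the side channel.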
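14. State management
14.1. State in Flink
Operators are either stateless or stateful.
A stateless operator task looks at each event in isolation and turns the current input directly into its output. Examples: map, filter, flatMap.
A stateful operator task needs data beyond the current record to compute its result. That "other data" is the state. Examples: aggregation operators, window operators.
State is classified along two axes:
- Managed state vs. raw state
  Managed state: managed by Flink; you only call the corresponding interfaces.
  Raw state: user-defined; in effect you reserve a block of memory and manage it yourself, including serialization and failure recovery of the state.
  Managed state is the usual choice (key point).
- Operator state vs. keyed state
  State used on a stream after keyBy() is keyed state; everything else is operator state.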
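The difference in scope between the two kinds can be pictured with a small plain-Java sketch. It assumes a single subtask and uses a hypothetical class `StateScopeSketch`; a plain field plays the role of operator state and a map plays the role of keyed state. Real Flink state is additionally checkpointed and restored, which this sketch does not attempt.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration of state scope inside one parallel subtask. */
final class StateScopeSketch {
    // "Operator state": one value shared by every record this subtask processes
    long recordsSeen = 0;
    // "Keyed state": one value per key; a record only touches the entry of its own key
    final Map<String, Integer> lastVcByKey = new HashMap<>();

    /** Stores the new vc for this key and returns the previous one (null on first sight). */
    Integer process(String id, int vc) {
        recordsSeen++;
        return lastVcByKey.put(id, vc);
    }
}
```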
14.2. Operator state
Within one parallel subtask, every record processed accesses the same state; the state is shared across the whole subtask.
Operator state can be used on any operator and behaves much like a local variable.
14.3. Keyed state
State is maintained and accessed per key of the input stream, so it is only available after keyBy().
14.3.1. Value state
The state holds a single value.
Example: raise an alarm when two consecutive water levels of the same sensor differ by more than 10.
package state;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Title: KeyedValueStateDemo
 * @Author lizhe
 * @Package state
 * @Date 2024/6/11 20:54
 * @description: alarm when consecutive water levels differ by more than 10
 */
public class KeyedValueStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner((element, recordTime) -> {
                            return element.getTs() * 1000L;
                        }));
        sensorDS.keyBy(value -> value.getId())
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                    ValueState<Integer> lastVcState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        // State must be obtained in open(), once the runtime context is available
                        ValueStateDescriptor<Integer> lastVc = new ValueStateDescriptor<>("lastVc", Integer.class);
                        lastVcState = getRuntimeContext().getState(lastVc);
                    }

                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        // The state is empty for the first record of a key, so fall back to 0
                        int lastVc = lastVcState.value() == null ? 0 : lastVcState.value();
                        if (Math.abs(value.getVc() - lastVc) > 10) {
                            out.collect("传感器id=" + value.getId() + ",当前水位值=" + value.getVc() + ",上一条水位值=" + lastVc + "相差超过10");
                        }
                        lastVcState.update(value.getVc());
                    }
                })
                .print();
        env.execute();
    }
}
14.3.2. List state
The data to be kept is stored as a list.
Example: for each sensor, output its three highest water levels.
package state;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.ArrayList;

/**
 * @Title:
 * @Author lizhe
 * @Package state
 * @Date 2024/6/11 20:54
 * @description: for each sensor, output its three highest water levels
 */
public class KeyedListStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner((element, recordTime) -> {
                            return element.getTs() * 1000L;
                        }));
        sensorDS.keyBy(value -> value.getId())
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                    ListState<Integer> vcListState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        vcListState = getRuntimeContext().getListState(new ListStateDescriptor<Integer>("vcListState", Integer.class));
                    }

                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        // Add the new value, then copy the state into a list that can be sorted
                        vcListState.add(value.getVc());
                        ArrayList<Integer> arrayList = new ArrayList<>();
                        for (Integer vc : vcListState.get()) {
                            arrayList.add(vc);
                        }
                        // Sort descending
                        arrayList.sort((o1, o2) -> {
                            return o2 - o1;
                        });
                        // The state holds at most 3 values before each add, so at most one element
                        // (index 3, the smallest) has to be dropped
                        if (arrayList.size() > 3) {
                            arrayList.remove(3);
                        }
                        out.collect("传感器id=" + value.getId() + ",最大三个水位值=" + arrayList.toString());
                        // Write the trimmed list back to the state
                        vcListState.update(arrayList);
                    }
                })
                .print();
        env.execute();
    }
}
14.3.3. Map state
Key-value pairs are stored as the state.
Example: for each sensor, count how often each water level occurs.
package state;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.Map;

/**
 * @Title:
 * @Author lizhe
 * @Package state
 * @Date 2024/6/11 20:54
 * @description: for each sensor, count how often each water level occurs
 */
public class KeyedMapStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner((element, recordTime) -> {
                            return element.getTs() * 1000L;
                        }));
        sensorDS.keyBy(value -> value.getId())
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                    MapState<Integer, Integer> vcCountMapState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        vcCountMapState = getRuntimeContext().getMapState(new MapStateDescriptor<Integer, Integer>("vcCountMapState", Integer.class, Integer.class));
                    }

                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        Integer vc = value.getVc();
                        if (vcCountMapState.contains(vc)) {
                            // This water level has been seen before: increment its count
                            int count = vcCountMapState.get(vc);
                            vcCountMapState.put(vc, ++count);
                        } else {
                            // First occurrence of this water level
                            vcCountMapState.put(vc, 1);
                        }
                        StringBuilder outStr = new StringBuilder();
                        outStr.append("传感器id为" + value.getId() + "\n");
                        for (Map.Entry<Integer, Integer> vcCount : vcCountMapState.entries()) {
                            outStr.append(vcCount.toString() + "\n");
                        }
                        outStr.append("==============");
                        out.collect(outStr.toString());
                    }
                })
                .print();
        env.execute();
    }
}
14.3.4. Reducing state
Each added value is reduced into the existing one, and the reduced result is what gets stored as the state.
Example: compute the sum of water levels per sensor.
package state;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Title:
 * @Author lizhe
 * @Package state
 * @Date 2024/6/11 20:54
 * @description: compute the sum of water levels per sensor
 */
public class KeyedReduceStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner((element, recordTime) -> {
                            return element.getTs() * 1000L;
                        }));
        sensorDS.keyBy(value -> value.getId())
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                    ReducingState<Integer> vcSum;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        // The ReduceFunction defines how a new value is combined with the stored one
                        vcSum = getRuntimeContext().getReducingState(new ReducingStateDescriptor<Integer>("vcSum", new ReduceFunction<Integer>() {
                            @Override
                            public Integer reduce(Integer value1, Integer value2) throws Exception {
                                return value1 + value2;
                            }
                        }, Integer.class));
                    }

                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        vcSum.add(value.getVc());
                        out.collect("传感器id=" + value.getId() + "水位值和=" + vcSum.get());
                    }
                })
                .print();
        env.execute();
    }
}
14.3.5. Aggregating state
Similar to reducing state, except that the state is represented by an accumulator, so the type of the stored state may differ from the type of the input data.
Example: compute the average water level.
package state;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Title:
 * @Author lizhe
 * @Package state
 * @Date 2024/6/11 20:54
 * @description: compute the average water level
 */
public class KeyedAggregateStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner((element, recordTime) -> {
                            return element.getTs() * 1000L;
                        }));
        sensorDS.keyBy(value -> value.getId())
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                    // Input type Integer (vc), output type Double (average)
                    AggregatingState<Integer, Double> vcAggregatingState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        vcAggregatingState = getRuntimeContext().getAggregatingState(
                                new AggregatingStateDescriptor<Integer, Tuple2<Integer, Integer>, Double>(
                                        "vcAggregatingState",
                                        new AggregateFunction<Integer, Tuple2<Integer, Integer>, Double>() {
                                            // Accumulator: f0 = sum of water levels, f1 = number of records
                                            @Override
                                            public Tuple2<Integer, Integer> createAccumulator() {
                                                return Tuple2.of(0, 0);
                                            }

                                            @Override
                                            public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {
                                                return Tuple2.of(accumulator.f0 + value, accumulator.f1 + 1);
                                            }

                                            @Override
                                            public Double getResult(Tuple2<Integer, Integer> accumulator) {
                                                return accumulator.f0 * 1D / accumulator.f1;
                                            }

                                            @Override
                                            public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
                                                return null;
                                            }
                                        },
                                        Types.TUPLE(Types.INT, Types.INT)));
                    }

                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        vcAggregatingState.add(value.getVc());
                        Double vcAvg = vcAggregatingState.get();
                        out.collect("传感器id=" + value.getId() + "平均水位=" + vcAvg);
                    }
                })
                .print();
        env.execute();
    }
}
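The accumulator idea behind the demo above (the state is a sum and a count, the result is a double) can be sketched without Flink. The class `AvgAccumulatorSketch` and the `long[]{sum, count}` layout are assumptions for illustration. Unlike the demo, whose `merge` returns null, this sketch combines two accumulators; the demo's null is presumably harmless because `merge` is not expected to be called in that usage, but a real merge is the safer habit. Returning 0.0 for an empty accumulator is also a choice made here, not something the demo does.

```java
/** Hypothetical sum/count accumulator mirroring the four AggregateFunction steps. */
final class AvgAccumulatorSketch {
    /** A fresh accumulator: {sum, count}. */
    static long[] create() {
        return new long[]{0L, 0L};
    }

    /** Folds one input value into the accumulator and returns the new accumulator. */
    static long[] add(int value, long[] acc) {
        return new long[]{acc[0] + value, acc[1] + 1};
    }

    /** Combines two partial accumulators instead of returning null. */
    static long[] merge(long[] a, long[] b) {
        return new long[]{a[0] + b[0], a[1] + b[1]};
    }

    /** Average of everything added so far; 0.0 for an empty accumulator (assumed convention). */
    static double result(long[] acc) {
        return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
    }
}
```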
14.3.6. State time-to-live (TTL)
When a state entry is created, its expiry time = current time + TTL. The expiry time can be refreshed by later accesses, depending on the configured update type. To enable TTL, build a configuration object (StateTtlConfig) and pass it to the state descriptor's enableTimeToLive().
package state;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Title: KeyedValueStateDemo
 * @Author lizhe
 * @Package state
 * @Date 2024/6/11 20:54
 * @description:
 */
public class KeyedStateTTLDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner((element, recordTime) -> {
                            return element.getTs() * 1000L;
                        }));
        sensorDS.keyBy(value -> value.getId())
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                    ValueState<Integer> lastVcState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        StateTtlConfig stateTtlConfig = StateTtlConfig
                                .newBuilder(Time.seconds(5)) // expire after 5 s
                                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // creating or writing the state refreshes the expiry time
                                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // never return an expired value
                                .build();
                        ValueStateDescriptor<Integer> lastVc = new ValueStateDescriptor<>("lastVc", Integer.class);
                        lastVc.enableTimeToLive(stateTtlConfig);
                        lastVcState = getRuntimeContext().getState(lastVc);
                    }

                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        // Read first: prints null if the previous value has expired
                        Integer value1 = lastVcState.value();
                        out.collect("key=" + value.getId() + "状态值" + value1);
                        lastVcState.update(value.getVc());
                    }
                })
                .print();
        env.execute();
    }
}
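The two settings used in the demo above can be mimicked in a few lines of plain Java: a write resets the expiry time to now + TTL, and a read never hands back an expired value. The class `TtlValueSketch` is hypothetical, the clock is passed in explicitly to keep the sketch deterministic, and treating a value as expired exactly at the expiry instant is an assumption of this sketch rather than a statement about Flink's internals.

```java
/** Hypothetical single-value holder that mimics write-refreshed TTL with hidden expired values. */
final class TtlValueSketch {
    private final long ttlMillis;
    private Integer value;   // stored value, null when empty or expired
    private long expiresAt;  // absolute expiry timestamp in milliseconds

    TtlValueSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** A write (including the first one) sets expiry = now + TTL. */
    void update(int newValue, long nowMillis) {
        value = newValue;
        expiresAt = nowMillis + ttlMillis;
    }

    /** A read returns null once the value has expired and does not extend the expiry. */
    Integer value(long nowMillis) {
        if (value != null && nowMillis >= expiresAt) {
            value = null; // drop the expired entry
        }
        return value;
    }
}
```

With a 5 s TTL, a value written at t = 0 is still readable at t = 4 s but gone at t = 6 s, which matches what the demo prints when input lines are typed more than five seconds apart.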
14.4. Operator state
Operator state comes in three forms: list state (ListState), union list state (obtained via getUnionListState), and broadcast state (BroadcastState).
It is state defined on a parallel instance of an operator, and its scope is limited to the current operator subtask.
package state;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @Title: OperatorListDemo
 * @Author lizhe
 * @Package state
 * @Date 2024/6/13 21:50
 * @description: count the records seen in a map operator
 */
public class OperatorListDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.socketTextStream("192.168.132.101", 7777)
                .map(new MyCountMapFunction())
                .print();
        env.execute();
    }

    public static class MyCountMapFunction implements MapFunction<String, Long>, CheckpointedFunction {
        private long count = 0L;
        private ListState<Long> state;

        @Override
        public Long map(String value) throws Exception {
            return ++count;
        }

        /**
         * Persist the local variable: copy it into the operator state.
         * Called when a checkpoint is taken.
         */
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            System.out.println("snapshotState");
            state.clear();
            state.add(count);
        }

        /**
         * Initialize the local variable: on recovery, add the values from the state back into it.
         * Called once per subtask.
         */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            System.out.println("initializeState");
            state = context.getOperatorStateStore().getListState(new ListStateDescriptor<Long>("state", Long.class));
            if (context.isRestored()) {
                for (Long aLong : state.get()) {
                    count += aLong;
                }
            }
        }
    }
}
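Difference between list state and union list state when the operator is restored with a different parallelism:
- List state: the elements of all old subtasks are redistributed evenly, round-robin, across the new subtasks.
- Union list state: the state of all old subtasks is merged into one complete list, and every new parallel subtask receives a full copy.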
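The two redistribution modes can be contrasted with a plain-Java sketch. `RescaleSketch` and its method names are hypothetical; each inner list stands for the state elements held by one old subtask, and the exact assignment order Flink uses internally is not modelled here.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of how operator list state is handed to new subtasks after rescaling. */
final class RescaleSketch {
    /** List state: pool all elements and deal them out round-robin to the new subtasks. */
    static List<List<Long>> splitEvenly(List<List<Long>> oldSubtasks, int newParallelism) {
        List<List<Long>> result = emptyLists(newParallelism);
        int next = 0;
        for (List<Long> subtask : oldSubtasks) {
            for (Long element : subtask) {
                result.get(next++ % newParallelism).add(element);
            }
        }
        return result;
    }

    /** Union list state: every new subtask receives the complete pooled list. */
    static List<List<Long>> union(List<List<Long>> oldSubtasks, int newParallelism) {
        List<Long> all = new ArrayList<>();
        for (List<Long> subtask : oldSubtasks) {
            all.addAll(subtask);
        }
        List<List<Long>> result = emptyLists(newParallelism);
        for (List<Long> target : result) {
            target.addAll(all);
        }
        return result;
    }

    private static List<List<Long>> emptyLists(int n) {
        List<List<Long>> lists = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            lists.add(new ArrayList<Long>());
        }
        return lists;
    }
}
```

For a counter like the one in `OperatorListDemo`, the even split keeps the restored total correct when each subtask sums what it receives, whereas with the union mode every subtask would see the full list and must pick out what belongs to it.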
Broadcast state: all parallel subtasks of the operator hold the same global state.
Example: send an alarm when the water level exceeds a given threshold; the threshold can be changed at runtime.
package state;

import bean.WaterSensor;
import org.apache.flink.api.common.state.*;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

/**
 * @Title: OperatorListDemo
 * @Author lizhe
 * @Package state
 * @Date 2024/6/13 21:50
 * @description: alarm when the water level exceeds a threshold that can be changed at runtime
 */
public class OperatorBroadcastStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                });
        // Config stream, used to broadcast the configuration
        DataStreamSource<String> configDS = env.socketTextStream("192.168.132.101", 8888);
        final MapStateDescriptor<String, Integer> descriptor = new MapStateDescriptor<>("configDS", String.class, Integer.class);
        BroadcastStream<String> broadcastStream = configDS.broadcast(descriptor);
        BroadcastConnectedStream<WaterSensor, String> connect = sensorDS.connect(broadcastStream);
        connect.process(new BroadcastProcessFunction<WaterSensor, String, String>() {
            /**
             * Handles records of the data stream. The broadcast state is read-only here.
             */
            @Override
            public void processElement(WaterSensor value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                ReadOnlyBroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(descriptor);
                Integer integer = broadcastState.get("threshold");
                // If data arrives before any config, the broadcast state is empty, so guard against null
                integer = integer == null ? 0 : integer;
                if (value.getVc() > integer) {
                    out.collect("超过阈值,阈值=" + integer);
                }
            }

            /**
             * Handles records of the broadcast config stream and updates the threshold.
             */
            @Override
            public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
                BroadcastState<String, Integer> state = ctx.getBroadcastState(descriptor);
                state.put("threshold", Integer.valueOf(value));
            }
        }).print();
        env.execute();
    }
}
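The interplay of the two callbacks in the demo above comes down to one shared map that the config side writes and the data side only reads. A plain-Java sketch under that assumption follows; `BroadcastThresholdSketch` is hypothetical, and the `trim()` on the incoming config text is an addition of this sketch that the demo does not perform.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical single-process model of a broadcast threshold shared with the data path. */
final class BroadcastThresholdSketch {
    private final Map<String, Integer> broadcastState = new HashMap<>();

    /** Config side: each new value overwrites the shared threshold. */
    void onConfig(String raw) {
        broadcastState.put("threshold", Integer.valueOf(raw.trim()));
    }

    /** Data side: read-only lookup that falls back to 0 until a config value has arrived. */
    boolean exceeds(int vc) {
        Integer threshold = broadcastState.get("threshold");
        return vc > (threshold == null ? 0 : threshold);
    }
}
```

The default of 0 means that, before any config arrives, every positive water level triggers an alarm, which is the same behavior as the null check in `processElement`.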
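14.5. State backends
How state is stored, accessed, and maintained is decided by a pluggable component called the state backend. Its main job is to manage how and where local state is stored.
14.5.1. Types of state backend
State backends work out of the box and can be configured independently, without changing the program logic. There are two: the hash map state backend (HashMapStateBackend, the default) and the embedded RocksDB state backend (EmbeddedRocksDBStateBackend).
- Hash map state backend: state is kept in memory. State is treated as ordinary objects stored as key-value pairs on the JVM heap of the TaskManager.
- RocksDB state backend: RocksDB is an embedded key-value store, and state is written to local disk.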