当前位置: 首页 > article >正文

Flink深入浅出之04:时间、水印、TableSQL

深入理解Flink的waterMark的机制、Flink Table和SQL开发

3️⃣ 目标

  1. 掌握WaterMark的的原理
  2. 掌握WaterMark的运用
  3. 掌握Flink Table和SQL开发

4️⃣ 要点

📖 1. Flink中的Time概念

  • 对于流式数据处理,最大的特点是数据上具有时间的属性特征

  • Flink根据时间产生的位置不同,可以将时间区分为三种时间概念

    • Event Time(事件生成时间)
      • 事件产生的时间,它通常由事件中的时间戳描述
    • Ingestion time(事件接入时间)
      • 事件进入Flink程序的时间
    • Processing Time(事件处理时间)
      • 事件被处理时当前系统的时间

在这里插入图片描述

  • Flink在流处理程序中支持不同的时间概念。
1.1 EventTime
  • 1、事件生成时的时间,在进入Flink之前就已经存在,可以从event的字段中抽取
  • 2、必须指定watermarks(水位线)的生成方式
  • 3、优势:确定性,乱序、延时、或者数据重放等情况,都能给出正确的结果
  • 4、弱点:处理无序事件时性能和延迟受到影响
1.2 IngestTime
  • 1、事件进入flink的时间,即在source里获取的当前系统的时间,后续操作统一使用该时间。
  • 2、不需要指定watermarks的生成方式(自动生成)
  • 3、弱点:不能处理无序事件和延迟数据
1.3 ProcessingTime
  • 1、执行操作的机器的当前系统时间(每个算子都不一样)

  • 2、不需要流和机器之间的协调

  • 3、优势:最佳的性能和最低的延迟

  • 4、弱点:不确定性 ,容易受到各种因素影像(event产生的速度、到达flink的速度、在算子之间传输速度等),压根就不管顺序和延迟

1.4 三种时间的综合比较
  • 性能

    • ProcessingTime > IngestTime > EventTime
  • 延迟

    • ProcessingTime < IngestTime < EventTime
  • 确定性

    • EventTime > IngestTime > ProcessingTime
1.5 设置 Time 类型
  • 可以你的流处理程序是以哪一种时间为标志的。

    • 在我们创建StreamExecutionEnvironment的时候可以设置Time类型,不设置Time类型,默认是ProcessingTime。
    • 如果设置Time类型为EventTime或者IngestTime,需要在创建StreamExecutionEnvironment中调用setStreamTimeCharacteristic() 方法指定。
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//不设置Time 类型,默认是processingTime。
    environment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);//指定流处理程序以IngestionTime为准
    //environment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);//指定流处理程序以EventTime为准
    //environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
1.6 ProcessWindowFunction实现时间确定
  • 需求

    • 通过process实现处理时间的确定,包括数据时间、window时间等
  • 代码开发

    package com.kaikeba.timeimport org.apache.commons.lang3.time.FastDateFormat
    import org.apache.flink.api.java.tuple.Tuple
    import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector/*** 通过process实现处理时间的确定,包括数据时间,window时间等*/
    object TimeWindowWordCount {def main(args: Array[String]): Unit = {val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._val socketSource: DataStream[String] = environment.socketTextStream("node01",9999)//对数据进行处理socketSource.flatMap(x => x.split(" ")).map(x =>(x,1)).keyBy(0).timeWindow(Time.seconds(2),Time.seconds(1)).process(new SumProcessFunction).print()environment.execute()}}class SumProcessFunction extends ProcessWindowFunction[(String,Int),(String,Int),Tuple,TimeWindow]{val format: FastDateFormat = FastDateFormat.getInstance("HH:mm:ss")override def process(key: Tuple, context: Context, elements: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {println("当前系统时间为:"+format.format(System.currentTimeMillis()))println("window的处理时间为:"+format.format(context.currentProcessingTime))println("window的开始时间为:"+format.format(context.window.getStart))println("window的结束时间为:"+format.format(context.window.getEnd))var sum:Int = 0for(eachElement <- elements){sum += eachElement._2}out.collect((key.getField(0),sum))}}

📖 2. Watermark机制

2.1 Watermark的概念
   通常情况下由于网络或者系统等外部因素影响下,事件数据往往不能及时传输至FLink系统中,导致系统的不稳定而造成数据乱序到达或者延迟达到等问题,因此需要有一种机制能够控制数据处理的进度。具体来讲,在创建一个基于时间的window后,需要确定属于该window的数据元素是否已经全部到达,确定后才可以对window中的所有数据做计算处理(如汇总、分组),如果数据并没有全部到达,则继续等待该窗口的数据全部到达后再开始计算。但是对于但是对于late element,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。在这种情况下就需要用到水位线 (Watermark) 机制。
2.2 Watermark的作用
    它能够衡量数据处理进度,保证事件数据全部到达Flink系统,即使数据乱序或者延迟到达,也能够像预期一样计算出正确和连续的结果。通常watermark是结合window来实现。
2.3 Watermark的原理
   在 Flink 的窗口处理过程中,如果确定全部数据到达,就可以对 Window 的所有数据做窗口计算操作(如汇总、分组等),如果数据没有全部到达,则继续等待该窗口中的数据全部到达才开始处理。这种情况下就需要用到水位线(WaterMarks)机制,它能够衡量数据处理进度(表达数据到达的完整性),保证事件数据(全部)到达Flink系统,或者在乱序及延迟到达时,也能够像预期一样计算出正确并且连续的结果。当任何 Event 进入到 Flink 系统时,会根据当前最大事件时间产生 Watermarks 时间戳。 
  • 那么 Flink 是怎么计算 Watermark 的值呢?
    ⭐️

    • Watermark = 进入 Flink 的最大的事件产生时间(maxEventTime)— 指定的乱序时间(t)
  • 那么有 Watermark 的 Window 是怎么触发窗口函数的呢?

(1) watermark >= window的结束时间
(2) 该窗口必须有数据 注意:[window_start_time,window_end_time) 中有数据存在,前闭后开区间

  • 注意:Watermark 本质可以理解成一个延迟触发机制。
2.4 Watermark 的使用存在三种情况
  • (1)有序的数据流中的watermark

    	如果数据元素的事件时间是有序的,Watermark 时间戳会随着数据元素的事件时间按顺序生成,此时水位线的变化和事件时间保持一直(因为既然是有序的时间,就不需要设置延迟了,那么 t 就是 0。所以 watermark=maxtime-0 = maxtime),也就是理想状态下的水位线。当 Watermark 时间大于 Windows 结束时间就会触发对 Windows 的数据计算,以此类推, 下一个 Window 也是一样。
    

    在这里插入图片描述

  • (2)乱序的数据流watermark

    	现实情况下数据元素往往并不是按照其产生顺序接入到 Flink 系统中进行处理,而频繁出现乱序或迟到的情况,这种情况就需要使用 Watermarks 来应对。比如下图,设置延迟时间t为2。
    

    在这里插入图片描述

  • (3)并行数据流中的 Watermark

    	在多并行度的情况下,Watermark 会有一个对齐机制,这个对齐机制会取所有 Channel 中最小的 Watermark。
    

    在这里插入图片描述

2.5 引入watermark和eventtime
2.5.1 有序数据流中引入 Watermark 和 EventTime
  • 它会将数据中的timestamp根据指定的字段提取得到Eventtime,然后使用Eventtime作为最新的watermark, 这种适合于事件按顺序生成,没有乱序事件的情况。

  • 对于有序的数据,代码比较简洁,主要需要从源 Event 中抽取 EventTime。

  • 需求

    • 对socket中有序(按照时间递增)的数据流,进行每5s处理一次
  • 代码演示

package com.kaikeba.watermarkimport org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.common.functions.{MapFunction}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collectorobject OrderedStreamWaterMark {def main(args: Array[String]): Unit = {//todo:1.构建流式处理环境val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._environment.setParallelism(1)//todo:2.设置时间类型environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//todo:3.获取数据源val sourceStream: DataStream[String] = environment.socketTextStream("node01",9999)//todo:4. 数据处理val mapStream: DataStream[(String, Long)] = sourceStream.map(x=>(x.split(",")(0),x.split(",")(1).toLong))//todo: 5.从源Event中抽取eventTimeval watermarkStream: DataStream[(String, Long)] = mapStream.assignAscendingTimestamps(x=>x._2)//todo:6. 数据计算watermarkStream.keyBy(0).timeWindow(Time.seconds(5)).process(new ProcessWindowFunction[(String, Long),(String,Long),Tuple,TimeWindow] {override def process(key: Tuple, context: Context, elements: Iterable[(String, Long)], out: Collector[(String, Long)]): Unit = {val value: String = key.getField[String](0)//窗口的开始时间val startTime: Long = context.window.getStart//窗口的结束时间val startEnd: Long = context.window.getEnd//获取当前的 watermarkval watermark: Long = context.currentWatermarkvar sum:Long = 0val toList: List[(String, Long)] = elements.toListfor(eachElement <-  toList){sum +=1}println("窗口的数据条数:"+sum+" |窗口的第一条数据:"+toList.head+" |窗口的最后一条数据:"+toList.last+" |窗口的开始时间: "+ startTime+" |窗口的结束时间: "+ startEnd+" |当前的watermark:"+ watermark)out.collect((value,sum))}}).print()environment.execute()}}
  • 发送数据
000001,1461756862000
000001,1461756866000
000001,1461756872000
000001,1461756873000
000001,1461756874000
000001,1461756875000
2.5.2 乱序数据流中引入 Watermark 和 EventTime

对于乱序数据流,有两种常见的引入方法:周期性和间断性

  • 1、With Periodic(周期性的) Watermark

    	周期性地生成 Watermark 的生成,默认是 100ms。每隔 N 毫秒自动向流里注入一个 Watermark,时间间隔由 streamEnv.getConfig.setAutoWatermarkInterval()决定。
    
    • 需求
      • 对socket中无序数据流,进行每5s处理一次,数据中会有延迟
    • 代码演示
    package com.kaikeba.watermarkimport org.apache.flink.api.java.tuple.Tuple
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
    import org.apache.flink.streaming.api.watermark.Watermark
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector//对无序的数据流周期性的添加水印object OutOfOrderStreamPeriodicWaterMark {def main(args: Array[String]): Unit = {//todo:1.构建流式处理环境val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._environment.setParallelism(1)//todo:2.设置时间类型environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//todo:3.获取数据源val sourceStream: DataStream[String] = environment.socketTextStream("node01",9999)//todo:4. 数据处理val mapStream: DataStream[(String, Long)] = sourceStream.map(x=>(x.split(",")(0),x.split(",")(1).toLong))//todo:5. 添加水位线mapStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String, Long)] {//定义延迟时间长度//表示在5秒以内的数据延时有效,超过5秒的数据被认定为迟到事件val maxOutOfOrderness=5000L//历史最大事件时间var currentMaxTimestamp:Long=_var watermark:Watermark=_//周期性的生成水位线watermarkoverride def getCurrentWatermark: Watermark ={watermark =  new Watermark(currentMaxTimestamp -maxOutOfOrderness)watermark}//抽取事件时间override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long): Long ={//获取事件时间val currentElementEventTime: Long = element._2//对比当前事件时间和历史最大事件时间, 将较大值重新赋值给currentMaxTimestampcurrentMaxTimestamp=Math.max(currentMaxTimestamp,currentElementEventTime)println("接受到的事件:"+element+" |事件时间: "+currentElementEventTime)currentElementEventTime}}).keyBy(0).timeWindow(Time.seconds(5)).process(new ProcessWindowFunction[(String, Long),(String,Long),Tuple,TimeWindow] {override def process(key: Tuple, context: Context, elements: Iterable[(String, Long)], out: Collector[(String, Long)]): Unit = {val value: String = key.getField[String](0)//窗口的开始时间val startTime: Long = context.window.getStart//窗口的结束时间val startEnd: Long = context.window.getEnd//获取当前的 watermarkval watermark: Long = context.currentWatermarkvar sum:Long = 0val toList: List[(String, Long)] = elements.toListfor(eachElement <-  toList){sum +=1}//窗口的开始时间val startTime: Long = context.window.getStart//窗口的结束时间val startEnd: Long = context.window.getEnd//获取当前的 watermarkval watermark: Long = context.currentWatermarkvar sum:Long = 0val toList: List[(String, Long)] = elements.toListfor(eachElement <-  toList){sum +=1}  println("窗口的数据条数:"+sum+" |窗口的第一条数据:"+toList.head+" |窗口的最后一条数据:"+toList.last+" |窗口的开始时间: "+  startTime +" |窗口的结束时间: "+ startEnd+" |当前的watermark:"+watermark)out.collect((value,sum))}}).print()       environment.execute() }  }     
    
    • 发送数据
    000001,1461756862000
    000001,1461756872000
    000001,1461756866000
    000001,1461756873000
    000001,1461756874000
    000001,1461756875000
    000001,1461756879000
    000001,1461756880000
    
  • 2、With Punctuated(间断性的) Watermark

      间断性的生成 Watermark 一般是基于某些事件触发 Watermark 的生成和发送。比如说只给用户id为000001的添加watermark,其他的用户就不添加
    
    • 需求

      • 对socket中无序数据流,进行每5s处理一次,数据中会有延迟
    • 代码演示

      package com.kaikeba.watermarkimport org.apache.commons.lang3.time.FastDateFormatimport org.apache.flink.api.common.functions.MapFunctionimport org.apache.flink.api.java.tuple.Tupleimport org.apache.flink.streaming.api.TimeCharacteristicimport org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks}import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}import org.apache.flink.streaming.api.scala.function.ProcessWindowFunctionimport org.apache.flink.streaming.api.watermark.Watermarkimport org.apache.flink.streaming.api.windowing.time.Timeimport org.apache.flink.streaming.api.windowing.windows.TimeWindowimport org.apache.flink.util.Collector//对无序的数据流间断性的添加水印object OutOfOrderStreamPunctuatedWaterMark {def main(args: Array[String]): Unit = {//todo:1.构建流式处理环境val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._environment.setParallelism(1)//todo:2.设置时间类型environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//todo:3.获取数据源val sourceStream: DataStream[String] = environment.socketTextStream("node01",9999)//todo:4. 数据处理val mapStream: DataStream[(String, Long)] = sourceStream.map(x=>(x.split(",")(0),x.split(",")(1).toLong))//todo:5. 添加水位线mapStream.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks[(String, Long)] {//定义延迟时间长度//表示在5秒以内的数据延时有效,超过5秒的数据被认定为迟到事件val maxOutOfOrderness=5000L//历史最大事件时间var currentMaxTimestamp:Long=_override def checkAndGetNextWatermark(lastElement: (String, Long), extractedTimestamp: Long): Watermark ={//当用户id为000001生成watermarkif(lastElement._1.equals("000001")){val watermark=  new Watermark(currentMaxTimestamp-maxOutOfOrderness)watermark}else{//其他情况下不返回水位线null}}override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long): Long = {//获取事件时间val currentElementEventTime: Long = element._2//对比当前事件时间和历史最大事件时间, 将较大值重新赋值给currentMaxTimestampcurrentMaxTimestamp=Math.max(currentMaxTimestamp,currentElementEventTime)println("接受到的事件:"+element+" |事件时间: "+currentElementEventTime )currentElementEventTime}}).keyBy(0).timeWindow(Time.seconds(5)).process(new ProcessWindowFunction[(String, Long),(String,Long),Tuple,TimeWindow] {override def process(key: Tuple, context: Context, elements: Iterable[(String, Long)], out: Collector[(String, Long)]): Unit = {val value: String = key.getField[String](0)//窗口的开始时间val startTime: Long = context.window.getStart//窗口的结束时间val startEnd: Long = context.window.getEnd//获取当前的 watermarkval watermark: Long = context.currentWatermarkvar sum:Long = 0val toList: List[(String, Long)] = elements.toListfor(eachElement <-  toList){sum +=1}println("窗口的数据条数:"+sum+" |窗口的第一条数据:"+toList.head+" |窗口的最后一条数据:"+toList.last+" |窗口的开始时间: "+startTime +" |窗口的结束时间: "+startEnd+" |当前的watermark:"+watermark)out.collect((value,sum))}}).print()environment.execute()}}        
    
    • 发送数据
    000001,1461756862000
    000001,1461756866000
    000001,1461756872000
    000002,1461756867000
    000002,1461756868000
    000002,1461756875000
    000001,1461756875000
    
2.5.3 Window 的allowedLateness处理延迟太大的数据
	基于 Event-Time 的窗口处理流式数据,虽然提供了 Watermark 机制,却只能在一定程度上解决了数据乱序的问题。但在某些情况下数据可能延时会非常严重,即使通过 Watermark 机制也无法等到数据全部进入窗口再进行处理。Flink 中默认会将这些迟到的数据做丢弃处理,但是有些时候用户希望即使数据延迟到达的情况下,也能够正常按照流程处理并输出结果,此时就需要使用 Allowed Lateness 机制来对迟到的数据进行额外的处理。
  • 迟到数据的处理机制

    • 1、直接丢弃

    • 2、指定允许再次迟到的时间

      //例如
      assignTimestampsAndWatermarks(new EventTimeExtractor() ).keyBy(0).timeWindow(Time.seconds(3)).allowedLateness(Time.seconds(2)) // 允许事件再迟到2秒.process(new SumProcessWindowFunction()).print().setParallelism(1);//注意:
      //(1). 当我们设置允许迟到2秒的事件,第一次 window 触发的条件是 watermark >= window_end_time
      //(2). 第二次(或者多次)触发的条件是watermark < window_end_time + allowedLateness
      
    • 3、收集迟到太多的数据

      //例如
      assignTimestampsAndWatermarks(new EventTimeExtractor() ).keyBy(0).timeWindow(Time.seconds(3)).allowedLateness(Time.seconds(2)) //允许事件迟到2秒.sideOutputLateData(outputTag)    //收集迟到太多的数据.process(new SumProcessWindowFunction()).print().setParallelism(1);
      
  • 代码演示

    package com.kaikeba.watermarkimport org.apache.commons.lang3.time.FastDateFormat
    import org.apache.flink.api.common.functions.MapFunction
    import org.apache.flink.api.java.tuple.Tuple
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
    import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment}
    import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
    import org.apache.flink.streaming.api.watermark.Watermark
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector//运行数据再次延延迟一段时间,并且对延迟太多的数据进行收集
    object AllowedLatenessTest {def main(args: Array[String]): Unit = {//todo:1.构建流式处理环境val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._environment.setParallelism(1)//todo:2.设置时间类型environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//todo:3.获取数据源val sourceStream: DataStream[String] = environment.socketTextStream("node01",9999)//todo:4. 数据处理val mapStream: DataStream[(String, Long)] = sourceStream.map(x=>(x.split(",")(0),x.split(",")(1).toLong))//定义一个侧输出流的标签,用于收集迟到太多的数据val lateTag=new OutputTag[(String, Long)]("late")//todo:5.  数据计算--添加水位线val result: DataStream[(String, Long)] = mapStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String, Long)] {//定义延迟时间长度//表示在5秒以内的数据延时有效,超过5秒的数据被认定为迟到事件val maxOutOfOrderness = 5000L//历史最大事件时间var currentMaxTimestamp: Long = _ //周期性的生成水位线watermarkoverride def getCurrentWatermark: Watermark = {val watermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness)watermark}//抽取事件时间override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long): Long = {//获取事件时间val currentElementEventTime: Long = element._2//对比当前事件时间和历史最大事件时间, 将较大值重新赋值给currentMaxTimestampcurrentMaxTimestamp = Math.max(currentMaxTimestamp, currentElementEventTime)println("接受到的事件:" + element + " |事件时间: " + currentElementEventTime )currentElementEventTime}}).keyBy(0).timeWindow(Time.seconds(5)).allowedLateness(Time.seconds(2)) //允许数据延迟2s.sideOutputLateData(lateTag)     //收集延迟大多的数据.process(new ProcessWindowFunction[(String, Long), (String, Long), Tuple, TimeWindow] {override def process(key: Tuple, context: Context, elements: Iterable[(String, Long)], out: Collector[(String, Long)]): Unit = {val value: String = key.getField[String](0)//窗口的开始时间val startTime: Long = context.window.getStart//窗口的结束时间val startEnd: Long = context.window.getEnd//获取当前的 watermarkval watermark: Long = context.currentWatermarkvar sum: Long = 0val toList: List[(String, Long)] = elements.toListfor (eachElement <- toList) {sum += 1}println("窗口的数据条数:" + sum +" |窗口的第一条数据:" + toList.head +" |窗口的最后一条数据:" + toList.last +" |窗口的开始时间: " + startTime +" |窗口的结束时间: " + startEnd +" |当前的watermark:" + watermark)out.collect((value, sum))}})//打印延迟太多的数据result.getSideOutput(lateTag).print("late")//打印result.print("ok")environment.execute()}
    }                  
    
  • 发送数据

    000001,1461756862000
    000001,1461756866000
    000001,1461756868000
    000001,1461756869000
    000001,1461756870000
    000001,1461756862000
    000001,1461756871000
    000001,1461756872000
    000001,1461756862000
    000001,1461756863000
    
2.5.4 多并行度下的WaterMark

在这里插入图片描述

本地测试的过程中,如果不设置并行度的话,
默认读取本机CPU数量设置并行度,
可以手动设置并行度environment.setParallelism(1),每一个线程都会有一个watermark.
多并行度的情况下,一个window可能会接受到多个不同线程waterMark,
  • watermark对齐会取所有channel最小的watermark,以最小的watermark为准。

  • 案例演示

    
    package com.kaikeba.watermarkimport org.apache.flink.api.java.tuple.Tuple
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
    import org.apache.flink.streaming.api.watermark.Watermark
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector/*** 得到并打印每隔 5 秒钟统计前 5秒内的相同的 key 的所有的事件* 测试多并行度下的watermark*/
    object WaterMarkWindowWithMultipart {def main(args: Array[String]): Unit = {//todo:1.构建流式处理环境val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._//设置并行度为2environment.setParallelism(2)//todo:2.设置时间类型environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//todo:3.获取数据源val sourceStream: DataStream[String] = environment.socketTextStream("node01",9999)//todo:4. 数据处理val mapStream: DataStream[(String, Long)] = sourceStream.map(x=>(x.split(",")(0),x.split(",")(1).toLong))//todo:5. 添加水位线mapStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String, Long)] {//定义延迟时间长度//表示在5秒以内的数据延时有效,超过5秒的数据被认定为迟到事件val maxOutOfOrderness=5000L//历史最大事件时间var currentMaxTimestamp:Long=_//周期性的生成水位线watermarkoverride def getCurrentWatermark: Watermark ={val  watermark =  new Watermark(currentMaxTimestamp -maxOutOfOrderness)watermark}//抽取事件时间override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long): Long ={//获取事件时间val currentElementEventTime: Long = element._2//对比当前事件时间和历史最大事件时间, 将较大值重新赋值给currentMaxTimestampcurrentMaxTimestamp=Math.max(currentMaxTimestamp,currentElementEventTime)val id: Long = Thread.currentThread.getIdprintln("当前的线程id:"+id+" |接受到的事件:"+element+" |事件时间: "+currentElementEventTime+" |当前值的watermark:"+getCurrentWatermark().getTimestamp())currentElementEventTime}}).keyBy(0).timeWindow(Time.seconds(5)).process(new ProcessWindowFunction[(String, Long),(String,Long),Tuple,TimeWindow] {override def process(key: Tuple, context: Context, elements: Iterable[(String, Long)], out: Collector[(String, Long)]): Unit = {val value: String = key.getField[String](0)//窗口的开始时间val startTime: Long = context.window.getStart//窗口的结束时间val startEnd: Long = context.window.getEnd//获取当前的 watermarkval watermark: Long = context.currentWatermarkvar sum:Long = 0val toList: List[(String, Long)] = elements.toListfor(eachElement <-  toList){sum +=1} println("窗口的数据条数:"+sum+" |窗口的第一条数据:"+toList.head+" |窗口的最后一条数据:"+toList.last+" |窗口的开始时间: "+  startTime +" |窗口的结束时间: "+ startEnd+" |当前的watermark:"+ watermark)out.collect((value,sum))}}).print()environment.execute()}    }
    
  • 输入数据

    000001,1461756862000
    000001,1461756864000
    000001,1461756866000
    000001,1461756870000
    000001,1461756871000
    
  • 输出结果

    当前的线程id:65 |接受到的事件:(000001,1461756862000) |事件时间: 1461756862000 |当前值的watermark:1461756857000
    当前的线程id:64 |接受到的事件:(000001,1461756864000) |事件时间: 1461756864000 |当前值的watermark:1461756859000
    当前的线程id:65 |接受到的事件:(000001,1461756866000) |事件时间: 1461756866000 |当前值的watermark:1461756861000
    当前的线程id:64 |接受到的事件:(000001,1461756870000) |事件时间: 1461756870000 |当前值的watermark:1461756865000
    当前的线程id:65 |接受到的事件:(000001,1461756871000) |事件时间: 1461756871000 |当前值的watermark:1461756866000
    窗口的数据条数:2 |窗口的第一条数据:(000001,1461756862000) |窗口的最后一条数据:(000001,1461756864000) |窗口的开始时间: 1461756860000 |窗口的结束时间: 1461756865000 |当前的watermark:1461756865000
    2> (000001,2)
    
  • 结果分析

在这里插入图片描述

📖 3. Flink的Table和SQL

3.1 Table与SQL基本介绍
	在Spark中有DataFrame这样的关系型编程接口,因其强大且灵活的表达能力,能够让用户通过非常丰富的接口对数据进行处理,有效降低了用户的使用成本。Flink也提供了关系型编程接口 Table API 以及基于Table API 的 SQL API,让用户能够通过使用结构化编程接口高效地构建Flink应用。同时Table API 以及 SQL 能够统一处理批量和实时计算业务, 无须切换修改任何应用代码就能够基于同一套 API 编写流式应用和批量应用,从而达到真正意义的批流统一。
  • Apache Flink 具有两个关系型API:Table API 和SQL,用于统一流和批处理
  • Table API 是用于 Scala 和 Java 语言的查询API,允许以非常直观的方式组合关系运算符的查询,例如 select,filter 和 join。Flink SQL 的支持是基于实现了SQL标准的 Apache Calcite。无论输入是批输入(DataSet)还是流输入(DataStream),任一接口中指定的查询都具有相同的语义并指定相同的结果。
  • Table API和SQL接口彼此集成,Flink的DataStream和DataSet API亦是如此。我们可以轻松地在基于API构建的所有API和库之间切换。
  • 注意,到目前最新版本为止,Table API和SQL还有很多功能正在开发中。 并非[Table API,SQL]和[stream,batch]输入的每种组合都支持所有操作

在这里插入图片描述

3.2 为什么需要SQL
  • Table API 是一种关系型API,类 SQL 的API,用户可以像操作表一样地操作数据, 非常的直观和方便。

  • SQL 作为一个"人所皆知"的语言,如果一个引擎提供 SQL,它将很容易被人们接受。这已经是业界很常见的现象了。

  • Table & SQL API 还有另一个职责,就是流处理和批处理统一的API层。

    在这里插入图片描述

3.3 开发环境构建
  • 在 Flink 1.9 中,Table 模块迎来了核心架构的升级,引入了阿里巴巴 Blink 团队贡献的诸多功能,取名叫: Blink Planner。

  • 在使用 Table API 和 SQL 开发 Flink 应用之前,通过添加 Maven 的依赖配置到项目中,在本地工程中引入相应的依赖库,库中包含了 Table API 和 SQL 接口。

  • 添加pom依赖

     <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.11</artifactId><version>1.9.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_2.11</artifactId><version>1.9.2</version></dependency>
    
3.4 TableEnvironment构建
  • 和 DataStream API 一样,Table API 和 SQL 中具有相同的基本编程模型。首先需要构建对应的 TableEnviroment 创建关系型编程环境,才能够在程序中使用 Table API 和 SQL来编写应用程序,另外 Table API 和 SQL 接口可以在应用中同时使用,Flink SQL 基于 Apache Calcite 框架实现了 SQL 标准协议,是构建在 Table API 之上的更高级接口。

  • 首先需要在环境中创建 TableEnvironment 对象,TableEnvironment 中提供了注册内部表、执行 Flink SQL 语句、注册自定义函数等功能。根据应用类型的不同,TableEnvironment 创建方式也有所不同,但是都是通过调用 create()方法创建。

  • 流计算环境下创建 TableEnviroment

    //初始化Flink的Streaming(流计算)上下文执行环境 
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment 
    //初始化Table API的上下文环境 
    val tableEvn =StreamTableEnvironment.create(streamEnv)
    
  • 在 Flink1.9 之后由于引入了 Blink Planner

    val bsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build() 
    val bsTableEnv = StreamTableEnvironment.create(streamEnv, bsSettings)
    
  • 注意

    • Flink 社区完整保留原有 Flink Planner (Old Planner),同时又引入了新的Blink Planner,用户可以自行选择使用 Old Planner 还是 Blink Planner。官方推荐暂时使用 Old Planner。
3.5 Table API
  • 在 Flink 中创建一张表有两种方法:
    • (1)从一个文件中导入表结构(Structure)(常用于批计算)(静态)
    • (2)从 DataStream 或者 DataSet 转换成 Table (动态)
3.5.1 创建 Table
  • Table API 中已经提供了 TableSource 从外部系统获取数据,例如常见的数据库、文件系统和 Kafka 消息队列等外部系统。

  • 1、从文件中创建 Table(静态表)

    • 需求

      • 读取csv文件,文件内容参见课件当中的flinksql.csv文件,查询年龄大于18岁的人,并将结果写入到csv文件里面去,这里涉及到flink的connect的各种与其他外部系统的连接,参见
      • https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html
    • 代码开发

      package com.kaikeba.tableimport org.apache.flink.api.common.typeinfo.TypeInformation
      import org.apache.flink.core.fs.FileSystem.WriteMode
      import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
      import org.apache.flink.table.api.{Table, Types}
      import org.apache.flink.table.api.scala.StreamTableEnvironment
      import org.apache.flink.table.sinks.CsvTableSink
      import org.apache.flink.table.sources.CsvTableSource
      import org.apache.flink.api.scala._
      /*** flink table加载csv文件*/
      object TableCsvSource {def main(args: Array[String]): Unit = {//todo:1、构建流处理环境val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//todo:2、构建TableEnvironmentval tableEnvironment: StreamTableEnvironment = StreamTableEnvironment.create(environment)//todo:3、构建csv数据源val csvSource = CsvTableSource.builder().path("d:\\flinksql.csv").field("id", Types.INT()).field("name", Types.STRING()).field("age", Types.INT()).fieldDelimiter(",") //字段的分隔符.ignoreParseErrors() //忽略解析错误.ignoreFirstLine()   //忽略第一行.build()//todo:4、注册表tableEnvironment.registerTableSource("myUser", csvSource)//todo: 5、查询结果val result: Table = tableEnvironment.scan("myUser").filter("age>25").select("id,name,age")result.printSchema()//todo: 6、构建Sinkval tableSink = new CsvTableSink("./out/tableSink.txt","\t",1,WriteMode.OVERWRITE)//todo:7、注册sinktableEnvironment.registerTableSink("csvOutputTable",Array[String]("f1","f2","f3"),Array[TypeInformation[_]](Types.INT,Types.STRING,Types.INT) ,tableSink)//todo:8、写数据到sinkresult.insertInto("csvOutputTable")environment.execute("TableCsvSource")}
      }
  • 2、从DataStream中创建 Table(动态表)

    • 需求

      • 使用TableApi完成基于流数据的处理
    • 代码开发

      package com.kaikeba.tableimport org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
      import org.apache.flink.table.api.{Table, Types}
      import org.apache.flink.table.api.scala.StreamTableEnvironment
      import org.apache.flink.types.Row
      /*** 使用TableApi完成基于流数据的处理*/
      object TableFromDataStream {//todo:定义样例类case class User(id:Int,name:String,age:Int)def main(args: Array[String]): Unit = {//todo:1、构建流处理环境val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentstreamEnv.setParallelism(1)//todo:2、构建TableEnvironmentval tableEnvironment: StreamTableEnvironment = StreamTableEnvironment.create(streamEnv)import org.apache.flink.api.scala._/*** 101,zhangsan,18* 102,lisi,28* 103,wangwu,25* 104,zhaoliu,30*///todo:3、接受socket数据val socketStream: DataStream[String] = streamEnv.socketTextStream("node01",9999)//todo:4、对数据进行处理val userStream: DataStream[User] = socketStream.map(x=>x.split(",")).map(x=>User(x(0).toInt,x(1),x(2).toInt))//todo:5、将流注册成一张表tableEnvironment.registerDataStream("userTable",userStream)//todo:6、使用table 的api查询年龄大于20岁的人val result:Table = tableEnvironment.scan("userTable").filter("age >20")//todo:7、将table转化成流tableEnvironment.toAppendStream[Row](result).print()      //todo:8、启动tableEnvironment.execute("TableFromDataStream")}}  
      
    • 发送数据

      nc -lk 9999101,zhangsan,18
      102,lisi,28
      103,wangwu,25
      104,zhaoliu,30
      
    • DataStream转换成Table逻辑

      • 构建StreamExecutionEnvironment和StreamTableEnvironment对象
        • StreamTableEnvironment.fromDataStream(dataStream: DataStream)
        • StreamTableEnvironment.registerDataStream(dataStream: DataStream)
  • 更多的table API操作详细见官网

    • https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/tableApi.html

    在这里插入图片描述

3.5.2 Table中的window
  • Flink 支持 ProcessTime、EventTime 和 IngestionTime 三种时间概念,针对每种时间概念,Flink Table API 中使用 Schema 中单独的字段来表示时间属性,当时间字段被指定后,就可以在基于时间的操作算子中使用相应的时间属性。

  • 在 Table API 中通过使用==.rowtime 来定义 EventTime 字段==,在 ProcessTime 时间字段名后使用.proctime 后缀来指定 ProcessTime 时间属性

  • 需求

    • 统计最近 5 秒钟,每个单词出现的次数
  • 代码开发

    package com.kaikeba.tableimport org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.api.watermark.Watermark
    import org.apache.flink.table.api.{GroupWindowedTable, Table, Tumble}
    import org.apache.flink.table.api.scala.StreamTableEnvironment
    import org.apache.flink.types.Row/*** 基于table的window窗口操作处理延迟数据*/
    object TableWindowWaterMark {//定义样例类case class Message(word:String,createTime:Long)def main(args: Array[String]): Unit = {//todo:1、构建流处理环境val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentstreamEnv.setParallelism(1)import org.apache.flink.api.scala._//指定EventTime为时间语义streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//todo: 2、构建StreamTableEnvironmentval tableEnvironment: StreamTableEnvironment = StreamTableEnvironment.create(streamEnv)//todo: 3、接受socket数据val sourceStream: DataStream[String] = streamEnv.socketTextStream("node01",9999)//todo: 4、数据切分处理val mapStream: DataStream[Message] = sourceStream.map(x=>Message(x.split(",")(0),x.split(",")(1).toLong))//todo: 5、添加watermarkval watermarksStream: DataStream[Message] = mapStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Message] {//定义延迟时长val maxOutOfOrderness = 5000L//历史最大事件时间var currentMaxTimestamp: Long = _override def getCurrentWatermark: Watermark = {val watermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness)watermark}override def extractTimestamp(element: Message, previousElementTimestamp: Long): Long = {val eventTime: Long = element.createTimecurrentMaxTimestamp = Math.max(eventTime, currentMaxTimestamp)eventTime}})//todo:6、构建Table , 设置时间属性import org.apache.flink.table.api.scala._val table: Table = tableEnvironment.fromDataStream(watermarksStream,'word,'createTime.rowtime)//todo:7、添加window//滚动窗口第一种写法//val windowedTable: GroupWindowedTable = table.window(Tumble.over("5.second").on("createTime").as("window"))//滚动窗口的第二种写法val windowedTable: GroupWindowedTable = table.window(Tumble over 5.second on 'createTime as 'window)//todo:8、对窗口数据进行处理// 使用2个字段分组,窗口名称和单词val result: Table = windowedTable.groupBy('window,'word)//单词、窗口的开始、结束e、聚合计算.select('word,'window.start,'window.end,'word.count)//todo:9、将table转换成DataStreamval resultStream: DataStream[(Boolean, Row)] = tableEnvironment.toRetractStream[Row](result)resultStream.filter(x =>x._1 ==true).print()tableEnvironment.execute("table")
    }
    }
  • 发送数据

    hadoop,1461756862000
    hadoop,1461756866000
    hadoop,1461756864000
    hadoop,1461756870000
    hadoop,1461756875000
    
3.6 SQL使用
  • SQL 作为 Flink 中提供的接口之一,占据着非常重要的地位,主要是因为 SQL 具有灵活和丰富的语法,能够应用于大部分的计算场景。
  • Flink SQL 底层使用 Apache Calcite 框架, 将标准的 Flink SQL 语句解析并转换成底层的算子处理逻辑,并在转换过程中基于语法规则层面进行性能优化,比如谓词下推等。另外用户在使用 SQL 编写 Flink 应用时,能够屏蔽底层技术细节,能够更加方便且高效地通过SQL语句来构建Flink应用。
  • Flink SQL构建在Table API 之上,并含盖了大部分的 Table API 功能特性。同时 Flink SQL 可以和 Table API 混用,Flink 最终会在整体上将代码合并在同一套代码逻辑中
3.6.1 SQL操作
  • 代码开发演示

    package com.kaikeba.tableimport org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}import org.apache.flink.table.api.Tableimport org.apache.flink.table.api.scala.StreamTableEnvironmentimport org.apache.flink.types.Rowobject FlinkSQLTest {//todo:定义样例类case class User(id:Int,name:String,age:Int)def main(args: Array[String]): Unit = { //todo:1、构建流处理环境val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentstreamEnv.setParallelism(1)//todo:2、构建TableEnvironmentval tableEnvironment: StreamTableEnvironment = StreamTableEnvironment.create(streamEnv)import org.apache.flink.api.scala._/*** 101,zhangsan,18* 102,lisi,20* 103,wangwu,25* 104,zhaoliu,15*///todo:3、接受socket数据val socketStream: DataStream[String] = streamEnv.socketTextStream("node01",9999)//todo:4、对数据进行处理val userStream: DataStream[User] = socketStream.map(x=>x.split(",")).map(x=>User(x(0).toInt,x(1),x(2).toInt))//todo:5、将流注册成一张表tableEnvironment.registerDataStream("userTable",userStream)//todo:6、使用table 的api查询年龄大于20岁的人val result:Table = tableEnvironment.sqlQuery("select * from userTable where age >20")//todo:7、将table转化成流tableEnvironment.toAppendStream[Row](result).print()//todo:8、启动tableEnvironment.execute("TableFromDataStream")}}   
    
  • 发送数据

    101,zhangsan,18
    102,lisi,20
    103,wangwu,25
    104,zhaoliu,15
    
  • 将Table转换成为DataStream的两种模式

    • 第一种方式:AppendMode(追加模式)

         将表附加到流数据,表当中只能有查询或者添加操作,如果有update或者delete操作,那么就会失败。只有在动态Table仅通过INSERT时才能使用此模式,即它仅附加,并且以前发出的结果永远不会更新。如果更新或删除操作使用追加模式会失败报错。
      
    • 第二种模式:RetractMode(撤回模式)

           始终可以使用此模式。返回值是boolean类型。它用true或false来标记数据的插入和撤回,返回true代表数据插入,false代表数据的撤回。
      
  • 按照官网的理解如果数据只是不断添加,可以使用追加模式,其余方式则不可以使用追加模式,而撤回模式侧可以适用于更新,删除等场景。具体的区别 如下图所示:

    在这里插入图片描述

    在这里插入图片描述

  • 通过上图可以清晰的看到两种方式的区别,我们在利用flinkSQL处理实时数据把表转化成流的时候,如果使用的sql语句包含:count() group by时,必须使用RetractMode撤回模式。

3.6.2 SQL中的window
  • Flink SQL 也支持三种窗口类型,分别为 Tumble Windows、HOP Windows 和 Session Windows,其中 HOP Windows 对应 Table API 中的 Sliding Window,同时每种窗口分别有相应的使用场景和方法。

  • 需求

    • 统计最近 5 秒钟,每个单词出现的次数
  • 代码开发

    package com.kaikeba.tableimport org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.api.watermark.Watermark
    import org.apache.flink.table.api.scala.StreamTableEnvironment
    import org.apache.flink.table.api.{GroupWindowedTable, Table, Tumble}
    import org.apache.flink.types.Row/*** 基于SQL的window窗口操作处理延迟数据*/
    object SQLWindowWaterMark {//定义样例类case class Message(word:String,createTime:Long)def main(args: Array[String]): Unit = {//todo:1、构建流处理环境val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentstreamEnv.setParallelism(1)import org.apache.flink.api.scala._//指定EventTime为时间语义streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//todo: 2、构建StreamTableEnvironmentval tableEnvironment: StreamTableEnvironment = StreamTableEnvironment.create(streamEnv)//todo: 3、接受socket数据val sourceStream: DataStream[String] = streamEnv.socketTextStream("node01",9999)//todo: 4、数据切分处理val mapStream: DataStream[Message] = sourceStream.map(x=>Message(x.split(",")(0),x.split(",")(1).toLong))//todo: 5、添加watermarkval watermarksStream: DataStream[Message] = mapStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Message] {//定义延迟时长val maxOutOfOrderness = 5000L//历史最大事件时间var currentMaxTimestamp: Long = _override def getCurrentWatermark: Watermark = {val watermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness)watermark}override def extractTimestamp(element: Message, previousElementTimestamp: Long): Long = {val eventTime: Long = element.createTimecurrentMaxTimestamp = Math.max(eventTime, currentMaxTimestamp)eventTime}})//todo:6、注册DataStream成表 ,设置时间属性import org.apache.flink.table.api.scala._tableEnvironment.registerDataStream("t_socket",watermarksStream,'word,'createTime.rowtime)  //todo:7、sql查询---添加window---滚动窗口----窗口长度5sval result: Table = tableEnvironment.sqlQuery("select word,count(*) from t_socket group by tumble(createTime,interval '5' second),word")   //todo:8、将table转换成DataStreamval resultStream: DataStream[(Boolean, Row)] = tableEnvironment.toRetractStream[Row](result)resultStream.filter(x =>x._1 ==true).print()tableEnvironment.execute("table")}   }  
    
  • 发送数据

hadoop,1461756862000
hadoop,1461756865000
hadoop,1461756863000
hadoop,1461756868000
hadoop,1461756870000
hadoop,1461756875000
hadoop,1461756880000
  • 更多的SQL操作详细见官网

    • https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html

      在这里插入图片描述

把所有的代码都敲一遍

相关文章:

Flink深入浅出之04:时间、水印、TableSQL

深入理解Flink的waterMark的机制、Flink Table和SQL开发 3️⃣ 目标 掌握WaterMark的的原理掌握WaterMark的运用掌握Flink Table和SQL开发 4️⃣ 要点 &#x1f4d6; 1. Flink中的Time概念 对于流式数据处理&#xff0c;最大的特点是数据上具有时间的属性特征 Flink根据时…...

MongoDB Compass 使用说明

MongoDB Compass 使用说明 安装工具栏按钮详细介绍Connect(连接)1. New Window&#xff08;新窗口&#xff09;2. Disconnect&#xff08;断开连接&#xff09;3. Import Saved Connections&#xff08;导入保存的连接&#xff09;4. Export Saved Connections&#xff08;导出…...

Halcon 算子 一维码检测识别、项目案例

首先我们要明白码的识别思路 把窗口全部关闭读取新的图片图像预处理创建条码模型设置模型参数搜索模型获取条码结果显示条码结果 图像预处理和条码增强 对比度太低&#xff1a; scale_image&#xff08;或使用外部程序scale_image_range&#xff09;,增强图像的对比度图像模糊…...

信号完整性基础:高速信号的扩频时钟SSC测试

扩频时钟 SSC 是 Spread Spectrum Clock 的英文缩写&#xff0c;目前很多数字电路芯片都支持 SSC 功能&#xff0c;如&#xff1a;PCIE、USB3.0、SATA 等等。那么扩频时钟是用来做什么的呢&#xff1f; SSC背景&#xff1a; 扩频时钟是出于解决电磁干扰&#xff08;EMI&#…...

stm32移植LCD2002驱动

介绍 LCD2002支持20X2个字符串显示&#xff0c;引脚功能和读写时序跟LCD1602都很像 LCD类型&#xff1a;字符点阵 点 阵 数&#xff1a;202 外形尺寸&#xff1a;116.0mm37.0mm&#xff08;长宽&#xff09; 视域尺寸&#xff1a;83.0mm18.6mm 点 距 离&#xff1a;0.05mm…...

RAG技术深度解析:从基础Agent到复杂推理Deep Search的架构实践

重磅推荐专栏: 《大模型AIGC》 《课程大纲》 《知识星球》 本专栏致力于探索和讨论当今最前沿的技术趋势和应用领域,包括但不限于ChatGPT和Stable Diffusion等。我们将深入研究大型模型的开发和应用,以及与之相关的人工智能生成内容(AIGC)技术。通过深入的技术解析和实践经…...

OpenCV计算摄影学(18)平滑图像中的纹理区域同时保留边缘信息函数textureFlattening()

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 cv::textureFlattening 是 OpenCV 中用于图像处理的一个函数&#xff0c;旨在平滑图像中的纹理区域&#xff0c;同时保留边缘信息。该技术特别适…...

“此电脑”中删除WPS云盘方法(百度网盘通用)

&#x1f4e3;此方法适用于卸载WPS云盘后&#xff0c;WPS云盘图标依然在此电脑中显示的问题。 原理&#xff1a;通过注册来进行删除 步骤&#xff1a; WIN键R,打开运行窗口&#xff0c;输入regedit命令&#xff0c;来打开【注册表编辑器】&#xff1b; 从左侧&#xff0c;依…...

Agent革命:Manus如何用工作流拆解掀起AI生产力革命

一、现象级产品的诞生背景 2025年3月6日&#xff0c;一款名为Manus的AI产品在技术圈引发地震式传播。其官方测试数据显示&#xff1a;在GAIA基准测试中&#xff0c;基础任务准确率达86.5%&#xff08;接近人类水平&#xff09;&#xff0c;中高级任务完成率突破57%。这标志着A…...

四款GIS工具箱软件解析:满足企业多样化空间数据需求

概述 随着地理信息系统&#xff08;GIS&#xff09;在城市规划、环境监测、资源管理等领域的广泛应用&#xff0c;各种GIS工具箱软件不断涌现&#xff0c;为用户提供了强大的数据处理、空间分析和地图制图功能。本文将为大家介绍4款GIS工具箱软件&#xff0c;这些软件各具特色…...

nginx 配置403页面(已亲测)

问题&#xff1a;GET请求访问漏洞url即可看到泄露的内网ip 解决方式&#xff1a; 1.配置nginx 不显示真实Ip 2.限制接口只能是POST请求 具体配置&#xff1a; 编写一个403.html 在nginx的配置文件中&#xff0c;配置location参数&#xff1a; location /api/validationCode…...

After Effects的图钉与关键帧动画

姜 子 博 引言 在数字媒体时代&#xff0c;动态图形和视觉效果在信息传播和表达中扮演着越来越重要的角色。After Effects 作为行业领先的软件&#xff0c;提供了丰富的工具和功能&#xff0c;帮助用户创作出令人惊叹的视觉作品。图钉工具和关键帧动画是 AE 中实现复杂动画效…...

SAP DOI EXCEL宏的使用

OAOR里上传EXCEL模版 屏幕初始化PBO创建DOI EXCEL对象&#xff0c;并填充EXCEL内容 *&---------------------------------------------------------------------* *& Module INIT_DOI_DISPLAY_9100 OUTPUT *&--------------------------------------------…...

新编大学应用英语综合教程3 U校园全套参考答案

获取全套答案&#xff1a; 链接&#xff1a;https://pan.quark.cn/s/abaa0338724e...

高考數學。。。

2024上 具体来说&#xff0c;直线的参数方程可以写为&#xff1a; x1t y−t z1t 二、简答题(本大题共5小题&#xff0c;每小题7分&#xff0c;共35分。) 12.数学学习评价不仅要关注结果评价&#xff0c;也要关注过程评价。简要说明过程评价应关注哪几个方面。…...

STM32 子设备通过CAN发送数据到主设备

采集ADC、GPS经纬坐标、温湿度数据、大气压数据通过CAN方式发送给主设备端&#xff0c;帧ID按照如下定义&#xff1a; 我尼玛一个标准帧ID位数据是11位&#xff0c;扩展帧才是111829位&#xff0c;它说最开头的是四位是真类型&#xff0c;并给我如下解释&#xff1a; 它把帧的定…...

HCIA-IP路由动态-RIP

一、概念 动态路由是指路由器通过运行动态路由协议&#xff08;RIP、OSPF等&#xff09;&#xff0c;自动学习和发现网络中的路由信息。路由器之间通过交换路由协议数据包&#xff0c;互相通告自己所知道的网络信息&#xff0c;从而构建和更新路由表。 二、RIP(路由信息协议)…...

CentOS7离线部署安装docker和docker-compose

CentOS7离线部署安装docker和docker-compose 安装包准备 docker下载地址、docker-compose下载地址 docker和docker-compose版本对应关系 注&#xff1a;本次安装部署选择的版本是 docker&#xff1a;docker-28.0.1.tgzdocker-compose&#xff1a;docker-compose-linux-x86_6…...

Sora与AGI的结合:从多模态模型到智能体推理的演进

全文目录&#xff1a; 开篇语前言前言&#xff1a;AGI的挑战与Sora的突破Sora的多模态学习架构&#xff1a;支撑智能体推理的基础1. **多模态学习的核心&#xff1a;信息融合与交叉理解**2. **智能体推理&#xff1a;从感知到决策** Sora如何推动AGI的发展&#xff1a;自主学习…...

Core Speech Kit(基础语音服务)

文章目录 一、Core Speech Kit简介场景介绍约束与限制二、文本转语音1. 场景介绍2. 约束与限制3. 开发步骤4. 设置播报策略设置数字播报策略插入静音停顿指定汉字发音5. 开发实例三、语音识别约束与限制开发步骤开发实例一、Core Speech Kit简介 Core Speech Kit(基础语音服务…...

VsCode 快捷键备忘

移动光标及选择文本 Ctrl ← / → &#xff1a;以单词为单位移动游标Home / End&#xff1a;光标移到行首/行位Ctrl Home / End&#xff1a;光标移到文件首和文件尾Ctrl Shift \&#xff1a;在匹配的分隔符之间跳转 配对的分隔符 是指分隔代码元素的字符&#xff0c;比如字…...

配置 Thunderbird 以使用 QQ 邮箱

配置 Thunderbird 以使用 QQ 邮箱 本片文章的操作系统为 windws 10 &#xff0c;thunder bird 客户端版本为 128.7.1esr(64位)。注意到其他文章的图片中 thunder bird 的 ui 界面和我这个不一样&#xff0c;导致看起来不太方便&#xff0c;所以这里写一篇博客。不同版本的 thu…...

如何使用MyBatis进行多表查询

前言 在实际开发中&#xff0c;对数据库的操作通常会涉及多张表&#xff0c;MyBatis提供了关联映射&#xff0c;这些关联映射可以很好地处理表与表&#xff0c;对象与对象之间的的关联关系。 一对一查询 步骤&#xff1a; 先确定表的一对一关系确定好实体类&#xff0c;添加关…...

第六课:数据存储三剑客:CSV/JSON/MySQL

在Python的数据存储与处理领域&#xff0c;CSV、JSON和MySQL被广大开发者誉为“数据存储三剑客”。它们各自在不同的场景下发挥着重要作用&#xff0c;无论是简单的数据交换、轻量级的数据存储&#xff0c;还是复杂的关系型数据库管理&#xff0c;都能找到它们的身影。本文将详…...

Python通过SSH隧道访问数据库

本文介绍通过sshtunnel类库建立SSH隧道&#xff0c;使用paramiko通过SSH来访问数据库。 实现了两种建立SSH方式&#xff1a;公私钥验证、密码验证。 公私钥可读本地&#xff0c;也可读取Aws S3上的私钥文件。 本质上就是在本机建立SSH隧道&#xff0c;然后将访问DB转发到本地SS…...

Aws batch task 无法拉取ECR 镜像unable to pull secrets or registry auth 问题排查

AWS batch task使用了自定义镜像&#xff0c;在提作业后出现错误 具体错误是ResourceInitializationError: unable to pull secrets or registry auth: The task cannot pull registry auth from Amazon ECR: There is a connection issue between the task and Amazon ECR. C…...

立即释放 Mac 空间!Duplicate File Finder 8 重复文件高速清理工具

Duplicate File Finder 专业的 Mac 重复文件清理工具。查找并删除重复的文件、文件夹&#xff0c;甚至相似的照片。 不要让无用的文件占用磁盘上的宝贵空间。 整理你的 Mac。用最好的重复文件查找器来管理你的文件集合。 扫描任何磁盘或文件夹 主文件夹、照片/音乐库、外部磁…...

quillEditor 禁用复制粘贴图片,以及class转style等问题

<template><div><div class"search-term"><el-form :inline"true" :model"searchInfo" class"demo-form-inline"><el-form-item label"案例标题"><el-input v-model"searchInfo.titl…...

快速掌握EasyOCR应用实战指南

EasyOCR 是一个开源的、支持多语言&#xff08;28种&#xff09;和多文档格式&#xff08;PDF/PNG/JPG/TIFF等&#xff09;的 OCR&#xff08;光学字符识别&#xff09;工具库&#xff0c;由 Hugging Face 团队维护。其目标是简化 OCR 的开发流程&#xff0c;提供易用、高性能的…...

ubuntu22.04本地部署OpenWebUI

一、简介 Open WebUI 是一个可扩展、功能丰富且用户友好的自托管 AI 平台&#xff0c;旨在完全离线运行。它支持各种 LLM 运行器&#xff0c;如 Ollama 和 OpenAI 兼容的 API&#xff0c;并内置了 RAG 推理引擎&#xff0c;使其成为强大的 AI 部署解决方案。 二、安装 方法 …...