当前位置: 首页 > news >正文

大数据-玩转数据-Flink 水印

一、Flink 中的水印

在Flink的流式操作中, 会涉及不同的时间概念:

1.1 处理时间

是指的执行操作的各个设备的时间,对于运行在处理时间上的流程序, 所有的基于时间的操作(比如时间窗口)都是使用的设备时钟。比如, 一个长度为1个小时的窗口将会包含设备时钟表示的1个小时内所有的数据。 假设应用程序在 9:15am分启动, 第1个小时窗口将会包含9:15am到10:00am所有的数据,然后下个窗口是10:00am-11:00am, 等等。处理时间是最简单时间语义, 数据流和设备之间不需要做任何的协调。他提供了最好的性能和最低的延迟。 但是, 在分布式和异步的环境下,处理时间没有办法保证确定性,容易受到数据传递速度的影响: 事件的延迟和乱序。在使用窗口的时候, 如果使用处理时间, 就指定时间分配器为处理时间分配器。

1.2 事件时间

是指的这个事件发生的时间。在event进入Flink之前, 通常被嵌入到了event中, 一般作为这个event的时间戳存在。在事件时间体系中, 时间的进度依赖于数据本身,和任何设备的时间无关。事件时间程序必须制定如何产生Event Time Watermarks(水印) 。假设所有数据都已到达,事件时间操作将按预期方式运行,即使在处理无序或迟到的事件或重新处理历史数据时,也会产生正确且一致的结果。例如,每小时事件时间窗口将包含带有事件时间戳的所有记录,这些记录落入该小时。在使用窗口的时候, 如果使用事件时间, 就指定时间分配器为事件时间分配器。从1.12开始, Flink内部已经把默认的语义改成了事件时间。

1.3 Flink中的WaterMark

支持event time的流式处理框架需要一种能够测量event time 进度的方式。 比如, 一个窗口算子创建了一个长度为1小时的窗口,那么这个算子需要知道事件时间已经到达了这个窗口的关闭时间,从而在程序中去关闭这个窗口。事件时间可以不依赖处理时间来表示时间的进度。例如,在程序中, 即使处理时间和事件时间有相同的速度, 事件时间可能会轻微的落后处理时间。另外一方面使用事件时间可以在几秒内处理已经缓存在Kafka中多周的数据,这些数据可以照样被正确处理, 就像实时发生的一样能够进入正确的窗口。这种在Flink中去测量事件时间的进度的机制就是watermark(水印)。

1.4 Flink中如何产生水印

在这里插入图片描述

二、代码集成

package com.lyh.flink08;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import javax.naming.Context;
import java.time.Duration;public class WatorMark_01 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> stream = env.socketTextStream("hadoop100", 9999).map(new MapFunction<String, WaterSensor>() {@Overridepublic WaterSensor map(String value) throws Exception {String[] datas = value.split(",");return new WaterSensor(datas[0],Long.valueOf(datas[1]),Integer.valueOf(datas[2]));}});WatermarkStrategy<WaterSensor> wms = WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { // 指定时间戳@Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {return element.getTs() * 1000;}});stream.assignTimestampsAndWatermarks(wms) // 指定水印和时间戳.keyBy(WaterSensor::getId).window(TumblingEventTimeWindows.of(Time.seconds(5))).process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {String msg = "当前key: " + key+ "窗口: [" + ctx.window().getStart() / 1000 + "," + ctx.window().getEnd()/1000 + ") 一共有 "+ elements.spliterator().estimateSize() + "条数据 ";out.collect(msg);}}).print();env.execute();}
}

三、测试结果

在这里插入图片描述
在这里插入图片描述

相关文章:

大数据-玩转数据-Flink 水印

一、Flink 中的水印 在Flink的流式操作中, 会涉及不同的时间概念&#xff1a; 1.1 处理时间 是指的执行操作的各个设备的时间&#xff0c;对于运行在处理时间上的流程序, 所有的基于时间的操作(比如时间窗口)都是使用的设备时钟。比如, 一个长度为1个小时的窗口将会包含设备…...

【Apollo】阿波罗自动驾驶系统:驶向未来的智能出行(含源码安装)

前言 Apollo (阿波罗)是一个开放的、完整的、安全的平台&#xff0c;将帮助汽车行业及自动驾驶领域的合作伙伴结合车辆和硬件系统&#xff0c;快速搭建一套属于自己的自动驾驶系统。 开放能力、共享资源、加速创新、持续共赢是 Apollo 开放平台的口号。百度把自己所拥有的强大、…...

网络-Netty

how pipeline.addLast(ChannelHandler)...

如何使用vue-smooth-dnd

Vue Smooth DnD是一个基于Vue的平滑易用的拖放库。它提供了简单易用的API和可自定义的样式。 要使用Vue Smooth DnD&#xff0c;可以按照以下步骤进行操作&#xff1a; 安装Vue Smooth DnD npm install vue-smooth-dnd --save 在组件中引入Vue Smooth DnD import VueSmoot…...

为AWS认证做好准备:一份全面的备考指南

随着云计算的快速发展&#xff0c;越来越多的专业人士选择获取AWS&#xff08;亚马逊网络服务&#xff09;认证。这个认证不仅可以证明你对AWS的理解和专业技能&#xff0c;还有助于你在云计算领域获得更好的工作机会。 以下是一份全面的备考指南&#xff0c;帮助你为AWS认证做…...

尚硅谷SpringMVC

九、HttpMessageConverter...

django的简易的图书管理系统jsp书店进销存源代码MySQL

本项目为前几天收费帮学妹做的一个项目&#xff0c;Java EE JSP项目&#xff0c;在工作环境中基本使用不到&#xff0c;但是很多学校把这个当作编程入门的项目来做&#xff0c;故分享出本项目供初学者参考。 一、项目描述 django的简易的图书管理系统 系统有1权限&#xff1a…...

力扣125. 验证回文串

125. 验证回文串 如果在将所有大写字符转换为小写字符、并移除所有非字母数字字符之后&#xff0c;短语正着读和反着读都一样。则可以认为该短语是一个 回文串 。 字母和数字都属于字母数字字符。 给你一个字符串 s&#xff0c;如果它是 回文串 &#xff0c;返回 true &…...

用WebStorm创建Mock数据

WebStorm是一款强大的集成式开发环境&#xff0c;它集成了许多实用的功能&#xff0c;包括Mock数据的创建。 下面是用WebStorm创建Mock数据的步骤&#xff1a; 打开WebStorm&#xff0c;选择一个项目或新建一个项目&#xff1b;在项目中创建一个名为“mock”的文件夹&#xf…...

Python钢筋混凝土结构计算.pdf-已知弯矩确定混凝土梁截面尺寸

计算原理 确定混凝土梁截面的合理尺寸通常需要考虑弯矩、受力要求和约束条件等多个因素。以下是一种常见的计算公式&#xff0c;用于基于已知弯矩确定混凝土梁截面的合理尺寸&#xff1a; 请注意&#xff0c;以上公式仅提供了一种常见的计算方法&#xff0c;并且具体的规范和设…...

【正点原子STM32连载】第二十四章 高级定时器PWM输入模式实验 摘自【正点原子】APM32F407最小系统板使用指南

1&#xff09;实验平台&#xff1a;正点原子stm32f103战舰开发板V4 2&#xff09;平台购买地址&#xff1a;https://detail.tmall.com/item.htm?id609294757420 3&#xff09;全套实验源码手册视频下载地址&#xff1a; http://www.openedv.com/thread-340252-1-1.html# 第二…...

Adapter Tuning Overview:在CV,NLP,多模态领域的代表性工作

文章目录 Delta TuningAdapter Tuning in CVAdapter Tuning in NLP Delta Tuning Adapter Tuning in CV 题目: Learning multiple visual domains with residual adapters 机构&#xff1a;牛津VGG组 论文: https://arxiv.org/pdf/1705.08045.pdf Adapter Tuning in NLP …...

velocity一个基于Java的模板引擎

参考&#xff1a;https://blog.csdn.net/m0_51517236/article/details/126175283 http://www.51gjie.com/javaweb/896.html...

异步servlet

我们日常使用的 SpringMVC&#xff0c;基本上都不是异步 Servlet&#xff0c;而学习 WebFlux&#xff0c;异步 Servlet 是基础&#xff0c;WebFlux。 1.什么是异步 Servlet 先来说说什么是非异步 Servlet。 在 Servlet3.0 之前&#xff0c;Servlet 采用 Thread-Per-Request 的方…...

煤矿皮带运输智能监控算法 opencv

煤矿皮带运输智能监控算法通过opencvpython深度学习算法网络模型&#xff0c;煤矿皮带运输智能监控算法实时监测皮带运输过程中的各种异常情况&#xff0c;如跑偏、撕裂、堆料异常等&#xff0c;一旦检测到异常情况&#xff0c;立即发出告警并采取相应的措施&#xff0c;以保障…...

Docker搭建elasticsearch+kibana测试

最近需要做大数据画像&#xff0c;所以先简单搭建一个eskibana学习使用&#xff0c;记录一下搭建过程和遇到的问题以及解决办法 1.拉取es和kibana镜像 在拉取镜像之前先搜索一下 elasticsearch发现是存在elasticsearch镜像的&#xff0c;我一般习惯性拉取最新镜像&#xff0c…...

QT(C++)-QTreeview节点折叠与展开

文章目录 1、前言2、QTreeview全部展开与折叠3、QTreeview某个节点展开与折叠3.1 节点折叠与展开的信号与槽3.2 槽函数的实现3.3 某个节点展开与折叠 1、前言 最近要用QT开发项目&#xff0c;对QT不是很熟&#xff0c;就根据网上的查到的知识和自己的摸索&#xff0c;将一些经…...

项目 - 后端技术栈转型方案

前言 某开发项目的后端技术栈比较老了&#xff0c;现在想换到新的技术栈上。使用更好的模式、设计思想、更合理的架构等&#xff0c;为未来的需求迭代做铺垫。怎么办呢&#xff1f;假设系统目前在线上运行着的&#xff0c;直接整体换的话耗时太久&#xff0c;且中间还有新的需…...

Oracle权限语句

授予权限&#xff1a;grant 权限 to 用户名; 撤销权限&#xff1a;revoke 权限 from 用户名; 常用&#xff1a; 创建用户&#xff1a; create user zhangsan identified by zhangsan; grant connect, resource to zhangsan; //授权zhangsan用户连接权限 grant create …...

微信小程序发布一个npm包

参考:https://developers.weixin.qq.com/miniprogram/dev/devtools/npm.html 同npm一样流程 npm install weixin_heath_apis...

vscode里如何用git

打开vs终端执行如下&#xff1a; 1 初始化 Git 仓库&#xff08;如果尚未初始化&#xff09; git init 2 添加文件到 Git 仓库 git add . 3 使用 git commit 命令来提交你的更改。确保在提交时加上一个有用的消息。 git commit -m "备注信息" 4 …...

Linux 文件类型,目录与路径,文件与目录管理

文件类型 后面的字符表示文件类型标志 普通文件&#xff1a;-&#xff08;纯文本文件&#xff0c;二进制文件&#xff0c;数据格式文件&#xff09; 如文本文件、图片、程序文件等。 目录文件&#xff1a;d&#xff08;directory&#xff09; 用来存放其他文件或子目录。 设备…...

关于nvm与node.js

1 安装nvm 安装过程中手动修改 nvm的安装路径&#xff0c; 以及修改 通过nvm安装node后正在使用的node的存放目录【这句话可能难以理解&#xff0c;但接着往下看你就了然了】 2 修改nvm中settings.txt文件配置 nvm安装成功后&#xff0c;通常在该文件中会出现以下配置&…...

【Redis技术进阶之路】「原理分析系列开篇」分析客户端和服务端网络诵信交互实现(服务端执行命令请求的过程 - 初始化服务器)

服务端执行命令请求的过程 【专栏简介】【技术大纲】【专栏目标】【目标人群】1. Redis爱好者与社区成员2. 后端开发和系统架构师3. 计算机专业的本科生及研究生 初始化服务器1. 初始化服务器状态结构初始化RedisServer变量 2. 加载相关系统配置和用户配置参数定制化配置参数案…...

React Native在HarmonyOS 5.0阅读类应用开发中的实践

一、技术选型背景 随着HarmonyOS 5.0对Web兼容层的增强&#xff0c;React Native作为跨平台框架可通过重新编译ArkTS组件实现85%以上的代码复用率。阅读类应用具有UI复杂度低、数据流清晰的特点。 二、核心实现方案 1. 环境配置 &#xff08;1&#xff09;使用React Native…...

将对透视变换后的图像使用Otsu进行阈值化,来分离黑色和白色像素。这句话中的Otsu是什么意思?

Otsu 是一种自动阈值化方法&#xff0c;用于将图像分割为前景和背景。它通过最小化图像的类内方差或等价地最大化类间方差来选择最佳阈值。这种方法特别适用于图像的二值化处理&#xff0c;能够自动确定一个阈值&#xff0c;将图像中的像素分为黑色和白色两类。 Otsu 方法的原…...

C# 类和继承(抽象类)

抽象类 抽象类是指设计为被继承的类。抽象类只能被用作其他类的基类。 不能创建抽象类的实例。抽象类使用abstract修饰符声明。 抽象类可以包含抽象成员或普通的非抽象成员。抽象类的成员可以是抽象成员和普通带 实现的成员的任意组合。抽象类自己可以派生自另一个抽象类。例…...

浅谈不同二分算法的查找情况

二分算法原理比较简单&#xff0c;但是实际的算法模板却有很多&#xff0c;这一切都源于二分查找问题中的复杂情况和二分算法的边界处理&#xff0c;以下是博主对一些二分算法查找的情况分析。 需要说明的是&#xff0c;以下二分算法都是基于有序序列为升序有序的情况&#xf…...

python执行测试用例,allure报乱码且未成功生成报告

allure执行测试用例时显示乱码&#xff1a;‘allure’ &#xfffd;&#xfffd;&#xfffd;&#xfffd;&#xfffd;ڲ&#xfffd;&#xfffd;&#xfffd;&#xfffd;ⲿ&#xfffd;&#xfffd;&#xfffd;Ҳ&#xfffd;&#xfffd;&#xfffd;ǿ&#xfffd;&am…...

九天毕昇深度学习平台 | 如何安装库?

pip install 库名 -i https://pypi.tuna.tsinghua.edu.cn/simple --user 举个例子&#xff1a; 报错 ModuleNotFoundError: No module named torch 那么我需要安装 torch pip install torch -i https://pypi.tuna.tsinghua.edu.cn/simple --user pip install 库名&#x…...