当前位置: 首页 > news >正文

Flink(java版)

watermark

时间语义和 watermark

注意:数据进入flink的时间:如果用这个作为时间语义就不存在问题,但是开发中往往会用处理时间
作为时间语义这里就需要考虑延时的问题。
如上图,数据从kafka中获取出来,从多个分区中获取,这时候时间肯定有乱序,这时候就需要使用事
件时间。

场景:游戏连续过五关,给予奖励
地铁里面玩游戏,连过三关断网了,二分钟过了八关。这时候是用处理时间还是事件时间呢?
处理时间的优势:牺牲一定的数据准确性,没有延迟

package com.atguigu.apitest.window;/**import com.atguigu.apitest.beans.SensorReading;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;public class WindowTest3_EventTimeWindow {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认为当前机器的cpu的最大核数//env.setParallelism(1);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);env.getConfig().setAutoWatermarkInterval(100);// socket文本流DataStream<String> inputStream = env.socketTextStream("localhost", 7777);// 转换成SensorReading类型,分配时间戳和watermarkDataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));})// 乱序数据设置时间戳和watermark.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {@Overridepublic long extractTimestamp(SensorReading element) {return element.getTimestamp() * 1000L;}});OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late") {};// 基于事件时间的开窗聚合,统计15秒内温度的最小值SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id").timeWindow(Time.seconds(15)).allowedLateness(Time.minutes(1)).sideOutputLateData(outputTag).minBy("temperature");minTempStream.print("minTemp");minTempStream.getSideOutput(outputTag).print("late");env.execute();}
}
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718207,36.3
sensor_1,1547718211,32.8
sensor_1,1547718212,37.1注意:第一个窗口是[1547718195,1547718210);

sensor_1,1547718213,33
sensor_1,1547718224,32.1
sensor_1,1547718225,31.6
sensor_1,1547718226,21.2
sensor_1,1547718227,33.6第二个窗口大小:第一个窗口是[1547718210,1547718225);

1.理想状态:来一条数据处理一条,每条数据代表对时间推进;如图到5之后就将【0,5)的窗口关闭并输出;2.乱序状态:原因:网络延迟、分布式、分区导致乱序数据产生;网络延迟和分布式处理造成的乱序都是几十毫秒和几百毫秒的范围的差距;这将回造成大多数延迟数据集中在几十毫秒和几百毫秒的范围内;3.解决方案:将时间事件放慢

flink的三重保证:1.设置watermaker将几百毫秒的数据全部输出;2.先输出一个近似的结果,但是不要关闭窗口后面延迟的时间还需要更新;3.当延时时间到了,窗口就关闭了;兜底方案使用侧输出流保证数据不丢失;注意:数据流中的 Watermark 用于表示 timestamp 小于 Watermark 的数据都已经到
达了,因此,window 的执行也是由 Watermark 触发的。
6 3 2 5 4 1 
比如设置3秒的watermaker:
到达5:说明2秒之前的数据都到齐了,后面2,3都可以输出
到达6:说明3秒之前的数据都到齐了,大于等于3秒的数据才能输出意义:watermark 用来让程序自己平衡延迟和结果正确性:如果设置太大延迟太高,设置太
小数据就不准确,需要通过具体的业务场景去平衡这个值;

watermark 用来让程序自己平衡延迟和结果正确性:如果设置太大延迟太高,设置太小,乱序数据
没有搞定,数据就不准确,需要通过具体的业务场景去平衡这个值;如何找到watermaker:首先要了解乱序程度;
解决方案:通过机器学习构建一个模型,构建当前业务模型中的延迟状态的分布情况;

如图:大部分的延时数据都20ms和80ms之间的范围中,这时候设置80ms就搞定大部分乱序数据;
这时候还有很少的数据,如果对数据准确性要求比较高,这时候就需要设置窗口迟到机制去保证
数据的准备性;最后还有网络延迟的数据还是没有输出这时候就需要添加侧输出流作为兜底方案。

 watermark 生成问题

默认:来一条生产一条watermaker,如果短时间数据量比较大,会造成watermaker都一样造成资
源浪费;周期性添加watermaker:每隔一段时间更新一下watermaker 
周期性时间缺点:实时性不好;数据过于分散会造成资源浪费;如何选择:看数据的分布,过于集中使用周期性生成模式,数据稀疏,使用默认的模型;

状态编程 

需求:我们可以利用 Keyed state,实现这样一个需求: 检测传感器的温度值,如果连续的两个温度差值超过 10 度,就输出报警

package com.atguigu.apitest.state;/*** Copyright (c) 2018-2028 尚硅谷 All Rights Reserved* <p>* Project: FlinkTutorial* Package: com.atguigu.apitest.state* Version: 1.0* <p>* Created by wushengran on 2020/11/10 16:33*/import com.atguigu.apitest.beans.SensorReading;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** @ClassName: StateTest3_KeyedStateApplicationCase* @Description:* @Author: wushengran on 2020/11/10 16:33* @Version: 1.0*/
public class StateTest3_KeyedStateApplicationCase {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// socket文本流DataStream<String> inputStream = env.socketTextStream("localhost", 7777);// 转换成SensorReading类型DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});// 定义一个flatmap操作,检测温度跳变,输出报警SingleOutputStreamOperator<Tuple3<String, Double, Double>> resultStream = dataStream.keyBy("id").flatMap(new TempChangeWarning(10.0));resultStream.print();env.execute();}// 实现自定义函数类public static class TempChangeWarning extends RichFlatMapFunction<SensorReading, Tuple3<String, Double, Double>>{// 私有属性,温度跳变阈值private Double threshold;public TempChangeWarning(Double threshold) {this.threshold = threshold;}// 定义状态,保存上一次的温度值private ValueState<Double> lastTempState;@Overridepublic void open(Configuration parameters) throws Exception {lastTempState = getRuntimeContext().getState(new ValueStateDescriptor<Double>("last-temp", Double.class));}@Overridepublic void flatMap(SensorReading value, Collector<Tuple3<String, Double, Double>> out) throws Exception {// 获取状态Double lastTemp = lastTempState.value();// 如果状态不为null,那么就判断两次温度差值if( lastTemp != null ){Double diff = Math.abs( value.getTemperature() - lastTemp );if( diff >= threshold )out.collect(new Tuple3<>(value.getId(), lastTemp, value.getTemperature()));}// 更新状态lastTempState.update(value.getTemperature());}@Overridepublic void close() throws Exception {lastTempState.clear();}}
}
sensor_1,1547718206,36.3
sensor_1,1547718206,37.9
sensor_1,1547718206,48
sensor_6,1547718201,15.4
sensor_6,1547718201,35
sensor_1,1547718226,36

 状态后端

状态后端: 1.本地的状态管理(如何存,上下文配置,怎么存,怎么写)  2.做快照容错,如何恢复数据

1. 测试环境:MemoryStateBackend
2. 生产环境:FsStateBackend 
3. 数据非常大时候:RocksDBStateBackend
state.backend: filesystem //默认使用FsStateBackend 
tate.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints 
//配置一个checkpoint的hdfs的存储路径jobmanager.execution.failover-strategy: region //区域化重启state.backend.incremental: false  //增量添加checkpoint

相关文章:

Flink(java版)

watermark 时间语义和 watermark 注意:数据进入flink的时间&#xff1a;如果用这个作为时间语义就不存在问题&#xff0c;但是开发中往往会用处理时间 作为时间语义这里就需要考虑延时的问题。 如上图&#xff0c;数据从kafka中获取出来&#xff0c;从多个分区中获取&#xf…...

什么是动态组件以及使用场景

文章目录 一、vue中的动态组件是什么&#xff1f;有什么用&#xff1f;二、使用demo1.tab页签中的使用2.模拟新闻页demo 三、用keep-alive包裹&#xff0c;保持状态总结 一、vue中的动态组件是什么&#xff1f;有什么用&#xff1f; 动态组件指可以动态切换组件的显示和隐藏。…...

CRM销售管理系统如何提高销售效率

CRM销售管理系统是帮助企业对销售活动进行管理、执行和优化的软件系统。它可以帮助企业提高销售效率&#xff0c;提高客户转化率&#xff0c;实现企业的业绩增长。那么&#xff0c;CRM销售管理系统好用吗&#xff1f; CRM销售管理系统的功能 线索管理&#xff1a; CRM系统可…...

纯小白安卓刷机1

文章目录 常见的英文意思刷机是什么&#xff1f;为什么要刷机&#xff1f;什么是BL锁&#xff08;BootLoader锁&#xff09;&#xff1f;我的机能够刷机吗&#xff1f;什么是Boot镜像/分区&#xff1f;什么是Recovery镜像/分区&#xff08;缩写为rec&#xff09;&#xff1f;什…...

C高级day4循环语句

1&#xff0c;思维导图 运行结果为&#xff1a; 运行结果为&#xff1a;...

Linux 操作系统云服务器安装部署 Tomcat 服务器详细教程

Tomcat 基本概述 Tomcat 服务器是Apache软件基金会&#xff08;Apache Software Foundation&#xff09;的 Jakarta 项目中的一个核心项目&#xff0c;由 Apache、Sun 和其他一些公司及个人共同开发而成。它是一个免费的开放源代码的 Web 应用服务器&#xff0c;属于轻量级应用…...

【易盾点选】

拿官网的点选做个例子吧&#xff0c;比较省事&#xff0c;水一篇~ ​ 官网的接口目前都改成V3了&#xff0c;多了个dt参数&#xff0c;以及加密的一个函数也变动了下 点选坐标在这&#xff0c;加密函数未变&#xff0c;用逗号拼接 整个加密里的函数变了&#xff0c;直接重新…...

vue中打印指定dom元素

和window.print()效果一样&#xff0c;调出打印窗口&#xff0c;只是当前使用的插件是vue-print-nb 官网地址&#xff1a;vue-print-nb vue2中使用 安装插件 npm install vue-print-nb --save导入插件 import Print from vue-print-nb // 全局使用 Vue.use(Print);//or// 单…...

OpenCV(三十六):霍夫直线检测

1.检测直线的霍夫变换原理 2.检测直线函数HoughLines() 检测直线流程: Step1:将参数空间的坐标轴离散化。 Step2:将图像中每个非0像素通过映射关系求取在参数空间通过的方格 Step3:统计参数空间内每个方格出现的次数&#xff0c;选取次数大于某一值的方格作为表示直线的方格…...

文心一言 VS 讯飞星火 VS chatgpt (91)-- 算法导论8.3 4题

四、用go语言&#xff0c;说明如何在 O(n) 时间内&#xff0c;对 0 到 n 3 − 1 n^3-1 n3−1 区间内的n个整数进行排序。 文心一言&#xff1a; 在 O(n) 时间内对 0 到 n^3-1 区间内的 n 个整数进行排序&#xff0c;可以使用基数排序&#xff08;Radix Sort&#xff09;算法…...

keep-alive缓存三级及三级以上路由

需求需要缓存这个出入记录&#xff0c;当tab切换时不重新加载&#xff0c;当刷新页面时&#xff0c;或把这个关闭在重新打开时重新加载如图&#xff1a; &#xff08;我这里用的是芋道源码的前端框架) keep-alive 1、include 包含页面组件name的这些组件页面&#xff0c;会被…...

vite vue项目 运行时 \esbuild\esbuild.exe 缺失 错误码 errno: -4058, code: ‘ENOENT‘,

vite vue项目运行 npm run dev 报错某个模块启动文件丢失信息 D:\PengYe_code\2\vite-vue3-admin>npm run dev> vite-vue3-admin1.0.2 dev > vitenode:events:504throw er; // Unhandled error event^Error: spawn D:\PengYe_code\2\vite-vue3-admin\node_modules\vi…...

favicon.ico网站图标不显示问题 Failed to load resource: net::ERR_FILE_NOT_FOU

上述问题主要由于网站的小图标无法显示导致的&#xff1a;可以检查如下部分&#xff1a; 1、是否存在一个favicon.ico文件在根目录下 2、如果存在&#xff0c;看是否写的相对路径&#xff1a;改为绝对路径 <link rel"shortcut icon" href"../favicon.ico&quo…...

微服务·架构组件之服务注册与发现-Nacos

微服务组件架构之服务注册与发现之Nacos Nacos服务注册与发现流程 服务注册&#xff1a;Nacos 客户端会通过发送REST请求的方式向Nacos Server注册自己的服务&#xff0c;提供自身的元数据&#xff0c;比如ip地址、端口等信息。 Nacos Server接收到注册请求后&#xff0c;就会…...

Linux驱动【day2】

mychrdev.c: #include <linux/init.h> #include <linux/module.h> #include <linux/fs.h> #include<linux/uaccess.h> #include<linux/io.h> #include"head.h" unsigned int major; // 保存主设备号 char kbuf[128]{0}; unsigned int…...

4、Nginx 配置实例-反向代理

文章目录 4、nginx 配置实例-反向代理4.1 反向代理实例一4.1.1 实验代码 4.3 反向代理实例二4.3.1 实验代码 【尚硅谷】尚硅谷Nginx教程由浅入深 志不强者智不达&#xff1b;言不信者行不果。 4、nginx 配置实例-反向代理 4.1 反向代理实例一 实现效果&#xff1a;使用 nginx…...

2023年世界机器人大会回顾

1、前记&#xff1a; 本次记录是我自己去世界机器人博览会参观的一些感受&#xff0c;所有回顾为个人感兴趣部分的机器人产品分享。整个参观下来最大的感受就是科学技术、特别是机器人技术和人工智能毫无疑问地、广泛的应用在我们日常生活的方方面面&#xff0c;在安全巡检、特…...

Mac系统 AndroidStudio Missing essential plugin:org.jetbrains.android报错

打开Android Studio,提示 Missing essential plugin:org.jetbrains.android错误&#xff0c;产生的原因是Kotlin被禁用。 解决的方法是删除disabled_plugins.txt&#xff0c;Mac OS对应的路径为&#xff1a; /Users/xzh/Library/Application Support/Google/AndroidStudio202…...

读书笔记:多Transformer的双向编码器表示法(Bert)-1

多Transformer的双向编码器表示法 Bidirectional Encoder Representations from Transformers&#xff0c;即Bert&#xff1b; 本笔记主要是对谷歌Bert架构的入门学习&#xff1a; 介绍Transformer架构&#xff0c;理解编码器和解码器的工作原理&#xff1b;掌握Bert模型架构…...

第二证券:股利支付率和留存收益率的关系?

股利付出率和留存收益率是股票出资中非常重要的目标&#xff0c;它们可以反映公司的盈余才能和未来开展的潜力。那么&#xff0c;二者之间究竟有什么联系呢&#xff1f; 一、股利付出率和留存收益率的定义 股利付出率是指公司向股东分配的股息占当期净利润的比例&#xff0c;通…...

AI Agent与Agentic AI:原理、应用、挑战与未来展望

文章目录 一、引言二、AI Agent与Agentic AI的兴起2.1 技术契机与生态成熟2.2 Agent的定义与特征2.3 Agent的发展历程 三、AI Agent的核心技术栈解密3.1 感知模块代码示例&#xff1a;使用Python和OpenCV进行图像识别 3.2 认知与决策模块代码示例&#xff1a;使用OpenAI GPT-3进…...

8k长序列建模,蛋白质语言模型Prot42仅利用目标蛋白序列即可生成高亲和力结合剂

蛋白质结合剂&#xff08;如抗体、抑制肽&#xff09;在疾病诊断、成像分析及靶向药物递送等关键场景中发挥着不可替代的作用。传统上&#xff0c;高特异性蛋白质结合剂的开发高度依赖噬菌体展示、定向进化等实验技术&#xff0c;但这类方法普遍面临资源消耗巨大、研发周期冗长…...

css3笔记 (1) 自用

outline: none 用于移除元素获得焦点时默认的轮廓线 broder:0 用于移除边框 font-size&#xff1a;0 用于设置字体不显示 list-style: none 消除<li> 标签默认样式 margin: xx auto 版心居中 width:100% 通栏 vertical-align 作用于行内元素 / 表格单元格&#xff…...

ios苹果系统,js 滑动屏幕、锚定无效

现象&#xff1a;window.addEventListener监听touch无效&#xff0c;划不动屏幕&#xff0c;但是代码逻辑都有执行到。 scrollIntoView也无效。 原因&#xff1a;这是因为 iOS 的触摸事件处理机制和 touch-action: none 的设置有关。ios有太多得交互动作&#xff0c;从而会影响…...

用机器学习破解新能源领域的“弃风”难题

音乐发烧友深有体会&#xff0c;玩音乐的本质就是玩电网。火电声音偏暖&#xff0c;水电偏冷&#xff0c;风电偏空旷。至于太阳能发的电&#xff0c;则略显朦胧和单薄。 不知你是否有感觉&#xff0c;近两年家里的音响声音越来越冷&#xff0c;听起来越来越单薄&#xff1f; —…...

在Ubuntu24上采用Wine打开SourceInsight

1. 安装wine sudo apt install wine 2. 安装32位库支持,SourceInsight是32位程序 sudo dpkg --add-architecture i386 sudo apt update sudo apt install wine32:i386 3. 验证安装 wine --version 4. 安装必要的字体和库(解决显示问题) sudo apt install fonts-wqy…...

【Android】Android 开发 ADB 常用指令

查看当前连接的设备 adb devices 连接设备 adb connect 设备IP 断开已连接的设备 adb disconnect 设备IP 安装应用 adb install 安装包的路径 卸载应用 adb uninstall 应用包名 查看已安装的应用包名 adb shell pm list packages 查看已安装的第三方应用包名 adb shell pm list…...

Qt 事件处理中 return 的深入解析

Qt 事件处理中 return 的深入解析 在 Qt 事件处理中&#xff0c;return 语句的使用是另一个关键概念&#xff0c;它与 event->accept()/event->ignore() 密切相关但作用不同。让我们详细分析一下它们之间的关系和工作原理。 核心区别&#xff1a;不同层级的事件处理 方…...

Linux 下 DMA 内存映射浅析

序 系统 I/O 设备驱动程序通常调用其特定子系统的接口为 DMA 分配内存&#xff0c;但最终会调到 DMA 子系统的dma_alloc_coherent()/dma_alloc_attrs() 等接口。 关于 dma_alloc_coherent 接口详细的代码讲解、调用流程&#xff0c;可以参考这篇文章&#xff0c;我觉得写的非常…...

6️⃣Go 语言中的哈希、加密与序列化:通往区块链世界的钥匙

Go 语言中的哈希、加密与序列化:通往区块链世界的钥匙 一、前言:离区块链还有多远? 区块链听起来可能遥不可及,似乎是只有密码学专家和资深工程师才能涉足的领域。但事实上,构建一个区块链的核心并不复杂,尤其当你已经掌握了一门系统编程语言,比如 Go。 要真正理解区…...