大数据-玩转数据-Flink定时器
一、说明
基于处理时间或者事件时间处理过一个元素之后, 注册一个定时器, 然后指定的时间执行.
Context和OnTimerContext所持有的TimerService对象拥有以下方法:
currentProcessingTime(): Long 返回当前处理时间
currentWatermark(): Long 返回当前watermark的时间戳
registerProcessingTimeTimer(timestamp: Long): Unit 会注册当前key的processing time的定时器。当processing time到达定时时间时,触发timer。
registerEventTimeTimer(timestamp: Long): Unit 会注册当前key的event time 定时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
deleteProcessingTimeTimer(timestamp: Long): Unit 删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行。
deleteEventTimeTimer(timestamp: Long): Unit 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。
二、基于处理时间的定时器
package com.lyh.flink08;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;public class ProcessTime {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> stream = env.socketTextStream("hadoop100", 9999).map(line -> {String[] datas = line.split(",");return new WaterSensor(datas[0],Long.valueOf(datas[1]),Integer.valueOf(datas[2]));});stream.keyBy(WaterSensor::getId).process(new KeyedProcessFunction<String, WaterSensor, String>() {@Overridepublic void processElement(WaterSensor value,Context ctx,Collector<String> out) throws Exception {ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5000);out.collect(value.toString());}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {System.out.println(timestamp);out.collect("wo be chu fa le ");}}).print();env.execute();}
}
三、基于事件时间的定时器
package com.lyh.flink08;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;import java.time.Duration;public class EventTime_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> stream = env.socketTextStream("hadoop100", 9999).map(line -> {String[] datas = line.split(",");return new WaterSensor(datas[0],Long.valueOf(datas[1]),Integer.valueOf(datas[2]));});WatermarkStrategy<WaterSensor> wms = WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element,recordTimestamp) -> element.getTs() * 1000);stream.assignTimestampsAndWatermarks(wms).keyBy(WaterSensor::getId).process(new KeyedProcessFunction<String, WaterSensor, String>() {@Overridepublic void processElement(WaterSensor value,Context ctx,Collector<String> out) throws Exception {System.out.println(ctx.timestamp());ctx.timerService().registerProcessingTimeTimer(ctx.timestamp()+5000);out.collect(value.toString());}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {System.out.println("定时器被触发了");}}).print();env.execute();}
}
相关文章:
大数据-玩转数据-Flink定时器
一、说明 基于处理时间或者事件时间处理过一个元素之后, 注册一个定时器, 然后指定的时间执行. Context和OnTimerContext所持有的TimerService对象拥有以下方法: currentProcessingTime(): Long 返回当前处理时间 currentWatermark(): Long 返回当前watermark的时间戳 registe…...
Linux 操作系统实战视频课 - GPIO 基础介绍
文章目录 一、GPIO 概念说明二、视频讲解沉淀、分享、成长,让自己和他人都能有所收获!😄 📢本篇我们将讲解 GPIO 。 一、GPIO 概念说明 ARM 平台中的 GPIO(通用输入/输出)是用于与外部设备进行数字输入和输出通信的重要硬件接口。ARM 平台的 GPIO 特性可以根据具体的芯…...
ChatGPT在医疗保健信息管理和电子病历中的应用前景如何?
ChatGPT在医疗保健信息管理和电子病历中有着广阔的应用前景,可以提高医疗保健行业的效率、准确性和可访问性。本文将详细讨论ChatGPT在医疗保健信息管理和电子病历中的应用前景,以及相关的益处和挑战。 ### 1. ChatGPT在医疗保健信息管理中的应用前景 …...
安防监控/视频存储/视频汇聚平台EasyCVR接入海康Ehome车载设备出现收流超时的原因排查
安防视频监控/视频集中存储/云存储/磁盘阵列EasyCVR平台可拓展性强、视频能力灵活、部署轻快,可支持的主流标准协议有国标GB28181、RTSP/Onvif、RTMP等,以及支持厂家私有协议与SDK接入,包括海康Ehome、海大宇等设备的SDK等。视频汇聚平台既具…...
【zookeeper】zookeeper监控指标查看
zookeeper 监控指标 日常工作中,我们有时候需要对zookeeper集群的状态进行检查,下面分享一些常用的方法。 zookeeper获取监控指标已知的有两种方式: 通过zookeeper自带的四字命令 (four letter words command )获取各…...
Flink 如何处理反压?
分析&回答 什么是反压(backpressure) 反压通常是从某个节点传导至数据源并降低数据源(比如 Kafka consumer)的摄入速率。反压意味着数据管道中某个节点成为瓶颈,处理速率跟不上上游发送数据的速率,而…...
JAVA基础-JDBC
本博客记录JAVA基础JDBC部分的学习内容 JDBC基本概念 JDBC : JAVA链接数据库,是JAVA链接数据库的技术的统称,包含如下两部分: 1. JAVA提供的JDBC规范(即各种数据库接口)存储在java.sql 和 javax.sql中的api 2. 各个数…...
嵌入式学习笔记(1)ARM的编程模式和7种工作模式
ARM提供的指令集 ARM态-ARM指令集(32-bit) Thumb态-Thumb指令集(16-bit) Thumb2态-Thumb2指令集(16 & 32 bit) Thumb指令集是对ARM指令集的一个子集重新编码得到的,指令长度为16位。通常在…...
[NSSCTF Round #15NSSCTF 2nd]——Web、Misc、Crypto方向 详细Writeup
前言 虽然师傅们已经尽力了,但是没拿到前十有点可惜,题很好吃,明年再来() 关于wp: 因为我没有学过misc,但是比赛的时候还是运气好出了三道,所以wp就只把做题步骤给出,也…...
Metasploit“MSF”连接postgresql时因排序规则版本不匹配导致无法连接
一、问题 更新Kali之后使用Metasploit时出现一个问题,连接postgresql时因排序规则版本不匹配导致无法连接 警告: database "msf" has a collation version mismatch DETAIL: The database was created using collation version 2.36, but the operati…...
CCF CSP题解:矩阵运算(202305-2)
链接和思路 OJ链接:传送门 本题要求计算1个公式: ( W ⋅ ( Q K T ) ) V \left(\mathbf{W} \cdot (\mathbf{Q} \times \mathbf{K}^{T})\right) \times \mathbf{V} (W⋅(QKT))V 其中, Q \mathbf{Q} Q、 K \mathbf{K} K和 V \mathbf{V} V均…...
划分字母区间【贪心算法】
划分字母区间 给你一个字符串 s 。我们要把这个字符串划分为尽可能多的片段,同一字母最多出现在一个片段中。 注意,划分结果需要满足:将所有划分结果按顺序连接,得到的字符串仍然是 s 。返回一个表示每个字符串片段的长度的列表。…...
低代码的探索之路
Gartner发布报告指出,2023年全球低代码开发平台市场规模将达到345亿美元,比2022年增长20%。 目前,国内外已经有许多低代码平台,包括OutSystems、Mendix、Appian、Microsoft Power App等。这些平台提供了丰富的功能和工具ÿ…...
easyUI combobox不可手动输入和禁用
combobox //下拉可用 $("#selectId").combobox(enable); //下拉不可用 $("#selectId").combobox(disable); //该元素可用 $("#selectId").combobox({ disabled: false }); //该元素不可用 $("#selectId").combobox({ disabled: tru…...
RV64和ARM64栈结构差异
RV64和ARM64栈结构差异 1 RV64和ARM64栈结构差异示意图1.1 RV64和ARM64寄存器介绍1.1.1 RV64寄存器1.1.2 ARM64寄存器 1.2 RV64和ARM64栈结构差异示意图 2 RV64和ARM64栈使用示例2.1 测试的程序2.2 RV64反汇编的汇编程序2.3 ARM64反汇编的汇编程序2.4 RV64和ARM64测试程序的栈结…...
将 Python 与 RStudio IDE 配合使用(R与Python系列第一篇)
目录 前言: 1-安装reticulate包 2-安装Python 3-选择Python的默认版本(配置Python环境) 4-使用Python 4.1 运行一个简单的Python脚本 4.2 在RStudio上安装Python模块 4.3 在 R 中调用 Python 模块 4.4 在RStudio上调用Python脚本写的…...
数据库访问性能优化
目录 IO性能分析数据库性能优化漏斗法则1、减少数据访问(减少磁盘访问)(1) 正确的创建并使用索引索引生效场景索引失效场景判断索引是否生效--执行计划 2、返回更少数据(减少网络传输或磁盘访问)(1) 数据分页处理(减少行数)客户端…...
vue 预览 有token验证的 doc、docx、pdf、xlsx、csv、图片 并下载
预览 doc我也不会 //docx <div v-if"previewType docx" ref"iframeDom" style"border: none; width: 100%; height: 100%"></div> import { renderAsync } from "docx-preview"; let iframeDom: any ref(); axios({url…...
WPF数据视图
将集合绑定到ItemsControl控件时,会不加通告的在后台创建数据视图——位于数据源和绑定的控件之间。数据视图是进入数据源的窗口,可以跟踪当前项,并且支持各种功能,如排序、过滤、分组。 这些功能和数据对象本身是相互独立的&…...
C++ new/delete 与 malloc/free 的区别?
new/delete 与 malloc/free 的区别? 分配内存的位置 malloc是从堆上动态分配内存new是从自由存储区为对象动态分配内存。自由存储区的位置取决于operator new的实现。自由存储区不仅可以为堆,还可以是静态存储区,这都看operator new在哪里为…...
React 第五十五节 Router 中 useAsyncError的使用详解
前言 useAsyncError 是 React Router v6.4 引入的一个钩子,用于处理异步操作(如数据加载)中的错误。下面我将详细解释其用途并提供代码示例。 一、useAsyncError 用途 处理异步错误:捕获在 loader 或 action 中发生的异步错误替…...
<6>-MySQL表的增删查改
目录 一,create(创建表) 二,retrieve(查询表) 1,select列 2,where条件 三,update(更新表) 四,delete(删除表…...
微软PowerBI考试 PL300-选择 Power BI 模型框架【附练习数据】
微软PowerBI考试 PL300-选择 Power BI 模型框架 20 多年来,Microsoft 持续对企业商业智能 (BI) 进行大量投资。 Azure Analysis Services (AAS) 和 SQL Server Analysis Services (SSAS) 基于无数企业使用的成熟的 BI 数据建模技术。 同样的技术也是 Power BI 数据…...
现有的 Redis 分布式锁库(如 Redisson)提供了哪些便利?
现有的 Redis 分布式锁库(如 Redisson)相比于开发者自己基于 Redis 命令(如 SETNX, EXPIRE, DEL)手动实现分布式锁,提供了巨大的便利性和健壮性。主要体现在以下几个方面: 原子性保证 (Atomicity)ÿ…...
libfmt: 现代C++的格式化工具库介绍与酷炫功能
libfmt: 现代C的格式化工具库介绍与酷炫功能 libfmt 是一个开源的C格式化库,提供了高效、安全的文本格式化功能,是C20中引入的std::format的基础实现。它比传统的printf和iostream更安全、更灵活、性能更好。 基本介绍 主要特点 类型安全:…...
AI语音助手的Python实现
引言 语音助手(如小爱同学、Siri)通过语音识别、自然语言处理(NLP)和语音合成技术,为用户提供直观、高效的交互体验。随着人工智能的普及,Python开发者可以利用开源库和AI模型,快速构建自定义语音助手。本文由浅入深,详细介绍如何使用Python开发AI语音助手,涵盖基础功…...
Cilium动手实验室: 精通之旅---13.Cilium LoadBalancer IPAM and L2 Service Announcement
Cilium动手实验室: 精通之旅---13.Cilium LoadBalancer IPAM and L2 Service Announcement 1. LAB环境2. L2公告策略2.1 部署Death Star2.2 访问服务2.3 部署L2公告策略2.4 服务宣告 3. 可视化 ARP 流量3.1 部署新服务3.2 准备可视化3.3 再次请求 4. 自动IPAM4.1 IPAM Pool4.2 …...
API网关Kong的鉴权与限流:高并发场景下的核心实践
🔥「炎码工坊」技术弹药已装填! 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 引言 在微服务架构中,API网关承担着流量调度、安全防护和协议转换的核心职责。作为云原生时代的代表性网关,Kong凭借其插件化架构…...
【无标题】湖北理元理律师事务所:债务优化中的生活保障与法律平衡之道
文/法律实务观察组 在债务重组领域,专业机构的核心价值不仅在于减轻债务数字,更在于帮助债务人在履行义务的同时维持基本生活尊严。湖北理元理律师事务所的服务实践表明,合法债务优化需同步实现三重平衡: 法律刚性(债…...
【Linux】Linux安装并配置RabbitMQ
目录 1. 安装 Erlang 2. 安装 RabbitMQ 2.1.添加 RabbitMQ 仓库 2.2.安装 RabbitMQ 3.配置 3.1.启动和管理服务 4. 访问管理界面 5.安装问题 6.修改密码 7.修改端口 7.1.找到文件 7.2.修改文件 1. 安装 Erlang 由于 RabbitMQ 是用 Erlang 编写的,需要先安…...
