当前位置: 首页 > news >正文

大数据-玩转数据-Flink定时器

一、说明

基于处理时间或者事件时间处理过一个元素之后, 注册一个定时器, 然后指定的时间执行.
Context和OnTimerContext所持有的TimerService对象拥有以下方法:
currentProcessingTime(): Long 返回当前处理时间
currentWatermark(): Long 返回当前watermark的时间戳
registerProcessingTimeTimer(timestamp: Long): Unit 会注册当前key的processing time的定时器。当processing time到达定时时间时,触发timer。
registerEventTimeTimer(timestamp: Long): Unit 会注册当前key的event time 定时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
deleteProcessingTimeTimer(timestamp: Long): Unit 删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行。
deleteEventTimeTimer(timestamp: Long): Unit 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。

二、基于处理时间的定时器

package com.lyh.flink08;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;public class ProcessTime {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> stream = env.socketTextStream("hadoop100", 9999).map(line -> {String[] datas = line.split(",");return new WaterSensor(datas[0],Long.valueOf(datas[1]),Integer.valueOf(datas[2]));});stream.keyBy(WaterSensor::getId).process(new KeyedProcessFunction<String, WaterSensor, String>() {@Overridepublic void processElement(WaterSensor value,Context ctx,Collector<String> out) throws Exception {ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5000);out.collect(value.toString());}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {System.out.println(timestamp);out.collect("wo be chu fa le ");}}).print();env.execute();}
}

三、基于事件时间的定时器

package com.lyh.flink08;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;import java.time.Duration;public class EventTime_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> stream = env.socketTextStream("hadoop100", 9999).map(line -> {String[] datas = line.split(",");return new WaterSensor(datas[0],Long.valueOf(datas[1]),Integer.valueOf(datas[2]));});WatermarkStrategy<WaterSensor> wms = WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element,recordTimestamp) -> element.getTs() * 1000);stream.assignTimestampsAndWatermarks(wms).keyBy(WaterSensor::getId).process(new KeyedProcessFunction<String, WaterSensor, String>() {@Overridepublic void processElement(WaterSensor value,Context ctx,Collector<String> out) throws Exception {System.out.println(ctx.timestamp());ctx.timerService().registerProcessingTimeTimer(ctx.timestamp()+5000);out.collect(value.toString());}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {System.out.println("定时器被触发了");}}).print();env.execute();}
}

相关文章:

大数据-玩转数据-Flink定时器

一、说明 基于处理时间或者事件时间处理过一个元素之后, 注册一个定时器, 然后指定的时间执行. Context和OnTimerContext所持有的TimerService对象拥有以下方法: currentProcessingTime(): Long 返回当前处理时间 currentWatermark(): Long 返回当前watermark的时间戳 registe…...

Linux 操作系统实战视频课 - GPIO 基础介绍

文章目录 一、GPIO 概念说明二、视频讲解沉淀、分享、成长,让自己和他人都能有所收获!😄 📢本篇我们将讲解 GPIO 。 一、GPIO 概念说明 ARM 平台中的 GPIO(通用输入/输出)是用于与外部设备进行数字输入和输出通信的重要硬件接口。ARM 平台的 GPIO 特性可以根据具体的芯…...

ChatGPT在医疗保健信息管理和电子病历中的应用前景如何?

ChatGPT在医疗保健信息管理和电子病历中有着广阔的应用前景&#xff0c;可以提高医疗保健行业的效率、准确性和可访问性。本文将详细讨论ChatGPT在医疗保健信息管理和电子病历中的应用前景&#xff0c;以及相关的益处和挑战。 ### 1. ChatGPT在医疗保健信息管理中的应用前景 …...

安防监控/视频存储/视频汇聚平台EasyCVR接入海康Ehome车载设备出现收流超时的原因排查

安防视频监控/视频集中存储/云存储/磁盘阵列EasyCVR平台可拓展性强、视频能力灵活、部署轻快&#xff0c;可支持的主流标准协议有国标GB28181、RTSP/Onvif、RTMP等&#xff0c;以及支持厂家私有协议与SDK接入&#xff0c;包括海康Ehome、海大宇等设备的SDK等。视频汇聚平台既具…...

【zookeeper】zookeeper监控指标查看

zookeeper 监控指标 日常工作中&#xff0c;我们有时候需要对zookeeper集群的状态进行检查&#xff0c;下面分享一些常用的方法。 zookeeper获取监控指标已知的有两种方式&#xff1a; 通过zookeeper自带的四字命令 &#xff08;four letter words command &#xff09;获取各…...

Flink 如何处理反压?

分析&回答 什么是反压&#xff08;backpressure&#xff09; 反压通常是从某个节点传导至数据源并降低数据源&#xff08;比如 Kafka consumer&#xff09;的摄入速率。反压意味着数据管道中某个节点成为瓶颈&#xff0c;处理速率跟不上上游发送数据的速率&#xff0c;而…...

JAVA基础-JDBC

本博客记录JAVA基础JDBC部分的学习内容 JDBC基本概念 JDBC : JAVA链接数据库&#xff0c;是JAVA链接数据库的技术的统称&#xff0c;包含如下两部分&#xff1a; 1. JAVA提供的JDBC规范&#xff08;即各种数据库接口&#xff09;存储在java.sql 和 javax.sql中的api 2. 各个数…...

嵌入式学习笔记(1)ARM的编程模式和7种工作模式

ARM提供的指令集 ARM态-ARM指令集&#xff08;32-bit&#xff09; Thumb态-Thumb指令集&#xff08;16-bit&#xff09; Thumb2态-Thumb2指令集&#xff08;16 & 32 bit&#xff09; Thumb指令集是对ARM指令集的一个子集重新编码得到的&#xff0c;指令长度为16位。通常在…...

[NSSCTF Round #15NSSCTF 2nd]——Web、Misc、Crypto方向 详细Writeup

前言 虽然师傅们已经尽力了&#xff0c;但是没拿到前十有点可惜&#xff0c;题很好吃&#xff0c;明年再来&#xff08;&#xff09; 关于wp&#xff1a; 因为我没有学过misc&#xff0c;但是比赛的时候还是运气好出了三道&#xff0c;所以wp就只把做题步骤给出&#xff0c;也…...

Metasploit“MSF”连接postgresql时因排序规则版本不匹配导致无法连接

一、问题 更新Kali之后使用Metasploit时出现一个问题&#xff0c;连接postgresql时因排序规则版本不匹配导致无法连接 警告: database "msf" has a collation version mismatch DETAIL: The database was created using collation version 2.36, but the operati…...

CCF CSP题解:矩阵运算(202305-2)

链接和思路 OJ链接&#xff1a;传送门 本题要求计算1个公式&#xff1a; ( W ⋅ ( Q K T ) ) V \left(\mathbf{W} \cdot (\mathbf{Q} \times \mathbf{K}^{T})\right) \times \mathbf{V} (W⋅(QKT))V 其中&#xff0c; Q \mathbf{Q} Q、 K \mathbf{K} K和 V \mathbf{V} V均…...

划分字母区间【贪心算法】

划分字母区间 给你一个字符串 s 。我们要把这个字符串划分为尽可能多的片段&#xff0c;同一字母最多出现在一个片段中。 注意&#xff0c;划分结果需要满足&#xff1a;将所有划分结果按顺序连接&#xff0c;得到的字符串仍然是 s 。返回一个表示每个字符串片段的长度的列表。…...

低代码的探索之路

Gartner发布报告指出&#xff0c;2023年全球低代码开发平台市场规模将达到345亿美元&#xff0c;比2022年增长20%。 目前&#xff0c;国内外已经有许多低代码平台&#xff0c;包括OutSystems、Mendix、Appian、Microsoft Power App等。这些平台提供了丰富的功能和工具&#xff…...

easyUI combobox不可手动输入和禁用

combobox //下拉可用 $("#selectId").combobox(enable); //下拉不可用 $("#selectId").combobox(disable); //该元素可用 $("#selectId").combobox({ disabled: false }); //该元素不可用 $("#selectId").combobox({ disabled: tru…...

RV64和ARM64栈结构差异

RV64和ARM64栈结构差异 1 RV64和ARM64栈结构差异示意图1.1 RV64和ARM64寄存器介绍1.1.1 RV64寄存器1.1.2 ARM64寄存器 1.2 RV64和ARM64栈结构差异示意图 2 RV64和ARM64栈使用示例2.1 测试的程序2.2 RV64反汇编的汇编程序2.3 ARM64反汇编的汇编程序2.4 RV64和ARM64测试程序的栈结…...

将 Python 与 RStudio IDE 配合使用(R与Python系列第一篇)

目录 前言&#xff1a; 1-安装reticulate包 2-安装Python 3-选择Python的默认版本&#xff08;配置Python环境&#xff09; 4-使用Python 4.1 运行一个简单的Python脚本 4.2 在RStudio上安装Python模块 4.3 在 R 中调用 Python 模块 4.4 在RStudio上调用Python脚本写的…...

数据库访问性能优化

目录 IO性能分析数据库性能优化漏斗法则1、减少数据访问&#xff08;减少磁盘访问&#xff09;(1) 正确的创建并使用索引索引生效场景索引失效场景判断索引是否生效--执行计划 2、返回更少数据&#xff08;减少网络传输或磁盘访问&#xff09;(1) 数据分页处理(减少行数)客户端…...

vue 预览 有token验证的 doc、docx、pdf、xlsx、csv、图片 并下载

预览 doc我也不会 //docx <div v-if"previewType docx" ref"iframeDom" style"border: none; width: 100%; height: 100%"></div> import { renderAsync } from "docx-preview"; let iframeDom: any ref(); axios({url…...

WPF数据视图

将集合绑定到ItemsControl控件时&#xff0c;会不加通告的在后台创建数据视图——位于数据源和绑定的控件之间。数据视图是进入数据源的窗口&#xff0c;可以跟踪当前项&#xff0c;并且支持各种功能&#xff0c;如排序、过滤、分组。 这些功能和数据对象本身是相互独立的&…...

C++ new/delete 与 malloc/free 的区别?

new/delete 与 malloc/free 的区别&#xff1f; 分配内存的位置 malloc是从堆上动态分配内存new是从自由存储区为对象动态分配内存。自由存储区的位置取决于operator new的实现。自由存储区不仅可以为堆&#xff0c;还可以是静态存储区&#xff0c;这都看operator new在哪里为…...

Spark 之 入门讲解详细版(1)

1、简介 1.1 Spark简介 Spark是加州大学伯克利分校AMP实验室&#xff08;Algorithms, Machines, and People Lab&#xff09;开发通用内存并行计算框架。Spark在2013年6月进入Apache成为孵化项目&#xff0c;8个月后成为Apache顶级项目&#xff0c;速度之快足见过人之处&…...

linux 错误码总结

1,错误码的概念与作用 在Linux系统中,错误码是系统调用或库函数在执行失败时返回的特定数值,用于指示具体的错误类型。这些错误码通过全局变量errno来存储和传递,errno由操作系统维护,保存最近一次发生的错误信息。值得注意的是,errno的值在每次系统调用或函数调用失败时…...

Mac软件卸载指南,简单易懂!

刚和Adobe分手&#xff0c;它却总在Library里给你写"回忆录"&#xff1f;卸载的Final Cut Pro像电子幽灵般阴魂不散&#xff1f;总是会有残留文件&#xff0c;别慌&#xff01;这份Mac软件卸载指南&#xff0c;将用最硬核的方式教你"数字分手术"&#xff0…...

【HarmonyOS 5 开发速记】如何获取用户信息(头像/昵称/手机号)

1.获取 authorizationCode&#xff1a; 2.利用 authorizationCode 获取 accessToken&#xff1a;文档中心 3.获取手机&#xff1a;文档中心 4.获取昵称头像&#xff1a;文档中心 首先创建 request 若要获取手机号&#xff0c;scope必填 phone&#xff0c;permissions 必填 …...

使用 SymPy 进行向量和矩阵的高级操作

在科学计算和工程领域&#xff0c;向量和矩阵操作是解决问题的核心技能之一。Python 的 SymPy 库提供了强大的符号计算功能&#xff0c;能够高效地处理向量和矩阵的各种操作。本文将深入探讨如何使用 SymPy 进行向量和矩阵的创建、合并以及维度拓展等操作&#xff0c;并通过具体…...

React---day11

14.4 react-redux第三方库 提供connect、thunk之类的函数 以获取一个banner数据为例子 store&#xff1a; 我们在使用异步的时候理应是要使用中间件的&#xff0c;但是configureStore 已经自动集成了 redux-thunk&#xff0c;注意action里面要返回函数 import { configureS…...

SpringAI实战:ChatModel智能对话全解

一、引言&#xff1a;Spring AI 与 Chat Model 的核心价值 &#x1f680; 在 Java 生态中集成大模型能力&#xff0c;Spring AI 提供了高效的解决方案 &#x1f916;。其中 Chat Model 作为核心交互组件&#xff0c;通过标准化接口简化了与大语言模型&#xff08;LLM&#xff0…...

用递归算法解锁「子集」问题 —— LeetCode 78题解析

文章目录 一、题目介绍二、递归思路详解&#xff1a;从决策树开始理解三、解法一&#xff1a;二叉决策树 DFS四、解法二&#xff1a;组合式回溯写法&#xff08;推荐&#xff09;五、解法对比 递归算法是编程中一种非常强大且常见的思想&#xff0c;它能够优雅地解决很多复杂的…...

PH热榜 | 2025-06-08

1. Thiings 标语&#xff1a;一套超过1900个免费AI生成的3D图标集合 介绍&#xff1a;Thiings是一个不断扩展的免费AI生成3D图标库&#xff0c;目前已有超过1900个图标。你可以按照主题浏览&#xff0c;生成自己的图标&#xff0c;或者下载整个图标集。所有图标都可以在个人或…...

Mac flutter环境搭建

一、下载flutter sdk 制作 Android 应用 | Flutter 中文文档 - Flutter 中文开发者网站 - Flutter 1、查看mac电脑处理器选择sdk 2、解压 unzip ~/Downloads/flutter_macos_arm64_3.32.2-stable.zip \ -d ~/development/ 3、添加环境变量 命令行打开配置环境变量文件 ope…...