当前位置: 首页 > news >正文

flinkSql中累计窗口CUMULATE

eventTime

package com.bigdata.day08;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class _05_flinkSql_Cumulate_eventTime {/*** 累积窗口 + eventTime* 1 分钟 每十秒计算一次 3秒水印* 数据格式* {"username":"zs","price":20,"event_time":"2023-07-18 12:12:43.000"}* {"username":"zs","price":20,"event_time":"2023-07-18 12:12:53.000"}* {"username":"zs","price":20,"event_time":"2023-07-18 12:13:03.000"}* {"username":"zs","price":20,"event_time":"2023-07-18 12:13:13.000"}*/public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tenv = StreamTableEnvironment.create(env);//2. 创建表tenv.executeSql("CREATE TABLE table1 (\n" +"  `username` String,\n" +"  `price` int,\n" +"  `event_time` TIMESTAMP(3),\n" +"   watermark for event_time as event_time - interval '3' second\n" +") WITH (\n" +"  'connector' = 'kafka',\n" +"  'topic' = 'topic1',\n" +"  'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',\n" +"  'properties.group.id' = 'testGroup1',\n" +"  'scan.startup.mode' = 'latest-offset',\n" +"  'format' = 'json'\n" +")");//3. 通过sql语句统计结果tenv.executeSql("select \n" +"   window_start,\n" +"   window_end,\n" +"   username,\n" +"   count(1) zongNum,\n" +"   sum(price) totalMoney \n" +"   from table(CUMULATE(TABLE table1, DESCRIPTOR(event_time), INTERVAL '10' second ,INTERVAL '60' second))\n" +"group by window_start,window_end,username").print();//4. sink-数据输出//5. execute-执行env.execute();}
}

processTime

package com.bigdata.day08;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class _06_flinkSql_Cumulate_processTime {/*** 累积窗口 + processTime* 1 分钟 每十秒计算一次* 数据格式* {"username":"zs","price":20}* {"username":"lisi","price":15}* {"username":"lisi","price":20}* {"username":"zs","price":20}* {"username":"zs","price":20}* {"username":"zs","price":20}* {"username":"zs","price":20}*/public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tenv = StreamTableEnvironment.create(env);//2. 创建表tenv.executeSql("CREATE TABLE table1 (\n" +"  `username` String,\n" +"  `price` int,\n" +"  `event_time` as proctime()\n" +") WITH (\n" +"  'connector' = 'kafka',\n" +"  'topic' = 'topic1',\n" +"  'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',\n" +"  'properties.group.id' = 'testGroup1',\n" +"  'scan.startup.mode' = 'latest-offset',\n" +"  'format' = 'json'\n" +")");//3. 通过sql语句统计结果tenv.executeSql("select \n" +"   window_start,\n" +"   window_end,\n" +"   username,\n" +"   count(1) zongNum,\n" +"   sum(price) totalMoney \n" +"   from table(CUMULATE(TABLE table1, DESCRIPTOR(event_time), INTERVAL '10' second ,INTERVAL '60' second))\n" +"group by window_start,window_end,username").print();//4. sink-数据输出//5. execute-执行env.execute();}
}

topN案例

需求:在每个分钟内找出点击量最多的Top 3网页。 滚动窗口(1分钟)+eventTime+3秒水印hive sqlwith t1 as (select page_id,sum(clicks)  totalSum  from  table1group by page_id
), t2 as(select page_id,totalSum,row_number() over ( order by totalSum desc) px from t1 
) select  * from t2 where px <=3flink sqlwith t1 as (select window_start,window_end,page_id,sum(clicks)  totalSum  from table(tumble(table table1,DESCRIPTOR(event_time), INTERVAL '60' second )) group by window_start,window_end,page_id
), t2 as(select window_start,window_end,page_id,totalSum,row_number() over (partition by window_start,window_end order by totalSum desc) px from t1 
) select  * from t2 where px <=3* 数据格式
{"ts": "2023-09-05 12:00:10", "page_id": 1, "clicks": 100}
{"ts": "2023-09-05 12:00:20", "page_id": 2, "clicks": 90}
{"ts": "2023-09-05 12:00:30", "page_id": 3, "clicks": 110}
{"ts": "2023-09-05 12:00:40", "page_id": 4, "clicks": 23}
{"ts": "2023-09-05 12:00:50", "page_id": 5, "clicks": 456}
{"ts": "2023-09-05 12:00:55", "page_id": 5, "clicks": 456}
// 触发数据
{"ts": "2023-09-05 12:01:03", "page_id": 5, "clicks": 456}
package com.bigdata.day08;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class _07_flinkSql_topN {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tenv = StreamTableEnvironment.create(env);//2. 创建表//3. 通过sql语句统计结果tenv.executeSql("CREATE TABLE table1 (\n" +"    `page_id` INT,\n" +"    `clicks` INT,\n" +"  `ts` TIMESTAMP(3) ,\n" +"   watermark for ts as ts - interval '3' second \n" +") WITH (\n" +"  'connector' = 'kafka',\n" +"  'topic' = 'topic1',\n" +"  'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',\n" +"  'properties.group.id' = 'testGroup1',\n" +"  'scan.startup.mode' = 'latest-offset',\n" +"  'format' = 'json'\n" +")");tenv.executeSql("with t1 as (\n" +"\tselect window_start,window_end,page_id,sum(clicks)  totalSum  from table(tumble(table table1,DESCRIPTOR(ts), INTERVAL '60' second )) group by window_start,window_end,page_id\n" +"), t2 as(\n" +"\tselect window_start,window_end,page_id,totalSum,row_number() over (partition by window_start,window_end order by totalSum desc) px from t1 \n" +") select  * from t2 where px <=3").print();//4. sink-数据输出//5. execute-执行env.execute();}
}

相关文章:

flinkSql中累计窗口CUMULATE

eventTime package com.bigdata.day08;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class _05_flinkSql_Cumulate_eventTime {/*** 累积窗口 eventTime* …...

关于在ubuntu上无法运行EasyConnect的解决方法

需要这三个文件 libpangocairo-1.0-0_1.40.14-1_amd64.deb libpangoft2-1.0-0_1.40.14-1_amd64.deb libpango-1.0-0_1.40.14-1_amd64.deb然后执行 cp source /usr/share/sangfor/EasyConnect再重启EasyConnect即可 下载链接 http://kr.archive.ubuntu.com/ubuntu/pool/main/…...

【Axure高保真原型】数值条件分组

今天和大家分享数值条件分组的原型模板&#xff0c;效果包括&#xff1a; 点击添加分组按钮&#xff0c;可以显示添加弹窗&#xff0c;填写分组名称和数值区间后&#xff0c;可以新增该分组信息‘’ 修改分组区间&#xff0c;可以直接在输入框里修改已有的分组区间&#xff0c…...

python学习——字符串的拼接操作

在Python中&#xff0c;字符串拼接是一项基本操作&#xff0c;用于将多个字符串合并成一个字符串。以下是几种常见的字符串拼接方式&#xff1a; 1. 使用 运算符 最简单和直接的方式是使用 运算符来拼接字符串。 str1 "Hello, " str2 "World!" resu…...

多线程篇-8--线程安全(死锁,常用保障安全的方法,安全容器,原子类,Fork/Join框架等)

1、线程安全和不安全定义 &#xff08;1&#xff09;、线程安全 线程安全是指一个类或方法在被多个线程访问的情况下可以正确得到结果&#xff0c;不会出现数据不一致或其他错误行为。 线程安全的条件 1、原子性&#xff08;Atomicity&#xff09; 多个操作要么全部完成&a…...

el-select的搜索功能

el-select的相关信息&#xff1a; 最基本信息 v-model的值为当前被选中的el-option的 value 属性值 :label是选择器可以看到的内容 过滤搜索 普通过滤搜索 <el-selectv-model"selectedCountry"placeholder"请选择国家"filterable:loading"lo…...

MFC实现全屏功能

之前全屏都是参考&#xff1a; MFC单文档&#xff08;SDI&#xff09;全屏程序的实现 主要思路就是将各种菜单工具栏隐藏恢复。 随着MFC的升级&#xff0c;MFC框架本身就具备了全屏的功能。 微软有一个全屏实现类&#xff1a; CFullScreenImpl Class managing full-screen mod…...

网络安全技术详解:虚拟专用网络(VPN) 安全信息与事件管理(SIEM)

虚拟专用网络&#xff08;VPN&#xff09;详细介绍 虚拟专用网络&#xff08;VPN&#xff09;通过在公共网络上创建加密连接来保护数据传输的安全性和隐私性。 工作原理 VPN的工作原理涉及建立安全隧道和数据加密&#xff1a; 隧道协议&#xff1a;使用协议如PPTP、L2TP/IP…...

v-model 根据后端接口返回的数据动态地确定要绑定的变量

在 Vue 中&#xff0c;v-model 是用于创建双向绑定的指令。通常&#xff0c;它用于与组件或表单元素的值进行绑定。但有时你可能需要根据后端接口返回的数据动态地确定要绑定的变量。 你可以通过以下步骤来实现这个需求&#xff1a; 步骤 1: 获取后端接口数据 首先&#xff…...

图形开发基础之在WinForms中使用OpenTK.GLControl进行图形绘制

前言 GLControl 是 OpenTK 库中一个重要的控件&#xff0c;专门用于在 Windows Forms 应用程序中集成 OpenGL 图形渲染。通过 GLControl&#xff0c;可以轻松地将 OpenGL 的高性能图形绘制功能嵌入到传统的桌面应用程序中。 1. GLControl 的核心功能 OpenGL 渲染上下文&…...

离散数学重点复习

第一章.集合论 概念 1.集合是不能精确定义的基本数学概念.通常是由指定范围内的满足给定条件的所有对象聚集在一起构成的 2.制定范围内的每一个对象称为这个集合的元素 3.固定符号如下: N:自然数集合 Z:整数集合 Q:有理数集合 R:实数集合 C:复数集合 4.集合中的元素是…...

Javaweb梳理21——Servlet

Javaweb梳理21——Servlet 21 Servlet21.1 简介21.3 执行流程21.4 生命周期4.5 方法介绍21.6 体系结构21.7 urlPattern配置21.8 XML配置 21 Servlet 21.1 简介 Servlet是JavaWeb最为核心的内容&#xff0c;它是Java提供的一门动态web资源开发技术。使用Servlet就可以实现&…...

推荐学习笔记:矩阵补充和矩阵分解

参考&#xff1a; 召回 fun-rec/docs/ch02/ch2.1/ch2.1.1/mf.md at master datawhalechina/fun-rec GitHub 业务 隐语义模型与矩阵分解 协同过滤算法的特点&#xff1a; 协同过滤算法的特点就是完全没有利用到物品本身或者是用户自身的属性&#xff0c; 仅仅利用了用户与…...

etcd分布式存储系统快速入门指南

在分布式系统的复杂世界中&#xff0c;确保有效的数据管理至关重要。分布式可靠的键值存储在维护跨分布式环境的数据一致性和可伸缩性方面起着关键作用。 在这个全面的教程中&#xff0c;我们将深入研究etcd&#xff0c;这是一个开源的分布式键值存储。我们将探索其基本概念、特…...

解决VUE3 Vite打包后动态图片资源不显示问题

解决VUE3 Vite打包后动态图片资源不显示问题 <script setup> let url ref()const setimg (item)>{let src ../assets/image/${e}.pngurl.value src }</script><template><div v-for"item in 6"><h1 click"setimg(item)"…...

大数据新视界 -- 大数据大厂之 Hive 临时表与视图:灵活数据处理的技巧(上)(29 / 30)

&#x1f496;&#x1f496;&#x1f496;亲爱的朋友们&#xff0c;热烈欢迎你们来到 青云交的博客&#xff01;能与你们在此邂逅&#xff0c;我满心欢喜&#xff0c;深感无比荣幸。在这个瞬息万变的时代&#xff0c;我们每个人都在苦苦追寻一处能让心灵安然栖息的港湾。而 我的…...

Android学习14--charger

1 概述 最近正好在做关机充电这个&#xff0c;就详细看看吧。还是本着保密的原则&#xff0c;项目里的代码也不能直接用&#xff0c;这里就用的Github的。https://github.com/aosp-mirror 具体位置是&#xff1a;https://github.com/aosp-mirror/platform_system_core/tree/mai…...

页面开发样式和布局入门:Vite + Vue 3 + Less

页面开发样式和布局入门&#xff1a;Vite Vue 3 Less 引言 在现代前端开发中&#xff0c;样式和布局是页面开发的核心部分。随着技术的不断发展&#xff0c;Vite、Vue 3和Less等工具和框架的出现&#xff0c;使得前端开发变得更加高效和灵活。然而&#xff0c;尽管这些工具…...

瑞芯微RK3566/RK3568开发板安卓11固件ROOT教程,Purple Pi OH演示

本文介绍RK3566/RK3568开发板Android11系统&#xff0c;编译ROOT权限固件的方法。触觉智能Purple Pi OH鸿蒙开发板演示&#xff0c;搭载了瑞芯微RK3566四核处理器&#xff0c;Laval鸿蒙社区推荐开发板&#xff0c;已适配全新OpenHarmony5.0 Release系统&#xff0c;SDK源码全开…...

Netty 入门应用:结合 Redis 实现服务器通信

在上篇博客中&#xff0c;我们了解了 Netty 的基本概念和架构。本篇文章将带你深入实践&#xff0c;构建一个简单的 Netty 服务端&#xff0c;并结合 Redis 实现一个数据存取的示例。在这个场景中&#xff0c;Redis 作为缓存存储&#xff0c;Netty 作为服务端处理客户端请求。通…...

变量 varablie 声明- Rust 变量 let mut 声明与 C/C++ 变量声明对比分析

一、变量声明设计&#xff1a;let 与 mut 的哲学解析 Rust 采用 let 声明变量并通过 mut 显式标记可变性&#xff0c;这种设计体现了语言的核心哲学。以下是深度解析&#xff1a; 1.1 设计理念剖析 安全优先原则&#xff1a;默认不可变强制开发者明确声明意图 let x 5; …...

智慧工地云平台源码,基于微服务架构+Java+Spring Cloud +UniApp +MySql

智慧工地管理云平台系统&#xff0c;智慧工地全套源码&#xff0c;java版智慧工地源码&#xff0c;支持PC端、大屏端、移动端。 智慧工地聚焦建筑行业的市场需求&#xff0c;提供“平台网络终端”的整体解决方案&#xff0c;提供劳务管理、视频管理、智能监测、绿色施工、安全管…...

23-Oracle 23 ai 区块链表(Blockchain Table)

小伙伴有没有在金融强合规的领域中遇见&#xff0c;必须要保持数据不可变&#xff0c;管理员都无法修改和留痕的要求。比如医疗的电子病历中&#xff0c;影像检查检验结果不可篡改行的&#xff0c;药品追溯过程中数据只可插入无法删除的特性需求&#xff1b;登录日志、修改日志…...

对WWDC 2025 Keynote 内容的预测

借助我们以往对苹果公司发展路径的深入研究经验&#xff0c;以及大语言模型的分析能力&#xff0c;我们系统梳理了多年来苹果 WWDC 主题演讲的规律。在 WWDC 2025 即将揭幕之际&#xff0c;我们让 ChatGPT 对今年的 Keynote 内容进行了一个初步预测&#xff0c;聊作存档。等到明…...

Android15默认授权浮窗权限

我们经常有那种需求&#xff0c;客户需要定制的apk集成在ROM中&#xff0c;并且默认授予其【显示在其他应用的上层】权限&#xff0c;也就是我们常说的浮窗权限&#xff0c;那么我们就可以通过以下方法在wms、ams等系统服务的systemReady()方法中调用即可实现预置应用默认授权浮…...

汇编常见指令

汇编常见指令 一、数据传送指令 指令功能示例说明MOV数据传送MOV EAX, 10将立即数 10 送入 EAXMOV [EBX], EAX将 EAX 值存入 EBX 指向的内存LEA加载有效地址LEA EAX, [EBX4]将 EBX4 的地址存入 EAX&#xff08;不访问内存&#xff09;XCHG交换数据XCHG EAX, EBX交换 EAX 和 EB…...

laravel8+vue3.0+element-plus搭建方法

创建 laravel8 项目 composer create-project --prefer-dist laravel/laravel laravel8 8.* 安装 laravel/ui composer require laravel/ui 修改 package.json 文件 "devDependencies": {"vue/compiler-sfc": "^3.0.7","axios": …...

微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据

微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据 Power Query 具有大量专门帮助您清理和准备数据以供分析的功能。 您将了解如何简化复杂模型、更改数据类型、重命名对象和透视数据。 您还将了解如何分析列&#xff0c;以便知晓哪些列包含有价值的数据&#xff0c;…...

Redis的发布订阅模式与专业的 MQ(如 Kafka, RabbitMQ)相比,优缺点是什么?适用于哪些场景?

Redis 的发布订阅&#xff08;Pub/Sub&#xff09;模式与专业的 MQ&#xff08;Message Queue&#xff09;如 Kafka、RabbitMQ 进行比较&#xff0c;核心的权衡点在于&#xff1a;简单与速度 vs. 可靠与功能。 下面我们详细展开对比。 Redis Pub/Sub 的核心特点 它是一个发后…...

GitFlow 工作模式(详解)

今天再学项目的过程中遇到使用gitflow模式管理代码&#xff0c;因此进行学习并且发布关于gitflow的一些思考 Git与GitFlow模式 我们在写代码的时候通常会进行网上保存&#xff0c;无论是github还是gittee&#xff0c;都是一种基于git去保存代码的形式&#xff0c;这样保存代码…...