当前位置: 首页 > news >正文

flinkSql中累计窗口CUMULATE

eventTime

package com.bigdata.day08;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class _05_flinkSql_Cumulate_eventTime {/*** 累积窗口 + eventTime* 1 分钟 每十秒计算一次 3秒水印* 数据格式* {"username":"zs","price":20,"event_time":"2023-07-18 12:12:43.000"}* {"username":"zs","price":20,"event_time":"2023-07-18 12:12:53.000"}* {"username":"zs","price":20,"event_time":"2023-07-18 12:13:03.000"}* {"username":"zs","price":20,"event_time":"2023-07-18 12:13:13.000"}*/public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tenv = StreamTableEnvironment.create(env);//2. 创建表tenv.executeSql("CREATE TABLE table1 (\n" +"  `username` String,\n" +"  `price` int,\n" +"  `event_time` TIMESTAMP(3),\n" +"   watermark for event_time as event_time - interval '3' second\n" +") WITH (\n" +"  'connector' = 'kafka',\n" +"  'topic' = 'topic1',\n" +"  'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',\n" +"  'properties.group.id' = 'testGroup1',\n" +"  'scan.startup.mode' = 'latest-offset',\n" +"  'format' = 'json'\n" +")");//3. 通过sql语句统计结果tenv.executeSql("select \n" +"   window_start,\n" +"   window_end,\n" +"   username,\n" +"   count(1) zongNum,\n" +"   sum(price) totalMoney \n" +"   from table(CUMULATE(TABLE table1, DESCRIPTOR(event_time), INTERVAL '10' second ,INTERVAL '60' second))\n" +"group by window_start,window_end,username").print();//4. sink-数据输出//5. execute-执行env.execute();}
}

processTime

package com.bigdata.day08;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class _06_flinkSql_Cumulate_processTime {/*** 累积窗口 + processTime* 1 分钟 每十秒计算一次* 数据格式* {"username":"zs","price":20}* {"username":"lisi","price":15}* {"username":"lisi","price":20}* {"username":"zs","price":20}* {"username":"zs","price":20}* {"username":"zs","price":20}* {"username":"zs","price":20}*/public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tenv = StreamTableEnvironment.create(env);//2. 创建表tenv.executeSql("CREATE TABLE table1 (\n" +"  `username` String,\n" +"  `price` int,\n" +"  `event_time` as proctime()\n" +") WITH (\n" +"  'connector' = 'kafka',\n" +"  'topic' = 'topic1',\n" +"  'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',\n" +"  'properties.group.id' = 'testGroup1',\n" +"  'scan.startup.mode' = 'latest-offset',\n" +"  'format' = 'json'\n" +")");//3. 通过sql语句统计结果tenv.executeSql("select \n" +"   window_start,\n" +"   window_end,\n" +"   username,\n" +"   count(1) zongNum,\n" +"   sum(price) totalMoney \n" +"   from table(CUMULATE(TABLE table1, DESCRIPTOR(event_time), INTERVAL '10' second ,INTERVAL '60' second))\n" +"group by window_start,window_end,username").print();//4. sink-数据输出//5. execute-执行env.execute();}
}

topN案例

需求:在每个分钟内找出点击量最多的Top 3网页。 滚动窗口(1分钟)+eventTime+3秒水印hive sqlwith t1 as (select page_id,sum(clicks)  totalSum  from  table1group by page_id
), t2 as(select page_id,totalSum,row_number() over ( order by totalSum desc) px from t1 
) select  * from t2 where px <=3flink sqlwith t1 as (select window_start,window_end,page_id,sum(clicks)  totalSum  from table(tumble(table table1,DESCRIPTOR(event_time), INTERVAL '60' second )) group by window_start,window_end,page_id
), t2 as(select window_start,window_end,page_id,totalSum,row_number() over (partition by window_start,window_end order by totalSum desc) px from t1 
) select  * from t2 where px <=3* 数据格式
{"ts": "2023-09-05 12:00:10", "page_id": 1, "clicks": 100}
{"ts": "2023-09-05 12:00:20", "page_id": 2, "clicks": 90}
{"ts": "2023-09-05 12:00:30", "page_id": 3, "clicks": 110}
{"ts": "2023-09-05 12:00:40", "page_id": 4, "clicks": 23}
{"ts": "2023-09-05 12:00:50", "page_id": 5, "clicks": 456}
{"ts": "2023-09-05 12:00:55", "page_id": 5, "clicks": 456}
// 触发数据
{"ts": "2023-09-05 12:01:03", "page_id": 5, "clicks": 456}
package com.bigdata.day08;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class _07_flinkSql_topN {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tenv = StreamTableEnvironment.create(env);//2. 创建表//3. 通过sql语句统计结果tenv.executeSql("CREATE TABLE table1 (\n" +"    `page_id` INT,\n" +"    `clicks` INT,\n" +"  `ts` TIMESTAMP(3) ,\n" +"   watermark for ts as ts - interval '3' second \n" +") WITH (\n" +"  'connector' = 'kafka',\n" +"  'topic' = 'topic1',\n" +"  'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',\n" +"  'properties.group.id' = 'testGroup1',\n" +"  'scan.startup.mode' = 'latest-offset',\n" +"  'format' = 'json'\n" +")");tenv.executeSql("with t1 as (\n" +"\tselect window_start,window_end,page_id,sum(clicks)  totalSum  from table(tumble(table table1,DESCRIPTOR(ts), INTERVAL '60' second )) group by window_start,window_end,page_id\n" +"), t2 as(\n" +"\tselect window_start,window_end,page_id,totalSum,row_number() over (partition by window_start,window_end order by totalSum desc) px from t1 \n" +") select  * from t2 where px <=3").print();//4. sink-数据输出//5. execute-执行env.execute();}
}

相关文章:

flinkSql中累计窗口CUMULATE

eventTime package com.bigdata.day08;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class _05_flinkSql_Cumulate_eventTime {/*** 累积窗口 eventTime* …...

关于在ubuntu上无法运行EasyConnect的解决方法

需要这三个文件 libpangocairo-1.0-0_1.40.14-1_amd64.deb libpangoft2-1.0-0_1.40.14-1_amd64.deb libpango-1.0-0_1.40.14-1_amd64.deb然后执行 cp source /usr/share/sangfor/EasyConnect再重启EasyConnect即可 下载链接 http://kr.archive.ubuntu.com/ubuntu/pool/main/…...

【Axure高保真原型】数值条件分组

今天和大家分享数值条件分组的原型模板&#xff0c;效果包括&#xff1a; 点击添加分组按钮&#xff0c;可以显示添加弹窗&#xff0c;填写分组名称和数值区间后&#xff0c;可以新增该分组信息‘’ 修改分组区间&#xff0c;可以直接在输入框里修改已有的分组区间&#xff0c…...

python学习——字符串的拼接操作

在Python中&#xff0c;字符串拼接是一项基本操作&#xff0c;用于将多个字符串合并成一个字符串。以下是几种常见的字符串拼接方式&#xff1a; 1. 使用 运算符 最简单和直接的方式是使用 运算符来拼接字符串。 str1 "Hello, " str2 "World!" resu…...

多线程篇-8--线程安全(死锁,常用保障安全的方法,安全容器,原子类,Fork/Join框架等)

1、线程安全和不安全定义 &#xff08;1&#xff09;、线程安全 线程安全是指一个类或方法在被多个线程访问的情况下可以正确得到结果&#xff0c;不会出现数据不一致或其他错误行为。 线程安全的条件 1、原子性&#xff08;Atomicity&#xff09; 多个操作要么全部完成&a…...

el-select的搜索功能

el-select的相关信息&#xff1a; 最基本信息 v-model的值为当前被选中的el-option的 value 属性值 :label是选择器可以看到的内容 过滤搜索 普通过滤搜索 <el-selectv-model"selectedCountry"placeholder"请选择国家"filterable:loading"lo…...

MFC实现全屏功能

之前全屏都是参考&#xff1a; MFC单文档&#xff08;SDI&#xff09;全屏程序的实现 主要思路就是将各种菜单工具栏隐藏恢复。 随着MFC的升级&#xff0c;MFC框架本身就具备了全屏的功能。 微软有一个全屏实现类&#xff1a; CFullScreenImpl Class managing full-screen mod…...

网络安全技术详解:虚拟专用网络(VPN) 安全信息与事件管理(SIEM)

虚拟专用网络&#xff08;VPN&#xff09;详细介绍 虚拟专用网络&#xff08;VPN&#xff09;通过在公共网络上创建加密连接来保护数据传输的安全性和隐私性。 工作原理 VPN的工作原理涉及建立安全隧道和数据加密&#xff1a; 隧道协议&#xff1a;使用协议如PPTP、L2TP/IP…...

v-model 根据后端接口返回的数据动态地确定要绑定的变量

在 Vue 中&#xff0c;v-model 是用于创建双向绑定的指令。通常&#xff0c;它用于与组件或表单元素的值进行绑定。但有时你可能需要根据后端接口返回的数据动态地确定要绑定的变量。 你可以通过以下步骤来实现这个需求&#xff1a; 步骤 1: 获取后端接口数据 首先&#xff…...

图形开发基础之在WinForms中使用OpenTK.GLControl进行图形绘制

前言 GLControl 是 OpenTK 库中一个重要的控件&#xff0c;专门用于在 Windows Forms 应用程序中集成 OpenGL 图形渲染。通过 GLControl&#xff0c;可以轻松地将 OpenGL 的高性能图形绘制功能嵌入到传统的桌面应用程序中。 1. GLControl 的核心功能 OpenGL 渲染上下文&…...

离散数学重点复习

第一章.集合论 概念 1.集合是不能精确定义的基本数学概念.通常是由指定范围内的满足给定条件的所有对象聚集在一起构成的 2.制定范围内的每一个对象称为这个集合的元素 3.固定符号如下: N:自然数集合 Z:整数集合 Q:有理数集合 R:实数集合 C:复数集合 4.集合中的元素是…...

Javaweb梳理21——Servlet

Javaweb梳理21——Servlet 21 Servlet21.1 简介21.3 执行流程21.4 生命周期4.5 方法介绍21.6 体系结构21.7 urlPattern配置21.8 XML配置 21 Servlet 21.1 简介 Servlet是JavaWeb最为核心的内容&#xff0c;它是Java提供的一门动态web资源开发技术。使用Servlet就可以实现&…...

推荐学习笔记:矩阵补充和矩阵分解

参考&#xff1a; 召回 fun-rec/docs/ch02/ch2.1/ch2.1.1/mf.md at master datawhalechina/fun-rec GitHub 业务 隐语义模型与矩阵分解 协同过滤算法的特点&#xff1a; 协同过滤算法的特点就是完全没有利用到物品本身或者是用户自身的属性&#xff0c; 仅仅利用了用户与…...

etcd分布式存储系统快速入门指南

在分布式系统的复杂世界中&#xff0c;确保有效的数据管理至关重要。分布式可靠的键值存储在维护跨分布式环境的数据一致性和可伸缩性方面起着关键作用。 在这个全面的教程中&#xff0c;我们将深入研究etcd&#xff0c;这是一个开源的分布式键值存储。我们将探索其基本概念、特…...

解决VUE3 Vite打包后动态图片资源不显示问题

解决VUE3 Vite打包后动态图片资源不显示问题 <script setup> let url ref()const setimg (item)>{let src ../assets/image/${e}.pngurl.value src }</script><template><div v-for"item in 6"><h1 click"setimg(item)"…...

大数据新视界 -- 大数据大厂之 Hive 临时表与视图:灵活数据处理的技巧(上)(29 / 30)

&#x1f496;&#x1f496;&#x1f496;亲爱的朋友们&#xff0c;热烈欢迎你们来到 青云交的博客&#xff01;能与你们在此邂逅&#xff0c;我满心欢喜&#xff0c;深感无比荣幸。在这个瞬息万变的时代&#xff0c;我们每个人都在苦苦追寻一处能让心灵安然栖息的港湾。而 我的…...

Android学习14--charger

1 概述 最近正好在做关机充电这个&#xff0c;就详细看看吧。还是本着保密的原则&#xff0c;项目里的代码也不能直接用&#xff0c;这里就用的Github的。https://github.com/aosp-mirror 具体位置是&#xff1a;https://github.com/aosp-mirror/platform_system_core/tree/mai…...

页面开发样式和布局入门:Vite + Vue 3 + Less

页面开发样式和布局入门&#xff1a;Vite Vue 3 Less 引言 在现代前端开发中&#xff0c;样式和布局是页面开发的核心部分。随着技术的不断发展&#xff0c;Vite、Vue 3和Less等工具和框架的出现&#xff0c;使得前端开发变得更加高效和灵活。然而&#xff0c;尽管这些工具…...

瑞芯微RK3566/RK3568开发板安卓11固件ROOT教程,Purple Pi OH演示

本文介绍RK3566/RK3568开发板Android11系统&#xff0c;编译ROOT权限固件的方法。触觉智能Purple Pi OH鸿蒙开发板演示&#xff0c;搭载了瑞芯微RK3566四核处理器&#xff0c;Laval鸿蒙社区推荐开发板&#xff0c;已适配全新OpenHarmony5.0 Release系统&#xff0c;SDK源码全开…...

Netty 入门应用:结合 Redis 实现服务器通信

在上篇博客中&#xff0c;我们了解了 Netty 的基本概念和架构。本篇文章将带你深入实践&#xff0c;构建一个简单的 Netty 服务端&#xff0c;并结合 Redis 实现一个数据存取的示例。在这个场景中&#xff0c;Redis 作为缓存存储&#xff0c;Netty 作为服务端处理客户端请求。通…...

深入剖析AI大模型:大模型时代的 Prompt 工程全解析

今天聊的内容&#xff0c;我认为是AI开发里面非常重要的内容。它在AI开发里无处不在&#xff0c;当你对 AI 助手说 "用李白的风格写一首关于人工智能的诗"&#xff0c;或者让翻译模型 "将这段合同翻译成商务日语" 时&#xff0c;输入的这句话就是 Prompt。…...

基于ASP.NET+ SQL Server实现(Web)医院信息管理系统

医院信息管理系统 1. 课程设计内容 在 visual studio 2017 平台上&#xff0c;开发一个“医院信息管理系统”Web 程序。 2. 课程设计目的 综合运用 c#.net 知识&#xff0c;在 vs 2017 平台上&#xff0c;进行 ASP.NET 应用程序和简易网站的开发&#xff1b;初步熟悉开发一…...

MongoDB学习和应用(高效的非关系型数据库)

一丶 MongoDB简介 对于社交类软件的功能&#xff0c;我们需要对它的功能特点进行分析&#xff1a; 数据量会随着用户数增大而增大读多写少价值较低非好友看不到其动态信息地理位置的查询… 针对以上特点进行分析各大存储工具&#xff1a; mysql&#xff1a;关系型数据库&am…...

centos 7 部署awstats 网站访问检测

一、基础环境准备&#xff08;两种安装方式都要做&#xff09; bash # 安装必要依赖 yum install -y httpd perl mod_perl perl-Time-HiRes perl-DateTime systemctl enable httpd # 设置 Apache 开机自启 systemctl start httpd # 启动 Apache二、安装 AWStats&#xff0…...

postgresql|数据库|只读用户的创建和删除(备忘)

CREATE USER read_only WITH PASSWORD 密码 -- 连接到xxx数据库 \c xxx -- 授予对xxx数据库的只读权限 GRANT CONNECT ON DATABASE xxx TO read_only; GRANT USAGE ON SCHEMA public TO read_only; GRANT SELECT ON ALL TABLES IN SCHEMA public TO read_only; GRANT EXECUTE O…...

P3 QT项目----记事本(3.8)

3.8 记事本项目总结 项目源码 1.main.cpp #include "widget.h" #include <QApplication> int main(int argc, char *argv[]) {QApplication a(argc, argv);Widget w;w.show();return a.exec(); } 2.widget.cpp #include "widget.h" #include &q…...

以光量子为例,详解量子获取方式

光量子技术获取量子比特可在室温下进行。该方式有望通过与名为硅光子学&#xff08;silicon photonics&#xff09;的光波导&#xff08;optical waveguide&#xff09;芯片制造技术和光纤等光通信技术相结合来实现量子计算机。量子力学中&#xff0c;光既是波又是粒子。光子本…...

人工智能(大型语言模型 LLMs)对不同学科的影响以及由此产生的新学习方式

今天是关于AI如何在教学中增强学生的学习体验&#xff0c;我把重要信息标红了。人文学科的价值被低估了 ⬇️ 转型与必要性 人工智能正在深刻地改变教育&#xff0c;这并非炒作&#xff0c;而是已经发生的巨大变革。教育机构和教育者不能忽视它&#xff0c;试图简单地禁止学生使…...

论文阅读:LLM4Drive: A Survey of Large Language Models for Autonomous Driving

地址&#xff1a;LLM4Drive: A Survey of Large Language Models for Autonomous Driving 摘要翻译 自动驾驶技术作为推动交通和城市出行变革的催化剂&#xff0c;正从基于规则的系统向数据驱动策略转变。传统的模块化系统受限于级联模块间的累积误差和缺乏灵活性的预设规则。…...

VisualXML全新升级 | 新增数据库编辑功能

VisualXML是一个功能强大的网络总线设计工具&#xff0c;专注于简化汽车电子系统中复杂的网络数据设计操作。它支持多种主流总线网络格式的数据编辑&#xff08;如DBC、LDF、ARXML、HEX等&#xff09;&#xff0c;并能够基于Excel表格的方式生成和转换多种数据库文件。由此&…...