当前位置: 首页 > news >正文

flinkSql中累计窗口CUMULATE

eventTime

package com.bigdata.day08;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class _05_flinkSql_Cumulate_eventTime {/*** 累积窗口 + eventTime* 1 分钟 每十秒计算一次 3秒水印* 数据格式* {"username":"zs","price":20,"event_time":"2023-07-18 12:12:43.000"}* {"username":"zs","price":20,"event_time":"2023-07-18 12:12:53.000"}* {"username":"zs","price":20,"event_time":"2023-07-18 12:13:03.000"}* {"username":"zs","price":20,"event_time":"2023-07-18 12:13:13.000"}*/public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tenv = StreamTableEnvironment.create(env);//2. 创建表tenv.executeSql("CREATE TABLE table1 (\n" +"  `username` String,\n" +"  `price` int,\n" +"  `event_time` TIMESTAMP(3),\n" +"   watermark for event_time as event_time - interval '3' second\n" +") WITH (\n" +"  'connector' = 'kafka',\n" +"  'topic' = 'topic1',\n" +"  'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',\n" +"  'properties.group.id' = 'testGroup1',\n" +"  'scan.startup.mode' = 'latest-offset',\n" +"  'format' = 'json'\n" +")");//3. 通过sql语句统计结果tenv.executeSql("select \n" +"   window_start,\n" +"   window_end,\n" +"   username,\n" +"   count(1) zongNum,\n" +"   sum(price) totalMoney \n" +"   from table(CUMULATE(TABLE table1, DESCRIPTOR(event_time), INTERVAL '10' second ,INTERVAL '60' second))\n" +"group by window_start,window_end,username").print();//4. sink-数据输出//5. execute-执行env.execute();}
}

processTime

package com.bigdata.day08;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class _06_flinkSql_Cumulate_processTime {/*** 累积窗口 + processTime* 1 分钟 每十秒计算一次* 数据格式* {"username":"zs","price":20}* {"username":"lisi","price":15}* {"username":"lisi","price":20}* {"username":"zs","price":20}* {"username":"zs","price":20}* {"username":"zs","price":20}* {"username":"zs","price":20}*/public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tenv = StreamTableEnvironment.create(env);//2. 创建表tenv.executeSql("CREATE TABLE table1 (\n" +"  `username` String,\n" +"  `price` int,\n" +"  `event_time` as proctime()\n" +") WITH (\n" +"  'connector' = 'kafka',\n" +"  'topic' = 'topic1',\n" +"  'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',\n" +"  'properties.group.id' = 'testGroup1',\n" +"  'scan.startup.mode' = 'latest-offset',\n" +"  'format' = 'json'\n" +")");//3. 通过sql语句统计结果tenv.executeSql("select \n" +"   window_start,\n" +"   window_end,\n" +"   username,\n" +"   count(1) zongNum,\n" +"   sum(price) totalMoney \n" +"   from table(CUMULATE(TABLE table1, DESCRIPTOR(event_time), INTERVAL '10' second ,INTERVAL '60' second))\n" +"group by window_start,window_end,username").print();//4. sink-数据输出//5. execute-执行env.execute();}
}

topN案例

需求:在每个分钟内找出点击量最多的Top 3网页。 滚动窗口(1分钟)+eventTime+3秒水印hive sqlwith t1 as (select page_id,sum(clicks)  totalSum  from  table1group by page_id
), t2 as(select page_id,totalSum,row_number() over ( order by totalSum desc) px from t1 
) select  * from t2 where px <=3flink sqlwith t1 as (select window_start,window_end,page_id,sum(clicks)  totalSum  from table(tumble(table table1,DESCRIPTOR(event_time), INTERVAL '60' second )) group by window_start,window_end,page_id
), t2 as(select window_start,window_end,page_id,totalSum,row_number() over (partition by window_start,window_end order by totalSum desc) px from t1 
) select  * from t2 where px <=3* 数据格式
{"ts": "2023-09-05 12:00:10", "page_id": 1, "clicks": 100}
{"ts": "2023-09-05 12:00:20", "page_id": 2, "clicks": 90}
{"ts": "2023-09-05 12:00:30", "page_id": 3, "clicks": 110}
{"ts": "2023-09-05 12:00:40", "page_id": 4, "clicks": 23}
{"ts": "2023-09-05 12:00:50", "page_id": 5, "clicks": 456}
{"ts": "2023-09-05 12:00:55", "page_id": 5, "clicks": 456}
// 触发数据
{"ts": "2023-09-05 12:01:03", "page_id": 5, "clicks": 456}
package com.bigdata.day08;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class _07_flinkSql_topN {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tenv = StreamTableEnvironment.create(env);//2. 创建表//3. 通过sql语句统计结果tenv.executeSql("CREATE TABLE table1 (\n" +"    `page_id` INT,\n" +"    `clicks` INT,\n" +"  `ts` TIMESTAMP(3) ,\n" +"   watermark for ts as ts - interval '3' second \n" +") WITH (\n" +"  'connector' = 'kafka',\n" +"  'topic' = 'topic1',\n" +"  'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',\n" +"  'properties.group.id' = 'testGroup1',\n" +"  'scan.startup.mode' = 'latest-offset',\n" +"  'format' = 'json'\n" +")");tenv.executeSql("with t1 as (\n" +"\tselect window_start,window_end,page_id,sum(clicks)  totalSum  from table(tumble(table table1,DESCRIPTOR(ts), INTERVAL '60' second )) group by window_start,window_end,page_id\n" +"), t2 as(\n" +"\tselect window_start,window_end,page_id,totalSum,row_number() over (partition by window_start,window_end order by totalSum desc) px from t1 \n" +") select  * from t2 where px <=3").print();//4. sink-数据输出//5. execute-执行env.execute();}
}

相关文章:

flinkSql中累计窗口CUMULATE

eventTime package com.bigdata.day08;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class _05_flinkSql_Cumulate_eventTime {/*** 累积窗口 eventTime* …...

关于在ubuntu上无法运行EasyConnect的解决方法

需要这三个文件 libpangocairo-1.0-0_1.40.14-1_amd64.deb libpangoft2-1.0-0_1.40.14-1_amd64.deb libpango-1.0-0_1.40.14-1_amd64.deb然后执行 cp source /usr/share/sangfor/EasyConnect再重启EasyConnect即可 下载链接 http://kr.archive.ubuntu.com/ubuntu/pool/main/…...

【Axure高保真原型】数值条件分组

今天和大家分享数值条件分组的原型模板&#xff0c;效果包括&#xff1a; 点击添加分组按钮&#xff0c;可以显示添加弹窗&#xff0c;填写分组名称和数值区间后&#xff0c;可以新增该分组信息‘’ 修改分组区间&#xff0c;可以直接在输入框里修改已有的分组区间&#xff0c…...

python学习——字符串的拼接操作

在Python中&#xff0c;字符串拼接是一项基本操作&#xff0c;用于将多个字符串合并成一个字符串。以下是几种常见的字符串拼接方式&#xff1a; 1. 使用 运算符 最简单和直接的方式是使用 运算符来拼接字符串。 str1 "Hello, " str2 "World!" resu…...

多线程篇-8--线程安全(死锁,常用保障安全的方法,安全容器,原子类,Fork/Join框架等)

1、线程安全和不安全定义 &#xff08;1&#xff09;、线程安全 线程安全是指一个类或方法在被多个线程访问的情况下可以正确得到结果&#xff0c;不会出现数据不一致或其他错误行为。 线程安全的条件 1、原子性&#xff08;Atomicity&#xff09; 多个操作要么全部完成&a…...

el-select的搜索功能

el-select的相关信息&#xff1a; 最基本信息 v-model的值为当前被选中的el-option的 value 属性值 :label是选择器可以看到的内容 过滤搜索 普通过滤搜索 <el-selectv-model"selectedCountry"placeholder"请选择国家"filterable:loading"lo…...

MFC实现全屏功能

之前全屏都是参考&#xff1a; MFC单文档&#xff08;SDI&#xff09;全屏程序的实现 主要思路就是将各种菜单工具栏隐藏恢复。 随着MFC的升级&#xff0c;MFC框架本身就具备了全屏的功能。 微软有一个全屏实现类&#xff1a; CFullScreenImpl Class managing full-screen mod…...

网络安全技术详解:虚拟专用网络(VPN) 安全信息与事件管理(SIEM)

虚拟专用网络&#xff08;VPN&#xff09;详细介绍 虚拟专用网络&#xff08;VPN&#xff09;通过在公共网络上创建加密连接来保护数据传输的安全性和隐私性。 工作原理 VPN的工作原理涉及建立安全隧道和数据加密&#xff1a; 隧道协议&#xff1a;使用协议如PPTP、L2TP/IP…...

v-model 根据后端接口返回的数据动态地确定要绑定的变量

在 Vue 中&#xff0c;v-model 是用于创建双向绑定的指令。通常&#xff0c;它用于与组件或表单元素的值进行绑定。但有时你可能需要根据后端接口返回的数据动态地确定要绑定的变量。 你可以通过以下步骤来实现这个需求&#xff1a; 步骤 1: 获取后端接口数据 首先&#xff…...

图形开发基础之在WinForms中使用OpenTK.GLControl进行图形绘制

前言 GLControl 是 OpenTK 库中一个重要的控件&#xff0c;专门用于在 Windows Forms 应用程序中集成 OpenGL 图形渲染。通过 GLControl&#xff0c;可以轻松地将 OpenGL 的高性能图形绘制功能嵌入到传统的桌面应用程序中。 1. GLControl 的核心功能 OpenGL 渲染上下文&…...

离散数学重点复习

第一章.集合论 概念 1.集合是不能精确定义的基本数学概念.通常是由指定范围内的满足给定条件的所有对象聚集在一起构成的 2.制定范围内的每一个对象称为这个集合的元素 3.固定符号如下: N:自然数集合 Z:整数集合 Q:有理数集合 R:实数集合 C:复数集合 4.集合中的元素是…...

Javaweb梳理21——Servlet

Javaweb梳理21——Servlet 21 Servlet21.1 简介21.3 执行流程21.4 生命周期4.5 方法介绍21.6 体系结构21.7 urlPattern配置21.8 XML配置 21 Servlet 21.1 简介 Servlet是JavaWeb最为核心的内容&#xff0c;它是Java提供的一门动态web资源开发技术。使用Servlet就可以实现&…...

推荐学习笔记:矩阵补充和矩阵分解

参考&#xff1a; 召回 fun-rec/docs/ch02/ch2.1/ch2.1.1/mf.md at master datawhalechina/fun-rec GitHub 业务 隐语义模型与矩阵分解 协同过滤算法的特点&#xff1a; 协同过滤算法的特点就是完全没有利用到物品本身或者是用户自身的属性&#xff0c; 仅仅利用了用户与…...

etcd分布式存储系统快速入门指南

在分布式系统的复杂世界中&#xff0c;确保有效的数据管理至关重要。分布式可靠的键值存储在维护跨分布式环境的数据一致性和可伸缩性方面起着关键作用。 在这个全面的教程中&#xff0c;我们将深入研究etcd&#xff0c;这是一个开源的分布式键值存储。我们将探索其基本概念、特…...

解决VUE3 Vite打包后动态图片资源不显示问题

解决VUE3 Vite打包后动态图片资源不显示问题 <script setup> let url ref()const setimg (item)>{let src ../assets/image/${e}.pngurl.value src }</script><template><div v-for"item in 6"><h1 click"setimg(item)"…...

大数据新视界 -- 大数据大厂之 Hive 临时表与视图:灵活数据处理的技巧(上)(29 / 30)

&#x1f496;&#x1f496;&#x1f496;亲爱的朋友们&#xff0c;热烈欢迎你们来到 青云交的博客&#xff01;能与你们在此邂逅&#xff0c;我满心欢喜&#xff0c;深感无比荣幸。在这个瞬息万变的时代&#xff0c;我们每个人都在苦苦追寻一处能让心灵安然栖息的港湾。而 我的…...

Android学习14--charger

1 概述 最近正好在做关机充电这个&#xff0c;就详细看看吧。还是本着保密的原则&#xff0c;项目里的代码也不能直接用&#xff0c;这里就用的Github的。https://github.com/aosp-mirror 具体位置是&#xff1a;https://github.com/aosp-mirror/platform_system_core/tree/mai…...

页面开发样式和布局入门:Vite + Vue 3 + Less

页面开发样式和布局入门&#xff1a;Vite Vue 3 Less 引言 在现代前端开发中&#xff0c;样式和布局是页面开发的核心部分。随着技术的不断发展&#xff0c;Vite、Vue 3和Less等工具和框架的出现&#xff0c;使得前端开发变得更加高效和灵活。然而&#xff0c;尽管这些工具…...

瑞芯微RK3566/RK3568开发板安卓11固件ROOT教程,Purple Pi OH演示

本文介绍RK3566/RK3568开发板Android11系统&#xff0c;编译ROOT权限固件的方法。触觉智能Purple Pi OH鸿蒙开发板演示&#xff0c;搭载了瑞芯微RK3566四核处理器&#xff0c;Laval鸿蒙社区推荐开发板&#xff0c;已适配全新OpenHarmony5.0 Release系统&#xff0c;SDK源码全开…...

Netty 入门应用:结合 Redis 实现服务器通信

在上篇博客中&#xff0c;我们了解了 Netty 的基本概念和架构。本篇文章将带你深入实践&#xff0c;构建一个简单的 Netty 服务端&#xff0c;并结合 Redis 实现一个数据存取的示例。在这个场景中&#xff0c;Redis 作为缓存存储&#xff0c;Netty 作为服务端处理客户端请求。通…...

UE5 学习系列(二)用户操作界面及介绍

这篇博客是 UE5 学习系列博客的第二篇&#xff0c;在第一篇的基础上展开这篇内容。博客参考的 B 站视频资料和第一篇的链接如下&#xff1a; 【Note】&#xff1a;如果你已经完成安装等操作&#xff0c;可以只执行第一篇博客中 2. 新建一个空白游戏项目 章节操作&#xff0c;重…...

linux之kylin系统nginx的安装

一、nginx的作用 1.可做高性能的web服务器 直接处理静态资源&#xff08;HTML/CSS/图片等&#xff09;&#xff0c;响应速度远超传统服务器类似apache支持高并发连接 2.反向代理服务器 隐藏后端服务器IP地址&#xff0c;提高安全性 3.负载均衡服务器 支持多种策略分发流量…...

盘古信息PCB行业解决方案:以全域场景重构,激活智造新未来

一、破局&#xff1a;PCB行业的时代之问 在数字经济蓬勃发展的浪潮中&#xff0c;PCB&#xff08;印制电路板&#xff09;作为 “电子产品之母”&#xff0c;其重要性愈发凸显。随着 5G、人工智能等新兴技术的加速渗透&#xff0c;PCB行业面临着前所未有的挑战与机遇。产品迭代…...

Debian系统简介

目录 Debian系统介绍 Debian版本介绍 Debian软件源介绍 软件包管理工具dpkg dpkg核心指令详解 安装软件包 卸载软件包 查询软件包状态 验证软件包完整性 手动处理依赖关系 dpkg vs apt Debian系统介绍 Debian 和 Ubuntu 都是基于 Debian内核 的 Linux 发行版&#xff…...

CentOS下的分布式内存计算Spark环境部署

一、Spark 核心架构与应用场景 1.1 分布式计算引擎的核心优势 Spark 是基于内存的分布式计算框架&#xff0c;相比 MapReduce 具有以下核心优势&#xff1a; 内存计算&#xff1a;数据可常驻内存&#xff0c;迭代计算性能提升 10-100 倍&#xff08;文档段落&#xff1a;3-79…...

SpringBoot+uniapp 的 Champion 俱乐部微信小程序设计与实现,论文初版实现

摘要 本论文旨在设计并实现基于 SpringBoot 和 uniapp 的 Champion 俱乐部微信小程序&#xff0c;以满足俱乐部线上活动推广、会员管理、社交互动等需求。通过 SpringBoot 搭建后端服务&#xff0c;提供稳定高效的数据处理与业务逻辑支持&#xff1b;利用 uniapp 实现跨平台前…...

mysql已经安装,但是通过rpm -q 没有找mysql相关的已安装包

文章目录 现象&#xff1a;mysql已经安装&#xff0c;但是通过rpm -q 没有找mysql相关的已安装包遇到 rpm 命令找不到已经安装的 MySQL 包时&#xff0c;可能是因为以下几个原因&#xff1a;1.MySQL 不是通过 RPM 包安装的2.RPM 数据库损坏3.使用了不同的包名或路径4.使用其他包…...

分布式增量爬虫实现方案

之前我们在讨论的是分布式爬虫如何实现增量爬取。增量爬虫的目标是只爬取新产生或发生变化的页面&#xff0c;避免重复抓取&#xff0c;以节省资源和时间。 在分布式环境下&#xff0c;增量爬虫的实现需要考虑多个爬虫节点之间的协调和去重。 另一种思路&#xff1a;将增量判…...

ABAP设计模式之---“简单设计原则(Simple Design)”

“Simple Design”&#xff08;简单设计&#xff09;是软件开发中的一个重要理念&#xff0c;倡导以最简单的方式实现软件功能&#xff0c;以确保代码清晰易懂、易维护&#xff0c;并在项目需求变化时能够快速适应。 其核心目标是避免复杂和过度设计&#xff0c;遵循“让事情保…...

并发编程 - go版

1.并发编程基础概念 进程和线程 A. 进程是程序在操作系统中的一次执行过程&#xff0c;系统进行资源分配和调度的一个独立单位。B. 线程是进程的一个执行实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。C.一个进程可以创建和撤销多个线程;同一个进程中…...