Flink Window DEMO 学习
该文档演示了fink windows的操作DEMO
环境准备:
- kafka本地运行:kafka部署
- 自动生成名字代码:随机名
- 自动生成随机IP代码:随机IP
- Flink 1.18
测试数据
自动向kafka推送数据
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson2.JSONObject;
import com.wfg.flink.example.dto.KafkaPvDto;
import com.wfg.flink.example.utils.RandomGeneratorUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.time.LocalDateTime;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;import static com.wfg.flink.example.constants.Constants.KAFKA_BROKERS;
import static com.wfg.flink.example.constants.Constants.TOPIC_NAME;public class KafkaTestProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", KAFKA_BROKERS);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");try (Producer<String, String> producer = new KafkaProducer<>(props)) {int times = 100000;for (int i = 0; i < times; i++) {System.out.println("Send No. :" + i);CompletableFuture.allOf(CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer))).join();producer.flush();Random random = new Random();int randomNumber = random.nextInt(7); // 生成一个0到6的随机数Thread.sleep(1000 * randomNumber);}} catch (InterruptedException e) {throw new RuntimeException(e);}}private static void sendKafkaMsg(Producer<String, String> producer) {String msg = createMsg();System.out.println(msg);producer.send(new ProducerRecord<>(TOPIC_NAME, UUID.randomUUID().toString().replaceAll("-", ""), msg));}private static String createMsg() {KafkaPvDto dto = new KafkaPvDto();dto.setUuid(UUID.randomUUID().toString().replaceAll("-", ""));dto.setUserName(RandomGeneratorUtils.generateRandomFullName());dto.setVisitIp(RandomGeneratorUtils.generateRandomIp());
// DateTime begin = DateUtil.beginOfDay(new Date());
// String timeStr = DateUtil.format(RandomGeneratorUtils.generateRandomDateTime(LocalDateTimeUtil.of(begin).toLocalDate(), LocalDate.now()), "yyyy-MM-dd HH:mm:ss");String timeStr = DateUtil.format(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss");dto.setVisitTime(timeStr);dto.setVisitServiceIp(RandomGeneratorUtils.generateRandomIp());return JSONObject.toJSONString(dto);}
}
注意:
- kafka本地运行:kafka部署
- 自动生成名字代码:随机名
- 自动生成随机IP代码:随机IP
FLINK 数据
/**** @author wfg*/
@Slf4j
public class DataSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> collector) {KafkaPvDto data = JSONObject.parseObject(value, KafkaPvDto.class);if (data != null) {collector.collect(new Tuple2<>(data.getUserName(), 1));}}
}
基于时间窗口
*** Desc: Flink Window 学习*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");//基于时间窗口data.flatMap(new DataSplitter()).keyBy(1).timeWindow(Time.seconds(30)).sum(0).print();*/env.execute("flink window example");}
}
基于滑动时间窗口
/*** Desc: Flink Window 学习*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");//基于滑动时间窗口data.flatMap(new DataSplitter()).keyBy(1).timeWindow(Time.seconds(60), Time.seconds(30)).sum(0).print();env.execute("flink window example");}
}
基于事件数量窗口
/*** Desc: Flink Window 学习*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");//基于事件数量窗口data.flatMap(new DataSplitter()).keyBy(1).countWindow(3).sum(0).print();env.execute("flink window example");}
}
基于事件数量滑动窗口
/*** Desc: Flink Window 学习*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");//基于事件数量滑动窗口data.flatMap(new DataSplitter()).keyBy(1).countWindow(4, 3).sum(0).print();*env.execute("flink window example");}
}
基于会话时间窗口
/*** Desc: Flink Window 学习*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");//基于会话时间窗口data.flatMap(new DataSplitter()).keyBy(v->v.f0).window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))//表示如果 5s 内没出现数据则认为超出会话时长,然后计算这个窗口的和.sum(1).print();env.execute("flink window example");}
}
滚动窗口(Tumbling Window)
滚动窗口(Tumbling Window)
/*** Desc: Flink Window 学习*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");//滚动窗口(Tumbling Window) 基于处理时间的 30 秒滚动窗口data.flatMap(new DataSplitter()).keyBy(v->v.f0).window(TumblingProcessingTimeWindows.of(Time.seconds(30))).sum(1).print();;env.execute("flink window example");}
}
基于事件时间
/*** Desc: Flink Window 学习*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");// 基于事件时间的 30 秒滚动窗口data.flatMap(new DataSplitter()).keyBy(v->v.f0).assignTimestampsAndWatermarks(/* 分配时间戳和水印 */).window(TumblingEventTimeWindows.of(Time.seconds(30))).sum(1).print();env.execute("flink window example");}
}
滑动窗口(Sliding Window)
基于处理时间
/*** Desc: Flink Window 学习*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");// 基于处理时间的 30 秒滑动窗口,滑动间隔为 10 秒data.flatMap(new DataSplitter()).keyBy(v->v.f0).window(SlidingProcessingTimeWindows.of(Time.seconds(30), Time.seconds(10))).sum(1).print();env.execute("flink window example");}
}
基于事件时间
/*** Desc: Flink Window 学习*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");// 基于事件时间的 30 秒滑动窗口,滑动间隔为 10 秒 data.flatMap(new DataSplitter()).keyBy(v->v.f0).assignTimestampsAndWatermarks(/* 分配时间戳和水印 */).window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10))).sum(1).print();env.execute("flink window example");}
}
注意:
- kafka本地运行:kafka部署
- 自动生成名字代码:随机名
- 自动生成随机IP代码:随机IP
相关文章:
Flink Window DEMO 学习
该文档演示了fink windows的操作DEMO 环境准备: kafka本地运行:kafka部署自动生成名字代码:随机名自动生成随机IP代码:随机IPFlink 1.18 测试数据 自动向kafka推送数据 import cn.hutool.core.date.DateUtil; import com.alibab…...
library source does not match the bytecode for class SpringApplication
library source does not match the bytecode for class SpringApplication 问题描述:springboot源码点进去然后download source后提示标题内容。spring版本5.2.8.RELEASE,springboot版本2.7.18 解决方法:把spring版本改为与boot版本对应的6.…...
Linux基础指令介绍与详解——原理学习
前言:本节内容标题虽然为指令,但是并不只是讲指令, 更多的是和指令相关的一些原理性的东西。 如果友友只想要查一查某个指令的用法, 很抱歉, 本节不是那种带有字典性质的文章。但是如果友友是想要来学习的,…...
【代码随想录算法训练Day52】LeetCode 647. 回文子串、LeetCode 516.最长回文子串
Day51 动态规划第十三天 LeetCode 647. 回文子串 dp数组的含义:i到j的子串是否是回文的,是的话dp[i][j]1 递推公式:if(s[i]s[j]) i j 一个元素 是回文的 |i-j|1 两个元素 是回文的 j-i>1 判断dp[i1][j-1] 初始化:全部初始化成…...
VUE项目安全漏洞扫描和修复
npm audit 1、npm audit是npm 6 新增的一个命令,可以允许开发人员分析复杂的代码并查明特定的漏洞。 2、npm audit名称执行,需要包package.json和package-lock.json文件。它是通过分析 package-lock.json 文件,继而扫描我们的包分析是否包含漏洞的。 …...
Nginx主配置文件---Nginx.conf
nginx主配置文件的模块介绍 全局块: 全局块是配置文件从开始到 events 块之间的部分,其中指令的作用域是 Nginx 服务器全局。主要指令包括: user:指定可以运行 Nginx 服务的用户和用户组,只能在全局块配置。例如&…...
IOS Swift 从入门到精通:写入 Firestore数据库
文章目录 FirestoreManager 类创建文档更新文档更新 Firestore 权限规则现在,我们想要在 Firestore 中添加或更新文档。如果您还没有,我建议您阅读有关设置 Firebase Auth 和从 Firestore 读取的部分。您必须在应用程序中启用 Firebase,并在项目中启用 Firestore 数据库,才…...
维克日记 v0.4.2:开发者友好的数字化笔记工具
维克日记,专为技术开发者和笔记爱好者设计的数字化笔记工具,以其强大的功能和灵活的配置赢得了用户的好评。软件采用Markdown语法,提供实时预览功能,让您的笔记编辑更加高效和直观。维克日记的用户界面简洁而功能齐全,…...
语音房平台交友,语聊APP系统开发线上语音交友平台成熟案例源码出售
随着移动互联网的快速发展,人们对于社交方式的需求也在不断变化,语音房平台交友语助APP作为一种新兴的社交方式,以其独特的语音交流模式和实时互动的特点,受到了越来越多用户的喜爱本文将详细介绍语音房平台交友语聊APP系统的开发…...
VMamba: Visual State Space Model论文笔记
文章目录 VMamba: Visual State Space Model摘要引言相关工作Preliminaries方法网络结构2D-Selective-Scan for Vision Data(SS2D) VMamba: Visual State Space Model 论文地址: https://arxiv.org/abs/2401.10166 代码地址: https://github.com/MzeroMiko/VMamba 摘要 卷积神…...
探索哈希函数:数据完整性的守护者
引言 银行在处理数以百万计的交易时,如何确保每一笔交易都没有出错?快递公司如何跟踪成千上万的包裹,确保每个包裹在运输过程中没有丢失或被替换?医院和诊所为庞大的患者提供有效的医疗保健服务,如何确保每个患者的医疗…...
解析Kotlin中的Unit【笔记摘要】
1. Kotlin的Unit 和 Java的void 的区别 // Java public void sayHello() {System.out.println("Hello!") }// Kotlin fun sayHello(): Unit {println("Hello!") }Unit 和 Java 的 void 真正的区别在于,void 是真的表示什么都不返回,…...
仿论坛项目--初识Spring Boot
1. 技术准备 技术架构 • Spring Boot • Spring、Spring MVC、MyBatis • Redis、Kafka、Elasticsearch • Spring Security、Spring Actuator 开发环境 • 构建工具:Apache Maven • 集成开发工具:IntelliJ IDEA • 数据库:MySQL、Redi…...
Spring boot 更改启动LOGO
在resources目录下创建banner.txt文件,然后编辑对应的图案即可 注释工具 Spring Boot Version: ${spring-boot.version},-.___,---.__ /|\ __,---,___,- \ -.____,- | -.____,- // -., | ~\ /~ | …...
python变成几个小程序
专家系统 需要建立‘capital_data.txt’ 空文件 from tkinter import Tk, simpledialog, messageboxdef read_from_file():with open(capital_data.txt) as file:for line in file:line line.rstrip(\n)country, city line.split(/)the_world[country] citydef write_to_fi…...
nginx配置stream代理
项目中遇到某些服务在内网,需要外网访问的情况,需要配置代理访问。可用nginx搭建代理服务。 TCP代理 通过nginx的stream模块可以直接代理TCP服务,步骤如下: 在/etc/nginx/下新建proxy文件夹,用于存放代理配置。此处…...
【瑞吉外卖 | day01】项目介绍+后台登录退出功能
文章目录 瑞吉外卖 — day011. 所需知识2. 软件开发整体介绍2.1 软件开发流程2.2 角色分工2.3 软件环境 3. 瑞吉外卖项目介绍3.1 项目介绍3.2 产品原型展示3.3 技术选型3.4 功能架构3.5 角色 4. 开发环境搭建4.1 数据库环境搭建4.2 Maven项目构建 5. 后台系统登录功能5.1 创建需…...
关于批量采集1688商品主图及链接的方式:软件采集/1688官方API接口数据采集
关于批量采集,我们通常用到的是软件 采集,或者通过1688官方API数据采集的形式:用户输入一组1688商品ID,一行一个,流程会自动逐个打开对应的1688商品详情页,采集主图的所有链接。 结果保存为表格的一行&…...
Shell 获取Hive表的location 信息
用shell 获取建表语句: hive -e "show create table ods_job.ods_job_tb"得到结果: CREATE TABLE ods_job.ods_job_tb(id bigint COMMENT id, auto int COMMENT job开启/关闭:0-关闭;1-开启, ....timeout_kill string…...
从零搭建教育管理系统:Java + Vue.js 教学-02
第三步:创建实体类和 Mapper 接口 现在我们已经设计好了数据库表,接下来使用 MyBatis-Plus 将这些表映射到 Java 对象,以便在代码中轻松地进行操作。 1. 创建实体类 在 src/main/java/<your_package>/entity 目录下 (如果没有该目录,请手动创建),创建与数据库表对应…...
如何在最短时间内提升打ctf(web)的水平?
刚刚刷完2遍 bugku 的 web 题,前来答题。 每个人对刷题理解是不同,有的人是看了writeup就等于刷了,有的人是收藏了writeup就等于刷了,有的人是跟着writeup做了一遍就等于刷了,还有的人是独立思考做了一遍就等于刷了。…...
2023赣州旅游投资集团
单选题 1.“不登高山,不知天之高也;不临深溪,不知地之厚也。”这句话说明_____。 A、人的意识具有创造性 B、人的认识是独立于实践之外的 C、实践在认识过程中具有决定作用 D、人的一切知识都是从直接经验中获得的 参考答案: C 本题解…...
python报错No module named ‘tensorflow.keras‘
是由于不同版本的tensorflow下的keras所在的路径不同,结合所安装的tensorflow的目录结构修改from语句即可。 原语句: from tensorflow.keras.layers import Conv1D, MaxPooling1D, LSTM, Dense 修改后: from tensorflow.python.keras.lay…...
【Java学习笔记】BigInteger 和 BigDecimal 类
BigInteger 和 BigDecimal 类 二者共有的常见方法 方法功能add加subtract减multiply乘divide除 注意点:传参类型必须是类对象 一、BigInteger 1. 作用:适合保存比较大的整型数 2. 使用说明 创建BigInteger对象 传入字符串 3. 代码示例 import j…...
Python+ZeroMQ实战:智能车辆状态监控与模拟模式自动切换
目录 关键点 技术实现1 技术实现2 摘要: 本文将介绍如何利用Python和ZeroMQ消息队列构建一个智能车辆状态监控系统。系统能够根据时间策略自动切换驾驶模式(自动驾驶、人工驾驶、远程驾驶、主动安全),并通过实时消息推送更新车…...
抽象类和接口(全)
一、抽象类 1.概念:如果⼀个类中没有包含⾜够的信息来描绘⼀个具体的对象,这样的类就是抽象类。 像是没有实际⼯作的⽅法,我们可以把它设计成⼀个抽象⽅法,包含抽象⽅法的类我们称为抽象类。 2.语法 在Java中,⼀个类如果被 abs…...
【UE5 C++】通过文件对话框获取选择文件的路径
目录 效果 步骤 源码 效果 步骤 1. 在“xxx.Build.cs”中添加需要使用的模块 ,这里主要使用“DesktopPlatform”模块 2. 添加后闭UE编辑器,右键点击 .uproject 文件,选择 "Generate Visual Studio project files",重…...
智能职业发展系统:AI驱动的职业规划平台技术解析
智能职业发展系统:AI驱动的职业规划平台技术解析 引言:数字时代的职业革命 在当今瞬息万变的就业市场中,传统的职业规划方法已无法满足个人和企业的需求。据统计,全球每年有超过2亿人面临职业转型困境,而企业也因此遭…...
《信号与系统》第 6 章 信号与系统的时域和频域特性
目录 6.0 引言 6.1 傅里叶变换的模和相位表示 6.2 线性时不变系统频率响应的模和相位表示 6.2.1 线性与非线性相位 6.2.2 群时延 6.2.3 对数模和相位图 6.3 理想频率选择性滤波器的时域特性 6.4 非理想滤波器的时域和频域特性讨论 6.5 一阶与二阶连续时间系统 6.5.1 …...
【工具教程】多个条形码识别用条码内容对图片重命名,批量PDF条形码识别后用条码内容批量改名,使用教程及注意事项
一、条形码识别改名使用教程 打开软件并选择处理模式:打开软件后,根据要处理的文件类型,选择 “图片识别模式” 或 “PDF 识别模式”。如果是处理包含条形码的 PDF 文件,就选择 “PDF 识别模式”;若是处理图片文件&…...
