Flink Window DEMO 学习
该文档演示了fink windows的操作DEMO
环境准备:
- kafka本地运行:kafka部署
- 自动生成名字代码:随机名
- 自动生成随机IP代码:随机IP
- Flink 1.18
测试数据
自动向kafka推送数据
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson2.JSONObject;
import com.wfg.flink.example.dto.KafkaPvDto;
import com.wfg.flink.example.utils.RandomGeneratorUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.time.LocalDateTime;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;import static com.wfg.flink.example.constants.Constants.KAFKA_BROKERS;
import static com.wfg.flink.example.constants.Constants.TOPIC_NAME;public class KafkaTestProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", KAFKA_BROKERS);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");try (Producer<String, String> producer = new KafkaProducer<>(props)) {int times = 100000;for (int i = 0; i < times; i++) {System.out.println("Send No. :" + i);CompletableFuture.allOf(CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer))).join();producer.flush();Random random = new Random();int randomNumber = random.nextInt(7); // 生成一个0到6的随机数Thread.sleep(1000 * randomNumber);}} catch (InterruptedException e) {throw new RuntimeException(e);}}private static void sendKafkaMsg(Producer<String, String> producer) {String msg = createMsg();System.out.println(msg);producer.send(new ProducerRecord<>(TOPIC_NAME, UUID.randomUUID().toString().replaceAll("-", ""), msg));}private static String createMsg() {KafkaPvDto dto = new KafkaPvDto();dto.setUuid(UUID.randomUUID().toString().replaceAll("-", ""));dto.setUserName(RandomGeneratorUtils.generateRandomFullName());dto.setVisitIp(RandomGeneratorUtils.generateRandomIp());
// DateTime begin = DateUtil.beginOfDay(new Date());
// String timeStr = DateUtil.format(RandomGeneratorUtils.generateRandomDateTime(LocalDateTimeUtil.of(begin).toLocalDate(), LocalDate.now()), "yyyy-MM-dd HH:mm:ss");String timeStr = DateUtil.format(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss");dto.setVisitTime(timeStr);dto.setVisitServiceIp(RandomGeneratorUtils.generateRandomIp());return JSONObject.toJSONString(dto);}
}
注意:
- kafka本地运行:kafka部署
- 自动生成名字代码:随机名
- 自动生成随机IP代码:随机IP
FLINK 数据
/**** @author wfg*/
@Slf4j
public class DataSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> collector) {KafkaPvDto data = JSONObject.parseObject(value, KafkaPvDto.class);if (data != null) {collector.collect(new Tuple2<>(data.getUserName(), 1));}}
}
基于时间窗口
*** Desc: Flink Window 学习*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");//基于时间窗口data.flatMap(new DataSplitter()).keyBy(1).timeWindow(Time.seconds(30)).sum(0).print();*/env.execute("flink window example");}
}
基于滑动时间窗口
/*** Desc: Flink Window 学习*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");//基于滑动时间窗口data.flatMap(new DataSplitter()).keyBy(1).timeWindow(Time.seconds(60), Time.seconds(30)).sum(0).print();env.execute("flink window example");}
}
基于事件数量窗口
/*** Desc: Flink Window 学习*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");//基于事件数量窗口data.flatMap(new DataSplitter()).keyBy(1).countWindow(3).sum(0).print();env.execute("flink window example");}
}
基于事件数量滑动窗口
/*** Desc: Flink Window 学习*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");//基于事件数量滑动窗口data.flatMap(new DataSplitter()).keyBy(1).countWindow(4, 3).sum(0).print();*env.execute("flink window example");}
}
基于会话时间窗口
/*** Desc: Flink Window 学习*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");//基于会话时间窗口data.flatMap(new DataSplitter()).keyBy(v->v.f0).window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))//表示如果 5s 内没出现数据则认为超出会话时长,然后计算这个窗口的和.sum(1).print();env.execute("flink window example");}
}
滚动窗口(Tumbling Window)
滚动窗口(Tumbling Window)
/*** Desc: Flink Window 学习*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");//滚动窗口(Tumbling Window) 基于处理时间的 30 秒滚动窗口data.flatMap(new DataSplitter()).keyBy(v->v.f0).window(TumblingProcessingTimeWindows.of(Time.seconds(30))).sum(1).print();;env.execute("flink window example");}
}
基于事件时间
/*** Desc: Flink Window 学习*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");// 基于事件时间的 30 秒滚动窗口data.flatMap(new DataSplitter()).keyBy(v->v.f0).assignTimestampsAndWatermarks(/* 分配时间戳和水印 */).window(TumblingEventTimeWindows.of(Time.seconds(30))).sum(1).print();env.execute("flink window example");}
}
滑动窗口(Sliding Window)
基于处理时间
/*** Desc: Flink Window 学习*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");// 基于处理时间的 30 秒滑动窗口,滑动间隔为 10 秒data.flatMap(new DataSplitter()).keyBy(v->v.f0).window(SlidingProcessingTimeWindows.of(Time.seconds(30), Time.seconds(10))).sum(1).print();env.execute("flink window example");}
}
基于事件时间
/*** Desc: Flink Window 学习*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");// 基于事件时间的 30 秒滑动窗口,滑动间隔为 10 秒 data.flatMap(new DataSplitter()).keyBy(v->v.f0).assignTimestampsAndWatermarks(/* 分配时间戳和水印 */).window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10))).sum(1).print();env.execute("flink window example");}
}
注意:
- kafka本地运行:kafka部署
- 自动生成名字代码:随机名
- 自动生成随机IP代码:随机IP
相关文章:
Flink Window DEMO 学习
该文档演示了fink windows的操作DEMO 环境准备: kafka本地运行:kafka部署自动生成名字代码:随机名自动生成随机IP代码:随机IPFlink 1.18 测试数据 自动向kafka推送数据 import cn.hutool.core.date.DateUtil; import com.alibab…...

library source does not match the bytecode for class SpringApplication
library source does not match the bytecode for class SpringApplication 问题描述:springboot源码点进去然后download source后提示标题内容。spring版本5.2.8.RELEASE,springboot版本2.7.18 解决方法:把spring版本改为与boot版本对应的6.…...

Linux基础指令介绍与详解——原理学习
前言:本节内容标题虽然为指令,但是并不只是讲指令, 更多的是和指令相关的一些原理性的东西。 如果友友只想要查一查某个指令的用法, 很抱歉, 本节不是那种带有字典性质的文章。但是如果友友是想要来学习的,…...
【代码随想录算法训练Day52】LeetCode 647. 回文子串、LeetCode 516.最长回文子串
Day51 动态规划第十三天 LeetCode 647. 回文子串 dp数组的含义:i到j的子串是否是回文的,是的话dp[i][j]1 递推公式:if(s[i]s[j]) i j 一个元素 是回文的 |i-j|1 两个元素 是回文的 j-i>1 判断dp[i1][j-1] 初始化:全部初始化成…...

VUE项目安全漏洞扫描和修复
npm audit 1、npm audit是npm 6 新增的一个命令,可以允许开发人员分析复杂的代码并查明特定的漏洞。 2、npm audit名称执行,需要包package.json和package-lock.json文件。它是通过分析 package-lock.json 文件,继而扫描我们的包分析是否包含漏洞的。 …...

Nginx主配置文件---Nginx.conf
nginx主配置文件的模块介绍 全局块: 全局块是配置文件从开始到 events 块之间的部分,其中指令的作用域是 Nginx 服务器全局。主要指令包括: user:指定可以运行 Nginx 服务的用户和用户组,只能在全局块配置。例如&…...
IOS Swift 从入门到精通:写入 Firestore数据库
文章目录 FirestoreManager 类创建文档更新文档更新 Firestore 权限规则现在,我们想要在 Firestore 中添加或更新文档。如果您还没有,我建议您阅读有关设置 Firebase Auth 和从 Firestore 读取的部分。您必须在应用程序中启用 Firebase,并在项目中启用 Firestore 数据库,才…...

维克日记 v0.4.2:开发者友好的数字化笔记工具
维克日记,专为技术开发者和笔记爱好者设计的数字化笔记工具,以其强大的功能和灵活的配置赢得了用户的好评。软件采用Markdown语法,提供实时预览功能,让您的笔记编辑更加高效和直观。维克日记的用户界面简洁而功能齐全,…...

语音房平台交友,语聊APP系统开发线上语音交友平台成熟案例源码出售
随着移动互联网的快速发展,人们对于社交方式的需求也在不断变化,语音房平台交友语助APP作为一种新兴的社交方式,以其独特的语音交流模式和实时互动的特点,受到了越来越多用户的喜爱本文将详细介绍语音房平台交友语聊APP系统的开发…...

VMamba: Visual State Space Model论文笔记
文章目录 VMamba: Visual State Space Model摘要引言相关工作Preliminaries方法网络结构2D-Selective-Scan for Vision Data(SS2D) VMamba: Visual State Space Model 论文地址: https://arxiv.org/abs/2401.10166 代码地址: https://github.com/MzeroMiko/VMamba 摘要 卷积神…...

探索哈希函数:数据完整性的守护者
引言 银行在处理数以百万计的交易时,如何确保每一笔交易都没有出错?快递公司如何跟踪成千上万的包裹,确保每个包裹在运输过程中没有丢失或被替换?医院和诊所为庞大的患者提供有效的医疗保健服务,如何确保每个患者的医疗…...
解析Kotlin中的Unit【笔记摘要】
1. Kotlin的Unit 和 Java的void 的区别 // Java public void sayHello() {System.out.println("Hello!") }// Kotlin fun sayHello(): Unit {println("Hello!") }Unit 和 Java 的 void 真正的区别在于,void 是真的表示什么都不返回,…...

仿论坛项目--初识Spring Boot
1. 技术准备 技术架构 • Spring Boot • Spring、Spring MVC、MyBatis • Redis、Kafka、Elasticsearch • Spring Security、Spring Actuator 开发环境 • 构建工具:Apache Maven • 集成开发工具:IntelliJ IDEA • 数据库:MySQL、Redi…...

Spring boot 更改启动LOGO
在resources目录下创建banner.txt文件,然后编辑对应的图案即可 注释工具 Spring Boot Version: ${spring-boot.version},-.___,---.__ /|\ __,---,___,- \ -.____,- | -.____,- // -., | ~\ /~ | …...
python变成几个小程序
专家系统 需要建立‘capital_data.txt’ 空文件 from tkinter import Tk, simpledialog, messageboxdef read_from_file():with open(capital_data.txt) as file:for line in file:line line.rstrip(\n)country, city line.split(/)the_world[country] citydef write_to_fi…...
nginx配置stream代理
项目中遇到某些服务在内网,需要外网访问的情况,需要配置代理访问。可用nginx搭建代理服务。 TCP代理 通过nginx的stream模块可以直接代理TCP服务,步骤如下: 在/etc/nginx/下新建proxy文件夹,用于存放代理配置。此处…...

【瑞吉外卖 | day01】项目介绍+后台登录退出功能
文章目录 瑞吉外卖 — day011. 所需知识2. 软件开发整体介绍2.1 软件开发流程2.2 角色分工2.3 软件环境 3. 瑞吉外卖项目介绍3.1 项目介绍3.2 产品原型展示3.3 技术选型3.4 功能架构3.5 角色 4. 开发环境搭建4.1 数据库环境搭建4.2 Maven项目构建 5. 后台系统登录功能5.1 创建需…...

关于批量采集1688商品主图及链接的方式:软件采集/1688官方API接口数据采集
关于批量采集,我们通常用到的是软件 采集,或者通过1688官方API数据采集的形式:用户输入一组1688商品ID,一行一个,流程会自动逐个打开对应的1688商品详情页,采集主图的所有链接。 结果保存为表格的一行&…...
Shell 获取Hive表的location 信息
用shell 获取建表语句: hive -e "show create table ods_job.ods_job_tb"得到结果: CREATE TABLE ods_job.ods_job_tb(id bigint COMMENT id, auto int COMMENT job开启/关闭:0-关闭;1-开启, ....timeout_kill string…...
从零搭建教育管理系统:Java + Vue.js 教学-02
第三步:创建实体类和 Mapper 接口 现在我们已经设计好了数据库表,接下来使用 MyBatis-Plus 将这些表映射到 Java 对象,以便在代码中轻松地进行操作。 1. 创建实体类 在 src/main/java/<your_package>/entity 目录下 (如果没有该目录,请手动创建),创建与数据库表对应…...
零门槛NAS搭建:WinNAS如何让普通电脑秒变私有云?
一、核心优势:专为Windows用户设计的极简NAS WinNAS由深圳耘想存储科技开发,是一款收费低廉但功能全面的Windows NAS工具,主打“无学习成本部署” 。与其他NAS软件相比,其优势在于: 无需硬件改造:将任意W…...
在鸿蒙HarmonyOS 5中实现抖音风格的点赞功能
下面我将详细介绍如何使用HarmonyOS SDK在HarmonyOS 5中实现类似抖音的点赞功能,包括动画效果、数据同步和交互优化。 1. 基础点赞功能实现 1.1 创建数据模型 // VideoModel.ets export class VideoModel {id: string "";title: string ""…...

uniapp微信小程序视频实时流+pc端预览方案
方案类型技术实现是否免费优点缺点适用场景延迟范围开发复杂度WebSocket图片帧定时拍照Base64传输✅ 完全免费无需服务器 纯前端实现高延迟高流量 帧率极低个人demo测试 超低频监控500ms-2s⭐⭐RTMP推流TRTC/即构SDK推流❌ 付费方案 (部分有免费额度&#x…...

自然语言处理——Transformer
自然语言处理——Transformer 自注意力机制多头注意力机制Transformer 虽然循环神经网络可以对具有序列特性的数据非常有效,它能挖掘数据中的时序信息以及语义信息,但是它有一个很大的缺陷——很难并行化。 我们可以考虑用CNN来替代RNN,但是…...
汇编常见指令
汇编常见指令 一、数据传送指令 指令功能示例说明MOV数据传送MOV EAX, 10将立即数 10 送入 EAXMOV [EBX], EAX将 EAX 值存入 EBX 指向的内存LEA加载有效地址LEA EAX, [EBX4]将 EBX4 的地址存入 EAX(不访问内存)XCHG交换数据XCHG EAX, EBX交换 EAX 和 EB…...

多种风格导航菜单 HTML 实现(附源码)
下面我将为您展示 6 种不同风格的导航菜单实现,每种都包含完整 HTML、CSS 和 JavaScript 代码。 1. 简约水平导航栏 <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><meta name"viewport&qu…...

select、poll、epoll 与 Reactor 模式
在高并发网络编程领域,高效处理大量连接和 I/O 事件是系统性能的关键。select、poll、epoll 作为 I/O 多路复用技术的代表,以及基于它们实现的 Reactor 模式,为开发者提供了强大的工具。本文将深入探讨这些技术的底层原理、优缺点。 一、I…...
【学习笔记】深入理解Java虚拟机学习笔记——第4章 虚拟机性能监控,故障处理工具
第2章 虚拟机性能监控,故障处理工具 4.1 概述 略 4.2 基础故障处理工具 4.2.1 jps:虚拟机进程状况工具 命令:jps [options] [hostid] 功能:本地虚拟机进程显示进程ID(与ps相同),可同时显示主类&#x…...

Redis数据倾斜问题解决
Redis 数据倾斜问题解析与解决方案 什么是 Redis 数据倾斜 Redis 数据倾斜指的是在 Redis 集群中,部分节点存储的数据量或访问量远高于其他节点,导致这些节点负载过高,影响整体性能。 数据倾斜的主要表现 部分节点内存使用率远高于其他节…...
Linux C语言网络编程详细入门教程:如何一步步实现TCP服务端与客户端通信
文章目录 Linux C语言网络编程详细入门教程:如何一步步实现TCP服务端与客户端通信前言一、网络通信基础概念二、服务端与客户端的完整流程图解三、每一步的详细讲解和代码示例1. 创建Socket(服务端和客户端都要)2. 绑定本地地址和端口&#x…...