Flink 中的滚动策略(Rolling Policy)
在 Apache Flink 中,滚动策略(Rolling Policy)是针对日志(或数据流)文件输出的一种管理策略,它决定了在日志文件的大小、时间或其他条件满足特定标准时,如何“滚动”生成新的日志文件。滚动策略常用于处理较大的数据流文件,避免单个文件过大导致存储和处理困难。
1. 滚动策略的作用
在 Flink 中,当作业的输出是通过文件系统(如 HDFS、S3、本地文件系统等)进行持久化时,往往会遇到生成的文件越来越大的问题。滚动策略能够在文件达到某个阈值时自动生成新文件,确保文件的大小在可接受的范围内,从而提高数据处理的可管理性和性能。
2. 滚动策略的基本类型
Flink 提供了几种常见的滚动策略来控制文件的滚动行为。以下是几种常见的策略:
(1) 基于文件大小的滚动策略(Size-based Rolling)
当文件的大小超过一个预设的阈值时,文件会自动“滚动”到一个新的文件中,旧的文件会被关闭,新的文件开始接收数据。
- 适用场景:适用于对文件大小有严格要求的场景,特别是当文件过大时会影响系统性能或数据分析的效率。
- 配置:通常通过配置
maxFileSize来设置最大文件大小。
示例:
RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder().withMaxPartSize(1024 * 1024 * 1024) // 设置最大文件大小为 1GB.build();
(2) 基于时间的滚动策略(Time-based Rolling)
基于时间的滚动策略根据时间间隔来决定何时滚动文件,通常以分钟、小时或天为单位进行滚动。比如,每小时生成一个新的文件。
- 适用场景:适用于数据有时间要求的场景,比如需要按小时、按天划分存储的数据。
- 配置:可以配置时间间隔,例如通过
rolloverInterval设置文件滚动的时间间隔。
示例:
RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder().withRolloverInterval(60000L) // 每 60 秒滚动一次.build();
(3) 基于事件数量的滚动策略(Count-based Rolling)
事件数量滚动策略根据文件中的事件数量来决定何时滚动文件。比如,当文件中累积了 10000 个事件后,文件会自动滚动。
- 适用场景:适用于事件生成速率较快且文件大小不易预测的情况。
- 配置:通过
maxPartCount设置文件中的最大事件数。
示例:
RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder().withMaxPartCount(10000) // 设置每个文件最多包含 10000 个事件.build();
3. Flink 中的文件滚动配置
在 Flink 中,滚动策略通常是与 Flink 的 FileSink 配合使用的。你可以为输出的文件设置滚动策略,并定义如何滚动文件。
4. RollingPolicy 配置
Flink 提供了一个 RollingPolicy 接口,默认的实现是 DefaultRollingPolicy,它支持多种方式来配置文件滚动:
withMaxPartSize(long maxSize):设置单个文件的最大大小,当文件大小超过这个限制时,Flask 会滚动生成新文件。withRolloverInterval(long interval):设置文件的滚动时间间隔,单位是毫秒。withMaxPartCount(long maxPartCount):设置每个文件的最大事件数。
5. 使用示例
假设我们有一个 Flink 作业,将数据输出到 HDFS,并希望使用滚动策略来管理文件。我们可以通过以下方式设置文件大小滚动策略:
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.RollingPolicy;public class RollingPolicyExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 假设有一个简单的字符串数据流DataStream<String> stream = env.fromElements("Hello", "Flink", "Rolling", "Policy");// 设置滚动策略:文件大小达到 100MB 时滚动RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder().withMaxPartSize(1024 * 1024 * 100) // 100MB.withRolloverInterval(60000L) // 每 60 秒滚动一次.build();// 使用 FileSink 来输出数据到 HDFSStreamingFileSink<String> sink = StreamingFileSink.forRowFormat(new Path("hdfs://path/to/output"), new SimpleStringEncoder<String>("UTF-8")).withRollingPolicy(rollingPolicy).build();// 将数据流写入到文件stream.addSink(sink);env.execute("Flink Rolling Policy Example");}
}
在上面的代码中,我们为 FileSink 设置了基于文件大小和时间的滚动策略。文件大小超过 100MB 或者每 60 秒就会滚动一次,确保输出文件不会无限增大。
6. 滚动策略的选择与最佳实践
-
基于文件大小的滚动:适用于文件内容量预期较为稳定且文件大小有上限要求的情况。如果数据量大或变动较小,可以选择文件大小滚动策略。
-
基于时间的滚动:适用于对时间敏感的数据处理需求,比如日志数据、定时任务的输出等。基于时间滚动策略通常有固定的时间间隔,适合实时性要求高的场景。
-
基于事件数的滚动:适用于处理事件生成速率不确定,但希望文件滚动与事件数量挂钩的情况。比如,高速日志记录系统或事件驱动系统。
7. 总结
Flink 的滚动策略(Rolling Policy)是一个非常重要的功能,尤其在处理大量数据输出时,能帮助管理文件的大小、滚动周期和数据的合理分配。通过合理配置 RollingPolicy,开发者可以灵活地管理输出文件,提升系统的可扩展性和存储效率。选择合适的滚动策略可以根据数据量、时间需求以及事件生成的速率来制定最合适的策略。
相关文章:
Flink 中的滚动策略(Rolling Policy)
在 Apache Flink 中,滚动策略(Rolling Policy)是针对日志(或数据流)文件输出的一种管理策略,它决定了在日志文件的大小、时间或其他条件满足特定标准时,如何“滚动”生成新的日志文件。滚动策略…...
GPU和FPGA的区别
GPU(Graphics Processing Unit,图形处理器)和 FPGA(Field-Programmable Gate Array,现场可编程门阵列)不是同一种硬件。 我的理解是,虽然都可以用于并行计算,但是GPU是纯计算的硬件…...
网易云音乐分布式KV存储实践与演进
随着网易云音乐业务的快速发展,推荐和搜索场景对分布式KV存储的需求日益增长。本文将深入探讨网易云音乐在分布式KV存储方面的实践和演进,分析其技术选型、架构设计以及未来发展方向。 一、业务背景 网易云音乐的业务场景对分布式KV存储提出了高并发、…...
WordPress平台如何接入Deepseek,有效提升网站流量
深夜改代码到崩溃?《2024全球CMS生态报告》揭露:78%的WordPress站长因API对接复杂,错失AI内容红利。本文实测「零代码接入Deepseek」的保姆级方案,配合147SEO的智能发布系统,让你用3个步骤实现日均50篇EEAT合规内容自动…...
【嵌入式】STM32内部NOR Flash磨损平衡与掉电保护总结
1. NOR Flash与NAND Flash 先deepseek看结论: 特性Nor FlashNAND Flash读取速度快(支持随机访问,直接执行代码)较慢(需按页顺序读取)写入/擦除速度慢(擦除需5秒,写入需逐字节操作&…...
什么是磁盘阵列(RAID)?如何提高磁盘阵列的性能
什么是磁盘阵列 磁盘阵列(RAID)是一种将多个独立的硬盘组合成一个逻辑存储单元的技术,旨在提高数据存储的性能、容量、可靠性和冗余性。磁盘阵列通过将数据分割成多个区段并分别存储在不同的硬盘上,利用个别磁盘提供数据加…...
轻量级日志管理平台Grafana Loki
文章目录 轻量级日志管理平台Grafana Loki背景什么是Loki为什么使用 Grafana Loki?架构Log Storage Grafana部署使用基于 Docker Compose 安装 LokiMinIO K8s集群部署Loki采集Helm 部署方式和案例 参考 轻量级日志管理平台Grafana Loki 背景 在微服务以及云原生时…...
k8s集群部署
集群结构 角色IPmaster192.168.35.135node1192.168.35.136node2192.168.35.137 部署 #需在三台主机上操作 //关闭防火墙 [rootmaster ~]# systemctl disable --now firewalld//关闭selinux [rootmaster ~]# sed -i s/enforcing/disabled/ /etc/selinux/config//关闭swap分区…...
STM32MP157A-FSMP1A单片机移植Linux系统SPI总线驱动
SPI总线驱动整体上与I2C总线驱动类型,差别主要在设备树和数据传输上,由于SPI是由4根线实现主从机的通信,在设备树上配置时需要对SPI进行设置。 原理图可知,数码管使用的SPI4对应了单片机上的PE11-->SPI4-NSS,PE12-->SPI4-S…...
系统基础与管理(2025更新中)
一、Linux 核心架构与组件 内核架构 核心职责: 管理进程生命周期、内存分配、硬件驱动交互及文件系统操作。 模块化设计支持动态加载硬件驱动(如modprobe加载内核模块),提升灵活性和扩展性。 内存管理:…...
Python--内置函数与推导式(下)
3. 内置函数 数学运算类 函数说明示例abs绝对值abs(-10) → 10pow幂运算pow(2, 3) → 8sum求和sum([1,2,3]) → 6divmod返回商和余数divmod(10, 3) → (3, 1) 数据转换类 # 进制转换 print(bin(10)) # 0b1010 print(hex(255)) # 0x…...
可狱可囚的爬虫系列课程 14:10 秒钟编写一个 requests 爬虫
一、前言 当重复性的工作频繁发生时,各种奇奇怪怪提高效率的想法就开始萌芽了。当重复代码的模块化封装已经不能满足要求的时候,更高效的方式就被揭开了神秘的面纱。本文基于这样的想法,来和大家探讨如何 10 秒钟编写一个 requests 爬虫程序。…...
Windows golang安装和环境配置
【1】、golang 1.19 sdk下载 https://download.csdn.net/download/notfindjob/90422529 【2】、安装 【3】、配置 GOPATH目录 【4】、LiteIDE下载安装 https://download.csdn.net/download/notfindjob/90422580 【5】、打开LiteIDE,选择查看->管理GOPATH&…...
IP-------GRE和MGRE
4.GRE和MGRE 1.应用场景 现实场景 居家工作,公司工作,分公司工作----------需要传输交换数据--------NAT---在该场景中需要两次NAT(不安全) 为了安全有两种手段-----1.物理专线---成本高 2.VPN--虚拟专用网---隧道技术--封装技…...
LabVIEW形状误差测量系统
在机械制造领域,形状与位置公差(GD&T)直接影响装配精度与产品寿命。国内中小型机加工企业因形状误差导致的返工率高达12%-18%。传统测量方式存在以下三大痛点: 设备局限:机械式千分表需人工读数,精度…...
django校园互助平台~源码
博主介绍:✌程序猿徐师兄、8年大厂程序员经历。全网粉丝15w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ 🍅文末获取源码联系🍅 👇🏻 精彩专栏推荐订阅👇…...
Vue进阶之AI智能助手项目(五)——ChatGPT的调用和开发
AI智能助手项目 前端页面Layout布局页面-viewssrc/views/chat/layout/Permission.vuesrc/views/chat/layout/sider/index.vuesrc/views/chat/layout/sider/List.vuesrc/views/chat/layout/sider/Footer.vueComponents 组件Header/index.vueMessage/index.vue前端页面 Layout布…...
Jenkins重启后Maven的Project加载失败
个人博客地址:Jenkins重启后Maven的Project加载失败 | 一张假钞的真实世界 Jenkins重启后发现Maven的项目都没有正常加载。检查Jenkins的启动日志发现以下错误信息: java.io.IOException: Unable to read /home/jenkins/.jenkins/jobs/test-maven/conf…...
【docker】docker pull拉取中不断重复下载问题,解决方案之一,磁盘空间扩容
问题类似这样 存储空间不足 如果 Docker 运行环境的磁盘空间不足,拉取的镜像可能会被自动清理,导致重复下载。可以检查磁盘使用情况: df -h docker system df如果空间不足,可以清理一些不用的容器和镜像: docker sy…...
Ubuntu指令(一)
一、终端操作指令 打开终端,有两种便捷方式: 直接点击系统中的终端按钮;使用快捷键ctrl alt T。 关闭终端,同样有多种选择: 在终端输入exit指令;使用快捷键ctrl d;直接点击终端窗口的关闭…...
深入解析x86控制寄存器CR0:从分页机制到写保护的关键作用
1. CR0寄存器:x86架构的"控制中枢" 如果把CPU比作计算机的大脑,那么CR0寄存器就像是这个大脑的"控制面板"。这个32位的特殊寄存器直接决定了处理器如何管理内存、如何处理异常、甚至如何执行最基本的指令。我第一次在内核源码中看到…...
DeOldify GPU算力优化教程:显存占用控制与推理速度提升技巧
DeOldify GPU算力优化教程:显存占用控制与推理速度提升技巧 1. 项目简介与优化价值 DeOldify是一个基于深度学习技术的黑白图像上色工具,它使用U-Net架构结合ResNet编码器来实现高质量的图像色彩还原。虽然这个工具使用起来很简单,但在实际…...
PyTorch自定义损失超简单
💓 博客主页:瑕疵的CSDN主页 📝 Gitee主页:瑕疵的gitee主页 ⏩ 文章专栏:《热点资讯》 PyTorch自定义损失函数:轻松实现的秘诀目录PyTorch自定义损失函数:轻松实现的秘诀 引言:打破…...
AI股票分析师场景应用:快速搭建本地化金融分析工具全流程
AI股票分析师场景应用:快速搭建本地化金融分析工具全流程 1. 引言:金融分析的智能化转型 在金融投资领域,及时获取专业分析报告是做出投资决策的关键。传统方式需要依赖券商研究报告或付费咨询,不仅成本高昂,还存在隐…...
nginx+rtmp实现直播完整流程
一,环境准备 1.下载nginx-rtmp-module: 1 2 cd /www/server/ git clone https://github.com/arut/nginx-rtmp-module.git 2.Nginx安装: 这是用了宝塔哈。 软件商店 > 应用搜索:nginx > 安装 > 编译安装 > 添加自定义模块 模块…...
The Ultimate Guide to Ruby Timeouts:如何为第三方服务API设置超时
The Ultimate Guide to Ruby Timeouts:如何为第三方服务API设置超时 【免费下载链接】the-ultimate-guide-to-ruby-timeouts Timeouts for popular Ruby gems 项目地址: https://gitcode.com/gh_mirrors/th/the-ultimate-guide-to-ruby-timeouts 在Ruby开发中…...
保姆级教程:STM32+ESP8266接入机智云,从零完成数据点上报与APP控制
STM32与ESP8266接入机智云实战:从数据点定义到APP控制全解析 在智能硬件开发领域,快速实现设备联网与远程控制是许多嵌入式工程师面临的挑战。本文将手把手带您完成一个基于STM32和ESP8266的智能温湿度监测系统,从机智云平台配置到代码移植&a…...
阿里云专有云网络架构
一、 网络设备角色详解(基于阿里云飞天网络架构) 结合 v3.18.6r 版本特性,对图中各缩写设备进行标准化定义:设备缩写全称在单元Region中的核心职责1659台规模下的配置建议NCNode Controller物理服务器节点。包含计算节点ÿ…...
CSS如何提高团队协作效率_推广BEM规范减少样式沟通成本
BEM命名能减少CSS样式扯皮,因其类名明确表达“是什么、在哪用、干什么”,如header__logo--dark精准锁定作用域和上下文,避免复用冲突与逻辑覆盖。为什么BEM命名能减少CSS样式扯皮因为类名本身说了清楚“这是啥、在哪用、干啥的”,…...
深入解析Linux CMA内存管理机制及其优化策略
1. Linux CMA内存管理机制揭秘 第一次在嵌入式设备上调试摄像头驱动时,我遇到了一个棘手的问题:系统总是无法分配足够大的连续内存块。经过三天三夜的排查,终于发现是CMA配置不当导致的。这段经历让我深刻认识到理解CMA机制的重要性。 CMA&am…...
