Flink 中的滚动策略(Rolling Policy)
在 Apache Flink 中,滚动策略(Rolling Policy)是针对日志(或数据流)文件输出的一种管理策略,它决定了在日志文件的大小、时间或其他条件满足特定标准时,如何“滚动”生成新的日志文件。滚动策略常用于处理较大的数据流文件,避免单个文件过大导致存储和处理困难。
1. 滚动策略的作用
在 Flink 中,当作业的输出是通过文件系统(如 HDFS、S3、本地文件系统等)进行持久化时,往往会遇到生成的文件越来越大的问题。滚动策略能够在文件达到某个阈值时自动生成新文件,确保文件的大小在可接受的范围内,从而提高数据处理的可管理性和性能。
2. 滚动策略的基本类型
Flink 提供了几种常见的滚动策略来控制文件的滚动行为。以下是几种常见的策略:
(1) 基于文件大小的滚动策略(Size-based Rolling)
当文件的大小超过一个预设的阈值时,文件会自动“滚动”到一个新的文件中,旧的文件会被关闭,新的文件开始接收数据。
- 适用场景:适用于对文件大小有严格要求的场景,特别是当文件过大时会影响系统性能或数据分析的效率。
- 配置:通常通过配置
maxFileSize
来设置最大文件大小。
示例:
RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder().withMaxPartSize(1024 * 1024 * 1024) // 设置最大文件大小为 1GB.build();
(2) 基于时间的滚动策略(Time-based Rolling)
基于时间的滚动策略根据时间间隔来决定何时滚动文件,通常以分钟、小时或天为单位进行滚动。比如,每小时生成一个新的文件。
- 适用场景:适用于数据有时间要求的场景,比如需要按小时、按天划分存储的数据。
- 配置:可以配置时间间隔,例如通过
rolloverInterval
设置文件滚动的时间间隔。
示例:
RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder().withRolloverInterval(60000L) // 每 60 秒滚动一次.build();
(3) 基于事件数量的滚动策略(Count-based Rolling)
事件数量滚动策略根据文件中的事件数量来决定何时滚动文件。比如,当文件中累积了 10000 个事件后,文件会自动滚动。
- 适用场景:适用于事件生成速率较快且文件大小不易预测的情况。
- 配置:通过
maxPartCount
设置文件中的最大事件数。
示例:
RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder().withMaxPartCount(10000) // 设置每个文件最多包含 10000 个事件.build();
3. Flink 中的文件滚动配置
在 Flink 中,滚动策略通常是与 Flink 的 FileSink
配合使用的。你可以为输出的文件设置滚动策略,并定义如何滚动文件。
4. RollingPolicy
配置
Flink 提供了一个 RollingPolicy
接口,默认的实现是 DefaultRollingPolicy
,它支持多种方式来配置文件滚动:
withMaxPartSize(long maxSize)
:设置单个文件的最大大小,当文件大小超过这个限制时,Flask 会滚动生成新文件。withRolloverInterval(long interval)
:设置文件的滚动时间间隔,单位是毫秒。withMaxPartCount(long maxPartCount)
:设置每个文件的最大事件数。
5. 使用示例
假设我们有一个 Flink 作业,将数据输出到 HDFS,并希望使用滚动策略来管理文件。我们可以通过以下方式设置文件大小滚动策略:
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.RollingPolicy;public class RollingPolicyExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 假设有一个简单的字符串数据流DataStream<String> stream = env.fromElements("Hello", "Flink", "Rolling", "Policy");// 设置滚动策略:文件大小达到 100MB 时滚动RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder().withMaxPartSize(1024 * 1024 * 100) // 100MB.withRolloverInterval(60000L) // 每 60 秒滚动一次.build();// 使用 FileSink 来输出数据到 HDFSStreamingFileSink<String> sink = StreamingFileSink.forRowFormat(new Path("hdfs://path/to/output"), new SimpleStringEncoder<String>("UTF-8")).withRollingPolicy(rollingPolicy).build();// 将数据流写入到文件stream.addSink(sink);env.execute("Flink Rolling Policy Example");}
}
在上面的代码中,我们为 FileSink
设置了基于文件大小和时间的滚动策略。文件大小超过 100MB 或者每 60 秒就会滚动一次,确保输出文件不会无限增大。
6. 滚动策略的选择与最佳实践
-
基于文件大小的滚动:适用于文件内容量预期较为稳定且文件大小有上限要求的情况。如果数据量大或变动较小,可以选择文件大小滚动策略。
-
基于时间的滚动:适用于对时间敏感的数据处理需求,比如日志数据、定时任务的输出等。基于时间滚动策略通常有固定的时间间隔,适合实时性要求高的场景。
-
基于事件数的滚动:适用于处理事件生成速率不确定,但希望文件滚动与事件数量挂钩的情况。比如,高速日志记录系统或事件驱动系统。
7. 总结
Flink 的滚动策略(Rolling Policy)是一个非常重要的功能,尤其在处理大量数据输出时,能帮助管理文件的大小、滚动周期和数据的合理分配。通过合理配置 RollingPolicy
,开发者可以灵活地管理输出文件,提升系统的可扩展性和存储效率。选择合适的滚动策略可以根据数据量、时间需求以及事件生成的速率来制定最合适的策略。
相关文章:
Flink 中的滚动策略(Rolling Policy)
在 Apache Flink 中,滚动策略(Rolling Policy)是针对日志(或数据流)文件输出的一种管理策略,它决定了在日志文件的大小、时间或其他条件满足特定标准时,如何“滚动”生成新的日志文件。滚动策略…...
GPU和FPGA的区别
GPU(Graphics Processing Unit,图形处理器)和 FPGA(Field-Programmable Gate Array,现场可编程门阵列)不是同一种硬件。 我的理解是,虽然都可以用于并行计算,但是GPU是纯计算的硬件…...
网易云音乐分布式KV存储实践与演进
随着网易云音乐业务的快速发展,推荐和搜索场景对分布式KV存储的需求日益增长。本文将深入探讨网易云音乐在分布式KV存储方面的实践和演进,分析其技术选型、架构设计以及未来发展方向。 一、业务背景 网易云音乐的业务场景对分布式KV存储提出了高并发、…...

WordPress平台如何接入Deepseek,有效提升网站流量
深夜改代码到崩溃?《2024全球CMS生态报告》揭露:78%的WordPress站长因API对接复杂,错失AI内容红利。本文实测「零代码接入Deepseek」的保姆级方案,配合147SEO的智能发布系统,让你用3个步骤实现日均50篇EEAT合规内容自动…...

【嵌入式】STM32内部NOR Flash磨损平衡与掉电保护总结
1. NOR Flash与NAND Flash 先deepseek看结论: 特性Nor FlashNAND Flash读取速度快(支持随机访问,直接执行代码)较慢(需按页顺序读取)写入/擦除速度慢(擦除需5秒,写入需逐字节操作&…...

什么是磁盘阵列(RAID)?如何提高磁盘阵列的性能
什么是磁盘阵列 磁盘阵列(RAID)是一种将多个独立的硬盘组合成一个逻辑存储单元的技术,旨在提高数据存储的性能、容量、可靠性和冗余性。磁盘阵列通过将数据分割成多个区段并分别存储在不同的硬盘上,利用个别磁盘提供数据加…...

轻量级日志管理平台Grafana Loki
文章目录 轻量级日志管理平台Grafana Loki背景什么是Loki为什么使用 Grafana Loki?架构Log Storage Grafana部署使用基于 Docker Compose 安装 LokiMinIO K8s集群部署Loki采集Helm 部署方式和案例 参考 轻量级日志管理平台Grafana Loki 背景 在微服务以及云原生时…...
k8s集群部署
集群结构 角色IPmaster192.168.35.135node1192.168.35.136node2192.168.35.137 部署 #需在三台主机上操作 //关闭防火墙 [rootmaster ~]# systemctl disable --now firewalld//关闭selinux [rootmaster ~]# sed -i s/enforcing/disabled/ /etc/selinux/config//关闭swap分区…...

STM32MP157A-FSMP1A单片机移植Linux系统SPI总线驱动
SPI总线驱动整体上与I2C总线驱动类型,差别主要在设备树和数据传输上,由于SPI是由4根线实现主从机的通信,在设备树上配置时需要对SPI进行设置。 原理图可知,数码管使用的SPI4对应了单片机上的PE11-->SPI4-NSS,PE12-->SPI4-S…...

系统基础与管理(2025更新中)
一、Linux 核心架构与组件 内核架构 核心职责: 管理进程生命周期、内存分配、硬件驱动交互及文件系统操作。 模块化设计支持动态加载硬件驱动(如modprobe加载内核模块),提升灵活性和扩展性。 内存管理:…...
Python--内置函数与推导式(下)
3. 内置函数 数学运算类 函数说明示例abs绝对值abs(-10) → 10pow幂运算pow(2, 3) → 8sum求和sum([1,2,3]) → 6divmod返回商和余数divmod(10, 3) → (3, 1) 数据转换类 # 进制转换 print(bin(10)) # 0b1010 print(hex(255)) # 0x…...

可狱可囚的爬虫系列课程 14:10 秒钟编写一个 requests 爬虫
一、前言 当重复性的工作频繁发生时,各种奇奇怪怪提高效率的想法就开始萌芽了。当重复代码的模块化封装已经不能满足要求的时候,更高效的方式就被揭开了神秘的面纱。本文基于这样的想法,来和大家探讨如何 10 秒钟编写一个 requests 爬虫程序。…...

Windows golang安装和环境配置
【1】、golang 1.19 sdk下载 https://download.csdn.net/download/notfindjob/90422529 【2】、安装 【3】、配置 GOPATH目录 【4】、LiteIDE下载安装 https://download.csdn.net/download/notfindjob/90422580 【5】、打开LiteIDE,选择查看->管理GOPATH&…...

IP-------GRE和MGRE
4.GRE和MGRE 1.应用场景 现实场景 居家工作,公司工作,分公司工作----------需要传输交换数据--------NAT---在该场景中需要两次NAT(不安全) 为了安全有两种手段-----1.物理专线---成本高 2.VPN--虚拟专用网---隧道技术--封装技…...

LabVIEW形状误差测量系统
在机械制造领域,形状与位置公差(GD&T)直接影响装配精度与产品寿命。国内中小型机加工企业因形状误差导致的返工率高达12%-18%。传统测量方式存在以下三大痛点: 设备局限:机械式千分表需人工读数,精度…...

django校园互助平台~源码
博主介绍:✌程序猿徐师兄、8年大厂程序员经历。全网粉丝15w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ 🍅文末获取源码联系🍅 👇🏻 精彩专栏推荐订阅👇…...
Vue进阶之AI智能助手项目(五)——ChatGPT的调用和开发
AI智能助手项目 前端页面Layout布局页面-viewssrc/views/chat/layout/Permission.vuesrc/views/chat/layout/sider/index.vuesrc/views/chat/layout/sider/List.vuesrc/views/chat/layout/sider/Footer.vueComponents 组件Header/index.vueMessage/index.vue前端页面 Layout布…...
Jenkins重启后Maven的Project加载失败
个人博客地址:Jenkins重启后Maven的Project加载失败 | 一张假钞的真实世界 Jenkins重启后发现Maven的项目都没有正常加载。检查Jenkins的启动日志发现以下错误信息: java.io.IOException: Unable to read /home/jenkins/.jenkins/jobs/test-maven/conf…...

【docker】docker pull拉取中不断重复下载问题,解决方案之一,磁盘空间扩容
问题类似这样 存储空间不足 如果 Docker 运行环境的磁盘空间不足,拉取的镜像可能会被自动清理,导致重复下载。可以检查磁盘使用情况: df -h docker system df如果空间不足,可以清理一些不用的容器和镜像: docker sy…...
Ubuntu指令(一)
一、终端操作指令 打开终端,有两种便捷方式: 直接点击系统中的终端按钮;使用快捷键ctrl alt T。 关闭终端,同样有多种选择: 在终端输入exit指令;使用快捷键ctrl d;直接点击终端窗口的关闭…...

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明
LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造,完美适配AGV和无人叉车。同时,集成以太网与语音合成技术,为各类高级系统(如MES、调度系统、库位管理、立库等)提供高效便捷的语音交互体验。 L…...

大话软工笔记—需求分析概述
需求分析,就是要对需求调研收集到的资料信息逐个地进行拆分、研究,从大量的不确定“需求”中确定出哪些需求最终要转换为确定的“功能需求”。 需求分析的作用非常重要,后续设计的依据主要来自于需求分析的成果,包括: 项目的目的…...
前端倒计时误差!
提示:记录工作中遇到的需求及解决办法 文章目录 前言一、误差从何而来?二、五大解决方案1. 动态校准法(基础版)2. Web Worker 计时3. 服务器时间同步4. Performance API 高精度计时5. 页面可见性API优化三、生产环境最佳实践四、终极解决方案架构前言 前几天听说公司某个项…...
uni-app学习笔记二十二---使用vite.config.js全局导入常用依赖
在前面的练习中,每个页面需要使用ref,onShow等生命周期钩子函数时都需要像下面这样导入 import {onMounted, ref} from "vue" 如果不想每个页面都导入,需要使用node.js命令npm安装unplugin-auto-import npm install unplugin-au…...

(二)原型模式
原型的功能是将一个已经存在的对象作为源目标,其余对象都是通过这个源目标创建。发挥复制的作用就是原型模式的核心思想。 一、源型模式的定义 原型模式是指第二次创建对象可以通过复制已经存在的原型对象来实现,忽略对象创建过程中的其它细节。 📌 核心特点: 避免重复初…...

MODBUS TCP转CANopen 技术赋能高效协同作业
在现代工业自动化领域,MODBUS TCP和CANopen两种通讯协议因其稳定性和高效性被广泛应用于各种设备和系统中。而随着科技的不断进步,这两种通讯协议也正在被逐步融合,形成了一种新型的通讯方式——开疆智能MODBUS TCP转CANopen网关KJ-TCPC-CANP…...

Python爬虫(一):爬虫伪装
一、网站防爬机制概述 在当今互联网环境中,具有一定规模或盈利性质的网站几乎都实施了各种防爬措施。这些措施主要分为两大类: 身份验证机制:直接将未经授权的爬虫阻挡在外反爬技术体系:通过各种技术手段增加爬虫获取数据的难度…...

【Zephyr 系列 10】实战项目:打造一个蓝牙传感器终端 + 网关系统(完整架构与全栈实现)
🧠关键词:Zephyr、BLE、终端、网关、广播、连接、传感器、数据采集、低功耗、系统集成 📌目标读者:希望基于 Zephyr 构建 BLE 系统架构、实现终端与网关协作、具备产品交付能力的开发者 📊篇幅字数:约 5200 字 ✨ 项目总览 在物联网实际项目中,**“终端 + 网关”**是…...
大模型多显卡多服务器并行计算方法与实践指南
一、分布式训练概述 大规模语言模型的训练通常需要分布式计算技术,以解决单机资源不足的问题。分布式训练主要分为两种模式: 数据并行:将数据分片到不同设备,每个设备拥有完整的模型副本 模型并行:将模型分割到不同设备,每个设备处理部分模型计算 现代大模型训练通常结合…...
苹果AI眼镜:从“工具”到“社交姿态”的范式革命——重新定义AI交互入口的未来机会
在2025年的AI硬件浪潮中,苹果AI眼镜(Apple Glasses)正在引发一场关于“人机交互形态”的深度思考。它并非简单地替代AirPods或Apple Watch,而是开辟了一个全新的、日常可接受的AI入口。其核心价值不在于功能的堆叠,而在于如何通过形态设计打破社交壁垒,成为用户“全天佩戴…...