当前位置: 首页 > news >正文

Flink 中的滚动策略(Rolling Policy)

在 Apache Flink 中,滚动策略(Rolling Policy)是针对日志(或数据流)文件输出的一种管理策略,它决定了在日志文件的大小、时间或其他条件满足特定标准时,如何“滚动”生成新的日志文件。滚动策略常用于处理较大的数据流文件,避免单个文件过大导致存储和处理困难。

1. 滚动策略的作用

在 Flink 中,当作业的输出是通过文件系统(如 HDFS、S3、本地文件系统等)进行持久化时,往往会遇到生成的文件越来越大的问题。滚动策略能够在文件达到某个阈值时自动生成新文件,确保文件的大小在可接受的范围内,从而提高数据处理的可管理性和性能。

2. 滚动策略的基本类型

Flink 提供了几种常见的滚动策略来控制文件的滚动行为。以下是几种常见的策略:

(1) 基于文件大小的滚动策略(Size-based Rolling)

当文件的大小超过一个预设的阈值时,文件会自动“滚动”到一个新的文件中,旧的文件会被关闭,新的文件开始接收数据。

  • 适用场景:适用于对文件大小有严格要求的场景,特别是当文件过大时会影响系统性能或数据分析的效率。
  • 配置:通常通过配置 maxFileSize 来设置最大文件大小。

示例

RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder().withMaxPartSize(1024 * 1024 * 1024)  // 设置最大文件大小为 1GB.build();
(2) 基于时间的滚动策略(Time-based Rolling)

基于时间的滚动策略根据时间间隔来决定何时滚动文件,通常以分钟、小时或天为单位进行滚动。比如,每小时生成一个新的文件。

  • 适用场景:适用于数据有时间要求的场景,比如需要按小时、按天划分存储的数据。
  • 配置:可以配置时间间隔,例如通过 rolloverInterval 设置文件滚动的时间间隔。

示例

RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder().withRolloverInterval(60000L)  // 每 60 秒滚动一次.build();
(3) 基于事件数量的滚动策略(Count-based Rolling)

事件数量滚动策略根据文件中的事件数量来决定何时滚动文件。比如,当文件中累积了 10000 个事件后,文件会自动滚动。

  • 适用场景:适用于事件生成速率较快且文件大小不易预测的情况。
  • 配置:通过 maxPartCount 设置文件中的最大事件数。

示例

RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder().withMaxPartCount(10000)  // 设置每个文件最多包含 10000 个事件.build();

3. Flink 中的文件滚动配置

在 Flink 中,滚动策略通常是与 Flink 的 FileSink 配合使用的。你可以为输出的文件设置滚动策略,并定义如何滚动文件。

4. RollingPolicy 配置

Flink 提供了一个 RollingPolicy 接口,默认的实现是 DefaultRollingPolicy,它支持多种方式来配置文件滚动:

  • withMaxPartSize(long maxSize):设置单个文件的最大大小,当文件大小超过这个限制时,Flask 会滚动生成新文件。
  • withRolloverInterval(long interval):设置文件的滚动时间间隔,单位是毫秒。
  • withMaxPartCount(long maxPartCount):设置每个文件的最大事件数。

5. 使用示例

假设我们有一个 Flink 作业,将数据输出到 HDFS,并希望使用滚动策略来管理文件。我们可以通过以下方式设置文件大小滚动策略:

import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.RollingPolicy;public class RollingPolicyExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 假设有一个简单的字符串数据流DataStream<String> stream = env.fromElements("Hello", "Flink", "Rolling", "Policy");// 设置滚动策略:文件大小达到 100MB 时滚动RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder().withMaxPartSize(1024 * 1024 * 100)  // 100MB.withRolloverInterval(60000L)        // 每 60 秒滚动一次.build();// 使用 FileSink 来输出数据到 HDFSStreamingFileSink<String> sink = StreamingFileSink.forRowFormat(new Path("hdfs://path/to/output"), new SimpleStringEncoder<String>("UTF-8")).withRollingPolicy(rollingPolicy).build();// 将数据流写入到文件stream.addSink(sink);env.execute("Flink Rolling Policy Example");}
}

在上面的代码中,我们为 FileSink 设置了基于文件大小和时间的滚动策略。文件大小超过 100MB 或者每 60 秒就会滚动一次,确保输出文件不会无限增大。

6. 滚动策略的选择与最佳实践

  • 基于文件大小的滚动:适用于文件内容量预期较为稳定且文件大小有上限要求的情况。如果数据量大或变动较小,可以选择文件大小滚动策略。

  • 基于时间的滚动:适用于对时间敏感的数据处理需求,比如日志数据、定时任务的输出等。基于时间滚动策略通常有固定的时间间隔,适合实时性要求高的场景。

  • 基于事件数的滚动:适用于处理事件生成速率不确定,但希望文件滚动与事件数量挂钩的情况。比如,高速日志记录系统或事件驱动系统。

7. 总结

Flink 的滚动策略(Rolling Policy)是一个非常重要的功能,尤其在处理大量数据输出时,能帮助管理文件的大小、滚动周期和数据的合理分配。通过合理配置 RollingPolicy,开发者可以灵活地管理输出文件,提升系统的可扩展性和存储效率。选择合适的滚动策略可以根据数据量、时间需求以及事件生成的速率来制定最合适的策略。

相关文章:

Flink 中的滚动策略(Rolling Policy)

在 Apache Flink 中&#xff0c;滚动策略&#xff08;Rolling Policy&#xff09;是针对日志&#xff08;或数据流&#xff09;文件输出的一种管理策略&#xff0c;它决定了在日志文件的大小、时间或其他条件满足特定标准时&#xff0c;如何“滚动”生成新的日志文件。滚动策略…...

GPU和FPGA的区别

GPU&#xff08;Graphics Processing Unit&#xff0c;图形处理器&#xff09;和 FPGA&#xff08;Field-Programmable Gate Array&#xff0c;现场可编程门阵列&#xff09;不是同一种硬件。 我的理解是&#xff0c;虽然都可以用于并行计算&#xff0c;但是GPU是纯计算的硬件…...

网易云音乐分布式KV存储实践与演进

随着网易云音乐业务的快速发展&#xff0c;推荐和搜索场景对分布式KV存储的需求日益增长。本文将深入探讨网易云音乐在分布式KV存储方面的实践和演进&#xff0c;分析其技术选型、架构设计以及未来发展方向。 一、业务背景 网易云音乐的业务场景对分布式KV存储提出了高并发、…...

WordPress平台如何接入Deepseek,有效提升网站流量

深夜改代码到崩溃&#xff1f;《2024全球CMS生态报告》揭露&#xff1a;78%的WordPress站长因API对接复杂&#xff0c;错失AI内容红利。本文实测「零代码接入Deepseek」的保姆级方案&#xff0c;配合147SEO的智能发布系统&#xff0c;让你用3个步骤实现日均50篇EEAT合规内容自动…...

【嵌入式】STM32内部NOR Flash磨损平衡与掉电保护总结

1. NOR Flash与NAND Flash 先deepseek看结论&#xff1a; 特性Nor FlashNAND Flash读取速度快&#xff08;支持随机访问&#xff0c;直接执行代码&#xff09;较慢&#xff08;需按页顺序读取&#xff09;写入/擦除速度慢&#xff08;擦除需5秒&#xff0c;写入需逐字节操作&…...

什么是磁盘阵列(RAID)?如何提高磁盘阵列的性能

什么是磁盘阵列 ‌磁盘阵列&#xff08;RAID&#xff09;是一种将多个独立的硬盘组合成一个逻辑存储单元的技术&#xff0c;旨在提高数据存储的性能、容量、可靠性和冗余性‌。‌磁盘阵列通过将数据分割成多个区段并分别存储在不同的硬盘上&#xff0c;利用个别磁盘提供数据加…...

轻量级日志管理平台Grafana Loki

文章目录 轻量级日志管理平台Grafana Loki背景什么是Loki为什么使用 Grafana Loki&#xff1f;架构Log Storage Grafana部署使用基于 Docker Compose 安装 LokiMinIO K8s集群部署Loki采集Helm 部署方式和案例 参考 轻量级日志管理平台Grafana Loki 背景 在微服务以及云原生时…...

k8s集群部署

集群结构 角色IPmaster192.168.35.135node1192.168.35.136node2192.168.35.137 部署 #需在三台主机上操作 //关闭防火墙 [rootmaster ~]# systemctl disable --now firewalld//关闭selinux [rootmaster ~]# sed -i s/enforcing/disabled/ /etc/selinux/config//关闭swap分区…...

STM32MP157A-FSMP1A单片机移植Linux系统SPI总线驱动

SPI总线驱动整体上与I2C总线驱动类型&#xff0c;差别主要在设备树和数据传输上&#xff0c;由于SPI是由4根线实现主从机的通信&#xff0c;在设备树上配置时需要对SPI进行设置。 原理图可知&#xff0c;数码管使用的SPI4对应了单片机上的PE11-->SPI4-NSS,PE12-->SPI4-S…...

系统基础与管理(2025更新中)

‌一、Linux 核心架构与组件‌ ‌内核架构‌ ‌核心职责‌&#xff1a; 管理进程生命周期、内存分配、硬件驱动交互及文件系统操作。 模块化设计支持动态加载硬件驱动&#xff08;如modprobe加载内核模块&#xff09;&#xff0c;提升灵活性和扩展性。 ‌内存管理‌&#xff1a…...

Python--内置函数与推导式(下)

3. 内置函数 数学运算类 函数说明示例​abs​绝对值​abs(-10) → 10​​pow​幂运算​pow(2, 3) → 8​​sum​求和​sum([1,2,3]) → 6​​divmod​返回商和余数​divmod(10, 3) → (3, 1)​ 数据转换类 # 进制转换 print(bin(10)) # 0b1010 print(hex(255)) # 0x…...

可狱可囚的爬虫系列课程 14:10 秒钟编写一个 requests 爬虫

一、前言 当重复性的工作频繁发生时&#xff0c;各种奇奇怪怪提高效率的想法就开始萌芽了。当重复代码的模块化封装已经不能满足要求的时候&#xff0c;更高效的方式就被揭开了神秘的面纱。本文基于这样的想法&#xff0c;来和大家探讨如何 10 秒钟编写一个 requests 爬虫程序。…...

Windows golang安装和环境配置

【1】、golang 1.19 sdk下载 https://download.csdn.net/download/notfindjob/90422529 【2】、安装 【3】、配置 GOPATH目录 【4】、LiteIDE下载安装 https://download.csdn.net/download/notfindjob/90422580 【5】、打开LiteIDE&#xff0c;选择查看->管理GOPATH&…...

IP-------GRE和MGRE

4.GRE和MGRE 1.应用场景 现实场景 居家工作&#xff0c;公司工作&#xff0c;分公司工作----------需要传输交换数据--------NAT---在该场景中需要两次NAT&#xff08;不安全&#xff09; 为了安全有两种手段-----1.物理专线---成本高 2.VPN--虚拟专用网---隧道技术--封装技…...

LabVIEW形状误差测量系统

在机械制造领域&#xff0c;形状与位置公差&#xff08;GD&T&#xff09;直接影响装配精度与产品寿命。国内中小型机加工企业因形状误差导致的返工率高达12%-18%。传统测量方式存在以下三大痛点&#xff1a; ​ 设备局限&#xff1a;机械式千分表需人工读数&#xff0c;精度…...

django校园互助平台~源码

博主介绍&#xff1a;✌程序猿徐师兄、8年大厂程序员经历。全网粉丝15w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;…...

Vue进阶之AI智能助手项目(五)——ChatGPT的调用和开发

AI智能助手项目 前端页面Layout布局页面-viewssrc/views/chat/layout/Permission.vuesrc/views/chat/layout/sider/index.vuesrc/views/chat/layout/sider/List.vuesrc/views/chat/layout/sider/Footer.vueComponents 组件Header/index.vueMessage/index.vue前端页面 Layout布…...

Jenkins重启后Maven的Project加载失败

个人博客地址&#xff1a;Jenkins重启后Maven的Project加载失败 | 一张假钞的真实世界 Jenkins重启后发现Maven的项目都没有正常加载。检查Jenkins的启动日志发现以下错误信息&#xff1a; java.io.IOException: Unable to read /home/jenkins/.jenkins/jobs/test-maven/conf…...

【docker】docker pull拉取中不断重复下载问题,解决方案之一,磁盘空间扩容

问题类似这样 存储空间不足 如果 Docker 运行环境的磁盘空间不足&#xff0c;拉取的镜像可能会被自动清理&#xff0c;导致重复下载。可以检查磁盘使用情况&#xff1a; df -h docker system df如果空间不足&#xff0c;可以清理一些不用的容器和镜像&#xff1a; docker sy…...

Ubuntu指令(一)

一、终端操作指令 打开终端&#xff0c;有两种便捷方式&#xff1a; 直接点击系统中的终端按钮&#xff1b;使用快捷键ctrl alt T。 关闭终端&#xff0c;同样有多种选择&#xff1a; 在终端输入exit指令&#xff1b;使用快捷键ctrl d&#xff1b;直接点击终端窗口的关闭…...

深入解析x86控制寄存器CR0:从分页机制到写保护的关键作用

1. CR0寄存器&#xff1a;x86架构的"控制中枢" 如果把CPU比作计算机的大脑&#xff0c;那么CR0寄存器就像是这个大脑的"控制面板"。这个32位的特殊寄存器直接决定了处理器如何管理内存、如何处理异常、甚至如何执行最基本的指令。我第一次在内核源码中看到…...

DeOldify GPU算力优化教程:显存占用控制与推理速度提升技巧

DeOldify GPU算力优化教程&#xff1a;显存占用控制与推理速度提升技巧 1. 项目简介与优化价值 DeOldify是一个基于深度学习技术的黑白图像上色工具&#xff0c;它使用U-Net架构结合ResNet编码器来实现高质量的图像色彩还原。虽然这个工具使用起来很简单&#xff0c;但在实际…...

PyTorch自定义损失超简单

&#x1f493; 博客主页&#xff1a;瑕疵的CSDN主页 &#x1f4dd; Gitee主页&#xff1a;瑕疵的gitee主页 ⏩ 文章专栏&#xff1a;《热点资讯》 PyTorch自定义损失函数&#xff1a;轻松实现的秘诀目录PyTorch自定义损失函数&#xff1a;轻松实现的秘诀 引言&#xff1a;打破…...

AI股票分析师场景应用:快速搭建本地化金融分析工具全流程

AI股票分析师场景应用&#xff1a;快速搭建本地化金融分析工具全流程 1. 引言&#xff1a;金融分析的智能化转型 在金融投资领域&#xff0c;及时获取专业分析报告是做出投资决策的关键。传统方式需要依赖券商研究报告或付费咨询&#xff0c;不仅成本高昂&#xff0c;还存在隐…...

nginx+rtmp实现直播完整流程

一&#xff0c;环境准备 1.下载nginx-rtmp-module&#xff1a; 1 2 cd /www/server/ git clone https://github.com/arut/nginx-rtmp-module.git 2.Nginx安装: 这是用了宝塔哈。 软件商店 > 应用搜索&#xff1a;nginx > 安装 > 编译安装 > 添加自定义模块 模块…...

The Ultimate Guide to Ruby Timeouts:如何为第三方服务API设置超时

The Ultimate Guide to Ruby Timeouts&#xff1a;如何为第三方服务API设置超时 【免费下载链接】the-ultimate-guide-to-ruby-timeouts Timeouts for popular Ruby gems 项目地址: https://gitcode.com/gh_mirrors/th/the-ultimate-guide-to-ruby-timeouts 在Ruby开发中…...

保姆级教程:STM32+ESP8266接入机智云,从零完成数据点上报与APP控制

STM32与ESP8266接入机智云实战&#xff1a;从数据点定义到APP控制全解析 在智能硬件开发领域&#xff0c;快速实现设备联网与远程控制是许多嵌入式工程师面临的挑战。本文将手把手带您完成一个基于STM32和ESP8266的智能温湿度监测系统&#xff0c;从机智云平台配置到代码移植&a…...

阿里云专有云网络架构

一、 网络设备角色详解&#xff08;基于阿里云飞天网络架构&#xff09; 结合 v3.18.6r 版本特性&#xff0c;对图中各缩写设备进行标准化定义&#xff1a;设备缩写全称在单元Region中的核心职责1659台规模下的配置建议NCNode Controller物理服务器节点。包含计算节点&#xff…...

CSS如何提高团队协作效率_推广BEM规范减少样式沟通成本

BEM命名能减少CSS样式扯皮&#xff0c;因其类名明确表达“是什么、在哪用、干什么”&#xff0c;如header__logo--dark精准锁定作用域和上下文&#xff0c;避免复用冲突与逻辑覆盖。为什么BEM命名能减少CSS样式扯皮因为类名本身说了清楚“这是啥、在哪用、干啥的”&#xff0c;…...

深入解析Linux CMA内存管理机制及其优化策略

1. Linux CMA内存管理机制揭秘 第一次在嵌入式设备上调试摄像头驱动时&#xff0c;我遇到了一个棘手的问题&#xff1a;系统总是无法分配足够大的连续内存块。经过三天三夜的排查&#xff0c;终于发现是CMA配置不当导致的。这段经历让我深刻认识到理解CMA机制的重要性。 CMA&am…...