当前位置: 首页 > news >正文

Flink 中的滚动策略(Rolling Policy)

在 Apache Flink 中,滚动策略(Rolling Policy)是针对日志(或数据流)文件输出的一种管理策略,它决定了在日志文件的大小、时间或其他条件满足特定标准时,如何“滚动”生成新的日志文件。滚动策略常用于处理较大的数据流文件,避免单个文件过大导致存储和处理困难。

1. 滚动策略的作用

在 Flink 中,当作业的输出是通过文件系统(如 HDFS、S3、本地文件系统等)进行持久化时,往往会遇到生成的文件越来越大的问题。滚动策略能够在文件达到某个阈值时自动生成新文件,确保文件的大小在可接受的范围内,从而提高数据处理的可管理性和性能。

2. 滚动策略的基本类型

Flink 提供了几种常见的滚动策略来控制文件的滚动行为。以下是几种常见的策略:

(1) 基于文件大小的滚动策略(Size-based Rolling)

当文件的大小超过一个预设的阈值时,文件会自动“滚动”到一个新的文件中,旧的文件会被关闭,新的文件开始接收数据。

  • 适用场景:适用于对文件大小有严格要求的场景,特别是当文件过大时会影响系统性能或数据分析的效率。
  • 配置:通常通过配置 maxFileSize 来设置最大文件大小。

示例

RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder().withMaxPartSize(1024 * 1024 * 1024)  // 设置最大文件大小为 1GB.build();
(2) 基于时间的滚动策略(Time-based Rolling)

基于时间的滚动策略根据时间间隔来决定何时滚动文件,通常以分钟、小时或天为单位进行滚动。比如,每小时生成一个新的文件。

  • 适用场景:适用于数据有时间要求的场景,比如需要按小时、按天划分存储的数据。
  • 配置:可以配置时间间隔,例如通过 rolloverInterval 设置文件滚动的时间间隔。

示例

RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder().withRolloverInterval(60000L)  // 每 60 秒滚动一次.build();
(3) 基于事件数量的滚动策略(Count-based Rolling)

事件数量滚动策略根据文件中的事件数量来决定何时滚动文件。比如,当文件中累积了 10000 个事件后,文件会自动滚动。

  • 适用场景:适用于事件生成速率较快且文件大小不易预测的情况。
  • 配置:通过 maxPartCount 设置文件中的最大事件数。

示例

RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder().withMaxPartCount(10000)  // 设置每个文件最多包含 10000 个事件.build();

3. Flink 中的文件滚动配置

在 Flink 中,滚动策略通常是与 Flink 的 FileSink 配合使用的。你可以为输出的文件设置滚动策略,并定义如何滚动文件。

4. RollingPolicy 配置

Flink 提供了一个 RollingPolicy 接口,默认的实现是 DefaultRollingPolicy,它支持多种方式来配置文件滚动:

  • withMaxPartSize(long maxSize):设置单个文件的最大大小,当文件大小超过这个限制时,Flask 会滚动生成新文件。
  • withRolloverInterval(long interval):设置文件的滚动时间间隔,单位是毫秒。
  • withMaxPartCount(long maxPartCount):设置每个文件的最大事件数。

5. 使用示例

假设我们有一个 Flink 作业,将数据输出到 HDFS,并希望使用滚动策略来管理文件。我们可以通过以下方式设置文件大小滚动策略:

import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.RollingPolicy;public class RollingPolicyExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 假设有一个简单的字符串数据流DataStream<String> stream = env.fromElements("Hello", "Flink", "Rolling", "Policy");// 设置滚动策略:文件大小达到 100MB 时滚动RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder().withMaxPartSize(1024 * 1024 * 100)  // 100MB.withRolloverInterval(60000L)        // 每 60 秒滚动一次.build();// 使用 FileSink 来输出数据到 HDFSStreamingFileSink<String> sink = StreamingFileSink.forRowFormat(new Path("hdfs://path/to/output"), new SimpleStringEncoder<String>("UTF-8")).withRollingPolicy(rollingPolicy).build();// 将数据流写入到文件stream.addSink(sink);env.execute("Flink Rolling Policy Example");}
}

在上面的代码中,我们为 FileSink 设置了基于文件大小和时间的滚动策略。文件大小超过 100MB 或者每 60 秒就会滚动一次,确保输出文件不会无限增大。

6. 滚动策略的选择与最佳实践

  • 基于文件大小的滚动:适用于文件内容量预期较为稳定且文件大小有上限要求的情况。如果数据量大或变动较小,可以选择文件大小滚动策略。

  • 基于时间的滚动:适用于对时间敏感的数据处理需求,比如日志数据、定时任务的输出等。基于时间滚动策略通常有固定的时间间隔,适合实时性要求高的场景。

  • 基于事件数的滚动:适用于处理事件生成速率不确定,但希望文件滚动与事件数量挂钩的情况。比如,高速日志记录系统或事件驱动系统。

7. 总结

Flink 的滚动策略(Rolling Policy)是一个非常重要的功能,尤其在处理大量数据输出时,能帮助管理文件的大小、滚动周期和数据的合理分配。通过合理配置 RollingPolicy,开发者可以灵活地管理输出文件,提升系统的可扩展性和存储效率。选择合适的滚动策略可以根据数据量、时间需求以及事件生成的速率来制定最合适的策略。

相关文章:

Flink 中的滚动策略(Rolling Policy)

在 Apache Flink 中&#xff0c;滚动策略&#xff08;Rolling Policy&#xff09;是针对日志&#xff08;或数据流&#xff09;文件输出的一种管理策略&#xff0c;它决定了在日志文件的大小、时间或其他条件满足特定标准时&#xff0c;如何“滚动”生成新的日志文件。滚动策略…...

GPU和FPGA的区别

GPU&#xff08;Graphics Processing Unit&#xff0c;图形处理器&#xff09;和 FPGA&#xff08;Field-Programmable Gate Array&#xff0c;现场可编程门阵列&#xff09;不是同一种硬件。 我的理解是&#xff0c;虽然都可以用于并行计算&#xff0c;但是GPU是纯计算的硬件…...

网易云音乐分布式KV存储实践与演进

随着网易云音乐业务的快速发展&#xff0c;推荐和搜索场景对分布式KV存储的需求日益增长。本文将深入探讨网易云音乐在分布式KV存储方面的实践和演进&#xff0c;分析其技术选型、架构设计以及未来发展方向。 一、业务背景 网易云音乐的业务场景对分布式KV存储提出了高并发、…...

WordPress平台如何接入Deepseek,有效提升网站流量

深夜改代码到崩溃&#xff1f;《2024全球CMS生态报告》揭露&#xff1a;78%的WordPress站长因API对接复杂&#xff0c;错失AI内容红利。本文实测「零代码接入Deepseek」的保姆级方案&#xff0c;配合147SEO的智能发布系统&#xff0c;让你用3个步骤实现日均50篇EEAT合规内容自动…...

【嵌入式】STM32内部NOR Flash磨损平衡与掉电保护总结

1. NOR Flash与NAND Flash 先deepseek看结论&#xff1a; 特性Nor FlashNAND Flash读取速度快&#xff08;支持随机访问&#xff0c;直接执行代码&#xff09;较慢&#xff08;需按页顺序读取&#xff09;写入/擦除速度慢&#xff08;擦除需5秒&#xff0c;写入需逐字节操作&…...

什么是磁盘阵列(RAID)?如何提高磁盘阵列的性能

什么是磁盘阵列 ‌磁盘阵列&#xff08;RAID&#xff09;是一种将多个独立的硬盘组合成一个逻辑存储单元的技术&#xff0c;旨在提高数据存储的性能、容量、可靠性和冗余性‌。‌磁盘阵列通过将数据分割成多个区段并分别存储在不同的硬盘上&#xff0c;利用个别磁盘提供数据加…...

轻量级日志管理平台Grafana Loki

文章目录 轻量级日志管理平台Grafana Loki背景什么是Loki为什么使用 Grafana Loki&#xff1f;架构Log Storage Grafana部署使用基于 Docker Compose 安装 LokiMinIO K8s集群部署Loki采集Helm 部署方式和案例 参考 轻量级日志管理平台Grafana Loki 背景 在微服务以及云原生时…...

k8s集群部署

集群结构 角色IPmaster192.168.35.135node1192.168.35.136node2192.168.35.137 部署 #需在三台主机上操作 //关闭防火墙 [rootmaster ~]# systemctl disable --now firewalld//关闭selinux [rootmaster ~]# sed -i s/enforcing/disabled/ /etc/selinux/config//关闭swap分区…...

STM32MP157A-FSMP1A单片机移植Linux系统SPI总线驱动

SPI总线驱动整体上与I2C总线驱动类型&#xff0c;差别主要在设备树和数据传输上&#xff0c;由于SPI是由4根线实现主从机的通信&#xff0c;在设备树上配置时需要对SPI进行设置。 原理图可知&#xff0c;数码管使用的SPI4对应了单片机上的PE11-->SPI4-NSS,PE12-->SPI4-S…...

系统基础与管理(2025更新中)

‌一、Linux 核心架构与组件‌ ‌内核架构‌ ‌核心职责‌&#xff1a; 管理进程生命周期、内存分配、硬件驱动交互及文件系统操作。 模块化设计支持动态加载硬件驱动&#xff08;如modprobe加载内核模块&#xff09;&#xff0c;提升灵活性和扩展性。 ‌内存管理‌&#xff1a…...

Python--内置函数与推导式(下)

3. 内置函数 数学运算类 函数说明示例​abs​绝对值​abs(-10) → 10​​pow​幂运算​pow(2, 3) → 8​​sum​求和​sum([1,2,3]) → 6​​divmod​返回商和余数​divmod(10, 3) → (3, 1)​ 数据转换类 # 进制转换 print(bin(10)) # 0b1010 print(hex(255)) # 0x…...

可狱可囚的爬虫系列课程 14:10 秒钟编写一个 requests 爬虫

一、前言 当重复性的工作频繁发生时&#xff0c;各种奇奇怪怪提高效率的想法就开始萌芽了。当重复代码的模块化封装已经不能满足要求的时候&#xff0c;更高效的方式就被揭开了神秘的面纱。本文基于这样的想法&#xff0c;来和大家探讨如何 10 秒钟编写一个 requests 爬虫程序。…...

Windows golang安装和环境配置

【1】、golang 1.19 sdk下载 https://download.csdn.net/download/notfindjob/90422529 【2】、安装 【3】、配置 GOPATH目录 【4】、LiteIDE下载安装 https://download.csdn.net/download/notfindjob/90422580 【5】、打开LiteIDE&#xff0c;选择查看->管理GOPATH&…...

IP-------GRE和MGRE

4.GRE和MGRE 1.应用场景 现实场景 居家工作&#xff0c;公司工作&#xff0c;分公司工作----------需要传输交换数据--------NAT---在该场景中需要两次NAT&#xff08;不安全&#xff09; 为了安全有两种手段-----1.物理专线---成本高 2.VPN--虚拟专用网---隧道技术--封装技…...

LabVIEW形状误差测量系统

在机械制造领域&#xff0c;形状与位置公差&#xff08;GD&T&#xff09;直接影响装配精度与产品寿命。国内中小型机加工企业因形状误差导致的返工率高达12%-18%。传统测量方式存在以下三大痛点&#xff1a; ​ 设备局限&#xff1a;机械式千分表需人工读数&#xff0c;精度…...

django校园互助平台~源码

博主介绍&#xff1a;✌程序猿徐师兄、8年大厂程序员经历。全网粉丝15w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;…...

Vue进阶之AI智能助手项目(五)——ChatGPT的调用和开发

AI智能助手项目 前端页面Layout布局页面-viewssrc/views/chat/layout/Permission.vuesrc/views/chat/layout/sider/index.vuesrc/views/chat/layout/sider/List.vuesrc/views/chat/layout/sider/Footer.vueComponents 组件Header/index.vueMessage/index.vue前端页面 Layout布…...

Jenkins重启后Maven的Project加载失败

个人博客地址&#xff1a;Jenkins重启后Maven的Project加载失败 | 一张假钞的真实世界 Jenkins重启后发现Maven的项目都没有正常加载。检查Jenkins的启动日志发现以下错误信息&#xff1a; java.io.IOException: Unable to read /home/jenkins/.jenkins/jobs/test-maven/conf…...

【docker】docker pull拉取中不断重复下载问题,解决方案之一,磁盘空间扩容

问题类似这样 存储空间不足 如果 Docker 运行环境的磁盘空间不足&#xff0c;拉取的镜像可能会被自动清理&#xff0c;导致重复下载。可以检查磁盘使用情况&#xff1a; df -h docker system df如果空间不足&#xff0c;可以清理一些不用的容器和镜像&#xff1a; docker sy…...

Ubuntu指令(一)

一、终端操作指令 打开终端&#xff0c;有两种便捷方式&#xff1a; 直接点击系统中的终端按钮&#xff1b;使用快捷键ctrl alt T。 关闭终端&#xff0c;同样有多种选择&#xff1a; 在终端输入exit指令&#xff1b;使用快捷键ctrl d&#xff1b;直接点击终端窗口的关闭…...

【零成本降AI】别盲目改论文!基于知网报告的DeepSeek降AI实操(附神级提示词)

最近学术圈有个大动作&#xff0c;不知道大家发现没——知网的AIGC检测算法又升级了。 这就导致一个很尴尬的现象&#xff1a;哪怕是你一个字一个字熬夜敲出来的&#xff0c;只要逻辑太顺、用词太标准&#xff0c;大概率也会被标红。现在想找个靠谱的aigc免费降重方法&#xff…...

保姆级教程:STM32+ESP8266接入机智云,从零完成数据点上报与APP控制

STM32与ESP8266接入机智云实战&#xff1a;从数据点定义到APP控制全解析 在智能硬件开发领域&#xff0c;快速实现设备联网与远程控制是许多嵌入式工程师面临的挑战。本文将手把手带您完成一个基于STM32和ESP8266的智能温湿度监测系统&#xff0c;从机智云平台配置到代码移植&a…...

从形式逻辑到认知几何:基于RAE引擎的逻辑律强制与可信AI构建方法研究(修订稿)

从形式逻辑到认知几何&#xff1a;基于RAE引擎的逻辑律强制与可信AI构建方法研究&#xff08;修订稿&#xff09; From Formal Logic to Cognitive Geometry: A Study on Logical Law Enforcement and Trustworthy AI via RAE Engine作者&#xff1a;方见华 单位&#xff1a;世…...

OpenClaw v2026.4.12 功能介绍

最新版本&#xff1a; v2026.4.12&#xff08;2026-04-13 发布&#xff09; License&#xff1a; MIT一、核心定位OpenClaw 是一个私有化部署的个人 AI 助手&#xff0c;运行在你自己的设备上&#xff0c;通过你日常使用的消息渠道&#xff08;微信、飞书、Telegram、Discord、…...

SQL如何获取分组最后一条数据_LAST_VALUE的滑动窗口陷阱

LAST_VALUE默认只返回当前行而非分组最后一条&#xff0c;因默认窗口帧为ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW&#xff1b;需显式指定UNBOUNDED FOLLOWING并配合确定性ORDER BY&#xff08;如时间降序二级排序&#xff09;才能正确取最新值。LAST_VALUE 默认是 R…...

深入H7内核:手把手教你为STM32H723的LWIP+FreeRTOS工程配置MPU内存区域

深入H7内核&#xff1a;手把手教你为STM32H723的LWIPFreeRTOS工程配置MPU内存区域 在嵌入式网络开发中&#xff0c;STM32H7系列凭借其高性能Cortex-M7内核和丰富的外设资源成为许多项目的首选。然而&#xff0c;当我们将LWIP协议栈与FreeRTOS结合使用时&#xff0c;往往会忽视一…...

整理了一些大模型的课程,非常详细,大模型零基础入门到精通,建议收藏

本文介绍了多个科普类大模型课程&#xff0c;包括复旦大学的《大模型开发与赋能》专题讲习班、清华大学的自然语言处理实验室与OpenBMB合作的大模型公开课、好未来学而思网校的《人工智能第一课》等。此外&#xff0c;还推荐了吴恩达教授的《AI for Everyone》课程&#xff0c;…...

FPGA时序约束进阶:Set_Bus_Skew在跨时钟域设计中的实战解析

1. 什么是Set_Bus_Skew约束&#xff1f; 第一次在跨时钟域设计中遇到总线偏斜问题时&#xff0c;我盯着时序报告里那些莫名其妙的违例数字整整发呆了半小时。作为FPGA工程师&#xff0c;你可能已经熟悉了常规的setup/hold检查&#xff0c;但当多个信号需要同步跨时钟域传输时&a…...

Frida离线安装全攻略:手把手带你搭建无网环境(附资源包)

1. 为什么需要Frida离线安装&#xff1f; 最近在做一个移动端安全测试项目时&#xff0c;遇到了一个棘手的问题&#xff1a;客户公司的内网环境完全隔离&#xff0c;所有测试设备都无法连接外网。这意味着常规的pip install frida安装方式完全失效。经过两天的摸索和踩坑&#…...

多模型场景下的成本治理指标体系幢

为 HagiCode 添加 GitHub Pages 自动部署支持 本项目早期代号为 PCode&#xff0c;现已正式更名为 HagiCode。本文记录了如何为项目引入自动化静态站点部署能力&#xff0c;让内容发布像喝水一样简单。 背景/引言 在 HagiCode 的开发过程中&#xff0c;我们遇到了一个很现实的问…...