当前位置: 首页 > news >正文

Flink 中的滚动策略(Rolling Policy)

在 Apache Flink 中,滚动策略(Rolling Policy)是针对日志(或数据流)文件输出的一种管理策略,它决定了在日志文件的大小、时间或其他条件满足特定标准时,如何“滚动”生成新的日志文件。滚动策略常用于处理较大的数据流文件,避免单个文件过大导致存储和处理困难。

1. 滚动策略的作用

在 Flink 中,当作业的输出是通过文件系统(如 HDFS、S3、本地文件系统等)进行持久化时,往往会遇到生成的文件越来越大的问题。滚动策略能够在文件达到某个阈值时自动生成新文件,确保文件的大小在可接受的范围内,从而提高数据处理的可管理性和性能。

2. 滚动策略的基本类型

Flink 提供了几种常见的滚动策略来控制文件的滚动行为。以下是几种常见的策略:

(1) 基于文件大小的滚动策略(Size-based Rolling)

当文件的大小超过一个预设的阈值时,文件会自动“滚动”到一个新的文件中,旧的文件会被关闭,新的文件开始接收数据。

  • 适用场景:适用于对文件大小有严格要求的场景,特别是当文件过大时会影响系统性能或数据分析的效率。
  • 配置:通常通过配置 maxFileSize 来设置最大文件大小。

示例

RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder().withMaxPartSize(1024 * 1024 * 1024)  // 设置最大文件大小为 1GB.build();
(2) 基于时间的滚动策略(Time-based Rolling)

基于时间的滚动策略根据时间间隔来决定何时滚动文件,通常以分钟、小时或天为单位进行滚动。比如,每小时生成一个新的文件。

  • 适用场景:适用于数据有时间要求的场景,比如需要按小时、按天划分存储的数据。
  • 配置:可以配置时间间隔,例如通过 rolloverInterval 设置文件滚动的时间间隔。

示例

RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder().withRolloverInterval(60000L)  // 每 60 秒滚动一次.build();
(3) 基于事件数量的滚动策略(Count-based Rolling)

事件数量滚动策略根据文件中的事件数量来决定何时滚动文件。比如,当文件中累积了 10000 个事件后,文件会自动滚动。

  • 适用场景:适用于事件生成速率较快且文件大小不易预测的情况。
  • 配置:通过 maxPartCount 设置文件中的最大事件数。

示例

RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder().withMaxPartCount(10000)  // 设置每个文件最多包含 10000 个事件.build();

3. Flink 中的文件滚动配置

在 Flink 中,滚动策略通常是与 Flink 的 FileSink 配合使用的。你可以为输出的文件设置滚动策略,并定义如何滚动文件。

4. RollingPolicy 配置

Flink 提供了一个 RollingPolicy 接口,默认的实现是 DefaultRollingPolicy,它支持多种方式来配置文件滚动:

  • withMaxPartSize(long maxSize):设置单个文件的最大大小,当文件大小超过这个限制时,Flask 会滚动生成新文件。
  • withRolloverInterval(long interval):设置文件的滚动时间间隔,单位是毫秒。
  • withMaxPartCount(long maxPartCount):设置每个文件的最大事件数。

5. 使用示例

假设我们有一个 Flink 作业,将数据输出到 HDFS,并希望使用滚动策略来管理文件。我们可以通过以下方式设置文件大小滚动策略:

import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.RollingPolicy;public class RollingPolicyExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 假设有一个简单的字符串数据流DataStream<String> stream = env.fromElements("Hello", "Flink", "Rolling", "Policy");// 设置滚动策略:文件大小达到 100MB 时滚动RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder().withMaxPartSize(1024 * 1024 * 100)  // 100MB.withRolloverInterval(60000L)        // 每 60 秒滚动一次.build();// 使用 FileSink 来输出数据到 HDFSStreamingFileSink<String> sink = StreamingFileSink.forRowFormat(new Path("hdfs://path/to/output"), new SimpleStringEncoder<String>("UTF-8")).withRollingPolicy(rollingPolicy).build();// 将数据流写入到文件stream.addSink(sink);env.execute("Flink Rolling Policy Example");}
}

在上面的代码中,我们为 FileSink 设置了基于文件大小和时间的滚动策略。文件大小超过 100MB 或者每 60 秒就会滚动一次,确保输出文件不会无限增大。

6. 滚动策略的选择与最佳实践

  • 基于文件大小的滚动:适用于文件内容量预期较为稳定且文件大小有上限要求的情况。如果数据量大或变动较小,可以选择文件大小滚动策略。

  • 基于时间的滚动:适用于对时间敏感的数据处理需求,比如日志数据、定时任务的输出等。基于时间滚动策略通常有固定的时间间隔,适合实时性要求高的场景。

  • 基于事件数的滚动:适用于处理事件生成速率不确定,但希望文件滚动与事件数量挂钩的情况。比如,高速日志记录系统或事件驱动系统。

7. 总结

Flink 的滚动策略(Rolling Policy)是一个非常重要的功能,尤其在处理大量数据输出时,能帮助管理文件的大小、滚动周期和数据的合理分配。通过合理配置 RollingPolicy,开发者可以灵活地管理输出文件,提升系统的可扩展性和存储效率。选择合适的滚动策略可以根据数据量、时间需求以及事件生成的速率来制定最合适的策略。

相关文章:

Flink 中的滚动策略(Rolling Policy)

在 Apache Flink 中&#xff0c;滚动策略&#xff08;Rolling Policy&#xff09;是针对日志&#xff08;或数据流&#xff09;文件输出的一种管理策略&#xff0c;它决定了在日志文件的大小、时间或其他条件满足特定标准时&#xff0c;如何“滚动”生成新的日志文件。滚动策略…...

GPU和FPGA的区别

GPU&#xff08;Graphics Processing Unit&#xff0c;图形处理器&#xff09;和 FPGA&#xff08;Field-Programmable Gate Array&#xff0c;现场可编程门阵列&#xff09;不是同一种硬件。 我的理解是&#xff0c;虽然都可以用于并行计算&#xff0c;但是GPU是纯计算的硬件…...

网易云音乐分布式KV存储实践与演进

随着网易云音乐业务的快速发展&#xff0c;推荐和搜索场景对分布式KV存储的需求日益增长。本文将深入探讨网易云音乐在分布式KV存储方面的实践和演进&#xff0c;分析其技术选型、架构设计以及未来发展方向。 一、业务背景 网易云音乐的业务场景对分布式KV存储提出了高并发、…...

WordPress平台如何接入Deepseek,有效提升网站流量

深夜改代码到崩溃&#xff1f;《2024全球CMS生态报告》揭露&#xff1a;78%的WordPress站长因API对接复杂&#xff0c;错失AI内容红利。本文实测「零代码接入Deepseek」的保姆级方案&#xff0c;配合147SEO的智能发布系统&#xff0c;让你用3个步骤实现日均50篇EEAT合规内容自动…...

【嵌入式】STM32内部NOR Flash磨损平衡与掉电保护总结

1. NOR Flash与NAND Flash 先deepseek看结论&#xff1a; 特性Nor FlashNAND Flash读取速度快&#xff08;支持随机访问&#xff0c;直接执行代码&#xff09;较慢&#xff08;需按页顺序读取&#xff09;写入/擦除速度慢&#xff08;擦除需5秒&#xff0c;写入需逐字节操作&…...

什么是磁盘阵列(RAID)?如何提高磁盘阵列的性能

什么是磁盘阵列 ‌磁盘阵列&#xff08;RAID&#xff09;是一种将多个独立的硬盘组合成一个逻辑存储单元的技术&#xff0c;旨在提高数据存储的性能、容量、可靠性和冗余性‌。‌磁盘阵列通过将数据分割成多个区段并分别存储在不同的硬盘上&#xff0c;利用个别磁盘提供数据加…...

轻量级日志管理平台Grafana Loki

文章目录 轻量级日志管理平台Grafana Loki背景什么是Loki为什么使用 Grafana Loki&#xff1f;架构Log Storage Grafana部署使用基于 Docker Compose 安装 LokiMinIO K8s集群部署Loki采集Helm 部署方式和案例 参考 轻量级日志管理平台Grafana Loki 背景 在微服务以及云原生时…...

k8s集群部署

集群结构 角色IPmaster192.168.35.135node1192.168.35.136node2192.168.35.137 部署 #需在三台主机上操作 //关闭防火墙 [rootmaster ~]# systemctl disable --now firewalld//关闭selinux [rootmaster ~]# sed -i s/enforcing/disabled/ /etc/selinux/config//关闭swap分区…...

STM32MP157A-FSMP1A单片机移植Linux系统SPI总线驱动

SPI总线驱动整体上与I2C总线驱动类型&#xff0c;差别主要在设备树和数据传输上&#xff0c;由于SPI是由4根线实现主从机的通信&#xff0c;在设备树上配置时需要对SPI进行设置。 原理图可知&#xff0c;数码管使用的SPI4对应了单片机上的PE11-->SPI4-NSS,PE12-->SPI4-S…...

系统基础与管理(2025更新中)

‌一、Linux 核心架构与组件‌ ‌内核架构‌ ‌核心职责‌&#xff1a; 管理进程生命周期、内存分配、硬件驱动交互及文件系统操作。 模块化设计支持动态加载硬件驱动&#xff08;如modprobe加载内核模块&#xff09;&#xff0c;提升灵活性和扩展性。 ‌内存管理‌&#xff1a…...

Python--内置函数与推导式(下)

3. 内置函数 数学运算类 函数说明示例​abs​绝对值​abs(-10) → 10​​pow​幂运算​pow(2, 3) → 8​​sum​求和​sum([1,2,3]) → 6​​divmod​返回商和余数​divmod(10, 3) → (3, 1)​ 数据转换类 # 进制转换 print(bin(10)) # 0b1010 print(hex(255)) # 0x…...

可狱可囚的爬虫系列课程 14:10 秒钟编写一个 requests 爬虫

一、前言 当重复性的工作频繁发生时&#xff0c;各种奇奇怪怪提高效率的想法就开始萌芽了。当重复代码的模块化封装已经不能满足要求的时候&#xff0c;更高效的方式就被揭开了神秘的面纱。本文基于这样的想法&#xff0c;来和大家探讨如何 10 秒钟编写一个 requests 爬虫程序。…...

Windows golang安装和环境配置

【1】、golang 1.19 sdk下载 https://download.csdn.net/download/notfindjob/90422529 【2】、安装 【3】、配置 GOPATH目录 【4】、LiteIDE下载安装 https://download.csdn.net/download/notfindjob/90422580 【5】、打开LiteIDE&#xff0c;选择查看->管理GOPATH&…...

IP-------GRE和MGRE

4.GRE和MGRE 1.应用场景 现实场景 居家工作&#xff0c;公司工作&#xff0c;分公司工作----------需要传输交换数据--------NAT---在该场景中需要两次NAT&#xff08;不安全&#xff09; 为了安全有两种手段-----1.物理专线---成本高 2.VPN--虚拟专用网---隧道技术--封装技…...

LabVIEW形状误差测量系统

在机械制造领域&#xff0c;形状与位置公差&#xff08;GD&T&#xff09;直接影响装配精度与产品寿命。国内中小型机加工企业因形状误差导致的返工率高达12%-18%。传统测量方式存在以下三大痛点&#xff1a; ​ 设备局限&#xff1a;机械式千分表需人工读数&#xff0c;精度…...

django校园互助平台~源码

博主介绍&#xff1a;✌程序猿徐师兄、8年大厂程序员经历。全网粉丝15w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;…...

Vue进阶之AI智能助手项目(五)——ChatGPT的调用和开发

AI智能助手项目 前端页面Layout布局页面-viewssrc/views/chat/layout/Permission.vuesrc/views/chat/layout/sider/index.vuesrc/views/chat/layout/sider/List.vuesrc/views/chat/layout/sider/Footer.vueComponents 组件Header/index.vueMessage/index.vue前端页面 Layout布…...

Jenkins重启后Maven的Project加载失败

个人博客地址&#xff1a;Jenkins重启后Maven的Project加载失败 | 一张假钞的真实世界 Jenkins重启后发现Maven的项目都没有正常加载。检查Jenkins的启动日志发现以下错误信息&#xff1a; java.io.IOException: Unable to read /home/jenkins/.jenkins/jobs/test-maven/conf…...

【docker】docker pull拉取中不断重复下载问题,解决方案之一,磁盘空间扩容

问题类似这样 存储空间不足 如果 Docker 运行环境的磁盘空间不足&#xff0c;拉取的镜像可能会被自动清理&#xff0c;导致重复下载。可以检查磁盘使用情况&#xff1a; df -h docker system df如果空间不足&#xff0c;可以清理一些不用的容器和镜像&#xff1a; docker sy…...

Ubuntu指令(一)

一、终端操作指令 打开终端&#xff0c;有两种便捷方式&#xff1a; 直接点击系统中的终端按钮&#xff1b;使用快捷键ctrl alt T。 关闭终端&#xff0c;同样有多种选择&#xff1a; 在终端输入exit指令&#xff1b;使用快捷键ctrl d&#xff1b;直接点击终端窗口的关闭…...

Python爬虫实战:研究MechanicalSoup库相关技术

一、MechanicalSoup 库概述 1.1 库简介 MechanicalSoup 是一个 Python 库,专为自动化交互网站而设计。它结合了 requests 的 HTTP 请求能力和 BeautifulSoup 的 HTML 解析能力,提供了直观的 API,让我们可以像人类用户一样浏览网页、填写表单和提交请求。 1.2 主要功能特点…...

进程地址空间(比特课总结)

一、进程地址空间 1. 环境变量 1 &#xff09;⽤户级环境变量与系统级环境变量 全局属性&#xff1a;环境变量具有全局属性&#xff0c;会被⼦进程继承。例如当bash启动⼦进程时&#xff0c;环 境变量会⾃动传递给⼦进程。 本地变量限制&#xff1a;本地变量只在当前进程(ba…...

蓝桥杯 2024 15届国赛 A组 儿童节快乐

P10576 [蓝桥杯 2024 国 A] 儿童节快乐 题目描述 五彩斑斓的气球在蓝天下悠然飘荡&#xff0c;轻快的音乐在耳边持续回荡&#xff0c;小朋友们手牵着手一同畅快欢笑。在这样一片安乐祥和的氛围下&#xff0c;六一来了。 今天是六一儿童节&#xff0c;小蓝老师为了让大家在节…...

相机Camera日志分析之三十一:高通Camx HAL十种流程基础分析关键字汇总(后续持续更新中)

【关注我,后续持续新增专题博文,谢谢!!!】 上一篇我们讲了:有对最普通的场景进行各个日志注释讲解,但相机场景太多,日志差异也巨大。后面将展示各种场景下的日志。 通过notepad++打开场景下的日志,通过下列分类关键字搜索,即可清晰的分析不同场景的相机运行流程差异…...

MySQL中【正则表达式】用法

MySQL 中正则表达式通过 REGEXP 或 RLIKE 操作符实现&#xff08;两者等价&#xff09;&#xff0c;用于在 WHERE 子句中进行复杂的字符串模式匹配。以下是核心用法和示例&#xff1a; 一、基础语法 SELECT column_name FROM table_name WHERE column_name REGEXP pattern; …...

基于TurtleBot3在Gazebo地图实现机器人远程控制

1. TurtleBot3环境配置 # 下载TurtleBot3核心包 mkdir -p ~/catkin_ws/src cd ~/catkin_ws/src git clone -b noetic-devel https://github.com/ROBOTIS-GIT/turtlebot3.git git clone -b noetic https://github.com/ROBOTIS-GIT/turtlebot3_msgs.git git clone -b noetic-dev…...

视觉slam十四讲实践部分记录——ch2、ch3

ch2 一、使用g++编译.cpp为可执行文件并运行(P30) g++ helloSLAM.cpp ./a.out运行 二、使用cmake编译 mkdir build cd build cmake .. makeCMakeCache.txt 文件仍然指向旧的目录。这表明在源代码目录中可能还存在旧的 CMakeCache.txt 文件,或者在构建过程中仍然引用了旧的路…...

STM32---外部32.768K晶振(LSE)无法起振问题

晶振是否起振主要就检查两个1、晶振与MCU是否兼容&#xff1b;2、晶振的负载电容是否匹配 目录 一、判断晶振与MCU是否兼容 二、判断负载电容是否匹配 1. 晶振负载电容&#xff08;CL&#xff09;与匹配电容&#xff08;CL1、CL2&#xff09;的关系 2. 如何选择 CL1 和 CL…...

MySQL 索引底层结构揭秘:B-Tree 与 B+Tree 的区别与应用

文章目录 一、背景知识&#xff1a;什么是 B-Tree 和 BTree&#xff1f; B-Tree&#xff08;平衡多路查找树&#xff09; BTree&#xff08;B-Tree 的变种&#xff09; 二、结构对比&#xff1a;一张图看懂 三、为什么 MySQL InnoDB 选择 BTree&#xff1f; 1. 范围查询更快 2…...

在 Spring Boot 中使用 JSP

jsp&#xff1f; 好多年没用了。重新整一下 还费了点时间&#xff0c;记录一下。 项目结构&#xff1a; pom: <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://ww…...