当前位置: 首页 > news >正文

FLINK 分流

在Apache Flink中,分流(Stream Splitting)是指将一条数据流拆分成完全独立的两条或多条流的过程。这通常基于一定的筛选条件,将符合条件的数据拣选出来并放入对应的流中。以下是关于Flink分流的详细解释:

一、分流方式

在这里插入图片描述

Flink提供了多种分流方式,以满足不同的数据处理需求:

  1. 基于filter的分流:
    • 这是最直接的分流方式,通过多次调用.filter()方法,将符合不同条件的数据筛选出来,形成不同的流。
    • 例如,可以将一个整数数据流拆分为奇数流和偶数流。
  2. 基于split的分流(已废弃):
    • 在早期的Flink版本中,.split()方法允许用户根据条件将数据流拆分为多个流。
    • 但由于该方法限制了数据类型转换,且随着Flink的发展,更灵活和高效的分流方式(如侧输出流)被引入,因此.split()方法已被废弃。
  3. 基于侧输出流(Side Output)的分流:
    • 侧输出流是Flink提供的一种更灵活和高效的分流方式。
    • 它允许用户在处理函数(如.process())中,根据条件将数据输出到不同的侧输出流中。
    • 使用侧输出流时,需要先定义输出标签(OutputTag),然后在处理函数中通过ctx.output()方法将数据写入对应的侧输出流。
    • 最后,可以通过getSideOutput()方法从侧输出流中获取数据。

三、内部机制

  1. 数据流的拆分:
    • 当数据流通过分流操作时,Flink会根据用户定义的筛选条件或处理函数,将数据元素分发到不同的子流中。
    • 这个过程通常是在Flink的算子(如filter算子、process算子)内部实现的,算子会根据输入数据的属性和条件来决定数据元素的去向。
  2. 子流的独立性:
    • 一旦数据流被拆分成多个子流,这些子流在后续的处理中就是相互独立的。
    • 用户可以对每个子流进行独立的操作和处理,如转换、聚合、窗口计算等。
  3. 资源的分配和调度:
    • Flink会根据任务的并行度和资源情况,动态地分配和调度资源来处理这些子流。
    • 这确保了每个子流都能得到足够的资源来处理数据,并且能够在满足性能要求的同时,尽可能地提高系统的吞吐量和效率。

四、应用场景

分流在Flink中有着广泛的应用场景,包括但不限于:

  • 数据路由:根据数据的某些属性(如用户ID、地区等)将数据路由到不同的处理路径上。
  • 异常检测:将正常数据和异常数据分开处理,以便对异常数据进行更详细的分析和处理。
  • 数据过滤:从原始数据流中筛选出符合特定条件的数据进行进一步处理。
  • 多版本处理:在处理数据升级或迁移时,将旧版本数据和新版本数据分开处理。

五、示例

1. filter分流

基于整数的奇偶性进行分流

import org.apache.flink.api.common.functions.FilterFunction;  
import org.apache.flink.streaming.api.datastream.DataStream;  
import org.apache.flink.streaming.api.datastream.DataStreamSource;  
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;  
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  public class FlinkFilterSplitExample {  public static void main(String[] args) throws Exception {  // 创建Flink执行环境  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  // 从Socket接收数据流(这里假设Socket发送的是整数数据)  DataStreamSource<String> socketStream = env.socketTextStream("localhost", 9999);  // 将字符串数据流转换为整数数据流  SingleOutputStreamOperator<Integer> intStream = socketStream.map(Integer::valueOf);  // 使用filter算子进行分流:偶数流和奇数流  SingleOutputStreamOperator<Integer> evenStream = intStream.filter(new FilterFunction<Integer>() {  @Override  public boolean filter(Integer value) throws Exception {  return value % 2 == 0;  }  });  SingleOutputStreamOperator<Integer> oddStream = intStream.filter(new FilterFunction<Integer>() {  @Override  public boolean filter(Integer value) throws Exception {  return value % 2 != 0;  }  });  // 打印偶数流和奇数流  evenStream.print("Even Stream: ");  oddStream.print("Odd Stream: ");  // 执行Flink程序  env.execute("Flink Filter Split Example");  }  
}

说明:

  1. 创建执行环境:首先,我们创建了一个Flink的执行环境StreamExecutionEnvironment。
  2. 接收数据流:通过env.socketTextStream(“localhost”, 9999),我们从本地的9999端口接收一个文本数据流。这里假设发送的是整数数据的字符串表示。
  3. 数据类型转换:使用map算子,我们将接收到的字符串数据流转换为整数数据流。
  4. 分流操作:
    • 使用filter算子,我们根据整数的奇偶性将数据流拆分为偶数流和奇数流。
    • evenStream包含所有偶数,oddStream包含所有奇数。
  5. 打印结果:最后,我们使用print算子打印偶数流和奇数流的结果。
  6. 执行程序:通过调用env.execute(),我们启动了Flink程序。

2. split分流(已废弃)

基于传感器温度的split分流

import org.apache.flink.api.common.functions.OutputSelector;  
import org.apache.flink.streaming.api.datastream.DataStream;  
import org.apache.flink.streaming.api.datastream.DataStreamSplit;  
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;  
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  // 传感器数据类  
class SensorReading {  String deviceNo;  long timestamp;  double temperature;  // 构造函数、getter和setter方法省略  
}  public class FlinkSplitExample {  public static void main(String[] args) throws Exception {  // 创建Flink执行环境  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  // 假设有一个数据源,这里使用一个简单的示例数据源  SingleOutputStreamOperator<SensorReading> sensorStream = env.fromElements(  new SensorReading("device1", 1610035289736L, 84.3),  new SensorReading("device2", 1610035371758L, 38.8),  // ... 其他传感器数据  );  // 使用split算子进行分流  DataStreamSplit<SensorReading> splitStream = sensorStream.split(new OutputSelector<SensorReading>() {  @Override  public Iterable<String> select(SensorReading sensorReading) {  ArrayList<String> output = new ArrayList<>();  if (sensorReading.temperature > 70.0) {  output.add("high");  } else {  output.add("low");  }  return output;  }  });  // 从SplitStream中选择出高温流和低温流  DataStream<SensorReading> highTempStream = splitStream.select("high");  DataStream<SensorReading> lowTempStream = splitStream.select("low");  // 打印结果  highTempStream.print("High Temperature Stream: ");  lowTempStream.print("Low Temperature Stream: ");  // 执行Flink程序  env.execute("Flink Split Example");  }  
}

3. 侧输出流(Side Output)分流

基于整数的奇偶性进行分流

import org.apache.flink.api.common.typeinfo.Types;  
import org.apache.flink.streaming.api.datastream.DataStream;  
import org.apache.flink.streaming.api.datastream.DataStreamSource;  
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;  
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  
import org.apache.flink.streaming.api.functions.ProcessFunction;  
import org.apache.flink.util.Collector;  
import org.apache.flink.util.OutputTag;  
import org.apache.flink.api.common.functions.FilterFunction;  public class SplitStreamByOutputTag {  // 定义输出标签  private static final OutputTag<Integer> evenTag = new OutputTag<Integer>("even") {};  private static final OutputTag<Integer> oddTag = new OutputTag<Integer>("odd") {};  public static void main(String[] args) throws Exception {  // 创建Flink上下文环境  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  env.setParallelism(1);  // Source  DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 8888);  // Transform  SingleOutputStreamOperator<Integer> mapResult = dataStreamSource.map(input -> {  int i = Integer.parseInt(input);  return i;  });  // Process and split  SingleOutputStreamOperator<Integer> processedStream = mapResult.process(new ProcessFunction<Integer, Integer>() {  @Override  public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {  if (value % 2 == 0) {  ctx.output(evenTag, value);  } else {  ctx.output(oddTag, value);  }  // 注意:这里不向主输出流输出任何数据,所有数据都通过侧输出流输出。  // 如果需要同时向主输出流输出数据,可以在else分支中添加 out.collect(value);  }  });  // 获取侧输出流并打印  DataStream<Integer> evenStream = processedStream.getSideOutput(evenTag);  DataStream<Integer> oddStream = processedStream.getSideOutput(oddTag);  evenStream.print("Even Stream: ");  oddStream.print("Odd Stream: ");  // 执行  env.execute();  }  
}

相关文章:

FLINK 分流

在Apache Flink中&#xff0c;分流&#xff08;Stream Splitting&#xff09;是指将一条数据流拆分成完全独立的两条或多条流的过程。这通常基于一定的筛选条件&#xff0c;将符合条件的数据拣选出来并放入对应的流中。以下是关于Flink分流的详细解释&#xff1a; 一、分流方式…...

从零开始:构建一个高效的开源管理系统——使用 React 和 Ruoyi-Vue-Plus 的实战指南

✅作者简介&#xff1a;2022年博客新星 第八。热爱国学的Java后端开发者&#xff0c;修心和技术同步精进。 &#x1f34e;个人主页&#xff1a;Java Fans的博客 &#x1f34a;个人信条&#xff1a;不迁怒&#xff0c;不贰过。小知识&#xff0c;大智慧。 &#x1f49e;当前专栏…...

windows下pycharm社区版2024下载与安装(包含新建第一个工程)

windows下pycharm社区版2024下载与安装 下载pycharm pycharm官网 安装pycharm 1.进入官网 pycharm官网 下载 点击Download–>右侧Other versions 下载对应的社区版&#xff08;如下图&#xff09;&#xff1a;下载网址 2.点击运行下载好的安装包 点击下一步 3.更改pychar…...

重构案例:将纯HTML/JS项目迁移到Webpack

我们已经了解了许多关于 Webpack 的知识&#xff0c;但要完全熟练掌握它并非易事。一个很好的学习方法是通过实际项目练习。当我们对 Webpack 的配置有了足够的理解后&#xff0c;就可以尝试重构一些项目。本次我选择了一个纯HTML/JS的PC项目进行重构&#xff0c;项目位于 GitH…...

表格编辑demo

<el-form :model"form" :rules"status ? rules : {}" ref"form" class"form-container" :inline"true"><el-table :data"tableData"><el-table-column label"计算公式"><templat…...

企业自建邮件系统选U-Mail ,功能强大、安全稳定

在现代企业运营中&#xff0c;电子邮件扮演着至关重要的角色&#xff0c;随着企业规模的增长和业务的多样化&#xff0c;传统的租用第三方企业邮箱服务逐渐显现出其局限性。例如&#xff0c;存储空间受限、数据安全风险、缺乏灵活的管理和备份功能&#xff0c;以及无法与其他企…...

蓝桥杯题目理解

1. 一维差分 1.1. 小蓝的操作 1.1.1. 题目解析&#xff1a; 这道题提到了对于“区间”进行操作&#xff0c;而差分数列就是对于区间进行操作的好方法。 观察差分数列&#xff1a; 给定数列&#xff1a;1 3 5 2 7 1 差分数列&#xff1a;1 2 2 -3 5 6 题目要求把原数组全部…...

浪潮云启操作系统(InLinux)bcache缓存实践:理解OpenStack环境下虚拟机卷、Ceph OSD、bcache设备之间的映射关系

前言 在OpenStack平台上&#xff0c;采用bcache加速ceph分布式存储的方案被广泛用于企业和云环境。一方面&#xff0c;Ceph作为分布式存储系统&#xff0c;与虚拟机存储卷紧密结合&#xff0c;可以提供高可用和高性能的存储服务。另一方面&#xff0c;bcache作为混合存储方案&…...

通过ssh端口反向通道建立并实现linux系统的xrdp以及web访问

Content 1 问题描述2 原因分析3 解决办法3.1 安装x11以及gnome桌面环境查看是否安装x11否则使用下面指令安装x11组件查看是否安装gnome否则使用下面指令安装gnome桌面环境 3.2 安装xrdp使用下面指令安装xrdp&#xff08;如果安装了则跳过&#xff09;启动xrdp服务 3.3 远程服务…...

# 渗透测试#安全见闻8 量子物理面临的安全挑战

# 渗透测试#安全见闻8 量子物理面临的安全挑战 ##B站陇羽Sec## 量子计算原理与技术 量子计算是一种基于量子力学原理的计算方式&#xff0c;它利用量子位&#xff08;qubits&#xff09;来进行信息处理和计算…...

【rabbitmq】实现问答消息消费示例

目录 1. 说明2. 截图2.1 接口调用截图2.2 项目结构截图 3. 代码示例 1. 说明 1.实现的是一个简单的sse接口&#xff0c;单向的长连接&#xff0c;后端可以向前端不断输出数据。2.通过调用sse接口&#xff0c;触发rabbitmq向队列塞消息&#xff0c;向前端返回一个sseEmitter对象…...

单片机_RTOS__架构概念

经典单片机程序 void main() {while(1){函数1&#xff08;&#xff09;&#xff1b;函数2&#xff08;&#xff09;&#xff1b;}} 有无RTOS区别 裸机 RTOS RTOS程序 喂饭&#xff08;&#xff09; {while&#xff08;1&#xff09;{喂一口饭&#xff08;&#xff09;;} } …...

ClickHouse在百度MEG数据中台的落地和优化

导读 百度MEG上一代大数据产品存在平台分散、质量不均和易用性差等问题&#xff0c;导致开发效率低下、学习成本高&#xff0c;业务需求响应迟缓。为了解决这些问题&#xff0c;百度MEG内部开发了图灵3.0生态系统&#xff0c;包括Turing Data Engine(TDE)计算引擎、Turing Dat…...

B/S架构(Browser/Server)与C/S架构(Client/Server)

基本概念 B/S架构&#xff08;Browser/Server&#xff09;&#xff1a;即浏览器/服务器架构。在这种架构中&#xff0c;用户通过浏览器&#xff08;如Chrome、Firefox、Safari等&#xff09;访问服务器上的应用程序。服务器端负责处理业务逻辑、存储数据等核心功能&#xff0c;…...

idea中自定义注释模板语法

文章目录 idea 自定义模板语法1.自定义模板语法是什么&#xff1f;2.如何在idea中设置呢&#xff1f; idea 自定义模板语法 1.自定义模板语法是什么&#xff1f; 打开我的idea&#xff0c;创建一个测试类&#xff1a; 这里看到我的 test 测试类里面会有注释&#xff0c;这是怎…...

基于SSM的儿童教育网站【附源码】

基于SpringBoot的课程作业管理系统&#xff08;源码L文说明文档&#xff09; 目录 4 系统设计 4.1 系统概述 4.2 系统模块设计 4.3.3 数据库表设计 5 系统实现 5.1 管理员功能模块的实现 5.1.1 视频列表 5.1.2 文章信息管理 5.1.3 文章类…...

深挖自闭症病因与孩子表现的关联

自闭症&#xff0c;亦称为孤独症&#xff0c;乃是一种对儿童发展有着严重影响的神经发育障碍性疾病。深入探寻自闭症的病因与孩子表现之间的联系&#xff0c;对于更深刻地理解并助力自闭症儿童而言&#xff0c;可谓至关重要。 当前&#xff0c;自闭症的病因尚未完全明晰&#x…...

[网络协议篇] UDP协议

文章目录 1. 简介2. 特点3. UDP数据报结构4. 基于UDP的应用层协议5. UDP安全性问题6. 使用udp传输数据的系统就一定不可靠吗&#xff1f;7. 基于UDP的主机探活 python实现 1. 简介 User Datagram Protocol&#xff0c;用户数据报协议&#xff0c;基于IP协议提供面向无连接的网…...

关系型数据库(1)----MySQL(初阶)

目录 1.mysql 2.mysqld 3.mysql架构 1.连接层 2.核心服务层 3.存储引擎层 4.数据存储层 4.SQL分类 5.MySQL操作库 6.MySQL数据类型 1. 数值类型 2. 日期和时间类型 3. 字符串类型 4. 空间类型 5. JSON数据类型 7.MySQL表的约束 1. 主键约束&#xff08;PRIMARY…...

计算机毕业设计Python+大模型租房推荐系统 租房大屏可视化 租房爬虫 hadoop spark 58同城租房爬虫 房源推荐系统

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 用到的技术: 1. python…...

后进先出(LIFO)详解

LIFO 是 Last In, First Out 的缩写&#xff0c;中文译为后进先出。这是一种数据结构的工作原则&#xff0c;类似于一摞盘子或一叠书本&#xff1a; 最后放进去的元素最先出来 -想象往筒状容器里放盘子&#xff1a; &#xff08;1&#xff09;你放进的最后一个盘子&#xff08…...

k8s从入门到放弃之Ingress七层负载

k8s从入门到放弃之Ingress七层负载 在Kubernetes&#xff08;简称K8s&#xff09;中&#xff0c;Ingress是一个API对象&#xff0c;它允许你定义如何从集群外部访问集群内部的服务。Ingress可以提供负载均衡、SSL终结和基于名称的虚拟主机等功能。通过Ingress&#xff0c;你可…...

循环冗余码校验CRC码 算法步骤+详细实例计算

通信过程&#xff1a;&#xff08;白话解释&#xff09; 我们将原始待发送的消息称为 M M M&#xff0c;依据发送接收消息双方约定的生成多项式 G ( x ) G(x) G(x)&#xff08;意思就是 G &#xff08; x ) G&#xff08;x) G&#xff08;x) 是已知的&#xff09;&#xff0…...

Python爬虫实战:研究feedparser库相关技术

1. 引言 1.1 研究背景与意义 在当今信息爆炸的时代,互联网上存在着海量的信息资源。RSS(Really Simple Syndication)作为一种标准化的信息聚合技术,被广泛用于网站内容的发布和订阅。通过 RSS,用户可以方便地获取网站更新的内容,而无需频繁访问各个网站。 然而,互联网…...

转转集团旗下首家二手多品类循环仓店“超级转转”开业

6月9日&#xff0c;国内领先的循环经济企业转转集团旗下首家二手多品类循环仓店“超级转转”正式开业。 转转集团创始人兼CEO黄炜、转转循环时尚发起人朱珠、转转集团COO兼红布林CEO胡伟琨、王府井集团副总裁祝捷等出席了开业剪彩仪式。 据「TMT星球」了解&#xff0c;“超级…...

C# SqlSugar:依赖注入与仓储模式实践

C# SqlSugar&#xff1a;依赖注入与仓储模式实践 在 C# 的应用开发中&#xff0c;数据库操作是必不可少的环节。为了让数据访问层更加简洁、高效且易于维护&#xff0c;许多开发者会选择成熟的 ORM&#xff08;对象关系映射&#xff09;框架&#xff0c;SqlSugar 就是其中备受…...

GruntJS-前端自动化任务运行器从入门到实战

Grunt 完全指南&#xff1a;从入门到实战 一、Grunt 是什么&#xff1f; Grunt是一个基于 Node.js 的前端自动化任务运行器&#xff0c;主要用于自动化执行项目开发中重复性高的任务&#xff0c;例如文件压缩、代码编译、语法检查、单元测试、文件合并等。通过配置简洁的任务…...

STM32HAL库USART源代码解析及应用

STM32HAL库USART源代码解析 前言STM32CubeIDE配置串口USART和UART的选择使用模式参数设置GPIO配置DMA配置中断配置硬件流控制使能生成代码解析和使用方法串口初始化__UART_HandleTypeDef结构体浅析HAL库代码实际使用方法使用轮询方式发送使用轮询方式接收使用中断方式发送使用中…...

【MATLAB代码】基于最大相关熵准则(MCC)的三维鲁棒卡尔曼滤波算法(MCC-KF),附源代码|订阅专栏后可直接查看

文章所述的代码实现了基于最大相关熵准则(MCC)的三维鲁棒卡尔曼滤波算法(MCC-KF),针对传感器观测数据中存在的脉冲型异常噪声问题,通过非线性加权机制提升滤波器的抗干扰能力。代码通过对比传统KF与MCC-KF在含异常值场景下的表现,验证了后者在状态估计鲁棒性方面的显著优…...

上位机开发过程中的设计模式体会(1):工厂方法模式、单例模式和生成器模式

简介 在我的 QT/C 开发工作中&#xff0c;合理运用设计模式极大地提高了代码的可维护性和可扩展性。本文将分享我在实际项目中应用的三种创造型模式&#xff1a;工厂方法模式、单例模式和生成器模式。 1. 工厂模式 (Factory Pattern) 应用场景 在我的 QT 项目中曾经有一个需…...