当前位置: 首页 > news >正文

Flink Flink中的分流

一、什么是分流

所谓“分流”,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。
在这里插入图片描述

二、基于filter算子的简单实现分流

其实根据条件筛选数据的需求,本身非常容易实现:只要针对同一条流多次独立调用.filter()方法进行筛选,就可以得到拆分之后的流了。
案例需求:读取一个整数数字流,将数据流划分为奇数流和偶数流。

package com.flink.DataStream.SplitStream;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FlinkSplitStreamByFilter {public static void main(String[] args) throws Exception {//TODO 创建Flink上下文执行环境StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration().set(RestOptions.BIND_PORT, "8081"));//.getExecutionEnvironment();//TODO 设置全局并行度为2streamExecutionEnvironment.setParallelism(2);DataStreamSource<String> dataStreamSource = streamExecutionEnvironment.socketTextStream("localhost", 8888);//TODO 先将输入流转为Integer类型SingleOutputStreamOperator<Integer> mapResult = dataStreamSource.map((input) -> {int i = Integer.parseInt(input);return i;});//TODO 使用匿名函数分流偶数流SingleOutputStreamOperator<Integer> ds1 = mapResult.filter(new FilterFunction<Integer>() {@Overridepublic boolean filter(Integer a) throws Exception {return a % 2 == 0;}});//TODO 使用lamda表达式分流奇数流SingleOutputStreamOperator<Integer> ds2 = mapResult.filter((a) -> a % 2 == 1);ds1.print("偶数流");ds2.print("奇数流");streamExecutionEnvironment.execute();}
}

执行结果

奇数流:1> 1
偶数流:2> 2
偶数流:1> 2
偶数流:2> 4
奇数流:1> 3
奇数流:2> 1Process finished with exit code 130 (interrupted by signal 2: SIGINT)

这种实现非常简单,但代码显得有些冗余——我们的处理逻辑对拆分出的三条流其实是一样的,却重复写了三次。而且这段代码背后的含义,是将原始数据流 stream 复制三份,然后对每一份分别做筛选;这明显是不够高效的。我们自然想到,能不能不用复制流,直接用一个算子就把它们都拆分开呢?

三、使用测输出流

关于处理函数中侧输出流的用法,我们已经在 7.5 节做了详细介绍。简单来说,只需要调用上下文 ctx 的.output()方法,就可以输出任意类型的数据了。而侧输出流的标记和提取,都离不开一个“输出标签”(OutputTag),指定了侧输出流的 id 和类型。

相关文章:

Flink Flink中的分流

一、什么是分流 所谓“分流”&#xff0c;就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream&#xff0c;定义一些筛选条件&#xff0c;将符合条件的数据拣选出来放到对应的流里。 二、基于filter算子的简单实现分流 其实根据条件筛选数据的需求…...

传输层协议[精选]

网络: 跨主机通信. 互联网通信: 两点之间的通信路径有无数条. 集线器: 把一根网线差出来两根,但是同一时刻只能有一根线跑.交换机: 组建局域网.路由器: 本质就是将两个局域网连接起来 交换机和路由器之间的区别越来越模糊. 调制解调器: 使用电话线上网的时候,需要将电话线的模…...

LeetCode算法题解|474. 一和零

474. 一和零 题目链接&#xff1a;474. 一和零 题目描述 给你一个二进制字符串数组 strs 和两个整数 m 和 n 。 请你找出并返回 strs 的最大子集的长度&#xff0c;该子集中 最多 有 m 个 0 和 n 个 1 。 如果 x 的所有元素也是 y 的元素&#xff0c;集合 x 是集合 y 的 子…...

一种太阳能风能市电互补路灯方案介绍

太阳能市电互补路灯是一种环保、节能的照明设施&#xff0c;它利用太阳能进行发电并实现照明。这种路灯在白天吸收阳光并将其转化为电能&#xff0c;到了晚上则利用储存的电能为LED灯提供电力&#xff0c;实现照明功能。下面叁仟智慧将详细介绍太阳能市电互补路灯灯的工作原理和…...

世微 dc-dc降压恒流 LED汽车大灯 单灯 14V5A 68W车灯驱动方案 AP5191

产品描述 AP5191是一款PWM工作模式,高效率、外围简单、外置功率MOS管&#xff0c;适用于4.5-150V输入的高精度降压LED恒流驱动芯片。输出最大功率150W&#xff0c;最大电流6A。AP5191可实现线性调光和PWM调光&#xff0c;线性调光脚有效电压范围0.55-2.6V.AP5191 工作频率可以…...

基于时隙的多重冗余流指纹模型

文章信息 论文题目&#xff1a;基于时隙的多重冗余流指纹模型 期刊&#xff08;会议&#xff09;&#xff1a;网络与信息安全学报 时间&#xff1a;2023 级别&#xff1a;CCF C 概述 为确保内生网络流量安全可信&#xff0c;本文在研究流水印及其扩展的流指纹机制的基础上&a…...

Visual Studio 2019 C# System.BadImageFormatException 解决方法

文章目录 1.DLL文件缺失或不匹配原因解决方法 2.系统环境变量Path下内容过多原因解决方法 3.位数错误原因解决方法 分析几种可能因素 1.DLL文件缺失或不匹配 原因 检查对应Debug路径下的DLL文件是否有缺失 解决方法 将对应的DLL文件放到Debug文件夹里面&#xff0c;检查冗余…...

深度学习之基于YoloV5车辆和行人目标检测系统

欢迎大家点赞、收藏、关注、评论啦 &#xff0c;由于篇幅有限&#xff0c;只展示了部分核心代码。 文章目录 一项目简介YOLOv5 简介YOLOv5 特点 车辆和行人目标检测系统 二、功能三、系统四. 总结 一项目简介 # 深度学习之基于 YOLOv5 车辆和行人目标检测系统介绍 深度学习在…...

Django框架之中间件

目录 一、引入 二、Django中间件介绍 【1】什么是Django中间件 【2】Django中间件的作用 【3】示例 三、Django请求生命周期流程图 四、Django中间件是Django的门户 五、Django中间件详解 六、中间件必须要掌握的两个方法 (1) process_request (2) process_respon…...

BTC 复兴:Ordinals 带来创新活力,BitVM 与 BitStream 相继问世

除了备受瞩目的 ETF&#xff0c;今年 Bitcoin 生态迎来全新的发展活力和机遇。Ordinals 协议的横空出世&#xff0c;以此为基础诞生的 BRC20 协议给整个比特币生态带去了一波新的能量&#xff0c;迎来铭文热度高涨。而诸如 BitVM、BitStream 等新技术甫一问世&#xff0c;便引发…...

STM32 CAN协议讲解以及代码

STM32 CAN 文章目录 STM32 CAN前言一、CAN外设1.主控制寄存器CAN_MCR2.位时序寄存器CAN_BTR3.CAN的发送邮箱4.CAN的接收FIFO5.验收筛选器 二、代码配置1.初始化2.发送数据3.接收数据4.main.c 前言 前面学习了CAN的一些理论知识&#xff0c;他在我们的STM32里面是怎么用的呢 前…...

京东数据分析(京东大数据):2023年10月京东手机行业品牌销售排行榜

鲸参谋监测的京东平台10月份手机市场销售数据已出炉&#xff01; 根据鲸参谋平台的数据显示&#xff0c;今年10月份&#xff0c;京东平台手机行业的销量约340万&#xff0c;环比增长约11%&#xff0c;同比则下滑约2%&#xff1b;销售额为108亿&#xff0c;环比增长约17%&#x…...

计算机毕业设计 基于Hadoop的物品租赁系统的设计与实现 Java实战项目 附源码+文档+视频讲解

博主介绍&#xff1a;✌从事软件开发10年之余&#xff0c;专注于Java技术领域、Python人工智能及数据挖掘、小程序项目开发和Android项目开发等。CSDN、掘金、华为云、InfoQ、阿里云等平台优质作者✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精…...

pop链反序列化 [MRCTF2020]Ezpop1

打开题目 网站源码为 Welcome to index.php <?php //flag is in flag.php //WTF IS THIS? //Learn From https://ctf.ieki.xyz/library/php.html#%E5%8F%8D%E5%BA%8F%E5%88%97%E5%8C%96%E9%AD%94%E6%9C%AF%E6%96%B9%E6%B3%95 //And Crack It! class Modifier {protected …...

yolov5从英伟达平台移植到华为昇腾开发板上的思路

作者&#xff1a;朱金灿 来源&#xff1a;clever101的专栏 为什么大多数人学不会人工智能编程&#xff1f;>>> 最近需要将yolov5代码从英伟达平台移植到华为昇腾开发板上。搜了一些代码和资料&#xff0c;大致明白了二者的差别。 1.二者使用的模型文件不一样 yolov…...

网络运维与网络安全 学习笔记2023.11.25

网络运维与网络安全 学习笔记 第二十六天 今日目标 ACL原理与类型、基本ACL配置、高级ACL配置 高级ACL之ICMP、高级ACL之telnet ACL原理与类型 项目背景 为了企业的业务安全&#xff0c;要求不同部门对服务器有不同的权限 PC1不能访问Server PC2允许访问Server 允许其他所…...

Trustzone/TEE/安全 面试100问

关键词:cache学习、mmu学习、cache资料、mmu资料、arm资料、armv8资料、armv9资料、 trustzone视频、tee视频、ATF视频、secureboot视频、安全启动视频、selinux视频,cache视频、mmu视频,armv8视频、armv9视频、FF-A视频、密码学视频、RME/CCA视频、学习资料下载、免费学习资…...

【数据结构】D : 图的顶点可达闭包

D : 图的顶点可达闭包 Description 给定有向图的邻接矩阵A&#xff0c;其元素定义为&#xff1a;若存在顶点i到顶点j的有向边则A[i,j]1&#xff0c;若没有有向边则A[i,j] 0。试求A的可达闭包矩阵A*&#xff0c;其元素定义为&#xff1a;若存在顶点i到顶点j的有向路径则A*[i,j…...

链表?细!详细知识点总结!

链表 定义&#xff1a;链表是一种递归的数据结构&#xff0c;它或者为空&#xff08;null)&#xff0c;或者是指向一个结点&#xff08;node&#xff09;的引用&#xff0c;该结点含有一个泛型的元素和一个指向另一条链表的引用。 ​ 其实链表就是有序的列表&#xff0c;它在内…...

【数据结构实验】排序(三)快速排序算法的改进(三者取中法)

文章目录 1. 引言2. 快速排序算法2.1 传统快速排序2.2 三者取中法 3. 实验内容3.1 实验题目&#xff08;一&#xff09;输入要求&#xff08;二&#xff09;输出要求 3.2 算法实现 4. 实验结果 1. 引言 快速排序是一种经典的排序算法&#xff0c;其核心思想是通过选择一个基准元…...

利用ngx_stream_return_module构建简易 TCP/UDP 响应网关

一、模块概述 ngx_stream_return_module 提供了一个极简的指令&#xff1a; return <value>;在收到客户端连接后&#xff0c;立即将 <value> 写回并关闭连接。<value> 支持内嵌文本和内置变量&#xff08;如 $time_iso8601、$remote_addr 等&#xff09;&a…...

shell脚本--常见案例

1、自动备份文件或目录 2、批量重命名文件 3、查找并删除指定名称的文件&#xff1a; 4、批量删除文件 5、查找并替换文件内容 6、批量创建文件 7、创建文件夹并移动文件 8、在文件夹中查找文件...

dedecms 织梦自定义表单留言增加ajax验证码功能

增加ajax功能模块&#xff0c;用户不点击提交按钮&#xff0c;只要输入框失去焦点&#xff0c;就会提前提示验证码是否正确。 一&#xff0c;模板上增加验证码 <input name"vdcode"id"vdcode" placeholder"请输入验证码" type"text&quo…...

【Java_EE】Spring MVC

目录 Spring Web MVC ​编辑注解 RestController RequestMapping RequestParam RequestParam RequestBody PathVariable RequestPart 参数传递 注意事项 ​编辑参数重命名 RequestParam ​编辑​编辑传递集合 RequestParam 传递JSON数据 ​编辑RequestBody ​…...

排序算法总结(C++)

目录 一、稳定性二、排序算法选择、冒泡、插入排序归并排序随机快速排序堆排序基数排序计数排序 三、总结 一、稳定性 排序算法的稳定性是指&#xff1a;同样大小的样本 **&#xff08;同样大小的数据&#xff09;**在排序之后不会改变原始的相对次序。 稳定性对基础类型对象…...

Vite中定义@软链接

在webpack中可以直接通过符号表示src路径&#xff0c;但是vite中默认不可以。 如何实现&#xff1a; vite中提供了resolve.alias&#xff1a;通过别名在指向一个具体的路径 在vite.config.js中 import { join } from pathexport default defineConfig({plugins: [vue()],//…...

苹果AI眼镜:从“工具”到“社交姿态”的范式革命——重新定义AI交互入口的未来机会

在2025年的AI硬件浪潮中,苹果AI眼镜(Apple Glasses)正在引发一场关于“人机交互形态”的深度思考。它并非简单地替代AirPods或Apple Watch,而是开辟了一个全新的、日常可接受的AI入口。其核心价值不在于功能的堆叠,而在于如何通过形态设计打破社交壁垒,成为用户“全天佩戴…...

十九、【用户管理与权限 - 篇一】后端基础:用户列表与角色模型的初步构建

【用户管理与权限 - 篇一】后端基础:用户列表与角色模型的初步构建 前言准备工作第一部分:回顾 Django 内置的 `User` 模型第二部分:设计并创建 `Role` 和 `UserProfile` 模型第三部分:创建 Serializers第四部分:创建 ViewSets第五部分:注册 API 路由第六部分:后端初步测…...

负载均衡器》》LVS、Nginx、HAproxy 区别

虚拟主机 先4&#xff0c;后7...

验证redis数据结构

一、功能验证 1.验证redis的数据结构&#xff08;如字符串、列表、哈希、集合、有序集合等&#xff09;是否按照预期工作。 2、常见的数据结构验证方法&#xff1a; ①字符串&#xff08;string&#xff09; 测试基本操作 set、get、incr、decr 验证字符串的长度和内容是否正…...