Flink Flink中的分流
一、什么是分流
所谓“分流”,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。

二、基于filter算子的简单实现分流
其实根据条件筛选数据的需求,本身非常容易实现:只要针对同一条流多次独立调用.filter()方法进行筛选,就可以得到拆分之后的流了。
案例需求:读取一个整数数字流,将数据流划分为奇数流和偶数流。
package com.flink.DataStream.SplitStream;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FlinkSplitStreamByFilter {public static void main(String[] args) throws Exception {//TODO 创建Flink上下文执行环境StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration().set(RestOptions.BIND_PORT, "8081"));//.getExecutionEnvironment();//TODO 设置全局并行度为2streamExecutionEnvironment.setParallelism(2);DataStreamSource<String> dataStreamSource = streamExecutionEnvironment.socketTextStream("localhost", 8888);//TODO 先将输入流转为Integer类型SingleOutputStreamOperator<Integer> mapResult = dataStreamSource.map((input) -> {int i = Integer.parseInt(input);return i;});//TODO 使用匿名函数分流偶数流SingleOutputStreamOperator<Integer> ds1 = mapResult.filter(new FilterFunction<Integer>() {@Overridepublic boolean filter(Integer a) throws Exception {return a % 2 == 0;}});//TODO 使用lamda表达式分流奇数流SingleOutputStreamOperator<Integer> ds2 = mapResult.filter((a) -> a % 2 == 1);ds1.print("偶数流");ds2.print("奇数流");streamExecutionEnvironment.execute();}
}
执行结果
奇数流:1> 1
偶数流:2> 2
偶数流:1> 2
偶数流:2> 4
奇数流:1> 3
奇数流:2> 1Process finished with exit code 130 (interrupted by signal 2: SIGINT)
这种实现非常简单,但代码显得有些冗余——我们的处理逻辑对拆分出的三条流其实是一样的,却重复写了三次。而且这段代码背后的含义,是将原始数据流 stream 复制三份,然后对每一份分别做筛选;这明显是不够高效的。我们自然想到,能不能不用复制流,直接用一个算子就把它们都拆分开呢?
三、使用测输出流
关于处理函数中侧输出流的用法,我们已经在 7.5 节做了详细介绍。简单来说,只需要调用上下文 ctx 的.output()方法,就可以输出任意类型的数据了。而侧输出流的标记和提取,都离不开一个“输出标签”(OutputTag),指定了侧输出流的 id 和类型。
相关文章:
Flink Flink中的分流
一、什么是分流 所谓“分流”,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。 二、基于filter算子的简单实现分流 其实根据条件筛选数据的需求…...
传输层协议[精选]
网络: 跨主机通信. 互联网通信: 两点之间的通信路径有无数条. 集线器: 把一根网线差出来两根,但是同一时刻只能有一根线跑.交换机: 组建局域网.路由器: 本质就是将两个局域网连接起来 交换机和路由器之间的区别越来越模糊. 调制解调器: 使用电话线上网的时候,需要将电话线的模…...
LeetCode算法题解|474. 一和零
474. 一和零 题目链接:474. 一和零 题目描述 给你一个二进制字符串数组 strs 和两个整数 m 和 n 。 请你找出并返回 strs 的最大子集的长度,该子集中 最多 有 m 个 0 和 n 个 1 。 如果 x 的所有元素也是 y 的元素,集合 x 是集合 y 的 子…...
一种太阳能风能市电互补路灯方案介绍
太阳能市电互补路灯是一种环保、节能的照明设施,它利用太阳能进行发电并实现照明。这种路灯在白天吸收阳光并将其转化为电能,到了晚上则利用储存的电能为LED灯提供电力,实现照明功能。下面叁仟智慧将详细介绍太阳能市电互补路灯灯的工作原理和…...
世微 dc-dc降压恒流 LED汽车大灯 单灯 14V5A 68W车灯驱动方案 AP5191
产品描述 AP5191是一款PWM工作模式,高效率、外围简单、外置功率MOS管,适用于4.5-150V输入的高精度降压LED恒流驱动芯片。输出最大功率150W,最大电流6A。AP5191可实现线性调光和PWM调光,线性调光脚有效电压范围0.55-2.6V.AP5191 工作频率可以…...
基于时隙的多重冗余流指纹模型
文章信息 论文题目:基于时隙的多重冗余流指纹模型 期刊(会议):网络与信息安全学报 时间:2023 级别:CCF C 概述 为确保内生网络流量安全可信,本文在研究流水印及其扩展的流指纹机制的基础上&a…...
Visual Studio 2019 C# System.BadImageFormatException 解决方法
文章目录 1.DLL文件缺失或不匹配原因解决方法 2.系统环境变量Path下内容过多原因解决方法 3.位数错误原因解决方法 分析几种可能因素 1.DLL文件缺失或不匹配 原因 检查对应Debug路径下的DLL文件是否有缺失 解决方法 将对应的DLL文件放到Debug文件夹里面,检查冗余…...
深度学习之基于YoloV5车辆和行人目标检测系统
欢迎大家点赞、收藏、关注、评论啦 ,由于篇幅有限,只展示了部分核心代码。 文章目录 一项目简介YOLOv5 简介YOLOv5 特点 车辆和行人目标检测系统 二、功能三、系统四. 总结 一项目简介 # 深度学习之基于 YOLOv5 车辆和行人目标检测系统介绍 深度学习在…...
Django框架之中间件
目录 一、引入 二、Django中间件介绍 【1】什么是Django中间件 【2】Django中间件的作用 【3】示例 三、Django请求生命周期流程图 四、Django中间件是Django的门户 五、Django中间件详解 六、中间件必须要掌握的两个方法 (1) process_request (2) process_respon…...
BTC 复兴:Ordinals 带来创新活力,BitVM 与 BitStream 相继问世
除了备受瞩目的 ETF,今年 Bitcoin 生态迎来全新的发展活力和机遇。Ordinals 协议的横空出世,以此为基础诞生的 BRC20 协议给整个比特币生态带去了一波新的能量,迎来铭文热度高涨。而诸如 BitVM、BitStream 等新技术甫一问世,便引发…...
STM32 CAN协议讲解以及代码
STM32 CAN 文章目录 STM32 CAN前言一、CAN外设1.主控制寄存器CAN_MCR2.位时序寄存器CAN_BTR3.CAN的发送邮箱4.CAN的接收FIFO5.验收筛选器 二、代码配置1.初始化2.发送数据3.接收数据4.main.c 前言 前面学习了CAN的一些理论知识,他在我们的STM32里面是怎么用的呢 前…...
京东数据分析(京东大数据):2023年10月京东手机行业品牌销售排行榜
鲸参谋监测的京东平台10月份手机市场销售数据已出炉! 根据鲸参谋平台的数据显示,今年10月份,京东平台手机行业的销量约340万,环比增长约11%,同比则下滑约2%;销售额为108亿,环比增长约17%&#x…...
计算机毕业设计 基于Hadoop的物品租赁系统的设计与实现 Java实战项目 附源码+文档+视频讲解
博主介绍:✌从事软件开发10年之余,专注于Java技术领域、Python人工智能及数据挖掘、小程序项目开发和Android项目开发等。CSDN、掘金、华为云、InfoQ、阿里云等平台优质作者✌ 🍅文末获取源码联系🍅 👇🏻 精…...
pop链反序列化 [MRCTF2020]Ezpop1
打开题目 网站源码为 Welcome to index.php <?php //flag is in flag.php //WTF IS THIS? //Learn From https://ctf.ieki.xyz/library/php.html#%E5%8F%8D%E5%BA%8F%E5%88%97%E5%8C%96%E9%AD%94%E6%9C%AF%E6%96%B9%E6%B3%95 //And Crack It! class Modifier {protected …...
yolov5从英伟达平台移植到华为昇腾开发板上的思路
作者:朱金灿 来源:clever101的专栏 为什么大多数人学不会人工智能编程?>>> 最近需要将yolov5代码从英伟达平台移植到华为昇腾开发板上。搜了一些代码和资料,大致明白了二者的差别。 1.二者使用的模型文件不一样 yolov…...
网络运维与网络安全 学习笔记2023.11.25
网络运维与网络安全 学习笔记 第二十六天 今日目标 ACL原理与类型、基本ACL配置、高级ACL配置 高级ACL之ICMP、高级ACL之telnet ACL原理与类型 项目背景 为了企业的业务安全,要求不同部门对服务器有不同的权限 PC1不能访问Server PC2允许访问Server 允许其他所…...
Trustzone/TEE/安全 面试100问
关键词:cache学习、mmu学习、cache资料、mmu资料、arm资料、armv8资料、armv9资料、 trustzone视频、tee视频、ATF视频、secureboot视频、安全启动视频、selinux视频,cache视频、mmu视频,armv8视频、armv9视频、FF-A视频、密码学视频、RME/CCA视频、学习资料下载、免费学习资…...
【数据结构】D : 图的顶点可达闭包
D : 图的顶点可达闭包 Description 给定有向图的邻接矩阵A,其元素定义为:若存在顶点i到顶点j的有向边则A[i,j]1,若没有有向边则A[i,j] 0。试求A的可达闭包矩阵A*,其元素定义为:若存在顶点i到顶点j的有向路径则A*[i,j…...
链表?细!详细知识点总结!
链表 定义:链表是一种递归的数据结构,它或者为空(null),或者是指向一个结点(node)的引用,该结点含有一个泛型的元素和一个指向另一条链表的引用。 其实链表就是有序的列表,它在内…...
【数据结构实验】排序(三)快速排序算法的改进(三者取中法)
文章目录 1. 引言2. 快速排序算法2.1 传统快速排序2.2 三者取中法 3. 实验内容3.1 实验题目(一)输入要求(二)输出要求 3.2 算法实现 4. 实验结果 1. 引言 快速排序是一种经典的排序算法,其核心思想是通过选择一个基准元…...
利用ngx_stream_return_module构建简易 TCP/UDP 响应网关
一、模块概述 ngx_stream_return_module 提供了一个极简的指令: return <value>;在收到客户端连接后,立即将 <value> 写回并关闭连接。<value> 支持内嵌文本和内置变量(如 $time_iso8601、$remote_addr 等)&a…...
shell脚本--常见案例
1、自动备份文件或目录 2、批量重命名文件 3、查找并删除指定名称的文件: 4、批量删除文件 5、查找并替换文件内容 6、批量创建文件 7、创建文件夹并移动文件 8、在文件夹中查找文件...
dedecms 织梦自定义表单留言增加ajax验证码功能
增加ajax功能模块,用户不点击提交按钮,只要输入框失去焦点,就会提前提示验证码是否正确。 一,模板上增加验证码 <input name"vdcode"id"vdcode" placeholder"请输入验证码" type"text&quo…...
【Java_EE】Spring MVC
目录 Spring Web MVC 编辑注解 RestController RequestMapping RequestParam RequestParam RequestBody PathVariable RequestPart 参数传递 注意事项 编辑参数重命名 RequestParam 编辑编辑传递集合 RequestParam 传递JSON数据 编辑RequestBody …...
排序算法总结(C++)
目录 一、稳定性二、排序算法选择、冒泡、插入排序归并排序随机快速排序堆排序基数排序计数排序 三、总结 一、稳定性 排序算法的稳定性是指:同样大小的样本 **(同样大小的数据)**在排序之后不会改变原始的相对次序。 稳定性对基础类型对象…...
Vite中定义@软链接
在webpack中可以直接通过符号表示src路径,但是vite中默认不可以。 如何实现: vite中提供了resolve.alias:通过别名在指向一个具体的路径 在vite.config.js中 import { join } from pathexport default defineConfig({plugins: [vue()],//…...
苹果AI眼镜:从“工具”到“社交姿态”的范式革命——重新定义AI交互入口的未来机会
在2025年的AI硬件浪潮中,苹果AI眼镜(Apple Glasses)正在引发一场关于“人机交互形态”的深度思考。它并非简单地替代AirPods或Apple Watch,而是开辟了一个全新的、日常可接受的AI入口。其核心价值不在于功能的堆叠,而在于如何通过形态设计打破社交壁垒,成为用户“全天佩戴…...
十九、【用户管理与权限 - 篇一】后端基础:用户列表与角色模型的初步构建
【用户管理与权限 - 篇一】后端基础:用户列表与角色模型的初步构建 前言准备工作第一部分:回顾 Django 内置的 `User` 模型第二部分:设计并创建 `Role` 和 `UserProfile` 模型第三部分:创建 Serializers第四部分:创建 ViewSets第五部分:注册 API 路由第六部分:后端初步测…...
负载均衡器》》LVS、Nginx、HAproxy 区别
虚拟主机 先4,后7...
验证redis数据结构
一、功能验证 1.验证redis的数据结构(如字符串、列表、哈希、集合、有序集合等)是否按照预期工作。 2、常见的数据结构验证方法: ①字符串(string) 测试基本操作 set、get、incr、decr 验证字符串的长度和内容是否正…...
