【Flink快速入门-5.流处理之多流转换算子】
流处理之多流转换算子
实验介绍
前面实验中介绍的算子已经能够满足我们的大部分开发需求了,但是在实际工作中有时候还会遇到一些业务场景,例如需要摄入多个输入流并将其合并处理,或者需要将一条输入流分割为多条子流,在不同的子流中处理不同的业务逻辑。所以本节实验的内容我们将学习 DataSteam API 中的可以将多条输入流合并为一个输入流,或者将一个输入流分割为多个子流的算子,我们将其统称为“多流转换算子”。
知识点
- Union
- filter
算子演示
Union
union 顾名思义就是连接的意思,所以 union 算子的作用就是合并两条或者多条相同类型的 DataStream,生成一个新的类型相同的 DataStream。如图所示:

需要注意的是,事件合流的方式为 FIFO 方式。操作符并不会产生一个特定顺序的事件流。union 操作符也不会进行去重。每一个输入事件都被发送到了下一个操作符。
假设某公司分别在淘宝和天猫都开设了自己的直营店,公司高层需要实时监控到两个店铺的交易数据,并希望通过大屏展示的方式实时滚动。我们可以通过两条 Socket 输入流来模拟这样的场景。
首先在我们 FlinkLearning 工程的 com.vlab.operator 包下创建一个 UnionOperator 的 Scala object,输入如下代码:
package com.vlab.operatorimport org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}object UnionOperator {def main(args: Array[String]): Unit = {// 创建执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment// 接收京东订单val jdOrder:DataStream[String] = env.socketTextStream("192.168.137.81", 9999)// 接收拼刀刀订单val pindaoOrder:DataStream[String] = env.socketTextStream("192.168.137.81", 9998)// 将两条输入流合并为一条输入流val unionStream:DataStream[String] = jdOrder.union(pindaoOrder)// 设置并行度unionStream.print().setParallelism(1)// 执行env.execute("UnionOperator")}}
我们使用 netcat 监控两个端口来模拟发送淘宝和天猫的订单信息,然后使用 Flink 接收。打开终端窗口,执行 nc -l -p 9998 命令,紧接着打开另一个终端窗口,执行 nc -l -p 9999 命令。这样的话我们监控了 9998 和 9999 两个端口,接下来在 Flink 中进行接收。
运行刚刚的代码,然后在前面打开的两个终端中交替发送订单数据,观察 idea 控制台输出。

filter
使用 filter 来根据体温的阈值将流拆分为两个子流:一个是正常体温流,另一个是发烧体温流。然后我们可以对每个子流进行不同的业务逻辑处理。
疫情期间,全国各地的超市、医院、机场等公共场所入口都有温度监控设备,当该设备检测到某个人体温异常之后就会报警。假设鉴别正常体温和发烧体温的阈值为 36.0 摄氏度,也就是说,只要体温大于等于 36.0 摄氏度我们就认为其为发烧状态。我们使用 检测体温是否异常,我们可以使用 filter 来将流分为两条子流,一个代表 正常体温,另一个代表 发烧体温,然后可以对这些流进行不同的业务逻辑处理。
在我们 FlinkLearning 工程的 com.vlab.operator 包下创建一个名为 SelectOperator 的 Scala object,代码如下:
package com.shiyanlou.operatorimport org.apache.flink.streaming.api.scala._object SelectOperator {def main(args: Array[String]): Unit = {// 设置流环境val env = StreamExecutionEnvironment.getExecutionEnvironment// 读取socket文本数据流val inputDS: DataStream[String] = env.socketTextStream("192.168.137.81", 9999)val peopleStream = inputDS.map(line => {val arr = line.split(" ")People(arr(0), arr(1).toFloat)})// 使用 keyBy 按照温度类型(high 或 normal)进行分组val highTempStream = peopleStream.filter(_.temperature > 36.5)val normalTempStream = peopleStream.filter(_.temperature <= 36.5)// 打印输出highTempStream.print("发烧")normalTempStream.print("体温正常")env.execute("SelectOperator")}case class People(name: String, temperature: Float)
}
上面的代码中,我们创建了一个 Socket 输入流监控localhost下的 9999 端口,然后将输入的文本使用空格分隔之后转换为People类。紧接着使用 Split 算子将体温大于 36.0 的人群定义为fever,将体温小于等于 36.0 的人群定义为normal,最后使用select算子选择了fever(发烧)状态的人群并输出到控制台。
打开终端,执行nc -l -p 9999,在 idea 运行以上代码,并在终端中依次发送下面的信息:
张小明 35.6
李鹏程 36.3
赵露 36.7
李阳 35.5
刘明 37.0
在 idea 的控制台会看到将体温高于 36.5 的做了打印(赵露、刘明)。

实验总结
本节实验中我们介绍了 Flink 中的多流转换算子,其中 Union 是将两个或者多个类型相同的输入流转换成一个输入流,而filter是将一个输入流根据给定的条件切分成多个子输入流。这部分内容在工作中会经常用到,大家一定要理解。
相关文章:
【Flink快速入门-5.流处理之多流转换算子】
流处理之多流转换算子 实验介绍 前面实验中介绍的算子已经能够满足我们的大部分开发需求了,但是在实际工作中有时候还会遇到一些业务场景,例如需要摄入多个输入流并将其合并处理,或者需要将一条输入流分割为多条子流,在不同的子…...
react传递函数与回调函数原理
为什么 React 允许直接传递函数? 回调函数核心逻辑 例子:父组件控制 Modal 的显示与隐藏 // 父组件 (ParentComponent.tsx) import React, { useState } from react; import { Modal, Button } from antd; import ModalContent from ./ModalContent;co…...
华为云kubernetes基于keda自动伸缩deployment副本(监听redis队列长度)
1 概述 KEDA(Kubernetes-based Event-Driven Autoscaler,网址是https://keda.sh)是在 Kubernetes 中事件驱动的弹性伸缩器,功能非常强大。不仅支持根据基础的CPU和内存指标进行伸缩,还支持根据各种消息队列中的长度、…...
Spring源码分析のBean扫描流程
文章目录 前言一、scanCandidateComponents1.1 isCandidateComponent1.1.1、排除/包含过滤器1.1.2、条件装配1.1.3、重载一1.1.4、重载二1.1.5、补充:Lookup注解 总结 前言 原生的Spring在构造ApplicationContext时,会调用refresh方法。其中就包含了扫描…...
Ubuntu安装docker:docker-desktop : 依赖: docker-ce-cli 但无法安装它、无法定位软件包 docker-ce-cli
具体错误 sudo apt-get install ./docker-desktop-amd64.deb [sudo] password for weiyu: 正在读取软件包列表... 完成 正在分析软件包的依赖关系树... 完成 正在读取状态信息... 完成 注意,选中 docker-desktop 而非 ./docker-desktop-amd64.de…...
基于大数据的奥运会获奖数据分析系统设计与实现
【大数据】基于大数据的奥运会获奖数据分析系统设计与实现(完整系统源码开发笔记详细部署教程)✅ 目录 一、项目简介二、项目界面展示三、项目视频展示 一、项目简介 该系统通过集成先进的数据抓取、处理、存储与可视化技术,为深入理解奥运会…...
数据结构 堆和priority_queue
一、堆的定义 堆(heap),是⼀棵有着特殊性质的完全⼆叉树,可以⽤来实现优先级队列(priorityqueue)。 堆需要满⾜以下性质: 1. 是⼀棵完全⼆叉树; 2. 对于树中每个结点,如…...
Dockerfile 编写推荐
一、导读 本文主要介绍在编写 docker 镜像的时候一些需要注意的事项和推荐的做法。 虽然 Dockerfile 简化了镜像构建的过程,并且把这个过程可以进行版本控制,但是不正当的 Dockerfile 使用也会导致很多问题。 docker 镜像太大。如果你经常使用镜像或者…...
【抽象代数】1.2. 半群与群
群的定义 群非空集合二元运算性质 定义1. 设 为一个非空集合,上有二元运算,满足结合律,则称或为一个半群。 定义2. 设 为半群,若元素 满足 ,则称 为 的左幺元(右幺元:)&#…...
Django中实现简单易用的分页工具
如何在Django中实现简单易用的分页工具?📚 嗨,小伙伴们!今天我们来看看如何在 Django 中实现一个超简单的分页工具。无论你是在处理博客文章、产品列表,还是用户评论,当数据量一大时,分页显得尤…...
「软件设计模式」装饰者模式(Decorator)
深入解析装饰者模式:动态扩展功能的艺术(C实现) 一、模式思想与应用场景 1.1 模式定义 装饰者模式(Decorator Pattern)是一种结构型设计模式,它通过将对象放入包含行为的特殊封装对象中,动态地…...
CI/CD(二)docker-compose安装Jenkins
1、docker-compose.yml version: 3.8services:jenkins:image: jenkins/jenkins:lts # 使用官方的 Jenkins LTS 镜像container_name: jenkinsuser: root # 如果需要以 root 用户运行ports:- "8080:8080" # Jenkins Web 界面端口- "50000:50000" # 用于 Jen…...
OpenCV机器学习(1)人工神经网络 - 多层感知器类cv::ml::ANN_MLP
操作系统:ubuntu22.04 OpenCV版本:OpenCV4.9 IDE:Visual Studio Code 编程语言:C11 算法描述 cv::ml::ANN_MLP 是 OpenCV 库中的一部分,用于实现人工神经网络 - 多层感知器(Artificial Neural Network - Multi-Layer…...
ProxySQL构建PolarDB-X标准版高可用路由服务三节点集群
ProxySQL构建PolarDB-X标准版高可用路由服务三节点集群 一、PolarDB-X标准版主备集群搭建 三台机器上传 polardbx 包,包可以从官网https://openpolardb.com/download获取,这里提供离线rpm。 1、上传 polardbx 安装包 到 /opt目录下 rpm -ivh t-pol…...
15.1 Process(进程)类
版权声明:本文为博主原创文章,转载请在显著位置标明本文出处以及作者网名,未经作者允许不得用于商业目的。 通常开发时想要获得进程是比较困难的事,必须要调用CreateToolhelpSnapshot、ProcessFirst、ProcessNext等API或者诸如 Zw…...
elasticsearch8 linux版以服务的方式启动
1.创建系统服务文件 对于使用 systemd 作为系统初始化系统的 Linux 发行版(如 CentOS 7 及以上、Ubuntu 16.04 及以上),需要创建一个 systemd 服务文件。以 root 用户或具有 sudo 权限的用户身份执行以下操作: sudo vim /etc/sy…...
小米 R3G 路由器刷机教程(Pandavan)
小米 R3G 路由器刷机教程(Pandavan) 一、前言 小米 R3G 路由器以其高性价比和稳定的性能备受用户青睐。然而,原厂固件的功能相对有限,难以满足高级用户的个性化需求。刷机不仅可以解锁路由器的潜能,还能通过第三方固…...
某大型业务系统技术栈介绍【应对面试】
微服务架构【图】 微服务架构【概念】 微服务架构,是一种架构模式,它提倡将单一应用程序划分成一组小的服务,服务之间互相协调、互相配合,为用户提供最终价值。在微服务架构中,服务与服务之间通信时,通常是…...
【区块链】零知识证明基础概念详解
🌈个人主页: 鑫宝Code 🔥热门专栏: 闲话杂谈| 炫酷HTML | JavaScript基础 💫个人格言: "如无必要,勿增实体" 文章目录 零知识证明基础概念详解引言1. 零知识证明的定义与特性1.1 基本定义1.2 三个核心…...
建筑行业安全技能竞赛流程方案
一、比赛时间: 6月23日8:30分准时到场;9:00-10:00理论考试;10:10-12:00现场隐患答疑;12:00-13:30午餐;下午13:30-15:30现场…...
React 第五十五节 Router 中 useAsyncError的使用详解
前言 useAsyncError 是 React Router v6.4 引入的一个钩子,用于处理异步操作(如数据加载)中的错误。下面我将详细解释其用途并提供代码示例。 一、useAsyncError 用途 处理异步错误:捕获在 loader 或 action 中发生的异步错误替…...
java_网络服务相关_gateway_nacos_feign区别联系
1. spring-cloud-starter-gateway 作用:作为微服务架构的网关,统一入口,处理所有外部请求。 核心能力: 路由转发(基于路径、服务名等)过滤器(鉴权、限流、日志、Header 处理)支持负…...
利用ngx_stream_return_module构建简易 TCP/UDP 响应网关
一、模块概述 ngx_stream_return_module 提供了一个极简的指令: return <value>;在收到客户端连接后,立即将 <value> 写回并关闭连接。<value> 支持内嵌文本和内置变量(如 $time_iso8601、$remote_addr 等)&a…...
React hook之useRef
React useRef 详解 useRef 是 React 提供的一个 Hook,用于在函数组件中创建可变的引用对象。它在 React 开发中有多种重要用途,下面我将全面详细地介绍它的特性和用法。 基本概念 1. 创建 ref const refContainer useRef(initialValue);initialValu…...
多场景 OkHttpClient 管理器 - Android 网络通信解决方案
下面是一个完整的 Android 实现,展示如何创建和管理多个 OkHttpClient 实例,分别用于长连接、普通 HTTP 请求和文件下载场景。 <?xml version"1.0" encoding"utf-8"?> <LinearLayout xmlns:android"http://schemas…...
1.3 VSCode安装与环境配置
进入网址Visual Studio Code - Code Editing. Redefined下载.deb文件,然后打开终端,进入下载文件夹,键入命令 sudo dpkg -i code_1.100.3-1748872405_amd64.deb 在终端键入命令code即启动vscode 需要安装插件列表 1.Chinese简化 2.ros …...
鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个医院查看报告小程序
一、开发环境准备 工具安装: 下载安装DevEco Studio 4.0(支持HarmonyOS 5)配置HarmonyOS SDK 5.0确保Node.js版本≥14 项目初始化: ohpm init harmony/hospital-report-app 二、核心功能模块实现 1. 报告列表…...
今日科技热点速览
🔥 今日科技热点速览 🎮 任天堂Switch 2 正式发售 任天堂新一代游戏主机 Switch 2 今日正式上线发售,主打更强图形性能与沉浸式体验,支持多模态交互,受到全球玩家热捧 。 🤖 人工智能持续突破 DeepSeek-R1&…...
群晖NAS如何在虚拟机创建飞牛NAS
套件中心下载安装Virtual Machine Manager 创建虚拟机 配置虚拟机 飞牛官网下载 https://iso.liveupdate.fnnas.com/x86_64/trim/fnos-0.9.2-863.iso 群晖NAS如何在虚拟机创建飞牛NAS - 个人信息分享...
关于easyexcel动态下拉选问题处理
前些日子突然碰到一个问题,说是客户的导入文件模版想支持部分导入内容的下拉选,于是我就找了easyexcel官网寻找解决方案,并没有找到合适的方案,没办法只能自己动手并分享出来,针对Java生成Excel下拉菜单时因选项过多导…...
