当前位置: 首页 > news >正文

【Flink快速入门-5.流处理之多流转换算子】

流处理之多流转换算子

实验介绍

前面实验中介绍的算子已经能够满足我们的大部分开发需求了,但是在实际工作中有时候还会遇到一些业务场景,例如需要摄入多个输入流并将其合并处理,或者需要将一条输入流分割为多条子流,在不同的子流中处理不同的业务逻辑。所以本节实验的内容我们将学习 DataSteam API 中的可以将多条输入流合并为一个输入流,或者将一个输入流分割为多个子流的算子,我们将其统称为“多流转换算子”。

知识点
  • Union
  • filter

算子演示

Union

union 顾名思义就是连接的意思,所以 union 算子的作用就是合并两条或者多条相同类型的 DataStream,生成一个新的类型相同的 DataStream。如图所示:
在这里插入图片描述

需要注意的是,事件合流的方式为 FIFO 方式。操作符并不会产生一个特定顺序的事件流。union 操作符也不会进行去重。每一个输入事件都被发送到了下一个操作符。

假设某公司分别在淘宝和天猫都开设了自己的直营店,公司高层需要实时监控到两个店铺的交易数据,并希望通过大屏展示的方式实时滚动。我们可以通过两条 Socket 输入流来模拟这样的场景。

首先在我们 FlinkLearning 工程的 com.vlab.operator 包下创建一个 UnionOperator 的 Scala object,输入如下代码:

package com.vlab.operatorimport org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}object UnionOperator {def main(args: Array[String]): Unit = {// 创建执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment// 接收京东订单val jdOrder:DataStream[String] = env.socketTextStream("192.168.137.81", 9999)// 接收拼刀刀订单val pindaoOrder:DataStream[String] = env.socketTextStream("192.168.137.81", 9998)// 将两条输入流合并为一条输入流val unionStream:DataStream[String] = jdOrder.union(pindaoOrder)// 设置并行度unionStream.print().setParallelism(1)// 执行env.execute("UnionOperator")}}

我们使用 netcat 监控两个端口来模拟发送淘宝和天猫的订单信息,然后使用 Flink 接收。打开终端窗口,执行 nc -l -p 9998 命令,紧接着打开另一个终端窗口,执行 nc -l -p 9999 命令。这样的话我们监控了 9998 和 9999 两个端口,接下来在 Flink 中进行接收。

运行刚刚的代码,然后在前面打开的两个终端中交替发送订单数据,观察 idea 控制台输出。
在这里插入图片描述

filter

使用 filter 来根据体温的阈值将流拆分为两个子流:一个是正常体温流,另一个是发烧体温流。然后我们可以对每个子流进行不同的业务逻辑处理。

疫情期间,全国各地的超市、医院、机场等公共场所入口都有温度监控设备,当该设备检测到某个人体温异常之后就会报警。假设鉴别正常体温和发烧体温的阈值为 36.0 摄氏度,也就是说,只要体温大于等于 36.0 摄氏度我们就认为其为发烧状态。我们使用 检测体温是否异常,我们可以使用 filter 来将流分为两条子流,一个代表 正常体温,另一个代表 发烧体温,然后可以对这些流进行不同的业务逻辑处理。

在我们 FlinkLearning 工程的 com.vlab.operator 包下创建一个名为 SelectOperator 的 Scala object,代码如下:

package com.shiyanlou.operatorimport org.apache.flink.streaming.api.scala._object SelectOperator {def main(args: Array[String]): Unit = {// 设置流环境val env = StreamExecutionEnvironment.getExecutionEnvironment// 读取socket文本数据流val inputDS: DataStream[String] = env.socketTextStream("192.168.137.81", 9999)val peopleStream = inputDS.map(line => {val arr = line.split(" ")People(arr(0), arr(1).toFloat)})// 使用 keyBy 按照温度类型(high 或 normal)进行分组val highTempStream = peopleStream.filter(_.temperature > 36.5)val normalTempStream = peopleStream.filter(_.temperature <= 36.5)// 打印输出highTempStream.print("发烧")normalTempStream.print("体温正常")env.execute("SelectOperator")}case class People(name: String, temperature: Float)
}

上面的代码中,我们创建了一个 Socket 输入流监控localhost下的 9999 端口,然后将输入的文本使用空格分隔之后转换为People类。紧接着使用 Split 算子将体温大于 36.0 的人群定义为fever,将体温小于等于 36.0 的人群定义为normal,最后使用select算子选择了fever(发烧)状态的人群并输出到控制台。

打开终端,执行nc -l -p 9999,在 idea 运行以上代码,并在终端中依次发送下面的信息:

张小明 35.6
李鹏程 36.3
赵露 36.7
李阳 35.5
刘明 37.0

在 idea 的控制台会看到将体温高于 36.5 的做了打印(赵露、刘明)。
在这里插入图片描述

实验总结

本节实验中我们介绍了 Flink 中的多流转换算子,其中 Union 是将两个或者多个类型相同的输入流转换成一个输入流,而filter是将一个输入流根据给定的条件切分成多个子输入流。这部分内容在工作中会经常用到,大家一定要理解。

相关文章:

【Flink快速入门-5.流处理之多流转换算子】

流处理之多流转换算子 实验介绍 前面实验中介绍的算子已经能够满足我们的大部分开发需求了&#xff0c;但是在实际工作中有时候还会遇到一些业务场景&#xff0c;例如需要摄入多个输入流并将其合并处理&#xff0c;或者需要将一条输入流分割为多条子流&#xff0c;在不同的子…...

react传递函数与回调函数原理

为什么 React 允许直接传递函数&#xff1f; 回调函数核心逻辑 例子&#xff1a;父组件控制 Modal 的显示与隐藏 // 父组件 (ParentComponent.tsx) import React, { useState } from react; import { Modal, Button } from antd; import ModalContent from ./ModalContent;co…...

华为云kubernetes基于keda自动伸缩deployment副本(监听redis队列长度)

1 概述 KEDA&#xff08;Kubernetes-based Event-Driven Autoscaler&#xff0c;网址是https://keda.sh&#xff09;是在 Kubernetes 中事件驱动的弹性伸缩器&#xff0c;功能非常强大。不仅支持根据基础的CPU和内存指标进行伸缩&#xff0c;还支持根据各种消息队列中的长度、…...

Spring源码分析のBean扫描流程

文章目录 前言一、scanCandidateComponents1.1 isCandidateComponent1.1.1、排除/包含过滤器1.1.2、条件装配1.1.3、重载一1.1.4、重载二1.1.5、补充&#xff1a;Lookup注解 总结 前言 原生的Spring在构造ApplicationContext时&#xff0c;会调用refresh方法。其中就包含了扫描…...

Ubuntu安装docker:docker-desktop : 依赖: docker-ce-cli 但无法安装它、无法定位软件包 docker-ce-cli

具体错误 sudo apt-get install ./docker-desktop-amd64.deb [sudo] password for weiyu: 正在读取软件包列表... 完成 正在分析软件包的依赖关系树... 完成 正在读取状态信息... 完成 注意&#xff0c;选中 docker-desktop 而非 ./docker-desktop-amd64.de…...

基于大数据的奥运会获奖数据分析系统设计与实现

【大数据】基于大数据的奥运会获奖数据分析系统设计与实现&#xff08;完整系统源码开发笔记详细部署教程&#xff09;✅ 目录 一、项目简介二、项目界面展示三、项目视频展示 一、项目简介 该系统通过集成先进的数据抓取、处理、存储与可视化技术&#xff0c;为深入理解奥运会…...

数据结构 堆和priority_queue

一、堆的定义 堆&#xff08;heap&#xff09;&#xff0c;是⼀棵有着特殊性质的完全⼆叉树&#xff0c;可以⽤来实现优先级队列&#xff08;priorityqueue&#xff09;。 堆需要满⾜以下性质&#xff1a; 1. 是⼀棵完全⼆叉树&#xff1b; 2. 对于树中每个结点&#xff0c;如…...

Dockerfile 编写推荐

一、导读 本文主要介绍在编写 docker 镜像的时候一些需要注意的事项和推荐的做法。 虽然 Dockerfile 简化了镜像构建的过程&#xff0c;并且把这个过程可以进行版本控制&#xff0c;但是不正当的 Dockerfile 使用也会导致很多问题。 docker 镜像太大。如果你经常使用镜像或者…...

【抽象代数】1.2. 半群与群

群的定义 群非空集合二元运算性质 定义1. 设 为一个非空集合&#xff0c;上有二元运算&#xff0c;满足结合律&#xff0c;则称或为一个半群。 定义2. 设 为半群&#xff0c;若元素 满足 &#xff0c;则称 为 的左幺元&#xff08;右幺元&#xff1a;&#xff09;&#…...

Django中实现简单易用的分页工具

如何在Django中实现简单易用的分页工具&#xff1f;&#x1f4da; 嗨&#xff0c;小伙伴们&#xff01;今天我们来看看如何在 Django 中实现一个超简单的分页工具。无论你是在处理博客文章、产品列表&#xff0c;还是用户评论&#xff0c;当数据量一大时&#xff0c;分页显得尤…...

「软件设计模式」装饰者模式(Decorator)

深入解析装饰者模式&#xff1a;动态扩展功能的艺术&#xff08;C实现&#xff09; 一、模式思想与应用场景 1.1 模式定义 装饰者模式&#xff08;Decorator Pattern&#xff09;是一种结构型设计模式&#xff0c;它通过将对象放入包含行为的特殊封装对象中&#xff0c;动态地…...

CI/CD(二)docker-compose安装Jenkins

1、docker-compose.yml version: 3.8services:jenkins:image: jenkins/jenkins:lts # 使用官方的 Jenkins LTS 镜像container_name: jenkinsuser: root # 如果需要以 root 用户运行ports:- "8080:8080" # Jenkins Web 界面端口- "50000:50000" # 用于 Jen…...

OpenCV机器学习(1)人工神经网络 - 多层感知器类cv::ml::ANN_MLP

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 cv::ml::ANN_MLP 是 OpenCV 库中的一部分&#xff0c;用于实现人工神经网络 - 多层感知器&#xff08;Artificial Neural Network - Multi-Layer…...

ProxySQL构建PolarDB-X标准版高可用路由服务三节点集群

ProxySQL构建PolarDB-X标准版高可用路由服务三节点集群 一、PolarDB-X标准版主备集群搭建 三台机器上传 polardbx 包&#xff0c;包可以从官网https://openpolardb.com/download获取&#xff0c;这里提供离线rpm。 1、上传 polardbx 安装包 到 /opt目录下 rpm -ivh t-pol…...

15.1 Process(进程)类

版权声明&#xff1a;本文为博主原创文章&#xff0c;转载请在显著位置标明本文出处以及作者网名&#xff0c;未经作者允许不得用于商业目的。 通常开发时想要获得进程是比较困难的事&#xff0c;必须要调用CreateToolhelpSnapshot、ProcessFirst、ProcessNext等API或者诸如 Zw…...

elasticsearch8 linux版以服务的方式启动

1.创建系统服务文件 对于使用 systemd 作为系统初始化系统的 Linux 发行版&#xff08;如 CentOS 7 及以上、Ubuntu 16.04 及以上&#xff09;&#xff0c;需要创建一个 systemd 服务文件。以 root 用户或具有 sudo 权限的用户身份执行以下操作&#xff1a; sudo vim /etc/sy…...

小米 R3G 路由器刷机教程(Pandavan)

小米 R3G 路由器刷机教程&#xff08;Pandavan&#xff09; 一、前言 小米 R3G 路由器以其高性价比和稳定的性能备受用户青睐。然而&#xff0c;原厂固件的功能相对有限&#xff0c;难以满足高级用户的个性化需求。刷机不仅可以解锁路由器的潜能&#xff0c;还能通过第三方固…...

某大型业务系统技术栈介绍【应对面试】

微服务架构【图】 微服务架构【概念】 微服务架构&#xff0c;是一种架构模式&#xff0c;它提倡将单一应用程序划分成一组小的服务&#xff0c;服务之间互相协调、互相配合&#xff0c;为用户提供最终价值。在微服务架构中&#xff0c;服务与服务之间通信时&#xff0c;通常是…...

【区块链】零知识证明基础概念详解

&#x1f308;个人主页: 鑫宝Code &#x1f525;热门专栏: 闲话杂谈&#xff5c; 炫酷HTML | JavaScript基础 ​&#x1f4ab;个人格言: "如无必要&#xff0c;勿增实体" 文章目录 零知识证明基础概念详解引言1. 零知识证明的定义与特性1.1 基本定义1.2 三个核心…...

建筑行业安全技能竞赛流程方案

一、比赛时间&#xff1a; 6月23日8&#xff1a;30分准时到场&#xff1b;9&#xff1a;00&#xff0d;10&#xff1a;00理论考试&#xff1b;10&#xff1a;10-12:00现场隐患答疑&#xff1b;12:00-13&#xff1a;30午餐&#xff1b;下午13&#xff1a;30-15&#xff1a;30现场…...

Docker 离线安装指南

参考文章 1、确认操作系统类型及内核版本 Docker依赖于Linux内核的一些特性&#xff0c;不同版本的Docker对内核版本有不同要求。例如&#xff0c;Docker 17.06及之后的版本通常需要Linux内核3.10及以上版本&#xff0c;Docker17.09及更高版本对应Linux内核4.9.x及更高版本。…...

Spark 之 入门讲解详细版(1)

1、简介 1.1 Spark简介 Spark是加州大学伯克利分校AMP实验室&#xff08;Algorithms, Machines, and People Lab&#xff09;开发通用内存并行计算框架。Spark在2013年6月进入Apache成为孵化项目&#xff0c;8个月后成为Apache顶级项目&#xff0c;速度之快足见过人之处&…...

SciencePlots——绘制论文中的图片

文章目录 安装一、风格二、1 资源 安装 # 安装最新版 pip install githttps://github.com/garrettj403/SciencePlots.git# 安装稳定版 pip install SciencePlots一、风格 简单好用的深度学习论文绘图专用工具包–Science Plot 二、 1 资源 论文绘图神器来了&#xff1a;一行…...

SCAU期末笔记 - 数据分析与数据挖掘题库解析

这门怎么题库答案不全啊日 来简单学一下子来 一、选择题&#xff08;可多选&#xff09; 将原始数据进行集成、变换、维度规约、数值规约是在以下哪个步骤的任务?(C) A. 频繁模式挖掘 B.分类和预测 C.数据预处理 D.数据流挖掘 A. 频繁模式挖掘&#xff1a;专注于发现数据中…...

条件运算符

C中的三目运算符&#xff08;也称条件运算符&#xff0c;英文&#xff1a;ternary operator&#xff09;是一种简洁的条件选择语句&#xff0c;语法如下&#xff1a; 条件表达式 ? 表达式1 : 表达式2• 如果“条件表达式”为true&#xff0c;则整个表达式的结果为“表达式1”…...

dify打造数据可视化图表

一、概述 在日常工作和学习中&#xff0c;我们经常需要和数据打交道。无论是分析报告、项目展示&#xff0c;还是简单的数据洞察&#xff0c;一个清晰直观的图表&#xff0c;往往能胜过千言万语。 一款能让数据可视化变得超级简单的 MCP Server&#xff0c;由蚂蚁集团 AntV 团队…...

Java编程之桥接模式

定义 桥接模式&#xff08;Bridge Pattern&#xff09;属于结构型设计模式&#xff0c;它的核心意图是将抽象部分与实现部分分离&#xff0c;使它们可以独立地变化。这种模式通过组合关系来替代继承关系&#xff0c;从而降低了抽象和实现这两个可变维度之间的耦合度。 用例子…...

使用LangGraph和LangSmith构建多智能体人工智能系统

现在&#xff0c;通过组合几个较小的子智能体来创建一个强大的人工智能智能体正成为一种趋势。但这也带来了一些挑战&#xff0c;比如减少幻觉、管理对话流程、在测试期间留意智能体的工作方式、允许人工介入以及评估其性能。你需要进行大量的反复试验。 在这篇博客〔原作者&a…...

C++课设:简易日历程序(支持传统节假日 + 二十四节气 + 个人纪念日管理)

名人说:路漫漫其修远兮,吾将上下而求索。—— 屈原《离骚》 创作者:Code_流苏(CSDN)(一个喜欢古诗词和编程的Coder😊) 专栏介绍:《编程项目实战》 目录 一、为什么要开发一个日历程序?1. 深入理解时间算法2. 练习面向对象设计3. 学习数据结构应用二、核心算法深度解析…...

Web中间件--tomcat学习

Web中间件–tomcat Java虚拟机详解 什么是JAVA虚拟机 Java虚拟机是一个抽象的计算机&#xff0c;它可以执行Java字节码。Java虚拟机是Java平台的一部分&#xff0c;Java平台由Java语言、Java API和Java虚拟机组成。Java虚拟机的主要作用是将Java字节码转换为机器代码&#x…...