当前位置: 首页 > news >正文

Flink Flink中的合流

一、Flink中的基本合流操作

在实际应用中,我们经常会遇到来源不同的多条流,需要将它们的数据进行联合处理。所以 Flink 中合流的操作会更加普遍,对应的 API 也更加丰富。

二、联合(Union)

最简单的合流操作,就是直接将多条流合在一起,叫作流的“联合”(union)。联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。
在这里插入图片描述
在代码中,我们只要基于 DataStream 直接调用.union()方法,传入其他 DataStream 作为参数,就可以实现流的联合了;得到的依然是一个 DataStream:

stream1.union(stream2, stream3, ...)

注意:union()的参数可以是多个 DataStream,所以联合操作可以实现多条流的合并。

代码实现:我们可以用下面的代码做一个简单测试:

package com.flink.DataStream.UnionStream;import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FlinkUnionStream {public static void main(String[] args) throws Exception {StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();streamExecutionEnvironment.setParallelism(1);SingleOutputStreamOperator<Integer> source1 = streamExecutionEnvironment.socketTextStream("localhost", 1111).map(a -> Integer.parseInt(a));SingleOutputStreamOperator<Integer> source2 = streamExecutionEnvironment.socketTextStream("localhost", 2222).map(a -> Integer.parseInt(a));DataStreamSource<String> source3 = streamExecutionEnvironment.fromElements("3", "4", "5");DataStream<Integer> unionResult = source1.union(source2, source3.map(Integer::valueOf));unionResult.print();streamExecutionEnvironment.execute();}
}

在这里插入图片描述
在这里插入图片描述

三、连接(Connect)

为了处理更加灵活,连接操作允许流的数据类型不同。但我们知道一个DataStream中的数据只能有唯一的类型,所以连接得到的结果并不是DataStream,而是一个“连接流”。连接流可以看成是两条流形式上的“统一”,被放在了一个同一个流中;事实上内部仍保持各自的数据形式不变,彼此之间是相互独立的。要想得到新的DataStream,还需要进一步定义一个“同处理”(co-process)转换操作,用来说明对于不同来源、不同类型的数据,怎样分别进行处理转换、得到统一的输出类型。所以整体上来,两条流的连接就像是“一国两制”,两条流可以保持各自的数据类型、处理方式也可以不同,不过最终还是会统一到同一个DataStream中。
在这里插入图片描述

相关文章:

Flink Flink中的合流

一、Flink中的基本合流操作 在实际应用中&#xff0c;我们经常会遇到来源不同的多条流&#xff0c;需要将它们的数据进行联合处理。所以 Flink 中合流的操作会更加普遍&#xff0c;对应的 API 也更加丰富。 二、联合&#xff08;Union&#xff09; 最简单的合流操作&#xf…...

工业园区重金属废水深度处理工程项目,稳定出水0.1mg/l

随着环保要求不断提高&#xff0c;工业废水处理已成为众多企业的必修课。然而在工业生产中&#xff0c;如何有效处理含有重金属的废水成为了一个关键的挑战。 重金属废水是指含有汞、铅、铜、镉、锌、镍等有毒有害物质的废水&#xff0c;来源于矿山开采、金属冶炼、电镀、印刷线…...

element table滚动条失效

问题描述:给el-table限制高度之后滚动条没了 给看看咋设置的&#xff1a; <el-table:data"tableData"style"width: 100%;"ref"table"max-height"400"sort-change"changeSort">对比了老半天找不出问题&#xff0c;最后…...

代码随想录算法训练营 ---第四十六天

第一题&#xff1a; 简介&#xff1a; 本题的重点在于确定背包容量和物品数量 确定dp数组以及下标的含义 dp[i] : 字符串长度为i的话&#xff0c;dp[i]为true&#xff0c;表示可以拆分为一个或多个在字典中出现的单词。 2.确定递推公式 如果确定dp[j] 是true&#xff0c;且…...

MySQL-02-InnoDB存储引擎

实际的业务系统开发中&#xff0c;使用MySQL数据库&#xff0c;我们使用最多的当然是支持事务并发的InnoDB存储引擎的这种表结构&#xff0c;下面我们介绍下InnoDB存储引擎相关的知识点。 1-Innodb体系架构 InnoDB存储引擎有多个内存块&#xff0c;可以认为这些内存块组成了一…...

Qt路径和Anaconda中QT路径冲突(ubuntu系统)

最近做一个项目需要配置QT库&#xff0c;本项目配置环境如下&#xff1a; Qt version 5 Operating system, version and so on ubuntu 20.04 Description 之前使用过anaconda环境安装过QT5&#xff0c;所以在项目中CMakeLists文件中使用find_package时候&#xff0c;默认使用An…...

vue2.js添加水印

通过canvas生成水印图片 function addWaterMark(str) {let ctx document.createElement("canvas");ctx.width 900;ctx.height 450;ctx.style.display "none";let cans ctx.getContext("2d");cans.rotate((-20 * Math.PI) / 180);cans.font…...

Eureka简单使用做微服务模块之间动态请求

创建一个eureka模块,引入eureka 为启动项加上EnableEurekaServer注解 配置信息 orderService和userService的操作是一样的 这里以orderService为例: 引入eureka客户端 加上 LoadBalanced注解 配置 orderService和userService都配置好了之后 启动 这样我们在http://localhos…...

竞赛选题 题目:基于深度学习卷积神经网络的花卉识别 - 深度学习 机器视觉

文章目录 0 前言1 项目背景2 花卉识别的基本原理3 算法实现3.1 预处理3.2 特征提取和选择3.3 分类器设计和决策3.4 卷积神经网络基本原理 4 算法实现4.1 花卉图像数据4.2 模块组成 5 项目执行结果6 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 基…...

css-tricks网站图例

使用css实现钟表 <template><div><p><small>CSS sin() and cos() does <strong>NOT</strong> work in your browser.</small></p><div class"clock"><div id"app" class"clock-face"…...

Scrapy框架内置管道之图片视频和文件(一篇文章齐全)

1、Scrapy框架初识&#xff08;点击前往查阅&#xff09; 2、Scrapy框架持久化存储&#xff08;点击前往查阅&#xff09; 3、Scrapy框架内置管道 4、Scrapy框架中间件&#xff08;点击前往查阅&#xff09; Scrapy 是一个开源的、基于Python的爬虫框架&#xff0c;它提供了…...

Linux文件与路径

Linux文件与路径 1、文件结构 ​ Windows和Linux文件系统区别 ​ 在windows平台下&#xff0c;打开“此电脑”&#xff0c;我们可以看到盘符分区 ​ 每个驱动器都有自己的根目录结构&#xff0c;这样形成了多个树并列的情形 ​ 但是在 Linux 下&#xff0c;我们是看不到这些…...

【Qt】获取当前系统用户名:9种获取方式

目的 有时&#xff0c;在项目开发中&#xff0c;需要显示或者用到当前系统用户名信息。以下是几种获取系统用户名解决方案&#xff1a; 解决方案 1. 使用QDir::home() #include <QApplication> #include <QDir> #include <QDebug>int main(int argc, cha…...

ECMAScript2023你学习了吗?

一、ES2023 Features 【Array find from last】 从头到尾搜索数组&#xff1a;findLast() 、findLastIndex()【Hashbang Grammar】Hashbang 语法【Symbols as WeakMap keys】Symbol 作为 WeakMap 的键【Change array by copy】通过副本更改数组&#xff1a;toReversed()、toSo…...

【从删库到跑路 | MySQL总结篇】数据库基础(增删改查的基本操作)

个人主页&#xff1a;兜里有颗棉花糖 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 兜里有颗棉花糖 原创 收录于专栏【MySQL学习专栏】&#x1f388; 本专栏旨在分享学习MySQL的一点学习心得&#xff0c;欢迎大家在评论区讨论&#x1f48c; 重点放前面&am…...

【JMeter】配置元件

1. 元件的分类 HTTP Request Default 作用&#xff1a; 可以配置成通用的信息&#xff0c;可复用 ​​​​​​​ JDBC Connection Configuration 作用&#xff1a;连接数据库 前提&#xff1a; 下载好对应数据类型的jar包 ​​​​​​​ HTTP Header Manager信息头管理…...

数据采集静态存储SRAM芯片EMI7064

数据采集是利用一种装置&#xff0c;从系统外部采集数据并输入到系统内部的一个接口。数据采集技术广泛应用在各个领域。比如摄像头&#xff0c;麦克风&#xff0c;都是数据采集工具。 ram工作时可以随时从任何一个指定的地址写入(存入&#xff09;或读出(取出)信息。RAM在计算…...

网络运维与网络安全 学习笔记2023.11.27

网络运维与网络安全 学习笔记 第二十八天 今日目标 OSPF基本原理、OSPF单区域配置、OSPF多区域配置 特殊区域之Stub、特殊区域之NSSA OSPF基本原理 项目背景 随着企业的发展&#xff0c;网络的规模越来越大&#xff0c;网段的数量越来越多&#xff0c;公司内部的路由器的…...

ansible学习

一文掌握 Ansible 自动化运维 - 知乎 ansible的安装与简单的使用_坚持到所有人都放弃!!!的技术博客_51CTO博客 Ansible中文权威指南 — 国内最专业的Ansible中文官方学习手册 (ansible-tran.readthedocs.io) 安装 # yum -y install epel-release //更新本地安装库 # yu…...

使用Kibana让es集群形象起来

部署Elasticsearch集群详细步骤参考本人&#xff1a; https://blog.csdn.net/m0_59933574/article/details/134605073?spm1001.2014.3001.5502https://blog.csdn.net/m0_59933574/article/details/134605073?spm1001.2014.3001.5502 kibana部署 es集群设备 安装软件主机名…...

(LeetCode 每日一题) 3442. 奇偶频次间的最大差值 I (哈希、字符串)

题目&#xff1a;3442. 奇偶频次间的最大差值 I 思路 &#xff1a;哈希&#xff0c;时间复杂度0(n)。 用哈希表来记录每个字符串中字符的分布情况&#xff0c;哈希表这里用数组即可实现。 C版本&#xff1a; class Solution { public:int maxDifference(string s) {int a[26]…...

Docker 运行 Kafka 带 SASL 认证教程

Docker 运行 Kafka 带 SASL 认证教程 Docker 运行 Kafka 带 SASL 认证教程一、说明二、环境准备三、编写 Docker Compose 和 jaas文件docker-compose.yml代码说明&#xff1a;server_jaas.conf 四、启动服务五、验证服务六、连接kafka服务七、总结 Docker 运行 Kafka 带 SASL 认…...

屋顶变身“发电站” ,中天合创屋面分布式光伏发电项目顺利并网!

5月28日&#xff0c;中天合创屋面分布式光伏发电项目顺利并网发电&#xff0c;该项目位于内蒙古自治区鄂尔多斯市乌审旗&#xff0c;项目利用中天合创聚乙烯、聚丙烯仓库屋面作为场地建设光伏电站&#xff0c;总装机容量为9.96MWp。 项目投运后&#xff0c;每年可节约标煤3670…...

聊一聊接口测试的意义有哪些?

目录 一、隔离性 & 早期测试 二、保障系统集成质量 三、验证业务逻辑的核心层 四、提升测试效率与覆盖度 五、系统稳定性的守护者 六、驱动团队协作与契约管理 七、性能与扩展性的前置评估 八、持续交付的核心支撑 接口测试的意义可以从四个维度展开&#xff0c;首…...

ArcGIS Pro制作水平横向图例+多级标注

今天介绍下载ArcGIS Pro中如何设置水平横向图例。 之前我们介绍了ArcGIS的横向图例制作&#xff1a;ArcGIS横向、多列图例、顺序重排、符号居中、批量更改图例符号等等&#xff08;ArcGIS出图图例8大技巧&#xff09;&#xff0c;那这次我们看看ArcGIS Pro如何更加快捷的操作。…...

C# 求圆面积的程序(Program to find area of a circle)

给定半径r&#xff0c;求圆的面积。圆的面积应精确到小数点后5位。 例子&#xff1a; 输入&#xff1a;r 5 输出&#xff1a;78.53982 解释&#xff1a;由于面积 PI * r * r 3.14159265358979323846 * 5 * 5 78.53982&#xff0c;因为我们只保留小数点后 5 位数字。 输…...

Reasoning over Uncertain Text by Generative Large Language Models

https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829 1. 概述 文本中的不确定性在许多语境中传达,从日常对话到特定领域的文档(例如医学文档)(Heritage 2013;Landmark、Gulbrandsen 和 Svenevei…...

日常一水C

多态 言简意赅&#xff1a;就是一个对象面对同一事件时做出的不同反应 而之前的继承中说过&#xff0c;当子类和父类的函数名相同时&#xff0c;会隐藏父类的同名函数转而调用子类的同名函数&#xff0c;如果要调用父类的同名函数&#xff0c;那么就需要对父类进行引用&#…...

在 Spring Boot 项目里,MYSQL中json类型字段使用

前言&#xff1a; 因为程序特殊需求导致&#xff0c;需要mysql数据库存储json类型数据&#xff0c;因此记录一下使用流程 1.java实体中新增字段 private List<User> users 2.增加mybatis-plus注解 TableField(typeHandler FastjsonTypeHandler.class) private Lis…...

如何在Windows本机安装Python并确保与Python.NET兼容

✅作者简介&#xff1a;2022年博客新星 第八。热爱国学的Java后端开发者&#xff0c;修心和技术同步精进。 &#x1f34e;个人主页&#xff1a;Java Fans的博客 &#x1f34a;个人信条&#xff1a;不迁怒&#xff0c;不贰过。小知识&#xff0c;大智慧。 &#x1f49e;当前专栏…...