Flink Flink中的合流
一、Flink中的基本合流操作
在实际应用中,我们经常会遇到来源不同的多条流,需要将它们的数据进行联合处理。所以 Flink 中合流的操作会更加普遍,对应的 API 也更加丰富。
二、联合(Union)
最简单的合流操作,就是直接将多条流合在一起,叫作流的“联合”(union)。联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。

在代码中,我们只要基于 DataStream 直接调用.union()方法,传入其他 DataStream 作为参数,就可以实现流的联合了;得到的依然是一个 DataStream:
stream1.union(stream2, stream3, ...)
注意:union()的参数可以是多个 DataStream,所以联合操作可以实现多条流的合并。
代码实现:我们可以用下面的代码做一个简单测试:
package com.flink.DataStream.UnionStream;import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FlinkUnionStream {public static void main(String[] args) throws Exception {StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();streamExecutionEnvironment.setParallelism(1);SingleOutputStreamOperator<Integer> source1 = streamExecutionEnvironment.socketTextStream("localhost", 1111).map(a -> Integer.parseInt(a));SingleOutputStreamOperator<Integer> source2 = streamExecutionEnvironment.socketTextStream("localhost", 2222).map(a -> Integer.parseInt(a));DataStreamSource<String> source3 = streamExecutionEnvironment.fromElements("3", "4", "5");DataStream<Integer> unionResult = source1.union(source2, source3.map(Integer::valueOf));unionResult.print();streamExecutionEnvironment.execute();}
}


三、连接(Connect)
为了处理更加灵活,连接操作允许流的数据类型不同。但我们知道一个DataStream中的数据只能有唯一的类型,所以连接得到的结果并不是DataStream,而是一个“连接流”。连接流可以看成是两条流形式上的“统一”,被放在了一个同一个流中;事实上内部仍保持各自的数据形式不变,彼此之间是相互独立的。要想得到新的DataStream,还需要进一步定义一个“同处理”(co-process)转换操作,用来说明对于不同来源、不同类型的数据,怎样分别进行处理转换、得到统一的输出类型。所以整体上来,两条流的连接就像是“一国两制”,两条流可以保持各自的数据类型、处理方式也可以不同,不过最终还是会统一到同一个DataStream中。

相关文章:
Flink Flink中的合流
一、Flink中的基本合流操作 在实际应用中,我们经常会遇到来源不同的多条流,需要将它们的数据进行联合处理。所以 Flink 中合流的操作会更加普遍,对应的 API 也更加丰富。 二、联合(Union) 最简单的合流操作…...
工业园区重金属废水深度处理工程项目,稳定出水0.1mg/l
随着环保要求不断提高,工业废水处理已成为众多企业的必修课。然而在工业生产中,如何有效处理含有重金属的废水成为了一个关键的挑战。 重金属废水是指含有汞、铅、铜、镉、锌、镍等有毒有害物质的废水,来源于矿山开采、金属冶炼、电镀、印刷线…...
element table滚动条失效
问题描述:给el-table限制高度之后滚动条没了 给看看咋设置的: <el-table:data"tableData"style"width: 100%;"ref"table"max-height"400"sort-change"changeSort">对比了老半天找不出问题,最后…...
代码随想录算法训练营 ---第四十六天
第一题: 简介: 本题的重点在于确定背包容量和物品数量 确定dp数组以及下标的含义 dp[i] : 字符串长度为i的话,dp[i]为true,表示可以拆分为一个或多个在字典中出现的单词。 2.确定递推公式 如果确定dp[j] 是true,且…...
MySQL-02-InnoDB存储引擎
实际的业务系统开发中,使用MySQL数据库,我们使用最多的当然是支持事务并发的InnoDB存储引擎的这种表结构,下面我们介绍下InnoDB存储引擎相关的知识点。 1-Innodb体系架构 InnoDB存储引擎有多个内存块,可以认为这些内存块组成了一…...
Qt路径和Anaconda中QT路径冲突(ubuntu系统)
最近做一个项目需要配置QT库,本项目配置环境如下: Qt version 5 Operating system, version and so on ubuntu 20.04 Description 之前使用过anaconda环境安装过QT5,所以在项目中CMakeLists文件中使用find_package时候,默认使用An…...
vue2.js添加水印
通过canvas生成水印图片 function addWaterMark(str) {let ctx document.createElement("canvas");ctx.width 900;ctx.height 450;ctx.style.display "none";let cans ctx.getContext("2d");cans.rotate((-20 * Math.PI) / 180);cans.font…...
Eureka简单使用做微服务模块之间动态请求
创建一个eureka模块,引入eureka 为启动项加上EnableEurekaServer注解 配置信息 orderService和userService的操作是一样的 这里以orderService为例: 引入eureka客户端 加上 LoadBalanced注解 配置 orderService和userService都配置好了之后 启动 这样我们在http://localhos…...
竞赛选题 题目:基于深度学习卷积神经网络的花卉识别 - 深度学习 机器视觉
文章目录 0 前言1 项目背景2 花卉识别的基本原理3 算法实现3.1 预处理3.2 特征提取和选择3.3 分类器设计和决策3.4 卷积神经网络基本原理 4 算法实现4.1 花卉图像数据4.2 模块组成 5 项目执行结果6 最后 0 前言 🔥 优质竞赛项目系列,今天要分享的是 基…...
css-tricks网站图例
使用css实现钟表 <template><div><p><small>CSS sin() and cos() does <strong>NOT</strong> work in your browser.</small></p><div class"clock"><div id"app" class"clock-face"…...
Scrapy框架内置管道之图片视频和文件(一篇文章齐全)
1、Scrapy框架初识(点击前往查阅) 2、Scrapy框架持久化存储(点击前往查阅) 3、Scrapy框架内置管道 4、Scrapy框架中间件(点击前往查阅) Scrapy 是一个开源的、基于Python的爬虫框架,它提供了…...
Linux文件与路径
Linux文件与路径 1、文件结构 Windows和Linux文件系统区别 在windows平台下,打开“此电脑”,我们可以看到盘符分区 每个驱动器都有自己的根目录结构,这样形成了多个树并列的情形 但是在 Linux 下,我们是看不到这些…...
【Qt】获取当前系统用户名:9种获取方式
目的 有时,在项目开发中,需要显示或者用到当前系统用户名信息。以下是几种获取系统用户名解决方案: 解决方案 1. 使用QDir::home() #include <QApplication> #include <QDir> #include <QDebug>int main(int argc, cha…...
ECMAScript2023你学习了吗?
一、ES2023 Features 【Array find from last】 从头到尾搜索数组:findLast() 、findLastIndex()【Hashbang Grammar】Hashbang 语法【Symbols as WeakMap keys】Symbol 作为 WeakMap 的键【Change array by copy】通过副本更改数组:toReversed()、toSo…...
【从删库到跑路 | MySQL总结篇】数据库基础(增删改查的基本操作)
个人主页:兜里有颗棉花糖 欢迎 点赞👍 收藏✨ 留言✉ 加关注💓本文由 兜里有颗棉花糖 原创 收录于专栏【MySQL学习专栏】🎈 本专栏旨在分享学习MySQL的一点学习心得,欢迎大家在评论区讨论💌 重点放前面&am…...
【JMeter】配置元件
1. 元件的分类 HTTP Request Default 作用: 可以配置成通用的信息,可复用 JDBC Connection Configuration 作用:连接数据库 前提: 下载好对应数据类型的jar包 HTTP Header Manager信息头管理…...
数据采集静态存储SRAM芯片EMI7064
数据采集是利用一种装置,从系统外部采集数据并输入到系统内部的一个接口。数据采集技术广泛应用在各个领域。比如摄像头,麦克风,都是数据采集工具。 ram工作时可以随时从任何一个指定的地址写入(存入)或读出(取出)信息。RAM在计算…...
网络运维与网络安全 学习笔记2023.11.27
网络运维与网络安全 学习笔记 第二十八天 今日目标 OSPF基本原理、OSPF单区域配置、OSPF多区域配置 特殊区域之Stub、特殊区域之NSSA OSPF基本原理 项目背景 随着企业的发展,网络的规模越来越大,网段的数量越来越多,公司内部的路由器的…...
ansible学习
一文掌握 Ansible 自动化运维 - 知乎 ansible的安装与简单的使用_坚持到所有人都放弃!!!的技术博客_51CTO博客 Ansible中文权威指南 — 国内最专业的Ansible中文官方学习手册 (ansible-tran.readthedocs.io) 安装 # yum -y install epel-release //更新本地安装库 # yu…...
使用Kibana让es集群形象起来
部署Elasticsearch集群详细步骤参考本人: https://blog.csdn.net/m0_59933574/article/details/134605073?spm1001.2014.3001.5502https://blog.csdn.net/m0_59933574/article/details/134605073?spm1001.2014.3001.5502 kibana部署 es集群设备 安装软件主机名…...
(LeetCode 每日一题) 3442. 奇偶频次间的最大差值 I (哈希、字符串)
题目:3442. 奇偶频次间的最大差值 I 思路 :哈希,时间复杂度0(n)。 用哈希表来记录每个字符串中字符的分布情况,哈希表这里用数组即可实现。 C版本: class Solution { public:int maxDifference(string s) {int a[26]…...
龙虎榜——20250610
上证指数放量收阴线,个股多数下跌,盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型,指数短线有调整的需求,大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的:御银股份、雄帝科技 驱动…...
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする 1、前言(1)情况说明(2)工程师的信仰2、知识点(1) にする1,接续:名词+にする2,接续:疑问词+にする3,(A)は(B)にする。(2)復習:(1)复习句子(2)ために & ように(3)そう(4)にする3、…...
PPT|230页| 制造集团企业供应链端到端的数字化解决方案:从需求到结算的全链路业务闭环构建
制造业采购供应链管理是企业运营的核心环节,供应链协同管理在供应链上下游企业之间建立紧密的合作关系,通过信息共享、资源整合、业务协同等方式,实现供应链的全面管理和优化,提高供应链的效率和透明度,降低供应链的成…...
关于nvm与node.js
1 安装nvm 安装过程中手动修改 nvm的安装路径, 以及修改 通过nvm安装node后正在使用的node的存放目录【这句话可能难以理解,但接着往下看你就了然了】 2 修改nvm中settings.txt文件配置 nvm安装成功后,通常在该文件中会出现以下配置&…...
【第二十一章 SDIO接口(SDIO)】
第二十一章 SDIO接口 目录 第二十一章 SDIO接口(SDIO) 1 SDIO 主要功能 2 SDIO 总线拓扑 3 SDIO 功能描述 3.1 SDIO 适配器 3.2 SDIOAHB 接口 4 卡功能描述 4.1 卡识别模式 4.2 卡复位 4.3 操作电压范围确认 4.4 卡识别过程 4.5 写数据块 4.6 读数据块 4.7 数据流…...
多模态大语言模型arxiv论文略读(108)
CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文标题:CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文作者:Sayna Ebrahimi, Sercan O. Arik, Tejas Nama, Tomas Pfister ➡️ 研究机构: Google Cloud AI Re…...
pikachu靶场通关笔记22-1 SQL注入05-1-insert注入(报错法)
目录 一、SQL注入 二、insert注入 三、报错型注入 四、updatexml函数 五、源码审计 六、insert渗透实战 1、渗透准备 2、获取数据库名database 3、获取表名table 4、获取列名column 5、获取字段 本系列为通过《pikachu靶场通关笔记》的SQL注入关卡(共10关࿰…...
学习STC51单片机32(芯片为STC89C52RCRC)OLED显示屏2
每日一言 今天的每一份坚持,都是在为未来积攒底气。 案例:OLED显示一个A 这边观察到一个点,怎么雪花了就是都是乱七八糟的占满了屏幕。。 解释 : 如果代码里信号切换太快(比如 SDA 刚变,SCL 立刻变&#…...
【数据分析】R版IntelliGenes用于生物标志物发现的可解释机器学习
禁止商业或二改转载,仅供自学使用,侵权必究,如需截取部分内容请后台联系作者! 文章目录 介绍流程步骤1. 输入数据2. 特征选择3. 模型训练4. I-Genes 评分计算5. 输出结果 IntelliGenesR 安装包1. 特征选择2. 模型训练和评估3. I-Genes 评分计…...
