Flink多流处理之connect拼接流
Flink中的拼接流connect
的使用其实非常简单,就是leftStream.connect(rightStream)
的方式,但是有一点我们需要清楚,使用connect
后并不是将两个流给串联起来了,而是将左流和右流建立一个联系,作为一个大的流,并且这个大的流可以使用相同的逻辑处理leftStream
和rightStream
,也可以使用不同的逻辑处理leftStream
和rightStream
.
如下图:
下面的演示代码也可以通过这个图结合来看,其实connect
算子最主要的作用就是共享状态,如常用的广播状态
.
- 代码
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;import java.util.Arrays;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/7* @Description: 多流操作-流连接**/
public class FlinkConnect {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度env.setParallelism(3);// 添加数据源1DataStreamSource<String> sourceStream1 = env.fromCollection(Arrays.asList("a", "b", "c", "d"));// 添加数据源2DataStreamSource<Double> sourceStream2 = env.fromCollection(Arrays.asList(22.2, 11.0, 6.0, 98.0, 100.0));// 拼接数据流ConnectedStreams<String, Double> connectedStream = sourceStream1.connect(sourceStream2);// 这里使用map算子作为演示SingleOutputStreamOperator<String> resultStream = connectedStream.map(new CoMapFunction<String, Double, String>() {/*** map1作为左流**/@Overridepublic String map1(String value) throws Exception {return "字符串: " + value;}/*** map2作为右流**/@Overridepublic String map2(Double value) throws Exception {return "数字: " + (value * 100);}});// 打印结果resultStream.print();env.execute("Connect Operator");}
}
- 结果
3> 字符串: b
1> 数字: 600.0
2> 字符串: a
3> 数字: 1100.0
2> 数字: 2220.0
2> 字符串: d
2> 数字: 9800.0
3> 数字: 10000.0
1> 字符串: c
相关文章:

Flink多流处理之connect拼接流
Flink中的拼接流connect的使用其实非常简单,就是leftStream.connect(rightStream)的方式,但是有一点我们需要清楚,使用connect后并不是将两个流给串联起来了,而是将左流和右流建立一个联系,作为一个大的流,并且这个大的流可以使用相同的逻辑处理leftStream和rightStream,也可以…...

对任意类型数都可以排序的函数:qsort函数
之前我们学习过冒泡排序: int main() {int arr[] { 9,7,8,6,5,4,3,2,1,0 };int sz sizeof(arr)/sizeof(arr[0]);int i 0;for (i 0; i < sz-1; i) {int j 0;for (j 0; j < sz-1-i; j) {if (arr[j] > arr[j 1]){int temp 0;temp arr[j];arr[j] ar…...
vue数据更新table内容不更新解决方法
场景: table组件绑定的数据变化时,页面没有重新渲染,常见于子组件中使用table组件 原理: 创建实例时 数组在vue中没有被监听到,属于非响应式数据,数组的下标变化无法监听到 解决方式: <e…...
合宙Air724UG LuatOS-Air script lib API--record
record Table of Contents record record.start(seconds, cbFnc, type, quality, rcdType, format, streamRptLen) record.stop(cbFnc) record.getFilePath() record.getData(offset, len) record.getSize() record.delete() record.exists() record.isBusy() record 模块功能&…...

基于Vgg16和Vgg19深度学习网络的步态识别系统matlab仿真
目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版本 MATLAB2022A 3.部分核心程序 ................................................................ % 设置训练选项options …...

Java分布式微服务3——Docker
文章目录 Docker介绍安装DockerDocker基础操作Docker服务的启动镜像命令容器命令1. 从docker hub去查看Nginx容器的运行命令2. 查看所有容器状态3. 查看容器日志4. 进入Nginx容器执行命令,修改Html内容,添加“Hello World”5. 停止与开始容器6. 删除容器…...
js字符串替换
在JavaScript中,字符串替换 有多种方法,下面介绍其中一些比较常用的方法。 使用replace()方法、 replace()方法用于在字符串中查…...

网络防御(2)
1. 什么是防火墙? 2. 状态防火墙工作原理? 3. 防火墙如何处理双通道协议? 一、什么是防火墙? 防火墙是一种网络安全设备或软件,用于保护计算机网络免受未经授权的访问,并管理网络流量。它作为一个安全边界…...

[RCTF2019]DontEatMe
前言 一道迷宫题,但是输入被加密后使用,迷宫也需要在程序中找出并没有直接输出 分析 反调试 发现有两个比较特殊的地方,随机数和创建了新线程,随机数后面又被重新赋值给覆盖了,暂时不用管,ZwSetInformat…...

6. CSS(三)
目录 一、盒子模型 (一)网页布局的本质 (二)盒子模型组成 (三)边框(border) (四)表格的细线边框 (五)内边距(padding…...

计算机网络—HTTP
这里写目录标题 HTTP是什么HTTP常见状态码HTTP常见字段GET与POST的区别Get和Post是安全和幂等吗PUT幂等,不安全DELETE幂等,不是安全 HTTP缓存技术HTTP缓存实现技术 HTTP1.0优缺点和性能HTTP1.1优缺点和性能HTTP2优缺点和性能HTTP3优缺点和性能HTTP和HTTP…...

Tomcat线程池原理
1. 一个 SpringBoot 项目能同时处理多少请求?tomcat容器, 200 次。 2. 怎么来的? 而点击这些线程,查看其堆栈消息,可以看到 Tomcat、threads、ThreadPoolExecutor 等关键字 基于“短时间内有 200 个请求被立马处理…...

踩坑 视觉SLAM 十四讲第二版 ch13 编译及运行问题
一、安装Geset 库 sudo apt-get install libgtest-dev cd /usr/src/gtest sudo mkdir build cd build sudo cmake .. //一定要以sudo的方式运行,否则没有写入权限 sudo make //这个也一样要以sudo的方式 sudo cp libgtest*.a /usr/local/lib //将生成…...
【设计模式】-装饰器模式
Java 设计模式之装饰器模式 前言 在软件开发中,经常有需求对已有的对象进行功能的扩展,但是传统的继承方式会导致类的数量快速增多,且难以维护。为了解决这个问题,装饰器模式应运而生。 装饰器模式是一种结构型设计模式ÿ…...

七月学习总结
一晃暑期七月份已经结束了,八月份需要做的事情更多。 在成长的路上不断地迷茫,不断地前进。到底才能完成对自己地救赎。 目前想的就是以后走软件开发,往架构方向做,主语言Java或者go,408基础一定要扎实,计…...
Camunda 7.x 系列【6】Spring Boot 集成 Camunda 7.19
有道无术,术尚可求,有术无道,止于术。 本系列Spring Boot 版本 2.7.9 本系列Camunda 版本 7.19.0 源码地址:https://gitee.com/pearl-organization/camunda-study-demo 文章目录 1. 前言2. Camunda Platform Run3. Spring Boot 版本兼容性4. 集成 Spring Boot5. 启动项目…...
Kubernetes —调度器配置
目录 配置文件 扩展点 调度插件 多配置文件 应用于多个扩展点的插件 调度程序配置迁移 你可以通过编写配置文件,并将其路径传给 kube-scheduler 的命令行参数,定制 kube-scheduler 的行为。 调度模板(Profile)允许你配置 k…...
【微信小程序】申请蓝牙、位置和数据库等相关权限
在小程序的app.json文件中配置requiredPermissions字段,并在其中添加相应的权限。 以下是一个示例app.json文件的配置,包括了蓝牙、位置和数据库等权限的申请: {"pages": ["pages/index/index"],"requiredPermiss…...

ORB-SLAM2学习笔记6之D435i双目IR相机运行ROS版ORB-SLAM2并发布位姿pose的rostopic
文章目录 0 引言1 D435i相机配置2 新增发布双目位姿功能2.1 新增d435i_stereo.cc代码2.2 修改CMakeLists.txt2.3 新增配置文件D435i.yaml 3 编译运行和结果3.1 编译运行3.2 结果3.3 可能出现的问题 0 引言 ORB-SLAM2学习笔记1已成功编译安装ROS版本ORB-SLAM2到本地,…...

【数据结构与算法——TypeScript】哈希表
【数据结构与算法——TypeScript】 哈希表(HashTable) 哈希表介绍和特性 哈希表是一种非常重要的数据结构,但是很多学习编程的人一直搞不懂哈希表到底是如何实现的。 在这一章节中,我门就一点点来实现一个自己的哈希表。通过实现来理解哈希表背后的原理…...
IGP(Interior Gateway Protocol,内部网关协议)
IGP(Interior Gateway Protocol,内部网关协议) 是一种用于在一个自治系统(AS)内部传递路由信息的路由协议,主要用于在一个组织或机构的内部网络中决定数据包的最佳路径。与用于自治系统之间通信的 EGP&…...
postgresql|数据库|只读用户的创建和删除(备忘)
CREATE USER read_only WITH PASSWORD 密码 -- 连接到xxx数据库 \c xxx -- 授予对xxx数据库的只读权限 GRANT CONNECT ON DATABASE xxx TO read_only; GRANT USAGE ON SCHEMA public TO read_only; GRANT SELECT ON ALL TABLES IN SCHEMA public TO read_only; GRANT EXECUTE O…...
Qwen3-Embedding-0.6B深度解析:多语言语义检索的轻量级利器
第一章 引言:语义表示的新时代挑战与Qwen3的破局之路 1.1 文本嵌入的核心价值与技术演进 在人工智能领域,文本嵌入技术如同连接自然语言与机器理解的“神经突触”——它将人类语言转化为计算机可计算的语义向量,支撑着搜索引擎、推荐系统、…...
【RockeMQ】第2节|RocketMQ快速实战以及核⼼概念详解(二)
升级Dledger高可用集群 一、主从架构的不足与Dledger的定位 主从架构缺陷 数据备份依赖Slave节点,但无自动故障转移能力,Master宕机后需人工切换,期间消息可能无法读取。Slave仅存储数据,无法主动升级为Master响应请求ÿ…...
AI编程--插件对比分析:CodeRider、GitHub Copilot及其他
AI编程插件对比分析:CodeRider、GitHub Copilot及其他 随着人工智能技术的快速发展,AI编程插件已成为提升开发者生产力的重要工具。CodeRider和GitHub Copilot作为市场上的领先者,分别以其独特的特性和生态系统吸引了大量开发者。本文将从功…...

Maven 概述、安装、配置、仓库、私服详解
目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...

Mysql中select查询语句的执行过程
目录 1、介绍 1.1、组件介绍 1.2、Sql执行顺序 2、执行流程 2.1. 连接与认证 2.2. 查询缓存 2.3. 语法解析(Parser) 2.4、执行sql 1. 预处理(Preprocessor) 2. 查询优化器(Optimizer) 3. 执行器…...
SQL慢可能是触发了ring buffer
简介 最近在进行 postgresql 性能排查的时候,发现 PG 在某一个时间并行执行的 SQL 变得特别慢。最后通过监控监观察到并行发起得时间 buffers_alloc 就急速上升,且低水位伴随在整个慢 SQL,一直是 buferIO 的等待事件,此时也没有其他会话的争抢。SQL 虽然不是高效 SQL ,但…...

【Redis】笔记|第8节|大厂高并发缓存架构实战与优化
缓存架构 代码结构 代码详情 功能点: 多级缓存,先查本地缓存,再查Redis,最后才查数据库热点数据重建逻辑使用分布式锁,二次查询更新缓存采用读写锁提升性能采用Redis的发布订阅机制通知所有实例更新本地缓存适用读多…...

vulnyx Blogger writeup
信息收集 arp-scan nmap 获取userFlag 上web看看 一个默认的页面,gobuster扫一下目录 可以看到扫出的目录中得到了一个有价值的目录/wordpress,说明目标所使用的cms是wordpress,访问http://192.168.43.213/wordpress/然后查看源码能看到 这…...