当前位置: 首页 > news >正文

Flink多流处理之join(关联)

Flink的API中只提供了join的算子,并没有left join或者right join,这里我们就介绍一下join算子的使用,其实join算子底层调用的就是coGroup,具体原理这里就不过多介绍了,如果感兴趣可以看我前面发布的文章Flink多流操作之coGroup.

  • 数据源
    ➜  ~ nc -lk 1111
    101,A
    102,B
    103,C
    104,D
    105,E
    106,F
    
    ➜  ~ nc -lk 2222
    101,A,,程序员
    102,B,,程序员
    103,C,,会计
    104,D,,安全工程师
    106,K,,程序员
    108,,本科,人事
    
  • 代码
    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/10* @Description: 多流操作-join**/
    public class FlinkJoin {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度env.setParallelism(3);// 数据源1,以socket作为数据源DataStreamSource<String> socketStream1 = env.socketTextStream("localhost", 1111);SingleOutputStreamOperator<String[]> mapStream1 = socketStream1.map(str -> str.split(",")).returns(new TypeHint<String[]>() {});// 数据源2,以socket作为数据源DataStreamSource<String> socketStream2 = env.socketTextStream("localhost", 2222);SingleOutputStreamOperator<String[]> mapStream2 = socketStream2.map(str -> str.split(",")).returns(new TypeHint<String[]>() {});// 关联数据流DataStream<String> joinedStream = mapStream1.join(mapStream2).where(arr -> arr[0]) // mapStream1以数组中的第一个字段作为关联字段.equalTo(arr -> arr[0]) // mapStream2以数组中的第一个字段作为关联字段.window(TumblingProcessingTimeWindows.of(Time.seconds(20))) // 以20秒作为一个窗口.apply(new JoinFunction<String[], String[], String>() {// 这里是写关联后的具体逻辑@Overridepublic String join(String[] first, String[] second) throws Exception {String result = first[0] + "," + second[1] + "," + second[2] + "," + second[3];return result;}});// 打印结果数据joinedStream.print();env.execute("Flink join");}
    }
    
  • 结果
    3> 103,C,男,会计
    2> 106,K,男,程序员
    2> 101,A,男,程序员
    3> 104,D,男,安全工程师
    3> 102,B,男,程序员
    
    这个API使用起来还是比较简单的,如果想实现left join或者right join的功能就需要通过coGroup来实现了.

相关文章:

Flink多流处理之join(关联)

Flink的API中只提供了join的算子,并没有left join或者right join,这里我们就介绍一下join算子的使用,其实join算子底层调用的就是coGroup,具体原理这里就不过多介绍了,如果感兴趣可以看我前面发布的文章Flink多流操作之coGroup. 数据源➜ ~ nc -lk 1111 101,A 102,B 103,C 10…...

LeetCode Top100 Liked 题单(序号34~51)

​34. Find First and Last Position of Element in Sorted Array ​ 题意&#xff1a;找到非递减序列中目标的开头和结尾 我的思路 用二分法把每一个数字都找到&#xff0c;最后返回首尾两个数 代码 Runtime12 ms Beats 33.23% Memory14 MB Beats 5.16% class Solution {…...

视觉slam十四讲---第一弹三维空间刚体运动

1.旋转矩阵 1.1内积 1.2外积 1.3坐标系间的欧式变换 相机运动是一个刚体运动&#xff0c;它保证了同一个向量在各个坐标系下的长度和夹角都不会 发生变化。这种变换称为欧氏变换。 旋转矩阵&#xff1a;它是一个行列式为 1 的正交矩阵。 旋转矩阵为正交阵&#xff0c;它的逆…...

手把手教你配置Jenkins自动化邮件通知

完成基于Jenkins的持续集成部署后&#xff0c;自动化测试执行后&#xff0c;测试结果需要通知到相关人员&#xff0c;除了钉钉通知外我们还可以通过Email通知到对应负责人&#xff0c;这里记录一下测试结果通过Jenkins邮件通知的配置与部署 01、安装插件 方法1&#xff1a; 进…...

Arcgis连续数据的分类(求不同值域的面积)

问题描述&#xff1a;如果得到的一个连续的影响数值数据&#xff0c;但是我们想求取某一段值域的面积占比&#xff0c;需要进行以下操作&#xff1a; 1.按照数值重分类&#xff0c;将某段数值变成一个类别 2.栅格转矢量&#xff0c;再求取面积...

C++ 函数

函数是一组一起执行一个任务的语句。每个 C 程序都至少有一个函数&#xff0c;即主函数 main() &#xff0c;所有简单的程序都可以定义其他额外的函数。 您可以把代码划分到不同的函数中。如何划分代码到不同的函数中是由您来决定的&#xff0c;但在逻辑上&#xff0c;划分通常…...

关于如何创建一个windows窗口的exe文件

如何创建一个windows窗口exe文件&#xff0c;具体参照这个博主&#xff1a; http://t.csdn.cn/pfQK5 以下是实现代码&#xff0c;注意用vs打开&#xff1a; #pragma comment( linker, "/subsystem:\"windows\" /entry:\"WinMainCRTStartup\"" …...

re学习(33)攻防世界-secret-galaxy-300(动态调试)

下载压缩包&#xff1a; 下载链接&#xff1a;https://adworld.xctf.org.cn/challenges/list 参考文章&#xff1a;攻防世界逆向高手题之secret-galaxy-300_沐一 林的博客-CSDN博客 发现这只是三个同一类型文件的三个不同版本而已&#xff0c;一个windows32位exe&#xff0…...

springboot工程集成前端编译包,用于uni-app webView工程,解决其需独立部署带来的麻烦,场景如页面->画布->图片->pdf

前端工程 访问方式 http://127.0.0.1:8080/context/frontEnd/index放行 public class SecurityConfig extends WebSecurityConfigurerAdapter { "/frontEnd/**",SysFrontEndController import lombok.extern.slf4j.Slf4j; import nl.basjes.shaded.org.springfram…...

NeuralNLP-NeuralClassifier的使用记录(二),训练预测自己的【中文文本多分类】

NeuralNLP-NeuralClassifier的使用记录&#xff0c;训练预测自己的【中文文本多分类】 数据准备&#xff1a; ​ 与英文的训练预测一致&#xff0c;都使用相同的数据格式&#xff0c;将数据通过代码处理为JSON格式&#xff0c;以下是我使用的一种&#xff0c;不同的原数据情况…...

express学习笔记8 - 文件上传 下载以及预览

一、上传 1、 安装multer (任意选其中一种) yarn add multer --S npm install multer --S 2、新建配置文件(utils/multerConfig) const multer require(multer); const mkdirp require(mkdirp); // const sd require(silly-datetime); const path require(path);con…...

Python系统学习1-9-类(一)

一、类之初印象 1、类就是空表格&#xff0c;将变量&#xff08;列名&#xff09;和函数&#xff08;行为&#xff09;结合起来 2、创建对象&#xff0c;表达具体行 3、创建类就是创建数据的模板 --操作数据时有提示 --还能再组合数据的行为 --结构更加清晰 4、类的内存分配…...

什么是公网、私网、内网、外网?

中午好&#xff0c;我的网工朋友。 最近经常有很多小白朋友在问&#xff0c;公网、私网、内网、外网&#xff0c;这些的概念是啥样的&#xff0c;又该怎么去界定。 关于IP地址&#xff0c;确实没有太明确的区分&#xff0c;其实也不必太过咬文嚼字。 内网、外网就是一个参考…...

一篇文章教会你搭建私人kindle图书馆,并内网穿透实现公网访问

搭建私人kindle图书馆&#xff0c;并内网穿透实现公网访问 在电子书风靡的时期&#xff0c;大部分人都购买了一本电子书&#xff0c;虽然这本电子书更多的时候是被搁置在储物架上吃灰&#xff0c;或者成为盖泡面的神器&#xff0c;但当亚马逊发布消息将放弃电子书在中国的服务…...

好用的安卓手机投屏到mac分享

工具推荐&#xff1a;scrcpy github地址&#xff1a;https://github.com/Genymobile/scrcpy/tree/master mac使用方式 安装环境&#xff0c;打开terminal&#xff0c;执行以下命令&#xff0c;没有brew的先安装brew brew install scrcpy brew install android-platform-too…...

df -h

df -h 命令用于查看磁盘占用的空间 Filesystem&#xff1a;表示该文件系统位于哪个分区&#xff0c;因此该列显示的是设备名称&#xff1b; Used&#xff1a;表示用掉的磁盘空间大小&#xff1b; Available&#xff1a;表示剩余的磁盘空间大小&#xff1b; Use%&#xff1a;磁盘…...

彻底卸载Android Studio

永恒的爱是永远恪守最初的诺言。 在安装Android Studio会有很多问题导致无法正常运行&#xff0c;多次下载AS多次错误后了解到&#xff0c;删除以下四个文件才能彻底卸载Android Studio。 第一个文件&#xff1a;.gradle 路径&#xff1a;C:\Users\yao&#xff08;这里yao是本…...

QT 5.12配置OpenCV3.4.10

主要过程&#xff1a;使用cmake编译源码&#xff0c;生成Mingw64位 下的OpenCV库 三篇博客解决问题&#xff1a; 1.Windows下安装Qt并使用cmake配置opencv3.4.10(含错误记录及解决办法)_d:\qt\qt5.14.2\5.14.2\mingw73_64\include\qtcore\qg_会飞的DA象的博客-CSDN博客 2.【…...

Qt应用开发(基础篇)——选项卡窗口 QTabWidget

一、前言 QTabWidget类继承于QWidget&#xff0c;是一个拥有选项卡的窗口部件。 QTabWidget类有一个选项卡栏QTabBar和一个页面区域&#xff0c;用来显示和选项卡相关联的界面。用户通过点击选项卡或者自定义快捷方式(ALTKey)切换页面。 二、QTabWidget类 1、count 该属…...

Socks5代理在多线程爬虫中的应用

在进行爬虫开发过程中&#xff0c;我们常常需要处理大量的数据&#xff0c;并执行多任务并发操作。然而&#xff0c;频繁的请求可能会引起目标网站的反爬机制&#xff0c;导致IP封禁或限制访问。为了规避这些限制&#xff0c;我们可以借助Socks5代理的强大功能&#xff0c;通过…...

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造&#xff0c;完美适配AGV和无人叉车。同时&#xff0c;集成以太网与语音合成技术&#xff0c;为各类高级系统&#xff08;如MES、调度系统、库位管理、立库等&#xff09;提供高效便捷的语音交互体验。 L…...

C++:std::is_convertible

C++标志库中提供is_convertible,可以测试一种类型是否可以转换为另一只类型: template <class From, class To> struct is_convertible; 使用举例: #include <iostream> #include <string>using namespace std;struct A { }; struct B : A { };int main…...

Debian系统简介

目录 Debian系统介绍 Debian版本介绍 Debian软件源介绍 软件包管理工具dpkg dpkg核心指令详解 安装软件包 卸载软件包 查询软件包状态 验证软件包完整性 手动处理依赖关系 dpkg vs apt Debian系统介绍 Debian 和 Ubuntu 都是基于 Debian内核 的 Linux 发行版&#xff…...

PPT|230页| 制造集团企业供应链端到端的数字化解决方案:从需求到结算的全链路业务闭环构建

制造业采购供应链管理是企业运营的核心环节&#xff0c;供应链协同管理在供应链上下游企业之间建立紧密的合作关系&#xff0c;通过信息共享、资源整合、业务协同等方式&#xff0c;实现供应链的全面管理和优化&#xff0c;提高供应链的效率和透明度&#xff0c;降低供应链的成…...

【CSS position 属性】static、relative、fixed、absolute 、sticky详细介绍,多层嵌套定位示例

文章目录 ★ position 的五种类型及基本用法 ★ 一、position 属性概述 二、position 的五种类型详解(初学者版) 1. static(默认值) 2. relative(相对定位) 3. absolute(绝对定位) 4. fixed(固定定位) 5. sticky(粘性定位) 三、定位元素的层级关系(z-i…...

【Java_EE】Spring MVC

目录 Spring Web MVC ​编辑注解 RestController RequestMapping RequestParam RequestParam RequestBody PathVariable RequestPart 参数传递 注意事项 ​编辑参数重命名 RequestParam ​编辑​编辑传递集合 RequestParam 传递JSON数据 ​编辑RequestBody ​…...

C++ 求圆面积的程序(Program to find area of a circle)

给定半径r&#xff0c;求圆的面积。圆的面积应精确到小数点后5位。 例子&#xff1a; 输入&#xff1a;r 5 输出&#xff1a;78.53982 解释&#xff1a;由于面积 PI * r * r 3.14159265358979323846 * 5 * 5 78.53982&#xff0c;因为我们只保留小数点后 5 位数字。 输…...

代理篇12|深入理解 Vite中的Proxy接口代理配置

在前端开发中,常常会遇到 跨域请求接口 的情况。为了解决这个问题,Vite 和 Webpack 都提供了 proxy 代理功能,用于将本地开发请求转发到后端服务器。 什么是代理(proxy)? 代理是在开发过程中,前端项目通过开发服务器,将指定的请求“转发”到真实的后端服务器,从而绕…...

在web-view 加载的本地及远程HTML中调用uniapp的API及网页和vue页面是如何通讯的?

uni-app 中 Web-view 与 Vue 页面的通讯机制详解 一、Web-view 简介 Web-view 是 uni-app 提供的一个重要组件&#xff0c;用于在原生应用中加载 HTML 页面&#xff1a; 支持加载本地 HTML 文件支持加载远程 HTML 页面实现 Web 与原生的双向通讯可用于嵌入第三方网页或 H5 应…...

HashMap中的put方法执行流程(流程图)

1 put操作整体流程 HashMap 的 put 操作是其最核心的功能之一。在 JDK 1.8 及以后版本中&#xff0c;其主要逻辑封装在 putVal 这个内部方法中。整个过程大致如下&#xff1a; 初始判断与哈希计算&#xff1a; 首先&#xff0c;putVal 方法会检查当前的 table&#xff08;也就…...