当前位置: 首页 > news >正文

Flink多流处理之join(关联)

Flink的API中只提供了join的算子,并没有left join或者right join,这里我们就介绍一下join算子的使用,其实join算子底层调用的就是coGroup,具体原理这里就不过多介绍了,如果感兴趣可以看我前面发布的文章Flink多流操作之coGroup.

  • 数据源
    ➜  ~ nc -lk 1111
    101,A
    102,B
    103,C
    104,D
    105,E
    106,F
    
    ➜  ~ nc -lk 2222
    101,A,,程序员
    102,B,,程序员
    103,C,,会计
    104,D,,安全工程师
    106,K,,程序员
    108,,本科,人事
    
  • 代码
    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/10* @Description: 多流操作-join**/
    public class FlinkJoin {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度env.setParallelism(3);// 数据源1,以socket作为数据源DataStreamSource<String> socketStream1 = env.socketTextStream("localhost", 1111);SingleOutputStreamOperator<String[]> mapStream1 = socketStream1.map(str -> str.split(",")).returns(new TypeHint<String[]>() {});// 数据源2,以socket作为数据源DataStreamSource<String> socketStream2 = env.socketTextStream("localhost", 2222);SingleOutputStreamOperator<String[]> mapStream2 = socketStream2.map(str -> str.split(",")).returns(new TypeHint<String[]>() {});// 关联数据流DataStream<String> joinedStream = mapStream1.join(mapStream2).where(arr -> arr[0]) // mapStream1以数组中的第一个字段作为关联字段.equalTo(arr -> arr[0]) // mapStream2以数组中的第一个字段作为关联字段.window(TumblingProcessingTimeWindows.of(Time.seconds(20))) // 以20秒作为一个窗口.apply(new JoinFunction<String[], String[], String>() {// 这里是写关联后的具体逻辑@Overridepublic String join(String[] first, String[] second) throws Exception {String result = first[0] + "," + second[1] + "," + second[2] + "," + second[3];return result;}});// 打印结果数据joinedStream.print();env.execute("Flink join");}
    }
    
  • 结果
    3> 103,C,男,会计
    2> 106,K,男,程序员
    2> 101,A,男,程序员
    3> 104,D,男,安全工程师
    3> 102,B,男,程序员
    
    这个API使用起来还是比较简单的,如果想实现left join或者right join的功能就需要通过coGroup来实现了.

相关文章:

Flink多流处理之join(关联)

Flink的API中只提供了join的算子,并没有left join或者right join,这里我们就介绍一下join算子的使用,其实join算子底层调用的就是coGroup,具体原理这里就不过多介绍了,如果感兴趣可以看我前面发布的文章Flink多流操作之coGroup. 数据源➜ ~ nc -lk 1111 101,A 102,B 103,C 10…...

LeetCode Top100 Liked 题单(序号34~51)

​34. Find First and Last Position of Element in Sorted Array ​ 题意&#xff1a;找到非递减序列中目标的开头和结尾 我的思路 用二分法把每一个数字都找到&#xff0c;最后返回首尾两个数 代码 Runtime12 ms Beats 33.23% Memory14 MB Beats 5.16% class Solution {…...

视觉slam十四讲---第一弹三维空间刚体运动

1.旋转矩阵 1.1内积 1.2外积 1.3坐标系间的欧式变换 相机运动是一个刚体运动&#xff0c;它保证了同一个向量在各个坐标系下的长度和夹角都不会 发生变化。这种变换称为欧氏变换。 旋转矩阵&#xff1a;它是一个行列式为 1 的正交矩阵。 旋转矩阵为正交阵&#xff0c;它的逆…...

手把手教你配置Jenkins自动化邮件通知

完成基于Jenkins的持续集成部署后&#xff0c;自动化测试执行后&#xff0c;测试结果需要通知到相关人员&#xff0c;除了钉钉通知外我们还可以通过Email通知到对应负责人&#xff0c;这里记录一下测试结果通过Jenkins邮件通知的配置与部署 01、安装插件 方法1&#xff1a; 进…...

Arcgis连续数据的分类(求不同值域的面积)

问题描述&#xff1a;如果得到的一个连续的影响数值数据&#xff0c;但是我们想求取某一段值域的面积占比&#xff0c;需要进行以下操作&#xff1a; 1.按照数值重分类&#xff0c;将某段数值变成一个类别 2.栅格转矢量&#xff0c;再求取面积...

C++ 函数

函数是一组一起执行一个任务的语句。每个 C 程序都至少有一个函数&#xff0c;即主函数 main() &#xff0c;所有简单的程序都可以定义其他额外的函数。 您可以把代码划分到不同的函数中。如何划分代码到不同的函数中是由您来决定的&#xff0c;但在逻辑上&#xff0c;划分通常…...

关于如何创建一个windows窗口的exe文件

如何创建一个windows窗口exe文件&#xff0c;具体参照这个博主&#xff1a; http://t.csdn.cn/pfQK5 以下是实现代码&#xff0c;注意用vs打开&#xff1a; #pragma comment( linker, "/subsystem:\"windows\" /entry:\"WinMainCRTStartup\"" …...

re学习(33)攻防世界-secret-galaxy-300(动态调试)

下载压缩包&#xff1a; 下载链接&#xff1a;https://adworld.xctf.org.cn/challenges/list 参考文章&#xff1a;攻防世界逆向高手题之secret-galaxy-300_沐一 林的博客-CSDN博客 发现这只是三个同一类型文件的三个不同版本而已&#xff0c;一个windows32位exe&#xff0…...

springboot工程集成前端编译包,用于uni-app webView工程,解决其需独立部署带来的麻烦,场景如页面->画布->图片->pdf

前端工程 访问方式 http://127.0.0.1:8080/context/frontEnd/index放行 public class SecurityConfig extends WebSecurityConfigurerAdapter { "/frontEnd/**",SysFrontEndController import lombok.extern.slf4j.Slf4j; import nl.basjes.shaded.org.springfram…...

NeuralNLP-NeuralClassifier的使用记录(二),训练预测自己的【中文文本多分类】

NeuralNLP-NeuralClassifier的使用记录&#xff0c;训练预测自己的【中文文本多分类】 数据准备&#xff1a; ​ 与英文的训练预测一致&#xff0c;都使用相同的数据格式&#xff0c;将数据通过代码处理为JSON格式&#xff0c;以下是我使用的一种&#xff0c;不同的原数据情况…...

express学习笔记8 - 文件上传 下载以及预览

一、上传 1、 安装multer (任意选其中一种) yarn add multer --S npm install multer --S 2、新建配置文件(utils/multerConfig) const multer require(multer); const mkdirp require(mkdirp); // const sd require(silly-datetime); const path require(path);con…...

Python系统学习1-9-类(一)

一、类之初印象 1、类就是空表格&#xff0c;将变量&#xff08;列名&#xff09;和函数&#xff08;行为&#xff09;结合起来 2、创建对象&#xff0c;表达具体行 3、创建类就是创建数据的模板 --操作数据时有提示 --还能再组合数据的行为 --结构更加清晰 4、类的内存分配…...

什么是公网、私网、内网、外网?

中午好&#xff0c;我的网工朋友。 最近经常有很多小白朋友在问&#xff0c;公网、私网、内网、外网&#xff0c;这些的概念是啥样的&#xff0c;又该怎么去界定。 关于IP地址&#xff0c;确实没有太明确的区分&#xff0c;其实也不必太过咬文嚼字。 内网、外网就是一个参考…...

一篇文章教会你搭建私人kindle图书馆,并内网穿透实现公网访问

搭建私人kindle图书馆&#xff0c;并内网穿透实现公网访问 在电子书风靡的时期&#xff0c;大部分人都购买了一本电子书&#xff0c;虽然这本电子书更多的时候是被搁置在储物架上吃灰&#xff0c;或者成为盖泡面的神器&#xff0c;但当亚马逊发布消息将放弃电子书在中国的服务…...

好用的安卓手机投屏到mac分享

工具推荐&#xff1a;scrcpy github地址&#xff1a;https://github.com/Genymobile/scrcpy/tree/master mac使用方式 安装环境&#xff0c;打开terminal&#xff0c;执行以下命令&#xff0c;没有brew的先安装brew brew install scrcpy brew install android-platform-too…...

df -h

df -h 命令用于查看磁盘占用的空间 Filesystem&#xff1a;表示该文件系统位于哪个分区&#xff0c;因此该列显示的是设备名称&#xff1b; Used&#xff1a;表示用掉的磁盘空间大小&#xff1b; Available&#xff1a;表示剩余的磁盘空间大小&#xff1b; Use%&#xff1a;磁盘…...

彻底卸载Android Studio

永恒的爱是永远恪守最初的诺言。 在安装Android Studio会有很多问题导致无法正常运行&#xff0c;多次下载AS多次错误后了解到&#xff0c;删除以下四个文件才能彻底卸载Android Studio。 第一个文件&#xff1a;.gradle 路径&#xff1a;C:\Users\yao&#xff08;这里yao是本…...

QT 5.12配置OpenCV3.4.10

主要过程&#xff1a;使用cmake编译源码&#xff0c;生成Mingw64位 下的OpenCV库 三篇博客解决问题&#xff1a; 1.Windows下安装Qt并使用cmake配置opencv3.4.10(含错误记录及解决办法)_d:\qt\qt5.14.2\5.14.2\mingw73_64\include\qtcore\qg_会飞的DA象的博客-CSDN博客 2.【…...

Qt应用开发(基础篇)——选项卡窗口 QTabWidget

一、前言 QTabWidget类继承于QWidget&#xff0c;是一个拥有选项卡的窗口部件。 QTabWidget类有一个选项卡栏QTabBar和一个页面区域&#xff0c;用来显示和选项卡相关联的界面。用户通过点击选项卡或者自定义快捷方式(ALTKey)切换页面。 二、QTabWidget类 1、count 该属…...

Socks5代理在多线程爬虫中的应用

在进行爬虫开发过程中&#xff0c;我们常常需要处理大量的数据&#xff0c;并执行多任务并发操作。然而&#xff0c;频繁的请求可能会引起目标网站的反爬机制&#xff0c;导致IP封禁或限制访问。为了规避这些限制&#xff0c;我们可以借助Socks5代理的强大功能&#xff0c;通过…...

Linux应用开发之网络套接字编程(实例篇)

服务端与客户端单连接 服务端代码 #include <sys/socket.h> #include <sys/types.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <arpa/inet.h> #include <pthread.h> …...

突破不可导策略的训练难题:零阶优化与强化学习的深度嵌合

强化学习&#xff08;Reinforcement Learning, RL&#xff09;是工业领域智能控制的重要方法。它的基本原理是将最优控制问题建模为马尔可夫决策过程&#xff0c;然后使用强化学习的Actor-Critic机制&#xff08;中文译作“知行互动”机制&#xff09;&#xff0c;逐步迭代求解…...

大型活动交通拥堵治理的视觉算法应用

大型活动下智慧交通的视觉分析应用 一、背景与挑战 大型活动&#xff08;如演唱会、马拉松赛事、高考中考等&#xff09;期间&#xff0c;城市交通面临瞬时人流车流激增、传统摄像头模糊、交通拥堵识别滞后等问题。以演唱会为例&#xff0c;暖城商圈曾因观众集中离场导致周边…...

解决Ubuntu22.04 VMware失败的问题 ubuntu入门之二十八

现象1 打开VMware失败 Ubuntu升级之后打开VMware上报需要安装vmmon和vmnet&#xff0c;点击确认后如下提示 最终上报fail 解决方法 内核升级导致&#xff0c;需要在新内核下重新下载编译安装 查看版本 $ vmware -v VMware Workstation 17.5.1 build-23298084$ lsb_release…...

Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility

Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility 1. 实验室环境1.1 实验室环境1.2 小测试 2. The Endor System2.1 部署应用2.2 检查现有策略 3. Cilium 策略实体3.1 创建 allow-all 网络策略3.2 在 Hubble CLI 中验证网络策略源3.3 …...

【ROS】Nav2源码之nav2_behavior_tree-行为树节点列表

1、行为树节点分类 在 Nav2(Navigation2)的行为树框架中,行为树节点插件按照功能分为 Action(动作节点)、Condition(条件节点)、Control(控制节点) 和 Decorator(装饰节点) 四类。 1.1 动作节点 Action 执行具体的机器人操作或任务,直接与硬件、传感器或外部系统…...

零基础设计模式——行为型模式 - 责任链模式

第四部分&#xff1a;行为型模式 - 责任链模式 (Chain of Responsibility Pattern) 欢迎来到行为型模式的学习&#xff01;行为型模式关注对象之间的职责分配、算法封装和对象间的交互。我们将学习的第一个行为型模式是责任链模式。 核心思想&#xff1a;使多个对象都有机会处…...

3-11单元格区域边界定位(End属性)学习笔记

返回一个Range 对象&#xff0c;只读。该对象代表包含源区域的区域上端下端左端右端的最后一个单元格。等同于按键 End 向上键(End(xlUp))、End向下键(End(xlDown))、End向左键(End(xlToLeft)End向右键(End(xlToRight)) 注意&#xff1a;它移动的位置必须是相连的有内容的单元格…...

基于TurtleBot3在Gazebo地图实现机器人远程控制

1. TurtleBot3环境配置 # 下载TurtleBot3核心包 mkdir -p ~/catkin_ws/src cd ~/catkin_ws/src git clone -b noetic-devel https://github.com/ROBOTIS-GIT/turtlebot3.git git clone -b noetic https://github.com/ROBOTIS-GIT/turtlebot3_msgs.git git clone -b noetic-dev…...

Java编程之桥接模式

定义 桥接模式&#xff08;Bridge Pattern&#xff09;属于结构型设计模式&#xff0c;它的核心意图是将抽象部分与实现部分分离&#xff0c;使它们可以独立地变化。这种模式通过组合关系来替代继承关系&#xff0c;从而降低了抽象和实现这两个可变维度之间的耦合度。 用例子…...