Flink多流处理之join(关联)
Flink的API中只提供了join的算子,并没有left join或者right join,这里我们就介绍一下join算子的使用,其实join算子底层调用的就是coGroup,具体原理这里就不过多介绍了,如果感兴趣可以看我前面发布的文章Flink多流操作之coGroup.
- 数据源
➜ ~ nc -lk 1111 101,A 102,B 103,C 104,D 105,E 106,F➜ ~ nc -lk 2222 101,A,男,程序员 102,B,男,程序员 103,C,男,会计 104,D,男,安全工程师 106,K,男,程序员 108,女,本科,人事 - 代码
import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/10* @Description: 多流操作-join**/ public class FlinkJoin {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度env.setParallelism(3);// 数据源1,以socket作为数据源DataStreamSource<String> socketStream1 = env.socketTextStream("localhost", 1111);SingleOutputStreamOperator<String[]> mapStream1 = socketStream1.map(str -> str.split(",")).returns(new TypeHint<String[]>() {});// 数据源2,以socket作为数据源DataStreamSource<String> socketStream2 = env.socketTextStream("localhost", 2222);SingleOutputStreamOperator<String[]> mapStream2 = socketStream2.map(str -> str.split(",")).returns(new TypeHint<String[]>() {});// 关联数据流DataStream<String> joinedStream = mapStream1.join(mapStream2).where(arr -> arr[0]) // mapStream1以数组中的第一个字段作为关联字段.equalTo(arr -> arr[0]) // mapStream2以数组中的第一个字段作为关联字段.window(TumblingProcessingTimeWindows.of(Time.seconds(20))) // 以20秒作为一个窗口.apply(new JoinFunction<String[], String[], String>() {// 这里是写关联后的具体逻辑@Overridepublic String join(String[] first, String[] second) throws Exception {String result = first[0] + "," + second[1] + "," + second[2] + "," + second[3];return result;}});// 打印结果数据joinedStream.print();env.execute("Flink join");} } - 结果
这个3> 103,C,男,会计 2> 106,K,男,程序员 2> 101,A,男,程序员 3> 104,D,男,安全工程师 3> 102,B,男,程序员API使用起来还是比较简单的,如果想实现left join或者right join的功能就需要通过coGroup来实现了.
相关文章:
Flink多流处理之join(关联)
Flink的API中只提供了join的算子,并没有left join或者right join,这里我们就介绍一下join算子的使用,其实join算子底层调用的就是coGroup,具体原理这里就不过多介绍了,如果感兴趣可以看我前面发布的文章Flink多流操作之coGroup. 数据源➜ ~ nc -lk 1111 101,A 102,B 103,C 10…...
LeetCode Top100 Liked 题单(序号34~51)
34. Find First and Last Position of Element in Sorted Array 题意:找到非递减序列中目标的开头和结尾 我的思路 用二分法把每一个数字都找到,最后返回首尾两个数 代码 Runtime12 ms Beats 33.23% Memory14 MB Beats 5.16% class Solution {…...
视觉slam十四讲---第一弹三维空间刚体运动
1.旋转矩阵 1.1内积 1.2外积 1.3坐标系间的欧式变换 相机运动是一个刚体运动,它保证了同一个向量在各个坐标系下的长度和夹角都不会 发生变化。这种变换称为欧氏变换。 旋转矩阵:它是一个行列式为 1 的正交矩阵。 旋转矩阵为正交阵,它的逆…...
手把手教你配置Jenkins自动化邮件通知
完成基于Jenkins的持续集成部署后,自动化测试执行后,测试结果需要通知到相关人员,除了钉钉通知外我们还可以通过Email通知到对应负责人,这里记录一下测试结果通过Jenkins邮件通知的配置与部署 01、安装插件 方法1: 进…...
Arcgis连续数据的分类(求不同值域的面积)
问题描述:如果得到的一个连续的影响数值数据,但是我们想求取某一段值域的面积占比,需要进行以下操作: 1.按照数值重分类,将某段数值变成一个类别 2.栅格转矢量,再求取面积...
C++ 函数
函数是一组一起执行一个任务的语句。每个 C 程序都至少有一个函数,即主函数 main() ,所有简单的程序都可以定义其他额外的函数。 您可以把代码划分到不同的函数中。如何划分代码到不同的函数中是由您来决定的,但在逻辑上,划分通常…...
关于如何创建一个windows窗口的exe文件
如何创建一个windows窗口exe文件,具体参照这个博主: http://t.csdn.cn/pfQK5 以下是实现代码,注意用vs打开: #pragma comment( linker, "/subsystem:\"windows\" /entry:\"WinMainCRTStartup\"" …...
re学习(33)攻防世界-secret-galaxy-300(动态调试)
下载压缩包: 下载链接:https://adworld.xctf.org.cn/challenges/list 参考文章:攻防世界逆向高手题之secret-galaxy-300_沐一 林的博客-CSDN博客 发现这只是三个同一类型文件的三个不同版本而已,一个windows32位exe࿰…...
springboot工程集成前端编译包,用于uni-app webView工程,解决其需独立部署带来的麻烦,场景如页面->画布->图片->pdf
前端工程 访问方式 http://127.0.0.1:8080/context/frontEnd/index放行 public class SecurityConfig extends WebSecurityConfigurerAdapter { "/frontEnd/**",SysFrontEndController import lombok.extern.slf4j.Slf4j; import nl.basjes.shaded.org.springfram…...
NeuralNLP-NeuralClassifier的使用记录(二),训练预测自己的【中文文本多分类】
NeuralNLP-NeuralClassifier的使用记录,训练预测自己的【中文文本多分类】 数据准备: 与英文的训练预测一致,都使用相同的数据格式,将数据通过代码处理为JSON格式,以下是我使用的一种,不同的原数据情况…...
express学习笔记8 - 文件上传 下载以及预览
一、上传 1、 安装multer (任意选其中一种) yarn add multer --S npm install multer --S 2、新建配置文件(utils/multerConfig) const multer require(multer); const mkdirp require(mkdirp); // const sd require(silly-datetime); const path require(path);con…...
Python系统学习1-9-类(一)
一、类之初印象 1、类就是空表格,将变量(列名)和函数(行为)结合起来 2、创建对象,表达具体行 3、创建类就是创建数据的模板 --操作数据时有提示 --还能再组合数据的行为 --结构更加清晰 4、类的内存分配…...
什么是公网、私网、内网、外网?
中午好,我的网工朋友。 最近经常有很多小白朋友在问,公网、私网、内网、外网,这些的概念是啥样的,又该怎么去界定。 关于IP地址,确实没有太明确的区分,其实也不必太过咬文嚼字。 内网、外网就是一个参考…...
一篇文章教会你搭建私人kindle图书馆,并内网穿透实现公网访问
搭建私人kindle图书馆,并内网穿透实现公网访问 在电子书风靡的时期,大部分人都购买了一本电子书,虽然这本电子书更多的时候是被搁置在储物架上吃灰,或者成为盖泡面的神器,但当亚马逊发布消息将放弃电子书在中国的服务…...
好用的安卓手机投屏到mac分享
工具推荐:scrcpy github地址:https://github.com/Genymobile/scrcpy/tree/master mac使用方式 安装环境,打开terminal,执行以下命令,没有brew的先安装brew brew install scrcpy brew install android-platform-too…...
df -h
df -h 命令用于查看磁盘占用的空间 Filesystem:表示该文件系统位于哪个分区,因此该列显示的是设备名称; Used:表示用掉的磁盘空间大小; Available:表示剩余的磁盘空间大小; Use%:磁盘…...
彻底卸载Android Studio
永恒的爱是永远恪守最初的诺言。 在安装Android Studio会有很多问题导致无法正常运行,多次下载AS多次错误后了解到,删除以下四个文件才能彻底卸载Android Studio。 第一个文件:.gradle 路径:C:\Users\yao(这里yao是本…...
QT 5.12配置OpenCV3.4.10
主要过程:使用cmake编译源码,生成Mingw64位 下的OpenCV库 三篇博客解决问题: 1.Windows下安装Qt并使用cmake配置opencv3.4.10(含错误记录及解决办法)_d:\qt\qt5.14.2\5.14.2\mingw73_64\include\qtcore\qg_会飞的DA象的博客-CSDN博客 2.【…...
Qt应用开发(基础篇)——选项卡窗口 QTabWidget
一、前言 QTabWidget类继承于QWidget,是一个拥有选项卡的窗口部件。 QTabWidget类有一个选项卡栏QTabBar和一个页面区域,用来显示和选项卡相关联的界面。用户通过点击选项卡或者自定义快捷方式(ALTKey)切换页面。 二、QTabWidget类 1、count 该属…...
Socks5代理在多线程爬虫中的应用
在进行爬虫开发过程中,我们常常需要处理大量的数据,并执行多任务并发操作。然而,频繁的请求可能会引起目标网站的反爬机制,导致IP封禁或限制访问。为了规避这些限制,我们可以借助Socks5代理的强大功能,通过…...
简历投了全石沉大海?实测3个免费AI简历神器,HR秒通过、面试翻3倍!
3个实测免费的AI简历神器,不用花钱、不用登录,直接让简历过ATS、获面试,应届生/职场人闭眼冲!简历优化本身就讲究精准度,尤其是ATS筛选逻辑,很多工具要么收费高,要么改完还是不贴合JD࿰…...
2026届必备的AI辅助论文网站解析与推荐
Ai论文网站排名(开题报告、文献综述、降aigc率、降重综合对比) TOP1. 千笔AI TOP2. aipasspaper TOP3. 清北论文 TOP4. 豆包 TOP5. kimi TOP6. deepseek 于学术研究的范畴之内,高效且可靠的AI辅助工具正逐渐演变成学者以及学生的得力帮…...
彻底告别Row-By-Row:标量子查询外连接改写与向量化引擎深潜
在实际的复杂业务系统开发与运维中,SQL查询的结构往往会随着业务复杂度的提升而变得臃肿不堪。为了保证代码的可读性和逻辑的直观性,开发者非常喜欢使用 CTE(公共表表达式)、多层子查询、窗口函数,以及标量子查询&…...
为开发者工具注入情感分析能力:开源库ai-devtools-sentiment实战指南
1. 项目概述:一个为开发者工具注入情感分析能力的开源库最近在折腾一些开发者工具,比如代码审查机器人、文档生成器或者IDE插件,我总感觉它们冷冰冰的。它们能告诉你代码有语法错误,能提示你某个API已废弃,但它们无法感…...
Scroll Reverser:为什么你的Mac需要这款滚动方向控制神器?
Scroll Reverser:为什么你的Mac需要这款滚动方向控制神器? 【免费下载链接】Scroll-Reverser Per-device scrolling prefs on macOS. 项目地址: https://gitcode.com/gh_mirrors/sc/Scroll-Reverser 作为一名设计师,李华每天在MacBook…...
嵌入式开发实战:SPI、UART、I2C三大硬件接口通信协议详解与CircuitPython应用
1. 项目概述:为什么硬件接口是嵌入式开发的基石如果你玩过单片机或者树莓派,肯定遇到过这样的场景:手里有一块炫酷的LED灯带、一个GPS模块或者一个环境传感器,想让它和你的主控板“说上话”,结果发现连线复杂、代码难调…...
SMARC模块化电脑标准:嵌入式系统设计、选型与集成实战指南
1. 项目概述最近在规划一个边缘计算网关项目,选型时又和硬件同事聊到了SMARC。这已经不是第一次在项目里接触这个标准了,但每次和不同背景的工程师讨论,总会发现大家对它的理解深浅不一。有的嵌入式软件工程师觉得它就是个“带金手指的核心板…...
魔兽争霸III终极优化指南:7个实用方案让经典游戏完美适配现代硬件
魔兽争霸III终极优化指南:7个实用方案让经典游戏完美适配现代硬件 【免费下载链接】WarcraftHelper Warcraft III Helper , support 1.20e, 1.24e, 1.26a, 1.27a, 1.27b 项目地址: https://gitcode.com/gh_mirrors/wa/WarcraftHelper 魔兽争霸III作为一款经典…...
智能体框架(Harness)深度解析:模型+框架=智能体,一文带你秒懂!
智能体框架(Harness)到底是什么?一文拆透 先把结论摆出来 智能体 模型 框架 如果你不是模型,你就是框架。这个公式听起来简单,但真正理解它需要费点功夫。 所谓框架(Harness),就是…...
紫光同创PGL22G开发板DDR3读写实验:从IP核安装到上板验证的保姆级避坑指南
紫光同创PGL22G开发板DDR3读写实验全流程实战解析 第一次接触国产FPGA平台进行DDR3内存控制实验时,很多开发者都会遇到各种"坑"。本文将基于紫光同创PGL22G开发板,从IP核安装到最终上板验证,手把手带你避开那些容易出错的关键环节。…...
