Flink多流处理之join(关联)
Flink的API中只提供了join的算子,并没有left join或者right join,这里我们就介绍一下join算子的使用,其实join算子底层调用的就是coGroup,具体原理这里就不过多介绍了,如果感兴趣可以看我前面发布的文章Flink多流操作之coGroup.
- 数据源
➜ ~ nc -lk 1111 101,A 102,B 103,C 104,D 105,E 106,F➜ ~ nc -lk 2222 101,A,男,程序员 102,B,男,程序员 103,C,男,会计 104,D,男,安全工程师 106,K,男,程序员 108,女,本科,人事 - 代码
import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/10* @Description: 多流操作-join**/ public class FlinkJoin {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度env.setParallelism(3);// 数据源1,以socket作为数据源DataStreamSource<String> socketStream1 = env.socketTextStream("localhost", 1111);SingleOutputStreamOperator<String[]> mapStream1 = socketStream1.map(str -> str.split(",")).returns(new TypeHint<String[]>() {});// 数据源2,以socket作为数据源DataStreamSource<String> socketStream2 = env.socketTextStream("localhost", 2222);SingleOutputStreamOperator<String[]> mapStream2 = socketStream2.map(str -> str.split(",")).returns(new TypeHint<String[]>() {});// 关联数据流DataStream<String> joinedStream = mapStream1.join(mapStream2).where(arr -> arr[0]) // mapStream1以数组中的第一个字段作为关联字段.equalTo(arr -> arr[0]) // mapStream2以数组中的第一个字段作为关联字段.window(TumblingProcessingTimeWindows.of(Time.seconds(20))) // 以20秒作为一个窗口.apply(new JoinFunction<String[], String[], String>() {// 这里是写关联后的具体逻辑@Overridepublic String join(String[] first, String[] second) throws Exception {String result = first[0] + "," + second[1] + "," + second[2] + "," + second[3];return result;}});// 打印结果数据joinedStream.print();env.execute("Flink join");} } - 结果
这个3> 103,C,男,会计 2> 106,K,男,程序员 2> 101,A,男,程序员 3> 104,D,男,安全工程师 3> 102,B,男,程序员API使用起来还是比较简单的,如果想实现left join或者right join的功能就需要通过coGroup来实现了.
相关文章:
Flink多流处理之join(关联)
Flink的API中只提供了join的算子,并没有left join或者right join,这里我们就介绍一下join算子的使用,其实join算子底层调用的就是coGroup,具体原理这里就不过多介绍了,如果感兴趣可以看我前面发布的文章Flink多流操作之coGroup. 数据源➜ ~ nc -lk 1111 101,A 102,B 103,C 10…...
LeetCode Top100 Liked 题单(序号34~51)
34. Find First and Last Position of Element in Sorted Array 题意:找到非递减序列中目标的开头和结尾 我的思路 用二分法把每一个数字都找到,最后返回首尾两个数 代码 Runtime12 ms Beats 33.23% Memory14 MB Beats 5.16% class Solution {…...
视觉slam十四讲---第一弹三维空间刚体运动
1.旋转矩阵 1.1内积 1.2外积 1.3坐标系间的欧式变换 相机运动是一个刚体运动,它保证了同一个向量在各个坐标系下的长度和夹角都不会 发生变化。这种变换称为欧氏变换。 旋转矩阵:它是一个行列式为 1 的正交矩阵。 旋转矩阵为正交阵,它的逆…...
手把手教你配置Jenkins自动化邮件通知
完成基于Jenkins的持续集成部署后,自动化测试执行后,测试结果需要通知到相关人员,除了钉钉通知外我们还可以通过Email通知到对应负责人,这里记录一下测试结果通过Jenkins邮件通知的配置与部署 01、安装插件 方法1: 进…...
Arcgis连续数据的分类(求不同值域的面积)
问题描述:如果得到的一个连续的影响数值数据,但是我们想求取某一段值域的面积占比,需要进行以下操作: 1.按照数值重分类,将某段数值变成一个类别 2.栅格转矢量,再求取面积...
C++ 函数
函数是一组一起执行一个任务的语句。每个 C 程序都至少有一个函数,即主函数 main() ,所有简单的程序都可以定义其他额外的函数。 您可以把代码划分到不同的函数中。如何划分代码到不同的函数中是由您来决定的,但在逻辑上,划分通常…...
关于如何创建一个windows窗口的exe文件
如何创建一个windows窗口exe文件,具体参照这个博主: http://t.csdn.cn/pfQK5 以下是实现代码,注意用vs打开: #pragma comment( linker, "/subsystem:\"windows\" /entry:\"WinMainCRTStartup\"" …...
re学习(33)攻防世界-secret-galaxy-300(动态调试)
下载压缩包: 下载链接:https://adworld.xctf.org.cn/challenges/list 参考文章:攻防世界逆向高手题之secret-galaxy-300_沐一 林的博客-CSDN博客 发现这只是三个同一类型文件的三个不同版本而已,一个windows32位exe࿰…...
springboot工程集成前端编译包,用于uni-app webView工程,解决其需独立部署带来的麻烦,场景如页面->画布->图片->pdf
前端工程 访问方式 http://127.0.0.1:8080/context/frontEnd/index放行 public class SecurityConfig extends WebSecurityConfigurerAdapter { "/frontEnd/**",SysFrontEndController import lombok.extern.slf4j.Slf4j; import nl.basjes.shaded.org.springfram…...
NeuralNLP-NeuralClassifier的使用记录(二),训练预测自己的【中文文本多分类】
NeuralNLP-NeuralClassifier的使用记录,训练预测自己的【中文文本多分类】 数据准备: 与英文的训练预测一致,都使用相同的数据格式,将数据通过代码处理为JSON格式,以下是我使用的一种,不同的原数据情况…...
express学习笔记8 - 文件上传 下载以及预览
一、上传 1、 安装multer (任意选其中一种) yarn add multer --S npm install multer --S 2、新建配置文件(utils/multerConfig) const multer require(multer); const mkdirp require(mkdirp); // const sd require(silly-datetime); const path require(path);con…...
Python系统学习1-9-类(一)
一、类之初印象 1、类就是空表格,将变量(列名)和函数(行为)结合起来 2、创建对象,表达具体行 3、创建类就是创建数据的模板 --操作数据时有提示 --还能再组合数据的行为 --结构更加清晰 4、类的内存分配…...
什么是公网、私网、内网、外网?
中午好,我的网工朋友。 最近经常有很多小白朋友在问,公网、私网、内网、外网,这些的概念是啥样的,又该怎么去界定。 关于IP地址,确实没有太明确的区分,其实也不必太过咬文嚼字。 内网、外网就是一个参考…...
一篇文章教会你搭建私人kindle图书馆,并内网穿透实现公网访问
搭建私人kindle图书馆,并内网穿透实现公网访问 在电子书风靡的时期,大部分人都购买了一本电子书,虽然这本电子书更多的时候是被搁置在储物架上吃灰,或者成为盖泡面的神器,但当亚马逊发布消息将放弃电子书在中国的服务…...
好用的安卓手机投屏到mac分享
工具推荐:scrcpy github地址:https://github.com/Genymobile/scrcpy/tree/master mac使用方式 安装环境,打开terminal,执行以下命令,没有brew的先安装brew brew install scrcpy brew install android-platform-too…...
df -h
df -h 命令用于查看磁盘占用的空间 Filesystem:表示该文件系统位于哪个分区,因此该列显示的是设备名称; Used:表示用掉的磁盘空间大小; Available:表示剩余的磁盘空间大小; Use%:磁盘…...
彻底卸载Android Studio
永恒的爱是永远恪守最初的诺言。 在安装Android Studio会有很多问题导致无法正常运行,多次下载AS多次错误后了解到,删除以下四个文件才能彻底卸载Android Studio。 第一个文件:.gradle 路径:C:\Users\yao(这里yao是本…...
QT 5.12配置OpenCV3.4.10
主要过程:使用cmake编译源码,生成Mingw64位 下的OpenCV库 三篇博客解决问题: 1.Windows下安装Qt并使用cmake配置opencv3.4.10(含错误记录及解决办法)_d:\qt\qt5.14.2\5.14.2\mingw73_64\include\qtcore\qg_会飞的DA象的博客-CSDN博客 2.【…...
Qt应用开发(基础篇)——选项卡窗口 QTabWidget
一、前言 QTabWidget类继承于QWidget,是一个拥有选项卡的窗口部件。 QTabWidget类有一个选项卡栏QTabBar和一个页面区域,用来显示和选项卡相关联的界面。用户通过点击选项卡或者自定义快捷方式(ALTKey)切换页面。 二、QTabWidget类 1、count 该属…...
Socks5代理在多线程爬虫中的应用
在进行爬虫开发过程中,我们常常需要处理大量的数据,并执行多任务并发操作。然而,频繁的请求可能会引起目标网站的反爬机制,导致IP封禁或限制访问。为了规避这些限制,我们可以借助Socks5代理的强大功能,通过…...
GLM-5.1 全面支持与 Gemini CLI 集成:HagiCode 的多模型进化之路
GLM-5.1 全面支持与 Gemini CLI 集成:HagiCode 的多模型进化之路 本文介绍了 HagiCode 平台近期的重要更新——智谱 AI GLM-5.1 模型的全面支持,以及 Gemini CLI 作为第十个 Agent CLI 的成功集成。这两项更新进一步强化了平台的多模型能力和多 CLI 生态…...
告别桌面图标混乱:NoFences让你的数字空间井然有序
告别桌面图标混乱:NoFences让你的数字空间井然有序 【免费下载链接】NoFences 🚧 Open Source Stardock Fences alternative 项目地址: https://gitcode.com/gh_mirrors/no/NoFences 你是否曾打开电脑就被满屏散乱的图标淹没?工作文件…...
视频号推客模式系统小程序开发
开发一个基于微信视频号的推客模式系统小程序,需要结合微信生态的开放能力和推客(分销)模式的业务逻辑。以下是关键开发要点:微信小程序与视频号打通通过微信开放平台的JS-SDK实现小程序与视频号的互联互通。调用wx.openChannelsA…...
AI赋能51单片机开发:让快马平台智能生成复杂避障算法代码
最近在做一个基于51单片机的智能小车项目,需要实现复杂的避障功能。传统开发方式需要手动编写大量底层代码,调试起来特别耗时。不过这次尝试用InsCode(快马)平台的AI辅助功能,整个过程顺利了很多。 需求分析阶段 首先需要明确小车的核心功能&…...
Qwen3.5-9B-AWQ-4bit开源可部署教程:私有云/K8s集群中部署多实例视觉理解服务
Qwen3.5-9B-AWQ-4bit开源可部署教程:私有云/K8s集群中部署多实例视觉理解服务 1. 模型概述 Qwen3.5-9B-AWQ-4bit是一个支持图像理解的多模态模型,能够结合上传图片与文字提示词,输出中文分析结果。这个量化版本特别适合在资源受限的环境中部…...
bilibili-downloader完全指南:从入门到精通的4个关键步骤
bilibili-downloader完全指南:从入门到精通的4个关键步骤 【免费下载链接】bilibili-downloader B站视频下载,支持下载大会员清晰度4K,持续更新中 项目地址: https://gitcode.com/gh_mirrors/bil/bilibili-downloader 一、痛点分析&am…...
eNSP安装避坑指南:WinPcap/Wireshark/VirtualBox依赖关系解析
eNSP安装避坑指南:WinPcap/Wireshark/VirtualBox依赖关系解析 当你第一次打开eNSP安装包时,可能会疑惑为什么需要同时安装WinPcap、Wireshark和VirtualBox这三个看似不相关的软件。这就像组装一台精密仪器——少了任何一个螺丝,整台机器都无法…...
Phi-3-mini-4k-instruct-gguf多场景:覆盖个人提效、团队协作、客户支持全链路
Phi-3-mini-4k-instruct-gguf多场景:覆盖个人提效、团队协作、客户支持全链路 1. 认识Phi-3-mini-4k-instruct-gguf Phi-3-mini-4k-instruct-gguf是微软Phi-3系列中的轻量级文本生成模型GGUF版本。这个开箱即用的工具特别适合处理日常工作中的文本任务,…...
Mac用户的移动Win10工坊:从WTG配置到驱动、激活、文件共享的完整避坑指南
Mac用户的移动Win10工坊:从WTG配置到驱动、激活、文件共享的完整避坑指南 当Mac用户需要运行Windows应用时,双系统方案往往是最佳选择。而通过Windows To Go(WTG)技术将Win10安装在移动硬盘上,不仅保留了Mac原生系统的…...
56:L构建蓝队AI:蓝队的智能防御
作者: HOS(安全风信子) 日期: 2026-03-07 主要来源平台: GitHub 摘要: 面对基拉等高级威胁的不断进化,传统的蓝队防御手段已经难以应对。L构建了一套蓝队AI系统,通过AI驱动的威胁检测、自动响应和防御优化&…...
