Flink多流处理之join(关联)
Flink的API中只提供了join的算子,并没有left join或者right join,这里我们就介绍一下join算子的使用,其实join算子底层调用的就是coGroup,具体原理这里就不过多介绍了,如果感兴趣可以看我前面发布的文章Flink多流操作之coGroup.
- 数据源
➜ ~ nc -lk 1111 101,A 102,B 103,C 104,D 105,E 106,F➜ ~ nc -lk 2222 101,A,男,程序员 102,B,男,程序员 103,C,男,会计 104,D,男,安全工程师 106,K,男,程序员 108,女,本科,人事 - 代码
import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/10* @Description: 多流操作-join**/ public class FlinkJoin {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度env.setParallelism(3);// 数据源1,以socket作为数据源DataStreamSource<String> socketStream1 = env.socketTextStream("localhost", 1111);SingleOutputStreamOperator<String[]> mapStream1 = socketStream1.map(str -> str.split(",")).returns(new TypeHint<String[]>() {});// 数据源2,以socket作为数据源DataStreamSource<String> socketStream2 = env.socketTextStream("localhost", 2222);SingleOutputStreamOperator<String[]> mapStream2 = socketStream2.map(str -> str.split(",")).returns(new TypeHint<String[]>() {});// 关联数据流DataStream<String> joinedStream = mapStream1.join(mapStream2).where(arr -> arr[0]) // mapStream1以数组中的第一个字段作为关联字段.equalTo(arr -> arr[0]) // mapStream2以数组中的第一个字段作为关联字段.window(TumblingProcessingTimeWindows.of(Time.seconds(20))) // 以20秒作为一个窗口.apply(new JoinFunction<String[], String[], String>() {// 这里是写关联后的具体逻辑@Overridepublic String join(String[] first, String[] second) throws Exception {String result = first[0] + "," + second[1] + "," + second[2] + "," + second[3];return result;}});// 打印结果数据joinedStream.print();env.execute("Flink join");} } - 结果
这个3> 103,C,男,会计 2> 106,K,男,程序员 2> 101,A,男,程序员 3> 104,D,男,安全工程师 3> 102,B,男,程序员API使用起来还是比较简单的,如果想实现left join或者right join的功能就需要通过coGroup来实现了.
相关文章:
Flink多流处理之join(关联)
Flink的API中只提供了join的算子,并没有left join或者right join,这里我们就介绍一下join算子的使用,其实join算子底层调用的就是coGroup,具体原理这里就不过多介绍了,如果感兴趣可以看我前面发布的文章Flink多流操作之coGroup. 数据源➜ ~ nc -lk 1111 101,A 102,B 103,C 10…...
LeetCode Top100 Liked 题单(序号34~51)
34. Find First and Last Position of Element in Sorted Array 题意:找到非递减序列中目标的开头和结尾 我的思路 用二分法把每一个数字都找到,最后返回首尾两个数 代码 Runtime12 ms Beats 33.23% Memory14 MB Beats 5.16% class Solution {…...
视觉slam十四讲---第一弹三维空间刚体运动
1.旋转矩阵 1.1内积 1.2外积 1.3坐标系间的欧式变换 相机运动是一个刚体运动,它保证了同一个向量在各个坐标系下的长度和夹角都不会 发生变化。这种变换称为欧氏变换。 旋转矩阵:它是一个行列式为 1 的正交矩阵。 旋转矩阵为正交阵,它的逆…...
手把手教你配置Jenkins自动化邮件通知
完成基于Jenkins的持续集成部署后,自动化测试执行后,测试结果需要通知到相关人员,除了钉钉通知外我们还可以通过Email通知到对应负责人,这里记录一下测试结果通过Jenkins邮件通知的配置与部署 01、安装插件 方法1: 进…...
Arcgis连续数据的分类(求不同值域的面积)
问题描述:如果得到的一个连续的影响数值数据,但是我们想求取某一段值域的面积占比,需要进行以下操作: 1.按照数值重分类,将某段数值变成一个类别 2.栅格转矢量,再求取面积...
C++ 函数
函数是一组一起执行一个任务的语句。每个 C 程序都至少有一个函数,即主函数 main() ,所有简单的程序都可以定义其他额外的函数。 您可以把代码划分到不同的函数中。如何划分代码到不同的函数中是由您来决定的,但在逻辑上,划分通常…...
关于如何创建一个windows窗口的exe文件
如何创建一个windows窗口exe文件,具体参照这个博主: http://t.csdn.cn/pfQK5 以下是实现代码,注意用vs打开: #pragma comment( linker, "/subsystem:\"windows\" /entry:\"WinMainCRTStartup\"" …...
re学习(33)攻防世界-secret-galaxy-300(动态调试)
下载压缩包: 下载链接:https://adworld.xctf.org.cn/challenges/list 参考文章:攻防世界逆向高手题之secret-galaxy-300_沐一 林的博客-CSDN博客 发现这只是三个同一类型文件的三个不同版本而已,一个windows32位exe࿰…...
springboot工程集成前端编译包,用于uni-app webView工程,解决其需独立部署带来的麻烦,场景如页面->画布->图片->pdf
前端工程 访问方式 http://127.0.0.1:8080/context/frontEnd/index放行 public class SecurityConfig extends WebSecurityConfigurerAdapter { "/frontEnd/**",SysFrontEndController import lombok.extern.slf4j.Slf4j; import nl.basjes.shaded.org.springfram…...
NeuralNLP-NeuralClassifier的使用记录(二),训练预测自己的【中文文本多分类】
NeuralNLP-NeuralClassifier的使用记录,训练预测自己的【中文文本多分类】 数据准备: 与英文的训练预测一致,都使用相同的数据格式,将数据通过代码处理为JSON格式,以下是我使用的一种,不同的原数据情况…...
express学习笔记8 - 文件上传 下载以及预览
一、上传 1、 安装multer (任意选其中一种) yarn add multer --S npm install multer --S 2、新建配置文件(utils/multerConfig) const multer require(multer); const mkdirp require(mkdirp); // const sd require(silly-datetime); const path require(path);con…...
Python系统学习1-9-类(一)
一、类之初印象 1、类就是空表格,将变量(列名)和函数(行为)结合起来 2、创建对象,表达具体行 3、创建类就是创建数据的模板 --操作数据时有提示 --还能再组合数据的行为 --结构更加清晰 4、类的内存分配…...
什么是公网、私网、内网、外网?
中午好,我的网工朋友。 最近经常有很多小白朋友在问,公网、私网、内网、外网,这些的概念是啥样的,又该怎么去界定。 关于IP地址,确实没有太明确的区分,其实也不必太过咬文嚼字。 内网、外网就是一个参考…...
一篇文章教会你搭建私人kindle图书馆,并内网穿透实现公网访问
搭建私人kindle图书馆,并内网穿透实现公网访问 在电子书风靡的时期,大部分人都购买了一本电子书,虽然这本电子书更多的时候是被搁置在储物架上吃灰,或者成为盖泡面的神器,但当亚马逊发布消息将放弃电子书在中国的服务…...
好用的安卓手机投屏到mac分享
工具推荐:scrcpy github地址:https://github.com/Genymobile/scrcpy/tree/master mac使用方式 安装环境,打开terminal,执行以下命令,没有brew的先安装brew brew install scrcpy brew install android-platform-too…...
df -h
df -h 命令用于查看磁盘占用的空间 Filesystem:表示该文件系统位于哪个分区,因此该列显示的是设备名称; Used:表示用掉的磁盘空间大小; Available:表示剩余的磁盘空间大小; Use%:磁盘…...
彻底卸载Android Studio
永恒的爱是永远恪守最初的诺言。 在安装Android Studio会有很多问题导致无法正常运行,多次下载AS多次错误后了解到,删除以下四个文件才能彻底卸载Android Studio。 第一个文件:.gradle 路径:C:\Users\yao(这里yao是本…...
QT 5.12配置OpenCV3.4.10
主要过程:使用cmake编译源码,生成Mingw64位 下的OpenCV库 三篇博客解决问题: 1.Windows下安装Qt并使用cmake配置opencv3.4.10(含错误记录及解决办法)_d:\qt\qt5.14.2\5.14.2\mingw73_64\include\qtcore\qg_会飞的DA象的博客-CSDN博客 2.【…...
Qt应用开发(基础篇)——选项卡窗口 QTabWidget
一、前言 QTabWidget类继承于QWidget,是一个拥有选项卡的窗口部件。 QTabWidget类有一个选项卡栏QTabBar和一个页面区域,用来显示和选项卡相关联的界面。用户通过点击选项卡或者自定义快捷方式(ALTKey)切换页面。 二、QTabWidget类 1、count 该属…...
Socks5代理在多线程爬虫中的应用
在进行爬虫开发过程中,我们常常需要处理大量的数据,并执行多任务并发操作。然而,频繁的请求可能会引起目标网站的反爬机制,导致IP封禁或限制访问。为了规避这些限制,我们可以借助Socks5代理的强大功能,通过…...
相机Camera日志实例分析之二:相机Camx【专业模式开启直方图拍照】单帧流程日志详解
【关注我,后续持续新增专题博文,谢谢!!!】 上一篇我们讲了: 这一篇我们开始讲: 目录 一、场景操作步骤 二、日志基础关键字分级如下 三、场景日志如下: 一、场景操作步骤 操作步…...
Python实现prophet 理论及参数优化
文章目录 Prophet理论及模型参数介绍Python代码完整实现prophet 添加外部数据进行模型优化 之前初步学习prophet的时候,写过一篇简单实现,后期随着对该模型的深入研究,本次记录涉及到prophet 的公式以及参数调优,从公式可以更直观…...
Frozen-Flask :将 Flask 应用“冻结”为静态文件
Frozen-Flask 是一个用于将 Flask 应用“冻结”为静态文件的 Python 扩展。它的核心用途是:将一个 Flask Web 应用生成成纯静态 HTML 文件,从而可以部署到静态网站托管服务上,如 GitHub Pages、Netlify 或任何支持静态文件的网站服务器。 &am…...
【C++从零实现Json-Rpc框架】第六弹 —— 服务端模块划分
一、项目背景回顾 前五弹完成了Json-Rpc协议解析、请求处理、客户端调用等基础模块搭建。 本弹重点聚焦于服务端的模块划分与架构设计,提升代码结构的可维护性与扩展性。 二、服务端模块设计目标 高内聚低耦合:各模块职责清晰,便于独立开发…...
JVM 内存结构 详解
内存结构 运行时数据区: Java虚拟机在运行Java程序过程中管理的内存区域。 程序计数器: 线程私有,程序控制流的指示器,分支、循环、跳转、异常处理、线程恢复等基础功能都依赖这个计数器完成。 每个线程都有一个程序计数…...
【Linux】Linux 系统默认的目录及作用说明
博主介绍:✌全网粉丝23W,CSDN博客专家、Java领域优质创作者,掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域✌ 技术范围:SpringBoot、SpringCloud、Vue、SSM、HTML、Nodejs、Python、MySQL、PostgreSQL、大数据、物…...
CSS | transition 和 transform的用处和区别
省流总结: transform用于变换/变形,transition是动画控制器 transform 用来对元素进行变形,常见的操作如下,它是立即生效的样式变形属性。 旋转 rotate(角度deg)、平移 translateX(像素px)、缩放 scale(倍数)、倾斜 skewX(角度…...
FFmpeg:Windows系统小白安装及其使用
一、安装 1.访问官网 Download FFmpeg 2.点击版本目录 3.选择版本点击安装 注意这里选择的是【release buids】,注意左上角标题 例如我安装在目录 F:\FFmpeg 4.解压 5.添加环境变量 把你解压后的bin目录(即exe所在文件夹)加入系统变量…...
【 java 虚拟机知识 第一篇 】
目录 1.内存模型 1.1.JVM内存模型的介绍 1.2.堆和栈的区别 1.3.栈的存储细节 1.4.堆的部分 1.5.程序计数器的作用 1.6.方法区的内容 1.7.字符串池 1.8.引用类型 1.9.内存泄漏与内存溢出 1.10.会出现内存溢出的结构 1.内存模型 1.1.JVM内存模型的介绍 内存模型主要分…...
用递归算法解锁「子集」问题 —— LeetCode 78题解析
文章目录 一、题目介绍二、递归思路详解:从决策树开始理解三、解法一:二叉决策树 DFS四、解法二:组合式回溯写法(推荐)五、解法对比 递归算法是编程中一种非常强大且常见的思想,它能够优雅地解决很多复杂的…...
