当前位置: 首页 > news >正文

Flink多流处理之coGroup(协同分组)

这篇文章主要介绍协同分组coGroup的使用,先讲解API代码模板,后面会结图解介绍coGroup是如何将流中数据进行分组的.

1 API介绍

  • 数据源
    # 左流数据
    ➜  ~ nc -lk 6666
    101,Tom
    102,小明
    103,小黑
    104,张强
    105,Ken
    106,GG小日子
    107,小花
    108,赵宣艺
    109,明亮
    # 右流数据
    ➜  ~ nc -lk 7777
    101,,本科,程序员
    102,,本科,程序员
    103,,本科,会计
    104,,大专,安全工程师
    105,,硕士,律师
    106,未知,小本,挖粪使者
    108,,本科,人事
    110,,本科,算法工程师
  • 代码
    import org.apache.flink.api.common.functions.CoGroupFunction;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple4;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/10* @Description: 协同分组**/
    public class FlinkCoGroup {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度env.setParallelism(2);// 数据源1(socket数据源),为了方便测试,根据实际情况自行选择DataStreamSource<String> sourceStream1 = env.socketTextStream("localhost", 6666);// 将数据进行切分返回Tuple2(id,name)SingleOutputStreamOperator<Tuple2<String, String>> mapStream1 = sourceStream1.map(value -> {String[] split = value.split(",");return Tuple2.of(split[0], split[1]);}).returns(new TypeHint<Tuple2<String, String>>() {});// 数据源2(socket数据源),为了方便测试,根据实际情况自行选择DataStreamSource<String> sourceStream2 = env.socketTextStream("localhost", 7777);// 将数据进行切分返回Tuple4(id,gender,education,job)SingleOutputStreamOperator<Tuple4<String, String, String, String>> mapStream2 = sourceStream2.map(value -> {String[] split = value.split(",");return Tuple4.of(split[0], split[1], split[2], split[3]);}).returns(new TypeHint<Tuple4<String, String, String, String>>() {});// 数据流协同DataStream<Tuple4<String, String, String, String>> coGrouped = mapStream1.coGroup(mapStream2).where(tup -> tup.f0) // 左流协同分组字段(mapStream1).equalTo(tup -> tup.f0) // 右流协同分组字段(mapStream2).window(TumblingProcessingTimeWindows.of(Time.seconds(20))) // 开窗口,以处理时间划分(每20秒一个窗口).apply(new CoGroupFunction<Tuple2<String, String>, Tuple4<String, String, String, String>, Tuple4<String, String, String, String>>() {@Overridepublic void coGroup(Iterable<Tuple2<String, String>> first, Iterable<Tuple4<String, String, String, String>> second, Collector<Tuple4<String, String, String, String>> out) throws Exception {/***first 代表左流的迭代器* second 代表右流的迭代器* out 则是返回的数据形式* 具体方法中两个迭代器存数据的原理后续会通过图结合进行解析**/// 这里的逻辑模拟sql中left join// 遍历左流数据(first)for (Tuple2<String, String> left : first) {// 定义右流是否为NULL判断标识boolean flag = false;// 遍历右流数据(second)for (Tuple4<String, String, String, String> right : second) {// 返回left(id, name) + right(gender, education)Tuple4<String, String, String, String> tup4 = Tuple4.of(left.f0, left.f1, right.f1, right.f2);// 输出out.collect(tup4);// 修改判断标识flag = true;}// 如果右流为NULL,则输出左流的数据if (!flag) {// 这里用字符串"NULL"代替null值,方便观察Tuple4<String, String, String, String> tup4 = Tuple4.of(left.f0, left.f1, "NULL", "NULL");// 输出out.collect(tup4);}}}});// 打印结果coGrouped.print();env.execute("Flink CoGroup");}
    }
    
  • 结果
    2> (102,小明,男,本科)
    1> (106,GG小日子,未知,小本)
    2> (109,明亮,NULL,NULL)
    1> (107,小花,NULL,NULL)
    2> (105,Ken,男,硕士)
    2> (103,小黑,女,本科)
    2> (101,Tom,男,本科)
    2> (108,赵宣艺,女,本科)
    2> (104,张强,男,大专)
    
    从数据源和结果数据可以看到和代码逻辑是完全吻合的.

2 原理解析

我这我们先看一下图解,如下

在这里插入图片描述

  • 无界转有界
    在代码中我们开启window,这也是使用coGroup的必要条件,开启window后实际上就是将我们原本的无界数据流转变成一个以20S为界限的有界数据流.
  • 迭代器分组
    将数据进入到窗口内后,就会根据经我们前面设定的条件也就是.where.equalTo中的内容将mapStream1mapStream2中的数据根据key进行分组存储到不同的iterator中.
  • 逻辑计算
    上面已经将数据根据key都存储到iterator中了,这里就会根据我们在new CoGroupFunction<...>(){...}中的写的逻辑将mapStream1mapStream2中具有相同keyiterator进行计算.
  • 输出
    当一个window结束后,就会将数据按照计算后的结果(在代码中就是Tuple4<String, String, String, String>)输出到下游.

相关文章:

Flink多流处理之coGroup(协同分组)

这篇文章主要介绍协同分组coGroup的使用,先讲解API代码模板,后面会结图解介绍coGroup是如何将流中数据进行分组的. 1 API介绍 数据源# 左流数据 ➜ ~ nc -lk 6666 101,Tom 102,小明 103,小黑 104,张强 105,Ken 106,GG小日子 107,小花 108,赵宣艺 109,明亮# 右流数据 ➜ ~ n…...

基于TICK的DevOps监控实战(Ubuntu20.04系统,Telegraf+InfluDB+Chronograf+Kapacitor)

1、TICK简介 TICK是InfluxData开发的开源高性能时序中台&#xff0c;集成了采集、存储、分析、可视化等能力&#xff0c;由Telegraf, InfluDB, Chronograf, Kapacitor等4个组件以一种灵活松散、但又紧密配合&#xff0c;互为补充的方式构成。TICK专注于DevOps监控、IoT监控、实…...

十九、docker学习-Dockerfile

Dockerfile 官网地址 https://docs.docker.com/engine/reference/builder/Dockerfile其实就是我们用来构建Docker镜像的源码&#xff0c;当然这不是所谓的编程源码&#xff0c;而是一些命令的集合&#xff0c;只要理解它的逻辑和语法格式&#xff0c;就可以很容易的编写Docke…...

Docker容器的数据卷

1.数据卷的概念及作用 2.数据卷的配置 创建容器并挂载数据卷&#xff1a; docker run -it --namec1 -v /root/data:/root/data_container centos:7 /bin/bash按照容器挂载数据卷的原理&#xff0c;data_contianer这个目录下也会同步下来数据的更改。 3.一个容器挂载多个数据…...

推荐工具!使终端便于 DevOps 和 Kubernetes 使用

如果你熟悉 DevOps 和 Kubernetes 的使用&#xff0c;就会知道命令行界面&#xff08;CLI&#xff09;对于管理任务有多么重要。好在现在市面上有一些工具可以让终端在这些环境中更容易使用。在本文中&#xff0c;我们将探讨可以让工作流程简化的优秀工具&#xff0c;帮助你在 …...

抖音小程序实现less语言编译样式

1.在抖音开发工具中搜索扩展less 2. 然后点击小齿轮选择扩展设置 3. 然后在扩展设置中选择在settings.json中编辑# 4. 在settings.json中加入以下这段代码即可 // Easy LESS配置"less.compile": {"compress": false,//是否压缩"sourceMap": fal…...

介绍 TensorFlow 的基本概念和使用场景

TensorFlow 是一种开源的机器学习框架&#xff0c;由 Google 开发。它是用来构建和训练机器学习模型的强大工具&#xff0c;支持很多种不同类型的机器学习算法&#xff0c;并使用数据流图来表示计算过程。 TensorFlow 的核心是张量 (Tensor) 和计算图 (Graph)。 张量 (Tensor)…...

抖音关键词搜索小程序排名怎么做

抖音关键词搜索小程序排名怎么做 1 分钟教你制作一个抖音小程序。 抖音小程序就是我的视频&#xff0c;左下方这个蓝色的链接&#xff0c;点进去就是抖音小程序。 如果你有了这个小程序&#xff0c;发布视频的时候可以挂载这个小程序&#xff0c;直播的时候也可以挂载这个小…...

Windows下升级jdk1.8小版本

1.首先下载要升级jdk最新版本&#xff0c;下载地址&#xff1a;Java Downloads | Oracle 中国 2.下载完毕之后&#xff0c;直接双击下载完毕后的文件&#xff0c;进行安装。 3.安装完毕后&#xff0c;调整环境变量至新安装的jdk位置 4.此时&#xff0c;idea启动项目有可能会出…...

[保研/考研机试] KY235 进制转换2 清华大学复试上机题 C++实现

题目链接&#xff1a; KY235 进制转换2 https://www.nowcoder.com/questionTerminal/ae4b3c4a968745618d65b866002bbd32 描述 将M进制的数X转换为N进制的数输出。 输入描述&#xff1a; 输入的第一行包括两个整数&#xff1a;M和N(2<M,N<36)。 下面的一行输入一个数…...

机器学习 | Python实现KNN(K近邻)模型实践

机器学习 | Python实现KNN(K近邻)模型实践 目录 机器学习 | Python实现KNN(K近邻)模型实践基本介绍模型原理源码设计学习小结参考资料基本介绍 一句话就可以概括出KNN(K最近邻算法)的算法原理:综合k个“邻居”的标签值作为新样本的预测值。更具体来讲KNN分类过程,给定一个训…...

Mybatis 源码 ③ :SqlSession

一、前言 Mybatis 官网 以及 本系列文章地址&#xff1a; Mybatis 源码 ① &#xff1a;开篇Mybatis 源码 ② &#xff1a;流程分析Mybatis 源码 ③ &#xff1a;SqlSessionMybatis 源码 ④ &#xff1a;TypeHandlerMybatis 源码 ∞ &#xff1a;杂七杂八 在 Mybatis 源码 ②…...

Python 潮流周刊#15:如何分析异步任务的性能?

△点击上方“Python猫”关注 &#xff0c;回复“1”领取电子书 你好&#xff0c;我是猫哥。这里每周分享优质的 Python、AI 及通用技术内容&#xff0c;大部分为英文。标题取自其中一则分享&#xff0c;不代表全部内容都是该主题&#xff0c;特此声明。 本周刊精心筛选国内外的…...

二叉搜索树K和KV结构模拟

一 什么是二叉搜索树 这个的结构特性非常重要&#xff0c;是后面函数实现的结构基础&#xff0c;二叉搜索树的特性是每个根节点都比自己的左树任一节点大&#xff0c;比自己的右树任一节点小。 例如这个图&#xff0c; 41是根节点&#xff0c;要比左树大&#xff0c;比右树小&…...

nlohmann json:检查object是否存在某个键

1.通过find进行检查 #include <iostream> #include <nlohmann/json.hpp> using namespace std; using json = nlohmann::json;int main() {json data = R"({"name": "xiaoming","age": 10, "parent": [{"fat…...

15-1_Qt 5.9 C++开发指南_Qt多媒体模块概述

多媒体功能指的主要是计算机的音频和视频的输入、输出、显示和播放等功能&#xff0c;Qt 的多媒体模块为音频和视频播放、录音、摄像头拍照和录像等提供支持&#xff0c;甚至还提供数字收音机的支持。本章将介绍 Qt 多媒体模块的功能和使用。 文章目录 1. Qt 多媒体模块概述2. …...

分页查询中起始位置的计算

在分页查询中&#xff0c;page 和 pageSize 其实就是表示页数和每页的条数。这两个参数通常用于在数据库查询时进行分页。 如果你想根据 page 和 pageSize 计算数据的起始位置&#xff08;例如&#xff0c;MySQL数据库的LIMIT查询&#xff09;&#xff0c;可以使用以下公式&am…...

Failed to execute goal org.apache.maven.plugins

原因&#xff1a; 这个文件D:\java\maven\com\ruoyi\pg-student\maven-metadata-local.xml出了问题 解决&#xff1a; 最简单的直接删除D:\java\maven\com\ruoyi\pg-student\maven-metadata-local.xml重新打包 或者把D:\java\maven\com\ruoyi\pg-student这个目录下所有文件…...

50吨收费站生活一体化污水处理设备厂家价格低

50吨收费站生活一体化污水处理设备厂家价格低 设备工艺说明 污水处理设备主要用于生活污水和与之类似的工业有机废水的处理&#xff0c;其主要处理方法是采用目前较为成熟的生化处理技术—生物接触氧化&#xff0c;水质设计按一般生活污水水质设计计算&#xff0c;按BOD5平均20…...

UG NX二次开发(C#)-CAM-获取刀具类型

文章目录 1、前言2、UG NX中的刀具类型3、获取刀具类型3.1 刀具类型帮助文档1、前言 在UG NX的加工模块,加工刀具是一个必要的因素,其包括了多种类型的类型,有铣刀、钻刀、车刀、磨刀、成型刀等等,而且每种刀具所包含的信息也各不相同。想获取刀具的信息,那就要知道刀具的…...

使用 Ace Data Cloud 的 Kling 视频生成 API 创建惊艳视频

简介 在数字内容创作日益普及的今天&#xff0c;视频已经成为了重要的传播媒介。Ace Data Cloud 提供的 Kling API 是一款强大的工具&#xff0c;可以帮助开发者和创作者轻松生成高质量的视频内容。无论是制作短片、广告视频&#xff0c;还是其他各种视觉作品&#xff0c;Klin…...

蓝牙HC-05调试避坑指南:从AT指令到手机控制LED的完整流程

HC-05蓝牙模块实战指南&#xff1a;从AT指令解析到手机控制LED全流程 当你第一次拿到HC-05蓝牙模块时&#xff0c;是否被那些神秘的AT指令和复杂的配置过程困扰&#xff1f;本文将带你深入HC-05的核心功能&#xff0c;避开那些新手常踩的"坑"&#xff0c;实现从基础配…...

SITS2026独家解密:LLM边缘部署的7层压缩栈(含实测吞吐提升217%的INT4量化方案)

第一章&#xff1a;SITS2026独家解密&#xff1a;LLM边缘部署的7层压缩栈&#xff08;含实测吞吐提升217%的INT4量化方案&#xff09; 2026奇点智能技术大会(https://ml-summit.org) SITS2026首次公开完整披露面向端侧LLM推理的七层协同压缩架构&#xff0c;该栈在树莓派5RP2…...

保姆级教程:在Ubuntu 22.04上编译运行Vector XCPlite-5.3,并连接CANoe进行标定

从零构建汽车电子标定系统&#xff1a;Ubuntu 22.04下XCPlite-5.3与CANoe实战指南 当ECU开发进入功能验证阶段&#xff0c;标定工程师常面临这样的困境&#xff1a;如何在Linux环境中快速搭建符合ASAM XCP标准的标定系统&#xff1f;Vector开源的XCPlite-5.3解决方案恰好填补了…...

vLLM-v0.17.1实战体验:3步搭建大模型API服务,实测推理速度翻倍

vLLM-v0.17.1实战体验&#xff1a;3步搭建大模型API服务&#xff0c;实测推理速度翻倍 1. vLLM框架简介与核心优势 vLLM是一个专为大语言模型推理优化的高性能服务框架&#xff0c;由加州大学伯克利分校Sky Computing Lab开发并开源。最新发布的v0.17.1版本在推理速度、内存管…...

5分钟掌握英雄联盟LCU智能助手:数据驱动游戏水平提升的完整指南

5分钟掌握英雄联盟LCU智能助手&#xff1a;数据驱动游戏水平提升的完整指南 【免费下载链接】League-Toolkit An all-in-one toolkit for LeagueClient. Gathering power &#x1f680;. 项目地址: https://gitcode.com/gh_mirrors/le/League-Toolkit 你是否在英雄联盟中…...

【计算机网络】思科实验:OSPF多区域配置与链路状态数据库解析

1. OSPF多区域配置实战指南 第一次接触OSPF多区域配置时&#xff0c;我被那些LSA类型和区域边界搞得头晕眼花。直到在真实项目里把整个网络搞瘫痪过一次&#xff0c;才真正理解多区域设计的精妙之处。这次我们就用Packet Tracer搭建一个包含Area 0、Area 1和Area 2的完整实验环…...

AI原生研发运维自动化成熟度评估矩阵(CMMI-AIOps 2.1版):含19项量化指标、自测工具包与TOP3瓶颈突破路线图

第一章&#xff1a;AI原生研发运维自动化成熟度评估矩阵&#xff08;CMMI-AIOps 2.1版&#xff09;概览 2026奇点智能技术大会(https://ml-summit.org) CMMI-AIOps 2.1版是面向AI原生系统全生命周期的评估框架&#xff0c;聚焦模型开发、训练调度、推理服务、可观测性治理与自…...

hive strict 严格模式

Hive的严格模式&#xff08;Strict Mode&#xff09;是一道经典的面试题。它的核心是一个安全防护机制&#xff0c;通过限制执行高风险的查询&#xff0c;来防止单个“烂SQL”拖垮整个集群。 &#x1f512; 严格模式禁止的三大类查询 在 hive.mapred.modestrict 模式下&#…...

STM32上跑矩阵运算老是卡死?可能是你没避开CMSIS-DSP库的这些‘坑’

STM32上跑矩阵运算老是卡死&#xff1f;可能是你没避开CMSIS-DSP库的这些‘坑’ 当你第一次在STM32上尝试使用CMSIS-DSP库进行矩阵运算时&#xff0c;那种兴奋感很快就会被现实浇灭——程序莫名其妙地卡死、计算结果全错&#xff0c;或者性能远低于预期。这不是你的错&#xf…...