当前位置: 首页 > news >正文

Flink多流处理之Broadcast(广播变量)

写过Spark批处理的应该都知道,有一个广播变量broadcast这样的一个算子,可以优化我们计算的过程,有效的提高效率;同样在Flink中也有broadcast,简单来说和Spark中的类似,但是有所区别,首先Spark中的broadcast是静态的数据,而Flink中的broadcast是动态的,也就是源源不断的数据流.在Flink中会将广播的数据存到state中.
在这里插入图片描述
在Flink中主流数据可以获取state中的所有状态数据,使用过window的应该都清楚,当两个streamData中的数据到达窗口的时间刚好错过时就会发生关联不上的情况,如window2S,sreamData1到达窗口的时间刚好卡在这个2S窗口的尾端,而streamData到达窗口时,这个窗口已经结束了,这种情况就算这两条数据有相同id也无法进行关联了.
但是broadcast会将到达的数据都存储在state中,这样主流到达的每一条数据都可以和state中的广播流数据进行关联比较.
在这里插入图片描述
流程图内容可能不够准确,只是为了看起来方便理解.

  • 数据源
    # 主流数据
    ➜  ~ nc -lk 1234
    101,浏览商品,2023-08-02
    102,浏览商品,2023-08-02
    103,查看商品价格,2023-08-04
    101,商品加入购物车,2023-08-03
    101,从购物车删除商品,2023-08-03
    102,下单,2023-08-02
    102,申请延期发货,2023-08-03
    103,点击商品详情页,2023-08-04
    104,点击收藏,2023-08-05
    104,下单,2023-08-05
    104,付款,2023-08-06
    105,浏览商品,2023-08-07
    106,浏览商品,2023-08-07
    106,加入购物车,2023-08-08
    107,浏览商品,2023-08-10
    
    # 广播流数据
    ➜  ~ nc -lk 5678
    101,小明
    102,张丽
    103,公孙飞天
    104,王二虎
    106,李四
    108,赵屋面
    
  • 代码
    import org.apache.flink.api.common.state.BroadcastState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.*;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    import org.apache.flink.util.Collector;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/11* @Description: 多流操作-广播流**/
    public class FlinkBroadcast {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度env.setParallelism(3);// 数据集源1作为主流数据(用户行为日志[id,behavior,date])DataStreamSource<String> sourceStream1 = env.socketTextStream("localhost", 1234);// 将字符串切割处理SingleOutputStreamOperator<Tuple3<String, String, String>> mainSourceStream = sourceStream1.map(str -> Tuple3.of(str.split(",")[0], str.split(",")[1], str.split(",")[2])).returns(new TypeHint<Tuple3<String, String, String>>() {});// 数据源2作为广播流数据(用户信息(id,name))DataStreamSource<String> sourceStream2 = env.socketTextStream("localhost", 5678);// 将字符串切割处理SingleOutputStreamOperator<Tuple2<String, String>> mapStream2 = sourceStream2.map(str -> Tuple2.of(str.split(",")[0], str.split(",")[1])).returns(new TypeHint<Tuple2<String, String>>() {});// 将广播流数据源进行广播/***参数说明* 这里需要我们传入一个MapStateDescriptor,其实就是一个Map结构的数据<k,v>* <String, Tuple2<String, String>>,第一个String类型就是广播流和主流连接的字段,在这个代码中就是id,由实际业务决定* <String, Tuple2<String, String>>,第二个Tuple2<String, String>就是实际广播数据流的数据,由实际业务决定* "userInfo"就是给一个名字,这个自定义无强制要求**/// 先构建一个状态,后面也会使用MapStateDescriptor<String, Tuple2<String, String>> userInfoState = new MapStateDescriptor<>("userInfo", TypeInformation.of(String.class), TypeInformation.of(new TypeHint<Tuple2<String, String>>() {}));BroadcastStream<Tuple2<String, String>> userInfoBroadStream = mapStream2.broadcast(userInfoState);// 将主流数据和广播流数据使用connect连接/*** 我们将数据转变成广播流之后,在Flink中也不知哪个数据流需要使用这个广播流(userInfoBroadStream),* 这个时候就需要我们自己将主流数据和该广播流数据进行连接**/BroadcastConnectedStream<Tuple3<String, String, String>, Tuple2<String, String>> connectedStream = mainSourceStream.connect(userInfoBroadStream);/*** 在process()中有两类函数供我们选择,KeyedBroadcastProcessFunction和BroadcastProcessFunction,* 这里要注意当"connectedStream"是KeyedStream时选择KeyedBroadcastProcessFunction* 当"connectedStream"不是KeyedStream时选择BroadcastProcessFunction就可以.* 使用keyBy算子返回的就是KeyedStream**/SingleOutputStreamOperator<String> resultStream = connectedStream.process(new BroadcastProcessFunction<Tuple3<String, String, String>, Tuple2<String, String>, String>() {// 这个方法写主流数据处理逻辑@Overridepublic void processElement(Tuple3<String, String, String> value, BroadcastProcessFunction<Tuple3<String, String, String>, Tuple2<String, String>, String>.ReadOnlyContext ctx, Collector<String> out) throws Exception {/*** 要注意,这里我们最好从ReadOnlyContext来获取广播状态数据,因为获取只读的状态数据可以保证数据的安全性,* 如果是通过成员变量的方式获取可修改的状态数据,就会存在数据不安全的问题,如在代码逻辑中出现了对状态数据* 修改的代码,那么共享此状态的并行算子可能看到的状态数据不一致,就会导致数据错误或者代码报错.* 而使用ReadOnlyContext就可以保证processElement这个方法中我们只对状态数据进行读取.**/ReadOnlyBroadcastState<String, Tuple2<String, String>> broadcastState = ctx.getBroadcastState(userInfoState);if (broadcastState != null) {// 通过主流中的ID作为key获取广播变量中的用户信息Tuple2<String, String> userInfo = broadcastState.get(value.f0);// 输出数据的形式(id,behavior,date,name)if (userInfo == null) {out.collect(value.f0 + "," + value.f1 + "," + value.f2 + "," + "NULL");} else {out.collect(value.f0 + "," + value.f1 + "," + value.f2 + "," + userInfo.f1);}} else {out.collect(value.f0 + "," + value.f1 + "," + value.f2 + "," + "NULL");}}// 这个方法写广播流数据处理逻辑@Overridepublic void processBroadcastElement(Tuple2<String, String> value, BroadcastProcessFunction<Tuple3<String, String, String>, Tuple2<String, String>, String>.Context ctx, Collector<String> out) throws Exception {// 使用Context获取状态BroadcastState<String, Tuple2<String, String>> broadcastState = ctx.getBroadcastState(userInfoState);// 将数据存入到状态中broadcastState.put(value.f0, value);}});// 打印结果resultStream.print();env.execute("Flink broadcast");}
    }
    
  • 结果
    3> 101,浏览商品,2023-08-02,小明
    3> 101,商品加入购物车,2023-08-03,小明
    3> 102,申请延期发货,2023-08-03,张丽
    3> 104,下单,2023-08-05,王二虎
    3> 106,浏览商品,2023-08-07,李四
    1> 102,浏览商品,2023-08-02,张丽
    1> 101,从购物车删除商品,2023-08-03,小明
    1> 103,点击商品详情页,2023-08-04,公孙飞天
    1> 104,付款,2023-08-06,王二虎
    1> 106,加入购物车,2023-08-08,李四
    2> 103,查看商品价格,2023-08-04,公孙飞天
    2> 102,下单,2023-08-02,张丽
    2> 104,点击收藏,2023-08-05,王二虎
    2> 105,浏览商品,2023-08-07,NULL
    2> 107,浏览商品,2023-08-10,NULL
    
    代码内容就不进行详细解释了,注释基本都写清楚了,如有疑问可评论提问,共同探讨.

相关文章:

Flink多流处理之Broadcast(广播变量)

写过Spark批处理的应该都知道,有一个广播变量broadcast这样的一个算子,可以优化我们计算的过程,有效的提高效率;同样在Flink中也有broadcast,简单来说和Spark中的类似,但是有所区别,首先Spark中的broadcast是静态的数据,而Flink中的broadcast是动态的,也就是源源不断的数据流.在…...

LVS/DR+Keepalived负载均衡实战(一)

引言 负载均衡这个概念对于一个IT老鸟来说再也熟悉不过了&#xff0c;当听到此概念的第一反应是想到举世闻名的nginx&#xff0c;但殊不知还有一个大名鼎鼎的负载均衡方案可能被忽略了&#xff0c;因为对于一般系统来说&#xff0c;很多应用场合中采用nginx基本已经满足需求&a…...

测试DWPose的onnx +Unity barracuda

环境&#xff1a; Unity Barracuda 3.0.1 从github直接拉取的barracuda仓库才能装到这个版本Barracuda以后不再升级了&#xff0c;会迁移到Unity AI大计划里的Sentis 我想申请的来着但好像已经不开放了 Unity 2021.3.20模型&#xff1a;dw-ll_ucoco_384.onnx 报了一些错&…...

轻装上阵,不调用jar包,用C#写SM4加密算法【卸载IKVM 】

前言 记得之前写了一个文章&#xff0c;是关于java和c#加密不一致导致需要使用ikvm的方式来进行数据加密&#xff0c;主要是ikvm把打包后的jar包打成dll包&#xff0c;然后Nuget引入ikvm&#xff0c;从而实现算法的统一&#xff0c;这几天闲来无事&#xff0c;网上找了一下加密…...

redis学习笔记(一)

文章目录 一、引言二、redis介绍2.1、定义2.2、Redis的数据类型及主要特性2.3、Redis的应用场景有哪些&#xff1f; 三、redis环境安装3.1、下载和安装 一、引言 在Web应用发展的初期&#xff0c;那时关系型数据库受到了较为广泛的关注和应用&#xff0c;原因是因为那时候Web站…...

最优化问题 - 拉格朗日对偶

primal 原问题 dual 对偶问题 目标函数 约束条件 可行域D 对偶专题 “拉格朗日对偶问题”如何直观理解&#xff1f;“KKT条件” “Slater条件” “凸优化”打包理解——bilibili 王木头 拉格朗日乘子法与对偶问题...

关于ISO27701隐私信息安全管理体系介绍

01 什么是ISO27701 ISO27701是对ISO27001信息安全管理和ISO27002安全控制的隐私扩展&#xff0c;全称《安全技术—扩展ISO27001和ISO27002的隐私信息管理—要求与指南》&#xff0c;是ISO标准委员会以ISO 27001为基准&#xff0c;以ISO27552为蓝本&#xff0c;建立发布的隐私…...

C语言案例 分数列求和-11

题目&#xff1a;有一分数列&#xff1a;2 / 1,3 / 2,5 / 3,8 / 5,13 / 8,21 / 13 …求出这个数列的前20项之和。 程序分析 这是一个典型的分数列数学逻辑题&#xff0c;考究这类题目是需要从已知的条件中找到它们的分布规律 我们把前6荐的分子与分母分别排列出来&#xff0c;…...

Git 入门

一、版本控制 1.1 什么是版本控制 版本控制&#xff08;Revision control&#xff09;是一种在开发的过程中用于管理我们对文件、目录或工程等内容的修改历史&#xff0c;方便查看更改历史记录&#xff0c;备份以便恢复以前的版本的软件工程技术。简单说就是用于管理多人协同开…...

PAT 1010 Radix

个人学习记录&#xff0c;代码难免不尽人意 Given a pair of positive integers, for example, 6 and 110, can this equation 6 110 be true? The answer is yes, if 6 is a decimal number and 110 is a binary number. Now for any pair of positive integers N 1and N 2…...

ruoyi-cloud微服务新建子模块

目录 相关文章1、复制system模块2、在modules下的 pom.xml文件中添加子模块 test3、进入 test模块修改 pom.xml4、修改对应的包名、目录名和启动应用程序为test5、修改bootstrap.yml文件中的端口号和应用名称6、nacos中克隆 system-dev.yml的配置&#xff0c;修改名称为 test-d…...

Dijkstra(求最短路)

时间复杂是 O(n2m) &#xff0c;n 表示点数&#xff0c;m 表示边数 模板(朴素法一般m等于n^2的时候使用) #include<bits/stdc.h> #include<algorithm> using namespace std; const int N510; int g[N][N]; //为稠密阵所以用邻接矩阵存储 int dist[N]; //用…...

React 脚手架

1.React 定义 React 脚手架&#xff08;React boilerplate&#xff09;是一种预先设置好的、可以快速启动 React 项目的工具。脚手架已经包含了 React、Webpack、Babel、ESLint、Jest 等一些常用的工具和库&#xff0c;并已经配置好了这些工具的参数&#xff0c;可以直接使用和…...

CTFSHOW php命令执行

目录 web29 过滤flag web30 过滤system php web31 过滤 cat|sort|shell|\. 这里有一个新姿势 可以学习一下 web32 过滤 &#xff1b; . web33 web34 web35 web36 web37 data伪协议 web38 短开表达式 web39 web40 __FILE__命令的扩展 web41 web42 重定向…...

侧滑置顶,取消置顶

第一步:布局 <?xml version"1.0" encoding"utf-8"?> <com.ddmh.magic.camera.ui.widget.SwipeMenuLayout xmlns:android"http://schemas.android.com/apk/res/android"xmlns:app"http://schemas.android.com/apk/res-auto"…...

Pycharm解决启动时候索引慢的问题

设置里去掉update里面的两个勾 shared indexes中&#xff0c;把自动下载索引改成不下载使用本地索引...

Http请求响应时间一般划分标准

HTTP请求的响应时间被认为是长或短通常取决于具体应用场景和性能需求。一般来说&#xff0c;以下是一些常见的对HTTP请求响应时间进行划分的标准&#xff1a; 即时响应&#xff1a;通常在毫秒级别的响应时间被认为是即时响应。这适用于对实时性要求较高的应用&#xff0c;如实时…...

生成测试报告,在Unittest框架中就是简单

测试套件&#xff08;Test Suite&#xff09;是测试用例、测试套件或两者的集合&#xff0c;用于组装一组要运行的测试&#xff08;多个测试用例集合在一起&#xff09;。 &#xff08;1&#xff09;创建一个测试套件&#xff1a; import unittest suite unittest.TestSuite…...

生成式人工智能的潜在有害影响与未来之路(一)

这是本文的第1版&#xff0c;反映了截至2023年5月15日&#xff0c;Generative AI的已记载的和预期的危害。由于Generative AI的发展、使用和危害的快速变化&#xff0c;我们承认这是一篇内在的动态论文&#xff0c;未来会发生变化。 在本文中&#xff0c;我们使用一种标准格式…...

lightdb23.3 表名与包名不能重复

LightDB 表名与包名不能重复 从 LightDB 23.3 版本开始表名和包名不能重复&#xff0c;与 oracle 一致。原先已已支持包名和schema名不能重复。 背景 在之前版本在同一schema 下可以创建相同名字的表和包。这会导致在存储过程中使用%type指定变量类型时&#xff0c;如果存在…...

【杂谈】-递归进化:人工智能的自我改进与监管挑战

递归进化&#xff1a;人工智能的自我改进与监管挑战 文章目录 递归进化&#xff1a;人工智能的自我改进与监管挑战1、自我改进型人工智能的崛起2、人工智能如何挑战人类监管&#xff1f;3、确保人工智能受控的策略4、人类在人工智能发展中的角色5、平衡自主性与控制力6、总结与…...

golang循环变量捕获问题​​

在 Go 语言中&#xff0c;当在循环中启动协程&#xff08;goroutine&#xff09;时&#xff0c;如果在协程闭包中直接引用循环变量&#xff0c;可能会遇到一个常见的陷阱 - ​​循环变量捕获问题​​。让我详细解释一下&#xff1a; 问题背景 看这个代码片段&#xff1a; fo…...

Spring Boot 实现流式响应(兼容 2.7.x)

在实际开发中&#xff0c;我们可能会遇到一些流式数据处理的场景&#xff0c;比如接收来自上游接口的 Server-Sent Events&#xff08;SSE&#xff09; 或 流式 JSON 内容&#xff0c;并将其原样中转给前端页面或客户端。这种情况下&#xff0c;传统的 RestTemplate 缓存机制会…...

.Net框架,除了EF还有很多很多......

文章目录 1. 引言2. Dapper2.1 概述与设计原理2.2 核心功能与代码示例基本查询多映射查询存储过程调用 2.3 性能优化原理2.4 适用场景 3. NHibernate3.1 概述与架构设计3.2 映射配置示例Fluent映射XML映射 3.3 查询示例HQL查询Criteria APILINQ提供程序 3.4 高级特性3.5 适用场…...

c++ 面试题(1)-----深度优先搜索(DFS)实现

操作系统&#xff1a;ubuntu22.04 IDE:Visual Studio Code 编程语言&#xff1a;C11 题目描述 地上有一个 m 行 n 列的方格&#xff0c;从坐标 [0,0] 起始。一个机器人可以从某一格移动到上下左右四个格子&#xff0c;但不能进入行坐标和列坐标的数位之和大于 k 的格子。 例…...

最新SpringBoot+SpringCloud+Nacos微服务框架分享

文章目录 前言一、服务规划二、架构核心1.cloud的pom2.gateway的异常handler3.gateway的filter4、admin的pom5、admin的登录核心 三、code-helper分享总结 前言 最近有个活蛮赶的&#xff0c;根据Excel列的需求预估的工时直接打骨折&#xff0c;不要问我为什么&#xff0c;主要…...

江苏艾立泰跨国资源接力:废料变黄金的绿色供应链革命

在华东塑料包装行业面临限塑令深度调整的背景下&#xff0c;江苏艾立泰以一场跨国资源接力的创新实践&#xff0c;重新定义了绿色供应链的边界。 跨国回收网络&#xff1a;废料变黄金的全球棋局 艾立泰在欧洲、东南亚建立再生塑料回收点&#xff0c;将海外废弃包装箱通过标准…...

MySQL 8.0 OCP 英文题库解析(十三)

Oracle 为庆祝 MySQL 30 周年&#xff0c;截止到 2025.07.31 之前。所有人均可以免费考取原价245美元的MySQL OCP 认证。 从今天开始&#xff0c;将英文题库免费公布出来&#xff0c;并进行解析&#xff0c;帮助大家在一个月之内轻松通过OCP认证。 本期公布试题111~120 试题1…...

ArcGIS Pro制作水平横向图例+多级标注

今天介绍下载ArcGIS Pro中如何设置水平横向图例。 之前我们介绍了ArcGIS的横向图例制作&#xff1a;ArcGIS横向、多列图例、顺序重排、符号居中、批量更改图例符号等等&#xff08;ArcGIS出图图例8大技巧&#xff09;&#xff0c;那这次我们看看ArcGIS Pro如何更加快捷的操作。…...

Unsafe Fileupload篇补充-木马的详细教程与木马分享(中国蚁剑方式)

在之前的皮卡丘靶场第九期Unsafe Fileupload篇中我们学习了木马的原理并且学了一个简单的木马文件 本期内容是为了更好的为大家解释木马&#xff08;服务器方面的&#xff09;的原理&#xff0c;连接&#xff0c;以及各种木马及连接工具的分享 文件木马&#xff1a;https://w…...