Flink多流处理之Broadcast(广播变量)
写过Spark批处理的应该都知道,有一个广播变量broadcast这样的一个算子,可以优化我们计算的过程,有效的提高效率;同样在Flink中也有broadcast,简单来说和Spark中的类似,但是有所区别,首先Spark中的broadcast是静态的数据,而Flink中的broadcast是动态的,也就是源源不断的数据流.在Flink中会将广播的数据存到state中.

在Flink中主流数据可以获取state中的所有状态数据,使用过window的应该都清楚,当两个streamData中的数据到达窗口的时间刚好错过时就会发生关联不上的情况,如window是2S,sreamData1到达窗口的时间刚好卡在这个2S窗口的尾端,而streamData到达窗口时,这个窗口已经结束了,这种情况就算这两条数据有相同id也无法进行关联了.
但是broadcast会将到达的数据都存储在state中,这样主流到达的每一条数据都可以和state中的广播流数据进行关联比较.

流程图内容可能不够准确,只是为了看起来方便理解.
- 数据源
# 主流数据 ➜ ~ nc -lk 1234 101,浏览商品,2023-08-02 102,浏览商品,2023-08-02 103,查看商品价格,2023-08-04 101,商品加入购物车,2023-08-03 101,从购物车删除商品,2023-08-03 102,下单,2023-08-02 102,申请延期发货,2023-08-03 103,点击商品详情页,2023-08-04 104,点击收藏,2023-08-05 104,下单,2023-08-05 104,付款,2023-08-06 105,浏览商品,2023-08-07 106,浏览商品,2023-08-07 106,加入购物车,2023-08-08 107,浏览商品,2023-08-10# 广播流数据 ➜ ~ nc -lk 5678 101,小明 102,张丽 103,公孙飞天 104,王二虎 106,李四 108,赵屋面 - 代码
import org.apache.flink.api.common.state.BroadcastState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ReadOnlyBroadcastState; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.*; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction; import org.apache.flink.util.Collector;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/11* @Description: 多流操作-广播流**/ public class FlinkBroadcast {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度env.setParallelism(3);// 数据集源1作为主流数据(用户行为日志[id,behavior,date])DataStreamSource<String> sourceStream1 = env.socketTextStream("localhost", 1234);// 将字符串切割处理SingleOutputStreamOperator<Tuple3<String, String, String>> mainSourceStream = sourceStream1.map(str -> Tuple3.of(str.split(",")[0], str.split(",")[1], str.split(",")[2])).returns(new TypeHint<Tuple3<String, String, String>>() {});// 数据源2作为广播流数据(用户信息(id,name))DataStreamSource<String> sourceStream2 = env.socketTextStream("localhost", 5678);// 将字符串切割处理SingleOutputStreamOperator<Tuple2<String, String>> mapStream2 = sourceStream2.map(str -> Tuple2.of(str.split(",")[0], str.split(",")[1])).returns(new TypeHint<Tuple2<String, String>>() {});// 将广播流数据源进行广播/***参数说明* 这里需要我们传入一个MapStateDescriptor,其实就是一个Map结构的数据<k,v>* <String, Tuple2<String, String>>,第一个String类型就是广播流和主流连接的字段,在这个代码中就是id,由实际业务决定* <String, Tuple2<String, String>>,第二个Tuple2<String, String>就是实际广播数据流的数据,由实际业务决定* "userInfo"就是给一个名字,这个自定义无强制要求**/// 先构建一个状态,后面也会使用MapStateDescriptor<String, Tuple2<String, String>> userInfoState = new MapStateDescriptor<>("userInfo", TypeInformation.of(String.class), TypeInformation.of(new TypeHint<Tuple2<String, String>>() {}));BroadcastStream<Tuple2<String, String>> userInfoBroadStream = mapStream2.broadcast(userInfoState);// 将主流数据和广播流数据使用connect连接/*** 我们将数据转变成广播流之后,在Flink中也不知哪个数据流需要使用这个广播流(userInfoBroadStream),* 这个时候就需要我们自己将主流数据和该广播流数据进行连接**/BroadcastConnectedStream<Tuple3<String, String, String>, Tuple2<String, String>> connectedStream = mainSourceStream.connect(userInfoBroadStream);/*** 在process()中有两类函数供我们选择,KeyedBroadcastProcessFunction和BroadcastProcessFunction,* 这里要注意当"connectedStream"是KeyedStream时选择KeyedBroadcastProcessFunction* 当"connectedStream"不是KeyedStream时选择BroadcastProcessFunction就可以.* 使用keyBy算子返回的就是KeyedStream**/SingleOutputStreamOperator<String> resultStream = connectedStream.process(new BroadcastProcessFunction<Tuple3<String, String, String>, Tuple2<String, String>, String>() {// 这个方法写主流数据处理逻辑@Overridepublic void processElement(Tuple3<String, String, String> value, BroadcastProcessFunction<Tuple3<String, String, String>, Tuple2<String, String>, String>.ReadOnlyContext ctx, Collector<String> out) throws Exception {/*** 要注意,这里我们最好从ReadOnlyContext来获取广播状态数据,因为获取只读的状态数据可以保证数据的安全性,* 如果是通过成员变量的方式获取可修改的状态数据,就会存在数据不安全的问题,如在代码逻辑中出现了对状态数据* 修改的代码,那么共享此状态的并行算子可能看到的状态数据不一致,就会导致数据错误或者代码报错.* 而使用ReadOnlyContext就可以保证processElement这个方法中我们只对状态数据进行读取.**/ReadOnlyBroadcastState<String, Tuple2<String, String>> broadcastState = ctx.getBroadcastState(userInfoState);if (broadcastState != null) {// 通过主流中的ID作为key获取广播变量中的用户信息Tuple2<String, String> userInfo = broadcastState.get(value.f0);// 输出数据的形式(id,behavior,date,name)if (userInfo == null) {out.collect(value.f0 + "," + value.f1 + "," + value.f2 + "," + "NULL");} else {out.collect(value.f0 + "," + value.f1 + "," + value.f2 + "," + userInfo.f1);}} else {out.collect(value.f0 + "," + value.f1 + "," + value.f2 + "," + "NULL");}}// 这个方法写广播流数据处理逻辑@Overridepublic void processBroadcastElement(Tuple2<String, String> value, BroadcastProcessFunction<Tuple3<String, String, String>, Tuple2<String, String>, String>.Context ctx, Collector<String> out) throws Exception {// 使用Context获取状态BroadcastState<String, Tuple2<String, String>> broadcastState = ctx.getBroadcastState(userInfoState);// 将数据存入到状态中broadcastState.put(value.f0, value);}});// 打印结果resultStream.print();env.execute("Flink broadcast");} } - 结果
代码内容就不进行详细解释了,注释基本都写清楚了,如有疑问可评论提问,共同探讨.3> 101,浏览商品,2023-08-02,小明 3> 101,商品加入购物车,2023-08-03,小明 3> 102,申请延期发货,2023-08-03,张丽 3> 104,下单,2023-08-05,王二虎 3> 106,浏览商品,2023-08-07,李四 1> 102,浏览商品,2023-08-02,张丽 1> 101,从购物车删除商品,2023-08-03,小明 1> 103,点击商品详情页,2023-08-04,公孙飞天 1> 104,付款,2023-08-06,王二虎 1> 106,加入购物车,2023-08-08,李四 2> 103,查看商品价格,2023-08-04,公孙飞天 2> 102,下单,2023-08-02,张丽 2> 104,点击收藏,2023-08-05,王二虎 2> 105,浏览商品,2023-08-07,NULL 2> 107,浏览商品,2023-08-10,NULL
相关文章:
Flink多流处理之Broadcast(广播变量)
写过Spark批处理的应该都知道,有一个广播变量broadcast这样的一个算子,可以优化我们计算的过程,有效的提高效率;同样在Flink中也有broadcast,简单来说和Spark中的类似,但是有所区别,首先Spark中的broadcast是静态的数据,而Flink中的broadcast是动态的,也就是源源不断的数据流.在…...
LVS/DR+Keepalived负载均衡实战(一)
引言 负载均衡这个概念对于一个IT老鸟来说再也熟悉不过了,当听到此概念的第一反应是想到举世闻名的nginx,但殊不知还有一个大名鼎鼎的负载均衡方案可能被忽略了,因为对于一般系统来说,很多应用场合中采用nginx基本已经满足需求&a…...
测试DWPose的onnx +Unity barracuda
环境: Unity Barracuda 3.0.1 从github直接拉取的barracuda仓库才能装到这个版本Barracuda以后不再升级了,会迁移到Unity AI大计划里的Sentis 我想申请的来着但好像已经不开放了 Unity 2021.3.20模型:dw-ll_ucoco_384.onnx 报了一些错&…...
轻装上阵,不调用jar包,用C#写SM4加密算法【卸载IKVM 】
前言 记得之前写了一个文章,是关于java和c#加密不一致导致需要使用ikvm的方式来进行数据加密,主要是ikvm把打包后的jar包打成dll包,然后Nuget引入ikvm,从而实现算法的统一,这几天闲来无事,网上找了一下加密…...
redis学习笔记(一)
文章目录 一、引言二、redis介绍2.1、定义2.2、Redis的数据类型及主要特性2.3、Redis的应用场景有哪些? 三、redis环境安装3.1、下载和安装 一、引言 在Web应用发展的初期,那时关系型数据库受到了较为广泛的关注和应用,原因是因为那时候Web站…...
最优化问题 - 拉格朗日对偶
primal 原问题 dual 对偶问题 目标函数 约束条件 可行域D 对偶专题 “拉格朗日对偶问题”如何直观理解?“KKT条件” “Slater条件” “凸优化”打包理解——bilibili 王木头 拉格朗日乘子法与对偶问题...
关于ISO27701隐私信息安全管理体系介绍
01 什么是ISO27701 ISO27701是对ISO27001信息安全管理和ISO27002安全控制的隐私扩展,全称《安全技术—扩展ISO27001和ISO27002的隐私信息管理—要求与指南》,是ISO标准委员会以ISO 27001为基准,以ISO27552为蓝本,建立发布的隐私…...
C语言案例 分数列求和-11
题目:有一分数列:2 / 1,3 / 2,5 / 3,8 / 5,13 / 8,21 / 13 …求出这个数列的前20项之和。 程序分析 这是一个典型的分数列数学逻辑题,考究这类题目是需要从已知的条件中找到它们的分布规律 我们把前6荐的分子与分母分别排列出来,…...
Git 入门
一、版本控制 1.1 什么是版本控制 版本控制(Revision control)是一种在开发的过程中用于管理我们对文件、目录或工程等内容的修改历史,方便查看更改历史记录,备份以便恢复以前的版本的软件工程技术。简单说就是用于管理多人协同开…...
PAT 1010 Radix
个人学习记录,代码难免不尽人意 Given a pair of positive integers, for example, 6 and 110, can this equation 6 110 be true? The answer is yes, if 6 is a decimal number and 110 is a binary number. Now for any pair of positive integers N 1and N 2…...
ruoyi-cloud微服务新建子模块
目录 相关文章1、复制system模块2、在modules下的 pom.xml文件中添加子模块 test3、进入 test模块修改 pom.xml4、修改对应的包名、目录名和启动应用程序为test5、修改bootstrap.yml文件中的端口号和应用名称6、nacos中克隆 system-dev.yml的配置,修改名称为 test-d…...
Dijkstra(求最短路)
时间复杂是 O(n2m) ,n 表示点数,m 表示边数 模板(朴素法一般m等于n^2的时候使用) #include<bits/stdc.h> #include<algorithm> using namespace std; const int N510; int g[N][N]; //为稠密阵所以用邻接矩阵存储 int dist[N]; //用…...
React 脚手架
1.React 定义 React 脚手架(React boilerplate)是一种预先设置好的、可以快速启动 React 项目的工具。脚手架已经包含了 React、Webpack、Babel、ESLint、Jest 等一些常用的工具和库,并已经配置好了这些工具的参数,可以直接使用和…...
CTFSHOW php命令执行
目录 web29 过滤flag web30 过滤system php web31 过滤 cat|sort|shell|\. 这里有一个新姿势 可以学习一下 web32 过滤 ; . web33 web34 web35 web36 web37 data伪协议 web38 短开表达式 web39 web40 __FILE__命令的扩展 web41 web42 重定向…...
侧滑置顶,取消置顶
第一步:布局 <?xml version"1.0" encoding"utf-8"?> <com.ddmh.magic.camera.ui.widget.SwipeMenuLayout xmlns:android"http://schemas.android.com/apk/res/android"xmlns:app"http://schemas.android.com/apk/res-auto"…...
Pycharm解决启动时候索引慢的问题
设置里去掉update里面的两个勾 shared indexes中,把自动下载索引改成不下载使用本地索引...
Http请求响应时间一般划分标准
HTTP请求的响应时间被认为是长或短通常取决于具体应用场景和性能需求。一般来说,以下是一些常见的对HTTP请求响应时间进行划分的标准: 即时响应:通常在毫秒级别的响应时间被认为是即时响应。这适用于对实时性要求较高的应用,如实时…...
生成测试报告,在Unittest框架中就是简单
测试套件(Test Suite)是测试用例、测试套件或两者的集合,用于组装一组要运行的测试(多个测试用例集合在一起)。 (1)创建一个测试套件: import unittest suite unittest.TestSuite…...
生成式人工智能的潜在有害影响与未来之路(一)
这是本文的第1版,反映了截至2023年5月15日,Generative AI的已记载的和预期的危害。由于Generative AI的发展、使用和危害的快速变化,我们承认这是一篇内在的动态论文,未来会发生变化。 在本文中,我们使用一种标准格式…...
lightdb23.3 表名与包名不能重复
LightDB 表名与包名不能重复 从 LightDB 23.3 版本开始表名和包名不能重复,与 oracle 一致。原先已已支持包名和schema名不能重复。 背景 在之前版本在同一schema 下可以创建相同名字的表和包。这会导致在存储过程中使用%type指定变量类型时,如果存在…...
容器日志还在切窗口查?VSCode 2026实时查看已支持结构化JSON高亮+错误自动聚类(仅限Insiders 2026.2+)
更多请点击: https://intelliparadigm.com 第一章:VSCode 2026容器日志实时查看功能概览 VSCode 2026 引入了原生集成的容器日志流式监听机制,无需额外安装扩展即可在内置终端或专用日志面板中实时捕获 Docker、Podman 及 Kubernetes Pod 的…...
第66篇:AI项目商业化中的常见“坑”——技术理想主义与市场现实的碰撞(踩坑总结)
文章目录问题现象:技术完美,市场不买账排查过程:从技术指标到商业价值的追问根本原因:技术思维与商业思维的错位解决方案:如何跨越理想与现实的鸿沟举一反三:其他领域的“理想主义”之坑问题现象࿱…...
告别Remote-SSH!VSCode 2026原生Device Sync协议详解(含Wireshark抓包分析+自定义Endpoint配置模板)
更多请点击: https://intelliparadigm.com 第一章:VSCode 2026 Device Sync协议的演进与设计哲学 VSCode 2026 引入的 Device Sync 协议并非简单延续旧有状态同步机制,而是以“设备语义感知”(Device-Semantic Awareness…...
GitHub 1.2 万星 Qt 项目 VNote 源码解读(二):Markdown 文本渲染
VNote 的 Markdown 文档是使用 QWebEngineView 这个组件来展示的,这是一个基于 Chromium 内核的浏览器组件。在 Qt 下展示 Markdown 文档,QWebEngineView 可以说是最好的选择。因为 Markdown 实质上是 HTML,可以认为是面向写作排版设计的简化版 HTML,并且围绕 Markdown 的渲…...
完全开源的语言模型学习记录--TrilinearCIM架构
文章目录在这里插入图片描述一、一段话总结二、思维导图三、详细总结1. 研究动机与问题2. 核心技术方案3. 评估与结果4. 贡献与结论四、关键问题与答案https://arxiv.org/pdf/2604.07628 Trilinear Compute-in-Memory Architecture for Energy-Efficient Transformer Accelerat…...
Unity WebCamTexture实战:从权限申请到区域截图,一个完整AR证件照项目的避坑实录
Unity WebCamTexture实战:从权限申请到区域截图,一个完整AR证件照项目的避坑实录 在移动应用开发中,AR证件照功能正成为教育、社交和电商平台的热门需求。想象一下,用户只需打开手机摄像头,就能自动生成符合标准的证件…...
二叉树先序线索化及先序线索二叉树找后继
#include <stdio.h> #include <stdlib.h>// 线索二叉树结点 typedef struct ThreadNode {int data;struct ThreadNode *lchild, *rchild;int ltag, rtag; } ThreadNode, *ThreadTree;ThreadNode *pre NULL;void create(ThreadTree &T) {T (ThreadNode *)mal…...
MCP 2026工业适配紧急响应手册:当MES断连、DCS指令延迟>120ms、数字孪生体失步时,如何15分钟内定位根因并热修复?
更多请点击: https://intelliparadigm.com 第一章:MCP 2026工业适配紧急响应手册导论 MCP 2026(Modular Control Protocol 2026)是新一代面向高实时性、多协议融合的工业控制通信标准,已纳入IEC 61158-17补充规范。本…...
如何5步完成微信聊天记录完整备份:终极数据安全解决方案
如何5步完成微信聊天记录完整备份:终极数据安全解决方案 【免费下载链接】WeChatExporter 一个可以快速导出、查看你的微信聊天记录的工具 项目地址: https://gitcode.com/gh_mirrors/wec/WeChatExporter 你是否曾担心手机里的珍贵对话会因设备故障而永久丢失…...
Phi-3-mini-4k-instruct-gguf实战手册:使用Prometheus+Grafana监控vLLM服务指标
Phi-3-mini-4k-instruct-gguf实战手册:使用PrometheusGrafana监控vLLM服务指标 1. 模型与部署环境介绍 Phi-3-Mini-4K-Instruct是一个38亿参数的轻量级开源模型,采用GGUF格式提供。该模型在Phi-3数据集上训练,专注于高质量和密集推理能力&a…...
