Flink广播流 BroadcastStream
文章目录
- 前言
- BroadcastStream代码示例
- Broadcast 使用注意事项
前言
Flink中的广播流(BroadcastStream)是一种特殊的流处理方式,它允许将一个流(通常是一个较小的流)广播到所有的并行任务中,从而实现在不同任务间共享数据的目的。广播流在处理配置信息、小数据集或者全局变量等场景下特别有用,因为这些数据需要在所有任务中保持一致且实时更新。
广播流的使用通常涉及以下步骤:
-
定义MapStateDescriptor:首先需要定义一个MapStateDescriptor来描述要广播的数据的格式。这个描述器指定了数据的键值对类型。
-
创建广播流:然后,需要将一个普通的流转换为广播流。这通常通过调用流的
broadcast()方法实现,并将MapStateDescriptor作为参数传入。 -
连接广播流与非广播流:一旦有了广播流,就可以将其与一个或多个非广播流(无论是Keyed流还是Non-Keyed流)连接起来。这通过调用非广播流的
connect()方法完成,并将广播流作为参数传入。连接后的流是一个BroadcastConnectedStream,它提供了process()方法用于处理数据。 -
处理数据:在
process()方法中,可以编写逻辑来处理非广播流和广播流的数据。根据非广播流的类型(Keyed或Non-Keyed),需要传入相应的KeyedBroadcastProcessFunction或BroadcastProcessFunction类型的处理函数。
广播流的一个典型使用场景是在处理数据时需要实时动态改变配置。例如,当需要从MySQL数据库中实时查询和更新某些关键字过滤规则时,如果直接在计算函数中进行查询,可能会阻塞整个计算过程甚至导致任务停止。通过使用广播流,可以将这些配置信息广播到所有相关任务的实例中,然后在计算过程中直接使用这些配置信息,从而提高计算效率和实时性。
总的来说,Flink的广播流提供了一种有效的方式来实现不同任务间的数据共享和实时更新,适用于各种需要全局数据或配置的场景。
BroadcastStream代码示例
功能:将用户信息进行广播,从Kafka中读取用户访问记录,判断访问用户是否存在
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.flink.util.Collector;import flink.demo.data.UserVo;
/*** 多流connect,并进行join**/
public class BroadcastTest{public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties proterties = new Properties();proterties.setProperty("bootstrap.servers", "10.168.88.88:9092");proterties.setProperty("group.id", "test");proterties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");proterties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// proterties.setProperty("auto.offset.reset", "latest");FlinkKafkaConsumer<ObjectNode> consumerVisit= new FlinkKafkaConsumer<>("test",new JSONKeyValueDeserializationSchema(false), proterties);DataStreamSource<ObjectNode> streamSource = env.addSource(consumerVisit);DataStreamSource<Tuple2<String, List<UserVo>>> userStreamSource = env.addSource(new UserListSource());MapStateDescriptor<String, List<UserVo>> descriptor =new MapStateDescriptor<>("userStream",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint<List<UserVo>>() {}));BroadcastStream<Tuple2<String, List<UserVo>>> broadcastStream = userStreamSource.broadcast(descriptor);// 将数据流和控制流进行连接,利用控制流中的数据来控制字符串的输出BroadcastConnectedStream<ObjectNode, Tuple2<String, List<UserVo>>> tmp=streamSource.connect(broadcastStream);tmp.process(new UserPvProcessor()).print();env.execute("kafkaTest");}private static class UserPvProcessorextends BroadcastProcessFunction<ObjectNode, Tuple2<String, List<UserVo>>, String> {private static final long serialVersionUID = 1L;MapStateDescriptor<String, List<UserVo>> descriptor =new MapStateDescriptor<>("userStream",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint<List<UserVo>>() {}));@Override//用户信息处理public void processBroadcastElement(Tuple2<String, List<UserVo>> value, Context ctx, Collector<String> out)throws Exception {// 将接收到的控制数据放到 broadcast state 中 ctx.getBroadcastState(descriptor).put(value.f0, value.f1);// 打印控制信息System.out.println(Thread.currentThread().getName() + " 接收到用户信息 : "+value.f0+" " + value.f1);}@Override//数据流public void processElement(ObjectNode element, ReadOnlyContext ctx, Collector<String> out) throws Exception {// 从 broadcast state 中拿到用户列表信息List<UserVo> userList = ctx.getBroadcastState(descriptor).get("userList");String time=LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss"));if(userList!=null&&userList.size()>0) {Map<String,String> userMap=new HashMap<>();for(UserVo vo:userList) {userMap.put(vo.getUserid(), vo.getUserName());}
// System.out.println(userMap);JsonNode value = element.get("value");String userid=value.get("user").asText();String userName=userMap.get(userid);if (StringUtils.isNotBlank(userName)) {out.collect(Thread.currentThread().getName()+"存在用户"+userid+" "+userName +" "+time);}else {out.collect(Thread.currentThread().getName()+"不存在用户"+userid+" "+time );}}else {out.collect(Thread.currentThread().getName()+"不存在用户"+element.get("value")+" "+time );}}}
}
Broadcast 使用注意事项
- 同一个 operator 的各个 task 之间没有通信,广播流侧(processBroadcastElement)可以能修改 broadcast state,而数据流侧(processElement)只能读 broadcast state.;
- 需要保证所有 Operator task 对 broadcast state 的修改逻辑是相同的,否则会导致非预期的结果;
- Operator tasks 之间收到的广播流元素的顺序可能不同:虽然所有元素最终都会下发给下游tasks,但是元素到达的顺序可能不同,所以更新state时不能依赖元素到达的顺序;
- 每个 task 对各自的 Broadcast state 都会做快照,防止热点问题;
- 目前不支持 RocksDB 保存 Broadcast state:Broadcast state 目前只保存在内存中,需要为其预留合适的内存
相关文章:
Flink广播流 BroadcastStream
文章目录 前言BroadcastStream代码示例Broadcast 使用注意事项 前言 Flink中的广播流(BroadcastStream)是一种特殊的流处理方式,它允许将一个流(通常是一个较小的流)广播到所有的并行任务中,从而实现在不同…...
IP数据报格式
每一行都由32位比特,即4个字节组成,每个格子称为字段或者域。IP数据报由20字节的固定部分和最大40字节的可变部分组成。 总长度 总长度为16个比特,该字段的取值以字节为单位,用来表示IPv4数据报的长度(首部长度数据载荷长度)最大…...
GET https://registry.npm.taobao.org/xxxx error (CERT_HAS_EXPIRED)解决
PNPM用的阿里源,提示意思是证书过期了,参考网上的解决办法。执行 pnpm config set registry https://registry.npmmirror.com 再用pnpm config get registry查看,确实是 https://registry.npmmirror.com 但是仍旧报错,发现还…...
SSM Java Web项目由于spring-mvc.xml配置不对带来的一系列问题
1 介绍 一年多前,我就买了好多关于Java开发类的书籍,内容关于Java Web实操、Spring 学习指南、Maven实战、IntelliJ IDEA软件开发与应用等等。可是由于工作繁忙,这些书没系统地看完。这也是参加工作后的无奈吧! 寒假期间的一周&…...
MySQL事务隔离
什么是事务隔离? 为了确保在并发事务执行时,各个事务之间能够相互独立、互不干扰地运行,从而保证数据的一致性。 事务的隔离级别 MySQL事务隔离为了满足不同场景,提供了4个事务隔离级别(严格来讲是InnoDB存储引擎支…...
Java基础知识总结(1)
Java概况 JavaSE是java分类中的标准版,是刚接触java要学习的基础知识。 JavaEE是java分类中的企业版,是java中的高级,涉及到的知识广泛。 JavaME中M是Micro的缩写,用在嵌入式等电子设备中。 Java软件工程师:通过Ja…...
脚手架原理之webpack处理html文件和模块打包
脚手架原理之webpack处理html文件和模块打包 为了更好的理解项目脚手架的使用,我们来学习一下webpack工具,因为脚手架的底层就是基于webpack工具实现的。 安装 webpack工具是基于nodejs的,所以首先要有nodejs环境,其次需要下载…...
Winform编程详解一:Form窗口
一、属性介绍 1. (Name) 窗体的对象标识符ID 2. Text 修改窗口左上角标题 3. Icon 修改窗口左上角图标,图标最合适大小 32*32 4. 修改窗体第一次出现位置 代码修改:StartPosition System.Windows.Forms.FormStartPosition.CenterScreen; 5…...
Windows Server 2025 Install Preview
前言 Windows Server 2025 带来了巨大的发展,例如面向所有人的热补丁、下一代 Active Directory 和 SMB、关键任务数据和存储、Hyper-V 和 AI 等 Windows Server 2025 Preview download 下载 已注册的预览体验成员可以直接导航到 Windows Server Insider Preview 下载页面。…...
四、MySQL
MySQL MySQL1.初识网站2.安装MySQL2.1 下载(最重要的一点是路径中不能有中文,哪怕是同级目录也不行)2.2安装补丁2.3安装2.4创建配置文件2.5初始化 3.启动MySQL4.连接测试4.1 设置密码4.2 查看已有的文件夹(数据库)4.3 …...
C#使用泛型自定义的方法设计队列CQueue<T>类
目录 一、涉及到的知识点 1.C#中的队列类 2.自定义队列的方法 (1)先设计一个CList<T>类 (2)再设计CQueue<T>类 二、自定义队列CQueue<T>类的实例 一、涉及到的知识点 1.C#中的队列类 在C#中实现队列类&a…...
IDEA自定义Maven仓库
Maven 是一款广泛应用于 Java 开发的工具,其作用类似于一个全自动的 JAR 包管理器,能够方便地导入开发所需的相关 JAR 包。在使用 Maven 进行 Java 程序开发时,开发者能够极大地提高开发效率。以下是关于如何安装 Maven 以及在 IDEA 中配置自…...
Codeql复现CVE-2018-11776学习笔记
基本使用 1、首先下载struts2漏洞版本源码: https://codeload.github.com/apache/struts/zip/refs/tags/STRUTS_2_3_20 2、构建codeql数据库(构建失败文末有解决办法): codeql database create ~/CodeQL/databases/struts2-2.3.…...
CVE-2024-27199 JetBrains TeamCity 身份验证绕过漏洞2
漏洞简介 TeamCity Web 服务器中发现了第二个身份验证绕过漏洞。这种身份验证旁路允许在没有身份验证的情况下访问有限数量的经过身份验证的端点。未经身份验证的攻击者可以利用此漏洞修改服务器上有限数量的系统设置,并泄露服务器上有限数量的敏感信息。 项目官网…...
ms office学习记录12:Excel学习记录㈥
数据工具 分列的其他运用:身份证号中“出生日期”切片:分列→固定宽度→下一步→切割出三列→下一步→不导入第一列→导入第二列且转换成日期→不导入第三列→完成 删除重复值:定位到要“数据”选项卡→删除重复项→取消全选再勾选要删除的…...
基于YOLOv8/YOLOv7/YOLOv6/YOLOv5的条形码二维码检测系统(深度学习+UI界面+训练数据集+Python代码)
摘要:在物流和制造业中,开发一套高效的条形码与二维码识别系统显得尤为关键。本博文深入探讨了如何利用深度学习技术打造出一套先进的条形码及二维码检测系统,并且提供了一套完整的实施方案。该系统搭载了性能卓越的YOLOv8算法,并…...
npm yarn 一起使用报错
项目记录,具有独特性,仅供参考 项目好好的运行,前一天装个测试工具包, 突然就不行了,卸载重装也不行,所有的项目都安装失败,新起一个项目也不行,有时候某个单独安装一个包可以&…...
基于springboot实现驾校信息管理系统项目【项目源码+论文说明】计算机毕业设计
基于springboot实现驾校信息管理系统演示 摘要 随着人们生活水平的不断提高,出行方式多样化,也以私家车为主,那么既然私家车的需求不断增长,那么基于驾校的考核管理也就不断增强,那么业务系统也就慢慢的随之加大。信息…...
DXP软件界面显示“No Hard Devices”【简单的操作问题】加【软件下载】
目录 一,DXP软件界面显示“No Hard Devices” 二,软件下载的百度网盘资源 一,DXP软件界面显示“No Hard Devices” Protel DXP是2004是澳大利亚Altium公司于2002年推出的一款电子设计自动化软件。它的主要功能包括:原理图编辑、印…...
通过Spring Boot 实现页面配置生成动态接口?
流程介绍 在Spring Boot中实现页面配置生成动态接口通常涉及几个关键步骤: 设计页面配置:首先,你需要设计一个用户界面(UI),允许用户通过此界面来配置接口的各种参数,例如HTTP方法(GET、POST等)、URL路径、请求参数、响应数据格式等。保存配置信息:当用户通过页面配置…...
如何免越狱定制iPhone界面:Cowabunga Lite完整使用指南
如何免越狱定制iPhone界面:Cowabunga Lite完整使用指南 【免费下载链接】CowabungaLite iOS 15 Customization Toolbox 项目地址: https://gitcode.com/gh_mirrors/co/CowabungaLite Cowabunga Lite是一款专为iOS 15设备设计的系统定制工具,让普通…...
MATLAB plot()函数实战:从数据到专业图表的完整工作流
1. 数据准备:从原始数据到可绘图格式 第一次用MATLAB画图时,我直接把Excel表格里的数据复制粘贴进去,结果plot()函数报错让我懵了半天。后来才发现,数据格式转换是绘图的第一步关键操作。假设你手头有一组温度传感器采集的时序数据…...
保姆级教程:在RK3588开发板上编译并加载Xilinx XDMA PCIe驱动(含完整Makefile解析)
RK3588与FPGA的PCIe通信实战:XDMA驱动编译与深度优化指南 当RK3588遇上FPGA,PCIe通信便成为两者之间高速数据交互的核心桥梁。作为一款广泛应用于边缘计算和嵌入式AI场景的ARM处理器,RK3588的PCIe 3.0 x4接口能够提供接近4GB/s的理论带宽&am…...
OpenClaw安全指南:gemma-3-12b-it本地化部署的权限管控策略
OpenClaw安全指南:gemma-3-12b-it本地化部署的权限管控策略 1. 为什么需要特别关注OpenClaw的权限管控? 上周我在调试一个自动化文档整理任务时,差点酿成大祸——OpenClaw误将我的工作目录/Documents/ProjectX识别为临时文件夹,…...
LaTeX2Word-Equation:学术公式无缝迁移的终极解决方案
LaTeX2Word-Equation:学术公式无缝迁移的终极解决方案 【免费下载链接】LaTeX2Word-Equation Copy LaTeX Equations as Word Equations, a Chrome Extension 项目地址: https://gitcode.com/gh_mirrors/la/LaTeX2Word-Equation 在学术写作与科研工作中&#…...
MIT-BEVFusion LiDAR Encoder 保姆级拆解:从点云到BEV特征图,手把手带你过一遍代码
MIT-BEVFusion LiDAR Encoder 深度解析:从点云到BEV特征图的完整实现路径 当自动驾驶系统需要理解周围环境时,LiDAR点云数据的高效处理成为关键挑战。MIT-BEVFusion框架中的LiDAR编码器模块,通过创新的稀疏卷积架构,将无序的三维点…...
Qwen3-Embedding-4B GPU算力优化:CUDA Stream并发执行向量化与相似度计算,吞吐提升1.8倍
Qwen3-Embedding-4B GPU算力优化:CUDA Stream并发执行向量化与相似度计算,吞吐提升1.8倍 1. 引言:当语义搜索遇上性能瓶颈 想象一下,你正在使用一个智能语义搜索工具,输入“我想吃点东西”,它立刻为你找到…...
seo网络推广专员有哪些发展前景
SEO网络推广专员的职业发展前景分析 在当今数字经济时代,网络推广已经成为企业营销的核心手段之一。而在网络推广的诸多角色中,SEO网络推广专员(Search Engine Optimization网络推广专员)无疑是其中最为关键的一环。作为一个SEO网…...
Qwen2.5-14B-Instruct入门指南:像素剧本圣殿UI组件与剧本结构映射关系解析
Qwen2.5-14B-Instruct入门指南:像素剧本圣殿UI组件与剧本结构映射关系解析 1. 工具概览与核心价值 像素剧本圣殿(Pixel Script Temple)是一款基于Qwen2.5-14B-Instruct大模型深度优化的专业剧本创作工具。它将AI强大的文本生成能力与独特的…...
Intv_AI_MK11 解决 403 Forbidden 错误:模型服务访问权限配置详解
Intv_AI_MK11 解决 403 Forbidden 错误:模型服务访问权限配置详解 1. 问题背景与解决思路 当你兴致勃勃地准备调用 Intv_AI_MK11 模型服务时,突然收到一个冷冰冰的 "403 Forbidden" 错误,这种体验就像拿着门票却被拦在演唱会门外…...
