Flink广播流 BroadcastStream
文章目录
- 前言
- BroadcastStream代码示例
- Broadcast 使用注意事项
前言
Flink中的广播流(BroadcastStream)是一种特殊的流处理方式,它允许将一个流(通常是一个较小的流)广播到所有的并行任务中,从而实现在不同任务间共享数据的目的。广播流在处理配置信息、小数据集或者全局变量等场景下特别有用,因为这些数据需要在所有任务中保持一致且实时更新。
广播流的使用通常涉及以下步骤:
-
定义MapStateDescriptor:首先需要定义一个MapStateDescriptor来描述要广播的数据的格式。这个描述器指定了数据的键值对类型。
-
创建广播流:然后,需要将一个普通的流转换为广播流。这通常通过调用流的
broadcast()方法实现,并将MapStateDescriptor作为参数传入。 -
连接广播流与非广播流:一旦有了广播流,就可以将其与一个或多个非广播流(无论是Keyed流还是Non-Keyed流)连接起来。这通过调用非广播流的
connect()方法完成,并将广播流作为参数传入。连接后的流是一个BroadcastConnectedStream,它提供了process()方法用于处理数据。 -
处理数据:在
process()方法中,可以编写逻辑来处理非广播流和广播流的数据。根据非广播流的类型(Keyed或Non-Keyed),需要传入相应的KeyedBroadcastProcessFunction或BroadcastProcessFunction类型的处理函数。
广播流的一个典型使用场景是在处理数据时需要实时动态改变配置。例如,当需要从MySQL数据库中实时查询和更新某些关键字过滤规则时,如果直接在计算函数中进行查询,可能会阻塞整个计算过程甚至导致任务停止。通过使用广播流,可以将这些配置信息广播到所有相关任务的实例中,然后在计算过程中直接使用这些配置信息,从而提高计算效率和实时性。
总的来说,Flink的广播流提供了一种有效的方式来实现不同任务间的数据共享和实时更新,适用于各种需要全局数据或配置的场景。
BroadcastStream代码示例
功能:将用户信息进行广播,从Kafka中读取用户访问记录,判断访问用户是否存在
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.flink.util.Collector;import flink.demo.data.UserVo;
/*** 多流connect,并进行join**/
public class BroadcastTest{public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties proterties = new Properties();proterties.setProperty("bootstrap.servers", "10.168.88.88:9092");proterties.setProperty("group.id", "test");proterties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");proterties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// proterties.setProperty("auto.offset.reset", "latest");FlinkKafkaConsumer<ObjectNode> consumerVisit= new FlinkKafkaConsumer<>("test",new JSONKeyValueDeserializationSchema(false), proterties);DataStreamSource<ObjectNode> streamSource = env.addSource(consumerVisit);DataStreamSource<Tuple2<String, List<UserVo>>> userStreamSource = env.addSource(new UserListSource());MapStateDescriptor<String, List<UserVo>> descriptor =new MapStateDescriptor<>("userStream",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint<List<UserVo>>() {}));BroadcastStream<Tuple2<String, List<UserVo>>> broadcastStream = userStreamSource.broadcast(descriptor);// 将数据流和控制流进行连接,利用控制流中的数据来控制字符串的输出BroadcastConnectedStream<ObjectNode, Tuple2<String, List<UserVo>>> tmp=streamSource.connect(broadcastStream);tmp.process(new UserPvProcessor()).print();env.execute("kafkaTest");}private static class UserPvProcessorextends BroadcastProcessFunction<ObjectNode, Tuple2<String, List<UserVo>>, String> {private static final long serialVersionUID = 1L;MapStateDescriptor<String, List<UserVo>> descriptor =new MapStateDescriptor<>("userStream",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint<List<UserVo>>() {}));@Override//用户信息处理public void processBroadcastElement(Tuple2<String, List<UserVo>> value, Context ctx, Collector<String> out)throws Exception {// 将接收到的控制数据放到 broadcast state 中 ctx.getBroadcastState(descriptor).put(value.f0, value.f1);// 打印控制信息System.out.println(Thread.currentThread().getName() + " 接收到用户信息 : "+value.f0+" " + value.f1);}@Override//数据流public void processElement(ObjectNode element, ReadOnlyContext ctx, Collector<String> out) throws Exception {// 从 broadcast state 中拿到用户列表信息List<UserVo> userList = ctx.getBroadcastState(descriptor).get("userList");String time=LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss"));if(userList!=null&&userList.size()>0) {Map<String,String> userMap=new HashMap<>();for(UserVo vo:userList) {userMap.put(vo.getUserid(), vo.getUserName());}
// System.out.println(userMap);JsonNode value = element.get("value");String userid=value.get("user").asText();String userName=userMap.get(userid);if (StringUtils.isNotBlank(userName)) {out.collect(Thread.currentThread().getName()+"存在用户"+userid+" "+userName +" "+time);}else {out.collect(Thread.currentThread().getName()+"不存在用户"+userid+" "+time );}}else {out.collect(Thread.currentThread().getName()+"不存在用户"+element.get("value")+" "+time );}}}
}
Broadcast 使用注意事项
- 同一个 operator 的各个 task 之间没有通信,广播流侧(processBroadcastElement)可以能修改 broadcast state,而数据流侧(processElement)只能读 broadcast state.;
- 需要保证所有 Operator task 对 broadcast state 的修改逻辑是相同的,否则会导致非预期的结果;
- Operator tasks 之间收到的广播流元素的顺序可能不同:虽然所有元素最终都会下发给下游tasks,但是元素到达的顺序可能不同,所以更新state时不能依赖元素到达的顺序;
- 每个 task 对各自的 Broadcast state 都会做快照,防止热点问题;
- 目前不支持 RocksDB 保存 Broadcast state:Broadcast state 目前只保存在内存中,需要为其预留合适的内存
相关文章:
Flink广播流 BroadcastStream
文章目录 前言BroadcastStream代码示例Broadcast 使用注意事项 前言 Flink中的广播流(BroadcastStream)是一种特殊的流处理方式,它允许将一个流(通常是一个较小的流)广播到所有的并行任务中,从而实现在不同…...
IP数据报格式
每一行都由32位比特,即4个字节组成,每个格子称为字段或者域。IP数据报由20字节的固定部分和最大40字节的可变部分组成。 总长度 总长度为16个比特,该字段的取值以字节为单位,用来表示IPv4数据报的长度(首部长度数据载荷长度)最大…...
GET https://registry.npm.taobao.org/xxxx error (CERT_HAS_EXPIRED)解决
PNPM用的阿里源,提示意思是证书过期了,参考网上的解决办法。执行 pnpm config set registry https://registry.npmmirror.com 再用pnpm config get registry查看,确实是 https://registry.npmmirror.com 但是仍旧报错,发现还…...
SSM Java Web项目由于spring-mvc.xml配置不对带来的一系列问题
1 介绍 一年多前,我就买了好多关于Java开发类的书籍,内容关于Java Web实操、Spring 学习指南、Maven实战、IntelliJ IDEA软件开发与应用等等。可是由于工作繁忙,这些书没系统地看完。这也是参加工作后的无奈吧! 寒假期间的一周&…...
MySQL事务隔离
什么是事务隔离? 为了确保在并发事务执行时,各个事务之间能够相互独立、互不干扰地运行,从而保证数据的一致性。 事务的隔离级别 MySQL事务隔离为了满足不同场景,提供了4个事务隔离级别(严格来讲是InnoDB存储引擎支…...
Java基础知识总结(1)
Java概况 JavaSE是java分类中的标准版,是刚接触java要学习的基础知识。 JavaEE是java分类中的企业版,是java中的高级,涉及到的知识广泛。 JavaME中M是Micro的缩写,用在嵌入式等电子设备中。 Java软件工程师:通过Ja…...
脚手架原理之webpack处理html文件和模块打包
脚手架原理之webpack处理html文件和模块打包 为了更好的理解项目脚手架的使用,我们来学习一下webpack工具,因为脚手架的底层就是基于webpack工具实现的。 安装 webpack工具是基于nodejs的,所以首先要有nodejs环境,其次需要下载…...
Winform编程详解一:Form窗口
一、属性介绍 1. (Name) 窗体的对象标识符ID 2. Text 修改窗口左上角标题 3. Icon 修改窗口左上角图标,图标最合适大小 32*32 4. 修改窗体第一次出现位置 代码修改:StartPosition System.Windows.Forms.FormStartPosition.CenterScreen; 5…...
Windows Server 2025 Install Preview
前言 Windows Server 2025 带来了巨大的发展,例如面向所有人的热补丁、下一代 Active Directory 和 SMB、关键任务数据和存储、Hyper-V 和 AI 等 Windows Server 2025 Preview download 下载 已注册的预览体验成员可以直接导航到 Windows Server Insider Preview 下载页面。…...
四、MySQL
MySQL MySQL1.初识网站2.安装MySQL2.1 下载(最重要的一点是路径中不能有中文,哪怕是同级目录也不行)2.2安装补丁2.3安装2.4创建配置文件2.5初始化 3.启动MySQL4.连接测试4.1 设置密码4.2 查看已有的文件夹(数据库)4.3 …...
C#使用泛型自定义的方法设计队列CQueue<T>类
目录 一、涉及到的知识点 1.C#中的队列类 2.自定义队列的方法 (1)先设计一个CList<T>类 (2)再设计CQueue<T>类 二、自定义队列CQueue<T>类的实例 一、涉及到的知识点 1.C#中的队列类 在C#中实现队列类&a…...
IDEA自定义Maven仓库
Maven 是一款广泛应用于 Java 开发的工具,其作用类似于一个全自动的 JAR 包管理器,能够方便地导入开发所需的相关 JAR 包。在使用 Maven 进行 Java 程序开发时,开发者能够极大地提高开发效率。以下是关于如何安装 Maven 以及在 IDEA 中配置自…...
Codeql复现CVE-2018-11776学习笔记
基本使用 1、首先下载struts2漏洞版本源码: https://codeload.github.com/apache/struts/zip/refs/tags/STRUTS_2_3_20 2、构建codeql数据库(构建失败文末有解决办法): codeql database create ~/CodeQL/databases/struts2-2.3.…...
CVE-2024-27199 JetBrains TeamCity 身份验证绕过漏洞2
漏洞简介 TeamCity Web 服务器中发现了第二个身份验证绕过漏洞。这种身份验证旁路允许在没有身份验证的情况下访问有限数量的经过身份验证的端点。未经身份验证的攻击者可以利用此漏洞修改服务器上有限数量的系统设置,并泄露服务器上有限数量的敏感信息。 项目官网…...
ms office学习记录12:Excel学习记录㈥
数据工具 分列的其他运用:身份证号中“出生日期”切片:分列→固定宽度→下一步→切割出三列→下一步→不导入第一列→导入第二列且转换成日期→不导入第三列→完成 删除重复值:定位到要“数据”选项卡→删除重复项→取消全选再勾选要删除的…...
基于YOLOv8/YOLOv7/YOLOv6/YOLOv5的条形码二维码检测系统(深度学习+UI界面+训练数据集+Python代码)
摘要:在物流和制造业中,开发一套高效的条形码与二维码识别系统显得尤为关键。本博文深入探讨了如何利用深度学习技术打造出一套先进的条形码及二维码检测系统,并且提供了一套完整的实施方案。该系统搭载了性能卓越的YOLOv8算法,并…...
npm yarn 一起使用报错
项目记录,具有独特性,仅供参考 项目好好的运行,前一天装个测试工具包, 突然就不行了,卸载重装也不行,所有的项目都安装失败,新起一个项目也不行,有时候某个单独安装一个包可以&…...
基于springboot实现驾校信息管理系统项目【项目源码+论文说明】计算机毕业设计
基于springboot实现驾校信息管理系统演示 摘要 随着人们生活水平的不断提高,出行方式多样化,也以私家车为主,那么既然私家车的需求不断增长,那么基于驾校的考核管理也就不断增强,那么业务系统也就慢慢的随之加大。信息…...
DXP软件界面显示“No Hard Devices”【简单的操作问题】加【软件下载】
目录 一,DXP软件界面显示“No Hard Devices” 二,软件下载的百度网盘资源 一,DXP软件界面显示“No Hard Devices” Protel DXP是2004是澳大利亚Altium公司于2002年推出的一款电子设计自动化软件。它的主要功能包括:原理图编辑、印…...
通过Spring Boot 实现页面配置生成动态接口?
流程介绍 在Spring Boot中实现页面配置生成动态接口通常涉及几个关键步骤: 设计页面配置:首先,你需要设计一个用户界面(UI),允许用户通过此界面来配置接口的各种参数,例如HTTP方法(GET、POST等)、URL路径、请求参数、响应数据格式等。保存配置信息:当用户通过页面配置…...
HTML 语义化
目录 HTML 语义化HTML5 新特性HTML 语义化的好处语义化标签的使用场景最佳实践 HTML 语义化 HTML5 新特性 标准答案: 语义化标签: <header>:页头<nav>:导航<main>:主要内容<article>&#x…...
微信小程序云开发平台MySQL的连接方式
注:微信小程序云开发平台指的是腾讯云开发 先给结论:微信小程序云开发平台的MySQL,无法通过获取数据库连接信息的方式进行连接,连接只能通过云开发的SDK连接,具体要参考官方文档: 为什么? 因为…...
Yolov8 目标检测蒸馏学习记录
yolov8系列模型蒸馏基本流程,代码下载:这里本人提交了一个demo:djdll/Yolov8_Distillation: Yolov8轻量化_蒸馏代码实现 在轻量化模型设计中,**知识蒸馏(Knowledge Distillation)**被广泛应用,作为提升模型…...
Python 高效图像帧提取与视频编码:实战指南
Python 高效图像帧提取与视频编码:实战指南 在音视频处理领域,图像帧提取与视频编码是基础但极具挑战性的任务。Python 结合强大的第三方库(如 OpenCV、FFmpeg、PyAV),可以高效处理视频流,实现快速帧提取、压缩编码等关键功能。本文将深入介绍如何优化这些流程,提高处理…...
Kubernetes 节点自动伸缩(Cluster Autoscaler)原理与实践
在 Kubernetes 集群中,如何在保障应用高可用的同时有效地管理资源,一直是运维人员和开发者关注的重点。随着微服务架构的普及,集群内各个服务的负载波动日趋明显,传统的手动扩缩容方式已无法满足实时性和弹性需求。 Cluster Auto…...
内窥镜检查中基于提示的息肉分割|文献速递-深度学习医疗AI最新文献
Title 题目 Prompt-based polyp segmentation during endoscopy 内窥镜检查中基于提示的息肉分割 01 文献速递介绍 以下是对这段英文内容的中文翻译: ### 胃肠道癌症的发病率呈上升趋势,且有年轻化倾向(Bray等人,2018&#x…...
从零手写Java版本的LSM Tree (一):LSM Tree 概述
🔥 推荐一个高质量的Java LSM Tree开源项目! https://github.com/brianxiadong/java-lsm-tree java-lsm-tree 是一个从零实现的Log-Structured Merge Tree,专为高并发写入场景设计。 核心亮点: ⚡ 极致性能:写入速度超…...
基于stm32F10x 系列微控制器的智能电子琴(附完整项目源码、详细接线及讲解视频)
注:文章末尾网盘链接中自取成品使用演示视频、项目源码、项目文档 所用硬件:STM32F103C8T6、无源蜂鸣器、44矩阵键盘、flash存储模块、OLED显示屏、RGB三色灯、面包板、杜邦线、usb转ttl串口 stm32f103c8t6 面包板 …...
计算机系统结构复习-名词解释2
1.定向:在某条指令产生计算结果之前,其他指令并不真正立即需要该计算结果,如果能够将该计算结果从其产生的地方直接送到其他指令中需要它的地方,那么就可以避免停顿。 2.多级存储层次:由若干个采用不同实现技术的存储…...
Yolo11改进策略:Block改进|FCM,特征互补映射模块|AAAI 2025|即插即用
1 论文信息 FBRT-YOLO(Faster and Better for Real-Time Aerial Image Detection)是由北京理工大学团队提出的专用于航拍图像实时目标检测的创新框架,发表于AAAI 2025。论文针对航拍场景中小目标检测的核心难题展开研究,重点解决…...
