Flink广播流 BroadcastStream
文章目录
- 前言
- BroadcastStream代码示例
- Broadcast 使用注意事项
前言
Flink中的广播流(BroadcastStream)是一种特殊的流处理方式,它允许将一个流(通常是一个较小的流)广播到所有的并行任务中,从而实现在不同任务间共享数据的目的。广播流在处理配置信息、小数据集或者全局变量等场景下特别有用,因为这些数据需要在所有任务中保持一致且实时更新。
广播流的使用通常涉及以下步骤:
-
定义MapStateDescriptor:首先需要定义一个MapStateDescriptor来描述要广播的数据的格式。这个描述器指定了数据的键值对类型。
-
创建广播流:然后,需要将一个普通的流转换为广播流。这通常通过调用流的
broadcast()
方法实现,并将MapStateDescriptor作为参数传入。 -
连接广播流与非广播流:一旦有了广播流,就可以将其与一个或多个非广播流(无论是Keyed流还是Non-Keyed流)连接起来。这通过调用非广播流的
connect()
方法完成,并将广播流作为参数传入。连接后的流是一个BroadcastConnectedStream
,它提供了process()
方法用于处理数据。 -
处理数据:在
process()
方法中,可以编写逻辑来处理非广播流和广播流的数据。根据非广播流的类型(Keyed或Non-Keyed),需要传入相应的KeyedBroadcastProcessFunction
或BroadcastProcessFunction
类型的处理函数。
广播流的一个典型使用场景是在处理数据时需要实时动态改变配置。例如,当需要从MySQL数据库中实时查询和更新某些关键字过滤规则时,如果直接在计算函数中进行查询,可能会阻塞整个计算过程甚至导致任务停止。通过使用广播流,可以将这些配置信息广播到所有相关任务的实例中,然后在计算过程中直接使用这些配置信息,从而提高计算效率和实时性。
总的来说,Flink的广播流提供了一种有效的方式来实现不同任务间的数据共享和实时更新,适用于各种需要全局数据或配置的场景。
BroadcastStream代码示例
功能:将用户信息进行广播,从Kafka中读取用户访问记录,判断访问用户是否存在
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.flink.util.Collector;import flink.demo.data.UserVo;
/*** 多流connect,并进行join**/
public class BroadcastTest{public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties proterties = new Properties();proterties.setProperty("bootstrap.servers", "10.168.88.88:9092");proterties.setProperty("group.id", "test");proterties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");proterties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// proterties.setProperty("auto.offset.reset", "latest");FlinkKafkaConsumer<ObjectNode> consumerVisit= new FlinkKafkaConsumer<>("test",new JSONKeyValueDeserializationSchema(false), proterties);DataStreamSource<ObjectNode> streamSource = env.addSource(consumerVisit);DataStreamSource<Tuple2<String, List<UserVo>>> userStreamSource = env.addSource(new UserListSource());MapStateDescriptor<String, List<UserVo>> descriptor =new MapStateDescriptor<>("userStream",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint<List<UserVo>>() {}));BroadcastStream<Tuple2<String, List<UserVo>>> broadcastStream = userStreamSource.broadcast(descriptor);// 将数据流和控制流进行连接,利用控制流中的数据来控制字符串的输出BroadcastConnectedStream<ObjectNode, Tuple2<String, List<UserVo>>> tmp=streamSource.connect(broadcastStream);tmp.process(new UserPvProcessor()).print();env.execute("kafkaTest");}private static class UserPvProcessorextends BroadcastProcessFunction<ObjectNode, Tuple2<String, List<UserVo>>, String> {private static final long serialVersionUID = 1L;MapStateDescriptor<String, List<UserVo>> descriptor =new MapStateDescriptor<>("userStream",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint<List<UserVo>>() {}));@Override//用户信息处理public void processBroadcastElement(Tuple2<String, List<UserVo>> value, Context ctx, Collector<String> out)throws Exception {// 将接收到的控制数据放到 broadcast state 中 ctx.getBroadcastState(descriptor).put(value.f0, value.f1);// 打印控制信息System.out.println(Thread.currentThread().getName() + " 接收到用户信息 : "+value.f0+" " + value.f1);}@Override//数据流public void processElement(ObjectNode element, ReadOnlyContext ctx, Collector<String> out) throws Exception {// 从 broadcast state 中拿到用户列表信息List<UserVo> userList = ctx.getBroadcastState(descriptor).get("userList");String time=LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss"));if(userList!=null&&userList.size()>0) {Map<String,String> userMap=new HashMap<>();for(UserVo vo:userList) {userMap.put(vo.getUserid(), vo.getUserName());}
// System.out.println(userMap);JsonNode value = element.get("value");String userid=value.get("user").asText();String userName=userMap.get(userid);if (StringUtils.isNotBlank(userName)) {out.collect(Thread.currentThread().getName()+"存在用户"+userid+" "+userName +" "+time);}else {out.collect(Thread.currentThread().getName()+"不存在用户"+userid+" "+time );}}else {out.collect(Thread.currentThread().getName()+"不存在用户"+element.get("value")+" "+time );}}}
}
Broadcast 使用注意事项
- 同一个 operator 的各个 task 之间没有通信,广播流侧(processBroadcastElement)可以能修改 broadcast state,而数据流侧(processElement)只能读 broadcast state.;
- 需要保证所有 Operator task 对 broadcast state 的修改逻辑是相同的,否则会导致非预期的结果;
- Operator tasks 之间收到的广播流元素的顺序可能不同:虽然所有元素最终都会下发给下游tasks,但是元素到达的顺序可能不同,所以更新state时不能依赖元素到达的顺序;
- 每个 task 对各自的 Broadcast state 都会做快照,防止热点问题;
- 目前不支持 RocksDB 保存 Broadcast state:Broadcast state 目前只保存在内存中,需要为其预留合适的内存
相关文章:

Flink广播流 BroadcastStream
文章目录 前言BroadcastStream代码示例Broadcast 使用注意事项 前言 Flink中的广播流(BroadcastStream)是一种特殊的流处理方式,它允许将一个流(通常是一个较小的流)广播到所有的并行任务中,从而实现在不同…...

IP数据报格式
每一行都由32位比特,即4个字节组成,每个格子称为字段或者域。IP数据报由20字节的固定部分和最大40字节的可变部分组成。 总长度 总长度为16个比特,该字段的取值以字节为单位,用来表示IPv4数据报的长度(首部长度数据载荷长度)最大…...

GET https://registry.npm.taobao.org/xxxx error (CERT_HAS_EXPIRED)解决
PNPM用的阿里源,提示意思是证书过期了,参考网上的解决办法。执行 pnpm config set registry https://registry.npmmirror.com 再用pnpm config get registry查看,确实是 https://registry.npmmirror.com 但是仍旧报错,发现还…...

SSM Java Web项目由于spring-mvc.xml配置不对带来的一系列问题
1 介绍 一年多前,我就买了好多关于Java开发类的书籍,内容关于Java Web实操、Spring 学习指南、Maven实战、IntelliJ IDEA软件开发与应用等等。可是由于工作繁忙,这些书没系统地看完。这也是参加工作后的无奈吧! 寒假期间的一周&…...

MySQL事务隔离
什么是事务隔离? 为了确保在并发事务执行时,各个事务之间能够相互独立、互不干扰地运行,从而保证数据的一致性。 事务的隔离级别 MySQL事务隔离为了满足不同场景,提供了4个事务隔离级别(严格来讲是InnoDB存储引擎支…...

Java基础知识总结(1)
Java概况 JavaSE是java分类中的标准版,是刚接触java要学习的基础知识。 JavaEE是java分类中的企业版,是java中的高级,涉及到的知识广泛。 JavaME中M是Micro的缩写,用在嵌入式等电子设备中。 Java软件工程师:通过Ja…...

脚手架原理之webpack处理html文件和模块打包
脚手架原理之webpack处理html文件和模块打包 为了更好的理解项目脚手架的使用,我们来学习一下webpack工具,因为脚手架的底层就是基于webpack工具实现的。 安装 webpack工具是基于nodejs的,所以首先要有nodejs环境,其次需要下载…...

Winform编程详解一:Form窗口
一、属性介绍 1. (Name) 窗体的对象标识符ID 2. Text 修改窗口左上角标题 3. Icon 修改窗口左上角图标,图标最合适大小 32*32 4. 修改窗体第一次出现位置 代码修改:StartPosition System.Windows.Forms.FormStartPosition.CenterScreen; 5…...

Windows Server 2025 Install Preview
前言 Windows Server 2025 带来了巨大的发展,例如面向所有人的热补丁、下一代 Active Directory 和 SMB、关键任务数据和存储、Hyper-V 和 AI 等 Windows Server 2025 Preview download 下载 已注册的预览体验成员可以直接导航到 Windows Server Insider Preview 下载页面。…...

四、MySQL
MySQL MySQL1.初识网站2.安装MySQL2.1 下载(最重要的一点是路径中不能有中文,哪怕是同级目录也不行)2.2安装补丁2.3安装2.4创建配置文件2.5初始化 3.启动MySQL4.连接测试4.1 设置密码4.2 查看已有的文件夹(数据库)4.3 …...

C#使用泛型自定义的方法设计队列CQueue<T>类
目录 一、涉及到的知识点 1.C#中的队列类 2.自定义队列的方法 (1)先设计一个CList<T>类 (2)再设计CQueue<T>类 二、自定义队列CQueue<T>类的实例 一、涉及到的知识点 1.C#中的队列类 在C#中实现队列类&a…...

IDEA自定义Maven仓库
Maven 是一款广泛应用于 Java 开发的工具,其作用类似于一个全自动的 JAR 包管理器,能够方便地导入开发所需的相关 JAR 包。在使用 Maven 进行 Java 程序开发时,开发者能够极大地提高开发效率。以下是关于如何安装 Maven 以及在 IDEA 中配置自…...

Codeql复现CVE-2018-11776学习笔记
基本使用 1、首先下载struts2漏洞版本源码: https://codeload.github.com/apache/struts/zip/refs/tags/STRUTS_2_3_20 2、构建codeql数据库(构建失败文末有解决办法): codeql database create ~/CodeQL/databases/struts2-2.3.…...

CVE-2024-27199 JetBrains TeamCity 身份验证绕过漏洞2
漏洞简介 TeamCity Web 服务器中发现了第二个身份验证绕过漏洞。这种身份验证旁路允许在没有身份验证的情况下访问有限数量的经过身份验证的端点。未经身份验证的攻击者可以利用此漏洞修改服务器上有限数量的系统设置,并泄露服务器上有限数量的敏感信息。 项目官网…...

ms office学习记录12:Excel学习记录㈥
数据工具 分列的其他运用:身份证号中“出生日期”切片:分列→固定宽度→下一步→切割出三列→下一步→不导入第一列→导入第二列且转换成日期→不导入第三列→完成 删除重复值:定位到要“数据”选项卡→删除重复项→取消全选再勾选要删除的…...

基于YOLOv8/YOLOv7/YOLOv6/YOLOv5的条形码二维码检测系统(深度学习+UI界面+训练数据集+Python代码)
摘要:在物流和制造业中,开发一套高效的条形码与二维码识别系统显得尤为关键。本博文深入探讨了如何利用深度学习技术打造出一套先进的条形码及二维码检测系统,并且提供了一套完整的实施方案。该系统搭载了性能卓越的YOLOv8算法,并…...

npm yarn 一起使用报错
项目记录,具有独特性,仅供参考 项目好好的运行,前一天装个测试工具包, 突然就不行了,卸载重装也不行,所有的项目都安装失败,新起一个项目也不行,有时候某个单独安装一个包可以&…...

基于springboot实现驾校信息管理系统项目【项目源码+论文说明】计算机毕业设计
基于springboot实现驾校信息管理系统演示 摘要 随着人们生活水平的不断提高,出行方式多样化,也以私家车为主,那么既然私家车的需求不断增长,那么基于驾校的考核管理也就不断增强,那么业务系统也就慢慢的随之加大。信息…...

DXP软件界面显示“No Hard Devices”【简单的操作问题】加【软件下载】
目录 一,DXP软件界面显示“No Hard Devices” 二,软件下载的百度网盘资源 一,DXP软件界面显示“No Hard Devices” Protel DXP是2004是澳大利亚Altium公司于2002年推出的一款电子设计自动化软件。它的主要功能包括:原理图编辑、印…...

通过Spring Boot 实现页面配置生成动态接口?
流程介绍 在Spring Boot中实现页面配置生成动态接口通常涉及几个关键步骤: 设计页面配置:首先,你需要设计一个用户界面(UI),允许用户通过此界面来配置接口的各种参数,例如HTTP方法(GET、POST等)、URL路径、请求参数、响应数据格式等。保存配置信息:当用户通过页面配置…...

【数据结构与算法】:插入排序与希尔排序
🔥个人主页: Quitecoder 🔥专栏: 数据结构与算法 欢迎大家来到初阶数据结构的最后一小节:排序 目录 1.排序的基本概念与分类1.1什么是排序的稳定性?1.2内排序与外排序内排序外排序 2.插入排序2.1实现插入排序2.3稳定性…...

前端性能优化——javascript
优化处理: 讲javascript脚本文件放到body标记的后面 减少页面当中所包含的script标记的数量 课堂练习: 脚本优化处理 使用原生JavaScript完成操作过程。 document.querySelector document.querySelectorAll classList以及类的操作API Element.class…...

Docker容器化技术(使用Docker搭建论坛)
第一步:删除容器镜像文件 [rootlocalhost ~]# docker rm -f docker ps -aq b09ee6438986 e0fe8ebf3ba1第二步:使用docker拉取数据库 [rootlocalhost ~]# docker run -d --name db mysql:5.7 02a4e5bfffdc81cb6403985fe4cd6acb0c5fab0b19edf9f5b8274783…...

C# ListView 控件使用
1.基本设置 listView1.Columns.Add("序号", 60); //向 listView1控件中添加1列 同时设置列名称和宽度listView1.Columns.Add("温度", 100); //下同listView1.Columns.Add("偏移", 100);listView1.Columns.Add("分割", 50);listView1…...

【string一些函数用法的补充】
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言 string类对象的修改操作 我们来看 c_str 返回c格式的字符串的操作: 我们来看 rfind 和 substr 的操作: string类非成员函数 我们来看 r…...

【Go】令牌桶限流算法
1. 限流 限流,顾名思义,限制用户请求流量,避免大规模并发导致系统宕机。 2. 令牌桶算法 令牌管理员以恒定的速率向令牌桶里放置一个令牌。如果桶满,就丢弃令牌。 请求到达时,都要先去令牌桶里取一个令牌,…...

go的slice学习
并发访问slice 线上出现一粒多协程并发append全局slice的情况,导致内存不断翻倍,因此对slice的使用需要重新考虑。 并发读写的情况下, 可以利用锁、channel等避免竞态 问题 func TestDemo32(t *testing.T) {var wg sync.WaitGroupvar n 1…...

软件设计师17--磁盘管理
软件设计师17--磁盘管理 考点1:存储管理 - 磁盘管理调度算法磁盘调度 - FCFS磁盘调度 - SSTF例题: 考点1:存储管理 - 磁盘管理 存取时间寻道时间等待时间,训导时间是指磁头移动到磁道所需的时间;等待时间为等待读写的扇…...

学点Java打小工——Day2Day3一点作业
1 猜数字(10次机会) 随机生成[1,1000]的一个数,输入你猜的数程序会给出反馈,直到猜对或次数用尽(10次)。 //猜数字 10次机会Testpublic void guessNumber() {Random random new Random();// [0, 1000) 1// [1, 1000]int num ra…...

【话题】2024年AI辅助研发趋势,有那些应用领域
大家好,我是全栈小5,欢迎阅读文章! 此篇是【话题达人】系列文章,这一次的话题是《2024年AI辅助研发趋势》 目录 背景概念实践医药领域汽车设计领域展望未来文章推荐 背景 随着人工智能技术的持续发展与突破,2024年AI辅…...