Flink广播流 BroadcastStream
文章目录
- 前言
- BroadcastStream代码示例
- Broadcast 使用注意事项
前言
Flink中的广播流(BroadcastStream)是一种特殊的流处理方式,它允许将一个流(通常是一个较小的流)广播到所有的并行任务中,从而实现在不同任务间共享数据的目的。广播流在处理配置信息、小数据集或者全局变量等场景下特别有用,因为这些数据需要在所有任务中保持一致且实时更新。
广播流的使用通常涉及以下步骤:
-
定义MapStateDescriptor:首先需要定义一个MapStateDescriptor来描述要广播的数据的格式。这个描述器指定了数据的键值对类型。
-
创建广播流:然后,需要将一个普通的流转换为广播流。这通常通过调用流的
broadcast()方法实现,并将MapStateDescriptor作为参数传入。 -
连接广播流与非广播流:一旦有了广播流,就可以将其与一个或多个非广播流(无论是Keyed流还是Non-Keyed流)连接起来。这通过调用非广播流的
connect()方法完成,并将广播流作为参数传入。连接后的流是一个BroadcastConnectedStream,它提供了process()方法用于处理数据。 -
处理数据:在
process()方法中,可以编写逻辑来处理非广播流和广播流的数据。根据非广播流的类型(Keyed或Non-Keyed),需要传入相应的KeyedBroadcastProcessFunction或BroadcastProcessFunction类型的处理函数。
广播流的一个典型使用场景是在处理数据时需要实时动态改变配置。例如,当需要从MySQL数据库中实时查询和更新某些关键字过滤规则时,如果直接在计算函数中进行查询,可能会阻塞整个计算过程甚至导致任务停止。通过使用广播流,可以将这些配置信息广播到所有相关任务的实例中,然后在计算过程中直接使用这些配置信息,从而提高计算效率和实时性。
总的来说,Flink的广播流提供了一种有效的方式来实现不同任务间的数据共享和实时更新,适用于各种需要全局数据或配置的场景。
BroadcastStream代码示例
功能:将用户信息进行广播,从Kafka中读取用户访问记录,判断访问用户是否存在
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.flink.util.Collector;import flink.demo.data.UserVo;
/*** 多流connect,并进行join**/
public class BroadcastTest{public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties proterties = new Properties();proterties.setProperty("bootstrap.servers", "10.168.88.88:9092");proterties.setProperty("group.id", "test");proterties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");proterties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// proterties.setProperty("auto.offset.reset", "latest");FlinkKafkaConsumer<ObjectNode> consumerVisit= new FlinkKafkaConsumer<>("test",new JSONKeyValueDeserializationSchema(false), proterties);DataStreamSource<ObjectNode> streamSource = env.addSource(consumerVisit);DataStreamSource<Tuple2<String, List<UserVo>>> userStreamSource = env.addSource(new UserListSource());MapStateDescriptor<String, List<UserVo>> descriptor =new MapStateDescriptor<>("userStream",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint<List<UserVo>>() {}));BroadcastStream<Tuple2<String, List<UserVo>>> broadcastStream = userStreamSource.broadcast(descriptor);// 将数据流和控制流进行连接,利用控制流中的数据来控制字符串的输出BroadcastConnectedStream<ObjectNode, Tuple2<String, List<UserVo>>> tmp=streamSource.connect(broadcastStream);tmp.process(new UserPvProcessor()).print();env.execute("kafkaTest");}private static class UserPvProcessorextends BroadcastProcessFunction<ObjectNode, Tuple2<String, List<UserVo>>, String> {private static final long serialVersionUID = 1L;MapStateDescriptor<String, List<UserVo>> descriptor =new MapStateDescriptor<>("userStream",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint<List<UserVo>>() {}));@Override//用户信息处理public void processBroadcastElement(Tuple2<String, List<UserVo>> value, Context ctx, Collector<String> out)throws Exception {// 将接收到的控制数据放到 broadcast state 中 ctx.getBroadcastState(descriptor).put(value.f0, value.f1);// 打印控制信息System.out.println(Thread.currentThread().getName() + " 接收到用户信息 : "+value.f0+" " + value.f1);}@Override//数据流public void processElement(ObjectNode element, ReadOnlyContext ctx, Collector<String> out) throws Exception {// 从 broadcast state 中拿到用户列表信息List<UserVo> userList = ctx.getBroadcastState(descriptor).get("userList");String time=LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss"));if(userList!=null&&userList.size()>0) {Map<String,String> userMap=new HashMap<>();for(UserVo vo:userList) {userMap.put(vo.getUserid(), vo.getUserName());}
// System.out.println(userMap);JsonNode value = element.get("value");String userid=value.get("user").asText();String userName=userMap.get(userid);if (StringUtils.isNotBlank(userName)) {out.collect(Thread.currentThread().getName()+"存在用户"+userid+" "+userName +" "+time);}else {out.collect(Thread.currentThread().getName()+"不存在用户"+userid+" "+time );}}else {out.collect(Thread.currentThread().getName()+"不存在用户"+element.get("value")+" "+time );}}}
}
Broadcast 使用注意事项
- 同一个 operator 的各个 task 之间没有通信,广播流侧(processBroadcastElement)可以能修改 broadcast state,而数据流侧(processElement)只能读 broadcast state.;
- 需要保证所有 Operator task 对 broadcast state 的修改逻辑是相同的,否则会导致非预期的结果;
- Operator tasks 之间收到的广播流元素的顺序可能不同:虽然所有元素最终都会下发给下游tasks,但是元素到达的顺序可能不同,所以更新state时不能依赖元素到达的顺序;
- 每个 task 对各自的 Broadcast state 都会做快照,防止热点问题;
- 目前不支持 RocksDB 保存 Broadcast state:Broadcast state 目前只保存在内存中,需要为其预留合适的内存
相关文章:
Flink广播流 BroadcastStream
文章目录 前言BroadcastStream代码示例Broadcast 使用注意事项 前言 Flink中的广播流(BroadcastStream)是一种特殊的流处理方式,它允许将一个流(通常是一个较小的流)广播到所有的并行任务中,从而实现在不同…...
IP数据报格式
每一行都由32位比特,即4个字节组成,每个格子称为字段或者域。IP数据报由20字节的固定部分和最大40字节的可变部分组成。 总长度 总长度为16个比特,该字段的取值以字节为单位,用来表示IPv4数据报的长度(首部长度数据载荷长度)最大…...
GET https://registry.npm.taobao.org/xxxx error (CERT_HAS_EXPIRED)解决
PNPM用的阿里源,提示意思是证书过期了,参考网上的解决办法。执行 pnpm config set registry https://registry.npmmirror.com 再用pnpm config get registry查看,确实是 https://registry.npmmirror.com 但是仍旧报错,发现还…...
SSM Java Web项目由于spring-mvc.xml配置不对带来的一系列问题
1 介绍 一年多前,我就买了好多关于Java开发类的书籍,内容关于Java Web实操、Spring 学习指南、Maven实战、IntelliJ IDEA软件开发与应用等等。可是由于工作繁忙,这些书没系统地看完。这也是参加工作后的无奈吧! 寒假期间的一周&…...
MySQL事务隔离
什么是事务隔离? 为了确保在并发事务执行时,各个事务之间能够相互独立、互不干扰地运行,从而保证数据的一致性。 事务的隔离级别 MySQL事务隔离为了满足不同场景,提供了4个事务隔离级别(严格来讲是InnoDB存储引擎支…...
Java基础知识总结(1)
Java概况 JavaSE是java分类中的标准版,是刚接触java要学习的基础知识。 JavaEE是java分类中的企业版,是java中的高级,涉及到的知识广泛。 JavaME中M是Micro的缩写,用在嵌入式等电子设备中。 Java软件工程师:通过Ja…...
脚手架原理之webpack处理html文件和模块打包
脚手架原理之webpack处理html文件和模块打包 为了更好的理解项目脚手架的使用,我们来学习一下webpack工具,因为脚手架的底层就是基于webpack工具实现的。 安装 webpack工具是基于nodejs的,所以首先要有nodejs环境,其次需要下载…...
Winform编程详解一:Form窗口
一、属性介绍 1. (Name) 窗体的对象标识符ID 2. Text 修改窗口左上角标题 3. Icon 修改窗口左上角图标,图标最合适大小 32*32 4. 修改窗体第一次出现位置 代码修改:StartPosition System.Windows.Forms.FormStartPosition.CenterScreen; 5…...
Windows Server 2025 Install Preview
前言 Windows Server 2025 带来了巨大的发展,例如面向所有人的热补丁、下一代 Active Directory 和 SMB、关键任务数据和存储、Hyper-V 和 AI 等 Windows Server 2025 Preview download 下载 已注册的预览体验成员可以直接导航到 Windows Server Insider Preview 下载页面。…...
四、MySQL
MySQL MySQL1.初识网站2.安装MySQL2.1 下载(最重要的一点是路径中不能有中文,哪怕是同级目录也不行)2.2安装补丁2.3安装2.4创建配置文件2.5初始化 3.启动MySQL4.连接测试4.1 设置密码4.2 查看已有的文件夹(数据库)4.3 …...
C#使用泛型自定义的方法设计队列CQueue<T>类
目录 一、涉及到的知识点 1.C#中的队列类 2.自定义队列的方法 (1)先设计一个CList<T>类 (2)再设计CQueue<T>类 二、自定义队列CQueue<T>类的实例 一、涉及到的知识点 1.C#中的队列类 在C#中实现队列类&a…...
IDEA自定义Maven仓库
Maven 是一款广泛应用于 Java 开发的工具,其作用类似于一个全自动的 JAR 包管理器,能够方便地导入开发所需的相关 JAR 包。在使用 Maven 进行 Java 程序开发时,开发者能够极大地提高开发效率。以下是关于如何安装 Maven 以及在 IDEA 中配置自…...
Codeql复现CVE-2018-11776学习笔记
基本使用 1、首先下载struts2漏洞版本源码: https://codeload.github.com/apache/struts/zip/refs/tags/STRUTS_2_3_20 2、构建codeql数据库(构建失败文末有解决办法): codeql database create ~/CodeQL/databases/struts2-2.3.…...
CVE-2024-27199 JetBrains TeamCity 身份验证绕过漏洞2
漏洞简介 TeamCity Web 服务器中发现了第二个身份验证绕过漏洞。这种身份验证旁路允许在没有身份验证的情况下访问有限数量的经过身份验证的端点。未经身份验证的攻击者可以利用此漏洞修改服务器上有限数量的系统设置,并泄露服务器上有限数量的敏感信息。 项目官网…...
ms office学习记录12:Excel学习记录㈥
数据工具 分列的其他运用:身份证号中“出生日期”切片:分列→固定宽度→下一步→切割出三列→下一步→不导入第一列→导入第二列且转换成日期→不导入第三列→完成 删除重复值:定位到要“数据”选项卡→删除重复项→取消全选再勾选要删除的…...
基于YOLOv8/YOLOv7/YOLOv6/YOLOv5的条形码二维码检测系统(深度学习+UI界面+训练数据集+Python代码)
摘要:在物流和制造业中,开发一套高效的条形码与二维码识别系统显得尤为关键。本博文深入探讨了如何利用深度学习技术打造出一套先进的条形码及二维码检测系统,并且提供了一套完整的实施方案。该系统搭载了性能卓越的YOLOv8算法,并…...
npm yarn 一起使用报错
项目记录,具有独特性,仅供参考 项目好好的运行,前一天装个测试工具包, 突然就不行了,卸载重装也不行,所有的项目都安装失败,新起一个项目也不行,有时候某个单独安装一个包可以&…...
基于springboot实现驾校信息管理系统项目【项目源码+论文说明】计算机毕业设计
基于springboot实现驾校信息管理系统演示 摘要 随着人们生活水平的不断提高,出行方式多样化,也以私家车为主,那么既然私家车的需求不断增长,那么基于驾校的考核管理也就不断增强,那么业务系统也就慢慢的随之加大。信息…...
DXP软件界面显示“No Hard Devices”【简单的操作问题】加【软件下载】
目录 一,DXP软件界面显示“No Hard Devices” 二,软件下载的百度网盘资源 一,DXP软件界面显示“No Hard Devices” Protel DXP是2004是澳大利亚Altium公司于2002年推出的一款电子设计自动化软件。它的主要功能包括:原理图编辑、印…...
通过Spring Boot 实现页面配置生成动态接口?
流程介绍 在Spring Boot中实现页面配置生成动态接口通常涉及几个关键步骤: 设计页面配置:首先,你需要设计一个用户界面(UI),允许用户通过此界面来配置接口的各种参数,例如HTTP方法(GET、POST等)、URL路径、请求参数、响应数据格式等。保存配置信息:当用户通过页面配置…...
Android Wi-Fi 连接失败日志分析
1. Android wifi 关键日志总结 (1) Wi-Fi 断开 (CTRL-EVENT-DISCONNECTED reason3) 日志相关部分: 06-05 10:48:40.987 943 943 I wpa_supplicant: wlan0: CTRL-EVENT-DISCONNECTED bssid44:9b:c1:57:a8:90 reason3 locally_generated1解析: CTR…...
OpenLayers 可视化之热力图
注:当前使用的是 ol 5.3.0 版本,天地图使用的key请到天地图官网申请,并替换为自己的key 热力图(Heatmap)又叫热点图,是一种通过特殊高亮显示事物密度分布、变化趋势的数据可视化技术。采用颜色的深浅来显示…...
C++_核心编程_多态案例二-制作饮品
#include <iostream> #include <string> using namespace std;/*制作饮品的大致流程为:煮水 - 冲泡 - 倒入杯中 - 加入辅料 利用多态技术实现本案例,提供抽象制作饮品基类,提供子类制作咖啡和茶叶*//*基类*/ class AbstractDr…...
无法与IP建立连接,未能下载VSCode服务器
如题,在远程连接服务器的时候突然遇到了这个提示。 查阅了一圈,发现是VSCode版本自动更新惹的祸!!! 在VSCode的帮助->关于这里发现前几天VSCode自动更新了,我的版本号变成了1.100.3 才导致了远程连接出…...
Java-41 深入浅出 Spring - 声明式事务的支持 事务配置 XML模式 XML+注解模式
点一下关注吧!!!非常感谢!!持续更新!!! 🚀 AI篇持续更新中!(长期更新) 目前2025年06月05日更新到: AI炼丹日志-28 - Aud…...
PL0语法,分析器实现!
简介 PL/0 是一种简单的编程语言,通常用于教学编译原理。它的语法结构清晰,功能包括常量定义、变量声明、过程(子程序)定义以及基本的控制结构(如条件语句和循环语句)。 PL/0 语法规范 PL/0 是一种教学用的小型编程语言,由 Niklaus Wirth 设计,用于展示编译原理的核…...
爬虫基础学习day2
# 爬虫设计领域 工商:企查查、天眼查短视频:抖音、快手、西瓜 ---> 飞瓜电商:京东、淘宝、聚美优品、亚马逊 ---> 分析店铺经营决策标题、排名航空:抓取所有航空公司价格 ---> 去哪儿自媒体:采集自媒体数据进…...
QT: `long long` 类型转换为 `QString` 2025.6.5
在 Qt 中,将 long long 类型转换为 QString 可以通过以下两种常用方法实现: 方法 1:使用 QString::number() 直接调用 QString 的静态方法 number(),将数值转换为字符串: long long value 1234567890123456789LL; …...
初学 pytest 记录
安装 pip install pytest用例可以是函数也可以是类中的方法 def test_func():print()class TestAdd: # def __init__(self): 在 pytest 中不可以使用__init__方法 # self.cc 12345 pytest.mark.api def test_str(self):res add(1, 2)assert res 12def test_int(self):r…...
纯 Java 项目(非 SpringBoot)集成 Mybatis-Plus 和 Mybatis-Plus-Join
纯 Java 项目(非 SpringBoot)集成 Mybatis-Plus 和 Mybatis-Plus-Join 1、依赖1.1、依赖版本1.2、pom.xml 2、代码2.1、SqlSession 构造器2.2、MybatisPlus代码生成器2.3、获取 config.yml 配置2.3.1、config.yml2.3.2、项目配置类 2.4、ftl 模板2.4.1、…...
