当前位置: 首页 > news >正文

Flink广播流 BroadcastStream

文章目录

  • 前言
  • BroadcastStream代码示例
  • Broadcast 使用注意事项


前言

Flink中的广播流(BroadcastStream)是一种特殊的流处理方式,它允许将一个流(通常是一个较小的流)广播到所有的并行任务中,从而实现在不同任务间共享数据的目的。广播流在处理配置信息、小数据集或者全局变量等场景下特别有用,因为这些数据需要在所有任务中保持一致且实时更新。

广播流的使用通常涉及以下步骤:

  1. 定义MapStateDescriptor:首先需要定义一个MapStateDescriptor来描述要广播的数据的格式。这个描述器指定了数据的键值对类型。

  2. 创建广播流:然后,需要将一个普通的流转换为广播流。这通常通过调用流的broadcast()方法实现,并将MapStateDescriptor作为参数传入。

  3. 连接广播流与非广播流:一旦有了广播流,就可以将其与一个或多个非广播流(无论是Keyed流还是Non-Keyed流)连接起来。这通过调用非广播流的connect()方法完成,并将广播流作为参数传入。连接后的流是一个BroadcastConnectedStream,它提供了process()方法用于处理数据。

  4. 处理数据:在process()方法中,可以编写逻辑来处理非广播流和广播流的数据。根据非广播流的类型(Keyed或Non-Keyed),需要传入相应的KeyedBroadcastProcessFunctionBroadcastProcessFunction类型的处理函数。

广播流的一个典型使用场景是在处理数据时需要实时动态改变配置。例如,当需要从MySQL数据库中实时查询和更新某些关键字过滤规则时,如果直接在计算函数中进行查询,可能会阻塞整个计算过程甚至导致任务停止。通过使用广播流,可以将这些配置信息广播到所有相关任务的实例中,然后在计算过程中直接使用这些配置信息,从而提高计算效率和实时性。

总的来说,Flink的广播流提供了一种有效的方式来实现不同任务间的数据共享和实时更新,适用于各种需要全局数据或配置的场景。


BroadcastStream代码示例

功能:将用户信息进行广播,从Kafka中读取用户访问记录,判断访问用户是否存在


import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.flink.util.Collector;import flink.demo.data.UserVo;
/*** 多流connect,并进行join**/
public class BroadcastTest{public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties proterties = new Properties();proterties.setProperty("bootstrap.servers", "10.168.88.88:9092");proterties.setProperty("group.id", "test");proterties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");proterties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//        proterties.setProperty("auto.offset.reset", "latest");FlinkKafkaConsumer<ObjectNode> consumerVisit= new FlinkKafkaConsumer<>("test",new JSONKeyValueDeserializationSchema(false), proterties);DataStreamSource<ObjectNode> streamSource = env.addSource(consumerVisit);DataStreamSource<Tuple2<String, List<UserVo>>> userStreamSource = env.addSource(new UserListSource());MapStateDescriptor<String, List<UserVo>> descriptor =new MapStateDescriptor<>("userStream",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint<List<UserVo>>() {}));BroadcastStream<Tuple2<String, List<UserVo>>> broadcastStream = userStreamSource.broadcast(descriptor);// 将数据流和控制流进行连接,利用控制流中的数据来控制字符串的输出BroadcastConnectedStream<ObjectNode, Tuple2<String, List<UserVo>>> tmp=streamSource.connect(broadcastStream);tmp.process(new UserPvProcessor()).print();env.execute("kafkaTest");}private static class UserPvProcessorextends BroadcastProcessFunction<ObjectNode, Tuple2<String, List<UserVo>>, String> {private static final long serialVersionUID = 1L;MapStateDescriptor<String, List<UserVo>> descriptor =new MapStateDescriptor<>("userStream",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint<List<UserVo>>() {}));@Override//用户信息处理public void processBroadcastElement(Tuple2<String, List<UserVo>> value, Context ctx, Collector<String> out)throws Exception {// 将接收到的控制数据放到 broadcast state 中  ctx.getBroadcastState(descriptor).put(value.f0, value.f1);// 打印控制信息System.out.println(Thread.currentThread().getName() + " 接收到用户信息 : "+value.f0+"   " + value.f1);}@Override//数据流public void processElement(ObjectNode element, ReadOnlyContext ctx, Collector<String> out) throws Exception {// 从 broadcast state 中拿到用户列表信息List<UserVo> userList = ctx.getBroadcastState(descriptor).get("userList");String time=LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss"));if(userList!=null&&userList.size()>0) {Map<String,String> userMap=new HashMap<>();for(UserVo vo:userList) {userMap.put(vo.getUserid(), vo.getUserName());}
//				System.out.println(userMap);JsonNode value = element.get("value");String userid=value.get("user").asText();String userName=userMap.get(userid);if (StringUtils.isNotBlank(userName)) {out.collect(Thread.currentThread().getName()+"存在用户"+userid+"  "+userName +" "+time);}else {out.collect(Thread.currentThread().getName()+"不存在用户"+userid+" "+time );}}else {out.collect(Thread.currentThread().getName()+"不存在用户"+element.get("value")+" "+time );}}}
}

Broadcast 使用注意事项

  • 同一个 operator 的各个 task 之间没有通信,广播流侧(processBroadcastElement)可以能修改 broadcast state,而数据流侧(processElement)只能读 broadcast state.;
  • 需要保证所有 Operator task 对 broadcast state 的修改逻辑是相同的,否则会导致非预期的结果;
  • Operator tasks 之间收到的广播流元素的顺序可能不同:虽然所有元素最终都会下发给下游tasks,但是元素到达的顺序可能不同,所以更新state时不能依赖元素到达的顺序;
  • 每个 task 对各自的 Broadcast state 都会做快照,防止热点问题;
  • 目前不支持 RocksDB 保存 Broadcast state:Broadcast state 目前只保存在内存中,需要为其预留合适的内存

相关文章:

Flink广播流 BroadcastStream

文章目录 前言BroadcastStream代码示例Broadcast 使用注意事项 前言 Flink中的广播流&#xff08;BroadcastStream&#xff09;是一种特殊的流处理方式&#xff0c;它允许将一个流&#xff08;通常是一个较小的流&#xff09;广播到所有的并行任务中&#xff0c;从而实现在不同…...

IP数据报格式

每一行都由32位比特&#xff0c;即4个字节组成&#xff0c;每个格子称为字段或者域。IP数据报由20字节的固定部分和最大40字节的可变部分组成。 总长度 总长度为16个比特&#xff0c;该字段的取值以字节为单位&#xff0c;用来表示IPv4数据报的长度(首部长度数据载荷长度)最大…...

GET https://registry.npm.taobao.org/xxxx error (CERT_HAS_EXPIRED)解决

PNPM用的阿里源&#xff0c;提示意思是证书过期了&#xff0c;参考网上的解决办法。执行 pnpm config set registry https://registry.npmmirror.com 再用pnpm config get registry查看&#xff0c;确实是 https://registry.npmmirror.com 但是仍旧报错&#xff0c;发现还…...

SSM Java Web项目由于spring-mvc.xml配置不对带来的一系列问题

1 介绍 一年多前&#xff0c;我就买了好多关于Java开发类的书籍&#xff0c;内容关于Java Web实操、Spring 学习指南、Maven实战、IntelliJ IDEA软件开发与应用等等。可是由于工作繁忙&#xff0c;这些书没系统地看完。这也是参加工作后的无奈吧&#xff01; 寒假期间的一周&…...

MySQL事务隔离

什么是事务隔离&#xff1f; 为了确保在并发事务执行时&#xff0c;各个事务之间能够相互独立、互不干扰地运行&#xff0c;从而保证数据的一致性。 事务的隔离级别 MySQL事务隔离为了满足不同场景&#xff0c;提供了4个事务隔离级别&#xff08;严格来讲是InnoDB存储引擎支…...

Java基础知识总结(1)

Java概况 JavaSE是java分类中的标准版&#xff0c;是刚接触java要学习的基础知识。 JavaEE是java分类中的企业版&#xff0c;是java中的高级&#xff0c;涉及到的知识广泛。 JavaME中M是Micro的缩写&#xff0c;用在嵌入式等电子设备中。 Java软件工程师&#xff1a;通过Ja…...

脚手架原理之webpack处理html文件和模块打包

脚手架原理之webpack处理html文件和模块打包 为了更好的理解项目脚手架的使用&#xff0c;我们来学习一下webpack工具&#xff0c;因为脚手架的底层就是基于webpack工具实现的。 安装 webpack工具是基于nodejs的&#xff0c;所以首先要有nodejs环境&#xff0c;其次需要下载…...

Winform编程详解一:Form窗口

一、属性介绍 1. (Name) 窗体的对象标识符ID 2. Text 修改窗口左上角标题 3. Icon 修改窗口左上角图标&#xff0c;图标最合适大小 32*32 4. 修改窗体第一次出现位置 代码修改&#xff1a;StartPosition System.Windows.Forms.FormStartPosition.CenterScreen; 5…...

Windows Server 2025 Install Preview

前言 Windows Server 2025 带来了巨大的发展,例如面向所有人的热补丁、下一代 Active Directory 和 SMB、关键任务数据和存储、Hyper-V 和 AI 等 Windows Server 2025 Preview download 下载 已注册的预览体验成员可以直接导航到 Windows Server Insider Preview 下载页面。…...

四、MySQL

MySQL MySQL1.初识网站2.安装MySQL2.1 下载&#xff08;最重要的一点是路径中不能有中文&#xff0c;哪怕是同级目录也不行&#xff09;2.2安装补丁2.3安装2.4创建配置文件2.5初始化 3.启动MySQL4.连接测试4.1 设置密码4.2 查看已有的文件夹&#xff08;数据库&#xff09;4.3 …...

C#使用泛型自定义的方法设计队列CQueue<T>类

目录 一、涉及到的知识点 1.C#中的队列类 2.自定义队列的方法 &#xff08;1&#xff09;先设计一个CList<T>类 &#xff08;2&#xff09;再设计CQueue<T>类 二、自定义队列CQueue<T>类的实例 一、涉及到的知识点 1.C#中的队列类 在C#中实现队列类&a…...

IDEA自定义Maven仓库

Maven 是一款广泛应用于 Java 开发的工具&#xff0c;其作用类似于一个全自动的 JAR 包管理器&#xff0c;能够方便地导入开发所需的相关 JAR 包。在使用 Maven 进行 Java 程序开发时&#xff0c;开发者能够极大地提高开发效率。以下是关于如何安装 Maven 以及在 IDEA 中配置自…...

Codeql复现CVE-2018-11776学习笔记

基本使用 1、首先下载struts2漏洞版本源码&#xff1a; https://codeload.github.com/apache/struts/zip/refs/tags/STRUTS_2_3_20 2、构建codeql数据库&#xff08;构建失败文末有解决办法&#xff09;&#xff1a; codeql database create ~/CodeQL/databases/struts2-2.3.…...

CVE-2024-27199 JetBrains TeamCity 身份验证绕过漏洞2

漏洞简介 TeamCity Web 服务器中发现了第二个身份验证绕过漏洞。这种身份验证旁路允许在没有身份验证的情况下访问有限数量的经过身份验证的端点。未经身份验证的攻击者可以利用此漏洞修改服务器上有限数量的系统设置&#xff0c;并泄露服务器上有限数量的敏感信息。 项目官网…...

ms office学习记录12:Excel学习记录㈥

数据工具 分列的其他运用&#xff1a;身份证号中“出生日期”切片&#xff1a;分列→固定宽度→下一步→切割出三列→下一步→不导入第一列→导入第二列且转换成日期→不导入第三列→完成 删除重复值&#xff1a;定位到要“数据”选项卡→删除重复项→取消全选再勾选要删除的…...

基于YOLOv8/YOLOv7/YOLOv6/YOLOv5的条形码二维码检测系统(深度学习+UI界面+训练数据集+Python代码)

摘要&#xff1a;在物流和制造业中&#xff0c;开发一套高效的条形码与二维码识别系统显得尤为关键。本博文深入探讨了如何利用深度学习技术打造出一套先进的条形码及二维码检测系统&#xff0c;并且提供了一套完整的实施方案。该系统搭载了性能卓越的YOLOv8算法&#xff0c;并…...

npm yarn 一起使用报错

项目记录&#xff0c;具有独特性&#xff0c;仅供参考 项目好好的运行&#xff0c;前一天装个测试工具包&#xff0c; 突然就不行了&#xff0c;卸载重装也不行&#xff0c;所有的项目都安装失败&#xff0c;新起一个项目也不行&#xff0c;有时候某个单独安装一个包可以&…...

基于springboot实现驾校信息管理系统项目【项目源码+论文说明】计算机毕业设计

基于springboot实现驾校信息管理系统演示 摘要 随着人们生活水平的不断提高&#xff0c;出行方式多样化&#xff0c;也以私家车为主&#xff0c;那么既然私家车的需求不断增长&#xff0c;那么基于驾校的考核管理也就不断增强&#xff0c;那么业务系统也就慢慢的随之加大。信息…...

DXP软件界面显示“No Hard Devices”【简单的操作问题】加【软件下载】

目录 一&#xff0c;DXP软件界面显示“No Hard Devices” 二&#xff0c;软件下载的百度网盘资源 一&#xff0c;DXP软件界面显示“No Hard Devices” Protel DXP是2004是澳大利亚Altium公司于2002年推出的一款电子设计自动化软件。它的主要功能包括&#xff1a;原理图编辑、印…...

通过Spring Boot 实现页面配置生成动态接口?

流程介绍 在Spring Boot中实现页面配置生成动态接口通常涉及几个关键步骤: 设计页面配置:首先,你需要设计一个用户界面(UI),允许用户通过此界面来配置接口的各种参数,例如HTTP方法(GET、POST等)、URL路径、请求参数、响应数据格式等。保存配置信息:当用户通过页面配置…...

XCTF-web-easyupload

试了试php&#xff0c;php7&#xff0c;pht&#xff0c;phtml等&#xff0c;都没有用 尝试.user.ini 抓包修改将.user.ini修改为jpg图片 在上传一个123.jpg 用蚁剑连接&#xff0c;得到flag...

django filter 统计数量 按属性去重

在Django中&#xff0c;如果你想要根据某个属性对查询集进行去重并统计数量&#xff0c;你可以使用values()方法配合annotate()方法来实现。这里有两种常见的方法来完成这个需求&#xff1a; 方法1&#xff1a;使用annotate()和Count 假设你有一个模型Item&#xff0c;并且你想…...

sqlserver 根据指定字符 解析拼接字符串

DECLARE LotNo NVARCHAR(50)A,B,C DECLARE xml XML ( SELECT <x> REPLACE(LotNo, ,, </x><x>) </x> ) DECLARE ErrorCode NVARCHAR(50) -- 提取 XML 中的值 SELECT value x.value(., VARCHAR(MAX))…...

2025盘古石杯决赛【手机取证】

前言 第三届盘古石杯国际电子数据取证大赛决赛 最后一题没有解出来&#xff0c;实在找不到&#xff0c;希望有大佬教一下我。 还有就会议时间&#xff0c;我感觉不是图片时间&#xff0c;因为在电脑看到是其他时间用老会议系统开的会。 手机取证 1、分析鸿蒙手机检材&#x…...

css3笔记 (1) 自用

outline: none 用于移除元素获得焦点时默认的轮廓线 broder:0 用于移除边框 font-size&#xff1a;0 用于设置字体不显示 list-style: none 消除<li> 标签默认样式 margin: xx auto 版心居中 width:100% 通栏 vertical-align 作用于行内元素 / 表格单元格&#xff…...

均衡后的SNRSINR

本文主要摘自参考文献中的前两篇&#xff0c;相关文献中经常会出现MIMO检测后的SINR不过一直没有找到相关数学推到过程&#xff0c;其中文献[1]中给出了相关原理在此仅做记录。 1. 系统模型 复信道模型 n t n_t nt​ 根发送天线&#xff0c; n r n_r nr​ 根接收天线的 MIMO 系…...

如何在网页里填写 PDF 表格?

有时候&#xff0c;你可能希望用户能在你的网站上填写 PDF 表单。然而&#xff0c;这件事并不简单&#xff0c;因为 PDF 并不是一种原生的网页格式。虽然浏览器可以显示 PDF 文件&#xff0c;但原生并不支持编辑或填写它们。更糟的是&#xff0c;如果你想收集表单数据&#xff…...

基于 TAPD 进行项目管理

起因 自己写了个小工具&#xff0c;仓库用的Github。之前在用markdown进行需求管理&#xff0c;现在随着功能的增加&#xff0c;感觉有点难以管理了&#xff0c;所以用TAPD这个工具进行需求、Bug管理。 操作流程 注册 TAPD&#xff0c;需要提供一个企业名新建一个项目&#…...

Go 并发编程基础:通道(Channel)的使用

在 Go 中&#xff0c;Channel 是 Goroutine 之间通信的核心机制。它提供了一个线程安全的通信方式&#xff0c;用于在多个 Goroutine 之间传递数据&#xff0c;从而实现高效的并发编程。 本章将介绍 Channel 的基本概念、用法、缓冲、关闭机制以及 select 的使用。 一、Channel…...

基于IDIG-GAN的小样本电机轴承故障诊断

目录 🔍 核心问题 一、IDIG-GAN模型原理 1. 整体架构 2. 核心创新点 (1) ​梯度归一化(Gradient Normalization)​​ (2) ​判别器梯度间隙正则化(Discriminator Gradient Gap Regularization)​​ (3) ​自注意力机制(Self-Attention)​​ 3. 完整损失函数 二…...