当前位置: 首页 > news >正文

Flink广播流 BroadcastStream

文章目录

  • 前言
  • BroadcastStream代码示例
  • Broadcast 使用注意事项


前言

Flink中的广播流(BroadcastStream)是一种特殊的流处理方式,它允许将一个流(通常是一个较小的流)广播到所有的并行任务中,从而实现在不同任务间共享数据的目的。广播流在处理配置信息、小数据集或者全局变量等场景下特别有用,因为这些数据需要在所有任务中保持一致且实时更新。

广播流的使用通常涉及以下步骤:

  1. 定义MapStateDescriptor:首先需要定义一个MapStateDescriptor来描述要广播的数据的格式。这个描述器指定了数据的键值对类型。

  2. 创建广播流:然后,需要将一个普通的流转换为广播流。这通常通过调用流的broadcast()方法实现,并将MapStateDescriptor作为参数传入。

  3. 连接广播流与非广播流:一旦有了广播流,就可以将其与一个或多个非广播流(无论是Keyed流还是Non-Keyed流)连接起来。这通过调用非广播流的connect()方法完成,并将广播流作为参数传入。连接后的流是一个BroadcastConnectedStream,它提供了process()方法用于处理数据。

  4. 处理数据:在process()方法中,可以编写逻辑来处理非广播流和广播流的数据。根据非广播流的类型(Keyed或Non-Keyed),需要传入相应的KeyedBroadcastProcessFunctionBroadcastProcessFunction类型的处理函数。

广播流的一个典型使用场景是在处理数据时需要实时动态改变配置。例如,当需要从MySQL数据库中实时查询和更新某些关键字过滤规则时,如果直接在计算函数中进行查询,可能会阻塞整个计算过程甚至导致任务停止。通过使用广播流,可以将这些配置信息广播到所有相关任务的实例中,然后在计算过程中直接使用这些配置信息,从而提高计算效率和实时性。

总的来说,Flink的广播流提供了一种有效的方式来实现不同任务间的数据共享和实时更新,适用于各种需要全局数据或配置的场景。


BroadcastStream代码示例

功能:将用户信息进行广播,从Kafka中读取用户访问记录,判断访问用户是否存在


import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.flink.util.Collector;import flink.demo.data.UserVo;
/*** 多流connect,并进行join**/
public class BroadcastTest{public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties proterties = new Properties();proterties.setProperty("bootstrap.servers", "10.168.88.88:9092");proterties.setProperty("group.id", "test");proterties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");proterties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//        proterties.setProperty("auto.offset.reset", "latest");FlinkKafkaConsumer<ObjectNode> consumerVisit= new FlinkKafkaConsumer<>("test",new JSONKeyValueDeserializationSchema(false), proterties);DataStreamSource<ObjectNode> streamSource = env.addSource(consumerVisit);DataStreamSource<Tuple2<String, List<UserVo>>> userStreamSource = env.addSource(new UserListSource());MapStateDescriptor<String, List<UserVo>> descriptor =new MapStateDescriptor<>("userStream",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint<List<UserVo>>() {}));BroadcastStream<Tuple2<String, List<UserVo>>> broadcastStream = userStreamSource.broadcast(descriptor);// 将数据流和控制流进行连接,利用控制流中的数据来控制字符串的输出BroadcastConnectedStream<ObjectNode, Tuple2<String, List<UserVo>>> tmp=streamSource.connect(broadcastStream);tmp.process(new UserPvProcessor()).print();env.execute("kafkaTest");}private static class UserPvProcessorextends BroadcastProcessFunction<ObjectNode, Tuple2<String, List<UserVo>>, String> {private static final long serialVersionUID = 1L;MapStateDescriptor<String, List<UserVo>> descriptor =new MapStateDescriptor<>("userStream",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint<List<UserVo>>() {}));@Override//用户信息处理public void processBroadcastElement(Tuple2<String, List<UserVo>> value, Context ctx, Collector<String> out)throws Exception {// 将接收到的控制数据放到 broadcast state 中  ctx.getBroadcastState(descriptor).put(value.f0, value.f1);// 打印控制信息System.out.println(Thread.currentThread().getName() + " 接收到用户信息 : "+value.f0+"   " + value.f1);}@Override//数据流public void processElement(ObjectNode element, ReadOnlyContext ctx, Collector<String> out) throws Exception {// 从 broadcast state 中拿到用户列表信息List<UserVo> userList = ctx.getBroadcastState(descriptor).get("userList");String time=LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss"));if(userList!=null&&userList.size()>0) {Map<String,String> userMap=new HashMap<>();for(UserVo vo:userList) {userMap.put(vo.getUserid(), vo.getUserName());}
//				System.out.println(userMap);JsonNode value = element.get("value");String userid=value.get("user").asText();String userName=userMap.get(userid);if (StringUtils.isNotBlank(userName)) {out.collect(Thread.currentThread().getName()+"存在用户"+userid+"  "+userName +" "+time);}else {out.collect(Thread.currentThread().getName()+"不存在用户"+userid+" "+time );}}else {out.collect(Thread.currentThread().getName()+"不存在用户"+element.get("value")+" "+time );}}}
}

Broadcast 使用注意事项

  • 同一个 operator 的各个 task 之间没有通信,广播流侧(processBroadcastElement)可以能修改 broadcast state,而数据流侧(processElement)只能读 broadcast state.;
  • 需要保证所有 Operator task 对 broadcast state 的修改逻辑是相同的,否则会导致非预期的结果;
  • Operator tasks 之间收到的广播流元素的顺序可能不同:虽然所有元素最终都会下发给下游tasks,但是元素到达的顺序可能不同,所以更新state时不能依赖元素到达的顺序;
  • 每个 task 对各自的 Broadcast state 都会做快照,防止热点问题;
  • 目前不支持 RocksDB 保存 Broadcast state:Broadcast state 目前只保存在内存中,需要为其预留合适的内存

相关文章:

Flink广播流 BroadcastStream

文章目录 前言BroadcastStream代码示例Broadcast 使用注意事项 前言 Flink中的广播流&#xff08;BroadcastStream&#xff09;是一种特殊的流处理方式&#xff0c;它允许将一个流&#xff08;通常是一个较小的流&#xff09;广播到所有的并行任务中&#xff0c;从而实现在不同…...

IP数据报格式

每一行都由32位比特&#xff0c;即4个字节组成&#xff0c;每个格子称为字段或者域。IP数据报由20字节的固定部分和最大40字节的可变部分组成。 总长度 总长度为16个比特&#xff0c;该字段的取值以字节为单位&#xff0c;用来表示IPv4数据报的长度(首部长度数据载荷长度)最大…...

GET https://registry.npm.taobao.org/xxxx error (CERT_HAS_EXPIRED)解决

PNPM用的阿里源&#xff0c;提示意思是证书过期了&#xff0c;参考网上的解决办法。执行 pnpm config set registry https://registry.npmmirror.com 再用pnpm config get registry查看&#xff0c;确实是 https://registry.npmmirror.com 但是仍旧报错&#xff0c;发现还…...

SSM Java Web项目由于spring-mvc.xml配置不对带来的一系列问题

1 介绍 一年多前&#xff0c;我就买了好多关于Java开发类的书籍&#xff0c;内容关于Java Web实操、Spring 学习指南、Maven实战、IntelliJ IDEA软件开发与应用等等。可是由于工作繁忙&#xff0c;这些书没系统地看完。这也是参加工作后的无奈吧&#xff01; 寒假期间的一周&…...

MySQL事务隔离

什么是事务隔离&#xff1f; 为了确保在并发事务执行时&#xff0c;各个事务之间能够相互独立、互不干扰地运行&#xff0c;从而保证数据的一致性。 事务的隔离级别 MySQL事务隔离为了满足不同场景&#xff0c;提供了4个事务隔离级别&#xff08;严格来讲是InnoDB存储引擎支…...

Java基础知识总结(1)

Java概况 JavaSE是java分类中的标准版&#xff0c;是刚接触java要学习的基础知识。 JavaEE是java分类中的企业版&#xff0c;是java中的高级&#xff0c;涉及到的知识广泛。 JavaME中M是Micro的缩写&#xff0c;用在嵌入式等电子设备中。 Java软件工程师&#xff1a;通过Ja…...

脚手架原理之webpack处理html文件和模块打包

脚手架原理之webpack处理html文件和模块打包 为了更好的理解项目脚手架的使用&#xff0c;我们来学习一下webpack工具&#xff0c;因为脚手架的底层就是基于webpack工具实现的。 安装 webpack工具是基于nodejs的&#xff0c;所以首先要有nodejs环境&#xff0c;其次需要下载…...

Winform编程详解一:Form窗口

一、属性介绍 1. (Name) 窗体的对象标识符ID 2. Text 修改窗口左上角标题 3. Icon 修改窗口左上角图标&#xff0c;图标最合适大小 32*32 4. 修改窗体第一次出现位置 代码修改&#xff1a;StartPosition System.Windows.Forms.FormStartPosition.CenterScreen; 5…...

Windows Server 2025 Install Preview

前言 Windows Server 2025 带来了巨大的发展,例如面向所有人的热补丁、下一代 Active Directory 和 SMB、关键任务数据和存储、Hyper-V 和 AI 等 Windows Server 2025 Preview download 下载 已注册的预览体验成员可以直接导航到 Windows Server Insider Preview 下载页面。…...

四、MySQL

MySQL MySQL1.初识网站2.安装MySQL2.1 下载&#xff08;最重要的一点是路径中不能有中文&#xff0c;哪怕是同级目录也不行&#xff09;2.2安装补丁2.3安装2.4创建配置文件2.5初始化 3.启动MySQL4.连接测试4.1 设置密码4.2 查看已有的文件夹&#xff08;数据库&#xff09;4.3 …...

C#使用泛型自定义的方法设计队列CQueue<T>类

目录 一、涉及到的知识点 1.C#中的队列类 2.自定义队列的方法 &#xff08;1&#xff09;先设计一个CList<T>类 &#xff08;2&#xff09;再设计CQueue<T>类 二、自定义队列CQueue<T>类的实例 一、涉及到的知识点 1.C#中的队列类 在C#中实现队列类&a…...

IDEA自定义Maven仓库

Maven 是一款广泛应用于 Java 开发的工具&#xff0c;其作用类似于一个全自动的 JAR 包管理器&#xff0c;能够方便地导入开发所需的相关 JAR 包。在使用 Maven 进行 Java 程序开发时&#xff0c;开发者能够极大地提高开发效率。以下是关于如何安装 Maven 以及在 IDEA 中配置自…...

Codeql复现CVE-2018-11776学习笔记

基本使用 1、首先下载struts2漏洞版本源码&#xff1a; https://codeload.github.com/apache/struts/zip/refs/tags/STRUTS_2_3_20 2、构建codeql数据库&#xff08;构建失败文末有解决办法&#xff09;&#xff1a; codeql database create ~/CodeQL/databases/struts2-2.3.…...

CVE-2024-27199 JetBrains TeamCity 身份验证绕过漏洞2

漏洞简介 TeamCity Web 服务器中发现了第二个身份验证绕过漏洞。这种身份验证旁路允许在没有身份验证的情况下访问有限数量的经过身份验证的端点。未经身份验证的攻击者可以利用此漏洞修改服务器上有限数量的系统设置&#xff0c;并泄露服务器上有限数量的敏感信息。 项目官网…...

ms office学习记录12:Excel学习记录㈥

数据工具 分列的其他运用&#xff1a;身份证号中“出生日期”切片&#xff1a;分列→固定宽度→下一步→切割出三列→下一步→不导入第一列→导入第二列且转换成日期→不导入第三列→完成 删除重复值&#xff1a;定位到要“数据”选项卡→删除重复项→取消全选再勾选要删除的…...

基于YOLOv8/YOLOv7/YOLOv6/YOLOv5的条形码二维码检测系统(深度学习+UI界面+训练数据集+Python代码)

摘要&#xff1a;在物流和制造业中&#xff0c;开发一套高效的条形码与二维码识别系统显得尤为关键。本博文深入探讨了如何利用深度学习技术打造出一套先进的条形码及二维码检测系统&#xff0c;并且提供了一套完整的实施方案。该系统搭载了性能卓越的YOLOv8算法&#xff0c;并…...

npm yarn 一起使用报错

项目记录&#xff0c;具有独特性&#xff0c;仅供参考 项目好好的运行&#xff0c;前一天装个测试工具包&#xff0c; 突然就不行了&#xff0c;卸载重装也不行&#xff0c;所有的项目都安装失败&#xff0c;新起一个项目也不行&#xff0c;有时候某个单独安装一个包可以&…...

基于springboot实现驾校信息管理系统项目【项目源码+论文说明】计算机毕业设计

基于springboot实现驾校信息管理系统演示 摘要 随着人们生活水平的不断提高&#xff0c;出行方式多样化&#xff0c;也以私家车为主&#xff0c;那么既然私家车的需求不断增长&#xff0c;那么基于驾校的考核管理也就不断增强&#xff0c;那么业务系统也就慢慢的随之加大。信息…...

DXP软件界面显示“No Hard Devices”【简单的操作问题】加【软件下载】

目录 一&#xff0c;DXP软件界面显示“No Hard Devices” 二&#xff0c;软件下载的百度网盘资源 一&#xff0c;DXP软件界面显示“No Hard Devices” Protel DXP是2004是澳大利亚Altium公司于2002年推出的一款电子设计自动化软件。它的主要功能包括&#xff1a;原理图编辑、印…...

通过Spring Boot 实现页面配置生成动态接口?

流程介绍 在Spring Boot中实现页面配置生成动态接口通常涉及几个关键步骤: 设计页面配置:首先,你需要设计一个用户界面(UI),允许用户通过此界面来配置接口的各种参数,例如HTTP方法(GET、POST等)、URL路径、请求参数、响应数据格式等。保存配置信息:当用户通过页面配置…...

基于uniapp+WebSocket实现聊天对话、消息监听、消息推送、聊天室等功能,多端兼容

基于 ​UniApp + WebSocket​实现多端兼容的实时通讯系统,涵盖WebSocket连接建立、消息收发机制、多端兼容性配置、消息实时监听等功能,适配​微信小程序、H5、Android、iOS等终端 目录 技术选型分析WebSocket协议优势UniApp跨平台特性WebSocket 基础实现连接管理消息收发连接…...

SCAU期末笔记 - 数据分析与数据挖掘题库解析

这门怎么题库答案不全啊日 来简单学一下子来 一、选择题&#xff08;可多选&#xff09; 将原始数据进行集成、变换、维度规约、数值规约是在以下哪个步骤的任务?(C) A. 频繁模式挖掘 B.分类和预测 C.数据预处理 D.数据流挖掘 A. 频繁模式挖掘&#xff1a;专注于发现数据中…...

算法打卡第18天

从中序与后序遍历序列构造二叉树 (力扣106题) 给定两个整数数组 inorder 和 postorder &#xff0c;其中 inorder 是二叉树的中序遍历&#xff0c; postorder 是同一棵树的后序遍历&#xff0c;请你构造并返回这颗 二叉树 。 示例 1: 输入&#xff1a;inorder [9,3,15,20,7…...

恶补电源:1.电桥

一、元器件的选择 搜索并选择电桥&#xff0c;再multisim中选择FWB&#xff0c;就有各种型号的电桥: 电桥是用来干嘛的呢&#xff1f; 它是一个由四个二极管搭成的“桥梁”形状的电路&#xff0c;用来把交流电&#xff08;AC&#xff09;变成直流电&#xff08;DC&#xff09;。…...

书籍“之“字形打印矩阵(8)0609

题目 给定一个矩阵matrix&#xff0c;按照"之"字形的方式打印这个矩阵&#xff0c;例如&#xff1a; 1 2 3 4 5 6 7 8 9 10 11 12 ”之“字形打印的结果为&#xff1a;1&#xff0c;…...

JDK 17 序列化是怎么回事

如何序列化&#xff1f;其实很简单&#xff0c;就是根据每个类型&#xff0c;用工厂类调用。逐个完成。 没什么漂亮的代码&#xff0c;只有有效、稳定的代码。 代码中调用toJson toJson 代码 mapper.writeValueAsString ObjectMapper DefaultSerializerProvider 一堆实…...

算法—栈系列

一&#xff1a;删除字符串中的所有相邻重复项 class Solution { public:string removeDuplicates(string s) {stack<char> st;for(int i 0; i < s.size(); i){char target s[i];if(!st.empty() && target st.top())st.pop();elsest.push(s[i]);}string ret…...

游戏开发中常见的战斗数值英文缩写对照表

游戏开发中常见的战斗数值英文缩写对照表 基础属性&#xff08;Basic Attributes&#xff09; 缩写英文全称中文释义常见使用场景HPHit Points / Health Points生命值角色生存状态MPMana Points / Magic Points魔法值技能释放资源SPStamina Points体力值动作消耗资源APAction…...

深入理解 React 样式方案

React 的样式方案较多,在应用开发初期,开发者需要根据项目业务具体情况选择对应样式方案。React 样式方案主要有: 1. 内联样式 2. module css 3. css in js 4. tailwind css 这些方案中,均有各自的优势和缺点。 1. 方案优劣势 1. 内联样式: 简单直观,适合动态样式和…...

Python第七周作业

Python第七周作业 文章目录 Python第七周作业 1.使用open以只读模式打开文件data.txt&#xff0c;并逐行打印内容 2.使用pathlib模块获取当前脚本的绝对路径&#xff0c;并创建logs目录&#xff08;若不存在&#xff09; 3.递归遍历目录data&#xff0c;输出所有.csv文件的路径…...