flink学习之广播流与合流操作demo
广播流是什么?
将一条数据广播到所有的节点。使用 dataStream.broadCast()
广播流使用场景?
一般用于动态加载配置项。比如lol,每天不断有人再投诉举报,客服根本忙不过来,腾讯内部做了一个判断,只有vip3以上的客户的投诉才会有人工一对一回复,过了一段时间大家都发现vip3才有人工,都开始充钱到vip3,此时人还是很多,于是只有vip4上的客户才能人工回复
vip3->vip4 这种判断标准在不断的变化。此时就需要广播流。因为此时数据只有1条,需要多个节点都收到这个变化的数据。
广播流怎么用?
一般通过connect合流去操作 a connect b.broadcast 。a是主流也就是数据流,b是配置变化流
不多说直接上demo,开箱即用
package com.chenchi.broadcast;import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;import java.util.HashMap;
import java.util.Random;public class BroadCastStreamDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Pattern> patternDataStream = env.addSource(new ChangeSource());DataStream<User> userDataStream = env.addSource(new CustomerSource());userDataStream.print("user");patternDataStream.print("pattern");//test1 直接合流 不广播。只会在一个节点更新。 用于特殊需求?
// userDataStream
// .keyBy(user -> user.userId)
// .connect(patternDataStream)
// .process(new CustomerSimpleProcess())
// .print();//test2// 定义广播状态的描述器,创建广播流 如何保存需要的广播数据呢 这个案例是通过map保留变化数据
// userDataStream
// .keyBy(user -> user.userId)
// .connect(patternDataStream.broadcast())
// .process(new CustomerSimpleProcess())
// .print();//test3MapStateDescriptor<Void, Pattern> bcStateDescriptor = new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));//通过描述器 更新BroadcastStream<Pattern> broadcast = patternDataStream.broadcast(bcStateDescriptor);userDataStream.keyBy(user -> user.userId).connect(broadcast).process(new CustomerBroadCastProcess()).print();env.execute();}private static class CustomerBroadCastProcess extends KeyedBroadcastProcessFunction<Integer, User, Pattern, String> {@Overridepublic void processElement(User user, KeyedBroadcastProcessFunction<Integer, User, Pattern, String>.ReadOnlyContext readOnlyContext, Collector<String> collector) throws Exception {Integer userVip = user.getVip();//获取广播流的数据 不是通过map保存Pattern pattern = readOnlyContext.getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class))).get(null);if (pattern!=null){Integer patternVip = pattern.vip;String result = "当前系统需要的vip等级=" + patternVip + ",用户id=" + user.userId + ",vip=" + userVip;if (userVip>= patternVip){result=result+"符合要求";}else {result=result+"不符合要求";}collector.collect(result);}else {System.out.println("pattern is null ");}}@Overridepublic void processBroadcastElement(Pattern pattern, KeyedBroadcastProcessFunction<Integer,User, Pattern, String>.Context context, Collector<String> collector) throws Exception {BroadcastState<Void, Pattern> bcState = context.getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class)));// 将广播状态更新为当前的patternbcState.put(null, pattern);}}public static class CustomerSimpleProcess extends CoProcessFunction<User, Pattern, String> {ValueState<Integer> vip; //这个是保留主流的state的。 不是保留广播流的stateHashMap<String,Integer> vipMap;@Overridepublic void open(Configuration parameters) throws Exception {vip = getRuntimeContext().getState(new ValueStateDescriptor<>("vip", Integer.class));vipMap=new HashMap<String,Integer>();super.open(parameters);}@Overridepublic void processElement1(User user, CoProcessFunction<User, Pattern, String>.Context context, Collector<String> collector) throws Exception {Integer userVip = user.getVip();Integer patternVip = vipMap.getOrDefault("vip", 0);String result = "当前系统需要的vip等级=" + patternVip + ",用户id=" + user.userId + ",vip=" + userVip;if (userVip>=patternVip){result=result+"符合要求";}else {result=result+"不符合要求";}collector.collect(result);}@Overridepublic void processElement2(Pattern pattern, CoProcessFunction<User, Pattern, String>.Context context, Collector<String> collector) throws Exception {vipMap.put("vip",pattern.vip);}}public static class User {public Integer userId;public Integer vip;public User() {}public User(Integer userId, Integer vip) {this.userId = userId;this.vip = vip;}public Integer getUserId() {return userId;}public void setUserId(Integer userId) {this.userId = userId;}public Integer getVip() {return vip;}public void setVip(Integer vip) {this.vip = vip;}@Overridepublic String toString() {return "Action{" +"userId=" + userId +", vip='" + vip + '\'' +'}';}}// 定义行为模式POJO类,包含先后发生的两个行为public static class Pattern {public Integer vip;public Pattern() {}public Pattern(Integer vip) {this.vip = vip;}@Overridepublic String toString() {return "Pattern{" +"vip='" + vip + '\'' +'}';}}private static class CustomerSource implements SourceFunction<User> {boolean run = true;@Overridepublic void run(SourceContext<User> sourceContext) throws Exception {while (true) {Integer userId = new Random().nextInt(1000);Integer vip = new Random().nextInt(10);sourceContext.collect(new User(userId, vip));Thread.sleep(1000);}}@Overridepublic void cancel() {run = false;}}private static class ChangeSource implements SourceFunction<Pattern> {boolean run = true;@Overridepublic void run(SourceContext<Pattern> sourceContext) throws Exception {int i = 1;while (true) {sourceContext.collect(new Pattern(i++));Thread.sleep(5000);}}@Overridepublic void cancel() {run = false;}}}
demo思想:以上述vip做例子,获取用户不断投诉的id和vip等级, 数据库保存可以享受人工服务的vip等级,该等级可以自行调整(我是随着时间变化主键增大)。
test1 不广播

注意看pattern:4 print vip=2的消息但是不代表是task4收到的消息,我们看到>1输出了vip=2
但是task10 task9都还是vip=0 ,说明流没有广播,除非此处并行度设置为1
test2 map保存变化数据

test3通过描述器获取数据
和test2 一样,不过要注意因为两个流的数据有先后,可能还没有pattern就来了user信息,所以建议先初始化,或者先添加pattern流。
相关文章:
flink学习之广播流与合流操作demo
广播流是什么? 将一条数据广播到所有的节点。使用 dataStream.broadCast() 广播流使用场景? 一般用于动态加载配置项。比如lol,每天不断有人再投诉举报,客服根本忙不过来,腾讯内部做了一个判断,只有vip3…...
PPT架构师架构技能图
PPT架构师架构技能图 目录概述需求: 设计思路实现思路分析1.软素质2.核心输出(office输出) 参考资料和推荐阅读 Survive by day and develop by night. talk for import biz , show your perfect code,full busy,skip hardness,ma…...
STM32微控制器的低功耗模式
STM32微控制器的低功耗模式(Low-power modes):Sleep mode、Stop mode 和 Standby mode。 1.1 Sleep Mode(睡眠模式): 把STM32微控制器当作一位劳累的工人,他在工作过程中需要短暂的休息。在Sleep模式下,微控制器会关闭一部分电路,减小功耗,但仍然保持对中央处理单…...
tensorflow QAT
tensorflow qat https://www.wpgdadatong.com/tw/blog/detail/70672 在边缘运算的重点技术之中,除了简化复杂的模块构架,来简化参数量以提高运算速度的这项模块轻量化网络构架技术之外。另一项技术就是各家神经网络框架(TensorFlow、Pytorc…...
[杂谈]-快速了解LoRaWAN网络以及工作原理
快速了解LoRaWAN网络以及工作原理 文章目录 快速了解LoRaWAN网络以及工作原理1、LoRaWAN网络元素1.1 终端设备(End Devices)1.2 网关(Gateways)1.3 网络服务器(Net Server)1.4 应用服务器(Appli…...
MySQL--MySQL表的增删改查(基础)
排序:ORDER BY 语法: – ASC 为升序(从小到大) – DESC 为降序(从大到小) – 默认为 ASC SELECT … FROM table_name [WHERE …] ORDER BY column [ASC|DESC], […]; *** update...
Vue中启动提示polyfill缺少-webpack v5版本导致
安装 npm i node-polyfill-webpack-plugin 因为我们的项目使用webpack v5,其中polyfill Node核心模块被删除。所以,我们安装它是为了在项目中访问这些模块 vue.config.js文件 const { defineConfig } require("vue/cli-service"); const No…...
Hugging Face实战-系列教程3:AutoModelForSequenceClassification文本2分类
🚩🚩🚩Hugging Face 实战系列 总目录 有任何问题欢迎在下面留言 本篇文章的代码运行界面均在notebook中进行 本篇文章配套的代码资源已经上传 下篇内容: Hugging Face实战-系列教程4:padding与attention_mask 输出我…...
《TCP/IP网络编程》阅读笔记--Socket类型及协议设置
目录 1--协议的定义 2--Socket的创建 2-1--协议族(Protocol Family) 2-2--Socket类型(Type) 3--Linux下实现TCP Socket 3-1--服务器端 3-2--客户端 3-3--编译运行 4--Windows下实现 TCP Socket 4-1--TCP服务端 4-2--TC…...
GitHub使用教程
GitHub使用教程 视频教程一:Github 新手够用指南 | 全程演示&个人找项目技巧放送_哔哩哔哩_bilibili 笔记: README.md编写教程:Typora官方免费版与入门教程__阿伟_的博客-CSDN博客 找开源项目的一些途径 • https://github.com/trendin…...
sql server 分区表
分区表 分区表是在SQL Server 2005之后的版本引入的特性,这个特性允许把逻辑上的一个表在物理上分为很多部分。换句话说,分区表从物理上看是将一个大表分成几个小表,但是从逻辑上看,还是一个大表。 步骤 创建分表区的步骤分为…...
开源许可证概述:GNU, BSD, Apache, MPL, 和 MIT
前言 开源许可证是开源软件分发的基础。它们定义了使用者如何使用,修改,分发开源软件。在这篇文章中,我们将探讨五种常见的开源许可证:GNU通用公共许可证 (GNU GPL),BSD许可证,Apache许可证,Mo…...
java中log使用总结
目录 一、概述1.1. 核心日志框架1.2 门面日志框架 二、最佳实践2.1 核心日志框架API包2.2 门面日志框架依赖2.3 集成使用2.3.1 集成jcl2.3.2 集成slf4j2.3.2.1 slf4j集成单一框架2.3.2.2 slf4j整合混合框架 三、总结3.1 所有相关包3.1.1 核心日志框架包3.1.2 门面日志框架3.1.3…...
【Java】传输层协议TCP
传输层协议TCP TCP报文格式首部长度保留位32位序列号和32位确认应答号标记ACKSYNFINRSTURGPSH 16位窗口大小16位校验和16位紧急指针选项 TCP特点可靠传输实现机制-确认应答超时重传连接管理机制三次握手四次挥手特殊情况 滑动窗口流量控制拥塞控制延迟应答捎带应答面向字节流粘…...
计算机网络基础知识(非常详细)
1. 网络模型 1.1 OSI 七层参考模型 七层模型,亦称 OSI(Open System Interconnection)参考模型,即开放式系统互联,是网络通信的标准模型。一般称为 OSI 参考模型或七层模型。 它是一个七层的、抽象的模型体ÿ…...
如何进行SEO优化数据分析?(掌握正确的数据分析方法,让您的网站更上一层楼!)
在互联网时代,SEO优化已经成为了每一个网站运营者必备的技能。而在SEO优化中,数据分析更是至关重要的一环。在本文中,我们将会详细介绍如何正确的进行SEO优化数据分析,让您的网站更上一层楼! 数据分析的重要性 数据分…...
Golang不同平台编译的思考
GOOS和GOARCH $GOOS可选值如下: darwin dragonfly freebsd linux netbsd openbsd plan9 solaris windows $GOARCH可选值如下 386 amd64 arm 在编译的时候我们可以根据实际需要对这两个参数进行组合。更详细的说明可以进官网看看 ## http://golang.org/cmd/go http…...
SpringSecurity学习
1.认证 密码校验用户 密码加密存储 Configuration public class SecurityConfig extends WebSecurityConfigurerAdapter {Beanpublic PasswordEncoder passwordEncoder(){return new BCryptPasswordEncoder();}} 我们没有这个配置,默认明文存储, {id}password;实现…...
时序预测 | MATLAB实现ICEEMDAN-iMPA-BiLSTM时间序列预测
时序预测 | MATLAB实现ICEEMDAN-iMPA-BiLSTM时间序列预测 目录 时序预测 | MATLAB实现ICEEMDAN-iMPA-BiLSTM时间序列预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 ICEEMDAN-iMPA-BiLSTM功率/风速预测 基于改进的自适应经验模态分解改进海洋捕食者算法双向长短期记忆…...
二叉树(上)
“路虽远,行则将至” ❤️主页:小赛毛 目录 1.树概念及结构 1.1树的概念 1.2 树的相关概念 1.3 树的表示(树的存储) 2.二叉树概念及结构 2.1概念 2.2现实中的二叉树 2.3 特殊的二叉树: 2.4 二叉树的性质 3.二叉树的顺…...
魔兽世界插件开发利器:wow_api技术架构与实战指南
魔兽世界插件开发利器:wow_api技术架构与实战指南 【免费下载链接】wow_api Documents of wow API -- 魔兽世界API资料以及宏工具 项目地址: https://gitcode.com/gh_mirrors/wo/wow_api 技术探索:从需求到架构的演进之路 魔兽世界插件开发生态长…...
驯服失控菜单:让右键操作提速60%的实战指南
驯服失控菜单:让右键操作提速60%的实战指南 【免费下载链接】ContextMenuManager 🖱️ 纯粹的Windows右键菜单管理程序 项目地址: https://gitcode.com/gh_mirrors/co/ContextMenuManager 当你在Windows系统中右键点击文件时,是否曾面…...
力扣高频经典双题解:接雨水 + 无重复最长子串(思路 + 满分代码)
接雨水、无重复字符最长子串是面试高频、算法入门必刷的经典题,一道考动态规划预处理,一道考滑动窗口,都是数组 / 字符串题型里的核心套路。本篇把两道题的思路讲透、代码写清,新手也能一遍看懂,刷题效率直接拉满&…...
iPhone 抓包失败 4 种具体情况逐个解决方法
抓不到包这个描述太模糊了,在实际调试中,这句话至少对应四种完全不同的情况: 完全没有请求只有浏览器能抓到能抓到但 HTTPS 解不开能抓到但数据不完整 如果不先分清楚是哪一种,就会一直重复安装证书或改代理配置。一、先做一个验证…...
字节开源AI神器DeerFlow,4.1万星标刷屏,普通人免费就能用
文章目录这玩意儿不是ChatGPT那种"嘴炮型"选手35k星标怎么来的?字节这次把"龙虾"养明白了多智能体协作:不是一个人在战斗沙箱执行:让AI真的"动手"干活对比OpenAI:免费、本地、可控普通人怎么上手&a…...
WorkBuddy杀疯了?一群AI专家帮我打工,我在微信里当赛博虾工头!
梦瑶 发自 凹非寺量子位 | 公众号 QbitAI到底是谁说,给老板打工自己就当不成老板的?又是谁说,龙虾不好用、还不听使唤的?反正这些事儿,现在跟我没啥关系了。毕竟现在的我,已经转头当起了「虾工头」…...
PlayCover终极指南:三步在Mac上畅玩iOS游戏与应用
PlayCover终极指南:三步在Mac上畅玩iOS游戏与应用 【免费下载链接】PlayCover Community fork of PlayCover 项目地址: https://gitcode.com/gh_mirrors/pl/PlayCover 还在为心爱的iOS游戏无法在Mac上体验而烦恼吗?PlayCover为你打开了一扇全新的…...
英雄联盟智能助手如何解决游戏操作繁琐问题?提升游戏效率完全指南
英雄联盟智能助手如何解决游戏操作繁琐问题?提升游戏效率完全指南 【免费下载链接】League-Toolkit 兴趣使然的、简单易用的英雄联盟工具集。支持战绩查询、自动秒选等功能。基于 LCU API。 项目地址: https://gitcode.com/gh_mirrors/le/League-Toolkit 你是…...
告别风扇噪音与过热:FanControl智能控温完全指南
告别风扇噪音与过热:FanControl智能控温完全指南 【免费下载链接】FanControl.Releases This is the release repository for Fan Control, a highly customizable fan controlling software for Windows. 项目地址: https://gitcode.com/GitHub_Trending/fa/FanC…...
用U8g2库玩转OLED:Arduino显示动态变量+自定义图标的5个实用技巧
用U8g2库玩转OLED:Arduino显示动态变量自定义图标的5个实用技巧 在嵌入式开发中,OLED显示屏因其高对比度、低功耗和紧凑尺寸成为物联网设备和交互式项目的首选。U8g2库作为Arduino平台上最强大的显示驱动库之一,其灵活性和功能丰富性远超基础…...
