当前位置: 首页 > news >正文

Flink预加载分区维表,实时更新配置信息

当前我们的业务场景,是基于dataStream代码, 维表数据量很大, 实时性要求很高,所以采用预加载分区维表模式, kafka广播流实时更新配置。

实现方案
1:job初始化时 每个分区open 只加载自己那部分的配置, 不用每个分区都全量加载。
2: 配置实时更新, 采用kafka topic传到flink job广播流中,使用ConfigBroadcastProcessFunction更新分区内的配置信息。

衡量指标

总体来讲,关联维表有三个基础的方式:实时数据库查找关联(Per-Record Reference Data Lookup)、预加载维表关联(Pre-Loading of Reference Data)和维表变更日志关联(Reference Data Change Stream),而根据实现上的优化可以衍生出多种关联方式,且这些优化还可以灵活组合产生不同效果(不过为了简单性这里不讨论同时应用多种优化的实现方式)。对于不同的关联方式,我们可以从以下 7 个关键指标来衡量(每个指标的得分将以 1-5 五档来表示):

实现简单性: 设计是否足够简单,易于迭代和维护。

吞吐量: 性能是否足够好。

维表数据的实时性: 维度表的更新是否可以立刻对作业可见。

数据库的负载: 是否对外部数据库造成较大的负载(负载越低分越高)。

内存资源占用: 是否需要大量内存来缓存维表数据(内存占用越少分越高)。

可拓展性: 在更大规模的数据下会不会出现瓶颈。

结果确定性: 在数据延迟或者数据重放情况下,是否可以得到一致的结果。

启动预加载分区维表
对于维表比较大的情况,可以启动预加载维表基础之上增加分区功能。简单来说就是将数据流按字段进行分区,然后每个 Subtask 只需要加在对应分区范围的维表数据。值得注意的是,这里的分区方式并不是用 keyby 这种通用的 hash 分区,而是需要根据业务数据定制化分区策略,然后调用 DataStream#partitionCustom。比如按照 userId 等区间划分,0-999 划分到 subtask 1,1000-1999 划分到 subtask 2,以此类推。而在 open() 方法中,我们再根据 subtask 的 id 和总并行度来计算应该加载的维表数据范围。

在这里插入图片描述
启动预加载分区维表介绍:
通过这种分区方式,维表的大小上限理论上可以线性拓展,解决了维表大小受限于单个 TaskManager 内存的问题(现在是取决于所有 TaskManager 的内存总量),但同时给带来设计和维护分区策略的复杂性。

缓存方式
在这里插入图片描述
之前业务场景是采用的第一种, 但是配置数据量越来越大,已经不能支撑业务,所以模拟调研第三种方式,设计和维护分区策略

代码实验
Flink设置4个并行度, 2个taskmanager

-m yarn-cluster -p 4 -yjm 1024m -ytm 2048m -ynm $application_name -ys 2

在这里插入图片描述
在这里插入图片描述
采用自定义Partition设计和维护分区策略,数据流和维表connect

.filter(_.nonEmpty)
.map(_.get)
.partitionCustom(new CustomPartitioner(),data => {s"${data.datas.controlPlanId}"
})
.connect(indicatorConfigBroadcastStream)
.process(new FdcIndicatorConfigBroadcastProcessFunction)
.name("FdcGenerateIndicator")
.uid("FdcGenerateIndicator")

自定义Partition分区类

import org.apache.flink.api.common.functions.Partitioner
import org.slf4j.{Logger, LoggerFactory}class CustomPartitioner extends Partitioner[String]{lazy private val logger: Logger = LoggerFactory.getLogger(classOf[CustomPartitioner])override def partition(key: String, numPartitions: Int): Int = {logger.warn("分区总数"+numPartitions)return (key.hashCode % numPartitions).abs}
}

BroadcastProcessFunction

class ConfigBroadcastProcessFunctionextends BroadcastProcessFunction[fdcWindowData, JsonNode,(ListBuffer[(ALGO, IndicatorConfig)], ListBuffer[RawData])] {lazy private val logger: Logger = LoggerFactory.getLogger(classOf[FdcIndicatorConfigBroadcastProcessFunction])// 初始化override def open(parameters: Configuration): Unit = {logger.warn(s"getIndexOfThisSubtask: ${getRuntimeContext.getIndexOfThisSubtask}")logger.warn(s"getNumberOfParallelSubtasks: ${getRuntimeContext.getNumberOfParallelSubtasks}")super.open(parameters)// 获取全局变量val p = getRuntimeContext.getExecutionConfig.getGlobalJobParameters.asInstanceOf[ParameterTool]ProjectConfig.getConfig(p)}// 数据流override def processElement(windowData: fdcWindowData, ctx: BroadcastProcessFunction[fdcWindowData,JsonNode, (ListBuffer[(ALGO, IndicatorConfig)], ListBuffer[RawData])]#ReadOnlyContext,out: Collector[(ListBuffer[(ALGO, IndicatorConfig)], ListBuffer[RawData])]): Unit = {logger.warn(s"${getRuntimeContext.getIndexOfThisSubtask}")}// 广播流override def processBroadcastElement(value: JsonNode, ctx: BroadcastProcessFunction[fdcWindowData, JsonNode, (ListBuffer[(ALGO, IndicatorConfig)], ListBuffer[RawData])]#Context,out: Collector[(ListBuffer[(ALGO, IndicatorConfig)], ListBuffer[RawData])]): Unit = {}
}

打印结果:
taskmanager1; open的时候打印信息
在这里插入图片描述
taskmanager2; open的时候打印信息
在这里插入图片描述
当数据流来时, processElement中的打印信息
在这里插入图片描述
参考:
https://blog.csdn.net/weixin_44904816/article/details/104305824
https://codeantenna.com/a/IcVVHYGUVi

https://www.jianshu.com/p/66b014dd2e36

https://blog.csdn.net/cloudbigdata/article/details/125013545

相关文章:

Flink预加载分区维表,实时更新配置信息

当前我们的业务场景,是基于dataStream代码, 维表数据量很大, 实时性要求很高,所以采用预加载分区维表模式, kafka广播流实时更新配置。 实现方案 1:job初始化时 每个分区open 只加载自己那部分的配置&…...

大数据现在找工作难么

大数据行业工作好找还是难找不是光靠嘴说出来的结合实际,看看市场上的招聘需求和岗位要求就大致知道了 要想符合企业用人规范,学历,工作经验,掌握技能都是非常重要的~ 先来看几个招聘网站的报告数据: Boss直聘发布的…...

【Linux】学会这些基本指令来上手Linux吧

前言上篇文章介绍了一些常用的指令,这篇文章再来介绍一下Linux必须学会的指令。一.时间相关的指令ate显示date 指定格式显示时间: date %Y:%m:%d date 用法:date [OPTION]... [FORMAT]1.在显示方面,使用者可以设定欲显示的格式&am…...

【沐风老师】3DMAX交通流插件TrafficFlow使用方法详解

TrafficFlow交通流插件,模拟生成车流、人流动画。 【版本要求】 3dMax 2008及更高版本 【安装方法】 无需安装直接拖动插件脚本文件到3dMax视口中打开。 【快速开始】 1.创建车辆对象和行车路径。 2.打开TrafficFlow插件,先选择“车辆”对象&#xff0…...

c#实现视频的批量剪辑

篇首,完全没有技术含量的帖子,高手略过,只为十几年后重新捡起的我爱好玩玩。。。 起因,一个朋友说他下载了很多短视频,但只需要要其中的一小截,去头掐尾,在软件里搞来搞去太麻烦,让…...

小白怎么系统的自学计算机科学和黑客技术?

我把csdn上有关自学网络安全、零基础入门网络安全的回答大致都浏览了一遍,最大的感受就是“太复杂”,新手看了之后只会更迷茫,还是不知道如何去做,所以站在新手的角度去写回答,应该把回答写的简单易懂,“傻…...

scheduler 的使用实验对比和总结(PyTorch)

这篇文章是在完成 HW02 的过程中所产生的,是关于各 scheduler (ReduceLROnPlateau(),CosineAnnealingLR(),CosineAnnealingWarmRestarts())使用的对比实验。 起因是为了在 Kaggle 上跑出更高的成绩,但结果确…...

vue2 虚拟列表(优化版)

作用&#xff1a; 虚拟列表是优化长列表的一种手段&#xff0c;防止列表存在过多的dom元素导致页面卡顿&#xff08;包扣移动端下拉到底加载下一页这种列表加载的dom元素多了一样会卡&#xff09;。 原理&#xff1a; 如上图简单地说就是以 <div classlist-view">作…...

从应用层到MCU,看Windows处理键盘输入 [1.在应用层调试Notepad.exe (按键消费者)]

文本编辑器/文本编辑框是应用层常见的键盘处理程序。微软泄露的WinXP源码下有文本编辑器Notepad的实现&#xff1a;Microsoft_leaked_source_code\nt5src\Source\XPSP1\NT\shell\osshell\accesory\notepad文本编辑器的实现并不复杂&#xff0c;微软又(被迫)提供了Sample&#x…...

什么是大数据?大数据能做什么

大数据发现现在如火如荼&#xff0c;也吸引了很多有志人士想要加入这个行业&#xff0c;但是在正式入行之前了解大数据是什么以及能做什么是非常重要的~ 下面我们一起来看一下~ 比较官方的定义是指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合&#xff…...

Git 和 GitHub 超入门指南(四)

Git基本命令 以下是一些基本的Git命令&#xff1a; git add&#xff1a;将文件添加到Git索引中git commit&#xff1a;将索引中的文件提交到Git仓库中git status&#xff1a;查看工作目录和索引的状态git log&#xff1a;查看提交历史记录 Git高级命令 以下是一些高级的Git…...

Java 响应式编程 Reactor 框架

文章目录 Java 响应式编程 Reactor 框架FluxMono其它的关键对象Java 响应式编程 Reactor 框架 Reactor框架的核心理念是基于响应式编程的异步流处理。这意味着应用程序可以通过异步事件流来处理请求,而不是通过传统的同步请求-响应模型。在响应式编程中, 应用程序可以处理多个…...

Hazel引擎学习(十一)

我自己维护引擎的github地址在这里&#xff0c;里面加了不少注释&#xff0c;有需要的可以看看 参考视频链接在这里 很高兴的是&#xff0c;引擎的开发终于慢慢开始往深了走了&#xff0c;前几章的引擎UI搭建着实是有点折磨人&#xff0c;根据课程&#xff0c;接下来的引擎开发…...

深度学习(22):如何判断训练过程中深度学习模型损失值不再下降

2023年3月22日&#xff0c;与 chatGPT 的沟通如何判断训练过程中深度学习模型损失值不再下降在深度学习中&#xff0c;判断模型是否收敛是非常重要的&#xff0c;这可以通过监控模型损失值来实现。一般来说&#xff0c;当训练模型的损失值不再下降&#xff0c;我们就可以认为模…...

一个比较全面的C#公共帮助类

上次跟大家推荐过2个C#开发工具箱&#xff1a;《推荐一个不到2MB的C#开发工具箱&#xff0c;集成了上千个常用操作类》、《推荐一个.Net常用代码集合&#xff0c;助你高效完成业务》。 今天再给大家推荐一个&#xff0c;这几个部分代码功能有重合的部分&#xff0c;大家可以根…...

人脸识别经典网络-MTCNN(含Python源码实现)

人脸检测-mtcnn 本文参加新星计划人工智能赛道&#xff1a;https://bbs.csdn.net/topics/613989052 文章目录人脸检测-mtcnn1. 人脸检测1.1 人脸检测概述1.2 人脸检测的难点1.3 人脸检测的应用场景2. mtcnn2.1 mtcnn概述2.2 mtcnn的网络结构2.3 图像金字塔2.4 P-Net2.5 R-Net2…...

OpenCV入门(十八)快速学会OpenCV 17 直线检测

OpenCV入门&#xff08;十八&#xff09;快速学会OpenCV 17 直线检测1.霍夫直线变换概述2.霍夫变换原理3.操作实例3.1 HoughLines函数3.2 HoughLinesP函数作者&#xff1a;Xiou 1.霍夫直线变换概述 霍夫变换是一种在图像中寻找直线、圆形以及其他简单形状的方法。霍夫变换采用…...

nginx快速入门.跟学B站nginx一小时精讲课程笔记

nginx快速入门.跟学B站nginx一小时精讲课程笔记nginx简介及环境准备nginx简介环境准备一、nginx 安装1.使用yum安装2.常用命令3.使用systemctl启动、停止、重新加载4.配置文件5.配置文件结构二、配置静态web1.静态网页配置2.listen监听3.server_name4.location三、HTTP反向代理…...

内存泄漏定位工具之 valgrind

内存泄漏检测工具 文章目录内存泄漏检测工具一、valgrind介绍1. memcheck2. cachegrind3. helgrind二、源码下载三、命令操作1.memcheck 工具四、虚拟机下使用1. x86编译2. 正常程序测试3. 申请内存不释放测试4. 内存越界的测试5. 读写已经释放的内存五、ARM平台使用1.交叉编译…...

Django(一)安装

好久没更新了 学习的内容太多了有点杂 一时不知道从何说起 !!! 对于Django我也不是很了解 在网上搜了个词条就是以下显示 我目前的了解也仅限于此 希望在接下来的学习过程中 有更多的学习体会可以和大家分享 一涉及到在对应python环境 下载东西时思维就会很混乱 这里再把之前…...

Ubuntu系统下交叉编译openssl

一、参考资料 OpenSSL&&libcurl库的交叉编译 - hesetone - 博客园 二、准备工作 1. 编译环境 宿主机&#xff1a;Ubuntu 20.04.6 LTSHost&#xff1a;ARM32位交叉编译器&#xff1a;arm-linux-gnueabihf-gcc-11.1.0 2. 设置交叉编译工具链 在交叉编译之前&#x…...

【Oracle APEX开发小技巧12】

有如下需求&#xff1a; 有一个问题反馈页面&#xff0c;要实现在apex页面展示能直观看到反馈时间超过7天未处理的数据&#xff0c;方便管理员及时处理反馈。 我的方法&#xff1a;直接将逻辑写在SQL中&#xff0c;这样可以直接在页面展示 完整代码&#xff1a; SELECTSF.FE…...

.Net Framework 4/C# 关键字(非常用,持续更新...)

一、is 关键字 is 关键字用于检查对象是否于给定类型兼容,如果兼容将返回 true,如果不兼容则返回 false,在进行类型转换前,可以先使用 is 关键字判断对象是否与指定类型兼容,如果兼容才进行转换,这样的转换是安全的。 例如有:首先创建一个字符串对象,然后将字符串对象隐…...

Python 实现 Web 静态服务器(HTTP 协议)

目录 一、在本地启动 HTTP 服务器1. Windows 下安装 node.js1&#xff09;下载安装包2&#xff09;配置环境变量3&#xff09;安装镜像4&#xff09;node.js 的常用命令 2. 安装 http-server 服务3. 使用 http-server 开启服务1&#xff09;使用 http-server2&#xff09;详解 …...

提升移动端网页调试效率:WebDebugX 与常见工具组合实践

在日常移动端开发中&#xff0c;网页调试始终是一个高频但又极具挑战的环节。尤其在面对 iOS 与 Android 的混合技术栈、各种设备差异化行为时&#xff0c;开发者迫切需要一套高效、可靠且跨平台的调试方案。过去&#xff0c;我们或多或少使用过 Chrome DevTools、Remote Debug…...

实战三:开发网页端界面完成黑白视频转为彩色视频

​一、需求描述 设计一个简单的视频上色应用&#xff0c;用户可以通过网页界面上传黑白视频&#xff0c;系统会自动将其转换为彩色视频。整个过程对用户来说非常简单直观&#xff0c;不需要了解技术细节。 效果图 ​二、实现思路 总体思路&#xff1a; 用户通过Gradio界面上…...

Cilium动手实验室: 精通之旅---13.Cilium LoadBalancer IPAM and L2 Service Announcement

Cilium动手实验室: 精通之旅---13.Cilium LoadBalancer IPAM and L2 Service Announcement 1. LAB环境2. L2公告策略2.1 部署Death Star2.2 访问服务2.3 部署L2公告策略2.4 服务宣告 3. 可视化 ARP 流量3.1 部署新服务3.2 准备可视化3.3 再次请求 4. 自动IPAM4.1 IPAM Pool4.2 …...

ubuntu22.04有线网络无法连接,图标也没了

今天突然无法有线网络无法连接任何设备&#xff0c;并且图标都没了 错误案例 往上一顿搜索&#xff0c;试了很多博客都不行&#xff0c;比如 Ubuntu22.04右上角网络图标消失 最后解决的办法 下载网卡驱动&#xff0c;重新安装 操作步骤 查看自己网卡的型号 lspci | gre…...

6个月Python学习计划 Day 16 - 面向对象编程(OOP)基础

第三周 Day 3 &#x1f3af; 今日目标 理解类&#xff08;class&#xff09;和对象&#xff08;object&#xff09;的关系学会定义类的属性、方法和构造函数&#xff08;init&#xff09;掌握对象的创建与使用初识封装、继承和多态的基本概念&#xff08;预告&#xff09; &a…...

高考志愿填报管理系统---开发介绍

高考志愿填报管理系统是一款专为教育机构、学校和教师设计的学生信息管理和志愿填报辅助平台。系统基于Django框架开发&#xff0c;采用现代化的Web技术&#xff0c;为教育工作者提供高效、安全、便捷的学生管理解决方案。 ## &#x1f4cb; 系统概述 ### &#x1f3af; 系统定…...