当前位置: 首页 > news >正文

flink job同时使用BroadcastProcessFunction和KeyedBroadcastProcessFunction例子

背景:

广播状态可以用于规则表或者配置表的实时更新,本文就是用一个欺诈检测的flink作业作为例子看一下BroadcastProcessFunction和KeyedBroadcastProcessFunction的使用

BroadcastProcessFunction和KeyedBroadcastProcessFunction的使用

1.首先看主流程,主流程中使用了两个Broadcast广播的状态,这两个Broadcast广播的状态是独立的

 // 这里面包含规则广播状态的两次使用方法,分别在DynamicKeyFunction处理函数和DynamicAlertFunction处理函数,注意这两个处理函数中的广播状态是独立的,也就是需要分别维度,不能共享// Processing pipeline setupDataStream<Alert> alerts =transactions.connect(rulesStream).process(new DynamicKeyFunction()).uid("DynamicKeyFunction").name("Dynamic Partitioning Function").keyBy((keyed) -> keyed.getKey()).connect(rulesStream).process(new DynamicAlertFunction()).uid("DynamicAlertFunction").name("Dynamic Rule Evaluation Function");

2.BroadcastProcessFunction的处理,这里面会维护这个算子本身的广播状态,并把所有的事件扩散发送到下一个算子

public class DynamicKeyFunctionextends BroadcastProcessFunction<Transaction, Rule, Keyed<Transaction, String, Integer>> {@Overridepublic void open(Configuration parameters) {}// 这里会把每个事件结合上广播状态中的每个规则生成N条记录,流转到下一个算子@Overridepublic void processElement(Transaction event, ReadOnlyContext ctx, Collector<Keyed<Transaction, String, Integer>> out)throws Exception {ReadOnlyBroadcastState<Integer, Rule> rulesState =ctx.getBroadcastState(Descriptors.rulesDescriptor);forkEventForEachGroupingKey(event, rulesState, out);}// 独立维护广播状态,可以在广播状态中新增删除或者清空广播状态@Overridepublic void processBroadcastElement(Rule rule, Context ctx, Collector<Keyed<Transaction, String, Integer>> out) throws Exception {log.info("{}", rule);BroadcastState<Integer, Rule> broadcastState =ctx.getBroadcastState(Descriptors.rulesDescriptor);handleRuleBroadcast(rule, broadcastState);if (rule.getRuleState() == RuleState.CONTROL) {handleControlCommand(rule.getControlType(), broadcastState);}}}static void handleRuleBroadcast(Rule rule, BroadcastState<Integer, Rule> broadcastState)throws Exception {switch (rule.getRuleState()) {case ACTIVE:case PAUSE:broadcastState.put(rule.getRuleId(), rule);break;case DELETE:broadcastState.remove(rule.getRuleId());break;}}

3.KeyedBroadcastProcessFunction的处理,这里面也是会维护这个算子本身的广播状态,此外还有键值分区状态,特别注意的是在处理广播元素时,可以用applyToKeyedState方法对所有的键值分区状态应用某个方法,对于ontimer方法,依然可以访问键值分区状态和广播状态

/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package com.ververica.field.dynamicrules.functions;import static com.ververica.field.dynamicrules.functions.ProcessingUtils.addToStateValuesSet;
import static com.ververica.field.dynamicrules.functions.ProcessingUtils.handleRuleBroadcast;import com.ververica.field.dynamicrules.Alert;
import com.ververica.field.dynamicrules.FieldsExtractor;
import com.ververica.field.dynamicrules.Keyed;
import com.ververica.field.dynamicrules.Rule;
import com.ververica.field.dynamicrules.Rule.ControlType;
import com.ververica.field.dynamicrules.Rule.RuleState;
import com.ververica.field.dynamicrules.RuleHelper;
import com.ververica.field.dynamicrules.RulesEvaluator.Descriptors;
import com.ververica.field.dynamicrules.Transaction;
import java.math.BigDecimal;
import java.util.*;
import java.util.Map.Entry;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.accumulators.SimpleAccumulator;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;/** Implements main rule evaluation and alerting logic. */
@Slf4j
public class DynamicAlertFunctionextends KeyedBroadcastProcessFunction<String, Keyed<Transaction, String, Integer>, Rule, Alert> {private static final String COUNT = "COUNT_FLINK";private static final String COUNT_WITH_RESET = "COUNT_WITH_RESET_FLINK";private static int WIDEST_RULE_KEY = Integer.MIN_VALUE;private static int CLEAR_STATE_COMMAND_KEY = Integer.MIN_VALUE + 1;private transient MapState<Long, Set<Transaction>> windowState;private Meter alertMeter;private MapStateDescriptor<Long, Set<Transaction>> windowStateDescriptor =new MapStateDescriptor<>("windowState",BasicTypeInfo.LONG_TYPE_INFO,TypeInformation.of(new TypeHint<Set<Transaction>>() {}));@Overridepublic void open(Configuration parameters) {windowState = getRuntimeContext().getMapState(windowStateDescriptor);alertMeter = new MeterView(60);getRuntimeContext().getMetricGroup().meter("alertsPerSecond", alertMeter);}// 键值分区状态和广播状态联合处理,在这个方法中可以更新键值分区状态,然后广播状态只能读取@Overridepublic void processElement(Keyed<Transaction, String, Integer> value, ReadOnlyContext ctx, Collector<Alert> out)throws Exception {long currentEventTime = value.getWrapped().getEventTime();addToStateValuesSet(windowState, currentEventTime, value.getWrapped());long ingestionTime = value.getWrapped().getIngestionTimestamp();ctx.output(Descriptors.latencySinkTag, System.currentTimeMillis() - ingestionTime);Rule rule = ctx.getBroadcastState(Descriptors.rulesDescriptor).get(value.getId());if (noRuleAvailable(rule)) {log.error("Rule with ID {} does not exist", value.getId());return;}if (rule.getRuleState() == Rule.RuleState.ACTIVE) {Long windowStartForEvent = rule.getWindowStartFor(currentEventTime);long cleanupTime = (currentEventTime / 1000) * 1000;ctx.timerService().registerEventTimeTimer(cleanupTime);SimpleAccumulator<BigDecimal> aggregator = RuleHelper.getAggregator(rule);for (Long stateEventTime : windowState.keys()) {if (isStateValueInWindow(stateEventTime, windowStartForEvent, currentEventTime)) {aggregateValuesInState(stateEventTime, aggregator, rule);}}BigDecimal aggregateResult = aggregator.getLocalValue();boolean ruleResult = rule.apply(aggregateResult);ctx.output(Descriptors.demoSinkTag,"Rule "+ rule.getRuleId()+ " | "+ value.getKey()+ " : "+ aggregateResult.toString()+ " -> "+ ruleResult);if (ruleResult) {if (COUNT_WITH_RESET.equals(rule.getAggregateFieldName())) {evictAllStateElements();}alertMeter.markEvent();out.collect(new Alert<>(rule.getRuleId(), rule, value.getKey(), value.getWrapped(), aggregateResult));}}}//维护广播状态,新增/删除或者整个清空,值得注意的是,处理广播元素时可以对所有的键值分区状态应用某个函数,比如这里当收到某个属于控制消息的广播消息时,使用applyToKeyedState方法把所有的键值分区状态都清空@Overridepublic void processBroadcastElement(Rule rule, Context ctx, Collector<Alert> out)throws Exception {log.info("{}", rule);BroadcastState<Integer, Rule> broadcastState =ctx.getBroadcastState(Descriptors.rulesDescriptor);handleRuleBroadcast(rule, broadcastState);updateWidestWindowRule(rule, broadcastState);if (rule.getRuleState() == RuleState.CONTROL) {handleControlCommand(rule, broadcastState, ctx);}}private void handleControlCommand(Rule command, BroadcastState<Integer, Rule> rulesState, Context ctx) throws Exception {ControlType controlType = command.getControlType();switch (controlType) {case EXPORT_RULES_CURRENT:for (Map.Entry<Integer, Rule> entry : rulesState.entries()) {ctx.output(Descriptors.currentRulesSinkTag, entry.getValue());}break;case CLEAR_STATE_ALL:ctx.applyToKeyedState(windowStateDescriptor, (key, state) -> state.clear());break;case CLEAR_STATE_ALL_STOP:rulesState.remove(CLEAR_STATE_COMMAND_KEY);break;case DELETE_RULES_ALL:Iterator<Entry<Integer, Rule>> entriesIterator = rulesState.iterator();while (entriesIterator.hasNext()) {Entry<Integer, Rule> ruleEntry = entriesIterator.next();rulesState.remove(ruleEntry.getKey());log.info("Removed Rule {}", ruleEntry.getValue());}break;}}private boolean isStateValueInWindow(Long stateEventTime, Long windowStartForEvent, long currentEventTime) {return stateEventTime >= windowStartForEvent && stateEventTime <= currentEventTime;}private void aggregateValuesInState(Long stateEventTime, SimpleAccumulator<BigDecimal> aggregator, Rule rule) throws Exception {Set<Transaction> inWindow = windowState.get(stateEventTime);if (COUNT.equals(rule.getAggregateFieldName())|| COUNT_WITH_RESET.equals(rule.getAggregateFieldName())) {for (Transaction event : inWindow) {aggregator.add(BigDecimal.ONE);}} else {for (Transaction event : inWindow) {BigDecimal aggregatedValue =FieldsExtractor.getBigDecimalByName(rule.getAggregateFieldName(), event);aggregator.add(aggregatedValue);}}}private boolean noRuleAvailable(Rule rule) {// This could happen if the BroadcastState in this CoProcessFunction was updated after it was// updated and used in `DynamicKeyFunction`if (rule == null) {return true;}return false;}private void updateWidestWindowRule(Rule rule, BroadcastState<Integer, Rule> broadcastState)throws Exception {Rule widestWindowRule = broadcastState.get(WIDEST_RULE_KEY);if (rule.getRuleState() != Rule.RuleState.ACTIVE) {return;}if (widestWindowRule == null) {broadcastState.put(WIDEST_RULE_KEY, rule);return;}if (widestWindowRule.getWindowMillis() < rule.getWindowMillis()) {broadcastState.put(WIDEST_RULE_KEY, rule);}}// ontimer方法中可以访问/更新键值分区状态,读取广播状态,此外ontimer方法和processElement方法以及processBroadcastElement方法是同步的,不需要考虑并发访问的问题@Overridepublic void onTimer(final long timestamp, final OnTimerContext ctx, final Collector<Alert> out)throws Exception {Rule widestWindowRule = ctx.getBroadcastState(Descriptors.rulesDescriptor).get(WIDEST_RULE_KEY);Optional<Long> cleanupEventTimeWindow =Optional.ofNullable(widestWindowRule).map(Rule::getWindowMillis);Optional<Long> cleanupEventTimeThreshold =cleanupEventTimeWindow.map(window -> timestamp - window);cleanupEventTimeThreshold.ifPresent(this::evictAgedElementsFromWindow);}private void evictAgedElementsFromWindow(Long threshold) {try {Iterator<Long> keys = windowState.keys().iterator();while (keys.hasNext()) {Long stateEventTime = keys.next();if (stateEventTime < threshold) {keys.remove();}}} catch (Exception ex) {throw new RuntimeException(ex);}}private void evictAllStateElements() {try {Iterator<Long> keys = windowState.keys().iterator();while (keys.hasNext()) {keys.next();keys.remove();}} catch (Exception ex) {throw new RuntimeException(ex);}}
}

ps: ontimer方法和processElement方法是同步访问的,没有并发的问题,所以不需要考虑同时更新键值分区状态的线程安全问题

参考文献:
https://flink.apache.org/2020/01/15/advanced-flink-application-patterns-vol.1-case-study-of-a-fraud-detection-system/

相关文章:

flink job同时使用BroadcastProcessFunction和KeyedBroadcastProcessFunction例子

背景&#xff1a; 广播状态可以用于规则表或者配置表的实时更新&#xff0c;本文就是用一个欺诈检测的flink作业作为例子看一下BroadcastProcessFunction和KeyedBroadcastProcessFunction的使用 BroadcastProcessFunction和KeyedBroadcastProcessFunction的使用 1.首先看主流…...

数据中心系统解决方案

设计思路 系统设计过程中充分考虑各个子系统的信息共享要求&#xff0c;对各子系统进行结构化和标准化设计&#xff0c;通过系统间的各种联动方式将其整合成一个有机的整体&#xff0c;使之成为一套整体的、全方位的数据中心大楼综合管理系统&#xff0c;达到人防、物防和技防…...

服务器开设新账户,创建账号并设置密码

实验室又进新同学了&#xff0c;服务器开设新账号搞起来 1、创建用户&#xff1a; 在root权限下&#xff0c;输入命令useradd -m 用户名&#xff0c;如下 sudo useradd -m yonghuming 2、设置密码&#xff1a; 输入命令passwd 用户名 回车&#xff0c;接着输入密码操作&…...

【C++】关于构造函数后面冒号“:“的故事------初始化列表(超详细解析,小白一看就懂)

目录 一、前言 二、 初始化的概念区分 三、初始化列表 &#xff08;重点&#xff09; &#x1f4a6;初始化列表的概念理解 &#x1f4a6;初始化列表的注意事项 四、共勉 一、前言 在之前的博客学习中&#xff0c;我们已经学习了【C】的六大默认成员函数 &#xff0c;想必大…...

【Shell 系列教程】shell基本运算符(四)

文章目录 往期回顾关系运算符布尔运算符逻辑运算符字符串运算符文件测试运算符其他检查符&#xff1a; 往期回顾 【Shell 系列教程】shell介绍&#xff08;一&#xff09;【Shell 系列教程】shell变量&#xff08;二&#xff09;【Shell 系列教程】shell数组&#xff08;三&am…...

MongoDB安装及开发系例全教程

一、系列文章目录 一、MongoDB安装教程—官方原版 二、MongoDB 使用教程(配置、管理、监控)_linux mongodb 监控 三、MongoDB 基于角色的访问控制 四、MongoDB用户管理 五、MongoDB基础知识详解 六、MongoDB—Indexs 七、MongoDB事务详解 八、MongoDB分片教程 九、Mo…...

ffmpeg命令帮助文档

一&#xff1a;帮助文档的命令格式 ffmpeg -h帮助的基本信息ffmpeg -h long帮助的高级信息ffmpeg -h full帮助的全部信息 ffmpeg的命令使用方式&#xff1a;ffmpeg [options] [[infile options] -i infile] [[outfile options] outfile] 二&#xff1a;将帮助文档输出到文件 …...

回归预测 | Matlab实现SO-CNN-SVM蛇群算法优化卷积神经网络-支持向量机的多输入单输出回归预测

Matlab实现SO-CNN-SVM蛇群算法优化卷积神经网络-支持向量机的多输入单输出回归预测 目录 Matlab实现SO-CNN-SVM蛇群算法优化卷积神经网络-支持向量机的多输入单输出回归预测效果一览基本介绍程序设计参考资料 效果一览 基本介绍 1.SO-CNN-SVM蛇群算法优化卷积神经网络-支持向量…...

【原创】java+swing+mysql校园共享单车管理系统设计与实现

摘要&#xff1a; 校园共享单车作为一种绿色、便捷的出行方式&#xff0c;在校园内得到了广泛的应用。然而&#xff0c;随着单车数量的增加&#xff0c;管理难度也不断加大。如何提高单车的利用率和管理效率&#xff0c;成为校园共享单车发展面临的重要问题。本文针对这一问题…...

(自适应手机端)响应式新闻博客知识类pbootcms网站模板 自媒体运营博客网站源码下载

(自适应手机端)响应式新闻博客知识类pbootcms网站模板 自媒体运营博客网站源码下载 带后台系统PbootCMS内核开发的网站模板&#xff0c;该模板适用于新闻博客网站、自媒体运营网站等企业&#xff0c;当然其他行业也可以做&#xff0c;只需要把文字图片换成其他行业的即可&#…...

SystemC入门完整编写示例:全加器测试平台

导读: 本文将完整演示基于systemC编写一个全加器的测试平台。具体内容包括&#xff1a;激励平台&#xff0c;监控平台&#xff0c;待测单元的编写&#xff0c;波形文件读取。 1&#xff0c;main函数模块 搭建一个测试平台主要由&#xff1a;Driver, Monitor, DUT(design under …...

动手学深度学习:2.线性回归pytorch实现

动手学深度学习&#xff1a;2.线性回归pytorch实现 1.手动构造数据集2.小批量读取数据集3.定义模型和损失函数4.初始化模型参数5.小批量随机梯度下降优化算法6.训练完整代码Q&A 1.手动构造数据集 import torch from torch.utils import data from d2l import torch as d2l…...

重要的linux指令

系统管理命令 切换用户 su 用户名管理员身份运行 sudo 命令实时显示进程信息(linux下任务管理器) top查看进程信息(ps) ps -efps -ef | grep 进程名 ps -aux | grep 进程名参数说明e 显示所有进程f 全格式a 显示所有程序u 以用户为主的格式来显示程序状况x 显示无控制终端…...

delphi7安装并使用皮肤控件

1、下载控件 我已经上传到云盘&#xff0c;存储位置 2、下载后并解压。 3、打开dephi7&#xff0c;File-Open&#xff0c;打开路径D:\LC\Desktop\vclskin2_XiaZaiBa\d7&#xff0c; 然后将 D:\LC\Desktop\vclskin2_XiaZaiBa\d7文件夹中所有后缀.dcu的文件复制粘贴到delphi安装路…...

安徽省黄山景区免9天门票为哪般?

今日浑浑噩噩地睡了大半天&#xff0c;强撑起身子写网文......可是&#xff0c;题材不好选&#xff0c;本“人民体验官”只得推广人民日报官方微博文化产品《这两个月“黄山每周三免门票”》。 图&#xff1a;来源“人民体验官”推广平台 因年事渐高&#xff0c;又有未愈的呼吸…...

MFC 窗体插入图片

1.制作BMP图像1.bmp 放到res文件夹下&#xff0c;资源视图界面导入res文件夹下的1.bmp 2.添加控件 控件类型修改为Bitmap 图像&#xff0c;选择IDB_BITMAP1 3.效果...

关于中间件技术

中间件是一种独立的系统软件或服务程序&#xff0c;可以帮助分布式应用软件在不同的技术之间共享资源。中间件可以&#xff1a; 1、负责客户机与服务器之间的连接和通信&#xff0c;以及客户机与应用层之间的高效率通信机制。 2、提供应用的负载均衡和高可用性、安全机制与管…...

机器学习中的嵌入:释放表征的威力

简介 机器学习通过使计算机能够从数据学习和做出预测来彻底改变了人工智能领域。机器学习的一个关键方面是数据的表示&#xff0c;因为表示形式的选择极大地影响了算法的性能和有效性。嵌入已成为机器学习中的一种强大技术&#xff0c;提供了一种捕获和编码数据点之间复杂关系的…...

【Midjourney入门教程3】写好prompt常用的参数

文章目录 1、图片描述词&#xff08;图片链接&#xff09;文字描述词后缀参数2、权重划分3、后缀参数版本选择&#xff1a;--v版本风格&#xff1a;--style长宽比&#xff1a;--ar多样性: --c二次元化&#xff1a;--niji排除内容&#xff1a;--no--stylize--seed--tile、--q 4、…...

01-单节点部署clickhouse及简单使用

1、下载rpm安装包&#xff1a; 官网&#xff1a;https://packages.clickhouse.com/rpm/stable/ clickhouse19.4版本之后只需下载3个rpm安装包&#xff0c;上传到节点目录即可 2、rpm包安装&#xff1a; 安装顺序为conmon->server->client 执行 rpm -ivh ./clickhouse-…...

超短脉冲激光自聚焦效应

前言与目录 强激光引起自聚焦效应机理 超短脉冲激光在脆性材料内部加工时引起的自聚焦效应&#xff0c;这是一种非线性光学现象&#xff0c;主要涉及光学克尔效应和材料的非线性光学特性。 自聚焦效应可以产生局部的强光场&#xff0c;对材料产生非线性响应&#xff0c;可能…...

蓝桥杯3498 01串的熵

问题描述 对于一个长度为 23333333的 01 串, 如果其信息熵为 11625907.5798&#xff0c; 且 0 出现次数比 1 少, 那么这个 01 串中 0 出现了多少次? #include<iostream> #include<cmath> using namespace std;int n 23333333;int main() {//枚举 0 出现的次数//因…...

Android第十三次面试总结(四大 组件基础)

Activity生命周期和四大启动模式详解 一、Activity 生命周期 Activity 的生命周期由一系列回调方法组成&#xff0c;用于管理其创建、可见性、焦点和销毁过程。以下是核心方法及其调用时机&#xff1a; ​onCreate()​​ ​调用时机​&#xff1a;Activity 首次创建时调用。​…...

CRMEB 中 PHP 短信扩展开发:涵盖一号通、阿里云、腾讯云、创蓝

目前已有一号通短信、阿里云短信、腾讯云短信扩展 扩展入口文件 文件目录 crmeb\services\sms\Sms.php 默认驱动类型为&#xff1a;一号通 namespace crmeb\services\sms;use crmeb\basic\BaseManager; use crmeb\services\AccessTokenServeService; use crmeb\services\sms\…...

CVPR2025重磅突破:AnomalyAny框架实现单样本生成逼真异常数据,破解视觉检测瓶颈!

本文介绍了一种名为AnomalyAny的创新框架&#xff0c;该方法利用Stable Diffusion的强大生成能力&#xff0c;仅需单个正常样本和文本描述&#xff0c;即可生成逼真且多样化的异常样本&#xff0c;有效解决了视觉异常检测中异常样本稀缺的难题&#xff0c;为工业质检、医疗影像…...

如何配置一个sql server使得其它用户可以通过excel odbc获取数据

要让其他用户通过 Excel 使用 ODBC 连接到 SQL Server 获取数据&#xff0c;你需要完成以下配置步骤&#xff1a; ✅ 一、在 SQL Server 端配置&#xff08;服务器设置&#xff09; 1. 启用 TCP/IP 协议 打开 “SQL Server 配置管理器”。导航到&#xff1a;SQL Server 网络配…...

机器学习的数学基础:线性模型

线性模型 线性模型的基本形式为&#xff1a; f ( x ) ω T x b f\left(\boldsymbol{x}\right)\boldsymbol{\omega}^\text{T}\boldsymbol{x}b f(x)ωTxb 回归问题 利用最小二乘法&#xff0c;得到 ω \boldsymbol{\omega} ω和 b b b的参数估计$ \boldsymbol{\hat{\omega}}…...

结构化文件管理实战:实现目录自动创建与归类

手动操作容易因疲劳或疏忽导致命名错误、路径混乱等问题&#xff0c;进而引发后续程序异常。使用工具进行标准化操作&#xff0c;能有效降低出错概率。 需要快速整理大量文件的技术用户而言&#xff0c;这款工具提供了一种轻便高效的解决方案。程序体积仅有 156KB&#xff0c;…...

stm32进入Infinite_Loop原因(因为有系统中断函数未自定义实现)

这是系统中断服务程序的默认处理汇编函数&#xff0c;如果我们没有定义实现某个中断函数&#xff0c;那么当stm32产生了该中断时&#xff0c;就会默认跑这里来了&#xff0c;所以我们打开了什么中断&#xff0c;一定要记得实现对应的系统中断函数&#xff0c;否则会进来一直循环…...

MLP实战二:MLP 实现图像数字多分类

任务 实战&#xff08;二&#xff09;&#xff1a;MLP 实现图像多分类 基于 mnist 数据集&#xff0c;建立 mlp 模型&#xff0c;实现 0-9 数字的十分类 task: 1、实现 mnist 数据载入&#xff0c;可视化图形数字&#xff1b; 2、完成数据预处理&#xff1a;图像数据维度转换与…...