深度了解flink rpc机制(四) 组件启动流程源码分析
前言
目前已发布了3篇关于Flink RPC相关的文章,分别从底层通信系统akka/Pekko,RPC实现方式动态代理以及Flink RPC相关的组件做了介绍
深度了解flink rpc机制(一)-Akka/Pekko_flink pekko akka-CSDN博客
深度了解flink rpc机制(二)-动态代理-CSDN博客
深度了解flink rpc机制(三)-组件以及交互-CSDN博客
这篇文章通过分析源码,对以上知识进行验证并串联加深印象,更深入的了解Flink RPC的实现原理。本篇文章分享TaskManager启动和向ResouceManager注册的流程,TaskManager在flink 1.12之后被更名为TaskExecutor,可能文章中两个名称都会使用,大家理解成一个就行。
TaskManage启动源码分析
入口类
TaskManager的启动类入口,以Flink的Standalone模式为例,可以在flink目录下的bin目录的flink-daemon.sh找到入口类:
. "$bin"/config.shcase $DAEMON in(taskexecutor)CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner;;(zookeeper)CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer;;(historyserver)CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer;;(standalonesession)CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;;(standalonejob)CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint;;(*)echo "Unknown daemon '${DAEMON}'. $USAGE."exit 1;;
esac
从这里可以看到Standalon模式下各个组件的启动类入口,TaskManager的入口类是TaskManageRunner,做为组件的入口类,肯定会有main方法:
public static void main(String[] args) throws Exception {// startup checks and loggingEnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager", args);SignalHandler.register(LOG);JvmShutdownSafeguard.installAsShutdownHook(LOG);long maxOpenFileHandles = EnvironmentInformation.getOpenFileHandlesLimit();if (maxOpenFileHandles != -1L) {LOG.info("Maximum number of open file descriptors is {}.", maxOpenFileHandles);} else {LOG.info("Cannot determine the maximum number of open file descriptors");}//安装的方式启动taskmanager进程 runTaskManagerProcessSecurely(args);}
之后就是在TaskManageRunner的方法调用了,最终会进入到runTaskManager这个静态方法
public static int runTaskManager(Configuration configuration, PluginManager pluginManager)throws Exception {final TaskManagerRunner taskManagerRunner;try {//之前方法都是静态方法调用,初始化taskManagerRunner对象taskManagerRunner =new TaskManagerRunner(configuration,pluginManager,TaskManagerRunner::createTaskExecutorService);//开始创建TaskmanagertaskManagerRunner.start();} catch (Exception exception) {throw new FlinkException("Failed to start the TaskManagerRunner.", exception);}try {return taskManagerRunner.getTerminationFuture().get().getExitCode();} catch (Throwable t) {throw new FlinkException("Unexpected failure during runtime of TaskManagerRunner.",ExceptionUtils.stripExecutionException(t));}}
之前一直是在调用TaskManageRunner的静态方法做一些日志加载,安全检查的前置校验,此时才真正的实例化TaskManageRunner对象,调用start方法进行TaskManager的创建
//taskManagerRunner.start()public void start() throws Exception {synchronized (lock) {startTaskManagerRunnerServices();taskExecutorService.start();}
}
创建RpcService和TaskExecutor
taskManagerRunner.start()方法内部有两个方法的调用
- startTaskManagerRunnerServices()
private void startTaskManagerRunnerServices() throws Exception {synchronized (lock) {rpcSystem = RpcSystem.load(configuration);//非RPC相关 代码省略JMXService.startInstance(configuration.get(JMXServerOptions.JMX_SERVER_PORT));//创建rpcServicerpcService = createRpcService(configuration, highAvailabilityServices, rpcSystem);//非RPC相关 代码省略//创建TaskExecutortaskExecutorService =taskExecutorServiceFactory.createTaskExecutor(this.configuration,this.resourceId.unwrap(),rpcService,highAvailabilityServices,heartbeatServices,metricRegistry,blobCacheService,false,externalResourceInfoProvider,workingDirectory.unwrap(),this,delegationTokenReceiverRepository);}}
可以看到这个方法首先调用createRpcService这个方法,这个方法内部内就是去创建ActorSystem,初始化RpcService
初始化RpcServer和PekkoInvocationHandler
然后就是创建TaskExecutor,TaskExecutor继承自EndPoint,EndPoint构造方法执行的时候会初始化RpcServer
/*** Initializes the RPC endpoint.** @param rpcService The RPC server that dispatches calls to this RPC endpoint.* @param endpointId Unique identifier for this endpoint*/protected RpcEndpoint(final RpcService rpcService, final String endpointId) {this.rpcService = checkNotNull(rpcService, "rpcService");this.endpointId = checkNotNull(endpointId, "endpointId");//创建RpcServer 方法内部//1.创建Acotr通信对象PekkoRpcActor//2.对象动态代理对象PekkoInvocationHandler赋值给rpcServerthis.rpcServer = rpcService.startServer(this);this.resourceRegistry = new CloseableRegistry();this.mainThreadExecutor =new MainThreadExecutor(rpcServer, this::validateRunsInMainThread, endpointId);registerResource(this.mainThreadExecutor);}
- taskExecutorService.start()
这个方法会调用TaskExecutor对象的start方法,会调用父类EndPoint的start方法
/*** Triggers start of the rpc endpoint. This tells the underlying rpc server that the rpc* endpoint is ready to process remote procedure calls.*/public final void start() {rpcServer.start();}
rpcServer.start()方法如下
public void start() {//rpcEndpoint是Actor对象rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());}
这块儿代码就是taskmanger初始化后自己会给自己发送一个Akka START控制类的消息,准确来说是继承了EndPoint的类都会在初始化之后给自身发送一个这样的消息。
因为发的是Akka的消息,会进入到TaskExecutor的PekkoInvocationHandler#createReceive接收Akka消息的逻辑
//构造方法PekkoRpcActor(final T rpcEndpoint,final CompletableFuture<Boolean> terminationFuture,final int version,final long maximumFramesize,final boolean forceSerialization,final ClassLoader flinkClassLoader) {//省略其他代码//PekkoPrcActor初始化 会将state枚举值设置为StoppedState.STOPPEDthis.state = StoppedState.STOPPED;}//接收消息@Overridepublic Receive createReceive() {return ReceiveBuilder.create()//匹配到握手消息.match(RemoteHandshakeMessage.class, this::handleHandshakeMessage)//控制类消息.match(ControlMessages.class, this::handleControlMessage)//除以上两种之外的任意消息.matchAny(this::handleMessage).build();//处理控制类消息的逻辑 private void handleControlMessage(ControlMessages controlMessage) {try {switch (controlMessage) {case START:state = state.start(this, flinkClassLoader);break;case STOP:state = state.stop();break;case TERMINATE:state = state.terminate(this, flinkClassLoader);break;default:handleUnknownControlMessage(controlMessage);}} catch (Exception e) {this.rpcEndpointTerminationResult = RpcEndpointTerminationResult.failure(e);throw e;}}
PekkoRpcActor在初始化的时候会 将自身state属性设置为StoppedState.STOPPED;
接收到ControlMessages.START消息,会走到handleControlMessage方法的case stop分支,因为state是StoppedState.STOPPED,所以代码会走到StoppedState这个静态枚举类的start方法
public State start(PekkoRpcActor<?> pekkoRpcActor, ClassLoader flinkClassLoader) {pekkoRpcActor.mainThreadValidator.enterMainThread();try {runWithContextClassLoader(() -> pekkoRpcActor.rpcEndpoint.internalCallOnStart(), flinkClassLoader);} catch (Throwable throwable) {pekkoRpcActor.stop(RpcEndpointTerminationResult.failure(new RpcException(String.format("Could not start RpcEndpoint %s.",pekkoRpcActor.rpcEndpoint.getEndpointId()),throwable)));} finally {pekkoRpcActor.mainThreadValidator.exitMainThread();}return StartedState.STARTED;}
pekkoRpcActor.rpcEndpoint.internalCallOnStart()这块儿代码是关键,又指定到了Endpoint定义的方法,
public final void internalCallOnStart() throws Exception {validateRunsInMainThread();isRunning = true;onStart();}protected void onStart() throws Exception {}
这块儿代码饶了半天,其实用大白话来讲就是Flink任何需要进行通信的组件都要继承Endpoint类,组件初始化之前会先初始化RpcService对象作为Endpoint子类的成员变量,然后再由RpcService初始化ActorSystem,创建Actor和代理对象,之后再给自身发一个控制类的START方法,最后一定要进入到自身的onStart方法
TaskExecutor向ResourceManager注册流程
onStart方法开始进入到向ResourceManager注册的流程
@Overridepublic void onStart() throws Exception {try {//开始向ResourceManager注册startTaskExecutorServices();} catch (Throwable t) {final TaskManagerException exception =new TaskManagerException(String.format("Could not start the TaskExecutor %s", getAddress()), t);onFatalError(exception);throw exception;}startRegistrationTimeout();}private void startTaskExecutorServices() throws Exception {try {// start by connecting to the ResourceManager//new ResourceManagerLeaderListener()是真正注册的代码resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());//省略其他代码} catch (Exception e) {handleStartTaskExecutorServicesException(e);}}
new ResourceManagerLeaderListener()是真正注册的方法
private final class ResourceManagerLeaderListener implements LeaderRetrievalListener {@Overridepublic void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {runAsync(() ->notifyOfNewResourceManagerLeader(leaderAddress,ResourceManagerId.fromUuidOrNull(leaderSessionID)));}@Overridepublic void handleError(Exception exception) {onFatalError(exception);}}
再进入到notifyOfNewResourceManagerLeader方法内部
private void notifyOfNewResourceManagerLeader(String newLeaderAddress, ResourceManagerId newResourceManagerId) {//获取ResouceManager的通信地址resourceManagerAddress =createResourceManagerAddress(newLeaderAddress, newResourceManagerId);//尝试连接ResouceMnangerreconnectToResourceManager(new FlinkException(String.format("ResourceManager leader changed to new address %s",resourceManagerAddress)));
}
reconnectToResourceManager方法内部
private void reconnectToResourceManager(Exception cause) {//如果已存在ResourceManger的连接 关闭连接closeResourceManagerConnection(cause);//设置注册超时时间startRegistrationTimeout();//继续尝试连接ResouceManagertryConnectToResourceManager();}
tryConnectToResourceManager();
private void tryConnectToResourceManager() {if (resourceManagerAddress != null) {connectToResourceManager();}}private void connectToResourceManager() {assert (resourceManagerAddress != null);assert (establishedResourceManagerConnection == null);assert (resourceManagerConnection == null);log.info("Connecting to ResourceManager {}.", resourceManagerAddress);//封装taskExecutor的信息:地址 硬件资源 内存资源final TaskExecutorRegistration taskExecutorRegistration =new TaskExecutorRegistration(getAddress(),getResourceID(),unresolvedTaskManagerLocation.getDataPort(),JMXService.getPort().orElse(-1),hardwareDescription,memoryConfiguration,taskManagerConfiguration.getDefaultSlotResourceProfile(),taskManagerConfiguration.getTotalResourceProfile(),unresolvedTaskManagerLocation.getNodeId());resourceManagerConnection =new TaskExecutorToResourceManagerConnection(log,getRpcService(),taskManagerConfiguration.getRetryingRegistrationConfiguration(),resourceManagerAddress.getAddress(),resourceManagerAddress.getResourceManagerId(),getMainThreadExecutor(),new ResourceManagerRegistrationListener(),taskExecutorRegistration);resourceManagerConnection.start();}
进入到connectToResourceManager方法,封装注册信息。进入start方法
public void start() {checkState(!closed, "The RPC connection is already closed");checkState(!isConnected() && pendingRegistration == null,"The RPC connection is already started");//创建注册成功、注册失败的回调方法final RetryingRegistration<F, G, S, R> newRegistration = createNewRegistration();if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {//开始主持newRegistration.startRegistration();} else {// concurrent start operationnewRegistration.cancel();}}
首先创建注册成功和主持失败的回调方法,然后继续进入注册的流程
public void startRegistration() {//创建动态代理对象final CompletableFuture<G> rpcGatewayFuture;//ResourceManager可能有主从,所以走Fenced这块儿if (FencedRpcGateway.class.isAssignableFrom(targetType)) {rpcGatewayFuture =(CompletableFuture<G>)rpcService.connect(targetAddress,fencingToken,targetType.asSubclass(FencedRpcGateway.class));} else {rpcGatewayFuture = rpcService.connect(targetAddress, targetType);}//省略其他代码
}private <C extends RpcGateway> CompletableFuture<C> connectInternal(final String address,final Class<C> clazz,Function<ActorRef, InvocationHandler> invocationHandlerFactory) {checkState(!stopped, "RpcService is stopped");//省略无关代码//握手确保连接正常final CompletableFuture<HandshakeSuccessMessage> handshakeFuture =final CompletableFuture<C> gatewayFuture =actorRefFuture.thenCombineAsync(handshakeFuture,(ActorRef actorRef, HandshakeSuccessMessage ignored) -> {InvocationHandler invocationHandler =invocationHandlerFactory.apply(actorRef);ClassLoader classLoader = getClass().getClassLoader();//真正核心的代码 创建代理的实现@SuppressWarnings("unchecked")C proxy =(C)Proxy.newProxyInstance(classLoader,new Class<?>[] {clazz},invocationHandler);return proxy;},actorSystem.dispatcher());return guardCompletionWithContextClassLoader(gatewayFuture, flinkClassLoader);}
然后就会走到RpcService获取到ReouceManager的代理,然后将代理对象和主持方法通过akka消息发送到ResouceManager的RpcActor,然后进入消息处理,执行代理的对象的注册方法,也就是ResouceManager的注册方法,从而将TaskManager进行注册
启动注册流程图

相关文章:
深度了解flink rpc机制(四) 组件启动流程源码分析
前言 目前已发布了3篇关于Flink RPC相关的文章,分别从底层通信系统akka/Pekko,RPC实现方式动态代理以及Flink RPC相关的组件做了介绍 深度了解flink rpc机制(一)-Akka/Pekko_flink pekko akka-CSDN博客 深度了解flink rpc机制&…...
C++基于opencv的视频质量检测--遮挡检测
文章目录 0.引言1. 原始代码分析1.1 存在的问题 2. 优化方案3. 优化后的代码4. 代码详细解读4.1. 输入检查4.2. 图像预处理4.3. 高斯模糊4.4. 梯度计算4.5. 计算梯度幅值和方向4.6. 边缘检测4.7. 计算边缘密度4.8. 估计遮挡程度4.9. 限定结果范围4.10. 返回结果 0.引言 视频质…...
手机玩潜水员戴夫?GameViewer远程如何随时随地玩潜水员戴夫教程
如果你是潜水员戴夫的忠实玩家,你知道如何在手机上玩潜水员戴夫吗?潜水员戴夫是一个以神秘蓝洞为背景的海洋冒险游戏。在这个游戏里你白天可以在美丽的大海里打鱼,晚上可以经营寿司店。现在这个游戏也能实现用手机随时随地畅玩了!…...
UE5 喷射背包
首选创建一个输入操作 然后在输入映射中添加,shift是向上飞,ctrl是向下飞 进入人物蓝图中编写逻辑,变量HaveJatpack默认true,Thrust为0 最后...
【Vue3】第三篇
Vue3学习第三篇 01. 组件组成02. 组件嵌套关系03. 组件注册方式04. 组件传递数据Props05. 组件传递多种数据类型06. 组件传递Props校验07. 组件事件08. 组件事件配合v-model使用09. 组件数据传递10. 透传Attributes 01. 组件组成 在vue当中,组件是最重要的知识&…...
c++二级指针
如果要通过函数改变一个指针的值,要往函数中传入指针的指针 如果要通过函数改变一个变量的值,那就要往函数中传入这个变量的地址 改变a的值和b的值 #include <iostream>using namespace std;void swap(int* a, int* b) {int temp *a;*a *b;*b …...
客户端存储 — IndexedDB 实现分页查询
前言 相信 IndexedDB 大家都有过了解,但是不一定每个人都有过实践,并且其中涉及到事务、游标等概念,会导致在初次使用时会有些不适应,那么本文会通过 IndexedDB 实现分页查询的形式进行实践,在开始之前,可…...
logback 如何将日志输出到文件
如何作 将日志输出到文件需要使用 RollingFileAppender,该 Appender 必须定义 rollingPolicy ,另外 rollingPollicy 下必须定义 fileNamePattern 和 encoder <appender name"fileAppender" class"ch.qos.logback.core.rolling.Rollin…...
Files.newBufferedReader和Files.readAllLines
在Java中,Files.newBufferedReader 和 Files.readAllLines 都是用于从文件中读取数据的工具方法,但它们的使用场景和功能有所不同。下面我将详细解释这两个方法的含义、用途、区别、优缺点以及各自的使用场景。 1. Files.newBufferedReader 含义和用途…...
MySQL 数据库备份与恢复全攻略
MySQL 数据库备份与恢复全攻略 引言 在现代应用中,数据库是核心组件之一。无论是个人项目还是企业级应用,数据的安全性和完整性都至关重要。为了防止数据丢失、损坏或意外删除,定期备份数据库是必不可少的。本文将详细介绍 MySQL 数据库的备…...
Appium中的api(一)
目录 1.基础python代码准备 1--参数的一些说明 2--python内所要编写的代码 解释 2.如何获取包名和界面名 1-api 2-完整代码 代码解释 3.如何关闭驱动连接 4.安装卸载app 1--卸载 2--安装 5.判断app是否安装 6.将应用放到后台在切换为前台的时间 7.UIAutomatorViewer的使用 1--找…...
【AI辅助设计】没错!训练FLUX LoRA就这么简单!
前言 得益于开源社区的力量,在各位大佬的努力下,现在16G VRAM的家用电脑也可以训练FLUX的LoRA了 👏。 今天我使用fluxgym这个方法,训练LoRA,并记录过程。 篇幅有限,这里就不一一展示了,有需要的…...
Mac 下安装FastDFS
首先我们需要下载相对应的安装包: libfastcommonFastDFS 下载完成后我们先将其解压到桌面。 1.安装libfastcommon 我们进入到libfastcommon-master目录中执行./make.sh和sudo ./make.sh install,具体代码如下: 2.安装FastDFS 同安装libfa…...
人工智能的未来:重塑生活与工作的变革者
随着人工智能(AI)技术的快速发展,我们正处于一个前所未有的变革时代。AI不仅在医疗、企业运营和日常生活中发挥着重要作用,而且正在重新定义我们的生活和工作方式。本文将探讨人工智能技术的应用前景以及它如何改变我们的生活和工…...
【微服务】Java 对接飞书多维表格使用详解
目录 一、前言 二、前置操作 2.1 开通企业飞书账户 2.2 确保账户具备多维表操作权限 2.3 创建一张测试用的多维表 2.4 获取飞书开放平台文档 2.5 获取Java SDK 三、应用App相关操作 3.1 创建应用过程 3.2 应用发布过程 3.3 应用添加操作权限 四、多维表应用授权操作…...
学习threejs,使用粒子实现下雪特效
👨⚕️ 主页: gis分享者 👨⚕️ 感谢各位大佬 点赞👍 收藏⭐ 留言📝 加关注✅! 👨⚕️ 收录于专栏:threejs gis工程师 文章目录 一、🍀前言1.1 ☘️THREE.Points简介1.11 ☘️…...
unity3d——Time
在Unity3D中,Time类是一个非常重要的工具类,它提供了一系列与时间相关的属性和方法,帮助开发者在游戏中实现各种时间相关的操作。以下是一些Time类常用的方法及其示例: 一、常用属性 Time.time 含义:表示从游戏开始到…...
天地图实现海量聚合marker--uniapp后端详细实现
本文章详细的讲解了前后端代码来 实现uniapp天地图功能的实现 以及 后端海量数据的聚合查询 和网格算法实现思路。 并对当数据量增加和用户频繁请求接口时可能导致服务器负载过高做了前后端优化。 前端uniapp: 实现了天地图的行政区划边界/地图切换/比例尺/海量数…...
Bug | 项目中数据库查询问题
问题描述 理论上,点击查询后,表头应当显示中文。而不是上面的在数据库中的表头【如上图示】 正常点击查询后,如果没有输入值,应当是查询所有的信息。 原因分析: 这里是直接使用SELECT * 导致的。例如: S…...
C++入门基础知识129—【关于C 库函数 - time()】
成长路上不孤单😊😊😊😊😊😊 【14后😊///C爱好者😊///持续分享所学😊///如有需要欢迎收藏转发///😊】 今日分享关于C 库函数 - time()的相关内容࿰…...
使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式
一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明:假设每台服务器已…...
脑机新手指南(八):OpenBCI_GUI:从环境搭建到数据可视化(下)
一、数据处理与分析实战 (一)实时滤波与参数调整 基础滤波操作 60Hz 工频滤波:勾选界面右侧 “60Hz” 复选框,可有效抑制电网干扰(适用于北美地区,欧洲用户可调整为 50Hz)。 平滑处理&…...
阿里云ACP云计算备考笔记 (5)——弹性伸缩
目录 第一章 概述 第二章 弹性伸缩简介 1、弹性伸缩 2、垂直伸缩 3、优势 4、应用场景 ① 无规律的业务量波动 ② 有规律的业务量波动 ③ 无明显业务量波动 ④ 混合型业务 ⑤ 消息通知 ⑥ 生命周期挂钩 ⑦ 自定义方式 ⑧ 滚的升级 5、使用限制 第三章 主要定义 …...
基础测试工具使用经验
背景 vtune,perf, nsight system等基础测试工具,都是用过的,但是没有记录,都逐渐忘了。所以写这篇博客总结记录一下,只要以后发现新的用法,就记得来编辑补充一下 perf 比较基础的用法: 先改这…...
Axios请求超时重发机制
Axios 超时重新请求实现方案 在 Axios 中实现超时重新请求可以通过以下几种方式: 1. 使用拦截器实现自动重试 import axios from axios;// 创建axios实例 const instance axios.create();// 设置超时时间 instance.defaults.timeout 5000;// 最大重试次数 cons…...
IoT/HCIP实验-3/LiteOS操作系统内核实验(任务、内存、信号量、CMSIS..)
文章目录 概述HelloWorld 工程C/C配置编译器主配置Makefile脚本烧录器主配置运行结果程序调用栈 任务管理实验实验结果osal 系统适配层osal_task_create 其他实验实验源码内存管理实验互斥锁实验信号量实验 CMISIS接口实验还是得JlINKCMSIS 简介LiteOS->CMSIS任务间消息交互…...
MySQL JOIN 表过多的优化思路
当 MySQL 查询涉及大量表 JOIN 时,性能会显著下降。以下是优化思路和简易实现方法: 一、核心优化思路 减少 JOIN 数量 数据冗余:添加必要的冗余字段(如订单表直接存储用户名)合并表:将频繁关联的小表合并成…...
Visual Studio Code 扩展
Visual Studio Code 扩展 change-case 大小写转换EmmyLua for VSCode 调试插件Bookmarks 书签 change-case 大小写转换 https://marketplace.visualstudio.com/items?itemNamewmaurer.change-case 选中单词后,命令 changeCase.commands 可预览转换效果 EmmyLua…...
C++_哈希表
本篇文章是对C学习的哈希表部分的学习分享 相信一定会对你有所帮助~ 那咱们废话不多说,直接开始吧! 一、基础概念 1. 哈希核心思想: 哈希函数的作用:通过此函数建立一个Key与存储位置之间的映射关系。理想目标:实现…...
Xcode 16 集成 cocoapods 报错
基于 Xcode 16 新建工程项目,集成 cocoapods 执行 pod init 报错 ### Error RuntimeError - PBXGroup attempted to initialize an object with unknown ISA PBXFileSystemSynchronizedRootGroup from attributes: {"isa">"PBXFileSystemSynchro…...
