使用Akka的Actor模拟Spark的Master和Worker工作机制
使用Akka的Actor模拟Spark的Master和Worker工作机制
Spark的Master和Worker协调工作原理
在 Apache Spark 中,Master 和 Worker 之间通过心跳机制进行通信和保持活动状态。下面是 Master 和 Worker 之间心跳机制的工作流程:
- Worker 启动后,会向预先配置的 Master 节点发送注册请求。
- Master 接收到注册请求后,会为该 Worker 创建一个唯一的标识符(Worker ID)并将其信息保存在内存中。
- Master 向 Worker 发送包含 Master URL、Worker ID 等信息的注册响应。
- Worker 收到注册响应后,会启动一个定时器并开始周期性地向 Master 发送心跳消息。
- Worker 的心跳消息中包含当前的负载状况、可用资源等信息。
- Master 接收到心跳消息后,更新该 Worker 的最近心跳时间,并根据需要对集群进行动态调整,例如添加新的任务或删除故障的 Worker。
- 如果 Master 在一段时间内没有收到某个 Worker 的心跳消息,它将把该 Worker 标记为失效,并将其相应的资源标记为可用以供后续使用。
具体原理如下:
- Worker 通过网络向 Master 发送心跳消息,通常使用基于 TCP 的长连接。这些心跳消息可以包含有关 Worker 健康状况、资源利用情况等的信息。
- Master 使用一个内部的心跳管理组件来处理接收到的心跳消息,并维护每个 Worker 的状态。它根据心跳消息的频率和时间戳来判断 Worker 是否正常运行。
- 如果 Master 在预定的时间内没有收到 Worker 的心跳消息,它会将该 Worker 标记为失效并触发一系列的故障处理机制,例如重新分配任务给其他可用的 Worker。
- Worker 定期发送心跳消息,以确保在网络故障、Worker 故障或其他问题发生时能够及时通知 Master。
通过心跳机制,Master 能够实时监控 Worker 的状态,并根据需要进行集群的动态管理和资源调度,从而实现高可用性和容错性。
使用Akka的Actor模拟Spark的Master和Worker工作机制
- worker注册到Master, Master完成注册,并回复worker注册成功。
- worker定时发送心跳,并在Master接收到。
- Master接收到worker心跳后,要更新该worker的最近一次发送心跳的时间。
- 给Master启动定时任务,定时检测注册的worker有哪些没有更新心跳,并将其从hashmap中删除。
- master worker 进行分布式部署(Linux系统)-》如何给maven项目打包->上传linux。
- 创建SparkMaster类继承Actor特质,实现Receive方法,并定义对应的伴生对象,在伴生对象中创建SparkMaster-actor引用,并启动Actor发送消息。服务端Master对worker进行心跳监测,发现6秒内无法获取worker心跳,将异常的Worker的实例从HashMap中移除。若能正常获取到心跳,则获取心跳信息后更新心跳时间。定时保持心跳机制。
代码实现:
class SparkMaster extends Actor {//定义个hashMap,管理workers(所有worker的实例)val workers = mutable.Map[String, WorkerInfo]()override def receive: Receive = {case "start" => {println("master服务器启动了...")//这里开始。。self ! StartTimeOutWorker}case RegisterWorkerInfo(id, cpu, ram) => {//接收到worker注册信息if (!workers.contains(id)) {//创建WorkerInfo 对象val workerInfo = new WorkerInfo(id, cpu, ram)//加入到workersworkers += ((id, workerInfo))println("服务器的workers=" + workers)//回复一个消息,说注册成功sender() ! RegisteredWorkerInfo}}case HeartBeat(id) => {//更新对应的worker的心跳时间//1.从workers对应的HashMap中取出WorkerInfo,然后更新worker心跳时间val workerInfo = workers(id)workerInfo.lastHeartBeat = System.currentTimeMillis()println("master更新了 " + id + " 心跳时间...")}case StartTimeOutWorker => {println("开始了定时检测worker心跳的任务")import context.dispatcher//说明//1. 0 millis 不延时,立即执行定时器//2. 9000 millis 表示每隔3秒执行一次//3. self:表示发给自己//4. RemoveTimeOutWorker 发送的内容context.system.scheduler.schedule(0 millis, 9000 millis, self, RemoveTimeOutWorker)}//对RemoveTimeOutWorker消息处理//这里需求检测哪些worker心跳超时(now - lastHeartBeat > 6000),并从map中删除case RemoveTimeOutWorker => {//首先将所有的 workers 的 所有WorkerInfoval workerInfos = workers.valuesval nowTime = System.currentTimeMillis()//先把超时的所有workerInfo,删除即可workerInfos.filter(workerInfo => (nowTime - workerInfo.lastHeartBeat) > 6000).foreach(workerInfo => workers.remove(workerInfo.id))println("当前有 " + workers.size + " 个worker存活的")}}
}object SparkMaster {def main(args: Array[String]): Unit = {//这里我们分析出有3个host,port,sparkMasterActorif (args.length != 3) {println("请输入参数 host port sparkMasterActor名字")sys.exit()}val host = args(0)val port = args(1)val name = args(2)//先创建ActorSystemval config = ConfigFactory.parseString(s"""|akka.actor.provider="akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname=${host}|akka.remote.netty.tcp.port=${port}""".stripMargin)val sparkMasterSystem = ActorSystem("SparkMaster", config)//创建SparkMaster -actorval sparkMasterRef = sparkMasterSystem.actorOf(Props[SparkMaster], s"${name}")//启动SparkMastersparkMasterRef ! "start"}
}
- 定义SparkWorker类继承Actor特质,实现Receive方法,在方法中实现向master发送注册信息的请求,获取到服务端Master注册成功的消息后,定义定时任务发送心跳包给Master。
class SparkWorker(masterHost:String,masterPort:Int,masterName:String) extends Actor{//masterProxy是Master的代理/引用refvar masterPorxy :ActorSelection = _val id = java.util.UUID.randomUUID().toStringoverride def preStart(): Unit = {println("preStart()调用")//初始化masterPorxymasterPorxy = context.actorSelection(s"akka.tcp://SparkMaster@${masterHost}:${masterPort}/user/${masterName}")println("masterProxy=" + masterPorxy)}override def receive:Receive = {case "start" => {println("worker启动了")//发出一个注册消息masterPorxy ! RegisterWorkerInfo(id, 16, 16 * 1024)}case RegisteredWorkerInfo => {println("workerid= " + id + " 注册成功~")//当注册成功后,就定义一个定时器,每隔一定时间,发送SendHeartBeat给自己import context.dispatcher//说明//1. 0 millis 不延时,立即执行定时器//2. 3000 millis 表示每隔3秒执行一次//3. self:表示发给自己//4. SendHeartBeat 发送的内容context.system.scheduler.schedule(0 millis, 3000 millis, self, SendHeartBeat)}case SendHeartBeat =>{println("worker = " + id + "给master发送心跳")masterPorxy ! HeartBeat(id)}}
}object SparkWorker {def main(args: Array[String]): Unit = {if (args.length != 6) {println("请输入参数 workerHost workerPort workerName masterHost masterPort masterName")sys.exit()}val workerHost = args(0)val workerPort = args(1)val workerName = args(2)val masterHost = args(3)val masterPort = args(4)val masterName = args(5)val config = ConfigFactory.parseString(s"""|akka.actor.provider="akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname=${workerHost}|akka.remote.netty.tcp.port=${workerPort}""".stripMargin)//创建ActorSystemval sparkWorkerSystem = ActorSystem("SparkWorker",config)//创建SparkWorker 的引用/代理val sparkWorkerRef = sparkWorkerSystem.actorOf(Props(new SparkWorker(masterHost, masterPort.toInt,masterName)), s"${workerName}")//启动actorsparkWorkerRef ! "start"}
}
- 分别定义发送注册信息的RegisterWorkerInfo的样例类,WorkerInfo消息类,定义注册成功的消息样例对象RegisteredWorkerInfo,心跳信息样例类HeartBeat,以及确认发送心跳信息样例对象SendHeartBeat,触发超时work的样例对象StartTimeOutWorker,移除超时worker的样例对象RemoveTimeOutWorker。
代码如下:
// worker注册信息 //MessageProtocol.scala
case class RegisterWorkerInfo(id: String, cpu: Int, ram: Int)// 这个是WorkerInfo, 这个信息将来是保存到master的 hm(该hashmap是用于管理worker)
// 将来这个WorkerInfo会扩展(比如增加worker上一次的心跳时间)
class WorkerInfo(val id: String, val cpu: Int, val ram: Int) {var lastHeartBeat : Long = System.currentTimeMillis()
}// 当worker注册成功,服务器返回一个RegisteredWorkerInfo 对象
case object RegisteredWorkerInfo//worker每隔一定时间由定时器发给自己的一个消息
case object SendHeartBeat
//worker每隔一定时间由定时器触发,而向master发现的协议消息
case class HeartBeat(id: String)//master给自己发送一个触发检查超时worker的信息
case object StartTimeOutWorker
// master给自己发消息,检测worker,对于心跳超时的.
case object RemoveTimeOutWorker
运行效果:

通过这个案例我们可以深入理解Spark的Master和Worker的通讯机制,为了方便以后对Spark的底层源码的学习,命名的方式和源码保持一致.(如: 通讯消息类命名就是一样的);同时也加深了我们对主从服务心跳检测机制(HeartBeat)的理解,方便以后spark源码二次开发。
相关文章:
使用Akka的Actor模拟Spark的Master和Worker工作机制
使用Akka的Actor模拟Spark的Master和Worker工作机制 Spark的Master和Worker协调工作原理 在 Apache Spark 中,Master 和 Worker 之间通过心跳机制进行通信和保持活动状态。下面是 Master 和 Worker 之间心跳机制的工作流程: Worker 启动后,…...
文心一言api接入如何在你的项目里使用文心一言
文心一言api接入在项目里接入文心一言 一、百度文心一言API二、使用步骤1、接口2、请求参数3、请求参数示例4、接口 返回示例 三、 如何获取appKey和uid1、申请appKey:2、获取appKey和uid 四、重要说明 一、百度文心一言API 基于百度文心一言语言大模型的智能文本对话AI机器人…...
Python匿名函数lambda(R与Python第五篇)
目录 一、为什么要引入“lambda函数”? 二、匿名函数的两种用法 参考: 本文来源:《Python全案例学习与实践》(2019年9月出版,电子工业出版社) Python允许使用一种无名的函数,称其为匿名函数…...
【2023校园招聘】 钉钉AI应用开发平台开始校招拉~
【岗位职责】 负责钉钉AI Paas 产品化研发落地,包含但不限于: 1. 用户意图理解、任务规划、服务推荐等算法的设计和开发 2. 基于大模型落地各种落地应用,缩短大模型与真实应用场景的距离 3. 负责算法的工程化落地,包括算法的代…...
Linux系统gdb调试常用命令
GDB(GNU调试器)是一款常用的调试工具,用于调试C、C等编程语言的程序。以下是一些常用的GDB命令: 1. 启动程序: - gdb <executable>:启动GDB调试器,并加载可执行文件。 2. 设置断点&a…...
Sumo中Traci.trafficlight详解(上)
Sumo中Traci.trafficlight详解(上) 记录慢慢学习traci的每一天,希望也能帮到你 文章目录 Sumo中Traci.trafficlight详解(上)Traci.trafficlight信号灯参数讲解1.getAllProgramLogics(self,tlsID)2.getBlockingVehicle…...
手写Mybatis:第13章-通过注解配置执行SQL语句
文章目录 一、目标:注解配置执行SQL二、设计:注解配置执行SQL三、实现:注解配置执行SQL3.1 工程结构3.2 注解配置执行SQL类图3.3 脚本语言驱动器3.3.1 脚本语言驱动器接口3.3.2 XML语言驱动器 3.4 注解配置构建器3.4.1 定义增删改查注解3.4.2…...
spring security - 快速整合 springboot
1.引入依赖<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spr…...
NPM 常用命令(二)
目录 1、npm bugs 1.1 配置 browser registry 2、npm cache 2.1 概要 2.2 详情 2.3 关于缓存设计的说明 2.4 配置 cache 3、 npm ci 3.1 描述 3.2 配置 install-strategy legacy-bundling global-style omit strict-peer-deps foreground-scripts ignore-s…...
ctfhub ssrf(3关)
文章目录 内网访问伪协议读取文件扫描端口 内网访问 根据该题目,是让我们访问127.0.0.1/falg.php,访问给出的链接后用bp抓包,修改URL,发送后得到flag: 伪协议读取文件 这题的让我们用伪协议,而网站的目录…...
跨源资源共享(CORS)Access-Control-Allow-Origin
1、浏览器的同源安全策略 没错,就是这家伙干的,浏览器只允许请求当前域的资源,而对其他域的资源表示不信任。那怎么才算跨域呢? 请求协议http,https的不同域domain的不同端口port的不同 好好好,大概就是这么回事啦&…...
【嵌入式软件开发 】学习笔记
本文主要记录 【嵌入式软件开发】 学习笔记,参考相关大佬的文章 1.RTOS 内功修炼笔记 RTOS内功修炼记(一)—— 任务到底应该怎么写? RTOS内功修炼记(二)—— 优先级抢占式调度到底是怎么回事?…...
CentOS 7上安装Python 3.11.5,支持Django
CentOS 7上安装Python 3.11.5,支持Django 今天安装django,报了“Django - deterministicTrue requires SQLite 3.8.3 or higher upon running python manage.py runserver”。查了一番资料,记录下来。 参考链接: 参考链接: Django的web项目…...
COMPFEST 15H「组合数学+容斥」
Problem - H - Codeforces 题意: 定义一个集合S为T的孩子是,对于S中的每一个元素x,在T中都能找到x1。 给定n,k,每一个集合中的元素x必须满足 1 < x < k 1<x<k 1<x<k且 c n t [ x ] < 1 cnt[x…...
react快速开始(三)-create-react-app脚手架项目启动;使用VScode调试react
文章目录 react快速开始(三)-create-react-app脚手架项目启动;使用VScode调试react一、create-react-app脚手架项目启动1. react-scripts2. 关于better-npm-runbetter-npm-run安装 二、使用VScode调试react1. 浏览器插件React Developer Tools2. 【重点】用 VSCode …...
【C++入门】string类常用方法(万字详解)
目录 1.STL简介1.1什么是STL1.2STL的版本1.3STL的六大组件1.4STL的缺陷 2.string类的使用2.1C语言中的字符串2.2标准库中的string类2.3string类的常用接口说明 (只讲解最常用的接口)2.3.1string类对象的常见构造2.3.2 string类对象的容量操作2.3.3string…...
大数据错误
question1 : Could not locate Hadoop executable: D:\hadoop-3.3.1\bin\winutils.exe - 【已解决】Could not locate executable E:\Hadoop\bin\winutils.exe in the Hadoop binaries._could not locate executable e:\hadoop-3.3.1\bin\wi_君问归期魏有期的博客-CSDN博客 q…...
【Node.js】Express-Generator:快速生成Express应用程序的利器
在Node.js世界中,Express是一个广泛使用的、强大的Web应用程序框架。它为开发者提供了一系列的工具和选项,使得创建高效且可扩展的Web应用程序变得轻而易举。然而,对于初学者来说,配置和初始化Express应用程序可能会有些困难。为了…...
SpringMVC的工作流程及入门
目录 一、概述 ( 1 ) 是什么 ( 2 ) 作用 二、工作流程 ( 1 ) 流程 ( 2 ) 步骤 三、入门实例 ( 1 ) 入门实例 ( 2 ) 静态资源处理 给我们带来的收获 一、概述 ( 1 ) 是什么 SpringMVC是一个基于Java的Web应用开发框架,它是Spring Framework的一部…...
logging.level的含义及设置 【java 日志 (logback、log4j)】
日志级别 trace<debug<info<warn<error<fatal 常用的有:debug,info,warn,error 通常我们想设置日志级别,会用到 logging.level.rootinfo logging.level设置日志级别,后面跟生效的区域。r…...
在rocky linux 9.5上在线安装 docker
前面是指南,后面是日志 sudo dnf config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo sudo dnf install docker-ce docker-ce-cli containerd.io -y docker version sudo systemctl start docker sudo systemctl status docker …...
Unit 1 深度强化学习简介
Deep RL Course ——Unit 1 Introduction 从理论和实践层面深入学习深度强化学习。学会使用知名的深度强化学习库,例如 Stable Baselines3、RL Baselines3 Zoo、Sample Factory 和 CleanRL。在独特的环境中训练智能体,比如 SnowballFight、Huggy the Do…...
Java面试专项一-准备篇
一、企业简历筛选规则 一般企业的简历筛选流程:首先由HR先筛选一部分简历后,在将简历给到对应的项目负责人后再进行下一步的操作。 HR如何筛选简历 例如:Boss直聘(招聘方平台) 直接按照条件进行筛选 例如:…...
基于鸿蒙(HarmonyOS5)的打车小程序
1. 开发环境准备 安装DevEco Studio (鸿蒙官方IDE)配置HarmonyOS SDK申请开发者账号和必要的API密钥 2. 项目结构设计 ├── entry │ ├── src │ │ ├── main │ │ │ ├── ets │ │ │ │ ├── pages │ │ │ │ │ ├── H…...
热烈祝贺埃文科技正式加入可信数据空间发展联盟
2025年4月29日,在福州举办的第八届数字中国建设峰会“可信数据空间分论坛”上,可信数据空间发展联盟正式宣告成立。国家数据局党组书记、局长刘烈宏出席并致辞,强调该联盟是推进全国一体化数据市场建设的关键抓手。 郑州埃文科技有限公司&am…...
论文阅读:Matting by Generation
今天介绍一篇关于 matting 抠图的文章,抠图也算是计算机视觉里面非常经典的一个任务了。从早期的经典算法到如今的深度学习算法,已经有很多的工作和这个任务相关。这两年 diffusion 模型很火,大家又开始用 diffusion 模型做各种 CV 任务了&am…...
Python训练营-Day26-函数专题1:函数定义与参数
题目1:计算圆的面积 任务: 编写一个名为 calculate_circle_area 的函数,该函数接收圆的半径 radius 作为参数,并返回圆的面积。圆的面积 π * radius (可以使用 math.pi 作为 π 的值)要求:函数接收一个位置参数 radi…...
STM32标准库-ADC数模转换器
文章目录 一、ADC1.1简介1. 2逐次逼近型ADC1.3ADC框图1.4ADC基本结构1.4.1 信号 “上车点”:输入模块(GPIO、温度、V_REFINT)1.4.2 信号 “调度站”:多路开关1.4.3 信号 “加工厂”:ADC 转换器(规则组 注入…...
如何把工业通信协议转换成http websocket
1.现状 工业通信协议多数工作在边缘设备上,比如:PLC、IOT盒子等。上层业务系统需要根据不同的工业协议做对应开发,当设备上用的是modbus从站时,采集设备数据需要开发modbus主站;当设备上用的是西门子PN协议时…...
高端性能封装正在突破性能壁垒,其芯片集成技术助力人工智能革命。
2024 年,高端封装市场规模为 80 亿美元,预计到 2030 年将超过 280 亿美元,2024-2030 年复合年增长率为 23%。 细分到各个终端市场,最大的高端性能封装市场是“电信和基础设施”,2024 年该市场创造了超过 67% 的收入。…...
