当前位置: 首页 > news >正文

使用Akka的Actor模拟Spark的Master和Worker工作机制

使用Akka的Actor模拟Spark的Master和Worker工作机制

Spark的Master和Worker协调工作原理

在 Apache Spark 中,Master 和 Worker 之间通过心跳机制进行通信和保持活动状态。下面是 Master 和 Worker 之间心跳机制的工作流程:

  1. Worker 启动后,会向预先配置的 Master 节点发送注册请求。
  2. Master 接收到注册请求后,会为该 Worker 创建一个唯一的标识符(Worker ID)并将其信息保存在内存中。
  3. Master 向 Worker 发送包含 Master URL、Worker ID 等信息的注册响应。
  4. Worker 收到注册响应后,会启动一个定时器并开始周期性地向 Master 发送心跳消息。
  5. Worker 的心跳消息中包含当前的负载状况、可用资源等信息。
  6. Master 接收到心跳消息后,更新该 Worker 的最近心跳时间,并根据需要对集群进行动态调整,例如添加新的任务或删除故障的 Worker。
  7. 如果 Master 在一段时间内没有收到某个 Worker 的心跳消息,它将把该 Worker 标记为失效,并将其相应的资源标记为可用以供后续使用。

具体原理如下:

  • Worker 通过网络向 Master 发送心跳消息,通常使用基于 TCP 的长连接。这些心跳消息可以包含有关 Worker 健康状况、资源利用情况等的信息。
  • Master 使用一个内部的心跳管理组件来处理接收到的心跳消息,并维护每个 Worker 的状态。它根据心跳消息的频率和时间戳来判断 Worker 是否正常运行。
  • 如果 Master 在预定的时间内没有收到 Worker 的心跳消息,它会将该 Worker 标记为失效并触发一系列的故障处理机制,例如重新分配任务给其他可用的 Worker。
  • Worker 定期发送心跳消息,以确保在网络故障、Worker 故障或其他问题发生时能够及时通知 Master。

通过心跳机制,Master 能够实时监控 Worker 的状态,并根据需要进行集群的动态管理和资源调度,从而实现高可用性和容错性。

使用Akka的Actor模拟Spark的Master和Worker工作机制

  1. worker注册到Master, Master完成注册,并回复worker注册成功。
  2. worker定时发送心跳,并在Master接收到。
  3. Master接收到worker心跳后,要更新该worker的最近一次发送心跳的时间。
  4. 给Master启动定时任务,定时检测注册的worker有哪些没有更新心跳,并将其从hashmap中删除。
  5. master worker 进行分布式部署(Linux系统)-》如何给maven项目打包->上传linux。
  • 创建SparkMaster类继承Actor特质,实现Receive方法,并定义对应的伴生对象,在伴生对象中创建SparkMaster-actor引用,并启动Actor发送消息。服务端Master对worker进行心跳监测,发现6秒内无法获取worker心跳,将异常的Worker的实例从HashMap中移除。若能正常获取到心跳,则获取心跳信息后更新心跳时间。定时保持心跳机制。

代码实现:

class SparkMaster extends Actor {//定义个hashMap,管理workers(所有worker的实例)val workers = mutable.Map[String, WorkerInfo]()override def receive: Receive = {case "start" => {println("master服务器启动了...")//这里开始。。self ! StartTimeOutWorker}case RegisterWorkerInfo(id, cpu, ram) => {//接收到worker注册信息if (!workers.contains(id)) {//创建WorkerInfo 对象val workerInfo = new WorkerInfo(id, cpu, ram)//加入到workersworkers += ((id, workerInfo))println("服务器的workers=" + workers)//回复一个消息,说注册成功sender() ! RegisteredWorkerInfo}}case HeartBeat(id) => {//更新对应的worker的心跳时间//1.从workers对应的HashMap中取出WorkerInfo,然后更新worker心跳时间val workerInfo = workers(id)workerInfo.lastHeartBeat = System.currentTimeMillis()println("master更新了 " + id + " 心跳时间...")}case StartTimeOutWorker => {println("开始了定时检测worker心跳的任务")import context.dispatcher//说明//1. 0 millis 不延时,立即执行定时器//2. 9000 millis 表示每隔3秒执行一次//3. self:表示发给自己//4. RemoveTimeOutWorker 发送的内容context.system.scheduler.schedule(0 millis, 9000 millis, self, RemoveTimeOutWorker)}//对RemoveTimeOutWorker消息处理//这里需求检测哪些worker心跳超时(now - lastHeartBeat > 6000),并从map中删除case RemoveTimeOutWorker => {//首先将所有的 workers 的 所有WorkerInfoval workerInfos = workers.valuesval nowTime = System.currentTimeMillis()//先把超时的所有workerInfo,删除即可workerInfos.filter(workerInfo => (nowTime - workerInfo.lastHeartBeat) > 6000).foreach(workerInfo => workers.remove(workerInfo.id))println("当前有 " + workers.size + " 个worker存活的")}}
}object SparkMaster {def main(args: Array[String]): Unit = {//这里我们分析出有3个host,port,sparkMasterActorif (args.length != 3) {println("请输入参数 host port sparkMasterActor名字")sys.exit()}val host = args(0)val port = args(1)val name = args(2)//先创建ActorSystemval config = ConfigFactory.parseString(s"""|akka.actor.provider="akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname=${host}|akka.remote.netty.tcp.port=${port}""".stripMargin)val sparkMasterSystem = ActorSystem("SparkMaster", config)//创建SparkMaster -actorval sparkMasterRef = sparkMasterSystem.actorOf(Props[SparkMaster], s"${name}")//启动SparkMastersparkMasterRef ! "start"}
}
  • 定义SparkWorker类继承Actor特质,实现Receive方法,在方法中实现向master发送注册信息的请求,获取到服务端Master注册成功的消息后,定义定时任务发送心跳包给Master。
class SparkWorker(masterHost:String,masterPort:Int,masterName:String) extends Actor{//masterProxy是Master的代理/引用refvar masterPorxy :ActorSelection = _val id = java.util.UUID.randomUUID().toStringoverride def preStart(): Unit = {println("preStart()调用")//初始化masterPorxymasterPorxy = context.actorSelection(s"akka.tcp://SparkMaster@${masterHost}:${masterPort}/user/${masterName}")println("masterProxy=" + masterPorxy)}override def receive:Receive = {case "start" => {println("worker启动了")//发出一个注册消息masterPorxy ! RegisterWorkerInfo(id, 16, 16 * 1024)}case RegisteredWorkerInfo => {println("workerid= " + id + " 注册成功~")//当注册成功后,就定义一个定时器,每隔一定时间,发送SendHeartBeat给自己import context.dispatcher//说明//1. 0 millis 不延时,立即执行定时器//2. 3000 millis 表示每隔3秒执行一次//3. self:表示发给自己//4. SendHeartBeat 发送的内容context.system.scheduler.schedule(0 millis, 3000 millis, self, SendHeartBeat)}case SendHeartBeat =>{println("worker = " + id + "给master发送心跳")masterPorxy ! HeartBeat(id)}}
}object SparkWorker {def main(args: Array[String]): Unit = {if (args.length != 6) {println("请输入参数 workerHost workerPort workerName masterHost masterPort masterName")sys.exit()}val workerHost = args(0)val workerPort = args(1)val workerName = args(2)val masterHost = args(3)val masterPort = args(4)val masterName = args(5)val config = ConfigFactory.parseString(s"""|akka.actor.provider="akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname=${workerHost}|akka.remote.netty.tcp.port=${workerPort}""".stripMargin)//创建ActorSystemval sparkWorkerSystem = ActorSystem("SparkWorker",config)//创建SparkWorker 的引用/代理val sparkWorkerRef = sparkWorkerSystem.actorOf(Props(new SparkWorker(masterHost, masterPort.toInt,masterName)), s"${workerName}")//启动actorsparkWorkerRef ! "start"}
}
  • 分别定义发送注册信息的RegisterWorkerInfo的样例类,WorkerInfo消息类,定义注册成功的消息样例对象RegisteredWorkerInfo,心跳信息样例类HeartBeat,以及确认发送心跳信息样例对象SendHeartBeat,触发超时work的样例对象StartTimeOutWorker,移除超时worker的样例对象RemoveTimeOutWorker。

代码如下:

// worker注册信息 //MessageProtocol.scala
case class RegisterWorkerInfo(id: String, cpu: Int, ram: Int)// 这个是WorkerInfo, 这个信息将来是保存到master的 hm(该hashmap是用于管理worker)
// 将来这个WorkerInfo会扩展(比如增加worker上一次的心跳时间)
class WorkerInfo(val id: String, val cpu: Int, val ram: Int) {var lastHeartBeat : Long = System.currentTimeMillis()
}// 当worker注册成功,服务器返回一个RegisteredWorkerInfo 对象
case object RegisteredWorkerInfo//worker每隔一定时间由定时器发给自己的一个消息
case object SendHeartBeat
//worker每隔一定时间由定时器触发,而向master发现的协议消息
case class HeartBeat(id: String)//master给自己发送一个触发检查超时worker的信息
case object StartTimeOutWorker
// master给自己发消息,检测worker,对于心跳超时的.
case object RemoveTimeOutWorker

运行效果:
在这里插入图片描述
通过这个案例我们可以深入理解Spark的Master和Worker的通讯机制,为了方便以后对Spark的底层源码的学习,命名的方式和源码保持一致.(如: 通讯消息类命名就是一样的);同时也加深了我们对主从服务心跳检测机制(HeartBeat)的理解,方便以后spark源码二次开发。

相关文章:

使用Akka的Actor模拟Spark的Master和Worker工作机制

使用Akka的Actor模拟Spark的Master和Worker工作机制 Spark的Master和Worker协调工作原理 在 Apache Spark 中,Master 和 Worker 之间通过心跳机制进行通信和保持活动状态。下面是 Master 和 Worker 之间心跳机制的工作流程: Worker 启动后&#xff0c…...

文心一言api接入如何在你的项目里使用文心一言

文心一言api接入在项目里接入文心一言 一、百度文心一言API二、使用步骤1、接口2、请求参数3、请求参数示例4、接口 返回示例 三、 如何获取appKey和uid1、申请appKey:2、获取appKey和uid 四、重要说明 一、百度文心一言API 基于百度文心一言语言大模型的智能文本对话AI机器人…...

Python匿名函数lambda(R与Python第五篇)

目录 一、为什么要引入“lambda函数”? 二、匿名函数的两种用法 参考: 本文来源:《Python全案例学习与实践》(2019年9月出版,电子工业出版社) Python允许使用一种无名的函数,称其为匿名函数…...

【2023校园招聘】 钉钉AI应用开发平台开始校招拉~

【岗位职责】 负责钉钉AI Paas 产品化研发落地,包含但不限于: 1. 用户意图理解、任务规划、服务推荐等算法的设计和开发 2. 基于大模型落地各种落地应用,缩短大模型与真实应用场景的距离 3. 负责算法的工程化落地,包括算法的代…...

Linux系统gdb调试常用命令

GDB&#xff08;GNU调试器&#xff09;是一款常用的调试工具&#xff0c;用于调试C、C等编程语言的程序。以下是一些常用的GDB命令&#xff1a; 1. 启动程序&#xff1a; - gdb <executable>&#xff1a;启动GDB调试器&#xff0c;并加载可执行文件。 2. 设置断点&a…...

Sumo中Traci.trafficlight详解(上)

Sumo中Traci.trafficlight详解&#xff08;上&#xff09; 记录慢慢学习traci的每一天&#xff0c;希望也能帮到你 文章目录 Sumo中Traci.trafficlight详解&#xff08;上&#xff09;Traci.trafficlight信号灯参数讲解1.getAllProgramLogics(self,tlsID)2.getBlockingVehicle…...

手写Mybatis:第13章-通过注解配置执行SQL语句

文章目录 一、目标&#xff1a;注解配置执行SQL二、设计&#xff1a;注解配置执行SQL三、实现&#xff1a;注解配置执行SQL3.1 工程结构3.2 注解配置执行SQL类图3.3 脚本语言驱动器3.3.1 脚本语言驱动器接口3.3.2 XML语言驱动器 3.4 注解配置构建器3.4.1 定义增删改查注解3.4.2…...

spring security - 快速整合 springboot

1.引入依赖<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spr…...

NPM 常用命令(二)

目录 1、npm bugs 1.1 配置 browser registry 2、npm cache 2.1 概要 2.2 详情 2.3 关于缓存设计的说明 2.4 配置 cache 3、 npm ci 3.1 描述 3.2 配置 install-strategy legacy-bundling global-style omit strict-peer-deps foreground-scripts ignore-s…...

ctfhub ssrf(3关)

文章目录 内网访问伪协议读取文件扫描端口 内网访问 根据该题目&#xff0c;是让我们访问127.0.0.1/falg.php&#xff0c;访问给出的链接后用bp抓包&#xff0c;修改URL&#xff0c;发送后得到flag&#xff1a; 伪协议读取文件 这题的让我们用伪协议&#xff0c;而网站的目录…...

跨源资源共享(CORS)Access-Control-Allow-Origin

1、浏览器的同源安全策略 没错&#xff0c;就是这家伙干的&#xff0c;浏览器只允许请求当前域的资源&#xff0c;而对其他域的资源表示不信任。那怎么才算跨域呢&#xff1f; 请求协议http,https的不同域domain的不同端口port的不同 好好好&#xff0c;大概就是这么回事啦&…...

【嵌入式软件开发 】学习笔记

本文主要记录 【嵌入式软件开发】 学习笔记&#xff0c;参考相关大佬的文章 1.RTOS 内功修炼笔记 RTOS内功修炼记&#xff08;一&#xff09;—— 任务到底应该怎么写&#xff1f; RTOS内功修炼记&#xff08;二&#xff09;—— 优先级抢占式调度到底是怎么回事&#xff1f;…...

CentOS 7上安装Python 3.11.5,支持Django

CentOS 7上安装Python 3.11.5,支持Django 今天安装django&#xff0c;报了“Django - deterministicTrue requires SQLite 3.8.3 or higher upon running python manage.py runserver”。查了一番资料&#xff0c;记录下来。 参考链接&#xff1a; 参考链接: Django的web项目…...

COMPFEST 15H「组合数学+容斥」

Problem - H - Codeforces 题意&#xff1a; 定义一个集合S为T的孩子是&#xff0c;对于S中的每一个元素x&#xff0c;在T中都能找到x1。 给定n&#xff0c;k&#xff0c;每一个集合中的元素x必须满足 1 < x < k 1<x<k 1<x<k且 c n t [ x ] < 1 cnt[x…...

react快速开始(三)-create-react-app脚手架项目启动;使用VScode调试react

文章目录 react快速开始(三)-create-react-app脚手架项目启动&#xff1b;使用VScode调试react一、create-react-app脚手架项目启动1. react-scripts2. 关于better-npm-runbetter-npm-run安装 二、使用VScode调试react1. 浏览器插件React Developer Tools2. 【重点】用 VSCode …...

【C++入门】string类常用方法(万字详解)

目录 1.STL简介1.1什么是STL1.2STL的版本1.3STL的六大组件1.4STL的缺陷 2.string类的使用2.1C语言中的字符串2.2标准库中的string类2.3string类的常用接口说明 &#xff08;只讲解最常用的接口&#xff09;2.3.1string类对象的常见构造2.3.2 string类对象的容量操作2.3.3string…...

大数据错误

question1 : Could not locate Hadoop executable: D:\hadoop-3.3.1\bin\winutils.exe - 【已解决】Could not locate executable E:\Hadoop\bin\winutils.exe in the Hadoop binaries._could not locate executable e:\hadoop-3.3.1\bin\wi_君问归期魏有期的博客-CSDN博客 q…...

【Node.js】Express-Generator:快速生成Express应用程序的利器

在Node.js世界中&#xff0c;Express是一个广泛使用的、强大的Web应用程序框架。它为开发者提供了一系列的工具和选项&#xff0c;使得创建高效且可扩展的Web应用程序变得轻而易举。然而&#xff0c;对于初学者来说&#xff0c;配置和初始化Express应用程序可能会有些困难。为了…...

SpringMVC的工作流程及入门

目录 一、概述 ( 1 ) 是什么 ( 2 ) 作用 二、工作流程 ( 1 ) 流程 ( 2 ) 步骤 三、入门实例 ( 1 ) 入门实例 ( 2 ) 静态资源处理 给我们带来的收获 一、概述 ( 1 ) 是什么 SpringMVC是一个基于Java的Web应用开发框架&#xff0c;它是Spring Framework的一部…...

logging.level的含义及设置 【java 日志 (logback、log4j)】

日志级别 trace<debug<info<warn<error<fatal 常用的有&#xff1a;debug&#xff0c;info&#xff0c;warn&#xff0c;error 通常我们想设置日志级别&#xff0c;会用到 logging.level.rootinfo logging.level设置日志级别&#xff0c;后面跟生效的区域。r…...

Objective-C常用命名规范总结

【OC】常用命名规范总结 文章目录 【OC】常用命名规范总结1.类名&#xff08;Class Name)2.协议名&#xff08;Protocol Name)3.方法名&#xff08;Method Name)4.属性名&#xff08;Property Name&#xff09;5.局部变量/实例变量&#xff08;Local / Instance Variables&…...

2025 后端自学UNIAPP【项目实战:旅游项目】6、我的收藏页面

代码框架视图 1、先添加一个获取收藏景点的列表请求 【在文件my_api.js文件中添加】 // 引入公共的请求封装 import http from ./my_http.js// 登录接口&#xff08;适配服务端返回 Token&#xff09; export const login async (code, avatar) > {const res await http…...

CocosCreator 之 JavaScript/TypeScript和Java的相互交互

引擎版本&#xff1a; 3.8.1 语言&#xff1a; JavaScript/TypeScript、C、Java 环境&#xff1a;Window 参考&#xff1a;Java原生反射机制 您好&#xff0c;我是鹤九日&#xff01; 回顾 在上篇文章中&#xff1a;CocosCreator Android项目接入UnityAds 广告SDK。 我们简单讲…...

Neo4j 集群管理:原理、技术与最佳实践深度解析

Neo4j 的集群技术是其企业级高可用性、可扩展性和容错能力的核心。通过深入分析官方文档,本文将系统阐述其集群管理的核心原理、关键技术、实用技巧和行业最佳实践。 Neo4j 的 Causal Clustering 架构提供了一个强大而灵活的基石,用于构建高可用、可扩展且一致的图数据库服务…...

【HTML-16】深入理解HTML中的块元素与行内元素

HTML元素根据其显示特性可以分为两大类&#xff1a;块元素(Block-level Elements)和行内元素(Inline Elements)。理解这两者的区别对于构建良好的网页布局至关重要。本文将全面解析这两种元素的特性、区别以及实际应用场景。 1. 块元素(Block-level Elements) 1.1 基本特性 …...

Java入门学习详细版(一)

大家好&#xff0c;Java 学习是一个系统学习的过程&#xff0c;核心原则就是“理论 实践 坚持”&#xff0c;并且需循序渐进&#xff0c;不可过于着急&#xff0c;本篇文章推出的这份详细入门学习资料将带大家从零基础开始&#xff0c;逐步掌握 Java 的核心概念和编程技能。 …...

Pinocchio 库详解及其在足式机器人上的应用

Pinocchio 库详解及其在足式机器人上的应用 Pinocchio (Pinocchio is not only a nose) 是一个开源的 C 库&#xff0c;专门用于快速计算机器人模型的正向运动学、逆向运动学、雅可比矩阵、动力学和动力学导数。它主要关注效率和准确性&#xff0c;并提供了一个通用的框架&…...

MySQL 部分重点知识篇

一、数据库对象 1. 主键 定义 &#xff1a;主键是用于唯一标识表中每一行记录的字段或字段组合。它具有唯一性和非空性特点。 作用 &#xff1a;确保数据的完整性&#xff0c;便于数据的查询和管理。 示例 &#xff1a;在学生信息表中&#xff0c;学号可以作为主键&#xff…...

苹果AI眼镜:从“工具”到“社交姿态”的范式革命——重新定义AI交互入口的未来机会

在2025年的AI硬件浪潮中,苹果AI眼镜(Apple Glasses)正在引发一场关于“人机交互形态”的深度思考。它并非简单地替代AirPods或Apple Watch,而是开辟了一个全新的、日常可接受的AI入口。其核心价值不在于功能的堆叠,而在于如何通过形态设计打破社交壁垒,成为用户“全天佩戴…...

HTML前端开发:JavaScript 获取元素方法详解

作为前端开发者&#xff0c;高效获取 DOM 元素是必备技能。以下是 JS 中核心的获取元素方法&#xff0c;分为两大系列&#xff1a; 一、getElementBy... 系列 传统方法&#xff0c;直接通过 DOM 接口访问&#xff0c;返回动态集合&#xff08;元素变化会实时更新&#xff09;。…...