当前位置: 首页 > news >正文

Spring Boot集成Akka Cluster快速入门Demo

1.什么是Akka Cluster?

Akka Cluster将多个JVM连接整合在一起,实现消息地址的透明化和统一化使用管理,集成一体化的消息驱动系统。最终目的是将一个大型程序分割成若干子程序,部署到很多JVM上去实现程序的分布式并行运算(单机也可以起很多节点构成集群)。更重要的是, Akka Cluster集群构建与Actor编程没有直接的联系,集群构建是在ActorSystem层面上,实现了Actor消息地址的透明化,无需考虑目标运行环节是否分布式,可以按照正常的Actor编程模式进行开发。 我们知道,分布式集群是由若干节点组成的,那么节点的发现及状态管理是分布式系统一个比较重要的任务。Akka Cluster中将节点的生命周期划分为:

member-states

 

  • joining - 当尝试加入集群时的初始状态
  • up - 加入集群后的正常状态
  • leaving / exiting - 节点退出集群时的中间状态
  • down - 集群无法感知某节点后,将其标记为down
  • removed - 从集群中被删除,以后也无法再加入集群

其实当参数akka.cluster.allow-weakly-up-members启用时(默认是启用的),还有个weakly up,它是用于集群出现分裂时,集群无法收敛,则leader无法将状态置为up的临时状态。这个后面再解释。 图中还有两个特殊的名词:

  • fd* - 这个表示akka的错误检测机制Faiulre Detector被触发后,将节点标记为unreachable
  • unreachable* - unreachable不是一个真正的节点状态,更多的像是一个flag,用来描述集群无法与该节点进行通讯。当错误检测机制侦测到这个节点又能正常通讯时,会移除这个flag。

市面上大多数产品的分布式管理一般用的是注册中心机制,例如zk、consul或etcd。其实是节点把自己的信息注册到所使用的注册中心里,而master通过接受注册中心的通知得知新节点信息。显然本质上是一种master/slave的架构。这种架构有两个问题:

  1. master节点一般是单一的,一旦挂了影响就比较大(所以很多master都采用了HA机制),也就是所谓的系统单点故障;
  2. 通常节点的地址发现是要走master去获取的,当系统并发大时,master节点就可能成为性能瓶颈,即单点性能瓶颈。

Akka可能就是考虑这两点,采用了P2P的模式,这样任何一个节点都可以作为”master”,任何的节点都可以用来寻找其他节点地址。那它是怎么做到的呢?答案是Gossip协议和CRDT。这里不做过多解释,感兴趣的话可以自己去翻阅相关介绍

2.代码工程

实验目的

搭建一个简单akka custer集群

pom.xml

<!-- Akka Cluster dependency -->
<dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-cluster-typed_2.13</artifactId><version>2.6.0</version>
</dependency>

cluster

node1.conf

akka {actor {provider = "cluster"  }remote {artery {canonical.hostname = "127.0.0.1"canonical.port = 2551 }}cluster {seed-nodes = ["akka://ClusterSystem@127.0.0.1:2551","akka://ClusterSystem@127.0.0.1:2552"]}
}

node2.conf

akka {actor {provider = "cluster"}remote {artery {canonical.hostname = "127.0.0.1"canonical.port = 2552  }}cluster {seed-nodes = ["akka://ClusterSystem@127.0.0.1:2551","akka://ClusterSystem@127.0.0.1:2552"]}
}

集群监听器

package com.et.akka.cluster;import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import akka.cluster.typed.Cluster;
import akka.cluster.typed.Subscribe;
import akka.cluster.ClusterEvent;public class ClusterListener extends AbstractBehavior<ClusterEvent.ClusterDomainEvent> {public ClusterListener(ActorContext<ClusterEvent.ClusterDomainEvent> context) {super(context);Cluster cluster = Cluster.get(context.getSystem());cluster.subscriptions().tell(Subscribe.create(getContext().getSelf(), ClusterEvent.ClusterDomainEvent.class));}@Overridepublic Receive<ClusterEvent.ClusterDomainEvent> createReceive() {return newReceiveBuilder().onMessage(ClusterEvent.MemberUp.class, this::onMemberUp).onMessage(ClusterEvent.MemberRemoved.class, this::onMemberRemoved).onAnyMessage(event -> {System.out.println("Received cluster event: " + event);return this;}).build();}private Behavior<ClusterEvent.ClusterDomainEvent> onMemberUp(ClusterEvent.MemberUp memberUp) {System.out.println("Member is Up: " + memberUp.member());return this;}private Behavior<ClusterEvent.ClusterDomainEvent> onMemberRemoved(ClusterEvent.MemberRemoved memberRemoved) {System.out.println("Member is Removed: " + memberRemoved.member());return this;}public static Behavior<ClusterEvent.ClusterDomainEvent> create() {return Behaviors.setup(ClusterListener::new);}
}

启动集群

package com.et.akka.cluster;import akka.actor.typed.ActorSystem;
import akka.cluster.ClusterEvent;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;import java.io.File;public class ClusterApp {public static void main(String[] args) {Config configNode1 = ConfigFactory.parseFile(new File("D:/IdeaProjects/ETFramework/akka/src/main/resources/node1.conf")).withFallback(ConfigFactory.load());ActorSystem<ClusterEvent.ClusterDomainEvent> systemNode1 = ActorSystem.create(ClusterListener.create(), "ClusterSystem", configNode1);System.out.println("Node 1 started with config from node1.conf");Config configNode2 = ConfigFactory.parseFile(new File("D:/IdeaProjects/ETFramework/akka/src/main/resources/node2.conf")).withFallback(ConfigFactory.load());ActorSystem<ClusterEvent.ClusterDomainEvent> systemNode2 = ActorSystem.create(ClusterListener.create(), "ClusterSystem", configNode2);System.out.println("Node 2 started with config from node2.conf");}
}

以上只是一些关键代码,所有代码请参见下面代码仓库

代码仓库

  • GitHub - Harries/springboot-demo: a simple springboot demo with some components for example: redis,solr,rockmq and so on.(akka)

3.测试

启动集群(执行ClusterApp里面的main方法),查看日志可以看到2个节点都起来了

23:00:19.201 [ClusterSystem-akka.actor.default-dispatcher-6] INFO akka.cluster.Cluster - Cluster Node [akka://ClusterSystem@127.0.0.1:2552] - Welcome from [akka://ClusterSystem@127.0.0.1:2551]
Member is Up: Member(address = akka://ClusterSystem@127.0.0.1:2551, status = Up)
Received cluster event: MemberJoined(Member(address = akka://ClusterSystem@127.0.0.1:2552, status = Joining))
Received cluster event: LeaderChanged(Some(akka://ClusterSystem@127.0.0.1:2551))
Received cluster event: RoleLeaderChanged(dc-default,Some(akka://ClusterSystem@127.0.0.1:2551))
Received cluster event: SeenChanged(true,Set(akka://ClusterSystem@127.0.0.1:2551, akka://ClusterSystem@127.0.0.1:2552))
Received cluster event: ReachabilityChanged()
Received cluster event: SeenChanged(true,Set(akka://ClusterSystem@127.0.0.1:2551, akka://ClusterSystem@127.0.0.1:2552))
Received cluster event: ReachabilityChanged()
23:00:19.645 [ClusterSystem-akka.actor.default-dispatcher-5] INFO akka.cluster.Cluster - Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka://ClusterSystem@127.0.0.1:2552] to [Up]
Member is Up: Member(address = akka://ClusterSystem@127.0.0.1:2552, status = Up)
Received cluster event: SeenChanged(false,Set(akka://ClusterSystem@127.0.0.1:2551))
Member is Up: Member(address = akka://ClusterSystem@127.0.0.1:2552, status = Up)
Received cluster event: ReachabilityChanged()
Received cluster event: SeenChanged(true,Set(akka://ClusterSystem@127.0.0.1:2551, akka://ClusterSystem@127.0.0.1:2552))
Received cluster event: ReachabilityChanged()

4.引用

  • Cluster Specification • Akka Documentation

相关文章:

Spring Boot集成Akka Cluster快速入门Demo

1.什么是Akka Cluster&#xff1f; Akka Cluster将多个JVM连接整合在一起&#xff0c;实现消息地址的透明化和统一化使用管理&#xff0c;集成一体化的消息驱动系统。最终目的是将一个大型程序分割成若干子程序&#xff0c;部署到很多JVM上去实现程序的分布式并行运算&#xf…...

django学习入门系列之第十点《A 案例: 员工管理系统10》

文章目录 12 管理员操作12.4 密码加密12.5 获取对象&#xff08;防止id错误--编辑界面等&#xff09;12.6 编辑管理员12.7 重置密码 往期回顾 12 管理员操作 12.4 密码加密 密码不应该以明文的方式直接存储到数据库&#xff0c;应该加密才放进去 定义一个md5的方法&#xff…...

Unity实战案例全解析:PVZ 植物卡片状态分析

Siki学院2023的PVZ免费了&#xff0c;学一下也坏 卡片状态 卡片可以有三种状态&#xff1a; 1.阳光足够&#xff0c;&#xff08;且cd好了可以种植&#xff09; 2.阳光不够&#xff0c;&#xff08;cd&#xff1f;好了&#xff1a;没好 &#xff08;三目运算符&#xff09;&…...

判断变量是否为有限数字(非无穷大或NaN)math.isfinite() 判断变量是否为无穷大(正无穷大或负无穷大)math.isinf()

【小白从小学Python、C、Java】 【考研初试复试毕业设计】 【Python基础AI数据分析】 判断变量是否为有限数字&#xff08;非无穷大或NaN&#xff09; math.isfinite() 判断变量是否为无穷大&#xff08;正无穷大或负无穷大&#xff09; math.isinf() 请问关于以下代码表述错误…...

idea使用阿里云服务器运行jar包

说明&#xff1a;因为我用的阿里云服务器不是自己的&#xff0c;所以一些具体的操作可能不太全面。看到一个很完整的教程&#xff0c;供参考。 0. 打包项目 这里使用的是maven打包。 在pom.xml中添加以下模块。 <build><plugins><plugin><groupId>org…...

解决nginx代理SSE接口的响应没有流式返回

目录 现象原来的nginx配置解决 现象 前后端分离的项目&#xff0c;前端访问被nginx反向代理的后端SSE接口&#xff0c;预期是流式返回&#xff0c;但经常是很久不响应&#xff0c;一响应全部结果一下子都返回了。查看后端项目的日志&#xff0c;响应其实是流式产生的。推测是n…...

11 - TCPClient实验

在上一个章节的UDP通信测试中&#xff0c;尽管通信的实现过程相对简洁&#xff0c;但出现了通信数据丢包的问题。因此&#xff0c;本章节将基于之前建立的WIFI网络连接&#xff0c;构建一个基础的TCPClient连接机制。我们利用网络调试助手工具来发送数据&#xff0c;测试网络通…...

React框架搭建,看这一篇就够了,看完你会感谢我

传统搭建框架的方式 在2024年以前&#xff0c;我们构建框架基本上采用官方脚手架&#xff0c;但是官方脚手架其实大概率都不符合我们的项目要求&#xff0c;搭建完了以后往往需要再继续集成一些第三方的包。这时候又会碰到一些版本冲突&#xff0c;配置教程等&#xff0c;往往…...

【rust】rust条件编译

在c语言中&#xff0c;条件编译是一个非常好用的功能&#xff0c;那么rust中如何实现条件编译呢? rust的条件编译需要两个部分&#xff0c;一个是fratures&#xff0c;另一个是cfg。Cargo feature是一个非常强大的功能&#xff0c;可以提供条件编译和可选依赖项的高级特性&…...

一键文本提示实现图像对象高质量剪切与透明背景生成

按照提示词裁剪 按照边框裁剪 要实现您描述的功能,即通过一个文本提示就能自动从图片中切割出指定的对象并生成一个带有透明背景的新图像,这需要一个结合了先进的计算机视觉技术和自然语言处理能力的系统。这样的系统可以理解输入的文本指令,并将其转化为对图像内容的精确分…...

游戏客服精华回复快捷语大全

以黑神话悟空为代表的国内的游戏行业&#xff0c;最近发展非常迅猛&#xff0c;大量游戏玩家需要足够的游戏客服支持&#xff0c;这里整理了游戏客服精华回复快捷语&#xff0c;涵盖了接待客户&#xff0c;游戏级数&#xff0c;游戏外挂&#xff0c;游戏要求&#xff0c;游戏特…...

国内版Microsoft Teams 基础版部署方案

目录 前言Microsoft Teams简介部署前的准备 环境需求账户和许可网络要求部署步骤 初步配置和设置安装Microsoft Teams客户端Teams管理中心配置用户管理 用户添加与分配角色与权限管理通讯与协作 团队和频道管理即时消息和会议功能文件共享与协作安全性与合规性 数据保护措施合规…...

计算机网络 ---- OSI参考模型TCP/IP模型

目录 一、OSI参考模型 1.1 学习路线 1.2 OSI参考模型和TCP/IP模型 1.3 具体设备与具体层次对应关系 1.3.1 物理层 1.3.2 数据链路层 1.3.3 网络层 1.3.4 传输层 1.3.5 会话层、表示层、应用层 1.4 各层次数据传输单位 二、TCP/IP模型 2.1 学习路线 2.2 TCP/I…...

在Windows环境下部署Java的Web项目集成工具的整体流程和详细步骤

好的&#xff0c;以下是一份关于“Windows环境下部署Java的Web项目集成工具”的手把手操作流程&#xff0c;由浅入深&#xff0c;先整体后分部&#xff1a; 一、引言 在现代软件开发中&#xff0c;Java作为一种广泛应用的编程语言&#xff0c;其Web项目开发尤为常见。为了提高…...

9.18作业

提示并输入一个字符串&#xff0c;统计该字符串中字母、数字、空格、其他字符的个数并输出 代码展示 #include <iostream>using namespace std;int main() {string str;int countc 0; // 字母计数int countn 0; // 数字计数int count 0; // 空格计数int counto 0;…...

【算法】滑动窗口—最小覆盖子串

题目 ”最小覆盖子串“问题&#xff0c;难度为Hard&#xff0c;题目如下&#xff1a; 给你两个字符串 S 和 T&#xff0c;请你在 S 中找到包含 T 中全部字母的最短子串。如果 S 中没有这样一个子串&#xff0c;则算法返回空串&#xff0c;如果存在这样一个子串&#xff0c;则可…...

“Fast-forward“ in git-pull result

当你执行 git pull 并且结果显示 Fast-forward 时&#xff0c;这意味着你的本地分支可以直接快进到远程分支的最新提交&#xff0c;没有任何冲突或者需要合并的内容。具体来说&#xff0c;Fast-forward 是一种合并方式&#xff0c;它的特点是将当前分支的指针直接移动到远程分支…...

Oracle(133)如何创建表空间(Tablespace)?

在Oracle数据库中&#xff0c;表空间&#xff08;Tablespace&#xff09;是存储数据的逻辑单位&#xff0c;它由一个或多个数据文件组成。表空间是数据库数据管理的基本结构&#xff0c;了解如何创建表空间对于数据库管理员至关重要。 创建表空间的基本语法 创建表空间的基本…...

Linux中权限和指令

&#x1f4a5;1、Linux基本指令 1.1 mv 指令 mv指令是move的缩写&#xff0c;用来移动或重命名文件、目录&#xff0c;经常用来备份文件或目录。 mv old_name new_name&#xff1a; 重命名文件或目录mv file /path/to/directory&#xff1a; 移动文件到指定目录 roothcss-ecs…...

本地镜像发布到阿里云

本地镜像发布到阿里云 登录阿里云容器镜像服务配置 Docker 登录阿里云容器镜像服务标记你的 Docker 镜像推送镜像到阿里云验证使用阿里云镜像注意事项 将 Docker 本地镜像发布到阿里云&#xff08;Alibaba Cloud&#xff09;容器镜像服务&#xff08;Container Registry&#x…...

【Linux】shell脚本忽略错误继续执行

在 shell 脚本中&#xff0c;可以使用 set -e 命令来设置脚本在遇到错误时退出执行。如果你希望脚本忽略错误并继续执行&#xff0c;可以在脚本开头添加 set e 命令来取消该设置。 举例1 #!/bin/bash# 取消 set -e 的设置 set e# 执行命令&#xff0c;并忽略错误 rm somefile…...

.Net框架,除了EF还有很多很多......

文章目录 1. 引言2. Dapper2.1 概述与设计原理2.2 核心功能与代码示例基本查询多映射查询存储过程调用 2.3 性能优化原理2.4 适用场景 3. NHibernate3.1 概述与架构设计3.2 映射配置示例Fluent映射XML映射 3.3 查询示例HQL查询Criteria APILINQ提供程序 3.4 高级特性3.5 适用场…...

服务器硬防的应用场景都有哪些?

服务器硬防是指一种通过硬件设备层面的安全措施来防御服务器系统受到网络攻击的方式&#xff0c;避免服务器受到各种恶意攻击和网络威胁&#xff0c;那么&#xff0c;服务器硬防通常都会应用在哪些场景当中呢&#xff1f; 硬防服务器中一般会配备入侵检测系统和预防系统&#x…...

智能仓储的未来:自动化、AI与数据分析如何重塑物流中心

当仓库学会“思考”&#xff0c;物流的终极形态正在诞生 想象这样的场景&#xff1a; 凌晨3点&#xff0c;某物流中心灯火通明却空无一人。AGV机器人集群根据实时订单动态规划路径&#xff1b;AI视觉系统在0.1秒内扫描包裹信息&#xff1b;数字孪生平台正模拟次日峰值流量压力…...

【学习笔记】erase 删除顺序迭代器后迭代器失效的解决方案

目录 使用 erase 返回值继续迭代使用索引进行遍历 我们知道类似 vector 的顺序迭代器被删除后&#xff0c;迭代器会失效&#xff0c;因为顺序迭代器在内存中是连续存储的&#xff0c;元素删除后&#xff0c;后续元素会前移。 但一些场景中&#xff0c;我们又需要在执行删除操作…...

数学建模-滑翔伞伞翼面积的设计,运动状态计算和优化 !

我们考虑滑翔伞的伞翼面积设计问题以及运动状态描述。滑翔伞的性能主要取决于伞翼面积、气动特性以及飞行员的重量。我们的目标是建立数学模型来描述滑翔伞的运动状态,并优化伞翼面积的设计。 一、问题分析 滑翔伞在飞行过程中受到重力、升力和阻力的作用。升力和阻力与伞翼面…...

uniapp 集成腾讯云 IM 富媒体消息(地理位置/文件)

UniApp 集成腾讯云 IM 富媒体消息全攻略&#xff08;地理位置/文件&#xff09; 一、功能实现原理 腾讯云 IM 通过 消息扩展机制 支持富媒体类型&#xff0c;核心实现方式&#xff1a; 标准消息类型&#xff1a;直接使用 SDK 内置类型&#xff08;文件、图片等&#xff09;自…...

c# 局部函数 定义、功能与示例

C# 局部函数&#xff1a;定义、功能与示例 1. 定义与功能 局部函数&#xff08;Local Function&#xff09;是嵌套在另一个方法内部的私有方法&#xff0c;仅在包含它的方法内可见。 • 作用&#xff1a;封装仅用于当前方法的逻辑&#xff0c;避免污染类作用域&#xff0c;提升…...

ubuntu22.04 安装docker 和docker-compose

首先你要确保没有docker环境或者使用命令删掉docker sudo apt-get remove docker docker-engine docker.io containerd runc安装docker 更新软件环境 sudo apt update sudo apt upgrade下载docker依赖和GPG 密钥 # 依赖 apt-get install ca-certificates curl gnupg lsb-rel…...

Neko虚拟浏览器远程协作方案:Docker+内网穿透技术部署实践

前言&#xff1a;本文将向开发者介绍一款创新性协作工具——Neko虚拟浏览器。在数字化协作场景中&#xff0c;跨地域的团队常需面对实时共享屏幕、协同编辑文档等需求。通过本指南&#xff0c;你将掌握在Ubuntu系统中使用容器化技术部署该工具的具体方案&#xff0c;并结合内网…...