Apache Curator 分布式锁的介绍,以及案例
可重入锁(InterProcessMutex):这种锁允许同一个客户端多次获取同一把锁而不会被阻塞,类似于Java中的ReentrantLock。它通过在Zookeeper的指定路径下创建临时序列节点来实现锁的功能。如果获取锁失败,当前线程会监听前一个节点的变动情况并等待,直到被唤醒或超时
package com.zz.lock;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;public class CuratorReentrantLockExample {private final String lockPath = "/curator/lock"; // 锁的Zookeeper路径private CuratorFramework client; // Curator客户端private InterProcessMutex mutex; // 可重入锁// 初始化Curator客户端和可重入锁public void init() {// 设置Zookeeper服务地址String connectString = "192.168.200.130:2181";// 设置重试策略ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(3000, 3);// 创建Curator客户端client = CuratorFrameworkFactory.newClient(connectString, retryPolicy);client.start();// 创建可重入锁mutex = new InterProcessMutex(client, lockPath);}// 执行业务逻辑,使用可重入锁public void executeBusinessLogic() {try {// 获取锁mutex.acquire();// 模拟业务逻辑System.out.println("当前线程获得锁,开始执行业务逻辑。");// 模拟重入逻辑reentrantLock();// 模拟业务逻辑System.out.println("当前线程完成业务逻辑执行。");} catch (Exception e) {e.printStackTrace();} finally {// 确保释放锁if (mutex.isAcquiredInThisProcess()) {try {mutex.release();} catch (Exception e) {e.printStackTrace();}}}}// 模拟可重入逻辑public void reentrantLock() {try {// 再次获取同一把锁mutex.acquire();System.out.println("当前线程重入成功,再次获得同一把锁。");// 模拟一些操作...} catch (Exception e) {e.printStackTrace();} finally {// 释放锁if (mutex.isAcquiredInThisProcess()) {try {mutex.release();} catch (Exception e) {e.printStackTrace();}}}}// 程序入口public static void main(String[] args) {CuratorReentrantLockExample example = new CuratorReentrantLockExample();example.init();// 执行业务逻辑example.executeBusinessLogic();}
}
不可重入锁(InterProcessSemaphoreMutex):与可重入锁类似,但不允许同一个线程在持有锁的情况下再次获取该锁。这种锁很容易导致死锁,使用时需要特别注意
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;public class CuratorReentrantLockExample {private final String lockPath = "/curator/lock"; // 锁的Zookeeper路径private CuratorFramework client; // Curator客户端private InterProcessMutex mutex; // 可重入锁// 初始化Curator客户端和可重入锁public void init() {// 设置Zookeeper服务地址String connectString = "127.0.0.1:2181";// 设置重试策略ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(3000, 3);// 创建Curator客户端client = CuratorFrameworkFactory.newClient(connectString, retryPolicy);client.start();// 创建可重入锁mutex = new InterProcessMutex(client, lockPath);}// 执行业务逻辑,使用可重入锁public void executeBusinessLogic() {try {// 获取锁mutex.acquire();// 模拟业务逻辑System.out.println("当前线程获得锁,开始执行业务逻辑。");// 模拟重入逻辑reentrantLock();// 模拟业务逻辑System.out.println("当前线程完成业务逻辑执行。");} catch (Exception e) {e.printStackTrace();} finally {// 确保释放锁if (mutex.isAcquiredInThisProcess()) {try {mutex.release();} catch (Exception e) {e.printStackTrace();}}}}// 模拟可重入逻辑public void reentrantLock() {try {// 再次获取同一把锁mutex.acquire();System.out.println("当前线程重入成功,再次获得同一把锁。");// 模拟一些操作...} catch (Exception e) {e.printStackTrace();} finally {// 释放锁if (mutex.isAcquiredInThisProcess()) {try {mutex.release();} catch (Exception e) {e.printStackTrace();}}}}// 程序入口public static void main(String[] args) {CuratorReentrantLockExample example = new CuratorReentrantLockExample();example.init();// 执行业务逻辑example.executeBusinessLogic();}
}
读写锁(InterProcessReadWriteLock):提供一对相关的锁,读锁可以被多个读操作共享,而写锁则独占。一个拥有写锁的线程可以获取读锁,但读锁不能升级为写锁。这种锁是公平的,保证用户按请求顺序获取锁。(读写锁在逻辑上有点像数据库的事务)
package com.zz.lock;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;public class CuratorReadWriteLockExample {private final String lockPath = "/curator/read-write-lock"; // 锁的Zookeeper路径private CuratorFramework client; // Curator客户端private InterProcessReadWriteLock lock; // 读写锁// 初始化Curator客户端和读写锁public void init() {// 设置Zookeeper服务地址String connectString = "192.168.200.130:2181";// 设置重试策略ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(3000, 3);// 创建Curator客户端client = CuratorFrameworkFactory.newClient(connectString, retryPolicy);client.start();// 创建读写锁lock = new InterProcessReadWriteLock(client, lockPath);}// 执行读操作public void executeReadOperation() throws Exception {try {// 获取读锁lock.readLock().acquire();// 模拟读操作System.out.println("读操作开始,线程安全地读取数据。");// 模拟读操作延迟Thread.sleep(3000);System.out.println("读操作结束。");} catch (Exception e) {e.printStackTrace();} finally {// 释放读锁if (lock.readLock().isAcquiredInThisProcess()) {lock.readLock().release();}}}// 执行写操作public void executeWriteOperation() throws Exception {try {// 获取写锁lock.writeLock().acquire();// 模拟写操作System.out.println("写操作开始,线程独占资源进行写入。");// 模拟写操作延迟Thread.sleep(3000);System.out.println("写操作结束,更新了数据。");} catch (Exception e) {e.printStackTrace();} finally {// 释放写锁if (lock.writeLock().isAcquiredInThisProcess()) {lock.writeLock().release();}}}// 程序入口public static void main(String[] args) {CuratorReadWriteLockExample example = new CuratorReadWriteLockExample();example.init();// 启动多个读操作线程for (int i = 0; i < 5; i++) {new Thread(() -> {try {example.executeReadOperation();} catch (Exception e) {e.printStackTrace();}}).start();}// 启动写操作线程new Thread(() -> {try {example.executeWriteOperation();} catch (Exception e) {e.printStackTrace();}}).start();}
}
联锁(InterProcessMultiLock):这是一个锁的容器,可以同时获取多个锁。如果获取过程中任何一个锁请求失败,已获取的所有锁都会被释放。这在需要同时持有多个锁执行操作的场景中非常有用
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMultiLock;
import java.util.Arrays;
import java.util.List;public class InterProcessMultiLockExample {private CuratorFramework client;private List<String> lockPaths = Arrays.asList("/lock1", "/lock2", "/lock3");private List<InterProcessLock> locks = lockPaths.stream().map(path -> new InterProcessMutex(client, path)).collect(Collectors.toList());private InterProcessMultiLock multiLock;public void init() {String connectString = "127.0.0.1:2181";RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 3);client = CuratorFrameworkFactory.newClient(connectString, retryPolicy);client.start();multiLock = new InterProcessMultiLock(locks);}public void executeProtectedOperation() {try {multiLock.acquire();// 所有锁都已获取,执行你的业务逻辑System.out.println("All locks acquired, performing business logic.");// 业务逻辑...} catch (Exception e) {e.printStackTrace();} finally {// 确保释放所有锁multiLock.release();}}public static void main(String[] args) {InterProcessMultiLockExample example = new InterProcessMultiLockExample();example.init();example.executeProtectedOperation();}
}
信号量(InterProcessSemaphoreV2):Curator提供了一种信号量实现,可以控制同时访问某个资源的线程数量。通过acquire方法请求获取信号量,使用完成后通过returnAll方法释放
信号量(Semaphore)确实可以起到限流的作用。在分布式系统中,信号量是一种常用的限流工具,它通过控制同时访问某个资源或执行某个操作的线程数量来实现限流。以下是信号量实现限流的几个关键点:1. **资源限制**:信号量通过一个计数器来限制可用资源的数量。例如,如果你有10个停车位,你可以设置信号量的初始值为10。2. **请求处理**:当一个线程需要访问资源时,它首先尝试从信号量中获取一个“许可”(lease)。如果信号量的计数器大于0,该线程成功获取一个许可,然后继续执行。否则,线程将被阻塞,直到其他线程释放资源。3. **释放资源**:线程完成资源访问后,必须释放它获取的许可,通过将许可返还给信号量来实现。这会将信号量的计数器增加1,允许其他等待的线程获取许可。4. **公平性**:信号量通常是公平的,意味着线程将按照它们请求许可的顺序来获得它们。这有助于避免某些线程长时间等待访问资源。5. **跨JVM共享**:在分布式系统中,不同的进程可能在不同的JVM中运行。Apache Curator 提供的 `InterProcessSemaphoreV2` 允许跨JVM共享信号量状态,因此所有相关进程都能协调地访问共享资源。6. **自动资源回收**:如果持有信号量许可的线程或进程崩溃,Curator 会自动释放该许可,确保资源不会被永久占用,其他线程可以继续获取该资源。通过这种方式,信号量可以有效地控制对共享资源的并发访问,防止系统过载,从而实现限流。这在许多场景下都非常有用,比如数据库连接池、线程池、外部服务调用等。
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2;
import org.apache.curator.framework.recipes.locks.Lease;
import org.apache.curator.retry.ExponentialBackoffRetry;public class InterProcessSemaphoreV2Demo {private static final String PATH = "/semaphore/path";private static CuratorFramework client;public static void main(String[] args) throws Exception {// 初始化Curator客户端client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(3000, 2));client.start();// 创建InterProcessSemaphoreV2实例,设置最大租约数为5InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, 5);// 线程示例,模拟同时请求信号量的多个线程for (int i = 0; i < 10; i++) {new Thread(new SemaphoreTask(semaphore)).start();}// 等待一段时间,让线程执行Thread.sleep(10000);// 关闭客户端连接client.close();}static class SemaphoreTask implements Runnable {private final InterProcessSemaphoreV2 semaphore;public SemaphoreTask(InterProcessSemaphoreV2 semaphore) {this.semaphore = semaphore;}@Overridepublic void run() {try {Lease lease = semaphore.acquire();System.out.println(Thread.currentThread().getName() + " acquired a lease.");// 模拟业务逻辑处理Thread.sleep(3000);// 释放信号量租约semaphore.returnLease(lease);System.out.println(Thread.currentThread().getName() + " returned a lease.");} catch (Exception e) {e.printStackTrace();}}}
}
分布式锁的实现原理:Curator的分布式锁通常是基于Zookeeper的临时顺序节点来实现的。当多个客户端尝试获取锁时,Zookeeper会为它们创建顺序节点,并让它们按照节点的序号依次尝试获取锁。未获取到锁的客户端会监听前一个序号的节点,一旦前一个节点释放锁,监听的客户端就会尝试获取锁
相关文章:
Apache Curator 分布式锁的介绍,以及案例
可重入锁(InterProcessMutex):这种锁允许同一个客户端多次获取同一把锁而不会被阻塞,类似于Java中的ReentrantLock。它通过在Zookeeper的指定路径下创建临时序列节点来实现锁的功能。如果获取锁失败,当前线程会监听前一…...
自动化测试 — selenium + Java
什么是自动化测试 将人为驱动的测试行为转化为机器执行的过程。 自动化测试包括UI 自动化,接口自动化,单元测试自动化。按照这个金字塔模型来进行自动化测试规划,可以产生最佳的自贡话测试产出投入比(ROI ),…...
【SpringBoot系列】接口参数的默认值与必要性
💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…...
茶余饭后(五)
真正出类拔萃的人 往往都是狠角色, 他们具备着一种独特的特质 那就是: 目标清晰 意志如铁 底线分明 同时手段又极为高明 且勤奋不屑 在处于劣势时 他们表现的极为谦逊和低调 像一只温顺无害的小羊羔 然而一旦时机成熟 他们便会毫不犹豫的展现出强…...
【网络编程详解】
🌈个人主页:努力学编程’ ⛅个人推荐: c语言从初阶到进阶 JavaEE详解 数据结构 ⚡学好数据结构,刷题刻不容缓:点击一起刷题 🌙心灵鸡汤:总有人要赢,为什么不能是我呢 🔥…...
C# winform三层架构 实现增删改查( 显示数据,查询数据 显示,查询篇)
一.留言 上一篇讲解了如何去添加数据,那么本章节我们来做,添加数据后显示,以及咋现有的数据里,查询我们所需要的数据。 二.显示 首先我们看上一篇更新,我们在添加成功后跳转页面显示数据,那么跳转代码只…...
Apache Kylin 系列入门教程
Apache Kylin 是一款开源的分布式分析引擎,主要用于提供SQL接口及多维分析(OLAP)能力以支持超大规模数据集。它能在亚秒级时间内完成PB级别的数据查询。本文将带你一步步了解如何安装、配置和使用Apache Kylin来构建数据仓库,并执…...
如何识别并防御漏洞扫描类攻击
随着网络安全威胁的不断演变,漏洞扫描已成为黑客常用的手段之一,旨在发现目标系统中的弱点以便进行后续攻击。高防服务作为一种专业的安全防护措施,能够在一定程度上识别并阻止这类攻击行为。本文将深入探讨高防服务是如何识别并防御漏洞扫描…...
冷思考:低代码的AI Agent构建平台能创造价值吗?
当前AI 圈中热点讨论的产品,除了以ChatGPT为代表的Chatbot领域,以及以Character.ai 为代表的AI虚拟社交领域,另一个热度较高的领域就是AI Agent领域。 大模型发展到今天,已经基本达成了一个共识:错综复杂的工作任务无…...
Spring Boot如何自定义注解?
1.什么是注解 注解(Annotation),也叫元数据。一种代码级别的说明。它是JDK1.5及以后版本引入的一个特性,与类、接口、枚举是在同一个层次。它可以声明在包、类、字段、方法、局部变量、方法参数等的前面,用来对这些元…...
gin框架传入的gin.context参数是池化的
1. gin.context参数不但是池化的,而且是指针 2. 但是gin.context又实现了context的接口。因此,可以当作context去使用 3. 这就会导致一个很严重的问题: 1. 池化导致了复用后的ctx将会将之前使用的ctx中的内容进行覆盖。 2. 实现了context接…...
AWS注册是否必须使用美元银行卡
亚马逊网络服务(AWS)作为全球领先的云计算平台,吸引了众多企业和个人用户。然而,不少人在注册AWS账户时会产生疑问:是否必须使用美元银行卡?实际上,这种说法并不准确。虽然AWS的主要结算货币是美元,但用户在注册和使用过程中有多种支付方式可供选择。我们结合九河云的分析来告…...
Spring IOC 注入的3种方式
Spring IOC 注入的3种方式 1. 构造器注入(Constructor Injection)2. Setter方法注入(Setter Injection)3. 字段注入(Field Injection) 💖The Begin💖点点关注,收藏不迷路…...
无人机影像基于机器学习的遥感反演及其结果可视化,定量遥感反演结果出图,相关性分析,指标筛选,特征选择
无人机影像或者卫星遥感反演分类模型的建立,反演模型的可视化制图出图,相关性分析,指标筛选,特征选择。代码太多,可企鹅联系: 指标的相关性分析。572 特征选择,贡献性最大的特征。412 LAI反演&…...
Eclipse插件之Java Dependency Viewer(显示类和包的关系图)
Java Dependency Viewer 插件的作用 Eclipse插件Java Dependency Viewer是一个为Java项目提供依赖关系可视化功能的工具。 在复杂的Java项目中,理解和分析类与类之间、包与包之间的依赖关系是非常有用的。Java Dependency Viewer插件通过生成依赖关系图,…...
H5小游戏出海,如何流量变现?
根据数据显示, 90%的轻度休闲游戏收入来自广告,即IAA(In-App Advertising)。使用这种形式进行变现的游戏类型大多以超休闲游戏为主,玩法简单、游戏内容轻度、风格简洁、游戏时间碎片化且即时娱乐性较高,收益…...
轻空间六大专利优势:引领气膜建筑新时代
在绿色建筑和科技创新的驱动下,轻空间不断突破传统建筑的限制,推出了一系列具有前瞻性和高性能的专利技术。通过这些技术,轻空间不仅为建筑行业注入了新动力,也为未来的气膜建筑设定了更高的标准。 低碳建材:“clearsk…...
LeetCode-day37-2940. 找到 Alice 和 Bob 可以相遇的建筑
LeetCode-day37-2940. 找到 Alice 和 Bob 可以相遇的建筑 题目描述示例示例1:示例2: 思路代码 题目描述 给你一个下标从 0 开始的正整数数组 heights ,其中 heights[i] 表示第 i 栋建筑的高度。 如果一个人在建筑 i ,且存在 i &…...
unity 判断平台
原文链接 Unity中判断平台的方法 Unity提供了一些方法来判断当前运行的平台,其中包括了判断是否为i0S平台。以下是几种常用的方法1.Application.platform Applicaion,platom 是Unity中的一个枚举类型,用于表示当前运行的平台。可以通过比较 Apication,p…...
PyCharm找不到Python了咋办
Python发生了重装的,且新的路径和原有路径不同,就会出现如下的错误: 解决办法: 点开PyCharm菜单的File/Setting 然后: 有上图的提示,说明需要将原来的venv进行清空。 如此操作之后,原来的红色…...
智慧医疗能源事业线深度画像分析(上)
引言 医疗行业作为现代社会的关键基础设施,其能源消耗与环境影响正日益受到关注。随着全球"双碳"目标的推进和可持续发展理念的深入,智慧医疗能源事业线应运而生,致力于通过创新技术与管理方案,重构医疗领域的能源使用模式。这一事业线融合了能源管理、可持续发…...
盘古信息PCB行业解决方案:以全域场景重构,激活智造新未来
一、破局:PCB行业的时代之问 在数字经济蓬勃发展的浪潮中,PCB(印制电路板)作为 “电子产品之母”,其重要性愈发凸显。随着 5G、人工智能等新兴技术的加速渗透,PCB行业面临着前所未有的挑战与机遇。产品迭代…...
2025年能源电力系统与流体力学国际会议 (EPSFD 2025)
2025年能源电力系统与流体力学国际会议(EPSFD 2025)将于本年度在美丽的杭州盛大召开。作为全球能源、电力系统以及流体力学领域的顶级盛会,EPSFD 2025旨在为来自世界各地的科学家、工程师和研究人员提供一个展示最新研究成果、分享实践经验及…...
镜像里切换为普通用户
如果你登录远程虚拟机默认就是 root 用户,但你不希望用 root 权限运行 ns-3(这是对的,ns3 工具会拒绝 root),你可以按以下方法创建一个 非 root 用户账号 并切换到它运行 ns-3。 一次性解决方案:创建非 roo…...
python爬虫:Newspaper3k 的详细使用(好用的新闻网站文章抓取和解析的Python库)
更多内容请见: 爬虫和逆向教程-专栏介绍和目录 文章目录 一、Newspaper3k 概述1.1 Newspaper3k 介绍1.2 主要功能1.3 典型应用场景1.4 安装二、基本用法2.2 提取单篇文章的内容2.2 处理多篇文档三、高级选项3.1 自定义配置3.2 分析文章情感四、实战案例4.1 构建新闻摘要聚合器…...
【决胜公务员考试】求职OMG——见面课测验1
2025最新版!!!6.8截至答题,大家注意呀! 博主码字不易点个关注吧,祝期末顺利~~ 1.单选题(2分) 下列说法错误的是:( B ) A.选调生属于公务员系统 B.公务员属于事业编 C.选调生有基层锻炼的要求 D…...
Ascend NPU上适配Step-Audio模型
1 概述 1.1 简述 Step-Audio 是业界首个集语音理解与生成控制一体化的产品级开源实时语音对话系统,支持多语言对话(如 中文,英文,日语),语音情感(如 开心,悲伤)&#x…...
大数据学习(132)-HIve数据分析
🍋🍋大数据学习🍋🍋 🔥系列专栏: 👑哲学语录: 用力所能及,改变世界。 💖如果觉得博主的文章还不错的话,请点赞👍收藏⭐️留言Ǵ…...
Java多线程实现之Thread类深度解析
Java多线程实现之Thread类深度解析 一、多线程基础概念1.1 什么是线程1.2 多线程的优势1.3 Java多线程模型 二、Thread类的基本结构与构造函数2.1 Thread类的继承关系2.2 构造函数 三、创建和启动线程3.1 继承Thread类创建线程3.2 实现Runnable接口创建线程 四、Thread类的核心…...
JS设计模式(4):观察者模式
JS设计模式(4):观察者模式 一、引入 在开发中,我们经常会遇到这样的场景:一个对象的状态变化需要自动通知其他对象,比如: 电商平台中,商品库存变化时需要通知所有订阅该商品的用户;新闻网站中࿰…...
