Apache Curator 分布式锁的介绍,以及案例
可重入锁(InterProcessMutex):这种锁允许同一个客户端多次获取同一把锁而不会被阻塞,类似于Java中的ReentrantLock
。它通过在Zookeeper的指定路径下创建临时序列节点来实现锁的功能。如果获取锁失败,当前线程会监听前一个节点的变动情况并等待,直到被唤醒或超时
package com.zz.lock;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;public class CuratorReentrantLockExample {private final String lockPath = "/curator/lock"; // 锁的Zookeeper路径private CuratorFramework client; // Curator客户端private InterProcessMutex mutex; // 可重入锁// 初始化Curator客户端和可重入锁public void init() {// 设置Zookeeper服务地址String connectString = "192.168.200.130:2181";// 设置重试策略ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(3000, 3);// 创建Curator客户端client = CuratorFrameworkFactory.newClient(connectString, retryPolicy);client.start();// 创建可重入锁mutex = new InterProcessMutex(client, lockPath);}// 执行业务逻辑,使用可重入锁public void executeBusinessLogic() {try {// 获取锁mutex.acquire();// 模拟业务逻辑System.out.println("当前线程获得锁,开始执行业务逻辑。");// 模拟重入逻辑reentrantLock();// 模拟业务逻辑System.out.println("当前线程完成业务逻辑执行。");} catch (Exception e) {e.printStackTrace();} finally {// 确保释放锁if (mutex.isAcquiredInThisProcess()) {try {mutex.release();} catch (Exception e) {e.printStackTrace();}}}}// 模拟可重入逻辑public void reentrantLock() {try {// 再次获取同一把锁mutex.acquire();System.out.println("当前线程重入成功,再次获得同一把锁。");// 模拟一些操作...} catch (Exception e) {e.printStackTrace();} finally {// 释放锁if (mutex.isAcquiredInThisProcess()) {try {mutex.release();} catch (Exception e) {e.printStackTrace();}}}}// 程序入口public static void main(String[] args) {CuratorReentrantLockExample example = new CuratorReentrantLockExample();example.init();// 执行业务逻辑example.executeBusinessLogic();}
}
不可重入锁(InterProcessSemaphoreMutex):与可重入锁类似,但不允许同一个线程在持有锁的情况下再次获取该锁。这种锁很容易导致死锁,使用时需要特别注意
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;public class CuratorReentrantLockExample {private final String lockPath = "/curator/lock"; // 锁的Zookeeper路径private CuratorFramework client; // Curator客户端private InterProcessMutex mutex; // 可重入锁// 初始化Curator客户端和可重入锁public void init() {// 设置Zookeeper服务地址String connectString = "127.0.0.1:2181";// 设置重试策略ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(3000, 3);// 创建Curator客户端client = CuratorFrameworkFactory.newClient(connectString, retryPolicy);client.start();// 创建可重入锁mutex = new InterProcessMutex(client, lockPath);}// 执行业务逻辑,使用可重入锁public void executeBusinessLogic() {try {// 获取锁mutex.acquire();// 模拟业务逻辑System.out.println("当前线程获得锁,开始执行业务逻辑。");// 模拟重入逻辑reentrantLock();// 模拟业务逻辑System.out.println("当前线程完成业务逻辑执行。");} catch (Exception e) {e.printStackTrace();} finally {// 确保释放锁if (mutex.isAcquiredInThisProcess()) {try {mutex.release();} catch (Exception e) {e.printStackTrace();}}}}// 模拟可重入逻辑public void reentrantLock() {try {// 再次获取同一把锁mutex.acquire();System.out.println("当前线程重入成功,再次获得同一把锁。");// 模拟一些操作...} catch (Exception e) {e.printStackTrace();} finally {// 释放锁if (mutex.isAcquiredInThisProcess()) {try {mutex.release();} catch (Exception e) {e.printStackTrace();}}}}// 程序入口public static void main(String[] args) {CuratorReentrantLockExample example = new CuratorReentrantLockExample();example.init();// 执行业务逻辑example.executeBusinessLogic();}
}
读写锁(InterProcessReadWriteLock):提供一对相关的锁,读锁可以被多个读操作共享,而写锁则独占。一个拥有写锁的线程可以获取读锁,但读锁不能升级为写锁。这种锁是公平的,保证用户按请求顺序获取锁。(读写锁在逻辑上有点像数据库的事务)
package com.zz.lock;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;public class CuratorReadWriteLockExample {private final String lockPath = "/curator/read-write-lock"; // 锁的Zookeeper路径private CuratorFramework client; // Curator客户端private InterProcessReadWriteLock lock; // 读写锁// 初始化Curator客户端和读写锁public void init() {// 设置Zookeeper服务地址String connectString = "192.168.200.130:2181";// 设置重试策略ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(3000, 3);// 创建Curator客户端client = CuratorFrameworkFactory.newClient(connectString, retryPolicy);client.start();// 创建读写锁lock = new InterProcessReadWriteLock(client, lockPath);}// 执行读操作public void executeReadOperation() throws Exception {try {// 获取读锁lock.readLock().acquire();// 模拟读操作System.out.println("读操作开始,线程安全地读取数据。");// 模拟读操作延迟Thread.sleep(3000);System.out.println("读操作结束。");} catch (Exception e) {e.printStackTrace();} finally {// 释放读锁if (lock.readLock().isAcquiredInThisProcess()) {lock.readLock().release();}}}// 执行写操作public void executeWriteOperation() throws Exception {try {// 获取写锁lock.writeLock().acquire();// 模拟写操作System.out.println("写操作开始,线程独占资源进行写入。");// 模拟写操作延迟Thread.sleep(3000);System.out.println("写操作结束,更新了数据。");} catch (Exception e) {e.printStackTrace();} finally {// 释放写锁if (lock.writeLock().isAcquiredInThisProcess()) {lock.writeLock().release();}}}// 程序入口public static void main(String[] args) {CuratorReadWriteLockExample example = new CuratorReadWriteLockExample();example.init();// 启动多个读操作线程for (int i = 0; i < 5; i++) {new Thread(() -> {try {example.executeReadOperation();} catch (Exception e) {e.printStackTrace();}}).start();}// 启动写操作线程new Thread(() -> {try {example.executeWriteOperation();} catch (Exception e) {e.printStackTrace();}}).start();}
}
联锁(InterProcessMultiLock):这是一个锁的容器,可以同时获取多个锁。如果获取过程中任何一个锁请求失败,已获取的所有锁都会被释放。这在需要同时持有多个锁执行操作的场景中非常有用
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMultiLock;
import java.util.Arrays;
import java.util.List;public class InterProcessMultiLockExample {private CuratorFramework client;private List<String> lockPaths = Arrays.asList("/lock1", "/lock2", "/lock3");private List<InterProcessLock> locks = lockPaths.stream().map(path -> new InterProcessMutex(client, path)).collect(Collectors.toList());private InterProcessMultiLock multiLock;public void init() {String connectString = "127.0.0.1:2181";RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 3);client = CuratorFrameworkFactory.newClient(connectString, retryPolicy);client.start();multiLock = new InterProcessMultiLock(locks);}public void executeProtectedOperation() {try {multiLock.acquire();// 所有锁都已获取,执行你的业务逻辑System.out.println("All locks acquired, performing business logic.");// 业务逻辑...} catch (Exception e) {e.printStackTrace();} finally {// 确保释放所有锁multiLock.release();}}public static void main(String[] args) {InterProcessMultiLockExample example = new InterProcessMultiLockExample();example.init();example.executeProtectedOperation();}
}
信号量(InterProcessSemaphoreV2):Curator提供了一种信号量实现,可以控制同时访问某个资源的线程数量。通过acquire
方法请求获取信号量,使用完成后通过returnAll
方法释放
信号量(Semaphore)确实可以起到限流的作用。在分布式系统中,信号量是一种常用的限流工具,它通过控制同时访问某个资源或执行某个操作的线程数量来实现限流。以下是信号量实现限流的几个关键点:1. **资源限制**:信号量通过一个计数器来限制可用资源的数量。例如,如果你有10个停车位,你可以设置信号量的初始值为10。2. **请求处理**:当一个线程需要访问资源时,它首先尝试从信号量中获取一个“许可”(lease)。如果信号量的计数器大于0,该线程成功获取一个许可,然后继续执行。否则,线程将被阻塞,直到其他线程释放资源。3. **释放资源**:线程完成资源访问后,必须释放它获取的许可,通过将许可返还给信号量来实现。这会将信号量的计数器增加1,允许其他等待的线程获取许可。4. **公平性**:信号量通常是公平的,意味着线程将按照它们请求许可的顺序来获得它们。这有助于避免某些线程长时间等待访问资源。5. **跨JVM共享**:在分布式系统中,不同的进程可能在不同的JVM中运行。Apache Curator 提供的 `InterProcessSemaphoreV2` 允许跨JVM共享信号量状态,因此所有相关进程都能协调地访问共享资源。6. **自动资源回收**:如果持有信号量许可的线程或进程崩溃,Curator 会自动释放该许可,确保资源不会被永久占用,其他线程可以继续获取该资源。通过这种方式,信号量可以有效地控制对共享资源的并发访问,防止系统过载,从而实现限流。这在许多场景下都非常有用,比如数据库连接池、线程池、外部服务调用等。
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2;
import org.apache.curator.framework.recipes.locks.Lease;
import org.apache.curator.retry.ExponentialBackoffRetry;public class InterProcessSemaphoreV2Demo {private static final String PATH = "/semaphore/path";private static CuratorFramework client;public static void main(String[] args) throws Exception {// 初始化Curator客户端client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(3000, 2));client.start();// 创建InterProcessSemaphoreV2实例,设置最大租约数为5InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, 5);// 线程示例,模拟同时请求信号量的多个线程for (int i = 0; i < 10; i++) {new Thread(new SemaphoreTask(semaphore)).start();}// 等待一段时间,让线程执行Thread.sleep(10000);// 关闭客户端连接client.close();}static class SemaphoreTask implements Runnable {private final InterProcessSemaphoreV2 semaphore;public SemaphoreTask(InterProcessSemaphoreV2 semaphore) {this.semaphore = semaphore;}@Overridepublic void run() {try {Lease lease = semaphore.acquire();System.out.println(Thread.currentThread().getName() + " acquired a lease.");// 模拟业务逻辑处理Thread.sleep(3000);// 释放信号量租约semaphore.returnLease(lease);System.out.println(Thread.currentThread().getName() + " returned a lease.");} catch (Exception e) {e.printStackTrace();}}}
}
分布式锁的实现原理:Curator的分布式锁通常是基于Zookeeper的临时顺序节点来实现的。当多个客户端尝试获取锁时,Zookeeper会为它们创建顺序节点,并让它们按照节点的序号依次尝试获取锁。未获取到锁的客户端会监听前一个序号的节点,一旦前一个节点释放锁,监听的客户端就会尝试获取锁
相关文章:
Apache Curator 分布式锁的介绍,以及案例
可重入锁(InterProcessMutex):这种锁允许同一个客户端多次获取同一把锁而不会被阻塞,类似于Java中的ReentrantLock。它通过在Zookeeper的指定路径下创建临时序列节点来实现锁的功能。如果获取锁失败,当前线程会监听前一…...

自动化测试 — selenium + Java
什么是自动化测试 将人为驱动的测试行为转化为机器执行的过程。 自动化测试包括UI 自动化,接口自动化,单元测试自动化。按照这个金字塔模型来进行自动化测试规划,可以产生最佳的自贡话测试产出投入比(ROI ),…...

【SpringBoot系列】接口参数的默认值与必要性
💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…...
茶余饭后(五)
真正出类拔萃的人 往往都是狠角色, 他们具备着一种独特的特质 那就是: 目标清晰 意志如铁 底线分明 同时手段又极为高明 且勤奋不屑 在处于劣势时 他们表现的极为谦逊和低调 像一只温顺无害的小羊羔 然而一旦时机成熟 他们便会毫不犹豫的展现出强…...

【网络编程详解】
🌈个人主页:努力学编程’ ⛅个人推荐: c语言从初阶到进阶 JavaEE详解 数据结构 ⚡学好数据结构,刷题刻不容缓:点击一起刷题 🌙心灵鸡汤:总有人要赢,为什么不能是我呢 🔥…...

C# winform三层架构 实现增删改查( 显示数据,查询数据 显示,查询篇)
一.留言 上一篇讲解了如何去添加数据,那么本章节我们来做,添加数据后显示,以及咋现有的数据里,查询我们所需要的数据。 二.显示 首先我们看上一篇更新,我们在添加成功后跳转页面显示数据,那么跳转代码只…...
Apache Kylin 系列入门教程
Apache Kylin 是一款开源的分布式分析引擎,主要用于提供SQL接口及多维分析(OLAP)能力以支持超大规模数据集。它能在亚秒级时间内完成PB级别的数据查询。本文将带你一步步了解如何安装、配置和使用Apache Kylin来构建数据仓库,并执…...
如何识别并防御漏洞扫描类攻击
随着网络安全威胁的不断演变,漏洞扫描已成为黑客常用的手段之一,旨在发现目标系统中的弱点以便进行后续攻击。高防服务作为一种专业的安全防护措施,能够在一定程度上识别并阻止这类攻击行为。本文将深入探讨高防服务是如何识别并防御漏洞扫描…...

冷思考:低代码的AI Agent构建平台能创造价值吗?
当前AI 圈中热点讨论的产品,除了以ChatGPT为代表的Chatbot领域,以及以Character.ai 为代表的AI虚拟社交领域,另一个热度较高的领域就是AI Agent领域。 大模型发展到今天,已经基本达成了一个共识:错综复杂的工作任务无…...
Spring Boot如何自定义注解?
1.什么是注解 注解(Annotation),也叫元数据。一种代码级别的说明。它是JDK1.5及以后版本引入的一个特性,与类、接口、枚举是在同一个层次。它可以声明在包、类、字段、方法、局部变量、方法参数等的前面,用来对这些元…...
gin框架传入的gin.context参数是池化的
1. gin.context参数不但是池化的,而且是指针 2. 但是gin.context又实现了context的接口。因此,可以当作context去使用 3. 这就会导致一个很严重的问题: 1. 池化导致了复用后的ctx将会将之前使用的ctx中的内容进行覆盖。 2. 实现了context接…...

AWS注册是否必须使用美元银行卡
亚马逊网络服务(AWS)作为全球领先的云计算平台,吸引了众多企业和个人用户。然而,不少人在注册AWS账户时会产生疑问:是否必须使用美元银行卡?实际上,这种说法并不准确。虽然AWS的主要结算货币是美元,但用户在注册和使用过程中有多种支付方式可供选择。我们结合九河云的分析来告…...

Spring IOC 注入的3种方式
Spring IOC 注入的3种方式 1. 构造器注入(Constructor Injection)2. Setter方法注入(Setter Injection)3. 字段注入(Field Injection) 💖The Begin💖点点关注,收藏不迷路…...

无人机影像基于机器学习的遥感反演及其结果可视化,定量遥感反演结果出图,相关性分析,指标筛选,特征选择
无人机影像或者卫星遥感反演分类模型的建立,反演模型的可视化制图出图,相关性分析,指标筛选,特征选择。代码太多,可企鹅联系: 指标的相关性分析。572 特征选择,贡献性最大的特征。412 LAI反演&…...

Eclipse插件之Java Dependency Viewer(显示类和包的关系图)
Java Dependency Viewer 插件的作用 Eclipse插件Java Dependency Viewer是一个为Java项目提供依赖关系可视化功能的工具。 在复杂的Java项目中,理解和分析类与类之间、包与包之间的依赖关系是非常有用的。Java Dependency Viewer插件通过生成依赖关系图,…...
H5小游戏出海,如何流量变现?
根据数据显示, 90%的轻度休闲游戏收入来自广告,即IAA(In-App Advertising)。使用这种形式进行变现的游戏类型大多以超休闲游戏为主,玩法简单、游戏内容轻度、风格简洁、游戏时间碎片化且即时娱乐性较高,收益…...

轻空间六大专利优势:引领气膜建筑新时代
在绿色建筑和科技创新的驱动下,轻空间不断突破传统建筑的限制,推出了一系列具有前瞻性和高性能的专利技术。通过这些技术,轻空间不仅为建筑行业注入了新动力,也为未来的气膜建筑设定了更高的标准。 低碳建材:“clearsk…...
LeetCode-day37-2940. 找到 Alice 和 Bob 可以相遇的建筑
LeetCode-day37-2940. 找到 Alice 和 Bob 可以相遇的建筑 题目描述示例示例1:示例2: 思路代码 题目描述 给你一个下标从 0 开始的正整数数组 heights ,其中 heights[i] 表示第 i 栋建筑的高度。 如果一个人在建筑 i ,且存在 i &…...
unity 判断平台
原文链接 Unity中判断平台的方法 Unity提供了一些方法来判断当前运行的平台,其中包括了判断是否为i0S平台。以下是几种常用的方法1.Application.platform Applicaion,platom 是Unity中的一个枚举类型,用于表示当前运行的平台。可以通过比较 Apication,p…...

PyCharm找不到Python了咋办
Python发生了重装的,且新的路径和原有路径不同,就会出现如下的错误: 解决办法: 点开PyCharm菜单的File/Setting 然后: 有上图的提示,说明需要将原来的venv进行清空。 如此操作之后,原来的红色…...

AI-调查研究-01-正念冥想有用吗?对健康的影响及科学指南
点一下关注吧!!!非常感谢!!持续更新!!! 🚀 AI篇持续更新中!(长期更新) 目前2025年06月05日更新到: AI炼丹日志-28 - Aud…...

iOS 26 携众系统重磅更新,但“苹果智能”仍与国行无缘
美国西海岸的夏天,再次被苹果点燃。一年一度的全球开发者大会 WWDC25 如期而至,这不仅是开发者的盛宴,更是全球数亿苹果用户翘首以盼的科技春晚。今年,苹果依旧为我们带来了全家桶式的系统更新,包括 iOS 26、iPadOS 26…...

微信小程序之bind和catch
这两个呢,都是绑定事件用的,具体使用有些小区别。 官方文档: 事件冒泡处理不同 bind:绑定的事件会向上冒泡,即触发当前组件的事件后,还会继续触发父组件的相同事件。例如,有一个子视图绑定了b…...

基于距离变化能量开销动态调整的WSN低功耗拓扑控制开销算法matlab仿真
目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.算法仿真参数 5.算法理论概述 6.参考文献 7.完整程序 1.程序功能描述 通过动态调整节点通信的能量开销,平衡网络负载,延长WSN生命周期。具体通过建立基于距离的能量消耗模型&am…...
【位运算】消失的两个数字(hard)
消失的两个数字(hard) 题⽬描述:解法(位运算):Java 算法代码:更简便代码 题⽬链接:⾯试题 17.19. 消失的两个数字 题⽬描述: 给定⼀个数组,包含从 1 到 N 所有…...

YSYX学习记录(八)
C语言,练习0: 先创建一个文件夹,我用的是物理机: 安装build-essential 练习1: 我注释掉了 #include <stdio.h> 出现下面错误 在你的文本编辑器中打开ex1文件,随机修改或删除一部分,之后…...
在 Nginx Stream 层“改写”MQTT ngx_stream_mqtt_filter_module
1、为什么要修改 CONNECT 报文? 多租户隔离:自动为接入设备追加租户前缀,后端按 ClientID 拆分队列。零代码鉴权:将入站用户名替换为 OAuth Access-Token,后端 Broker 统一校验。灰度发布:根据 IP/地理位写…...

ESP32 I2S音频总线学习笔记(四): INMP441采集音频并实时播放
简介 前面两期文章我们介绍了I2S的读取和写入,一个是通过INMP441麦克风模块采集音频,一个是通过PCM5102A模块播放音频,那如果我们将两者结合起来,将麦克风采集到的音频通过PCM5102A播放,是不是就可以做一个扩音器了呢…...

k8s业务程序联调工具-KtConnect
概述 原理 工具作用是建立了一个从本地到集群的单向VPN,根据VPN原理,打通两个内网必然需要借助一个公共中继节点,ktconnect工具巧妙的利用k8s原生的portforward能力,简化了建立连接的过程,apiserver间接起到了中继节…...

【分享】推荐一些办公小工具
1、PDF 在线转换 https://smallpdf.com/cn/pdf-tools 推荐理由:大部分的转换软件需要收费,要么功能不齐全,而开会员又用不了几次浪费钱,借用别人的又不安全。 这个网站它不需要登录或下载安装。而且提供的免费功能就能满足日常…...