Zookeeper是什么?基于zookeeper实现分布式锁
zookeeper听的很多,但实际在应用开发中用的不错,主要是作为中间件配合使用的,例如:Kafka。
了解zk首先需要知道它的数据结构,可以想象为树、文件夹目录。每个节点有基本的信息,例如:创建时间、修改时间、版本,数据长度等。另外节点可以设置data,也就是数据,以字节的方式进行插入/获取,另外节点还拥有权限和状态。
状态很关键,有持久、临时(会话级别)、持久+顺序、临时+顺序、持久+TTL、临时+TTL。
顺序是给同一个节点增加一个编号,例如:path:/distributed_locks/lock
插入多个,在zk中是:/distributed_locks/lock0000000001和/distributed_locks/lock0000000002、、。
到这里数据结构已经大致清楚了,那么zk存在的意义是什么?
首先,zk的定义:是一个集中式服务,用于维护配置信息、命名、提供分布式同步和提供组服务。
关键点:集中、分布式。
在程序进行分布式、多节点部署时,传统程序内存中的变量或者锁机制等都不能在多节点中进行通用。此时,就需要一个集中式的一个中间件,在中间件上存储我们需要同时方案的变量或者其他定义。
那么,我们为什么不直接使用db数据库呢,可能是因为重?也可能是一些特殊的功能db中并不能实现?(临时会话、TTL?)。
作为目前很火热的一个中间件,存在它的意义肯定是有的。为什么说呢,zk是Java实现的,与 Hadoop、Kafka 等 Java 生态项目无缝集成。同理,可以想象,每个语言的特性不一致,都会有不同的中间件或者包。
上述,基本都是个人的一些理解,希望能给大家带来点启发。
zookeeper,咱们的扩展功能到分布式锁这里。通过节点的特性,我们采用会话级别、顺序性质的节点进行实现。
当我们的线程需要去尝试获取锁时,连接zk肯定是个会话,同时zk会根据顺序将不同的线程进行排序,线程内部只需要轮询、wait/notify等方式判断是否轮到自己得到锁了。获取到锁后,执行业务逻辑之后,随之可以将锁进行释放,以便让另外一个线程得到锁。
代码实现用2种方式实现:
原生zookeeper方法实现
package com.fahe.testdistrubutedlock.zk;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/*** @program: test-distrubuted-lock* @description: client* @author: <linfahe-694204477@qq.com>* @create: 2025-04-23 14:05**/
@Slf4j
public class ZkClient implements Watcher {
public static final String ZK_ADDR = "127.0.0.1:32181";public ZooKeeper zk;public CountDownLatch connectedSignal = new CountDownLatch(1);
public ZkClient() {try {zk = new ZooKeeper(ZK_ADDR, 3000, this);connectedSignal.await(); // 等待连接成功} catch (Exception e) {throw new RuntimeException(e);}}
@Overridepublic void process(WatchedEvent watchedEvent) {log.info("process WatchedEvent : {}", watchedEvent);if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {connectedSignal.countDown();}}
// 创建持久节点public void createNode() throws KeeperException, InterruptedException {Stat existsed = zk.exists("/my-node", false);if (existsed != null) {
// zk.delete("/my-node", -1);return;}String path = zk.create("/my-node", "data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);System.out.println("创建节点:" + path);}
// 获取节点数据public void getData() throws KeeperException, InterruptedException {byte[] data = zk.getData("/my-node", false, null);System.out.println("节点数据:" + new String(data));}
public static void main(String[] args) throws InterruptedException, KeeperException {ZkClient zkClient = new ZkClient();List<String> children = zkClient.zk.getChildren("/", true);for (String child : children) {log.info("child : {}", child);}zkClient.createNode();zkClient.getData();}
public void close() {try {if (zk != null) {zk.close();}} catch (InterruptedException e) {throw new RuntimeException(e);}}
}
package com.fahe.testdistrubutedlock.zk;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.Collections;
import java.util.List;
public class DistributedLock {private static final String LOCK_ROOT = "/locks";private static final String LOCK_NODE = LOCK_ROOT + "/lock_";private ZooKeeper zooKeeper;private String lockPath;
public DistributedLock(ZooKeeper zooKeeper) throws Exception {this.zooKeeper = zooKeeper;Stat stat = zooKeeper.exists(LOCK_ROOT, false);if (stat == null) {zooKeeper.create(LOCK_ROOT, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}}
public void acquireLock() throws Exception {lockPath = zooKeeper.create(LOCK_NODE, "new byte[0]".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);System.out.println("Lock path: " + lockPath);
while (true) {List<String> children = zooKeeper.getChildren(LOCK_ROOT, false);Collections.sort(children);String smallestChild = LOCK_ROOT + "/" + children.get(0);
if (lockPath.equals(smallestChild)) {System.out.println("Acquired lock: " + lockPath);return;}System.out.println("Waiting for lock: " + lockPath + "; smallestChild : " + smallestChild);String watchNode = null;for (int i = children.size() - 1; i >= 0; i--) {String child = LOCK_ROOT + "/" + children.get(i);if (child.compareTo(lockPath) < 0) {watchNode = child;break;}}System.out.println("Waiting for lock: " + lockPath + "; smallestChild : " + smallestChild + " ; watchNode = " + watchNode);
if (watchNode != null) {final Object lock = new Object();Watcher watcher = new Watcher() {@Overridepublic void process(WatchedEvent event) {synchronized (lock) {lock.notifyAll();}}};
Stat stat = zooKeeper.exists(watchNode, watcher);if (stat != null) {synchronized (lock) {lock.wait();}}}}}
public void releaseLock() throws Exception {if (lockPath != null) {zooKeeper.delete(lockPath, -1);System.out.println("Released lock: " + lockPath);lockPath = null;}}
public static void main(String[] args) {ZkClient client = new ZkClient();// 模拟多线程。for (int i = 0; i < 30; i++) {new Thread(() -> {try {mainTest(client);} catch (Exception e) {e.printStackTrace();}}).start();}// 模拟多实例。ZkClient client2 = new ZkClient();for (int i = 0; i < 30; i++) {new Thread(() -> {try {mainTest(client2);} catch (Exception e) {e.printStackTrace();}}).start();}}
public static void mainTest(ZkClient client) {
// = new ZkClient();try {ZooKeeper zooKeeper = client.zk;
DistributedLock lock = new DistributedLock(zooKeeper);lock.acquireLock();System.out.println("Lock acquired");// 模拟业务逻辑int randomSleepTime = (int) (Math.random() * 100);System.out.println("randomSleepTime = " + randomSleepTime);Thread.sleep(randomSleepTime);System.out.println("Business logic completed");lock.releaseLock();
// client.close();} catch (Exception e) {e.printStackTrace();}}
}
使用Curator三方包实现:
package com.fahe.testdistrubutedlock.zk;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
/*** @program: test-distrubuted-lock* @description: curator 测试* @author: <linfahe-694204477@qq.com>* @create: 2025-04-23 15:04**/
public class CuratorMain {private final InterProcessMutex lock;private static final String LOCK_PATH = "/distributed_lock/my_lock";private static final String ZK_ADDR = "127.0.0.1:32181";
public CuratorMain() {CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDR,new ExponentialBackoffRetry(200, 2));client.start();this.lock = new InterProcessMutex(client, LOCK_PATH);}
public boolean acquireLock() {try {lock.acquire();return true;} catch (Exception e) {e.printStackTrace();return false;}}
public void releaseLock() {try {if (lock.isAcquiredInThisProcess()) {lock.release();}} catch (Exception e) {e.printStackTrace();}}
public static void main(String[] args) {CuratorMain curatorMain = new CuratorMain();for (int i = 0; i < 100; i++) {new Thread(() -> {boolean acquireLock = curatorMain.acquireLock();System.out.println("thread-" + Thread.currentThread().getName() + " is running");System.out.println("acquireLock = " + acquireLock);if (acquireLock) {curatorMain.releaseLock();}}, "thread-" + i).start();}CuratorMain curatorMain2 = new CuratorMain();for (int i = 100; i < 200; i++) {new Thread(() -> {boolean acquireLock = curatorMain2.acquireLock();System.out.println("thread-" + Thread.currentThread().getName() + " is running");System.out.println("acquireLock = " + acquireLock);if (acquireLock) {curatorMain2.releaseLock();}}, "thread-" + i).start();}try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}}
}
相关文章:
Zookeeper是什么?基于zookeeper实现分布式锁
zookeeper听的很多,但实际在应用开发中用的不错,主要是作为中间件配合使用的,例如:Kafka。 了解zk首先需要知道它的数据结构,可以想象为树、文件夹目录。每个节点有基本的信息,例如:创建时间、…...
Kafka 主题设计与数据接入机制
一、前言:万物皆流,Kafka 是入口 在构建实时数仓时,Kafka 既是 数据流动的起点,也是后续流处理系统(如 Flink)赖以为生的数据源。 但“消息进来了” ≠ “你就能处理好了”——不合理的 Topic 设计、接入方…...
gem5-gpu教程05 内存建模
memory-modeling|Details on how memory is modeled in gem5-gpu gem5-gpu’s Memory Simulation gem5-gpu在很大程度上避开了GPGPU-Sim的单独功能模拟,而是使用了gem5的执行中执行模型。因此,当执行存储/加载时,内存会被更新/读取。没有单独的功能路径。(顺便说一句,这…...
MySQL的日志--Redo Log【学习笔记】
MySQL的日志--Redo Log 知识来源: 《MySQL是怎样运行的》--- 小孩子4919 MySQL的事务四大特性之一就是持久性(Durability)。但是底层是如何实现的呢?这就需要我们的Redo Log(重做日志)闪亮登场了。它记录着…...
【AI应用】免费代码仓构建定制版本的ComfyUI应用镜像
免费代码仓构建定制版本的ComfyUI应用镜像 1 创建代码仓1.1 注册登陆1.2 创建代码仓1.5 安装中文语言包1.4 拉取ComfyUI官方代码2 配置参数和预装插件2.1 保留插件和模型的版本控制2.2 克隆插件到代码仓2.2.1 下载插件2.2.2 把插件设置本仓库的子模块管理3 定制Docker镜像3.1 创…...
MineWorld,微软研究院开源的实时交互式世界模型
MineWorld是什么 MineWorld是微软研究院开发并开源的一个基于《我的世界》(Minecraft)的实时互动世界模型。该模型采用了视觉-动作自回归Transformer架构,将游戏场景和玩家动作转化为离散的token ID,并通过下一个token的预测进行…...
被裁20240927 --- 视觉目标跟踪算法
永远都像初次见你那样使我心荡漾 参考文献目前主流的视觉目标跟踪算法一、传统跟踪算法1. 卡尔曼滤波(Kalman Filter)2. 相关滤波(Correlation Filter,如KCF、MOSSE)3. 均值漂移(MeanShift/CamShift&#x…...
Agentic AI——当AI学会主动思考与决策,世界将如何被重塑?
一、引言:2025,Agentic AI的元年 “如果ChatGPT是AI的‘聊天时代’,那么2025年将开启AI的‘行动时代’。”——Global X Insights[1] 随着Agentic AI(自主决策型人工智能)的崛起,AI系统正从被动应答的“工具…...
Ollama API 应用指南
1. 基础信息 默认地址: http://localhost:11434/api数据格式: application/json支持方法: POST(主要)、GET(部分接口) 2. 模型管理 API (1) 列出本地模型 端点: GET /api/tags功能: 获取已下载的模型列表。示例:curl http://lo…...
PNG透明免抠设计素材大全26000+
在当今的数字设计领域,寻找高质量且易于使用的素材是每个设计师的日常需求。今天,我们将为大家介绍一个超全面的PNG透明免抠设计素材大全,涵盖多种风格、主题和应用场景,无论是平面设计、网页设计还是多媒体制作,都能轻…...
4.多表查询
SQL 多表查询:数据整合与分析的强大工具 文章目录 SQL 多表查询:数据整合与分析的强大工具一、 多表查询概述1.1 为什么需要多表查询1.2 多表查询的基本原理 二、 多表查询关系2.1 一对一关系(One-to-One)示例: 2.2 一…...
MCP 基于 TypeScript 的完整示例,包含stdio、sse多种用法和调试,对于构建自己的API工具链很有用
typescript-mcp-demo 这是一个基于 Model Context Protocol (MCP) 的 TypeScript 示例项目,展示了如何创建一个简单的 MCP 服务器,包含基本的工具(tools)和资源(resources)功能。 官网:https:…...
位运算知识
位运算是一种直接对整数在内存中的二进制位进行操作的运算方式。计算机中的整数是以二进制形式存储的,位运算通过操作这些二进制位来实现高效的计算。位运算通常比普通的算术运算更快,因为它直接作用于硬件层面。 以下是几种常见的位运算符及其功能&…...
mybatis高级查询:一对多配置,一次性查出主表和子表中的数据
一、MyBatis高级查询:一对多 MyBatis是一款强大的持久层框架,提供了多种方式来处理关联查询,其中包括一对一和一对多的情况。在本文中,我们将深入探讨一对多关联查询的实现方式。 在MyBatis中配置一对多关系通常涉及到associati…...
美团2024年春招第一场笔试 C++
目录 1,小美的平衡矩阵 2,小美的数组询问 3,小美的MT 4,小美的朋友关系 1,小美的平衡矩阵 【题目描述】 给定一个n*n的矩阵,该矩阵只包含数字0和1。对于 每个i(1<i<n),求在该矩阵中&am…...
XHTMLConverter把docx转换html报java.lang.NullPointerException异常
一.报错 1.报错信息 org.apache.poi.xwpf.converter.core.XWPFConverterException: java.lang.NullPointerExceptionat org.apache.poi.xwpf.converter.xhtml.XHTMLConverter.convert(XHTMLConverter.java:77)at org.apache.poi.xwpf.converter.xhtml.XHTMLConverter.doConve…...
OpenCV 图形API(52)颜色空间转换-----将 NV12 格式的图像数据转换为 RGB 格式的图像
操作系统:ubuntu22.04 OpenCV版本:OpenCV4.9 IDE:Visual Studio Code 编程语言:C11 算法描述 将图像从 NV12 (YUV420p) 色彩空间转换为 RGB。该函数将输入图像从 NV12 色彩空间转换到 RGB。Y、U 和 V 通道值的常规范围是 0 到 255。 输出图…...
pojovoDto等概念
Java 中的数据模型概念 POJO (Plain Old Java Object) POJO 是最简单的 Java 对象,不依赖于特定的框架,不实现任何特殊的接口,也不继承特定的类。 特点 具有无参构造函数属性使用 private 修饰提供公共的 getter 和 setter 方法可序列化 …...
COdeTop-206-反转链表
题目 206. 反转链表 给你单链表的头节点 head ,请你反转链表,并返回反转后的链表。 示例 1: 输入:head [1,2,3,4,5] 输出:[5,4,3,2,1]示例 2: 输入:head [1,2] 输出:[2,1]示例 …...
线段树讲解(小进阶)
目录 前言 一、线段树知识回顾 线段树区间加减 区间修改维护: 区间修改的操作: 区间修改update: 线段树的区间查询 区间查询: 区间查询的操作: 递归查询过程: 区间查询query: 代码&…...
4082P 信号/频谱分析仪
——新利通仪器仪表—— 4082P丨信号/频谱分析仪 2Hz~110GHz Ceyear 4082系列信号/频谱分析仪在显示平均噪声电平、相位噪声、互调抑制、动态范围、幅度精度和测试速度等方面具备极佳的射频性能。具备强大的频谱分析、符合标准的功率测量套件、I/Q分析、瞬态分析…...
夏季跑步注意
夏季跑步注意 医学专家警示:大众跑者需以安全为先,警惕高温环境下“盲目冲刺”的风险。一些事件再次印证马拉松适宜气温为6-15℃,超过20℃时需主动降速并增加补水量。 所以建议每一位跑友,为了健康的跑步,每年除了做…...
openharmony5.0.0中C++公共基础类测试-线程相关(一)
C公共基础类测试及源码剖析 延续传统,show me the code,除了给出应用示例还重点分析了下openharmony中的实现。 简介 openharmony中提供了C公共基础类库,为标准系统提供了一些常用的C开发工具类,本文分析其实现,并给…...
TDengine 数据订阅设计
简介 数据订阅作为 TDengine 的一个核心功能,为用户提供了灵活获取所需数据的能力。通过深入了解其内部原理,用户可以更加有效地利用这一功能,满足各种实时数据处理和监控需求。 基本概念 主题 与 Kafka 一样,使用 TDengine 数…...
python:mido 提取 midi文件中某一音轨的音乐数据
pip install mido 使用 mido库可以方便地处理 MIDI 文件,提取其中音轨的音乐数据。 1.下面的程序会读取指定的 MIDI 文件,并提取指定编号音轨的音乐数据,主要包括音符事件等信息。 编写 mido_extract.py 如下 # -*- coding: utf-8 -*- &…...
vue3 中推荐使用的页面布局方式
1、Flexbox布局 原理 Flexbox(弹性盒子布局模型)提供了一种更加高效的方式来对容器中的子元素进行布局、对齐和分配空间。它能够根据容器的大小和子元素的内容自动调整布局,非常适合一维布局(水平或垂直方向)。 优…...
URP-UGUI交互功能实现
一、非代码层面实现交互(SetActive) Button :在OnClick()中添加SetActive方法(但是此时只首次有效) Toggle :在OnClick()中添加动态的SetActive方法 &#…...
UniGoal 具身导航 | 通用零样本目标导航 CVPR 2025
UniGoal的提出了一个通用的零样本目标导航框架,能够统一处理多种类型的导航任务 (如对象类别导航、实例图像目标导航和文本目标导航),而无需针对特定任务进行训练或微调。 它的特点是 图匹配与多阶段探索策略!&#x…...
通过Quartus II实现Nios II编程
目录 一、认识Nios II二、使用Quartus II 18.0Lite搭建Nios II硬件部分三、软件部分四、运行项目 一、认识Nios II Nios II软核处理器简介 Nios II是Altera公司推出的一款32位RISC嵌入式处理器,专门设计用于在FPGA上运行。作为软核处理器,Nios II可以通…...
Linux/AndroidOS中进程间的通信线程间的同步 - IPC方式简介
前言 从来没有总结过Linux/Android系统中进程间的通信方式和线程间的同步方式,这个专栏就系统总结讨论一下。首先从标题可知,讨论问题的主体是进程和线程、通信和同步;在这里默认你理解进程和线程的区别。通信和同步有什么概念上的区别&…...
