ZooKeeper 客户端API操作
文章目录
- 一、节点信息
- 1、创建节点
- 2、获取子节点并监听节点变化
- 3、判断节点是否存在
- 4、客户端向服务端写入数据
- 写入请求直接发给 Leader 节点
- 写入请求直接发给 follow 节点
- 二、服务器动态上下线监听
- 1、监听过程
- 2、代码
- 三、分布式锁
- 1、什么是分布式锁?
- 2、Curator 框架实现分布式锁
一、节点信息
前提:centos102、centos103、centos104 服务器都已经开启
pom.xml 依赖
<dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.2</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.17.1</version></dependency><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.5.7</version></dependency><dependency><groupId>org.junit.jupiter</groupId><artifactId>junit-jupiter</artifactId><version>RELEASE</version><scope>compile</scope></dependency>
</dependencies>
log4j.properties 配置
# 设置全局的日志记录级别为 INFO
log4j.rootLogger=INFO, stdout# 控制台输出
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n# 文件输出
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
1、创建节点
zkClient.java 代码
// 注意:逗号后面不能有空格
private String connectString = "centos102:2181,centos103:2181,centos104:2181";
private int sessionTimeout = 2000;
private ZooKeeper zkClient;// 创建客户端
@Before
public void init() throws IOException {zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent event) {}});
}// 创建子节点
@Test
public void create() throws InterruptedException, KeeperException {String nodeCreated = zkClient.create("/frost", "cat".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}
运行创建子节点,看看是否创建了该节点
2、获取子节点并监听节点变化
@Test
public void getChildren() throws InterruptedException, KeeperException {List<String> children = zkClient.getChildren("/", true);for (String child : children) {System.out.println(child);}
}
那如果此时我再创建一个节点,此时控制台没有任何变化,我想要创建一个节点控制台能够看到相关变化怎么办?此时只需要将程序保持不结束,然后将客户端查看子节点函数放入监听器中。
3、判断节点是否存在
@Test
public void exit() throws InterruptedException, KeeperException {Stat stat = zkClient.exists("/frost", false);System.out.println(stat == null ? "not exits" : "exits");
}
4、客户端向服务端写入数据
写入请求直接发给 Leader 节点
- 客户端发送写入请求,leader节点执行写入操作
- leader通知follow1执行写入操作
- folllow1写入完毕给leader返回确认ack
- 现在半数以上服务器完成写入,leader给客户端发送确认ack
- leader通知follow2写入
- follow2写入完毕给leader发送确认ack
写入请求直接发给 follow 节点
- 客户端发送写入请求,
- follow1 将写入请求发送给leader
- leader节点执行写入操作,然后leader通知follow1执行写入操作
- folllow1写入完毕给leader返回确认ack
- 现在半数以上服务器完成写入,leader给follow1发送确认ack
- follow1给客户端发送确认ack
- leader通知follow2写入
- follow2写入完毕给leader发送确认ack
二、服务器动态上下线监听
1、监听过程
以下红色字体写错,应该是下线则通知注册监听器的客户端
对于ZooKeeper集群来说,客户端和服务器都相当于客户端,区别在于:服务器在ZooKeeper集群中是创建节点,客户端在ZooKeeper是监听信息。
2、代码
服务器注册到zk集群
import org.apache.zookeeper.*;
import java.io.IOException;public class DistributeServer {private String connectString = "centos102:2181,centos103:2181,centos104:2181";private int sessionTimeout = 2000;ZooKeeper zk;public static void main(String[] args) throws IOException, InterruptedException, KeeperException {DistributeServer server = new DistributeServer();// 1. 获取zk连接server.getConnect();// 2. 注册服务器到 zk 集群server.regist(args[0]);// 3. 启动业务逻辑(睡觉)server.business();}private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE);}private void regist(String hostname) throws InterruptedException, KeeperException {String create = zk.create("/servers", hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); // 临时带序号的节点System.out.println(hostname + "is online");}private void getConnect() throws IOException {zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent event) {}});}
}
客户端进行监听
import org.apache.zookeeper.*;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;public class DistributeClient {private String connectString = "centos102:2181,centos103:2181,centos104:2181";private int sessionTimeout = 2000;ZooKeeper zk;public static void main(String[] args) throws IOException, InterruptedException, KeeperException {DistributeClient client = new DistributeClient();// 1. 获取zk连接client.getConnect();// 2. 监听/servers下子节点的增加和删除client.getServerList();// 3. 启动业务逻辑(睡觉)client.business();}private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE);}private void getServerList() throws InterruptedException, KeeperException {List<String> children = zk.getChildren("/servers", true);ArrayList<String> servers = new ArrayList<>();for (String child : children) {byte[] data = zk.getData("/servers/" + child, false, null);servers.add(new String(data));}System.out.println(servers);}private void getConnect() throws IOException {zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent event) {try {getServerList();} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}});}
}
启动客户端,然后在服务器上进行增加节点监听
删除节点监听
因为我们服务端的代码传参了,所以我们需要设置一下这个参数:
下图代表服务端启动的是hadoop102节点
先把客户端启动起来,发现有一个节点hadoop101:
在启动服务端,hadoop102上线:
然后返回看客户端的监听,发现节点有变化,打印出所有节点 [hadoop102, hadoop101]
此时我们修改一下再此启动服务端让 hadoop103 上线:
返回客户端查看发现 hadoop102 下线,hadoop103 上线
三、分布式锁
1、什么是分布式锁?
比如说"进程1"在使用该资源的时候,会先去获得锁,"进程1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;public class DistributedLock {private final String connectString = "centos102:2181,centos103:2181,centos104:2181";private final int sessionTimeout = 2000;ZooKeeper zk;private CountDownLatch connectLatch = new CountDownLatch(1);private CountDownLatch waitLatch = new CountDownLatch(1);// 前一个节点private String waitPath;// 当前节点String currentMode;public DistributedLock() throws IOException, InterruptedException, KeeperException {// 1. 获取连接zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent event) {// connectLatch,如果连接上zk,可以释放if (event.getState() == Event.KeeperState.SyncConnected) {connectLatch.countDown();}// waitLatch,需要释放if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {waitLatch.countDown();}}});// 等待zk正常连接后往下走connectLatch.await();// 2. 判断根节点/lock是否存在Stat stat = zk.exists("/locks", false);if (stat == null) {// 创建根节点(永久节点)zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}}// 对 zk 加锁public void zkLock() {// 创建对应的临时带序号的节点try {currentMode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);// 判断创建的节点是否是最小的序号节点,如果是,获取到锁;如果不是,监听前一个节点List<String> children = zk.getChildren("/locks", false);// 如果 children 只有一个节点,直接获取锁;如果有多个节点,需要判断,谁最小if (children.size() == 1) {return;}else {// 排序Collections.sort(children);// 获取节点名称seq00000001String thisNode = currentMode.substring("/locks/".length());// 通过seq00000001获取该节点在children当中的位置int index = children.indexOf(thisNode);if (index == -1) {System.out.println("数据异常");}else if (index == 0) {// 该节点为第一个,获取锁直接返回return;}else {// 不是第一个,监听前一个节点waitPath = "/locks/" + children.get(index - 1);zk.getData(waitPath, true, null);// 等待监听结束waitLatch.await();return;}}} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}// 解锁public void unZkLock() throws InterruptedException, KeeperException {// 删除节点zk.delete(currentMode, -1);}
}
测试
import org.apache.zookeeper.KeeperException;import java.io.IOException;public class DistributedLockTest {public static void main(String[] args) throws IOException, InterruptedException, KeeperException {final DistributedLock lock1 = new DistributedLock();final DistributedLock lock2 = new DistributedLock();new Thread(new Runnable() {@Overridepublic void run() {try {lock1.zkLock();System.out.println("线程1启动,获取到锁");Thread.sleep(5000);lock1.unZkLock();System.out.println("线程1释放锁");} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}}).start();new Thread(new Runnable() {@Overridepublic void run() {try {lock2.zkLock();System.out.println("线程2启动,获取到锁");Thread.sleep(5000);lock2.unZkLock();System.out.println("线程2释放锁");} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}}).start();}}
2、Curator 框架实现分布式锁
原生JAVA API出现的问题:
(1)会话是异步的,需要自己去连接
(2)Watch需要重复注册,不然就不能生效
(3)开发的复杂性还是比较高
(4)不支持多节点的删除和创建,需要自己去递归
Curator 是一个专门解决分布式锁的框架,解决了原生JAVA API开发分布式遇到的的问题
curator 官方文档:https://curator.apache.org/index.html
pom.xml 文件添加依赖
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>4.3.0</version>
</dependency>
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>4.3.0</version>
</dependency>
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-client</artifactId><version>4.3.0</version>
</dependency>
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;public class CuratorLockTest {public static void main(String[] args) {// 创建分布式锁1InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");// 创建分布式锁2InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");new Thread(new Runnable() {@Overridepublic void run() {try {lock1.acquire();System.out.println("线程1获取到锁");lock1.acquire();System.out.println("线程1获取到锁");Thread.sleep(5 * 1000);lock1.release();System.out.println("线程1释放锁");lock1.release();System.out.println("线程1再次释放锁");} catch (Exception e) {e.printStackTrace();}}}).start();new Thread(new Runnable() {@Overridepublic void run() {try {lock2.acquire();System.out.println("线程2获取到锁");lock2.acquire();System.out.println("线程2获取到锁");Thread.sleep(5 * 1000);lock2.release();System.out.println("线程2释放锁");lock2.release();System.out.println("线程2再次释放锁");} catch (Exception e) {e.printStackTrace();}}}).start();}private static CuratorFramework getCuratorFramework() {ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);CuratorFramework client = CuratorFrameworkFactory.builder().connectString("centos102:2181,centos103:2181,centos104:2181").connectionTimeoutMs(2000).sessionTimeoutMs(2000).retryPolicy(policy).build();// 启动客户端client.start();System.out.println("zookeeper 启动成功");return client;}
}
相关文章:

ZooKeeper 客户端API操作
文章目录 一、节点信息1、创建节点2、获取子节点并监听节点变化3、判断节点是否存在4、客户端向服务端写入数据写入请求直接发给 Leader 节点写入请求直接发给 follow 节点 二、服务器动态上下线监听1、监听过程2、代码 三、分布式锁1、什么是分布式锁?2、Curator 框架实现分布…...
常用滤波算法(一)-限幅滤波法
文章目录 一、限幅滤波法原理二、C语言实现限幅滤波法三、代码解析定义限制值:限幅滤波函数:模拟获取新数据:主函数: 四、结论 限幅滤波法 限幅滤波法,作为一种简单而有效的滤波方法,通过限制信号的幅值范围…...

江协科技STM32学习- P33 实验-软件I2C读写MPU6050
🚀write in front🚀 🔎大家好,我是黄桃罐头,希望你看完之后,能对你有所帮助,不足请指正!共同学习交流 🎁欢迎各位→点赞👍 收藏⭐️ 留言📝…...

BusHound工具的使用-调试USB
12 1.Capture(捕捉按钮)、2.Save(保存按钮)、3.Setting(设置要监听的,输入输出)、4.Device(选择要监听的设备)、5.Help(帮助按钮)、6.Exit(退出按钮)。 一、Capture页面 1.Device 表示是29设备端口,打印机。 2.Phase,各类协议,…...

Hadoop生态圈框架部署(四)- Hadoop完全分布式部署
文章目录 前言一、Hadoop完全分布式部署(手动部署)1. 下载hadoop2. 上传安装包2. 解压hadoop安装包3. 配置hadoop配置文件3.1 虚拟机hadoop1修改hadoop配置文件3.1.1 修改 hadoop-env.sh 配置文件3.3.2 修改 core-site.xml 配置文件3.3.3 修改 hdfs-site…...

Spring Boot 与 Vue 共铸卓越采购管理新平台
作者介绍:✌️大厂全栈码农|毕设实战开发,专注于大学生项目实战开发、讲解和毕业答疑辅导。 🍅获取源码联系方式请查看文末🍅 推荐订阅精彩专栏 👇🏻 避免错过下次更新 Springboot项目精选实战案例 更多项目…...
leetcode3. Longest Substring Without Repeating Characters
Given a string s, find the length of the longest substring without repeating characters. Example 1: Input: s “abcabcbb” Output: 3 Explanation: The answer is “abc”, with the length of 3. Example 2: Input: s “bbbbb” Output: 1 Explanation: The ans…...
Mongodb使用视图连接两个集合
您可以使用 $lookup 为两个集合创建一个视图,然后对该视图运行查询。应用程序可以查询视图,而无需构建或维护复杂的管道。 例子 创建两个样本集合 inventory 和 orders: db.inventory.insertMany( [{ prodId: 100, price: 20, quantity: 1…...

SIP是什么?
SIP(Session Initiation Protocol,会话启动协议)是一个用于建立、更改和终止多媒体会话的应用层控制协议,其中的会话可以是IP电话、多媒体会话或多媒体会议。 SIP是IETF多媒体数据和控制体系结构的核心协议(最新RFC文档…...
Day 39 || 01背包、416. 分割等和子集
01背包 题目链接:卡码网第46题 二维解题思路:需要建立一个i行k列的dp数组,i表示每个物品,k代表容量,初始化数组子一列为0,第一行从背包开始能够放入起始为价值,其他都为0。for双循环先背包后物…...
调用detr-resnet-50进行目标检测
from transformers import DetrImageProcessor, DetrForObjectDetection import torch from PIL import Imageimage = Image.open("1.jpg") torch.set_default_device("cuda"...
Chromium 中chrome.fontSettings扩展接口定义c++
一、chrome.fontSettings 使用 chrome.fontSettings API 管理 Chrome 的字体设置。 权限 fontSettings 要使用 Font Settings API,您必须在扩展程序中声明 "fontSettings" 权限 清单。例如: {"name": "My Font Settings E…...
在Unity游戏开发在面试时会面试哪些内容?
1、请描述游戏动画有几种,以及其原理。 关键帧动画:每一帧动画序列当中包含了顶点的空间位置信息以及改变量,然后通过插值运算,得出动画效果。选中某一游戏对象,创建animation,添加属性Transform࿰…...
刘艳兵-DBA022-以下关于Oracle半连接的描述,哪些是正确的?
以下关于Oracle半连接的描述,哪些是正确的? A SQL中的NOT EXISTS子查询,通常会被转换为对应的半连接。 B SQL中的IN子查询,通常会被转换为对应的半连接。 C 半连接会去重 D SQL中的EXISTS子查询,通常会被转…...

人工智能与伦理:我们应该如何平衡科技与人性?
内容概要 在这个瞬息万变的时代,人工智能的迅猛发展让我们面对前所未有的伦理困境。科技进步带来了便利,但同时也亟需我们反思如何对待人性。尤其是在实现算法透明性时,我们要确保每一个决策背后都能被理解与追溯,这不仅是对技术…...

CRON组件一个复杂的一个简单的
CRON组件一个复杂的一个简单的 一个是复杂点的一个是简单点。 1.以简单的为例使用: 父组件 import CronSimple from "/views/xxx/components/cron-simple/index.vue";components: {CronSimple}<el-dialog title"调度CRON"v-if"cronV…...

自定义日志打成jar包引入项目后不生效
背景:写了一个请求响应日志包,打包后在另一个项目使用pom引入后不生效 package com.example.qqllog.aspect;import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean;…...

RK3568平台开发系列讲解(中断篇)延迟工作实验
🚀返回专栏总目录 文章目录 一、什么是延迟工作二、 struct delayed_work三、延迟工作相关接口函数3.1、初始化延迟工作函数3.2、调度/取消调度 延迟工作函数四、驱动程序编写沉淀、分享、成长,让自己和他人都能有所收获!😄 一、什么是延迟工作 延迟工作是一种将工作的执…...

RabbitMQ 的集群
大家好,我是锋哥。今天分享关于【RabbitMQ 的集群】面试题?希望对大家有帮助; RabbitMQ 的集群 RabbitMQ 是一种流行的开源消息代理,广泛用于构建分布式系统中的消息队列。随着应用程序规模的扩大,单一的 RabbitMQ 实…...

整车功能架构 --- 智能座舱
我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 所有人的看法和评价都是暂时的,只有自己的经历是伴随一生的,几乎所有的担忧和畏惧…...

(LeetCode 每日一题) 3442. 奇偶频次间的最大差值 I (哈希、字符串)
题目:3442. 奇偶频次间的最大差值 I 思路 :哈希,时间复杂度0(n)。 用哈希表来记录每个字符串中字符的分布情况,哈希表这里用数组即可实现。 C版本: class Solution { public:int maxDifference(string s) {int a[26]…...
k8s从入门到放弃之Ingress七层负载
k8s从入门到放弃之Ingress七层负载 在Kubernetes(简称K8s)中,Ingress是一个API对象,它允许你定义如何从集群外部访问集群内部的服务。Ingress可以提供负载均衡、SSL终结和基于名称的虚拟主机等功能。通过Ingress,你可…...

8k长序列建模,蛋白质语言模型Prot42仅利用目标蛋白序列即可生成高亲和力结合剂
蛋白质结合剂(如抗体、抑制肽)在疾病诊断、成像分析及靶向药物递送等关键场景中发挥着不可替代的作用。传统上,高特异性蛋白质结合剂的开发高度依赖噬菌体展示、定向进化等实验技术,但这类方法普遍面临资源消耗巨大、研发周期冗长…...
解锁数据库简洁之道:FastAPI与SQLModel实战指南
在构建现代Web应用程序时,与数据库的交互无疑是核心环节。虽然传统的数据库操作方式(如直接编写SQL语句与psycopg2交互)赋予了我们精细的控制权,但在面对日益复杂的业务逻辑和快速迭代的需求时,这种方式的开发效率和可…...

[ICLR 2022]How Much Can CLIP Benefit Vision-and-Language Tasks?
论文网址:pdf 英文是纯手打的!论文原文的summarizing and paraphrasing。可能会出现难以避免的拼写错误和语法错误,若有发现欢迎评论指正!文章偏向于笔记,谨慎食用 目录 1. 心得 2. 论文逐段精读 2.1. Abstract 2…...
Android Bitmap治理全解析:从加载优化到泄漏防控的全生命周期管理
引言 Bitmap(位图)是Android应用内存占用的“头号杀手”。一张1080P(1920x1080)的图片以ARGB_8888格式加载时,内存占用高达8MB(192010804字节)。据统计,超过60%的应用OOM崩溃与Bitm…...

html-<abbr> 缩写或首字母缩略词
定义与作用 <abbr> 标签用于表示缩写或首字母缩略词,它可以帮助用户更好地理解缩写的含义,尤其是对于那些不熟悉该缩写的用户。 title 属性的内容提供了缩写的详细说明。当用户将鼠标悬停在缩写上时,会显示一个提示框。 示例&#x…...
重启Eureka集群中的节点,对已经注册的服务有什么影响
先看答案,如果正确地操作,重启Eureka集群中的节点,对已经注册的服务影响非常小,甚至可以做到无感知。 但如果操作不当,可能会引发短暂的服务发现问题。 下面我们从Eureka的核心工作原理来详细分析这个问题。 Eureka的…...
Java求职者面试指南:Spring、Spring Boot、MyBatis框架与计算机基础问题解析
Java求职者面试指南:Spring、Spring Boot、MyBatis框架与计算机基础问题解析 一、第一轮提问(基础概念问题) 1. 请解释Spring框架的核心容器是什么?它在Spring中起到什么作用? Spring框架的核心容器是IoC容器&#…...

【VLNs篇】07:NavRL—在动态环境中学习安全飞行
项目内容论文标题NavRL: 在动态环境中学习安全飞行 (NavRL: Learning Safe Flight in Dynamic Environments)核心问题解决无人机在包含静态和动态障碍物的复杂环境中进行安全、高效自主导航的挑战,克服传统方法和现有强化学习方法的局限性。核心算法基于近端策略优化…...