Zookeeper-JavaApI操作
JavaApI操作
- JavaApI操作
- 1) Curator 介绍
- 2) Curator API 常用操作
- a) 建立连接与CRUD基本操作
- b) Watch事件监听
- c) 分布式锁
- c.1) 介绍
- c.2) Zookeeper分布式锁原理
- c.3) 案例:模拟12306售票
JavaApI操作
1) Curator 介绍
Curator 是 Apache ZooKeeper 的Java客户端库。
常见的ZooKeeper Java API :
- 原生Java API
- ZkClient
- Curator
Curator 项目的目标是简化 ZooKeeper 客户端的使用。
Curator 最初是 Netfix 研发的,后来捐献了 Apache 基金会,目前是 Apache 的顶级项目。
官网:http://curator.apache.org/
2) Curator API 常用操作
a) 建立连接与CRUD基本操作
Curator API 常用操作:
- 建立连接
- 第一种方法:CuratorFrameworkFactory.newClient
- 第二种方法:CuratorFrameworkFactory.builder() 推荐
/**
* 建立连接
*/
@Test
public void testConnect() {/*** connectString – 连接字符串 zk server 地址和端口 "192.168.200.130:2181"* sessionTimeoutMs – 会话超时时间 单位ms* connectionTimeoutMs – 连接超时时间 单位ms* retryPolicy – 重试策略*/RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);// 1.第一种方式CuratorFramework client1 = CuratorFrameworkFactory.newClient("192.168.200.130:2181", 60 * 1000, 15 * 1000, retryPolicy);client1.start();// 2.第二种方式CuratorFramework client2 = CuratorFrameworkFactory.builder().connectString("192.168.200.130:2181").sessionTimeoutMs(60 * 1000).connectionTimeoutMs(15 * 1000).retryPolicy(retryPolicy).namespace("dcy") // 名称空间.build();client2.start();
}
- 添加节点
- 如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
- 默认类型:持久化
- creatingParentsIfNeeded 如果父节点不存在,则创建父节点
/**
* 创建节点: create 持久 临时 顺序 数据
*/
@Test
public void testCreate1() throws Exception {// 1.基本创建// 如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储String rs = client2.create().forPath("/app1");System.out.println("rs = " + rs);
}@Test
public void testCreate2() throws Exception {// 2.创建节点 带有数据String rs = client2.create().forPath("/app2", "hehe".getBytes());System.out.println("rs = " + rs);
}@Test
public void testCreate3() throws Exception {// 3.设置节点的类型// 默认类型:持久化String rs = client2.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");System.out.println("rs = " + rs);
}@Test
public void testCreate4() throws Exception {// 4.创建多级节点// creatingParentsIfNeeded 如果父节点不存在,则创建父节点String rs = client2.create().creatingParentsIfNeeded().forPath("/app4/p1");System.out.println("rs = " + rs);
}
- 删除节点
- 1.删除单个节点
delete().forPath()
- 2.删除带有子节点的节点
delete().deletingChildrenIfNeeded().forPath()
- 3.必须成功的删除(为了防止网络抖动,本质是重试)
delete().guaranteed().forPath()
- 4.回调
inBackground
- 1.删除单个节点
/**
* 删除节点: delete, deleteall
*/
@Test
public void testDelete1() throws Exception {// 1.删除单个节点client2.delete().forPath("/app1");
}@Test
public void testDelete2() throws Exception {// 2.删除带有子节点的节点client2.delete().deletingChildrenIfNeeded().forPath("/app4");
}@Test
public void testDelete3() throws Exception {// 3.必须成功的删除 (可能会因为网络抖动等原因,操作超时)client2.delete().guaranteed().forPath("/app2");
}@Test
public void testDelete4() throws Exception {// 4.回调client2.delete().guaranteed().inBackground(new BackgroundCallback() {@Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception {System.out.println("我被删除了~");System.out.println(event);}}).forPath("/app1");
}
- 修改节点
- 1.修改数据
setData().forPath()
- 2.根据版本修改
setData().withVersion().forPath()
- version 是通过查询出来的,目的是为了让其他客户端和线程不干扰我
- 1.修改数据
/**
* 修改数据
*/
@Test
public void testSet() throws Exception {client2.setData().forPath("/app1", "dcy".getBytes());
}@Test
public void testSetForVersion() throws Exception {Stat stat = new Stat();client2.getData().storingStatIn(stat).forPath("/app1");int version = stat.getVersion(); // 查询出来client2.setData().withVersion(version).forPath("/app1", "haha".getBytes());
}
- 查询节点
- 1.查询数据:get:
getData().forPath()
- 2.查询子节点:ls:
getChildren().forPath()
- 3.查询节点状态信息:ls -s:
getData().storingStatIn(状态对象).forPath()
- 1.查询数据:get:
/**
* 查询节点:
* 1.查询数据:get: getData().forPath()
* 2.查询子节点:ls: getChildren().forPath()
* 3.查询节点状态信息:ls -s: getData().storingStatIn(状态对象).forPath()
*/
@Test
public void testGet1() throws Exception {// 1.查询数据:getbyte[] bytes = client2.getData().forPath("/app1");System.out.println(new String(bytes));
}@Test
public void testGet2() throws Exception {// 2.查询子节点:lsList<String> path = client2.getChildren().forPath("/");System.out.println(path);
}@Test
public void testGet3() throws Exception {// 3.查询节点状态信息:ls -sStat status = new Stat();client2.getData().storingStatIn(status).forPath("/app1");System.out.println("status = " + status);
}
b) Watch事件监听
ZooKeeper 允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper 服务端会将事件通知到感兴趣的客户端上去,该机制是 ZooKeeper 实现分布式协调服务的重要特性。
ZooKeeper 中引入了Watcher机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。
ZooKeeper 原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便 需要开发人员自己反复注册Watcher,比较繁琐。
Curator引入了 Cache 来实现对 ZooKeeper 服务端事件的监听。
ZooKeeper提供了三种Watcher:
- NodeCache : 只是监听某一个特定的节点
- PathChildrenCache : 监控一个ZNode的子节点.
- TreeCache : 可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合
1.NodeCache : 监听一个特定的节点
/**
* 演示NodeCache
*/
@Test
public void testNodeCache() throws Exception {// 1.创建NodeCache监听对象final NodeCache nodeCache = new NodeCache(client, "/app1");// 2.注册监听nodeCache.getListenable().addListener(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {System.out.println("节点变化~~~");// 获取修改节点后的数据byte[] data = nodeCache.getCurrentData().getData();System.out.println(new String(data));}});// 3.开启监听,如果设置true,则开启监听是,加载缓存数据nodeCache.start(true);while (true) {}
}
2.PathChildrenCache:监听某个节点的所有子节点
/**
* 演示PathChildrenCache:监听某个节点的所有子节点
*/
@Test
public void testPathChildrenCache () throws Exception {// 1.创建监听对象PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/app2", true);// 2.绑定监听器pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {@Overridepublic void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {System.out.println("子节点变化~~~");System.out.println("event = " + event);// 监听子节点的数据变更,并且拿到变更后的数据// 1.获取类型PathChildrenCacheEvent.Type type = event.getType();// 2.判断类型是否是updateif (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {byte[] data = event.getData().getData();System.out.println(new String(data));}}});// 3.开启监听pathChildrenCache.start();while (true) {}
}
3.TreeCache:监听某个节点自己和所有的子节点
/**
* 演示TreeCache:监听某个节点自己和所有的子节点
*/
@Test
public void testTreeCache () throws Exception {// 1.创建监听器TreeCache treeCache = new TreeCache(client, "/app2");// 2.注册监听treeCache.getListenable().addListener(new TreeCacheListener() {@Overridepublic void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {System.out.println("节点变化了");System.out.println("event = " + event);}});// 3.开启监听treeCache.start();while (true) {}
}
c) 分布式锁
c.1) 介绍
在我们进行单机应用开发,涉及并发同步的时候,我们往往采用synchronized或者Lock的方式来解决多线程间的代码同步问题,这时多线程的运行都是在同一个JVM之下,没有任何问题。
但当我们的应用是分布式集群工作的情况下,属于多JVM下的工作环境,跨JVM之间已经无法通过多线程的锁解决同步问题。
那么就需要一种更加高级的锁机制,来处理种跨机器的进程之间的数据同步问题——这就是分布式锁。
c.2) Zookeeper分布式锁原理
核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除节点
- 1.客户端获取锁时,在lock节点下创建临时顺序节点
- 2.然后获取lock下面的所有子节点,客户端获取到所有的子节点后。如果发现自己创建的节点顺序最小,那就认为该客户端获取到了锁。使用完锁后,将该节点删除
- 3.如果发现自己创建的节点并发lock所有子节点中最小的,说明自己还没有获取到锁。此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件
- 4.如果发现比自己小的那个节点被删除,则客户端的Watcher会收到相应通知,此时再次判断自己创建的节点 是否是lock子节点中序号最小的。如果是则获取到了锁,如果不是则重复以上步骤继续获取到比自己小的一个节点,并注册监听。
c.3) 案例:模拟12306售票
Curator实现分布式锁API
在Curator中有五种锁方案:
- InterProcessSemaphoreMutex:分布式排它锁(非可重入锁)
- InterProcessMutex:分布式可重入排它锁
- InterProcessReadWriteLock:分布式读写锁
- InterProcessMultiLock:将多个锁作为单个实体管理的容器
- InterProcessSemaphoreV2:共享信号量
// 测试类
public class LockTest {public static void main(String[] args) {Ticket12306 ticket12306 = new Ticket12306();// 创建客户端Thread t1 = new Thread(ticket12306, "携程");Thread t2 = new Thread(ticket12306, "飞猪");t1.start();t2.start();}
}// Ticket12306
public class Ticket12306 implements Runnable{private int tickets = 10; // 数据库的票数private InterProcessMutex lock; // 创建锁public Ticket12306() {RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.200.130:2181").sessionTimeoutMs(60 * 1000).connectionTimeoutMs(15 * 1000).retryPolicy(retryPolicy).build();client.start();this.lock = new InterProcessMutex(client, "/lock");}@Overridepublic void run() {while (true) {try {// 获取锁lock.acquire(3, TimeUnit.SECONDS);if (tickets > 0) {System.out.println(Thread.currentThread().getName() + ":" + tickets);tickets--;}} catch (Exception e) {e.printStackTrace();} finally {// 释放锁try {lock.release();} catch (Exception e) {e.printStackTrace();}}}}
}
相关文章:

Zookeeper-JavaApI操作
JavaApI操作 JavaApI操作1) Curator 介绍2) Curator API 常用操作a) 建立连接与CRUD基本操作b) Watch事件监听c) 分布式锁c.1) 介绍c.2) Zookeeper分布式锁原理c.3) 案例:模拟12306售票 JavaApI操作 1) Curator 介绍 Curator 是 Apache ZooKeeper 的Java客户端库。…...
Vue2.0打包指定路由前缀
【1】修改vue.config.js 如下修改publicPath: module.exports {publicPath:/concert,lintOnSave: false }【2】修改router/index.js base指定路由前缀: const router new VueRouter({mode: history,base: /concert, //指定路由前缀// base: process.env.BASE_…...

[vxe-table] 合并行后滚动错位
使用vxe-table的属性:span-method合并行,之后下拉后会错位 原因:缺少配置 scroll-y"{enabled: false}"参考: vxe-table合并行后错位...
动态规划:05不同路径
动态规划:05不同路径 62. 不同路径 五部曲 确定dp数组含义:到达第i,j位置的路径条数为d[i][j] 确定递归公式:d[i][j]d[i-1][j]d[i][j-1] 我们发现,想要到d[i][j],只能从d[i-1][j]或者d[i][j-1]达到 dp数…...

html与css知识点
html 元素分类 块级元素 1.独占一行,宽度为父元素宽度的100% 2.可以设置宽高 常见块级元素 h1~h6 div ul ol li dl dt dd table form header footer section nav article aside 行内元素 1.一行显示多个 2.不能设置宽高,宽高由元素内容撑开 常见行内…...
spring boot simple类型cache使用
注意:这里用的不是 redis 的缓存,simple 的缓存默认用的是 java 的 ConcurrentHashMap, 单纯的 simple 缓存,只需要引入下面的 pom 依赖即可:spring-boot-starter-cache <dependency><groupId>org.springf…...
springboot-aop-redis-lua 实现的分布式限流方案
1.自定义限流注解 Target({ElementType.METHOD, ElementType.TYPE}) Retention(RetentionPolicy.RUNTIME) Inherited Documented public interface Limit {/*** 名字*/String name() default "";/*** key*/String key() default "";/*** Key的前缀*/String…...
C++ realloc()用法及代码示例
C realloc()用法及代码示例 C 中的realloc() 函数重新分配先前分配但尚未释放的内存块。realloc() 函数重新分配先前使用 malloc() 、 calloc() 或 realloc() 函数分配但尚未使用 free() 函数释放的内存。如果新大小为零,则返回的值取决于库的实现。它可能会也可能…...
【Go】gin框架生成压缩包与下载文件
在没有加入下面这串代码之前,下载的压缩包一直为空。遂debug了两个小时。。。 可以在服务端本地创建压缩包。单独将服务端本地的压缩包发送给客户端也是没问题的。但是两个合起来,客户端接收到的压缩包内容就为空了。 期间也尝试了 zipFile.Close() zipW…...

iOS 面试题以及自我理解答案
1、简述push原理,push的证书和其他的有什么不一样? 第 一阶段:BeejiveIM服务器把要发送的消息、目的iPhone的标识打包,发给APNS。 第二阶段:APNS在自身的已注册Push服务 的iPhone列表中,查找有相应标识的iP…...

vue实现自定义滚动条
vue实现自定义滚动条 具体效果如下,这边我用的rem单位,比例是1:40, 先写下页面布局,把原生的滚动条给隐藏掉,给自定义的滑块增加transition: marginLeft 1s linear;可以使左边距过度的更顺滑 .top-box-2::-webkit-scr…...

基于Qt C++的工具箱项目源码,含命令行工具、桌面宠物、文献翻译、文件处理工具、医学图像浏览器、插件市场、设置扩展等工具
一、介绍 1. 基本信息 完整代码下载地址:基于Qt C的工具箱项目源码 TBox是一款基于Qt C的工具箱。用户可以自行选择安装所需的工具(以插件的形式),将TBox打造成专属于自己的效率软件。TBox基本界面展示如下: 2. 使用…...

C# AnimeGANv2 人像动漫化
效果 项目 下载 可执行程序exe下载 源码下载 其他 C# 人像卡通化 Onnx photo2cartoon-CSDN博客...
gateway接口参数加解密
上篇介绍了多种加解密的使用java加密使用 本篇主要介绍在gateway网关中使用对参数解密和返回数据进行加密的操作 原理 下面使用的是AES加密 SHA1withRSA加签 1-用户使用拿到的AES秘钥和RSA私钥。对数据进行加密和加签 2-进行验签和时间的检验 3-将解密的数据返回到具体的调用…...

WorkPlus定制化的局域网会议软件,提供安全稳定的会议体验
在现代商业环境中,迅速而高效的沟通是企业成功的关键要素之一。而在传统的会议模式下,时间成本和地理限制往往给企业带来不小的困扰。针对这一问题,WorkPlus推出了一款创新的局域网会议软件——WorkPlus Meet,旨在为企业创造高效的…...

干货|小白也能自制电子相册赶紧码住~
你是否想拥有一个独一无二的电子相册,却又苦于不知道如何下手?今天教你一个简单的方法,即使你是小白,也能轻松自制电子相册! 一、选择合适的工具 首先,你需要选择一个合适的工具来制作电子相册。有很多工具…...

docker之Harbor私有仓库
目录 一、什么是Harbor 二、Harbor的特性 三、Harbor的构成 1、六个组件 2、七个容器 四、私有镜像仓库的上传与下载 五、部署docker-compose服务 把项目中的镜像数据进行打包持久数据,如镜像,数据库等在宿主机的/data/目录下, 一、什么…...

服务器上部署python脚本
1.查看服务器上的python是否自带,一般都自带 2.将本地脚本上传到服务器 3.直接运行一下脚本看报什么错误 代码错误, 将f删除后报别的错误 上面是未安装依赖的错误。我们安装一下依赖 下面是编码的解决 #!/usr/bin/python # -*- coding: utf-8 -*- 先把…...

【excel技巧】如何在Excel表格中添加选项按钮?
不知道大家是否会9遇到需要勾中选项的情况,我们可以在电子表格中制作出可以勾选、选中的选项按钮,今天我们一起学习一下设置方法。 首先,我们需要先在excel工具栏中添加一个功能模块:开发工具 依次点击excel中的文件 – 选项 –…...

前端 vite+vue3——写一个随机抽奖组件
文章目录 ⭐前言⭐设计布局⭐交互设计⭐整体代码⭐insicode代码 ⭐总结⭐结束 ⭐前言 大家好,我是yma16,本文分享关于前端 vitevue3——写一个抽奖随机组件。 vue3系列相关文章: 前端vue2、vue3去掉url路由“ # ”号——nginx配置 csdn新星计…...

简易版抽奖活动的设计技术方案
1.前言 本技术方案旨在设计一套完整且可靠的抽奖活动逻辑,确保抽奖活动能够公平、公正、公开地进行,同时满足高并发访问、数据安全存储与高效处理等需求,为用户提供流畅的抽奖体验,助力业务顺利开展。本方案将涵盖抽奖活动的整体架构设计、核心流程逻辑、关键功能实现以及…...
k8s从入门到放弃之Ingress七层负载
k8s从入门到放弃之Ingress七层负载 在Kubernetes(简称K8s)中,Ingress是一个API对象,它允许你定义如何从集群外部访问集群内部的服务。Ingress可以提供负载均衡、SSL终结和基于名称的虚拟主机等功能。通过Ingress,你可…...
Admin.Net中的消息通信SignalR解释
定义集线器接口 IOnlineUserHub public interface IOnlineUserHub {/// 在线用户列表Task OnlineUserList(OnlineUserList context);/// 强制下线Task ForceOffline(object context);/// 发布站内消息Task PublicNotice(SysNotice context);/// 接收消息Task ReceiveMessage(…...

PPT|230页| 制造集团企业供应链端到端的数字化解决方案:从需求到结算的全链路业务闭环构建
制造业采购供应链管理是企业运营的核心环节,供应链协同管理在供应链上下游企业之间建立紧密的合作关系,通过信息共享、资源整合、业务协同等方式,实现供应链的全面管理和优化,提高供应链的效率和透明度,降低供应链的成…...
【git】把本地更改提交远程新分支feature_g
创建并切换新分支 git checkout -b feature_g 添加并提交更改 git add . git commit -m “实现图片上传功能” 推送到远程 git push -u origin feature_g...
数据库分批入库
今天在工作中,遇到一个问题,就是分批查询的时候,由于批次过大导致出现了一些问题,一下是问题描述和解决方案: 示例: // 假设已有数据列表 dataList 和 PreparedStatement pstmt int batchSize 1000; // …...
聊一聊接口测试的意义有哪些?
目录 一、隔离性 & 早期测试 二、保障系统集成质量 三、验证业务逻辑的核心层 四、提升测试效率与覆盖度 五、系统稳定性的守护者 六、驱动团队协作与契约管理 七、性能与扩展性的前置评估 八、持续交付的核心支撑 接口测试的意义可以从四个维度展开,首…...

网站指纹识别
网站指纹识别 网站的最基本组成:服务器(操作系统)、中间件(web容器)、脚本语言、数据厍 为什么要了解这些?举个例子:发现了一个文件读取漏洞,我们需要读/etc/passwd,如…...

处理vxe-table 表尾数据是单独一个接口,表格tableData数据更新后,需要点击两下,表尾才是正确的
修改bug思路: 分别把 tabledata 和 表尾相关数据 console.log() 发现 更新数据先后顺序不对 settimeout延迟查询表格接口 ——测试可行 升级↑:async await 等接口返回后再开始下一个接口查询 ________________________________________________________…...
NPOI Excel用OLE对象的形式插入文件附件以及插入图片
static void Main(string[] args) {XlsWithObjData();Console.WriteLine("输出完成"); }static void XlsWithObjData() {// 创建工作簿和单元格,只有HSSFWorkbook,XSSFWorkbook不可以HSSFWorkbook workbook new HSSFWorkbook();HSSFSheet sheet (HSSFSheet)workboo…...