zookeeper案例
目录
案例一:服务器动态上下线
服务端:
(1)先获取zookeeper连接
(2)注册服务器到zookeeper集群:
(3)业务逻辑(睡眠):
服务端代码如下:
客户端:
(1)获取zookeeper的连接:
(2)监听/servers下边的子节点的增减:
客户端代码如下:
案例二:ZooKeeper 分布式锁
分布式锁是什么?
锁的实现:
构造函数:
加锁函数:
解锁函数:
整体代码:
测试类代码 :
Curator 框架实现分布式锁案例:
实现步骤:
代码如下:
该案例主要也是客户端监听原理,客户端监听服务器的上下线情况
先在集群上创建/servers 节点(用于存储连接的服务器的主机和该服务器的节点数)相当于zookeeper集群
案例一:服务器动态上下线
服务端:
(1)先获取zookeeper连接
创建类对象
该类为我们创建的服务端类:
DistributeServer server = new DistributeServer();
获取zookeeper连接:
自己创建连接方法:
private void getconnect() throws IOException {zk = new ZooKeeper(connectstring, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {}});}
让后server对象在main函数中调用
(2)注册服务器到zookeeper集群:
注册是需要注册到zookeeper集群的/servers路径下,需要指定参数进行创建
private void regestServer(String hostname) throws InterruptedException, KeeperException {
zk.create(parentNode+"/"+hostname,hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);// 需要创建有序的临时节点所以-e(暂时) -s(有序)System.out.println("服务器"+hostname+"已注册连接");}
(3)业务逻辑(睡眠):
private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE);}
服务端代码如下:
package com.tangxiaocong.case1;
import org.apache.zookeeper.*;
import java.io.IOException;
/*** @Date 2023/8/10 19:06* @Author */
public class DistributeServer {private static String connectstring="hadoop102:2181,hadoop103:2181,hadoop104:2181";private static int sessionTimeout=2000;private ZooKeeper zk =null;private String parentNode = "/servers";public static void main(String[] args) throws IOException, InterruptedException, KeeperException {//获取zk连接//创建DistributeServer server = new DistributeServer();server.getconnect();//注册服务器到zk集群//注册是需要在/servers节点下创建所开启的服务器的路径server.regestServer(args[0]);//业务逻辑(实际是延时让它睡觉---不然会注册完成就关闭)server.business();}private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE);}private void regestServer(String hostname) throws InterruptedException, KeeperException {
zk.create(parentNode+"/"+hostname,hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);// 需要创建有序的临时节点所以-e(暂时) -s(有序)System.out.println("服务器"+hostname+"已注册连接");}private void getconnect() throws IOException {zk = new ZooKeeper(connectstring, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {}});}
}
客户端:
(1)获取zookeeper的连接:
先创建客户端对象,在进行构建获取zookeeper连接的方法,本方法对process方法进行了重写,填写了再发生上下线的运行逻辑
private void getConnect() throws IOException {zk= new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {try {getServerList();} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}});}
(2)监听/servers下边的子节点的增减:
构建方法client.getServerList()来进行监听:
代码逻辑就是通过getChildren()方法获取指定目录下的所有子目录并开启监听
再进行遍历,把遍历结果封装到一个集合中,最后进行输出
private void getServerList() throws InterruptedException, KeeperException {List<String> children = zk.getChildren("/servers", true);//该方法会获取指定路径下的所有子节点//true 会走初始化中的watch 也可以自己创建watch//把所有的服务器都封装到一个集合ArrayList<String> list = new ArrayList<>();for (String child : children) {byte[] data = zk.getData("/servers" +"/"+ child, false, null);//上边已经便利到一个服务器对象,再进行添加list.add(new String(data));}System.out.println(list);}
(3)业务逻辑同服务端不在赘述。
客户端代码如下:
package com.tangxiaocong.case1;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/*** @Date 2023/8/10 21:27* @Author * 客户端的监听功能*/
public class DistributeClient {
private String connectString="hadoop102:2181,hadoop103:2181,hadoop104:2181";private int sessionTimeout=2000;private ZooKeeper zk=null;public static void main(String[] args) throws IOException, InterruptedException, KeeperException {//获取zk连接DistributeClient client = new DistributeClient();client.getConnect();//监听/servers下边的子节点的增减client.getServerList();//业务逻辑(睡眠)client.business();}private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE);}private void getServerList() throws InterruptedException, KeeperException {List<String> children = zk.getChildren("/servers", true);//该方法会获取指定路径下的所有子节点//true 会走初始化中的watch 也可以自己创建watch//把所有的服务器都封装到一个集合ArrayList<String> list = new ArrayList<>();for (String child : children) {byte[] data = zk.getData("/servers" +"/"+ child, false, null);//上边已经便利到一个服务器对象,再进行添加list.add(new String(data));}System.out.println(list);}private void getConnect() throws IOException {zk= new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {try {getServerList();} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}});}
}
案例二:ZooKeeper 分布式锁
分布式锁是什么?
日常使用计算机的时候,我们的电脑不会只开一个进程,但是当“进程1”在访问某些资源的时候,不能被其他进程所访问,它就会去获得锁,把她所访问的资源进行锁上,对该资源进行独占。"进程 1"用完该资源以后就将锁释放掉,让其 他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的 访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。
锁的实现:
构造函数:
在该类中首先要实现构造方法,构造方法与类名相同,在该方法中需要获取连接,重写process方法,在该方法中实现释放CountDownLatch的类对象,有两种情况,正常连接释放一种,不是正常连接状态,则释放另一种。在构造方法中还要判断是否存在“/locks”路径,存在则正常退出,不存在则创建该路径。
加锁函数:
使用ZooKeeper对象进行创建节点(临时有序),让后获取“/locks”路径下的所有节点序号,对结果进行判断,如果返回的List集合只有一个节点,则直接返回,默认加锁,不用再做监听工作。如果不是只有一个节点,则对List集合进行排序,再获取他的节点名称,通过indexOf函数来获取该名称节点的下标。如果为-1,则数据异常,为0 则为最小节点,则直接退出,进行加锁不需要设置监听,结果为其他则需要设置监听,先设置监听字符串,当状态不发生改变会一致阻塞,只有上锁节点让位后会调用process方法进行释放。
解锁函数:
解锁就是直接删除节点即可
整体代码:
package com.tangxiaocong.case2;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/*** @Date 2023/8/12 19:56* @Author */
public class DistributedLock {final private String connectString="hadoop102:2181,hadoop103:2181,hadoop104:2181";final private int sessionTimeout=2000;final private ZooKeeper zk;private String waitPath;private String currentModu;//为了程序的健壮性,创建该对象 等待操作final private CountDownLatch waitLach=new CountDownLatch(1);final private CountDownLatch countDownLatch=new CountDownLatch(1);public DistributedLock() throws IOException, InterruptedException, KeeperException {//获取连接zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {// connectLatch 如果正常连接zk 可以释放if (watchedEvent.getState()==Event.KeeperState.SyncConnected){countDownLatch.countDown();}//检测到删除节点并且是前一个节点则释放waitlatchif (watchedEvent.getType()==Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)){waitLach.countDown();}}});//等待是否正常连接 正常(已)连接会释放 否则阻塞countDownLatch.await();// 判断是否存在lock锁Stat stat = zk.exists("/locks", false);if (stat==null){//创建该节点String s = zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.CONTAINER);}}//对zk加锁public void zkLock() {//创建临时的带序号的节点try {currentModu = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);List<String> children = zk.getChildren("/locks", false);//如果只有一个节点 则直接获取if(children.size()==1){return;}else {//排序Collections.sort(children);//直接从s后边开始 开始的下标就是length的长度String substring = currentModu.substring("/locks/".length());//通过substring来获取在List集合中的下标位置int index = children.indexOf(substring);if (index==-1){System.out.println("数据异常");}else if (index==0){return;}else {// 需要监听上一个节点waitPath="/locks/"+children.get(index-1);zk.getData(waitPath,true,new Stat());//等待监听waitLach.await();return;}}} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}//判断创建的节点是否是最小序号的节点 如果是则获取锁 不是则监听他的前一个节点}//对zk解锁public void unzkLock(){
//删除节点try {//-1 是版本号zk.delete(this.currentModu,-1);} catch (InterruptedException | KeeperException e) {e.printStackTrace();}}
}
测试类代码 :
package com.tangxiaocong.case2;
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
/*** @Date 2023/8/12 22:31* @Author 唐晓聪*/
public class DistributedLockTest
{public static void main(String[] args) throws IOException, InterruptedException, KeeperException {//创建两个客户端对象final DistributedLock lock1 = new DistributedLock();final DistributedLock lock2 = new DistributedLock();new Thread(new Runnable() {@Overridepublic void run() {try { lock1.zkLock();System.out.println("线程1启动获得锁");Thread.sleep(5*1000);lock1.unzkLock();System.out.println("线程1释放锁");} catch (Exception e) {e.printStackTrace();}}}).start();new Thread(new Runnable() {@Overridepublic void run() {try {lock2.zkLock();System.out.println("线程2启动获得锁");Thread.sleep(5*1000);lock2.unzkLock();System.out.println("线程2释放锁");} catch (Exception e) {e.printStackTrace();}}}).start();}
}
Curator 框架实现分布式锁案例:
该案例是直接使用API进行实现分布式锁
实现步骤:
先创建分布式锁对象,new InterProcessMutex(),参数1为所要连接的客户端,参数2为监听路径
参数1传入的为getCuratorFramework()自定义函数,
该函数通过工厂类的方式进行建立连接,返回创建好的客户端,让后start启动客户端
创建完分布式锁对象后创建两个线程,在线程中进行获得锁,释放锁的操作。
代码如下:
package com.tangxiaocong.case3;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
/*** @Date 2023/8/13 20:07* @Author */
public class CuratorLockTest {public static void main(String[] args) {//创建分布式锁1//参数1 所连接的客户端 参数2 监听路径InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");//创建分布式锁2InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");//创建线程new Thread(new Runnable() {@Overridepublic void run() {try {lock1.acquire();System.out.println("thread 1 acquire lock");lock1.acquire();System.out.println("thread 1 again acquire lock");Thread.sleep(5*1000);lock1.release();System.out.println("thread 1 relax lock");lock1.release();System.out.println("thread 1 again relax lock");System.out.println();} catch (Exception e) {e.printStackTrace();}}}).start();new Thread(new Runnable() {@Overridepublic void run() {try {lock2.acquire();System.out.println("thread 2 acquire lock");lock2.acquire();System.out.println("thread 2 again acquire lock");Thread.sleep(5*1000);lock2.release();System.out.println("thread 2 relax lock");lock2.release();System.out.println("thread 2 again relax lock");} catch (Exception e) {e.printStackTrace();}}}).start();}private static CuratorFramework getCuratorFramework() {ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);//通过工厂类的方式进行建立连接CuratorFramework client = CuratorFrameworkFactory.builder().connectString("hadoop102:2181,hadoop102:2181,hadoop104:2181").connectionTimeoutMs(2000).sessionTimeoutMs(2000).retryPolicy(policy)//连接失败后 间隔多少秒下次间隔.build();client.start();System.out.println("zookeeper success start !!!!!");return client;}
}
相关文章:

zookeeper案例
目录 案例一:服务器动态上下线 服务端: (1)先获取zookeeper连接 (2)注册服务器到zookeeper集群: (3)业务逻辑(睡眠): 服务端代码…...

项目中使用git vscode GitHubDesktopSetup-x64
一、使用git bash 1.使用git bash拉取gitee项目 1.在本地新建一个文件夹(这个文件夹是用来存放从gitee上拉下来的项目的) 2.在这个文件夹右键选择 git bash here 3.输入命令 git init (创建/初始化一个新的仓库) 4.输入命令 git remote add origin …...

【Linux操作系统】文件描述符fd
🔥🔥 欢迎来到小林的博客!! 🛰️博客主页:✈️林 子 🛰️博客专栏:✈️ Linux之路 🛰️社区 :✈️ 进步学堂 …...

【RocketMQ入门-安装部署与Java API测试】
【RocketMQ入门-安装部署与Java API测试】 一、环境说明二、安装部署三、Java API 编写Producer和Consumer进行测试四、小结 一、环境说明 虚拟机VWMare:安装centos7.6操作系统源码包:rocketmq-all-5.1.3-source-release.zip单master部署,在…...

SystemVerilog之覆盖率详解
文章目录 1.0 覆盖率前言1.1 覆盖率类型1.2 覆盖策略及覆盖组1.3 覆盖率数据采样1.3.1 bin的创建与使用1.3.2 条件覆盖率1.3.3 翻转覆盖率1.3.4 wildcard覆盖率1.3.5 忽略bin与非法bin 1.4 交叉覆盖率1.4.1 排除部分cross bin1.4.2 精细化交叉覆盖率1.4.3 单个实例的覆盖率1.4.…...
Qt Designer设计的界面如何显示、即运行显示窗口界面
首先利用Qt Designer设计.ui文件,然后采用Tools->External Tools->PyUIC转换成.py文件。这个.py文件是.ui文件编译而来的,将这种文件由.ui文件编译而来的.py文件称之为界面文件。由于界面文件每次编译时候都会初始化,所以需要新建一个.…...
vue3的setup的使用和原理解析
setup是Vue 3中引入的一个新的组件选项。它是一个在组件实例创建之前被调用的函数,用于设置组件的初始状态、计算属性、方法等。setup函数是Vue 3中函数式组件的核心部分,它提供了一种新的方式来编写组件逻辑。 使用setup函数有以下几个步骤:…...

Spring boot中的线程池-ThreadPoolTaskExecutor
一、jdk的阻塞队列: 二、Spring boot工程的有哪些阻塞队列呢? 1、默认注入的ThreadPoolTaskExecutor 视频解说: 线程池篇-springboot项目中的service层里简单注入ThreadPoolTaskExecutor并且使用_哔哩哔哩_bilibili 程序代码:…...

pgsql checkpoint机制(1)
检查点触发时机 检查点间隔时间由checkpoint_timeout设置pg_xlog中wall段文件总大小超过参数max_WAL_size的值postgresql服务器在smart或fast模式下关闭手动checkpoint 为什么需要检查点? 定期保持修改过的数据块作为实例恢复时起始位置(问题…...

微信小程序 map地图(轨迹)
allMarkers效果图 废话少说直接上马(最后是我遇到的问题) cover-view是气泡弹窗,可以自定义弹窗,要配合js:customCallout,如果是非自定义的话:callout(可以修改颜色、边框宽度、圆角…...
【钉钉接口】bpms_task_change、bpms_instance_change 的区别及举例
bpms_task_change:审批任务回调,是针对审批任务状态的推送。如审批人执行审批、审批人转交审批等针对具体某个审批节点的操作,属于 bpms_task_change 事件类型。bpms_instance_change:审批实例回调,是针对审批实例状态…...

vue左右div结构手动拉伸并且echarts图表根据拉伸宽高自适应
需求: 左右结构的div,可以根据数据抬起按下进行拉伸修改容器宽度的操作给左右结构某一图表设置拉伸自适应左右结构都设置个最小宽度,只能到一定区域内拉伸解决echarts的bug(重复加载chart实例):[ECharts] …...

开发工具Eclipse的使用
🥳🥳Welcome Huihuis Code World ! !🥳🥳 接下来看看由辉辉所写的关于Eclipse使用的相关操作吧 目录 🥳🥳Welcome Huihuis Code World ! !🥳🥳 一.Eclipse是什么 二.使用Eclipse的…...
DrawerLayout布局使用教程Android侧边栏导航完全指南:创建简单实用的导航抽屉
导航抽屉(侧边栏)在现代移动应用中扮演着关键角色,提供了流畅的用户导航体验。本文将带您从头开始,逐步创建一个基本的 Android 侧边栏导航示例,为您的应用增添更多交互魅力。 1. 创建新的 Android 项目 首先&#x…...

Dynamics 365 实体快速创建功能启用
这里我会先用例子讲快速创建,包含了字段创建等内容。希望直接了解配置过程的,可以根据目目录跳转查看。 1 例子 我们这里创建了两个实体,学生和选择的科目。它们的关系是一个学生可以选择多个科目,即学生和科目选择是一对多关系。所以我们在选择的科目中创建了一个学生的…...

Mybatis三剑客(一)在springboot中自动生成Mybatis【generator】
1、pom.xml中新增plugin <plugin><groupId>org.mybatis.generator</groupId><artifactId>mybatis-generator-maven-plugin</artifactId><version>1.3.7</version><configuration><overwrite>true</overwrite><…...
【LeetCode 热题 100】图论 专题(bfs,拓扑排序,Trie树 字典树)
from: https://leetcode.cn/studyplan/top-100-liked/ bfs 具有 边权为1 的最短路性质 拓扑排序,入度 Trie树, 高效存储 字符串【见鬼,不知道为什么写错,需要掌握熟练度】 文章目录 200. 岛屿数量【dfs / bfs】994. 腐…...

Jmeter压测实战:Jmeter二次开发之自定义函数
目录 1 前言 2 开发准备 3 自定义函数核心实现 3.1 新建项目 3.2 继承实现AbstractFunction类 3.3 最终项目结构 4 Jmeter加载扩展包 4.1 maven构建配置 4.2 项目打包 4.3 Jmeter加载扩展包 5 自定义函数调用调试 5.1 打开Jmeter函数助手,选择自定义函数…...
在python中使用nvidia的VPF库对RTSP流进行硬解码并使用opencv进行显示
解码并处理视频流的多线程应用 随着视频处理技术的不断发展,越来越多的应用需要对视频流进行解码和处理。在本文中,我们将介绍一个基于Python的多线程应用程序,该应用程序可以解码并处理多个RTSP视频流,同时利用GPU加速࿰…...
C++中using namespace std的作用记录
using namespace std;这句代码的作用是引入std命名空间,使得程序可以直接使用std命名空间下的标识符,而不需要加上std::前缀。 在C中,标识符被组织在不同的命名空间中,以避免命名冲突。最常见的命名空间是std,它包含了C标准库中的所有标识符,如cout、vector、string等。 默认…...
【Go】3、Go语言进阶与依赖管理
前言 本系列文章参考自稀土掘金上的 【字节内部课】公开课,做自我学习总结整理。 Go语言并发编程 Go语言原生支持并发编程,它的核心机制是 Goroutine 协程、Channel 通道,并基于CSP(Communicating Sequential Processes࿰…...
在Ubuntu中设置开机自动运行(sudo)指令的指南
在Ubuntu系统中,有时需要在系统启动时自动执行某些命令,特别是需要 sudo权限的指令。为了实现这一功能,可以使用多种方法,包括编写Systemd服务、配置 rc.local文件或使用 cron任务计划。本文将详细介绍这些方法,并提供…...
Robots.txt 文件
什么是robots.txt? robots.txt 是一个位于网站根目录下的文本文件(如:https://example.com/robots.txt),它用于指导网络爬虫(如搜索引擎的蜘蛛程序)如何抓取该网站的内容。这个文件遵循 Robots…...

IoT/HCIP实验-3/LiteOS操作系统内核实验(任务、内存、信号量、CMSIS..)
文章目录 概述HelloWorld 工程C/C配置编译器主配置Makefile脚本烧录器主配置运行结果程序调用栈 任务管理实验实验结果osal 系统适配层osal_task_create 其他实验实验源码内存管理实验互斥锁实验信号量实验 CMISIS接口实验还是得JlINKCMSIS 简介LiteOS->CMSIS任务间消息交互…...

多模态大语言模型arxiv论文略读(108)
CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文标题:CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文作者:Sayna Ebrahimi, Sercan O. Arik, Tejas Nama, Tomas Pfister ➡️ 研究机构: Google Cloud AI Re…...

均衡后的SNRSINR
本文主要摘自参考文献中的前两篇,相关文献中经常会出现MIMO检测后的SINR不过一直没有找到相关数学推到过程,其中文献[1]中给出了相关原理在此仅做记录。 1. 系统模型 复信道模型 n t n_t nt 根发送天线, n r n_r nr 根接收天线的 MIMO 系…...
Android第十三次面试总结(四大 组件基础)
Activity生命周期和四大启动模式详解 一、Activity 生命周期 Activity 的生命周期由一系列回调方法组成,用于管理其创建、可见性、焦点和销毁过程。以下是核心方法及其调用时机: onCreate() 调用时机:Activity 首次创建时调用。…...

使用Spring AI和MCP协议构建图片搜索服务
目录 使用Spring AI和MCP协议构建图片搜索服务 引言 技术栈概览 项目架构设计 架构图 服务端开发 1. 创建Spring Boot项目 2. 实现图片搜索工具 3. 配置传输模式 Stdio模式(本地调用) SSE模式(远程调用) 4. 注册工具提…...
CRMEB 中 PHP 短信扩展开发:涵盖一号通、阿里云、腾讯云、创蓝
目前已有一号通短信、阿里云短信、腾讯云短信扩展 扩展入口文件 文件目录 crmeb\services\sms\Sms.php 默认驱动类型为:一号通 namespace crmeb\services\sms;use crmeb\basic\BaseManager; use crmeb\services\AccessTokenServeService; use crmeb\services\sms\…...

在 Spring Boot 中使用 JSP
jsp? 好多年没用了。重新整一下 还费了点时间,记录一下。 项目结构: pom: <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://ww…...