当前位置: 首页 > article >正文

zk基础—5.Curator的使用与剖析二

大纲

1.基于Curator进行基本的zk数据操作

2.基于Curator实现集群元数据管理

3.基于Curator实现HA主备自动切换

4.基于Curator实现Leader选举

5.基于Curator实现分布式Barrier

6.基于Curator实现分布式计数器

7.基于Curator实现zk的节点和子节点监听机制

8.基于Curator创建客户端实例的源码分析

9.Curator在启动时是如何跟zk建立连接的

10.基于Curator进行增删改查节点的源码分析

11.基于Curator的节点监听回调机制的实现源码

12.基于Curator的Leader选举机制的实现源码

11.Curator节点监听回调机制的实现源码

(1)PathCache子节点监听机制的实现源码

(2)NodeCache节点监听机制的实现源码

(3)getChildren()方法对子节点注册监听器和后台异步回调说明

(4)PathCache实现自动重复注册监听器的效果

(5)NodeCache实现节点变化事件监听的效果

(1)PathCache子节点监听机制的实现源码

PathChildrenCache会调用原生zk客户端对象的getChildren()方法,并往该方法传入一个监听器childrenWatcher。当子节点发生事件,就会通知childrenWatcher这个原生的Watcher,然后该Watcher便会调用注册到PathChildrenCache的Listener。注意:在传入的监听器Watcher中会实现重复注册Watcher。

public class Demo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",//zk的地址5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开3000,//连接zk时的超时时间retryPolicy);client.start();System.out.println("已经启动Curator客户端");//PathCache,监听/cluster下的子节点变化PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/cluster", true);pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {...}});pathChildrenCache.start();}
}public class PathChildrenCache implements Closeable {private final WatcherRemoveCuratorFramework client;private final String path;private final boolean cacheData;private final boolean dataIsCompressed;private final CloseableExecutorService executorService;private final ListenerContainer<PathChildrenCacheListener> listeners = new ListenerContainer<PathChildrenCacheListener>();...//初始化public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService) {this.client = client.newWatcherRemoveCuratorFramework();this.path = PathUtils.validatePath(path);this.cacheData = cacheData;this.dataIsCompressed = dataIsCompressed;this.executorService = executorService;ensureContainers = new EnsureContainers(client, path);}//获取用来存放Listener的容器listenerspublic ListenerContainer<PathChildrenCacheListener> getListenable() {return listeners;}//启动对子节点的监听public void start() throws Exception {start(StartMode.NORMAL);}private volatile ConnectionStateListener connectionStateListener = new ConnectionStateListener() {@Overridepublic void stateChanged(CuratorFramework client, ConnectionState newState) {//处理连接状态的变化handleStateChange(newState);}};public void start(StartMode mode) throws Exception {...//对建立的zk连接添加Listenerclient.getConnectionStateListenable().addListener(connectionStateListener);...//把PathChildrenCache自己传入RefreshOperation中//下面的代码其实就是调用PathChildrenCache的refresh()方法offerOperation(new RefreshOperation(this, RefreshMode.STANDARD));...}//提交一个任务到线程池进行处理void offerOperation(final Operation operation) {if (operationsQuantizer.add(operation)) {submitToExecutor(new Runnable() {@Overridepublic void run() {...operationsQuantizer.remove(operation);//其实就是调用PathChildrenCache的refresh()方法operation.invoke();...}});}}private synchronized void submitToExecutor(final Runnable command) {if (state.get() == State.STARTED) {//提交一个任务到线程池进行处理executorService.submit(command);}}...
}class RefreshOperation implements Operation {private final PathChildrenCache cache;private final PathChildrenCache.RefreshMode mode;RefreshOperation(PathChildrenCache cache, PathChildrenCache.RefreshMode mode) {this.cache = cache;this.mode = mode;}@Overridepublic void invoke() throws Exception {//调用PathChildrenCache的refresh方法,也就是发起对子节点的监听cache.refresh(mode);}...
}public class PathChildrenCache implements Closeable {...private volatile Watcher childrenWatcher = new Watcher() {//重复注册监听器//当子节点发生变化事件时,该方法就会被触发调用@Overridepublic void process(WatchedEvent event) {//下面的代码其实依然是调用PathChildrenCache的refresh()方法offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.STANDARD));}};void refresh(final RefreshMode mode) throws Exception {ensurePath();//创建一个回调,在下面执行client.getChildren()成功时会触发执行该回调final BackgroundCallback callback = new BackgroundCallback() {@Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception {if (reRemoveWatchersOnBackgroundClosed()) {return;}if (event.getResultCode() == KeeperException.Code.OK.intValue()) {//处理子节点数据processChildren(event.getChildren(), mode);} else if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) {if (mode == RefreshMode.NO_NODE_EXCEPTION) {log.debug("KeeperException.NoNodeException received for getChildren() and refresh has failed. Resetting ensureContainers but not refreshing. Path: [{}]", path);ensureContainers.reset();} else {log.debug("KeeperException.NoNodeException received for getChildren(). Resetting ensureContainers. Path: [{}]", path);ensureContainers.reset();offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.NO_NODE_EXCEPTION));}}}};//下面的代码最后会调用到原生zk客户端的getChildren方法发起对子节点的监听//并且添加一个叫childrenWatcher的监听,一个叫callback的后台异步回调client.getChildren().usingWatcher(childrenWatcher).inBackground(callback).forPath(path);}...
}//子节点发生变化事件时,最后都会触发执行EventOperation的invoke()方法
class EventOperation implements Operation {private final PathChildrenCache cache;private final PathChildrenCacheEvent event;EventOperation(PathChildrenCache cache, PathChildrenCacheEvent event) {this.cache = cache;this.event = event;}@Overridepublic void invoke() {//调用PathChildrenCache的Listenercache.callListeners(event);}...
}

(2)NodeCache节点监听机制的实现源码

NodeCache会调用原生zk客户端对象的exists()方法,并往该方法传入一个监听器watcher。当子节点发生事件,就会通知watcher这个原生的Watcher,然后该Watcher便会调用注册到NodeCache的Listener。注意:在传入的监听器Watcher中会实现重复注册Watcher。

public class Demo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",//zk的地址5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开3000,//连接zk时的超时时间retryPolicy);client.start();System.out.println("已经启动Curator客户端");//NodeCachefinal NodeCache nodeCache = new NodeCache(client, "/cluster");nodeCache.getListenable().addListener(new NodeCacheListener() {public void nodeChanged() throws Exception {Stat stat = client.checkExists().forPath("/cluster");if (stat == null) {} else {nodeCache.getCurrentData();}}});nodeCache.start();}
}public class NodeCache implements Closeable {private final WatcherRemoveCuratorFramework client;private final String path;private final ListenerContainer<NodeCacheListener> listeners = new ListenerContainer<NodeCacheListener>();...private ConnectionStateListener connectionStateListener = new ConnectionStateListener() {@Overridepublic void stateChanged(CuratorFramework client, ConnectionState newState) {if ((newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED)) {if (isConnected.compareAndSet(false, true)) {reset();}} else {isConnected.set(false);}}};//初始化一个Watcher,作为监听器添加到下面reset()方法执行的client.checkExists()方法中private Watcher watcher = new Watcher() {//重复注册监听器@Overridepublic void process(WatchedEvent event) {reset();}};//初始化一个回调,在下面reset()方法执行client.checkExists()成功时会触发执行该回调private final BackgroundCallback backgroundCallback = new BackgroundCallback() {@Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception {processBackgroundResult(event);}};//初始化NodeCachepublic NodeCache(CuratorFramework client, String path, boolean dataIsCompressed) {this.client = client.newWatcherRemoveCuratorFramework();this.path = PathUtils.validatePath(path);this.dataIsCompressed = dataIsCompressed;}//获取存放Listener的容器ListenerContainerpublic ListenerContainer<NodeCacheListener> getListenable() {Preconditions.checkState(state.get() != State.CLOSED, "Closed");return listeners;}//启动对节点的监听public void start() throws Exception {start(false);}public void start(boolean buildInitial) throws Exception {Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");//对建立的zk连接添加Listenerclient.getConnectionStateListenable().addListener(connectionStateListener);if (buildInitial) {//调用原生的zk客户端的exists()方法,对节点进行监听client.checkExists().creatingParentContainersIfNeeded().forPath(path);internalRebuild();}reset();}private void reset() throws Exception {if ((state.get() == State.STARTED) && isConnected.get()) {//下面的代码最后会调用原生的zk客户端的exists()方法,对节点进行监听//并且添加一个叫watcher的监听,一个叫backgroundCallback的后台异步回调client.checkExists().creatingParentContainersIfNeeded().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);}}private void processBackgroundResult(CuratorEvent event) throws Exception {switch (event.getType()) {case GET_DATA: {if (event.getResultCode() == KeeperException.Code.OK.intValue()) {ChildData childData = new ChildData(path, event.getStat(), event.getData());setNewData(childData);}break;}case EXISTS: {if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) {setNewData(null);} else if (event.getResultCode() == KeeperException.Code.OK.intValue()) {if (dataIsCompressed) {client.getData().decompressed().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);} else {client.getData().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);}}break;}}}...
}

(3)getChildren()方法对子节点注册监听器和后台异步回调说明

getChildren()方法注册的Watcher只有一次性,其注册的回调是一个异步回调。

public class Demo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",//zk的地址5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开3000,//连接zk时的超时时间retryPolicy);client.start();System.out.println("已经启动Curator客户端,完成zk的连接");client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/test", "10".getBytes());System.out.println("创建节点'/test");client.getChildren().usingWatcher(new CuratorWatcher() {public void process(WatchedEvent event) throws Exception {//只要通知过一次zk节点的变化,这里就不会再被通知了//也就是第一次的通知才有效,这里被执行过一次后,就不会再被执行System.out.println("收到一个zk的通知: " + event);}}).inBackground(new BackgroundCallback() {//后台回调通知,表示会让zk.getChildren()在后台异步执行//后台异步执行client.getChildren()方法完毕,便会回调这个方法进行通知public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {System.out.println("收到一个后台回调通知: " + event);}}).forPath("/test");}
}

(4)PathCache实现自动重复注册监听器的效果

每当节点发生变化时,就会触发childEvent()方法的调用。

public class Demo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);final CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",//zk的地址5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开3000,//连接zk时的超时时间retryPolicy);client.start();System.out.println("已经启动Curator客户端,完成zk的连接");final PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/test", true);pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {//只要子节点发生变化,无论变化多少次,每次变化都会触发这里childEvent的调用public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {System.out.println("监听的子节点发生变化,收到了事件通知:" + pathChildrenCacheEvent);}});pathChildrenCache.start();System.out.println("完成子节点的监听和启动");}
}

(5)NodeCache实现节点变化事件监听的效果

每当节点发生变化时,就会触发nodeChanged()方法的调用。

public class Demo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);final CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",//zk的地址5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开3000,//连接zk时的超时时间retryPolicy);client.start();System.out.println("已经启动Curator客户端,完成zk的连接");final NodeCache nodeCache = new NodeCache(client, "/test/child/id");nodeCache.getListenable().addListener(new NodeCacheListener() {//只要节点发生变化,无论变化多少次,每次变化都会触发这里nodeChanged的调用public void nodeChanged() throws Exception {Stat stat = client.checkExists().forPath("/test/child/id");if (stat != null) {byte[] dataBytes = client.getData().forPath("/test/child/id");System.out.println("节点数据发生了变化:" + new String(dataBytes));} else {System.out.println("节点被删除");}}});nodeCache.start();}
}

12.基于Curator的Leader选举机制的实现源码

(1)第一种Leader选举机制LeaderLatch的源码

(2)第二种Leader选举机制LeaderSelector的源码

利用Curator的CRUD+ 监听回调机制,就能满足大部分系统使用zk的场景了。需要注意的是:如果使用原生的zk去注册监听器来监听节点或者子节点,当节点或子节点发生了对应的事件,会通知客户端一次,但是下一次再有对应的事件就不会通知了。使用zk原生的API时,客户端需要每次收到事件通知后,重新注册监听器。然而Curator的PathCache + NodeCache,会自动重新注册监听器。

(1)第一种Leader选举机制LeaderLatch的源码

Curator客户端会通过创建临时顺序节点的方式来竞争成为Leader的,LeaderLatch这种Leader选举的实现方式与分布式锁的实现几乎一样。

每个Curator客户端创建完临时顺序节点后,就会对/leader/latch目录调用getChildren()方法来获取里面所有的子节点,调用getChildren()方法的结果会通过backgroundCallback回调进行通知,接着客户端便对获取到的子节点进行排序来判断自己是否是第一个子节点。

如果客户端发现自己是第一个子节点,那么就是Leader。如果客户端发现自己不是第一个子节点,就对上一个节点添加一个监听器。在添加监听器时,会使用getData()方法获取自己的上一个节点,getData()方法执行成功后会调用backgrondCallback回调。

当上一个节点对应的客户端释放了Leader角色,上一个节点就会消失,此时就会通知第二个节点对应的客户端,执行getData()方法添加的监听器。

所以如果getData()方法的监听器被触发了,即发现上一个节点不存在了,客户端会调用getChildren()方法重新获取子节点列表,判断是否是Leader。

注意:使用getData()代替exists(),可以避免不必要的Watcher造成的资源泄露。

public class Demo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);final CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",//zk的地址5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开3000,//连接zk时的超时时间retryPolicy);client.getConnectionStateListenable().addListener(new ConnectionStateListener() {public void stateChanged(CuratorFramework client, ConnectionState newState) {switch (newState) {case LOST://当Leader与zk断开时,需要暂停当前Leader的工作}}});client.start();System.out.println("已经启动Curator客户端,完成zk的连接");LeaderLatch leaderLatch = new LeaderLatch(client, "/leader/latch");leaderLatch.start();leaderLatch.await();//阻塞等待直到当前客户端成为LeaderBoolean hasLeaderShip = leaderLatch.hasLeadership();System.out.println("是否成为Leader: " + hasLeaderShip);}
}public class LeaderLatch implements Closeable {private final WatcherRemoveCuratorFramework client;private final ConnectionStateListener listener = new ConnectionStateListener() {@Overridepublic void stateChanged(CuratorFramework client, ConnectionState newState) {handleStateChange(newState);}};...//Add this instance to the leadership election and attempt to acquire leadership.public void start() throws Exception {...//对建立的zk连接添加Listenerclient.getConnectionStateListenable().addListener(listener);reset();...}@VisibleForTestingvoid reset() throws Exception {setLeadership(false);setNode(null);//callback作为成功创建临时顺序节点后的回调BackgroundCallback callback = new BackgroundCallback() {@Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception {...if (event.getResultCode() == KeeperException.Code.OK.intValue()) {setNode(event.getName());if (state.get() == State.CLOSED) {setNode(null);} else {//成功创建临时顺序节点,需要通过getChildren()再去获取子节点列表getChildren();}} else {log.error("getChildren() failed. rc = " + event.getResultCode());}}};//创建临时顺序节点client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));}//获取子节点列表private void getChildren() throws Exception {//callback作为成功获取子节点列表后的回调BackgroundCallback callback = new BackgroundCallback() {@Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception {if (event.getResultCode() == KeeperException.Code.OK.intValue()) {checkLeadership(event.getChildren());}}};client.getChildren().inBackground(callback).forPath(ZKPaths.makePath(latchPath, null));}//检查自己是否是第一个节点private void checkLeadership(List<String> children) throws Exception {if (debugCheckLeaderShipLatch != null) {debugCheckLeaderShipLatch.await();}final String localOurPath = ourPath.get();//对获取到的节点进行排序List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;if (ourIndex < 0) {log.error("Can't find our node. Resetting. Index: " + ourIndex);reset();} else if (ourIndex == 0) {//如果自己是第一个节点,则标记自己为LeadersetLeadership(true);} else {//如果自己不是第一个节点,则对前一个节点添加监听String watchPath = sortedChildren.get(ourIndex - 1);Watcher watcher = new Watcher() {@Overridepublic void process(WatchedEvent event) {if ((state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null)) {//重新获取子节点列表getChildren();}}};BackgroundCallback callback = new BackgroundCallback() {@Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception {if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) {reset();}}};//use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak//使用getData()代替exists(),可以避免不必要的Watcher造成的资源泄露client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));}}...//阻塞等待直到成为Leaderpublic void await() throws InterruptedException, EOFException {synchronized(this) {while ((state.get() == State.STARTED) && !hasLeadership.get()) {wait();//Objetc对象的wait()方法,阻塞等待}}if (state.get() != State.STARTED) {throw new EOFException();}}//设置当前客户端成为Leader,并进行notifyAll()通知之前阻塞的线程private synchronized void setLeadership(boolean newValue) {boolean oldValue = hasLeadership.getAndSet(newValue);if (oldValue && !newValue) { // Lost leadership, was true, now falselisteners.forEach(new Function<LeaderLatchListener, Void>() {@Overridepublic Void apply(LeaderLatchListener listener) {listener.notLeader();return null;}});} else if (!oldValue && newValue) { // Gained leadership, was false, now truelisteners.forEach(new Function<LeaderLatchListener, Void>() {@Overridepublic Void apply(LeaderLatchListener input) {input.isLeader();return null;}});}notifyAll();//唤醒之前执行了wait()方法的线程}
}

(2)第二种Leader选举机制LeaderSelector的源码

通过判断是否成功获取到分布式锁,来判断是否竞争成为Leader。正因为是通过持有分布式锁来成为Leader,所以LeaderSelector.takeLeadership()方法不能退出,否则就会释放锁。而一旦释放了锁,其他客户端就会竞争锁成功而成为新的Leader。

public class Demo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);final CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",//zk的地址5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开3000,//连接zk时的超时时间retryPolicy);client.start();System.out.println("已经启动Curator客户端,完成zk的连接");LeaderSelector leaderSelector = new LeaderSelector(client,"/leader/election",new LeaderSelectorListener() {public void takeLeadership(CuratorFramework curatorFramework) throws Exception {System.out.println("你已经成为了Leader......");//在这里干Leader所有的事情,此时方法不能退出Thread.sleep(Integer.MAX_VALUE);}public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {System.out.println("连接状态的变化,已经不是Leader......");if (connectionState.equals(ConnectionState.LOST)) {throw new CancelLeadershipException();}}});leaderSelector.start();//尝试和其他节点在节点"/leader/election"上进行竞争成为LeaderThread.sleep(Integer.MAX_VALUE);}
}public class LeaderSelector implements Closeable {private final CuratorFramework client;private final LeaderSelectorListener listener;private final CloseableExecutorService executorService;private final InterProcessMutex mutex;...public LeaderSelector(CuratorFramework client, String leaderPath, CloseableExecutorService executorService, LeaderSelectorListener listener) {Preconditions.checkNotNull(client, "client cannot be null");PathUtils.validatePath(leaderPath);Preconditions.checkNotNull(listener, "listener cannot be null");this.client = client;this.listener = new WrappedListener(this, listener);hasLeadership = false;this.executorService = executorService;//初始化一个分布式锁mutex = new InterProcessMutex(client, leaderPath) {@Overrideprotected byte[] getLockNodeBytes() {return (id.length() > 0) ? getIdBytes(id) : null;}};}public void start() {Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");Preconditions.checkState(!executorService.isShutdown(), "Already started");Preconditions.checkState(!hasLeadership, "Already has leadership");client.getConnectionStateListenable().addListener(listener);requeue();}public boolean requeue() {Preconditions.checkState(state.get() == State.STARTED, "close() has already been called");return internalRequeue();}private synchronized boolean internalRequeue() {if (!isQueued && (state.get() == State.STARTED)) {isQueued = true;//将选举的工作作为一个任务交给线程池执行Future<Void> task = executorService.submit(new Callable<Void>() {@Overridepublic Void call() throws Exception {...doWorkLoop();...return null;}});ourTask.set(task);return true;}return false;}private void doWorkLoop() throws Exception {...doWork();...}@VisibleForTestingvoid doWork() throws Exception {hasLeadership = false;try {//尝试获取一把分布式锁,获取失败会进行阻塞mutex.acquire();//执行到这一行代码,说明获取分布式锁成功hasLeadership = true;try {if (debugLeadershipLatch != null) {debugLeadershipLatch.countDown();}if (debugLeadershipWaitLatch != null) {debugLeadershipWaitLatch.await();}//回调用户重写的takeLeadership()方法listener.takeLeadership(client);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw e;} catch (Throwable e) {ThreadUtils.checkInterrupted(e);} finally {clearIsQueued();}} catch (InterruptedException e) {Thread.currentThread().interrupt();throw e;} finally {if (hasLeadership) {hasLeadership = false;boolean wasInterrupted = Thread.interrupted();  // clear any interrupted tatus so that mutex.release() works immediatelytry {//释放锁mutex.release();} catch (Exception e) {if (failedMutexReleaseCount != null) {failedMutexReleaseCount.incrementAndGet();}ThreadUtils.checkInterrupted(e);log.error("The leader threw an exception", e);} finally {if (wasInterrupted) {Thread.currentThread().interrupt();}}}}}...
}

相关文章:

zk基础—5.Curator的使用与剖析二

大纲 1.基于Curator进行基本的zk数据操作 2.基于Curator实现集群元数据管理 3.基于Curator实现HA主备自动切换 4.基于Curator实现Leader选举 5.基于Curator实现分布式Barrier 6.基于Curator实现分布式计数器 7.基于Curator实现zk的节点和子节点监听机制 8.基于Curator创…...

前端布局难题:父元素padding导致子元素无法全屏?3种解决方案

大家好&#xff0c;我是一诺。今天要跟大家分享一个我在实际项目中经常用到的CSS技巧——如何让子元素突破父元素的padding限制&#xff0c;实现真正的全屏宽度效果。 为什么会有这个需求&#xff1f; 记得我刚入行的时候&#xff0c;接到一个需求&#xff1a;要在内容区插入…...

Android使用OpenGL和MediaCodec录制

目录 一,什么是opengl 二,什么是Android OpenGL ES 三, OpenGL 绘制流程 四, OpenGL坐标系 五, OpenGL 着色器 六, GLSL编程语言 七,使用MediaCodec录制在Opengl中渲染架构 八,代码实现 8.1 自定义渲染view继承GLSurfaceView 8.2 自定义渲染器TigerRender 8.3 创建编…...

《如何避免虚无》速读笔记

文章目录 书籍信息概览躺派&#xff08;出世&#xff09;卷派&#xff08;入世&#xff09;虚无篇&#xff1a;直面虚无自我篇&#xff1a;认识自我孤独篇&#xff1a;应对孤独幸福篇&#xff1a;追寻幸福超越篇&#xff1a;超越自我 书籍信息 书名&#xff1a;《如何避免虚无…...

哈尔滨工业大学:大模型时代的具身智能

大家好&#xff0c;我是樱木。 机器人在工业领域&#xff0c;已经逐渐成熟。具身容易&#xff0c;智能难。 机器人-》智能机器人&#xff0c;需要自主能力&#xff0c;加上通用能力。 智能机器人-》人类&#xff0c;这个阶段就太有想象空间了。而最受关注的-类人机器人。 如何…...

19.go日志包log

核心功能与接口 基础日志输出 Print 系列&#xff1a;支持 Print()、Println()、Printf()&#xff0c;输出日志不中断程序。 log.Print("常规日志") // 输出: 2025/03/18 14:47:13 常规日志 log.Printf("格式化: %s", "数据") Fatal…...

理解OSPF 特殊区域NSSA和各类LSA特点

本文基于上文 理解OSPF Stub区域和各类LSA特点 在理解了Stub区域之后&#xff0c;我们再来理解一下NSSA区域&#xff0c;NSSA区域用于需要引入少量外部路由&#xff0c;同时又需要保持Stub区域特性的情况 一、 网络总拓扑图 我们在R1上配置黑洞路由&#xff0c;来模拟NSSA区域…...

如何通过优化HMI设计大幅提升产品竞争力?

一、HMI设计的重要性与竞争力提升 HMI&#xff08;人机交互界面&#xff09;设计在现代产品开发中扮演着至关重要的角色。良好的HMI设计不仅能够提升用户体验&#xff0c;还能显著增强产品的竞争力。在功能趋同的市场环境中&#xff0c;用户体验成为产品竞争的关键。HMI设计通…...

Linux信号——信号的处理(3)

信号是什么时候被处理&#xff1f; 进程从内核态&#xff0c;切换到用户态的时候&#xff0c;信号会被检测处理。 内核态&#xff1a;操作系统的状态&#xff0c;权限级别高 用户态&#xff1a;你自己的状态 内核态和用户态 进程地址空间第三次 所谓的系统调用本质其实是一堆…...

Pod的调度

在默认情况下&#xff0c;一个Pod在哪个Node节点上运行&#xff0c;是由Scheduler组件采用相应的算法计算出来的&#xff0c;这个过程是不受人工控制的。但是在实际使用中&#xff0c;这并不满足的需求&#xff0c;因为很多情况下&#xff0c;我们想控制某些Pod到达某些节点上&…...

LabVIEW面向对象编程设计方法

一、概述 面向对象编程&#xff08;OOP&#xff09;在软件开发中占据重要地位&#xff0c;尤其是在大规模软件项目中。它与小型程序开发思路不同&#xff0c;更注重未来功能的升级与扩展。在设计阶段&#xff0c;需思考如何构建既灵活又稳定的系统&#xff0c;这涉及众多设计方…...

Spring常见问题复习

############Spring############# Bean的生命周期是什么&#xff1f; BeanFactory和FactoryBean的区别&#xff1f; ApplicationContext和BeanFactory的区别&#xff1f; BeanFactoryAware注解&#xff0c;还有什么其它的Aware注解 BeanFactoryAware方法和Bean注解的方法执行顺…...

JJJ:generic netlink例程分析

接嵌入式毕设、课设辅导、技术咨询&#xff0c;欢迎私信 完整代码&#xff1a;github代码仓链接 若想要和指定的generic netlink family通信&#xff0c;如: 994 static struct genl_family genl_ctrl __ro_after_init { // generic netlink子协议995 .module THIS_MODU…...

Dify票据识别遇到的分支判断不准确问题

已测试这篇文章中 https://zhuanlan.zhihu.com/p/5465385787 使用多分支条件判断使用不同的大模型识别图片内容 发现了细节问题。在使用时若不注意&#xff0c;分支会出现走向不准的问题。 需要关注部分 下方红框处。1&#xff0c;2后不能跟点。否则会出问。除此之外&#xff0…...

《全栈+双客户端Turnkey方案》架构设计图

今天分享一些全栈双客户端Turnkey方案的架构与结构图。 1&#xff1a;三种分布式部署方案:网关方案&#xff0c;超级服务器单服方案&#xff0c;直连逻辑服方案 2: 单服多线程核心架构: 系统服务逻辑服服务 3: 系统服务的多线程池调度设计 4:LogicServer Update与ECS架构&…...

某碰瓷国赛美赛,号称第三赛事的数模竞赛

首先我非常不能理解的就是怎么好意思自称第三赛事的呢&#xff1f;下面我们进行一个简单讨论&#xff0c;当然这里不对国赛和美赛进行讨论。首先我们来明确一点&#xff0c;比赛的含金量由什么来定&#xff1f;这个可能大家的评价指标可能不唯一&#xff0c;我通过DeepSeek选取…...

【代码模板】如何用FILE操作符打开文件?fopen、fclose

#include "stdio.h" #include "unistd.h"int main(int argc, char *argv[]) {FILE *fp fopen("1.log", "wb");if (!fp) {perror("Failed open 1.log");return -1;}fclose(fp); }关于权限部分参考兄弟篇【代码模板】C语言中…...

【大模型深度学习】如何估算大模型需要的显存

一、模型参数量 参数量的单位 参数量指的是模型中所有权重和偏置的数量总和。在大模型中&#xff0c;参数量的单位通常以“百万”&#xff08;M&#xff09;或“亿”&#xff08;B&#xff0c;也常说十亿&#xff09;来表示。 百万&#xff08;M&#xff09;&#xff1a;表示…...

Mysql 数据库编程技术01

一、数据库基础 1.1 认识数据库 为什么学习数据库 瞬时数据&#xff1a;比如内存中的数据&#xff0c;是不能永久保存的。持久化数据&#xff1a;比如持久化至数据库中或者文档中&#xff0c;能够长久保存。 数据库是“按照数据结构来组织、存储和管理数据的仓库”。是一个长…...

Class<?> 和Class<T >有什么区别

Class<?> 和 Class<T> 在 Java 中都表示 Class 类型的对象&#xff0c;但它们的使用方式和作用略有不同。让我们详细分析它们的区别&#xff1a; 1. Class<?>&#xff08;通配符 Class 类型&#xff09; ? 代表一个未知类型&#xff08;Wildcard&#xf…...

[自制调试工具]利用模板函数打造通用调试工具

引言 上一篇文章 我们介绍了调式类工具,这篇文章我们补充一下 点击这里查看 在软件开发的过程中&#xff0c;调试是必不可少的环节。为了能更高效地定位和解决问题&#xff0c;我们常常需要在代码中插入一些调试信息&#xff0c;来输出变量的值、函数的执行状态等。传统的调试…...

Python地理数据处理 28:基于Arcpy批量操作实现——按属性提取和分区统计

Arcpy批量操作 1. 批量按属性提取2. 批量分区统计&#xff08;最大值、最小值和像元个数等&#xff09; 1. 批量按属性提取 # -*- coding: cp936 -*- """ PROJECT_NAME: ArcPy FILE_NAME: batch_attribute_extract AUTHOR: JacksonZhao DATE: 2025/04/05 &qu…...

Mysql慢查询设置 和 建立索引

1 .mysql慢查询的设置 slow_query_log ON //或 slow_query_log_file /usr/local/mysql/data/slow.log long_query_time 2 修改后重启动mysql 1.1 查看设置后的参数 mysql> show variables like slow_query%; --------------------------------------------------…...

【Android】界面布局-相对布局RelativeLayout-例子

题目 完成下面相对布局&#xff0c;要求&#xff1a; 中间的button在整个屏幕的中央&#xff0c;其他的以它为基准排列。Hints&#xff1a;利用layout_toEndof,_toRightof,_toLeftof,_toStartof完成。 结果演示 代码实现 <?xml version"1.0" encoding"u…...

Spring Boot 中使用 Redis:从入门到实战

&#x1f31f; 前言 欢迎来到我的技术小宇宙&#xff01;&#x1f30c; 这里不仅是我记录技术点滴的后花园&#xff0c;也是我分享学习心得和项目经验的乐园。&#x1f4da; 无论你是技术小白还是资深大牛&#xff0c;这里总有一些内容能触动你的好奇心。&#x1f50d; &#x…...

【ROS】 CMakeLists 文件详解

【ROS】 CMakeLists文件详解 前言标准的CMAKELIST.TXT文件的组成部分CMake 版本要求和项目名称指定编译器和设置构建规则查找 ROS 依赖消息和服务文件catkin_package设置头文件目录路径添加可执行文件的构建规则设置编译依赖关系&#xff08;构建顺序&#xff09;设置目标文件的…...

【每日算法】Day 17-1:位图(Bitmap)——十亿级数据去重与快速检索的终极方案(C++实现)

解锁海量数据处理的极致空间效率&#xff01;今日深入解析位图的核心原理与实战应用&#xff0c;从基础操作到分块优化&#xff0c;彻底掌握仅用1bit存储一个数据的压缩艺术。 一、位图核心思想 位图&#xff08;Bitmap&#xff09; 是一种通过比特位表示数据存在性的数据结构…...

7-1 素数求和(线性筛实现)

7-1 素数求和。 分数 10 中等 全屏浏览 切换布局 作者 魏英 单位 浙江科技大学 输入两个正整数m和n&#xff08;1<m<n<500&#xff09;统计并输出m和n之间的素数个数以及这些素数的和。 输入格式: 输入两个正整数m和n&#xff08;1<m<n<500&#xff0…...

NLP简介及其发展历史

自然语言处理&#xff08;Natural Language Processing&#xff0c;简称NLP&#xff09;是人工智能和计算机科学领域中的一个重要分支&#xff0c;致力于实现人与计算机之间自然、高效的语言交流。本文将介绍NLP的基本概念以及其发展历史。 一、什么是自然语言处理&#xff1f…...

ZKmall开源商城多云高可用架构方案:AWS/Azure/阿里云全栈实践

随着企业数字化转型的加速&#xff0c;云计算服务已成为IT战略中的核心部分。ZKmall开源商城作为一款高性能的开源商城系统&#xff0c;其在多云环境下的高可用架构方案备受关注。下面将结合AWS、Azure和阿里云三大主流云平台&#xff0c;探讨ZKmall的多云高可用架构全栈实践。…...