zookeeper watch
目录
- 回顾回调&观察者模式&发布订阅模式
- Zookeeper 客户端/ 服务端 watch
- getChildren 为例
- 最后归纳

回顾回调&观察者模式&发布订阅模式
- 回调的思想
- 类A的a()方法调用类B的b()方法
- 类B的b()方法执行完毕主动调用类A的callback()方法
回调分为同步回调和异步回调, 假如以买彩票的场景来模拟, 我买彩票, 调用彩票网,给我返回的结果确定是否中奖,同步回调就是,我买了彩票之后, 需要等待彩票网给我返回的结果, 这个时候我不能做其他事情, 我必须等待这个结果, 这就叫同步回调, 同步, 就意味着等待, 我不能去做其他事情, 必须等待, 异步回调就是, 我买了彩票之后, 可以去做其他事情, 然后当彩票网有了结果和消息, 再给我返回消息。
- 观察者模式

- 发布订阅,对比 观察者模式

Zookeeper 客户端/ 服务端 watch
- 客户端维持的 socket 连接 ClientCnxn
/*** This class manages the socket i/o for the client. ClientCnxn maintains a list* of available servers to connect to and "transparently" switches servers it is* connected to as needed.**/
public class ClientCnxn {
/*** Manage watchers & handle events generated by the ClientCnxn object.** We are implementing this as a nested class of ZooKeeper so that* the public methods will not be exposed as part of the ZooKeeper client* API.*/static class ZKWatchManager implements ClientWatchManager {
- 服务端 DataTree
/*** This class maintains the tree data structure. It doesn't have any networking* or client connection code in it so that it can be tested in a stand alone* way.* <p>* The tree maintains two parallel data structures: a hashtable that maps from* full paths to DataNodes and a tree of DataNodes. All accesses to a path is* through the hashtable. The tree is traversed only when serializing to disk.*/
public class DataTree {
getChildren 为例
/*** Return the list of the children of the node of the given path.* <p>* If the watch is non-null and the call is successful (no exception is thrown),* a watch will be left on the node with the given path. The watch willbe* triggered by a successful operation that deletes the node of the given* path or creates/delete a child under the node.* <p>* The list of children returned is not sorted and no guarantee is provided* as to its natural or lexical order.* <p>* A KeeperException with error code KeeperException.NoNode will be thrown* if no node with the given path exists.** @param path* @param watcher explicit watcher* @return an unordered array of children of the node with the given path* @throws InterruptedException If the server transaction is interrupted.* @throws KeeperException If the server signals an error with a non-zero error code.* @throws IllegalArgumentException if an invalid path is specified*/public List<String> getChildren(final String path, Watcher watcher)throws KeeperException, InterruptedException{final String clientPath = path;PathUtils.validatePath(clientPath);// the watch contains the un-chroot pathWatchRegistration wcb = null;if (watcher != null) {wcb = new ChildWatchRegistration(watcher, clientPath);}final String serverPath = prependChroot(clientPath);RequestHeader h = new RequestHeader();h.setType(ZooDefs.OpCode.getChildren);GetChildrenRequest request = new GetChildrenRequest();request.setPath(serverPath);request.setWatch(watcher != null);GetChildrenResponse response = new GetChildrenResponse();ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);if (r.getErr() != 0) {throw KeeperException.create(KeeperException.Code.get(r.getErr()),clientPath);}return response.getChildren();}
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); 发送请求给服务端
public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration) throws InterruptedException {ReplyHeader r = new ReplyHeader();// 客户端与服务端的网络传输ClientCnxn.Packet packet = this.queuePacket(h, r, request, response, (AsyncCallback)null, (String)null, (String)null, (Object)null, watchRegistration);synchronized(packet) {while(!packet.finished) {packet.wait();}return r;}
}ClientCnxn.Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration) {ClientCnxn.Packet packet = null;LinkedList var11 = this.outgoingQueue;synchronized(this.outgoingQueue) {// 传输的对象都包装成Packet对象packet = new ClientCnxn.Packet(h, r, request, response, watchRegistration);packet.cb = cb;packet.ctx = ctx;packet.clientPath = clientPath;packet.serverPath = serverPath;if (this.state.isAlive() && !this.closing) {if (h.getType() == -11) {this.closing = true;}// 放入发送队列中,等待发送this.outgoingQueue.add(packet);} else {this.conLossPacket(packet);}}this.sendThread.getClientCnxnSocket().wakeupCnxn();return packet;
}
outgoingQueue的处理


服务端org.apache.zookeeper.server.FinalRequestProcessor#processRequest处理
case OpCode.getChildren: {lastOp = "GETC";GetChildrenRequest getChildrenRequest = new GetChildrenRequest();ByteBufferInputStream.byteBuffer2Record(request.request,getChildrenRequest);DataNode n = zks.getZKDatabase().getNode(getChildrenRequest.getPath());if (n == null) {throw new KeeperException.NoNodeException();}PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),ZooDefs.Perms.READ,request.authInfo);// 返回children,// 这里根据客户端设置的是否有watch变量来传入watcher对象// 如果true则将当前的ServerCnxn传入(ServerCnxn代表客户端和服务端的连接) List<String> children = zks.getZKDatabase().getChildren(getChildrenRequest.getPath(), null, getChildrenRequest.getWatch() ? cnxn : null);rsp = new GetChildrenResponse(children);break;}
将数据节点路径和ServerCnxn对象存储在WatcherManager的watchTable和watch2Paths中
public List<String> getChildren(String path, Stat stat, Watcher watcher)throws KeeperException.NoNodeException {DataNode n = nodes.get(path);if (n == null) {throw new KeeperException.NoNodeException();}synchronized (n) {if (stat != null) {n.copyStat(stat);}List<String> children=new ArrayList<String>(n.getChildren());if (watcher != null) {childWatches.addWatch(path, watcher);}return children;}}
- 当服务端处理完毕之后,客户端的SendThread线程负责接收服务端的响应,finishPacket方法会从packet中取出WatchRegistration并注册到ZKWatchManager中
/*** This class services the outgoing request queue and generates the heart* beats. It also spawns the ReadThread.*/class SendThread extends ZooKeeperThread {private long lastPingSentNs;private final ClientCnxnSocket clientCnxnSocket;private Random r = new Random(System.nanoTime()); private boolean isFirstConnect = true;void readResponse(ByteBuffer incomingBuffer) throws IOException {ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);ReplyHeader replyHdr = new ReplyHeader();replyHdr.deserialize(bbia, "header");if (replyHdr.getXid() == -2) {// -2 is the xid for pingsif (LOG.isDebugEnabled()) {LOG.debug("Got ping response for sessionid: 0x"+ Long.toHexString(sessionId)+ " after "+ ((System.nanoTime() - lastPingSentNs) / 1000000)+ "ms");}return;}if (replyHdr.getXid() == -4) {// -4 is the xid for AuthPacket if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {state = States.AUTH_FAILED; eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null) ); }if (LOG.isDebugEnabled()) {LOG.debug("Got auth sessionid:0x"+ Long.toHexString(sessionId));}return;}if (replyHdr.getXid() == -1) {// -1 means notificationif (LOG.isDebugEnabled()) {LOG.debug("Got notification sessionid:0x"+ Long.toHexString(sessionId));}WatcherEvent event = new WatcherEvent();event.deserialize(bbia, "response");// convert from a server path to a client pathif (chrootPath != null) {String serverPath = event.getPath();if(serverPath.compareTo(chrootPath)==0)event.setPath("/");else if (serverPath.length() > chrootPath.length())event.setPath(serverPath.substring(chrootPath.length()));else {LOG.warn("Got server path " + event.getPath()+ " which is too short for chroot path "+ chrootPath);}}WatchedEvent we = new WatchedEvent(event);if (LOG.isDebugEnabled()) {LOG.debug("Got " + we + " for sessionid 0x"+ Long.toHexString(sessionId));}eventThread.queueEvent( we );return;}// If SASL authentication is currently in progress, construct and// send a response packet immediately, rather than queuing a// response as with other packets.if (tunnelAuthInProgress()) {GetSASLRequest request = new GetSASLRequest();request.deserialize(bbia,"token");zooKeeperSaslClient.respondToServer(request.getToken(),ClientCnxn.this);return;}Packet packet;synchronized (pendingQueue) {if (pendingQueue.size() == 0) {throw new IOException("Nothing in the queue, but got "+ replyHdr.getXid());}packet = pendingQueue.remove();}/** Since requests are processed in order, we better get a response* to the first request!*/try {if (packet.requestHeader.getXid() != replyHdr.getXid()) {packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());throw new IOException("Xid out of order. Got Xid "+ replyHdr.getXid() + " with err " ++ replyHdr.getErr() +" expected Xid "+ packet.requestHeader.getXid()+ " for a packet with details: "+ packet );}packet.replyHeader.setXid(replyHdr.getXid());packet.replyHeader.setErr(replyHdr.getErr());packet.replyHeader.setZxid(replyHdr.getZxid());if (replyHdr.getZxid() > 0) {lastZxid = replyHdr.getZxid();}if (packet.response != null && replyHdr.getErr() == 0) {packet.response.deserialize(bbia, "response");}if (LOG.isDebugEnabled()) {LOG.debug("Reading reply sessionid:0x"+ Long.toHexString(sessionId) + ", packet:: " + packet);}} finally {finishPacket(packet);}}private void finishPacket(Packet p) {int err = p.replyHeader.getErr();if (p.watchRegistration != null) {p.watchRegistration.register(err);}// Add all the removed watch events to the event queue, so that the// clients will be notified with 'Data/Child WatchRemoved' event type.if (p.watchDeregistration != null) {Map<EventType, Set<Watcher>> materializedWatchers = null;try {materializedWatchers = p.watchDeregistration.unregister(err);for (Entry<EventType, Set<Watcher>> entry : materializedWatchers.entrySet()) {Set<Watcher> watchers = entry.getValue();if (watchers.size() > 0) {queueEvent(p.watchDeregistration.getClientPath(), err,watchers, entry.getKey());// ignore connectionloss when removing from local// sessionp.replyHeader.setErr(Code.OK.intValue());}}} catch (KeeperException.NoWatcherException nwe) {LOG.error("Failed to find watcher!", nwe);p.replyHeader.setErr(nwe.code().intValue());} catch (KeeperException ke) {LOG.error("Exception when removing watcher", ke);p.replyHeader.setErr(ke.code().intValue());}}if (p.cb == null) {synchronized (p) {p.finished = true;p.notifyAll();}} else {p.finished = true;eventThread.queuePacket(p);}}
触发watcher org.apache.zookeeper.server.WatchManager#triggerWatch
Set<Watcher> triggerWatch(String path, EventType type) {return triggerWatch(path, type, null);}Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {WatchedEvent e = new WatchedEvent(type,KeeperState.SyncConnected, path);HashSet<Watcher> watchers;// 主要做的就是从watchTable和watch2Paths中移除该路径的watcher,Watcher机制是一次性的synchronized (this) {watchers = watchTable.remove(path);if (watchers == null || watchers.isEmpty()) {if (LOG.isTraceEnabled()) {ZooTrace.logTraceMessage(LOG,ZooTrace.EVENT_DELIVERY_TRACE_MASK,"No watchers for " + path);}return null;}for (Watcher w : watchers) {HashSet<String> paths = watch2Paths.get(w);if (paths != null) {paths.remove(path);}}}for (Watcher w : watchers) {if (supress != null && supress.contains(w)) {continue;}// 真正的回调和业务逻辑执行都在客户端org.apache.zookeeper.server.NIOServerCnxn#processw.process(e);}return watchers;}
最后归纳
流程
- 客户端把注册的Watcher传到服务端,处理请求加入处理队列
- 服务端从处理队列取出事件,并处理请求返回给客户端
- 回调Watcher处理在客户端处理,并会被删除

相关文章:
zookeeper watch
目录 回顾回调&观察者模式&发布订阅模式Zookeeper 客户端/ 服务端 watchgetChildren 为例最后归纳 回顾回调&观察者模式&发布订阅模式 回调的思想 类A的a()方法调用类B的b()方法类B的b()方法执行完毕主动调用类A的callback()方法 回调分为同步回调和异步回调…...
vue3.x 的shallowReactive 与 shallowRef 详细解读
在 Vue 3.x 中,shallowReactive 和 shallowRef 是两个用于创建浅层响应式数据的 API。它们与 reactive 和 ref 类似,但在处理嵌套对象时的行为有所不同。以下是它们的详细解读和示例。 1. shallowReactive 作用 shallowReactive 创建一个浅层响应式对…...
鸿蒙NEXT开发-界面渲染(条件和循环)
注意:博主有个鸿蒙专栏,里面从上到下有关于鸿蒙next的教学文档,大家感兴趣可以学习下 如果大家觉得博主文章写的好的话,可以点下关注,博主会一直更新鸿蒙next相关知识 目录 1. 渲染-条件渲染 1.1 基本介绍 1.2 使…...
python电影数据分析及可视化系统建设
博主介绍:✌程序猿徐师兄、8年大厂程序员经历。全网粉丝15w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ 🍅文末获取源码联系🍅 👇🏻 精彩专栏推荐订阅👇…...
在本地校验密码或弱口令 (windows)
# 0x00 背景 需求是验证服务器的弱口令,如果通过网络侧校验可能会造成账户锁定风险。在本地校验不会有锁定风险或频率限制。 # 0x01 实践 ## 1 使用 net use 命令 可以通过命令行使用 net use 命令来验证本地账户的密码。打开命令提示符(CMD࿰…...
pytest测试专题 - 1.3 测试用例发现规则
<< 返回目录 1 pytest测试专题 - 1.3 测试用例发现规则 执行pytest命令时,可以不输入参数,或者只输入文件名或者目录名,pytest会自己扫描测试用例。那pytest基于什么规则找到用例呢? 文件名:满足文件名称为tes…...
零基础学习人工智能
零基础学习人工智能是一个既充满挑战又极具潜力的过程。以下是一份详细的学习指南,旨在帮助零基础的学习者有效地踏入人工智能领域。 一、理解基本概念 在学习人工智能之前,首先要对其基本概念有一个清晰的认识。人工智能(AI)是…...
LeetCode热题100- 缺失的第一个正数【JavaScript讲解】
题目: 解题一: 如果不考虑时间复杂度和空间复杂度的话,我们最先想到的办法是先将该数组进行排序和去重,将最初的res结果值设置为1;将然后进行遍历,如果第一项不为1,则返回1,否则根…...
JAVA泛型介绍与举例
Java中,泛型用于编译阶段限制集合中元素的类型,或者限制类中某个属性的类型,编译过程中发生类型擦除,最终还是Object类型。 1. 集合中的泛型 集合默认可以存储任何类型的元素,即Object类型,当使用一个集合…...
【ISO 14229-1:2023 UDS诊断(会话控制0x10服务)测试用例CAPL代码全解析③】
ISO 14229-1:2023 UDS诊断【会话控制0x10服务】_TestCase03 作者:车端域控测试工程师 更新日期:2025年02月15日 关键词:UDS诊断、0x10服务、诊断会话控制、ECU测试、ISO 14229-1:2023 TC10-003测试用例 用例ID测试场景验证要点参考条款预期…...
Vivado生成edif网表及其使用
介绍如何在Vivado中将模块设为顶层,并生成相应的网表文件(Verilog文件和edif文件),该过程适用于需要将一个模块作为顶层设计进行综合,并生成用于其他工程中的网表文件的情况。 例如要将fpga_top模块制作成网表给其它工…...
Win10环境借助DockerDesktop部署大数据时序数据库Apache Druid
Win10环境借助DockerDesktop部署最新版大数据时序数据库Apache Druid32.0.0 前言 大数据分析中,有一种常见的场景,那就是时序数据,简言之,数据一旦产生绝对不会修改,随着时间流逝,每个时间点都会有个新的…...
mac 意外退出移动硬盘后再次插入移动硬盘不显示怎么办
第一步:sudo ps aux | grep fsck 打开mac控制台输入如下指令,我们看到会出现两个进程,看进程是root的这个 sudo ps aux|grep fsck 第二步:杀死进程 在第一步基础上我们知道不显示u盘的进程是:62319,我们…...
力扣动态规划-32【算法学习day.126】
前言 ###我做这类文章一个重要的目的还是记录自己的学习过程,我的解析也不会做的非常详细,只会提供思路和一些关键点,力扣上的大佬们的题解质量是非常非常高滴!!! 习题 1.完全平方数 题目链接:279. 完全…...
【算法进阶详解 第一节】树状数组
【算法进阶详解 第一节】树状数组 前言树状数组基础树状数组原理树状数组能够解决的问题 树状数组提高树状数组区间加,区间和操作二维树状数组 树状数组应用树状数组区间数颜色树状数组二维偏序 前言 树状数组在算法竞赛中十分常见,其能解决二维数点&am…...
【苍穹外卖】学习
软件开发整体介绍 作为一名软件开发工程师,我们需要了解在软件开发过程中的开发流程, 以及软件开发过程中涉及到的岗位角色,角色的分工、职责, 并了解软件开发中涉及到的三种软件环境。那么这一小节,我们将从 软件开发流程、角色…...
Python常见面试题的详解8
1. 变量作用域和查找规则(LEGB) 作用域层级: Local:函数内部作用域 Enclosing:闭包函数外层作用域 Global:模块全局作用域 Built-in:内置命名空间 查找顺序:L → E → G → B关…...
Deepseek R1模型本地化部署与API实战指南:释放企业级AI生产力
摘要 本文深入解析Deepseek R1开源大模型的本地化部署流程与API集成方案,涵盖从硬件选型、Docker环境搭建到模型微调及RESTful接口封装的完整企业级解决方案。通过电商评论分析和智能客服搭建等案例,展示如何将前沿AI技术转化为实际生产力。教程支持Lin…...
node.js + html调用ChatGPTApi实现Ai网站demo(带源码)
文章目录 前言一、demo演示二、node.js 使用步骤1.引入库2.引入包 前端HTML调用接口和UI所有文件总结 前言 关注博主,学习每天一个小demo 今天是Ai对话网站 又到了每天一个小demo的时候咯,前面我写了多人实时对话demo、和视频转换demo,今天…...
sql语言语法的学习
sql通用语法 sql分类 DDL(操作数据库和表) 操作数据库 操作表_查询 操作表_创建 举例: 操作表_删除 操作表_修改 DML(增删改表中数据) DML添加数据 DML删除数据 DML修改数据 DQL 单表查询 基础查询 条件查询 案例演示: 排序查询 聚合函数 分组查询…...
【JavaEE】-- HTTP
1. HTTP是什么? HTTP(全称为"超文本传输协议")是一种应用非常广泛的应用层协议,HTTP是基于TCP协议的一种应用层协议。 应用层协议:是计算机网络协议栈中最高层的协议,它定义了运行在不同主机上…...
day52 ResNet18 CBAM
在深度学习的旅程中,我们不断探索如何提升模型的性能。今天,我将分享我在 ResNet18 模型中插入 CBAM(Convolutional Block Attention Module)模块,并采用分阶段微调策略的实践过程。通过这个过程,我不仅提升…...
DockerHub与私有镜像仓库在容器化中的应用与管理
哈喽,大家好,我是左手python! Docker Hub的应用与管理 Docker Hub的基本概念与使用方法 Docker Hub是Docker官方提供的一个公共镜像仓库,用户可以在其中找到各种操作系统、软件和应用的镜像。开发者可以通过Docker Hub轻松获取所…...
基于服务器使用 apt 安装、配置 Nginx
🧾 一、查看可安装的 Nginx 版本 首先,你可以运行以下命令查看可用版本: apt-cache madison nginx-core输出示例: nginx-core | 1.18.0-6ubuntu14.6 | http://archive.ubuntu.com/ubuntu focal-updates/main amd64 Packages ng…...
【磁盘】每天掌握一个Linux命令 - iostat
目录 【磁盘】每天掌握一个Linux命令 - iostat工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景 注意事项 【磁盘】每天掌握一个Linux命令 - iostat 工具概述 iostat(I/O Statistics)是Linux系统下用于监视系统输入输出设备和CPU使…...
定时器任务——若依源码分析
分析util包下面的工具类schedule utils: ScheduleUtils 是若依中用于与 Quartz 框架交互的工具类,封装了定时任务的 创建、更新、暂停、删除等核心逻辑。 createScheduleJob createScheduleJob 用于将任务注册到 Quartz,先构建任务的 JobD…...
oracle与MySQL数据库之间数据同步的技术要点
Oracle与MySQL数据库之间的数据同步是一个涉及多个技术要点的复杂任务。由于Oracle和MySQL的架构差异,它们的数据同步要求既要保持数据的准确性和一致性,又要处理好性能问题。以下是一些主要的技术要点: 数据结构差异 数据类型差异ÿ…...
ESP32 I2S音频总线学习笔记(四): INMP441采集音频并实时播放
简介 前面两期文章我们介绍了I2S的读取和写入,一个是通过INMP441麦克风模块采集音频,一个是通过PCM5102A模块播放音频,那如果我们将两者结合起来,将麦克风采集到的音频通过PCM5102A播放,是不是就可以做一个扩音器了呢…...
华为OD机试-食堂供餐-二分法
import java.util.Arrays; import java.util.Scanner;public class DemoTest3 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseint a in.nextIn…...
IT供电系统绝缘监测及故障定位解决方案
随着新能源的快速发展,光伏电站、储能系统及充电设备已广泛应用于现代能源网络。在光伏领域,IT供电系统凭借其持续供电性好、安全性高等优势成为光伏首选,但在长期运行中,例如老化、潮湿、隐裂、机械损伤等问题会影响光伏板绝缘层…...
