zookeeper watch
目录
- 回顾回调&观察者模式&发布订阅模式
- Zookeeper 客户端/ 服务端 watch
- getChildren 为例
- 最后归纳

回顾回调&观察者模式&发布订阅模式
- 回调的思想
- 类A的a()方法调用类B的b()方法
- 类B的b()方法执行完毕主动调用类A的callback()方法
回调分为同步回调和异步回调, 假如以买彩票的场景来模拟, 我买彩票, 调用彩票网,给我返回的结果确定是否中奖,同步回调就是,我买了彩票之后, 需要等待彩票网给我返回的结果, 这个时候我不能做其他事情, 我必须等待这个结果, 这就叫同步回调, 同步, 就意味着等待, 我不能去做其他事情, 必须等待, 异步回调就是, 我买了彩票之后, 可以去做其他事情, 然后当彩票网有了结果和消息, 再给我返回消息。
- 观察者模式

- 发布订阅,对比 观察者模式

Zookeeper 客户端/ 服务端 watch
- 客户端维持的 socket 连接 ClientCnxn
/*** This class manages the socket i/o for the client. ClientCnxn maintains a list* of available servers to connect to and "transparently" switches servers it is* connected to as needed.**/
public class ClientCnxn {
/*** Manage watchers & handle events generated by the ClientCnxn object.** We are implementing this as a nested class of ZooKeeper so that* the public methods will not be exposed as part of the ZooKeeper client* API.*/static class ZKWatchManager implements ClientWatchManager {
- 服务端 DataTree
/*** This class maintains the tree data structure. It doesn't have any networking* or client connection code in it so that it can be tested in a stand alone* way.* <p>* The tree maintains two parallel data structures: a hashtable that maps from* full paths to DataNodes and a tree of DataNodes. All accesses to a path is* through the hashtable. The tree is traversed only when serializing to disk.*/
public class DataTree {
getChildren 为例
/*** Return the list of the children of the node of the given path.* <p>* If the watch is non-null and the call is successful (no exception is thrown),* a watch will be left on the node with the given path. The watch willbe* triggered by a successful operation that deletes the node of the given* path or creates/delete a child under the node.* <p>* The list of children returned is not sorted and no guarantee is provided* as to its natural or lexical order.* <p>* A KeeperException with error code KeeperException.NoNode will be thrown* if no node with the given path exists.** @param path* @param watcher explicit watcher* @return an unordered array of children of the node with the given path* @throws InterruptedException If the server transaction is interrupted.* @throws KeeperException If the server signals an error with a non-zero error code.* @throws IllegalArgumentException if an invalid path is specified*/public List<String> getChildren(final String path, Watcher watcher)throws KeeperException, InterruptedException{final String clientPath = path;PathUtils.validatePath(clientPath);// the watch contains the un-chroot pathWatchRegistration wcb = null;if (watcher != null) {wcb = new ChildWatchRegistration(watcher, clientPath);}final String serverPath = prependChroot(clientPath);RequestHeader h = new RequestHeader();h.setType(ZooDefs.OpCode.getChildren);GetChildrenRequest request = new GetChildrenRequest();request.setPath(serverPath);request.setWatch(watcher != null);GetChildrenResponse response = new GetChildrenResponse();ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);if (r.getErr() != 0) {throw KeeperException.create(KeeperException.Code.get(r.getErr()),clientPath);}return response.getChildren();}
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); 发送请求给服务端
public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration) throws InterruptedException {ReplyHeader r = new ReplyHeader();// 客户端与服务端的网络传输ClientCnxn.Packet packet = this.queuePacket(h, r, request, response, (AsyncCallback)null, (String)null, (String)null, (Object)null, watchRegistration);synchronized(packet) {while(!packet.finished) {packet.wait();}return r;}
}ClientCnxn.Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration) {ClientCnxn.Packet packet = null;LinkedList var11 = this.outgoingQueue;synchronized(this.outgoingQueue) {// 传输的对象都包装成Packet对象packet = new ClientCnxn.Packet(h, r, request, response, watchRegistration);packet.cb = cb;packet.ctx = ctx;packet.clientPath = clientPath;packet.serverPath = serverPath;if (this.state.isAlive() && !this.closing) {if (h.getType() == -11) {this.closing = true;}// 放入发送队列中,等待发送this.outgoingQueue.add(packet);} else {this.conLossPacket(packet);}}this.sendThread.getClientCnxnSocket().wakeupCnxn();return packet;
}
outgoingQueue的处理


服务端org.apache.zookeeper.server.FinalRequestProcessor#processRequest处理
case OpCode.getChildren: {lastOp = "GETC";GetChildrenRequest getChildrenRequest = new GetChildrenRequest();ByteBufferInputStream.byteBuffer2Record(request.request,getChildrenRequest);DataNode n = zks.getZKDatabase().getNode(getChildrenRequest.getPath());if (n == null) {throw new KeeperException.NoNodeException();}PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),ZooDefs.Perms.READ,request.authInfo);// 返回children,// 这里根据客户端设置的是否有watch变量来传入watcher对象// 如果true则将当前的ServerCnxn传入(ServerCnxn代表客户端和服务端的连接) List<String> children = zks.getZKDatabase().getChildren(getChildrenRequest.getPath(), null, getChildrenRequest.getWatch() ? cnxn : null);rsp = new GetChildrenResponse(children);break;}
将数据节点路径和ServerCnxn对象存储在WatcherManager的watchTable和watch2Paths中
public List<String> getChildren(String path, Stat stat, Watcher watcher)throws KeeperException.NoNodeException {DataNode n = nodes.get(path);if (n == null) {throw new KeeperException.NoNodeException();}synchronized (n) {if (stat != null) {n.copyStat(stat);}List<String> children=new ArrayList<String>(n.getChildren());if (watcher != null) {childWatches.addWatch(path, watcher);}return children;}}
- 当服务端处理完毕之后,客户端的SendThread线程负责接收服务端的响应,finishPacket方法会从packet中取出WatchRegistration并注册到ZKWatchManager中
/*** This class services the outgoing request queue and generates the heart* beats. It also spawns the ReadThread.*/class SendThread extends ZooKeeperThread {private long lastPingSentNs;private final ClientCnxnSocket clientCnxnSocket;private Random r = new Random(System.nanoTime()); private boolean isFirstConnect = true;void readResponse(ByteBuffer incomingBuffer) throws IOException {ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);ReplyHeader replyHdr = new ReplyHeader();replyHdr.deserialize(bbia, "header");if (replyHdr.getXid() == -2) {// -2 is the xid for pingsif (LOG.isDebugEnabled()) {LOG.debug("Got ping response for sessionid: 0x"+ Long.toHexString(sessionId)+ " after "+ ((System.nanoTime() - lastPingSentNs) / 1000000)+ "ms");}return;}if (replyHdr.getXid() == -4) {// -4 is the xid for AuthPacket if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {state = States.AUTH_FAILED; eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null) ); }if (LOG.isDebugEnabled()) {LOG.debug("Got auth sessionid:0x"+ Long.toHexString(sessionId));}return;}if (replyHdr.getXid() == -1) {// -1 means notificationif (LOG.isDebugEnabled()) {LOG.debug("Got notification sessionid:0x"+ Long.toHexString(sessionId));}WatcherEvent event = new WatcherEvent();event.deserialize(bbia, "response");// convert from a server path to a client pathif (chrootPath != null) {String serverPath = event.getPath();if(serverPath.compareTo(chrootPath)==0)event.setPath("/");else if (serverPath.length() > chrootPath.length())event.setPath(serverPath.substring(chrootPath.length()));else {LOG.warn("Got server path " + event.getPath()+ " which is too short for chroot path "+ chrootPath);}}WatchedEvent we = new WatchedEvent(event);if (LOG.isDebugEnabled()) {LOG.debug("Got " + we + " for sessionid 0x"+ Long.toHexString(sessionId));}eventThread.queueEvent( we );return;}// If SASL authentication is currently in progress, construct and// send a response packet immediately, rather than queuing a// response as with other packets.if (tunnelAuthInProgress()) {GetSASLRequest request = new GetSASLRequest();request.deserialize(bbia,"token");zooKeeperSaslClient.respondToServer(request.getToken(),ClientCnxn.this);return;}Packet packet;synchronized (pendingQueue) {if (pendingQueue.size() == 0) {throw new IOException("Nothing in the queue, but got "+ replyHdr.getXid());}packet = pendingQueue.remove();}/** Since requests are processed in order, we better get a response* to the first request!*/try {if (packet.requestHeader.getXid() != replyHdr.getXid()) {packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());throw new IOException("Xid out of order. Got Xid "+ replyHdr.getXid() + " with err " ++ replyHdr.getErr() +" expected Xid "+ packet.requestHeader.getXid()+ " for a packet with details: "+ packet );}packet.replyHeader.setXid(replyHdr.getXid());packet.replyHeader.setErr(replyHdr.getErr());packet.replyHeader.setZxid(replyHdr.getZxid());if (replyHdr.getZxid() > 0) {lastZxid = replyHdr.getZxid();}if (packet.response != null && replyHdr.getErr() == 0) {packet.response.deserialize(bbia, "response");}if (LOG.isDebugEnabled()) {LOG.debug("Reading reply sessionid:0x"+ Long.toHexString(sessionId) + ", packet:: " + packet);}} finally {finishPacket(packet);}}private void finishPacket(Packet p) {int err = p.replyHeader.getErr();if (p.watchRegistration != null) {p.watchRegistration.register(err);}// Add all the removed watch events to the event queue, so that the// clients will be notified with 'Data/Child WatchRemoved' event type.if (p.watchDeregistration != null) {Map<EventType, Set<Watcher>> materializedWatchers = null;try {materializedWatchers = p.watchDeregistration.unregister(err);for (Entry<EventType, Set<Watcher>> entry : materializedWatchers.entrySet()) {Set<Watcher> watchers = entry.getValue();if (watchers.size() > 0) {queueEvent(p.watchDeregistration.getClientPath(), err,watchers, entry.getKey());// ignore connectionloss when removing from local// sessionp.replyHeader.setErr(Code.OK.intValue());}}} catch (KeeperException.NoWatcherException nwe) {LOG.error("Failed to find watcher!", nwe);p.replyHeader.setErr(nwe.code().intValue());} catch (KeeperException ke) {LOG.error("Exception when removing watcher", ke);p.replyHeader.setErr(ke.code().intValue());}}if (p.cb == null) {synchronized (p) {p.finished = true;p.notifyAll();}} else {p.finished = true;eventThread.queuePacket(p);}}
触发watcher org.apache.zookeeper.server.WatchManager#triggerWatch
Set<Watcher> triggerWatch(String path, EventType type) {return triggerWatch(path, type, null);}Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {WatchedEvent e = new WatchedEvent(type,KeeperState.SyncConnected, path);HashSet<Watcher> watchers;// 主要做的就是从watchTable和watch2Paths中移除该路径的watcher,Watcher机制是一次性的synchronized (this) {watchers = watchTable.remove(path);if (watchers == null || watchers.isEmpty()) {if (LOG.isTraceEnabled()) {ZooTrace.logTraceMessage(LOG,ZooTrace.EVENT_DELIVERY_TRACE_MASK,"No watchers for " + path);}return null;}for (Watcher w : watchers) {HashSet<String> paths = watch2Paths.get(w);if (paths != null) {paths.remove(path);}}}for (Watcher w : watchers) {if (supress != null && supress.contains(w)) {continue;}// 真正的回调和业务逻辑执行都在客户端org.apache.zookeeper.server.NIOServerCnxn#processw.process(e);}return watchers;}
最后归纳
流程
- 客户端把注册的Watcher传到服务端,处理请求加入处理队列
- 服务端从处理队列取出事件,并处理请求返回给客户端
- 回调Watcher处理在客户端处理,并会被删除

相关文章:
zookeeper watch
目录 回顾回调&观察者模式&发布订阅模式Zookeeper 客户端/ 服务端 watchgetChildren 为例最后归纳 回顾回调&观察者模式&发布订阅模式 回调的思想 类A的a()方法调用类B的b()方法类B的b()方法执行完毕主动调用类A的callback()方法 回调分为同步回调和异步回调…...
vue3.x 的shallowReactive 与 shallowRef 详细解读
在 Vue 3.x 中,shallowReactive 和 shallowRef 是两个用于创建浅层响应式数据的 API。它们与 reactive 和 ref 类似,但在处理嵌套对象时的行为有所不同。以下是它们的详细解读和示例。 1. shallowReactive 作用 shallowReactive 创建一个浅层响应式对…...
鸿蒙NEXT开发-界面渲染(条件和循环)
注意:博主有个鸿蒙专栏,里面从上到下有关于鸿蒙next的教学文档,大家感兴趣可以学习下 如果大家觉得博主文章写的好的话,可以点下关注,博主会一直更新鸿蒙next相关知识 目录 1. 渲染-条件渲染 1.1 基本介绍 1.2 使…...
python电影数据分析及可视化系统建设
博主介绍:✌程序猿徐师兄、8年大厂程序员经历。全网粉丝15w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ 🍅文末获取源码联系🍅 👇🏻 精彩专栏推荐订阅👇…...
在本地校验密码或弱口令 (windows)
# 0x00 背景 需求是验证服务器的弱口令,如果通过网络侧校验可能会造成账户锁定风险。在本地校验不会有锁定风险或频率限制。 # 0x01 实践 ## 1 使用 net use 命令 可以通过命令行使用 net use 命令来验证本地账户的密码。打开命令提示符(CMD࿰…...
pytest测试专题 - 1.3 测试用例发现规则
<< 返回目录 1 pytest测试专题 - 1.3 测试用例发现规则 执行pytest命令时,可以不输入参数,或者只输入文件名或者目录名,pytest会自己扫描测试用例。那pytest基于什么规则找到用例呢? 文件名:满足文件名称为tes…...
零基础学习人工智能
零基础学习人工智能是一个既充满挑战又极具潜力的过程。以下是一份详细的学习指南,旨在帮助零基础的学习者有效地踏入人工智能领域。 一、理解基本概念 在学习人工智能之前,首先要对其基本概念有一个清晰的认识。人工智能(AI)是…...
LeetCode热题100- 缺失的第一个正数【JavaScript讲解】
题目: 解题一: 如果不考虑时间复杂度和空间复杂度的话,我们最先想到的办法是先将该数组进行排序和去重,将最初的res结果值设置为1;将然后进行遍历,如果第一项不为1,则返回1,否则根…...
JAVA泛型介绍与举例
Java中,泛型用于编译阶段限制集合中元素的类型,或者限制类中某个属性的类型,编译过程中发生类型擦除,最终还是Object类型。 1. 集合中的泛型 集合默认可以存储任何类型的元素,即Object类型,当使用一个集合…...
【ISO 14229-1:2023 UDS诊断(会话控制0x10服务)测试用例CAPL代码全解析③】
ISO 14229-1:2023 UDS诊断【会话控制0x10服务】_TestCase03 作者:车端域控测试工程师 更新日期:2025年02月15日 关键词:UDS诊断、0x10服务、诊断会话控制、ECU测试、ISO 14229-1:2023 TC10-003测试用例 用例ID测试场景验证要点参考条款预期…...
Vivado生成edif网表及其使用
介绍如何在Vivado中将模块设为顶层,并生成相应的网表文件(Verilog文件和edif文件),该过程适用于需要将一个模块作为顶层设计进行综合,并生成用于其他工程中的网表文件的情况。 例如要将fpga_top模块制作成网表给其它工…...
Win10环境借助DockerDesktop部署大数据时序数据库Apache Druid
Win10环境借助DockerDesktop部署最新版大数据时序数据库Apache Druid32.0.0 前言 大数据分析中,有一种常见的场景,那就是时序数据,简言之,数据一旦产生绝对不会修改,随着时间流逝,每个时间点都会有个新的…...
mac 意外退出移动硬盘后再次插入移动硬盘不显示怎么办
第一步:sudo ps aux | grep fsck 打开mac控制台输入如下指令,我们看到会出现两个进程,看进程是root的这个 sudo ps aux|grep fsck 第二步:杀死进程 在第一步基础上我们知道不显示u盘的进程是:62319,我们…...
力扣动态规划-32【算法学习day.126】
前言 ###我做这类文章一个重要的目的还是记录自己的学习过程,我的解析也不会做的非常详细,只会提供思路和一些关键点,力扣上的大佬们的题解质量是非常非常高滴!!! 习题 1.完全平方数 题目链接:279. 完全…...
【算法进阶详解 第一节】树状数组
【算法进阶详解 第一节】树状数组 前言树状数组基础树状数组原理树状数组能够解决的问题 树状数组提高树状数组区间加,区间和操作二维树状数组 树状数组应用树状数组区间数颜色树状数组二维偏序 前言 树状数组在算法竞赛中十分常见,其能解决二维数点&am…...
【苍穹外卖】学习
软件开发整体介绍 作为一名软件开发工程师,我们需要了解在软件开发过程中的开发流程, 以及软件开发过程中涉及到的岗位角色,角色的分工、职责, 并了解软件开发中涉及到的三种软件环境。那么这一小节,我们将从 软件开发流程、角色…...
Python常见面试题的详解8
1. 变量作用域和查找规则(LEGB) 作用域层级: Local:函数内部作用域 Enclosing:闭包函数外层作用域 Global:模块全局作用域 Built-in:内置命名空间 查找顺序:L → E → G → B关…...
Deepseek R1模型本地化部署与API实战指南:释放企业级AI生产力
摘要 本文深入解析Deepseek R1开源大模型的本地化部署流程与API集成方案,涵盖从硬件选型、Docker环境搭建到模型微调及RESTful接口封装的完整企业级解决方案。通过电商评论分析和智能客服搭建等案例,展示如何将前沿AI技术转化为实际生产力。教程支持Lin…...
node.js + html调用ChatGPTApi实现Ai网站demo(带源码)
文章目录 前言一、demo演示二、node.js 使用步骤1.引入库2.引入包 前端HTML调用接口和UI所有文件总结 前言 关注博主,学习每天一个小demo 今天是Ai对话网站 又到了每天一个小demo的时候咯,前面我写了多人实时对话demo、和视频转换demo,今天…...
sql语言语法的学习
sql通用语法 sql分类 DDL(操作数据库和表) 操作数据库 操作表_查询 操作表_创建 举例: 操作表_删除 操作表_修改 DML(增删改表中数据) DML添加数据 DML删除数据 DML修改数据 DQL 单表查询 基础查询 条件查询 案例演示: 排序查询 聚合函数 分组查询…...
web vue 项目 Docker化部署
Web 项目 Docker 化部署详细教程 目录 Web 项目 Docker 化部署概述Dockerfile 详解 构建阶段生产阶段 构建和运行 Docker 镜像 1. Web 项目 Docker 化部署概述 Docker 化部署的主要步骤分为以下几个阶段: 构建阶段(Build Stage):…...
地震勘探——干扰波识别、井中地震时距曲线特点
目录 干扰波识别反射波地震勘探的干扰波 井中地震时距曲线特点 干扰波识别 有效波:可以用来解决所提出的地质任务的波;干扰波:所有妨碍辨认、追踪有效波的其他波。 地震勘探中,有效波和干扰波是相对的。例如,在反射波…...
基于FPGA的PID算法学习———实现PID比例控制算法
基于FPGA的PID算法学习 前言一、PID算法分析二、PID仿真分析1. PID代码2.PI代码3.P代码4.顶层5.测试文件6.仿真波形 总结 前言 学习内容:参考网站: PID算法控制 PID即:Proportional(比例)、Integral(积分&…...
在rocky linux 9.5上在线安装 docker
前面是指南,后面是日志 sudo dnf config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo sudo dnf install docker-ce docker-ce-cli containerd.io -y docker version sudo systemctl start docker sudo systemctl status docker …...
全志A40i android7.1 调试信息打印串口由uart0改为uart3
一,概述 1. 目的 将调试信息打印串口由uart0改为uart3。 2. 版本信息 Uboot版本:2014.07; Kernel版本:Linux-3.10; 二,Uboot 1. sys_config.fex改动 使能uart3(TX:PH00 RX:PH01),并让boo…...
Spring数据访问模块设计
前面我们已经完成了IoC和web模块的设计,聪明的码友立马就知道了,该到数据访问模块了,要不就这俩玩个6啊,查库势在必行,至此,它来了。 一、核心设计理念 1、痛点在哪 应用离不开数据(数据库、No…...
Maven 概述、安装、配置、仓库、私服详解
目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...
Spring是如何解决Bean的循环依赖:三级缓存机制
1、什么是 Bean 的循环依赖 在 Spring框架中,Bean 的循环依赖是指多个 Bean 之间互相持有对方引用,形成闭环依赖关系的现象。 多个 Bean 的依赖关系构成环形链路,例如: 双向依赖:Bean A 依赖 Bean B,同时 Bean B 也依赖 Bean A(A↔B)。链条循环: Bean A → Bean…...
springboot整合VUE之在线教育管理系统简介
可以学习到的技能 学会常用技术栈的使用 独立开发项目 学会前端的开发流程 学会后端的开发流程 学会数据库的设计 学会前后端接口调用方式 学会多模块之间的关联 学会数据的处理 适用人群 在校学生,小白用户,想学习知识的 有点基础,想要通过项…...
C++.OpenGL (20/64)混合(Blending)
混合(Blending) 透明效果核心原理 #mermaid-svg-SWG0UzVfJms7Sm3e {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-SWG0UzVfJms7Sm3e .error-icon{fill:#552222;}#mermaid-svg-SWG0UzVfJms7Sm3e .error-text{fill…...
