当前位置: 首页 > news >正文

zookeeper watch

目录

    • 回顾回调&观察者模式&发布订阅模式
    • Zookeeper 客户端/ 服务端 watch
      • getChildren 为例
      • 最后归纳

在这里插入图片描述

回顾回调&观察者模式&发布订阅模式

  • 回调的思想
  1. 类A的a()方法调用类B的b()方法
  2. 类B的b()方法执行完毕主动调用类A的callback()方法

回调分为同步回调异步回调, 假如以买彩票的场景来模拟, 我买彩票, 调用彩票网,给我返回的结果确定是否中奖,同步回调就是,我买了彩票之后, 需要等待彩票网给我返回的结果, 这个时候我不能做其他事情, 我必须等待这个结果, 这就叫同步回调, 同步, 就意味着等待, 我不能去做其他事情, 必须等待, 异步回调就是, 我买了彩票之后, 可以去做其他事情, 然后当彩票网有了结果和消息, 再给我返回消息。

  • 观察者模式
    在这里插入图片描述
  • 发布订阅,对比 观察者模式
    在这里插入图片描述

Zookeeper 客户端/ 服务端 watch

  • 客户端维持的 socket 连接 ClientCnxn
/*** This class manages the socket i/o for the client. ClientCnxn maintains a list* of available servers to connect to and "transparently" switches servers it is* connected to as needed.**/
public class ClientCnxn {
/*** Manage watchers & handle events generated by the ClientCnxn object.** We are implementing this as a nested class of ZooKeeper so that* the public methods will not be exposed as part of the ZooKeeper client* API.*/static class ZKWatchManager implements ClientWatchManager {
  • 服务端 DataTree
/*** This class maintains the tree data structure. It doesn't have any networking* or client connection code in it so that it can be tested in a stand alone* way.* <p>* The tree maintains two parallel data structures: a hashtable that maps from* full paths to DataNodes and a tree of DataNodes. All accesses to a path is* through the hashtable. The tree is traversed only when serializing to disk.*/
public class DataTree {

getChildren 为例

/*** Return the list of the children of the node of the given path.* <p>* If the watch is non-null and the call is successful (no exception is thrown),* a watch will be left on the node with the given path. The watch willbe* triggered by a successful operation that deletes the node of the given* path or creates/delete a child under the node.* <p>* The list of children returned is not sorted and no guarantee is provided* as to its natural or lexical order.* <p>* A KeeperException with error code KeeperException.NoNode will be thrown* if no node with the given path exists.** @param path* @param watcher explicit watcher* @return an unordered array of children of the node with the given path* @throws InterruptedException If the server transaction is interrupted.* @throws KeeperException If the server signals an error with a non-zero error code.* @throws IllegalArgumentException if an invalid path is specified*/public List<String> getChildren(final String path, Watcher watcher)throws KeeperException, InterruptedException{final String clientPath = path;PathUtils.validatePath(clientPath);// the watch contains the un-chroot pathWatchRegistration wcb = null;if (watcher != null) {wcb = new ChildWatchRegistration(watcher, clientPath);}final String serverPath = prependChroot(clientPath);RequestHeader h = new RequestHeader();h.setType(ZooDefs.OpCode.getChildren);GetChildrenRequest request = new GetChildrenRequest();request.setPath(serverPath);request.setWatch(watcher != null);GetChildrenResponse response = new GetChildrenResponse();ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);if (r.getErr() != 0) {throw KeeperException.create(KeeperException.Code.get(r.getErr()),clientPath);}return response.getChildren();}

ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); 发送请求给服务端

public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration) throws InterruptedException {ReplyHeader r = new ReplyHeader();// 客户端与服务端的网络传输ClientCnxn.Packet packet = this.queuePacket(h, r, request, response, (AsyncCallback)null, (String)null, (String)null, (Object)null, watchRegistration);synchronized(packet) {while(!packet.finished) {packet.wait();}return r;}
}ClientCnxn.Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration) {ClientCnxn.Packet packet = null;LinkedList var11 = this.outgoingQueue;synchronized(this.outgoingQueue) {// 传输的对象都包装成Packet对象packet = new ClientCnxn.Packet(h, r, request, response, watchRegistration);packet.cb = cb;packet.ctx = ctx;packet.clientPath = clientPath;packet.serverPath = serverPath;if (this.state.isAlive() && !this.closing) {if (h.getType() == -11) {this.closing = true;}// 放入发送队列中,等待发送this.outgoingQueue.add(packet);} else {this.conLossPacket(packet);}}this.sendThread.getClientCnxnSocket().wakeupCnxn();return packet;
}

outgoingQueue的处理
在这里插入图片描述
在这里插入图片描述
服务端org.apache.zookeeper.server.FinalRequestProcessor#processRequest处理

 case OpCode.getChildren: {lastOp = "GETC";GetChildrenRequest getChildrenRequest = new GetChildrenRequest();ByteBufferInputStream.byteBuffer2Record(request.request,getChildrenRequest);DataNode n = zks.getZKDatabase().getNode(getChildrenRequest.getPath());if (n == null) {throw new KeeperException.NoNodeException();}PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),ZooDefs.Perms.READ,request.authInfo);// 返回children,// 这里根据客户端设置的是否有watch变量来传入watcher对象// 如果true则将当前的ServerCnxn传入(ServerCnxn代表客户端和服务端的连接)      List<String> children = zks.getZKDatabase().getChildren(getChildrenRequest.getPath(), null, getChildrenRequest.getWatch() ? cnxn : null);rsp = new GetChildrenResponse(children);break;}

将数据节点路径和ServerCnxn对象存储在WatcherManager的watchTablewatch2Paths

 public List<String> getChildren(String path, Stat stat, Watcher watcher)throws KeeperException.NoNodeException {DataNode n = nodes.get(path);if (n == null) {throw new KeeperException.NoNodeException();}synchronized (n) {if (stat != null) {n.copyStat(stat);}List<String> children=new ArrayList<String>(n.getChildren());if (watcher != null) {childWatches.addWatch(path, watcher);}return children;}}
  • 当服务端处理完毕之后,客户端的SendThread线程负责接收服务端的响应,finishPacket方法会从packet中取出WatchRegistration并注册到ZKWatchManager中

/*** This class services the outgoing request queue and generates the heart* beats. It also spawns the ReadThread.*/class SendThread extends ZooKeeperThread {private long lastPingSentNs;private final ClientCnxnSocket clientCnxnSocket;private Random r = new Random(System.nanoTime());        private boolean isFirstConnect = true;void readResponse(ByteBuffer incomingBuffer) throws IOException {ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);ReplyHeader replyHdr = new ReplyHeader();replyHdr.deserialize(bbia, "header");if (replyHdr.getXid() == -2) {// -2 is the xid for pingsif (LOG.isDebugEnabled()) {LOG.debug("Got ping response for sessionid: 0x"+ Long.toHexString(sessionId)+ " after "+ ((System.nanoTime() - lastPingSentNs) / 1000000)+ "ms");}return;}if (replyHdr.getXid() == -4) {// -4 is the xid for AuthPacket               if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {state = States.AUTH_FAILED;                    eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null) );            		            		}if (LOG.isDebugEnabled()) {LOG.debug("Got auth sessionid:0x"+ Long.toHexString(sessionId));}return;}if (replyHdr.getXid() == -1) {// -1 means notificationif (LOG.isDebugEnabled()) {LOG.debug("Got notification sessionid:0x"+ Long.toHexString(sessionId));}WatcherEvent event = new WatcherEvent();event.deserialize(bbia, "response");// convert from a server path to a client pathif (chrootPath != null) {String serverPath = event.getPath();if(serverPath.compareTo(chrootPath)==0)event.setPath("/");else if (serverPath.length() > chrootPath.length())event.setPath(serverPath.substring(chrootPath.length()));else {LOG.warn("Got server path " + event.getPath()+ " which is too short for chroot path "+ chrootPath);}}WatchedEvent we = new WatchedEvent(event);if (LOG.isDebugEnabled()) {LOG.debug("Got " + we + " for sessionid 0x"+ Long.toHexString(sessionId));}eventThread.queueEvent( we );return;}// If SASL authentication is currently in progress, construct and// send a response packet immediately, rather than queuing a// response as with other packets.if (tunnelAuthInProgress()) {GetSASLRequest request = new GetSASLRequest();request.deserialize(bbia,"token");zooKeeperSaslClient.respondToServer(request.getToken(),ClientCnxn.this);return;}Packet packet;synchronized (pendingQueue) {if (pendingQueue.size() == 0) {throw new IOException("Nothing in the queue, but got "+ replyHdr.getXid());}packet = pendingQueue.remove();}/** Since requests are processed in order, we better get a response* to the first request!*/try {if (packet.requestHeader.getXid() != replyHdr.getXid()) {packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());throw new IOException("Xid out of order. Got Xid "+ replyHdr.getXid() + " with err " ++ replyHdr.getErr() +" expected Xid "+ packet.requestHeader.getXid()+ " for a packet with details: "+ packet );}packet.replyHeader.setXid(replyHdr.getXid());packet.replyHeader.setErr(replyHdr.getErr());packet.replyHeader.setZxid(replyHdr.getZxid());if (replyHdr.getZxid() > 0) {lastZxid = replyHdr.getZxid();}if (packet.response != null && replyHdr.getErr() == 0) {packet.response.deserialize(bbia, "response");}if (LOG.isDebugEnabled()) {LOG.debug("Reading reply sessionid:0x"+ Long.toHexString(sessionId) + ", packet:: " + packet);}} finally {finishPacket(packet);}}private void finishPacket(Packet p) {int err = p.replyHeader.getErr();if (p.watchRegistration != null) {p.watchRegistration.register(err);}// Add all the removed watch events to the event queue, so that the// clients will be notified with 'Data/Child WatchRemoved' event type.if (p.watchDeregistration != null) {Map<EventType, Set<Watcher>> materializedWatchers = null;try {materializedWatchers = p.watchDeregistration.unregister(err);for (Entry<EventType, Set<Watcher>> entry : materializedWatchers.entrySet()) {Set<Watcher> watchers = entry.getValue();if (watchers.size() > 0) {queueEvent(p.watchDeregistration.getClientPath(), err,watchers, entry.getKey());// ignore connectionloss when removing from local// sessionp.replyHeader.setErr(Code.OK.intValue());}}} catch (KeeperException.NoWatcherException nwe) {LOG.error("Failed to find watcher!", nwe);p.replyHeader.setErr(nwe.code().intValue());} catch (KeeperException ke) {LOG.error("Exception when removing watcher", ke);p.replyHeader.setErr(ke.code().intValue());}}if (p.cb == null) {synchronized (p) {p.finished = true;p.notifyAll();}} else {p.finished = true;eventThread.queuePacket(p);}}

触发watcher org.apache.zookeeper.server.WatchManager#triggerWatch

 Set<Watcher> triggerWatch(String path, EventType type) {return triggerWatch(path, type, null);}Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {WatchedEvent e = new WatchedEvent(type,KeeperState.SyncConnected, path);HashSet<Watcher> watchers;// 主要做的就是从watchTable和watch2Paths中移除该路径的watcher,Watcher机制是一次性的synchronized (this) {watchers = watchTable.remove(path);if (watchers == null || watchers.isEmpty()) {if (LOG.isTraceEnabled()) {ZooTrace.logTraceMessage(LOG,ZooTrace.EVENT_DELIVERY_TRACE_MASK,"No watchers for " + path);}return null;}for (Watcher w : watchers) {HashSet<String> paths = watch2Paths.get(w);if (paths != null) {paths.remove(path);}}}for (Watcher w : watchers) {if (supress != null && supress.contains(w)) {continue;}// 真正的回调和业务逻辑执行都在客户端org.apache.zookeeper.server.NIOServerCnxn#processw.process(e);}return watchers;}

最后归纳

流程

  1. 客户端把注册的Watcher传到服务端,处理请求加入处理队列
  2. 服务端从处理队列取出事件,并处理请求返回给客户端
  3. 回调Watcher处理在客户端处理,并会被删除

在这里插入图片描述

相关文章:

zookeeper watch

目录 回顾回调&观察者模式&发布订阅模式Zookeeper 客户端/ 服务端 watchgetChildren 为例最后归纳 回顾回调&观察者模式&发布订阅模式 回调的思想 类A的a()方法调用类B的b()方法类B的b()方法执行完毕主动调用类A的callback()方法 回调分为同步回调和异步回调…...

vue3.x 的shallowReactive 与 shallowRef 详细解读

在 Vue 3.x 中&#xff0c;shallowReactive 和 shallowRef 是两个用于创建浅层响应式数据的 API。它们与 reactive 和 ref 类似&#xff0c;但在处理嵌套对象时的行为有所不同。以下是它们的详细解读和示例。 1. shallowReactive 作用 shallowReactive 创建一个浅层响应式对…...

鸿蒙NEXT开发-界面渲染(条件和循环)

注意&#xff1a;博主有个鸿蒙专栏&#xff0c;里面从上到下有关于鸿蒙next的教学文档&#xff0c;大家感兴趣可以学习下 如果大家觉得博主文章写的好的话&#xff0c;可以点下关注&#xff0c;博主会一直更新鸿蒙next相关知识 目录 1. 渲染-条件渲染 1.1 基本介绍 1.2 使…...

python电影数据分析及可视化系统建设

博主介绍&#xff1a;✌程序猿徐师兄、8年大厂程序员经历。全网粉丝15w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;…...

在本地校验密码或弱口令 (windows)

# 0x00 背景 需求是验证服务器的弱口令&#xff0c;如果通过网络侧校验可能会造成账户锁定风险。在本地校验不会有锁定风险或频率限制。 # 0x01 实践 ## 1 使用 net use 命令 可以通过命令行使用 net use 命令来验证本地账户的密码。打开命令提示符&#xff08;CMD&#xff0…...

pytest测试专题 - 1.3 测试用例发现规则

<< 返回目录 1 pytest测试专题 - 1.3 测试用例发现规则 执行pytest命令时&#xff0c;可以不输入参数&#xff0c;或者只输入文件名或者目录名&#xff0c;pytest会自己扫描测试用例。那pytest基于什么规则找到用例呢&#xff1f; 文件名&#xff1a;满足文件名称为tes…...

零基础学习人工智能

零基础学习人工智能是一个既充满挑战又极具潜力的过程。以下是一份详细的学习指南&#xff0c;旨在帮助零基础的学习者有效地踏入人工智能领域。 一、理解基本概念 在学习人工智能之前&#xff0c;首先要对其基本概念有一个清晰的认识。人工智能&#xff08;AI&#xff09;是…...

LeetCode热题100- 缺失的第一个正数【JavaScript讲解】

题目&#xff1a; 解题一&#xff1a; 如果不考虑时间复杂度和空间复杂度的话&#xff0c;我们最先想到的办法是先将该数组进行排序和去重&#xff0c;将最初的res结果值设置为1&#xff1b;将然后进行遍历&#xff0c;如果第一项不为1&#xff0c;则返回1&#xff0c;否则根…...

JAVA泛型介绍与举例

Java中&#xff0c;泛型用于编译阶段限制集合中元素的类型&#xff0c;或者限制类中某个属性的类型&#xff0c;编译过程中发生类型擦除&#xff0c;最终还是Object类型。 1. 集合中的泛型 集合默认可以存储任何类型的元素&#xff0c;即Object类型&#xff0c;当使用一个集合…...

【ISO 14229-1:2023 UDS诊断(会话控制0x10服务)测试用例CAPL代码全解析③】

ISO 14229-1:2023 UDS诊断【会话控制0x10服务】_TestCase03 作者&#xff1a;车端域控测试工程师 更新日期&#xff1a;2025年02月15日 关键词&#xff1a;UDS诊断、0x10服务、诊断会话控制、ECU测试、ISO 14229-1:2023 TC10-003测试用例 用例ID测试场景验证要点参考条款预期…...

Vivado生成edif网表及其使用

介绍如何在Vivado中将模块设为顶层&#xff0c;并生成相应的网表文件&#xff08;Verilog文件和edif文件&#xff09;&#xff0c;该过程适用于需要将一个模块作为顶层设计进行综合&#xff0c;并生成用于其他工程中的网表文件的情况。 例如要将fpga_top模块制作成网表给其它工…...

Win10环境借助DockerDesktop部署大数据时序数据库Apache Druid

Win10环境借助DockerDesktop部署最新版大数据时序数据库Apache Druid32.0.0 前言 大数据分析中&#xff0c;有一种常见的场景&#xff0c;那就是时序数据&#xff0c;简言之&#xff0c;数据一旦产生绝对不会修改&#xff0c;随着时间流逝&#xff0c;每个时间点都会有个新的…...

mac 意外退出移动硬盘后再次插入移动硬盘不显示怎么办

第一步&#xff1a;sudo ps aux | grep fsck 打开mac控制台输入如下指令&#xff0c;我们看到会出现两个进程&#xff0c;看进程是root的这个 sudo ps aux|grep fsck 第二步&#xff1a;杀死进程 在第一步基础上我们知道不显示u盘的进程是&#xff1a;62319&#xff0c;我们…...

力扣动态规划-32【算法学习day.126】

前言 ###我做这类文章一个重要的目的还是记录自己的学习过程&#xff0c;我的解析也不会做的非常详细&#xff0c;只会提供思路和一些关键点&#xff0c;力扣上的大佬们的题解质量是非常非常高滴&#xff01;&#xff01;&#xff01; 习题 1.完全平方数 题目链接:279. 完全…...

【算法进阶详解 第一节】树状数组

【算法进阶详解 第一节】树状数组 前言树状数组基础树状数组原理树状数组能够解决的问题 树状数组提高树状数组区间加&#xff0c;区间和操作二维树状数组 树状数组应用树状数组区间数颜色树状数组二维偏序 前言 树状数组在算法竞赛中十分常见&#xff0c;其能解决二维数点&am…...

【苍穹外卖】学习

软件开发整体介绍 作为一名软件开发工程师,我们需要了解在软件开发过程中的开发流程&#xff0c; 以及软件开发过程中涉及到的岗位角色&#xff0c;角色的分工、职责&#xff0c; 并了解软件开发中涉及到的三种软件环境。那么这一小节&#xff0c;我们将从 软件开发流程、角色…...

Python常见面试题的详解8

1. 变量作用域和查找规则&#xff08;LEGB&#xff09; 作用域层级&#xff1a; Local&#xff1a;函数内部作用域 Enclosing&#xff1a;闭包函数外层作用域 Global&#xff1a;模块全局作用域 Built-in&#xff1a;内置命名空间 查找顺序&#xff1a;L → E → G → B关…...

Deepseek R1模型本地化部署与API实战指南:释放企业级AI生产力

摘要 本文深入解析Deepseek R1开源大模型的本地化部署流程与API集成方案&#xff0c;涵盖从硬件选型、Docker环境搭建到模型微调及RESTful接口封装的完整企业级解决方案。通过电商评论分析和智能客服搭建等案例&#xff0c;展示如何将前沿AI技术转化为实际生产力。教程支持Lin…...

node.js + html调用ChatGPTApi实现Ai网站demo(带源码)

文章目录 前言一、demo演示二、node.js 使用步骤1.引入库2.引入包 前端HTML调用接口和UI所有文件总结 前言 关注博主&#xff0c;学习每天一个小demo 今天是Ai对话网站 又到了每天一个小demo的时候咯&#xff0c;前面我写了多人实时对话demo、和视频转换demo&#xff0c;今天…...

sql语言语法的学习

sql通用语法 sql分类 DDL(操作数据库和表) 操作数据库 操作表_查询 操作表_创建 举例&#xff1a; 操作表_删除 操作表_修改 DML(增删改表中数据) DML添加数据 DML删除数据 DML修改数据 DQL 单表查询 基础查询 条件查询 案例演示&#xff1a; 排序查询 聚合函数 分组查询…...

多模态2025:技术路线“神仙打架”,视频生成冲上云霄

文&#xff5c;魏琳华 编&#xff5c;王一粟 一场大会&#xff0c;聚集了中国多模态大模型的“半壁江山”。 智源大会2025为期两天的论坛中&#xff0c;汇集了学界、创业公司和大厂等三方的热门选手&#xff0c;关于多模态的集中讨论达到了前所未有的热度。其中&#xff0c;…...

css实现圆环展示百分比,根据值动态展示所占比例

代码如下 <view class""><view class"circle-chart"><view v-if"!!num" class"pie-item" :style"{background: conic-gradient(var(--one-color) 0%,#E9E6F1 ${num}%),}"></view><view v-else …...

【Oracle APEX开发小技巧12】

有如下需求&#xff1a; 有一个问题反馈页面&#xff0c;要实现在apex页面展示能直观看到反馈时间超过7天未处理的数据&#xff0c;方便管理员及时处理反馈。 我的方法&#xff1a;直接将逻辑写在SQL中&#xff0c;这样可以直接在页面展示 完整代码&#xff1a; SELECTSF.FE…...

阿里云ACP云计算备考笔记 (5)——弹性伸缩

目录 第一章 概述 第二章 弹性伸缩简介 1、弹性伸缩 2、垂直伸缩 3、优势 4、应用场景 ① 无规律的业务量波动 ② 有规律的业务量波动 ③ 无明显业务量波动 ④ 混合型业务 ⑤ 消息通知 ⑥ 生命周期挂钩 ⑦ 自定义方式 ⑧ 滚的升级 5、使用限制 第三章 主要定义 …...

相机Camera日志实例分析之二:相机Camx【专业模式开启直方图拍照】单帧流程日志详解

【关注我&#xff0c;后续持续新增专题博文&#xff0c;谢谢&#xff01;&#xff01;&#xff01;】 上一篇我们讲了&#xff1a; 这一篇我们开始讲&#xff1a; 目录 一、场景操作步骤 二、日志基础关键字分级如下 三、场景日志如下&#xff1a; 一、场景操作步骤 操作步…...

IGP(Interior Gateway Protocol,内部网关协议)

IGP&#xff08;Interior Gateway Protocol&#xff0c;内部网关协议&#xff09; 是一种用于在一个自治系统&#xff08;AS&#xff09;内部传递路由信息的路由协议&#xff0c;主要用于在一个组织或机构的内部网络中决定数据包的最佳路径。与用于自治系统之间通信的 EGP&…...

三体问题详解

从物理学角度&#xff0c;三体问题之所以不稳定&#xff0c;是因为三个天体在万有引力作用下相互作用&#xff0c;形成一个非线性耦合系统。我们可以从牛顿经典力学出发&#xff0c;列出具体的运动方程&#xff0c;并说明为何这个系统本质上是混沌的&#xff0c;无法得到一般解…...

浅谈不同二分算法的查找情况

二分算法原理比较简单&#xff0c;但是实际的算法模板却有很多&#xff0c;这一切都源于二分查找问题中的复杂情况和二分算法的边界处理&#xff0c;以下是博主对一些二分算法查找的情况分析。 需要说明的是&#xff0c;以下二分算法都是基于有序序列为升序有序的情况&#xf…...

回溯算法学习

一、电话号码的字母组合 import java.util.ArrayList; import java.util.List;import javax.management.loading.PrivateClassLoader;public class letterCombinations {private static final String[] KEYPAD {"", //0"", //1"abc", //2"…...

用鸿蒙HarmonyOS5实现中国象棋小游戏的过程

下面是一个基于鸿蒙OS (HarmonyOS) 的中国象棋小游戏的实现代码。这个实现使用Java语言和鸿蒙的Ability框架。 1. 项目结构 /src/main/java/com/example/chinesechess/├── MainAbilitySlice.java // 主界面逻辑├── ChessView.java // 游戏视图和逻辑├──…...