【Netty】Promise 源码分析(十七)
文章目录
- 前言
- 一、Promise 接口
- 二、Netty 的 DefaultPromise
- 2.1、设置任务的成功或失败
- 2.2、获取 Future 任务执行结果和添加监听事件
- 三、Netty 的 DefaultChannelPromise
- 总结
前言
回顾Netty系列文章:
- Netty 概述(一)
- Netty 架构设计(二)
- Netty Channel 概述(三)
- Netty ChannelHandler(四)
- ChannelPipeline源码分析(五)
- 字节缓冲区 ByteBuf (六)(上)
- 字节缓冲区 ByteBuf(七)(下)
- Netty 如何实现零拷贝(八)
- Netty 程序引导类(九)
- Reactor 模型(十)
- 工作原理详解(十一)
- Netty 解码器(十二)
- Netty 编码器(十三)
- Netty 编解码器(十四)
- 自定义解码器、编码器、编解码器(十五)
- Future 源码分析(十六)
本篇文章我就就来分析一下可写的 Future,也就是 promise,Netty 中的 Promise 扩展自 Netty 的 Future。
一、Promise 接口
在 Netty 中,Promise 接口是一种特殊的可写的 Future。 Promise 的核心源码如下:
public interface Promise<V> extends Future<V> {Promise<V> setSuccess(V var1);boolean trySuccess(V var1);Promise<V> setFailure(Throwable var1);boolean tryFailure(Throwable var1);boolean setUncancellable();Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> var1);Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... var1);Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> var1);Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... var1);Promise<V> await() throws InterruptedException;Promise<V> awaitUninterruptibly();Promise<V> sync() throws InterruptedException;Promise<V> syncUninterruptibly();
}
从上面可以看出,Promise 就是一个可写的 Future。在 Future 机制中,业务逻辑所在任务执行的状态(成功或失败)是在 Future 中实现的;而在 Promise 中,可以在业务逻辑中控制任务的执行结果,相比 Future 更加灵活。
以下是一个 Promise 的示例(伪代码)。
//异步的耗时任务接收一个 Promise
public Promise asynchronousFunction() {Promise promise = new PromiseImpl();Object result = null;return =search() //业务逻辑if (sucess) {promise.setSuccess(result); //通知 promise 当前异步任务成功了,并传入结果} else if (failed) {promise.setFailure(reason);//通知 promise 当前异步任务失败了} else if (error) {promise.setFailure(error);//通知 promise 当前异步任务发生了异常}
}//调用异步的耗时操作
Promise promise = asynchronousFunction(promise);//会立即返回 promise//添加成功处理 / 失败处理 / 异步处理等事件
promise.addListener();//例如:可以添加成功后的执行事件//继续做其他事件,不需要理会 asynchronousFunction 何时结束
doOtherThings();
在 Netty 中,Promise 继承了 Future,因此也具备了 Future 的所有功能。在 Promise 机制中,可以在业务逻辑中人工设置业务逻辑的成功与失败。
Netty 的常用 Promise 类有 DefaultPromise 类,这是 Promise 实现的基础,DefaultChannelPromise 是 DefaultPromise 的子类,加入了channel属性。
二、Netty 的 DefaultPromise
Netty 中涉及异步操作的地方都使用了 Promise 。例如,下面是服务器/客户端启动时的注册任务,最终会调用 Unsafe 的 register,调用过程中会传入一个Promise 。Unsafe 进行事件的注册时调用 Promise 可以设置成功或者失败。
//SingleThreadEventLoop.java
public ChannelFuture register(ChannelPromise promise) {ObjectUtil.checkNotNull(promise, "promise");promise.channel().unsafe().register(this, promise);return promise;
}//AbstractChannel.AbstractUnsafe
public final void register(EventLoop eventLoop, final ChannelPromise promise) {ObjectUtil.checkNotNull(eventLoop, "eventLoop");if (AbstractChannel.this.isRegistered()) {promise.setFailure(new IllegalStateException("registered to an event loop already"));} else if (!AbstractChannel.this.isCompatible(eventLoop)) {promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));} else {AbstractChannel.this.eventLoop = eventLoop;if (eventLoop.inEventLoop()) {this.register0(promise);} else {try {eventLoop.execute(new Runnable() {public void run() {AbstractUnsafe.this.register0(promise);}});} catch (Throwable var4) {AbstractChannel.logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, var4);this.closeForcibly();AbstractChannel.this.closeFuture.setClosed();this.safeSetFailure(promise, var4);}}}
}
DefaultPromise 提供的功能可以分为两个部分;一个是为调用者提供 get()和addListen()用于获取 Future 任务执行结果和添加监听事件;另一部分是为业务处理任务提供setSucess()等方法设置任务的成功或失败。
2.1、设置任务的成功或失败
DefaultPromise 核心源码如下:
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {public Promise<V> setSuccess(V result) {if (this.setSuccess0(result)) {return this;} else {throw new IllegalStateException("complete already: " + this);}}public boolean trySuccess(V result) {return this.setSuccess0(result);}public Promise<V> setFailure(Throwable cause) {if (this.setFailure0(cause)) {return this;} else {throw new IllegalStateException("complete already: " + this, cause);}}public boolean tryFailure(Throwable cause) {return this.setFailure0(cause);}public boolean setUncancellable() {if (RESULT_UPDATER.compareAndSet(this, (Object)null, UNCANCELLABLE)) {return true;} else {Object result = this.result;return !isDone0(result) || !isCancelled0(result);}}public boolean isSuccess() {Object result = this.result;return result != null && result != UNCANCELLABLE && !(result instanceof DefaultPromise.CauseHolder);}public boolean isCancellable() {return this.result == null;}//...}
2.2、获取 Future 任务执行结果和添加监听事件
DefaultPromise 的get方法有 3 个。
无参数的get会阻塞等待;
有参数的get会等待指定事件,若未结束就抛出超时异常,这两个get是在其父类 AbstractFuture中实现的。getNow()方法则会立马返回结果。
源码如下:
public V getNow() {Object result = this.result;return !(result instanceof DefaultPromise.CauseHolder) && result != SUCCESS && result != UNCANCELLABLE ? result : null;
}public V get() throws InterruptedException, ExecutionException {Object result = this.result;if (!isDone0(result)) {this.await();result = this.result;}if (result != SUCCESS && result != UNCANCELLABLE) {Throwable cause = this.cause0(result);if (cause == null) {return result;} else if (cause instanceof CancellationException) {throw (CancellationException)cause;} else {throw new ExecutionException(cause);}} else {return null;}
}public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {Object result = this.result;if (!isDone0(result)) {if (!this.await(timeout, unit)) {throw new TimeoutException();}result = this.result;}if (result != SUCCESS && result != UNCANCELLABLE) {Throwable cause = this.cause0(result);if (cause == null) {return result;} else if (cause instanceof CancellationException) {throw (CancellationException)cause;} else {throw new ExecutionException(cause);}} else {return null;}
}
await() 方法判断 Future 任务是否结束,之后获取 this 锁,如果任务未完成则调用 Object 的 wait()等待。源码如下:
public Promise<V> await() throws InterruptedException { if (this.isDone()) {return this;} else if (Thread.interrupted()) {throw new InterruptedException(this.toString());} else {this.checkDeadLock();synchronized(this) {while(!this.isDone()) {this.incWaiters();try {this.wait();} finally {this.decWaiters();}}return this;}}//...
}
addListener 方法被调用时,将传入的回调传入listeners对象中。如果监听多于 1 个,会创建DeflaultFutureListeners对象将回调方法保存在一个数组中。
removeListener会将listeners设置为null(只有一个时)或从数组中移除(多个回调时)。源码如下。
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {ObjectUtil.checkNotNull(listener, "listener");synchronized(this) {this.addListener0(listener);}if (this.isDone()) {this.notifyListeners();}return this;
} public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {ObjectUtil.checkNotNull(listeners, "listeners");synchronized(this) {GenericFutureListener[] var3 = listeners;int var4 = listeners.length;int var5 = 0;while(var5 < var4) {GenericFutureListener<? extends Future<? super V>> listener = var3[var5];if (listener != null) {this.addListener0(listener);++var5;continue;}}}if (this.isDone()) {this.notifyListeners();}return this;
}public Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener) {ObjectUtil.checkNotNull(listener, "listener");synchronized(this) {this.removeListener0(listener);return this;}
}public Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {ObjectUtil.checkNotNull(listeners, "listeners");synchronized(this) {GenericFutureListener[] var3 = listeners;int var4 = listeners.length;for(int var5 = 0; var5 < var4; ++var5) {GenericFutureListener<? extends Future<? super V>> listener = var3[var5];if (listener == null) {break;}this.removeListener0(listener);}return this;}
}
在添加监听器的过程中,如果任务刚好执行完毕 done(),则立即触发监听事件。触发监听通过notifyListeners()实现。主要逻辑如下:
如果当前addListener的线程(准确来说应该是调用了notifyListeners的线程,因为addListener和setSuccess都会调用notifyListeners和 Promise 内的线程池)与当前执行的线程是同一个线程,则放在线程池中执行,否则提交到线程池中执行;
而如果是执行 Future 任务的线程池中的setSuccess时,调用notifyListeners(),会放在当前线程中执行。内部维护了notifyListeners用来记录是否已经触发过监听事件,只有未触发过且监听列表不为空,才会依次遍历并调用operationComplete。
三、Netty 的 DefaultChannelPromise
DefaultChannelPromise 是 DefaultPromise 的子类,内部维护了一个通道变量 channel。
Promise 机制相关的方法都是调用父类方法。
除此之外,DefaultChannelPromise 还实现了FlushCheckpoint接口,供ChannelFlushPromiseNotifier使用,可以将ChannelFuture注册到ChannelFlushPromiseNotifier类,当有数据写入或到达checkpoint时使用。
核心源码如下:
public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {private final Channel channel;private long checkpoint;//...public Channel channel() {return this.channel;}public ChannelPromise setSuccess() {return this.setSuccess((Void)null);}public ChannelPromise setSuccess(Void result) {super.setSuccess(result);return this;}public boolean trySuccess() {return this.trySuccess((Object)null);}public ChannelPromise setFailure(Throwable cause) {super.setFailure(cause);return this;}//...public ChannelPromise promise() {return this;}protected void checkDeadLock() {if (this.channel().isRegistered()) {super.checkDeadLock();}}public ChannelPromise unvoid() {return this;}public boolean isVoid() {return false;}
}
总结
以上我们分析了 Netty 中的 Promise,知道了它是扩展自 Netty 的 Future,是一个可写的 Future。
相关文章:
【Netty】Promise 源码分析(十七)
文章目录 前言一、Promise 接口二、Netty 的 DefaultPromise2.1、设置任务的成功或失败2.2、获取 Future 任务执行结果和添加监听事件 三、Netty 的 DefaultChannelPromise总结 前言 回顾Netty系列文章: Netty 概述(一)Netty 架构设计&…...
测牛学堂:2023最新自动化软件测试教程之python基础(字符串常用api总结)
python字符串常用API总结 1 count 查找某个字符在整个字符串中出现的次数 2 capitalize 将字符串的第一个字符转换为大写 3 center(width,fillchar) 返回一个指定宽度的字符串,fillchar为填充的字符,默认是空格,常用* str1 分隔线 print(st…...
【信号变化检测】使用新颖的短时间条件局部峰值速率特征进行信号变化/事件/异常检测(Matlab代码实现)
、 💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭…...
MQTT GUI 客户端 可视化管理工具
MQTT GUI 客户端 可视化管理工具 介绍 多标签页管理,同时打开多个连接提供原生性能,并且比使用 Electron 等 Web 技术开发的同等应用程序消耗的资源少得多支持 MQTT v5.0 以及 MQTT v3.1.1 协议,支持通过 WebSocket 连接至 MQTT 服务器以树…...
计算机硬件系统 — 冯诺依曼体系结构运行原理解析
目录 文章目录 目录计算机系统计算机硬件系统(冯诺依曼体系结构)PC 主机硬件CPU(中央处理器)CPU 的组成部分CPU 总线控制器单元运算器单元寄存器组超线程与多核架构三级高速缓存为什么需要缓存三级缓存结构 CPU 的指令集指令集的类…...
10.Linux查看文件内容
在 Linux 中,可以使用多种命令来查看文件内容。以下是几个常用的命令及其用法: cat 命令:以行为单位显示整个文件内容。 cat file.txt # 显示名为 file.txt 的文件内容less 命令:分页显示文件内容,可向前/后翻页、搜索…...
API接口测试—详情版(拼多多根据ID取商品详情)
一、为什么要做接口测试 做接口测试的原因主要有以下几个方面: 1. 确保接口功能正确性:接口是不同软件系统或者不同模块之间的数据传输和交互的通道,通过接口测试能够确保不同系统或者模块之间传递的信息准确无误,从而保证了整个…...
【论文阅读】23_SIGIR_Disentangled Contrastive Collaborative Filtering(分离对比协同过滤)
【论文阅读】23_SIGIR_Disentangled Contrastive Collaborative Filtering(分离对比协同过滤) 文章目录 【论文阅读】23_SIGIR_Disentangled Contrastive Collaborative Filtering(分离对比协同过滤)1. 来源2. 介绍3. 模型方法3.1…...
目前的网络情况与特点
现有网络无法进展安全管理与控制,缺乏可管理与安全性,一旦 网络出现病毒与网络攻击现象,将会涉与到个别部门部数据丢失与影 响相关的业务运作。 1 1.1 采用普通傻瓜式交换机 目前全所各部门采用的交换机根本上为 TP-LINK、D-LINK 10/100M 傻瓜…...
css选择器及其权重
1. 类型选择器 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewport" content"widthdevice-wid…...
RK3588平台开发系列讲解(项目篇)RKNN-Toolkit2 的使用
平台内核版本安卓版本RK3588Linux 5.10Android 12文章目录 一、RKNN-Toolkit2安装二、模型转换和模型推理三、性能和内存评估沉淀、分享、成长,让自己和他人都能有所收获!😄 📢 NPU 是专门用于神经网络的处理单元。它旨在加速人工智能领域的神经网络算法,如机器视觉和自…...
C/C++基础讲解(九十九)之经典篇(第几天/排序)
C/C++基础讲解(九十九)之经典篇(第几天/排序) 程序之美 前言 很多时候,特别是刚步入大学的学子们,对于刚刚开展的计算机课程基本上是一团迷雾,想要弄明白其中的奥秘,真的要花费一些功夫,我和大家一样都是这么啃过来的,从不知到知知,懵懂到入门,每一步都走的很艰辛,…...
quickstart Guide快速入门
本文档参考backtrader官方文档,是官方文档的完整中文翻译,可作为backtrader中文教程、backtrader中文参考手册、backtrader中文开发手册、backtrader入门资料使用。 快速入门章节目录 快速入门使用平台从0到100:一步一步的演示基本设置设置现…...
Kubernetes 证书详解
K8S 证书介绍 在 Kube-apiserver 中提供了很多认证方式,其中最常用的就是 TLS 认证,当然也有 BootstrapToken,BasicAuth 认证等,只要有一个认证通过,那么 Kube-apiserver 即认为认证通过。下面就主要讲解 TLS 认证。 …...
Python常用数据结构
Python 提供了多种内置的数据结构,用于存储和组织数据。以下是一些常见的 Python 数据结构: 1.列表(List):列表是一个有序、可变的数据集合,可以包含任意类型的元素。列表使用方括号 [] 表示,元…...
CompletableFuture详解-初遇者-很细
目录 一、创建异步任务 1. supplyAsync 2. runAsync 3.获取任务结果的方法 二、异步回调处理 1.thenApply和thenApplyAsync 2.thenAccept和thenAcceptAsync 2.thenRun和thenRunAsync 3.whenComplete和whenCompleteAsync 4.handle和handleAsync 三、多任务组合处理 1…...
【iOS】—— iOS中的相关锁
文章目录 自旋锁1.OSSpinLock2.os_unfair_lock3.atomic 互斥锁pthread_mutexsynchronizedobjc_sync_enterobjc_sync_exit注意事项 NSLockNSRecursiveLock信号量条件锁NSConditionNSConditionLock 读写锁总结 锁作为一种非强制的机制,被用来保证线程安全。每一个线程…...
表单重复提交:
1. 表单重复提交原因 当用户提交完请求,浏览器会记录最后一次请求的全部信息。用户按下功能键F5,就会发起浏览器记录的最后一次请求。如果最后一次请求为添加操作,那么此时刷新按钮就会再次提交数据,造成表单重复提交。 2. 表单…...
【0197】共享内存管理结构(shmem)之创建共享内存分配机制(Shared Memory Allocation)(2 - 2)
文章目录 1. 概述2. 初始化事务管理器 ShmemVariableCache2.1 从共享内存分配 VariableCacheData 大小内存空间2.1.1 分配对齐块2.2 内存空间清零相关文章: 【0195】共享内存管理结构(shmem)之概念篇(1) 【0196】共享内存管理结构(shmem)之创建共享内存分配机制(Shared…...
ChatGPT国内免费使用方法有哪些?
目录 ChatGPT介绍:一、ChatGPT是什么?二、ChatGPT发展:三、ChatGPT 优点:四、国内使用ChatGPT方法五、结语: ChatGPT介绍: 一、ChatGPT是什么? ChatGPT 是一个基于语言模型 GPT-3.5 的聊天机器人,ChatGPT模型是Instruct GPT的姊妹模型(siblingmodel&a…...
三维GIS开发cesium智慧地铁教程(5)Cesium相机控制
一、环境搭建 <script src"../cesium1.99/Build/Cesium/Cesium.js"></script> <link rel"stylesheet" href"../cesium1.99/Build/Cesium/Widgets/widgets.css"> 关键配置点: 路径验证:确保相对路径.…...
Oracle11g安装包
Oracle 11g安装包 适用于windows系统,64位 下载路径 oracle 11g 安装包...
Java并发编程实战 Day 11:并发设计模式
【Java并发编程实战 Day 11】并发设计模式 开篇 这是"Java并发编程实战"系列的第11天,今天我们聚焦于并发设计模式。并发设计模式是解决多线程环境下常见问题的经典解决方案,它们不仅提供了优雅的设计思路,还能显著提升系统的性能…...
boost::filesystem::path文件路径使用详解和示例
boost::filesystem::path 是 Boost 库中用于跨平台操作文件路径的类,封装了路径的拼接、分割、提取、判断等常用功能。下面是对它的使用详解,包括常用接口与完整示例。 1. 引入头文件与命名空间 #include <boost/filesystem.hpp> namespace fs b…...
Linux入门课的思维导图
耗时两周,终于把慕课网上的Linux的基础入门课实操、总结完了! 第一次以Blog的形式做学习记录,过程很有意思,但也很耗时。 课程时长5h,涉及到很多专有名词,要去逐个查找,以前接触过的概念因为时…...
java 局域网 rtsp 取流 WebSocket 推送到前端显示 低延迟
众所周知 摄像头取流推流显示前端延迟大 传统方法是服务器取摄像头的rtsp流 然后客户端连服务器 中转多了,延迟一定不小。 假设相机没有专网 公网 1相机自带推流 直接推送到云服务器 然后客户端拉去 2相机只有rtsp ,边缘服务器拉流推送到云服务器 …...
前端打包工具简单介绍
前端打包工具简单介绍 一、Webpack 架构与插件机制 1. Webpack 架构核心组成 Entry(入口) 指定应用的起点文件,比如 src/index.js。 Module(模块) Webpack 把项目当作模块图,模块可以是 JS、CSS、图片等…...
分布式光纤声振传感技术原理与瑞利散射机制解析
分布式光纤传感技术(Distributed Fiber Optic Sensing,简称DFOS)作为近年来迅速发展的新型感知手段,已广泛应用于边界安防、油气管道监测、结构健康诊断、地震探测等领域。其子类技术——分布式光纤声振传感(Distribut…...
NLP常用工具包
✨做一次按NLP项目常见工具的使用拆解 1. tokenizer from torchtext.data.utils import get_tokenizertokenizer get_tokenizer(basic_english) text_sample "Were going on an adventure! The weather is really nice today." tokens tokenizer(text_sample) p…...
stm32—ADC和DAC
ADC和DAC 在嵌入式系统中,微控制器经常需要与现实世界的模拟信号进行交互。STM32微控制器内置了模拟数字转换器(ADC)和数字模拟转换器(DAC),它们是实现这种交互的关键模块。 1. 模拟数字转换器(…...
