当前位置: 首页 > news >正文

【Netty】Promise 源码分析(十七)

文章目录

  • 前言
  • 一、Promise 接口
  • 二、Netty 的 DefaultPromise
    • 2.1、设置任务的成功或失败
    • 2.2、获取 Future 任务执行结果和添加监听事件
  • 三、Netty 的 DefaultChannelPromise
  • 总结

前言

回顾Netty系列文章:

  • Netty 概述(一)
  • Netty 架构设计(二)
  • Netty Channel 概述(三)
  • Netty ChannelHandler(四)
  • ChannelPipeline源码分析(五)
  • 字节缓冲区 ByteBuf (六)(上)
  • 字节缓冲区 ByteBuf(七)(下)
  • Netty 如何实现零拷贝(八)
  • Netty 程序引导类(九)
  • Reactor 模型(十)
  • 工作原理详解(十一)
  • Netty 解码器(十二)
  • Netty 编码器(十三)
  • Netty 编解码器(十四)
  • 自定义解码器、编码器、编解码器(十五)
  • Future 源码分析(十六)

本篇文章我就就来分析一下可写的 Future,也就是 promise,Netty 中的 Promise 扩展自 Netty 的 Future。

一、Promise 接口

在 Netty 中,Promise 接口是一种特殊的可写的 Future。 Promise 的核心源码如下:

public interface Promise<V> extends Future<V> {Promise<V> setSuccess(V var1);boolean trySuccess(V var1);Promise<V> setFailure(Throwable var1);boolean tryFailure(Throwable var1);boolean setUncancellable();Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> var1);Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... var1);Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> var1);Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... var1);Promise<V> await() throws InterruptedException;Promise<V> awaitUninterruptibly();Promise<V> sync() throws InterruptedException;Promise<V> syncUninterruptibly();
}

从上面可以看出,Promise 就是一个可写的 Future。在 Future 机制中,业务逻辑所在任务执行的状态(成功或失败)是在 Future 中实现的;而在 Promise 中,可以在业务逻辑中控制任务的执行结果,相比 Future 更加灵活。
以下是一个 Promise 的示例(伪代码)。

//异步的耗时任务接收一个 Promise
public Promise asynchronousFunction() {Promise promise = new PromiseImpl();Object result = null;return =search()  //业务逻辑if (sucess) {promise.setSuccess(result); //通知 promise 当前异步任务成功了,并传入结果} else if (failed) {promise.setFailure(reason);//通知 promise 当前异步任务失败了} else if (error) {promise.setFailure(error);//通知 promise 当前异步任务发生了异常}
}//调用异步的耗时操作
Promise promise = asynchronousFunction(promise);//会立即返回 promise//添加成功处理 / 失败处理 / 异步处理等事件
promise.addListener();//例如:可以添加成功后的执行事件//继续做其他事件,不需要理会 asynchronousFunction 何时结束
doOtherThings();

在 Netty 中,Promise 继承了 Future,因此也具备了 Future 的所有功能。在 Promise 机制中,可以在业务逻辑中人工设置业务逻辑的成功与失败。
Netty 的常用 Promise 类有 DefaultPromise 类,这是 Promise 实现的基础,DefaultChannelPromise 是 DefaultPromise 的子类,加入了channel属性。

二、Netty 的 DefaultPromise

Netty 中涉及异步操作的地方都使用了 Promise 。例如,下面是服务器/客户端启动时的注册任务,最终会调用 Unsafe 的 register,调用过程中会传入一个Promise 。Unsafe 进行事件的注册时调用 Promise 可以设置成功或者失败。

//SingleThreadEventLoop.java
public ChannelFuture register(ChannelPromise promise) {ObjectUtil.checkNotNull(promise, "promise");promise.channel().unsafe().register(this, promise);return promise;
}//AbstractChannel.AbstractUnsafe
public final void register(EventLoop eventLoop, final ChannelPromise promise) {ObjectUtil.checkNotNull(eventLoop, "eventLoop");if (AbstractChannel.this.isRegistered()) {promise.setFailure(new IllegalStateException("registered to an event loop already"));} else if (!AbstractChannel.this.isCompatible(eventLoop)) {promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));} else {AbstractChannel.this.eventLoop = eventLoop;if (eventLoop.inEventLoop()) {this.register0(promise);} else {try {eventLoop.execute(new Runnable() {public void run() {AbstractUnsafe.this.register0(promise);}});} catch (Throwable var4) {AbstractChannel.logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, var4);this.closeForcibly();AbstractChannel.this.closeFuture.setClosed();this.safeSetFailure(promise, var4);}}}
}

DefaultPromise 提供的功能可以分为两个部分;一个是为调用者提供 get()和addListen()用于获取 Future 任务执行结果和添加监听事件;另一部分是为业务处理任务提供setSucess()等方法设置任务的成功或失败。

2.1、设置任务的成功或失败

DefaultPromise 核心源码如下:

public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {public Promise<V> setSuccess(V result) {if (this.setSuccess0(result)) {return this;} else {throw new IllegalStateException("complete already: " + this);}}public boolean trySuccess(V result) {return this.setSuccess0(result);}public Promise<V> setFailure(Throwable cause) {if (this.setFailure0(cause)) {return this;} else {throw new IllegalStateException("complete already: " + this, cause);}}public boolean tryFailure(Throwable cause) {return this.setFailure0(cause);}public boolean setUncancellable() {if (RESULT_UPDATER.compareAndSet(this, (Object)null, UNCANCELLABLE)) {return true;} else {Object result = this.result;return !isDone0(result) || !isCancelled0(result);}}public boolean isSuccess() {Object result = this.result;return result != null && result != UNCANCELLABLE && !(result instanceof DefaultPromise.CauseHolder);}public boolean isCancellable() {return this.result == null;}//...}

2.2、获取 Future 任务执行结果和添加监听事件

DefaultPromise 的get方法有 3 个。

无参数的get会阻塞等待;
有参数的get会等待指定事件,若未结束就抛出超时异常,这两个get是在其父类 AbstractFuture中实现的。getNow()方法则会立马返回结果。

源码如下:

public V getNow() {Object result = this.result;return !(result instanceof DefaultPromise.CauseHolder) && result != SUCCESS && result != UNCANCELLABLE ? result : null;
}public V get() throws InterruptedException, ExecutionException {Object result = this.result;if (!isDone0(result)) {this.await();result = this.result;}if (result != SUCCESS && result != UNCANCELLABLE) {Throwable cause = this.cause0(result);if (cause == null) {return result;} else if (cause instanceof CancellationException) {throw (CancellationException)cause;} else {throw new ExecutionException(cause);}} else {return null;}
}public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {Object result = this.result;if (!isDone0(result)) {if (!this.await(timeout, unit)) {throw new TimeoutException();}result = this.result;}if (result != SUCCESS && result != UNCANCELLABLE) {Throwable cause = this.cause0(result);if (cause == null) {return result;} else if (cause instanceof CancellationException) {throw (CancellationException)cause;} else {throw new ExecutionException(cause);}} else {return null;}
}

await() 方法判断 Future 任务是否结束,之后获取 this 锁,如果任务未完成则调用 Object 的 wait()等待。源码如下:

public Promise<V> await() throws InterruptedException { if (this.isDone()) {return this;} else if (Thread.interrupted()) {throw new InterruptedException(this.toString());} else {this.checkDeadLock();synchronized(this) {while(!this.isDone()) {this.incWaiters();try {this.wait();} finally {this.decWaiters();}}return this;}}//...
}

addListener 方法被调用时,将传入的回调传入listeners对象中。如果监听多于 1 个,会创建DeflaultFutureListeners对象将回调方法保存在一个数组中。
removeListener会将listeners设置为null(只有一个时)或从数组中移除(多个回调时)。源码如下。

public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {ObjectUtil.checkNotNull(listener, "listener");synchronized(this) {this.addListener0(listener);}if (this.isDone()) {this.notifyListeners();}return this;
}   public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {ObjectUtil.checkNotNull(listeners, "listeners");synchronized(this) {GenericFutureListener[] var3 = listeners;int var4 = listeners.length;int var5 = 0;while(var5 < var4) {GenericFutureListener<? extends Future<? super V>> listener = var3[var5];if (listener != null) {this.addListener0(listener);++var5;continue;}}}if (this.isDone()) {this.notifyListeners();}return this;
}public Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener) {ObjectUtil.checkNotNull(listener, "listener");synchronized(this) {this.removeListener0(listener);return this;}
}public Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {ObjectUtil.checkNotNull(listeners, "listeners");synchronized(this) {GenericFutureListener[] var3 = listeners;int var4 = listeners.length;for(int var5 = 0; var5 < var4; ++var5) {GenericFutureListener<? extends Future<? super V>> listener = var3[var5];if (listener == null) {break;}this.removeListener0(listener);}return this;}
}

在添加监听器的过程中,如果任务刚好执行完毕 done(),则立即触发监听事件。触发监听通过notifyListeners()实现。主要逻辑如下:
如果当前addListener的线程(准确来说应该是调用了notifyListeners的线程,因为addListener和setSuccess都会调用notifyListeners和 Promise 内的线程池)与当前执行的线程是同一个线程,则放在线程池中执行,否则提交到线程池中执行;
而如果是执行 Future 任务的线程池中的setSuccess时,调用notifyListeners(),会放在当前线程中执行。内部维护了notifyListeners用来记录是否已经触发过监听事件,只有未触发过且监听列表不为空,才会依次遍历并调用operationComplete。

三、Netty 的 DefaultChannelPromise

DefaultChannelPromise 是 DefaultPromise 的子类,内部维护了一个通道变量 channel。
Promise 机制相关的方法都是调用父类方法。
除此之外,DefaultChannelPromise 还实现了FlushCheckpoint接口,供ChannelFlushPromiseNotifier使用,可以将ChannelFuture注册到ChannelFlushPromiseNotifier类,当有数据写入或到达checkpoint时使用。
核心源码如下:

public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {private final Channel channel;private long checkpoint;//...public Channel channel() {return this.channel;}public ChannelPromise setSuccess() {return this.setSuccess((Void)null);}public ChannelPromise setSuccess(Void result) {super.setSuccess(result);return this;}public boolean trySuccess() {return this.trySuccess((Object)null);}public ChannelPromise setFailure(Throwable cause) {super.setFailure(cause);return this;}//...public ChannelPromise promise() {return this;}protected void checkDeadLock() {if (this.channel().isRegistered()) {super.checkDeadLock();}}public ChannelPromise unvoid() {return this;}public boolean isVoid() {return false;}
}

总结

以上我们分析了 Netty 中的 Promise,知道了它是扩展自 Netty 的 Future,是一个可写的 Future。

相关文章:

【Netty】Promise 源码分析(十七)

文章目录 前言一、Promise 接口二、Netty 的 DefaultPromise2.1、设置任务的成功或失败2.2、获取 Future 任务执行结果和添加监听事件 三、Netty 的 DefaultChannelPromise总结 前言 回顾Netty系列文章&#xff1a; Netty 概述&#xff08;一&#xff09;Netty 架构设计&…...

测牛学堂:2023最新自动化软件测试教程之python基础(字符串常用api总结)

python字符串常用API总结 1 count 查找某个字符在整个字符串中出现的次数 2 capitalize 将字符串的第一个字符转换为大写 3 center(width,fillchar) 返回一个指定宽度的字符串&#xff0c;fillchar为填充的字符&#xff0c;默认是空格&#xff0c;常用* str1 分隔线 print(st…...

【信号变化检测】使用新颖的短时间条件局部峰值速率特征进行信号变化/事件/异常检测(Matlab代码实现)

、 &#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭…...

MQTT GUI 客户端 可视化管理工具

MQTT GUI 客户端 可视化管理工具 介绍 多标签页管理&#xff0c;同时打开多个连接提供原生性能&#xff0c;并且比使用 Electron 等 Web 技术开发的同等应用程序消耗的资源少得多支持 MQTT v5.0 以及 MQTT v3.1.1 协议&#xff0c;支持通过 WebSocket 连接至 MQTT 服务器以树…...

计算机硬件系统 — 冯诺依曼体系结构运行原理解析

目录 文章目录 目录计算机系统计算机硬件系统&#xff08;冯诺依曼体系结构&#xff09;PC 主机硬件CPU&#xff08;中央处理器&#xff09;CPU 的组成部分CPU 总线控制器单元运算器单元寄存器组超线程与多核架构三级高速缓存为什么需要缓存三级缓存结构 CPU 的指令集指令集的类…...

10.Linux查看文件内容

在 Linux 中&#xff0c;可以使用多种命令来查看文件内容。以下是几个常用的命令及其用法&#xff1a; cat 命令&#xff1a;以行为单位显示整个文件内容。 cat file.txt # 显示名为 file.txt 的文件内容less 命令&#xff1a;分页显示文件内容&#xff0c;可向前/后翻页、搜索…...

API接口测试—详情版(拼多多根据ID取商品详情)

一、为什么要做接口测试 做接口测试的原因主要有以下几个方面&#xff1a; 1. 确保接口功能正确性&#xff1a;接口是不同软件系统或者不同模块之间的数据传输和交互的通道&#xff0c;通过接口测试能够确保不同系统或者模块之间传递的信息准确无误&#xff0c;从而保证了整个…...

【论文阅读】23_SIGIR_Disentangled Contrastive Collaborative Filtering(分离对比协同过滤)

【论文阅读】23_SIGIR_Disentangled Contrastive Collaborative Filtering&#xff08;分离对比协同过滤&#xff09; 文章目录 【论文阅读】23_SIGIR_Disentangled Contrastive Collaborative Filtering&#xff08;分离对比协同过滤&#xff09;1. 来源2. 介绍3. 模型方法3.1…...

目前的网络情况与特点

现有网络无法进展安全管理与控制&#xff0c;缺乏可管理与安全性&#xff0c;一旦 网络出现病毒与网络攻击现象&#xff0c;将会涉与到个别部门部数据丢失与影 响相关的业务运作。 1 1.1 采用普通傻瓜式交换机 目前全所各部门采用的交换机根本上为 TP-LINK、D-LINK 10/100M 傻瓜…...

css选择器及其权重

1. 类型选择器 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewport" content"widthdevice-wid…...

RK3588平台开发系列讲解(项目篇)RKNN-Toolkit2 的使用

平台内核版本安卓版本RK3588Linux 5.10Android 12文章目录 一、RKNN-Toolkit2安装二、模型转换和模型推理三、性能和内存评估沉淀、分享、成长,让自己和他人都能有所收获!😄 📢 NPU 是专门用于神经网络的处理单元。它旨在加速人工智能领域的神经网络算法,如机器视觉和自…...

C/C++基础讲解(九十九)之经典篇(第几天/排序)

C/C++基础讲解(九十九)之经典篇(第几天/排序) 程序之美 前言 很多时候,特别是刚步入大学的学子们,对于刚刚开展的计算机课程基本上是一团迷雾,想要弄明白其中的奥秘,真的要花费一些功夫,我和大家一样都是这么啃过来的,从不知到知知,懵懂到入门,每一步都走的很艰辛,…...

quickstart Guide快速入门

本文档参考backtrader官方文档&#xff0c;是官方文档的完整中文翻译&#xff0c;可作为backtrader中文教程、backtrader中文参考手册、backtrader中文开发手册、backtrader入门资料使用。 快速入门章节目录 快速入门使用平台从0到100&#xff1a;一步一步的演示基本设置设置现…...

Kubernetes 证书详解

K8S 证书介绍 在 Kube-apiserver 中提供了很多认证方式&#xff0c;其中最常用的就是 TLS 认证&#xff0c;当然也有 BootstrapToken&#xff0c;BasicAuth 认证等&#xff0c;只要有一个认证通过&#xff0c;那么 Kube-apiserver 即认为认证通过。下面就主要讲解 TLS 认证。 …...

Python常用数据结构

Python 提供了多种内置的数据结构&#xff0c;用于存储和组织数据。以下是一些常见的 Python 数据结构&#xff1a; 1.列表&#xff08;List&#xff09;&#xff1a;列表是一个有序、可变的数据集合&#xff0c;可以包含任意类型的元素。列表使用方括号 [] 表示&#xff0c;元…...

CompletableFuture详解-初遇者-很细

目录 一、创建异步任务 1. supplyAsync 2. runAsync 3.获取任务结果的方法 二、异步回调处理 1.thenApply和thenApplyAsync 2.thenAccept和thenAcceptAsync 2.thenRun和thenRunAsync 3.whenComplete和whenCompleteAsync 4.handle和handleAsync 三、多任务组合处理 1…...

【iOS】—— iOS中的相关锁

文章目录 自旋锁1.OSSpinLock2.os_unfair_lock3.atomic 互斥锁pthread_mutexsynchronizedobjc_sync_enterobjc_sync_exit注意事项 NSLockNSRecursiveLock信号量条件锁NSConditionNSConditionLock 读写锁总结 锁作为一种非强制的机制&#xff0c;被用来保证线程安全。每一个线程…...

表单重复提交:

1. 表单重复提交原因 当用户提交完请求&#xff0c;浏览器会记录最后一次请求的全部信息。用户按下功能键F5&#xff0c;就会发起浏览器记录的最后一次请求。如果最后一次请求为添加操作&#xff0c;那么此时刷新按钮就会再次提交数据&#xff0c;造成表单重复提交。 2. 表单…...

【0197】共享内存管理结构(shmem)之创建共享内存分配机制(Shared Memory Allocation)(2 - 2)

文章目录 1. 概述2. 初始化事务管理器 ShmemVariableCache2.1 从共享内存分配 VariableCacheData 大小内存空间2.1.1 分配对齐块2.2 内存空间清零相关文章: 【0195】共享内存管理结构(shmem)之概念篇(1) 【0196】共享内存管理结构(shmem)之创建共享内存分配机制(Shared…...

ChatGPT国内免费使用方法有哪些?

目录 ChatGPT介绍:一、ChatGPT是什么?二、ChatGPT发展:三、ChatGPT 优点:四、国内使用ChatGPT方法五、结语: ChatGPT介绍: 一、ChatGPT是什么? ChatGPT 是一个基于语言模型 GPT-3.5 的聊天机器人&#xff0c;ChatGPT模型是Instruct GPT的姊妹模型&#xff08;siblingmodel&a…...

地震勘探——干扰波识别、井中地震时距曲线特点

目录 干扰波识别反射波地震勘探的干扰波 井中地震时距曲线特点 干扰波识别 有效波&#xff1a;可以用来解决所提出的地质任务的波&#xff1b;干扰波&#xff1a;所有妨碍辨认、追踪有效波的其他波。 地震勘探中&#xff0c;有效波和干扰波是相对的。例如&#xff0c;在反射波…...

SkyWalking 10.2.0 SWCK 配置过程

SkyWalking 10.2.0 & SWCK 配置过程 skywalking oap-server & ui 使用Docker安装在K8S集群以外&#xff0c;K8S集群中的微服务使用initContainer按命名空间将skywalking-java-agent注入到业务容器中。 SWCK有整套的解决方案&#xff0c;全安装在K8S群集中。 具体可参…...

Flask RESTful 示例

目录 1. 环境准备2. 安装依赖3. 修改main.py4. 运行应用5. API使用示例获取所有任务获取单个任务创建新任务更新任务删除任务 中文乱码问题&#xff1a; 下面创建一个简单的Flask RESTful API示例。首先&#xff0c;我们需要创建环境&#xff0c;安装必要的依赖&#xff0c;然后…...

React Native 导航系统实战(React Navigation)

导航系统实战&#xff08;React Navigation&#xff09; React Navigation 是 React Native 应用中最常用的导航库之一&#xff0c;它提供了多种导航模式&#xff0c;如堆栈导航&#xff08;Stack Navigator&#xff09;、标签导航&#xff08;Tab Navigator&#xff09;和抽屉…...

Mac软件卸载指南,简单易懂!

刚和Adobe分手&#xff0c;它却总在Library里给你写"回忆录"&#xff1f;卸载的Final Cut Pro像电子幽灵般阴魂不散&#xff1f;总是会有残留文件&#xff0c;别慌&#xff01;这份Mac软件卸载指南&#xff0c;将用最硬核的方式教你"数字分手术"&#xff0…...

AI编程--插件对比分析:CodeRider、GitHub Copilot及其他

AI编程插件对比分析&#xff1a;CodeRider、GitHub Copilot及其他 随着人工智能技术的快速发展&#xff0c;AI编程插件已成为提升开发者生产力的重要工具。CodeRider和GitHub Copilot作为市场上的领先者&#xff0c;分别以其独特的特性和生态系统吸引了大量开发者。本文将从功…...

今日科技热点速览

&#x1f525; 今日科技热点速览 &#x1f3ae; 任天堂Switch 2 正式发售 任天堂新一代游戏主机 Switch 2 今日正式上线发售&#xff0c;主打更强图形性能与沉浸式体验&#xff0c;支持多模态交互&#xff0c;受到全球玩家热捧 。 &#x1f916; 人工智能持续突破 DeepSeek-R1&…...

Android Bitmap治理全解析:从加载优化到泄漏防控的全生命周期管理

引言 Bitmap&#xff08;位图&#xff09;是Android应用内存占用的“头号杀手”。一张1080P&#xff08;1920x1080&#xff09;的图片以ARGB_8888格式加载时&#xff0c;内存占用高达8MB&#xff08;192010804字节&#xff09;。据统计&#xff0c;超过60%的应用OOM崩溃与Bitm…...

2023赣州旅游投资集团

单选题 1.“不登高山&#xff0c;不知天之高也&#xff1b;不临深溪&#xff0c;不知地之厚也。”这句话说明_____。 A、人的意识具有创造性 B、人的认识是独立于实践之外的 C、实践在认识过程中具有决定作用 D、人的一切知识都是从直接经验中获得的 参考答案: C 本题解…...

技术栈RabbitMq的介绍和使用

目录 1. 什么是消息队列&#xff1f;2. 消息队列的优点3. RabbitMQ 消息队列概述4. RabbitMQ 安装5. Exchange 四种类型5.1 direct 精准匹配5.2 fanout 广播5.3 topic 正则匹配 6. RabbitMQ 队列模式6.1 简单队列模式6.2 工作队列模式6.3 发布/订阅模式6.4 路由模式6.5 主题模式…...