当前位置: 首页 > news >正文

【Netty】Promise 源码分析(十七)

文章目录

  • 前言
  • 一、Promise 接口
  • 二、Netty 的 DefaultPromise
    • 2.1、设置任务的成功或失败
    • 2.2、获取 Future 任务执行结果和添加监听事件
  • 三、Netty 的 DefaultChannelPromise
  • 总结

前言

回顾Netty系列文章:

  • Netty 概述(一)
  • Netty 架构设计(二)
  • Netty Channel 概述(三)
  • Netty ChannelHandler(四)
  • ChannelPipeline源码分析(五)
  • 字节缓冲区 ByteBuf (六)(上)
  • 字节缓冲区 ByteBuf(七)(下)
  • Netty 如何实现零拷贝(八)
  • Netty 程序引导类(九)
  • Reactor 模型(十)
  • 工作原理详解(十一)
  • Netty 解码器(十二)
  • Netty 编码器(十三)
  • Netty 编解码器(十四)
  • 自定义解码器、编码器、编解码器(十五)
  • Future 源码分析(十六)

本篇文章我就就来分析一下可写的 Future,也就是 promise,Netty 中的 Promise 扩展自 Netty 的 Future。

一、Promise 接口

在 Netty 中,Promise 接口是一种特殊的可写的 Future。 Promise 的核心源码如下:

public interface Promise<V> extends Future<V> {Promise<V> setSuccess(V var1);boolean trySuccess(V var1);Promise<V> setFailure(Throwable var1);boolean tryFailure(Throwable var1);boolean setUncancellable();Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> var1);Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... var1);Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> var1);Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... var1);Promise<V> await() throws InterruptedException;Promise<V> awaitUninterruptibly();Promise<V> sync() throws InterruptedException;Promise<V> syncUninterruptibly();
}

从上面可以看出,Promise 就是一个可写的 Future。在 Future 机制中,业务逻辑所在任务执行的状态(成功或失败)是在 Future 中实现的;而在 Promise 中,可以在业务逻辑中控制任务的执行结果,相比 Future 更加灵活。
以下是一个 Promise 的示例(伪代码)。

//异步的耗时任务接收一个 Promise
public Promise asynchronousFunction() {Promise promise = new PromiseImpl();Object result = null;return =search()  //业务逻辑if (sucess) {promise.setSuccess(result); //通知 promise 当前异步任务成功了,并传入结果} else if (failed) {promise.setFailure(reason);//通知 promise 当前异步任务失败了} else if (error) {promise.setFailure(error);//通知 promise 当前异步任务发生了异常}
}//调用异步的耗时操作
Promise promise = asynchronousFunction(promise);//会立即返回 promise//添加成功处理 / 失败处理 / 异步处理等事件
promise.addListener();//例如:可以添加成功后的执行事件//继续做其他事件,不需要理会 asynchronousFunction 何时结束
doOtherThings();

在 Netty 中,Promise 继承了 Future,因此也具备了 Future 的所有功能。在 Promise 机制中,可以在业务逻辑中人工设置业务逻辑的成功与失败。
Netty 的常用 Promise 类有 DefaultPromise 类,这是 Promise 实现的基础,DefaultChannelPromise 是 DefaultPromise 的子类,加入了channel属性。

二、Netty 的 DefaultPromise

Netty 中涉及异步操作的地方都使用了 Promise 。例如,下面是服务器/客户端启动时的注册任务,最终会调用 Unsafe 的 register,调用过程中会传入一个Promise 。Unsafe 进行事件的注册时调用 Promise 可以设置成功或者失败。

//SingleThreadEventLoop.java
public ChannelFuture register(ChannelPromise promise) {ObjectUtil.checkNotNull(promise, "promise");promise.channel().unsafe().register(this, promise);return promise;
}//AbstractChannel.AbstractUnsafe
public final void register(EventLoop eventLoop, final ChannelPromise promise) {ObjectUtil.checkNotNull(eventLoop, "eventLoop");if (AbstractChannel.this.isRegistered()) {promise.setFailure(new IllegalStateException("registered to an event loop already"));} else if (!AbstractChannel.this.isCompatible(eventLoop)) {promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));} else {AbstractChannel.this.eventLoop = eventLoop;if (eventLoop.inEventLoop()) {this.register0(promise);} else {try {eventLoop.execute(new Runnable() {public void run() {AbstractUnsafe.this.register0(promise);}});} catch (Throwable var4) {AbstractChannel.logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, var4);this.closeForcibly();AbstractChannel.this.closeFuture.setClosed();this.safeSetFailure(promise, var4);}}}
}

DefaultPromise 提供的功能可以分为两个部分;一个是为调用者提供 get()和addListen()用于获取 Future 任务执行结果和添加监听事件;另一部分是为业务处理任务提供setSucess()等方法设置任务的成功或失败。

2.1、设置任务的成功或失败

DefaultPromise 核心源码如下:

public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {public Promise<V> setSuccess(V result) {if (this.setSuccess0(result)) {return this;} else {throw new IllegalStateException("complete already: " + this);}}public boolean trySuccess(V result) {return this.setSuccess0(result);}public Promise<V> setFailure(Throwable cause) {if (this.setFailure0(cause)) {return this;} else {throw new IllegalStateException("complete already: " + this, cause);}}public boolean tryFailure(Throwable cause) {return this.setFailure0(cause);}public boolean setUncancellable() {if (RESULT_UPDATER.compareAndSet(this, (Object)null, UNCANCELLABLE)) {return true;} else {Object result = this.result;return !isDone0(result) || !isCancelled0(result);}}public boolean isSuccess() {Object result = this.result;return result != null && result != UNCANCELLABLE && !(result instanceof DefaultPromise.CauseHolder);}public boolean isCancellable() {return this.result == null;}//...}

2.2、获取 Future 任务执行结果和添加监听事件

DefaultPromise 的get方法有 3 个。

无参数的get会阻塞等待;
有参数的get会等待指定事件,若未结束就抛出超时异常,这两个get是在其父类 AbstractFuture中实现的。getNow()方法则会立马返回结果。

源码如下:

public V getNow() {Object result = this.result;return !(result instanceof DefaultPromise.CauseHolder) && result != SUCCESS && result != UNCANCELLABLE ? result : null;
}public V get() throws InterruptedException, ExecutionException {Object result = this.result;if (!isDone0(result)) {this.await();result = this.result;}if (result != SUCCESS && result != UNCANCELLABLE) {Throwable cause = this.cause0(result);if (cause == null) {return result;} else if (cause instanceof CancellationException) {throw (CancellationException)cause;} else {throw new ExecutionException(cause);}} else {return null;}
}public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {Object result = this.result;if (!isDone0(result)) {if (!this.await(timeout, unit)) {throw new TimeoutException();}result = this.result;}if (result != SUCCESS && result != UNCANCELLABLE) {Throwable cause = this.cause0(result);if (cause == null) {return result;} else if (cause instanceof CancellationException) {throw (CancellationException)cause;} else {throw new ExecutionException(cause);}} else {return null;}
}

await() 方法判断 Future 任务是否结束,之后获取 this 锁,如果任务未完成则调用 Object 的 wait()等待。源码如下:

public Promise<V> await() throws InterruptedException { if (this.isDone()) {return this;} else if (Thread.interrupted()) {throw new InterruptedException(this.toString());} else {this.checkDeadLock();synchronized(this) {while(!this.isDone()) {this.incWaiters();try {this.wait();} finally {this.decWaiters();}}return this;}}//...
}

addListener 方法被调用时,将传入的回调传入listeners对象中。如果监听多于 1 个,会创建DeflaultFutureListeners对象将回调方法保存在一个数组中。
removeListener会将listeners设置为null(只有一个时)或从数组中移除(多个回调时)。源码如下。

public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {ObjectUtil.checkNotNull(listener, "listener");synchronized(this) {this.addListener0(listener);}if (this.isDone()) {this.notifyListeners();}return this;
}   public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {ObjectUtil.checkNotNull(listeners, "listeners");synchronized(this) {GenericFutureListener[] var3 = listeners;int var4 = listeners.length;int var5 = 0;while(var5 < var4) {GenericFutureListener<? extends Future<? super V>> listener = var3[var5];if (listener != null) {this.addListener0(listener);++var5;continue;}}}if (this.isDone()) {this.notifyListeners();}return this;
}public Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener) {ObjectUtil.checkNotNull(listener, "listener");synchronized(this) {this.removeListener0(listener);return this;}
}public Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {ObjectUtil.checkNotNull(listeners, "listeners");synchronized(this) {GenericFutureListener[] var3 = listeners;int var4 = listeners.length;for(int var5 = 0; var5 < var4; ++var5) {GenericFutureListener<? extends Future<? super V>> listener = var3[var5];if (listener == null) {break;}this.removeListener0(listener);}return this;}
}

在添加监听器的过程中,如果任务刚好执行完毕 done(),则立即触发监听事件。触发监听通过notifyListeners()实现。主要逻辑如下:
如果当前addListener的线程(准确来说应该是调用了notifyListeners的线程,因为addListener和setSuccess都会调用notifyListeners和 Promise 内的线程池)与当前执行的线程是同一个线程,则放在线程池中执行,否则提交到线程池中执行;
而如果是执行 Future 任务的线程池中的setSuccess时,调用notifyListeners(),会放在当前线程中执行。内部维护了notifyListeners用来记录是否已经触发过监听事件,只有未触发过且监听列表不为空,才会依次遍历并调用operationComplete。

三、Netty 的 DefaultChannelPromise

DefaultChannelPromise 是 DefaultPromise 的子类,内部维护了一个通道变量 channel。
Promise 机制相关的方法都是调用父类方法。
除此之外,DefaultChannelPromise 还实现了FlushCheckpoint接口,供ChannelFlushPromiseNotifier使用,可以将ChannelFuture注册到ChannelFlushPromiseNotifier类,当有数据写入或到达checkpoint时使用。
核心源码如下:

public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {private final Channel channel;private long checkpoint;//...public Channel channel() {return this.channel;}public ChannelPromise setSuccess() {return this.setSuccess((Void)null);}public ChannelPromise setSuccess(Void result) {super.setSuccess(result);return this;}public boolean trySuccess() {return this.trySuccess((Object)null);}public ChannelPromise setFailure(Throwable cause) {super.setFailure(cause);return this;}//...public ChannelPromise promise() {return this;}protected void checkDeadLock() {if (this.channel().isRegistered()) {super.checkDeadLock();}}public ChannelPromise unvoid() {return this;}public boolean isVoid() {return false;}
}

总结

以上我们分析了 Netty 中的 Promise,知道了它是扩展自 Netty 的 Future,是一个可写的 Future。

相关文章:

【Netty】Promise 源码分析(十七)

文章目录 前言一、Promise 接口二、Netty 的 DefaultPromise2.1、设置任务的成功或失败2.2、获取 Future 任务执行结果和添加监听事件 三、Netty 的 DefaultChannelPromise总结 前言 回顾Netty系列文章&#xff1a; Netty 概述&#xff08;一&#xff09;Netty 架构设计&…...

测牛学堂:2023最新自动化软件测试教程之python基础(字符串常用api总结)

python字符串常用API总结 1 count 查找某个字符在整个字符串中出现的次数 2 capitalize 将字符串的第一个字符转换为大写 3 center(width,fillchar) 返回一个指定宽度的字符串&#xff0c;fillchar为填充的字符&#xff0c;默认是空格&#xff0c;常用* str1 分隔线 print(st…...

【信号变化检测】使用新颖的短时间条件局部峰值速率特征进行信号变化/事件/异常检测(Matlab代码实现)

、 &#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭…...

MQTT GUI 客户端 可视化管理工具

MQTT GUI 客户端 可视化管理工具 介绍 多标签页管理&#xff0c;同时打开多个连接提供原生性能&#xff0c;并且比使用 Electron 等 Web 技术开发的同等应用程序消耗的资源少得多支持 MQTT v5.0 以及 MQTT v3.1.1 协议&#xff0c;支持通过 WebSocket 连接至 MQTT 服务器以树…...

计算机硬件系统 — 冯诺依曼体系结构运行原理解析

目录 文章目录 目录计算机系统计算机硬件系统&#xff08;冯诺依曼体系结构&#xff09;PC 主机硬件CPU&#xff08;中央处理器&#xff09;CPU 的组成部分CPU 总线控制器单元运算器单元寄存器组超线程与多核架构三级高速缓存为什么需要缓存三级缓存结构 CPU 的指令集指令集的类…...

10.Linux查看文件内容

在 Linux 中&#xff0c;可以使用多种命令来查看文件内容。以下是几个常用的命令及其用法&#xff1a; cat 命令&#xff1a;以行为单位显示整个文件内容。 cat file.txt # 显示名为 file.txt 的文件内容less 命令&#xff1a;分页显示文件内容&#xff0c;可向前/后翻页、搜索…...

API接口测试—详情版(拼多多根据ID取商品详情)

一、为什么要做接口测试 做接口测试的原因主要有以下几个方面&#xff1a; 1. 确保接口功能正确性&#xff1a;接口是不同软件系统或者不同模块之间的数据传输和交互的通道&#xff0c;通过接口测试能够确保不同系统或者模块之间传递的信息准确无误&#xff0c;从而保证了整个…...

【论文阅读】23_SIGIR_Disentangled Contrastive Collaborative Filtering(分离对比协同过滤)

【论文阅读】23_SIGIR_Disentangled Contrastive Collaborative Filtering&#xff08;分离对比协同过滤&#xff09; 文章目录 【论文阅读】23_SIGIR_Disentangled Contrastive Collaborative Filtering&#xff08;分离对比协同过滤&#xff09;1. 来源2. 介绍3. 模型方法3.1…...

目前的网络情况与特点

现有网络无法进展安全管理与控制&#xff0c;缺乏可管理与安全性&#xff0c;一旦 网络出现病毒与网络攻击现象&#xff0c;将会涉与到个别部门部数据丢失与影 响相关的业务运作。 1 1.1 采用普通傻瓜式交换机 目前全所各部门采用的交换机根本上为 TP-LINK、D-LINK 10/100M 傻瓜…...

css选择器及其权重

1. 类型选择器 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewport" content"widthdevice-wid…...

RK3588平台开发系列讲解(项目篇)RKNN-Toolkit2 的使用

平台内核版本安卓版本RK3588Linux 5.10Android 12文章目录 一、RKNN-Toolkit2安装二、模型转换和模型推理三、性能和内存评估沉淀、分享、成长,让自己和他人都能有所收获!😄 📢 NPU 是专门用于神经网络的处理单元。它旨在加速人工智能领域的神经网络算法,如机器视觉和自…...

C/C++基础讲解(九十九)之经典篇(第几天/排序)

C/C++基础讲解(九十九)之经典篇(第几天/排序) 程序之美 前言 很多时候,特别是刚步入大学的学子们,对于刚刚开展的计算机课程基本上是一团迷雾,想要弄明白其中的奥秘,真的要花费一些功夫,我和大家一样都是这么啃过来的,从不知到知知,懵懂到入门,每一步都走的很艰辛,…...

quickstart Guide快速入门

本文档参考backtrader官方文档&#xff0c;是官方文档的完整中文翻译&#xff0c;可作为backtrader中文教程、backtrader中文参考手册、backtrader中文开发手册、backtrader入门资料使用。 快速入门章节目录 快速入门使用平台从0到100&#xff1a;一步一步的演示基本设置设置现…...

Kubernetes 证书详解

K8S 证书介绍 在 Kube-apiserver 中提供了很多认证方式&#xff0c;其中最常用的就是 TLS 认证&#xff0c;当然也有 BootstrapToken&#xff0c;BasicAuth 认证等&#xff0c;只要有一个认证通过&#xff0c;那么 Kube-apiserver 即认为认证通过。下面就主要讲解 TLS 认证。 …...

Python常用数据结构

Python 提供了多种内置的数据结构&#xff0c;用于存储和组织数据。以下是一些常见的 Python 数据结构&#xff1a; 1.列表&#xff08;List&#xff09;&#xff1a;列表是一个有序、可变的数据集合&#xff0c;可以包含任意类型的元素。列表使用方括号 [] 表示&#xff0c;元…...

CompletableFuture详解-初遇者-很细

目录 一、创建异步任务 1. supplyAsync 2. runAsync 3.获取任务结果的方法 二、异步回调处理 1.thenApply和thenApplyAsync 2.thenAccept和thenAcceptAsync 2.thenRun和thenRunAsync 3.whenComplete和whenCompleteAsync 4.handle和handleAsync 三、多任务组合处理 1…...

【iOS】—— iOS中的相关锁

文章目录 自旋锁1.OSSpinLock2.os_unfair_lock3.atomic 互斥锁pthread_mutexsynchronizedobjc_sync_enterobjc_sync_exit注意事项 NSLockNSRecursiveLock信号量条件锁NSConditionNSConditionLock 读写锁总结 锁作为一种非强制的机制&#xff0c;被用来保证线程安全。每一个线程…...

表单重复提交:

1. 表单重复提交原因 当用户提交完请求&#xff0c;浏览器会记录最后一次请求的全部信息。用户按下功能键F5&#xff0c;就会发起浏览器记录的最后一次请求。如果最后一次请求为添加操作&#xff0c;那么此时刷新按钮就会再次提交数据&#xff0c;造成表单重复提交。 2. 表单…...

【0197】共享内存管理结构(shmem)之创建共享内存分配机制(Shared Memory Allocation)(2 - 2)

文章目录 1. 概述2. 初始化事务管理器 ShmemVariableCache2.1 从共享内存分配 VariableCacheData 大小内存空间2.1.1 分配对齐块2.2 内存空间清零相关文章: 【0195】共享内存管理结构(shmem)之概念篇(1) 【0196】共享内存管理结构(shmem)之创建共享内存分配机制(Shared…...

ChatGPT国内免费使用方法有哪些?

目录 ChatGPT介绍:一、ChatGPT是什么?二、ChatGPT发展:三、ChatGPT 优点:四、国内使用ChatGPT方法五、结语: ChatGPT介绍: 一、ChatGPT是什么? ChatGPT 是一个基于语言模型 GPT-3.5 的聊天机器人&#xff0c;ChatGPT模型是Instruct GPT的姊妹模型&#xff08;siblingmodel&a…...

基于大模型的 UI 自动化系统

基于大模型的 UI 自动化系统 下面是一个完整的 Python 系统,利用大模型实现智能 UI 自动化,结合计算机视觉和自然语言处理技术,实现"看屏操作"的能力。 系统架构设计 #mermaid-svg-2gn2GRvh5WCP2ktF {font-family:"trebuchet ms",verdana,arial,sans-…...

【位运算】消失的两个数字(hard)

消失的两个数字&#xff08;hard&#xff09; 题⽬描述&#xff1a;解法&#xff08;位运算&#xff09;&#xff1a;Java 算法代码&#xff1a;更简便代码 题⽬链接&#xff1a;⾯试题 17.19. 消失的两个数字 题⽬描述&#xff1a; 给定⼀个数组&#xff0c;包含从 1 到 N 所有…...

镜像里切换为普通用户

如果你登录远程虚拟机默认就是 root 用户&#xff0c;但你不希望用 root 权限运行 ns-3&#xff08;这是对的&#xff0c;ns3 工具会拒绝 root&#xff09;&#xff0c;你可以按以下方法创建一个 非 root 用户账号 并切换到它运行 ns-3。 一次性解决方案&#xff1a;创建非 roo…...

vue3 定时器-定义全局方法 vue+ts

1.创建ts文件 路径&#xff1a;src/utils/timer.ts 完整代码&#xff1a; import { onUnmounted } from vuetype TimerCallback (...args: any[]) > voidexport function useGlobalTimer() {const timers: Map<number, NodeJS.Timeout> new Map()// 创建定时器con…...

从零实现STL哈希容器:unordered_map/unordered_set封装详解

本篇文章是对C学习的STL哈希容器自主实现部分的学习分享 希望也能为你带来些帮助~ 那咱们废话不多说&#xff0c;直接开始吧&#xff01; 一、源码结构分析 1. SGISTL30实现剖析 // hash_set核心结构 template <class Value, class HashFcn, ...> class hash_set {ty…...

均衡后的SNRSINR

本文主要摘自参考文献中的前两篇&#xff0c;相关文献中经常会出现MIMO检测后的SINR不过一直没有找到相关数学推到过程&#xff0c;其中文献[1]中给出了相关原理在此仅做记录。 1. 系统模型 复信道模型 n t n_t nt​ 根发送天线&#xff0c; n r n_r nr​ 根接收天线的 MIMO 系…...

初学 pytest 记录

安装 pip install pytest用例可以是函数也可以是类中的方法 def test_func():print()class TestAdd: # def __init__(self): 在 pytest 中不可以使用__init__方法 # self.cc 12345 pytest.mark.api def test_str(self):res add(1, 2)assert res 12def test_int(self):r…...

虚拟电厂发展三大趋势:市场化、技术主导、车网互联

市场化&#xff1a;从政策驱动到多元盈利 政策全面赋能 2025年4月&#xff0c;国家发改委、能源局发布《关于加快推进虚拟电厂发展的指导意见》&#xff0c;首次明确虚拟电厂为“独立市场主体”&#xff0c;提出硬性目标&#xff1a;2027年全国调节能力≥2000万千瓦&#xff0…...

c# 局部函数 定义、功能与示例

C# 局部函数&#xff1a;定义、功能与示例 1. 定义与功能 局部函数&#xff08;Local Function&#xff09;是嵌套在另一个方法内部的私有方法&#xff0c;仅在包含它的方法内可见。 • 作用&#xff1a;封装仅用于当前方法的逻辑&#xff0c;避免污染类作用域&#xff0c;提升…...

Docker拉取MySQL后数据库连接失败的解决方案

在使用Docker部署MySQL时&#xff0c;拉取并启动容器后&#xff0c;有时可能会遇到数据库连接失败的问题。这种问题可能由多种原因导致&#xff0c;包括配置错误、网络设置问题、权限问题等。本文将分析可能的原因&#xff0c;并提供解决方案。 一、确认MySQL容器的运行状态 …...