当前位置: 首页 > news >正文

quinn源码解析:QUIC数据包是如何发送的

quinn源码解析:QUIC数据包是如何发送的

  • 简介
  • QUIC协议中的概念
    • endpoint(端点)
    • connection(连接)
    • Stream(流)
    • Frame (帧)
  • 发包过程解析
    • SendStream::write_all
    • ConnectionDriver
    • EndpointDriver

简介

quinn是Rust编程语言中用于实现QUIC(Quick UDP Internet Connections)协议的一个crate(包)。它提供了一个高级别的API,用于构建基于QUIC的网络应用程序。quinn crate的设计目标是提供一个简洁、安全和高性能的QUIC实现。它内部使用了Rust的异步编程模型(async/await),使得编写异步网络代码更加方便和高效。
本文主要介绍其发送数据的流程

QUIC协议中的概念

endpoint(端点)

在QUIC(Quick UDP Internet Connections)协议中,Endpoint(端点)是指QUIC连接的一端,可以是客户端或服务器。每个端点都有自己的网络地址,并与其他端点进行通信以建立和管理QUIC连接。在quinn中,endpoint对应一个操作系统的socket。例如client的Endpoint创建时就是bind了一个本地的地址。

    pub fn client(addr: SocketAddr) -> io::Result<Self> {let socket = std::net::UdpSocket::bind(addr)?;let runtime = default_runtime().ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no async runtime found"))?;Self::new_with_runtime(EndpointConfig::default(),None,runtime.wrap_udp_socket(socket)?,runtime,)}

connection(连接)

两个endpoint之间可以建立connection,并且一个endpoint可以向多个endpoint建立连接。

注意与TCP不同的是,QUIC的一个socket可以同时向多个其他socket建立连接。而TCP中每一个连接都对应client和server端的两个socket。

在这里插入图片描述

Stream(流)

一条连接可以同时存在多条流,每条流上的数据相互独立,一条流发生阻塞不会影响其他流。(TCP相当于只有一条流,所以会有对头阻塞的缺陷。)

client的流ID为奇数,server的流ID为偶数

在这里插入图片描述

Frame (帧)

流是抽象出的概念,而实际上在链路上传输的只是不同的帧,不同流的帧中会有流ID用于标识此帧属于哪条流,接收端收到后根据流ID将对应的帧放入对应的流缓冲区。
在这里插入图片描述

发包过程解析

以官方的client Example为例。其关键步骤如下述伪代码所示,主要包括:创建endpoint、创建连接、创建流、最后写入数据。

 //创建endpointlet mut endpoint = quinn::Endpoint::client("[::]:0".parse().unwrap())?; ...//创建连接let conn = endpoint.connect(remote, host)?.await.map_err(|e| anyhow!("failed to connect: {}", e))?;//创建流let (mut send, mut recv) = conn.open_bi().await.map_err(|e| anyhow!("failed to open stream: {}", e))?;//写数据send.write_all(request.as_bytes()).await.map_err(|e| anyhow!("failed to send request: {}", e))?;

SendStream::write_all

首先我们以流写入数据为切入点来看。
write_all接口实际上是产生了一个WriteAllFuture,数据会暂时放在WriteAll结构体里。当Runtime(默认为Tokio的运行时)下一次pollFuture时才会将数据写入到该流的缓冲区中。

impl<'a> Future for WriteAll<'a> {type Output = Result<(), WriteError>;fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {let this = self.get_mut();loop {if this.buf.is_empty() {return Poll::Ready(Ok(()));}let buf = this.buf;#将数据写入缓冲区let n = ready!(this.stream.execute_poll(cx, |s| s.write(buf)))?;this.buf = &this.buf[n..];}}
}

注意向流的缓冲区写数据时,是经过了流控逻辑的:当可写空间为0时,写操作会被block。可写空间一般由send_window-unacked_datasend_windowunacked_data都是连接级的,所有流都受此限制。send_window是开始时设置的,此值决定整个连接的发送缓冲区的峰值大小。当应用连接数较多时应该谨慎设置此值,避免因内存占用过多而引起OOM。

    /// Returns the maximum amount of data this is allowed to be written on the connectionpub(crate) fn write_limit(&self) -> u64 {(self.max_data - self.data_sent).min(self.send_window - self.unacked_data)}

写入的数据最终又被暂时放在SendBufferunacked_segments里。

impl SendBuffer {/// Append application data to the end of the streampub(super) fn write(&mut self, data: Bytes) {self.unacked_len += data.len();self.offset += data.len() as u64;self.unacked_segments.push_back(data);}
}

到这里,write_all这个操作就算是结束了。那么放入缓冲区的数据又是如何进一步被发送的呢?

ConnectionDriver

我们把视线回到 endpoint.connect(remote, host)?.await,在连接建立时,产生了一个ConnectionDriverFuture,此ConnectionDriver一产生就被丢进runtime中去持续地执行了。

        runtime.spawn(Box::pin(ConnectionDriver(conn.clone()).instrument(Span::current()),));

而这个ConnectionDriver在被poll时最终会调用Connection::poll_transmit–>Connection::populate_packet获取将要发送的帧

    fn populate_packet(&mut self,now: Instant,space_id: SpaceId,buf: &mut BytesMut,max_size: usize,pn: u64,) -> SentFrames {let mut sent = SentFrames::default();......// STREAMif space_id == SpaceId::Data {sent.stream_frames = self.streams.write_stream_frames(buf, max_size);self.stats.frame_tx.stream += sent.stream_frames.len() as u64;}sent}

StreamsState::write_stream_frames方法中从优先级队列中取出优先级最高的流并将其数据写入buf,如果流的数据都已发送完毕则将此流从优先级队列中取出。

pub(crate) fn write_stream_frames(&mut self,buf: &mut BytesMut,max_buf_size: usize,) -> StreamMetaVec {let mut stream_frames = StreamMetaVec::new();while buf.len() + frame::Stream::SIZE_BOUND < max_buf_size {if max_buf_size.checked_sub(buf.len() + frame::Stream::SIZE_BOUND).is_none(){break;}//不同优先级的数量let num_levels = self.pending.len();//获取优先级最高的队列let mut level = match self.pending.peek_mut() {Some(x) => x,None => break,};// Poppping data from the front of the queue, storing as much data// as possible in a single frame, and enqueing sending further// remaining data at the end of the queue helps with fairness.// Other streams will have a chance to write data before we touch// this stream again.//从队列中拿到第一个流let id = match level.queue.get_mut().pop_front() {Some(x) => x,None => {debug_assert!(num_levels == 1,"An empty queue is only allowed for a single level");break;}};//拿到具体的流let stream = match self.send.get_mut(&id) {Some(s) => s,// Stream was reset with pending data and the reset was acknowledgedNone => continue,};// Reset streams aren't removed from the pending list and still exist while the peer// hasn't acknowledged the reset, but should not generate STREAM frames, so we need to// check for them explicitly.if stream.is_reset() {continue;}// Now that we know the `StreamId`, we can better account for how many bytes// are required to encode it.let max_buf_size = max_buf_size - buf.len() - 1 - VarInt::size(id.into());//从流中获取到本次要写的偏移量let (offsets, encode_length) = stream.pending.poll_transmit(max_buf_size);//如果流中的数据都已经发送完,则将此流从pending队列中移除let fin = offsets.end == stream.pending.offset()&& matches!(stream.state, SendState::DataSent { .. });if fin {stream.fin_pending = false;}if stream.is_pending() {if level.priority == stream.priority {// Enqueue for the same levellevel.queue.get_mut().push_back(id);} else {// Enqueue for a different level. If the current level is empty, drop itif level.queue.borrow().is_empty() && num_levels != 1 {// We keep the last level around even in empty form so that// the next insert doesn't have to reallocate the queuePeekMut::pop(level);} else {drop(level);}push_pending(&mut self.pending, id, stream.priority);}} else if level.queue.borrow().is_empty() && num_levels != 1 {// We keep the last level around even in empty form so that// the next insert doesn't have to reallocate the queuePeekMut::pop(level);}let meta = frame::StreamMeta { id, offsets, fin };trace!(id = %meta.id, off = meta.offsets.start, len = meta.offsets.end - meta.offsets.start, fin = meta.fin, "STREAM");//写入帧的头部meta.encode(encode_length, buf);// The range might not be retrievable in a single `get` if it is// stored in noncontiguous fashion. Therefore this loop iterates// until the range is fully copied into the frame.let mut offsets = meta.offsets.clone();while offsets.start != offsets.end {let data = stream.pending.get(offsets.clone());offsets.start += data.len() as u64;//写入具体数据buf.put_slice(data);}stream_frames.push(meta);}stream_frames}

到了这里,要发送的数据实际上还是暂存在缓冲区了。然后又以EndpointEvent::Transmit事件的方式通过channel发送到endpoint的协程里。

fn drive_transmit(&mut self) -> bool {let now = Instant::now();let mut transmits = 0;let max_datagrams = self.socket.max_transmit_segments();let capacity = self.inner.current_mtu();let mut buffer = BytesMut::with_capacity(capacity as usize);while let Some(t) = self.inner.poll_transmit(now, max_datagrams, &mut buffer) {transmits += match t.segment_size {None => 1,Some(s) => (t.size + s - 1) / s, // round up};// If the endpoint driver is gone, noop.let size = t.size;//将要发送的数据发送到endpoint协程let _ = self.endpoint_events.send((self.handle,EndpointEvent::Transmit(t, buffer.split_to(size).freeze()),));if transmits >= MAX_TRANSMIT_DATAGRAMS {// TODO: What isn't ideal here yet is that if we don't poll all// datagrams that could be sent we don't go into the `app_limited`// state and CWND continues to grow until we get here the next time.// See https://github.com/quinn-rs/quinn/issues/1126return true;}}false}

ConnectionDriver的任务到这里就完成了,总的来说ConnectionDriver的任务就是从流中取出数据,并最终将数据通过channel发送给endpoint

EndpointDriver

connection的逻辑类似,endpoints建立时就已经spawn了一个EndpointDriver在后台一直poll,正是在poll方法中会处理来自ConnectionDriver发来的events,并写入outgoing缓冲区中。

    fn handle_events(&mut self, cx: &mut Context, shared: &Shared) -> bool {use EndpointEvent::*;for _ in 0..IO_LOOP_BOUND {match self.events.poll_recv(cx) {Poll::Ready(Some((ch, event))) => match event {......//接受从ConnectionDriver发过来的Transmit,并写入到outgoing缓冲区中Transmit(t, buf) => {let contents_len = buf.len();self.outgoing.push_back(udp_transmit(t, buf));self.transmit_queue_contents_len = self.transmit_queue_contents_len.saturating_add(contents_len);}},Poll::Ready(None) => unreachable!("EndpointInner owns one sender"),Poll::Pending => {return false;}}}true}

drive_send中从outgoing缓冲区中取出数据并写入socket

 fn drive_send(&mut self, cx: &mut Context) -> Result<bool, io::Error> {self.send_limiter.start_cycle();let result = loop {if self.outgoing.is_empty() {break Ok(false);}if !self.send_limiter.allow_work() {break Ok(true);}//实际写入match self.socket.poll_send(cx, self.outgoing.as_slices().0) {Poll::Ready(Ok(n)) => {let contents_len: usize =self.outgoing.drain(..n).map(|t| t.contents.len()).sum();self.transmit_queue_contents_len = self.transmit_queue_contents_len.saturating_sub(contents_len);// We count transmits instead of `poll_send` calls since the cost// of a `sendmmsg` still linearly increases with number of packets.self.send_limiter.record_work(n);}Poll::Pending => {break Ok(false);}Poll::Ready(Err(e)) => {break Err(e);}}};self.send_limiter.finish_cycle();result}

至此,整个发送过程就算完了。写入socket的数据由具体的操作系统底层去实现了。

相关文章:

quinn源码解析:QUIC数据包是如何发送的

quinn源码解析&#xff1a;QUIC数据包是如何发送的 简介QUIC协议中的概念endpoint&#xff08;端点&#xff09;connection&#xff08;连接&#xff09;Stream&#xff08;流&#xff09;Frame (帧) 发包过程解析SendStream::write_allConnectionDriverEndpointDriver 简介 q…...

scss的高级用法——循环

周末愉快呀&#xff01;一起来学一点简单但非常有用的css小知识。 最近在一个项目中看到以下css class写法&#xff1a; 了解过tailwind css或者unocss的都知道&#xff0c;从命名就可以看出有以下样式&#xff1a; font-size: 30pxmargin-left: 5px;margin-top: 10px; 于是…...

Linux安装Chrome浏览器 -linux安装choeme

Linux 操作系统一般自带的浏览器是 FireFox&#xff0c;不过有些用户可能更喜欢 Google 出品的 Chrome 浏览器。本教程将介绍如何在 Linux 系统上安装 Chrome 浏览器&#xff0c;以及可能会遇到的一些问题解决方案。 下载 Chrome 安装包 需要下载 Chrome 的安装包。可以在 Go…...

六大排序(插入排序、希尔排序、冒泡排序、选择排序、堆排序、快速排序)未完

文章目录 排序一、 排序的概念1.排序&#xff1a;2.稳定性&#xff1a;3.内部排序&#xff1a;4.外部排序&#xff1a; 二、插入排序1.直接插入排序2.希尔排序 三、选择排序1.直接选择排序方法一方法二直接插入排序和直接排序的区别 2.堆排序 四、交换排序1.冒泡排序2.快速排序…...

JVM垃圾回收相关概念

目录 一、System.gc()的理解 二、内存溢出与内存泄露 &#xff08;一&#xff09;OOM &#xff08;二&#xff09;内存泄露 三、StopTheWorld 四、垃圾回收的并行与并发 五、安全点与安全区域 &#xff08;一&#xff09;安全点 &#xff08;二&#xff09;安全区域 …...

C++各种字符转换

C各种字符转换 一.如何将char数组转化为string类型二. string转char数组&#xff1a;参考 一.如何将char数组转化为string类型 在C中&#xff0c;可以使用string的构造函数或者赋值操作符来将char数组转换为string类型。 方法1&#xff1a;使用string的构造函数 const char* c…...

MSSQL-逻辑级常用命令

--SQL Server 查询表的记录数 --one: 使用系统表. SELECT object_name (i.id) TableName, rows as RowCnt FROM sysindexes i INNER JOIN sysObjects o ON (o.id i.id AND o.xType U ) WHERE indid < 2 ORDER BY rows desc ————————————…...

【如何学习Python自动化测试】—— 时间等待

3 、 时间等待 在做自动化测试时&#xff0c;难免会碰到一些问题&#xff0c;比如你在脚本中操作某个对象时&#xff0c; 页面还没有加载出来&#xff0c;你的操作语句已经被执行&#xff0c;从而导致脚本执行失败&#xff0c;针对这样的问题 webdriver 提供了等待操作&#xf…...

《数字图像处理-OpenCV/Python》连载(44)图像的投影变换

《数字图像处理-OpenCV/Python》连载&#xff08;44&#xff09;图像的投影变换 本书京东优惠购书链接&#xff1a;https://item.jd.com/14098452.html 本书CSDN独家连载专栏&#xff1a;https://blog.csdn.net/youcans/category_12418787.html 第 6 章 图像的几何变换 几何变…...

AI机器学习 | 基于librosa库和使用scikit-learn库中的分类器进行语音识别

专栏集锦&#xff0c;大佬们可以收藏以备不时之需 Spring Cloud实战专栏&#xff1a;https://blog.csdn.net/superdangbo/category_9270827.html Python 实战专栏&#xff1a;https://blog.csdn.net/superdangbo/category_9271194.html Logback 详解专栏&#xff1a;https:/…...

Asp.net MVC Api项目搭建

整个解决方案按照分层思想来划分不同功能模块&#xff0c;以提供User服务的Api为需求&#xff0c;各个层次的具体实现如下所示&#xff1a; 1、新建数据库User表 数据库使用SQLExpress版本&#xff0c;表的定义如下所示&#xff1a; CREATE TABLE [dbo].[User] ([Id] …...

C语言中文网 - Shell脚本 - 8

第1章 Shell基础&#xff08;开胃菜&#xff09; 8. Linux Shell命令提示符 启动 Linux 桌面环境自带的终端模拟包&#xff0c;或者从 Linux 控制台登录后&#xff0c;便可以看到 Shell 命令提示符。看见命令提示符就意味着可以输入命令了。命令提示符不是命令的一部分&#x…...

性能测试学习——项目环境搭建和Jmete学习二

项目环境搭建、Jmeter学习二 环境的部署虚拟机的安装虚拟机中添加项目操作步骤 使用环境的注意事项Jmeter的安装和简单使用Jemter的使用的进阶Jemter元件 Jmeter属性执行顺序和作用域作用域以自定义用户变量和用户参数(前置处理器)为例如何解决用户变量和线程组同级时&#xff…...

C++标准模板库(STL)-map介绍

C标准库中的map是一种关联容器&#xff0c;它提供了键值对的映射关系。每个键值对中的键都是唯一的&#xff0c;通过键可以访问对应的值。 map基本操作 插入元素&#xff1a; 使用insert函数插入元素&#xff0c;该函数有两种形式&#xff1a; // 插入一个pair<const Ke…...

使用docker部署ELK日志框架-Elasticsearch

一、ELK知识了解 1-ELK组件 工作原理&#xff1a; &#xff08;1&#xff09;在所有需要收集日志的服务器上部署Logstash&#xff1b;或者先将日志进行集中化管理在日志服务器上&#xff0c;在日志服务器上部署 Logstash。 &#xff08;2&#xff09;Logstash 收集日志&#…...

第7章 模式匹配与正则表达式

目录 1. 不用正则表达式来查找文本模式2. 用正则表达式来查找文本模式2.1 创建正则表达式&#xff08;Regex&#xff09;对象2.2 匹配Regex对象 3. 用正则表达式匹配更多模式3.1 利用括号分组3.2 用管道匹配多个分组3.3 用问号实现可选匹配3.4 用星号匹配零次或多次3.5 用加号匹…...

单元测试实战(三)JPA 的测试

为鼓励单元测试&#xff0c;特分门别类示例各种组件的测试代码并进行解说&#xff0c;供开发人员参考。 本文中的测试均基于JUnit5。 单元测试实战&#xff08;一&#xff09;Controller 的测试 单元测试实战&#xff08;二&#xff09;Service 的测试 单元测试实战&am…...

初刷leetcode题目(3)——数据结构与算法

&#x1f636;‍&#x1f32b;️&#x1f636;‍&#x1f32b;️&#x1f636;‍&#x1f32b;️&#x1f636;‍&#x1f32b;️Take your time ! &#x1f636;‍&#x1f32b;️&#x1f636;‍&#x1f32b;️&#x1f636;‍&#x1f32b;️&#x1f636;‍&#x1f32b;️…...

76基于matlab的免疫算法求解配送中心选址问题,根据配送地址确定最佳配送中心地址位置。

基于matlab的免疫算法求解配送中心选址问题&#xff0c;根据配送地址确定最佳配送中心地址位置。数据可更换自己的&#xff0c;程序已调通&#xff0c;可直接运行。 76matlab免疫算法配送中心选址 (xiaohongshu.com)...

C++二分查找算法:找到 Alice 和 Bob 可以相遇的建筑

本文涉及的基础知识点 二分查找算法合集 离线查询 题目 给你一个下标从 0 开始的正整数数组 heights &#xff0c;其中 heights[i] 表示第 i 栋建筑的高度。 如果一个人在建筑 i &#xff0c;且存在 i < j 的建筑 j 满足 heights[i] < heights[j] &#xff0c;那么这个…...

python打卡day49

知识点回顾&#xff1a; 通道注意力模块复习空间注意力模块CBAM的定义 作业&#xff1a;尝试对今天的模型检查参数数目&#xff0c;并用tensorboard查看训练过程 import torch import torch.nn as nn# 定义通道注意力 class ChannelAttention(nn.Module):def __init__(self,…...

ESP32读取DHT11温湿度数据

芯片&#xff1a;ESP32 环境&#xff1a;Arduino 一、安装DHT11传感器库 红框的库&#xff0c;别安装错了 二、代码 注意&#xff0c;DATA口要连接在D15上 #include "DHT.h" // 包含DHT库#define DHTPIN 15 // 定义DHT11数据引脚连接到ESP32的GPIO15 #define D…...

多模态商品数据接口:融合图像、语音与文字的下一代商品详情体验

一、多模态商品数据接口的技术架构 &#xff08;一&#xff09;多模态数据融合引擎 跨模态语义对齐 通过Transformer架构实现图像、语音、文字的语义关联。例如&#xff0c;当用户上传一张“蓝色连衣裙”的图片时&#xff0c;接口可自动提取图像中的颜色&#xff08;RGB值&…...

【C语言练习】080. 使用C语言实现简单的数据库操作

080. 使用C语言实现简单的数据库操作 080. 使用C语言实现简单的数据库操作使用原生APIODBC接口第三方库ORM框架文件模拟1. 安装SQLite2. 示例代码:使用SQLite创建数据库、表和插入数据3. 编译和运行4. 示例运行输出:5. 注意事项6. 总结080. 使用C语言实现简单的数据库操作 在…...

AI编程--插件对比分析:CodeRider、GitHub Copilot及其他

AI编程插件对比分析&#xff1a;CodeRider、GitHub Copilot及其他 随着人工智能技术的快速发展&#xff0c;AI编程插件已成为提升开发者生产力的重要工具。CodeRider和GitHub Copilot作为市场上的领先者&#xff0c;分别以其独特的特性和生态系统吸引了大量开发者。本文将从功…...

html-<abbr> 缩写或首字母缩略词

定义与作用 <abbr> 标签用于表示缩写或首字母缩略词&#xff0c;它可以帮助用户更好地理解缩写的含义&#xff0c;尤其是对于那些不熟悉该缩写的用户。 title 属性的内容提供了缩写的详细说明。当用户将鼠标悬停在缩写上时&#xff0c;会显示一个提示框。 示例&#x…...

uniapp 字符包含的相关方法

在uniapp中&#xff0c;如果你想检查一个字符串是否包含另一个子字符串&#xff0c;你可以使用JavaScript中的includes()方法或者indexOf()方法。这两种方法都可以达到目的&#xff0c;但它们在处理方式和返回值上有所不同。 使用includes()方法 includes()方法用于判断一个字…...

LabVIEW双光子成像系统技术

双光子成像技术的核心特性 双光子成像通过双低能量光子协同激发机制&#xff0c;展现出显著的技术优势&#xff1a; 深层组织穿透能力&#xff1a;适用于活体组织深度成像 高分辨率观测性能&#xff1a;满足微观结构的精细研究需求 低光毒性特点&#xff1a;减少对样本的损伤…...

DBLP数据库是什么?

DBLP&#xff08;Digital Bibliography & Library Project&#xff09;Computer Science Bibliography是全球著名的计算机科学出版物的开放书目数据库。DBLP所收录的期刊和会议论文质量较高&#xff0c;数据库文献更新速度很快&#xff0c;很好地反映了国际计算机科学学术研…...

基于江科大stm32屏幕驱动,实现OLED多级菜单(动画效果),结构体链表实现(独创源码)

引言 在嵌入式系统中&#xff0c;用户界面的设计往往直接影响到用户体验。本文将以STM32微控制器和OLED显示屏为例&#xff0c;介绍如何实现一个多级菜单系统。该系统支持用户通过按键导航菜单&#xff0c;执行相应操作&#xff0c;并提供平滑的滚动动画效果。 本文设计了一个…...