当前位置: 首页 > news >正文

WebRTC服务质量(10)- Pacer机制(02) RoundRobinPacketQueue

WebRTC服务质量(01)- Qos概述
WebRTC服务质量(02)- RTP协议
WebRTC服务质量(03)- RTCP协议
WebRTC服务质量(04)- 重传机制(01) RTX NACK概述
WebRTC服务质量(05)- 重传机制(02) NACK判断丢包
WebRTC服务质量(06)- 重传机制(03) NACK找到真正的丢包
WebRTC服务质量(07)- 重传机制(04) 接收NACK消息
WebRTC服务质量(08)- 重传机制(05) RTX机制
WebRTC服务质量(09)- Pacer机制(01) 流程概述
WebRTC服务质量(10)- Pacer机制(02) RoundRobinPacketQueue
WebRTC服务质量(11)- Pacer机制(03) IntervalBudget
WebRTC服务质量(12)- Pacer机制(04) 向Pacer中插入数据

一、前言:

RoundRobinPacketQueue 是Pacer模块当中一个非常重要的循环队列模块,主要目的是在多流场景中,根据每个流的 优先级 以及流内的 RTP 包的 入队顺序类型 和其他因素,动态决定数据包的发送顺序。

二、类关系图:

2.1、类图:

在这里插入图片描述

关键部分加了注释,便于理解:

  1. 多流支持: 每个流按 SSRC(Synchronization Source Identifier,同步信源标识符)进行区分,每个流维护独立的优先级队列。
  2. 多级优先级调度: 不同流间按优先级调度(基于 StreamPrioKey),单个流内的包按严格的队列顺序管理。
  3. 静态与动态属性结合: 例如,属性包括每个包的优先级(priority)、入队时间(enqueue_time)、重传标志、带宽开销等。
  4. 分组调度: 使用轮询(Round-Robin)机制,从不同流根据流的权重和优先级选取包发送,提高音频、视频和反馈等多种类型数据的实时性。

2.2、重要成员关系:

在这里插入图片描述

  1. stream_priorities当中存放StreamPrioKey和ssrc对应关系;
  2. streams当中存放ssrc和Stream的对应关系;
  3. 这样就建立了StreamPrioKey和Stream之间的关系;
  4. 反过来,也可以通过Stream当中的priority_it找到stream_priorities当中的某一项;

小结下:重要成员变量的功能

成员变量功能
streams_保存所有流,键是流的 ssrc,每条流存有独立队列和优先级信息。
stream_priorities_将所有流按照 StreamPrioKey 排列,用于实现流间优先级调度。
enqueue_times_一个多重集合,保存所有包的入队时间,便于快速找到最早入队的包时间。
size_packets_size_分别记录队列中包的个数和总字节数,动态调整。
transport_overhead_per_packet_计算包传输的额外开销(如包头)。
time_last_updated_用于统计队列的更新时间,辅助计算入队和等待时间等。

三、重要函数:

3.1、Push

Push 会根据包的优先级、流的权重、总队列大小,以及包的类型等,将包插入到对应的流(Stream)队列,并更新其他与队列状态关联的元数据。总体思路如下:

处理一个新包时:

  1. 确定该包所属的流,如果不存在该流,则创建一个 Stream 对象。
  2. 将包包装成为 QueuedPacket 并插入到流的优先级队列。
  3. 更新流的优先级键 StreamPrioKey,并在 stream_priorities_ 中重新排序。
  4. 更新队列元数据(如队列包大小、队列时间等)。
void RoundRobinPacketQueue::Push(int priority,Timestamp enqueue_time,uint64_t enqueue_order,std::unique_ptr<RtpPacketToSend> packet) {RTC_DCHECK(packet->packet_type().has_value());if (size_packets_ == 0) {// Single packet fast-path.single_packet_queue_.emplace(QueuedPacket(priority, enqueue_time, enqueue_order,enqueue_times_.end(), std::move(packet)));UpdateQueueTime(enqueue_time);single_packet_queue_->SubtractPauseTime(pause_time_sum_);size_packets_ = 1;size_ += PacketSize(*single_packet_queue_);} else {MaybePromoteSinglePacketToNormalQueue();Push(QueuedPacket(priority, enqueue_time, enqueue_order,enqueue_times_.insert(enqueue_time), std::move(packet)));}
}void RoundRobinPacketQueue::Push(QueuedPacket packet) {auto stream_info_it = streams_.find(packet.Ssrc());if (stream_info_it == streams_.end()) {stream_info_it = streams_.emplace(packet.Ssrc(), Stream()).first;stream_info_it->second.priority_it = stream_priorities_.end();stream_info_it->second.ssrc = packet.Ssrc();}Stream* stream = &stream_info_it->second;if (stream->priority_it == stream_priorities_.end()) {RTC_CHECK(!IsSsrcScheduled(stream->ssrc));stream->priority_it = stream_priorities_.emplace(StreamPrioKey(packet.Priority(), stream->size), packet.Ssrc());} else if (packet.Priority() < stream->priority_it->first.priority) {stream_priorities_.erase(stream->priority_it);stream->priority_it = stream_priorities_.emplace(StreamPrioKey(packet.Priority(), stream->size), packet.Ssrc());}if (packet.EnqueueTimeIterator() == enqueue_times_.end()) {packet.UpdateEnqueueTimeIterator(enqueue_times_.insert(packet.EnqueueTime()));} else {UpdateQueueTime(packet.EnqueueTime());packet.SubtractPauseTime(pause_time_sum_);size_packets_ += 1;size_ += PacketSize(packet);}stream->packet_queue.push(packet);
}
  • 从streams中找packet所属的Ssrc的stream,如果没有,则在streams中插入一项;
  • 查看stream的priority_it是否等于stream_priorities_的end()
    • 如果相等,则在stream_priorities_中插入一项;
    • 否则,如果新包的优先高(注意,优先级数值越小表示优先级越高),则更新其ssrc对应队列的优先级;
  • 更新队列总时长;
  • 入队时间减去暂停时间(一般不会有暂停);
  • 队列总包数+1;
  • 队列总字节大小 = 包的负载大小+Padding大小;
  • 插入到stream中的packet_queue中;

3.2、Pop

Pop 会轮询不同的流并从当前优先级最高的流中取出一个包发送,同时维护包的发送顺序。总体思路如下:

  1. 调用 GetHighestPriorityStream 获取当前优先级最高的流。
  2. 从该流的优先级队列(PriorityPacketQueue)中取出队首包,并更新流的状态(如剩余大小、时间等)。
  3. 如果该流没有剩余包,删除对应的流优先级键。
std::unique_ptr<RtpPacketToSend> RoundRobinPacketQueue::Pop() {if (single_packet_queue_.has_value()) {RTC_DCHECK(stream_priorities_.empty());std::unique_ptr<RtpPacketToSend> rtp_packet(single_packet_queue_->RtpPacket());single_packet_queue_.reset();queue_time_sum_ = TimeDelta::Zero();size_packets_ = 0;size_ = DataSize::Zero();return rtp_packet;}RTC_DCHECK(!Empty());Stream* stream = GetHighestPriorityStream();const QueuedPacket& queued_packet = stream->packet_queue.top();stream_priorities_.erase(stream->priority_it);// Calculate the total amount of time spent by this packet in the queue// while in a non-paused state. Note that the |pause_time_sum_ms_| was// subtracted from |packet.enqueue_time_ms| when the packet was pushed, and// by subtracting it now we effectively remove the time spent in in the// queue while in a paused state.TimeDelta time_in_non_paused_state =time_last_updated_ - queued_packet.EnqueueTime() - pause_time_sum_;queue_time_sum_ -= time_in_non_paused_state;RTC_CHECK(queued_packet.EnqueueTimeIterator() != enqueue_times_.end());enqueue_times_.erase(queued_packet.EnqueueTimeIterator());// Update |bytes| of this stream. The general idea is that the stream that// has sent the least amount of bytes should have the highest priority.// The problem with that is if streams send with different rates, in which// case a "budget" will be built up for the stream sending at the lower// rate. To avoid building a too large budget we limit |bytes| to be within// kMaxLeading bytes of the stream that has sent the most amount of bytes.DataSize packet_size = PacketSize(queued_packet);stream->size =std::max(stream->size + packet_size, max_size_ - kMaxLeadingSize);max_size_ = std::max(max_size_, stream->size);size_ -= packet_size;size_packets_ -= 1;RTC_CHECK(size_packets_ > 0 || queue_time_sum_ == TimeDelta::Zero());std::unique_ptr<RtpPacketToSend> rtp_packet(queued_packet.RtpPacket());stream->packet_queue.pop();// If there are packets left to be sent, schedule the stream again.RTC_CHECK(!IsSsrcScheduled(stream->ssrc));if (stream->packet_queue.empty()) {stream->priority_it = stream_priorities_.end();} else {int priority = stream->packet_queue.top().Priority();stream->priority_it = stream_priorities_.emplace(StreamPrioKey(priority, stream->size), stream->ssrc);}return rtp_packet;
}
  • 获得优先级最高的stream;

  • 从stream的packet_queue当中取出第一个包;

  • 将stream在stream_priorities_中的项删除;

    • 思考下虽然包删除了,但是stream还在,为啥要删除?

      答案:因为stream_priorities_ 是multimap,允许出现相同key,也就是说,这个包没有,它的stream优先级全靠它撑着(因为是先pop优先级高的包),所以,现在不应该由它撑着了,换下一个优先级最高的包撑着。

  • 计算Packet入队后距离现在的时间(不包括暂停时间);

  • 将这段时间从队列的总时间减去;

  • enqueue_times_中将Packet的项删除;

  • 总包数减去1;

  • 总字节数减去包的字节数;

  • 将包从stream中的queue中弹出;

  • 如果stream中的队列为空,则令stream的priority_it指向stream_priorities_end();

  • 否则,从stream队列头部取出Packet,将该Packet的priority插入到stream_priorities_中;

四、优先级调度:

  • 每个流有单独的优先级队列(PriorityPacketQueue),保存 QueuedPacket 对象。

  • QueuedPacket 是一个包装类,表示单个 RTP 数据包及其附属信息(例如入队时间、优先级、是否为重传包等)。

  • 列表中的流按 StreamPrioKey 保存在 stream_priorities_ 中,通过此键决定流次序:

    struct StreamPrioKey {int priority;    // 流的优先级,数值越低,优先级越高DataSize size;   // 数据包大小,用于平衡负载
    };
    

    优先规则:优先级低(priority 值小) > 数据包大小小(size 值小)。

五、轮询调度:

  • 核心逻辑通过 RoundRobin 的方式轮询多个流。但由于流可能有不同的优先级,某些流会被更多次轮询到。
  • 函数 GetHighestPriorityStream 定位当前最高优先级的流,从流对应的队列中取得包然后发送。

六、Stream定义与管理:

每个流由 Stream 类表示,Stream 是该流独有的数据结构,包括:

  • 当前队列状态,如总字节数、包大小和优先级。
  • 内部维护单独的优先级队列(PriorityPacketQueue)。
  • 调度时实时更新优先级,确保新加入高优先级包时能调整队列次序。

七、其他特性:

7.1、重传支持:

  • 标记是否重传的包,在出队时可能依据该标记进行特殊处理。

7.2、时间相关:

  • queue_time_sum_pause_time_sum_ 用于统计包在队列中的存留时间,这对于带宽控制和流量管理很有用。
  • 提供接口如 AverageQueueTime 计算平均队列时间,用于监控流的实时性。

7.3、队列字节限制:

队列有最大可存储的字节数(max_size_),以防止占用过多资源。

7.4、暂停/恢复功能:

可以通过 SetPauseState 暂停或者恢复队列处理。劝你最好别用!!!

八、总结:

RoundRobinPacketQueue 是一个高效的多流、多优先级调度队列,适用于 RTP 媒体数据的分组发送场景。它通过流内、流间的双重调度机制,结合优先级动态提升、统计队列时间和暂停控制等特性,确保在带宽有限的网络环境中最大程度提高数据的实时性和发送效率,是 WebRTC Pacer 模块的核心部分。

相关文章:

WebRTC服务质量(10)- Pacer机制(02) RoundRobinPacketQueue

WebRTC服务质量&#xff08;01&#xff09;- Qos概述 WebRTC服务质量&#xff08;02&#xff09;- RTP协议 WebRTC服务质量&#xff08;03&#xff09;- RTCP协议 WebRTC服务质量&#xff08;04&#xff09;- 重传机制&#xff08;01) RTX NACK概述 WebRTC服务质量&#xff08;…...

数据库的数据被清除了,该如何恢复?

当数据库的数据被清除时&#xff0c;恢复数据的难度和可能性取决于多种因素&#xff0c;包括数据清除的方式、数据库的类型、是否有备份等。以下是一些常见的数据库数据恢复方法&#xff1a; 一、基于备份的恢复 使用备份文件&#xff1a; 如果数据库有定期的备份&#xff0c…...

【西安电子科技大学考研】25官方复试专业课参考书目汇总

初试已经顺利考完啦、成绩已经公布&#xff0c;现在已经有很多同学来问学长学姐&#xff0c;复试参考书有哪些&#xff0c;复试应该做好哪些准备。故此学长学姐给大家整理好了西安电子科技大学各个学院的复试参考书目录&#xff0c;有需要的同学可以参考一下哈。大家可以结合本…...

【理解机器学习中的过拟合与欠拟合】

在机器学习中&#xff0c;模型的表现很大程度上取决于我们如何平衡“过拟合”和“欠拟合”。本文通过理论介绍和代码演示&#xff0c;详细解析过拟合与欠拟合现象&#xff0c;并提出应对策略。主要内容如下&#xff1a; 什么是过拟合和欠拟合&#xff1f; 如何防止过拟合和欠拟…...

fastjson诡异报错

1、环境以及报错描述 1.1 环境 操作系统为中标麒麟、cpu 为国产鲲鹏服务器。 jdk为openjdk version 1.8.0._242 1.2 错误 com.alibaba.fastjson2.JSONException: syntax error : f at com.alibaba.fastjson2.JSONReaderUTF16.readBoolValue(JSONReaderUTF16.java:6424) at c…...

面经zhenyq

如何去实现分层的动画效果&#xff1f; 在Unity中实现分层的动画效果&#xff0c;可以通过Animator的 Layer 功能实现。以下是详细步骤&#xff1a; 1. 什么是分层动画&#xff1f; 分层动画允许在同一个角色的不同部分同时播放独立的动画。例如&#xff1a; 上半身可以播放…...

GoFrame框架介绍

GoFrame是一款功能强大、设计精良且易用的Go语言开发框架&#xff0c;以下为你详细介绍它的相关特点和内容&#xff1a; ### 框架概述 GoFrame是为了提升Go语言开发者的编码效率以及项目的整体可维护性、可扩展性等而打造的开发框架&#xff0c;它涵盖了从基础的网络通信、数据…...

MapReduce工作流程+Shuffle机制

一、Mapreduce工作流程 &#xff08;1&#xff09;数据切片Split。数据切片数决定maptask并行度&#xff0c;默认情况下&#xff0c;一个切片大小块大小。切片不是针对整体数据集&#xff0c;而是针对每一个文件单独切片&#xff0c;所以会有小文件问题&#xff08;CombineTex…...

JAVA8 Stream API 使用详解

Java 8 引入了 Stream API&#xff0c;它提供了对集合对象进行一系列操作的新方式&#xff0c;包括筛选、转换、聚合等。Stream API 的设计目标是提供一种高效且易于使用的处理集合数据的方式&#xff0c;同时支持并行处理。 以下是 Java 8 Stream API 的一些核心概念和使用详解…...

Redis 集群架构:高可用与扩展性

一、引言 在当今数字化时代&#xff0c;数据量呈爆炸式增长&#xff0c;对数据存储和处理的要求也越来越高。Redis作为一款高性能的键值对存储数据库&#xff0c;其集群架构在应对高并发、大数据量场景时展现出了独特的优势&#xff0c;成为众多企业构建高效、稳定系统的关键技…...

Redis数据对象

基本结构图 key和value指向的是redisObject对象 type&#xff1a;标识该对象用的是什么类型&#xff08;String、List Redis数据结构 SDS SDS有4个属性&#xff1a; len&#xff1a;记录了字符串长度&#xff0c;因此获取字符串长度的时候时间复杂度O&#xff08;1&#xff…...

Docker部署GitLab服务器

一、GitLab介绍 1.1 GitLab简介 GitLab 是一款基于 Git 的开源代码托管平台&#xff0c;集成了版本控制、代码审查、问题跟踪、持续集成与持续交付&#xff08;CI/CD&#xff09;等多种功能&#xff0c;旨在为团队提供一站式的项目管理解决方案。借助 GitLab&#xff0c;开发…...

python版本的Selenium的下载及chrome环境搭建和简单使用

针对Python版本的Selenium下载及Chrome环境搭建和使用&#xff0c;以下将详细阐述具体步骤&#xff1a; 一、Python版本的Selenium下载 安装Python环境&#xff1a; 确保系统上已经安装了Python 3.8及以上版本。可以从[Python官方网站]下载并安装最新版本的Python&#xff0c;…...

重温设计模式--4、组合模式

文章目录 1 、组合模式&#xff08;Composite Pattern&#xff09;概述2. 组合模式的结构3. C 代码示例4. C示例代码25 .应用场景 1 、组合模式&#xff08;Composite Pattern&#xff09;概述 定义&#xff1a;组合模式是一种结构型设计模式&#xff0c;它允许你将对象组合成…...

5、mysql的读写分离

主从复制 主从复制的含义 主从复制&#xff1a;在一个mysql的集群当中&#xff0c;至少3台&#xff0c;即主1台&#xff0c;从2台。 当有数据写入时&#xff0c;主负责写入本库&#xff0c;然后把数据同步到从服务器。 一定是在主服务器写入数据&#xff0c;从服务器的写入…...

uniapp Native.js原生arr插件服务发送广播到uniapp页面中

前言 最近搞了个设备&#xff0c;需求是读取m1卡&#xff0c;厂家给了个安卓原生demo&#xff0c;接入arr插件如下&#xff0c;接入后发现还是少了一部分代码&#xff0c;设备服务调起后触发刷卡无法发送到uniapp里。 中间是一些踩坑记录&#xff0c;最后面是解决办法&#xf…...

如何在 Ubuntu 22.04 上安装 Elasticsearch

简介 在本教程中&#xff0c;你将学习如何在 Ubuntu 22.04 服务器上安装 Elasticsearch。此外&#xff0c;你还将学习如何使用 Elasticsearch REST API 索引和操作数据。 Elasticsearch 是一个基于 Apache Lucene Library 的免费分布式搜索和分析引擎。它是一个快速且可扩展的…...

单片机长耗时前后台任务优化

代码&#xff1a; void Task_10ms(void) {... }//改 void Task_2ms(void) {static uint8_t s_state 0switch(s_state){case 0:....s_state 1;break;case 1:....s_state 2;break;case 3:....s_state 1;break;default: //此段可以去除s_state 0;break; } } 参考链接 MCU长…...

Linux大数据方向shell

一、概述 shell是一个命令行解释器&#xff0c;它接收应用程序/用户命令&#xff0c;然后调用操作系统内核&#xff0c;还是一个功能相当强大的编程语言&#xff0c;易编写&#xff0c;易调试&#xff0c;灵活性强。 二、shell入门 1.输出hello world touch helloworld.sh&…...

爬虫 APP 逆向 ---> shopee(虾皮) 电商

shopee 泰国站点&#xff1a;https://shopee.co.th/ shopee 网页访问时&#xff0c;直接弹出使用 app 登录查看&#xff0c;那就登录 shopee 泰国站点 app。 手机抓包&#xff1a;分类接口 接口&#xff1a;https://mall.shopee.co.th/api/v4/pages/get_category_tree 请求参…...

Cesium1.95中高性能加载1500个点

一、基本方式&#xff1a; 图标使用.png比.svg性能要好 <template><div id"cesiumContainer"></div><div class"toolbar"><button id"resetButton">重新生成点</button><span id"countDisplay&qu…...

12.找到字符串中所有字母异位词

&#x1f9e0; 题目解析 题目描述&#xff1a; 给定两个字符串 s 和 p&#xff0c;找出 s 中所有 p 的字母异位词的起始索引。 返回的答案以数组形式表示。 字母异位词定义&#xff1a; 若两个字符串包含的字符种类和出现次数完全相同&#xff0c;顺序无所谓&#xff0c;则互为…...

python报错No module named ‘tensorflow.keras‘

是由于不同版本的tensorflow下的keras所在的路径不同&#xff0c;结合所安装的tensorflow的目录结构修改from语句即可。 原语句&#xff1a; from tensorflow.keras.layers import Conv1D, MaxPooling1D, LSTM, Dense 修改后&#xff1a; from tensorflow.python.keras.lay…...

GruntJS-前端自动化任务运行器从入门到实战

Grunt 完全指南&#xff1a;从入门到实战 一、Grunt 是什么&#xff1f; Grunt是一个基于 Node.js 的前端自动化任务运行器&#xff0c;主要用于自动化执行项目开发中重复性高的任务&#xff0c;例如文件压缩、代码编译、语法检查、单元测试、文件合并等。通过配置简洁的任务…...

Git 3天2K星标:Datawhale 的 Happy-LLM 项目介绍(附教程)

引言 在人工智能飞速发展的今天&#xff0c;大语言模型&#xff08;Large Language Models, LLMs&#xff09;已成为技术领域的焦点。从智能写作到代码生成&#xff0c;LLM 的应用场景不断扩展&#xff0c;深刻改变了我们的工作和生活方式。然而&#xff0c;理解这些模型的内部…...

逻辑回归暴力训练预测金融欺诈

简述 「使用逻辑回归暴力预测金融欺诈&#xff0c;并不断增加特征维度持续测试」的做法&#xff0c;体现了一种逐步建模与迭代验证的实验思路&#xff0c;在金融欺诈检测中非常有价值&#xff0c;本文作为一篇回顾性记录了早年间公司给某行做反欺诈预测用到的技术和思路。百度…...

Rust 开发环境搭建

环境搭建 1、开发工具RustRover 或者vs code 2、Cygwin64 安装 https://cygwin.com/install.html 在工具终端执行&#xff1a; rustup toolchain install stable-x86_64-pc-windows-gnu rustup default stable-x86_64-pc-windows-gnu ​ 2、Hello World fn main() { println…...

Golang——7、包与接口详解

包与接口详解 1、Golang包详解1.1、Golang中包的定义和介绍1.2、Golang包管理工具go mod1.3、Golang中自定义包1.4、Golang中使用第三包1.5、init函数 2、接口详解2.1、接口的定义2.2、空接口2.3、类型断言2.4、结构体值接收者和指针接收者实现接口的区别2.5、一个结构体实现多…...

从面试角度回答Android中ContentProvider启动原理

Android中ContentProvider原理的面试角度解析&#xff0c;分为​​已启动​​和​​未启动​​两种场景&#xff1a; 一、ContentProvider已启动的情况 1. ​​核心流程​​ ​​触发条件​​&#xff1a;当其他组件&#xff08;如Activity、Service&#xff09;通过ContentR…...

学习一下用鸿蒙​​DevEco Studio HarmonyOS5实现百度地图

在鸿蒙&#xff08;HarmonyOS5&#xff09;中集成百度地图&#xff0c;可以通过以下步骤和技术方案实现。结合鸿蒙的分布式能力和百度地图的API&#xff0c;可以构建跨设备的定位、导航和地图展示功能。 ​​1. 鸿蒙环境准备​​ ​​开发工具​​&#xff1a;下载安装 ​​De…...