WebRTC服务质量(10)- Pacer机制(02) RoundRobinPacketQueue
WebRTC服务质量(01)- Qos概述
WebRTC服务质量(02)- RTP协议
WebRTC服务质量(03)- RTCP协议
WebRTC服务质量(04)- 重传机制(01) RTX NACK概述
WebRTC服务质量(05)- 重传机制(02) NACK判断丢包
WebRTC服务质量(06)- 重传机制(03) NACK找到真正的丢包
WebRTC服务质量(07)- 重传机制(04) 接收NACK消息
WebRTC服务质量(08)- 重传机制(05) RTX机制
WebRTC服务质量(09)- Pacer机制(01) 流程概述
WebRTC服务质量(10)- Pacer机制(02) RoundRobinPacketQueue
WebRTC服务质量(11)- Pacer机制(03) IntervalBudget
WebRTC服务质量(12)- Pacer机制(04) 向Pacer中插入数据
一、前言:
RoundRobinPacketQueue 是Pacer模块当中一个非常重要的循环队列模块,主要目的是在多流场景中,根据每个流的 优先级 以及流内的 RTP 包的 入队顺序、类型 和其他因素,动态决定数据包的发送顺序。
二、类关系图:
2.1、类图:

关键部分加了注释,便于理解:
- 多流支持: 每个流按 SSRC(Synchronization Source Identifier,同步信源标识符)进行区分,每个流维护独立的优先级队列。
- 多级优先级调度: 不同流间按优先级调度(基于
StreamPrioKey),单个流内的包按严格的队列顺序管理。 - 静态与动态属性结合: 例如,属性包括每个包的优先级(
priority)、入队时间(enqueue_time)、重传标志、带宽开销等。 - 分组调度: 使用轮询(Round-Robin)机制,从不同流根据流的权重和优先级选取包发送,提高音频、视频和反馈等多种类型数据的实时性。
2.2、重要成员关系:

- stream_priorities当中存放StreamPrioKey和ssrc对应关系;
- streams当中存放ssrc和Stream的对应关系;
- 这样就建立了StreamPrioKey和Stream之间的关系;
- 反过来,也可以通过Stream当中的priority_it找到stream_priorities当中的某一项;
小结下:重要成员变量的功能
| 成员变量 | 功能 |
|---|---|
streams_ | 保存所有流,键是流的 ssrc,每条流存有独立队列和优先级信息。 |
stream_priorities_ | 将所有流按照 StreamPrioKey 排列,用于实现流间优先级调度。 |
enqueue_times_ | 一个多重集合,保存所有包的入队时间,便于快速找到最早入队的包时间。 |
size_packets_ 和 size_ | 分别记录队列中包的个数和总字节数,动态调整。 |
transport_overhead_per_packet_ | 计算包传输的额外开销(如包头)。 |
time_last_updated_ | 用于统计队列的更新时间,辅助计算入队和等待时间等。 |
三、重要函数:
3.1、Push
Push 会根据包的优先级、流的权重、总队列大小,以及包的类型等,将包插入到对应的流(Stream)队列,并更新其他与队列状态关联的元数据。总体思路如下:
处理一个新包时:
- 确定该包所属的流,如果不存在该流,则创建一个
Stream对象。 - 将包包装成为
QueuedPacket并插入到流的优先级队列。 - 更新流的优先级键
StreamPrioKey,并在stream_priorities_中重新排序。 - 更新队列元数据(如队列包大小、队列时间等)。
void RoundRobinPacketQueue::Push(int priority,Timestamp enqueue_time,uint64_t enqueue_order,std::unique_ptr<RtpPacketToSend> packet) {RTC_DCHECK(packet->packet_type().has_value());if (size_packets_ == 0) {// Single packet fast-path.single_packet_queue_.emplace(QueuedPacket(priority, enqueue_time, enqueue_order,enqueue_times_.end(), std::move(packet)));UpdateQueueTime(enqueue_time);single_packet_queue_->SubtractPauseTime(pause_time_sum_);size_packets_ = 1;size_ += PacketSize(*single_packet_queue_);} else {MaybePromoteSinglePacketToNormalQueue();Push(QueuedPacket(priority, enqueue_time, enqueue_order,enqueue_times_.insert(enqueue_time), std::move(packet)));}
}void RoundRobinPacketQueue::Push(QueuedPacket packet) {auto stream_info_it = streams_.find(packet.Ssrc());if (stream_info_it == streams_.end()) {stream_info_it = streams_.emplace(packet.Ssrc(), Stream()).first;stream_info_it->second.priority_it = stream_priorities_.end();stream_info_it->second.ssrc = packet.Ssrc();}Stream* stream = &stream_info_it->second;if (stream->priority_it == stream_priorities_.end()) {RTC_CHECK(!IsSsrcScheduled(stream->ssrc));stream->priority_it = stream_priorities_.emplace(StreamPrioKey(packet.Priority(), stream->size), packet.Ssrc());} else if (packet.Priority() < stream->priority_it->first.priority) {stream_priorities_.erase(stream->priority_it);stream->priority_it = stream_priorities_.emplace(StreamPrioKey(packet.Priority(), stream->size), packet.Ssrc());}if (packet.EnqueueTimeIterator() == enqueue_times_.end()) {packet.UpdateEnqueueTimeIterator(enqueue_times_.insert(packet.EnqueueTime()));} else {UpdateQueueTime(packet.EnqueueTime());packet.SubtractPauseTime(pause_time_sum_);size_packets_ += 1;size_ += PacketSize(packet);}stream->packet_queue.push(packet);
}
- 从streams中找packet所属的Ssrc的stream,如果没有,则在streams中插入一项;
- 查看stream的priority_it是否等于stream_priorities_的end()
- 如果相等,则在stream_priorities_中插入一项;
- 否则,如果新包的优先高(注意,优先级数值越小表示优先级越高),则更新其ssrc对应队列的优先级;
- 更新队列总时长;
- 入队时间减去暂停时间(一般不会有暂停);
- 队列总包数+1;
- 队列总字节大小 = 包的负载大小+Padding大小;
- 插入到stream中的packet_queue中;
3.2、Pop
Pop 会轮询不同的流并从当前优先级最高的流中取出一个包发送,同时维护包的发送顺序。总体思路如下:
- 调用
GetHighestPriorityStream获取当前优先级最高的流。 - 从该流的优先级队列(
PriorityPacketQueue)中取出队首包,并更新流的状态(如剩余大小、时间等)。 - 如果该流没有剩余包,删除对应的流优先级键。
std::unique_ptr<RtpPacketToSend> RoundRobinPacketQueue::Pop() {if (single_packet_queue_.has_value()) {RTC_DCHECK(stream_priorities_.empty());std::unique_ptr<RtpPacketToSend> rtp_packet(single_packet_queue_->RtpPacket());single_packet_queue_.reset();queue_time_sum_ = TimeDelta::Zero();size_packets_ = 0;size_ = DataSize::Zero();return rtp_packet;}RTC_DCHECK(!Empty());Stream* stream = GetHighestPriorityStream();const QueuedPacket& queued_packet = stream->packet_queue.top();stream_priorities_.erase(stream->priority_it);// Calculate the total amount of time spent by this packet in the queue// while in a non-paused state. Note that the |pause_time_sum_ms_| was// subtracted from |packet.enqueue_time_ms| when the packet was pushed, and// by subtracting it now we effectively remove the time spent in in the// queue while in a paused state.TimeDelta time_in_non_paused_state =time_last_updated_ - queued_packet.EnqueueTime() - pause_time_sum_;queue_time_sum_ -= time_in_non_paused_state;RTC_CHECK(queued_packet.EnqueueTimeIterator() != enqueue_times_.end());enqueue_times_.erase(queued_packet.EnqueueTimeIterator());// Update |bytes| of this stream. The general idea is that the stream that// has sent the least amount of bytes should have the highest priority.// The problem with that is if streams send with different rates, in which// case a "budget" will be built up for the stream sending at the lower// rate. To avoid building a too large budget we limit |bytes| to be within// kMaxLeading bytes of the stream that has sent the most amount of bytes.DataSize packet_size = PacketSize(queued_packet);stream->size =std::max(stream->size + packet_size, max_size_ - kMaxLeadingSize);max_size_ = std::max(max_size_, stream->size);size_ -= packet_size;size_packets_ -= 1;RTC_CHECK(size_packets_ > 0 || queue_time_sum_ == TimeDelta::Zero());std::unique_ptr<RtpPacketToSend> rtp_packet(queued_packet.RtpPacket());stream->packet_queue.pop();// If there are packets left to be sent, schedule the stream again.RTC_CHECK(!IsSsrcScheduled(stream->ssrc));if (stream->packet_queue.empty()) {stream->priority_it = stream_priorities_.end();} else {int priority = stream->packet_queue.top().Priority();stream->priority_it = stream_priorities_.emplace(StreamPrioKey(priority, stream->size), stream->ssrc);}return rtp_packet;
}
-
获得优先级最高的stream;
-
从stream的packet_queue当中取出第一个包;
-
将stream在
stream_priorities_中的项删除;-
思考下虽然包删除了,但是stream还在,为啥要删除?
答案:因为
stream_priorities_是multimap,允许出现相同key,也就是说,这个包没有,它的stream优先级全靠它撑着(因为是先pop优先级高的包),所以,现在不应该由它撑着了,换下一个优先级最高的包撑着。
-
-
计算Packet入队后距离现在的时间(不包括暂停时间);
-
将这段时间从队列的总时间减去;
-
从
enqueue_times_中将Packet的项删除; -
总包数减去1;
-
总字节数减去包的字节数;
-
将包从stream中的queue中弹出;
-
如果stream中的队列为空,则令
stream的priority_it指向stream_priorities_的end(); -
否则,从stream队列头部取出Packet,将该Packet的priority插入到
stream_priorities_中;
四、优先级调度:
-
每个流有单独的优先级队列(
PriorityPacketQueue),保存QueuedPacket对象。 -
QueuedPacket是一个包装类,表示单个 RTP 数据包及其附属信息(例如入队时间、优先级、是否为重传包等)。 -
列表中的流按
StreamPrioKey保存在stream_priorities_中,通过此键决定流次序:struct StreamPrioKey {int priority; // 流的优先级,数值越低,优先级越高DataSize size; // 数据包大小,用于平衡负载 };优先规则:优先级低(priority 值小) > 数据包大小小(size 值小)。
五、轮询调度:
- 核心逻辑通过
RoundRobin的方式轮询多个流。但由于流可能有不同的优先级,某些流会被更多次轮询到。 - 函数
GetHighestPriorityStream定位当前最高优先级的流,从流对应的队列中取得包然后发送。
六、Stream定义与管理:
每个流由 Stream 类表示,Stream 是该流独有的数据结构,包括:
- 当前队列状态,如总字节数、包大小和优先级。
- 内部维护单独的优先级队列(
PriorityPacketQueue)。 - 调度时实时更新优先级,确保新加入高优先级包时能调整队列次序。
七、其他特性:
7.1、重传支持:
- 标记是否重传的包,在出队时可能依据该标记进行特殊处理。
7.2、时间相关:
queue_time_sum_和pause_time_sum_用于统计包在队列中的存留时间,这对于带宽控制和流量管理很有用。- 提供接口如
AverageQueueTime计算平均队列时间,用于监控流的实时性。
7.3、队列字节限制:
队列有最大可存储的字节数(max_size_),以防止占用过多资源。
7.4、暂停/恢复功能:
可以通过 SetPauseState 暂停或者恢复队列处理。劝你最好别用!!!
八、总结:
RoundRobinPacketQueue 是一个高效的多流、多优先级调度队列,适用于 RTP 媒体数据的分组发送场景。它通过流内、流间的双重调度机制,结合优先级动态提升、统计队列时间和暂停控制等特性,确保在带宽有限的网络环境中最大程度提高数据的实时性和发送效率,是 WebRTC Pacer 模块的核心部分。
相关文章:
WebRTC服务质量(10)- Pacer机制(02) RoundRobinPacketQueue
WebRTC服务质量(01)- Qos概述 WebRTC服务质量(02)- RTP协议 WebRTC服务质量(03)- RTCP协议 WebRTC服务质量(04)- 重传机制(01) RTX NACK概述 WebRTC服务质量(…...
数据库的数据被清除了,该如何恢复?
当数据库的数据被清除时,恢复数据的难度和可能性取决于多种因素,包括数据清除的方式、数据库的类型、是否有备份等。以下是一些常见的数据库数据恢复方法: 一、基于备份的恢复 使用备份文件: 如果数据库有定期的备份,…...
【西安电子科技大学考研】25官方复试专业课参考书目汇总
初试已经顺利考完啦、成绩已经公布,现在已经有很多同学来问学长学姐,复试参考书有哪些,复试应该做好哪些准备。故此学长学姐给大家整理好了西安电子科技大学各个学院的复试参考书目录,有需要的同学可以参考一下哈。大家可以结合本…...
【理解机器学习中的过拟合与欠拟合】
在机器学习中,模型的表现很大程度上取决于我们如何平衡“过拟合”和“欠拟合”。本文通过理论介绍和代码演示,详细解析过拟合与欠拟合现象,并提出应对策略。主要内容如下: 什么是过拟合和欠拟合? 如何防止过拟合和欠拟…...
fastjson诡异报错
1、环境以及报错描述 1.1 环境 操作系统为中标麒麟、cpu 为国产鲲鹏服务器。 jdk为openjdk version 1.8.0._242 1.2 错误 com.alibaba.fastjson2.JSONException: syntax error : f at com.alibaba.fastjson2.JSONReaderUTF16.readBoolValue(JSONReaderUTF16.java:6424) at c…...
面经zhenyq
如何去实现分层的动画效果? 在Unity中实现分层的动画效果,可以通过Animator的 Layer 功能实现。以下是详细步骤: 1. 什么是分层动画? 分层动画允许在同一个角色的不同部分同时播放独立的动画。例如: 上半身可以播放…...
GoFrame框架介绍
GoFrame是一款功能强大、设计精良且易用的Go语言开发框架,以下为你详细介绍它的相关特点和内容: ### 框架概述 GoFrame是为了提升Go语言开发者的编码效率以及项目的整体可维护性、可扩展性等而打造的开发框架,它涵盖了从基础的网络通信、数据…...
MapReduce工作流程+Shuffle机制
一、Mapreduce工作流程 (1)数据切片Split。数据切片数决定maptask并行度,默认情况下,一个切片大小块大小。切片不是针对整体数据集,而是针对每一个文件单独切片,所以会有小文件问题(CombineTex…...
JAVA8 Stream API 使用详解
Java 8 引入了 Stream API,它提供了对集合对象进行一系列操作的新方式,包括筛选、转换、聚合等。Stream API 的设计目标是提供一种高效且易于使用的处理集合数据的方式,同时支持并行处理。 以下是 Java 8 Stream API 的一些核心概念和使用详解…...
Redis 集群架构:高可用与扩展性
一、引言 在当今数字化时代,数据量呈爆炸式增长,对数据存储和处理的要求也越来越高。Redis作为一款高性能的键值对存储数据库,其集群架构在应对高并发、大数据量场景时展现出了独特的优势,成为众多企业构建高效、稳定系统的关键技…...
Redis数据对象
基本结构图 key和value指向的是redisObject对象 type:标识该对象用的是什么类型(String、List Redis数据结构 SDS SDS有4个属性: len:记录了字符串长度,因此获取字符串长度的时候时间复杂度O(1ÿ…...
Docker部署GitLab服务器
一、GitLab介绍 1.1 GitLab简介 GitLab 是一款基于 Git 的开源代码托管平台,集成了版本控制、代码审查、问题跟踪、持续集成与持续交付(CI/CD)等多种功能,旨在为团队提供一站式的项目管理解决方案。借助 GitLab,开发…...
python版本的Selenium的下载及chrome环境搭建和简单使用
针对Python版本的Selenium下载及Chrome环境搭建和使用,以下将详细阐述具体步骤: 一、Python版本的Selenium下载 安装Python环境: 确保系统上已经安装了Python 3.8及以上版本。可以从[Python官方网站]下载并安装最新版本的Python,…...
重温设计模式--4、组合模式
文章目录 1 、组合模式(Composite Pattern)概述2. 组合模式的结构3. C 代码示例4. C示例代码25 .应用场景 1 、组合模式(Composite Pattern)概述 定义:组合模式是一种结构型设计模式,它允许你将对象组合成…...
5、mysql的读写分离
主从复制 主从复制的含义 主从复制:在一个mysql的集群当中,至少3台,即主1台,从2台。 当有数据写入时,主负责写入本库,然后把数据同步到从服务器。 一定是在主服务器写入数据,从服务器的写入…...
uniapp Native.js原生arr插件服务发送广播到uniapp页面中
前言 最近搞了个设备,需求是读取m1卡,厂家给了个安卓原生demo,接入arr插件如下,接入后发现还是少了一部分代码,设备服务调起后触发刷卡无法发送到uniapp里。 中间是一些踩坑记录,最后面是解决办法…...
如何在 Ubuntu 22.04 上安装 Elasticsearch
简介 在本教程中,你将学习如何在 Ubuntu 22.04 服务器上安装 Elasticsearch。此外,你还将学习如何使用 Elasticsearch REST API 索引和操作数据。 Elasticsearch 是一个基于 Apache Lucene Library 的免费分布式搜索和分析引擎。它是一个快速且可扩展的…...
单片机长耗时前后台任务优化
代码: void Task_10ms(void) {... }//改 void Task_2ms(void) {static uint8_t s_state 0switch(s_state){case 0:....s_state 1;break;case 1:....s_state 2;break;case 3:....s_state 1;break;default: //此段可以去除s_state 0;break; } } 参考链接 MCU长…...
Linux大数据方向shell
一、概述 shell是一个命令行解释器,它接收应用程序/用户命令,然后调用操作系统内核,还是一个功能相当强大的编程语言,易编写,易调试,灵活性强。 二、shell入门 1.输出hello world touch helloworld.sh&…...
爬虫 APP 逆向 ---> shopee(虾皮) 电商
shopee 泰国站点:https://shopee.co.th/ shopee 网页访问时,直接弹出使用 app 登录查看,那就登录 shopee 泰国站点 app。 手机抓包:分类接口 接口:https://mall.shopee.co.th/api/v4/pages/get_category_tree 请求参…...
变量 varablie 声明- Rust 变量 let mut 声明与 C/C++ 变量声明对比分析
一、变量声明设计:let 与 mut 的哲学解析 Rust 采用 let 声明变量并通过 mut 显式标记可变性,这种设计体现了语言的核心哲学。以下是深度解析: 1.1 设计理念剖析 安全优先原则:默认不可变强制开发者明确声明意图 let x 5; …...
【人工智能】神经网络的优化器optimizer(二):Adagrad自适应学习率优化器
一.自适应梯度算法Adagrad概述 Adagrad(Adaptive Gradient Algorithm)是一种自适应学习率的优化算法,由Duchi等人在2011年提出。其核心思想是针对不同参数自动调整学习率,适合处理稀疏数据和不同参数梯度差异较大的场景。Adagrad通…...
黑马Mybatis
Mybatis 表现层:页面展示 业务层:逻辑处理 持久层:持久数据化保存 在这里插入图片描述 Mybatis快速入门 在疾病诊断、成像分析及靶向药物递送等关键场景中发挥着不可替代的作用。传统上,高特异性蛋白质结合剂的开发高度依赖噬菌体展示、定向进化等实验技术,但这类方法普遍面临资源消耗巨大、研发周期冗长…...
Qt Widget类解析与代码注释
#include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this); }Widget::~Widget() {delete ui; }//解释这串代码,写上注释 当然可以!这段代码是 Qt …...
基于数字孪生的水厂可视化平台建设:架构与实践
分享大纲: 1、数字孪生水厂可视化平台建设背景 2、数字孪生水厂可视化平台建设架构 3、数字孪生水厂可视化平台建设成效 近几年,数字孪生水厂的建设开展的如火如荼。作为提升水厂管理效率、优化资源的调度手段,基于数字孪生的水厂可视化平台的…...
C++.OpenGL (14/64)多光源(Multiple Lights)
多光源(Multiple Lights) 多光源渲染技术概览 #mermaid-svg-3L5e5gGn76TNh7Lq {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-3L5e5gGn76TNh7Lq .error-icon{fill:#552222;}#mermaid-svg-3L5e5gGn76TNh7Lq .erro…...
Linux 中如何提取压缩文件 ?
Linux 是一种流行的开源操作系统,它提供了许多工具来管理、压缩和解压缩文件。压缩文件有助于节省存储空间,使数据传输更快。本指南将向您展示如何在 Linux 中提取不同类型的压缩文件。 1. Unpacking ZIP Files ZIP 文件是非常常见的,要在 …...
省略号和可变参数模板
本文主要介绍如何展开可变参数的参数包 1.C语言的va_list展开可变参数 #include <iostream> #include <cstdarg>void printNumbers(int count, ...) {// 声明va_list类型的变量va_list args;// 使用va_start将可变参数写入变量argsva_start(args, count);for (in…...
群晖NAS如何在虚拟机创建飞牛NAS
套件中心下载安装Virtual Machine Manager 创建虚拟机 配置虚拟机 飞牛官网下载 https://iso.liveupdate.fnnas.com/x86_64/trim/fnos-0.9.2-863.iso 群晖NAS如何在虚拟机创建飞牛NAS - 个人信息分享...
