当前位置: 首页 > news >正文

WebRTC服务质量(10)- Pacer机制(02) RoundRobinPacketQueue

WebRTC服务质量(01)- Qos概述
WebRTC服务质量(02)- RTP协议
WebRTC服务质量(03)- RTCP协议
WebRTC服务质量(04)- 重传机制(01) RTX NACK概述
WebRTC服务质量(05)- 重传机制(02) NACK判断丢包
WebRTC服务质量(06)- 重传机制(03) NACK找到真正的丢包
WebRTC服务质量(07)- 重传机制(04) 接收NACK消息
WebRTC服务质量(08)- 重传机制(05) RTX机制
WebRTC服务质量(09)- Pacer机制(01) 流程概述
WebRTC服务质量(10)- Pacer机制(02) RoundRobinPacketQueue
WebRTC服务质量(11)- Pacer机制(03) IntervalBudget
WebRTC服务质量(12)- Pacer机制(04) 向Pacer中插入数据

一、前言:

RoundRobinPacketQueue 是Pacer模块当中一个非常重要的循环队列模块,主要目的是在多流场景中,根据每个流的 优先级 以及流内的 RTP 包的 入队顺序类型 和其他因素,动态决定数据包的发送顺序。

二、类关系图:

2.1、类图:

在这里插入图片描述

关键部分加了注释,便于理解:

  1. 多流支持: 每个流按 SSRC(Synchronization Source Identifier,同步信源标识符)进行区分,每个流维护独立的优先级队列。
  2. 多级优先级调度: 不同流间按优先级调度(基于 StreamPrioKey),单个流内的包按严格的队列顺序管理。
  3. 静态与动态属性结合: 例如,属性包括每个包的优先级(priority)、入队时间(enqueue_time)、重传标志、带宽开销等。
  4. 分组调度: 使用轮询(Round-Robin)机制,从不同流根据流的权重和优先级选取包发送,提高音频、视频和反馈等多种类型数据的实时性。

2.2、重要成员关系:

在这里插入图片描述

  1. stream_priorities当中存放StreamPrioKey和ssrc对应关系;
  2. streams当中存放ssrc和Stream的对应关系;
  3. 这样就建立了StreamPrioKey和Stream之间的关系;
  4. 反过来,也可以通过Stream当中的priority_it找到stream_priorities当中的某一项;

小结下:重要成员变量的功能

成员变量功能
streams_保存所有流,键是流的 ssrc,每条流存有独立队列和优先级信息。
stream_priorities_将所有流按照 StreamPrioKey 排列,用于实现流间优先级调度。
enqueue_times_一个多重集合,保存所有包的入队时间,便于快速找到最早入队的包时间。
size_packets_size_分别记录队列中包的个数和总字节数,动态调整。
transport_overhead_per_packet_计算包传输的额外开销(如包头)。
time_last_updated_用于统计队列的更新时间,辅助计算入队和等待时间等。

三、重要函数:

3.1、Push

Push 会根据包的优先级、流的权重、总队列大小,以及包的类型等,将包插入到对应的流(Stream)队列,并更新其他与队列状态关联的元数据。总体思路如下:

处理一个新包时:

  1. 确定该包所属的流,如果不存在该流,则创建一个 Stream 对象。
  2. 将包包装成为 QueuedPacket 并插入到流的优先级队列。
  3. 更新流的优先级键 StreamPrioKey,并在 stream_priorities_ 中重新排序。
  4. 更新队列元数据(如队列包大小、队列时间等)。
void RoundRobinPacketQueue::Push(int priority,Timestamp enqueue_time,uint64_t enqueue_order,std::unique_ptr<RtpPacketToSend> packet) {RTC_DCHECK(packet->packet_type().has_value());if (size_packets_ == 0) {// Single packet fast-path.single_packet_queue_.emplace(QueuedPacket(priority, enqueue_time, enqueue_order,enqueue_times_.end(), std::move(packet)));UpdateQueueTime(enqueue_time);single_packet_queue_->SubtractPauseTime(pause_time_sum_);size_packets_ = 1;size_ += PacketSize(*single_packet_queue_);} else {MaybePromoteSinglePacketToNormalQueue();Push(QueuedPacket(priority, enqueue_time, enqueue_order,enqueue_times_.insert(enqueue_time), std::move(packet)));}
}void RoundRobinPacketQueue::Push(QueuedPacket packet) {auto stream_info_it = streams_.find(packet.Ssrc());if (stream_info_it == streams_.end()) {stream_info_it = streams_.emplace(packet.Ssrc(), Stream()).first;stream_info_it->second.priority_it = stream_priorities_.end();stream_info_it->second.ssrc = packet.Ssrc();}Stream* stream = &stream_info_it->second;if (stream->priority_it == stream_priorities_.end()) {RTC_CHECK(!IsSsrcScheduled(stream->ssrc));stream->priority_it = stream_priorities_.emplace(StreamPrioKey(packet.Priority(), stream->size), packet.Ssrc());} else if (packet.Priority() < stream->priority_it->first.priority) {stream_priorities_.erase(stream->priority_it);stream->priority_it = stream_priorities_.emplace(StreamPrioKey(packet.Priority(), stream->size), packet.Ssrc());}if (packet.EnqueueTimeIterator() == enqueue_times_.end()) {packet.UpdateEnqueueTimeIterator(enqueue_times_.insert(packet.EnqueueTime()));} else {UpdateQueueTime(packet.EnqueueTime());packet.SubtractPauseTime(pause_time_sum_);size_packets_ += 1;size_ += PacketSize(packet);}stream->packet_queue.push(packet);
}
  • 从streams中找packet所属的Ssrc的stream,如果没有,则在streams中插入一项;
  • 查看stream的priority_it是否等于stream_priorities_的end()
    • 如果相等,则在stream_priorities_中插入一项;
    • 否则,如果新包的优先高(注意,优先级数值越小表示优先级越高),则更新其ssrc对应队列的优先级;
  • 更新队列总时长;
  • 入队时间减去暂停时间(一般不会有暂停);
  • 队列总包数+1;
  • 队列总字节大小 = 包的负载大小+Padding大小;
  • 插入到stream中的packet_queue中;

3.2、Pop

Pop 会轮询不同的流并从当前优先级最高的流中取出一个包发送,同时维护包的发送顺序。总体思路如下:

  1. 调用 GetHighestPriorityStream 获取当前优先级最高的流。
  2. 从该流的优先级队列(PriorityPacketQueue)中取出队首包,并更新流的状态(如剩余大小、时间等)。
  3. 如果该流没有剩余包,删除对应的流优先级键。
std::unique_ptr<RtpPacketToSend> RoundRobinPacketQueue::Pop() {if (single_packet_queue_.has_value()) {RTC_DCHECK(stream_priorities_.empty());std::unique_ptr<RtpPacketToSend> rtp_packet(single_packet_queue_->RtpPacket());single_packet_queue_.reset();queue_time_sum_ = TimeDelta::Zero();size_packets_ = 0;size_ = DataSize::Zero();return rtp_packet;}RTC_DCHECK(!Empty());Stream* stream = GetHighestPriorityStream();const QueuedPacket& queued_packet = stream->packet_queue.top();stream_priorities_.erase(stream->priority_it);// Calculate the total amount of time spent by this packet in the queue// while in a non-paused state. Note that the |pause_time_sum_ms_| was// subtracted from |packet.enqueue_time_ms| when the packet was pushed, and// by subtracting it now we effectively remove the time spent in in the// queue while in a paused state.TimeDelta time_in_non_paused_state =time_last_updated_ - queued_packet.EnqueueTime() - pause_time_sum_;queue_time_sum_ -= time_in_non_paused_state;RTC_CHECK(queued_packet.EnqueueTimeIterator() != enqueue_times_.end());enqueue_times_.erase(queued_packet.EnqueueTimeIterator());// Update |bytes| of this stream. The general idea is that the stream that// has sent the least amount of bytes should have the highest priority.// The problem with that is if streams send with different rates, in which// case a "budget" will be built up for the stream sending at the lower// rate. To avoid building a too large budget we limit |bytes| to be within// kMaxLeading bytes of the stream that has sent the most amount of bytes.DataSize packet_size = PacketSize(queued_packet);stream->size =std::max(stream->size + packet_size, max_size_ - kMaxLeadingSize);max_size_ = std::max(max_size_, stream->size);size_ -= packet_size;size_packets_ -= 1;RTC_CHECK(size_packets_ > 0 || queue_time_sum_ == TimeDelta::Zero());std::unique_ptr<RtpPacketToSend> rtp_packet(queued_packet.RtpPacket());stream->packet_queue.pop();// If there are packets left to be sent, schedule the stream again.RTC_CHECK(!IsSsrcScheduled(stream->ssrc));if (stream->packet_queue.empty()) {stream->priority_it = stream_priorities_.end();} else {int priority = stream->packet_queue.top().Priority();stream->priority_it = stream_priorities_.emplace(StreamPrioKey(priority, stream->size), stream->ssrc);}return rtp_packet;
}
  • 获得优先级最高的stream;

  • 从stream的packet_queue当中取出第一个包;

  • 将stream在stream_priorities_中的项删除;

    • 思考下虽然包删除了,但是stream还在,为啥要删除?

      答案:因为stream_priorities_ 是multimap,允许出现相同key,也就是说,这个包没有,它的stream优先级全靠它撑着(因为是先pop优先级高的包),所以,现在不应该由它撑着了,换下一个优先级最高的包撑着。

  • 计算Packet入队后距离现在的时间(不包括暂停时间);

  • 将这段时间从队列的总时间减去;

  • enqueue_times_中将Packet的项删除;

  • 总包数减去1;

  • 总字节数减去包的字节数;

  • 将包从stream中的queue中弹出;

  • 如果stream中的队列为空,则令stream的priority_it指向stream_priorities_end();

  • 否则,从stream队列头部取出Packet,将该Packet的priority插入到stream_priorities_中;

四、优先级调度:

  • 每个流有单独的优先级队列(PriorityPacketQueue),保存 QueuedPacket 对象。

  • QueuedPacket 是一个包装类,表示单个 RTP 数据包及其附属信息(例如入队时间、优先级、是否为重传包等)。

  • 列表中的流按 StreamPrioKey 保存在 stream_priorities_ 中,通过此键决定流次序:

    struct StreamPrioKey {int priority;    // 流的优先级,数值越低,优先级越高DataSize size;   // 数据包大小,用于平衡负载
    };
    

    优先规则:优先级低(priority 值小) > 数据包大小小(size 值小)。

五、轮询调度:

  • 核心逻辑通过 RoundRobin 的方式轮询多个流。但由于流可能有不同的优先级,某些流会被更多次轮询到。
  • 函数 GetHighestPriorityStream 定位当前最高优先级的流,从流对应的队列中取得包然后发送。

六、Stream定义与管理:

每个流由 Stream 类表示,Stream 是该流独有的数据结构,包括:

  • 当前队列状态,如总字节数、包大小和优先级。
  • 内部维护单独的优先级队列(PriorityPacketQueue)。
  • 调度时实时更新优先级,确保新加入高优先级包时能调整队列次序。

七、其他特性:

7.1、重传支持:

  • 标记是否重传的包,在出队时可能依据该标记进行特殊处理。

7.2、时间相关:

  • queue_time_sum_pause_time_sum_ 用于统计包在队列中的存留时间,这对于带宽控制和流量管理很有用。
  • 提供接口如 AverageQueueTime 计算平均队列时间,用于监控流的实时性。

7.3、队列字节限制:

队列有最大可存储的字节数(max_size_),以防止占用过多资源。

7.4、暂停/恢复功能:

可以通过 SetPauseState 暂停或者恢复队列处理。劝你最好别用!!!

八、总结:

RoundRobinPacketQueue 是一个高效的多流、多优先级调度队列,适用于 RTP 媒体数据的分组发送场景。它通过流内、流间的双重调度机制,结合优先级动态提升、统计队列时间和暂停控制等特性,确保在带宽有限的网络环境中最大程度提高数据的实时性和发送效率,是 WebRTC Pacer 模块的核心部分。

相关文章:

WebRTC服务质量(10)- Pacer机制(02) RoundRobinPacketQueue

WebRTC服务质量&#xff08;01&#xff09;- Qos概述 WebRTC服务质量&#xff08;02&#xff09;- RTP协议 WebRTC服务质量&#xff08;03&#xff09;- RTCP协议 WebRTC服务质量&#xff08;04&#xff09;- 重传机制&#xff08;01) RTX NACK概述 WebRTC服务质量&#xff08;…...

数据库的数据被清除了,该如何恢复?

当数据库的数据被清除时&#xff0c;恢复数据的难度和可能性取决于多种因素&#xff0c;包括数据清除的方式、数据库的类型、是否有备份等。以下是一些常见的数据库数据恢复方法&#xff1a; 一、基于备份的恢复 使用备份文件&#xff1a; 如果数据库有定期的备份&#xff0c…...

【西安电子科技大学考研】25官方复试专业课参考书目汇总

初试已经顺利考完啦、成绩已经公布&#xff0c;现在已经有很多同学来问学长学姐&#xff0c;复试参考书有哪些&#xff0c;复试应该做好哪些准备。故此学长学姐给大家整理好了西安电子科技大学各个学院的复试参考书目录&#xff0c;有需要的同学可以参考一下哈。大家可以结合本…...

【理解机器学习中的过拟合与欠拟合】

在机器学习中&#xff0c;模型的表现很大程度上取决于我们如何平衡“过拟合”和“欠拟合”。本文通过理论介绍和代码演示&#xff0c;详细解析过拟合与欠拟合现象&#xff0c;并提出应对策略。主要内容如下&#xff1a; 什么是过拟合和欠拟合&#xff1f; 如何防止过拟合和欠拟…...

fastjson诡异报错

1、环境以及报错描述 1.1 环境 操作系统为中标麒麟、cpu 为国产鲲鹏服务器。 jdk为openjdk version 1.8.0._242 1.2 错误 com.alibaba.fastjson2.JSONException: syntax error : f at com.alibaba.fastjson2.JSONReaderUTF16.readBoolValue(JSONReaderUTF16.java:6424) at c…...

面经zhenyq

如何去实现分层的动画效果&#xff1f; 在Unity中实现分层的动画效果&#xff0c;可以通过Animator的 Layer 功能实现。以下是详细步骤&#xff1a; 1. 什么是分层动画&#xff1f; 分层动画允许在同一个角色的不同部分同时播放独立的动画。例如&#xff1a; 上半身可以播放…...

GoFrame框架介绍

GoFrame是一款功能强大、设计精良且易用的Go语言开发框架&#xff0c;以下为你详细介绍它的相关特点和内容&#xff1a; ### 框架概述 GoFrame是为了提升Go语言开发者的编码效率以及项目的整体可维护性、可扩展性等而打造的开发框架&#xff0c;它涵盖了从基础的网络通信、数据…...

MapReduce工作流程+Shuffle机制

一、Mapreduce工作流程 &#xff08;1&#xff09;数据切片Split。数据切片数决定maptask并行度&#xff0c;默认情况下&#xff0c;一个切片大小块大小。切片不是针对整体数据集&#xff0c;而是针对每一个文件单独切片&#xff0c;所以会有小文件问题&#xff08;CombineTex…...

JAVA8 Stream API 使用详解

Java 8 引入了 Stream API&#xff0c;它提供了对集合对象进行一系列操作的新方式&#xff0c;包括筛选、转换、聚合等。Stream API 的设计目标是提供一种高效且易于使用的处理集合数据的方式&#xff0c;同时支持并行处理。 以下是 Java 8 Stream API 的一些核心概念和使用详解…...

Redis 集群架构:高可用与扩展性

一、引言 在当今数字化时代&#xff0c;数据量呈爆炸式增长&#xff0c;对数据存储和处理的要求也越来越高。Redis作为一款高性能的键值对存储数据库&#xff0c;其集群架构在应对高并发、大数据量场景时展现出了独特的优势&#xff0c;成为众多企业构建高效、稳定系统的关键技…...

Redis数据对象

基本结构图 key和value指向的是redisObject对象 type&#xff1a;标识该对象用的是什么类型&#xff08;String、List Redis数据结构 SDS SDS有4个属性&#xff1a; len&#xff1a;记录了字符串长度&#xff0c;因此获取字符串长度的时候时间复杂度O&#xff08;1&#xff…...

Docker部署GitLab服务器

一、GitLab介绍 1.1 GitLab简介 GitLab 是一款基于 Git 的开源代码托管平台&#xff0c;集成了版本控制、代码审查、问题跟踪、持续集成与持续交付&#xff08;CI/CD&#xff09;等多种功能&#xff0c;旨在为团队提供一站式的项目管理解决方案。借助 GitLab&#xff0c;开发…...

python版本的Selenium的下载及chrome环境搭建和简单使用

针对Python版本的Selenium下载及Chrome环境搭建和使用&#xff0c;以下将详细阐述具体步骤&#xff1a; 一、Python版本的Selenium下载 安装Python环境&#xff1a; 确保系统上已经安装了Python 3.8及以上版本。可以从[Python官方网站]下载并安装最新版本的Python&#xff0c;…...

重温设计模式--4、组合模式

文章目录 1 、组合模式&#xff08;Composite Pattern&#xff09;概述2. 组合模式的结构3. C 代码示例4. C示例代码25 .应用场景 1 、组合模式&#xff08;Composite Pattern&#xff09;概述 定义&#xff1a;组合模式是一种结构型设计模式&#xff0c;它允许你将对象组合成…...

5、mysql的读写分离

主从复制 主从复制的含义 主从复制&#xff1a;在一个mysql的集群当中&#xff0c;至少3台&#xff0c;即主1台&#xff0c;从2台。 当有数据写入时&#xff0c;主负责写入本库&#xff0c;然后把数据同步到从服务器。 一定是在主服务器写入数据&#xff0c;从服务器的写入…...

uniapp Native.js原生arr插件服务发送广播到uniapp页面中

前言 最近搞了个设备&#xff0c;需求是读取m1卡&#xff0c;厂家给了个安卓原生demo&#xff0c;接入arr插件如下&#xff0c;接入后发现还是少了一部分代码&#xff0c;设备服务调起后触发刷卡无法发送到uniapp里。 中间是一些踩坑记录&#xff0c;最后面是解决办法&#xf…...

如何在 Ubuntu 22.04 上安装 Elasticsearch

简介 在本教程中&#xff0c;你将学习如何在 Ubuntu 22.04 服务器上安装 Elasticsearch。此外&#xff0c;你还将学习如何使用 Elasticsearch REST API 索引和操作数据。 Elasticsearch 是一个基于 Apache Lucene Library 的免费分布式搜索和分析引擎。它是一个快速且可扩展的…...

单片机长耗时前后台任务优化

代码&#xff1a; void Task_10ms(void) {... }//改 void Task_2ms(void) {static uint8_t s_state 0switch(s_state){case 0:....s_state 1;break;case 1:....s_state 2;break;case 3:....s_state 1;break;default: //此段可以去除s_state 0;break; } } 参考链接 MCU长…...

Linux大数据方向shell

一、概述 shell是一个命令行解释器&#xff0c;它接收应用程序/用户命令&#xff0c;然后调用操作系统内核&#xff0c;还是一个功能相当强大的编程语言&#xff0c;易编写&#xff0c;易调试&#xff0c;灵活性强。 二、shell入门 1.输出hello world touch helloworld.sh&…...

爬虫 APP 逆向 ---> shopee(虾皮) 电商

shopee 泰国站点&#xff1a;https://shopee.co.th/ shopee 网页访问时&#xff0c;直接弹出使用 app 登录查看&#xff0c;那就登录 shopee 泰国站点 app。 手机抓包&#xff1a;分类接口 接口&#xff1a;https://mall.shopee.co.th/api/v4/pages/get_category_tree 请求参…...

铭豹扩展坞 USB转网口 突然无法识别解决方法

当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…...

【网络安全产品大调研系列】2. 体验漏洞扫描

前言 2023 年漏洞扫描服务市场规模预计为 3.06&#xff08;十亿美元&#xff09;。漏洞扫描服务市场行业预计将从 2024 年的 3.48&#xff08;十亿美元&#xff09;增长到 2032 年的 9.54&#xff08;十亿美元&#xff09;。预测期内漏洞扫描服务市场 CAGR&#xff08;增长率&…...

【CSS position 属性】static、relative、fixed、absolute 、sticky详细介绍,多层嵌套定位示例

文章目录 ★ position 的五种类型及基本用法 ★ 一、position 属性概述 二、position 的五种类型详解(初学者版) 1. static(默认值) 2. relative(相对定位) 3. absolute(绝对定位) 4. fixed(固定定位) 5. sticky(粘性定位) 三、定位元素的层级关系(z-i…...

学校时钟系统,标准考场时钟系统,AI亮相2025高考,赛思时钟系统为教育公平筑起“精准防线”

2025年#高考 将在近日拉开帷幕&#xff0c;#AI 监考一度冲上热搜。当AI深度融入高考&#xff0c;#时间同步 不再是辅助功能&#xff0c;而是决定AI监考系统成败的“生命线”。 AI亮相2025高考&#xff0c;40种异常行为0.5秒精准识别 2025年高考即将拉开帷幕&#xff0c;江西、…...

VM虚拟机网络配置(ubuntu24桥接模式):配置静态IP

编辑-虚拟网络编辑器-更改设置 选择桥接模式&#xff0c;然后找到相应的网卡&#xff08;可以查看自己本机的网络连接&#xff09; windows连接的网络点击查看属性 编辑虚拟机设置更改网络配置&#xff0c;选择刚才配置的桥接模式 静态ip设置&#xff1a; 我用的ubuntu24桌…...

vulnyx Blogger writeup

信息收集 arp-scan nmap 获取userFlag 上web看看 一个默认的页面&#xff0c;gobuster扫一下目录 可以看到扫出的目录中得到了一个有价值的目录/wordpress&#xff0c;说明目标所使用的cms是wordpress&#xff0c;访问http://192.168.43.213/wordpress/然后查看源码能看到 这…...

[免费]微信小程序问卷调查系统(SpringBoot后端+Vue管理端)【论文+源码+SQL脚本】

大家好&#xff0c;我是java1234_小锋老师&#xff0c;看到一个不错的微信小程序问卷调查系统(SpringBoot后端Vue管理端)【论文源码SQL脚本】&#xff0c;分享下哈。 项目视频演示 【免费】微信小程序问卷调查系统(SpringBoot后端Vue管理端) Java毕业设计_哔哩哔哩_bilibili 项…...

Selenium常用函数介绍

目录 一&#xff0c;元素定位 1.1 cssSeector 1.2 xpath 二&#xff0c;操作测试对象 三&#xff0c;窗口 3.1 案例 3.2 窗口切换 3.3 窗口大小 3.4 屏幕截图 3.5 关闭窗口 四&#xff0c;弹窗 五&#xff0c;等待 六&#xff0c;导航 七&#xff0c;文件上传 …...

NPOI操作EXCEL文件 ——CAD C# 二次开发

缺点:dll.版本容易加载错误。CAD加载插件时&#xff0c;没有加载所有类库。插件运行过程中用到某个类库&#xff0c;会从CAD的安装目录找&#xff0c;找不到就报错了。 【方案2】让CAD在加载过程中把类库加载到内存 【方案3】是发现缺少了哪个库&#xff0c;就用插件程序加载进…...

Kubernetes 网络模型深度解析:Pod IP 与 Service 的负载均衡机制,Service到底是什么?

Pod IP 的本质与特性 Pod IP 的定位 纯端点地址&#xff1a;Pod IP 是分配给 Pod 网络命名空间的真实 IP 地址&#xff08;如 10.244.1.2&#xff09;无特殊名称&#xff1a;在 Kubernetes 中&#xff0c;它通常被称为 “Pod IP” 或 “容器 IP”生命周期&#xff1a;与 Pod …...