当前位置: 首页 > news >正文

WebRTC服务质量(10)- Pacer机制(02) RoundRobinPacketQueue

WebRTC服务质量(01)- Qos概述
WebRTC服务质量(02)- RTP协议
WebRTC服务质量(03)- RTCP协议
WebRTC服务质量(04)- 重传机制(01) RTX NACK概述
WebRTC服务质量(05)- 重传机制(02) NACK判断丢包
WebRTC服务质量(06)- 重传机制(03) NACK找到真正的丢包
WebRTC服务质量(07)- 重传机制(04) 接收NACK消息
WebRTC服务质量(08)- 重传机制(05) RTX机制
WebRTC服务质量(09)- Pacer机制(01) 流程概述
WebRTC服务质量(10)- Pacer机制(02) RoundRobinPacketQueue
WebRTC服务质量(11)- Pacer机制(03) IntervalBudget
WebRTC服务质量(12)- Pacer机制(04) 向Pacer中插入数据

一、前言:

RoundRobinPacketQueue 是Pacer模块当中一个非常重要的循环队列模块,主要目的是在多流场景中,根据每个流的 优先级 以及流内的 RTP 包的 入队顺序类型 和其他因素,动态决定数据包的发送顺序。

二、类关系图:

2.1、类图:

在这里插入图片描述

关键部分加了注释,便于理解:

  1. 多流支持: 每个流按 SSRC(Synchronization Source Identifier,同步信源标识符)进行区分,每个流维护独立的优先级队列。
  2. 多级优先级调度: 不同流间按优先级调度(基于 StreamPrioKey),单个流内的包按严格的队列顺序管理。
  3. 静态与动态属性结合: 例如,属性包括每个包的优先级(priority)、入队时间(enqueue_time)、重传标志、带宽开销等。
  4. 分组调度: 使用轮询(Round-Robin)机制,从不同流根据流的权重和优先级选取包发送,提高音频、视频和反馈等多种类型数据的实时性。

2.2、重要成员关系:

在这里插入图片描述

  1. stream_priorities当中存放StreamPrioKey和ssrc对应关系;
  2. streams当中存放ssrc和Stream的对应关系;
  3. 这样就建立了StreamPrioKey和Stream之间的关系;
  4. 反过来,也可以通过Stream当中的priority_it找到stream_priorities当中的某一项;

小结下:重要成员变量的功能

成员变量功能
streams_保存所有流,键是流的 ssrc,每条流存有独立队列和优先级信息。
stream_priorities_将所有流按照 StreamPrioKey 排列,用于实现流间优先级调度。
enqueue_times_一个多重集合,保存所有包的入队时间,便于快速找到最早入队的包时间。
size_packets_size_分别记录队列中包的个数和总字节数,动态调整。
transport_overhead_per_packet_计算包传输的额外开销(如包头)。
time_last_updated_用于统计队列的更新时间,辅助计算入队和等待时间等。

三、重要函数:

3.1、Push

Push 会根据包的优先级、流的权重、总队列大小,以及包的类型等,将包插入到对应的流(Stream)队列,并更新其他与队列状态关联的元数据。总体思路如下:

处理一个新包时:

  1. 确定该包所属的流,如果不存在该流,则创建一个 Stream 对象。
  2. 将包包装成为 QueuedPacket 并插入到流的优先级队列。
  3. 更新流的优先级键 StreamPrioKey,并在 stream_priorities_ 中重新排序。
  4. 更新队列元数据(如队列包大小、队列时间等)。
void RoundRobinPacketQueue::Push(int priority,Timestamp enqueue_time,uint64_t enqueue_order,std::unique_ptr<RtpPacketToSend> packet) {RTC_DCHECK(packet->packet_type().has_value());if (size_packets_ == 0) {// Single packet fast-path.single_packet_queue_.emplace(QueuedPacket(priority, enqueue_time, enqueue_order,enqueue_times_.end(), std::move(packet)));UpdateQueueTime(enqueue_time);single_packet_queue_->SubtractPauseTime(pause_time_sum_);size_packets_ = 1;size_ += PacketSize(*single_packet_queue_);} else {MaybePromoteSinglePacketToNormalQueue();Push(QueuedPacket(priority, enqueue_time, enqueue_order,enqueue_times_.insert(enqueue_time), std::move(packet)));}
}void RoundRobinPacketQueue::Push(QueuedPacket packet) {auto stream_info_it = streams_.find(packet.Ssrc());if (stream_info_it == streams_.end()) {stream_info_it = streams_.emplace(packet.Ssrc(), Stream()).first;stream_info_it->second.priority_it = stream_priorities_.end();stream_info_it->second.ssrc = packet.Ssrc();}Stream* stream = &stream_info_it->second;if (stream->priority_it == stream_priorities_.end()) {RTC_CHECK(!IsSsrcScheduled(stream->ssrc));stream->priority_it = stream_priorities_.emplace(StreamPrioKey(packet.Priority(), stream->size), packet.Ssrc());} else if (packet.Priority() < stream->priority_it->first.priority) {stream_priorities_.erase(stream->priority_it);stream->priority_it = stream_priorities_.emplace(StreamPrioKey(packet.Priority(), stream->size), packet.Ssrc());}if (packet.EnqueueTimeIterator() == enqueue_times_.end()) {packet.UpdateEnqueueTimeIterator(enqueue_times_.insert(packet.EnqueueTime()));} else {UpdateQueueTime(packet.EnqueueTime());packet.SubtractPauseTime(pause_time_sum_);size_packets_ += 1;size_ += PacketSize(packet);}stream->packet_queue.push(packet);
}
  • 从streams中找packet所属的Ssrc的stream,如果没有,则在streams中插入一项;
  • 查看stream的priority_it是否等于stream_priorities_的end()
    • 如果相等,则在stream_priorities_中插入一项;
    • 否则,如果新包的优先高(注意,优先级数值越小表示优先级越高),则更新其ssrc对应队列的优先级;
  • 更新队列总时长;
  • 入队时间减去暂停时间(一般不会有暂停);
  • 队列总包数+1;
  • 队列总字节大小 = 包的负载大小+Padding大小;
  • 插入到stream中的packet_queue中;

3.2、Pop

Pop 会轮询不同的流并从当前优先级最高的流中取出一个包发送,同时维护包的发送顺序。总体思路如下:

  1. 调用 GetHighestPriorityStream 获取当前优先级最高的流。
  2. 从该流的优先级队列(PriorityPacketQueue)中取出队首包,并更新流的状态(如剩余大小、时间等)。
  3. 如果该流没有剩余包,删除对应的流优先级键。
std::unique_ptr<RtpPacketToSend> RoundRobinPacketQueue::Pop() {if (single_packet_queue_.has_value()) {RTC_DCHECK(stream_priorities_.empty());std::unique_ptr<RtpPacketToSend> rtp_packet(single_packet_queue_->RtpPacket());single_packet_queue_.reset();queue_time_sum_ = TimeDelta::Zero();size_packets_ = 0;size_ = DataSize::Zero();return rtp_packet;}RTC_DCHECK(!Empty());Stream* stream = GetHighestPriorityStream();const QueuedPacket& queued_packet = stream->packet_queue.top();stream_priorities_.erase(stream->priority_it);// Calculate the total amount of time spent by this packet in the queue// while in a non-paused state. Note that the |pause_time_sum_ms_| was// subtracted from |packet.enqueue_time_ms| when the packet was pushed, and// by subtracting it now we effectively remove the time spent in in the// queue while in a paused state.TimeDelta time_in_non_paused_state =time_last_updated_ - queued_packet.EnqueueTime() - pause_time_sum_;queue_time_sum_ -= time_in_non_paused_state;RTC_CHECK(queued_packet.EnqueueTimeIterator() != enqueue_times_.end());enqueue_times_.erase(queued_packet.EnqueueTimeIterator());// Update |bytes| of this stream. The general idea is that the stream that// has sent the least amount of bytes should have the highest priority.// The problem with that is if streams send with different rates, in which// case a "budget" will be built up for the stream sending at the lower// rate. To avoid building a too large budget we limit |bytes| to be within// kMaxLeading bytes of the stream that has sent the most amount of bytes.DataSize packet_size = PacketSize(queued_packet);stream->size =std::max(stream->size + packet_size, max_size_ - kMaxLeadingSize);max_size_ = std::max(max_size_, stream->size);size_ -= packet_size;size_packets_ -= 1;RTC_CHECK(size_packets_ > 0 || queue_time_sum_ == TimeDelta::Zero());std::unique_ptr<RtpPacketToSend> rtp_packet(queued_packet.RtpPacket());stream->packet_queue.pop();// If there are packets left to be sent, schedule the stream again.RTC_CHECK(!IsSsrcScheduled(stream->ssrc));if (stream->packet_queue.empty()) {stream->priority_it = stream_priorities_.end();} else {int priority = stream->packet_queue.top().Priority();stream->priority_it = stream_priorities_.emplace(StreamPrioKey(priority, stream->size), stream->ssrc);}return rtp_packet;
}
  • 获得优先级最高的stream;

  • 从stream的packet_queue当中取出第一个包;

  • 将stream在stream_priorities_中的项删除;

    • 思考下虽然包删除了,但是stream还在,为啥要删除?

      答案:因为stream_priorities_ 是multimap,允许出现相同key,也就是说,这个包没有,它的stream优先级全靠它撑着(因为是先pop优先级高的包),所以,现在不应该由它撑着了,换下一个优先级最高的包撑着。

  • 计算Packet入队后距离现在的时间(不包括暂停时间);

  • 将这段时间从队列的总时间减去;

  • enqueue_times_中将Packet的项删除;

  • 总包数减去1;

  • 总字节数减去包的字节数;

  • 将包从stream中的queue中弹出;

  • 如果stream中的队列为空,则令stream的priority_it指向stream_priorities_end();

  • 否则,从stream队列头部取出Packet,将该Packet的priority插入到stream_priorities_中;

四、优先级调度:

  • 每个流有单独的优先级队列(PriorityPacketQueue),保存 QueuedPacket 对象。

  • QueuedPacket 是一个包装类,表示单个 RTP 数据包及其附属信息(例如入队时间、优先级、是否为重传包等)。

  • 列表中的流按 StreamPrioKey 保存在 stream_priorities_ 中,通过此键决定流次序:

    struct StreamPrioKey {int priority;    // 流的优先级,数值越低,优先级越高DataSize size;   // 数据包大小,用于平衡负载
    };
    

    优先规则:优先级低(priority 值小) > 数据包大小小(size 值小)。

五、轮询调度:

  • 核心逻辑通过 RoundRobin 的方式轮询多个流。但由于流可能有不同的优先级,某些流会被更多次轮询到。
  • 函数 GetHighestPriorityStream 定位当前最高优先级的流,从流对应的队列中取得包然后发送。

六、Stream定义与管理:

每个流由 Stream 类表示,Stream 是该流独有的数据结构,包括:

  • 当前队列状态,如总字节数、包大小和优先级。
  • 内部维护单独的优先级队列(PriorityPacketQueue)。
  • 调度时实时更新优先级,确保新加入高优先级包时能调整队列次序。

七、其他特性:

7.1、重传支持:

  • 标记是否重传的包,在出队时可能依据该标记进行特殊处理。

7.2、时间相关:

  • queue_time_sum_pause_time_sum_ 用于统计包在队列中的存留时间,这对于带宽控制和流量管理很有用。
  • 提供接口如 AverageQueueTime 计算平均队列时间,用于监控流的实时性。

7.3、队列字节限制:

队列有最大可存储的字节数(max_size_),以防止占用过多资源。

7.4、暂停/恢复功能:

可以通过 SetPauseState 暂停或者恢复队列处理。劝你最好别用!!!

八、总结:

RoundRobinPacketQueue 是一个高效的多流、多优先级调度队列,适用于 RTP 媒体数据的分组发送场景。它通过流内、流间的双重调度机制,结合优先级动态提升、统计队列时间和暂停控制等特性,确保在带宽有限的网络环境中最大程度提高数据的实时性和发送效率,是 WebRTC Pacer 模块的核心部分。

相关文章:

WebRTC服务质量(10)- Pacer机制(02) RoundRobinPacketQueue

WebRTC服务质量&#xff08;01&#xff09;- Qos概述 WebRTC服务质量&#xff08;02&#xff09;- RTP协议 WebRTC服务质量&#xff08;03&#xff09;- RTCP协议 WebRTC服务质量&#xff08;04&#xff09;- 重传机制&#xff08;01) RTX NACK概述 WebRTC服务质量&#xff08;…...

数据库的数据被清除了,该如何恢复?

当数据库的数据被清除时&#xff0c;恢复数据的难度和可能性取决于多种因素&#xff0c;包括数据清除的方式、数据库的类型、是否有备份等。以下是一些常见的数据库数据恢复方法&#xff1a; 一、基于备份的恢复 使用备份文件&#xff1a; 如果数据库有定期的备份&#xff0c…...

【西安电子科技大学考研】25官方复试专业课参考书目汇总

初试已经顺利考完啦、成绩已经公布&#xff0c;现在已经有很多同学来问学长学姐&#xff0c;复试参考书有哪些&#xff0c;复试应该做好哪些准备。故此学长学姐给大家整理好了西安电子科技大学各个学院的复试参考书目录&#xff0c;有需要的同学可以参考一下哈。大家可以结合本…...

【理解机器学习中的过拟合与欠拟合】

在机器学习中&#xff0c;模型的表现很大程度上取决于我们如何平衡“过拟合”和“欠拟合”。本文通过理论介绍和代码演示&#xff0c;详细解析过拟合与欠拟合现象&#xff0c;并提出应对策略。主要内容如下&#xff1a; 什么是过拟合和欠拟合&#xff1f; 如何防止过拟合和欠拟…...

fastjson诡异报错

1、环境以及报错描述 1.1 环境 操作系统为中标麒麟、cpu 为国产鲲鹏服务器。 jdk为openjdk version 1.8.0._242 1.2 错误 com.alibaba.fastjson2.JSONException: syntax error : f at com.alibaba.fastjson2.JSONReaderUTF16.readBoolValue(JSONReaderUTF16.java:6424) at c…...

面经zhenyq

如何去实现分层的动画效果&#xff1f; 在Unity中实现分层的动画效果&#xff0c;可以通过Animator的 Layer 功能实现。以下是详细步骤&#xff1a; 1. 什么是分层动画&#xff1f; 分层动画允许在同一个角色的不同部分同时播放独立的动画。例如&#xff1a; 上半身可以播放…...

GoFrame框架介绍

GoFrame是一款功能强大、设计精良且易用的Go语言开发框架&#xff0c;以下为你详细介绍它的相关特点和内容&#xff1a; ### 框架概述 GoFrame是为了提升Go语言开发者的编码效率以及项目的整体可维护性、可扩展性等而打造的开发框架&#xff0c;它涵盖了从基础的网络通信、数据…...

MapReduce工作流程+Shuffle机制

一、Mapreduce工作流程 &#xff08;1&#xff09;数据切片Split。数据切片数决定maptask并行度&#xff0c;默认情况下&#xff0c;一个切片大小块大小。切片不是针对整体数据集&#xff0c;而是针对每一个文件单独切片&#xff0c;所以会有小文件问题&#xff08;CombineTex…...

JAVA8 Stream API 使用详解

Java 8 引入了 Stream API&#xff0c;它提供了对集合对象进行一系列操作的新方式&#xff0c;包括筛选、转换、聚合等。Stream API 的设计目标是提供一种高效且易于使用的处理集合数据的方式&#xff0c;同时支持并行处理。 以下是 Java 8 Stream API 的一些核心概念和使用详解…...

Redis 集群架构:高可用与扩展性

一、引言 在当今数字化时代&#xff0c;数据量呈爆炸式增长&#xff0c;对数据存储和处理的要求也越来越高。Redis作为一款高性能的键值对存储数据库&#xff0c;其集群架构在应对高并发、大数据量场景时展现出了独特的优势&#xff0c;成为众多企业构建高效、稳定系统的关键技…...

Redis数据对象

基本结构图 key和value指向的是redisObject对象 type&#xff1a;标识该对象用的是什么类型&#xff08;String、List Redis数据结构 SDS SDS有4个属性&#xff1a; len&#xff1a;记录了字符串长度&#xff0c;因此获取字符串长度的时候时间复杂度O&#xff08;1&#xff…...

Docker部署GitLab服务器

一、GitLab介绍 1.1 GitLab简介 GitLab 是一款基于 Git 的开源代码托管平台&#xff0c;集成了版本控制、代码审查、问题跟踪、持续集成与持续交付&#xff08;CI/CD&#xff09;等多种功能&#xff0c;旨在为团队提供一站式的项目管理解决方案。借助 GitLab&#xff0c;开发…...

python版本的Selenium的下载及chrome环境搭建和简单使用

针对Python版本的Selenium下载及Chrome环境搭建和使用&#xff0c;以下将详细阐述具体步骤&#xff1a; 一、Python版本的Selenium下载 安装Python环境&#xff1a; 确保系统上已经安装了Python 3.8及以上版本。可以从[Python官方网站]下载并安装最新版本的Python&#xff0c;…...

重温设计模式--4、组合模式

文章目录 1 、组合模式&#xff08;Composite Pattern&#xff09;概述2. 组合模式的结构3. C 代码示例4. C示例代码25 .应用场景 1 、组合模式&#xff08;Composite Pattern&#xff09;概述 定义&#xff1a;组合模式是一种结构型设计模式&#xff0c;它允许你将对象组合成…...

5、mysql的读写分离

主从复制 主从复制的含义 主从复制&#xff1a;在一个mysql的集群当中&#xff0c;至少3台&#xff0c;即主1台&#xff0c;从2台。 当有数据写入时&#xff0c;主负责写入本库&#xff0c;然后把数据同步到从服务器。 一定是在主服务器写入数据&#xff0c;从服务器的写入…...

uniapp Native.js原生arr插件服务发送广播到uniapp页面中

前言 最近搞了个设备&#xff0c;需求是读取m1卡&#xff0c;厂家给了个安卓原生demo&#xff0c;接入arr插件如下&#xff0c;接入后发现还是少了一部分代码&#xff0c;设备服务调起后触发刷卡无法发送到uniapp里。 中间是一些踩坑记录&#xff0c;最后面是解决办法&#xf…...

如何在 Ubuntu 22.04 上安装 Elasticsearch

简介 在本教程中&#xff0c;你将学习如何在 Ubuntu 22.04 服务器上安装 Elasticsearch。此外&#xff0c;你还将学习如何使用 Elasticsearch REST API 索引和操作数据。 Elasticsearch 是一个基于 Apache Lucene Library 的免费分布式搜索和分析引擎。它是一个快速且可扩展的…...

单片机长耗时前后台任务优化

代码&#xff1a; void Task_10ms(void) {... }//改 void Task_2ms(void) {static uint8_t s_state 0switch(s_state){case 0:....s_state 1;break;case 1:....s_state 2;break;case 3:....s_state 1;break;default: //此段可以去除s_state 0;break; } } 参考链接 MCU长…...

Linux大数据方向shell

一、概述 shell是一个命令行解释器&#xff0c;它接收应用程序/用户命令&#xff0c;然后调用操作系统内核&#xff0c;还是一个功能相当强大的编程语言&#xff0c;易编写&#xff0c;易调试&#xff0c;灵活性强。 二、shell入门 1.输出hello world touch helloworld.sh&…...

爬虫 APP 逆向 ---> shopee(虾皮) 电商

shopee 泰国站点&#xff1a;https://shopee.co.th/ shopee 网页访问时&#xff0c;直接弹出使用 app 登录查看&#xff0c;那就登录 shopee 泰国站点 app。 手机抓包&#xff1a;分类接口 接口&#xff1a;https://mall.shopee.co.th/api/v4/pages/get_category_tree 请求参…...

设计模式和设计原则回顾

设计模式和设计原则回顾 23种设计模式是设计原则的完美体现,设计原则设计原则是设计模式的理论基石, 设计模式 在经典的设计模式分类中(如《设计模式:可复用面向对象软件的基础》一书中),总共有23种设计模式,分为三大类: 一、创建型模式(5种) 1. 单例模式(Sing…...

TDengine 快速体验(Docker 镜像方式)

简介 TDengine 可以通过安装包、Docker 镜像 及云服务快速体验 TDengine 的功能&#xff0c;本节首先介绍如何通过 Docker 快速体验 TDengine&#xff0c;然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉 Docker&#xff0c;请使用 安装包的方式快…...

Java面试专项一-准备篇

一、企业简历筛选规则 一般企业的简历筛选流程&#xff1a;首先由HR先筛选一部分简历后&#xff0c;在将简历给到对应的项目负责人后再进行下一步的操作。 HR如何筛选简历 例如&#xff1a;Boss直聘&#xff08;招聘方平台&#xff09; 直接按照条件进行筛选 例如&#xff1a…...

全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比

目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec&#xff1f; IPsec VPN 5.1 IPsec传输模式&#xff08;Transport Mode&#xff09; 5.2 IPsec隧道模式&#xff08;Tunne…...

学习STC51单片机32(芯片为STC89C52RCRC)OLED显示屏2

每日一言 今天的每一份坚持&#xff0c;都是在为未来积攒底气。 案例&#xff1a;OLED显示一个A 这边观察到一个点&#xff0c;怎么雪花了就是都是乱七八糟的占满了屏幕。。 解释 &#xff1a; 如果代码里信号切换太快&#xff08;比如 SDA 刚变&#xff0c;SCL 立刻变&#…...

什么是Ansible Jinja2

理解 Ansible Jinja2 模板 Ansible 是一款功能强大的开源自动化工具&#xff0c;可让您无缝地管理和配置系统。Ansible 的一大亮点是它使用 Jinja2 模板&#xff0c;允许您根据变量数据动态生成文件、配置设置和脚本。本文将向您介绍 Ansible 中的 Jinja2 模板&#xff0c;并通…...

JavaScript基础-API 和 Web API

在学习JavaScript的过程中&#xff0c;理解API&#xff08;应用程序接口&#xff09;和Web API的概念及其应用是非常重要的。这些工具极大地扩展了JavaScript的功能&#xff0c;使得开发者能够创建出功能丰富、交互性强的Web应用程序。本文将深入探讨JavaScript中的API与Web AP…...

深入浅出深度学习基础:从感知机到全连接神经网络的核心原理与应用

文章目录 前言一、感知机 (Perceptron)1.1 基础介绍1.1.1 感知机是什么&#xff1f;1.1.2 感知机的工作原理 1.2 感知机的简单应用&#xff1a;基本逻辑门1.2.1 逻辑与 (Logic AND)1.2.2 逻辑或 (Logic OR)1.2.3 逻辑与非 (Logic NAND) 1.3 感知机的实现1.3.1 简单实现 (基于阈…...

GitHub 趋势日报 (2025年06月06日)

&#x1f4ca; 由 TrendForge 系统生成 | &#x1f310; https://trendforge.devlive.org/ &#x1f310; 本日报中的项目描述已自动翻译为中文 &#x1f4c8; 今日获星趋势图 今日获星趋势图 590 cognee 551 onlook 399 project-based-learning 348 build-your-own-x 320 ne…...

群晖NAS如何在虚拟机创建飞牛NAS

套件中心下载安装Virtual Machine Manager 创建虚拟机 配置虚拟机 飞牛官网下载 https://iso.liveupdate.fnnas.com/x86_64/trim/fnos-0.9.2-863.iso 群晖NAS如何在虚拟机创建飞牛NAS - 个人信息分享...