webrtc的线程模型
目录
线程的声明
线程创建过程
向线程中投递消息
从消息队列中取消息的具体实现
处理线程消息
webrtc线程模块的实现逻辑在 rtc_base\thread.h 文件中
比如想创建一个线程:
//声明要创建的线程指针,通过智能指针管理
std::unique_ptr<rtc::Thread> video_thread_;
// 创建线程
video_thread_ = rtc::Thread::Create();
//设置新创建的线程名
video_thread_->SetName("video_thread_", video_thread_.get());
//开启线程
video_thread_->Start();
//向线程投递要处理的消息video_thread_->Post(RTC_FROM_HERE, this, MESSAGE_ID);// MESSAGE_ID 自定义的消息id//向线程投入带有消息体的消息video_thread_->Post(RTC_FROM_HERE, this, VIDEO_INFO,new rtc::TypedMessageData<VIDEO_INFO_MEESAGE>(r));//其中RTC_FROM_HERE 是个宏定义,标记线程调用的原位置
// Define a macro to record the current source location.
#define RTC_FROM_HERE RTC_FROM_HERE_WITH_FUNCTION(__FUNCTION__)
下面看下线程的具体实现
线程的声明
//线程继承自一个任务队列,并且有两个存储消息的消息队列
//普通消息 messages_,延时消息 delayed_messages_
class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {explicit Thread(SocketServer* ss);explicit Thread(std::unique_ptr<SocketServer> ss);privateMessage msgPeek_;//声明对应的消息//MessageList 具体的定义://typedef std::list<Message> MessageList;MessageList messages_ RTC_GUARDED_BY(crit_); //延时队列继承自 std::priority_queue<DelayedMessage> PriorityQueue delayed_messages_ RTC_GUARDED_BY(crit_); uint32_t delayed_next_num_ RTC_GUARDED_BY(crit_);
}
创建线程的实现
//具体的创建函数
//构造中传入一个 NullSocketServer() 作为参数
std::unique_ptr<Thread> Thread::Create() {return std::unique_ptr<Thread>(new Thread(std::unique_ptr<SocketServer>(new NullSocketServer())));
}//最终调用到这里,线程构造函数
Thread::Thread(SocketServer* ss, bool do_init): fPeekKeep_(false),delayed_next_num_(0),fInitialized_(false),fDestroyed_(false),stop_(0),ss_(ss) {RTC_DCHECK(ss);//把当前线程的this指针传给 NullSocketServerss_->SetMessageQueue(this);
//设置线程的初始名字SetName("Thread", this); // default nameif (do_init) {DoInit();}
}void Thread::DoInit() {if (fInitialized_) {return;}fInitialized_ = true;//把当前线程的this指针对象传给ThreadManagerThreadManager::Add(this);
}
//ThreadManager会把当前线程,放到一个 message_queues_ 中统一管理
void ThreadManager::AddInternal(Thread* message_queue) {CritScope cs(&crit_);// Prevent changes while the list of message queues is processed.RTC_DCHECK_EQ(processing_, 0);message_queues_.push_back(message_queue);
}
引入了一个新的对象 ThreadManager
//ThreadManager是线程管理类,是一个单例,
//保存创建的所有线程对象
class RTC_EXPORT ThreadManager {// Singleton, constructor and destructor are private.static ThreadManager* Instance();//保存线程的消息队列,其实是个vector,不是queue。//很多服务都喜欢用vector代替queue,srs也是把vector当queue用// This list contains all live Threads.std::vector<Thread*> message_queues_ RTC_GUARDED_BY(crit_);}
//创建单例 ThreadManager,饿汉模式
ThreadManager* ThreadManager::Instance() {static ThreadManager* const thread_manager = new ThreadManager();return thread_manager;
}
//把线程指针加入到消息队列中
void ThreadManager::Add(Thread* message_queue) {return Instance()->AddInternal(message_queue);
}
void ThreadManager::AddInternal(Thread* message_queue) {CritScope cs(&crit_);// Prevent changes while the list of message queues is processed.RTC_DCHECK_EQ(processing_, 0);message_queues_.push_back(message_queue);
}
线程创建过程
线程的Start()函数才是真正创建线程的地方,只看android(即linux)端。
具体的实现是用的pthread,而没有用标准的std::thread
bool Thread::Start() {pthread_attr_t attr;pthread_attr_init(&attr);//创建线程调用的是pthread_create,//并传入线程函数 PreRunint error_code = pthread_create(&thread_, &attr, PreRun, this);if (0 != error_code) {RTC_LOG(LS_ERROR) << "Unable to create pthread, error " << error_code;thread_ = 0;return false;}
}void* Thread::PreRun(void* pv) {Thread* thread = static_cast<Thread*>(pv);ThreadManager::Instance()->SetCurrentThread(thread);rtc::SetCurrentThreadName(thread->name_.c_str());//调用一个Run()函数thread->Run();}void Thread::Run()
{
// Forever 模式,一直轮训处理ProcessMessages(kForever);
}
//真正处理消息的函数,下文会详细介绍
bool Thread::ProcessMessages(int cmsLoop)
{while (true) {Message msg;// Get()函数从消息队列中取消息 if (!Get(&msg, cmsNext))return !IsQuitting();//取出消息后调用Dispatch()进行处理Dispatch(&msg);if (cmsLoop != kForever){cmsNext = static_cast<int>(TimeUntil(msEnd));if (cmsNext < 0)return true;}} }
向线程中投递消息
// |time_sensitive| is deprecated and should always be false.virtual void Post(const Location& posted_from,//是从哪个函数向线程中投递消息MessageHandler* phandler,//消息处理的类,一般是向线程抛消息的类的this指针,当线程轮训到该消息时通过该this指针再回调对应的处理函数uint32_t id = 0,//消息idMessageData* pdata = nullptr, //消息体bool time_sensitive = false);//废弃的参数virtual void PostDelayed(const Location& posted_from, //支持向线程抛入延迟消息int delay_ms,MessageHandler* phandler,uint32_t id = 0,MessageData* pdata = nullptr);virtual void PostAt(const Location& posted_from, int64_t run_at_ms,MessageHandler* phandler,uint32_t id = 0,MessageData* pdata = nullptr);// 看下Post的具体实现void Thread::Post(const Location& posted_from,MessageHandler* phandler,uint32_t id,MessageData* pdata,bool time_sensitive) {RTC_DCHECK(!time_sensitive);if (IsQuitting()) {delete pdata;return;}// Keep thread safe// Add the message to the end of the queue// Signal for the multiplexer to return{//注意这个大括号哈//数据进队列加锁,内部用的 pthread_mutex_lock(mutex_)//CritScope对 mutex_进行了封装,构造函数加锁、析构函数解锁CritScope cs(&crit_);Message msg;//构造消息体msg.posted_from = posted_from;msg.phandler = phandler;msg.message_id = id;msg.pdata = pdata;messages_.push_back(msg);}//CritScope退出作用区域后,调用对应的析构函数解锁//即pthread_mutex_unlock(&mutex_);函数//这种实现方式一方面缩小了锁的范围,锁的范围仅仅局限于大括号内部,而不是整个Post()函数//同时,退出临界区后自动调用析构函数释放锁,也避免了死锁的可能性//这个WakeUp* 函数是重点,它会唤醒当前等待的线程WakeUpSocketServer();
}
//看一下 WakeUpSocketServer()的实现
//最终是通过 pthread_cond_broadcast()
//唤醒当前所有处于pthread_cond_wait()的线程void Thread::WakeUpSocketServer() {ss_->WakeUp();
}
void NullSocketServer::WakeUp() {event_.Set();
} void Event::Set() {pthread_mutex_lock(&event_mutex_);event_status_ = true;//广播唤醒所有处于 pthread_cond_wait()的线程pthread_cond_broadcast(&event_cond_);pthread_mutex_unlock(&event_mutex_);
}
从消息队列中取消息的具体实现
//消息处理是从 Thread 的Run()函数开始
void Thread::Run() {// KForever字段,一直轮训取数据,//没有数据时会 wait() 阻塞等待ProcessMessages(kForever);
}
bool Thread::ProcessMessages(int cmsLoop) {int cmsNext = cmsLoop;while (true) {Message msg;//从消息队列中取消,//取出来后交给 Dispatch()进行处理if (!Get(&msg, cmsNext))return !IsQuitting();Dispatch(&msg);if (cmsLoop != kForever) {cmsNext = static_cast<int>(TimeUntil(msEnd));if (cmsNext < 0)return true;}}
}
//取消息的过程
bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) {// Return and clear peek if present// Always return the peek if it exists so there is Peek/Get symmetryif (fPeekKeep_) {*pmsg = msgPeek_;fPeekKeep_ = false;return true;}// Get w/wait + timer scan / dispatch + socket / event multiplexer dispatchint64_t cmsTotal = cmsWait;int64_t cmsElapsed = 0;int64_t msStart = TimeMillis();int64_t msCurrent = msStart;while (true) {// Check for posted eventsint64_t cmsDelayNext = kForever; //一直训练bool first_pass = true;//具体实现是两层while(true)。内部的while负责取消息,//取不到时,外部while负责wait()阻塞等待while (true) {// All queue operations need to be locked, but nothing else in this loop// (specifically handling disposed message) can happen inside the crit.// Otherwise, disposed MessageHandlers will cause deadlocks.{//和像线程中投递消息类似,取消息时也先加锁CritScope cs(&crit_);// On the first pass, check for delayed messages that have been// triggered and calculate the next trigger time.if (first_pass) {//线程被唤醒后,只从延时队列中取一次//并且这一次会把所有到时需要处理的延时消息取完,//取出的延时消息放到messages_队列和普通消息一样进行处理first_pass = false;while (!delayed_messages_.empty()) {//当前时间,小于延时队列中第一条消息时间,//说明还没有到需要处理延时消息的时间,if (msCurrent < delayed_messages_.top().run_time_ms_) {//cmsDelayNext计算出需要等待的时间,//也是后面线程wait()时需要等待的最大时间,//因为到了这个时间,即便没有普通消息到来//延时队列中的消息也到时间需要处理了cmsDelayNext =TimeDiff(delayed_messages_.top().run_time_ms_, msCurrent);break;}//把到时需要处理的延时消息,放到普通队列中一起处理messages_.push_back(delayed_messages_.top().msg_);//延时消息出队列delayed_messages_.pop();}}// Pull a message off the message queue, if available.if (messages_.empty()) {break;} else {//真正获得需要处理消息的地方*pmsg = messages_.front();messages_.pop_front();}} // crit_ is released here.// If this was a dispose message, delete it and skip it.//如果是dispose废除的消息就会删除,//然后 continue()继续去取if (MQID_DISPOSE == pmsg->message_id) {RTC_DCHECK(nullptr == pmsg->phandler);delete pmsg->pdata;*pmsg = Message();continue;}//如果是需要处理的消息就return退出当前 Get()函数,//进行后面的Disptch()处理return true;}if (IsQuitting())break;// Which is shorter, the delay wait or the asked wait?int64_t cmsNext;if (cmsWait == kForever) {cmsNext = cmsDelayNext;} else {cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))cmsNext = cmsDelayNext;}// 如果延时消息队列和普通的消息队列中都没有消息,//内部while(true)会调用 break退出//然后就调用到这里,因为我们是 KForever一直轮训模式,//所以当队列中没有消息时,防止一直遍历查询,//会通过wait()挂起当前线程让出时间片{// Wait and multiplex in the meantime//内部调用的是 pthread_cond_wait,//并且在wait()时也加了锁if (!ss_->Wait(static_cast<int>(cmsNext), process_io))return false;}// If the specified timeout expired, returnmsCurrent = TimeMillis();cmsElapsed = TimeDiff(msCurrent, msStart);if (cmsWait != kForever) {if (cmsElapsed >= cmsWait)return false;}}return false;
}
处理线程消息
从消息队列中Get()获取消息后,会调用 Dispatch()处理消息。具体实现就是回调向线程中抛消息的类的OnMessage(pmg)函数,然后进行具体消息的处理:
void Thread::Dispatch(Message* pmsg) {TRACE_EVENT2("webrtc", "Thread::Dispatch", "src_file",pmsg->posted_from.file_name(), "src_func",pmsg->posted_from.function_name());int64_t start_time = TimeMillis();//回调对应OnMessage(pmsg)函数进行消息处理pmsg->phandler->OnMessage(pmsg);int64_t end_time = TimeMillis();int64_t diff = TimeDiff(end_time, start_time);if (diff >= kSlowDispatchLoggingThreshold) {RTC_LOG(LS_INFO) << "Message took " << diff<< "ms to dispatch. Posted from: "<< pmsg->posted_from.ToString();}
}
而我们的处理类继承 rtc::MessageHandler,并实现了 OnMessage()函数,就可以基于对应的MessageID类型,处理不同的消息了
CVideoThread::OnMessage(rtc::Message* msg) {switch case:Message_id:handlerMessage();case VIDEO_INFO: //假如向线程中传入了 MessageData,//在线程回调时会把这个消息体带出来方便我们处理if(msg->pdata){rtc::TypedMessageData<VideoMessageData>* data = static_cast<rtc::TypedMessageData<VideoMessageData>*>(msg->pdata);string message = data->data();delete data;data = nullptr;}default:break;
}
以上就是webrtc的线程模块了,下一篇会介绍webrtc的 TaskQueue 任务队列
相关文章:
webrtc的线程模型
目录 线程的声明 线程创建过程 向线程中投递消息 从消息队列中取消息的具体实现 处理线程消息 webrtc线程模块的实现逻辑在 rtc_base\thread.h 文件中 比如想创建一个线程: //声明要创建的线程指针,通过智能指针管理 std::unique_ptr<rtc::Thr…...
数据库备份还原-mysqldump、mydumper、xtrabackup、压缩
目录 数据库备份,数据库为school,素材如下 一、创建student和score表 二、为student表和score表增加记录 三、练习题 数据库备份,数据库为school,素材如下 一、创建student和score表 CREATE TABLE student ( id INT(10) NOT…...
【黑马程序员前端】JavaScript入门到精通--20230801
B站链接 理论 HTML相关知识【黑马程序员前端】 https://blog.csdn.net/m0_48964052/article/details/125951658 CSS相关知识【黑马程序员前端】 https://blog.csdn.net/m0_48964052/article/details/125951788 黑马程序员——JavaScript基础1(初识 JavaS…...
100道Java多线程面试题(上)
线程创建方式? 线程有哪些基本状态? 如何停止一个正在运行的线程? 有三个线程T1,T2,T3,如何保证顺序执行? 在线程中你怎么处理不可控制异常? 如何创建线程池? 以下情况如何使用线程池?高并发、任务时间短;…...
web开发中的安全和防御入门——csp (content-security-policy内容安全策略)
偶然碰到iframe跨域加载被拒绝的问题,原因是父页面默认不允许加载跨域的子页面,也就是的content-security-policy中没有设置允许跨域加载。 简单地说,content-security-policy能限制页面允许和不允许加载的所有资源,常见的包括&a…...
定了!全国2023下半年软考(高级、中级、初级)报名时间汇总
截止到2023年8月2日,有以下地区公布了软考报名时间: 安徽软考2023下半年报名时间:8月15日9:00至8月21日16:00 黑龙江软考2023下半年报名时间:8月16日至8月22日 甘肃软考2023下半年报名时间:8月28日9:00至9月6日18:00…...
Linux下安装配置Redis
文章目录 安装依赖库上传安装包并解压 启动默认启动指定配置启动开机自启 安装 依赖库 Redis是基于C语言编写的,因此首先需要安装Redis所需要的gcc依赖: yum install -y gcc tcl上传安装包并解压 将Redis安装包上传到服务器的任意目录,例…...
深度学习(33)——CycleGAN(2)
深度学习(33)——CycleGAN(2) 完整项目在在这里:欢迎造访 文章目录 深度学习(33)——CycleGAN(2)1. Generator2. Discriminator3. fake pool4. loss定义5. 模型参数量6…...
WeakMap and WeakSet(弱映射和弱集合)
在垃圾回收中了解JavaScript 引擎在值“可达”和可能被使用时会将其保持在内存中 let john { name: "John" }; // 该对象能被访问,john 是它的引用 // 覆盖引用 john null; // 该对象将会被从内存中清除通常,当对象、数组之类的数据结构在内…...
【Vue3基础】组件保持存活、异步加载组件
一、组件保持存活 1、需求描述 点击按钮跳转到其他组件后,原组件不会被销毁 2、知识整理 1)组件生命周期 创建期:beforeCreate、created 挂载期:beforeMount、mounted 更新期:beforeUpdate、updated 销毁期&am…...
在 3ds Max 中使用相机映射将静止图像转换为实时素材
推荐: NSDT场景编辑器 助你快速搭建可二次开发的3D应用场景 1. 在 Photoshop 中准备图像 步骤 1 这是我将在教程中使用的静止图像。 这是我的静态相机纸箱的快照。 静止图像 步骤 2 打开 Photoshop。将图像导入 Photoshop。 打开 Photoshop 步骤 3 单击套索工…...
如何使用GIL解决Python多线程性能瓶颈
如何使用GIL解决Python多线程性能瓶颈 引言: Python是一种使用广泛的编程语言,但其在多线程方面存在一个性能瓶颈,即全局解释器锁(Global Interpreter Lock,简称GIL)。GIL会限制Python的多线程并行能力&am…...
k8s概念-深入pod
回到目录 工作负载(workloads) 工作负载(workload)是在kubernetes集群中运行的应用程序。无论你的工作负载是单一服务还是多个一同工作的服务构成,在kubernetes中都可以使用pod来运行它 workloads分为pod与control…...
Web服务器实验案例
目录 关闭或放行防火墙和selinux 1 搭建静态网站 2 建立两个基于ip地址访问的网站 思路: 简单配置 编写httpd额外文件 3 建立两个基于不同端口访问的网站 思路 创建文件(与之前一致) 额外文件配置 4 基于虚拟目录和用户控制的web网…...
预警 项目经验BUG
文章目录 定时任务 定时任务 在方法上使用Scheduled注解 cron参数: cron是一个表达式,最多接收7个参数从左到右分别表示:秒 分 时 天 月 周 年参数以空格隔开,其中年不是必须参数,可以省略。示例:Schedule…...
基于RFID技术的猪舍门读卡器
随着科技的发展和普及,智能化管理在各个领域的应用越来越广泛。在畜牧业中,将RFID技术应用在养殖管理中的企业也越来越多,为养殖企业的智能化管理提供了有力的支持,本文将介绍RFID技术的猪舍门读卡器的应用。 一、RFID技术简介 …...
亚马逊店铺的回款周期是多久?
现如今,开亚马逊店铺可是一个技术活,一旦有一个环节,或者是一件事情没有做好,对整个亚马逊店铺过程中影响都是十分巨大的,不少亚马逊卖家就吃过这方面的亏。 很多亚马逊卖家就是吃亏在这些方面,现在要想开…...
剑指offer19.正则表达式
这道题我一看就有印象,我室友算法课设抽到这题,他当时有个bug让我帮他看一下,然后我就大概看了一下他的算法,他是用动态规划写的,用了一个二维数组,然后我就试着按照这个思路去写,想了一会还是没…...
Mac Navicat 16试用脚本
一、无限试用脚本如下 #!/bin/bash #/usr/libexec/PlistBuddy -c "print" ~/Library/Preferences/com.navicat.NavicatPremium.plist /usr/libexec/PlistBuddy -c "Delete :91F6C435D172C8163E0689D3DAD3F3E9" ~/Library/Preferences/com.navicat.Navica…...
什么是 webpack?
Webpack 介绍 什么是 webpack? :::tip 官方描述 webpack 是一个用于现代 JavaScript 应用程序的静态模块打包工具。当 webpack 处理应用程序时,它会在内部从一个或多个入口点构建一个 依赖图(dependency graph),然后将你项目中所需的每一个…...
LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明
LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造,完美适配AGV和无人叉车。同时,集成以太网与语音合成技术,为各类高级系统(如MES、调度系统、库位管理、立库等)提供高效便捷的语音交互体验。 L…...
多模态2025:技术路线“神仙打架”,视频生成冲上云霄
文|魏琳华 编|王一粟 一场大会,聚集了中国多模态大模型的“半壁江山”。 智源大会2025为期两天的论坛中,汇集了学界、创业公司和大厂等三方的热门选手,关于多模态的集中讨论达到了前所未有的热度。其中,…...
三维GIS开发cesium智慧地铁教程(5)Cesium相机控制
一、环境搭建 <script src"../cesium1.99/Build/Cesium/Cesium.js"></script> <link rel"stylesheet" href"../cesium1.99/Build/Cesium/Widgets/widgets.css"> 关键配置点: 路径验证:确保相对路径.…...
系统设计 --- MongoDB亿级数据查询优化策略
系统设计 --- MongoDB亿级数据查询分表策略 背景Solution --- 分表 背景 使用audit log实现Audi Trail功能 Audit Trail范围: 六个月数据量: 每秒5-7条audi log,共计7千万 – 1亿条数据需要实现全文检索按照时间倒序因为license问题,不能使用ELK只能使用…...
CocosCreator 之 JavaScript/TypeScript和Java的相互交互
引擎版本: 3.8.1 语言: JavaScript/TypeScript、C、Java 环境:Window 参考:Java原生反射机制 您好,我是鹤九日! 回顾 在上篇文章中:CocosCreator Android项目接入UnityAds 广告SDK。 我们简单讲…...
Psychopy音频的使用
Psychopy音频的使用 本文主要解决以下问题: 指定音频引擎与设备;播放音频文件 本文所使用的环境: Python3.10 numpy2.2.6 psychopy2025.1.1 psychtoolbox3.0.19.14 一、音频配置 Psychopy文档链接为Sound - for audio playback — Psy…...
Module Federation 和 Native Federation 的比较
前言 Module Federation 是 Webpack 5 引入的微前端架构方案,允许不同独立构建的应用在运行时动态共享模块。 Native Federation 是 Angular 官方基于 Module Federation 理念实现的专为 Angular 优化的微前端方案。 概念解析 Module Federation (模块联邦) Modul…...
QT: `long long` 类型转换为 `QString` 2025.6.5
在 Qt 中,将 long long 类型转换为 QString 可以通过以下两种常用方法实现: 方法 1:使用 QString::number() 直接调用 QString 的静态方法 number(),将数值转换为字符串: long long value 1234567890123456789LL; …...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...
Go 并发编程基础:通道(Channel)的使用
在 Go 中,Channel 是 Goroutine 之间通信的核心机制。它提供了一个线程安全的通信方式,用于在多个 Goroutine 之间传递数据,从而实现高效的并发编程。 本章将介绍 Channel 的基本概念、用法、缓冲、关闭机制以及 select 的使用。 一、Channel…...
