ZLMediaKit 源码分析——[5] ZLToolKit 中EventPoller之延时任务处理
系列文章目录
第一篇 基于SRS 的 WebRTC 环境搭建
第二篇 基于SRS 实现RTSP接入与WebRTC播放
第三篇 centos下基于ZLMediaKit 的WebRTC 环境搭建
第四篇 WebRTC学习一:获取音频和视频设备
第五篇 WebRTC学习二:WebRTC音视频数据采集
第六篇 WebRTC学习三:WebRTC音视频约束
第七篇 WebRTC学习四:WebRTC常规视觉滤镜
第八篇 WebRTC学习五:从视频中提取图片
第九篇 WebRTC学习六:MediaStream 常用API介绍
第十篇 WebRTC学习七:WebRTC 中 STUN 协议详解
ZLMediaKit源码分析——[1] 开篇:基础库 ZLToolKit 之 onceToken 源码分析
ZLMediaKit 源码分析——[2] 从 ZLToolKit 代码看 CPU 亲和性设计
ZLMediaKit 源码分析——[3] ZLToolKit 中EventPoller之网络事件处理
ZLMediaKit 源码分析——[4] ZLToolKit 中EventPoller之异步任务处理
ZLMediaKit 源码分析——[5] ZLToolKit 中EventPoller之延时任务处理
文章目录
- 系列文章目录
- 前言
- 一、整体设计思路
- 二、源码分析
- 2.1 延时任务处理函数
- 2.2 getMinDelay 函数
- 2.3 flushDelayTask 函数
- 三、延时任务触发用户接口函数doDelayTask
- 四、设计亮点与启发
- 4.1 并发性能优化
- 4.2 可重复任务处理
- 4.3 异步插入任务
- 五、实际应用场景
- 5.1 定时任务
- 5.2 超时检测
- 总结
前言
前面两篇文章中已经讲到了EventPoller中的网络事件处理机制和异步任务处理机制,有了前面两篇文章的IO框架和异步任务的基础,今天的延时任务的内容就比较容易理解了,今天我们就一鼓作气,把ZLMediaKit中延时任务的机制和实现细节分析一下,结束掉EventPoller的分析。
一、整体设计思路
EventPoller 中的延时任务处理主要基于一个有序的任务队列 _delay_task_map。这个队列以任务的执行时间戳作为键,以任务对象作为值,按照时间戳从小到大的顺序排列。通过这种方式,我们可以很方便地找到最近需要执行的任务。
在处理延时任务时,主要涉及三个关键函数:flushDelayTask、getMinDelay 和 doDelayTask以及数据结构std::multimap<uint64_t, DelayTask::Ptr> _delay_task_map,下面我们将分别对这三个函数和_delay_task_map进行详细分析。
二、源码分析
2.1 延时任务处理函数
void EventPoller::runLoop(bool blocked, bool ref_self) {if (blocked) {if (ref_self) {s_current_poller = shared_from_this();}_sem_run_started.post();_exit_flag = false;uint64_t minDelay;
#if defined(HAS_EPOLL)struct epoll_event events[EPOLL_SIZE];while (!_exit_flag) {minDelay = getMinDelay();startSleep();//用于统计当前线程负载情况int ret = epoll_wait(_event_fd, events, EPOLL_SIZE, minDelay ? minDelay : -1);sleepWakeUp();//用于统计当前线程负载情况if (ret <= 0) {//超时或被打断continue;}// ...其它处理代码}
#elif defined(HAS_KQUEUE)// ... 其他系统的处理代码
#else// ... 其他系统的处理代码
#endif //HAS_EPOLL} else {_loop_thread = new thread(&EventPoller::runLoop, this, true, ref_self);_sem_run_started.wait();}
}
与live555中在最后处理延时队列中的任务不一样,在ZLToolKit中会优先处理延时队列任务,在消息循环函数中会调用getMinDelay(),所有延时队列任务的处理将在这个函数里完成。
2.2 getMinDelay 函数
uint64_t EventPoller::getMinDelay() {// 查找最早延时任务auto it = _delay_task_map.begin();if (it == _delay_task_map.end()) {//没有剩余的定时器了return 0;}// 获取当前时间auto now = getCurrentMillisecond();// 如果最早任务的执行时间大于当前时间,说明所有任务都尚未到期,函数返回最早任务执行时间与当前时间的差值,即需要等待的时间。if (it->first > now) {//所有任务尚未到期return it->first - now; // 计算需要等待的时间}//执行已到期的任务并刷新休眠延时return flushDelayTask(now);
}
这里首先获取_delay_task_map 的首元素,_dealy_task_map定义是std::multimap<uint64_t, DelayTask::Ptr> _delay_task_map; std::multimap是C++标准库中的一个容器,它存储的元素都是键值对,并且允许有重复的键。这与std::map不同,后者不允许有重复的键。那么这个函数的意义就很明确了:
1、查找最早延时任务,没有返回0;
2、如果所有任务都尚未到期,函数返回需要等待的时间;
3、执行已到期的任务并刷新休眠延时,返回下一个未到期任务执行时所需的最小延时时间。
我们接下来看下执行已到期的任务并刷新休眠延时 flushDelayTask(now)函数里面做了些什么事情。
2.3 flushDelayTask 函数
uint64_t EventPoller::flushDelayTask(uint64_t now_time) {// 交换任务队列,将当前任务转移到临时容器,避免处理期间阻塞新任务写入。decltype(_delay_task_map) task_copy; // 自动推导_delay_task_map类型,task_copy初始化为空task_copy.swap(_delay_task_map); // 交换 task_copy 和 _delay_task_map 的内容。task_copy 持有原来的 _delay_task_map 数据。_delay_task_map 变为空容器。// 处理到期任务:执行所有时间戳 ≤ now_time 的任务,并根据返回值决定是否重新调度。for (auto it = task_copy.begin(); it != task_copy.end() && it->first <= now_time; it = task_copy.erase(it)) {//已到期的任务try {auto next_delay = (*(it->second))();if (next_delay) {//可重复任务,更新时间截止线_delay_task_map.emplace(next_delay + now_time, std::move(it->second));}} catch (std::exception &ex) {ErrorL << "Exception occurred when do delay task: " << ex.what();}}// 合并新任务和未处理任务,确保未到期任务和新添加的重复任务保留在队列中。task_copy.insert(_delay_task_map.begin(), _delay_task_map.end());task_copy.swap(_delay_task_map);auto it = _delay_task_map.begin();if (it == _delay_task_map.end()) {//没有剩余的定时器了return 0;}//最近一个定时器的执行延时return it->first - now_time;
}
EventPoller::flushDelayTask(uint64_t now_time) 函数的主要功能是处理所有已到期的延时任务,并根据任务的返回值决定是否重新调度这些任务,最后返回距离下一个未到期任务执行所需的最小延时时间。
三、延时任务触发用户接口函数doDelayTask
延时任务的外部调用接口为doDelayTask, 所有想要添加一个延时任务,需要通过调用doDelayTask添加,传入第一个参数为需要延时的毫秒值,第二个参数为task 任务,task任务返回值为0时代表不再重复任务,否则为下次执行延时,如果任务中抛异常,那么默认不重复任务。doDelayTask的返回值为DelayTask::Ptr,定义为 using DelayTask = TaskCancelableImp<uint64_t(void)>;是一个可取消的任务,意味着在还没到任务执行之前是可以取消的。
EventPoller::DelayTask::Ptr EventPoller::doDelayTask(uint64_t delay_ms, function<uint64_t()> task) {DelayTask::Ptr ret = std::make_shared<DelayTask>(std::move(task));auto time_line = getCurrentMillisecond() + delay_ms; // 当前时间+需要延时执行的时间async_first([time_line, ret, this]() {//异步执行的目的是刷新select或epoll的休眠时间_delay_task_map.emplace(time_line, ret);});return ret;
}
该函数的主要功能是创建一个延时任务,将其添加到延时任务映射_delay_task_map中,该任务将在指定的延时时间(delay_ms)后执行。
四、设计亮点与启发
4.1 并发性能优化
通过使用任务队列交换的方式,避免了在处理任务时阻塞新任务的添加,提高了程序的并发性能。这种设计思路在处理高并发场景下的任务调度非常有效。
4.2 可重复任务处理
支持可重复执行的任务,通过任务返回的延时时间来重新调度任务,增加了任务处理的灵活性。这种设计使得程序可以方便地实现定时任务的功能。
4.3 异步插入任务
使用异步方式插入任务,确保了系统能够及时响应新的延时任务。这在网络编程中尤为重要,因为网络事件的处理需要高实时性。
五、实际应用场景
5.1 定时任务
可以使用 doDelayTask 函数来实现定时任务,例如定时清理缓存、定时发送心跳包等。
如zlToolKit中定时器的实现
Timer::Timer(float second, const std::function<bool()> &cb, const EventPoller::Ptr &poller) {_poller = poller;if (!_poller) {_poller = EventPollerPool::Instance().getPoller();}_tag = _poller->doDelayTask((uint64_t) (second * 1000), [cb, second]() {try {if (cb()) {//重复的任务return (uint64_t) (1000 * second);}//该任务不再重复return (uint64_t) 0;} catch (std::exception &ex) {ErrorL << "Exception occurred when do timer task: " << ex.what();return (uint64_t) (1000 * second);}});
}
这里Timer构造函数通过事件轮询器构建了一个定时任务,该任务会在指定的时间间隔后执行一个回调函数。根据回调函数的返回值,定时器可以选择重复执行或停止执行。同时,代码中对回调函数的异常进行了捕获和处理,确保在出现异常时定时器仍然可以继续运行。
5.2 超时检测
在网络通信中,经常需要检测连接是否超时。可以通过设置延时任务来实现超时检测,当任务到期时,如果连接还没有收到响应,则认为连接超时。
比如MediaSource.cpp 的findAsync_l函数中代码:
auto on_timeout = poller->doDelayTask(maxWaitMS, [cb_once, listener_tag]() {// 最多等待一定时间,如在这个时间内,流还未注册上,则返回空NoticeCenter::Instance().delListener(listener_tag, Broadcast::kBroadcastMediaChanged);cb_once(nullptr);return 0;});
这里如果在最大等待时间里没有找到媒体源,将触发超时回调。
总结
ZLToolKit 中的 EventPoller 提供了一套高效、灵活的延时任务处理机制。通过有序的任务队列、任务队列交换、异步插入任务等技术手段,保证了程序的高并发性能和实时性。在实际应用中,我们可以根据具体需求灵活运用这些功能,实现各种复杂的定时任务和超时检测功能。
希望通过本文的分析,大家能够对 ZLMediaKit 中 EventPoller 的延时任务处理机制有更深入的理解,并在自己的项目中借鉴其设计思路。
相关文章:
ZLMediaKit 源码分析——[5] ZLToolKit 中EventPoller之延时任务处理
系列文章目录 第一篇 基于SRS 的 WebRTC 环境搭建 第二篇 基于SRS 实现RTSP接入与WebRTC播放 第三篇 centos下基于ZLMediaKit 的WebRTC 环境搭建 第四篇 WebRTC学习一:获取音频和视频设备 第五篇 WebRTC学习二:WebRTC音视频数据采集 第六篇 WebRTC学习三…...
元宇宙浪潮下,前端开发如何“乘风破浪”?
一、元宇宙对前端开发的新要求 元宇宙的兴起,为前端开发领域带来了全新的挑战与机遇。元宇宙作为一个高度集成、多维互动的虚拟世界,要求前端开发不仅具备传统网页开发的能力,还需要掌握虚拟现实(VR)、增强现实&#…...
2025年3月 Scratch 图形化(二级)真题解析 中国电子学会全国青少年软件编程等级考试
2025.03Scratch图形化编程等级考试二级真题试卷 一、选择题 第 1 题 甲、乙、丙、丁、戊五人参加100米跑比赛,甲说:“我的前面至少有两人,但我比丁快。”乙说:“我的前面是戊。”丙说:“我的后面还有两个人。”请从前往后(按照速度快慢&a…...
【新能源汽车整车动力学模型深度解析:面向MATLAB/Simulink仿真测试工程师的硬核指南】
1. 前言 作为MATLAB/Simulink仿真测试工程师,掌握新能源汽车整车动力学模型的构建方法和实现技巧至关重要。本文将提供一份6000+字的深度技术解析,涵盖从基础理论到Simulink实现的完整流程。内容经过算法优化设计,包含12个核心方程、6大模块实现和3种验证方法,满足SEO流量…...
MCP协议的Streamable HTTP:革新数据传输的未来
引言 在数字化时代,数据传输的效率和稳定性是推动技术进步的关键。MCP(Model Context Protocol)作为AI生态系统中的重要一环,通过引入Streamable HTTP传输机制,为数据交互带来了革命性的变化。本文将深入解读MCP协议的…...
dify中配置使用Ktransformer模型
一共是两个框架一个是Ktransformer,一个是dify。 Ktransformer用来部署LLM,比如Deepseek,而LLm的应用框架平台Dify主要用来快速搭建基于LLM应用。 这篇教程主要是用来介绍两个框架的交互与对接的,不是部署Ktransformer也部署部署Dify,要部署Dify、Ktransformer可以直接参考…...
从代码学习深度学习 - GRU PyTorch版
文章目录 前言一、GRU模型介绍1.1 GRU的核心机制1.2 GRU的优势1.3 PyTorch中的实现二、数据加载与预处理2.1 代码实现2.2 解析三、GRU模型定义3.1 代码实现3.2 实例化3.3 解析四、训练与预测4.1 代码实现(utils_for_train.py)4.2 在GRU.ipynb中的使用4.3 输出与可视化4.4 解析…...
二叉树 递归
本篇基于b站灵茶山艾府的课上例题与课后作业。 104. 二叉树的最大深度 给定一个二叉树 root ,返回其最大深度。 二叉树的 最大深度 是指从根节点到最远叶子节点的最长路径上的节点数。 示例 1: 输入:root [3,9,20,null,null,15,7] 输出&…...
#SVA语法滴水穿石# (002)关于 |-> + ##[min:max] 的联合理解
今天,我们着重理解一些概念。依靠死记硬背去理解知识点,是不长久的,必须深刻理解知识点的精髓,才能长久记忆。 先看如下的代码: property a2b_p; //描述属性@(posedge clk) $rose(tagError) |-> ##[2:4] $rose(tErrorBit); endproperty a2b_a: asser…...
反常积分和定积分的应用 2
世界尚有同类 前言伽马函数的推论关于数学的思考平面图形的面积笛卡尔心形线伯努利双纽线回顾参数方程求面积星型线摆线 旋转体体积一般轴线旋转被积函数有负数部分曲线的弧长最后一个部分内容-旋转曲面侧表面积直角坐标系极坐标系参数方程 总结 前言 力大出奇迹。好好加油。 …...
新零售系统是什么样的?有什么好处?
一、新零售系统的核心架构与特征 技术驱动的分层架构 **前端展示层:**支持多终端适配(如APP、小程序、线下智能设备),采用响应式设计提升用户体验。 **业务中台层:**基于微服务架构(如Spring Clou…...
Element-plus弹出框popover,使用自定义的图标选择组件
自定义的图标选择组件是若依的项目的 1. 若依的图标选择组件 js文件,引入所有的svg图片 let icons [] // 注意这里的路径,一定要是自己svg图片的路径 const modules import.meta.glob(./../../assets/icons/svg/*.svg); for (const path in modules)…...
16进制在蓝牙传输中的应用
在蓝牙传输中,16进制(Hexadecimal)是一种常用的数据表示方法。它主要用于描述数据包的内容、地址、命令、参数等信息。以下是16进制在蓝牙传输中的具体应用场景和作用: 1. 数据包的表示 蓝牙通信中,所有数据最终都以二…...
思维链 Chain-of-Thought(COT)
思维链 Chain-of-Thought(COT):思维链的启蒙 3. 思维链 Chain-of-Thought(COT)存在问题?2. 思维链 Chain-of-Thought(COT)是思路是什么?1. 什么是 思维链 Chain-of-Thoug…...
硬件电路(23)-输入隔离高低电平有效切换电路
一、概述 项目中为了防止信号干扰需要加一些隔离电路,而且有时传感器的信号是高有效有时是低有效,所以基于此背景,设计了一款方便实现高低电平有效检测切换电路。 二、应用电路...
多表查询的多与一
1.查寻表需要的条件 1.1.首先我们要了解查询表有哪些 1.1.1.多对一 多对一就是一个年表拥有例外一个表的多条数据 一个表对应立一个表的多条数据,另一个表对应这个表的多条数据 这个点被称为多对一 1.1.2.多对多 多对多简单来说就是需要一个中间商 中间商就…...
大模型学习二:DeepSeek R1+蒸馏模型组本地部署与调用
一、说明 DeepSeek R1蒸馏模型组是基于DeepSeek-R1模型体系,通过知识蒸馏技术优化形成的系列模型,旨在平衡性能与效率。 1、技术路径与核心能力 基础架构与训练方法 DeepSeek-R1-Zero:通过强化学习(RL)训练&…...
相机的曝光和增益
文章目录 曝光增益增益原理主要作用增益带来的影响增益设置与应用 曝光 参考:B站优致谱视觉 增益 相机增益是指相机在拍摄过程中对图像信号进行放大的一种操作,它在提高图像亮度和增强图像细节方面起着重要作用,以下从原理、作用、影响以…...
Linux内核物理内存组织结构
一、系统调用sys_mmap 系统调用mmap用来创建内存映射,把创建内存映射主要的工作委托给do_mmap函数,内核源码文件处理:mm/mmap.c 二、系统调用sys_munmap 1、vma find_vma (mm, start); // 根据起始地址找到要删除的第一个虚拟内存区域 vma 2…...
【PostgreSQL内核学习:深入理解 PostgreSQL 中的 tuplesort_performsort 函数】
深入理解 PostgreSQL 中的 tuplesort_performsort 函数 函数概述函数源码函数签名核心功能相关函数简介 代码结构与逻辑分析1. 内存上下文切换2. 调试跟踪(可选)3. 状态机逻辑(switch 分支)4. 调试跟踪(完成时…...
谷歌 Gemini 2.5 Pro 免费开放
2025 年 3 月 30 日,谷歌宣布将最新的 Gemini AI 旗舰模型 Gemini 2.5 Pro 免费向所有 Gemini 应用用户开放。以下是关于此次免费开放的一些具体信息1: 背景:此前,Gemini 2.5 Pro 仅向支付 19.99 美元月费的 Gemini Advanced 用户…...
(多看) CExercise_05_1函数_1.2计算base的exponent次幂
题目: 键盘录入两个整数:底(base)和幂指数(exponent),计算base的exponent次幂,并打印输出对应的结果。(注意底和幂指数都可能是负数) 提示:求幂运算时,基础的思路就是先无脑把指数转…...
leetcode刷题 - 数组理论基础
数组是内存空间连续存储、相同类型数据的集合。遍历方式:下标索引 下标:从 0 开始 数组的元素不能删除,只能覆盖 定义一维数组: int arr0[10]; int arr1[10] { 100, 90,80,70,60,50,40,30,20,10 }; int arr2[ ] { 100,90,80,7…...
Jetpack Compose `ACTION_HOVER_EXIT` 事件异常解决方案
Jetpack Compose 1.6.6 版本中 ACTION_HOVER_EXIT 事件异常解决方案 问题现象 在 Android 应用开发中使用 Jetpack Compose 1.6.6 版本时,部分设备会出现以下崩溃日志: java.lang.IllegalStateException: The ACTION_HOVER_EXIT event was not cleare…...
Vuue2 element-admin管理后台,Crud.js封装表格参数修改
需求 表格数据调用列表接口,需要多传一个 Type字段,而Type字段的值 需要从跳转页面Url上面获取到,并赋值给Type,再传入列表接口中,最后拿到表格数据并展示 遇到的问题 需求很简单,但是因为表格使用的是统…...
Tiktok矩阵运营中使用云手机的好处
Tiktok矩阵运营中使用云手机的好处 云手机在TikTok矩阵运营中能够大幅提高管理效率、降低封号风险,并节省成本,是非常实用的运营工具。TikTok矩阵运营使用云手机有很多优势,特别是对于需要批量管理账号、提高运营效率的团队来说。以下是几个…...
Linux下调试器gdb_cgdb使用
文章目录 一、样例代码二、使用watchset var确定问题原因条件断点 一、样例代码 #include <stdio.h>int Sum(int s, int e) {int result 0;int i;for(i s; i < e; i){result i;}return result; }int main() {int start 1;int end 100;printf("I will begin…...
Vite环境下解决跨域问题
在 Vite 开发环境中,可以通过配置代理来解决跨域问题。以下是具体步骤: 在项目根目录下找到 vite.config.js 文件:如果没有,则需要创建一个。配置代理:在 vite.config.js 文件中,使用 server.proxy 选项来…...
超简单:Linux下opencv-gpu配置
1.下载opencv和opencv_contrib安装包 1)使用命令下 git clone https://github.com/opencv/opencv.git -b 4.9.0 git clone https://github.com/opencv/opencv_contrib.git -b 4.9.02)复制链接去GitHub下载然后上传到服务器 注意:看好版本&a…...
【matplotlib参数调整】
1. 基本绘图函数常用参数 折线图 import matplotlib.pyplot as plt import numpy as npx np.linspace(0, 10, 100) y np.sin(x)plt.plot(x, y, colorred, linestyle--, linewidth2,markero, markersize5, labelsin(x), alpha0.8) plt.title(折线图示例) plt.xlabel(X 轴) p…...
