当前位置: 首页 > news >正文

(aiohttp-asyncio-FFmpeg-Docker-SRS)实现异步摄像头转码服务器

1. 背景介绍

在先前的博客文章中,我们已经搭建了一个基于SRS的流媒体服务器。现在,我们希望通过Web接口来控制这个服务器的行为,特别是对于正在进行的 RTSP 转码任务的管理。这将使我们能够在不停止整个服务器的情况下,动态地启动或停止摄像头的转码过程。

Docker部署 SRS rtmp/flv流媒体服务器-CSDN博客文章浏览阅读360次,点赞7次,收藏5次。SRS(Simple Realtime Server)是一款开源的流媒体服务器,具有高性能、高可靠性、高灵活性的特点,能够支持直播、点播、转码等多种流媒体应用场景。SRS 不仅提供了流媒体服务器,还提供了适用于多种平台的客户端 SDK 和在线转码等辅助服务,是一款十分强大的流媒体解决方案。https://blog.csdn.net/m0_56659620/article/details/135400510?spm=1001.2014.3001.5501

2. 技术选择

在选择技术方案时,考虑到构建视频流转码服务的需求,我们将采用Python编程语言,并结合asyncio和aiohttp库。这一选择基于异步框架的优势,以下是对异步框架和同步框架在视频流转码场景中的优缺点的明确总结:

异步框架的优势:

  • 高并发处理: 异步框架通过非阻塞方式处理请求,能够高效处理大量并发请求,确保系统在高负载下保持稳定性。
  • 异步I/O: 支持异步I/O操作,允许在等待I/O操作完成的同时继续处理其他请求,提高整体效率。
  • 资源利用率高: 能够更有效地利用系统资源,同时处理多个请求,提高视频转码效率。
  • 事件驱动: 采用事件驱动模型,适应实时性要求高的视频流处理,能够立即响应新的转码请求。

同步框架的缺点:

  • 阻塞: 阻塞调用可能导致整个程序停滞,尤其在处理大文件或网络请求时可能引发性能问题,特别是在高并发场景下。
  • 低并发: 每个请求需要独立的线程或进程,可能导致系统资源耗尽,降低并发处理能力,对于需要同时处理多个视频流的情况可能不够高效。

考虑到处理大量并发请求、提高系统性能和响应性的需求,采用异步框架是更为合适的选择。异步框架的高并发处理能力、异步I/O支持、高资源利用率以及事件驱动的特性使其更适用于实时性要求较高的视频流转码服务。

3. 代码实现(必须在linux系统运行,4步骤为部署攻略)

3.1 导入必要的库

首先,我们导入所需的库,包括asyncio、aiohttp、aiohttp_cors和logging。

import asyncio
from aiohttp import web
import aiohttp_cors
import logging

3.2 设置日志

logging.basicConfig(level=logging.INFO)

3.3 配置并发控制和任务跟踪

设置最大同时运行的ffmpeg子进程数量,并使用Semaphore限制并发进程数量。同时,使用字典跟踪正在进行的转码任务。

MAX_CONCURRENT_PROCESSES = 5
semaphore = asyncio.Semaphore(MAX_CONCURRENT_PROCESSES)
transcoding_tasks = {}

3.4 定义启动和停止转码任务的方法

定义启动和停止 RTSP 转码任务的方法

# 开始转码方法
async def perform_transcoding(ip, camera_id, rtmp_server):# 检查相同RTSP是否已有子进程在处理if camera_id in transcoding_tasks:return transcoding_tasks[camera_id]# 使用Semaphore限制并发进程数量async with semaphore:# 实际的转码操作,这里需要调用ffmpeg或其他工具ffmpeg_command = ['ffmpeg','-rtsp_transport', 'tcp','-i', ip,'-c:v', 'libx264','-c:a', 'aac','-f', 'flv',f'{rtmp_server}/live/livestream{camera_id}']# 创建异步子进程process = await asyncio.create_subprocess_exec(*ffmpeg_command)# 将任务添加到字典中transcoding_tasks[camera_id] = process# 等待子进程完成await process.communicate()# 从字典中移除已完成的任务transcoding_tasks.pop(camera_id, None)# 停止转码方法
async def stop_transcoding(camera_id):# 停止转码任务if camera_id in transcoding_tasks:process = transcoding_tasks[camera_id]process.terminate()  # 发送终止信号await process.wait()  # 等待进程结束# 从字典中移除已停止的任务transcoding_tasks.pop(camera_id, None)

3.5 定义Web接口路由

定义Web接口路由,包括启动摄像头转码、停止摄像头转码和停止所有摄像头转码的路由。

# 开始转码任务
async def play_camera(request):data = await request.post()# 从表单数据中获取摄像头的ID和rtsp流camera_id = data.get('id')rtsp = data.get('ip')# 这里设置你的 RTMP 服务器地址rtmp_server = 'rtmp://192.168.14.93:1935'# 执行实际的转码操作task = await perform_transcoding(rtsp, camera_id, rtmp_server)# 返回包含转码后的RTMP URL的JSON响应rtmp_url = f'http://192.168.14.93:8080/live/livestream{camera_id}.flv'return web.json_response({'message': '转码启动成功', 'flv_data': rtmp_url})# 停止转码任务
async def stop_camera(request):data = await request.post()camera_id = data.get('id')# 停止指定摄像头的转码任务await stop_transcoding(camera_id)return web.json_response({'code':200,'message': '转码停止成功'})# 如果页面进行刷新或者关闭停止全部转码任务
async def stop_all_camera(request):# 获取所有正在运行的任务的列表tasks = [stop_transcoding(camera_id) for camera_id in transcoding_tasks.keys()]# 并发停止所有任务await asyncio.gather(*tasks)# 清空字典,表示所有任务都已停止transcoding_tasks.clear()return web.json_response({'code':200,'message': '转码停止成功'})

3.6 创建Web应用和配置CORS

创建Web应用,配置CORS(跨域资源共享)中间件,以确保接口可以被跨域访问。

app = web.Application()# CORS配置
cors = aiohttp_cors.setup(app, defaults={"*": aiohttp_cors.ResourceOptions(allow_credentials=True,expose_headers="*",allow_headers="*",)
})

3.7 添加Web接口路由

添加Web接口路由,包括启动摄像头转码、停止摄像头转码和停止所有摄像头转码的路由。

app.router.add_route('POST', '/play_camera', play_camera)  # 开始转码任务路由
app.router.add_route('POST', '/stop_camera', stop_camera)  # 停止转码任务路由
app.router.add_route('POST', '/stop_all_camera', stop_all_camera)  # 停止全部转码任务路由

3.8 添加CORS中间件

添加CORS中间件,确保接口可以被跨域访问。

# 添加 CORS 中间件
for route in list(app.router.routes()):cors.add(route)

3.9 运行Web应用

运行Web应用,监听指定的主机和端口。

if __name__ == '__main__':web.run_app(app, host='0.0.0.0', port=7000,access_log=logging.getLogger())

 3.10 完整代码

import asyncio
from aiohttp import web
import aiohttp_cors
import logging# 设置日志级别
logging.basicConfig(level=logging.INFO)# 最大同时运行的ffmpeg子进程数量
MAX_CONCURRENT_PROCESSES = 5# 使用Semaphore限制并发进程数量
semaphore = asyncio.Semaphore(MAX_CONCURRENT_PROCESSES)# 字典用于跟踪正在进行的转码任务
transcoding_tasks = {}# 开始转码方法
async def perform_transcoding(ip, camera_id, rtmp_server):# 检查相同RTSP是否已有子进程在处理if camera_id in transcoding_tasks:return transcoding_tasks[camera_id]# 使用Semaphore限制并发进程数量async with semaphore:# 实际的转码操作,这里需要调用ffmpeg或其他工具ffmpeg_command = ['ffmpeg','-rtsp_transport', 'tcp','-i', ip,'-c:v', 'libx264','-c:a', 'aac','-f', 'flv',f'{rtmp_server}/live/livestream{camera_id}']# 创建异步子进程process = await asyncio.create_subprocess_exec(*ffmpeg_command)# 将任务添加到字典中transcoding_tasks[camera_id] = process# 等待子进程完成await process.communicate()# 从字典中移除已完成的任务transcoding_tasks.pop(camera_id, None)# 停止转码方法
async def stop_transcoding(camera_id):# 停止转码任务if camera_id in transcoding_tasks:process = transcoding_tasks[camera_id]process.terminate()  # 发送终止信号await process.wait()  # 等待进程结束# 从字典中移除已停止的任务transcoding_tasks.pop(camera_id, None)# 开始转码任务
async def play_camera(request):data = await request.post()# 从表单数据中获取摄像头的ID和rtsp流camera_id = data.get('id')rtsp = data.get('ip')# 这里设置你的 RTMP 服务器地址rtmp_server = 'rtmp://192.168.14.93:1935'# 执行实际的转码操作task = await perform_transcoding(rtsp, camera_id, rtmp_server)# 返回包含转码后的RTMP URL的JSON响应rtmp_url = f'http://192.168.14.93:8080/live/livestream{camera_id}.flv'return web.json_response({'message': '转码启动成功', 'flv_data': rtmp_url})# 停止转码任务
async def stop_camera(request):data = await request.post()camera_id = data.get('id')# 停止指定摄像头的转码任务await stop_transcoding(camera_id)return web.json_response({'code':200,'message': '转码停止成功'})# 如果页面进行刷新或者关闭停止全部转码任务
async def stop_all_camera(request):# 获取所有正在运行的任务的列表tasks = [stop_transcoding(camera_id) for camera_id in transcoding_tasks.keys()]# 并发停止所有任务await asyncio.gather(*tasks)# 清空字典,表示所有任务都已停止transcoding_tasks.clear()return web.json_response({'code':200,'message': '转码停止成功'})app = web.Application()# CORS配置
cors = aiohttp_cors.setup(app, defaults={"*": aiohttp_cors.ResourceOptions(allow_credentials=True,expose_headers="*",allow_headers="*",)
})app.router.add_route('POST', '/play_camera', play_camera)  # 开始转码任务路由
app.router.add_route('POST', '/stop_camera', stop_camera)  # 停止转码任务路由
app.router.add_route('POST', '/stop_all_camera', stop_all_camera)  # 停止全部转码任务路由# 添加 CORS 中间件
for route in list(app.router.routes()):cors.add(route)if __name__ == '__main__':web.run_app(app, host='0.0.0.0', port=7000,access_log=logging.getLogger())

4. 部署(Docker环境)

部署所需Dockerfile文件代码如下

FROM python:3.7-slimWORKDIR /appCOPY requirements.txt .RUN apt-get update \&& apt-get install -y ffmpeg \&& rm -rf /var/lib/apt/lists/* \&& pip install --no-cache-dir -r requirements.txtCOPY . .CMD ["python", "async_io_io.py"]

部署所需requirements.txt如下

aiohttp
aiohttp-cors
ffmpeg

根目录进行打包及启动

请求接口实现转码

5. 总结

通过以上的步骤,我们成功构建了一个流媒体服务器控制接口,可以通过Web接口实现对摄像头的 RTSP 转码任务的动态管理。这个接口可以集成到现有的流媒体服务器中,提供更多控制和管理的可能性。

相关文章:

(aiohttp-asyncio-FFmpeg-Docker-SRS)实现异步摄像头转码服务器

1. 背景介绍 在先前的博客文章中,我们已经搭建了一个基于SRS的流媒体服务器。现在,我们希望通过Web接口来控制这个服务器的行为,特别是对于正在进行的 RTSP 转码任务的管理。这将使我们能够在不停止整个服务器的情况下,动态地启动…...

基于STM32微控制器的四轮智能小车控制系统设计

标题:基于STM32微控制器的四轮智能小车控制系统设计与实现 摘要: 本文针对移动机器人领域的应用需求,详细介绍了基于STM32系列单片机(以STM32F103C8T6为例)为核心的四轮小车控制系统的设计和实现过程。该系统集成了电…...

JPA的复杂查询包括一对多多对一和多对多的查询

1. 多表关联查询和排序 假设我们有两个实体类:Customer和Order,它们之间是一对多的关系,即一个客户可以有多个订单。我们想要查询某个客户的所有订单,并按订单金额进行降序排序。 Entity Table(name "customers") pu…...

电脑文件mfc100u.dll丢失的解决方法分析,怎么修复mfc100u.dll靠谱

mfc100u.dll丢失了要怎么办?其实很多人都遇到过这样的电脑故障吧,说这个mfc100u.dll文件已经不见了,然后一些程序打不开了,那么这种情况我们要怎么解决呢?今天我们就来给大家详细的说说mfc100u.dll丢失的解决方法。 一…...

从DETR到Mask2former(2): 损失函数loss function

DETR的损失函数包括几个部分,如果只看论文或者代码,比较难理解,最好是可以打断点调试,对照着论文看。但是现在DETR模型都已经被集成进各种框架中,很难进入内部打断掉调试。与此同时,数据的label的前处理也比…...

Java21 + SpringBoot3集成WebSocket

文章目录 前言相关技术简介什么是WebSocketWebSocket的原理WebSocket与HTTP协议的关系WebSocket优点WebSocket应用场景 实现方式1. 添加maven依赖2. 添加WebSocket配置类,定义ServerEndpointExporter Bean3. 定义WebSocket Endpoint4. 前端创建WebSocket对象 总结 前…...

鲸鱼优化算法WOA改进预告

鲸鱼优化算法(Whale Optimization Algorithm,WOA)是一种基于自然界中鲸鱼群体行为的启发式优化算法。这个算法模拟了鲸鱼的觅食行为和社会行为,通过模拟这些行为来解决优化问题。 以下是鲸鱼优化算法的一些关键特点和步骤&#x…...

Nightingale 夜莺监控系统 - 告警篇(3)

Author:rab 官方文档:https://flashcat.cloud/docs/content/flashcat-monitor/nightingale-v6/usage/alert/alert-rule/ 目录 前言一、配置1.1 创建钉钉机器人1.2 n9e 创建通知用户1.3 n9e 创建团队(组)1.4 将通知用户添加团队1.…...

【LeetCode2696】删除子串后的字符串最小长度

1、题目描述 【题目链接】 标签:栈 、字符串、模拟 难度:简单 给你一个仅由 大写 英文字符组成的字符串 s 。 你可以对此字符串执行一些操作,在每一步操作中,你可以从 s 中删除 任一个 “AB” 或 “CD” 子字符串。 通过执行操作…...

VMware安装CentOS7虚拟机

VMware 安装 获取 VMware 安装包 下载地址:链接:https://pan.baidu.com/s/1ELR5NZa7rO6YVplZ1IUigw?pwdplz3 提取码:plz3 包括:当然,也可以自己去别的地方下载,WMware 版本都差不多,现在用的比…...

Linux第22步_安装CH340驱动和串口终端软件MobaXterm

开发板输出信息通常是采用串口,而计算机通常是USB接口,为了让他们之间能够交换数据,我们通常采用USB转串口的转换器来实现。目前市场上的串口转换器大多是采用CH340芯片来实现的,因此我们需要在计算中安装一个CH340驱动程序&#…...

Elasticsearch 地理空间搜索 - 远超 OpenSearch

作者:来自 Elastic Nathan_Reese 2021 年,OpenSearch 和 OpenSearch Dashboards 开始作为 Elasticsearch 和 Kibana 的分支。 尽管 OpenSearch 和 OpenSearch Dashboards 具有相似的血统,但它们不提供相同的功能。 在分叉时,只能克…...

USB micro输入口中三个问题详解——差分信号、自恢复保险丝SMD1210P050TF、电容滤波

前言:本文对USB micro输入口中遇见的三个问题进行详解:差分信号、自恢复保险丝SMD1210P050TF、电容滤波 目录: 差分信号 自恢复保险丝SMD1210P050TF 电容滤波 如下图,USB为U-F-M5DD-Y-1型号(9个引脚,除…...

mysql原理--undo日志1

1.事务回滚的需求 我们说过 事务 需要保证 原子性 ,也就是事务中的操作要么全部完成,要么什么也不做。但是偏偏有时候事务执行到一半会出现一些情况,比如: (1). 事务执行过程中可能遇到各种错误,比如服务器本身的错误&…...

Zookeeper系列(一)集群搭建(非容器)

系列文章 Zookeeper系列(一)集群搭建(非容器) 目录 前言 下载 搭建 Data目录 Conf目录 集群复制和修改 启动 配置示例 测试 总结 前言 Zookeeper是一个开源的分布式协调服务,其设计目标是将那些复杂的且容易出错的分…...

【高等数学之泰勒公式】

一、从零开始 1.1、泰勒中值定理1 什么是泰勒公式?我们先看看权威解读: 那么我们从古至今到底是如何创造出泰勒公式的呢? 由上图可知,任一无穷小数均可以表示成用一系列数字的求和而得出的结果,我们称之为“无穷算法”。 那么同理我们想对任一曲线来…...

奇异值分解在图形压缩中的应用

奇异值分解在图形压缩中的应用 在研究奇异值分解的工程应用之前,我们得明白什么是奇异值?什么是奇异向量? 奇异值与奇异向量 概念:奇异值描述了矩阵在一组特定向量上的行为,奇异向量描述了其最大的作用方向。 奇异值…...

C++深入学习之STL:1、容器部分

标准模板库STL的组成 主要由六大基本组件组成:容器、迭代器、算法、适配器、函数对象(仿函数)以及空间配置器。 容器:就是用来存数据的,也称为数据结构。 本文要详述的是容器主要如下: 序列式容器:vector、list 关联…...

Javascript——vue下载blob文档流

<el-table-column label"操作" fixed"right" width"150" showOverflowTooltip><template slot-scope"scope"><el-button type"text" v-has"stbsd-gjcx-down" class"edit-button" click&…...

C# 的SequenceEqual

SequenceEqual 是 LINQ 扩展方法之一&#xff0c;用于比较两个序列&#xff08;如数组、列表等&#xff09;的元素是否相等。 该方法的详细定义如下&#xff1a; public static bool SequenceEqual<TSource>(this IEnumerable<TSource> first, IEnumerable<TS…...

uniapp 对接腾讯云IM群组成员管理(增删改查)

UniApp 实战&#xff1a;腾讯云IM群组成员管理&#xff08;增删改查&#xff09; 一、前言 在社交类App开发中&#xff0c;群组成员管理是核心功能之一。本文将基于UniApp框架&#xff0c;结合腾讯云IM SDK&#xff0c;详细讲解如何实现群组成员的增删改查全流程。 权限校验…...

业务系统对接大模型的基础方案:架构设计与关键步骤

业务系统对接大模型&#xff1a;架构设计与关键步骤 在当今数字化转型的浪潮中&#xff0c;大语言模型&#xff08;LLM&#xff09;已成为企业提升业务效率和创新能力的关键技术之一。将大模型集成到业务系统中&#xff0c;不仅可以优化用户体验&#xff0c;还能为业务决策提供…...

地震勘探——干扰波识别、井中地震时距曲线特点

目录 干扰波识别反射波地震勘探的干扰波 井中地震时距曲线特点 干扰波识别 有效波&#xff1a;可以用来解决所提出的地质任务的波&#xff1b;干扰波&#xff1a;所有妨碍辨认、追踪有效波的其他波。 地震勘探中&#xff0c;有效波和干扰波是相对的。例如&#xff0c;在反射波…...

进程地址空间(比特课总结)

一、进程地址空间 1. 环境变量 1 &#xff09;⽤户级环境变量与系统级环境变量 全局属性&#xff1a;环境变量具有全局属性&#xff0c;会被⼦进程继承。例如当bash启动⼦进程时&#xff0c;环 境变量会⾃动传递给⼦进程。 本地变量限制&#xff1a;本地变量只在当前进程(ba…...

Xshell远程连接Kali(默认 | 私钥)Note版

前言:xshell远程连接&#xff0c;私钥连接和常规默认连接 任务一 开启ssh服务 service ssh status //查看ssh服务状态 service ssh start //开启ssh服务 update-rc.d ssh enable //开启自启动ssh服务 任务二 修改配置文件 vi /etc/ssh/ssh_config //第一…...

【入坑系列】TiDB 强制索引在不同库下不生效问题

文章目录 背景SQL 优化情况线上SQL运行情况分析怀疑1:执行计划绑定问题?尝试:SHOW WARNINGS 查看警告探索 TiDB 的 USE_INDEX 写法Hint 不生效问题排查解决参考背景 项目中使用 TiDB 数据库,并对 SQL 进行优化了,添加了强制索引。 UAT 环境已经生效,但 PROD 环境强制索…...

JVM垃圾回收机制全解析

Java虚拟机&#xff08;JVM&#xff09;中的垃圾收集器&#xff08;Garbage Collector&#xff0c;简称GC&#xff09;是用于自动管理内存的机制。它负责识别和清除不再被程序使用的对象&#xff0c;从而释放内存空间&#xff0c;避免内存泄漏和内存溢出等问题。垃圾收集器在Ja…...

Robots.txt 文件

什么是robots.txt&#xff1f; robots.txt 是一个位于网站根目录下的文本文件&#xff08;如&#xff1a;https://example.com/robots.txt&#xff09;&#xff0c;它用于指导网络爬虫&#xff08;如搜索引擎的蜘蛛程序&#xff09;如何抓取该网站的内容。这个文件遵循 Robots…...

土地利用/土地覆盖遥感解译与基于CLUE模型未来变化情景预测;从基础到高级,涵盖ArcGIS数据处理、ENVI遥感解译与CLUE模型情景模拟等

&#x1f50d; 土地利用/土地覆盖数据是生态、环境和气象等诸多领域模型的关键输入参数。通过遥感影像解译技术&#xff0c;可以精准获取历史或当前任何一个区域的土地利用/土地覆盖情况。这些数据不仅能够用于评估区域生态环境的变化趋势&#xff0c;还能有效评价重大生态工程…...

鱼香ros docker配置镜像报错:https://registry-1.docker.io/v2/

使用鱼香ros一件安装docker时的https://registry-1.docker.io/v2/问题 一键安装指令 wget http://fishros.com/install -O fishros && . fishros出现问题&#xff1a;docker pull 失败 网络不同&#xff0c;需要使用镜像源 按照如下步骤操作 sudo vi /etc/docker/dae…...