GB28181学习(十七)——基于jrtplib实现tcp被动和主动发流
前言
GB/T28181-2022实时流的传输方式介绍:https://blog.csdn.net/www_dong/article/details/134255185
基于jrtplib实现tcp被动和主动收流介绍:https://blog.csdn.net/www_dong/article/details/134451387
本文主要介绍下级平台或设备发流功能,用于对接特定的SIP服务器或上级平台。
UDP发流
流程图

发送端流程:
- 初始化rtp参数;
- 裸流数据做PS复用;
- 组RTP包发送;
设计
- 初始化rtp参数
int CUdp::InitRtp_()
{RTPSessionParams sessionParams;sessionParams.SetMinimumRTCPTransmissionInterval(10);sessionParams.SetOwnTimestampUnit(1.0 / 90000.0);sessionParams.SetAcceptOwnPackets(true);sessionParams.SetMaximumPacketSize(1450);RTPUDPv4TransmissionParams transParams;transParams.SetRTPSendBuffer(2*1024*1024);transParams.SetBindIP(m_ip);transParams.SetPortbase((uint16_t)m_port);if (0 != Create(sessionParams, &transParams)){return -1;}SetDefaultPayloadType((uint8_t)m_payload);SetDefaultTimestampIncrement(3600);SetDefaultMark(true);RTPIPv4Address addr(ntohl(inet_addr(m_ip), (uint16_t)m_port);if(0 != AddDestination(addr)){return -1;}return 0;
}
- 流数据复用为PS
// 使用ireader开源库进行ps复用
// 初始化
CData2PS::CData2PS()
{struct ps_muxer_func_t func;func.alloc = Alloc;func.free = Free;func.write = Packet;m_ps = ps_muxer_create(&func, this);// TODO codecid待补充m_ps_stream = ps_muxer_add_stream(m_ps, PSI_STREAM_H264, nullptr, 0);
}// 塞数据
int CData2PS::InputData(void* data, int len)
{if (!m_ps)return -1;uint64_t clock = time64_now();if (0 == m_ps_clock)m_ps_clock = clock;return ps_muxer_input(m_ps, m_ps_stream, 0, (clock - m_ps_clock) * 90, (clock - m_ps_clock) * 90, data, len);
}
- 发送rtp包
// 调用jrtplib中SendPacket(data, len);接口发送数据// 以下为SendPacket部分源码
// 主要流程:
// 1. 构建packet
// 2. 发送rtp数据
int RTPSession::SendPacket(const void *data,size_t len,uint8_t pt,bool mark,uint32_t timestampinc)
{int status;if (!created)return ERR_RTP_SESSION_NOTCREATED;BUILDER_LOCKif ((status = packetbuilder.BuildPacket(data,len,pt,mark,timestampinc)) < 0){BUILDER_UNLOCKreturn status;}if ((status = SendRTPData(packetbuilder.GetPacket(),packetbuilder.GetPacketLength())) < 0){BUILDER_UNLOCKreturn status;}BUILDER_UNLOCKSOURCES_LOCKsources.SentRTPPacket();SOURCES_UNLOCKPACKSENT_LOCKsentpackets = true;PACKSENT_UNLOCKreturn 0;
}// 构建包
int RTPPacketBuilder::PrivateBuildPacket(const void *data,size_t len,uint8_t pt,bool mark,uint32_t timestampinc,bool gotextension,uint16_t hdrextID,const void *hdrextdata,size_t numhdrextwords)
{RTPPacket p(pt,data,len,seqnr,timestamp,ssrc,mark,numcsrcs,csrcs,gotextension,hdrextID,(uint16_t)numhdrextwords,hdrextdata,buffer,maxpacksize,GetMemoryManager());int status = p.GetCreationError();if (status < 0)return status;packetlength = p.GetPacketLength();if (numpackets == 0) // first packet{lastwallclocktime = RTPTime::CurrentTime();lastrtptimestamp = timestamp;prevrtptimestamp = timestamp;}else if (timestamp != prevrtptimestamp){lastwallclocktime = RTPTime::CurrentTime();lastrtptimestamp = timestamp;prevrtptimestamp = timestamp;}numpayloadbytes += (uint32_t)p.GetPayloadLength();numpackets++;timestamp += timestampinc;seqnr++;return 0;
}// 发送包
int RTPSession::SendRTPData(const void *data, size_t len)
{if (!m_changeOutgoingData)return rtptrans->SendRTPData(data, len);void *pSendData = 0;size_t sendLen = 0;int status = 0;status = OnChangeRTPOrRTCPData(data, len, true, &pSendData, &sendLen);if (status < 0)return status;if (pSendData){status = rtptrans->SendRTPData(pSendData, sendLen);OnSentRTPOrRTCPData(pSendData, sendLen, true);}return status;
}// 底层实现
int RTPUDPv4Transmitter::SendRTPData(const void *data,size_t len)
{if (!init)return ERR_RTP_UDPV4TRANS_NOTINIT;MAINMUTEX_LOCKif (!created){MAINMUTEX_UNLOCKreturn ERR_RTP_UDPV4TRANS_NOTCREATED;}if (len > maxpacksize){MAINMUTEX_UNLOCKreturn ERR_RTP_UDPV4TRANS_SPECIFIEDSIZETOOBIG;}destinations.GotoFirstElement();while (destinations.HasCurrentElement()){// 调用sendto函数实现udp包的发送sendto(rtpsock,(const char *)data,len,0,(const struct sockaddr *)destinations.GetCurrentElement().GetRTPSockAddr(),sizeof(struct sockaddr_in));destinations.GotoNextElement();}MAINMUTEX_UNLOCKreturn 0;
}
tcp passive发流
流程图

发送端流程:
- 上级平台或sip服务器以主动方式连接,对于下级平台或者设备(数据发送端)为被动方式;
- 下级平台或者设备(数据发送端)启动端口监听;
- 接收上级平台或sip服务器tcp连接请求;
- 向上级平台或sip服务器发送流数据;
设计
- 创建socket、bind、listen,启动数据接收线程;
// TcpServer为封装的socket类int CGBTcpServer::Start()
{if (0 != m_localPort || m_tcpServer.get())return 0;int ret = -1;do {m_tcpServer = std::make_shared<TcpServer>(nullptr, this);if (!m_tcpServer.get())break;ret = m_tcpServer->TcpCreate();if (0 != ret)break;ret = m_tcpServer->TcpBind(m_localPort);if (0 != ret)break;ret = m_tcpServer->TcpListen(5);if (0 != ret)break;m_thread = std::thread(TCPData2PSThread, this);return 0;} while (0);Stop();return ret;
}
- 在线程内等待连接,连接成功后接收数据并回调至应用层处理
void CGBTcpServer::TCPData2PSWorker()
{if (!m_pspacker)m_pspacker = new(std::nothrow) CData2PS(PSTCPDataCB, this);bool bAccept = false;while (m_running){if (!bAccept){if (0 == m_tcpServer->TcpAccept()){bAccept = true;if (0 != InitRtp_()){break;}}continue;}std::this_thread::sleep_for(std::chrono::seconds(1));}
}
- 初始化rtp参数
int CGBTcpServer::InitRtp_()
{const int packetSize = 45678;RTPSessionParams sessionparams;sessionparams.SetProbationType(RTPSources::NoProbation);sessionparams.SetOwnTimestampUnit(1.0 / packetSize);sessionparams.SetMaximumPacketSize(packetSize + 64);m_rtpTcpTransmitter = new RTPTCPTransmitter(nullptr);m_rtpTcpTransmitter->Init(true);m_rtpTcpTransmitter->Create(65535, 0);int status = Create(sessionparams, m_rtpTcpTransmitter);if (status < 0){return status;}status = AddDestination(RTPTCPAddress(m_tcpServer->GetClientSocket()));if (0 != status)return status;SetDefaultPayloadType(96);SetDefaultMark(false);SetDefaultTimestampIncrement(160);return 0;
}
- 将数据复用为PS;
- tcp方式发包
// 调用jrtplib中SendPacket(data, len);接口发送数据// 以下为tcp方式SendPacket部分源码
int RTPTCPTransmitter::SendRTPData(const void *data,size_t len)
{return SendRTPRTCPData(data, len);
}int RTPTCPTransmitter::SendRTPRTCPData(const void *data, size_t len)
{if (!m_init)return ERR_RTP_TCPTRANS_NOTINIT;MAINMUTEX_LOCKif (!m_created){MAINMUTEX_UNLOCKreturn ERR_RTP_TCPTRANS_NOTCREATED;}// #define RTPTCPTRANS_MAXPACKSIZE 65535if (len > RTPTCPTRANS_MAXPACKSIZE){MAINMUTEX_UNLOCKreturn ERR_RTP_TCPTRANS_SPECIFIEDSIZETOOBIG;}std::map<SocketType, SocketData>::iterator it = m_destSockets.begin();std::map<SocketType, SocketData>::iterator end = m_destSockets.end();vector<SocketType> errSockets;int flags = 0;
#ifdef RTP_HAVE_MSG_NOSIGNALflags = MSG_NOSIGNAL;
#endif // RTP_HAVE_MSG_NOSIGNALwhile (it != end){uint8_t lengthBytes[2] = { (uint8_t)((len >> 8)&0xff), (uint8_t)(len&0xff) };SocketType sock = it->first;// 调用send接口发送数据// 1. 先发送2字节头(固定格式)// 2. 再发送数据if (send(sock,(const char *)lengthBytes,2,flags) < 0 ||send(sock,(const char *)data,len,flags) < 0)errSockets.push_back(sock);++it;}MAINMUTEX_UNLOCKif (errSockets.size() != 0){for (size_t i = 0 ; i < errSockets.size() ; i++)OnSendError(errSockets[i]);}// Don't return an error code to avoid the poll thread exiting// due to one closed connection for examplereturn 0;
}
tcp active发流
流程图

发送端流程:
- 上级平台或sip服务器启动tcp监听连接,对于下级平台或者设备(数据发送端)为主动方式;
- 下级平台或者设备(数据发送端)发起tcp连接;
- 接收上级平台或sip服务器tcp响应;
- 向上级平台或sip服务器发送流数据;
设计
- 创建socket、connect、初始化rtp,启动数据接收线程
// TcpClient为封装的客户端socket类int CGBTcpClient::Start()
{if (0 != m_localPort || m_tcpClient.get())return 0;int ret = -1;do{m_tcpClient = std::make_shared<TcpClient>(nullptr, this);if (!m_tcpClient.get() || 0 != m_tcpClient->TcpCreate())break;ret = m_tcpClient->TcpConnectByTime(m_localIP.c_str(), m_localPort, 5);if (0 != ret)break;ret = InitRtp_();if (0 != ret)break;m_thread = std::thread(RTPPackerThread, this);return 0;} while (0);Stop();return ret;
}
- 初始化rtp参数
int CGBTcpClient::InitRtp_()
{const int packSize = 45678;RTPSessionParams sessionParams;sessionParams.SetProbationType(RTPSources::NoProbation);sessionParams.SetOwnTimestampUnit(90000.0 / 25.0);sessionParams.SetMaximumPacketSize(packSize + 64);m_rtpTcpTransmitter = new RTPTCPTransmitter(nullptr);m_rtpTcpTransmitter->Init(true);m_rtpTcpTransmitter->Create(65535, 0);if (0 != Create(sessionParams, m_rtpTcpTransmitter))return -1;if (0 != AddDestination(RTPTCPAddress(m_tcpClient->GetClientSocket())))return -1;return 0;
}
- 视音频数据复用为PS
- 发送数据,同tcp passive发流
相关文章:
GB28181学习(十七)——基于jrtplib实现tcp被动和主动发流
前言 GB/T28181-2022实时流的传输方式介绍:https://blog.csdn.net/www_dong/article/details/134255185 基于jrtplib实现tcp被动和主动收流介绍:https://blog.csdn.net/www_dong/article/details/134451387 本文主要介绍下级平台或设备发流功能&#…...
RealSense深度相机在Ubuntu18.04的ros环境下,保存同一时刻下深度图像和彩色图像
背景:Ubuntu18.04 ROS Melodic 已安装配置好RealSense相关程序,链接D435i相机后,得到如下Rostopic: /camera/color/image_raw # 彩色图像信息 /camera/depth/image_rect_raw # 深度图像信息 于是写一个python程序&am…...
vue3 ref和reactive使用watch属性的方法和区别
在Vue 3中,您可以使用watch函数和watch选项来监视ref和reactive创建的响应式数据的变化。下面是它们的使用方法和区别: 使用方法: 使用ref和watch: import { ref, watch } from vue;const count ref(0);watch(count, (newVal,…...
YOLO目标检测——卫星遥感舰船检测数据集下载分享【含对应voc、coco和yolo三种格式标签】
实际项目应用:卫星遥感舰船检测数据集说明:卫星遥感舰船检测数据集,真实场景的高质量图片数据,数据场景丰富,含船一个类别标签说明:使用lableimg标注软件标注,标注框质量高,含voc(xm…...
org.Hs.eg.db使用--持续修改
org.Hs.eg.db使用–持续修改 加载 library(org.Hs.eg.db)1 基本信息查询 1.1 display the columns columns(org.Hs.eg.db) [1] "ACCNUM" "ALIAS" "ENSEMBL" "ENSEMBLPROT" "ENSEMBLTRANS" "ENT…...
C# Onnx 百度PaddleSeg发布的实时人像抠图PP-MattingV2
目录 效果 模型信息 项目 代码 下载 效果 图片源自网络侵删 模型信息 Inputs ------------------------- name:img tensor:Float[1, 3, 480, 640] --------------------------------------------------------------- Outputs -----------------…...
linux shell操作 - 04 进程间通信
文章目录 Signal 信号信号定义信号的生命周期信号分类linux进程通信案例 Signal 信号 信号定义 Linux信号是进程间通信的一种方式,通过向目标进程发送一个特定的信号,让其执行相应的处理操作; 向目标进程发送信号时,内核会将信号…...
【Java并发】聊聊线程池原理以及实际应用
线程其实对于操作系统来说是宝贵的资源,java层面的线程其实本质还是依赖于操作系统内核的线程进行处理任务,如果频繁的创建、使用、销毁线程,那么势必会非常浪费资源以及性能不高,所以池化技术(数据库连接池、线程池&a…...
自然语言处理常用方法和评价指标
常用方法 文本分类:如情感分析、主题标签分类。使用方法如朴素贝叶斯、支持向量机、神经网络等。信息提取:从文本中提取结构化信息,如命名实体识别(NER)、关系提取。语义分析:理解文本的含义,包…...
FFmpeg常用命令行讲解及实战一
文章目录 前言一、学习资料参考二、FFmpeg 选项1、主要选项①、主要命令选项②、举例 2、视频选项①、主要命令选项②、举例1)提取固定帧2)禁止输出视频3)指定视频的纵横比 3、音频选项①、主要命令选项②、举例 4、字幕选项①、主要命令选项…...
Java的ArrayList中关于删除的常用操作及方法
目录 remove(int index)方法 remove(Object o)方法 removeAll(Collection c)方法 removeIf(Predicate filter)方法 removeRange(int fromIndex, int toIndex)方法 remove(int index)方法 remove(int index)是ArrayList类中用于删除指定位置元素的方法。它接收一个整…...
低成本打造便携式无线网络攻防学习环境
1.摘要 一直以来, 无线网络安全问题与大众的个人隐私息息相关, 例如: 为了节省流量, 连接到一个看似安全的免费WiFi, 在使用过程中泄露自己的各类密码信息甚至银行卡账号密码信息。随着家用智能电器的普及, 家中的各类智能设备连入家里的无线网络, 却突然失灵, 甚至无法正常连…...
Qt|QLabel显示刷新图像数据
参考:QImage、QClipboard(https://zhuanlan.zhihu.com/p/649611141) 获取图像数据并转换为QImage unsigned char *data 图像数据; QImage show_image_ QImage(data, imgInfo.width, imgInfo.height, imgInfo.width, QImage::Format_Grays…...
Java类加载那些事
Java源文件(.java文件)被编译器编译后变为字节码形式的类文件(.class文件),Java类加载的过程就是JVM加载.class的二进制文件并且放到内存中,将数据放到方法区,并且在堆区构造一个java.lang.clas…...
QSplitter分裂器
QSplitter QSplitter 是 Qt 框架提供的一个小部件(widget),用于在用户界面中创建可拖动的分割窗口,允许用户调整子部件的大小和布局。它可以将父部件分割为多个可调整大小的子部件,使用户能够自定义界面的布局和大小。…...
pgsql 时区查看和修改
建议使用UTC时区,或者和linux、后端程序的时区保持一致,否则容易出现时间的差别。 pgsql的时间字段有一个带时区的timestamp with time zone,如果业务涉及多个时区,建议使用这个字段。 相关链接参考: linux时区设置和…...
el-table 表格表头、单元格、滚动条样式修改
.2023.11.21今天我学习了如何对el-table表格样式进行修改,如图: 运用的两个样式主要是 1.header-cell-class-name(设置表头) 2.class-name(设置行单元格) 代码如下: <el-table :data&quo…...
dockerDesktop使用方法
安装软件 装在C盘会容易满,可以装在D盘, "path\to\Docker Desktop Installer.exe" install -accept-license --installation-dirD:\Docker\Docker --wsl-default-data-rootD:\Docker\data并且在软件的设置的Docker Engine里添加阿里镜像源…...
[Ubuntu]RT810xE--网线已拔出--问题解决
0 环境 ubuntu 22.04.3 LTSDell Inspiron 15 5547windows/ubuntu 双系统 1 问题说明 Dell 笔记本安装的 Ubutun 系统,有线网络无法使用,一直显示 “网线已拔出”。 网上一查,才了解到主要原因:网卡驱动安装错误。系统默认安装…...
美国DDoS服务器:如何保护你的网站免遭攻击?
在当今数字化时代,互联网已经成为人们生活中不可或缺的一部分。随着互联网的普及和发展,网络安全问题也日益严重。其中,DDoS攻击是目前最常见和具有破坏性的网络攻击之一。那么,如何保护你的网站免遭DDoS攻击呢?下面将介绍…...
TDengine 快速体验(Docker 镜像方式)
简介 TDengine 可以通过安装包、Docker 镜像 及云服务快速体验 TDengine 的功能,本节首先介绍如何通过 Docker 快速体验 TDengine,然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉 Docker,请使用 安装包的方式快…...
Swift 协议扩展精进之路:解决 CoreData 托管实体子类的类型不匹配问题(下)
概述 在 Swift 开发语言中,各位秃头小码农们可以充分利用语法本身所带来的便利去劈荆斩棘。我们还可以恣意利用泛型、协议关联类型和协议扩展来进一步简化和优化我们复杂的代码需求。 不过,在涉及到多个子类派生于基类进行多态模拟的场景下,…...
使用分级同态加密防御梯度泄漏
抽象 联邦学习 (FL) 支持跨分布式客户端进行协作模型训练,而无需共享原始数据,这使其成为在互联和自动驾驶汽车 (CAV) 等领域保护隐私的机器学习的一种很有前途的方法。然而,最近的研究表明&…...
AtCoder 第409场初级竞赛 A~E题解
A Conflict 【题目链接】 原题链接:A - Conflict 【考点】 枚举 【题目大意】 找到是否有两人都想要的物品。 【解析】 遍历两端字符串,只有在同时为 o 时输出 Yes 并结束程序,否则输出 No。 【难度】 GESP三级 【代码参考】 #i…...
CentOS下的分布式内存计算Spark环境部署
一、Spark 核心架构与应用场景 1.1 分布式计算引擎的核心优势 Spark 是基于内存的分布式计算框架,相比 MapReduce 具有以下核心优势: 内存计算:数据可常驻内存,迭代计算性能提升 10-100 倍(文档段落:3-79…...
使用 Streamlit 构建支持主流大模型与 Ollama 的轻量级统一平台
🎯 使用 Streamlit 构建支持主流大模型与 Ollama 的轻量级统一平台 📌 项目背景 随着大语言模型(LLM)的广泛应用,开发者常面临多个挑战: 各大模型(OpenAI、Claude、Gemini、Ollama)接口风格不统一;缺乏一个统一平台进行模型调用与测试;本地模型 Ollama 的集成与前…...
React---day11
14.4 react-redux第三方库 提供connect、thunk之类的函数 以获取一个banner数据为例子 store: 我们在使用异步的时候理应是要使用中间件的,但是configureStore 已经自动集成了 redux-thunk,注意action里面要返回函数 import { configureS…...
【7色560页】职场可视化逻辑图高级数据分析PPT模版
7种色调职场工作汇报PPT,橙蓝、黑红、红蓝、蓝橙灰、浅蓝、浅绿、深蓝七种色调模版 【7色560页】职场可视化逻辑图高级数据分析PPT模版:职场可视化逻辑图分析PPT模版https://pan.quark.cn/s/78aeabbd92d1...
[ACTF2020 新生赛]Include 1(php://filter伪协议)
题目 做法 启动靶机,点进去 点进去 查看URL,有 ?fileflag.php说明存在文件包含,原理是php://filter 协议 当它与包含函数结合时,php://filter流会被当作php文件执行。 用php://filter加编码,能让PHP把文件内容…...
什么是VR全景技术
VR全景技术,全称为虚拟现实全景技术,是通过计算机图像模拟生成三维空间中的虚拟世界,使用户能够在该虚拟世界中进行全方位、无死角的观察和交互的技术。VR全景技术模拟人在真实空间中的视觉体验,结合图文、3D、音视频等多媒体元素…...
