当前位置: 首页 > news >正文

4.4 媒资管理模块 - 分布式任务处理介绍、视频处理技术方案

媒资管理模块 - 视频处理

文章目录

  • 媒资管理模块 - 视频处理
  • 一、视频转码
    • 1.1 视频转码介绍
    • 1.2 FFmpeg 基本使用
      • 1.2.1 下载安装配置
      • 1.2.2 转码测试
    • 1.3 工具类
      • 1.3.1 VideoUtil
      • 1.3.2 Mp4VideoUtil
      • 1.3.3 测试工具类
  • 二、分布式任务处理
    • 2.1 分布式任务调度
    • 2.2 XXL-JOB 配置执行器 中间件
    • 2.3 搭建XXL-JOB
      • 2.3.1 调度中心
      • 2.3.2 执行器
      • 2.3.3 执行任务
    • 2.4 XXL-JOB 高级配置参数
    • 2.5 分片广播
      • 2.5.1 分片广播事例
  • 三、视频处理
    • 3.1 技术方案
      • 3.1.1 作业分片方案
      • 3.1.2 保证任务不重复执行
      • 3.1.3 视频处理方案

一、视频转码

1.1 视频转码介绍

视频转码是指的对视频文件的编码格式进行转换

视频上传成功需要对视频的格式进行转码处理,比如:avi转成mp4

一般做文件存储的服务都需要对文件进行处理,例如对视频进行转码处理,可能由于文件量较大需要使用多线程等技术进行高效处理

image-20231225211510533

文件格式:是指.mp4、.avi、.rmvb等 这些不同扩展名的视频文件的文件格式

视频文件的内容主要包括视频和音频,其文件格式是按照一 定的编码格式去编码,并且按照该文件所规定的封装格式将视频、音频、字幕等信息封装在一起,播放器会根据它们的封装格式去提取出编码,然后由播放器解码,最终播放音视频

音视频编码格式:通过音视频的压缩技术,将视频格式转换成另一种视频格式,通过视频编码实现流媒体的传输

目前最常用的编码标准是视频H.264,音频AAC

比如:

一个.avi的视频文件原来的编码是a,通过编码后编码格式变为b,

音频原来为c,通过编码后变为d

1.2 FFmpeg 基本使用

1.2.1 下载安装配置

我们Java程序员只需要调用流媒体程序员写的工具类即可完成对视频的操作,这个工具可能是c或c++写的

流媒体程序员:专门做视频处理类的东西

FFmpeg开源工具被许多开源项目采用,QQ影音、暴风影音、VLC等

下载链接:https://www.ffmpeg.org/download.html#build-windows

image-20231225213912242

image-20231225215831981

image-20231225215808180

最终下载之后三个exe文件

image-20231225220211056

查看是否安装成功

ffmpeg -v

image-20231225220243301

也可以把ffmpeg.exe文件配置在path环境变量中

image-20231225220551852

现在我们就可以在任意一个位置执行命令了

image-20231225220650604

1.2.2 转码测试

将avi文件转换成mp4文件

ffmpeg.exe -i  avi测试视频.avi 1.mp4

image-20231225221537200

image-20231225221551723

转成mp3

ffmpeg -i xxx.avi xxx1.mp3

转成gif

ffmpeg -i xxx.avi xxx1.gif

1.3 工具类

在xuecheng-plus-base工程添加此工具类

这份工具类其实就是流媒体程序员进行提供的

其实我们需要的是怎么调用ffmpeg.exe文件

1.3.1 VideoUtil

/*** 此文件作为视频文件处理父类,提供:* 1、查看视频时长* 2、校验两个视频的时长是否相等**/
public class VideoUtil {String ffmpeg_path = "D:\\Program Files\\ffmpeg-20180227-fa0c9d6-win64-static\\bin\\ffmpeg.exe";//ffmpeg的安装位置public VideoUtil(String ffmpeg_path){this.ffmpeg_path = ffmpeg_path;}//检查视频时间是否一致public Boolean check_video_time(String source,String target) {String source_time = get_video_time(source);//取出时分秒source_time = source_time.substring(0,source_time.lastIndexOf("."));String target_time = get_video_time(target);//取出时分秒target_time = target_time.substring(0,target_time.lastIndexOf("."));if(source_time == null || target_time == null){return false;}if(source_time.equals(target_time)){return true;}return false;}//获取视频时间(时:分:秒:毫秒)public String get_video_time(String video_path) {/*ffmpeg -i  lucene.mp4*/List<String> commend = new ArrayList<String>();commend.add(ffmpeg_path);commend.add("-i");commend.add(video_path);try {ProcessBuilder builder = new ProcessBuilder();builder.command(commend);//将标准输入流和错误输入流合并,通过标准输入流程读取信息builder.redirectErrorStream(true);Process p = builder.start();String outstring = waitFor(p);System.out.println(outstring);int start = outstring.trim().indexOf("Duration: ");if(start>=0){int end = outstring.trim().indexOf(", start:");if(end>=0){String time = outstring.substring(start+10,end);if(time!=null && !time.equals("")){return time.trim();}}}} catch (Exception ex) {ex.printStackTrace();}return null;}public String waitFor(Process p) {InputStream in = null;InputStream error = null;String result = "error";int exitValue = -1;StringBuffer outputString = new StringBuffer();try {in = p.getInputStream();error = p.getErrorStream();boolean finished = false;int maxRetry = 600;//每次休眠1秒,最长执行时间10分种int retry = 0;while (!finished) {if (retry > maxRetry) {return "error";}try {while (in.available() > 0) {Character c = new Character((char) in.read());outputString.append(c);System.out.print(c);}while (error.available() > 0) {Character c = new Character((char) in.read());outputString.append(c);System.out.print(c);}//进程未结束时调用exitValue将抛出异常exitValue = p.exitValue();finished = true;} catch (IllegalThreadStateException e) {Thread.currentThread().sleep(1000);//休眠1秒retry++;}}} catch (Exception e) {e.printStackTrace();} finally {if (in != null) {try {in.close();} catch (IOException e) {System.out.println(e.getMessage());}}}return outputString.toString();}public static void main(String[] args) throws IOException {String ffmpeg_path = "D:\\Program Files\\ffmpeg-20180227-fa0c9d6-win64-static\\bin\\ffmpeg.exe";//ffmpeg的安装位置VideoUtil videoUtil = new VideoUtil(ffmpeg_path);String video_time = videoUtil.get_video_time("E:\\ffmpeg_test\\1.avi");System.out.println(video_time);}
}

1.3.2 Mp4VideoUtil

public class Mp4VideoUtil extends VideoUtil {String ffmpeg_path = "D:\\Program Files\\ffmpeg-20180227-fa0c9d6-win64-static\\bin\\ffmpeg.exe";//ffmpeg的安装位置String video_path = "D:\\BaiduNetdiskDownload\\test1.avi";String mp4_name = "test1.mp4";String mp4folder_path = "D:/BaiduNetdiskDownload/Movies/test1/";public Mp4VideoUtil(String ffmpeg_path, String video_path, String mp4_name, String mp4folder_path){super(ffmpeg_path);this.ffmpeg_path = ffmpeg_path;this.video_path = video_path;this.mp4_name = mp4_name;this.mp4folder_path = mp4folder_path;}//清除已生成的mp4private void clear_mp4(String mp4_path){//删除原来已经生成的m3u8及ts文件File mp4File = new File(mp4_path);if(mp4File.exists() && mp4File.isFile()){mp4File.delete();}}/*** 视频编码,生成mp4文件* @return 成功返回success,失败返回控制台日志*/public String generateMp4(){//清除已生成的mp4
//        clear_mp4(mp4folder_path+mp4_name);clear_mp4(mp4folder_path);/*ffmpeg.exe -i  lucene.avi -c:v libx264 -s 1280x720 -pix_fmt yuv420p -b:a 63k -b:v 753k -r 18 .\lucene.mp4*/List<String> commend = new ArrayList<String>();//commend.add("D:\\Program Files\\ffmpeg-20180227-fa0c9d6-win64-static\\bin\\ffmpeg.exe");commend.add(ffmpeg_path);commend.add("-i");
//        commend.add("D:\\BaiduNetdiskDownload\\test1.avi");commend.add(video_path);commend.add("-c:v");commend.add("libx264");commend.add("-y");//覆盖输出文件commend.add("-s");commend.add("1280x720");commend.add("-pix_fmt");commend.add("yuv420p");commend.add("-b:a");commend.add("63k");commend.add("-b:v");commend.add("753k");commend.add("-r");commend.add("18");
//        commend.add(mp4folder_path  + mp4_name );commend.add(mp4folder_path  );String outstring = null;try {ProcessBuilder builder = new ProcessBuilder();builder.command(commend);//将标准输入流和错误输入流合并,通过标准输入流程读取信息builder.redirectErrorStream(true);Process p = builder.start();outstring = waitFor(p);} catch (Exception ex) {ex.printStackTrace();}
//        Boolean check_video_time = this.check_video_time(video_path, mp4folder_path + mp4_name);Boolean check_video_time = this.check_video_time(video_path, mp4folder_path);if(!check_video_time){return outstring;}else{return "success";}}}

上面的代码中大多数是参数封装,真正调用FFmpeg的是下面几行

ProcessBuilder builder = new ProcessBuilder();builder.command(commend);//将标准输入流和错误输入流合并,通过标准输入流程读取信息builder.redirectErrorStream(true);
Process p = builder.start();

1.3.3 测试工具类

我们可以测试一下,比如打开一下“咪咕视频”

    public static void main(String[] args) throws IOException {ProcessBuilder builder = new ProcessBuilder();//启动一下我本地的咪咕视频(路径中尽量不要含有中文)builder.command("D:\\soft\\MiguVideo\\MiGuApp.exe");//将标准输入流和错误输入流合并,通过标准输入流程读取信息builder.redirectErrorStream(true);Process p = builder.start();}     

image-20231225223633631

可以在此类中执行main函数调用一下此工具类是否能完成视频转码

    public static void main(String[] args) throws IOException {//ffmpeg的路径String ffmpeg_path = "";//ffmpeg的安装位置//源avi视频的路径String video_path = "";//转换后mp4文件的名称String mp4_name = "";//转换后mp4文件的路径String mp4_path = "";//创建工具类对象Mp4VideoUtil videoUtil = new Mp4VideoUtil(ffmpeg_path,video_path,mp4_name,mp4_path);//开始视频转换,成功将返回successString s = videoUtil.generateMp4();System.out.println(s);}

二、分布式任务处理

2.1 分布式任务调度

什么是任务调度

下面的场景就是一个调度方案

  • 每隔24小时执行数据备份任务。

  • 12306网站会根据车次不同,设置几个时间点分批次放票。

  • 某财务系统需要在每天上午10点前结算前一天的账单数据,统计汇总。

  • 商品成功发货后,需要向客户发送短信提醒。

任务调度:对任务的调度,它是指系统为了完成特定业务,基于给定时间点,给定时间间隔或者给定执行次数自动执行任务

我们可以将一个视频的转码理解为一个任务的执行,如果视频的数量比较多,如何去高效处理一批任务呢

  1. 多线程

多线程是充分利用单机的资源。

  1. 分布式加多线程

充分利用多台计算机,每台计算机使用多线程处理。每台计算机都在同时运行指定任务处理

方案2可扩展性更强,并且是一种分布式任务调度的处理方案。


什么是分布式任务调度

通常任务调度的程序是集成在应用中的,

比如:优惠卷服务中包括了定时发放优惠卷的的调度程序,

结算服务中包括了定期生成报表的任务调度程序

由于采用分布式架构,一个服务往往会部署多个冗余实例来运行我们的业务,在这种分布式系统环境下运行任务调度,我们称之为分布式任务调度,如下图:

img


分布式调度要实现的目标:

​ 不管是任务调度程序集成在应用程序中,还是单独构建的任务调度系统,如果采用分布式调度任务的方式就相当于将任务调度程序分布式构建,这样就可以具有分布式系统的特点,并且提高任务的调度处理能力

1、并行任务调度

​ 并行任务调度实现靠多线程,如果有大量任务需要调度,此时光靠多线程就会有瓶颈了,因为一台计算机CPU的处理能力是有限的。

​ 如果将任务调度程序分布式部署,每个结点还可以部署为集群,这样就可以让多台计算机共同去完成任务调度,我们可以将任务分割为若干个分片,由不同的实例并行执行,来提高任务调度的处理效率。

2、高可用

​ 若某一个实例宕机,不影响其他实例来执行任务。

3、弹性扩容

​ 当集群中增加实例就可以提高并执行任务的处理效率。

4、任务管理与监测

​ 对系统中存在的所有定时任务进行统一的管理及监测。让开发人员及运维人员能够时刻了解任务执行情况,从而做出快速的应急处理响应。

5、避免任务重复执行

​ 当任务调度以集群方式部署,同一个任务调度可能会执行多次

比如在上面提到的电商系统中到点发优惠券的例子,就会发放多次优惠券,对公司造成很多损失,所以我们需要控制相同的任务在多个运行实例上只执行一次

2.2 XXL-JOB 配置执行器 中间件

我们只需要编写任务的执行逻辑即可,其他的部分都在中间件中

XXL-JOB是一个轻量级分布式任务调度平台

其核心设计目标是开发迅速、学习简单、轻量级、易扩展。

现已开放源代码并接入多家公司线上产品线,开箱即用。

官网:https://www.xuxueli.com/xxl-job/

文档:https://www.xuxueli.com/xxl-job/#%E3%80%8A%E5%88%86%E5%B8%83%E5%BC%8F%E4%BB%BB%E5%8A%A1%E8%B0%83%E5%BA%A6%E5%B9%B3%E5%8F%B0XXL-JOB%E3%80%8B

XXL-JOB主要有调度中心、执行器、任务

调度中心:

​ 负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码;

​ 主要职责为执行器管理、任务管理、监控运维、日志管理等

调度中心其实就是一个管理者

任务执行器:

​ 负责接收调度请求并执行任务逻辑;

​ 只要职责是注册服务、任务执行服务(接收到任务后会放入线程池中的任务队列)、执行结果上报、日志服务等

任务执行器相当于分布式部署,两个执行器相当于两个人执行

**任务:**负责执行具体的业务处理。

image-20240104220931186

执行流程

  1. 任务执行器根据配置的调度中心的地址,自动注册到调度中心

调度中心要知道自己下面有多少个任务执行器

  1. 达到任务触发条件,调度中心下发任务

调度中心会根据任务的调度策略来下发任务

  1. 执行器基于线程池执行任务,并把执行结果放入内存队列中、把执行日志写入日志文件中

任务执行器可能会执行多个任务,所以要先将任务放入线程池中

  1. 执行器消费内存队列中的执行结果,主动上报给调度中心

任务执行器将执行结果异步上报给调度中心

也就是能够在调度中心里面必须能够拿到几点几分几秒,哪个执行器执行任务是成功还是失败的

  1. 当用户在调度中心查看任务日志,调度中心请求任务执行器,任务执行器读取任务日志文件并返回日志详情

其实就是调度中心主动查询任务执行器执行的任务是成功还是失败

image-20240106210933521

2.3 搭建XXL-JOB

调度中心负责给执行器下发任务,执行器负责执行任务

2.3.1 调度中心

首先下载XXL-JOB

GitHub:https://github.com/xuxueli/xxl-job

码云:https://gitee.com/xuxueli0323/xxl-job

项目使用2.3.1版本: https://github.com/xuxueli/xxl-job/releases/tag/2.3.1

image-20240106220104527

如果想本地运行的话,我们需要修改一些参数才可以运行

image-20240106220125453

包结构

xxl-job-admin:调度中心

xxl-job-core:公共依赖

xxl-job-executor-samples:执行器Sample示例(选择合适的版本执行器,可直接使用)

:xxl-job-executor-sample-springboot:Springboot版本,通过Springboot管理执行器,推荐这种方式;

:xxl-job-executor-sample-frameless:无框架版本;

image-20240106220258221

进入xxl-job

http://192.168.101.65:8088/xxl-job-admin/toLogin

账号:admin

密码:123456

image-20240106220436130

image-20240106220456289

2.3.2 执行器

在调度中心创建一个执行器

配置执行器,执行器负责与调度中心通信接收调度中心发起的任务调度请求

创建“执行器管理”,如下图所示

image-20240106221326237

此时没有一个java程序在执行任务,知识创建了一个执行器而已

image-20240106221441158

因为我们要在media工程的media-service工程中使用xxl-job,所以在media-service的pom文件中增加下面这个坐标

我们的执行器就是在media-service工程中编写

我们现在的目的是让执行器注册到调度中心,我们添加之后就注册到调度中心了

<dependency><groupId>com.xuxueli</groupId><artifactId>xxl-job-core</artifactId>
</dependency>

在nacos下的media-service-dev.yaml下配置xxl-job

我们配置上坐标后还不能说是完整的注册到调度中心了,我们还需要告诉它调度中心在哪里,所以就需要下面调度中心的配置了

注意配置中的appname这是执行器的应用名

调度中心要给执行器下发任务,那执行器肯定得启动一个服务

port是执行器启动的端口,如果本地启动多个执行器注意端口不能重复。执行器启动起来后,调度中心会调用它

xxl:job:admin: addresses:  http://192.168.101.65:8088/xxl-job-adminexecutor:appname: testHandleraddress: ip: port: 9999logpath: /data/applogs/xxl-job/jobhandlerlogretentiondays: 30accessToken: default_token

将下面的配置复制到media-service工程

image-20240106224021611

@Configuration
public class XxlJobConfig {private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);@Value("${xxl.job.admin.addresses}")private String adminAddresses;@Value("${xxl.job.accessToken}")private String accessToken;@Value("${xxl.job.executor.appname}")private String appname;@Value("${xxl.job.executor.address}")private String address;@Value("${xxl.job.executor.ip}")private String ip;@Value("${xxl.job.executor.port}")private int port;@Value("${xxl.job.executor.logpath}")private String logPath;@Value("${xxl.job.executor.logretentiondays}")private int logRetentionDays;@Beanpublic XxlJobSpringExecutor xxlJobExecutor() {logger.info(">>>>>>>>>>> xxl-job config init.");XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();xxlJobSpringExecutor.setAdminAddresses(adminAddresses);xxlJobSpringExecutor.setAppname(appname);xxlJobSpringExecutor.setAddress(address);xxlJobSpringExecutor.setIp(ip);xxlJobSpringExecutor.setPort(port);xxlJobSpringExecutor.setAccessToken(accessToken);xxlJobSpringExecutor.setLogPath(logPath);xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);return xxlJobSpringExecutor;}/*** 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;**      1、引入依赖:*          <dependency>*             <groupId>org.springframework.cloud</groupId>*             <artifactId>spring-cloud-commons</artifactId>*             <version>${version}</version>*         </dependency>**      2、配置文件,或者容器启动变量*          spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'**      3、获取IP*          String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();*/}

最终结果如下图所示

image-20240106224316384

2.3.3 执行任务

为什么要配置执行器呢

我们要让执行器执行任务

那我们怎么告诉执行器来执行什么样的任务呢?

如下图所示的地方有个事例,拷贝到我们自己的工程中

image-20240106225123709

第一步:定义任务类

/*** XxlJob开发示例(Bean模式)** 开发步骤:*      1、任务开发:在Spring Bean实例中,开发Job方法;*      2、注解配置:为Job方法添加注解 "@XxlJob(value="自定义jobhandler名称", init = "JobHandler初始化方法", destroy = "JobHandler销毁方法")",注解value值对应的是调度中心新建任务的JobHandler属性的值。*      3、执行日志:需要通过 "XxlJobHelper.log" 打印执行日志;*      4、任务结果:默认任务结果为 "成功" 状态,不需要主动设置;如有诉求,比如设置任务结果为失败,可以通过 "XxlJobHelper.handleFail/handleSuccess" 自主设置任务结果;** @author xuxueli 2019-12-11 21:52:51*//*** 任务类*/
@Component
public class SampleXxlJob {private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class);/*** 执行器拿到任务后就会执行这个方法* 具体的任务方法*/@XxlJob("demoJobHandler") //任务名称是demoJobHandlerpublic void demoJobHandler() throws Exception {System.out.println("处理视频.....");//任务执行逻辑...}/*** 执行器拿到任务后就会执行这个方法* 具体的任务方法*/@XxlJob("demoJobHandler2")public void demoJobHandler2() throws Exception {System.out.println("处理文档.....");//任务执行逻辑....}}

第二步:调度中心中注册任务

image-20240106231212119

调度类型:

  • 固定速度:每隔多长时间进行调度

image-20240106230109424

  • CRON:不仅可以配置每隔多长时间,还可以配置年月日时分秒

    image-20240106230146718

    image-20240106230243836

cron = “0/30 * * * * ?”

  • 从第0秒开始,每间隔30秒执行1次
  • 秒 分 时 日 月 周
  • 以秒为例
  • *:每秒都执行
  • 1-3:从第1秒开始执行,到第3秒结束执行
  • 0/3:从第0秒开始,每隔3秒执行1次
  • 1,2,3:在指定的第1、2、3秒执行
  • ?:不指定
  • 日和周不能同时制定,指定其中之一,则另一个设置为?

30 10 1 * * ? 每天1点10分30秒触发

0/30 * * * * ? 每30秒触发一次

* 0/10 * * * ? 每10分钟触发一次

第三步:启动任务

image-20240106231522370

第四步:观察控制台

image-20240106231557034

2.4 XXL-JOB 高级配置参数

XXL-JOB分布式调度平台包括调度中心和执行器,我们刚刚已经在media-service工程中创建了一个执行器,但是分布式任务调度要有多个执行器来执行任务,所以我们需要把执行器至少部署两个节点

怎么部署至少两个节点呢

将media-api工程运行两个即可

怎么让XXL-JOB调度多个集群(即多个执行器)进行执行任务呢

其实就是刚刚任务管理中的这些配置

image-20240106232631666

  • 路由策略

    我们有一个调度中心,三个任务执行器

    当我们规定的任务调度时间到了后调度中心就会下发任务,但是现在面临一个问题,这个任务分发给哪个任务执行器?

    这就需要我们配置路由策略了

    第一个:每次都会下发给第一个任务执行器

    最后一个:每次都会下发给第最后一个任务执行器

    轮训:每个人轮着来

    一致性HASH:我们的任务有一个id,会求此id的hash值,并且此hash值一定会是执行器中的其中一个

    最不经常使用:最不经常执行任务的执行器

    最近最久未使用:最近最不经常执行任务的执行器

    故障转移:任务路由策略选择"故障转移"情况下,如果执行器集群中某一台机器故障,将会自动Failover切换到一台正常的执行器发送调度请求。

    忙碌转移:某个执行器任务挺多或者正在忙,就会发送给其他执行器

    分片广播:执行器集群部署时,任务路由策略选择"分片广播"情况下,一次任务调度将会广播触发集群中所有执行器执行一次任务,可根据分片参数开发分片任务;

    除了分片广播之外,都是一个任务由一个执行器进行执行,不能将执行能力发挥到最大

    分片广播可以实现将任务同时发送给多个任务执行器

image-20240106232754290

  • 子任务ID

    不经常使用

    当执行完一个任务又想执行第二个任务,此时第二个任务就是第一个任务的子任务

image-20240106233836748

  • 调度过期策略

    到了改调度的时候不知道什么原因没有调度

image-20240106233940570

  • 堵塞处理策略

    当前执行器在执行任务,但是任务调度中心又让此执行器进行执行任务2,此时任务2就被堵塞了

    单击串行:队列的形式,任务进行排队,执行器按次序执行任务

    丢弃后续调度:执行器正在执行任务但是新派了任务,不会干新派的任务

    覆盖之前调度:执行器正在执行任务但是新派了任务,会把当前的活终止,去做新的活

    image-20240106234015775

  • 任务超时时间

    假如我们任务订的10秒,但是15秒还没有执行完,那超时了就不执行了

image-20240106233955743

  • 失败重试次数

image-20240106234031806

2.5 分片广播

分片广播:执行器集群部署时,任务路由策略选择"分片广播"情况下,一次任务调度将会广播触发集群中所有执行器执行一次任务,可根据分片参数开发分片任务

那这样处理同一批视频的话,会不会重复处理同一个视频?

不会

分片广播在给执行器分配任务的时候会给执行器分发序号

调度中心去广播的时候,会通知三个执行器执行任务

比如通知第0号执行器:把第零部分执行一下

比如通知第1号执行器:把第1部分执行一下

比如通知第2号执行器:把第2部分执行一下

此时各个执行器就能执行各个的任务

image-20240108222749814

作业分片适用哪些场景呢?

• 分片任务场景:10个执行器的集群来处理10w条数据,每台机器只需要处理1w条数据,耗时降低10倍;

• 广播任务场景:广播执行器同时运行shell脚本、广播集群节点进行缓存更新等。

所以,广播分片方式不仅可以充分发挥每个执行器的能力,并且根据分片参数可以控制任务是否执行,最终灵活控制了执行器集群分布式处理任务。

2.5.1 分片广播事例

如下所示的代码有一个示例,我们可以看一下

image-20240109201752815

可以把这个复制到我们的工程中

下面的代码需要做的就是告诉执行器编号,给每个执行器进行编号

/*** 2、分片广播任务*/@XxlJob("shardingJobHandler")public void shardingJobHandler() throws Exception {// 分片参数int shardIndex = XxlJobHelper.getShardIndex();//执行器的序号,从0开始int shardTotal = XxlJobHelper.getShardTotal();//执行器总数//只要有了上面两个参数,我们就可以人为确定我们执行器执行哪一部分System.out.println("shardIndex:"+shardIndex);System.out.println("shardTotal:"+shardTotal);}

我们需要两个执行器,那只能启动两个MediaApplication项目

其中对于media-service-dev工程一定要在nacos配置本地优先策略

spring:cloud:config:override-none: true

-Dserver.port=63051 配置一下启动端口

-Dxxl.job.executor.port=9998 配置执行器的端口

image-20240109203208973

启动两个执行器,如下图所示:

image-20240109203431565

然后查看一下调度中心,是否有两个执行器

image-20240109203526928

要想让执行器执行任务,还需要在调度中心的任务管理注册任务

image-20240109203925973

执行任务

image-20240109203951905

观察控制台情况

分别是0号执行器和1号执行器,总共两个执行器

image-20240109204236627

image-20240109204252924

三、视频处理

3.1 技术方案

3.1.1 作业分片方案

此时如何保证多个执行器不会查询到重复的任务呢

将待处理的文件进行编号

两个执行器实例那么分片总数为2,序号为0、1

从任务1开始,如下:

1 % 2 = 1 执行器2执行

2 % 2 = 0 执行器1执行

3 % 2 = 1 执行器2执行

以此类推.

image-20240109204635421

3.1.2 保证任务不重复执行

多个执行器在并行执行,怎么保证任务不重复执行呢

比如说0号执行器有1号任务和3号任务,但是此时调度中心又来进行调度,那1号任务和3号任务可能又执行了一遍

如果一个执行器在处理一个视频还没有完成,此时调度中心又一次请求调度,为了不重复处理同一个视频该怎么办

  • 配置调度过期策略

调度过期策略:调度中心错过调度时间的补偿处理策略,包括:忽略、立即补偿触发一次等;
- 忽略:调度过期后,忽略过期的任务,从当前时间开始重新计算下次触发时间;
- 立即执行一次:调度过期后,立即执行一次,并从当前时间开始重新计算下次触发时间;

这里我们选择忽略,如果立即执行一次就可能重复执行相同的任务

image-20240109205643699

  • 配置阻塞处理策略

    阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;

阻塞处理策略就是当前执行器正在执行任务还没有结束时调度中心进行任务调度,此时该如何处理

单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;
丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;
覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;

这里如果选择覆盖之前调度则可能重复执行任务,这里选择 丢弃后续调度来避免任务重复执行

image-20240109205902337

  • 保证任务处理的幂等性

任务的幂等性是指:对于数据的操作不论多少次,操作的结果始终是一致的

某一个视频不管是调度多少次,只会转码一次

某个视频转码成功后再来调度就不会进行了,因为已经转码成功了

什么是幂等性?它描述了一次和多次请求某一个资源对于资源本身应该具有同样的结果

幂等性是为了解决重复提交问题,比如:恶意刷单,重复支付等

解决幂等性常用的方案

1)数据库约束,比如:唯一索引,主键。

2)乐观锁,常用于数据库,更新数据时根据乐观锁状态去更新。

3)唯一序列号,操作传递一个唯一序列号,操作时判断与该序列号相等则执行。

基于以上分析,在执行器接收调度请求去执行视频处理任务时要实现视频处理的幂等性,要有办法去判断该视频是否处理完成,如果正在处理中或处理完则不再处理。这里我们在数据库视频处理表中添加处理状态字段,视频处理完成更新状态为完成,执行视频处理前判断状态是否完成,如果完成则不再处理

3.1.3 视频处理方案

边梳理整个视频上传及处理的业务流程

上传视频成功向视频处理待处理表添加记录

我们上传视频的代码已经做了,但是没有做向任务表插入一条待处理视频的任务

image-20240109211156990

视频处理的详细流程如下:

image-20240109211603845

1、任务调度中心广播作业分片。

2、执行器收到广播作业分片,从数据库读取待处理任务,读取未处理及处理失败的任务。

3、执行器更新任务为处理中,根据任务内容从MinIO下载要处理的文件。

4、执行器启动多线程去处理任务。

5、任务处理完成,上传处理后的视频到MinIO。

6、将更新任务处理结果,如果视频处理完成除了更新任务处理结果以外还要将文件的访问地址更新至任务处理表及文件表中,最后将任务完成记录写入历史表。

行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;
覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;

这里如果选择覆盖之前调度则可能重复执行任务,这里选择 丢弃后续调度来避免任务重复执行

[外链图片转存中…(img-Bdh0beJE-1704806448805)]

  • 保证任务处理的幂等性

任务的幂等性是指:对于数据的操作不论多少次,操作的结果始终是一致的

某一个视频不管是调度多少次,只会转码一次

某个视频转码成功后再来调度就不会进行了,因为已经转码成功了

什么是幂等性?它描述了一次和多次请求某一个资源对于资源本身应该具有同样的结果

幂等性是为了解决重复提交问题,比如:恶意刷单,重复支付等

解决幂等性常用的方案

1)数据库约束,比如:唯一索引,主键。

2)乐观锁,常用于数据库,更新数据时根据乐观锁状态去更新。

3)唯一序列号,操作传递一个唯一序列号,操作时判断与该序列号相等则执行。

基于以上分析,在执行器接收调度请求去执行视频处理任务时要实现视频处理的幂等性,要有办法去判断该视频是否处理完成,如果正在处理中或处理完则不再处理。这里我们在数据库视频处理表中添加处理状态字段,视频处理完成更新状态为完成,执行视频处理前判断状态是否完成,如果完成则不再处理

相关文章:

4.4 媒资管理模块 - 分布式任务处理介绍、视频处理技术方案

媒资管理模块 - 视频处理 文章目录 媒资管理模块 - 视频处理一、视频转码1.1 视频转码介绍1.2 FFmpeg 基本使用1.2.1 下载安装配置1.2.2 转码测试 1.3 工具类1.3.1 VideoUtil1.3.2 Mp4VideoUtil1.3.3 测试工具类 二、分布式任务处理2.1 分布式任务调度2.2 XXL-JOB 配置执行器 中…...

K8S集群重新初始化--详细过程

K8S集群重新初始化 0、当前环境1、master节点1.1、在master节点执行下面reset命令&#xff1a;1.2、手动清除配置信息&#xff0c;这一步很关键&#xff1a;1.3、重新引导集群1.4、创建配置目录&#xff0c;并复制权限配置文件到用户目录下&#xff1a;1.5 查看集群状态1.6 安装…...

服务器 Linux常见指令

删除文件 删除文件 单个删除&#xff1a;rm -f 文件名 rm -f 2018_12_26.stderrout.log.060121612 --执行完成即将这个文件删除删除文件夹 rm -rf 路径/目录名tar命令 压缩 tar -cvf [文件名].tar [文件目录] //打包成.tar文件 tar -jcvf [文件名].tar.bz2 [文件目录]…...

C++合并K个有序链表

本篇博客介绍如何使用C合并k个有序链表&#xff0c;在代码中会用到std::priority_queue&#xff0c;首先需要介绍一下std::priority_queue的用法&#xff0c;介绍完std::priority_queue后将介绍如何使用std::priority_queue来辅助合并k个有序链表。 一、C priority_queue用法介…...

win10在启动游戏时报错,提示“d3dx9_25.dll文件丢失”,怎么办?d3dx9_25.dll丢失如何自动修复

一、d3dx9_25.dll文件是什么&#xff1f; d3dx9_25.dll是DirectX的一部分&#xff0c;DirectX是一种由微软开发的专门处理与多媒体、游戏程序和视频相关的应用程序接口。d3dx9_25.dll文件是DirectX9中一个重要的dll文件&#xff0c;主要负责处理3D图形程序&#xff0c;作用是帮…...

16. 蒙特卡洛强化学习基本概念与算法框架

文章目录 1. 是什么2. 有何优点3. 基本概念3.1 立即回报3.2 累积回报3.3 状态值函数3.4 行为值函数3.4 回合&#xff08;或完整轨迹&#xff0c;episode&#xff09;3.5 多个回合&#xff08;或完整轨迹&#xff09;的描述 4.MC强化学习问题的正式描述5. 蒙特卡洛&#xff08;M…...

QT中程序执行时间精准计算的三种方法及对比

一.QT程序在提升程序性能的调试中经常要计算一段程序的执行时间&#xff0c;下面介绍两种简单的实现方式&#xff0c;精确度都可以达到ms。 1.方式一 &#xff08;1&#xff09;代码&#xff1a; #include <QDateTime> qDebug() << "Current_date_and_tim…...

js下载方法分享*

JavaScript可以使用浏览器的API实现文件的下载&#xff0c;以下是一种常用的方法&#xff1a; 假设你已经有了一个文件 URL&#xff0c;你可以创建一个新的 a 标签&#xff0c;并将 href 属性设置为文件的 URL&#xff0c;然后模拟点击这个标签以开始下载。 function downloa…...

C# Stopwatch类_性能_时间计时器

文章只含部分属性方法等&#xff0c;有想了解全面的在下面链接中可以查看&#xff1a;.NET API browser Stopwatch 类 (System.Diagnostics) | Microsoft Learn 一、什么是Stopwatch Stopwatch&#xff1a;提供一组方法和属性&#xff0c;可以准确的测量运行时间。使用的时候需…...

鸿蒙原生应用再添新丁!天眼查 入局鸿蒙

鸿蒙原生应用再添新丁&#xff01;天眼查 入局鸿蒙 来自 HarmonyOS 微博1月12日消息&#xff0c;#天眼查启动鸿蒙原生应用开发#作为累计用户数超6亿的头部商业信息查询平台&#xff0c;天眼查可以为商家企业&#xff0c;职场人士以及普通消费者等用户便捷和安全地提供查询海量…...

HarmonyOS4.0——ArkUI应用说明

一、ArkUI框架简介 ArkUI开发框架是方舟开发框架的简称&#xff0c;它是一套构建 HarmonyOS / OpenHarmony 应用界面的声明式UI开发框架&#xff0c;它使用极简的UI信息语法、丰富的UI组件以及实时界面语言工具&#xff0c;帮助开发者提升应用界面开发效率 30%&#xff0c;开发…...

基于模块自定义扩展字段的后端逻辑实现(二)

目录 一&#xff1a;创建表 二&#xff1a;代码逻辑 上一节我们详细讲解了自定义扩展字段的逻辑实现和表的设计&#xff0c;这一节我们以一个具体例子演示下&#xff0c;如何实现一个订单模块的自定义扩展数据。 一&#xff1a;创建表 订单主表: CREATE TABLE t_order ( …...

图像中部分RGB矩阵可视化

图像中部分RGB可视化 今天室友有个需求就是模仿下面这张图画个示意图&#xff1a; 大致就是把图像中的一小部分区域的RGB值可视化了一下。他居然不知道该怎么画&#xff0c;我寻思这不直接秒了。 import cv2 as cv import numpy as np import matplotlib.pyplot as pltclass …...

RPA财务机器人在厦门市海沧医院财务管理流程优化汇总的应用

目前国内外研究人员对于RPA机器人在财务管理流程优化领域中的应用研究层出不穷&#xff0c;但现有研究成果主要集中在财务业务单一领域&#xff0c;缺乏财务管理整体流程一体化管控的研究。RPA机器人的功能绝非单一的财务业务处理&#xff0c;无论从自身技术发展&#xff0c;或…...

聚焦老年生活与健康,“老有所依·情暖夕阳”元岗街社区微型养老博览会顺利开展

尊老敬老是中华民族的传统美德&#xff0c; 爱老助老是全社会的共同责任。 家有一老&#xff0c;如有一宝&#xff0c; 长者的生活情况是一个家庭的头等大事&#xff0c; 做好长者服务是街道和社区的重要工作。 2024年1月6日&#xff0c;由元岗街道党工委、元岗街道办事处、…...

记录汇川:H5U与Factory IO测试12

主程序&#xff1a; 子程序&#xff1a; IO映射 子程序&#xff1a; 辅助出料 子程序&#xff1a; 自动程序 Factory IO配置&#xff1a; 实际动作如下&#xff1a; Factory IO测试12...

PingCAP 受邀参加 FICC 2023,获 Open100 世纪全球开源贡献奖

2023 年 12 月&#xff0c;2023 国际测试委员会智能计算与芯片联邦大会&#xff08;FICC 2023&#xff09;在海南三亚举办&#xff0c;中外院士和数十位领域专家莅临出席。 大会现场 &#xff0c;开放源代码促进会创始人 Bruce Perens 颁发了 Open100 世纪全球开源贡献奖&…...

10-skywalking告警

https://github.com/apache/skywalking/blob/master/docs/en/setup/backend/backend-alarm.md 5.1&#xff1a;告警指标 ~$ vim /apps/apache-skywalking-apm-bin/config/oal/core.oal service_resp_time # 服务的响应时间 service_sla # 服务http请求成功率SLV&#xff0c;比…...

vue前端开发自学,插槽练习第二次,name属性的使用

vue前端开发自学,插槽练习第二次,name属性的使用!可以使用name属性&#xff0c;来自定义一个名字&#xff0c;这样&#xff0c;就可以在一个组件内同时出现多个插槽的内容了。在子组件内接收的时候&#xff0c;很简答&#xff0c;只需要在slot标签里面加上name“mz”&#xff1…...

AI副业拆解:人像卡通化,赋予你的形象全新生命力

大家好我是在看&#xff0c;记录普通人学习探索AI之路。 &#x1f525;让你的形象瞬间穿越二次元&#xff01;&#x1f680;人像卡通化&#xff0c;捕捉你的独特魅力&#xff0c;让真实与梦幻在此刻交融。&#x1f3a8; 今天为大家介绍如何免费把人像卡通化--漫画风 https://w…...

应用升级/灾备测试时使用guarantee 闪回点迅速回退

1.场景 应用要升级,当升级失败时,数据库回退到升级前. 要测试系统,测试完成后,数据库要回退到测试前。 相对于RMAN恢复需要很长时间&#xff0c; 数据库闪回只需要几分钟。 2.技术实现 数据库设置 2个db_recovery参数 创建guarantee闪回点&#xff0c;不需要开启数据库闪回。…...

oracle与MySQL数据库之间数据同步的技术要点

Oracle与MySQL数据库之间的数据同步是一个涉及多个技术要点的复杂任务。由于Oracle和MySQL的架构差异&#xff0c;它们的数据同步要求既要保持数据的准确性和一致性&#xff0c;又要处理好性能问题。以下是一些主要的技术要点&#xff1a; 数据结构差异 数据类型差异&#xff…...

三体问题详解

从物理学角度&#xff0c;三体问题之所以不稳定&#xff0c;是因为三个天体在万有引力作用下相互作用&#xff0c;形成一个非线性耦合系统。我们可以从牛顿经典力学出发&#xff0c;列出具体的运动方程&#xff0c;并说明为何这个系统本质上是混沌的&#xff0c;无法得到一般解…...

leetcodeSQL解题:3564. 季节性销售分析

leetcodeSQL解题&#xff1a;3564. 季节性销售分析 题目&#xff1a; 表&#xff1a;sales ---------------------- | Column Name | Type | ---------------------- | sale_id | int | | product_id | int | | sale_date | date | | quantity | int | | price | decimal | -…...

AI编程--插件对比分析:CodeRider、GitHub Copilot及其他

AI编程插件对比分析&#xff1a;CodeRider、GitHub Copilot及其他 随着人工智能技术的快速发展&#xff0c;AI编程插件已成为提升开发者生产力的重要工具。CodeRider和GitHub Copilot作为市场上的领先者&#xff0c;分别以其独特的特性和生态系统吸引了大量开发者。本文将从功…...

Android Bitmap治理全解析:从加载优化到泄漏防控的全生命周期管理

引言 Bitmap&#xff08;位图&#xff09;是Android应用内存占用的“头号杀手”。一张1080P&#xff08;1920x1080&#xff09;的图片以ARGB_8888格式加载时&#xff0c;内存占用高达8MB&#xff08;192010804字节&#xff09;。据统计&#xff0c;超过60%的应用OOM崩溃与Bitm…...

分布式增量爬虫实现方案

之前我们在讨论的是分布式爬虫如何实现增量爬取。增量爬虫的目标是只爬取新产生或发生变化的页面&#xff0c;避免重复抓取&#xff0c;以节省资源和时间。 在分布式环境下&#xff0c;增量爬虫的实现需要考虑多个爬虫节点之间的协调和去重。 另一种思路&#xff1a;将增量判…...

A2A JS SDK 完整教程:快速入门指南

目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库&#xff…...

FFmpeg:Windows系统小白安装及其使用

一、安装 1.访问官网 Download FFmpeg 2.点击版本目录 3.选择版本点击安装 注意这里选择的是【release buids】&#xff0c;注意左上角标题 例如我安装在目录 F:\FFmpeg 4.解压 5.添加环境变量 把你解压后的bin目录&#xff08;即exe所在文件夹&#xff09;加入系统变量…...

永磁同步电机无速度算法--基于卡尔曼滤波器的滑模观测器

一、原理介绍 传统滑模观测器采用如下结构&#xff1a; 传统SMO中LPF会带来相位延迟和幅值衰减&#xff0c;并且需要额外的相位补偿。 采用扩展卡尔曼滤波器代替常用低通滤波器(LPF)&#xff0c;可以去除高次谐波&#xff0c;并且不用相位补偿就可以获得一个误差较小的转子位…...