学成在线 - 第3章任务补偿机制实现 + 分块文件清理

7.9 额外实现

7.9.1 任务补偿机制

问题:如果有线程抢占了某个视频的处理任务,如果线程处理过程中挂掉了,该视频的状态将会一直是处理中,其它线程将无法处理,这个问题需要用补偿机制。

单独启动一个任务找到待处理任务表中超过执行期限但仍在处理中的任务,将任务的状态改为执行失败。

任务执行期限是处理一个视频的最大时间,比如定为30分钟,通过任务的启动时间去判断任务是否超过执行期限。

大家思考这个sql该如何实现?

大家尝试自己实现此任务补偿机制。

数据库表结构

media_process:

在这里插入图片描述

根据status字段判断视频文件是否正在处理,如果status值为4(正在处理),且当前时间减去create_date超过30分钟,就把status的值修改为3(处理失败),并更新其他字段的值。

具体实现流程
  1. 编写检查超时的sql

    SELECT * FROM `media_process_history`
    WHERE TIMESTAMPDIFF(MINUTE,create_date,finish_date) < 30
    
  2. 在media_process的mapper接口中添加相应的接口

    MediaProcessMapper.java

    /*** 查询是否有执行超过30分钟的视频处理任务* @param nowDate 当前任务执行的时间* @return List<MediaProcess>*/
    @Select("SELECT * FROM media_process_history WHERE TIMESTAMPDIFF(MINUTE,create_date,#{date}) > 30")
    List<MediaProcess> selectTimeoutProcess(@Param("date")Date nowDate);
    
  3. 实现service代码

    /*** 超时任务列表* @param shardIndex 分片序号* @param shardTotal 分片总数* @param count 获取记录数(cpu核心数)* @return List<MediaProcess>*/
    @Override
    public List<MediaProcess> getTimeoutMediaProcessList(int shardIndex, int shardTotal, int count) {List<MediaProcess> mediaTimeoutProcessList = mediaProcessMapper.selectTimeoutProcess(LocalDateTime.now(), shardIndex, shardTotal, count);return mediaTimeoutProcessList;
    }
    
  4. 业务逻辑在task中实现

    查询相应超时记录,把status和errMsg字段更新后保存到media_process表里。

    /*** 处理视频超时任务* @throws Exception*/
    @XxlJob("videoTimeoutJobHandler")
    public void videoTimeoutJobHandler() throws Exception {// 分片参数int shardIndex = XxlJobHelper.getShardIndex();int shardTotal = XxlJobHelper.getShardTotal();List<MediaProcess> mediaTimeoutProcessList = null;int size = 0;try {// 取出cpu核心数作为一次检查超时的记录条数int availableProcessors = Runtime.getRuntime().availableProcessors();mediaTimeoutProcessList = mediaFileProcessService.getTimeoutMediaProcessList(LocalDateTime.now(), shardIndex, shardTotal, availableProcessors);size = mediaTimeoutProcessList.size();log.debug("取出的超时任务数量是 {} 条", size);if (size <= 0) {return ;}} catch (Exception e) {log.error("取出的超时任务超时", size);e.printStackTrace();}// 启动size个线程的线程池ExecutorService fixedThreadPool = Executors.newFixedThreadPool(size);// 定义size个插销CountDownLatch countDownLatch = new CountDownLatch(size);mediaTimeoutProcessList.forEach(mediaProcess -> {// 将任务加入线程池fixedThreadPool.execute(() -> {try {// 把status值设置为3,并把errMsg更新Long taskId = mediaProcess.getId();String fileId = mediaProcess.getFileId();mediaFileProcessService.saveProcessFinishStatus(taskId, "3", fileId, null, "处理视频超时(30分钟)");log.debug("删除超时任务");} catch (Exception e) {e.printStackTrace();log.error("删除超时任务失败,失败原因: {}", e.getMessage());} finally {countDownLatch.countDown();}});});// 等待,给一个充裕的超时事件,防止无限等待,到达超时时间还没有处理完成则结束任务countDownLatch.await(30, TimeUnit.MINUTES);
    }
    

7.9.2 达到最大失败次数(暂时未实现)

问题:需要找到前端对应的接口。

当任务达到最大失败次数时一般就说明程序处理此视频存在问题,这种情况就需要人工处理,在页面上会提示失败的信息,人工可手动执行该视频进行处理,或通过其它转码工具进行视频转码,转码后直接上传mp4视频。

7.9.3 分块文件清理问题

上传一个文件进行分块上传,上传一半不传了,之前上传到minio的分块文件要清理吗?怎么做的?

1、在数据库中有一张文件表记录minio中存储的文件信息。

2、文件开始上传时会写入文件表,状态为上传中,上传完成会更新状态为上传完成。(实现过程中并没有往文件表中插入正在上传的文件记录,只是在media_minio_files临时保存了分块信息)

3、当一个文件传了一半不再上传了说明该文件没有上传完成,会有定时任务去查询文件表中的记录,如果文件未上传完成则删除minio中没有上传成功的文件目录。

实现思路

难点:怎么判断文件上传了一半不再上传了?上传分块文件只在上传视频时有用,上传分块文件时不会往media_files表里添加记录,只有所有分块上传完成,并合并成功后,才会往media_files里插入数据。所以要建立一个media_minio_files表,每上传一个分块就往media_minio_files表里插入一条分块信息,记录包括分块的上传时间。如果超过30分钟分块记录还没有被删除,说明上传到一半不传了,把minio里的分块目录删除,并删除对应的数据库里的记录。在文件合并成功后,数据库里的分块文件上传记录也要删除。

media_minio_files表结构

在这里插入图片描述

具体实现流程
  1. 上传文件块的时候,上传一个文件块完毕要把这个块的信息保存到表里。

    /*** 把文件分块信息入库* @param fileMd5 文件md5* @param fileName 文件名称* @param bucket minio桶* @param objectName minio中块的存储路径* @param chunkSize 块大小* @return MediaMinioFiles*/
    @Transactional
    @Override
    public MediaMinioFiles addMediaChunkToDb(String fileMd5, String fileName, String bucket, String objectName, Long chunkSize) {MediaMinioFiles mediaMinioFile = new MediaMinioFiles();mediaMinioFile.setFileId(fileMd5);mediaMinioFile.setFilename(fileName);mediaMinioFile.setBucket(bucket);mediaMinioFile.setFilePath(objectName);mediaMinioFile.setFileSize(chunkSize);mediaMinioFile.setStatus("4");  // 上传中int insert = mediaMinioFilesMapper.insert(mediaMinioFile);if (insert < 0) {log.error("保存分块文件信息到数据库失败,{}", mediaMinioFile.toString());XueChengPlusException.cast("保存分块文件信息失败");}log.debug("保存分块文件信息到数据库成功,{}", mediaMinioFile.toString());return mediaMinioFile;
    }/*** 上传分块* @param fileMd5 文件md5* @param chunk 分块序号* @param localChunkFilePath 分块文件本地路径* @return RestResponse*/
    @Override
    public RestResponse uploadChunk(String fileMd5, int chunk, String localChunkFilePath) {// 得到分块文件的目录路径String chunkFileFolderPath = getChunkFileFolderPath(fileMd5);// 得到分块文件的路径String chunkFilePath = chunkFileFolderPath + chunk;// mimeTypeString mimeType = getMimeType(null);// 将文件存储到 minioboolean b = addMediaFilesToMinIO(localChunkFilePath, mimeType, bucketVideoFiles, chunkFilePath);// 将分块信息存到数据库中currentProxy.addMediaChunkToDb(fileMd5, null, bucketVideoFiles, chunkFilePath, 1024 * 5L);if (!b) {log.debug("上传分块文件失败: {}", chunkFilePath);return RestResponse.validfail(false, "上传分块失败");}log.debug("上传分块文件成功: {}", chunkFilePath);return RestResponse.success(true);
    }
    
  2. 如果没有上传失败,在成功合并块文件之后,要把上面数据库中记录删除。

    /*** 删除数据库中的分块文件上传记录* @param fileMd5 文件md5*/
    @Transactional
    public void clearChunkFromDb(String fileMd5) {LambdaQueryWrapper<MediaMinioFiles> deleteQueryWrapper = new LambdaQueryWrapper<>();deleteQueryWrapper.eq(MediaMinioFiles::getFileId, fileMd5);try {mediaMinioFilesMapper.delete(deleteQueryWrapper);log.debug("删除分块上传记录成功");} catch (Exception e) {e.printStackTrace();log.error("删除分块上传记录失败");}
    }
    

    在合并分块的最后几行:

    // 文件入库
    currentProxy.addMediaFilesToDb(companyId, fileMd5, uploadFileParamsDto, bucketVideoFiles, mergeFilePath);
    // 清除文件分块
    clearChunkFiles(chunkFileFolderPath, chunkTotal);
    // 清除数据库中文件分块上传信息
    currentProxy.clearChunkFromDb(fileMd5);
    return RestResponse.success(true);
    
  3. 如果上传文件一半失败,要根据数据库中media_minio_files表中存的块的路径删除所有超时块。

    首先查询所有的超时块,得到上传的超时块列表:

    /*** 拿到数据库中所有上传超时的文件分块信息* @param time 当前任务执行时间* @return List<MediaMinioFiles>*/
    @Override
    public List<MediaMinioFiles> getChunkTimeoutFiles(LocalDateTime time) {List<MediaMinioFiles> mediaMinioFiles = mediaMinioFilesMapper.selectTimeoutChunks(DateUtil.toDateTime(time));return mediaMinioFiles;
    }
    

    删除minio中所有超时块,并且删除所有超时记录

    @XxlJob("videoChunkTimeoutJobHandler")
    public void videoChunkTimeoutJobHandler() {log.debug(">>>>>>>>>> 开始执行检查上传超时块任务");// 拿到所有的超时分块任务List<MediaMinioFiles> chunkTimeoutFiles = mediaFileService.getChunkTimeoutFiles(LocalDateTime.now());if (chunkTimeoutFiles == null) {  // 没有超时任务log.debug("没有超时任务");return ;}// 根据记录中的file_path,删除minio中的所有文件chunkTimeoutFiles.forEach(chunk -> {String filePath = chunk.getFilePath();mediaFileService.clearSingleChunkFile(filePath);});// 删除数据库中对应的记录mediaFileService.clearChunkFromDb(null, LocalDateTime.now());
    }
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/3018149.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

自动化机器学习——获得函数

自动化机器学习——获得函数 在自动化机器学习中&#xff0c;获得函数是一种用于优化算法的工具&#xff0c;它负责计算并返回待优化问题的值或梯度。本文将介绍获得函数的定义、作用、常用的获得函数&#xff0c;并通过Python实现示例代码来演示其效果&#xff0c;并最后进行…

最强特征点检测算法 DeDoDe v1/v2

论文地址v1:https://arxiv.org/pdf/2308.08479 论文地址v1:https://arxiv.org/pdf/2404.08928 代码地址:GitHub - Parskatt/DeDoDe: [3DV 2024 Oral] DeDoDe 🎶 Detect, Dont Describe --- Describe, Dont Detect, for Local Feature Matching 实测确实牛X! DeDoDeV1 关…

JAVA语言开发的(智慧校园系统源码)智慧校园的痛点、智慧校园的安全应用、智慧校园解决方案

一、智慧校园的痛点 1、信息孤岛问题&#xff1a;由于校园内各部门或系统独立开发&#xff0c;缺乏统一规划和标准&#xff0c;导致数据无法有效整合和共享&#xff0c;形成了信息孤岛。 2、技术更新与运维挑战&#xff1a;智慧校园的建设依赖于前沿的信息技术&#xff0c;如云…

网络网络层之(4)IPv4协议

网络网络层之(1)IPv4协议 Author: Once Day Date: 2024年4月4日 一位热衷于Linux学习和开发的菜鸟&#xff0c;试图谱写一场冒险之旅&#xff0c;也许终点只是一场白日梦… 漫漫长路&#xff0c;有人对你微笑过嘛… 全系列文档可参考专栏&#xff1a;通信网络技术_Once-Day的…

mysql workbench如何导出insert语句?

进行导出设置 导出的sql文件 CREATE DATABASE IF NOT EXISTS jeesite /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci */ /*!80016 DEFAULT ENCRYPTIONN */; USE jeesite; -- MySQL dump 10.13 Distrib 8.0.28, for Win64 (x86_64) -- -- Host: 127.0…

【算法刷题 | 贪心算法09】4.30(单调递增的数字)

文章目录 16.单调递增的数字16.1题目16.2解法&#xff1a;贪心16.2.1贪心思路16.2.2代码实现 16.单调递增的数字 16.1题目 当且仅当每个相邻位数上的数字 x 和 y 满足 x < y 时&#xff0c;我们称这个整数是单调递增的。 给定一个整数 n &#xff0c;返回 小于或等于 n 的…

基于Tornado开发高性能多人在线麻将游戏

(超清)基于Tornado开发高性能多人在线麻将游戏 基于Tornado开发高性能多人在线麻将游戏可以按以下步骤进行&#xff1a; 1.项目规划和设计&#xff1a; 确定游戏的功能和要求&#xff0c;包括用户登录、游戏大厅、匹配系统、实时游戏、聊天功能等。 设计游戏的数据模型和逻…

C语言——每日一题(轮转数组)

一.前言 前不久学习了时间复杂度的概念&#xff0c;便在力扣上刷了一道需要参考时间复杂度的题——轮转数组 https://leetcode.cn/problems/rotate-array/submissions这道题不能使用暴力算法&#xff0c;因为这道题对时间复杂度的要求不能为O&#xff08;N^2&#xff09;。因…

基于svm的手写数字识别程序介绍(matlab)

1、程序界面介绍 该程序GUI界面包括手写板、手写数字可视化&#xff08;原图&#xff09;、对图像进行灰度处理&#xff08;灰度图&#xff09;、图像二值化处理&#xff08;二值化&#xff09;、图像特征可视化&#xff08;HOG特征&#xff08;方向梯度直方图&#xff09;&…

Java进阶06List集合泛型

Java进阶06 集合 一、集合及其体系结构 集合是一个长度可变的容器 1、集合的体系结构 1.1 单列集合 单列集合使用add()方法添加集合元素&#xff0c;一次只能添加一个元素。 单列集合均实现了Collection接口&#xff0c;该接口还有两个子接口List和Set。 List接口 List集合…

文件各种上传,离不开的表单 [html5]

作为程序员的我们&#xff0c;经常会要用到文件的上传和下载功能。到了需要用的时候&#xff0c;各种查资料。有木有..有木有...。为了方便下次使用&#xff0c;这里来做个总结和备忘。 利用表单实现文件上传 最原始、最简单、最粗暴的文件上传。 前端代码&#xff1a; //方…

简单了解泛型

基本数据类型和对应的包装类 在Java中, 基本数据类型不是继承自Object, 为了在泛型代码中可以支持基本类型, Java给每个基本类型都对应了一个包装类型. 简单来说就是让基本数据类型也能面向对象.基本数据类型可以使用很多方法, 这就必须让它变成类. 基本数据类型对定的包装类…

[Linux][网络][TCP][四][流量控制][拥塞控制]详细讲解

目录 1.流量控制2.拥塞控制0.为什么要有拥塞控制&#xff0c;不是有流量控制么&#xff1f;1.什么是拥塞窗口&#xff1f;和发送窗口有什么关系呢&#xff1f;2.怎么知道当前网络是否出现了拥塞呢&#xff1f;3.拥塞控制有哪些算法&#xff1f;4.慢启动5.拥塞避免6.拥塞发生7.快…

LabelImg下载及目标检测数据标注

为什么这一部分内容这么少会单独拎出来呢&#xff0c;因为后期会接着介绍YOLOv8中的其他任务&#xff0c;会使用其他软件进行标注&#xff0c;所以就单独区分开来每一个任务的标注方式了。 这一部分就介绍目标检测任务的标注&#xff0c;数据集是我从COCO2017Val中抽出来两类&a…

矩阵相关运算1

矩阵运算是线性代数中的一个核心部分&#xff0c;它包含了许多不同类型的操作&#xff0c;可以应用于各种科学和工程问题中。 矩阵加法和减法 矩阵加法和减法需要两个矩阵具有相同的维度。操作是逐元素进行的&#xff1a; CAB or CA−B其中 A,B 和 C 是矩阵&#xff0c;且 C…

RTT PIN设备学习

获取GPIO编号 GET_PIN(port, pin)#define LED_BLUE_PIN GET_PIN(A, 0)设置引脚模式 void rt_pin_mode(rt_base_t pin, rt_base_t mode);设置引脚电平 void rt_pin_write(rt_base_t pin, rt_base_t value);rt_base_t pin 同上&#xff0c; 为引脚编号&#xff0c;尽量通过宏定…

新建的springBoot WEB项目无法自动返回html模版(gradle+kotlin版本)

最近研究了springBoot创建web项目&#xff0c; 第一步服务端返回字符串没有问题&#xff0c;第二步返回html时&#xff0c;还是返回的字符串。 文章目录 一、参考方案二、新建springBoot web项目三、启动项目的三种方式 一、参考方案 将控制器类的 RestController 改为 Contro…

代码随想录算法训练营第六十天| 647. 回文子串,516.最长回文子序列,动态规划总结篇

题目与题解 参考资料&#xff1a;动态规划总结篇 647. 回文子串 题目链接&#xff1a;647. 回文子串 代码随想录题解&#xff1a;647. 回文子串 视频讲解&#xff1a;动态规划&#xff0c;字符串性质决定了DP数组的定义 | LeetCode&#xff1a;647.回文子串_哔哩哔哩_bilibili …

【再探】设计模式—适配器、装饰及外观模式

结构型设计模式是用于设计对象和类之间关系的一组设计模式。一共有7种&#xff1a;适配器模式、装饰器模式、外观模式、桥接模式、组合模式、享元模式及代理模式。 1 适配器模式 需求&#xff1a;在软件维护阶段&#xff0c;已存在的方法与目标接口不匹配&#xff0c;需要个中…

每日一题5:Pandas-修改列

一、每日一题 一家公司决定增加员工的薪水。 编写一个解决方案&#xff0c;将每个员工的薪水乘以2来 修改 salary 列。 返回结果格式如下示例所示。 解答&#xff1a; import pandas as pddef modifySalaryColumn(employees: pd.DataFrame) -> pd.DataFrame:employees.loc[…