Task任务调度系统的设计实现与常见问题解决

Prog.文明 工具 阅读 1,963
赞 7 收藏
二维码
手机扫码查看
反馈

项目初期的技术选型

最近做的那个后台管理系统项目,需要处理大量的异步任务,比如数据导出、批量处理、定时同步这些。一开始想着直接用Promise手动管理,结果发现事情比想象中复杂多了。

Task任务调度系统的设计实现与常见问题解决

项目中遇到了这样的场景:用户点击导出按钮后,系统要在后台跑一个耗时的任务,然后把结果存到数据库,前端轮询状态直到完成。刚开始只是想当然地写了几个Promise链,后来发现如果有多个并行任务,还要考虑失败重试、任务取消、进度显示这些问题,手写的管理代码很快就变得一团糟。

后来调整了方案,决定引入一个任务管理库来统一处理这些异步操作。调研了一下,最终选择了custom-task-runner这个库,主要是因为它轻量,API也比较直观。

核心代码实现

集成进来之后,整体架构变成了这样:

class TaskManager {
  constructor() {
    this.tasks = new Map();
    this.runningTasks = new Set();
    this.taskQueue = [];
  }

  // 创建新任务
  createTask(taskId, taskFunction, options = {}) {
    const task = {
      id: taskId,
      fn: taskFunction,
      status: 'pending',
      progress: 0,
      result: null,
      error: null,
      createdAt: Date.now(),
      ...options
    };
    
    this.tasks.set(taskId, task);
    return taskId;
  }

  // 执行任务
  async executeTask(taskId) {
    if (!this.tasks.has(taskId)) {
      throw new Error(Task ${taskId} not found);
    }

    const task = this.tasks.get(taskId);
    this.runningTasks.add(taskId);
    task.status = 'running';

    try {
      const result = await task.fn((progress) => {
        task.progress = Math.min(progress, 95); // 最大95%,完成时才100
        this.updateProgress(taskId, progress);
      });
      
      task.result = result;
      task.status = 'completed';
      task.progress = 100;
    } catch (error) {
      task.error = error;
      task.status = 'failed';
      console.error(Task ${taskId} failed:, error);
    } finally {
      this.runningTasks.delete(taskId);
      this.taskQueue = this.taskQueue.filter(id => id !== taskId);
    }
  }

  // 添加到队列执行
  enqueue(taskId, priority = 0) {
    if (this.tasks.get(taskId).status === 'pending') {
      this.taskQueue.push({ id: taskId, priority });
      this.processQueue();
    }
  }

  // 处理队列中的任务
  async processQueue() {
    if (this.runningTasks.size >= 3) return; // 最多同时运行3个任务
    
    const pendingTask = this.taskQueue
      .filter(item => !this.runningTasks.has(item.id))
      .sort((a, b) => b.priority - a.priority)[0];

    if (pendingTask) {
      await this.executeTask(pendingTask.id);
      setTimeout(() => this.processQueue(), 100); // 处理下一个任务
    }
  }

  updateProgress(taskId, progress) {
    const task = this.tasks.get(taskId);
    if (task) {
      task.progress = progress;
    }
  }

  getTaskStatus(taskId) {
    return this.tasks.get(taskId);
  }

  cancelTask(taskId) {
    if (this.runningTasks.has(taskId)) {
      // 这里需要实现中断机制
      console.log(Cancelling running task: ${taskId});
    }
    const task = this.tasks.get(taskId);
    if (task) {
      task.status = 'cancelled';
    }
  }
}

// 全局实例
const taskManager = new TaskManager();
window.taskManager = taskManager; // 方便调试

实际使用的时候是这样的:

// 导出数据任务
async function exportDataTask(progressCallback) {
  const totalSteps = 100;
  
  for (let i = 0; i <= totalSteps; i++) {
    // 模拟数据处理
    await new Promise(resolve => setTimeout(resolve, 50));
    
    if (progressCallback) {
      progressCallback((i / totalSteps) * 95);
    }
  }
  
  // 模拟API调用
  const response = await fetch('https://jztheme.com/api/export', {
    method: 'POST',
    body: JSON.stringify({
      filename: report_${Date.now()}.xlsx,
      userId: localStorage.getItem('userId')
    })
  });
  
  return await response.json();
}

// 使用示例
function handleExportClick() {
  const taskId = taskManager.createTask(
    export_${Date.now()},
    exportDataTask,
    { type: 'data_export' }
  );
  
  taskManager.enqueue(taskId, 1); // 高优先级
  
  // 监听任务状态变化
  const interval = setInterval(() => {
    const task = taskManager.getTaskStatus(taskId);
    if (task) {
      document.getElementById('progress').textContent = ${Math.round(task.progress)}%;
      
      if (task.status === 'completed' || task.status === 'failed') {
        clearInterval(interval);
        if (task.status === 'completed') {
          alert('导出完成!');
          window.open(task.result.downloadUrl);
        } else {
          alert('导出失败,请重试');
        }
      }
    }
  }, 500);
}

最大的坑:并发控制和内存泄漏

开始没想到并发控制会有这么大的坑。最开始的版本没有限制同时运行的任务数量,用户点击多个导出按钮,系统就会瞬间启动几十个任务,浏览器直接卡死。这个问题折腾了半天才发现,CPU占用率达到100%,内存占用也疯涨。

后来加了并发控制,限制最多同时运行3个任务,这样体验就正常了。不过这里需要注意,如果某些任务特别耗时,可能会阻塞后面的队列,所以还得加上优先级控制。

另一个比较麻烦的问题是内存泄漏。每次创建任务都会生成闭包,如果任务特别多,GC压力会很大。这里踩过好几次坑,最后的解决方案是定期清理已完成的任务:

// 定期清理已完成的任务
setInterval(() => {
  const now = Date.now();
  for (const [id, task] of this.tasks.entries()) {
    if (task.status === 'completed' && now - task.createdAt > 300000) { // 5分钟后自动清理
      this.tasks.delete(id);
    }
  }
}, 60000); // 每分钟检查一次

API交互中的状态管理

和后端API的交互这部分也踩了不少坑。前端任务的状态要和后端保持一致,特别是长时间运行的任务,需要轮询后端获取最新状态。

// 同步后端状态
async function syncBackendTask(taskId) {
  try {
    const response = await fetch(https://jztheme.com/api/task/${taskId}, {
      headers: {
        'Authorization': Bearer ${localStorage.getItem(&#039;token&#039;)}
      }
    });
    
    const backendStatus = await response.json();
    const localTask = taskManager.getTaskStatus(taskId);
    
    if (localTask) {
      localTask.progress = backendStatus.progress;
      localTask.status = backendStatus.status;
      localTask.result = backendStatus.result;
    }
  } catch (error) {
    console.error('Failed to sync backend task:', error);
  }
}

这里有个问题就是轮询频率不好控制,太频繁会影响服务器性能,太慢用户体验又不好。最后设置成根据任务进度动态调整轮询间隔:

// 动态轮询
function startDynamicPolling(taskId) {
  let pollInterval = 1000; // 起始间隔1秒
  const maxInterval = 10000; // 最大间隔10秒
  
  const poll = async () => {
    const task = taskManager.getTaskStatus(taskId);
    
    if (task && task.status === 'running') {
      await syncBackendTask(taskId);
      
      // 根据进度调整轮询间隔
      if (task.progress > 80) {
        pollInterval = Math.min(pollInterval * 1.2, maxInterval);
      } else {
        pollInterval = 1000; // 前期快速响应
      }
      
      setTimeout(poll, pollInterval);
    }
  };
  
  poll();
}

最终效果评估

最终跑起来效果还不错,用户反馈说任务处理比之前流畅多了。最大的改进是前端不会因为大量任务而卡死,而且有明确的进度提示,用户体验提升明显。

但是这个方案也不是完美无缺的。任务取消功能还有些问题,对于已经在执行的异步操作很难真正中断,只能标记为已取消。另外,复杂的任务依赖关系还没有很好的支持,这个在后续版本中可能要考虑加入。

性能方面,经过优化后CPU占用率基本维持在正常范围,内存泄漏的问题也通过定时清理得到了缓解。不过对于特别大量的任务队列,还是建议分批处理。

一些小优化

还有一些小的优化点,比如添加了任务分组功能,不同类型的任务可以在UI上分开显示:

// 任务分组统计
function getTaskStatsByType() {
  const stats = {};
  
  for (const task of this.tasks.values()) {
    if (!stats[task.type]) {
      stats[task.type] = { 
        total: 0, 
        completed: 0, 
        running: 0, 
        failed: 0 
      };
    }
    
    stats[task.type].total++;
    
    switch (task.status) {
      case 'completed':
        stats[task.type].completed++;
        break;
      case 'running':
        stats[task.type].running++;
        break;
      case 'failed':
        stats[task.type].failed++;
        break;
    }
  }
  
  return stats;
}

这个功能让用户能更好地了解不同类型任务的执行情况,对于运维和问题排查都有帮助。

回顾与反思

这次任务系统的重构算是比较成功的,至少解决了之前的手动管理混乱问题。虽然还有一些细节可以优化,比如更好的错误恢复机制、更细粒度的权限控制等,但整体架构已经比较稳定了。

最大的收获是意识到异步任务管理不能想当然地处理,一定要考虑到并发、状态同步、资源消耗这些实际问题。现在回头看之前的代码,确实太天真了。

以上是我踩坑后的总结,希望对你有帮助。这个方案应该还有不少优化空间,有更好实现方式的朋友欢迎交流。

本文章不代表JZTHEME立场,仅为作者个人观点 / 研究心得 / 经验分享,旨在交流探讨,供读者参考。
发表评论

暂无评论