Task任务调度系统的设计实现与常见问题解决
项目初期的技术选型
最近做的那个后台管理系统项目,需要处理大量的异步任务,比如数据导出、批量处理、定时同步这些。一开始想着直接用Promise手动管理,结果发现事情比想象中复杂多了。
项目中遇到了这样的场景:用户点击导出按钮后,系统要在后台跑一个耗时的任务,然后把结果存到数据库,前端轮询状态直到完成。刚开始只是想当然地写了几个Promise链,后来发现如果有多个并行任务,还要考虑失败重试、任务取消、进度显示这些问题,手写的管理代码很快就变得一团糟。
后来调整了方案,决定引入一个任务管理库来统一处理这些异步操作。调研了一下,最终选择了custom-task-runner这个库,主要是因为它轻量,API也比较直观。
核心代码实现
集成进来之后,整体架构变成了这样:
class TaskManager {
constructor() {
this.tasks = new Map();
this.runningTasks = new Set();
this.taskQueue = [];
}
// 创建新任务
createTask(taskId, taskFunction, options = {}) {
const task = {
id: taskId,
fn: taskFunction,
status: 'pending',
progress: 0,
result: null,
error: null,
createdAt: Date.now(),
...options
};
this.tasks.set(taskId, task);
return taskId;
}
// 执行任务
async executeTask(taskId) {
if (!this.tasks.has(taskId)) {
throw new Error(Task ${taskId} not found);
}
const task = this.tasks.get(taskId);
this.runningTasks.add(taskId);
task.status = 'running';
try {
const result = await task.fn((progress) => {
task.progress = Math.min(progress, 95); // 最大95%,完成时才100
this.updateProgress(taskId, progress);
});
task.result = result;
task.status = 'completed';
task.progress = 100;
} catch (error) {
task.error = error;
task.status = 'failed';
console.error(Task ${taskId} failed:, error);
} finally {
this.runningTasks.delete(taskId);
this.taskQueue = this.taskQueue.filter(id => id !== taskId);
}
}
// 添加到队列执行
enqueue(taskId, priority = 0) {
if (this.tasks.get(taskId).status === 'pending') {
this.taskQueue.push({ id: taskId, priority });
this.processQueue();
}
}
// 处理队列中的任务
async processQueue() {
if (this.runningTasks.size >= 3) return; // 最多同时运行3个任务
const pendingTask = this.taskQueue
.filter(item => !this.runningTasks.has(item.id))
.sort((a, b) => b.priority - a.priority)[0];
if (pendingTask) {
await this.executeTask(pendingTask.id);
setTimeout(() => this.processQueue(), 100); // 处理下一个任务
}
}
updateProgress(taskId, progress) {
const task = this.tasks.get(taskId);
if (task) {
task.progress = progress;
}
}
getTaskStatus(taskId) {
return this.tasks.get(taskId);
}
cancelTask(taskId) {
if (this.runningTasks.has(taskId)) {
// 这里需要实现中断机制
console.log(Cancelling running task: ${taskId});
}
const task = this.tasks.get(taskId);
if (task) {
task.status = 'cancelled';
}
}
}
// 全局实例
const taskManager = new TaskManager();
window.taskManager = taskManager; // 方便调试
实际使用的时候是这样的:
// 导出数据任务
async function exportDataTask(progressCallback) {
const totalSteps = 100;
for (let i = 0; i <= totalSteps; i++) {
// 模拟数据处理
await new Promise(resolve => setTimeout(resolve, 50));
if (progressCallback) {
progressCallback((i / totalSteps) * 95);
}
}
// 模拟API调用
const response = await fetch('https://jztheme.com/api/export', {
method: 'POST',
body: JSON.stringify({
filename: report_${Date.now()}.xlsx,
userId: localStorage.getItem('userId')
})
});
return await response.json();
}
// 使用示例
function handleExportClick() {
const taskId = taskManager.createTask(
export_${Date.now()},
exportDataTask,
{ type: 'data_export' }
);
taskManager.enqueue(taskId, 1); // 高优先级
// 监听任务状态变化
const interval = setInterval(() => {
const task = taskManager.getTaskStatus(taskId);
if (task) {
document.getElementById('progress').textContent = ${Math.round(task.progress)}%;
if (task.status === 'completed' || task.status === 'failed') {
clearInterval(interval);
if (task.status === 'completed') {
alert('导出完成!');
window.open(task.result.downloadUrl);
} else {
alert('导出失败,请重试');
}
}
}
}, 500);
}
最大的坑:并发控制和内存泄漏
开始没想到并发控制会有这么大的坑。最开始的版本没有限制同时运行的任务数量,用户点击多个导出按钮,系统就会瞬间启动几十个任务,浏览器直接卡死。这个问题折腾了半天才发现,CPU占用率达到100%,内存占用也疯涨。
后来加了并发控制,限制最多同时运行3个任务,这样体验就正常了。不过这里需要注意,如果某些任务特别耗时,可能会阻塞后面的队列,所以还得加上优先级控制。
另一个比较麻烦的问题是内存泄漏。每次创建任务都会生成闭包,如果任务特别多,GC压力会很大。这里踩过好几次坑,最后的解决方案是定期清理已完成的任务:
// 定期清理已完成的任务
setInterval(() => {
const now = Date.now();
for (const [id, task] of this.tasks.entries()) {
if (task.status === 'completed' && now - task.createdAt > 300000) { // 5分钟后自动清理
this.tasks.delete(id);
}
}
}, 60000); // 每分钟检查一次
API交互中的状态管理
和后端API的交互这部分也踩了不少坑。前端任务的状态要和后端保持一致,特别是长时间运行的任务,需要轮询后端获取最新状态。
// 同步后端状态
async function syncBackendTask(taskId) {
try {
const response = await fetch(https://jztheme.com/api/task/${taskId}, {
headers: {
'Authorization': Bearer ${localStorage.getItem('token')}
}
});
const backendStatus = await response.json();
const localTask = taskManager.getTaskStatus(taskId);
if (localTask) {
localTask.progress = backendStatus.progress;
localTask.status = backendStatus.status;
localTask.result = backendStatus.result;
}
} catch (error) {
console.error('Failed to sync backend task:', error);
}
}
这里有个问题就是轮询频率不好控制,太频繁会影响服务器性能,太慢用户体验又不好。最后设置成根据任务进度动态调整轮询间隔:
// 动态轮询
function startDynamicPolling(taskId) {
let pollInterval = 1000; // 起始间隔1秒
const maxInterval = 10000; // 最大间隔10秒
const poll = async () => {
const task = taskManager.getTaskStatus(taskId);
if (task && task.status === 'running') {
await syncBackendTask(taskId);
// 根据进度调整轮询间隔
if (task.progress > 80) {
pollInterval = Math.min(pollInterval * 1.2, maxInterval);
} else {
pollInterval = 1000; // 前期快速响应
}
setTimeout(poll, pollInterval);
}
};
poll();
}
最终效果评估
最终跑起来效果还不错,用户反馈说任务处理比之前流畅多了。最大的改进是前端不会因为大量任务而卡死,而且有明确的进度提示,用户体验提升明显。
但是这个方案也不是完美无缺的。任务取消功能还有些问题,对于已经在执行的异步操作很难真正中断,只能标记为已取消。另外,复杂的任务依赖关系还没有很好的支持,这个在后续版本中可能要考虑加入。
性能方面,经过优化后CPU占用率基本维持在正常范围,内存泄漏的问题也通过定时清理得到了缓解。不过对于特别大量的任务队列,还是建议分批处理。
一些小优化
还有一些小的优化点,比如添加了任务分组功能,不同类型的任务可以在UI上分开显示:
// 任务分组统计
function getTaskStatsByType() {
const stats = {};
for (const task of this.tasks.values()) {
if (!stats[task.type]) {
stats[task.type] = {
total: 0,
completed: 0,
running: 0,
failed: 0
};
}
stats[task.type].total++;
switch (task.status) {
case 'completed':
stats[task.type].completed++;
break;
case 'running':
stats[task.type].running++;
break;
case 'failed':
stats[task.type].failed++;
break;
}
}
return stats;
}
这个功能让用户能更好地了解不同类型任务的执行情况,对于运维和问题排查都有帮助。
回顾与反思
这次任务系统的重构算是比较成功的,至少解决了之前的手动管理混乱问题。虽然还有一些细节可以优化,比如更好的错误恢复机制、更细粒度的权限控制等,但整体架构已经比较稳定了。
最大的收获是意识到异步任务管理不能想当然地处理,一定要考虑到并发、状态同步、资源消耗这些实际问题。现在回头看之前的代码,确实太天真了。
以上是我踩坑后的总结,希望对你有帮助。这个方案应该还有不少优化空间,有更好实现方式的朋友欢迎交流。

暂无评论