Stream流开发实战从入门到进阶的那些事儿

Mr-子怡 前端 阅读 2,816
赞 15 收藏
二维码
手机扫码查看
反馈

我的写法,亲测靠谱

先说说我用Stream流的一些习惯吧。最近在处理一个大文件上传的需求,服务器要求分片上传,前端要对文件进行切割和并发控制。这种场景下,Stream流简直是神器。

Stream流开发实战从入门到进阶的那些事儿

核心代码其实就这几行:

function* createFileChunk(file, size = 1024 * 1024) {
    let current = 0;
    while (current < file.size) {
        yield {
            chunk: file.slice(current, current + size),
            index: Math.floor(current / size)
        };
        current += size;
    }
}

const uploadStream = async (file) => {
    const chunks = createFileChunk(file);
    for (let chunk of chunks) {
        await fetch('https://jztheme.com/api/upload', {
            method: 'POST',
            body: chunk.chunk
        });
    }
}

这里有几个关键点:第一,我用了Generator函数来创建迭代器,这样可以精确控制每次的yield。第二,fetch请求放在for…of循环里,配合async/await实现串行上传。这样做最大的好处是可控性很强,不会一下子把所有分片都塞进内存。

这几种错误写法,别再踩坑了

说真的,在用Stream流的过程中,我踩过的坑还真不少。最常见的一种错误写法是这样的:

// 错误示范
const chunks = [];
for (let i = 0; i < file.size; i += chunkSize) {
    chunks.push(file.slice(i, i + chunkSize));
}
chunks.forEach(async (chunk) => {
    await fetch('https://jztheme.com/api/upload', { 
        method: 'POST',
        body: chunk 
    });
});

这段代码看起来没毛病,实际运行会出大事。问题出在forEach上,它不会等待异步操作完成,导致所有分片几乎同时发起请求,瞬间就把带宽占满了。

另一个常见的坑是直接用Blob的stream()方法:

// 另一个坑
file.stream().getReader().read().then(({done, value}) => {
    // 处理数据
});

这个写法的问题在于它太底层了,你需要手动处理读取结束、错误处理、backpressure(背压)等问题。我就曾经因为没处理好背压,导致内存占用飙升,页面直接卡死。

实际项目中的坑

在真实项目中,最容易忽略的就是错误重试机制。记得去年做一个视频处理的项目,用户上传4K视频,经常会出现网络波动导致上传中断。当时我就想着简单加个try-catch完事:

try {
    await fetch('https://jztheme.com/api/upload', { 
        method: 'POST',
        body: chunk 
    });
} catch(e) {
    console.error('上传失败');
}

结果被打脸打得很惨。这种写法根本没法自动重试,用户体验特别差。后来改成了这样:

const retryFetch = async (url, options, retries = 3) => {
    try {
        return await fetch(url, options);
    } catch(e) {
        if(retries > 0) {
            return retryFetch(url, options, retries - 1);
        }
        throw e;
    }
}

加上指数退避算法后更完美:

const retryFetch = async (url, options, retries = 3, backoff = 300) => {
    try {
        return await fetch(url, options);
    } catch(e) {
        if(retries > 0) {
            await new Promise(resolve => setTimeout(resolve, backoff));
            return retryFetch(url, options, retries - 1, backoff * 2);
        }
        throw e;
    }
}

这个方案不是最完美的,但胜在简单实用。至少用户不用再频繁遇到上传失败的情况了。

性能优化的小技巧

关于性能优化,我总结了几点实战经验:

  • 控制并发数:不要一股脑把所有分片都并发出去,用Promise.all或者自己实现一个简单的并发控制器
  • 合理设置缓冲区大小:太大容易内存溢出,太小又会影响性能,我一般设在1-5MB之间
  • 及时释放引用:处理完的数据要及时置null,让GC能尽快回收

这里有个简单的并发控制示例:

class ConcurrencyControl {
    constructor(max) {
        this.max = max;
        this.queue = [];
        this.activeCount = 0;
    }

    add(task) {
        return new Promise((resolve, reject) => {
            this.queue.push({ task, resolve, reject });
            this.next();
        });
    }

    next() {
        if (this.activeCount < this.max && this.queue.length) {
            const { task, resolve, reject } = this.queue.shift();
            this.activeCount++;
            task()
                .then(resolve)
                .catch(reject)
                .finally(() => {
                    this.activeCount--;
                    this.next();
                });
        }
    }
}

结尾唠叨两句

以上是我总结的Stream流使用经验,希望能帮到正在踩坑的你。说实话,这些方案都不是完美的,但在实际项目中确实能解决问题。有更好的方案欢迎评论区交流,前端这条路,大家一起进步。

本文章不代表JZTHEME立场,仅为作者个人观点 / 研究心得 / 经验分享,旨在交流探讨,供读者参考。
发表评论

暂无评论