Stream流实战:高效处理数据的现代前端技巧

ლ瑞玲 前端 阅读 2,929
赞 39 收藏
二维码
手机扫码查看
反馈

项目初期的技术选型

去年底接了个数据看板类的项目,客户要求实时展示后台数据变化,比如订单状态更新、库存变动这些。一开始我本能地想用 WebSocket,但后端同学说他们用的是 SSE(Server-Sent Events),而且已经封装好了接口。行吧,那就用 SSE。不过前端这边得自己处理流式数据的接收和渲染。

Stream流实战:高效处理数据的现代前端技巧

说实话,之前对 Stream 流的理解还停留在 Node.js 的文件读写层面,浏览器里的 ReadableStream 真没怎么碰过。但这次 SSE 返回的就是一个 MessageEvent 流,直接处理 text 也行,但总觉得不够“现代”。于是决定试试用 fetch + ReadableStream 来接管整个数据流——虽然最后发现这选择有点自讨苦吃。

核心代码就这几行

先贴个基础版,能跑通的那种:

const response = await fetch('https://jztheme.com/api/stream-data');
if (!response.body) {
  throw new Error('ReadableStream not supported');
}

const reader = response.body.getReader();
const decoder = new TextDecoder();

while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  const chunk = decoder.decode(value, { stream: true });
  // 这里处理 chunk,比如解析 JSON 并更新 UI
  handleChunk(chunk);
}

看起来挺清爽是吧?但实际项目哪有这么简单。数据不是整块来的,可能一个 JSON 对象被切成好几段,甚至跨多个 chunk。比如你收到 "{"id":1,"sta",下一段才是 "tus":"paid"}"。直接 JSON.parse 肯定炸。

最大的坑:数据边界对不齐

开始我以为只要拼字符串就行,搞了个 buffer:

let buffer = '';
function handleChunk(chunk) {
  buffer += chunk;
  const lines = buffer.split('n');
  // 最后一行可能是不完整的,留着下次拼
  buffer = lines.pop() || '';
  for (const line of lines) {
    if (line.trim()) {
      try {
        const data = JSON.parse(line);
        updateUI(data);
      } catch (e) {
        console.warn('Invalid JSON:', line);
      }
    }
  }
}

结果线上一跑,CPU 直接飙到 60%。排查半天才发现,buffer += chunk 在高频数据下会产生大量中间字符串,GC 压力巨大。而且 split('n') 每次都新建数组,更雪上加霜。

后来改用 Uint8Array 缓冲区,配合手动找换行符位置:

let buffer = new Uint8Array(0);

function appendBuffer(chunk) {
  const tmp = new Uint8Array(buffer.length + chunk.length);
  tmp.set(buffer);
  tmp.set(chunk, buffer.length);
  return tmp;
}

function handleChunk(chunk) {
  buffer = appendBuffer(chunk);
  let start = 0;
  while (true) {
    const idx = buffer.indexOf(10, start); // 10 是 'n' 的 ASCII
    if (idx === -1) break;
    const lineBytes = buffer.slice(start, idx);
    const line = new TextDecoder().decode(lineBytes);
    if (line.trim()) {
      try {
        const data = JSON.parse(line);
        updateUI(data);
      } catch (e) {
        console.warn('Parse error:', line);
      }
    }
    start = idx + 1;
  }
  // 保留未完成的部分
  buffer = buffer.slice(start);
}

性能好多了,但代码复杂度翻倍。而且这里有个隐藏问题:如果某条消息特别大(比如超过缓冲区预分配大小),appendBuffer 会频繁扩容,还是有性能抖动。

又踩坑了:内存泄漏

测试时发现页面开半小时,内存占用涨了 300MB。一看是 reader.read() 的 promise 链没正确中断。用户切走页面时,stream 还在后台默默读取,buffer 越积越大。

解决方案是在组件卸载时主动 cancel reader:

let reader = null;

// 在 setup 阶段
reader = response.body.getReader();

// 在 teardown 阶段(比如 React 的 useEffect cleanup)
return () => {
  if (reader) {
    reader.cancel();
    reader = null;
  }
};

但注意:reader.cancel() 只是通知 stream 停止,不会立即释放内存。最好配合 AbortController 更稳妥:

const controller = new AbortController();
const signal = controller.signal;

fetch(url, { signal })
  .then(res => { /* ... */ });

// cleanup
controller.abort();

这个组合拳打完,内存问题才彻底解决。

最终的解决方案

折腾两轮后,我意识到:对于 SSE 场景,其实没必要硬上 ReadableStream。SSE 协议本身以 data: 开头、nn 结尾,天然有分隔符。直接用原生 EventSource 更省事:

const eventSource = new EventSource('https://jztheme.com/api/sse-endpoint');
eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data);
  updateUI(data);
};
eventSource.onerror = (err) => {
  console.error('SSE error:', err);
};

但项目中途已经写了太多基于 Stream 的逻辑,重构成本太高。所以最后折中:保留 Stream 方案,但优化两点:

  • 用固定大小的环形缓冲区替代动态扩容的 Uint8Array(预估最大消息长度为 4KB)
  • 加入节流,避免 UI 更新太频繁(毕竟人眼也看不出毫秒级变化)

关键代码片段:

const MAX_BUFFER_SIZE = 4096;
let buffer = new Uint8Array(MAX_BUFFER_SIZE);
let writeIndex = 0;

function handleChunk(chunk) {
  if (writeIndex + chunk.length > MAX_BUFFER_SIZE) {
    console.warn('Buffer overflow, dropping data');
    writeIndex = 0;
    return;
  }
  buffer.set(chunk, writeIndex);
  writeIndex += chunk.length;

  // 查找所有完整消息(以 nn 结尾)
  let searchStart = 0;
  while (searchStart < writeIndex - 1) {
    const idx = findDoubleNewline(buffer, searchStart, writeIndex);
    if (idx === -1) break;
    const messageBytes = buffer.slice(0, idx);
    const messageText = new TextDecoder().decode(messageBytes);
    // 解析并更新 UI(略)
    processMessage(messageText);
    // 移动剩余数据到 buffer 开头
    const remaining = buffer.slice(idx + 2, writeIndex);
    buffer.set(remaining);
    writeIndex = remaining.length;
    searchStart = 0;
  }
}

虽然 findDoubleNewline 还得自己实现,但至少内存可控了。

回顾与反思

这次 Stream 实战下来,有几个体会:

  • 别为了用新技术而用:SSE 用 EventSource 足够,硬上 ReadableStream 纯属给自己加戏
  • 流处理的核心是边界处理:90% 的 bug 出在数据切片不对齐
  • 内存管理比想象中重要:尤其移动端,buffer 泄漏很快导致页面卡死

最终效果还行,数据延迟控制在 200ms 内,内存稳定。但说实话,如果重来一次,我会直接用 EventSource + 简单字符串拼接——毕竟项目 deadline 比技术炫技重要多了。

现在还有个小问题没解决:当网络抖动时,Stream 会断开但不会自动重连。目前靠外层定时检测状态手动 reconnect,有点糙。不过客户说能接受,也就没再深究。

以上是我踩坑后的总结,希望对你有帮助。如果你有更好的 Stream 处理方案,欢迎评论区交流!

本文章不代表JZTHEME立场,仅为作者个人观点 / 研究心得 / 经验分享,旨在交流探讨,供读者参考。
发表评论

暂无评论