渐进式 JSON:200 行代码实现流式传输,让页面加载“快如闪电”
在深入研究 react 服务端组件(React Server Components)时,偶然发现一篇关于“渐进式 JSON(progressive JSON)”的文章。作者 Dan Abramov 在文中介绍了一种从服务器向客户端分块流式传输 JSON 的技术,允许客户端在尚未接收完整个数据之前,提前开始渲染部分内容。对于大型数据集而言,这种方式能显著提升“感知性能”。这引发了笔者的好奇:要实现这样一个功能究竟需要多少代码?事实证明,这是一次颇为有趣的练习,最终诞生了一个约 200 行代码的小型库,名为 Streamson。本文即分享其构建过程。
Streamson 项目地址:https://github.com/krasimir/streamson
核心思路
渐进式 JSON 流式传输的核心思想在于:一旦部分数据准备就绪,立即将其发送至客户端,而非等待整个 JSON 结构完全生成后再统一返回。这在处理大型数据集或实时生成的数据时尤为实用。对于尚未就绪的部分,可先发送占位符,待数据准备好后,再由客户端替换为真实内容。示例如下:
{
"user": {
"id": 1,
"name": "John Doe",
"posts": [
{ "id": 101, "title": "First Post", "content": "..." },
{ "id": 102, "title": "Second Post", "content": "..." }
]
}
}假设用户信息可立即获取,而帖子内容需从数据库读取,存在一定延迟。传统做法是等待所有帖子加载完毕后再返回完整对象。而采用渐进式 JSON,可先发送占位符:
{
"user": {
"id": 1,
"name": "John Doe",
"posts": "_$1"
}
}待帖子加载完成后,再单独发送一个分块数据:
{
"_$1": [
{ "id": 101, "title": "First Post", "content": "..." },
{ "id": 102, "title": "Second Post", "content": "..." }
]
}客户端需具备识别占位符并在对应数据到达时完成替换的能力。
服务端实现
首先编写一个简单的函数,接收服务器响应对象(即通向客户端的通道)和待发送的数据对象:
function serve(res, data) {
res.setHeader("Content-Type", "application/x-ndjson; charset=utf-8");
res.setHeader("Transfer-Encoding", "chunked");
// 向客户端发送分块数据
res.write(JSON.stringify(...) + "\n");
res.write(JSON.stringify(...) + "\n");
// 全部完成后
res.end();
}关键点说明:
使用 application/x-ndjson 内容类型:NDJSON(Newline Delimited JSON,换行分隔 JSON)是一种便捷的流式传输格式,每行均为独立的 JSON 对象。这使得在一个响应中发送多个 JSON 对象成为可能,并以换行符进行分隔。
设置 Transfer-Encoding: chunked 头:该头信息告知客户端响应将以分块形式传输,因此不能依赖 Content-Length 判断数据结束。同时,连接将保持开启,直至调用 res.end()。
接下来需要对数据进行“分块化”处理。遍历数据对象,将需要后续发送的部分替换为占位符。当遇到异步数据(如 Promise)时,将其放入队列,待其完成后再作为独立分块发送。
以下是用于处理数据的函数:
function normalize(value) {
function walk(node) {
if (isPromise(node)) {
const id = getId();
registerPromise(node, id);
return id;
}
if (Array.isArray(node)) {
return node.map((item) => walk(item));
}
if (node && typeof node === "object") {
const out = {};
for (const [key, val] of Object.entries(node)) {
out[key] = walk(val);
}
return out;
}
return node;
}
return walk(value);
}该函数递归遍历数据对象。遇到 Promise 时,生成唯一占位符 ID 并注册该 Promise,等待其解析。数组和对象递归处理,原始值(如数字、字符串)则直接返回。
registerPromise 函数将 Promise 和占位符 ID 存入队列。当 Promise 解析成功时,将结果作为新分块发送给客户端:
let promises = [];
function registerPromise(promise, id) {
promises.push({ promise, id });
promise
.then((value) => {
send(id, value);
})
.catch((err) => {
console.error("Error resolving promise for path", err);
send(id, { error: "promise error", timeoutMs: TIMEOUT });
});
}send 函数负责将解析后的数据写入响应:
function send(id, value) {
res.write(JSON.stringify({ i: id, c: normalize(value) }) + "\n");
promises = promises.filter((p) => p.id !== id);
if (promises.length === 0) res.end();
}它会向客户端写入一个新的 JSON 行,包含占位符 ID 及对应数据。
当 Promise 处理完成后,从队列中移除。若队列中无待处理的 Promise,则调用 res.end() 结束响应。
完整服务端实现可参考:https://github.com/krasimir/streamson/blob/main/packages/streamson/lib/server.js
以下是一个可从服务端发送的对象示例:
const data = {
user: {
id: 1,
name: "John Doe",
posts: fetchPostsFromDatabase(), // 返回一个 Promise
},
};
async function fetchPostsFromDatabase() {
const posts = await database.query("SELECT * FROM posts WHERE userId = 1");
return posts.map((post) => ({
id: post.id,
title: post.title,
content: post.content,
comments: fetchCommentsForPost(post.id), // 同样返回 Promise
}));
}注意,每个帖子中的 comments 字段也是一个 Promise,这意味着评论数据将在帖子数据发送之后,作为单独分块传送给客户端。
客户端实现
客户端需处理从服务器接收的分块数据,并将占位符替换为真实内容。可使用 Fetch api 发起请求,并将响应作为流读取。遇到占位符时,用 Promise 替代;待实际数据到达时,再解析该 Promise。核心逻辑大致如下:
try {
const res = await fetch(endpoint);
const reader = res.body.getReader();
const decoder = new TextDecoder();
async function process() {
let done = false;
while (!done) {
const { value, done: readerDone } = await reader.read();
done = readerDone;
if (value) {
try {
const chunk = JSON.parse(decoder.decode(value, { stream: true }));
chunk.c = walk(chunk.c);
if (promises.has(chunk.i)) {
promises.get(chunk.i)(chunk.c);
promises.delete(chunk.i);
}
} catch (e) {
console.error(`解析分块数据出错`, e);
}
}
}
}
process();
} catch (e) {
console.error(e);
throw new Error(`从 Streamson 接口 ${endpoint} 获取数据失败`);
}process 函数逐个读取响应流的分块。每个分块被解析为 JSON 后,调用 walk 函数将占位符替换为 Promise。若分块包含之前注册过的占位符 ID 对应的数据,则解析该 Promise。关键点在于 reader.read() —— 它允许等待新数据的到来。
以下是 walk 函数的实现,用于将占位符替换为 Promise:
function walk(node) {
if (isPromisePlaceholder(node)) {
return new Promise((done) => {
promises.set(node, done);
});
}
if (Array.isArray(node)) {
return node.map((item) => walk(item));
}
if (node && typeof node === "object") {
const out = {};
for (const [key, val] of Object.entries(node)) {
out[key] = walk(val);
}
return out;
}
return node;
}
function isPromisePlaceholder(val) {
return typeof val === "string" && val.match(/^_\$(\d)/);
}该函数逻辑与服务端的 normalize 函数相似。遇到占位符时返回一个新的 Promise,待实际数据到达时解析。数组和对象递归处理,原始值直接返回。占位符 ID 需与服务器生成的一致。
完整客户端实现可参考:https://github.com/krasimir/streamson/blob/main/packages/streamson/lib/client.js
服务端与客户端的代码合计仅 155 行 😎。
npm 包:Streamson
是的,这套实现已被封装为 NPM 库 —— Streamson! 👨💻
通过占位符分块流式传输 JSON,是一种颇具趣味的技术。它能显著提升 Web 应用的“感知性能”,尤其适用于处理大型数据集或动态生成的数据。通过让服务器在数据就绪时立即发送分块,客户端得以更早开始渲染页面,从而带来更优的用户体验。
只需同时掌控服务端与客户端,约 200 行 JavaScript 代码 即可实现。
现已将这套代码封装为 NPM 包,名为 Streamson。
安装命令如下:
npm install streamson在服务端的使用方式:
import { serve } from "streamson";
import express from "express";
const app = express();
const port = 5009;
app.get("/data", async (req, res) => {
const myData = {
title: "My Blog",
description: "A simple blog example using Streamson",
posts: getBlogPosts(), // 返回一个 Promise
};
serve(res, myData);
});
app.listen(port, () => {
console.log(`示例应用已启动,监听端口 ${port}`);
});客户端部分仅需约 1KB 的 JavaScript,可从以下地址获取:https://unpkg.com/streamson@latest/dist/streamson.min.js
引入后会得到一个全局函数 Streamson,使用方式如下:
const request = Streamson("/data");
const data = await request.get();
console.log(data.title); // "My Blog"
const posts = await request.get("posts");
console.log(posts); // 博客文章数组本文内容仅供个人学习、研究或参考使用,不构成任何形式的决策建议、专业指导或法律依据。未经授权,禁止任何单位或个人以商业售卖、虚假宣传、侵权传播等非学习研究目的使用本文内容。如需分享或转载,请保留原文来源信息,不得篡改、删减内容或侵犯相关权益。感谢您的理解与支持!