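How to Implement a ChatGPT-Style Typewriter Effect

HTTP response body streaming

What an HTTP response body stream is

A single HTTP request maps to a single response. The response body is exposed as a ReadableStream: once fetch resolves, the browser takes res.body, calls getReader(), and loops over read() to consume the body chunk by chunk, much like reading a file while it is still downloading.

HTTP data format

The body is usually a continuous UTF-8 byte stream (for example, Content-Type: text/plain), and there is no standard application-level framing. A chunk boundary can fall anywhere, even in the middle of a multi-byte character, so how chunks are cut and how they are joined back into a string is left to the application (the example below collects the whole answer into one piece of plain text).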
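To make the chunk-boundary point concrete, here is a minimal sketch of decoding a byte stream that was cut in the middle of a character. The names decodeChunks and decodeChunksNaively are illustrative and not part of the original code; the only API relied on is the standard TextDecoder with its stream option.

```typescript
// Decode a sequence of byte chunks into one string, tolerating multi-byte
// characters that are split across chunk boundaries.
function decodeChunks(chunks: Uint8Array[]): string {
  const decoder = new TextDecoder("utf-8");
  let text = "";
  for (const chunk of chunks) {
    // stream: true keeps an incomplete trailing byte sequence buffered
    // inside the decoder until the next chunk arrives.
    text += decoder.decode(chunk, { stream: true });
  }
  // A final call without arguments flushes whatever is still buffered.
  text += decoder.decode();
  return text;
}

// For comparison: decoding every chunk on its own breaks split characters.
function decodeChunksNaively(chunks: Uint8Array[]): string {
  return chunks.map((c) => new TextDecoder("utf-8").decode(c)).join("");
}

// "你好" is 6 bytes in UTF-8 (3 per character); cut inside the second one.
const bytes = new TextEncoder().encode("你好");
const chunks = [bytes.slice(0, 4), bytes.slice(4)];

console.log(decodeChunks(chunks)); // 你好
console.log(decodeChunksNaively(chunks)); // contains U+FFFD replacement characters
```

This is why the client code in this section reuses one decoder for the whole response instead of creating a new one per chunk.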

Implementation

Server-side implementation

Raw HTTP chunking: the body is continuous UTF-8 text

app.post("/api/chat", async (req, res) => {
  try {
    const parsed = chatBodySchema.safeParse(req.body);
    if (!parsed.success) {
      res.status(400).json({ error: parsed.error.flatten() });
      return;
    }

    // Call the model; it returns the answer as a stream of chunks
    const stream = await invokeChatStream(parsed.data.message);
    res.setHeader("Content-Type", "text/plain; charset=utf-8");
    res.setHeader("Cache-Control", "no-cache, no-transform");
    res.setHeader("X-Content-Type-Options", "nosniff");
    res.flushHeaders();

    // Write each text fragment to the response as soon as it arrives
    for await (const chunk of stream) {
      const text = getChunkText(chunk);
      if (text) res.write(text);
    }
    res.end();
  } catch (err) {
    console.error(err);
    if (!res.headersSent) {
      res.status(500).json({ error: "internal_server_error" });
    } else {
      // Headers are already sent, so the only option left is to close the stream
      res.end();
    }
  }
});
Client-side implementation

ReadableStream + getReader()

export async function readPlainTextStream(
  reader: ReadableStreamDefaultReader<Uint8Array>,
  signal: AbortSignal,
  onAccumulated: (fullText: string) => void
): Promise<string> {
  const decoder = new TextDecoder();
  let accumulated = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    if (signal.aborted) {
      await reader.cancel().catch(() => {});
      return accumulated;
    }
    // stream: true buffers a multi-byte character that is split across chunks
    accumulated += decoder.decode(value, { stream: true });
    onAccumulated(accumulated);
  }
  // Flush any bytes still buffered in the decoder
  accumulated += decoder.decode();
  onAccumulated(accumulated);
  return accumulated;
}

const sendViaPlainReadableStream = async () => {
  const text = message.trim();
  if (!text || streaming) return;

  setError(null);
  setResponse("");
  abortRef.current?.abort();
  abortRef.current = new AbortController();
  const { signal } = abortRef.current;
  setStreaming(true);

  try {
    const res = await fetch("/api/chat", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ message: text }),
      signal,
    });
    if (!res.ok) {
      setError(`Request failed (${res.status})`);
      return;
    }
    if (!res.body) {
      setError("Cannot read the response body as a stream");
      return;
    }

    const reader = res.body.getReader();
    const finalText = await readPlainTextStream(reader, signal, setResponse);
    setResponse(finalText);
  } catch (err) {
    if (err && err.name === "AbortError") return;
    setError("Network error");
  } finally {
    setStreaming(false);
  }
};

SSE

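What SSE is

SSE (Server-Sent Events) is a mechanism defined in the HTML standard: within a single HTTP response, the server keeps pushing text events. Unlike full-duplex WebSocket, SSE is one-way by design (server → client), but it runs over plain HTTP.

SSE data format

SSE is a text protocol: an event block consists of several field lines and ends with a blank line (\n\n). The most common field is data:; a block may also carry comments and the optional fields event:, id:, and retry:. This suits event-by-event parsing (in the example, each frame carries JSON with a delta), and the boundaries are clearer than in a raw byte stream.

Minimal single-frame example: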

data: message content

Example with all fields:

event: message
id: 1
retry: 3000
data: message content
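The field layout above can be turned into a small parser. The following is a sketch only: SseEvent and parseSseBlock are illustrative names, and the function handles one event block at a time (the text between two blank lines), following the field rules of the HTML standard as I understand them (comment lines start with a colon, one leading space after the colon is dropped, multiple data lines are joined with \n).

```typescript
interface SseEvent {
  event: string; // defaults to "message" when no event: field is present
  data: string;
  id?: string;
  retry?: number;
}

// Parse one event block into its fields. Returns null when the block
// carries no data: line at all (for example, a comment-only keep-alive).
function parseSseBlock(block: string): SseEvent | null {
  const evt: SseEvent = { event: "message", data: "" };
  const dataLines: string[] = [];

  for (const line of block.split(/\r\n|\r|\n/)) {
    if (line === "" || line.startsWith(":")) continue; // blank or comment
    const colon = line.indexOf(":");
    const field = colon === -1 ? line : line.slice(0, colon);
    let value = colon === -1 ? "" : line.slice(colon + 1);
    if (value.startsWith(" ")) value = value.slice(1); // drop one leading space

    if (field === "data") dataLines.push(value);
    else if (field === "event") evt.event = value;
    else if (field === "id") evt.id = value;
    else if (field === "retry" && /^\d+$/.test(value)) evt.retry = Number(value);
    // Unknown fields are ignored.
  }

  if (dataLines.length === 0) return null;
  evt.data = dataLines.join("\n");
  return evt;
}

const full = parseSseBlock("event: message\nid: 1\nretry: 3000\ndata: message content");
console.log(full);
```

The client code later in this section only looks at data: lines, which is enough for its own protocol; a parser like this one is useful when event names or ids matter.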

Implementation

Server-side implementation

SSE: text/event-stream; each frame is data: {"delta":"..."}\n\n, and the stream ends with data: [DONE]\n\n

app.post("/api/chat/sse", async (req, res) => {
  try {
    const parsed = chatBodySchema.safeParse(req.body);
    if (!parsed.success) {
      res.status(400).json({ error: parsed.error.flatten() });
      return;
    }

    // Call the model; it returns the answer as a stream of chunks
    const stream = await invokeChatStream(parsed.data.message);
    writeSseHeaders(res);

    // Wrap each text fragment in its own SSE frame
    for await (const chunk of stream) {
      const text = getChunkText(chunk);
      if (text) res.write(sseDataLine({ delta: text }));
    }
    // Tell the client the stream is complete
    res.write(sseDone());
    res.end();
  } catch (err) {
    console.error(err);
    if (!res.headersSent) {
      res.status(500).json({ error: "internal_server_error" });
    } else {
      res.end();
    }
  }
});
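
The handler above calls writeSseHeaders, sseDataLine, and sseDone without showing them. The sketch below is one plausible shape for these helpers, inferred only from the frame format stated earlier; the real implementations may differ. HeaderWritable is a hypothetical structural type, used so the sketch does not depend on Express typings.

```typescript
// Hypothetical minimal shape of a response object; an Express or Node
// http.ServerResponse satisfies it.
interface HeaderWritable {
  setHeader(name: string, value: string): unknown;
  flushHeaders?(): void;
}

// Send the headers that mark the response as an SSE stream.
function writeSseHeaders(res: HeaderWritable): void {
  res.setHeader("Content-Type", "text/event-stream; charset=utf-8");
  res.setHeader("Cache-Control", "no-cache, no-transform");
  // Push the headers out immediately so the client sees the stream open.
  res.flushHeaders?.();
}

// Serialize one payload as a single-frame SSE event.
// JSON.stringify escapes newlines, so the JSON always fits on one data: line.
function sseDataLine(payload: Record<string, unknown>): string {
  return `data: ${JSON.stringify(payload)}\n\n`;
}

// Sentinel frame that tells the client the stream is finished.
function sseDone(): string {
  return "data: [DONE]\n\n";
}

console.log(JSON.stringify(sseDataLine({ delta: "Hi" })));
```

Encoding the payload as JSON is what lets a delta contain line breaks without breaking the frame boundary.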
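Client-side implementation

Reassemble the frames according to the SSE format. The native EventSource API can only issue GET requests, so this POST endpoint is read with fetch and the frames are parsed by hand.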

async function readSseDeltaStream(reader, signal, onDelta) {
  const decoder = new TextDecoder();
  // Holds the tail of the last chunk until its closing blank line arrives
  let carry = "";

  function consumeBlock(block) {
    const dataLines = block
      .split("\n")
      .filter((l) => l.startsWith("data:"))
      .map((l) => l.slice(5).trimStart());
    if (!dataLines.length) return;
    const raw = dataLines.join("\n");
    if (raw === "[DONE]") return;
    try {
      const { delta } = JSON.parse(raw);
      if (typeof delta === "string" && delta) onDelta(delta);
    } catch {
      /* ignore frames that are not JSON */
    }
  }

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    if (signal.aborted) {
      await reader.cancel().catch(() => {});
      return;
    }
    carry += decoder.decode(value, { stream: true });
    // Every complete block ends with a blank line; the last part may be incomplete
    const parts = carry.split("\n\n");
    carry = parts.pop() ?? "";
    for (const b of parts) {
      if (b.trim()) consumeBlock(b);
    }
  }
  // Flush the decoder and process whatever is left
  carry += decoder.decode();
  for (const b of carry.split("\n\n")) {
    if (b.trim()) consumeBlock(b);
  }
}

const sendViaSse = async () => {
  const text = message.trim();
  if (!text || streaming) return;

  setError(null);
  setResponse("");
  abortRef.current?.abort();
  abortRef.current = new AbortController();
  const { signal } = abortRef.current;
  setStreaming(true);

  try {
    const res = await fetch("/api/chat/sse", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ message: text }),
      signal,
    });
    if (!res.ok) {
      setError(`Request failed (${res.status})`);
      return;
    }
    if (!res.body) {
      setError("Cannot read the response body as a stream");
      return;
    }

    const reader = res.body.getReader();
    let accumulated = "";
    await readSseDeltaStream(reader, signal, (delta) => {
      accumulated += delta;
      setResponse(accumulated);
    });
    setResponse(accumulated);
  } catch (err) {
    if (err && err.name === "AbortError") return;
    setError("Network error");
  } finally {
    setStreaming(false);
  }
};

WebSocket

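HTTP Upgrade → full-duplex WebSocket over the same TCP connection

What WebSocket is

After the client and server establish a connection through an HTTP upgrade (Upgrade: websocket), they send and receive messages in both directions over the same socket. This suits multi-turn conversations and cases where control signaling and data share one channel: the client can send further commands at any time (such as abort or parameter changes) without being tied to one request and one response.

WebSocket data format

Data is exchanged in frames, and the payload can be text or binary; the application-level protocol is up to you. The example below uses JSON: the client sends { type: "chat", message }, the server replies with multiple { type: "delta", delta } messages and finishes with { type: "done" }.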
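
The JSON convention just described can be written down as types plus a defensive parser. This is a sketch under assumptions: the names ClientMessage, ServerMessage, parseServerMessage, and collectText are illustrative, and the error variant is taken from the example code in this section rather than from the sentence above.

```typescript
// Message sent by the client.
type ClientMessage = { type: "chat"; message: string };

// Messages sent by the server.
type ServerMessage =
  | { type: "delta"; delta: string }
  | { type: "done" }
  | { type: "error"; message?: string };

// Validate one incoming text frame; returns null for anything unexpected.
function parseServerMessage(raw: string): ServerMessage | null {
  let value: unknown;
  try {
    value = JSON.parse(raw);
  } catch {
    return null;
  }
  if (typeof value !== "object" || value === null) return null;
  const msg = value as Record<string, unknown>;

  if (msg.type === "delta" && typeof msg.delta === "string") {
    return { type: "delta", delta: msg.delta };
  }
  if (msg.type === "done") return { type: "done" };
  if (msg.type === "error") {
    return { type: "error", message: typeof msg.message === "string" ? msg.message : undefined };
  }
  return null;
}

// Fold a sequence of frames into the accumulated text, stopping at "done".
function collectText(frames: string[]): { text: string; done: boolean } {
  let text = "";
  for (const frame of frames) {
    const msg = parseServerMessage(frame);
    if (!msg) continue;
    if (msg.type === "delta") text += msg.delta;
    else if (msg.type === "done") return { text, done: true };
  }
  return { text, done: false };
}

const request: ClientMessage = { type: "chat", message: "hello" };
console.log(JSON.stringify(request));
```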

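When to use WebSocket

Use it for highly interactive scenarios (interruption, multiplexed messages, further control messages after the first one), for frequent two-way communication, and whenever the "one request, one stream pushed to completion" model does not fit. You have to handle reconnection, heartbeats, authentication, and Cookie/same-origin constraints yourself, and every hop on the path must support Upgrade and long-lived connections (in local development the reverse proxy must enable WebSocket explicitly, for example with ws: true in setupProxy).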
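
Reconnection is one of the pieces left to the application. As a hedged illustration, the sketch below computes a capped exponential backoff delay between attempts; reconnectDelayMs and its default values are assumptions made for this example, not something the original code contains.

```typescript
// Delay before reconnect attempt number `attempt` (0-based):
// baseMs, 2*baseMs, 4*baseMs, ... capped at maxMs.
function reconnectDelayMs(attempt: number, baseMs = 500, maxMs = 30_000): number {
  const exponential = baseMs * 2 ** Math.max(0, attempt);
  return Math.min(maxMs, exponential);
}

// Delays for the first few attempts with the default settings.
for (let attempt = 0; attempt < 4; attempt++) {
  console.log(attempt, reconnectDelayMs(attempt));
}
```

A client would typically wait this long after an unexpected close before opening a new WebSocket, and reset the attempt counter once a connection succeeds. Adding random jitter on top is a common refinement when many clients may reconnect at once.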

Implementation

Server-side implementation
// Imports assumed from usage; the original listing omits them.
import type { Server } from "http";
import { WebSocket, WebSocketServer } from "ws";
import { z } from "zod";

export function attachChatWebSocket(server: Server) {
  const wss = new WebSocketServer({ server, path: "/api/ws/chat" });

  wss.on("connection", (socket) => {
    socket.on("message", async (raw) => {
      // Step 1: parse and validate the incoming message
      let parsed: z.infer<typeof wsChatSchema>;
      try {
        const json = JSON.parse(String(raw));
        const r = wsChatSchema.safeParse(json);
        if (!r.success) {
          safeSend(socket, { type: "error", error: r.error.flatten() });
          return;
        }
        parsed = r.data;
      } catch {
        safeSend(socket, { type: "error", message: "invalid_json" });
        return;
      }

      // Step 2: stream the model output back as delta messages
      try {
        const stream = await invokeChatStream(parsed.message);
        for await (const chunk of stream) {
          // Stop early if the client has gone away
          if (socket.readyState !== WebSocket.OPEN) return;
          const text = getChunkText(chunk);
          if (text) safeSend(socket, { type: "delta", delta: text });
        }
        safeSend(socket, { type: "done" });
      } catch (err) {
        console.error(err);
        safeSend(socket, { type: "error", message: "internal_server_error" });
      }
    });
  });
}
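
The server code above sends everything through safeSend, which is not shown. A minimal sketch of what such a helper might do follows; SendableSocket is a hypothetical structural type (a socket from the ws package or a browser WebSocket both fit it), and the boolean return value is an addition made for this example.

```typescript
// Hypothetical minimal shape of a socket.
interface SendableSocket {
  readyState: number;
  send(data: string): void;
}

// Numeric value of WebSocket.OPEN.
const WS_OPEN = 1;

// Serialize the payload as JSON and send it only while the socket is open.
// Returns true when the message was handed to the socket, false otherwise.
function safeSend(socket: SendableSocket, payload: unknown): boolean {
  if (socket.readyState !== WS_OPEN) return false;
  try {
    socket.send(JSON.stringify(payload));
    return true;
  } catch {
    // The socket may close between the state check and the send call.
    return false;
  }
}

// Usage with a fake socket that records what it was asked to send.
const sent: string[] = [];
const fakeSocket: SendableSocket = { readyState: WS_OPEN, send: (d) => { sent.push(d); } };
console.log(safeSend(fakeSocket, { type: "done" }), sent);
```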
Client-side implementation
/** WS protocol: `{type:"chat",message}` → multiple `{type:"delta",delta}` → `{type:"done"}`. In development, hit `/api/health` first so the first WS connection does not hang before the CRA proxy has registered its upgrade handler. */
export function runWsChatStream(
  message: string,
  signal: AbortSignal,
  onAccumulated: (text: string) => void
): Promise<void> {
  const proto = window.location.protocol === "https:" ? "wss" : "ws";
  const url = `${proto}://${window.location.host}/api/ws/chat`;

  const primeDevProxy = (): Promise<void> => {
    if (process.env.NODE_ENV !== "development") return Promise.resolve();
    if (signal.aborted) return Promise.resolve();
    // Success and failure are both ignored; the request only warms up the proxy
    return fetch("/api/health", { signal }).then(
      () => {},
      () => {}
    );
  };

  return primeDevProxy().then(() => {
    if (signal.aborted) return Promise.resolve();

    return new Promise<void>((resolve, reject) => {
      const ws = new WebSocket(url);
      let accumulated = "";
      let settled = false;

      // Settle the promise exactly once and detach the abort listener
      const finish = (fn: () => void) => {
        if (settled) return;
        settled = true;
        signal.removeEventListener("abort", onAbort);
        fn();
      };

      const onAbort = () => {
        ws.close();
        finish(() => resolve());
      };
      signal.addEventListener("abort", onAbort);

      ws.onopen = () => {
        ws.send(JSON.stringify({ type: "chat", message }));
      };

      ws.onmessage = (ev) => {
        try {
          const data = JSON.parse(String(ev.data)) as {
            type?: string;
            delta?: string;
            message?: string;
            error?: { fieldErrors?: { message?: string[] } };
          };
          if (data.type === "delta" && typeof data.delta === "string") {
            accumulated += data.delta;
            onAccumulated(accumulated);
          } else if (data.type === "done") {
            finish(() => resolve());
            ws.close();
          } else if (data.type === "error") {
            finish(() => reject(new Error(formatWsServerError(data))));
            ws.close();
          }
        } catch {
          finish(() => reject(new Error("invalid_server_message")));
          ws.close();
        }
      };

      ws.onerror = () => {
        finish(() => reject(new Error("WebSocket connection error")));
      };

      ws.onclose = () => {
        if (settled) return;
        // Closed without a "done" message: treat partial output as success
        finish(() => {
          if (signal.aborted) resolve();
          else if (!accumulated) reject(new Error("Connection closed"));
          else resolve();
        });
      };
    });
  });
}
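
The client above passes error frames to formatWsServerError, which is not shown. Based only on the field shapes visible in the onmessage handler, one possible version is sketched below; WsServerError and the fallback string are assumptions made for this example.

```typescript
// Shape of an error frame, mirroring the fields read in onmessage.
interface WsServerError {
  message?: string;
  error?: { fieldErrors?: { message?: string[] } };
}

// Turn a server error frame into one human-readable string.
function formatWsServerError(data: WsServerError): string {
  // Plain errors such as "invalid_json" arrive in `message`.
  if (typeof data.message === "string" && data.message) return data.message;
  // Validation errors arrive as a flattened schema error with per-field lists.
  const fieldMessages = data.error?.fieldErrors?.message;
  if (Array.isArray(fieldMessages) && fieldMessages.length > 0) {
    return fieldMessages.join("; ");
  }
  return "unknown_server_error";
}

console.log(formatWsServerError({ message: "invalid_json" }));
```

The two branches exist because the server code in this section reports JSON and runtime failures under message but validation failures under error.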

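How to choose

  • If you only need the model to stream its output, HTTP streaming or SSE is enough.
  • If you need frequent back-and-forth messages over the same connection, WebSocket is the better fit.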