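HTTP response body streaming
What is HTTP response body streaming?
One HTTP request gets one response, and the response body is exposed as a ReadableStream. After fetch resolves, the browser takes res.body, calls getReader(), and loops over read() to consume the body chunk by chunk. This works much like reading a file while it is still downloading.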
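HTTP data format
The body is usually a continuous UTF-8 byte stream (e.g. Content-Type: text/plain), and there is no standard application-level framing. Where chunk boundaries fall and how the bytes are assembled into strings is up to the application. The example below collects the whole answer into one plain-text string. A chunk boundary can land in the middle of a multi-byte UTF-8 character, which is why the client decodes with a single TextDecoder in streaming mode.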
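Here is a small self-contained check of why streaming decode matters. The two characters 你好 take six bytes in UTF-8, and splitting them after the fourth byte cuts the second character in half. If each piece is decoded independently, the result contains replacement characters (U+FFFD). A single TextDecoder called with { stream: true } keeps the partial bytes buffered until the next chunk arrives. The function names are illustrative only.

```ts
// Decode a sequence of byte chunks two ways to show why { stream: true } matters.
export function decodeNaively(chunks: Uint8Array[]): string {
  // Each chunk decoded on its own: a split multi-byte character becomes U+FFFD.
  return chunks.map((c) => new TextDecoder().decode(c)).join("");
}

export function decodeStreaming(chunks: Uint8Array[]): string {
  const decoder = new TextDecoder();
  let out = "";
  for (const c of chunks) {
    // stream: true keeps an incomplete trailing byte sequence buffered for the next call.
    out += decoder.decode(c, { stream: true });
  }
  // A final call without arguments flushes whatever is still buffered.
  return out + decoder.decode();
}

const bytes = new TextEncoder().encode("你好"); // 6 bytes: 3 per character
const chunks = [bytes.slice(0, 4), bytes.slice(4)]; // the split falls inside "好"

console.log(decodeStreaming(chunks));
console.log(decodeNaively(chunks));
```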
Implementation
Server side
Raw HTTP chunking: the body is continuous UTF-8 text
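```ts
// chatBodySchema, invokeChatStream and getChunkText are project helpers not shown here;
// app is presumably an Express app with express.json() body parsing enabled.
app.post("/api/chat", async (req, res) => {
  try {
    // Validate the JSON body before starting the (expensive) model call.
    const parsed = chatBodySchema.safeParse(req.body);
    if (!parsed.success) {
      res.status(400).json({ error: parsed.error.flatten() });
      return;
    }

    const stream = await invokeChatStream(parsed.data.message);

    // Plain-text streaming response: no framing, just UTF-8 bytes.
    res.setHeader("Content-Type", "text/plain; charset=utf-8");
    // Ask caches and proxies not to store or transform the body.
    res.setHeader("Cache-Control", "no-cache, no-transform");
    res.setHeader("X-Content-Type-Options", "nosniff");
    // Send the headers now so the client's fetch() resolves before the first token.
    res.flushHeaders();

    // Write each model chunk as soon as it arrives (HTTP/1.1 chunked transfer encoding).
    for await (const chunk of stream) {
      const text = getChunkText(chunk);
      if (text) res.write(text);
    }
    res.end();
  } catch (err) {
    console.error(err);
    if (!res.headersSent) {
      // Nothing has been streamed yet, so a proper error status is still possible.
      res.status(500).json({ error: "internal_server_error" });
    } else {
      // Headers (and maybe partial text) are already out: all we can do is close the body.
      res.end();
    }
  }
});
```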
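invokeChatStream and getChunkText are not shown in the original. To try the route without a real model, you can use a hypothetical stand-in: an async generator that yields chunk objects. The { content } chunk shape below is an assumption and will not match any particular model SDK.

```ts
// Hypothetical stand-ins for the project's invokeChatStream/getChunkText helpers.
// Assumed chunk shape: { content: string }. A real model SDK will differ.
export interface FakeChunk {
  content: string;
}

export async function invokeChatStream(
  message: string
): Promise<AsyncIterable<FakeChunk>> {
  async function* generate(): AsyncGenerator<FakeChunk> {
    const words = `Echo: ${message}`.split(" ");
    for (let i = 0; i < words.length; i++) {
      // Small delay so the client visibly receives tokens one at a time.
      await new Promise((r) => setTimeout(r, 50));
      yield { content: (i === 0 ? "" : " ") + words[i] };
    }
  }
  return generate();
}

export function getChunkText(chunk: FakeChunk): string {
  return typeof chunk.content === "string" ? chunk.content : "";
}
```

With these stubs in place, the route streams "Echo: ..." back word by word. This makes it easy to watch the client render partial text.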
Client side
ReadableStream + getReader()
```ts
// Reads a plain UTF-8 body and reports the accumulated text after every chunk.
export async function readPlainTextStream(
  reader: ReadableStreamDefaultReader<Uint8Array>,
  signal: AbortSignal,
  onAccumulated: (fullText: string) => void
): Promise<string> {
  // One decoder for the whole stream, so characters split across chunks survive.
  const decoder = new TextDecoder();
  let accumulated = "";
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    if (signal.aborted) {
      // Stop reading and release the connection; keep what has arrived so far.
      await reader.cancel().catch(() => {});
      return accumulated;
    }
    accumulated += decoder.decode(value, { stream: true });
    onAccumulated(accumulated);
  }
  // Flush any bytes still buffered in the decoder.
  accumulated += decoder.decode();
  onAccumulated(accumulated);
  return accumulated;
}

// Presumably defined inside a React component: message, streaming, abortRef,
// setError, setResponse and setStreaming come from its state and refs.
const sendViaPlainReadableStream = async () => {
  const text = message.trim();
  if (!text || streaming) return;
  setError(null);
  setResponse("");
  // Cancel any previous request and create a fresh controller for this one.
  abortRef.current?.abort();
  abortRef.current = new AbortController();
  const { signal } = abortRef.current;
  setStreaming(true);
  try {
    const res = await fetch("/api/chat", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ message: text }),
      signal,
    });
    if (!res.ok) {
      setError(`Request failed (${res.status})`);
      return;
    }
    if (!res.body) {
      setError("Cannot stream the response body");
      return;
    }
    const reader = res.body.getReader();
    const finalText = await readPlainTextStream(reader, signal, setResponse);
    setResponse(finalText);
  } catch (err) {
    // Aborting rejects fetch()/read() with an AbortError, which is not a real failure.
    if ((err as Error | null)?.name === "AbortError") return;
    setError("Network error");
  } finally {
    setStreaming(false);
  }
};
```
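Instead of managing a TextDecoder by hand, you can pipe the body through the standard TextDecoderStream. It handles split characters internally and yields strings. Below is a sketch with an illustrative function name. Abort handling is left to the fetch signal, which rejects the pending read().

```ts
// Variant of readPlainTextStream that lets TextDecoderStream do the decoding.
export async function readWithTextDecoderStream(
  body: ReadableStream<Uint8Array>,
  onAccumulated: (fullText: string) => void
): Promise<string> {
  const reader = body.pipeThrough(new TextDecoderStream()).getReader();
  let accumulated = "";
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    accumulated += value; // value is already a decoded string
    onAccumulated(accumulated);
  }
  return accumulated;
}

// Usage without a network: build a body whose chunks split a multi-byte character.
const bytes = new TextEncoder().encode("流式输出");
const body = new ReadableStream<Uint8Array>({
  start(controller) {
    controller.enqueue(bytes.slice(0, 4));
    controller.enqueue(bytes.slice(4));
    controller.close();
  },
});
readWithTextDecoderStream(body, (t) => console.log(t)).then((full) =>
  console.log("final:", full)
);
```

In the browser, the same function can be called as readWithTextDecoderStream(res.body, setResponse).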
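SSE
What is SSE?
SSE (Server-Sent Events) is a mechanism defined in the HTML standard: within a single HTTP response, the server keeps pushing text events. WebSocket is full-duplex, while SSE is one-way by design (server → client) and runs over ordinary HTTP.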
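SSE data format
SSE is a text protocol. Each event block consists of one or more field lines and ends with a blank line (\n\n). The most common field is data:. A block may also contain comment lines (starting with :) and optional fields such as event:, id: and retry:. This makes the stream easy to parse one event at a time, with clearer boundaries than a raw byte stream. In the example below, each frame carries a JSON object with a delta.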
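Minimal frame example: a single data: line followed by a blank line, i.e. data: message content\n\n.
Example with all fields:
```text
event: message
id: 1
retry: 3000
data: message content

```
retry is the reconnection delay in milliseconds that EventSource should use. id is sent back in the Last-Event-ID header when EventSource reconnects.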
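A block like the one above can be parsed field by field. The sketch below follows the HTML standard's field rules in simplified form:

- exactly one space after the colon is removed;
- multiple data: lines are joined with \n;
- lines starting with : are comments;
- unknown fields are ignored.

parseSseBlock and SseEvent are illustrative names, and the sketch assumes \n line endings.

```ts
// Simplified parser for one SSE event block (the text between blank lines).
export interface SseEvent {
  event: string; // "message" unless an event: field says otherwise
  data: string;
  id?: string;
  retry?: number;
}

export function parseSseBlock(block: string): SseEvent | null {
  const data: string[] = [];
  const ev: SseEvent = { event: "message", data: "" };
  for (const line of block.split("\n")) {
    if (line === "" || line.startsWith(":")) continue; // blank or comment line
    const colon = line.indexOf(":");
    const field = colon === -1 ? line : line.slice(0, colon);
    let value = colon === -1 ? "" : line.slice(colon + 1);
    if (value.startsWith(" ")) value = value.slice(1); // strip exactly one space
    switch (field) {
      case "event":
        ev.event = value;
        break;
      case "data":
        data.push(value);
        break;
      case "id":
        ev.id = value;
        break;
      case "retry":
        // Only an all-digit value is a valid reconnection time.
        if (/^\d+$/.test(value)) ev.retry = Number(value);
        break;
      // Unknown fields are ignored.
    }
  }
  // Simplification: a block without data: lines yields no event (its id/retry are dropped here).
  if (data.length === 0) return null;
  ev.data = data.join("\n");
  return ev;
}
```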
Implementation
Server side
SSE: Content-Type text/event-stream. Each frame is data: {"delta":"..."}\n\n, and the stream ends with data: [DONE]\n\n
```ts
// chatBodySchema, invokeChatStream, getChunkText and the SSE helpers
// (writeSseHeaders, sseDataLine, sseDone) are project code not shown here.
app.post("/api/chat/sse", async (req, res) => {
  try {
    const parsed = chatBodySchema.safeParse(req.body);
    if (!parsed.success) {
      res.status(400).json({ error: parsed.error.flatten() });
      return;
    }

    const stream = await invokeChatStream(parsed.data.message);
    // Presumably sets Content-Type: text/event-stream and flushes the headers.
    writeSseHeaders(res);

    for await (const chunk of stream) {
      const text = getChunkText(chunk);
      // One SSE frame per chunk: data: {"delta":"..."}\n\n
      if (text) res.write(sseDataLine({ delta: text }));
    }
    // Explicit end-of-stream marker: data: [DONE]\n\n
    res.write(sseDone());
    res.end();
  } catch (err) {
    console.error(err);
    if (!res.headersSent) {
      res.status(500).json({ error: "internal_server_error" });
    } else {
      res.end();
    }
  }
});
```
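The SSE helpers used by the route are not shown in the original. Below is a plausible sketch that matches the frame format described above. The bodies and headers are assumptions, modeled on the plain-text route.

```ts
import type { ServerResponse } from "node:http";

// Hypothetical implementations of the helpers used by /api/chat/sse.
export function writeSseHeaders(res: ServerResponse): void {
  res.setHeader("Content-Type", "text/event-stream; charset=utf-8");
  res.setHeader("Cache-Control", "no-cache, no-transform");
  // Send headers immediately so the client can start reading.
  res.flushHeaders();
}

// JSON.stringify (without indentation) never emits a raw newline,
// so a single data: line per frame is always safe.
export function sseDataLine(payload: unknown): string {
  return `data: ${JSON.stringify(payload)}\n\n`;
}

// End-of-stream marker understood by the client.
export function sseDone(): string {
  return "data: [DONE]\n\n";
}
```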
Client side
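Reassembling SSE frames
The browser's EventSource API only issues GET requests and cannot send a request body. This endpoint is a POST, so the client reads it with fetch and reassembles and parses the frames by hand.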
```ts
// Parses an SSE body of data: {"delta":"..."} frames and calls onDelta for each delta.
// Assumes \n line endings, as produced by the server above.
async function readSseDeltaStream(
  reader: ReadableStreamDefaultReader<Uint8Array>,
  signal: AbortSignal,
  onDelta: (delta: string) => void
): Promise<void> {
  const decoder = new TextDecoder();
  // Text received after the last complete frame; completed by later chunks.
  let carry = "";

  function consumeBlock(block: string) {
    // Collect the data: lines of one event and strip the space after the colon.
    const dataLines = block
      .split("\n")
      .filter((l) => l.startsWith("data:"))
      .map((l) => l.slice(5).trimStart());
    if (!dataLines.length) return;
    const raw = dataLines.join("\n");
    if (raw === "[DONE]") return; // end-of-stream marker
    try {
      const { delta } = JSON.parse(raw);
      if (typeof delta === "string" && delta) onDelta(delta);
    } catch {
      // Ignore frames whose payload is not valid JSON.
    }
  }

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    if (signal.aborted) {
      await reader.cancel().catch(() => {});
      return;
    }
    carry += decoder.decode(value, { stream: true });
    // Every "\n\n" ends a frame; the last piece may be incomplete, so keep it.
    const parts = carry.split("\n\n");
    carry = parts.pop() ?? "";
    for (const b of parts) {
      if (b.trim()) consumeBlock(b);
    }
  }
  // Flush the decoder and process whatever is left after the stream ends.
  carry += decoder.decode();
  for (const b of carry.split("\n\n")) {
    if (b.trim()) consumeBlock(b);
  }
}

const sendViaSse = async () => {
  const text = message.trim();
  if (!text || streaming) return;
  setError(null);
  setResponse("");
  abortRef.current?.abort();
  abortRef.current = new AbortController();
  const { signal } = abortRef.current;
  setStreaming(true);
  try {
    const res = await fetch("/api/chat/sse", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ message: text }),
      signal,
    });
    if (!res.ok) {
      setError(`Request failed (${res.status})`);
      return;
    }
    if (!res.body) {
      setError("Cannot stream the response body");
      return;
    }
    const reader = res.body.getReader();
    // Deltas are appended here and pushed to the UI as they arrive.
    let accumulated = "";
    await readSseDeltaStream(reader, signal, (delta) => {
      accumulated += delta;
      setResponse(accumulated);
    });
    setResponse(accumulated);
  } catch (err) {
    if ((err as Error | null)?.name === "AbortError") return;
    setError("Network error");
  } finally {
    setStreaming(false);
  }
};
```
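readSseDeltaStream only splits on \n\n, which is fine for the server above. The SSE format also allows \r\n and lone \r line endings. If the stream may come from another server, it is safer to use a splitter that normalizes line endings first. Below is a sketch; SseBlockSplitter is an illustrative name. A \r at the very end of a chunk is held back, because the matching \n may arrive in the next chunk.

```ts
// Incrementally splits decoded SSE text into complete event blocks,
// normalizing \r\n and lone \r to \n before looking for blank lines.
export class SseBlockSplitter {
  private buffer = "";
  private pendingCR = false;

  push(text: string): string[] {
    if (this.pendingCR) {
      // The previous chunk ended in \r: re-attach it so "\r" + "\n" collapses into one break.
      text = "\r" + text;
      this.pendingCR = false;
    }
    if (text.endsWith("\r")) {
      // Hold back a trailing \r until we know whether a \n follows.
      this.pendingCR = true;
      text = text.slice(0, -1);
    }
    this.buffer += text.replace(/\r\n?/g, "\n");
    const blocks = this.buffer.split("\n\n");
    this.buffer = blocks.pop() ?? "";
    return blocks.filter((b) => b.trim() !== "");
  }

  // Call after the stream ends. Like readSseDeltaStream, this also returns
  // a trailing block that was never terminated by a blank line.
  flush(): string[] {
    const rest = this.buffer + (this.pendingCR ? "\n" : "");
    this.buffer = "";
    this.pendingCR = false;
    return rest.trim() ? [rest] : [];
  }
}
```

Each returned block can then be handed to the same per-block logic as consumeBlock.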
WebSocket
HTTP Upgrade → full-duplex WebSocket frames over the same TCP connection
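What is WebSocket?
The client and server first establish a connection through an HTTP upgrade (Upgrade: websocket). After that, they exchange messages in both directions over the same socket. This suits multi-turn conversations and cases where control signaling and data share one channel. The client can send further commands at any time (e.g. abort, change parameters) without being tied to a single request / single response cycle.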
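During the upgrade handshake, the server answers the client's Sec-WebSocket-Key with a Sec-WebSocket-Accept header. This proves it understood the WebSocket request. RFC 6455 defines the value as base64(SHA-1(key + fixed GUID)). The ws library does this for you; the sketch below uses Node's crypto module only to illustrate the step. The sample key and expected value come from the RFC's handshake example.

```ts
import { createHash } from "node:crypto";

// Fixed GUID defined by RFC 6455 for the opening handshake.
const WS_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

// Computes the Sec-WebSocket-Accept header value for a given Sec-WebSocket-Key.
export function computeAcceptKey(secWebSocketKey: string): string {
  return createHash("sha1").update(secWebSocketKey + WS_GUID).digest("base64");
}

console.log(computeAcceptKey("dGhlIHNhbXBsZSBub25jZQ==")); // s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
```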
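WebSocket data format
Data travels in frames whose payload is either text or binary. The application-level protocol is up to you. The example below uses JSON:

- the client sends { type: "chat", message };
- the server replies with several { type: "delta", delta } messages and finishes with { type: "done" };
- on failure, the server sends { type: "error", ... } instead.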
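The JSON protocol above can be written down as TypeScript types, which helps keep client and server in sync. Below is a sketch with illustrative type and function names. The error variant mirrors the two shapes the server code sends: a message string or a flattened validation error.

```ts
// Client → server
export type WsClientMessage = { type: "chat"; message: string };

// Server → client
export type WsServerMessage =
  | { type: "delta"; delta: string }
  | { type: "done" }
  | { type: "error"; message?: string; error?: unknown };

// Validates an incoming text frame and narrows it to WsServerMessage, or returns null.
export function parseServerMessage(raw: string): WsServerMessage | null {
  let data: unknown;
  try {
    data = JSON.parse(raw);
  } catch {
    return null;
  }
  if (typeof data !== "object" || data === null) return null;
  const msg = data as Record<string, unknown>;
  switch (msg.type) {
    case "delta":
      return typeof msg.delta === "string" ? { type: "delta", delta: msg.delta } : null;
    case "done":
      return { type: "done" };
    case "error":
      return {
        type: "error",
        message: typeof msg.message === "string" ? msg.message : undefined,
        error: msg.error,
      };
    default:
      return null;
  }
}
```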
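When to use WebSocket
Use WebSocket when:

- the interaction is highly interactive (interrupting generation, multiplexing several kinds of messages, sending further control messages mid-stream);
- traffic is frequent and bidirectional;
- a model of "one request, pushed until it finishes" is not enough.

You have to handle reconnection, heartbeats, authentication, and cookie/same-origin constraints yourself. Every hop on the path must also support Upgrade and long-lived connections. In local development, the reverse proxy has to enable WebSocket explicitly, e.g. ws: true in setupProxy.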
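Reconnection is one of the things WebSocket leaves to the application. A common approach is capped exponential backoff with jitter. The sketch below computes the delay before the n-th reconnect attempt. The base delay, the cap and the "full jitter" strategy are assumptions, not taken from the original.

```ts
// Delay in ms before reconnect attempt number `attempt` (0-based):
// exponential growth from baseMs, capped at maxMs, with "full jitter"
// (a uniformly random value between 0 and the capped delay).
export function reconnectDelayMs(
  attempt: number,
  baseMs = 500,
  maxMs = 10_000,
  random: () => number = Math.random
): number {
  const capped = Math.min(maxMs, baseMs * 2 ** attempt);
  return Math.floor(random() * capped);
}
```

A caller would wait reconnectDelayMs(n) before the n-th new WebSocket(url) and reset n to 0 after a successful open. The jitter keeps many clients from reconnecting in lockstep after a server restart.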
Implementation
Server side
```ts
import type { Server } from "http";
import WebSocket, { WebSocketServer } from "ws";
import { z } from "zod";
// wsChatSchema, safeSend, invokeChatStream and getChunkText are project helpers not shown here.

export function attachChatWebSocket(server: Server) {
  // Share the existing HTTP server; only upgrade requests to this path are accepted.
  const wss = new WebSocketServer({ server, path: "/api/ws/chat" });

  wss.on("connection", (socket) => {
    socket.on("message", async (raw) => {
      // 1. Parse and validate the client message: { type: "chat", message }.
      let parsed: z.infer<typeof wsChatSchema>;
      try {
        const json = JSON.parse(String(raw));
        const r = wsChatSchema.safeParse(json);
        if (!r.success) {
          safeSend(socket, { type: "error", error: r.error.flatten() });
          return;
        }
        parsed = r.data;
      } catch {
        safeSend(socket, { type: "error", message: "invalid_json" });
        return;
      }

      // 2. Stream the model output as delta messages, then signal completion.
      try {
        const stream = await invokeChatStream(parsed.message);
        for await (const chunk of stream) {
          // The client aborts by closing the socket; stop consuming the model stream then.
          if (socket.readyState !== WebSocket.OPEN) return;
          const text = getChunkText(chunk);
          if (text) safeSend(socket, { type: "delta", delta: text });
        }
        safeSend(socket, { type: "done" });
      } catch (err) {
        console.error(err);
        safeSend(socket, { type: "error", message: "internal_server_error" });
      }
    });
  });
}
```
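safeSend is not shown in the original. Presumably it serializes the message and does nothing once the client has gone away. Below is a hypothetical version. It uses a minimal structural socket type, so it accepts ws's WebSocket without importing the library.

```ts
// Minimal structural type: satisfied by ws's WebSocket (and the browser WebSocket).
export interface SendableSocket {
  readyState: number;
  send(data: string): void;
}

// WebSocket.OPEN is 1 in both the browser API and the ws library.
const OPEN = 1;

// Hypothetical safeSend: JSON-encode a message and send it only if the socket is open.
// Returns whether the message was handed to the socket.
export function safeSend(socket: SendableSocket, message: unknown): boolean {
  if (socket.readyState !== OPEN) return false;
  try {
    socket.send(JSON.stringify(message));
    return true;
  } catch (err) {
    console.error("safeSend failed", err);
    return false;
  }
}
```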
Client side
```ts
// Opens a WebSocket, sends one chat message and reports the accumulated deltas.
// Resolves on "done" or abort; rejects on server errors or unexpected closes.
export function runWsChatStream(
  message: string,
  signal: AbortSignal,
  onAccumulated: (text: string) => void
): Promise<void> {
  // Same host as the page; wss:// when the page itself is served over HTTPS.
  const proto = window.location.protocol === "https:" ? "wss" : "ws";
  const url = `${proto}://${window.location.host}/api/ws/chat`;

  // Development only: send one ordinary HTTP request through the dev-server proxy first.
  // Presumably this is needed because http-proxy-middleware (behind setupProxy) only hooks
  // the server's "upgrade" event after it has proxied at least one HTTP request.
  const primeDevProxy = (): Promise<void> => {
    if (process.env.NODE_ENV !== "development") return Promise.resolve();
    if (signal.aborted) return Promise.resolve();
    return fetch("/api/health", { signal }).then(
      () => {},
      () => {}
    );
  };

  return primeDevProxy().then(() => {
    if (signal.aborted) return Promise.resolve();
    return new Promise<void>((resolve, reject) => {
      const ws = new WebSocket(url);
      let accumulated = "";
      let settled = false;

      // Settle the promise exactly once and detach the abort listener.
      const finish = (fn: () => void) => {
        if (settled) return;
        settled = true;
        signal.removeEventListener("abort", onAbort);
        fn();
      };

      // Aborting closes the socket; the server sees the close and stops streaming.
      const onAbort = () => {
        ws.close();
        finish(() => resolve());
      };
      signal.addEventListener("abort", onAbort);

      ws.onopen = () => {
        ws.send(JSON.stringify({ type: "chat", message }));
      };

      ws.onmessage = (ev) => {
        try {
          const data = JSON.parse(String(ev.data)) as {
            type?: string;
            delta?: string;
            message?: string;
            error?: { fieldErrors?: { message?: string[] } };
          };
          if (data.type === "delta" && typeof data.delta === "string") {
            accumulated += data.delta;
            onAccumulated(accumulated);
          } else if (data.type === "done") {
            finish(() => resolve());
            ws.close();
          } else if (data.type === "error") {
            finish(() => reject(new Error(formatWsServerError(data))));
            ws.close();
          }
        } catch {
          finish(() => reject(new Error("invalid_server_message")));
          ws.close();
        }
      };

      ws.onerror = () => {
        finish(() => reject(new Error("WebSocket connection error")));
      };

      // A close before "done" counts as success only if some text already arrived.
      ws.onclose = () => {
        if (settled) return;
        finish(() => {
          if (signal.aborted) resolve();
          else if (!accumulated) reject(new Error("Connection closed"));
          else resolve();
        });
      };
    });
  });
}
```
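formatWsServerError is not shown either. The server sends two error shapes: { message: "invalid_json" } and a flattened zod error under error. A hypothetical version that handles both could look like this. The preference order and the fallback string are assumptions.

```ts
// Shape of an error message from /api/ws/chat, as typed in runWsChatStream.
export interface WsServerError {
  message?: string;
  error?: { fieldErrors?: { message?: string[] } };
}

// Hypothetical: prefer the validation message for the "message" field,
// then the server's error code, then a generic fallback.
export function formatWsServerError(data: WsServerError): string {
  const fieldMessage = data.error?.fieldErrors?.message?.[0];
  if (fieldMessage) return fieldMessage;
  if (data.message) return data.message;
  return "server_error";
}
```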
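How to choose
If all you need is to stream model output to the client, plain HTTP streaming or SSE is enough. SSE adds explicit event boundaries and an end-of-stream marker on top of the raw byte stream.
When the client and server need to exchange messages frequently in both directions over the same connection, WebSocket is the better fit.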