Trong các ứng dụng AI, độ trễ (latency) là một rào cản lớn. Một Agent phức tạp có thể mất 10-20 giây để hoàn thành nhiệm vụ. Nếu người dùng chỉ nhìn thấy một vòng quay loading, họ sẽ cảm thấy ứng dụng bị treo. Streaming cho phép bạn "hé lộ" tiến trình xử lý ngay lập tức, giúp ứng dụng phản hồi nhanh và chuyên nghiệp hơn.
I. Các chế độ Streaming (Stream Modes)
LangGraph hỗ trợ nhiều chế độ streaming khác nhau tùy thuộc vào dữ liệu bạn muốn hiển thị:
| Chế độ | Mô tả | Ứng dụng |
|---|---|---|
values | Trả về toàn bộ State sau mỗi bước của Graph. | Theo dõi sự thay đổi của dữ liệu tổng thể. |
updates | Chỉ trả về phần dữ liệu thay đổi từ các Node vừa chạy xong. | Theo dõi Agent đang chạy đến node nào và cập nhật cái gì. |
messages | Trả về từng token từ LLM. | Hiển thị văn bản phản hồi chạy dần dần (kiểu ChatGPT). |
custom | Trả về dữ liệu tùy chỉnh do bạn tự định nghĩa. | Hiển thị trạng thái: "Đang tìm kiếm...", "Đang phân tích...". |
II. Gửi dữ liệu tùy chỉnh với writer
Đôi khi bạn muốn gửi thông báo cho người dùng từ bên trong một Node trước khi Node đó kết thúc (ví dụ: thông báo đang gọi một API tốn thời gian). Bạn có thể sử dụng tham số config.writer. Dữ liệu gửi qua writer sẽ được bắt bởi chế độ streamMode: "custom".
III. Mã nguồn: 11-streaming-real-time.ts
Dưới đây là mã nguồn minh họa việc kết hợp stream cả cập nhật node (updates) và dữ liệu tùy chỉnh (custom).
// path: 11-streaming-real-time.ts
import { END, START, StateGraph } from '@langchain/langgraph';
import { generateImage } from '@workspace/util-langchain';
import z from 'zod';
const State = z.object({
topic: z.string(),
result: z.string().optional(),
});
// Define node with access to 'config' for custom writing
async function thinkingNode(state: any, config: any) {
// Send custom status update using writer
config.writer({ message: 'Agent is thinking deeply about ' + state.topic });
// Simulate heavy processing or LLM call
await new Promise((r) => setTimeout(r, 1000));
return { result: 'AI is the future of humanity.' };
}
const workflow = new StateGraph(State)
.addNode('think', thinkingNode)
.addEdge(START, 'think')
.addEdge('think', END)
.compile();
async function run() {
await generateImage(
workflow,
'graph-ignore/scripts-11-streaming-real-time.jpg',
);
console.log('--- Starting Real-time Stream ---\n');
// Stream both node updates and custom messages simultaneously
const stream = await workflow.stream(
{ topic: 'Artificial Intelligence' },
{ streamMode: ['updates', 'custom'] },
);
for await (const [mode, chunk] of stream) {
if (mode === 'custom') {
// Catch data from config.writer
console.log('💬 [STATUS UPDATE]:', chunk.message);
} else if (mode === 'updates') {
// Catch when a node finishes and returns data
const nodeName = Object.keys(chunk)[0];
console.log('✅ [NODE COMPLETE]:', nodeName);
}
}
}
run();IV. Giải thích cơ chế
- Async Generator: Hàm
workflow.streamtrả về một trình tạo bất đồng bộ. Chúng ta sử dụng vòng lặpfor awaitđể tiêu thụ các mảnh dữ liệu (chunks) ngay khi chúng được tạo ra. - Đa chế độ (Multi-mode): Bằng cách truyền mảng
['updates', 'custom'], chúng ta có thể nhận được cả thông tin về vòng đời của graph và các thông điệp nghiệp vụ cùng lúc. - Cải thiện UX: Người dùng sẽ nhìn thấy dòng chữ "Agent is thinking..." ngay giây đầu tiên, thay vì phải đợi 1 giây sau mới thấy kết quả cuối cùng.
Sơ đồ hoạt động:

V. Tổng kết
- Streaming biến ứng dụng AI từ trạng thái "tĩnh" (đợi và nhận) sang "động" (tương tác liên tục).
streamMode: "messages"là chìa khóa để hiển thị văn bản theo phong cách real-time tokens.config.writerlà công cụ mạnh mẽ để báo cáo tiến độ cho người dùng trong các tác vụ chạy lâu (long-running tasks).
Ở bài học tiếp theo, chúng ta sẽ học về Interruption - cách kết hợp Streaming với việc tạm dừng Graph để chờ con người trực tiếp can thiệp và điều hướng Agent!
👉 Bài tiếp theo: 12. Interruption - Human-in-the-loop (HITL)