4. Xử lý song song & Reducers

Nam

Nam Hoang / Sep 04, 2025

8 min read

Trong bài học trước, chúng ta đã kết nối các Node theo một đường thẳng (Serial Flow). Tuy nhiên, trong thực tế, nếu bạn cần một Agent thực hiện nhiều tác vụ độc lập — ví dụ: vừa "Tra cứu tài liệu", vừa "Kiểm tra lịch sử mua hàng" — việc thực hiện tuần tự sẽ làm tăng độ trễ (latency) của ứng dụng. Bài học này sẽ giúp bạn giải quyết vấn đề đó bằng Parallel ExecutionReducers.

I. Parallel Execution (Thực thi song song)

LangGraph cho phép bạn "rẽ nhánh" luồng công việc. Khi một Node có nhiều Edge đi ra hướng tới các Node khác nhau, LangGraph sẽ kích hoạt tất cả các Node mục tiêu đó cùng một lúc trong cùng một Superstep.

Mô hình Fan-out / Fan-in:

  1. Fan-out (Rẽ nhánh): Node A kích hoạt cả Node B và Node C.
  2. Superstep: LangGraph chạy B và C đồng thời.
  3. Fan-in (Hội tụ): Cả B và C đều trỏ về Node D. LangGraph sẽ tự động đợi cho đến khi cả hai hoàn thành rồi mới chạy Node D với dữ liệu đã được tổng hợp.

II. Vấn đề ghi đè dữ liệu và Giải pháp Reducer

Khi chạy song song, một vấn đề nảy sinh: Nếu cả Node B và Node C cùng cập nhật vào mảng nlist trong State, Node nào hoàn thành sau sẽ ghi đè lên kết quả của Node hoàn thành trước. Bạn sẽ bị mất dữ liệu của một trong hai nhánh.

Reducer là một hàm đặc biệt được định nghĩa ngay trong State Schema. Nó đóng vai trò như một quy tắc: "Khi có dữ liệu mới gửi đến khóa này, đừng thay thế nó, hãy dùng hàm này để hợp nhất (merge) chúng lại".

III. Mã nguồn: 04-parallelism-reducers.ts

Dưới đây là mã nguồn minh họa cách thiết lập thực thi song song và sử dụng Reducer để nối mảng (concat) dữ liệu từ các nhánh.

// path: 04-parallelism-reducers.ts
import { END, START, StateGraph } from '@langchain/langgraph';
import { registry } from '@langchain/langgraph/zod';
import { generateImage } from '@workspace/util-langchain';
import z from 'zod';

// 1. Define State with Reducer
const StateDefinition = z.object({
  // Use .register to declare a Reducer for the 'nlist' field
  nlist: z.array(z.string()).register(registry, {
    reducer: {
      // 'left' is the current value in state, 'right' is the new update from a node
      fn: (left: string[], right: string[]) => left.concat(right),
    },
    default: () => [],
  }),
});

type State = z.infer<typeof StateDefinition>;

// 2. Define Nodes
function nodeA(state: State) {
  return { nlist: ['Started at A'] };
}

function nodeB(state: State) {
  console.log('--- Running Node B (Parallel) ---');
  return { nlist: ['Data B'] };
}

function nodeC(state: State) {
  console.log('--- Running Node C (Parallel) ---');
  return { nlist: ['Data C'] };
}

function nodeD(state: State) {
  console.log('--- Running Node D (Fan-in/Aggregation) ---');
  return { nlist: ['Ended at D'] };
}

// 3. Build the Graph with branching and merging
export const parallelGraph = new StateGraph(StateDefinition)
  .addNode('a', nodeA)
  .addNode('b', nodeB)
  .addNode('c', nodeC)
  .addNode('d', nodeD)

  .addEdge(START, 'a')

  // FAN-OUT: Branch from 'a' to 'b' and 'c' (Parallel)
  .addEdge('a', 'b')
  .addEdge('a', 'c')

  // FAN-IN: Merge from 'b' and 'c' into 'd'
  .addEdge('b', 'd')
  .addEdge('c', 'd')

  .addEdge('d', END)
  .compile();

// 4. Execution
async function run() {
  // Generate diagram
  await generateImage(
    parallelGraph,
    'graph-ignore/scripts-04-parallelism-reducers.jpg',
  );

  console.log('\n=== L4: Parallelism and Reducers Example ===\n');

  const result = await parallelGraph.invoke({ nlist: [] });

  console.log('\n--- FINAL RESULT ---');
  console.log(result.nlist);
  // The result will contain full data from both B and C thanks to the Reducer:
  // ["Started at A", "Data B", "Data C", "Ended at D"]
}

run();

IV. Giải thích cơ chế

  1. left.concat(right): Đây là "linh hồn" của bài học. Thay vì ghi đè mảng cũ bằng mảng mới, Reducer sẽ nối chúng lại. Nhờ đó, dù Node B hay Node C xong trước, dữ liệu cuối cùng vẫn đầy đủ.
  2. Đồng bộ hóa Superstep: Bạn không cần viết code để "đợi" (await). LangGraph tự biết Node d có hai đường dẫn đi vào, nên nó sẽ treo Node d cho đến khi nhận được tín hiệu hoàn thành từ cả hai nhánh bc.
  3. Hiệu suất: Nếu mỗi tác vụ mất 1 giây, luồng tuần tự mất 4 giây. Luồng song song này chỉ mất khoảng 3 giây vì tác vụ B và C chạy cùng lúc.

Sơ đồ hoạt động:

Sơ đồ Parallel Flow

V. Tổng kết

  • Parallel Execution giúp giảm độ trễ đáng kể cho các tác vụ độc lập.
  • Reducers là bắt buộc khi xử lý song song để tránh mất dữ liệu khi nhiều Node cùng ghi vào một khóa (key) trong State.
  • Cơ chế Supersteps của LangGraph giúp việc đồng bộ hóa các nhánh song song trở nên tự động và dễ dàng.

Trong bài học tới, chúng ta sẽ học cách làm cho Agent trở nên thông minh hơn bằng cách tự đưa ra quyết định rẽ nhánh dựa trên dữ liệu thực tế với Conditional Edges!

👉 Bài tiếp theo: 5. Conditional Edges & Router Logic