云霞资讯网

LangGraph RemoteGraph:本地图与远程图的组合机制解析

把 AI agent 的逻辑拆分到多个独立运行的服务中,听起来复杂做起来也确实容易乱。LangGraph 的 Remot

把 AI agent 的逻辑拆分到多个独立运行的服务中,听起来复杂做起来也确实容易乱。LangGraph 的 RemoteGraph 特性算是一个干净的方案:本地编排器负责流程控制,远程图服务器承担具体计算,状态管理和控制流的职责边界清晰。

本文要构建的项目是一个循环数学引擎:本地图编排一个远程图:随机选择数学运算和生成随机数。编排器会以两种方式实现——顺序执行和并行执行——以便对比两者的取舍,方便根据场景选择合适的模式。循环持续运行,直到远程图返回 end。

架构概览

一个 math_service 远程图负责两种操作,本地 math_orchestrator 在每次迭代中调用它两次,每种操作各一次。下面分别是顺序版本和并行版本的编排器结构:

顺序流——远程图被依次调用两次:

并行流——远程图用 fan-out/fan-in 模式同时被调用两次:

math_service 是远程图,接受 action 字段:"pick_operation" 返回一个随机数学运算或 end,"generate_number" 返回一个随机整数。

math_orchestrator 是本地图,接受初始数字后每次迭代调用远程图两次(分别传入不同 action),执行数学运算,operation 为 end 时终止。

环境准备

uv/pyproject.toml 配置如下:

[project]  name = "langgraph-random-math"  version = "0.1.0"  description = "Add your description here"  requires-python = "==3.13"  dependencies = [      "langgraph",      "langgraph-cli",      "langgraph-sdk"  ]

截至本文编写时 pydantic 与 Python 3.14 及更高版本不兼容,所以这里用 3.13。

两个图都在本地运行——远程图跑在 LangGraph 开发服务器上,本地图作为普通 Python 脚本执行。

步骤 1:math_service——远程图

远程图用条件路由在单个图中处理两种操作。传入状态的 action 字段决定路由方向:pick_operation 或 generate_number。

创建远程服务目录结构:

math_service/  ├── auth.py  ├── graph.py  ├── langgraph.json  └── .env

math_service/graph.py

import random  from typing import TypedDict  from langgraph.graph import StateGraph, START, END

class MathServiceState(TypedDict):      action: str                # "pick_operation" or "generate_number"      operation: str             # 结果: "add", "subtract", "multiply", "divide", 或 "end"      number: int                # 结果: 随机整数      manual_input_chance: float # 请求用户输入的概率 (0.0-1.0)      ask_user: bool             # 结果: True = 编排器应提示用户

def route_action(state: MathServiceState) -> str:      """根据 action 字段路由到相应的节点。"""      if state["action"] == "pick_operation":          return "pick_operation"      elif state["action"] == "generate_number":          return "generate_number"      else:          raise ValueError(f"Unknown action: {state['action']}")

def pick_operation(state: MathServiceState) -> dict:      """随机选择一个数学运算或 'end' 来停止循环。"""      operations = ["add", "subtract", "multiply", "divide", "end"]      # 'end' 有 10% 的概率;剩余 90% 在数学运算之间平均分配      weights = [9, 9, 9, 9, 4]      chosen = random.choices(operations, weights=weights, k=1)[0]      return {"operation": chosen}

def generate_number(state: MathServiceState) -> dict:      """      生成随机数或请求手动用户输入。        根据 manual_input_chance 进行掷骰:如果结果低于阈值,      返回 ask_user=True(不生成数字——编排器应提示用户)。      否则,生成并返回一个随机整数。      """      chance = state.get("manual_input_chance", 0.0)        if chance > 0.0 and random.random() < chance:          return {"ask_user": True}        return {"number": random.randint(1, 20), "ask_user": False}

builder = StateGraph(MathServiceState)  builder.add_node("pick_operation", pick_operation)  builder.add_node("generate_number", generate_number)    # 基于 action 字段从 START 进行条件路由  builder.add_conditional_edges(      START,      route_action,      {          "pick_operation": "pick_operation",          "generate_number": "generate_number",      },  )    builder.add_edge("pick_operation", END)  builder.add_edge("generate_number", END)    graph = builder.compile()

math_service/auth.py

import os  from langgraph_sdk import Auth    auth = Auth()    # 在生产环境中,请使用正规的 JWT 验证库。  # 此示例使用简单的 token 查找以保持清晰。  VALID_TOKENS = {      os.environ.get("MATH_SERVICE_TOKEN", "dev-token"): {          "id": "orchestrator",          "name": "Math Orchestrator",      },  }

@auth.authenticate  async def authenticate(authorization: str | None) -> Auth.types.MinimalUserDict:      """验证 Authorization 头中的 Bearer token。"""      if not authorization:          raise Auth.exceptions.HTTPException(              status_code=401, detail="Missing authorization header"          )        try:          scheme, token = authorization.split(" ", 1)      except ValueError:          raise Auth.exceptions.HTTPException(              status_code=401, detail="Invalid authorization format"          )        if scheme.lower() != "bearer" or token not in VALID_TOKENS:          raise Auth.exceptions.HTTPException(              status_code=401, detail="Invalid token"          )        user = VALID_TOKENS[token]      return {          "identity": user["id"],          "is_authenticated": True,      }

@auth.on  async def authorize_all(ctx: Auth.types.AuthContext, value: dict):      """允许已认证用户执行所有操作。        在更复杂的设置中,你可以通过检查 value 载荷来限制      每个调用者可以调用哪些操作(pick_operation vs generate_number)。      """      return None  # None = 允许,不添加额外过滤

这是一个最小认证层:检查 Bearer token 有效性,对已认证的调用者放行所有操作。生产环境中应当用 JWT 验证(PyJWT、Auth0 等)替代 token 查表,并按需增加操作级别的授权。

generate_number 节点内部完成决策——根据 manual_input_chance 掷骰后,要么生成数字,要么置 ask_user=True。编排器检查这个标志并在需要时于本地提示用户。决策逻辑留在服务端,用户交互留在客户端,正是微服务中典型的职责划分方式。

math_service/langgraph.json:

{    "dependencies": ["."],    "graphs": {      "math_service": "./graph.py:graph"    },    "auth": {      "path": "./auth.py:auth"    },    "env": ".env"  }

创建 .env 文件写入服务 token:

MATH_SERVICE_TOKEN=dev-token

在端口 2024 启动服务器:

cd math_service  langgraph dev --port 2024 --no-browser

可以针对运行中的服务器测试两种操作:

from langgraph.pregel.remote import RemoteGraph    # 不带 token — 应该返回 401 失败  try:      bad_service = RemoteGraph("math_service", url="http://localhost:2024")      bad_service.invoke({          "action": "pick_operation", "operation": "", "number": 0,          "manual_input_chance": 0.0, "ask_user": False,      })      print("❌ Should have failed without token!")  except Exception as e:      print(f"✅ Correctly blocked: {e}")    # 带有效 token — 应该成功  service = RemoteGraph(      "math_service",      url="http://localhost:2024",      headers={"Authorization": "Bearer dev-token"},  )    # 测试: 选择一个运算  result = service.invoke({      "action": "pick_operation", "operation": "", "number": 0,      "manual_input_chance": 0.0, "ask_user": False,  })  print(result["operation"])  # 例如 'multiply'    # 测试: 生成一个数字(自动模式)  result = service.invoke({      "action": "generate_number", "operation": "", "number": 0,      "manual_input_chance": 0.0, "ask_user": False,  })  print(result["number"], result["ask_user"])  # 例如 14, False    # 测试: 生成一个数字(始终询问用户)  result = service.invoke({      "action": "generate_number", "operation": "", "number": 0,      "manual_input_chance": 1.0, "ask_user": False,  })  print(result["ask_user"])  # True — 编排器应提示用户

步骤 2:本地编排器图

接下来构建编排器,分顺序和并行两个版本以便对照。两者共享状态定义、远程图连接和节点函数,全部抽取到公共模块 shared.py 中。差异只在图的边如何连接。

目录结构:

math_orchestrator/   ├── shared.py   ├── shared_resilient.py   ├── orchestrator_sequential.py   ├── orchestrator_parallel.py   └── orchestrator_parallel_resilient.py

math_orchestrator/shared.py 公共状态、连接和节点

import os  from typing import TypedDict, Annotated  import operator  from langgraph.pregel.remote import RemoteGraph

# --- 状态定义 ---    class OrchestratorState(TypedDict):      current_number: float      operation: str      random_number: int      history: Annotated[list[str], operator.add]      manual_input_chance: float  # 0.0 = 始终远程, 1.0 = 始终手动

# --- 连接远程图 ---  # 单个远程图处理两种操作。  # 认证 token 从环境变量加载。    math_service = RemoteGraph(      "math_service",      url=os.environ.get("MATH_SERVICE_URL", "http://localhost:2024"),      headers={"Authorization": f"Bearer {os.environ.get('MATH_SERVICE_TOKEN', '')}"},  )

# --- 节点函数 ---    def build_initial_state(current_number: float, manual_input_chance: float) -> dict:      """构建 graph.invoke() 的初始状态字典。"""      if manual_input_chance == 0.0:          mode = "automatic"      elif manual_input_chance == 1.0:          mode = "manual"      else:          mode = f"mixed ({int(manual_input_chance * 100)}% manual)"        return {          "current_number": current_number,          "operation": "",          "random_number": 0,          "history": [f"Starting number: {current_number} (mode: {mode})"],          "manual_input_chance": manual_input_chance,      }

def get_operation(state: OrchestratorState) -> dict:      """使用 action='pick_operation' 调用 math_service。"""      result = math_service.invoke({          "action": "pick_operation",          "operation": "",          "number": 0,          "manual_input_chance": 0.0,          "ask_user": False,      })      op = result["operation"]      print(f"  → Operation: {op}")      return {"operation": op}

def _prompt_user_number(state: OrchestratorState) -> int:      """通过 stdin 提示用户输入一个数字。"""      op = state.get("operation", "?")      current = state.get("current_number", 0)      while True:          raw = input(              f"  Current: {current} | Operation: {op} | Enter a number: "          )          try:              return int(raw)          except ValueError:              print("  Please enter a valid integer.")

def get_random_number(state: OrchestratorState) -> dict:      """      获取数学运算中要使用的下一个数字。        使用 action='generate_number' 调用 math_service,同时传递      manual_input_chance。远程图决定是生成一个数字还是请求      手动输入(通过 ask_user 标志)。      如果 ask_user 为 True,编排器在本地提示用户。      """      chance = state.get("manual_input_chance", 0.0)        result = math_service.invoke({          "action": "generate_number",          "operation": "",          "number": 0,          "manual_input_chance": chance,          "ask_user": False,      })        if result.get("ask_user"):          num = _prompt_user_number(state)          print(f"  → Number: {num} (manual)")          return {"random_number": num}        num = result["number"]      print(f"  → Number: {num}")      return {"random_number": num}

def execute_operation(state: OrchestratorState) -> dict:      """对 current_number 执行数学运算。"""      current = state["current_number"]      op = state["operation"]      num = state["random_number"]        if op == "add":          new_number = current + num          symbol = "+"      elif op == "subtract":          new_number = current - num          symbol = "-"      elif op == "multiply":          new_number = current * num          symbol = "×"      elif op == "divide":          if num == 0:              num = 1          new_number = round(current / num, 2)          symbol = "÷"      else:          new_number = current          symbol = "?"        entry = f"  {current} {symbol} {num} = {new_number}"      print(entry)        return {          "current_number": new_number,          "history": [entry],      }

get_operation 和 get_random_number 调用的是同一个 math_service,只是传入不同的 action 值。编排器视角下,远程图是一个支持多种操作的单一端点。

下面看两种不同的图连接方式。每个编排器文件都很简短——业务逻辑全在 shared.py 里,编排器文件只关心拓扑结构。

顺序执行

math_orchestrator/orchestrator_sequential.py

顺序版本先调用 get_operation,拿到 end 就直接终止,无需再去取随机数。非 end 的情况下继续调用 get_random_number 和 execute_operation,然后循环回来。

import argparse   from langgraph.graph import StateGraph, START, END   from shared import (      OrchestratorState,      build_initial_state,      get_operation,      get_random_number,      execute_operation,   )

# --- 路由逻辑 ---    def should_continue(state: OrchestratorState) -> str:      """获取操作后,决定:继续还是停止。"""      if state.get("operation") == "end":          return "finish"      return "continue"

# --- 构建图 ---    builder = StateGraph(OrchestratorState)    # 添加节点  builder.add_node("get_operation", get_operation)  builder.add_node("get_random_number", get_random_number)  builder.add_node("execute_operation", execute_operation)    # 定义边 — 顺序链  builder.add_edge(START, "get_operation")    # 获取操作后,决定:继续还是结束?  builder.add_conditional_edges(      "get_operation",      should_continue,      {          "continue": "get_random_number",          "finish": END,      },  )    builder.add_edge("get_random_number", "execute_operation")    # 执行后,循环回到 get_operation  builder.add_edge("execute_operation", "get_operation")    # 编译  graph = builder.compile()

# --- 运行 ---    if __name__ == "__main__":      parser = argparse.ArgumentParser()      parser.add_argument(          "--start-number", type=float, default=None,          help="Initial number to start with (prompts if not provided)",      )      parser.add_argument(          "--manual-input", action="store_true",          help="Always prompt user for numbers (shorthand for --manual-input-chance 1.0)",      )      parser.add_argument(          "--manual-input-chance", type=float, default=0.0,          help="Probability (0.0-1.0) of prompting user for each number (default: 0.0)",      )      args = parser.parse_args()        start = args.start_number      if start is None:          start = float(input("Enter starting number: "))        chance = 1.0 if args.manual_input else args.manual_input_chance        result = graph.invoke(build_initial_state(          current_number=start,          manual_input_chance=chance,      ))        print("\n🧮 Math Engine Complete! (Sequential)\n")      print("Computation History:")      for entry in result["history"]:          print(entry)      print(f"\nFinal Result: {result['current_number']}")

顺序流的好处是逻辑直白,而且最后一次迭代拿到 end 时可以直接跳过取随机数的调用,省掉一次无用的 HTTP 请求。代价是每轮迭代的两次远程调用必须串行,一个等另一个。

并行执行

math_orchestrator/orchestrator_parallel.py

并行版本利用 LangGraph 的 fan-out/fan-in 模式。get_operation 和 get_random_number 在同一个 superstep 中同时执行,两者都完成后 execute_operation 再决定是继续 fan-out 还是终止。

import argparse  from langgraph.graph import StateGraph, START, END  from shared import (      OrchestratorState,      build_initial_state,      get_operation,      get_random_number,      execute_operation,  )

# --- 路由逻辑 ---    def should_continue(state: OrchestratorState) -> list[str] | str:      """      决定是继续循环还是停止。      返回节点名称列表用于 fan-out(并行),      或返回 END 以终止。      """      if state.get("operation") == "end":          return END      # Fan-out: 并行路由到两个节点      return ["get_operation", "get_random_number"]

# --- 构建图 ---    builder = StateGraph(OrchestratorState)    # 添加节点  builder.add_node("get_operation", get_operation)  builder.add_node("get_random_number", get_random_number)  builder.add_node("execute_operation", execute_operation)    # 定义边  # Fan-out: START 并行发送到两个节点  builder.add_edge(START, "get_operation")  builder.add_edge(START, "get_random_number")    # Fan-in: 两个节点都必须完成后 execute_operation 才能运行  builder.add_edge("get_operation", "execute_operation")  builder.add_edge("get_random_number", "execute_operation")    # 执行后,决定:再次 fan-out,还是结束  builder.add_conditional_edges(      "execute_operation",      should_continue,      ["get_operation", "get_random_number", END],  )    # 编译  graph = builder.compile()

# --- 运行 ---    if __name__ == "__main__":      parser = argparse.ArgumentParser()      parser.add_argument(          "--start-number", type=float, default=None,          help="Initial number to start with (prompts if not provided)",      )      parser.add_argument(          "--manual-input", action="store_true",          help="Always prompt user for numbers (shorthand for --manual-input-chance 1.0)",      )      parser.add_argument(          "--manual-input-chance", type=float, default=0.0,          help="Probability (0.0-1.0) of prompting user for each number (default: 0.0)",      )      args = parser.parse_args()        start = args.start_number      if start is None:          start = float(input("Enter starting number: "))        chance = 1.0 if args.manual_input else args.manual_input_chance        result = graph.invoke(build_initial_state(          current_number=start,          manual_input_chance=chance,      ))        print("\n🧮 Math Engine Complete! (Parallel)\n")      print("Computation History:")      for entry in result["history"]:          print(entry)      print(f"\nFinal Result: {result['current_number']}")

运行结果

打开两个终端窗口:

终端 1——启动远程 math_service(如果此前没启动的话):

cd math_service  langgraph dev --port 2024 --no-browser

终端 2——运行编排器(任选其一):

cd math_orchestrator    # 选项 A: 顺序 — 提示输入起始数字   python orchestrator_sequential.py    # 选项 B: 并行 — 通过参数指定起始数字   python orchestrator_parallel.py --start-number 100    # 任何选项配合手动输入模式 — 每次提示你输入数字:   python orchestrator_sequential.py --start-number 100 --manual-input    # 混合模式 — 每次迭代有 50% 的概率提示你:   python orchestrator_sequential.py --start-number 100 --manual-input-chance 0.5

一次典型运行的输出如下:

→ Operation: subtract    → Number: 10    100.0 - 10 = 90.0    → Operation: subtract    Current: 90.0 | Operation: subtract | Enter a number: 1    → Number: 1 (manual)    90.0 - 1 = 89.0    → Operation: add    Current: 89.0 | Operation: add | Enter a number: 1    → Number: 1 (manual)    89.0 + 1 = 90.0    → Operation: divide    → Number: 17    90.0 ÷ 17 = 5.29    → Operation: add    → Number: 5    5.29 + 5 = 10.29    → Operation: divide    Current: 10.29 | Operation: divide | Enter a number: 1    → Number: 1 (manual)    10.29 ÷ 1 = 10.29    → Operation: subtract    Current: 10.29 | Operation: subtract | Enter a number: 1    → Number: 1 (manual)    10.29 - 1 = 9.29    → Operation: multiply    Current: 9.29 | Operation: multiply | Enter a number: 1    → Number: 1 (manual)    9.29 × 1 = 9.29    → Operation: multiply    Current: 9.29 | Operation: multiply | Enter a number: 2    → Number: 2 (manual)    9.29 × 2 = 18.58    → Operation: multiply    → Number: 10    18.58 × 10 = 185.79999999999998    → Operation: end    🧮 Math Engine Complete! (Sequential)    Computation History:  Starting number: 100.0 (mode: mixed (50% manual))    100.0 - 10 = 90.0    90.0 - 1 = 89.0    89.0 + 1 = 90.0    90.0 ÷ 17 = 5.29    5.29 + 5 = 10.29    10.29 ÷ 1 = 10.29    10.29 - 1 = 9.29    9.29 × 1 = 9.29    9.29 × 2 = 18.58    18.58 × 10 = 185.79999999999998    Final Result: 185.79999999999998

运算和数字都由远程服务随机生成,每次运行的结果不同。

从控制台到生产环境:使用 interrupt

input() 适合本地脚本调试。到了生产环境——编排器可能藏在 REST API、Web UI 或聊天界面后面——没有控制台可用。LangGraph 对此有一个一等原语:interrupt。

机制不复杂:节点调用 interrupt() 时 LangGraph 暂停整个图,将完整状态写入 checkpoint,然后把控制权交还给调用方。调用方(API 服务、Web 应用等)拿到暂停信号后向用户展示提示,收到响应后用 Command(resume=...) 恢复执行。图从 interrupt() 调用处精确恢复,哪怕过了几个小时、换了一台机器也没问题。

以下是用 interrupt 替换 input() 后 get_random_number 的写法:

math_orchestrator/shared_interrupt.py(相关摘录)

import sqlite3  from langgraph.types import interrupt, Command  from langgraph.checkpoint.sqlite import SqliteSaver  from langgraph.pregel.remote import RemoteGraph

math_service = RemoteGraph(      "math_service",      url="http://localhost:2024",  )

def get_random_number(state):      """      获取下一个数字 — 通过远程图或人工中断。        当远程图返回 ask_user=True 时,我们不调用 input(),      而是调用 interrupt(),它会暂停整个图并向调用应用程序      呈现一个提示。      """      chance = state.get("manual_input_chance", 0.0)        result = math_service.invoke({          "action": "generate_number",          "operation": "",          "number": 0,          "manual_input_chance": chance,          "ask_user": False,      })        if result.get("ask_user"):          # 暂停图 — 调用者接收此提示          num = interrupt({              "prompt": "Enter a number",              "current_number": state.get("current_number"),              "operation": state.get("operation"),          })          print(f"  → Number: {num} (manual)")          return {"random_number": int(num)}        num = result["number"]      print(f"  → Number: {num}")      return {"random_number": num}

编译图时必须附带 checkpointer(状态持久化),调用时必须指定 thread_id(标识具体会话):

# 带 checkpointer 编译  checkpointer = SqliteSaver(sqlite3.connect("math_engine.db"))  graph = builder.compile(checkpointer=checkpointer)    # 启动新线程  config = {"configurable": {"thread_id": "session-42"}}    # 首次调用 — 运行直到 interrupt() 被调用  result = graph.invoke(      build_initial_state(current_number=100, manual_input_chance=1.0),      config=config,  )    # 图现在已暂停。result["__interrupt__"] 包含提示:  # [Interrupt(value={"prompt": "Enter a number", "current_number": 100, ...})]    # ... 时间流逝,用户通过 Web UI、API 等提供输入 ...    # 使用用户的值恢复  result = graph.invoke(Command(resume=42), config=config)    # 图从 interrupt() 被调用的地方继续执行,  # num = 42,并运行直到下一个 interrupt 或 END。

几个要点。interrupt() 和 input() 目的相同,区别在于前者走 HTTP 而非 stdin——传入一个 payload(提示、上下文等),调用方通过 Command(resume=...) 回传用户输入。checkpointer 负责持久化图状态,LangGraph 支持 SQLite、Postgres 等多种后端。thread_id 用来标识会话,多个用户可以各自持有独立的暂停/运行中的图实例。远程图和图的连接方式无需任何改动,变化只发生在节点函数和调用模式上。

保护线程:认证与授权

如果 thread ID 是保护暂停会话的唯一手段,任何猜中或截获了 thread ID 的人都能恢复别人的图、注入自己的值。LangGraph Platform 对此有内置的认证与授权层。

认证系统分两步走。@auth.authenticate 处理程序作为中间件在每个请求上运行,验证调用者的凭据(JWT token、API 密钥、OAuth2 等)并返回用户身份;@auth.on 处理程序执行资源级访问控制,给每个线程打上所有者标记,过滤访问权限,确保用户只能看到和恢复自己的线程。

线程级安全的实现如下:

from langgraph_sdk import Auth    auth = Auth()    @auth.authenticate  async def authenticate(authorization: str) -> Auth.types.MinimalUserDict:      """验证 Bearer token 并返回用户信息。"""      token = authorization.split(" ", 1)[1]      user = await verify_jwt(token)  # 你的 JWT 验证逻辑      return {          "identity": user["sub"],          "is_authenticated": True,      }    @auth.on.threads.create  async def on_thread_create(      ctx: Auth.types.AuthContext,      value: Auth.types.on.threads.create.value,  ):      """为每个新线程标记创建者的身份。        `value` 是线程创建载荷 — 一个包含来自 API 请求的字段的字典:      thread_id, metadata, if_exists 等。      我们修改 value["metadata"] 以在存储之前标记所有者。      返回值是 LangGraph 写入线程 metadata 的元数据过滤器。      """      value.setdefault("metadata", {})["owner"] = ctx.user.identity      return {"owner": ctx.user.identity}    @auth.on.threads.read  async def on_thread_read(      ctx: Auth.types.AuthContext,      value: Auth.types.on.threads.read.value,  ):      """过滤线程,使用户只能看到自己的。        `value` 是读取请求载荷 — 一个包含 thread_id 和来自      API 请求的任何 metadata 的字典。        返回值不是检查 — 而是查询过滤器。      LangGraph 在存储层应用它:只有 metadata.owner 与      ctx.user.identity 匹配的线程才会被返回。      其他用户拥有的线程是不可见的,而不仅仅是被阻止。      """      return {"owner": ctx.user.identity}    @auth.on.threads.create_run  async def on_thread_resume(      ctx: Auth.types.AuthContext,      value: Auth.types.on.threads.create_run.value,  ):      """过滤用户可以在哪些线程上恢复运行。        `value` 是运行创建载荷 — 一个包含 thread_id, assistant_id,      input, command, metadata, config 等的字典。        相同的过滤器机制:LangGraph 仅在线程的 metadata.owner      与返回的过滤器匹配时才允许运行。如果用户尝试恢复      另一个用户的线程,平台会拒绝请求,因为该线程      未通过过滤器。      """      metadata = value.setdefault("metadata", {})      metadata["owner"] = ctx.user.identity      return {"owner": ctx.user.identity}

在 langgraph.json 中注册:

{    "auth": {      "path": "src/security/auth.py:auth"    }  }

配置完成后,即使攻击者拿到了其他用户的 thread ID 也无法读取线程状态或恢复运行——授权处理程序会因为 owner metadata 不匹配而拒绝请求。过滤发生在平台层,不在图代码中,无法通过构造 API 请求绕过。

生产部署时 LangGraph Platform 可以对接 Auth0、Supabase 以及任何 OAuth2/JWT 认证体系。记住一点:thread ID 是标识符,不是密钥——安全保障来自认证层对访问权限的把控。

前面基于控制台 input() 的版本已经是这一模式的可运行原型。迁移到 interrupt 只需改动节点函数和调用模式,架构其余部分——远程图、fan-out/fan-in、错误处理——全部保持原样。

RemoteGraph 的工作原理

langgraph.pregel.remote 中的 RemoteGraph 类是整个组合能力的底层支撑。它实现的接口和本地编译的图完全一致,可以 .invoke()、.stream(),也可以直接嵌入为另一个图的子图节点。通信通过 HTTP对接 LangGraph Server API。

from langgraph.pregel.remote import RemoteGraph    remote = RemoteGraph(      "math_service",          # 来自 langgraph.json 的 assistant ID      url="http://localhost:2024",  )    # 像使用普通图一样使用它 — action 字段控制行为  result = remote.invoke({"action": "pick_operation", "operation": "", "number": 0})

RemoteGraph 遵循 Runnable 接口,可以直接作为节点添加到另一个图中:

builder.add_node("my_remote_node", remote_graph)

编排器图不需要了解远程图的内部实现细节——它可以是一个简单状态机、一个 LLM 驱动的 agent,或者任何介于两者之间的东西。而通过条件路由,一个远程图部署就能承载多种操作。

第二个调用依赖于第一个调用的结果时选顺序执行,或者想在最后一轮 end 时省掉无用的远程调用也该选顺序。两个调用互相独立、想缩短总耗时就选并行——在生产环境中远程图调用可能涉及 LLM 推理或数据库查询,并行执行差不多能把每轮延迟砍掉一半。

错误处理

并行执行带来一个自然的问题:远程图调用失败了怎么办?math_service 临时挂掉\网络请求超时,这些情况都需要考虑。LangGraph 的处理提供了多种应对策略。

默认行为:原子 superstep

LangGraph 中并行节点在一个 superstep 里共同执行。superstep 中任何一个节点抛出异常,整个 superstep 原子性失败,不会有部分状态写入。假如 get_random_number 成功、get_operation 失败,两边的结果都不会写入状态,避免了有随机数却没运算符这种不一致。

配了 checkpointer 的情况下 LangGraph 会在内部保存成功节点的结果。恢复执行时只有失败分支重试,成功分支的工作不必重复。

策略 1:RetryPolicy(图原生重试)

最干净的做法是对容易出错的节点附加 RetryPolicy。LangGraph 接管重试循环,支持配置尝试次数、退避策略和抖动。只有失败分支重试,成功的并行节点不会重新执行。

重试全部耗尽后异常向上传播,图调用失败。对网络超时、5xx 错误这类瞬态故障,这是恰当的处理方式。

策略 2:节点内 Try/Except(降级处理)

需要图在远程服务不可用时仍然继续运行的场景下,在节点内部捕获异常并返回降级值即可。操作调用失败则引擎停止循环;数字生成调用失败则用安全默认值代替。

策略 3:两者结合(生产环境推荐)

生产中通常既要重试也要降级。RetryPolicy 透明处理瞬态故障,重试全部耗尽后异常才落到节点内部的 try/except 块,由它提供兜底逻辑。

以下是三种策略的完整可运行示例。

总结

RemoteGraph 让分布式 agent 架构的组合变得相当简洁——顺序还是并行的连接方式随意切换,RetryPolicy 加上节点内降级逻辑构成两层容错。单个远程图通过条件路由就能承载多种操作,基础设施不必铺得很大,编排逻辑也能保持清晰。

这个数学引擎作为 demo 虽然简单,展示的模式却可以直接迁移到正式系统——微服务化的 AI 编排,每个图作为一个独立部署的 agent 逻辑单元,根据延迟和成本需求选择合适的执行策略。

本文完整代码:

https://avoid.overfit.cn/post/d9102c5bf109459494a5bf2b99560b18

by Alexander Machekhin