把 AI agent 的逻辑拆分到多个独立运行的服务中,听起来复杂做起来也确实容易乱。LangGraph 的 RemoteGraph 特性算是一个干净的方案:本地编排器负责流程控制,远程图服务器承担具体计算,状态管理和控制流的职责边界清晰。
本文要构建的项目是一个循环数学引擎:本地图编排一个远程图:随机选择数学运算和生成随机数。编排器会以两种方式实现——顺序执行和并行执行——以便对比两者的取舍,方便根据场景选择合适的模式。循环持续运行,直到远程图返回 end。
架构概览一个 math_service 远程图负责两种操作,本地 math_orchestrator 在每次迭代中调用它两次,每种操作各一次。下面分别是顺序版本和并行版本的编排器结构:
顺序流——远程图被依次调用两次:

并行流——远程图用 fan-out/fan-in 模式同时被调用两次:

math_service 是远程图,接受 action 字段:"pick_operation" 返回一个随机数学运算或 end,"generate_number" 返回一个随机整数。
math_orchestrator 是本地图,接受初始数字后每次迭代调用远程图两次(分别传入不同 action),执行数学运算,operation 为 end 时终止。
环境准备uv/pyproject.toml 配置如下:
[project] name = "langgraph-random-math" version = "0.1.0" description = "Add your description here" requires-python = "==3.13" dependencies = [ "langgraph", "langgraph-cli", "langgraph-sdk" ]
截至本文编写时 pydantic 与 Python 3.14 及更高版本不兼容,所以这里用 3.13。
两个图都在本地运行——远程图跑在 LangGraph 开发服务器上,本地图作为普通 Python 脚本执行。
步骤 1:math_service——远程图远程图用条件路由在单个图中处理两种操作。传入状态的 action 字段决定路由方向:pick_operation 或 generate_number。
创建远程服务目录结构:
math_service/ ├── auth.py ├── graph.py ├── langgraph.json └── .env
math_service/graph.py
import random from typing import TypedDict from langgraph.graph import StateGraph, START, END
class MathServiceState(TypedDict): action: str # "pick_operation" or "generate_number" operation: str # 结果: "add", "subtract", "multiply", "divide", 或 "end" number: int # 结果: 随机整数 manual_input_chance: float # 请求用户输入的概率 (0.0-1.0) ask_user: bool # 结果: True = 编排器应提示用户
def route_action(state: MathServiceState) -> str: """根据 action 字段路由到相应的节点。""" if state["action"] == "pick_operation": return "pick_operation" elif state["action"] == "generate_number": return "generate_number" else: raise ValueError(f"Unknown action: {state['action']}")
def pick_operation(state: MathServiceState) -> dict: """随机选择一个数学运算或 'end' 来停止循环。""" operations = ["add", "subtract", "multiply", "divide", "end"] # 'end' 有 10% 的概率;剩余 90% 在数学运算之间平均分配 weights = [9, 9, 9, 9, 4] chosen = random.choices(operations, weights=weights, k=1)[0] return {"operation": chosen}
def generate_number(state: MathServiceState) -> dict: """ 生成随机数或请求手动用户输入。 根据 manual_input_chance 进行掷骰:如果结果低于阈值, 返回 ask_user=True(不生成数字——编排器应提示用户)。 否则,生成并返回一个随机整数。 """ chance = state.get("manual_input_chance", 0.0) if chance > 0.0 and random.random() < chance: return {"ask_user": True} return {"number": random.randint(1, 20), "ask_user": False}
builder = StateGraph(MathServiceState) builder.add_node("pick_operation", pick_operation) builder.add_node("generate_number", generate_number) # 基于 action 字段从 START 进行条件路由 builder.add_conditional_edges( START, route_action, { "pick_operation": "pick_operation", "generate_number": "generate_number", }, ) builder.add_edge("pick_operation", END) builder.add_edge("generate_number", END) graph = builder.compile()
math_service/auth.py
import os from langgraph_sdk import Auth auth = Auth() # 在生产环境中,请使用正规的 JWT 验证库。 # 此示例使用简单的 token 查找以保持清晰。 VALID_TOKENS = { os.environ.get("MATH_SERVICE_TOKEN", "dev-token"): { "id": "orchestrator", "name": "Math Orchestrator", }, }
@auth.authenticate async def authenticate(authorization: str | None) -> Auth.types.MinimalUserDict: """验证 Authorization 头中的 Bearer token。""" if not authorization: raise Auth.exceptions.HTTPException( status_code=401, detail="Missing authorization header" ) try: scheme, token = authorization.split(" ", 1) except ValueError: raise Auth.exceptions.HTTPException( status_code=401, detail="Invalid authorization format" ) if scheme.lower() != "bearer" or token not in VALID_TOKENS: raise Auth.exceptions.HTTPException( status_code=401, detail="Invalid token" ) user = VALID_TOKENS[token] return { "identity": user["id"], "is_authenticated": True, }
@auth.on async def authorize_all(ctx: Auth.types.AuthContext, value: dict): """允许已认证用户执行所有操作。 在更复杂的设置中,你可以通过检查 value 载荷来限制 每个调用者可以调用哪些操作(pick_operation vs generate_number)。 """ return None # None = 允许,不添加额外过滤
这是一个最小认证层:检查 Bearer token 有效性,对已认证的调用者放行所有操作。生产环境中应当用 JWT 验证(PyJWT、Auth0 等)替代 token 查表,并按需增加操作级别的授权。
generate_number 节点内部完成决策——根据 manual_input_chance 掷骰后,要么生成数字,要么置 ask_user=True。编排器检查这个标志并在需要时于本地提示用户。决策逻辑留在服务端,用户交互留在客户端,正是微服务中典型的职责划分方式。
math_service/langgraph.json:
{ "dependencies": ["."], "graphs": { "math_service": "./graph.py:graph" }, "auth": { "path": "./auth.py:auth" }, "env": ".env" }
创建 .env 文件写入服务 token:
MATH_SERVICE_TOKEN=dev-token
在端口 2024 启动服务器:
cd math_service langgraph dev --port 2024 --no-browser
可以针对运行中的服务器测试两种操作:
from langgraph.pregel.remote import RemoteGraph # 不带 token — 应该返回 401 失败 try: bad_service = RemoteGraph("math_service", url="http://localhost:2024") bad_service.invoke({ "action": "pick_operation", "operation": "", "number": 0, "manual_input_chance": 0.0, "ask_user": False, }) print("❌ Should have failed without token!") except Exception as e: print(f"✅ Correctly blocked: {e}") # 带有效 token — 应该成功 service = RemoteGraph( "math_service", url="http://localhost:2024", headers={"Authorization": "Bearer dev-token"}, ) # 测试: 选择一个运算 result = service.invoke({ "action": "pick_operation", "operation": "", "number": 0, "manual_input_chance": 0.0, "ask_user": False, }) print(result["operation"]) # 例如 'multiply' # 测试: 生成一个数字(自动模式) result = service.invoke({ "action": "generate_number", "operation": "", "number": 0, "manual_input_chance": 0.0, "ask_user": False, }) print(result["number"], result["ask_user"]) # 例如 14, False # 测试: 生成一个数字(始终询问用户) result = service.invoke({ "action": "generate_number", "operation": "", "number": 0, "manual_input_chance": 1.0, "ask_user": False, }) print(result["ask_user"]) # True — 编排器应提示用户
步骤 2:本地编排器图接下来构建编排器,分顺序和并行两个版本以便对照。两者共享状态定义、远程图连接和节点函数,全部抽取到公共模块 shared.py 中。差异只在图的边如何连接。
目录结构:
math_orchestrator/ ├── shared.py ├── shared_resilient.py ├── orchestrator_sequential.py ├── orchestrator_parallel.py └── orchestrator_parallel_resilient.py
math_orchestrator/shared.py 公共状态、连接和节点
import os from typing import TypedDict, Annotated import operator from langgraph.pregel.remote import RemoteGraph
# --- 状态定义 --- class OrchestratorState(TypedDict): current_number: float operation: str random_number: int history: Annotated[list[str], operator.add] manual_input_chance: float # 0.0 = 始终远程, 1.0 = 始终手动
# --- 连接远程图 --- # 单个远程图处理两种操作。 # 认证 token 从环境变量加载。 math_service = RemoteGraph( "math_service", url=os.environ.get("MATH_SERVICE_URL", "http://localhost:2024"), headers={"Authorization": f"Bearer {os.environ.get('MATH_SERVICE_TOKEN', '')}"}, )
# --- 节点函数 --- def build_initial_state(current_number: float, manual_input_chance: float) -> dict: """构建 graph.invoke() 的初始状态字典。""" if manual_input_chance == 0.0: mode = "automatic" elif manual_input_chance == 1.0: mode = "manual" else: mode = f"mixed ({int(manual_input_chance * 100)}% manual)" return { "current_number": current_number, "operation": "", "random_number": 0, "history": [f"Starting number: {current_number} (mode: {mode})"], "manual_input_chance": manual_input_chance, }
def get_operation(state: OrchestratorState) -> dict: """使用 action='pick_operation' 调用 math_service。""" result = math_service.invoke({ "action": "pick_operation", "operation": "", "number": 0, "manual_input_chance": 0.0, "ask_user": False, }) op = result["operation"] print(f" → Operation: {op}") return {"operation": op}
def _prompt_user_number(state: OrchestratorState) -> int: """通过 stdin 提示用户输入一个数字。""" op = state.get("operation", "?") current = state.get("current_number", 0) while True: raw = input( f" Current: {current} | Operation: {op} | Enter a number: " ) try: return int(raw) except ValueError: print(" Please enter a valid integer.")
def get_random_number(state: OrchestratorState) -> dict: """ 获取数学运算中要使用的下一个数字。 使用 action='generate_number' 调用 math_service,同时传递 manual_input_chance。远程图决定是生成一个数字还是请求 手动输入(通过 ask_user 标志)。 如果 ask_user 为 True,编排器在本地提示用户。 """ chance = state.get("manual_input_chance", 0.0) result = math_service.invoke({ "action": "generate_number", "operation": "", "number": 0, "manual_input_chance": chance, "ask_user": False, }) if result.get("ask_user"): num = _prompt_user_number(state) print(f" → Number: {num} (manual)") return {"random_number": num} num = result["number"] print(f" → Number: {num}") return {"random_number": num}
def execute_operation(state: OrchestratorState) -> dict: """对 current_number 执行数学运算。""" current = state["current_number"] op = state["operation"] num = state["random_number"] if op == "add": new_number = current + num symbol = "+" elif op == "subtract": new_number = current - num symbol = "-" elif op == "multiply": new_number = current * num symbol = "×" elif op == "divide": if num == 0: num = 1 new_number = round(current / num, 2) symbol = "÷" else: new_number = current symbol = "?" entry = f" {current} {symbol} {num} = {new_number}" print(entry) return { "current_number": new_number, "history": [entry], }
get_operation 和 get_random_number 调用的是同一个 math_service,只是传入不同的 action 值。编排器视角下,远程图是一个支持多种操作的单一端点。
下面看两种不同的图连接方式。每个编排器文件都很简短——业务逻辑全在 shared.py 里,编排器文件只关心拓扑结构。
顺序执行math_orchestrator/orchestrator_sequential.py
顺序版本先调用 get_operation,拿到 end 就直接终止,无需再去取随机数。非 end 的情况下继续调用 get_random_number 和 execute_operation,然后循环回来。
import argparse from langgraph.graph import StateGraph, START, END from shared import ( OrchestratorState, build_initial_state, get_operation, get_random_number, execute_operation, )
# --- 路由逻辑 --- def should_continue(state: OrchestratorState) -> str: """获取操作后,决定:继续还是停止。""" if state.get("operation") == "end": return "finish" return "continue"
# --- 构建图 --- builder = StateGraph(OrchestratorState) # 添加节点 builder.add_node("get_operation", get_operation) builder.add_node("get_random_number", get_random_number) builder.add_node("execute_operation", execute_operation) # 定义边 — 顺序链 builder.add_edge(START, "get_operation") # 获取操作后,决定:继续还是结束? builder.add_conditional_edges( "get_operation", should_continue, { "continue": "get_random_number", "finish": END, }, ) builder.add_edge("get_random_number", "execute_operation") # 执行后,循环回到 get_operation builder.add_edge("execute_operation", "get_operation") # 编译 graph = builder.compile()
# --- 运行 --- if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument( "--start-number", type=float, default=None, help="Initial number to start with (prompts if not provided)", ) parser.add_argument( "--manual-input", action="store_true", help="Always prompt user for numbers (shorthand for --manual-input-chance 1.0)", ) parser.add_argument( "--manual-input-chance", type=float, default=0.0, help="Probability (0.0-1.0) of prompting user for each number (default: 0.0)", ) args = parser.parse_args() start = args.start_number if start is None: start = float(input("Enter starting number: ")) chance = 1.0 if args.manual_input else args.manual_input_chance result = graph.invoke(build_initial_state( current_number=start, manual_input_chance=chance, )) print("\n🧮 Math Engine Complete! (Sequential)\n") print("Computation History:") for entry in result["history"]: print(entry) print(f"\nFinal Result: {result['current_number']}")
顺序流的好处是逻辑直白,而且最后一次迭代拿到 end 时可以直接跳过取随机数的调用,省掉一次无用的 HTTP 请求。代价是每轮迭代的两次远程调用必须串行,一个等另一个。
并行执行math_orchestrator/orchestrator_parallel.py
并行版本利用 LangGraph 的 fan-out/fan-in 模式。get_operation 和 get_random_number 在同一个 superstep 中同时执行,两者都完成后 execute_operation 再决定是继续 fan-out 还是终止。
import argparse from langgraph.graph import StateGraph, START, END from shared import ( OrchestratorState, build_initial_state, get_operation, get_random_number, execute_operation, )
# --- 路由逻辑 --- def should_continue(state: OrchestratorState) -> list[str] | str: """ 决定是继续循环还是停止。 返回节点名称列表用于 fan-out(并行), 或返回 END 以终止。 """ if state.get("operation") == "end": return END # Fan-out: 并行路由到两个节点 return ["get_operation", "get_random_number"]
# --- 构建图 --- builder = StateGraph(OrchestratorState) # 添加节点 builder.add_node("get_operation", get_operation) builder.add_node("get_random_number", get_random_number) builder.add_node("execute_operation", execute_operation) # 定义边 # Fan-out: START 并行发送到两个节点 builder.add_edge(START, "get_operation") builder.add_edge(START, "get_random_number") # Fan-in: 两个节点都必须完成后 execute_operation 才能运行 builder.add_edge("get_operation", "execute_operation") builder.add_edge("get_random_number", "execute_operation") # 执行后,决定:再次 fan-out,还是结束 builder.add_conditional_edges( "execute_operation", should_continue, ["get_operation", "get_random_number", END], ) # 编译 graph = builder.compile()
# --- 运行 --- if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument( "--start-number", type=float, default=None, help="Initial number to start with (prompts if not provided)", ) parser.add_argument( "--manual-input", action="store_true", help="Always prompt user for numbers (shorthand for --manual-input-chance 1.0)", ) parser.add_argument( "--manual-input-chance", type=float, default=0.0, help="Probability (0.0-1.0) of prompting user for each number (default: 0.0)", ) args = parser.parse_args() start = args.start_number if start is None: start = float(input("Enter starting number: ")) chance = 1.0 if args.manual_input else args.manual_input_chance result = graph.invoke(build_initial_state( current_number=start, manual_input_chance=chance, )) print("\n🧮 Math Engine Complete! (Parallel)\n") print("Computation History:") for entry in result["history"]: print(entry) print(f"\nFinal Result: {result['current_number']}")
运行结果打开两个终端窗口:
终端 1——启动远程 math_service(如果此前没启动的话):
cd math_service langgraph dev --port 2024 --no-browser
终端 2——运行编排器(任选其一):
cd math_orchestrator # 选项 A: 顺序 — 提示输入起始数字 python orchestrator_sequential.py # 选项 B: 并行 — 通过参数指定起始数字 python orchestrator_parallel.py --start-number 100 # 任何选项配合手动输入模式 — 每次提示你输入数字: python orchestrator_sequential.py --start-number 100 --manual-input # 混合模式 — 每次迭代有 50% 的概率提示你: python orchestrator_sequential.py --start-number 100 --manual-input-chance 0.5
一次典型运行的输出如下:
→ Operation: subtract → Number: 10 100.0 - 10 = 90.0 → Operation: subtract Current: 90.0 | Operation: subtract | Enter a number: 1 → Number: 1 (manual) 90.0 - 1 = 89.0 → Operation: add Current: 89.0 | Operation: add | Enter a number: 1 → Number: 1 (manual) 89.0 + 1 = 90.0 → Operation: divide → Number: 17 90.0 ÷ 17 = 5.29 → Operation: add → Number: 5 5.29 + 5 = 10.29 → Operation: divide Current: 10.29 | Operation: divide | Enter a number: 1 → Number: 1 (manual) 10.29 ÷ 1 = 10.29 → Operation: subtract Current: 10.29 | Operation: subtract | Enter a number: 1 → Number: 1 (manual) 10.29 - 1 = 9.29 → Operation: multiply Current: 9.29 | Operation: multiply | Enter a number: 1 → Number: 1 (manual) 9.29 × 1 = 9.29 → Operation: multiply Current: 9.29 | Operation: multiply | Enter a number: 2 → Number: 2 (manual) 9.29 × 2 = 18.58 → Operation: multiply → Number: 10 18.58 × 10 = 185.79999999999998 → Operation: end 🧮 Math Engine Complete! (Sequential) Computation History: Starting number: 100.0 (mode: mixed (50% manual)) 100.0 - 10 = 90.0 90.0 - 1 = 89.0 89.0 + 1 = 90.0 90.0 ÷ 17 = 5.29 5.29 + 5 = 10.29 10.29 ÷ 1 = 10.29 10.29 - 1 = 9.29 9.29 × 1 = 9.29 9.29 × 2 = 18.58 18.58 × 10 = 185.79999999999998 Final Result: 185.79999999999998
运算和数字都由远程服务随机生成,每次运行的结果不同。
从控制台到生产环境:使用 interruptinput() 适合本地脚本调试。到了生产环境——编排器可能藏在 REST API、Web UI 或聊天界面后面——没有控制台可用。LangGraph 对此有一个一等原语:interrupt。
机制不复杂:节点调用 interrupt() 时 LangGraph 暂停整个图,将完整状态写入 checkpoint,然后把控制权交还给调用方。调用方(API 服务、Web 应用等)拿到暂停信号后向用户展示提示,收到响应后用 Command(resume=...) 恢复执行。图从 interrupt() 调用处精确恢复,哪怕过了几个小时、换了一台机器也没问题。
以下是用 interrupt 替换 input() 后 get_random_number 的写法:
math_orchestrator/shared_interrupt.py(相关摘录)
import sqlite3 from langgraph.types import interrupt, Command from langgraph.checkpoint.sqlite import SqliteSaver from langgraph.pregel.remote import RemoteGraph
math_service = RemoteGraph( "math_service", url="http://localhost:2024", )
def get_random_number(state): """ 获取下一个数字 — 通过远程图或人工中断。 当远程图返回 ask_user=True 时,我们不调用 input(), 而是调用 interrupt(),它会暂停整个图并向调用应用程序 呈现一个提示。 """ chance = state.get("manual_input_chance", 0.0) result = math_service.invoke({ "action": "generate_number", "operation": "", "number": 0, "manual_input_chance": chance, "ask_user": False, }) if result.get("ask_user"): # 暂停图 — 调用者接收此提示 num = interrupt({ "prompt": "Enter a number", "current_number": state.get("current_number"), "operation": state.get("operation"), }) print(f" → Number: {num} (manual)") return {"random_number": int(num)} num = result["number"] print(f" → Number: {num}") return {"random_number": num}
编译图时必须附带 checkpointer(状态持久化),调用时必须指定 thread_id(标识具体会话):
# 带 checkpointer 编译 checkpointer = SqliteSaver(sqlite3.connect("math_engine.db")) graph = builder.compile(checkpointer=checkpointer) # 启动新线程 config = {"configurable": {"thread_id": "session-42"}} # 首次调用 — 运行直到 interrupt() 被调用 result = graph.invoke( build_initial_state(current_number=100, manual_input_chance=1.0), config=config, ) # 图现在已暂停。result["__interrupt__"] 包含提示: # [Interrupt(value={"prompt": "Enter a number", "current_number": 100, ...})] # ... 时间流逝,用户通过 Web UI、API 等提供输入 ... # 使用用户的值恢复 result = graph.invoke(Command(resume=42), config=config) # 图从 interrupt() 被调用的地方继续执行, # num = 42,并运行直到下一个 interrupt 或 END。
几个要点。interrupt() 和 input() 目的相同,区别在于前者走 HTTP 而非 stdin——传入一个 payload(提示、上下文等),调用方通过 Command(resume=...) 回传用户输入。checkpointer 负责持久化图状态,LangGraph 支持 SQLite、Postgres 等多种后端。thread_id 用来标识会话,多个用户可以各自持有独立的暂停/运行中的图实例。远程图和图的连接方式无需任何改动,变化只发生在节点函数和调用模式上。
保护线程:认证与授权如果 thread ID 是保护暂停会话的唯一手段,任何猜中或截获了 thread ID 的人都能恢复别人的图、注入自己的值。LangGraph Platform 对此有内置的认证与授权层。
认证系统分两步走。@auth.authenticate 处理程序作为中间件在每个请求上运行,验证调用者的凭据(JWT token、API 密钥、OAuth2 等)并返回用户身份;@auth.on 处理程序执行资源级访问控制,给每个线程打上所有者标记,过滤访问权限,确保用户只能看到和恢复自己的线程。
线程级安全的实现如下:
from langgraph_sdk import Auth auth = Auth() @auth.authenticate async def authenticate(authorization: str) -> Auth.types.MinimalUserDict: """验证 Bearer token 并返回用户信息。""" token = authorization.split(" ", 1)[1] user = await verify_jwt(token) # 你的 JWT 验证逻辑 return { "identity": user["sub"], "is_authenticated": True, } @auth.on.threads.create async def on_thread_create( ctx: Auth.types.AuthContext, value: Auth.types.on.threads.create.value, ): """为每个新线程标记创建者的身份。 `value` 是线程创建载荷 — 一个包含来自 API 请求的字段的字典: thread_id, metadata, if_exists 等。 我们修改 value["metadata"] 以在存储之前标记所有者。 返回值是 LangGraph 写入线程 metadata 的元数据过滤器。 """ value.setdefault("metadata", {})["owner"] = ctx.user.identity return {"owner": ctx.user.identity} @auth.on.threads.read async def on_thread_read( ctx: Auth.types.AuthContext, value: Auth.types.on.threads.read.value, ): """过滤线程,使用户只能看到自己的。 `value` 是读取请求载荷 — 一个包含 thread_id 和来自 API 请求的任何 metadata 的字典。 返回值不是检查 — 而是查询过滤器。 LangGraph 在存储层应用它:只有 metadata.owner 与 ctx.user.identity 匹配的线程才会被返回。 其他用户拥有的线程是不可见的,而不仅仅是被阻止。 """ return {"owner": ctx.user.identity} @auth.on.threads.create_run async def on_thread_resume( ctx: Auth.types.AuthContext, value: Auth.types.on.threads.create_run.value, ): """过滤用户可以在哪些线程上恢复运行。 `value` 是运行创建载荷 — 一个包含 thread_id, assistant_id, input, command, metadata, config 等的字典。 相同的过滤器机制:LangGraph 仅在线程的 metadata.owner 与返回的过滤器匹配时才允许运行。如果用户尝试恢复 另一个用户的线程,平台会拒绝请求,因为该线程 未通过过滤器。 """ metadata = value.setdefault("metadata", {}) metadata["owner"] = ctx.user.identity return {"owner": ctx.user.identity}
在 langgraph.json 中注册:
{ "auth": { "path": "src/security/auth.py:auth" } }
配置完成后,即使攻击者拿到了其他用户的 thread ID 也无法读取线程状态或恢复运行——授权处理程序会因为 owner metadata 不匹配而拒绝请求。过滤发生在平台层,不在图代码中,无法通过构造 API 请求绕过。
生产部署时 LangGraph Platform 可以对接 Auth0、Supabase 以及任何 OAuth2/JWT 认证体系。记住一点:thread ID 是标识符,不是密钥——安全保障来自认证层对访问权限的把控。
前面基于控制台 input() 的版本已经是这一模式的可运行原型。迁移到 interrupt 只需改动节点函数和调用模式,架构其余部分——远程图、fan-out/fan-in、错误处理——全部保持原样。
RemoteGraph 的工作原理langgraph.pregel.remote 中的 RemoteGraph 类是整个组合能力的底层支撑。它实现的接口和本地编译的图完全一致,可以 .invoke()、.stream(),也可以直接嵌入为另一个图的子图节点。通信通过 HTTP对接 LangGraph Server API。
from langgraph.pregel.remote import RemoteGraph remote = RemoteGraph( "math_service", # 来自 langgraph.json 的 assistant ID url="http://localhost:2024", ) # 像使用普通图一样使用它 — action 字段控制行为 result = remote.invoke({"action": "pick_operation", "operation": "", "number": 0})
RemoteGraph 遵循 Runnable 接口,可以直接作为节点添加到另一个图中:
builder.add_node("my_remote_node", remote_graph)
编排器图不需要了解远程图的内部实现细节——它可以是一个简单状态机、一个 LLM 驱动的 agent,或者任何介于两者之间的东西。而通过条件路由,一个远程图部署就能承载多种操作。
第二个调用依赖于第一个调用的结果时选顺序执行,或者想在最后一轮 end 时省掉无用的远程调用也该选顺序。两个调用互相独立、想缩短总耗时就选并行——在生产环境中远程图调用可能涉及 LLM 推理或数据库查询,并行执行差不多能把每轮延迟砍掉一半。
错误处理并行执行带来一个自然的问题:远程图调用失败了怎么办?math_service 临时挂掉\网络请求超时,这些情况都需要考虑。LangGraph 的处理提供了多种应对策略。
默认行为:原子 superstepLangGraph 中并行节点在一个 superstep 里共同执行。superstep 中任何一个节点抛出异常,整个 superstep 原子性失败,不会有部分状态写入。假如 get_random_number 成功、get_operation 失败,两边的结果都不会写入状态,避免了有随机数却没运算符这种不一致。
配了 checkpointer 的情况下 LangGraph 会在内部保存成功节点的结果。恢复执行时只有失败分支重试,成功分支的工作不必重复。
策略 1:RetryPolicy(图原生重试)最干净的做法是对容易出错的节点附加 RetryPolicy。LangGraph 接管重试循环,支持配置尝试次数、退避策略和抖动。只有失败分支重试,成功的并行节点不会重新执行。
重试全部耗尽后异常向上传播,图调用失败。对网络超时、5xx 错误这类瞬态故障,这是恰当的处理方式。
策略 2:节点内 Try/Except(降级处理)需要图在远程服务不可用时仍然继续运行的场景下,在节点内部捕获异常并返回降级值即可。操作调用失败则引擎停止循环;数字生成调用失败则用安全默认值代替。
策略 3:两者结合(生产环境推荐)生产中通常既要重试也要降级。RetryPolicy 透明处理瞬态故障,重试全部耗尽后异常才落到节点内部的 try/except 块,由它提供兜底逻辑。
以下是三种策略的完整可运行示例。
总结RemoteGraph 让分布式 agent 架构的组合变得相当简洁——顺序还是并行的连接方式随意切换,RetryPolicy 加上节点内降级逻辑构成两层容错。单个远程图通过条件路由就能承载多种操作,基础设施不必铺得很大,编排逻辑也能保持清晰。
这个数学引擎作为 demo 虽然简单,展示的模式却可以直接迁移到正式系统——微服务化的 AI 编排,每个图作为一个独立部署的 agent 逻辑单元,根据延迟和成本需求选择合适的执行策略。
本文完整代码:
https://avoid.overfit.cn/post/d9102c5bf109459494a5bf2b99560b18
by Alexander Machekhin