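Time travel

Review

We discussed the motivations for human-in-the-loop:

(1) Approval - We can interrupt the agent, surface its state to a user, and allow the user to approve an action

(2) Debugging - We can rewind the graph to reproduce or avoid issues

(3) Editing - We can modify the state directly

We showed how breakpoints can stop the graph at specific nodes, or let the graph interrupt itself dynamically.

We then showed how to proceed with human approval, or edit the graph state directly with human feedback.

Goals

Now, let's show how LangGraph supports debugging by viewing, replaying, and even forking from past states.

We call this time travel.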

In [ ]:
%%capture --no-stderr
%pip install --quiet -U langgraph langchain_openai langgraph_sdk langgraph-prebuilt
In [ ]:
import os, getpass

def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")

_set_env("OPENAI_API_KEY")

Let's build our agent.

In [1]:
from langchain_openai import ChatOpenAI

def multiply(a: int, b: int) -> int:
    """Multiply a and b.

    Args:
        a: first int
        b: second int
    """
    return a * b

# This will be a tool
def add(a: int, b: int) -> int:
    """Adds a and b.

    Args:
        a: first int
        b: second int
    """
    return a + b

def divide(a: int, b: int) -> float:
    """Divide a by b.

    Args:
        a: first int
        b: second int
    """
    return a / b

tools = [add, multiply, divide]
llm = ChatOpenAI(model="gpt-4o")
llm_with_tools = llm.bind_tools(tools)
In [2]:
from IPython.display import Image, display

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import MessagesState
from langgraph.graph import START, END, StateGraph
from langgraph.prebuilt import tools_condition, ToolNode

from langchain_core.messages import AIMessage, HumanMessage, SystemMessage

# System message
sys_msg = SystemMessage(content="You are a helpful assistant tasked with performing arithmetic on a set of inputs.")

# Node
def assistant(state: MessagesState):
   return {"messages": [llm_with_tools.invoke([sys_msg] + state["messages"])]}

# Graph
builder = StateGraph(MessagesState)

# Define nodes: these do the work
builder.add_node("assistant", assistant)
builder.add_node("tools", ToolNode(tools))

# Define edges: these determine the control flow
builder.add_edge(START, "assistant")
builder.add_conditional_edges(
    "assistant",
    # If the latest message (result) from assistant is a tool call -> tools_condition routes to tools
    # If the latest message (result) from assistant is not a tool call -> tools_condition routes to END
    tools_condition,
)
builder.add_edge("tools", "assistant")

memory = MemorySaver()
graph = builder.compile(checkpointer=memory)

# Show
display(Image(graph.get_graph(xray=True).draw_mermaid_png()))
[Figure: rendered diagram of the compiled graph]
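
The comments on the conditional edge describe a simple routing rule: go to the tools node when the latest assistant message contains a tool call, otherwise finish. As a rough sketch of that rule only (not the library's implementation), it can be modeled with plain dicts. The function name `route_after_assistant` and the `END` value below are hypothetical names chosen for illustration.

```python
# Toy stand-in for the routing rule described in the graph comments.
# Messages are plain dicts here; the real graph uses LangChain message objects.
END = "__end__"  # assumed sentinel value, for illustration only


def route_after_assistant(state: dict) -> str:
    """Return the name of the next node based on the latest message."""
    last_message = state["messages"][-1]
    if last_message.get("tool_calls"):
        return "tools"
    return END


with_tool_call = {
    "messages": [
        {"type": "ai", "content": "",
         "tool_calls": [{"name": "multiply", "args": {"a": 2, "b": 3}}]}
    ]
}
final_answer = {
    "messages": [{"type": "ai", "content": "The result is 6.", "tool_calls": []}]
}

print(route_after_assistant(with_tool_call))
print(route_after_assistant(final_answer))
```

This is why the run alternates between the assistant and tools nodes until the assistant replies without a tool call.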

Let's run it, as before.

In [3]:
# Input
initial_input = {"messages": HumanMessage(content="Multiply 2 and 3")}

# Thread
thread = {"configurable": {"thread_id": "1"}}

# Run the graph to completion (no breakpoints are set on this graph)
for event in graph.stream(initial_input, thread, stream_mode="values"):
    event['messages'][-1].pretty_print()
================================ Human Message =================================

Multiply 2 and 3
================================== Ai Message ==================================
Tool Calls:
  multiply (call_ikJxMpb777bKMYgmM3d9mYjW)
 Call ID: call_ikJxMpb777bKMYgmM3d9mYjW
  Args:
    a: 2
    b: 3
================================= Tool Message =================================
Name: multiply

6
================================== Ai Message ==================================

The result of multiplying 2 and 3 is 6.

Browsing history

We can use get_state to look at the current state of our graph, given the thread_id!

In [4]:
graph.get_state({'configurable': {'thread_id': '1'}})
Out[4]:
StateSnapshot(values={'messages': [HumanMessage(content='Multiply 2 and 3', id='4ee8c440-0e4a-47d7-852f-06e2a6c4f84d'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_ikJxMpb777bKMYgmM3d9mYjW', 'function': {'arguments': '{"a":2,"b":3}', 'name': 'multiply'}, 'type': 'function'}], 'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 17, 'prompt_tokens': 131, 'total_tokens': 148}, 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_157b3831f5', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-bc24d334-8013-4f85-826f-e1ed69c86df0-0', tool_calls=[{'name': 'multiply', 'args': {'a': 2, 'b': 3}, 'id': 'call_ikJxMpb777bKMYgmM3d9mYjW', 'type': 'tool_call'}], usage_metadata={'input_tokens': 131, 'output_tokens': 17, 'total_tokens': 148}), ToolMessage(content='6', name='multiply', id='1012611a-30c5-4732-b789-8c455580c7b4', tool_call_id='call_ikJxMpb777bKMYgmM3d9mYjW'), AIMessage(content='The result of multiplying 2 and 3 is 6.', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 14, 'prompt_tokens': 156, 'total_tokens': 170}, 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_157b3831f5', 'finish_reason': 'stop', 'logprobs': None}, id='run-b46f3fed-ca3b-4e09-83f4-77ea5071e9bf-0', usage_metadata={'input_tokens': 156, 'output_tokens': 14, 'total_tokens': 170})]}, next=(), config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef6a440-ac9e-6024-8003-6fd8435c1d3b'}}, metadata={'source': 'loop', 'writes': {'assistant': {'messages': [AIMessage(content='The result of multiplying 2 and 3 is 6.', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 14, 'prompt_tokens': 156, 'total_tokens': 170}, 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_157b3831f5', 'finish_reason': 'stop', 'logprobs': None}, id='run-b46f3fed-ca3b-4e09-83f4-77ea5071e9bf-0', usage_metadata={'input_tokens': 156, 
'output_tokens': 14, 'total_tokens': 170})]}}, 'step': 3, 'parents': {}}, created_at='2024-09-03T22:29:54.309727+00:00', parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef6a440-a759-6d02-8002-f1da6393e1ab'}}, tasks=())

We can also browse the state history of our agent.

get_state_history lets us get the state at all prior steps.

In [5]:
all_states = [s for s in graph.get_state_history(thread)]
In [6]:
len(all_states)
Out[6]:
5

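The first element is the current state, just as we got from get_state. The history is ordered newest first, so all_states[-2] is the early checkpoint that holds only the human input.

In [7]: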
all_states[-2]
Out[7]:
StateSnapshot(values={'messages': [HumanMessage(content='Multiply 2 and 3', id='4ee8c440-0e4a-47d7-852f-06e2a6c4f84d')]}, next=('assistant',), config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef6a440-a003-6c74-8000-8a2d82b0d126'}}, metadata={'source': 'loop', 'writes': None, 'step': 0, 'parents': {}}, created_at='2024-09-03T22:29:52.988265+00:00', parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef6a440-9ffe-6512-bfff-9e6d8dc24bba'}}, tasks=(PregelTask(id='ca669906-0c4f-5165-840d-7a6a3fce9fb9', name='assistant', error=None, interrupts=(), state=None),))
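
To make the ordering concrete, here is a small sketch with a toy history shaped like this run: five entries, newest first. The dict fields and the helper `snapshot_at_step` are hypothetical, and the entry at step -1 (the raw input checkpoint) is an assumption about what the fifth snapshot holds; real entries are StateSnapshot objects.

```python
# Toy history, newest first, mirroring the five snapshots of the run above.
# Field names are simplified; the step -1 entry is an assumption.
toy_history = [
    {"step": 3, "next": (), "num_messages": 4},
    {"step": 2, "next": ("assistant",), "num_messages": 3},
    {"step": 1, "next": ("tools",), "num_messages": 2},
    {"step": 0, "next": ("assistant",), "num_messages": 1},
    {"step": -1, "next": ("__start__",), "num_messages": 0},
]


def snapshot_at_step(history, step):
    """Look a snapshot up by its step number instead of by list position."""
    for snapshot in history:
        if snapshot["step"] == step:
            return snapshot
    raise KeyError(f"no snapshot recorded for step {step}")


latest = toy_history[0]        # same state that get_state would return
human_input = toy_history[-2]  # checkpoint holding only the human message

print(latest["step"], latest["next"])
print(human_input is snapshot_at_step(toy_history, 0))
```

Looking a snapshot up by step is a little more robust than a fixed index such as -2 if the run grows longer.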

We can visualize all of this as follows:

fig1.jpg

Replaying

We can re-run our agent from any of the prior steps.

fig2.jpg

Let's look back at the step that received human input!

In [8]:
to_replay = all_states[-2]
In [9]:
to_replay
Out[9]:
StateSnapshot(values={'messages': [HumanMessage(content='Multiply 2 and 3', id='4ee8c440-0e4a-47d7-852f-06e2a6c4f84d')]}, next=('assistant',), config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef6a440-a003-6c74-8000-8a2d82b0d126'}}, metadata={'source': 'loop', 'writes': None, 'step': 0, 'parents': {}}, created_at='2024-09-03T22:29:52.988265+00:00', parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef6a440-9ffe-6512-bfff-9e6d8dc24bba'}}, tasks=(PregelTask(id='ca669906-0c4f-5165-840d-7a6a3fce9fb9', name='assistant', error=None, interrupts=(), state=None),))

Look at the state.

In [10]:
to_replay.values
Out[10]:
{'messages': [HumanMessage(content='Multiply 2 and 3', id='4ee8c440-0e4a-47d7-852f-06e2a6c4f84d')]}

We can see the next node to call.

In [11]:
to_replay.next
Out[11]:
('assistant',)

We also get the config, which tells us the checkpoint_id as well as the thread_id.

In [12]:
to_replay.config
Out[12]:
{'configurable': {'thread_id': '1',
  'checkpoint_ns': '',
  'checkpoint_id': '1ef6a440-a003-6c74-8000-8a2d82b0d126'}}
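
The config is an ordinary nested dict, so it can also be assembled by hand. The helper below is a hypothetical convenience (it is not part of LangGraph) that reproduces the shape shown in the output above; leaving out the checkpoint ID yields the thread-only form used with get_state earlier.

```python
def checkpoint_config(thread_id, checkpoint_id=None, checkpoint_ns=""):
    """Build a config dict shaped like the snapshot config shown above.

    Hypothetical helper for illustration; without a checkpoint_id the
    config only names the thread.
    """
    configurable = {"thread_id": thread_id}
    if checkpoint_id is not None:
        configurable["checkpoint_ns"] = checkpoint_ns
        configurable["checkpoint_id"] = checkpoint_id
    return {"configurable": configurable}


print(checkpoint_config("1"))
print(checkpoint_config("1", "1ef6a440-a003-6c74-8000-8a2d82b0d126"))
```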

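To replay from here, we simply pass the config back to the agent!

The graph knows that this checkpoint has already been executed.

It just replays from this checkpoint!

In [13]: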
for event in graph.stream(None, to_replay.config, stream_mode="values"):
    event['messages'][-1].pretty_print()
================================ Human Message =================================

Multiply 2 and 3
================================== Ai Message ==================================
Tool Calls:
  multiply (call_SABfB57CnDkMu9HJeUE0mvJ9)
 Call ID: call_SABfB57CnDkMu9HJeUE0mvJ9
  Args:
    a: 2
    b: 3
================================= Tool Message =================================
Name: multiply

6
================================== Ai Message ==================================

The result of multiplying 2 and 3 is 6.

Now, we can see our current state after the agent re-ran.
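
The idea of replaying can be sketched with a toy model: keep a checkpoint after every step, then resume from any saved checkpoint and run only the steps that come after it. Everything below (the three stand-in node functions, `PIPELINE`, `run_from`, and the oldest-first checkpoint list) is hypothetical and deterministic; it illustrates the concept and is not how LangGraph stores or executes checkpoints.

```python
# Hypothetical three-step pipeline standing in for assistant -> tools -> assistant.
def call_model(messages):
    return messages + ["ai: call multiply(2, 3)"]


def run_tool(messages):
    return messages + ["tool: 6"]


def answer(messages):
    return messages + ["ai: The result is 6."]


PIPELINE = [call_model, run_tool, answer]


def run_from(checkpoints, index):
    """Run every pipeline step after checkpoints[index], saving new checkpoints.

    Each checkpoint is (steps_already_applied, messages), stored oldest first.
    """
    step, messages = checkpoints[index]
    messages = list(messages)
    for node in PIPELINE[step:]:
        messages = node(messages)
        step += 1
        checkpoints.append((step, messages))
    return messages


checkpoints = [(0, ["human: Multiply 2 and 3"])]
first_run = run_from(checkpoints, 0)  # original run
replayed = run_from(checkpoints, 0)   # resume from the human-input checkpoint

print(first_run == replayed)
print(len(checkpoints))
```

In this toy model the earlier checkpoints are never modified; resuming only adds new ones after the chosen point.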

Forking

What if we want to run from that same step, but with a different input?

This is forking.

fig3.jpg

In [14]:
to_fork = all_states[-2]
to_fork.values["messages"]
Out[14]:
[HumanMessage(content='Multiply 2 and 3', id='4ee8c440-0e4a-47d7-852f-06e2a6c4f84d')]

Again, we have the config.

In [15]:
to_fork.config
Out[15]:
{'configurable': {'thread_id': '1',
  'checkpoint_ns': '',
  'checkpoint_id': '1ef6a440-a003-6c74-8000-8a2d82b0d126'}}

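Let's modify the state at this checkpoint.

We can just run update_state with the checkpoint_id supplied.

Remember how our reducer on messages works:

  • It appends messages, unless we supply a message ID
  • We supply the message ID to overwrite the message, rather than appending to state!

So, to overwrite the message, we just supply the message ID, which we have in to_fork.values["messages"][0].id.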
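
The append-or-overwrite rule can be sketched in a few lines of plain Python. This is a simplified stand-in written for illustration, with messages as dicts and a hypothetical function name `merge_messages`; it is not the library's reducer.

```python
def merge_messages(existing, updates):
    """Append each update, unless its ID matches an existing message,
    in which case the existing message is replaced in place."""
    merged = list(existing)
    index_by_id = {
        m["id"]: i for i, m in enumerate(merged) if m.get("id") is not None
    }
    for message in updates:
        message_id = message.get("id")
        if message_id is not None and message_id in index_by_id:
            merged[index_by_id[message_id]] = message
        else:
            merged.append(message)
            if message_id is not None:
                index_by_id[message_id] = len(merged) - 1
    return merged


state = [{"id": "m1", "content": "Multiply 2 and 3"}]

# Same ID: the message is overwritten, and the list stays one element long.
overwritten = merge_messages(state, [{"id": "m1", "content": "Multiply 5 and 3"}])

# New ID: the message is appended.
appended = merge_messages(state, [{"id": "m2", "content": "Now add 4"}])

print(overwritten)
print(len(appended))
```

Reusing the original ID is what turns the update into an edit of the human message rather than a second message.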

In [16]:
fork_config = graph.update_state(
    to_fork.config,
    {"messages": [HumanMessage(content='Multiply 5 and 3', 
                               id=to_fork.values["messages"][0].id)]},
)
In [17]:
fork_config
Out[17]:
{'configurable': {'thread_id': '1',
  'checkpoint_ns': '',
  'checkpoint_id': '1ef6a442-3661-62f6-8001-d3c01b96f98b'}}

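This creates a new, forked checkpoint.

But the metadata (e.g., where to go next) is preserved!

We can see that the current state of our agent has been updated with our fork.

In [18]: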
all_states = [state for state in graph.get_state_history(thread) ]
all_states[0].values["messages"]
Out[18]:
[HumanMessage(content='Multiply 5 and 3', id='4ee8c440-0e4a-47d7-852f-06e2a6c4f84d')]
In [19]:
graph.get_state({'configurable': {'thread_id': '1'}})
Out[19]:
StateSnapshot(values={'messages': [HumanMessage(content='Multiply 5 and 3', id='4ee8c440-0e4a-47d7-852f-06e2a6c4f84d')]}, next=('assistant',), config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef6a442-3661-62f6-8001-d3c01b96f98b'}}, metadata={'source': 'update', 'step': 1, 'writes': {'__start__': {'messages': [HumanMessage(content='Multiply 5 and 3', id='4ee8c440-0e4a-47d7-852f-06e2a6c4f84d')]}}, 'parents': {}}, created_at='2024-09-03T22:30:35.598707+00:00', parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef6a440-a003-6c74-8000-8a2d82b0d126'}}, tasks=(PregelTask(id='f8990132-a8d3-5ddd-8d9e-1efbfc220da1', name='assistant', error=None, interrupts=(), state=None),))
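
The snapshot above shows the pattern: the fork is a new checkpoint whose parent is the checkpoint we edited, whose source is 'update', and whose next node is unchanged. A toy model of that bookkeeping, with a hypothetical dict-based store and made-up checkpoint IDs, might look like this:

```python
import itertools

_fork_ids = itertools.count(1)  # hypothetical ID generator for the toy store


def fork_checkpoint(store, checkpoint_id, new_values):
    """Add a new checkpoint that branches off an existing one.

    The original checkpoint is left untouched; the fork keeps its `next`
    and records the original as its parent.
    """
    parent = store[checkpoint_id]
    new_id = f"fork-{next(_fork_ids)}"
    store[new_id] = {
        "values": new_values,
        "next": parent["next"],
        "parent": checkpoint_id,
        "source": "update",
    }
    return new_id


store = {
    "cp-0": {
        "values": {"messages": ["Multiply 2 and 3"]},
        "next": ("assistant",),
        "parent": "cp-input",
        "source": "loop",
    }
}
fork_id = fork_checkpoint(store, "cp-0", {"messages": ["Multiply 5 and 3"]})

print(store[fork_id]["next"], store[fork_id]["parent"])
print(store["cp-0"]["values"])
```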

Now, when we stream, the graph knows that this checkpoint has never been executed.

So the graph runs, rather than simply replaying.

In [20]:
for event in graph.stream(None, fork_config, stream_mode="values"):
    event['messages'][-1].pretty_print()
================================ Human Message =================================

Multiply 5 and 3
================================== Ai Message ==================================
Tool Calls:
  multiply (call_KP2CVNMMUKMJAQuFmamHB21r)
 Call ID: call_KP2CVNMMUKMJAQuFmamHB21r
  Args:
    a: 5
    b: 3
================================= Tool Message =================================
Name: multiply

15
================================== Ai Message ==================================

The result of multiplying 5 and 3 is 15.

Now, we can see that the current state is the end of our agent run.

In [21]:
graph.get_state({'configurable': {'thread_id': '1'}})
Out[21]:
StateSnapshot(values={'messages': [HumanMessage(content='Multiply 5 and 3', id='4ee8c440-0e4a-47d7-852f-06e2a6c4f84d'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_KP2CVNMMUKMJAQuFmamHB21r', 'function': {'arguments': '{"a":5,"b":3}', 'name': 'multiply'}, 'type': 'function'}], 'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 17, 'prompt_tokens': 131, 'total_tokens': 148}, 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_157b3831f5', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-bc420009-d1f6-49b8-bea7-dfc9fca7eb79-0', tool_calls=[{'name': 'multiply', 'args': {'a': 5, 'b': 3}, 'id': 'call_KP2CVNMMUKMJAQuFmamHB21r', 'type': 'tool_call'}], usage_metadata={'input_tokens': 131, 'output_tokens': 17, 'total_tokens': 148}), ToolMessage(content='15', name='multiply', id='9232e653-816d-471a-9002-9a1ecd453364', tool_call_id='call_KP2CVNMMUKMJAQuFmamHB21r'), AIMessage(content='The result of multiplying 5 and 3 is 15.', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 14, 'prompt_tokens': 156, 'total_tokens': 170}, 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_157b3831f5', 'finish_reason': 'stop', 'logprobs': None}, id='run-86c21888-d832-47c5-9e76-0aa2676116dc-0', usage_metadata={'input_tokens': 156, 'output_tokens': 14, 'total_tokens': 170})]}, next=(), config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef6a442-a2e2-6e98-8004-4a0b75537950'}}, metadata={'source': 'loop', 'writes': {'assistant': {'messages': [AIMessage(content='The result of multiplying 5 and 3 is 15.', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 14, 'prompt_tokens': 156, 'total_tokens': 170}, 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_157b3831f5', 'finish_reason': 'stop', 'logprobs': None}, id='run-86c21888-d832-47c5-9e76-0aa2676116dc-0', usage_metadata={'input_tokens': 156, 
'output_tokens': 14, 'total_tokens': 170})]}}, 'step': 4, 'parents': {}}, created_at='2024-09-03T22:30:46.976463+00:00', parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef6a442-9db0-6056-8003-7304cab7bed8'}}, tasks=())

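Time travel with LangGraph API

⚠️ Disclaimer

Since these videos were filmed, we have updated Studio so that it can run locally and open in your browser. This is now the preferred way to run Studio (rather than the desktop app shown in the video). See the documentation on the local development server and on running Studio locally. To start the local development server, run the following command in your terminal from the /studio directory of this module:

langgraph dev

You should see the following output:

- 🚀 API: http://127.0.0.1:2024
- 🎨 Studio UI: https://smith.langchain.com/studio/?baseUrl=http://127.0.0.1:2024
- 📚 API Docs: http://127.0.0.1:2024/docs

Open your browser and navigate to the Studio UI: https://smith.langchain.com/studio/?baseUrl=http://127.0.0.1:2024.

We connect to it via the SDK and show how the LangGraph API supports time travel.

In [ ]: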
if 'google.colab' in str(get_ipython()):
    raise Exception("Unfortunately LangGraph Studio is currently not supported on Google Colab")
In [22]:
from langgraph_sdk import get_client
client = get_client(url="http://127.0.0.1:2024")

Replaying

Let's run our agent, streaming updates to the state of the graph after each node is called.

In [23]:
initial_input = {"messages": HumanMessage(content="Multiply 2 and 3")}
thread = await client.threads.create()
async for chunk in client.runs.stream(
    thread["thread_id"],
    assistant_id = "agent",
    input=initial_input,
    stream_mode="updates",
):
    if chunk.data:
        assistant_node = chunk.data.get('assistant', {}).get('messages', [])
        tool_node = chunk.data.get('tools', {}).get('messages', [])
        if assistant_node:
            print("-" * 20+"Assistant Node"+"-" * 20)
            print(assistant_node[-1])
        elif tool_node:
            print("-" * 20+"Tools Node"+"-" * 20)
            print(tool_node[-1])
--------------------Assistant Node--------------------
{'content': '', 'additional_kwargs': {'tool_calls': [{'index': 0, 'id': 'call_SG7XYqDENGq7mwXrnioNLosS', 'function': {'arguments': '{"a":2,"b":3}', 'name': 'multiply'}, 'type': 'function'}]}, 'response_metadata': {'finish_reason': 'tool_calls', 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_157b3831f5'}, 'type': 'ai', 'name': None, 'id': 'run-2c120fc3-3c82-4599-b8ec-24fbee207cad', 'example': False, 'tool_calls': [{'name': 'multiply', 'args': {'a': 2, 'b': 3}, 'id': 'call_SG7XYqDENGq7mwXrnioNLosS', 'type': 'tool_call'}], 'invalid_tool_calls': [], 'usage_metadata': None}
--------------------Tools Node--------------------
{'content': '6', 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'tool', 'name': 'multiply', 'id': '3b40d091-58b2-4566-a84c-60af67206307', 'tool_call_id': 'call_SG7XYqDENGq7mwXrnioNLosS', 'artifact': None, 'status': 'success'}
--------------------Assistant Node--------------------
{'content': 'The result of multiplying 2 and 3 is 6.', 'additional_kwargs': {}, 'response_metadata': {'finish_reason': 'stop', 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_fde2829a40'}, 'type': 'ai', 'name': None, 'id': 'run-1272d9b0-a0aa-4ff7-8bad-fdffd27c5506', 'example': False, 'tool_calls': [], 'invalid_tool_calls': [], 'usage_metadata': None}

Now, let's look at replaying from a specified checkpoint.

We simply need to pass the checkpoint_id.

In [24]:
states = await client.threads.get_history(thread['thread_id'])
to_replay = states[-2]
to_replay
Out[24]:
{'values': {'messages': [{'content': 'Multiply 2 and 3',
    'additional_kwargs': {'example': False,
     'additional_kwargs': {},
     'response_metadata': {}},
    'response_metadata': {},
    'type': 'human',
    'name': None,
    'id': 'df98147a-cb3d-4f1a-b7f7-1545c4b6f042',
    'example': False}]},
 'next': ['assistant'],
 'tasks': [{'id': 'e497456f-827a-5027-87bd-b0ccd54aa89a',
   'name': 'assistant',
   'error': None,
   'interrupts': [],
   'state': None}],
 'metadata': {'step': 0,
  'run_id': '1ef6a449-7fbc-6c90-8754-4e6b1b582790',
  'source': 'loop',
  'writes': None,
  'parents': {},
  'user_id': '',
  'graph_id': 'agent',
  'thread_id': '708e1d8f-f7c8-4093-9bb4-999c4237cb4a',
  'created_by': 'system',
  'assistant_id': 'fe096781-5601-53d2-b2f6-0d3403f7e9ca'},
 'created_at': '2024-09-03T22:33:51.380352+00:00',
 'checkpoint_id': '1ef6a449-817f-6b55-8000-07c18fbdf7c8',
 'parent_checkpoint_id': '1ef6a449-816c-6fd6-bfff-32a56dd2635f'}
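
The history comes back as a list of plain dicts, so the checkpoint can also be picked by its contents rather than by position. The sketch below uses only the fields visible in the output above ('next', 'metadata' with 'step', and 'checkpoint_id'); the helper `find_checkpoint`, the toy data, and the 'cp-...' IDs are hypothetical.

```python
def find_checkpoint(states, next_node, step):
    """Return the first state that is about to run `next_node` at `step`,
    or None if no such state exists."""
    for state in states:
        if next_node in state["next"] and state["metadata"]["step"] == step:
            return state
    return None


# Toy history, newest first, reduced to the fields used by the helper.
toy_states = [
    {"checkpoint_id": "cp-3", "next": [], "metadata": {"step": 3}},
    {"checkpoint_id": "cp-2", "next": ["assistant"], "metadata": {"step": 2}},
    {"checkpoint_id": "cp-1", "next": ["tools"], "metadata": {"step": 1}},
    {"checkpoint_id": "cp-0", "next": ["assistant"], "metadata": {"step": 0}},
    {"checkpoint_id": "cp-input", "next": ["__start__"], "metadata": {"step": -1}},
]

target = find_checkpoint(toy_states, "assistant", step=0)
print(target["checkpoint_id"])
```

Matching on the step as well as the next node matters here, because the assistant node is also next after the tool call at step 2.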

Let's stream with stream_mode="values" to see the full state at every node as we replay.

In [25]:
async for chunk in client.runs.stream(
    thread["thread_id"],
    assistant_id="agent",
    input=None,
    stream_mode="values",
    checkpoint_id=to_replay['checkpoint_id']
):      
    print(f"Receiving new event of type: {chunk.event}...")
    print(chunk.data)
    print("\n\n")
Receiving new event of type: metadata...
{'run_id': '1ef6a44a-5806-6bb1-b2ee-92ecfda7f67d'}



Receiving new event of type: values...
{'messages': [{'content': 'Multiply 2 and 3', 'additional_kwargs': {'example': False, 'additional_kwargs': {}, 'response_metadata': {}}, 'response_metadata': {}, 'type': 'human', 'name': None, 'id': 'df98147a-cb3d-4f1a-b7f7-1545c4b6f042', 'example': False}]}



Receiving new event of type: values...
{'messages': [{'content': 'Multiply 2 and 3', 'additional_kwargs': {'example': False, 'additional_kwargs': {}, 'response_metadata': {}}, 'response_metadata': {}, 'type': 'human', 'name': None, 'id': 'df98147a-cb3d-4f1a-b7f7-1545c4b6f042', 'example': False}, {'content': '', 'additional_kwargs': {'tool_calls': [{'index': 0, 'id': 'call_Rn9YQ6iZyYtzrELBz7EfQcs0', 'function': {'arguments': '{"a":2,"b":3}', 'name': 'multiply'}, 'type': 'function'}]}, 'response_metadata': {'finish_reason': 'tool_calls', 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_157b3831f5'}, 'type': 'ai', 'name': None, 'id': 'run-e60d82d7-7743-4f13-bebd-3616a88720a9', 'example': False, 'tool_calls': [{'name': 'multiply', 'args': {'a': 2, 'b': 3}, 'id': 'call_Rn9YQ6iZyYtzrELBz7EfQcs0', 'type': 'tool_call'}], 'invalid_tool_calls': [], 'usage_metadata': None}]}



Receiving new event of type: values...
{'messages': [{'content': 'Multiply 2 and 3', 'additional_kwargs': {'example': False, 'additional_kwargs': {}, 'response_metadata': {}}, 'response_metadata': {}, 'type': 'human', 'name': None, 'id': 'df98147a-cb3d-4f1a-b7f7-1545c4b6f042', 'example': False}, {'content': '', 'additional_kwargs': {'tool_calls': [{'index': 0, 'id': 'call_Rn9YQ6iZyYtzrELBz7EfQcs0', 'function': {'arguments': '{"a":2,"b":3}', 'name': 'multiply'}, 'type': 'function'}]}, 'response_metadata': {'finish_reason': 'tool_calls', 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_157b3831f5'}, 'type': 'ai', 'name': None, 'id': 'run-e60d82d7-7743-4f13-bebd-3616a88720a9', 'example': False, 'tool_calls': [{'name': 'multiply', 'args': {'a': 2, 'b': 3}, 'id': 'call_Rn9YQ6iZyYtzrELBz7EfQcs0', 'type': 'tool_call'}], 'invalid_tool_calls': [], 'usage_metadata': None}, {'content': '6', 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'tool', 'name': 'multiply', 'id': 'f1be0b83-4565-4aa2-9b9a-cd8874c6a2bc', 'tool_call_id': 'call_Rn9YQ6iZyYtzrELBz7EfQcs0', 'artifact': None, 'status': 'success'}]}



Receiving new event of type: values...
{'messages': [{'content': 'Multiply 2 and 3', 'additional_kwargs': {'example': False, 'additional_kwargs': {}, 'response_metadata': {}}, 'response_metadata': {}, 'type': 'human', 'name': None, 'id': 'df98147a-cb3d-4f1a-b7f7-1545c4b6f042', 'example': False}, {'content': '', 'additional_kwargs': {'tool_calls': [{'index': 0, 'id': 'call_Rn9YQ6iZyYtzrELBz7EfQcs0', 'function': {'arguments': '{"a":2,"b":3}', 'name': 'multiply'}, 'type': 'function'}]}, 'response_metadata': {'finish_reason': 'tool_calls', 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_157b3831f5'}, 'type': 'ai', 'name': None, 'id': 'run-e60d82d7-7743-4f13-bebd-3616a88720a9', 'example': False, 'tool_calls': [{'name': 'multiply', 'args': {'a': 2, 'b': 3}, 'id': 'call_Rn9YQ6iZyYtzrELBz7EfQcs0', 'type': 'tool_call'}], 'invalid_tool_calls': [], 'usage_metadata': None}, {'content': '6', 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'tool', 'name': 'multiply', 'id': 'f1be0b83-4565-4aa2-9b9a-cd8874c6a2bc', 'tool_call_id': 'call_Rn9YQ6iZyYtzrELBz7EfQcs0', 'artifact': None, 'status': 'success'}, {'content': 'The result of multiplying 2 and 3 is 6.', 'additional_kwargs': {}, 'response_metadata': {'finish_reason': 'stop', 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_157b3831f5'}, 'type': 'ai', 'name': None, 'id': 'run-55e5847a-d542-4977-84d7-24852e78b0a9', 'example': False, 'tool_calls': [], 'invalid_tool_calls': [], 'usage_metadata': None}]}



We can also view this as streaming only the updates to state made by the nodes that we replay.

In [26]:
async for chunk in client.runs.stream(
    thread["thread_id"],
    assistant_id="agent",
    input=None,
    stream_mode="updates",
    checkpoint_id=to_replay['checkpoint_id']
):
    if chunk.data:
        assistant_node = chunk.data.get('assistant', {}).get('messages', [])
        tool_node = chunk.data.get('tools', {}).get('messages', [])
        if assistant_node:
            print("-" * 20+"Assistant Node"+"-" * 20)
            print(assistant_node[-1])
        elif tool_node:
            print("-" * 20+"Tools Node"+"-" * 20)
            print(tool_node[-1])
--------------------Assistant Node--------------------
{'content': '', 'additional_kwargs': {'tool_calls': [{'index': 0, 'id': 'call_I2qudhMCwcw1GzcFN5q80rjj', 'function': {'arguments': '{"a":2,"b":3}', 'name': 'multiply'}, 'type': 'function'}]}, 'response_metadata': {'finish_reason': 'tool_calls', 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_157b3831f5'}, 'type': 'ai', 'name': None, 'id': 'run-550e75ad-dbbc-4e55-9f00-aa896228914c', 'example': False, 'tool_calls': [{'name': 'multiply', 'args': {'a': 2, 'b': 3}, 'id': 'call_I2qudhMCwcw1GzcFN5q80rjj', 'type': 'tool_call'}], 'invalid_tool_calls': [], 'usage_metadata': None}
--------------------Tools Node--------------------
{'content': '6', 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'tool', 'name': 'multiply', 'id': '731b7d4f-780d-4a8b-aec9-0d8b9c58c40a', 'tool_call_id': 'call_I2qudhMCwcw1GzcFN5q80rjj', 'artifact': None, 'status': 'success'}
--------------------Assistant Node--------------------
{'content': 'The result of multiplying 2 and 3 is 6.', 'additional_kwargs': {}, 'response_metadata': {'finish_reason': 'stop', 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_157b3831f5'}, 'type': 'ai', 'name': None, 'id': 'run-2326afa5-eb43-4568-b5ed-424c0a0fa076', 'example': False, 'tool_calls': [], 'invalid_tool_calls': [], 'usage_metadata': None}
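
The difference between the two stream modes can be sketched with a toy generator: "values" emits the full message list, starting with the initial state, while "updates" emits only what each node added, keyed by node name. The generator `toy_stream` and the lambda nodes are hypothetical stand-ins; only the event shapes follow the outputs shown above.

```python
def toy_stream(initial_messages, nodes, stream_mode):
    """Yield events shaped like the two stream modes shown above."""
    if stream_mode not in ("values", "updates"):
        raise ValueError(f"unsupported stream_mode: {stream_mode}")
    messages = list(initial_messages)
    if stream_mode == "values":
        yield {"messages": list(messages)}  # initial state comes first
    for name, node in nodes:
        new_messages = node(messages)
        messages = messages + new_messages
        if stream_mode == "updates":
            yield {name: {"messages": new_messages}}  # only this node's writes
        else:
            yield {"messages": list(messages)}  # full state after the node


nodes = [
    ("assistant", lambda msgs: ["ai: call multiply(2, 3)"]),
    ("tools", lambda msgs: ["tool: 6"]),
    ("assistant", lambda msgs: ["ai: The result is 6."]),
]
start = ["human: Multiply 2 and 3"]

values_events = list(toy_stream(start, nodes, "values"))
updates_events = list(toy_stream(start, nodes, "updates"))

print([len(event["messages"]) for event in values_events])
print([next(iter(event)) for event in updates_events])
```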

Forking

Now, let's look at forking.

Let's get the same step as we worked with above, the human input.

Let's create a new thread with our agent.

In [27]:
initial_input = {"messages": HumanMessage(content="Multiply 2 and 3")}
thread = await client.threads.create()
async for chunk in client.runs.stream(
    thread["thread_id"],
    assistant_id="agent",
    input=initial_input,
    stream_mode="updates",
):
    if chunk.data:
        assistant_node = chunk.data.get('assistant', {}).get('messages', [])
        tool_node = chunk.data.get('tools', {}).get('messages', [])
        if assistant_node:
            print("-" * 20+"Assistant Node"+"-" * 20)
            print(assistant_node[-1])
        elif tool_node:
            print("-" * 20+"Tools Node"+"-" * 20)
            print(tool_node[-1])
--------------------Assistant Node--------------------
{'content': '', 'additional_kwargs': {'tool_calls': [{'index': 0, 'id': 'call_HdWoyLELFZGEcqGxFt2fZzek', 'function': {'arguments': '{"a":2,"b":3}', 'name': 'multiply'}, 'type': 'function'}]}, 'response_metadata': {'finish_reason': 'tool_calls', 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_157b3831f5'}, 'type': 'ai', 'name': None, 'id': 'run-cbd081b1-8cef-4ca8-9dd5-aceb134404dc', 'example': False, 'tool_calls': [{'name': 'multiply', 'args': {'a': 2, 'b': 3}, 'id': 'call_HdWoyLELFZGEcqGxFt2fZzek', 'type': 'tool_call'}], 'invalid_tool_calls': [], 'usage_metadata': None}
--------------------Tools Node--------------------
{'content': '6', 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'tool', 'name': 'multiply', 'id': '11dd4a7f-0b6b-44da-b9a4-65f1677c8813', 'tool_call_id': 'call_HdWoyLELFZGEcqGxFt2fZzek', 'artifact': None, 'status': 'success'}
--------------------Assistant Node--------------------
{'content': 'The result of multiplying 2 and 3 is 6.', 'additional_kwargs': {}, 'response_metadata': {'finish_reason': 'stop', 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_157b3831f5'}, 'type': 'ai', 'name': None, 'id': 'run-936cf990-9302-45c7-9051-6ff1e2e9f316', 'example': False, 'tool_calls': [], 'invalid_tool_calls': [], 'usage_metadata': None}
In [28]:
states = await client.threads.get_history(thread['thread_id'])
to_fork = states[-2]
to_fork['values']
Out[28]:
{'messages': [{'content': 'Multiply 2 and 3',
   'additional_kwargs': {'example': False,
    'additional_kwargs': {},
    'response_metadata': {}},
   'response_metadata': {},
   'type': 'human',
   'name': None,
   'id': '93c18b95-9050-4a52-99b8-9374e98ee5db',
   'example': False}]}
In [29]:
to_fork['values']['messages'][0]['id']
Out[29]:
'93c18b95-9050-4a52-99b8-9374e98ee5db'
In [30]:
to_fork['next']
Out[30]:
['assistant']
In [31]:
to_fork['checkpoint_id']
Out[31]:
'1ef6a44b-27ec-681c-8000-ff7e345aee7e'
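
Indexing with states[-2] depends on how many checkpoints the thread happens to have. As a hedged alternative, the sketch below picks the fork point by inspecting each state's next field. find_fork_point is a hypothetical helper, not part of the SDK, and mock_history is hand-written data with made-up checkpoint IDs, listed newest first to match the ordering seen in this section's outputs.

```python
def find_fork_point(history, node="assistant"):
    """Return the oldest state whose next step is `node`, or None.

    Hypothetical helper: `history` is a list of state dicts, newest first.
    """
    # Walk from the oldest checkpoint to the newest.
    for state in reversed(history):
        if state.get("next") == [node]:
            return state
    return None


# Mock history (assumed shape, made-up IDs), newest first.
mock_history = [
    {"checkpoint_id": "mock-4", "next": []},             # run finished
    {"checkpoint_id": "mock-3", "next": ["assistant"]},  # after the tool call
    {"checkpoint_id": "mock-2", "next": ["tools"]},      # after the first assistant step
    {"checkpoint_id": "mock-1", "next": ["assistant"]},  # human input only
    {"checkpoint_id": "mock-0", "next": ["__start__"]},  # before the input is written
]

to_fork_mock = find_fork_point(mock_history)
print(to_fork_mock["checkpoint_id"])
```

With this mock data the helper selects the same position as states[-2], but it would keep working if the thread accumulated more checkpoints.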

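Let's edit the state.

Recall how the reducer on messages works:

  • It appends each message, unless we supply a message ID.
  • Supplying the ID of an existing message overwrites that message instead of appending to the state.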
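
The two rules above can be sketched in plain Python. This is a simplified stand-in for the reducer, not LangGraph's implementation: merge_messages is a hypothetical helper, messages are modeled as plain dicts, and the short ID is made up for illustration.

```python
def merge_messages(existing, updates):
    """Simplified append-or-overwrite reducer (illustrative sketch only)."""
    merged = list(existing)
    # Map each known message ID to its position in the list.
    index_by_id = {
        m["id"]: i for i, m in enumerate(merged) if m.get("id") is not None
    }
    for msg in updates:
        msg_id = msg.get("id")
        if msg_id is not None and msg_id in index_by_id:
            # Matching ID: overwrite the existing message in place.
            merged[index_by_id[msg_id]] = msg
        else:
            # Unknown or missing ID: append to the end.
            merged.append(msg)
            if msg_id is not None:
                index_by_id[msg_id] = len(merged) - 1
    return merged


state = [{"id": "msg-1", "content": "Multiply 2 and 3"}]

# No ID supplied: the new message is appended.
appended = merge_messages(state, [{"id": None, "content": "Multiply 3 and 3"}])

# Same ID supplied: the original message is overwritten.
overwritten = merge_messages(state, [{"id": "msg-1", "content": "Multiply 3 and 3"}])

print(len(appended), len(overwritten))
```

This is why the next cell reuses the ID of the original human message: the edited text replaces the old question rather than adding a second one.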
In [32]:
forked_input = {"messages": HumanMessage(content="Multiply 3 and 3",
                                         id=to_fork['values']['messages'][0]['id'])}

forked_config = await client.threads.update_state(
    thread["thread_id"],
    forked_input,
    checkpoint_id=to_fork['checkpoint_id']
)
In [33]:
forked_config
Out[33]:
{'configurable': {'thread_id': 'c99502e7-b0d7-473e-8295-1ad60e2b7ed2',
  'checkpoint_ns': '',
  'checkpoint_id': '1ef6a44b-90dc-68c8-8001-0c36898e0f34'},
 'checkpoint_id': '1ef6a44b-90dc-68c8-8001-0c36898e0f34'}
In [34]:
states = await client.threads.get_history(thread['thread_id'])
states[0]
Out[34]:
{'values': {'messages': [{'content': 'Multiply 3 and 3',
    'additional_kwargs': {'additional_kwargs': {},
     'response_metadata': {},
     'example': False},
    'response_metadata': {},
    'type': 'human',
    'name': None,
    'id': '93c18b95-9050-4a52-99b8-9374e98ee5db',
    'example': False}]},
 'next': ['assistant'],
 'tasks': [{'id': 'da5d6548-62ca-5e69-ba70-f6179b2743bd',
   'name': 'assistant',
   'error': None,
   'interrupts': [],
   'state': None}],
 'metadata': {'step': 1,
  'source': 'update',
  'writes': {'__start__': {'messages': {'id': '93c18b95-9050-4a52-99b8-9374e98ee5db',
     'name': None,
     'type': 'human',
     'content': 'Multiply 3 and 3',
     'example': False,
     'additional_kwargs': {},
     'response_metadata': {}}}},
  'parents': {},
  'graph_id': 'agent'},
 'created_at': '2024-09-03T22:34:46.678333+00:00',
 'checkpoint_id': '1ef6a44b-90dc-68c8-8001-0c36898e0f34',
 'parent_checkpoint_id': '1ef6a44b-27ec-681c-8000-ff7e345aee7e'}
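
The parent_checkpoint_id in the output above ties the new checkpoint back to the one we edited. A minimal sketch of checking that link, using only fields and IDs shown in this section; is_fork_of is a hypothetical helper, not an SDK function, and forked_state is a trimmed copy of the output.

```python
def is_fork_of(state, source_checkpoint_id):
    """Return True if `state` was created by updating `source_checkpoint_id`.

    Hypothetical helper; assumes the state dict shape shown in the output above.
    """
    return (
        state.get("metadata", {}).get("source") == "update"
        and state.get("parent_checkpoint_id") == source_checkpoint_id
    )


# Trimmed copy of the forked state printed above.
forked_state = {
    "metadata": {"step": 1, "source": "update"},
    "checkpoint_id": "1ef6a44b-90dc-68c8-8001-0c36898e0f34",
    "parent_checkpoint_id": "1ef6a44b-27ec-681c-8000-ff7e345aee7e",
}

# The checkpoint we chose to fork from (to_fork['checkpoint_id']).
original_checkpoint_id = "1ef6a44b-27ec-681c-8000-ff7e345aee7e"

fork_ok = is_fork_of(forked_state, original_checkpoint_id)
print(fork_ok)
```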

To rerun from the fork, we pass in its checkpoint_id.

In [35]:
async for chunk in client.runs.stream(
    thread["thread_id"],
    assistant_id="agent",
    input=None,
    stream_mode="updates",
    checkpoint_id=forked_config['checkpoint_id']
):
    if chunk.data:
        assistant_node = chunk.data.get('assistant', {}).get('messages', [])
        tool_node = chunk.data.get('tools', {}).get('messages', [])
        if assistant_node:
            print("-" * 20+"Assistant Node"+"-" * 20)
            print(assistant_node[-1])
        elif tool_node:
            print("-" * 20+"Tools Node"+"-" * 20)
            print(tool_node[-1])
--------------------Assistant Node--------------------
{'content': '', 'additional_kwargs': {'tool_calls': [{'index': 0, 'id': 'call_aodhCt5fWv33qVbO7Nsub9Q3', 'function': {'arguments': '{"a":3,"b":3}', 'name': 'multiply'}, 'type': 'function'}]}, 'response_metadata': {'finish_reason': 'tool_calls', 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_157b3831f5'}, 'type': 'ai', 'name': None, 'id': 'run-e9759422-e537-4b9b-b583-36c688e13b4b', 'example': False, 'tool_calls': [{'name': 'multiply', 'args': {'a': 3, 'b': 3}, 'id': 'call_aodhCt5fWv33qVbO7Nsub9Q3', 'type': 'tool_call'}], 'invalid_tool_calls': [], 'usage_metadata': None}
--------------------Tools Node--------------------
{'content': '9', 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'tool', 'name': 'multiply', 'id': '89787b0b-93de-4c0a-bea8-d2c3845534e1', 'tool_call_id': 'call_aodhCt5fWv33qVbO7Nsub9Q3', 'artifact': None, 'status': 'success'}
--------------------Assistant Node--------------------
{'content': 'The result of multiplying 3 by 3 is 9.', 'additional_kwargs': {}, 'response_metadata': {'finish_reason': 'stop', 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_157b3831f5'}, 'type': 'ai', 'name': None, 'id': 'run-0e16610f-4e8d-46f3-a5df-c2f187fae593', 'example': False, 'tool_calls': [], 'invalid_tool_calls': [], 'usage_metadata': None}
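
The same printing loop is repeated for every stream in this section. As a sketch, the per-chunk logic could be factored into a small helper. last_node_message is a hypothetical name, and the sample payloads are trimmed mock versions of the chunk data printed above.

```python
def last_node_message(chunk_data, nodes=("assistant", "tools")):
    """Return (node_name, last_message) for the first node with messages, else None.

    Hypothetical helper; assumes the "updates" payload shape seen in this section.
    """
    for node in nodes:
        messages = chunk_data.get(node, {}).get("messages", [])
        if messages:
            return node, messages[-1]
    return None


# Trimmed mock payloads shaped like chunk.data in the stream above.
mock_chunks = [
    {"assistant": {"messages": [{"type": "ai", "content": ""}]}},
    {"tools": {"messages": [{"type": "tool", "content": "9"}]}},
    {"assistant": {"messages": [{"type": "ai", "content": "The result of multiplying 3 by 3 is 9."}]}},
]

results = [last_node_message(data) for data in mock_chunks]
for node, message in results:
    print("-" * 20 + node + "-" * 20)
    print(message)
```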

LangGraph Studio¶

Let's look at forking in the Studio UI with our agent, which is defined in module-1/studio/agent.py and configured in module-1/studio/langgraph.json.

