Time Travel¶
Review¶
We discussed the motivations for human-in-the-loop:
(1) Approval
- We can interrupt our agent, surface its current state to a user, and allow the user to approve an action
(2) Debugging
- We can rewind the graph state to reproduce or avoid issues
(3) Editing
- We can modify the running state directly
We showed how breakpoints can pause the graph at specific nodes, or how the graph can dynamically interrupt itself.
We then showed how to proceed with human approval, or how to edit the graph state directly based on human feedback.
Goals¶
Now, let's show how LangGraph supports debugging by viewing, re-playing, and even forking from past states.
We call this capability time travel.
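Before diving in, here is a minimal, self-contained preview of the three operations this notebook walks through: browsing checkpoints, replaying from one, and forking from one. This is a sketch only; the toy "echo" node, the thread_id "preview", and the variable names are placeholders, and it assumes the packages installed in the next cell are available.
from langchain_core.messages import HumanMessage
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import START, END, StateGraph, MessagesState
def echo(state: MessagesState):
    # Toy node: reply with an AI message echoing the latest input
    return {"messages": [("ai", f"echo: {state['messages'][-1].content}")]}
toy_builder = StateGraph(MessagesState)
toy_builder.add_node("echo", echo)
toy_builder.add_edge(START, "echo")
toy_builder.add_edge("echo", END)
toy = toy_builder.compile(checkpointer=MemorySaver())
config = {"configurable": {"thread_id": "preview"}}
toy.invoke({"messages": [HumanMessage(content="hi")]}, config)
# 1. Browse: list every checkpoint saved for the thread (newest first)
history = list(toy.get_state_history(config))
# 2. Replay: stream from a past checkpoint by passing its config with input=None
for event in toy.stream(None, history[-2].config, stream_mode="values"):
    pass
# 3. Fork: edit the state at that checkpoint, creating a new branch to run from
forked_config = toy.update_state(history[-2].config, {"messages": [HumanMessage(content="hello again")]})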
%%capture --no-stderr
%pip install --quiet -U langgraph langchain_openai langgraph_sdk langgraph-prebuilt
import os, getpass
def _set_env(var: str):
if not os.environ.get(var):
os.environ[var] = getpass.getpass(f"{var}: ")
_set_env("OPENAI_API_KEY")
Let's build our agent.
from langchain_openai import ChatOpenAI
def multiply(a: int, b: int) -> int:
"""Multiply a and b.
Args:
a: first int
b: second int
"""
return a * b
# This will be a tool
def add(a: int, b: int) -> int:
"""Adds a and b.
Args:
a: first int
b: second int
"""
return a + b
def divide(a: int, b: int) -> float:
"""Divide a by b.
Args:
a: first int
b: second int
"""
return a / b
tools = [add, multiply, divide]
llm = ChatOpenAI(model="gpt-4o")
llm_with_tools = llm.bind_tools(tools)
from IPython.display import Image, display
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import MessagesState
from langgraph.graph import START, END, StateGraph
from langgraph.prebuilt import tools_condition, ToolNode
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
# System message
sys_msg = SystemMessage(content="You are a helpful assistant tasked with performing arithmetic on a set of inputs.")
# Node
def assistant(state: MessagesState):
return {"messages": [llm_with_tools.invoke([sys_msg] + state["messages"])]}
# Graph
builder = StateGraph(MessagesState)
# Define nodes: these do the work
builder.add_node("assistant", assistant)
builder.add_node("tools", ToolNode(tools))
# Define edges: these determine the control flow
builder.add_edge(START, "assistant")
builder.add_conditional_edges(
"assistant",
# If the latest message (result) from assistant is a tool call -> tools_condition routes to tools
# If the latest message (result) from assistant is a not a tool call -> tools_condition routes to END
tools_condition,
)
builder.add_edge("tools", "assistant")
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
# Show
display(Image(graph.get_graph(xray=True).draw_mermaid_png()))
Let's run it, as before.
# Input
initial_input = {"messages": HumanMessage(content="Multiply 2 and 3")}
# Thread
thread = {"configurable": {"thread_id": "1"}}
# Run the graph until the first interruption
for event in graph.stream(initial_input, thread, stream_mode="values"):
event['messages'][-1].pretty_print()
================================ Human Message ================================= Multiply 2 and 3 ================================== Ai Message ================================== Tool Calls: multiply (call_ikJxMpb777bKMYgmM3d9mYjW) Call ID: call_ikJxMpb777bKMYgmM3d9mYjW Args: a: 2 b: 3 ================================= Tool Message ================================= Name: multiply 6 ================================== Ai Message ================================== The result of multiplying 2 and 3 is 6.
Browsing History¶
We can use get_state to look at the current state of our graph, given the thread_id!
graph.get_state({'configurable': {'thread_id': '1'}})
StateSnapshot(values={'messages': [HumanMessage(content='Multiply 2 and 3', id='4ee8c440-0e4a-47d7-852f-06e2a6c4f84d'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_ikJxMpb777bKMYgmM3d9mYjW', 'function': {'arguments': '{"a":2,"b":3}', 'name': 'multiply'}, 'type': 'function'}], 'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 17, 'prompt_tokens': 131, 'total_tokens': 148}, 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_157b3831f5', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-bc24d334-8013-4f85-826f-e1ed69c86df0-0', tool_calls=[{'name': 'multiply', 'args': {'a': 2, 'b': 3}, 'id': 'call_ikJxMpb777bKMYgmM3d9mYjW', 'type': 'tool_call'}], usage_metadata={'input_tokens': 131, 'output_tokens': 17, 'total_tokens': 148}), ToolMessage(content='6', name='multiply', id='1012611a-30c5-4732-b789-8c455580c7b4', tool_call_id='call_ikJxMpb777bKMYgmM3d9mYjW'), AIMessage(content='The result of multiplying 2 and 3 is 6.', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 14, 'prompt_tokens': 156, 'total_tokens': 170}, 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_157b3831f5', 'finish_reason': 'stop', 'logprobs': None}, id='run-b46f3fed-ca3b-4e09-83f4-77ea5071e9bf-0', usage_metadata={'input_tokens': 156, 'output_tokens': 14, 'total_tokens': 170})]}, next=(), config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef6a440-ac9e-6024-8003-6fd8435c1d3b'}}, metadata={'source': 'loop', 'writes': {'assistant': {'messages': [AIMessage(content='The result of multiplying 2 and 3 is 6.', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 14, 'prompt_tokens': 156, 'total_tokens': 170}, 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_157b3831f5', 'finish_reason': 'stop', 'logprobs': None}, id='run-b46f3fed-ca3b-4e09-83f4-77ea5071e9bf-0', usage_metadata={'input_tokens': 156, 'output_tokens': 14, 'total_tokens': 170})]}}, 'step': 3, 'parents': {}}, created_at='2024-09-03T22:29:54.309727+00:00', parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef6a440-a759-6d02-8002-f1da6393e1ab'}}, tasks=())
We can also browse the state history of our agent.
get_state_history lets us get the state at all prior steps.
all_states = [s for s in graph.get_state_history(thread)]
len(all_states)
5
The first element, all_states[0], is the current state, just as we got it from get_state above.
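As a quick sanity check (a sketch using only the objects defined above), the first snapshot in the history should line up with what get_state returns for this thread:
current = graph.get_state(thread)
# Both should point at the same checkpoint and hold the same message list
assert all_states[0].config == current.config
assert all_states[0].values == current.values
The snapshot below, all_states[-2], is from earlier in the run: it holds only our human input, with the assistant node queued to run next.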
all_states[-2]
StateSnapshot(values={'messages': [HumanMessage(content='Multiply 2 and 3', id='4ee8c440-0e4a-47d7-852f-06e2a6c4f84d')]}, next=('assistant',), config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef6a440-a003-6c74-8000-8a2d82b0d126'}}, metadata={'source': 'loop', 'writes': None, 'step': 0, 'parents': {}}, created_at='2024-09-03T22:29:52.988265+00:00', parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef6a440-9ffe-6512-bfff-9e6d8dc24bba'}}, tasks=(PregelTask(id='ca669906-0c4f-5165-840d-7a6a3fce9fb9', name='assistant', error=None, interrupts=(), state=None),))
Everything above can be visualized as the sequence of checkpoints saved for this thread.
Let's look back at the step that received human input!
to_replay = all_states[-2]
to_replay
StateSnapshot(values={'messages': [HumanMessage(content='Multiply 2 and 3', id='4ee8c440-0e4a-47d7-852f-06e2a6c4f84d')]}, next=('assistant',), config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef6a440-a003-6c74-8000-8a2d82b0d126'}}, metadata={'source': 'loop', 'writes': None, 'step': 0, 'parents': {}}, created_at='2024-09-03T22:29:52.988265+00:00', parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef6a440-9ffe-6512-bfff-9e6d8dc24bba'}}, tasks=(PregelTask(id='ca669906-0c4f-5165-840d-7a6a3fce9fb9', name='assistant', error=None, interrupts=(), state=None),))
Look at the state.
to_replay.values
{'messages': [HumanMessage(content='Multiply 2 and 3', id='4ee8c440-0e4a-47d7-852f-06e2a6c4f84d')]}
We can see the next node to call.
to_replay.next
('assistant',)
We also get the config, which tells us the checkpoint_id as well as the thread_id.
to_replay.config
{'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef6a440-a003-6c74-8000-8a2d82b0d126'}}
To replay from here, we simply pass the config back to the agent!
The graph knows that this checkpoint has already been executed.
It simply re-plays from this checkpoint!
for event in graph.stream(None, to_replay.config, stream_mode="values"):
event['messages'][-1].pretty_print()
================================ Human Message ================================= Multiply 2 and 3 ================================== Ai Message ================================== Tool Calls: multiply (call_SABfB57CnDkMu9HJeUE0mvJ9) Call ID: call_SABfB57CnDkMu9HJeUE0mvJ9 Args: a: 2 b: 3 ================================= Tool Message ================================= Name: multiply 6 ================================== Ai Message ================================== The result of multiplying 2 and 3 is 6.
Now, we can see our current state after the agent re-ran.
Forking¶
What if we want to run from that same step, but with a different input? This is forking.
to_fork = all_states[-2]
to_fork.values["messages"]
[HumanMessage(content='Multiply 2 and 3', id='4ee8c440-0e4a-47d7-852f-06e2a6c4f84d')]
Again, we have the config.
to_fork.config
{'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef6a440-a003-6c74-8000-8a2d82b0d126'}}
Let's modify the state at this checkpoint.
We can simply run update_state with the checkpoint_id supplied.
Remember how our reducer on messages works:
- It will append messages, unless we supply a message ID.
- We supply the message ID to overwrite the message, rather than appending it to state!
So, to overwrite a message, we simply supply its ID: to_fork.values["messages"][0].id.
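Before running the fork, here is a minimal sketch of that reducer behavior in isolation. MessagesState uses the add_messages reducer under the hood; the id "msg-1" below is just a placeholder for illustration.
from langchain_core.messages import HumanMessage
from langgraph.graph.message import add_messages
base = [HumanMessage(content="Multiply 2 and 3", id="msg-1")]
# No matching id -> the new message is appended
appended = add_messages(base, [HumanMessage(content="What about 4 and 4?")])
# Matching id -> the existing message is overwritten in place
overwritten = add_messages(base, [HumanMessage(content="Multiply 5 and 3", id="msg-1")])
print(len(appended), len(overwritten))  # 2 1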
fork_config = graph.update_state(
to_fork.config,
{"messages": [HumanMessage(content='Multiply 5 and 3',
id=to_fork.values["messages"][0].id)]},
)
fork_config
{'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef6a442-3661-62f6-8001-d3c01b96f98b'}}
This creates a new, forked checkpoint.
But the metadata (e.g., where to go next) is preserved!
We can see that the current state of our agent has been updated with our fork.
all_states = [state for state in graph.get_state_history(thread) ]
all_states[0].values["messages"]
[HumanMessage(content='Multiply 5 and 3', id='4ee8c440-0e4a-47d7-852f-06e2a6c4f84d')]
graph.get_state({'configurable': {'thread_id': '1'}})
StateSnapshot(values={'messages': [HumanMessage(content='Multiply 5 and 3', id='4ee8c440-0e4a-47d7-852f-06e2a6c4f84d')]}, next=('assistant',), config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef6a442-3661-62f6-8001-d3c01b96f98b'}}, metadata={'source': 'update', 'step': 1, 'writes': {'__start__': {'messages': [HumanMessage(content='Multiply 5 and 3', id='4ee8c440-0e4a-47d7-852f-06e2a6c4f84d')]}}, 'parents': {}}, created_at='2024-09-03T22:30:35.598707+00:00', parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef6a440-a003-6c74-8000-8a2d82b0d126'}}, tasks=(PregelTask(id='f8990132-a8d3-5ddd-8d9e-1efbfc220da1', name='assistant', error=None, interrupts=(), state=None),))
Now, when we stream, the graph knows this checkpoint has never been executed.
So, the graph runs again, rather than simply re-playing.
for event in graph.stream(None, fork_config, stream_mode="values"):
event['messages'][-1].pretty_print()
================================ Human Message ================================= Multiply 5 and 3 ================================== Ai Message ================================== Tool Calls: multiply (call_KP2CVNMMUKMJAQuFmamHB21r) Call ID: call_KP2CVNMMUKMJAQuFmamHB21r Args: a: 5 b: 3 ================================= Tool Message ================================= Name: multiply 15 ================================== Ai Message ================================== The result of multiplying 5 and 3 is 15.
Now, we can see that our current state is the end of the agent run.
graph.get_state({'configurable': {'thread_id': '1'}})
StateSnapshot(values={'messages': [HumanMessage(content='Multiply 5 and 3', id='4ee8c440-0e4a-47d7-852f-06e2a6c4f84d'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_KP2CVNMMUKMJAQuFmamHB21r', 'function': {'arguments': '{"a":5,"b":3}', 'name': 'multiply'}, 'type': 'function'}], 'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 17, 'prompt_tokens': 131, 'total_tokens': 148}, 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_157b3831f5', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-bc420009-d1f6-49b8-bea7-dfc9fca7eb79-0', tool_calls=[{'name': 'multiply', 'args': {'a': 5, 'b': 3}, 'id': 'call_KP2CVNMMUKMJAQuFmamHB21r', 'type': 'tool_call'}], usage_metadata={'input_tokens': 131, 'output_tokens': 17, 'total_tokens': 148}), ToolMessage(content='15', name='multiply', id='9232e653-816d-471a-9002-9a1ecd453364', tool_call_id='call_KP2CVNMMUKMJAQuFmamHB21r'), AIMessage(content='The result of multiplying 5 and 3 is 15.', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 14, 'prompt_tokens': 156, 'total_tokens': 170}, 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_157b3831f5', 'finish_reason': 'stop', 'logprobs': None}, id='run-86c21888-d832-47c5-9e76-0aa2676116dc-0', usage_metadata={'input_tokens': 156, 'output_tokens': 14, 'total_tokens': 170})]}, next=(), config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef6a442-a2e2-6e98-8004-4a0b75537950'}}, metadata={'source': 'loop', 'writes': {'assistant': {'messages': [AIMessage(content='The result of multiplying 5 and 3 is 15.', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 14, 'prompt_tokens': 156, 'total_tokens': 170}, 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_157b3831f5', 'finish_reason': 'stop', 'logprobs': None}, id='run-86c21888-d832-47c5-9e76-0aa2676116dc-0', usage_metadata={'input_tokens': 156, 'output_tokens': 14, 'total_tokens': 170})]}}, 'step': 4, 'parents': {}}, created_at='2024-09-03T22:30:46.976463+00:00', parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef6a442-9db0-6056-8003-7304cab7bed8'}}, tasks=())
Time travel with the LangGraph API¶
⚠️ Disclaimer
Since the filming of these videos, we've updated Studio so that it can run locally and be opened in your browser. This is now the preferred way to run Studio (rather than the desktop app shown in the videos). See the documentation on the local development server here, and the instructions for running Studio locally here. To start the local development server, run the following terminal command from the /studio directory of this module:
langgraph dev
You should see the following output:
- 🚀 API: http://127.0.0.1:2024
- 🎨 Studio UI: https://smith.langchain.com/studio/?baseUrl=http://127.0.0.1:2024
- 📚 API Docs: http://127.0.0.1:2024/docs
Open your browser and navigate to the Studio UI: https://smith.langchain.com/studio/?baseUrl=http://127.0.0.1:2024.
We connect to it via the SDK and show how the LangGraph API supports time travel.
if 'google.colab' in str(get_ipython()):
raise Exception("Unfortunately LangGraph Studio is currently not supported on Google Colab")
from langgraph_sdk import get_client
client = get_client(url="http://127.0.0.1:2024")
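As an optional sanity check (a sketch; it assumes the local dev server started with langgraph dev is still running), we can list the assistants the server exposes and confirm that the agent graph is registered:
# Optional: confirm the dev server is reachable and the "agent" graph is registered
assistants = await client.assistants.search()
print([a["graph_id"] for a in assistants])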
Replaying¶
Let's run our agent, streaming updates to the graph state after each node is called.
initial_input = {"messages": HumanMessage(content="Multiply 2 and 3")}
thread = await client.threads.create()
async for chunk in client.runs.stream(
thread["thread_id"],
assistant_id = "agent",
input=initial_input,
stream_mode="updates",
):
if chunk.data:
assisant_node = chunk.data.get('assistant', {}).get('messages', [])
tool_node = chunk.data.get('tools', {}).get('messages', [])
if assisant_node:
print("-" * 20+"Assistant Node"+"-" * 20)
print(assisant_node[-1])
elif tool_node:
print("-" * 20+"Tools Node"+"-" * 20)
print(tool_node[-1])
--------------------Assistant Node-------------------- {'content': '', 'additional_kwargs': {'tool_calls': [{'index': 0, 'id': 'call_SG7XYqDENGq7mwXrnioNLosS', 'function': {'arguments': '{"a":2,"b":3}', 'name': 'multiply'}, 'type': 'function'}]}, 'response_metadata': {'finish_reason': 'tool_calls', 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_157b3831f5'}, 'type': 'ai', 'name': None, 'id': 'run-2c120fc3-3c82-4599-b8ec-24fbee207cad', 'example': False, 'tool_calls': [{'name': 'multiply', 'args': {'a': 2, 'b': 3}, 'id': 'call_SG7XYqDENGq7mwXrnioNLosS', 'type': 'tool_call'}], 'invalid_tool_calls': [], 'usage_metadata': None} --------------------Tools Node-------------------- {'content': '6', 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'tool', 'name': 'multiply', 'id': '3b40d091-58b2-4566-a84c-60af67206307', 'tool_call_id': 'call_SG7XYqDENGq7mwXrnioNLosS', 'artifact': None, 'status': 'success'} --------------------Assistant Node-------------------- {'content': 'The result of multiplying 2 and 3 is 6.', 'additional_kwargs': {}, 'response_metadata': {'finish_reason': 'stop', 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_fde2829a40'}, 'type': 'ai', 'name': None, 'id': 'run-1272d9b0-a0aa-4ff7-8bad-fdffd27c5506', 'example': False, 'tool_calls': [], 'invalid_tool_calls': [], 'usage_metadata': None}
Now, let's see how we can replay from a specified checkpoint.
We simply need to pass the checkpoint_id.
states = await client.threads.get_history(thread['thread_id'])
to_replay = states[-2]
to_replay
{'values': {'messages': [{'content': 'Multiply 2 and 3', 'additional_kwargs': {'example': False, 'additional_kwargs': {}, 'response_metadata': {}}, 'response_metadata': {}, 'type': 'human', 'name': None, 'id': 'df98147a-cb3d-4f1a-b7f7-1545c4b6f042', 'example': False}]}, 'next': ['assistant'], 'tasks': [{'id': 'e497456f-827a-5027-87bd-b0ccd54aa89a', 'name': 'assistant', 'error': None, 'interrupts': [], 'state': None}], 'metadata': {'step': 0, 'run_id': '1ef6a449-7fbc-6c90-8754-4e6b1b582790', 'source': 'loop', 'writes': None, 'parents': {}, 'user_id': '', 'graph_id': 'agent', 'thread_id': '708e1d8f-f7c8-4093-9bb4-999c4237cb4a', 'created_by': 'system', 'assistant_id': 'fe096781-5601-53d2-b2f6-0d3403f7e9ca'}, 'created_at': '2024-09-03T22:33:51.380352+00:00', 'checkpoint_id': '1ef6a449-817f-6b55-8000-07c18fbdf7c8', 'parent_checkpoint_id': '1ef6a449-816c-6fd6-bfff-32a56dd2635f'}
Let's stream with stream_mode="values" to see the full state at every node as we replay.
async for chunk in client.runs.stream(
thread["thread_id"],
assistant_id="agent",
input=None,
stream_mode="values",
checkpoint_id=to_replay['checkpoint_id']
):
print(f"Receiving new event of type: {chunk.event}...")
print(chunk.data)
print("\n\n")
Receiving new event of type: metadata... {'run_id': '1ef6a44a-5806-6bb1-b2ee-92ecfda7f67d'} Receiving new event of type: values... {'messages': [{'content': 'Multiply 2 and 3', 'additional_kwargs': {'example': False, 'additional_kwargs': {}, 'response_metadata': {}}, 'response_metadata': {}, 'type': 'human', 'name': None, 'id': 'df98147a-cb3d-4f1a-b7f7-1545c4b6f042', 'example': False}]} Receiving new event of type: values... {'messages': [{'content': 'Multiply 2 and 3', 'additional_kwargs': {'example': False, 'additional_kwargs': {}, 'response_metadata': {}}, 'response_metadata': {}, 'type': 'human', 'name': None, 'id': 'df98147a-cb3d-4f1a-b7f7-1545c4b6f042', 'example': False}, {'content': '', 'additional_kwargs': {'tool_calls': [{'index': 0, 'id': 'call_Rn9YQ6iZyYtzrELBz7EfQcs0', 'function': {'arguments': '{"a":2,"b":3}', 'name': 'multiply'}, 'type': 'function'}]}, 'response_metadata': {'finish_reason': 'tool_calls', 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_157b3831f5'}, 'type': 'ai', 'name': None, 'id': 'run-e60d82d7-7743-4f13-bebd-3616a88720a9', 'example': False, 'tool_calls': [{'name': 'multiply', 'args': {'a': 2, 'b': 3}, 'id': 'call_Rn9YQ6iZyYtzrELBz7EfQcs0', 'type': 'tool_call'}], 'invalid_tool_calls': [], 'usage_metadata': None}]} Receiving new event of type: values... {'messages': [{'content': 'Multiply 2 and 3', 'additional_kwargs': {'example': False, 'additional_kwargs': {}, 'response_metadata': {}}, 'response_metadata': {}, 'type': 'human', 'name': None, 'id': 'df98147a-cb3d-4f1a-b7f7-1545c4b6f042', 'example': False}, {'content': '', 'additional_kwargs': {'tool_calls': [{'index': 0, 'id': 'call_Rn9YQ6iZyYtzrELBz7EfQcs0', 'function': {'arguments': '{"a":2,"b":3}', 'name': 'multiply'}, 'type': 'function'}]}, 'response_metadata': {'finish_reason': 'tool_calls', 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_157b3831f5'}, 'type': 'ai', 'name': None, 'id': 'run-e60d82d7-7743-4f13-bebd-3616a88720a9', 'example': False, 'tool_calls': [{'name': 'multiply', 'args': {'a': 2, 'b': 3}, 'id': 'call_Rn9YQ6iZyYtzrELBz7EfQcs0', 'type': 'tool_call'}], 'invalid_tool_calls': [], 'usage_metadata': None}, {'content': '6', 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'tool', 'name': 'multiply', 'id': 'f1be0b83-4565-4aa2-9b9a-cd8874c6a2bc', 'tool_call_id': 'call_Rn9YQ6iZyYtzrELBz7EfQcs0', 'artifact': None, 'status': 'success'}]} Receiving new event of type: values... 
{'messages': [{'content': 'Multiply 2 and 3', 'additional_kwargs': {'example': False, 'additional_kwargs': {}, 'response_metadata': {}}, 'response_metadata': {}, 'type': 'human', 'name': None, 'id': 'df98147a-cb3d-4f1a-b7f7-1545c4b6f042', 'example': False}, {'content': '', 'additional_kwargs': {'tool_calls': [{'index': 0, 'id': 'call_Rn9YQ6iZyYtzrELBz7EfQcs0', 'function': {'arguments': '{"a":2,"b":3}', 'name': 'multiply'}, 'type': 'function'}]}, 'response_metadata': {'finish_reason': 'tool_calls', 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_157b3831f5'}, 'type': 'ai', 'name': None, 'id': 'run-e60d82d7-7743-4f13-bebd-3616a88720a9', 'example': False, 'tool_calls': [{'name': 'multiply', 'args': {'a': 2, 'b': 3}, 'id': 'call_Rn9YQ6iZyYtzrELBz7EfQcs0', 'type': 'tool_call'}], 'invalid_tool_calls': [], 'usage_metadata': None}, {'content': '6', 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'tool', 'name': 'multiply', 'id': 'f1be0b83-4565-4aa2-9b9a-cd8874c6a2bc', 'tool_call_id': 'call_Rn9YQ6iZyYtzrELBz7EfQcs0', 'artifact': None, 'status': 'success'}, {'content': 'The result of multiplying 2 and 3 is 6.', 'additional_kwargs': {}, 'response_metadata': {'finish_reason': 'stop', 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_157b3831f5'}, 'type': 'ai', 'name': None, 'id': 'run-55e5847a-d542-4977-84d7-24852e78b0a9', 'example': False, 'tool_calls': [], 'invalid_tool_calls': [], 'usage_metadata': None}]}
We can also view this as streaming only the updates to state made by the nodes that we re-ran.
async for chunk in client.runs.stream(
thread["thread_id"],
assistant_id="agent",
input=None,
stream_mode="updates",
checkpoint_id=to_replay['checkpoint_id']
):
if chunk.data:
assisant_node = chunk.data.get('assistant', {}).get('messages', [])
tool_node = chunk.data.get('tools', {}).get('messages', [])
if assisant_node:
print("-" * 20+"Assistant Node"+"-" * 20)
print(assisant_node[-1])
elif tool_node:
print("-" * 20+"Tools Node"+"-" * 20)
print(tool_node[-1])
--------------------Assistant Node-------------------- {'content': '', 'additional_kwargs': {'tool_calls': [{'index': 0, 'id': 'call_I2qudhMCwcw1GzcFN5q80rjj', 'function': {'arguments': '{"a":2,"b":3}', 'name': 'multiply'}, 'type': 'function'}]}, 'response_metadata': {'finish_reason': 'tool_calls', 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_157b3831f5'}, 'type': 'ai', 'name': None, 'id': 'run-550e75ad-dbbc-4e55-9f00-aa896228914c', 'example': False, 'tool_calls': [{'name': 'multiply', 'args': {'a': 2, 'b': 3}, 'id': 'call_I2qudhMCwcw1GzcFN5q80rjj', 'type': 'tool_call'}], 'invalid_tool_calls': [], 'usage_metadata': None} --------------------Tools Node-------------------- {'content': '6', 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'tool', 'name': 'multiply', 'id': '731b7d4f-780d-4a8b-aec9-0d8b9c58c40a', 'tool_call_id': 'call_I2qudhMCwcw1GzcFN5q80rjj', 'artifact': None, 'status': 'success'} --------------------Assistant Node-------------------- {'content': 'The result of multiplying 2 and 3 is 6.', 'additional_kwargs': {}, 'response_metadata': {'finish_reason': 'stop', 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_157b3831f5'}, 'type': 'ai', 'name': None, 'id': 'run-2326afa5-eb43-4568-b5ed-424c0a0fa076', 'example': False, 'tool_calls': [], 'invalid_tool_calls': [], 'usage_metadata': None}
Forking¶
Now, let's look at forking with the API. First, let's create a new thread and run our agent again, so we have a fresh run to fork from.
initial_input = {"messages": HumanMessage(content="Multiply 2 and 3")}
thread = await client.threads.create()
async for chunk in client.runs.stream(
thread["thread_id"],
assistant_id="agent",
input=initial_input,
stream_mode="updates",
):
if chunk.data:
assisant_node = chunk.data.get('assistant', {}).get('messages', [])
tool_node = chunk.data.get('tools', {}).get('messages', [])
if assisant_node:
print("-" * 20+"Assistant Node"+"-" * 20)
print(assisant_node[-1])
elif tool_node:
print("-" * 20+"Tools Node"+"-" * 20)
print(tool_node[-1])
--------------------Assistant Node-------------------- {'content': '', 'additional_kwargs': {'tool_calls': [{'index': 0, 'id': 'call_HdWoyLELFZGEcqGxFt2fZzek', 'function': {'arguments': '{"a":2,"b":3}', 'name': 'multiply'}, 'type': 'function'}]}, 'response_metadata': {'finish_reason': 'tool_calls', 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_157b3831f5'}, 'type': 'ai', 'name': None, 'id': 'run-cbd081b1-8cef-4ca8-9dd5-aceb134404dc', 'example': False, 'tool_calls': [{'name': 'multiply', 'args': {'a': 2, 'b': 3}, 'id': 'call_HdWoyLELFZGEcqGxFt2fZzek', 'type': 'tool_call'}], 'invalid_tool_calls': [], 'usage_metadata': None} --------------------Tools Node-------------------- {'content': '6', 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'tool', 'name': 'multiply', 'id': '11dd4a7f-0b6b-44da-b9a4-65f1677c8813', 'tool_call_id': 'call_HdWoyLELFZGEcqGxFt2fZzek', 'artifact': None, 'status': 'success'} --------------------Assistant Node-------------------- {'content': 'The result of multiplying 2 and 3 is 6.', 'additional_kwargs': {}, 'response_metadata': {'finish_reason': 'stop', 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_157b3831f5'}, 'type': 'ai', 'name': None, 'id': 'run-936cf990-9302-45c7-9051-6ff1e2e9f316', 'example': False, 'tool_calls': [], 'invalid_tool_calls': [], 'usage_metadata': None}
states = await client.threads.get_history(thread['thread_id'])
to_fork = states[-2]
to_fork['values']
{'messages': [{'content': 'Multiply 2 and 3', 'additional_kwargs': {'example': False, 'additional_kwargs': {}, 'response_metadata': {}}, 'response_metadata': {}, 'type': 'human', 'name': None, 'id': '93c18b95-9050-4a52-99b8-9374e98ee5db', 'example': False}]}
to_fork['values']['messages'][0]['id']
'93c18b95-9050-4a52-99b8-9374e98ee5db'
to_fork['next']
['assistant']
to_fork['checkpoint_id']
'1ef6a44b-27ec-681c-8000-ff7e345aee7e'
Let's edit the state.
Remember how our reducer on messages works:
- It will append messages, unless we supply a message ID.
- We supply the message ID to overwrite the message, rather than appending it to state.
forked_input = {"messages": HumanMessage(content="Multiply 3 and 3",
id=to_fork['values']['messages'][0]['id'])}
forked_config = await client.threads.update_state(
thread["thread_id"],
forked_input,
checkpoint_id=to_fork['checkpoint_id']
)
forked_config
{'configurable': {'thread_id': 'c99502e7-b0d7-473e-8295-1ad60e2b7ed2', 'checkpoint_ns': '', 'checkpoint_id': '1ef6a44b-90dc-68c8-8001-0c36898e0f34'}, 'checkpoint_id': '1ef6a44b-90dc-68c8-8001-0c36898e0f34'}
states = await client.threads.get_history(thread['thread_id'])
states[0]
{'values': {'messages': [{'content': 'Multiply 3 and 3', 'additional_kwargs': {'additional_kwargs': {}, 'response_metadata': {}, 'example': False}, 'response_metadata': {}, 'type': 'human', 'name': None, 'id': '93c18b95-9050-4a52-99b8-9374e98ee5db', 'example': False}]}, 'next': ['assistant'], 'tasks': [{'id': 'da5d6548-62ca-5e69-ba70-f6179b2743bd', 'name': 'assistant', 'error': None, 'interrupts': [], 'state': None}], 'metadata': {'step': 1, 'source': 'update', 'writes': {'__start__': {'messages': {'id': '93c18b95-9050-4a52-99b8-9374e98ee5db', 'name': None, 'type': 'human', 'content': 'Multiply 3 and 3', 'example': False, 'additional_kwargs': {}, 'response_metadata': {}}}}, 'parents': {}, 'graph_id': 'agent'}, 'created_at': '2024-09-03T22:34:46.678333+00:00', 'checkpoint_id': '1ef6a44b-90dc-68c8-8001-0c36898e0f34', 'parent_checkpoint_id': '1ef6a44b-27ec-681c-8000-ff7e345aee7e'}
To re-run, we pass back the checkpoint_id.
async for chunk in client.runs.stream(
thread["thread_id"],
assistant_id="agent",
input=None,
stream_mode="updates",
checkpoint_id=forked_config['checkpoint_id']
):
if chunk.data:
assisant_node = chunk.data.get('assistant', {}).get('messages', [])
tool_node = chunk.data.get('tools', {}).get('messages', [])
if assisant_node:
print("-" * 20+"Assistant Node"+"-" * 20)
print(assisant_node[-1])
elif tool_node:
print("-" * 20+"Tools Node"+"-" * 20)
print(tool_node[-1])
--------------------Assistant Node-------------------- {'content': '', 'additional_kwargs': {'tool_calls': [{'index': 0, 'id': 'call_aodhCt5fWv33qVbO7Nsub9Q3', 'function': {'arguments': '{"a":3,"b":3}', 'name': 'multiply'}, 'type': 'function'}]}, 'response_metadata': {'finish_reason': 'tool_calls', 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_157b3831f5'}, 'type': 'ai', 'name': None, 'id': 'run-e9759422-e537-4b9b-b583-36c688e13b4b', 'example': False, 'tool_calls': [{'name': 'multiply', 'args': {'a': 3, 'b': 3}, 'id': 'call_aodhCt5fWv33qVbO7Nsub9Q3', 'type': 'tool_call'}], 'invalid_tool_calls': [], 'usage_metadata': None} --------------------Tools Node-------------------- {'content': '9', 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'tool', 'name': 'multiply', 'id': '89787b0b-93de-4c0a-bea8-d2c3845534e1', 'tool_call_id': 'call_aodhCt5fWv33qVbO7Nsub9Q3', 'artifact': None, 'status': 'success'} --------------------Assistant Node-------------------- {'content': 'The result of multiplying 3 by 3 is 9.', 'additional_kwargs': {}, 'response_metadata': {'finish_reason': 'stop', 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_157b3831f5'}, 'type': 'ai', 'name': None, 'id': 'run-0e16610f-4e8d-46f3-a5df-c2f187fae593', 'example': False, 'tool_calls': [], 'invalid_tool_calls': [], 'usage_metadata': None}
LangGraph Studio¶
Let's look at the forked executions of our agent in the Studio UI. The agent is defined in module-1/studio/agent.py and configured in module-1/studio/langgraph.json.
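If you prefer to inspect the fork programmatically rather than in the Studio UI, a sketch like the one below (reusing the client and thread from the forking run above) prints each checkpoint alongside its parent; the branch point shows up as two checkpoints that share the same parent_checkpoint_id.
# Sketch: print the checkpoint lineage for the thread we forked above
states = await client.threads.get_history(thread['thread_id'])
for s in reversed(states):
    print(s['checkpoint_id'], '<-', s['parent_checkpoint_id'], '| step', s['metadata'].get('step'))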