%%capture --no-stderr
%pip install --quiet -U langgraph langchain_openai langgraph_sdk langgraph-prebuilt
import os, getpass

def _set_env(var: str):
    """Ensure the environment variable `var` is set, prompting the user if needed.

    An empty-string value counts as unset and triggers the prompt too.
    """
    current = os.environ.get(var)
    if not current:
        os.environ[var] = getpass.getpass(f"{var}: ")

_set_env("OPENAI_API_KEY")
from langchain_openai import ChatOpenAI
def multiply(a: int, b: int) -> int:
    """Return the product of a and b.

    Args:
        a: first int
        b: second int
    """
    product = a * b
    return product
# This will be a tool
def add(a: int, b: int) -> int:
    """Return the sum of a and b.

    Args:
        a: first int
        b: second int
    """
    total = a + b
    return total
def divide(a: int, b: int) -> float:
    """Return the quotient of a divided by b (true division).

    Args:
        a: first int
        b: second int (a ZeroDivisionError propagates if b == 0)
    """
    quotient = a / b
    return quotient
# Register the arithmetic functions as tools and bind them to the chat model,
# so the LLM can emit structured tool calls for add/multiply/divide.
tools = [add, multiply, divide]
llm = ChatOpenAI(model="gpt-4o")
llm_with_tools = llm.bind_tools(tools)
from IPython.display import Image, display
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import MessagesState
from langgraph.graph import START, StateGraph
from langgraph.prebuilt import tools_condition, ToolNode
from langchain_core.messages import HumanMessage, SystemMessage
# System message
sys_msg = SystemMessage(content="You are a helpful assistant tasked with performing arithmetic on a set of inputs.")

# Node
def assistant(state: MessagesState):
    """Invoke the tool-bound LLM on the system prompt plus the conversation so far.

    Returns a partial state update; the new AI message is appended to
    `messages` by the state's reducer.
    """
    conversation = [sys_msg] + state["messages"]
    reply = llm_with_tools.invoke(conversation)
    return {"messages": [reply]}
# Graph
builder = StateGraph(MessagesState)

# Define nodes: these do the work
builder.add_node("assistant", assistant)
builder.add_node("tools", ToolNode(tools))

# Define edges: these determine the control flow
builder.add_edge(START, "assistant")
builder.add_conditional_edges(
    "assistant",
    # If the latest message (result) from assistant is a tool call -> tools_condition routes to tools
    # If the latest message (result) from assistant is a not a tool call -> tools_condition routes to END
    tools_condition,
)
builder.add_edge("tools", "assistant")

# Checkpointer: persists graph state per thread so runs can be paused/resumed.
memory = MemorySaver()
# interrupt_before pauses execution just before the "assistant" node runs,
# letting us inspect/edit the state before the model is called.
graph = builder.compile(interrupt_before=["assistant"], checkpointer=memory)

# Show
display(Image(graph.get_graph(xray=True).draw_mermaid_png()))
让我们开始吧!
可以看到图表在聊天模型响应之前就中断了。
# Input
initial_input = {"messages": "Multiply 2 and 3"}

# Thread
thread = {"configurable": {"thread_id": "1"}}

# Run the graph until the first interruption
# (stops before "assistant" because of interrupt_before at compile time)
for event in graph.stream(initial_input, thread, stream_mode="values"):
    event['messages'][-1].pretty_print()
================================ Human Message =================================
Multiply 2 and 3
# Inspect the checkpointed state for this thread; its `next` field shows the
# node that will run on resume ("assistant", where we interrupted).
state = graph.get_state(thread)
state
StateSnapshot(values={'messages': [HumanMessage(content='Multiply 2 and 3', id='e7edcaba-bfed-4113-a85b-25cc39d6b5a7')]}, next=('assistant',), config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef6a412-5b2d-601a-8000-4af760ea1d0d'}}, metadata={'source': 'loop', 'writes': None, 'step': 0, 'parents': {}}, created_at='2024-09-03T22:09:10.966883+00:00', parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef6a412-5b28-6ace-bfff-55d7a2c719ae'}}, tasks=(PregelTask(id='dbee122a-db69-51a7-b05b-a21fab160696', name='assistant', error=None, interrupts=(), state=None),))
现在,我们可以直接应用状态更新。
请注意,对 `messages` 键的更新将使用 `add_messages` 归约器:
- 若需覆盖现有消息,可提供该消息的 `id`;
- 若只需在消息列表末尾追加新消息,则可传入未指定 `id` 的消息(如下所示)。
# Apply a state update as if a new human message arrived.
# No `id` is supplied, so the add_messages reducer appends it to `messages`
# rather than overwriting an existing message.
graph.update_state(
    thread,
    {"messages": [HumanMessage(content="No, actually multiply 3 and 3!")]},
)
{'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef6a414-f419-6182-8001-b9e899eca7e5'}}
让我们来看一下。
我们调用了 `update_state` 并传入了一条新消息。
`add_messages` 这个 reducer 会将其追加到我们的状态键 `messages` 中。
# Confirm the update: the thread state now contains both human messages.
new_state = graph.get_state(thread).values
for m in new_state['messages']:
    m.pretty_print()
================================ Human Message ================================= Multiply 2 and 3 ================================ Human Message ================================= No, actually multiply 3 and 3!
现在,让我们继续执行代理程序,只需传入 None
并允许它从当前状态继续运行。
我们输出当前状态,然后继续执行剩余的节点。
# Resume execution from the checkpoint: input=None continues from the
# current state instead of starting a new run.
for event in graph.stream(None, thread, stream_mode="values"):
    event['messages'][-1].pretty_print()
================================ Human Message ================================= No, actually multiply 3 and 3! ================================== Ai Message ================================== Tool Calls: multiply (call_Mbu8MfA0krQh8rkZZALYiQMk) Call ID: call_Mbu8MfA0krQh8rkZZALYiQMk Args: a: 3 b: 3 ================================= Tool Message ================================= Name: multiply 9
现在,我们回到了 assistant
,这里设有我们的 breakpoint
(断点)。
我们可以再次传入 None
以继续执行。
# We are stopped at the "assistant" breakpoint again; pass None once more
# to continue to the final answer.
for event in graph.stream(None, thread, stream_mode="values"):
    event['messages'][-1].pretty_print()
================================= Tool Message ================================= Name: multiply 9 ================================== Ai Message ================================== 3 multiplied by 3 equals 9.
在 Studio 中编辑图状态¶
⚠️ 免责声明
自这些视频录制以来,我们已更新 Studio 使其可在本地运行并通过浏览器访问。现在这是运行 Studio 的首选方式(而非视频中展示的桌面应用)。关于本地开发服务器的文档请参见此处,运行说明请参见此处。要启动本地开发服务器,请在本模块的 /studio
目录下运行以下终端命令:
langgraph dev
您将看到如下输出:
- 🚀 API: http://127.0.0.1:2024
- 🎨 Studio 界面: https://smith.langchain.com/studio/?baseUrl=http://127.0.0.1:2024
- 📚 API 文档: http://127.0.0.1:2024/docs
打开浏览器并访问 Studio 界面:https://smith.langchain.com/studio/?baseUrl=http://127.0.0.1:2024
。
LangGraph API 支持编辑图状态。
# The local LangGraph dev server is not reachable from Google Colab.
if 'google.colab' in str(get_ipython()):
    raise Exception("Unfortunately LangGraph Studio is currently not supported on Google Colab")

# This is the URL of the local development server
from langgraph_sdk import get_client
client = get_client(url="http://127.0.0.1:2024")
我们的智能体定义在 studio/agent.py
文件中。
查看代码时你会发现,它并没有设置断点!
当然,我们可以直接修改 agent.py
来添加断点,但该 API 的一个绝佳特性是:我们可以直接传入断点参数!
这里我们传入了 interrupt_before=["assistant"]
参数。
initial_input = {"messages": "Multiply 2 and 3"}
thread = await client.threads.create()
# Stream the run, injecting the breakpoint at request time via
# interrupt_before — the deployed graph itself defines no breakpoint.
async for chunk in client.runs.stream(
    thread["thread_id"],
    "agent",
    input=initial_input,
    stream_mode="values",
    interrupt_before=["assistant"],
):
    print(f"Receiving new event of type: {chunk.event}...")
    messages = chunk.data.get('messages', [])
    if messages:
        print(messages[-1])
    print("-" * 50)
Receiving new event of type: metadata... -------------------------------------------------- Receiving new event of type: values... {'content': 'Multiply 2 and 3', 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'human', 'name': None, 'id': '882dabe4-b877-4d71-bd09-c34cb97c4f46', 'example': False} --------------------------------------------------
我们可以获取当前状态
# Fetch the thread's current checkpointed state from the server.
current_state = await client.threads.get_state(thread['thread_id'])
current_state
{'values': {'messages': [{'content': 'Multiply 2 and 3', 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'human', 'name': None, 'id': '882dabe4-b877-4d71-bd09-c34cb97c4f46', 'example': False}]}, 'next': ['assistant'], 'tasks': [{'id': 'a71c0b80-a679-57cb-aa59-a1655b763480', 'name': 'assistant', 'error': None, 'interrupts': [], 'state': None}], 'metadata': {'step': 0, 'run_id': '1ef6a41c-ea63-663f-b3e8-4f001bf0bf53', 'source': 'loop', 'writes': None, 'parents': {}, 'user_id': '', 'graph_id': 'agent', 'thread_id': 'a95ffa54-2435-4a47-a9da-e886369ca8ee', 'created_by': 'system', 'assistant_id': 'fe096781-5601-53d2-b2f6-0d3403f7e9ca'}, 'created_at': '2024-09-03T22:13:54.466695+00:00', 'checkpoint_id': '1ef6a41c-ead7-637b-8000-8c6a7b98379e', 'parent_checkpoint_id': '1ef6a41c-ead3-637d-bfff-397ebdb4f2ea'}
我们可以查看状态中的最后一条消息。
# The last message in state: the human request we are about to edit.
last_message = current_state['values']['messages'][-1]
last_message
{'content': 'Multiply 2 and 3', 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'human', 'name': None, 'id': '882dabe4-b877-4d71-bd09-c34cb97c4f46', 'example': False}
我们可以编辑它!
# Mutate the content in place. The message keeps its original `id`, so a
# later state update with it will overwrite rather than append.
last_message['content'] = "No, actually multiply 3 and 3!"
last_message
{'content': 'No, actually multiply 3 and 3!', 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'human', 'name': None, 'id': '882dabe4-b877-4d71-bd09-c34cb97c4f46', 'example': False}
last_message
{'content': 'No, actually multiply 3 and 3!', 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'human', 'name': None, 'id': '882dabe4-b877-4d71-bd09-c34cb97c4f46', 'example': False}
请记住,正如我们之前所说,对 messages
键的更新将使用相同的 add_messages
归约器。
若需要覆盖现有消息,我们可以提供消息的 id
参数。
如上所示,我们正是这样操作的——仅修改了消息的 content
内容。
# Because last_message carries its original `id`, the add_messages reducer
# overwrites the existing message instead of appending a new one.
await client.threads.update_state(thread['thread_id'], {"messages": last_message})
{'configurable': {'thread_id': 'a95ffa54-2435-4a47-a9da-e886369ca8ee', 'checkpoint_ns': '', 'checkpoint_id': '1ef6a41d-cc8e-6979-8001-8c7c283b636c'}, 'checkpoint_id': '1ef6a41d-cc8e-6979-8001-8c7c283b636c'}
现在,我们通过传入 None
来恢复执行。
# Resume the interrupted run: input=None continues from the checkpoint.
async for chunk in client.runs.stream(
    thread["thread_id"],
    assistant_id="agent",
    input=None,
    stream_mode="values",
    interrupt_before=["assistant"],
):
    print(f"Receiving new event of type: {chunk.event}...")
    messages = chunk.data.get('messages', [])
    if messages:
        print(messages[-1])
    print("-" * 50)
Receiving new event of type: metadata... -------------------------------------------------- Receiving new event of type: values... {'content': 'No, actually multiply 3 and 3!', 'additional_kwargs': {'additional_kwargs': {}, 'response_metadata': {}, 'example': False}, 'response_metadata': {}, 'type': 'human', 'name': None, 'id': '882dabe4-b877-4d71-bd09-c34cb97c4f46', 'example': False} -------------------------------------------------- Receiving new event of type: values... {'content': '', 'additional_kwargs': {'tool_calls': [{'index': 0, 'id': 'call_vi16je2EIikHuT7Aue2sd1qd', 'function': {'arguments': '{"a":3,"b":3}', 'name': 'multiply'}, 'type': 'function'}]}, 'response_metadata': {'finish_reason': 'tool_calls', 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_157b3831f5'}, 'type': 'ai', 'name': None, 'id': 'run-775b42f7-0590-4c54-aaeb-78599b1f12d2', 'example': False, 'tool_calls': [{'name': 'multiply', 'args': {'a': 3, 'b': 3}, 'id': 'call_vi16je2EIikHuT7Aue2sd1qd', 'type': 'tool_call'}], 'invalid_tool_calls': [], 'usage_metadata': None} -------------------------------------------------- Receiving new event of type: values... {'content': '9', 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'tool', 'name': 'multiply', 'id': '226bfbad-0cea-4900-80c5-761a62bd4bc1', 'tool_call_id': 'call_vi16je2EIikHuT7Aue2sd1qd', 'artifact': None, 'status': 'success'} --------------------------------------------------
我们如期获得了工具调用的结果 9
。
# Stopped at the injected "assistant" breakpoint again; resume once more
# to let the model produce its final answer from the tool result.
async for chunk in client.runs.stream(
    thread["thread_id"],
    assistant_id="agent",
    input=None,
    stream_mode="values",
    interrupt_before=["assistant"],
):
    print(f"Receiving new event of type: {chunk.event}...")
    messages = chunk.data.get('messages', [])
    if messages:
        print(messages[-1])
    print("-" * 50)
Receiving new event of type: metadata... -------------------------------------------------- Receiving new event of type: values... {'content': '9', 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'tool', 'name': 'multiply', 'id': '226bfbad-0cea-4900-80c5-761a62bd4bc1', 'tool_call_id': 'call_vi16je2EIikHuT7Aue2sd1qd', 'artifact': None, 'status': 'success'} -------------------------------------------------- Receiving new event of type: values... {'content': 'The result of multiplying 3 by 3 is 9.', 'additional_kwargs': {}, 'response_metadata': {'finish_reason': 'stop', 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_157b3831f5'}, 'type': 'ai', 'name': None, 'id': 'run-859bbf47-9f35-4e71-ae98-9d93ee49d16c', 'example': False, 'tool_calls': [], 'invalid_tool_calls': [], 'usage_metadata': None} --------------------------------------------------
等待用户输入¶
显然,我们可以在断点后编辑智能体的状态。
那么,如果我们希望允许通过人工反馈来执行这种状态更新呢?
我们将在智能体中添加一个作为人工反馈占位符的节点。
这个human_feedback
节点允许用户直接向状态添加反馈。
我们通过在human_feedback
节点前设置interrupt_before
来指定断点。
同时配置检查点保存器,用于保存运行至该节点前的图状态。
# System message
sys_msg = SystemMessage(content="You are a helpful assistant tasked with performing arithmetic on a set of inputs.")

# no-op node that should be interrupted on
def human_feedback(state: MessagesState):
    """Placeholder node: performs no work itself.

    The graph interrupts before this node, and a human injects feedback via
    graph.update_state(..., as_node="human_feedback").
    """
    return None
# Assistant node
def assistant(state: MessagesState):
    """Run the tool-bound LLM on the system prompt plus the conversation history."""
    history = [sys_msg] + state["messages"]
    ai_message = llm_with_tools.invoke(history)
    return {"messages": [ai_message]}
# Graph
builder = StateGraph(MessagesState)

# Define nodes: these do the work
builder.add_node("assistant", assistant)
builder.add_node("tools", ToolNode(tools))
builder.add_node("human_feedback", human_feedback)

# Define edges: these determine the control flow
builder.add_edge(START, "human_feedback")
builder.add_edge("human_feedback", "assistant")
builder.add_conditional_edges(
    "assistant",
    # If the latest message (result) from assistant is a tool call -> tools_condition routes to tools
    # If the latest message (result) from assistant is a not a tool call -> tools_condition routes to END
    tools_condition,
)
# After tools run, loop back through human_feedback so the user can weigh in
# before the assistant sees the tool result.
builder.add_edge("tools", "human_feedback")

memory = MemorySaver()
# Pause just before human_feedback and wait for an external state update.
graph = builder.compile(interrupt_before=["human_feedback"], checkpointer=memory)
display(Image(graph.get_graph().draw_mermaid_png()))
我们将从用户处获取反馈。
与之前一样,我们使用 .update_state
方法根据获得的人工反馈来更新图的状态。
通过 as_node="human_feedback"
参数,我们将此状态更新应用于指定的 human_feedback
节点。
# Input
initial_input = {"messages": "Multiply 2 and 3"}

# Thread
thread = {"configurable": {"thread_id": "5"}}

# Run the graph until the first interruption
for event in graph.stream(initial_input, thread, stream_mode="values"):
    event["messages"][-1].pretty_print()

# Get user input
user_input = input("Tell me how you want to update the state: ")

# We now update the state as if we are the human_feedback node
# (as_node attributes the write to that node, so routing resumes from it)
graph.update_state(thread, {"messages": user_input}, as_node="human_feedback")

# Continue the graph execution
for event in graph.stream(None, thread, stream_mode="values"):
    event["messages"][-1].pretty_print()
================================ Human Message ================================= Multiply 2 and 3 ================================ Human Message ================================= no, multiply 3 and 3 ================================== Ai Message ================================== Tool Calls: multiply (call_sewrDyCrAJBQQecusUoT6OJ6) Call ID: call_sewrDyCrAJBQQecusUoT6OJ6 Args: a: 3 b: 3 ================================= Tool Message ================================= Name: multiply 9
# Continue the graph execution past the next human_feedback interrupt so the
# assistant can produce its final answer from the tool result.
for event in graph.stream(None, thread, stream_mode="values"):
    event["messages"][-1].pretty_print()
================================= Tool Message ================================= Name: multiply 9 ================================== Ai Message ================================== The result of multiplying 3 and 3 is 9.