Dynamic breakpoints¶
Review¶
We discussed motivations for human-in-the-loop:
(1) Approval
- We can interrupt our agent, surface its current state to a user, and allow the user to approve an action
(2) Debugging
- We can rewind the graph to reproduce or avoid issues
(3) Editing
- You can modify the state
We covered breakpoints as a general way to stop the graph at specific steps, which enables use cases like approval.
We also showed how to edit graph state, and introduce human feedback.
Goals¶
Breakpoints are set by the developer on a specific node at graph compile time.
But sometimes it is helpful to allow the graph to interrupt itself dynamically!
This is an internal breakpoint, and it can be achieved using NodeInterrupt.
This has a few specific benefits:
(1) you can do it conditionally (from inside a node, based on developer-defined logic)
(2) you can communicate to the user why it was interrupted (by passing whatever you want to the NodeInterrupt)
Let's create a graph where a NodeInterrupt is raised based upon the length of the input.
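As a minimal sketch, the condition such a node checks can be separated out as a plain predicate (the 5-character threshold is just the example value used below; any developer-defined, node-internal logic can play this role):

```python
# A plain predicate sketching the condition step_2 will check below.
def should_interrupt(user_input: str, max_len: int = 5) -> bool:
    """True when the input exceeds the allowed length."""
    return len(user_input) > max_len

print(should_interrupt("hi"))           # False: short input runs through
print(should_interrupt("hello world"))  # True: the node would raise NodeInterrupt
```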
%%capture --no-stderr
%pip install --quiet -U langgraph langchain_openai langgraph_sdk
from IPython.display import Image, display
from typing_extensions import TypedDict
from langgraph.checkpoint.memory import MemorySaver
from langgraph.errors import NodeInterrupt
from langgraph.graph import START, END, StateGraph
class State(TypedDict):
    input: str
def step_1(state: State) -> State:
    print("---Step 1---")
    return state
def step_2(state: State) -> State:
    # Let's optionally raise a NodeInterrupt if the length of the input is longer than 5 characters
    if len(state["input"]) > 5:
        raise NodeInterrupt(f"Received input that is longer than 5 characters: {state['input']}")
    print("---Step 2---")
    return state
def step_3(state: State) -> State:
    print("---Step 3---")
    return state
builder = StateGraph(State)
builder.add_node("step_1", step_1)
builder.add_node("step_2", step_2)
builder.add_node("step_3", step_3)
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")
builder.add_edge("step_3", END)
# Set up memory
memory = MemorySaver()
# Compile the graph with memory
graph = builder.compile(checkpointer=memory)
# View
display(Image(graph.get_graph().draw_mermaid_png()))
Let's run the graph with an input that's longer than 5 characters.
initial_input = {"input": "hello world"}
thread_config = {"configurable": {"thread_id": "1"}}
# Run the graph until the first interruption
for event in graph.stream(initial_input, thread_config, stream_mode="values"):
    print(event)
{'input': 'hello world'} ---Step 1--- {'input': 'hello world'}
If we inspect the graph state at this point, we see that the next node set to execute is step_2.
state = graph.get_state(thread_config)
print(state.next)
('step_2',)
We can see that the Interrupt is logged to state.
print(state.tasks)
(PregelTask(id='6eb3910d-e231-5ba2-b25e-28ad575690bd', name='step_2', error=None, interrupts=(Interrupt(value='Received input that is longer than 5 characters: hello world', when='during'),), state=None),)
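The message passed to NodeInterrupt can be read back programmatically from state.tasks. As a sketch, using namedtuples to stand in for the PregelTask and Interrupt objects printed above (the real classes expose the same attribute names used here; no graph is run):

```python
from collections import namedtuple

# Stand-ins mirroring the attributes of langgraph's PregelTask / Interrupt
Interrupt = namedtuple("Interrupt", ["value", "when"])
PregelTask = namedtuple("PregelTask", ["name", "interrupts"])

tasks = (
    PregelTask(
        name="step_2",
        interrupts=(
            Interrupt(
                value="Received input that is longer than 5 characters: hello world",
                when="during",
            ),
        ),
    ),
)

# Collect every interrupt message, e.g. to surface it to a user
messages = [i.value for t in tasks for i in t.interrupts]
print(messages)
```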
We can try to resume the graph from the breakpoint.
But this will just re-run the same node!
Unless the state is changed, we will be stuck here.
for event in graph.stream(None, thread_config, stream_mode="values"):
    print(event)
{'input': 'hello world'}
state = graph.get_state(thread_config)
print(state.next)
('step_2',)
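A simple way to detect this stuck case in application code is to compare state.next before and after the resume attempt (a sketch over plain tuples; next_before/next_after stand in for graph.get_state(config).next):

```python
# If the pending node is unchanged after a resume, the interrupt fired
# again and the state still needs editing before another attempt.
def made_progress(next_before: tuple, next_after: tuple) -> bool:
    return next_before != next_after

print(made_progress(("step_2",), ("step_2",)))  # False: still stuck on step_2
print(made_progress(("step_2",), ()))           # True: graph ran to completion
```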
Now, we can update the state.
graph.update_state(
    thread_config,
    {"input": "hi"},
)
{'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef6a434-06cf-6f1e-8002-0ea6dc69e075'}}
for event in graph.stream(None, thread_config, stream_mode="values"):
    print(event)
{'input': 'hi'} ---Step 2--- {'input': 'hi'} ---Step 3--- {'input': 'hi'}
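The overall pattern — run until interrupted, repair the state, resume — can be condensed into a small driver. This sketch uses plain functions as hypothetical stand-ins for the graph calls (run_with_repair and check_length are not LangGraph APIs):

```python
def run_with_repair(process, repair, value, max_attempts=3):
    """Run `process`; on an interrupt message, apply `repair` and retry."""
    for _ in range(max_attempts):
        ok, message = process(value)
        if ok:
            return value
        value = repair(value, message)
    raise RuntimeError("still interrupted after repairs")

# Toy stand-in mirroring step_2's length check
def check_length(v):
    if len(v) > 5:
        return False, f"Received input that is longer than 5 characters: {v}"
    return True, None

# Truncation as the "repair", analogous to updating state to a shorter input
print(run_with_repair(check_length, lambda v, msg: v[:5], "hello world"))
```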
Usage with the LangGraph API¶
⚠️ Disclaimer
Since filming these videos, we have updated Studio so that it can be run locally and opened in your browser. This is now the preferred way to run Studio (rather than the Desktop App shown in the video). Documentation on the local development server can be found here, and instructions for running the dev server are in this documentation. To start the local development server, run the following command in your terminal from the /studio directory of this module:
langgraph dev
You should see the following output:
- 🚀 API: http://127.0.0.1:2024
- 🎨 Studio UI: https://smith.langchain.com/studio/?baseUrl=http://127.0.0.1:2024
- 📚 API Docs: http://127.0.0.1:2024/docs
Open your browser and navigate to the Studio UI: https://smith.langchain.com/studio/?baseUrl=http://127.0.0.1:2024.
if 'google.colab' in str(get_ipython()):
    raise Exception("Unfortunately LangGraph Studio is currently not supported on Google Colab")
We connect to it via the SDK.
from langgraph_sdk import get_client
# This is the URL of the local development server
URL = "http://127.0.0.1:2024"
client = get_client(url=URL)
# Search all hosted graphs
assistants = await client.assistants.search()
thread = await client.threads.create()
input_dict = {"input": "hello world"}
async for chunk in client.runs.stream(
    thread["thread_id"],
    assistant_id="dynamic_breakpoints",
    input=input_dict,
    stream_mode="values",
):
    print(f"Receiving new event of type: {chunk.event}...")
    print(chunk.data)
    print("\n\n")
Receiving new event of type: metadata... {'run_id': '1ef6a43a-1b04-64d0-9a79-1caff72c8a89'} Receiving new event of type: values... {'input': 'hello world'} Receiving new event of type: values... {'input': 'hello world'}
current_state = await client.threads.get_state(thread['thread_id'])
current_state['next']
['step_2']
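Over the SDK, the state comes back as plain dictionaries rather than PregelTask objects. A sketch of checking whether the thread is paused, which node is pending, and why, using a hard-coded dict with the same shape as the get_state result (no server is contacted here):

```python
# Shape mirrors what client.threads.get_state returns for an interrupted thread
current_state = {
    "values": {"input": "hello world"},
    "next": ["step_2"],
    "tasks": [
        {
            "name": "step_2",
            "interrupts": [
                {
                    "value": "Received input that is longer than 5 characters: hello world",
                    "when": "during",
                }
            ],
        }
    ],
}

is_paused = bool(current_state["next"])
pending = current_state["next"][0] if is_paused else None
reasons = [i["value"] for t in current_state["tasks"] for i in t["interrupts"]]
print(is_paused, pending)
print(reasons)
```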
await client.threads.update_state(thread['thread_id'], {"input": "hi!"})
{'configurable': {'thread_id': 'ea8c2912-987e-49d9-b890-6e81d46065f9', 'checkpoint_ns': '', 'checkpoint_id': '1ef6a43a-64b2-6e85-8002-3cf4f2873968'}, 'checkpoint_id': '1ef6a43a-64b2-6e85-8002-3cf4f2873968'}
async for chunk in client.runs.stream(
    thread["thread_id"],
    assistant_id="dynamic_breakpoints",
    input=None,
    stream_mode="values",
):
    print(f"Receiving new event of type: {chunk.event}...")
    print(chunk.data)
    print("\n\n")
Receiving new event of type: metadata... {'run_id': '1ef64c33-fb34-6eaf-8b59-1d85c5b8acc9'} Receiving new event of type: values... {'input': 'hi!'} Receiving new event of type: values... {'input': 'hi!'}
current_state = await client.threads.get_state(thread['thread_id'])
current_state
{'values': {'input': 'hi!'}, 'next': ['step_2'], 'tasks': [{'id': '858e41b2-6501-585c-9bca-55c1e729ef91', 'name': 'step_2', 'error': None, 'interrupts': [], 'state': None}], 'metadata': {'step': 2, 'source': 'update', 'writes': {'step_1': {'input': 'hi!'}}, 'parents': {}, 'graph_id': 'dynamic_breakpoints'}, 'created_at': '2024-09-03T22:27:05.707260+00:00', 'checkpoint_id': '1ef6a43a-64b2-6e85-8002-3cf4f2873968', 'parent_checkpoint_id': '1ef6a43a-1cb8-6c3d-8001-7b11d0d34f00'}