Langchain Academy translated

Open in Colab | Open in LangChain Academy

Dynamic Breakpoints

Review

We discussed the motivations for human-in-the-loop:

(1) Approval - we can interrupt our agent, surface its state to a user, and allow the user to approve an action

(2) Debugging - we can rewind the graph to reproduce or avoid issues

(3) Editing - we can modify the state

We covered breakpoints as a general way to stop the graph at specific steps, which enables use cases like approval.

We also showed how to edit graph state and introduce human feedback.

Goals

Breakpoints are set by the developer on specific nodes during graph compilation.

But sometimes it is helpful to allow the graph to interrupt itself dynamically!

This is an internal breakpoint, achieved with NodeInterrupt.

It has a couple of specific benefits:

(1) it can be triggered conditionally (from inside a node, based on developer-defined logic)

(2) it can communicate to the user why the graph was interrupted (by passing whatever custom information you want to the NodeInterrupt)

Below, we'll create a graph where a NodeInterrupt is raised based on the length of the input.

In [ ]:
%%capture --no-stderr
%pip install --quiet -U langgraph langchain_openai langgraph_sdk
In [1]:
from IPython.display import Image, display

from typing_extensions import TypedDict
from langgraph.checkpoint.memory import MemorySaver
from langgraph.errors import NodeInterrupt
from langgraph.graph import START, END, StateGraph

class State(TypedDict):
    input: str

def step_1(state: State) -> State:
    print("---Step 1---")
    return state

def step_2(state: State) -> State:
    # Let's optionally raise a NodeInterrupt if the length of the input is longer than 5 characters
    if len(state['input']) > 5:
        raise NodeInterrupt(f"Received input that is longer than 5 characters: {state['input']}")
    
    print("---Step 2---")
    return state

def step_3(state: State) -> State:
    print("---Step 3---")
    return state

builder = StateGraph(State)
builder.add_node("step_1", step_1)
builder.add_node("step_2", step_2)
builder.add_node("step_3", step_3)
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")
builder.add_edge("step_3", END)

# Set up memory
memory = MemorySaver()

# Compile the graph with memory
graph = builder.compile(checkpointer=memory)

# View
display(Image(graph.get_graph().draw_mermaid_png()))

Let's run the graph with an input that is longer than 5 characters.

In [2]:
initial_input = {"input": "hello world"}
thread_config = {"configurable": {"thread_id": "1"}}

# Run the graph until the first interruption
for event in graph.stream(initial_input, thread_config, stream_mode="values"):
    print(event)
{'input': 'hello world'}
---Step 1---
{'input': 'hello world'}

If we inspect the graph state at this point, we can see that the next node set to execute is step_2.

In [3]:
state = graph.get_state(thread_config)
print(state.next)
('step_2',)

We can see that the interrupt has been logged to state.

In [4]:
print(state.tasks)
(PregelTask(id='6eb3910d-e231-5ba2-b25e-28ad575690bd', name='step_2', error=None, interrupts=(Interrupt(value='Received input that is longer than 5 characters: hello world', when='during'),), state=None),)

We can try to resume the graph from the breakpoint.

But this will just re-run the same node!

Unless the state is changed, we will be stuck here.
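This loop can be sketched in plain Python without any LangGraph dependency. In this illustrative stand-in, a bare function plays the role of the step_2 node and RuntimeError plays the role of NodeInterrupt; the point is only the control flow, not the real API:

```python
def step_2(state: dict) -> dict:
    # Stand-in for the notebook's step_2 node; RuntimeError stands in
    # for NodeInterrupt.
    if len(state["input"]) > 5:
        raise RuntimeError(f"Received input that is longer than 5 characters: {state['input']}")
    return state

state = {"input": "hello world"}
attempts = 0
while attempts < 10:  # safety bound so the sketch always terminates
    attempts += 1
    try:
        step_2(state)
        break  # the node succeeded; execution would move on to step_3
    except RuntimeError:
        if attempts >= 2:
            # Retrying without changing anything just re-raises on the
            # same input; editing the state is what breaks the cycle,
            # mirroring graph.update_state in this notebook.
            state = {"input": "hi"}
```

The first attempt fails, the second fails and triggers the state edit, and the third succeeds, which is exactly the resume-update-resume pattern the following cells walk through with the real graph.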

In [5]:
for event in graph.stream(None, thread_config, stream_mode="values"):
    print(event)
{'input': 'hello world'}
In [6]:
state = graph.get_state(thread_config)
print(state.next)
('step_2',)

Now, we can update the state.

In [7]:
graph.update_state(
    thread_config,
    {"input": "hi"},
)
Out[7]:
{'configurable': {'thread_id': '1',
  'checkpoint_ns': '',
  'checkpoint_id': '1ef6a434-06cf-6f1e-8002-0ea6dc69e075'}}
In [8]:
for event in graph.stream(None, thread_config, stream_mode="values"):
    print(event)
{'input': 'hi'}
---Step 2---
{'input': 'hi'}
---Step 3---
{'input': 'hi'}

Usage with the LangGraph API

⚠️ Disclaimer

Since the filming of these videos, we've updated Studio so that it can be run locally and opened in your browser. This is now the preferred way to run Studio (rather than the desktop app shown in the videos). See the documentation on the local development server and the instructions for running it. To start the local development server, run the following command in your terminal from the /studio directory of this module:

langgraph dev

You should see the following output:

- 🚀 API: http://127.0.0.1:2024
- 🎨 Studio UI: https://smith.langchain.com/studio/?baseUrl=http://127.0.0.1:2024
- 📚 API Docs: http://127.0.0.1:2024/docs

Open your browser and navigate to the Studio UI: https://smith.langchain.com/studio/?baseUrl=http://127.0.0.1:2024.

In [1]:
if 'google.colab' in str(get_ipython()):
    raise Exception("Unfortunately LangGraph Studio is currently not supported on Google Colab")

We connect to it via the SDK.

In [2]:
from langgraph_sdk import get_client

# This is the URL of the local development server
URL = "http://127.0.0.1:2024"
client = get_client(url=URL)

# Search all hosted graphs
assistants = await client.assistants.search()
In [12]:
thread = await client.threads.create()
input_dict = {"input": "hello world"}

async for chunk in client.runs.stream(
    thread["thread_id"],
    assistant_id="dynamic_breakpoints",
    input=input_dict,
    stream_mode="values",):
    
    print(f"Receiving new event of type: {chunk.event}...")
    print(chunk.data)
    print("\n\n")
Receiving new event of type: metadata...
{'run_id': '1ef6a43a-1b04-64d0-9a79-1caff72c8a89'}



Receiving new event of type: values...
{'input': 'hello world'}



Receiving new event of type: values...
{'input': 'hello world'}



In [13]:
current_state = await client.threads.get_state(thread['thread_id'])
In [14]:
current_state['next']
Out[14]:
['step_2']
In [15]:
await client.threads.update_state(thread['thread_id'], {"input": "hi!"})
Out[15]:
{'configurable': {'thread_id': 'ea8c2912-987e-49d9-b890-6e81d46065f9',
  'checkpoint_ns': '',
  'checkpoint_id': '1ef6a43a-64b2-6e85-8002-3cf4f2873968'},
 'checkpoint_id': '1ef6a43a-64b2-6e85-8002-3cf4f2873968'}
In [23]:
async for chunk in client.runs.stream(
    thread["thread_id"],
    assistant_id="dynamic_breakpoints",
    input=None,
    stream_mode="values",):
    
    print(f"Receiving new event of type: {chunk.event}...")
    print(chunk.data)
    print("\n\n")
Receiving new event of type: metadata...
{'run_id': '1ef64c33-fb34-6eaf-8b59-1d85c5b8acc9'}



Receiving new event of type: values...
{'input': 'hi!'}



Receiving new event of type: values...
{'input': 'hi!'}



In [16]:
current_state = await client.threads.get_state(thread['thread_id'])
current_state
Out[16]:
{'values': {'input': 'hi!'},
 'next': ['step_2'],
 'tasks': [{'id': '858e41b2-6501-585c-9bca-55c1e729ef91',
   'name': 'step_2',
   'error': None,
   'interrupts': [],
   'state': None}],
 'metadata': {'step': 2,
  'source': 'update',
  'writes': {'step_1': {'input': 'hi!'}},
  'parents': {},
  'graph_id': 'dynamic_breakpoints'},
 'created_at': '2024-09-03T22:27:05.707260+00:00',
 'checkpoint_id': '1ef6a43a-64b2-6e85-8002-3cf4f2873968',
 'parent_checkpoint_id': '1ef6a43a-1cb8-6c3d-8001-7b11d0d34f00'}
