%%capture --no-stderr
%pip install --quiet -U langchain_core langgraph
默认覆盖状态¶
我们使用 TypedDict
作为状态模式。
from typing_extensions import TypedDict
from IPython.display import Image, display
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
foo: int
def node_1(state):
print("---Node 1---")
return {"foo": state['foo'] + 1}
# Build graph
builder = StateGraph(State)
builder.add_node("node_1", node_1)
# Logic
builder.add_edge(START, "node_1")
builder.add_edge("node_1", END)
# Add
graph = builder.compile()
# View
display(Image(graph.get_graph().draw_mermaid_png()))
graph.invoke({"foo" : 1})
---Node 1---
{'foo': 2}
让我们来看状态更新的例子:return {"foo": state['foo'] + 1}
。
如前所述,默认情况下 LangGraph 并不知道更新状态的首选方式。
因此,它会直接覆盖 node_1
中的 foo
值:
return {"foo": state['foo'] + 1}
如果我们传入 {'foo': 1}
作为输入,图返回的状态将是 {'foo': 2}
。
分支控制¶
现在让我们看看节点出现分支的情况。
class State(TypedDict):
foo: int
def node_1(state):
print("---Node 1---")
return {"foo": state['foo'] + 1}
def node_2(state):
print("---Node 2---")
return {"foo": state['foo'] + 1}
def node_3(state):
print("---Node 3---")
return {"foo": state['foo'] + 1}
# Build graph
builder = StateGraph(State)
builder.add_node("node_1", node_1)
builder.add_node("node_2", node_2)
builder.add_node("node_3", node_3)
# Logic
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
builder.add_edge("node_1", "node_3")
builder.add_edge("node_2", END)
builder.add_edge("node_3", END)
# Add
graph = builder.compile()
# View
display(Image(graph.get_graph().draw_mermaid_png()))
from langgraph.errors import InvalidUpdateError
try:
graph.invoke({"foo" : 1})
except InvalidUpdateError as e:
print(f"InvalidUpdateError occurred: {e}")
---Node 1--- ---Node 2--- ---Node 3--- InvalidUpdateError occurred: At key 'foo': Can receive only one value per step. Use an Annotated key to handle multiple values.
我们发现了一个问题!
节点1分支出节点2和节点3。
节点2和节点3并行运行,这意味着它们在图谱的同一个步骤中执行。
这两个节点都试图在同一个步骤内覆写状态。
这对图谱来说存在歧义!它应该保留哪个状态?
from operator import add
from typing import Annotated
class State(TypedDict):
foo: Annotated[list[int], add]
def node_1(state):
print("---Node 1---")
return {"foo": [state['foo'][0] + 1]}
# Build graph
builder = StateGraph(State)
builder.add_node("node_1", node_1)
# Logic
builder.add_edge(START, "node_1")
builder.add_edge("node_1", END)
# Add
graph = builder.compile()
# View
display(Image(graph.get_graph().draw_mermaid_png()))
graph.invoke({"foo" : [1]})
---Node 1---
{'foo': [1, 2]}
现在,我们的状态键 foo
是一个列表。
这个 operator.add
归约函数会将每个节点的更新内容追加到该列表中。
def node_1(state):
print("---Node 1---")
return {"foo": [state['foo'][-1] + 1]}
def node_2(state):
print("---Node 2---")
return {"foo": [state['foo'][-1] + 1]}
def node_3(state):
print("---Node 3---")
return {"foo": [state['foo'][-1] + 1]}
# Build graph
builder = StateGraph(State)
builder.add_node("node_1", node_1)
builder.add_node("node_2", node_2)
builder.add_node("node_3", node_3)
# Logic
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
builder.add_edge("node_1", "node_3")
builder.add_edge("node_2", END)
builder.add_edge("node_3", END)
# Add
graph = builder.compile()
# View
display(Image(graph.get_graph().draw_mermaid_png()))
我们可以看到,节点2和节点3的更新是并发执行的,因为它们处于同一个步骤中。
graph.invoke({"foo" : [1]})
---Node 1--- ---Node 2--- ---Node 3---
{'foo': [1, 2, 3, 3]}
现在,让我们看看如果向 foo
传递 None
会发生什么。
我们会看到一个错误,因为我们的归约函数 operator.add
试图将作为输入传递的 NoneType
与 node_1
中的列表进行拼接。
try:
graph.invoke({"foo" : None})
except TypeError as e:
print(f"TypeError occurred: {e}")
TypeError occurred: can only concatenate list (not "NoneType") to list
def reduce_list(left: list | None, right: list | None) -> list:
"""Safely combine two lists, handling cases where either or both inputs might be None.
Args:
left (list | None): The first list to combine, or None.
right (list | None): The second list to combine, or None.
Returns:
list: A new list containing all elements from both input lists.
If an input is None, it's treated as an empty list.
"""
if not left:
left = []
if not right:
right = []
return left + right
class DefaultState(TypedDict):
foo: Annotated[list[int], add]
class CustomReducerState(TypedDict):
foo: Annotated[list[int], reduce_list]
在 node_1
中,我们追加了数值 2。
def node_1(state):
print("---Node 1---")
return {"foo": [2]}
# Build graph
builder = StateGraph(DefaultState)
builder.add_node("node_1", node_1)
# Logic
builder.add_edge(START, "node_1")
builder.add_edge("node_1", END)
# Add
graph = builder.compile()
# View
display(Image(graph.get_graph().draw_mermaid_png()))
try:
print(graph.invoke({"foo" : None}))
except TypeError as e:
print(f"TypeError occurred: {e}")
TypeError occurred: can only concatenate list (not "NoneType") to list
现在,尝试使用我们的自定义 reducer。可以看到没有抛出任何错误。
# Build graph
builder = StateGraph(CustomReducerState)
builder.add_node("node_1", node_1)
# Logic
builder.add_edge(START, "node_1")
builder.add_edge("node_1", END)
# Add
graph = builder.compile()
# View
display(Image(graph.get_graph().draw_mermaid_png()))
try:
print(graph.invoke({"foo" : None}))
except TypeError as e:
print(f"TypeError occurred: {e}")
---Node 1--- {'foo': [2]}
消息处理¶
在模块1中,我们演示了如何使用内置的reducer add_messages
来管理状态中的消息。
我们还说明了如果你想处理消息,MessagesState
是一个实用的快捷方式。
MessagesState
内置了messages
键- 同时为该键内置了
add_messages
reducer
这两种方式是等效的。
为简洁起见,我们将通过 from langgraph.graph import MessagesState
来使用 MessagesState
类。
from typing import Annotated
from langgraph.graph import MessagesState
from langchain_core.messages import AnyMessage
from langgraph.graph.message import add_messages
# Define a custom TypedDict that includes a list of messages with add_messages reducer
class CustomMessagesState(TypedDict):
messages: Annotated[list[AnyMessage], add_messages]
added_key_1: str
added_key_2: str
# etc
# Use MessagesState, which includes the messages key with add_messages reducer
class ExtendedMessagesState(MessagesState):
# Add any keys needed beyond messages, which is pre-built
added_key_1: str
added_key_2: str
# etc
让我们进一步探讨 add_messages
reducer 的使用方法。
from langgraph.graph.message import add_messages
from langchain_core.messages import AIMessage, HumanMessage
# Initial state
initial_messages = [AIMessage(content="Hello! How can I assist you?", name="Model"),
HumanMessage(content="I'm looking for information on marine biology.", name="Lance")
]
# New message to add
new_message = AIMessage(content="Sure, I can help with that. What specifically are you interested in?", name="Model")
# Test
add_messages(initial_messages , new_message)
[AIMessage(content='Hello! How can I assist you?', name='Model', id='f470d868-cf1b-45b2-ae16-48154cd55c12'), HumanMessage(content="I'm looking for information on marine biology.", name='Lance', id='a07a88c5-cb2a-4cbd-9485-5edb9d658366'), AIMessage(content='Sure, I can help with that. What specifically are you interested in?', name='Model', id='7938e615-86c2-4cbb-944b-c9b2342dee68')]
因此我们可以看到,add_messages
允许我们将消息追加到状态中的 messages
键。
重写机制¶
下面展示一些使用 add_messages
缩减器时的实用技巧。
如果我们传递的消息 ID 与 messages
列表中已存在的消息相同,该消息将被覆盖!
# Initial state
initial_messages = [AIMessage(content="Hello! How can I assist you?", name="Model", id="1"),
HumanMessage(content="I'm looking for information on marine biology.", name="Lance", id="2")
]
# New message to add
new_message = HumanMessage(content="I'm looking for information on whales, specifically", name="Lance", id="2")
# Test
add_messages(initial_messages , new_message)
[AIMessage(content='Hello! How can I assist you?', name='Model', id='1'), HumanMessage(content="I'm looking for information on whales, specifically", name='Lance', id='2')]
from langchain_core.messages import RemoveMessage
# Message list
messages = [AIMessage("Hi.", name="Bot", id="1")]
messages.append(HumanMessage("Hi.", name="Lance", id="2"))
messages.append(AIMessage("So you said you were researching ocean mammals?", name="Bot", id="3"))
messages.append(HumanMessage("Yes, I know about whales. But what others should I learn about?", name="Lance", id="4"))
# Isolate messages to delete
delete_messages = [RemoveMessage(id=m.id) for m in messages[:-2]]
print(delete_messages)
[RemoveMessage(content='', id='1'), RemoveMessage(content='', id='2')]
/var/folders/l9/bpjxdmfx7lvd1fbdjn38y5dh0000gn/T/ipykernel_17703/3097054180.py:10: LangChainBetaWarning: The class `RemoveMessage` is in beta. It is actively being worked on, so the API may change. delete_messages = [RemoveMessage(id=m.id) for m in messages[:-2]]
add_messages(messages , delete_messages)
[AIMessage(content='So you said you were researching ocean mammals?', name='Bot', id='3'), HumanMessage(content='Yes, I know about whales. But what others should I learn about?', name='Lance', id='4')]
我们可以看到,delete_messages
中标注的消息 ID 1 和 2 已被 reducer 移除。
稍后我们将看到这一机制的实际应用。