Langchain Academy translated

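State Reducers

Review

We covered a few different ways to define a LangGraph state schema, including TypedDict, Pydantic, and dataclasses.

Goals

Now we're going to dive into reducers, which specify how state updates are performed on specific keys / channels in the state schema.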

In [ ]:
%%capture --no-stderr
%pip install --quiet -U langchain_core langgraph

Default overwriting state

Let's use a TypedDict as our state schema.

In [1]:
from typing_extensions import TypedDict
from IPython.display import Image, display
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    foo: int

def node_1(state):
    print("---Node 1---")
    return {"foo": state['foo'] + 1}

# Build graph
builder = StateGraph(State)
builder.add_node("node_1", node_1)

# Logic
builder.add_edge(START, "node_1")
builder.add_edge("node_1", END)

# Add
graph = builder.compile()

# View
display(Image(graph.get_graph().draw_mermaid_png()))
In [2]:
graph.invoke({"foo" : 1})
---Node 1---
Out[2]:
{'foo': 2}

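Let's look at the state update, return {"foo": state['foo'] + 1}.

As discussed before, by default LangGraph doesn't know the preferred way to update the state.

So it simply overwrites the value of foo in node_1:

return {"foo": state['foo'] + 1}

If we pass {'foo': 1} as input, the state returned from the graph is {'foo': 2}.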
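As a rough illustration, the default overwrite behavior can be modeled in plain Python as a dictionary merge. This is only a sketch of the semantics described above, not LangGraph's implementation; the helper name apply_update is hypothetical.

```python
def apply_update(state: dict, update: dict) -> dict:
    """Hypothetical helper: every key in `update` replaces the value in `state`."""
    return {**state, **update}


state = {"foo": 1}

# node_1 returns the old value plus one; the old value is then replaced.
update = {"foo": state["foo"] + 1}
new_state = apply_update(state, update)

print(new_state)  # {'foo': 2}
```

Keys that a node does not return are left untouched in this model, which matches the idea that a node only needs to return the keys it wants to update.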

Branching

Let's look at a case where our nodes branch.

In [3]:
class State(TypedDict):
    foo: int

def node_1(state):
    print("---Node 1---")
    return {"foo": state['foo'] + 1}

def node_2(state):
    print("---Node 2---")
    return {"foo": state['foo'] + 1}

def node_3(state):
    print("---Node 3---")
    return {"foo": state['foo'] + 1}

# Build graph
builder = StateGraph(State)
builder.add_node("node_1", node_1)
builder.add_node("node_2", node_2)
builder.add_node("node_3", node_3)

# Logic
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
builder.add_edge("node_1", "node_3")
builder.add_edge("node_2", END)
builder.add_edge("node_3", END)

# Add
graph = builder.compile()

# View
display(Image(graph.get_graph().draw_mermaid_png()))
In [4]:
from langgraph.errors import InvalidUpdateError
try:
    graph.invoke({"foo" : 1})
except InvalidUpdateError as e:
    print(f"InvalidUpdateError occurred: {e}")
---Node 1---
---Node 2---
---Node 3---
InvalidUpdateError occurred: At key 'foo': Can receive only one value per step. Use an Annotated key to handle multiple values.

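We see a problem!

Node 1 branches to nodes 2 and 3.

Nodes 2 and 3 run in parallel, which means they run in the same step of the graph.

Both attempt to overwrite the state within the same step.

This is ambiguous for the graph! Which state should it keep?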
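The ambiguity can be sketched in plain Python. The model below assumes that a key without a reducer accepts at most one value per step, which is what the error message above states. The names apply_step and ConflictingUpdateError are hypothetical stand-ins, not LangGraph APIs.

```python
class ConflictingUpdateError(Exception):
    """Hypothetical stand-in for the InvalidUpdateError shown above."""


def apply_step(state: dict, updates: list[dict]) -> dict:
    """Apply all updates produced in one step; reject two writes to one key."""
    new_state = dict(state)
    written = set()
    for update in updates:
        for key, value in update.items():
            if key in written:
                raise ConflictingUpdateError(
                    f"At key {key!r}: more than one value in a single step"
                )
            written.add(key)
            new_state[key] = value
    return new_state


state_after_node_1 = {"foo": 2}

# node_2 and node_3 run in the same step and both write to "foo".
parallel_updates = [{"foo": 3}, {"foo": 3}]

try:
    apply_step(state_after_node_1, parallel_updates)
    conflict_detected = False
except ConflictingUpdateError as e:
    conflict_detected = True
    print(f"Conflict: {e}")
```

Even though both nodes happen to return the same value here, the model rejects the step, because nothing tells it how two writes should be combined.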

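Reducers

Reducers give us a general way to address this problem.

They specify how to perform updates.

We can use the Annotated type to specify a reducer function.

For example, in this case let's append the value returned from each node rather than overwriting it.

We just need a reducer that can do this: operator.add, a function from Python's built-in operator module.

When operator.add is applied to lists, it performs list concatenation.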
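A quick standalone check of that behavior, using only the standard library (the variable names are illustrative):

```python
from operator import add

existing = [1]   # current value of the state key
update = [2]     # value returned by a node

# For lists, add(a, b) is the same as a + b: a new, concatenated list.
combined = add(existing, update)
print(combined)   # [1, 2]

# The inputs are not modified in place.
print(existing)   # [1]

# For integers, the same function performs ordinary addition.
print(add(1, 2))  # 3
```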

In [5]:
from operator import add
from typing import Annotated

class State(TypedDict):
    foo: Annotated[list[int], add]

def node_1(state):
    print("---Node 1---")
    return {"foo": [state['foo'][0] + 1]}

# Build graph
builder = StateGraph(State)
builder.add_node("node_1", node_1)

# Logic
builder.add_edge(START, "node_1")
builder.add_edge("node_1", END)

# Add
graph = builder.compile()

# View
display(Image(graph.get_graph().draw_mermaid_png()))
In [6]:
graph.invoke({"foo" : [1]})
---Node 1---
Out[6]:
{'foo': [1, 2]}

Now, our state key foo is a list.

The operator.add reducer function appends each node's update to this list.

In [7]:
def node_1(state):
    print("---Node 1---")
    return {"foo": [state['foo'][-1] + 1]}

def node_2(state):
    print("---Node 2---")
    return {"foo": [state['foo'][-1] + 1]}

def node_3(state):
    print("---Node 3---")
    return {"foo": [state['foo'][-1] + 1]}

# Build graph
builder = StateGraph(State)
builder.add_node("node_1", node_1)
builder.add_node("node_2", node_2)
builder.add_node("node_3", node_3)

# Logic
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
builder.add_edge("node_1", "node_3")
builder.add_edge("node_2", END)
builder.add_edge("node_3", END)

# Add
graph = builder.compile()

# View
display(Image(graph.get_graph().draw_mermaid_png()))

We can see that the updates from nodes 2 and 3 are performed concurrently because they are in the same step.

In [8]:
graph.invoke({"foo" : [1]})
---Node 1---
---Node 2---
---Node 3---
Out[8]:
{'foo': [1, 2, 3, 3]}
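The repeated 3 in this output can be traced by hand. The sketch below replays the run in plain Python, assuming that both parallel nodes read the same snapshot of the state and that their updates are folded in one after the other with operator.add. It models the observed result and is not LangGraph's execution engine.

```python
from functools import reduce
from operator import add

# Input to the graph.
state = [1]

# Step 1: node_1 reads the last element and returns [2].
state = add(state, [state[-1] + 1])
print(state)  # [1, 2]

# Step 2: node_2 and node_3 both see the same snapshot, [1, 2],
# so each of them returns [3].
snapshot = state
updates = [[snapshot[-1] + 1], [snapshot[-1] + 1]]

# The reducer folds both updates into the state.
state = reduce(add, updates, state)
print(state)  # [1, 2, 3, 3]
```

Neither node sees the other's update, which is why the final list ends in 3, 3 rather than 3, 4.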

Now, let's see what happens if we pass None to foo.

We see an error because our reducer, operator.add, attempts to concatenate the NoneType passed as input with the list in node_1.

In [9]:
try:
    graph.invoke({"foo" : None})
except TypeError as e:
    print(f"TypeError occurred: {e}")
TypeError occurred: can only concatenate list (not "NoneType") to list
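The same TypeError can be reproduced without a graph. The wording of the message suggests that a list is the left operand and None the right one. Which value LangGraph holds on the left at that point is an inference from the message, not something the notebook states.

```python
from operator import add

# list + None: the message matches the one printed above.
try:
    add([], None)
except TypeError as e:
    list_then_none = str(e)

# None + list fails too, but with a different message.
try:
    add(None, [])
except TypeError as e:
    none_then_list = str(e)

print(list_then_none)
print(none_then_list)
```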

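Custom Reducers

To address cases like this, we can also define custom reducers.

For example, let's define custom reducer logic that combines lists and handles cases where either or both of the inputs might be None.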

In [10]:
def reduce_list(left: list | None, right: list | None) -> list:
    """Safely combine two lists, handling cases where either or both inputs might be None.

    Args:
        left (list | None): The first list to combine, or None.
        right (list | None): The second list to combine, or None.

    Returns:
        list: A new list containing all elements from both input lists.
               If an input is None, it's treated as an empty list.
    """
    if not left:
        left = []
    if not right:
        right = []
    return left + right

class DefaultState(TypedDict):
    foo: Annotated[list[int], add]

class CustomReducerState(TypedDict):
    foo: Annotated[list[int], reduce_list]

In node_1, we append the value 2.

In [11]:
def node_1(state):
    print("---Node 1---")
    return {"foo": [2]}

# Build graph
builder = StateGraph(DefaultState)
builder.add_node("node_1", node_1)

# Logic
builder.add_edge(START, "node_1")
builder.add_edge("node_1", END)

# Add
graph = builder.compile()

# View
display(Image(graph.get_graph().draw_mermaid_png()))

try:
    print(graph.invoke({"foo" : None}))
except TypeError as e:
    print(f"TypeError occurred: {e}")
TypeError occurred: can only concatenate list (not "NoneType") to list

Now, try with our custom reducer. We can see that no error is thrown.

In [12]:
# Build graph
builder = StateGraph(CustomReducerState)
builder.add_node("node_1", node_1)

# Logic
builder.add_edge(START, "node_1")
builder.add_edge("node_1", END)

# Add
graph = builder.compile()

# View
display(Image(graph.get_graph().draw_mermaid_png()))

try:
    print(graph.invoke({"foo" : None}))
except TypeError as e:
    print(f"TypeError occurred: {e}")
---Node 1---
{'foo': [2]}
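Because a reducer is an ordinary two-argument function, it can also be exercised directly, outside any graph. The block below repeats the reduce_list logic from above so that it runs on its own. It uses Optional in place of the "list | None" syntax only so that it also runs on Python versions before 3.10.

```python
from typing import Optional


def reduce_list(left: Optional[list], right: Optional[list]) -> list:
    """Combine two lists, treating None (or an empty list) as empty."""
    if not left:
        left = []
    if not right:
        right = []
    return left + right


print(reduce_list(None, [2]))    # [2]
print(reduce_list([1], None))    # [1]
print(reduce_list(None, None))   # []
print(reduce_list([1], [2]))     # [1, 2]
```

Testing a reducer this way is a cheap check of its edge cases before wiring it into a state schema with Annotated.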

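Messages

In module 1, we showed how to use a built-in reducer, add_messages, to handle messages in state.

We also showed that MessagesState is a useful shortcut if you want to work with messages.

  • MessagesState has a built-in messages key
  • It also has a built-in add_messages reducer for this key

The two state definitions in the next cell are equivalent.

For brevity, we'll use the MessagesState class via from langgraph.graph import MessagesState.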

In [29]:
from typing import Annotated
from langgraph.graph import MessagesState
from langchain_core.messages import AnyMessage
from langgraph.graph.message import add_messages

# Define a custom TypedDict that includes a list of messages with add_messages reducer
class CustomMessagesState(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]
    added_key_1: str
    added_key_2: str
    # etc

# Use MessagesState, which includes the messages key with add_messages reducer
class ExtendedMessagesState(MessagesState):
    # Add any keys needed beyond messages, which is pre-built 
    added_key_1: str
    added_key_2: str
    # etc

Let's talk a bit more about usage of the add_messages reducer.

In [13]:
from langgraph.graph.message import add_messages
from langchain_core.messages import AIMessage, HumanMessage

# Initial state
initial_messages = [AIMessage(content="Hello! How can I assist you?", name="Model"),
                    HumanMessage(content="I'm looking for information on marine biology.", name="Lance")
                   ]

# New message to add
new_message = AIMessage(content="Sure, I can help with that. What specifically are you interested in?", name="Model")

# Test
add_messages(initial_messages , new_message)
Out[13]:
[AIMessage(content='Hello! How can I assist you?', name='Model', id='f470d868-cf1b-45b2-ae16-48154cd55c12'),
 HumanMessage(content="I'm looking for information on marine biology.", name='Lance', id='a07a88c5-cb2a-4cbd-9485-5edb9d658366'),
 AIMessage(content='Sure, I can help with that. What specifically are you interested in?', name='Model', id='7938e615-86c2-4cbb-944b-c9b2342dee68')]

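So we can see that add_messages allows us to append messages to the messages key in our state.

Re-writing

Let's show some useful tricks when working with the add_messages reducer.

If we pass a message with the same ID as an existing one in our messages list, it will get overwritten!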

In [14]:
# Initial state
initial_messages = [AIMessage(content="Hello! How can I assist you?", name="Model", id="1"),
                    HumanMessage(content="I'm looking for information on marine biology.", name="Lance", id="2")
                   ]

# New message to add
new_message = HumanMessage(content="I'm looking for information on whales, specifically", name="Lance", id="2")

# Test
add_messages(initial_messages , new_message)
Out[14]:
[AIMessage(content='Hello! How can I assist you?', name='Model', id='1'),
 HumanMessage(content="I'm looking for information on whales, specifically", name='Lance', id='2')]
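The append-or-overwrite rule can be modeled in a few lines of plain Python. This is a simplified sketch, not the add_messages implementation: messages are plain dicts, every message is assumed to already carry an id, and upsert_by_id is a hypothetical name.

```python
def upsert_by_id(existing: list[dict], new: dict) -> list[dict]:
    """Replace the message whose id matches `new`, otherwise append `new`."""
    merged = list(existing)  # copy, so the input list is left untouched
    for index, message in enumerate(merged):
        if message["id"] == new["id"]:
            merged[index] = new  # same id: overwrite in place
            return merged
    merged.append(new)  # unseen id: append at the end
    return merged


initial = [
    {"id": "1", "content": "Hello! How can I assist you?"},
    {"id": "2", "content": "I'm looking for information on marine biology."},
]

# Same id as an existing message: the content is replaced, the length is not.
edited = upsert_by_id(
    initial,
    {"id": "2", "content": "I'm looking for information on whales, specifically"},
)
print([m["content"] for m in edited])

# New id: the message is appended.
appended = upsert_by_id(initial, {"id": "3", "content": "Sure, I can help."})
print(len(appended))  # 3
```

The overwritten message keeps its position in the list, which matches the output shown above, where the edited message with id 2 is still second.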

Removal

add_messages also enables message removal.

For this, we simply use RemoveMessage from langchain_core.

In [15]:
from langchain_core.messages import RemoveMessage

# Message list
messages = [AIMessage("Hi.", name="Bot", id="1")]
messages.append(HumanMessage("Hi.", name="Lance", id="2"))
messages.append(AIMessage("So you said you were researching ocean mammals?", name="Bot", id="3"))
messages.append(HumanMessage("Yes, I know about whales. But what others should I learn about?", name="Lance", id="4"))

# Isolate messages to delete
delete_messages = [RemoveMessage(id=m.id) for m in messages[:-2]]
print(delete_messages)
[RemoveMessage(content='', id='1'), RemoveMessage(content='', id='2')]
/var/folders/l9/bpjxdmfx7lvd1fbdjn38y5dh0000gn/T/ipykernel_17703/3097054180.py:10: LangChainBetaWarning: The class `RemoveMessage` is in beta. It is actively being worked on, so the API may change.
  delete_messages = [RemoveMessage(id=m.id) for m in messages[:-2]]
In [16]:
add_messages(messages , delete_messages)
Out[16]:
[AIMessage(content='So you said you were researching ocean mammals?', name='Bot', id='3'),
 HumanMessage(content='Yes, I know about whales. But what others should I learn about?', name='Lance', id='4')]

We can see that messages with IDs 1 and 2, as flagged in delete_messages, are removed by the reducer.

We'll see this put into practice a bit later.
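The slice messages[:-2] used above selects everything except the two most recent messages. A plain-Python sketch of that "keep the last k" pattern is shown below. It uses dicts in place of message objects, and ids_to_remove and apply_removals are hypothetical helper names, not part of langchain_core or LangGraph.

```python
def ids_to_remove(messages: list[dict], keep_last: int) -> list[str]:
    """Return the ids of all messages except the `keep_last` most recent."""
    # Compute the cutoff explicitly: messages[:-0] would select nothing.
    cutoff = max(len(messages) - keep_last, 0)
    return [m["id"] for m in messages[:cutoff]]


def apply_removals(messages: list[dict], remove_ids: list[str]) -> list[dict]:
    """Drop every message whose id was flagged for removal."""
    flagged = set(remove_ids)
    return [m for m in messages if m["id"] not in flagged]


messages = [
    {"id": "1", "content": "Hi."},
    {"id": "2", "content": "Hi."},
    {"id": "3", "content": "So you said you were researching ocean mammals?"},
    {"id": "4", "content": "Yes, I know about whales. But what others should I learn about?"},
]

to_remove = ids_to_remove(messages, keep_last=2)
print(to_remove)  # ['1', '2']

remaining = apply_removals(messages, to_remove)
print([m["id"] for m in remaining])  # ['3', '4']
```

Flagging messages by id instead of rebuilding the list lets the removal be expressed as an update that a reducer can apply.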
