Langchain Academy translated
  • Parallel node execution
    • Review
    • Goals
    • Fan-out and fan-in
    • Waiting for nodes to finish
    • Setting the order of state updates
    • Working with LLMs
    • Using with the LangGraph API

Open in Colab | Open in LangChain Academy

Parallel node execution

Review

In Module 3, we went in depth on human-in-the-loop, covering three common use cases:

(1) Approval - we can interrupt our agent, surface its state to a user, and allow the user to accept an action

(2) Debugging - we can rewind the graph to reproduce or avoid issues

(3) Editing - you can modify the state

Goals

This module will build on human-in-the-loop, as well as the memory concepts discussed in Module 2.

We will dive into multi-agent workflows, building up to a multi-agent research assistant that ties together all of the modules from this course.

To build this multi-agent research assistant, we'll first discuss a few LangGraph controllability topics.

We'll start with parallelization.

Fan-out and fan-in

Let's build a simple linear graph that overwrites the state at each step.

In [ ]:
%%capture --no-stderr
%pip install -U langgraph tavily-python wikipedia langchain_openai langchain_community langgraph_sdk
In [1]:
import os, getpass

def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")

_set_env("OPENAI_API_KEY")
In [2]:
from IPython.display import Image, display

from typing import Any
from typing_extensions import TypedDict

from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    # No reducer here, so each node's update overwrites this value
    state: str

class ReturnNodeValue:
    def __init__(self, node_secret: str):
        self._value = node_secret

    def __call__(self, state: State) -> Any:
        print(f"Adding {self._value} to {state['state']}")
        return {"state": [self._value]}

# Add nodes
builder = StateGraph(State)

# Initialize each node with node_secret 
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))

# Flow
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("b", "c")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()

display(Image(graph.get_graph().draw_mermaid_png()))

The state is overwritten, as expected.

In [2]:
graph.invoke({"state": []})
Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm B"]
Adding I'm D to ["I'm C"]
Out[2]:
{'state': ["I'm D"]}

Now, let's run b and c in parallel.

And then run d.

We can fan out from a to b and c, and then fan in back to d.

The state updates are applied at the end of each step.

Let's run it.

In [3]:
builder = StateGraph(State)

# Initialize each node with node_secret 
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))

# Flow
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()

display(Image(graph.get_graph().draw_mermaid_png()))

We see an error!

This is because both b and c are writing to the same state key / channel in the same step.

In [4]:
from langgraph.errors import InvalidUpdateError
try:
    graph.invoke({"state": []})
except InvalidUpdateError as e:
    print(f"An error occurred: {e}")
Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm A"]
An error occurred: At key 'state': Can receive only one value per step. Use an Annotated key to handle multiple values.

When using fan-out, we need to be sure that we are using a reducer if steps write to the same channel / key.

As we touched on in Module 2, operator.add is a function from Python's built-in operator module.

When operator.add is applied to lists, it performs list concatenation.
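To see this concatenation behavior directly, outside of any graph:

```python
import operator

# operator.add applied to two lists concatenates them, which is why it
# works as an append-style reducer for a list-valued state channel.
print(operator.add(["I'm A"], ["I'm B"]))  # ["I'm A", "I'm B"]
```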

In [3]:
import operator
from typing import Annotated

class State(TypedDict):
    # The operator.add reducer fn makes this append-only
    state: Annotated[list, operator.add]

# Add nodes
builder = StateGraph(State)

# Initialize each node with node_secret 
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))

# Flow
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()

display(Image(graph.get_graph().draw_mermaid_png()))
In [6]:
graph.invoke({"state": []})
Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm A"]
Adding I'm D to ["I'm A", "I'm B", "I'm C"]
Out[6]:
{'state': ["I'm A", "I'm B", "I'm C", "I'm D"]}

Now we see that the updates made in parallel by b and c are appended to the state.

Waiting for nodes to finish

Now, let's consider a case where one parallel path has more steps than the other.

In [7]:
builder = StateGraph(State)

# Initialize each node with node_secret 
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("b2", ReturnNodeValue("I'm B2"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))

# Flow
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "b2")
builder.add_edge(["b2", "c"], "d")
builder.add_edge("d", END)
graph = builder.compile()

display(Image(graph.get_graph().draw_mermaid_png()))

In this case, b, b2, and c are all part of the same step.

The graph will wait for all of them to finish before proceeding to step d.

In [8]:
graph.invoke({"state": []})
Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm A"]
Adding I'm B2 to ["I'm A", "I'm B", "I'm C"]
Adding I'm D to ["I'm A", "I'm B", "I'm C", "I'm B2"]
Out[8]:
{'state': ["I'm A", "I'm B", "I'm C", "I'm B2", "I'm D"]}

Setting the order of state updates

However, within each step we don't have specific control over the order of the state updates!

In simple terms, it is a deterministic order determined by LangGraph based on graph topology that we do not control.

Above, we see that c is added before b2.

However, we can use a custom reducer to customize this, e.g., by sorting the state updates.

In [9]:
Copied!
def sorting_reducer(left, right):
    """ Combines and sorts the values in a list"""
    if not isinstance(left, list):
        left = [left]

    if not isinstance(right, list):
        right = [right]
    
    return sorted(left + right, reverse=False)

class State(TypedDict):
    # sorting_reducer will sort the values in state
    state: Annotated[list, sorting_reducer]

# Add nodes
builder = StateGraph(State)

# Initialize each node with node_secret 
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("b2", ReturnNodeValue("I'm B2"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))

# Flow
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "b2")
builder.add_edge(["b2", "c"], "d")
builder.add_edge("d", END)
graph = builder.compile()

display(Image(graph.get_graph().draw_mermaid_png()))
In [10]:
graph.invoke({"state": []})
Adding I'm A to []
Adding I'm C to ["I'm A"]
Adding I'm B to ["I'm A"]
Adding I'm B2 to ["I'm A", "I'm B", "I'm C"]
Adding I'm D to ["I'm A", "I'm B", "I'm B2", "I'm C"]
Out[10]:
{'state': ["I'm A", "I'm B", "I'm B2", "I'm C", "I'm D"]}

Now, the reducer sorts the updated state values!

The sorting_reducer example sorts all values globally. We can also:

  1. Write outputs to a separate field in the state during the parallel step
  2. Use a "sink" node after the parallel step to combine and order those outputs
  3. Clear the temporary field after combining

See the docs for more details.
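As a rough sketch of those three options, with hypothetical field names and without any LangGraph machinery, a sink step might look like this:

```python
# Hypothetical state after two parallel branches each wrote to their own key
state = {
    "wiki_results": ["I'm B", "I'm B2"],  # written by one branch
    "web_results": ["I'm C"],             # written by the other branch
    "combined": [],
}

def sink(state):
    """Combine the per-branch fields in a chosen order, then clear them."""
    merged = state["wiki_results"] + state["web_results"]
    # Option (3): clear the temporary fields after combining
    return {"combined": merged, "wiki_results": [], "web_results": []}

state.update(sink(state))
print(state["combined"])  # ["I'm B", "I'm B2", "I'm C"]
```

Because the sink controls the concatenation order itself, the result is deterministic regardless of which branch happened to finish first.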

Working with LLMs

Now, let's add a realistic example!

We want to gather context from two external sources (Wikipedia and web search) and have an LLM answer a question.

In [4]:
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o", temperature=0)
In [5]:
class State(TypedDict):
    question: str
    answer: str
    context: Annotated[list, operator.add]

You can try different web search tools. Tavily is one nice option to consider, but ensure your TAVILY_API_KEY is set.

In [6]:
import os, getpass
def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")
_set_env("TAVILY_API_KEY")
In [9]:
from langchain_core.messages import HumanMessage, SystemMessage

from langchain_community.document_loaders import WikipediaLoader
from langchain_community.tools import TavilySearchResults

def search_web(state):
    
    """ Retrieve docs from web search """

    # Search
    tavily_search = TavilySearchResults(max_results=3)
    search_docs = tavily_search.invoke(state['question'])

    # Format
    formatted_search_docs = "\n\n---\n\n".join(
        [
            f'<Document href="{doc["url"]}">\n{doc["content"]}\n</Document>'
            for doc in search_docs
        ]
    )

    return {"context": [formatted_search_docs]} 

def search_wikipedia(state):
    
    """ Retrieve docs from wikipedia """

    # Search
    search_docs = WikipediaLoader(query=state['question'], 
                                  load_max_docs=2).load()

    # Format
    formatted_search_docs = "\n\n---\n\n".join(
        [
            f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}">\n{doc.page_content}\n</Document>'
            for doc in search_docs
        ]
    )

    return {"context": [formatted_search_docs]} 

def generate_answer(state):
    
    """ Node to answer a question """

    # Get state
    context = state["context"]
    question = state["question"]

    # Template
    answer_template = """Answer the question {question} using this context: {context}"""
    answer_instructions = answer_template.format(question=question, 
                                                       context=context)    
    
    # Answer
    answer = llm.invoke([SystemMessage(content=answer_instructions)]+[HumanMessage(content="Answer the question.")])
      
    # Append it to state
    return {"answer": answer}

# Add nodes
builder = StateGraph(State)

# Initialize each node with node_secret 
builder.add_node("search_web",search_web)
builder.add_node("search_wikipedia", search_wikipedia)
builder.add_node("generate_answer", generate_answer)

# Flow
builder.add_edge(START, "search_wikipedia")
builder.add_edge(START, "search_web")
builder.add_edge("search_wikipedia", "generate_answer")
builder.add_edge("search_web", "generate_answer")
builder.add_edge("generate_answer", END)
graph = builder.compile()

display(Image(graph.get_graph().draw_mermaid_png()))
In [10]:
result = graph.invoke({"question": "How were Nvidia's Q2 2024 earnings"})
result['answer'].content
Out[10]:
"Nvidia's Q2 2024 earnings were notably strong. The company reported a GAAP net income of $6.188 billion, a significant increase from $656 million in the same quarter the previous year. Revenue for the quarter was $13.507 billion, up from $6.704 billion in Q2 2023. The GAAP gross profit was $9.462 billion with a gross margin of 70.1%. Non-GAAP gross profit was $9.614 billion with a gross margin of 71.2%. Operating income also saw a substantial rise to $6.800 billion from $499 million in the previous year. Overall, Nvidia demonstrated robust financial performance in Q2 2024."

Using with the LangGraph API

⚠️ Disclaimer

Since the filming of these videos, we've updated Studio so that it can be run locally and opened in your browser. This is now the preferred way to run Studio (rather than the Desktop App shown in the videos). See the documentation here on the local development server and this guide on how to run it. To start the local development server, run the following command in your terminal in the /studio directory of this module:

langgraph dev

You should see the following output:

- 🚀 API: http://127.0.0.1:2024
- 🎨 Studio UI: https://smith.langchain.com/studio/?baseUrl=http://127.0.0.1:2024
- 📚 API Docs: http://127.0.0.1:2024/docs

Open your browser and navigate to the Studio UI: https://smith.langchain.com/studio/?baseUrl=http://127.0.0.1:2024.

In [ ]:
if 'google.colab' in str(get_ipython()):
    raise Exception("Unfortunately LangGraph Studio is currently not supported on Google Colab")
In [17]:
from langgraph_sdk import get_client
client = get_client(url="http://127.0.0.1:2024")
In [18]:
thread = await client.threads.create()
input_question = {"question": "How were Nvidia Q2 2024 earnings?"}
async for event in client.runs.stream(thread["thread_id"], 
                                      assistant_id="parallelization", 
                                      input=input_question, 
                                      stream_mode="values"):
    # Check if answer has been added to state  
    answer = event.data.get('answer', None)
    if answer:
        print(answer['content'])
Nvidia's Q2 2024 earnings were exceptionally strong. The company reported $13.5 billion in revenue, significantly surpassing expectations, and made $6 billion in pure profit. The earnings per share were $2.70, adjusted, compared to the $2.09 per share expected by analysts. The gross profit margins were 75.1%, and the adjusted earnings per share were 68 cents. The strong performance was driven by unprecedented demand for its generative AI chips.

Documentation built with MkDocs.
