%%capture --no-stderr
%pip install -U langgraph tavily-python wikipedia langchain_openai langchain_community langgraph_sdk
import os, getpass
def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")
_set_env("OPENAI_API_KEY")
from IPython.display import Image, display
from typing import Any
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
    # No reducer here, so each write to `state` overwrites the previous value
    state: list
class ReturnNodeValue:
    def __init__(self, node_secret: str):
        self._value = node_secret

    def __call__(self, state: State) -> Any:
        print(f"Adding {self._value} to {state['state']}")
        return {"state": [self._value]}
# Add nodes
builder = StateGraph(State)
# Initialize each node with node_secret
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))
# Flow
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("b", "c")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()
display(Image(graph.get_graph().draw_mermaid_png()))
We overwrite state, as expected.
graph.invoke({"state": []})
Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm B"]
Adding I'm D to ["I'm C"]
{'state': ["I'm D"]}
Now, let's run b and c in parallel, and then run d.

We can easily do a fan-out from a to b and c, and then a fan-in to d.

The state updates are applied at the end of each step.

Let's run it.
builder = StateGraph(State)
# Initialize each node with node_secret
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))
# Flow
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()
display(Image(graph.get_graph().draw_mermaid_png()))
We see an error! This is because both b and c are attempting to write to the same state key / channel in the same step.
from langgraph.errors import InvalidUpdateError
try:
    graph.invoke({"state": []})
except InvalidUpdateError as e:
    print(f"An error occurred: {e}")
Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm A"]
An error occurred: At key 'state': Can receive only one value per step. Use an Annotated key to handle multiple values.
When using fan out, we need to be sure that we are using a reducer if steps are writing to the same channel / key.

As we touched on in Module 2, operator.add is a function from Python's built-in operator module.

When operator.add is applied to lists, it performs list concatenation.
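For instance (a quick illustration, not a cell from the original notebook):

import operator

# For lists, operator.add(a, b) is a + b, i.e. concatenation
operator.add(["I'm A"], ["I'm B"])  # ["I'm A", "I'm B"]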
import operator
from typing import Annotated
class State(TypedDict):
    # The operator.add reducer fn makes this append-only
    state: Annotated[list, operator.add]
# Add nodes
builder = StateGraph(State)
# Initialize each node with node_secret
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))
# Flow
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()
display(Image(graph.get_graph().draw_mermaid_png()))
graph.invoke({"state": []})
Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm A"]
Adding I'm D to ["I'm A", "I'm B", "I'm C"]
{'state': ["I'm A", "I'm B", "I'm C", "I'm D"]}
Now we see that the updates performed in parallel by b and c are appended to the state.
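Conceptually, at the end of the step each parallel write is folded into the channel with the reducer, roughly like this (an illustrative sketch of the reducer semantics, not LangGraph's actual internals):

import operator

state = ["I'm A"]
# Both writes from the same step are combined via the reducer
for update in (["I'm B"], ["I'm C"]):
    state = operator.add(state, update)
print(state)  # ["I'm A", "I'm B", "I'm C"]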
Waiting for nodes to finish
Now, let's consider a case where one parallel path has more steps than the other.
builder = StateGraph(State)
# Initialize each node with node_secret
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("b2", ReturnNodeValue("I'm B2"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))
# Flow
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "b2")
builder.add_edge(["b2", "c"], "d")
builder.add_edge("d", END)
graph = builder.compile()
display(Image(graph.get_graph().draw_mermaid_png()))
In this case, b, b2, and c are all part of the same step.

The graph will wait for all of these to complete before proceeding to step d.
graph.invoke({"state": []})
Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm A"]
Adding I'm B2 to ["I'm A", "I'm B", "I'm C"]
Adding I'm D to ["I'm A", "I'm B", "I'm C", "I'm B2"]
{'state': ["I'm A", "I'm B", "I'm C", "I'm B2", "I'm D"]}
Setting the order of state updates
However, within each step we don't have specific control over the order of the state updates!

In short, it is a deterministic order determined by LangGraph based on graph topology that we do not control.

Above, we see that c is added before b2.

However, we could use a custom reducer to customize this, e.g., to sort the state updates.
def sorting_reducer(left, right):
    """ Combines and sorts the values in a list """
    if not isinstance(left, list):
        left = [left]
    if not isinstance(right, list):
        right = [right]
    return sorted(left + right, reverse=False)
class State(TypedDict):
    # sorting_reducer will sort the values in state
    state: Annotated[list, sorting_reducer]
# Add nodes
builder = StateGraph(State)
# Initialize each node with node_secret
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("b2", ReturnNodeValue("I'm B2"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))
# Flow
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "b2")
builder.add_edge(["b2", "c"], "d")
builder.add_edge("d", END)
graph = builder.compile()
display(Image(graph.get_graph().draw_mermaid_png()))
graph.invoke({"state": []})
Adding I'm A to []
Adding I'm C to ["I'm A"]
Adding I'm B to ["I'm A"]
Adding I'm B2 to ["I'm A", "I'm B", "I'm C"]
Adding I'm D to ["I'm A", "I'm B", "I'm B2", "I'm C"]
{'state': ["I'm A", "I'm B", "I'm B2", "I'm C", "I'm D"]}
Now, the reducer sorts the updated state values!

The sorting_reducer example sorts all values globally. We can also:

- Write outputs to a separate field in the state within the parallel steps
- Use a "sink" node after the parallel steps to combine and order those outputs (see the sketch below)
- Clear the temporary field after combining

See the documentation for more details.
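As a minimal sketch of that "sink" pattern (the field and node names here are hypothetical; note that clearing the temporary field would additionally require a reducer that recognizes a reset sentinel, so this sketch leaves it in place):

import operator
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

class SinkState(TypedDict):
    # Temporary field: parallel writes land here via the reducer
    results: Annotated[list, operator.add]
    # Final field: the sink node writes the ordered output here
    ordered: list

def node_b(state):
    return {"results": ["I'm B"]}

def node_c(state):
    return {"results": ["I'm C"]}

def sink(state):
    # Combine and order the parallel outputs in one place
    return {"ordered": sorted(state["results"])}

builder = StateGraph(SinkState)
builder.add_node("b", node_b)
builder.add_node("c", node_c)
builder.add_node("sink", sink)
builder.add_edge(START, "b")
builder.add_edge(START, "c")
builder.add_edge(["b", "c"], "sink")
builder.add_edge("sink", END)
graph = builder.compile()
graph.invoke({"results": []})
# e.g. {'results': ["I'm B", "I'm C"], 'ordered': ["I'm B", "I'm C"]}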
Now, let's add a realistic example: we'll fan out to two external sources in parallel (web search and Wikipedia), gather context, and then have an LLM answer a question.

from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o", temperature=0)

class State(TypedDict):
    question: str
    answer: str
    context: Annotated[list, operator.add]
You can try different web search tools. Tavily is one nice option to consider, but make sure your TAVILY_API_KEY is set.
import os, getpass
def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")
_set_env("TAVILY_API_KEY")
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_community.document_loaders import WikipediaLoader
from langchain_community.tools import TavilySearchResults
def search_web(state):
    """ Retrieve docs from web search """

    # Search
    tavily_search = TavilySearchResults(max_results=3)
    search_docs = tavily_search.invoke(state['question'])

    # Format
    formatted_search_docs = "\n\n---\n\n".join(
        [
            f'<Document href="{doc["url"]}">\n{doc["content"]}\n</Document>'
            for doc in search_docs
        ]
    )

    return {"context": [formatted_search_docs]}
def search_wikipedia(state):
    """ Retrieve docs from wikipedia """

    # Search
    search_docs = WikipediaLoader(query=state['question'],
                                  load_max_docs=2).load()

    # Format
    formatted_search_docs = "\n\n---\n\n".join(
        [
            f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}">\n{doc.page_content}\n</Document>'
            for doc in search_docs
        ]
    )

    return {"context": [formatted_search_docs]}
def generate_answer(state):
    """ Node to answer a question """

    # Get state
    context = state["context"]
    question = state["question"]

    # Template
    answer_template = """Answer the question {question} using this context: {context}"""
    answer_instructions = answer_template.format(question=question,
                                                 context=context)

    # Answer
    answer = llm.invoke([SystemMessage(content=answer_instructions)] + [HumanMessage(content="Answer the question.")])

    # Append it to state
    return {"answer": answer}
# Add nodes
builder = StateGraph(State)
builder.add_node("search_web", search_web)
builder.add_node("search_wikipedia", search_wikipedia)
builder.add_node("generate_answer", generate_answer)
# Flow
builder.add_edge(START, "search_wikipedia")
builder.add_edge(START, "search_web")
builder.add_edge("search_wikipedia", "generate_answer")
builder.add_edge("search_web", "generate_answer")
builder.add_edge("generate_answer", END)
graph = builder.compile()
display(Image(graph.get_graph().draw_mermaid_png()))
result = graph.invoke({"question": "How were Nvidia's Q2 2024 earnings"})
result['answer'].content
"Nvidia's Q2 2024 earnings were notably strong. The company reported a GAAP net income of $6.188 billion, a significant increase from $656 million in the same quarter the previous year. Revenue for the quarter was $13.507 billion, up from $6.704 billion in Q2 2023. The GAAP gross profit was $9.462 billion with a gross margin of 70.1%. Non-GAAP gross profit was $9.614 billion with a gross margin of 71.2%. Operating income also saw a substantial rise to $6.800 billion from $499 million in the previous year. Overall, Nvidia demonstrated robust financial performance in Q2 2024."
Using with LangGraph API
⚠️ DISCLAIMER

Since the filming of these videos, we've updated Studio so that it can be run locally and opened in your browser. This is now the preferred way to run Studio (rather than the Desktop App shown in the video). See the documentation here on the local development server and this how-to guide for running it. To start the local development server, run the following command in your terminal from the /studio directory in this module:

langgraph dev

You should see the following output:
- 🚀 API: http://127.0.0.1:2024
- 🎨 Studio UI: https://smith.langchain.com/studio/?baseUrl=http://127.0.0.1:2024
- 📚 API Docs: http://127.0.0.1:2024/docs

Open your browser and navigate to the Studio UI: https://smith.langchain.com/studio/?baseUrl=http://127.0.0.1:2024.
if 'google.colab' in str(get_ipython()):
    raise Exception("Unfortunately LangGraph Studio is currently not supported on Google Colab")
from langgraph_sdk import get_client
client = get_client(url="http://127.0.0.1:2024")
thread = await client.threads.create()

input_question = {"question": "How were Nvidia Q2 2024 earnings?"}

async for event in client.runs.stream(thread["thread_id"],
                                      assistant_id="parallelization",
                                      input=input_question,
                                      stream_mode="values"):
    # Check if answer has been added to state
    answer = event.data.get('answer', None)
    if answer:
        print(answer['content'])
Nvidia's Q2 2024 earnings were exceptionally strong. The company reported $13.5 billion in revenue, significantly surpassing expectations, and made $6 billion in pure profit. The earnings per share were $2.70, adjusted, compared to the $2.09 per share expected by analysts. The gross profit margins were 75.1%, and the adjusted earnings per share were 68 cents. The strong performance was driven by unprecedented demand for its generative AI chips.