子图¶
回顾¶
我们正在构建一个多智能体研究助手,它将整合本课程所有模块的功能。
刚刚我们讨论了并行化处理,这是LangGraph可控性的一个重要主题。
目标¶
现在我们将学习子图的使用。
状态管理¶
子图允许你在图的不同部分创建和管理不同的状态。
这对于多智能体系统特别有用,可以让不同团队的智能体各自维护自己的状态。
让我们看一个简单示例:
- 系统接收日志输入
- 由不同智能体执行两个独立子任务(日志摘要、故障模式分析)
- 需要在两个不同的子图中执行这些操作
最关键的是要理解子图之间如何通信!
简而言之,通信是通过共享键值实现的:
- 子图可以访问父图的
docs
字段 - 父图可以访问子图的
summary/failure_report
字段
输入定义¶
现在让我们为将要输入到图中的日志数据定义模式结构。
%%capture --no-stderr
%pip install -U langgraph
import os, getpass
def _set_env(var: str):
if not os.environ.get(var):
os.environ[var] = getpass.getpass(f"{var}: ")
_set_env("LANGSMITH_API_KEY")
os.environ["LANGSMITH_TRACING"] = "true"
os.environ["LANGSMITH_PROJECT"] = "langchain-academy"
from operator import add
from typing_extensions import TypedDict
from typing import List, Optional, Annotated
# The structure of the logs
class Log(TypedDict):
id: str
question: str
docs: Optional[List]
answer: str
grade: Optional[int]
grader: Optional[str]
feedback: Optional[str]
子图¶
以下是故障分析的子图,该图使用了 FailureAnalysisState
。
from IPython.display import Image, display
from langgraph.graph import StateGraph, START, END
# Failure Analysis Sub-graph
class FailureAnalysisState(TypedDict):
cleaned_logs: List[Log]
failures: List[Log]
fa_summary: str
processed_logs: List[str]
class FailureAnalysisOutputState(TypedDict):
fa_summary: str
processed_logs: List[str]
def get_failures(state):
""" Get logs that contain a failure """
cleaned_logs = state["cleaned_logs"]
failures = [log for log in cleaned_logs if "grade" in log]
return {"failures": failures}
def generate_summary(state):
""" Generate summary of failures """
failures = state["failures"]
# Add fxn: fa_summary = summarize(failures)
fa_summary = "Poor quality retrieval of Chroma documentation."
return {"fa_summary": fa_summary, "processed_logs": [f"failure-analysis-on-log-{failure['id']}" for failure in failures]}
fa_builder = StateGraph(state_schema=FailureAnalysisState,output_schema=FailureAnalysisOutputState)
fa_builder.add_node("get_failures", get_failures)
fa_builder.add_node("generate_summary", generate_summary)
fa_builder.add_edge(START, "get_failures")
fa_builder.add_edge("get_failures", "generate_summary")
fa_builder.add_edge("generate_summary", END)
graph = fa_builder.compile()
display(Image(graph.get_graph().draw_mermaid_png()))
以下是问题摘要子图,它使用 QuestionSummarizationState
。
# Summarization subgraph
class QuestionSummarizationState(TypedDict):
cleaned_logs: List[Log]
qs_summary: str
report: str
processed_logs: List[str]
class QuestionSummarizationOutputState(TypedDict):
report: str
processed_logs: List[str]
def generate_summary(state):
cleaned_logs = state["cleaned_logs"]
# Add fxn: summary = summarize(generate_summary)
summary = "Questions focused on usage of ChatOllama and Chroma vector store."
return {"qs_summary": summary, "processed_logs": [f"summary-on-log-{log['id']}" for log in cleaned_logs]}
def send_to_slack(state):
qs_summary = state["qs_summary"]
# Add fxn: report = report_generation(qs_summary)
report = "foo bar baz"
return {"report": report}
qs_builder = StateGraph(QuestionSummarizationState,output_schema=QuestionSummarizationOutputState)
qs_builder.add_node("generate_summary", generate_summary)
qs_builder.add_node("send_to_slack", send_to_slack)
qs_builder.add_edge(START, "generate_summary")
qs_builder.add_edge("generate_summary", "send_to_slack")
qs_builder.add_edge("send_to_slack", END)
graph = qs_builder.compile()
display(Image(graph.get_graph().draw_mermaid_png()))
将子图添加至父图¶
现在,我们可以将所有部分整合起来了。
我们使用 EntryGraphState
创建父图。
然后将各个子图作为节点添加进去!
entry_builder.add_node("question_summarization", qs_builder.compile())
entry_builder.add_node("failure_analysis", fa_builder.compile())
# Entry Graph
class EntryGraphState(TypedDict):
raw_logs: List[Log]
cleaned_logs: Annotated[List[Log], add] # This will be USED BY in BOTH sub-graphs
fa_summary: str # This will only be generated in the FA sub-graph
report: str # This will only be generated in the QS sub-graph
processed_logs: Annotated[List[int], add] # This will be generated in BOTH sub-graphs
但是,为什么cleaned_logs
会有一个reducer(归约器)呢?它明明只是作为输入进入每个子图,并没有被修改。
cleaned_logs: Annotated[List[Log], add] # 这个字段会被两个子图共同使用
这是因为子图的输出状态会包含所有键,即使它们未被修改。
这些子图是并行运行的。
由于并行的子图返回相同的键,就需要像operator.add
这样的reducer来合并来自各个子图的输入值。
不过,我们可以用之前讨论过的另一个概念来解决这个问题。
我们只需为每个子图创建一个输出状态模式,并确保输出状态模式包含不同的键作为输出发布。
实际上并不需要每个子图都输出cleaned_logs
。
# Entry Graph
class EntryGraphState(TypedDict):
raw_logs: List[Log]
cleaned_logs: List[Log]
fa_summary: str # This will only be generated in the FA sub-graph
report: str # This will only be generated in the QS sub-graph
processed_logs: Annotated[List[int], add] # This will be generated in BOTH sub-graphs
def clean_logs(state):
# Get logs
raw_logs = state["raw_logs"]
# Data cleaning raw_logs -> docs
cleaned_logs = raw_logs
return {"cleaned_logs": cleaned_logs}
entry_builder = StateGraph(EntryGraphState)
entry_builder.add_node("clean_logs", clean_logs)
entry_builder.add_node("question_summarization", qs_builder.compile())
entry_builder.add_node("failure_analysis", fa_builder.compile())
entry_builder.add_edge(START, "clean_logs")
entry_builder.add_edge("clean_logs", "failure_analysis")
entry_builder.add_edge("clean_logs", "question_summarization")
entry_builder.add_edge("failure_analysis", END)
entry_builder.add_edge("question_summarization", END)
graph = entry_builder.compile()
from IPython.display import Image, display
# Setting xray to 1 will show the internal structure of the nested graph
display(Image(graph.get_graph(xray=1).draw_mermaid_png()))
# Dummy logs
question_answer = Log(
id="1",
question="How can I import ChatOllama?",
answer="To import ChatOllama, use: 'from langchain_community.chat_models import ChatOllama.'",
)
question_answer_feedback = Log(
id="2",
question="How can I use Chroma vector store?",
answer="To use Chroma, define: rag_chain = create_retrieval_chain(retriever, question_answer_chain).",
grade=0,
grader="Document Relevance Recall",
feedback="The retrieved documents discuss vector stores in general, but not Chroma specifically",
)
raw_logs = [question_answer,question_answer_feedback]
graph.invoke({"raw_logs": raw_logs})
{'raw_logs': [{'id': '1', 'question': 'How can I import ChatOllama?', 'answer': "To import ChatOllama, use: 'from langchain_community.chat_models import ChatOllama.'"}, {'id': '2', 'question': 'How can I use Chroma vector store?', 'answer': 'To use Chroma, define: rag_chain = create_retrieval_chain(retriever, question_answer_chain).', 'grade': 0, 'grader': 'Document Relevance Recall', 'feedback': 'The retrieved documents discuss vector stores in general, but not Chroma specifically'}], 'cleaned_logs': [{'id': '1', 'question': 'How can I import ChatOllama?', 'answer': "To import ChatOllama, use: 'from langchain_community.chat_models import ChatOllama.'"}, {'id': '2', 'question': 'How can I use Chroma vector store?', 'answer': 'To use Chroma, define: rag_chain = create_retrieval_chain(retriever, question_answer_chain).', 'grade': 0, 'grader': 'Document Relevance Recall', 'feedback': 'The retrieved documents discuss vector stores in general, but not Chroma specifically'}], 'fa_summary': 'Poor quality retrieval of Chroma documentation.', 'report': 'foo bar baz', 'processed_logs': ['failure-analysis-on-log-2', 'summary-on-log-1', 'summary-on-log-2']}
LangSmith¶
让我们查看 LangSmith 的追踪记录:
https://smith.langchain.com/public/f8f86f61-1b30-48cf-b055-3734dfceadf2/r
# Getting Started with the API
## Authentication
To use the API, you must first authenticate your requests. All API endpoints require an API key, which you can obtain from your account dashboard.
### Obtaining an API Key
1. Log in to your account at [https://example.com/login](https://example.com/login)
2. Navigate to **Settings** > **API Keys**
3. Click **Generate New Key**
4. Copy the generated key and store it securely
### Using the API Key
Include your API key in the `Authorization` header of all requests:
```http
GET /api/v1/users HTTP/1.1
Authorization: Bearer your_api_key_here
Warning: Never share your API key in client-side code or public repositories.
Making Your First Request¶
Now that you have an API key, you can make your first request to the API. Here's an example using cURL:
curl -X GET https://api.example.com/v1/users \
-H "Authorization: Bearer your_api_key_here"
The API will return a JSON response:
{
"users": [
{
"id": 1,
"name": "John Doe",
"email": "john@example.com"
}
]
}
Rate Limits¶
The API has the following rate limits:
Plan | Requests per minute |
---|---|
Free | 60 |
Professional | 1,000 |
Enterprise | 10,000 |
```markdown
# API 入门指南
## 认证授权
使用 API 前需先完成请求认证。所有 API 端点均需提供 API 密钥,该密钥可在您的账户面板中获取。
### 获取 API 密钥
1. 登录您的账户 [https://example.com/login](https://example.com/login)
2. 进入 **设置** > **API 密钥**
3. 点击 **生成新密钥**
4. 复制生成的密钥并妥善保存
### 使用 API 密钥
在所有请求的 `Authorization` 头部中包含您的 API 密钥:
```http
GET /api/v1/users HTTP/1.1
Authorization: Bearer your_api_key_here
警告:切勿在客户端代码或公开仓库中共享您的 API 密钥。
发起首次请求¶
获取 API 密钥后,即可向 API 发起首次请求。以下是通过 cURL 的示例:
curl -X GET https://api.example.com/v1/users \
-H "Authorization: Bearer your_api_key_here"
API 将返回 JSON 格式响应:
{
"users": [
{
"id": 1,
"name": "John Doe",
"email": "john@example.com"
}
]
}
速率限制¶
本 API 设有以下速率限制:
套餐类型 | 每分钟请求数 |
---|---|
免费版 | 60 |
专业版 | 1,000 |
企业版 | 10,000 |