
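Sub-graphs

Review

We're building up to a multi-agent research assistant that ties together all of the modules from this course.

We just covered parallelization, which is one important LangGraph controllability topic.

Goals

Now, we're going to cover sub-graphs.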

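State

Sub-graphs allow you to create and manage different states in different parts of your graph.

This is particularly useful for multi-agent systems, where each team of agents maintains its own state.

Let's consider a toy example:

  • The system accepts logs as input
  • Different agents perform two separate sub-tasks (summarizing the logs, finding failure modes)
  • These two operations should run in two different sub-graphs

The most critical thing to understand is how the graphs communicate!

In short, communication is done with overlapping keys:

  • The sub-graphs can access docs from the parent
  • The parent can access summary/failure_report from the sub-graphs

In the code on this page, the shared input key is named cleaned_logs, and the keys published back to the parent are fa_summary, report and processed_logs.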
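To make the idea of overlapping keys concrete, here is a minimal plain-Python sketch. It does not call LangGraph; it only compares the key names declared on two state schemas. The schemas are simplified copies of the ones defined later on this page, and `shared_keys` / `private_keys` are hypothetical helpers introduced for illustration.

```python
from typing import List, TypedDict


# Simplified copies of the schemas defined later on this page.
class EntryGraphState(TypedDict):
    raw_logs: List[dict]
    cleaned_logs: List[dict]
    fa_summary: str
    report: str
    processed_logs: List[str]


class FailureAnalysisState(TypedDict):
    cleaned_logs: List[dict]
    failures: List[dict]
    fa_summary: str
    processed_logs: List[str]


def shared_keys(parent: type, child: type) -> set:
    """Hypothetical helper: keys declared on both schemas (the communication channel)."""
    return set(parent.__annotations__) & set(child.__annotations__)


def private_keys(parent: type, child: type) -> set:
    """Hypothetical helper: keys that exist only inside the sub-graph."""
    return set(child.__annotations__) - set(parent.__annotations__)


print(sorted(shared_keys(EntryGraphState, FailureAnalysisState)))
# ['cleaned_logs', 'fa_summary', 'processed_logs']
print(sorted(private_keys(EntryGraphState, FailureAnalysisState)))
# ['failures']
```

Keys that appear in both schemas are the ones through which parent and sub-graph can exchange values; `failures` stays internal to the sub-graph.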

subgraph.png

Input

Let's define a schema for the logs that will be input to our graph.

%%capture --no-stderr
%pip install -U  langgraph

We'll use LangSmith for tracing.

import os, getpass

def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")

_set_env("LANGSMITH_API_KEY")
os.environ["LANGSMITH_TRACING"] = "true"
os.environ["LANGSMITH_PROJECT"] = "langchain-academy"

from operator import add
from typing_extensions import TypedDict
from typing import List, Optional, Annotated

# The structure of the logs
class Log(TypedDict):
    id: str
    question: str
    docs: Optional[List]
    answer: str
    grade: Optional[int]
    grader: Optional[str]
    feedback: Optional[str]

Sub-graphs

Here is the failure analysis sub-graph, which uses FailureAnalysisState.

from IPython.display import Image, display
from langgraph.graph import StateGraph, START, END

# Failure Analysis Sub-graph
class FailureAnalysisState(TypedDict):
    cleaned_logs: List[Log]
    failures: List[Log]
    fa_summary: str
    processed_logs: List[str]

class FailureAnalysisOutputState(TypedDict):
    fa_summary: str
    processed_logs: List[str]

def get_failures(state):
    """ Get logs that contain a failure """
    cleaned_logs = state["cleaned_logs"]
    failures = [log for log in cleaned_logs if "grade" in log]
    return {"failures": failures}

def generate_summary(state):
    """ Generate summary of failures """
    failures = state["failures"]
    # Add fxn: fa_summary = summarize(failures)
    fa_summary = "Poor quality retrieval of Chroma documentation."
    return {"fa_summary": fa_summary, "processed_logs": [f"failure-analysis-on-log-{failure['id']}" for failure in failures]}

fa_builder = StateGraph(state_schema=FailureAnalysisState,output_schema=FailureAnalysisOutputState)
fa_builder.add_node("get_failures", get_failures)
fa_builder.add_node("generate_summary", generate_summary)
fa_builder.add_edge(START, "get_failures")
fa_builder.add_edge("get_failures", "generate_summary")
fa_builder.add_edge("generate_summary", END)

graph = fa_builder.compile()
display(Image(graph.get_graph().draw_mermaid_png()))
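
One detail worth noticing in `get_failures`: the filter `"grade" in log` keeps every log that carries a grade, whatever its value. The sketch below contrasts that with a stricter variant. It is plain Python on made-up logs, and it assumes that a grade of 0 denotes a failure, which the page suggests through its sample data but does not state. The names `get_graded_logs` and `get_failed_logs` are hypothetical.

```python
from typing import List


def get_graded_logs(logs: List[dict]) -> List[dict]:
    """Same filter as get_failures: keep every log that has a 'grade' key."""
    return [log for log in logs if "grade" in log]


def get_failed_logs(logs: List[dict], failing_grade: int = 0) -> List[dict]:
    """Stricter variant. Assumption: a grade equal to failing_grade means failure."""
    return [log for log in logs if log.get("grade") == failing_grade]


# Made-up logs for illustration only.
logs = [
    {"id": "1", "question": "q1", "answer": "a1"},              # ungraded
    {"id": "2", "question": "q2", "answer": "a2", "grade": 0},  # graded, failed
    {"id": "3", "question": "q3", "answer": "a3", "grade": 1},  # graded, passed
]

print([log["id"] for log in get_graded_logs(logs)])  # ['2', '3']
print([log["id"] for log in get_failed_logs(logs)])  # ['2']
```

With the dummy logs used later on this page both filters select the same log, because the only graded log has grade 0.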

Here is the question summarization sub-graph, which uses QuestionSummarizationState.

# Summarization subgraph
class QuestionSummarizationState(TypedDict):
    cleaned_logs: List[Log]
    qs_summary: str
    report: str
    processed_logs: List[str]

class QuestionSummarizationOutputState(TypedDict):
    report: str
    processed_logs: List[str]

def generate_summary(state):
    cleaned_logs = state["cleaned_logs"]
    # Add fxn: summary = summarize(cleaned_logs)
    summary = "Questions focused on usage of ChatOllama and Chroma vector store."
    return {"qs_summary": summary, "processed_logs": [f"summary-on-log-{log['id']}" for log in cleaned_logs]}

def send_to_slack(state):
    qs_summary = state["qs_summary"]
    # Add fxn: report = report_generation(qs_summary)
    report = "foo bar baz"
    return {"report": report}

qs_builder = StateGraph(QuestionSummarizationState,output_schema=QuestionSummarizationOutputState)
qs_builder.add_node("generate_summary", generate_summary)
qs_builder.add_node("send_to_slack", send_to_slack)
qs_builder.add_edge(START, "generate_summary")
qs_builder.add_edge("generate_summary", "send_to_slack")
qs_builder.add_edge("send_to_slack", END)

graph = qs_builder.compile()
display(Image(graph.get_graph().draw_mermaid_png()))

Adding sub-graphs to our parent graph

Now, we can bring it all together.

We create our parent graph with EntryGraphState.

And we add our sub-graphs as nodes!

entry_builder.add_node("question_summarization", qs_builder.compile())
entry_builder.add_node("failure_analysis", fa_builder.compile())

# Entry Graph
class EntryGraphState(TypedDict):
    raw_logs: List[Log]
    cleaned_logs: Annotated[List[Log], add] # This will be used in BOTH sub-graphs
    fa_summary: str # This will only be generated in the FA sub-graph
    report: str # This will only be generated in the QS sub-graph
    processed_logs: Annotated[List[str], add] # This will be generated in BOTH sub-graphs

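But why does cleaned_logs have a reducer if it only goes into each sub-graph as an input and is never modified?

cleaned_logs: Annotated[List[Log], add] # This will be used in BOTH sub-graphs

This is because, by default (without a dedicated output schema), the output state of a sub-graph contains all of its keys, even the ones it did not modify.

The sub-graphs run in parallel.

Because the parallel sub-graphs return the same key, that key needs a reducer such as operator.add to combine the incoming values from each sub-graph.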
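
The following toy model illustrates why a shared key needs a reducer when two branches write to it in the same step. It is a plain-Python sketch, not LangGraph's actual implementation: `merge_updates` is a hypothetical function, and the log values are placeholders.

```python
from operator import add
from typing import Callable, Dict, List


def merge_updates(state: dict, updates: List[dict], reducers: Dict[str, Callable]) -> dict:
    """Toy model of applying the updates from one parallel step to the parent state."""
    merged = dict(state)
    written = set()  # keys already written during this step
    for update in updates:
        for key, value in update.items():
            reducer = reducers.get(key)
            if reducer is not None:
                # A reducer combines the existing value with the incoming one.
                merged[key] = reducer(merged[key], value) if key in merged else value
            elif key in written:
                # Two branches wrote the same key and nothing says how to combine them.
                raise ValueError(f"multiple writes to '{key}' in one step without a reducer")
            else:
                merged[key] = value
            written.add(key)
    return merged


state = {"cleaned_logs": ["log-1", "log-2"], "processed_logs": []}
fa_update = {"cleaned_logs": ["log-1", "log-2"], "processed_logs": ["failure-analysis-on-log-2"]}
qs_update = {"cleaned_logs": ["log-1", "log-2"], "processed_logs": ["summary-on-log-1", "summary-on-log-2"]}

# Without a reducer on cleaned_logs, the second write is ambiguous.
try:
    merge_updates(state, [fa_update, qs_update], reducers={"processed_logs": add})
    conflict = None
except ValueError as err:
    conflict = str(err)
print(conflict)

# With operator.add on both keys, the values are concatenated.
merged = merge_updates(state, [fa_update, qs_update], reducers={"cleaned_logs": add, "processed_logs": add})
print(merged["processed_logs"])
# ['failure-analysis-on-log-2', 'summary-on-log-1', 'summary-on-log-2']
print(len(merged["cleaned_logs"]))  # 6
```

In this model the reducer removes the conflict, but it also concatenates the unchanged cleaned_logs returned by each branch onto the parent's copy, which is one reason to avoid returning that key at all.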

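However, we can work around this with another concept we discussed earlier.

We can simply create an output state schema for each sub-graph and make sure that each output schema publishes different keys.

We don't actually need each sub-graph to output cleaned_logs.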
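
As a rough mental model, an output schema acts like a filter on the sub-graph's final state: only the keys it declares are passed back to the parent. The sketch below is plain Python and does not reproduce LangGraph internals; `filter_to_schema` is a hypothetical helper and the state values are shortened placeholders.

```python
from typing import List, TypedDict


class FailureAnalysisOutputState(TypedDict):
    fa_summary: str
    processed_logs: List[str]


def filter_to_schema(state: dict, schema: type) -> dict:
    """Hypothetical helper: keep only the keys declared on the output schema."""
    return {key: value for key, value in state.items() if key in schema.__annotations__}


# Final internal state of the failure analysis sub-graph (placeholder values).
final_subgraph_state = {
    "cleaned_logs": [{"id": "1"}, {"id": "2", "grade": 0}],
    "failures": [{"id": "2", "grade": 0}],
    "fa_summary": "Poor quality retrieval of Chroma documentation.",
    "processed_logs": ["failure-analysis-on-log-2"],
}

published = filter_to_schema(final_subgraph_state, FailureAnalysisOutputState)
print(sorted(published))  # ['fa_summary', 'processed_logs']
```

Because cleaned_logs is no longer published by either sub-graph, the parent's cleaned_logs key receives a single write (from clean_logs) and no longer needs a reducer. processed_logs is still written by both sub-graphs, so it keeps one.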

# Entry Graph
class EntryGraphState(TypedDict):
    raw_logs: List[Log]
    cleaned_logs: List[Log]
    fa_summary: str # This will only be generated in the FA sub-graph
    report: str # This will only be generated in the QS sub-graph
    processed_logs: Annotated[List[str], add] # This will be generated in BOTH sub-graphs

def clean_logs(state):
    # Get logs
    raw_logs = state["raw_logs"]
    # Data cleaning raw_logs -> docs 
    cleaned_logs = raw_logs
    return {"cleaned_logs": cleaned_logs}

entry_builder = StateGraph(EntryGraphState)
entry_builder.add_node("clean_logs", clean_logs)
entry_builder.add_node("question_summarization", qs_builder.compile())
entry_builder.add_node("failure_analysis", fa_builder.compile())

entry_builder.add_edge(START, "clean_logs")
entry_builder.add_edge("clean_logs", "failure_analysis")
entry_builder.add_edge("clean_logs", "question_summarization")
entry_builder.add_edge("failure_analysis", END)
entry_builder.add_edge("question_summarization", END)

graph = entry_builder.compile()

from IPython.display import Image, display

# Setting xray to 1 will show the internal structure of the nested graph
display(Image(graph.get_graph(xray=1).draw_mermaid_png()))

# Dummy logs
question_answer = Log(
    id="1",
    question="How can I import ChatOllama?",
    answer="To import ChatOllama, use: 'from langchain_community.chat_models import ChatOllama.'",
)

question_answer_feedback = Log(
    id="2",
    question="How can I use Chroma vector store?",
    answer="To use Chroma, define: rag_chain = create_retrieval_chain(retriever, question_answer_chain).",
    grade=0,
    grader="Document Relevance Recall",
    feedback="The retrieved documents discuss vector stores in general, but not Chroma specifically",
)

raw_logs = [question_answer,question_answer_feedback]
graph.invoke({"raw_logs": raw_logs})
Out[6]:
{'raw_logs': [{'id': '1',
   'question': 'How can I import ChatOllama?',
   'answer': "To import ChatOllama, use: 'from langchain_community.chat_models import ChatOllama.'"},
  {'id': '2',
   'question': 'How can I use Chroma vector store?',
   'answer': 'To use Chroma, define: rag_chain = create_retrieval_chain(retriever, question_answer_chain).',
   'grade': 0,
   'grader': 'Document Relevance Recall',
   'feedback': 'The retrieved documents discuss vector stores in general, but not Chroma specifically'}],
 'cleaned_logs': [{'id': '1',
   'question': 'How can I import ChatOllama?',
   'answer': "To import ChatOllama, use: 'from langchain_community.chat_models import ChatOllama.'"},
  {'id': '2',
   'question': 'How can I use Chroma vector store?',
   'answer': 'To use Chroma, define: rag_chain = create_retrieval_chain(retriever, question_answer_chain).',
   'grade': 0,
   'grader': 'Document Relevance Recall',
   'feedback': 'The retrieved documents discuss vector stores in general, but not Chroma specifically'}],
 'fa_summary': 'Poor quality retrieval of Chroma documentation.',
 'report': 'foo bar baz',
 'processed_logs': ['failure-analysis-on-log-2',
  'summary-on-log-1',
  'summary-on-log-2']}

LangSmith

Let's look at the LangSmith trace:

https://smith.langchain.com/public/f8f86f61-1b30-48cf-b055-3734dfceadf2/r
