%%capture --no-stderr
%pip install -U langchain_openai langgraph
import os, getpass
def _set_env(var: str):
if not os.environ.get(var):
os.environ[var] = getpass.getpass(f"{var}: ")
# Prompt for the API keys this notebook needs, if they are not already set.
_set_env("OPENAI_API_KEY")
_set_env("LANGSMITH_API_KEY")
# Enable LangSmith tracing and group all runs under one project.
os.environ["LANGSMITH_TRACING"] = "true"
os.environ["LANGSMITH_PROJECT"] = "langchain-academy"
问题描述¶
Map-reduce(映射-归约)操作对于高效的任务分解和并行处理至关重要。
该操作包含两个阶段:
(1) Map
(映射阶段) - 将任务拆分为更小的子任务,并行处理每个子任务。
(2) Reduce
(归约阶段) - 汇总所有已完成并行子任务的结果。
我们需要设计一个能实现以下功能的系统:
(1) Map
(映射阶段) - 生成与特定主题相关的一系列笑话。
(2) Reduce
(归约阶段) - 从列表中选出最佳笑话。
我们将使用大型语言模型(LLM)来完成笑话生成和选择的工作。
from langchain_openai import ChatOpenAI

# Prompts we will use
# Map step 1: expand the overall topic into sub-topics.
subjects_prompt = """Generate a list of 3 sub-topics that are all related to this overall topic: {topic}."""
# Map step 2: one joke per sub-topic (run in parallel via Send).
joke_prompt = """Generate a joke about {subject}"""
# Reduce step: pick the best joke by zero-based index.
best_joke_prompt = """Below are a bunch of jokes about {topic}. Select the best one! Return the ID of the best one, starting 0 as the ID for the first joke. Jokes: \n\n {jokes}"""

# LLM
# temperature=0 keeps the structured outputs as deterministic as possible.
model = ChatOpenAI(model="gpt-4o", temperature=0)
import operator
from typing import Annotated
from typing_extensions import TypedDict
from pydantic import BaseModel
class Subjects(BaseModel):
    """Structured output schema for the sub-topic generation step."""

    subjects: list[str]
class BestJoke(BaseModel):
    """Structured output schema for joke selection: zero-based joke index."""

    id: int
class OverallState(TypedDict):
    """Top-level graph state shared across the map-reduce pipeline."""

    # The user-supplied topic to joke about.
    topic: str
    # Sub-topics produced by generate_topics.
    subjects: list
    # operator.add is the reducer: joke lists returned by parallel
    # generate_joke branches are concatenated instead of overwritten.
    jokes: Annotated[list, operator.add]
    # Winner chosen by the best_joke reduce step.
    best_selected_joke: str
生成笑话主题。
def generate_topics(state: OverallState):
    """Map setup: ask the LLM for sub-topics related to state["topic"]."""
    structured_llm = model.with_structured_output(Subjects)
    filled_prompt = subjects_prompt.format(topic=state["topic"])
    result = structured_llm.invoke(filled_prompt)
    return {"subjects": result.subjects}
神奇之处在于:我们使用 Send 功能为每个主题生成笑话。
这非常实用!它可以自动并行地为任意数量的主题生成笑话。
generate_joke
:图中节点的名称{"subject": s
}:要发送的状态
Send
允许你将任意状态传递给 generate_joke
!这些状态不必与 OverallState
保持一致。
在本例中,generate_joke
使用其自身的内部状态,我们可以通过 Send
来填充这些状态。
from langgraph.types import Send
def continue_to_jokes(state: OverallState):
    """Fan out: emit one Send per subject so generate_joke runs in parallel.

    Each Send carries its own private state ({"subject": ...}), which does
    not have to match OverallState.
    """
    dispatches = []
    for subject in state["subjects"]:
        dispatches.append(Send("generate_joke", {"subject": subject}))
    return dispatches
class JokeState(TypedDict):
    """Private per-branch state for generate_joke, populated via Send."""

    subject: str
class Joke(BaseModel):
    """Structured output schema for a single generated joke."""

    joke: str
def generate_joke(state: JokeState):
    """Map worker: generate one joke for this branch's subject.

    Returns the joke wrapped in a single-element list so the operator.add
    reducer on OverallState["jokes"] can concatenate results from all
    parallel branches.
    """
    structured_llm = model.with_structured_output(Joke)
    result = structured_llm.invoke(joke_prompt.format(subject=state["subject"]))
    return {"jokes": [result.joke]}
最佳笑话筛选(精简版)¶
现在,我们添加逻辑来挑选最佳笑话。
def best_joke(state: OverallState):
    """Reduce step: ask the LLM to pick the best joke from the gathered list.

    Raises:
        ValueError: if the model returns an index outside the joke list.
            Structured output constrains the type to int but not the range;
            without this guard a negative id would silently pick a joke from
            the end of the list, and a too-large id would raise a bare
            IndexError.
    """
    jokes = "\n\n".join(state["jokes"])
    prompt = best_joke_prompt.format(topic=state["topic"], jokes=jokes)
    response = model.with_structured_output(BestJoke).invoke(prompt)
    if not 0 <= response.id < len(state["jokes"]):
        raise ValueError(
            f"Model returned joke ID {response.id}, "
            f"expected 0..{len(state['jokes']) - 1}"
        )
    return {"best_selected_joke": state["jokes"][response.id]}
编译¶
from IPython.display import Image
from langgraph.graph import END, StateGraph, START

# Construct the graph: here we put everything together to construct our graph
graph = StateGraph(OverallState)
graph.add_node("generate_topics", generate_topics)
graph.add_node("generate_joke", generate_joke)
graph.add_node("best_joke", best_joke)
graph.add_edge(START, "generate_topics")
# continue_to_jokes returns Send objects, so this conditional edge fans out
# to N parallel generate_joke invocations (one per subject).
graph.add_conditional_edges("generate_topics", continue_to_jokes, ["generate_joke"])
# All parallel branches join back at best_joke (the reduce step).
graph.add_edge("generate_joke", "best_joke")
graph.add_edge("best_joke", END)

# Compile the graph
app = graph.compile()
Image(app.get_graph().draw_mermaid_png())

# Call the graph: here we call it to generate a list of jokes
for s in app.stream({"topic": "animals"}):
    print(s)
{'generate_topics': {'subjects': ['mammals', 'reptiles', 'birds']}} {'generate_joke': {'jokes': ["Why don't mammals ever get lost? Because they always follow their 'instincts'!"]}} {'generate_joke': {'jokes': ["Why don't alligators like fast food? Because they can't catch it!"]}} {'generate_joke': {'jokes': ["Why do birds fly south for the winter? Because it's too far to walk!"]}} {'best_joke': {'best_selected_joke': "Why don't alligators like fast food? Because they can't catch it!"}}
工作室¶
⚠️ 免责声明
自这些视频拍摄以来,我们已对工作室进行了更新,使其可在本地运行并在浏览器中打开。现在这是运行工作室的首选方式(而非如视频所示使用桌面应用程序)。有关本地开发服务器的文档请参见此处,运行本地工作室的说明请参见此处。要启动本地开发服务器,请在本模块的/studio
目录下的终端中运行以下命令:
langgraph dev
您将看到如下输出:
- 🚀 API: http://127.0.0.1:2024
- 🎨 工作室界面: https://smith.langchain.com/studio/?baseUrl=http://127.0.0.1:2024
- 📚 API文档: http://127.0.0.1:2024/docs
打开浏览器并访问工作室界面:https://smith.langchain.com/studio/?baseUrl=http://127.0.0.1:2024
。
让我们在工作室界面中加载上述图表,该界面使用module-4/studio/map_reduce.py
中设置的内容,配置文件位于module-4/studio/langgraph.json
。