Chatbot with Profile Schema¶
Review¶
We introduced the LangGraph Memory Store as a way to save and retrieve long-term memories.
We built a simple chatbot that uses both short-term (within-thread) and long-term (across-thread) memory.
It saves long-term semantic memories (facts about the user) "in the hot path", as the user chats with the bot.
Goals¶
Our chatbot saved memories as strings. In practice, we often want memories to have structure.
For example, a memory can be a single, continuously updated schema.
In our case, we want this to be a single user profile.
We'll extend our chatbot to save semantic memories to a single user profile.
We'll also introduce the Trustcall library, which we'll use to update this schema with new information.
%%capture --no-stderr
%pip install -U langchain_openai langgraph trustcall langchain_core
import os, getpass
def _set_env(var: str):
    # Check if the variable is set in the OS environment
    env_value = os.environ.get(var)
    if not env_value:
        # If not set, prompt the user for input
        env_value = getpass.getpass(f"{var}: ")
    # Set the environment variable for the current process
    os.environ[var] = env_value
_set_env("LANGSMITH_API_KEY")
os.environ["LANGSMITH_TRACING"] = "true"
os.environ["LANGSMITH_PROJECT"] = "langchain-academy"
from typing import TypedDict, List
class UserProfile(TypedDict):
    """User profile schema with typed fields"""
    user_name: str  # The user's preferred name
    interests: List[str]  # A list of the user's interests
Save a schema to the store¶
The LangGraph store accepts any Python dictionary as the value.
# TypedDict instance
user_profile: UserProfile = {
    "user_name": "Lance",
    "interests": ["biking", "technology", "coffee"]
}
user_profile
{'user_name': 'Lance', 'interests': ['biking', 'technology', 'coffee']}
We use the put method to save the TypedDict to the store.
import uuid
from langgraph.store.memory import InMemoryStore
# Initialize the in-memory store
in_memory_store = InMemoryStore()
# Namespace for the memory to save
user_id = "1"
namespace_for_memory = (user_id, "memory")
# Save a memory to namespace as key and value
key = "user_profile"
value = user_profile
in_memory_store.put(namespace_for_memory, key, value)
We use the search method to retrieve objects from the store by namespace.
# Search
for m in in_memory_store.search(namespace_for_memory):
    print(m.dict())
{'value': {'user_name': 'Lance', 'interests': ['biking', 'technology', 'coffee']}, 'key': 'user_profile', 'namespace': ['1', 'memory'], 'created_at': '2024-11-04T23:37:34.871675+00:00', 'updated_at': '2024-11-04T23:37:34.871680+00:00'}
We can also use the get method to retrieve a specific object by namespace and key.
# Get the memory by namespace and key
profile = in_memory_store.get(namespace_for_memory, "user_profile")
profile.value
{'user_name': 'Lance', 'interests': ['biking', 'technology', 'coffee']}
Chatbot with profile schema¶
Now we know how to specify a schema for the memories and save it to the store.
How do we actually create memories that conform to this particular schema?
In our chatbot, we want to create memories from the user's chat history.
This is where the concept of structured outputs is useful.
LangChain's chat model interface has a with_structured_output method to enforce structured output.
This is useful when we want to ensure that the output conforms to a particular schema, and it parses the output for us.
_set_env("OPENAI_API_KEY")
Let's pass the UserProfile schema we created to the with_structured_output method.
We can then invoke the chat model with a list of messages and get a structured output that matches our schema.
from pydantic import BaseModel, Field
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
# Initialize the model
model = ChatOpenAI(model="gpt-4o", temperature=0)
# Bind schema to model
model_with_structure = model.with_structured_output(UserProfile)
# Invoke the model to produce structured output that matches the schema
structured_output = model_with_structure.invoke([HumanMessage("My name is Lance, I like to bike.")])
structured_output
{'user_name': 'Lance', 'interests': ['biking']}
Now, let's use this in our chatbot.
This only requires minor changes to the write_memory function.
We use model_with_structure, as defined above, to produce a profile that matches our schema.
from IPython.display import Image, display
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.store.base import BaseStore
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
from langchain_core.runnables.config import RunnableConfig
# Chatbot instruction
MODEL_SYSTEM_MESSAGE = """You are a helpful assistant with memory that provides information about the user.
If you have memory for this user, use it to personalize your responses.
Here is the memory (it may be empty): {memory}"""
# Create new memory from the chat history and any existing memory
CREATE_MEMORY_INSTRUCTION = """Create or update a user profile memory based on the user's chat history.
This will be saved for long-term memory. If there is an existing memory, simply update it.
Here is the existing memory (it may be empty): {memory}"""
def call_model(state: MessagesState, config: RunnableConfig, store: BaseStore):
    """Load memory from the store and use it to personalize the chatbot's response."""
    # Get the user ID from the config
    user_id = config["configurable"]["user_id"]
    # Retrieve memory from the store
    namespace = ("memory", user_id)
    existing_memory = store.get(namespace, "user_memory")
    # Format the memories for the system prompt
    if existing_memory and existing_memory.value:
        memory_dict = existing_memory.value
        formatted_memory = (
            f"Name: {memory_dict.get('user_name', 'Unknown')}\n"
            f"Interests: {', '.join(memory_dict.get('interests', []))}"
        )
    else:
        formatted_memory = None
    # Format the memory in the system prompt
    system_msg = MODEL_SYSTEM_MESSAGE.format(memory=formatted_memory)
    # Respond using memory as well as the chat history
    response = model.invoke([SystemMessage(content=system_msg)] + state["messages"])
    return {"messages": response}
def write_memory(state: MessagesState, config: RunnableConfig, store: BaseStore):
    """Reflect on the chat history and save a memory to the store."""
    # Get the user ID from the config
    user_id = config["configurable"]["user_id"]
    # Retrieve existing memory from the store
    namespace = ("memory", user_id)
    existing_memory = store.get(namespace, "user_memory")
    # Format the memories for the system prompt
    if existing_memory and existing_memory.value:
        memory_dict = existing_memory.value
        formatted_memory = (
            f"Name: {memory_dict.get('user_name', 'Unknown')}\n"
            f"Interests: {', '.join(memory_dict.get('interests', []))}"
        )
    else:
        formatted_memory = None
    # Format the existing memory in the instruction
    system_msg = CREATE_MEMORY_INSTRUCTION.format(memory=formatted_memory)
    # Invoke the model to produce structured output that matches the schema
    new_memory = model_with_structure.invoke([SystemMessage(content=system_msg)] + state["messages"])
    # Overwrite the existing user profile memory
    key = "user_memory"
    store.put(namespace, key, new_memory)
# Define the graph
builder = StateGraph(MessagesState)
builder.add_node("call_model", call_model)
builder.add_node("write_memory", write_memory)
builder.add_edge(START, "call_model")
builder.add_edge("call_model", "write_memory")
builder.add_edge("write_memory", END)
# Store for long-term (across-thread) memory
across_thread_memory = InMemoryStore()
# Checkpointer for short-term (within-thread) memory
within_thread_memory = MemorySaver()
# Compile the graph with the checkpointer and store
graph = builder.compile(checkpointer=within_thread_memory, store=across_thread_memory)
# View
display(Image(graph.get_graph(xray=1).draw_mermaid_png()))
# We supply a thread ID for short-term (within-thread) memory
# We supply a user ID for long-term (across-thread) memory
config = {"configurable": {"thread_id": "1", "user_id": "1"}}
# User input
input_messages = [HumanMessage(content="Hi, my name is Lance and I like to bike around San Francisco and eat at bakeries.")]
# Run the graph
for chunk in graph.stream({"messages": input_messages}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()
================================ Human Message =================================

Hi, my name is Lance and I like to bike around San Francisco and eat at bakeries.
================================== Ai Message ==================================

Hi Lance! It's great to meet you. Biking around San Francisco sounds like a fantastic way to explore the city, and there are so many amazing bakeries to try. Do you have any favorite bakeries or biking routes in the city?
Let's check the memory in the store.
We can see that the memory is a dictionary that matches our schema.
# Namespace for the memory to save
user_id = "1"
namespace = ("memory", user_id)
existing_memory = across_thread_memory.get(namespace, "user_memory")
existing_memory.value
{'user_name': 'Lance', 'interests': ['biking', 'bakeries', 'San Francisco']}
When can this fail?¶
with_structured_output is very useful, but what happens when we work with a more complex schema?
Here's an example of a more complex schema, which we'll test below.
This is a Pydantic model that describes a user's preferences for communication and trust fall.
from typing import List, Optional
class OutputFormat(BaseModel):
    preference: str
    sentence_preference_revealed: str

class TelegramPreferences(BaseModel):
    preferred_encoding: Optional[List[OutputFormat]] = None
    favorite_telegram_operators: Optional[List[OutputFormat]] = None
    preferred_telegram_paper: Optional[List[OutputFormat]] = None

class MorseCode(BaseModel):
    preferred_key_type: Optional[List[OutputFormat]] = None
    favorite_morse_abbreviations: Optional[List[OutputFormat]] = None

class Semaphore(BaseModel):
    preferred_flag_color: Optional[List[OutputFormat]] = None
    semaphore_skill_level: Optional[List[OutputFormat]] = None

class TrustFallPreferences(BaseModel):
    preferred_fall_height: Optional[List[OutputFormat]] = None
    trust_level: Optional[List[OutputFormat]] = None
    preferred_catching_technique: Optional[List[OutputFormat]] = None

class CommunicationPreferences(BaseModel):
    telegram: TelegramPreferences
    morse_code: MorseCode
    semaphore: Semaphore

class UserPreferences(BaseModel):
    communication_preferences: CommunicationPreferences
    trust_fall_preferences: TrustFallPreferences

class TelegramAndTrustFallPreferences(BaseModel):
    pertinent_user_preferences: UserPreferences
Now, let's try extraction of this schema using the with_structured_output method.
from pydantic import ValidationError
# Bind schema to model
model_with_structure = model.with_structured_output(TelegramAndTrustFallPreferences)
# Conversation
conversation = """Operator: How may I assist with your telegram, sir?
Customer: I need to send a message about our trust fall exercise.
Operator: Certainly. Morse code or standard encoding?
Customer: Morse, please. I love using a straight key.
Operator: Excellent. What's your message?
Customer: Tell him I'm ready for a higher fall, and I prefer the diamond formation for catching.
Operator: Done. Shall I use our "Daredevil" paper for this daring message?
Customer: Perfect! Send it by your fastest carrier pigeon.
Operator: It'll be there within the hour, sir."""
# Invoke the model
try:
    model_with_structure.invoke(f"""Extract the preferences from the following conversation:
<convo>
{conversation}
</convo>""")
except ValidationError as e:
    print(e)
1 validation error for TelegramAndTrustFallPreferences
pertinent_user_preferences.communication_preferences.semaphore
  Input should be a valid dictionary or instance of Semaphore [type=model_type, input_value=None, input_type=NoneType]
    For further information visit https://errors.pydantic.dev/2.9/v/model_type
If we naively extract a more complex schema, even using a high-capacity model like gpt-4o, it is prone to failure.
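One naive mitigation is to retry the extraction when validation fails. Here is a minimal sketch of that idea; the helper name extract_with_retries is hypothetical (not part of LangChain), and retrying only papers over transient failures without fixing the underlying brittleness of regenerating a large schema in one shot.
from pydantic import ValidationError

# Hypothetical helper: retry structured extraction when validation fails
def extract_with_retries(prompt: str, max_attempts: int = 3):
    for attempt in range(max_attempts):
        try:
            return model_with_structure.invoke(prompt)
        except ValidationError:
            # Re-raise after the final attempt
            if attempt == max_attempts - 1:
                raise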
Trustcall for creating and updating profile schemas¶
As we can see, working with schemas can be tricky.
Complex schemas can be difficult to extract.
In addition, updating even simple schemas can pose challenges.
Consider our chatbot above.
We regenerated the profile schema from scratch each time we chose to save a new memory.
This is inefficient, potentially wasting model tokens if the schema contains a lot of information to regenerate each time.
Worse, we may lose information when regenerating the profile from scratch.
Addressing these problems is the motivation for Trustcall!
This is an open-source library for updating JSON schemas, developed by Will Fu-Hinthorn on the LangChain team.
It's motivated by exactly these challenges encountered while working on memory.
Let's first show simple usage of extraction with Trustcall on this list of messages.
# Conversation
conversation = [HumanMessage(content="Hi, I'm Lance."),
                AIMessage(content="Nice to meet you, Lance."),
                HumanMessage(content="I really like biking around San Francisco.")]
from trustcall import create_extractor
# Schema
class UserProfile(BaseModel):
    """User profile schema with typed fields"""
    user_name: str = Field(description="The user's preferred name")
    interests: List[str] = Field(description="A list of the user's interests")
# Initialize the model
model = ChatOpenAI(model="gpt-4o", temperature=0)
# Create the extractor
trustcall_extractor = create_extractor(
    model,
    tools=[UserProfile],
    tool_choice="UserProfile"
)
# Instruction
system_msg = "Extract the user profile from the following conversation"
# Invoke the extractor
result = trustcall_extractor.invoke({"messages": [SystemMessage(content=system_msg)]+conversation})
When we invoke the extractor, we get a few things:
messages: the list of AIMessages that contain the tool calls.
responses: the results of the tool calls, parsed to match our schema.
response_metadata: applicable when updating existing tool calls; it maps each response to the existing object it corresponds to.
for m in result["messages"]:
    m.pretty_print()
================================== Ai Message ==================================
Tool Calls:
  UserProfile (call_spGGUsoaUFXU7oOrUNCASzfL)
 Call ID: call_spGGUsoaUFXU7oOrUNCASzfL
  Args:
    user_name: Lance
    interests: ['biking around San Francisco']
schema = result["responses"]
schema
[UserProfile(user_name='Lance', interests=['biking around San Francisco'])]
schema[0].model_dump()
{'user_name': 'Lance', 'interests': ['biking around San Francisco']}
result["response_metadata"]
[{'id': 'call_spGGUsoaUFXU7oOrUNCASzfL'}]
Let's see how we can use Trustcall to update the profile.
For updating, Trustcall takes a set of messages as well as the existing schema.
The central idea is that it prompts the model to produce a JSON Patch that updates only the relevant parts of the schema.
This is less error-prone than naively overwriting the entire schema.
It's also more efficient, since the model only needs to generate the parts of the schema that have changed. The sketch below illustrates the idea.
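To make that concrete, here is a minimal sketch of what applying a JSON Patch (RFC 6902) looks like, using the standalone jsonpatch package (pip install jsonpatch). This illustrates only the underlying idea, not Trustcall's internal API.
import jsonpatch

# Existing profile document
profile = {"user_name": "Lance", "interests": ["biking"]}

# A small patch: append one interest instead of rewriting the whole doc
patch = jsonpatch.JsonPatch([
    {"op": "add", "path": "/interests/-", "value": "visiting bakeries"}
])

print(patch.apply(profile))
# {'user_name': 'Lance', 'interests': ['biking', 'visiting bakeries']}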
We can save the existing schema as a dict.
We use the model_dump() method to serialize a Pydantic model instance into a dict.
We pass this to the "existing" argument along with the schema name, UserProfile.
# Update the conversation
updated_conversation = [HumanMessage(content="Hi, I'm Lance."),
                        AIMessage(content="Nice to meet you, Lance."),
                        HumanMessage(content="I really like biking around San Francisco."),
                        AIMessage(content="San Francisco is a great city! Where do you go after biking?"),
                        HumanMessage(content="I really like to go to a bakery after biking."),]
# Update the instruction
system_msg = f"""Update the memory (JSON doc) to incorporate new information from the following conversation"""
# Invoke the extractor with the updated instruction and existing profile with the corresponding tool name (UserProfile)
result = trustcall_extractor.invoke({"messages": [SystemMessage(content=system_msg)] + updated_conversation},
                                    {"existing": {"UserProfile": schema[0].model_dump()}})
for m in result["messages"]:
    m.pretty_print()
================================== Ai Message ==================================
Tool Calls:
  UserProfile (call_WeZl0ACfQStxblim0ps8LNKT)
 Call ID: call_WeZl0ACfQStxblim0ps8LNKT
  Args:
    user_name: Lance
    interests: ['biking', 'visiting bakeries']
result["response_metadata"]
[{'id': 'call_WeZl0ACfQStxblim0ps8LNKT'}]
updated_schema = result["responses"][0]
updated_schema.model_dump()
{'user_name': 'Lance', 'interests': ['biking', 'visiting bakeries']}
LangSmith trace:
https://smith.langchain.com/public/229eae22-1edb-44c6-93e6-489124a43968/r
Now, let's also test Trustcall on the more complex schema that we saw earlier.
bound = create_extractor(
    model,
    tools=[TelegramAndTrustFallPreferences],
    tool_choice="TelegramAndTrustFallPreferences",
)
# Conversation
conversation = """Operator: How may I assist with your telegram, sir?
Customer: I need to send a message about our trust fall exercise.
Operator: Certainly. Morse code or standard encoding?
Customer: Morse, please. I love using a straight key.
Operator: Excellent. What's your message?
Customer: Tell him I'm ready for a higher fall, and I prefer the diamond formation for catching.
Operator: Done. Shall I use our "Daredevil" paper for this daring message?
Customer: Perfect! Send it by your fastest carrier pigeon.
Operator: It'll be there within the hour, sir."""
result = bound.invoke(
    f"""Extract the preferences from the following conversation:
<convo>
{conversation}
</convo>"""
)
# Extract the preferences
result["responses"][0]
TelegramAndTrustFallPreferences(pertinent_user_preferences=UserPreferences(communication_preferences=CommunicationPreferences(telegram=TelegramPreferences(preferred_encoding=[OutputFormat(preference='standard encoding', sentence_preference_revealed='standard encoding')], favorite_telegram_operators=None, preferred_telegram_paper=[OutputFormat(preference='Daredevil', sentence_preference_revealed='Daredevil')]), morse_code=MorseCode(preferred_key_type=[OutputFormat(preference='straight key', sentence_preference_revealed='straight key')], favorite_morse_abbreviations=None), semaphore=Semaphore(preferred_flag_color=None, semaphore_skill_level=None)), trust_fall_preferences=TrustFallPreferences(preferred_fall_height=[OutputFormat(preference='higher', sentence_preference_revealed='higher')], trust_level=None, preferred_catching_technique=[OutputFormat(preference='diamond formation', sentence_preference_revealed='diamond formation')])))
Chatbot with profile schema updating¶
Now, let's bring Trustcall into our chatbot to create and update the profile memory.
from IPython.display import Image, display
from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.graph import StateGraph, MessagesState, START, END
from langchain_core.runnables.config import RunnableConfig
from langgraph.checkpoint.memory import MemorySaver
from langgraph.store.base import BaseStore
# Initialize the model
model = ChatOpenAI(model="gpt-4o", temperature=0)
# Schema
class UserProfile(BaseModel):
    """Profile of a user"""
    user_name: str = Field(description="The user's preferred name")
    user_location: str = Field(description="The user's location")
    interests: list = Field(description="A list of the user's interests")
# Create the extractor
trustcall_extractor = create_extractor(
    model,
    tools=[UserProfile],
    tool_choice="UserProfile",  # Enforces use of the UserProfile tool
)
# Chatbot instruction
MODEL_SYSTEM_MESSAGE = """You are a helpful assistant with memory that provides information about the user.
If you have memory for this user, use it to personalize your responses.
Here is the memory (it may be empty): {memory}"""
# Extraction instruction
TRUSTCALL_INSTRUCTION = """Create or update the memory (JSON doc) to incorporate information from the following conversation:"""
def call_model(state: MessagesState, config: RunnableConfig, store: BaseStore):
    """Load memory from the store and use it to personalize the chatbot's response."""
    # Get the user ID from the config
    user_id = config["configurable"]["user_id"]
    # Retrieve memory from the store
    namespace = ("memory", user_id)
    existing_memory = store.get(namespace, "user_memory")
    # Format the memories for the system prompt
    if existing_memory and existing_memory.value:
        memory_dict = existing_memory.value
        formatted_memory = (
            f"Name: {memory_dict.get('user_name', 'Unknown')}\n"
            f"Location: {memory_dict.get('user_location', 'Unknown')}\n"
            f"Interests: {', '.join(memory_dict.get('interests', []))}"
        )
    else:
        formatted_memory = None
    # Format the memory in the system prompt
    system_msg = MODEL_SYSTEM_MESSAGE.format(memory=formatted_memory)
    # Respond using memory as well as the chat history
    response = model.invoke([SystemMessage(content=system_msg)] + state["messages"])
    return {"messages": response}
def write_memory(state: MessagesState, config: RunnableConfig, store: BaseStore):
    """Reflect on the chat history and save a memory to the store."""
    # Get the user ID from the config
    user_id = config["configurable"]["user_id"]
    # Retrieve existing memory from the store
    namespace = ("memory", user_id)
    existing_memory = store.get(namespace, "user_memory")
    # Get the profile as the value from the list, and convert it to a JSON doc
    existing_profile = {"UserProfile": existing_memory.value} if existing_memory else None
    # Invoke the extractor
    result = trustcall_extractor.invoke({"messages": [SystemMessage(content=TRUSTCALL_INSTRUCTION)] + state["messages"], "existing": existing_profile})
    # Get the updated profile as a JSON object
    updated_profile = result["responses"][0].model_dump()
    # Save the updated profile
    key = "user_memory"
    store.put(namespace, key, updated_profile)
# Define the graph
builder = StateGraph(MessagesState)
builder.add_node("call_model", call_model)
builder.add_node("write_memory", write_memory)
builder.add_edge(START, "call_model")
builder.add_edge("call_model", "write_memory")
builder.add_edge("write_memory", END)
# Store for long-term (across-thread) memory
across_thread_memory = InMemoryStore()
# Checkpointer for short-term (within-thread) memory
within_thread_memory = MemorySaver()
# Compile the graph with the checkpointer and store
graph = builder.compile(checkpointer=within_thread_memory, store=across_thread_memory)
# View
display(Image(graph.get_graph(xray=1).draw_mermaid_png()))
# We supply a thread ID for short-term (within-thread) memory
# We supply a user ID for long-term (across-thread) memory
config = {"configurable": {"thread_id": "1", "user_id": "1"}}
# User input
input_messages = [HumanMessage(content="Hi, my name is Lance")]
# Run the graph
for chunk in graph.stream({"messages": input_messages}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()
================================ Human Message =================================

Hi, my name is Lance
================================== Ai Message ==================================

Hello, Lance! It's nice to meet you. How can I assist you today?
# User input
input_messages = [HumanMessage(content="I like to bike around San Francisco")]
# Run the graph
for chunk in graph.stream({"messages": input_messages}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()
================================ Human Message =================================

I like to bike around San Francisco
================================== Ai Message ==================================

That sounds like a great way to explore the city! San Francisco has some beautiful routes and views. Do you have any favorite trails or spots you like to visit while biking?
# Namespace for the memory to save
user_id = "1"
namespace = ("memory", user_id)
existing_memory = across_thread_memory.get(namespace, "user_memory")
existing_memory.dict()
{'value': {'user_name': 'Lance', 'user_location': 'San Francisco', 'interests': ['biking']}, 'key': 'user_memory', 'namespace': ['memory', '1'], 'created_at': '2024-11-04T23:51:17.662428+00:00', 'updated_at': '2024-11-04T23:51:41.697652+00:00'}
# The user profile saved as a JSON object
existing_memory.value
{'user_name': 'Lance', 'user_location': 'San Francisco', 'interests': ['biking']}
# User input
input_messages = [HumanMessage(content="I also enjoy going to bakeries")]
# Run the graph
for chunk in graph.stream({"messages": input_messages}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()
================================ Human Message =================================

I also enjoy going to bakeries
================================== Ai Message ==================================

Biking and visiting bakeries sounds like a delightful combination! San Francisco has some fantastic bakeries. Do you have any favorites, or are you looking for new recommendations to try out?
Continue the conversation in a new thread.
# We supply a thread ID for short-term (within-thread) memory
# We supply a user ID for long-term (across-thread) memory
config = {"configurable": {"thread_id": "2", "user_id": "1"}}
# User input
input_messages = [HumanMessage(content="What bakeries do you recommend for me?")]
# Run the graph
for chunk in graph.stream({"messages": input_messages}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()
================================ Human Message =================================

What bakeries do you recommend for me?
================================== Ai Message ==================================

Since you're in San Francisco and enjoy going to bakeries, here are a few recommendations you might like:

1. **Tartine Bakery** - Known for its delicious bread and pastries, it's a must-visit for any bakery enthusiast.
2. **B. Patisserie** - Offers a delightful selection of French pastries, including their famous kouign-amann.
3. **Arsicault Bakery** - Renowned for its croissants, which have been praised as some of the best in the country.
4. **Craftsman and Wolves** - Known for their inventive pastries and the "Rebel Within," a savory muffin with a soft-cooked egg inside.
5. **Mr. Holmes Bakehouse** - Famous for their cruffins and other creative pastries.

These spots should offer a great variety of treats for you to enjoy. Happy bakery hopping!