In [ ]:
%pip install -U llama-index-core llama-index-llms-openai
In [ ]:
import os

os.environ["OPENAI_API_KEY"] = "sk-..."
Setup¶

To make this work, we need two main components:

- a memory block that condenses all past chat messages into a single string, while staying within a token limit
- a Memory instance configured with a token limit and that memory block, so that multi-turn conversation content is constantly flushed to the memory block

First, the custom memory block:
In [ ]:
import tiktoken
from pydantic import Field
from typing import List, Optional, Any

from llama_index.core.llms import ChatMessage, TextBlock
from llama_index.core.memory import Memory, BaseMemoryBlock


class CondensedMemoryBlock(BaseMemoryBlock[str]):
    current_memory: List[str] = Field(default_factory=list)
    token_limit: int = Field(default=50000)
    tokenizer: tiktoken.Encoding = tiktoken.encoding_for_model(
        "gpt-4o"
    )  # all openai models use 4o tokenizer these days

    async def _aget(
        self, messages: Optional[List[ChatMessage]] = None, **block_kwargs: Any
    ) -> str:
        """Return the current memory block contents."""
        return "\n".join(self.current_memory)

    async def _aput(self, messages: List[ChatMessage]) -> None:
        """Push messages into the memory block. (Only handles text content)"""
        # construct a string for each message
        for message in messages:
            text_contents = "\n".join(
                block.text
                for block in message.blocks
                if isinstance(block, TextBlock)
            )

            memory_str = f"<message role={message.role}>"
            if text_contents:
                memory_str += f"\n{text_contents}"

            # include additional kwargs, like tool calls, when needed
            # filter out injected session_id
            kwargs = {
                key: val
                for key, val in message.additional_kwargs.items()
                if key != "session_id"
            }
            if kwargs:
                memory_str += f"\n({kwargs})"

            memory_str += "\n</message>"
            self.current_memory.append(memory_str)

        # ensure this memory block doesn't get too large
        message_length = sum(
            len(self.tokenizer.encode(message))
            for message in self.current_memory
        )
        while message_length > self.token_limit:
            self.current_memory = self.current_memory[1:]
            message_length = sum(
                len(self.tokenizer.encode(message))
                for message in self.current_memory
            )
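Before wiring this into a Memory object, we can sanity-check the block on its own. This is a hypothetical quick check (it calls the private _aput/_aget methods directly, and the tiny token_limit is chosen only to make the trimming behavior observable):

# hypothetical quick check: a small token_limit makes trimming visible
test_block = CondensedMemoryBlock(name="test_block", token_limit=30)

await test_block._aput(
    [
        ChatMessage(role="user", content="Hello! My name is Logan"),
        ChatMessage(role="assistant", content="Hello! How can I help you?"),
    ]
)

# older messages are dropped once the block exceeds its token limit
print(await test_block._aget())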
Then, create a Memory instance that uses this memory block, while also configuring the short-term memory with a very limited token limit:
In [ ]:
block = CondensedMemoryBlock(name="condensed_memory")

memory = Memory.from_defaults(
    session_id="test-mem-01",
    token_limit=60000,
    token_flush_size=5000,
    async_database_uri="sqlite+aiosqlite:///:memory:",
    memory_blocks=[block],
    insert_method="user",
    # Prevent the short-term chat history from containing too many turns!
    # This limit will effectively mean that the short-term memory is always flushed
    chat_history_token_ratio=0.0001,
)
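For intuition on why this ratio forces constant flushing: assuming chat_history_token_ratio is applied against the overall token_limit (as the parameter name suggests), the short-term buffer here holds only about 6 tokens, so essentially every completed turn overflows it and gets flushed into the memory block.

# back-of-the-envelope arithmetic (not a library call): the short-term
# buffer is roughly token_limit * chat_history_token_ratio
print(60000 * 0.0001)  # 6.0 tokens -- almost any turn overflows this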
Usage¶

Let's explore usage with some fabricated messages, and observe how the memory is managed.
In [ ]:
initial_messages = [
    ChatMessage(role="user", content="Hello! My name is Logan"),
    ChatMessage(role="assistant", content="Hello! How can I help you?"),
    ChatMessage(role="user", content="What is the capital of France?"),
    ChatMessage(role="assistant", content="The capital of France is Paris"),
]
In [ ]:
await memory.aput_messages(initial_messages)
Then, let's add our next user message!
In [ ]:
await memory.aput_messages(
    [ChatMessage(role="user", content="What was my name again?")]
)
With that, we can explore what the chat history looks like before it gets sent to the LLM.
In [ ]:
chat_history = await memory.aget()

for message in chat_history:
    print(message.role)
    print(message.content)
    print()
MessageRole.USER
<memory>
<condensed_memory>
<message role=MessageRole.USER>
Hello! My name is Logan
</message>
<message role=MessageRole.ASSISTANT>
Hello! How can I help you?
</message>
<message role=MessageRole.USER>
What is the capital of France?
</message>
<message role=MessageRole.ASSISTANT>
The capital of France is Paris
</message>
</condensed_memory>
</memory>

What was my name again?
Great! Even though we added many messages, they all got condensed into a single user message!

Next, let's try using an actual agent.
Agent Usage¶

Here, we can create a FunctionAgent with some simple tools that uses our memory.
In [ ]:
from llama_index.core.agent.workflow import FunctionAgent
from llama_index.llms.openai import OpenAI


def multiply(a: float, b: float) -> float:
    """Multiply two numbers."""
    return a * b


def divide(a: float, b: float) -> float:
    """Divide two numbers."""
    return a / b


def add(a: float, b: float) -> float:
    """Add two numbers."""
    return a + b


def subtract(a: float, b: float) -> float:
    """Subtract two numbers."""
    return a - b


llm = OpenAI(model="gpt-4.1-mini")

agent = FunctionAgent(
    tools=[multiply, divide, add, subtract],
    llm=llm,
    system_prompt="You are a helpful assistant that can do simple math operations with tools.",
)
In [ ]:
block = CondensedMemoryBlock(name="condensed_memory")

memory = Memory.from_defaults(
    session_id="test-mem-01",
    token_limit=60000,
    token_flush_size=5000,
    async_database_uri="sqlite+aiosqlite:///:memory:",
    memory_blocks=[block],
    insert_method="user",
    # Prevent the short-term chat history from containing too many turns!
    # This limit will effectively mean that the short-term memory is always flushed
    chat_history_token_ratio=0.0001,
)
In [ ]:
resp = await agent.run("What is (3214 * 322) / 2?", memory=memory)
print(resp)
The value of (3214 * 322) / 2 is 517454.0.
In [ ]:
current_chat_history = await memory.aget()

for message in current_chat_history:
    print(message.role)
    print(message.content)
    print()
MessageRole.ASSISTANT
The value of (3214 * 322) / 2 is 517454.0.

MessageRole.USER
<memory>
<condensed_memory>
<message role=MessageRole.USER>
What is (3214 * 322) / 2?
</message>
<message role=MessageRole.ASSISTANT>
({'tool_calls': [{'index': 0, 'id': 'call_U78I0CSWETFQlRBCWPpswEmq', 'function': {'arguments': '{"a": 3214, "b": 322}', 'name': 'multiply'}, 'type': 'function'}, {'index': 1, 'id': 'call_3eFXqalMN9PyiCVEYE073bEl', 'function': {'arguments': '{"a": 3214, "b": 2}', 'name': 'divide'}, 'type': 'function'}]})
</message>
<message role=MessageRole.TOOL>
1034908
({'tool_call_id': 'call_U78I0CSWETFQlRBCWPpswEmq'})
</message>
<message role=MessageRole.TOOL>
1607.0
({'tool_call_id': 'call_3eFXqalMN9PyiCVEYE073bEl'})
</message>
<message role=MessageRole.ASSISTANT>
({'tool_calls': [{'index': 0, 'id': 'call_GvtLKm7FCzlaucfYnaxOLBVW', 'function': {'arguments': '{"a":1034908,"b":2}', 'name': 'divide'}, 'type': 'function'}]})
</message>
<message role=MessageRole.TOOL>
517454.0
({'tool_call_id': 'call_GvtLKm7FCzlaucfYnaxOLBVW'})
</message>
</condensed_memory>
</memory>
Perfect! Since the memory didn't yet have a new user message stored, the current state of the conversation was added as a memory entry. On the next user message, that entry will be combined with the new message, just as we saw earlier.

Let's test with a few follow-up turns to confirm this works properly.
In [ ]:
resp = await agent.run(
    "What was the last question I asked you?", memory=memory
)
print(resp)
The last question you asked was: "What is (3214 * 322) / 2?"
In [ ]:
resp = await agent.run(
    "And how did you go about answering that message?", memory=memory
)
print(resp)
To answer your question "What is (3214 * 322) / 2?", I followed these steps:

1. First, I multiplied 3214 by 322.
2. Then, I divided the result of that multiplication by 2.
3. Finally, I provided you with the result of the calculation, which is 517454.0.
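As a final note, aget() composes the condensed view that gets injected into the prompt, but you can also inspect the messages held in the store directly. A minimal sketch, assuming the aget_all() method on Memory from recent llama-index-core releases (check your installed version):

# inspect the stored messages directly, rather than the condensed
# view that aget() returns (aget_all is assumed here)
all_messages = await memory.aget_all()
print(f"{len(all_messages)} messages currently stored")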