# Resources
Resources are external dependencies such as memory, LLMs, query engines, or chat history instances that are injected into workflow steps at runtime.

Resources are a powerful way to bind workflow steps to Python objects that we would otherwise need to create by hand every time. For performance reasons, resources are cached by default, meaning the same resource instance is passed to every step that requests it. Understanding this is important, because cached and non-cached resources can lead to unexpected behavior; let's look at it in detail.
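To make the caching idea concrete, here is a minimal, hypothetical sketch of how a cached resource factory behaves. This is not llama_index's actual implementation (`CachedResource` is an invented name), just an illustration of the pattern:

```python
# Hypothetical sketch of resource caching: with caching on, the factory
# runs once and every subsequent request gets the same instance.
class CachedResource:
    def __init__(self, factory, cache=True):
        self.factory = factory
        self.cache = cache
        self._instance = None

    def get(self):
        if not self.cache:
            # Caching disabled: build a fresh object on every injection.
            return self.factory()
        if self._instance is None:
            # Caching enabled: build the object only on the first request.
            self._instance = self.factory()
        return self._instance


cached = CachedResource(list)  # cache=True is the default
uncached = CachedResource(list, cache=False)

print(cached.get() is cached.get())      # True: same instance reused
print(uncached.get() is uncached.get())  # False: fresh instance each time
```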
## Resources are cached by default
First, to use resources in our code, we need to import `Resource` from the `resource` submodule:

```python
from llama_index.core.workflow.resource import Resource
from llama_index.core.workflow import (
    Event,
    step,
    StartEvent,
    StopEvent,
    Workflow,
)
```
`Resource` wraps a function or callable that must return an object of the same type declared in the resource definition, as in this example:

```python
from typing import Annotated
from llama_index.core.memory import Memory


def get_memory(*args, **kwargs) -> Memory:
    return Memory.from_defaults("user_id_123", token_limit=60000)


resource = Annotated[Memory, Resource(get_memory)]
```
In the example above, `Annotated[Memory, Resource(get_memory)]` defines a resource of type `Memory` that will be provided at runtime by the `get_memory()` function. A resource defined this way can be injected into a step by declaring it as a method parameter:

```python
import random
from typing import Union

from llama_index.core.llms import ChatMessage

RANDOM_MESSAGES = [
    "Hello World!",
    "Python is awesome!",
    "Resources are great!",
]


class CustomStartEvent(StartEvent):
    message: str


class SecondEvent(Event):
    message: str


class ThirdEvent(Event):
    message: str


class WorkflowWithMemory(Workflow):
    @step
    async def first_step(
        self,
        ev: CustomStartEvent,
        memory: Annotated[Memory, Resource(get_memory)],
    ) -> SecondEvent:
        await memory.aput(
            ChatMessage.from_str(
                role="user", content="First step: " + ev.message
            )
        )
        return SecondEvent(message=RANDOM_MESSAGES[random.randint(0, 2)])

    @step
    async def second_step(
        self, ev: SecondEvent, memory: Annotated[Memory, Resource(get_memory)]
    ) -> Union[ThirdEvent, StopEvent]:
        await memory.aput(
            ChatMessage(role="assistant", content="Second step: " + ev.message)
        )
        if random.randint(0, 1) == 0:
            return ThirdEvent(message=RANDOM_MESSAGES[random.randint(0, 2)])
        else:
            messages = await memory.aget_all()
            return StopEvent(result=messages)

    @step
    async def third_step(
        self, ev: ThirdEvent, memory: Annotated[Memory, Resource(get_memory)]
    ) -> StopEvent:
        await memory.aput(
            ChatMessage(role="user", content="Third step: " + ev.message)
        )
        messages = await memory.aget_all()
        return StopEvent(result=messages)
```
As you can see, each step has access to the `memory` resource and writes to it. It is important to note that `get_memory()` is called only once, and the same memory instance is injected into the different steps. We can verify this by running the workflow:

```python
wf = WorkflowWithMemory(disable_validation=True)


async def main():
    messages = await wf.run(
        start_event=CustomStartEvent(message="Happy birthday!")
    )
    for m in messages:
        print(m.blocks[0].text)


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())
```
A possible result of a run is:

```
First step: Happy birthday!
Second step: Python is awesome!
Third step: Hello World!
```
This shows that each step added its message to a global memory, which is exactly what we expected!
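Under the hood, injection of this kind can be implemented by inspecting a step's signature for `Annotated` metadata. The sketch below is a hypothetical illustration using only the standard library (`FakeResource`, `my_step`, and `inject_and_call` are invented names, not llama_index's actual mechanism):

```python
from typing import Annotated, get_type_hints


class FakeResource:
    """Stand-in for Resource: just records the factory to call."""

    def __init__(self, factory):
        self.factory = factory


def get_greeting() -> str:
    return "hello"


def my_step(ev: str, greeting: Annotated[str, FakeResource(get_greeting)]) -> str:
    return ev + " " + greeting


def inject_and_call(fn, ev):
    # Collect annotations, keeping the Annotated metadata.
    hints = get_type_hints(fn, include_extras=True)
    kwargs = {}
    for name, hint in hints.items():
        for meta in getattr(hint, "__metadata__", ()):
            if isinstance(meta, FakeResource):
                # Call the factory and inject its result as an argument.
                kwargs[name] = meta.factory()
    return fn(ev, **kwargs)


print(inject_and_call(my_step, "event says"))  # event says hello
```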
It is important to note that resources are preserved across steps of the same workflow instance, but they are not shared across different workflows. If we run two `WorkflowWithMemory` instances, `get_memory` is called once for each workflow, so their memories are separate and independent of each other:

```python
wf1 = WorkflowWithMemory(disable_validation=True)
wf2 = WorkflowWithMemory(disable_validation=True)


async def main():
    messages1 = await wf1.run(
        start_event=CustomStartEvent(message="Happy birthday!")
    )
    messages2 = await wf2.run(
        start_event=CustomStartEvent(message="Happy New Year!")
    )
    for m in messages1:
        print(m.blocks[0].text)
    print("===================")
    for m in messages2:
        print(m.blocks[0].text)


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())
```
A possible output is:

```
First step: Happy birthday!
Second step: Resources are great!
===================
First step: Happy New Year!
Second step: Python is awesome!
```
## Disabling resource caching
If we pass `cache=False` to `Resource` when defining a resource, the wrapped function is called every time the resource is injected into a step. This behavior is sometimes desirable; let's look at a simple example using a custom `Counter` class:

```python
from pydantic import BaseModel, Field


class Counter(BaseModel):
    counter: int = Field(description="A simple counter", default=0)

    async def increment(self) -> None:
        self.counter += 1


def get_counter() -> Counter:
    return Counter()


class SecondEvent(Event):
    count: int


class WorkflowWithCounter(Workflow):
    @step
    async def first_step(
        self,
        ev: StartEvent,
        counter: Annotated[Counter, Resource(get_counter, cache=False)],
    ) -> SecondEvent:
        await counter.increment()
        return SecondEvent(count=counter.counter)

    @step
    async def second_step(
        self,
        ev: SecondEvent,
        counter: Annotated[Counter, Resource(get_counter, cache=False)],
    ) -> StopEvent:
        print("Counter at first step: ", ev.count)
        await counter.increment()
        print("Counter at second step: ", counter.counter)
        return StopEvent(result="End of Workflow")
```
If we now run this workflow, we get the following output:

```
Counter at first step:  1
Counter at second step:  1
```

Each step incremented its own fresh `Counter`: with `cache=False`, the factory runs anew for every injection, so the two steps never share an instance.
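The plain-Python sketch below (an illustration, not llama_index code) mimics what happens under `cache=False`: the factory runs once per injection, so each step increments its own fresh counter:

```python
class Counter:
    def __init__(self):
        self.count = 0

    def increment(self):
        self.count += 1


def get_counter():
    return Counter()


# cache=False behavior: a new Counter is built for each injection.
c_first = get_counter()   # injected into first_step
c_first.increment()
print(c_first.count)      # 1

c_second = get_counter()  # injected into second_step: a fresh instance
c_second.increment()
print(c_second.count)     # 1, not 2: the two steps never share a counter
```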
## A note on stateful and stateless resources
As we have seen, cached resources are expected to be stateful: they can maintain state across different workflow runs and different steps, unless otherwise specified. However, disabling caching does not by itself make a resource stateless. Let's see an example:

```python
global_mem = Memory.from_defaults("global_id", token_limit=60000)


def get_memory(*args, **kwargs) -> Memory:
    return global_mem
```
If we disable caching with `Annotated[Memory, Resource(get_memory, cache=False)]`, the function `get_memory` will be called multiple times, but the resource instance it returns is always the same. A resource like this should be considered stateful, regardless of its caching behavior.
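This point is easy to verify with a plain-Python sketch (hypothetical names, not llama_index code): even though the factory runs on every call, it keeps handing back the same module-level object, so writes from one step are visible to the next:

```python
class FakeMemory:
    def __init__(self):
        self.messages = []


global_mem = FakeMemory()


def get_memory():
    # Called on every injection when cache=False,
    # but it always returns the same shared object.
    return global_mem


a = get_memory()  # injected into one step
b = get_memory()  # injected into another step
a.messages.append("first step wrote this")

print(a is b)      # True: both steps share the same instance
print(b.messages)  # ['first step wrote this']
```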
Now that we have mastered the concept of resources, let's take a look at observability and debugging in workflows.