Resources#

Resources are external dependencies such as memory, large language models (LLMs), query engines, or chat history instances that are injected into workflow steps at runtime.

Resources are a powerful way to bind workflow steps to Python objects that we would otherwise have to create by hand every time. For performance reasons, resources are cached by the workflow by default, meaning the same resource instance is passed to every step that requests it. It is important to understand this, because cached and non-cached resources can lead to unexpected behavior. Let's look at the details.

Resources are cached by default#

First, to use resources in our code, we need to import Resource from the resource submodule:

from llama_index.core.workflow.resource import Resource
from llama_index.core.workflow import (
    Event,
    step,
    StartEvent,
    StopEvent,
    Workflow,
)

Resource wraps a function or callable that must return an object of the same type declared in the resource definition, as in this example:

from typing import Annotated
from llama_index.core.memory import Memory


def get_memory(*args, **kwargs) -> Memory:
    return Memory.from_defaults("user_id_123", token_limit=60000)


resource = Annotated[Memory, Resource(get_memory)]

In the example above, Annotated[Memory, Resource(get_memory)] defines a resource of type Memory that will be provided at runtime by the get_memory() function. A resource defined this way can be injected into a step by declaring it as a method parameter:

import random

from typing import Union
from llama_index.core.llms import ChatMessage

RANDOM_MESSAGES = [
    "Hello World!",
    "Python is awesome!",
    "Resources are great!",
]


class CustomStartEvent(StartEvent):
    message: str


class SecondEvent(Event):
    message: str


class ThirdEvent(Event):
    message: str


class WorkflowWithMemory(Workflow):
    @step
    async def first_step(
        self,
        ev: CustomStartEvent,
        memory: Annotated[Memory, Resource(get_memory)],
    ) -> SecondEvent:
        await memory.aput(
            ChatMessage.from_str(
                role="user", content="First step: " + ev.message
            )
        )
        return SecondEvent(message=RANDOM_MESSAGES[random.randint(0, 2)])

    @step
    async def second_step(
        self, ev: SecondEvent, memory: Annotated[Memory, Resource(get_memory)]
    ) -> Union[ThirdEvent, StopEvent]:
        await memory.aput(
            ChatMessage(role="assistant", content="Second step: " + ev.message)
        )
        if random.randint(0, 1) == 0:
            return ThirdEvent(message=RANDOM_MESSAGES[random.randint(0, 2)])
        else:
            messages = await memory.aget_all()
            return StopEvent(result=messages)

    @step
    async def third_step(
        self, ev: ThirdEvent, memory: Annotated[Memory, Resource(get_memory)]
    ) -> StopEvent:
        await memory.aput(
            ChatMessage(role="user", content="Third step: " + ev.message)
        )
        messages = await memory.aget_all()
        return StopEvent(result=messages)

As you can see, every step has access to the memory resource and writes to it. Note that get_memory() is called only once, and the same memory instance is injected into the different steps. We can verify this by running the workflow:

wf = WorkflowWithMemory(disable_validation=True)


async def main():
    messages = await wf.run(
        start_event=CustomStartEvent(message="Happy birthday!")
    )
    for m in messages:
        print(m.blocks[0].text)


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

A possible result of the run is:

First step: Happy birthday!
Second step: Python is awesome!
Third step: Hello World!

This shows that each step appended its message to a shared memory, which is exactly what we expected!
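The default caching can be illustrated with a plain-Python analogy (hypothetical names, not the library's internals): a factory memoized with functools.cache runs only once, so every consumer receives the same object, just as every step above received the same Memory instance.

```python
from functools import cache


@cache
def get_shared_list() -> list:
    # Runs only on the first call; later calls return the cached object,
    # mirroring how a cached Resource injects one instance into every step.
    return []


def first_step() -> None:
    get_shared_list().append("first")


def second_step() -> None:
    get_shared_list().append("second")


first_step()
second_step()
print(get_shared_list())  # ['first', 'second'] — both steps wrote to one object
```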

Note that resources persist across steps of the same workflow instance, but they are not shared between different workflows. If we run two WorkflowWithMemory instances, get_memory is called once per workflow, so their memories are separate and independent of one another:

wf1 = WorkflowWithMemory(disable_validation=True)
wf2 = WorkflowWithMemory(disable_validation=True)


async def main():
    messages1 = await wf1.run(
        start_event=CustomStartEvent(message="Happy birthday!")
    )
    messages2 = await wf2.run(
        start_event=CustomStartEvent(message="Happy New Year!")
    )
    for m in messages1:
        print(m.blocks[0].text)
    print("===================")
    for m in messages2:
        print(m.blocks[0].text)


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

A possible output is:

First step: Happy birthday!
Second step: Resources are great!
===================
First step: Happy New Year!
Second step: Python is awesome!
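The per-workflow scoping can be sketched the same way (an analogy with hypothetical names, not the library's mechanism): each workflow instance keeps its own cache of resource instances, so a resource is reused within one workflow but never shared across two.

```python
class ToyWorkflow:
    """Toy stand-in that caches each resource factory's result per instance."""

    def __init__(self) -> None:
        self._resource_cache: dict = {}

    def get_resource(self, factory):
        # First request creates and caches; later requests reuse the cached one.
        if factory not in self._resource_cache:
            self._resource_cache[factory] = factory()
        return self._resource_cache[factory]


def make_memory() -> list:
    return []


wf1, wf2 = ToyWorkflow(), ToyWorkflow()
mem1 = wf1.get_resource(make_memory)
mem2 = wf2.get_resource(make_memory)
print(mem1 is wf1.get_resource(make_memory))  # True: cached within a workflow
print(mem1 is mem2)  # False: not shared across workflows
```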

Disabling resource caching#

If we pass cache=False to Resource when defining a resource, the wrapped function is called every time the resource is injected into a step. This behavior is sometimes desirable; let's look at a simple example using a custom Counter class:

from pydantic import BaseModel, Field


class Counter(BaseModel):
    counter: int = Field(description="A simple counter", default=0)

    async def increment(self) -> None:
        self.counter += 1


def get_counter() -> Counter:
    return Counter()


class SecondEvent(Event):
    count: int


class WorkflowWithCounter(Workflow):
    @step
    async def first_step(
        self,
        ev: StartEvent,
        counter: Annotated[Counter, Resource(get_counter, cache=False)],
    ) -> SecondEvent:
        await counter.increment()
        return SecondEvent(count=counter.counter)

    @step
    async def second_step(
        self,
        ev: SecondEvent,
        counter: Annotated[Counter, Resource(get_counter, cache=False)],
    ) -> StopEvent:
        print("Counter at first step: ", ev.count)
        await counter.increment()
        print("Counter at second step: ", counter.counter)
        return StopEvent(result="End of Workflow")

If we now run this workflow, we get the following output:

Counter at first step:  1
Counter at second step:  1
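Because caching is disabled, get_counter runs on every injection, so each step receives a brand-new Counter and increments it from 0 to 1, which is why both printed values are 1. The effect can be sketched in plain Python (an analogy, not the library's mechanism):

```python
class Counter:
    def __init__(self) -> None:
        self.counter = 0

    def increment(self) -> None:
        self.counter += 1


def get_counter() -> Counter:
    # With cache=False the factory runs for every injection, so each step
    # gets its own fresh Counter starting from 0.
    return Counter()


first = get_counter()   # injection into the first step
first.increment()
second = get_counter()  # injection into the second step
second.increment()
print(first.counter, second.counter)  # 1 1 — neither sees the other's increment
```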

A note on stateful and stateless resources#

As we have seen, cached resources are expected to be stateful: unless specified otherwise, they maintain their state across workflow runs and across steps. However, this does not mean we can consider a resource stateless simply because caching is disabled. Let's look at an example:

global_mem = Memory.from_defaults("global_id", token_limit=60000)


def get_memory(*args, **kwargs) -> Memory:
    return global_mem

If we disable caching with Annotated[Memory, Resource(get_memory, cache=False)], the get_memory function is called multiple times, but the resource instance is always the same. A resource like this should be considered stateful regardless of its caching behavior.
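A minimal sketch of this pitfall in plain Python (hypothetical names, standing in for the Memory example above): the factory is invoked on every injection, yet it always hands back the same module-level object, so state still leaks across steps.

```python
call_count = 0
_global_mem: list = []


def get_memory() -> list:
    # Stand-in for the factory above: invoked on every injection when
    # cache=False, yet it always returns the same global object.
    global call_count
    call_count += 1
    return _global_mem


m1 = get_memory()  # first injection
m2 = get_memory()  # second injection
print(call_count, m1 is m2)  # 2 True — two calls, one shared (stateful) instance
```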

Now that we have mastered the concept of resources, let's look at observability and debugging in workflows.