工作流#
LlamaIndex 中的 Workflow
是一种事件驱动的抽象机制,用于将多个事件串联起来。工作流由多个 步骤
组成,每个步骤负责处理特定类型的事件并触发新事件。
LlamaIndex 中的 Workflow
通过使用 @step
装饰器修饰函数来工作。该装饰器用于推断每个工作流的输入和输出类型以进行验证,并确保每个步骤仅在接收到可接受的事件时运行。
你可以创建 Workflow
来完成任何任务!构建智能体、RAG流程、提取流程或任何你想要的流程。
工作流还具备自动监控功能,因此你可以使用 Arize Pheonix 等工具观察每个步骤的运行情况。(注意: 监控功能仅适用于采用新版监控系统的集成方案,具体效果可能有所差异。)
Tip
工作流将异步操作视为一等公民,本文档假设你正在异步环境中运行代码。这意味着你需要正确设置异步代码环境。如果你已经在 FastAPI 服务器或 Notebook 中运行,可以直接使用 await 语法!
如果是自行编写的 Python 脚本,最佳实践是设置单一的异步入口点:
async def main():
w = MyWorkflow(...)
result = await w.run(...)
print(result)
if __name__ == "__main__":
import asyncio
asyncio.run(main())
快速入门#
我们通过一个简单的示例来说明:首先生成一个笑话,然后对其进行点评。
from llama_index.core.workflow import (
Event,
StartEvent,
StopEvent,
Workflow,
step,
)
# 若未安装请执行 `pip install llama-index-llms-openai`
from llama_index.llms.openai import OpenAI
class JokeEvent(Event):
joke: str
class JokeFlow(Workflow):
llm = OpenAI()
@step
async def generate_joke(self, ev: StartEvent) -> JokeEvent:
topic = ev.topic
prompt = f"Write your best joke about {topic}."
response = await self.llm.acomplete(prompt)
return JokeEvent(joke=str(response))
@step
async def critique_joke(self, ev: JokeEvent) -> StopEvent:
joke = ev.joke
prompt = f"Give a thorough analysis and critique of the following joke: {joke}"
response = await self.llm.acomplete(prompt)
return StopEvent(result=str(response))
w = JokeFlow(timeout=60, verbose=False)
result = await w.run(topic="pirates")
print(str(result))
这个示例包含多个组件,让我们逐步解析。
定义工作流事件#
class JokeEvent(Event):
joke: str
事件是用户定义的 pydantic 对象。你可以控制其属性和辅助方法。在本例中,我们的工作流依赖一个用户定义事件 JokeEvent
。
创建工作流类#
class JokeFlow(Workflow):
llm = OpenAI(model="gpt-4o-mini")
...
通过继承 Workflow
类实现工作流。为简化示例,我们附加了一个静态的 OpenAI
大语言模型实例。
工作流入门点#
class JokeFlow(Workflow):
...
@step
async def generate_joke(self, ev: StartEvent) -> JokeEvent:
topic = ev.topic
prompt = f"Write your best joke about {topic}."
response = await self.llm.acomplete(prompt)
return JokeEvent(joke=str(response))
...
这里定义了工作流的入口点。虽然大多数事件由用户定义,但框架提供了两个特殊事件:StartEvent
和 StopEvent
。StartEvent
表示工作流的初始输入位置。
StartEvent
比较特殊,因为它可以包含任意属性。我们通过 ev.topic
访问主题,如果属性不存在会引发错误。也可以使用 ev.get("topic")
来避免属性不存在时的错误。
你可能注意到我们并未显式指定每个步骤处理的事件类型。实际上,@step
装饰器会自动推断每个步骤的输入输出类型,并在运行前验证工作流的有效性!
工作流出口点#
class JokeFlow(Workflow):
...
@step
async def critique_joke(self, ev: JokeEvent) -> StopEvent:
joke = ev.joke
prompt = f"Give a thorough analysis and critique of the following joke: {joke}"
response = await self.llm.acomplete(prompt)
return StopEvent(result=str(response))
...
这是工作流的第二个(也是最后一个)步骤。当工作流遇到返回的 StopEvent
时,会立即停止并返回 result
参数中的内容。
本例中结果是字符串,但也可以是字典、列表或其他对象。
运行工作流#
w = JokeFlow(timeout=60, verbose=False)
result = await w.run(topic="pirates")
print(str(result))
最后创建并运行工作流。可以通过超时(秒)和详细模式等设置辅助调试。
.run()
是异步方法,因此使用 await 等待结果。传递给 run()
的关键字参数会成为 StartEvent
的字段,自动触发工作流。如示例所示,topic
可通过步骤中的 ev.topic
访问。
自定义入口和出口点#
大多数情况下,使用[快速入门]部分介绍的默认入口和出口点就足够了。但工作流也支持自定义事件替代 StartEvent
和 StopEvent
。
使用自定义 StartEvent
#
调用工作流实例的 run()
方法时,关键字参数会自动转换为 StartEvent
实例的字段。当需要传递复杂数据启动工作流时,这种方式可能变得繁琐,此时可以引入自定义启动事件。
要使用自定义启动事件,首先需要创建继承自 StartEvent
的类:
from pathlib import Path
from llama_index.core.workflow import StartEvent
from llama_index.indices.managed.llama_cloud import LlamaCloudIndex
from llama_index.llms.openai import OpenAI
class MyCustomStartEvent(StartEvent):
a_string_field: str
a_path_to_somewhere: Path
an_index: LlamaCloudIndex
an_llm: OpenAI
接下来只需在作为入口点的步骤中使用 MyCustomStartEvent
作为事件类型。以下是一个复杂步骤示例:
class JokeFlow(Workflow):
...
@step
async def generate_joke_from_index(
self, ev: MyCustomStartEvent
) -> JokeEvent:
# 使用启动事件中的索引和LLM构建查询引擎
query_engine = ev.an_index.as_query_engine(llm=ev.an_llm)
topic = query_engine.query(
f"What is the closest topic to {a_string_field}"
)
# 使用启动事件中的LLM生成提示
prompt = f"Write your best joke about {topic}."
response = await ev.an_llm.acomplete(prompt)
# 使用事件中的Path对象将响应写入磁盘
ev.a_path_to_somewhere.write_text(str(response))
# 最后传递JokeEvent
return JokeEvent(joke=str(response))
虽然仍可通过关键字参数传递 MyCustomStartEvent
的字段,但更简洁的方式是通过 start_event
参数传递事件实例:
custom_start_event = MyCustomStartEvent(...)
w = JokeFlow(timeout=60, verbose=False)
result = await w.run(start_event=custom_start_event)
print(str(result))
这种方式使代码更清晰明确,并支持IDE的自动补全功能。
使用自定义 StopEvent
#
与 StartEvent
类似,大多数情况下内置的 StopEvent
都能正常工作,但并非总是如此。实际上,当我们使用 StopEvent
时,工作流的结果必须被赋值到事件实例的 result
字段。由于结果可以是任意 Python 对象,StopEvent
的 result
字段被类型标注为 Any
,从而失去了类型系统的优势。此外,返回多个对象会变得很麻烦:我们通常会将一堆不相关的对象塞进字典,然后赋值给 StopEvent.result
。
支持自定义停止事件的第一步是创建 StopEvent
的子类:
from llama_index.core.workflow import StopEvent
class MyStopEvent(StopEvent):
critique: CompletionResponse
现在我们可以在工作流中用 MyStopEvent
替换 StopEvent
:
class JokeFlow(Workflow):
...
@step
async def critique_joke(self, ev: JokeEvent) -> MyStopEvent:
joke = ev.joke
prompt = f"Give a thorough analysis and critique of the following joke: {joke}"
response = await self.llm.acomplete(prompt)
return MyStopEvent(response)
...
使用自定义停止事件时需要记住的重要一点是,工作流运行的结果将是该事件的实例:
w = JokeFlow(timeout=60, verbose=False)
# 注意!`result` 现在包含的是 MyStopEvent 的实例!
result = await w.run(topic="pirates")
# 现在可以像访问普通事件字段一样访问
print(result.critique.text)
这种方法充分利用了 Python 的类型系统,对 IDE 的自动补全友好,并且允许外部应用程序进行内省,从而确切知道工作流运行会返回什么。
绘制工作流图#
工作流可以通过步骤定义中的类型注解进行可视化。你可以绘制所有可能的路径,或者最近一次执行路径以帮助调试。
首先安装:
pip install llama-index-utils-workflow
然后导入并使用:
from llama_index.utils.workflow import (
draw_all_possible_flows,
draw_most_recent_execution,
)
# 绘制所有路径
draw_all_possible_flows(JokeFlow, filename="joke_flow_all.html")
# 绘制执行路径
w = JokeFlow()
await w.run(topic="Pirates")
draw_most_recent_execution(w, filename="joke_flow_recent.html")
使用全局上下文/状态#
你可以选择在步骤之间使用全局上下文。例如,多个步骤可能需要访问用户输入的原始 query
。你可以将其存储在全局上下文中,以便每个步骤都能访问。
from llama_index.core.workflow import Context
@step
async def query(self, ctx: Context, ev: MyEvent) -> StopEvent:
# 从上下文中获取
query = await ctx.store.get("query")
# 使用上下文和事件进行操作
val = ...
result = ...
# 存储到上下文
await ctx.store.set("key", val)
return StopEvent(result=result)
添加类型化状态#
通常,你会希望为工作流使用预设的状态结构。最佳方式是使用 Pydantic
模型来定义状态。这样你可以:
注意: 你应该使用所有字段都有默认值的 Pydantic 模型。这样 Context
对象可以自动用默认值初始化状态。
以下是一个快速示例,展示如何利用工作流 + Pydantic 来使用这些功能:
from pydantic import BaseModel, Field, field_validator, field_serializer
from typing import Union
# 这是一个我们想在状态中使用的随机对象
class MyRandomObject:
def __init__(self, name: str = "default"):
self.name = name
# 这是我们的状态模型
# 注意:所有字段必须有默认值
class MyState(BaseModel):
model_config = {"arbitrary_types_allowed": True}
my_obj: MyRandomObject = Field(default_factory=MyRandomObject)
some_key: str = Field(default="some_value")
# 这是可选的,但如果你想控制状态的序列化会很有用!
@field_serializer("my_obj", when_used="always")
def serialize_my_obj(self, my_obj: MyRandomObject) -> str:
return my_obj.name
@field_validator("my_obj", mode="before")
@classmethod
def deserialize_my_obj(
cls, v: Union[str, MyRandomObject]
) -> MyRandomObject:
if isinstance(v, MyRandomObject):
return v
if isinstance(v, str):
return MyRandomObject(v)
raise ValueError(f"Invalid type for my_obj: {type(v)}")
然后,只需用状态模型标注你的工作流状态:
from llama_index.core.workflow import (
Context,
StartEvent,
StopEvent,
Workflow,
step,
)
class MyWorkflow(Workflow):
@step
async def start(self, ctx: Context[MyState], ev: StartEvent) -> StopEvent:
# 直接返回 MyState
state = await ctx.store.get_state()
state.my_obj.name = "new_name"
await ctx.store.set_state(state)
# 如果需要也可以直接访问字段
name = await ctx.store.get("my_obj.name")
await ctx.store.set("my_obj.name", "newer_name")
return StopEvent(result="Done!")
等待多个事件#
上下文不仅用于保存数据,还提供了缓冲和等待多个事件的工具。
例如,你可能有一个步骤需要等待查询和检索到的节点后才能合成响应:
from llama_index.core import get_response_synthesizer
@step
async def synthesize(
self, ctx: Context, ev: QueryEvent | RetrieveEvent
) -> StopEvent | None:
data = ctx.collect_events(ev, [QueryEvent, RetrieveEvent])
# 检查是否可以运行
if data is None:
return None
# 解包——数据按顺序返回
query_event, retrieve_event = data
# 运行响应合成
synthesizer = get_response_synthesizer()
response = synthesizer.synthesize(
query_event.query, nodes=retrieve_event.nodes
)
return StopEvent(result=response)
使用 ctx.collect_events()
可以缓冲并等待所有预期事件到达。该函数只有在所有事件都到达后才会返回数据(按请求的顺序)。
手动触发事件#
通常,事件是通过在步骤中返回另一个事件来触发的。然而,事件也可以通过在工作流中使用 ctx.send_event(event)
方法手动分发。
以下是一个简短的示例,展示如何使用:
from llama_index.core.workflow import step, Context, Event, Workflow
class MyEvent(Event):
pass
class MyEventResult(Event):
result: str
class GatherEvent(Event):
pass
class MyWorkflow(Workflow):
@step
async def dispatch_step(
self, ctx: Context, ev: StartEvent
) -> MyEvent | GatherEvent:
ctx.send_event(MyEvent())
ctx.send_event(MyEvent())
return GatherEvent()
@step
async def handle_my_event(self, ev: MyEvent) -> MyEventResult:
return MyEventResult(result="result")
@step
async def gather(
self, ctx: Context, ev: GatherEvent | MyEventResult
) -> StopEvent | None:
# 等待事件完成
events = ctx.collect_events(ev, [MyEventResult, MyEventResult])
if not events:
return None
return StopEvent(result=events)
流式事件#
你也可以在事件到达时进行迭代。这对于流式传输、显示进度或调试非常有用。处理程序对象会发出使用 ctx.write_event_to_stream()
显式写入流的事件:
class ProgressEvent(Event):
msg: str
class MyWorkflow(Workflow):
@step
async def step_one(self, ctx: Context, ev: StartEvent) -> FirstEvent:
ctx.write_event_to_stream(ProgressEvent(msg="Step one is happening"))
return FirstEvent(first_output="First step complete.")
然后可以像这样接收事件:
w = MyWorkflow(...)
handler = w.run(topic="Pirates")
async for event in handler.stream_events():
print(event)
result = await handler
失败时重试步骤执行#
步骤执行失败可能导致整个工作流失败,但通常错误是预期的,可以安全地重试。例如,由于网络暂时拥堵导致 HTTP 请求超时,或者外部 API 调用触发了速率限制。
对于所有希望步骤重试的情况,可以使用“重试策略”。重试策略是一个对象,指示工作流多次执行一个步骤,并规定在重试前需要等待多长时间。策略会考虑自第一次失败以来经过的时间、连续失败的次数以及最后一次发生的错误。
要为特定步骤设置策略,只需将策略对象传递给 @step
装饰器:
from llama_index.core.workflow.retry_policy import ConstantDelayRetryPolicy
class MyWorkflow(Workflow):
# ...更多工作流定义...
# 此策略将在失败时每 5 秒重试此步骤,最多 10 次
@step(retry_policy=ConstantDelayRetryPolicy(delay=5, maximum_attempts=10))
async def flaky_step(self, ctx: Context, ev: StartEvent) -> StopEvent:
result = flaky_call() # 可能会抛出异常
return StopEvent(result=result)
你可以查看 API 文档 了解框架中可用策略的详细描述。如果找不到适合你用例的策略,可以轻松编写自定义策略。自定义策略的唯一要求是编写一个符合 RetryPolicy
协议的 Python 类。换句话说,你的自定义策略类必须有一个方法,其签名如下:
def next(
self, elapsed_time: float, attempts: int, error: Exception
) -> Optional[float]:
...
例如,这是一个只在周五才重试步骤的策略:
from datetime import datetime
class RetryOnFridayPolicy:
def next(
self, elapsed_time: float, attempts: int, error: Exception
) -> Optional[float]:
if datetime.today().strftime("%A") == "Friday":
# 5 秒后重试
return 5
# 告诉工作流我们不希望重试
return None
人在回路#
由于工作流非常灵活,实现人在回路模式有很多可能的方式。
最简单的实现方式是在事件流中使用 InputRequiredEvent
和 HumanResponseEvent
事件。
from llama_index.core.workflow import InputRequiredEvent, HumanResponseEvent
class HumanInTheLoopWorkflow(Workflow):
@step
async def step1(self, ev: StartEvent) -> InputRequiredEvent:
return InputRequiredEvent(prefix="Enter a number: ")
@step
async def step2(self, ev: HumanResponseEvent) -> StopEvent:
return StopEvent(result=ev.response)
# 工作流应该支持流式传输
workflow = HumanInTheLoopWorkflow()
handler = workflow.run()
async for event in handler.stream_events():
if isinstance(event, InputRequiredEvent):
# 在这里,我们可以以任何方式处理人工输入
# 这意味着使用 input()、websockets、访问异步状态等
# 这里我们只用 input()
response = input(event.prefix)
handler.ctx.send_event(HumanResponseEvent(response=response))
final_result = await handler
在这里,工作流将等待直到 HumanResponseEvent
被发出。
注意,你也可以跳出循环,稍后再恢复。这对于暂停工作流等待人工响应,但稍后继续工作流非常有用。
handler = workflow.run()
async for event in handler.stream_events():
if isinstance(event, InputRequiredEvent):
break
# 现在处理人工响应
response = input(event.prefix)
handler.ctx.send_event(HumanResponseEvent(response=response))
# 现在恢复工作流流式传输
async for event in handler.stream_events():
continue
final_result = await handler
逐步执行#
工作流内置了逐步执行的工具,允许你控制执行并在过程中调试状态。
# 创建工作流,与往常一样
workflow = JokeFlow()
# 获取处理程序。传递 `stepwise=True` 会阻塞执行,等待手动干预
handler = workflow.run(stepwise=True)
# 每次调用 `run_step`,工作流会推进并返回上一步中产生的所有事件
# 这些事件需要手动传播以使工作流继续(我们用 := 操作符将它们赋值给 `produced_events`)。
while produced_events := await handler.run_step():
# 如果执行到这里,意味着至少有一个事件需要传播,
# 我们用 `send_event` 来做
for ev in produced_events:
handler.ctx.send_event(ev)
# 如果执行到这里,意味着工作流执行完成,
# 现在可以访问最终结果。
result = await handler
装饰非类函数#
您也可以在不继承工作流类的情况下,通过装饰器为其附加步骤。
以下是之前展示的 JokeFlow
示例,但采用非继承方式实现。
from llama_index.core.workflow import (
Event,
StartEvent,
StopEvent,
Workflow,
step,
)
from llama_index.llms.openai import OpenAI
class JokeEvent(Event):
joke: str
joke_flow = Workflow(timeout=60, verbose=True)
@step(workflow=joke_flow)
async def generate_joke(ev: StartEvent) -> JokeEvent:
topic = ev.topic
prompt = f"Write your best joke about {topic}."
llm = OpenAI()
response = await llm.acomplete(prompt)
return JokeEvent(joke=str(response))
@step(workflow=joke_flow)
async def critique_joke(ev: JokeEvent) -> StopEvent:
joke = ev.joke
prompt = (
f"Give a thorough analysis and critique of the following joke: {joke}"
)
response = await llm.acomplete(prompt)
return StopEvent(result=str(response))
跨运行保持上下文#
如您所见,工作流包含一个 Context
对象,可用于在步骤间保持状态。
若需在多次工作流运行间保持状态,可将前次上下文传入 .run()
方法。
handler = w.run()
result = await handler
# 继续下一次运行
handler = w.run(ctx=handler.ctx)
result = await handler
资源管理#
资源是可注入到工作流步骤中的外部依赖项。
以下示例展示了工作流中的 memory
资源:
from llama_index.core.workflow.resource import Resource
from llama_index.core.memory import Memory
def get_memory(*args, **kwargs):
return Memory.from_defaults("user_id_123", token_limit=60000)
class SecondEvent(Event):
msg: str
class WorkflowWithResource(Workflow):
@step
async def first_step(
self,
ev: StartEvent,
memory: Annotated[Memory, Resource(get_memory)],
) -> SecondEvent:
print("Memory before step 1", memory)
await memory.aput(
ChatMessage(role="user", content="This is the first step")
)
print("Memory after step 1", memory)
return SecondEvent(msg="This is an input for step 2")
@step
async def second_step(
self, ev: SecondEvent, memory: Annotated[Memory, Resource(get_memory)]
) -> StopEvent:
print("Memory before step 2", memory)
await memory.aput(ChatMessage(role="user", content=ev.msg))
print("Memory after step 2", memory)
return StopEvent(result="Messages put into memory")
要向工作流步骤注入资源,需在步骤签名中添加参数并使用 Annotated
类型标注,通过 Resource()
包装器传入返回实际资源对象的函数。包装函数的返回类型必须与声明类型匹配,确保执行时期望值与提供值的一致性。上例中 memory: Annotated[Memory, Resource(get_memory)
定义了一个 Memory
类型资源,该资源将由 get_memory()
函数提供,并在工作流运行时通过 memory
参数传递给步骤。
资源在工作流步骤间共享,Resource()
包装器仅会调用工厂函数一次。若需禁用此行为,向 Resource()
传入 cache=False
将为不同步骤注入不同资源对象,此时工厂函数会被多次调用。
工作流检查点#
通过 WorfklowCheckpointer
对象,可使工作流在每步完成后创建并存储检查点。这些检查点可作为后续运行的起点,该特性在工作流开发和调试阶段非常实用。
from llama_index.core.workflow import WorkflowCheckpointer
w = JokeFlow(...)
w_cptr = WorkflowCheckpointer(workflow=w)
# 创建检查点运行
handler = w_cptr.run(topic="Pirates")
await handler
# 查看该运行的存储检查点
w_cptr.checkpoints[handler.run_id]
# 从检查点继续运行
ckpt = w_cptr.checkpoints[handler.run_id][0]
handler = w_cptr.run_from(topic="Ships", checkpoint=ckpt)
await handler
部署工作流#
您可以通过 llama_deploy (仓库) 将工作流部署为多智能体服务。每个智能体服务通过控制平面协调,并通过消息队列通信。支持本地部署或 Kubernetes 部署。
示例#
为了帮助您更好地理解工作流概念及其特性,LlamaIndex 文档提供了可运行的示例笔记本供您实践学习:
- 常见工作流模式 通过简单工作流演示循环和状态管理等常见使用模式,通常是最佳入门选择。
- RAG + 重排序 展示如何用相对简单的工作流实现同时包含数据摄取和查询的真实用例。
- 引用查询引擎 类似 RAG + 重排序,但重点演示在检索和生成之间如何实现中间步骤,是学习在工作流中使用
Context
对象的优秀范例。 - 修正型 RAG 在 RAG 工作流基础上增加了复杂度,展示如何在评估步骤后查询网络搜索引擎。
- 并发执行 解释如何管理工作流步骤的并行执行,这对处理日益复杂的工作流至关重要。
RAG 应用易于理解且是学习工作流基础的绝佳途径,但涉及工具调用、记忆和路由等更复杂的智能体场景才是工作流真正擅长的领域。
以下示例重点展示部分此类用例:
- ReAct 智能体 显然是演示工作流中工具实现的完美范例。
- 函数调用智能体 展示了如何在复杂场景(如函数调用)中保持工作流简洁,巧妙运用 LlamaIndex 框架原语。
- CodeAct 智能体 演示如何从零创建 CodeAct 智能体。
- 人机交互:故事创作 强力展示了工作流运行如何实现交互式和有状态特性(本例中用于收集人类输入)。
- 可靠结构化生成 演示如何在工作流中实现循环(本例通过反思改进结构化输出)。
- 工作流查询规划 展示通过分解查询为子项并执行的工作流,重点说明如何流式传输工作流事件、并行执行步骤及循环直到条件满足。
- 工作流检查点 更详尽地演示如何充分利用
WorkflowCheckpointer
为工作流运行创建检查点。
最后,以下高级用例展示了当您需要快速实现原型(例如文献中的概念)时,工作流如何大显身手: