Parallel Execution of Same-Event Example¶
In this example, we demonstrate how to use the workflow functionality to achieve similar capabilities while allowing parallel execution of multiple events of the same type. By setting the num_workers parameter in the @step decorator, we can control how many instances of a step execute simultaneously, enabling efficient parallel processing.
# %pip install llama-index-core llama-index-utils-workflow -q
Import Required Libraries¶
After installing the dependencies, we can import the required libraries:
import asyncio

from llama_index.core.workflow import (
    step,
    Context,
    Workflow,
    Event,
    StartEvent,
    StopEvent,
)
We will create two workflows: one that processes multiple data items in parallel by using the @step(num_workers=N) decorator, and one without that parallelism as a baseline for comparison.
By using the num_workers parameter in the @step decorator, we can limit the number of simultaneously executing instances of a step and thereby control the level of parallel processing. This approach is particularly suited to handling batches of similar tasks while managing resource usage. For example, you can issue several sub-queries at once, but note that num_workers cannot be raised without limit; the appropriate value depends on your workload and token limits.
Define Event Types¶
We will define two event types: one for the input events to be processed and one for the processing results:
class ProcessEvent(Event):
    data: str


class ResultEvent(Event):
    result: str
Create Sequential and Parallel Workflows¶
Now, we will create SequentialWorkflow and ParallelWorkflow classes, each containing three main steps:
- start: initialize and send multiple parallel events
- process_data: process the data
- combine_results: collect and merge all processing results
import random


class SequentialWorkflow(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> ProcessEvent:
        data_list = ["A", "B", "C"]
        await ctx.store.set("num_to_collect", len(data_list))
        for item in data_list:
            ctx.send_event(ProcessEvent(data=item))
        return None

    @step(num_workers=1)
    async def process_data(self, ev: ProcessEvent) -> ResultEvent:
        # Simulate some time-consuming processing
        processing_time = 2 + random.random()
        await asyncio.sleep(processing_time)
        result = f"Processed: {ev.data}"
        print(f"Completed processing: {ev.data}")
        return ResultEvent(result=result)

    @step
    async def combine_results(
        self, ctx: Context, ev: ResultEvent
    ) -> StopEvent | None:
        num_to_collect = await ctx.store.get("num_to_collect")
        results = ctx.collect_events(ev, [ResultEvent] * num_to_collect)
        if results is None:
            return None
        combined_result = ", ".join([event.result for event in results])
        return StopEvent(result=combined_result)
class ParallelWorkflow(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> ProcessEvent:
        data_list = ["A", "B", "C"]
        await ctx.store.set("num_to_collect", len(data_list))
        for item in data_list:
            ctx.send_event(ProcessEvent(data=item))
        return None

    @step(num_workers=3)
    async def process_data(self, ev: ProcessEvent) -> ResultEvent:
        # Simulate some time-consuming processing
        processing_time = 2 + random.random()
        await asyncio.sleep(processing_time)
        result = f"Processed: {ev.data}"
        print(f"Completed processing: {ev.data}")
        return ResultEvent(result=result)

    @step
    async def combine_results(
        self, ctx: Context, ev: ResultEvent
    ) -> StopEvent | None:
        num_to_collect = await ctx.store.get("num_to_collect")
        results = ctx.collect_events(ev, [ResultEvent] * num_to_collect)
        if results is None:
            return None
        combined_result = ", ".join([event.result for event in results])
        return StopEvent(result=combined_result)
In both workflows:
- The start method initializes the context and sends multiple ProcessEvent instances.
- The process_data method:
  - uses @step(num_workers=1) in SequentialWorkflow, so queued items are processed one at a time
  - uses @step(num_workers=3) in ParallelWorkflow, allowing up to 3 instances of the step to execute simultaneously
- The combine_results method collects all processing results and merges them.
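Because llama-index-utils-workflow was installed above, you can optionally render the event graph of either workflow before running it to confirm which events each step consumes and emits. This is a small optional sketch; the output filename is arbitrary:
from llama_index.utils.workflow import draw_all_possible_flows

# Writes an interactive HTML rendering of the step/event graph for inspection.
draw_all_possible_flows(ParallelWorkflow, filename="parallel_workflow.html")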
Run the Workflows¶
Finally, we can run both workflows and compare their timings:
import time

sequential_workflow = SequentialWorkflow()

print(
    "Start a sequential workflow without setting num_workers in the step of process_data"
)
start_time = time.time()
result = await sequential_workflow.run()
end_time = time.time()
print(f"Workflow result: {result}")
print(f"Time taken: {end_time - start_time} seconds")
print("-" * 30)

parallel_workflow = ParallelWorkflow()

print(
    "Start a parallel workflow with setting num_workers in the step of process_data"
)
start_time = time.time()
result = await parallel_workflow.run()
end_time = time.time()
print(f"Workflow result: {result}")
print(f"Time taken: {end_time - start_time} seconds")
Start a sequential workflow without setting num_workers in the step of process_data
Completed processing: A
Completed processing: B
Completed processing: C
Workflow result: Processed: A, Processed: B, Processed: C
Time taken: 7.439495086669922 seconds
------------------------------
Start a parallel workflow with setting num_workers in the step of process_data
Completed processing: C
Completed processing: A
Completed processing: B
Workflow result: Processed: C, Processed: A, Processed: B
Time taken: 2.5881590843200684 seconds
Note¶
- With num_workers=1, the items are processed one at a time and the run takes roughly 6-9 seconds in total. With num_workers=3, the items are processed in parallel, 3 at a time, and the total time drops to roughly 2-3 seconds.
- In ParallelWorkflow, the order in which results complete may differ from the input order, depending on how long each task happens to take (see the sketch below for one way to make the final ordering deterministic).
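If the combined result needs to follow the original input order rather than the completion order, one option (not part of the original example) is to sort the collected results before joining them. A minimal sketch, as a hypothetical subclass that only overrides combine_results and relies on the input value being embedded in each result string:
class OrderedParallelWorkflow(ParallelWorkflow):
    # Hypothetical variant: same collection logic as above, but the joined
    # string is sorted so "Processed: A" always precedes "Processed: B".
    @step
    async def combine_results(
        self, ctx: Context, ev: ResultEvent
    ) -> StopEvent | None:
        num_to_collect = await ctx.store.get("num_to_collect")
        results = ctx.collect_events(ev, [ResultEvent] * num_to_collect)
        if results is None:
            return None
        combined_result = ", ".join(sorted(event.result for event in results))
        return StopEvent(result=combined_result)
For inputs that do not sort naturally, you could instead carry an explicit index field on ProcessEvent and ResultEvent and sort on that.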
This example demonstrates the difference in execution speed with and without num_workers, and how to implement parallel processing inside a workflow. By setting the num_workers parameter, we can control the degree of parallelism, which is useful whenever you need to balance performance against resource usage.
Checkpointing¶
Checkpointing a parallel-execution Workflow like the one defined above is also possible. To do so, we wrap the Workflow in a WorkflowCheckpointer object and perform runs through that instance. Checkpoints are stored in this wrapper object as the workflow executes, and they can be inspected and used as starting points for subsequent runs.
from llama_index.core.workflow.checkpointer import WorkflowCheckpointer
wflow_ckptr = WorkflowCheckpointer(workflow=parallel_workflow)
handler = wflow_ckptr.run()
await handler
Completed processing: C
Completed processing: A
Completed processing: B
'Processed: C, Processed: A, Processed: B'
The checkpoints for the run above are stored in the WorkflowCheckpointer.checkpoints dict attribute.
for run_id, ckpts in wflow_ckptr.checkpoints.items():
print(f"Run: {run_id} has {[c.last_completed_step for c in ckpts]}")
Run: 90812bec-b571-4513-8ad5-aa957ad7d4fb has ['process_data', 'process_data', 'process_data', 'combine_results']
We can run from any stored checkpoint using the WorkflowCheckpointer.run_from(checkpoint=...) method. Let's take the first checkpoint, stored after the first completion of "process_data", and run from it.
ckpt = wflow_ckptr.checkpoints[run_id][0]
handler = wflow_ckptr.run_from(ckpt)
await handler
Completed processing: B
Completed processing: A
'Processed: C, Processed: B, Processed: A'
Invoking run_from or run creates a new entry in the checkpoints attribute. In the latest run, started from the specified checkpoint, we can see that only two more "process_data" steps and the final "combine_results" step were left to complete.
for run_id, ckpts in wflow_ckptr.checkpoints.items():
print(f"Run: {run_id} has {[c.last_completed_step for c in ckpts]}")
Run: 90812bec-b571-4513-8ad5-aa957ad7d4fb has ['process_data', 'process_data', 'process_data', 'combine_results']
Run: 4e1d24cd-c672-4ed1-bb5b-b9f1a252abed has ['process_data', 'process_data', 'combine_results']
Now, if we use the checkpoint associated with the second completion of "process_data" from the same initial run as the starting point, we should see a new entry that contains only two steps: "process_data" and "combine_results".
# get the run_id of the first initial run
first_run_id = next(iter(wflow_ckptr.checkpoints.keys()))
first_run_id
'90812bec-b571-4513-8ad5-aa957ad7d4fb'
ckpt = wflow_ckptr.checkpoints[first_run_id][1]  # checkpoint after the second "process_data" step
handler = wflow_ckptr.run_from(ckpt)
await handler
Completed processing: B
'Processed: C, Processed: A, Processed: B'
for run_id, ckpts in wflow_ckptr.checkpoints.items():
print(f"Run: {run_id} has {[c.last_completed_step for c in ckpts]}")
Run: 90812bec-b571-4513-8ad5-aa957ad7d4fb has ['process_data', 'process_data', 'process_data', 'combine_results']
Run: 4e1d24cd-c672-4ed1-bb5b-b9f1a252abed has ['process_data', 'process_data', 'combine_results']
Run: e4f94fcd-9b78-4e28-8981-e0232d068f6e has ['process_data', 'combine_results']
Similarly, if we start from the checkpoint stored after the third completion of "process_data" in the initial run, we should see only the final "combine_results" step in the new entry.
ckpt = wflow_ckptr.checkpoints[first_run_id][2]  # checkpoint after the third "process_data" step
handler = wflow_ckptr.run_from(ckpt)
await handler
'Processed: C, Processed: A, Processed: B'
for run_id, ckpts in wflow_ckptr.checkpoints.items():
print(f"Run: {run_id} has {[c.last_completed_step for c in ckpts]}")
Run: 90812bec-b571-4513-8ad5-aa957ad7d4fb has ['process_data', 'process_data', 'process_data', 'combine_results']
Run: 4e1d24cd-c672-4ed1-bb5b-b9f1a252abed has ['process_data', 'process_data', 'combine_results']
Run: e4f94fcd-9b78-4e28-8981-e0232d068f6e has ['process_data', 'combine_results']
Run: c498a1a0-cf4c-4d80-a1e2-a175bb90b66d has ['combine_results']
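If you need to pick a checkpoint by the step it follows rather than by its position in the list, you can filter on the same last_completed_step attribute printed above. A minimal sketch using only the objects already shown:
# Gather, across all recorded runs, every checkpoint taken right after a
# "process_data" step completed.
process_data_ckpts = [
    ckpt
    for ckpts in wflow_ckptr.checkpoints.values()
    for ckpt in ckpts
    if ckpt.last_completed_step == "process_data"
]
print(f"{len(process_data_ckpts)} checkpoints follow a 'process_data' step")
Any of these can then be passed to run_from in the same way as the indexed checkpoints above.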