工作流的并发执行#
除了循环和分支之外,工作流还可以并发执行步骤。当您有多个可以独立运行的步骤,并且这些步骤包含需要await
的耗时操作时,这种特性特别有用,它允许其他步骤并行运行。
触发多个事件#
在目前的示例中,我们每个步骤只触发一个事件。但很多情况下您会希望并行运行多个步骤。为此,您需要触发多个事件。可以使用send_event
实现:
class ParallelFlow(Workflow):
@step
async def start(self, ctx: Context, ev: StartEvent) -> StepTwoEvent | None:
ctx.send_event(StepTwoEvent(query="Query 1"))
ctx.send_event(StepTwoEvent(query="Query 2"))
ctx.send_event(StepTwoEvent(query="Query 3"))
@step(num_workers=4)
async def step_two(self, ctx: Context, ev: StepTwoEvent) -> StopEvent:
print("Running slow query ", ev.query)
await asyncio.sleep(random.randint(1, 5))
return StopEvent(result=ev.query)
在这个示例中,start
步骤触发了3个StepTwoEvent
。step_two
步骤通过num_workers=4
装饰器指定,表示该步骤最多可并发运行4个实例(这是默认值)。
收集事件#
执行前一个示例时,您会注意到工作流会在最先完成的查询处停止。有时这很有用,但有时您可能希望等待所有耗时操作完成后再继续下一步。可以使用collect_events
实现:
class ConcurrentFlow(Workflow):
@step
async def start(self, ctx: Context, ev: StartEvent) -> StepTwoEvent | None:
ctx.send_event(StepTwoEvent(query="Query 1"))
ctx.send_event(StepTwoEvent(query="Query 2"))
ctx.send_event(StepTwoEvent(query="Query 3"))
@step(num_workers=4)
async def step_two(self, ctx: Context, ev: StepTwoEvent) -> StepThreeEvent:
print("Running query ", ev.query)
await asyncio.sleep(random.randint(1, 5))
return StepThreeEvent(result=ev.query)
@step
async def step_three(
self, ctx: Context, ev: StepThreeEvent
) -> StopEvent | None:
# wait until we receive 3 events
result = ctx.collect_events(ev, [StepThreeEvent] * 3)
if result is None:
return None
# do something with all 3 results together
print(result)
return StopEvent(result="Done")
collect_events
方法位于Context
上,它接收触发步骤的事件和一个需要等待的事件类型数组。本例中我们等待3个相同StepThreeEvent
类型的事件。
每次接收到StepThreeEvent
时都会触发step_three
步骤,但collect_events
会返回None
直到收集完所有3个事件。之后步骤会继续执行,您可以对所有3个结果进行统一处理。
collect_events
返回的result
是一个按接收顺序排列的事件数组。
多事件类型#
当然,您不必等待相同类型的事件。可以等待任意事件组合,如下例所示:
class ConcurrentFlow(Workflow):
@step
async def start(
self, ctx: Context, ev: StartEvent
) -> StepAEvent | StepBEvent | StepCEvent | None:
ctx.send_event(StepAEvent(query="Query 1"))
ctx.send_event(StepBEvent(query="Query 2"))
ctx.send_event(StepCEvent(query="Query 3"))
@step
async def step_a(self, ctx: Context, ev: StepAEvent) -> StepACompleteEvent:
print("Doing something A-ish")
return StepACompleteEvent(result=ev.query)
@step
async def step_b(self, ctx: Context, ev: StepBEvent) -> StepBCompleteEvent:
print("Doing something B-ish")
return StepBCompleteEvent(result=ev.query)
@step
async def step_c(self, ctx: Context, ev: StepCEvent) -> StepCCompleteEvent:
print("Doing something C-ish")
return StepCCompleteEvent(result=ev.query)
@step
async def step_three(
self,
ctx: Context,
ev: StepACompleteEvent | StepBCompleteEvent | StepCCompleteEvent,
) -> StopEvent:
print("Received event ", ev.result)
# wait until we receive 3 events
if (
ctx.collect_events(
ev,
[StepCCompleteEvent, StepACompleteEvent, StepBCompleteEvent],
)
is None
):
return None
# do something with all 3 results together
return StopEvent(result="Done")
我们做了几处修改来处理多事件类型:
start
现在声明为触发3种不同事件类型step_three
现在声明为接受3种不同事件类型collect_events
现在接收一个需要等待的事件类型数组
注意传递给collect_events
的数组中的事件类型顺序很重要。事件将按照传递给collect_events
的顺序返回,而非接收顺序。
这个工作流的可视化效果相当直观:
接下来我们将了解如何通过子类化和其他技术扩展工作流。