Skip to content

工作流的并发执行#

除了循环和分支之外,工作流还可以并发执行步骤。当您有多个可以独立运行的步骤,并且这些步骤包含需要await的耗时操作时,这种特性特别有用,它允许其他步骤并行运行。

触发多个事件#

在目前的示例中,我们每个步骤只触发一个事件。但很多情况下您会希望并行运行多个步骤。为此,您需要触发多个事件。可以使用send_event实现:

class ParallelFlow(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> StepTwoEvent | None:
        ctx.send_event(StepTwoEvent(query="Query 1"))
        ctx.send_event(StepTwoEvent(query="Query 2"))
        ctx.send_event(StepTwoEvent(query="Query 3"))

    @step(num_workers=4)
    async def step_two(self, ctx: Context, ev: StepTwoEvent) -> StopEvent:
        print("Running slow query ", ev.query)
        await asyncio.sleep(random.randint(1, 5))

        return StopEvent(result=ev.query)

在这个示例中,start步骤触发了3个StepTwoEventstep_two步骤通过num_workers=4装饰器指定,表示该步骤最多可并发运行4个实例(这是默认值)。

收集事件#

执行前一个示例时,您会注意到工作流会在最先完成的查询处停止。有时这很有用,但有时您可能希望等待所有耗时操作完成后再继续下一步。可以使用collect_events实现:

class ConcurrentFlow(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> StepTwoEvent | None:
        ctx.send_event(StepTwoEvent(query="Query 1"))
        ctx.send_event(StepTwoEvent(query="Query 2"))
        ctx.send_event(StepTwoEvent(query="Query 3"))

    @step(num_workers=4)
    async def step_two(self, ctx: Context, ev: StepTwoEvent) -> StepThreeEvent:
        print("Running query ", ev.query)
        await asyncio.sleep(random.randint(1, 5))
        return StepThreeEvent(result=ev.query)

    @step
    async def step_three(
        self, ctx: Context, ev: StepThreeEvent
    ) -> StopEvent | None:
        # wait until we receive 3 events
        result = ctx.collect_events(ev, [StepThreeEvent] * 3)
        if result is None:
            return None

        # do something with all 3 results together
        print(result)
        return StopEvent(result="Done")

collect_events方法位于Context上,它接收触发步骤的事件和一个需要等待的事件类型数组。本例中我们等待3个相同StepThreeEvent类型的事件。

每次接收到StepThreeEvent时都会触发step_three步骤,但collect_events会返回None直到收集完所有3个事件。之后步骤会继续执行,您可以对所有3个结果进行统一处理。

collect_events返回的result是一个按接收顺序排列的事件数组。

多事件类型#

当然,您不必等待相同类型的事件。可以等待任意事件组合,如下例所示:

class ConcurrentFlow(Workflow):
    @step
    async def start(
        self, ctx: Context, ev: StartEvent
    ) -> StepAEvent | StepBEvent | StepCEvent | None:
        ctx.send_event(StepAEvent(query="Query 1"))
        ctx.send_event(StepBEvent(query="Query 2"))
        ctx.send_event(StepCEvent(query="Query 3"))

    @step
    async def step_a(self, ctx: Context, ev: StepAEvent) -> StepACompleteEvent:
        print("Doing something A-ish")
        return StepACompleteEvent(result=ev.query)

    @step
    async def step_b(self, ctx: Context, ev: StepBEvent) -> StepBCompleteEvent:
        print("Doing something B-ish")
        return StepBCompleteEvent(result=ev.query)

    @step
    async def step_c(self, ctx: Context, ev: StepCEvent) -> StepCCompleteEvent:
        print("Doing something C-ish")
        return StepCCompleteEvent(result=ev.query)

    @step
    async def step_three(
        self,
        ctx: Context,
        ev: StepACompleteEvent | StepBCompleteEvent | StepCCompleteEvent,
    ) -> StopEvent:
        print("Received event ", ev.result)

        # wait until we receive 3 events
        if (
            ctx.collect_events(
                ev,
                [StepCCompleteEvent, StepACompleteEvent, StepBCompleteEvent],
            )
            is None
        ):
            return None

        # do something with all 3 results together
        return StopEvent(result="Done")

我们做了几处修改来处理多事件类型:

  • start现在声明为触发3种不同事件类型
  • step_three现在声明为接受3种不同事件类型
  • collect_events现在接收一个需要等待的事件类型数组

注意传递给collect_events的数组中的事件类型顺序很重要。事件将按照传递给collect_events的顺序返回,而非接收顺序。

这个工作流的可视化效果相当直观:

并发工作流

接下来我们将了解如何通过子类化和其他技术扩展工作流。