Skip to content

分支与循环#

工作流的一个关键特性是支持分支和循环逻辑,这比基于图的方法更简单灵活。

工作流中的循环#

要创建循环,我们将沿用之前教程中的示例 MyWorkflow,并新增一个自定义事件类型。我们将其命名为 LoopEvent(当然可以使用任意名称)。

class LoopEvent(Event):
    loop_output: str

现在引入 import random 并修改 step_one 函数,使其随机决定是循环还是继续:

@step
async def step_one(self, ev: StartEvent | LoopEvent) -> FirstEvent | LoopEvent:
    if random.randint(0, 1) == 0:
        print("Bad thing happened")
        return LoopEvent(loop_output="Back to step one.")
    else:
        print("Good thing happened")
        return FirstEvent(first_output="First step complete.")

可视化效果如下:

简单循环

通过定义适当的事件类型和返回类型,您可以在任意步骤之间创建循环。

工作流中的分支#

与循环密切相关的是分支。正如您已看到的,可以条件性地返回不同事件。以下是一个分为两条路径的工作流示例:

class BranchA1Event(Event):
    payload: str


class BranchA2Event(Event):
    payload: str


class BranchB1Event(Event):
    payload: str


class BranchB2Event(Event):
    payload: str


class BranchWorkflow(Workflow):
    @step
    async def start(self, ev: StartEvent) -> BranchA1Event | BranchB1Event:
        if random.randint(0, 1) == 0:
            print("Go to branch A")
            return BranchA1Event(payload="Branch A")
        else:
            print("Go to branch B")
            return BranchB1Event(payload="Branch B")

    @step
    async def step_a1(self, ev: BranchA1Event) -> BranchA2Event:
        print(ev.payload)
        return BranchA2Event(payload=ev.payload)

    @step
    async def step_b1(self, ev: BranchB1Event) -> BranchB2Event:
        print(ev.payload)
        return BranchB2Event(payload=ev.payload)

    @step
    async def step_a2(self, ev: BranchA2Event) -> StopEvent:
        print(ev.payload)
        return StopEvent(result="Branch A complete.")

    @step
    async def step_b2(self, ev: BranchB2Event) -> StopEvent:
        print(ev.payload)
        return StopEvent(result="Branch B complete.")

导入部分与之前相同,但我们新增了4个事件类型。start 步骤随机选择分支路径,随后各分支中的多个步骤完成工作流。可视化如下:

简单分支

您当然可以按任意顺序组合分支和循环来满足应用需求。本教程后续将介绍如何使用 send_event 并行运行多个分支,并通过 collect_events 实现同步。

接下来我们将学习如何使用 Context 维护状态