分支与循环#
工作流的一个关键特性是支持分支和循环逻辑,这比基于图的方法更简单灵活。
工作流中的循环#
要创建循环,我们将沿用之前教程中的示例 MyWorkflow
,并新增一个自定义事件类型。我们将其命名为 LoopEvent
(当然可以使用任意名称)。
class LoopEvent(Event):
loop_output: str
现在引入 import random
并修改 step_one
函数,使其随机决定是循环还是继续:
@step
async def step_one(self, ev: StartEvent | LoopEvent) -> FirstEvent | LoopEvent:
if random.randint(0, 1) == 0:
print("Bad thing happened")
return LoopEvent(loop_output="Back to step one.")
else:
print("Good thing happened")
return FirstEvent(first_output="First step complete.")
可视化效果如下:
通过定义适当的事件类型和返回类型,您可以在任意步骤之间创建循环。
工作流中的分支#
与循环密切相关的是分支。正如您已看到的,可以条件性地返回不同事件。以下是一个分为两条路径的工作流示例:
class BranchA1Event(Event):
payload: str
class BranchA2Event(Event):
payload: str
class BranchB1Event(Event):
payload: str
class BranchB2Event(Event):
payload: str
class BranchWorkflow(Workflow):
@step
async def start(self, ev: StartEvent) -> BranchA1Event | BranchB1Event:
if random.randint(0, 1) == 0:
print("Go to branch A")
return BranchA1Event(payload="Branch A")
else:
print("Go to branch B")
return BranchB1Event(payload="Branch B")
@step
async def step_a1(self, ev: BranchA1Event) -> BranchA2Event:
print(ev.payload)
return BranchA2Event(payload=ev.payload)
@step
async def step_b1(self, ev: BranchB1Event) -> BranchB2Event:
print(ev.payload)
return BranchB2Event(payload=ev.payload)
@step
async def step_a2(self, ev: BranchA2Event) -> StopEvent:
print(ev.payload)
return StopEvent(result="Branch A complete.")
@step
async def step_b2(self, ev: BranchB2Event) -> StopEvent:
print(ev.payload)
return StopEvent(result="Branch B complete.")
导入部分与之前相同,但我们新增了4个事件类型。start
步骤随机选择分支路径,随后各分支中的多个步骤完成工作流。可视化如下:
您当然可以按任意顺序组合分支和循环来满足应用需求。本教程后续将介绍如何使用 send_event
并行运行多个分支,并通过 collect_events
实现同步。
接下来我们将学习如何使用 Context 维护状态。