Skip to content

可观测性#

调试是应用开发中不可或缺的环节,Workflows 提供了多种调试方式。

可视化调试#

最简单的调试方式是可视化,本教程已多次使用该方法。您可随时运行以下代码查看工作流可视化:

from llama_index.utils.workflow import draw_all_possible_flows

draw_all_possible_flows(MyWorkflow, filename="some_filename.html")

这将生成交互式可视化文件some_filename.html,可在任意浏览器中查看。

并发工作流示意图

详细模式#

运行工作流时可添加verbose=True参数,将输出每个步骤的执行名称及其返回的事件类型。以下使用教程前一阶段的ConcurrentWorkflow示例:

class ConcurrentFlow(Workflow):
    @step
    async def start(
        self, ctx: Context, ev: StartEvent
    ) -> StepAEvent | StepBEvent | StepCEvent:
        ctx.send_event(StepAEvent(query="Query 1"))
        ctx.send_event(StepBEvent(query="Query 2"))
        ctx.send_event(StepCEvent(query="Query 3"))

    @step
    async def step_a(self, ctx: Context, ev: StepAEvent) -> StepACompleteEvent:
        print("Doing something A-ish")
        return StepACompleteEvent(result=ev.query)

    @step
    async def step_b(self, ctx: Context, ev: StepBEvent) -> StepBCompleteEvent:
        print("Doing something B-ish")
        return StepBCompleteEvent(result=ev.query)

    @step
    async def step_c(self, ctx: Context, ev: StepCEvent) -> StepCCompleteEvent:
        print("Doing something C-ish")
        return StepCCompleteEvent(result=ev.query)

    @step
    async def step_three(
        self,
        ctx: Context,
        ev: StepACompleteEvent | StepBCompleteEvent | StepCCompleteEvent,
    ) -> StopEvent:
        print("Received event ", ev.result)

        # wait until we receive 3 events
        if (
            ctx.collect_events(
                ev,
                [StepCCompleteEvent, StepACompleteEvent, StepBCompleteEvent],
            )
            is None
        ):
            return None

        # do something with all 3 results together
        return StopEvent(result="Done")

启用详细模式的运行方式:

w = ConcurrentFlow(timeout=10, verbose=True)
result = await w.run()

输出示例如下:

执行步骤 start
步骤 start 未生成事件
执行步骤 step_a
执行A类操作
步骤 step_a 生成事件 StepACompleteEvent
执行步骤 step_b
执行B类操作
步骤 step_b 生成事件 StepBCompleteEvent
执行步骤 step_c
执行C类操作
步骤 step_c 生成事件 StepCCompleteEvent
执行步骤 step_three
收到事件  Query 1
步骤 step_three 未生成事件
执行步骤 step_three
收到事件  Query 2
步骤 step_three 未生成事件
执行步骤 step_three
收到事件  Query 3
步骤 step_three 生成事件 StopEvent

分步执行#

在Notebook环境中,逐步执行工作流很有帮助。可通过调用handler对象的run_step实现:

w = ConcurrentFlow(timeout=10, verbose=True)
handler = w.run(stepwise=True)

# 每次调用`run_step`,工作流将推进并返回上一步生成的所有事件
# 这些事件需手动传播以继续工作流(使用:=运算符赋值给produced_events)
while produced_events := await handler.run_step():
    # 执行至此表示至少有一个待传播事件
    # 通过send_event方法传播
    for ev in produced_events:
        handler.ctx.send_event(ev)

# 执行至此表示工作流已完成
# 可获取最终结果
result = await handler

可多次调用run_step实现单步调试。

最近执行可视化#

分步执行或分支工作流后,可通过draw_most_recent_execution仅可视化刚执行的步骤:

from llama_index.utils.workflow import draw_most_recent_execution

draw_most_recent_execution(w, filename="last_execution.html")

注意此处传入的是工作流实例w而非类名。

检查点机制#

完整工作流执行可能耗时较长,通常只需调试观察特定步骤。为加速开发周期,WorkflowCheckpointer对象会封装工作流并在每个步骤完成后创建存储Checkpoint。这些检查点可查看、调试并作为后续执行的起点。

from llama_index.core.workflow.checkpointer import WorkflowCheckpointer

w = ConcurrentFlow()
w_ckptr = WorkflowCheckpointer(workflow=w)

# 通过检查点器运行工作流
handler = w_cptr.run()
await handler

# 查看最近运行的检查点
w_ckptr.checkpoints[handler.run_id]

# 从指定检查点恢复运行
ckpt = w_ckptr.checkpoints[handler.run_id][0]
handler = w_ckptr.run_from(checkpoint=ckpt)
await handler

第三方工具#

也可使用我们支持的第三方可视化调试工具,如Arize

Arize流程图

最后说明#

本教程最后将介绍使用非绑定函数替代类定义工作流的替代语法。