流式事件处理#
工作流可能非常复杂——它们被设计用于处理具有分支和并发逻辑的复杂场景——这意味着完整执行可能需要较长时间。为了给用户提供良好的体验,您可以通过实时流式传输事件来展示进度。工作流在Context
对象中内置了对这一功能的支持。
要实现这个功能,首先引入所有必要的依赖项:
from llama_index.core.workflow import (
StartEvent,
StopEvent,
Workflow,
step,
Event,
Context,
)
import asyncio
from llama_index.llms.openai import OpenAI
from llama_index.utils.workflow import draw_all_possible_flows
接下来为简单的三步工作流设置一些事件,并添加一个用于流式传输进度的事件:
class FirstEvent(Event):
first_output: str
class SecondEvent(Event):
second_output: str
response: str
class ProgressEvent(Event):
msg: str
然后定义一个会发送事件的工作流类:
class MyWorkflow(Workflow):
@step
async def step_one(self, ctx: Context, ev: StartEvent) -> FirstEvent:
ctx.write_event_to_stream(ProgressEvent(msg="Step one is happening"))
return FirstEvent(first_output="First step complete.")
@step
async def step_two(self, ctx: Context, ev: FirstEvent) -> SecondEvent:
llm = OpenAI(model="gpt-4o-mini")
generator = await llm.astream_complete(
"Please give me the first 3 paragraphs of Moby Dick, a book in the public domain."
)
async for response in generator:
# Allow the workflow to stream this piece of response
ctx.write_event_to_stream(ProgressEvent(msg=response.delta))
return SecondEvent(
second_output="Second step complete, full response attached",
response=str(response),
)
@step
async def step_three(self, ctx: Context, ev: SecondEvent) -> StopEvent:
ctx.write_event_to_stream(ProgressEvent(msg="Step three is happening"))
return StopEvent(result="Workflow complete.")
Tip
此处的OpenAI()
假设您已在环境变量中设置了OPENAI_API_KEY
。您也可以通过api_key
参数直接传入密钥。
在step_one
和step_three
中,我们向事件流写入独立事件。在step_two
中,我们使用astream_complete
生成LLM响应的可迭代生成器,然后为LLM返回的每个数据块(大约每个单词对应一个块)生成事件,最后将完整响应传递给step_three
。
要实际获取这些输出,我们需要异步运行工作流并监听事件,如下所示:
async def main():
w = MyWorkflow(timeout=30, verbose=True)
handler = w.run(first_input="Start the workflow.")
async for ev in handler.stream_events():
if isinstance(ev, ProgressEvent):
print(ev.msg)
final_result = await handler
print("Final result", final_result)
draw_all_possible_flows(MyWorkflow, filename="streaming_workflow.html")
if __name__ == "__main__":
asyncio.run(main())
run
方法会在后台运行工作流,而stream_events
将提供写入事件流的任何事件。当事件流传递StopEvent
后停止,之后您可以像往常一样获取工作流的最终结果。
接下来我们将探讨并发执行。