Workflows cookbook: walking through all features of Workflows¶
First, we install our dependencies. Core contains most of what we need; OpenAI handles LLM access, and utils-workflow provides the visualization capabilities we'll use later on.
!pip install --upgrade llama-index-core llama-index-llms-openai llama-index-utils-workflow
Then we bring in the dependencies we just installed:
from llama_index.core.workflow import (
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Context,
)
import random
from llama_index.utils.workflow import (
    draw_all_possible_flows,
    draw_most_recent_execution,
)
from llama_index.llms.openai import OpenAI
Set up our OpenAI key, so we can do actual LLM things.
import os
os.environ["OPENAI_API_KEY"] = "sk-proj-..."
Workflow basics¶
Let's start with the basic possibilities of workflows: one that simply starts, does one thing, and stops. There's no reason to use a workflow if your task is this simple, but we're just demonstrating how it works.
from llama_index.llms.openai import OpenAI
class OpenAIGenerator(Workflow):
    @step
    async def generate(self, ev: StartEvent) -> StopEvent:
        llm = OpenAI(model="gpt-4o")
        response = await llm.acomplete(ev.query)
        return StopEvent(result=str(response))
w = OpenAIGenerator(timeout=10, verbose=False)
result = await w.run(query="What's LlamaIndex?")
print(result)
LlamaIndex, formerly known as GPT Index, is a data framework designed to facilitate the connection between large language models (LLMs) and external data sources. It provides tools to index various data types, such as documents, databases, and APIs, enabling LLMs to interact with and retrieve information from these sources more effectively. The framework supports the creation of indices that can be queried by LLMs, enhancing their ability to access and utilize external data in a structured manner. This capability is particularly useful for applications requiring the integration of LLMs with specific datasets or knowledge bases.
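Note the `timeout=10` we passed when constructing the workflow: it bounds the whole run in seconds. Under the hood this is ordinary async timeout behavior; here is a minimal sketch of the idea in plain asyncio (illustrative only, not LlamaIndex's actual implementation; `slow_step` and `run_with_timeout` are names invented for this sketch):

```python
import asyncio

async def slow_step() -> str:
    # simulate a step that takes 0.2 seconds to finish
    await asyncio.sleep(0.2)
    return "done"

async def run_with_timeout(timeout: float) -> str:
    # a workflow-style timeout: abort the run if it exceeds `timeout` seconds
    try:
        return await asyncio.wait_for(slow_step(), timeout=timeout)
    except asyncio.TimeoutError:
        return "workflow timed out"

print(asyncio.run(run_with_timeout(0.05)))  # the step needs 0.2s, so this times out
print(asyncio.run(run_with_timeout(1.0)))   # plenty of time: prints "done"
```

In the real workflow, exceeding the timeout surfaces as an error from `w.run()` rather than a return value, but the mechanism is the same.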
One neat thing about Workflows is that we can use pyvis to visualize them. Let's see what that looks like for this very simple flow.
draw_all_possible_flows(OpenAIGenerator, filename="trivial_workflow.html")
Loops and branches¶
Now let's move on to a more complex workflow that can branch and loop. First we define some custom events:
class FailedEvent(Event):
    error: str

class QueryEvent(Event):
    query: str
class LoopExampleFlow(Workflow):
    @step
    async def answer_query(
        self, ev: StartEvent | QueryEvent
    ) -> FailedEvent | StopEvent:
        query = ev.query
        # try to answer the query
        random_number = random.randint(0, 1)
        if random_number == 0:
            return FailedEvent(error="Failed to answer the query.")
        else:
            return StopEvent(result="The answer to your query")

    @step
    async def improve_query(self, ev: FailedEvent) -> QueryEvent | StopEvent:
        # improve the query or decide it can't be fixed
        random_number = random.randint(0, 1)
        if random_number == 0:
            return QueryEvent(query="Here's a better query.")
        else:
            return StopEvent(result="Your query can't be fixed.")
We're using random numbers here to simulate LLM behavior, so that we get reliably interesting interactions.

answer_query() accepts a start event. It can then do one of two things:
- answer the query directly and emit a StopEvent, which returns the result
- decide the query was bad and emit a FailedEvent

improve_query() accepts a FailedEvent. It likewise has two options:
- decide the query can't be improved and emit a StopEvent, which returns failure
- come up with a better query and emit a QueryEvent, which creates a loop back to answer_query()
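The routing above is driven entirely by event types: each step declares which events it accepts, and emitting an event triggers whichever step accepts that type. The loop can be sketched as a plain-Python dispatch table (illustrative only, not LlamaIndex internals; the `coins` list replaces `random.randint` so the trace is deterministic):

```python
from dataclasses import dataclass

@dataclass
class QueryEvent:
    query: str

@dataclass
class FailedEvent:
    error: str

@dataclass
class StopEvent:
    result: str

def answer_query(ev, coin: int):
    # coin == 0 simulates the LLM failing to answer
    if coin == 0:
        return FailedEvent(error="Failed to answer the query.")
    return StopEvent(result="The answer to your query")

def improve_query(ev, coin: int):
    # coin == 0 simulates a successful rewrite; otherwise give up
    if coin == 0:
        return QueryEvent(query="Here's a better query.")
    return StopEvent(result="Your query can't be fixed.")

def run(query: str, coins: list[int]) -> str:
    # dispatch table: event type -> the step that accepts it
    steps = {QueryEvent: answer_query, FailedEvent: improve_query}
    ev = QueryEvent(query=query)
    for coin in coins:
        ev = steps[type(ev)](ev, coin)
        if isinstance(ev, StopEvent):
            return ev.result
    raise RuntimeError("ran out of coin flips")

# fail once, rewrite the query, then answer it successfully
print(run("What's LlamaIndex?", coins=[0, 0, 1]))  # The answer to your query
```

Emitting a QueryEvent from improve_query routes straight back to answer_query, which is exactly the loop the framework builds for us from the type annotations.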
We can visualize this more complicated workflow too:
draw_all_possible_flows(LoopExampleFlow, filename="loop_workflow.html")
loop_workflow.html
We've set verbose=True here so we can see exactly which events are triggered. You can see it demonstrates looping and then responding.
l = LoopExampleFlow(timeout=10, verbose=True)
result = await l.run(query="What's LlamaIndex?")
print(result)
Running step answer_query
Step answer_query produced event FailedEvent
Running step improve_query
Step improve_query produced event StopEvent
Your query can't be fixed.
Maintaining state between events¶
There is a global state which allows you to keep arbitrary data or functions around for use by all event handlers.
class GlobalExampleFlow(Workflow):
    @step
    async def setup(self, ctx: Context, ev: StartEvent) -> QueryEvent:
        # load our data here
        await ctx.store.set("some_database", ["value1", "value2", "value3"])
        return QueryEvent(query=ev.query)

    @step
    async def query(self, ctx: Context, ev: QueryEvent) -> StopEvent:
        # use our data with our query
        data = await ctx.store.get("some_database")
        result = f"The answer to your query is {data[1]}"
        return StopEvent(result=result)
g = GlobalExampleFlow(timeout=10, verbose=True)
result = await g.run(query="What's LlamaIndex?")
print(result)
Running step setup
Step setup produced event QueryEvent
Running step query
Step query produced event StopEvent
The answer to your query is value2
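`ctx.store` behaves like an async key-value store shared by every step. A minimal sketch of such a store (assumed semantics for illustration, not the actual Context implementation; `SimpleStore` is a name invented here):

```python
import asyncio

class SimpleStore:
    """A tiny async key-value store, similar in spirit to ctx.store."""

    def __init__(self):
        self._data = {}
        self._lock = asyncio.Lock()  # steps may run concurrently

    async def set(self, key, value):
        async with self._lock:
            self._data[key] = value

    async def get(self, key, default=None):
        async with self._lock:
            return self._data.get(key, default)

async def main():
    store = SimpleStore()
    # what the setup step does, then what the query step does
    await store.set("some_database", ["value1", "value2", "value3"])
    data = await store.get("some_database")
    return f"The answer to your query is {data[1]}"

print(asyncio.run(main()))  # The answer to your query is value2
```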
Of course, this flow is still essentially linear. A more realistic example would be if your start event could either be a query or a data population event, and you needed to wait. Let's set that up to see what it looks like:
class WaitExampleFlow(Workflow):
    @step
    async def setup(self, ctx: Context, ev: StartEvent) -> StopEvent:
        if hasattr(ev, "data"):
            await ctx.store.set("data", ev.data)
        return StopEvent(result=None)

    @step
    async def query(self, ctx: Context, ev: StartEvent) -> StopEvent:
        if hasattr(ev, "query"):
            # do we have any data?
            data = await ctx.store.get("data", default=None)
            if data is not None:
                return StopEvent(result=f"Got the data {data}")
            else:
                # there's no data yet
                return None
        else:
            # this isn't a query
            return None
w = WaitExampleFlow(verbose=True)
result = await w.run(query="Can I kick it?")
if result is None:
print("No you can't")
print("---")
result = await w.run(data="Yes you can")
print("---")
result = await w.run(query="Can I kick it?")
print(result)
Running step query
Step query produced no event
Running step setup
Step setup produced event StopEvent
No you can't
---
Running step query
Step query produced no event
Running step setup
Step setup produced event StopEvent
---
Running step query
Step query produced event StopEvent
Running step setup
Step setup produced event StopEvent
Got the data Yes you can
Let's visualize how this flow works:
draw_all_possible_flows(WaitExampleFlow, filename="wait_workflow.html")
wait_workflow.html
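The waiting behavior above boils down to one pattern: a step checks the shared store and produces no event (returns `None`) whenever a prerequisite is missing, so the query simply does nothing until data arrives. A condensed sketch of that guard, using a plain dict in place of `ctx.store` (illustrative only; `store` and `query` are names invented for this sketch):

```python
store: dict = {}

def query(q: str):
    # produce nothing until the "data" prerequisite has been filled in
    data = store.get("data")
    if data is None:
        return None  # no event produced; the run ends with no result
    return f"Got the data {data}"

print(query("Can I kick it?"))   # None: no data yet
store["data"] = "Yes you can"    # the data-bearing run fills the store
print(query("Can I kick it?"))   # Got the data Yes you can
```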

Waiting for one or more events¶
Because the context maintains state, you can also buffer multiple events and wait for all of them using ctx.collect_events(). It captures events and stores them, returning None until all the events it requires have been collected. Those events are then attached to the output of collect_events in the order specified. Let's see it in action:
class InputEvent(Event):
    input: str

class SetupEvent(Event):
    error: bool

class QueryEvent(Event):
    query: str
class CollectExampleFlow(Workflow):
    @step
    async def setup(self, ctx: Context, ev: StartEvent) -> SetupEvent:
        # generically start everything up, once per flow instance
        if not getattr(self, "_is_setup", False):
            self._is_setup = True
            print("I got set up")
        return SetupEvent(error=False)

    @step
    async def collect_input(self, ev: StartEvent) -> InputEvent:
        if hasattr(ev, "input"):
            # perhaps validate the input
            print("I got some input")
            return InputEvent(input=ev.input)

    @step
    async def parse_query(self, ev: StartEvent) -> QueryEvent:
        if hasattr(ev, "query"):
            # parse the query in some way
            print("I got a query")
            return QueryEvent(query=ev.query)

    @step
    async def run_query(
        self, ctx: Context, ev: InputEvent | SetupEvent | QueryEvent
    ) -> StopEvent | None:
        ready = ctx.collect_events(ev, [QueryEvent, InputEvent, SetupEvent])
        if ready is None:
            print("Not enough events yet")
            return None
        # run the query
        print("Now I have all the events")
        print(ready)
        result = f"Ran query '{ready[0].query}' on input '{ready[1].input}'"
        return StopEvent(result=result)
c = CollectExampleFlow()
result = await c.run(input="Here's some input", query="Here's my question")
print(result)
I got some input
I got a query
Not enough events yet
Not enough events yet
Now I have all the events
[QueryEvent(query="Here's my question"), InputEvent(input="Here's some input"), SetupEvent(error=False)]
Ran query 'Here's my question' on input 'Here's some input'
You can see each event getting triggered, and collect_events repeatedly returning None until enough events have arrived. Let's see what this looks like in a flow diagram:
draw_all_possible_flows(CollectExampleFlow, "collect_workflow.html")
collect_workflow.html
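The buffering that collect_events() performs can be sketched in a few lines: keep the most recent pending event of each type, return None until every required type has arrived, then hand back the events in the requested order. This is a sketch of the assumed semantics, not the real implementation (`EventBuffer` is a name invented here):

```python
class EventBuffer:
    """Buffers events by type until a full set has been collected."""

    def __init__(self):
        self._pending = {}  # event type -> most recent event of that type

    def collect(self, ev, expected: list[type]):
        self._pending[type(ev)] = ev
        if not all(t in self._pending for t in expected):
            return None  # not enough events yet
        # return the events in the order the caller asked for
        return [self._pending[t] for t in expected]

buf = EventBuffer()
# using str and int to stand in for two event types
print(buf.collect("some input", [str, int]))  # None: still waiting for an int
print(buf.collect(42, [str, int]))            # ['some input', 42]
```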