Multi-Agent Workflow with the Weaviate QueryAgent
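In this example, we'll build a LlamaIndex agent workflow that ends up as a multi-agent system. It acts as a docs assistant that can:
- Write new content to the "LlamaIndexDocs" collection in Weaviate
- Write new content to the "WeaviateDocs" collection in Weaviate
- Use the Weaviate QueryAgent to answer questions based on the contents of these collections
The QueryAgent is a full agent product offered by Weaviate. It can run regular searches as well as aggregations over the collections it has access to. Our "orchestrator" agent decides when to call the Weaviate QueryAgent, and it leaves the job of creating Weaviate-specific search queries to the QueryAgent.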
What you'll need:
- An OpenAI API key (or switch to another provider and adjust the code below)
- A Weaviate Sandbox (free)
- Your Weaviate Sandbox URL and API key
Install and Import Dependencies
!pip install llama-index-core llama-index-utils-workflow weaviate-client[agents] llama-index-llms-openai llama-index-readers-web
from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Event,
    Context,
)
from llama_index.utils.workflow import draw_all_possible_flows
from llama_index.readers.web import SimpleWebPageReader
from llama_index.core.llms import ChatMessage
from llama_index.core.tools import FunctionTool
from llama_index.llms.openai import OpenAI
from llama_index.core.agent.workflow import FunctionAgent
from enum import Enum
from pydantic import BaseModel, Field
from typing import List, Union
import json
import weaviate
from weaviate.auth import Auth
from weaviate.agents.query import QueryAgent
from weaviate.classes.config import Configure, Property, DataType
import os
from getpass import getpass
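Set Up Weaviate
To use the Weaviate QueryAgent, first create a Weaviate Cloud account 👇
- Create a Serverless Weaviate Cloud account and set up a free Sandbox
- Go to "Embedding" and enable it. By default, this uses Snowflake/snowflake-arctic-embed-l-v2.0 as the embedding model
- Take note of the WEAVIATE_URL and WEAVIATE_API_KEY so you can connect to your cluster below
Info: We recommend using Weaviate Embeddings so that you don't need to provide any extra keys for external embedding providers.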
if "WEAVIATE_API_KEY" not in os.environ:
    os.environ["WEAVIATE_API_KEY"] = getpass("Add Weaviate API Key")
if "WEAVIATE_URL" not in os.environ:
    os.environ["WEAVIATE_URL"] = getpass("Add Weaviate URL")
client = weaviate.connect_to_weaviate_cloud(
    cluster_url=os.environ.get("WEAVIATE_URL"),
    auth_credentials=Auth.api_key(os.environ.get("WEAVIATE_API_KEY")),
)
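Create the WeaviateDocs and LlamaIndexDocs Collections
The helper function below creates the "WeaviateDocs" and "LlamaIndexDocs" collections in Weaviate. If either collection already exists, the function deletes it and creates it again. The function also sets up a QueryAgent that has access to both collections.
Weaviate's QueryAgent is designed to query Weaviate collections. It handles both regular searches and aggregations, and it deals internally with the complexity of building Weaviate-specific queries.
The agent uses the collection descriptions as well as the property descriptions when it builds queries.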
def fresh_setup_weaviate(client):
    if client.collections.exists("WeaviateDocs"):
        client.collections.delete("WeaviateDocs")
    client.collections.create(
        "WeaviateDocs",
        description="A dataset with the contents of Weaviate technical Docs and website",
        vectorizer_config=Configure.Vectorizer.text2vec_weaviate(),
        properties=[
            Property(
                name="url",
                data_type=DataType.TEXT,
                description="the source URL of the webpage",
            ),
            Property(
                name="text",
                data_type=DataType.TEXT,
                description="the content of the webpage",
            ),
        ],
    )
    if client.collections.exists("LlamaIndexDocs"):
        client.collections.delete("LlamaIndexDocs")
    client.collections.create(
        "LlamaIndexDocs",
        description="A dataset with the contents of LlamaIndex technical Docs and website",
        vectorizer_config=Configure.Vectorizer.text2vec_weaviate(),
        properties=[
            Property(
                name="url",
                data_type=DataType.TEXT,
                description="the source URL of the webpage",
            ),
            Property(
                name="text",
                data_type=DataType.TEXT,
                description="the content of the webpage",
            ),
        ],
    )
    agent = QueryAgent(
        client=client, collections=["LlamaIndexDocs", "WeaviateDocs"]
    )
    return agent
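The two collections share the same two properties and differ only in name and description. Because of that, both `create` calls could be driven from a single spec. The following is a minimal, stdlib-only sketch of that idea. `docs_collection_spec` is a hypothetical helper, and the dict it returns is only a plain description of the schema. It is not a Weaviate API object.

```python
# Hypothetical helper: describe both docs collections from one definition
# instead of repeating the schema. Plain dicts only; nothing is sent to Weaviate.
from typing import Any


def docs_collection_spec(name: str, product: str) -> dict[str, Any]:
    """Return a plain-dict description of a docs collection."""
    return {
        "name": name,
        # The QueryAgent reads collection and property descriptions when it
        # plans queries, so keep them specific to the product.
        "description": f"A dataset with the contents of {product} technical Docs and website",
        "properties": [
            {"name": "url", "data_type": "text", "description": "the source URL of the webpage"},
            {"name": "text", "data_type": "text", "description": "the content of the webpage"},
        ],
    }


SPECS = [
    docs_collection_spec("WeaviateDocs", "Weaviate"),
    docs_collection_spec("LlamaIndexDocs", "LlamaIndex"),
]

for spec in SPECS:
    print(spec["name"], "->", [p["name"] for p in spec["properties"]])
```

In `fresh_setup_weaviate`, each property entry would still have to be turned into a `Property(...)` before being passed to `client.collections.create`. The sketch only shows how the duplication could be factored out.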
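Write the Contents of Webpages to the Collections
The helper function below uses SimpleWebPageReader to write the contents of a list of webpages into the given Weaviate collection.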
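def write_webpages_to_weaviate(client, urls: list[str], collection_name: str):
    # Fetch each page and convert its HTML to plain text
    documents = SimpleWebPageReader(html_to_text=True).load_data(urls)
    collection = client.collections.get(collection_name)
    # Dynamic batching lets the client adjust the batch size; each object
    # stores the page URL (the document id) and the page text
    with collection.batch.dynamic() as batch:
        for doc in documents:
            batch.add_object(properties={"url": doc.id_, "text": doc.text})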
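Each loaded page becomes one `{"url", "text"}` object in the batch. The following stdlib-only sketch shows that mapping, with two optional guards: it skips pages with no text and URLs that appear twice. `Page` is a hypothetical stand-in for a LlamaIndex `Document`, and only the `id_` and `text` fields are used. Like the helper above, the sketch assumes that `id_` holds the page URL.

```python
# Hypothetical stand-in for a LlamaIndex Document: only id_ and text are used.
from dataclasses import dataclass


@dataclass
class Page:
    id_: str   # assumed to be the page URL, as the helper above relies on
    text: str


def to_weaviate_objects(pages: list[Page]) -> list[dict[str, str]]:
    """Map pages to the property dicts a batch would receive.

    Pages with empty text and repeated URLs are skipped, so a single batch
    never stores the same URL twice.
    """
    seen: set[str] = set()
    objects: list[dict[str, str]] = []
    for page in pages:
        if not page.text.strip() or page.id_ in seen:
            continue  # nothing worth embedding, or already queued
        seen.add(page.id_)
        objects.append({"url": page.id_, "text": page.text})
    return objects
```

Each dict in the result has the shape that `batch.add_object(properties=...)` receives in the helper above.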
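Create a Function Calling Agent
Now that we have the functions for writing to our collections and the QueryAgent at hand, we can start with the FunctionAgent, which is a simple tool-calling agent.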
if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass("openai-key")
weaviate_agent = fresh_setup_weaviate(client)
llm = OpenAI(model="gpt-4o-mini")
# Type annotations (not default values) tell the agent what each tool expects
def write_to_weaviate_collection(urls: list[str]):
    """Useful for writing new content to the WeaviateDocs collection"""
    write_webpages_to_weaviate(client, urls, "WeaviateDocs")
def write_to_li_collection(urls: list[str]):
    """Useful for writing new content to the LlamaIndexDocs collection"""
    write_webpages_to_weaviate(client, urls, "LlamaIndexDocs")
def query_agent(query: str) -> str:
    """Useful for asking questions about Weaviate and LlamaIndex"""
    response = weaviate_agent.run(query)
    return response.final_answer
agent = FunctionAgent(
    tools=[write_to_weaviate_collection, write_to_li_collection, query_agent],
    llm=llm,
    system_prompt="""You are a helpful assistant that can write the
contents of urls to WeaviateDocs and LlamaIndexDocs collections,
as well as forwarding questions to a QueryAgent""",
)
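The tool functions take `urls` as a list of URLs. One easy slip is writing `urls=list[str]`, which makes the type object a default value, when `urls: list[str]`, a type annotation, was intended. LlamaIndex derives a function tool's argument schema from the function signature. As far as we understand, without the annotation the LLM is not told that `urls` should be a list of strings. The following stdlib-only check shows how differently Python records the two forms.

```python
import inspect


def with_default(urls=list[str]):  # the type object becomes the *default* value
    ...


def with_annotation(urls: list[str]):  # the type is recorded as the annotation
    ...


for fn in (with_default, with_annotation):
    param = inspect.signature(fn).parameters["urls"]
    annotated = param.annotation is not inspect.Parameter.empty
    has_default = param.default is not inspect.Parameter.empty
    print(f"{fn.__name__}: annotated={annotated}, has default={has_default}")
```

The first function has no annotation but a default. The second function is annotated and has no default, which is what a tool schema needs.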
response = await agent.run(
user_msg="Can you save https://docs.llamaindex.ai/en/stable/examples/agent/agent_workflow_basic/"
)
print(str(response))
response = await agent.run(
user_msg="""What are llama index workflows? And can you save
these to weaviate docs: https://weaviate.io/blog/what-are-agentic-workflows
and https://weaviate.io/blog/ai-agents"""
)
print(str(response))
Llama Index workflows refer to orchestrations involving one or more AI agents within the LlamaIndex framework. These workflows manage complex tasks dynamically by leveraging components such as large language models (LLMs), tools, and memory states. Key features of Llama Index workflows include: - Support for single or multiple agents managed within an AgentWorkflow orchestrator. - Ability to maintain state across runs via serializable context objects. - Integration of external tools with type annotations, including asynchronous functions. - Streaming of intermediate outputs and event-based interactions. - Human-in-the-loop capabilities to confirm or guide agent actions during workflow execution. These workflows enable agents to execute sequences of operations, call external tools asynchronously, maintain conversation or task states, stream partial results, and incorporate human inputs when necessary. They embody dynamic, agent-driven sequences of task decomposition, tool use, and reflection, allowing AI systems to plan, act, and improve iteratively toward specific goals. I have also saved the contents from the provided URLs to the WeaviateDocs collection.
response = await agent.run(
user_msg="How many docs do I have in the weaviate and llamaindex collections in total?"
)
print(str(response))
You have a total of 2 documents in the WeaviateDocs collection and 1 document in the LlamaIndexDocs collection. In total, that makes 3 documents across both collections.
# Reset both collections (this deletes the documents saved above) before building the workflow
weaviate_agent = fresh_setup_weaviate(client)
Create a Workflow with Branches
class EvaluateQuery(Event):
    query: str
class WriteLlamaIndexDocsEvent(Event):
    urls: list[str]
class WriteWeaviateDocsEvent(Event):
    urls: list[str]
class QueryAgentEvent(Event):
    query: str
Simple Example: A Branching Workflow (That Doesn't Do Anything Yet)
class DocsAssistantWorkflow(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> EvaluateQuery:
        return EvaluateQuery(query=ev.query)
    @step
    async def evaluate_query(
        self, ctx: Context, ev: EvaluateQuery
    ) -> QueryAgentEvent | WriteLlamaIndexDocsEvent | WriteWeaviateDocsEvent | StopEvent:
        if ev.query == "llama":
            return WriteLlamaIndexDocsEvent(urls=[ev.query])
        if ev.query == "weaviate":
            return WriteWeaviateDocsEvent(urls=[ev.query])
        if ev.query == "question":
            return QueryAgentEvent(query=ev.query)
        return StopEvent()
    @step
    async def write_li_docs(
        self, ctx: Context, ev: WriteLlamaIndexDocsEvent
    ) -> StopEvent:
        print(f"Got a request to write something to LlamaIndexDocs")
        return StopEvent()
    @step
    async def write_weaviate_docs(
        self, ctx: Context, ev: WriteWeaviateDocsEvent
    ) -> StopEvent:
        print(f"Got a request to write something to WeaviateDocs")
        return StopEvent()
    @step
    async def query_agent(
        self, ctx: Context, ev: QueryAgentEvent
    ) -> StopEvent:
        print(f"Got a request to forward a query to the QueryAgent")
        return StopEvent()
workflow_that_does_nothing = DocsAssistantWorkflow()
# draw_all_possible_flows(workflow_that_does_nothing)
print(
await workflow_that_does_nothing.run(start_event=StartEvent(query="llama"))
)
Got a request to write something to LlamaIndexDocs None
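To picture how the branching above works, consider a small dispatcher in which each handler is registered for one event type. The event a handler returns decides which handler runs next, until a stop event ends the run. The following stdlib-only sketch is a toy model of that idea with hypothetical names. It is not how LlamaIndex implements `Workflow`.

```python
# Toy model of event-type dispatch (not LlamaIndex code).
from dataclasses import dataclass


@dataclass
class Start:
    query: str


@dataclass
class WriteLlamaIndexDocs:
    urls: list[str]


@dataclass
class WriteWeaviateDocs:
    urls: list[str]


@dataclass
class AskAgent:
    query: str


@dataclass
class Stop:
    result: object = None  # like a bare StopEvent(), the default result is None


def evaluate(ev: Start):
    # Same keyword routing as evaluate_query above.
    if ev.query == "llama":
        return WriteLlamaIndexDocs(urls=[ev.query])
    if ev.query == "weaviate":
        return WriteWeaviateDocs(urls=[ev.query])
    if ev.query == "question":
        return AskAgent(query=ev.query)
    return Stop()


# One handler per event type; the type of the returned event picks the next one.
HANDLERS = {
    Start: evaluate,
    WriteLlamaIndexDocs: lambda ev: Stop(result="LlamaIndexDocs"),
    WriteWeaviateDocs: lambda ev: Stop(result="WeaviateDocs"),
    AskAgent: lambda ev: Stop(result="QueryAgent"),
}


def run_toy_workflow(query: str):
    ev = Start(query=query)
    while not isinstance(ev, Stop):
        ev = HANDLERS[type(ev)](ev)  # dispatch on the event's type
    return ev.result
```

In this toy, `run_toy_workflow("llama")` ends in the LlamaIndexDocs branch, and an unknown query stops with `None`. The real steps above return a bare `StopEvent()`, which is why the run printed `None` after the message.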
Classify the Query with Structured Outputs
class SaveToLlamaIndexDocs(BaseModel):
    """The URLs to parse and save into a llama-index specific docs collection."""
    llama_index_urls: List[str] = Field(default_factory=list)
class SaveToWeaviateDocs(BaseModel):
    """The URLs to parse and save into a weaviate specific docs collection."""
    weaviate_urls: List[str] = Field(default_factory=list)
class Ask(BaseModel):
    """The natural language questions that can be asked to a Q&A agent."""
    queries: List[str] = Field(default_factory=list)
class Actions(BaseModel):
    """Actions to take based on the latest user message."""
    actions: List[
        Union[SaveToLlamaIndexDocs, SaveToWeaviateDocs, Ask]
    ] = Field(default_factory=list)
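Create the Workflow
Let's create a workflow that still doesn't do anything. This time, however, the incoming user query is converted into our Actions structure, and the workflow decides which step to run based on its contents.
Notice that evaluate_query returns as soon as it reaches the first action, and whichever step runs next returns a StopEvent. As a result, only one action is handled per run. This works, but maybe we can improve on it later!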
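The following stdlib-only sketch makes that limitation concrete. A `return` inside the loop means only the first action ever becomes an event, and an `Ask` with several questions contributes only its first one. Actions are modelled here as hypothetical `(kind, items)` tuples rather than the Pydantic models above. The second function shows what a version that handles every action would emit. Note that it can produce more events than there are actions.

```python
# Actions as (kind, items) tuples, for illustration only:
# "ask" items are questions, other kinds carry URLs.
Action = tuple[str, list[str]]
Emitted = tuple[str, tuple[str, ...]]


def first_event_only(actions: list[Action]) -> list[Emitted]:
    """Mimic evaluate_query above: produce the event for the first action only."""
    for kind, items in actions:
        if kind == "ask":
            for query in items:
                return [("ask", (query,))]  # later questions are never reached
        else:
            return [(kind, tuple(items))]  # later actions are never reached
    return []


def all_events(actions: list[Action]) -> list[Emitted]:
    """One event per save action and one event per question."""
    events: list[Emitted] = []
    for kind, items in actions:
        if kind == "ask":
            events.extend(("ask", (query,)) for query in items)
        else:
            events.append((kind, tuple(items)))
    return events


actions = [
    ("save_llamaindex", ["https://example.com/a", "https://example.com/b"]),
    ("ask", ["What are workflows?", "How do loops work?"]),
]
print(len(first_event_only(actions)), len(all_events(actions)))
```

With two actions, the first function yields one event and the second yields three. Any step that waits for "all" results therefore needs to count events, not actions.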
from llama_index.llms.openai import OpenAIResponses
class DocsAssistantWorkflow(Workflow):
    def __init__(self, *args, **kwargs):
        self.llm = OpenAIResponses(model="gpt-4.1-mini")
        self.system_prompt = """You are a docs assistant. You evaluate incoming queries and break them down to subqueries when needed.
You decide on the next best course of action. Overall, here are the options:
- You can write the contents of a URL to llamaindex docs (if it's a llamaindex url)
- You can write the contents of a URL to weaviate docs (if it's a weaviate url)
- You can answer a question about llamaindex and weaviate using the QueryAgent"""
        super().__init__(*args, **kwargs)
    @step
    async def start(self, ev: StartEvent) -> EvaluateQuery:
        return EvaluateQuery(query=ev.query)
    @step
    async def evaluate_query(
        self, ev: EvaluateQuery
    ) -> QueryAgentEvent | WriteLlamaIndexDocsEvent | WriteWeaviateDocsEvent:
        sllm = self.llm.as_structured_llm(Actions)
        response = await sllm.achat(
            [
                ChatMessage(role="system", content=self.system_prompt),
                ChatMessage(role="user", content=ev.query),
            ]
        )
        actions = response.raw.actions
        print(actions)
        for action in actions:
            if isinstance(action, SaveToLlamaIndexDocs):
                return WriteLlamaIndexDocsEvent(urls=action.llama_index_urls)
            elif isinstance(action, SaveToWeaviateDocs):
                return WriteWeaviateDocsEvent(urls=action.weaviate_urls)
            elif isinstance(action, Ask):
                for query in action.queries:
                    return QueryAgentEvent(query=query)
    @step
    async def write_li_docs(self, ev: WriteLlamaIndexDocsEvent) -> StopEvent:
        print(f"Writing {ev.urls} to LlamaIndex Docs")
        return StopEvent()
    @step
    async def write_weaviate_docs(
        self, ev: WriteWeaviateDocsEvent
    ) -> StopEvent:
        print(f"Writing {ev.urls} to Weaviate Docs")
        return StopEvent()
    @step
    async def query_agent(self, ev: QueryAgentEvent) -> StopEvent:
        print(f"Sending `'{ev.query}`' to agent")
        return StopEvent()
everything_docs_agent_beta = DocsAssistantWorkflow()
async def run_docs_agent_beta(query: str):
    print(
        await everything_docs_agent_beta.run(
            start_event=StartEvent(query=query)
        )
    )
await run_docs_agent_beta(
"""Can you save https://www.llamaindex.ai/blog/get-citations-and-reasoning-for-extracted-data-in-llamaextract
and https://www.llamaindex.ai/blog/llamaparse-update-may-2025-new-models-skew-detection-and-more??"""
)
[SaveToLlamaIndexDocs(llama_index_urls=['https://www.llamaindex.ai/blog/get-citations-and-reasoning-for-extracted-data-in-llamaextract', 'https://www.llamaindex.ai/blog/llamaparse-update-may-2025-new-models-skew-detection-and-more'])] Writing ['https://www.llamaindex.ai/blog/get-citations-and-reasoning-for-extracted-data-in-llamaextract', 'https://www.llamaindex.ai/blog/llamaparse-update-may-2025-new-models-skew-detection-and-more'] to LlamaIndex Docs None
await run_docs_agent_beta(
"How many documents do we have in the LlamaIndexDocs collection now?"
)
[Ask(queries=['How many documents are in the LlamaIndexDocs collection?'])] Sending `'How many documents are in the LlamaIndexDocs collection?`' to agent None
await run_docs_agent_beta("What are LlamaIndex workflows?")
[Ask(queries=['What are LlamaIndex workflows?'])] Sending `'What are LlamaIndex workflows?`' to agent None
await run_docs_agent_beta(
"Can you save https://weaviate.io/blog/graph-rag and https://weaviate.io/blog/genai-apps-with-weaviate-and-databricks??"
)
[SaveToWeaviateDocs(weaviate_urls=['https://weaviate.io/blog/graph-rag', 'https://weaviate.io/blog/genai-apps-with-weaviate-and-databricks'])] Writing ['https://weaviate.io/blog/graph-rag', 'https://weaviate.io/blog/genai-apps-with-weaviate-and-databricks'] to Weaviate Docs None
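Run Multiple Branches and Put It All Together
In cases like these, it makes sense to run multiple branches, which means a single step can trigger several events at once! We can do this with the send_event method on the context, and then gather the results again in a collect step with collect_events 👇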
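One rough way to picture what send_event and collect_events do together is a fan-out/fan-in over a queue. One step starts several handlers, and a collector waits until it has seen as many completions as there were emitted events. The following asyncio sketch is a toy model with hypothetical names (`handle`, `fan_out_fan_in`). It is not how LlamaIndex implements these methods.

```python
# Toy asyncio model of fan-out / fan-in (not LlamaIndex code).
import asyncio


async def handle(event: str, done: asyncio.Queue) -> None:
    await asyncio.sleep(0)  # stand-in for real work: writing pages, querying
    await done.put(f"completed {event}")


async def fan_out_fan_in(events: list[str]) -> list[str]:
    done: asyncio.Queue = asyncio.Queue()
    # Fan out: one concurrently running handler per emitted event.
    tasks = [asyncio.create_task(handle(ev, done)) for ev in events]
    # Fan in: wait for exactly as many completions as events were emitted.
    results = [await done.get() for _ in range(len(events))]
    await asyncio.gather(*tasks)
    return results
```

From a script, you can run it with `asyncio.run(fan_out_fan_in(["write a", "ask q"]))`. Inside this notebook, an event loop is already running, so `await fan_out_fan_in([...])` directly. Results arrive in completion order, which is not necessarily the order in which the events were sent.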
class ActionCompleted(Event):
    result: str
class DocsAssistantWorkflow(Workflow):
    def __init__(self, *args, **kwargs):
        self.llm = OpenAIResponses(model="gpt-4.1-mini")
        self.system_prompt = """You are a docs assistant. You evaluate incoming queries and break them down to subqueries when needed.
You decide on the next best course of action. Overall, here are the options:
- You can write the contents of a URL to llamaindex docs (if it's a llamaindex url)
- You can write the contents of a URL to weaviate docs (if it's a weaviate url)
- You can answer a question about llamaindex and weaviate using the QueryAgent"""
        super().__init__(*args, **kwargs)
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> EvaluateQuery:
        return EvaluateQuery(query=ev.query)
    @step
    async def evaluate_query(
        self, ctx: Context, ev: EvaluateQuery
    ) -> QueryAgentEvent | WriteLlamaIndexDocsEvent | WriteWeaviateDocsEvent | StopEvent | None:
        sllm = self.llm.as_structured_llm(Actions)
        response = await sllm.achat(
            [
                ChatMessage(role="system", content=self.system_prompt),
                ChatMessage(role="user", content=ev.query),
            ]
        )
        actions = response.raw.actions
        print(actions)
        # Build every event first: an Ask action can hold several queries,
        # so there can be more events than actions
        events = []
        for action in actions:
            if isinstance(action, SaveToLlamaIndexDocs):
                events.append(WriteLlamaIndexDocsEvent(urls=action.llama_index_urls))
            elif isinstance(action, SaveToWeaviateDocs):
                events.append(WriteWeaviateDocsEvent(urls=action.weaviate_urls))
            elif isinstance(action, Ask):
                for query in action.queries:
                    events.append(QueryAgentEvent(query=query))
        # collect() waits for one ActionCompleted per emitted event
        await ctx.store.set("num_events", len(events))
        await ctx.store.set("results", [])
        if not events:
            # Nothing to do: stop right away instead of waiting forever
            return StopEvent(result=[])
        # Fan out: each send_event starts its own branch
        for event in events:
            ctx.send_event(event)
    @step
    async def write_li_docs(
        self, ctx: Context, ev: WriteLlamaIndexDocsEvent
    ) -> ActionCompleted:
        print(f"Writing {ev.urls} to LlamaIndex Docs")
        write_webpages_to_weaviate(
            client, urls=ev.urls, collection_name="LlamaIndexDocs"
        )
        results = await ctx.store.get("results")
        results.append(f"Wrote {ev.urls} to LlamaIndex Docs")
        return ActionCompleted(result=f"Writing {ev.urls} to LlamaIndex Docs")
    @step
    async def write_weaviate_docs(
        self, ctx: Context, ev: WriteWeaviateDocsEvent
    ) -> ActionCompleted:
        print(f"Writing {ev.urls} to Weaviate Docs")
        write_webpages_to_weaviate(
            client, urls=ev.urls, collection_name="WeaviateDocs"
        )
        results = await ctx.store.get("results")
        results.append(f"Wrote {ev.urls} to Weaviate Docs")
        return ActionCompleted(result=f"Writing {ev.urls} to Weaviate Docs")
    @step
    async def query_agent(
        self, ctx: Context, ev: QueryAgentEvent
    ) -> ActionCompleted:
        print(f"Sending {ev.query} to agent")
        response = weaviate_agent.run(ev.query)
        results = await ctx.store.get("results")
        results.append(f"QueryAgent responded with:\n {response.final_answer}")
        return ActionCompleted(result=f"Sent '{ev.query}' to the QueryAgent")
    @step
    async def collect(
        self, ctx: Context, ev: ActionCompleted
    ) -> StopEvent | None:
        num_events = await ctx.store.get("num_events")
        # Fan in: buffer ActionCompleted events until one per emitted event has arrived
        evs = ctx.collect_events(ev, [ActionCompleted] * num_events)
        if evs is None:
            return None
        return StopEvent(result=[e.result for e in evs])
everything_docs_agent = DocsAssistantWorkflow(timeout=None)
async def run_docs_agent(query: str):
    handler = everything_docs_agent.run(start_event=StartEvent(query=query))
    result = await handler
    for response in await handler.ctx.store.get("results"):
        print(response)
await run_docs_agent(
    "Can you save https://docs.llamaindex.ai/en/stable/understanding/workflows/ and https://docs.llamaindex.ai/en/stable/understanding/workflows/branches_and_loops/"
)
[SaveToLlamaIndexDocs(llama_index_urls=['https://docs.llamaindex.ai/en/stable/understanding/workflows/']), SaveToLlamaIndexDocs(llama_index_urls=['https://docs.llamaindex.ai/en/stable/understanding/workflows/branches_and_loops/'])] Writing ['https://docs.llamaindex.ai/en/stable/understanding/workflows/'] to LlamaIndex Docs Writing ['https://docs.llamaindex.ai/en/stable/understanding/workflows/branches_and_loops/'] to LlamaIndex Docs Wrote ['https://docs.llamaindex.ai/en/stable/understanding/workflows/'] to LlamaIndex Docs Wrote ['https://docs.llamaindex.ai/en/stable/understanding/workflows/branches_and_loops/'] to LlamaIndex Docs
await run_docs_agent(
"How many documents do we have in the LlamaIndexDocs collection now?"
)
[Ask(queries=['How many documents are in the LlamaIndexDocs collection?'])] Sending How many documents are in the LlamaIndexDocs collection? to agent QueryAgent responded with: The LlamaIndexDocs collection contains 2 documents, specifically related to workflows and branches and loops within the documentation.
await run_docs_agent(
"What are LlamaIndex workflows? And can you save https://weaviate.io/blog/graph-rag"
)
[Ask(queries=['What are LlamaIndex workflows?'])] Sending What are LlamaIndex workflows? to agent QueryAgent responded with: LlamaIndex workflows are an event-driven, step-based framework designed to control and manage the execution flow of complex applications, particularly those involving generative AI. They break an application into discrete Steps, each triggered by Events and capable of emitting further Events, allowing for complex logic involving loops, branches, and parallel execution. In a LlamaIndex workflow, steps perform functions ranging from simple tasks to complex agents, with inputs and outputs communicated via Events. This event-driven model facilitates maintainability and clarity, overcoming limitations of previous approaches like directed acyclic graphs (DAGs) which struggled with complex flows involving loops and branching. Key features include: - **Loops:** Steps can return events that loop back to previous steps to enable iterative processes. - **Branches:** Workflows can branch into different paths based on conditions, allowing for multiple distinct sequences of steps. - **Parallelism:** Multiple branches or steps can run concurrently and synchronize their results. - **State Maintenance:** Workflows support maintaining state and context throughout execution. - **Observability and Debugging:** Supported by various components and callbacks for monitoring. An example workflow might involve judging whether a query is of sufficient quality, looping to improve it if not, then concurrently executing different retrieval-augmented generation (RAG) strategies, and finally judging their responses to produce a single output. Workflows are especially useful as applications grow in complexity, enabling developers to organize and control intricate AI logic more naturally and efficiently than traditional graph-based methods. 
For simpler pipelines, LlamaIndex suggests using workflows optionally, but for advanced agentic applications, workflows provide a flexible and powerful control abstraction.
await run_docs_agent("How do I use loops in llamaindex workflows?")
[Ask(queries=['How to use loops in llamaindex workflows'])] Sending How to use loops in llamaindex workflows to agent QueryAgent responded with: In LlamaIndex workflows, loops are implemented using an event-driven approach where you define custom event types and steps that emit events to control the workflow's execution flow. To create a loop, you define a custom event (e.g., `LoopEvent`) and a workflow step that can return either the event continuing the loop or another event to proceed. For example, a workflow step might randomly decide to either loop back (emit `LoopEvent` again) or continue to a next step emitting a different event. This allows creating flexible looping behaviors where any step can loop back to any other step by returning the corresponding event instances. The approach leverages Python's async functions decorated with `@step`, which process events and return the next event(s), enabling both loops and conditional branching in workflows. Thus, loops in LlamaIndex workflows are event-based, using custom event types and the return of events from steps to signal iterations until a condition is met. Example:
```python
from llamaindex.workflow import Workflow, Event, StartEvent, StopEvent, step
import random
class LoopEvent(Event):
    loop_output: str
class FirstEvent(Event):
    first_output: str
class MyWorkflow(Workflow):
    @step
    async def step_one(self, ev: StartEvent | LoopEvent) -> FirstEvent | LoopEvent:
        if random.randint(0, 1) == 0:
            print("Bad thing happened")
            return LoopEvent(loop_output="Back to step one.")
        else:
            print("Good thing happened")
            return FirstEvent(first_output="First step complete.")
    # ... other steps ...
# Running this workflow will cause step_one to loop randomly until it proceeds.
```
You can combine loops with branching and parallel execution in workflows to build complex control flows. For detailed guidance and examples, consult the LlamaIndex documentation under "Branches and Loops" and the "Workflows" guides.