基于 NVIDIA NIM 的子问题查询引擎¶
子问题查询引擎能够将单个复杂问题分解为多个子问题,每个子问题可由不同工具进行解答。我们将利用 NVIDIA NIM 技术来驱动子问题生成和答案检索过程。
NVIDIA NIM 微服务¶
NIM 支持跨领域的模型,包括来自社区及 NVIDIA 自身的聊天、嵌入和重排序模型。这些模型经过 NVIDIA 优化,可在 NVIDIA 加速基础设施上实现最佳性能,并以 NIM 微服务形式部署——这是一种开箱即用的预构建容器,只需在 NVIDIA 加速基础设施上执行单一命令即可随处部署。
用户可通过 NVIDIA API 目录 测试 NVIDIA 托管的 NIM 微服务部署方案。测试完成后,企业可使用 NVIDIA AI Enterprise 许可证从 NVIDIA API 目录导出 NIM 微服务,并在本地或云端运行,从而完全掌控自身知识产权和 AI 应用的所有权。
NIM 微服务按模型进行容器镜像封装,通过 NVIDIA NGC 目录以 NGC 容器镜像形式分发。其核心价值在于为 AI 模型推理提供简单、统一且符合行业惯例的 API 接口。
环境配置¶
导入项目依赖项,并从NVIDIA API目录(https://build.nvidia.com)获取我们将使用的两个模型(嵌入模型和重排序模型)的NVIDIA API密钥。
开始前准备:
在托管NVIDIA AI基础模型的NVIDIA平台注册免费账户
选择您需要的模型
在输入区域选择Python标签页,点击
获取API密钥
,然后点击生成密钥
复制并保存生成的密钥为NVIDIA_API_KEY。完成此步骤后,您即可访问相关API端点
安装项目依赖:
- LlamaIndex核心库(基础功能)
- NVIDIA NIM LLM及嵌入模型(用于LLM操作)
llama-index-readers-file
(为SimpleDirectoryReader
中的PDF阅读器提供支持)
!pip install llama-index-core llama-index-llms-nvidia llama-index-embeddings-nvidia llama-index-readers-file llama-index-utils-workflow
引入依赖项作为导入:
import os, json
from llama_index.core import (
SimpleDirectoryReader,
VectorStoreIndex,
StorageContext,
load_index_from_storage,
Settings,
)
from llama_index.core.tools import QueryEngineTool, ToolMetadata
from llama_index.core.workflow import (
step,
Context,
Workflow,
Event,
StartEvent,
StopEvent,
)
from llama_index.core.agent.workflow import ReActAgent
from llama_index.llms.nvidia import NVIDIA
from llama_index.embeddings.nvidia import NVIDIAEmbedding
from llama_index.utils.workflow import draw_all_possible_flows
将子问题查询引擎定义为工作流¶
我们的 StartEvent 会进入
query()
方法,该方法负责处理以下事项:- 接收并存储原始查询
- 存储用于处理查询的 LLM
- 存储支持子问题查询的工具列表
- 将原始问题传递给 LLM,要求其将问题拆分为若干子问题
- 为每个生成的子问题触发
QueryEvent
事件
QueryEvents 会进入
sub_question()
方法,该方法会实例化一个新的 ReAct 代理(包含所有可用工具列表),并让其选择使用哪个工具- 这比 LlamaIndex 内置的 SQQE 稍好,因为后者无法使用多个工具
- 每个 QueryEvent 都会生成一个
AnswerEvent
AnswerEvents 会进入
combine_answers()
方法- 该方法使用
self.collect_events()
等待每个 QueryEvent 返回答案 - 将所有答案组合成最终提示,供 LLM 将其整合为单一响应
- 生成 StopEvent 以返回最终结果
- 该方法使用
class QueryEvent(Event):
question: str
class AnswerEvent(Event):
question: str
answer: str
class SubQuestionQueryEngine(Workflow):
@step
async def query(self, ctx: Context, ev: StartEvent) -> QueryEvent:
if hasattr(ev, "query"):
await ctx.store.set("original_query", ev.query)
print(f"Query is {await ctx.store.get('original_query')}")
if hasattr(ev, "llm"):
await ctx.store.set("llm", ev.llm)
if hasattr(ev, "tools"):
await ctx.store.set("tools", ev.tools)
response = (await ctx.store.get("llm")).complete(
f"""
Given a user question, and a list of tools, output a list of
relevant sub-questions, such that the answers to all the
sub-questions put together will answer the question. Respond
in pure JSON without any markdown, like this:
{{
"sub_questions": [
"What is the population of San Francisco?",
"What is the budget of San Francisco?",
"What is the GDP of San Francisco?"
]
}}
Here is the user question: {await ctx.store.get('original_query')}
And here is the list of tools: {await ctx.store.get('tools')}
"""
)
print(f"Sub-questions are {response}")
response_obj = json.loads(str(response))
sub_questions = response_obj["sub_questions"]
await ctx.store.set("sub_question_count", len(sub_questions))
for question in sub_questions:
self.send_event(QueryEvent(question=question))
return None
@step
async def sub_question(self, ctx: Context, ev: QueryEvent) -> AnswerEvent:
print(f"Sub-question is {ev.question}")
agent = ReActAgent(
tools=await ctx.store.get("tools"),
llm=await ctx.store.get("llm"),
)
response = await agent.run(ev.question)
return AnswerEvent(question=ev.question, answer=str(response))
@step
async def combine_answers(
self, ctx: Context, ev: AnswerEvent
) -> StopEvent | None:
ready = ctx.collect_events(
ev, [AnswerEvent] * await ctx.store.get("sub_question_count")
)
if ready is None:
return None
answers = "\n\n".join(
[
f"Question: {event.question}: \n Answer: {event.answer}"
for event in ready
]
)
prompt = f"""
You are given an overall question that has been split into sub-questions,
each of which has been answered. Combine the answers to all the sub-questions
into a single answer to the original question.
Original question: {await ctx.store.get('original_query')}
Sub-questions and answers:
{answers}
"""
print(f"Final prompt is {prompt}")
response = (await ctx.store.get("llm")).complete(prompt)
print("Final response is", response)
return StopEvent(result=str(response))
draw_all_possible_flows(
SubQuestionQueryEngine, filename="sub_question_query_engine.html"
)
可视化这一流程看起来相当线性,因为它并未体现 query()
可以生成多个并行的 QueryEvents
,这些事件最终会被收集到 combine_answers
中。
,然后将它们传入。
每个工具都是基于单个(非常冗长的)旧金山预算文档构建的独立查询引擎,每份文档都超过300页。为了避免重复运行时耗费时间,我们将生成的索引持久化存储到磁盘。
import getpass
if os.environ.get("NVIDIA_API_KEY", "").startswith("nvapi-"):
print("Valid NVIDIA_API_KEY already in environment. Delete to reset")
else:
nvapi_key = getpass.getpass("NVAPI Key (starts with nvapi-): ")
assert nvapi_key.startswith(
"nvapi-"
), f"{nvapi_key[:5]}... is not a valid key"
os.environ["NVIDIA_API_KEY"] = nvapi_key
folder = "./data/sf_budgets/"
files = os.listdir(folder)
Settings.embed_model = NVIDIAEmbedding(
model="nvidia/nv-embedqa-e5-v5", truncate="END"
)
Settings.llm = NVIDIA()
query_engine_tools = []
for file in files:
year = file.split(" - ")[0]
index_persist_path = f"./storage/budget-{year}/"
if os.path.exists(index_persist_path):
storage_context = StorageContext.from_defaults(
persist_dir=index_persist_path
)
index = load_index_from_storage(storage_context)
else:
documents = SimpleDirectoryReader(
input_files=[folder + file]
).load_data()
index = VectorStoreIndex.from_documents(documents)
index.storage_context.persist(index_persist_path)
engine = index.as_query_engine()
query_engine_tools.append(
QueryEngineTool(
query_engine=engine,
metadata=ToolMetadata(
name=f"budget_{year}",
description=f"You can ask this tool natural-language questions about San Francisco's budget in {year}",
),
)
)
engine = SubQuestionQueryEngine(timeout=120, verbose=True)
result = await engine.run(
llm=Settings.llm,
tools=query_engine_tools,
query="How has the total amount of San Francisco's budget changed from 2016 to 2023?",
)
print(result)
我们的调试输出内容非常详细!你可以看到子问题被生成的过程,然后反复调用 sub_question()
方法,每次调用都会生成一份简短的 ReAct 代理思考与行动日志来回答每个较小的子问题。
可以看到 combine_answers
会多次执行;这些执行是由每个 AnswerEvent
触发的,但在收集完所有 8 个 AnswerEvent
之前就会启动。在最后一次运行时,它会生成完整的提示,整合所有答案并返回最终结果。