支持异步/并行执行的查询管道¶
这里我们将展示具备异步+并行执行能力的查询管道。
我们通过搭建一个实现以下功能的 RAG 管道来演示:
- 将查询同时发送到多个 RAG 查询引擎
- 合并返回结果
在此过程中,我们还将展示一些优雅的结果合并抽象方案(例如我们的 ArgPackComponent())
加载数据¶
以保罗·格雷厄姆的文章为例进行加载。
In [ ]:
Copied!
%pip install llama-index-llms-openai
%pip install llama-index-llms-openai
In [ ]:
Copied!
!wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/paul_graham/paul_graham_essay.txt' -O 'data/paul_graham/paul_graham_essay.txt' -O pg_essay.txt
!wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/paul_graham/paul_graham_essay.txt' -O 'data/paul_graham/paul_graham_essay.txt' -O pg_essay.txt
--2024-01-10 12:31:00-- https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/paul_graham/paul_graham_essay.txt Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.111.133, 185.199.110.133, 185.199.108.133, ... Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.111.133|:443... connected. HTTP request sent, awaiting response... 200 OK Length: 75042 (73K) [text/plain] Saving to: ‘pg_essay.txt’ pg_essay.txt 100%[===================>] 73.28K --.-KB/s in 0.01s 2024-01-10 12:31:00 (6.32 MB/s) - ‘pg_essay.txt’ saved [75042/75042]
In [ ]:
Copied!
from llama_index.core import SimpleDirectoryReader
reader = SimpleDirectoryReader(input_files=["pg_essay.txt"])
documents = reader.load_data()
from llama_index.core import SimpleDirectoryReader
reader = SimpleDirectoryReader(input_files=["pg_essay.txt"])
documents = reader.load_data()
配置查询管道¶
我们建立了一个并行查询管道,可同时执行多种分块尺寸的查询并合并结果。
In [ ]:
Copied!
from llama_index.core.query_pipeline import (
QueryPipeline,
InputComponent,
ArgPackComponent,
)
from typing import Dict, Any, List, Optional
from llama_index.core.llama_pack import BaseLlamaPack
from llama_index.core.llms import LLM
from llama_index.llms.openai import OpenAI
from llama_index.core import Document, VectorStoreIndex
from llama_index.core.response_synthesizers import TreeSummarize
from llama_index.core.schema import NodeWithScore, TextNode
from llama_index.core.node_parser import SentenceSplitter
llm = OpenAI(model="gpt-3.5-turbo")
chunk_sizes = [128, 256, 512, 1024]
query_engines = {}
for chunk_size in chunk_sizes:
splitter = SentenceSplitter(chunk_size=chunk_size, chunk_overlap=0)
nodes = splitter.get_nodes_from_documents(documents)
vector_index = VectorStoreIndex(nodes)
query_engines[str(chunk_size)] = vector_index.as_query_engine(llm=llm)
from llama_index.core.query_pipeline import (
QueryPipeline,
InputComponent,
ArgPackComponent,
)
from typing import Dict, Any, List, Optional
from llama_index.core.llama_pack import BaseLlamaPack
from llama_index.core.llms import LLM
from llama_index.llms.openai import OpenAI
from llama_index.core import Document, VectorStoreIndex
from llama_index.core.response_synthesizers import TreeSummarize
from llama_index.core.schema import NodeWithScore, TextNode
from llama_index.core.node_parser import SentenceSplitter
llm = OpenAI(model="gpt-3.5-turbo")
chunk_sizes = [128, 256, 512, 1024]
query_engines = {}
for chunk_size in chunk_sizes:
splitter = SentenceSplitter(chunk_size=chunk_size, chunk_overlap=0)
nodes = splitter.get_nodes_from_documents(documents)
vector_index = VectorStoreIndex(nodes)
query_engines[str(chunk_size)] = vector_index.as_query_engine(llm=llm)
构建查询管道¶
将输入连接至多个查询引擎,并合并结果。
In [ ]:
Copied!
# construct query pipeline
p = QueryPipeline(verbose=True)
module_dict = {
**query_engines,
"input": InputComponent(),
"summarizer": TreeSummarize(),
"join": ArgPackComponent(
convert_fn=lambda x: NodeWithScore(node=TextNode(text=str(x)))
),
}
p.add_modules(module_dict)
# add links from input to query engine (id'ed by chunk_size)
for chunk_size in chunk_sizes:
p.add_link("input", str(chunk_size))
p.add_link(str(chunk_size), "join", dest_key=str(chunk_size))
p.add_link("join", "summarizer", dest_key="nodes")
p.add_link("input", "summarizer", dest_key="query_str")
# construct query pipeline
p = QueryPipeline(verbose=True)
module_dict = {
**query_engines,
"input": InputComponent(),
"summarizer": TreeSummarize(),
"join": ArgPackComponent(
convert_fn=lambda x: NodeWithScore(node=TextNode(text=str(x)))
),
}
p.add_modules(module_dict)
# add links from input to query engine (id'ed by chunk_size)
for chunk_size in chunk_sizes:
p.add_link("input", str(chunk_size))
p.add_link(str(chunk_size), "join", dest_key=str(chunk_size))
p.add_link("join", "summarizer", dest_key="nodes")
p.add_link("input", "summarizer", dest_key="query_str")
In [ ]:
Copied!
import time
start_time = time.time()
response = await p.arun(input="What did the author do during his time in YC?")
print(str(response))
end_time = time.time()
print(f"Time taken: {end_time - start_time}")
import time
start_time = time.time()
response = await p.arun(input="What did the author do during his time in YC?")
print(str(response))
end_time = time.time()
print(f"Time taken: {end_time - start_time}")
> Running modules and inputs in parallel: Module key: input. Input: input: What did the author do during his time in YC? > Running modules and inputs in parallel: Module key: 128. Input: input: What did the author do during his time in YC? Module key: 256. Input: input: What did the author do during his time in YC? Module key: 512. Input: input: What did the author do during his time in YC? Module key: 1024. Input: input: What did the author do during his time in YC? > Running modules and inputs in parallel: Module key: join. Input: 128: The author worked on solving the problems of startups that were part of the YC program. 256: The author worked on YC's internal software in Arc and also wrote essays during his time in YC. 512: During his time in YC, the author worked on various projects. Initially, he intended to do three things: hack, write essays, and work on YC. However, as YC grew and he became more excited about it, it... 1024: During his time in YC, the author worked on YC's internal software in Arc and wrote essays. He also worked on various projects related to YC, such as helping startups and solving their problems. Addit... > Running modules and inputs in parallel: Module key: summarizer. Input: query_str: What did the author do during his time in YC? nodes: [NodeWithScore(node=TextNode(id_='7e0b0aeb-04e3-4518-b534-2cf68c07ae1f', embedding=None, metadata={}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={}, hash='fe9144af45... During his time in YC, the author worked on various projects, including YC's internal software in Arc and writing essays. He also helped startups and solved their problems, and was involved in disputes between cofounders. Additionally, the author worked hard to ensure the success of YC and dealt with people who maltreated startups. Time taken: 3.943013906478882
In [ ]:
Copied!
# compare with sync method
start_time = time.time()
response = p.run(input="What did the author do during his time in YC?")
print(str(response))
end_time = time.time()
print(f"Time taken: {end_time - start_time}")
# compare with sync method
start_time = time.time()
response = p.run(input="What did the author do during his time in YC?")
print(str(response))
end_time = time.time()
print(f"Time taken: {end_time - start_time}")
> Running module input with input: input: What did the author do during his time in YC? > Running module 128 with input: input: What did the author do during his time in YC? > Running module 256 with input: input: What did the author do during his time in YC? > Running module 512 with input: input: What did the author do during his time in YC? > Running module 1024 with input: input: What did the author do during his time in YC? > Running module join with input: 128: The author worked on solving the problems of startups that were part of the YC program. 256: The author worked on YC's internal software in Arc and also wrote essays. 512: During his time in YC, the author worked on various projects. Initially, he intended to do three things: hack, write essays, and work on YC. However, as YC grew and he became more excited about it, it... 1024: During his time in YC, the author worked on YC's internal software in Arc, wrote essays, and worked on various projects related to YC. He also engaged in solving the problems faced by startups that we... > Running module summarizer with input: query_str: What did the author do during his time in YC? nodes: [NodeWithScore(node=TextNode(id_='4d698e2f-811e-42ce-bd0d-9b5615b0bbfd', embedding=None, metadata={}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={}, hash='fe9144af45... During his time in YC, the author worked on YC's internal software in Arc, wrote essays, and worked on various projects related to YC. He also engaged in solving the problems faced by startups that were part of YC's program. Additionally, the author mentioned working on tasks he didn't particularly enjoy, such as resolving disputes between cofounders and dealing with people who mistreated startups. Time taken: 7.640604019165039