Build a Live RAG Pipeline over Google Drive Files
This guide shows you how to build a "live" RAG (retrieval-augmented generation) pipeline over Google Drive files.
This pipeline indexes Google Drive files and stores them in a Redis vector store. On every subsequent run of the ingestion pipeline, only deltas are propagated, so only the documents that have changed are updated in the vector store. This means we don't have to re-index every document!
We use the following data source. You will need to copy these files and upload them to your own Google Drive directory!
Note: you will also need to set up a service account and a credentials.json file. For more details, see the page for our Google Drive loader on LlamaHub: https://llamahub.ai/l/readers/llama-index-readers-google?from=readers
Setup
We install the required packages and launch the Redis Docker image.
In [ ]:
%pip install llama-index-storage-docstore-redis
%pip install llama-index-vector-stores-redis
%pip install llama-index-embeddings-huggingface
%pip install llama-index-readers-google
In [ ]:
# if creating a new container
!docker run -d --name redis-stack -p 6379:6379 -p 8001:8001 redis/redis-stack:latest
# # if starting an existing container
# !docker start -a redis-stack
d32273cc1267d3221afa780db0edcd6ce5eee08ae88886f645410b9a220d4916
In [ ]:
import os
os.environ["OPENAI_API_KEY"] = "sk-..."
Define the Ingestion Pipeline
Here we define the ingestion pipeline. Given a set of documents, we apply a sentence-splitting and embedding transformation, then load the results into a Redis docstore and vector store.
The vector store indexes the data and stores the embeddings, while the docstore is used to track duplicates.
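The duplicate-tracking idea behind the upsert strategy can be sketched in plain Python. This is an illustrative model, not LlamaIndex's actual implementation: the docstore remembers a content fingerprint per document ID, and a document is re-processed only when its ID is new or its content has changed.

```python
import hashlib


def doc_hash(text: str) -> str:
    # Fingerprint the document content; any edit yields a new hash
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


# Docstore state left behind by a previous run: doc_id -> content hash
docstore = {"report.txt": doc_hash("v1 contents")}


def needs_reprocessing(doc_id: str, text: str) -> bool:
    # Re-chunk/re-embed only if the doc is new or its content changed
    return docstore.get(doc_id) != doc_hash(text)


print(needs_reprocessing("report.txt", "v1 contents"))  # unchanged -> False
print(needs_reprocessing("report.txt", "v2 contents"))  # edited    -> True
print(needs_reprocessing("notes.txt", "brand new"))     # new file  -> True
```

This is why re-running the pipeline below only touches documents whose content actually changed.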
In [ ]:
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core.ingestion import (
DocstoreStrategy,
IngestionPipeline,
IngestionCache,
)
from llama_index.storage.kvstore.redis import RedisKVStore as RedisCache
from llama_index.storage.docstore.redis import RedisDocumentStore
from llama_index.core.node_parser import SentenceSplitter
from llama_index.vector_stores.redis import RedisVectorStore
from redisvl.schema import IndexSchema
In [ ]:
embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")
In [ ]:
custom_schema = IndexSchema.from_dict(
{
"index": {"name": "gdrive", "prefix": "doc"},
# customize fields that are indexed
"fields": [
# required fields for llamaindex
{"type": "tag", "name": "id"},
{"type": "tag", "name": "doc_id"},
{"type": "text", "name": "text"},
# custom vector field for bge-small-en-v1.5 embeddings
{
"type": "vector",
"name": "vector",
"attrs": {
"dims": 384,
"algorithm": "hnsw",
"distance_metric": "cosine",
},
},
],
}
)
vector_store = RedisVectorStore(
schema=custom_schema,
redis_url="redis://localhost:6379",
)
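Two details in the schema above have to agree with the embedding model: `dims` must equal the model's output size (384 for `BAAI/bge-small-en-v1.5`), and `distance_metric` decides how query vectors are compared to stored vectors. Here is a minimal sketch of the cosine similarity underlying the `cosine` metric (cosine distance is one minus this value); the toy 3-d vectors simply stand in for 384-d embeddings:

```python
import math


def cosine_similarity(a, b):
    # cos(theta) = (a . b) / (|a| * |b|); 1.0 means same direction
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)


v1 = [1.0, 0.0, 2.0]
print(cosine_similarity(v1, [1.0, 0.0, 2.0]))   # identical  -> 1.0
print(cosine_similarity(v1, [-2.0, 0.0, 1.0]))  # orthogonal -> 0.0
```

If `dims` does not match the embedding model's output size, indexing will fail, so swapping embedding models means updating the schema accordingly.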
In [ ]:
# Optional: clear vector store if exists
if vector_store.index_exists():
vector_store.delete_index()
In [ ]:
# Set up the ingestion cache layer
cache = IngestionCache(
cache=RedisCache.from_host_and_port("localhost", 6379),
collection="redis_cache",
)
In [ ]:
pipeline = IngestionPipeline(
transformations=[
SentenceSplitter(),
embed_model,
],
docstore=RedisDocumentStore.from_host_and_port(
"localhost", 6379, namespace="document_store"
),
vector_store=vector_store,
cache=cache,
docstore_strategy=DocstoreStrategy.UPSERTS,
)
Define the Vector Store Index
We define our index as a thin wrapper around the underlying vector store.
In [ ]:
from llama_index.core import VectorStoreIndex
index = VectorStoreIndex.from_vector_store(
pipeline.vector_store, embed_model=embed_model
)
In [ ]:
from llama_index.readers.google import GoogleDriveReader
In [ ]:
loader = GoogleDriveReader()
In [ ]:
def load_data(folder_id: str):
docs = loader.load_data(folder_id=folder_id)
for doc in docs:
doc.id_ = doc.metadata["file_name"]
return docs
docs = load_data(folder_id="1RFhr3-KmOZCR5rtp4dlOMNl3LKe1kOA5")
# print(docs)
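The `doc.id_ = doc.metadata["file_name"]` line above matters for the upsert behavior: freshly loaded `Document` objects get auto-generated random IDs, so without a stable ID the same file would look brand new on every run. A small illustrative sketch of the problem, using raw `uuid4` values rather than LlamaIndex's own ID generation:

```python
import uuid


# Auto-generated IDs differ on every load, so the same file would
# never match its record from the previous ingestion run:
print(str(uuid.uuid4()) == str(uuid.uuid4()))  # False


# Keying on a stable attribute such as the file name fixes this:
def stable_id(metadata: dict) -> str:
    return metadata["file_name"]


run_1 = stable_id({"file_name": "report.txt"})
run_2 = stable_id({"file_name": "report.txt"})
print(run_1 == run_2)  # True
```

Note that this assumes file names are unique within the Drive folder; files with duplicate names would collide under this scheme.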
In [ ]:
nodes = pipeline.run(documents=docs)
print(f"Ingested {len(nodes)} Nodes")
Since this is the first time we've started the vector store, we can see that every document has been transformed and ingested (chunked and embedded).
Ask Questions over the Initial Data
In [ ]:
query_engine = index.as_query_engine()
In [ ]:
response = query_engine.query("What are the sub-types of question answering?")
In [ ]:
print(str(response))
The sub-types of question answering mentioned in the context are semantic search and summarization.
Modify and Reload the Data
In [ ]:
docs = load_data(folder_id="1RFhr3-KmOZCR5rtp4dlOMNl3LKe1kOA5")
nodes = pipeline.run(documents=docs)
print(f"Ingested {len(nodes)} Nodes")
Notice that only one node was ingested here. This is because only one document changed while the others stayed the same, so we only need to re-transform and re-embed that single document!
Ask Questions over the New Data
In [ ]:
query_engine = index.as_query_engine()
In [ ]:
response = query_engine.query("What are the sub-types of question answering?")
In [ ]:
print(str(response))
The sub-types of question answering mentioned in the context are semantic search, summarization, and structured analytics.