Nile 向量存储(多租户 PostgreSQL)¶
本笔记本演示如何使用基于 Postgres 的向量存储 NileVectorStore
来存储和查询多租户 RAG 应用中的向量嵌入。
什么是 Nile?¶
Nile 是一个 Postgres 数据库,支持按租户执行所有数据库操作(包括自动扩展、分支和备份),并实现完全的客户隔离。
多租户 RAG 应用正日益流行,因为它们在使用大语言模型的同时提供了安全性和隐私性。
然而,管理底层 Postgres 数据库并非易事。每个租户单独使用数据库(DB-per-tenant)成本高昂且管理复杂,而共享数据库(shared-DB)存在安全和隐私问题,同时也会限制 RAG 应用的可扩展性和性能。Nile 对 Postgres 进行了重构,实现了两全其美——既具备 DB-per-tenant 的隔离性,又拥有 shared-DB 的成本效益和开发体验。
在共享数据库中存储数百万向量可能速度缓慢,且需要大量资源进行索引和查询。但如果在 Nile 的虚拟租户数据库中存储 1000 个租户(每个租户含 1000 个向量),这将变得易于管理。特别是您可以将大型租户部署在专属计算资源上,而小型租户可以高效共享计算资源并按需自动扩展。
开始使用 Nile¶
首先注册 Nile。注册完成后,系统会引导您创建第一个数据库。完成创建后,您将被重定向至新数据库的"查询编辑器"页面。
在该页面点击"Home"(左侧菜单顶部图标),选择"generate credentials"并复制生成的连接字符串,稍后您将需要用到它。
附加资源¶
%pip install llama-index-vector-stores-nile
%pip install /Users/gwen/workspaces/llama_index/llama-index-integrations/vector_stores/llama-index-vector-stores-nile/dist/llama_index_vector_stores_nile-0.1.1.tar.gz
!pip install llama-index
import logging
from llama_index.core import SimpleDirectoryReader, StorageContext
from llama_index.core import VectorStoreIndex
from llama_index.core.vector_stores import (
MetadataFilter,
MetadataFilters,
FilterOperator,
)
from llama_index.vector_stores.nile import NileVectorStore, IndexType
建立与 Nile 数据库的连接¶
假设您已按照前文《Nile 入门指南》中的说明操作,现在应已获得 Nile 数据库的连接字符串。
您可以通过设置名为 NILEDB_SERVICE_URL
的环境变量,或直接在 Python 中配置该连接字符串。
%env NILEDB_SERVICE_URL=postgresql://username:password@us-west-2.db.thenile.dev:5432/niledb
现在,我们将创建一个 NileVectorStore
。请注意,除了 URL 和维度等常规参数外,我们还设置了 tenant_aware=True
。
:fire: NileVectorStore 同时支持租户感知向量存储(隔离各租户的文档)和常规存储(通常用于所有租户均可访问的共享数据)。下文我们将演示租户感知向量存储的功能。
# Get the service url by reading local .env file with NILE_SERVICE_URL variable
import os
NILEDB_SERVICE_URL = os.environ["NILEDB_SERVICE_URL"]
# OR set it explicitly
# NILE_SERVICE_URL = "postgresql://nile:password@db.thenile.dev:5432/nile"
vector_store = NileVectorStore(
service_url=NILEDB_SERVICE_URL,
table_name="documents",
tenant_aware=True,
num_dimensions=1536,
)
配置 OpenAI¶
您可以在 .env
文件中进行设置,也可以直接在 Python 中配置
%env OPENAI_API_KEY=sk-...
# Uncomment and set it explicitly if you prefer not to use .env
# os.environ["OPENAI_API_KEY"] = "sk-..."
多租户相似性搜索¶
为了演示如何使用 LlamaIndex 和 Nile 实现多租户相似性搜索,我们将下载两份文档——每份文档分别包含不同公司的销售电话记录。Nexiv 提供 IT 服务,而 ModaMart 从事零售业务。我们将为每份文档添加租户标识符,并将其加载到支持多租户的向量存储中。随后,我们将针对每个租户查询该存储库。您将看到同一个问题如何生成两种不同的响应,因为系统会为每个租户检索不同的文档。
下载数据¶
!mkdir -p data
!wget "https://raw.githubusercontent.com/niledatabase/niledatabase/main/examples/ai/sales_insight/data/transcripts/nexiv-solutions__0_transcript.txt" -O "data/nexiv-solutions__0_transcript.txt"
!wget "https://raw.githubusercontent.com/niledatabase/niledatabase/main/examples/ai/sales_insight/data/transcripts/modamart__0_transcript.txt" -O "data/modamart__0_transcript.txt"
加载文档¶
我们将使用 LlamaIndex 的 SimpleDirectoryReader
来加载文档。由于需要在加载后为文档添加租户元数据,我们将为每个租户单独使用一个读取器。
reader = SimpleDirectoryReader(
input_files=["data/nexiv-solutions__0_transcript.txt"]
)
documents_nexiv = reader.load_data()
reader = SimpleDirectoryReader(input_files=["data/modamart__0_transcript.txt"])
documents_modamart = reader.load_data()
通过租户元数据丰富文档内容¶
我们将创建两个 Nile 租户,并将每个租户的 ID 添加到文档元数据中。同时还会添加一些额外的元数据,例如自定义文档 ID 和分类标签。这些元数据可在检索过程中用于筛选文档。当然,在您自己的应用程序中,您也可以为现有租户加载文档,并添加任何您认为有用的元数据信息。
tenant_id_nexiv = str(vector_store.create_tenant("nexiv-solutions"))
tenant_id_modamart = str(vector_store.create_tenant("modamart"))
# Add the tenant id to the metadata
for i, doc in enumerate(documents_nexiv, start=1):
doc.metadata["tenant_id"] = tenant_id_nexiv
doc.metadata[
"category"
] = "IT" # We will use this to apply additional filters in a later example
doc.id_ = f"nexiv_doc_id_{i}" # We are also setting a custom id, this is optional but can be useful
for i, doc in enumerate(documents_modamart, start=1):
doc.metadata["tenant_id"] = tenant_id_modamart
doc.metadata["category"] = "Retail"
doc.id_ = f"modamart_doc_id_{i}"
使用 NileVectorStore 创建 VectorStore 索引¶
我们将所有文档加载至同一个 VectorStoreIndex
。由于在初始设置时已创建支持租户隔离的 NileVectorStore
,Nile 会正确利用元数据中的 tenant_id
字段实现文档隔离。
若尝试将不含 tenant_id
的文档加载至支持租户隔离的存储,系统将抛出 ValueException
异常。
storage_context = StorageContext.from_defaults(vector_store=vector_store)
index = VectorStoreIndex.from_documents(
documents_nexiv + documents_modamart,
storage_context=storage_context,
show_progress=True,
)
/Users/gwen/.pyenv/versions/3.10.15/lib/python3.10/site-packages/tqdm/auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html from .autonotebook import tqdm as notebook_tqdm Parsing nodes: 100%|██████████| 2/2 [00:00<00:00, 1129.32it/s] Generating embeddings: 100%|██████████| 2/2 [00:00<00:00, 4.58it/s]
为每个租户查询索引¶
如下所示,我们可以为每个查询指定租户,从而仅获取与该租户相关且专属的查询结果
nexiv_query_engine = index.as_query_engine(
similarity_top_k=3,
vector_store_kwargs={
"tenant_id": str(tenant_id_nexiv),
},
)
print(nexiv_query_engine.query("What were the customer pain points?"))
The customer pain points were related to managing customer data using multiple platforms, leading to data discrepancies, time-consuming reconciliation efforts, and decreased productivity.
modamart_query_engine = index.as_query_engine(
similarity_top_k=3,
vector_store_kwargs={
"tenant_id": str(tenant_id_modamart),
},
)
print(modamart_query_engine.query("What were the customer pain points?"))
The customer's pain points were concerns about the quality and value of the winter jackets, skepticism towards reviews, worries about sizing and fit when ordering clothes online, and the desire for a warm but lightweight jacket.
查询现有嵌入向量¶
在上述示例中,我们通过加载和嵌入新文档创建了索引。但如果我们已经生成嵌入向量并将其存储在 Nile 中,该如何操作呢?
这种情况下,你仍需要像上文那样初始化 NileVectorStore
,但不再使用 VectorStoreIndex.from_documents(...)
,而是改用以下方式:
index = VectorStoreIndex.from_vector_store(vector_store=vector_store)
query_engine = index.as_query_engine(
vector_store_kwargs={
"tenant_id": str(tenant_id_modamart),
},
)
response = query_engine.query("What action items do we need to follow up on?")
print(response)
The action items to follow up on include sending the customer detailed testimonials about the lightweight and warm qualities of the jackets, providing the customer with a sizing guide, and emailing the customer a 10% discount on their first purchase.
使用 ANN 索引实现近似最近邻搜索¶
Nile 支持 pgvector 提供的所有索引类型 - IVFFlat 和 HNSW。IVFFlat 速度更快、资源占用更少且调参简单。HNSW 在创建和使用时消耗更多资源,调参更具挑战性,但在准确性与速度之间提供了出色的平衡。虽然仅有两篇文档的示例实际上并不需要索引,但我们仍将演示如何使用这些索引。
IVFFlat 索引¶
IVFFlat 索引的工作原理是将向量空间划分为称为"列表"的区域,首先找到最近的列表,然后在这些列表中搜索最近的邻居。在创建索引时需指定列表数量(nlists
),而在查询时,可以指定搜索中使用多少个最近列表(ivfflat_probes
)。
try:
vector_store.create_index(index_type=IndexType.PGVECTOR_IVFFLAT, nlists=10)
except Exception as e:
# This will throw an error if the index already exists, which may be expected
print(e)
nexiv_query_engine = index.as_query_engine(
similarity_top_k=3,
vector_store_kwargs={
"tenant_id": str(tenant_id_nexiv),
"ivfflat_probes": 10,
},
)
print(
nexiv_query_engine.query("What action items do we need to follow up on?")
)
vector_store.drop_index()
Index documents_embedding_idx already exists
HNSW 索引¶
HNSW 索引的工作原理是将向量空间划分为多层图结构,每层包含不同粒度级别的节点连接关系。搜索时,算法会从粗粒度层逐步导航至细粒度层,从而在数据中识别最近邻。创建索引时,您需要指定每层的最大连接数(m
)以及构建图结构时考虑的候选向量数量(ef_construction
)。执行查询时,可指定待搜索候选列表的大小(hnsw_ef
)。
try:
vector_store.create_index(
index_type=IndexType.PGVECTOR_HNSW, m=16, ef_construction=64
)
except Exception as e:
# This will throw an error if the index already exists, which may be expected
print(e)
nexiv_query_engine = index.as_query_engine(
similarity_top_k=3,
vector_store_kwargs={
"tenant_id": str(tenant_id_nexiv),
"hnsw_ef": 10,
},
)
print(nexiv_query_engine.query("Did we mention any pricing?"))
vector_store.drop_index()
filters = MetadataFilters(
filters=[
MetadataFilter(
key="category", operator=FilterOperator.EQ, value="Retail"
),
]
)
nexiv_query_engine_filtered = index.as_query_engine(
similarity_top_k=3,
filters=filters,
vector_store_kwargs={"tenant_id": str(tenant_id_nexiv)},
)
print(
"test query on nexiv with filter on category = Retail (should return empty): ",
nexiv_query_engine_filtered.query("What were the customer pain points?"),
)
test query on nexiv with filter on category = Retail (should return empty): Empty Response
删除文档¶
删除文档可能相当重要。特别是当您的某些租户位于需要遵守 GDPR(通用数据保护条例)的地区时。
ref_doc_id = "nexiv_doc_id_1"
vector_store.delete(ref_doc_id, tenant_id=tenant_id_nexiv)
# Query the data again
print(
"test query on nexiv after deletion (should return empty): ",
nexiv_query_engine.query("What were the customer pain points?"),
)
test query on nexiv after deletion (should return empty): Empty Response