使用异步 API 的 Milvus 向量存储¶
本教程演示如何将 LlamaIndex 与 Milvus 结合使用,构建用于 RAG 的异步文档处理流水线。LlamaIndex 提供了处理文档并存储到 Milvus 等向量数据库的方法。通过利用 LlamaIndex 的异步 API 和 Milvus Python 客户端库,我们可以提升流水线的吞吐量,从而高效处理和索引海量数据。
在本教程中,我们将首先从宏观层面介绍使用异步方法构建基于 LlamaIndex 和 Milvus 的 RAG,随后深入讲解底层方法的使用,并对同步与异步方式的性能进行比较。
开始之前¶
本页面的代码片段需要依赖 pymilvus 和 llamaindex。您可以通过以下命令安装这些依赖:
! pip install -U pymilvus llama-index-vector-stores-milvus llama-index nest-asyncio
如果您正在使用 Google Colab,为了使刚安装的依赖项生效,您可能需要重启运行时环境(点击屏幕上方的"Runtime"菜单,然后从下拉菜单中选择"Restart session")。
我们将使用 OpenAI 提供的模型。你需要准备 api key OPENAI_API_KEY
作为环境变量。
import os
os.environ["OPENAI_API_KEY"] = "sk-***********"
如果您正在使用 Jupyter Notebook,在运行异步代码之前需要先执行这行代码。
import nest_asyncio
nest_asyncio.apply()
准备数据¶
您可以通过以下命令下载示例数据:
! mkdir -p 'data/'
! wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/paul_graham/paul_graham_essay.txt' -O 'data/paul_graham_essay.txt'
! wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/10k/uber_2021.pdf' -O 'data/uber_2021.pdf'
import asyncio
import random
import time
from llama_index.core.schema import TextNode, NodeRelationship, RelatedNodeInfo
from llama_index.core.vector_stores import VectorStoreQuery
from llama_index.vector_stores.milvus import MilvusVectorStore
URI = "http://localhost:19530"
DIM = 768
- 若需处理大规模数据,可在 docker 或 kubernetes 上部署高性能 Milvus 服务。此配置中请使用服务器 URI(例如
http://localhost:19530
)作为uri
参数。 - 如需使用全托管云服务 Zilliz Cloud,请调整
uri
和token
参数,它们分别对应 Zilliz Cloud 中的 公共终端地址和 API 密钥。 - 在复杂系统(如网络通信)场景中,异步处理相比同步方式能带来性能提升。因此我们认为 Milvus-Lite 不适用于异步接口,因其使用场景并不匹配该特性。
定义一个初始化函数,以便后续可重复使用来重建 Milvus 集合。
def init_vector_store():
return MilvusVectorStore(
uri=URI,
# token=TOKEN,
dim=DIM,
collection_name="test_collection",
embedding_field="embedding",
id_field="id",
similarity_metric="COSINE",
consistency_level="Strong",
overwrite=True, # To overwrite the collection if it already exists
)
vector_store = init_vector_store()
2025-01-24 20:04:39,414 [DEBUG][_create_connection]: Created new connection using: faa8be8753f74288bffc7e6d38942f8a (async_milvus_client.py:600)
使用 SimpleDirectoryReader 从文件 paul_graham_essay.txt
中封装一个 LlamaIndex 文档对象。
from llama_index.core import SimpleDirectoryReader
# load documents
documents = SimpleDirectoryReader(
input_files=["./data/paul_graham_essay.txt"]
).load_data()
print("Document ID:", documents[0].doc_id)
Document ID: 41a6f99c-489f-49ff-9821-14e2561140eb
在本地实例化一个 Hugging Face 嵌入模型。使用本地模型可避免异步数据插入时触及 API 速率限制的风险,因为并发 API 请求会快速累积并耗尽公共 API 的预算额度。但若您拥有较高的速率限制,也可以选择使用远程模型服务。
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-base-en-v1.5")
创建索引并插入文档。
我们将 use_async
设置为 True
以启用异步插入模式。
# Create an index over the documents
from llama_index.core import VectorStoreIndex, StorageContext
storage_context = StorageContext.from_defaults(vector_store=vector_store)
index = VectorStoreIndex.from_documents(
documents,
storage_context=storage_context,
embed_model=embed_model,
use_async=True,
)
初始化大语言模型(LLM)。
from llama_index.llms.openai import OpenAI
llm = OpenAI(model="gpt-3.5-turbo")
在构建查询引擎时,您还可以将 use_async
参数设置为 True
以启用异步搜索。
query_engine = index.as_query_engine(use_async=True, llm=llm)
response = await query_engine.aquery("What did the author learn?")
print(response)
The author learned that the field of artificial intelligence, as practiced at the time, was not as promising as initially believed. The approach of using explicit data structures to represent concepts in AI was not effective in achieving true understanding of natural language. This realization led the author to shift his focus towards Lisp and eventually towards exploring the field of art.
vector_store = init_vector_store()
2025-01-24 20:07:38,727 [DEBUG][_create_connection]: Created new connection using: 5e0d130f3b644555ad7ea6b8df5f1fc2 (async_milvus_client.py:600)
让我们定义一个节点生成函数,该函数将用于为索引生成大量测试节点。
def random_id():
random_num_str = ""
for _ in range(16):
random_digit = str(random.randint(0, 9))
random_num_str += random_digit
return random_num_str
def produce_nodes(num_adding):
node_list = []
for i in range(num_adding):
node = TextNode(
id_=random_id(),
text=f"n{i}_text",
embedding=[0.5] * (DIM - 1) + [random.random()],
relationships={
NodeRelationship.SOURCE: RelatedNodeInfo(node_id=f"n{i+1}")
},
)
node_list.append(node)
return node_list
定义一个异步函数用于向向量存储库添加文档。我们使用 Milvus 向量存储实例中的 async_add()
函数。
async def async_add(num_adding):
node_list = produce_nodes(num_adding)
start_time = time.time()
tasks = []
for i in range(num_adding):
sub_nodes = node_list[i]
task = vector_store.async_add([sub_nodes]) # use async_add()
tasks.append(task)
results = await asyncio.gather(*tasks)
end_time = time.time()
return end_time - start_time
add_counts = [10, 100, 1000]
获取事件循环。
loop = asyncio.get_event_loop()
异步将文档添加至向量存储。
for count in add_counts:
async def measure_async_add():
async_time = await async_add(count)
print(f"Async add for {count} took {async_time:.2f} seconds")
return async_time
loop.run_until_complete(measure_async_add())
Async add for 10 took 0.19 seconds Async add for 100 took 0.48 seconds Async add for 1000 took 3.22 seconds
vector_store = init_vector_store()
2025-01-24 20:07:45,554 [DEBUG][_create_connection]: Created new connection using: b14dde8d6d24489bba26a907593f692d (async_milvus_client.py:600)
同步加法操作的对比¶
定义一个同步加法函数。然后在相同条件下测量其运行时间。
def sync_add(num_adding):
node_list = produce_nodes(num_adding)
start_time = time.time()
for node in node_list:
result = vector_store.add([node])
end_time = time.time()
return end_time - start_time
for count in add_counts:
sync_time = sync_add(count)
print(f"Sync add for {count} took {sync_time:.2f} seconds")
Sync add for 10 took 0.56 seconds Sync add for 100 took 5.85 seconds Sync add for 1000 took 62.91 seconds
结果显示,同步添加过程比异步过程慢得多。
异步搜索¶
在执行搜索前,请重新初始化向量存储并添加若干文档。
vector_store = init_vector_store()
node_list = produce_nodes(num_adding=1000)
inserted_ids = vector_store.add(node_list)
2025-01-24 20:08:57,982 [DEBUG][_create_connection]: Created new connection using: 351dc7ea4fb14d4386cfab02621ab7d1 (async_milvus_client.py:600)
定义一个异步搜索函数。我们使用 Milvus 向量存储实例中的 aquery()
函数。
async def async_search(num_queries):
start_time = time.time()
tasks = []
for _ in range(num_queries):
query = VectorStoreQuery(
query_embedding=[0.5] * (DIM - 1) + [0.6], similarity_top_k=3
)
task = vector_store.aquery(query=query) # use aquery()
tasks.append(task)
results = await asyncio.gather(*tasks)
end_time = time.time()
return end_time - start_time
query_counts = [10, 100, 1000]
异步从 Milvus 存储中搜索。
for count in query_counts:
async def measure_async_search():
async_time = await async_search(count)
print(
f"Async search for {count} queries took {async_time:.2f} seconds"
)
return async_time
loop.run_until_complete(measure_async_search())
Async search for 10 queries took 0.55 seconds Async search for 100 queries took 1.39 seconds Async search for 1000 queries took 8.81 seconds
对比同步搜索¶
定义一个同步搜索函数。随后在相同条件下测量其运行时间。
def sync_search(num_queries):
start_time = time.time()
for _ in range(num_queries):
query = VectorStoreQuery(
query_embedding=[0.5] * (DIM - 1) + [0.6], similarity_top_k=3
)
result = vector_store.query(query=query)
end_time = time.time()
return end_time - start_time
for count in query_counts:
sync_time = sync_search(count)
print(f"Sync search for {count} queries took {sync_time:.2f} seconds")
Sync search for 10 queries took 3.29 seconds Sync search for 100 queries took 30.80 seconds Sync search for 1000 queries took 308.80 seconds
结果显示,同步搜索过程比异步方式慢得多。