Skip to content

数据摄取管道#

IngestionPipeline 采用 Transformations(数据转换)概念来处理输入数据。这些转换会应用于输入数据,生成的节点要么被返回,要么被插入到向量数据库中(如果指定了数据库)。每个节点+转换的组合会被缓存,这样后续运行(如果缓存被持久化)相同节点+转换组合时就可以使用缓存结果,从而节省时间。

要查看 IngestionPipeline 的实际交互示例,请参阅 RAG CLI

使用模式#

最简单的使用方式是像这样实例化一个 IngestionPipeline

from llama_index.core import Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline, IngestionCache

# 创建包含转换操作的管道
pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=25, chunk_overlap=0),
        TitleExtractor(),
        OpenAIEmbedding(),
    ]
)

# 运行管道
nodes = pipeline.run(documents=[Document.example()])

注意在实际场景中,您应该通过 SimpleDirectoryReader 或 Llama Hub 中的其他阅读器获取文档。

连接向量数据库#

运行摄取管道时,您也可以选择将生成的节点自动插入远程向量存储。

之后,您可以从该向量存储构建索引。

from llama_index.core import Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline
from llama_index.vector_stores.qdrant import QdrantVectorStore

import qdrant_client

client = qdrant_client.QdrantClient(location=":memory:")
vector_store = QdrantVectorStore(client=client, collection_name="test_store")

pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=25, chunk_overlap=0),
        TitleExtractor(),
        OpenAIEmbedding(),
    ],
    vector_store=vector_store,
)

# 直接摄取到向量数据库
pipeline.run(documents=[Document.example()])

# 创建索引
from llama_index.core import VectorStoreIndex

index = VectorStoreIndex.from_vector_store(vector_store)

在管道中计算嵌入向量#

注意在上例中,嵌入向量是作为管道的一部分计算的。如果您将管道连接到向量存储,嵌入向量必须是管道的一个阶段,否则后续索引实例化会失败。

如果您没有连接到向量存储(即仅生成节点列表),可以从管道中省略嵌入向量计算。

缓存机制#

IngestionPipeline 中,每个节点+转换的组合会被哈希并缓存。这可以节省后续使用相同数据时的运行时间。

以下部分介绍关于缓存的基本用法。

本地缓存管理#

创建管道后,您可能需要存储和加载缓存。

# 保存
pipeline.persist("./pipeline_storage")

# 加载并恢复状态
new_pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=25, chunk_overlap=0),
        TitleExtractor(),
    ],
)
new_pipeline.load("./pipeline_storage")

# 由于缓存存在会立即运行
nodes = pipeline.run(documents=[Document.example()])

如果缓存过大,可以清除它:

# 删除所有缓存内容
cache.clear()

远程缓存管理#

我们支持多种远程存储后端作为缓存:

  • RedisCache
  • MongoDBCache
  • FirestoreCache

以下是使用 RedisCache 的示例:

from llama_index.core import Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline, IngestionCache
from llama_index.storage.kvstore.redis import RedisKVStore as RedisCache


ingest_cache = IngestionCache(
    cache=RedisCache.from_host_and_port(host="127.0.0.1", port=6379),
    collection="my_test_cache",
)

pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=25, chunk_overlap=0),
        TitleExtractor(),
        OpenAIEmbedding(),
    ],
    cache=ingest_cache,
)

# 直接摄取到向量数据库
nodes = pipeline.run(documents=[Document.example()])

这里不需要持久化步骤,因为所有内容都会实时缓存在指定的远程集合中。

异步支持#

IngestionPipeline 也支持异步操作:

nodes = await pipeline.arun(documents=documents)

文档管理#

为摄取管道附加 docstore 可实现文档管理功能。

使用 document.doc_idnode.ref_doc_id 作为基准点,摄取管道会主动查找重复文档。

工作原理:

  • 存储 doc_id -> document_hash 的映射
  • 如果附加了向量存储:
  • 如果检测到重复 doc_id 且哈希值已更改,文档将被重新处理并更新插入
  • 如果检测到重复 doc_id 但哈希值未变,则跳过该节点
  • 如果未附加向量存储:
  • 检查每个节点的所有现有哈希值
  • 如果发现重复则跳过节点
  • 否则处理该节点

注意: 如果不附加向量存储,我们只能检查并移除重复输入。

from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.storage.docstore import SimpleDocumentStore

pipeline = IngestionPipeline(
    transformations=[...], docstore=SimpleDocumentStore()
)

完整流程请参阅我们的 演示笔记本

另请查看使用 Redis 作为完整摄取栈 的指南。

并行处理#

IngestionPipelinerun 方法支持并行处理。它通过利用 multiprocessing.Pool 将节点批次分配到多个处理器来实现。

要启用并行处理,请将 num_workers 设置为所需的进程数:

from llama_index.core.ingestion import IngestionPipeline

pipeline = IngestionPipeline(
    transformations=[...],
)
pipeline.run(documents=[...], num_workers=4)

相关模块#