Ingestion Pipeline + Document Management¶
Attaching a docstore to the ingestion pipeline enables document management.
Using the document.doc_id or node.ref_doc_id as a grounding point, the pipeline actively checks for duplicate documents.
It works as follows:
- store a map of doc_id -> document_hash
- if a duplicate doc_id is detected, and the hash has changed, the document will be re-processed
- if the hash is unchanged, the document is skipped in the pipeline
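The steps above can be sketched in plain Python (a minimal, library-free illustration; the `decide` function and `seen` map are hypothetical stand-ins, only the doc_id -> hash check mirrors the pipeline's behavior):

```python
import hashlib


def decide(doc_id: str, text: str, seen: dict) -> str:
    """Return what the pipeline would do with this document."""
    doc_hash = hashlib.sha256(text.encode("utf-8")).hexdigest()
    if doc_id not in seen:
        seen[doc_id] = doc_hash
        return "process"  # new doc_id: process it
    if seen[doc_id] != doc_hash:
        seen[doc_id] = doc_hash
        return "reprocess"  # same doc_id, new hash: re-process
    return "skip"  # same doc_id, same hash: skip


seen = {}
print(decide("test1.txt", "This is a test file: one!", seen))      # process
print(decide("test1.txt", "This is a test file: one!", seen))      # skip
print(decide("test1.txt", "This is a NEW test file: one!", seen))  # reprocess
```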
If no vector store is attached, the pipeline can only check for and skip duplicate inputs.
If a vector store is attached, it can also handle upserts! We have a separate guide covering upserts with vector stores.
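To illustrate what an upsert means here, consider a pure-Python sketch (the `upsert` helper and the dict standing in for a vector store are hypothetical): an upsert first deletes any nodes previously derived from that ref_doc_id, then inserts the freshly processed nodes.

```python
def upsert(store: dict, ref_doc_id: str, node_texts: list) -> dict:
    """Replace all nodes that came from ref_doc_id with new ones."""
    # delete stale nodes derived from this document
    store = {nid: n for nid, n in store.items() if n["ref_doc_id"] != ref_doc_id}
    # insert the freshly processed nodes
    for i, text in enumerate(node_texts):
        store[f"{ref_doc_id}#{i}"] = {"ref_doc_id": ref_doc_id, "text": text}
    return store


store = upsert({}, "test1.txt", ["This is a test file: one!"])
store = upsert(store, "test1.txt", ["This is a NEW test file: one!"])
print(len(store))  # still one node: the old version was replaced, not duplicated
```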
Create Seed Data¶
In [ ]:
%pip install llama-index-storage-docstore-redis
%pip install llama-index-storage-docstore-mongodb
%pip install llama-index-embeddings-huggingface
In [ ]:
# Make some test data
!mkdir -p data
!echo "This is a test file: one!" > data/test1.txt
!echo "This is a test file: two!" > data/test2.txt
In [ ]:
from llama_index.core import SimpleDirectoryReader
# load documents with deterministic IDs
documents = SimpleDirectoryReader("./data", filename_as_id=True).load_data()
Create Pipeline with Document Store¶
In [ ]:
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.storage.docstore import SimpleDocumentStore

# alternative docstore backends
from llama_index.storage.docstore.redis import RedisDocumentStore
from llama_index.storage.docstore.mongodb import MongoDocumentStore

pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(),
        HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5"),
    ],
    docstore=SimpleDocumentStore(),
)
In [ ]:
nodes = pipeline.run(documents=documents)
Docstore strategy set to upserts, but no vector store. Switching to duplicates_only strategy.
In [ ]:
print(f"Ingested {len(nodes)} Nodes")
Ingested 2 Nodes
In [ ]:
pipeline.persist("./pipeline_storage")
In [ ]:
pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(),
        HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5"),
    ]
)

# restore the pipeline
pipeline.load("./pipeline_storage")
In [ ]:
!echo "This is a test file: three!" > data/test3.txt
!echo "This is a NEW test file: one!" > data/test1.txt
In [ ]:
documents = SimpleDirectoryReader("./data", filename_as_id=True).load_data()
In [ ]:
nodes = pipeline.run(documents=documents)
Docstore strategy set to upserts, but no vector store. Switching to duplicates_only strategy.
In [ ]:
print(f"Ingested {len(nodes)} Nodes")
Ingested 2 Nodes
Let's confirm which nodes were ingested:
In [ ]:
for node in nodes:
    print(f"Node: {node.text}")
Node: This is a NEW test file: one!
Node: This is a test file: three!
We can also verify that the docstore is only tracking three documents:
In [ ]:
print(len(pipeline.docstore.docs))
3