路径检索器¶
Pathway 是一个开源的数据处理框架。它能让您轻松开发数据转换流水线和机器学习应用,这些应用可实时处理动态数据源和变化的数据。
本笔记本演示了如何结合 LlamaIndex 使用实时数据索引流水线。您可以通过提供的 PathwayRetriever 在LLM应用中查询该流水线的结果。其底层机制会在每次数据变更时更新索引,确保您始终获得最新答案。
在本示例中,我们将使用一个公开的文档处理流水线演示,该流水线能够:
- 监控多个云端数据源的变更
- 为数据构建向量索引
如需搭建专属文档处理流水线,可查看托管服务方案或自行构建(参照本笔记本实现)。
我们将通过 llama_index.retrievers.pathway.PathwayRetriever 检索器连接索引,该组件实现了 retrieve 接口。
本文档描述的基础流水线可轻松构建云端文件的简易索引。但Pathway还提供构建实时数据流水线和应用所需的完整功能,包括:类SQL操作(如跨数据源的分组聚合与连接)、基于时间的数据分组与窗口计算,以及丰富的连接器支持。
有关Pathway数据摄取流水线和向量存储的更多细节,请访问向量存储流水线。
前提条件¶
要使用 PathwayRetrievier,您必须安装 llama-index-retrievers-pathway 包。
!pip install llama-index-retrievers-pathway
为 llama-index 创建检索器¶
要实例化和配置 PathPathRetriever,您需要提供文档索引管道的 url 或 host 与 port 参数。以下代码示例中,我们使用公开可用的演示管道,其 REST API 可通过 https://demo-document-indexing.pathway.stream 访问。该演示管道会从Google 云端硬盘和Sharepoint摄取文档,并维护用于检索文档的索引。
from llama_index.retrievers.pathway import PathwayRetriever
retriever = PathwayRetriever(
url="https://demo-document-indexing.pathway.stream"
)
retriever.retrieve(str_or_query_bundle="what is pathway")
轮到你了! 获取你的数据处理管道 或向演示管道上传新文档后重新尝试查询!
在查询引擎中的使用¶
from llama_index.core.query_engine import RetrieverQueryEngine
query_engine = RetrieverQueryEngine.from_args(
retriever,
)
response = query_engine.query("Tell me about Pathway")
print(str(response))
构建自定义数据处理管道¶
前提条件¶
安装 pathway 包。然后下载示例数据。
%pip install pathway
%pip install llama-index-embeddings-openai
!mkdir -p 'data/'
!wget 'https://gist.githubusercontent.com/janchorowski/dd22a293f3d99d1b726eedc7d46d2fc0/raw/pathway_readme.md' -O 'data/pathway_readme.md'
定义由Pathway追踪的数据源¶
Pathway 能够同时监听多种数据源的变化,包括本地文件、S3 文件夹、云存储以及各类数据流。
更多信息请参阅 pathway-io。
import pathway as pw
data_sources = []
data_sources.append(
pw.io.fs.read(
"./data",
format="binary",
mode="streaming",
with_metadata=True,
) # This creates a `pathway` connector that tracks
# all the files in the ./data directory
)
# This creates a connector that tracks files in Google drive.
# please follow the instructions at https://pathway.com/developers/tutorials/connectors/gdrive-connector/ to get credentials
# data_sources.append(
# pw.io.gdrive.read(object_id="17H4YpBOAKQzEJ93xmC2z170l0bP2npMy", service_user_credentials_file="credentials.json", with_metadata=True))
创建文档索引管道¶
让我们创建文档索引流水线。transformations 应为一个以 Embedding 转换结尾的 TransformComponent 组件列表。
在本示例中,我们首先使用 TokenTextSplitter 对文本进行分割,然后通过 OpenAIEmbedding 进行嵌入处理。
from pathway.xpacks.llm.vector_store import VectorStoreServer
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import TokenTextSplitter
embed_model = OpenAIEmbedding(embed_batch_size=10)
transformations_example = [
TokenTextSplitter(
chunk_size=150,
chunk_overlap=10,
separator=" ",
),
embed_model,
]
processing_pipeline = VectorStoreServer.from_llamaindex_components(
*data_sources,
transformations=transformations_example,
)
# Define the Host and port that Pathway will be on
PATHWAY_HOST = "127.0.0.1"
PATHWAY_PORT = 8754
# `threaded` runs pathway in detached mode, we have to set it to False when running from terminal or container
# for more information on `with_cache` check out https://pathway.com/developers/api-docs/persistence-api
processing_pipeline.run_server(
host=PATHWAY_HOST, port=PATHWAY_PORT, with_cache=False, threaded=True
)
将检索器连接到自定义流水线¶
from llama_index.retrievers.pathway import PathwayRetriever
retriever = PathwayRetriever(host=PATHWAY_HOST, port=PATHWAY_PORT)
retriever.retrieve(str_or_query_bundle="what is pathway")