Pathway 阅读器¶
Pathway 是一个开源的数据处理框架。它能让您轻松开发数据转换流水线和机器学习应用,实时处理动态数据源和变化中的数据。
本笔记本演示如何搭建实时数据索引流水线。您可以从LLM应用中查询该流水线结果,操作方式与常规阅读器无异。但在底层,Pathway会在每次数据变更时更新索引,确保您始终获得最新答案。
在本笔记本中,我们将首先将llama_index.readers.pathway.PathwayReader阅读器连接到公开的演示文档处理流水线,该流水线能够:
- 监控多个云端数据源的变更
- 为数据构建向量索引
如需搭建专属文档处理流水线,可查看托管方案或自行构建(参照本笔记本实现)。
本文描述的基础流水线可轻松构建云端文件的简易索引。但Pathway还提供构建实时数据流水线和应用所需的全部功能,包括:类SQL操作(如分组聚合和跨数据源连接)、基于时间的数据分组与窗口函数,以及丰富的连接器。
有关Pathway数据摄取流水线和向量存储的更多细节,请访问向量存储流水线。
前提条件¶
安装 llama-index-readers-pathway 集成包
%pip install llama-index-readers-pathway
配置日志记录
import logging
import sys
logging.basicConfig(stream=sys.stdout, level=logging.ERROR)
logging.getLogger().addHandler(logging.StreamHandler(stream=sys.stdout))
设置您的 OpenAI API 密钥。
import getpass
import os
# omit if embedder of choice is not OpenAI
if "OPENAI_API_KEY" not in os.environ:
os.environ["OPENAI_API_KEY"] = getpass.getpass("OpenAI API Key:")
创建读取器并连接至公共管道¶
要实例化和配置 PathwayReader,您需要提供文档索引管道的 url 或 host 和 port。在以下代码中,我们使用了一个公开可用的演示管道,其 REST API 可通过 https://demo-document-indexing.pathway.stream 访问。该演示会从 Google Drive 和 Sharepoint 摄取文档,并维护一个用于检索文档的索引。
from llama_index.readers.pathway import PathwayReader
reader = PathwayReader(url="https://demo-document-indexing.pathway.stream")
# let us search with some text
reader.load_data(query_text="What is Pathway")
使用 llama-index 创建摘要索引¶
docs = reader.load_data(query_text="What is Pathway", k=2)
from llama_index.core import SummaryIndex
index = SummaryIndex.from_documents(docs)
query_engine = index.as_query_engine()
response = query_engine.query("What does Pathway do?")
print(response)
构建自定义数据处理流水线¶
前提条件¶
安装 pathway 包。然后下载示例数据。
%pip install pathway
%pip install llama-index-embeddings-openai
!mkdir -p 'data/'
!wget 'https://gist.githubusercontent.com/janchorowski/dd22a293f3d99d1b726eedc7d46d2fc0/raw/pathway_readme.md' -O 'data/pathway_readme.md'
定义由Pathway追踪的数据源¶
Pathway 能够同时监听多种数据源的变化,包括本地文件、S3 文件夹、云存储以及各类数据流。
更多信息请参阅 pathway-io。
import pathway as pw
data_sources = []
data_sources.append(
pw.io.fs.read(
"./data",
format="binary",
mode="streaming",
with_metadata=True,
) # This creates a `pathway` connector that tracks
# all the files in the ./data directory
)
# This creates a connector that tracks files in Google drive.
# please follow the instructions at https://pathway.com/developers/tutorials/connectors/gdrive-connector/ to get credentials
# data_sources.append(
# pw.io.gdrive.read(object_id="17H4YpBOAKQzEJ93xmC2z170l0bP2npMy", service_user_credentials_file="credentials.json", with_metadata=True))
创建文档索引管道¶
让我们创建文档索引处理流程。transformations 应为一个以 Embedding 转换结尾的 TransformComponent 组件列表。
在本示例中,我们首先使用 TokenTextSplitter 进行文本分割,然后通过 OpenAIEmbedding 进行嵌入处理。
from pathway.xpacks.llm.vector_store import VectorStoreServer
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import TokenTextSplitter
embed_model = OpenAIEmbedding(embed_batch_size=10)
transformations_example = [
TokenTextSplitter(
chunk_size=150,
chunk_overlap=10,
separator=" ",
),
embed_model,
]
processing_pipeline = VectorStoreServer.from_llamaindex_components(
*data_sources,
transformations=transformations_example,
)
# Define the Host and port that Pathway will be on
PATHWAY_HOST = "127.0.0.1"
PATHWAY_PORT = 8754
# `threaded` runs pathway in detached mode, we have to set it to False when running from terminal or container
# for more information on `with_cache` check out https://pathway.com/developers/api-docs/persistence-api
processing_pipeline.run_server(
host=PATHWAY_HOST, port=PATHWAY_PORT, with_cache=False, threaded=True
)
将读取器连接至自定义流水线¶
from llama_index.readers.pathway import PathwayReader
reader = PathwayReader(host=PATHWAY_HOST, port=PATHWAY_PORT)
# let us search with some text
reader.load_data(query_text="What is Pathway")