安装与配置¶
# Install some libraries we'll use for our examples. These
# are not required to use Airtrain with LlamaIndex, and are just
# there to help us illustrate use.
%pip install llama-index-embeddings-openai==0.2.4
%pip install llama-index-readers-web==0.2.2
%pip install llama-index-readers-github==0.2.0
# Install Airtrain SDK with LlamaIndex integration
%pip install airtrain-py[llama-index]
# Running async code in a notebook requires using nest_asyncio, and we will
# use some async examples. So we will set up nest_asyncio here. Outside
# an async context or outside a notebook, this step is not required.
import nest_asyncio
nest_asyncio.apply()
API 密钥设置¶
配置运行后续示例所需的 API 密钥。
GitHub API 令牌和 OpenAI API 密钥仅在示例「配合读取器/嵌入/分割器使用」时需要。获取 GitHub 访问令牌的说明详见此处,而 OpenAI API 密钥可在此获取。
获取 Airtrain API 密钥的步骤:
- 访问此链接创建 Airtrain 账户
- 点击左下角「设置」,进入「账单」页面注册专业版账户或开启试用
- 在「账单」页面的「Airtrain API 密钥」标签页中复制您的 API 密钥
请注意 Airtrain 试用版仅允许同时存在一个数据集。由于本笔记本会创建多个数据集,您可能需要随时在 Airtrain 界面中删除旧数据集,以便为新数据集腾出空间。
import os
os.environ["GITHUB_TOKEN"] = "<your GitHub token>"
os.environ["OPENAI_API_KEY"] = "<your OpenAi API key>"
os.environ["AIRTRAIN_API_KEY"] = "<your Airtrain API key>"
import os
import airtrain as at
from llama_index.core.node_parser import SemanticSplitterNodeParser
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.readers.github import GithubRepositoryReader, GithubClient
下一步是配置我们的阅读器。本例中使用的是 GitHub 阅读器,但这仅用于演示目的。Airtrain 能够处理来自任何阅读器的文档,无论其原始来源如何。
github_token = os.environ.get("GITHUB_TOKEN")
github_client = GithubClient(github_token=github_token, verbose=True)
reader = GithubRepositoryReader(
github_client=github_client,
owner="sematic-ai",
repo="sematic",
use_parser=False,
verbose=False,
filter_directories=(
["docs"],
GithubRepositoryReader.FilterType.INCLUDE,
),
filter_file_extensions=(
[
".md",
],
GithubRepositoryReader.FilterType.INCLUDE,
),
)
read_kwargs = dict(branch="main")
使用阅读器查阅文档
documents = reader.load_data(**read_kwargs)
直接从文档创建数据集¶
您可以直接从这些文档创建 Airtrain 数据集,无需进行任何额外处理。这种情况下,Airtrain 会在生成进一步洞察前自动为您完成文档嵌入操作。数据集中的每一行将代表整个 Markdown 文档。Airtrain 将自动提供诸如文档语义聚类等洞察功能,让您可以通过查看涵盖相似主题的文档进行浏览,或发现可能需要移除的文档子集。
虽然基本文档检索之外的处理步骤并非必需,但系统允许此类操作。您可以在上传至 Airtrain 前,通过添加元数据、筛选文档或任意方式处理文档来增强数据价值。
result = at.upload_from_llama_nodes(
documents,
name="Sematic Docs Dataset: Whole Documents",
)
print(f"Uploaded {result.size} rows to '{result.name}'. View at: {result.url}")
Uploaded 42 rows to 'Sematic Docs Dataset: Whole Documents'. View at: https://app.airtrain.ai/dataset/7fd09dca-81b9-42b8-acc9-01ce08302b16
在分割和嵌入后创建数据集¶
如果您希望查看面向文档内部节点而非完整文档的数据集,同样可以实现。Airtrain 将自动生成分析结果,例如嵌入向量的二维 PCA 投影图,使您能够直观探索 RAG 节点检索所基于的嵌入空间。您还可以点击单行数据,查看完整 n 维嵌入空间中与其最接近的节点,从而进行深入分析。系统还将自动生成聚类结果和其他分析指标,以增强并辅助您的探索过程。
此处我们将使用 OpenAI 嵌入和 SemanticSplitterNodeParser
分割器,但您也可以选用其他任何 LlamaIndex 工具来处理节点,然后再上传至 Airtrain。您甚至可以完全跳过自主嵌入步骤,这种情况下 Airtrain 会代为完成节点嵌入工作。
embed_model = OpenAIEmbedding()
splitter = SemanticSplitterNodeParser(
buffer_size=1, breakpoint_percentile_threshold=95, embed_model=embed_model
)
nodes = splitter.get_nodes_from_documents(documents)
⚠️ 注意 ⚠️:如果您正在试用 Airtrain 且已创建过完整文档数据集,在上传新数据集前需先删除原有数据集。
result = at.upload_from_llama_nodes(
nodes,
name="Sematic Docs, split + embedded",
)
print(f"Uploaded {result.size} rows to {result.name}. View at: {result.url}")
Uploaded 137 rows to Sematic Docs, split + embedded. View at: https://app.airtrain.ai/dataset/ebec9bcc-6ed8-4165-a0de-29bef740c70b
示例 2:使用 Workflow API¶
由于文档(documents)和节点(nodes)是 Airtrain 集成功能所处理的核心抽象概念,而这些抽象概念在 LlamaIndex 的工作流 API 中也是共通的,因此您也可以将 Airtrain 作为更广泛工作流的一部分来使用。此处我们将通过抓取几个 Hacker News 评论主题来演示用法,但需要再次说明的是,您并不局限于网页抓取工作流——任何能生成文档或节点的工作流都适用。
import asyncio
from llama_index.core.schema import Node
from llama_index.core.workflow import (
Context,
Event,
StartEvent,
StopEvent,
Workflow,
step,
)
from llama_index.readers.web import AsyncWebPageReader
from airtrain import DatasetMetadata, upload_from_llama_nodes
指定我们将要抓取的评论线程。本示例中的特定评论线程位于或接近2024年9月30日的首页。如果您希望从Hacker News以外的页面获取内容,请注意某些网站的内容是通过客户端渲染的,这种情况下您可能需要使用类似WholeSiteReader
这样的阅读器,它通过无头Chrome驱动在返回文档前先渲染页面。为简化操作,此处我们将使用一个服务端渲染HTML的页面。
URLS = [
"https://news.ycombinator.com/item?id=41694044",
"https://news.ycombinator.com/item?id=41696046",
"https://news.ycombinator.com/item?id=41693087",
"https://news.ycombinator.com/item?id=41695756",
"https://news.ycombinator.com/item?id=41666269",
"https://news.ycombinator.com/item?id=41697137",
"https://news.ycombinator.com/item?id=41695840",
"https://news.ycombinator.com/item?id=41694712",
"https://news.ycombinator.com/item?id=41690302",
"https://news.ycombinator.com/item?id=41695076",
"https://news.ycombinator.com/item?id=41669747",
"https://news.ycombinator.com/item?id=41694504",
"https://news.ycombinator.com/item?id=41697032",
"https://news.ycombinator.com/item?id=41694025",
"https://news.ycombinator.com/item?id=41652935",
"https://news.ycombinator.com/item?id=41693979",
"https://news.ycombinator.com/item?id=41696236",
"https://news.ycombinator.com/item?id=41696434",
"https://news.ycombinator.com/item?id=41688469",
"https://news.ycombinator.com/item?id=41646782",
"https://news.ycombinator.com/item?id=41689332",
"https://news.ycombinator.com/item?id=41688018",
"https://news.ycombinator.com/item?id=41668896",
"https://news.ycombinator.com/item?id=41690087",
"https://news.ycombinator.com/item?id=41679497",
"https://news.ycombinator.com/item?id=41687739",
"https://news.ycombinator.com/item?id=41686722",
"https://news.ycombinator.com/item?id=41689138",
"https://news.ycombinator.com/item?id=41691530",
]
接下来我们将定义一个基础事件,因为在 LlamaIndex 工作流中,事件是步骤间传递数据的标准方式。
class CompletedDocumentRetrievalEvent(Event):
name: str
documents: list[Node]
之后我们将定义工作流本身。在我们的案例中,工作流将包含三个步骤:第一步从网络获取文档,第二步将文档导入Airtrain,第三步完成工作流收尾。
class IngestToAirtrainWorkflow(Workflow):
@step
async def ingest_documents(
self, ctx: Context, ev: StartEvent
) -> CompletedDocumentRetrievalEvent | None:
if not ev.get("urls"):
return None
reader = AsyncWebPageReader(html_to_text=True)
documents = await reader.aload_data(urls=ev.get("urls"))
return CompletedDocumentRetrievalEvent(
name=ev.get("name"), documents=documents
)
@step
async def ingest_documents_to_airtrain(
self, ctx: Context, ev: CompletedDocumentRetrievalEvent
) -> StopEvent | None:
dataset_meta = upload_from_llama_nodes(ev.documents, name=ev.name)
return StopEvent(result=dataset_meta)
由于工作流 API 将异步代码视为一等公民,我们将定义一个异步的 main
函数来驱动工作流。
async def main() -> None:
workflow = IngestToAirtrainWorkflow()
result = await workflow.run(
name="My HN Discussions Dataset",
urls=URLS,
)
print(
f"Uploaded {result.size} rows to {result.name}. View at: {result.url}"
)
最后,我们将使用 asyncio 事件循环来执行异步主程序。
⚠️ 注意 ⚠️:如果您正在使用 Airtrain 试用版且已运行过上述示例,在上传新数据集前需要先删除已生成的数据集。
asyncio.run(main()) # actually run the main & the workflow
error fetching page from https://news.ycombinator.com/item?id=41693087 error fetching page from https://news.ycombinator.com/item?id=41666269 error fetching page from https://news.ycombinator.com/item?id=41697137 error fetching page from https://news.ycombinator.com/item?id=41697032 error fetching page from https://news.ycombinator.com/item?id=41652935 error fetching page from https://news.ycombinator.com/item?id=41696434 error fetching page from https://news.ycombinator.com/item?id=41688469 error fetching page from https://news.ycombinator.com/item?id=41646782 error fetching page from https://news.ycombinator.com/item?id=41668896
Uploaded 20 rows to My HN Discussions Dataset. View at: https://app.airtrain.ai/dataset/bd330f0a-6ff1-4e51-9fe2-9900a1a42308