使用 Delphic 构建全栈 LlamaIndex Web 应用的指南#
本指南将引导您使用 Delphic 这个生产就绪的 Web 应用启动模板与 LlamaIndex 进行集成。所有代码示例均可在 Delphic 代码库中找到。
我们将构建什么#
以下是 Delphic 开箱即用功能的快速演示:
https://user-images.githubusercontent.com/5049984/233236432-aa4980b6-a510-42f3-887a-81485c9644e6.mp4
架构概述#
Delphic 利用 LlamaIndex Python 库,让用户能够创建自己的文档集合,并通过响应式前端进行查询。
我们选择的技术栈提供了响应迅速、健壮的混合技术方案,能够:(1) 编排复杂的 Python 处理任务,(2) 提供现代化响应式前端,(3) 构建可扩展的安全后端。
核心技术栈包括:
- Django
- Django Channels
- Django Ninja
- Redis
- Celery
- LlamaIndex
- Langchain
- React
- Docker & Docker Compose
得益于这个基于超级稳定的 Django Web 框架构建的现代化技术栈,Delphic 启动应用提供了流畅的开发体验,内置认证和用户管理,异步向量存储处理,以及基于 WebSocket 的查询连接以实现响应式 UI。此外,我们的前端使用 TypeScript 构建,基于 MUI React 框架,提供了响应式现代化用户界面。
系统要求#
Celery 无法在 Windows 上运行。虽然可能通过 Windows Subsystem for Linux 部署,但相关配置已超出本教程范围。因此我们建议仅在 Linux 或 OSX 系统上跟随本教程操作。部署应用需要安装 Docker 和 Docker Compose。本地开发需要 node 版本管理器 (nvm)。
Django 后端#
项目目录概览#
Delphic 应用采用符合 Django 项目惯例的结构化后端目录组织。在代码库根目录的 ./delphic 子文件夹中,主要包含:
contrib:包含对 Django 内置contrib应用的自定义修改或扩展-
indexes:包含与文档索引和 LLM 集成相关的核心功能,具体包括: -
admin.py:应用的 Django 管理配置 apps.py:应用配置models.py:包含应用的数据库模型migrations:包含应用数据库模式迁移的目录signals.py:定义应用的信号-
tests.py:应用单元测试 -
tasks:包含使用 Celery 进行异步处理的任务。index_tasks.py文件包含创建向量索引的任务 users:专用于用户管理的目录utils:包含应用通用的工具模块和函数,如自定义存储后端、路径辅助工具和集合相关实用程序
数据库模型#
Delphic 应用有两个核心模型:Document 和 Collection。这些模型代表了应用在使用 LLM 索引和查询文档时处理的核心实体。它们定义在 ./delphic/indexes/models.py。
-
Collection: -
api_key:外键,将集合与 API 密钥关联,帮助将任务与源 API 密钥关联 title:字符字段,提供集合标题description:文本字段,提供集合描述status:字符字段,存储集合的处理状态,使用CollectionStatus枚举created:日期时间字段,记录集合创建时间modified:日期时间字段,记录集合最后修改时间model:文件字段,存储与集合关联的模型-
processing:布尔字段,指示集合当前是否正在处理 -
Document: -
collection:外键,将文档与集合关联,表示文档与集合的关系 file:文件字段,存储上传的文档文件description:文本字段,提供文档描述created:日期时间字段,记录文档创建时间modified:日期时间字段,记录文档最后修改时间
这些模型为文档集合及其通过 LlamaIndex 创建的索引提供了坚实基础。
Django Ninja API#
Django Ninja 是一个基于 Django 和 Python 3.7+ 类型提示的 Web API 框架。它通过 Python 类型提示自动生成输入验证、序列化和文档,提供了一种简单直观的 API 端点定义方式。
在 Delphic 代码库中,./config/api/endpoints.py 文件包含了 API 路由和端点逻辑。以下是 endpoints.py 文件中各端点的功能说明:
-
/heartbeat:一个简单的 GET 端点,用于检查 API 是否正常运行。若 API 可访问则返回True。该端点对 Kubernetes 部署特别有用,可用于容器健康检查。 -
/collections/create:POST 端点,用于创建新的Collection。接收表单参数如title、description和文件列表files。创建新集合并为每个文件生成Document实例,同时调度 Celery 任务创建索引。
@collections_router.post("/create")
async def create_collection(
request,
title: str = Form(...),
description: str = Form(...),
files: list[UploadedFile] = File(...),
):
key = None if getattr(request, "auth", None) is None else request.auth
if key is not None:
key = await key
collection_instance = Collection(
api_key=key,
title=title,
description=description,
status=CollectionStatusEnum.QUEUED,
)
await sync_to_async(collection_instance.save)()
for uploaded_file in files:
doc_data = uploaded_file.file.read()
doc_file = ContentFile(doc_data, uploaded_file.name)
document = Document(collection=collection_instance, file=doc_file)
await sync_to_async(document.save)()
create_index.si(collection_instance.id).apply_async()
return await sync_to_async(CollectionModelSchema)(...)
/collections/query:POST 端点,用于通过 LLM 查询文档集合。接收包含collection_id和query_str的 JSON 载荷,返回查询结果。虽然聊天 GUI 中实际使用 WebSocket(见下文),但可通过该 REST 端点构建应用查询特定集合。
@collections_router.post(
"/query",
response=CollectionQueryOutput,
summary="Ask a question of a document collection",
)
def query_collection_view(
request: HttpRequest, query_input: CollectionQueryInput
):
collection_id = query_input.collection_id
query_str = query_input.query_str
response = query_collection(collection_id, query_str)
return {"response": response}
/collections/available:GET 端点,返回用户 API 密钥创建的所有集合列表。输出通过CollectionModelSchema序列化。
@collections_router.get(
"/available",
response=list[CollectionModelSchema],
summary="Get a list of all of the collections created with my api_key",
)
async def get_my_collections_view(request: HttpRequest):
key = None if getattr(request, "auth", None) is None else request.auth
if key is not None:
key = await key
collections = Collection.objects.filter(api_key=key)
return [{...} async for collection in collections]
/collections/{collection_id}/add_file:POST 端点,向现有集合添加文件。接收路径参数collection_id和表单参数file、description,将文件添加为关联该集合的Document实例。
@collections_router.post(
"/{collection_id}/add_file", summary="Add a file to a collection"
)
async def add_file_to_collection(
request,
collection_id: int,
file: UploadedFile = File(...),
description: str = Form(...),
):
collection = await sync_to_async(Collection.objects.get)(id=collection_id)
WebSocket 简介#
WebSocket 是一种通信协议,支持客户端与服务器通过单一长连接进行双向全双工通信。该协议设计为通过 HTTP/HTTPS 相同端口(80/443)工作,并使用类似握手流程建立连接。连接建立后,数据可以"帧"形式双向传输,无需像传统 HTTP 请求那样重复建立连接。
使用 WebSocket 的优势包括(尤其适用于需要长时间加载但运行快速的代码场景):
- 性能:消除多次连接开销,降低延迟
- 效率:支持实时通信,无需轮询,资源利用率更高
- 扩展性:可处理大量并发连接,适合高并发应用
Delphic 应用采用 WebSocket 是因为 LLM 模型加载内存成本高。通过 WebSocket 连接可保持模型常驻内存,使后续请求无需重复加载即可快速处理。
ASGI 配置文件 ./config/asgi.py 使用 Django Channels 的 ProtocolTypeRouter 按协议类型路由连接。这里定义了两类协议:
- "http":使用标准 Django ASGI 应用处理 HTTP 请求
- "websocket":通过自定义 TokenAuthMiddleware 认证连接,其内部 URLRouter 定义了处理文档集合查询的 CollectionQueryConsumer WebSocket 路由
application = ProtocolTypeRouter(
{
"http": get_asgi_application(),
"websocket": TokenAuthMiddleware(
URLRouter(
[
re_path(
r"ws/collections/(?P<collection_id>\w+)/query/$",
CollectionQueryConsumer.as_asgi(),
),
]
)
),
}
)
该配置允许客户端通过 WebSocket 高效查询文档集合,无需为每个请求重新加载模型。
WebSocket 处理器#
config/api/websockets/queries.py 中的 CollectionQueryConsumer 类继承自 Django Channels 的 AsyncWebsocketConsumer,负责处理文档集合查询的 WebSocket 连接,包含三个核心方法:
connect:WebSocket 握手时调用disconnect:连接关闭时调用receive:接收到消息时调用
WebSocket 连接监听器#
connect 方法负责建立连接,从路径提取集合 ID,加载集合模型并接受连接:
async def connect(self):
try:
self.collection_id = extract_connection_id(self.scope["path"])
self.index = await load_collection_model(self.collection_id)
await self.accept()
except ValueError as e:
await self.accept()
await self.close(code=4000)
except Exception as e:
pass
WebSocket 断开监听器#
当前 disconnect 方法为空实现,因为连接关闭时无需执行额外操作。
WebSocket 接收监听器#
receive 方法负责处理来自 WebSocket 的传入消息。它会接收传入消息,进行解码,然后使用提供的查询语句查询已加载的集合模型。随后将响应格式化为 Markdown 字符串,并通过 WebSocket 连接发送回客户端。
async def receive(self, text_data):
text_data_json = json.loads(text_data)
if self.index is not None:
query_str = text_data_json["query"]
modified_query_str = f"Please return a nicely formatted markdown string to this request:\n\n{query_str}"
query_engine = self.index.as_query_engine()
response = query_engine.query(modified_query_str)
markdown_response = f"## Response\n\n{response}\n\n"
if response.source_nodes:
markdown_sources = (
f"## Sources\n\n{response.get_formatted_sources()}"
)
else:
markdown_sources = ""
formatted_response = f"{markdown_response}{markdown_sources}"
await self.send(json.dumps({"response": formatted_response}, indent=4))
else:
await self.send(
json.dumps(
{"error": "No index loaded for this connection."}, indent=4
)
)
要加载集合模型,需使用 load_collection_model 函数,该函数位于 delphic/utils/collections.py。此函数会获取具有给定集合 ID 的集合对象,检查是否存在该集合模型的 JSON 文件,若不存在则创建。随后设置 LLM 和 Settings,并使用缓存文件加载 VectorStoreIndex。
from llama_index.core import Settings
async def load_collection_model(collection_id: str | int) -> VectorStoreIndex:
"""
从缓存或数据库加载集合模型,并返回索引。
参数:
collection_id (Union[str, int]): 集合模型实例的 ID。
返回:
VectorStoreIndex: 加载的索引。
此函数执行以下步骤:
1. 获取具有给定 collection_id 的 Collection 对象。
2. 检查是否存在名为 '/cache/model_{collection_id}.json' 的 JSON 文件。
3. 若 JSON 文件不存在,则从 Collection.model 的 FileField 加载 JSON 并保存到 '/cache/model_{collection_id}.json'。
4. 使用 cache_file_path 调用 VectorStoreIndex.load_from_disk。
"""
# 获取 Collection 对象
collection = await Collection.objects.aget(id=collection_id)
logger.info(f"load_collection_model() - loaded collection {collection_id}")
# 确保存在模型
if collection.model.name:
logger.info("load_collection_model() - Setup local json index file")
# 检查 JSON 文件是否存在
cache_dir = Path(settings.BASE_DIR) / "cache"
cache_file_path = cache_dir / f"model_{collection_id}.json"
if not cache_file_path.exists():
cache_dir.mkdir(parents=True, exist_ok=True)
with collection.model.open("rb") as model_file:
with cache_file_path.open(
"w+", encoding="utf-8"
) as cache_file:
cache_file.write(model_file.read().decode("utf-8"))
# 定义 LLM
logger.info(
f"load_collection_model() - Setup Settings with tokens {settings.MAX_TOKENS} and "
f"model {settings.MODEL_NAME}"
)
Settings.llm = OpenAI(
temperature=0, model="gpt-3.5-turbo", max_tokens=512
)
# 调用 VectorStoreIndex.load_from_disk
logger.info("load_collection_model() - Load llama index")
index = VectorStoreIndex.load_from_disk(
cache_file_path,
)
logger.info(
"load_collection_model() - Llamaindex loaded and ready for query..."
)
else:
logger.error(
f"load_collection_model() - collection {collection_id} has no model!"
)
raise ValueError("No model exists for this collection!")
return index
React 前端#
概述#
我们选择在 Delphic 项目的前端中使用 TypeScript、React 和 Material-UI (MUI),原因如下:首先,作为最受欢迎的前端框架(React)的最受欢迎的组件库(MUI),这一选择使得该项目能够面向庞大的开发者社区。其次,React 目前是一个稳定且普遍受欢迎的框架,它通过虚拟 DOM 提供了有价值的抽象,同时仍然相对稳定,并且在我们看来相当容易学习,这再次提高了其可访问性。
前端项目结构#
前端代码位于仓库的 /frontend 目录中,React 相关组件位于 /frontend/src。你会注意到 frontend 目录中有一个 DockerFile 和几个与配置前端 Web 服务器相关的文件夹和文件——nginx。
/frontend/src/App.tsx 文件是应用程序的入口点。它定义了主要组件,如登录表单、抽屉布局和集合创建模态框。这些主要组件会根据用户是否登录以及是否有认证令牌来条件渲染。
DrawerLayout2 组件定义在 DrawerLayour2.tsx 文件中。该组件管理应用程序的布局,并提供导航和主要内容区域。
由于应用程序相对简单,我们可以不使用复杂的状态管理解决方案(如 Redux),而仅使用 React 的 useState 钩子。
从后端获取集合#
登录用户的可用集合会在 DrawerLayout2 组件中获取并显示。该过程可分为以下步骤:
- 初始化状态变量:
const [collections, setCollections] = useState<CollectionModelSchema[]>([]);
const [loading, setLoading] = useState(true);
这里我们初始化了两个状态变量:collections 用于存储集合列表,loading 用于跟踪集合是否正在获取中。
- 通过
fetchCollections()函数为登录用户获取集合:
const
fetchCollections = async () = > {
try {
const accessToken = localStorage.getItem("accessToken");
if (accessToken) {
const response = await getMyCollections(accessToken);
setCollections(response.data);
}
} catch (error) {
console.error(error);
} finally {
setLoading(false);
}
};
fetchCollections 函数通过调用 getMyCollections API 函数并传入用户的访问令牌,获取登录用户的集合。随后用获取的数据更新 collections 状态,并将 loading 状态设为 false 以表示获取完成。
显示集合#
最新集合会以如下方式显示在抽屉中:
< List >
{collections.map((collection) = > (
< div key={collection.id} >
< ListItem disablePadding >
< ListItemButton
disabled={
collection.status != = CollectionStatus.COMPLETE | |
!collection.has_model
}
onClick={() = > handleCollectionClick(collection)}
selected = {
selectedCollection & &
selectedCollection.id == = collection.id
}
>
< ListItemText
primary = {collection.title} / >
{collection.status == = CollectionStatus.RUNNING ? (
< CircularProgress
size={24}
style={{position: "absolute", right: 16}}
/ >
): null}
< / ListItemButton >
< / ListItem >
< / div >
))}
< / List >
你会注意到集合的 ListItemButton 的 disabled 属性会根据集合状态是否为 CollectionStatus.COMPLETE 或集合是否有模型(!collection.has_model)来设置。如果任一条件为真,按钮将被禁用,防止用户选择未完成或无模型的集合。当 CollectionStatus 为 RUNNING 时,我们还会在按钮上显示一个加载转轮。
在单独的 useEffect 钩子中,我们会检查 collections 状态中是否有集合的状态为 CollectionStatus.RUNNING 或 CollectionStatus.QUEUED。如果有,我们会设置一个间隔,每 15 秒(15000 毫秒)重复调用 fetchCollections 函数以更新集合状态。这样,应用程序会定期检查已完成的集合,并在处理完成后相应地更新 UI。
useEffect(() = > {
let
interval: NodeJS.Timeout;
if (
collections.some(
(collection) = >
collection.status == = CollectionStatus.RUNNING | |
collection.status == = CollectionStatus.QUEUED
)
) {
interval = setInterval(() = > {
fetchCollections();
}, 15000);
}
return () = > clearInterval(interval);
}, [collections]);
聊天视图组件#
frontend/src/chat/ChatView.tsx 中的 ChatView 组件负责处理和显示用户与集合交互的聊天界面。该组件会建立 WebSocket 连接以实时与服务器通信,发送和接收消息。
ChatView 组件的关键功能包括:
- 建立并管理与服务器的 WebSocket 连接。
- 以聊天形式显示用户和服务器的消息。
- 处理用户输入以向服务器发送消息。
- 根据从服务器接收的消息更新消息状态和 UI。
- 显示连接状态和错误,例如加载消息、连接服务器或加载集合时遇到的错误。
所有这些功能共同为用户提供了与所选集合交互的非常流畅、低延迟的体验。
聊天 WebSocket 客户端#
ChatView 组件中的 WebSocket 连接用于建立客户端与服务器之间的实时通信。WebSocket 连接的建立和管理方式如下:
首先初始化 WebSocket 引用:
const websocket = useRef<WebSocket | null>(null);
使用 useRef 创建了一个 websocket 引用,该引用将保存用于通信的 WebSocket 对象。useRef 是 React 中的一个钩子,允许创建在多次渲染间持久存在的可变引用对象。当需要持有对可变对象(如 WebSocket 连接)的引用而不引发不必要的重新渲染时,这个特性尤其有用。
在 ChatView 组件中,WebSocket 连接需要在组件生命周期内建立并保持,且连接状态变化时不应触发重新渲染。通过使用 useRef,可以确保 WebSocket 连接作为引用保存,组件仅在发生实际状态变更(如更新消息或显示错误)时才会重新渲染。
setupWebsocket 函数负责建立 WebSocket 连接并设置事件处理器来处理不同的 WebSocket 事件。
完整的 setupWebsocket 函数如下:
const setupWebsocket = () => {
setConnecting(true);
// 此处创建新的 WebSocket 对象,URL 包含所选集合的 ID 和用户认证令牌
websocket.current = new WebSocket(
`ws://localhost:8000/ws/collections/${selectedCollection.id}/query/?token=${authToken}`,
);
websocket.current.onopen = (event) => {
//...
};
websocket.current.onmessage = (event) => {
//...
};
websocket.current.onclose = (event) => {
//...
};
websocket.current.onerror = (event) => {
//...
};
return () => {
websocket.current?.close();
};
};
注意我们在多处根据 WebSocket 客户端的信息触发了 GUI 更新。
当组件首次打开并尝试建立连接时,会触发 onopen 监听器。在回调中,组件更新状态以反映连接已建立,清除之前的错误,并标记没有消息等待响应:
websocket.current.onopen = (event) => {
setError(false);
setConnecting(false);
setAwaitingMessage(false);
console.log("WebSocket connected:", event);
};
当通过 WebSocket 连接从服务器接收到新消息时,会触发 onmessage。在回调中,解析接收到的数据,并用服务器的新消息更新 messages 状态:
websocket.current.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log("WebSocket message received:", data);
setAwaitingMessage(false);
if (data.response) {
// 用服务器的新消息更新消息状态
setMessages((prevMessages) => [
...prevMessages,
{
sender_id: "server",
message: data.response,
timestamp: new Date().toLocaleTimeString(),
},
]);
}
};
当 WebSocket 连接关闭时,会触发 onclose。在回调中,组件检查特定的关闭代码(4000)以显示警告提示,并相应更新组件状态。同时记录关闭事件:
websocket.current.onclose = (event) => {
if (event.code === 4000) {
toast.warning(
"所选集合的模型不可用。是否已正确创建?",
);
setError(true);
setConnecting(false);
setAwaitingMessage(false);
}
console.log("WebSocket closed:", event);
};
最后,当 WebSocket 连接发生错误时,会触发 onerror。在回调中,组件更新状态以反映错误,并记录错误事件:
websocket.current.onerror = (event) => {
setError(true);
setConnecting(false);
setAwaitingMessage(false);
console.error("WebSocket error:", event);
};
渲染聊天消息#
在 ChatView 组件中,布局通过 CSS 样式和 Material-UI 组件确定。主布局采用 flex 显示和列向 flexDirection,确保容器内容垂直排列。
布局包含三个主要部分:
- 聊天消息区域:占据大部分可用空间,显示用户与服务器交换的消息列表。设置
overflow-y为auto,允许内容溢出时滚动。每条消息使用ChatMessage组件渲染,等待服务器响应时使用ChatMessageLoading组件显示加载状态。 - 分隔线:使用 Material-UI 的
Divider组件分隔消息区域和输入区域,形成清晰的视觉区分。 - 输入区域:位于底部,允许用户输入和发送消息。包含 Material-UI 的
TextField组件,设置为最多 2 行的多行输入。输入区域还包括发送消息的Button组件。用户可点击"发送"按钮或按键盘"Enter"键发送消息。
ChatView 组件接受的用户输入是用户在 TextField 中输入的文本消息。组件处理这些文本输入并通过 WebSocket 连接发送至服务器。
部署#
先决条件#
部署应用需要安装 Docker 和 Docker Compose。如果您使用 Ubuntu 或其他常见 Linux 发行版,DigitalOcean 提供 优秀的 Docker 教程 和 Docker Compose 教程。 如果这些不适用,请参考 官方 Docker 文档。
构建与部署#
本项目基于 django-cookiecutter 框架,可轻松部署至虚拟机并配置为特定域名提供 HTTPS 服务。不过相关配置较为复杂——这并非本项目自身原因,而是因为证书配置、DNS设置等本身属于较为繁琐的专题。
出于本指南的目的,我们仅演示本地运行方式。未来可能会发布生产环境部署指南。在此期间,建议先参阅 Django Cookiecutter 项目文档 作为入门参考。
本指南假设您的目标是运行应用程序以供使用。若需进行开发,通常不应使用 --profiles fullstack 参数启动 compose 堆栈,而应通过 node 开发服务器启动 React 前端。
部署步骤如下:
首先克隆仓库:
git clone https://github.com/yourusername/delphic.git
进入项目目录:
cd delphic
复制示例环境文件:
mkdir -p ./.envs/.local/
cp -a ./docs/sample_envs/local/.frontend ./frontend
cp -a ./docs/sample_envs/local/.django ./.envs/.local
cp -a ./docs/sample_envs/local/.postgres ./.envs/.local
编辑 .django 和 .postgres 配置文件,填入您的 OpenAI API 密钥并为数据库用户设置独立密码。您还可在 .django 文件中设置响应令牌上限或切换使用的 OpenAI 模型(支持 GPT4,前提是您已获得访问权限)。
使用 --profiles fullstack 参数构建 docker compose 堆栈:
sudo docker-compose --profiles fullstack -f local.yml build
fullstack 参数会指示 compose 从前端文件夹构建 docker 容器,该容器将与所有所需的后端容器一同启动。但构建生产版 React 容器耗时较长,因此不建议以此方式进行开发。开发环境配置请遵循 项目 readme.md 中的说明。
最后启动应用:
sudo docker-compose -f local.yml up
现在通过浏览器访问 localhost:3000 即可查看前端界面,开始本地使用 Delphic 应用。
使用应用#
用户设置#
当前阶段要使用本应用(未来可能会开放部分模型给未认证用户共享),您需要登录账号。可使用超级用户或普通用户,但首先需要通过控制台创建超级用户:
为何要设置 Django 超级用户? Django 超级用户拥有应用程序的所有权限,可管理系统各项功能,包括创建、修改和删除用户、数据集及其他数据。设置超级用户能让您全面掌控应用。
创建 Django 超级用户步骤:
1 执行以下命令创建超级用户:
sudo docker-compose -f local.yml run django python manage.py createsuperuser
2 根据提示输入超级用户的用户名、电子邮箱和密码。
通过 Django 管理员界面创建其他用户:
- 按照部署指南在本地启动 Delphic 应用
- 在浏览器中访问
http://localhost:8000/admin进入 Django 管理界面 - 使用先前创建的超级用户凭证登录
- 在"认证与授权"部分点击"Users"
- 点击右上角"Add user +"按钮
- 输入新用户所需信息(如用户名和密码),点击"Save"创建用户
- 如需授予新用户额外权限或设为超级用户,在用户列表中点击其用户名,滚动至"Permissions"部分进行配置后保存更改