LongRAG Workflow¶
This notebook shows how to implement LongRAG using LlamaIndex Workflows.
import nest_asyncio
nest_asyncio.apply()
%pip install -U llama-index
import os
os.environ["OPENAI_API_KEY"] = "sk-proj-..."
!wget https://github.com/user-attachments/files/16474262/data.zip -O data.zip
!unzip -o data.zip
!rm data.zip
Since Workflows are async-first, this all runs fine in a notebook. If you were running this code in your own scripts, you would want to use asyncio.run() to start an async event loop when one isn't already running:
async def main():
    <async code>

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())
Helper Functions¶
These helper functions split documents into chunks and group nodes based on the relationships between them.
from typing import List, Dict, Optional, Set, FrozenSet
from llama_index.core.schema import BaseNode, TextNode
from llama_index.core.node_parser import SentenceSplitter
# constants
DEFAULT_CHUNK_SIZE = 4096 # optionally splits documents into CHUNK_SIZE, then regroups them to demonstrate grouping algorithm
DEFAULT_MAX_GROUP_SIZE = 20 # maximum number of documents in a group
DEFAULT_SMALL_CHUNK_SIZE = 512 # small chunk size for generating embeddings
DEFAULT_TOP_K = 8 # top k for retrieving
def split_doc(chunk_size: int, documents: List[BaseNode]) -> List[TextNode]:
    """Splits documents into smaller pieces.

    Args:
        chunk_size (int): Chunk size
        documents (List[BaseNode]): Documents

    Returns:
        List[TextNode]: Smaller chunks
    """
    # split docs into tokens
    text_parser = SentenceSplitter(chunk_size=chunk_size)
    return text_parser.get_nodes_from_documents(documents)
def group_docs(
    nodes: List[str],
    adj: Dict[str, List[str]],
    max_group_size: Optional[int] = DEFAULT_MAX_GROUP_SIZE,
) -> Set[FrozenSet[str]]:
    """Groups documents.

    Args:
        nodes (List[str]): document IDs
        adj (Dict[str, List[str]]): related documents for each document; id -> list of related document IDs
        max_group_size (Optional[int], optional): max group size, None if no max group size. Defaults to DEFAULT_MAX_GROUP_SIZE.

    Returns:
        Set[FrozenSet[str]]: groups of document IDs
    """
    # process the least-connected documents first so they can seed groups
    docs = sorted(nodes, key=lambda node: len(adj[node]))

    groups = set()  # set of frozensets of IDs
    for d in docs:
        # find every existing group that contains a document related to d
        related_groups = set()
        for r in adj[d]:
            for g in groups:
                if r in g:
                    related_groups = related_groups.union(frozenset([g]))

        # merge related groups into a new group, smallest first,
        # as long as the result stays within max_group_size
        gnew = {d}
        related_groups_sorted = sorted(related_groups, key=lambda el: len(el))
        for g in related_groups_sorted:
            if max_group_size is None or len(gnew) + len(g) <= max_group_size:
                gnew = gnew.union(g)
                if g in groups:
                    groups.remove(g)

        groups.add(frozenset(gnew))

    return groups
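To make the grouping concrete, here is a tiny worked example; the document IDs and adjacency map are made up purely for illustration:

# hypothetical document IDs and their relationships
ids = ["a", "b", "c", "d"]
adj = {
    "a": ["b"],
    "b": ["a", "c"],  # "b" ties "a" and "c" together
    "c": ["b"],
    "d": [],  # "d" is related to nothing
}

print(group_docs(ids, adj))
# two groups: "a", "b", "c" merged together; "d" alone

Documents with the fewest relationships are processed first, so isolated documents end up in their own groups while chains of related documents get merged, up to max_group_size.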
def get_grouped_docs(
    nodes: List[TextNode],
    max_group_size: Optional[int] = DEFAULT_MAX_GROUP_SIZE,
) -> List[TextNode]:
    """Gets the list of documents after grouping.

    Args:
        nodes (List[TextNode]): Input list
        max_group_size (Optional[int], optional): max group size, None if no max group size. Defaults to DEFAULT_MAX_GROUP_SIZE.

    Returns:
        List[TextNode]: Output list
    """
    # node IDs
    nodes_str = [node.id_ for node in nodes]
    # maps node ID -> related node IDs based on that node's relationships
    adj: Dict[str, List[str]] = {
        node.id_: [val.node_id for val in node.relationships.values()]
        for node in nodes
    }
    # node ID -> node
    nodes_dict = {node.id_: node for node in nodes}

    res = group_docs(nodes_str, adj, max_group_size)

    # concatenate each group of nodes into one long retrieval unit
    ret_nodes = []
    for g in res:
        cur_node = TextNode()

        for node_id in g:
            cur_node.text += nodes_dict[node_id].text + "\n\n"
            cur_node.metadata.update(nodes_dict[node_id].metadata)

        ret_nodes.append(cur_node)

    return ret_nodes
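The same grouping works end to end on TextNodes. A quick sanity check with hand-built relationships (SentenceSplitter normally sets these for you when it splits a document):

from llama_index.core.schema import NodeRelationship, RelatedNodeInfo

# two nodes linked by NEXT/PREVIOUS, plus one standalone node
n1 = TextNode(text="Part one.")
n2 = TextNode(text="Part two.")
n3 = TextNode(text="Something unrelated.")
n1.relationships[NodeRelationship.NEXT] = RelatedNodeInfo(node_id=n2.id_)
n2.relationships[NodeRelationship.PREVIOUS] = RelatedNodeInfo(node_id=n1.id_)

grouped = get_grouped_docs([n1, n2, n3])
print(len(grouped))  # 2: n1 and n2 are merged into one node, n3 stays alone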
Constructing the Retriever¶
LongRAG requires a custom retriever, which is implemented below:
from llama_index.core.retrievers import BaseRetriever
from llama_index.core.vector_stores.simple import BasePydanticVectorStore
from llama_index.core.schema import QueryBundle, NodeWithScore
from llama_index.core.vector_stores.types import VectorStoreQuery
from llama_index.core.settings import Settings
class LongRAGRetriever(BaseRetriever):
    """Long RAG Retriever."""

    def __init__(
        self,
        grouped_nodes: List[TextNode],
        small_toks: List[TextNode],
        vector_store: BasePydanticVectorStore,
        similarity_top_k: int = DEFAULT_TOP_K,
    ) -> None:
        """Constructor.

        Args:
            grouped_nodes (List[TextNode]): Long retrieval units, nodes with docs grouped together based on relationships
            small_toks (List[TextNode]): Smaller chunks derived from the long retrieval units
            vector_store (BasePydanticVectorStore): Vector store holding the embeddings of the smaller chunks
            similarity_top_k (int, optional): Similarity top k. Defaults to DEFAULT_TOP_K.
        """
        self._grouped_nodes = grouped_nodes
        self._grouped_nodes_dict = {node.id_: node for node in grouped_nodes}
        self._small_toks = small_toks
        self._small_toks_dict = {node.id_: node for node in self._small_toks}

        self._similarity_top_k = similarity_top_k
        self._vec_store = vector_store
        self._embed_model = Settings.embed_model

    def _retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        """Retrieves.

        Args:
            query_bundle (QueryBundle): query bundle

        Returns:
            List[NodeWithScore]: nodes with scores
        """
        # make query
        query_embedding = self._embed_model.get_query_embedding(
            query_bundle.query_str
        )
        vector_store_query = VectorStoreQuery(
            query_embedding=query_embedding, similarity_top_k=500
        )

        # query for answer
        query_res = self._vec_store.query(vector_store_query)

        # determine top parents of most similar children (these are long retrieval units)
        top_parents_set: Set[str] = set()
        top_parents: List[NodeWithScore] = []
        for id_, similarity in zip(query_res.ids, query_res.similarities):
            cur_node = self._small_toks_dict[id_]
            parent_id = cur_node.ref_doc_id
            if parent_id not in top_parents_set:
                top_parents_set.add(parent_id)

                parent_node = self._grouped_nodes_dict[parent_id]
                node_with_score = NodeWithScore(
                    node=parent_node, score=similarity
                )
                top_parents.append(node_with_score)

                if len(top_parents_set) >= self._similarity_top_k:
                    break

        assert len(top_parents) == min(
            self._similarity_top_k, len(self._grouped_nodes)
        )

        return top_parents
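The retriever can also be exercised on its own, before wiring it into a workflow. A minimal sketch, assuming the data directory downloaded earlier and the default embedding model (configured via the OPENAI_API_KEY set above); this mirrors what the workflow's ingestion step does below:

from llama_index.core import SimpleDirectoryReader, VectorStoreIndex

# build long retrieval units and the small chunks used for embedding
docs = SimpleDirectoryReader("data").load_data()
grouped_nodes = get_grouped_docs(split_doc(DEFAULT_CHUNK_SIZE, docs))
small_nodes = split_doc(DEFAULT_SMALL_CHUNK_SIZE, grouped_nodes)

# index the small chunks, then retrieve their parent (long) units
index = VectorStoreIndex(small_nodes)
retriever = LongRAGRetriever(grouped_nodes, small_nodes, index.vector_store)
hits = retriever.retrieve("What are the two types of moderates?")
print([hit.score for hit in hits])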
Designing the Workflow¶
LongRAG consists of the following steps:
- Ingesting the data: grouping documents into long retrieval units, splitting those long units into smaller chunks for generating embeddings, and indexing the small nodes.
- Constructing the retriever and query engine.
- Querying the data with a given string.
We define an event that passes the long and small retrieval units into the retriever and query engine:
from typing import Any, Iterable
from llama_index.core import VectorStoreIndex
from llama_index.core.llms import LLM
from llama_index.core.workflow import Event
class LoadNodeEvent(Event):
    """Event for loading nodes."""

    small_nodes: Iterable[TextNode]
    grouped_nodes: list[TextNode]
    index: VectorStoreIndex
    similarity_top_k: int
    llm: LLM
With the event defined, we can write the workflow and its steps:
from llama_index.core.workflow import (
    Workflow,
    step,
    StartEvent,
    StopEvent,
    Context,
)
from llama_index.core import SimpleDirectoryReader
from llama_index.core.query_engine import RetrieverQueryEngine
class LongRAGWorkflow(Workflow):
    """Long RAG Workflow."""

    @step
    async def ingest(self, ev: StartEvent) -> LoadNodeEvent | None:
        """Ingestion step.

        Args:
            ev (StartEvent): start event

        Returns:
            LoadNodeEvent | None: event to load nodes, or None if required arguments are missing
        """
        data_dir: str = ev.get("data_dir")
        llm: LLM = ev.get("llm")
        chunk_size: int | None = ev.get("chunk_size")
        similarity_top_k: int = ev.get("similarity_top_k")
        small_chunk_size: int = ev.get("small_chunk_size")
        index: VectorStoreIndex | None = ev.get("index")
        index_kwargs: dict[str, Any] | None = ev.get("index_kwargs")

        if any(
            i is None
            for i in [data_dir, llm, similarity_top_k, small_chunk_size]
        ):
            return None

        if not index:
            docs = SimpleDirectoryReader(data_dir).load_data()
            if chunk_size is not None:
                nodes = split_doc(
                    chunk_size, docs
                )  # split documents into chunks of chunk_size
                grouped_nodes = get_grouped_docs(
                    nodes
                )  # get list of nodes after grouping (groups are combined into one node), these are long retrieval units
            else:
                grouped_nodes = docs

            # split large retrieval units into smaller nodes
            small_nodes = split_doc(small_chunk_size, grouped_nodes)

            index_kwargs = index_kwargs or {}
            index = VectorStoreIndex(small_nodes, **index_kwargs)
        else:
            # get smaller nodes from index and form large retrieval units from these nodes
            small_nodes = index.docstore.docs.values()
            grouped_nodes = get_grouped_docs(small_nodes, None)

        return LoadNodeEvent(
            small_nodes=small_nodes,
            grouped_nodes=grouped_nodes,
            index=index,
            similarity_top_k=similarity_top_k,
            llm=llm,
        )

    @step
    async def make_query_engine(
        self, ctx: Context, ev: LoadNodeEvent
    ) -> StopEvent:
        """Query engine construction step.

        Args:
            ctx (Context): context
            ev (LoadNodeEvent): event

        Returns:
            StopEvent: stop event
        """
        # make retriever and query engine
        retriever = LongRAGRetriever(
            grouped_nodes=ev.grouped_nodes,
            small_toks=ev.small_nodes,
            similarity_top_k=ev.similarity_top_k,
            vector_store=ev.index.vector_store,
        )
        query_eng = RetrieverQueryEngine.from_args(retriever, ev.llm)

        return StopEvent(
            result={
                "retriever": retriever,
                "query_engine": query_eng,
                "index": ev.index,
            }
        )

    @step
    async def query(self, ctx: Context, ev: StartEvent) -> StopEvent | None:
        """Query step.

        Args:
            ctx (Context): context
            ev (StartEvent): start event

        Returns:
            StopEvent | None: stop event with result, or None if this run is not a query
        """
        query_str: str | None = ev.get("query_str")
        query_eng = ev.get("query_eng")

        if query_str is None:
            return None

        result = query_eng.query(query_str)
        return StopEvent(result=result)
How it works:
- There are two entry points: one for ingesting and indexing the documents, and one for handling queries.
- The ingestion flow reads the documents, splits them into smaller nodes, and indexes them. Once that's done, it sends a LoadNodeEvent, which triggers make_query_engine; that step builds the retriever and the query engine from those nodes, then returns the retriever, the query engine, and the index as the result.
- The query flow takes the query string from the StartEvent, feeds it to the query engine passed in alongside it, and returns the query result. Note that the query engine is handed back in through the StartEvent rather than stored in the workflow context.
Running the Workflow¶
from llama_index.llms.openai import OpenAI

wf = LongRAGWorkflow(timeout=60)
llm = OpenAI("gpt-4o")
data_dir = "data"

# ingest the data and build the retriever, query engine, and index
result = await wf.run(
    data_dir=data_dir,
    llm=llm,
    chunk_size=DEFAULT_CHUNK_SIZE,
    similarity_top_k=DEFAULT_TOP_K,
    small_chunk_size=DEFAULT_SMALL_CHUNK_SIZE,
)
from IPython.display import display, Markdown

# run a query
res = await wf.run(
    query_str="How can Pittsburgh become a startup hub, and what are the two types of moderates?",
    query_eng=result["query_engine"],
)
display(Markdown(str(res)))
Pittsburgh can become a startup hub by leveraging its increasing population of young people, particularly those aged 25 to 29, who are crucial for the startup ecosystem. The city should encourage the youth-driven food boom, preserve historic buildings, and make the city more bicycle and pedestrian-friendly. Additionally, Carnegie Mellon University (CMU) should focus on being an even better research university to attract ambitious talent. The city should also foster a culture of tolerance and gradually build an investor community.
There are two types of moderates: intentional moderates and accidental moderates. Intentional moderates deliberately choose positions midway between the extremes of right and left, while accidental moderates form their opinions independently on each issue, resulting in a broad range of views that average to a moderate position.
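Because the ingestion run also returns the index, a later run can hand it back in through the index argument; the ingest step then rebuilds the long retrieval units from the small nodes stored in the index's docstore instead of re-reading and re-embedding the documents. A sketch under the same setup (note that ingest's guard still expects data_dir, llm, similarity_top_k, and small_chunk_size to be present):

# reuse the index built above instead of re-ingesting the documents
result2 = await wf.run(
    data_dir=data_dir,
    llm=llm,
    similarity_top_k=DEFAULT_TOP_K,
    small_chunk_size=DEFAULT_SMALL_CHUNK_SIZE,
    index=result["index"],
)

res2 = await wf.run(
    query_str="What are the two types of moderates?",
    query_eng=result2["query_engine"],
)
display(Markdown(str(res2)))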