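Multi-strategy workflow with reflection
This notebook demonstrates a workflow that tries 3 different query strategies in parallel and picks the best one.
As shown in the diagram below:
- First, the quality of the query is judged. If it is a bad query, a `BadQueryEvent` is emitted and the `improve_query` step tries to improve the query before trying again. This is reflection.
- Once a good query has been found, three events are emitted at the same time: `NaiveRAGEvent`, `HighTopKEvent` and `RerankEvent`.
- Each event is picked up by a dedicated step that tries a different RAG strategy on the same index. All 3 strategies emit a `ResponseEvent`.
- The `judge` step waits until it has collected all three `ResponseEvent`s, then compares them and emits the best response as a `StopEvent`.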
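The control flow described above (reflect until the query is good, fan out to three strategies, fan in to a judge) can be sketched with plain `asyncio`, without LlamaIndex. Everything in this sketch is a hypothetical stand-in: the word-count quality check, the fixed rewrite suffix, and the "longest answer wins" judge are toy assumptions used only to show the shape of the flow, not what the notebook's LLM calls do.

```python
import asyncio


async def judge_query(query: str) -> bool:
    # Toy stand-in for the LLM quality check: "good" means at least five words.
    return len(query.split()) >= 5


async def improve_query(query: str) -> str:
    # Toy stand-in for the LLM rewrite: append clarifying keywords.
    return query + " in San Francisco budgets from 2016 to 2018"


async def run_strategy(name: str, query: str) -> tuple[str, str]:
    # Yield control once, as a real retrieval call would while waiting on I/O.
    await asyncio.sleep(0)
    return name, f"{name} answer to: {query}"


async def pipeline(query: str, max_retries: int = 3) -> tuple[str, str]:
    # Reflection: improve the query until it is judged good or retries run out.
    for _ in range(max_retries):
        if await judge_query(query):
            break
        query = await improve_query(query)

    # Fan out: run all three strategies concurrently on the same query.
    responses = await asyncio.gather(
        run_strategy("Naive", query),
        run_strategy("High top k", query),
        run_strategy("Reranker", query),
    )

    # Fan in: a toy judge that simply picks the longest answer.
    return max(responses, key=lambda pair: len(pair[1]))
```

In a notebook this could be called as `await pipeline("How has spending changed?")`; in a script, wrap it in `asyncio.run(...)`.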
Install dependencies
We need LlamaIndex, the file reader (for reading PDFs), the workflow visualizer (to draw the diagram above), and OpenAI to embed the data and query an LLM.
!pip install llama-index-core llama-index-llms-openai llama-index-utils-workflow llama-index-readers-file llama-index-embeddings-openai
Get the data
We are using 3 long PDFs of San Francisco's annual budget from 2016 through 2018.
!mkdir data
!wget "https://www.dropbox.com/scl/fi/xt3squt47djba0j7emmjb/2016-CSF_Budget_Book_2016_FINAL_WEB_with-cover-page.pdf?rlkey=xs064cjs8cb4wma6t5pw2u2bl&dl=0" -O "data/2016-CSF_Budget_Book_2016_FINAL_WEB_with-cover-page.pdf"
!wget "https://www.dropbox.com/scl/fi/jvw59g5nscu1m7f96tjre/2017-Proposed-Budget-FY2017-18-FY2018-19_1.pdf?rlkey=v988oigs2whtcy87ti9wti6od&dl=0" -O "data/2017-Proposed-Budget-FY2017-18-FY2018-19_1.pdf"
!wget "https://www.dropbox.com/scl/fi/izknlwmbs7ia0lbn7zzyx/2018-o0181-18.pdf?rlkey=p5nv2ehtp7272ege3m9diqhei&dl=0" -O "data/2018-o0181-18.pdf"
Bring in dependencies
Now we import all our dependencies.
import os
from llama_index.core import (
    SimpleDirectoryReader,
    VectorStoreIndex,
    StorageContext,
    load_index_from_storage,
)
from llama_index.core.workflow import (
    step,
    Context,
    Workflow,
    Event,
    StartEvent,
    StopEvent,
)
from llama_index.llms.openai import OpenAI
from llama_index.core.postprocessor.rankGPT_rerank import RankGPTRerank
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.chat_engine import SimpleChatEngine
from llama_index.utils.workflow import draw_all_possible_flows
We also need to set our OpenAI key.
from google.colab import userdata
os.environ["OPENAI_API_KEY"] = userdata.get("openai-key")
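The cell above only works inside Google Colab. A minimal sketch for other environments, assuming the key is either already exported as `OPENAI_API_KEY` or can be typed in interactively; the helper name `resolve_openai_key` is hypothetical and not part of any library.

```python
import os
from getpass import getpass


def resolve_openai_key(env_var: str = "OPENAI_API_KEY") -> str:
    """Return the key from the environment, prompting only if it is missing."""
    key = os.environ.get(env_var)
    if not key:
        # getpass hides the input so the key is not echoed into the notebook output.
        key = getpass(f"Enter {env_var}: ")
        os.environ[env_var] = key
    return key
```

Calling `resolve_openai_key()` once before building the workflow leaves the key in `os.environ`, which is where the OpenAI client looks for it by default.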
Define event classes
Our flow generates quite a few different event types.
class JudgeEvent(Event):
    query: str


class BadQueryEvent(Event):
    query: str


class NaiveRAGEvent(Event):
    query: str


class HighTopKEvent(Event):
    query: str


class RerankEvent(Event):
    query: str


class ResponseEvent(Event):
    query: str
    # name of the strategy that produced the response; every step below passes it
    source: str
    response: str


class SummarizeEvent(Event):
    query: str
    response: str
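Define workflow
This is the core of our workflow. Let's walk through it step by step:
- `load_or_create_index` is a standard RAG helper: it reads the PDFs from disk and indexes them if that has not been done yet. If the index already exists, it restores it from disk.
- `judge_query` does several things:
  - It initializes the LLM and calls `load_or_create_index` to set things up. These objects are stored in the context so later steps can use them.
  - It judges the quality of the query.
  - If the query is bad, it emits a `BadQueryEvent`.
  - If the query is good, it emits a `NaiveRAGEvent`, a `HighTopKEvent` and a `RerankEvent`.
- `improve_query` takes the `BadQueryEvent`, uses the LLM to expand and disambiguate the query where possible, then loops back to `judge_query`.
- `naive_rag`, `high_top_k` and `rerank` each handle their own event and try a different RAG strategy. Each emits a `ResponseEvent` carrying its result, plus a `source` parameter naming the strategy used.
- `judge` runs every time a `ResponseEvent` is emitted, but uses `collect_events` to buffer the events until all three have arrived. It then sends the responses to the LLM to pick the "best" one and emits that response as a `StopEvent`.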
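The buffering behaviour described for `judge` can be illustrated with a toy stand-in. This is not LlamaIndex's implementation: the real `ctx.collect_events` matches events against a list of expected types, while the hypothetical `EventBuffer` below only counts arrivals. It shows why the first two calls return `None` and only the third returns the full batch.

```python
class EventBuffer:
    """Toy stand-in for the buffering that ctx.collect_events performs."""

    def __init__(self, expected: int):
        self.expected = expected
        self.events = []

    def collect(self, event):
        # Store the new event; report nothing until the batch is complete.
        self.events.append(event)
        if len(self.events) < self.expected:
            return None
        # Hand back the full batch and reset for the next round.
        ready, self.events = self.events, []
        return ready


buffer = EventBuffer(expected=3)
results = [buffer.collect(name) for name in ("Naive", "Reranker", "High top k")]
print(results)
```

The step function mirrors this pattern: it returns `None` while the buffer is still filling and does its real work only once all three responses are present.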
class ComplicatedWorkflow(Workflow):
    def load_or_create_index(self, directory_path, persist_dir):
        # Check if the index already exists
        if os.path.exists(persist_dir):
            print("Loading existing index...")
            # Load the index from disk
            storage_context = StorageContext.from_defaults(
                persist_dir=persist_dir
            )
            index = load_index_from_storage(storage_context)
        else:
            print("Creating new index...")
            # Load documents from the specified directory
            documents = SimpleDirectoryReader(directory_path).load_data()
            # Create a new index from the documents
            index = VectorStoreIndex.from_documents(documents)
            # Persist the index to disk
            index.storage_context.persist(persist_dir=persist_dir)
        return index

    @step
    async def judge_query(
        self, ctx: Context, ev: StartEvent | JudgeEvent
    ) -> BadQueryEvent | NaiveRAGEvent | HighTopKEvent | RerankEvent | None:
        # initialize shared objects on the first call only
        llm = await ctx.store.get("llm", default=None)
        if llm is None:
            await ctx.store.set("llm", OpenAI(model="gpt-4o", temperature=0.1))
            await ctx.store.set(
                "index", self.load_or_create_index("data", "storage")
            )
            # we use a chat engine so it remembers previous interactions
            await ctx.store.set("judge", SimpleChatEngine.from_defaults())

        # ctx.store.get is a coroutine: await it before calling methods on the result
        judge = await ctx.store.get("judge")
        response = judge.chat(
            f"""
            Given a user query, determine if this is likely to yield good results from a RAG system as-is. If it's good, return 'good', if it's bad, return 'bad'.
            Good queries use a lot of relevant keywords and are detailed. Bad queries are vague or ambiguous.
            Here is the query: {ev.query}
            """
        )
        # the chat engine returns a response object, so compare its text, not the object
        if str(response).strip().lower() == "bad":
            # try again
            return BadQueryEvent(query=ev.query)
        else:
            # send query to all 3 strategies
            ctx.send_event(NaiveRAGEvent(query=ev.query))
            ctx.send_event(HighTopKEvent(query=ev.query))
            ctx.send_event(RerankEvent(query=ev.query))
            return None

    @step
    async def improve_query(
        self, ctx: Context, ev: BadQueryEvent
    ) -> JudgeEvent:
        llm = await ctx.store.get("llm")
        response = await llm.acomplete(
            f"""
            This is a query to a RAG system: {ev.query}
            The query is bad because it is too vague. Please provide a more detailed query that includes specific keywords and removes any ambiguity.
            """
        )
        return JudgeEvent(query=str(response))

    @step
    async def naive_rag(
        self, ctx: Context, ev: NaiveRAGEvent
    ) -> ResponseEvent:
        index = await ctx.store.get("index")
        engine = index.as_query_engine(similarity_top_k=5)
        response = engine.query(ev.query)
        print("Naive response:", response)
        return ResponseEvent(
            query=ev.query, source="Naive", response=str(response)
        )

    @step
    async def high_top_k(
        self, ctx: Context, ev: HighTopKEvent
    ) -> ResponseEvent:
        index = await ctx.store.get("index")
        engine = index.as_query_engine(similarity_top_k=20)
        response = engine.query(ev.query)
        print("High top k response:", response)
        return ResponseEvent(
            query=ev.query, source="High top k", response=str(response)
        )

    @step
    async def rerank(self, ctx: Context, ev: RerankEvent) -> ResponseEvent:
        index = await ctx.store.get("index")
        # retrieve 20 candidates, then let the LLM rerank them down to the top 5
        reranker = RankGPTRerank(top_n=5, llm=await ctx.store.get("llm"))
        retriever = index.as_retriever(similarity_top_k=20)
        engine = RetrieverQueryEngine.from_args(
            retriever=retriever,
            node_postprocessors=[reranker],
        )
        response = engine.query(ev.query)
        print("Reranker response:", response)
        return ResponseEvent(
            query=ev.query, source="Reranker", response=str(response)
        )

    @step
    async def judge(
        self, ctx: Context, ev: ResponseEvent
    ) -> StopEvent | None:
        # buffer events until all three responses have arrived
        ready = ctx.collect_events(ev, [ResponseEvent] * 3)
        if ready is None:
            return None

        judge = await ctx.store.get("judge")
        response = judge.chat(
            f"""
            A user has provided a query and 3 different strategies have been used
            to try to answer the query. Your job is to decide which strategy best
            answered the query. The query was: {ev.query}
            Response 1 ({ready[0].source}): {ready[0].response}
            Response 2 ({ready[1].source}): {ready[1].response}
            Response 3 ({ready[2].source}): {ready[2].response}
            Please provide the number of the best response (1, 2, or 3).
            Just provide the number, with no other text or preamble.
            """
        )
        # strip whitespace so a reply such as "3\n" still parses
        best_response = int(str(response).strip())
        print(
            f"Best response was number {best_response}, which was from {ready[best_response-1].source}"
        )
        return StopEvent(result=str(ready[best_response - 1].response))
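The workflow depends on the LLM replying with exactly `good`/`bad` or a bare number, which models do not always do. A minimal sketch of more forgiving parsing, using only the standard library; the helper names `parse_verdict` and `parse_choice` and the fallback defaults are assumptions for illustration, not part of the notebook.

```python
import re


def parse_verdict(text: str) -> str:
    """Map free-form judge output to 'good' or 'bad', defaulting to 'good'."""
    words = re.findall(r"[a-z]+", text.lower())
    return "bad" if "bad" in words else "good"


def parse_choice(text: str, n_options: int = 3, default: int = 1) -> int:
    """Extract the first integer in the reply, falling back to `default`."""
    match = re.search(r"\d+", text)
    if match is None:
        return default
    choice = int(match.group())
    # Reject numbers outside the valid range rather than indexing out of bounds.
    return choice if 1 <= choice <= n_options else default
```

With these, `parse_verdict(str(response))` could replace the string comparison in `judge_query`, and `parse_choice(str(response))` could replace the `int(...)` call in `judge`. Defaulting to "good" and to response 1 is a design choice that keeps the workflow moving instead of raising.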
Draw flow diagram
This is how we got the diagram at the top of the notebook.
draw_all_possible_flows(
    ComplicatedWorkflow, filename="complicated_workflow.html"
)
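Run the workflow
Let's take the workflow for a spin:
- The `judge_query` step returned no event, because it uses `send_event` instead. So the query was judged "good".
- All 3 RAG steps run and generate different answers to the query.
- The `judge` step runs 3 times. The first 2 times it produces no event, because it has not yet collected the requisite 3 `ResponseEvent`s.
- On the third run it selects the best response and returns a `StopEvent`.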
c = ComplicatedWorkflow(timeout=120, verbose=True)
result = await c.run(
    # query="How has spending on police changed in San Francisco's budgets from 2016 to 2018?"
    # query="How has spending on healthcare changed in San Francisco?"
    query="How has spending changed?"
)
print(result)
Running step judge_query
Creating new index...
Step judge_query produced no event
Running step naive_rag
Naive response: Spending has increased over the years due to various factors such as new voter-approved minimum spending requirements, the creation of new voter-approved baselines, and growth in baseline funded requirements. Additionally, there have been notable changes in spending across different service areas and departments, with increases in funding for areas like public protection, transportation, and public works.
Step naive_rag produced event ResponseEvent
Running step rerank
Reranker response: Spending has increased over the years, with notable changes in the allocation of funds to various service areas and departments. The budget reflects adjustments in spending to address evolving needs and priorities, resulting in a rise in overall expenditures across different categories.
Step rerank produced event ResponseEvent
Running step high_top_k
High top k response: Spending has increased over the years, with the total budget showing growth in various areas such as aid assistance/grants, materials & supplies, equipment, debt service, services of other departments, and professional & contractual services. Additionally, there have been new investments in programs like workforce development, economic development, film services, and finance and administration. The budget allocations have been adjusted to accommodate changing needs and priorities, reflecting an overall increase in spending across different departments and programs.
Step high_top_k produced event ResponseEvent
Running step judge
Step judge produced no event
Running step judge
Step judge produced no event
Running step judge
Best response was number 3, which was from High top k
Step judge produced event StopEvent
Spending has increased over the years, with the total budget showing growth in various areas such as aid assistance/grants, materials & supplies, equipment, debt service, services of other departments, and professional & contractual services. Additionally, there have been new investments in programs like workforce development, economic development, film services, and finance and administration. The budget allocations have been adjusted to accommodate changing needs and priorities, reflecting an overall increase in spending across different departments and programs.