JSONalyze 查询引擎
JSONalyze 查询引擎通常设计用于在通过代理等调用 API 后接入,此时我们已获得批量返回的行数据实例,下一步需要对这些数据进行统计分析。
JSONalyze 在底层实现中,会将加载的 JSON 列表创建为内存中的 SQLite 表,使查询引擎能够对数据执行 SQL 查询,并以查询结果形式返回分析问题的答案。
本笔记本将通过工作流演示 JSON 分析查询引擎的实现过程。
具体来说,我们将以工作流的形式实现 JSONalyzeQueryEngine。
!pip install -U llama-index
import os
# Placeholder value: replace "sk-..." with a real OpenAI API key before running.
os.environ["OPENAI_API_KEY"] = "sk-..."
由于工作流默认采用异步优先模式,因此在笔记本环境中可以顺畅运行。若需在自定义代码中执行,且当前没有正在运行的异步事件循环,则应使用 asyncio.run() 来启动:
async def main():
<async code>
if __name__ == "__main__":
import asyncio
asyncio.run(main())
工作流
jsonalyzer:
- 该步骤以 StartEvent 作为输入,返回一个 JsonAnalyzerEvent。
- 该步骤会创建一个内存中的 SQLite 数据库,加载 JSON 数据,通过 LLM 根据用户查询生成 SQL 语句,执行该语句后返回结果(同时包含 SQL 查询语句和表结构)。
synthesize:
- 该步骤以 JsonAnalyzerEvent 作为输入,返回 StopEvent。
- 该步骤通过 LLM 基于 SQL 查询语句、表结构和查询结果生成综合响应。
工作流的各个步骤将使用内置的 StartEvent 和 StopEvent 事件。
定义事件
from llama_index.core.workflow import Event
from typing import Dict, List, Any
class JsonAnalyzerEvent(Event):
    """
    Carries the outcome of the JSON analysis step to the next workflow step.

    Fields:
        sql_query: The SQL statement that was generated and executed.
        table_schema: Schema of the table the query ran against.
        results: Rows returned by executing ``sql_query``.
    """

    sql_query: str
    table_schema: Dict[str, Any]
    results: List[Dict[str, Any]]
提示词模板
此处我们定义了默认的 DEFAULT_RESPONSE_SYNTHESIS_PROMPT_TMPL、DEFAULT_RESPONSE_SYNTHESIS_PROMPT 和 DEFAULT_TABLE_NAME。
from llama_index.core.prompts.prompt_type import PromptType
from llama_index.core.prompts import PromptTemplate
# Template text for the response-synthesis prompt. It is filled with four
# placeholders: {sql_query}, {table_schema}, {sql_response} and {query_str}.
DEFAULT_RESPONSE_SYNTHESIS_PROMPT_TMPL = (
"Given a query, synthesize a response based on SQL query results"
" to satisfy the query. Only include details that are relevant to"
" the query. If you don't know the answer, then say that.\n"
"SQL Query: {sql_query}\n"
"Table Schema: {table_schema}\n"
"SQL Response: {sql_response}\n"
"Query: {query_str}\n"
"Response: "
)
# PromptTemplate wrapping the template text, tagged with the
# SQL_RESPONSE_SYNTHESIS prompt type.
DEFAULT_RESPONSE_SYNTHESIS_PROMPT = PromptTemplate(
DEFAULT_RESPONSE_SYNTHESIS_PROMPT_TMPL,
prompt_type=PromptType.SQL_RESPONSE_SYNTHESIS,
)
# Table name passed as `table_name` when the workflow is run in this notebook.
DEFAULT_TABLE_NAME = "items"
from llama_index.core.base.response.schema import Response
from llama_index.core.indices.struct_store.sql_retriever import (
DefaultSQLParser,
)
from llama_index.core.prompts.default_prompts import DEFAULT_JSONALYZE_PROMPT
from llama_index.core.utils import print_text
from llama_index.core.workflow import (
Context,
Workflow,
StartEvent,
StopEvent,
step,
)
from llama_index.llms.openai import OpenAI
from IPython.display import Markdown, display
class JSONAnalyzeQueryEngineWorkflow(Workflow):
    """
    Workflow that answers a natural-language question over a list of JSON rows.

    Steps:
        jsonalyzer: loads the rows into an in-memory SQLite table, asks the
            LLM for a SQL query, executes it and emits a JsonAnalyzerEvent.
        synthesize: asks the LLM to turn the SQL results into a final
            ``Response`` and emits it in a StopEvent.
    """

    @step
    async def jsonalyzer(
        self, ctx: Context, ev: StartEvent
    ) -> JsonAnalyzerEvent:
        """
        Analyze JSON data using a SQL-like query approach.

        This asynchronous method sets up an in-memory SQLite database, loads
        JSON data, generates a SQL query based on a natural language question,
        executes the query, and returns the results.

        Args:
            ctx (Context): The context object; ``query`` and ``llm`` are
                stored in it for the ``synthesize`` step.
            ev (StartEvent): The start event. Read keys: ``query``, ``llm``,
                ``list_of_dict`` and ``table_name`` (falls back to
                ``DEFAULT_TABLE_NAME`` when missing or empty).

        Returns:
            JsonAnalyzerEvent: An event object containing the SQL query,
            table schema, and query results.

        Raises:
            ImportError: If the 'sqlite-utils' package is not installed.
            ValueError: If ``query``, ``llm`` or ``list_of_dict`` is missing
                or empty, if the rows cannot be inserted, or if the generated
                SQL query fails to execute.
        """
        try:
            import sqlite_utils
        except ImportError as exc:
            IMPORT_ERROR_MSG = (
                "sqlite-utils is needed to use this Query Engine:\n"
                "pip install sqlite-utils"
            )
            raise ImportError(IMPORT_ERROR_MSG) from exc

        # Read every input once through ev.get() so a missing key yields None
        # here instead of an AttributeError somewhere further down.
        query = ev.get("query")
        llm = ev.get("llm")
        list_of_dict = ev.get("list_of_dict")
        table_name = ev.get("table_name") or DEFAULT_TABLE_NAME

        # Fail fast, before any LLM call is spent on unusable input.
        if not query:
            raise ValueError("A non-empty 'query' is required")
        if llm is None:
            raise ValueError("An 'llm' is required")
        if not list_of_dict:
            raise ValueError("Invalid list_of_dict")

        # Shared with the synthesize step.
        await ctx.store.set("query", query)
        await ctx.store.set("llm", llm)

        # Instantiate in-memory SQLite database
        db = sqlite_utils.Database(memory=True)
        try:
            try:
                # Load list of dictionaries into SQLite database
                db[table_name].insert_all(list_of_dict)
            except sqlite_utils.utils.sqlite3.IntegrityError as exc:
                print_text(
                    f"Error inserting into table {table_name}, expected format:"
                )
                print_text("[{col1: val1, col2: val2, ...}, ...]")
                raise ValueError("Invalid list_of_dict") from exc

            # Get the table schema
            table_schema = db[table_name].columns_dict

            # Get the SQL query with text-to-SQL prompt
            response_str = await llm.apredict(
                prompt=DEFAULT_JSONALYZE_PROMPT,
                table_name=table_name,
                table_schema=table_schema,
                question=query,
            )
            sql_parser = DefaultSQLParser()
            sql_query = sql_parser.parse_response_to_sql(response_str, query)

            try:
                # Execute the SQL query. NOTE: the SQL text is LLM-generated;
                # it only ever runs against this throwaway in-memory database.
                # Materialize the rows before the connection is closed below.
                results = list(db.query(sql_query))
            except sqlite_utils.utils.sqlite3.OperationalError as exc:
                print_text(f"Error executing query: {sql_query}")
                raise ValueError("Invalid query") from exc
        finally:
            # Release the in-memory database on both success and failure.
            db.conn.close()

        return JsonAnalyzerEvent(
            sql_query=sql_query, table_schema=table_schema, results=results
        )

    @step
    async def synthesize(
        self, ctx: Context, ev: JsonAnalyzerEvent
    ) -> StopEvent:
        """
        Synthesize the final response from the SQL results.

        Args:
            ctx (Context): The context object; ``llm`` and ``query`` are read
                from its store.
            ev (JsonAnalyzerEvent): The SQL query, table schema and results.

        Returns:
            StopEvent: Its ``result`` is a ``Response`` whose metadata holds
            ``sql_query`` and the stringified ``table_schema``.
        """
        llm = await ctx.store.get("llm", default=None)
        query = await ctx.store.get("query", default=None)

        # Use the async predict so this step does not block the event loop
        # (the jsonalyzer step already uses apredict).
        response_str = await llm.apredict(
            DEFAULT_RESPONSE_SYNTHESIS_PROMPT,
            sql_query=ev.sql_query,
            table_schema=ev.table_schema,
            sql_response=ev.results,
            query_str=query,
        )

        response_metadata = {
            "sql_query": ev.sql_query,
            "table_schema": str(ev.table_schema),
        }
        response = Response(response=response_str, metadata=response_metadata)
        return StopEvent(result=response)
创建 JSON 列表
json_list = [
{
"name": "John Doe",
"age": 25,
"major": "Computer Science",
"email": "john.doe@example.com",
"address": "123 Main St",
"city": "New York",
"state": "NY",
"country": "USA",
"phone": "+1 123-456-7890",
"occupation": "Software Engineer",
},
{
"name": "Jane Smith",
"age": 30,
"major": "Business Administration",
"email": "jane.smith@example.com",
"address": "456 Elm St",
"city": "San Francisco",
"state": "CA",
"country": "USA",
"phone": "+1 234-567-8901",
"occupation": "Marketing Manager",
},
{
"name": "Michael Johnson",
"age": 35,
"major": "Finance",
"email": "michael.johnson@example.com",
"address": "789 Oak Ave",
"city": "Chicago",
"state": "IL",
"country": "USA",
"phone": "+1 345-678-9012",
"occupation": "Financial Analyst",
},
{
"name": "Emily Davis",
"age": 28,
"major": "Psychology",
"email": "emily.davis@example.com",
"address": "234 Pine St",
"city": "Los Angeles",
"state": "CA",
"country": "USA",
"phone": "+1 456-789-0123",
"occupation": "Psychologist",
},
{
"name": "Alex Johnson",
"age": 27,
"major": "Engineering",
"email": "alex.johnson@example.com",
"address": "567 Cedar Ln",
"city": "Seattle",
"state": "WA",
"country": "USA",
"phone": "+1 567-890-1234",
"occupation": "Civil Engineer",
},
{
"name": "Jessica Williams",
"age": 32,
"major": "Biology",
"email": "jessica.williams@example.com",
"address": "890 Walnut Ave",
"city": "Boston",
"state": "MA",
"country": "USA",
"phone": "+1 678-901-2345",
"occupation": "Biologist",
},
{
"name": "Matthew Brown",
"age": 26,
"major": "English Literature",
"email": "matthew.brown@example.com",
"address": "123 Peach St",
"city": "Atlanta",
"state": "GA",
"country": "USA",
"phone": "+1 789-012-3456",
"occupation": "Writer",
},
{
"name": "Olivia Wilson",
"age": 29,
"major": "Art",
"email": "olivia.wilson@example.com",
"address": "456 Plum Ave",
"city": "Miami",
"state": "FL",
"country": "USA",
"phone": "+1 890-123-4567",
"occupation": "Artist",
},
{
"name": "Daniel Thompson",
"age": 31,
"major": "Physics",
"email": "daniel.thompson@example.com",
"address": "789 Apple St",
"city": "Denver",
"state": "CO",
"country": "USA",
"phone": "+1 901-234-5678",
"occupation": "Physicist",
},
{
"name": "Sophia Clark",
"age": 27,
"major": "Sociology",
"email": "sophia.clark@example.com",
"address": "234 Orange Ln",
"city": "Austin",
"state": "TX",
"country": "USA",
"phone": "+1 012-345-6789",
"occupation": "Social Worker",
},
{
"name": "Christopher Lee",
"age": 33,
"major": "Chemistry",
"email": "christopher.lee@example.com",
"address": "567 Mango St",
"city": "San Diego",
"state": "CA",
"country": "USA",
"phone": "+1 123-456-7890",
"occupation": "Chemist",
},
{
"name": "Ava Green",
"age": 28,
"major": "History",
"email": "ava.green@example.com",
"address": "890 Cherry Ave",
"city": "Philadelphia",
"state": "PA",
"country": "USA",
"phone": "+1 234-567-8901",
"occupation": "Historian",
},
{
"name": "Ethan Anderson",
"age": 30,
"major": "Business",
"email": "ethan.anderson@example.com",
"address": "123 Lemon Ln",
"city": "Houston",
"state": "TX",
"country": "USA",
"phone": "+1 345-678-9012",
"occupation": "Entrepreneur",
},
{
"name": "Isabella Carter",
"age": 28,
"major": "Mathematics",
"email": "isabella.carter@example.com",
"address": "456 Grape St",
"city": "Phoenix",
"state": "AZ",
"country": "USA",
"phone": "+1 456-789-0123",
"occupation": "Mathematician",
},
{
"name": "Andrew Walker",
"age": 32,
"major": "Economics",
"email": "andrew.walker@example.com",
"address": "789 Berry Ave",
"city": "Portland",
"state": "OR",
"country": "USA",
"phone": "+1 567-890-1234",
"occupation": "Economist",
},
{
"name": "Mia Evans",
"age": 29,
"major": "Political Science",
"email": "mia.evans@example.com",
"address": "234 Lime St",
"city": "Washington",
"state": "DC",
"country": "USA",
"phone": "+1 678-901-2345",
"occupation": "Political Analyst",
},
]
定义大语言模型
# LLM instance passed as `llm=` to every workflow run below.
llm = OpenAI(model="gpt-3.5-turbo")
运行工作流!
w = JSONAnalyzeQueryEngineWorkflow()
# Run a query
query = "What is the maximum age among the individuals?"
result = await w.run(
query=query, list_of_dict=json_list, llm=llm, table_name=DEFAULT_TABLE_NAME
)
display(
Markdown("> Question: {}".format(query)),
Markdown("Answer: {}".format(result)),
)
Question: What is the maximum age among the individuals?
Answer: The maximum age among the individuals is 35.
query = "How many individuals have an occupation related to science or engineering?"
result = await w.run(
query=query, list_of_dict=json_list, llm=llm, table_name=DEFAULT_TABLE_NAME
)
display(
Markdown("> Question: {}".format(query)),
Markdown("Answer: {}".format(result)),
)
Question: How many individuals have an occupation related to science or engineering?
Answer: There are 0 individuals with an occupation related to science or engineering.
query = "How many individuals have a phone number starting with '+1 234'?"
result = await w.run(
query=query, list_of_dict=json_list, llm=llm, table_name=DEFAULT_TABLE_NAME
)
display(
Markdown("> Question: {}".format(query)),
Markdown("Answer: {}".format(result)),
)
Question: How many individuals have a phone number starting with '+1 234'?
Answer: There are 2 individuals with a phone number starting with '+1 234'.
query = "What is the percentage of individuals residing in California (CA)?"
result = await w.run(
query=query, list_of_dict=json_list, llm=llm, table_name=DEFAULT_TABLE_NAME
)
display(
Markdown("> Question: {}".format(query)),
Markdown("Answer: {}".format(result)),
)
Question: What is the percentage of individuals residing in California (CA)?
Answer: The percentage of individuals residing in California (CA) is 18.75%.
query = "How many individuals have a major in Psychology?"
result = await w.run(
query=query, list_of_dict=json_list, llm=llm, table_name=DEFAULT_TABLE_NAME
)
display(
Markdown("> Question: {}".format(query)),
Markdown("Answer: {}".format(result)),
)
Question: How many individuals have a major in Psychology?
Answer: There is 1 individual who has a major in Psychology.