Langchain Academy translated
  • module-0
    • LangChain 学院
  • module-1
    • 智能体记忆
    • 智能体
    • 链式结构
    • 部署
    • 路由器
    • 最简单的图结构
  • module-2
    • 支持消息摘要与外部数据库记忆的聊天机器人
    • 支持消息摘要的聊天机器人
    • 多模式架构
    • 状态归约器
    • 状态模式
    • 消息过滤与修剪
  • module-3
    • 断点
    • 动态断点
    • 编辑图状态
    • 流式处理
    • 时间回溯
  • module-4
    • 映射-归约
    • 并行节点执行
    • 研究助手
    • 子图
  • module-5
    • 记忆代理
    • 具备记忆功能的聊天机器人
    • 基于集合架构的聊天机器人
    • 支持个人资料架构的聊天机器人
  • module-6
    • 助手
    • 连接 LangGraph 平台部署
    • 创建部署
    • 双重消息处理
  • Search
  • Previous
  • Next
  • 双重消息处理
    • 拒绝处理
    • 入队
    • 中断机制
    • 回滚操作

双重消息处理¶

无缝处理双重消息对于应对现实使用场景至关重要,特别是在聊天应用中。

用户可能在先前的运行尚未完成时就连续发送多条消息,我们需要确保能够优雅地处理这种情况。

拒绝处理¶

一种简单的方法是拒绝任何新的运行请求,直到当前运行完成为止。

In [13]:
Copied!
%%capture --no-stderr
%pip install -U langgraph_sdk
%%capture --no-stderr %pip install -U langgraph_sdk
In [2]:
Copied!
from langgraph_sdk import get_client
url_for_cli_deployment = "http://localhost:8123"
client = get_client(url=url_for_cli_deployment)
from langgraph_sdk import get_client url_for_cli_deployment = "http://localhost:8123" client = get_client(url=url_for_cli_deployment)
In [18]:
Copied!
import httpx
from langchain_core.messages import HumanMessage

# Create a thread
thread = await client.threads.create()

# Create to dos
user_input_1 = "Add a ToDo to follow-up with DI Repairs."
user_input_2 = "Add a ToDo to mount dresser to the wall."
config = {"configurable": {"user_id": "Test-Double-Texting"}}
graph_name = "task_maistro" 

run = await client.runs.create(
    thread["thread_id"],
    graph_name,
    input={"messages": [HumanMessage(content=user_input_1)]}, 
    config=config,
)
try:
    await client.runs.create(
        thread["thread_id"],
        graph_name,
        input={"messages": [HumanMessage(content=user_input_2)]}, 
        config=config,
        multitask_strategy="reject",
    )
except httpx.HTTPStatusError as e:
    print("Failed to start concurrent run", e)
import httpx from langchain_core.messages import HumanMessage # Create a thread thread = await client.threads.create() # Create to dos user_input_1 = "Add a ToDo to follow-up with DI Repairs." user_input_2 = "Add a ToDo to mount dresser to the wall." config = {"configurable": {"user_id": "Test-Double-Texting"}} graph_name = "task_maistro" run = await client.runs.create( thread["thread_id"], graph_name, input={"messages": [HumanMessage(content=user_input_1)]}, config=config, ) try: await client.runs.create( thread["thread_id"], graph_name, input={"messages": [HumanMessage(content=user_input_2)]}, config=config, multitask_strategy="reject", ) except httpx.HTTPStatusError as e: print("Failed to start concurrent run", e)
Failed to start concurrent run Client error '409 Conflict' for url 'http://localhost:8123/threads/2b58630e-00fd-4c35-afad-a6b59e9b9104/runs'
For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/409
In [19]:
Copied!
from langchain_core.messages import convert_to_messages

# Wait until the original run completes
await client.runs.join(thread["thread_id"], run["run_id"])

# Get the state of the thread
state = await client.threads.get_state(thread["thread_id"])
for m in convert_to_messages(state["values"]["messages"]):
    m.pretty_print()
from langchain_core.messages import convert_to_messages # Wait until the original run completes await client.runs.join(thread["thread_id"], run["run_id"]) # Get the state of the thread state = await client.threads.get_state(thread["thread_id"]) for m in convert_to_messages(state["values"]["messages"]): m.pretty_print()
================================ Human Message =================================

Add a ToDo to follow-up with DI Repairs.
================================== Ai Message ==================================
Tool Calls:
  UpdateMemory (call_6xqHubCPNufS0bg4tbUxC0FU)
 Call ID: call_6xqHubCPNufS0bg4tbUxC0FU
  Args:
    update_type: todo
================================= Tool Message =================================

New ToDo created:
Content: {'task': 'Follow-up with DI Repairs', 'time_to_complete': 30, 'deadline': None, 'solutions': ['Call DI Repairs customer service', 'Email DI Repairs support', 'Check DI Repairs website for updates'], 'status': 'not started'}
================================== Ai Message ==================================

I've added a task to follow-up with DI Repairs to your ToDo list. If there's anything else you need, feel free to let me know!

入队¶

我们可以使用入队功能将新运行任务加入队列,直到当前运行完成。

In [20]:
Copied!
# Create a new thread
thread = await client.threads.create()

# Create new ToDos
user_input_1 = "Send Erik his t-shirt gift this weekend."
user_input_2 = "Get cash and pay nanny for 2 weeks. Do this by Friday."
config = {"configurable": {"user_id": "Test-Double-Texting"}}
graph_name = "task_maistro" 

first_run = await client.runs.create(
    thread["thread_id"],
    graph_name,
    input={"messages": [HumanMessage(content=user_input_1)]}, 
    config=config,
)

second_run = await client.runs.create(
    thread["thread_id"],
    graph_name,
    input={"messages": [HumanMessage(content=user_input_2)]}, 
    config=config,
    multitask_strategy="enqueue",
)

# Wait until the second run completes
await client.runs.join(thread["thread_id"], second_run["run_id"])

# Get the state of the thread
state = await client.threads.get_state(thread["thread_id"])
for m in convert_to_messages(state["values"]["messages"]):
    m.pretty_print()
# Create a new thread thread = await client.threads.create() # Create new ToDos user_input_1 = "Send Erik his t-shirt gift this weekend." user_input_2 = "Get cash and pay nanny for 2 weeks. Do this by Friday." config = {"configurable": {"user_id": "Test-Double-Texting"}} graph_name = "task_maistro" first_run = await client.runs.create( thread["thread_id"], graph_name, input={"messages": [HumanMessage(content=user_input_1)]}, config=config, ) second_run = await client.runs.create( thread["thread_id"], graph_name, input={"messages": [HumanMessage(content=user_input_2)]}, config=config, multitask_strategy="enqueue", ) # Wait until the second run completes await client.runs.join(thread["thread_id"], second_run["run_id"]) # Get the state of the thread state = await client.threads.get_state(thread["thread_id"]) for m in convert_to_messages(state["values"]["messages"]): m.pretty_print()
================================ Human Message =================================

Send Erik his t-shirt gift this weekend.
================================== Ai Message ==================================
Tool Calls:
  UpdateMemory (call_svTeXPmWGTLY8aQ8EifjwHAa)
 Call ID: call_svTeXPmWGTLY8aQ8EifjwHAa
  Args:
    update_type: todo
================================= Tool Message =================================

New ToDo created:
Content: {'task': 'Send Erik his t-shirt gift', 'time_to_complete': 30, 'deadline': '2024-11-19T23:59:00', 'solutions': ['Wrap the t-shirt', "Get Erik's address", 'Visit the post office', 'Choose a delivery service'], 'status': 'not started'}
================================== Ai Message ==================================

I've updated your ToDo list to send Erik his t-shirt gift this weekend. If there's anything else you need, feel free to let me know!
================================ Human Message =================================

Get cash and pay nanny for 2 weeks. Do this by Friday.
================================== Ai Message ==================================
Tool Calls:
  UpdateMemory (call_Cq0Tfn6yqccHH8n0DOucz5OQ)
 Call ID: call_Cq0Tfn6yqccHH8n0DOucz5OQ
  Args:
    update_type: todo
================================= Tool Message =================================

New ToDo created:
Content: {'task': 'Get cash and pay nanny for 2 weeks', 'time_to_complete': 15, 'deadline': '2024-11-17T23:59:00', 'solutions': ['Visit the ATM', 'Calculate the total amount for 2 weeks', 'Hand over the cash to the nanny'], 'status': 'not started'}

Document af1fe011-f3c5-4c1c-b98b-181869bc2944 updated:
Plan: Update the deadline for sending Erik his t-shirt gift to this weekend, which is by 2024-11-17.
Added content: 2024-11-17T23:59:00
================================== Ai Message ==================================

I've updated your ToDo list to ensure you get cash and pay the nanny for 2 weeks by Friday. Let me know if there's anything else you need!

中断机制¶

我们可以利用中断功能来终止当前运行流程,同时保留该中断点之前已完成的所有工作成果。

In [32]:
Copied!
import asyncio

# Create a new thread
thread = await client.threads.create()

# Create new ToDos
user_input_1 = "Give me a summary of my ToDos due tomrrow."
user_input_2 = "Never mind, create a ToDo to Order Ham for Thanksgiving by next Friday."
config = {"configurable": {"user_id": "Test-Double-Texting"}}
graph_name = "task_maistro" 

interrupted_run = await client.runs.create(
    thread["thread_id"],
    graph_name,
    input={"messages": [HumanMessage(content=user_input_1)]}, 
    config=config,
)

# Wait for some of run 1 to complete so that we can see it in the thread 
await asyncio.sleep(1)

second_run = await client.runs.create(
    thread["thread_id"],
    graph_name,
    input={"messages": [HumanMessage(content=user_input_2)]}, 
    config=config,
    multitask_strategy="interrupt",
)

# Wait until the second run completes
await client.runs.join(thread["thread_id"], second_run["run_id"])

# Get the state of the thread
state = await client.threads.get_state(thread["thread_id"])
for m in convert_to_messages(state["values"]["messages"]):
    m.pretty_print()
import asyncio # Create a new thread thread = await client.threads.create() # Create new ToDos user_input_1 = "Give me a summary of my ToDos due tomrrow." user_input_2 = "Never mind, create a ToDo to Order Ham for Thanksgiving by next Friday." config = {"configurable": {"user_id": "Test-Double-Texting"}} graph_name = "task_maistro" interrupted_run = await client.runs.create( thread["thread_id"], graph_name, input={"messages": [HumanMessage(content=user_input_1)]}, config=config, ) # Wait for some of run 1 to complete so that we can see it in the thread await asyncio.sleep(1) second_run = await client.runs.create( thread["thread_id"], graph_name, input={"messages": [HumanMessage(content=user_input_2)]}, config=config, multitask_strategy="interrupt", ) # Wait until the second run completes await client.runs.join(thread["thread_id"], second_run["run_id"]) # Get the state of the thread state = await client.threads.get_state(thread["thread_id"]) for m in convert_to_messages(state["values"]["messages"]): m.pretty_print()
================================ Human Message =================================

Give me a summary of my ToDos due tomrrow.
================================ Human Message =================================

Never mind, create a ToDo to Order Ham for Thanksgiving by next Friday.
================================== Ai Message ==================================
Tool Calls:
  UpdateMemory (call_Rk80tTSJzik2oY44tyUWk8FM)
 Call ID: call_Rk80tTSJzik2oY44tyUWk8FM
  Args:
    update_type: todo
================================= Tool Message =================================

New ToDo created:
Content: {'task': 'Order Ham for Thanksgiving', 'time_to_complete': 30, 'deadline': '2024-11-22T23:59:59', 'solutions': ['Check local grocery stores for availability', 'Order online from a specialty meat provider', 'Visit a local butcher shop'], 'status': 'not started'}
================================== Ai Message ==================================

I've added the task "Order Ham for Thanksgiving" to your ToDo list with a deadline of next Friday. If you need any more help, feel free to ask!

我们可以看到初始运行已被保存,状态显示为 interrupted。

In [33]:
Copied!
# Confirm that the first run was interrupted
print((await client.runs.get(thread["thread_id"], interrupted_run["run_id"]))["status"])
# Confirm that the first run was interrupted print((await client.runs.get(thread["thread_id"], interrupted_run["run_id"]))["status"])
interrupted

回滚操作¶

我们可以使用回滚功能来中断图表的前次运行,将其删除,并使用双文本输入重新开始新的运行流程。

In [28]:
Copied!
# Create a new thread
thread = await client.threads.create()

# Create new ToDos
user_input_1 = "Add a ToDo to call to make appointment at Yoga."
user_input_2 = "Actually, add a ToDo to drop by Yoga in person on Sunday."
config = {"configurable": {"user_id": "Test-Double-Texting"}}
graph_name = "task_maistro" 

rolled_back_run = await client.runs.create(
    thread["thread_id"],
    graph_name,
    input={"messages": [HumanMessage(content=user_input_1)]}, 
    config=config,
)

second_run = await client.runs.create(
    thread["thread_id"],
    graph_name,
    input={"messages": [HumanMessage(content=user_input_2)]}, 
    config=config,
    multitask_strategy="rollback",
)

# Wait until the second run completes
await client.runs.join(thread["thread_id"], second_run["run_id"])

# Get the state of the thread
state = await client.threads.get_state(thread["thread_id"])
for m in convert_to_messages(state["values"]["messages"]):
    m.pretty_print()
# Create a new thread thread = await client.threads.create() # Create new ToDos user_input_1 = "Add a ToDo to call to make appointment at Yoga." user_input_2 = "Actually, add a ToDo to drop by Yoga in person on Sunday." config = {"configurable": {"user_id": "Test-Double-Texting"}} graph_name = "task_maistro" rolled_back_run = await client.runs.create( thread["thread_id"], graph_name, input={"messages": [HumanMessage(content=user_input_1)]}, config=config, ) second_run = await client.runs.create( thread["thread_id"], graph_name, input={"messages": [HumanMessage(content=user_input_2)]}, config=config, multitask_strategy="rollback", ) # Wait until the second run completes await client.runs.join(thread["thread_id"], second_run["run_id"]) # Get the state of the thread state = await client.threads.get_state(thread["thread_id"]) for m in convert_to_messages(state["values"]["messages"]): m.pretty_print()
================================ Human Message =================================

Actually, add a ToDo to drop by Yoga in person on Sunday.
================================== Ai Message ==================================

It looks like the task "Drop by Yoga in person" is already on your ToDo list with a deadline of November 19, 2024. Would you like me to update the deadline to the upcoming Sunday instead?

初始运行记录已被删除。

In [29]:
Copied!
# Confirm that the original run was deleted
try:
    await client.runs.get(thread["thread_id"], rolled_back_run["run_id"])
except httpx.HTTPStatusError as _:
    print("Original run was correctly deleted")
# Confirm that the original run was deleted try: await client.runs.get(thread["thread_id"], rolled_back_run["run_id"]) except httpx.HTTPStatusError as _: print("Original run was correctly deleted")
Original run was correctly deleted

总结¶

我们可以查看所有方法的汇总:

Screenshot 2024-11-15 at 12.13.18 PM.png


Documentation built with MkDocs.

Search

From here you can search these documents. Enter your search terms below.

Keyboard Shortcuts

Keys Action
? Open this help
n Next page
p Previous page
s Search