文档API 参考📓 教程🧑‍🍳 食谱🤝 集成💜 Discord🎨 Studio
文档

管道断点

了解如何使用断点来暂停和恢复 Haystack 管道或 Agent 的执行,以调试、检查和从保存的快照继续工作流程。

引言

Haystack 管道支持断点,用于调试复杂的执行流程。一个断点允许您在特定的组件处暂停执行,检查管道状态,并从保存的快照恢复执行。此功能适用于任何常规组件以及Agent组件。

您可以在管道中的任何组件上设置一个断点,并指定触发它的访问计数。触发时,系统将停止管道的执行,并创建一个包含当前管道状态快照的 JSON 文件。您可以检查和修改快照,并使用它从停止的确切点恢复执行。

您还可以为 Agent 设置断点,特别是为ChatGenerator组件或ToolInvoker组件中指定的任何Tool设置断点。

在常规组件上设置断点

通过指定组件名称和触发断点的访问计数来创建断点。这对于包含循环的管道很有用。默认的visit_count值为 0。

from haystack.dataclasses.breakpoints import Breakpoint
from haystack.core.errors import BreakpointException

# Create a breakpoint that triggers on the first visit to the "llm" component
break_point = Breakpoint(
    component_name="llm", 
    visit_count=0,  # 0 = first visit, 1 = second visit, etc.
    snapshot_file_path="/path/to/snapshots"  # Optional: save snapshot to file
)

# Run pipeline with breakpoint
try:
    result = pipeline.run(data=input_data, break_point=break_point)
except BreakpointException as e:
    print(f"Breakpoint triggered at component: {e.component}")
    print(f"Component inputs: {e.inputs}")
    print(f"Pipeline results so far: {e.results}")

一个BreakpointException将被引发,其中包含组件输入和管道的输出,直到执行被中断的时刻,例如在断点关联的组件执行之前 - 在上面的示例中是llm

如果在断点中指定了snapshot_file_path,系统将保存一个 JSON 快照,其中包含与BreakpointException .

中相同的信息。为了在断点期间访问管道状态,我们可以捕获断点引发的异常,也可以指定 JSON 文件的保存位置。

从断点恢复管道执行

要从断点恢复管道的执行,请在管道运行时使用pipeline_snapshot.

使用load_pipeline_snapshot() 传递生成 JSON 文件的路径,然后将其传递给管道。

from haystack.core.pipeline.breakpoint import load_pipeline_snapshot

# Load the snapshot
snapshot = load_pipeline_snapshot("llm_2025_05_03_11_23_23.json")

# Resume execution from the snapshot
result = pipeline.run(data={}, pipeline_snapshot=snapshot)
print(result["llm"]["replies"])

在 Agent 上设置断点

您也可以在 Agent 组件中设置断点。Agent 支持两种类型的断点:

  1. 聊天生成器断点:在 LLM 调用之前暂停。
  2. 工具调用器断点:在任何工具执行之前暂停。

一个ChatGenerator断点的定义如下。您需要像管道断点一样定义一个断点,然后定义一个AgentBreakpoint,在其中传递之前定义的断点和 Agent 组件的名称。

from haystack.dataclasses.breakpoints import AgentBreakpoint, Breakpoint, ToolBreakpoint

# Break at chat generator (LLM calls)
chat_bp = Breakpoint(component_name="chat_generator", visit_count=0)
agent_breakpoint = AgentBreakpoint(
    break_point=chat_bp, 
    agent_name="my_agent"
)

要在 Agent 中的工具上设置断点,请执行以下操作:

首先,定义一个ToolBreakpoint,指定名为tool_invokerToolInvoker组件,然后是与断点关联的工具,在本例中是weather_tool .

然后,定义一个AgentBreakpoint,将之前定义的ToolBreakpoint作为断点传递。

from haystack.dataclasses.breakpoints import AgentBreakpoint, Breakpoint, ToolBreakpoint

# Break at tool invoker (tool calls)
tool_bp = ToolBreakpoint(
    component_name="tool_invoker", 
    visit_count=0, 
    tool_name="weather_tool"  # Specific tool, or None for any tool
)
agent_breakpoint = AgentBreakpoint(
    break_point=tool_bp, 
    agent_name="my_agent"
)

恢复 Agent 执行

当 Agent 断点被触发时,您可以使用保存的快照恢复执行。与管道中的常规组件类似,将带有快照的 JSON 文件传递给管道的run()方法。

from haystack.core.pipeline.breakpoint import load_pipeline_snapshot

# Load the snapshot
snapshot_file = "./agent_debug/agent_chat_generator_2025_07_11_23_23.json"
snapshot = load_pipeline_snapshot(snapshot_file)
    
# Resume pipeline execution
result = pipeline.run(data={}, pipeline_snapshot=snapshot)
print("Pipeline resumed successfully")
print(f"Final result: {result}")

使用快照进行错误恢复

如果运行失败,管道会自动创建最后一个有效状态的快照。快照包含失败前的输入、访问计数和中间输出。您可以检查它,修复问题,并从该检查点恢复执行,而不是重新开始整个运行。

访问失败时的快照

pipeline.run()包装在try/except块中,并从引发的PipelineRuntimeError:

from haystack.core.errors import PipelineRuntimeError

try:
    pipeline.run(data=input_data)
except PipelineRuntimeError as e:
    snapshot = e.pipeline_snapshot
    if snapshot is not None:
        intermediate_outputs = snapshot.pipeline_state.pipeline_outputs
        # Inspect intermediate_outputs to diagnose the failure

中检索快照。Haystack 还会将相同的快照保存到磁盘上的 JSON 文件中。目录的自动选择顺序如下:

  • ~/.haystack/pipeline_snapshot
  • /tmp/haystack/pipeline_snapshot
  • ./.haystack/pipeline_snapshot

文件名将遵循以下模式:{component_name}_{visit_nr}_{YYYY_MM_DD_HH_MM_SS}.json.

从快照恢复

您可以直接从内存中的快照恢复,也可以从磁盘加载。

从内存恢复

result = pipeline.run(data={}, pipeline_snapshot=snapshot)

从磁盘恢复

from haystack.core.pipeline.breakpoint import load_pipeline_snapshot

snapshot = load_pipeline_snapshot("/path/to/.haystack/pipeline_snapshot/reader_0_2025_09_20_12_33_10.json")
result = pipeline.run(data={}, pipeline_snapshot=snapshot)