管道断点
了解如何使用断点来暂停和恢复 Haystack 管道或 Agent 的执行,以调试、检查和从保存的快照继续工作流程。
引言
Haystack 管道支持断点,用于调试复杂的执行流程。一个断点允许您在特定的组件处暂停执行,检查管道状态,并从保存的快照恢复执行。此功能适用于任何常规组件以及Agent组件。
您可以在管道中的任何组件上设置一个断点,并指定触发它的访问计数。触发时,系统将停止管道的执行,并创建一个包含当前管道状态快照的 JSON 文件。您可以检查和修改快照,并使用它从停止的确切点恢复执行。
您还可以为 Agent 设置断点,特别是为ChatGenerator组件或ToolInvoker组件中指定的任何Tool设置断点。
在常规组件上设置断点
断点通过指定组件名称和触发断点的访问计数来创建断点。这对于包含循环的管道很有用。默认的visit_count值为 0。
from haystack.dataclasses.breakpoints import Breakpoint
from haystack.core.errors import BreakpointException
# Create a breakpoint that triggers on the first visit to the "llm" component
break_point = Breakpoint(
component_name="llm",
visit_count=0, # 0 = first visit, 1 = second visit, etc.
snapshot_file_path="/path/to/snapshots" # Optional: save snapshot to file
)
# Run pipeline with breakpoint
try:
result = pipeline.run(data=input_data, break_point=break_point)
except BreakpointException as e:
print(f"Breakpoint triggered at component: {e.component}")
print(f"Component inputs: {e.inputs}")
print(f"Pipeline results so far: {e.results}")
一个BreakpointException将被引发,其中包含组件输入和管道的输出,直到执行被中断的时刻,例如在断点关联的组件执行之前 - 在上面的示例中是llm。
如果在断点中指定了snapshot_file_path,系统将保存一个 JSON 快照,其中包含与BreakpointException .
中相同的信息。为了在断点期间访问管道状态,我们可以捕获断点引发的异常,也可以指定 JSON 文件的保存位置。
从断点恢复管道执行
要从断点恢复管道的执行,请在管道运行时使用pipeline_snapshot.
使用load_pipeline_snapshot() 传递生成 JSON 文件的路径,然后将其传递给管道。
from haystack.core.pipeline.breakpoint import load_pipeline_snapshot
# Load the snapshot
snapshot = load_pipeline_snapshot("llm_2025_05_03_11_23_23.json")
# Resume execution from the snapshot
result = pipeline.run(data={}, pipeline_snapshot=snapshot)
print(result["llm"]["replies"])
在 Agent 上设置断点
您也可以在 Agent 组件中设置断点。Agent 支持两种类型的断点:
- 聊天生成器断点:在 LLM 调用之前暂停。
- 工具调用器断点:在任何工具执行之前暂停。
一个ChatGenerator断点的定义如下。您需要像管道断点一样定义一个断点,然后定义一个AgentBreakpoint,在其中传递之前定义的断点和 Agent 组件的名称。
from haystack.dataclasses.breakpoints import AgentBreakpoint, Breakpoint, ToolBreakpoint
# Break at chat generator (LLM calls)
chat_bp = Breakpoint(component_name="chat_generator", visit_count=0)
agent_breakpoint = AgentBreakpoint(
break_point=chat_bp,
agent_name="my_agent"
)
要在 Agent 中的工具上设置断点,请执行以下操作:
首先,定义一个ToolBreakpoint,指定名为tool_invoker的ToolInvoker组件,然后是与断点关联的工具,在本例中是weather_tool .
然后,定义一个AgentBreakpoint,将之前定义的ToolBreakpoint作为断点传递。
from haystack.dataclasses.breakpoints import AgentBreakpoint, Breakpoint, ToolBreakpoint
# Break at tool invoker (tool calls)
tool_bp = ToolBreakpoint(
component_name="tool_invoker",
visit_count=0,
tool_name="weather_tool" # Specific tool, or None for any tool
)
agent_breakpoint = AgentBreakpoint(
break_point=tool_bp,
agent_name="my_agent"
)
恢复 Agent 执行
当 Agent 断点被触发时,您可以使用保存的快照恢复执行。与管道中的常规组件类似,将带有快照的 JSON 文件传递给管道的run()方法。
from haystack.core.pipeline.breakpoint import load_pipeline_snapshot
# Load the snapshot
snapshot_file = "./agent_debug/agent_chat_generator_2025_07_11_23_23.json"
snapshot = load_pipeline_snapshot(snapshot_file)
# Resume pipeline execution
result = pipeline.run(data={}, pipeline_snapshot=snapshot)
print("Pipeline resumed successfully")
print(f"Final result: {result}")
使用快照进行错误恢复
如果运行失败,管道会自动创建最后一个有效状态的快照。快照包含失败前的输入、访问计数和中间输出。您可以检查它,修复问题,并从该检查点恢复执行,而不是重新开始整个运行。
访问失败时的快照
将pipeline.run()包装在try/except块中,并从引发的PipelineRuntimeError:
from haystack.core.errors import PipelineRuntimeError
try:
pipeline.run(data=input_data)
except PipelineRuntimeError as e:
snapshot = e.pipeline_snapshot
if snapshot is not None:
intermediate_outputs = snapshot.pipeline_state.pipeline_outputs
# Inspect intermediate_outputs to diagnose the failure
中检索快照。Haystack 还会将相同的快照保存到磁盘上的 JSON 文件中。目录的自动选择顺序如下:
~/.haystack/pipeline_snapshot/tmp/haystack/pipeline_snapshot./.haystack/pipeline_snapshot
文件名将遵循以下模式:{component_name}_{visit_nr}_{YYYY_MM_DD_HH_MM_SS}.json.
从快照恢复
您可以直接从内存中的快照恢复,也可以从磁盘加载。
从内存恢复
result = pipeline.run(data={}, pipeline_snapshot=snapshot)
从磁盘恢复
from haystack.core.pipeline.breakpoint import load_pipeline_snapshot
snapshot = load_pipeline_snapshot("/path/to/.haystack/pipeline_snapshot/reader_0_2025_09_20_12_33_10.json")
result = pipeline.run(data={}, pipeline_snapshot=snapshot)
更新于 26 天前
