文档API 参考📓 教程🧑‍🍳 食谱🤝 集成💜 Discord🎨 Studio
API 参考

Pipeline

具有实验性功能的 Haystack Pipeline API。

模块 haystack_experimental.core.pipeline.pipeline

Pipeline

编排引擎的同步版本。

按顺序协调组件执行,一个接一个地执行。

Pipeline.run

def run(data: Dict[str, Any],
        include_outputs_from: Optional[Set[str]] = None,
        break_point: Optional[Union[Breakpoint, AgentBreakpoint]] = None,
        resume_state: Optional[Dict[str, Any]] = None,
        debug_path: Optional[Union[str, Path]] = None) -> Dict[str, Any]

使用给定的输入数据运行 Pipeline。

用法

from haystack import Pipeline, Document
from haystack.utils import Secret
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.components.generators import OpenAIGenerator
from haystack.components.builders.answer_builder import AnswerBuilder
from haystack.components.builders.prompt_builder import PromptBuilder

# Write documents to InMemoryDocumentStore
document_store = InMemoryDocumentStore()
document_store.write_documents([
    Document(content="My name is Jean and I live in Paris."),
    Document(content="My name is Mark and I live in Berlin."),
    Document(content="My name is Giorgio and I live in Rome.")
])

prompt_template = """
Given these documents, answer the question.
Documents:
{% for doc in documents %}
    {{ doc.content }}
{% endfor %}
Question: {{question}}
Answer:
"""

retriever = InMemoryBM25Retriever(document_store=document_store)
prompt_builder = PromptBuilder(template=prompt_template)
llm = OpenAIGenerator(api_key=Secret.from_token(api_key))

rag_pipeline = Pipeline()
rag_pipeline.add_component("retriever", retriever)
rag_pipeline.add_component("prompt_builder", prompt_builder)
rag_pipeline.add_component("llm", llm)
rag_pipeline.connect("retriever", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "llm")

# Ask a question
question = "Who lives in Paris?"
results = rag_pipeline.run(
    {
        "retriever": {"query": question},
        "prompt_builder": {"question": question},
    }
)

print(results["llm"]["replies"])
# Jean lives in Paris

参数:

  • data:用于管道组件的输入字典。每个键是组件名称,其值是该组件输入参数的字典。
data = {
    "comp1": {"input1": 1, "input2": 2},
}

为了方便起见,当输入名称唯一时也支持此格式。

data = {
    "input1": 1, "input2": 2,
}
  • include_outputs_from:一个组件名称集合,其单个输出将包含在管道的输出中。对于被调用多次(在循环中)的组件,只包含最后一次产生的输出。
  • break_point:可用于调试管道执行的一组断点。
  • resume_state:一个字典,其中包含先前保存的管道执行的状态。
  • debug_path:管道状态应保存到的目录路径。

引发:

  • ValueError:如果向管道提供了无效输入。
  • PipelineRuntimeError:如果管道包含具有不支持的连接的循环,这些循环会导致管道卡住并运行失败。或者如果组件失败或返回了不支持类型的输出。
  • PipelineMaxComponentRuns:如果组件达到在此管道中可运行的最大次数。
  • PipelineBreakpointException:当触发 pipeline_breakpoint 时。包含组件名称、状态和部分结果。

返回值:

一个字典,其中每个条目对应一个组件名称及其输出。如果include_outputs_fromNone,此字典将仅包含叶子组件的输出,即没有出向连接的组件。

Pipeline.inject_resume_state_into_graph

def inject_resume_state_into_graph(resume_state)

从文件中加载 resume 状态并将其注入管道图。