具有实验性功能的 Haystack Pipeline API。
模块 haystack_experimental.core.pipeline.pipeline
Pipeline
编排引擎的同步版本。
按顺序协调组件执行,一个接一个地执行。
Pipeline.run
def run(data: Dict[str, Any],
include_outputs_from: Optional[Set[str]] = None,
break_point: Optional[Union[Breakpoint, AgentBreakpoint]] = None,
resume_state: Optional[Dict[str, Any]] = None,
debug_path: Optional[Union[str, Path]] = None) -> Dict[str, Any]
使用给定的输入数据运行 Pipeline。
用法
from haystack import Pipeline, Document
from haystack.utils import Secret
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.components.generators import OpenAIGenerator
from haystack.components.builders.answer_builder import AnswerBuilder
from haystack.components.builders.prompt_builder import PromptBuilder
# Write documents to InMemoryDocumentStore
document_store = InMemoryDocumentStore()
document_store.write_documents([
Document(content="My name is Jean and I live in Paris."),
Document(content="My name is Mark and I live in Berlin."),
Document(content="My name is Giorgio and I live in Rome.")
])
prompt_template = """
Given these documents, answer the question.
Documents:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}
Question: {{question}}
Answer:
"""
retriever = InMemoryBM25Retriever(document_store=document_store)
prompt_builder = PromptBuilder(template=prompt_template)
llm = OpenAIGenerator(api_key=Secret.from_token(api_key))
rag_pipeline = Pipeline()
rag_pipeline.add_component("retriever", retriever)
rag_pipeline.add_component("prompt_builder", prompt_builder)
rag_pipeline.add_component("llm", llm)
rag_pipeline.connect("retriever", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "llm")
# Ask a question
question = "Who lives in Paris?"
results = rag_pipeline.run(
{
"retriever": {"query": question},
"prompt_builder": {"question": question},
}
)
print(results["llm"]["replies"])
# Jean lives in Paris
参数:
data:用于管道组件的输入字典。每个键是组件名称,其值是该组件输入参数的字典。
data = {
"comp1": {"input1": 1, "input2": 2},
}
为了方便起见,当输入名称唯一时也支持此格式。
data = {
"input1": 1, "input2": 2,
}
include_outputs_from:一个组件名称集合,其单个输出将包含在管道的输出中。对于被调用多次(在循环中)的组件,只包含最后一次产生的输出。break_point:可用于调试管道执行的一组断点。resume_state:一个字典,其中包含先前保存的管道执行的状态。debug_path:管道状态应保存到的目录路径。
引发:
ValueError:如果向管道提供了无效输入。PipelineRuntimeError:如果管道包含具有不支持的连接的循环,这些循环会导致管道卡住并运行失败。或者如果组件失败或返回了不支持类型的输出。PipelineMaxComponentRuns:如果组件达到在此管道中可运行的最大次数。PipelineBreakpointException:当触发 pipeline_breakpoint 时。包含组件名称、状态和部分结果。
返回值:
一个字典,其中每个条目对应一个组件名称及其输出。如果include_outputs_from 是None,此字典将仅包含叶子组件的输出,即没有出向连接的组件。
Pipeline.inject_resume_state_into_graph
def inject_resume_state_into_graph(resume_state)
从文件中加载 resume 状态并将其注入管道图。
