按流程安排组件和集成。
模块 async_pipeline
AsyncPipeline
管道编排引擎的异步版本。
管理管道中的组件,当管道的执行图允许时,可以进行并发处理。这通过最大程度地减少空闲时间和最大程度地利用资源来有效地处理组件。
AsyncPipeline.run_async_generator
async def run_async_generator(
data: dict[str, Any],
include_outputs_from: Optional[set[str]] = None,
concurrency_limit: int = 4) -> AsyncIterator[dict[str, Any]]
异步逐步执行管道,在任何组件完成时生成部分输出。
用法
from haystack import Document
from haystack.components.builders import ChatPromptBuilder
from haystack.dataclasses import ChatMessage
from haystack.utils import Secret
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.builders.prompt_builder import PromptBuilder
from haystack import AsyncPipeline
import asyncio
# Write documents to InMemoryDocumentStore
document_store = InMemoryDocumentStore()
document_store.write_documents([
Document(content="My name is Jean and I live in Paris."),
Document(content="My name is Mark and I live in Berlin."),
Document(content="My name is Giorgio and I live in Rome.")
])
prompt_template = [
ChatMessage.from_user(
'''
Given these documents, answer the question.
Documents:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}
Question: {{question}}
Answer:
''')
]
# Create and connect pipeline components
retriever = InMemoryBM25Retriever(document_store=document_store)
prompt_builder = ChatPromptBuilder(template=prompt_template)
llm = OpenAIChatGenerator()
rag_pipeline = AsyncPipeline()
rag_pipeline.add_component("retriever", retriever)
rag_pipeline.add_component("prompt_builder", prompt_builder)
rag_pipeline.add_component("llm", llm)
rag_pipeline.connect("retriever", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "llm")
# Prepare input data
question = "Who lives in Paris?"
data = {
"retriever": {"query": question},
"prompt_builder": {"question": question},
}
# Process results as they become available
async def process_results():
async for partial_output in rag_pipeline.run_async_generator(
data=data,
include_outputs_from={"retriever", "llm"}
):
# Each partial_output contains the results from a completed component
if "retriever" in partial_output:
print("Retrieved documents:", len(partial_output["retriever"]["documents"]))
if "llm" in partial_output:
print("Generated answer:", partial_output["llm"]["replies"][0])
asyncio.run(process_results())
参数:
data:管道的初始输入数据。concurrency_limit:允许并发运行的最大组件数。include_outputs_from:要包含在管道输出中的单个输出的组件名称集。对于多次调用的组件(在循环中),只包含最后生成的输出。
引发:
ValueError:如果向管道提供了无效输入。PipelineMaxComponentRuns:如果组件超出管道中允许的最大执行次数。PipelineRuntimeError:如果管道包含不支持的连接的循环,这会导致它卡住并运行失败。或者如果组件失败或返回不支持类型的输出。
返回值:
包含部分(和最终)输出的异步迭代器。
AsyncPipeline.run_async
async def run_async(data: dict[str, Any],
include_outputs_from: Optional[set[str]] = None,
concurrency_limit: int = 4) -> dict[str, Any]
提供异步接口,使用提供的输入数据运行管道。
此方法允许将管道集成到异步工作流中,从而实现管道组件的非阻塞执行。
用法
import asyncio
from haystack import Document
from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.core.pipeline import AsyncPipeline
from haystack.dataclasses import ChatMessage
from haystack.document_stores.in_memory import InMemoryDocumentStore
# Write documents to InMemoryDocumentStore
document_store = InMemoryDocumentStore()
document_store.write_documents([
Document(content="My name is Jean and I live in Paris."),
Document(content="My name is Mark and I live in Berlin."),
Document(content="My name is Giorgio and I live in Rome.")
])
prompt_template = [
ChatMessage.from_user(
'''
Given these documents, answer the question.
Documents:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}
Question: {{question}}
Answer:
''')
]
retriever = InMemoryBM25Retriever(document_store=document_store)
prompt_builder = ChatPromptBuilder(template=prompt_template)
llm = OpenAIChatGenerator()
rag_pipeline = AsyncPipeline()
rag_pipeline.add_component("retriever", retriever)
rag_pipeline.add_component("prompt_builder", prompt_builder)
rag_pipeline.add_component("llm", llm)
rag_pipeline.connect("retriever", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "llm")
# Ask a question
question = "Who lives in Paris?"
async def run_inner(data, include_outputs_from):
return await rag_pipeline.run_async(data=data, include_outputs_from=include_outputs_from)
data = {
"retriever": {"query": question},
"prompt_builder": {"question": question},
}
results = asyncio.run(run_inner(data, include_outputs_from={"retriever", "llm"}))
print(results["llm"]["replies"])
# [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>, _content=[TextContent(text='Jean lives in Paris.')],
# _name=None, _meta={'model': 'gpt-4o-mini-2024-07-18', 'index': 0, 'finish_reason': 'stop', 'usage':
# {'completion_tokens': 6, 'prompt_tokens': 69, 'total_tokens': 75,
# 'completion_tokens_details': CompletionTokensDetails(accepted_prediction_tokens=0,
# audio_tokens=0, reasoning_tokens=0, rejected_prediction_tokens=0), 'prompt_tokens_details':
# PromptTokensDetails(audio_tokens=0, cached_tokens=0)}})]
参数:
data:管道组件的输入字典。每个键是组件名称,其值是该组件输入参数的字典。
data = {
"comp1": {"input1": 1, "input2": 2},
}
为方便起见,当输入名称唯一时也支持此格式。
data = {
"input1": 1, "input2": 2,
}
include_outputs_from:要包含在管道输出中的单个输出的组件名称集。对于多次调用的组件(在循环中),只包含最后生成的输出。concurrency_limit:允许并发运行的最大组件数。
引发:
ValueError:如果向管道提供了无效输入。PipelineRuntimeError:如果管道包含不支持的连接的循环,这会导致它卡住并运行失败。或者如果组件失败或返回不支持类型的输出。PipelineMaxComponentRuns:如果组件达到在此管道中可以运行的最大次数。
返回值:
一个字典,其中每个条目对应一个组件名称及其输出。如果include_outputs_from为None,则此字典将仅包含叶组件的输出,即没有传出连接的组件。
AsyncPipeline.run
def run(data: dict[str, Any],
include_outputs_from: Optional[set[str]] = None,
concurrency_limit: int = 4) -> dict[str, Any]
提供同步接口,使用给定输入数据运行管道。
在内部,管道组件是异步执行的,但方法本身将阻塞,直到整个管道执行完成。
如果您需要异步方法,请考虑使用run_async或run_async_generator.
用法
from haystack import Document
from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.core.pipeline import AsyncPipeline
from haystack.dataclasses import ChatMessage
from haystack.document_stores.in_memory import InMemoryDocumentStore
# Write documents to InMemoryDocumentStore
document_store = InMemoryDocumentStore()
document_store.write_documents([
Document(content="My name is Jean and I live in Paris."),
Document(content="My name is Mark and I live in Berlin."),
Document(content="My name is Giorgio and I live in Rome.")
])
prompt_template = [
ChatMessage.from_user(
'''
Given these documents, answer the question.
Documents:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}
Question: {{question}}
Answer:
''')
]
retriever = InMemoryBM25Retriever(document_store=document_store)
prompt_builder = ChatPromptBuilder(template=prompt_template)
llm = OpenAIChatGenerator()
rag_pipeline = AsyncPipeline()
rag_pipeline.add_component("retriever", retriever)
rag_pipeline.add_component("prompt_builder", prompt_builder)
rag_pipeline.add_component("llm", llm)
rag_pipeline.connect("retriever", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "llm")
# Ask a question
question = "Who lives in Paris?"
data = {
"retriever": {"query": question},
"prompt_builder": {"question": question},
}
results = rag_pipeline.run(data)
print(results["llm"]["replies"])
# [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>, _content=[TextContent(text='Jean lives in Paris.')],
# _name=None, _meta={'model': 'gpt-4o-mini-2024-07-18', 'index': 0, 'finish_reason': 'stop', 'usage':
# {'completion_tokens': 6, 'prompt_tokens': 69, 'total_tokens': 75, 'completion_tokens_details':
# CompletionTokensDetails(accepted_prediction_tokens=0, audio_tokens=0, reasoning_tokens=0,
# rejected_prediction_tokens=0), 'prompt_tokens_details': PromptTokensDetails(audio_tokens=0,
# cached_tokens=0)}})]
参数:
data:管道组件的输入字典。每个键是组件名称,其值是该组件输入参数的字典。
data = {
"comp1": {"input1": 1, "input2": 2},
}
为方便起见,当输入名称唯一时也支持此格式。
data = {
"input1": 1, "input2": 2,
}
include_outputs_from:要包含在管道输出中的单个输出的组件名称集。对于多次调用的组件(在循环中),只包含最后生成的输出。concurrency_limit:允许并发运行的最大组件数。
引发:
ValueError:如果向管道提供了无效输入。PipelineRuntimeError:如果管道包含不支持的连接的循环,这会导致它卡住并运行失败。或者如果组件失败或返回不支持类型的输出。PipelineMaxComponentRuns:如果组件达到在此管道中可以运行的最大次数。RuntimeError:如果从异步上下文内部调用。请改用run_async。
返回值:
一个字典,其中每个条目对应一个组件名称及其输出。如果include_outputs_from为None,则此字典将仅包含叶组件的输出,即没有传出连接的组件。
AsyncPipeline.__init__
def __init__(metadata: Optional[dict[str, Any]] = None,
max_runs_per_component: int = 100,
connection_type_validation: bool = True)
创建管道。
参数:
metadata:用于存储此Pipeline的元数据的任意字典。如果您希望将此Pipeline保存到文件,请确保此字典中包含的所有值都可以序列化和反序列化。max_runs_per_component:此Pipeline可以运行同一组件的次数。如果达到此限制,则会引发PipelineMaxComponentRuns异常。如果未设置,则默认为每个组件运行100次。connection_type_validation:管道是否会验证连接的类型。默认为True。
AsyncPipeline.__eq__
def __eq__(other: object) -> bool
管道的相等性由它们的类型和它们的序列化形式的相等性定义。
相同类型的管道共享所有元数据、节点和边,但它们不需要使用相同的节点实例:这允许保存然后重新加载的管道与自身相等。
AsyncPipeline.__repr__
def __repr__() -> str
返回管道的文本表示。
AsyncPipeline.to_dict
def to_dict() -> dict[str, Any]
将管道序列化为字典。
这旨在作为中间表示,但也可以用于将管道保存到文件。
返回值:
包含序列化数据的字典。
AsyncPipeline.from_dict
@classmethod
def from_dict(cls: type[T],
data: dict[str, Any],
callbacks: Optional[DeserializationCallbacks] = None,
**kwargs: Any) -> T
从字典反序列化管道。
参数:
data: 要反序列化的字典。callbacks:在反序列化期间调用的回调。kwargs:components:一个{name: instance}字典,用于重用组件实例而不是创建新的实例。
返回值:
反序列化后的组件。
AsyncPipeline.dumps
def dumps(marshaller: Marshaller = DEFAULT_MARSHALLER) -> str
根据使用的Marshaller指定的格式,返回此管道的字符串表示形式。
参数:
marshaller:用于创建字符串表示形式的Marshaller。默认为YamlMarshaller.
返回值:
表示管道的字符串。
AsyncPipeline.dump
def dump(fp: TextIO, marshaller: Marshaller = DEFAULT_MARSHALLER) -> None
将此管道的字符串表示形式写入传递给fp参数的文件类对象。
参数:
fp:准备好写入的文件类对象。marshaller:用于创建字符串表示形式的Marshaller。默认为YamlMarshaller.
AsyncPipeline.loads
@classmethod
def loads(cls: type[T],
data: Union[str, bytes, bytearray],
marshaller: Marshaller = DEFAULT_MARSHALLER,
callbacks: Optional[DeserializationCallbacks] = None) -> T
从传递给data参数的字符串表示形式创建Pipeline对象。
参数:
data:管道的字符串表示形式,可以是str,bytes或bytearray.marshaller:用于创建字符串表示形式的Marshaller。默认为YamlMarshaller.callbacks:在反序列化期间调用的回调。
引发:
DeserializationError:如果在反序列化过程中发生错误。
返回值:
一个Pipeline对象。
AsyncPipeline.load
@classmethod
def load(cls: type[T],
fp: TextIO,
marshaller: Marshaller = DEFAULT_MARSHALLER,
callbacks: Optional[DeserializationCallbacks] = None) -> T
从传递给Pipeline对象具有字符串表示形式。
字符串表示形式从传递给fp参数的文件类对象。
参数:
fp的文件类对象中读取:准备好读取的文件类对象。marshaller:用于创建字符串表示形式的Marshaller。默认为YamlMarshaller.callbacks:在反序列化期间调用的回调。
引发:
DeserializationError:如果在反序列化过程中发生错误。
返回值:
一个Pipeline对象。
AsyncPipeline.add_component
def add_component(name: str, instance: Component) -> None
将给定组件添加到管道。
默认情况下,组件不连接到任何东西:使用Pipeline.connect()将组件连接在一起。组件名称必须唯一,但如果需要,可以重用组件实例。
参数:
name:要添加的组件的名称。instance:要添加的组件实例。
引发:
ValueError:如果已存在同名组件。PipelineValidationError:如果给定实例不是组件。
AsyncPipeline.remove_component
def remove_component(name: str) -> Component
从管道中移除并返回组件。
通过提供组件的名称从管道中移除现有组件。连接到该组件的所有边也将被删除。
参数:
name:要移除的组件的名称。
引发:
ValueError:如果管道中不存在该名称的组件。
返回值:
移除的组件实例。
AsyncPipeline.connect
def connect(sender: str, receiver: str) -> "PipelineBase"
将两个组件连接在一起。
所有要连接的组件都必须存在于管道中。如果连接到具有多个输出连接的组件,请将输入和输出名称指定为“component_name.connections_name”。
参数:
sender:提供值的组件。这既可以是组件名称,也可以是component_name.connection_name格式(如果组件有多个输出)。receiver:接收值的组件。这既可以是组件名称,也可以是component_name.connection_name格式(如果组件有多个输入)。
引发:
PipelineConnectError:如果两个组件无法连接(例如,如果其中一个组件不在管道中,或者连接类型不匹配等)。
返回值:
管道实例。
AsyncPipeline.get_component
def get_component(name: str) -> Component
从管道中获取指定名称的组件。
参数:
name:组件的名称。
引发:
ValueError:如果管道中不存在该名称的组件。
返回值:
该组件的实例。
AsyncPipeline.get_component_name
def get_component_name(instance: Component) -> str
返回组件实例的名称,如果它已添加到此管道,否则返回空字符串。
参数:
instance:要查找的组件实例。
返回值:
组件实例的名称。
AsyncPipeline.inputs
def inputs(
include_components_with_connected_inputs: bool = False
) -> dict[str, dict[str, Any]]
返回包含管道输入的字典。
字典中的每个键对应一个组件名称,其值是另一个字典,描述该组件的输入插槽,包括它们的类型以及它们是否是可选的。
参数:
include_components_with_connected_inputs:如果为False,则输出中只包含具有断开连接的输入边的组件。
返回值:
一个字典,其中每个键是管道组件名称,每个值是该组件的输入插槽字典。
AsyncPipeline.outputs
def outputs(
include_components_with_connected_outputs: bool = False
) -> dict[str, dict[str, Any]]
返回包含管道输出的字典。
字典中的每个键对应一个组件名称,其值是另一个字典,描述该组件的输出插槽。
参数:
include_components_with_connected_outputs:如果为False,则输出中只包含具有断开连接的输出边的组件。
返回值:
一个字典,其中每个键是管道组件名称,每个值是该组件的输出插槽字典。
AsyncPipeline.show
def show(*,
server_url: str = "https://mermaid.ink",
params: Optional[dict] = None,
timeout: int = 30,
super_component_expansion: bool = False) -> None
在 Jupyter Notebook 中显示表示此Pipeline的图像。
此函数使用 Mermaid 服务器生成Pipeline的图表,并直接在 Notebook 中显示它。
参数:
server_url:用于渲染的 Mermaid 服务器的基本 URL(默认值:'https://mermaid.ink')。有关如何设置自己的 Mermaid 服务器的更多信息,请参阅https://github.com/jihchi/mermaid.ink和https://github.com/mermaid-js/mermaid-live-editor。params:用于修改输出的自定义参数字典。有关详细信息,请参阅 Mermaid 文档。支持的键:- format:输出格式('img'、'svg'或'pdf')。默认值:'img'。
- type:/img 端点的图像类型('jpeg'、'png'、'webp')。默认值:'png'。
- theme:Mermaid 主题('default'、'neutral'、'dark'、'forest')。默认值:'neutral'。
- bgColor:十六进制(例如,'FFFFFF')或命名格式(例如,'!white')的背景颜色。
- width:输出图像的宽度(整数)。
- height:输出图像的高度(整数)。
- scale:缩放因子(1-3)。仅当指定了'width'或'height'时适用。
- fit:是否将图表大小适应页面(仅限 PDF,布尔值)。
- paper:PDF 的纸张大小(例如,'a4'、'a3')。如果'fit'为 true,则忽略。
- landscape:PDF 的横向方向(布尔值)。如果'fit'为 true,则忽略。
timeout:向 Mermaid 服务器发出请求的超时时间(秒)。super_component_expansion:如果设置为 True 且管道包含 SuperComponents,则图表将显示超级组件的内部结构,就像它们是管道的一部分组件一样,而不是“黑匣子”。否则,将只显示超级组件本身。
引发:
PipelineDrawingError:如果函数在 Jupyter Notebook 之外调用或渲染出现问题。
AsyncPipeline.draw
def draw(*,
path: Path,
server_url: str = "https://mermaid.ink",
params: Optional[dict] = None,
timeout: int = 30,
super_component_expansion: bool = False) -> None
将表示此Pipeline的图像保存到指定的文件路径。
此函数使用 Mermaid 服务器生成Pipeline使用 Mermaid 服务器并将其保存到提供的路径。
参数:
path:将保存生成图像的文件路径。server_url:用于渲染的 Mermaid 服务器的基本 URL(默认值:'https://mermaid.ink')。有关如何设置自己的 Mermaid 服务器的更多信息,请参阅https://github.com/jihchi/mermaid.ink和https://github.com/mermaid-js/mermaid-live-editor。params:用于修改输出的自定义参数字典。有关详细信息,请参阅 Mermaid 文档。支持的键:- format:输出格式('img'、'svg'或'pdf')。默认值:'img'。
- type:/img 端点的图像类型('jpeg'、'png'、'webp')。默认值:'png'。
- theme:Mermaid 主题('default'、'neutral'、'dark'、'forest')。默认值:'neutral'。
- bgColor:十六进制(例如,'FFFFFF')或命名格式(例如,'!white')的背景颜色。
- width:输出图像的宽度(整数)。
- height:输出图像的高度(整数)。
- scale:缩放因子(1-3)。仅当指定了'width'或'height'时适用。
- fit:是否将图表大小适应页面(仅限 PDF,布尔值)。
- paper:PDF 的纸张大小(例如,'a4'、'a3')。如果'fit'为 true,则忽略。
- landscape:PDF 的横向方向(布尔值)。如果'fit'为 true,则忽略。
timeout:向 Mermaid 服务器发出请求的超时时间(秒)。super_component_expansion:如果设置为 True 且管道包含 SuperComponents,则图表将显示超级组件的内部结构,就像它们是管道的一部分组件一样,而不是“黑匣子”。否则,将只显示超级组件本身。
引发:
PipelineDrawingError:如果渲染或保存图像出现问题。
AsyncPipeline.walk
def walk() -> Iterator[tuple[str, Component]]
精确访问管道中的每个组件一次,并生成其名称和实例。
不保证访问顺序。
返回值:
组件名称和组件实例的元组迭代器。
AsyncPipeline.warm_up
def warm_up() -> None
确保所有节点都已预热。
节点有责任确保此方法可以在每次Pipeline.run()调用时执行,而无需重新初始化所有内容。
AsyncPipeline.validate_input
def validate_input(data: dict[str, Any]) -> None
验证管道输入数据。
验证数据
- 每个组件名称确实存在于管道中。
- 每个组件没有缺少任何输入。
- 每个组件的每个输入插槽只有一个输入,如果不是可变参数。
- 每个组件不会接收到其他组件已发送的输入。
参数:
data:管道组件输入的字典。每个键都是一个组件名称。
引发:
ValueError:如果输入根据上述规则无效。
AsyncPipeline.from_template
@classmethod
def from_template(
cls,
predefined_pipeline: PredefinedPipeline,
template_params: Optional[dict[str, Any]] = None) -> "PipelineBase"
从预定义模板创建管道。有关可用选项,请参见PredefinedPipeline。
参数:
predefined_pipeline:要使用的预定义管道。template_params:在渲染管道模板时使用的可选参数字典。
返回值:
一个实例。Pipeline.
AsyncPipeline.validate_pipeline
@staticmethod
def validate_pipeline(priority_queue: FIFOPriorityQueue) -> None
验证管道以检查其是否被阻塞或没有有效的入口点。
参数:
priority_queue:组件名称的优先级队列。
引发:
PipelineRuntimeError:如果管道被阻塞或没有有效的入口点。
模块 pipeline
Pipeline
编排引擎的同步版本。
根据执行图表,按顺序编排组件执行。
Pipeline.run
def run(data: dict[str, Any],
include_outputs_from: Optional[set[str]] = None,
*,
break_point: Optional[Union[Breakpoint, AgentBreakpoint]] = None,
pipeline_snapshot: Optional[PipelineSnapshot] = None
) -> dict[str, Any]
使用给定输入数据运行管道。
用法
from haystack import Pipeline, Document
from haystack.utils import Secret
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.components.generators import OpenAIGenerator
from haystack.components.builders.answer_builder import AnswerBuilder
from haystack.components.builders.prompt_builder import PromptBuilder
# Write documents to InMemoryDocumentStore
document_store = InMemoryDocumentStore()
document_store.write_documents([
Document(content="My name is Jean and I live in Paris."),
Document(content="My name is Mark and I live in Berlin."),
Document(content="My name is Giorgio and I live in Rome.")
])
prompt_template = """
Given these documents, answer the question.
Documents:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}
Question: {{question}}
Answer:
"""
retriever = InMemoryBM25Retriever(document_store=document_store)
prompt_builder = PromptBuilder(template=prompt_template)
llm = OpenAIGenerator(api_key=Secret.from_token(api_key))
rag_pipeline = Pipeline()
rag_pipeline.add_component("retriever", retriever)
rag_pipeline.add_component("prompt_builder", prompt_builder)
rag_pipeline.add_component("llm", llm)
rag_pipeline.connect("retriever", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "llm")
# Ask a question
question = "Who lives in Paris?"
results = rag_pipeline.run(
{
"retriever": {"query": question},
"prompt_builder": {"question": question},
}
)
print(results["llm"]["replies"])
# Jean lives in Paris
参数:
data:管道组件的输入字典。每个键是组件名称,其值是该组件输入参数的字典。
data = {
"comp1": {"input1": 1, "input2": 2},
}
为方便起见,当输入名称唯一时也支持此格式。
data = {
"input1": 1, "input2": 2,
}
include_outputs_from:要包含在管道输出中的单个输出的组件名称集。对于多次调用的组件(在循环中),只包含最后生成的输出。break_point:一组可用于调试管道执行的断点。pipeline_snapshot:包含先前保存的管道执行快照的字典。
引发:
ValueError:如果向管道提供了无效输入。PipelineRuntimeError:如果管道包含不支持的连接的循环,这会导致它卡住并运行失败。或者如果组件失败或返回不支持类型的输出。PipelineMaxComponentRuns:如果组件达到在此管道中可以运行的最大次数。PipelineBreakpointException:当触发pipeline_breakpoint时。包含组件名称、状态和部分结果。
返回值:
一个字典,其中每个条目对应一个组件名称及其输出。如果include_outputs_from为None,则此字典将仅包含叶组件的输出,即没有传出连接的组件。
Pipeline.__init__
def __init__(metadata: Optional[dict[str, Any]] = None,
max_runs_per_component: int = 100,
connection_type_validation: bool = True)
创建管道。
参数:
metadata:用于存储此Pipeline的元数据的任意字典。如果您希望将此Pipeline保存到文件,请确保此字典中包含的所有值都可以序列化和反序列化。max_runs_per_component:此Pipeline可以运行同一组件的次数。如果达到此限制,则会引发PipelineMaxComponentRuns异常。如果未设置,则默认为每个组件运行100次。connection_type_validation:管道是否会验证连接的类型。默认为True。
Pipeline.__eq__
def __eq__(other: object) -> bool
管道的相等性由它们的类型和它们的序列化形式的相等性定义。
相同类型的管道共享所有元数据、节点和边,但它们不需要使用相同的节点实例:这允许保存然后重新加载的管道与自身相等。
Pipeline.__repr__
def __repr__() -> str
返回管道的文本表示。
Pipeline.to_dict
def to_dict() -> dict[str, Any]
将管道序列化为字典。
这旨在作为中间表示,但也可以用于将管道保存到文件。
返回值:
包含序列化数据的字典。
Pipeline.from_dict
@classmethod
def from_dict(cls: type[T],
data: dict[str, Any],
callbacks: Optional[DeserializationCallbacks] = None,
**kwargs: Any) -> T
从字典反序列化管道。
参数:
data: 要反序列化的字典。callbacks:在反序列化期间调用的回调。kwargs:components:一个{name: instance}字典,用于重用组件实例而不是创建新的实例。
返回值:
反序列化后的组件。
Pipeline.dumps
def dumps(marshaller: Marshaller = DEFAULT_MARSHALLER) -> str
根据使用的Marshaller指定的格式,返回此管道的字符串表示形式。
参数:
marshaller:用于创建字符串表示形式的Marshaller。默认为YamlMarshaller.
返回值:
表示管道的字符串。
Pipeline.dump
def dump(fp: TextIO, marshaller: Marshaller = DEFAULT_MARSHALLER) -> None
将此管道的字符串表示形式写入传递给fp参数的文件类对象。
参数:
fp:准备好写入的文件类对象。marshaller:用于创建字符串表示形式的Marshaller。默认为YamlMarshaller.
Pipeline.loads
@classmethod
def loads(cls: type[T],
data: Union[str, bytes, bytearray],
marshaller: Marshaller = DEFAULT_MARSHALLER,
callbacks: Optional[DeserializationCallbacks] = None) -> T
从传递给data参数的字符串表示形式创建Pipeline对象。
参数:
data:管道的字符串表示形式,可以是str,bytes或bytearray.marshaller:用于创建字符串表示形式的Marshaller。默认为YamlMarshaller.callbacks:在反序列化期间调用的回调。
引发:
DeserializationError:如果在反序列化过程中发生错误。
返回值:
一个Pipeline对象。
Pipeline.load
@classmethod
def load(cls: type[T],
fp: TextIO,
marshaller: Marshaller = DEFAULT_MARSHALLER,
callbacks: Optional[DeserializationCallbacks] = None) -> T
从传递给Pipeline对象具有字符串表示形式。
字符串表示形式从传递给fp参数的文件类对象。
参数:
fp的文件类对象中读取:准备好读取的文件类对象。marshaller:用于创建字符串表示形式的Marshaller。默认为YamlMarshaller.callbacks:在反序列化期间调用的回调。
引发:
DeserializationError:如果在反序列化过程中发生错误。
返回值:
一个Pipeline对象。
Pipeline.add_component
def add_component(name: str, instance: Component) -> None
将给定组件添加到管道。
默认情况下,组件不连接到任何东西:使用Pipeline.connect()将组件连接在一起。组件名称必须唯一,但如果需要,可以重用组件实例。
参数:
name:要添加的组件的名称。instance:要添加的组件实例。
引发:
ValueError:如果已存在同名组件。PipelineValidationError:如果给定实例不是组件。
Pipeline.remove_component
def remove_component(name: str) -> Component
从管道中移除并返回组件。
通过提供组件的名称从管道中移除现有组件。连接到该组件的所有边也将被删除。
参数:
name:要移除的组件的名称。
引发:
ValueError:如果管道中不存在该名称的组件。
返回值:
移除的组件实例。
Pipeline.connect
def connect(sender: str, receiver: str) -> "PipelineBase"
将两个组件连接在一起。
所有要连接的组件都必须存在于管道中。如果连接到具有多个输出连接的组件,请将输入和输出名称指定为“component_name.connections_name”。
参数:
sender:提供值的组件。这既可以是组件名称,也可以是component_name.connection_name格式(如果组件有多个输出)。receiver:接收值的组件。这既可以是组件名称,也可以是component_name.connection_name格式(如果组件有多个输入)。
引发:
PipelineConnectError:如果两个组件无法连接(例如,如果其中一个组件不在管道中,或者连接类型不匹配等)。
返回值:
管道实例。
Pipeline.get_component
def get_component(name: str) -> Component
从管道中获取指定名称的组件。
参数:
name:组件的名称。
引发:
ValueError:如果管道中不存在该名称的组件。
返回值:
该组件的实例。
Pipeline.get_component_name
def get_component_name(instance: Component) -> str
返回组件实例的名称,如果它已添加到此管道,否则返回空字符串。
参数:
instance:要查找的组件实例。
返回值:
组件实例的名称。
Pipeline.inputs
def inputs(
include_components_with_connected_inputs: bool = False
) -> dict[str, dict[str, Any]]
返回包含管道输入的字典。
字典中的每个键对应一个组件名称,其值是另一个字典,描述该组件的输入插槽,包括它们的类型以及它们是否是可选的。
参数:
include_components_with_connected_inputs:如果为False,则输出中只包含具有断开连接的输入边的组件。
返回值:
一个字典,其中每个键是管道组件名称,每个值是该组件的输入插槽字典。
Pipeline.outputs
def outputs(
include_components_with_connected_outputs: bool = False
) -> dict[str, dict[str, Any]]
返回包含管道输出的字典。
字典中的每个键对应一个组件名称,其值是另一个字典,描述该组件的输出插槽。
参数:
include_components_with_connected_outputs:如果为False,则输出中只包含具有断开连接的输出边的组件。
返回值:
一个字典,其中每个键是管道组件名称,每个值是该组件的输出插槽字典。
Pipeline.show
def show(*,
server_url: str = "https://mermaid.ink",
params: Optional[dict] = None,
timeout: int = 30,
super_component_expansion: bool = False) -> None
在 Jupyter Notebook 中显示表示此Pipeline的图像。
此函数使用 Mermaid 服务器生成Pipeline的图表,并直接在 Notebook 中显示它。
参数:
server_url:用于渲染的 Mermaid 服务器的基本 URL(默认值:'https://mermaid.ink')。有关如何设置自己的 Mermaid 服务器的更多信息,请参阅https://github.com/jihchi/mermaid.ink和https://github.com/mermaid-js/mermaid-live-editor。params:用于修改输出的自定义参数字典。有关详细信息,请参阅 Mermaid 文档。支持的键:- format:输出格式('img'、'svg'或'pdf')。默认值:'img'。
- type:/img 端点的图像类型('jpeg'、'png'、'webp')。默认值:'png'。
- theme:Mermaid 主题('default'、'neutral'、'dark'、'forest')。默认值:'neutral'。
- bgColor:十六进制(例如,'FFFFFF')或命名格式(例如,'!white')的背景颜色。
- width:输出图像的宽度(整数)。
- height:输出图像的高度(整数)。
- scale:缩放因子(1-3)。仅当指定了'width'或'height'时适用。
- fit:是否将图表大小适应页面(仅限 PDF,布尔值)。
- paper:PDF 的纸张大小(例如,'a4'、'a3')。如果'fit'为 true,则忽略。
- landscape:PDF 的横向方向(布尔值)。如果'fit'为 true,则忽略。
timeout:向 Mermaid 服务器发出请求的超时时间(秒)。super_component_expansion:如果设置为 True 且管道包含 SuperComponents,则图表将显示超级组件的内部结构,就像它们是管道的一部分组件一样,而不是“黑匣子”。否则,将只显示超级组件本身。
引发:
PipelineDrawingError:如果函数在 Jupyter Notebook 之外调用或渲染出现问题。
Pipeline.draw
def draw(*,
path: Path,
server_url: str = "https://mermaid.ink",
params: Optional[dict] = None,
timeout: int = 30,
super_component_expansion: bool = False) -> None
将表示此Pipeline的图像保存到指定的文件路径。
此函数使用 Mermaid 服务器生成Pipeline使用 Mermaid 服务器并将其保存到提供的路径。
参数:
path:将保存生成图像的文件路径。server_url:用于渲染的 Mermaid 服务器的基本 URL(默认值:'https://mermaid.ink')。有关如何设置自己的 Mermaid 服务器的更多信息,请参阅https://github.com/jihchi/mermaid.ink和https://github.com/mermaid-js/mermaid-live-editor。params:用于修改输出的自定义参数字典。有关详细信息,请参阅 Mermaid 文档。支持的键:- format:输出格式('img'、'svg'或'pdf')。默认值:'img'。
- type:/img 端点的图像类型('jpeg'、'png'、'webp')。默认值:'png'。
- theme:Mermaid 主题('default'、'neutral'、'dark'、'forest')。默认值:'neutral'。
- bgColor:十六进制(例如,'FFFFFF')或命名格式(例如,'!white')的背景颜色。
- width:输出图像的宽度(整数)。
- height:输出图像的高度(整数)。
- scale:缩放因子(1-3)。仅当指定了'width'或'height'时适用。
- fit:是否将图表大小适应页面(仅限 PDF,布尔值)。
- paper:PDF 的纸张大小(例如,'a4'、'a3')。如果'fit'为 true,则忽略。
- landscape:PDF 的横向方向(布尔值)。如果'fit'为 true,则忽略。
timeout:向 Mermaid 服务器发出请求的超时时间(秒)。super_component_expansion:如果设置为 True 且管道包含 SuperComponents,则图表将显示超级组件的内部结构,就像它们是管道的一部分组件一样,而不是“黑匣子”。否则,将只显示超级组件本身。
引发:
PipelineDrawingError:如果渲染或保存图像出现问题。
Pipeline.walk
def walk() -> Iterator[tuple[str, Component]]
精确访问管道中的每个组件一次,并生成其名称和实例。
不保证访问顺序。
返回值:
组件名称和组件实例的元组迭代器。
Pipeline.warm_up
def warm_up() -> None
确保所有节点都已预热。
节点有责任确保此方法可以在每次Pipeline.run()调用时执行,而无需重新初始化所有内容。
Pipeline.validate_input
def validate_input(data: dict[str, Any]) -> None
验证管道输入数据。
验证数据
- 每个组件名称确实存在于管道中。
- 每个组件没有缺少任何输入。
- 每个组件的每个输入插槽只有一个输入,如果不是可变参数。
- 每个组件不会接收到其他组件已发送的输入。
参数:
data:管道组件输入的字典。每个键都是一个组件名称。
引发:
ValueError:如果输入根据上述规则无效。
Pipeline.from_template
@classmethod
def from_template(
cls,
predefined_pipeline: PredefinedPipeline,
template_params: Optional[dict[str, Any]] = None) -> "PipelineBase"
从预定义模板创建管道。有关可用选项,请参见PredefinedPipeline。
参数:
predefined_pipeline:要使用的预定义管道。template_params:在渲染管道模板时使用的可选参数字典。
返回值:
一个实例。Pipeline.
Pipeline.validate_pipeline
@staticmethod
def validate_pipeline(priority_queue: FIFOPriorityQueue) -> None
验证管道以检查其是否被阻塞或没有有效的入口点。
参数:
priority_queue:组件名称的优先级队列。
引发:
PipelineRuntimeError:如果管道被阻塞或没有有效的入口点。
