文档API 参考📓 教程🧑‍🍳 食谱🤝 集成💜 Discord🎨 Studio
文档

SuperComponents

SuperComponent 允许您封装一个完整的 pipeline,并将其像单个组件一样使用。当您想简化复杂 pipeline 的接口、在不同上下文中重用它或只公开必要的输入和输出时,这会非常有用。

@super_component 装饰器 (推荐)

Haystack 现在提供了一个简单的@super_component 装饰器,用于将 pipeline 包装成组件。您需要做的就是创建一个带有装饰器的类,并包含一个pipeline 属性。

使用此装饰器,to_dictfrom_dict 序列化是可选的,输入和输出映射也是可选的。

示例

下面的自定义 HybridRetriever 示例 SuperComponent 将您的查询转换为嵌入,然后同时运行 BM25 搜索和基于嵌入的搜索。最后,它合并这两个结果集并返回组合后的文档。

# pip install haystack-ai datasets "sentence-transformers>=3.0.0"

from haystack import Document, Pipeline, super_component
from haystack.components.joiners import DocumentJoiner
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.retrievers import InMemoryBM25Retriever, InMemoryEmbeddingRetriever
from haystack.document_stores.in_memory import InMemoryDocumentStore

from datasets import load_dataset


@super_component
class HybridRetriever:
    def __init__(self, document_store: InMemoryDocumentStore, embedder_model: str = "BAAI/bge-small-en-v1.5"):
        embedding_retriever = InMemoryEmbeddingRetriever(document_store)
        bm25_retriever = InMemoryBM25Retriever(document_store)
        text_embedder = SentenceTransformersTextEmbedder(embedder_model)
        document_joiner = DocumentJoiner()

        self.pipeline = Pipeline()
        self.pipeline.add_component("text_embedder", text_embedder)
        self.pipeline.add_component("embedding_retriever", embedding_retriever)
        self.pipeline.add_component("bm25_retriever", bm25_retriever)
        self.pipeline.add_component("document_joiner", document_joiner)

        self.pipeline.connect("text_embedder", "embedding_retriever")
        self.pipeline.connect("bm25_retriever", "document_joiner")
        self.pipeline.connect("embedding_retriever", "document_joiner")


dataset = load_dataset("HaystackBot/medrag-pubmed-chunk-with-embeddings", split="train")
docs = [Document(content=doc["contents"], embedding=doc["embedding"]) for doc in dataset]
document_store = InMemoryDocumentStore()
document_store.write_documents(docs)

query = "What treatments are available for chronic bronchitis?"

result = HybridRetriever(document_store).run(text=query, query=query)
print(result)

输入映射

您可以选择将 SuperComponent 的输入名称映射到 pipeline 中的实际插槽。

input_mapping = {
    "query": ["retriever.query", "prompt.query"]
}

输出映射

您还可以映射您想暴露给 SuperComponent 输出名称的 pipeline 输出插槽。

output_mapping = {
    "llm.replies": "replies"
}

如果您不提供映射,SuperComponent 将尝试自动检测它们。因此,如果多个组件具有同名的输出,我们建议使用output_mapping 来避免冲突。

SuperComponent 类

Haystack 还提供了继承 SuperComponent 类的选项。此选项需要to_dictfrom_dict 序列化,以及上面描述的输入和输出映射。

示例

这是一个初始化SuperComponent 并带有 pipeline 的简单示例。

from haystack import Pipeline, SuperComponent

with open("pipeline.yaml", "r") as file:
  pipeline = Pipeline.load(file)

super_component = SuperComponent(pipeline)

下面的示例 pipeline 根据用户查询检索相关文档,使用这些文档构建自定义提示,然后将提示发送给OpenAIChatGenerator 以创建答案。该SuperComponent 封装了 pipeline,因此可以使用简单的输入 (query) 运行它,并返回一个清晰的输出 (replies).

from haystack import Pipeline, SuperComponent
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.builders import ChatPromptBuilder
from haystack.components.retrievers import InMemoryBM25Retriever
from haystack.dataclasses.chat_message import ChatMessage
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.dataclasses import Document

document_store = InMemoryDocumentStore()
documents = [
    Document(content="Paris is the capital of France."),
    Document(content="London is the capital of England."),
]
document_store.write_documents(documents)

prompt_template = [
    ChatMessage.from_user(
    '''
    According to the following documents:
    {% for document in documents %}
    {{document.content}}
    {% endfor %}
    Answer the given question: {{query}}
    Answer:
    '''
    )
]

prompt_builder = ChatPromptBuilder(template=prompt_template, required_variables="*")

pipeline = Pipeline()
pipeline.add_component("retriever", InMemoryBM25Retriever(document_store=document_store))
pipeline.add_component("prompt_builder", prompt_builder)
pipeline.add_component("llm", OpenAIChatGenerator())
pipeline.connect("retriever.documents", "prompt_builder.documents")
pipeline.connect("prompt_builder.prompt", "llm.messages")

# Create a super component with simplified input/output mapping
wrapper = SuperComponent(
    pipeline=pipeline,
    input_mapping={
        "query": ["retriever.query", "prompt_builder.query"],
    },
    output_mapping={
        "llm.replies": "replies",
        "retriever.documents": "documents"
    }
)

# Run the pipeline with simplified interface
result = wrapper.run(query="What is the capital of France?")
print(result)
{'replies': [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>,
 _content=[TextContent(text='The capital of France is Paris.')],...)

类型检查和静态代码分析

使用 @supercomponent 装饰器创建 SuperComponents 可能会导致类型或 linting 错误。避免这些问题的一种方法是将公开的公共方法添加到您的 SuperComponent 中。这是一个示例。

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    def run(self, *, documents: List[Document]) -> dict[str, list[Document]]:
        ...
    def warm_up(self) -> None:  # noqa: D102
        ...

现成的 SuperComponents

您可以在 Haystack 中看到两个已集成的 SuperComponents 实现。