SuperComponents
SuperComponent 允许您封装一个完整的 pipeline,并将其像单个组件一样使用。当您想简化复杂 pipeline 的接口、在不同上下文中重用它或只公开必要的输入和输出时,这会非常有用。
@super_component 装饰器 (推荐)
@super_component 装饰器 (推荐)Haystack 现在提供了一个简单的@super_component 装饰器,用于将 pipeline 包装成组件。您需要做的就是创建一个带有装饰器的类,并包含一个pipeline 属性。
使用此装饰器,to_dict 和from_dict 序列化是可选的,输入和输出映射也是可选的。
示例
下面的自定义 HybridRetriever 示例 SuperComponent 将您的查询转换为嵌入,然后同时运行 BM25 搜索和基于嵌入的搜索。最后,它合并这两个结果集并返回组合后的文档。
# pip install haystack-ai datasets "sentence-transformers>=3.0.0"
from haystack import Document, Pipeline, super_component
from haystack.components.joiners import DocumentJoiner
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.retrievers import InMemoryBM25Retriever, InMemoryEmbeddingRetriever
from haystack.document_stores.in_memory import InMemoryDocumentStore
from datasets import load_dataset
@super_component
class HybridRetriever:
def __init__(self, document_store: InMemoryDocumentStore, embedder_model: str = "BAAI/bge-small-en-v1.5"):
embedding_retriever = InMemoryEmbeddingRetriever(document_store)
bm25_retriever = InMemoryBM25Retriever(document_store)
text_embedder = SentenceTransformersTextEmbedder(embedder_model)
document_joiner = DocumentJoiner()
self.pipeline = Pipeline()
self.pipeline.add_component("text_embedder", text_embedder)
self.pipeline.add_component("embedding_retriever", embedding_retriever)
self.pipeline.add_component("bm25_retriever", bm25_retriever)
self.pipeline.add_component("document_joiner", document_joiner)
self.pipeline.connect("text_embedder", "embedding_retriever")
self.pipeline.connect("bm25_retriever", "document_joiner")
self.pipeline.connect("embedding_retriever", "document_joiner")
dataset = load_dataset("HaystackBot/medrag-pubmed-chunk-with-embeddings", split="train")
docs = [Document(content=doc["contents"], embedding=doc["embedding"]) for doc in dataset]
document_store = InMemoryDocumentStore()
document_store.write_documents(docs)
query = "What treatments are available for chronic bronchitis?"
result = HybridRetriever(document_store).run(text=query, query=query)
print(result)
输入映射
您可以选择将 SuperComponent 的输入名称映射到 pipeline 中的实际插槽。
input_mapping = {
"query": ["retriever.query", "prompt.query"]
}
输出映射
您还可以映射您想暴露给 SuperComponent 输出名称的 pipeline 输出插槽。
output_mapping = {
"llm.replies": "replies"
}
如果您不提供映射,SuperComponent 将尝试自动检测它们。因此,如果多个组件具有同名的输出,我们建议使用output_mapping 来避免冲突。
SuperComponent 类
Haystack 还提供了继承 SuperComponent 类的选项。此选项需要to_dict 和from_dict 序列化,以及上面描述的输入和输出映射。
示例
这是一个初始化SuperComponent 并带有 pipeline 的简单示例。
from haystack import Pipeline, SuperComponent
with open("pipeline.yaml", "r") as file:
pipeline = Pipeline.load(file)
super_component = SuperComponent(pipeline)
下面的示例 pipeline 根据用户查询检索相关文档,使用这些文档构建自定义提示,然后将提示发送给OpenAIChatGenerator 以创建答案。该SuperComponent 封装了 pipeline,因此可以使用简单的输入 (query) 运行它,并返回一个清晰的输出 (replies).
from haystack import Pipeline, SuperComponent
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.builders import ChatPromptBuilder
from haystack.components.retrievers import InMemoryBM25Retriever
from haystack.dataclasses.chat_message import ChatMessage
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.dataclasses import Document
document_store = InMemoryDocumentStore()
documents = [
Document(content="Paris is the capital of France."),
Document(content="London is the capital of England."),
]
document_store.write_documents(documents)
prompt_template = [
ChatMessage.from_user(
'''
According to the following documents:
{% for document in documents %}
{{document.content}}
{% endfor %}
Answer the given question: {{query}}
Answer:
'''
)
]
prompt_builder = ChatPromptBuilder(template=prompt_template, required_variables="*")
pipeline = Pipeline()
pipeline.add_component("retriever", InMemoryBM25Retriever(document_store=document_store))
pipeline.add_component("prompt_builder", prompt_builder)
pipeline.add_component("llm", OpenAIChatGenerator())
pipeline.connect("retriever.documents", "prompt_builder.documents")
pipeline.connect("prompt_builder.prompt", "llm.messages")
# Create a super component with simplified input/output mapping
wrapper = SuperComponent(
pipeline=pipeline,
input_mapping={
"query": ["retriever.query", "prompt_builder.query"],
},
output_mapping={
"llm.replies": "replies",
"retriever.documents": "documents"
}
)
# Run the pipeline with simplified interface
result = wrapper.run(query="What is the capital of France?")
print(result)
{'replies': [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>,
_content=[TextContent(text='The capital of France is Paris.')],...)
类型检查和静态代码分析
使用 @supercomponent 装饰器创建 SuperComponents 可能会导致类型或 linting 错误。避免这些问题的一种方法是将公开的公共方法添加到您的 SuperComponent 中。这是一个示例。
from typing import TYPE_CHECKING
if TYPE_CHECKING:
def run(self, *, documents: List[Document]) -> dict[str, list[Document]]:
...
def warm_up(self) -> None: # noqa: D102
...
现成的 SuperComponents
您可以在 Haystack 中看到两个已集成的 SuperComponents 实现。
更新于 5 个月前
