文档API 参考📓 教程🧑‍🍳 料理手册🤝 集成💜 Discord🎨 Studio
文档

AsyncPipeline

使用 AsyncPipeline 同时运行多个 Haystack 组件以加快处理速度。

Haystack 中的 AsyncPipeline 引入了异步执行能力,允许在依赖关系允许的情况下并发执行组件。这优化了性能,尤其是在允许多个独立组件并行运行的复杂管道中。

AsyncPipeline 在以下场景中提供了显著的性能改进:

  • 混合检索管道,其中多个检索器可以并行运行,
  • 可以并发执行的多个 LLM 调用,
  • 具有独立执行分支的复杂管道,
  • 受益于异步执行的 I/O 密集型操作。

主要特点

并发执行

AsyncPipeline 根据输入的就绪状态和依赖关系解析来调度组件,确保在可能的情况下进行高效的并行执行。例如,在混合检索场景中,如果多个检索器之间没有依赖关系,它们可以同时运行。

执行方法

AsyncPipeline 提供了三种运行管道的方法:

同步运行(run)

)使用提供的数据同步执行管道。此方法是阻塞的,因此适用于不支持或不需要异步执行的环境。尽管组件在内部并发执行,但该方法会阻塞直到管道完成。

异步运行(run_async)

)以异步方式执行管道,允许非阻塞执行。当将管道集成到异步工作流中时,此方法是理想的选择,它可以在更大的异步应用程序或服务中实现平稳运行。

异步生成器(run_async_generator)

)通过在组件完成任务时产生部分输出来允许分步执行。这对于监控进度、调试和增量处理输出特别有用。它与run_async 不同,后者在一次异步调用中执行管道。

AsyncPipeline 中,组件 A 和 B 将并行运行,*前提是它们没有共享依赖项* 并且可以独立处理输入。

并发控制

您可以使用concurrency_limit 参数来控制同时运行的组件的最大数量,以确保资源使用可控。

您可以在我们的API 参考中找到更多详细信息,或者直接在管道的GitHub 代码中查看。

示例

import asyncio
from haystack import AsyncPipeline
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.retrievers import InMemoryEmbeddingRetriever, InMemoryBM25Retriever
from haystack.components.joiners import DocumentJoiner
from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator

hybrid_rag_retrieval = AsyncPipeline()
hybrid_rag_retrieval.add_component("text_embedder", SentenceTransformersTextEmbedder())
hybrid_rag_retrieval.add_component("embedding_retriever", InMemoryEmbeddingRetriever(document_store=document_store))
hybrid_rag_retrieval.add_component("bm25_retriever", InMemoryBM25Retriever(document_store=document_store))
hybrid_rag_retrieval.add_component("document_joiner", DocumentJoiner())
hybrid_rag_retrieval.add_component("prompt_builder", ChatPromptBuilder(template=prompt_template))
hybrid_rag_retrieval.add_component("llm", OpenAIChatGenerator())

hybrid_rag_retrieval.connect("text_embedder", "embedding_retriever")
hybrid_rag_retrieval.connect("bm25_retriever", "document_joiner")
hybrid_rag_retrieval.connect("embedding_retriever", "document_joiner")
hybrid_rag_retrieval.connect("document_joiner", "prompt_builder.documents")
hybrid_rag_retrieval.connect("prompt_builder", "llm")

async def process_results():
    async for partial_output in hybrid_rag_retrieval.run_async_generator(
            data=data,
            include_outputs_from={"document_joiner", "llm"}
    ):
        # Each partial_output contains the results from a completed component
        if "retriever" in partial_output:
            print("Retrieved documents:", len(partial_output["document_joiner"]["documents"]))
        if "llm" in partial_output:
            print("Generated answer:", partial_output["llm"]["replies"][0])

asyncio.run(process_results())