文档API 参考📓 教程🧑‍🍳 食谱🤝 集成💜 Discord🎨 Studio
文档

BranchJoiner

使用此组件将管道的不同分支连接到单个输出。

pipeline 中的最常见位置灵活:可以出现在管道的开头,或循环的开始。
必需的初始化变量"type": 预期的前置组件的数据类型
强制运行变量“**kwargs”: 初始化时定义的任何输入数据类型。此输入是可变数量的,意味着您可以将其连接到可变数量的组件。
输出变量“value”: 从连接的组件接收的第一个值。
API 参考Joiners (连接器)
GitHub 链接https://github.com/deepset-ai/haystack/blob/main/haystack/components/joiners/branch.py

概述

BranchJoiner 连接管道中的多个分支,允许其输出合并到一个分支中。这在需要将多个分支合并到下一个单个组件之前进行统一的管道中特别有用。

BranchJoiner 接收来自其他组件的相同类型的多个数据连接,并将其接收的第一个值传递到其单个输出。这使得它对于关闭管道中的循环或协调决策组件的多个分支至关重要。

BranchJoiner 只能处理由__init__ 函数声明的一个数据类型的单个输入。它确保在整个管道分支中数据类型保持一致。如果在调用 run 时收到多个值,该组件将引发错误

from haystack.components.joiners import BranchJoiner

bj = BranchJoiner(int)
bj.run(value=[3, 4, 5])

>>> ValueError: BranchJoiner expects only one input, but 3 were received.

用法

单独使用

尽管每次运行只允许一个输入值,但由于其可变数量的性质BranchJoiner 仍然期望一个列表。例如

from haystack.components.joiners import BranchJoiner

# an example where input and output are strings
bj = BranchJoiner(str)
bj.run(value=["hello"])
>>> {"value" : "hello"}

# an example where input and output are integers
bj = BranchJoiner(int)
bj.run(value=[3])
>>> {"value": 3}

在 pipeline 中

启用循环

下面是一个使用BranchJoiner 来关闭循环的示例。在此示例中,BranchJoiner 接收来自JsonSchemaValidator 的循环回的ChatMessage 对象列表,并将其发送到OpenAIChatGenerator 进行重新生成。

import json
from typing import List

from haystack import Pipeline
from haystack.components.converters import OutputAdapter
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.joiners import BranchJoiner
from haystack.components.validators import JsonSchemaValidator
from haystack.dataclasses import ChatMessage

person_schema = {
    "type": "object",
    "properties": {
        "first_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"},
        "last_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"},
        "nationality": {"type": "string", "enum": ["Italian", "Portuguese", "American"]},
    },
    "required": ["first_name", "last_name", "nationality"]
}

# Initialize a pipeline
pipe = Pipeline()

# Add components to the pipeline
pipe.add_component('joiner', BranchJoiner(List[ChatMessage]))
pipe.add_component('fc_llm', OpenAIChatGenerator(model="gpt-4o-mini"))
pipe.add_component('validator', JsonSchemaValidator(json_schema=person_schema))
pipe.add_component('adapter', OutputAdapter("{{chat_message}}", List[ChatMessage], unsafe=True))

# Connect components
pipe.connect("adapter", "joiner")
pipe.connect("joiner", "fc_llm")
pipe.connect("fc_llm.replies", "validator.messages")
pipe.connect("validator.validation_error", "joiner")

result = pipe.run(data={
    "fc_llm": {"generation_kwargs": {"response_format": {"type": "json_object"}}},
    "adapter": {"chat_message": [ChatMessage.from_user("Create json object from Peter Parker")]}
})

print(json.loads(result["validator"]["validated"][0].text))

# Output:
# {'first_name': 'Peter', 'last_name': 'Parker', 'nationality': 'American', 'name': 'Spider-Man', 'occupation':
# 'Superhero', 'age': 23, 'location': 'New York City'}

展开以查看管道图

协调分支

在此示例中,TextLanguageRouter 组件将查询定向到三个语言特定的检索器之一。下一个组件将是PromptBuilder,但我们无法直接将多个检索器连接到单个PromptBuilder。相反,我们将所有检索器连接到BranchJoiner 组件。然后,BranchJoiner 获取实际调用的检索器的输出,并将其作为单个文档列表传递给PromptBuilder。通过将来自检索器的不同输出整合到统一的连接中以供进一步处理,BranchJoiner 确保管道可以无缝处理多种语言。

from haystack import Document, Pipeline
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.components.joiners import BranchJoiner
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.components.routers import TextLanguageRouter

prompt_template = """
Answer the quesiton based on the given reviews.
Reviews:
  {% for doc in documents %}
    {{ doc.content }}
  {% endfor %}
Question: {{ query}}
Answer:
"""

documents = [
    Document(
        content="Super appartement. Juste au dessus de plusieurs bars qui ferment très tard. A savoir à l'avance. (Bouchons d'oreilles fournis !)"
    ),
    Document(
        content="El apartamento estaba genial y muy céntrico, todo a mano. Al lado de la librería Lello y De la Torre de los clérigos. Está situado en una zona de marcha, así que si vais en fin de semana , habrá ruido, aunque a nosotros no nos molestaba para dormir"
    ),
    Document(
        content="The keypad with a code is convenient and the location is convenient. Basically everything else, very noisy, wi-fi didn't work, check-in person didn't explain anything about facilities, shower head was broken, there's no cleaning and everything else one may need is charged."
    ),
    Document(
        content="It is very central and appartement has a nice appearance (even though a lot IKEA stuff), *W A R N I N G** the appartement presents itself as a elegant and as a place to relax, very wrong place to relax - you cannot sleep in this appartement, even the beds are vibrating from the bass of the clubs in the same building - you get ear plugs from the hotel."
    ),
    Document(
        content="Céntrico. Muy cómodo para moverse y ver Oporto. Edificio con terraza propia en la última planta. Todo reformado y nuevo. Te traen un estupendo desayuno todas las mañanas al apartamento. Solo que se puede escuchar algo de ruido de la calle a primeras horas de la noche. Es un zona de ocio nocturno. Pero respetan los horarios."
    ),
]

en_document_store = InMemoryDocumentStore()
fr_document_store = InMemoryDocumentStore()
es_document_store = InMemoryDocumentStore()

rag_pipeline = Pipeline()
rag_pipeline.add_component(instance=TextLanguageRouter(["en", "fr", "es"]), name="router")
rag_pipeline.add_component(instance=InMemoryBM25Retriever(document_store=en_document_store), name="en_retriever")
rag_pipeline.add_component(instance=InMemoryBM25Retriever(document_store=fr_document_store), name="fr_retriever")
rag_pipeline.add_component(instance=InMemoryBM25Retriever(document_store=es_document_store), name="es_retriever")
rag_pipeline.add_component(instance=BranchJoiner(type_=list[Document]), name="joiner")
rag_pipeline.add_component(instance=PromptBuilder(template=prompt_template), name="prompt_builder")
rag_pipeline.add_component(instance=OpenAIGenerator(), name="llm")

rag_pipeline.connect("router.en", "en_retriever.query")
rag_pipeline.connect("router.fr", "fr_retriever.query")
rag_pipeline.connect("router.es", "es_retriever.query")
rag_pipeline.connect("en_retriever", "joiner")
rag_pipeline.connect("fr_retriever", "joiner")
rag_pipeline.connect("es_retriever", "joiner")
rag_pipeline.connect("joiner", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "llm")

en_question = "Does this apartment has a noise problem?"

result = rag_pipeline.run({"router": {"text": en_question}, "prompt_builder": {"query": en_question}})

print(result["llm"]["replies"][0])

展开以查看管道图

相关链接

在我们的 API 参考中查看参数详情