文档API 参考📓 教程🧑‍🍳 食谱🤝 集成💜 Discord🎨 Studio
API 参考

Joiners (连接器)

将不同对象的列表合并的组件

模块 answer_joiner

JoinMode

AnswerJoiner 的加入模式枚举。

JoinMode.from_str

@staticmethod
def from_str(string: str) -> "JoinMode"

将字符串转换为 JoinMode 枚举。

AnswerJoiner

合并多个由Answer 对象组成的列表到一个列表中。

使用此组件可将来自不同生成器的答案合并到一个列表中。目前,该组件仅支持一种加入模式CONCATENATE。此模式将多个答案列表串联到一个列表中。

使用示例

在此示例中,AnswerJoiner 合并了来自两个不同生成器的答案

from haystack.components.builders import AnswerBuilder
from haystack.components.joiners import AnswerJoiner

from haystack.core.pipeline import Pipeline

from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage


query = "What's Natural Language Processing?"
messages = [ChatMessage.from_system("You are a helpful, respectful and honest assistant. Be super concise."),
            ChatMessage.from_user(query)]

pipe = Pipeline()
pipe.add_component("gpt-4o", OpenAIChatGenerator(model="gpt-4o"))
pipe.add_component("gpt-4o-mini", OpenAIChatGenerator(model="gpt-4o-mini"))
pipe.add_component("aba", AnswerBuilder())
pipe.add_component("abb", AnswerBuilder())
pipe.add_component("joiner", AnswerJoiner())

pipe.connect("gpt-4o.replies", "aba")
pipe.connect("gpt-4o-mini.replies", "abb")
pipe.connect("aba.answers", "joiner")
pipe.connect("abb.answers", "joiner")

results = pipe.run(data={"gpt-4o": {"messages": messages},
                            "gpt-4o-mini": {"messages": messages},
                            "aba": {"query": query},
                            "abb": {"query": query}})

AnswerJoiner.__init__

def __init__(join_mode: Union[str, JoinMode] = JoinMode.CONCATENATE,
             top_k: Optional[int] = None,
             sort_by_score: bool = False)

创建 AnswerJoiner 组件。

参数:

  • join_mode:指定要使用的加入模式。可用模式
  • concatenate:将多个答案列表串联到一个列表中。
  • top_k:返回的最大答案数。
  • sort_by_score:如果True,则按分数降序对文档进行排序。如果文档没有分数,则其分数视为 -infinity。

AnswerJoiner.run

@component.output_types(answers=list[AnswerType])
def run(answers: Variadic[list[AnswerType]], top_k: Optional[int] = None)

根据join_mode 参数,将多个答案列表合并到一个列表中。

参数:

  • answers:要合并的答案的嵌套列表。
  • top_k:返回的最大答案数。如果提供,将覆盖实例的top_k

返回值:

包含以下键的字典

  • answers:合并的答案列表

AnswerJoiner.to_dict

def to_dict() -> dict[str, Any]

将组件序列化为字典。

返回值:

包含序列化数据的字典。

AnswerJoiner.from_dict

@classmethod
def from_dict(cls, data: dict[str, Any]) -> "AnswerJoiner"

从字典反序列化组件。

参数:

  • data: 要反序列化的字典。

返回值:

反序列化后的组件。

模块 branch

BranchJoiner

将管道中的多个输入分支合并到单个输出流的组件。

BranchJoiner 接收相同数据类型的多个输入,并将第一个接收到的值转发到其输出。这对于多个分支需要会聚才能继续进行的情况很有用。

常见用例

  • 循环处理 BranchJoiner 有助于关闭管道中的循环。例如,如果管道组件验证或修改传入数据并产生一个错误处理分支,BranchJoiner 可以合并两个分支并将数据发送(或在循环情况下重新发送)到评估错误的组件。请参阅下面的“用法示例”。

  • 基于决策的合并 BranchJoiner 协调来自 Router 组件(如ConditionalRouter, TextLanguageRouter)的分支。假设TextLanguageRouter 根据检测到的语言将用户查询定向到不同的检索器。每个检索器处理其分配的查询并将结果传递给BranchJoiner,后者将它们合并为单个输出,然后再将其传递给下一个组件,例如PromptBuilder.

示例用法

import json

from haystack import Pipeline
from haystack.components.converters import OutputAdapter
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.joiners import BranchJoiner
from haystack.components.validators import JsonSchemaValidator
from haystack.dataclasses import ChatMessage

# Define a schema for validation
person_schema = {
    "type": "object",
    "properties": {
        "first_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"},
        "last_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"},
        "nationality": {"type": "string", "enum": ["Italian", "Portuguese", "American"]},
    },
    "required": ["first_name", "last_name", "nationality"]
}

# Initialize a pipeline
pipe = Pipeline()

# Add components to the pipeline
pipe.add_component('joiner', BranchJoiner(list[ChatMessage]))
pipe.add_component('generator', OpenAIChatGenerator(model="gpt-4o-mini"))
pipe.add_component('validator', JsonSchemaValidator(json_schema=person_schema))
pipe.add_component('adapter', OutputAdapter("{{chat_message}}", list[ChatMessage], unsafe=True))

# And connect them
pipe.connect("adapter", "joiner")
pipe.connect("joiner", "generator")
pipe.connect("generator.replies", "validator.messages")
pipe.connect("validator.validation_error", "joiner")

result = pipe.run(
    data={
    "generator": {"generation_kwargs": {"response_format": {"type": "json_object"}}},
    "adapter": {"chat_message": [ChatMessage.from_user("Create json from Peter Parker")]}}
)

print(json.loads(result["validator"]["validated"][0].text))


>> {'first_name': 'Peter', 'last_name': 'Parker', 'nationality': 'American', 'name': 'Spider-Man', 'occupation':
>> 'Superhero', 'age': 23, 'location': 'New York City'}

请注意,BranchJoiner 一次只能处理一种数据类型。在这种情况下,BranchJoiner 是为传递list[ChatMessage] 而创建的。这决定了BranchJoiner 将从上游连接的组件接收的数据类型,以及BranchJoiner 将通过其输出发送的数据类型。

在代码示例中,BranchJoinerJsonSchemaValidator 接收一个循环回的list[ChatMessage] 并将其发送到OpenAIChatGenerator 进行重新生成。我们可以在管道中有多个循环连接。在此实例中,下游组件只有一个(OpenAIChatGenerator),但管道可以有更多下游组件。

BranchJoiner.__init__

def __init__(type_: type)

创建BranchJoiner 组件。

参数:

  • type_:输入和输出的预期数据类型。

BranchJoiner.to_dict

def to_dict() -> dict[str, Any]

将组件序列化为字典。

返回值:

包含序列化数据的字典。

BranchJoiner.from_dict

@classmethod
def from_dict(cls, data: dict[str, Any]) -> "BranchJoiner"

从字典反序列化BranchJoiner 实例。

参数:

  • data:包含序列化组件数据的字典。

返回值:

一个反序列化的BranchJoiner 实例。

BranchJoiner.run

def run(**kwargs) -> dict[str, Any]

执行BranchJoiner,选择第一个可用的输入值并将其向下游传递。

参数:

  • **kwargs:输入数据。必须是初始化时由type_ 声明的数据类型。

返回值:

一个包含单个键value 的字典,其中包含收到的第一个输入。

模块 document_joiner

JoinMode

加入模式的枚举。

JoinMode.from_str

@staticmethod
def from_str(string: str) -> "JoinMode"

将字符串转换为 JoinMode 枚举。

DocumentJoiner

将多个文档列表合并到一个列表中。

它支持不同的加入模式:

  • concatenate:在重复的情况下保留得分最高的文档。
  • merge:计算重复项的加权分数总和并合并它们。
  • reciprocal_rank_fusion:根据倒数秩融合合并并分配分数。
  • distribution_based_rank_fusion:根据每个检索器中的分数分布合并并分配分数。

使用示例

from haystack import Pipeline, Document
from haystack.components.embedders import SentenceTransformersTextEmbedder, SentenceTransformersDocumentEmbedder
from haystack.components.joiners import DocumentJoiner
from haystack.components.retrievers import InMemoryBM25Retriever
from haystack.components.retrievers import InMemoryEmbeddingRetriever
from haystack.document_stores.in_memory import InMemoryDocumentStore

document_store = InMemoryDocumentStore()
docs = [Document(content="Paris"), Document(content="Berlin"), Document(content="London")]
embedder = SentenceTransformersDocumentEmbedder(model="sentence-transformers/all-MiniLM-L6-v2")
embedder.warm_up()
docs_embeddings = embedder.run(docs)
document_store.write_documents(docs_embeddings['documents'])

p = Pipeline()
p.add_component(instance=InMemoryBM25Retriever(document_store=document_store), name="bm25_retriever")
p.add_component(
        instance=SentenceTransformersTextEmbedder(model="sentence-transformers/all-MiniLM-L6-v2"),
        name="text_embedder",
    )
p.add_component(instance=InMemoryEmbeddingRetriever(document_store=document_store), name="embedding_retriever")
p.add_component(instance=DocumentJoiner(), name="joiner")
p.connect("bm25_retriever", "joiner")
p.connect("embedding_retriever", "joiner")
p.connect("text_embedder", "embedding_retriever")
query = "What is the capital of France?"
p.run(data={"query": query, "text": query, "top_k": 1})

DocumentJoiner.__init__

def __init__(join_mode: Union[str, JoinMode] = JoinMode.CONCATENATE,
             weights: Optional[list[float]] = None,
             top_k: Optional[int] = None,
             sort_by_score: bool = True)

创建 DocumentJoiner 组件。

参数:

  • join_mode:指定要使用的加入模式。可用模式
  • concatenate:在重复的情况下保留得分最高的文档。
  • merge:计算重复项的加权分数总和并合并它们。
  • reciprocal_rank_fusion:根据倒数秩融合合并并分配分数。
  • distribution_based_rank_fusion:根据每个检索器中的分数分布合并并分配分数。
  • weights:为每个文档列表分配重要性,以影响它们的合并方式。此参数将被忽略concatenatedistribution_based_rank_fusion 加入模式。每个文档列表的权重必须与输入的数量匹配。
  • top_k: 要返回的最大文档数。
  • sort_by_score:如果True,则按分数降序对文档进行排序。如果文档没有分数,则其分数视为 -infinity。

DocumentJoiner.run

@component.output_types(documents=list[Document])
def run(documents: Variadic[list[Document]], top_k: Optional[int] = None)

根据join_mode 参数,将多个答案列表合并到一个列表中。

参数:

  • documents:要合并的文档列表的列表。
  • top_k:返回的最大文档数。将覆盖实例的top_k

返回值:

包含以下键的字典

  • documents:合并的文档列表

DocumentJoiner.to_dict

def to_dict() -> dict[str, Any]

将组件序列化为字典。

返回值:

包含序列化数据的字典。

DocumentJoiner.from_dict

@classmethod
def from_dict(cls, data: dict[str, Any]) -> "DocumentJoiner"

从字典反序列化组件。

参数:

  • data: 要反序列化的字典。

返回值:

反序列化后的组件。

模块 list_joiner

ListJoiner

将多个列表连接成单个扁平列表的组件。

ListJoiner 接收多个相同类型的列表,并将它们连接成一个扁平的列表。输出顺序遵循管道的执行顺序,较早的输入先添加。

使用示例

from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from haystack import Pipeline
from haystack.components.joiners import ListJoiner


user_message = [ChatMessage.from_user("Give a brief answer the following question: {{query}}")]

feedback_prompt = """
    You are given a question and an answer.
    Your task is to provide a score and a brief feedback on the answer.
    Question: {{query}}
    Answer: {{response}}
    """
feedback_message = [ChatMessage.from_system(feedback_prompt)]

prompt_builder = ChatPromptBuilder(template=user_message)
feedback_prompt_builder = ChatPromptBuilder(template=feedback_message)
llm = OpenAIChatGenerator(model="gpt-4o-mini")
feedback_llm = OpenAIChatGenerator(model="gpt-4o-mini")

pipe = Pipeline()
pipe.add_component("prompt_builder", prompt_builder)
pipe.add_component("llm", llm)
pipe.add_component("feedback_prompt_builder", feedback_prompt_builder)
pipe.add_component("feedback_llm", feedback_llm)
pipe.add_component("list_joiner", ListJoiner(list[ChatMessage]))

pipe.connect("prompt_builder.prompt", "llm.messages")
pipe.connect("prompt_builder.prompt", "list_joiner")
pipe.connect("llm.replies", "list_joiner")
pipe.connect("llm.replies", "feedback_prompt_builder.response")
pipe.connect("feedback_prompt_builder.prompt", "feedback_llm.messages")
pipe.connect("feedback_llm.replies", "list_joiner")

query = "What is nuclear physics?"
ans = pipe.run(data={"prompt_builder": {"template_variables":{"query": query}},
    "feedback_prompt_builder": {"template_variables":{"query": query}}})

print(ans["list_joiner"]["values"])

ListJoiner.__init__

def __init__(list_type_: Optional[type] = None)

创建 ListJoiner 组件。

参数:

  • list_type_:此组件将连接的列表的预期类型(例如,list[ChatMessage])。如果指定,所有输入列表都必须符合此类型。如果为 None,则组件默认为处理任何类型的列表,包括混合类型。

ListJoiner.to_dict

def to_dict() -> dict[str, Any]

将组件序列化为字典。

返回值:

包含序列化数据的字典。

ListJoiner.from_dict

@classmethod
def from_dict(cls, data: dict[str, Any]) -> "ListJoiner"

从字典反序列化组件。

参数:

  • data: 要反序列化的字典。

返回值:

反序列化后的组件。

ListJoiner.run

def run(values: Variadic[list[Any]]) -> dict[str, list[Any]]

将多个列表连接成单个扁平列表。

参数:

  • values:要连接的列表。

返回值:

包含连接列表的“values”键的字典。

模块 string_joiner

StringJoiner

将来自不同组件的字符串连接到字符串列表的组件。

使用示例

from haystack.components.joiners import StringJoiner
from haystack.components.builders import PromptBuilder
from haystack.core.pipeline import Pipeline

from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage

string_1 = "What's Natural Language Processing?"
string_2 = "What is life?"

pipeline = Pipeline()
pipeline.add_component("prompt_builder_1", PromptBuilder("Builder 1: {{query}}"))
pipeline.add_component("prompt_builder_2", PromptBuilder("Builder 2: {{query}}"))
pipeline.add_component("string_joiner", StringJoiner())

pipeline.connect("prompt_builder_1.prompt", "string_joiner.strings")
pipeline.connect("prompt_builder_2.prompt", "string_joiner.strings")

print(pipeline.run(data={"prompt_builder_1": {"query": string_1}, "prompt_builder_2": {"query": string_2}}))

>> {"string_joiner": {"strings": ["Builder 1: What's Natural Language Processing?", "Builder 2: What is life?"]}}

StringJoiner.run

@component.output_types(strings=list[str])
def run(strings: Variadic[str])

将字符串连接成字符串列表

参数:

  • strings:来自不同组件的字符串

返回值:

包含以下键的字典

  • strings:合并的字符串列表