将不同对象的列表合并的组件
模块 answer_joiner
JoinMode
AnswerJoiner 的加入模式枚举。
JoinMode.from_str
@staticmethod
def from_str(string: str) -> "JoinMode"
将字符串转换为 JoinMode 枚举。
AnswerJoiner
合并多个由Answer 对象组成的列表到一个列表中。
使用此组件可将来自不同生成器的答案合并到一个列表中。目前,该组件仅支持一种加入模式CONCATENATE。此模式将多个答案列表串联到一个列表中。
使用示例
在此示例中,AnswerJoiner 合并了来自两个不同生成器的答案
from haystack.components.builders import AnswerBuilder
from haystack.components.joiners import AnswerJoiner
from haystack.core.pipeline import Pipeline
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
query = "What's Natural Language Processing?"
messages = [ChatMessage.from_system("You are a helpful, respectful and honest assistant. Be super concise."),
ChatMessage.from_user(query)]
pipe = Pipeline()
pipe.add_component("gpt-4o", OpenAIChatGenerator(model="gpt-4o"))
pipe.add_component("gpt-4o-mini", OpenAIChatGenerator(model="gpt-4o-mini"))
pipe.add_component("aba", AnswerBuilder())
pipe.add_component("abb", AnswerBuilder())
pipe.add_component("joiner", AnswerJoiner())
pipe.connect("gpt-4o.replies", "aba")
pipe.connect("gpt-4o-mini.replies", "abb")
pipe.connect("aba.answers", "joiner")
pipe.connect("abb.answers", "joiner")
results = pipe.run(data={"gpt-4o": {"messages": messages},
"gpt-4o-mini": {"messages": messages},
"aba": {"query": query},
"abb": {"query": query}})
AnswerJoiner.__init__
def __init__(join_mode: Union[str, JoinMode] = JoinMode.CONCATENATE,
top_k: Optional[int] = None,
sort_by_score: bool = False)
创建 AnswerJoiner 组件。
参数:
join_mode:指定要使用的加入模式。可用模式concatenate:将多个答案列表串联到一个列表中。top_k:返回的最大答案数。sort_by_score:如果True,则按分数降序对文档进行排序。如果文档没有分数,则其分数视为 -infinity。
AnswerJoiner.run
@component.output_types(answers=list[AnswerType])
def run(answers: Variadic[list[AnswerType]], top_k: Optional[int] = None)
根据join_mode 参数,将多个答案列表合并到一个列表中。
参数:
answers:要合并的答案的嵌套列表。top_k:返回的最大答案数。如果提供,将覆盖实例的top_k。
返回值:
包含以下键的字典
answers:合并的答案列表
AnswerJoiner.to_dict
def to_dict() -> dict[str, Any]
将组件序列化为字典。
返回值:
包含序列化数据的字典。
AnswerJoiner.from_dict
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "AnswerJoiner"
从字典反序列化组件。
参数:
data: 要反序列化的字典。
返回值:
反序列化后的组件。
模块 branch
BranchJoiner
将管道中的多个输入分支合并到单个输出流的组件。
BranchJoiner 接收相同数据类型的多个输入,并将第一个接收到的值转发到其输出。这对于多个分支需要会聚才能继续进行的情况很有用。
常见用例
-
循环处理
BranchJoiner有助于关闭管道中的循环。例如,如果管道组件验证或修改传入数据并产生一个错误处理分支,BranchJoiner可以合并两个分支并将数据发送(或在循环情况下重新发送)到评估错误的组件。请参阅下面的“用法示例”。 -
基于决策的合并
BranchJoiner协调来自 Router 组件(如ConditionalRouter,TextLanguageRouter)的分支。假设TextLanguageRouter根据检测到的语言将用户查询定向到不同的检索器。每个检索器处理其分配的查询并将结果传递给BranchJoiner,后者将它们合并为单个输出,然后再将其传递给下一个组件,例如PromptBuilder.
示例用法
import json
from haystack import Pipeline
from haystack.components.converters import OutputAdapter
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.joiners import BranchJoiner
from haystack.components.validators import JsonSchemaValidator
from haystack.dataclasses import ChatMessage
# Define a schema for validation
person_schema = {
"type": "object",
"properties": {
"first_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"},
"last_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"},
"nationality": {"type": "string", "enum": ["Italian", "Portuguese", "American"]},
},
"required": ["first_name", "last_name", "nationality"]
}
# Initialize a pipeline
pipe = Pipeline()
# Add components to the pipeline
pipe.add_component('joiner', BranchJoiner(list[ChatMessage]))
pipe.add_component('generator', OpenAIChatGenerator(model="gpt-4o-mini"))
pipe.add_component('validator', JsonSchemaValidator(json_schema=person_schema))
pipe.add_component('adapter', OutputAdapter("{{chat_message}}", list[ChatMessage], unsafe=True))
# And connect them
pipe.connect("adapter", "joiner")
pipe.connect("joiner", "generator")
pipe.connect("generator.replies", "validator.messages")
pipe.connect("validator.validation_error", "joiner")
result = pipe.run(
data={
"generator": {"generation_kwargs": {"response_format": {"type": "json_object"}}},
"adapter": {"chat_message": [ChatMessage.from_user("Create json from Peter Parker")]}}
)
print(json.loads(result["validator"]["validated"][0].text))
>> {'first_name': 'Peter', 'last_name': 'Parker', 'nationality': 'American', 'name': 'Spider-Man', 'occupation':
>> 'Superhero', 'age': 23, 'location': 'New York City'}
请注意,BranchJoiner 一次只能处理一种数据类型。在这种情况下,BranchJoiner 是为传递list[ChatMessage] 而创建的。这决定了BranchJoiner 将从上游连接的组件接收的数据类型,以及BranchJoiner 将通过其输出发送的数据类型。
在代码示例中,BranchJoiner 从JsonSchemaValidator 接收一个循环回的list[ChatMessage] 并将其发送到OpenAIChatGenerator 进行重新生成。我们可以在管道中有多个循环连接。在此实例中,下游组件只有一个(OpenAIChatGenerator),但管道可以有更多下游组件。
BranchJoiner.__init__
def __init__(type_: type)
创建BranchJoiner 组件。
参数:
type_:输入和输出的预期数据类型。
BranchJoiner.to_dict
def to_dict() -> dict[str, Any]
将组件序列化为字典。
返回值:
包含序列化数据的字典。
BranchJoiner.from_dict
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "BranchJoiner"
从字典反序列化BranchJoiner 实例。
参数:
data:包含序列化组件数据的字典。
返回值:
一个反序列化的BranchJoiner 实例。
BranchJoiner.run
def run(**kwargs) -> dict[str, Any]
执行BranchJoiner,选择第一个可用的输入值并将其向下游传递。
参数:
**kwargs:输入数据。必须是初始化时由type_声明的数据类型。
返回值:
一个包含单个键value 的字典,其中包含收到的第一个输入。
模块 document_joiner
JoinMode
加入模式的枚举。
JoinMode.from_str
@staticmethod
def from_str(string: str) -> "JoinMode"
将字符串转换为 JoinMode 枚举。
DocumentJoiner
将多个文档列表合并到一个列表中。
它支持不同的加入模式:
- concatenate:在重复的情况下保留得分最高的文档。
- merge:计算重复项的加权分数总和并合并它们。
- reciprocal_rank_fusion:根据倒数秩融合合并并分配分数。
- distribution_based_rank_fusion:根据每个检索器中的分数分布合并并分配分数。
使用示例
from haystack import Pipeline, Document
from haystack.components.embedders import SentenceTransformersTextEmbedder, SentenceTransformersDocumentEmbedder
from haystack.components.joiners import DocumentJoiner
from haystack.components.retrievers import InMemoryBM25Retriever
from haystack.components.retrievers import InMemoryEmbeddingRetriever
from haystack.document_stores.in_memory import InMemoryDocumentStore
document_store = InMemoryDocumentStore()
docs = [Document(content="Paris"), Document(content="Berlin"), Document(content="London")]
embedder = SentenceTransformersDocumentEmbedder(model="sentence-transformers/all-MiniLM-L6-v2")
embedder.warm_up()
docs_embeddings = embedder.run(docs)
document_store.write_documents(docs_embeddings['documents'])
p = Pipeline()
p.add_component(instance=InMemoryBM25Retriever(document_store=document_store), name="bm25_retriever")
p.add_component(
instance=SentenceTransformersTextEmbedder(model="sentence-transformers/all-MiniLM-L6-v2"),
name="text_embedder",
)
p.add_component(instance=InMemoryEmbeddingRetriever(document_store=document_store), name="embedding_retriever")
p.add_component(instance=DocumentJoiner(), name="joiner")
p.connect("bm25_retriever", "joiner")
p.connect("embedding_retriever", "joiner")
p.connect("text_embedder", "embedding_retriever")
query = "What is the capital of France?"
p.run(data={"query": query, "text": query, "top_k": 1})
DocumentJoiner.__init__
def __init__(join_mode: Union[str, JoinMode] = JoinMode.CONCATENATE,
weights: Optional[list[float]] = None,
top_k: Optional[int] = None,
sort_by_score: bool = True)
创建 DocumentJoiner 组件。
参数:
join_mode:指定要使用的加入模式。可用模式concatenate:在重复的情况下保留得分最高的文档。merge:计算重复项的加权分数总和并合并它们。reciprocal_rank_fusion:根据倒数秩融合合并并分配分数。distribution_based_rank_fusion:根据每个检索器中的分数分布合并并分配分数。weights:为每个文档列表分配重要性,以影响它们的合并方式。此参数将被忽略concatenate或distribution_based_rank_fusion加入模式。每个文档列表的权重必须与输入的数量匹配。top_k: 要返回的最大文档数。sort_by_score:如果True,则按分数降序对文档进行排序。如果文档没有分数,则其分数视为 -infinity。
DocumentJoiner.run
@component.output_types(documents=list[Document])
def run(documents: Variadic[list[Document]], top_k: Optional[int] = None)
根据join_mode 参数,将多个答案列表合并到一个列表中。
参数:
documents:要合并的文档列表的列表。top_k:返回的最大文档数。将覆盖实例的top_k。
返回值:
包含以下键的字典
documents:合并的文档列表
DocumentJoiner.to_dict
def to_dict() -> dict[str, Any]
将组件序列化为字典。
返回值:
包含序列化数据的字典。
DocumentJoiner.from_dict
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "DocumentJoiner"
从字典反序列化组件。
参数:
data: 要反序列化的字典。
返回值:
反序列化后的组件。
模块 list_joiner
ListJoiner
将多个列表连接成单个扁平列表的组件。
ListJoiner 接收多个相同类型的列表,并将它们连接成一个扁平的列表。输出顺序遵循管道的执行顺序,较早的输入先添加。
使用示例
from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from haystack import Pipeline
from haystack.components.joiners import ListJoiner
user_message = [ChatMessage.from_user("Give a brief answer the following question: {{query}}")]
feedback_prompt = """
You are given a question and an answer.
Your task is to provide a score and a brief feedback on the answer.
Question: {{query}}
Answer: {{response}}
"""
feedback_message = [ChatMessage.from_system(feedback_prompt)]
prompt_builder = ChatPromptBuilder(template=user_message)
feedback_prompt_builder = ChatPromptBuilder(template=feedback_message)
llm = OpenAIChatGenerator(model="gpt-4o-mini")
feedback_llm = OpenAIChatGenerator(model="gpt-4o-mini")
pipe = Pipeline()
pipe.add_component("prompt_builder", prompt_builder)
pipe.add_component("llm", llm)
pipe.add_component("feedback_prompt_builder", feedback_prompt_builder)
pipe.add_component("feedback_llm", feedback_llm)
pipe.add_component("list_joiner", ListJoiner(list[ChatMessage]))
pipe.connect("prompt_builder.prompt", "llm.messages")
pipe.connect("prompt_builder.prompt", "list_joiner")
pipe.connect("llm.replies", "list_joiner")
pipe.connect("llm.replies", "feedback_prompt_builder.response")
pipe.connect("feedback_prompt_builder.prompt", "feedback_llm.messages")
pipe.connect("feedback_llm.replies", "list_joiner")
query = "What is nuclear physics?"
ans = pipe.run(data={"prompt_builder": {"template_variables":{"query": query}},
"feedback_prompt_builder": {"template_variables":{"query": query}}})
print(ans["list_joiner"]["values"])
ListJoiner.__init__
def __init__(list_type_: Optional[type] = None)
创建 ListJoiner 组件。
参数:
list_type_:此组件将连接的列表的预期类型(例如,list[ChatMessage])。如果指定,所有输入列表都必须符合此类型。如果为 None,则组件默认为处理任何类型的列表,包括混合类型。
ListJoiner.to_dict
def to_dict() -> dict[str, Any]
将组件序列化为字典。
返回值:
包含序列化数据的字典。
ListJoiner.from_dict
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "ListJoiner"
从字典反序列化组件。
参数:
data: 要反序列化的字典。
返回值:
反序列化后的组件。
ListJoiner.run
def run(values: Variadic[list[Any]]) -> dict[str, list[Any]]
将多个列表连接成单个扁平列表。
参数:
values:要连接的列表。
返回值:
包含连接列表的“values”键的字典。
模块 string_joiner
StringJoiner
将来自不同组件的字符串连接到字符串列表的组件。
使用示例
from haystack.components.joiners import StringJoiner
from haystack.components.builders import PromptBuilder
from haystack.core.pipeline import Pipeline
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
string_1 = "What's Natural Language Processing?"
string_2 = "What is life?"
pipeline = Pipeline()
pipeline.add_component("prompt_builder_1", PromptBuilder("Builder 1: {{query}}"))
pipeline.add_component("prompt_builder_2", PromptBuilder("Builder 2: {{query}}"))
pipeline.add_component("string_joiner", StringJoiner())
pipeline.connect("prompt_builder_1.prompt", "string_joiner.strings")
pipeline.connect("prompt_builder_2.prompt", "string_joiner.strings")
print(pipeline.run(data={"prompt_builder_1": {"query": string_1}, "prompt_builder_2": {"query": string_2}}))
>> {"string_joiner": {"strings": ["Builder 1: What's Natural Language Processing?", "Builder 2: What is life?"]}}
StringJoiner.run
@component.output_types(strings=list[str])
def run(strings: Variadic[str])
将字符串连接成字符串列表
参数:
strings:来自不同组件的字符串
返回值:
包含以下键的字典
strings:合并的字符串列表
