文档API 参考📓 教程🧑‍🍳 食谱🤝 集成💜 Discord🎨 Studio
文档

管道序列化

将您的管道保存为自定义格式,并探索序列化选项。

序列化是指将管道转换为一种格式,您可以将其保存在本地磁盘上并在以后加载。

📘

序列化格式

目前 Haystack 2.0 只支持 YAML 格式。我们将逐步推出更多格式。

将管道转换为 YAML

使用使用 dumps() 方法将 Pipeline 对象转换为 YAML。

from haystack import Pipeline

pipe = Pipeline()
print(pipe.dumps())

# Prints:
#
# components: {}
# connections: []
# max_loops_allowed: 100
# metadata: {}

您也可以使用dump() 方法将管道的 YAML 表示形式保存到文件中。

with open("/content/test.yml", "w") as file:
  pipe.dump(file)

将管道转换回 Python

您可以将 YAML 管道转换回 Python。使用loads() 方法将管道的字符串表示形式(str, bytesbytearray)或load() 方法将文件状对象中表示的管道转换为相应的 Python 对象。

两种加载方法都支持回调,允许您在反序列化过程中修改组件。

这是一个示例脚本

from haystack import Pipeline
from haystack.core.serialization import DeserializationCallbacks
from typing import Type, Dict, Any


# This is the YAML you want to convert to Python:
pipeline_yaml = """
components:
  cleaner:
    init_parameters:
      remove_empty_lines: true
      remove_extra_whitespaces: true
      remove_regex: null
      remove_repeated_substrings: false
      remove_substrings: null
    type: haystack.components.preprocessors.document_cleaner.DocumentCleaner
  converter:
    init_parameters:
      encoding: utf-8
    type: haystack.components.converters.txt.TextFileToDocument
connections:
- receiver: cleaner.documents
  sender: converter.documents
max_loops_allowed: 100
metadata: {}
"""

def component_pre_init_callback(component_name: str, component_cls: Type, init_params: Dict[str, Any]):
   # This function gets called every time a component is deserialized.
   if component_name == "cleaner":
      assert "DocumentCleaner" in component_cls.__name__
      # Modify the init parameters. The modified parameters are passed to
      # the init method of the component during deserialization.
      init_params["remove_empty_lines"] = False
      print("Modified 'remove_empty_lines' to False in 'cleaner' component")
   else:
      print(f"Not modifying component {component_name} of class {component_cls}")

pipe = Pipeline.loads(pipeline_yaml, callbacks=DeserializationCallbacks(component_pre_init_callback))

执行自定义序列化

Haystack 中的管道和组件可以开箱即用地序列化简单的组件,包括自定义组件。像这样的代码就可以正常工作。

from haystack import component

@component
class RepeatWordComponent:
    def __init__(self, times: int):
        self.times = times

    @component.output_types(result=str)
    def run(self, word: str):
        return word * self.times

另一方面,如果最终格式是 JSON,这段代码将无法工作,因为set 类型不可 JSON 序列化。

from haystack import component

@component
class SetIntersector:
    def __init__(self, intersect_with: set):
        self.intersect_with = intersect_with

    @component.output_types(result=set)
    def run(self, data: set):
        return data.intersection(self.intersect_with)

在这种情况下,您可以提供自己的实现。from_dictto_dict 到组件。

from haystack import component, default_from_dict, default_to_dict

class SetIntersector:
		def __init__(self, intersect_with: set):
	      self.intersect_with = intersect_with
		
    @component.output_types(result=set)
	  def run(self, data: set):
        return data.intersect(self.intersect_with)

    def to_dict(self):
        return default_to_dict(self, intersect_with=list(self.intersect_with))

    @classmethod
    def from_dict(cls, data):
        # convert the set into a list for the dict representation,
        # so it can be converted to JSON
        data["intersect_with"] = set(data["intersect_with"])
        return default_from_dict(cls, data)

将管道保存到自定义格式

一旦管道以其字典格式可用,序列化的最后一步就是将该字典转换为您可以存储或通过网络发送的格式。Haystack 开箱即用地支持 YAML,但如果您需要不同的格式,您可以编写一个自定义的 Marshaller。

一个Marshaller 是一个 Python 类,负责根据特定格式将文本转换为字典,并将字典转换为文本。Marshaller 必须遵守Marshaller 协议,提供marshalunmarshal 方法。.

这是自定义 TOML marshaller 的代码,它依赖于rtoml 库。

# This code requires a `pip install rtoml`
from typing import Dict, Any, Union
import rtoml

class TomlMarshaller:
    def marshal(self, dict_: Dict[str, Any]) -> str:
        return rtoml.dumps(dict_)

    def unmarshal(self, data_: Union[str, bytes]) -> Dict[str, Any]:
        return dict(rtoml.loads(data_))

然后,您可以将 Marshaller 实例传递给以下方法:dump, dumps, loadloads。:

from haystack import Pipeline
from my_custom_marshallers import TomlMarshaller

pipe = Pipeline()
pipe.dumps(TomlMarshaller())
# prints:
# 'max_loops_allowed = 100\nconnections = []\n\n[metadata]\n\n[components]\n'

其他参考资料

📓 教程:序列化 LLM 管道