在整个库中使用的实用函数和类。
模块 type_serialization
serialize_type
def serialize_type(target: Any) -> str
将类型或实例序列化为其字符串表示形式,包括模块名称。
此函数处理类型、类型实例和特殊类型对象。它假定非类型对象将具有 'name' 属性。
参数:
target:要序列化的对象,可以是实例或类型。
返回值:
类型的字符串表示形式。
deserialize_type
def deserialize_type(type_str: str) -> Any
根据类型的完整导入路径(字符串表示形式)反序列化类型,包括嵌套的泛型类型。
此函数将在尚未导入时动态导入模块,然后从该模块中检索类型对象。它还可以处理嵌套的泛型类型,如list[dict[int, str]].
参数:
type_str:类型的完整导入路径的字符串表示形式。
引发:
DeserializationError:如果由于缺少模块或类型而无法反序列化该类型。
返回值:
反序列化后的类型对象。
thread_safe_import
def thread_safe_import(module_name: str) -> ModuleType
以线程安全的方式导入模块。
在多线程环境中导入模块可能导致竞争条件。此函数可确保模块以线程安全的方式导入,而不会影响单线程环境下的导入性能。
参数:
module_name:要导入的模块
模块 asynchronous
is_callable_async_compatible
def is_callable_async_compatible(func: Callable) -> bool
返回给定的可调用对象是否可以在组件的run_async 方法中使用。
参数:
callable:要检查的可调用对象。
返回值:
如果可调用对象兼容,则为 True,否则为 False。
模块 requests_utils
request_with_retry
def request_with_retry(attempts: int = 3,
status_codes_to_retry: Optional[list[int]] = None,
**kwargs: Any) -> requests.Response
执行 HTTP 请求,并在失败时具有可配置的指数退避重试。
使用示例
from haystack.utils import request_with_retry
# Sending an HTTP request with default retry configs
res = request_with_retry(method="GET", url="https://example.com")
# Sending an HTTP request with custom number of attempts
res = request_with_retry(method="GET", url="https://example.com", attempts=10)
# Sending an HTTP request with custom HTTP codes to retry
res = request_with_retry(method="GET", url="https://example.com", status_codes_to_retry=[408, 503])
# Sending an HTTP request with custom timeout in seconds
res = request_with_retry(method="GET", url="https://example.com", timeout=5)
# Sending an HTTP request with custom authorization handling
class CustomAuth(requests.auth.AuthBase):
def __call__(self, r):
r.headers["authorization"] = "Basic <my_token_here>"
return r
res = request_with_retry(method="GET", url="https://example.com", auth=CustomAuth())
# All of the above combined
res = request_with_retry(
method="GET",
url="https://example.com",
auth=CustomAuth(),
attempts=10,
status_codes_to_retry=[408, 503],
timeout=5
)
# Sending a POST request
res = request_with_retry(method="POST", url="https://example.com", data={"key": "value"}, attempts=10)
# Retry all 5xx status codes
res = request_with_retry(method="GET", url="https://example.com", status_codes_to_retry=list(range(500, 600)))
参数:
attempts:重试请求的最大次数。status_codes_to_retry:将触发重试的 HTTP 状态码列表。当参数为None时,将重试 HTTP 408、418、429 和 503。kwargs:`request` 接受的可选参数。request接受。
返回值:
该Response 对象。
async_request_with_retry
async def async_request_with_retry(attempts: int = 3,
status_codes_to_retry: Optional[
list[int]] = None,
**kwargs: Any) -> httpx.Response
执行异步 HTTP 请求,并在失败时具有可配置的指数退避重试。
使用示例
import asyncio
from haystack.utils import async_request_with_retry
# Sending an async HTTP request with default retry configs
async def example():
res = await async_request_with_retry(method="GET", url="https://example.com")
return res
# Sending an async HTTP request with custom number of attempts
async def example_with_attempts():
res = await async_request_with_retry(method="GET", url="https://example.com", attempts=10)
return res
# Sending an async HTTP request with custom HTTP codes to retry
async def example_with_status_codes():
res = await async_request_with_retry(method="GET", url="https://example.com", status_codes_to_retry=[408, 503])
return res
# Sending an async HTTP request with custom timeout in seconds
async def example_with_timeout():
res = await async_request_with_retry(method="GET", url="https://example.com", timeout=5)
return res
# Sending an async HTTP request with custom headers
async def example_with_headers():
headers = {"Authorization": "Bearer <my_token_here>"}
res = await async_request_with_retry(method="GET", url="https://example.com", headers=headers)
return res
# All of the above combined
async def example_combined():
headers = {"Authorization": "Bearer <my_token_here>"}
res = await async_request_with_retry(
method="GET",
url="https://example.com",
headers=headers,
attempts=10,
status_codes_to_retry=[408, 503],
timeout=5
)
return res
# Sending an async POST request
async def example_post():
res = await async_request_with_retry(
method="POST",
url="https://example.com",
json={"key": "value"},
attempts=10
)
return res
# Retry all 5xx status codes
async def example_5xx():
res = await async_request_with_retry(
method="GET",
url="https://example.com",
status_codes_to_retry=list(range(500, 600))
)
return res
参数:
attempts:重试请求的最大次数。status_codes_to_retry:将触发重试的 HTTP 状态码列表。当参数为None时,将重试 HTTP 408、418、429 和 503。kwargs:`request` 接受的可选参数。httpx.AsyncClient.request接受。
返回值:
该httpx.Response 对象。
模块 azure
default_azure_ad_token_provider
def default_azure_ad_token_provider() -> str
使用 DefaultAzureCredential 和 "https://cognitiveservices.azure.com/.default" 范围获取 Azure AD 令牌。
模块 callable_serialization
serialize_callable
def serialize_callable(callable_handle: Callable) -> str
将可调用对象序列化为其完整路径。
参数:
callable_handle:要序列化的可调用对象
返回值:
可调用对象的完整路径
deserialize_callable
def deserialize_callable(callable_handle: str) -> Callable
根据其字符串表示形式的完整导入路径反序列化可调用对象。
参数:
callable_handle:可调用对象的完整路径
引发:
DeserializationError:如果找不到该可调用对象
返回值:
可调用对象
模块 misc
expand_page_range
def expand_page_range(page_range: list[Union[str, int]]) -> list[int]
将页面编号和范围列表展开为页面编号列表。
例如,给定 page_range=['1-3', '5', '8', '10-12'],函数将返回 [1, 2, 3, 5, 8, 10, 11, 12]
参数:
page_range:页面编号和范围列表
返回值:
展开后的页面整数列表
expit
def expit(
x: Union[float, ndarray[Any, Any]]) -> Union[float, ndarray[Any, Any]]
计算 Logistic Sigmoid 函数。将输入值映射到 0 和 1 之间的范围
参数:
x:输入值。可以是标量或 NumPy 数组。
模块 filters
raise_on_invalid_filter_syntax
def raise_on_invalid_filter_syntax(
filters: Optional[dict[str, Any]] = None) -> None
如果过滤器语法无效,则引发错误。
document_matches_filter
def document_matches_filter(filters: dict[str, Any],
document: Union[Document, ByteStream]) -> bool
返回filters 是否匹配 Document 或 ByteStream。
有关过滤器的详细规范,请参阅DocumentStore.filter_documents() 协议文档。
模块 jinja2_chat_extension
ChatMessageExtension
一个 Jinja2 扩展,用于创建具有混合内容类型的结构化聊天消息。
此扩展提供了自定义{% message %} 标签,允许创建具有不同属性(角色、名称、元数据)和混合内容类型(文本、图像等)的聊天消息。
灵感来自 Banks。
示例:
{% message role="system" %}
You are a helpful assistant. You like to talk with {{user_name}}.
{% endmessage %}
{% message role="user" %}
Hello! I am {{user_name}}. Please describe the images.
{% for image in images %}
{{ image | templatize_part }}
{% endfor %}
{% endmessage %}
工作原理
- 该使用 `{% message %}` 标签定义聊天消息。
- 消息可以包含文本和其他结构化内容部分。
- 要将结构化内容部分包含在消息中,请使用
| templatize_part过滤器。该过滤器将内容部分序列化为 JSON 字符串,并将其包装在<haystack_content_part>标签中。 - 该扩展的 `_build_chat_message_json` 方法解析消息内容部分,将其转换为 ChatMessage 对象,并将其序列化为 JSON 字符串。
- 获得的 JSON 字符串可在 ChatPromptBuilder 组件中使用,该组件中的模板被渲染为实际的 ChatMessage 对象。
ChatMessageExtension.parse
def parse(parser: Any) -> Union[nodes.Node, list[nodes.Node]]
解析 Jinja2 模板中的消息标签及其属性。
此方法处理角色(必需)、名称(可选)、元数据(可选)和消息正文内容的解析。
参数:
parser:Jinja2 解析器实例
引发:
TemplateSyntaxError:如果提供了无效角色
返回值:
包含已解析消息配置的 CallBlock 节点
templatize_part
def templatize_part(value: ChatMessageContentT) -> str
Jinja 过滤器,用于将 ChatMessageContentT 对象转换为 JSON 字符串,并包装在特殊的 XML 内容标签中。
参数:
value:要转换的 ChatMessageContentT 对象
引发:
ValueError:如果值不是 ChatMessageContentT 的实例
返回值:
包装在特殊 XML 内容标签中的 JSON 字符串
模块 jinja2_extensions
Jinja2TimeExtension
Jinja2TimeExtension.__init__
def __init__(environment: Environment)
初始化 JinjaTimeExtension 对象。
参数:
environment:用于初始化扩展的 Jinja2 环境。它提供了扩展将要操作的上下文。
Jinja2TimeExtension.parse
def parse(parser: Any) -> Union[nodes.Node, list[nodes.Node]]
解析模板表达式以确定如何处理日期时间格式。
参数:
parser:处理模板表达式并管理语法树的解析器对象。它用于解释模板的结构。
模块 base_serialization
serialize_class_instance
def serialize_class_instance(obj: Any) -> dict[str, Any]
将具有 `to_dict` 方法的对象序列化为字典。to_dict 方法
参数:
obj:要序列化的对象。
引发:
SerializationError:如果对象没有 `to_dict` 方法。to_dict方法。
返回值:
对象的字典表示。
deserialize_class_instance
def deserialize_class_instance(data: dict[str, Any]) -> Any
从由 `auto_serialize_class_instance` 生成的字典表示形式反序列化对象。auto_serialize_class_instance.
参数:
data: 要反序列化的字典。
引发:
DeserializationError:如果序列化数据格式错误、类类型无法导入,或类缺少 `from_dict` 方法。from_dict方法。
返回值:
反序列化后的对象。
模块 device
DeviceType
表示 Haystack 支持的设备类型。
这也包括不直接由模型使用的设备 - 例如,磁盘设备仅在支持将模型权重卸载到磁盘的框架的设备映射中使用。
DeviceType.from_str
@staticmethod
def from_str(string: str) -> "DeviceType"
从字符串创建设备类型。
参数:
string:要转换的字符串。
返回值:
设备类型。
Device
设备的通用表示。
参数:
type:设备类型。id:可选的设备 ID。
Device.__init__
def __init__(type: DeviceType, id: Optional[int] = None)
创建通用设备。
参数:
type:设备类型。id:设备 ID。
Device.cpu
@staticmethod
def cpu() -> "Device"
创建通用 CPU 设备。
返回值:
CPU 设备。
Device.gpu
@staticmethod
def gpu(id: int = 0) -> "Device"
创建通用 GPU 设备。
参数:
id:GPU ID。
返回值:
GPU 设备。
Device.disk
@staticmethod
def disk() -> "Device"
创建通用磁盘设备。
返回值:
磁盘设备。
Device.mps
@staticmethod
def mps() -> "Device"
创建通用 Apple Metal Performance Shader 设备。
返回值:
MPS 设备。
Device.xpu
@staticmethod
def xpu() -> "Device"
创建通用 Intel GPU 优化设备。
返回值:
XPU 设备。
Device.from_str
@staticmethod
def from_str(string: str) -> "Device"
从字符串创建通用设备。
返回值:
设备。
DeviceMap
字符串到设备的通用映射。
字符串的语义取决于目标框架。主要用于将 HuggingFace 模型部署到多个设备。
参数:
mapping:将字符串映射到设备的字典。
DeviceMap.to_dict
def to_dict() -> dict[str, str]
将映射序列化为 JSON 可序列化字典。
返回值:
序列化后的映射。
DeviceMap.first_device
@property
def first_device() -> Optional[Device]
返回映射中的第一个设备(如果存在)。
返回值:
第一个设备。
DeviceMap.from_dict
@staticmethod
def from_dict(dict: dict[str, str]) -> "DeviceMap"
从 JSON 序列化的字典创建通用设备映射。
参数:
dict:序列化后的映射。
返回值:
通用设备映射。
DeviceMap.from_hf
@staticmethod
def from_hf(
hf_device_map: dict[str, Union[int, str,
"torch.device"]]) -> "DeviceMap"
从 HuggingFace 设备映射创建通用设备映射。
参数:
hf_device_map:HuggingFace 设备映射。
返回值:
反序列化后的设备映射。
ComponentDevice
组件设备的表示。
这可以是单个设备或设备映射。
ComponentDevice.from_str
@classmethod
def from_str(cls, device_str: str) -> "ComponentDevice"
从设备字符串创建组件设备表示。
设备字符串只能表示单个设备。
参数:
device_str:设备字符串。
返回值:
组件设备表示。
ComponentDevice.from_single
@classmethod
def from_single(cls, device: Device) -> "ComponentDevice"
从单个设备创建组件设备表示。
磁盘不能用作单个设备。
参数:
device:设备。
返回值:
组件设备表示。
ComponentDevice.from_multiple
@classmethod
def from_multiple(cls, device_map: DeviceMap) -> "ComponentDevice"
从设备映射创建组件设备表示。
参数:
device_map:设备映射。
返回值:
组件设备表示。
ComponentDevice.to_torch
def to_torch() -> "torch.device"
将组件设备表示转换为 PyTorch 格式。
不支持设备映射。
返回值:
PyTorch 设备表示。
ComponentDevice.to_torch_str
def to_torch_str() -> str
将组件设备表示转换为 PyTorch 字符串格式。
不支持设备映射。
返回值:
PyTorch 设备字符串表示。
ComponentDevice.to_spacy
def to_spacy() -> int
将组件设备表示转换为 spaCy 格式。
不支持设备映射。
返回值:
spaCy 设备表示。
ComponentDevice.to_hf
def to_hf() -> Union[Union[int, str], dict[str, Union[int, str]]]
将组件设备表示转换为 HuggingFace 格式。
返回值:
HuggingFace 设备表示。
ComponentDevice.update_hf_kwargs
def update_hf_kwargs(hf_kwargs: dict[str, Any], *,
overwrite: bool) -> dict[str, Any]
将组件设备表示转换为 HuggingFace 格式。
将它们作为规范关键字参数添加到关键字参数字典中。
参数:
hf_kwargs:HuggingFace 关键字参数字典。overwrite:是否覆盖现有设备参数。
返回值:
HuggingFace 关键字参数字典。
ComponentDevice.has_multiple_devices
@property
def has_multiple_devices() -> bool
此组件设备表示是否包含多个设备。
ComponentDevice.first_device
@property
def first_device() -> Optional["ComponentDevice"]
返回单个设备或设备映射中的第一个设备(如果存在)。
返回值:
第一个设备。
ComponentDevice.resolve_device
@staticmethod
def resolve_device(
device: Optional["ComponentDevice"] = None) -> "ComponentDevice"
为组件选择设备。如果指定了设备,则使用该设备。否则,使用默认设备。
参数:
device:提供的设备(如果存在)。
返回值:
已解析的设备。
ComponentDevice.to_dict
def to_dict() -> dict[str, Any]
将组件设备表示转换为 JSON 可序列化字典。
返回值:
字典表示。
ComponentDevice.from_dict
@classmethod
def from_dict(cls, dict: dict[str, Any]) -> "ComponentDevice"
从 JSON 序列化字典创建组件设备表示。
参数:
dict:序列化表示。
返回值:
反序列化的组件设备。
模块 http_client
init_http_client
def init_http_client(
http_client_kwargs: Optional[dict[str, Any]] = None,
async_client: bool = False
) -> Union[httpx.Client, httpx.AsyncClient, None]
根据 http_client_kwargs 初始化 httpx 客户端。
参数:
http_client_kwargs:要传递给 httpx 客户端的 kwargs。async_client:是否初始化异步客户端。
返回值:
httpx 客户端或异步 httpx 客户端。
模块 jupyter
is_in_jupyter
def is_in_jupyter() -> bool
返回值True 如果在 Jupyter 或 Google Colab 中,False 否则。
模块 url_validation
is_valid_http_url
def is_valid_http_url(url: str) -> bool
检查 URL 是否为有效的 HTTP/HTTPS URL。
模块 deserialization
deserialize_document_store_in_init_params_inplace
def deserialize_document_store_in_init_params_inplace(
data: dict[str, Any], key: str = "document_store") -> None
就地反序列化序列化组件的 init_parameters 中的通用文档存储。
参数:
data: 要反序列化的字典。key:`data["init_parameters"]` 字典中的键,其中指定了文档存储。data["init_parameters"]字典。
引发:
DeserializationError:如果文档存储未在序列化数据中正确指定,或其类型无法导入。
返回值:
字典,已反序列化文档存储。
deserialize_chatgenerator_inplace
def deserialize_chatgenerator_inplace(data: dict[str, Any],
key: str = "chat_generator") -> None
就地反序列化字典中的 ChatGenerator。
参数:
data:包含序列化数据的字典。key:字典中存储 ChatGenerator 的键。
引发:
DeserializationError:如果序列化数据中缺少键,值为非字典,缺少类型键,类无法导入,或类缺少 'from_dict' 方法。
deserialize_component_inplace
def deserialize_component_inplace(data: dict[str, Any],
key: str = "chat_generator") -> None
就地反序列化字典中的 Component。
参数:
data:包含序列化数据的字典。key:字典中存储 Component 的键。默认为 "chat_generator"。
引发:
DeserializationError:如果序列化数据中缺少键,值为非字典,缺少类型键,类无法导入,或类缺少 'from_dict' 方法。
模块 auth
SecretType
SecretType.from_str
@staticmethod
def from_str(string: str) -> "SecretType"
将字符串转换为 SecretType。
参数:
string:要转换的字符串。
Secret
封装用于身份验证的秘密。
使用示例
from haystack.components.generators import OpenAIGenerator
from haystack.utils import Secret
generator = OpenAIGenerator(api_key=Secret.from_token("<here_goes_your_token>"))
Secret.from_token
@staticmethod
def from_token(token: str) -> "Secret"
创建基于令牌的秘密。无法序列化。
参数:
token:用于身份验证的令牌。
Secret.from_env_var
@staticmethod
def from_env_var(env_vars: Union[str, list[str]],
*,
strict: bool = True) -> "Secret"
创建基于环境变量的秘密。接受一个或多个环境变量。
解析后,它将返回第一个已设置环境变量的字符串令牌。
参数:
env_vars:单个环境变量或有序的候选环境变量列表。strict:如果所有环境变量都未设置,是否引发异常。
Secret.to_dict
def to_dict() -> dict[str, Any]
将秘密转换为 JSON 可序列化字典。
某些秘密可能无法序列化。
返回值:
序列化后的策略。
Secret.from_dict
@staticmethod
def from_dict(dict: dict[str, Any]) -> "Secret"
从 JSON 可序列化字典创建秘密。
参数:
dict:包含序列化数据的字典。
返回值:
反序列化后的秘密。
Secret.resolve_value
@abstractmethod
def resolve_value() -> Optional[Any]
将秘密解析为原子值。该值的语义取决于具体秘密。
返回值:
秘密的值(如果存在)。
Secret.type
@property
@abstractmethod
def type() -> SecretType
秘密的类型。
deserialize_secrets_inplace
def deserialize_secrets_inplace(data: dict[str, Any],
keys: Iterable[str],
*,
recursive: bool = False) -> None
就地反序列化字典中的秘密。
参数:
data:包含序列化数据的字典。keys:要反序列化的秘密的键。recursive:是否递归反序列化嵌套字典。
