文档API 参考📓 教程🧑‍🍳 食谱🤝 集成💜 Discord🎨 Studio
API 参考

Elasticsearch

Haystack 的 Elasticsearch 集成

模块 haystack_integrations.components.retrievers.elasticsearch.bm25_retriever

ElasticsearchBM25Retriever

ElasticsearchBM25Retriever 使用 BM25 算法从 ElasticsearchDocumentStore 中检索文档,以查找与用户查询最相似的文档。

此检索器仅与 ElasticsearchDocumentStore 兼容。

使用示例

from haystack import Document
from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore
from haystack_integrations.components.retrievers.elasticsearch import ElasticsearchBM25Retriever

document_store = ElasticsearchDocumentStore(hosts="https://:9200")
retriever = ElasticsearchBM25Retriever(document_store=document_store)

# Add documents to DocumentStore
documents = [
    Document(text="My name is Carla and I live in Berlin"),
    Document(text="My name is Paul and I live in New York"),
    Document(text="My name is Silvano and I live in Matera"),
    Document(text="My name is Usagi Tsukino and I live in Tokyo"),
]
document_store.write_documents(documents)

result = retriever.run(query="Who lives in Berlin?")
for doc in result["documents"]:
    print(doc.content)

ElasticsearchBM25Retriever.__init__

def __init__(*,
             document_store: ElasticsearchDocumentStore,
             filters: Optional[Dict[str, Any]] = None,
             fuzziness: str = "AUTO",
             top_k: int = 10,
             scale_score: bool = False,
             filter_policy: Union[str, FilterPolicy] = FilterPolicy.REPLACE)

使用 ElasticsearchDocumentStore 实例初始化 ElasticsearchBM25Retriever。

参数:

  • document_store: ElasticsearchDocumentStore 的一个实例。
  • filters: 应用于检索到的文档的过滤器,有关更多信息,请参阅ElasticsearchDocumentStore.filter_documents.
  • fuzziness: 传递给 Elasticsearch 的模糊匹配参数。有关更多详细信息,请参阅官方文档
  • top_k: 返回的最大文档数。
  • scale_score: 如果True 将文档的分数缩放到 0 和 1 之间。
  • filter_policy: 确定如何应用过滤器的策略。

引发:

  • ValueError: 如果document_store 不是一个实例ElasticsearchDocumentStore.

ElasticsearchBM25Retriever.to_dict

def to_dict() -> Dict[str, Any]

将组件序列化为字典。

返回值:

包含序列化数据的字典。

ElasticsearchBM25Retriever.from_dict

@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "ElasticsearchBM25Retriever"

从字典反序列化组件。

参数:

  • data: 要反序列化的字典。

返回值:

反序列化后的组件。

ElasticsearchBM25Retriever.run

@component.output_types(documents=List[Document])
def run(query: str,
        filters: Optional[Dict[str, Any]] = None,
        top_k: Optional[int] = None) -> Dict[str, List[Document]]

使用基于 BM25 关键字的算法检索文档。

参数:

  • query: 要在Documents 文本中搜索的字符串。Documents 文本。
  • filters: 应用于检索到的文档的过滤器。运行时过滤器的应用方式取决于初始化检索器时选择的filter_policy。有关更多详细信息,请参阅 init 方法文档字符串。
  • top_k: 返回的最大Document数。Document数。

返回值:

包含以下键的字典

  • documents: 匹配查询的Document列表。Document列表。

ElasticsearchBM25Retriever.run_async

@component.output_types(documents=List[Document])
async def run_async(query: str,
                    filters: Optional[Dict[str, Any]] = None,
                    top_k: Optional[int] = None) -> Dict[str, List[Document]]

使用基于 BM25 关键字的算法异步检索文档。

参数:

  • query: 要在Documents 文本中搜索的字符串。Document 文本。
  • filters: 应用于检索到的文档的过滤器。运行时过滤器的应用方式取决于初始化检索器时选择的filter_policy。有关更多详细信息,请参阅 init 方法文档字符串。
  • top_k: 返回的最大Document数。Document数。

返回值:

包含以下键的字典

  • documents: 匹配查询的Document列表。Document列表。

模块 haystack_integrations.components.retrievers.elasticsearch.embedding_retriever

ElasticsearchEmbeddingRetriever

ElasticsearchEmbeddingRetriever 使用向量相似性从 ElasticsearchDocumentStore 中检索文档。

使用示例

from haystack import Document
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore
from haystack_integrations.components.retrievers.elasticsearch import ElasticsearchEmbeddingRetriever

document_store = ElasticsearchDocumentStore(hosts="https://:9200")
retriever = ElasticsearchEmbeddingRetriever(document_store=document_store)

# Add documents to DocumentStore
documents = [
    Document(text="My name is Carla and I live in Berlin"),
    Document(text="My name is Paul and I live in New York"),
    Document(text="My name is Silvano and I live in Matera"),
    Document(text="My name is Usagi Tsukino and I live in Tokyo"),
]
document_store.write_documents(documents)

te = SentenceTransformersTextEmbedder()
te.warm_up()
query_embeddings = te.run("Who lives in Berlin?")["embedding"]

result = retriever.run(query=query_embeddings)
for doc in result["documents"]:
    print(doc.content)

ElasticsearchEmbeddingRetriever.__init__

def __init__(*,
             document_store: ElasticsearchDocumentStore,
             filters: Optional[Dict[str, Any]] = None,
             top_k: int = 10,
             num_candidates: Optional[int] = None,
             filter_policy: Union[str, FilterPolicy] = FilterPolicy.REPLACE)

创建 ElasticsearchEmbeddingRetriever 组件。

参数:

  • document_store: ElasticsearchDocumentStore 的一个实例。
  • filters: 应用于检索到的文档的过滤器。在近似 KNN 搜索期间应用过滤器,以确保返回 top_k 个匹配的文档。
  • top_k: 返回的最大文档数。
  • num_candidates: 每个分片上的近似最近邻候选数量。默认为 top_k * 10。增加此值将提高搜索精度,但会降低搜索速度。您可以在 Elasticsearch文档中阅读更多相关信息。
  • filter_policy: 确定如何应用过滤器的策略。

引发:

  • ValueError: 如果document_store 不是 ElasticsearchDocumentStore 的实例。

ElasticsearchEmbeddingRetriever.to_dict

def to_dict() -> Dict[str, Any]

将组件序列化为字典。

返回值:

包含序列化数据的字典。

ElasticsearchEmbeddingRetriever.from_dict

@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "ElasticsearchEmbeddingRetriever"

从字典反序列化组件。

参数:

  • data: 要反序列化的字典。

返回值:

反序列化后的组件。

ElasticsearchEmbeddingRetriever.run

@component.output_types(documents=List[Document])
def run(query_embedding: List[float],
        filters: Optional[Dict[str, Any]] = None,
        top_k: Optional[int] = None) -> Dict[str, List[Document]]

使用向量相似性度量检索文档。

参数:

  • query_embedding: 查询的嵌入。
  • filters: 在从 Document Store 获取文档时应用的过滤器。在近似 kNN 搜索期间应用过滤器,以确保 Retriever 返回top_k 个匹配文档。运行时过滤器的应用方式取决于初始化 Retriever 时选择的filter_policy
  • top_k: 要返回的最大文档数。

返回值:

包含以下键的字典

  • documents: 匹配查询的Document列表。Documents 与给定query_embedding

ElasticsearchEmbeddingRetriever.run_async

@component.output_types(documents=List[Document])
async def run_async(query_embedding: List[float],
                    filters: Optional[Dict[str, Any]] = None,
                    top_k: Optional[int] = None) -> Dict[str, List[Document]]

使用向量相似性度量异步检索文档。

参数:

  • query_embedding: 查询的嵌入。
  • filters: 在从 Document Store 获取文档时应用的过滤器。在近似 kNN 搜索期间应用过滤器,以确保 Retriever 返回top_k 个匹配文档。运行时过滤器的应用方式取决于初始化 Retriever 时选择的filter_policy
  • top_k: 要返回的最大文档数。

返回值:

包含以下键的字典

  • documents: 匹配查询的Document列表。Document列表。

模块 haystack_integrations.document_stores.elasticsearch.document_store

ElasticsearchDocumentStore

一个 ElasticsearchDocumentStore 实例,可与 Elastic Cloud 或您自己的 Elasticsearch 集群配合使用。

用法示例(Elastic Cloud)

from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore
document_store = ElasticsearchDocumentStore(
    api_key_id=Secret.from_env_var("ELASTIC_API_KEY_ID", strict=False),
    api_key=Secret.from_env_var("ELASTIC_API_KEY", strict=False),
)

用法示例(自托管 Elasticsearch 实例)

from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore
document_store = ElasticsearchDocumentStore(hosts="https://:9200")

在上面的示例中,我们禁用了安全连接,仅为展示基本用法。我们强烈建议启用安全功能,以确保只有授权用户才能访问您的数据。

有关如何连接到 Elasticsearch 和配置安全性的更多详细信息,请参阅官方 Elasticsearch文档

所有额外的关键字参数都将传递给 Elasticsearch 客户端。

ElasticsearchDocumentStore.__init__

def __init__(
        *,
        hosts: Optional[Hosts] = None,
        custom_mapping: Optional[Dict[str, Any]] = None,
        index: str = "default",
        api_key: Secret = Secret.from_env_var("ELASTIC_API_KEY", strict=False),
        api_key_id: Secret = Secret.from_env_var("ELASTIC_API_KEY_ID",
                                                 strict=False),
        embedding_similarity_function: Literal["cosine", "dot_product",
                                               "l2_norm",
                                               "max_inner_product"] = "cosine",
        **kwargs: Any)

创建一个新的 ElasticsearchDocumentStore 实例。

它还会尝试创建索引(如果尚不存在)。否则,将使用现有索引。

还可以设置用于比较文档嵌入的相似度函数。这在使用 Pipeline 中的ElasticsearchDocumentStoreElasticsearchEmbeddingRetriever.

有关连接参数的更多信息,请参阅官方 Elasticsearch文档

有关支持的 kwargs 的完整列表,请参阅官方 Elasticsearch参考

身份验证通过 Secret 对象提供,默认从环境变量加载。您可以同时提供api_key_idapi_key,或者只提供api_key,其中包含 base64 编码的id:secret 字符串。Secret 实例也可以使用Secret.from_token() 方法从令牌加载。

参数:

  • hosts: 运行 Elasticsearch 客户端的主机列表。
  • custom_mapping: 索引的自定义映射。如果未提供,则使用默认映射。
  • index: Elasticsearch 中的索引名称。
  • api_key: 一个 Secret 对象,包含用于身份验证的 API 密钥,或者包含用“:”分隔的 secret 和 id 的 base64 编码字符串,用于向 Elasticsearch 进行身份验证。
  • api_key_id: 一个 Secret 对象,包含用于向 Elasticsearch 进行身份验证的 API 密钥 ID。
  • embedding_similarity_function: 用于比较文档嵌入的相似度函数。此参数仅在索引尚不存在且正在创建时生效。要选择最合适的函数,请查找有关您的嵌入模型的信息。要了解文档分数是如何计算的,请参阅 Elasticsearch文档
  • **kwargs: 可选参数,Elasticsearch 接受。

ElasticsearchDocumentStore.client

@property
def client() -> Elasticsearch

返回同步 Elasticsearch 客户端,并在必要时进行初始化。

ElasticsearchDocumentStore.async_client

@property
def async_client() -> AsyncElasticsearch

返回异步 Elasticsearch 客户端,并在必要时进行初始化。

ElasticsearchDocumentStore.to_dict

def to_dict() -> Dict[str, Any]

将组件序列化为字典。

返回值:

包含序列化数据的字典。

ElasticsearchDocumentStore.from_dict

@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "ElasticsearchDocumentStore"

从字典反序列化组件。

参数:

  • data: 要反序列化的字典。

返回值:

反序列化后的组件。

ElasticsearchDocumentStore.count_documents

def count_documents() -> int

返回文档存储中存在的文档数量。

返回值:

文档存储中的文档数量。

ElasticsearchDocumentStore.count_documents_async

async def count_documents_async() -> int

异步返回文档存储中存在的文档数量。

返回值:

文档存储中的文档数量。

ElasticsearchDocumentStore.filter_documents

def filter_documents(
        filters: Optional[Dict[str, Any]] = None) -> List[Document]

文档存储的主要查询方法。它检索所有匹配过滤器的文档。

参数:

  • filters: 要应用的过滤器字典。有关过滤器结构的更多信息,请参阅官方 Elasticsearch文档

返回值:

匹配过滤器的Document列表。Document列表。

ElasticsearchDocumentStore.filter_documents_async

async def filter_documents_async(
        filters: Optional[Dict[str, Any]] = None) -> List[Document]

异步检索所有匹配过滤器的文档。

参数:

  • filters: 要应用的过滤器字典。有关过滤器结构的更多信息,请参阅官方 Elasticsearch文档

返回值:

匹配过滤器的Document列表。Document列表。

ElasticsearchDocumentStore.write_documents

def write_documents(documents: List[Document],
                    policy: DuplicatePolicy = DuplicatePolicy.NONE) -> int

Documents 写入 Elasticsearch。

参数:

  • documents: 要写入文档存储的文档列表。
  • policy: 当文档存储中已存在具有相同 ID 的文档时应用的 DuplicatePolicy。

引发:

  • ValueError: 如果documents 不是Documents 的列表。Documents 的列表。
  • DuplicateDocumentError: 如果文档存储中已存在具有相同 ID 的文档,并且policy 设置为DuplicatePolicy.FAILDuplicatePolicy.NONE.
  • DocumentStoreError: 在将文档写入文档存储时发生错误。

返回值:

写入文档存储的文档数量。

ElasticsearchDocumentStore.write_documents_async

async def write_documents_async(
        documents: List[Document],
        policy: DuplicatePolicy = DuplicatePolicy.NONE) -> int

异步写入Documents 写入 Elasticsearch。

参数:

  • documents: 要写入文档存储的文档列表。
  • policy: 当文档存储中已存在具有相同 ID 的文档时应用的 DuplicatePolicy。

引发:

  • ValueError: 如果documents 不是Documents 的列表。Documents 的列表。
  • DuplicateDocumentError: 如果文档存储中已存在具有相同 ID 的文档,并且policy 设置为DuplicatePolicy.FAILDuplicatePolicy.NONE.
  • DocumentStoreError: 在将文档写入文档存储时发生错误。

返回值:

写入文档存储的文档数量。

ElasticsearchDocumentStore.delete_documents

def delete_documents(document_ids: List[str]) -> None

从文档存储中删除所有具有匹配 document_ids 的文档。

参数:

  • document_ids: 要删除的文档 ID

ElasticsearchDocumentStore.delete_documents_async

async def delete_documents_async(document_ids: List[str]) -> None

异步地从文档存储中删除所有具有匹配 document_ids 的文档。

参数:

  • document_ids: 要删除的文档 ID

ElasticsearchDocumentStore.delete_all_documents

def delete_all_documents(recreate_index: bool = False) -> None

删除文档存储中的所有文档。

一种快速清除文档存储中所有文档的方法,同时保留任何索引设置和映射。

参数:

  • recreate_index: 如果为 True,则将删除并重新创建索引,并使用原始映射和设置。如果为 False,则将使用delete_by_query API 删除所有文档。

ElasticsearchDocumentStore.delete_all_documents_async

async def delete_all_documents_async(recreate_index: bool = False) -> None

异步删除文档存储中的所有文档。

一种快速清除文档存储中所有文档的方法,同时保留任何索引设置和映射。

参数:

  • recreate_index: 如果为 True,则将删除并重新创建索引,并使用原始映射和设置。如果为 False,则将使用delete_by_query API 删除所有文档。