文档API参考📓 教程🧑‍🍳 食谱🤝 集成💜 Discord🎨 Studio
API 参考

MongoDB Atlas

Haystack 的 MongoDB Atlas 集成

模块 haystack_integrations.document_stores.mongodb_atlas.document_store

MongoDBAtlasDocumentStore

一个 MongoDBAtlasDocumentStore 实现,它使用易于部署、操作和扩展的 MongoDB Atlas 服务。

要连接到 MongoDB Atlas,您需要提供一个格式为"mongodb+srv://{mongo_atlas_username}:{mongo_atlas_password}@{mongo_atlas_host}/?{mongo_atlas_params_string}" 的连接字符串。.

此连接字符串可在 MongoDB Atlas Dashboard 上通过点击CONNECT 按钮,选择 Python 作为驱动程序,然后复制连接字符串来获取。连接字符串可以作为环境变量提供MONGO_CONNECTION_STRING 或直接作为参数传递给MongoDBAtlasDocumentStore 构造函数。

提供连接字符串后,您需要指定要使用的database_namecollection_name。最有可能您将通过 MongoDB Atlas Web UI 创建它们,但也可以通过 MongoDB Python 驱动程序来创建。创建数据库和集合超出了 MongoDBAtlasDocumentStore 的范围。此文档存储的主要目的是读取和写入文档到现有集合。

用户必须同时提供一个vector_search_index 用于向量搜索操作,以及一个full_text_search_index 用于全文搜索操作。该vector_search_index 支持选定的指标(例如,余弦、点积或欧氏距离),而full_text_search_index 可实现高效的文本搜索。这两个索引都可以通过 Atlas Web UI 创建。

有关 MongoDB Atlas 的更多详细信息,请参阅官方 MongoDB Atlas 文档

使用示例

from haystack_integrations.document_stores.mongodb_atlas import MongoDBAtlasDocumentStore

store = MongoDBAtlasDocumentStore(database_name="your_existing_db",
                                  collection_name="your_existing_collection",
                                  vector_search_index="your_existing_index",
                                  full_text_search_index="your_existing_index")
print(store.count_documents())

MongoDBAtlasDocumentStore.__init__

def __init__(*,
             mongo_connection_string: Secret = Secret.from_env_var(
                 "MONGO_CONNECTION_STRING"),
             database_name: str,
             collection_name: str,
             vector_search_index: str,
             full_text_search_index: str,
             embedding_field: str = "embedding",
             content_field: str = "content")

创建一个新的 MongoDBAtlasDocumentStore 实例。

参数:

  • mongo_connection_string: MongoDB Atlas 连接字符串,格式为"mongodb+srv://{mongo_atlas_username}:{mongo_atlas_password}@{mongo_atlas_host}/?{mongo_atlas_params_string}"。此连接字符串可在 MongoDB Atlas Dashboard 上通过点击CONNECT 按钮获取。此值将自动从环境变量 "MONGO_CONNECTION_STRING" 读取。
  • database_name: 要使用的数据库名称。
  • collection_name: 要使用的集合名称。要使用此文档存储进行嵌入检索,此集合需要在embedding 字段上设置向量搜索索引。
  • vector_search_index: 用于向量搜索操作的向量搜索索引的名称。在 Atlas Web UI 中创建 vector_search_index 并指定 MongoDBAtlasDocumentStore 的初始化参数。有关更多详细信息,请参阅 MongoDB Atlas 文档
  • full_text_search_index: 用于全文搜索操作的搜索索引的名称。在 Atlas Web UI 中创建 full_text_search_index 并指定 MongoDBAtlasDocumentStore 的初始化参数。有关更多详细信息,请参阅 MongoDB Atlas 文档
  • embedding_field: 包含文档嵌入的字段的名称。默认为 "embedding"。
  • content_field: 包含文档内容的字段的名称。默认为 "content"。此字段允许定义要将哪个字段加载到 Haystack Document 对象中作为内容。在与现有集合集成进行检索时,这可能特别有用。我们不建议在使用 Haystack 创建的集合时使用此参数。

引发:

  • ValueError: 如果集合名称包含无效字符。

MongoDBAtlasDocumentStore.__del__

def __del__() -> None

析构方法,在实例被销毁时关闭 MongoDB 连接。

MongoDBAtlasDocumentStore.to_dict

def to_dict() -> Dict[str, Any]

将组件序列化为字典。

返回值:

包含序列化数据的字典。

MongoDBAtlasDocumentStore.from_dict

@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "MongoDBAtlasDocumentStore"

从字典反序列化组件。

参数:

  • data: 要反序列化的字典。

返回值:

反序列化后的组件。

MongoDBAtlasDocumentStore.count_documents

def count_documents() -> int

返回文档存储中存在的文档数量。

返回值:

文档存储中的文档数量。

MongoDBAtlasDocumentStore.count_documents_async

async def count_documents_async() -> int

异步返回文档存储中存在的文档数量。

返回值:

文档存储中的文档数量。

MongoDBAtlasDocumentStore.filter_documents

def filter_documents(
        filters: Optional[Dict[str, Any]] = None) -> List[Document]

返回与提供的过滤器匹配的文档。

有关过滤器的详细规范,请参阅 Haystack 文档

参数:

  • filters: 要应用的过滤器。它只返回与过滤器匹配的文档。

返回值:

与给定过滤器匹配的文档列表。

MongoDBAtlasDocumentStore.filter_documents_async

async def filter_documents_async(
        filters: Optional[Dict[str, Any]] = None) -> List[Document]

异步返回与提供的过滤器匹配的文档。

有关过滤器的详细规范,请参阅 Haystack 文档

参数:

  • filters: 要应用的过滤器。它只返回与过滤器匹配的文档。

返回值:

与给定过滤器匹配的文档列表。

MongoDBAtlasDocumentStore.write_documents

def write_documents(documents: List[Document],
                    policy: DuplicatePolicy = DuplicatePolicy.NONE) -> int

将文档写入 MongoDB Atlas 集合。

参数:

  • documents: 要写入文档存储的文档列表。
  • policy: 写入文档时使用的重复策略。

引发:

  • DuplicateDocumentError: 如果文档 ID 相同,并且策略设置为 DuplicatePolicy.FAIL(或未指定)。
  • ValueError: 如果文档不是 Document 类型。

返回值:

写入文档存储的文档数量。

MongoDBAtlasDocumentStore.write_documents_async

async def write_documents_async(
        documents: List[Document],
        policy: DuplicatePolicy = DuplicatePolicy.NONE) -> int

将文档写入 MongoDB Atlas 集合。

参数:

  • documents: 要写入文档存储的文档列表。
  • policy: 写入文档时使用的重复策略。

引发:

  • DuplicateDocumentError: 如果文档 ID 相同,并且策略设置为 DuplicatePolicy.FAIL(或未指定)。
  • ValueError: 如果文档不是 Document 类型。

返回值:

写入文档存储的文档数量。

MongoDBAtlasDocumentStore.delete_documents

def delete_documents(document_ids: List[str]) -> None

从文档存储中删除所有具有匹配 document_ids 的文档。

参数:

  • document_ids: 要删除的文档 ID

MongoDBAtlasDocumentStore.delete_documents_async

async def delete_documents_async(document_ids: List[str]) -> None

异步从文档存储中删除所有具有匹配 document_ids 的文档。

参数:

  • document_ids: 要删除的文档 ID

模块 haystack_integrations.components.retrievers.mongodb_atlas.embedding_retriever

MongoDBAtlasEmbeddingRetriever

通过嵌入相似性从 MongoDBAtlasDocumentStore 检索文档。

相似性取决于 MongoDBAtlasDocumentStore 中使用的 vector_search_index 以及索引创建期间选择的指标(即余弦、点积或欧氏距离)。有关更多信息,请参阅 MongoDBAtlasDocumentStore。

使用示例

import numpy as np
from haystack_integrations.document_stores.mongodb_atlas import MongoDBAtlasDocumentStore
from haystack_integrations.components.retrievers.mongodb_atlas import MongoDBAtlasEmbeddingRetriever

store = MongoDBAtlasDocumentStore(database_name="haystack_integration_test",
                                  collection_name="test_embeddings_collection",
                                  vector_search_index="cosine_index",
                                  full_text_search_index="full_text_index")
retriever = MongoDBAtlasEmbeddingRetriever(document_store=store)

results = retriever.run(query_embedding=np.random.random(768).tolist())
print(results["documents"])

上面的示例从 MongoDBAtlasDocumentStore 中检索了与随机查询嵌入最相似的 10 个文档。请注意,query_embedding 的维度必须与 MongoDBAtlasDocumentStore 中存储的嵌入的维度匹配。

MongoDBAtlasEmbeddingRetriever.__init__

def __init__(*,
             document_store: MongoDBAtlasDocumentStore,
             filters: Optional[Dict[str, Any]] = None,
             top_k: int = 10,
             filter_policy: Union[str, FilterPolicy] = FilterPolicy.REPLACE)

创建 MongoDBAtlasDocumentStore 组件。

参数:

  • document_store: MongoDBAtlasDocumentStore 的一个实例。
  • filters: 应用于检索到的文档的过滤器。请确保过滤器中使用的字段包含在vector_search_index 的配置中。配置必须手动在 MongoDB Atlas 的 Web UI 中完成。
  • top_k: 要返回的最大文档数量。
  • filter_policy: 确定如何应用过滤器的策略。

引发:

  • ValueError: 如果document_store 不是一个实例MongoDBAtlasDocumentStore.

MongoDBAtlasEmbeddingRetriever.to_dict

def to_dict() -> Dict[str, Any]

将组件序列化为字典。

返回值:

包含序列化数据的字典。

MongoDBAtlasEmbeddingRetriever.from_dict

@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "MongoDBAtlasEmbeddingRetriever"

从字典反序列化组件。

参数:

  • data: 要反序列化的字典。

返回值:

反序列化后的组件。

MongoDBAtlasEmbeddingRetriever.run

@component.output_types(documents=List[Document])
def run(query_embedding: List[float],
        filters: Optional[Dict[str, Any]] = None,
        top_k: Optional[int] = None) -> Dict[str, List[Document]]

根据提供的嵌入相似性,从 MongoDBAtlasDocumentStore 检索文档。

参数:

  • query_embedding: 查询的嵌入。
  • filters: 应用于检索到的文档的过滤器。运行时过滤器的应用方式取决于初始化检索器时选择的filter_policy。有关更多详细信息,请参阅 init 方法文档字符串。
  • top_k: 要返回的最大文档数量。会覆盖初始化时指定的值。

返回值:

包含以下键的字典

  • documents: 与给定的query_embedding

最相似的文档列表

@component.output_types(documents=List[Document])
async def run_async(query_embedding: List[float],
                    filters: Optional[Dict[str, Any]] = None,
                    top_k: Optional[int] = None) -> Dict[str, List[Document]]

MongoDBAtlasEmbeddingRetriever.run_async

异步根据提供的嵌入相似性,从 MongoDBAtlasDocumentStore 检索文档。

参数:

  • query_embedding: 查询的嵌入。
  • filters: 应用于检索到的文档的过滤器。运行时过滤器的应用方式取决于初始化检索器时选择的filter_policy。有关更多详细信息,请参阅 init 方法文档字符串。
  • top_k: 要返回的最大文档数量。会覆盖初始化时指定的值。

返回值:

包含以下键的字典

  • documents: 与给定的query_embedding

模块 haystack_integrations.components.retrievers.mongodb_atlas.full_text_retriever

MongoDBAtlasFullTextRetriever

通过全文搜索从 MongoDBAtlasDocumentStore 检索文档。

全文搜索取决于 MongoDBAtlasDocumentStore 中使用的 full_text_search_index。有关更多信息,请参阅 MongoDBAtlasDocumentStore。

使用示例

from haystack_integrations.document_stores.mongodb_atlas import MongoDBAtlasDocumentStore
from haystack_integrations.components.retrievers.mongodb_atlas import MongoDBAtlasFullTextRetriever

store = MongoDBAtlasDocumentStore(database_name="your_existing_db",
                                  collection_name="your_existing_collection",
                                  vector_search_index="your_existing_index",
                                  full_text_search_index="your_existing_index")
retriever = MongoDBAtlasFullTextRetriever(document_store=store)

results = retriever.run(query="Lorem ipsum")
print(results["documents"])

上面的示例从 MongoDBAtlasDocumentStore 中检索了与查询 "Lorem ipsum" 最相似的 10 个文档。

MongoDBAtlasFullTextRetriever.__init__

def __init__(*,
             document_store: MongoDBAtlasDocumentStore,
             filters: Optional[Dict[str, Any]] = None,
             top_k: int = 10,
             filter_policy: Union[str, FilterPolicy] = FilterPolicy.REPLACE)

参数:

  • document_store: MongoDBAtlasDocumentStore 的一个实例。
  • filters: 应用于检索到的文档的过滤器。请确保过滤器中使用的字段包含在full_text_search_index。配置必须手动在 MongoDB Atlas 的 Web UI 中完成。
  • top_k: 要返回的最大文档数量。
  • filter_policy: 确定如何应用过滤器的策略。

引发:

  • ValueError: 如果document_store 不是 MongoDBAtlasDocumentStore 的实例。

MongoDBAtlasFullTextRetriever.to_dict

def to_dict() -> Dict[str, Any]

将组件序列化为字典。

返回值:

包含序列化数据的字典。

MongoDBAtlasFullTextRetriever.from_dict

@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "MongoDBAtlasFullTextRetriever"

从字典反序列化组件。

参数:

  • data: 要反序列化的字典。

返回值:

反序列化后的组件。

MongoDBAtlasFullTextRetriever.run

@component.output_types(documents=List[Document])
def run(query: Union[str, List[str]],
        fuzzy: Optional[Dict[str, int]] = None,
        match_criteria: Optional[Literal["any", "all"]] = None,
        score: Optional[Dict[str, Dict]] = None,
        synonyms: Optional[str] = None,
        filters: Optional[Dict[str, Any]] = None,
        top_k: int = 10) -> Dict[str, List[Document]]

通过全文搜索从 MongoDBAtlasDocumentStore 检索文档。

参数:

  • query: 要搜索的查询字符串或查询字符串列表。如果查询包含多个词,Atlas Search 会单独评估每个词是否匹配。
  • fuzzy: 启用查找与搜索词相似的字符串。注意,fuzzy 不能与synonyms 一起使用。可配置选项包括maxEdits, prefixLengthmaxExpansions。有关更多详细信息,请参阅 MongoDB Atlas 文档
  • match_criteria: 定义查询中的词如何匹配。支持的选项是"any""all"。有关更多详细信息,请参阅 MongoDB Atlas 文档
  • score: 指定匹配结果的评分方法。支持的选项包括boost, constantfunction。有关更多详细信息,请参阅 MongoDB Atlas 文档
  • synonyms: 索引中同义词映射定义的名称。此值不能是空字符串。注意,synonyms 不能与fuzzy.
  • filters: 应用于检索到的文档的过滤器。运行时过滤器的应用方式取决于初始化检索器时选择的filter_policy。有关更多详细信息,请参阅 init 方法文档字符串。
  • top_k: 要返回的最大文档数量。会覆盖初始化时指定的值。

返回值:

包含以下键的字典

  • documents: 与给定的query

MongoDBAtlasFullTextRetriever.run_async

@component.output_types(documents=List[Document])
async def run_async(query: Union[str, List[str]],
                    fuzzy: Optional[Dict[str, int]] = None,
                    match_criteria: Optional[Literal["any", "all"]] = None,
                    score: Optional[Dict[str, Dict]] = None,
                    synonyms: Optional[str] = None,
                    filters: Optional[Dict[str, Any]] = None,
                    top_k: int = 10) -> Dict[str, List[Document]]

异步通过全文搜索从 MongoDBAtlasDocumentStore 检索文档。

参数:

  • query: 要搜索的查询字符串或查询字符串列表。如果查询包含多个词,Atlas Search 会单独评估每个词是否匹配。
  • fuzzy: 启用查找与搜索词相似的字符串。注意,fuzzy 不能与synonyms 一起使用。可配置选项包括maxEdits, prefixLengthmaxExpansions。有关更多详细信息,请参阅 MongoDB Atlas 文档
  • match_criteria: 定义查询中的词如何匹配。支持的选项是"any""all"。有关更多详细信息,请参阅 MongoDB Atlas 文档
  • score: 指定匹配结果的评分方法。支持的选项包括boost, constantfunction。有关更多详细信息,请参阅 MongoDB Atlas 文档
  • synonyms: 索引中同义词映射定义的名称。此值不能是空字符串。注意,synonyms 不能与fuzzy.
  • filters: 应用于检索到的文档的过滤器。运行时过滤器的应用方式取决于初始化检索器时选择的filter_policy。有关更多详细信息,请参阅 init 方法文档字符串。
  • top_k: 要返回的最大文档数量。会覆盖初始化时指定的值。

返回值:

包含以下键的字典

  • documents: 与给定的query