Haystack 的 Elasticsearch 集成
模块 haystack_integrations.components.retrievers.elasticsearch.bm25_retriever
ElasticsearchBM25Retriever
ElasticsearchBM25Retriever 使用 BM25 算法从 ElasticsearchDocumentStore 中检索文档,以查找与用户查询最相似的文档。
此检索器仅与 ElasticsearchDocumentStore 兼容。
使用示例
from haystack import Document
from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore
from haystack_integrations.components.retrievers.elasticsearch import ElasticsearchBM25Retriever
document_store = ElasticsearchDocumentStore(hosts="https://:9200")
retriever = ElasticsearchBM25Retriever(document_store=document_store)
# Add documents to DocumentStore
documents = [
Document(text="My name is Carla and I live in Berlin"),
Document(text="My name is Paul and I live in New York"),
Document(text="My name is Silvano and I live in Matera"),
Document(text="My name is Usagi Tsukino and I live in Tokyo"),
]
document_store.write_documents(documents)
result = retriever.run(query="Who lives in Berlin?")
for doc in result["documents"]:
print(doc.content)
ElasticsearchBM25Retriever.__init__
def __init__(*,
document_store: ElasticsearchDocumentStore,
filters: Optional[Dict[str, Any]] = None,
fuzziness: str = "AUTO",
top_k: int = 10,
scale_score: bool = False,
filter_policy: Union[str, FilterPolicy] = FilterPolicy.REPLACE)
使用 ElasticsearchDocumentStore 实例初始化 ElasticsearchBM25Retriever。
参数:
document_store: ElasticsearchDocumentStore 的一个实例。filters: 应用于检索到的文档的过滤器,有关更多信息,请参阅ElasticsearchDocumentStore.filter_documents.fuzziness: 传递给 Elasticsearch 的模糊匹配参数。有关更多详细信息,请参阅官方文档。top_k: 返回的最大文档数。scale_score: 如果True将文档的分数缩放到 0 和 1 之间。filter_policy: 确定如何应用过滤器的策略。
引发:
ValueError: 如果document_store不是一个实例ElasticsearchDocumentStore.
ElasticsearchBM25Retriever.to_dict
def to_dict() -> Dict[str, Any]
将组件序列化为字典。
返回值:
包含序列化数据的字典。
ElasticsearchBM25Retriever.from_dict
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "ElasticsearchBM25Retriever"
从字典反序列化组件。
参数:
data: 要反序列化的字典。
返回值:
反序列化后的组件。
ElasticsearchBM25Retriever.run
@component.output_types(documents=List[Document])
def run(query: str,
filters: Optional[Dict[str, Any]] = None,
top_k: Optional[int] = None) -> Dict[str, List[Document]]
使用基于 BM25 关键字的算法检索文档。
参数:
query: 要在Documents 文本中搜索的字符串。Documents 文本。filters: 应用于检索到的文档的过滤器。运行时过滤器的应用方式取决于初始化检索器时选择的filter_policy。有关更多详细信息,请参阅 init 方法文档字符串。top_k: 返回的最大Document数。Document数。
返回值:
包含以下键的字典
documents: 匹配查询的Document列表。Document列表。
ElasticsearchBM25Retriever.run_async
@component.output_types(documents=List[Document])
async def run_async(query: str,
filters: Optional[Dict[str, Any]] = None,
top_k: Optional[int] = None) -> Dict[str, List[Document]]
使用基于 BM25 关键字的算法异步检索文档。
参数:
query: 要在Documents 文本中搜索的字符串。Document文本。filters: 应用于检索到的文档的过滤器。运行时过滤器的应用方式取决于初始化检索器时选择的filter_policy。有关更多详细信息,请参阅 init 方法文档字符串。top_k: 返回的最大Document数。Document数。
返回值:
包含以下键的字典
documents: 匹配查询的Document列表。Document列表。
模块 haystack_integrations.components.retrievers.elasticsearch.embedding_retriever
ElasticsearchEmbeddingRetriever
ElasticsearchEmbeddingRetriever 使用向量相似性从 ElasticsearchDocumentStore 中检索文档。
使用示例
from haystack import Document
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore
from haystack_integrations.components.retrievers.elasticsearch import ElasticsearchEmbeddingRetriever
document_store = ElasticsearchDocumentStore(hosts="https://:9200")
retriever = ElasticsearchEmbeddingRetriever(document_store=document_store)
# Add documents to DocumentStore
documents = [
Document(text="My name is Carla and I live in Berlin"),
Document(text="My name is Paul and I live in New York"),
Document(text="My name is Silvano and I live in Matera"),
Document(text="My name is Usagi Tsukino and I live in Tokyo"),
]
document_store.write_documents(documents)
te = SentenceTransformersTextEmbedder()
te.warm_up()
query_embeddings = te.run("Who lives in Berlin?")["embedding"]
result = retriever.run(query=query_embeddings)
for doc in result["documents"]:
print(doc.content)
ElasticsearchEmbeddingRetriever.__init__
def __init__(*,
document_store: ElasticsearchDocumentStore,
filters: Optional[Dict[str, Any]] = None,
top_k: int = 10,
num_candidates: Optional[int] = None,
filter_policy: Union[str, FilterPolicy] = FilterPolicy.REPLACE)
创建 ElasticsearchEmbeddingRetriever 组件。
参数:
document_store: ElasticsearchDocumentStore 的一个实例。filters: 应用于检索到的文档的过滤器。在近似 KNN 搜索期间应用过滤器,以确保返回 top_k 个匹配的文档。top_k: 返回的最大文档数。num_candidates: 每个分片上的近似最近邻候选数量。默认为 top_k * 10。增加此值将提高搜索精度,但会降低搜索速度。您可以在 Elasticsearch文档中阅读更多相关信息。filter_policy: 确定如何应用过滤器的策略。
引发:
ValueError: 如果document_store不是 ElasticsearchDocumentStore 的实例。
ElasticsearchEmbeddingRetriever.to_dict
def to_dict() -> Dict[str, Any]
将组件序列化为字典。
返回值:
包含序列化数据的字典。
ElasticsearchEmbeddingRetriever.from_dict
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "ElasticsearchEmbeddingRetriever"
从字典反序列化组件。
参数:
data: 要反序列化的字典。
返回值:
反序列化后的组件。
ElasticsearchEmbeddingRetriever.run
@component.output_types(documents=List[Document])
def run(query_embedding: List[float],
filters: Optional[Dict[str, Any]] = None,
top_k: Optional[int] = None) -> Dict[str, List[Document]]
使用向量相似性度量检索文档。
参数:
query_embedding: 查询的嵌入。filters: 在从 Document Store 获取文档时应用的过滤器。在近似 kNN 搜索期间应用过滤器,以确保 Retriever 返回top_k个匹配文档。运行时过滤器的应用方式取决于初始化 Retriever 时选择的filter_policy。top_k: 要返回的最大文档数。
返回值:
包含以下键的字典
documents: 匹配查询的Document列表。Documents 与给定query_embedding
ElasticsearchEmbeddingRetriever.run_async
@component.output_types(documents=List[Document])
async def run_async(query_embedding: List[float],
filters: Optional[Dict[str, Any]] = None,
top_k: Optional[int] = None) -> Dict[str, List[Document]]
使用向量相似性度量异步检索文档。
参数:
query_embedding: 查询的嵌入。filters: 在从 Document Store 获取文档时应用的过滤器。在近似 kNN 搜索期间应用过滤器,以确保 Retriever 返回top_k个匹配文档。运行时过滤器的应用方式取决于初始化 Retriever 时选择的filter_policy。top_k: 要返回的最大文档数。
返回值:
包含以下键的字典
documents: 匹配查询的Document列表。Document列表。
模块 haystack_integrations.document_stores.elasticsearch.document_store
ElasticsearchDocumentStore
一个 ElasticsearchDocumentStore 实例,可与 Elastic Cloud 或您自己的 Elasticsearch 集群配合使用。
用法示例(Elastic Cloud)
from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore
document_store = ElasticsearchDocumentStore(
api_key_id=Secret.from_env_var("ELASTIC_API_KEY_ID", strict=False),
api_key=Secret.from_env_var("ELASTIC_API_KEY", strict=False),
)
用法示例(自托管 Elasticsearch 实例)
from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore
document_store = ElasticsearchDocumentStore(hosts="https://:9200")
在上面的示例中,我们禁用了安全连接,仅为展示基本用法。我们强烈建议启用安全功能,以确保只有授权用户才能访问您的数据。
有关如何连接到 Elasticsearch 和配置安全性的更多详细信息,请参阅官方 Elasticsearch文档。
所有额外的关键字参数都将传递给 Elasticsearch 客户端。
ElasticsearchDocumentStore.__init__
def __init__(
*,
hosts: Optional[Hosts] = None,
custom_mapping: Optional[Dict[str, Any]] = None,
index: str = "default",
api_key: Secret = Secret.from_env_var("ELASTIC_API_KEY", strict=False),
api_key_id: Secret = Secret.from_env_var("ELASTIC_API_KEY_ID",
strict=False),
embedding_similarity_function: Literal["cosine", "dot_product",
"l2_norm",
"max_inner_product"] = "cosine",
**kwargs: Any)
创建一个新的 ElasticsearchDocumentStore 实例。
它还会尝试创建索引(如果尚不存在)。否则,将使用现有索引。
还可以设置用于比较文档嵌入的相似度函数。这在使用 Pipeline 中的ElasticsearchDocumentStore 和ElasticsearchEmbeddingRetriever.
有关连接参数的更多信息,请参阅官方 Elasticsearch文档。
有关支持的 kwargs 的完整列表,请参阅官方 Elasticsearch参考。
身份验证通过 Secret 对象提供,默认从环境变量加载。您可以同时提供api_key_id 和api_key,或者只提供api_key,其中包含 base64 编码的id:secret 字符串。Secret 实例也可以使用Secret.from_token() 方法从令牌加载。
参数:
hosts: 运行 Elasticsearch 客户端的主机列表。custom_mapping: 索引的自定义映射。如果未提供,则使用默认映射。index: Elasticsearch 中的索引名称。api_key: 一个 Secret 对象,包含用于身份验证的 API 密钥,或者包含用“:”分隔的 secret 和 id 的 base64 编码字符串,用于向 Elasticsearch 进行身份验证。api_key_id: 一个 Secret 对象,包含用于向 Elasticsearch 进行身份验证的 API 密钥 ID。embedding_similarity_function: 用于比较文档嵌入的相似度函数。此参数仅在索引尚不存在且正在创建时生效。要选择最合适的函数,请查找有关您的嵌入模型的信息。要了解文档分数是如何计算的,请参阅 Elasticsearch文档。**kwargs: 可选参数,Elasticsearch接受。
ElasticsearchDocumentStore.client
@property
def client() -> Elasticsearch
返回同步 Elasticsearch 客户端,并在必要时进行初始化。
ElasticsearchDocumentStore.async_client
@property
def async_client() -> AsyncElasticsearch
返回异步 Elasticsearch 客户端,并在必要时进行初始化。
ElasticsearchDocumentStore.to_dict
def to_dict() -> Dict[str, Any]
将组件序列化为字典。
返回值:
包含序列化数据的字典。
ElasticsearchDocumentStore.from_dict
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "ElasticsearchDocumentStore"
从字典反序列化组件。
参数:
data: 要反序列化的字典。
返回值:
反序列化后的组件。
ElasticsearchDocumentStore.count_documents
def count_documents() -> int
返回文档存储中存在的文档数量。
返回值:
文档存储中的文档数量。
ElasticsearchDocumentStore.count_documents_async
async def count_documents_async() -> int
异步返回文档存储中存在的文档数量。
返回值:
文档存储中的文档数量。
ElasticsearchDocumentStore.filter_documents
def filter_documents(
filters: Optional[Dict[str, Any]] = None) -> List[Document]
文档存储的主要查询方法。它检索所有匹配过滤器的文档。
参数:
filters: 要应用的过滤器字典。有关过滤器结构的更多信息,请参阅官方 Elasticsearch文档。
返回值:
匹配过滤器的Document列表。Document列表。
ElasticsearchDocumentStore.filter_documents_async
async def filter_documents_async(
filters: Optional[Dict[str, Any]] = None) -> List[Document]
异步检索所有匹配过滤器的文档。
参数:
filters: 要应用的过滤器字典。有关过滤器结构的更多信息,请参阅官方 Elasticsearch文档。
返回值:
匹配过滤器的Document列表。Document列表。
ElasticsearchDocumentStore.write_documents
def write_documents(documents: List[Document],
policy: DuplicatePolicy = DuplicatePolicy.NONE) -> int
将Documents 写入 Elasticsearch。
参数:
documents: 要写入文档存储的文档列表。policy: 当文档存储中已存在具有相同 ID 的文档时应用的 DuplicatePolicy。
引发:
ValueError: 如果documents不是Documents 的列表。Documents 的列表。DuplicateDocumentError: 如果文档存储中已存在具有相同 ID 的文档,并且policy设置为DuplicatePolicy.FAIL或DuplicatePolicy.NONE.DocumentStoreError: 在将文档写入文档存储时发生错误。
返回值:
写入文档存储的文档数量。
ElasticsearchDocumentStore.write_documents_async
async def write_documents_async(
documents: List[Document],
policy: DuplicatePolicy = DuplicatePolicy.NONE) -> int
异步写入Documents 写入 Elasticsearch。
参数:
documents: 要写入文档存储的文档列表。policy: 当文档存储中已存在具有相同 ID 的文档时应用的 DuplicatePolicy。
引发:
ValueError: 如果documents不是Documents 的列表。Documents 的列表。DuplicateDocumentError: 如果文档存储中已存在具有相同 ID 的文档,并且policy设置为DuplicatePolicy.FAIL或DuplicatePolicy.NONE.DocumentStoreError: 在将文档写入文档存储时发生错误。
返回值:
写入文档存储的文档数量。
ElasticsearchDocumentStore.delete_documents
def delete_documents(document_ids: List[str]) -> None
从文档存储中删除所有具有匹配 document_ids 的文档。
参数:
document_ids: 要删除的文档 ID
ElasticsearchDocumentStore.delete_documents_async
async def delete_documents_async(document_ids: List[str]) -> None
异步地从文档存储中删除所有具有匹配 document_ids 的文档。
参数:
document_ids: 要删除的文档 ID
ElasticsearchDocumentStore.delete_all_documents
def delete_all_documents(recreate_index: bool = False) -> None
删除文档存储中的所有文档。
一种快速清除文档存储中所有文档的方法,同时保留任何索引设置和映射。
参数:
recreate_index: 如果为 True,则将删除并重新创建索引,并使用原始映射和设置。如果为 False,则将使用delete_by_queryAPI 删除所有文档。
ElasticsearchDocumentStore.delete_all_documents_async
async def delete_all_documents_async(recreate_index: bool = False) -> None
异步删除文档存储中的所有文档。
一种快速清除文档存储中所有文档的方法,同时保留任何索引设置和映射。
参数:
recreate_index: 如果为 True,则将删除并重新创建索引,并使用原始映射和设置。如果为 False,则将使用delete_by_queryAPI 删除所有文档。
