文档API参考📓 教程🧑‍🍳 食谱🤝 集成💜 Discord🎨 Studio
文档

S3Downloader

S3Downloader将文件从 AWS S3 存储桶下载到本地文件系统,并用本地文件路径丰富文档。

pipeline 中的最常见位置在需要本地文件路径的文件转换器或路由器之前
必需的初始化变量"file_root_path": 文件将被下载到的路径。可以使用以下方式设置FILE_ROOT_PATH 环境变量。

"aws_access_key_id": AWS 访问密钥 ID。可以使用 AWS_ACCESS_KEY_ID 环境变量设置。

"aws_secret_access_key": AWS 密钥访问密钥。可以使用 AWS_SECRET_ACCESS_KEY 环境变量设置。

"aws_region_name": AWS 区域名称。可以使用 AWS_DEFAULT_REGION 环境变量设置。
强制运行变量"documents": 包含要下载文件的名称(在元数据中)的文档列表。
输出变量"documents": 一份文档列表,其中包含本地文件路径,位于meta['file_path']
API 参考S3Downloader
GitHub 链接https://github.com/deepset-ai/haystack-core-integrations/tree/main/integrations/amazon_bedrock

概述

S3Downloader 将文件从 AWS S3 存储桶下载到您的本地文件系统,并用本地文件路径丰富 Document 对象。此组件对于需要处理存储在 S3 中的文件的管道非常有用,例如 PDF、图像或文本文件。

该组件默认通过环境变量支持 AWS 身份验证。您可以设置AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY,以及AWS_DEFAULT_REGION 环境变量。或者,您可以使用 Secret API 在初始化时直接传递凭据。

from haystack.utils import Secret
from haystack_integrations.components.downloaders.s3 import S3Downloader

downloader = S3Downloader(
    aws_access_key_id=Secret.from_token("<your-access-key-id>"),
    aws_secret_access_key=Secret.from_token("<your-secret-access-key>"),
    aws_region_name=Secret.from_token("<your-region>"),
    file_root_path="/path/to/download/directory"
)

该组件使用max_workers 参数(默认为 32 个工作进程)并行下载多个文件,以加快处理大量文档的速度。下载的文件会在本地缓存,当缓存超过max_cache_size(默认为 100 个文件)时,最近最少访问的文件会自动删除。已下载的文件会被触碰以更新其访问时间,而无需重新下载。

📘

必需的配置

该组件需要两个关键配置:

  1. file_root_path 参数或FILE_ROOT_PATH 环境变量:指定文件将被下载的位置。调用warm_up() 时,如果不存在,此目录将被创建。
  2. S3_DOWNLOADER_BUCKET 环境变量:指定要从中下载文件的 S3 存储桶。

可选的环境变量S3_DOWNLOADER_PREFIX 可以设置,以将文件的前缀添加到所有生成的 S3 键中。

文件扩展名过滤

您可以使用file_extensions 参数,用于仅下载特定类型的文件,从而减少不必要的下载和处理时间。例如,file_extensions=[".pdf", ".txt"] 只下载 PDF 和 TXT 文件,而跳过其他文件。

自定义 S3 键生成

默认情况下,该组件使用 Document 元数据中的file_name 作为 S3 键。如果您的 S3 文件结构与元数据中的文件名不匹配,您可以提供一个可选的s3_key_generation_function 来自定义如何从 Document 元数据生成 S3 键。

用法

您需要安装amazon-bedrock-haystack 包以使用S3Downloader:

pip install amazon-bedrock-haystack

单独使用

在运行示例之前,请确保已设置必需的环境变量。

export AWS_ACCESS_KEY_ID="<your-access-key-id>"
export AWS_SECRET_ACCESS_KEY="<your-secret-access-key>"
export AWS_DEFAULT_REGION="<your-region>"
export S3_DOWNLOADER_BUCKET="<your-bucket-name>"

以下是如何使用S3Downloader 从 S3 下载文件。

from haystack.dataclasses import Document
from haystack_integrations.components.downloaders.s3 import S3Downloader

# Create documents with file names in metadata
documents = [
    Document(meta={"file_name": "report.pdf"}),
    Document(meta={"file_name": "data.txt"}),
]

# Initialize the downloader
downloader = S3Downloader(file_root_path="/tmp/s3_downloads")

# Warm up the component
downloader.warm_up()

# Download the files
result = downloader.run(documents=documents)

# Access the downloaded files
for doc in result["documents"]:
    print(f"File downloaded to: {doc.meta['file_path']}")

使用文件扩展名过滤

from haystack.dataclasses import Document
from haystack_integrations.components.downloaders.s3 import S3Downloader

documents = [
    Document(meta={"file_name": "report.pdf"}),
    Document(meta={"file_name": "image.png"}),
    Document(meta={"file_name": "data.txt"}),
]

# Only download PDF files
downloader = S3Downloader(
    file_root_path="/tmp/s3_downloads",
    file_extensions=[".pdf"]
)

downloader.warm_up()

result = downloader.run(documents=documents)

# Only report.pdf is downloaded
print(f"Downloaded {len(result['documents'])} file(s)")
# Output: Downloaded 1 file(s)

使用自定义 S3 键生成

from haystack.dataclasses import Document
from haystack_integrations.components.downloaders.s3 import S3Downloader

def custom_s3_key_function(document: Document) -> str:
    """Generate S3 key from custom metadata."""
    folder = document.meta.get("folder", "default")
    file_name = document.meta.get("file_name")
    if not file_name:
        raise ValueError("Document must have 'file_name' in metadata")
    return f"{folder}/{file_name}"

documents = [
    Document(meta={"file_name": "report.pdf", "folder": "reports/2025"}),
]

downloader = S3Downloader(
    file_root_path="/tmp/s3_downloads",
    s3_key_generation_function=custom_s3_key_function
)

downloader.warm_up()
result = downloader.run(documents=documents)

在 pipeline 中

以下是使用S3Downloader 在文档处理管道中的示例。

from haystack import Pipeline
from haystack.components.converters import PDFMinerToDocument
from haystack.components.routers import DocumentTypeRouter
from haystack.dataclasses import Document

from haystack_integrations.components.downloaders.s3 import S3Downloader

# Create a pipeline
pipe = Pipeline()

# Add S3Downloader to download files from S3
pipe.add_component(
    "downloader", 
    S3Downloader(
        file_root_path="/tmp/s3_downloads",
        file_extensions=[".pdf", ".txt"]
    )
)

# Route documents by file type
pipe.add_component(
    "router", 
    DocumentTypeRouter(
        file_path_meta_field="file_path",
        mime_types=["application/pdf", "text/plain"]
    )
)

# Convert PDFs to documents
pipe.add_component("pdf_converter", PDFMinerToDocument())

# Connect components
pipe.connect("downloader.documents", "router.documents")
pipe.connect("router.application/pdf", "pdf_converter.documents")

# Create documents with S3 file names
documents = [
    Document(meta={"file_name": "report.pdf"}),
    Document(meta={"file_name": "summary.txt"}),
]

# Run the pipeline
result = pipe.run({"downloader": {"documents": documents}})

更复杂的示例,包括图像处理和 LLM

from haystack import Pipeline
from haystack.components.builders import ChatPromptBuilder
from haystack.components.converters.image import DocumentToImageContent
from haystack.components.routers import DocumentTypeRouter
from haystack.dataclasses import Document

from haystack_integrations.components.downloaders.s3 import S3Downloader
from haystack_integrations.components.generators.amazon_bedrock import AmazonBedrockChatGenerator

# Create documents with file names
documents = [
    Document(meta={"file_name": "chart.png"}),
    Document(meta={"file_name": "report.pdf"}),
]

# Create pipeline
pipe = Pipeline()

# Download files from S3
pipe.add_component(
    "downloader",
    S3Downloader(file_root_path="/tmp/s3_downloads")
)

# Route by document type
pipe.add_component(
    "router",
    DocumentTypeRouter(
        file_path_meta_field="file_path",
        mime_types=["image/png", "application/pdf"]
    )
)

# Convert images for LLM
pipe.add_component("image_converter", DocumentToImageContent(detail="auto"))

# Create chat prompt with template
template = """{% message role="user" %}
Answer the question based on the provided images.

Question: {{ question }}

{% for image in image_contents %}
{{ image | templatize_part }}
{% endfor %}
{% endmessage %}"""

pipe.add_component(
    "prompt_builder",
    ChatPromptBuilder(template=template)
)

# Generate response
pipe.add_component(
    "llm",
    AmazonBedrockChatGenerator(model="anthropic.claude-3-haiku-20240307-v1:0")
)

# Connect components
pipe.connect("downloader.documents", "router.documents")
pipe.connect("router.image/png", "image_converter.documents")
pipe.connect("image_converter.image_contents", "prompt_builder.image_contents")
pipe.connect("prompt_builder.prompt", "llm.messages")

# Run pipeline
result = pipe.run({
    "downloader": {"documents": documents},
    "prompt_builder": {"question": "What information is shown in the chart?"}
})