S3Downloader
S3Downloader将文件从 AWS S3 存储桶下载到本地文件系统,并用本地文件路径丰富文档。
| pipeline 中的最常见位置 | 在需要本地文件路径的文件转换器或路由器之前 |
| 必需的初始化变量 | "file_root_path": 文件将被下载到的路径。可以使用以下方式设置FILE_ROOT_PATH 环境变量。"aws_access_key_id": AWS 访问密钥 ID。可以使用 AWS_ACCESS_KEY_ID 环境变量设置。 "aws_secret_access_key": AWS 密钥访问密钥。可以使用 AWS_SECRET_ACCESS_KEY 环境变量设置。 "aws_region_name": AWS 区域名称。可以使用 AWS_DEFAULT_REGION 环境变量设置。 |
| 强制运行变量 | "documents": 包含要下载文件的名称(在元数据中)的文档列表。 |
| 输出变量 | "documents": 一份文档列表,其中包含本地文件路径,位于meta['file_path'] |
| API 参考 | S3Downloader |
| GitHub 链接 | https://github.com/deepset-ai/haystack-core-integrations/tree/main/integrations/amazon_bedrock |
概述
S3Downloader 将文件从 AWS S3 存储桶下载到您的本地文件系统,并用本地文件路径丰富 Document 对象。此组件对于需要处理存储在 S3 中的文件的管道非常有用,例如 PDF、图像或文本文件。
该组件默认通过环境变量支持 AWS 身份验证。您可以设置AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY,以及AWS_DEFAULT_REGION 环境变量。或者,您可以使用 Secret API 在初始化时直接传递凭据。
from haystack.utils import Secret
from haystack_integrations.components.downloaders.s3 import S3Downloader
downloader = S3Downloader(
aws_access_key_id=Secret.from_token("<your-access-key-id>"),
aws_secret_access_key=Secret.from_token("<your-secret-access-key>"),
aws_region_name=Secret.from_token("<your-region>"),
file_root_path="/path/to/download/directory"
)
该组件使用max_workers 参数(默认为 32 个工作进程)并行下载多个文件,以加快处理大量文档的速度。下载的文件会在本地缓存,当缓存超过max_cache_size(默认为 100 个文件)时,最近最少访问的文件会自动删除。已下载的文件会被触碰以更新其访问时间,而无需重新下载。
必需的配置
该组件需要两个关键配置:
file_root_path参数或FILE_ROOT_PATH环境变量:指定文件将被下载的位置。调用warm_up()时,如果不存在,此目录将被创建。S3_DOWNLOADER_BUCKET环境变量:指定要从中下载文件的 S3 存储桶。
可选的环境变量S3_DOWNLOADER_PREFIX 可以设置,以将文件的前缀添加到所有生成的 S3 键中。
文件扩展名过滤
您可以使用file_extensions 参数,用于仅下载特定类型的文件,从而减少不必要的下载和处理时间。例如,file_extensions=[".pdf", ".txt"] 只下载 PDF 和 TXT 文件,而跳过其他文件。
自定义 S3 键生成
默认情况下,该组件使用 Document 元数据中的file_name 作为 S3 键。如果您的 S3 文件结构与元数据中的文件名不匹配,您可以提供一个可选的s3_key_generation_function 来自定义如何从 Document 元数据生成 S3 键。
用法
您需要安装amazon-bedrock-haystack 包以使用S3Downloader:
pip install amazon-bedrock-haystack
单独使用
在运行示例之前,请确保已设置必需的环境变量。
export AWS_ACCESS_KEY_ID="<your-access-key-id>"
export AWS_SECRET_ACCESS_KEY="<your-secret-access-key>"
export AWS_DEFAULT_REGION="<your-region>"
export S3_DOWNLOADER_BUCKET="<your-bucket-name>"
以下是如何使用S3Downloader 从 S3 下载文件。
from haystack.dataclasses import Document
from haystack_integrations.components.downloaders.s3 import S3Downloader
# Create documents with file names in metadata
documents = [
Document(meta={"file_name": "report.pdf"}),
Document(meta={"file_name": "data.txt"}),
]
# Initialize the downloader
downloader = S3Downloader(file_root_path="/tmp/s3_downloads")
# Warm up the component
downloader.warm_up()
# Download the files
result = downloader.run(documents=documents)
# Access the downloaded files
for doc in result["documents"]:
print(f"File downloaded to: {doc.meta['file_path']}")
使用文件扩展名过滤
from haystack.dataclasses import Document
from haystack_integrations.components.downloaders.s3 import S3Downloader
documents = [
Document(meta={"file_name": "report.pdf"}),
Document(meta={"file_name": "image.png"}),
Document(meta={"file_name": "data.txt"}),
]
# Only download PDF files
downloader = S3Downloader(
file_root_path="/tmp/s3_downloads",
file_extensions=[".pdf"]
)
downloader.warm_up()
result = downloader.run(documents=documents)
# Only report.pdf is downloaded
print(f"Downloaded {len(result['documents'])} file(s)")
# Output: Downloaded 1 file(s)
使用自定义 S3 键生成
from haystack.dataclasses import Document
from haystack_integrations.components.downloaders.s3 import S3Downloader
def custom_s3_key_function(document: Document) -> str:
"""Generate S3 key from custom metadata."""
folder = document.meta.get("folder", "default")
file_name = document.meta.get("file_name")
if not file_name:
raise ValueError("Document must have 'file_name' in metadata")
return f"{folder}/{file_name}"
documents = [
Document(meta={"file_name": "report.pdf", "folder": "reports/2025"}),
]
downloader = S3Downloader(
file_root_path="/tmp/s3_downloads",
s3_key_generation_function=custom_s3_key_function
)
downloader.warm_up()
result = downloader.run(documents=documents)
在 pipeline 中
以下是使用S3Downloader 在文档处理管道中的示例。
from haystack import Pipeline
from haystack.components.converters import PDFMinerToDocument
from haystack.components.routers import DocumentTypeRouter
from haystack.dataclasses import Document
from haystack_integrations.components.downloaders.s3 import S3Downloader
# Create a pipeline
pipe = Pipeline()
# Add S3Downloader to download files from S3
pipe.add_component(
"downloader",
S3Downloader(
file_root_path="/tmp/s3_downloads",
file_extensions=[".pdf", ".txt"]
)
)
# Route documents by file type
pipe.add_component(
"router",
DocumentTypeRouter(
file_path_meta_field="file_path",
mime_types=["application/pdf", "text/plain"]
)
)
# Convert PDFs to documents
pipe.add_component("pdf_converter", PDFMinerToDocument())
# Connect components
pipe.connect("downloader.documents", "router.documents")
pipe.connect("router.application/pdf", "pdf_converter.documents")
# Create documents with S3 file names
documents = [
Document(meta={"file_name": "report.pdf"}),
Document(meta={"file_name": "summary.txt"}),
]
# Run the pipeline
result = pipe.run({"downloader": {"documents": documents}})
更复杂的示例,包括图像处理和 LLM
from haystack import Pipeline
from haystack.components.builders import ChatPromptBuilder
from haystack.components.converters.image import DocumentToImageContent
from haystack.components.routers import DocumentTypeRouter
from haystack.dataclasses import Document
from haystack_integrations.components.downloaders.s3 import S3Downloader
from haystack_integrations.components.generators.amazon_bedrock import AmazonBedrockChatGenerator
# Create documents with file names
documents = [
Document(meta={"file_name": "chart.png"}),
Document(meta={"file_name": "report.pdf"}),
]
# Create pipeline
pipe = Pipeline()
# Download files from S3
pipe.add_component(
"downloader",
S3Downloader(file_root_path="/tmp/s3_downloads")
)
# Route by document type
pipe.add_component(
"router",
DocumentTypeRouter(
file_path_meta_field="file_path",
mime_types=["image/png", "application/pdf"]
)
)
# Convert images for LLM
pipe.add_component("image_converter", DocumentToImageContent(detail="auto"))
# Create chat prompt with template
template = """{% message role="user" %}
Answer the question based on the provided images.
Question: {{ question }}
{% for image in image_contents %}
{{ image | templatize_part }}
{% endfor %}
{% endmessage %}"""
pipe.add_component(
"prompt_builder",
ChatPromptBuilder(template=template)
)
# Generate response
pipe.add_component(
"llm",
AmazonBedrockChatGenerator(model="anthropic.claude-3-haiku-20240307-v1:0")
)
# Connect components
pipe.connect("downloader.documents", "router.documents")
pipe.connect("router.image/png", "image_converter.documents")
pipe.connect("image_converter.image_contents", "prompt_builder.image_contents")
pipe.connect("prompt_builder.prompt", "llm.messages")
# Run pipeline
result = pipe.run({
"downloader": {"documents": documents},
"prompt_builder": {"question": "What information is shown in the chart?"}
})
更新于 4 天前
