Snowflake 集成 (Haystack)
模块 haystack_integrations.components.retrievers.snowflake.snowflake_table_retriever
SnowflakeTableRetriever
使用 ADBC 和 Polars 连接到 Snowflake 数据库并执行 SQL 查询。将结果作为 Pandas DataFrame(从 Polars DataFrame 转换而来)以及 Markdown 格式的字符串返回。有关更多信息,请参阅 Polars 文档 和 ADBC 文档。
使用示例
密码认证
executor = SnowflakeTableRetriever(
user="<ACCOUNT-USER>",
account="<ACCOUNT-IDENTIFIER>",
authenticator="SNOWFLAKE",
api_key=Secret.from_env_var("SNOWFLAKE_API_KEY"),
database="<DATABASE-NAME>",
db_schema="<SCHEMA-NAME>",
warehouse="<WAREHOUSE-NAME>",
)
密钥对认证 (MFA)
executor = SnowflakeTableRetriever(
user="<ACCOUNT-USER>",
account="<ACCOUNT-IDENTIFIER>",
authenticator="SNOWFLAKE_JWT",
private_key_file=Secret.from_env_var("SNOWFLAKE_PRIVATE_KEY_FILE"),
private_key_file_pwd=Secret.from_env_var("SNOWFLAKE_PRIVATE_KEY_PWD"),
database="<DATABASE-NAME>",
db_schema="<SCHEMA-NAME>",
warehouse="<WAREHOUSE-NAME>",
)
OAuth 认证 (MFA)
executor = SnowflakeTableRetriever(
user="<ACCOUNT-USER>",
account="<ACCOUNT-IDENTIFIER>",
authenticator="OAUTH",
oauth_client_id=Secret.from_env_var("SNOWFLAKE_OAUTH_CLIENT_ID"),
oauth_client_secret=Secret.from_env_var("SNOWFLAKE_OAUTH_CLIENT_SECRET"),
oauth_token_request_url="<TOKEN-REQUEST-URL>",
database="<DATABASE-NAME>",
db_schema="<SCHEMA-NAME>",
warehouse="<WAREHOUSE-NAME>",
)
执行查询
query = "SELECT * FROM table_name"
results = executor.run(query=query)
>> print(results["dataframe"].head(2))
column1 column2 column3
0 123 'data1' 2024-03-20
1 456 'data2' 2024-03-21
>> print(results["table"])
shape: (3, 3)
| column1 | column2 | column3 |
|---------|---------|------------|
| int | str | date |
|---------|---------|------------|
| 123 | data1 | 2024-03-20 |
| 456 | data2 | 2024-03-21 |
| 789 | data3 | 2024-03-22 |
SnowflakeTableRetriever.__init__
def __init__(user: str,
account: str,
authenticator: Literal["SNOWFLAKE", "SNOWFLAKE_JWT", "OAUTH"],
api_key: Optional[Secret] = None,
database: Optional[str] = None,
db_schema: Optional[str] = None,
warehouse: Optional[str] = None,
login_timeout: Optional[int] = 60,
return_markdown: bool = True,
private_key_file: Optional[Secret] = None,
private_key_file_pwd: Optional[Secret] = None,
oauth_client_id: Optional[Secret] = None,
oauth_client_secret: Optional[Secret] = None,
oauth_token_request_url: Optional[str] = None,
oauth_authorization_url: Optional[str] = None) -> None
参数:
user: 用户登录名。account: Snowflake 账户标识符。authenticator: 认证方法。必需。选项:“SNOWFLAKE”(密码)、“SNOWFLAKE_JWT”(密钥对)或“OAUTH”。api_key: Snowflake 账户密码。SNOWFLAKE 认证必需。database: 要使用的数据库名称。db_schema: 要使用的模式名称。warehouse: 要使用的仓库名称。login_timeout: 登录超时(秒)。return_markdown: 是否返回 DataFrame 的 Markdown 格式字符串。private_key_file: 包含私钥文件路径的密钥。SNOWFLAKE_JWT 认证必需。private_key_file_pwd: 包含私钥文件密码短语的密钥。仅当私钥文件加密时必需。oauth_client_id: 包含 OAuth 客户端 ID 的密钥。OAUTH 认证必需。oauth_client_secret: 包含 OAuth 客户端密钥的密钥。OAUTH 认证必需。oauth_token_request_url: 客户端凭据流程的 OAuth 令牌请求 URL。oauth_authorization_url: 授权码流程的 OAuth 授权 URL。
SnowflakeTableRetriever.test_connection
def test_connection() -> bool
使用当前的认证设置测试连接。
返回值:
如果连接成功,则返回 True,否则返回 False。
SnowflakeTableRetriever.to_dict
def to_dict() -> Dict[str, Any]
将组件序列化为字典。
返回值:
包含序列化数据的字典。
SnowflakeTableRetriever.from_dict
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "SnowflakeTableRetriever"
从字典反序列化组件。
参数:
data: 要反序列化的字典。
返回值:
反序列化后的组件。
SnowflakeTableRetriever.run
@component.output_types(dataframe=DataFrame, table=str)
def run(query: str, return_markdown: Optional[bool] = None) -> Dict[str, Any]
使用 ADBC 和 Polars 对 Snowflake 数据库执行 SQL 查询。
参数:
query: 要执行的 SQL 查询。return_markdown: 是否返回 DataFrame 的 Markdown 格式字符串。如果未提供,则使用初始化时设置的值。
返回值:
一个包含
"dataframe": 包含查询结果的 Pandas DataFrame。"table": DataFrame 的 Markdown 格式字符串表示。
