文档API 参考📓 教程🧑‍🍳 食谱🤝 集成💜 Discord🎨 Studio
API 参考

MCP

Haystack 集成的 MCP

Module haystack_integrations.tools.mcp.mcp_tool

AsyncExecutor

用于从同步上下文中运行异步代码的线程安全事件循环执行器。

AsyncExecutor.get_instance

@classmethod
def get_instance(cls) -> "AsyncExecutor"

获取或创建全局单例执行器实例。

AsyncExecutor.__init__

def __init__()

初始化一个专用的事件循环

AsyncExecutor.run

def run(coro: Coroutine[Any, Any, Any], timeout: float | None = None) -> Any

在事件循环中运行协程。

参数:

  • coro: 要执行的协程
  • timeout: 可选的超时(秒)

引发:

  • TimeoutError: 如果执行超过超时

返回值:

协程的结果

AsyncExecutor.get_loop

def get_loop()

获取事件循环。

返回值:

事件循环

AsyncExecutor.run_background

def run_background(
    coro_factory: Callable[[asyncio.Event], Coroutine[Any, Any, Any]],
    timeout: float | None = None
) -> tuple[concurrent.futures.Future[Any], asyncio.Event]

计划coro_factory 在执行器的事件循环中运行,**不**阻塞

调用者线程。

工厂接收一个 :classasyncio.Event,可用于协同关闭协程。该方法返回**未来**(用于观察完成或失败)和创建的 *stop_event*,以便调用者可以发出终止信号。

参数:

  • coro_factory: 一个接收 stop_event 并返回要执行的协程的可调用对象。
  • timeout: 创建 stop_event 时等待的可选超时。

返回值:

元组(future, stop_event).

AsyncExecutor.shutdown

def shutdown(timeout: float = 2) -> None

关闭后台事件循环和线程。

参数:

  • timeout: 关闭事件循环的超时(秒)

MCPError

MCP 相关错误的基类。

MCPError.__init__

def __init__(message: str) -> None

初始化 MCPError。

参数:

  • message: 描述性错误消息

MCPConnectionError

连接到 MCP 服务器时出错。

MCPConnectionError.__init__

def __init__(message: str,
             server_info: "MCPServerInfo | None" = None,
             operation: str | None = None) -> None

初始化 MCPConnectionError。

参数:

  • message: 描述性错误消息
  • server_info: 使用过的服务器连接信息
  • operation: 尝试的操作的名称

MCPToolNotFoundError

服务器上找不到工具时出错。

MCPToolNotFoundError.__init__

def __init__(message: str,
             tool_name: str,
             available_tools: list[str] | None = None) -> None

初始化 MCPToolNotFoundError。

参数:

  • message: 描述性错误消息
  • tool_name: 请求但未找到的工具的名称
  • available_tools: 可用工具名称列表(如果已知)

MCPInvocationError

工具调用期间出错。

MCPInvocationError.__init__

def __init__(message: str,
             tool_name: str,
             tool_args: dict[str, Any] | None = None) -> None

初始化 MCPInvocationError。

参数:

  • message: 描述性错误消息
  • tool_name: 正在调用的工具的名称
  • tool_args: 传递给工具的参数

MCPClient

MCP 客户端的抽象基类。

此类定义了所有 MCP 客户端的通用接口和共享功能,无论使用何种传输机制。

MCPClient.connect

@abstractmethod
async def connect() -> list[types.Tool]

连接到 MCP 服务器。

引发:

  • MCPConnectionError: 如果服务器连接失败

返回值:

服务器上可用工具的列表

MCPClient.call_tool

async def call_tool(tool_name: str, tool_args: dict[str, Any]) -> str

调用已连接 MCP 服务器上的工具。

参数:

  • tool_name: 要调用的工具的名称
  • tool_args: 传递给工具的参数

引发:

  • MCPConnectionError: 如果未连接到 MCP 服务器
  • MCPInvocationError: 如果工具调用失败

返回值:

工具调用结果的 JSON 字符串表示

MCPClient.aclose

async def aclose() -> None

关闭连接并清理资源。

此方法确保即使发生错误也能正确释放所有资源。

StdioClient

使用 stdio 传输连接到服务器的 MCP 客户端。

StdioClient.__init__

def __init__(command: str,
             args: list[str] | None = None,
             env: dict[str, str | Secret] | None = None,
             max_retries: int = 3,
             base_delay: float = 1.0,
             max_delay: float = 30.0) -> None

初始化 stdio MCP 客户端。

参数:

  • command: 要运行的命令(例如,“python”、“node”)
  • args: 要传递给命令的参数
  • env: 命令的环境变量
  • max_retries: 最大重试次数
  • base_delay: 指数退避的基准延迟(秒)

StdioClient.connect

async def connect() -> list[types.Tool]

使用 stdio 传输连接到 MCP 服务器。

引发:

  • MCPConnectionError: 如果服务器连接失败

返回值:

服务器上可用工具的列表

SSEClient

使用 SSE 传输连接到服务器的 MCP 客户端。

SSEClient.__init__

def __init__(server_info: "SSEServerInfo",
             max_retries: int = 3,
             base_delay: float = 1.0,
             max_delay: float = 30.0) -> None

使用服务器配置初始化 SSE MCP 客户端。

参数:

  • server_info: 包含 URL、令牌、超时等的配置对象。
  • max_retries: 最大重试次数
  • base_delay: 指数退避的基准延迟(秒)

SSEClient.connect

async def connect() -> list[types.Tool]

使用 SSE 传输连接到 MCP 服务器。

引发:

  • MCPConnectionError: 如果服务器连接失败

返回值:

服务器上可用工具的列表

StreamableHttpClient

使用可流式传输的 HTTP 传输连接到服务器的 MCP 客户端。

StreamableHttpClient.__init__

def __init__(server_info: "StreamableHttpServerInfo",
             max_retries: int = 3,
             base_delay: float = 1.0,
             max_delay: float = 30.0) -> None

使用服务器配置初始化可流式传输的 HTTP MCP 客户端。

参数:

  • server_info: 包含 URL、令牌、超时等的配置对象。
  • max_retries: 最大重试次数
  • base_delay: 指数退避的基准延迟(秒)

StreamableHttpClient.connect

async def connect() -> list[types.Tool]

使用可流式传输的 HTTP 传输连接到 MCP 服务器。

引发:

  • MCPConnectionError: 如果服务器连接失败

返回值:

服务器上可用工具的列表

MCPServerInfo

MCP 服务器连接参数的抽象基类。

此类定义了所有 MCP 服务器连接类型的通用接口。

MCPServerInfo.create_client

@abstractmethod
def create_client() -> MCPClient

为此服务器信息创建适当的 MCP 客户端。

返回值:

已使用此服务器信息配置的 MCPClient 实例

MCPServerInfo.to_dict

def to_dict() -> dict[str, Any]

将此服务器信息序列化为字典。

返回值:

此服务器信息的字典表示

MCPServerInfo.from_dict

@classmethod
def from_dict(cls, data: dict[str, Any]) -> "MCPServerInfo"

从字典反序列化服务器信息。

参数:

  • data: 包含序列化服务器信息的字典

返回值:

适当的服务器信息类的实例

SSEServerInfo

封装 SSE MCP 服务器连接参数的数据类。

对于包含敏感数据的身份验证令牌,您可以使用 Secret 对象进行安全处理和序列化

server_info = SSEServerInfo(
    url="https://my-mcp-server.com",
    token=Secret.from_env_var("API_KEY"),
)

参数:

  • url: MCP 服务器的完整 URL(包括 /sse 端点)
  • base_url: MCP 服务器的基本 URL(已弃用,请改用 url)
  • token: 服务器的身份验证令牌(可选)
  • timeout: 连接超时(秒)

base_url

已弃用

SSEServerInfo.__post_init__

def __post_init__()

验证是否提供了 url 或 base_url。

SSEServerInfo.create_client

def create_client() -> MCPClient

创建 SSE MCP 客户端。

返回值:

已配置的 MCPClient 实例

StreamableHttpServerInfo

封装可流式传输的 HTTP MCP 服务器连接参数的数据类。

对于包含敏感数据的身份验证令牌,您可以使用 Secret 对象进行安全处理和序列化

server_info = StreamableHttpServerInfo(
    url="https://my-mcp-server.com",
    token=Secret.from_env_var("API_KEY"),
)

参数:

  • url: MCP 服务器的完整 URL(可流式传输的 HTTP 端点)
  • token: 服务器的身份验证令牌(可选)
  • timeout: 连接超时(秒)

StreamableHttpServerInfo.__post_init__

def __post_init__()

验证 URL。

StreamableHttpServerInfo.create_client

def create_client() -> MCPClient

创建可流式传输的 HTTP MCP 客户端。

返回值:

已配置的 StreamableHttpClient 实例

StdioServerInfo

封装 stdio MCP 服务器连接参数的数据类。

参数:

  • command: 要运行的命令(例如,“python”、“node”)
  • args: 要传递给命令的参数
  • env: 命令的环境变量。对于包含敏感数据的环境变量,您可以使用 Secret 对象进行安全处理和序列化。
server_info = StdioServerInfo(
    command="uv",
    args=["run", "my-mcp-server"],
    env={
        "WORKSPACE_PATH": "/path/to/workspace",  # Plain string
        "API_KEY": Secret.from_env_var("API_KEY"),  # Secret object
    }
)

Secret 对象将被正确序列化和反序列化,而不会暴露 secret 值,而纯字符串将按原样保留。请使用 Secret 对象处理需要安全处理的敏感数据。

StdioServerInfo.create_client

def create_client() -> MCPClient

创建 stdio MCP 客户端。

返回值:

已配置的 StdioMCPClient 实例

MCPTool

代表 MCP 服务器上单个工具的工具。

此实现使用官方 MCP SDK 进行协议处理,同时保持与 Haystack 工具生态系统的兼容性。

响应处理

  • 文本和图像内容受支持,并作为 JSON 字符串返回
  • JSON 包含来自 MCP 服务器的结构化响应
  • 使用 json.loads() 将响应解析为字典

使用 Streamable HTTP 的示例

import json
from haystack_integrations.tools.mcp import MCPTool, StreamableHttpServerInfo

# Create tool instance
tool = MCPTool(
    name="multiply",
    server_info=StreamableHttpServerInfo(url="https://:8000/mcp")
)

# Use the tool and parse result
result_json = tool.invoke(a=5, b=3)
result = json.loads(result_json)

使用 SSE(已弃用)的示例

import json
from haystack.tools import MCPTool, SSEServerInfo

# Create tool instance
tool = MCPTool(
    name="add",
    server_info=SSEServerInfo(url="https://:8000/sse")
)

# Use the tool and parse result
result_json = tool.invoke(a=5, b=3)
result = json.loads(result_json)

使用 stdio 的示例

import json
from haystack.tools import MCPTool, StdioServerInfo

# Create tool instance
tool = MCPTool(
    name="get_current_time",
    server_info=StdioServerInfo(command="python", args=["path/to/server.py"])
)

# Use the tool and parse result
result_json = tool.invoke(timezone="America/New_York")
result = json.loads(result_json)

MCPTool.__init__

def __init__(name: str,
             server_info: MCPServerInfo,
             description: str | None = None,
             connection_timeout: int = 30,
             invocation_timeout: int = 30)

初始化 MCP 工具。

参数:

  • name: 要使用的工具的名称
  • server_info: 服务器连接信息
  • description: 自定义描述(如果为 None,将使用服务器描述)
  • connection_timeout: 服务器连接超时(秒)
  • invocation_timeout: 工具调用的默认超时(秒)

引发:

  • MCPConnectionError: 如果服务器连接失败
  • MCPToolNotFoundError: 如果没有可用的工具或找不到请求的工具
  • TimeoutError: 如果连接超时

MCPTool.ainvoke

async def ainvoke(**kwargs: Any) -> str

异步工具调用。

参数:

  • kwargs: 传递给工具的参数

引发:

  • MCPInvocationError: 如果工具调用失败
  • TimeoutError: 如果操作超时

返回值:

工具调用结果的 JSON 字符串表示

MCPTool.to_dict

def to_dict() -> dict[str, Any]

将 MCPTool 序列化为字典。

序列化保留了重新创建工具所需的所有信息,包括服务器连接参数和超时设置。请注意,活动连接不会被保留。

返回值:

字典,格式为:{"type": fully_qualified_class_name, "data": {parameters}}

MCPTool.from_dict

@classmethod
def from_dict(cls, data: dict[str, Any]) -> "Tool"

从字典反序列化 MCPTool。

此方法从序列化字典中重建 MCPTool 实例,包括重新创建 server_info 对象。将在初始化期间建立到 MCP 服务器的新连接。

参数:

  • data: 包含序列化工具数据的字典

引发:

  • None: 连接失败时可能引发的各种异常

返回值:

一个完全初始化的 MCPTool 实例

MCPTool.close

def close()

同步关闭工具。

MCPTool.__del__

def __del__()

在工具被垃圾回收时清理资源。

_MCPClientSessionManager

在 AsyncExecutor 的事件循环中运行 MCPClient 的连接/关闭。

生命周期

  1. 创建工作程序以在专用的后台循环中调度长时间运行的协程。
  2. 协程调用 mcp 客户端上的 *connect*;当它获取工具列表时,它会满足一个并发未来,以便同步线程可以继续。
  3. 然后它等待一个asyncio.Event.
  4. stop() 从任何线程设置事件。同一个协程然后调用 mcp 客户端上的 *close()* 并完成,而不会出现令人讨厌的Attempted to exit cancel scope in a different task than it was entered in 错误,从而正确关闭客户端。

_MCPClientSessionManager.tools

def tools() -> list[types.Tool]

返回在启动期间已收集的工具列表。

_MCPClientSessionManager.stop

def stop() -> None

请求工作程序关闭并阻塞直到完成。

Module haystack_integrations.tools.mcp.mcp_toolset

MCP Toolkits

一个 Toolset,它连接到 MCP(模型上下文协议)服务器并提供对其工具的访问。

MCP Toolset 动态发现并从任何 MCP 兼容服务器加载所有工具,支持基于网络的流式连接(Streamable HTTP、SSE)和基于本地进程的 stdio 连接。这种双重连接允许与远程和本地 MCP 服务器集成。

在 Haystack Pipeline 中使用 MCP Toolset 的示例

# Prerequisites:
# 1. pip install uvx mcp-server-time  # Install required MCP server and tools
# 2. export OPENAI_API_KEY="your-api-key"  # Set up your OpenAI API key

import os
from haystack import Pipeline
from haystack.components.converters import OutputAdapter
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.tools import ToolInvoker
from haystack.dataclasses import ChatMessage
from haystack_integrations.tools.mcp import MCPToolset, StdioServerInfo

# Create server info for the time service (can also use SSEServerInfo for remote servers)
server_info = StdioServerInfo(command="uvx", args=["mcp-server-time", "--local-timezone=Europe/Berlin"])

# Create the toolset - this will automatically discover all available tools
# You can optionally specify which tools to include
mcp_toolset = MCPToolset(
    server_info=server_info,
    tool_names=["get_current_time"]  # Only include the get_current_time tool
)

# Create a pipeline with the toolset
pipeline = Pipeline()
pipeline.add_component("llm", OpenAIChatGenerator(model="gpt-4o-mini", tools=mcp_toolset))
pipeline.add_component("tool_invoker", ToolInvoker(tools=mcp_toolset))
pipeline.add_component(
    "adapter",
    OutputAdapter(
        template="{{ initial_msg + initial_tool_messages + tool_messages }}",
        output_type=list[ChatMessage],
        unsafe=True,
    ),
)
pipeline.add_component("response_llm", OpenAIChatGenerator(model="gpt-4o-mini"))
pipeline.connect("llm.replies", "tool_invoker.messages")
pipeline.connect("llm.replies", "adapter.initial_tool_messages")
pipeline.connect("tool_invoker.tool_messages", "adapter.tool_messages")
pipeline.connect("adapter.output", "response_llm.messages")

# Run the pipeline with a user question
user_input = "What is the time in New York? Be brief."
user_input_msg = ChatMessage.from_user(text=user_input)

result = pipeline.run({"llm": {"messages": [user_input_msg]}, "adapter": {"initial_msg": [user_input_msg]}})
print(result["response_llm"]["replies"][0].text)

您还可以使用 Streamable HTTP 的 Toolset 来与远程服务器通信

from haystack_integrations.tools.mcp import MCPToolset, StreamableHttpServerInfo

# Create the toolset with streamable HTTP connection
toolset = MCPToolset(
    server_info=StreamableHttpServerInfo(url="https://:8000/mcp"),
    tool_names=["multiply"]  # Optional: only include specific tools
)
# Use the toolset as shown in the pipeline example above

使用 SSE(已弃用)的示例

from haystack_integrations.tools.mcp import MCPToolset, SSEServerInfo
from haystack.components.tools import ToolInvoker

# Create the toolset with an SSE connection
sse_toolset = MCPToolset(
    server_info=SSEServerInfo(url="http://some-remote-server.com:8000/sse"),
    tool_names=["add", "subtract"]  # Only include specific tools
)

# Use the toolset as shown in the pipeline example above

MCPToolset.__init__

def __init__(server_info: MCPServerInfo,
             tool_names: list[str] | None = None,
             connection_timeout: float = 30.0,
             invocation_timeout: float = 30.0)

初始化 MCP toolset。

参数:

  • server_info: MCP 服务器的连接信息
  • tool_names: 可选的要包含的工具名称列表。如果提供,则只有名称匹配的工具将被添加到 toolset 中。
  • connection_timeout: 服务器连接超时(秒)
  • invocation_timeout: 工具调用的默认超时(秒)

引发:

  • MCPToolNotFoundError: 如果在服务器上找不到指定的任何工具名称

MCPToolset.to_dict

def to_dict() -> dict[str, Any]

将 MCP Toolset 序列化为字典。

返回值:

MCP Toolset 的字典表示

MCPToolset.from_dict

@classmethod
def from_dict(cls, data: dict[str, Any]) -> "MCPToolset"

从字典反序列化 MCP Toolset。

参数:

  • data: MCP Toolset 的字典表示

返回值:

一个新的 MCP Toolset 实例

MCPToolset.close

def close()

安全关闭底层的 MCP 客户端。