Haystack 集成的 MCP
Module haystack_integrations.tools.mcp.mcp_tool
AsyncExecutor
用于从同步上下文中运行异步代码的线程安全事件循环执行器。
AsyncExecutor.get_instance
@classmethod
def get_instance(cls) -> "AsyncExecutor"
获取或创建全局单例执行器实例。
AsyncExecutor.__init__
def __init__()
初始化一个专用的事件循环
AsyncExecutor.run
def run(coro: Coroutine[Any, Any, Any], timeout: float | None = None) -> Any
在事件循环中运行协程。
参数:
coro: 要执行的协程timeout: 可选的超时(秒)
引发:
TimeoutError: 如果执行超过超时
返回值:
协程的结果
AsyncExecutor.get_loop
def get_loop()
获取事件循环。
返回值:
事件循环
AsyncExecutor.run_background
def run_background(
coro_factory: Callable[[asyncio.Event], Coroutine[Any, Any, Any]],
timeout: float | None = None
) -> tuple[concurrent.futures.Future[Any], asyncio.Event]
计划coro_factory 在执行器的事件循环中运行,**不**阻塞
调用者线程。
工厂接收一个 :classasyncio.Event,可用于协同关闭协程。该方法返回**未来**(用于观察完成或失败)和创建的 *stop_event*,以便调用者可以发出终止信号。
参数:
coro_factory: 一个接收 stop_event 并返回要执行的协程的可调用对象。timeout: 创建 stop_event 时等待的可选超时。
返回值:
元组(future, stop_event).
AsyncExecutor.shutdown
def shutdown(timeout: float = 2) -> None
关闭后台事件循环和线程。
参数:
timeout: 关闭事件循环的超时(秒)
MCPError
MCP 相关错误的基类。
MCPError.__init__
def __init__(message: str) -> None
初始化 MCPError。
参数:
message: 描述性错误消息
MCPConnectionError
连接到 MCP 服务器时出错。
MCPConnectionError.__init__
def __init__(message: str,
server_info: "MCPServerInfo | None" = None,
operation: str | None = None) -> None
初始化 MCPConnectionError。
参数:
message: 描述性错误消息server_info: 使用过的服务器连接信息operation: 尝试的操作的名称
MCPToolNotFoundError
服务器上找不到工具时出错。
MCPToolNotFoundError.__init__
def __init__(message: str,
tool_name: str,
available_tools: list[str] | None = None) -> None
初始化 MCPToolNotFoundError。
参数:
message: 描述性错误消息tool_name: 请求但未找到的工具的名称available_tools: 可用工具名称列表(如果已知)
MCPInvocationError
工具调用期间出错。
MCPInvocationError.__init__
def __init__(message: str,
tool_name: str,
tool_args: dict[str, Any] | None = None) -> None
初始化 MCPInvocationError。
参数:
message: 描述性错误消息tool_name: 正在调用的工具的名称tool_args: 传递给工具的参数
MCPClient
MCP 客户端的抽象基类。
此类定义了所有 MCP 客户端的通用接口和共享功能,无论使用何种传输机制。
MCPClient.connect
@abstractmethod
async def connect() -> list[types.Tool]
连接到 MCP 服务器。
引发:
MCPConnectionError: 如果服务器连接失败
返回值:
服务器上可用工具的列表
MCPClient.call_tool
async def call_tool(tool_name: str, tool_args: dict[str, Any]) -> str
调用已连接 MCP 服务器上的工具。
参数:
tool_name: 要调用的工具的名称tool_args: 传递给工具的参数
引发:
MCPConnectionError: 如果未连接到 MCP 服务器MCPInvocationError: 如果工具调用失败
返回值:
工具调用结果的 JSON 字符串表示
MCPClient.aclose
async def aclose() -> None
关闭连接并清理资源。
此方法确保即使发生错误也能正确释放所有资源。
StdioClient
使用 stdio 传输连接到服务器的 MCP 客户端。
StdioClient.__init__
def __init__(command: str,
args: list[str] | None = None,
env: dict[str, str | Secret] | None = None,
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 30.0) -> None
初始化 stdio MCP 客户端。
参数:
command: 要运行的命令(例如,“python”、“node”)args: 要传递给命令的参数env: 命令的环境变量max_retries: 最大重试次数base_delay: 指数退避的基准延迟(秒)
StdioClient.connect
async def connect() -> list[types.Tool]
使用 stdio 传输连接到 MCP 服务器。
引发:
MCPConnectionError: 如果服务器连接失败
返回值:
服务器上可用工具的列表
SSEClient
使用 SSE 传输连接到服务器的 MCP 客户端。
SSEClient.__init__
def __init__(server_info: "SSEServerInfo",
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 30.0) -> None
使用服务器配置初始化 SSE MCP 客户端。
参数:
server_info: 包含 URL、令牌、超时等的配置对象。max_retries: 最大重试次数base_delay: 指数退避的基准延迟(秒)
SSEClient.connect
async def connect() -> list[types.Tool]
使用 SSE 传输连接到 MCP 服务器。
引发:
MCPConnectionError: 如果服务器连接失败
返回值:
服务器上可用工具的列表
StreamableHttpClient
使用可流式传输的 HTTP 传输连接到服务器的 MCP 客户端。
StreamableHttpClient.__init__
def __init__(server_info: "StreamableHttpServerInfo",
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 30.0) -> None
使用服务器配置初始化可流式传输的 HTTP MCP 客户端。
参数:
server_info: 包含 URL、令牌、超时等的配置对象。max_retries: 最大重试次数base_delay: 指数退避的基准延迟(秒)
StreamableHttpClient.connect
async def connect() -> list[types.Tool]
使用可流式传输的 HTTP 传输连接到 MCP 服务器。
引发:
MCPConnectionError: 如果服务器连接失败
返回值:
服务器上可用工具的列表
MCPServerInfo
MCP 服务器连接参数的抽象基类。
此类定义了所有 MCP 服务器连接类型的通用接口。
MCPServerInfo.create_client
@abstractmethod
def create_client() -> MCPClient
为此服务器信息创建适当的 MCP 客户端。
返回值:
已使用此服务器信息配置的 MCPClient 实例
MCPServerInfo.to_dict
def to_dict() -> dict[str, Any]
将此服务器信息序列化为字典。
返回值:
此服务器信息的字典表示
MCPServerInfo.from_dict
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "MCPServerInfo"
从字典反序列化服务器信息。
参数:
data: 包含序列化服务器信息的字典
返回值:
适当的服务器信息类的实例
SSEServerInfo
封装 SSE MCP 服务器连接参数的数据类。
对于包含敏感数据的身份验证令牌,您可以使用 Secret 对象进行安全处理和序列化
server_info = SSEServerInfo(
url="https://my-mcp-server.com",
token=Secret.from_env_var("API_KEY"),
)
参数:
url: MCP 服务器的完整 URL(包括 /sse 端点)base_url: MCP 服务器的基本 URL(已弃用,请改用 url)token: 服务器的身份验证令牌(可选)timeout: 连接超时(秒)
base_url
已弃用
SSEServerInfo.__post_init__
def __post_init__()
验证是否提供了 url 或 base_url。
SSEServerInfo.create_client
def create_client() -> MCPClient
创建 SSE MCP 客户端。
返回值:
已配置的 MCPClient 实例
StreamableHttpServerInfo
封装可流式传输的 HTTP MCP 服务器连接参数的数据类。
对于包含敏感数据的身份验证令牌,您可以使用 Secret 对象进行安全处理和序列化
server_info = StreamableHttpServerInfo(
url="https://my-mcp-server.com",
token=Secret.from_env_var("API_KEY"),
)
参数:
url: MCP 服务器的完整 URL(可流式传输的 HTTP 端点)token: 服务器的身份验证令牌(可选)timeout: 连接超时(秒)
StreamableHttpServerInfo.__post_init__
def __post_init__()
验证 URL。
StreamableHttpServerInfo.create_client
def create_client() -> MCPClient
创建可流式传输的 HTTP MCP 客户端。
返回值:
已配置的 StreamableHttpClient 实例
StdioServerInfo
封装 stdio MCP 服务器连接参数的数据类。
参数:
command: 要运行的命令(例如,“python”、“node”)args: 要传递给命令的参数env: 命令的环境变量。对于包含敏感数据的环境变量,您可以使用 Secret 对象进行安全处理和序列化。
server_info = StdioServerInfo(
command="uv",
args=["run", "my-mcp-server"],
env={
"WORKSPACE_PATH": "/path/to/workspace", # Plain string
"API_KEY": Secret.from_env_var("API_KEY"), # Secret object
}
)
Secret 对象将被正确序列化和反序列化,而不会暴露 secret 值,而纯字符串将按原样保留。请使用 Secret 对象处理需要安全处理的敏感数据。
StdioServerInfo.create_client
def create_client() -> MCPClient
创建 stdio MCP 客户端。
返回值:
已配置的 StdioMCPClient 实例
MCPTool
代表 MCP 服务器上单个工具的工具。
此实现使用官方 MCP SDK 进行协议处理,同时保持与 Haystack 工具生态系统的兼容性。
响应处理
- 文本和图像内容受支持,并作为 JSON 字符串返回
- JSON 包含来自 MCP 服务器的结构化响应
- 使用 json.loads() 将响应解析为字典
使用 Streamable HTTP 的示例
import json
from haystack_integrations.tools.mcp import MCPTool, StreamableHttpServerInfo
# Create tool instance
tool = MCPTool(
name="multiply",
server_info=StreamableHttpServerInfo(url="https://:8000/mcp")
)
# Use the tool and parse result
result_json = tool.invoke(a=5, b=3)
result = json.loads(result_json)
使用 SSE(已弃用)的示例
import json
from haystack.tools import MCPTool, SSEServerInfo
# Create tool instance
tool = MCPTool(
name="add",
server_info=SSEServerInfo(url="https://:8000/sse")
)
# Use the tool and parse result
result_json = tool.invoke(a=5, b=3)
result = json.loads(result_json)
使用 stdio 的示例
import json
from haystack.tools import MCPTool, StdioServerInfo
# Create tool instance
tool = MCPTool(
name="get_current_time",
server_info=StdioServerInfo(command="python", args=["path/to/server.py"])
)
# Use the tool and parse result
result_json = tool.invoke(timezone="America/New_York")
result = json.loads(result_json)
MCPTool.__init__
def __init__(name: str,
server_info: MCPServerInfo,
description: str | None = None,
connection_timeout: int = 30,
invocation_timeout: int = 30)
初始化 MCP 工具。
参数:
name: 要使用的工具的名称server_info: 服务器连接信息description: 自定义描述(如果为 None,将使用服务器描述)connection_timeout: 服务器连接超时(秒)invocation_timeout: 工具调用的默认超时(秒)
引发:
MCPConnectionError: 如果服务器连接失败MCPToolNotFoundError: 如果没有可用的工具或找不到请求的工具TimeoutError: 如果连接超时
MCPTool.ainvoke
async def ainvoke(**kwargs: Any) -> str
异步工具调用。
参数:
kwargs: 传递给工具的参数
引发:
MCPInvocationError: 如果工具调用失败TimeoutError: 如果操作超时
返回值:
工具调用结果的 JSON 字符串表示
MCPTool.to_dict
def to_dict() -> dict[str, Any]
将 MCPTool 序列化为字典。
序列化保留了重新创建工具所需的所有信息,包括服务器连接参数和超时设置。请注意,活动连接不会被保留。
返回值:
字典,格式为:{"type": fully_qualified_class_name, "data": {parameters}}
MCPTool.from_dict
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "Tool"
从字典反序列化 MCPTool。
此方法从序列化字典中重建 MCPTool 实例,包括重新创建 server_info 对象。将在初始化期间建立到 MCP 服务器的新连接。
参数:
data: 包含序列化工具数据的字典
引发:
None: 连接失败时可能引发的各种异常
返回值:
一个完全初始化的 MCPTool 实例
MCPTool.close
def close()
同步关闭工具。
MCPTool.__del__
def __del__()
在工具被垃圾回收时清理资源。
_MCPClientSessionManager
在 AsyncExecutor 的事件循环中运行 MCPClient 的连接/关闭。
生命周期
- 创建工作程序以在专用的后台循环中调度长时间运行的协程。
- 协程调用 mcp 客户端上的 *connect*;当它获取工具列表时,它会满足一个并发未来,以便同步线程可以继续。
- 然后它等待一个
asyncio.Event. stop()从任何线程设置事件。同一个协程然后调用 mcp 客户端上的 *close()* 并完成,而不会出现令人讨厌的Attempted to exit cancel scope in a different task than it was entered in错误,从而正确关闭客户端。
_MCPClientSessionManager.tools
def tools() -> list[types.Tool]
返回在启动期间已收集的工具列表。
_MCPClientSessionManager.stop
def stop() -> None
请求工作程序关闭并阻塞直到完成。
Module haystack_integrations.tools.mcp.mcp_toolset
MCP Toolkits
一个 Toolset,它连接到 MCP(模型上下文协议)服务器并提供对其工具的访问。
MCP Toolset 动态发现并从任何 MCP 兼容服务器加载所有工具,支持基于网络的流式连接(Streamable HTTP、SSE)和基于本地进程的 stdio 连接。这种双重连接允许与远程和本地 MCP 服务器集成。
在 Haystack Pipeline 中使用 MCP Toolset 的示例
# Prerequisites:
# 1. pip install uvx mcp-server-time # Install required MCP server and tools
# 2. export OPENAI_API_KEY="your-api-key" # Set up your OpenAI API key
import os
from haystack import Pipeline
from haystack.components.converters import OutputAdapter
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.tools import ToolInvoker
from haystack.dataclasses import ChatMessage
from haystack_integrations.tools.mcp import MCPToolset, StdioServerInfo
# Create server info for the time service (can also use SSEServerInfo for remote servers)
server_info = StdioServerInfo(command="uvx", args=["mcp-server-time", "--local-timezone=Europe/Berlin"])
# Create the toolset - this will automatically discover all available tools
# You can optionally specify which tools to include
mcp_toolset = MCPToolset(
server_info=server_info,
tool_names=["get_current_time"] # Only include the get_current_time tool
)
# Create a pipeline with the toolset
pipeline = Pipeline()
pipeline.add_component("llm", OpenAIChatGenerator(model="gpt-4o-mini", tools=mcp_toolset))
pipeline.add_component("tool_invoker", ToolInvoker(tools=mcp_toolset))
pipeline.add_component(
"adapter",
OutputAdapter(
template="{{ initial_msg + initial_tool_messages + tool_messages }}",
output_type=list[ChatMessage],
unsafe=True,
),
)
pipeline.add_component("response_llm", OpenAIChatGenerator(model="gpt-4o-mini"))
pipeline.connect("llm.replies", "tool_invoker.messages")
pipeline.connect("llm.replies", "adapter.initial_tool_messages")
pipeline.connect("tool_invoker.tool_messages", "adapter.tool_messages")
pipeline.connect("adapter.output", "response_llm.messages")
# Run the pipeline with a user question
user_input = "What is the time in New York? Be brief."
user_input_msg = ChatMessage.from_user(text=user_input)
result = pipeline.run({"llm": {"messages": [user_input_msg]}, "adapter": {"initial_msg": [user_input_msg]}})
print(result["response_llm"]["replies"][0].text)
您还可以使用 Streamable HTTP 的 Toolset 来与远程服务器通信
from haystack_integrations.tools.mcp import MCPToolset, StreamableHttpServerInfo
# Create the toolset with streamable HTTP connection
toolset = MCPToolset(
server_info=StreamableHttpServerInfo(url="https://:8000/mcp"),
tool_names=["multiply"] # Optional: only include specific tools
)
# Use the toolset as shown in the pipeline example above
使用 SSE(已弃用)的示例
from haystack_integrations.tools.mcp import MCPToolset, SSEServerInfo
from haystack.components.tools import ToolInvoker
# Create the toolset with an SSE connection
sse_toolset = MCPToolset(
server_info=SSEServerInfo(url="http://some-remote-server.com:8000/sse"),
tool_names=["add", "subtract"] # Only include specific tools
)
# Use the toolset as shown in the pipeline example above
MCPToolset.__init__
def __init__(server_info: MCPServerInfo,
tool_names: list[str] | None = None,
connection_timeout: float = 30.0,
invocation_timeout: float = 30.0)
初始化 MCP toolset。
参数:
server_info: MCP 服务器的连接信息tool_names: 可选的要包含的工具名称列表。如果提供,则只有名称匹配的工具将被添加到 toolset 中。connection_timeout: 服务器连接超时(秒)invocation_timeout: 工具调用的默认超时(秒)
引发:
MCPToolNotFoundError: 如果在服务器上找不到指定的任何工具名称
MCPToolset.to_dict
def to_dict() -> dict[str, Any]
将 MCP Toolset 序列化为字典。
返回值:
MCP Toolset 的字典表示
MCPToolset.from_dict
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "MCPToolset"
从字典反序列化 MCP Toolset。
参数:
data: MCP Toolset 的字典表示
返回值:
一个新的 MCP Toolset 实例
MCPToolset.close
def close()
安全关闭底层的 MCP 客户端。
