LangfuseConnector
了解如何在 Haystack 中使用 Langfuse。
| pipeline 中的最常见位置 | 任意位置,因为它未与其他组件连接 |
| 必需的初始化变量 | "name": 用于标识跟踪运行的管道或组件的名称 |
| 输出变量 | “name”: 跟踪组件的名称 ”trace_url”: 指向跟踪数据的链接 |
| API 参考 | langfuse |
| GitHub 链接 | https://github.com/deepset-ai/haystack-core-integrations/tree/main/integrations/langfuse |
概述
LangfuseConnector 使用 Langfuse 将跟踪功能集成到 Haystack 管道中。它会捕获管道运行的详细信息,例如 API 调用、上下文数据、提示等。使用此组件可以
- 监控模型性能,例如 token 使用量和成本。
- 通过识别低质量输出和收集用户反馈来找到改进管道的领域。
- 从管道执行中创建用于微调和测试的数据集。
要使用此集成,请将LangfuseConnector 添加到管道中,运行管道,然后在 Langfuse 网站上查看跟踪数据。不要将此组件连接到任何其他组件——LangfuseConnector 将在管道的后台运行。
在使用此组件时,您可以选择定义另外两个参数
httpx_client:一个可选的自定义httpx.Client实例,用于 Langfuse API 调用。请注意,自定义客户端在从 YAML 反序列化管道时会被丢弃,因为 HTTPX 客户端无法序列化。在这种情况下,Langfuse 会创建一个默认客户端。span_handler:一个可选的自定义处理器,用于处理 span。如果未提供,则使用DefaultSpanHandler。Span 处理器定义了 span 的创建和处理方式,从而可以根据组件类型和 span 的后处理来自定义 span 类型。有关更多详细信息,请参阅下面的高级用法部分。
先决条件
在使用 LangfuseConnector 之前,您需要以下内容
- 确保您有一个有效的 Langfuse 账户。
- 设置
HAYSTACK_CONTENT_TRACING_ENABLED环境变量为true——这将启用管道的跟踪。 - 设置
LANGFUSE_SECRET_KEY和LANGFUSE_PUBLIC_KEY环境变量,并填入您在账户个人资料中找到的 Langfuse 密钥和公钥。
安装
首先,安装 langfuse-haystack 包,以使用 LangfuseConnector:
pip install langfuse-haystack
使用注意事项
为确保正确跟踪,请始终在导入任何 Haystack 组件之前设置环境变量。这一点至关重要,因为 Haystack 在导入期间会初始化其内部跟踪组件。在下面的示例中,我们首先设置环境变量,然后导入相关的 Haystack 组件。
或者,一个更好的做法是在运行脚本之前在 shell 中设置这些环境变量。这种方法可以将配置与代码分开,并允许更轻松地管理不同的环境。
用法
在下面的示例中,我们将LangfuseConnector 作为tracer添加到管道中。每次管道运行都会生成一个 trace,其中包含整个执行上下文,包括提示、补全和元数据。
然后,您可以通过遵循输出中打印的 URL 链接来查看 trace。
import os
os.environ["LANGFUSE_HOST"] = "https://cloud.langfuse.com"
os.environ["TOKENIZERS_PARALLELISM"] = "false"
os.environ["HAYSTACK_CONTENT_TRACING_ENABLED"] = "true"
from haystack.components.builders import DynamicChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from haystack import Pipeline
from haystack_integrations.components.connectors.langfuse import LangfuseConnector
if __name__ == "__main__":
pipe = Pipeline()
pipe.add_component("tracer", LangfuseConnector("Chat example"))
pipe.add_component("prompt_builder", DynamicChatPromptBuilder())
pipe.add_component("llm", OpenAIChatGenerator(model="gpt-3.5-turbo"))
pipe.connect("prompt_builder.prompt", "llm.messages")
messages = [
ChatMessage.from_system("Always respond in German even if some input data is in other languages."),
ChatMessage.from_user("Tell me about {{location}}"),
]
response = pipe.run(
data={"prompt_builder": {"template_variables": {"location": "Berlin"}, "prompt_source": messages}}
)
print(response["llm"]["replies"][0])
print(response["tracer"]["trace_url"])
与 Agent 一起使用
import os
os.environ["LANGFUSE_HOST"] = "https://cloud.langfuse.com"
os.environ["HAYSTACK_CONTENT_TRACING_ENABLED"] = "true"
from typing import Annotated
from haystack.components.agents import Agent
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from haystack.tools import tool
from haystack import Pipeline
from haystack_integrations.components.connectors.langfuse import LangfuseConnector
@tool
def get_weather(city: Annotated[str, "The city to get weather for"]) -> str:
"""Get current weather information for a city."""
weather_data = {
"Berlin": "18°C, partly cloudy",
"New York": "22°C, sunny",
"Tokyo": "25°C, clear skies"
}
return weather_data.get(city, f"Weather information for {city} not available")
@tool
def calculate(operation: Annotated[str, "Mathematical operation: add, subtract, multiply, divide"],
a: Annotated[float, "First number"],
b: Annotated[float, "Second number"]) -> str:
"""Perform basic mathematical calculations."""
if operation == "add":
result = a + b
elif operation == "subtract":
result = a - b
elif operation == "multiply":
result = a * b
elif operation == "divide":
if b == 0:
return "Error: Division by zero"
result = a / b
else:
return f"Error: Unknown operation '{operation}'"
return f"The result of {a} {operation} {b} is {result}"
if __name__ == "__main__":
# Create components
chat_generator = OpenAIChatGenerator()
agent = Agent(
chat_generator=chat_generator,
tools=[get_weather, calculate],
system_prompt="You are a helpful assistant with access to weather and calculator tools. Use them when needed.",
exit_conditions=["text"]
)
langfuse_connector = LangfuseConnector("Agent Example")
# Create and run pipeline
pipe = Pipeline()
pipe.add_component("tracer", langfuse_connector)
pipe.add_component("agent", agent)
response = pipe.run(
data={
"agent": {"messages": [ChatMessage.from_user("What's the weather in Berlin and calculate 15 + 27?")]},
"tracer": {"invocation_context": {"test": "agent_with_tools"}}
}
)
print(response["agent"]["last_message"].text)
print(response["tracer"]["trace_url"])
高级用法
使用 SpanHandler 自定义 Langfuse Traces
该SpanHandler 接口在 Haystack 中允许您自定义如何为 Langfuse trace 创建和处理 span。这使您可以记录自定义指标、添加标签或集成元数据。
通过扩展SpanHandler 或其默认实现DefaultSpanHandler,您可以定义自定义的 span 处理逻辑,从而精确控制记录到 Langfuse 中的数据,以跟踪和分析管道执行。
这是一个示例
from haystack_integrations.tracing.langfuse import LangfuseConnector, DefaultSpanHandler, LangfuseSpan
from typing import Optional
class CustomSpanHandler(DefaultSpanHandler):
def handle(self, span: LangfuseSpan, component_type: Optional[str]) -> None:
# Custom logic to add metadata or modify span
if component_type == "OpenAIChatGenerator":
output = span._data.get("haystack.component.output", {})
if len(output.get("text", "")) < 10:
span._span.update(level="WARNING", status_message="Response too short")
# Add the custom handler to the LangfuseConnector
connector = LangfuseConnector(span_handler=CustomSpanHandler())
更新于 3 个月前
