文档API 参考📓 教程🧑‍🍳 食谱🤝 集成💜 Discord🎨 Studio
文档

LangfuseConnector

了解如何在 Haystack 中使用 Langfuse。

pipeline 中的最常见位置任意位置,因为它未与其他组件连接
必需的初始化变量"name": 用于标识跟踪运行的管道或组件的名称
输出变量“name”: 跟踪组件的名称

”trace_url”: 指向跟踪数据的链接
API 参考langfuse
GitHub 链接https://github.com/deepset-ai/haystack-core-integrations/tree/main/integrations/langfuse

概述

LangfuseConnector 使用 Langfuse 将跟踪功能集成到 Haystack 管道中。它会捕获管道运行的详细信息,例如 API 调用、上下文数据、提示等。使用此组件可以

  • 监控模型性能,例如 token 使用量和成本。
  • 通过识别低质量输出和收集用户反馈来找到改进管道的领域。
  • 从管道执行中创建用于微调和测试的数据集。

要使用此集成,请将LangfuseConnector 添加到管道中,运行管道,然后在 Langfuse 网站上查看跟踪数据。不要将此组件连接到任何其他组件——LangfuseConnector 将在管道的后台运行。

在使用此组件时,您可以选择定义另外两个参数

  • httpx_client:一个可选的自定义httpx.Client 实例,用于 Langfuse API 调用。请注意,自定义客户端在从 YAML 反序列化管道时会被丢弃,因为 HTTPX 客户端无法序列化。在这种情况下,Langfuse 会创建一个默认客户端。
  • span_handler:一个可选的自定义处理器,用于处理 span。如果未提供,则使用DefaultSpanHandler。Span 处理器定义了 span 的创建和处理方式,从而可以根据组件类型和 span 的后处理来自定义 span 类型。有关更多详细信息,请参阅下面的高级用法部分。

先决条件

在使用 LangfuseConnector 之前,您需要以下内容

  1. 确保您有一个有效的 Langfuse 账户
  2. 设置HAYSTACK_CONTENT_TRACING_ENABLED 环境变量为true——这将启用管道的跟踪。
  3. 设置LANGFUSE_SECRET_KEYLANGFUSE_PUBLIC_KEY 环境变量,并填入您在账户个人资料中找到的 Langfuse 密钥和公钥。

安装

首先,安装 langfuse-haystack 包,以使用 LangfuseConnector:

pip install langfuse-haystack

📘

使用注意事项

为确保正确跟踪,请始终在导入任何 Haystack 组件之前设置环境变量。这一点至关重要,因为 Haystack 在导入期间会初始化其内部跟踪组件。在下面的示例中,我们首先设置环境变量,然后导入相关的 Haystack 组件。

或者,一个更好的做法是在运行脚本之前在 shell 中设置这些环境变量。这种方法可以将配置与代码分开,并允许更轻松地管理不同的环境。

用法

在下面的示例中,我们将LangfuseConnector 作为tracer添加到管道中。每次管道运行都会生成一个 trace,其中包含整个执行上下文,包括提示、补全和元数据。

然后,您可以通过遵循输出中打印的 URL 链接来查看 trace。

import os

os.environ["LANGFUSE_HOST"] = "https://cloud.langfuse.com"
os.environ["TOKENIZERS_PARALLELISM"] = "false"
os.environ["HAYSTACK_CONTENT_TRACING_ENABLED"] = "true"

from haystack.components.builders import DynamicChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from haystack import Pipeline

from haystack_integrations.components.connectors.langfuse import LangfuseConnector

if __name__ == "__main__":
    pipe = Pipeline()
    pipe.add_component("tracer", LangfuseConnector("Chat example"))
    pipe.add_component("prompt_builder", DynamicChatPromptBuilder())
    pipe.add_component("llm", OpenAIChatGenerator(model="gpt-3.5-turbo"))

    pipe.connect("prompt_builder.prompt", "llm.messages")

    messages = [
        ChatMessage.from_system("Always respond in German even if some input data is in other languages."),
        ChatMessage.from_user("Tell me about {{location}}"),
    ]

    response = pipe.run(
        data={"prompt_builder": {"template_variables": {"location": "Berlin"}, "prompt_source": messages}}
    )
    print(response["llm"]["replies"][0])
    print(response["tracer"]["trace_url"])

与 Agent 一起使用

import os

os.environ["LANGFUSE_HOST"] = "https://cloud.langfuse.com"
os.environ["HAYSTACK_CONTENT_TRACING_ENABLED"] = "true"

from typing import Annotated

from haystack.components.agents import Agent
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from haystack.tools import tool
from haystack import Pipeline

from haystack_integrations.components.connectors.langfuse import LangfuseConnector


@tool
def get_weather(city: Annotated[str, "The city to get weather for"]) -> str:
"""Get current weather information for a city."""
weather_data = {
  "Berlin": "18°C, partly cloudy",
  "New York": "22°C, sunny",
  "Tokyo": "25°C, clear skies"
}
return weather_data.get(city, f"Weather information for {city} not available")

@tool
def calculate(operation: Annotated[str, "Mathematical operation: add, subtract, multiply, divide"], 
          a: Annotated[float, "First number"], 
          b: Annotated[float, "Second number"]) -> str:
"""Perform basic mathematical calculations."""
if operation == "add":
  result = a + b
  elif operation == "subtract":
  result = a - b
  elif operation == "multiply":
  result = a * b
  elif operation == "divide":
  if b == 0:
      return "Error: Division by zero"
      result = a / b
  else:
  return f"Error: Unknown operation '{operation}'"

return f"The result of {a} {operation} {b} is {result}"


if __name__ == "__main__":
# Create components
chat_generator = OpenAIChatGenerator()

agent = Agent(
  chat_generator=chat_generator,
  tools=[get_weather, calculate],
  system_prompt="You are a helpful assistant with access to weather and calculator tools. Use them when needed.",
  exit_conditions=["text"]
)

langfuse_connector = LangfuseConnector("Agent Example")

# Create and run pipeline
pipe = Pipeline()
pipe.add_component("tracer", langfuse_connector)
pipe.add_component("agent", agent)

response = pipe.run(
  data={
      "agent": {"messages": [ChatMessage.from_user("What's the weather in Berlin and calculate 15 + 27?")]},
      "tracer": {"invocation_context": {"test": "agent_with_tools"}}
    }
)

print(response["agent"]["last_message"].text)
print(response["tracer"]["trace_url"])

高级用法

使用 SpanHandler 自定义 Langfuse Traces

SpanHandler 接口在 Haystack 中允许您自定义如何为 Langfuse trace 创建和处理 span。这使您可以记录自定义指标、添加标签或集成元数据。

通过扩展SpanHandler 或其默认实现DefaultSpanHandler,您可以定义自定义的 span 处理逻辑,从而精确控制记录到 Langfuse 中的数据,以跟踪和分析管道执行。

这是一个示例

from haystack_integrations.tracing.langfuse import LangfuseConnector, DefaultSpanHandler, LangfuseSpan
from typing import Optional

class CustomSpanHandler(DefaultSpanHandler):
    def handle(self, span: LangfuseSpan, component_type: Optional[str]) -> None:
        # Custom logic to add metadata or modify span
        if component_type == "OpenAIChatGenerator":
            output = span._data.get("haystack.component.output", {})
            if len(output.get("text", "")) < 10:
                span._span.update(level="WARNING", status_message="Response too short")

# Add the custom handler to the LangfuseConnector
connector = LangfuseConnector(span_handler=CustomSpanHandler())

相关链接

查看 GitHub 仓库中的 API 参考