文档API 参考📓 教程🧑‍🍳 食谱🤝 集成💜 Discord🎨 Studio
API 参考

langfuse

Haystack 的 Langfuse 集成

模块 haystack_integrations.components.connectors.langfuse.langfuse_connector

LangfuseConnector

LangfuseConnector 将 Haystack LLM 框架与 Langfuse 连接起来,以便能够跟踪管道中各种组件的操作和数据流。

要使用 LangfuseConnector,请将其添加到您的管道中,而无需将其连接到任何其他组件。在启用跟踪的情况下,它将自动跟踪所有管道操作。

环境配置

  • LANGFUSE_SECRET_KEYLANGFUSE_PUBLIC_KEY:必需的 Langfuse API 凭证。
  • HAYSTACK_CONTENT_TRACING_ENABLED:必须设置为"true" 以启用跟踪。
  • HAYSTACK_LANGFUSE_ENFORCE_FLUSH:(可选)如果设置为"false",则禁用每次组件后刷新。请注意:这可能导致崩溃时数据丢失,除非您在关闭前手动刷新。默认情况下,数据在每个组件后都会被刷新,并阻塞线程直到数据发送到 Langfuse。

如果您禁用每次组件后刷新,请确保在程序退出前显式调用 langfuse.flush()。例如:

from haystack.tracing import tracer

try:
    # your code here
finally:
    tracer.actual_tracer.flush()

或在 FastAPI 中通过定义关机事件处理程序

from haystack.tracing import tracer

# ...

@app.on_event("shutdown")
async def shutdown_event():
    tracer.actual_tracer.flush()

以下是如何在管道中使用 LangfuseConnector 的示例:

import os

os.environ["HAYSTACK_CONTENT_TRACING_ENABLED"] = "true"

from haystack import Pipeline
from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from haystack_integrations.components.connectors.langfuse import (
    LangfuseConnector,
)

pipe = Pipeline()
pipe.add_component("tracer", LangfuseConnector("Chat example"))
pipe.add_component("prompt_builder", ChatPromptBuilder())
pipe.add_component("llm", OpenAIChatGenerator(model="gpt-4o-mini"))

pipe.connect("prompt_builder.prompt", "llm.messages")

messages = [
    ChatMessage.from_system(
        "Always respond in German even if some input data is in other languages."
    ),
    ChatMessage.from_user("Tell me about {{location}}"),
]

response = pipe.run(
    data={
        "prompt_builder": {
            "template_variables": {"location": "Berlin"},
            "template": messages,
        }
    }
)
print(response["llm"]["replies"][0])
print(response["tracer"]["trace_url"])
print(response["tracer"]["trace_id"])

对于更高级的用例,您还可以通过提供自定义 SpanHandler 来定制 span 的创建和处理方式。这允许您添加自定义指标、设置警告级别或将其他元数据附加到您的 Langfuse 跟踪中。

from haystack_integrations.tracing.langfuse import DefaultSpanHandler, LangfuseSpan
from typing import Optional

class CustomSpanHandler(DefaultSpanHandler):

    def handle(self, span: LangfuseSpan, component_type: Optional[str]) -> None:
        # Custom span handling logic, customize Langfuse spans however it fits you
        # see DefaultSpanHandler for how we create and process spans by default
        pass

connector = LangfuseConnector(span_handler=CustomSpanHandler())

LangfuseConnector.__init__

def __init__(name: str,
             public: bool = False,
             public_key: Optional[Secret] = Secret.from_env_var(
                 "LANGFUSE_PUBLIC_KEY"),
             secret_key: Optional[Secret] = Secret.from_env_var(
                 "LANGFUSE_SECRET_KEY"),
             httpx_client: Optional[httpx.Client] = None,
             span_handler: Optional[SpanHandler] = None,
             *,
             host: Optional[str] = None,
             langfuse_client_kwargs: Optional[Dict[str, Any]] = None) -> None

初始化 LangfuseConnector 组件。

参数:

  • name:跟踪的名称。此名称将用于在 Langfuse 仪表板中标识跟踪运行。
  • public:跟踪数据应该是公开的还是私有的。如果设置为True,则拥有跟踪 URL 的任何人都可以公开访问跟踪数据。如果设置为False,则跟踪数据将是私有的,并且只有 Langfuse 账户所有者才能访问。默认为False.
  • public_key:Langfuse 公共密钥。默认为从 LANGFUSE_PUBLIC_KEY 环境变量读取。
  • secret_key:Langfuse 密钥。默认为从 LANGFUSE_SECRET_KEY 环境变量读取。
  • httpx_client:用于 Langfuse API 调用的可选自定义 httpx.Client 实例。请注意,当从 YAML 反序列化管道时,任何自定义客户端都会被丢弃,Langfuse 将创建自己的默认客户端,因为 HTTPX 客户端无法序列化。
  • span_handler:用于处理 span 的可选自定义处理程序。如果为 None,则使用 DefaultSpanHandler。span 处理程序控制 span 的创建和处理方式,允许基于组件类型定制 span 类型,并在 span 产生后进行附加处理。有关实现自定义处理程序的详细信息,请参阅 SpanHandler 类。host:Langfuse API 的主机。也可以通过LANGFUSE_HOST 环境变量设置。默认设置为https://cloud.langfuse.com.
  • langfuse_client_kwargs:Langfuse 客户端的可选自定义配置。这是一个包含 Langfuse 客户端任何附加配置选项的字典。有关可用配置选项的更多详细信息,请参阅 Langfuse 文档。

LangfuseConnector.run

@component.output_types(name=str, trace_url=str, trace_id=str)
def run(invocation_context: Optional[Dict[str, Any]] = None) -> Dict[str, str]

运行 LangfuseConnector 组件。

参数:

  • invocation_context:一个包含调用附加上下文的字典。当用户希望使用附加信息标记此特定调用时,此参数非常有用,例如来自他们自己的执行框架的 run ID、用户 ID 等。这些键值对随后可在 Langfuse 跟踪中可见。

返回值:

包含以下键的字典

  • name:跟踪组件的名称。
  • trace_url:跟踪数据的 URL。
  • trace_id:跟踪的 ID。

LangfuseConnector.to_dict

def to_dict() -> Dict[str, Any]

将此组件序列化为字典。

返回值:

序列化后的组件(字典格式)。

LangfuseConnector.from_dict

@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "LangfuseConnector"

从字典反序列化此组件。

参数:

  • data:此组件的字典表示。

返回值:

反序列化的组件实例。

模块 haystack_integrations.tracing.langfuse.tracer

LangfuseSpan

代表 Haystack span 跟踪 API 与 Langfuse 之间桥梁的内部类。

LangfuseSpan.__init__

def __init__(context_manager: AbstractContextManager) -> None

初始化 LangfuseSpan 实例。

参数:

  • context_manager:Langfuse 的上下文管理器,使用langfuse.get_client().start_as_current_spanlangfuse.get_client().start_as_current_observation.

LangfuseSpan.set_tag

def set_tag(key: str, value: Any) -> None

为此 span 设置通用标签。

参数:

  • key:标签键。
  • value:标签值。

LangfuseSpan.set_content_tag

def set_content_tag(key: str, value: Any) -> None

为此 span 设置内容特定的标签。

参数:

  • key:内容标签键。
  • value:内容标签值。

LangfuseSpan.raw_span

def raw_span() -> LangfuseClientSpan

返回底层 span 实例。

返回值:

Langfuse span 实例。

LangfuseSpan.get_data

def get_data() -> Dict[str, Any]

返回与 span 相关联的数据。

返回值:

与 span 相关联的数据。

SpanContext

用于在 Langfuse 中创建 span 的上下文。

封装了在 Langfuse 跟踪中创建和配置 span 所需的信息。SpanHandler 使用它来确定 span 的类型(trace、generation 或 default)及其配置。

参数:

  • name:要创建的 span 的名称。对于组件,这通常是组件名称。
  • operation_name:要跟踪的操作(例如,“haystack.pipeline.run”)。用于确定是否应创建新跟踪而不发出警告。
  • component_type:创建 span 的组件类型(例如,“OpenAIChatGenerator”)。可用于确定要创建的 span 类型。
  • tags:要附加到 span 的附加元数据。包含组件输入/输出数据和其他跟踪信息。
  • parent_span:如果这是子 span,则为父 span。如果为 None,将创建一个新的跟踪。
  • trace_name:创建父 span 时要用于跟踪的名称。默认为“Haystack”。
  • public:跟踪是否应公开访问。默认为 False。

SpanContext.__post_init__

def __post_init__() -> None

验证 span 上下文属性。

引发:

  • ValueError:如果 name、operation_name 或 trace_name 为空
  • TypeError:如果 tags 不是字典

SpanHandler

用于自定义 Langfuse span 创建和处理方式的抽象基类。

此类定义了两个关键的扩展点:

  1. create_span:控制创建哪种类型的 span(默认或 generation)。
  2. handle:在组件执行后处理 span(添加元数据、指标等)。

要实现自定义处理程序:

  • 扩展此类或 DefaultSpanHandler。
  • 重写 create_span 和 handle 方法。重写 handle 更为常见。
  • 将您的处理程序传递给 LangfuseConnector init 方法。

SpanHandler.init_tracer

def init_tracer(tracer: langfuse.Langfuse) -> None

使用 Langfuse tracer 进行初始化。由 LangfuseTracer 内部调用。

参数:

  • tracer:用于创建 span 的 Langfuse 客户端实例。

SpanHandler.create_span

@abstractmethod
def create_span(context: SpanContext) -> LangfuseSpan

根据上下文创建适当类型的 span。

此方法决定要创建哪种类型的 span:

  • 如果没有父 span,则创建一个新的跟踪。
  • 为 LLM 组件创建一个 generation span。
  • 为其他组件创建一个默认 span。

参数:

  • context:包含创建 span 所需所有信息的上下文。

返回值:

根据上下文配置的新 LangfuseSpan 实例。

SpanHandler.handle

@abstractmethod
def handle(span: LangfuseSpan, component_type: Optional[str]) -> None

在组件执行后处理 span,通过附加元数据和指标。

在组件或管道产生其 span 后调用此方法,允许您:

  • 提取并附加 token 使用统计信息。
  • 添加模型信息。
  • 记录时间数据(例如,首个 token 时间)。
  • 设置日志级别以进行质量监控。
  • 添加自定义指标和观测。

参数:

  • span:由组件产生的 span。
  • component_type:创建 span 的组件类型,用于确定要提取什么元数据以及如何处理它。

DefaultSpanHandler

DefaultSpanHandler 为 Haystack 提供默认的 Langfuse 跟踪行为。

LangfuseTracer

代表 Haystack tracer 与 Langfuse 之间桥梁的内部类。

LangfuseTracer.__init__

def __init__(tracer: langfuse.Langfuse,
             name: str = "Haystack",
             public: bool = False,
             span_handler: Optional[SpanHandler] = None) -> None

初始化 LangfuseTracer 实例。

参数:

  • tracer:Langfuse tracer 实例。
  • name:管道或组件的名称。此名称将用于在 Langfuse 仪表板中标识跟踪运行。
  • public:跟踪数据应该是公开的还是私有的。如果设置为True,则拥有跟踪 URL 的任何人都可以公开访问跟踪数据。如果设置为False,跟踪数据将是私有的,并且只有 Langfuse 账户所有者才能访问。
  • span_handler:用于处理 span 的自定义处理程序。如果为 None,则使用 DefaultSpanHandler。

LangfuseTracer.current_span

def current_span() -> Optional[Span]

返回当前活动的 span。

返回值:

如果可用,则返回当前 span,否则返回 None。

LangfuseTracer.get_trace_url

def get_trace_url() -> str

返回跟踪数据的 URL。

返回值:

跟踪数据的 URL。

LangfuseTracer.get_trace_id

def get_trace_id() -> str

返回跟踪 ID。

返回值:

跟踪 ID。