Haystack 的 Langfuse 集成
模块 haystack_integrations.components.connectors.langfuse.langfuse_connector
LangfuseConnector
LangfuseConnector 将 Haystack LLM 框架与 Langfuse 连接起来,以便能够跟踪管道中各种组件的操作和数据流。
要使用 LangfuseConnector,请将其添加到您的管道中,而无需将其连接到任何其他组件。在启用跟踪的情况下,它将自动跟踪所有管道操作。
环境配置
LANGFUSE_SECRET_KEY和LANGFUSE_PUBLIC_KEY:必需的 Langfuse API 凭证。HAYSTACK_CONTENT_TRACING_ENABLED:必须设置为"true"以启用跟踪。HAYSTACK_LANGFUSE_ENFORCE_FLUSH:(可选)如果设置为"false",则禁用每次组件后刷新。请注意:这可能导致崩溃时数据丢失,除非您在关闭前手动刷新。默认情况下,数据在每个组件后都会被刷新,并阻塞线程直到数据发送到 Langfuse。
如果您禁用每次组件后刷新,请确保在程序退出前显式调用 langfuse.flush()。例如:
from haystack.tracing import tracer
try:
# your code here
finally:
tracer.actual_tracer.flush()
或在 FastAPI 中通过定义关机事件处理程序
from haystack.tracing import tracer
# ...
@app.on_event("shutdown")
async def shutdown_event():
tracer.actual_tracer.flush()
以下是如何在管道中使用 LangfuseConnector 的示例:
import os
os.environ["HAYSTACK_CONTENT_TRACING_ENABLED"] = "true"
from haystack import Pipeline
from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from haystack_integrations.components.connectors.langfuse import (
LangfuseConnector,
)
pipe = Pipeline()
pipe.add_component("tracer", LangfuseConnector("Chat example"))
pipe.add_component("prompt_builder", ChatPromptBuilder())
pipe.add_component("llm", OpenAIChatGenerator(model="gpt-4o-mini"))
pipe.connect("prompt_builder.prompt", "llm.messages")
messages = [
ChatMessage.from_system(
"Always respond in German even if some input data is in other languages."
),
ChatMessage.from_user("Tell me about {{location}}"),
]
response = pipe.run(
data={
"prompt_builder": {
"template_variables": {"location": "Berlin"},
"template": messages,
}
}
)
print(response["llm"]["replies"][0])
print(response["tracer"]["trace_url"])
print(response["tracer"]["trace_id"])
对于更高级的用例,您还可以通过提供自定义 SpanHandler 来定制 span 的创建和处理方式。这允许您添加自定义指标、设置警告级别或将其他元数据附加到您的 Langfuse 跟踪中。
from haystack_integrations.tracing.langfuse import DefaultSpanHandler, LangfuseSpan
from typing import Optional
class CustomSpanHandler(DefaultSpanHandler):
def handle(self, span: LangfuseSpan, component_type: Optional[str]) -> None:
# Custom span handling logic, customize Langfuse spans however it fits you
# see DefaultSpanHandler for how we create and process spans by default
pass
connector = LangfuseConnector(span_handler=CustomSpanHandler())
LangfuseConnector.__init__
def __init__(name: str,
public: bool = False,
public_key: Optional[Secret] = Secret.from_env_var(
"LANGFUSE_PUBLIC_KEY"),
secret_key: Optional[Secret] = Secret.from_env_var(
"LANGFUSE_SECRET_KEY"),
httpx_client: Optional[httpx.Client] = None,
span_handler: Optional[SpanHandler] = None,
*,
host: Optional[str] = None,
langfuse_client_kwargs: Optional[Dict[str, Any]] = None) -> None
初始化 LangfuseConnector 组件。
参数:
name:跟踪的名称。此名称将用于在 Langfuse 仪表板中标识跟踪运行。public:跟踪数据应该是公开的还是私有的。如果设置为True,则拥有跟踪 URL 的任何人都可以公开访问跟踪数据。如果设置为False,则跟踪数据将是私有的,并且只有 Langfuse 账户所有者才能访问。默认为False.public_key:Langfuse 公共密钥。默认为从 LANGFUSE_PUBLIC_KEY 环境变量读取。secret_key:Langfuse 密钥。默认为从 LANGFUSE_SECRET_KEY 环境变量读取。httpx_client:用于 Langfuse API 调用的可选自定义 httpx.Client 实例。请注意,当从 YAML 反序列化管道时,任何自定义客户端都会被丢弃,Langfuse 将创建自己的默认客户端,因为 HTTPX 客户端无法序列化。span_handler:用于处理 span 的可选自定义处理程序。如果为 None,则使用 DefaultSpanHandler。span 处理程序控制 span 的创建和处理方式,允许基于组件类型定制 span 类型,并在 span 产生后进行附加处理。有关实现自定义处理程序的详细信息,请参阅 SpanHandler 类。host:Langfuse API 的主机。也可以通过LANGFUSE_HOST环境变量设置。默认设置为https://cloud.langfuse.com.langfuse_client_kwargs:Langfuse 客户端的可选自定义配置。这是一个包含 Langfuse 客户端任何附加配置选项的字典。有关可用配置选项的更多详细信息,请参阅 Langfuse 文档。
LangfuseConnector.run
@component.output_types(name=str, trace_url=str, trace_id=str)
def run(invocation_context: Optional[Dict[str, Any]] = None) -> Dict[str, str]
运行 LangfuseConnector 组件。
参数:
invocation_context:一个包含调用附加上下文的字典。当用户希望使用附加信息标记此特定调用时,此参数非常有用,例如来自他们自己的执行框架的 run ID、用户 ID 等。这些键值对随后可在 Langfuse 跟踪中可见。
返回值:
包含以下键的字典
name:跟踪组件的名称。trace_url:跟踪数据的 URL。trace_id:跟踪的 ID。
LangfuseConnector.to_dict
def to_dict() -> Dict[str, Any]
将此组件序列化为字典。
返回值:
序列化后的组件(字典格式)。
LangfuseConnector.from_dict
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "LangfuseConnector"
从字典反序列化此组件。
参数:
data:此组件的字典表示。
返回值:
反序列化的组件实例。
模块 haystack_integrations.tracing.langfuse.tracer
LangfuseSpan
代表 Haystack span 跟踪 API 与 Langfuse 之间桥梁的内部类。
LangfuseSpan.__init__
def __init__(context_manager: AbstractContextManager) -> None
初始化 LangfuseSpan 实例。
参数:
context_manager:Langfuse 的上下文管理器,使用langfuse.get_client().start_as_current_span或langfuse.get_client().start_as_current_observation.
LangfuseSpan.set_tag
def set_tag(key: str, value: Any) -> None
为此 span 设置通用标签。
参数:
key:标签键。value:标签值。
LangfuseSpan.set_content_tag
def set_content_tag(key: str, value: Any) -> None
为此 span 设置内容特定的标签。
参数:
key:内容标签键。value:内容标签值。
LangfuseSpan.raw_span
def raw_span() -> LangfuseClientSpan
返回底层 span 实例。
返回值:
Langfuse span 实例。
LangfuseSpan.get_data
def get_data() -> Dict[str, Any]
返回与 span 相关联的数据。
返回值:
与 span 相关联的数据。
SpanContext
用于在 Langfuse 中创建 span 的上下文。
封装了在 Langfuse 跟踪中创建和配置 span 所需的信息。SpanHandler 使用它来确定 span 的类型(trace、generation 或 default)及其配置。
参数:
name:要创建的 span 的名称。对于组件,这通常是组件名称。operation_name:要跟踪的操作(例如,“haystack.pipeline.run”)。用于确定是否应创建新跟踪而不发出警告。component_type:创建 span 的组件类型(例如,“OpenAIChatGenerator”)。可用于确定要创建的 span 类型。tags:要附加到 span 的附加元数据。包含组件输入/输出数据和其他跟踪信息。parent_span:如果这是子 span,则为父 span。如果为 None,将创建一个新的跟踪。trace_name:创建父 span 时要用于跟踪的名称。默认为“Haystack”。public:跟踪是否应公开访问。默认为 False。
SpanContext.__post_init__
def __post_init__() -> None
验证 span 上下文属性。
引发:
ValueError:如果 name、operation_name 或 trace_name 为空TypeError:如果 tags 不是字典
SpanHandler
用于自定义 Langfuse span 创建和处理方式的抽象基类。
此类定义了两个关键的扩展点:
- create_span:控制创建哪种类型的 span(默认或 generation)。
- handle:在组件执行后处理 span(添加元数据、指标等)。
要实现自定义处理程序:
- 扩展此类或 DefaultSpanHandler。
- 重写 create_span 和 handle 方法。重写 handle 更为常见。
- 将您的处理程序传递给 LangfuseConnector init 方法。
SpanHandler.init_tracer
def init_tracer(tracer: langfuse.Langfuse) -> None
使用 Langfuse tracer 进行初始化。由 LangfuseTracer 内部调用。
参数:
tracer:用于创建 span 的 Langfuse 客户端实例。
SpanHandler.create_span
@abstractmethod
def create_span(context: SpanContext) -> LangfuseSpan
根据上下文创建适当类型的 span。
此方法决定要创建哪种类型的 span:
- 如果没有父 span,则创建一个新的跟踪。
- 为 LLM 组件创建一个 generation span。
- 为其他组件创建一个默认 span。
参数:
context:包含创建 span 所需所有信息的上下文。
返回值:
根据上下文配置的新 LangfuseSpan 实例。
SpanHandler.handle
@abstractmethod
def handle(span: LangfuseSpan, component_type: Optional[str]) -> None
在组件执行后处理 span,通过附加元数据和指标。
在组件或管道产生其 span 后调用此方法,允许您:
- 提取并附加 token 使用统计信息。
- 添加模型信息。
- 记录时间数据(例如,首个 token 时间)。
- 设置日志级别以进行质量监控。
- 添加自定义指标和观测。
参数:
span:由组件产生的 span。component_type:创建 span 的组件类型,用于确定要提取什么元数据以及如何处理它。
DefaultSpanHandler
DefaultSpanHandler 为 Haystack 提供默认的 Langfuse 跟踪行为。
LangfuseTracer
代表 Haystack tracer 与 Langfuse 之间桥梁的内部类。
LangfuseTracer.__init__
def __init__(tracer: langfuse.Langfuse,
name: str = "Haystack",
public: bool = False,
span_handler: Optional[SpanHandler] = None) -> None
初始化 LangfuseTracer 实例。
参数:
tracer:Langfuse tracer 实例。name:管道或组件的名称。此名称将用于在 Langfuse 仪表板中标识跟踪运行。public:跟踪数据应该是公开的还是私有的。如果设置为True,则拥有跟踪 URL 的任何人都可以公开访问跟踪数据。如果设置为False,跟踪数据将是私有的,并且只有 Langfuse 账户所有者才能访问。span_handler:用于处理 span 的自定义处理程序。如果为 None,则使用 DefaultSpanHandler。
LangfuseTracer.current_span
def current_span() -> Optional[Span]
返回当前活动的 span。
返回值:
如果可用,则返回当前 span,否则返回 None。
LangfuseTracer.get_trace_url
def get_trace_url() -> str
返回跟踪数据的 URL。
返回值:
跟踪数据的 URL。
LangfuseTracer.get_trace_id
def get_trace_id() -> str
返回跟踪 ID。
返回值:
跟踪 ID。
