From 89f96b34ef69da566e6cd03b575f7b45f0f1c142 Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Thu, 25 Jun 2026 14:03:47 +0200 Subject: [PATCH] fix(tracing): apply environments to processor spans --- langfuse/_client/resource_manager.py | 1 + langfuse/_client/span_processor.py | 34 +++++++++- tests/unit/test_span_processor.py | 94 +++++++++++++++++++++++++++- 3 files changed, 126 insertions(+), 3 deletions(-) diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index 14e746b86..9be1ecdd7 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -294,6 +294,7 @@ def _initialize_instance( span_exporter=span_exporter, media_manager=self._media_manager, mask_otel_spans=mask_otel_spans, + environment=environment, ) tracer_provider.add_span_processor(langfuse_processor) diff --git a/langfuse/_client/span_processor.py b/langfuse/_client/span_processor.py index bfebf7f87..3a37ef33b 100644 --- a/langfuse/_client/span_processor.py +++ b/langfuse/_client/span_processor.py @@ -14,7 +14,7 @@ import base64 import os import threading -from typing import Callable, Dict, List, Optional, cast +from typing import Any, Callable, Dict, List, Optional, cast from opentelemetry import context as context_api from opentelemetry.context import Context @@ -35,7 +35,7 @@ ) from langfuse._client.span_exporter import LangfuseTransformingSpanExporter from langfuse._client.span_filter import is_default_export_span, is_langfuse_span -from langfuse._client.utils import span_formatter +from langfuse._client.utils import get_string_span_attribute, span_formatter from langfuse._task_manager.media_manager import MediaManager from langfuse._version import __version__ as langfuse_version from langfuse.logger import langfuse_logger @@ -74,8 +74,10 @@ def __init__( span_exporter: Optional[SpanExporter] = None, media_manager: Optional[MediaManager] = None, mask_otel_spans: Optional[MaskOtelSpansFunction] = None, + environment: Optional[str] = None, ): self.public_key = public_key + self._environment = environment self.blocked_instrumentation_scopes = ( blocked_instrumentation_scopes if blocked_instrumentation_scopes is not None @@ -143,6 +145,7 @@ def __init__( def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None: context = parent_context or context_api.get_current() propagated_attributes = _get_propagated_attributes_from_context(context) + self._apply_default_environment(span=span, attributes=propagated_attributes) if propagated_attributes: span.set_attributes(propagated_attributes) @@ -162,6 +165,33 @@ def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None return super().on_start(span, parent_context) + def _apply_default_environment( + self, *, span: Span, attributes: Dict[str, Any] + ) -> None: + """Apply the processor environment to spans without an explicit environment. + + Langfuse-created wrapper spans set ``langfuse.environment`` themselves, and + ``propagate_attributes(environment=...)`` adds it to the active context for + request-scoped overrides. Third-party OpenTelemetry spans only pass through + this processor, so they need the client-level environment applied here when + neither of those more specific sources is present. + """ + + if LangfuseOtelSpanAttributes.ENVIRONMENT in attributes: + return + + environment = getattr(self, "_environment", None) + if environment is None: + return + + if ( + get_string_span_attribute(span, LangfuseOtelSpanAttributes.ENVIRONMENT) + is not None + ): + return + + attributes[LangfuseOtelSpanAttributes.ENVIRONMENT] = environment + def on_end(self, span: ReadableSpan) -> None: try: # Only export spans that belong to the scoped project diff --git a/tests/unit/test_span_processor.py b/tests/unit/test_span_processor.py index 5d12813a0..240e7db30 100644 --- a/tests/unit/test_span_processor.py +++ b/tests/unit/test_span_processor.py @@ -1,8 +1,11 @@ from typing import Sequence -from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import ReadableSpan, TracerProvider from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult +from langfuse import propagate_attributes +from langfuse._client.attributes import LangfuseOtelSpanAttributes from langfuse._client.environment_variables import ( LANGFUSE_FLUSH_AT, LANGFUSE_FLUSH_INTERVAL, @@ -18,6 +21,68 @@ def shutdown(self) -> None: pass +class InMemorySpanExporter(SpanExporter): + def __init__(self) -> None: + self.spans: list[ReadableSpan] = [] + + def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: + self.spans.extend(spans) + + return SpanExportResult.SUCCESS + + def shutdown(self) -> None: + pass + + +def _export_third_party_span( + *, + processor_environment: str, + span_attributes: dict[str, str] | None = None, + resource_attributes: dict[str, str] | None = None, + propagated_environment: str | None = None, +) -> ReadableSpan: + exporter = InMemorySpanExporter() + provider = TracerProvider( + resource=Resource.create( + {"service.name": "test", **(resource_attributes or {})} + ) + ) + processor = LangfuseSpanProcessor( + public_key="pk-test", + secret_key="sk-test", + base_url="http://localhost:3000", + flush_at=10, + flush_interval=1, + span_exporter=exporter, + environment=processor_environment, + should_export_span=lambda span: True, + ) + provider.add_span_processor(processor) + + try: + tracer = provider.get_tracer("third-party.instrumentation", "1.0.0") + + if propagated_environment is None: + with tracer.start_as_current_span( + "third-party-span", attributes=span_attributes + ): + pass + else: + with propagate_attributes(environment=propagated_environment): + with tracer.start_as_current_span( + "third-party-span", attributes=span_attributes + ): + pass + + provider.force_flush() + + assert len(exporter.spans) == 1 + + return exporter.spans[0] + finally: + provider.shutdown() + + def test_span_processor_uses_constructor_flush_settings_without_env(monkeypatch): monkeypatch.delenv(LANGFUSE_FLUSH_AT, raising=False) monkeypatch.delenv(LANGFUSE_FLUSH_INTERVAL, raising=False) @@ -37,6 +102,33 @@ def test_span_processor_uses_constructor_flush_settings_without_env(monkeypatch) processor.shutdown() +def test_span_processor_applies_environment_to_third_party_spans(): + span = _export_third_party_span(processor_environment="proxy-prod") + + assert span.attributes is not None + assert span.attributes[LangfuseOtelSpanAttributes.ENVIRONMENT] == "proxy-prod" + + +def test_span_processor_prefers_propagated_environment_for_third_party_spans(): + span = _export_third_party_span( + processor_environment="proxy-prod", + propagated_environment="staging", + ) + + assert span.attributes is not None + assert span.attributes[LangfuseOtelSpanAttributes.ENVIRONMENT] == "staging" + + +def test_span_processor_preserves_explicit_third_party_span_environment(): + span = _export_third_party_span( + processor_environment="proxy-prod", + span_attributes={LangfuseOtelSpanAttributes.ENVIRONMENT: "span-env"}, + ) + + assert span.attributes is not None + assert span.attributes[LangfuseOtelSpanAttributes.ENVIRONMENT] == "span-env" + + def test_span_processor_uses_env_flush_settings_when_constructor_omits_them( monkeypatch, ):