| Viewing file:  span_processor.py (9.7 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
from datetime import datetime
 from opentelemetry.context import get_value  # type: ignore
 from opentelemetry.sdk.trace import SpanProcessor  # type: ignore
 from opentelemetry.semconv.trace import SpanAttributes  # type: ignore
 from opentelemetry.trace import (  # type: ignore
 format_span_id,
 format_trace_id,
 get_current_span,
 SpanContext,
 Span as OTelSpan,
 SpanKind,
 )
 from opentelemetry.trace.span import (  # type: ignore
 INVALID_SPAN_ID,
 INVALID_TRACE_ID,
 )
 from sentry_sdk.consts import INSTRUMENTER
 from sentry_sdk.hub import Hub
 from sentry_sdk.integrations.opentelemetry.consts import (
 SENTRY_BAGGAGE_KEY,
 SENTRY_TRACE_KEY,
 )
 from sentry_sdk.scope import add_global_event_processor
 from sentry_sdk.tracing import Transaction, Span as SentrySpan
 from sentry_sdk.utils import Dsn
 from sentry_sdk._types import TYPE_CHECKING
 
 from urllib3.util import parse_url as urlparse
 
 if TYPE_CHECKING:
 from typing import Any, Dict, Optional, Union
 
 from sentry_sdk._types import Event, Hint
 
 OPEN_TELEMETRY_CONTEXT = "otel"
 
 
 def link_trace_context_to_error_event(event, otel_span_map):
 # type: (Event, Dict[str, Union[Transaction, SentrySpan]]) -> Event
 hub = Hub.current
 if not hub:
 return event
 
 if hub.client and hub.client.options["instrumenter"] != INSTRUMENTER.OTEL:
 return event
 
 if hasattr(event, "type") and event["type"] == "transaction":
 return event
 
 otel_span = get_current_span()
 if not otel_span:
 return event
 
 ctx = otel_span.get_span_context()
 trace_id = format_trace_id(ctx.trace_id)
 span_id = format_span_id(ctx.span_id)
 
 if trace_id == INVALID_TRACE_ID or span_id == INVALID_SPAN_ID:
 return event
 
 sentry_span = otel_span_map.get(span_id, None)
 if not sentry_span:
 return event
 
 contexts = event.setdefault("contexts", {})
 contexts.setdefault("trace", {}).update(sentry_span.get_trace_context())
 
 return event
 
 
 class SentrySpanProcessor(SpanProcessor):  # type: ignore
 """
 Converts OTel spans into Sentry spans so they can be sent to the Sentry backend.
 """
 
 # The mapping from otel span ids to sentry spans
 otel_span_map = {}  # type: Dict[str, Union[Transaction, SentrySpan]]
 
 def __new__(cls):
 # type: () -> SentrySpanProcessor
 if not hasattr(cls, "instance"):
 cls.instance = super(SentrySpanProcessor, cls).__new__(cls)
 
 return cls.instance
 
 def __init__(self):
 # type: () -> None
 @add_global_event_processor
 def global_event_processor(event, hint):
 # type: (Event, Hint) -> Event
 return link_trace_context_to_error_event(event, self.otel_span_map)
 
 def on_start(self, otel_span, parent_context=None):
 # type: (OTelSpan, Optional[SpanContext]) -> None
 hub = Hub.current
 if not hub:
 return
 
 if not hub.client or (hub.client and not hub.client.dsn):
 return
 
 try:
 _ = Dsn(hub.client.dsn or "")
 except Exception:
 return
 
 if hub.client and hub.client.options["instrumenter"] != INSTRUMENTER.OTEL:
 return
 
 if not otel_span.get_span_context().is_valid:
 return
 
 if self._is_sentry_span(hub, otel_span):
 return
 
 trace_data = self._get_trace_data(otel_span, parent_context)
 
 parent_span_id = trace_data["parent_span_id"]
 sentry_parent_span = (
 self.otel_span_map.get(parent_span_id, None) if parent_span_id else None
 )
 
 sentry_span = None
 if sentry_parent_span:
 sentry_span = sentry_parent_span.start_child(
 span_id=trace_data["span_id"],
 description=otel_span.name,
 start_timestamp=datetime.fromtimestamp(otel_span.start_time / 1e9),
 instrumenter=INSTRUMENTER.OTEL,
 )
 else:
 sentry_span = hub.start_transaction(
 name=otel_span.name,
 span_id=trace_data["span_id"],
 parent_span_id=parent_span_id,
 trace_id=trace_data["trace_id"],
 baggage=trace_data["baggage"],
 start_timestamp=datetime.fromtimestamp(otel_span.start_time / 1e9),
 instrumenter=INSTRUMENTER.OTEL,
 )
 
 self.otel_span_map[trace_data["span_id"]] = sentry_span
 
 def on_end(self, otel_span):
 # type: (OTelSpan) -> None
 hub = Hub.current
 if not hub:
 return
 
 if hub.client and hub.client.options["instrumenter"] != INSTRUMENTER.OTEL:
 return
 
 span_context = otel_span.get_span_context()
 if not span_context.is_valid:
 return
 
 span_id = format_span_id(span_context.span_id)
 sentry_span = self.otel_span_map.pop(span_id, None)
 if not sentry_span:
 return
 
 sentry_span.op = otel_span.name
 
 self._update_span_with_otel_status(sentry_span, otel_span)
 
 if isinstance(sentry_span, Transaction):
 sentry_span.name = otel_span.name
 sentry_span.set_context(
 OPEN_TELEMETRY_CONTEXT, self._get_otel_context(otel_span)
 )
 
 else:
 self._update_span_with_otel_data(sentry_span, otel_span)
 
 sentry_span.finish(
 end_timestamp=datetime.fromtimestamp(otel_span.end_time / 1e9)
 )
 
 def _is_sentry_span(self, hub, otel_span):
 # type: (Hub, OTelSpan) -> bool
 """
 Break infinite loop:
 HTTP requests to Sentry are caught by OTel and send again to Sentry.
 """
 otel_span_url = otel_span.attributes.get(SpanAttributes.HTTP_URL, None)
 dsn_url = hub.client and Dsn(hub.client.dsn or "").netloc
 
 if otel_span_url and dsn_url in otel_span_url:
 return True
 
 return False
 
 def _get_otel_context(self, otel_span):
 # type: (OTelSpan) -> Dict[str, Any]
 """
 Returns the OTel context for Sentry.
 See: https://develop.sentry.dev/sdk/performance/opentelemetry/#step-5-add-opentelemetry-context
 """
 ctx = {}
 
 if otel_span.attributes:
 ctx["attributes"] = dict(otel_span.attributes)
 
 if otel_span.resource.attributes:
 ctx["resource"] = dict(otel_span.resource.attributes)
 
 return ctx
 
 def _get_trace_data(self, otel_span, parent_context):
 # type: (OTelSpan, SpanContext) -> Dict[str, Any]
 """
 Extracts tracing information from one OTel span and its parent OTel context.
 """
 trace_data = {}
 span_context = otel_span.get_span_context()
 
 span_id = format_span_id(span_context.span_id)
 trace_data["span_id"] = span_id
 
 trace_id = format_trace_id(span_context.trace_id)
 trace_data["trace_id"] = trace_id
 
 parent_span_id = (
 format_span_id(otel_span.parent.span_id) if otel_span.parent else None
 )
 trace_data["parent_span_id"] = parent_span_id
 
 sentry_trace_data = get_value(SENTRY_TRACE_KEY, parent_context)
 trace_data["parent_sampled"] = (
 sentry_trace_data["parent_sampled"] if sentry_trace_data else None
 )
 
 baggage = get_value(SENTRY_BAGGAGE_KEY, parent_context)
 trace_data["baggage"] = baggage
 
 return trace_data
 
 def _update_span_with_otel_status(self, sentry_span, otel_span):
 # type: (SentrySpan, OTelSpan) -> None
 """
 Set the Sentry span status from the OTel span
 """
 if otel_span.status.is_unset:
 return
 
 if otel_span.status.is_ok:
 sentry_span.set_status("ok")
 return
 
 sentry_span.set_status("internal_error")
 
 def _update_span_with_otel_data(self, sentry_span, otel_span):
 # type: (SentrySpan, OTelSpan) -> None
 """
 Convert OTel span data and update the Sentry span with it.
 This should eventually happen on the server when ingesting the spans.
 """
 for key, val in otel_span.attributes.items():
 sentry_span.set_data(key, val)
 
 sentry_span.set_data("otel.kind", otel_span.kind)
 
 op = otel_span.name
 description = otel_span.name
 
 http_method = otel_span.attributes.get(SpanAttributes.HTTP_METHOD, None)
 db_query = otel_span.attributes.get(SpanAttributes.DB_SYSTEM, None)
 
 if http_method:
 op = "http"
 
 if otel_span.kind == SpanKind.SERVER:
 op += ".server"
 elif otel_span.kind == SpanKind.CLIENT:
 op += ".client"
 
 description = http_method
 
 peer_name = otel_span.attributes.get(SpanAttributes.NET_PEER_NAME, None)
 if peer_name:
 description += " {}".format(peer_name)
 
 target = otel_span.attributes.get(SpanAttributes.HTTP_TARGET, None)
 if target:
 description += " {}".format(target)
 
 if not peer_name and not target:
 url = otel_span.attributes.get(SpanAttributes.HTTP_URL, None)
 if url:
 parsed_url = urlparse(url)
 url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
 description += " {}".format(url)
 
 status_code = otel_span.attributes.get(
 SpanAttributes.HTTP_STATUS_CODE, None
 )
 if status_code:
 sentry_span.set_http_status(status_code)
 
 elif db_query:
 op = "db"
 statement = otel_span.attributes.get(SpanAttributes.DB_STATEMENT, None)
 if statement:
 description = statement
 
 sentry_span.op = op
 sentry_span.description = description
 
 |