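# Sentry SDK integration for Apache Spark worker processes. It patches
# ``pyspark.daemon.worker_main`` so that an exception raised inside a PySpark
# worker is reported to Sentry, tagged with the worker's TaskContext, before
# the process exits.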
 
from __future__ import absolute_import

import sys

from sentry_sdk import configure_scope
from sentry_sdk.hub import Hub
from sentry_sdk.integrations import Integration
from sentry_sdk.utils import (
    capture_internal_exceptions,
    exc_info_from_error,
    single_exception_from_error_tuple,
    walk_exception_chain,
    event_hint_with_exc_info,
)
 
 from sentry_sdk._types import MYPY
 
if MYPY:
    from typing import Any
    from typing import Optional

    from sentry_sdk._types import ExcInfo, Event, Hint
 
 
class SparkWorkerIntegration(Integration):
    identifier = "spark_worker"

    @staticmethod
    def setup_once():
        # type: () -> None
        import pyspark.daemon as original_daemon

        original_daemon.worker_main = _sentry_worker_main
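
# Rebinding ``pyspark.daemon.worker_main`` works because pyspark.daemon
# imports the worker entry point into its own namespace and calls it for each
# forked worker, so the patch in ``setup_once`` routes every worker through
# ``_sentry_worker_main`` below.
#
# A minimal usage sketch (assumed deployment, not defined in this module):
# initialize the SDK with this integration inside the worker process, e.g.
# from a module every executor imports before running tasks:
#
#     import sentry_sdk
#     from sentry_sdk.integrations.spark import SparkWorkerIntegration
#
#     sentry_sdk.init(dsn="...", integrations=[SparkWorkerIntegration()])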
 
 
def _capture_exception(exc_info, hub):
    # type: (ExcInfo, Hub) -> None
    client = hub.client

    client_options = client.options  # type: ignore

    mechanism = {"type": "spark", "handled": False}

    exc_info = exc_info_from_error(exc_info)

    exc_type, exc_value, tb = exc_info
    rv = []

    # On exception the worker calls sys.exit(-1), so SystemExit and similar
    # shutdown errors can be ignored.
    for exc_type, exc_value, tb in walk_exception_chain(exc_info):
        if exc_type not in (SystemExit, EOFError, ConnectionResetError):
            rv.append(
                single_exception_from_error_tuple(
                    exc_type, exc_value, tb, client_options, mechanism
                )
            )

    if rv:
        # Sentry expects exception values ordered oldest-to-newest.
        rv.reverse()
        hint = event_hint_with_exc_info(exc_info)
        event = {"level": "error", "exception": {"values": rv}}

        _tag_task_context()

        hub.capture_event(event, hint=hint)
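
# Note: the SystemExit captured in ``_sentry_worker_main`` is only the
# outermost link of the chain; the user's original exception is reached via
# ``__context__`` by ``walk_exception_chain``, while the shutdown-related
# links (SystemExit, EOFError, ConnectionResetError) are filtered out above.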
 
 
def _tag_task_context():
    # type: () -> None
    from pyspark.taskcontext import TaskContext

    with configure_scope() as scope:

        @scope.add_event_processor
        def process_event(event, hint):
            # type: (Event, Hint) -> Optional[Event]
            with capture_internal_exceptions():
                integration = Hub.current.get_integration(SparkWorkerIntegration)
                task_context = TaskContext.get()

                if integration is None or task_context is None:
                    return event

                event.setdefault("tags", {}).setdefault(
                    "stageId", str(task_context.stageId())
                )
                event["tags"].setdefault("partitionId", str(task_context.partitionId()))
                event["tags"].setdefault(
                    "attemptNumber", str(task_context.attemptNumber())
                )
                event["tags"].setdefault(
                    "taskAttemptId", str(task_context.taskAttemptId())
                )

                if task_context._localProperties:
                    if "sentry_app_name" in task_context._localProperties:
                        event["tags"].setdefault(
                            "app_name", task_context._localProperties["sentry_app_name"]
                        )
                        event["tags"].setdefault(
                            "application_id",
                            task_context._localProperties["sentry_application_id"],
                        )

                    if "callSite.short" in task_context._localProperties:
                        event.setdefault("extra", {}).setdefault(
                            "callSite", task_context._localProperties["callSite.short"]
                        )

                return event
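
# The ``sentry_app_name``/``sentry_application_id`` properties are expected to
# be set by the driver-side integration; ``callSite.short`` is set by Spark
# itself. A driver-side sketch of the convention assumed here:
#
#     sc.setLocalProperty("sentry_app_name", sc.appName)
#     sc.setLocalProperty("sentry_application_id", sc.applicationId)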
 
 
def _sentry_worker_main(*args, **kwargs):
    # type: (*Optional[Any], **Optional[Any]) -> None
    import pyspark.worker as original_worker

    try:
        original_worker.main(*args, **kwargs)
    except SystemExit:
        if Hub.current.get_integration(SparkWorkerIntegration) is not None:
            hub = Hub.current
            exc_info = sys.exc_info()
            with capture_internal_exceptions():
                _capture_exception(exc_info, hub)
 