from sentry_sdk import configure_scope
from sentry_sdk.hub import Hub
from sentry_sdk.integrations import Integration
from sentry_sdk.utils import capture_internal_exceptions

from sentry_sdk._types import MYPY

if MYPY:
    from typing import Any
    from typing import Optional

    from sentry_sdk._types import Event, Hint
 
 
class SparkIntegration(Integration):
    identifier = "spark"

    @staticmethod
    def setup_once():
        # type: () -> None
        patch_spark_context_init()
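
# Illustrative usage (a sketch, not part of this module): the integration is
# enabled by passing it to sentry_sdk.init() on the driver, before the
# SparkContext is created:
#
#     import sentry_sdk
#     from sentry_sdk.integrations.spark import SparkIntegration
#
#     sentry_sdk.init(dsn="...", integrations=[SparkIntegration()])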
 
 
def _set_app_properties():
    # type: () -> None
    """
    Set properties on the driver that propagate to worker processes, so that
    the worker integration has access to app_name and application_id.
    """
    from pyspark import SparkContext

    spark_context = SparkContext._active_spark_context
    if spark_context:
        spark_context.setLocalProperty("sentry_app_name", spark_context.appName)
        spark_context.setLocalProperty(
            "sentry_application_id", spark_context.applicationId
        )
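
# Sketch of the worker-side view (the actual worker integration lives in a
# separate module and may differ): inside a running task, the properties set
# above are visible through pyspark's TaskContext:
#
#     from pyspark import TaskContext
#
#     task_context = TaskContext.get()  # None outside of a running task
#     if task_context is not None:
#         app_name = task_context.getLocalProperty("sentry_app_name")
#         application_id = task_context.getLocalProperty("sentry_application_id")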
 
 
def _start_sentry_listener(sc):
    # type: (Any) -> None
    """
    Start the Java gateway callback server and register a custom `SparkListener`.
    """
    from pyspark.java_gateway import ensure_callback_server_started

    gw = sc._gateway
    # The callback server lets the JVM invoke methods on Python objects, which
    # is what allows Spark's scheduler to call into SentryListener below.
    ensure_callback_server_started(gw)
    listener = SentryListener()
    sc._jsc.sc().addSparkListener(listener)
 
 
def patch_spark_context_init():
    # type: () -> None
    from pyspark import SparkContext

    spark_context_init = SparkContext._do_init

    def _sentry_patched_spark_context_init(self, *args, **kwargs):
        # type: (SparkContext, *Any, **Any) -> Optional[Any]
        init = spark_context_init(self, *args, **kwargs)

        if Hub.current.get_integration(SparkIntegration) is None:
            return init

        _start_sentry_listener(self)
        _set_app_properties()

        with configure_scope() as scope:

            @scope.add_event_processor
            def process_event(event, hint):
                # type: (Event, Hint) -> Optional[Event]
                with capture_internal_exceptions():
                    if Hub.current.get_integration(SparkIntegration) is None:
                        return event

                    event.setdefault("user", {}).setdefault("id", self.sparkUser())

                    event.setdefault("tags", {}).setdefault(
                        "executor.id", self._conf.get("spark.executor.id")
                    )
                    event["tags"].setdefault(
                        "spark-submit.deployMode",
                        self._conf.get("spark.submit.deployMode"),
                    )
                    event["tags"].setdefault(
                        "driver.host", self._conf.get("spark.driver.host")
                    )
                    event["tags"].setdefault(
                        "driver.port", self._conf.get("spark.driver.port")
                    )
                    event["tags"].setdefault("spark_version", self.version)
                    event["tags"].setdefault("app_name", self.appName)
                    event["tags"].setdefault("application_id", self.applicationId)
                    event["tags"].setdefault("master", self.master)
                    event["tags"].setdefault("spark_home", self.sparkHome)

                    event.setdefault("extra", {}).setdefault("web_url", self.uiWebUrl)

                return event

        return init

    SparkContext._do_init = _sentry_patched_spark_context_init
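
# For illustration, an error event passing through the processor above might
# gain context along these lines (all values hypothetical):
#
#     {
#         "user": {"id": "hadoop"},
#         "tags": {
#             "executor.id": "driver",
#             "spark-submit.deployMode": "client",
#             "driver.host": "10.0.0.1",
#             "driver.port": "43125",
#             "spark_version": "2.4.4",
#             "app_name": "my_app",
#             "application_id": "local-1580312028032",
#         },
#         "extra": {"web_url": "http://10.0.0.1:4040"},
#     }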
 
 
class SparkListener(object):
    """No-op implementation of the Spark listener interface; subclasses
    override only the callbacks they care about."""

    def onApplicationEnd(self, applicationEnd):  # noqa: N802,N803
        # type: (Any) -> None
        pass

    def onApplicationStart(self, applicationStart):  # noqa: N802,N803
        # type: (Any) -> None
        pass

    def onBlockManagerAdded(self, blockManagerAdded):  # noqa: N802,N803
        # type: (Any) -> None
        pass

    def onBlockManagerRemoved(self, blockManagerRemoved):  # noqa: N802,N803
        # type: (Any) -> None
        pass

    def onBlockUpdated(self, blockUpdated):  # noqa: N802,N803
        # type: (Any) -> None
        pass

    def onEnvironmentUpdate(self, environmentUpdate):  # noqa: N802,N803
        # type: (Any) -> None
        pass

    def onExecutorAdded(self, executorAdded):  # noqa: N802,N803
        # type: (Any) -> None
        pass

    def onExecutorBlacklisted(self, executorBlacklisted):  # noqa: N802,N803
        # type: (Any) -> None
        pass

    def onExecutorBlacklistedForStage(  # noqa: N802
        self, executorBlacklistedForStage  # noqa: N803
    ):
        # type: (Any) -> None
        pass

    def onExecutorMetricsUpdate(self, executorMetricsUpdate):  # noqa: N802,N803
        # type: (Any) -> None
        pass

    def onExecutorRemoved(self, executorRemoved):  # noqa: N802,N803
        # type: (Any) -> None
        pass

    def onJobEnd(self, jobEnd):  # noqa: N802,N803
        # type: (Any) -> None
        pass

    def onJobStart(self, jobStart):  # noqa: N802,N803
        # type: (Any) -> None
        pass

    def onNodeBlacklisted(self, nodeBlacklisted):  # noqa: N802,N803
        # type: (Any) -> None
        pass

    def onNodeBlacklistedForStage(self, nodeBlacklistedForStage):  # noqa: N802,N803
        # type: (Any) -> None
        pass

    def onNodeUnblacklisted(self, nodeUnblacklisted):  # noqa: N802,N803
        # type: (Any) -> None
        pass

    def onOtherEvent(self, event):  # noqa: N802,N803
        # type: (Any) -> None
        pass

    def onSpeculativeTaskSubmitted(self, speculativeTask):  # noqa: N802,N803
        # type: (Any) -> None
        pass

    def onStageCompleted(self, stageCompleted):  # noqa: N802,N803
        # type: (Any) -> None
        pass

    def onStageSubmitted(self, stageSubmitted):  # noqa: N802,N803
        # type: (Any) -> None
        pass

    def onTaskEnd(self, taskEnd):  # noqa: N802,N803
        # type: (Any) -> None
        pass

    def onTaskGettingResult(self, taskGettingResult):  # noqa: N802,N803
        # type: (Any) -> None
        pass

    def onTaskStart(self, taskStart):  # noqa: N802,N803
        # type: (Any) -> None
        pass

    def onUnpersistRDD(self, unpersistRDD):  # noqa: N802,N803
        # type: (Any) -> None
        pass

    # py4j convention: the nested Java class declares which JVM interface this
    # Python object implements, so instances can be handed to addSparkListener.
    class Java:
        implements = ["org.apache.spark.scheduler.SparkListenerInterface"]
 
 
class SentryListener(SparkListener):
    def __init__(self):
        # type: () -> None
        self.hub = Hub.current

    def onJobStart(self, jobStart):  # noqa: N802,N803
        # type: (Any) -> None
        message = "Job {} Started".format(jobStart.jobId())
        self.hub.add_breadcrumb(level="info", message=message)
        _set_app_properties()

    def onJobEnd(self, jobEnd):  # noqa: N802,N803
        # type: (Any) -> None
        level = ""
        message = ""
        data = {"result": jobEnd.jobResult().toString()}

        if jobEnd.jobResult().toString() == "JobSucceeded":
            level = "info"
            message = "Job {} Ended".format(jobEnd.jobId())
        else:
            level = "warning"
            message = "Job {} Failed".format(jobEnd.jobId())

        self.hub.add_breadcrumb(level=level, message=message, data=data)

    def onStageSubmitted(self, stageSubmitted):  # noqa: N802,N803
        # type: (Any) -> None
        stage_info = stageSubmitted.stageInfo()
        message = "Stage {} Submitted".format(stage_info.stageId())
        data = {"attemptId": stage_info.attemptId(), "name": stage_info.name()}
        self.hub.add_breadcrumb(level="info", message=message, data=data)
        _set_app_properties()
 
    def onStageCompleted(self, stageCompleted):  # noqa: N802,N803
        # type: (Any) -> None
        from py4j.protocol import Py4JJavaError  # type: ignore

        stage_info = stageCompleted.stageInfo()
        message = ""
        level = ""
        data = {"attemptId": stage_info.attemptId(), "name": stage_info.name()}

        # stageInfo.failureReason() returns a Scala Option; calling .get() on
        # an empty Option raises a Py4JJavaError, which signals that the stage
        # completed successfully.
        try:
            data["reason"] = stage_info.failureReason().get()
            message = "Stage {} Failed".format(stage_info.stageId())
            level = "warning"
        except Py4JJavaError:
            message = "Stage {} Completed".format(stage_info.stageId())
            level = "info"

        self.hub.add_breadcrumb(level=level, message=message, data=data)
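
# For illustration: breadcrumbs recorded by this listener are attached to any
# event captured later on the driver. A failed stage might produce a
# breadcrumb along these lines (all values hypothetical):
#
#     {"level": "warning", "message": "Stage 3 Failed",
#      "data": {"attemptId": 0, "name": "collect at app.py:10",
#               "reason": "Job aborted due to stage failure: ..."}}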
 