| Viewing file:  arq.py (6.58 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
from __future__ import absolute_import
 import sys
 
 from sentry_sdk._compat import reraise
 from sentry_sdk._types import TYPE_CHECKING
 from sentry_sdk import Hub
 from sentry_sdk.consts import OP
 from sentry_sdk.hub import _should_send_default_pii
 from sentry_sdk.integrations import DidNotEnable, Integration
 from sentry_sdk.integrations.logging import ignore_logger
 from sentry_sdk.tracing import Transaction, TRANSACTION_SOURCE_TASK
 from sentry_sdk.utils import (
 capture_internal_exceptions,
 event_from_exception,
 SENSITIVE_DATA_SUBSTITUTE,
 parse_version,
 )
 
 try:
 import arq.worker
 from arq.version import VERSION as ARQ_VERSION
 from arq.connections import ArqRedis
 from arq.worker import JobExecutionFailed, Retry, RetryJob, Worker
 except ImportError:
 raise DidNotEnable("Arq is not installed")
 
 if TYPE_CHECKING:
 from typing import Any, Dict, Optional, Union
 
 from sentry_sdk._types import EventProcessor, Event, ExcInfo, Hint
 
 from arq.cron import CronJob
 from arq.jobs import Job
 from arq.typing import WorkerCoroutine
 from arq.worker import Function
 
 ARQ_CONTROL_FLOW_EXCEPTIONS = (JobExecutionFailed, Retry, RetryJob)
 
 
 class ArqIntegration(Integration):
 identifier = "arq"
 
 @staticmethod
 def setup_once():
 # type: () -> None
 
 try:
 if isinstance(ARQ_VERSION, str):
 version = parse_version(ARQ_VERSION)
 else:
 version = ARQ_VERSION.version[:2]
 
 except (TypeError, ValueError):
 version = None
 
 if version is None:
 raise DidNotEnable("Unparsable arq version: {}".format(ARQ_VERSION))
 
 if version < (0, 23):
 raise DidNotEnable("arq 0.23 or newer required.")
 
 patch_enqueue_job()
 patch_run_job()
 patch_create_worker()
 
 ignore_logger("arq.worker")
 
 
 def patch_enqueue_job():
 # type: () -> None
 old_enqueue_job = ArqRedis.enqueue_job
 
 async def _sentry_enqueue_job(self, function, *args, **kwargs):
 # type: (ArqRedis, str, *Any, **Any) -> Optional[Job]
 hub = Hub.current
 
 if hub.get_integration(ArqIntegration) is None:
 return await old_enqueue_job(self, function, *args, **kwargs)
 
 with hub.start_span(op=OP.QUEUE_SUBMIT_ARQ, description=function):
 return await old_enqueue_job(self, function, *args, **kwargs)
 
 ArqRedis.enqueue_job = _sentry_enqueue_job
 
 
 def patch_run_job():
 # type: () -> None
 old_run_job = Worker.run_job
 
 async def _sentry_run_job(self, job_id, score):
 # type: (Worker, str, int) -> None
 hub = Hub(Hub.current)
 
 if hub.get_integration(ArqIntegration) is None:
 return await old_run_job(self, job_id, score)
 
 with hub.push_scope() as scope:
 scope._name = "arq"
 scope.clear_breadcrumbs()
 
 transaction = Transaction(
 name="unknown arq task",
 status="ok",
 op=OP.QUEUE_TASK_ARQ,
 source=TRANSACTION_SOURCE_TASK,
 )
 
 with hub.start_transaction(transaction):
 return await old_run_job(self, job_id, score)
 
 Worker.run_job = _sentry_run_job
 
 
 def _capture_exception(exc_info):
 # type: (ExcInfo) -> None
 hub = Hub.current
 
 if hub.scope.transaction is not None:
 if exc_info[0] in ARQ_CONTROL_FLOW_EXCEPTIONS:
 hub.scope.transaction.set_status("aborted")
 return
 
 hub.scope.transaction.set_status("internal_error")
 
 event, hint = event_from_exception(
 exc_info,
 client_options=hub.client.options if hub.client else None,
 mechanism={"type": ArqIntegration.identifier, "handled": False},
 )
 hub.capture_event(event, hint=hint)
 
 
 def _make_event_processor(ctx, *args, **kwargs):
 # type: (Dict[Any, Any], *Any, **Any) -> EventProcessor
 def event_processor(event, hint):
 # type: (Event, Hint) -> Optional[Event]
 
 hub = Hub.current
 
 with capture_internal_exceptions():
 if hub.scope.transaction is not None:
 hub.scope.transaction.name = ctx["job_name"]
 event["transaction"] = ctx["job_name"]
 
 tags = event.setdefault("tags", {})
 tags["arq_task_id"] = ctx["job_id"]
 tags["arq_task_retry"] = ctx["job_try"] > 1
 extra = event.setdefault("extra", {})
 extra["arq-job"] = {
 "task": ctx["job_name"],
 "args": args
 if _should_send_default_pii()
 else SENSITIVE_DATA_SUBSTITUTE,
 "kwargs": kwargs
 if _should_send_default_pii()
 else SENSITIVE_DATA_SUBSTITUTE,
 "retry": ctx["job_try"],
 }
 
 return event
 
 return event_processor
 
 
 def _wrap_coroutine(name, coroutine):
 # type: (str, WorkerCoroutine) -> WorkerCoroutine
 async def _sentry_coroutine(ctx, *args, **kwargs):
 # type: (Dict[Any, Any], *Any, **Any) -> Any
 hub = Hub.current
 if hub.get_integration(ArqIntegration) is None:
 return await coroutine(*args, **kwargs)
 
 hub.scope.add_event_processor(
 _make_event_processor({**ctx, "job_name": name}, *args, **kwargs)
 )
 
 try:
 result = await coroutine(ctx, *args, **kwargs)
 except Exception:
 exc_info = sys.exc_info()
 _capture_exception(exc_info)
 reraise(*exc_info)
 
 return result
 
 return _sentry_coroutine
 
 
 def patch_create_worker():
 # type: () -> None
 old_create_worker = arq.worker.create_worker
 
 def _sentry_create_worker(*args, **kwargs):
 # type: (*Any, **Any) -> Worker
 hub = Hub.current
 
 if hub.get_integration(ArqIntegration) is None:
 return old_create_worker(*args, **kwargs)
 
 settings_cls = args[0]
 
 functions = settings_cls.functions
 cron_jobs = settings_cls.cron_jobs
 
 settings_cls.functions = [_get_arq_function(func) for func in functions]
 settings_cls.cron_jobs = [_get_arq_cron_job(cron_job) for cron_job in cron_jobs]
 
 return old_create_worker(*args, **kwargs)
 
 arq.worker.create_worker = _sentry_create_worker
 
 
 def _get_arq_function(func):
 # type: (Union[str, Function, WorkerCoroutine]) -> Function
 arq_func = arq.worker.func(func)
 arq_func.coroutine = _wrap_coroutine(arq_func.name, arq_func.coroutine)
 
 return arq_func
 
 
 def _get_arq_cron_job(cron_job):
 # type: (CronJob) -> CronJob
 cron_job.coroutine = _wrap_coroutine(cron_job.name, cron_job.coroutine)
 
 return cron_job
 
 |