| Viewing file:  huey.py (4.59 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
from __future__ import absolute_import
 import sys
 from datetime import datetime
 
 from sentry_sdk._compat import reraise
 from sentry_sdk._types import TYPE_CHECKING
 from sentry_sdk import Hub
 from sentry_sdk.consts import OP
 from sentry_sdk.hub import _should_send_default_pii
 from sentry_sdk.integrations import DidNotEnable, Integration
 from sentry_sdk.tracing import Transaction, TRANSACTION_SOURCE_TASK
 from sentry_sdk.utils import (
 capture_internal_exceptions,
 event_from_exception,
 SENSITIVE_DATA_SUBSTITUTE,
 )
 
 if TYPE_CHECKING:
 from typing import Any, Callable, Optional, Union, TypeVar
 
 from sentry_sdk._types import EventProcessor, Event, Hint
 from sentry_sdk.utils import ExcInfo
 
 F = TypeVar("F", bound=Callable[..., Any])
 
 try:
 from huey.api import Huey, Result, ResultGroup, Task
 from huey.exceptions import CancelExecution, RetryTask, TaskLockedException
 except ImportError:
 raise DidNotEnable("Huey is not installed")
 
 
 HUEY_CONTROL_FLOW_EXCEPTIONS = (CancelExecution, RetryTask, TaskLockedException)
 
 
 class HueyIntegration(Integration):
 identifier = "huey"
 
 @staticmethod
 def setup_once():
 # type: () -> None
 patch_enqueue()
 patch_execute()
 
 
 def patch_enqueue():
 # type: () -> None
 old_enqueue = Huey.enqueue
 
 def _sentry_enqueue(self, task):
 # type: (Huey, Task) -> Optional[Union[Result, ResultGroup]]
 hub = Hub.current
 
 if hub.get_integration(HueyIntegration) is None:
 return old_enqueue(self, task)
 
 with hub.start_span(op=OP.QUEUE_SUBMIT_HUEY, description=task.name):
 return old_enqueue(self, task)
 
 Huey.enqueue = _sentry_enqueue
 
 
 def _make_event_processor(task):
 # type: (Any) -> EventProcessor
 def event_processor(event, hint):
 # type: (Event, Hint) -> Optional[Event]
 
 with capture_internal_exceptions():
 tags = event.setdefault("tags", {})
 tags["huey_task_id"] = task.id
 tags["huey_task_retry"] = task.default_retries > task.retries
 extra = event.setdefault("extra", {})
 extra["huey-job"] = {
 "task": task.name,
 "args": task.args
 if _should_send_default_pii()
 else SENSITIVE_DATA_SUBSTITUTE,
 "kwargs": task.kwargs
 if _should_send_default_pii()
 else SENSITIVE_DATA_SUBSTITUTE,
 "retry": (task.default_retries or 0) - task.retries,
 }
 
 return event
 
 return event_processor
 
 
 def _capture_exception(exc_info):
 # type: (ExcInfo) -> None
 hub = Hub.current
 
 if exc_info[0] in HUEY_CONTROL_FLOW_EXCEPTIONS:
 hub.scope.transaction.set_status("aborted")
 return
 
 hub.scope.transaction.set_status("internal_error")
 event, hint = event_from_exception(
 exc_info,
 client_options=hub.client.options if hub.client else None,
 mechanism={"type": HueyIntegration.identifier, "handled": False},
 )
 hub.capture_event(event, hint=hint)
 
 
 def _wrap_task_execute(func):
 # type: (F) -> F
 def _sentry_execute(*args, **kwargs):
 # type: (*Any, **Any) -> Any
 hub = Hub.current
 if hub.get_integration(HueyIntegration) is None:
 return func(*args, **kwargs)
 
 try:
 result = func(*args, **kwargs)
 except Exception:
 exc_info = sys.exc_info()
 _capture_exception(exc_info)
 reraise(*exc_info)
 
 return result
 
 return _sentry_execute  # type: ignore
 
 
 def patch_execute():
 # type: () -> None
 old_execute = Huey._execute
 
 def _sentry_execute(self, task, timestamp=None):
 # type: (Huey, Task, Optional[datetime]) -> Any
 hub = Hub.current
 
 if hub.get_integration(HueyIntegration) is None:
 return old_execute(self, task, timestamp)
 
 with hub.push_scope() as scope:
 with capture_internal_exceptions():
 scope._name = "huey"
 scope.clear_breadcrumbs()
 scope.add_event_processor(_make_event_processor(task))
 
 transaction = Transaction(
 name=task.name,
 status="ok",
 op=OP.QUEUE_TASK_HUEY,
 source=TRANSACTION_SOURCE_TASK,
 )
 
 if not getattr(task, "_sentry_is_patched", False):
 task.execute = _wrap_task_execute(task.execute)
 task._sentry_is_patched = True
 
 with hub.start_transaction(transaction):
 return old_execute(self, task, timestamp)
 
 Huey._execute = _sentry_execute
 
 |