| Viewing file:  sqlalchemy.py (4.14 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
from __future__ import absolute_import
 from sentry_sdk._compat import text_type
 from sentry_sdk._types import TYPE_CHECKING
 from sentry_sdk.consts import SPANDATA
 from sentry_sdk.hub import Hub
 from sentry_sdk.integrations import Integration, DidNotEnable
 from sentry_sdk.tracing_utils import record_sql_queries
 
 from sentry_sdk.utils import parse_version
 
 try:
 from sqlalchemy.engine import Engine  # type: ignore
 from sqlalchemy.event import listen  # type: ignore
 from sqlalchemy import __version__ as SQLALCHEMY_VERSION  # type: ignore
 except ImportError:
 raise DidNotEnable("SQLAlchemy not installed.")
 
 if TYPE_CHECKING:
 from typing import Any
 from typing import ContextManager
 from typing import Optional
 
 from sentry_sdk.tracing import Span
 
 
 class SqlalchemyIntegration(Integration):
 identifier = "sqlalchemy"
 
 @staticmethod
 def setup_once():
 # type: () -> None
 
 version = parse_version(SQLALCHEMY_VERSION)
 
 if version is None:
 raise DidNotEnable(
 "Unparsable SQLAlchemy version: {}".format(SQLALCHEMY_VERSION)
 )
 
 if version < (1, 2):
 raise DidNotEnable("SQLAlchemy 1.2 or newer required.")
 
 listen(Engine, "before_cursor_execute", _before_cursor_execute)
 listen(Engine, "after_cursor_execute", _after_cursor_execute)
 listen(Engine, "handle_error", _handle_error)
 
 
 def _before_cursor_execute(
 conn, cursor, statement, parameters, context, executemany, *args
 ):
 # type: (Any, Any, Any, Any, Any, bool, *Any) -> None
 hub = Hub.current
 if hub.get_integration(SqlalchemyIntegration) is None:
 return
 
 ctx_mgr = record_sql_queries(
 hub,
 cursor,
 statement,
 parameters,
 paramstyle=context and context.dialect and context.dialect.paramstyle or None,
 executemany=executemany,
 )
 context._sentry_sql_span_manager = ctx_mgr
 
 span = ctx_mgr.__enter__()
 
 if span is not None:
 _set_db_data(span, conn)
 context._sentry_sql_span = span
 
 
 def _after_cursor_execute(conn, cursor, statement, parameters, context, *args):
 # type: (Any, Any, Any, Any, Any, *Any) -> None
 ctx_mgr = getattr(
 context, "_sentry_sql_span_manager", None
 )  # type: Optional[ContextManager[Any]]
 
 if ctx_mgr is not None:
 context._sentry_sql_span_manager = None
 ctx_mgr.__exit__(None, None, None)
 
 
 def _handle_error(context, *args):
 # type: (Any, *Any) -> None
 execution_context = context.execution_context
 if execution_context is None:
 return
 
 span = getattr(execution_context, "_sentry_sql_span", None)  # type: Optional[Span]
 
 if span is not None:
 span.set_status("internal_error")
 
 # _after_cursor_execute does not get called for crashing SQL stmts. Judging
 # from SQLAlchemy codebase it does seem like any error coming into this
 # handler is going to be fatal.
 ctx_mgr = getattr(
 execution_context, "_sentry_sql_span_manager", None
 )  # type: Optional[ContextManager[Any]]
 
 if ctx_mgr is not None:
 execution_context._sentry_sql_span_manager = None
 ctx_mgr.__exit__(None, None, None)
 
 
 # See: https://docs.sqlalchemy.org/en/20/dialects/index.html
 def _get_db_system(name):
 # type: (str) -> Optional[str]
 name = text_type(name)
 
 if "sqlite" in name:
 return "sqlite"
 
 if "postgres" in name:
 return "postgresql"
 
 if "mariadb" in name:
 return "mariadb"
 
 if "mysql" in name:
 return "mysql"
 
 if "oracle" in name:
 return "oracle"
 
 return None
 
 
 def _set_db_data(span, conn):
 # type: (Span, Any) -> None
 db_system = _get_db_system(conn.engine.name)
 if db_system is not None:
 span.set_data(SPANDATA.DB_SYSTEM, db_system)
 
 db_name = conn.engine.url.database
 if db_name is not None:
 span.set_data(SPANDATA.DB_NAME, db_name)
 
 server_address = conn.engine.url.host
 if server_address is not None:
 span.set_data(SPANDATA.SERVER_ADDRESS, server_address)
 
 server_port = conn.engine.url.port
 if server_port is not None:
 span.set_data(SPANDATA.SERVER_PORT, server_port)
 
 |