| Viewing file:  __init__.py (7.75 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
from __future__ import absolute_import
 from sentry_sdk import Hub
 from sentry_sdk.consts import OP, SPANDATA
 from sentry_sdk.hub import _should_send_default_pii
 from sentry_sdk.utils import (
 SENSITIVE_DATA_SUBSTITUTE,
 capture_internal_exceptions,
 logger,
 )
 from sentry_sdk.integrations import Integration, DidNotEnable
 
 from sentry_sdk._types import TYPE_CHECKING
 
 if TYPE_CHECKING:
 from typing import Any, Sequence
 from sentry_sdk.tracing import Span
 
 _SINGLE_KEY_COMMANDS = frozenset(
 ["decr", "decrby", "get", "incr", "incrby", "pttl", "set", "setex", "setnx", "ttl"]
 )
 _MULTI_KEY_COMMANDS = frozenset(["del", "touch", "unlink"])
 
 _COMMANDS_INCLUDING_SENSITIVE_DATA = [
 "auth",
 ]
 
 _MAX_NUM_ARGS = 10  # Trim argument lists to this many values
 _MAX_NUM_COMMANDS = 10  # Trim command lists to this many values
 
 _DEFAULT_MAX_DATA_SIZE = 1024
 
 
 def _get_safe_command(name, args):
 # type: (str, Sequence[Any]) -> str
 command_parts = [name]
 
 for i, arg in enumerate(args):
 if i > _MAX_NUM_ARGS:
 break
 
 name_low = name.lower()
 
 if name_low in _COMMANDS_INCLUDING_SENSITIVE_DATA:
 command_parts.append(SENSITIVE_DATA_SUBSTITUTE)
 continue
 
 arg_is_the_key = i == 0
 if arg_is_the_key:
 command_parts.append(repr(arg))
 
 else:
 if _should_send_default_pii():
 command_parts.append(repr(arg))
 else:
 command_parts.append(SENSITIVE_DATA_SUBSTITUTE)
 
 command = " ".join(command_parts)
 return command
 
 
 def _set_pipeline_data(
 span, is_cluster, get_command_args_fn, is_transaction, command_stack
 ):
 # type: (Span, bool, Any, bool, Sequence[Any]) -> None
 span.set_tag("redis.is_cluster", is_cluster)
 transaction = is_transaction if not is_cluster else False
 span.set_tag("redis.transaction", transaction)
 
 commands = []
 for i, arg in enumerate(command_stack):
 if i >= _MAX_NUM_COMMANDS:
 break
 
 command = get_command_args_fn(arg)
 commands.append(_get_safe_command(command[0], command[1:]))
 
 span.set_data(
 "redis.commands",
 {
 "count": len(command_stack),
 "first_ten": commands,
 },
 )
 
 
 def patch_redis_pipeline(pipeline_cls, is_cluster, get_command_args_fn):
 # type: (Any, bool, Any) -> None
 old_execute = pipeline_cls.execute
 
 def sentry_patched_execute(self, *args, **kwargs):
 # type: (Any, *Any, **Any) -> Any
 hub = Hub.current
 
 if hub.get_integration(RedisIntegration) is None:
 return old_execute(self, *args, **kwargs)
 
 with hub.start_span(
 op=OP.DB_REDIS, description="redis.pipeline.execute"
 ) as span:
 with capture_internal_exceptions():
 _set_pipeline_data(
 span,
 is_cluster,
 get_command_args_fn,
 self.transaction,
 self.command_stack,
 )
 span.set_data(SPANDATA.DB_SYSTEM, "redis")
 
 return old_execute(self, *args, **kwargs)
 
 pipeline_cls.execute = sentry_patched_execute
 
 
 def _get_redis_command_args(command):
 # type: (Any) -> Sequence[Any]
 return command[0]
 
 
 def _parse_rediscluster_command(command):
 # type: (Any) -> Sequence[Any]
 return command.args
 
 
 def _patch_redis(StrictRedis, client):  # noqa: N803
 # type: (Any, Any) -> None
 patch_redis_client(StrictRedis, is_cluster=False)
 patch_redis_pipeline(client.Pipeline, False, _get_redis_command_args)
 try:
 strict_pipeline = client.StrictPipeline
 except AttributeError:
 pass
 else:
 patch_redis_pipeline(strict_pipeline, False, _get_redis_command_args)
 
 try:
 import redis.asyncio
 except ImportError:
 pass
 else:
 from sentry_sdk.integrations.redis.asyncio import (
 patch_redis_async_client,
 patch_redis_async_pipeline,
 )
 
 patch_redis_async_client(redis.asyncio.client.StrictRedis)
 patch_redis_async_pipeline(redis.asyncio.client.Pipeline)
 
 
 def _patch_rb():
 # type: () -> None
 try:
 import rb.clients  # type: ignore
 except ImportError:
 pass
 else:
 patch_redis_client(rb.clients.FanoutClient, is_cluster=False)
 patch_redis_client(rb.clients.MappingClient, is_cluster=False)
 patch_redis_client(rb.clients.RoutingClient, is_cluster=False)
 
 
 def _patch_rediscluster():
 # type: () -> None
 try:
 import rediscluster  # type: ignore
 except ImportError:
 return
 
 patch_redis_client(rediscluster.RedisCluster, is_cluster=True)
 
 # up to v1.3.6, __version__ attribute is a tuple
 # from v2.0.0, __version__ is a string and VERSION a tuple
 version = getattr(rediscluster, "VERSION", rediscluster.__version__)
 
 # StrictRedisCluster was introduced in v0.2.0 and removed in v2.0.0
 # https://github.com/Grokzen/redis-py-cluster/blob/master/docs/release-notes.rst
 if (0, 2, 0) < version < (2, 0, 0):
 pipeline_cls = rediscluster.pipeline.StrictClusterPipeline
 patch_redis_client(rediscluster.StrictRedisCluster, is_cluster=True)
 else:
 pipeline_cls = rediscluster.pipeline.ClusterPipeline
 
 patch_redis_pipeline(pipeline_cls, True, _parse_rediscluster_command)
 
 
 class RedisIntegration(Integration):
 identifier = "redis"
 
 def __init__(self, max_data_size=_DEFAULT_MAX_DATA_SIZE):
 # type: (int) -> None
 self.max_data_size = max_data_size
 
 @staticmethod
 def setup_once():
 # type: () -> None
 try:
 from redis import StrictRedis, client
 except ImportError:
 raise DidNotEnable("Redis client not installed")
 
 _patch_redis(StrictRedis, client)
 _patch_rb()
 
 try:
 _patch_rediscluster()
 except Exception:
 logger.exception("Error occurred while patching `rediscluster` library")
 
 
 def _get_span_description(name, *args):
 # type: (str, *Any) -> str
 description = name
 
 with capture_internal_exceptions():
 description = _get_safe_command(name, args)
 
 return description
 
 
 def _set_client_data(span, is_cluster, name, *args):
 # type: (Span, bool, str, *Any) -> None
 span.set_data(SPANDATA.DB_SYSTEM, "redis")
 span.set_tag("redis.is_cluster", is_cluster)
 if name:
 span.set_tag("redis.command", name)
 span.set_tag(SPANDATA.DB_OPERATION, name)
 
 if name and args:
 name_low = name.lower()
 if (name_low in _SINGLE_KEY_COMMANDS) or (
 name_low in _MULTI_KEY_COMMANDS and len(args) == 1
 ):
 span.set_tag("redis.key", args[0])
 
 
 def patch_redis_client(cls, is_cluster):
 # type: (Any, bool) -> None
 """
 This function can be used to instrument custom redis client classes or
 subclasses.
 """
 old_execute_command = cls.execute_command
 
 def sentry_patched_execute_command(self, name, *args, **kwargs):
 # type: (Any, str, *Any, **Any) -> Any
 hub = Hub.current
 integration = hub.get_integration(RedisIntegration)
 
 if integration is None:
 return old_execute_command(self, name, *args, **kwargs)
 
 description = _get_span_description(name, *args)
 
 data_should_be_truncated = (
 integration.max_data_size and len(description) > integration.max_data_size
 )
 if data_should_be_truncated:
 description = description[: integration.max_data_size - len("...")] + "..."
 
 with hub.start_span(op=OP.DB_REDIS, description=description) as span:
 _set_client_data(span, is_cluster, name, *args)
 
 return old_execute_command(self, name, *args, **kwargs)
 
 cls.execute_command = sentry_patched_execute_command
 
 |