| Viewing file:  provision.py (12.95 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
import collectionsimport logging
 import os
 import time
 
 from . import config
 from . import engines
 from .. import create_engine
 from .. import exc
 from .. import text
 from ..engine import url as sa_url
 from ..util import compat
 
 
 log = logging.getLogger(__name__)
 
 FOLLOWER_IDENT = None
 
 
 class register(object):
 def __init__(self):
 self.fns = {}
 
 @classmethod
 def init(cls, fn):
 return register().for_db("*")(fn)
 
 def for_db(self, dbname):
 def decorate(fn):
 self.fns[dbname] = fn
 return self
 
 return decorate
 
 def __call__(self, cfg, *arg):
 if isinstance(cfg, compat.string_types):
 url = sa_url.make_url(cfg)
 elif isinstance(cfg, sa_url.URL):
 url = cfg
 else:
 url = cfg.db.url
 backend = url.get_backend_name()
 if backend in self.fns:
 return self.fns[backend](cfg, *arg)
 else:
 return self.fns["*"](cfg, *arg)
 
 
 def create_follower_db(follower_ident):
 for cfg in _configs_for_db_operation():
 log.info("CREATE database %s, URI %r", follower_ident, cfg.db.url)
 _create_db(cfg, cfg.db, follower_ident)
 
 
 def configure_follower(follower_ident):
 for cfg in config.Config.all_configs():
 _configure_follower(cfg, follower_ident)
 
 
 def setup_config(db_url, options, file_config, follower_ident):
 if follower_ident:
 db_url = _follower_url_from_main(db_url, follower_ident)
 db_opts = {}
 _update_db_opts(db_url, db_opts)
 eng = engines.testing_engine(db_url, db_opts)
 _post_configure_engine(db_url, eng, follower_ident)
 eng.connect().close()
 
 cfg = config.Config.register(eng, db_opts, options, file_config)
 if follower_ident:
 _configure_follower(cfg, follower_ident)
 return cfg
 
 
 def drop_follower_db(follower_ident):
 for cfg in _configs_for_db_operation():
 log.info("DROP database %s, URI %r", follower_ident, cfg.db.url)
 _drop_db(cfg, cfg.db, follower_ident)
 
 
 def _configs_for_db_operation():
 hosts = set()
 
 for cfg in config.Config.all_configs():
 cfg.db.dispose()
 
 for cfg in config.Config.all_configs():
 url = cfg.db.url
 backend = url.get_backend_name()
 host_conf = (backend, url.username, url.host, url.database)
 
 if host_conf not in hosts:
 yield cfg
 hosts.add(host_conf)
 
 for cfg in config.Config.all_configs():
 cfg.db.dispose()
 
 
 @register.init
 def _create_db(cfg, eng, ident):
 raise NotImplementedError("no DB creation routine for cfg: %s" % eng.url)
 
 
 @register.init
 def _drop_db(cfg, eng, ident):
 raise NotImplementedError("no DB drop routine for cfg: %s" % eng.url)
 
 
 @register.init
 def _update_db_opts(db_url, db_opts):
 pass
 
 
 @register.init
 def _configure_follower(cfg, ident):
 pass
 
 
 @register.init
 def _post_configure_engine(url, engine, follower_ident):
 pass
 
 
 @register.init
 def _follower_url_from_main(url, ident):
 url = sa_url.make_url(url)
 url.database = ident
 return url
 
 
 @_update_db_opts.for_db("mssql")
 def _mssql_update_db_opts(db_url, db_opts):
 db_opts["legacy_schema_aliasing"] = False
 
 
 @_follower_url_from_main.for_db("sqlite")
 def _sqlite_follower_url_from_main(url, ident):
 url = sa_url.make_url(url)
 if not url.database or url.database == ":memory:":
 return url
 else:
 return sa_url.make_url("sqlite:///%s.db" % ident)
 
 
 @_post_configure_engine.for_db("sqlite")
 def _sqlite_post_configure_engine(url, engine, follower_ident):
 from sqlalchemy import event
 
 @event.listens_for(engine, "connect")
 def connect(dbapi_connection, connection_record):
 # use file DBs in all cases, memory acts kind of strangely
 # as an attached
 if not follower_ident:
 dbapi_connection.execute(
 'ATTACH DATABASE "test_schema.db" AS test_schema'
 )
 else:
 dbapi_connection.execute(
 'ATTACH DATABASE "%s_test_schema.db" AS test_schema'
 % follower_ident
 )
 
 
 @_create_db.for_db("postgresql")
 def _pg_create_db(cfg, eng, ident):
 template_db = cfg.options.postgresql_templatedb
 
 with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
 try:
 _pg_drop_db(cfg, conn, ident)
 except Exception:
 pass
 if not template_db:
 template_db = conn.scalar("select current_database()")
 
 attempt = 0
 while True:
 try:
 conn.execute(
 "CREATE DATABASE %s TEMPLATE %s" % (ident, template_db)
 )
 except exc.OperationalError as err:
 attempt += 1
 if attempt >= 3:
 raise
 if "accessed by other users" in str(err):
 log.info(
 "Waiting to create %s, URI %r, "
 "template DB %s is in use sleeping for .5",
 ident,
 eng.url,
 template_db,
 )
 time.sleep(0.5)
 except:
 raise
 else:
 break
 
 
 @_create_db.for_db("mysql")
 def _mysql_create_db(cfg, eng, ident):
 with eng.connect() as conn:
 try:
 _mysql_drop_db(cfg, conn, ident)
 except Exception:
 pass
 
 conn.execute("CREATE DATABASE %s CHARACTER SET utf8mb4" % ident)
 conn.execute(
 "CREATE DATABASE %s_test_schema CHARACTER SET utf8mb4" % ident
 )
 conn.execute(
 "CREATE DATABASE %s_test_schema_2 CHARACTER SET utf8mb4" % ident
 )
 
 
 @_configure_follower.for_db("mysql")
 def _mysql_configure_follower(config, ident):
 config.test_schema = "%s_test_schema" % ident
 config.test_schema_2 = "%s_test_schema_2" % ident
 
 
 @_create_db.for_db("sqlite")
 def _sqlite_create_db(cfg, eng, ident):
 pass
 
 
 @_drop_db.for_db("postgresql")
 def _pg_drop_db(cfg, eng, ident):
 with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
 conn.execute(
 text(
 "select pg_terminate_backend(pid) from pg_stat_activity "
 "where usename=current_user and pid != pg_backend_pid() "
 "and datname=:dname"
 ),
 dname=ident,
 )
 conn.execute("DROP DATABASE %s" % ident)
 
 
 @_drop_db.for_db("sqlite")
 def _sqlite_drop_db(cfg, eng, ident):
 if ident:
 os.remove("%s_test_schema.db" % ident)
 else:
 os.remove("%s.db" % ident)
 
 
 @_drop_db.for_db("mysql")
 def _mysql_drop_db(cfg, eng, ident):
 with eng.connect() as conn:
 conn.execute("DROP DATABASE %s_test_schema" % ident)
 conn.execute("DROP DATABASE %s_test_schema_2" % ident)
 conn.execute("DROP DATABASE %s" % ident)
 
 
 @_create_db.for_db("oracle")
 def _oracle_create_db(cfg, eng, ident):
 # NOTE: make sure you've run "ALTER DATABASE default tablespace users" or
 # similar, so that the default tablespace is not "system"; reflection will
 # fail otherwise
 with eng.connect() as conn:
 conn.execute("create user %s identified by xe" % ident)
 conn.execute("create user %s_ts1 identified by xe" % ident)
 conn.execute("create user %s_ts2 identified by xe" % ident)
 conn.execute("grant dba to %s" % (ident,))
 conn.execute("grant unlimited tablespace to %s" % ident)
 conn.execute("grant unlimited tablespace to %s_ts1" % ident)
 conn.execute("grant unlimited tablespace to %s_ts2" % ident)
 
 
 @_configure_follower.for_db("oracle")
 def _oracle_configure_follower(config, ident):
 config.test_schema = "%s_ts1" % ident
 config.test_schema_2 = "%s_ts2" % ident
 
 
 def _ora_drop_ignore(conn, dbname):
 try:
 conn.execute("drop user %s cascade" % dbname)
 log.info("Reaped db: %s", dbname)
 return True
 except exc.DatabaseError as err:
 log.warning("couldn't drop db: %s", err)
 return False
 
 
 @_drop_db.for_db("oracle")
 def _oracle_drop_db(cfg, eng, ident):
 with eng.connect() as conn:
 # cx_Oracle seems to occasionally leak open connections when a large
 # suite it run, even if we confirm we have zero references to
 # connection objects.
 # while there is a "kill session" command in Oracle,
 # it unfortunately does not release the connection sufficiently.
 _ora_drop_ignore(conn, ident)
 _ora_drop_ignore(conn, "%s_ts1" % ident)
 _ora_drop_ignore(conn, "%s_ts2" % ident)
 
 
 @_update_db_opts.for_db("oracle")
 def _oracle_update_db_opts(db_url, db_opts):
 pass
 
 
 def reap_dbs(idents_file):
 log.info("Reaping databases...")
 
 urls = collections.defaultdict(set)
 idents = collections.defaultdict(set)
 
 with open(idents_file) as file_:
 for line in file_:
 line = line.strip()
 db_name, db_url = line.split(" ")
 url_obj = sa_url.make_url(db_url)
 url_key = (url_obj.get_backend_name(), url_obj.host)
 urls[url_key].add(db_url)
 idents[url_key].add(db_name)
 
 for url_key in urls:
 backend = url_key[0]
 url = list(urls[url_key])[0]
 ident = idents[url_key]
 if backend == "oracle":
 _reap_oracle_dbs(url, ident)
 elif backend == "mssql":
 _reap_mssql_dbs(url, ident)
 
 
 def _reap_oracle_dbs(url, idents):
 log.info("db reaper connecting to %r", url)
 eng = create_engine(url)
 with eng.connect() as conn:
 
 log.info("identifiers in file: %s", ", ".join(idents))
 
 to_reap = conn.execute(
 "select u.username from all_users u where username "
 "like 'TEST_%' and not exists (select username "
 "from v$session where username=u.username)"
 )
 all_names = {username.lower() for (username,) in to_reap}
 to_drop = set()
 for name in all_names:
 if name.endswith("_ts1") or name.endswith("_ts2"):
 continue
 elif name in idents:
 to_drop.add(name)
 if "%s_ts1" % name in all_names:
 to_drop.add("%s_ts1" % name)
 if "%s_ts2" % name in all_names:
 to_drop.add("%s_ts2" % name)
 
 dropped = total = 0
 for total, username in enumerate(to_drop, 1):
 if _ora_drop_ignore(conn, username):
 dropped += 1
 log.info(
 "Dropped %d out of %d stale databases detected", dropped, total
 )
 
 
 @_follower_url_from_main.for_db("oracle")
 def _oracle_follower_url_from_main(url, ident):
 url = sa_url.make_url(url)
 url.username = ident
 url.password = "xe"
 return url
 
 
 @_create_db.for_db("mssql")
 def _mssql_create_db(cfg, eng, ident):
 with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
 conn.execute("create database %s" % ident)
 conn.execute(
 "ALTER DATABASE %s SET ALLOW_SNAPSHOT_ISOLATION ON" % ident
 )
 conn.execute(
 "ALTER DATABASE %s SET READ_COMMITTED_SNAPSHOT ON" % ident
 )
 conn.execute("use %s" % ident)
 conn.execute("create schema test_schema")
 conn.execute("create schema test_schema_2")
 
 
 @_drop_db.for_db("mssql")
 def _mssql_drop_db(cfg, eng, ident):
 with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
 _mssql_drop_ignore(conn, ident)
 
 
 def _mssql_drop_ignore(conn, ident):
 try:
 # typically when this happens, we can't KILL the session anyway,
 # so let the cleanup process drop the DBs
 # for row in conn.execute(
 #     "select session_id from sys.dm_exec_sessions "
 #        "where database_id=db_id('%s')" % ident):
 #    log.info("killing SQL server sesssion %s", row['session_id'])
 #    conn.execute("kill %s" % row['session_id'])
 
 conn.execute("drop database %s" % ident)
 log.info("Reaped db: %s", ident)
 return True
 except exc.DatabaseError as err:
 log.warning("couldn't drop db: %s", err)
 return False
 
 
 def _reap_mssql_dbs(url, idents):
 log.info("db reaper connecting to %r", url)
 eng = create_engine(url)
 with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
 
 log.info("identifiers in file: %s", ", ".join(idents))
 
 to_reap = conn.execute(
 "select d.name from sys.databases as d where name "
 "like 'TEST_%' and not exists (select session_id "
 "from sys.dm_exec_sessions "
 "where database_id=d.database_id)"
 )
 all_names = {dbname.lower() for (dbname,) in to_reap}
 to_drop = set()
 for name in all_names:
 if name in idents:
 to_drop.add(name)
 
 dropped = total = 0
 for total, dbname in enumerate(to_drop, 1):
 if _mssql_drop_ignore(conn, dbname):
 dropped += 1
 log.info(
 "Dropped %d out of %d stale databases detected", dropped, total
 )
 
 |