| Viewing file:  env.py (7.58 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
#!coding: utf-8
 import os
 import shutil
 import textwrap
 
 from ..util.compat import u
 from ..script import Script, ScriptDirectory
 from .. import util
 from . import engines
 from . import provision
 
 
 def _get_staging_directory():
 if provision.FOLLOWER_IDENT:
 return "scratch_%s" % provision.FOLLOWER_IDENT
 else:
 return 'scratch'
 
 
 def staging_env(create=True, template="generic", sourceless=False):
 from alembic import command, script
 cfg = _testing_config()
 if create:
 path = os.path.join(_get_staging_directory(), 'scripts')
 if os.path.exists(path):
 shutil.rmtree(path)
 command.init(cfg, path, template=template)
 if sourceless:
 try:
 # do an import so that a .pyc/.pyo is generated.
 util.load_python_file(path, 'env.py')
 except AttributeError:
 # we don't have the migration context set up yet
 # so running the .env py throws this exception.
 # theoretically we could be using py_compiler here to
 # generate .pyc/.pyo without importing but not really
 # worth it.
 pass
 make_sourceless(os.path.join(path, "env.py"))
 
 sc = script.ScriptDirectory.from_config(cfg)
 return sc
 
 
 def clear_staging_env():
 shutil.rmtree(_get_staging_directory(), True)
 
 
 def script_file_fixture(txt):
 dir_ = os.path.join(_get_staging_directory(), 'scripts')
 path = os.path.join(dir_, "script.py.mako")
 with open(path, 'w') as f:
 f.write(txt)
 
 
 def env_file_fixture(txt):
 dir_ = os.path.join(_get_staging_directory(), 'scripts')
 txt = """
 from alembic import context
 
 config = context.config
 """ + txt
 
 path = os.path.join(dir_, "env.py")
 pyc_path = util.pyc_file_from_path(path)
 if os.access(pyc_path, os.F_OK):
 os.unlink(pyc_path)
 
 with open(path, 'w') as f:
 f.write(txt)
 
 
 def _sqlite_file_db(tempname="foo.db"):
 dir_ = os.path.join(_get_staging_directory(), 'scripts')
 url = "sqlite:///%s/%s" % (dir_, tempname)
 return engines.testing_engine(url=url)
 
 
 def _sqlite_testing_config(sourceless=False):
 dir_ = os.path.join(_get_staging_directory(), 'scripts')
 url = "sqlite:///%s/foo.db" % dir_
 
 return _write_config_file("""
 [alembic]
 script_location = %s
 sqlalchemy.url = %s
 sourceless = %s
 
 [loggers]
 keys = root
 
 [handlers]
 keys = console
 
 [logger_root]
 level = WARN
 handlers = console
 qualname =
 
 [handler_console]
 class = StreamHandler
 args = (sys.stderr,)
 level = NOTSET
 formatter = generic
 
 [formatters]
 keys = generic
 
 [formatter_generic]
 format = %%(levelname)-5.5s [%%(name)s] %%(message)s
 datefmt = %%H:%%M:%%S
 """ % (dir_, url, "true" if sourceless else "false"))
 
 
 
 
 def _multi_dir_testing_config(sourceless=False):
 dir_ = os.path.join(_get_staging_directory(), 'scripts')
 url = "sqlite:///%s/foo.db" % dir_
 
 return _write_config_file("""
 [alembic]
 script_location = %s
 sqlalchemy.url = %s
 sourceless = %s
 version_locations = %%(here)s/model1/ %%(here)s/model2/ %%(here)s/model3/
 
 [loggers]
 keys = root
 
 [handlers]
 keys = console
 
 [logger_root]
 level = WARN
 handlers = console
 qualname =
 
 [handler_console]
 class = StreamHandler
 args = (sys.stderr,)
 level = NOTSET
 formatter = generic
 
 [formatters]
 keys = generic
 
 [formatter_generic]
 format = %%(levelname)-5.5s [%%(name)s] %%(message)s
 datefmt = %%H:%%M:%%S
 """ % (dir_, url, "true" if sourceless else "false"))
 
 
 def _no_sql_testing_config(dialect="postgresql", directives=""):
 """use a postgresql url with no host so that
 connections guaranteed to fail"""
 dir_ = os.path.join(_get_staging_directory(), 'scripts')
 return _write_config_file("""
 [alembic]
 script_location = %s
 sqlalchemy.url = %s://
 %s
 
 [loggers]
 keys = root
 
 [handlers]
 keys = console
 
 [logger_root]
 level = WARN
 handlers = console
 qualname =
 
 [handler_console]
 class = StreamHandler
 args = (sys.stderr,)
 level = NOTSET
 formatter = generic
 
 [formatters]
 keys = generic
 
 [formatter_generic]
 format = %%(levelname)-5.5s [%%(name)s] %%(message)s
 datefmt = %%H:%%M:%%S
 
 """ % (dir_, dialect, directives))
 
 
 def _write_config_file(text):
 cfg = _testing_config()
 with open(cfg.config_file_name, 'w') as f:
 f.write(text)
 return cfg
 
 
 def _testing_config():
 from alembic.config import Config
 if not os.access(_get_staging_directory(), os.F_OK):
 os.mkdir(_get_staging_directory())
 return Config(os.path.join(_get_staging_directory(), 'test_alembic.ini'))
 
 
 def write_script(
 scriptdir, rev_id, content, encoding='ascii', sourceless=False):
 old = scriptdir.revision_map.get_revision(rev_id)
 path = old.path
 
 content = textwrap.dedent(content)
 if encoding:
 content = content.encode(encoding)
 with open(path, 'wb') as fp:
 fp.write(content)
 pyc_path = util.pyc_file_from_path(path)
 if os.access(pyc_path, os.F_OK):
 os.unlink(pyc_path)
 script = Script._from_path(scriptdir, path)
 old = scriptdir.revision_map.get_revision(script.revision)
 if old.down_revision != script.down_revision:
 raise Exception("Can't change down_revision "
 "on a refresh operation.")
 scriptdir.revision_map.add_revision(script, _replace=True)
 
 if sourceless:
 make_sourceless(path)
 
 
 def make_sourceless(path):
 # note that if -O is set, you'd see pyo files here,
 # the pyc util function looks at sys.flags.optimize to handle this
 pyc_path = util.pyc_file_from_path(path)
 assert os.access(pyc_path, os.F_OK)
 
 # look for a non-pep3147 path here.
 # if not present, need to copy from __pycache__
 simple_pyc_path = util.simple_pyc_file_from_path(path)
 
 if not os.access(simple_pyc_path, os.F_OK):
 shutil.copyfile(pyc_path, simple_pyc_path)
 os.unlink(path)
 
 
 def three_rev_fixture(cfg):
 a = util.rev_id()
 b = util.rev_id()
 c = util.rev_id()
 
 script = ScriptDirectory.from_config(cfg)
 script.generate_revision(a, "revision a", refresh=True)
 write_script(script, a, """\
 "Rev A"
 revision = '%s'
 down_revision = None
 
 from alembic import op
 
 def upgrade():
 op.execute("CREATE STEP 1")
 
 def downgrade():
 op.execute("DROP STEP 1")
 
 """ % a)
 
 script.generate_revision(b, "revision b", refresh=True)
 write_script(script, b, u("""# coding: utf-8
 "Rev B, méil"
 revision = '%s'
 down_revision = '%s'
 
 from alembic import op
 
 def upgrade():
 op.execute("CREATE STEP 2")
 
 def downgrade():
 op.execute("DROP STEP 2")
 
 """) % (b, a), encoding="utf-8")
 
 script.generate_revision(c, "revision c", refresh=True)
 write_script(script, c, """\
 "Rev C"
 revision = '%s'
 down_revision = '%s'
 
 from alembic import op
 
 def upgrade():
 op.execute("CREATE STEP 3")
 
 def downgrade():
 op.execute("DROP STEP 3")
 
 """ % (c, b))
 return a, b, c
 
 
 def _multidb_testing_config(engines):
 """alembic.ini fixture to work exactly with the 'multidb' template"""
 
 dir_ = os.path.join(_get_staging_directory(), 'scripts')
 
 databases = ", ".join(
 engines.keys()
 )
 engines = "\n\n".join(
 "[%s]\n"
 "sqlalchemy.url = %s" % (key, value.url)
 for key, value in engines.items()
 )
 
 return _write_config_file("""
 [alembic]
 script_location = %s
 sourceless = false
 
 databases = %s
 
 %s
 [loggers]
 keys = root
 
 [handlers]
 keys = console
 
 [logger_root]
 level = WARN
 handlers = console
 qualname =
 
 [handler_console]
 class = StreamHandler
 args = (sys.stderr,)
 level = NOTSET
 formatter = generic
 
 [formatters]
 keys = generic
 
 [formatter_generic]
 format = %%(levelname)-5.5s [%%(name)s] %%(message)s
 datefmt = %%H:%%M:%%S
 """ % (dir_, databases, engines)
 )
 
 |