| Viewing file:  server_id.py (5.14 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
# coding=utf-8#
 # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
 #
 # Licensed under CLOUD LINUX LICENSE AGREEMENT
 # http://cloudlinux.com/docs/LICENSE.TXT
 from uuid import uuid4
 
 from clcommon.clconfpars import change_settings, load
 from lvestats.lib import config, lveinfolib
 from lvestats.orm.history import history, history_x60
 from lvestats.orm.history_gov import history_gov
 from lvestats.orm.incident import incident
 from lvestats.orm.servers import servers
 from lvestats.orm.user import user
 
 
 class UpdateOrCreateServerID(object):
 """Change old-style server ID 'localhost' to auto-generated uuid
 User can set his own server ID
 """
 
 restricted_server_id_list = ('localhost',)
 server_id_length = 10
 
 def __init__(self, engine, config_path=None):
 """
 :type engine: sqlalchemy.engine.base.Engine
 :type config_path: Union[str, None]
 """
 self.engine = engine
 self.config_path = config_path or config.GLOBAL_CONFIG_LOCATION
 
 def _get_current_server_id_cfg(self):
 """
 :rtype: Union[None, str]
 """
 try:
 cfg = load(path=self.config_path)
 return cfg.get('server_id')
 except Exception as e:
 print(f'Error: {e}')
 return None
 
 def _set_server_id_cfg(self, server_id):
 """Sets option 'server_id' in config file
 :type server_id: str
 :rtype: Union[None, str]
 """
 try:
 change_settings({'server_id': server_id}, self.config_path)
 return server_id
 except Exception as e:
 print(f'Error: {e}')
 return None
 
 def _update_server_id_table(self, conn, table, current_server_id, new_server_id):
 """
 updates server_id in one of the tables
 :type conn: sqlalchemy.engine.base.Connection
 :type table: Union[history, history_gov, incident, user, history_x60]
 :type current_server_id: str
 :type new_server_id: str
 """
 sql_update = table.__table__.update().where(
 table.server_id == current_server_id
 )
 conn.execute(sql_update.values(
 {'server_id': new_server_id}
 ))
 
 def _update_server_id_db(self, current_server_id, new_server_id):
 """
 updates server_id for all tables in the database
 :type current_server_id: str
 :type new_server_id: str
 :rtype: Union[None, str]
 """
 sql_tables = [history, history_gov, incident, user, history_x60]
 lve_version = lveinfolib.get_lve_version(self.engine, current_server_id)
 conn = self.engine.connect()
 tx = conn.begin()
 try:
 for table in sql_tables:
 self._update_server_id_table(conn, table, current_server_id, new_server_id)
 
 # update servers table, where server_id is a primary key
 conn.execute(servers.__table__.insert().values(
 {'server_id': new_server_id, "lve_version": lve_version}
 ))
 conn.execute(servers.__table__.delete().where(servers.server_id == current_server_id))
 except Exception as e:
 tx.rollback()
 conn.close()
 print(f'Error: {e}')
 return None
 else:
 tx.commit()
 conn.close()
 return new_server_id
 
 def _resolve_localhost(self, prompt=True, current_server_id=None):
 """
 :type current_server_id: Union[None, str]
 :type prompt: bool
 :rtype: Union[None, str]
 """
 server_id = str(uuid4())[:self.server_id_length]
 if prompt:
 res = input(f"Enter new server ID ['{server_id}']: ")
 print(res)
 if res:
 server_id = res.strip()
 if server_id in self.restricted_server_id_list:
 print(f"Server ID cannot be '{server_id}'")
 return None
 if server_id == current_server_id:
 print(f"Server ID is already set to '{server_id}'")
 return server_id
 if len(server_id) > 255:
 print("Server ID length should be less than 255")
 return None
 cur_server_id = self._get_current_server_id_cfg()
 if cur_server_id is not None and not prompt and \
 cur_server_id not in self.restricted_server_id_list:
 return cur_server_id
 if self._update_server_id_db(current_server_id or cur_server_id or 'localhost', server_id):
 return self._set_server_id_cfg(server_id)
 return server_id
 
 def prompt(self):
 """
 :rtype: Union[None, str]
 """
 cur_cfg_server_id = self._get_current_server_id_cfg()
 # if server ID = 'localhost'
 if isinstance(cur_cfg_server_id, str) and \
 cur_cfg_server_id.strip() in self.restricted_server_id_list:
 print(f"Server ID cannot be '{cur_cfg_server_id}'")
 return self._resolve_localhost(current_server_id=cur_cfg_server_id)
 
 def auto(self):
 """
 :rtype: Union[None, str]
 """
 return self._resolve_localhost(prompt=False)
 
 
 
 |