Viewing file: cli.py (10.73 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license, see <https://www.imunify360.com/legal/eula> """ import logging import pwd import re from pathlib import Path
from distutils.version import StrictVersion from defence360agent.utils import ( check_run, CheckRunError, async_lru_cache, ) from imav.wordpress import PLUGIN_PATH, PLUGIN_SLUG from imav.wordpress.utils import ( build_command_for_user, get_php_binary_path, wp_wrapper, ) from defence360agent.sentry import log_message from imav.model.wordpress import WPSite
logger = logging.getLogger(__name__)
def _validate_semver(version_str: str) -> bool: """Validate if a string is a valid semantic version.""" # Handle None and non-string inputs if not isinstance(version_str, str): return False
# Trim the string and return False if empty trimmed_str = version_str.strip() if not trimmed_str: return False
try: StrictVersion(trimmed_str) return True except ValueError: return False
def _extract_version_from_output(output: str) -> str: """ Extract version from WP CLI output, trying both first and last parts.
Args: output: The raw output from WP CLI
Returns: The extracted version string or None if no valid version found """ if not output: return None
# Split the output into parts parts = output.split() if not parts: return None
# Try the first part if len(parts) > 0: first_part = parts[0].strip() if _validate_semver(first_part): return first_part
# Try the last part if len(parts) > 1: last_part = parts[-1].strip() if _validate_semver(last_part): return last_part
# If neither first nor last part is valid semver, log to sentry log_message( "Failed to extract valid semver version from WP CLI output. Output:" " '{output}'", format_args={"output": output}, level="warning", component="wordpress", fingerprint="wp-plugin-version-extraction-failed", )
# Return None when no valid semver is found return None
async def _parse_version_from_plugin_file(site: WPSite) -> str:
    """
    Parse the version of imunify-security plugin by reading the main plugin file.

    The file ``plugins/imunify-security/imunify-security.php`` under the
    site's content directory is scanned line by line for the first
    ``* Version: <number>`` header.

    Args:
        site: WordPress site object containing docroot path

    Returns:
        str: Plugin version, or None if the file does not exist, cannot be
        read, has no version header, or the header value is not a valid
        version.
    """
    content_dir = await get_content_dir(site)
    plugin_file = (
        content_dir / "plugins" / "imunify-security" / "imunify-security.php"
    )

    if not plugin_file.exists():
        return None

    version_pattern = re.compile(r"\* Version:\s*([0-9.]+)")

    try:
        # The header being searched for is plain ASCII. Decode explicitly
        # as UTF-8 and replace undecodable bytes so that the result does
        # not depend on the process locale and a stray non-UTF-8 byte
        # elsewhere in the file does not abort the scan.
        with open(plugin_file, encoding="utf-8", errors="replace") as f:
            for line in f:
                match = version_pattern.search(line)
                if match:
                    # Only the first version header is considered; an
                    # invalid value there means "no version".
                    version = match.group(1)
                    return version if _validate_semver(version) else None
    except Exception as e:
        # Best effort: any read problem is logged and reported as
        # "version unknown" so the caller can use another source.
        logger.error(
            "Failed to read plugin file to determine version number %s: %s",
            plugin_file,
            e,
        )
        return None

    return None
async def plugin_install(site: WPSite):
    """
    Install the Imunify Security WordPress plugin on given WordPress site.

    Runs ``wp plugin install <PLUGIN_PATH> --activate --force`` as the
    user that owns the site (resolved from ``site.uid``).
    """
    owner = pwd.getpwuid(site.uid).pw_name
    php_binary = await get_php_binary_path(site, owner)

    command = build_command_for_user(
        owner,
        [
            *wp_wrapper(php_binary, site.docroot),
            "plugin",
            "install",
            str(PLUGIN_PATH),
            "--activate",
            "--force",
        ],
    )
    logger.info(f"Installing wp plugin {command}")

    await check_run(command)
async def plugin_update(site: WPSite):
    """
    Update the Imunify Security WordPress plugin on given WordPress site.

    For now an update is a forced re-install: it runs
    ``wp plugin install <PLUGIN_PATH> --activate --force`` as the user
    that owns the site, exactly like installation does.
    """
    owner = pwd.getpwuid(site.uid).pw_name
    php_binary = await get_php_binary_path(site, owner)

    command = build_command_for_user(
        owner,
        [
            *wp_wrapper(php_binary, site.docroot),
            "plugin",
            "install",
            str(PLUGIN_PATH),
            "--activate",
            "--force",
        ],
    )
    logger.info(f"Updating wp plugin {command}")

    await check_run(command)
async def plugin_uninstall(site: WPSite):
    """
    Uninstall the imunify-security wp plugin from given wp site.

    Runs ``wp plugin uninstall <PLUGIN_SLUG> --deactivate`` as the user
    that owns the site (resolved from ``site.uid``).
    """
    owner = pwd.getpwuid(site.uid).pw_name
    php_binary = await get_php_binary_path(site, owner)

    command = build_command_for_user(
        owner,
        [
            *wp_wrapper(php_binary, site.docroot),
            "plugin",
            "uninstall",
            PLUGIN_SLUG,
            "--deactivate",
        ],
    )
    logger.info(f"Uninstalling wp plugin {command}")

    await check_run(command)
async def _get_plugin_version(site: WPSite):
    """
    Ask WP CLI for the installed imunify-security plugin version.

    Runs ``wp plugin get <PLUGIN_SLUG> --field=version`` as the user that
    owns the site, decodes the output as UTF-8 and extracts the version
    from it.

    Returns:
        The extracted version, or None if the command fails, its output
        cannot be decoded, or no valid version is found in it.
    """
    owner = pwd.getpwuid(site.uid).pw_name
    php_binary = await get_php_binary_path(site, owner)
    command = build_command_for_user(
        owner,
        [
            *wp_wrapper(php_binary, site.docroot),
            "plugin",
            "get",
            PLUGIN_SLUG,
            "--field=version",
        ],
    )

    logger.info(f"Getting wp plugin version {command}")

    try:
        raw_output = await check_run(command)
        return _extract_version_from_output(raw_output.decode("utf-8").strip())
    except CheckRunError as err:
        logger.error(
            "Failed to get wp plugin version. Return code: %s",
            err.returncode,
        )
    except UnicodeDecodeError as err:
        logger.error("Failed to decode wp plugin version output: %s", err)

    # Reached only after one of the handled failures above.
    return None
async def get_plugin_version(site: WPSite):
    """
    Get the version of the imunify-security wp plugin installed on given
    WordPress site.

    The main plugin file is consulted first; WP CLI is used only when the
    file yields no version or reading it raises.
    """
    try:
        file_version = await _parse_version_from_plugin_file(site)
    except Exception as err:
        # Any failure here is non-fatal: WP CLI is tried next.
        logger.warning("Failed to parse version from plugin file: %s", err)
    else:
        if file_version:
            return file_version

    logger.info("Plugin version not found in file, trying WP CLI")
    return await _get_plugin_version(site)
async def is_plugin_installed(site: WPSite):
    """
    Check if the imunify-security wp plugin is installed on given
    WordPress site.

    Runs ``wp plugin is-installed <PLUGIN_SLUG>`` as the user that owns
    the site.

    Returns:
        True when the command exits with code 0; False when it raises
        ``CheckRunError`` (plugin not installed, or the command failed).
    """
    owner = pwd.getpwuid(site.uid).pw_name
    php_binary = await get_php_binary_path(site, owner)
    command = build_command_for_user(
        owner,
        [
            *wp_wrapper(php_binary, site.docroot),
            "plugin",
            "is-installed",
            PLUGIN_SLUG,
        ],
    )

    logger.info(f"Checking if wp plugin is installed {command}")

    try:
        await check_run(command)
    except CheckRunError:
        # Non-zero exit code: not installed, or WP CLI itself failed.
        return False
    else:
        return True
async def is_wordpress_installed(site: WPSite):
    """
    Check if WordPress is installed and given site is accessible using
    WP CLI.

    Runs ``wp core is-installed`` as the user that owns the site.

    Returns:
        True when the command exits with code 0; False when it raises
        ``CheckRunError`` (WordPress not installed, or the command failed).
    """
    owner = pwd.getpwuid(site.uid).pw_name
    php_binary = await get_php_binary_path(site, owner)
    command = build_command_for_user(
        owner,
        [
            *wp_wrapper(php_binary, site.docroot),
            "core",
            "is-installed",
        ],
    )

    logger.info(f"Checking if WordPress is installed {command}")

    try:
        await check_run(command)
    except CheckRunError:
        # Non-zero exit code: not installed, or WP CLI itself failed.
        return False
    else:
        return True
async def _get_content_directory(site: WPSite):
    """
    Ask WP CLI for the content directory of the WordPress site.

    Evaluates ``echo WP_CONTENT_DIR;`` through ``wp eval`` as the user
    that owns the site. Intended only for sites where the default
    wp-content directory does not exist.

    Returns:
        The command output decoded as UTF-8 and stripped, or None if the
        command fails or its output cannot be decoded.
    """
    owner = pwd.getpwuid(site.uid).pw_name
    php_binary = await get_php_binary_path(site, owner)
    command = build_command_for_user(
        owner,
        [
            *wp_wrapper(php_binary, site.docroot),
            "eval",
            "echo WP_CONTENT_DIR;",
        ],
    )

    logger.info(f"Getting content directory {command}")

    try:
        raw_output = await check_run(command)
        return raw_output.decode("utf-8").strip()
    except CheckRunError as err:
        logger.error(
            "Failed to get content directory. Return code: %s",
            err.returncode,
        )
    except UnicodeDecodeError as err:
        logger.error("Failed to decode content directory output: %s", err)

    # Reached only after one of the handled failures above.
    return None
@async_lru_cache(maxsize=100)
async def get_content_dir(site: WPSite):
    """
    Get the WordPress content directory for the given WordPress site.

    ``<docroot>/wp-content`` is used when it exists and is a directory.
    Otherwise WP CLI is asked for the value of ``WP_CONTENT_DIR``; if that
    yields nothing, the default path is returned anyway.

    Returns:
        Path: The WordPress content directory path
    """
    default_dir = Path(site.docroot) / "wp-content"

    # Common case: the standard layout is present, no WP CLI call needed.
    if default_dir.exists() and default_dir.is_dir():
        return default_dir

    reported_dir = await _get_content_directory(site)
    return Path(reported_dir) if reported_dir else default_dir
def clear_get_content_dir_cache():
    """
    Drop every cached result of ``get_content_dir``.

    Delegates to the ``cache_clear()`` method attached to the decorated
    function.
    """
    get_content_dir.cache_clear()
async def get_data_dir(site: WPSite):
    """
    Get the Imunify Security data directory for the given WordPress site.

    Returns:
        Path: ``imunify-security`` inside the site's content directory;
        ``<docroot>/wp-content`` is used as the content directory when
        ``get_content_dir`` returns a falsy value.
    """
    base_dir = await get_content_dir(site) or Path(site.docroot) / "wp-content"
    return Path(base_dir) / "imunify-security"
|