Viewing file: utils.py (12.3 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license, see <https://www.imunify360.com/legal/eula> """ import logging import os import shlex import subprocess
from collections import defaultdict from datetime import datetime, timedelta from functools import lru_cache from typing import Optional
from defence360agent.contracts.config import ( choose_value_from_config, MalwareScanScheduleInterval as Interval, ) from defence360agent.contracts.license import LicenseCLN from defence360agent.subsys.panels.hosting_panel import HostingPanel from defence360agent.subsys.panels.plesk import Plesk from defence360agent.utils import async_lru_cache, atomic_rewrite
from imav.malwarelib.model import MalwareHit from imav.malwarelib.scan.queue_supervisor_sync import ( QueueSupervisorSync as ScanQueue, ) from imav.malwarelib.utils import user_list from imav.model.wordpress import WPSite
from imav.wordpress import ( WP_CLI_WRAPPER_PATH, )
from imav.wordpress.exception import PHPError
# Executable used by build_command_for_user() to run a command as a
# CageFS-enabled user.
CAGEFS_ENTER_PATH = "/usr/sbin/cagefs_enter_user"
# CageFS control utility; get_cagefs_enabled_users() invokes it with
# "--list-enabled".
CAGEFS_CTL_PATH = "/usr/sbin/cagefsctl"

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
@async_lru_cache(ttl=60)
async def get_domain_paths() -> dict[str, list[str]]:
    """
    Return a mapping of docroot -> list of domains served from it.

    The hosting panel reports docroots per domain; this inverts that
    relation so callers can look domains up by docroot. The result is
    cached via ``async_lru_cache(ttl=60)`` (presumably 60 seconds --
    confirm against the decorator's implementation).
    """
    domains_per_docroot = defaultdict(list)
    docroots_per_domain = await HostingPanel().get_domain_paths()

    # Flatten the panel's {domain: [docroot, ...]} view into pairs and
    # regroup them by docroot.
    pairs = (
        (root, name)
        for name, roots in docroots_per_domain.items()
        for root in roots
    )
    for root, name in pairs:
        domains_per_docroot[root].append(name)

    return domains_per_docroot
def wp_wrapper(php_path: str, docroot: str) -> list:
    """
    Build the leading part of a WP-CLI wrapper command line.

    Args:
        php_path: Path to the PHP binary passed to the wrapper
        docroot: Document root of the WordPress site

    Returns:
        list: ``[wrapper path, php_path, docroot]``; callers append the
        actual WP-CLI arguments.
    """
    wrapper_executable = str(WP_CLI_WRAPPER_PATH)
    return [wrapper_executable, php_path, docroot]
@lru_cache(maxsize=1)
def get_cagefs_enabled_users() -> set:
    """
    Return the set of usernames that have CageFS enabled.

    Runs ``cagefsctl --list-enabled`` and parses its output. An empty set
    is returned when the utility is missing, is not executable, or exits
    with a non-zero status. The result is memoized with ``lru_cache``;
    use clear_get_cagefs_enabled_users_cache() to force a refresh.
    """
    ctl_is_runnable = os.path.isfile(CAGEFS_CTL_PATH) and os.access(
        CAGEFS_CTL_PATH, os.X_OK
    )
    if not ctl_is_runnable:
        return set()

    completed = subprocess.run(
        [CAGEFS_CTL_PATH, "--list-enabled"], capture_output=True, text=True
    )
    if completed.returncode != 0:
        return set()

    # The first output line is a summary, not a username; everything after
    # it is treated as one username per line.
    _summary, *usernames = completed.stdout.strip().split("\n")
    return set(usernames)
def clear_get_cagefs_enabled_users_cache():
    """
    Clear the cache for get_cagefs_enabled_users.

    After this call the next get_cagefs_enabled_users() invocation
    recomputes its result instead of returning the memoized one.
    """
    get_cagefs_enabled_users.cache_clear()
def build_command_for_user(username: str, args: list) -> list:
    """
    Wrap a command line so that it is executed as the given user.

    Args:
        username: Name of the user the command should run as
        args: Command and its arguments, one list item per argument

    Returns:
        list: argv that enters the user's CageFS via ``cagefs_enter_user``
        when the user is CageFS-enabled and that executable is present
        and runnable; otherwise an ``su`` invocation that runs the
        shell-quoted command through /bin/bash.
    """
    use_cagefs = (
        username in get_cagefs_enabled_users()
        and os.path.isfile(CAGEFS_ENTER_PATH)
        and os.access(CAGEFS_ENTER_PATH, os.X_OK)
    )
    if use_cagefs:
        return [CAGEFS_ENTER_PATH, "--no-io-and-memory-limit", username, *args]

    # su takes the command as a single string, so quote the args for the
    # shell before handing them over.
    shell_command = shlex.join(args)
    return ["su", "-s", "/bin/bash", username, "-c", shell_command]
async def get_domains_for_docroot(
    docroot: str, domain_to_exclude: str
) -> list[str]:
    """
    List the domains that share a document root, leaving one domain out.

    The lookup goes through get_domain_paths(), so it does not depend on
    a particular hosting panel and benefits from that function's cache.

    Args:
        docroot: Document root to look up
        domain_to_exclude: Domain that must not appear in the result

    Returns:
        list[str]: Domains mapped to the docroot, minus the excluded one;
        empty when the docroot is unknown.
    """
    domains_by_docroot = await get_domain_paths()
    candidates = domains_by_docroot.get(docroot, [])
    return [name for name in candidates if name != domain_to_exclude]
async def get_php_binary_path(site: WPSite, username: str) -> Optional[str]:
    """
    Resolve the PHP binary configured for the given WordPress site.

    The site's own domain is checked first; if that yields nothing, the
    other domains sharing the site's docroot are tried in turn. A domain
    is only considered when its PHP info belongs to ``username``.

    Args:
        site: WordPress site whose PHP binary is needed
        username: Expected owner of the domain

    Returns:
        Path to the PHP binary of the first matching domain.

    Raises:
        PHPError: If no domain of the docroot resolves to a PHP binary.
    """
    from clcommon.cpapi import (
        get_domains_php_info,
        get_installed_php_versions,
    )

    php_info_by_domain = get_domains_php_info()
    installed_versions = get_installed_php_versions()

    def lookup_binary(domain: str) -> Optional[str]:
        info = php_info_by_domain.get(domain)
        # Ignore unknown domains and domains owned by somebody else.
        if not info or info.get("username") != username:
            return None

        wanted_identifier = info.get("display_version")
        if not wanted_identifier:
            return None

        # First installed version whose identifier matches wins.
        return next(
            (
                version.get("bin")
                for version in installed_versions
                if version.get("identifier") == wanted_identifier
            ),
            None,
        )

    # Preferred source: the site's main domain.
    if binary := lookup_binary(site.domain):
        return binary

    # Fallback: any other domain served from the same docroot.
    sibling_domains = await get_domains_for_docroot(
        site.docroot, domain_to_exclude=site.domain
    )
    for sibling in sibling_domains:
        if binary := lookup_binary(sibling):
            return binary

    raise PHPError(
        f"PHP binary was not identified for docroot: {site.docroot}, username:"
        f" {username}"
    )
def get_malware_history(username: str) -> list:
    """
    Return the malware hits recorded for the specified user.

    This is an equivalent of calling
    `imunify360-agent malware history list --user {username}`.
    """
    # malicious_list() returns a pair; only the second item (the hits)
    # is of interest here.
    _total, hits = MalwareHit.malicious_list(user=username)
    return hits
async def get_last_scan(sink, username: str) -> dict:
    """
    Return the most recent scan entry for the specified user.

    This is an equivalent of calling
    `imunify360-agent malware user list --user {username}`.

    Args:
        sink: Object handed to the scan queue constructor
        username: User whose last scan is requested

    Returns:
        dict: The entry with the latest ``scan_date``, or an empty dict
        when nothing is found for the user.
    """
    scan_queue = ScanQueue(sink)
    _, found = await user_list.fetch_user_list(
        scan_queue.get_scans_from_paths, match={username}
    )

    if found:
        # Newest first, so the head of the list is the last scan.
        newest_first = user_list.sort(found, "scan_date", desc=True)
        return newest_first[0]

    return {}
def calculate_next_scan_timestamp(
    interval, hour, day_of_month, day_of_week
) -> Optional[float]:
    """
    Calculate the next scan timestamp based on schedule configuration.

    Args:
        interval: Scan interval (DAY, WEEK, MONTH, or NONE)
        hour: Hour of day to run scan (0-23)
        day_of_month: Day of month to run scan (1-31)
        day_of_week: Day of week to run scan (0-6, where 0=Sunday)

    Returns:
        Timestamp of next scan, or None if interval is NONE

    Raises:
        ValueError: If interval is MONTH and day_of_month is outside 1-31
            (no month could ever satisfy it), or if ``hour`` is rejected
            by ``datetime.replace``.
    """
    # NOTE(review): utcnow() yields a naive datetime holding UTC wall-clock
    # time, while .timestamp() below interprets naive datetimes as local
    # time. The two agree only when the host runs in UTC -- confirm which
    # timezone the schedule is meant to use before changing this.
    today = datetime.utcnow()

    if interval == Interval.DAY:
        next_scan = today.replace(hour=hour, minute=0, second=0, microsecond=0)
        # Today's slot is already over: schedule for tomorrow.
        if today >= next_scan:
            next_scan += timedelta(days=1)
        return next_scan.timestamp()

    if interval == Interval.WEEK:
        # today.weekday() returns 0 for Monday, 6 for Sunday, but
        # day_of_week uses 0 for Sunday, 1 for Monday, ..., 6 for Saturday.
        # So we need to adjust the calculation.
        days_ahead = (day_of_week - (today.weekday() + 1) % 7 + 7) % 7
        # Scheduled weekday is today but the hour has passed: next week.
        if days_ahead == 0 and today.hour >= hour:
            days_ahead = 7
        next_scan_date = today + timedelta(days=days_ahead)
        return next_scan_date.replace(
            hour=hour, minute=0, second=0, microsecond=0
        ).timestamp()

    if interval == Interval.MONTH:
        from calendar import monthrange

        # Without this check a day above 31 would make the month search
        # below spin forever, since no month can accommodate it.
        if not 1 <= day_of_month <= 31:
            raise ValueError(
                f"day_of_month must be in 1..31, got {day_of_month!r}"
            )

        year, month = today.year, today.month

        # The scan for this month is considered done when the scheduled
        # day is behind us, or it is today and the scheduled hour passed.
        already_ran = today.day > day_of_month or (
            today.day == day_of_month and today.hour >= hour
        )
        # E.g. day 31 configured while the current month has 30 days.
        month_too_short = day_of_month > monthrange(year, month)[1]

        if already_ran or month_too_short:
            # Step forward month by month until one is long enough to
            # contain the configured day.
            while True:
                month += 1
                if month > 12:
                    year, month = year + 1, 1
                if day_of_month <= monthrange(year, month)[1]:
                    break

        next_scan_date = today.replace(
            year=year,
            month=month,
            day=day_of_month,
            hour=hour,
            minute=0,
            second=0,
            microsecond=0,
        )
        return next_scan_date.timestamp()

    # Any other interval (e.g. NONE): nothing is scheduled.
    return None
def prepare_scan_data(
    last_scan_time: float,
    next_scan_time: float,
    username: str,
    site: WPSite,
    malware_by_site: dict,
) -> dict:
    """
    Assemble the scan data payload for a WordPress site.

    Args:
        last_scan_time: Timestamp of the last scan
        next_scan_time: Timestamp of the next scheduled scan
        username: Username of the site owner
        site: WordPress site object
        malware_by_site: Dictionary mapping site docroots to their malware hits

    Returns:
        dict: JSON data ready to be written to scan_data.php, with keys:
            - lastScanTimestamp: Timestamp of the last scan
            - nextScanTimestamp: Timestamp of the next scheduled scan
            - username: Username of the site owner
            - malware: Malware hits for the site's docroot (empty list if none)
            - config: Selected config values grouped by section; an option
              whose lookup raises KeyError is reported as None
            - license: Result of LicenseCLN.license_info()
    """
    # (section, option) pairs exposed to the plugin.
    exposed_options = (
        ("MALWARE_SCANNING", "enable_scan_cpanel"),
        ("MALWARE_SCANNING", "default_action"),
        ("PROACTIVE_DEFENCE", "blamer"),
    )

    config_items = {}
    for section, option in exposed_options:
        section_values = config_items.setdefault(section, {})
        try:
            value, _ = choose_value_from_config(
                section,
                option,
                username=username,
            )
        except KeyError:
            value = None
        section_values[option] = value

    site_malware = malware_by_site.get(site.docroot, [])
    return {
        "lastScanTimestamp": last_scan_time,
        "nextScanTimestamp": next_scan_time,
        "username": username,
        "malware": site_malware,
        "config": config_items,
        "license": LicenseCLN.license_info(),
    }
def write_plugin_data_file_atomically(
    file_path, content: str, uid: int, gid: int
) -> None:
    """
    Write a plugin data file through atomic_rewrite, creating it first
    if it does not exist yet.

    Args:
        file_path: Path to the file to write; must provide ``exists()``
            and ``touch()`` (presumably a pathlib.Path -- confirm with
            callers)
        content: Content to write to the file
        uid: User ID for file ownership
        gid: Group ID for file ownership
    """
    # Make sure the target exists before it is rewritten.
    if not file_path.exists():
        file_path.touch()

    # Plesk gets group-readable files; every other panel owner-only.
    if HostingPanel().NAME == Plesk.NAME:
        mode = 0o440
    else:
        mode = 0o400

    atomic_rewrite(
        file_path,
        content,
        backup=False,
        uid=uid,
        gid=gid,
        permissions=mode,
    )
|