Viewing file: apache_processor.py (11.65 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
# -*- coding: utf-8 -*- # # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2024 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT
import logging import os import subprocess import shutil from typing import Dict, List, Optional, Any
class ApacheProcessor: """ Collects Apache configuration and module information. """
def __init__(self, logger: logging.Logger): # General Apache configuration directories (exclude user-specific subdirs) self.apache_config_locations = [ '/etc/apache2/conf/', '/etc/apache2/conf-available/', '/etc/apache2/conf.d/', # Will be filtered to exclude userdata subdirs '/etc/apache2/conf.modules.d/', ]
# User-specific configuration base paths (Apache versions 2.2 and 2.4) self.user_config_base_paths = [ '/etc/apache2/conf.d/userdata/ssl/2_4', # SSL configs for Apache 2.4 '/etc/apache2/conf.d/userdata/std/2_4', # Standard configs for Apache 2.4 '/etc/apache2/conf.d/userdata/ssl/2_2', # SSL configs for Apache 2.2 (legacy) '/etc/apache2/conf.d/userdata/std/2_2', # Standard configs for Apache 2.2 (legacy) ]
self.logger = logger
self._apache_binary_cache = None self._apache_version_cache = None self._apache_modules_cache = None
def _find_apache_binary(self) -> Optional[str]: """ Find Apache binary executable using which command with caching. """ if self._apache_binary_cache is not None: return self._apache_binary_cache
error = None try: for binary_name in ['httpd', 'apache2']: binary_path = shutil.which(binary_name) if binary_path and os.access(binary_path, os.X_OK): self.logger.debug("Found Apache binary: %s", binary_path) self._apache_binary_cache = binary_path return binary_path except Exception as e: error = e
self.logger.debug("Apache binary not found in PATH %s", error) self._apache_binary_cache = None return None
def _get_apache_version(self) -> str: """ Get Apache version information with caching. """ if self._apache_version_cache is not None: return self._apache_version_cache
apache_binary = self._find_apache_binary() if not apache_binary: self._apache_version_cache = "Apache binary not found" return self._apache_version_cache
try: result = subprocess.run( [apache_binary, '-v'], capture_output=True, text=True, timeout=10, check=False )
if result.returncode == 0: # Extract just the version line for cleaner metadata version_output = result.stdout.strip() if version_output: # Get first line which contains version info first_line = version_output.split('\n')[0] self._apache_version_cache = first_line else: self._apache_version_cache = "Version info empty" else: self._apache_version_cache = f"Error getting version: {result.stderr.strip()}"
except subprocess.TimeoutExpired: self._apache_version_cache = "Version check timed out" except Exception as e: self._apache_version_cache = f"Version check failed: {e}"
return self._apache_version_cache
def _get_apache_modules(self) -> List[str]: """ Get Apache modules using httpd -M command with caching.
Returns: List of module names (e.g., ['core_module', 'so_module', 'http_module', ...]) """ # Return cached modules if available if self._apache_modules_cache is not None: return self._apache_modules_cache
apache_binary = self._find_apache_binary() if not apache_binary: self.logger.debug("Apache binary not found, cannot get modules") self._apache_modules_cache = [] return self._apache_modules_cache
try: result = subprocess.run( [apache_binary, '-M'], capture_output=True, text=True, timeout=10, check=False )
if result.returncode == 0: modules = [] lines = result.stdout.split('\n')
# Skip first line 'Loaded Modules:' and process module lines for line in lines[1:]: if not line.strip(): continue
# Parse line like: "core_module (static)" or "so_module (shared)" try: module_name = line.strip().split(' ')[0] if module_name: modules.append(module_name) except (IndexError, AttributeError): continue # Skip malformed lines
self.logger.debug("Found %d Apache modules via %s -M", len(modules), apache_binary) self._apache_modules_cache = modules return modules
self.logger.debug("Apache modules command failed: %s", result.stderr.strip()) self._apache_modules_cache = [] return self._apache_modules_cache
except subprocess.TimeoutExpired: self.logger.error("[WEBSITE-COLLECTOR] Apache modules command timed out") self._apache_modules_cache = [] return self._apache_modules_cache except Exception as e: self.logger.error("[WEBSITE-COLLECTOR] Failed to get Apache modules: %s", e) self._apache_modules_cache = [] return self._apache_modules_cache
def get_apache_system_info(self) -> Dict[str, Any]: """ Get Apache system information for export as separate record.
Returns: Dict with system information to be exported as single record """ modules = self._get_apache_modules() version = self._get_apache_version()
system_info = { 'apache_version': version, 'loaded_modules': modules, }
return system_info
def _get_user_config_locations(self, usernames: List[str]) -> List[str]: """ Get Apache configuration directories for specific users only.
Args: usernames: List of usernames to get configurations for
Returns: List of directories containing user-specific Apache configurations """ user_locations = []
for username in usernames: for base_path in self.user_config_base_paths: user_path = os.path.join(base_path, username) if os.path.isdir(user_path): user_locations.append(user_path) self.logger.debug("Found user config directory: %s", user_path)
return user_locations
def collect_user_specific_config_paths(self, usernames: List[str]) -> Dict[str, Any]: """ Collect paths to user-specific Apache configuration files.
Args: usernames: List of usernames to collect configs for
Returns: Dict with collected config file paths as simple list """ config_data = { 'config_paths': [] }
if not usernames: return config_data
try: user_locations = self._get_user_config_locations(usernames) self.logger.debug("Scanning %d user-specific locations for %d users", len(user_locations), len(usernames))
for location in user_locations: if os.path.isdir(location): self._collect_config_paths_from_directory(location, config_data, exclude_user_dirs=False)
except Exception as e: self.logger.error("[WEBSITE-COLLECTOR] Error in collect_user_specific_config_paths: %s", e)
self.logger.debug("Found %d user-specific Apache config files", len(config_data['config_paths']))
return config_data
def collect_global_config_paths(self) -> Dict[str, Any]: """ Collect paths to general Apache configuration files (non-user-specific).
Returns: Dict with collected config file paths as simple list """ config_data = { 'config_paths': [] }
try: for location in self.apache_config_locations: if os.path.isdir(location): try: # For conf.d directory, exclude userdata subdirs to avoid collecting user-specific configs exclude_users = location.rstrip('/').endswith('/conf.d') self._collect_config_paths_from_directory(location, config_data, exclude_user_dirs=exclude_users) self.logger.debug("Collected config paths from %s (exclude_user_dirs=%s)", location, exclude_users) except Exception as e: self.logger.error("[WEBSITE-COLLECTOR] Error scanning config directory %s: %s", location, e)
self.logger.debug("Found %d general Apache config files", len(config_data['config_paths']))
except Exception as e: self.logger.error("[WEBSITE-COLLECTOR] Error in collect_global_config_paths: %s", e)
return config_data
def _collect_config_paths_from_directory(self, location: str, config_data: Dict[str, Any], exclude_user_dirs: bool = False) -> None: """ Helper method to collect configuration file paths from a specific directory.
Args: location: Directory path to scan for .conf files config_data: Dictionary with 'config_paths' list to store collected paths exclude_user_dirs: If True, skip userdata subdirectories to avoid collecting user-specific configs """ max_depth = 10 # Reasonable limit for Apache configs for root, dirs, files in os.walk(location): # Calculate current depth relative to base location depth = root[len(location):].count(os.sep) if depth >= max_depth: dirs[:] = [] # Don't descend further continue
# Exclude userdata directories when collecting general configs if exclude_user_dirs and 'userdata' in dirs: dirs.remove('userdata') self.logger.debug("Excluded userdata directory from general config collection in %s", root)
for filename in files: if filename.endswith('.conf'): file_path = os.path.join(root, filename) # Check if file exists (including symlinks) and is readable if (os.path.exists(file_path) and (os.path.isfile(file_path) or os.path.islink(file_path)) and os.access(file_path, os.R_OK)):
# Add to simple list - no grouping by directory needed config_data['config_paths'].append({ 'file_path': file_path, 'is_symlink': os.path.islink(file_path) }) else: self.logger.debug("Cannot access config file: %s", file_path)
|