Viewing file: schedule_watcher.py (8.41 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license, see <https://www.imunify360.com/legal/eula> """ from defence360agent.subsys.persistent_state import register_lock_file from defence360agent.utils.common import HOUR from imav.malwarelib.config import ( MalwareScanResourceType, MalwareScanType, ) from imav.malwarelib.utils.crontab import CronTab import json from datetime import datetime, timedelta from logging import getLogger from pathlib import Path
from defence360agent.utils import recurring_check from defence360agent.utils.check_lock import check_lock from defence360agent.contracts.config import ( ANTIVIRUS_MODE, MalwareScanSchedule, SystemConfig, UserConfig, effective_user_config, ) from defence360agent.contracts.config import ( MalwareScanScheduleInterval as Interval, ) from imav.malwarelib.utils import user_list, reset_malware_schedule from defence360agent.contracts.license import LicenseCLN from defence360agent.contracts.plugins import ( MessageSink, MessageSource, ) from imav.contracts.messages import MalwareScanQueuePut
# NOTE(review): the logger is keyed by this module's file path rather than
# the more common ``__name__``; kept unchanged here -- confirm it is intended.
logger = getLogger(__file__)

# Every interval value that create_schedule() can turn into a cron line.
AVAILABLE_INTERVALS = [
    Interval.NONE,
    Interval.DAY,
    Interval.WEEK,
    Interval.MONTH,
]

# Reduced interval set that allowed_schedule_interval() returns when none
# of its unlocking conditions hold.
AVP_INTERVALS = [Interval.NONE, Interval.MONTH]

# Sentinel cron line (minute hour day-of-month month day-of-week) meaning
# "never run". It is only ever compared by string equality and is never
# handed to the cron parser.
NEVER_SCHEDULE = "0 0 31 2 0"

# File that stores the timestamp of the previous schedule check.
_DEFAULT_LAST_CHECK_FILE = Path("/var/imunify360/last_check_dttm.json")

# Period between two schedule checks, in seconds.
_DEFAULT_RECURRING_CHECK_INTERVAL = HOUR / 2

# Name passed to register_lock_file() for this watcher.
_DEFAULT_LOCK_FILE_NAME = "schedule_watcher"

# Marker created during imav-deploy.sh when an AV+ Revisium license is
# used. It prevents the SCANNING_SCHEDULE params from being reset due to
# the absence of a CLN-issued license, and it is removed right after
# imav-deploy.sh is done.
REVISIUM_PREMIUM_MARKER = Path("/var/imunify360/premium_revisium_license.flag")
def allowed_schedule_interval():
    """Return the list of schedule intervals that may currently be used.

    The full ``AVAILABLE_INTERVALS`` list is returned when at least one of
    the following holds: the agent is not in ``ANTIVIRUS_MODE``, the CLN
    license reports a valid AV+ license, or the Revisium premium marker
    file exists. Otherwise the reduced ``AVP_INTERVALS`` list is returned.
    """
    # Both probes are evaluated unconditionally, before any decision.
    has_valid_avp = LicenseCLN.is_valid_av_plus()
    marker_present = REVISIUM_PREMIUM_MARKER.exists()

    if not ANTIVIRUS_MODE or has_valid_avp or marker_present:
        return AVAILABLE_INTERVALS
    return AVP_INTERVALS
def get_user_schedule_config(
    user: str, admin_config: SystemConfig
) -> tuple[str, str, str, str]:
    """Resolve the malware scan schedule settings that apply to *user*.

    The effective configuration is built from ``admin_config`` and the
    user's own ``UserConfig``. Any key missing from its
    ``MALWARE_SCAN_SCHEDULE`` section (or the whole section, if absent)
    is replaced by the matching ``MalwareScanSchedule`` attribute.

    Args:
        user: Username whose schedule configuration is requested.
        admin_config: System configuration object.

    Returns:
        Tuple of (interval, hour, day_of_month, day_of_week).
    """
    effective = effective_user_config(admin_config, UserConfig(username=user))
    section = effective.get("MALWARE_SCAN_SCHEDULE", {})

    # (config key, fallback value) pairs, in the order they are returned.
    fallbacks = (
        ("interval", MalwareScanSchedule.INTERVAL),
        ("hour", MalwareScanSchedule.HOUR),
        ("day_of_month", MalwareScanSchedule.DAY_OF_MONTH),
        ("day_of_week", MalwareScanSchedule.DAY_OF_WEEK),
    )
    interval, hour, day_of_month, day_of_week = (
        section.get(key, default) for key, default in fallbacks
    )
    return interval, hour, day_of_month, day_of_week
class ScheduleWatcher(MessageSink, MessageSource):
    """Periodically queue background malware scans that are due.

    On every check the watcher builds a cron line from each panel user's
    effective schedule settings, asks whether that cron line fired between
    the previous check and now, and sends one ``MalwareScanQueuePut``
    message covering the home directories of all users that are due.
    The time of the last completed check is persisted in ``check_file``.
    """

    def __init__(
        self,
        check_file: Path = _DEFAULT_LAST_CHECK_FILE,
        check_interval: float = 0,
        lock_file: str = _DEFAULT_LOCK_FILE_NAME,
    ):
        """Store the settings and register the lock file.

        Args:
            check_file: File holding the timestamp of the previous check.
            check_interval: Seconds between checks; a falsy value (the
                default ``0``) selects the module default interval.
            lock_file: Name passed to ``register_lock_file``.
        """
        self._check_file = check_file
        self._check_interval = (
            check_interval or _DEFAULT_RECURRING_CHECK_INTERVAL
        )
        self._lock_file = register_lock_file(lock_file, self.SCOPE)

    async def create_sink(self, loop):
        """Do nothing; this plugin does not consume messages."""
        pass

    async def create_source(self, loop, sink):
        """Remember *loop* and *sink* and start the recurring check task."""
        self._loop = loop
        self._sink = sink
        self._task = loop.create_task(
            recurring_check(
                check_lock,
                check_period_first=True,
                check_lock_period=self._check_interval,
                lock_file=self._lock_file,
            )(self.schedule_scan)()
        )

    async def shutdown(self):
        """Cancel the recurring check task and wait for it to finish."""
        self._task.cancel()
        # CancelledError is handled by @recurring_check():
        await self._task

    def create_schedule(
        self,
        interval: str,
        hour: str | None = None,
        day_of_month: str | None = None,
        day_of_week: str | None = None,
    ) -> str:
        """Translate schedule settings into a five-field cron line.

        Args:
            interval: One of the ``Interval`` values.
            hour: Hour field; a falsy value becomes ``"0"``.
            day_of_month: Day-of-month field, used for monthly schedules;
                a falsy value becomes ``"1"``.
            day_of_week: Day-of-week field, used for weekly schedules;
                a falsy value becomes ``"0"``.

        Returns:
            ``NEVER_SCHEDULE`` when the interval is ``Interval.NONE``,
            unknown, or not currently allowed; otherwise a cron line of
            the form ``"0 <hour> <day_of_month> * <day_of_week>"``.
        """
        if interval == Interval.NONE:
            return NEVER_SCHEDULE
        elif interval not in AVAILABLE_INTERVALS:
            logger.error("Unsupported interval value: %s", interval)
            return NEVER_SCHEDULE
        elif interval not in (intervals := allowed_schedule_interval()):
            logger.info(
                "Malware schedule is not in allowed intervals: schedule=%s,"
                " allowed=%s",
                interval,
                intervals,
            )
            return NEVER_SCHEDULE

        hour = hour or "0"
        day_of_month = day_of_month or "1"
        day_of_week = day_of_week or "0"
        if interval == Interval.DAY:
            cron_args = hour, "*", "*"
        elif interval == Interval.WEEK:
            cron_args = hour, "*", day_of_week
        else:  # interval == Interval.MONTH:
            cron_args = hour, day_of_month, "*"

        return "0 {} {} * {}".format(*cron_args)

    def _read_last_check_dttm(self) -> datetime:
        """Return the persisted time of the previous check.

        When the state file is missing, or its content cannot be parsed
        as a JSON-encoded ISO 8601 timestamp, "one check interval ago" is
        returned instead. Without the second fallback an empty or
        truncated file would raise on every cycle, and because the file is
        only rewritten after a successful read it would never be repaired.
        """
        try:
            return datetime.fromisoformat(
                json.loads(self._check_file.read_text())
            )
        except FileNotFoundError:
            pass
        except (ValueError, TypeError) as exc:
            # ValueError covers json.JSONDecodeError, undecodable bytes and
            # a malformed ISO string; TypeError covers a non-string JSON
            # value being handed to fromisoformat().
            logger.warning(
                "Ignoring unreadable last check timestamp in %s: %s",
                self._check_file,
                exc,
            )
        return datetime.now() - timedelta(seconds=self._check_interval)

    def _write_last_check_dttm(self, dttm: datetime) -> None:
        """Persist *dttm* as a JSON-encoded ISO 8601 string."""
        self._check_file.write_text(json.dumps(dttm.isoformat()))

    @staticmethod
    def _is_it_time(
        schedule: str, now: datetime, last_check: datetime
    ) -> bool:
        """Tell whether *schedule* fired between *last_check* and *now*.

        ``NEVER_SCHEDULE`` is recognised by string equality and is never
        parsed. For any other cron line the first run time after
        *last_check* is computed and compared with *now*.
        """
        if schedule == NEVER_SCHEDULE:
            return False
        next_run_dttm: float | None = CronTab(schedule).next(
            last_check, delta=False, default_utc=False
        )
        return next_run_dttm is not None and next_run_dttm < now.timestamp()

    async def schedule_scan(self) -> None:
        """Run one schedule check and then record its time.

        The timestamp is written only after ``_schedule_scan`` returns, so
        a failed check is retried over the same time window next cycle.
        """
        last_check = self._read_last_check_dttm()
        now = datetime.now()
        await self._schedule_scan(now, last_check)
        self._write_last_check_dttm(now)

    async def _schedule_scan(
        self, now: datetime, last_check: datetime
    ) -> None:
        """Collect the home directories that are due and queue one scan.

        Args:
            now: Upper bound of the time window being checked.
            last_check: Lower bound of the time window being checked.
        """
        if MalwareScanSchedule.INTERVAL not in allowed_schedule_interval():
            logger.info("Malware schedule interval is being reset to defaults")
            reset_malware_schedule()

        users = await user_list.panel_users()
        admin_config = SystemConfig()

        to_scan: list[str] = []

        # Build per-user effective config the same way as "config show" does.
        for u in users:
            schedule = self.create_schedule(
                *get_user_schedule_config(u["user"], admin_config)
            )
            if self._is_it_time(schedule, now, last_check):
                to_scan.append(u["home"])

        if to_scan:
            logger.info(
                "Trigger scheduled background malware scan for paths: %s",
                ", ".join(to_scan),
            )
            await self.trigger_malware_scan(to_scan)
        else:
            logger.info(
                "No paths to scan in scheduled background malware scan."
            )

    async def trigger_malware_scan(self, paths: list[str]) -> None:
        """Send a background file scan request for *paths* to the sink.

        Nothing is sent when *paths* is empty.
        """
        if not paths:
            return
        await self._sink.process_message(
            MalwareScanQueuePut(
                paths=paths,
                scan_args={
                    "resource_type": MalwareScanResourceType.FILE,
                    "scan_type": MalwareScanType.BACKGROUND,
                },
            )
        )
|