Viewing file: store.py (25.99 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license, see <https://www.imunify360.com/legal/eula> """ import functools import glob import json import os import pwd import re import time from collections import defaultdict import dataclasses from enum import Enum from logging import getLogger from typing import Any, Union
import peewee
import defence360agent.internals.logger from defence360agent.api import inactivity from defence360agent.contracts.messages import MessageType from defence360agent.contracts.plugins import ( MessageSink, MessageSource, expect, ) from defence360agent.model.simplification import run_in_executor from defence360agent.subsys.panels.hosting_panel import HostingPanel from defence360agent.utils import Scope, nice_iterator from imav.contracts.messages import ( MalwareDatabaseCleanup, MalwareDatabaseScan, MalwareScan, ) from imav.contracts.plugins import ProcessOrder from defence360agent.contracts.hook_events import HookEvent from imav.malwarelib.config import ( CLEANUP, CLEANUP_ON_SCHEDULE, NOTIFY, MalwareEvent, MalwareEventPostponed, MalwareHitStatus, MalwareScanResourceType, MalwareScanType, ) from imav.malwarelib.model import ( MalwareHit, MalwareHitAlternate, VulnerabilityHit, ) from imav.malwarelib.model import ( MalwareScan as MalwareScanModel, ) from imav.malwarelib.plugins.detached_scan import ( MalwareScanMessageInfo, ) from imav.malwarelib.scan.mds.report import MalwareDatabaseHitInfo from imav.malwarelib.subsys.malware import ( HackerTrapHitsSaver, MalwareAction, MalwareActionIm360, ) from imav.malwarelib.model import MalwareIgnorePath
logger = getLogger(__name__)
class MalwareScanJSONEncoder(json.JSONEncoder):
    """JSON encoder for scan messages written to the action logs.

    Extends the stock encoder with the non-JSON types handled below:

    * ``Enum`` members are written as their ``value``;
    * ``set`` / ``frozenset`` are written as lists (element order is the
      container's iteration order);
    * dataclass *instances* are written as ``dataclasses.asdict()`` dicts.

    Anything else is delegated to ``json.JSONEncoder.default``, which
    raises ``TypeError``.
    """

    def default(self, o: Any) -> Any:
        """Return a JSON-serializable stand-in for *o*.

        Raises:
            TypeError: if *o* is of a type not covered above.
        """
        if isinstance(o, Enum):
            return o.value
        if isinstance(o, (set, frozenset)):
            return list(o)
        # is_dataclass() is also true for dataclass *classes*, but asdict()
        # only accepts instances; let the base class reject bare classes
        # with the regular "not JSON serializable" error.
        if dataclasses.is_dataclass(o) and not isinstance(o, type):
            return dataclasses.asdict(o)
        return super().default(o)
class StoreMalwareHits(MessageSink, MessageSource):
    """Store file-scan results carried by ``MalwareScan`` messages.

    The plugin is both a sink (it consumes ``MalwareScan`` messages) and a
    source (it pushes ``MalwareScanningFinished`` hook events and postponed
    action messages into ``self._sink``).
    """

    PROCESSING_ORDER = ProcessOrder.STORE_SCAN
    SCOPE = Scope.AV
    # Backend whose ``apply_default_action`` is run on malicious hits;
    # subclasses override it.
    malware_action = MalwareAction
    # Both are assigned in create_source(); code below tolerates a missing
    # sink by checking ``if self._sink``.
    _loop, _sink = None, None

    async def create_source(self, loop, sink):  # type: ignore
        """Remember the event loop and the sink used for outgoing messages."""
        self._loop = loop
        self._sink = sink

    async def create_sink(self, loop):
        """Nothing to set up for the sink side in this class."""
        pass

    async def _call_scan_finished_hook(self, *, summary: dict):
        """Emit MalwareScanningFinished built from *summary*.

        ``status`` is "failed" when the summary carries a truthy "error",
        otherwise "ok". ``stats`` always contains "total_files" plus every
        performance key from ``stats_keys`` that is present in *summary*.
        The event is sent only if a sink is configured.
        """
        status = "failed" if summary.get("error") else "ok"
        stats_keys = (
            "scan_time",
            "scan_time_hs",
            "scan_time_preg",
            "smart_time_hs",
            "smart_time_preg",
            "finder_time",
            "cas_time",
            "deobfuscate_time",
            "mem_peak",
        )
        stats = {"total_files": summary.get("total_files")}
        stats.update(
            {key: summary[key] for key in stats_keys if key in summary}
        )

        # Ensure path is a single string (not a list) for the Finished hook
        event_path = summary.get("path")
        if isinstance(event_path, list):
            event_path = event_path[0] if event_path else ""

        finished_event = HookEvent.MalwareScanningFinished(
            scan_id=summary.get("scanid"),
            scan_type=summary.get("type"),
            path=event_path,
            started=summary.get("started"),
            total_files=summary.get("total_files"),
            total_malicious=summary.get("total_malicious", 0),
            error=summary.get("error"),
            status=status,
            stats=stats,
            scan_params=summary.get("scan_args") or {},
        )
        if self._sink:
            await self._sink.process_message(finished_event)

    @expect(MessageType.MalwareScan, async_lock=False)
    async def process_hits(self, message):
        """MalwareScan is saved to DB when:
        1. Detached scan started - message has no results
        2. Any scan finished - message has summary and results

        Message without summary means that detached scan is finished and
        summary will arrive along with results in another message.
        """
        if not message["summary"].get("path"):
            return
        with inactivity.track.task("store_scan"):
            await self._store_scan(message)

    @expect(MessageType.MalwareScan)
    async def store_log(self, message):
        """Dump a MalwareScan message that has results to the actions log.

        Messages whose "results" is None are skipped; a message without a
        "scanid" in its summary is reported as an error and skipped too.
        """
        if message.get("results") is None:
            return

        summary = message.get("summary", {})
        scan_id = summary.get("scanid")
        if not scan_id:
            logger.error(
                "MalwareScan message received without a scanid: %s", message
            )
            return

        with defence360agent.internals.logger.openAibolitActionsLog(
            scan_id
        ) as logf:
            json.dump(
                dict(message),
                logf,
                indent=2,
                sort_keys=False,
                cls=MalwareScanJSONEncoder,
            )

    @staticmethod
    def _store_hit(scanid, filename, status, resource_type, data):
        """Create a MalwareHit row for *filename*.

        Type, timestamp and the malicious flag are taken from the first
        entry of ``data["hits"]``.
        """
        first_hit = data["hits"][0]
        return MalwareHit.create(
            scanid=scanid,
            resource_type=resource_type,
            owner=data["owner"],
            user=data["user"],
            size=data["size"],
            hash=data["hash"],
            orig_file=filename,
            type=first_hit["matches"],
            timestamp=first_hit["timestamp"],
            status=status,
            malicious=not first_hit["suspicious"],
        )

    @staticmethod
    def get_outdated_entries(
        path_obj: Union[str, list],
        scan_type: str | None = None,
    ):
        """
        Return files that may already not be infected, yet we still
        consider them such.

        For example, an infected file might have been removed manually.

        This is a generator. For realtime scans the given paths are yielded
        as is. Otherwise every glob match of every path is resolved with
        ``os.path.realpath`` and the paths of stored FOUND file hits for
        that file, or located under that directory, are yielded.
        """
        possibly_infected_statuses = [MalwareHitStatus.FOUND]
        paths = [path_obj] if isinstance(path_obj, str) else path_obj
        if scan_type == MalwareScanType.REALTIME:
            # to avoid duplicates (DEF-10404)
            yield from iter(paths)
            return

        is_found_file_hit = (
            MalwareHit.status.in_(possibly_infected_statuses)
        ) & (MalwareHit.resource_type == MalwareScanResourceType.FILE.value)

        for target_path in paths:
            for path in glob.iglob(target_path):
                path = os.path.realpath(path)
                if (
                    os.path.isfile(path)
                    and MalwareHit.select()
                    .where((MalwareHit.orig_file == path) & is_found_file_hit)
                    .first()
                ):
                    yield path
                else:
                    # Match the scanned path itself or anything below it.
                    # The pattern is anchored on purpose: an unanchored
                    # ``<path>(/.*|\b)`` also matches unrelated entries such
                    # as "<path>-old/x" or "/backup<path>/x", and those
                    # entries would then be deleted by the caller.
                    # rstrip("/") keeps the root directory ("/") working.
                    scanned_dir = (
                        "^" + re.escape(path.rstrip("/")) + "(/|$)"
                    )
                    yield from (
                        i.orig_file
                        for i in MalwareHit.select().where(
                            (MalwareHit.orig_file.regexp(scanned_dir))
                            & is_found_file_hit
                        )
                    )

    async def _store_scan(self, message: MalwareScan) -> None:
        """Process scan message results.

        A message whose summary has a falsy "started" is ignored. A
        summary-only message creates the scan row (unless one with the same
        scanid already exists); any other message is handed to
        ``_store_scan_from_results``.

        message: MalwareScan message
        """
        summary = message["summary"]
        if not summary["started"]:
            # Scan is queued/aborted.
            return

        message_type = MalwareScanMessageInfo(message)
        if not message_type.is_summary:
            await self._store_scan_from_results(message)
            return

        already_stored = (
            MalwareScanModel.select()
            .where(MalwareScanModel.scanid == summary["scanid"])
            .exists()
        )
        if already_stored:
            logger.warning("Scan %s already in database", summary["scanid"])
            return

        scan = MalwareScanModel.create(
            **summary,
            resource_type=MalwareScanResourceType.FILE.value,
            initiator=message.initiator,
        )
        scan.total_malicious = 0
        scan.save()

    @classmethod
    def _delete_outdated_entries(cls, summary: dict) -> None:
        """Delete stored hits made obsolete by a full, successful rescan.

        Note: "file_patterns" and "exclude_patterns" are popped from
        *summary* (the dict is mutated). Deletion only happens when the scan
        had no error and neither pattern was set.
        """
        file_patterns = summary.pop("file_patterns", None)
        exclude_patterns = summary.pop("exclude_patterns", None)
        if (
            summary.get("error") is None
            and file_patterns is None
            and exclude_patterns is None
        ):
            outdated_entries = cls.get_outdated_entries(
                summary["path"], scan_type=summary["type"]
            )
            MalwareHit.delete_hits(outdated_entries)

    @staticmethod
    async def _process_default_action_results(
        hit_data, default_action_results
    ):
        """Extension point called at the end of result processing.

        Receives the filtered results and a ``{path: event}`` mapping of
        default-action outcomes. This class does nothing with them.
        """
        pass

    async def _store_scan_from_results(self, message: MalwareScan):
        """Persist a finished file scan and its hits.

        Steps, in order: create/update the scan row, drop outdated hits,
        filter the results (vulnerabilities, hits racing with a cleanup,
        ignored paths), apply the default action to malicious hits, store
        the hits that were not eliminated, save scan totals, emit the
        MalwareScanningFinished hook, forward postponed actions to the sink
        and finally call ``_process_default_action_results``.

        Note: filtered-out files are also removed from
        ``message["results"]``, and ``summary["total_malicious"]`` is set.
        """
        summary = message["summary"]
        scan_id = summary["scanid"]
        scan, created = MalwareScanModel.get_or_create(
            scanid=scan_id,
            defaults={
                **summary,
                "resource_type": MalwareScanResourceType.FILE.value,
            },
        )
        if not created:
            # Detached scan only (second message).
            # Update completed time if scan already exists.
            scan.completed = summary["completed"]

        # get('path') indicates that this is the second message,
        # even if they are out of order
        if message["results"] is not None and summary.get("path") is not None:
            self._delete_outdated_entries(summary)

        # vulnerabilities processed in a separate plugin
        results = {
            file: data
            for file, data in message["results"].items()
            if not VulnerabilityHit.match(data["hits"][0]["matches"])
        }
        hits = {
            hit.orig_file: hit
            for hit in MalwareHit.get_hits(files=list(results))
        }
        postponed_hits = defaultdict(list)  # type: dict
        total_malicious = 0

        def _hit_status_race_detected(hit: MalwareHit, detected_timestamp):
            # True when a cleanup is in progress, or when the file was
            # cleaned/removed after this detection was made.
            return hit.status == MalwareHitStatus.CLEANUP_STARTED or (
                hit.status
                in (
                    MalwareHitStatus.CLEANUP_DONE,
                    MalwareHitStatus.CLEANUP_REMOVED,
                )
                and hit.cleaned_at > detected_timestamp
            )

        # ignore hits are already processed by another scan
        # to avoid send its to CH multiple times
        async for file in nice_iterator(tuple(results.keys())):
            if file in hits and _hit_status_race_detected(
                hits[file], results[file]["hits"][0]["timestamp"]
            ):
                results.pop(file, None)
                # ignore it in further processing of the message
                message["results"].pop(file, None)

        # filter out ignored paths before applying default actions
        async for file in nice_iterator(tuple(results.keys())):
            try:
                if await MalwareIgnorePath.is_path_ignored(file):
                    results.pop(file, None)
                    message["results"].pop(file, None)
            except Exception as exc:
                # be conservative: if ignore check fails, do not drop the hit
                logger.exception(
                    "Ignore check failed for file %s: %s; keeping hit",
                    file,
                    exc,
                )

        malicious_hits = [
            MalwareHitAlternate.create(scan.scanid, file, data)
            for file, data in results.items()
            if not data["hits"][0]["suspicious"]
        ]

        action_results = await self.malware_action.apply_default_action(
            hits=malicious_hits,
            initiator=message.get("initiator"),
            cause=summary["type"],
            sink=self._sink,
        )

        # path -> (event, action, try_restore); action_results is consumed
        # exactly once here, so it may be any iterable.
        apply_dict = {}
        for hit_info, event, action, try_restore in action_results:
            apply_dict[hit_info.orig_file] = (event, action, try_restore)

        for file, data in results.items():
            # do not store suspicious hits
            if data["hits"][0]["suspicious"]:
                continue
            status = MalwareHitStatus.FOUND
            result = None
            if file in apply_dict:
                (
                    result,
                    default_action,
                    try_restore,
                ) = apply_dict[file]

                # sent to CH
                if (
                    isinstance(result, MalwareEventPostponed)
                    and result.action == CLEANUP_ON_SCHEDULE
                ):
                    # report to CH only well-known `cleanup` / `notify` actions
                    default_action = (
                        CLEANUP
                        if summary["type"] == MalwareScanType.BACKGROUND
                        else NOTIFY
                    )
                data["default_action"] = default_action
                data["try_restore"] = try_restore

            # counted before the "eliminated" check below, so eliminated
            # files are included in the total
            total_malicious += 1

            if isinstance(result, MalwareEvent):
                if result.malware_eliminated:
                    continue

            hit = await run_in_executor(
                self._loop,
                functools.partial(
                    self._store_hit,
                    scan.scanid,
                    file,
                    status,
                    MalwareScanResourceType.FILE.value,
                    data,
                ),
            )

            if isinstance(result, MalwareEventPostponed):
                key = (
                    result.message,
                    (
                        result.cause,
                        result.initiator,
                        result.post_action,
                        result.action,
                    ),
                )
                postponed_hits[key].append(hit)

        scan.total_malicious = total_malicious
        scan.total_resources = summary["total_files"]
        scan.timestamp = int(time.time())
        if error := summary.get("error"):
            scan.error = error
        scan.save()

        # Emit MalwareScanningFinished after persistence completes.
        try:
            summary["total_malicious"] = scan.total_malicious
            await self._call_scan_finished_hook(summary=summary)
        except Exception:
            # best effort: a failing hook must not abort result processing
            logger.exception(
                "Failed to emit MalwareScanningFinished after store"
                " (file scan)"
            )

        if self._sink:
            for (
                (msg_cls, (cause, initiator, post_action, action)),
                hits,
            ) in postponed_hits.items():
                if (
                    action == CLEANUP_ON_SCHEDULE
                    and summary["type"] != MalwareScanType.BACKGROUND
                ):
                    logger.info(
                        "Skipping auto-cleanup because it's allowed for "
                        "scheduled scans only"
                    )
                else:
                    await self._sink.process_message(
                        msg_cls(
                            hits=hits,
                            scan_id=scan_id,
                            cause=cause,
                            initiator=initiator,
                            post_action=post_action,
                        )
                    )

        # NOTE(review): placed at function level (not under
        # ``if self._sink``); the source formatting did not preserve
        # nesting -- confirm.
        await self._process_default_action_results(
            results,
            {file: event for file, (event, _, _) in apply_dict.items()},
        )
class StoreMalwareHitsIm360(StoreMalwareHits):
    """IM360-scope variant of StoreMalwareHits.

    Adds HackerTrap bookkeeping for default-action results and handles
    ``MalwareDatabaseScan`` / ``MalwareDatabaseCleanup`` messages.
    """

    SCOPE = Scope.IM360
    malware_action = MalwareActionIm360

    async def create_sink(self, loop):
        """Run the parent setup, then initialise HackerTrapHitsSaver."""
        await super().create_sink(loop)
        await HackerTrapHitsSaver.init()

    @staticmethod
    async def _process_default_action_results(
        hit_data, default_action_results
    ):
        """Do additional processing for malicious files"""

        hacker_trap_hits = []
        hacker_trap_sa_hits = []

        for path, data in hit_data.items():
            result = default_action_results.get(path)
            if not isinstance(result, MalwareEvent):
                continue
            if result.malware_eliminated:
                hacker_trap_hits.append(path)

                # NOTE(review): this check is assumed to be nested under
                # the ``malware_eliminated`` branch; the source formatting
                # did not preserve nesting -- confirm against the original.
                if any(
                    HackerTrapHitsSaver.STANDALONE_MARK in hit["matches"]
                    for hit in data["hits"]
                ):
                    hacker_trap_sa_hits.append(path)

        await HackerTrapHitsSaver.add_hits(hacker_trap_hits)
        await HackerTrapHitsSaver.update_sa_hits(hacker_trap_sa_hits, [])

    async def _emit_db_scan_finished(
        self, message: MalwareDatabaseScan, *, context: str
    ) -> None:
        """Emit MalwareScanningFinished for a DB scan; never raises.

        *context* ("hits" / "no hits") only ends up in the error log line
        written when building or sending the event fails.
        """
        try:
            summary = {
                "scanid": message.scan_id,
                "type": message.type,
                "path": message.path,
                "started": message.started,
                "completed": message.completed,
                "total_files": message.total_resources,
                "total_malicious": message.total_malicious or 0,
                "error": message.error,
                "scan_args": {},
            }
            await self._call_scan_finished_hook(summary=summary)
        except Exception:
            logger.exception(
                "Failed to emit MalwareScanningFinished after store (DB, %s)",
                context,
            )

    @expect(MessageType.MalwareDatabaseScan)
    async def store_db_scan(self, message: MalwareDatabaseScan) -> None:
        """Persist a database scan and its hits.

        Creates the scan row, or updates an existing one when the stored
        scan and the message pass the consistency check below. It then logs
        the message, and either clears outdated FOUND entries (no hits) or
        applies the default action and stores the hits that were not
        eliminated. MalwareScanningFinished is emitted in both cases;
        postponed actions are forwarded to the sink.
        """
        if not message.started or message.type is None:
            # Scan is queued/aborted or stopped while AVD is not finished yet
            return

        try:
            scan = MalwareScanModel.create(
                scanid=message.scan_id,
                started=message.started,
                completed=message.completed,
                type=message.type,
                path=message.path,
                error=message.error,
                total_resources=message.total_resources,
                total_malicious=message.total_malicious,
                resource_type=MalwareScanResourceType.DB.value,
                initiator=message.initiator,
            )
        except peewee.IntegrityError:
            scan = MalwareScanModel.get(scanid=message.scan_id)
            if (
                not message.completed
                or message.error
                or (scan.completed and not scan.error)
                or not scan.completed
                or message.path != scan.path
                or message.type != scan.type
                or scan.resource_type != MalwareScanResourceType.DB.value
            ):
                logger.error(
                    "The scan %s has already been saved: type=%s, path=%s,"
                    " completed=%s",
                    scan.scanid,
                    scan.resource_type,
                    scan.path,
                    scan.completed,
                )
                return

            # Update scan with latest data from message
            # NOTE(review): assumed to belong to the IntegrityError branch;
            # the source formatting did not preserve nesting -- confirm.
            scan.started = message.started
            scan.completed = message.completed
            scan.error = message.error
            scan.total_resources = message.total_resources
            scan.total_malicious = message.total_malicious
            scan.resource_type = MalwareScanResourceType.DB.value
            scan.initiator = message.initiator
            scan.save()
            logger.info(
                "Updated scan %s with new data from message", scan.scanid
            )

        scan_id = message.get("scan_id")
        if scan_id:
            with defence360agent.internals.logger.openMdsActionsLog(
                scan_id
            ) as logf:
                json.dump(
                    dict(message),
                    logf,
                    indent=2,
                    sort_keys=False,
                    cls=MalwareScanJSONEncoder,
                )

        if not message.hits:
            # no malware found
            if not message.error:
                # remove outdated entries as rescan did not find anything
                MalwareHit.delete().where(
                    (MalwareHit.orig_file == message.path)
                    & (
                        MalwareHit.resource_type
                        == MalwareScanResourceType.DB.value
                    )
                    & (MalwareHit.status == MalwareHitStatus.FOUND)
                ).execute()
            # Emit MalwareScanningFinished for DB scans without hits
            # NOTE(review): assumed to run for failed scans as well (i.e. not
            # nested under ``if not message.error``) -- confirm.
            await self._emit_db_scan_finished(message, context="no hits")
            return

        # FIXME: remove this mapping
        # when we start to store UID instead of username in the db
        panel_users = set(await HostingPanel().get_users())
        uid_to_name = {
            pw.pw_uid: pw.pw_name
            for pw in pwd.getpwall()
            if pw.pw_name in panel_users
        }
        self._delete_outdated_db_entries(message.hits)

        # apply default action to all hits (to store them in history table)
        action_results = await self.malware_action.apply_default_action(
            hits=message.hits,
            initiator=message.get("initiator"),
            cause=message.get("type"),
            sink=self._sink,
            resource_type=MalwareScanResourceType.DB.value,
        )

        # path -> default-action event (the last result wins for a path)
        events_by_path = {
            hit.path: event for hit, event, _, _ in action_results
        }

        postponed_hits = defaultdict(list)  # type: dict

        unique_hits_info = MalwareDatabaseHitInfo.get_hits_per_db(message.hits)
        for hit_info in unique_hits_info:
            result = events_by_path.get(hit_info.path)

            # FIXME: DEF-18112 add default_action to hit and send to CH

            if isinstance(result, MalwareEvent):
                if result.malware_eliminated:
                    continue

            # NOTE(review): ``timestame`` looks like a typo for ``timestamp``.
            # Left as is: whether the model accepts an explicit
            # ``timestamp=None`` cannot be told from this file -- confirm
            # against the MalwareHit model before renaming.
            new_hit: MalwareHit = MalwareHit.create(
                scanid=scan,
                owner=uid_to_name.get(hit_info.owner, hit_info.owner),
                user=uid_to_name.get(hit_info.user, hit_info.user),
                orig_file=hit_info.path,
                type=hit_info.signature,
                malicious=True,
                hash=None,
                size=None,
                timestame=None,
                status=MalwareHitStatus.FOUND,
                cleaned_at=None,
                resource_type=MalwareScanResourceType.DB.value,
                app_name=hit_info.app_name,
                db_host=hit_info.db_host,
                db_port=hit_info.db_port,
                db_name=hit_info.db_name,
                snippet=hit_info.snippet,
            )
            if isinstance(result, MalwareEventPostponed):
                key = (
                    result.message,
                    (result.cause, result.initiator, result.post_action),
                )
                postponed_hits[key].append(new_hit)

        if self._sink:
            # Emit MalwareScanningFinished for DB scans with hits before
            # posting postponed events
            await self._emit_db_scan_finished(message, context="hits")

            for (
                (msg_cls, (cause, initiator, post_action)),
                hits,
            ) in postponed_hits.items():
                await self._sink.process_message(
                    msg_cls(
                        hits=hits,
                        scan_id=message.scan_id,
                        cause=cause,
                        initiator=initiator,
                        post_action=post_action,
                    )
                )

    @staticmethod
    def _delete_outdated_db_entries(hits):
        """Delete stored hits whose path matches any of the new *hits*."""
        orig_files = [hit.path for hit in hits]
        MalwareHit.delete_hits(orig_files)

    @expect(MessageType.MalwareDatabaseCleanup)
    async def store_db_cleanup_log(self, message: MalwareDatabaseCleanup):
        """Dump a MalwareDatabaseCleanup message to the MDS actions log.

        A message without a "scan_id" is reported as an error and skipped.
        """
        scan_id = message.get("scan_id")
        if not scan_id:
            logger.error(
                "MalwareDatabaseCleanup message received without a"
                " scan_id: %s",
                message,
            )
            return

        with defence360agent.internals.logger.openMdsActionsLog(
            scan_id
        ) as logf:
            json.dump(
                dict(message),
                logf,
                indent=2,
                sort_keys=False,
                cls=MalwareScanJSONEncoder,
            )
|