PATH:
opt
/
imunify360
/
venv
/
lib
/
python3.11
/
site-packages
/
im360
/
plugins
"""Services manager plugin. It enables/disables various service based on an imunify360 config change. """ import logging import shutil from pathlib import Path from random import randint from tempfile import NamedTemporaryFile from defence360agent import utils from defence360agent.contracts import plugins from defence360agent.subsys import svcctl from defence360agent.subsys.persistent_state import load_state, save_state from imav.plugins.service_manager import ServiceManager as BaseServiceManager from im360.contracts import config, messages as im360_messages from im360.subsys import csf __all__ = ["ServiceManager"] logger = logging.getLogger(__name__) UAL_CRON_TEMPLATE_PATH = Path( "/opt/imunify360/venv/share/imunify360/imunify360-ual.cron.template" ) UAL_OLD_CRON_PATH = Path("/etc/cron.d/imunify360-ual.cron") UAL_CRON_PATH = Path("/etc/cron.d/imunify360-ual") class ServiceManager(BaseServiceManager): """Service manager plugin: stop/start services based on config changes.""" SCOPE = utils.Scope.IM360 AUDITD_SHOULD_BE_RUNNING = config.FromConfig("LOGGER", "syscall_monitor") def __init__(self, *, unitctl=None): super().__init__(unitctl=unitctl) self._services.extend( [ self._ensure_consistent_dos_protector_state, self._ensure_consistent_ual_state, self._ensure_consistend_auditd_state, self._ensure_consistent_scanlogd_state, ] ) self._units.update( { "dos_protector": unitctl or svcctl.imunify360_dos_protector_service(), "ual": unitctl or svcctl.imunify360_ual_service(), "auditd": unitctl or svcctl.imunify360_auditd_service(), "scanlogd": unitctl or svcctl.imunify360_scanlogd_service(), } ) self._configs = load_state("service_manager") if not self._configs: self._configs = {"dos_protector": {}} async def _ensure_consistent_dos_protector_state(self): unitctl = self._units["dos_protector"] if not unitctl: # unsupported platform return old_config = self._configs["dos_protector"] new_config = config.EnhancedDOS.as_dict() should_be_running = ( config.EnhancedDOS.ENABLED and not await csf.is_running() ) await self.__ensure_service_status( unitctl, "DosProtector", should_be_running, reload=(old_config != new_config), ) self._configs["dos_protector"] = new_config save_state("service_manager", self._configs) @plugins.expect(im360_messages.StrategyChange) async def on_strategy_change( self, _message: im360_messages.StrategyChange ): async with self._lock: await self._ensure_consistent_dos_protector_state() async def _ensure_consistent_ual_state(self): should_be_running = config.UnifiedAccessLogger.ENABLED unitctl = self._units["ual"] if should_be_running: UAL_CRON_PATH.unlink(missing_ok=True) await self.__ensure_service_status( unitctl, "UnifiedAccessLogger", should_be_running, reload=False ) if not should_be_running: self._create_ual_cronjob() def _create_ual_cronjob(self): if UAL_OLD_CRON_PATH.exists(): UAL_OLD_CRON_PATH.unlink() try: cronjob_content = UAL_CRON_TEMPLATE_PATH.read_text().format( random_minute=randint(0, 59), report_interval="5m" ) except FileNotFoundError: logger.warning( "UAL cron template not found: %s", UAL_CRON_TEMPLATE_PATH ) return if UAL_CRON_PATH.exists(): old_cronjob_content = UAL_CRON_PATH.read_text() identical = True for old_line, new_line in zip( old_cronjob_content.splitlines(), cronjob_content.splitlines() ): # {random_minute} * * * * root /usr/sbin/imunify36... if "* * *" in old_line: # is cronjob line # ignore minute part old_line = old_line[old_line.find(" ") + 1 :] new_line = new_line[new_line.find(" ") + 1 :] if old_line != new_line: identical = False break if identical: return with NamedTemporaryFile("w", delete=False) as f: temp_cronjob_path = Path(f.name) f.write(cronjob_content) temp_cronjob_path.chmod(0o644) shutil.move(temp_cronjob_path, UAL_CRON_PATH) async def _ensure_consistent_scanlogd_state(self): should_be_running = config.Scanlogd.ENABLE unitctl = self._units["scanlogd"] await self.__ensure_service_status( unitctl, "Scanlogd", should_be_running, reload=False ) async def _ensure_consistend_auditd_state(self): unitctl = self._units["auditd"] if not unitctl: # unsupported platform return await self.__ensure_service_status( unitctl, "AuditD", self.AUDITD_SHOULD_BE_RUNNING, reload=False )
[-] repeater.py
[edit]
[-] whitelist_current_user.py
[edit]
[-] modsec_ruleset_checker.py
[edit]
[-] startup_actions.py
[edit]
[-] cpanel_uploader.py
[edit]
[-] service_manager.py
[edit]
[+]
__pycache__
[-] fgw.py
[edit]
[-] php_immunity.py
[edit]
[-] send_server_config.py
[edit]
[-] lfd.py
[edit]
[-] ossec_rules_checker.py
[edit]
[-] remoteip_install.py
[edit]
[-] strategy_getter.py
[edit]
[-] __init__.py
[edit]
[-] pam_manager.py
[edit]
[-] waf_rules_configurator.py
[edit]
[-] export_wblist.py
[edit]
[+]
..