PATH:
opt
/
imunify360
/
venv
/
lib
/
python3.11
/
site-packages
/
im360
/
subsys
""" This module provides functions for exporting whitelist for Real-time Blackhole List (RBL). """ import itertools import ipaddress import logging import os from pathlib import Path from typing import Optional from defence360agent.subsys.panels.base import PanelException from defence360agent.subsys import web_server from defence360agent.utils import ( COPY_TO_MODSEC_MAXTRIES, atomic_rewrite, check_run, CheckRunError, log_failed_to_copy_to_modsec, recurring_check, retry_on, ) from defence360agent.utils.common import CoalesceCalls from im360.subsys.int_config import is_force_use_coraza from im360.subsys.panels.base import use_modsec_lock from im360.subsys.panels.hosting_panel import HostingPanel from im360.model.global_whitelist import GlobalWhitelist from im360.model.custom_lists import CustomWhitelist from im360.internals.core.ipset.ip import IPSetWhiteFullAccess, IPSetWhite logger = logging.getLogger(__name__) async def reload_wafd(): if Path("/usr/bin/imunify360-wsctl").is_file(): args = ["imunify360-wsctl", "reload"] else: args = ["systemctl", "reload", "imunify360-wafd"] try: await check_run(args) except CheckRunError: logger.warning("Failed to reload 'imunify360-wafd'") #: how often to check the rbl_whitelist file POLLING_PERIOD = 60 # seconds #: Dedicated coalescer for Apache reloads triggered by create_rbl_whitelist. #: Independent from the shared 300 s webserver_gracefull_restart — we want a #: longer default window for the RBL path, and we don't want our throttle to #: interfere with other reload callers. _rbl_reload_coalescer = CoalesceCalls() async def _schedule_apache_reload_for_rbl(): """Schedule an Apache graceful reload on behalf of create_rbl_whitelist. Env-var tunables (read on every call so support can flip them via ``systemctl set-environment`` + ``systemctl restart imunify360`` without re-deploy): * ``IM360_RBL_RELOAD_DISABLED=1`` — suppress the reload entirely. The file on disk still advances; mod_security picks up changes on the next reload from any other code path (vendor update, panel integration, etc.). * ``IM360_RBL_RELOAD_MIN_PERIOD=<seconds>`` — minimum interval between reloads from this code path. Default 600 (10 min). """ if os.environ.get("IM360_RBL_RELOAD_DISABLED", "").lower() in ( "1", "true", "yes", ): logger.info( "Apache reload from create_rbl_whitelist suppressed " "(IM360_RBL_RELOAD_DISABLED is set)" ) return # Read at call time (not import time) so operators can retune without a # redeploy. Fall back to the default on a non-numeric value instead of # letting ValueError propagate — the file on disk has already been # rewritten at this point, and a raised exception here would also skip # the Coraza/wafd reload path below, leaving mod_security with stale # data until something else triggered a reload. raw_period = os.environ.get("IM360_RBL_RELOAD_MIN_PERIOD") try: period = max(0, int(raw_period)) if raw_period is not None else 10 * 60 except ValueError: logger.warning( "Invalid IM360_RBL_RELOAD_MIN_PERIOD=%r; " "falling back to default %d s", raw_period, 10 * 60, ) period = 10 * 60 # Building the wrapper on each call is cheap; state lives on # _rbl_reload_coalescer, not on the wrapper — coalescing across calls # behaves identically. coalesced = _rbl_reload_coalescer.coalesce_calls(period)( web_server._graceful_restart ) await coalesced() async def _get_whitelists_data(): global_white_list = await GlobalWhitelist.load() full_access_white_list = ( item["ip"] for item in IPSetWhiteFullAccess().query_all() ) # ignore "whitelisted by passing captcha" ips (DEF-13665) manual_white_list = IPSetWhite().get_non_captcha_passed_ips() custom_white_list = await CustomWhitelist.load() return itertools.chain( global_white_list, full_access_white_list, manual_white_list, custom_white_list, ) @use_modsec_lock @retry_on( FileNotFoundError, max_tries=COPY_TO_MODSEC_MAXTRIES, on_error=log_failed_to_copy_to_modsec, silent=True, ) async def create_rbl_whitelist(): rbl_whitelist_path = await _get_rbl_whitelist_path() if not rbl_whitelist_path: return whitelist_chain = await _get_whitelists_data() whitelist_chain = _convert_ip_addresses(whitelist_chain) new_whitelist = list(whitelist_chain) current_whitelist = _read_whitelist_from_file(rbl_whitelist_path) if set(new_whitelist) != set(current_whitelist): logger.info("Create RBL whitelist: %s", rbl_whitelist_path) text = "\n".join(sorted(new_whitelist)) # rbl_whitelist is a data file read by an @ipMatchFromFile SecRule — # safe_update_config's configtest/revert machinery is for Apache config # files and doesn't apply here. We write directly so we can attach our # own longer-window coalescer to the reload step (DEF-41807). if atomic_rewrite(str(rbl_whitelist_path), text, backup=False): logger.info("RBL whitelist was successfully updated") await _schedule_apache_reload_for_rbl() # Sort of a workaround to avoid redundant imports which can # cause circular dependencies if ( is_force_use_coraza() or HostingPanel().__class__.__name__ == "cPanelCoraza" ): logger.info( "Reloading 'imunify360-wafd' as coraza ruleset is in" " action" ) await reload_wafd() else: logger.info("No changes in RBL whitelist, no restart required") @recurring_check(POLLING_PERIOD) async def ensure_rbl_whitelist(): """Make sure rbl_whitelist is not empty.""" rbl_whitelist_path = await _get_rbl_whitelist_path() if not rbl_whitelist_path: return # do nothing at this time try: empty = not os.path.getsize(str(rbl_whitelist_path)) except FileNotFoundError: return else: if empty: await create_rbl_whitelist() # recreate def _convert_ip_addresses(iterable): for ip in iterable: ip = ipaddress.ip_network(ip) # RBL whitelist can't handle /32 nets. # we need to convert /32 nets to ips if ip.num_addresses == 1: ip = ipaddress.ip_address(ip.network_address) yield str(ip) async def _get_rbl_whitelist_path() -> Optional[Path]: """RBL whitelist stored in ModSec ruleset directory, returns Path for RBL whitelist file, or None if panel errors, or modsec rulest dir doesn't exists. """ try: rbl_whitelist_path = await HostingPanel().get_rbl_whitelist_path() except PanelException as e: logging.warning("Can't create rbl whitelist: %s", e) return None if not rbl_whitelist_path: logger.info("RBL whitelist path is undefined. Creation skipped") return rbl_whitelist_path def _read_whitelist_from_file(rbl_whitelist_path): logger.info("Read RBL whitelist: %s", rbl_whitelist_path) try: # `rbl_whitelist_path.open()` does not have to raise FileNotFoundError, # it might be, for example, OSError in the case of `py.path.local` # with open(str(rbl_whitelist_path), "r") as f: yield from map(str.strip, f) except FileNotFoundError: logger.info("RBL whitelist doest not exist: %s", rbl_whitelist_path)
[-] smtp_blocking.py
[edit]
[-] fail2ban.py
[edit]
[-] modsec_audit_log.py
[edit]
[-] proactive.py
[edit]
[-] whitelist_rbl.py
[edit]
[-] running_ids.py
[edit]
[-] ossec.py
[edit]
[+]
__pycache__
[-] pam.py
[edit]
[+]
features
[+]
panels
[-] webshield.py
[edit]
[-] shared_disabled_rules.py
[edit]
[-] int_config.py
[edit]
[-] modsec_app_version_detector.py
[edit]
[-] modsec_cache_dir.py
[edit]
[-] __init__.py
[edit]
[-] waf_rules_configurator.py
[edit]
[-] webshield_mode.py
[edit]
[+]
..
[-] csf.py
[edit]
[-] remoteip.py
[edit]