PATH:
opt
/
imunify360
/
venv
/
lib
/
python3.11
/
site-packages
/
im360
import contextlib import logging from typing import Generator from defence360agent.api.server import send_message, NATSSendMessageException from defence360agent.contracts.plugins import MessageSink from defence360agent.plugins.client import SendToServerClient from defence360agent.utils import Scope logger = logging.getLogger(__name__) class SendToServerNATS(SendToServerClient, MessageSink): SCOPE = Scope.IM360 SHUTDOWN_PRIORITY = 900 # Shutdown late, after Accumulate has flushed def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._nats_api = None @contextlib.contextmanager def _get_api( self, ) -> Generator[send_message.NATSGatewayAPI, None, None]: # Reuse the same API instance to keep the NATS connection alive if self._nats_api is None: self._nats_api = send_message.NATSGatewayAPI() yield self._nats_api async def _send_pending_messages(self) -> None: if self._pending.empty(): return None if self._shutting_down.is_set(): logger.warning("Shutdown signal received, skipping NATS send") return None messages = self._pending.pop_all() logger.info("Sending %s messages via NATS", len(messages)) with self._get_api() as api: try: await api.send_messages(messages) if self._pending.qsize() > 0: logger.info( "Still need to send %s messages", self._pending.qsize(), ) except NATSSendMessageException as e: unsent = messages[e.published :] if unsent: self._pending.put_many(unsent) logger.warning( "Failed to send messages via NATS: " "%d published, %d re-queued: %s", e.published, len(unsent), e, ) except BaseException: # CancelledError (BaseException in 3.9+) or other fatal # errors — re-queue all messages to avoid data loss. self._pending.put_many(messages) raise async def shutdown(self) -> None: await super().shutdown() if self._nats_api is not None: await self._nats_api.close() logger.info("NATS connection closed")
[+]
application
[+]
simple_rpc
[+]
ioc
[+]
__pycache__
[+]
files
[-] __main__.py
[edit]
[+]
internals
[+]
model
[-] rpc_handlers.py
[edit]
[-] aibolit_job.py
[edit]
[+]
subsys
[+]
plugins
[-] _version.py
[edit]
[-] __init__.py
[edit]
[+]
migrations
[-] nats_gateway.py
[edit]
[+]
..
[-] cli.py
[edit]
[-] run.py
[edit]
[+]
contracts
[+]
utils
[+]
api