HEX
Server: Apache
System: Linux 185.122.168.184.host.secureserver.net 5.14.0-570.60.1.el9_6.x86_64 #1 SMP PREEMPT_DYNAMIC Wed Nov 5 05:00:59 EST 2025 x86_64
User: barbeatleanalyti (1024)
PHP: 8.1.33
Disabled: NONE
Upload Files
File: //proc/self/root/usr/lib/fm-agent/plugins/fortisase_connection.py
import os
import logging
from ipaddress import IPv4Address, IPv6Address
from configparser import ConfigParser, NoOptionError
import sys
import json
import time
import re

import agent_util

# FortiClient log file that FortisaseVPNConnection.get_public_ip_address scans
# for the "Got public IP from ipify" entries.
EPCTRL_LOG = "/Library/Application Support/Fortinet/FortiClient/Logs/epctrl.log"


class FortisaseVPNConnection:
    """
    Holds the state of the FortiClient VPN tunnel as parsed from the FortiClient
    status JSON, and locates the endpoint public IP in the epctrl log file.
    """

    # Log line written by the public_ip_checker; group 1 is the reported address.
    _PUBLIC_IP_PATTERN = re.compile(
        r"^.*\spublic_ip_checker:\d*\sGot public IP from ipify:\s(.*)$"
    )

    def __init__(self):
        self.connection_state = None
        self.connection_name = None
        self.connected_address = None
        # Tunnel names that count as a Secure Internet Access (SIA) connection.
        self.valid_tunnels = [
            "Secure Internet Access",
            "Secure Internet Access - IPsec",
            "FortiSASE Cloud Security",
            "SASE Secure Internet Access",
        ]
        self.log = logging.getLogger("fortisase")

    @staticmethod
    def _to_ip_address(value):
        """
        Convert a textual address into an IPv4Address or IPv6Address.

        Surrounding whitespace (e.g. a trailing carriage return) is ignored.
        Raises ValueError (ipaddress.AddressValueError) for an invalid address.
        """
        text = value.strip()
        if ":" in text:
            return IPv6Address(text)
        return IPv4Address(text)

    def parse_connection(self, data):
        """
        Receive a json data that contains the information of the VPN Connection, and its ip address.

        The first entry whose tunnel name is in ``valid_tunnels`` and that is flagged
        as connected is recorded in ``connection_state``, ``connection_name`` and
        ``connected_address``.

        Returns True when such an entry is found, False otherwise.
        Raises json.JSONDecodeError when ``data`` is not valid JSON.
        """
        connections = json.loads(data)
        if not isinstance(connections, list):
            self.log.warning("Unexpected VPN status payload. Expected a list of connections")
            return False
        for connection in connections:
            if not isinstance(connection, dict):
                continue
            tunnel_name = connection.get("tunnel_name")
            connected = bool(connection.get("connected"))
            if tunnel_name not in self.valid_tunnels or not connected:
                continue
            # Tunnel is valid
            self.connection_state = "Connected"
            self.connection_name = tunnel_name
            # A missing or malformed address must not hide the fact that the
            # tunnel is up, so the address is simply left unset in that case.
            self.connected_address = None
            ip = connection.get("ip_address")
            if isinstance(ip, str) and ip.strip():
                try:
                    self.connected_address = self._to_ip_address(ip)
                except ValueError as err:
                    self.log.warning(
                        f"Unable to parse tunnel ip address {ip!r}. Err {err}"
                    )
            else:
                self.log.warning(f"Tunnel {tunnel_name} reported no usable ip address")
            return True
        return False

    def is_sia_tunnel_up(self):
        """
        Return True only if the tunnel name and connection state is in acceptable
        parameters.
        """
        return (
            self.connection_name in self.valid_tunnels
            and self.connection_state == "Connected"
        )

    def get_public_ip_address(self, log_path=None, chunk_size=5000):
        """
        Parse the epctrl log file from FortiClient backwards and return the most
        recent successful hit of the public_ip_checker. Stops as soon as it is found.

        log_path: file to scan; defaults to EPCTRL_LOG.
        chunk_size: number of bytes read per backwards step.

        Returns the parsed address (IPv4Address or IPv6Address), or None when the
        file does not exist or holds no parsable entry.
        """
        path = EPCTRL_LOG if log_path is None else log_path
        if not os.path.exists(path):
            self.log.warning(f"{path} file not found.")
            return
        step = max(1, int(chunk_size))
        start = time.time()
        ip_address = None
        # Bytes of a line whose beginning lies in a chunk that was not read yet.
        pending = b""
        with open(path, "rb") as opened:
            position = opened.seek(0, os.SEEK_END)
            while ip_address is None and position > 0:
                read_size = min(step, position)
                position -= read_size
                opened.seek(position)
                buffer = opened.read(read_size) + pending
                if position > 0:
                    # Everything before the first newline may be the tail of a
                    # line that started earlier in the file: defer it so lines
                    # are only ever parsed whole.
                    pending, _, buffer = buffer.partition(b"\n")
                else:
                    pending = b""
                # The buffer is cut on newlines, so multi-byte characters are
                # never split; "replace" guards against non UTF-8 bytes.
                ip_address = self._parse_log_chunk(
                    buffer.decode("utf-8", errors="replace")
                )
        end = time.time()
        self.log.info(
            f"Parsed {path} in {end - start} seconds. Found: {ip_address}"
        )
        return ip_address

    def _parse_log_chunk(self, chunk: str):
        """
        Return the address of the newest public_ip_checker entry in ``chunk``
        (lines are scanned last to first), or None when there is no parsable entry.
        """
        for line in reversed(chunk.split("\n")):
            match = self._PUBLIC_IP_PATTERN.search(line)
            if not match:
                continue
            try:
                return self._to_ip_address(match.group(1))
            except Exception as err:
                # Keep looking at older lines instead of giving up.
                self.log.warning(
                    f"Unable to parse ipadress {match.groups()}. Err {err}"
                )
        return None


class FortiSaseConnection(agent_util.Plugin):
    """
    Agent plugin reporting the FortiSASE tunnel state, the tunnel ("turbo") IP
    and the endpoint public IP for FortiClient installations on macOS.
    """

    textkey = "fortisase"
    label = "FortiSase"
    log = logging.getLogger("fortisase")
    # Default config file for Agent Config for FortiSase installations.
    # We use it to determine the installation type. If file is not present, is not valid
    config_file = "/usr/local/FortiMonitor/agent/config/fm-agent/fm_agent.cfg"

    @classmethod
    def get_metadata(cls, config):
        """
        Describe the metrics this plugin offers.

        Returns an empty dict when the platform is not macOS, when the agent
        config file is missing, or when the agent was not installed with the
        "forticlient" handshake type; otherwise a dict keyed by metric textkey.
        """
        # Plugin only set to work on OSX for now.
        if sys.platform.lower() != "darwin":
            return {}

        # Agent needs to be a FortiSase installation to work as well.
        if not os.path.exists(cls.config_file):
            cls.log.info(
                "Agent config file not found. Unable to determine handshake type"
            )
            return {}
        config_reader = ConfigParser()
        config_reader.read(cls.config_file)
        # The fallback covers both a missing [agent] section and a missing
        # handshake_type option, so neither can raise out of metadata discovery.
        handshake_type = config_reader.get("agent", "handshake_type", fallback="")
        if handshake_type.lower() != "forticlient":
            msg = "Agent installation is not FortiSase"
            cls.log.info(f"Fortisase connection plugin disabled. {msg}")
            return {}

        status = agent_util.SUPPORTED
        msg = None
        metadata = {
            "osx.connected_sia": {
                "label": "Connected SIA",
                "options": None,
                "status": status,
                "error_msg": msg,
                "unit": "bool",
            },
            "osx.turbo_ip": {
                "label": "Turbo IP",
                "options": None,
                "status": status,
                "error_msg": msg,
            },
            "osx.public_ip": {
                "label": "Endpoint Public Ip",
                "options": None,
                "status": status,
                "error_msg": msg,
            },
        }
        return metadata

    def check(self, textkey, data, config):
        """
        Collect the value for one metric.

        textkey: "osx.connected_sia", "osx.turbo_ip" or "osx.public_ip".

        Returns 1/0 for "osx.connected_sia"; for the IP metrics the address as a
        float of its integer form, or None when it is not available. Any error
        while reading the FortiClient files is logged and None is returned.
        """
        try:
            # This file configuration is only available on FortiClient 7.4.4 and above.
            vpn_data = "/Library/Application Support/Fortinet/FortiClient/data/vpn_status_info.json"
            metric = None
            client = FortisaseVPNConnection()
            if os.path.exists(vpn_data):
                with open(vpn_data, "r") as opened:
                    raw_status = opened.read()
                # Populates the client state read by is_sia_tunnel_up below.
                client.parse_connection(raw_status)
            else:
                self.log.warning(
                    f"{vpn_data} file does not exist. Unable to fully collect data"
                )
            if textkey == "osx.connected_sia":
                # Tunnel state comes from the parsed VPN status file.
                return 1 if client.is_sia_tunnel_up() else 0
            elif textkey == "osx.turbo_ip":
                if not client.is_sia_tunnel_up():
                    # If the connection is not detected we don't need to check the turbo ip.
                    return
                ip_address = client.connected_address
                if ip_address:
                    metric = int(ip_address)
            elif textkey == "osx.public_ip":
                public_ip = client.get_public_ip_address()
                if public_ip:
                    metric = int(public_ip)
            if metric:
                return float(metric)
        except Exception as msg:
            self.log.warning(f"Unable to process the FortiSase files. Error: {msg}")
            return