HEX
Server: Apache
System: Linux 185.122.168.184.host.secureserver.net 5.14.0-570.60.1.el9_6.x86_64 #1 SMP PREEMPT_DYNAMIC Wed Nov 5 05:00:59 EST 2025 x86_64
User: barbeatleanalyti (1024)
PHP: 8.1.33
Disabled: NONE
Upload Files
File: //proc/thread-self/root/usr/lib/fm-agent/library/forticlient_helper.py
import logging
from os import path
import hashlib


def calculate_customer_key(ems_serial: str, environment: str) -> str:
    """
    Derive the customer key for an EMS serial / environment pair.

    The key is the first 20 hexadecimal characters of the SHA-256 digest
    computed over the encoded serial immediately followed by the encoded
    environment (no separator between the two).
    """
    material = ems_serial.encode() + environment.encode()
    return hashlib.new("sha256", material).hexdigest()[:20]


class ForticlientHelper:
    """
    Collect the FortiClient identity values sent in the aggregator handshake.

    The values come from single-line data files under ``data_path`` and from
    the VPN profiles stored in the ``vpn_data`` property list.
    """

    # Directory holding the FortiClient data files listed in data_files_map.
    data_path = "/Library/Application Support/Fortinet/FortiClient/data/"
    # Property list containing the configured VPN profiles.
    vpn_data = "/Library/Application Support/Fortinet/FortiClient/conf/vpn.plist"

    def __init__(self):
        #
        # Map file containing data to aggregator handshake key
        #
        self.data_files_map = {
            "fct_uid.conf": "forticlient_serial",
            "EMS_sn.conf": "ems_serial",
        }
        self.log = logging.getLogger()

    def parse_address(self, url):
        """
        Extract the FortiSASE environment name from a server address.

        To determine the environment for FortiSASE, we expect
        to find a server address in one of the 2 forms:
        <name0>.edge.<environment>.fortisase.com
        or
        <name0>.edge.<zone>.<environment>.fortisase.com
        where name0 is arbitrary, and environment is what
        we will pass to the aggregator.

        The address may be a full URL, a bare host name, or ``host:port``;
        any scheme, credentials, port and path are ignored and the host is
        compared case-insensitively.

        Returns the environment string, or None when ``url`` is not a string
        or does not match either form.
        """
        import urllib.parse as urlparse

        if not isinstance(url, str):
            return None

        # A bare "host" has no netloc, and a bare "host:port" is parsed as
        # "<scheme>:<path>", so neither yields a hostname on the first try.
        # Fall back to parsing with an explicit scheme in front.
        host = None
        for candidate in (url, "http://" + url):
            try:
                # .hostname drops userinfo and port and lower-cases the host.
                host = urlparse.urlparse(candidate).hostname
            except ValueError:
                # Malformed authority section (e.g. unbalanced IPv6 brackets).
                return None
            if host:
                break
        if not host:
            return None

        host_parts = host.split(".")
        if len(host_parts) < 4:
            return None

        top_level = host_parts[-1]
        domain = host_parts[-2]
        environment = host_parts[-3]
        # With six labels the fourth-from-last label is the zone, so the
        # "edge" marker sits one position further left.
        edge = host_parts[-5] if len(host_parts) == 6 else host_parts[-4]

        if top_level == "com" and domain == "fortisase" and edge == "edge":
            return environment
        return None

    def get_vpn_environment(self) -> str:
        """
        Return the environment of the first VPN profile pointing at FortiSASE.

        Reads the ``vpn_data`` property list and checks the "Server" entry of
        every profile under "Profiles" with parse_address().

        Raises an Exception when no profile yields an environment. Errors
        from opening or parsing the file, or from a missing "Profiles" key,
        propagate unchanged.
        """
        import plistlib

        with open(self.vpn_data, "rb") as f:
            data = plistlib.load(f)

        for profile_info in data["Profiles"].values():
            server_name = profile_info.get("Server", None)
            if not server_name:
                continue
            env = self.parse_address(server_name)
            if env:
                return env

        raise Exception("No environment profile found")

    def get_handshake_data(self) -> dict:
        """
        Build the dictionary of FortiClient values for the handshake.

        The result maps every key in ``data_files_map`` to the stripped first
        line of its file and adds "forticlient_environment" from
        get_vpn_environment().

        Raises an Exception when the data directory is missing, when any data
        file has an empty first line, or when the VPN environment cannot be
        determined (the underlying error is chained as the cause). A data
        file that cannot be opened raises the corresponding OSError.
        """
        if not path.isdir(self.data_path):
            raise Exception("Missing forticlient data")

        rv = {}
        for file_name, hs_key in self.data_files_map.items():
            with open(path.join(self.data_path, file_name), "r") as df:
                value = df.readline().strip()
            if value:
                rv[hs_key] = value

        # Every mapped file must have contributed a non-empty value.
        if len(rv) != len(self.data_files_map):
            raise Exception("Missing forticlient keys")

        try:
            rv["forticlient_environment"] = self.get_vpn_environment()
        except Exception as err:
            raise Exception("Missing VPN data") from err

        return rv