File: //usr/lib/fm-agent/plugins/fortisase_connection.py
import os
import logging
from ipaddress import IPv4Address, IPv6Address
from configparser import ConfigParser, NoOptionError
import sys
import json
import time
import re
import agent_util
# Location of the FortiClient epctrl log file. It is scanned for the
# "Got public IP from ipify" entries to discover the endpoint's public IP.
EPCTRL_LOG = "/Library/Application Support/Fortinet/FortiClient/Logs/epctrl.log"
class FortisaseVPNConnection:
    """Snapshot of the FortiSASE VPN tunnel built from FortiClient files.

    ``connection_state``, ``connection_name`` and ``connected_address`` stay
    ``None`` until :meth:`parse_connection` finds a connected tunnel whose
    name is listed in ``valid_tunnels``.
    """

    # Number of bytes read per step while scanning the log from its end.
    _CHUNK_SIZE = 5000

    # Log line written by the public_ip_checker; group 1 holds the address.
    _PUBLIC_IP_RE = re.compile(
        r"^.*\spublic_ip_checker:\d*\sGot public IP from ipify:\s(.*)$"
    )

    def __init__(self):
        self.connection_state = None
        self.connection_name = None
        self.connected_address = None
        self.valid_tunnels = [
            "Secure Internet Access",
            "Secure Internet Access - IPsec",
            "FortiSASE Cloud Security",
            "SASE Secure Internet Access",
        ]
        self.log = logging.getLogger("fortisase")

    def parse_connection(self, data):
        """
        Load the VPN connection state from a JSON document.

        ``data`` is a JSON string holding a sequence of objects; the keys
        read from each object are ``tunnel_name``, ``connected`` and
        ``ip_address``. The first entry that is connected and whose tunnel
        name is in ``valid_tunnels`` is recorded on the instance.

        Returns True when such an entry was found, False otherwise.
        Raises whatever ``json.loads`` raises for malformed input.
        """
        connections = json.loads(data)
        for connection in connections:
            tunnel_name = connection.get("tunnel_name")
            if tunnel_name not in self.valid_tunnels:
                continue
            if not connection.get("connected"):
                continue
            # Tunnel is valid
            self.connection_state = "Connected"
            self.connection_name = tunnel_name
            # A missing or malformed address must not hide the fact that the
            # tunnel is up, so the address falls back to None instead of
            # raising.
            self.connected_address = self._to_ip_address(
                connection.get("ip_address")
            )
            return True
        return False

    def _to_ip_address(self, raw):
        """Return ``raw`` as IPv4Address/IPv6Address, or None if unusable."""
        if not isinstance(raw, str) or not raw.strip():
            return None
        text = raw.strip()
        try:
            if ":" in text:
                return IPv6Address(text)
            return IPv4Address(text)
        except ValueError as err:
            # AddressValueError is a ValueError subclass.
            self.log.warning(f"Unable to parse tunnel address {raw!r}. Err {err}")
            return None

    def is_sia_tunnel_up(self):
        """
        Return True only if the tunnel name and connection state is in acceptable
        parameters.
        """
        return (
            self.connection_name in self.valid_tunnels
            and self.connection_state == "Connected"
        )

    def get_public_ip_address(self, log_path=None):
        """
        Return the most recent public IP recorded in the FortiClient log.

        The file is read backwards in ``_CHUNK_SIZE`` byte steps and the scan
        stops at the first (i.e. newest) public_ip_checker hit.

        ``log_path`` defaults to ``EPCTRL_LOG``. Returns an ``IPv4Address``,
        or None when the file is missing or holds no parsable entry.
        """
        if log_path is None:
            log_path = EPCTRL_LOG
        start = time.time()
        ip_address = None
        try:
            opened = open(log_path, "rb")
        except FileNotFoundError:
            self.log.warning(f"{log_path} file not found.")
            return None
        with opened:
            position = opened.seek(0, os.SEEK_END)
            # Beginning of a line whose start lies in a part of the file not
            # read yet; it is glued onto the next (earlier) chunk so that no
            # line is ever parsed in a truncated form.
            carry = b""
            while ip_address is None and position > 0:
                read_size = min(self._CHUNK_SIZE, position)
                position -= read_size
                opened.seek(position)
                buffer = opened.read(read_size) + carry
                if position > 0:
                    head, newline, buffer = buffer.partition(b"\n")
                    carry = head
                    if not newline:
                        # No line break yet: everything read so far belongs
                        # to one long line, keep collecting.
                        continue
                # Splitting happened on b"\n", so multi-byte characters are
                # intact; "replace" only guards against stray invalid bytes.
                ip_address = self._parse_log_chunk(
                    buffer.decode("utf-8", errors="replace")
                )
        end = time.time()
        self.log.info(
            f"Parsed {log_path} in {end - start} seconds. Found: {ip_address}"
        )
        return ip_address

    def _parse_log_chunk(self, chunk: str):
        """
        Return the newest public IP found in ``chunk`` or None.

        Lines are inspected last-to-first because later lines are newer.
        Entries whose address cannot be parsed are logged and skipped.
        """
        for line in reversed(chunk.split("\n")):
            match = self._PUBLIC_IP_RE.search(line)
            if not match:
                continue
            try:
                # strip() drops a trailing "\r" or padding after the address.
                return IPv4Address(match.group(1).strip())
            except ValueError as err:
                self.log.warning(
                    f"Unable to parse ipadress {match.groups()}. Err {err}"
                )
        return None
class FortiSaseConnection(agent_util.Plugin):
    """Agent plugin exposing FortiSASE tunnel state and IP metrics on macOS."""

    textkey = "fortisase"
    label = "FortiSase"
    log = logging.getLogger("fortisase")
    # Default config file for Agent Config for FortiSase installations.
    # We use it to determine the installation type. If file is not present, is not valid
    config_file = "/usr/local/FortiMonitor/agent/config/fm-agent/fm_agent.cfg"

    @classmethod
    def get_metadata(cls, config):
        """
        Describe the metrics offered by this plugin.

        Returns an empty dict unless the platform is ``darwin`` and the agent
        config file declares ``handshake_type = forticlient`` in its
        ``[agent]`` section; otherwise returns the metadata for the three
        ``osx.*`` textkeys.
        """
        # Plugin only set to work on OSX for now.
        if sys.platform.lower() != "darwin":
            return {}
        # Agent needs to be a FortiSase installation to work as well.
        if not os.path.exists(cls.config_file):
            cls.log.info(
                "Agent config file not found. Unable to determine handshake type"
            )
            return {}
        config_reader = ConfigParser()
        config_reader.read(cls.config_file)
        # ``fallback`` covers a missing option as well as a missing [agent]
        # section, which would otherwise raise NoSectionError.
        handshake_type = config_reader.get("agent", "handshake_type", fallback="")
        if handshake_type.lower() != "forticlient":
            msg = "Agent installation is not FortiSase"
            cls.log.info(f"Fortisase connection plugin disabled. {msg}")
            return {}
        status = agent_util.SUPPORTED
        msg = None
        metadata = {
            "osx.connected_sia": {
                "label": "Connected SIA",
                "options": None,
                "status": status,
                "error_msg": msg,
                "unit": "bool",
            },
            "osx.turbo_ip": {
                "label": "Turbo IP",
                "options": None,
                "status": status,
                "error_msg": msg,
            },
            "osx.public_ip": {
                "label": "Endpoint Public Ip",
                "options": None,
                "status": status,
                "error_msg": msg,
            },
        }
        return metadata

    def _load_vpn_client(self):
        """Return a FortisaseVPNConnection filled from the VPN status file.

        When the file does not exist a warning is logged and an unpopulated
        client is returned. Read or parse errors propagate to the caller.
        """
        # This file configuration is only available on FortiClient 7.4.4 and above.
        vpn_data = "/Library/Application Support/Fortinet/FortiClient/data/vpn_status_info.json"
        client = FortisaseVPNConnection()
        try:
            # JSON is UTF-8 by specification; do not rely on the locale.
            with open(vpn_data, "r", encoding="utf-8") as opened:
                client.parse_connection(opened.read())
        except FileNotFoundError:
            self.log.warning(
                f"{vpn_data} file does not exist. Unable to fully collect data"
            )
        return client

    def check(self, textkey, data, config):
        """
        Collect the value for ``textkey``.

        * ``osx.connected_sia``: 1 when a valid tunnel is connected, else 0.
        * ``osx.turbo_ip``: the connected tunnel address as the float of its
          integer form, or None when the tunnel is down or has no address.
        * ``osx.public_ip``: the latest public IP from the FortiClient log as
          the float of its integer form, or None when not found.

        Any error is logged as a warning and None is returned.
        """
        try:
            metric = None
            if textkey == "osx.public_ip":
                # The public IP comes from the log file only, so a missing or
                # broken VPN status file must not affect this metric.
                public_ip = FortisaseVPNConnection().get_public_ip_address()
                if public_ip:
                    metric = int(public_ip)
            elif textkey in ("osx.connected_sia", "osx.turbo_ip"):
                client = self._load_vpn_client()
                tunnel_up = client.is_sia_tunnel_up()
                if textkey == "osx.connected_sia":
                    return 1 if tunnel_up else 0
                if not tunnel_up:
                    # If the connection is not detected we don't need to check the turbo ip.
                    return None
                ip_address = client.connected_address
                if ip_address:
                    metric = int(ip_address)
            if metric:
                return float(metric)
        except Exception as msg:
            self.log.warning(f"Unable to process the FortiSase files. Error: {msg}")
        return None