File: //lib/fm-agent/library/agent.py
from agent_util import execute_command, which
from datetime import datetime, timedelta
from inspector import Inspector, get_fqdn, get_server_name
from ipc_client import DEMClient, IPCClient
from agent_exceptions import NoAgentSectionHeaderException, NoManifestFileException
from forticlient_helper import ForticlientHelper, calculate_customer_key
from process_manager import ProcessManager
from os.path import basename, exists, isdir, isfile, join
from pickle_database import PickleDatabase
from plugin_manager import PluginManager
from pprint import pprint
from progress_printer import ProgressPrinter
from result_queue import ResultQueue
from schedule import Schedule
from sys import exit
# StringIO moved into the io module in Python 3
try:
import StringIO
except:
import io as StringIO
import aggregator
import calendar
import container_discovery
import csv
import difflib
import display
import fcntl
import p_importlib
import logging
import logging.handlers
import optparse
import os
import random
import re
import subprocess
import sys
import tempfile
import time
import traceback
import types
from blacklister import PluginBlacklister
try:
import six
except:
# Legacy support for Python 2.4
class Six:
PY2 = True
six = Six()
# ConfigParser was renamed to configparser in Python 3
try:
import ConfigParser as configparser
except:
import configparser
try:
import json
except ImportError:
try:
import simplejson as json
# it's possible that we may not need json for the action that we're taking.
# for example, for the rpm post install script, on a python version that
# doesn't have json, we'll get this far in the code. but the post
# install doesn't use json, so we're fine
except ImportError:
json = None
# Import a SHA-1 function: hashlib for newer Python versions, sha for older ones
try:
import hashlib
sha_func = hashlib.sha1
except:
import sha
sha_func = sha.new
# Backport subprocess.check_output for Python versions < 2.7
# Adapted from http://stackoverflow.com/questions/4814970/subprocess-check-output-doesnt-seem-to-exist-python-2-6-5
if "check_output" not in dir(subprocess): # duck punch it in!
def f(*popenargs, **kwargs):
if "stdout" in kwargs:
raise ValueError("stdout argument not allowed, it will be overridden.")
process = subprocess.Popen(stdout=subprocess.PIPE, *popenargs, **kwargs)
output, unused_err = process.communicate()
retcode = process.poll()
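# Note: unlike the standard library's check_output, this backport does not raise
# CalledProcessError on a non-zero exit status; retcode is captured but never checked.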
return output
subprocess.check_output = f
try:
# Python 2.x
import urlparse
except:
import urllib.parse as urlparse
import urllib.request as urlrequest
import ssl
try:
# Python 2.x
import httplib
except ImportError:
import http.client as httplib
DEFAULT_MEDIATOR_URL = "https://global.fortimonitor.com"
AGENT_INSTALL_BLOCK = "/usr/local/FortiMonitor/agent/data/fm_agent_install_block"
class ExceptionMediator(Exception):
pass
def defer_installation(timeout):
"""
Return whether the installation should be delayed. An installation is delayed
if a block timestamp exists and the current time is still before that value.
"""
if timeout and timeout > time.time():
return True
return False
def get_next_wait_period(timestamp, waited):
"""
Calculate the next time the installation may be attempted: the last timestamp
plus double the seconds we last waited, capped at a maximum of 12 hours.
"""
if not waited:
waited = 30
if not timestamp:
timestamp = time.time()
next_wait_secs = min(waited * 2, 43200)
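# Example progression of the computed wait: with no prior wait recorded the first
# interval is 60 seconds (30 doubled), then 120, 240, ... up to the 43200 second (12 hour) cap.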
return timestamp + next_wait_secs, next_wait_secs
def load_failed_tries_file():
"""
Load the file that contains the time to wait until trying the next install, and the
number of seconds we have waited so far.
"""
if not os.path.exists(AGENT_INSTALL_BLOCK):
return None, None
timestamp, seconds = 0.0, 0
with open(AGENT_INSTALL_BLOCK) as opened:
data = opened.read()
timestamp, seconds = data.split(";")
timestamp = float(timestamp)
seconds = int(seconds)
return timestamp, seconds
def save_failed_tries_file(timestamp, seconds):
"""
Save a file containing the next time the install is allowed to proceed and the seconds we are
waiting for that timestamp.
"""
with open(AGENT_INSTALL_BLOCK, "w+") as opened:
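# The block file holds a single line of the form "<next_allowed_timestamp>;<seconds_waited>",
# e.g. "1700000000.0;60" (illustrative values), which load_failed_tries_file() splits apart.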
opened.write(f"{timestamp};{seconds}")
def get_regional_agg_url(customer_key):
"""
Pull out the URL for the customer from the global mediator api to use instead of
the default.
Args:
customer_key: identifier for the customer to pull a single regional CP url.
Returns:
regional_agg_url: URL for the aggregator that the customer should use.
"""
if os.path.exists("/etc/fm_mediator_url"):
with open("/etc/fm_mediator_url", "rb") as opened:
mediator_url = opened.read().decode()
mediator_url = mediator_url.strip("\n")
else:
mediator_url = DEFAULT_MEDIATOR_URL
uri = "/aggregator_url/{}".format(customer_key)
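# Rough usage sketch (hypothetical customer key): get_regional_agg_url("abc123") issues
# GET /aggregator_url/abc123 against global.fortimonitor.com (or the URL found in
# /etc/fm_mediator_url) and returns the response body as the regional aggregator URL.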
if mediator_url.startswith("https://"):
base_url = mediator_url.split("https://")[-1]
elif mediator_url.startswith("http://"):
base_url = mediator_url.split("http://")[-1]
else:
base_url = mediator_url
aggregator_url = None
connection = httplib.HTTPSConnection(
host=base_url, timeout=10, context=ssl._create_unverified_context()
)
connection.request("GET", uri)
resp = connection.getresponse()
if resp.status == 200:
aggregator_url = resp.read().decode("utf-8")
else:
raise ExceptionMediator(
f"Failed to grab agg url using customer key {customer_key}. {resp.status}"
)
return aggregator_url
class Agent(object):
CUSTOM = "custom"
DEFAULT_LOG_LEVEL = "INFO"
MAX_IMPORT_FILES = 20
def safe_to_start_agent(self, timeout=2, sleep_time=10, counter=3):
"Check to see if it's safe to start up the agent"
# Safe if there are no other instances running
if not os.path.exists(self.pid_file):
self.log.debug("No existing PID file found, proceeding to run")
return True
# There's an existing PID file, so let's see if it's still active
try:
pid, timestamp = open(self.pid_file).read().strip().split(":")
pid = int(pid)
timestamp = int(timestamp)
except:
# If we couldn't read it, assume that the other instance just exited - should be safe to continue
self.log.critical(
"Error reading existing PID file: %s" % traceback.format_exc()
)
return True
# See if the process is still running
try:
os.getpgid(pid)
except OSError:
# It's exited, safe to proceed
return True
try:
import pwd
username = pwd.getpwuid(os.stat("/proc/%d" % pid).st_uid)[0]
psout = execute_command("ps -o cmd= %d" % pid)[1].lower()
if (
username != self.user
or "python" not in psout
or ("%s_agent" % self.brand) not in psout
):
self.remove_pid_file()
return True
except:
pass
# Process is running, see how old it is
if timeout and (time.time() - timestamp) / 60.0 > timeout:
self.log.critical("Found stale agent process %s - killing" % pid)
# Other process is too old, kill it off and start a new one
os.kill(pid, 9)
return True
# If this run was invoked with --uninstall, kill the running agent processes instead of waiting.
parser = optparse.OptionParser()
options, args = self.parse_arguments(parser)
if options.uninstall and self.user != "root":
self.log.critical(
"Uninstalling. Killing all processes owned by user %s" % self.user
)
try:
# We could get an exception if an uninstall is happening, and the agent user is removed.
manager = ProcessManager()
pids = manager.filter_non_pid_process(os.listdir("/proc"))
pids = manager.get_process_from_user(pids, self.user)
self.log.critical("Found pids %s " % " ".join(pids))
for pid in pids:
os.kill(int(pid), 9)
except:
pass
return True
# Other process should still be running, we bail for now
if counter != 0:
self.current_agent_delay += 10
counter -= 1
self.log.critical(
"Found existing agent process %s, sleeping for %s and checking %s more times if safe to start."
% (pid, sleep_time, counter)
)
time.sleep(sleep_time)
return self.safe_to_start_agent(timeout, counter=counter)
else:
self.log.critical(
"Found existing agent process %s, exiting to wait for it to finish"
% pid
)
return False
def write_pid_file(self):
"Create a new PID file to track our instance"
pid = os.getpid()
now = int(time.time())
# PID file format: "<pid>:<unix timestamp>", parsed back in safe_to_start_agent()
f = open(self.pid_file, "w")
f.write("%s:%s" % (pid, now))
f.close()
def remove_pid_file(self):
"Remove an old PID file to clean up on the way out"
# Need to check to see if it exists to avoid a problem on package uninstall
if os.path.exists(self.pid_file):
os.remove(self.pid_file)
# removes the agent from the system
def uninstall(self, aggregator_client, remove_instance=False):
indent = 1
pp = ProgressPrinter("Notifying %s of uninstall" % self.brand, indent=indent)
success = aggregator_client.notify_of_uninstall(remove_instance)
if success:
pp.finish()
else:
pp.finish("ERROR CONNECTING")
# Remove logging and DB directories. We'll leave CUSTOM_PLUGIN_DIR intact in case they're
# uninstalling and reinstalling.
pp = ProgressPrinter("Removing %r directory" % self.log_dir, indent=indent)
os.system("rm -rf %s %s %s" % (self.db_dir, self.log_dir, self.config_dir))
pp.finish()
indent = 1
pp = ProgressPrinter("\n\nUninstalling %s\n" % self.pkg_dir, section=True)
pp.finish()
print(("\nUninstall of %s complete\n" % self.pkg_dir))
def get_manifest(self):
"""
Get the manifest configuration if it exists. Also, throw a deprecation
warning if the manifest does not conform to the new-style format
(It must have an [agent] section heading).
"""
manifest = configparser.ConfigParser()
try:
manifest_file = manifest.read(self.manifest_file)
if not manifest_file:
raise NoManifestFileException("No manifest file found")
if not manifest.has_section("agent"):
raise NoAgentSectionHeaderException(
"Using a manifest file without the section heading "
'"[agent]" is deprecated; please add this heading to '
"the file. Example:"
"""
[agent]
customer_key = customerkey
server_group = 123
"""
)
except (configparser.MissingSectionHeaderError, NoAgentSectionHeaderException):
self.log.warn(str(traceback.format_exc()))
if sys.version_info[0] == 3:
amended_manifest_file = StringIO.StringIO(
"[agent]\n" + open(self.manifest_file, "r").read()
)
else:
amended_manifest_file = StringIO.StringIO(
"[agent]\n" + open(self.manifest_file, "r").read().decode("utf-8")
)
manifest.readfp(amended_manifest_file)
except NoManifestFileException:
self.log.info(str(traceback.format_exc()))
return manifest
def write_config(self, manifest):
"""
Create/update the config file with the settings from the manifest.
Return the config.
"""
new_config = configparser.ConfigParser()
# Get the existing config file (if it exists) for creating a diff. See
# below.
old_config_lines = None
if os.path.exists(self.config_file):
self.log.info("Existing config file found")
old_config_file = open(self.config_file, "rb")
old_config_lines = old_config_file.readlines()
old_config_file.close()
# Copy old config settings into the new config
old_config = configparser.ConfigParser()
old_config.read(self.config_file)
new_config = self.copy_config_settings(old_config, new_config)
# Copy the manifest settings into the new config
new_config = self.copy_config_settings(manifest, new_config)
# Ensure the required settings are set.
if not new_config.has_section("agent"):
new_config.add_section("agent")
if not new_config.has_option("agent", "aggregator_url"):
new_config.set("agent", "aggregator_url", self.agg_url)
new_config.set("agent", "version", self.version)
if "plugin_blacklist" in new_config.options("agent"):
original_plugins = new_config.get("agent", "plugin_blacklist")
else:
original_plugins = []
updated_plugins = self._blacklister.update_list(original_plugins)
if updated_plugins:
new_config.set("agent", "plugin_blacklist", updated_plugins)
proxies = urlrequest.getproxies()
if not new_config.has_section("agent_proxy") and proxies:
agg_url = new_config.get("agent", "aggregator_url")
try:
agg_url_option = urlparse.urlparse(agg_url)
if agg_url_option.scheme:
agg_hostname = agg_url_option.hostname
else:
agg_hostname = agg_url_option.path
if not urlrequest.proxy_bypass(agg_hostname):
new_config.add_section("agent_proxy")
for key in ["https", "http"]:
p_url = proxies.get(key, None)
if p_url is not None:
new_config.set("agent_proxy", key, p_url.strip("/"))
except:
err = sys.exc_info()[1]
error = str(err)
self.log.error("Install proxy error: {}".format(error))
new_config_file = open(self.config_file, "w")
new_config.write(new_config_file)
new_config_file.close()
os.system("chmod 640 %s" % self.config_file)
if old_config_lines is not None:
# Create a diff of the old config vs new config.
differ = difflib.Differ()
diff_lines = differ.compare(
old_config_lines, open(self.config_file, "r").readlines()
)
diff_lines = list(diff_lines)
changes = [
line
for line in diff_lines
if line.startswith("+ ") or line.startswith("- ")
]
if len(changes):
self.log.info("Config file overwritten")
self.log.debug("Config diff:\n%s", "".join(diff_lines))
else:
self.log.info("No change to config file")
else:
self.log.info("Created new config file: %s", self.config_file)
return new_config
def copy_config_settings(self, original, destination):
"""
Copy settings from the original to the destination, overwriting
destination's settings if they already exist.
"""
for section in original.sections():
if not destination.has_section(section):
destination.add_section(section)
for option, value in original.items(section):
destination.set(section, option, value)
return destination
def install(self, agg_url, version, server_key, customer_key, force=False):
self.log.info("Beginning installation")
block_found, waited_for = load_failed_tries_file()
if defer_installation(block_found):
until = datetime.fromtimestamp(block_found or time.time())
self.log.error(
f"Agent installation block found at {AGENT_INSTALL_BLOCK}. Preventing install until {until}."
)
sys.exit(-3)
if self.is_installed and force is False:
print("Agent already installed")
self.log.info("Agent already installed")
return
# Make dirs for logging, DB, config, and plugins.
dirs = (self.log_dir, self.db_dir, self.config_dir, self.custom_plugin_dir)
os.system("mkdir -p %s %s %s %s" % dirs)
self.log.info("Created directories: %s %s %s %s" % dirs)
# Create a new config from the manifest (if it exists).
manifest = self.get_manifest()
config = self.write_config(manifest)
proxy_config = {}
if config.has_section("agent_proxy"):
proxy_config = config["agent_proxy"]
aggregator_client = aggregator.Client(
agg_url, version, server_key, customer_key, proxy_config=proxy_config
)
agent_settings = dict(
(option, value.strip("'\"")) for option, value in config.items("agent")
)
if config.has_section("agent_proxy"):
aggregator_client.proxy_config = config["agent_proxy"]
pp = ProgressPrinter("\nHandshaking with %s servers" % self.brand, indent=1)
# Check for a custom aggregator URL, and set it in the client if present
handshake_type = agent_settings.get("handshake_type", "standard").lower()
if handshake_type == "standard":
try:
agg_url = config.get("agent", "aggregator_url")
print(
(
"Using manifest file aggregator for initial handshake: %s"
% agg_url
)
)
self.log.info(
"Using manifest file aggregator for initial handshake: %s" % agg_url
)
aggregator_client.agg_url = agg_url
except:
pass
elif handshake_type == "forticlient":
# If we are a FortiSase install, pull the regional aggregator url using the calculated customer key,
# and overwrite the config of the agent with it.
try:
handshake_data = self.get_fortisase_attributes()
agent_settings["forticlient_metadata"] = handshake_data
# Calculate the expected customer key from ems_serial and environment
ems_serial = handshake_data["ems_serial"]
environment = handshake_data["forticlient_environment"]
expected_customer_key = calculate_customer_key(ems_serial, environment)
agent_settings["customer_key"] = expected_customer_key
agg_url = get_regional_agg_url(expected_customer_key)
logging.info(
f"Overwriting agg url with {agg_url} for customer key {expected_customer_key}"
)
aggregator_client.agg_url = agg_url
agent_settings["aggregator_url"] = agg_url
# Save the agg url on the config.
config.set("agent", "aggregator_url", agg_url)
config.write(open(self.config_file, "w"))
if os.path.exists(AGENT_INSTALL_BLOCK):
# Remove the installation caching block file.
os.remove(AGENT_INSTALL_BLOCK)
except Exception as err:
block_until, seconds = get_next_wait_period(block_found, waited_for)
save_failed_tries_file(block_until, seconds)
self.log.exception(
f"Mediator error grabbing agg url {err}. Blocking agent installation for {seconds}."
)
sys.exit(-3)
else:
raise ValueError(
f"Unexpected handshake type {handshake_type}. Aborting handshake"
)
if config.has_section("attributes"):
server_attributes = dict(
(option, value) for option, value in config.items("attributes")
)
else:
server_attributes = {}
try:
success, server_key, found_server, error, log_level = (
aggregator_client.handshake(
self.get_all_ips(), agent_settings, server_attributes
)
)
except:
print(
"\n\nThere was an error in the initial handshake with the aggregator, please"
)
print(
"check your aggregator URL, and ensure you have connectivity to retrieve:\n"
)
for url in agg_url.split(","):
print((" %s\n" % os.path.join(url, "v2/hello")))
self.log.error("Error in initial handshake: %s" % traceback.format_exc())
sys.exit()
if not server_key or not found_server:
print("Handshake failed: %s" % error)
self.log.error("Handshake failed: %s" % error)
sys.exit()
self.log.debug(
"%s, %s, %s, %s, %s" % (success, server_key, found_server, error, log_level)
)
if log_level:
self.db["log_level"] = log_level
# Install remote countermeasures plugins, if specified
if (
"enable_countermeasures" in config.options("agent")
and config.get("agent", "enable_countermeasures").lower() == "true"
and "countermeasures_remote_plugins" in config.options("agent")
):
for url in config.get("agent", "countermeasures_remote_plugins").split(","):
cmd = "%s %s/countermeasure.py install_plugins --url %s" % (
sys.executable,
self.bin_dir,
url.strip(),
)
os.system(cmd)
if success:
pp.finish()
else:
self.log.critical("Installation failed:\n%s", error)
pp.finish("ERROR CONNECTING: %s" % error)
if success and server_key:
config.set("agent", "server_key", server_key)
config.write(open(self.config_file, "w"))
if found_server:
print(
(
"""Installation of %s complete. Your server will now sync automatically with the %s ControlPanel.
"""
% (self.pkg_dir, self.brand)
)
)
self.log.info("Agent will automatically sync with aggregator")
else:
padding = int(80 / 2 - (len(server_key) / 2))
server_key = (" " * padding) + server_key
print(
(
""" Installation of %s complete. Please copy and paste the following
server key into the %s ControlPanel for your server: %s"""
% (self.pkg_dir, self.brand, server_key)
)
)
self.log.warn(
"The server key must be manually entered into the "
"Control Panel before agent will begin syncing"
)
else:
print(
(
""" Installation of %s had an error (%s). The %s is installed but it cannot sync correctly.
Please contact %s and send them the log file at %s """
% (self.pkg_dir, error, self.pkg_dir, self.brand, self.log_file)
)
)
self.log.critical("Aggregator sync failed:\n%s", error)
self.migrate_config()
# This is used for collecting all the IPs associated with the machine, to be
# passed to the aggregator through aggregator.Client.handshake(). The
# aggregator will then check all of these IPs in sequence to try to find a
# matching server.
def get_all_ips(self):
ips = []
ifconfig_path = which("ifconfig")
ifconfig_cmd = ifconfig_path
# Special-case Solaris and AIX because their ifconfig requires the -a flag
if "sunos" in sys.platform or "aix" in sys.platform:
ifconfig_cmd = ifconfig_path + " -a"
if "hp-ux" in sys.platform:
netstat = which("netstat")
if netstat:
code, output = execute_command("%s -in" % netstat)
if code == 0:
for l in output.split("\n"):
if l.lower().startswith("name") or not l or l == "":
continue
line = l.split()
ips.append(line[3])
elif ifconfig_path and "hp-ux" not in sys.platform:
code, output = execute_command(ifconfig_cmd)
if code == 0:
if sys.platform in ("freebsd", "darwin"):
ips = re.findall(r"inet6? (.+?)\s", output)
ips = [ip.strip().split("%")[0] for ip in ips]
else:
ips = [
x.strip("addr:") for x in re.findall(r"inet6? (.+?)\s", output)
]
else:
ip_addr_path = which("ip")
code, output = execute_command("%s addr show" % ip_addr_path)
ips = [x for x in re.findall(r"inet6? (.+?)\s", output)]
ips = [x for x in ips if x]
# Remove any stray whitespace and CIDR notation from IPv6 addresses
# AIX reports an inet6 address like '::1%1/64' - account for that.
ips = [ip.strip().split("/")[0].split("%")[0] for ip in ips]
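# The strip("addr:") above also strips the leading colons from the IPv6 loopback
# "::1", leaving just "1"; restore it here.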
if "1" in ips:
ips[ips.index("1")] = "::1"
# If that didn't work, get the IP address by making an outbound
# connection with the aggregator.
if not ips:
self.log.warning(
"Unable to retrieve IP address(es) locally, contacting aggregator"
)
aggregator_client = aggregator.Client(
self.agg_url, self.version, proxy_config=self.proxy_config
)
try:
ips = [aggregator_client.get_local_ip()]
except Exception as e:
self.log.error("IP address lookup failure: {}".format(e))
ips = []
if not ips:
self.log.error("Unable to determine IP address(es)")
else:
self.log.debug("IP addresses: %s", ips)
return ips
def get_old_style_config_properties(self, manfile):
# Return with no error if the manifest file doesn't exist
if not os.path.exists(manfile):
return {}
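# Old-style manifests are plain "key = value" lines, one per line, e.g.:
#   customer_key = customerkey
#   server_group = 123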
try:
mf = open(manfile).read().strip().split("\n")
return dict([list(map(str.strip, line.split("="))) for line in mf])
except:
print("Error reading manifest file")
return {}
def _open_file(self, fname, mode="r+"):
ofile = open(fname, mode)
# Acquire lock
locked = True
for i in range(10):
try:
fcntl.flock(ofile, fcntl.LOCK_EX | fcntl.LOCK_NB)
locked = False
break
except:
time.sleep(1.0)
if locked:
self.log.exception("Could not acquire lock on %s" % fname)
ofile.close()
return None
return ofile
def get_metric_values(self):
if exists(self.report_file):
# Read current metrics
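# Each report-file row is "textkey,value,timestamp" with the timestamp formatted as
# "%Y-%m-%d %H:%M:%S"; repeated reports for the same textkey within the same minute
# collapse to a single value.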
csvfile = self._open_file(self.report_file)
if not csvfile:
return {}
unique_values = {}
try:
csv_reader = csv.reader(csvfile)
for textkey, value, timestamp in csv_reader:
timestamp = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
value = float(value)
unique_values[
"%s:%s" % (textkey, timestamp.strftime("%Y%m%d%H%M"))
] = [textkey, value, time.mktime(timestamp.timetuple())]
except:
self.log.error("Unable to parse custom metric file")
unique_values = {}
unique_values = list(unique_values.values())
unique_values.sort(key=lambda v: v[2])
custom_values = {}
for textkey, value, timestamp in unique_values:
if textkey not in custom_values:
custom_values[textkey] = [[value, timestamp]]
else:
custom_values[textkey].append([value, timestamp])
# Remove all synced metrics
csvfile.seek(0)
csvfile.truncate()
# Release lock
fcntl.flock(csvfile, fcntl.LOCK_UN)
csvfile.close()
return custom_values
else:
return {}
def get_registered_metrics(self):
if exists(self.register_file):
# Read current metrics
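# Each register-file row is read as a (key, value) pair from its first two columns,
# presumably the metric textkey and its registration data.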
csvfile = self._open_file(self.register_file)
if not csvfile:
return {}
csvreader = csv.reader(csvfile)
try:
metrics = dict([(row[0], row[1]) for row in csvreader])
except Exception:
self.log.exception("Error reading custom metric register file")
metrics = {}
# Remove all synced metrics
csvfile.seek(0)
csvfile.truncate()
# Release lock
fcntl.flock(csvfile, fcntl.LOCK_UN)
csvfile.close()
return metrics
else:
return {}
def get_existing_metrics(self):
existing_tkeys = []
for sr_id, schedule in list(self.db["schedules"].items()):
tkey = "%s.%s" % (schedule.plugin_textkey, schedule.resource_textkey)
if tkey not in existing_tkeys:
existing_tkeys.append(tkey)
return existing_tkeys
def ignore_metric(self, plugin_textkey, resource_textkey):
if plugin_textkey == "com.pnp-hcl.dominostats":
if resource_textkey.startswith("Mem.PID."):
return True
return False
def process_imports(self, config):
req_top_keys = ["plugin_textkey", "plugin_category_name"]
req_metric_keys = ["textkey", "value", "unit", "timestamp"]
req_incident_keys = ["textkey", "description", "action", "timestamp"]
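# A minimal sketch of an import file this method accepts (field names come from the
# required-key lists above; all values are illustrative only):
# {
#   "plugin_textkey": "com.example.myplugin",
#   "plugin_category_name": "My Plugin",
#   "metrics": [{"textkey": "requests", "value": 12, "unit": "count",
#                "timestamp": "2024-01-01 00:00:00", "label": "Requests"}],
#   "incidents": [{"textkey": "requests", "description": "...", "action": "...",
#                  "timestamp": "2024-01-01 00:00:00"}]
# }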
existing_metrics = self.get_existing_metrics()
self.log.info("Processing incoming import files")
new_metrics = {}
new_values = {}
custom_incidents = []
import_dirs = [self.custom_import_dir]
additional_dirs = (
config.has_option("agent", "metric_incoming_directory")
and config.get("agent", "metric_incoming_directory")
or None
)
if additional_dirs:
import_dirs.extend(additional_dirs.split(","))
max_files = self.MAX_IMPORT_FILES
max_override = (
config.has_option("agent", "max_incoming_files_override")
and config.get("agent", "max_incoming_files_override")
or None
)
if max_override:
max_files = int(max_override)
files = []
for directory in import_dirs:
if not isdir(directory):
continue
if len(files) >= max_files:
break
self.log.info("Looking in %s", directory)
for f in os.listdir(directory):
if len(files) >= max_files:
break
if isfile(join(directory, f)):
files.append(join(directory, f))
for full_path in files:
fname = basename(full_path)
# Check if we can delete this file when we're done
if not os.access(full_path, os.W_OK):
self.log.error("Cannot delete %s, so it will not be processed.", full_path)
continue
f = open(full_path, "r+")
try:
self.log.info("Processing %s", full_path)
j = json.loads(f.read())
f.close()
for req in req_top_keys:
if req not in list(j.keys()):
logging.error(
"Can not process file %s! Missing required key: %s",
fname,
req,
)
# TODO: Log full file here?
continue
metrics = j.get("metrics", [])
for m in metrics:
for req in req_metric_keys:
if req not in list(m.keys()):
logging.error(
"Can not process metric! Missing required key: %s", req
)
pprint(m)
continue
if self.ignore_metric(j["plugin_textkey"], m["textkey"]):
continue
try:
try:
unix_timestamp = int(m["timestamp"])
except Exception:
timestamp = datetime.strptime(
m["timestamp"], "%Y-%m-%d %H:%M:%S"
)
unix_timestamp = calendar.timegm(timestamp.timetuple())
except Exception:
self.log.error(
"Could not process timestamp %s for metric %s",
m["timestamp"],
m["textkey"],
)
continue
new_value = (m["value"], unix_timestamp)
tkey = "%s.%s" % (j["plugin_textkey"], m["textkey"])
if tkey not in existing_metrics:
if tkey in new_metrics:
new_metrics[tkey].setdefault("first_values", []).append(
new_value
)
else:
label = m.get("label", None)
if label is None:
label = m["textkey"]
new_metrics[tkey] = {
"plugin_textkey": j["plugin_textkey"],
"plugin_name": j["plugin_category_name"],
"resource_textkey": m["textkey"],
"label": label,
"unit": m.get("unit", None),
"first_values": [new_value],
}
else:
new_values.setdefault(tkey, []).append(new_value)
incidents = j.get("incidents", [])
for incident in incidents:
for req in req_incident_keys:
if req not in list(incident.keys()):
logging.error(
"Can not process incident! Missing required key: %s",
req,
)
pprint(incident)
continue
try:
try:
unix_timestamp = int(incident["timestamp"])
except Exception:
timestamp = datetime.strptime(
incident["timestamp"], "%Y-%m-%d %H:%M:%S"
)
unix_timestamp = calendar.timegm(timestamp.timetuple())
except Exception:
self.log.error(
"Could not process timestamp %s for incident %s",
incident["timestamp"],
incident["textkey"],
)
continue
obj = {
"plugin_textkey": j["plugin_textkey"],
"resource_textkey": incident["textkey"],
"timestamp": unix_timestamp,
"description": incident["description"],
"action": incident["action"],
}
if "match_key" in incident:
obj["match_key"] = incident["match_key"]
if "metadata" in incident:
obj["metadata"] = incident["metadata"]
custom_incidents.append(obj)
# All done with this file, delete it
os.remove(full_path)
except Exception:
if f.closed:
f = open(full_path, f.mode)
self.log.error("Error processing %s:", fname)
# TODO: Can this be debug instead?
f.seek(0)
self.log.info(f.read())
self.log.error(traceback.format_exc())
self.log.error("Deleting file")
f.close()
os.remove(full_path)
continue
return new_metrics, new_values, custom_incidents
def get_update_config(self):
config = {"fqdn": get_fqdn()}
if os.path.exists(self.update_config_file):
manfile = self._open_file(self.update_config_file)
if not manfile:
return config
# Read current properties
properties = self.get_old_style_config_properties(self.update_config_file)
# Release lock and remove
manfile.seek(0)
manfile.truncate()
fcntl.flock(manfile, fcntl.LOCK_UN)
manfile.close()
try:
os.remove(self.update_config_file)
except:
pass
return properties
else:
if self.is_fortisase_install:
server_name = get_server_name()
if server_name:
config["server_name"] = server_name
return config
def __init__(
self,
brand,
agg_url,
version,
user,
bin_dir,
lib_dir,
pkg_dir,
timeout,
base_config_dir,
base_custom_plugin_dir,
base_data_dir,
base_log_dir,
acceptable_sync_delay,
):
self.brand = brand
self.agg_url = agg_url
self.version = version
self.user = user
self.lib_dir = lib_dir
self.bin_dir = bin_dir
self.pkg_dir = pkg_dir
self.tmp_dir = tempfile.gettempdir()
self.metadata_rebuild_freq = (
3600 # How often do we want to rebuild metadata (seconds)
)
self.is_root = os.getuid() == 0 or os.geteuid() == 0
self.acceptable_sync_delay = acceptable_sync_delay
# XXX I think these dir settings might need to be moved back into the
# configs.
# These dirs and files are managed by the script, not the package.
# Need to be created by the script by --install, and removed by --uninstall.
self.db_dir = os.path.join(base_data_dir, self.pkg_dir)
self.db_file = join(self.db_dir, "%s.db" % self.pkg_dir)
self.log_dir = os.path.join(base_log_dir, self.pkg_dir)
self.log_file = join(self.log_dir, "%s.log" % self.pkg_dir)
if len(sys.argv) > 1 and sys.argv[1] == "--power-status":
self.log_file = os.path.join(self.log_dir, "power_status.log")
self.config_dir = os.path.join(base_config_dir, self.pkg_dir)
self.config_file = join(self.config_dir, "%s_agent.cfg" % self.brand)
self.custom_plugin_dir = os.path.join(base_custom_plugin_dir, self.pkg_dir)
self.countermeasures_custom_plugin_dir = os.path.join(
self.custom_plugin_dir, "countermeasures"
)
self.custom_import_dir = os.path.join(self.custom_plugin_dir, "incoming")
self.manifest_file = os.path.join(
base_config_dir, "%s-agent-manifest" % self.brand
)
data_dir = os.path.join(base_data_dir, self.pkg_dir)
self.pid_file = os.path.join(data_dir, "agent.pid")
self.update_config_file = os.path.join(
base_data_dir, self.pkg_dir, "update-config"
)
# Locations of the register and report files used for user-specified custom metrics
if "freebsd" in sys.platform.lower():
self.register_file = os.path.join(lib_dir, "register")
self.report_file = os.path.join(lib_dir, "report")
elif "darwin" == sys.platform.lower():
self.register_file = os.path.join(self.custom_plugin_dir, "register")
self.report_file = os.path.join(self.custom_plugin_dir, "report")
else:
self.register_file = os.path.join(base_data_dir, self.pkg_dir, "register")
self.report_file = os.path.join(base_data_dir, self.pkg_dir, "report")
# See if we've been installed - the BIN_DIR directory needs to exist, and then we need to
# make sure there is a server_key in the config file
self.is_installed = True
self.has_dem = False
self.dem_port = "demservice"
self.update_service_port = "updateservice"
self.ipcPath = "/tmp/com.fortinet.fortimonitor"
self.auto_update = False
self.scheduled_update = None
self.is_fortisase_install = False
self.proxy_config = None
try:
if not exists(self.bin_dir):
raise Exception("No bin directory")
if not os.path.exists(self.config_file):
raise Exception("No config file {}".format(self.config_file))
config_file = configparser.ConfigParser()
config_file.read(self.config_file)
if config_file.has_section("agent_proxy"):
self.proxy_config = config_file["agent_proxy"]
if sys.platform in ("darwin", "linux"):
if config_file.has_option("dem", "enabled"):
self.has_dem = config_file.get("dem", "enabled").lower() == "true"
if config_file.has_option("dem", "server_port"):
self.dem_port = config_file.get("dem", "server_port")
if config_file.has_option("agent", "updateservice.port"):
self.update_service_port = config_file.get(
"agent", "updateservice.port"
)
if config_file.has_option("agent", "ipc_path"):
self.ipcPath = config_file.get("agent", "ipc_path")
if "darwin" == sys.platform:
if config_file.has_option("agent", "auto_update"):
self.auto_update = (
config_file.get("agent", "auto_update").lower() == "true"
)
if config_file.has_option("agent", "scheduled_update"):
self.scheduled_update = config_file.get(
"agent", "scheduled_update"
)
if config_file.has_option("agent", "handshake_type"):
if (
"forticlient"
== config_file.get("agent", "handshake_type").lower()
):
self.is_fortisase_install = True
server_key = config_file.get("agent", "server_key")
if not server_key:
raise Exception("Missing server key")
except Exception as e:
logging.exception(f"Initialization failure: {e}")
sys.stderr.write("Initialize exception: {}".format(e))
self.is_installed = False
# Custom OS detection
# Override sys.platform here so that all plugins can detect VMware hosts
if "VMkernel" in os.uname():
sys.platform = "vmware"
# Accumulated startup delay (seconds) from waiting on another agent instance; used to adjust schedule check times.
self.current_agent_delay = 0
self.set_up_logging()
self.log = logging.getLogger(self.__class__.__name__)
try:
self.timeout = float(config_file.get("agent", "startup_timeout"))
except Exception:
self.timeout = timeout
self.db = None
self._blacklister = PluginBlacklister()
def migrate_config(self):
"""
Update agent configs to use "[agent]" instead of "[AgentConfig]" as
the main heading and "aggregator_url" instead of "agg_url" (in order to
match the option in the manifest file).
"""
if self.db["config_migrated"]:
self.log.info("Config is in the correct format")
return
config = configparser.ConfigParser()
if config.read(self.config_file):
config_has_changed = False
if not config.has_section("agent"):
config.add_section("agent")
config_has_changed = True
self.log.info("Added [agent] section to config")
if config.has_section("AgentConfig"):
for option, value in config.items("AgentConfig"):
if option == "agg_url":
option = "aggregator_url"
config.set("agent", option, value)
config.remove_section("AgentConfig")
config_has_changed = True
self.log.info(
"Copied deprecated [AgentConfig] section to [agent] and removed it from config"
)
if config_has_changed:
config_file = open(self.config_file, "w")
config.write(config_file)
config_file.close()
self.db["config_migrated"] = True
def __del__(self):
self.remove_pid_file()
def set_up_logging(self):
root_logger = logging.getLogger()
if not os.path.isdir(self.log_dir):
os.system("mkdir -p {}".format(self.log_dir))
try:
log_file = open(self.log_file, "a")
except IOError:
print(
(
'Cannot open log file %s: "%s"'
% (self.log_file, str(traceback.format_exc()))
)
)
print("Logging to stderr instead")
handler = logging.StreamHandler()
else:
log_file.close()
handler = logging.handlers.RotatingFileHandler(
self.log_file, "a", maxBytes=5 * 1024**2, backupCount=5
)
handler.setFormatter(
logging.Formatter(
"%(process)d) %(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
)
root_logger.addHandler(handler)
# We initialize the level to NOTSET here to make sure that all
# logging inside PickleDatabase is captured because the root
# logger's default log level is WARNING. See
# https://docs.python.org/2/library/logging.html#logging.Logger.setLevel
# for details.
root_logger.setLevel(logging.NOTSET)
db = self.open_db()
try:
log_level = getattr(logging, db["log_level"].upper())
except:
log_level = getattr(logging, self.DEFAULT_LOG_LEVEL)
root_logger.setLevel(log_level)
def parse_arguments(self, parser):
"""
Return the options and arguments parsed from the parser.
"""
if self.is_installed:
parser.add_option("--server-key", dest="server_key", action="store")
parser.add_option(
"--rebuild-metadata",
action="store_true",
dest="rebuild_metadata",
default=False,
)
parser.add_option("--status", action="store_true", dest="status", default=False)
parser.add_option("--stats", action="store_true", dest="stats", default=False)
parser.add_option(
"--from-cron", action="store_true", dest="from_cron", default=False
)
parser.add_option("--aggregator", action="store", dest="aggregator")
parser.add_option(
"--install", action="store_true", default=False, dest="install"
)
parser.add_option(
"--uninstall", action="store_true", default=False, dest="uninstall"
)
parser.add_option(
"--remove-instance",
action="store_true",
default=False,
dest="remove_instance",
)
parser.add_option(
"--customer-key", default=None, action="store", dest="customer_key"
)
parser.add_option(
"--unpause", default=None, action="store_true", dest="unpause"
)
# Docker
parser.add_option(
"--list-containers",
default=None,
action="store_true",
dest="list_containers",
)
parser.add_option(
"--rebuild-container-metadata",
default=None,
action="store_true",
dest="rebuild_container_metadata",
)
parser.add_option(
"--power-status", dest="system_power_status", action="store", default=None
)
options, args = parser.parse_args()
return options, args
def main(self):
activityStart = datetime.now()
server_key = None
config = configparser.RawConfigParser()
config.read(self.config_file)
try:
safe_counter = int(config.get("agent", "safe_counter"))
except:
safe_counter = 3
if self.is_installed and config.sections():
try:
server_key = config.get("agent", "server_key")
except:
server_key = None
try:
self.agg_url = config.get("agent", "aggregator_url") or self.agg_url
except:
self.agg_url = None
# installed? just print out the server key
usage = """%%prog [options]
%s, server key: %s, aggregator endpoint: %s
""" % (self.pkg_dir, server_key, self.agg_url)
# not installed? print out the install usage
else:
usage = """sudo python %%prog --install [--customer-key=YOUR_CUSTOMER_KEY]
%s""" % (self.pkg_dir,)
parser = optparse.OptionParser(usage=usage)
options, args = self.parse_arguments(parser)
if options.system_power_status:
try:
self.log.info(
"Power status -> {} UID {} EUID {}".format(
options.system_power_status, os.getuid(), os.geteuid()
)
)
aggregator_client = aggregator.Client(
self.agg_url,
self.version,
server_key,
proxy_config=self.proxy_config,
)
data = {"reason": options.system_power_status}
aggregator_client.call("agent_power_change", data)
except:
pass
exit(0)
if not self.safe_to_start_agent(self.timeout, counter=safe_counter):
# Override remove_pid_file so __del__ doesn't delete the PID file owned by the other running agent
self.remove_pid_file = lambda: self.log.warning("Preventing pid file removal")
self.log.warning(
"Exiting without running - other agent process already running"
)
sys.exit(1)
self.write_pid_file()
db = False
self.db = self.open_db()
if self.is_installed:
db = self.db
# XXX This may be removed at a later date, when all agents' configs have
# been migrated.
self.migrate_config()
self.log.info("Activity started")
if options.status:
plugins = PluginManager(
db,
self.config_file,
join(self.lib_dir, "plugins"),
self.custom_plugin_dir,
)
display.status(self, server_key, db["schedules"], plugins)
if options.stats:
display.stats(db["schedules"], db["num_syncs"], db["last_sync"])
if options.uninstall:
aggregator_client = aggregator.Client(
self.agg_url, self.version, server_key, proxy_config=self.proxy_config
)
self.uninstall(aggregator_client, options.remove_instance)
exit()
if not self.is_installed or options.install:
if options.aggregator:
self.agg_url = options.aggregator
customer_key = options.customer_key or None
if "darwin" == sys.platform.lower():
dirs_to_create = [
(self.db_dir, None),
(self.config_dir, None),
(self.custom_plugin_dir, 0o777),
(self.countermeasures_custom_plugin_dir, 0o777),
(self.custom_import_dir, 0o777),
]
for dir, perms in dirs_to_create:
os.system("mkdir -p {}".format(dir))
if perms:
os.chmod(dir, perms)
for rfile in [self.register_file, self.report_file]:
with open(rfile, "a+") as rf:
pass
if not os.path.isfile(rfile):
self.log.warning("Installer did not create {}".format(rfile))
self.install(self.agg_url, self.version, server_key, customer_key)
return
# Require at least one of these options
valid_options = [
"from_cron",
"aggregator",
"rebuild_metadata",
"server_key",
"unpause",
"list_containers",
"rebuild_container_metadata",
]
option_given = False
for valid_option in valid_options:
if getattr(options, valid_option, None):
option_given = True
break
if not option_given:
msg = "%s Agent v%s, server key: %s, aggregator endpoint: %s" % (
self.brand,
self.version,
server_key,
self.agg_url,
)
print(msg)
self.log.info(msg)
return
# Support unpausing from the command line
if options.unpause:
print("Unpausing agent, will run as usual on next run")
db["pause"] = None
db.save()
return
# Docker cli commands
if options.list_containers:
if "docker_containers" not in db or db["docker_containers"] == {}:
print("No monitored containers")
return
containers = db["docker_containers"]
print("Monitored Containers:\n")
print("CONTAINER ID\tIMAGE\t\tCOMMAND\t\t\tSTATUS")
for short_id, metadata in containers.items():
cont_image = metadata.get("Image", "?")
cont_command = metadata.get("Command", "?")
cont_status = metadata.get("Status", "?")
print(
'%s\t%s\t"%s"\t%s'
% (short_id, cont_image, cont_command[:20], cont_status)
)
return
if options.rebuild_container_metadata:
db["rebuild_container_metadata"] = True
print("Metadata queued for rebuild")
self.log.info("Container metadata rebuild queued")
return
requested_auto_update = False
try:
just_set_option_and_quit = False
if options.server_key:
just_set_option_and_quit = True
key = options.server_key
print(("Setting server key to %s" % key))
config.set("agent", "server_key", key)
if options.aggregator:
just_set_option_and_quit = True
agg = options.aggregator
print(("Setting aggregator endpoint to %s" % agg))
config.set("agent", "aggregator_url", agg)
if just_set_option_and_quit:
config.write(open(self.config_file, "w"))
exit(0)
# Linux agent should not run if executed as root
if self.is_root and not options.rebuild_metadata:
self.log.error("Linux agent should not run if executed as root")
print("Linux agent should not run if executed as root")
return
server_key = config.get("agent", "server_key")
aggregator_client = aggregator.Client(
self.agg_url, self.version, server_key, proxy_config=self.proxy_config
)
# should never be here
if not server_key:
print("No server key found, please re-install the agent.")
exit(1)
if self.has_dem:
try:
needs_schedules = False
dem_client = DEMClient(self.dem_port, self.ipcPath)
schedules_received = dem_client.send_receive("schedules-init")
if schedules_received is None or "no" == schedules_received:
needs_schedules = True
if needs_schedules or self._agent_version_updated(db):
self._init_dem_schedules(aggregator_client)
except:
pass
plugins = PluginManager(
db,
self.config_file,
join(self.lib_dir, "plugins"),
self.custom_plugin_dir,
)
dem_client = DEMClient(self.dem_port)
wifi_info = dem_client.get_dem_wifi_info()
if wifi_info:
plugins.add_dem_wifi_results(wifi_info)
# Check on Countermeasures remote plugins update
if (
"enable_countermeasures" in config.options("agent")
and config.get("agent", "enable_countermeasures").lower() == "true"
and "countermeasures_remote_plugins" in config.options("agent")
and "countermeasures_refresh_plugins" in config.options("agent")
):
# See if we need to refresh
refresh_cycle = (
int(config.get("agent", "countermeasures_refresh_plugins")) * 3600
)
if (
"countermeasures_last_refresh" not in db
or time.time() - db["countermeasures_last_refresh"] > refresh_cycle
):
for url in config.get(
"agent", "countermeasures_remote_plugins"
).split(","):
self.log.info(
"Refreshing CounterMeasures plugins from %s" % url
)
cmd = "%s %s/countermeasure.py install_plugins --url %s &" % (
sys.executable,
self.bin_dir,
url.strip(),
)
os.system(cmd)
db["countermeasures_last_refresh"] = time.time()
elif "countermeasures_last_refresh" in db:
self.log.info(
"Waiting to refresh CM plugins in %d minutes"
% (
(
db["countermeasures_last_refresh"]
+ refresh_cycle
- time.time()
)
/ 60
)
)
# Run all scheduled plugin checks and collect results to send
all_plugins_start_time = datetime.now()
results_to_send = []
custom_metrics = self.get_metric_values()
new_import_metrics, new_import_values, custom_incidents = (
self.process_imports(config)
)
# Create an anomalies container if it isn't already there
if "anomalies" not in db or db["anomalies"] == None:
db["anomalies"] = {}
for sr_id, schedule in list(db["schedules"].items()):
schedule_tkey = "%s.%s" % (
schedule.plugin_textkey,
schedule.resource_textkey,
)
# FIXME I gave the next check time a five-second leeway. There must be a better way!
# Ignore schedule frequency for custom metrics from JSON files and report.py calls
leeway_time = 5
if (
schedule.plugin_textkey != self.CUSTOM
and schedule.resource_textkey not in custom_metrics
and schedule_tkey not in new_import_values
and schedule.next_check_time
> (all_plugins_start_time + timedelta(seconds=leeway_time))
):
self.log.info("%r too early to check", schedule)
continue
frequency = timedelta(seconds=schedule.frequency)
current_agent_delay = timedelta(seconds=self.current_agent_delay)
# Give extra leeway to compensate for any time spent sleeping. There must be a better way!
schedule.next_check_time = (
all_plugins_start_time + frequency - current_agent_delay
)
if schedule_tkey in new_import_values:
scale = plugins.config.get(schedule_tkey, {}).get("scale", 1.0)
for value, timestamp in new_import_values[schedule_tkey]:
if value is not None:
value *= scale
if value is None:
continue
results_to_send.append((sr_id, timestamp, value))
anomalies = {}
elif schedule.plugin_textkey == self.CUSTOM:
if schedule.resource_textkey not in custom_metrics:
continue
scale = plugins.config.get(schedule.plugin_textkey, {}).get(
"scale", 1.0
)
for value, timestamp in custom_metrics[schedule.resource_textkey]:
if value is not None:
value *= scale
if value is None:
continue
results_to_send.append((sr_id, timestamp, value))
anomalies = {}
elif schedule.plugin_textkey not in plugins.plugins:
# Likely a custom metric that didn't report in this period
# TODO: Better way to do this?
self.log.info("No custom value or plugin for %s", schedule_tkey)
continue
else:
plugin_start_time = datetime.now()
t0 = time.time()
value, anomalies = schedule.check(
plugins, db["anomalies"].get(schedule.id, {})
)
t1 = time.time()
self.log.debug(
"%r returned %s in %.2f seconds" % (schedule, value, t1 - t0)
)
if value is None:
continue
results_to_send.append(
(
sr_id,
time.mktime(plugin_start_time.timetuple()),
value,
)
)
self.log.info(
"Running all plugins took %s", datetime.now() - all_plugins_start_time
)
# Add data to our queue
db["result_queue"].update(results_to_send)
# sync
# If we're paused, we don't want to sync and will just exit here.
if db["pause"]:
if time.time() < db["pause"]:
db.save()
time_left = (db["pause"] - time.time()) / 60.0
self.log.info(
"Pause command received. Processing stopped. Process will resume in %.2f minutes."
% time_left
)
return
else:
# We've paused as long as instructed, now set pause to None and resume with the sync
db["pause"] = None
db.save()
self.log.info(
"Pause duration exceeded, unpausing the agent for the next run"
)
return
start_time = time.time()
# do we need to resend and re-cache metadata?
metadata = None
fortisase_attributes = {}
countermeasures_metadata = []
facts = None
# Force a metadata push roughly once a day regardless, just in case something
# gets out of sync
lucky_day = random.randint(0, 1440) == 0
# See if we need to rebuild the metadata - performed every hour, or if specified by the --rebuild-metadata
# command line option, or if the agent config file has changed since the last time we saw it
rebuild_metadata = False
if "last_metadata_time" not in db:
db["last_metadata_time"] = 0
if time.time() - db["last_metadata_time"] > self.metadata_rebuild_freq:
rebuild_metadata = True
if options.rebuild_metadata:
rebuild_metadata = True
if "last_config_file_time" not in db:
db["last_config_file_time"] = time.time()
last_config_file_time = os.path.getmtime(self.config_file)
if last_config_file_time > db["last_config_file_time"]:
rebuild_metadata = True
db["last_config_file_time"] = last_config_file_time
if rebuild_metadata:
self.log.info("Rebuilding plugin metadata")
db["last_metadata_time"] = time.time()
if "custom_plugin_url" in config.options("agent"):
plugins.install_remote_plugins(
config.get("agent", "custom_plugin_url")
)
stale = plugins.is_metadata_stale()
if (
stale
or options.rebuild_metadata
or lucky_day
or not db["has_connected_with_aggregator"]
):
metadata = plugins.metadata
if stale:
self.log.info("metadata changed")
elif options.rebuild_metadata:
self.log.info("rebuilding metadata")
elif lucky_day:
self.log.info("randomly forcing metadata rebuild")
elif not db["has_connected_with_aggregator"]:
self.log.info("we've never pushed up metadata before")
# If we're rebuilding metadata, also get the server facts
facts = Inspector(self).get_all_facts(wifi_info)
fortisase_attributes = self.get_fortisase_attributes()
# If Countermeasures is enabled, rebuild Countermeasure plugin metadata
# Dynamically load all available plugins, both in the default install directory
# and the customer's custom directory
countermeasures_metadata = []
if (
"enable_countermeasures" in config.options("agent")
and config.get("agent", "enable_countermeasures").lower() == "true"
):
for directory in (
os.path.join(self.lib_dir, "countermeasures", "plugins"),
self.countermeasures_custom_plugin_dir,
):
if not os.path.exists(directory):
continue
sys.path.append(directory)
for mod_name in os.listdir(directory):
if mod_name.endswith(".py") and not mod_name.startswith(
"__"
):
try:
mod = p_importlib.import_module(mod_name[:-3])
except:
self.log.exception(
"Unable to import module %s" % mod_name
)
continue
# Compute the hash of the plugin source, handling Python 2 vs 3 string types
if sha_func:
if six.PY2:
hash = sha_func(
open(
os.path.join(directory, mod_name)
).read()
).hexdigest()
else:
hash = sha_func(
open(os.path.join(directory, mod_name))
.read()
.encode("utf-8")
).hexdigest()
else:
hash = ""
for name, obj in list(mod.__dict__.items()):
if (
(sys.version_info[0] == 3 and type(obj) == type)
or (
sys.version_info[0] == 2
and type(obj) == types.ClassType
)
) and name.endswith("Countermeasure"):
try:
plugin = obj()
countermeasures_metadata.append(
{
"textkey": plugin.textkey,
"name": plugin.name,
"author": plugin.author,
"hash": hash,
"description": plugin.description,
}
)
except:
pass
if mod_name.endswith(".json"):
try:
json_counter = open(
os.path.join(directory, mod_name)
)
except:
self.log.error(
"Unable to open %s"
% os.path.join(directory, mod_name)
)
self.log.error(traceback.format_exc())
continue
file_content = json_counter.read()
if sha_func:
hash = sha_func(
file_content.encode("utf-8")
).hexdigest()
else:
hash = ""
json_counter.close()
try:
counter_data = json.loads(file_content)
except Exception:
self.log.error(
"%s file is not a valid json file to be read"
% mod_name
)
self.log.error(traceback.format_exc())
continue
required_fields = [
"name",
"textkey",
"command",
"author",
]
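# A JSON countermeasure declaration is expected to look roughly like this (illustrative values):
# {"name": "...", "textkey": "...", "command": "...", "author": "...", "description": "..."}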
existing_keys = counter_data.keys()
success = True
for key in required_fields:
if key not in existing_keys or not counter_data.get(
key
):
self.log.error(
"%s is missing from the countermeasure declaration in %s"
% (key, mod_name)
)
success = False
break
if not success:
continue
textkey = counter_data.get("textkey")
countermeasures_metadata.append(
{
"textkey": textkey,
"name": counter_data.get("name"),
"author": counter_data.get("author"),
"hash": hash,
"description": counter_data.get("description"),
}
)
# Check if we can access Docker
if "docker_supported" not in db or not db["docker_supported"]:
can_access_docker = container_discovery.check_access()
if can_access_docker == "success":
db["docker_supported"] = True
self.log.info("Docker supported")
elif can_access_docker == "no-permission":
self.log.info("Missing permission to access Docker socket")
if "docker_supported" in db and db["docker_supported"]:
db["rebuild_container_metadata"] = True
# actually sync
response = {}
command_results = {}
# Send results of a log request back to the aggregator
if "log_request" in db and db["log_request"]:
command_results["log_request"] = str(db["log_request"])
if "diagnostics" in db and db["diagnostics"]:
command_results["diagnostics"] = str(db["diagnostics"])
if "socket_stats" in db and db["socket_stats"]:
command_results["socket_stats"] = str(db["socket_stats"])
if "mtr" in db and db["mtr"]:
command_results["mtr"] = str(db["mtr"])
auto_topo_scans = []
if "auto_topo_scans" in db:
auto_topo_scans = db["auto_topo_scans"]
try:
anomalies_to_report = []
self.log.info(
"Syncing with aggregator: %d results, %d anomalies",
len(results_to_send),
len(anomalies_to_report),
)
if metadata:
metadata_summary = dict(
(plugin_key, len(list(plugin_metadata.keys())))
for plugin_key, (_, plugin_metadata) in list(metadata.items())
)
self.log.debug("Metadata summary: %r", metadata_summary)
force_send_schedules = False
if db["num_syncs"] == 0 or db["schedules"] == {}:
force_send_schedules = True
if rebuild_metadata or db["sync_schedules"]:
force_send_schedules = True
# We have a lot of results coming into the aggregator all at once from
# various agents (every minute usually). We put an artificial random delay here before syncing
# to stagger the results that come in.
delay = random.randint(1, self.acceptable_sync_delay or 1)
time.sleep(delay)
# Pull results out of our queue to send
# If single result time is set, we only want to send the latest result, and not anything else
# in the queue.
if db["single_result"]:
if time.time() < db["single_result"]:
result_data = db["result_queue"].pop_results(
len(db["schedules"])
)
else:
db["single_result"] = None
result_data = db["result_queue"].pop_results()
else:
result_data = db["result_queue"].pop_results()
# See if we have any queued discovered containers to send up
discovered_containers = []
deleted_containers = []
MAX_CONTAINERS_SYNC = 20
if "discovered_containers" in db:
container_queue = db["discovered_containers"]
for i in range(min(len(container_queue), MAX_CONTAINERS_SYNC)):
discovered_containers.append(container_queue.pop(0))
if "deleted_containers" in db:
deleted_container_queue = db["deleted_containers"]
for i in range(len(deleted_container_queue)):
deleted_containers.append(deleted_container_queue.pop(0))
dem_results = self._getDemResults(db)
try:
# Set traceback limit 0 to include only the error message w/o the traceback
sys.tracebacklimit = 0
new_import_metrics = list(new_import_metrics.values())
if server_key:
response = aggregator_client.sync(
result_data,
anomalies_to_report,
metadata,
countermeasures_metadata,
facts,
discovered_containers,
deleted_containers,
self.get_registered_metrics(),
new_import_metrics,
custom_incidents,
self.get_update_config(),
self.get_all_ips(),
auto_topo_scans,
force_send_schedules,
command_results,
dem_enabled=self.has_dem,
dem_service_results=dem_results,
fortisase_attributes=fortisase_attributes,
)
db["log_request"] = None
db["diagnostics"] = None
db["socket_stats"] = None
db["mtr"] = None
db["auto_topo_scans"] = []
db["sync_schedules"] = None
dem_updates = {
"icmp_server_resources": response.get(
"icmp_server_resources", {}
),
"monitor_schedules": response.get("monitor_schedules", {}),
"traceroutes": response.get("traceroutes", []),
"traceroute_checks": response.get("traceroute_checks", {}),
}
self._updateDEMServiceSchedules(dem_updates)
else:
self.log.info("No server_key found, skipping sync")
except aggregator.ReinstallResponse as err:
self.log.warning(
f"Received a request to reinstall the agent due to an existing conflict {err}"
)
# Reinstall response is for FortiSase agents so calculate the customer key dynamically
handshake_data = self.get_fortisase_attributes()
if handshake_data:
ems_serial = handshake_data["ems_serial"]
environment = handshake_data["forticlient_environment"]
customer_key = calculate_customer_key(ems_serial, environment)
self.install(
self.agg_url,
self.version,
server_key,
customer_key,
force=True,
)
else:
logging.warning(
f"Reinstall request received, forticlient data not available. Please verify. {handshake_data}"
)
return
except aggregator.UnauthorizedResponse as err:
self.log.warning(
f"Received an unauthorized response from the aggregator. Pausing execution of the agent for {err} seconds"
)
self.db["pause"] = time.time() + int(str(err))
return
except:
# Failed to hit aggregator, so we'll put those results back into the queue
db["result_queue"].update(result_data)
for demKey in dem_results.keys():
q = db[demKey]
q.update(dem_results[demKey])
self.log.exception("Could not sync with aggregator")
self.log.debug("Saving results locally: %r", result_data)
db.save()
# Note: sys.exit() only raises a SystemExit exception.
return
if response.get("found_server", False):
db["has_connected_with_aggregator"] = True
db["num_syncs"] += 1
db["last_sync"] = datetime.now().strftime("%m/%d/%Y %H:%M")
except:
self.log.exception("Error syncing with aggregator")
else:
if rebuild_metadata:
db["last_metadata"] = plugins.hashed_metadata()
self.log.info(
"syncing took %.2f seconds", time.time() - start_time - delay
)
# Execute any Countermeasures in the response, spawned as separate background processes which can
# continue to execute after the agent exits
if (
"enable_countermeasures" in config.options("agent")
and config.get("agent", "enable_countermeasures").lower() == "true"
):
for countermeasure in response.get("countermeasures", []):
hash = countermeasure.get("hash")
textkeys = countermeasure.get("textkeys", [])
cm_metadata = countermeasure.get("metadata", {})
metadata_file = ""
# Write the JSON metadata out to a temp file
try:
fname = "countermeasure-metadata-%s.json" % hash
metadata_file = os.path.join(self.tmp_dir, fname)
f = open(metadata_file, "w")
f.write(json.dumps(cm_metadata))
f.close()
except Exception:
self.log.error(
"Failed parsing countermeasure metadata for %s: %s"
% (hash, textkeys)
)
self.log.error(traceback.format_exc())
self.log.info(
"Queueing countermeasures for %s: %s" % (hash, textkeys)
)
if textkeys:
cmd = (
"%s %s/countermeasure.py execute --hash %s --textkeys %s"
% (sys.executable, self.bin_dir, hash, " ".join(textkeys))
)
if metadata_file:
cmd += " --metadata-file %s" % metadata_file
os.spawnvp(os.P_NOWAIT, sys.executable, cmd.split())
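# os.spawnvp with P_NOWAIT launches countermeasure.py as a separate child
# process and returns immediately, so the countermeasure keeps running even
# after this sync (and the agent process) finishes. Note that cmd.split()
# assumes hashes and textkeys contain no whitespace.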
# now process what we got back from the sync
self.update_schedules(response.get("schedules", []))
# process our agent commands
if response.get("commands", []):
self.log.info(
"got %d agent commands", len(list(response["commands"].keys()))
)
if "pause" in response["commands"]:
seconds = response["commands"]["pause"] # Seconds
db["pause"] = time.time() + seconds
if "single_result" in response["commands"]:
seconds = response["commands"]["single_result"]
db["single_result"] = time.time() + seconds
if "log_request" in response["commands"]:
lines = response["commands"][
"log_request"
] # Number of lines to tail from log
log_output = subprocess.check_output(
"tail -%d %s" % (lines, self.log_file), shell=True
)
db["log_request"] = log_output # We'll send back log output
if "queue_batch_size" in response["commands"]:
queue_batch_size = response["commands"]["queue_batch_size"]
db["result_queue"].queue_batch_size = queue_batch_size
if "queue_max_results" in response["commands"]:
queue_max_results = response["commands"]["queue_max_results"]
db["result_queue"].queue_max_results = queue_max_results
if "socket_stats" in response["commands"]:
try:
args = response["commands"].get("socket_stats")
timeout = args.get("timeout")
if timeout is None:
timeout = 10
timeout = int(timeout)
ss_cmd = "ss -t -u -r 2>&1"
if which("timeout"):
ss_cmd = "timeout %d %s" % (timeout, ss_cmd)
socket_stats = subprocess.check_output(ss_cmd, shell=True)
db["socket_stats"] = socket_stats
except:
db["socket_stats"] = traceback.format_exc()
if "mtr" in response["commands"]:
try:
args = response["commands"].get("mtr")
host = args.get("host")
timeout = args.get("timeout")
if timeout is None:
timeout = 10
timeout = int(timeout)
if host is None:
parsed_url = urlparse.urlparse(self.agg_url)
if parsed_url.hostname is None:
parsed_url = urlparse.urlparse("http://" + self.agg_url)
host = parsed_url.hostname
mtr_cmd = "mtr --csv -c 1 %s 2>&1"
mtr_cmd %= host
if which("timeout"):
mtr_cmd = "timeout %d %s" % (timeout, mtr_cmd)
mtr_output = subprocess.check_output(mtr_cmd, shell=True)
db["mtr"] = mtr_output
except:
db["mtr"] = traceback.format_exc()
# Change severity of log level
log_level_key = response["commands"].get("log_level")
if log_level_key is not None:
log_level_key = log_level_key.upper()
try:
log_level = getattr(logging, log_level_key)
db["log_level"] = log_level_key
self.log.setLevel(log_level)
level = logging.INFO
message = 'Set log level to "%s"'
except AttributeError:
level = logging.WARNING
message = 'Invalid log level command: "%s"'
self.log.log(level, message % log_level_key)
if "diagnostics" in response["commands"]:
db["diagnostics"] = self.build_diagnostics(
db, self.version, self.brand
)
if "metadata_resync" in response["commands"]:
db["last_metadata_time"] = 0
db["last_metadata"] = None
if (
"refresh_countermeasures" in response["commands"]
and "enable_countermeasures" in config.options("agent")
and config.get("agent", "enable_countermeasures").lower() == "true"
and "countermeasures_remote_plugins" in config.options("agent")
and "countermeasures_refresh_plugins" in config.options("agent")
):
for url in config.get(
"agent", "countermeasures_remote_plugins"
).split(","):
self.log.info(
"Refreshing CounterMeasures plugins from %s" % url
)
cmd = "%s %s/countermeasure.py install_plugins --url %s &" % (
sys.executable,
self.bin_dir,
url.strip(),
)
os.system(cmd)
db["countermeasures_last_refresh"] = time.time()
if "rebuild_container_metadata" in response["commands"]:
db["rebuild_container_metadata"] = True
if "update_agent" in response["commands"]:
requested_auto_update = True
if "sync_schedules" in response["commands"]:
db["sync_schedules"] = True
if "get_logs" in response["commands"]:
try:
self.upload_logs(server_key)
except:
pass
if self.is_root:
self.log.info(
"Linux agent running as root, skipping container discovery"
)
print("Linux agent running as root, skipping container discovery")
else:
if "docker_supported" in db and db["docker_supported"]:
if "docker_containers" not in db:
db["docker_containers"] = {}
rebuild_container_metadata = False
if (
"rebuild_container_metadata" in db
and db["rebuild_container_metadata"]
):
rebuild_container_metadata = True
db["rebuild_container_metadata"] = False
existing_containers = db["docker_containers"]
existing_container_ids = list(existing_containers.keys())
try:
found_containers = (
container_discovery.discover_docker_containers(
config,
plugins,
existing_containers,
rebuild=rebuild_container_metadata,
)
)
except Exception:
t, e = sys.exc_info()[:2]
self.log.error(e)
self.log.error(
"Docker has been enabled but the fm-agent user needs to be added to the docker group.\n"
"You can do so with `sudo usermod -a -G docker fm-agent`"
)
found_containers = None
if found_containers:
found_container_ids = [c["Id"][:12] for c in found_containers]
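# Containers are keyed by the 12-character short form of the Docker ID (the
# same form `docker ps` displays), so new and deleted containers can be
# diffed against the cached set below.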
new_containers = []
for container in found_containers:
container_id = container["Id"][:12]
# Always update the db copy, in case something changed
existing_containers[container_id] = container
if (
rebuild_container_metadata
or container_id not in existing_container_ids
):
new_containers.append(container)
if "updated" in container and container["updated"]:
del container["updated"]
new_containers.append(container)
deleted_containers = []
for container_id, container in existing_containers.items():
if container_id not in found_container_ids:
deleted_containers.append(container_id)
# Actually delete
for container_id in deleted_containers:
del existing_containers[container_id]
if "discovered_containers" not in db:
db["discovered_containers"] = []
if "deleted_containers" not in db:
db["deleted_containers"] = []
db["discovered_containers"].extend(new_containers)
db["deleted_containers"].extend(deleted_containers)
self.log.info(
"Discovered %d new/updated containers", len(new_containers)
)
self.log.info(
"Found %d newly deleted containers", len(deleted_containers)
)
self.run_auto_topo_scans(config)
except:
self.log.exception("Error in main loop")
self.checkForUpdate(
db=db,
server_key=server_key,
agg_client=aggregator_client,
force=requested_auto_update,
)
# ideally this should be in a finally block, but older python versions
# don't support try/except/finally, and we need all three
db.save()
self.log.info(
"Activity finished in {}s".format(
(datetime.now() - activityStart).total_seconds()
)
)
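# Returns the FortiClient/EMS handshake attributes used for FortiSASE
# deployments (e.g. the EMS serial and environment), or an empty dict when
# the helper cannot provide them.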
def get_fortisase_attributes(self):
try:
helper = ForticlientHelper()
return helper.get_handshake_data()
except Exception as err:
logging.warning(err)
return {}
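# Copy the log directory into a temp dir, zip it under /tmp, and upload the
# archive to the aggregator's /v2/agent_logs endpoint (shelling out to curl
# with the server key as the Authorization header), then remove the zip.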
def upload_logs(self, server_key):
import shutil
import tempfile
with tempfile.TemporaryDirectory() as tmpdirname:
shutil.copytree(self.log_dir, tmpdirname, dirs_exist_ok=True)
now = datetime.now()
zip_file_prefix = "agent-logs-{}".format(now.strftime("%Y%m%d%H%M%S"))
zip_output = os.path.join(tmpdirname, "zip")
os.mkdir(zip_output)
zip_name = shutil.make_archive(
os.path.join("/tmp", zip_file_prefix), "zip", tmpdirname
)
try:
endpoint = "{}/v2/agent_logs".format(self.agg_url)
cc = 'curl -F file=@{} -H "Accept: application/json" -H "Authorization: {}" {}'.format(
zip_name, server_key, endpoint
)
os.system(cc)
self.log.info("Uploaded log file {}".format(zip_name))
except Exception as e:
self.log.exception(e)
finally:
if os.path.isfile(zip_name):
os.remove(zip_name)
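# Decide whether to run an update check this cycle. The first check is
# scheduled either at a random offset within the next 24 hours or at the
# configured scheduled_update time (expected as "HH:MM"); after that, checks
# repeat roughly daily. A forced check (admin request) bypasses the schedule,
# and disabling auto_update clears any pending check.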
def checkForUpdate(self, db, server_key=None, agg_client=None, force=False):
if force:
self.log.info("Admin update request")
self._onCheckUpdates(agg_client=agg_client)
return
db_key = "next_update_check"
if not self.auto_update:
if db_key in db:
db.pop(db_key)
return
next_update_check = None
if not server_key:
self.log.warning("checkForUpdate: no server key")
return
if not agg_client:
agg_client = aggregator.Client(
self.agg_url, self.version, server_key, proxy_config=self.proxy_config
)
db_key = "next_update_check"
try:
update_period = timedelta(days=1)
if db_key not in db:
if self.scheduled_update is None:
from random import randrange
randomSec = randrange(int(update_period.total_seconds()))
db[db_key] = datetime.now() + timedelta(seconds=randomSec)
else:
try:
h, m = self.scheduled_update.split(":")
rn = datetime.now()
ct = datetime(
year=rn.year,
month=rn.month,
day=rn.day,
hour=int(h),
minute=int(m),
)
if ct < rn:
ct = ct + update_period
db[db_key] = ct
except Exception as e:
self.log.error(
"Could not calculate next check {}: {}".format(
self.scheduled_update, str(e)
)
)
return
self.log.info("Next update check at {}".format(db[db_key]))
return
next_update_check = db[db_key]
if not next_update_check or datetime.now() > next_update_check:
self._onCheckUpdates(agg_client)
if next_update_check is None:
next_update_check = datetime.now()
db[db_key] = next_update_check + update_period
self.log.info("Next update check at {}".format(db[db_key]))
except Exception as e:
self.log.error("checkForUpdates problem: {}".format(e))
def _onCheckUpdates(self, agg_client):
self.log.info("Performing updates check...")
#
# Note the agent_update_info endpoint expects a framework version, an artifact
# of the Windows agent. The aggregator won't use it for darwin, so just send
# our version as the framework.
#
try:
endpoint = "agent_update_info/darwin/{}".format(self.version)
updates = agg_client.call(endpoint, method="GET")
if len(updates) > 0:
client = IPCClient(self.update_service_port, self.ipcPath)
client.send_receive("updates", payload=json.dumps(updates))
except Exception as e:
self.log.exception("Update check failure: {}".format(e))
def get_reportable_anomalies(self):
# Get the anomalies that have cleared and have not yet been reported as cleared.
self.log.info("Gathering reportable anomalies")
# Get the anomalies that exceed duration and have not previously been
# reported. Also, mark them as reported.
cleared_anomalies = []
lengthy_anomalies = []
for schedule_id, anomalies in list(self.db["anomalies"].items()):
schedule = self.db["schedules"].get(schedule_id)
if not schedule:
# Resource schedule has been deleted from the central aggregator, but
# we still have an anomaly - clear that out and proceed
del self.db["anomalies"][schedule_id]
continue
for threshold_id, anomaly in list(anomalies.items()):
self.log.debug("Threshold %s", threshold_id)
if not anomaly.reported_as_cleared and anomaly.has_cleared(
schedule.number_of_checks
):
cleared_anomalies.append(
(
schedule_id,
threshold_id,
time.mktime(anomaly.time_last_detected.timetuple()),
False, # False indicates that this anomaly has cleared
)
)
anomaly.reported_as_cleared = True
self.log.debug("Cleared anomaly: %s", anomaly)
if (
not anomaly.reported_as_exceeded_duration
and anomaly.exceeds_duration()
):
lengthy_anomalies.append(
(
schedule_id,
threshold_id,
time.mktime(anomaly.time_last_detected.timetuple()),
True, # True indicates that this anomaly has exceeded duration
)
)
anomaly.reported_as_exceeded_duration = True
self.log.debug("Lengthy anomaly: %s", anomaly)
self.log.info("Found %d anomalies that have cleared", len(cleared_anomalies))
self.log.debug("Cleared anomalies: %r", cleared_anomalies)
self.log.info(
"Found %d anomalies that exceed the threshold duration",
len(lengthy_anomalies),
)
self.log.debug("Lengthy anomalies: %r", lengthy_anomalies)
self.db.save()
return cleared_anomalies + lengthy_anomalies
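# Prune anomalies that have already been reported as cleared, and drop any
# schedule entries left with no anomalies, so the local anomaly cache does
# not grow without bound.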
def remove_reported_cleared_anomalies(self):
self.log.info("Checking for reported cleared anomalies")
for schedule_id, anomalies in list(self.db["anomalies"].items()):
for threshold_id, anomaly in list(anomalies.items()):
if anomaly.reported_as_cleared:
anomaly = anomalies.pop(threshold_id)
self.log.info("Removed reported cleared anomaly")
self.log.debug("Anomaly: %s", anomaly)
if not anomalies:
self.db["anomalies"].pop(schedule_id)
self.log.debug("Remaining anomalies: %s", self.db["anomalies"])
self.db.save()
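# Reconcile the local schedule cache with the set sent by the aggregator:
# existing schedules are updated in place, unknown IDs create new Schedule
# objects, and anything the aggregator no longer mentions is dropped. An
# empty response means "no changes" rather than "delete everything".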
def update_schedules(self, new_schedules):
if not new_schedules:
self.log.info("No schedule changes received from aggregator")
return
existing_schedules = self.db["schedules"]
self.db["schedules"] = {}
for new_schedule_data in new_schedules:
new_schedule_id = new_schedule_data["id"]
self.log.info("Received schedule %s from aggregator", new_schedule_id)
schedule = existing_schedules.get(new_schedule_id, None)
try:
if schedule:
schedule.update(new_schedule_data)
del existing_schedules[schedule.id]
action = "Edited"
else:
schedule = Schedule(new_schedule_data)
action = "Created"
self.db["schedules"][schedule.id] = schedule
self.log.info("%s schedule %s locally", action, schedule.id)
self.log.debug("Schedule data: %r", new_schedule_data)
except Exception:
err = sys.exc_info()[1]
error = str(err)
self.log.error(
"Invalid schedule {} data: {}".format(new_schedule_id, error)
)
# Our schedule setting doesn't call the correct setitem method,
# so we'll save here explicitly
self.db.save()
self.log.info("Created/updated %d schedules", len(new_schedules))
# Everything that's left is deleted.
self.log.info("Deleted %d schedules", len(existing_schedules))
def build_diagnostics(self, db, version, brand):
"""Function to build a string of diagnostics data to send
back to the aggregator."""
string = "AGENT DIAGNOSTICS\n"
string += "Agent version: %s\n" % self.version
string += "Agent server hostname: %s" % subprocess.check_output(
"hostname", shell=True
)
if "darwin" == sys.platform:
string += "Agent OS: %s" % subprocess.check_output(
"sw_vers | grep ProductVersion", shell=True
)
else:
string += "Agent OS: %s" % subprocess.check_output(
"cat /etc/*-release | grep PRETTY_NAME", shell=True
)
string += "uname output: %s" % subprocess.check_output("uname -a", shell=True)
if "darwin" != sys.platform:
string += "Package information: %s\n" % subprocess.check_output(
"apt-cache show %s-agent || true" % self.brand, shell=True
)
string += "ip output:\n%s" % subprocess.check_output("ip addr show", shell=True)
# Build pickle data
string += "Local agent pickle file data:\n%s\n" % json.dumps(
db.data, indent=2, default=self.defaultprint
)
return string
def defaultprint(self, obj):
if isinstance(obj, Schedule):
return obj.__repr__()
else:
return None
def open_db(self):
if not os.path.isdir(self.db_dir):
os.system("mkdir -p {}".format(self.db_dir))
try:
db = PickleDatabase(self.db_file)
except:
return None
# If something went wrong reading the pickle file, our data dict will
# be empty and we'll need to rebuild it. To be safe, always go through
# and add the keys that need to be there, in case something happened
# to them.
defaults = {
"anomalies": {},
"config_migrated": False,
"diagnostics": None,
# 'has_connected_with_aggregator' is to get around the problem of
# the aggregator issuing a "pause" command to an agent when the
# server key sent by the agent isn't found on the control panel. When
# an agent is first installed, this is the case, but we don't want
# to pause the agent. So we add this extra flag so that an agent
# will only pause if it has communicated with the aggregator before.
"has_connected_with_aggregator": False,
"last_metadata": None,
"last_sync": None,
"log_level": self.DEFAULT_LOG_LEVEL,
"num_syncs": 0,
"pause": None,
"result_queue": ResultQueue(),
"schedules": {},
"single_result": None,
"sync_schedules": None,
"check_results": ResultQueue(queue_max_results=1000, queue_batch_size=50),
"server_resource_levels": ResultQueue(
queue_max_results=1000, queue_batch_size=50
),
"traceroutes": ResultQueue(queue_max_results=100, queue_batch_size=5),
"traceroute_checks": ResultQueue(queue_max_results=50, queue_batch_size=5),
}
for key, default in list(defaults.items()):
if key not in db:
db[key] = default
return db
def should_run_auto_topo_scans(self, config):
try:
return config.get("topo", "auto_scan") == "1"
except:
return False
def get_num_topo_scans(self, config):
try:
return int(config.get("topo", "scans_per_sync"))
except:
return 0
def get_topo_scan_sleep(self, config):
try:
return int(config.get("topo", "scan_sleep"))
except:
return 1
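# A topo scan is a point-in-time snapshot of socket/peer information taken
# with `ss -t -u -r`; the raw output is queued locally and shipped to the
# aggregator on the next sync, presumably for topology mapping.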
def run_topo_scan(self):
ss_cmd = "ss -t -u -r 2>&1"
result = ""
t = time.time()
self.log.info("Starting topo scan")
try:
result = str(subprocess.check_output(ss_cmd, shell=True))
except:
result = traceback.format_exc()
elapsed = time.time() - t
self.log.info("Topo scan complete. Elapsed time: %.2f seconds" % elapsed)
return result
def run_auto_topo_scans(self, config):
if not self.should_run_auto_topo_scans(config):
return
n = self.get_num_topo_scans(config)
scan_sleep = self.get_topo_scan_sleep(config)
if "auto_topo_scans" not in self.db:
self.db["auto_topo_scans"] = []
for i in range(n):
t = time.time()
scan = self.run_topo_scan()
self.db["auto_topo_scans"].append((t, scan))
time.sleep(scan_sleep)
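# Collect DEM (digital experience monitoring) results from the DEM service
# over IPC and merge them with any locally queued results, popping a batch
# to include in this sync. Keys that fail to merge are logged and skipped.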
def _getDemResults(self, db):
rv = {}
if not self.has_dem:
return rv
client = DEMClient(self.dem_port, self.ipcPath)
response = client.send_receive("collect")
if response is None:
return rv
latestResults = json.loads(response)
for key in latestResults.keys():
try:
q = db[key]
if q.isEmpty():
rv[key] = latestResults[key]
else:
q.update(latestResults[key])
rv[key] = q.pop_results()
except Exception as e:
self.log.error("_getDemResults: {}".format(e))
continue
return rv
def _agent_version_updated(self, db):
has_update = False
if "last_ran_version" in db and db["last_ran_version"]:
if db["last_ran_version"] != self.version:
db["last_ran_version"] = self.version
has_update = True
else:
db["last_ran_version"] = self.version
has_update = True
return has_update
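# Seed the DEM service with its initial monitoring schedules: fetch them
# from the aggregator's "schedules" endpoint and forward them over IPC.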
def _init_dem_schedules(self, client):
try:
response = client.call("schedules", method="GET")
schedules = {
"icmp_server_resources": response.get("icmp_server_resources", []),
"monitor_schedules": response.get("monitor_schedules", []),
"traceroute_checks": response.get("traceroute_checks", []),
}
dem_client = DEMClient(self.dem_port, self.ipcPath)
dem_client.send_receive("initSchedules", payload=json.dumps(schedules))
except Exception as aggEx:
logging.error("/schedules error: {}".format(str(aggEx)))
return
def _updateDEMServiceSchedules(self, newSchedules):
if not self.has_dem:
return
client = DEMClient(self.dem_port, self.ipcPath)
_ = client.send_receive("update-schedules", payload=json.dumps(newSchedules))