File: //usr/lib/fm-agent/countermeasures/cm_driver.py
"""
Countermeasures driver program - executes one or more countermeasures
Copyright 2023 Fortinet, Inc. All Rights Reserved.
fm-ops@fortinet.com
TODO:
- global block file to prevent all countermeasures from running?
- sudo access validation
- run time limits
"""
try:
import argparse
except:
import p_argparse as argparse
try:
# Python 2.x
import ConfigParser as configparser
except:
# Python 3.x
import configparser
try:
import json
except ImportError:
import simplejson as json
import p_importlib
import logging
import logging.handlers
import os
import os.path
import subprocess
import sys
import tarfile
import tempfile
import time
import traceback
import types
from datetime import datetime
try:
# Python 2.x
import urllib2
except:
import urllib.request as urllib2
try:
# Python 2.x
import urlparse
except:
import urllib.parse as urlparse
import zipfile
import aggregator
import agent_util
from .plugins.CountermeasurePlugin import JsonPlugin
MAX_OUTPUT_LENGTH = 100 * 1024
# Backported for Python 2.4 support
def any(iterable):
for element in iterable:
if element:
return True
return False
def load_plugins():
"""
Dynamically load all available Countermeasure plugins, in both the application default
and the customer's local custom plugin directory
"""
log = setup_logging("countermeasure")
plugins = {}
for directory in (
os.path.join(LIB_DIR, "countermeasures", "plugins"),
BASE_CUSTOM_PLUGIN_DIR,
):
if not os.path.exists(directory):
continue
log.info("Loading Countermeasure plugins from %s" % directory)
sys.path.append(directory)
for mod_name in os.listdir(directory):
if mod_name.endswith(".py") and not mod_name.startswith("__"):
try:
mod = p_importlib.import_module(mod_name[:-3])
except:
log.error(
"Unable to import plugin %s: %s"
% (mod_name, traceback.format_exc())
)
continue
for name, obj in list(mod.__dict__.items()):
if (
(sys.version_info[0] == 3 and type(obj) == type)
or (sys.version_info[0] == 2 and type(obj) == types.ClassType)
) and name.endswith("Countermeasure"):
try:
plugin = obj()
plugins[plugin.textkey] = plugin
except:
log.error(
"Unable to instantiate plugin %s: %s"
% (mod_name, traceback.format_exc())
)
if mod_name.endswith(".json"):
try:
json_counter = open(os.path.join(directory, mod_name))
except Exception:
log.error("Unable to open %s" % os.path.join(directory, mod_name))
log.error(traceback.format_exc())
continue
file_content = json_counter.read()
json_counter.close()
try:
counter_data = json.loads(file_content)
except Exception:
log.error("%s file is not a valid json file to be read" % mod_name)
log.error(traceback.format_exc())
continue
required_fields = ["name", "textkey", "command", "author"]
existing_keys = counter_data.keys()
success = True
for key in required_fields:
if key not in existing_keys or not counter_data.get(key):
log.error(
"%s is missing from the countermeasure declaration of %s"
% (key, mod_name)
)
success = False
break
if not success:
continue
try:
max_runtime = int(counter_data.get("max_runtime"))
except:
log.error(
"max runtime %s is not valid"
% (counter_data.get("max_runtime"))
)
max_runtime = None
try:
max_frequency = int(counter_data.get("max_frequency"))
except:
log.error(
"max frequency %s is not valid"
% (counter_data.get("max_frequency"))
)
max_frequency = None
textkey = counter_data.get("textkey")
plugin = JsonPlugin(counter_data.get("command"))
plugin.textkey = counter_data.get("textkey")
plugin.name = counter_data.get("name")
plugin.description = counter_data.get("description")
plugin.wall_announce_delay = counter_data.get("wall_announce_delay")
plugin.author = counter_data.get("author")
plugin.max_runtime = counter_data.get("max_runtime")
plugin.max_frequency = counter_data.get("max_frequency")
plugins[textkey] = plugin
return plugins
def load_agent_config():
agent_config_file = os.path.join(BASE_CONFIG_DIR, PKG_DIR, "%s_agent.cfg" % BRAND)
config_file = configparser.ConfigParser()
config_file.read(agent_config_file)
return config_file
def setup_logging(name):
log_file = os.path.join(BASE_LOG_DIR, PKG_DIR, "countermeasure.log")
root_logger = logging.getLogger(name)
agg_logger = logging.getLogger("Client")
handler = logging.handlers.RotatingFileHandler(
log_file, "a", maxBytes=5 * 1024**2, backupCount=5
)
handler.setFormatter(
logging.Formatter(
"%(process)d) %(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
)
root_logger.addHandler(handler)
agg_logger.addHandler(handler)
# If we have a TTY, add a stdout handler
if sys.stdin.isatty():
root_logger.addHandler(logging.StreamHandler())
root_logger.setLevel(logging.INFO)
agg_logger.setLevel(logging.INFO)
return root_logger
def set_last_execution(textkey):
last_execution_directory = os.path.join(
BASE_DATA_DIR, PKG_DIR, "countermeasures/last_execution"
)
if not os.path.exists(last_execution_directory):
os.makedirs(last_execution_directory)
os.system("touch %s" % os.path.join(last_execution_directory, textkey))
def get_last_execution(textkey):
last_execution_directory = os.path.join(
BASE_DATA_DIR, PKG_DIR, "countermeasures/last_execution"
)
if not os.path.exists(last_execution_directory):
os.makedirs(last_execution_directory)
filename = os.path.join(last_execution_directory, textkey)
if not os.path.exists(filename):
return 0
stat = os.stat(filename)
return stat.st_mtime
def install_plugins(url):
log = setup_logging("countermeasure")
log.info("\nFetching remote plugins from %s" % url)
f = tempfile.NamedTemporaryFile(delete=False)
num_installed = 0
ext = url.split(".")[-1]
fname = url.split("/")[-1]
if not os.path.exists(BASE_CUSTOM_PLUGIN_DIR):
os.system("mkdir %s" % BASE_CUSTOM_PLUGIN_DIR)
try:
r = urllib2.urlopen(url)
if "content-disposition" in r.info():
ext = r.info().getheader("content-disposition").split(".")[-1]
f.write(r.read())
f.close()
except:
log.error("Unable to download URL: %s" % traceback.format_exc())
return
if ext in ("tar", "tgz"):
try:
t = tarfile.open(f.name)
for file in t.getnames():
if file.endswith(".py") or file.endswith(".json"):
log.info(" Installing plugin %s" % file)
t.extract(file, BASE_CUSTOM_PLUGIN_DIR)
num_installed += 1
except:
log.error("Unable to extract tar contents: %s" % traceback.format_exc())
elif ext == "zip":
try:
z = zipfile.ZipFile(f.name)
for file in z.namelist():
if file.endswith(".py") or file.endswith(".json"):
log.info(" Installing plugin %s" % file)
z.extract(file, BASE_CUSTOM_PLUGIN_DIR)
num_installed += 1
except:
log.error("Unable to extract zip contents: %s" % traceback.format_exc())
elif ext == "py" or ext == "json":
log.info(" Installing plugin %s" % fname)
os.system("cp %s %s" % (f.name, os.path.join(BASE_CUSTOM_PLUGIN_DIR, fname)))
num_installed += 1
else:
log.error("Unable to install Countermeasure, unknown extension: %s" % ext)
if num_installed:
log.info("\nInstalled %s Countermeasure plugins" % num_installed)
os.system("rm -f %s" % f.name)
def list_plugins():
plugins = load_plugins()
pairs = []
for textkey, plugin in plugins.items():
pairs.append((plugin.name, plugin.author or "", plugin.description))
pairs.sort()
max_name_len = max([len(p[0]) for p in pairs])
max_author_len = max([len(p[1]) for p in pairs])
print("\nAvailable Countermeasures")
print("=========================\n")
fmt = "%-" + str(max_name_len + 3) + "s %-" + str(max_author_len + 3) + "s %s"
print(fmt % ("Name", "Author", "Description"))
print("-" * 80)
for textkey, author, description in pairs:
print(fmt % (textkey, author, description))
print("")
def validate_plugins():
plugins = load_plugins()
pairs = []
for textkey, plugin in plugins.items():
output = plugin.validate()
if output:
pairs.append((plugin.name, output))
if pairs:
pairs.sort()
max_len = max([len(p[0]) for p in pairs])
print("\nInvalid Countermeasures")
print("=========================")
fmt = "%-" + str(max_len + 3) + "s %s"
for textkey, description in pairs:
print(fmt % (textkey, description))
else:
print("\nAll plugins are valid.")
print("")
def validate_sudo():
plugins = load_plugins()
for textkey, plugin in plugins.items():
if plugin.sudo_requirements:
print("\nVerifying sudo requirements for %s" % textkey)
devnull = open("/dev/null", "w")
for app in plugin.sudo_requirements:
# If the app is a list, then we just need at least one of these to be available
if type(app) == list:
valid = any(
[
subprocess.call(
("sudo -n -l %s" % subapp).split(),
stdout=devnull,
stderr=devnull,
)
== 0
for subapp in app
]
)
# If not a list, just check one item
else:
valid = (
subprocess.call(
("sudo -n -l %s" % app).split(),
stdout=devnull,
stderr=devnull,
)
== 0
)
print(
" %-30s %s"
% (
("%s:%s" % (textkey, app)),
valid and "Pass" or "Missing Permissions",
)
)
print("")
devnull.close()
def execute(hash, textkeys, metadata_file=None):
# Setup logging and get config file
log = setup_logging("countermeasure %s" % hash)
config = load_agent_config()
# Read and parse any metadata if given
cm_metadata = {}
if metadata_file:
try:
f = open(metadata_file, "r")
metadata_contents = f.read()
f.close()
cm_metadata = json.loads(metadata_contents)
os.remove(metadata_file)
except Exception:
log.error("Problem while proccessing metadata:")
log.error(traceback.format_exc())
cm_metadata = {}
# Get available plugins
plugins = load_plugins()
proxy_info = {}
if config.has_section("agent_proxy"):
proxy_info = config["agent_proxy"]
# Iterate through the plugin textkeys, executing each one in succession
for textkey in textkeys:
if textkey not in plugins:
print("Unable to execute unknown plugin %s" % textkey)
log.error("Unable to execute unknown plugin %s" % textkey)
current_time = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
report_output(
plugin_textkey=textkey,
server_key=config.get("agent", "server_key"),
aggregator_url=config.get("agent", "aggregator_url").lower(),
agent_version=config.get("agent", "version"),
output=[
{
"timestamp": current_time,
"format": "text",
"output": "Unable to execute plugin %s" % textkey,
}
],
hash=hash,
return_code=1,
status="error",
proxy_info=proxy_info,
)
continue
plugin = plugins[textkey]
plugin.set_metadata(cm_metadata)
# Check how recently the countermeasure has been run
last_execution = time.time() - get_last_execution(textkey)
if plugin.max_frequency and last_execution < plugin.max_frequency:
log.warning(
"%s countermeasure skipped, ran too %d seconds ago"
% (textkey, last_execution)
)
continue
print("Executing %s" % textkey)
log.info("Executing %s" % textkey)
# Set last execution time to current time
set_last_execution(textkey)
# Run prepare operation
plugin.prepare()
# Post wall message if needed
if plugin.wall_announce_delay is not None:
os.system("wall 'Executing %s countermeasure'" % textkey)
if plugin.wall_announce_delay:
time.sleep(plugin.wall_announce_delay)
# Execute the plugin
try:
plugin.run()
plugin.status = "success"
except:
plugin.status = "error"
plugin.output = [
{
"timestamp": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"),
"format": "text",
"output": "Exception executing plugin: %s" % traceback.format_exc(),
}
]
# Truncate output if it's too long
if len(plugin.output) > MAX_OUTPUT_LENGTH:
plugin.output = plugin.output[:MAX_OUTPUT_LENGTH]
proxy_info = {}
if config.has_section("agent_proxy"):
proxy_info = config["agent_proxy"]
# Report the output
report_output(
plugin_textkey=plugin.textkey,
server_key=config.get("agent", "server_key"),
aggregator_url=config.get("agent", "aggregator_url").lower(),
agent_version=config.get("agent", "version"),
output=plugin.output,
return_code=plugin.return_code,
hash=hash,
status=plugin.status,
proxy_info=proxy_info,
)
log.info("Completed countermeasure %s" % textkey)
def report_output(**kwargs):
aggregator_url = kwargs.get("aggregator_url")
server_key = kwargs.get("server_key")
agent_version = kwargs.get("agent_version")
if not aggregator_url.startswith("http"):
try:
host, port = aggregator_url.split(":")
if port in ("443", "8443"):
aggregator_url = "https://" + aggregator_url
else:
aggregator_url = "http://" + aggregator_url
except:
aggregator_url = "https://" + aggregator_url
aggregator_url = urlparse.urljoin(aggregator_url, "/v2/countermeasure")
payload = {
"countermeasure_textkey": kwargs.get("plugin_textkey"),
"server_key": server_key,
"incident_hash": kwargs.get("hash"),
"output": kwargs.get("output"),
"return_code": kwargs.get("return_code"),
"status": kwargs.get("status"),
}
proxy_info = kwargs.get("proxy_info")
aggregator_client = aggregator.Client(
aggregator_url, agent_version, server_key, proxy_config=proxy_info
)
aggregator_client.call("countermeasure", payload)
def main():
parser = argparse.ArgumentParser()
parser.add_argument(
"action",
choices=[
"execute",
"list_plugins",
"install_plugins",
"validate_plugins",
"validate_sudo",
],
)
parser.add_argument("--url", required=False, help="URL to download plugins from")
parser.add_argument("--hash", required=False, help="Incident hash")
parser.add_argument(
"--textkeys",
required=False,
nargs="+",
help="CounterMeasure plugin textkeys to execute",
)
parser.add_argument(
"--metadata-file",
required=False,
help="Path to file containing JSON metadata that will be passed to plugin",
)
args = parser.parse_args()
# Perform logical validation to make sure we got the correct arguments based on the action type
if args.action == "execute" and (args.hash == None or args.textkeys == None):
print("ERROR: Need to provide --hash and --textkey arguments\n")
parser.print_help()
sys.exit(1)
elif args.action == "install_plugins" and args.url == None:
print("ERROR: Need to provide --url argument\n")
parser.print_help()
sys.exit(1)
# Dispatch to action methods
if args.action == "execute":
execute(args.hash, args.textkeys, args.metadata_file)
elif args.action == "list_plugins":
list_plugins()
elif args.action == "install_plugins":
install_plugins(args.url)
elif args.action == "validate_plugins":
validate_plugins()
elif args.action == "validate_sudo":
validate_sudo()