File: //proc/thread-self/root/lib/fm-agent/plugins/cert.py
from subprocess import check_output, CalledProcessError
import agent_util
import logging
import os
from os import listdir
from os.path import isfile, join
import datetime
"""
Config file example:
[cert]
certpath = /path/to/the/certs
"""
class CertPlugin(agent_util.Plugin):
textkey = "cert"
label = "Cert Expiration Date"
@classmethod
def get_metadata(cls, config):
try:
certpath = config.get("certpath")
if not os.path.exists(certpath):
status = agent_util.UNSUPPORTED
msg = "Missing cert directory in configuration"
options = []
else:
status = agent_util.SUPPORTED
msg = []
options = CertPlugin.get_certs(certpath, config)
except Exception:
status = agent_util.UNSUPPORTED
msg = "Exception gathering cert directory"
options = []
metadata = {
"days_to_expire": {
"label": "Certificate Expiration Status",
"options": options,
"status": status,
"error_message": msg,
"unit": "days",
},
}
return metadata
def check(self, textkey, data, config):
self.log.debug("Checking cert expiration {} - {}".format(textkey, data))
day = CertPlugin._get_cert_data(config, data)
return day
@staticmethod
def _get_cert_data(config, name):
certpath = config.get("certpath")
certs = CertPlugin.get_certs(certpath, config)
for each in certs:
if each != name:
continue
cert = certpath + "/" + each
openssl = "openssl x509 -in " + cert + " -noout -enddate"
try:
ret, output = agent_util.execute_command(openssl)
if ret != 0:
logging.error(output)
raise ValueError("Error Reading cert file")
except ValueError:
return False
datestr = output.split("=")[-1]
datespl = datestr.split("\n", 1)[0]
dateformat = datetime.datetime.strptime(datespl, "%b %d %H:%M:%S %Y %Z")
diff = dateformat - datetime.datetime.utcnow()
day = diff.days
return day
@staticmethod
def get_certs(certpath, config):
# Collect all the .crt files in a list
if not os.path.isdir(certpath):
logging.error(
"certpath specified on config file:" + certpath + " does not exist"
)
else:
certfiles = [
f
for f in listdir(certpath)
if isfile(join(certpath, f)) and f.endswith(".crt")
]
logging.info(
"Certificates found under" + str(certpath) + ":" + str(certfiles)
)
return certfiles