File: //proc/thread-self/root/usr/lib/fm-agent/plugins/package_upgrade.py
# from dateutil.parser import parse
# from dateutil.relativedelta import relativedelta
from datetime import datetime, date
import time
import gzip
import os
import agent_util
import re
# Numeric platform identifiers. They are not referenced anywhere else in the
# visible part of this file; kept in case other code imports them.
DEBIAN = 1
REDHAT = 2
# Platform detection, evaluated once at import time: each flag is True when
# agent_util.which() returns something other than None for the given
# package-manager path (presumably None means "not found" - confirm against
# agent_util).
IS_DEBIAN = agent_util.which("/usr/bin/apt-get") is not None
IS_REDHAT = agent_util.which("/usr/bin/yum") is not None
class PackageUpgradePlugin(agent_util.Plugin):
    """Agent plugin reporting pending package upgrades and update recency.

    Four resource textkeys are exposed:

    * ``packages.security``           - count of pending security updates
    * ``packages.nonsecurity``        - count of pending non-security updates
    * ``packages.lastupdated``        - days since the last recorded upgrade
    * ``packages.check_installation`` - 1 if the named package is installed,
      otherwise 0

    Debian-style hosts (apt-get present) and RedHat-style hosts (yum present)
    are handled; when both are detected the Debian path wins.
    """

    textkey = "package_upgrade"
    label = "Package Upgrades"

    @classmethod
    def get_metadata(cls, config):
        """Describe the metrics this plugin can report on the current host.

        Args:
            config: Plugin configuration; not consulted here.

        Returns:
            A dict keyed by resource textkey, or an empty dict when neither
            apt-get nor yum was detected.
        """
        if not (IS_DEBIAN or IS_REDHAT):
            cls.log.info("Unsupported platform")
            return {}

        package_count_status = agent_util.UNSUPPORTED
        package_count_msg = None
        # The date and installation checks are marked supported whenever a
        # known package manager was detected.
        package_date_status = agent_util.SUPPORTED
        package_date_msg = None

        if IS_DEBIAN:
            try:
                import apt_check  # noqa: F401 - availability probe only
            except Exception:
                # A missing helper module is not a permission problem, so it
                # gets its own message.
                package_count_msg = "apt_check module is not available"
            else:
                try:
                    retcode = agent_util.execute_command(
                        "sudo /usr/bin/apt-get --version"
                    )[0]
                except Exception:
                    retcode = None
                if retcode == 0:
                    package_count_status = agent_util.SUPPORTED
                else:
                    # Report the reason whether sudo raised or merely exited
                    # with a non-zero status.
                    package_count_msg = "Insufficient permission - enable sudo access to apt-get for agent user"
        elif cls._can_use_sudo():
            package_count_status = agent_util.SUPPORTED
        else:
            package_count_msg = "Insufficient permission - enable sudo access to yum for agent user"

        metadata = {
            "packages.security": {
                "label": "Security-related packages waiting to be updated",
                "options": None,
                "status": package_count_status,
                "error_message": package_count_msg,
                "unit": "",
                "option_string": False,
            },
            "packages.nonsecurity": {
                "label": "Non-security-related packages waiting to be updated",
                "options": None,
                "status": package_count_status,
                "error_message": package_count_msg,
                "unit": "",
                "option_string": False,
            },
            "packages.lastupdated": {
                "label": "Days since the last package update was run",
                "options": None,
                "status": package_date_status,
                "error_message": package_date_msg,
                "unit": "",
                "option_string": False,
            },
            "packages.check_installation": {
                "label": "Check for Package installation",
                "options": None,
                "status": package_date_status,
                "error_message": package_date_msg,
                "unit": "",
                "option_string": True,
            },
        }
        return metadata

    @classmethod
    def _can_use_sudo(cls):
        """Return True when ``sudo /usr/bin/yum --help`` exits with status 0.

        A failure is logged as an error before returning False.
        """
        if agent_util.execute_command("sudo /usr/bin/yum --help")[0] == 0:
            return True
        cls.log.error(
            "Insufficient permission - Enable sudo access for agent user."
        )
        return False

    def check(self, textkey, data, config=None):
        """Compute the current value for one resource textkey.

        Args:
            textkey: One of the ``packages.*`` keys from ``get_metadata``.
            data: Option string; for ``packages.check_installation`` this is
                the package name to look up.
            config: Unused. Defaults to None rather than a shared dict.

        Returns:
            An int value for the metric, or None when it cannot be determined
            (unknown textkey, unsupported platform, missing sudo on RedHat,
            or no usable log entry).
        """
        if IS_DEBIAN:
            return self._check_debian(textkey, data)
        if IS_REDHAT:
            return self._check_redhat(textkey, data)
        # Neither package manager is present: None signals an error.
        return None

    def _check_debian(self, textkey, data):
        """Handle ``check`` on hosts where apt-get was detected."""
        if textkey in ("packages.security", "packages.nonsecurity"):
            try:
                import apt_check
            except Exception:
                return 0
            if agent_util.execute_command("sudo /usr/bin/apt-get update")[0] != 0:
                return 0
            upgrades, security_updates = apt_check.check()
            if textkey == "packages.security":
                return security_updates
            return upgrades
        if textkey == "packages.lastupdated":
            return self._debian_days_since_upgrade()
        if textkey == "packages.check_installation" and data:
            return self._is_installed("dpkg-query -l", data)
        return None

    def _check_redhat(self, textkey, data):
        """Handle ``check`` on hosts where yum was detected."""
        if textkey in ("packages.security", "packages.nonsecurity"):
            if not self._can_use_sudo():
                return None
            num_sec_packages = self._count_yum_updates(
                "sudo yum check-update --security"
            )
            if textkey == "packages.security":
                return num_sec_packages
            num_packages = self._count_yum_updates("sudo yum check-update")
            return max(0, num_packages - num_sec_packages)
        if textkey == "packages.lastupdated":
            if not self._can_use_sudo():
                # Without sudo the update time cannot be determined.
                return None
            return self._redhat_days_since_update()
        if textkey == "packages.check_installation" and data:
            return self._is_installed("rpm -qi", data)
        return None

    def _debian_days_since_upgrade(self):
        """Return days since the newest "Upgrade:" entry in the apt history.

        History files are scanned newest first. Returns None when no file
        contains a parsable upgrade record.
        """
        for path in self._list_logs("/var/log/apt/history*"):
            if path.endswith(".gz"):
                reader = "zcat %s"
            else:
                reader = "cat %s"
            contents = agent_util.execute_command(reader % path)[1]
            matches = re.findall("Upgrade:.+\nEnd-Date:(.*)\n", contents)
            if not matches:
                continue
            try:
                # The End-Date value must be "<date> <time>"; only the date
                # part is used.
                day, _clock = matches[-1].strip().split()
                upgraded = self._strptime(day, "%Y-%m-%d")
            except ValueError:
                self.log.error("Error parsing last upgrade time in %s" % path)
                continue
            return max((datetime.now() - upgraded).days, 0)
        # Nothing found in any history file: None is the marker.
        return None

    def _redhat_days_since_update(self):
        """Return days since the newest "Updated:" entry in the yum logs.

        Log files are scanned newest first. Returns None when no file yields
        a parsable timestamp.
        """
        now = datetime.now()
        for path in self._list_logs("/var/log/yum.log*"):
            output = agent_util.execute_command(
                "sudo zgrep -A 1 Updated: %s | tail -n 1" % path
            )[1]
            # The first six characters of the line are read as "Mon DD".
            stamp = output.strip().split("\n")[-1][:6]
            if not stamp:
                # Logs came up empty due to rotation.
                continue
            updated = self._parse_yum_stamp(stamp, now)
            if updated is None:
                # Not a date (e.g. an error line): try the next file instead
                # of letting ValueError escape from check().
                self.log.error("Error parsing last update time in %s" % path)
                continue
            return max((now - updated).days, 0)
        return None

    def _parse_yum_stamp(self, stamp, now):
        """Resolve a year-less "Mon DD" stamp to the latest date not after now.

        The year is included in the parsed text so "Feb 29" is accepted in
        leap years. Returns None when the stamp is not a valid date in either
        the current or the previous year.
        """
        for year in (now.year, now.year - 1):
            try:
                parsed = self._strptime("%s %d" % (stamp, year), "%b %d %Y")
            except ValueError:
                continue
            if parsed <= now:
                return parsed
        return None

    @staticmethod
    def _strptime(text, fmt):
        """Parse text with datetime.strptime, falling back to time.strptime."""
        try:
            return datetime.strptime(text, fmt)
        except AttributeError:
            # datetime.strptime did not appear until Py2.5.
            return datetime(*(time.strptime(text, fmt)[0:6]))

    @staticmethod
    def _list_logs(pattern):
        """Return the paths printed by ``ls -t <pattern>``, newest first.

        Blank entries are dropped so an empty listing never produces a
        command with no file operand, which would read from stdin.
        """
        listing = agent_util.execute_command("ls -t %s" % pattern)[1]
        return [name for name in listing.strip().split("\n") if name.strip()]

    @staticmethod
    def _count_yum_updates(command):
        """Count the package lines in the last section of yum's output.

        Sections are separated by a blank line. Output with no blank line
        counts as zero, and blank lines are never counted as packages.
        """
        output = agent_util.execute_command(command)[1]
        if "\n\n" not in output:
            return 0
        listing = output.split("\n\n")[-1]
        return len([line for line in listing.split("\n") if line.strip()])

    def _is_installed(self, query_command, data):
        """Return 1 if the package query exits with status 0, otherwise 0.

        Args:
            query_command: Query prefix such as ``"dpkg-query -l"``.
            data: User-supplied package name(s), whitespace separated.
        """
        command = "%s %s" % (query_command, self._shell_quote(data))
        if agent_util.execute_command(command)[0] != 0:
            return 0
        return 1

    @staticmethod
    def _shell_quote(data):
        """Quote each whitespace-separated token of data for a shell.

        NOTE(review): execute_command appears to run through a shell (other
        commands in this plugin rely on a pipe), so the option string must
        not be able to inject shell syntax - confirm against agent_util.
        """
        try:
            from shlex import quote
        except ImportError:
            # Python 2 keeps the same helper in the pipes module.
            from pipes import quote
        return " ".join([quote(token) for token in data.split()])