File: //var/opt/nydus/ops/customer_local_ops/__init__.py
from datetime import datetime
from typing import Any, Dict, List, Tuple, Union
from enum import Enum
import importlib
import logging
import shutil
from primordial.constants import CANONICAL_TIMESTRING_FORMAT
from primordial.encryptor import Encryptor
from customer_local_ops.exceptions import EncryptionKeyNotFound
from customer_local_ops.util.execute import runCommand
from customer_local_ops.util.helpers import create_file
from customer_local_ops.util.retry import Retry
LOG = logging.getLogger(__name__)
class OpType(Enum):
CONTROL_PANEL = "cp_op"
OPERATING_SYSTEM = "os_op"
CONTROL_PANEL_OPERATING_SYSTEM = "cp_os_op"
UNKNOWN = "unknown"
# TODO: Move to central repo
class ResourceType(Enum):
OVH = "ovh"
OPENSTACK = "openstack"
VIRTUOZZO_VM = "virtuozzo_vm"
OPENSTACK_HOSTINGCLOUD = "openstack_hostingcloud"
RESOURCE_TYPES = list(ResourceType)
NydusResult = Union[Retry, Tuple[bool, Dict[str, Any]], Tuple[bool, Dict[str, Any], int]]
class OpResult:
"""
Class whose objects encapsulate the complete result of an operation.
"""
def __init__(self, success: bool = False, result: Dict[str, Any] = None, retry_interval: int = None):
"""
Initialize an OpResult instance
:param success: Indicates whether or not the operation was successful
:param result: A dictionary containing any messages or other data from the operation
:param retry_interval: An optional retry interval, in seconds. If this value is greater that or equal to
zero, the operation will be retried in that many seconds.
"""
self.success = success
self.result = result
self.retry_interval = retry_interval
@property
def errs(self) -> str:
"""Convenience property for accessing the 'errs` key in the result dict"""
return self.result.get('errs', '') if isinstance(self.result, dict) else ''
@property
def outs(self) -> str:
"""Convenience property for accessing the 'outs` key in the result dict"""
return self.result.get('outs', '') if isinstance(self.result, dict) else ''
def as_tuple(self) -> NydusResult:
"""
Construct and return a correctly-sized tuple, based on whether or not a retry_interval value is present, for
passing back to Nydus.
"""
return ((self.success, self.result) if self.retry_interval is None
else (self.success, self.result, self.retry_interval))
class Ops:
DISK_UTILIZATION_PATH = None
RETRYABLE_ERRS = []
PANOPTA_MANIFEST_FILE = '/etc/panopta-agent-manifest'
DISTRO = None # override
OS_VERSION = None # override
QEMU_PACKAGE_NAME = None # override
def __str__(self):
return self.__class__.__name__
op_type = OpType.UNKNOWN
def get_op_type(self):
return self.op_type
def _get_vm_tag(self):
"""Return this VM's tag. Same return format as runCommand."""
raise NotImplementedError
def _encryption_key(self):
"""The encryption key."""
exit_code, outs, errs = self._get_vm_tag()
if exit_code != 0 or not outs:
LOG.error('Could not obtain encryption key')
raise EncryptionKeyNotFound(exit_code, outs, errs)
return outs
def decrypt(self, encrypted):
"""Decrypt an encrypted value."""
try:
key = self._encryption_key()
return Encryptor.decrypt(encrypted, key)
except Exception:
LOG.exception('Decrypt failed')
raise
def encrypt(self, target: str) -> str:
"""
Encrypt a plain-text value.
:param target: The value to be encrypted
:return: An encrypted string
"""
try:
key = self._encryption_key()
return Encryptor.encrypt(target, key)
except Exception:
LOG.exception('Encrypt failed')
raise
def write_file(self, path, content):
LOG.debug('Writing file: %s', path)
with open(path, 'w', encoding='utf-8') as f:
f.write(content)
def get_os_op(self, os_op: str, control_panel_class_name: str = None) -> "Ops":
"""
Get an instance of an operating system Ops class or ControlPanel-OS Ops class
:param os_op: The operating system class name e.g. linux.CentOS7
:param control_panel_class_name: The control panel class name e.g. cpanel.CPanel
:return: An instance of the operating system class e.g. CentOS7 or CP-OS class e.g. CentOS7CPanel
"""
parts = os_op.split(".")
# Last part of os_op should be the family name followed by the os class name, e.g. linux.CentOS7
os_name = parts[-1]
family_name = parts[-2]
prefix = "customer_local_ops"
if control_panel_class_name is None and self.get_op_type() == OpType.CONTROL_PANEL:
# If we're requesting to get the os op class from a control panel class,
# assume we want the operating system/control panel hybrid class
control_panel_class_name = str(self)
if control_panel_class_name is not None:
# Return the operating system/control panel hybrid class
os_name = os_name + control_panel_class_name
cp_class_name = control_panel_class_name.lower()
module_path = "{prefix}.control_panel.{family_name}_{cp_class_name}".format(prefix=prefix,
family_name=family_name,
cp_class_name=cp_class_name)
else:
module_path = "{prefix}.operating_system.{family_name}".format(prefix=prefix, family_name=family_name)
mod = importlib.import_module(module_path)
class_ = getattr(mod, os_name)
os_op = class_()
return os_op
def build_result_dict(self, outs: str, errs: str, op_name: str) -> Dict[str, Any]:
"""
This function takes command output (outs, errs) and builds and returns a result
dict with op metadata and errors if appropriate
:param outs: The stdout from the run command
:param errs: The stderr from the run command
:param op_name: the name of the op calling this function
:return: a result dict
"""
unicode_null = "\u0000"
outs = outs.replace(unicode_null, "")
errs = errs.replace(unicode_null, "")
result = {'outs': outs, 'errs': errs, 'cls_name': str(self), 'op_type': self.get_op_type().value,
'op_name': op_name}
return result
def get_result_data(self, result: Any) -> OpResult:
"""
This function takes a result tuple or result dict and parses it to a result tuple of a consistent format
that can then be used by ops to inspect the result or build another result based on the first one
:param result: a dict or tuple result from an op
:return: A data structure containing the complete result
"""
if isinstance(result, tuple):
result_len = len(result)
if result_len == 1:
return OpResult(True, result[0])
if result_len == 2:
return OpResult(result[0], result[1])
if result_len == 3:
return OpResult(result[0], result[1], result[2])
# Should never get here
raise ValueError("result tuple has too many elements: {}".format(result_len))
return OpResult(True, result)
def build_result_from_other_result(self, result: Any, op_name: str) -> NydusResult:
"""
This function takes a result tuple/dict/Retry/str from a secondary op (usually an os op) and (if not a Retry)
extracts the metadata, output and errors from it and places it inside a result with metadata from the primary
op (usually a control panel op).
In this way it is clear:
a). what the primary op was and
b). that the error/output is coming from the secondary op
:param result: a dict or tuple result from an op
:param op_name: the name of the op calling this function
:return: a result dict, tuple or Retry instance
"""
if isinstance(result, Retry):
return result
data = self.get_result_data(result)
if isinstance(data.result, dict):
op_type = data.result.get('op_type', self.get_op_type().value)
op_name_other = data.result.get('op_name', op_name)
cls_name = data.result.get('cls_name', str(self))
prefix = "{op_type} {cls_name}.{op_name_other}".format(op_type=op_type, cls_name=cls_name,
op_name_other=op_name_other)
outs = "{prefix} outs: {outs}".format(prefix=prefix, outs=data.outs)
errs = data.errs
if errs:
errs = "{prefix} errs: {errs}".format(prefix=prefix, errs=data.errs)
else:
if data.success:
outs = str(data.result)
errs = ''
else:
outs = ''
errs = str(data.result)
data.result = self.build_result_dict(outs, errs, op_name)
return data.as_tuple()
def build_result_from_cmd_output(self, exit_code: int, outs: str, errs: str, op_name: str,
success_message: str = None) -> NydusResult:
"""
This function takes a result from a command output (exit_code, outs, errs) and builds and returns a result
tuple with op metadata and errors if appropriate
:param exit_code: The exit code returned from the run command
:param outs: The stdout from the run command
:param errs: The stderr from the run command
:param op_name: the name of the op calling this function
:param success_message: A message to use in place of 'outs' if the command ran successfully (optional)
:return: a result tuple
"""
success = exit_code == 0
if success and success_message is not None:
outs = success_message
return success, self.build_result_dict(outs, errs, op_name)
def run_command_and_handle_result(self,
command: Union[List[str], str],
op_name: str,
*args: Any,
intermediate_result: Dict[str, Any] = None,
**kw: Any) -> NydusResult:
"""Run command, return result suitable for a Nydus operation.
:param command: command list or string as per runCommand
:param op_name: name of the operation; used as runCommand tag and result op_name
:param args: additional positional arguments for runCommand
:param intermediate_result: Dict containing metadata for retries
:param kw: additional keyword arguments for runCommand
:return: command result as a Nydus operation result
"""
exit_code, outs, errs = runCommand(command, op_name, *args, **kw)
return self._result_handler(exit_code, outs, errs, op_name, intermediate_result=intermediate_result)
# def configure_ips(self, network_node):
# raise NotImplementedError
# def patch_system(self, payload):
# raise NotImplementedError
# def patch_system_specific_update(self, payload):
# raise NotImplementedError
# def list_vm_patches(self, payload):
# raise NotImplementedError
def add_user(self, payload, unused=None):
raise NotImplementedError
# def remove_user(self, payload):
# raise NotImplementedError
# def change_password(self, payload):
# raise NotImplementedError
# def change_hostname(self, payload):
# raise NotImplementedError
# def install_package(self, payload):
# raise NotImplementedError
# def uninstall_package(self, payload):
# raise NotImplementedError
def shutdown_clean(self, unused=None):
# The `unused` param is an artifact of Archon workflows requiring an I/O
# chain for sequencing.
raise NotImplementedError
# def stop_web_server(self, none):
# raise NotImplementedError
# def start_web_server(self, none):
# raise NotImplementedError
# def prepare_web_server_ssl(self, none):
# raise NotImplementedError
# def deploy_cert(self, payload):
# raise NotImplementedError
# def install_cert(self, payload):
# raise NotImplementedError
# def configure_mta(self, payload):
# raise NotImplementedError
def enable_admin(self, username, unused=None):
# The `unused` param is an artifact of Archon workflows requiring an I/O
# chain for sequencing.
raise NotImplementedError
def disable_admin(self, username, unused=None):
# The `unused` param is an artifact of Archon workflows requiring an I/O
# chain for sequencing.
raise NotImplementedError
def disable_all_admins(self, unused=None):
# The `unused` param is an artifact of Archon workflows requiring an I/O
# chain for sequencing.
raise NotImplementedError
def _get_cpu_utilization(self) -> Dict[str, float]:
"""Return current CPU utilization.
:returns: dictionary with keys:
- cpuUsed: percentage in range 0-100
"""
raise NotImplementedError
def _get_disk_utilization(self) -> Dict[str, int]:
"""Return current disk utilization.
:returns: dictionary with keys:
- diskTotal: mebibytes
- diskUsed: mebibytes
"""
disk = shutil.disk_usage(self.DISK_UTILIZATION_PATH)
return {'diskTotal': disk.total >> 20, # B > MiB
'diskUsed': disk.used >> 20}
def _get_memory_utilization(self) -> Dict[str, int]:
"""Return current memory utilization.
:returns: dictionary with keys:
- memoryTotal: mebibytes
- memoryUsed: mebibytes
"""
raise NotImplementedError
def get_utilization(self) -> Dict[str, Union[float, int, str]]:
"""Return current system utilization.
:returns: dictionary with keys:
- collected: current time in
:data:`~primordial.constants.CANONICAL_TIMESTRING_FORMAT`
- memoryTotal: mebibytes
- memoryUsed: mebibytes
- diskTotal: mebibytes
- diskUsed: mebibytes
- cpuUsed: percentage in range 0-100
"""
result = {}
for stats in (self._get_cpu_utilization(),
self._get_disk_utilization(),
self._get_memory_utilization()):
result.update(stats)
result['collected'] = datetime.utcnow().strftime(
CANONICAL_TIMESTRING_FORMAT)
return result
def _create_panopta_manifest_file(self, payload):
""" Panopta agent uses a manifest file for configuration
of the customer id and templates to use. This method creates that manifest file.
"""
manifest_file = self.PANOPTA_MANIFEST_FILE
customer_key = payload['customer_key']
template_ids = payload['template_ids']
cust_key_contents = """[agent]
customer_key = {customer_key}
templates = {template_ids}
enable_countermeasures = false""".format(customer_key=customer_key, template_ids=template_ids)
server_key = payload.get('server_key')
if server_key:
cust_key_contents += "\nserver_key = {server_key}".format(server_key=server_key)
fqdn = payload.get('fqdn')
if fqdn:
cust_key_contents += "\nfqdn = {fqdn}".format(fqdn=fqdn)
server_name = payload.get('serverName')
if server_name:
cust_key_contents += "\nserver_name = {serverName}".format(serverName=server_name)
disable_server_match = payload.get('disable_server_match', False)
if disable_server_match:
cust_key_contents += "\ndisable_server_match = true"
create_file(manifest_file, cust_key_contents)
def install_panopta(self, payload, *args, **kwargs):
raise NotImplementedError
def delete_panopta(self, *args, **kwargs):
raise NotImplementedError
def get_panopta_server_key(self, *args, **kwargs):
raise NotImplementedError
def _result_handler(self, exit_code: int, outs: str, errs: str, op_name: str,
intermediate_result: Dict[str, Any] = None) -> NydusResult:
"""Take the result from a run command and check for retryable errors.
If found, return a Retry. If not, return a Nydus result tuple.
:param exit_code: The exit code from the executed command
:param outs: The stdout output from the executed command
:param errs: The stderr output from the executed command
:param op_name: The name of the op
:param intermediate_result: Dict containing metadata for retries
:return: A Nydus result tuple, or Retry
"""
LOG.info(self.RETRYABLE_ERRS)
success, result = self.build_result_from_cmd_output(exit_code, outs, errs, op_name, op_name + " succeeded")
if not success:
for retryable_err in self.RETRYABLE_ERRS:
if retryable_err in errs:
if intermediate_result is None:
intermediate_result = {}
intermediate_result['retryable_error'] = retryable_err
return Retry(intermediate_result)
return success, result