File: //var/opt/nydus/ops/customer_local_ops/control_panel/windows_plesk.py
# -*- coding: utf-8 -*-
from subprocess import list2cmdline
from typing import Any, Dict, Tuple, Union, List
import logging
import glob
import json
import os
from customer_local_ops import OpType, ResourceType, NydusResult
from customer_local_ops.operating_system.windows import Windows, Windows2016, Windows2019, Windows2022, Windows2025
from customer_local_ops.control_panel import SCRIPT_BASEPATH
from customer_local_ops.control_panel.plesk import OSPlesk
from customer_local_ops.util.execute import (runCommand, run_powershell_file, run_uapi_command,
run_multiple_uapi_commands, start_powershell_file)
from customer_local_ops.util.retry import Retry
LOG = logging.getLogger(__name__)
PLESK_DIR_17 = 'C:\\Program Files (x86)\\Plesk\\'
PLESK_DIR_12 = 'C:\\Program Files (x86)\\Parallels\\Plesk\\'
# Number of seconds for Nydus to wait before retrying the previous workflow stop if a DLL conflict was detected
DLL_CONFLICT_COMMAND_RETRY_INTERVAL = 10
PLESK_FIX_DLL_CONFLICT_SCRIPT_LOG_FILE = r'C:\Windows\TEMP\plesk-fix-dll-conflict.log'
CmdsType = Union[str, List[str], List[Tuple[str, Union[str, List[str]]]]]
def has_dll_version_conflict(text: str) -> bool:
"""
Check to see of the specified text contains an indication of the Python/PHP DLL conflict
:param text: The text to be checked
:return: True if the conflict is detected; otherwise, False
"""
# 'vcruntime140.dll' 14.0 is not compatible with this PHP build linked with 14.16 in Unknown on line 0
conflict_detected = "'vcruntime140.dll' 14.0 is not compatible" in text
if conflict_detected:
LOG.debug("DLL conflict detected")
else:
LOG.debug("No DLL conflict detected")
return conflict_detected
class WindowsPlesk(Windows, OSPlesk):
"""
Plesk Customer Local Ops for the Windows OS.
All function names should contain 'plesk' so as not to override the OS ops
"""
op_type = OpType.CONTROL_PANEL_OPERATING_SYSTEM
plesk_dir = None
RETRYABLE_ERRS = ['The Plesk administrator password cannot be changed until the server cloning is finished',
'No connection could be made because the target machine actively refused it',
'Could not resolve host']
def get_plesk_dir(self) -> str:
"""Find the installation directory for plesk
:raises RuntimeError: If a plesk path cannot be found
:return: full path to plesk installation
"""
if self.plesk_dir is None:
# environment variable for plesk dir.
if os.getenv('plesk_dir'):
self.plesk_dir = os.getenv('plesk_dir')
# plesk 17 default
elif os.path.exists(PLESK_DIR_17):
self.plesk_dir = PLESK_DIR_17
# plesk 12 default
elif os.path.exists(PLESK_DIR_12):
self.plesk_dir = PLESK_DIR_12
# go find a plesk!
else:
pleskpath = glob.glob('C:\\Program Files (x86)\\**\\Plesk\\', recursive=True)
if pleskpath and isinstance(pleskpath, list):
self.plesk_dir = pleskpath[0]
if self.plesk_dir is not None:
LOG.info("get_plesk_dir: %s", self.plesk_dir)
return self.plesk_dir
raise RuntimeError("plesk path could not be found")
def license_plesk(self, activation_key: str, *args: Any,
intermediate_result: Dict[str, Any] = None) -> NydusResult:
"""Run license utility on local vm with activation key
:param activation_key: Key value passed back from license plesk op on hfs executor
:param intermediate_result: Dict containing metadata for retries
:return: tuple with success, error data
"""
LOG.info("install plesk license on '%s'", str(self))
op_name = 'license_plesk'
command = 'license.exe'
full_command = self.get_path_plesk(command)
LOG.info("full command: %s", full_command)
cmd = '"{full_command}" --install {activation_key}'.format(full_command=full_command,
activation_key=activation_key)
exit_code, outs, errs = run_uapi_command(cmd,
'set Plesk License', 'license_plesk', use_shell=True)
return self._result_handler(exit_code, outs, errs, op_name, intermediate_result)
# pylint: disable=too-many-locals
def enable_plesk(self, vm_ip: str, vm_resource: str, plesk_user: str, plesk_pass: str,
*args: Any, intermediate_result: Dict[str, Any] = None) -> NydusResult:
"""Enable plesk on local server
:param vm_ip: External IP address of the server
:param vm_resource: The resource name for the third-party hosting provider
:param plesk_user: User name to be used on Plesk instance
:param plesk_pass: Password to be used on Plesk instance
:param intermediate_result: Dict containing metadata for retries
:raises DecryptError: if there is a problem with decrypting the password
:return: tuple with success, error data
"""
LOG.info("enable plesk for VM IP %s", vm_ip)
op_name = self.OP_ENABLE_PLESK
password_arg = list2cmdline([self.decrypt(plesk_pass)])
ops_map = self.PLESK_OPS_RESOURCE_ATTRIBUTE_MAP[ResourceType(vm_resource)][op_name]
init_conf_cmd = self.get_path_plesk('init_conf.exe')
check_configured_cmd = '"{}" --check-configured'.format(init_conf_cmd)
exit_code, outs, errs = run_uapi_command(check_configured_cmd, "Check Plesk configured", op_name,
use_shell=True)
res = self._result_handler(exit_code, outs, errs, op_name, intermediate_result)
if isinstance(res, Retry):
return res
if not res[0]:
# Plesk is not configured
init_conf_setup_flag = '--init'
else:
# Plesk is configured
init_conf_setup_flag = '--update'
enable_cmds = [
('set minimum password strength',
'"{server_pref}" -u -min_password_strength medium'.format(
server_pref=self.get_path_plesk('server_pref.exe'))),
('setup Plesk', ops_map[self.SETUP_CMD].format(
init_conf_cmd='"{}"'.format(init_conf_cmd),
init_conf_setup_flag=init_conf_setup_flag,
plesk_user=plesk_user, password=password_arg)),
('set Poweruser', '"{poweruser_cmd}" --on'.format(
poweruser_cmd=self.get_path_plesk('poweruser.exe'))),
('reconfigurator', '"{reconfigurator}" /check=Services /no-gui'.format(
reconfigurator=self.get_path_plesk('reconfigurator.exe', ['admin', 'bin']))),
('Repair sslcerts', '"{}" repair web -sslcerts'.format(self.get_path_plesk('plesk.exe')))
]
exit_code, outs, errs = run_multiple_uapi_commands(enable_cmds, op_name, omit_string=password_arg)
return self._result_handler(exit_code, outs, errs, op_name, intermediate_result)
def site_list_plesk(self, *args: Any, intermediate_result: Dict[str, Any] = None) -> Any:
""" Retrieve a list of Plesk sites
:param intermediate_result: Dict containing metadata for retries
:return: list of sites or tuple with error data
"""
LOG.info("get site list")
op_name = 'site_list_plesk'
plesk_dir_arg = [str(self.get_plesk_dir())]
exit_code, outs, errs = run_powershell_file(SCRIPT_BASEPATH / 'powershell' / 'plesk_site_list.ps1',
op_name, False, plesk_dir_arg)
if exit_code != 0:
return self._result_handler(exit_code, outs, errs, op_name, intermediate_result)
logging.info(outs)
return json.loads(outs) if outs else ''
def server_prep_plesk(self, *args: Any, intermediate_result: Dict[str, Any] = None) -> NydusResult:
""" Install Plesk on a server
:param intermediate_result: Dict containing metadata for retries
:return: tuple with success, error data
"""
LOG.info("install plesk on '%s'", str(self))
op_name = 'server_prep_plesk'
exit_code, outs, errs = run_powershell_file(
SCRIPT_BASEPATH / 'powershell' / 'plesk_server_prep.ps1', 'Prep Plesk')
return self._result_handler(exit_code, outs, errs, op_name, intermediate_result)
def get_client_plesk(self, *args: Any, intermediate_result: Dict[str, Any] = None) -> Any:
""" Get Plesk SSO URL
:param intermediate_result: Dict containing metadata for retries
:return: SSO URL string or tuple with error data
"""
op_name = 'get_client_plesk'
exit_code, outs, errs = runCommand(["plesk", "bin", "admin", "--get-login-link"], 'get sso link')
if exit_code != 0:
return self._result_handler(exit_code, outs, errs, op_name, intermediate_result)
links = outs.split('\n')
sso_url = str(links[0])
return self.encrypt(sso_url).decode('utf-8')
def change_hostname_plesk(self, hostname: str, *args: Any,
intermediate_result: Dict[str, Any] = None) -> NydusResult:
""" Change hostname on Plesk
:param hostname: New name to be assigned to the host
:param intermediate_result: Dict containing metadata for retries
:return: tuple with success, error data
"""
op_name = 'change_hostname_plesk'
rcmd = '"{plesk_path}" --update -hostname {hostname}'.format(plesk_path=self.get_path_plesk('server_pref.exe'),
hostname=hostname)
exit_code, outs, errs = run_uapi_command(rcmd, 'set Plesk hostname', op_name, use_shell=True)
return self._result_handler(exit_code, outs, errs, op_name, intermediate_result)
def change_admin_password_plesk(self, plesk_admin_pass: str, *args: Any,
intermediate_result: Dict[str, Any] = None) -> NydusResult:
""" Change admin password on Plesk
:param plesk_admin_pass: Encrypted password for Plesk admin user
:param intermediate_result: Dict containing metadata for retries
:return: tuple with success, error data
"""
op_name = 'change_admin_password_plesk'
password = self.decrypt(plesk_admin_pass)
# Perform operations on Control Panel
command_kwargs = {'omit_string': password}
plesk_bin = self.get_path_plesk(None, ['bin', 'admin'])
command = [plesk_bin, '--set-admin-password', '-passwd', password]
exit_code, outs, errs = run_uapi_command(
command, 'set Plesk admin password', op_name, **command_kwargs)
return self._result_handler(exit_code, outs, errs, op_name, intermediate_result)
def _check_for_dll_conflict(self, exit_code: int, result: Dict[str, Any],
intermediate_result: Dict[str, Any] = None) -> NydusResult:
"""
Check to see if the output of the previous command indicates that a DLL version conflict exists and, if so
attempt remediation.
:param exit_code: The exit code from the previous command
:param result: The result dictionary from the previous command
:param intermediate_result: Dict containing metadata for retries
:return: A 2-tuple if no remediation is necessary, otherwise a 3-tuple where the last element is a
retry interval in seconds
"""
if exit_code == 0:
if intermediate_result is not None and intermediate_result.get('delete_log_file', False):
try:
os.unlink(PLESK_FIX_DLL_CONFLICT_SCRIPT_LOG_FILE)
except Exception: # pylint: disable=broad-except
pass # We don't care if we're trying to delete a file that doesn't exist
return True, result
if has_dll_version_conflict(result.get('errs', '')):
LOG.error("DLL conflict detected; attempting to remediate")
code, _, _ = start_powershell_file(SCRIPT_BASEPATH / 'powershell' / 'plesk_fix_dll_conflict.ps1')
if code == 0:
if intermediate_result is None:
intermediate_result = {}
intermediate_result['delete_log_file'] = True
return Retry(intermediate_result) if code == 0 else (False, result)
return False, result
def _result_handler(self, exit_code: int, outs: str, errs: str, op_name: str,
intermediate_result: Dict[str, Any] = None) -> NydusResult:
"""Take the result from a run command and check for retryable errors.
If found, return a Retry. If not, return a Nydus result tuple.
:param exit_code: The exit code from the executed command
:param outs: The stdout output from the executed command
:param errs: The stderr output from the executed command
:param op_name: The name of the op
:param intermediate_result: Dict containing metadata for retries
:return: A Nydus result tuple, or Retry
"""
success, result = self.build_result_from_cmd_output(exit_code, outs, errs, op_name, op_name + " succeeded")
if not success:
dll_res = self._check_for_dll_conflict(exit_code, result, intermediate_result=intermediate_result)
if isinstance(dll_res, Retry):
return dll_res
return super()._result_handler(exit_code, outs, errs, op_name, intermediate_result)
return success, result
class Windows2016Plesk(Windows2016, WindowsPlesk):
pass
class Windows2019Plesk(Windows2019, WindowsPlesk):
pass
class Windows2022Plesk(Windows2022, WindowsPlesk):
pass
class Windows2025Plesk(Windows2025, WindowsPlesk):
pass