# File: //var/opt/nydus/ops/customer_local_ops/control_panel/linux_plesk.py
# -*- coding: utf-8 -*-
from typing import Dict, Any
import json
import logging
import os
import re
import shlex
import sys
from enum import IntEnum
from fileinput import FileInput
from customer_local_ops import OpType, ResourceType, NydusResult
from customer_local_ops.operating_system.linux import (AlmaLinux8, AlmaLinux9, Linux, CentOS, CentOS6, CentOS7, Debian,
Debian8, Debian10, Debian11, Debian12, Ubuntu1604,
Ubuntu2004, Ubuntu2204, Ubuntu2404)
from customer_local_ops.control_panel.plesk import OSPlesk
from customer_local_ops.util.execute import runCommand, run_uapi_command, run_multiple_uapi_commands
from customer_local_ops.util.retry import Retry
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
# Directories in which a Plesk installation is looked for (see PLESK_INSTALL_LOCATIONS on the classes below).
PLESK_DIR_1 = "/opt/psa"
PLESK_DIR_2 = "/usr/local/psa"
class LinuxPlesk(Linux, OSPlesk):
    """
    Plesk Customer Local Ops for the Linux OS.

    All function names should contain 'plesk' so as not to override the OS ops
    """
    op_type = OpType.CONTROL_PANEL_OPERATING_SYSTEM

    # Candidate installation directories, probed in order by get_plesk_dir().
    PLESK_INSTALL_LOCATIONS = [
        PLESK_DIR_1,
        PLESK_DIR_2,
    ]

    # Cache for the directory found by get_plesk_dir(); None until the first successful lookup.
    plesk_dir = None

    # Error message fragments; presumably consumed by the result handling in a base class
    # to decide whether an op should be retried -- TODO confirm against OSPlesk.
    RETRYABLE_ERRS = ['The Plesk administrator password cannot be changed until the server cloning is finished',
                      'No connection could be made because the target machine actively refused it',
                      'Could not resolve host']

    def get_plesk_dir(self) -> str:
        """Find the installation directory for plesk

        The first existing directory from PLESK_INSTALL_LOCATIONS is cached on the instance
        and returned by subsequent calls without touching the filesystem again.

        :raises RuntimeError: if none of the candidate directories exists
        :return: full path to plesk installation
        """
        # Look for plesk in any of several locations. On Ubuntu, it's currently installed in
        # /opt/psa and on CentOS, it can be found in /usr/local/psa. The list below could
        # easily be made into a configurable item so that new potential locations could be
        # added without having to change the code.
        if self.plesk_dir is not None:
            return self.plesk_dir
        for loc in self.PLESK_INSTALL_LOCATIONS:
            if os.path.isdir(loc):
                LOG.info(loc)
                self.plesk_dir = loc
                return self.plesk_dir
        raise RuntimeError("plesk path could not be found")

    def license_plesk(self, activation_key: str, *args: Any,
                      intermediate_result: Dict[str, Any] = None) -> NydusResult:
        """Run license utility on local vm with activation key

        :param activation_key: Key value passed back from license plesk op on hfs executor
        :param intermediate_result: Dict containing metadata for retries
        :return: tuple with success, error data
        """
        LOG.info("install plesk license on '%s'", str(self))
        op_name = 'license_plesk'
        command = 'license'
        full_command = self.get_path_plesk(command)
        LOG.info("full command: %s", full_command)
        # Passed as an argument list (no shell), so the activation key needs no quoting.
        exit_code, outs, errs = run_uapi_command([full_command, '--install', activation_key],
                                                 'set Plesk License', op_name)
        return self._result_handler(exit_code, outs, errs, op_name, intermediate_result=intermediate_result)

    # pylint: disable=too-many-locals
    def enable_plesk(self, vm_ip: str, vm_resource: str, plesk_user: str, plesk_pass: str,
                     *args: Any, intermediate_result: Dict[str, Any] = None) -> NydusResult:
        """Enable plesk on local vm

        :param vm_ip: External IP address of the VM
        :param vm_resource: The resource name for the third-party hosting provider
        :param plesk_user: User name to be used on Plesk instance
        :param plesk_pass: Password to be used on Plesk instance
        :param intermediate_result: Dict containing metadata for retries
        :raises DecryptError: if there is a problem with decrypting the password
        :return: tuple with success, error data
        """
        LOG.info("enable plesk")
        op_name = self.OP_ENABLE_PLESK
        # The commands below are run through a shell, so the decrypted password is shell-quoted.
        # The same quoted string is handed to the runner as omit_string.
        password_arg = shlex.quote(self.decrypt(plesk_pass))
        ops_map = self.PLESK_OPS_RESOURCE_ATTRIBUTE_MAP[ResourceType(vm_resource)][op_name]
        init_conf_cmd = self.get_path_plesk('init_conf')
        check_configured_cmd = '{} --check-configured'.format(init_conf_cmd)
        exit_code, outs, errs = run_uapi_command(check_configured_cmd, "Check Plesk configured", op_name,
                                                 use_shell=True)
        if exit_code != 0:
            # Plesk is not configured
            init_conf_setup_flag = '--init'
        else:
            # Plesk is configured
            init_conf_setup_flag = '--update'
        prep_cmds = [
            ('set minimum password strength',
             '{server_pref} -u -min_password_strength medium'.format(
                 server_pref=self.get_path_plesk('server_pref'))),
            ('setup Plesk', ops_map[self.SETUP_CMD].format(
                init_conf_cmd=init_conf_cmd,
                init_conf_setup_flag=init_conf_setup_flag,
                plesk_user=plesk_user, password=password_arg)),
            ('check config', check_configured_cmd),
            ('set Poweruser', '{set_poweruser_cmd} --on'.format(set_poweruser_cmd=self.get_path_plesk('poweruser'))),
            ('set auto updates', 'plesk db "INSERT INTO misc(param, val) ' +
             'VALUES(\'automaticSystemPackageUpdates\', \'true\') ON DUPLICATE KEY UPDATE val = \'true\';"')
        ]
        if ops_map[self.RUN_HIDE_INTERNAL_IP]:
            # Rewrites the blacklist entry in panel.ini to the external IP, then has Plesk reread its IPs.
            prep_cmds += [
                ('hide internal ip', '''sed -i'' 's/blacklist=".*"/blacklist="{vm_ip}"/' '''
                                     '/usr/local/psa/admin/conf/panel.ini'.format(vm_ip=vm_ip)),
                ('reread ips', 'plesk bin ipmanage --reread')
            ]
        if ops_map[self.RUN_DISABLE_SESSION_IP_CHECK]:
            prep_cmds += [
                ('disable session ip check',
                 'plesk db "'
                 'INSERT INTO misc(param,val) '
                 'VALUES(\'disable_check_session_ip\', \'true\') '
                 'ON DUPLICATE KEY UPDATE val = \'true\';"')
            ]
        exit_code, outs, errs = run_multiple_uapi_commands(prep_cmds, op_name, use_shell=True, omit_string=password_arg)
        return self._result_handler(exit_code, outs, errs, op_name, intermediate_result=intermediate_result)

    def site_list_plesk(self, *args: Any, intermediate_result: Dict[str, Any] = None) -> Any:
        """Retrieve a list of Plesk sites

        On success the result is a dict with two keys: 'subscriptions' (list of names) and
        'sites' (list of dicts holding 'name' plus whichever of 'ftp_login', 'ip_address',
        'diskused' and 'webspace' were present in the site info output).

        :param intermediate_result: Dict containing metadata for retries
        :return: list of sites or tuple with error data
        """
        LOG.info("get site list")
        site_list = {}
        exit_code, outs, errs = runCommand(self.get_path_plesk('subscription') + " --list",
                                           "site_list_plesk: get_subscriptions", useShell=True)
        if exit_code != 0:
            return self._result_handler(exit_code, outs, errs, 'site_list_plesk: get_subscriptions',
                                        intermediate_result)
        # Blank lines (e.g. from a trailing newline or empty output) are not subscriptions.
        site_list['subscriptions'] = [entry.strip() for entry in outs.split('\n') if entry.strip()]

        site_cmd = self.get_path_plesk('site')
        exit_code, outs, errs = runCommand(site_cmd + " --list", "site_list_plesk: get_sites", useShell=True)
        if exit_code != 0:
            return self._result_handler(exit_code, outs, errs, 'site_list_plesk: get_sites', intermediate_result)
        # Same filtering here; otherwise an empty entry would trigger "site --info" with no site name.
        site_list['sites'] = [entry.strip() for entry in outs.split('\n') if entry.strip()]

        # Maps labels in the "site --info" output to the keys used in the returned site dicts.
        required_fields = {'FTP Login': 'ftp_login',
                           'IP address': 'ip_address',
                           'Disk space used by httpdocs': 'diskused',
                           'Hosting type': 'webspace'}
        for i, site in enumerate(site_list['sites']):
            site_data = {'name': site}
            # The command goes through a shell, so the site name is quoted before interpolation.
            exit_code, outs, errs = runCommand(site_cmd + " --info " + shlex.quote(site),
                                               "site_list_plesk: get_site_info", useShell=True)
            if exit_code != 0:
                return self._result_handler(exit_code, outs, errs, 'site_list_plesk: get_site_info',
                                            intermediate_result)
            for line in outs.split('\n'):
                # Split on the first colon only so that values which themselves contain
                # colons (e.g. IPv6 addresses) are kept instead of being dropped.
                data = [x.strip() for x in line.split(':', 1)]
                LOG.info("data: %s", data)
                if len(data) == 2:
                    key = data[0]
                    if key in required_fields:
                        site_data[required_fields[key]] = data[1]
            site_list['sites'][i] = site_data
        LOG.info(json.dumps(site_list))
        return site_list

    def server_prep_plesk(self, *args: Any, intermediate_result: Dict[str, Any] = None) -> NydusResult:
        """ Install Plesk on a server

        Runs the preparation commands in order and stops at the first one that fails.

        :param intermediate_result: Dict containing metadata for retries
        :return: tuple with success, error data
        """
        LOG.info("install plesk on '%s'", str(self))
        op_name = 'server_prep_plesk'
        # download plesk-installer. keep: provides compatibility for updates, troubleshooting, etc.
        prep_cmds = [
            ('Download Plesk', 'wget https://autoinstall.plesk.com/plesk-installer'),
            ('Modify Perms', 'chmod +x plesk-installer'),
            ('Install Plesk', 'sh plesk-installer --select-product-id=plesk --installation-type Typical ' +
             '--select-release-latest --notify-email admin@example.com'),
            ('Delete Docker Interface', 'ip link del docker0')
        ]
        for purpose, cmd in prep_cmds:
            exit_code, outs, errs = run_uapi_command(cmd, purpose, op_name, use_shell=True)
            if exit_code != 0:
                return self._result_handler(exit_code, outs, errs, op_name, intermediate_result=intermediate_result)
        # All commands succeeded: report the outcome of the last one.
        return self._result_handler(exit_code, outs, errs, op_name, intermediate_result=intermediate_result)

    def get_client_plesk(self, *args: Any, intermediate_result: Dict[str, Any] = None) -> Any:
        """ Get Plesk SSO URL

        The first line of the command output is taken as the login link and is returned encrypted.

        :param intermediate_result: Dict containing metadata for retries
        :return: SSO URL string or tuple with error data
        """
        exit_code, outs, errs = runCommand(self.get_path_plesk('admin') + " --get-login-link",
                                           "get sso link", useShell=True)
        if exit_code != 0:
            return self._result_handler(exit_code, outs, errs, 'get_client_plesk',
                                        intermediate_result=intermediate_result)
        links = outs.split('\n')
        sso_url = str(links[0])
        return self.encrypt(sso_url).decode('utf-8')

    def change_hostname_plesk(self, hostname: str, *args: Any,
                              intermediate_result: Dict[str, Any] = None) -> NydusResult:
        """ Change hostname on Plesk

        :param hostname: The new server hostname
        :param intermediate_result: Dict containing metadata for retries
        :return: tuple with success, error data
        """
        op_name = 'change_hostname_plesk'
        # The command is executed by a shell; quote the hostname so that shell metacharacters
        # in it cannot alter the command. Ordinary hostnames are left unchanged by shlex.quote.
        rcmd = self.get_path_plesk('server_pref') + ' --update' + ' -hostname ' + shlex.quote(hostname)
        exit_code, outs, errs = run_uapi_command(rcmd, 'set Plesk hostname', op_name, use_shell=True)
        return self._result_handler(exit_code, outs, errs, op_name, intermediate_result=intermediate_result)

    def change_admin_password_plesk(self, plesk_admin_pass: str, *args: Any,
                                    intermediate_result: Dict[str, Any] = None) -> NydusResult:
        """ Change admin password on Plesk

        :param plesk_admin_pass: Encrypted password for Plesk admin user
        :param intermediate_result: Dict containing metadata for retries
        :return: tuple with success, error data
        """
        op_name = 'change_admin_password_plesk'
        password = self.decrypt(plesk_admin_pass)
        # Perform operations on Control Panel
        plesk_bin = self.get_path_plesk('init_conf')
        # Argument list (no shell), so the password is passed verbatim; it is also given
        # to the runner as omit_string.
        command = [plesk_bin, '--set-admin-password', '-passwd', password]
        exit_code, outs, errs = run_uapi_command(
            command, 'set Plesk admin password', op_name, omit_string=password)
        return self._result_handler(exit_code, outs, errs, op_name, intermediate_result=intermediate_result)

    def configure_postfix(self, relay_address: str, op_name: str,
                          intermediate_result: Dict[str, Any] = None) -> NydusResult:
        """Configure the postfix MTA.

        Rewrites /etc/postfix/main.cf in place so that it contains a relayhost entry for
        relay_address, then restarts postfix. Nothing is done (and None is returned) when
        relay_address is None.

        :param relay_address: IP address or host name of the mail relay to set
        :param op_name: name of the operation being executed
        :param intermediate_result: Dict containing metadata for retries
        :return: result of restarting postfix, or an error result if editing the file failed
        """
        if relay_address is None:
            return None

        class SearchState(IntEnum):
            SEARCHING = 0   # no commented-out relayhost line seen yet
            SKIPPING = 1    # inside the run of commented-out relayhost lines
            DONE = 2        # the new relayhost line has been written

        # In the configuration file distributed with postfix, there are either a number of commented-out
        # relayhost lines or a blank relay host line. Add our new line immediately after the last
        # commented-out relayhost line or replace the active relayhost line.
        try:
            relayhost_cmt = re.compile(r'^[ \t]*#[ \t]*relayhost[ \t]*=')
            relayhost_blnk = re.compile(r'^[ \t]*relayhost[ \t]*=')
            relayhost_line = "relayhost = [{}]\n".format(relay_address)
            state = SearchState.SEARCHING
            # With inplace=True, FileInput redirects sys.stdout into the file being rewritten,
            # so every sys.stdout.write() below emits a line of the new main.cf.
            with FileInput('/etc/postfix/main.cf', inplace=True) as stream:
                for line in stream:
                    if relayhost_blnk.match(line):
                        # An active relayhost setting: replace it with ours.
                        sys.stdout.write(relayhost_line)
                        state = SearchState.DONE
                        continue
                    if relayhost_cmt.match(line):
                        if state == SearchState.SEARCHING:
                            state = SearchState.SKIPPING
                    elif state == SearchState.SKIPPING:
                        # We've found the first line after the other commented-out relayhost lines
                        # Add the new relayhost line here and set the state to indicate that we've
                        # finished.
                        sys.stdout.write(relayhost_line)
                        state = SearchState.DONE
                    sys.stdout.write(line)
            # If we get here and the state isn't DONE, then just add the line to the end of the file
            if state != SearchState.DONE:
                with open('/etc/postfix/main.cf', 'a', encoding='utf-8') as f:
                    f.write(relayhost_line)
        except Exception as ex:  # pylint: disable=broad-except
            LOG.error("cp_os_op %s.configure_postfix result(fail): %s", op_name, ex)
            return self._result_handler(1, '', str(ex), op_name, intermediate_result=intermediate_result)
        LOG.info("restarting postfix")
        my_op_name = op_name + ': restart postfix'
        return self.run_command_and_handle_result(['systemctl', 'restart', 'postfix'], my_op_name,
                                                  intermediate_result=intermediate_result)

    def set_outgoing_email_ip(self, address: str, *args: Any,
                              intermediate_result: Dict[str, Any] = None) -> NydusResult:
        """Set Plesk's outgoing e-mail IP address.

        This only works with Postfix mail server. See General #6 for more information:
        https://docs.plesk.com/en-US/obsidian/administrator-guide/mail/configuring-serverwide-mail-settings.59430/

        :param address: IP address from which to send e-mail
        :param intermediate_result: Dict containing result data and meta data for retries
        :return: Nydus operation result
        """
        return self.run_command_and_handle_result(
            [self.get_path_plesk(filename='mailserver'),
             '--set-outgoing-email-mode', 'explicit-ip', '-explicit-ipv4', address],
            'set_outgoing_email_ip', intermediate_result=intermediate_result)
class CentOSPlesk(CentOS, LinuxPlesk):
    """Plesk Customer Local Ops for CentOS-based systems; Plesk is looked for in PLESK_DIR_2 only."""

    PLESK_INSTALL_LOCATIONS = [PLESK_DIR_2]

    def configure_mta(self, payload, unused=None, intermediate_result=None) -> NydusResult:
        """Configure mail transfer agent on a CentOS-based Plesk server

        :param payload: Dict containing op params
        :param unused: not read by this method
        :param intermediate_result: Dict containing result data and meta data for retries
        :return: tuple with success, retry or error data
        """
        LOG.info("%s.configure_mta", self)
        qualified_name = 'CentOSPlesk.configure_mta'
        return self.do_configure_mta(payload, qualified_name, intermediate_result=intermediate_result)

    def do_configure_mta(self, payload: Dict[str, Any], op_name: str,
                         intermediate_result: Dict[str, Any] = None) -> NydusResult:
        """Install postfix, then point it at the relay named in the payload

        :param payload: Dict containing op params; 'relay_address' is read from it
        :param op_name: The name of the op, including the classname
        :param intermediate_result: Dict containing result data and meta data for retries
        :return: tuple with success, retry or error data
        """
        LOG.info("%s.do_configure_mta", self)
        install_outcome = self.install_postfix(op_name, intermediate_result=intermediate_result)
        # A Retry, or a result whose first element is falsy, is handed straight back to the caller.
        installed = not isinstance(install_outcome, Retry) and install_outcome[0]
        if not installed:
            return install_outcome
        relay = payload.get('relay_address')
        LOG.info("%s %s start relay_address: %s", self.get_op_type().value, op_name, relay)
        return self.configure_postfix(relay, op_name, intermediate_result=intermediate_result)

    def install_postfix(self, op_name: str, intermediate_result: Dict[str, Any] = None) -> NydusResult:
        """Replace sendmail with postfix using yum

        The yum steps run in order; the first failing step ends the sequence and its
        outcome is what gets reported.

        :param op_name: The name of the op, including the classname
        :param intermediate_result: Dict containing result data and meta data for retries
        :return: tuple with success, retry or error data
        """
        LOG.info("%s.install_postfix", self)
        yum_steps = (
            (': yum_clean_all', ['yum', 'clean', 'all']),
            (': remove_sendmail', ['yum', '-y', 'remove', 'sendmail']),
            (': install postfix', ['yum', '-y', 'install', 'postfix']),
        )
        for suffix, yum_cmd in yum_steps:
            my_op_name = op_name + suffix
            exit_code, outs, errs = self._run_yum_command(yum_cmd, my_op_name)
            if exit_code != 0:
                break
        # Either the failing step or, if none failed, the final install step.
        return self._result_handler(exit_code, outs, errs, my_op_name, intermediate_result=intermediate_result)
class CentOS6Plesk(CentOS6, CentOSPlesk):  # pylint: disable=too-many-ancestors
    """Plesk ops placeholder for CentOS 6; constructing an instance always raises NotImplementedError."""

    def __init__(self, *args, **kwargs):  # pylint: disable=super-init-not-called
        raise NotImplementedError()
class CentOS7Plesk(CentOS7, CentOSPlesk):  # pylint: disable=too-many-ancestors
    """Plesk ops for CentOS 7; everything is inherited from CentOS7 and CentOSPlesk."""
class AlmaLinux8Plesk(AlmaLinux8, CentOSPlesk):  # pylint: disable=too-many-ancestors
    """Plesk ops for AlmaLinux 8; everything is inherited from AlmaLinux8 and CentOSPlesk."""
class AlmaLinux9Plesk(AlmaLinux9, CentOSPlesk):  # pylint: disable=too-many-ancestors
    """Plesk ops for AlmaLinux 9; everything is inherited from AlmaLinux9 and CentOSPlesk."""
class DebianPlesk(Debian, LinuxPlesk):
    """Plesk Customer Local Ops for Debian-based systems."""

    def configure_mta(self, payload, unused=None, intermediate_result=None) -> NydusResult:
        """Configure mail transfer agent on a Debian-based Plesk server

        :param payload: Dict containing op params
        :param unused: not read by this method
        :param intermediate_result: Dict containing result data and meta data for retries
        :return: tuple with success, retry or error data
        """
        LOG.info("DebianPlesk.configure_mta")
        qualified_name = 'DebianPlesk.configure_mta'
        return self.do_configure_mta(payload, qualified_name, intermediate_result=intermediate_result)

    def do_configure_mta(self, payload: Dict[str, Any], op_name: str,
                         intermediate_result: Dict[str, Any] = None) -> NydusResult:
        """Install postfix, then point it at the relay named in the payload

        :param payload: Dict containing op params; 'relay_address' is read from it
        :param op_name: The name of the op, including the classname
        :param intermediate_result: Dict containing result data and meta data for retries
        :return: tuple with success, retry or error data
        """
        LOG.info("DebianPlesk.do_configure_mta")
        install_outcome = self.install_postfix(op_name, intermediate_result=intermediate_result)
        # A Retry, or a result whose first element is falsy, is handed straight back to the caller.
        installed = not isinstance(install_outcome, Retry) and install_outcome[0]
        if not installed:
            return install_outcome
        relay = payload.get('relay_address')
        LOG.info("%s %s start relay_address: %s", self.get_op_type().value, op_name, relay)
        return self.configure_postfix(relay, op_name, intermediate_result=intermediate_result)

    def install_postfix(self, op_name: str, intermediate_result: Dict[str, Any] = None) -> NydusResult:
        """Install postfix on a Debian-based Plesk server

        sendmail is removed with apt-get first; postfix is then installed through
        self._install, whose Retry result (if any) is returned untouched.

        :param op_name: The name of the op, including the classname
        :param intermediate_result: Dict containing result data and meta data for retries
        :return: tuple with success, retry or error data
        """
        LOG.info("DebianPlesk.install_postfix")
        removal_op_name = op_name + ': remove_sendmail'
        exit_code, outs, errs = runCommand(['apt-get', 'remove', '-y', 'sendmail'], removal_op_name)
        if exit_code != 0:
            return self._result_handler(exit_code, outs, errs, removal_op_name,
                                        intermediate_result=intermediate_result)

        install_outcome = self._install('postfix')
        if not isinstance(install_outcome, Retry):
            exit_code, outs, errs = install_outcome
            return self._result_handler(exit_code, outs, errs, op_name + ': install postfix',
                                        intermediate_result=intermediate_result)
        return install_outcome
class Debian8Plesk(Debian8, DebianPlesk):  # pylint: disable=too-many-ancestors
    """Plesk ops placeholder for Debian 8; constructing an instance always raises NotImplementedError."""

    def __init__(self, *args, **kwargs):  # pylint: disable=super-init-not-called
        raise NotImplementedError()
class Debian10Plesk(Debian10, DebianPlesk):  # pylint: disable=too-many-ancestors
    """Plesk ops placeholder for Debian 10; constructing an instance always raises NotImplementedError."""

    def __init__(self, *args, **kwargs):  # pylint: disable=super-init-not-called
        raise NotImplementedError()
class Debian11Plesk(Debian11, DebianPlesk):  # pylint: disable=too-many-ancestors
    """Plesk ops placeholder for Debian 11; constructing an instance always raises NotImplementedError."""

    def __init__(self, *args, **kwargs):  # pylint: disable=super-init-not-called
        raise NotImplementedError()
class Debian12Plesk(Debian12, DebianPlesk):  # pylint: disable=too-many-ancestors
    """Plesk ops placeholder for Debian 12; constructing an instance always raises NotImplementedError."""

    def __init__(self, *args, **kwargs):  # pylint: disable=super-init-not-called
        raise NotImplementedError()
class Ubuntu1604Plesk(Ubuntu1604, DebianPlesk):  # pylint: disable=too-many-ancestors
    """Plesk ops for Ubuntu 16.04; Plesk is looked for in PLESK_DIR_1 only."""

    PLESK_INSTALL_LOCATIONS = [PLESK_DIR_1]
class Ubuntu2004Plesk(Ubuntu2004, DebianPlesk):  # pylint: disable=too-many-ancestors
    """Plesk ops for Ubuntu 20.04; Plesk is looked for in PLESK_DIR_1 only."""

    PLESK_INSTALL_LOCATIONS = [PLESK_DIR_1]
class Ubuntu2204Plesk(Ubuntu2204, DebianPlesk):  # pylint: disable=too-many-ancestors
    """Plesk ops for Ubuntu 22.04; Plesk is looked for in PLESK_DIR_1 only."""

    PLESK_INSTALL_LOCATIONS = [PLESK_DIR_1]
class Ubuntu2404Plesk(Ubuntu2404, DebianPlesk):  # pylint: disable=too-many-ancestors
    """Plesk ops for Ubuntu 24.04; Plesk is looked for in PLESK_DIR_1 only."""

    PLESK_INSTALL_LOCATIONS = [PLESK_DIR_1]