File: //var/opt/nydus/ops/customer_local_ops/control_panel/linux_ispconfig.py
# -*- coding: utf-8 -*-
import logging
import re
from typing import Dict, Any
from mysql.connector import connection
from customer_local_ops import OpType
from customer_local_ops.operating_system.linux import Linux, CentOS, CentOS6, CentOS7, Debian, Debian8, Ubuntu1604
from customer_local_ops.control_panel.ispconfig import OSISPConfig
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
class LinuxISPConfig(Linux, OSISPConfig):
    """
    ISPConfig Customer Local Ops for the Linux OS.

    All function names should contain 'ispconfig' so as not to override the OS ops
    """

    op_type = OpType.CONTROL_PANEL_OPERATING_SYSTEM
    os_op_instance = None

    def change_ispconfig_hostname(self, payload: Dict[str, Any]) -> None:
        """Changes the server hostname for ispconfig

        :param payload: A dict containing input data; the 'hostname' key is read
        :raises KeyError: If payload has no 'hostname' key
        """
        new_hostname = payload['hostname']
        self._change_hostname_in_db(new_hostname)

    def _change_hostname_in_db(self, new_hostname: str) -> None:
        """Changes the server hostname for ispconfig in the database

        Reads the config column of the row with server_id = 1 from the server table,
        rewrites the hostname in it and stores the result back.

        :param new_hostname: The new hostname string
        :raises LookupError: If the server table has no row with server_id = 1
        :raises ValueError: If the stored configuration has no hostname entry
        """
        cnx = self._get_database_connection()
        # Nested try/finally: the connection is closed even if creating or closing
        # the cursor fails.
        try:
            cursor = cnx.cursor()
            try:
                cursor.execute('select config from server where server_id = 1')
                row = cursor.fetchone()
                if row is None:
                    # fetchone() yields None when the query returned no rows
                    raise LookupError('No row with server_id = 1 found in the ispconfig server table')
                config = self._replace_hostname_in_config(row[0], new_hostname)
                cursor.execute('UPDATE server SET config = %s WHERE server_id = 1', (config, ))
                cnx.commit()
            finally:
                cursor.close()
        finally:
            cnx.close()

    def _get_database_connection(self) -> connection.MySQLConnection:
        """Returns an ispconfig MySQL database connection

        Connects to the 'dbispconfig' database on 127.0.0.1 with the credentials
        returned by _get_sql_username() and _get_sql_password(). The caller is
        responsible for closing the returned connection.
        """
        sql_username = self._get_sql_username()
        sql_password = self._get_sql_password()
        cnx = connection.MySQLConnection(user=sql_username, password=sql_password, host='127.0.0.1',
                                         database='dbispconfig')
        return cnx

    def _replace_hostname_in_config(self, config: str, new_hostname: str) -> str:
        """Replaces hostname in ispconfig configuration and returns the new configuration string

        The current hostname is taken from the first 'hostname=<value>' entry; every
        occurrence of that value in the configuration is then replaced.

        :param config: Configuration string in which to replace the hostname
        :param new_hostname: The new hostname string
        :raises ValueError: If the configuration contains no 'hostname=' entry
        """
        pattern = r'hostname=(?P<hostname>[\w\-.]*)'
        match = re.search(pattern, config)
        if match is None:
            raise ValueError("No 'hostname=' entry found in ispconfig configuration")
        old_hostname = match.group('hostname')
        if not old_hostname:
            # The entry exists but is empty. str.replace('', x) would insert x between
            # every character of the config, so fill in only the matched entry.
            insert_at = match.end('hostname')
            return config[:insert_at] + new_hostname + config[insert_at:]
        return config.replace(old_hostname, new_hostname)

    def change_ispconfig_password(self, payload: Dict[str, Any], op_name: str) -> None:
        """Changes the user password for ispconfig

        :param payload: A dict containing input data; the 'encrypted_password' key is read
        :param op_name: The name of the op (not used by this method)
        """
        new_password = self._get_new_password(payload)
        self._change_password_in_db(new_password)

    def _change_password_in_db(self, new_password: str) -> None:
        """Changes the server password for ispconfig in the database

        Updates the passwort column of the sys_user row whose username is 'admin'.
        The MD5 hash is computed by MySQL's md5() function.

        :param new_password: The new password string
        """
        cnx = self._get_database_connection()
        # Nested try/finally: the connection is closed even if creating or closing
        # the cursor fails.
        try:
            cursor = cnx.cursor()
            try:
                cursor.execute('UPDATE sys_user SET passwort = md5(%s) WHERE username = %s', (new_password, 'admin'))
                cnx.commit()
            finally:
                cursor.close()
        finally:
            cnx.close()

    def _get_new_password(self, payload: Dict[str, Any]) -> str:
        """Decrypts the new password from an encrypted string

        :param payload: A dict containing input data; the 'encrypted_password' key is read
        :raises KeyError: If payload has no 'encrypted_password' key
        """
        encrypted_password = payload['encrypted_password']
        new_password = self.decrypt(encrypted_password)
        return new_password

    def _get_sql_password(self) -> str:
        """Returns the MySQL database password from configuration"""
        return self._get_config_value('db_password')

    def _get_sql_username(self) -> str:
        """Returns the MySQL database user from configuration"""
        return self._get_config_value('db_user')

    def _get_config_value(self, key: str) -> str:
        """Retrieves a value from configuration from its key

        Looks for an assignment of the form $conf['<key>'] = '<value>' (single or
        double quotes around the value) in the ISPConfig interface config file.

        :param key: The configuration key for the value
        :raises LookupError: If no quoted assignment for the key is found
        """
        with open('/usr/local/ispconfig/interface/lib/config.inc.php', encoding='utf-8') as f:
            isp_config = f.read()
        # The closing quote must be the same character as the opening one, so a value
        # such as 'ab"cd' is returned whole instead of being cut at the first quote
        # character of either kind. Escaped quotes inside the value are not handled.
        pattern = re.escape(r"$conf['{0}']".format(key)) + \
            r'\s*=\s*(?P<quote>[\'"])(?P<value>.*?)(?P=quote)'
        # DOTALL keeps values that span lines matchable, as they were before.
        match = re.search(pattern, isp_config, flags=re.DOTALL)
        if match is None:
            raise LookupError("No value for $conf['{0}'] found in ispconfig configuration".format(key))
        return match.group('value')
class CentOSISPConfig(CentOS, LinuxISPConfig):
    """ISPConfig ops for CentOS; instantiation is rejected."""

    def __init__(self, *args, **kwargs):  # pylint: disable=super-init-not-called
        """Always raises NotImplementedError; no parent initializer is called."""
        raise NotImplementedError()
class CentOS6ISPConfig(CentOS6, CentOSISPConfig):  # pylint: disable=too-many-ancestors
    """ISPConfig ops for CentOS 6; combines CentOS6 with CentOSISPConfig and adds nothing of its own."""
class CentOS7ISPConfig(CentOS7, CentOSISPConfig):  # pylint: disable=too-many-ancestors
    """ISPConfig ops for CentOS 7; combines CentOS7 with CentOSISPConfig and adds nothing of its own."""
class DebianISPConfig(Debian, LinuxISPConfig):
    """ISPConfig ops for Debian; combines Debian with LinuxISPConfig and adds nothing of its own."""
class Debian8ISPConfig(Debian8, DebianISPConfig):  # pylint: disable=too-many-ancestors
    """ISPConfig ops for Debian 8; instantiation is rejected."""

    def __init__(self, *args, **kwargs):  # pylint: disable=super-init-not-called
        """Always raises NotImplementedError; no parent initializer is called."""
        raise NotImplementedError()
class Ubuntu1604ISPConfig(Ubuntu1604, DebianISPConfig):  # pylint: disable=too-many-ancestors
    """ISPConfig ops for Ubuntu 16.04; combines Ubuntu1604 with DebianISPConfig and adds nothing of its own."""