File: //var/opt/nydus/ops/customer_local_ops/operating_system/windows.py
from pathlib import Path
from typing import Any, Dict, List, Union, Tuple
import json
import logging
import os
import re
from customer_local_ops import NydusResult, Ops, OpType
from customer_local_ops.exceptions import DecryptError
from customer_local_ops.operating_system import POWERSHELL_PATH
from customer_local_ops.util import b64str, execute
from customer_local_ops.util.execute import runCommand, run_powershell
LOG = logging.getLogger(__name__)
def run_powershell_file(script: str, *args: Any, **kw: Any):
return execute.run_powershell_file(POWERSHELL_PATH / script, *args, **kw)
class Windows(Ops):
DISK_UTILIZATION_PATH = r"C:\\" # double backslash still required by Python syntax
op_type = OpType.OPERATING_SYSTEM
DISTRO = 'Windows'
QEMU_PACKAGE_NAME = 'virtio-win.iso'
DEVCON = 'devcon.exe'
VIRTIO = 'virtio-win-gt-x64.msi'
def _get_vm_tag(self):
"""Return this VM's tag."""
command_get_cn = r"""
$s = Get-childitem -Path Cert:\LocalMachine\My\ | Where-Object {$_.Issuer -match "Nydus Customer Services"} |
Select-Object -ExpandProperty Subject
$s = $s -replace "(CN=)(.*?),.*",'$2'
$s
"""
return run_powershell(command_get_cn, 'get_vm_tag', True)
def _run_powershell_op(self, script_file: Union[str, Path], op_name: str, **kw: Any) -> NydusResult:
"""Run a PowerShell script and return a Nydus result for the outcome.
:param script_file: PowerShell script name, no path (must be in operating_system/powershell/)
:param op_name: name of the Nydus operation
:param kw: additional keyword arguments to pass to run_powershell_file
:returns: standard CLO Nydus result with success based on script exit code (see build_result_from_cmd_output)
"""
tag = kw.pop('tag', op_name)
return self.build_result_from_cmd_output(
*run_powershell_file(script_file, tag, **kw),
op_name)
def add_user(self, payload: Dict[str, Any], unused: Any = None) -> NydusResult:
"""Create a user that can access this server over Remote Desktop.
:param payload: dictionary containing username and encrypted_password
:returns: result of the operation
"""
if payload.get('fail_if_exists', False):
raise NotImplementedError('fail_if_exists:True not implemented')
op_name = 'add_user'
username = payload['username']
encrypted_password = payload['encrypted_password']
try:
password = self.decrypt(encrypted_password)
except DecryptError as ex:
return False, self.build_result_dict(ex.outs, ex.errs, op_name)
# Having trouble decoding a UTF-8-encoded string with code points above 127 (like 両) on the other side.
# Base64-encoding password to limit character set in transit to [a-zA-Z0-9+/=].
password_b64 = b64str(password)
exit_code, outs, errs = run_powershell_file(
'add_user.ps1', op_name, script_file_args=[username, 'Remote Desktop Users'],
stdin=password_b64, quiet=True)
outs = outs.replace(password_b64, '') # Strip stdin echo from stdout
return exit_code == 0, self.build_result_dict(outs, errs, op_name)
def add_user_to_group(self, username: str, group: str) -> NydusResult:
"""Add a user to a group.
:param username: name of user to add to the group
:param group: user will be added to this group
:returns: result of the operation
"""
return self._run_powershell_op(
'add_user_to_group.ps1',
'add_user_to_group',
script_file_args=[username, group])
def _user_needs_logout(self, username) -> bool:
"""Check if the specified user needs to log out
:param username: name of user to check
:returns: True or False
"""
get_sessions_command = 'query user'
_, outs, _ = run_powershell(get_sessions_command, 'get_active_sessions')
if username and (username.lower() in outs):
return True
return False
def remove_user(self, username):
op_name = 'remove_user'
# check whether user is not performed logout
is_user_active = self._user_needs_logout(username)
if is_user_active:
return False, self.build_result_dict("User is still active.",
"User needs to perform logout before user can be removed.",
op_name)
command = 'net user %s /DELETE ; Remove-Item -Path "C:\\Users\\%s" -Recurse -Force' % (username, username)
exit_code, outs, errs = run_powershell(command, op_name)
return exit_code == 0, self.build_result_dict(outs, errs, op_name)
def change_password(self, payload: Dict[str, Any]) -> NydusResult:
"""Change a user's password.
:param payload: dictionary containing username and new encrypted_password
:returns: result of the operation
"""
op_name = 'change_password'
username = payload['username']
encrypted_password = payload['encrypted_password']
try:
password = self.decrypt(encrypted_password)
except DecryptError as ex:
return False, self.build_result_dict(ex.outs, ex.errs, op_name)
# Having trouble decoding a UTF-8-encoded string with code points above 127 (like 両) on the other side.
# Base64-encoding password to limit character set in transit to [a-zA-Z0-9+/=].
password_b64 = b64str(password)
exit_code, outs, errs = run_powershell_file(
'change_password.ps1', op_name, script_file_args=[username], stdin=password_b64, quiet=True)
outs = outs.replace(password_b64, '') # Strip stdin echo from stdout
return exit_code == 0, self.build_result_dict(outs, errs, op_name)
def configure_mta(self, payload, unused=None):
# Function is split in order that control panel ops can call underlying os function directly,
# without incurring op overhead such as formatted retry results
# The `unused` param is an artifact of Archon workflows requiring an I/O
# chain for sequencing.
#
op_name = 'configure_mta'
return self.do_configure_mta(payload, op_name)
def enable_admin(self, username, unused=None):
LOG.info("Adding user sudo permissions for %s via administrators group", username)
exit_code, outs, errs = runCommand(['net', 'localgroup', 'administrators', username, '/add'],
"add user %s to administrators group" % username)
op_name = 'enable_admin'
if exit_code != 0:
if 'already a member of the group' in errs or 'name is not a member of the group' in errs:
LOG.info("Group already properly set.")
else:
return False, self.build_result_dict(outs, errs, op_name)
return self.build_result_dict(outs, errs, op_name)
def disable_admin(self, username, unused=None):
LOG.info('Removing user sudo permissions for %s '
'via administrators group', username)
op_name = 'disable_admin'
commands = [
('add RDC Access', ['net', 'localgroup', 'Remote Desktop Users', username, '/add']),
('remove Admin Access', ['net', 'localgroup', 'administrators', username, '/delete']),
]
for purpose, cmd in commands:
exit_code, outs, errs = runCommand(cmd, purpose)
if exit_code != 0:
if 'already a member of the group' in errs or 'name is not a member of the group' in errs:
LOG.info('Group already properly set.')
else:
return False, self.build_result_dict(outs, errs, op_name)
return self.build_result_dict(outs, errs, op_name)
def disable_all_admins(self, unused=None):
# The `unused` param is an artifact of Archon workflows requiring an I/O
# chain for sequencing.
#
op_name = 'disable_all_admins'
command = r"""
net user tempHfsAdmin /delete
$usersToKeep=@("admin",
"Administrator",
"cloudbase-init",
"DefaultAccount",
"Guest",
"IME_ADMIN",
"IME_USER",
"IUSRPLESK_atmail",
"IUSRPLESK_horde",
"IUSRPLESK_smwebmail",
"IUSRPLESK_sqladmin",
"IUSR_FS_PUBLIC",
"IUSR_FS_UNLISTED",
"IWAM_FILESHARING",
"IWAM_plesk(default)",
"IWAM_sitepreview",
"nydus",
"Plesk Administrator",
"psaadm")
Get-CimInstance -ClassName win32_group -Filter "name = 'administrators'" | `
Get-CimAssociatedInstance -Association win32_groupuser | `
%{
if($usersToKeep -notcontains $_.name) {
$userName=$_.name
"Adding user to 'Remote Desktop Users' group: $userName"
net localgroup "Remote Desktop Users" $userName /add
"Removing user from 'Administrators' group: $userName"
net localgroup "administrators" $userName /delete
}
}"""
exit_code, outs, errs = run_powershell(command, 'Disable All Admins')
return exit_code == 0, self.build_result_dict(outs, errs, op_name)
def shutdown_clean(self, unused=None):
# The `unused` param is an artifact of Archon workflows requiring an I/O
# chain for sequencing.
#
raise NotImplementedError
def do_configure_mta(self, payload, op_name, intermediate_result: Dict[str, Any] = None):
relay = payload.get('relay_address')
LOG.debug("configuring MTA for %s", relay)
if relay is None:
return self.build_result_dict(
'No relay; skipping MTA configuration',
'', op_name)
smtp_registry = r"HKLM\SOFTWARE\Wow6432Node\Mail Enable\Mail Enable\Connectors\SMTP"
exit_code, outs, errs = runCommand(['reg', 'add', smtp_registry, '/v',
"Forward All Outbound Host", '/d', relay, '/f'],
"set mail relay")
if exit_code != 0:
return False, self.build_result_dict(outs, errs, op_name)
exit_code, outs, errs = runCommand(['reg', 'add', smtp_registry, '/v',
"Forward All Outbound Enabled", '/d', '1', '/f'],
"enable mail relay")
if exit_code != 0: # pylint: disable=no-else-return
return False, self.build_result_dict(outs, errs, op_name)
else:
return self.build_result_dict(outs, errs, op_name)
def configure_ips(self, vm_address, addresses, gateway, unused=None):
op_name = 'configure_ips'
exit_code, outs, errs = run_powershell_file(
'setNetwork.ps1', 'configure network',
script_file_args=[vm_address] + addresses)
if exit_code != 0:
LOG.error("failed to configure ips: %s (%s)", outs, errs)
return False, self.build_result_dict(outs, errs, op_name)
return self.build_result_dict(outs, errs, op_name)
def change_hostname(self, payload, unused=None):
op_name = 'change_hostname'
command = r"""
[string]$hostname='""" + payload['hostname'] + """';
if($hostname.tolower().StartsWith('www.')){
$hostname=$hostname.substring(4)
}
if($hostname.IndexOf('.') -gt -1){
$hostSet=$false;
$hostname.Split('.') | %{
if($_.Length -gt 0 -and $hostSet -eq $false){
$hostname=$_;
$hostSet=$true;
}
}
}
Rename-Computer -NewName $hostname -Force;
"""
exit_code, outs, errs = run_powershell(command, 'changeHostname')
return exit_code == 0, self.build_result_dict(outs, errs, op_name)
def _get_cpu_utilization(self):
_, outs, _ = run_powershell_file(
'cpu_utilization.ps1',
"get_utilization|get_cpu_utilization")
utilization = json.loads(outs)
cpu = float(utilization['cpuTimePercent'])
return {
'cpuUsed': cpu
}
def _get_memory_utilization(self):
_, outs, _ = run_powershell_file(
'memory_utilization.ps1',
"get_utilization|get_memory_utilization")
utilization = json.loads(outs)
memory_total = int(utilization['ramTotalMiB'])
memory_free = int(utilization['ramFreeMiB'])
memory_used = memory_total - memory_free
return {
'memoryTotal': memory_total,
'memoryUsed': memory_used,
}
def install_panopta(self, payload, *args, **kwargs):
# Installs Fortimonitor agent on the server
customer_key = payload['customer_key']
template_ids = payload['template_ids']
LOG.info("Installing Fortimonitor agent - Customer Key: %s, Template IDs: %s",
customer_key, template_ids)
manifest_file_contents = """templates = {template_ids}
enable_countermeasures = false""".format(**payload)
server_key = payload.get('server_key')
if server_key:
manifest_file_contents += "\nserver_key = {server_key}".format(server_key=server_key)
fqdn = payload.get('fqdn')
if fqdn:
manifest_file_contents += "\nfqdn = {fqdn}".format(fqdn=fqdn)
server_name = payload.get('serverName')
if server_name:
manifest_file_contents += "\nserver_name = {serverName}".format(serverName=server_name)
disable_server_match = payload.get('disable_server_match', False)
if disable_server_match:
manifest_file_contents += "\ndisable_server_match = true"
ssl_version = kwargs.get('ssl_version', '')
command = r"""{ssl_version}
$Manifest = @"
{manifest_file_contents}
"@
$Manifest | Out-File -FilePath "C:\FortimonitorAgent.manifest"
mkdir c:\fortimonitor_temp
cd c:\fortimonitor_temp
Invoke-WebRequest https://repo.fortimonitor.com/install/win/fortimonitor_agent_windows.ps1 -OutFile c:\fortimonitor_temp\fortimonitor_agent_windows.ps1 # noqa E501
c:\fortimonitor_temp\fortimonitor_agent_windows.ps1 -customer_key {customer_key}"""\
.format(ssl_version=ssl_version, manifest_file_contents=manifest_file_contents, customer_key=customer_key)
exit_code, outs, errs = run_powershell(command, 'Install Panopta')
return exit_code == 0, self.build_result_dict(outs, errs, 'install_panopta')
def _delete_agent(self, agent_name: str) -> Tuple[int, str, str]:
"""
delete agent if present from the server
"""
cmd = r"""Get-Package -Name '{agent_name} Agent'""".format(agent_name=agent_name)
_, outs, _ = run_powershell(cmd, "Check {agent_name} Installed".format(agent_name=agent_name))
if outs:
if agent_name == "Panopta":
command = r"""$app = Get-WmiObject -Class Win32_Product |
Where-Object { $_.Name -match "Panopta Agent" }
$app.Uninstall();"""
else:
command = r"""$app = Get-WmiObject -Class Win32_Product |
Where-Object { $_.Name -match "FortiMonitor Agent" }
$app.Uninstall();"""
return run_powershell(command, "Delete {agent_name}".format(agent_name=agent_name))
return 0, 'Panopta agent is not installed', ''
def _delete_agent_manifest(self, manifest_name: str) -> Tuple[int, str, str]:
"""
delete agent manifest file
"""
if os.path.exists(r"C:\{manifest_name}Agent.manifest".format(manifest_name=manifest_name)):
command = r"""Remove-Item –path C:\{manifest_name}Agent.manifest –force""".format(
manifest_name=manifest_name)
return run_powershell(command, "Delete {manifest_name} manifest file".format(manifest_name=manifest_name))
return 0, '{manifest_name} manifest file is not present'.format(manifest_name=manifest_name), ''
def _delete_temp_directory(self, temp_dir: str) -> Tuple[int, str, str]:
"""
delete agent's temporary directory
"""
if os.path.exists(r"C:\{temp_dir}".format(temp_dir=temp_dir)):
command = r"""Remove-Item -path C:\{temp_dir} -Recurse -force""".format(
temp_dir=temp_dir)
return run_powershell(command, "Delete {temp_dir} directory".format(temp_dir=temp_dir))
return 0, '{temp_dir} directory is not present'.format(temp_dir=temp_dir), ''
def delete_panopta(self, *args, **kwargs) -> NydusResult:
"""
delete panopta and fm-agent if present from the server and removes the panopta/fm-agent manifest file
:return result of uninstall operation
"""
exit_code1, outs, errs = self._delete_agent("Panopta")
exit_code2, output, error = self._delete_agent("FortiMonitor")
exit_code = exit_code1 + exit_code2
outs = outs + '\n' + output
errs = errs + '\n' + error
if exit_code != 0:
LOG.error('failed to remove panopta/fm-agent agent')
return False, self.build_result_dict(outs, errs, 'delete_panopta')
try:
exit_code1, outs2, errs2 = self._delete_agent_manifest("Panopta")
exit_code2, output, error = self._delete_agent_manifest("Fortimonitor")
outs2 = outs2 + '\n' + output
errs2 = errs2 + '\n' + error
outs = outs + '\n' + outs2
errs = errs + '\n' + errs2
exit_code3, outs2, errs2 = self._delete_temp_directory("panopta_temp")
exit_code4, output, error = self._delete_temp_directory("fortimonitor_temp")
exit_code = exit_code1 + exit_code2 + exit_code3 + exit_code4
outs2 = outs2 + '\n' + output
errs2 = errs2 + '\n' + error
outs = outs + '\n' + outs2
errs = errs + '\n' + errs2
except (IOError, OSError) as ex:
msg = "Failed to unlink Panopta/FortiMonitor manifest file or temporary directory"
LOG.exception(msg)
return False, self.build_result_dict('', str(ex), 'delete_panopta')
return exit_code == 0, self.build_result_dict(outs, errs, 'delete_panopta')
def upgrade_panopta(self, *args, **kwargs):
"""
upgrade panopta on the server
:return result of upgrade operation"""
cmd = r"""Get-Package -Name 'Panopta Agent'"""
exit_code, outs, _ = run_powershell(cmd, "Check Panopta Installed")
if outs:
command = r"""Invoke-WebRequest https://repo.fortimonitor.com/install/win/fm-upgrade.ps1 -OutFile fm-upgrade.ps1 # noqa E501 pylint: disable=line-too-long
.\fm-upgrade.ps1 -autoupdate"""
exit_code, outs, errs = run_powershell(command, "Upgrade Panopta")
if exit_code != 0:
LOG.error('failed to upgrade panopta agent to fortimonitor agent')
return False, self.build_result_dict(outs, errs, 'upgrade_panopta')
else:
exit_code, outs, errs = 0, 'Panopta agent is not installed', ''
def get_panopta_server_key(self, *args, **kwargs):
"""
Look into the Agent.config xml file and find the ServerKey add component.
:return: The value of the ServerKey add component in the Agent.config file.
"""
agent_config = r"""C:\Program Files (x86)\PanoptaAgent\Agent.config"""
if not os.path.exists(agent_config):
agent_config = r"""C:\Program Files (x86)\FortiMonitorAgent\Agent.config"""
cmd = r"""[System.IO.File]::Exists("{agent_config}")""".format(agent_config=agent_config)
exit_code, outs, _ = run_powershell(cmd, "Check Config File")
if outs == 'False':
raise ValueError('Panopta config file not found')
command = r"""[xml]$config = Get-Content "{agent_config}"
$serverKey = $config.agent.service.add | ? {{$_.key -eq "ServerKey"}} | % {{echo $_.value}}
echo $serverKey
""".format(agent_config=agent_config)
exit_code, outs, _ = run_powershell(command, "Get Server Key")
return exit_code == 0, {'outs': outs, 'append_info': False}
def update_invalid_resolvers(self, valid_resolvers: List[str], invalid_resolvers: List[str],
*args, **kwargs) -> NydusResult:
"""
If the server has any of the listed invalid resolvers, replace them with the valid resolvers.
Otherwise, leave the resolvers as-is.
:param valid_resolvers: A list of valid dns nameservers to be added to the server (if needed)
:param invalid_resolvers: A list of invalid dns nameservers to be removed from the server
:return: A Nydus result
"""
# Resolver values need to be in a comma-separated list for the powershell script args
# e.g. 10.1.1.1,10.1.2.1
valid_resolvers_arg = ','.join(valid_resolvers)
invalid_resolvers_arg = ','.join(invalid_resolvers)
exit_code, outs, errs = run_powershell_file(script="update_invalid_resolvers.ps1",
tag="update_invalid_resolvers",
script_file_args=[
'-validResolvers',
valid_resolvers_arg,
'-invalidResolvers',
invalid_resolvers_arg
])
if exit_code != 0:
if errs:
# Extract plain error message from powershell gubbins.
# If format is not as we expect, leave error message as-is.
err_array = errs.split('+')
err_msg = err_array[0]
err_msg_array = err_msg.split(':')
if len(err_msg_array) >= 3:
errs = err_msg_array[2].strip().replace('\r\n', '')
return exit_code == 0, self.build_result_dict(outs, errs, 'update_invalid_resolvers')
def enable_winexe(self, *_: Any) -> NydusResult:
"""Configure the system so it can be accessed remotely with winexe.
We use winexe in our integration tests to inspect VMs.
:param *_: op chaining arguments
:returns: operation result
"""
return self._run_powershell_op('enable_winexe.ps1', 'enable_winexe')
def open_port(self, port: int, *_: Any) -> NydusResult:
"""Configure the system so the specified port can be accessed remotely.
:param port: The port number to open
:param *_: op chaining arguments
:returns: operation result
"""
port = int(port)
return self._run_powershell_op('configure_port.ps1', 'open_port', script_file_args=[port, "open"])
def close_port(self, port: int, *_: Any) -> NydusResult:
"""Configure the system so the specified port cannot be accessed remotely.
:param port: The port number to close
:param *_: op chaining arguments
:returns: operation result
"""
port = int(port)
return self._run_powershell_op('configure_port.ps1', 'close_port', script_file_args=[port, "close"])
def install_qemu_agent(self, pypi_url: str, *args):
"""
Install qemu guest agent on the vm
:param pypi_url: The url for the pypi server where the package is located
"""
package_url = pypi_url + "/-/" + self.DISTRO + "/" + self.QEMU_PACKAGE_NAME
return self._run_powershell_op("install_qemu_agent.ps1", "install_qemu_agent",
script_file_args=[package_url])
def install_devcon(self, pypi_url: str, *args):
"""
Install devcon on the vm
:param pypi_url: The url for the pypi server where the package is located
"""
package_url = pypi_url + "/-/" + self.DISTRO + "/" + self.DEVCON
vertDriver_url = pypi_url + "/-/" + self.DISTRO + "/" + self.VIRTIO
return self._run_powershell_op("install_devcon.ps1", "install_devcon",
script_file_args=[package_url, vertDriver_url])
def get_os_info(self, *args: Any) -> Any:
"""Returns the Operating System information using systeminfo"""
op_name = 'get_os_info'
os_info = 'NAME=windows\n'
command = r"""
systeminfo | Select-String "OS"
"""
exit_code, outs, errs = run_powershell(command, op_name)
if exit_code == 0 and outs:
for line in outs.split('\n'):
if line.startswith('OS Name:'):
# Using regular expression to get the version/year.
# Example string: "OS Name: Microsoft Windows Server 2022 Standard"
pattern = r'\b\d{4}\b'
version = re.search(pattern, line)
os_info += 'VERSION={version}\n'.format(version=version.group(0))
return os_info
return False, self.build_result_dict(outs, errs, op_name)
def get_file_info(self, paths: str) -> Dict[str, Any]:
"""
Op for retrieving file info (if file exists), for now for Windows returning only info whether file exists
:param paths: Comma-separated list of paths of files
:return: Dict with filenames and files related info (if file present)
"""
paths_list = map(str.strip, paths.split(","))
def _get_file_dict(path: str) -> Dict[str, Any]:
file_loc = Path(os.path.normpath(path))
return {
"fileName": path,
"exists": file_loc.exists()
}
out_list = list(map(_get_file_dict, paths_list))
return {"files": out_list}
def get_disk_info(self, *args: Any) -> NydusResult:
"""Get disk information using powershell"""
_, outs, _ = run_powershell_file("get_disk_info.ps1", "get_disk_info")
try:
disk_info = json.loads(outs)
except json.JSONDecodeError as e:
LOG.error("Failed to parse disk info JSON: %s", e)
disk_info = {}
return disk_info
def shutdown_ovh_vm(self, *args):
"""
This Function is created to graceful shutdown ovh vm to avoid file corruption
"""
return self._run_powershell_op("shutdown_ovh_vm.ps1", "shutdown_ovh_vm")
def reset_vm_properties(self, *args):
"""
This Function is created to reset properties after graceful shutdown ovh vm
"""
return self._run_powershell_op("reset_vm_properties.ps1", "reset_vm_properties")
def get_system_info(self, *args):
"""
This Function is created to graceful shutdown ovh vm to avoid file corruption
"""
op_name = "get_system_info"
exit_code, outs, errs = run_powershell_file('get_system_info.ps1', op_name, quiet=True)
return exit_code == 0, self.build_result_dict(outs, errs, op_name)
class Windows2016(Windows):
def shutdown_clean(self, unused=None):
raise NotImplementedError
def install_panopta(self, payload, *args, **kwargs):
"""
Installs Panopta with the specified payload and additional arguments.
This method sets the SSL version to use TLS 1.1 and TLS 1.2 for the installation process.
Args:
payload (dict): The payload data required for the installation.
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
Returns:
The result of the superclass's install_panopta method with the specified SSL version.
"""
ssl_version = "[System.Net.ServicePointManager]::SecurityProtocol = [System.Net.SecurityProtocolType]::Tls11 -bor [System.Net.SecurityProtocolType]::Tls12;"
return super().install_panopta(payload, *args, **kwargs, ssl_version=ssl_version)
class Windows2019(Windows):
def shutdown_clean(self, unused=None):
raise NotImplementedError
class Windows2022(Windows):
def shutdown_clean(self, unused=None):
raise NotImplementedError
class Windows2025(Windows):
def shutdown_clean(self, unused=None):
raise NotImplementedError