# This file is part of pycloudlib. See LICENSE file for license information.
"""Azure instance."""
import datetime
import time
from collections import namedtuple
from enum import Enum, auto
from typing import Any, Dict, List, Optional
import requests
from azure.core.exceptions import ResourceExistsError
from azure.mgmt.compute import ComputeManagementClient
from azure.mgmt.network import NetworkManagementClient
from azure.mgmt.network.models import NetworkInterface
from pycloudlib.errors import PycloudlibError, PycloudlibTimeoutError
from pycloudlib.instance import BaseInstance
from pycloudlib.util import update_nested
BootDiagnostics = namedtuple("BootDiagnostics", ["console_log_url", "logs"])
BOOT_DIAGNOSTICS_URI_DELAY = 60
[docs]
class VMInstanceStatus(Enum):
"""Represents VM Instance state during its lifecycle."""
FAILED_PROVISION = auto()
ACTIVE = auto()
DELETED = auto()
STOPPED = auto()
[docs]
class AzureInstance(BaseInstance):
"""Azure backed instance."""
_type = "azure"
[docs]
def __init__(
self,
key_pair,
client,
instance,
network_client,
*,
username: Optional[str] = None,
get_boot_diagnostics: bool = False,
status: VMInstanceStatus = VMInstanceStatus.ACTIVE,
):
"""Set up instance.
Args:
key_pair: SSH key object
client: Azure compute management client
instance: created azure instance object
username: username to use when connecting via SSH
"""
super().__init__(key_pair, username=username)
self._client: ComputeManagementClient = client
self._network_client: NetworkManagementClient = network_client
self._instance = instance
self.boot_timeout = 300
self._status: VMInstanceStatus = status
self._boot_diagnostics_log = self._get_boot_diagnostics() if get_boot_diagnostics else None
[docs]
def wait_for_delete(self):
"""Wait for instance to be deleted."""
raise NotImplementedError
[docs]
def wait_for_stop(self, **kwargs):
"""Wait for instance stop."""
for _ in range(100):
power_state = (
self._client.virtual_machines.get(
resource_group_name=self._instance["rg_name"],
vm_name=self.name,
expand="instanceView",
)
.instance_view.statuses[1]
.display_status
)
if power_state == "VM stopped":
return
time.sleep(1)
raise PycloudlibTimeoutError
@property
def image_id(self):
"""Return the image_id from which this instance was created."""
storage_profile = self._instance["vm"].as_dict().get("storage_profile", {})
image_ref = storage_profile.get("image_reference", {})
if image_ref:
return ":".join(
[
image_ref.get("publisher", "").lower(),
image_ref.get("offer", ""),
image_ref.get("sku", ""),
image_ref.get("version", ""),
]
)
# Snapshot instances will not contain such info. For them, we will
# return a default string
return "snapshot-image"
@property
def ip(self):
"""Return IP address of instance."""
return self._instance["ip_address"]
@property
def id(self):
"""Return instance id."""
return self._instance["vm"].id
@property
def name(self):
"""Return instance name."""
return self._instance["vm"].name
@property
def sku(self):
"""Return instance sku."""
image_profile = self._instance["vm"].storage_profile.image_reference
return getattr(image_profile, "sku", "")
@property
def offer(self):
"""Return instance sku."""
image_profile = self._instance["vm"].storage_profile.image_reference
return getattr(image_profile, "offer", "")
@property
def location(self) -> str:
"""Return instance location."""
return self._instance["vm"].location
[docs]
def console_log(self) -> Optional[str]:
"""Return the instance console log."""
if not self._boot_diagnostics_log:
return None
return self._boot_diagnostics_log
@property
def status(self) -> VMInstanceStatus:
"""Return VM instance status."""
return self._status
[docs]
def shutdown(self, wait=True, **kwargs):
"""Shutdown the instance.
Args:
wait: wait for the instance shutdown
"""
shutdown = self._client.virtual_machines.begin_power_off(
resource_group_name=self._instance["rg_name"], vm_name=self.name
)
if wait:
shutdown.wait()
[docs]
def generalize(self):
"""Set the OS state of the instance to generalized."""
self._client.virtual_machines.generalize(
resource_group_name=self._instance["rg_name"], vm_name=self.name
)
def _get_boot_diagnostics(self) -> Optional[str]:
"""Get VM boot diagnostics logs.
Returns the boot diagnostics logs.
"""
response = None
self._log.info(
"Obtaining boot diagnostics logs for instance: %s",
self._instance["rg_name"],
)
try:
virtual_machines = self._client.virtual_machines
diagnostics = virtual_machines.retrieve_boot_diagnostics_data(
self._instance["rg_name"], self.name
)
# Azure has a 60 secs delay for the boot diagnostics to be active.
time.sleep(BOOT_DIAGNOSTICS_URI_DELAY)
if not diagnostics.serial_console_log_blob_uri:
raise PycloudlibError("No serial console log blob uri has been set.")
response = requests.get(diagnostics.serial_console_log_blob_uri, timeout=10)
except ResourceExistsError:
self._log.warning("Boot diagnostics not enabled, so none is collected.")
return None
except requests.exceptions.Timeout as e:
self._log.error("Request timed out while getting boot diagnostics logs: %s", e)
return None
return response.text
[docs]
def start(self, wait=True):
"""Start the instance.
Args:
wait: wait for the instance to start.
"""
start = self._client.virtual_machines.begin_start(
resource_group_name=self._instance["rg_name"], vm_name=self.name
)
if wait:
start.wait()
self.wait()
self._status = VMInstanceStatus.ACTIVE
def _wait_for_instance_start(self, **kwargs):
for _ in range(120):
view = self._client.virtual_machines.instance_view(self._instance["rg_name"], self.name)
status = view.statuses[1].display_status
if status.lower() == "vm running":
return True
time.sleep(1)
raise PycloudlibTimeoutError("VM did not start.")
def _do_restart(self, **kwargs):
"""Restart the instance."""
self._client.virtual_machines.begin_restart(
resource_group_name=self._instance["rg_name"], vm_name=self.name
)
# pylint: disable=broad-except
[docs]
def delete(self, wait=True) -> List[Exception]:
"""Delete instance."""
if self._status == VMInstanceStatus.DELETED:
return []
try:
poller = self._client.virtual_machines.begin_delete(
resource_group_name=self._instance["rg_name"],
vm_name=self.name,
)
if wait:
poller.wait(timeout=300)
if not poller.done():
return [PycloudlibTimeoutError("Resource not deleted after 300 seconds")]
self._status = VMInstanceStatus.DELETED
self._instance = None
except Exception as e:
return [e]
return []
[docs]
def add_network_interface(self, **kwargs) -> str:
"""Add network interface to instance.
Creates NIC and adds to the VM instance.
NOTE: It will deallocate the virtual machine, add the NIC,
then start the virtual machine.
Returns the private ip address of the new NIC.
"""
# pylint: disable=too-many-locals
# get subnet id and network security group id of primary nic
default_nic_id = self._instance["vm"].network_profile.network_interfaces[0].id
all_nics = list(self._network_client.network_interfaces.list_all())
default_nic = [nic for nic in all_nics if nic.id == default_nic_id]
if len(default_nic) == 0:
raise PycloudlibError("Could not get the first/default NIC")
default_nic = default_nic[0] # type: ignore
subnet_id = default_nic.ip_configurations[0].subnet.id # type: ignore
nsg_id = default_nic.network_security_group.id # type: ignore
us = datetime.datetime.now().strftime("%f")
# get ip address object
ip_address_obj = self._create_ip_address()
ip_config_name = f"{self.name}-{us}-ip-config"
ip_config = {
"name": ip_config_name,
"subnet": {"id": subnet_id},
"public_ip_address": {"id": ip_address_obj.id},
}
default_config = {
"location": self.location,
"ip_configurations": [ip_config],
"network_security_group": {"id": nsg_id},
"tags": None,
}
nic_name = f"{self.name}-nic-{us}"
nic_poller = self._network_client.network_interfaces.begin_create_or_update( # type: ignore
self._instance["rg_name"], nic_name, default_config
)
created_nic = nic_poller.result()
nic_details = {"id": created_nic.id, "primary": False}
self._attach_nic_to_vm([nic_details])
return created_nic.ip_configurations[0].private_ip_address
[docs]
def remove_network_interface(self, ip_address: str):
"""Remove nic from running instance.
Args:
ip_address: private ip address of the NIC
"""
# Get details of the NICs attached to the VM.
vm_nics_ids = [nic.id for nic in self._instance["vm"].network_profile.network_interfaces]
all_nics: List[NetworkInterface] = list(self._network_client.network_interfaces.list_all())
vm_nics = [nic for nic in all_nics if nic.id in vm_nics_ids]
primary_nic: Optional[NetworkInterface] = None
primary_nic = [nic for nic in vm_nics if nic.primary][0]
nic_params = []
nic_to_remove: Optional[NetworkInterface] = None
for vm_nic in vm_nics:
nic_private_ip = vm_nic.ip_configurations[0].private_ip_address # type: ignore
if nic_private_ip == ip_address:
nic_to_remove = vm_nic
else:
nic_params.append({"id": vm_nic.id, "primary": vm_nic.primary})
if not nic_to_remove:
raise PycloudlibError(f"Did not find NIC with private ip address: {ip_address}")
# if primary nic is removed, then make the next NIC as primary
if nic_to_remove.primary:
primary_nic = None
for nic_param in nic_params:
primary_nics = [nic for nic in vm_nics if nic.id == nic_param["id"]]
if len(primary_nics) > 0:
primary_nic = primary_nics[0]
nic_param["primary"] = True
break
if not primary_nic:
raise PycloudlibError("Could not set Primary NIC.")
self._remove_nic_from_vm(nic_params, primary_nic)
# delete the removed NIC
self._network_client.network_interfaces.begin_delete(
self._instance["rg_name"],
nic_to_remove.name, # type: ignore
)
def _remove_nic_from_vm(
self,
new_nic_params: List[Dict[str, Any]],
primary_nic: NetworkInterface,
):
do_start: bool = False
if self._status != VMInstanceStatus.STOPPED:
self._log.debug("Deallocating instance to remove NICs")
# Azure deallocates VM before removing NiCs
self.deallocate()
do_start = True
# Deleting will be async, no need to wait
all_ips = list(self._network_client.public_ip_addresses.list_all())
params = self._instance["vm"].as_dict()
net_params = {"network_profile": {"network_interfaces": new_nic_params}}
update_nested(params, net_params)
poll = self._client.virtual_machines.begin_create_or_update(
self._instance["rg_name"], self.name, params
)
# Update VM and Ip address
self._instance["vm"] = poll.result()
self._instance["ip_address"] = [
ip_addr.ip_address
for ip_addr in all_ips
if ip_addr.id == primary_nic.ip_configurations[0].public_ip_address.id # type: ignore
][0]
if do_start:
self.start()
def _attach_nic_to_vm(self, nics: List[Dict[str, Any]]):
"""Attach nics to instance."""
do_start: bool = False
# NOTE: Azure needs the VM to be deallocated to add NICs
if self._status != VMInstanceStatus.STOPPED:
self._log.debug("Deallocating instance to attach NICs.")
self.deallocate()
do_start = True
params = self._instance["vm"].as_dict()
vm_attached_nics = params["network_profile"]["network_interfaces"]
vm_attached_nics.extend(nics)
net_params = {"network_profile": {"network_interfaces": vm_attached_nics}}
update_nested(params, net_params)
poll = self._client.virtual_machines.begin_create_or_update(
self._instance["rg_name"], self.name, params
)
self._instance["vm"] = poll.result()
if do_start:
self.start()
def _create_ip_address(self):
us = datetime.datetime.now().strftime("%f")
ip_name = f"{self.name}-{us}-ip"
parameters = {
"location": self.location,
"sku": {"name": "Standard"},
"public_ip_allocation_method": "Static",
"rpublic_ip_address_version": "IPV4",
"tags": None,
}
ip_poller = self._network_client.public_ip_addresses.begin_create_or_update(
self._instance["rg_name"],
ip_name,
parameters,
)
return ip_poller.result()
[docs]
def deallocate(self):
"""De-allocates the VM. Releases the resources to modify VM."""
self._client.virtual_machines.begin_deallocate(
resource_group_name=self._instance["rg_name"], vm_name=self.name
).result()
self._status = VMInstanceStatus.STOPPED