# This file is part of pycloudlib. See LICENSE file for license information.
# pylint: disable=C0302
"""Azure Cloud type."""
import base64
import contextlib
import datetime
import logging
from typing import Dict, List, Optional
from azure.core.exceptions import HttpResponseError, ResourceNotFoundError
from azure.mgmt.compute import ComputeManagementClient
from azure.mgmt.network import NetworkManagementClient
from azure.mgmt.resource import ResourceManagementClient
from pycloudlib.azure import security_types, util
from pycloudlib.azure.instance import AzureInstance, VMInstanceStatus
from pycloudlib.cloud import BaseCloud, ImageType
from pycloudlib.config import ConfigFile
from pycloudlib.errors import (
InstanceNotFoundError,
NetworkNotFoundError,
PycloudlibError,
PycloudlibTimeoutError,
)
from pycloudlib.util import get_timestamped_tag, update_nested
# Image tables: each maps an Ubuntu release codename to a colon-separated
# Azure image identifier ending in ":latest".
# Use Azure CLI for valid images: az vm image list -p Canonical --all -o table

# Daily server images, returned by ``daily_image`` for ImageType.GENERIC.
UBUNTU_DAILY_IMAGES = {
    "xenial": "Canonical:UbuntuServer:16.04-DAILY-LTS:latest",
    "bionic": "Canonical:UbuntuServer:18.04-DAILY-LTS:latest",
    "focal": "Canonical:0001-com-ubuntu-server-focal-daily:20_04-daily-lts:latest",  # noqa: E501
    "jammy": "Canonical:0001-com-ubuntu-server-jammy-daily:22_04-daily-lts:latest",  # noqa: E501
    "noble": "Canonical:ubuntu-24_04-lts-daily:server:latest",
    "plucky": "Canonical:ubuntu-25_04-daily:server:latest",
    "questing": "Canonical:ubuntu-25_10-daily:server:latest",
    "resolute": "Canonical:ubuntu-26_04-lts-daily:server:latest",
}

# Daily minimal images, returned by ``daily_image`` for ImageType.MINIMAL.
UBUNTU_MINIMAL_DAILY_IMAGES = {
    "focal": "Canonical:0001-com-ubuntu-minimal-focal-daily:minimal-20_04-daily-lts:latest",  # noqa: E501
    "jammy": "Canonical:0001-com-ubuntu-minimal-jammy-daily:minimal-22_04-daily-lts:latest",  # noqa: E501
    "mantic": "Canonical:0001-com-ubuntu-minimal-mantic-daily:minimal-23_10-daily:latest",  # noqa: E501
    "noble": "Canonical:ubuntu-24_04-lts-daily:minimal:latest",
    "plucky": "Canonical:ubuntu-25_04-daily:minimal:latest",
    # questing is 25.10 (see the server tables); it previously pointed at the
    # 25.04 offer, i.e. the plucky image.
    "questing": "Canonical:ubuntu-25_10-daily:minimal:latest",
    "resolute": "Canonical:ubuntu-26_04-lts-daily:minimal:latest",
}

# Ubuntu Pro images, returned by ``daily_image`` for ImageType.PRO.
UBUNTU_DAILY_PRO_IMAGES = {
    "xenial": "Canonical:0001-com-ubuntu-pro-xenial:pro-16_04-lts:latest",
    "bionic": "Canonical:0001-com-ubuntu-pro-bionic:pro-18_04-lts:latest",
    "focal": "Canonical:0001-com-ubuntu-pro-focal:pro-20_04-lts:latest",
    "jammy": "Canonical:0001-com-ubuntu-pro-jammy:pro-22_04-lts:latest",
    "noble": "Canonical:ubuntu-24_04-lts:ubuntu-pro:latest",
}

# Ubuntu Pro FIPS images, returned by ``daily_image`` for ImageType.PRO_FIPS.
UBUNTU_DAILY_PRO_FIPS_IMAGES = {
    "xenial": "Canonical:0001-com-ubuntu-pro-xenial-fips:pro-fips-16_04-private:latest",  # noqa: E501
    "bionic": "Canonical:0001-com-ubuntu-pro-bionic-fips:pro-fips-18_04:latest",  # noqa: E501
    "focal": "Canonical:0001-com-ubuntu-pro-focal-fips:pro-fips-20_04:latest",
}

# Returned by ``daily_image`` for ImageType.PRO_FIPS_UPDATES.
UBUNTU_DAILY_PRO_FIPS_UPDATES_IMAGES = {
    "jammy": "Canonical:0001-com-ubuntu-pro-jammy-fips:pro-fips-22_04-gen1:latest",
}

# Released (non-daily) images, returned by ``released_image``.
UBUNTU_RELEASE_IMAGES = {
    "xenial": "Canonical:UbuntuServer:16.04-LTS:latest",
    "bionic": "Canonical:UbuntuServer:18.04-LTS:latest",
    "focal": "Canonical:0001-com-ubuntu-server-focal:20_04-lts-gen2:latest",
    "jammy": "Canonical:0001-com-ubuntu-server-jammy:22_04-lts-gen2:latest",
    "noble": "Canonical:ubuntu-24_04-lts:server:latest",
    "plucky": "Canonical:ubuntu-25_04-daily:server:latest",
    "questing": "Canonical:ubuntu-25_10-daily:server:latest",
    # TODO(20241031: drop -daily once resolute release is published)
    "resolute": "Canonical:ubuntu-26_04-lts-daily:server:latest",
}

# Confidential VM images, returned by ``confidential_vm_image``.
UBUNTU_CVM_IMAGES = {
    "focal": "Canonical:0001-com-ubuntu-confidential-vm-focal:20_04-lts-cvm:latest",  # noqa: E501
    "jammy": "Canonical:0001-com-ubuntu-confidential-vm-jammy:22_04-lts-cvm:latest",  # noqa: E501
    "noble": "Canonical:ubuntu-24_04-lts:cvm:latest",
}

# Raise the Azure SDK's HTTP logging policy logger to WARNING.
logging.getLogger("azure.core.pipeline.policies.http_logging_policy").setLevel(logging.WARNING)
[docs]
class Azure(BaseCloud):
    """Azure Cloud Class.

    Subclass of ``BaseCloud`` whose methods drive Azure's resource, network
    and compute management clients.
    """

    # Cloud identifier; ``launch`` includes it in its "requires image_id"
    # error message.
    _type = "azure"
[docs]
def __init__(
    self,
    tag: str,
    timestamp_suffix: bool = True,
    config_file: Optional[ConfigFile] = None,
    *,
    client_id: Optional[str] = None,
    client_secret: Optional[str] = None,
    subscription_id: Optional[str] = None,
    tenant_id: Optional[str] = None,
    region: Optional[str] = None,
    resource_group_params: Optional[util.AzureParams] = None,
    username: Optional[str] = None,
    enable_boot_diagnostics: bool = False,
):
    """Build the Azure management clients and the working resource group.

    Each credential passed explicitly takes precedence over the matching
    entry in the pycloudlib config; only credentials that resolve to a
    truthy value are handed to ``util.get_client``. The original notes
    state that Azure otherwise reads credentials from /home/$USER/.azure
    (that lookup is not visible in this method).

    Args:
        tag: string used to name and tag resources with
        timestamp_suffix: bool set True to append a timestamp suffix to the
            tag
        config_file: path to pycloudlib configuration file
        client_id: user's client id
        client_secret: user's client secret access key
        subscription_id: user's subscription id key
        tenant_id: user's tenant id key
        region: region for created resources; falls back to the config's
            "region" value and then to "centralus"
        resource_group_params: the resource group override parameters
        username: admin username for created VMs, "ubuntu" when omitted
        enable_boot_diagnostics: flag to configure if boot diagnostics
            logs will be enabled and obtained for instances created.
    """
    super().__init__(
        tag,
        timestamp_suffix,
        config_file,
        required_values=[client_id, client_secret, subscription_id, tenant_id],
    )

    self.created_resource_groups: List = []
    self._log.debug("logging into Azure")
    self.location = region or self.config.get("region") or "centralus"
    self.username = username or "ubuntu"
    self.registered_instances: Dict[str, AzureInstance] = {}
    self.registered_images: Dict[str, dict] = {}

    # (key expected by util.get_client, pycloudlib config key, explicit value)
    credential_sources = (
        ("clientId", "client_id", client_id),
        ("clientSecret", "client_secret", client_secret),
        ("subscriptionId", "subscription_id", subscription_id),
        ("tenantId", "tenant_id", tenant_id),
    )
    config_dict = {}
    for azure_key, config_key, explicit_value in credential_sources:
        resolved = explicit_value or self.config.get(config_key)
        if resolved:
            config_dict[azure_key] = resolved

    self.resource_client = util.get_client(ResourceManagementClient, config_dict)
    self.network_client = util.get_client(NetworkManagementClient, config_dict)
    self.compute_client = util.get_client(ComputeManagementClient, config_dict)

    # Needs self.location, self.tag and self.resource_client set above.
    self.resource_group = self._create_resource_group(resource_group_params)
    self.base_tag = tag
    self._enable_boot_diagnostics = enable_boot_diagnostics
def __exit__(self, exc_type, exc_value, exc_traceback):
    """Log boot diagnostics of failed instances, then run the base cleanup.

    Diagnostics are only logged when the context is left because of an
    exception, and only for instances whose status is FAILED_PROVISION.
    """
    candidates = self.created_instances if exc_type else ()
    failed_instances = (
        inst for inst in candidates if inst.status == VMInstanceStatus.FAILED_PROVISION
    )
    for inst in failed_instances:
        self._log.info("Boot diagnostics for %s:", inst.name)
        self._log.info("%s", inst.console_log())
    super().__exit__(exc_type, exc_value, exc_traceback)
[docs]
def image_serial(self, image_id):
    """Find the image serial of the latest daily image for a release.

    Not implemented for Azure.

    Args:
        image_id: string, Ubuntu image id

    Raises:
        NotImplementedError: always
    """
    raise NotImplementedError()
def _create_network_security_group(
    self,
    inbound_ports,
    network_security_group_params: Optional[util.AzureCreateParams] = None,
):
    """Create a network security group with one inbound TCP rule per port.

    Port 22 is always opened so the machine can be reached over ssh.

    Args:
        inbound_ports: List of strings, optional inbound ports
            to enable in the instance.
        network_security_group_params: Azure network security group details;
            supplies the group name, resource group name and parameter
            overrides when given.

    Returns:
        The network security object created by Azure
    """
    ports = inbound_ports or []
    # Without port 22 we would not be able to ssh into the instance.
    if "22" not in ports:
        ports = ["22"] + ports

    if network_security_group_params:
        security_group_name = network_security_group_params.name
        resource_group_name = network_security_group_params.resource_group_name
    else:
        security_group_name = "{}-sgn".format(self.tag)
        resource_group_name = self.resource_group.name

    nsg_operations = self.network_client.network_security_groups
    self._log.debug("Creating Azure network security group")

    # A lower number means a higher priority: rules start at 300 and step
    # by 10, so an ssh rule prepended above is the highest-priority one.
    security_rules = [
        {
            "name": "port-{}".format(port),
            "priority": 300 + 10 * position,
            "protocol": "TCP",
            "access": "Allow",
            "direction": "Inbound",
            "sourceAddressPrefix": "*",
            "sourcePortRange": "*",
            "destinationAddressPrefix": "*",
            "destinationPortRange": port,
        }
        for position, port in enumerate(ports)
    ]
    parameters = {"location": self.location, "security_rules": security_rules}

    overrides = (
        network_security_group_params.parameters if network_security_group_params else None
    )
    if overrides:
        update_nested(parameters, overrides)

    return nsg_operations.begin_create_or_update(
        resource_group_name=resource_group_name,
        network_security_group_name=security_group_name,
        parameters=parameters,
    ).result()
def _create_resource_group(self, resource_group_params: Optional[util.AzureParams] = None):
    """Return the named resource group, creating it when it does not exist.

    Every other resource is created inside this group, so deleting the
    group deletes all resources associated with it. Only groups created
    here (not pre-existing ones) are recorded in
    ``self.created_resource_groups``.

    Args:
        resource_group_params: Azure resource group override parameters.

    Returns:
        The existing or newly created Azure resource group
    """
    if resource_group_params:
        resource_name = resource_group_params.name
    else:
        resource_name = "{}-rg".format(self.tag)
    self._log.debug("Creating Azure resource group")

    try:
        return self.resource_client.resource_groups.get(resource_name)
    except ResourceNotFoundError:
        pass  # not there yet: fall through and create it

    parameters = {"location": self.location, "tags": {"name": self.tag}}
    overrides = resource_group_params.parameters if resource_group_params else None
    if overrides:
        update_nested(parameters, overrides)

    created_group = self.resource_client.resource_groups.create_or_update(
        resource_name,
        parameters,
    )
    self.created_resource_groups.append(created_group)
    return created_group
def _create_virtual_network(
    self,
    address_prefixes=None,
    virtual_network_params: Optional[util.AzureCreateParams] = None,
):
    """Create the virtual network a subnet is later provisioned in.

    Args:
        address_prefixes: list of strings, address prefixes to be used in
            this virtual network; ["10.0.0.0/16"] when None.
        virtual_network_params: Azure virtual network override details.

    Returns:
        The virtual network created by Azure
    """
    prefixes = ["10.0.0.0/16"] if address_prefixes is None else address_prefixes

    if virtual_network_params:
        virtual_network_name = virtual_network_params.name
        resource_group_name = virtual_network_params.resource_group_name
    else:
        virtual_network_name = "{}-vnet".format(self.tag)
        resource_group_name = self.resource_group.name

    parameters = {
        "location": self.location,
        "address_space": {"address_prefixes": prefixes},
        "tags": {"name": self.tag},
    }
    overrides = virtual_network_params.parameters if virtual_network_params else None
    if overrides:
        update_nested(parameters, overrides)

    self._log.debug("Creating Azure virtual network")
    return self.network_client.virtual_networks.begin_create_or_update(
        resource_group_name,
        virtual_network_name,
        parameters,
    ).result()
def _create_subnet(
    self,
    vnet_name,
    subnet_params: Optional[util.AzureCreateParams] = None,
    address_prefix="10.0.0.0/24",
):
    """Create the subnet a network interface is later provisioned in.

    Args:
        vnet_name: string, name of the virtual network that holds the subnet
        subnet_params: AzureCreateParams, subnet options/parameters
            to override/create subnet.
        address_prefix: string, An address prefix to be used for
            this subnet.

    Returns:
        The subnet created by Azure
    """
    if subnet_params:
        subnet_name = subnet_params.name
        resource_group_name = subnet_params.resource_group_name
    else:
        subnet_name = "{}-subnet".format(self.tag)
        resource_group_name = self.resource_group.name

    parameters = {"address_prefix": address_prefix, "tags": {"name": self.tag}}
    overrides = subnet_params.parameters if subnet_params else None
    if overrides:
        update_nested(parameters, overrides)

    self._log.debug("Creating Azure subnet")
    return self.network_client.subnets.begin_create_or_update(
        resource_group_name,
        vnet_name,
        subnet_name,
        parameters,
    ).result()
def _create_ip_address(self, ip_addr_params: Optional[util.AzureCreateParams] = None):
    """Create an ip address.

    This method creates a static, Standard-SKU Azure public ip address to
    be used when provisioning a network interface.

    Args:
        ip_addr_params: AzureCreateParams, ip address params to
            override/create ip addr options.

    Returns:
        The ip address created by Azure
    """
    # The microsecond part of the current time goes into the default name,
    # presumably so several addresses created under one tag do not collide.
    us = datetime.datetime.now().strftime("%f")
    ip_name = ip_addr_params.name if ip_addr_params else "{}-{}-ip".format(self.tag, us)
    resource_group_name = (
        ip_addr_params.resource_group_name if ip_addr_params else self.resource_group.name
    )
    parameters = {
        "location": self.location,
        "sku": {"name": "Standard"},
        "public_ip_allocation_method": "Static",
        # Was misspelled "rpublic_ip_address_version", which is not a public
        # ip address property name, so the setting never reached Azure.
        "public_ip_address_version": "IPv4",
        "tags": {"name": self.tag},
    }
    if ip_addr_params and ip_addr_params.parameters:
        update_nested(parameters, ip_addr_params.parameters)

    self._log.debug("Creating Azure ip address")
    ip_poller = self.network_client.public_ip_addresses.begin_create_or_update(
        resource_group_name,
        ip_name,
        parameters,
    )
    return ip_poller.result()
def _create_network_interface_client(
    self,
    ip_address_id,
    subnet_id,
    nsg_id,
    nic_params: Optional[util.AzureCreateParams] = None,
):
    """Create the network interface a virtual machine is provisioned with.

    Args:
        ip_address_id: string, The ip address id
        subnet_id: string, the subnet id
        nsg_id: string, the network security group id
        nic_params: AzureCreateParams, NIC params to override/create
            NIC options.

    Returns:
        The network interface created by Azure
    """
    if nic_params:
        nic_name = nic_params.name
        config_prefix = nic_params.name
        resource_group_name = nic_params.resource_group_name
    else:
        nic_name = "{}-nic".format(self.tag)
        config_prefix = self.tag
        resource_group_name = self.resource_group.name

    # The ip configuration name carries the microsecond part of "now".
    stamp = datetime.datetime.now().strftime("%f")
    ip_configuration = {
        "name": "{}-{}-ip-config".format(config_prefix, stamp),
        "subnet": {"id": subnet_id},
        "public_ip_address": {"id": ip_address_id},
    }
    nic_config = {
        "location": self.location,
        "ip_configurations": [ip_configuration],
        "network_security_group": {"id": nsg_id},
        "tags": {"name": self.tag},
    }
    overrides = nic_params.parameters if nic_params else None
    if overrides:
        update_nested(nic_config, overrides)

    self._log.debug("Creating Azure network interface")
    return self.network_client.network_interfaces.begin_create_or_update(
        resource_group_name, nic_name, nic_config
    ).result()
def _create_vm_parameters(self, name, image_id, instance_type, nic_ids, user_data):
    """Compose the dict used to provision an Azure virtual machine.

    User data, when given, is base64-encoded into the os profile. Images
    recognised as pro images additionally get a 'plan' entry.

    Args:
        name: string, The name of the virtual machine.
        image_id: string, The identifier of an image.
        instance_type: string, Type of instance to create.
        nic_ids: list[string], The network interface ids; the first one is
            marked as primary.
        user_data: string, The user data to be passed to the
            virtual machine.

    Returns:
        A dict containing the parameters to provision a virtual machine.
    """
    nics = []
    for position, nic_id in enumerate(nic_ids):
        nics.append({"id": nic_id, "properties": {"primary": position == 0}})

    authorized_key = {
        "path": "/home/{}/.ssh/authorized_keys".format(self.username),
        "key_data": self.key_pair.public_key_content,
    }
    os_profile = {
        "computer_name": name,
        "admin_username": self.username,
        "linux_configuration": {
            "ssh": {"public_keys": [authorized_key]},
            "disable_password_authentication": True,
        },
    }
    if user_data:
        # The user_data is base64-encoded before it is sent to the
        # virtual machine.
        encoded = base64.b64encode(user_data.encode())
        os_profile["custom_data"] = encoded.decode()

    vm_parameters = {
        "location": self.location,
        "hardware_profile": {"vm_size": instance_type},
        "storage_profile": {"image_reference": util.get_image_reference_params(image_id)},
        "os_profile": os_profile,
        "diagnostics_profile": {"boot_diagnostics": {"enabled": self._enable_boot_diagnostics}},
        "network_profile": {"network_interfaces": nics},
        "tags": {"name": self.tag},
    }

    # Pro images come from marketplaces or from snapshots. A snapshot's
    # image_id lacks the metadata needed for the 'plan' dict, so the
    # metadata pycloudlib stored in registered_images at snapshot time is
    # passed along as well.
    snapshot_metadata = self.registered_images.get(image_id)
    if util.is_pro_image(image_id, snapshot_metadata):
        vm_parameters["plan"] = util.get_plan_params(image_id, snapshot_metadata)
    return vm_parameters
def _create_virtual_machine(
    self,
    image_id,
    instance_type,
    nic_ids,
    user_data,
    name,
    vm_params=None,
    provisioning_timeout=None,
):
    """Create a virtual machine.

    This method provisions an Azure virtual machine for the image_id
    provided by the user.

    Args:
        image_id: string, The image to be used when provisioning
            a virtual machine.
        instance_type: string, Type of instance to create
        nic_ids: list[string], The network interfaces to use for this
            virtual machine.
        user_data: string, user data used by cloud-init when
            booting the virtual machine.
        name: string, optional name to provide when creating the vm;
            "<tag>-vm" is used when empty.
        vm_params: dict containing values as vm_params to send to
            virtual_machines.begin_create_or_update.
        provisioning_timeout: int, timeout in seconds for provisioning
            the VM, defaults to None i.e. use Azure's default.

    Returns:
        The virtual machine created by Azure

    Raises:
        PycloudlibTimeoutError: the poller was not done after waiting
            ``provisioning_timeout``.
        PycloudlibError: Azure answered with an HTTP error.
    """
    if not name:
        name = "{}-vm".format(self.tag)

    params = self._create_vm_parameters(name, image_id, instance_type, nic_ids, user_data)
    if vm_params:
        update_nested(params, vm_params)

    self._log.debug("Creating Azure virtual machine: %s", name)
    try:
        vm_poller = self.compute_client.virtual_machines.begin_create_or_update(
            self.resource_group.name,
            name,
            params,
        )
        vm_poller.wait(provisioning_timeout)
        if not vm_poller.done():
            raise PycloudlibTimeoutError("Virtual machine creation timed out.")
        return vm_poller.result()
    except HttpResponseError as e:
        # azure-core documents ``error`` as optional. When it is missing,
        # fall back to the exception's status code and message rather than
        # failing with an AttributeError that hides the Azure error.
        details = e.error
        if details is not None:
            err_code = details.code
            err_msg = details.message
        else:
            err_code = e.status_code
            err_msg = e.message
        raise PycloudlibError(f"Virtual machine creation error: {err_code}\n{err_msg}") from e
[docs]
def delete_image(self, image_id, **kwargs):
    """Delete an image from Azure.

    Does nothing when no image name can be extracted from ``image_id``.
    On success the image is also dropped from ``self.registered_images``.

    Args:
        image_id: string, The id of the image to be deleted
    """
    image_name = util.get_resource_name_from_id(image_id)
    if not image_name:
        return

    resource_group_name = util.get_resource_group_name_from_id(image_id)
    delete_poller = self.compute_client.images.begin_delete(
        resource_group_name=resource_group_name, image_name=image_name
    )
    delete_poller.wait()

    status = delete_poller.status()
    if status == "Succeeded":
        self.registered_images.pop(image_id, None)
        self._log.debug("Image %s was deleted", image_id)
    else:
        # The poller status is a string: it was formatted with %d before,
        # which makes the logging call fail to format the message.
        self._log.debug("Error deleting %s. Status: %s", image_id, status)
def _get_image(self, release, image_map):
    """Return the image id that ``image_map`` holds for ``release``.

    Args:
        release: string, Ubuntu release to look for
        image_map: dict mapping release names to image ids

    Returns:
        string, the image id

    Raises:
        ValueError: ``image_map`` has no (non-None) entry for ``release``
    """
    image_id = image_map.get(release)
    if image_id is not None:
        return image_id

    known_releases = " ".join(image_map.keys())
    raise ValueError(
        "No Ubuntu image found for {}. Expected one of: {}".format(release, known_releases)
    )
[docs]
def released_image(self, release):
    """Look up the released image for an Ubuntu release.

    Args:
        release: string, Ubuntu release to look for

    Returns:
        string, the UBUNTU_RELEASE_IMAGES entry for the release

    Raises:
        ValueError: the release has no entry in UBUNTU_RELEASE_IMAGES
    """
    self._log.debug("finding release Ubuntu image for %s", release)
    image_id = self._get_image(release, UBUNTU_RELEASE_IMAGES)
    return image_id
[docs]
def confidential_vm_image(self, release):
    """Look up the confidential computing vm image for an Ubuntu release.

    Args:
        release: string, Ubuntu release to look for

    Returns:
        string, the UBUNTU_CVM_IMAGES entry for the release

    Raises:
        ValueError: the release has no entry in UBUNTU_CVM_IMAGES
    """
    self._log.debug("finding confidential vm Ubuntu image for %s", release)
    image_id = self._get_image(release, UBUNTU_CVM_IMAGES)
    return image_id
def _get_images_dict(self, image_type: ImageType):
    """Return the daily image table that belongs to ``image_type``.

    Args:
        image_type: ImageType, the kind of image wanted

    Returns:
        dict mapping release names to image ids

    Raises:
        ValueError: ``image_type`` matches none of the supported types
    """
    tables_by_type = (
        (ImageType.GENERIC, UBUNTU_DAILY_IMAGES),
        (ImageType.PRO, UBUNTU_DAILY_PRO_IMAGES),
        (ImageType.PRO_FIPS, UBUNTU_DAILY_PRO_FIPS_IMAGES),
        (ImageType.PRO_FIPS_UPDATES, UBUNTU_DAILY_PRO_FIPS_UPDATES_IMAGES),
        (ImageType.MINIMAL, UBUNTU_MINIMAL_DAILY_IMAGES),
    )
    for supported_type, images in tables_by_type:
        if image_type == supported_type:
            return images
    raise ValueError("Invalid image_type")
[docs]
def daily_image(
    self,
    release: str,
    *,
    image_type: ImageType = ImageType.GENERIC,
    **kwargs,
):
    """Find the image info for the latest daily image for a given release.

    Args:
        release: string, Ubuntu release to look for.
        image_type: ImageType, selects which daily image table is searched.

    Returns:
        A string representing an Ubuntu image

    Raises:
        ValueError: unsupported ``image_type``, or the release has no entry
            in the selected table
    """
    self._log.debug("finding daily Ubuntu image for %s", release)
    images = self._get_images_dict(image_type)
    return self._get_image(release, images)
def _check_for_network_interfaces(self):
    """Look for an unattached network interface in the resource group.

    Lists the network interfaces of the instance resource group and picks
    the first one that is not attached to any virtual machine.

    Returns:
        An Azure network interface resource, or None when every listed
        interface is attached to a virtual machine
    """
    group_nics = self.network_client.network_interfaces.list(
        resource_group_name=self.resource_group.name
    )
    unattached = (nic for nic in group_nics if nic.virtual_machine is None)
    return next(unattached, None)
[docs]
def launch(
    self,
    image_id,
    instance_type="Standard_DS1_v2",
    user_data=None,
    name=None,
    inbound_ports=None,
    username: Optional[str] = None,
    resource_group_params: Optional[util.AzureParams] = None,
    network_security_group_params: Optional[util.AzureCreateParams] = None,
    virtual_network_params: Optional[util.AzureCreateParams] = None,
    subnet_params: Optional[util.AzureCreateParams] = None,
    ip_addresses_params: Optional[List[Optional[util.AzureCreateParams]]] = None,
    network_interfaces_params: Optional[List[Optional[util.AzureCreateParams]]] = None,
    security_type=security_types.AzureSecurityType.STANDARD,
    provisioning_timeout: Optional[int] = None,
    **kwargs,
):
    """Launch virtual machine on Azure.

    Args:
        image_id: string, Ubuntu image to use
        instance_type: string, type of instance to create
        user_data: string, user-data to pass to virtual machine
        name: string, optional name to give the vm when launching.
            Default results in a name of <tag>-vm
        inbound_ports: List of strings, optional inbound ports
            to enable in the instance.
        security_type: AzureSecurityType, security on vm image.
            Defaults to STANDARD
        username: username to use when connecting via SSH
        resource_group_params: AzureParams, options containing the resource
            group details to use.
        network_security_group_params: AzureParams, options containing the
            network security group to use.
        virtual_network_params: AzureCreateParams, options to override
            and create vnet options.
        subnet_params: AzureCreateParams, options to override and create
            subnet options.
        ip_addresses_params: list[AzureCreateParams], options to override
            and create ip_address.
        network_interfaces_params: list[AzureCreateParams],
            options to override and create NICs.
        provisioning_timeout: int, timeout in seconds for provisioning
            the VM, defaults to None i.e. use Azure's default.
        kwargs:
            - vm_params: dict to override configuration for
              virtual_machines.begin_create_or_update
            - security_type_params: dict to configure security_types

    Returns:
        Azure Instance object

    Raises:
        ValueError: on invalid image_id
        PycloudlibError: when more ``ip_addresses_params`` than
            ``network_interfaces_params`` are given
    """
    # pylint: disable-msg=too-many-locals
    # pylint: disable-msg=too-many-statements
    if not image_id:
        raise ValueError(f"{self._type} launch requires image_id param. Found: {image_id}")

    if not ip_addresses_params:
        ip_addresses_params = [None]
    if not network_interfaces_params:
        network_interfaces_params = [None]
    if len(ip_addresses_params) > len(network_interfaces_params):
        raise PycloudlibError(
            "The number of `ip_addresses_params` cannot be more than "
            "the number of `network_interfaces_params`"
        )

    self._log.debug("Launching Azure virtual machine: %s", image_id)

    # For every new launch, we need to update the tag, since
    # we are using it as a base for the name of all the
    # resources we are creating.
    self.tag = get_timestamped_tag(self.base_tag)

    # Resolve the default vm name here (same default that
    # _create_virtual_machine applies) so that the timeout handling below
    # can look the machine up by its real name instead of None.
    if not name:
        name = "{}-vm".format(self.tag)

    if self.resource_group is None or resource_group_params:
        self.resource_group = self._create_resource_group(resource_group_params)

    # We will not reuse existing network interfaces if we need to customize
    # them to enable more ports. The rationale is that we want to reuse
    # those resources only if they are generic enough.
    nic = None
    created_nics = []
    created_ip_addresses = []
    if not inbound_ports:
        # Check if we already have an existing network interface that is
        # not attached to a virtual machine. If we have, we will just
        # use it.
        # NOTE(review): network_interfaces_params was defaulted to [None]
        # above, so this condition cannot currently be true and the reuse
        # lookup is never reached - confirm whether reuse is still wanted.
        if not network_interfaces_params:
            nic = self._check_for_network_interfaces()
            if nic is not None:
                # Only keep a NIC that was actually found; a None entry
                # would break the nic id collection further down.
                created_nics.append(nic)

    if nic is None:
        self._log.debug("Could not find a network interface. Creating one now")
        virtual_network = self._create_virtual_network(
            virtual_network_params=virtual_network_params
        )
        self._log.debug("Created virtual network with name: %s", virtual_network.name)

        subnet = self._create_subnet(
            vnet_name=virtual_network.name, subnet_params=subnet_params
        )
        self._log.debug("Created subnet with name: %s", subnet.name)

        # Pad the ip address params with None so every NIC gets an address.
        ip_nics_diff = len(network_interfaces_params) - len(ip_addresses_params)
        ip_addresses = ip_addresses_params + [None for _ in range(ip_nics_diff)]
        for ip_address_ in ip_addresses:
            ip_address = self._create_ip_address(ip_address_)
            self._log.debug("Created ip address with name: %s", ip_address.name)
            created_ip_addresses.append(ip_address)
        ip_address_str = created_ip_addresses[0].ip_address

        network_security_group = self._create_network_security_group(
            inbound_ports=inbound_ports,
            network_security_group_params=network_security_group_params,
        )
        self._log.debug(
            "Created network security group with name: %s",
            network_security_group.name,
        )

        for nic_obj, ip_addr in zip(network_interfaces_params, created_ip_addresses):
            nic = self._create_network_interface_client(
                ip_address_id=ip_addr.id,
                subnet_id=subnet.id,
                nsg_id=network_security_group.id,
                nic_params=nic_obj,
            )
            created_nics.append(nic)
            self._log.debug("Created network interface with name: %s", nic.name)
    else:
        ip_address_str = self._retrieve_ip_from_network_interface(nic=created_nics[0])
        self._log.debug("Found network interface: %s. Reusing it", nic.name)

    # Tolerate explicit ``vm_params=None`` / ``security_type_params=None``.
    # A dict passed by the caller is still handed on as the same object.
    vm_params = kwargs.get("vm_params")
    if vm_params is None:
        vm_params = {}
    security_type_params = kwargs.get("security_type_params") or {}
    os_disk_encryption = security_type_params.get("os_disk_encryption", None)
    security_types.configure_security_types_vm_params(
        security_type, vm_params, os_disk_encryption
    )

    nic_ids = [nic.id for nic in created_nics]
    vm_state: VMInstanceStatus
    try:
        vm = self._create_virtual_machine(
            image_id=image_id,
            instance_type=instance_type,
            nic_ids=nic_ids,
            user_data=user_data,
            name=name,
            vm_params=vm_params,
            provisioning_timeout=provisioning_timeout,
        )
        vm_state = VMInstanceStatus.ACTIVE
    except PycloudlibTimeoutError:
        # Provisioning did not finish in time: fetch the VM as it is now
        # and hand it back marked as FAILED_PROVISION.
        self._log.error("Provisioning timeout for instance %s.", name)
        virtual_machines = self.compute_client.virtual_machines
        vm = virtual_machines.get(self.resource_group.name, name)
        vm_state = VMInstanceStatus.FAILED_PROVISION

    instance_info = {
        "vm": vm,
        "ip_address": ip_address_str,
        "rg_name": self.resource_group.name,
    }
    instance = AzureInstance(
        key_pair=self.key_pair,
        client=self.compute_client,
        instance=instance_info,
        network_client=self.network_client,
        username=username,
        get_boot_diagnostics=self._enable_boot_diagnostics,
        status=vm_state,
    )
    self.created_instances.append(instance)
    self.registered_instances[vm.name] = instance
    return instance
def _create_ssh_resource(self, key_name):
    """Create an Azure ssh public key resource in the class resource group.

    Args:
        key_name: string, The name of the ssh resource.
    """
    resource_settings = {"location": self.location, "tags": {"name": self.tag}}
    ssh_public_keys = self.compute_client.ssh_public_keys
    ssh_public_keys.create(
        self.resource_group.name,
        key_name,
        parameters=resource_settings,
    )
[docs]
def create_key_pair(self, key_name):
    """Create a pair of ssh keys in the class resource group.

    Args:
        key_name: string, The name of the ssh resource.

    Returns:
        Tuple of (public key with its "\\r\\n" line breaks removed,
        private key)
    """
    self._create_ssh_resource(key_name)
    generated = self.compute_client.ssh_public_keys.generate_key_pair(
        resource_group_name=self.resource_group.name,
        ssh_public_key_name=key_name,
    )
    # Azure's SDK returns multi-line DOS format for pubkeys.
    # OpenSSH doesn't like this format and ignores it resulting in
    # Unauthorized key errors. Issue: #88
    single_line_public_key = "".join(generated.public_key.split("\r\n"))
    return single_line_public_key, generated.private_key
[docs]
def list_keys(self):
    """Return the names of all ssh keys in the class resource group."""
    key_names = []
    group_keys = self.compute_client.ssh_public_keys.list_by_resource_group(
        self.resource_group.name
    )
    for ssh_key in group_keys:
        key_names.append(ssh_key.name)
    return key_names
[docs]
def delete_key(self, key_name):
    """Delete a ssh key from the class resource group.

    Args:
        key_name: string, The name of the ssh resource.
    """
    self.compute_client.ssh_public_keys.delete(
        resource_group_name=self.resource_group.name,
        ssh_public_key_name=key_name,
    )
[docs]
def use_key(self, public_key_path, private_key_path=None, name=None):
    """Use an existing already uploaded key.

    Delegates to the base class, referencing the key by ``self.tag`` when
    no name is given.

    Args:
        public_key_path: path to the public key to upload
        private_key_path: path to the private key to upload
        name: name to reference key by
    """
    key_name = name or self.tag
    super().use_key(public_key_path, private_key_path, key_name)
def _get_instances(self):
    """Return an iterable of Azure instances related to a subscription id.

    Returns:
        Whatever the compute client's ``virtual_machines.list_all()`` yields
    """
    virtual_machines = self.compute_client.virtual_machines
    return virtual_machines.list_all()
def _retrieve_ip_from_network_interface(self, nic):
    """Retrieve the ip address associated with a network interface.

    Only the public ip of the interface's first ip configuration is
    considered. It is searched for among the addresses returned by the
    network client's ``public_ip_addresses.list_all()``.

    Args:
        nic: An Azure network interface resource

    Returns:
        A string representing the network interface ip address

    Raises:
        PycloudlibError: no listed public ip address has the wanted id
    """
    ip_address_id = nic.ip_configurations[0].public_ip_address.id

    not_found = object()
    matching_addresses = (
        candidate.ip_address
        for candidate in self.network_client.public_ip_addresses.list_all()
        if candidate.id == ip_address_id
    )
    address = next(matching_addresses, not_found)
    if address is not not_found:
        return address

    raise PycloudlibError(
        f"\nError locating the ip address: {ip_address_id}.\n"
        "This ip address was not found in this subscription.\n"
    )
def _retrive_instance_ip(self, instance):
    """Retrieve public ip address of instance.

    Args:
        instance: An Azure Virtual Machine object

    Returns:
        A string representing the instance ip_address

    Raises:
        NetworkNotFoundError: the instance's network interface is not among
            the interfaces returned by ``network_interfaces.list_all()``
    """
    # Right now, we are only supporting getting the ip address for
    # virtual machines with only one network profile attached to it
    nic_id = instance.network_profile.network_interfaces[0].id

    # The whole listing is walked and the last interface with the wanted id
    # is the one that gets used.
    matching_nics = [
        nic for nic in self.network_client.network_interfaces.list_all() if nic.id == nic_id
    ]
    if not matching_nics:
        raise NetworkNotFoundError(resource_id=nic_id)
    return self._retrieve_ip_from_network_interface(nic=matching_nics[-1])
[docs]
def get_instance(
    self,
    instance_id,
    search_all=False,
    *,
    username: Optional[str] = None,
    **kwargs,
):
    """Get an instance by id.

    Args:
        instance_id: string, The instance name to search by
        search_all: boolean, Flag that indicates if we should search
                    for the instance in the entire reach of the
                    subscription id. If false, we will search only
                    among the instances registered on this object.
        username: username to use when connecting via SSH
        **kwargs: dictionary of other arguments to be used by this
                  method. Currently unused but provided for base
                  class compatibility.

    Returns:
        An instance object to use to manipulate the instance further.

    Raises:
        InstanceNotFoundError: no instance with that name was found
        PycloudlibError: the registered instance was already deleted
    """
    if search_all:
        for instance in self._get_instances():
            if instance.name != instance_id:
                continue

            ip_address = self._retrive_instance_ip(instance)
            resource_group_name = util.get_resource_group_name_from_id(instance.id)
            instance_info = {
                "vm": instance,
                "ip_address": ip_address,
                "rg_name": resource_group_name,
            }
            azure_instance = AzureInstance(
                key_pair=self.key_pair,
                client=self.compute_client,
                instance=instance_info,
                network_client=self.network_client,
                username=username,
            )
            # Remember it so later lookups without search_all find it too.
            self.registered_instances[instance.name] = azure_instance
            return azure_instance

        # Keyword form, consistent with the other not-found raise below.
        raise InstanceNotFoundError(resource_id=instance_id)

    if instance_id in self.registered_instances:
        instance = self.registered_instances[instance_id]
        if instance.status == VMInstanceStatus.DELETED:
            # The message used to say "image" although this is an instance.
            raise PycloudlibError(f"The instance {instance_id} was already deleted")
        return instance

    raise InstanceNotFoundError(resource_id=instance_id)
[docs]
def snapshot(self, instance, clean=True, delete_provisioned_user=True, **kwargs):
    """Snapshot an instance and generate an image from it.

    The instance is deprovisioned with waagent, shut down and generalized
    before the image is created from it. The new image is recorded in
    ``self.created_images`` and ``self.registered_images``.

    Args:
        instance: Instance to snapshot
        clean: Run instance clean method before taking snapshot
        delete_provisioned_user: Deletes the last provisioned user
        kwargs: Other named arguments specific to this implementation

    Returns:
        An image id string
    """
    if clean:
        instance.clean()

    if delete_provisioned_user:
        deprovision_command = "sudo waagent -deprovision+user -force"
    else:
        deprovision_command = "sudo waagent -deprovision -force"
    instance.execute(deprovision_command)
    instance.shutdown(wait=True)
    instance.generalize()

    self._log.debug("creating custom image from instance %s", instance.id)
    image_settings = {
        "location": self.location,
        "source_virtual_machine": {"id": instance.id},
        "tags": {"name": self.tag, "src-image-id": instance.image_id},
    }
    image = self.compute_client.images.begin_create_or_update(
        resource_group_name=self.resource_group.name,
        image_name="{}-image".format(self.tag),
        parameters=image_settings,
    ).result()

    new_image_id = image.id
    self.created_images.append(new_image_id)
    # Keep the source sku/offer next to the image name for later launches.
    self.registered_images[new_image_id] = {
        "name": image.name,
        "sku": instance.sku,
        "offer": instance.offer,
    }
    return new_image_id
[docs]
def delete_resource_group(self, resource_group_name: Optional[str] = None):
    """Delete a resource group.

    If no resource group is provided, delete self.resource_group and reset
    it to None. A group that Azure reports as not found is ignored.

    Args:
        resource_group_name: name of the group to delete

    Raises:
        PycloudlibTimeoutError: deletion did not finish within 300 seconds
    """
    target = resource_group_name
    if target is None and self.resource_group:
        target = self.resource_group.name
        self.resource_group = None

    if not target:
        return

    try:
        poller = self.resource_client.resource_groups.begin_delete(resource_group_name=target)
        poller.wait(timeout=300)
        if not poller.done():
            raise PycloudlibTimeoutError("Resource not deleted after 300 seconds")
    except ResourceNotFoundError:
        pass  # nothing to delete
# pylint: disable=broad-except
[docs]
def clean(self) -> List[Exception]:
    """Cleanup ALL artifacts associated with this Cloud instance.

    Runs the base class cleanup, then deletes every resource group recorded
    in ``self.created_resource_groups``. To ensure cleanup isn't
    interrupted, exceptions raised while deleting are collected rather
    than propagated.

    Returns:
        The exceptions gathered by the base class cleanup plus those raised
        while deleting resource groups
    """
    collected = super().clean()
    for group in self.created_resource_groups:
        try:
            self.delete_resource_group(group.name)
        except Exception as error:  # pylint: disable=broad-except
            collected.append(error)
    return collected