# This file is part of pycloudlib. See LICENSE file for license information.
"""Azure Cloud type."""
import base64
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.network import NetworkManagementClient
from azure.mgmt.compute import ComputeManagementClient
import pycloudlib.azure.util as util
from pycloudlib.cloud import BaseCloud
from pycloudlib.azure.instance import AzureInstance
from pycloudlib.config import ConfigFile
from pycloudlib.util import get_timestamped_tag, update_nested
[docs]class Azure(BaseCloud):
"""Azure Cloud Class."""
_type = 'azure'
UBUNTU_RELEASE = {
"xenial": "Canonical:UbuntuServer:16.04-DAILY-LTS",
"bionic": "Canonical:UbuntuServer:18.04-DAILY-LTS",
"focal": "Canonical:0001-com-ubuntu-server-focal-daily:20_04-daily-lts", # noqa: E501
"groovy": "Canonical:0001-com-ubuntu-server-groovy-daily:20_10-daily",
"hirsute": "Canonical:0001-com-ubuntu-server-hirsute-daily:21_04-daily", # noqa: E501
}
[docs] def __init__(
self, tag, timestamp_suffix=True, config_file: ConfigFile = None, *,
client_id=None, client_secret=None, subscription_id=None,
tenant_id=None, region=None
):
"""Initialize the connection to Azure.
Azure will try to read user credentials form the /home/$USER/.azure
folder. However, we can overwrite those credentials with the provided
id parameters.
Args:
tag: string used to name and tag resources with
timestamp_suffix: bool set True to append a timestamp suffix to the
tag
config_file: path to pycloudlib configuration file
client_id: user's client id
client_secret: user's client secret access key
subscription_id: user's subscription id key
tenant_id: user's tenant id key
region: The region where the instance will be created
"""
super().__init__(tag, timestamp_suffix, config_file)
self._log.debug('logging into Azure')
self.location = region or self.config.get('region') or 'centralus'
self.username = "ubuntu"
self.registered_instances = {}
self.registered_images = {}
config_dict = {}
client_id = client_id or self.config.get('client_id')
if client_id:
config_dict["clientId"] = client_id
client_secret = client_secret or self.config.get('client_secret')
if client_secret:
config_dict["clientSecret"] = client_secret
subscription_id = subscription_id or self.config.get('subscription_id')
if subscription_id:
config_dict["subscriptionId"] = subscription_id
tenant_id = tenant_id or self.config.get('tenant_id')
if tenant_id:
config_dict["tenantId"] = tenant_id
self.resource_client = util.get_client(
ResourceManagementClient, config_dict
)
self.network_client = util.get_client(
NetworkManagementClient, config_dict
)
self.compute_client = util.get_client(
ComputeManagementClient, config_dict
)
self.resource_group = self._create_resource_group()
self.base_tag = tag
[docs] def image_serial(self, image_id):
"""Find the image serial of the latest daily image for a particular release.
Args:
image_id: string, Ubuntu image id
Returns:
string, serial of latest image
"""
raise NotImplementedError
def _create_network_security_group(self, inbound_ports):
"""Create a network security group.
This method creates a network security groups that allows the user
to ssh into the machine and execute commands.
Args:
inbound_ports: List of strings, optional inbound ports
to enable in the instance.
Returns:
The network security object created by Azure
"""
if not inbound_ports:
inbound_ports = []
# We need to guarantee that the 22 port is enabled
# here, otherwise we will not be able to ssh into it
if "22" not in inbound_ports:
inbound_ports = ["22"] + inbound_ports
security_group_name = "{}-sgn".format(self.tag)
nsg_group = self.network_client.network_security_groups
self._log.debug('Creating Azure network security group')
security_rules = []
# The lower the number, the higher is the priority of the rule.
# We are assuming here that the SSH rule will be the first item
# in the list
priority = 300
for port in inbound_ports:
security_rules.append(
{
"name": "port-{}".format(port),
"priority": priority,
"protocol": "TCP",
"access": "Allow",
"direction": "Inbound",
"sourceAddressPrefix": "*",
"sourcePortRange": "*",
"destinationAddressPrefix": "*",
"destinationPortRange": port
}
)
priority += 10
nsg_call = nsg_group.create_or_update(
resource_group_name=self.resource_group.name,
network_security_group_name=security_group_name,
parameters={
"location": self.location,
"security_rules": security_rules,
}
)
return nsg_call.result()
def _create_resource_group(self):
"""Create a resource group.
This method creates an Azure resource group. Every other component that
we create will be contained into this resource group. This means that
if we delete this resource group, we delete all resources associated
with it.
Returns:
The resource group created by Azure
"""
resource_name = "{}-rg".format(self.tag)
self._log.debug('Creating Azure resource group')
return self.resource_client.resource_groups.create_or_update(
resource_name,
{
"location": self.location,
"tags": {
"name": self.tag
}
}
)
def _create_virtual_network(self, address_prefixes=None):
"""Create a virtual network.
This method creates an Azure virtual network to be used
when provisioning a subnet.
Args:
address_prefixes: list of strings, A list of address prefixes
to be used in this virtual network.
Returns:
The virtual network created by Azure
"""
if address_prefixes is None:
address_prefixes = ["10.0.0.0/16"]
virtual_network_name = "{}-vnet".format(self.tag)
self._log.debug('Creating Azure virtual network')
network_call = self.network_client.virtual_networks.create_or_update(
self.resource_group.name,
virtual_network_name,
{
"location": self.location,
"address_space": {
"address_prefixes": address_prefixes
},
"tags": {
"name": self.tag
}
}
)
return network_call.result()
def _create_subnet(self, vnet_name, address_prefix="10.0.0.0/24"):
"""Create a subnet.
This method creates an Azure subnet to be used when
provisioning a network interface.
Args:
address_prefix: string, An address prefix to be used for
this subnet.
Returns:
The subnet created by Azure
"""
subnet_name = "{}-subnet".format(self.tag)
self._log.debug('Creating Azure subnet')
subnet_call = self.network_client.subnets.create_or_update(
self.resource_group.name,
vnet_name,
subnet_name,
{
"address_prefix": address_prefix,
"tags": {
"name": self.tag
}
}
)
return subnet_call.result()
def _create_ip_address(self):
"""Create an ip address.
This method creates an Azure ip address to be used when
provisioning a network interface
Returns:
The ip address created by Azure
"""
ip_name = "{}-ip".format(self.tag)
self._log.debug('Creating Azure ip address')
ip_call = self.network_client.public_ip_addresses.create_or_update(
self.resource_group.name,
ip_name,
{
"location": self.location,
"sku": {"name": "Standard"},
"public_ip_allocation_method": "Static",
"rpublic_ip_address_version": "IPV4",
"tags": {
"name": self.tag
}
}
)
return ip_call.result()
def _create_network_interface_client(self, ip_address_id, subnet_id,
nsg_id):
"""Create a network interface client.
This method creates an Azure network interface to be used when
provisioning a virtual machine
Args:
ip_address_id: string, The ip address id
subnet_id: string, the subnet id
nsg_id: string, the network security group id
Returns:
The ip address created by Azure
"""
nic_name = "{}-nic".format(self.tag)
ip_config_name = "{}-ip-config".format(self.tag)
self._log.debug('Creating Azure network interface')
nic_call = self.network_client.network_interfaces.create_or_update(
self.resource_group.name,
nic_name,
{
"location": self.location,
"ip_configurations": [
{
"name": ip_config_name,
"subnet": {
"id": subnet_id
},
"public_ip_address": {
"id": ip_address_id
}
}
],
"network_security_group": {
"id": nsg_id
},
"tags": {
"name": self.tag
}
}
)
return nic_call.result()
def _create_vm_parameters(self, name, image_id, instance_type, nic_id,
user_data):
"""Create the virtual machine parameters to be used for provision.
Composes the dict that will be used to provision an Azure virtual
machine. We check if the user has passed user_data and the type of
image_id we are receiving, which can be snapshots ids or not.
Args:
name: string, The name of the virtual machine.
image_id: string, The identifier of an image.
instance_type: string, Type of instance to create.
nic_id: string, The network interface id.
user_data: string, The user data to be passed to the
virtual machine.
Returns:
A dict containing the parameters to provision a virtual machine.
"""
vm_parameters = {
"location": self.location,
"hardware_profile": {
"vm_size": instance_type
},
"storage_profile": {
"image_reference": {}
},
"os_profile": {
"computer_name": name,
"admin_username": self.username,
"linux_configuration": {
"ssh": {
"public_keys": [
{
"path": "/home/{}/.ssh/authorized_keys".format(
self.username),
"key_data": self.key_pair.public_key_content
}
]
},
"disable_password_authentication": True
}
},
"network_profile": {
"network_interfaces": [{
"id": nic_id,
}]
},
"tags": {
"name": self.tag
}
}
if user_data:
# We need to encode the user_data into base64 before sending
# it to the virtual machine.
vm_parameters["os_profile"]["custom_data"] = base64.b64encode(
user_data.encode()).decode()
vm_parameters["storage_profile"][
"image_reference"] = util.get_image_reference_params(image_id)
# We can have pro images from two different sources; marketplaces
# and snapshots. A snapshot image does not have the necessary metadata
# encoded in the image_id to create the 'plan' dict. In this case,
# we get the necessary info from the registered_images dict
# where we store the required metadata about any snapshot created by
# pycloudlib.
registered_image = self.registered_images.get(image_id)
if util.is_pro_image(image_id, registered_image):
vm_parameters["plan"] = util.get_plan_params(
image_id, registered_image)
return vm_parameters
def _create_virtual_machine(
self, image_id, instance_type, nic_id, user_data, name, vm_params=None
):
"""Create a virtual machine.
This method provisions an Azure virtual machine for the image_id
provided by the user.
Args:
image_id: string, The image to be used when provisiong
a virtual machine.
instance_type: string, Type of instance to create
nic_id: string, The network interface to used for this
virtual machine.
user_data: string, user data used by cloud-init when
booting the virtual machine.
name: string, optional name to provide when creating the vm.
vm_params: dict containing values as vm_params to send to
virtual_machines.create_or_update.
Returns:
The virtual machine created by Azure
"""
if not name:
name = "{}-vm".format(self.tag)
params = self._create_vm_parameters(
name, image_id, instance_type, nic_id, user_data
)
if vm_params:
update_nested(params, vm_params)
self._log.debug('Creating Azure virtual machine: %s', name)
vm_call = self.compute_client.virtual_machines.create_or_update(
self.resource_group.name,
name,
params,
)
return vm_call.result()
[docs] def delete_image(self, image_id):
"""Delete an image from Azure.
Args:
image_id: string, The id of the image to be deleted
"""
image_name = util.get_resource_name_from_id(image_id)
resource_group_name = util.get_resource_group_name_from_id(image_id)
delete = self.compute_client.images.delete(
resource_group_name=resource_group_name,
image_name=image_name
)
delete_resp = delete._response # pylint: disable=protected-access
resp_code = delete_resp.status_code
if resp_code in (200, 202):
self._log.debug('Image %s was deleted', image_id)
del self.registered_images[image_id]
else:
self._log.debug(
'Error deleting %s. Request returned %d',
image_id, resp_code
)
[docs] def released_image(self, release):
"""Get the released image.
With the way we are indexing our images, it is hard to differentiate
between daily and released images, since we would need to have the
version of the image to properly provision it. Due to that limitation
we are just calling the daily images method here.
Args:
release: string, Ubuntu release to look for
Returns:
string, id of latest image
"""
return self.daily_image(release)
[docs] def daily_image(self, release):
"""Find the image info for the latest daily image for a given release.
Args:
release: string, Ubuntu release to look for.
Returns:
A string representing an Ubuntu image
"""
self._log.debug('finding daily Ubuntu image for %s', release)
release = self.UBUNTU_RELEASE.get(release)
if release is None:
msg = "No Ubuntu release image found for {}. Expected one of: {}"
raise ValueError(
msg.format(release, ' '.join(self.UBUNTU_RELEASE.keys()))
)
return release
def _check_for_network_interfaces(self):
"""
Check for existing networking interfaces in instance resource group.
Check if we already have a network interface that is not attached to
any virtual machines in the instance resource group. If we have one
of those reoources, we just return it.
Returns:
An Azure network interface resource
"""
all_nics = self.network_client.network_interfaces.list(
resource_group_name=self.resource_group.name
)
for nic in all_nics:
if nic.virtual_machine is None:
return nic
return None
[docs] def launch(self, image_id, instance_type='Standard_DS1_v2',
user_data=None, wait=True, name=None,
inbound_ports=None, **kwargs):
"""Launch virtual machine on Azure.
Args:
image_id: string, Ubuntu image to use
user_data: string, user-data to pass to virtual machine
wait: boolean, wait for instance to come up
name: string, optional name to give the vm when launching.
Default results in a name of <tag>-vm
inbound_ports: List of strings, optional inbound ports
to enable in the instance.
kwargs: dict, other named arguments to provide to
virtual_machines.create_or_update
Returns:
Azure Instance object
"""
# pylint: disable-msg=too-many-locals
self._log.debug(
'Launching Azure virtual machine: %s', image_id)
# For every new launch, we need to update the tag, since
# we are using it as a base for the name of all the
# resources we are creating.
self.tag = get_timestamped_tag(self.base_tag)
if self.resource_group is None:
self.resource_group = self._create_resource_group()
# We will not reuse existing network interfaces if we need to customize
# it to enable more ports. The rationale for is that we want to reuse
# those resources only if they are generic enough
nic = None
if not inbound_ports:
# Check if we already have an existing network interface that is
# not attached to a virtual machine. If we have, we will just
# use it
nic = self._check_for_network_interfaces()
if nic is None:
self._log.debug(
'Could not find a network interface. Creating one now'
)
virtual_network = self._create_virtual_network()
self._log.debug(
'Created virtual network with name: %s', virtual_network.name
)
subnet = self._create_subnet(vnet_name=virtual_network.name)
self._log.debug(
'Created subnet with name: %s', subnet.name
)
ip_address = self._create_ip_address()
ip_address_str = ip_address.ip_address
self._log.debug(
'Created ip address with name: %s', ip_address.name
)
network_security_group = self._create_network_security_group(
inbound_ports=inbound_ports
)
self._log.debug(
'Created network security group with name: %s',
network_security_group.name
)
nic = self._create_network_interface_client(
ip_address_id=ip_address.id,
subnet_id=subnet.id,
nsg_id=network_security_group.id
)
self._log.debug(
'Created network interface with name: %s',
nic.name
)
else:
ip_address_str = self._retrieve_ip_from_network_interface(
nic=nic)
self._log.debug(
'Found network interface: %s. Reusing it', nic.name
)
vm = self._create_virtual_machine(
image_id=image_id,
instance_type=instance_type,
nic_id=nic.id,
user_data=user_data,
name=name,
vm_params=kwargs.get('vm_params', None),
)
instance_info = {
"vm": vm,
"ip_address": ip_address_str,
"rg_name": self.resource_group.name
}
instance = AzureInstance(
key_pair=self.key_pair,
client=self.compute_client,
instance=instance_info
)
if wait:
instance.wait()
self.registered_instances[vm.name] = instance
return instance
def _create_ssh_resource(self, key_name):
"""Create a ssh resource.
This method creates an Azure ssh resource to be associated
with a resource group.
Args:
key_name: string, The name of the ssh resource.
"""
self.compute_client.ssh_public_keys.create(
self.resource_group.name,
key_name,
parameters={
"location": self.location,
"tags": {
"name": self.tag
}
}
)
[docs] def create_key_pair(self, key_name):
"""Create a pair of ssh keys.
This method creates an a pair of ssh keys in
the class resource group.
Args:
key_name: string, The name of the ssh resource.
"""
self._create_ssh_resource(key_name)
ssh_call = self.compute_client.ssh_public_keys.generate_key_pair(
resource_group_name=self.resource_group.name,
ssh_public_key_name=key_name)
# Azure's SDK returns multi-line DOS format for pubkeys.
# OpenSSH doesn't like this format and ignores it resulting in
# Unauthorized key errors. Issue: #88
return ssh_call.public_key.replace('\r\n', ''), ssh_call.private_key
[docs] def list_keys(self):
"""List all ssh keys in the class resource group."""
ssh_public_keys = self.compute_client.ssh_public_keys
return [
ssh.name
for ssh in ssh_public_keys.list_by_resource_group(
self.resource_group.name)
]
[docs] def delete_key(self, key_name):
"""Delete a ssh key from the class resource group.
Args:
key_name: string, The name of the ssh resource.
"""
ssh_public_keys = self.compute_client.ssh_public_keys
ssh_public_keys.delete(
resource_group_name=self.resource_group.name,
ssh_public_key_name=key_name
)
[docs] def use_key(self, public_key_path, private_key_path=None, name=None):
"""Use an existing already uploaded key.
Args:
public_key_path: path to the public key to upload
private_key_path: path to the private key to upload
name: name to reference key by
"""
if not name:
name = self.tag
super().use_key(public_key_path, private_key_path, name)
def _get_instances(self):
"""Return an iterable of Azure instances related to a subscription id.
Returns:
An list of azure virtual machine associated with the subscription
id
"""
return self.compute_client.virtual_machines.list_all()
def _retrieve_ip_from_network_interface(self, nic):
"""Retrieve the ip address associated with a network interface.
Args:
nic: An Azure network interface resource
Return:
A string representing the network interface ip address
"""
ip_address_id = nic.ip_configurations[0].public_ip_address.id
all_ips = self.network_client.public_ip_addresses.list_all()
for ip_address in all_ips:
if ip_address.id == ip_address_id:
return ip_address.ip_address
raise RuntimeError(
"""
Error locationg the ip address: {}.
This ip address was not found in this subscription.
"""
)
def _retrive_instance_ip(self, instance):
"""Retrieve public ip address of instance.
Args:
instance: An Azure Virtual Machine object
Returns:
A string represeting the instance ip_address
"""
# Right now, we are only supporting getting the ip address for
# virtual machines with only one network profile attached to it
nic_id = instance.network_profile.network_interfaces[0].id
all_nics = self.network_client.network_interfaces.list_all()
instance_nic = None
for nic in all_nics:
if nic.id == nic_id:
instance_nic = nic
if instance_nic is None:
raise RuntimeError(
"""
Error locationg the network interface: {}.
This network interface was not found in this subscription.
"""
)
return self._retrieve_ip_from_network_interface(
nic=instance_nic
)
[docs] def get_instance(self, instance_id, search_all=False):
"""Get an instance by id.
Args:
instance_id: string, The instance name to search by
search_all: boolean, Flag that indicates that if we should search
for the instance in the entire reach of the
subsctription id. If false, we will search only
in the resource group created by this instance.
Returns:
An instance object to use to manipulate the instance further.
"""
if search_all:
all_instances = self._get_instances()
for instance in all_instances:
if instance.name == instance_id:
ip_address = self._retrive_instance_ip(instance)
resource_group_name = util.get_resource_group_name_from_id(
instance.id
)
instance_info = {
"vm": instance,
"ip_address": ip_address,
"rg_name": resource_group_name
}
azure_instance = AzureInstance(
key_pair=self.key_pair,
client=self.compute_client,
instance=instance_info
)
self.registered_instances[instance.name] = azure_instance
return azure_instance
raise Exception(
"Could not locate the instance: {}".format(instance_id)
)
if instance_id in self.registered_instances:
instance = self.registered_instances[instance_id]
if instance.status == "deleted":
raise Exception(
"The image {} was already deleted".format(instance_id)
)
return instance
raise Exception(
"Could not find {}".format(instance_id)
)
[docs] def snapshot(self, instance, clean=True, delete_provisioned_user=True,
**kwargs):
"""Snapshot an instance and generate an image from it.
Args:
instance: Instance to snapshot
clean: Run instance clean method before taking snapshot
delete_provisioned_user: Deletes the last provisioned user
kwargs: Other named arguments specific to this implementation
Returns:
An image id string
"""
if clean:
instance.clean()
user = '+user' if delete_provisioned_user else ''
instance.execute("sudo waagent -deprovision{} -force".format(user))
instance.shutdown(wait=True)
instance.generalize()
self._log.debug(
'creating custom image from instance %s', instance.id
)
response = self.compute_client.images.create_or_update(
resource_group_name=self.resource_group.name,
image_name='%s-%s' % (self.tag, "image"),
parameters={
"location": self.location,
"source_virtual_machine": {
"id": instance.id
},
"tags": {
"name": self.tag,
"src-image-id": instance.image_id
}
}
)
image = response.result()
image_id = image.id
image_name = image.name
self.registered_images[image_id] = {
"name": image_name,
"sku": instance.sku,
"offer": instance.offer
}
return image_id
[docs] def delete_resource_group(self):
"""Delete a resource group."""
if self.resource_group:
self.resource_client.resource_groups.delete(
resource_group_name=self.resource_group.name
)
self.resource_group = None