Source code for pycloudlib.gce.cloud

# This file is part of pycloudlib. See LICENSE file for license information.
"""GCE Cloud type.

This is an initial implementation of the GCE class. It enables
authentication into the cloud, finding an image, and launching an
instance. It however, does not allow any further actions from occurring.
"""

import logging
import os
import time
from itertools import count
from typing import Any, MutableMapping, Optional

from google.api_core.exceptions import GoogleAPICallError
from google.api_core.extended_operation import ExtendedOperation
from google.cloud import compute_v1

from pycloudlib.cloud import BaseCloud, ImageType
from pycloudlib.config import ConfigFile
from pycloudlib.errors import (
    CloudSetupError,
    ImageNotFoundError,
    PycloudlibError,
)
from pycloudlib.gce.instance import GceInstance
from pycloudlib.gce.util import get_credentials, raise_on_error
from pycloudlib.util import UBUNTU_RELEASE_VERSION_MAP, subp

logging.getLogger("google.cloud").setLevel(logging.WARNING)


[docs] class GCE(BaseCloud): """GCE Cloud Class.""" _type = "gce"
[docs] def __init__( self, tag: str, timestamp_suffix: bool = True, config_file: Optional[ConfigFile] = None, *, credentials_path: Optional[str] = None, project: Optional[str] = None, region: Optional[str] = None, zone: Optional[str] = None, service_account_email: Optional[str] = None, ): """Initialize the connection to GCE. Args: tag: string used to name and tag resources with timestamp_suffix: bool set True to append a timestamp suffix to the tag config_file: path to pycloudlib configuration file credentials_path: path to credentials file for GCE project: GCE project region: GCE region zone: GCE zone service_account_email: service account to bind launched instances to """ super().__init__( tag, timestamp_suffix, config_file, required_values=[credentials_path, project], ) self._log.debug("logging into GCE") self.credentials_path = "" if credentials_path: self.credentials_path = credentials_path elif "GOOGLE_APPLICATION_CREDENTIALS" in os.environ: self.credentials_path = os.environ["GOOGLE_APPLICATION_CREDENTIALS"] elif "credentials_path" in self.config: self.credentials_path = os.path.expandvars( os.path.expanduser(self.config["credentials_path"]) ) credentials = get_credentials(self.credentials_path) if not project: if "project" in self.config: project = self.config["project"] elif "GOOGLE_CLOUD_PROJECT" in os.environ: project = os.environ["GOOGLE_CLOUD_PROJECT"] else: command = ["gcloud", "config", "get-value", "project"] exception_text = ( "Could not obtain GCE project id. Set it in the " "pycloudlib config or setup the gcloud cli." ) try: result = subp(command, rcs=()) except FileNotFoundError as e: raise CloudSetupError(exception_text) from e if not result.ok: exception_text += "\nstdout: {}\nstderr: {}".format( result.stdout, result.stderr ) raise CloudSetupError(exception_text) project = result.stdout self._images_client = compute_v1.ImagesClient(credentials=credentials) self._disks_client = compute_v1.DisksClient(credentials=credentials) self._instances_client = compute_v1.InstancesClient(credentials=credentials) self._zone_operations_client = compute_v1.ZoneOperationsClient(credentials=credentials) self._global_operations_client = compute_v1.GlobalOperationsClient(credentials=credentials) region = region or self.config.get("region") or "us-west2" zone = zone or self.config.get("zone") or "a" self.project = project self.region = region self.zone = "%s-%s" % (region, zone) self.instance_counter = count() # Prefer service_account_email from pycloudlib.toml and fallback to # the credentials listed in GOOGLE_APPLICATION_CREDENTIALS otherwise self.service_account_email = ( service_account_email or self.config.get("service_account_email") or getattr(credentials, "service_account_email", None) )
[docs] def released_image(self, release, *, image_type: ImageType = ImageType.GENERIC, **kwargs): """ID of the latest released image for a particular release. Args: release: The release to look for Returns: A single string with the latest released image ID for the specified release. """ return self.daily_image(release=release, image_type=image_type)
def _get_project(self, image_type: ImageType): return ( "ubuntu-os-cloud-devel" if image_type in (ImageType.GENERIC, ImageType.MINIMAL) else "ubuntu-os-pro-cloud" ) def _get_name_filter(self, release: str, image_type: ImageType): if image_type == ImageType.GENERIC: return "daily-ubuntu-{}-{}-*".format( UBUNTU_RELEASE_VERSION_MAP[release].replace(".", ""), release ) if image_type == ImageType.MINIMAL: return "daily-ubuntu-minimal-{}-{}-*".format( UBUNTU_RELEASE_VERSION_MAP[release].replace(".", ""), release ) if image_type == ImageType.PRO: return "ubuntu-pro-{}-{}-*".format( UBUNTU_RELEASE_VERSION_MAP[release].replace(".", ""), release ) if image_type == ImageType.PRO_FIPS: return "ubuntu-pro-fips-{}-{}-*".format( UBUNTU_RELEASE_VERSION_MAP[release].replace(".", ""), release ) if image_type == ImageType.PRO_FIPS_UPDATES: return "ubuntu-pro-fips-updates-{}-{}-*".format( UBUNTU_RELEASE_VERSION_MAP[release].replace(".", ""), release ) raise ValueError("Invalid image_type: {}".format(image_type.value)) def _query_image_list(self, release: str, project: str, name_filter: str, arch: str): """Query full list of images. image list API docs: https://googleapis.github.io/google-api-python-client/docs/dyn/compute_v1.images.html#list The image list API doesn't allow filtering and sorting in one request so we need to do one of those locally. Filtering via the API results in fewer requests on average than sorting via the API. So we filter via the API and loop through all pages to get the full image list matching that filter. 500 is the maximum allowed page size Then we can sort locally and grab the latest image. Args: release: string, Ubuntu release to look for project: GCE project name_filter: name to filter with arch: images' architecture Returns: list of images matching the given filters """ filter_string = "(name={}) AND (architecture={})".format(name_filter, arch.upper()) # SPECIAL CASE # Google didn't start including architecture in image descriptions # until after xenial stopped getting published # All xenial images are x86_64, so: # 1. we can return early for non-x86_64 xenial queries # 2. for xenial + x86_64 we don't include the architecture in the # filter if release == "xenial": if arch != "x86_64": return [] filter_string = "name={}".format(name_filter) image_list = [] page_token = "" reqs = 0 while page_token is not None: try: image_list_request = compute_v1.ListImagesRequest( project=project, filter=filter_string, max_results=500, page_token=page_token, ) image_list_result = self._images_client.list(image_list_request) except GoogleAPICallError as e: raise_on_error(e) reqs += 1 image_list += image_list_result.items page_token = image_list_result.next_page_token if page_token == "": break self._log.debug( ("Fetched entire image list (%i results) matching '%s' in %i requests"), len(image_list), filter_string, reqs, ) return image_list
[docs] def daily_image( self, release: str, *, arch: str = "x86_64", image_type: ImageType = ImageType.GENERIC, **kwargs, ): """Find the id of the latest image for a particular release. Args: release: string, Ubuntu release to look for Returns: string, path to latest daily image """ self._log.debug( "finding daily Ubuntu image for arch: %s and release: %s", arch, release, ) project = self._get_project(image_type=image_type) name_filter = self._get_name_filter(release=release, image_type=image_type) image_list = self._query_image_list(release, project, name_filter, arch) if not image_list: msg = "Could not find {} image for arch: {} and release: {}".format( image_type.value, arch, release, ) self._log.warning(msg) raise ImageNotFoundError(image_type=image_type.value, arch=arch, release=release) image = sorted(image_list, key=lambda x: x.creation_timestamp)[-1] self._log.debug( 'Found image name "%s" for arch "%s"', image.name, arch, ) return "projects/{}/global/images/{}".format(project, image.id)
[docs] def image_serial(self, image_id): """Find the image serial of the latest daily image for a particular release. Args: image_id: string, Ubuntu image id Returns: string, serial of latest image """ raise NotImplementedError
[docs] def delete_image(self, image_id, **kwargs): """Delete an image. Args: image_id: string, id of the image to delete """ try: images_get_request = compute_v1.GetImageRequest( project=self.project, image=os.path.basename(image_id), ) api_image_id = str(self._images_client.get(images_get_request).id) except GoogleAPICallError as e: if "was not found" not in str(e): raise return try: delete_image_request = compute_v1.DeleteImageRequest( project=self.project, image=api_image_id, ) operation: ExtendedOperation = self._images_client.delete(delete_image_request) raise_on_error(operation) except GoogleAPICallError as e: raise_on_error(e)
[docs] def get_instance( self, instance_id, name=None, *, username: Optional[str] = None, **kwargs, ): """Get an instance by id. Args: instance_id: The instance ID returned upon creation username: username to use when connecting via SSH Returns: An instance object to use to manipulate the instance further. """ return GceInstance( self.key_pair, instance_id, self.project, self.zone, self.credentials_path, name=name, username=username, )
[docs] def launch( self, image_id, instance_type="n1-standard-1", user_data=None, *, username: Optional[str] = None, **kwargs, ): """Launch instance on GCE and print the IP address. Args: image_id: string, image ID for instance to use instance_type: string, instance type to launch user_data: string, user-data to pass to instance username: username to use when connecting via SSH kwargs: other named arguments to add to instance JSON Raises: ValueError on invalid image_id """ if not image_id: raise ValueError(f"{self._type} launch requires image_id param. Found: {image_id}") instance_name = "i{}-{}".format(next(self.instance_counter), self.tag) config: MutableMapping[str, Any] = { "name": instance_name, "machine_type": "zones/%s/machineTypes/%s" % (self.zone, instance_type), "disks": [ { "boot": True, "auto_delete": True, "initialize_params": { "source_image": image_id, }, } ], "network_interfaces": [ { "network": "global/networks/default", "access_configs": [{"type_": "ONE_TO_ONE_NAT", "name": "External NAT"}], } ], "metadata": { "items": [ { "key": "ssh-keys", "value": "ubuntu:%s" % self.key_pair.public_key_content, } ] }, } config.update(kwargs) if self.service_account_email: config["service_accounts"] = [{"email": self.service_account_email}] if user_data: user_metadata = {"key": "user-data", "value": user_data} config["metadata"]["items"].append(user_metadata) try: insert_instance_request = compute_v1.InsertInstanceRequest( project=self.project, zone=self.zone, instance_resource=config, ) operation: ExtendedOperation = self._instances_client.insert(insert_instance_request) raise_on_error(operation) except GoogleAPICallError as e: raise_on_error(e) try: instance_get_request = compute_v1.GetInstanceRequest( project=self.project, zone=self.zone, instance=instance_name, ) result = self._instances_client.get(instance_get_request) except GoogleAPICallError as e: raise_on_error(e) instance = self.get_instance(result.id, name=result.name, username=username) self.created_instances.append(instance) return instance
[docs] def snapshot(self, instance: GceInstance, clean=True, **kwargs): """Snapshot an instance and generate an image from it. Args: instance: Instance to snapshot clean: run instance clean method before taking snapshot Returns: An image id """ try: list_disks_request = compute_v1.ListDisksRequest( project=self.project, zone=self.zone, ) response = self._disks_client.list(list_disks_request) except GoogleAPICallError as e: raise_on_error(e) instance_disks = [disk for disk in response.items if disk.name == instance.name] if len(instance_disks) > 1: raise PycloudlibError("Snapshotting an image with multiple disks not supported") instance.shutdown() snapshot_name = "{}-image".format(instance.name) try: image_resource = compute_v1.Image( name=snapshot_name, source_disk=instance_disks[0].self_link, ) insert_image_request = compute_v1.InsertImageRequest( project=self.project, image_resource=image_resource, ) operation: ExtendedOperation = self._images_client.insert(insert_image_request) raise_on_error(operation) except GoogleAPICallError as e: raise_on_error(e) self._wait_for_operation(operation) image_id = "projects/{}/global/images/{}".format(self.project, snapshot_name) self.created_images.append(image_id) return image_id
def _wait_for_operation(self, operation, operation_type="global", sleep_seconds=300): if operation_type == "zone": api = self._zone_operations_client request = compute_v1.GetZoneOperationRequest( project=self.project, zone=self.zone, operation=operation.name ) else: api = self._global_operations_client request = compute_v1.GetGlobalOperationRequest( project=self.project, operation=operation.name ) for _ in range(sleep_seconds): try: response = api.get(request) except GoogleAPICallError as e: raise_on_error(e) else: if response.status == compute_v1.types.Operation.Status.DONE: break time.sleep(1) else: raise PycloudlibError( "Expected DONE state, but found {} after waiting {} seconds. " "Check GCE console for more details. \n".format(response.status, sleep_seconds) )