# This file is part of pycloudlib. See LICENSE file for license information.
"""GCE Cloud type.
This is an initial implimentation of the GCE class. It enables
authentication into the cloud, finding an image, and launching an
instance. It however, does not allow any further actions from occuring.
"""
import logging
import os
import time
from itertools import count
import googleapiclient.discovery
from pycloudlib.cloud import BaseCloud
from pycloudlib.config import ConfigFile
from pycloudlib.gce.util import raise_on_error, get_credentials
from pycloudlib.gce.instance import GceInstance
from pycloudlib.util import subp
logging.getLogger('googleapiclient.discovery').setLevel(logging.WARNING)
[docs]class GCE(BaseCloud):
"""GCE Cloud Class."""
_type = 'gce'
[docs] def __init__(
self, tag, timestamp_suffix=True, config_file: ConfigFile = None, *,
credentials_path=None, project=None, region=None, zone=None,
service_account_email=None
):
"""Initialize the connection to GCE.
Args:
tag: string used to name and tag resources with
timestamp_suffix: bool set True to append a timestamp suffix to the
tag
config_file: path to pycloudlib configuration file
credentials_path: path to credentials file for GCE
project: GCE project
region: GCE region
zone: GCE zone
service_account_email: service account to bind launched
instances to
"""
super().__init__(tag, timestamp_suffix, config_file)
self._log.debug('logging into GCE')
self.credentials_path = ''
if credentials_path:
self.credentials_path = credentials_path
elif 'GOOGLE_APPLICATION_CREDENTIALS' in os.environ:
self.credentials_path = os.environ[
'GOOGLE_APPLICATION_CREDENTIALS']
elif 'credentials_path' in self.config:
self.credentials_path = self.config['credentials_path']
credentials = get_credentials(self.credentials_path)
if not project:
if 'project' in self.config:
project = self.config['project']
elif "GOOGLE_CLOUD_PROJECT" in os.environ:
project = os.environ["GOOGLE_CLOUD_PROJECT"]
else:
command = ['gcloud', 'config', 'get-value', 'project']
exception_text = (
"Could not obtain GCE project id. Set it in the "
"pycloudlib config or setup the gcloud cli."
)
try:
result = subp(command, rcs=())
except FileNotFoundError as e:
raise Exception(exception_text) from e
if not result.ok:
exception_text += '\nstdout: {}\nstderr: {}'.format(
result.stdout, result.stderr)
raise Exception(exception_text)
project = result.stdout
# disable cache_discovery due to:
# https://github.com/google/google-api-python-client/issues/299
self.compute = googleapiclient.discovery.build(
'compute',
'v1',
cache_discovery=False,
credentials=credentials,
)
region = region or self.config.get('region') or 'us-west2'
zone = zone or self.config.get('zone') or 'a'
self.project = project
self.region = region
self.zone = '%s-%s' % (region, zone)
self.instance_counter = count()
self.service_account_email = service_account_email or self.config.get(
'service_account_meail')
def _find_image(self, release, daily, arch='amd64'):
images = self._image_list(release, daily, arch)
image_id = images[0]['id']
return 'projects/ubuntu-os-cloud-devel/global/images/%s' % image_id
[docs] def released_image(self, release, arch='amd64'):
"""ID of the latest released image for a particular release.
Args:
release: The release to look for
arch: string, architecture to use
Returns:
A single string with the latest released image ID for the
specified release.
"""
return self.daily_image(release, arch)
[docs] def daily_image(self, release, arch='amd64'):
"""Find the id of the latest image for a particular release.
Args:
release: string, Ubuntu release to look for
arch: string, architecture to use
Returns:
string, path to latest daily image
"""
self._log.debug('finding daily Ubuntu image for %s', release)
return self._find_image(release, daily=True, arch=arch)
[docs] def image_serial(self, image_id):
"""Find the image serial of the latest daily image for a particular release.
Args:
image_id: string, Ubuntu image id
Returns:
string, serial of latest image
"""
raise NotImplementedError
[docs] def delete_image(self, image_id):
"""Delete an image.
Args:
image_id: string, id of the image to delete
"""
api_image_id = self.compute.images().get(
project=self.project,
image=os.path.basename(image_id)
).execute()['id']
response = self.compute.images().delete(
project=self.project,
image=api_image_id,
).execute()
raise_on_error(response)
[docs] def get_instance(self, instance_id, name=None):
"""Get an instance by id.
Args:
instance_id: The instance ID returned upon creation
Returns:
An instance object to use to manipulate the instance further.
"""
return GceInstance(self.key_pair, instance_id,
self.project, self.zone,
self.credentials_path, name=name)
[docs] def launch(self, image_id, instance_type='n1-standard-1', user_data=None,
wait=True, **kwargs):
"""Launch instance on GCE and print the IP address.
Args:
image_id: string, image ID for instance to use
instance_type: string, instance type to launch
user_data: string, user-data to pass to instance
wait: boolean, wait for instance to come up
kwargs: other named arguments to add to instance JSON
"""
instance_name = 'i{}-{}'.format(next(self.instance_counter), self.tag)
config = {
'name': instance_name,
'machineType': 'zones/%s/machineTypes/%s' % (
self.zone, instance_type
),
'disks': [{
'boot': True,
'autoDelete': True,
'initializeParams': {
'sourceImage': image_id,
}
}],
'networkInterfaces': [{
'network': 'global/networks/default',
'accessConfigs': [
{'type': 'ONE_TO_ONE_NAT', 'name': 'External NAT'}
]
}],
"metadata": {
"items": [{
"key": "ssh-keys",
"value": "ubuntu:%s" % self.key_pair.public_key_content,
}]
},
}
if self.service_account_email:
config["serviceAccounts"] = [
{
"email": self.service_account_email
}
]
if user_data:
user_metadata = {
'key': 'user-data',
'value': user_data
}
config['metadata']['items'].append(user_metadata)
operation = self.compute.instances().insert(
project=self.project,
zone=self.zone,
body=config
).execute()
raise_on_error(operation)
result = self.compute.instances().get(
project=self.project,
zone=self.zone,
instance=instance_name,
).execute()
raise_on_error(result)
instance = self.get_instance(result['id'], name=result['name'])
if wait:
self._wait_for_operation(operation, operation_type='zone')
instance.wait()
return instance
[docs] def snapshot(self, instance: GceInstance, clean=True, **kwargs):
"""Snapshot an instance and generate an image from it.
Args:
instance: Instance to snapshot
clean: run instance clean method before taking snapshot
Returns:
An image id
"""
response = self.compute.disks().list(
project=self.project, zone=self.zone
).execute()
instance_disks = [
disk for disk in response['items'] if disk['name'] == instance.name
]
if len(instance_disks) > 1:
raise Exception(
"Snapshotting an image with multiple disks not supported")
instance.shutdown()
snapshot_name = '{}-image'.format(instance.name)
operation = self.compute.images().insert(
project=self.project,
body={
'name': snapshot_name,
'sourceDisk': instance_disks[0]['selfLink'],
}
).execute()
raise_on_error(operation)
self._wait_for_operation(operation)
return 'projects/{}/global/images/{}'.format(
self.project, snapshot_name)
def _image_list(self, release, daily, arch='amd64'):
"""Find list of images with a filter.
Args:
release: string, Ubuntu release to look for
arch: string, architecture to use
Returns:
list of dictionaries of images
"""
filters = [
'arch=%s' % arch,
'endpoint=%s' % 'https://www.googleapis.com',
'region=%s' % self.region,
'release=%s' % release,
'virt=kvm'
]
return self._streams_query(filters, daily)
def _wait_for_operation(self, operation, operation_type='global',
sleep_seconds=300):
response = None
kwargs = {
'project': self.project,
'operation': operation['name']
}
if operation_type == 'zone':
kwargs['zone'] = self.zone
api = self.compute.zoneOperations()
else:
api = self.compute.globalOperations()
for _ in range(sleep_seconds):
try:
response = api.get(**kwargs).execute()
except ConnectionResetError:
# This exception is known to be raised by GCE every so often:
# https://github.com/canonical/pycloudlib/issues/101.
response = {
"status": "ConnectionResetError", "statusMessage": "n/a",
}
else:
if response['status'] == 'DONE':
break
time.sleep(1)
else:
raise Exception(
'Expected DONE state, but found {} after waiting {} seconds. '
'Check GCE console for more details. \n'
'Status message: {}'.format(
response['status'], sleep_seconds,
response['statusMessage']
)
)