Source code for

# pylint: disable=E1101
# This file is part of pycloudlib. See LICENSE file for license information.
"""OCI Cloud type."""

import base64
import json
import os
import re

import oci

from import BaseCloud
from pycloudlib.config import ConfigFile
from pycloudlib.oci.instance import OciInstance
from pycloudlib.oci.utils import wait_till_ready
from pycloudlib.util import UBUNTU_RELEASE_VERSION_MAP, subp

[docs]class OCI(BaseCloud): """OCI (Oracle) cloud class.""" _type = 'oci'
[docs] def __init__( self, tag, timestamp_suffix=True, config_file: ConfigFile = None, *, availability_domain=None, compartment_id=None, config_path=None ): """ Initialize the connection to OCI. OCI must be initialized on the CLI first: Args: tag: Name of instance timestamp_suffix: bool set True to append a timestamp suffix to the tag config_file: path to pycloudlib configuration file compartment_id: A compartment found at availability_domain: One of the availability domains from: 'oci iam availability-domain list' config_path: Path of OCI config file """ super().__init__(tag, timestamp_suffix, config_file) self.availability_domain = availability_domain or self.config[ 'availability_domain' ] compartment_id = compartment_id or self.config.get('compartment_id') if not compartment_id: command = ['oci', 'iam', 'compartment', 'get'] exception_text = ( "Could not obtain OCI compartment id. Has the CLI client been " "setup?\nCommand attempted: '{}'".format(' '.join(command)) ) try: result = subp(command, rcs=()) except FileNotFoundError as e: raise Exception(exception_text) from e if not result.ok: exception_text += '\nstdout: {}\nstderr: {}'.format( result.stdout, result.stderr) raise Exception(exception_text) compartment_id = json.loads(result.stdout)['data']['id'] self.compartment_id = compartment_id config_path = ( config_path or self.config.get('config_path') or '~/.oci/config' ) if not os.path.isfile(os.path.expanduser(config_path)): raise ValueError( '{} is not a valid config file. Pass a valid config ' 'file.'.format(config_path)) self.oci_config = oci.config.from_file(config_path) self._log.debug('Logging into OCI') self.compute_client = oci.core.ComputeClient(self.oci_config) self.network_client = oci.core.VirtualNetworkClient(self.oci_config)
[docs] def delete_image(self, image_id): """Delete an image. Args: image_id: string, id of the image to delete """ self.compute_client.delete_image(image_id)
[docs] def released_image(self, release, operating_system='Canonical Ubuntu'): """Get the released image. OCI just has periodic builds, so "released" and "daily" don't really make sense here. Just call the same code for both Args: release: string, Ubuntu release to look for operating_system: string, operating system to use Returns: string, id of latest image """ return self.daily_image(release, operating_system)
[docs] def daily_image(self, release, operating_system='Canonical Ubuntu'): """Get the daily image. OCI just has periodic builds, so "released" and "daily" don't really make sense here. Just call the same code for both. Should be equivalent to the cli call: oci compute image list \ --operating-system="Canonical Ubuntu" \ --operating-system-version="<xx.xx>" \ --sort-by='TIMECREATED' \ --sort-order='DESC' Args: release: string, Ubuntu release to look for operating_system: string, Operating system to use Returns: string, id of latest image """ if operating_system == 'Canonical Ubuntu': if not re.match(r'^\d{2}\.\d{2}$', release): # 18.04, 20.04, etc try: release = UBUNTU_RELEASE_VERSION_MAP[release] except KeyError as e: raise ValueError("Invalid release") from e # OCI likes to keep a few of each image around, so sort by # timestamp descending and grab the first (most recent) one image_response = self.compute_client.list_images( self.compartment_id, operating_system=operating_system, operating_system_version=release, sort_by='TIMECREATED', sort_order='DESC' ) matching_image = [ i for i in if 'aarch64' not in i.display_name and 'GPU' not in i.display_name ] image_id = matching_image[0].id return image_id
[docs] def image_serial(self, image_id): """Find the image serial of the latest daily image for a particular release. Args: image_id: string, Ubuntu image id Returns: string, serial of latest image """ raise NotImplementedError
[docs] def get_instance(self, instance_id): """Get an instance by id. Args: instance_id: Returns: An instance object to use to manipulate the instance further. """ try: self.compute_client.get_instance(instance_id) except oci.exceptions.ServiceError as e: raise Exception( "Unable to retrieve instance with id: {} . " "Is it a valid instance id?".format(instance_id)) from e return OciInstance( key_pair=self.key_pair, instance_id=instance_id, compartment_id=self.compartment_id, oci_config=self.oci_config, )
[docs] def launch(self, image_id, instance_type='VM.Standard2.1', user_data=None, wait=True, **kwargs): """Launch an instance. Args: image_id: string, image ID to use for the instance instance_type: string, type of instance to create. user_data: used by Cloud-Init to run custom scripts or provide custom Cloud-Init configuration wait: wait for instance to be live **kwargs: dictionary of other arguments to pass as LaunchInstanceDetails Returns: An instance object to use to manipulate the instance further. """ vcn_id = self.network_client.list_vcns( self.compartment_id).data[0].id subnet = self.network_client.list_subnets( self.compartment_id, vcn_id=vcn_id).data[0] subnet_id = metadata = { 'ssh_authorized_keys': self.key_pair.public_key_content, } if user_data: metadata['user_data'] = base64.b64encode( user_data.encode('utf8')).decode('ascii') instance_details = oci.core.models.LaunchInstanceDetails( display_name=self.tag, availability_domain=self.availability_domain, compartment_id=self.compartment_id, shape=instance_type, subnet_id=subnet_id, image_id=image_id, metadata=metadata, **kwargs ) instance_data = self.compute_client.launch_instance( instance_details).data instance = self.get_instance( if wait: wait_till_ready( func=self.compute_client.get_instance, current_data=instance_data, desired_state='RUNNING', ) instance.wait() return instance
[docs] def snapshot(self, instance, clean=True, name=None): """Snapshot an instance and generate an image from it. Args: instance: Instance to snapshot clean: run instance clean method before taking snapshot name: (Optional) Name of created image Returns: An image object """ if clean: instance.clean() image_details = { 'compartment_id': self.compartment_id, 'instance_id': instance.instance_id, } if name: image_details['display_name'] = name image_data = self.compute_client.create_image( oci.core.models.CreateImageDetails(**image_details) ).data image_data = wait_till_ready( func=self.compute_client.get_image, current_data=image_data, desired_state='AVAILABLE' ) return