Source code for pycloudlib.instance

# This file is part of pycloudlib. See LICENSE file for license information.
"""Base class for all instances to provide consistent set of functions."""

from abc import ABC, abstractmethod, abstractproperty
import logging
import time

import paramiko
from paramiko.ssh_exception import (
    AuthenticationException,
    BadHostKeyException,
    NoValidConnectionsError,
    PasswordRequiredException,
    SSHException
)

from pycloudlib.result import Result
from pycloudlib.util import shell_quote, shell_pack


class BaseInstance(ABC):
    """Base instance object.

    Defines the interface every cloud-specific instance implements
    (lifecycle operations such as start, shutdown, restart and delete)
    and provides shared helpers built on SSH/SFTP: command execution,
    file transfer, package installation and boot/cloud-init waiting.
    """

    _type = 'base'

    def __init__(self, key_pair):
        """Set up instance.

        Args:
            key_pair: key pair object; its ``private_key_path`` attribute
                      is used when connecting over SSH
        """
        self._log = logging.getLogger(__name__)
        self._ssh_client = None
        self._sftp_client = None
        self._tmp_count = 0

        self.boot_timeout = 120
        self.key_pair = key_pair
        self.port = '22'
        self.username = 'ubuntu'
        self.connect_timeout = 60
        self.banner_timeout = 60

    @property
    @abstractmethod
    def name(self):
        """Return instance name."""
        raise NotImplementedError

    @property
    @abstractmethod
    def ip(self):
        """Return IP address of instance."""
        raise NotImplementedError

    def console_log(self):
        """Return the instance console log.

        Raises NotImplementedError if the cloud does not support fetching the
        console log for this instance.
        """
        raise NotImplementedError

    @abstractmethod
    def delete(self, wait=True):
        """Delete the instance.

        Args:
            wait: wait for instance to be deleted
        """
        raise NotImplementedError

    @abstractmethod
    def restart(self, wait=True, **kwargs):
        """Restart an instance.

        Args:
            wait: wait for the instance to be restarted
        """
        raise NotImplementedError

    @abstractmethod
    def shutdown(self, wait=True, **kwargs):
        """Shutdown the instance.

        Args:
            wait: wait for the instance to shutdown
        """
        raise NotImplementedError

    @abstractmethod
    def start(self, wait=True):
        """Start the instance.

        Args:
            wait: wait for the instance to start.
        """
        raise NotImplementedError

    def _wait_for_instance_start(self):
        """Wait for the cloud instance to be up.

        Subclasses should implement this if their cloud provides a way of
        detecting when an instance has started through their API.
        """

    def wait(self):
        """Wait for instance to be up and cloud-init to be complete."""
        self._wait_for_instance_start()
        self._wait_for_execute()
        self._wait_for_cloudinit()

    @abstractmethod
    def wait_for_delete(self):
        """Wait for instance to be deleted."""
        raise NotImplementedError

    @abstractmethod
    def wait_for_stop(self):
        """Wait for instance stop."""
        raise NotImplementedError

    def add_network_interface(self) -> str:
        """Add nic to running instance.

        Raises NotImplementedError unless overridden by a subclass.
        """
        raise NotImplementedError

    def remove_network_interface(self, ip_address: str):
        """Remove nic from running instance.

        Raises NotImplementedError unless overridden by a subclass.
        """
        raise NotImplementedError

    def __del__(self):
        """Cleanup of instance.

        Closes the SFTP and SSH clients if they were opened.
        """
        # getattr() with a default: __del__ also runs on objects whose
        # __init__ never completed, where these attributes do not exist.
        if getattr(self, '_sftp_client', None):
            try:
                self._sftp_client.close()
            except SSHException:
                self._log.warning('Failed to close SFTP connection.')
            self._sftp_client = None

        if getattr(self, '_ssh_client', None):
            try:
                self._ssh_client.close()
            except SSHException:
                self._log.warning('Failed to close SSH connection.')
            self._ssh_client = None

    def clean(self):
        """Clean an instance to make it look pristine.

        This will clean out specifically the cloud-init files and system logs.
        """
        self.execute('sudo cloud-init clean --logs')
        self.execute('sudo rm -rf /var/log/syslog')

    def _run_command(self, command, stdin):
        """Run command in the instance.

        Args:
            command: iterable of command arguments
            stdin: content for standard in, or None

        Returns:
            Result object
        """
        return self._ssh(list(command), stdin=stdin)

    def execute(self, command, stdin=None, description=None, *,
                use_sudo=False, **kwargs):
        """Execute command in instance, recording output, error and exit code.

        Assumes functional networking and execution with the target filesystem
        being available at /.

        Args:
            command: the command to execute as root inside the image. If
                     command is a string, then it will be executed as:
                     `['sh', '-c', command]`
            stdin: bytes content for standard in
            description: purpose of command
            use_sudo: boolean to run the command as sudo
            **kwargs: passed through unchanged to `_run_command`

        Returns:
            Result object

        Raises SSHException if there are any problem with the ssh connection
        """
        if isinstance(command, str):
            command = ['sh', '-c', command]
        else:
            # Normalise so tuples and other iterables can be prefixed below.
            command = list(command)

        if use_sudo:
            command = ['sudo', '--'] + command

        quoted = shell_quote(command)
        self._log.info('executing: %s', quoted)
        if description:
            self._log.debug(description)
        else:
            self._log.debug('executing: %s', quoted)

        return self._run_command(command, stdin, **kwargs)

    def install(self, packages):
        """Install specific packages.

        Args:
            packages: string or list of package(s) to install

        Returns:
            result from install
        """
        if isinstance(packages, str):
            # split() with no argument drops the empty names that
            # split(' ') produces for repeated or leading whitespace.
            packages = packages.split()
        else:
            packages = list(packages)

        self.execute(['sudo', 'apt-get', 'update'])
        # The variable is set through env(1) *after* sudo: as the first
        # element of an argv list 'VAR=value' is taken as the program name
        # rather than an assignment, and sudo resets the environment of the
        # command it runs by default.
        return self.execute(
            [
                'sudo', 'env', 'DEBIAN_FRONTEND=noninteractive',
                'apt-get', 'install', '--yes'
            ] + packages
        )

    def pull_file(self, remote_path, local_path):
        """Copy file at 'remote_path', from instance to 'local_path'.

        Args:
            remote_path: path on remote instance
            local_path: local path

        Raises SSHException if there are any problem with the ssh connection
        """
        self._log.debug('pulling file %s to %s', remote_path, local_path)

        sftp = self._sftp_connect()
        sftp.get(remote_path, local_path)

    def push_file(self, local_path, remote_path):
        """Copy file at 'local_path' to instance at 'remote_path'.

        Args:
            local_path: local path
            remote_path: path on remote instance

        Raises SSHException if there are any problem with the ssh connection
        """
        self._log.debug('pushing file %s to %s', local_path, remote_path)

        sftp = self._sftp_connect()
        sftp.put(local_path, remote_path)

    def run_script(self, script, description=None):
        """Run script in target and return stdout.

        Args:
            script: script contents
            description: purpose of script

        Returns:
            result from script execution

        Raises SSHException if there are any problem with the ssh connection
        """
        # Just write to a file, add execute, run it, then remove it.
        # The script contents arrive on stdin ('cat > "$s"'); the EXIT trap
        # removes the temporary file however the script terminates.
        shblob = '; '.join((
            'set -e',
            's="$1"',
            'shift',
            'cat > "$s"',
            'trap "rm -f $s" EXIT',
            'chmod +x "$s"',
            '"$s" "$@"'))
        return self.execute(
            ['sh', '-c', shblob, 'runscript', self._tmpfile()],
            stdin=script, description=description)

    def update(self):
        """Run apt-get update/upgrade on instance.

        Returns:
            result from upgrade
        """
        self.execute(['sudo', 'apt-get', 'update'])
        # See install(): the variable has to be set via env(1) after sudo
        # to reach apt-get when the command is given as an argv list.
        return self.execute([
            'sudo', 'env', 'DEBIAN_FRONTEND=noninteractive',
            'apt-get', '--yes', 'upgrade'
        ])

    def _ssh(self, command, stdin=None):
        """Run a command via SSH.

        Args:
            command: string or list of the command to run
            stdin: optional, values to be passed in

        Returns:
            Result object built from stdout, stderr and the return code

        Raises SSHException if the connection drops while starting the
        command
        """
        cmd = shell_pack(command)
        client = self._ssh_connect()
        try:
            fp_in, fp_out, fp_err = client.exec_command(cmd)
        except (ConnectionResetError, NoValidConnectionsError) as e:
            raise SSHException(str(e)) from e
        channel = fp_in.channel

        if stdin is not None:
            fp_in.write(stdin)
            fp_in.close()

        # Signal end of input so the remote command is not left waiting.
        channel.shutdown_write()

        out = fp_out.read()
        err = fp_err.read()
        return_code = channel.recv_exit_status()

        out = '' if not out else out.rstrip().decode("utf-8")
        err = '' if not err else err.rstrip().decode("utf-8")

        return Result(out, err, return_code)

    def _ssh_connect(self):
        """Connect to instance via SSH.

        Reuses the cached client while its transport is active, otherwise
        opens a new connection and caches it.

        Returns:
            connected paramiko SSHClient

        Raises SSHException if the connection cannot be established
        """
        if self._ssh_client:
            # get_transport() returns None once a client has been closed.
            transport = self._ssh_client.get_transport()
            if transport is not None and transport.is_active():
                return self._ssh_client

        logging.getLogger("paramiko").setLevel(logging.INFO)
        client = paramiko.SSHClient()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        # Paramiko can barf on valid keys when initializing this way,
        # so the check here is _only_ for checking if we have a
        # password protected keyfile. The filename is passed directly
        # when connecting
        try:
            paramiko.RSAKey.from_private_key_file(
                self.key_pair.private_key_path)
        except PasswordRequiredException:
            self._log.warning(
                "The specified key (%s) requires a passphrase. If you have not"
                " added this key to a running SSH agent, you will see failures"
                " to connect after a long timeout.",
                self.key_pair.private_key_path,
            )
        except SSHException:
            # Deliberately ignored, see the comment above.
            pass

        try:
            client.connect(
                username=self.username,
                hostname=self.ip,
                port=int(self.port),
                timeout=self.connect_timeout,
                banner_timeout=self.banner_timeout,
                key_filename=self.key_pair.private_key_path,
            )
        except (ConnectionRefusedError, AuthenticationException,
                BadHostKeyException, ConnectionResetError, SSHException,
                OSError) as e:
            # Callers retry in a loop; release the failed client right away
            # instead of leaving it for the garbage collector.
            client.close()
            raise SSHException(str(e)) from e
        self._ssh_client = client
        return client

    def _sftp_connect(self):
        """Connect to instance via SFTP.

        Reuses the cached SFTP client while its transport is active,
        otherwise opens a new SFTP session and caches it.

        Returns:
            paramiko SFTP client

        Raises SSHException if the SSH connection cannot be established
        """
        if self._sftp_client:
            channel = self._sftp_client.get_channel()
            transport = None
            if channel is not None:
                transport = channel.get_transport()
            if transport is not None and transport.is_active():
                return self._sftp_client

        logging.getLogger("paramiko").setLevel(logging.INFO)

        # _ssh_connect() reuses an active SSH connection or opens a new one.
        client = self._ssh_connect()
        sftpclient = client.open_sftp()
        self._sftp_client = sftpclient
        return sftpclient

    def _tmpfile(self):
        """Get a tmp file in the target.

        Names are numbered per instance object, so successive calls return
        distinct paths.

        Returns:
            path to new file in target
        """
        path = "/tmp/%s-%04d" % (type(self).__name__, self._tmp_count)
        self._tmp_count += 1
        return path

    def _wait_for_execute(self):
        """Wait until we can execute a command in the instance.

        Raises OSError if no command could be executed within 10 minutes;
        the last SSH error seen, if any, is attached as its cause.
        """
        self._log.debug('_wait_for_execute to complete')

        test_instance_command = "whoami"
        last_error = None

        # Wait 10 minutes before failing. A monotonic clock keeps the
        # deadline unaffected by adjustments of the system clock.
        deadline = time.monotonic() + 600
        while time.monotonic() < deadline:
            try:
                result = self.execute(test_instance_command)
                if result.ok:
                    return
            except SSHException as e:
                last_error = e
            time.sleep(1)

        raise OSError(
            "{}\n{}".format(
                "Instance can't be reached after 10 minutes. ",
                "Failed to execute {} command".format(
                    test_instance_command)
            )
        ) from last_error

    def _wait_for_cloudinit(self):
        """Wait until cloud-init has finished."""
        self._log.debug('_wait_for_cloudinit to complete')

        if self.execute(['which', 'systemctl']).ok:
            # We may have issues with cloud-init status early boot, so also
            # ensure our cloud-init.target is active as an extra layer of
            # protection against connecting before the system is ready
            for _ in range(300):
                if self.execute([
                        'systemctl', 'is-active', 'cloud-init.target'
                ]).ok:
                    break
                time.sleep(1)

        cmd = ["cloud-init", "status", "--wait", "--long"]
        self.execute(cmd, description='waiting for start')