# This file is part of pycloudlib. See LICENSE file for license information.
"""Base class for all instances to provide consistent set of functions."""
from abc import ABC, abstractmethod, abstractproperty
import logging
import time
import paramiko
from paramiko.ssh_exception import (
AuthenticationException,
BadHostKeyException,
NoValidConnectionsError,
PasswordRequiredException,
SSHException
)
from pycloudlib.result import Result
from pycloudlib.util import shell_quote, shell_pack
class BaseInstance(ABC):
    """Base instance object.

    Defines the interface every cloud-specific instance must implement
    (lifecycle operations such as start/shutdown/delete) and provides
    shared helpers built on SSH/SFTP for running commands and copying
    files.
    """

    _type = 'base'

    def __init__(self, key_pair):
        """Set up instance.

        Args:
            key_pair: key pair object; its ``private_key_path`` attribute
                is used when opening SSH connections
        """
        self._log = logging.getLogger(__name__)
        self._ssh_client = None
        self._sftp_client = None
        self._tmp_count = 0

        self.boot_timeout = 120
        self.key_pair = key_pair
        self.port = '22'
        self.username = 'ubuntu'
        self.connect_timeout = 60
        self.banner_timeout = 60

    @property
    @abstractmethod
    def name(self):
        """Return instance name."""
        raise NotImplementedError

    # Declared the same way as ``name``; ``abc.abstractproperty`` is
    # deprecated in favour of stacking ``property`` on ``abstractmethod``.
    @property
    @abstractmethod
    def ip(self):
        """Return IP address of instance."""
        raise NotImplementedError

    def console_log(self):
        """Return the instance console log.

        Raises NotImplementedError if the cloud does not support fetching the
        console log for this instance.
        """
        raise NotImplementedError

    @abstractmethod
    def delete(self, wait=True):
        """Delete the instance.

        Args:
            wait: wait for instance to be deleted
        """
        raise NotImplementedError

    @abstractmethod
    def restart(self, wait=True, **kwargs):
        """Restart an instance.

        Args:
            wait: wait for the instance to be restarted
        """
        raise NotImplementedError

    @abstractmethod
    def shutdown(self, wait=True, **kwargs):
        """Shutdown the instance.

        Args:
            wait: wait for the instance to shutdown
        """
        raise NotImplementedError

    @abstractmethod
    def start(self, wait=True):
        """Start the instance.

        Args:
            wait: wait for the instance to start.
        """
        raise NotImplementedError

    def _wait_for_instance_start(self):
        """Wait for the cloud instance to be up.

        Subclasses should implement this if their cloud provides a way of
        detecting when an instance has started through their API.
        The base implementation does nothing.
        """

    def wait(self):
        """Wait for instance to be up and cloud-init to be complete."""
        self._wait_for_instance_start()
        self._wait_for_execute()
        self._wait_for_cloudinit()

    @abstractmethod
    def wait_for_delete(self):
        """Wait for instance to be deleted."""
        raise NotImplementedError

    @abstractmethod
    def wait_for_stop(self):
        """Wait for instance stop."""
        raise NotImplementedError

    def add_network_interface(self) -> str:
        """Add nic to running instance."""
        raise NotImplementedError

    def remove_network_interface(self, ip_address: str):
        """Remove nic from running instance."""
        raise NotImplementedError

    def __del__(self):
        """Cleanup of instance.

        Closes the cached SFTP and SSH clients, if any. Attributes are
        looked up defensively because ``__del__`` also runs on objects whose
        ``__init__`` never completed (e.g. a subclass constructor raised
        before calling ``super().__init__``).
        """
        # SFTP first: it rides on top of the SSH connection.
        cleanup_order = (
            ('_sftp_client', 'Failed to close SFTP connection.'),
            ('_ssh_client', 'Failed to close SSH connection.'),
        )
        for attr, failure_msg in cleanup_order:
            client = getattr(self, attr, None)
            if not client:
                continue
            try:
                client.close()
            except SSHException:
                self._log.warning(failure_msg)
            setattr(self, attr, None)

    def clean(self):
        """Clean an instance to make it look pristine.

        This will clean out specifically the cloud-init files and system logs.
        """
        self.execute('sudo cloud-init clean --logs')
        self.execute('sudo rm -rf /var/log/syslog')

    def _run_command(self, command, stdin):
        """Run command in the instance.

        This base implementation accepts no extra keyword arguments, so any
        ``**kwargs`` given to ``execute`` must be handled by a subclass
        override of this method.

        Args:
            command: iterable of command arguments
            stdin: content for standard in, or None

        Returns:
            Result object
        """
        return self._ssh(list(command), stdin=stdin)

    def execute(self, command, stdin=None, description=None, *,
                use_sudo=False, **kwargs):
        """Execute command in instance, recording output, error and exit code.

        Assumes functional networking and execution with the target filesystem
        being available at /.

        Args:
            command: the command to execute as root inside the image. If
                     command is a string, then it will be executed as:
                     `['sh', '-c', command]`
            stdin: bytes content for standard in
            description: purpose of command
            use_sudo: boolean to run the command as sudo
            **kwargs: forwarded unchanged to ``_run_command``

        Returns:
            Result object

        Raises SSHException if there are any problem with the ssh connection
        """
        if isinstance(command, str):
            command = ['sh', '-c', command]

        if use_sudo:
            # list() so tuples and other sequences can be prefixed as well.
            command = ['sudo', '--'] + list(command)

        quoted = shell_quote(command)
        self._log.info('executing: %s', quoted)
        if description:
            self._log.debug(description)
        else:
            self._log.debug('executing: %s', quoted)

        return self._run_command(command, stdin, **kwargs)

    def install(self, packages):
        """Install specific packages.

        Args:
            packages: string or list of package(s) to install. A string is
                split on any run of whitespace.

        Returns:
            result from install
        """
        if isinstance(packages, str):
            # split() without an argument ignores repeated, leading and
            # trailing whitespace, so no empty package names are produced.
            packages = packages.split()

        self.execute(['sudo', 'apt-get', 'update'])
        return self.execute(
            [
                'DEBIAN_FRONTEND=noninteractive',
                'sudo', 'apt-get', 'install', '--yes'
            ] + list(packages)
        )

    def pull_file(self, remote_path, local_path):
        """Copy file at 'remote_path', from instance to 'local_path'.

        Args:
            remote_path: path on remote instance
            local_path: local path

        Raises SSHException if there are any problem with the ssh connection
        """
        self._log.debug('pulling file %s to %s', remote_path, local_path)
        sftp = self._sftp_connect()
        sftp.get(remote_path, local_path)

    def push_file(self, local_path, remote_path):
        """Copy file at 'local_path' to instance at 'remote_path'.

        Args:
            local_path: local path
            remote_path: path on remote instance

        Raises SSHException if there are any problem with the ssh connection
        """
        self._log.debug('pushing file %s to %s', local_path, remote_path)
        sftp = self._sftp_connect()
        sftp.put(local_path, remote_path)

    def run_script(self, script, description=None):
        """Run script in target and return stdout.

        Args:
            script: script contents
            description: purpose of script

        Returns:
            result from script execution

        Raises SSHException if there are any problem with the ssh connection
        """
        # Just write to a file, add execute, run it, then remove it.
        # The script body arrives on stdin; "$1" is the temp file path.
        shblob = '; '.join((
            'set -e',
            's="$1"',
            'shift',
            'cat > "$s"',
            'trap "rm -f $s" EXIT',
            'chmod +x "$s"',
            '"$s" "$@"'))
        return self.execute(
            ['sh', '-c', shblob, 'runscript', self._tmpfile()],
            stdin=script, description=description)

    def update(self):
        """Run apt-get update/upgrade on instance.

        Returns:
            result from upgrade
        """
        self.execute(['sudo', 'apt-get', 'update'])
        return self.execute([
            'DEBIAN_FRONTEND=noninteractive',
            'sudo', 'apt-get', '--yes', 'upgrade'
        ])

    def _ssh(self, command, stdin=None):
        """Run a command via SSH.

        Args:
            command: string or list of the command to run
            stdin: optional, values to be passed in

        Returns:
            Result object built from stdout, stderr and the return code

        Raises SSHException if the command cannot be started
        """
        cmd = shell_pack(command)
        client = self._ssh_connect()
        try:
            fp_in, fp_out, fp_err = client.exec_command(cmd)
        except (ConnectionResetError, NoValidConnectionsError) as e:
            raise SSHException from e
        channel = fp_in.channel

        if stdin is not None:
            fp_in.write(stdin)
            fp_in.close()

        # Signal EOF on the remote stdin so the command does not block
        # waiting for more input.
        channel.shutdown_write()

        out = fp_out.read()
        err = fp_err.read()
        return_code = channel.recv_exit_status()

        out = '' if not out else out.rstrip().decode("utf-8")
        err = '' if not err else err.rstrip().decode("utf-8")

        return Result(out, err, return_code)

    @staticmethod
    def _is_active(transport):
        """Return True if 'transport' exists and reports itself active."""
        return transport is not None and transport.is_active()

    def _ssh_connect(self):
        """Connect to instance via SSH.

        Returns:
            the cached client if its transport is still active, otherwise a
            newly connected client (which is then cached)

        Raises SSHException if the connection cannot be established
        """
        # get_transport() yields None for a closed/never-connected client,
        # so it must be checked before asking whether it is active.
        if self._ssh_client and self._is_active(
                self._ssh_client.get_transport()):
            return self._ssh_client

        logging.getLogger("paramiko").setLevel(logging.INFO)
        client = paramiko.SSHClient()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        # Paramiko can barf on valid keys when initializing this way,
        # so the check here is _only_ for checking if we have a
        # password protected keyfile. The filename is passed directly
        # when connecting
        try:
            paramiko.RSAKey.from_private_key_file(
                self.key_pair.private_key_path)
        except PasswordRequiredException:
            self._log.warning(
                "The specified key (%s) requires a passphrase. If you have not"
                " added this key to a running SSH agent, you will see failures"
                " to connect after a long timeout.",
                self.key_pair.private_key_path,
            )
        except SSHException:
            pass

        try:
            client.connect(
                username=self.username,
                hostname=self.ip,
                port=int(self.port),
                timeout=self.connect_timeout,
                banner_timeout=self.banner_timeout,
                key_filename=self.key_pair.private_key_path,
            )
        except (ConnectionRefusedError, AuthenticationException,
                BadHostKeyException, ConnectionResetError, SSHException,
                OSError) as e:
            # Callers retry in a loop; release this attempt's resources
            # instead of leaving a half-open client behind on each failure.
            client.close()
            raise SSHException from e

        self._ssh_client = client
        return client

    def _sftp_connect(self):
        """Connect to instance via SFTP.

        Returns:
            the cached SFTP client if its underlying transport is still
            active, otherwise a new one opened over the SSH connection

        Raises SSHException if the SSH connection cannot be established
        """
        if self._sftp_client:
            channel = self._sftp_client.get_channel()
            if channel is not None and self._is_active(
                    channel.get_transport()):
                return self._sftp_client

        logging.getLogger("paramiko").setLevel(logging.INFO)

        # Reuse the cached SSH connection, or open one if needed.
        client = self._ssh_connect()
        sftpclient = client.open_sftp()

        self._sftp_client = sftpclient
        return sftpclient

    def _tmpfile(self):
        """Get a tmp file in the target.

        Each call returns a new path, numbered by a per-instance counter.

        Returns:
            path to new file in target
        """
        path = "/tmp/%s-%04d" % (type(self).__name__, self._tmp_count)
        self._tmp_count += 1
        return path

    def _wait_for_execute(self):
        """Wait until we can execute a command in the instance.

        Raises OSError if no command could be executed within 10 minutes
        """
        self._log.debug('_wait_for_execute to complete')

        test_instance_command = "whoami"

        # Wait 10 minutes before failing
        start = time.time()
        end = start + 600
        while time.time() < end:
            try:
                result = self.execute(test_instance_command)
                if result.ok:
                    return
            except SSHException:
                # Instance not reachable yet; try again shortly.
                pass
            time.sleep(1)

        raise OSError(
            "{}\n{}".format(
                "Instance can't be reached after 10 minutes. ",
                "Failed to execute {} command".format(
                    test_instance_command)
            )
        )

    def _wait_for_cloudinit(self):
        """Wait until cloud-init has finished."""
        self._log.debug('_wait_for_cloudinit to complete')

        if self.execute(['which', 'systemctl']).ok:
            # We may have issues with cloud-init status early boot, so also
            # ensure our cloud-init.target is active as an extra layer of
            # protection against connecting before the system is ready
            for _ in range(300):
                if self.execute([
                        'systemctl', 'is-active', 'cloud-init.target'
                ]).ok:
                    break
                time.sleep(1)

        cmd = ["cloud-init", "status", "--wait", "--long"]
        self.execute(cmd, description='waiting for start')