# This file is part of pycloudlib. See LICENSE file for license information.
# pylint: disable=too-many-public-methods
"""Base class for all instances to provide consistent set of functions."""
import logging
import time
from abc import ABC, abstractmethod
from contextlib import suppress
from typing import List, Optional
import paramiko
from paramiko.ssh_exception import (
AuthenticationException,
BadHostKeyException,
NoValidConnectionsError,
PasswordRequiredException,
SSHException,
)
from pycloudlib.errors import CleanupError, PycloudlibTimeoutError
from pycloudlib.result import Result
from pycloudlib.util import log_exception_list, shell_pack, shell_quote
# Base class for all instances (see module docstring).
class BaseInstance(ABC):
    """Base instance object.

    Provides the cloud-agnostic behaviour (SSH/SFTP access, command
    execution, waiting for boot and cloud-init) and declares the abstract
    lifecycle operations that each cloud-specific subclass must implement.
    """

    _type = "base"
    # Time to wait for instance to be ready after provisioning (in minutes)
    ready_timeout = 10

    def __init__(self, key_pair, username: Optional[str] = None):
        """Set up instance.

        Args:
            key_pair: object exposing ``private_key_path``; used for SSH
            username: login user for SSH; defaults to "ubuntu"
        """
        self._log = logging.getLogger(__name__)
        self._ssh_client = None
        self._sftp_client = None
        self._tmp_count = 0

        self.boot_timeout = 120
        self.key_pair = key_pair
        # Kept as a string; converted with int() when connecting.
        self.port = "22"
        self.username = username or "ubuntu"
        self.connect_timeout = 60
        self.banner_timeout = 60

    def __enter__(self):
        """Enter context manager for this class."""
        return self

    def __exit__(self, _type, _value, _traceback):
        """Exit context manager for this class.

        Deletes the instance and raises CleanupError if the deletion
        reported any exceptions.
        """
        exceptions = self.delete()
        log_exception_list(exceptions)
        if exceptions:
            raise CleanupError(exceptions)

    @property
    @abstractmethod
    def id(self) -> str:
        """Return instance id."""
        raise NotImplementedError

    @property
    def private_ip(self) -> str:
        """Return instance private ip."""
        raise NotImplementedError

    @property
    @abstractmethod
    def name(self):
        """Return instance name."""
        raise NotImplementedError

    @property
    @abstractmethod
    def ip(self):
        """Return IP address of instance."""
        raise NotImplementedError

    def get_boot_id(self):
        """Get the instance boot_id.

        Returns:
            the result of reading /proc/sys/kernel/random/boot_id on the
            instance (the boot UUID)

        Raises:
            OSError: if the command ran but reported failure
        """
        result = self.execute("cat /proc/sys/kernel/random/boot_id", no_log=True)
        if result.failed:
            raise OSError(
                f"Failed to get boot_id. Return code: {result.return_code}, "
                f"stdout: {result.stdout}, stderr: {result.stderr}"
            )
        return result

    def console_log(self):
        """Return the instance console log.

        Raises NotImplementedError if the cloud does not support fetching the
        console log for this instance.
        """
        raise NotImplementedError

    @abstractmethod
    def delete(self, wait=True) -> List[Exception]:
        """Delete the instance.

        Args:
            wait: wait for instance to be deleted

        Returns:
            list of exceptions encountered while deleting
        """
        raise NotImplementedError

    def restart(self, wait=True, **kwargs):
        """Restart an instance.

        Args:
            wait: wait for the instance to come back after the restart
            **kwargs: forwarded to the subclass's ``_do_restart``
        """
        self._sync_filesystem()

        # Case 1: not waiting, just call subclass's restart and return.
        if not wait:
            self._do_restart(**kwargs)
            return

        # If we attempt to restart, but the instance is already in a
        # non-connectable state, then don't check boot ids.
        try:
            pre_boot_id = self.get_boot_id()
        except (SSHException, OSError):
            # Case 2: wait=True, but the instance is unreachable.
            # The best we can do is to send a reboot signal and wait.
            self._log.debug("Instance seems down; will attempt restart and wait.")
            # Forward the caller's options here too, as in the other paths.
            self._do_restart(**kwargs)
            self.wait()
            return

        self._log.debug("Pre-reboot boot_id: %s", pre_boot_id)

        # Case 3: the instance is reachable, so do the restart and wait for
        # a changed boot id.
        self._do_restart(**kwargs)
        self.wait_for_restart(old_boot_id=pre_boot_id)

    @abstractmethod
    def _do_restart(self, **kwargs):
        """Issue the cloud-specific restart request."""
        raise NotImplementedError

    @abstractmethod
    def shutdown(self, wait=True, **kwargs):
        """Shutdown the instance.

        Args:
            wait: wait for the instance to shutdown
        """
        raise NotImplementedError

    @abstractmethod
    def start(self, wait=True):
        """Start the instance.

        Args:
            wait: wait for the instance to start.
        """
        raise NotImplementedError

    def _wait_for_instance_start(self, **kwargs):
        """Wait for the cloud instance to be up.

        Subclasses should implement this if their cloud provides a way of
        detecting when an instance has started through their API.
        """

    def wait(
        self,
        ready_timeout: Optional[int] = None,
        **kwargs,
    ):
        """
        Wait for instance to be up and cloud-init to be complete.

        Args:
            ready_timeout (int): maximum time (in minutes) to wait for the
                instance to be ready for ssh after instance provisioning is
                complete; falls back to the class ``ready_timeout``
            **kwargs: forwarded to ``_wait_for_instance_start``

        Raises:
            PycloudlibTimeoutError: If the instance is not ready after the timeout
        """
        self._wait_for_instance_start(**kwargs)
        self._wait_for_execute(timeout=ready_timeout)
        self._wait_for_cloudinit()

    def wait_for_restart(self, old_boot_id):
        """Wait for instance to be restarted and cloud-init to be complete.

        old_boot_id is the boot id prior to restart
        """
        self._wait_for_instance_start()
        self._wait_for_execute(old_boot_id=old_boot_id)
        self._wait_for_cloudinit()

    @abstractmethod
    def wait_for_delete(self, **kwargs):
        """Wait for instance to be deleted."""
        raise NotImplementedError

    @abstractmethod
    def wait_for_stop(self, **kwargs):
        """Wait for instance stop."""
        raise NotImplementedError

    def add_network_interface(self, **kwargs) -> str:
        """Add nic to running instance."""
        raise NotImplementedError

    def remove_network_interface(self, ip_address: str):
        """Remove nic from running instance."""
        raise NotImplementedError

    def __del__(self):
        """Cleanup of instance: close any open SFTP and SSH connections."""
        # __del__ also runs for objects whose __init__ never completed, so
        # the attributes may be missing; look them up defensively.
        for attr, label in (("_sftp_client", "SFTP"), ("_ssh_client", "SSH")):
            client = getattr(self, attr, None)
            if client:
                try:
                    client.close()
                except SSHException:
                    log = getattr(self, "_log", None) or logging.getLogger(__name__)
                    log.warning("Failed to close %s connection.", label)
            setattr(self, attr, None)

    def clean(self):
        """Clean an instance to make it look pristine.

        This will clean out specifically the cloud-init files and system logs.
        """
        # Note: revert this commit once bionic-pro images contain
        # cloud-init >= v23.1 .
        # We end up hitting LP: #1508766 on systemd == 237 (bionic) because
        # the cloud-init's fix [1] for LP: #1999680 is not included on some
        # bionic-pro cloud images.
        #
        # [1] https://github.com/canonical/cloud-init/commit/abfdf1d83995cc20e
        self.execute("sudo cloud-init clean --logs")
        # The redirection must happen inside the privileged shell: with
        # "sudo echo ... > file" the calling (unprivileged) shell opens the
        # file and the write is denied.
        self.execute("sudo sh -c \"echo 'uninitialized' > /etc/machine-id\"")
        self.execute("sudo rm -rf /var/log/syslog")

    def _run_command(self, command, stdin, get_pty=False):
        """Run command in the instance."""
        return self._ssh(list(command), stdin=stdin, get_pty=get_pty)

    def execute(
        self,
        command,
        stdin=None,
        description=None,
        *,
        use_sudo=False,
        no_log=False,
        **kwargs,
    ):
        """Execute command in instance, recording output, error and exit code.

        Assumes functional networking and execution with the target filesystem
        being available at /.

        Args:
            command: the command to execute as root inside the image. If
                command is a string, then it will be executed as:
                `['sh', '-c', command]`
            stdin: bytes content for standard in
            description: purpose of command
            use_sudo: boolean to run the command as sudo
            no_log: boolean to skip logging of the command
            **kwargs: forwarded to ``_run_command`` (e.g. ``get_pty``)

        Returns:
            Result object

        Raises SSHException if there are any problem with the ssh connection
        """
        if isinstance(command, str):
            command = ["sh", "-c", command]

        if use_sudo:
            command = ["sudo", "--"] + command

        if not no_log:
            self._log.info("executing: %s", shell_quote(command))
            if description:
                self._log.debug(description)
            else:
                self._log.debug("executing: %s", shell_quote(command))

        return self._run_command(command, stdin, **kwargs)

    def install(self, packages):
        """Install specific packages.

        Args:
            packages: string or list of package(s) to install

        Returns:
            result from install
        """
        if isinstance(packages, str):
            # split() without a separator ignores repeated/leading/trailing
            # whitespace instead of producing empty package names.
            packages = packages.split()
        else:
            packages = list(packages)

        self.execute(["sudo", "apt-get", "update"])
        # The variable is set via "env" *after* sudo so it actually reaches
        # apt-get; placed before sudo it is not part of a shell command line
        # here and would not survive sudo's environment reset anyway.
        return self.execute(
            [
                "sudo",
                "env",
                "DEBIAN_FRONTEND=noninteractive",
                "apt-get",
                "install",
                "--yes",
            ]
            + packages
        )

    def pull_file(self, remote_path, local_path):
        """Copy file at 'remote_path', from instance to 'local_path'.

        Args:
            remote_path: path on remote instance
            local_path: local path

        Raises SSHException if there are any problem with the ssh connection
        """
        self._log.debug("pulling file %s to %s", remote_path, local_path)

        sftp = self._sftp_connect()
        sftp.get(remote_path, local_path)

    def push_file(self, local_path, remote_path):
        """Copy file at 'local_path' to instance at 'remote_path'.

        Args:
            local_path: local path
            remote_path: path on remote instance

        Raises SSHException if there are any problem with the ssh connection
        """
        self._log.debug("pushing file %s to %s", local_path, remote_path)

        sftp = self._sftp_connect()
        sftp.put(local_path, remote_path)

    def run_script(self, script, description=None):
        """Run script in target and return stdout.

        Args:
            script: script contents
            description: purpose of script

        Returns:
            result from script execution

        Raises SSHException if there are any problem with the ssh connection
        """
        # Just write to a file, add execute, run it, then remove it.
        # $1 is the temp file path; the script body arrives on stdin.
        shblob = "; ".join(
            (
                "set -e",
                's="$1"',
                "shift",
                'cat > "$s"',
                'trap "rm -f $s" EXIT',
                'chmod +x "$s"',
                '"$s" "$@"',
            )
        )
        return self.execute(
            ["sh", "-c", shblob, "runscript", self._tmpfile()],
            stdin=script,
            description=description,
        )

    def update(self):
        """Run apt-get update/upgrade on instance.

        Returns:
            result from upgrade
        """
        self.execute(["sudo", "apt-get", "update"])
        # See install(): the variable has to be set on the far side of sudo.
        return self.execute(
            [
                "sudo",
                "env",
                "DEBIAN_FRONTEND=noninteractive",
                "apt-get",
                "--yes",
                "upgrade",
            ]
        )

    def _ssh(self, command, stdin=None, get_pty=False):
        """Run a command via SSH.

        Args:
            command: string or list of the command to run
            stdin: optional, values to be passed in
            get_pty: optional, allocates a pty

        Returns:
            Result built from the decoded stdout, stderr and the return code

        Raises:
            SSHException: if the connection fails or drops while starting
                the command
        """
        cmd = shell_pack(command)
        client = self._ssh_connect()
        try:
            fp_in, fp_out, fp_err = client.exec_command(cmd, get_pty=get_pty)
        except (ConnectionResetError, NoValidConnectionsError, EOFError) as e:
            raise SSHException from e
        channel = fp_in.channel

        if stdin is not None:
            fp_in.write(stdin)
            fp_in.close()

        # Signal EOF to the remote command so it does not wait for input.
        channel.shutdown_write()

        out = fp_out.read()
        err = fp_err.read()
        return_code = channel.recv_exit_status()

        out = "" if not out else out.rstrip().decode("utf-8")
        err = "" if not err else err.rstrip().decode("utf-8")

        return Result(out, err, return_code)

    def _ssh_connect(self):
        """Connect to instance via SSH.

        Returns:
            the cached client when its transport is still active, otherwise
            a newly connected paramiko SSHClient (which is then cached)

        Raises:
            SSHException: on any connection or authentication failure
        """
        cached = self._ssh_client
        if cached is not None:
            # get_transport() returns None for a closed/unconnected client.
            transport = cached.get_transport()
            if transport is not None and transport.is_active():
                return cached

        logging.getLogger("paramiko").setLevel(logging.INFO)
        client = paramiko.SSHClient()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        # Paramiko can barf on valid keys when initializing this way,
        # so the check here is _only_ for checking if we have a
        # password protected keyfile. The filename is passed directly
        # when connecting
        try:
            paramiko.RSAKey.from_private_key_file(self.key_pair.private_key_path)
        except PasswordRequiredException:
            self._log.warning(
                "The specified key (%s) requires a passphrase. If you have not"
                " added this key to a running SSH agent, you will see failures"
                " to connect after a long timeout.",
                self.key_pair.private_key_path,
            )
        except SSHException:
            pass

        try:
            client.connect(
                username=self.username,
                hostname=self.ip,
                port=int(self.port),
                timeout=self.connect_timeout,
                banner_timeout=self.banner_timeout,
                key_filename=self.key_pair.private_key_path,
            )
        except (
            ConnectionRefusedError,
            AuthenticationException,
            BadHostKeyException,
            ConnectionResetError,
            SSHException,
            OSError,
        ) as e:
            # Callers retry in a loop; release whatever the failed attempt
            # opened instead of leaving it to the garbage collector.
            with suppress(SSHException, OSError, EOFError):
                client.close()
            error_msg = f"{type(e).__name__}: {str(e)}"
            raise SSHException(error_msg) from e
        self._ssh_client = client
        return client

    def _sftp_connect(self):
        """Connect to instance via SFTP.

        Returns:
            the cached SFTP client when its transport is still active,
            otherwise a new one opened over the SSH connection
        """
        cached = self._sftp_client
        if cached and cached.get_channel().get_transport().is_active():
            return cached

        logging.getLogger("paramiko").setLevel(logging.INFO)

        client = self._ssh_connect()
        sftpclient = client.open_sftp()
        self._sftp_client = sftpclient
        return sftpclient

    def _tmpfile(self):
        """Get a tmp file in the target.

        Returns:
            path to new file in target
        """
        path = f"/tmp/{type(self).__name__}-{self._tmp_count:04d}"
        self._tmp_count += 1
        return path

    def _wait_for_execute(self, old_boot_id=None, timeout: Optional[int] = None):
        """
        Wait until we can execute a command in the instance.

        Args:
            old_boot_id: boot id before restart. If provided, we will wait
                until we find a new boot id
            timeout: time to wait for instance to be reachable in minutes;
                a falsy value selects the class ``ready_timeout``

        Raises:
            PycloudlibTimeoutError: if instance can't be reached after timeout
        """
        self._log.info("_wait_for_execute to complete")

        timeout = timeout or self.ready_timeout
        start = time.time()
        end = start + timeout * 60
        while time.time() < end:
            try:
                boot_id = self.get_boot_id()
                if not old_boot_id or boot_id != old_boot_id:
                    return
            except (SSHException, OSError) as e:
                self._log.debug("Failed to obtain new boot id: %s", e)
            time.sleep(1)

        raise PycloudlibTimeoutError(
            f"Instance can't be reached after {timeout} minutes. Failed to obtain new boot id",
        )

    def _wait_for_cloudinit(self):
        """Wait until cloud-init has finished."""
        self._log.info("_wait_for_cloudinit to complete")

        if self.execute("command -v systemctl").ok:
            # We may have issues with cloud-init status early boot, so also
            # ensure our cloud-init.target is active as an extra layer of
            # protection against connecting before the system is ready
            for _ in range(300):
                with suppress(SSHException):
                    if self.execute(
                        ["systemctl", "is-active", "cloud-init.target"],
                        no_log=True,
                    ).ok:
                        break
                time.sleep(1)

        cmd = ["cloud-init", "status", "--wait", "--long"]
        self.execute(cmd, description="waiting for start")

    def _sync_filesystem(self):
        """Sync the filesystem before powering down."""
        # Best effort: an unreachable instance must not block the power-down.
        with suppress(SSHException):
            self.execute("sync")