Source code for pycloudlib.qemu.instance

# This file is part of pycloudlib. See LICENSE file for license information.
"""Instance class for QEMU."""

import asyncio
import logging
import shutil
import time
from pathlib import Path
from subprocess import Popen
from typing import Any, Dict, List, Optional

from qemu.qmp import QMPClient

from pycloudlib.errors import (
    CleanupError,
    MissingPrerequisiteError,
    PycloudlibTimeoutError,
)
from pycloudlib.instance import BaseInstance


class QmpConnection:
    """Synchronous wrapper around the asyncio-based QMP client.

    Every public method drives the underlying coroutine to completion on
    ``self.loop``, so callers never have to deal with asyncio themselves.
    """

    def __init__(self, qmp_socket: Path, log: logging.Logger):
        """Set up QMP connection.

        Args:
            qmp_socket: path to QMP socket
            log: logger to use

        Raises:
            asyncio.TimeoutError: if connecting takes longer than 10 seconds
        """
        self._log = log
        # Resolve the loop before building the client so that anything the
        # client binds to "the current loop" ends up on the loop we drive.
        self.loop = self._acquire_loop()
        self.qmp = QMPClient()
        self.loop.run_until_complete(
            asyncio.wait_for(self.qmp.connect(str(qmp_socket)), timeout=10)
        )

    @staticmethod
    def _acquire_loop() -> asyncio.AbstractEventLoop:
        """Return a usable event loop for the calling thread.

        ``asyncio.get_event_loop()`` raises ``RuntimeError`` when the thread
        has no current loop (worker threads, and every thread on newer
        Python versions) and may hand back a loop that was already closed.
        In both cases a fresh loop is created and installed as the thread's
        current loop.
        """
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = None
        if loop is None or loop.is_closed():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        return loop

    def execute(self, command: str, arguments: Optional[Dict[str, Any]] = None) -> Any:
        """Write data to QMP socket.

        Args:
            command: command to run
            arguments: arguments to pass to command

        Returns the response from QMP.
        """
        return self.loop.run_until_complete(self.qmp.execute(command, arguments))

    def disconnect(self):
        """Disconnect from QMP socket."""
        return self.loop.run_until_complete(self.qmp.disconnect())
class QemuInstance(BaseInstance):
    """QEMU instance object.

    The instance is controlled through a QMP socket named ``qmp-socket``
    that is expected in the instance directory. When no QMP connection
    could be established, ``self.qmp`` is ``None`` and operations that need
    it raise ``MissingPrerequisiteError``.
    """

    _type = "qemu"

    def __init__(
        self,
        key_pair,
        *,
        instance_id: str,
        handle: Optional[Popen] = None,
        username: Optional[str] = None,
    ):
        """Set up instance.

        Args:
            key_pair: key pair to use for instance
            instance_id: ID identifying the instance, in the form
                <instance_path>::<port>::<telnet_port>
            handle: handle to qemu process
            username: username to use for ssh
        """
        super().__init__(key_pair=key_pair, username=username)

        # Split from the right so that an instance path containing "::"
        # still yields the two trailing port fields.
        self.instance_path, self.port, self.telnet_port = instance_id.rsplit("::", 2)
        self.instance_id = instance_id
        self.handle = handle
        self.instance_dir: Path = Path(self.instance_id).parent
        self.qmp = self._setup_qmp(self.instance_dir)

    def _setup_qmp(self, instance_dir) -> "Optional[QmpConnection]":
        """Set up QMP connection.

        Args:
            instance_dir: directory containing instance files

        Returns:
            The QMP connection, or ``None`` when the socket never appeared
            or the connection attempt failed with an ``AssertionError``.
        """
        socket_path = instance_dir / "qmp-socket"

        # If we don't get a socket file fairly quickly, something is wrong
        qmp_socket = None
        for _ in range(10):
            if socket_path.exists():
                qmp_socket = socket_path
                break
            time.sleep(1)
        else:
            # Since our VM may still have started, we don't want to
            # force a cleanup here since it may still be useful for debugging
            self._log.error("Failed to find QMP socket. Instance likely in an unusable state")

        if qmp_socket is None:
            return None

        try:
            return QmpConnection(qmp_socket=qmp_socket, log=self._log)
        except AssertionError as e:
            self._log.error("QMP socket not working as expected: %s", str(e))
            return None

    @property
    def id(self) -> str:
        """Return instance ID."""
        return self.instance_id

    @property
    def name(self):
        """Return instance name."""
        return self.id

    @property
    def ip(self):
        """Return IP address of instance."""
        return "127.0.0.1"

    def console_log(self):
        """Return the instance console log."""
        return Path(self.instance_dir, "console.log").read_text(encoding="utf-8")

    def delete(self, wait=True) -> List[Exception]:
        """Delete the instance.

        Stops the QEMU process (through QMP when connected, otherwise by
        killing the process handle) and removes the instance directory.
        Failures are collected instead of raised so that the directory
        removal is always attempted.

        Args:
            wait: Ignored. Our 'quit' command is synchronous.

        Returns:
            The exceptions encountered during cleanup; empty on success.
        """
        if not self.instance_dir.exists():
            # We've already cleaned up. Nothing left to do
            return []

        errors: List[Exception] = []
        try:
            if self.qmp:
                self.qmp.execute("quit")
                self.qmp.disconnect()
                self.qmp = None
            elif self.handle:
                # No point in graceful shutdown. We want it gone
                self.handle.kill()
                # Reap the killed process so it does not linger as a zombie.
                # The timeout keeps cleanup from blocking indefinitely.
                self.handle.wait(timeout=10)
            else:
                raise CleanupError("No QMP connection or process handle. Manual cleanup required")
        except Exception as e:  # pylint: disable=broad-except
            errors.append(e)

        try:
            shutil.rmtree(self.instance_dir)
        except Exception as e:  # pylint: disable=broad-except
            errors.append(e)

        return errors

    def _do_restart(self, **kwargs):
        """Restart the instance."""
        self.shutdown(wait=True)
        self.start()

    def shutdown(self, wait=True, **kwargs):
        """Shutdown the instance.

        Args:
            wait: wait for the instance to shutdown
        """
        if not self.qmp:
            self._log.warning("No QMP connection. Doing a soft shutdown")
            self.execute("shutdown now", use_sudo=True)
            return

        if self.get_status() != "running":
            self._log.debug("Instance already shutdown.")
            return

        self.qmp.execute("system_powerdown")
        if wait:
            self.wait_for_stop()
            # Only reset once the guest has been observed to stop.
            # NOTE(review): presumably the reset is what lets a later
            # start() ('cont') boot the machine again - confirm.
            self.qmp.execute("system_reset")

    def start(self, wait=True):
        """Start the instance.

        Args:
            wait: wait for the instance to start.

        Raises:
            MissingPrerequisiteError: if there is no QMP connection
        """
        if not self.qmp:
            raise MissingPrerequisiteError("No QMP connection")
        self.qmp.execute("cont")
        if wait:
            self.wait()

    def _wait_for_instance_start(self, **kwargs):
        """Wait for instance to be up."""
        self.wait_till_status("running")

    def wait_for_delete(self, **kwargs):
        """Not implemented as "quit" is executed synchronously."""

    def wait_for_stop(self, **kwargs):
        """Wait for instance stop."""
        self.wait_till_status("shutdown")

    def get_status(self):
        """Get instance status.

        Returns:
            The ``status`` field of the QMP ``query-status`` response.

        Raises:
            MissingPrerequisiteError: if there is no QMP connection
        """
        if not self.qmp:
            raise MissingPrerequisiteError("No QMP connection")
        return self.qmp.execute("query-status")["status"]

    def wait_till_status(self, expected_status: str, timeout: int = 500):
        """Wait for instance to reach a certain status.

        The status is polled once per second.

        Args:
            expected_status: status to wait for
            timeout: timeout in seconds

        Raises:
            MissingPrerequisiteError: if there is no QMP connection
            PycloudlibTimeoutError: if the status is not reached in time
        """
        if not self.qmp:
            raise MissingPrerequisiteError("No QMP connection")

        # Monotonic clock: the timeout must not be affected by wall-clock
        # adjustments made while we are polling.
        start_time = time.monotonic()
        while True:
            query_status = self.get_status()
            if query_status == expected_status:
                return
            if query_status == "prelaunch" and expected_status == "running":
                # It can take some time for the VM to be ready to run,
                # so retry here if we're not in expected state
                self.qmp.execute("cont")
            if time.monotonic() - start_time > timeout:
                raise PycloudlibTimeoutError(
                    f"Timed out waiting for instance to reach status {expected_status}"
                )
            time.sleep(1)

    def add_network_interface(self, **kwargs) -> str:
        """Add nic to running instance."""
        raise NotImplementedError

    def remove_network_interface(self, ip_address: str):
        """Remove nic from running instance."""
        raise NotImplementedError