# This file is part of pycloudlib. See LICENSE file for license information.
"""IBM Cloud type."""
import itertools
import re
from typing import List, Optional
from ibm_cloud_sdk_core import ApiException
from ibm_cloud_sdk_core.authenticators import IAMAuthenticator
from ibm_platform_services import ResourceManagerV2
from ibm_vpc import VpcV1
from ibm_vpc.vpc_v1 import Image, ListImagesEnums
from pycloudlib.cloud import BaseCloud
from pycloudlib.config import ConfigFile
from pycloudlib.errors import InvalidTagNameError
from pycloudlib.ibm._util import get_first as _get_first
from pycloudlib.ibm._util import iter_resources as _iter_resources
from pycloudlib.ibm._util import wait_until as _wait_until
from pycloudlib.ibm.errors import IBMException
from pycloudlib.ibm.instance import VPC, IBMInstance
from pycloudlib.instance import BaseInstance
from pycloudlib.util import UBUNTU_RELEASE_VERSION_MAP
DEFAULT_RESOURCE_GROUP: str = "Default"
[docs]
class IBM(BaseCloud):
"""IBM Virtual Private Cloud Class."""
_type = "ibm"
[docs]
def __init__(
self,
tag: str,
timestamp_suffix: bool = True,
config_file: Optional[ConfigFile] = None,
*,
resource_group: Optional[str] = None,
vpc: Optional[str] = None,
api_key: Optional[str] = None,
region: Optional[str] = None,
zone: Optional[str] = None,
):
"""Initialize the connection to IBM VPC.
Args:
tag: string used to name and tag resources with
timestamp_suffix: Append a timestamped suffix to the tag string.
config_file: path to pycloudlib configuration file
"""
super().__init__(
tag,
timestamp_suffix,
config_file,
required_values=[resource_group, api_key, region],
)
self.created_vpcs: List[VPC] = []
self.created_keys: List[str] = []
self._resource_group = (
resource_group or self.config.get("resource_group") or DEFAULT_RESOURCE_GROUP
)
self._resource_group_id: Optional[str] = None
self.region = str(region or self.config.get("region")).lower()
zone = zone or self.config.get("zone") or f"{self.region}-1"
self.zone = str(zone).lower()
self._vpc_name = vpc or self.config.get("vpc")
self._vpc: Optional[VPC] = None
self._log.debug("logging into IBM")
api_key = api_key or self.config.get("api_key")
authenticator = IAMAuthenticator(api_key)
self.instance_counter = itertools.count(1)
# Note this pins API version to ibm-vpc 0.28.0 also in setup.cfg.
# If updating API version:
# 1. Check latest https://github.com/IBM/vpc-python-sdk/releases/
# 2. Update setup.cfg ibm-vpc < conditional
self._client = VpcV1(authenticator=authenticator, version="2025-04-22")
self._client.set_service_url(f"https://{self.region}.iaas.cloud.ibm.com/v1")
self._resource_manager_service = ResourceManagerV2(authenticator=authenticator)
self._floating_ip_substring = self.config.get("floating_ip_substring")
@property
def resource_group_id(self) -> str:
"""Resource Group ID used to create new things under."""
if self._resource_group_id is None:
self._resource_group_id = self._get_resource_group_id(self._resource_group)
if self._resource_group_id is None:
raise IBMException(f"Resource Group not found: {self._resource_group}")
return self._resource_group_id
@property
def vpc(self) -> VPC:
"""Virtual Private Cloud."""
if self._vpc is not None:
return self._vpc
kwargs = {
"client": self._client,
"resource_group_id": self.resource_group_id,
"region": self.region,
"zone": self.zone,
}
if self._vpc_name is not None:
self._vpc = VPC.from_existing(self.key_pair, name=self._vpc_name, **kwargs)
else:
self._vpc = VPC.from_default(self.key_pair, **kwargs)
return self._vpc
def _get_resource_group_id(self, name: Optional[str] = None) -> Optional[str]:
name = name or f"{self.tag}-rg"
result = self._resource_manager_service.list_resource_groups(name=name).get_result()
if result.get("resources"):
return result["resources"][0]["id"]
return None
[docs]
def delete_image(self, image_id: str, **kwargs):
"""Delete an image.
Args:
image_id: string, id of the image to delete
**kwargs: dictionary of other arguments to pass to delete_image
"""
try:
self._client.delete_image(image_id).get_result()
except ApiException as e:
if "does not exist" not in str(e):
raise
[docs]
def released_image(self, release, *, arch: str = "amd64", **kwargs):
"""ID of the latest released image for a particular release.
Args:
release: The release to look for
Returns:
A single string with the latest released image ID for the
specified release.
"""
list_images_kwargs = {
"start": None,
"limit": None,
"resource_group_id": None,
"name": None,
"visibility": ListImagesEnums.Visibility.PUBLIC.value,
}
version = UBUNTU_RELEASE_VERSION_MAP[release].replace(".", "-")
os_name = f"ubuntu-{version}-{arch}"
# Images are sorted by (created_at, id), thus we return the first
# one matching the criterion.
image = _get_first(
self._client.list_images,
resource_name="images",
filter_fn=lambda img: img["operating_system"]["name"] == os_name,
**list_images_kwargs,
)
if image is None:
raise ValueError(f"Image not found: {os_name}")
return image["id"]
[docs]
def daily_image(self, release: str, **kwargs) -> str:
"""ID of the latest daily image for a particular release.
Args:
release: The release to look for
Returns:
A single string with the latest daily image ID for the
specified release.
"""
self._log.info("There are no daily images in IBM Cloud. Using released image instead")
return self.released_image(release, **kwargs)
[docs]
def image_serial(self, image_id):
"""Find the image serial of the latest daily image for a particular release.
Args:
image_id: string, Ubuntu image id
Returns:
string, serial of latest image
"""
raise NotImplementedError("IBM Cloud does not contain Ubuntu daily images")
[docs]
def get_image_id_from_name(self, name: str) -> str:
"""
Get the id of the first image whose name contains the given name.
The name does not need to be an exact match, just a substring of
the image name.
Returns:
string, image ID
"""
image = _get_first(
self._client.list_images,
resource_name="images",
filter_fn=lambda image: name in image["name"],
)
if image is None:
raise IBMException(f"Image not found: {name}")
return image["id"]
[docs]
def get_instance(
self, instance_id: str, *, username: Optional[str] = None, **kwargs
) -> BaseInstance:
"""Get an instance by id.
Args:
instance_id: ID used identify the instance
username: username to use when connecting via SSH
Returns:
An instance object to use to manipulate the instance further.
"""
return IBMInstance.find_existing(
self.key_pair,
client=self._client,
instance_id=instance_id,
username=username,
)
[docs]
def get_or_create_vpc(self, name: str) -> VPC:
"""Get a VPC by name or create it if not found."""
args = (self.key_pair,)
kwargs = {
"client": self._client,
"name": name,
"resource_group_id": self.resource_group_id,
"zone": self.zone,
}
try:
return VPC.from_existing(*args, **kwargs)
except IBMException:
vpc = VPC.create(*args, **kwargs)
self.created_vpcs.append(vpc)
return vpc
[docs]
def launch(
self,
image_id: str,
instance_type: str = "bx2-2x8",
user_data=None,
*,
name: Optional[str] = None,
vpc: Optional[VPC] = None,
username: Optional[str] = None,
floating_ip_substring: Optional[str] = None,
**kwargs,
) -> BaseInstance:
"""Launch an instance.
Args:
image_id: string, image ID to use for the instance
instance_type: string, type of instance to create
user_data: used by cloud-init to run custom scripts/configuration
name: instance name
vpc: VPC to allocate the instance in. If not given, the instance
username: username to use when connecting via SSH
will be allocated in the zone's default VPC.
floating_ip_substring: use existing floating IP whose name
contains this substring. This floating IP will not be deleted
when the instance is deleted.
**kwargs: dictionary of other arguments to pass to launch
Returns:
An instance object to use to manipulate the instance further.
"""
if not image_id:
raise ValueError(f"{self._type} launch requires image_id param. Found: {image_id}")
vpc = vpc or self.vpc
name = name or f"{self.tag}-vm{next(self.instance_counter)}"
floating_ip_substring = floating_ip_substring or self._floating_ip_substring
raw_instance = IBMInstance.create_raw_instance(
client=self._client,
name=name,
image_id=image_id,
vpc=vpc,
instance_type=instance_type,
resource_group_id=self.resource_group_id,
zone=self.zone,
user_data=user_data,
key_id=self._get_or_create_key(),
**kwargs,
)
instance: IBMInstance = IBMInstance.from_raw_instance(
self.key_pair,
client=self._client,
instance=raw_instance,
username=username,
)
# add instance to cleanup list before attaching floating ip in case of error during attach
self.created_instances.append(instance)
instance.attach_floating_ip(floating_ip_substring=floating_ip_substring)
return instance
[docs]
def snapshot(self, instance: IBMInstance, clean: bool = True, **kwargs) -> str:
"""Snapshot an instance and generate an image from it.
Args:
instance: Instance to snapshot
clean: run instance clean method before taking snapshot
Returns:
An image id
"""
if clean:
instance.clean()
instance.shutdown()
self._log.debug("creating snapshot from instance %s", instance.id)
image_prototype = {
"name": f"{self.tag}-image",
"resource_group": {"id": self.resource_group_id},
"source_volume": {"id": instance.boot_volume_id},
}
snapshot_id = self._client.create_image(image_prototype).get_result()["id"]
timeout_seconds = 300
_wait_until(
lambda: self._client.get_image(snapshot_id).get_result()["status"]
== Image.StatusEnum.AVAILABLE.value,
timeout_seconds=timeout_seconds,
timeout_msg_fn=lambda: (
f"Snapshot not available after {timeout_seconds} seconds. Check IBM VPC console."
),
)
self.created_images.append(snapshot_id)
return snapshot_id
[docs]
def list_keys(self) -> List[str]:
"""List ssh key names present on the cloud for accessing instances.
Returns:
A list of strings of key pair names accessible to the cloud.
"""
return list(
_iter_resources(
self._client.list_keys,
resource_name="keys",
map_fn=lambda key: key["name"],
)
)
[docs]
def delete_key(self, name: str):
"""Delete SSH key by name."""
key = _get_first(
self._client.list_keys,
resource_name="keys",
filter_fn=lambda key: key["name"] == name,
)
if not key:
return
self._log.debug("Deleting SSH key: %s", name)
self._client.delete_key(key["id"])
def _get_or_create_key(self) -> str:
key = _get_first(
self._client.list_keys,
resource_name="keys",
filter_fn=lambda key: key["name"] == self.key_pair.name,
)
if key is not None:
return key["id"]
self._log.info("Creating SSH key: %s", self.key_pair.name)
key_id = self._client.create_key(
public_key=self.key_pair.public_key_content,
name=self.key_pair.name,
resource_group={"id": self.resource_group_id},
).get_result()["id"]
self.created_keys.append(key_id)
return key_id
# pylint: disable=broad-except
[docs]
def clean(self) -> List[Exception]:
"""Cleanup ALL artifacts associated with this Cloud instance.
Cleanup any cloud artifacts created at any time during this class's
existence. This includes all instances, snapshots, resources, etc.
"""
# Not cleaning up floating ips here because they're 1:1
# with an instance and get cleaned up by the instance
exceptions = super().clean()
for vpc in self.created_vpcs:
try:
vpc.delete()
except Exception as e:
exceptions.append(e)
for key_id in self.created_keys:
try:
self._client.delete_key(key_id)
except Exception as e:
exceptions.append(e)
return exceptions
@staticmethod
def _validate_tag(tag: str):
"""
Ensure that this tag is a valid name for cloud resources.
Rules:
- All letters must be lowercase
- Must be between 1 and 63 characters long
- Must not start or end with a hyphen
- Must be alphanumeric and hyphens only
- Must start with a letter
:param tag: tag to validate
:return: tag if it is valid
:raises InvalidTagNameError: if the tag is invalid
"""
rules_failed = []
# all letters must be lowercase
if any(c.isupper() for c in tag):
rules_failed.append("All letters must be lowercase")
# must be between 1 and 63 characters long
if len(tag) < 1 or len(tag) > 63:
rules_failed.append("Must be between 1 and 63 characters long")
# must not start or end with a hyphen
if tag and (tag[0] in ("-") or tag[-1] in ("-")):
rules_failed.append("Must not start or end with a hyphen")
# must be alphanumeric and hyphens only
if not re.match(r"^[a-z0-9-]*$", tag):
rules_failed.append("Must be alphanumeric and hyphens only")
# must start with a letter
if tag and not tag[0].isalpha():
rules_failed.append("Must start with a letter")
if rules_failed:
raise InvalidTagNameError(tag=tag, rules_failed=rules_failed)