# This file is part of pycloudlib. See LICENSE file for license information.
"""Helpers for shell string and processing."""
import base64
import collections.abc
import datetime
from errno import ENOENT
import platform
import os
import re
import shlex
import subprocess
import tempfile
from pycloudlib.result import Result
UBUNTU_RELEASE_VERSION_MAP = {
'focal': '20.04',
'bionic': '18.04',
'xenial': '16.04',
}
[docs]def chmod(path, mode):
"""Run chmod on a file or directory.
Args:
path: string of path to run on
mode: int of mode to apply
"""
real_mode = _safe_int(mode)
if path and real_mode:
os.chmod(path, real_mode)
[docs]def is_writable_dir(path):
"""Make sure dir is writable.
Args:
path: path to determine if writable
Returns:
boolean with result
"""
try:
touch(path)
os.remove(tempfile.mkstemp(dir=os.path.abspath(path))[1])
except (IOError, OSError):
return False
return True
[docs]def mkdtemp(prefix='pycloudlib'):
"""Make a temporary directory.
Args:
prefix: optional, temproary dir name prefix (default: pycloudlib)
Returns:
tempfile object that was created
"""
return tempfile.mkdtemp(prefix=prefix)
[docs]def rmfile(path):
"""Delete a file.
Args:
path: run unlink on specific path
"""
try:
os.unlink(path)
except OSError as error:
if error.errno != ENOENT:
raise error
[docs]def shell_pack(cmd):
"""Return a string that can shuffled through 'sh' and execute cmd.
In Python subprocess terms:
check_output(cmd) == check_output(shell_pack(cmd), shell=True)
Args:
cmd: list or string of command to pack up
Returns:
base64 encoded string
"""
if isinstance(cmd, str):
cmd = [cmd]
else:
cmd = list(cmd)
stuffed = shell_safe(cmd)
# for whatever reason b64encode returns bytes when it is clearly
# representable as a string by nature of being base64 encoded.
b64 = base64.b64encode(stuffed.encode()).decode()
return 'eval set -- "$(echo %s | base64 --decode)" && exec "$@"' % b64
[docs]def shell_quote(cmd):
"""Quote a shell string.
Args:
cmd: command to quote
Returns:
quoted string
"""
if isinstance(cmd, (tuple, list)):
return ' '.join([shlex.quote(x) for x in cmd])
return shlex.quote(cmd)
[docs]def shell_safe(cmd):
"""Produce string safe shell string.
Create a string that can be passed to $(set -- <string>) to produce
the same array that cmd represents.
Internally we utilize 'getopt's ability/knowledge on how to quote
strings to be safe for shell. This implementation could be changed
to be pure python. It is just a matter of correctly escaping
or quoting characters like: ' " ^ & $ ; ( ) ...
Args:
cmd: command as a list
Returns:
shell safe string
"""
out = subprocess.check_output(
["getopt", "--shell", "sh", "--options", "", "--", "--"] + list(cmd))
# out contains ' -- <data>\n'. drop the ' -- ' and the '\n'
return out.decode()[4:-1]
[docs]def subp(args, data=None, env=None, shell=False, rcs=(0,),
shortcircuit_stdin=True):
"""Subprocess wrapper.
Args:
args: command to run
data: data to pass
env: optional env to use
shell: optional shell to use
rcs: tuple of successful exit codes, default: (0)
shortcircuit_stdin: bind stdin to /dev/null if no data is given
Returns:
Tuple of out, err, return_code
"""
devnull_fp = None
if data is not None:
stdin = subprocess.PIPE
if not isinstance(data, bytes):
data = data.encode()
elif shortcircuit_stdin:
# using devnull assures any reads get null, rather
# than possibly waiting on input.
devnull_fp = open(os.devnull)
stdin = devnull_fp
else:
stdin = None
bytes_args = _convert_args(args)
try:
process = subprocess.Popen(
bytes_args, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
stdin=stdin, env=env, shell=shell
)
(out, err) = process.communicate(data)
finally:
if devnull_fp:
devnull_fp.close()
rc = process.returncode
out = '' if not out else out.rstrip().decode("utf-8")
err = '' if not err else err.rstrip().decode("utf-8")
if rcs and rc not in rcs:
if err:
errmsg = err
elif out:
errmsg = out
else:
errmsg = "command failed silently"
errmsg = "Failure (rc=%s): %s" % (rc, errmsg)
raise RuntimeError(errmsg)
return Result(out, err, rc)
[docs]def touch(path, mode=None):
"""Ensure a directory exists with a specific mode, it not create it.
Args:
path: path to directory to create
mode: optional, mode to set directory to
"""
if not os.path.isdir(path):
with os.path.dirname(path):
os.makedirs(path)
chmod(path, mode)
else:
chmod(path, mode)
def _get_local_ubuntu_arch():
"""Return the Ubuntu architecture suitable for the local system.
This is not simply the local machine hardware name, as in some cases it
differs from the Ubuntu architecture name. The most common case is x86_64
hardware, for which the Ubuntu architecture name is 'amd64'. This function
implements the required mapping.
On Debian and Ubuntu systmes the full mapping between the GNU architecture
names and the Ubuntu (Debian) architecture names is available in the
'/usr/share/dpkg/cputable' file, however the GNU architecture names are
again different from the machine hardware names from e.g. uname(1) or
os.uname(). The full mapping is available in the 'config.guess' script from
the GNU autotools, and it's complex. Let's keep it simple here, mapping
only what is relevant for Ubuntu.
"""
arch_map = dict(
i686='i386',
x86_64='amd64',
aarch64='arm64',
ppc='powerpc',
ppc64el='ppc64el',
ppcle='powerpcel'
)
local_arch = platform.machine()
local_ubuntu_arch = arch_map.get(local_arch, local_arch)
return local_ubuntu_arch
def _convert_args(args):
"""Convert subp arguments to bytes.
Popen converts entries in the arguments array from non-bytes to bytes.
When locale is unset it may use ascii for that encoding which can
cause UnicodeDecodeErrors. (LP: #1751051)
Args:
args: string, bytes, or list of arguments to convert to bytes
Returns:
byte argument list
"""
if isinstance(args, bytes):
bytes_args = args
elif isinstance(args, str):
bytes_args = args.encode("utf-8")
else:
bytes_args = [
x if isinstance(x, bytes) else x.encode("utf-8")
for x in args
]
return bytes_args
def _safe_int(possible_int):
"""Create an int as safely as possbile.
Args:
possible_int: variable to create into a integer
Returns:
integration or None
"""
try:
return int(possible_int)
except (ValueError, TypeError):
return None
[docs]def get_timestamped_tag(tag):
"""Create tag with current timestamp.
Args:
tag: string, Base tag to be used
Returns
An updated tag with current timestamp
"""
return'%s-%s' % (
tag, datetime.datetime.now().strftime("%m%d-%H%M%S")
)
[docs]def validate_tag(tag):
"""Ensure tag will work as name for clouds that use it."""
# Currently google is the most restrictive, so just use that
# regex verbatum. You can trigger the error message that contains
# this regex by attempting to create an instance with a name
# of '-'
regex = r'^(?:[a-z](?:[-a-z0-9]{0,61}[a-z0-9])?)$'
if not re.match(regex, tag):
raise ValueError(
'Invalid tag specified. After being timestamped, '
'tag must pass regex.\n'
'Regex: {}\n'
'Tag : {}'.format(regex, tag)
)
return tag
[docs]def update_nested(mapping, update):
"""Update mapping with update value at given update key.
Example:
original_dict = {'a': {'b': {'c': 'd'}}}
update = {'a': {'b': {'c': 'e'}}}
update_nested(original_dict, update)
original_dict == {'a': {'b': {'c': 'e'}}}
"""
for key, value in update.items():
if isinstance(value, collections.abc.Mapping):
mapping[key] = update_nested(mapping.get(key, {}), value)
else:
mapping[key] = value
return mapping