Source code for elasticluster.utils

#!/usr/bin/env python
# -*- coding: utf-8 -*-#
# Copyright (C) 2016, 2018, 2019 University of Zurich. All rights reserved.
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
# You should have received a copy of the GNU General Public License along
# with this program; if not,

__docformat__ = 'reStructuredText'
__author__ = 'Riccardo Murri <>'

# stdlib imports
from builtins import object
from contextlib import contextmanager
import functools
import os
import re
import signal
import shutil
import sys
import tempfile
import time
    # Python 2
    from UserDict import DictMixin
except ImportError:
    from import MutableMapping as DictMixin

# 3rd party imports
import click
import netaddr

def confirm_or_abort(prompt, exitcode=os.EX_TEMPFAIL, msg=None, **extra_args):
    """
    Ask the user for confirmation; terminate the program on a negative reply.

    Arguments `prompt` and `extra_args` are handed over unchanged to
    `click.confirm`:func:, which performs the actual prompting.

    :param str prompt: Prompt string to display.
    :param int exitcode: Program exit code used if a negative reply is given.
    :param str msg: Optional message written to STDERR before exiting.
    :return: ``True`` when the user confirms; on a negative reply this
      function does not return, since it calls `sys.exit`.
    """
    confirmed = click.confirm(prompt, **extra_args)
    if confirmed:
        return True

    # negative reply: optionally explain why, then abort
    if msg:
        sys.stderr.write(msg)
        sys.stderr.write('\n')
    sys.exit(exitcode)
@contextmanager
def environment(**kv):
    """
    Context manager to run Python code with a modified UNIX process environment.

    All key/value pairs in the keyword arguments are added (or changed, if
    the key names an existing environmental variable) in the process
    environment upon entrance into the context.

    Changes are undone upon exit, *including when the code in the context
    raises an exception*: added environmental variables are removed from
    the environment, and those whose value was changed are reset to their
    pristine value.
    """
    added = []
    changed = {}
    try:
        for key, value in kv.items():
            if key in os.environ:
                changed[key] = os.environ[key]
            else:
                added.append(key)
            os.environ[key] = value
        yield
    finally:
        # restore pristine process environment; use `pop` so that a
        # variable already removed by the wrapped code (or never set
        # because the assignment above failed) does not raise here
        for key in added:
            os.environ.pop(key, None)
        for key, value in changed.items():
            os.environ[key] = value
def expand_ssh_proxy_command(command, user, addr, port=22):
    """
    Expand special digraphs ``%h``, ``%p``, and ``%r``.

    Return a copy of `command` with the following string
    substitutions applied:

    * ``%h`` is replaced by *addr*
    * ``%p`` is replaced by *port*
    * ``%r`` is replaced by *user*
    * ``%%`` is replaced by ``%``.

    See also: man page ``ssh_config``, section "TOKENS".

    :param str command: Proxy command template.
    :param user: Remote user name (converted with `str`).
    :param addr: Remote host address (converted with `str`).
    :param port: Remote port (converted with `str`); default 22.
    :return: The expanded command string.
    :raise ValueError:
      If `command` contains an unknown ``%``-digraph, or ends with a
      lone ``%`` character.
    """
    subst = {
        'h': str(addr),
        'p': str(port),
        'r': str(user),
        '%': '%',
    }
    translated = []
    escaped = False
    for char in command:
        # the "escaped" state must be checked *first*, otherwise the
        # second `%` of a `%%` pair is mistaken for the start of a new
        # digraph and a literal `%` can never be produced
        if escaped:
            try:
                translated.append(subst[char])
            except KeyError:
                raise ValueError(
                    "Unknown digraph `%{0}`"
                    " in proxy command string `{1}`"
                    .format(char, command))
            escaped = False
        elif char == '%':
            escaped = True
        else:
            translated.append(char)
    if escaped:
        # a trailing lone `%` is not a valid token
        raise ValueError(
            "Incomplete digraph `%` at end of proxy command string `{0}`"
            .format(command))
    return ''.join(translated)
def fingerprint_str(key):
    """
    Return a printable string representation of an SSH key fingerprint.

    The fingerprint obtained from ``key.get_fingerprint()`` is rendered
    as hexadecimal digits, with a ``:`` separator inserted after every
    two digits.

    :param key: Any object providing a ``get_fingerprint()`` method
      (presumably returning a byte string -- verify against callers).
    """
    from binascii import hexlify
    from codecs import decode

    raw_digest = key.get_fingerprint()
    hex_digits = decode(hexlify(raw_digest), 'ascii')
    return str(insert_char_every_n_chars(2, ':', hex_digits))
def _nproc_output(subprocess_module):
    """
    Return the CPU count printed by the ``nproc`` command, or ``None``.

    Argument `subprocess_module` is the module (``subprocess`` or
    ``subprocess32``) whose ``check_output`` function should be used.
    """
    try:
        return int(subprocess_module.check_output(['nproc']))
    except (OSError,          # no `nproc` executable
            AttributeError,   # no `check_output` (Py 2.6)
            ValueError,       # unexpected output from `nproc`
            TypeError,
            subprocess_module.CalledProcessError):
        return None


def get_num_processors():
    """
    Return number of online processor cores.

    Different strategies are tried in turn and the result of the first
    one that succeeds is returned: `os.cpu_count`,
    `multiprocessing.cpu_count`, and finally running the ``nproc``
    command through ``subprocess32`` or ``subprocess``.

    :raise RuntimeError: If none of the strategies yields a result.
    """
    # `os.cpu_count` only exists on Py3 and may return `None` when the
    # count is undetermined, in which case we must fall through
    try:
        ncpus = os.cpu_count()
    except AttributeError:
        ncpus = None
    if ncpus:
        return ncpus

    try:
        import multiprocessing
        return multiprocessing.cpu_count()
    except (ImportError, NotImplementedError):
        # no multiprocessing, or it cannot determine CPU count
        pass

    for modname in ('subprocess32', 'subprocess'):
        try:
            module = __import__(modname)
        except ImportError:
            continue
        ncpus = _nproc_output(module)
        if ncpus is not None:
            return ncpus

    raise RuntimeError("Cannot determine number of processors")
def has_nested_keys(mapping, k1, *more):
    """
    Return ``True`` if `mapping[k1][k2]...[kN]` is valid.

    Example::

      >>> D = {
      ...   'a': {
      ...     'x':0,
      ...     'y':{
      ...       'z': 1,
      ...     },
      ...   },
      ...   'b': 3
      ... }
      >>> has_nested_keys(D, 'a', 'x')
      True
      >>> has_nested_keys(D, 'a', 'y', 'z')
      True
      >>> has_nested_keys(D, 'a', 'q')
      False

    When a single key is passed, this is just another way of writing
    ``k1 in mapping``::

      >>> has_nested_keys(D, 'b')
      True
    """
    path = (k1,) + more
    node = mapping
    # descend through every key but the last one ...
    for key in path[:-1]:
        if key not in node:
            return False
        node = node[key]
    # ... and only test membership for the final key
    if path[-1] in node:
        return True
    return False
# original source: (reference not preserved in this copy)
def insert_char_every_n_chars(every, char, string):
    """
    Return `string` split into chunks of `every` characters joined by `char`.

    The last chunk may be shorter than `every` characters; an empty
    `string` yields an empty result.
    """
    chunks = []
    for start in range(0, len(string), every):
        chunks.append(string[start:start + every])
    return char.join(chunks)
class memoize(object):
    """
    Cache a function's return value each time it is called within a TTL.

    If the decorated function is called again with the same arguments
    while the cached entry is younger than the TTL, the cached value is
    returned.  If it is called after the TTL has elapsed, or with
    different arguments, a fresh value is computed, cached for future
    calls, and returned.  A TTL that is zero or negative means cached
    values never expire.

    .. warning::

      Only works on functions that take *no keyword arguments*.

    Originally taken from an external source (reference not preserved
    in this copy).
    """

    def __init__(self, ttl):
        self.cache = {}
        self.ttl = ttl

    def __call__(self, f):
        @functools.wraps(f)
        def wrapped_f(*args):
            now = time.time()
            try:
                entry = self.cache.get(args)
            except TypeError:
                # uncachable -- for instance, passing a list as an argument.
                # Better to not cache than to blow up entirely.
                return f(*args)
            if entry is not None:
                value, last_update = entry
                expired = (self.ttl > 0 and now - last_update > self.ttl)
                if not expired:
                    return value
            # cache miss or stale entry: recompute and remember
            value = f(*args)
            self.cache[args] = (value, now)
            return value
        return wrapped_f
# Building blocks for the address-matching regular expressions below.
#
# The IPv6 fragment is deliberately liberal: it accepts malformed
# address strings like `0:::1` or '0::1::2', but validation is done
# with `netaddr.IPAddress` later on, so there is little advantage in
# being strict here.
_IPV6_FRAG = r'[0-9a-z:]+'

# likewise liberal: any four dot-separated runs of digits
_IPV4_FRAG = r'[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+'

# should match a network interface name (for which there is no
# standard, so let's just assume it's alphanumeric)
_IFACE_FRAG = r'[a-z][0-9a-z]*'

# XXX: order is important! IPv4 must come before IPv6 otherwise the
# _IPV6_FRAG will match a *part* of an IPv4 address....
_IP_ADDRESS_RE = list(
    re.compile(pattern, re.I)
    for pattern in (
        # IPv4 literal, optionally with port
        r'(?P<ip_addr>{0})(?P<port>:\d+)?'
        .format(_IPV4_FRAG),
        # the kind of IPv6 literals returned by Azure, e.g.,
        # `[fe80::dead:beef%eth0]:2222`
        r'\[(?P<ip_addr>{0})(?P<iface>%{1})?\](?P<port>:\d+)?'
        .format(_IPV6_FRAG, _IFACE_FRAG),
        # IPv6 literal possibly with interface spec (note this cannot
        # provide any port)
        r'(?P<ip_addr>{0})(?P<iface>%{1})?'
        .format(_IPV6_FRAG, _IFACE_FRAG),
    )
)
def parse_ip_address_and_port(addr, default_port=22):
    """
    Return a pair (IP address, port) extracted from string `addr`.

    Different formats are accepted for the address/port string:

    * IPv6 literals in square brackets, with or without an optional
      port specification, as used in URLs::

        >>> parse_ip_address_and_port('[fe80::dead:beef]:1234')
        (IPAddress('fe80::dead:beef'), 1234)

        >>> parse_ip_address_and_port('[fe80::dead:beef]')
        (IPAddress('fe80::dead:beef'), 22)

    * IPv6 literals with a "local interface" specification::

        >>> parse_ip_address_and_port('[fe80::dead:beef%eth0]')
        (IPAddress('fe80::dead:beef'), 22)

        >>> parse_ip_address_and_port('fe80::dead:beef%eth0')
        (IPAddress('fe80::dead:beef'), 22)

    * bare IPv6 addresses::

        >>> parse_ip_address_and_port('fe80::dead:beef')
        (IPAddress('fe80::dead:beef'), 22)

        >>> parse_ip_address_and_port('2001:db8:5ca1:1f0:f816:3eff:fe05:f40f')
        (IPAddress('2001:db8:5ca1:1f0:f816:3eff:fe05:f40f'), 22)

    * IPv4 addresses, with or without an additional port specification::

        >>> parse_ip_address_and_port('192.0.2.123')
        (IPAddress('192.0.2.123'), 22)

        >>> parse_ip_address_and_port('192.0.2.123:999')
        (IPAddress('192.0.2.123'), 999)

    Note that the default port can be changed by passing an additional
    parameter::

        >>> parse_ip_address_and_port('192.0.2.123', 987)
        (IPAddress('192.0.2.123'), 987)

        >>> parse_ip_address_and_port('fe80::dead:beef', 987)
        (IPAddress('fe80::dead:beef'), 987)

    :param str addr: Address string, optionally including a port.
    :param int default_port: Port returned when `addr` specifies none.
    :return: A pair ``(netaddr.IPAddress, int)``.
    :raise netaddr.AddrFormatError:
      Upon parse failure, e.g., syntactically incorrect IP address.
    """
    # we assume one and only one of the regexps will match
    for regexp in _IP_ADDRESS_RE:
        # NOTE(review): the matching call was garbled in the copy this
        # was restored from; `search` (not `match`) is assumed here --
        # confirm against upstream.
        match = regexp.search(addr)
        if not match:
            continue
        # can raise netaddr.AddrFormatError
        ip_addr = netaddr.IPAddress(match.group('ip_addr'))
        try:
            port = match.group('port')
        except IndexError:
            # this regexp defines no `port` group at all
            port = None
        if port is not None:
            port = int(port[1:])  # skip leading `:`
        else:
            port = default_port
        return ip_addr, port
    # parse failed
    raise netaddr.AddrFormatError(
        "Could not extract IP address and port from `{0}`"
        .format(addr))
def setitem_nested(mapping, path, value, cls=None):
    """
    Set a leaf key to a value in a nested mapping.

    Argument *path* is a list of keys: *setitem_nested* will
    recursively descend *mapping* and set the last key in *path* to
    the given *value*.  The (modified) *mapping* is returned.

    Example::

      >>> D = { 'a': 1, 'b': {'c': {'d': 2}}}
      >>> setitem_nested(D, ['b', 'c', 'd'], 3)
      {'a': 1, 'b': {'c': {'d': 3}}}
      >>> assert D['b']['c']['d'] == 3
      >>> assert D['a'] == 1

    Argument *value* can be any valid Python value::

      >>> setitem_nested(D, ['b', 'c', 'd'], [1, 2])
      {'a': 1, 'b': {'c': {'d': [1, 2]}}}
      >>> assert D['b']['c']['d'] == [1, 2]

    Argument *path* need not lead to a leaf key; any nested key can be
    overwritten::

      >>> setitem_nested(D, ['b', 'c'], {'e': 4})
      {'a': 1, 'b': {'c': {'e': 4}}}
      >>> assert D['b']['c']['e'] == 4

    As a degenerate case, if *path* is a list of one element,
    ``setitem_nested`` works exactly like dictionary key assignment::

      >>> setitem_nested(D, ['a'], {'f': 5})
      {'a': {'f': 5}, 'b': {'c': {'e': 4}}}
      >>> assert D['a']['f'] == 5

    If, at any point during the descent (except at the end of *path*),
    a non-existing key is found, it will be bound to a new mapping of
    type *cls*::

      >>> setitem_nested(D, ['a', 'g', 'h'], 6)
      {'a': {'g': {'h': 6}, 'f': 5}, 'b': {'c': {'e': 4}}}
      >>> assert D['a']['g'] == {'h': 6}

    When *cls* is not given, each new mapping has the same class as
    the mapping it is inserted into.  When *cls* is given, it is used
    for every mapping created along *path*, at any depth.
    """
    assert len(path) > 0, (
        "Argument `path` to `setitem_nested` cannot be empty!")
    key = path[0]
    if len(path) > 1:
        if key in mapping:
            branch = mapping[key]
        else:
            # default: use same class as the enclosing `mapping`
            factory = mapping.__class__ if cls is None else cls
            branch = factory()
        # hand `cls` down unchanged, so that an explicit choice is
        # honoured at every level (and the default stays "per level")
        mapping[key] = setitem_nested(branch, path[1:], value, cls)
    else:
        mapping[key] = value
    return mapping
# copied over from GC3Pie's sources
def string_to_boolean(word):
    """
    Convert `word` to a Python boolean value and return it.

    The strings `true`, `yes`, `on`, `1` (with any capitalization and
    any amount of leading and trailing spaces) are recognized as
    meaning Python `True`::

      >>> string_to_boolean('yes')
      True
      >>> string_to_boolean('Yes')
      True
      >>> string_to_boolean('YES')
      True
      >>> string_to_boolean(' 1 ')
      True
      >>> string_to_boolean('True')
      True
      >>> string_to_boolean('on')
      True

    Any other word is considered as boolean `False`::

      >>> string_to_boolean('no')
      False
      >>> string_to_boolean('No')
      False
      >>> string_to_boolean('Nay!')
      False
      >>> string_to_boolean('woo-hoo')
      False

    This includes also the empty string and whitespace-only::

      >>> string_to_boolean('')
      False
      >>> string_to_boolean('  ')
      False
    """
    normalized = word.strip().lower()
    return normalized in ['true', 'yes', 'on', '1']
# In Python 2.7 still, `DictMixin` is an old-style class; thus, we need
# to make `Struct` inherit from `object`, otherwise we lose properties
# when setting/pickling/unpickling and *very importantly* the ability to
# use `@property` ...
class Struct(DictMixin, object):
    """
    A `dict`-like object, whose keys can be accessed with the usual
    '[...]' lookup syntax, or with the '.' get attribute syntax.

    Examples::

      >>> a = Struct()
      >>> a['x'] = 1
      >>> a.x
      1
      >>> a.y = 2
      >>> a['y']
      2

    Values can also be initially set by specifying them as keyword
    arguments to the constructor::

      >>> a = Struct(z=3)
      >>> a['z']
      3
      >>> a.z
      3

    Like `dict` instances, `Struct`s have a `copy` method to get a
    shallow copy of the instance:

      >>> b = a.copy()
      >>> b.z
      3

    .. note::

      This class is a clone of the `gc3libs.utils.Struct` class
      from the GC3Pie package sources.
    """

    def __init__(self, initializer=None, **extra):
        if initializer is not None:
            try:
                # initializer is `dict`-like?
                pairs = initializer.items()
            except AttributeError:
                # initializer is a sequence of (name,value) pairs?
                pairs = initializer
            for name, value in pairs:
                self[name] = value
        for name, value in extra.items():
            self[name] = value

    def copy(self):
        """Return a (shallow) copy of this `Struct` instance."""
        return Struct(self)

    # Items are stored directly in the instance's attribute dictionary,
    # which is what makes `a.x` and `a['x']` interchangeable; the
    # `DictMixin` base class supplies the remaining std `dict` methods
    # on top of the primitives defined here.

    def __delitem__(self, name):
        del vars(self)[name]

    def __getitem__(self, name):
        return vars(self)[name]

    def __setitem__(self, name, val):
        vars(self)[name] = val

    def __iter__(self):
        return iter(vars(self))

    def __len__(self):
        return len(vars(self))

    def keys(self):
        """Return the list of key names stored in this instance."""
        return list(vars(self))
@contextmanager
def sighandler(signum, handler):
    """
    Context manager to run code with UNIX signal `signum` bound to `handler`.

    The existing handler is saved upon entering the context and
    restored upon exit, also when the code in the context raises an
    exception.

    The `handler` argument may be anything that can be passed to
    Python's `signal.signal` standard library call.
    """
    prev_handler = signal.getsignal(signum)
    signal.signal(signum, handler)
    try:
        yield
    finally:
        # always put the previous handler back, even on error
        signal.signal(signum, prev_handler)
@contextmanager
def temporary_dir(delete=True, dir=None,
                  prefix='elasticluster.', suffix='.d'):
    """
    Make a temporary directory and make it current for the code in this context.

    Upon exit from the context (whether normal or through an
    exception), the previous working directory is made current again
    and the temporary directory is deleted, unless ``delete=False`` is
    passed in the arguments.

    Arguments *suffix*, *prefix* and *dir* are exactly as in
    :func:`tempfile.mkdtemp()` (but have different defaults).
    """
    cwd = os.getcwd()
    tmpdir = tempfile.mkdtemp(suffix, prefix, dir)
    try:
        os.chdir(tmpdir)
        yield
    finally:
        # leave the temporary directory *before* trying to remove it
        os.chdir(cwd)
        if delete:
            shutil.rmtree(tmpdir, ignore_errors=True)
@contextmanager
def timeout(delay, handler=None):
    """
    Context manager to run code and deliver a SIGALRM signal after `delay` seconds.

    Note that `delay` must be a whole number; otherwise it is converted to an
    integer by Python's `int()` built-in function. For floating-point numbers,
    that means rounding off to the nearest integer from below.

    If the optional argument `handler` is supplied, it must be a callable that
    is invoked if the alarm triggers while the code is still running. If no
    `handler` is provided (default), then a `RuntimeError` with message
    ``<delay> seconds timeout expired`` is raised.

    On exit from the context -- including exit through an exception,
    such as the one raised by the default handler -- any pending alarm
    is cancelled and the previous SIGALRM handler is reinstated.
    """
    delay = int(delay)
    if handler is None:
        def default_handler(signum, frame):
            raise RuntimeError("{:d} seconds timeout expired".format(delay))
        handler = default_handler
    prev_sigalrm_handler = signal.getsignal(signal.SIGALRM)
    signal.signal(signal.SIGALRM, handler)
    signal.alarm(delay)
    try:
        yield
    finally:
        # disarm first, so that the alarm cannot fire after the
        # previous handler has been put back
        signal.alarm(0)
        signal.signal(signal.SIGALRM, prev_sigalrm_handler)
## Warnings redirection
#
# This is a modified version of the `logging.captureWarnings()` code from
# the Python 2.7 standard library:
#
# - backport the code to Python 2.6
# - make the logger configurable
#
# The original copyright notice is reproduced below:
#
#   Copyright 2001-2014 by Vinay Sajip. All Rights Reserved.
#
#   Permission to use, copy, modify, and distribute this software and its
#   documentation for any purpose and without fee is hereby granted,
#   provided that the above copyright notice appear in all copies and that
#   both that copyright notice and this permission notice appear in
#   supporting documentation, and that the name of Vinay Sajip
#   not be used in advertising or publicity pertaining to distribution
#   of the software without specific, written prior permission.
#   VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING
#   ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
#   VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR
#   ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER
#   IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
#   OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#

import logging

# ensure that `logging.NullHandler` is defined on Python 2.6 as well
if not hasattr(logging, 'NullHandler'):
    class _NullHandler(logging.Handler):
        def emit(self, record):
            pass
    logging.NullHandler = _NullHandler

import warnings

# The original `warnings.showwarning`, saved while redirection is
# active; `None` means warnings are not currently being redirected.
_warnings_showwarning = None


class _WarningsLogger(object):
    """
    Redirect warning messages to a chosen logger.

    This is a callable object that implements a compatible interface
    to `warnings.showwarning` (which it is supposed to replace).

    :param str logger_name: Name of the logger that receives warnings.
    :param format_warning: Callable turning the arguments
      ``(message, category, filename, lineno)`` into the text to log.
    """

    def __init__(self, logger_name, format_warning=warnings.formatwarning):
        target = logging.getLogger(logger_name)
        if not target.handlers:
            # avoid "no handlers could be found" complaints
            target.addHandler(logging.NullHandler())
        self._logger = target
        self._format_warning = format_warning

    def __call__(self, message, category, filename, lineno,
                 file=None, line=None):
        """
        Implementation of showwarnings which redirects to logging.

        If a `file` is specified, delegate to the saved original
        implementation of `warnings.showwarning`.  Otherwise, format
        the warning with the function given at construction time and
        log the resulting string to this object's logger with level
        `logging.WARNING`.
        """
        if file is None:
            text = self._format_warning(message, category, filename, lineno)
            self._logger.warning(text)
            return
        assert _warnings_showwarning is not None
        _warnings_showwarning(message, category, filename, lineno, file, line)
def format_warning_oneline(message, category, filename, lineno,
                           file=None, line=None):
    """
    Format a warning for logging.

    The returned value should be a single-line string, for better
    logging style (although this is not enforced by the code).  Only
    `category` and `message` contribute to the result; the other
    arguments are accepted for signature compatibility and ignored.

    This function's arguments have the same meaning as the like-named
    arguments of `warnings.formatwarning`.
    """
    # `warnings.formatwarning` produces multi-line output that does
    # not look good in a log file, so let us replace it with something
    # simpler...
    template = '{category}: {message}'
    category_name = category.__name__
    return template.format(category=category_name, message=message)
def redirect_warnings(capture=True, logger='py.warnings'):
    """
    If capture is true, redirect all warnings to the logging package.
    If capture is False, ensure that warnings are not redirected to logging
    but to their original destinations.

    Calls are idempotent: asking again for the state that is already
    in effect is harmless.  Calling with ``capture=True`` while
    redirection is already active installs a hook for the (possibly
    different) `logger` but keeps the originally saved
    `warnings.showwarning`, so a later ``capture=False`` still
    restores the real original.

    :param bool capture: Whether to turn redirection on or off.
    :param str logger: Name of the logger that receives the warnings.
    """
    global _warnings_showwarning
    if capture:
        if _warnings_showwarning is None:
            # only remember the *real* original; never overwrite it
            # with one of our own hooks on repeated calls
            _warnings_showwarning = warnings.showwarning
        # `warnings.showwarning` must be a function, a generic
        # callable object is not accepted ...
        warnings.showwarning = _WarningsLogger(
            logger, format_warning_oneline).__call__
    elif _warnings_showwarning is not None:
        warnings.showwarning = _warnings_showwarning
        _warnings_showwarning = None