Source code for elasticluster.providers.ansible_provider

from __future__ import division

# compatiblity imports
from future import standard_library

# stdlib imports
from collections import defaultdict
from datetime import datetime
import logging
import os
import platform
import tempfile
import shlex
import shutil
from subprocess import call
from warnings import warn

# 3rd party imports
from pkg_resources import resource_filename
import yaml

# Elasticluster imports
import elasticluster
from elasticluster import log
from elasticluster.exceptions import ConfigurationError, ClusterSizeError
from elasticluster.providers import AbstractSetupProvider
from elasticluster.utils import get_num_processors, parse_ip_address_and_port, temporary_dir

# Comma-separated list of module authors (contact addresses are not
# available in this copy of the file).
__author__ = ','.join([
    'Nicolas Baer <>',
    'Antonio Messina <>',
    'Riccardo Murri <>',
])

class AnsibleSetupProvider(AbstractSetupProvider):
    """
    This implementation uses Ansible to configure and manage the cluster
    setup. See the Ansible documentation for details.

    :param dict groups: dictionary of node kinds with corresponding
                        ansible groups to install on the node kind.
                        e.g ``[node_kind] = ['ansible_group1', 'ansible_group2']``
                        The groups defined here can be referenced in each
                        node. Therefore groups can make it easier to define
                        multiple groups for one node.

    :param str playbook_path: path to playbook; if empty this will use
                              the shared playbook of elasticluster

    :param dict environment_vars: dictionary to define variables per node
                                  kind, e.g. ``[node_kind][var] = value``

    :param str storage_path: path to store the inventory file. By default
                             the inventory file is saved in a temporary
                             directory and deleted when the cluster is
                             stopped.

    :param bool sudo: indication whether use sudo to gain root permission

    :param str sudo_user: user with root permission

    :param str ansible_module_dir: comma- or colon-separated path to
                                   additional ansible modules (deprecated
                                   spelling of ``ansible_library``; accepted
                                   through `extra_conf`)

    :param bool slow_but_safer: Avoid using ``eatmydata`` to speed up
                                installation of many packages which
                                comprise several smallish files.

    :param extra_conf: any other setup configuration key/value pair.
                       Keys starting with ``ansible_`` are exported
                       (upper-cased) as environment variables when running
                       the playbook and written as host variables in the
                       inventory; keys ``ansible_command`` and
                       ``ansible_extra_args`` additionally customize the
                       ``ansible-playbook`` command line.

    :ivar groups: node kind and ansible group mapping dictionary
    :ivar environment: additional environment variables
    """

    #: to identify this provider type in messages
    HUMAN_READABLE_NAME = 'Ansible'

    def __init__(self, groups, playbook_path=None, environment_vars=None,
                 storage_path=None, sudo=True, sudo_user='root',
                 slow_but_safer=False, **extra_conf):
        self.groups = groups
        self._playbook_path = playbook_path
        self.environment = environment_vars or {}
        self.use_eatmydata = not slow_but_safer
        self._storage_path = storage_path
        self._sudo_user = sudo_user
        self._sudo = sudo

        # accept deprecated option names, but nag the user about them
        for old_name, new_name in (
                ('ssh_pipelining', 'ansible_ssh_pipelining'),
                ('ansible_module_dir', 'ansible_library'),
        ):
            if old_name in extra_conf:
                extra_conf[new_name] = extra_conf.pop(old_name)
                warn(
                    "Setup configuration option `{old}`"
                    " has been renamed to `{new}`."
                    " Please fix the configuration file(s), as support"
                    " for the old spelling will be removed in a future"
                    " release."
                    .format(old=old_name, new=new_name),
                    DeprecationWarning)

        self.extra_conf = extra_conf

        if not self._playbook_path:
            # according to the `pkg_resources` documentation on
            # resource extraction, requesting the filename to a
            # directory causes all the contained files and
            # directories to be extracted as well
            playbook_dir = resource_filename('elasticluster',
                                             'share/playbooks')
            self._playbook_path = os.path.join(playbook_dir, 'main.yml')
        else:
            self._playbook_path = os.path.expanduser(self._playbook_path)
            self._playbook_path = os.path.expandvars(self._playbook_path)

        # sanity check
        if not os.path.exists(self._playbook_path):
            raise ConfigurationError(
                "playbook `{playbook_path}` could not be found"
                .format(playbook_path=self._playbook_path))
        if not os.path.isfile(self._playbook_path):
            raise ConfigurationError(
                "playbook `{playbook_path}` is not a file"
                .format(playbook_path=self._playbook_path))

        # a `resume.yml` sitting next to the main playbook is used by
        # `resume_cluster` in preference to the full setup playbook
        potential_resume_playbook = os.path.join(
            os.path.dirname(self._playbook_path), 'resume.yml')
        if os.path.exists(potential_resume_playbook):
            self._resume_playbook_path = potential_resume_playbook
        else:
            self._resume_playbook_path = None

        if self._storage_path:
            self._storage_path = os.path.expanduser(self._storage_path)
            self._storage_path = os.path.expandvars(self._storage_path)
            self._storage_path_tmp = False
            if not os.path.exists(self._storage_path):
                os.makedirs(self._storage_path)
        else:
            self._storage_path = tempfile.mkdtemp()
            self._storage_path_tmp = True

    def setup_cluster(self, cluster, extra_args=tuple()):
        """
        Configure the cluster by running an Ansible playbook.

        The ElastiCluster configuration attribute `<kind>_groups`
        determines, for each node kind, what Ansible groups nodes of
        that kind are assigned to.

        :param cluster: cluster to configure
        :type cluster: :py:class:`elasticluster.cluster.Cluster`

        :param list extra_args:
          List of additional command-line arguments
          that are appended to each invocation of the setup program.

        :return: ``True`` on success, ``False`` otherwise. Please note, if
                 nothing has to be configured, then ``True`` is returned.

        :raises: `ConfigurationError` if the playbook can not be found
                 or is corrupt.
        """
        return self._run_playbook(cluster, self._playbook_path, extra_args)

    def resume_cluster(self, cluster, extra_args=tuple()):
        """
        As `setup_cluster`, but prefers to run a resume playbook, if
        one is available.  A resume playbook is a playbook which is
        designed to restart a cluster after it has been paused, and
        can be more efficient than a setup playbook (since it can
        assume that the required software is already installed).
        If no such playbook is available, it will use the standard
        setup playbook and print a warning.

        :param cluster: cluster to configure
        :type cluster: :py:class:`elasticluster.cluster.Cluster`

        :param list extra_args:
          List of additional command-line arguments
          that are appended to each invocation of the setup program.

        :return: ``True`` on success, ``False`` otherwise. Please note, if
                 nothing has to be configured, then ``True`` is returned.

        :raises: `ConfigurationError` if the playbook can not be found
                 or is corrupt.
        """
        if self._resume_playbook_path is not None:
            return self._run_playbook(
                cluster, self._resume_playbook_path, extra_args)
        else:
            log.warning("No resume playbook is available - falling back to the setup "
                        "playbook, which could be slow.")
            return self.setup_cluster(cluster, extra_args)

    def _run_playbook(self, cluster, playbook, extra_args):
        """
        Run `playbook` against all nodes of `cluster` with ``ansible-playbook``.

        Builds the inventory file and the execution environment, runs
        the command in a temporary working directory and then checks
        the per-node ``<node name>.log`` status files written there.

        :param cluster: cluster to configure
        :param str playbook: path to the playbook file to run
        :param extra_args: additional command-line arguments for Ansible

        :return: ``True`` if the command exited with code 0 *and* every
                 node reported status ``done``; ``False`` otherwise.

        :raises: `ClusterSizeError` if no inventory could be built.
        """
        # NOTE(review): `{date}` is assumed to be an ISO-8601 timestamp
        # of the current time -- confirm against upstream.
        run_id = (
            'elasticluster.{name}.{date}.{pid}@{host}'
            .format(
                name=cluster.name,
                date=datetime.now().isoformat(),
                pid=os.getpid(),
                host=platform.node(),
            )
        )
        inventory_path = self._build_inventory(cluster)
        if inventory_path is None:
            # no inventory file has been created: this can only happen
            # if no nodes have been started nor can be reached
            raise ClusterSizeError()
        assert os.path.exists(inventory_path), (
            "inventory file `{inventory_path}` does not exist"
            .format(inventory_path=inventory_path))

        # build list of directories to search for roles/include files
        ansible_roles_dirs = [
            # include Ansible default first ...
            '/etc/ansible/roles',
        ]
        for root_path in [
                # ... then ElastiCluster's built-in defaults
                resource_filename('elasticluster', 'share/playbooks'),
                # ... then wherever the playbook is
                os.path.dirname(playbook),
        ]:
            for path in [
                    root_path,
                    os.path.join(root_path, 'roles'),
            ]:
                if path not in ansible_roles_dirs and os.path.exists(path):
                    ansible_roles_dirs.append(path)

        # Use env vars to configure Ansible; see the Ansible
        # configuration settings reference for all values.
        #
        # Ansible does not merge keys in configuration files: rather
        # it uses the first configuration file found.  However,
        # environment variables can be used to selectively override
        # parts of the config; according to the Ansible docs: "they
        # are mostly considered to be a legacy system as compared to
        # the config file, but are equally valid."
        #
        # Provide default values for important configuration variables...
        ansible_env = {
            'ANSIBLE_FORKS': '%d' % (4*get_num_processors()),
            'ANSIBLE_HOST_KEY_CHECKING': 'no',
            'ANSIBLE_RETRY_FILES_ENABLED': 'no',
            # directories added last take precedence
            'ANSIBLE_ROLES_PATH': ':'.join(reversed(ansible_roles_dirs)),
            'ANSIBLE_SSH_PIPELINING': 'yes',
            'ANSIBLE_TIMEOUT': '120',
        }
        try:
            import mitogen  # only checking that the module is available
            ansible_env['ANSIBLE_STRATEGY'] = 'mitogen_linear'
            ansible_env['ANSIBLE_STRATEGY_PLUGINS'] = resource_filename(
                'ansible_mitogen', 'plugins/strategy')
            elasticluster.log.warning(
                "The `mitogen` module is installed,"
                " and will be used for connections to remote hosts."
                " If you want to revert to the slower but safer"
                " plain SSH, please execute command"
                " 'export ANSIBLE_STRATEGY=linear'"
                " before running ElastiCluster.")
        except ImportError:
            pass
        try:
            import ara
            ara_location = os.path.dirname(ara.__file__)
            ansible_env['ANSIBLE_CALLBACK_PLUGINS'] = (
                '{ara_location}/plugins/callbacks'
                .format(ara_location=ara_location))
            ansible_env['ANSIBLE_ACTION_PLUGINS'] = (
                '{ara_location}/plugins/actions'
                .format(ara_location=ara_location))
            ansible_env['ANSIBLE_LIBRARY'] = (
                '{ara_location}/plugins/modules'
                .format(ara_location=ara_location))
            ara_dir = os.getcwd()
            ansible_env['ARA_DIR'] = ara_dir
            ansible_env['ARA_DATABASE'] = (
                'sqlite:///{ara_dir}/{run_id}.ara.sqlite'
                .format(ara_dir=ara_dir, run_id=run_id))
            ansible_env['ARA_LOG_CONFIG'] = (
                '{run_id}.ara.yml'
                .format(run_id=run_id))
            ansible_env['ARA_LOG_FILE'] = (
                '{run_id}.ara.log'
                .format(run_id=run_id))
            ansible_env['ARA_LOG_LEVEL'] = 'DEBUG'
            ansible_env['ARA_PLAYBOOK_PER_PAGE'] = '0'
            ansible_env['ARA_RESULT_PER_PAGE'] = '0'
        except ImportError:
            elasticluster.log.info(
                "Could not import module `ara`:"
                " no detailed information about the playbook will be recorded.")

        # ...override them with key/values set in the config file(s)
        for k, v in self.extra_conf.items():
            if k.startswith('ansible_'):
                ansible_env[k.upper()] = str(v)
        # ...finally allow the environment have the final word
        ansible_env.update(os.environ)
        # however, this is needed for correct detection of success/failure...
        ansible_env['ANSIBLE_ANY_ERRORS_FATAL'] = 'yes'
        # ...and this might be needed to connect (see issue #567)
        if cluster.ssh_proxy_command:
            ansible_env['ANSIBLE_SSH_ARGS'] = (
                ansible_env.get('ANSIBLE_SSH_ARGS', '')
                + (" -o ProxyCommand='{proxy_command}'"
                   # NOTE: in contrast to `Node.connect()`, we must
                   # *not* expand %-escapes in the SSH proxy command:
                   # it will be done by the `ssh` client
                   .format(proxy_command=cluster.ssh_proxy_command)))

        # report on calling environment
        if __debug__:
            elasticluster.log.debug(
                "Calling `ansible-playbook` with the following environment:")
            for var, value in sorted(ansible_env.items()):
                # sanity check. Do not print password content....
                if "password" in var.lower() or "secret" in var.lower():
                    elasticluster.log.debug("- %s=******", var)
                else:
                    elasticluster.log.debug("- %s=%r", var, value)

        elasticluster.log.debug("Using playbook file %s.", playbook)

        # build `ansible-playbook` command-line
        cmd = shlex.split(
            self.extra_conf.get('ansible_command', 'ansible-playbook'))
        cmd += [
            ('--private-key=' + cluster.user_key_private),
            os.path.realpath(playbook),
            ('--inventory=' + inventory_path),
        ]

        if self._sudo:
            cmd.extend([
                # force all plays to use `sudo` (even if not marked as such)
                '--become',
                # desired sudo-to user
                ('--become-user=' + self._sudo_user),
            ])

        # determine Ansible verbosity as a function of ElastiCluster's
        # log level (we cannot read `ElastiCluster().params.verbose`
        # here, still we can access the log configuration since it's
        # global).
        verbosity = int(
            (logging.WARNING - elasticluster.log.getEffectiveLevel()) / 10)
        if verbosity > 0:
            cmd.append('-' + ('v' * verbosity))  # e.g., `-vv`

        # append any additional arguments provided by users in config file
        ansible_extra_args = self.extra_conf.get('ansible_extra_args', None)
        if ansible_extra_args:
            cmd += shlex.split(ansible_extra_args)

        # finally, append any additional arguments provided on command-line
        for arg in extra_args:
            # XXX: since we are going to change working directory,
            # make sure that anything that looks like a path to an
            # existing file is made absolute before appending to
            # Ansible's command line.  (Yes, this is a ugly hack.)
            if os.path.exists(arg):
                arg = os.path.abspath(arg)
            cmd.append(arg)

        ok = False  # pessimistic default
        with temporary_dir():
            # adjust execution environment, for the part that needs
            # the current directory path
            cmd += [
                '-e', ('@' + self._write_extra_vars(cluster))
            ]
            # run it!
            cmdline = ' '.join(cmd)
            elasticluster.log.debug(
                "Running Ansible command `%s` ...", cmdline)
            rc = call(cmd, env=ansible_env, bufsize=1, close_fds=True)
            # check outcome
            if rc != 0:
                elasticluster.log.error(
                    "Command `%s` failed with exit code %d.", cmdline, rc)
            else:
                # even if Ansible exited with return code 0, the
                # playbook might still have failed -- so explicitly
                # check for a "done" report showing that each node run
                # the playbook until the very last task
                cluster_hosts = set(
                    node.name for node in cluster.get_all_nodes())
                done_hosts = set()
                for node_name in cluster_hosts:
                    try:
                        # status files are looked up in the current
                        # (temporary) working directory
                        with open(node_name + '.log') as stream:
                            # NOTE(review): assumes surrounding
                            # whitespace must be ignored -- confirm
                            status = stream.read().strip()
                        if status == 'done':
                            done_hosts.add(node_name)
                    except (OSError, IOError):
                        # no status file for host, do not add it to
                        # `done_hosts`
                        pass
                if done_hosts == cluster_hosts:
                    # success!
                    ok = True
                elif not done_hosts:
                    # total failure
                    elasticluster.log.error(
                        "No host reported successfully running the setup playbook!")
                else:
                    # partial failure
                    elasticluster.log.error(
                        "The following nodes did not report"
                        " successful termination of the setup playbook:"
                        " %s", (', '.join(cluster_hosts - done_hosts)))

        if ok:
            elasticluster.log.info("Cluster correctly configured.")
            return True
        else:
            elasticluster.log.warning(
                "The cluster has likely *not* been configured correctly."
                " You may need to re-run `elasticluster setup`.")
            return False

    def _build_inventory(self, cluster):
        """
        Builds the inventory for the given cluster and returns its path

        Nodes without an IP address, or whose kind is not a key of
        `self.groups`, are skipped with a warning.

        :param cluster: cluster to build inventory for
        :type cluster: :py:class:`elasticluster.cluster.Cluster`

        :return: path to the inventory file, or ``None`` if no node
                 could be included in the inventory.
        """
        inventory_data = defaultdict(list)

        for node in cluster.get_all_nodes():
            if node.preferred_ip is None:
                log.warning(
                    "Ignoring node `{0}`: No IP address."
                    .format(node.name))
                continue
            if node.kind not in self.groups:
                # FIXME: should this raise a `ConfigurationError` instead?
                log.warning(
                    "Ignoring node `{0}`:"
                    " Node kind `{1}` not defined in cluster!"
                    .format(node.name, node.kind))
                continue

            extra_vars = ['ansible_user=%s' % node.image_user]

            ip_addr, port = parse_ip_address_and_port(node.preferred_ip)
            if port != 22:
                extra_vars.append('ansible_port=%s' % port)

            # write additional `ansible_*` variables to inventory;
            # `ansible_python_interpreter` gets special treatment
            # since the `+eatmydata` suffix is used to request a
            # wrapper script for running `eatmydata python`
            extra_conf = self.extra_conf.copy()
            ansible_python_interpreter = extra_conf.pop(
                'ansible_python_interpreter', '/usr/bin/python')
            extra_vars.append(
                'ansible_python_interpreter={python}{eatmydata}'
                .format(
                    python=ansible_python_interpreter,
                    eatmydata=('+eatmydata' if self.use_eatmydata else '')))
            # abuse Python's %r format to provide quotes around the
            # value, and \-escape any embedded quote chars
            extra_vars.extend('%s=%r' % (k, str(v))
                              for k, v in extra_conf.items()
                              if k.startswith('ansible_'))

            if node.kind in self.environment:
                # abuse Python's %r format to provide quotes around the
                # value, and \-escape any embedded quote chars
                extra_vars.extend(
                    '%s=%r' % (k, str(v))
                    for k, v in self.environment[node.kind].items())

            for group in self.groups[node.kind]:
                inventory_data[group].append(
                    (node.name, ip_addr, ' '.join(extra_vars)))

        if not inventory_data:
            elasticluster.log.info("No inventory file was created.")
            return None

        # create a temporary file to pass to ansible, since the
        # api is not stable yet...
        if self._storage_path_tmp:
            if not self._storage_path:
                self._storage_path = tempfile.mkdtemp()
            elasticluster.log.warning(
                "Writing inventory file to tmp dir `%s`", self._storage_path)

        inventory_path = os.path.join(
            self._storage_path, (cluster.name + '.inventory'))
        log.debug("Writing Ansible inventory to file `%s` ...", inventory_path)
        with open(inventory_path, 'w+') as inventory_file:
            for section, hosts in inventory_data.items():
                # Ansible throws an error "argument of type 'NoneType' is not
                # iterable" if a section is empty, so ensure we have something
                # to write in there
                if hosts:
                    inventory_file.write("\n[" + section + "]\n")
                    for host in hosts:
                        hostline = "{0} ansible_host={1} {2}\n".format(*host)
                        inventory_file.write(hostline)
        return inventory_path

    def cleanup(self, cluster):
        """
        Delete the inventory file most recently used for `cluster`.

        If the storage directory is a temporary one and is left empty
        after removing the inventory file, it is removed as well.
        Errors while deleting are logged and otherwise ignored.

        :param cluster: cluster to clear up inventory file for
        :type cluster: :py:class:`elasticluster.cluster.Cluster`
        """
        if self._storage_path and os.path.exists(self._storage_path):
            filename = (cluster.name + '.inventory')
            inventory_path = os.path.join(self._storage_path, filename)

            if os.path.exists(inventory_path):
                try:
                    os.unlink(inventory_path)
                    if self._storage_path_tmp:
                        if not os.listdir(self._storage_path):
                            shutil.rmtree(self._storage_path)
                except OSError as ex:
                    log.warning(
                        "AnsibileProvider: Ignoring error while deleting "
                        "inventory file %s: %s", inventory_path, ex)

    def __setstate__(self, state):
        """Restore instance attributes from the pickled `state` dictionary."""
        self.__dict__ = state
        # Compatibility fix: allow loading clusters created before
        # option `ssh_pipelining` was added.
        if 'ssh_pipelining' not in state:
            self.ssh_pipelining = True

    def _write_extra_vars(self, cluster, filename='extra_vars.yml'):
        """
        Dump cluster, cloud and per-node variables to a YAML file.

        The file is written in the current working directory (when
        `filename` is relative) under the top-level key
        ``elasticluster``, with permissions restricted to the owner.

        :param cluster: cluster whose variables should be exported
        :param str filename: path of the YAML file to write

        :return: `filename`, unchanged
        """
        # build dict of "extra vars"
        # XXX: we should not repeat here names of attributes that
        # should not be exported... it would be better to use a simple
        # naming convention (e.g., omit whatever starts with `_`)
        extra_vars = cluster.to_vars_dict()
        # hoist entries of the nested `extra` dict to the top level
        extra_vars.update(extra_vars.pop('extra', {}))
        extra_vars['cloud'] = cluster.cloud_provider.to_vars_dict()
        # replace the kind -> [nodes] mapping with a flat
        # node name -> node variables mapping
        nodes = extra_vars.pop('nodes')
        extra_vars['nodes'] = {}
        for instances in nodes.values():
            for node in instances:
                node_vars = node.to_vars_dict()
                node_vars.update(node_vars.pop('extra', {}))
                extra_vars['nodes'][node.name] = node_vars
        extra_vars['output_dir'] = os.getcwd()
        # save it to a YAML file
        # NOTE(review): the debug message below includes the full
        # variables dict, which may contain passwords.
        log.debug("Writing extra vars %r to file %s", extra_vars, filename)
        with open(filename, 'w') as output:
            # ensure output file is not readable to other users,
            # as it may contain passwords
            os.fchmod(output.fileno(), 0o600)
            # dump variables in YAML format for Ansible to read
            yaml.dump({'elasticluster': extra_vars}, output)
        return filename