Source code for elasticluster.subcommands

#! /usr/bin/env python
#
# Copyright (C) 2013 GC3, University of Zurich
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
__author__ = 'Nicolas Baer <nicolas.baer@uzh.ch>'

# stdlib imports
from abc import ABCMeta, abstractmethod
from fnmatch import fnmatch
import os
import sys

# Elasticluster imports
from elasticluster.conf import Configurator
from elasticluster import log
from elasticluster.exceptions import ClusterNotFound, ConfigurationError, \
    ImageError, SecurityGroupError, NodeNotFound, ClusterError


[docs]class AbstractCommand(): """ Defines the general contract every command has to fulfill in order to be recognized by the arguments list and executed afterwards. """ __metaclass__ = ABCMeta def __init__(self, params): """ A reference to the parameters of the command line will be passed here to adjust the functionality of the command properly. """ self.params = params @abstractmethod
[docs] def setup(self, subparsers): """ This method handles the setup of the subcommand. In order to do so, every command has to add a parser to the subparsers reference given as parameter. The following example is the minimum implementation of such a setup procedure: parser = subparsers.add_parser("start") parser.set_defaults(func=self.execute) """ pass
@abstractmethod
[docs] def execute(self): """ This method is executed after a command was recognized and may vary in its behavior. """ pass
def __call__(self): return self.execute()
[docs] def pre_run(self): """ Overrides this method to execute any pre-run code, especially to check any command line options. """ pass
def cluster_summary(cluster): try: frontend = cluster.get_frontend_node().name except NodeNotFound, ex: frontend = 'unknown' log.error("Unable to get information on the frontend node: " "%s", str(ex)) msg = """ Cluster name: %s Frontend node: %s """ % (cluster.name, frontend) for cls in cluster.nodes: msg += "- %s nodes: %d\n" % (cls, len(cluster.nodes[cls])) msg += """ To login on the frontend node, run the command: elasticluster ssh %s To upload or download files to the cluster, use the command: elasticluster sftp %s """ % (cluster.name, cluster.name) return msg
[docs]class Start(AbstractCommand): """ Create a new cluster using the given cluster template. """ def setup(self, subparsers): parser = subparsers.add_parser( "start", help="Create a cluster using the supplied configuration.", description=self.__doc__) parser.set_defaults(func=self) parser.add_argument('cluster', help="Type of cluster. It refers to a " "configuration stanza [cluster/<name>]") parser.add_argument('-v', '--verbose', action='count', default=0, help="Increase verbosity.") parser.add_argument('-n', '--name', dest='cluster_name', help='Name of the cluster.') parser.add_argument('--nodes', metavar='N1:GROUP[,N2:GROUP2,...]', help='Override the values in of the configuration ' 'file and starts `N1` nodes of group `GROUP`,' 'N2 of GROUP2 etc...') parser.add_argument('--no-setup', action="store_true", default=False, help="Only start the cluster, do not configure it") def pre_run(self): self.params.extra_conf = {} try: if self.params.nodes: nodes = self.params.nodes.split(',') for nspec in nodes: n, group = nspec.split(':') if not n.isdigit(): raise ConfigurationError( "Invalid syntax for option `--nodes`: " "`%s` is not an integer." % n) n = int(n) self.params.extra_conf[group + '_nodes'] = n except ValueError: raise ConfigurationError( "Invalid argument for option --nodes: %s" % self.params.nodes)
[docs] def execute(self): """ Starts a new cluster. """ cluster_template = self.params.cluster if self.params.cluster_name: cluster_name = self.params.cluster_name else: cluster_name = self.params.cluster configurator = Configurator.fromConfig( self.params.config, storage_path=self.params.storage) # overwrite configuration for option, value in self.params.extra_conf.iteritems(): cconf = configurator.cluster_conf[cluster_template]['cluster'] if option in cconf: cconf[option] = value # First, check if the cluster is already created. try: cluster = configurator.load_cluster(cluster_name) except ClusterNotFound as e: try: cluster = configurator.create_cluster( cluster_template, cluster_name) except ConfigurationError, e: log.error("Starting cluster %s: %s\n" % (cluster_template, e)) return try: for cls in cluster.nodes: print("Starting cluster `%s` with %d %s nodes." % ( cluster.name, len(cluster.nodes[cls]), cls)) print("(this may take a while...)") conf = configurator.cluster_conf[cluster_template] min_nodes = dict( (k[:-10], int(v)) for k, v in conf['cluster'].iteritems() if k.endswith('_nodes_min')) cluster.start(min_nodes=min_nodes) if self.params.no_setup: print("NOT configuring the cluster as requested.") else: print("Configuring the cluster.") print("(this too may take a while...)") ret = cluster.setup() if ret: print("Your cluster is ready!") else: print("\nWARNING: YOUR CLUSTER IS NOT READY YET!") print(cluster_summary(cluster)) except (KeyError, ImageError, SecurityGroupError, ClusterError) as ex: print("Your cluster could not start `%s`" % ex)
[docs]class Stop(AbstractCommand): """ Stop a cluster and terminate all associated virtual machines. """
[docs] def setup(self, subparsers): """ @see abstract_command contract """ parser = subparsers.add_parser( "stop", help="Stop a cluster and all associated VM instances.", description=self.__doc__) parser.set_defaults(func=self) parser.add_argument('cluster', help='name of the cluster') parser.add_argument('-v', '--verbose', action='count', default=0, help="Increase verbosity.") parser.add_argument('--force', action="store_true", default=False, help="Remove the cluster even if not all the nodes" " have been terminated properly.") parser.add_argument('--yes', action="store_true", default=False, help="Assume `yes` to all queries and " "do not prompt.")
[docs] def execute(self): """ Stops the cluster if it's running. """ cluster_name = self.params.cluster configurator = Configurator.fromConfig( self.params.config, storage_path=self.params.storage) try: cluster = configurator.load_cluster(cluster_name) except (ClusterNotFound, ConfigurationError), ex: log.error("Stopping cluster %s: %s\n" % (cluster_name, ex)) return if not self.params.yes: # Ask for confirmation yesno = raw_input( "Do you want really want to stop " "cluster %s? [yN] " % cluster_name) if yesno.lower() not in ['yes', 'y']: print("Aborting as per user request.") sys.exit(0) print("Destroying cluster `%s`" % cluster_name) cluster.stop(force=self.params.force)
[docs]class ResizeCluster(AbstractCommand): """ Resize the cluster by adding or removing compute nodes. """ def setup(self, subparsers): parser = subparsers.add_parser( "resize", help="Resize a cluster by adding or removing " "compute nodes.", description=self.__doc__) parser.set_defaults(func=self) parser.add_argument('cluster', help='name of the cluster') parser.add_argument('-a', '--add', metavar='N1:GROUP1[,N2:GROUP2]', help="Add N1 nodes of group GROUP1, " "N2 of group GROUP2 etc...") parser.add_argument('-r', '--remove', metavar='N1:GROUP1[,N2:GROUP2]', help="Remove N1 nodes of group GROUP1, " "N2 of group GROUP2 etc...") parser.add_argument('-t', '--template', help='name of the template ' 'of this cluster') parser.add_argument('-v', '--verbose', action='count', default=0, help="Increase verbosity.") parser.add_argument('--no-setup', action="store_true", default=False, help="Only start the cluster, do not configure it") parser.add_argument('--yes', action="store_true", default=False, help="Assume `yes` to all queries and " "do not prompt.") def pre_run(self): self.params.nodes_to_add = {} self.params.nodes_to_remove = {} try: if self.params.add: nodes = self.params.add.split(',') for nspec in nodes: n, group = nspec.split(':') if not n.isdigit(): raise ConfigurationError( "Invalid syntax for option `--nodes`: " "`%s` is not an integer." % n) self.params.nodes_to_add[group] = int(n) if self.params.remove: nodes = self.params.remove.split(',') for nspec in nodes: n, group = nspec.split(':') self.params.nodes_to_remove[group] = int(n) except ValueError as ex: raise ConfigurationError( "Invalid syntax for argument: %s" % ex) def execute(self): configurator = Configurator.fromConfig( self.params.config, storage_path=self.params.storage) # Get current cluster configuration cluster_name = self.params.cluster template = self.params.template try: cluster = configurator.load_cluster(cluster_name) cluster.update() except (ClusterNotFound, ConfigurationError), ex: log.error("Listing nodes from cluster %s: %s\n" % (cluster_name, ex)) return for grp in self.params.nodes_to_add: print("Adding %d %s node(s) to the cluster" "" % (self.params.nodes_to_add[grp], grp)) # Currently we can't save which template was used to setup a # cluster, therefore we imply the configuration of the new nodes # to match already existent nodes in this group. If no node was # added to this group yet, it will abort and ask for the # `--template` argument. # TODO: find a better solution for this problem, it makes things # complicated for the user if (not grp in cluster.nodes or not cluster.nodes[grp]) \ and not template: print "Elasticluster can not infer which template to use for "\ "the new node(s). Please provide the template with " \ "the `-t` or `--template` option" return if not template: sample_node = cluster.nodes[grp][0] for i in range(self.params.nodes_to_add[grp]): cluster.add_node(grp, sample_node.image, sample_node.image_user, sample_node.flavor, sample_node.security_group, image_userdata=sample_node.image_userdata) else: conf = configurator.cluster_conf[template] conf_kind = conf['nodes'][grp] for i in range(self.params.nodes_to_add[grp]): image_user = conf['login']['image_user'] userdata = conf_kind.get('image_userdata', '') cluster.add_node(grp, conf_kind['image_id'], image_user, conf_kind['flavor'], conf_kind['security_group'], image_userdata=userdata) for grp in self.params.nodes_to_remove: n_to_rm = self.params.nodes_to_remove[grp] print("Removing %d %s node(s) from the cluster." "" % (n_to_rm, grp)) to_remove = cluster.nodes[grp][-n_to_rm:] print("The following nodes will be removed from the cluster.") print(" " + str.join("\n ", [n.name for n in to_remove])) if not self.params.yes: # Ask for confirmation. yesno = raw_input( "Do you really want to remove them? [yN] ") if yesno.lower() not in ['yes', 'y']: print("Aborting as per user request.") sys.exit(0) for node in to_remove: cluster.nodes[grp].remove(node) node.stop() cluster.start() if self.params.no_setup: print("NOT configuring the cluster as requested.") else: print("Reconfiguring the cluster.") cluster.setup() print(cluster_summary(cluster))
[docs]class ListClusters(AbstractCommand): """ Print a list of all clusters that have been started. """ def setup(self, subparsers): parser = subparsers.add_parser( "list", help="List all started clusters.", description=self.__doc__) parser.set_defaults(func=self) parser.add_argument('-v', '--verbose', action='count', default=0, help="Increase verbosity.") def execute(self): configurator = Configurator.fromConfig( self.params.config, storage_path=self.params.storage) repository = configurator.create_repository() clusters = repository.get_all() if not clusters: print("No clusters found.") else: print(""" The following clusters have been started. Please note that there's no guarantee that they are fully configured: """) for cluster in sorted(clusters): print("%s " % cluster.name) print("-" * len(cluster.name)) print(" name: %s" % cluster.name) for cls in cluster.nodes: print(" - %s nodes: %d" % (cls, len(cluster.nodes[cls]))) print("")
[docs]class ListTemplates(AbstractCommand): """ List the available templates defined in the configuration file. """ def setup(self, subparsers): parser = subparsers.add_parser( "list-templates", description=self.__doc__, help="Show the templates defined in the configuration file.") parser.set_defaults(func=self) parser.add_argument('-v', '--verbose', action='count', default=0, help="Increase verbosity.") parser.add_argument('clusters', nargs="*", help="List only this cluster. Accepts globbing.") def execute(self): configurator = Configurator.fromConfig( self.params.config, storage_path=self.params.storage) config = configurator.cluster_conf print("""%d cluster templates found in configuration file.""" % len(config)) templates = config.keys() for pattern in self.params.clusters: templates = [t for t in templates if fnmatch(t, pattern)] if self.params.clusters: print("""%d cluter templates found matching pattern(s) '%s'""" % (len(templates), str.join(", ", self.params.clusters))) for template in templates: try: cluster = configurator.create_cluster(template, template) print(""" name: %s""" % template) for nodekind in cluster.nodes: print("%s nodes: %d" % ( nodekind, len(cluster.nodes[nodekind]))) except ConfigurationError, ex: log.warning("unable to load cluster `%s`: %s", template, ex)
[docs]class ListNodes(AbstractCommand): """ Show some information on all the nodes belonging to a given cluster. """ def setup(self, subparsers): parser = subparsers.add_parser( "list-nodes", help="Show information about the nodes in the " "cluster", description=self.__doc__) parser.set_defaults(func=self) parser.add_argument('cluster', help='name of the cluster') parser.add_argument('-v', '--verbose', action='count', default=0, help="Increase verbosity.") parser.add_argument( '-u', '--update', action='store_true', default=False, help="By default `elasticluster list-nodes` will not contact the " "EC2 provider to get up-to-date information, unless `-u` " "option is given.")
[docs] def execute(self): """ Lists all nodes within the specified cluster with certain information like id and ip. """ configurator = Configurator.fromConfig( self.params.config, storage_path=self.params.storage) cluster_name = self.params.cluster try: cluster = configurator.load_cluster(cluster_name) if self.params.update: cluster.update() except (ClusterNotFound, ConfigurationError), ex: log.error("Listing nodes from cluster %s: %s\n" % (cluster_name, ex)) return print(cluster_summary(cluster)) for cls in cluster.nodes: print("%s nodes:" % cls) print("") for node in cluster.nodes[cls]: txt = [" " + i for i in node.pprint().splitlines()] print(' - ' + str.join("\n", txt)[4:]) print("")
[docs]class SetupCluster(AbstractCommand): """ Setup the given cluster by calling the setup provider defined for this cluster. """ def setup(self, subparsers): parser = subparsers.add_parser( "setup", help="Configure the cluster.", description=self.__doc__) parser.set_defaults(func=self) parser.add_argument('cluster', help='name of the cluster') parser.add_argument('-v', '--verbose', action='count', default=0, help="Increase verbosity.") def execute(self): configurator = Configurator.fromConfig( self.params.config, storage_path=self.params.storage) cluster_name = self.params.cluster try: cluster = configurator.load_cluster(cluster_name) cluster.update() except (ClusterNotFound, ConfigurationError), ex: log.error("Setting up cluster %s: %s\n" % (cluster_name, ex)) return print("Configuring cluster `%s`..." % cluster_name) ret = cluster.setup() if ret: print("Your cluster is ready!") else: print("\nWARNING: YOUR CLUSTER IS NOT READY YET!") print(cluster_summary(cluster))
[docs]class SshFrontend(AbstractCommand): """ Connect to the frontend of the cluster using `ssh`. """ def setup(self, subparsers): parser = subparsers.add_parser( "ssh", help="Connect to the frontend of the cluster using the " "`ssh` command", description=self.__doc__) parser.set_defaults(func=self) parser.add_argument('cluster', help='name of the cluster') parser.add_argument('-v', '--verbose', action='count', default=0, help="Increase verbosity.") parser.add_argument('ssh_args', metavar='args', nargs='*', help="Execute the following command on the remote " "machine instead of opening an interactive shell.") def execute(self): configurator = Configurator.fromConfig( self.params.config, storage_path=self.params.storage) cluster_name = self.params.cluster try: cluster = configurator.load_cluster(cluster_name) cluster.update() except (ClusterNotFound, ConfigurationError), ex: log.error("Setting up cluster %s: %s\n" % (cluster_name, ex)) return try: frontend = cluster.get_frontend_node() # ensure we can connect to the host if not frontend.preferred_ip: # Ensure we can connect to the node, and save the value of `preferred_ip` ssh = frontend.connect() if ssh: ssh.close() cluster.repository.save_or_update(cluster) except NodeNotFound, ex: log.error("Unable to connect to the frontend node: %s" % str(ex)) sys.exit(1) host = frontend.connection_ip() username = frontend.image_user log.warning("Ignoring known_hosts file.") ssh_cmdline = ["ssh", "-i", frontend.user_key_private, "-o", "UserKnownHostsFile=/dev/null", "-o", "StrictHostKeyChecking=no", '%s@%s' % (username, host)] ssh_cmdline.extend(self.params.ssh_args) log.debug("Running command `%s`" % str.join(' ', ssh_cmdline)) os.execlp("ssh", *ssh_cmdline)
[docs]class SftpFrontend(AbstractCommand): """ Open an SFTP session to the cluster frontend host. """ def setup(self, subparsers): parser = subparsers.add_parser( "sftp", help="Open an SFTP session to the cluster frontend host.", description=self.__doc__) parser.set_defaults(func=self) parser.add_argument('cluster', help='name of the cluster') parser.add_argument('-v', '--verbose', action='count', default=0, help="Increase verbosity.") parser.add_argument('sftp_args', metavar='args', nargs='*', help="Arguments to pass to ftp, instead of " "opening an interactive shell.") def execute(self): configurator = Configurator.fromConfig( self.params.config, storage_path=self.params.storage) cluster_name = self.params.cluster try: cluster = configurator.load_cluster(cluster_name) cluster.update() except (ClusterNotFound, ConfigurationError), ex: log.error("Setting up cluster %s: %s\n" % (cluster_name, ex)) return try: frontend = cluster.get_frontend_node() except NodeNotFound, ex: log.error("Unable to connect to the frontend node: %s" % str(ex)) sys.exit(1) host = frontend.connection_ip() username = frontend.image_user sftp_cmdline = ["sftp", "-o", "UserKnownHostsFile=/dev/null", "-o", "StrictHostKeyChecking=no", "-o", "IdentityFile=%s" % frontend.user_key_private] sftp_cmdline.extend(self.params.sftp_args) sftp_cmdline.append('%s@%s' % (username, host)) os.execlp("sftp", *sftp_cmdline)