Source code for elasticluster.providers.ec2_boto

#
# Copyright (C) 2013 GC3, University of Zurich
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
__author__ = 'Nicolas Baer <nicolas.baer@uzh.ch>, Antonio Messina <antonio.s.messina@gmail.com>'

# System imports
import os
import urllib
import threading

# External modules
import boto
from boto import ec2
from paramiko import DSSKey, RSAKey, PasswordRequiredException
from paramiko.ssh_exception import SSHException

# Elasticluster imports
from elasticluster import log
from elasticluster.providers import AbstractCloudProvider
from elasticluster.exceptions import SecurityGroupError, KeypairError, \
    ImageError, InstanceError, ClusterError


[docs]class BotoCloudProvider(AbstractCloudProvider): """This implementation of :py:class:`elasticluster.providers.AbstractCloudProvider` uses the boto ec2 interface to connect to ec2 compliant clouds and manage instances. Please check https://github.com/boto/boto for further information about the supported cloud platforms. :param str ec2_url: url to connect to cloud web service :param str ec2_region: region identifier :param str ec2_access_key: access key of the user account :param str ec2_secret_key: secret key of the user account :param str storage_path: path to store temporary data :param bool request_floating_ip: Whether ip are assigned automatically `True` or floating ips have to be assigned manually `False` """ __node_start_lock = threading.Lock() # lock used for node startup def __init__(self, ec2_url, ec2_region, ec2_access_key, ec2_secret_key, storage_path=None, request_floating_ip=False): self._url = ec2_url self._region_name = ec2_region self._access_key = ec2_access_key self._secret_key = ec2_secret_key self.request_floating_ip = request_floating_ip # read all parameters from url proto, opaqueurl = urllib.splittype(ec2_url) self._host, self._ec2path = urllib.splithost(opaqueurl) self._ec2host, port = urllib.splitport(self._host) if port: port = int(port) self._ec2port = port if proto == "https": self._secure = True else: self._secure = False # will be initialized upon first connect self._connection = None self._region = None self._instances = {} self._cached_instances = [] self._images = None def _connect(self): """Connects to the ec2 cloud provider :return: :py:class:`boto.ec2.connection.EC2Connection` :raises: Generic exception on error """ # check for existing connection if self._connection: return self._connection try: log.debug("Connecting to ec2 host %s", self._ec2host) region = ec2.regioninfo.RegionInfo(name=self._region_name, endpoint=self._ec2host) # connect to webservice self._connection = boto.connect_ec2( aws_access_key_id=self._access_key, aws_secret_access_key=self._secret_key, is_secure=self._secure, host=self._ec2host, port=self._ec2port, path=self._ec2path, region=region) # list images to see if the connection works log.debug("Connection has been successful.") # images = self._connection.get_all_images() # log.debug("%d images found on cloud %s", # len(images), self._ec2host) except Exception as e: log.error("connection to cloud could not be " "established: message=`%s`", str(e)) raise return self._connection
[docs] def start_instance(self, key_name, public_key_path, private_key_path, security_group, flavor, image_id, image_userdata, username=None, node_name=None, **kwargs): """Starts a new instance on the cloud using the given properties. The following tasks are done to start an instance: * establish a connection to the cloud web service * check ssh keypair and upload it if it does not yet exist. This is a locked process, since this function might be called in multiple threads and we only want the key to be stored once. * check if the security group exists * run the instance with the given properties :param str key_name: name of the ssh key to connect :param str public_key_path: path to ssh public key :param str private_key_path: path to ssh private key :param str security_group: firewall rule definition to apply on the instance :param str flavor: machine type to use for the instance :param str image_id: image type (os) to use for the instance :param str image_userdata: command to execute after startup :param str username: username for the given ssh key, default None :return: str - instance id of the started instance """ connection = self._connect() log.debug("Checking keypair `%s`.", key_name) # the `_check_keypair` method has to be called within a lock, # since it will upload the key if it does not exist and if this # happens for every node at the same time ec2 will throw an error # message (see issue #79) with BotoCloudProvider.__node_start_lock: self._check_keypair(key_name, public_key_path, private_key_path) log.debug("Checking security group `%s`.", security_group) self._check_security_group(security_group) # image_id = self._find_image_id(image_id) try: reservation = connection.run_instances( image_id, key_name=key_name, security_groups=[security_group], instance_type=flavor, user_data=image_userdata) except Exception, ex: log.error("Error starting instance: %s", ex) if "TooManyInstances" in ex: raise ClusterError(ex) else: raise InstanceError(ex) vm = reservation.instances[-1] vm.add_tag("Name", node_name) # cache instance object locally for faster access later on self._instances[vm.id] = vm return vm.id
[docs] def stop_instance(self, instance_id): """Stops the instance gracefully. :param str instance_id: instance identifier """ instance = self._load_instance(instance_id) instance.terminate() del self._instances[instance_id]
[docs] def get_ips(self, instance_id): """Retrieves the private and public ip addresses for a given instance. :return: list (ips) """ self._load_instance(instance_id) instance = self._load_instance(instance_id) IPs = [ip for ip in instance.private_ip_address, instance.ip_address if ip] # We also need to check if there is any floating IP associated if self.request_floating_ip: # We need to list the floating IPs for this instance floating_ips = [ip for ip in self._connection.get_all_addresses() if ip.instance_id == instance.id] if not floating_ips: log.debug("Public ip address has to be assigned through " "elasticluster.") ip = self._allocate_address(instance) # This is probably the preferred IP we want to use IPs.insert(0, ip) else: IPs = [ip.public_ip for ip in floating_ips] + IPs return list(set(IPs))
[docs] def is_instance_running(self, instance_id): """Checks if the instance is up and running. :param str instance_id: instance identifier :return: bool - True if running, False otherwise """ instance = self._load_instance(instance_id) if instance.update() == "running": # If the instance is up&running, ensure it has an IP # address. if not instance.ip_address and self.request_floating_ip: log.debug("Public ip address has to be assigned through " "elasticluster.") self._allocate_address(instance) instance.update() return True else: return False
def _allocate_address(self, instance): """Allocates a free public ip address to the given instance :param instance: instance to assign address to :type instance: py:class:`boto.ec2.instance.Reservation` :return: public ip address """ connection = self._connect() free_addresses = [ ip for ip in connection.get_all_addresses() if not ip.instance_id] if not free_addresses: try: address = connection.allocate_address() except Exception, ex: log.error("Unable to allocate a public IP address to instance `%s`", instance.id) return None try: address = free_addresses.pop() instance.use_ip(address) return address.public_ip except Exception, ex: log.error("Unable to associate IP address %s to instance `%s`", address, instance.id) return None def _load_instance(self, instance_id): """Checks if an instance with the given id is cached. If not it will connect to the cloud and put it into the local cache _instances. :param str instance_id: instance identifier :return: py:class:`boto.ec2.instance.Reservation` - instance :raises: `InstanceError` is returned if the instance can't be found in the local cache or in the cloud. """ connection = self._connect() if instance_id in self._instances: return self._instances[instance_id] # Instance not in the internal dictionary. # First, check the internal cache: if instance_id not in [i.id for i in self._cached_instances]: # Refresh the cache, just in case self._cached_instances = [] reservations = connection.get_all_instances() for res in reservations: self._cached_instances.extend(res.instances) for inst in self._cached_instances: if inst.id == instance_id: self._instances[instance_id] = inst return inst # If we reached this point, the instance was not found neither # in the cache or on the website. raise InstanceError("the given instance `%s` was not found " "on the coud" % instance_id) def _check_keypair(self, name, public_key_path, private_key_path): """First checks if the keypair is valid, then checks if the keypair is registered with on the cloud. If not the keypair is added to the users ssh keys. :param str name: name of the ssh key :param str public_key_path: path to the ssh public key file :param str private_key_path: path to the ssh private key file :raises: `KeypairError` if key is not a valid RSA or DSA key, the key could not be uploaded or the fingerprint does not match to the one uploaded to the cloud. """ connection = self._connect() keypairs = connection.get_all_key_pairs() keypairs = dict((k.name, k) for k in keypairs) # decide if dsa or rsa key is provided pkey = None is_dsa_key = False try: pkey = DSSKey.from_private_key_file(private_key_path) is_dsa_key = True except PasswordRequiredException: log.warning( "Unable to check key file `%s` because it is encrypted with a " "password. Please, ensure that you added it to the SSH agent " "with `ssh-add %s`", private_key_path, private_key_path) except SSHException: try: pkey = RSAKey.from_private_key_file(private_key_path) except PasswordRequiredException: log.warning( "Unable to check key file `%s` because it is encrypted with a " "password. Please, ensure that you added it to the SSH agent " "with `ssh-add %s`", private_key_path, private_key_path) except SSHException: raise KeypairError('File `%s` is neither a valid DSA key ' 'or RSA key.' % private_key_path) # create keys that don't exist yet if name not in keypairs: log.warning( "Keypair `%s` not found on resource `%s`, Creating a new one", name, self._url) with open(os.path.expanduser(public_key_path)) as f: key_material = f.read() try: # check for DSA on amazon if "amazon" in self._ec2host and is_dsa_key: log.error( "Apparently, amazon does not support DSA keys. " "Please specify a valid RSA key.") raise KeypairError( "Apparently, amazon does not support DSA keys." "Please specify a valid RSA key.") connection.import_key_pair(name, key_material) except Exception, ex: log.error( "Could not import key `%s` with name `%s` to `%s`", name, public_key_path, self._url) raise KeypairError( "could not create keypair `%s`: %s" % (name, ex)) else: # check fingerprint cloud_keypair = keypairs[name] if pkey: fingerprint = str.join( ':', (i.encode('hex') for i in pkey.get_fingerprint())) if fingerprint != cloud_keypair.fingerprint: if "amazon" in self._ec2host: log.error( "Apparently, Amazon does not compute the RSA key " "fingerprint as we do! We cannot check if the " "uploaded keypair is correct!") else: raise KeypairError( "Keypair `%s` is present but has " "different fingerprint. Aborting!" % name) def _check_security_group(self, name): """Checks if the security group exists. :param str name: name of the security group :raises: `SecurityGroupError` if group does not exist """ connection = self._connect() security_groups = connection.get_all_security_groups() if not security_groups: raise SecurityGroupError( "the specified security group %s does not exist" % name) security_groups = dict((s.name, s) for s in security_groups) if name not in security_groups: raise SecurityGroupError( "the specified security group %s does not exist" % name) def _find_image_id(self, image_id): """Finds an image id to a given id or name. :param str image_id: name or id of image :return: str - identifier of image """ if not self._images: connection = self._connect() self._images = connection.get_all_images() image_id_cloud = None for i in self._images: if i.id == image_id or i.name == image_id: image_id_cloud = i.id break if image_id_cloud: return image_id_cloud else: raise ImageError( "Could not find given image id `%s`" % image_id)