# Source code for elasticluster.providers.gce

#
# Copyright (C) 2013 GC3, University of Zurich
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
"""
Cloud provider for the Google Compute Engine.

See <https://code.google.com/p/google-cloud-platform-samples/source/browse/python-client-library-example/gce.py?repo=compute> for reference.
"""
# Comma-separated list of the module's authors.
__author__ = ', '.join([
    'Riccardo Murri <riccardo.murri@uzh.ch>',
    'Nicolas Baer <nicolas.baer@gmail.com>',
    'Antonio Messina <antonio.s.messina@gmail.com>',
])


# stdlib imports
import copy
import httplib2
import os
import random
import threading
import time
import uuid

# External modules
from apiclient.discovery import build
from apiclient.errors import HttpError
from oauth2client.file import Storage
from oauth2client.client import OAuth2WebServerFlow
from oauth2client.tools import run

# Elasticluster imports
from elasticluster import log
from elasticluster.providers import AbstractCloudProvider
from elasticluster.exceptions import ImageError, InstanceError, CloudProviderError


# constants and defaults

#: the OAuth scope for the GCE web API
GCE_SCOPE = 'https://www.googleapis.com/auth/compute'
#: name of the Google API loaded through `apiclient.discovery.build`
GCE_API_NAME = 'compute'
#: version of the Compute Engine API to talk to
GCE_API_VERSION = 'v1'
#: prefix of every project-scoped Compute Engine resource URL
GCE_URL = 'https://www.googleapis.com/compute/' + GCE_API_VERSION + '/projects/'
#: zone used when the provider is not given one
GCE_DEFAULT_ZONE = 'us-central1-a'
#: service account e-mail attached to started instances by default
GCE_DEFAULT_SERVICE_EMAIL = 'default'
#: OAuth scopes granted to the service account of started instances
GCE_DEFAULT_SCOPES = [
    'https://www.googleapis.com/auth/devstorage.full_control',
    'https://www.googleapis.com/auth/compute',
]


class GoogleCloudProvider(AbstractCloudProvider):
    """Cloud provider for the Google Compute Engine.

    :param str gce_client_id: Client ID to use in OAuth authentication.
    :param str gce_client_secret: Client secret (password) to use in OAuth
                                  authentication.
    :param str gce_project_id: Project name to log in to GCE.
    :param str zone: gce zone, default is `us-central1-a`
    :param str network: network to use, default is `default`
    :param str email: service email to use, default is `default`
    :param str storage_path: directory where the authentication data
                             (``<client id>.oauth.dat`` file) is stored.
                             If no path is specified, the credentials are
                             kept in a fresh private temporary directory,
                             so the login data has to be entered again for
                             every new connection.
    """
    # Class attribute, hence shared by *all* provider objects: serializes
    # connection setup (`_connect`) and request execution
    # (`_execute_request`).
    __gce_lock = threading.Lock()

    def __init__(self, gce_client_id, gce_client_secret, gce_project_id,
                 zone=GCE_DEFAULT_ZONE, network='default',
                 email=GCE_DEFAULT_SERVICE_EMAIL, storage_path=None):
        self._client_id = gce_client_id
        self._client_secret = gce_client_secret
        self._project_id = gce_project_id
        self._zone = zone
        self._network = network
        self._email = email
        self._storage_path = storage_path

        # will be initialized upon first connect
        self._gce = None
        self._auth_http = None
        self._instances = {}
        self._cached_instances = []
        self._images = None

    def _connect(self):
        """Connects to the cloud web services. If this is the first
        authentication, a web browser will be started to authenticate
        against google and provide access to elasticluster.

        :return: A Resource object with methods for interacting with the
                 service.
        """
        with GoogleCloudProvider.__gce_lock:
            # reuse an existing connection
            if self._gce is not None:
                return self._gce

            flow = OAuth2WebServerFlow(self._client_id, self._client_secret,
                                       GCE_SCOPE)

            # The `Storage` object holds the credentials that your
            # application needs to authorize access to the user's
            # data. The name of the credentials file is provided. If the
            # file does not exist, it is created. This object can only
            # hold credentials for a single user. It stores the access
            # privileges for the application, so a user only has to grant
            # access through the web interface once.
            storage_dir = self._storage_path
            if storage_dir is None:
                # `storage_path=None` is an accepted constructor value, but
                # `os.path.join(None, ...)` fails; fall back to a private
                # temporary directory that is not reused by later
                # connections (hence the login must be repeated).
                import tempfile
                storage_dir = tempfile.mkdtemp(prefix='elasticluster-gce-')
            storage_path = os.path.join(storage_dir,
                                        self._client_id + '.oauth.dat')
            storage = Storage(storage_path)
            credentials = storage.get()
            if credentials is None or credentials.invalid:
                # try to start a browser to have the user authenticate with
                # Google
                # TODO: what kind of exception is raised if the browser
                # cannot be started?
                credentials = run(flow, storage)

            http = httplib2.Http()
            # the authorized HTTP object is what `_execute_request` passes
            # to every `request.execute` call
            self._auth_http = credentials.authorize(http)
            self._gce = build(GCE_API_NAME, GCE_API_VERSION, http=http)

            return self._gce

    def _execute_request(self, request):
        """Helper method to execute a request, since a lock should be used
        to not fire up multiple requests at the same time.

        :return: Result of `request.execute`
        """
        with GoogleCloudProvider.__gce_lock:
            return request.execute(http=self._auth_http)

    # The following function was adapted from
    # https://developers.google.com/compute/docs/api/python_guide
    # (function _blocking_call)
    def _wait_until_done(self, response, wait=30):
        """Blocks until the operation status is done for the given operation.

        :param response: The operation response returned by a previous
                         GCE call; it must carry a ``status`` key.
        :param int wait: Wait up to this number of seconds in between
                         successive polling of the GCE status.
        :return: the last operation response fetched (the input itself if
                 it was already ``DONE``), or a falsy value if a polling
                 request returned nothing.
        """
        gce = self._connect()

        status = response['status']
        while status != 'DONE' and response:
            # wait a random amount of time (1 to `wait` seconds)
            if wait:
                time.sleep(1 + random.randrange(wait))

            operation_id = response['name']

            # zone-scoped operations must be polled through
            # `zoneOperations`, everything else through `globalOperations`
            if 'zone' in response:
                zone_name = response['zone'].split('/')[-1]
                request = gce.zoneOperations().get(
                    project=self._project_id, operation=operation_id,
                    zone=zone_name)
            else:
                request = gce.globalOperations().get(
                    project=self._project_id, operation=operation_id)

            response = self._execute_request(request)
            if response:
                status = response['status']
        return response

    def start_instance(self,
                       # these are common to any
                       # CloudProvider.start_instance() call
                       key_name, public_key_path, private_key_path,
                       security_group, flavor, image_id, image_userdata,
                       username=None,
                       # these params are specific to the
                       # GoogleCloudProvider
                       instance_name=None,
                       **kwargs):
        """Starts a new instance with the given properties and returns
        the instance id.

        `key_name`, `private_key_path`, `security_group` and
        `image_userdata` are accepted for interface compatibility but are
        not used by this implementation.

        :param str key_name: name of the ssh key to connect
        :param str public_key_path: path to ssh public key
        :param str private_key_path: path to ssh private key
        :param str security_group: firewall rule definition to apply on the
                                   instance
        :param str flavor: machine type to use for the instance
        :param str image_id: image type (os) to use for the instance; the
                             image is looked up in the public project
                             ``<first dash-separated word>-cloud``
        :param str image_userdata: command to execute after startup
        :param str username: username for the given ssh key, default None;
                             it is used verbatim as the user part of the
                             ``sshKeys`` metadata entry, so it should be
                             given
        :param str instance_name: name of the instance
        :return: str - instance id of the started instance
        :raises: `InstanceError` if the instance cannot be created
        """
        # construct URLs
        project_url = '%s%s' % (GCE_URL, self._project_id)
        machine_type_url = '%s/zones/%s/machineTypes/%s' \
                           % (project_url, self._zone, flavor)
        network_url = '%s/global/networks/%s' % (project_url, self._network)
        # e.g. `debian-7-wheezy-...` is looked up in project `debian-cloud`;
        # named `os_name` so the `os` module is not shadowed
        os_name = image_id.split("-")[0]
        os_cloud = "%s-cloud" % os_name
        image_url = '%s%s/global/images/%s' % (GCE_URL, os_cloud, image_id)

        # construct the request body
        if instance_name is None:
            instance_name = 'elasticluster-%s' % uuid.uuid4()

        # `with` closes the key file promptly instead of leaking the handle
        with open(public_key_path) as public_key_file:
            public_key_content = public_key_file.read()

        instance = {
            'name': instance_name,
            'machineType': machine_type_url,
            'disks': [{
                # these are boolean fields in the GCE API
                'autoDelete': True,
                'boot': True,
                'type': 'PERSISTENT',
                'initializeParams': {
                    'diskName': "%s-disk" % instance_name,
                    'sourceImage': image_url,
                },
            }],
            'networkInterfaces': [{
                'accessConfigs': [{
                    'type': 'ONE_TO_ONE_NAT',
                    'name': 'External NAT',
                }],
                'network': network_url,
            }],
            'serviceAccounts': [{
                'email': self._email,
                'scopes': GCE_DEFAULT_SCOPES,
            }],
            "metadata": {
                "kind": "compute#metadata",
                "items": [{
                    "key": "sshKeys",
                    "value": "%s:%s" % (username, public_key_content),
                }],
            },
        }

        # create the instance
        gce = self._connect()
        request = gce.instances().insert(
            project=self._project_id, body=instance, zone=self._zone)
        try:
            response = self._execute_request(request)
            response = self._wait_until_done(response)
            self._check_response(response)
            return instance_name
        except (HttpError, CloudProviderError) as e:
            log.error("Error creating instance `%s`" % e)
            raise InstanceError("Error creating instance `%s`" % e)

    def stop_instance(self, instance_id):
        """Stops the instance by issuing a GCE ``delete`` request for it.

        The delete operation is checked for errors but not waited for.

        :param str instance_id: instance identifier
        :raises: `InstanceError` if instance can not be stopped
        """
        gce = self._connect()

        try:
            request = gce.instances().delete(project=self._project_id,
                                             instance=instance_id,
                                             zone=self._zone)
            response = self._execute_request(request)
            self._check_response(response)
        except (HttpError, CloudProviderError) as e:
            raise InstanceError("Could not stop instance `%s`: `%s`"
                                % (instance_id, e))

    def list_instances(self, filter=None):
        """List instances on GCE, optionally filtering the results.

        :param str filter: Filter specification; see
          https://developers.google.com/compute/docs/reference/latest/instances/list
          for details.
        :return: list of instances
        :raises: `InstanceError` if the listing request fails
        """
        gce = self._connect()

        try:
            request = gce.instances().list(
                project=self._project_id, filter=filter, zone=self._zone)
            response = self._execute_request(request)
            self._check_response(response)
        except (HttpError, CloudProviderError) as e:
            # the message needs a `%s` placeholder: without one, the `%`
            # operator itself raises TypeError and hides the real error
            raise InstanceError("could not retrieve all instances on the "
                                "cloud: `%s`" % e)

        if response and 'items' in response:
            return response['items']
        else:
            return list()

    def get_ips(self, instance_id):
        """Retrieves the ip addresses (private and public) from the cloud
        provider by the given instance id.

        :param str instance_id: id of the instance
        :return: list (ips) - ``[private_ip, public_ip]``
        :raises: InstanceError if the ip could not be retrieved.
        """
        gce = self._connect()
        instances = gce.instances()
        try:
            request = instances.get(instance=instance_id,
                                    project=self._project_id,
                                    zone=self._zone)
            response = self._execute_request(request)
            ip_private = None
            ip_public = None
            if response and "networkInterfaces" in response:
                interfaces = response['networkInterfaces']
                if interfaces:
                    nic = interfaces[0]
                    ip_private = nic.get('networkIP')
                    # tolerate an empty `accessConfigs` list or a missing
                    # `natIP` key: report it as the InstanceError below
                    # instead of an unexpected IndexError/KeyError
                    access_configs = nic.get('accessConfigs')
                    if access_configs:
                        ip_public = access_configs[0].get('natIP')

            if ip_private and ip_public:
                return [ip_private, ip_public]
            else:
                raise InstanceError("could not retrieve the ip address for "
                                    "node `%s`, please check the node "
                                    "through the cloud provider interface"
                                    % instance_id)

        except (HttpError, CloudProviderError) as e:
            raise InstanceError('could not retrieve the ip address of `%s`: '
                                '`%s`' % (instance_id, e))

    def is_instance_running(self, instance_id):
        """Check whether the instance is up and running.

        :param str instance_id: instance identifier
        :return: True if instance is running, False otherwise
        """
        items = self.list_instances(filter=('name eq "%s"' % instance_id))
        return any(item['status'] == 'RUNNING' for item in items)

    def _get_image_url(self, image_id):
        """Gets the url for the specified image. Unfortunately this only
        works for images uploaded by the user. The images provided by
        google will not be found.

        :param str image_id: image identifier
        :return: str - api url of the image
        :raises: `ImageError` if no image with that name exists
        """
        gce = self._connect()
        image_filter = "name eq %s" % image_id
        request = gce.images().list(project=self._project_id,
                                    filter=image_filter)
        response = self._execute_request(request)
        # a `list` call answers with the matching items directly (as in
        # `list_instances`), not with an operation carrying a `status`,
        # so there is nothing to wait for here

        items = response.get('items') if response else None
        if items:
            return items[0]["selfLink"]
        raise ImageError("Could not find given image id `%s`" % image_id)

    def _check_response(self, response):
        """Checks the response from GCE for error messages.

        Only the first reported error message is used.

        :param response: GCE response
        :return: nothing
        :raises: CloudProviderError with error message from GCE
        """
        if "error" in response:
            error = response['error']['errors'][0]['message']
            raise CloudProviderError("The following error occurred while "
                                     "interacting with the cloud provider "
                                     "`%s`" % error)

    def __getstate__(self):
        """Return the state to pickle, without the live connection objects.

        Both the GCE resource (`_gce`) and the authorized HTTP object
        (`_auth_http`) are reset. Every method that issues requests calls
        `_connect` first, and `_connect` re-creates both when `_gce` is
        None, so the unpickled provider reconnects on first use.
        """
        # A shallow copy suffices: only top-level keys are rebound below,
        # so `self` is untouched, and deep-copying would needlessly
        # duplicate (and may fail on) the connection objects being dropped.
        pickle_dict = dict(self.__dict__)
        pickle_dict['_gce'] = None
        pickle_dict['_auth_http'] = None
        return pickle_dict