Source code for elasticluster.repository

# -*- coding: utf-8 -*-#
# Copyright (C) 2013, 2015, 2019  University of Zurich.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
__author__ = 'Nicolas Baer <nicolas.baer@uzh.ch>, Antonio Messina <antonio.s.messina@gmail.com>'

# compatibility imports
from future.utils import with_metaclass

# stdlib imports
from builtins import object
import os
import pickle
from abc import ABCMeta, abstractmethod
import glob
import json
import yaml

# Elasticluster imports
from elasticluster import log
from elasticluster.cluster import Cluster
from elasticluster.exceptions import ClusterNotFound, ClusterError


def migrate_cluster(cluster):
    """Called when loading a cluster that was saved by an older
    version of elasticluster."""
    for old, new in [('_user_key_public', 'user_key_public'),
                     ('_user_key_private', 'user_key_private'),
                     ('_user_key_name', 'user_key_name')]:
        if hasattr(cluster, old):
            setattr(cluster, new, getattr(cluster, old))
            delattr(cluster, old)
    for kind, nodes in cluster.nodes.items():
        for node in nodes:
            if hasattr(node, 'image'):
                image_id = getattr(node, 'image_id', None) or node.image
                setattr(node, 'image_id', image_id)
                delattr(node, 'image')
    # Possibly related to issue #129
    if not hasattr(cluster, 'thread_pool_max_size'):
        cluster.thread_pool_max_size = 10
    return cluster
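
# Example (editor's sketch, not part of the original module): given a
# `Cluster` object loaded from a state file written by an older
# elasticluster, `migrate_cluster` renames the legacy private
# attributes in place:
#
#     old._user_key_name = 'mykey'       # legacy attribute name
#     new = migrate_cluster(old)         # same object, mutated in place
#     new.user_key_name                  # -> 'mykey'
#     hasattr(new, '_user_key_name')     # -> False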


class AbstractClusterRepository(with_metaclass(ABCMeta, object)):
    """Defines the contract for a cluster repository to store clusters
    in a persistent state.
    """

    @abstractmethod
    def save_or_update(self, cluster):
        """Save or update the cluster in a persistent state.

        Elasticluster will call this method multiple times, so the
        implementation should handle save and update seamlessly.

        :param cluster: cluster object to store
        :type cluster: :py:class:`elasticluster.cluster.Cluster`
        """
        pass

    @abstractmethod
    def get(self, name):
        """Retrieves the cluster by the given name.

        :param str name: name of the cluster (identifier)
        :return: instance of :py:class:`elasticluster.cluster.Cluster`
                 that matches the given name
        """
        pass

    @abstractmethod
    def get_all(self):
        """Retrieves all stored clusters from the persistent state.

        :return: list of :py:class:`elasticluster.cluster.Cluster`
        """
        pass

    @abstractmethod
    def delete(self, cluster):
        """Deletes the cluster from persistent state.

        :param cluster: cluster to delete from persistent state
        :type cluster: :py:class:`elasticluster.cluster.Cluster`
        """
        pass
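
# Note (editor's addition, not in the original module): because of the
# `ABCMeta` metaclass, a subclass must override all four abstract
# methods before it can be instantiated, e.g.:
#
#     class IncompleteRepository(AbstractClusterRepository):
#         pass
#
#     IncompleteRepository()   # raises TypeError: Can't instantiate
#                              # abstract class ...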


class MemRepository(AbstractClusterRepository):
    """
    This implementation of :py:class:`AbstractClusterRepository` stores
    the clusters in memory, without actually saving the data on disk.
    """

    def __init__(self):
        self.clusters = {}

    def save_or_update(self, cluster):
        """Save or update the cluster in memory.

        :param cluster: cluster object to store
        :type cluster: :py:class:`elasticluster.cluster.Cluster`
        """
        self.clusters[cluster.name] = cluster

    def get(self, name):
        """Retrieves the cluster by the given name.

        :param str name: name of the cluster (identifier)
        :return: instance of :py:class:`elasticluster.cluster.Cluster`
                 that matches the given name
        """
        if name not in self.clusters:
            raise ClusterNotFound("Cluster %s not found." % name)
        return self.clusters.get(name)

    def get_all(self):
        """Retrieves all stored clusters from memory.

        :return: list of :py:class:`elasticluster.cluster.Cluster`
        """
        return list(self.clusters.values())

    def delete(self, cluster):
        """Deletes the cluster from memory.

        :param cluster: cluster to delete
        :type cluster: :py:class:`elasticluster.cluster.Cluster`
        """
        if cluster.name not in self.clusters:
            raise ClusterNotFound(
                "Unable to delete non-existent cluster %s" % cluster.name)
        del self.clusters[cluster.name]
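
# Usage sketch (editor's addition, not part of the original module);
# `cluster` stands for any `elasticluster.cluster.Cluster` instance:
#
#     repo = MemRepository()
#     repo.save_or_update(cluster)
#     assert repo.get(cluster.name) is cluster
#     repo.delete(cluster)
#     repo.get(cluster.name)     # now raises ClusterNotFound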


class DiskRepository(AbstractClusterRepository):
    """This is a generic repository class that assumes each cluster is
    saved to a file on disk. It only defines a few methods, to avoid
    duplication of code.
    """

    def __init__(self, storage_path):
        storage_path = os.path.expanduser(storage_path)
        storage_path = os.path.expandvars(storage_path)
        self.storage_path = storage_path

    def get_all(self):
        """Retrieves all clusters from the persistent state.

        :return: list of :py:class:`elasticluster.cluster.Cluster`
        """
        clusters = []
        cluster_files = glob.glob(
            "%s/*.%s" % (self.storage_path, self.file_ending))
        for fname in cluster_files:
            try:
                name = fname[:-len(self.file_ending)-1]
                clusters.append(self.get(name))
            except (ImportError, AttributeError) as ex:
                log.error("Unable to load cluster %s: `%s`", fname, ex)
                log.error(
                    "If cluster %s was created with a previous version"
                    " of elasticluster, you may need to run"
                    " `elasticluster migrate %s %s` to update it.",
                    fname, self.storage_path, fname)
        return clusters

    def _get_cluster_storage_path(self, name):
        cluster_file = '%s.%s' % (name, self.file_ending)
        return os.path.join(self.storage_path, cluster_file)

    def get(self, name):
        """Retrieves the cluster with the given name.

        :param str name: name of the cluster (identifier)
        :return: :py:class:`elasticluster.cluster.Cluster`
        """
        path = self._get_cluster_storage_path(name)
        try:
            with open(path, self.open_mode_for_loading) as storage:
                cluster = self.load(storage)
                # Compatibility with previous version of Node
                for node in sum(cluster.nodes.values(), []):
                    if not hasattr(node, 'ips'):
                        log.debug(
                            "Monkey patching old version of `Node` class: %s",
                            node.name)
                        node.ips = [node.ip_public, node.ip_private]
                        node.preferred_ip = None
                cluster.storage_file = path
                return cluster
        except (IOError, OSError, ClusterNotFound) as err:
            raise ClusterNotFound(
                "Error reading cluster state file `%s`: %s" % (path, err))

    def save_or_update(self, cluster):
        """Save or update the cluster to persistent state.

        :param cluster: cluster to save or update
        :type cluster: :py:class:`elasticluster.cluster.Cluster`
        """
        if not os.path.exists(self.storage_path):
            os.makedirs(self.storage_path)
        path = self._get_cluster_storage_path(cluster.name)
        cluster.storage_file = path
        with open(path, self.open_mode_for_saving) as storage:
            self.dump(cluster, storage)

    def delete(self, cluster):
        """Deletes the cluster from persistent state.

        :param cluster: cluster to delete from persistent state
        :type cluster: :py:class:`elasticluster.cluster.Cluster`
        """
        path = self._get_cluster_storage_path(cluster.name)
        if os.path.exists(path):
            os.unlink(path)
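
# Note (editor's summary, not in the original module): `DiskRepository`
# delegates the serialization details to its subclasses, each of which
# must provide:
#
#     file_ending            # file extension, e.g. 'yaml'
#     open_mode_for_loading  # 'r' for text formats, 'rb' for binary
#     open_mode_for_saving   # 'w' for text formats, 'wb' for binary
#     load(fp)               # deserialize a Cluster from file object fp
#     dump(cluster, fp)      # serialize `cluster` to file object fp
#
# The three concrete subclasses below supply exactly these five pieces.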


class PickleRepository(DiskRepository):
    """This implementation of :py:class:`AbstractClusterRepository` stores
    the cluster on the local disk using pickle. Therefore the cluster
    object and all its dependencies will be saved in a pickle (binary)
    file.

    :param str storage_path: path to the folder to store the cluster
                             information
    """

    file_ending = 'pickle'
    open_mode_for_loading = 'rb'
    open_mode_for_saving = 'wb'

    def __init__(self, storage_path):
        DiskRepository.__init__(self, storage_path)
        self.repository_types = [PickleRepository]

    def load(self, fp):
        """Load cluster from file descriptor fp."""
        cluster = pickle.load(fp)
        cluster.repository = self
        return cluster

    @staticmethod
    def dump(cluster, fp):
        pickle.dump(cluster, fp, pickle.HIGHEST_PROTOCOL)


class JsonRepository(DiskRepository):
    """This implementation of :py:class:`AbstractClusterRepository` stores
    the cluster in a file in JSON format.

    :param str storage_path: path to the folder to store the cluster
                             information
    """

    file_ending = 'json'
    open_mode_for_loading = 'r'
    open_mode_for_saving = 'w'

    def load(self, fp):
        data = json.load(fp)
        cluster = Cluster(**data)
        cluster.repository = self
        return cluster

    @staticmethod
    def dump(cluster, fp):
        state = cluster.to_dict(omit=(
            '_cloud_provider',
            '_naming_policy',
            '_setup_provider',
            'repository',
            'storage_file',
        ))
        json.dump(state, fp, default=dict, indent=4)


class YamlRepository(DiskRepository):
    """This implementation of :py:class:`AbstractClusterRepository` stores
    the cluster in a file in YAML format.

    :param str storage_path: path to the folder to store the cluster
                             information
    """

    file_ending = 'yaml'
    open_mode_for_loading = 'r'
    open_mode_for_saving = 'w'

    def load(self, fp):
        data = yaml.safe_load(fp)
        if not data:
            raise ClusterError("Empty cluster state file: {0}".format(fp.name))
        cluster = Cluster(**data)
        cluster.repository = self
        return cluster

    @staticmethod
    def dump(cluster, fp):
        state = cluster.to_dict(omit=(
            '_cloud_provider',
            '_naming_policy',
            '_setup_provider',
            'repository',
            'storage_file',
        ))
        # FIXME: This round-trip to JSON and back is used to
        # deep-convert the contents of `state` into basic Python
        # types, so that PyYAML can handle serialization without
        # additional hints.  It should be rewritten to use PyYAML's
        # native "representers" mechanism, see:
        # http://pyyaml.org/wiki/PyYAMLDocumentation#Constructorsrepresentersresolvers
        state = json.loads(json.dumps(state, default=dict))
        yaml.safe_dump(state, fp, default_flow_style=False, indent=4)
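
# Illustration (editor's sketch, not in the original module) of why the
# JSON round-trip above deep-converts `state`: `json.dumps` invokes
# `default=dict` on every object it cannot serialize natively, so
# mapping-like objects collapse into plain dicts that `yaml.safe_dump`
# accepts without custom representers. `Attrs` below is a hypothetical
# stand-in for such an object:
#
#     class Attrs(object):
#         def keys(self):           return ('x', 'y')
#         def __getitem__(self, k): return {'x': 1, 'y': 2}[k]
#
#     json.loads(json.dumps({'p': Attrs()}, default=dict))
#     # -> {'p': {'x': 1, 'y': 2}}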


class MultiDiskRepository(AbstractClusterRepository):
    """
    This class is able to deal with multiple storage types.
    """

    storage_type_map = {
        'pickle': PickleRepository,
        'json':   JsonRepository,
        'yaml':   YamlRepository,
    }

    def __init__(self, storage_path, default_store='yaml'):
        storage_path = os.path.expanduser(storage_path)
        storage_path = os.path.expandvars(storage_path)
        self.storage_path = storage_path
        try:
            self.default_store = self.storage_type_map[default_store]
        except KeyError:
            raise ValueError(
                "Invalid storage type %s. Allowed values: %s"
                % (default_store, ', '.join(self.storage_type_map)))

    def get_all(self):
        clusters = []
        for cls in self.storage_type_map.values():
            cluster_files = glob.glob(
                '%s/*.%s' % (self.storage_path, cls.file_ending))
            for fname in cluster_files:
                try:
                    store = cls(self.storage_path)
                    name = fname[:-len(store.file_ending)-1]
                    cluster = store.get(name)
                    cluster = migrate_cluster(cluster)
                    clusters.append(cluster)
                except (ImportError, AttributeError) as ex:
                    log.error("Unable to load cluster %s: `%s`", fname, ex)
                    log.error(
                        "If cluster %s was created with a previous version"
                        " of elasticluster, you may need to run"
                        " `elasticluster migrate %s %s` to update it.",
                        fname, self.storage_path, fname)
                except (ClusterError, ClusterNotFound) as cx:
                    log.error("Unable to load cluster %s: `%s`", fname, cx)
        return clusters

    def _get_store_by_name(self, name):
        """Return an instance of the correct `DiskRepository` subclass,
        based on the *first* file that matches the standard naming
        scheme for repository files."""
        for cls in self.storage_type_map.values():
            filename = os.path.join(
                self.storage_path,
                '{name}.{ending}'.format(name=name, ending=cls.file_ending))
            if os.path.exists(filename):
                try:
                    return cls(self.storage_path)
                except Exception:
                    continue
        raise ClusterNotFound(
            "No state file for cluster `{name}`"
            " was found in directory `{path}`"
            .format(name=name, path=self.storage_path))

    def get(self, name):
        store = self._get_store_by_name(name)
        return store.get(name)

    def save_or_update(self, cluster):
        if cluster.repository is self:
            # That's a pity, we have to find out the best class
            try:
                store = self._get_store_by_name(cluster.name)
            except ClusterNotFound:
                # Use one of the substores
                store = self.default_store(self.storage_path)
            store.save_or_update(cluster)

    def delete(self, cluster):
        store = self._get_store_by_name(cluster.name)
        store.delete(cluster)
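
# Usage sketch (editor's addition, not part of the original module): a
# `MultiDiskRepository` transparently mixes state files of all three
# formats in one directory and writes new clusters in the
# `default_store` format; the directory path below is illustrative:
#
#     repo = MultiDiskRepository('~/.elasticluster/storage',
#                                default_store='json')
#     for cluster in repo.get_all():   # scans *.pickle, *.json, *.yaml
#         print(cluster.name)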