Source code for elasticluster.providers.azure_provider

#!/usr/bin/env python
# -*- coding: utf-8 -*-#
# Copyright (C) 2018-2019 University of Zurich. All rights reserved.
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# General Public License for more details.
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

__docformat__ = 'reStructuredText'
__author__ = ', '.join([
    'Riccardo Murri <>'

# stdlib imports
from builtins import object
import hashlib
import json
import os
import re
import threading
from warnings import warn
import sys
from time import sleep

    from azure.common.credentials import ServicePrincipalCredentials
    from azure.mgmt.compute import ComputeManagementClient
    from import NetworkManagementClient
    from azure.mgmt.resource import ResourceManagementClient
    from azure.mgmt.resource.resources.models import DeploymentMode
    from msrestazure.azure_exceptions import CloudError
except ImportError:
    # handle missing libraries in Python 2.6: delay ImportError until
    # actually used but raise a warning in the meantime
    class _Unavailable(object):
        Delay `ImportError` until actually used.

        Still, raise a warning when instanciated.
        def __init__(self, missing):
            self.__missing = missing
            warn("Trying to initialize `{module}` which is not available."
                 " A placeholder object will be used instead, but it will raise"
                 " `ImportError` later if there is any actual attempt at using it."
                 .format(module=self.__missing), ImportWarning)
        def __call__(self, *args, **kwargs):
            raise ImportError(
                "Trying to actually use client class from module `{module}`"
                " which could not be imported. Aborting."
    ServicePrincipalCredentials = _Unavailable("azure.common.credentials")
    ResourceManagementClient = _Unavailable("azure.mgmt.resource")
    NetworkManagementClient = _Unavailable("")
    ComputeManagementClient = _Unavailable("azure.mgmt.compute")
    CloudError = _Unavailable("msrestazure.azure_exceptions")

from paramiko import DSSKey, RSAKey, PasswordRequiredException
from paramiko.ssh_exception import SSHException

if sys.version_info[:2] != (3, 5):
    from pkg_resources import resource_string
    # on Python 3.5, `pkg_resources.resource_string` returns a `bytes`
    # object, which cannot directly be fed into `json.loads()`
    from pkg_resources import resource_string as _resource_string
    def resource_string(pkgname, resource):
        return _resource_string(pkgname, resource).decode('utf-8')

# Elasticluster imports
from elasticluster import log
from elasticluster.utils import memoize
from elasticluster.providers import AbstractCloudProvider
from elasticluster.exceptions import (

[docs]class AzureCloudProvider(AbstractCloudProvider): """ Use the Azure Python SDK to connect to the Azure clouds and manage virtual machines. An AzureCloudProvider owns a tree of Azure resources, rooted in one or more subscriptions and one or more storage accounts. """ __compiled_pattern_for_names = re.compile("^[a-z][a-z0-9-]{1,61}[a-z0-9]$") __lock = threading.Lock() """ Lock used for node startup. """ def __init__(self, subscription_id, tenant_id, client_id, secret, location, vm_deployment_template=None, net_deployment_template=None, **extra): self.subscription_id = subscription_id self.tenant_id = tenant_id self.client_id = client_id self.secret = secret self.location = location if vm_deployment_template: try: with open(vm_deployment_template) as template_file: self.vm_deployment_template = json.load(template_file) except Exception as err: raise ConfigurationError( "Could not load VM deployment template file `{0}`" " in provider for Azure cloud: {1}" .format(vm_deployment_template, err)) else: # Azure Resource Manager template for starting a new VM. # Initially taken from: # # Copyright (c) 2015 Microsoft Corporation self.vm_deployment_template = json.loads(resource_string( 'elasticluster', 'share/etc/azure_vm_template.json')) if net_deployment_template: try: with open(net_deployment_template) as template_file: self.net_deployment_template = json.load(template_file) except Exception as err: raise ConfigurationError( "Could not load net deployment template file `{0}`" " in provider for Azure cloud: {1}" .format(net_deployment_template, err)) else: # Azure Resource Manager template for creating a new network. self.net_deployment_template = json.loads(resource_string( 'elasticluster', 'share/etc/azure_net_template.json')) # these will be initialized later by `_init_az_api()` self._compute_client = None self._network_client = None self._resource_client = None # local state self._inventory = {} self._vm_details = {} self._resource_groups_created = set() self._networks_created = set()
[docs] def to_vars_dict(self): """ Return local state which is relevant for the cluster setup process. """ return { 'azure_client_id': self.client_id, 'azure_location': self.location, 'azure_secret': self.secret, 'azure_subscription_id': self.subscription_id, 'azure_tenant_id': self.tenant_id, }
def _init_az_api(self): """ Initialise client objects for talking to Azure API. This is in a separate function so to be called by ``__init__`` and ``__setstate__``. """ with self.__lock: if self._resource_client is None: log.debug("Making Azure `ServicePrincipalcredentials` object" " with tenant=%r, client_id=%r, secret=%r ...", self.tenant_id, self.client_id, ('<redacted>' if self.secret else None)) credentials = ServicePrincipalCredentials( tenant=self.tenant_id, client_id=self.client_id, secret=self.secret, ) log.debug("Initializing Azure `ComputeManagementclient` ...") self._compute_client = ComputeManagementClient(credentials, self.subscription_id) log.debug("Initializing Azure `NetworkManagementclient` ...") self._network_client = NetworkManagementClient(credentials, self.subscription_id) log.debug("Initializing Azure `ResourceManagementclient` ...") self._resource_client = ResourceManagementClient(credentials, self.subscription_id)"Azure API clients initialized.")
[docs] def start_instance(self, key_name, public_key_path, private_key_path, security_group, flavor, image_id, image_userdata, cluster_name, username='root', node_name=None, boot_disk_size=30, storage_account_type='Standard_LRS', **extra): """ Start a new VM using the given properties. :param str key_name: **unused in Azure**, only present for interface compatibility :param str public_key_path: path to ssh public key to authorize on the VM (for user `username`, see below) :param str private_key_path: **unused in Azure**, only present for interface compatibility :param str security_group: network security group to attach VM to, **currently unused** :param str flavor: machine type to use for the instance :param str image_id: disk image to use for the instance; has the form *publisher/offer/sku/version* (e.g., ``canonical/ubuntuserver/16.04.0-LTS/latest``) :param str image_userdata: command to execute after startup, **currently unused** :param int boot_disk_size: size of boot disk to use; values are specified in gigabytes. :param str username: username for the given ssh key (default is ``root`` as it's always guaranteed to exist, but you probably don't want to use that) :param str storage_account_type: Type of disks to attach to the VM. For a list of valid values, see: :return: tuple[str, str] -- resource group and node name of the started VM """ self._init_az_api() # Warn of unsupported parameters, if set. We do not warn # about `user_key` or `private_key_path` since they come from # a `[login/*]` section and those can be shared across # different cloud providers. if security_group and security_group != 'default': warn("Setting `security_group` is currently not supported" " in the Azure cloud; VMs will all be attached to" " a network security group named after the cluster name.") if image_userdata: warn("Parameter `image_userdata` is currently not supported" " in the Azure cloud and will be ignored.") # Use the cluster name to identify the Azure resource group; # however, `Node.cluster_name` is not passed down here so # extract it from the node name, which always contains it as # the substring before the leftmost dash (see ``, # line 1182) cluster_name, _ = node_name.rsplit('-', 1) if not self.__compiled_pattern_for_names.match(cluster_name): raise ConfigurationError("The cluster name `{0}` does not match the Azure requirement for names. " "Only numbers, lowercase letters and dashes are allowed, " "the value must begin with a lowercase letter and cannot end with a slash, " "and must also be less than 63 characters long." .format(cluster_name)) if not self.__compiled_pattern_for_names.match(node_name): raise ConfigurationError("The node name `{0}` does not match the Azure requirement for names. " "Only numbers, lowercase letters and dashes are allowed, " "the value must begin with a lowercase letter and cannot end with a slash, " "and must also be less than 63 characters long." .format(node_name)) with self.__lock: if cluster_name not in self._resource_groups_created: self._resource_client.resource_groups.create_or_update( cluster_name, {'location': self.location}) self._resource_groups_created.add(cluster_name) # read public SSH key with open(public_key_path, 'r') as public_key_file: public_key = image_publisher, image_offer, \ image_sku, image_version = self._split_image_id(image_id) if not security_group: security_group = (cluster_name + '-secgroup') net_parameters = { 'networkSecurityGroupName': { 'value': security_group, }, 'subnetName': { 'value': cluster_name }, } net_name = net_parameters['subnetName']['value'] with self.__lock: if net_name not in self._networks_created: log.debug( "Creating network `%s` in Azure ...", net_name) oper = self._resource_client.deployments.create_or_update( cluster_name, net_name, { 'mode': DeploymentMode.incremental, 'template': self.net_deployment_template, 'parameters': net_parameters, }) oper.wait() self._networks_created.add(net_name) boot_disk_size_gb = int(boot_disk_size) vm_parameters = { 'adminUserName': { 'value': username }, 'imagePublisher': { 'value': image_publisher }, # e.g., 'canonical' 'imageOffer': { 'value': image_offer }, # e.g., ubuntuserver 'imageSku': { 'value': image_sku }, # e.g., '16.04.0-LTS' 'imageVersion': { 'value': image_version }, # e.g., 'latest' 'networkSecurityGroupName': { 'value': security_group, }, 'sshKeyData': { 'value': public_key }, 'storageAccountName': { 'value': self._make_storage_account_name( cluster_name, node_name) }, 'storageAccountType': { 'value': storage_account_type }, 'subnetName': { 'value': cluster_name }, 'vmName': { 'value': node_name }, 'vmSize': { 'value': flavor }, 'bootDiskSize': { 'value': boot_disk_size_gb} } log.debug( "Deploying `%s` VM template to Azure ...", vm_parameters['vmName']['value']) oper = self._resource_client.deployments.create_or_update( cluster_name, node_name, { 'mode': DeploymentMode.incremental, 'template': self.vm_deployment_template, 'parameters': vm_parameters, }) oper.wait() # the `instance_id` is a composite type since we need both the # resource group name and the vm name to uniquely identify a VM return { 'instance_id': [cluster_name, node_name] }
@staticmethod def _split_image_id(image_id): try: publisher, offer, sku, version = image_id.split('/', 3) return (publisher, offer, sku, version) except (ValueError, TypeError): raise ConfigurationError( "The 'image_id' parameter in Azure" " has the form 'publisher/offer/sku/version'" " (e.g., 'canonical/ubuntuserver/16.04.0-LTS/latest');" " got '{0}' {1} instead!" .format(image_id, type(image_id))) @staticmethod def _make_storage_account_name(cluster_name, node_name): algo = hashlib.md5() algo.update(cluster_name.encode('utf-8')) algo.update(node_name.encode('utf-8')) # the `storageAccountName` parameter must be lowercase # alphanumeric and between 3 and 24 characters long... We # cannot use base64 encoding, and the full MD5 hash is 32 # characters -- truncate it and hope for the best. return algo.hexdigest()[:24] def _init_inventory(self, cluster_name): with self.__lock: if not self._inventory: for obj in self._resource_client.resources.list_by_resource_group(cluster_name): self._inventory[] =
[docs] def stop_instance(self, node): """ Destroy a VM. :param Node node: A `Node`:class: instance """ self._init_az_api() cluster_name, node_name = node.instance_id self._init_inventory(cluster_name) for name, api_version in [ # we must delete resources in a specific order: e.g., # a public IP address cannot be deleted if it's still # in use by a NIC... (node_name, '2018-06-01'), (node_name + '-nic', '2018-10-01'), (node_name + '-public-ip', '2018-10-01'), (node_name + '-disk', '2018-09-30'), (self._make_storage_account_name( cluster_name, node_name), '2018-07-01'), ]: rsc_id = self._inventory[name] log.debug("Deleting resource %s (`%s`) ...", name, rsc_id) oper = self._resource_client.resources.delete_by_id(rsc_id, api_version) oper.wait() del self._inventory[name] self._vm_details.pop(node_name, None) # if this was the last VM to be deleted, clean up leftover resource group with self.__lock: if len(self._inventory) == 2: log.debug("Cleaning up leftover resource group ...") oper = self._resource_client.resource_groups.delete(cluster_name) oper.wait() self._inventory = {}
[docs] def resume_instance(self, instance_state): raise NotImplementedError("This provider does not (yet) support pause / resume logic.")
[docs] def pause_instance(self, instance_id): raise NotImplementedError("This provider does not (yet) support pause / resume logic.")
[docs] def get_ips(self, instance_id): """ Retrieves all IP addresses associated to a given instance. :return: tuple (IPs) """ self._init_az_api() cluster_name, node_name = instance_id # XXX: keep in sync with contents of `vm_deployment_template` ip_name = ('{node_name}-public-ip'.format(node_name=node_name)) ip = self._network_client.public_ip_addresses.get(cluster_name, ip_name) if (ip.provisioning_state == 'Succeeded' and ip.ip_address): return [ip.ip_address] else: return []
[docs] def is_instance_running(self, instance_id): """ Check if the instance is up and running. :param str instance_id: instance identifier :return: bool - True if running, False otherwise """ self._init_az_api() # Here, it's always better if we update the instance. vm = self._get_vm(instance_id, force_reload=True) # FIXME: should we rather check `vm.instance_view.statuses` # and search for `.code == "PowerState/running"`? or # `vm.instance_view.vm_agent.statuses` and search for `.code # == 'ProvisioningState/suceeded'`? return vm.provisioning_state == u'Succeeded'
def _get_vm(self, instance_id, force_reload=True): """ Return details on the VM with the given name. :param str node_name: instance identifier :param bool force_reload: if ``True``, skip searching caches and reload instance from server and immediately reload instance data from cloud provider :return: py:class:`novaclient.v1_1.servers.Server` - instance :raises: `InstanceError` is returned if the instance can't be found in the local cache or in the cloud. """ self._init_az_api() if force_reload: # Remove from cache and get from server again self._inventory = {} cluster_name, node_name = instance_id self._init_inventory(cluster_name) # if instance is known, return it if node_name not in self._vm_details: vm_info = self._compute_client.virtual_machines.get( cluster_name, node_name, 'instanceView') self._vm_details[node_name] = vm_info try: return self._vm_details[node_name] except KeyError: raise InstanceNotFoundError( "Instance `{instance_id}` not found" .format(instance_id=instance_id)) # Fix pickler def __getstate__(self): return { 'subscription_id': self.subscription_id, 'tenant_id': self.tenant_id, 'client_id': self.client_id, 'secret': self.secret, 'location': self.location, '_inventory': self._inventory, '_resource_groups_created': self._resource_groups_created, } def __setstate__(self, state): # these will be initialized later by `_init_az_api()` self._compute_client = None self._network_client = None self._resource_client = None # local state self.subscription_id = state['subscription_id'] self.tenant_id = state['tenant_id'] self.client_id = state['client_id'] self.secret = state['secret'] self.location = state['location'] self._inventory = state['_inventory'] self._resource_groups_created = state['_resource_groups_created'] self._vm_details = {}