#!/usr/bin/env python
# -*- coding: utf-8 -*-#
#
# Copyright (C) 2018-2019 University of Zurich. All rights reserved.
#
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
__docformat__ = 'reStructuredText'
__author__ = ', '.join([
'Riccardo Murri <riccardo.murri@gmail.com>'
])
# stdlib imports
from builtins import object
import hashlib
import json
import os
import re
import threading
from warnings import warn
import sys
from time import sleep
try:
from azure.common.credentials import ServicePrincipalCredentials
from azure.mgmt.compute import ComputeManagementClient
from azure.mgmt.network import NetworkManagementClient
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.resource.resources.models import DeploymentMode
from msrestazure.azure_exceptions import CloudError
except ImportError:
# The Azure SDK could not be imported (this used to happen on Python 2.6):
# bind each client name to a placeholder, so that this module can still
# be imported and the `ImportError` is only raised when some code actually
# tries to use one of the clients.
class _Unavailable(object):
    """
    Callable stand-in for a class whose module failed to import.

    Creating the placeholder only issues an `ImportWarning`; calling
    it (which is what instantiating the real class would look like)
    raises `ImportError` naming the missing module.
    """

    def __init__(self, missing):
        # dotted name of the module that could not be imported
        self.__missing = missing
        message = (
            "Trying to initialize `{module}` which is not available."
            " A placeholder object will be used instead, but it will raise"
            " `ImportError` later if there is any actual attempt at using it."
        ).format(module=missing)
        warn(message, ImportWarning)

    def __call__(self, *args, **kwargs):
        # arguments are accepted (and ignored) so that any call
        # signature of the real class ends up here
        message = (
            "Trying to actually use client class from module `{module}`"
            " which could not be imported. Aborting."
        ).format(module=self.__missing)
        raise ImportError(message)


ServicePrincipalCredentials = _Unavailable("azure.common.credentials")
ResourceManagementClient = _Unavailable("azure.mgmt.resource")
NetworkManagementClient = _Unavailable("azure.mgmt.network")
ComputeManagementClient = _Unavailable("azure.mgmt.compute")
CloudError = _Unavailable("msrestazure.azure_exceptions")
from paramiko import DSSKey, RSAKey, PasswordRequiredException
from paramiko.ssh_exception import SSHException
if sys.version_info[:2] == (3, 5):
    # On Python 3.5, `pkg_resources.resource_string` returns a `bytes`
    # object, which cannot directly be fed into `json.loads()`: wrap it
    # so that callers get text instead.
    from pkg_resources import resource_string as _resource_string

    def resource_string(pkgname, resource):
        """Return the content of a package resource, decoded as UTF-8 text."""
        raw = _resource_string(pkgname, resource)
        return raw.decode('utf-8')
else:
    from pkg_resources import resource_string
# Elasticluster imports
from elasticluster import log
from elasticluster.utils import memoize
from elasticluster.providers import AbstractCloudProvider
from elasticluster.exceptions import (
ConfigurationError,
FlavorError,
ImageError,
InstanceNotFoundError,
KeypairError,
SecurityGroupError,
)
[docs]class AzureCloudProvider(AbstractCloudProvider):
"""
Use the Azure Python SDK to connect to the Azure clouds and
manage virtual machines.

A provider instance is bound to a single Azure subscription and
location.  VMs are created by deploying Azure Resource Manager
templates into a resource group named after the cluster.
"""
# Pattern that cluster and node names are checked against before
# starting a VM: 3 to 63 characters; lowercase letters, digits and
# dashes only; must start with a letter and cannot end with a dash.
__compiled_pattern_for_names = re.compile("^[a-z][a-z0-9-]{1,61}[a-z0-9]$")
# Class attribute, hence shared by all provider instances.  Held while
# creating the API clients, while creating resource groups and networks,
# while loading the resource inventory, and during the final resource
# group clean-up.
__lock = threading.Lock()
"""
Lock used for node startup.
"""
def __init__(self, subscription_id, tenant_id,
client_id, secret, location,
vm_deployment_template=None,
net_deployment_template=None, **extra):
self.subscription_id = subscription_id
self.tenant_id = tenant_id
self.client_id = client_id
self.secret = secret
self.location = location
if vm_deployment_template:
try:
with open(vm_deployment_template) as template_file:
self.vm_deployment_template = json.load(template_file)
except Exception as err:
raise ConfigurationError(
"Could not load VM deployment template file `{0}`"
" in provider for Azure cloud: {1}"
.format(vm_deployment_template, err))
else:
# Azure Resource Manager template for starting a new VM.
# Initially taken from:
# https://github.com/Azure-Samples/resource-manager-python-template-deployment/blob/master/templates/template.json
# Copyright (c) 2015 Microsoft Corporation
self.vm_deployment_template = json.loads(resource_string(
'elasticluster', 'share/etc/azure_vm_template.json'))
if net_deployment_template:
try:
with open(net_deployment_template) as template_file:
self.net_deployment_template = json.load(template_file)
except Exception as err:
raise ConfigurationError(
"Could not load net deployment template file `{0}`"
" in provider for Azure cloud: {1}"
.format(net_deployment_template, err))
else:
# Azure Resource Manager template for creating a new network.
self.net_deployment_template = json.loads(resource_string(
'elasticluster', 'share/etc/azure_net_template.json'))
# these will be initialized later by `_init_az_api()`
self._compute_client = None
self._network_client = None
self._resource_client = None
# local state
self._inventory = {}
self._vm_details = {}
self._resource_groups_created = set()
self._networks_created = set()
[docs] def to_vars_dict(self):
"""
Return local state which is relevant for the cluster setup process.
"""
return {
'azure_client_id': self.client_id,
'azure_location': self.location,
'azure_secret': self.secret,
'azure_subscription_id': self.subscription_id,
'azure_tenant_id': self.tenant_id,
}
def _init_az_api(self):
"""
Initialise client objects for talking to Azure API.
This is in a separate function so to be called by ``__init__``
and ``__setstate__``.
"""
with self.__lock:
if self._resource_client is None:
log.debug("Making Azure `ServicePrincipalcredentials` object"
" with tenant=%r, client_id=%r, secret=%r ...",
self.tenant_id, self.client_id,
('<redacted>' if self.secret else None))
credentials = ServicePrincipalCredentials(
tenant=self.tenant_id,
client_id=self.client_id,
secret=self.secret,
)
log.debug("Initializing Azure `ComputeManagementclient` ...")
self._compute_client = ComputeManagementClient(credentials, self.subscription_id)
log.debug("Initializing Azure `NetworkManagementclient` ...")
self._network_client = NetworkManagementClient(credentials, self.subscription_id)
log.debug("Initializing Azure `ResourceManagementclient` ...")
self._resource_client = ResourceManagementClient(credentials, self.subscription_id)
log.info("Azure API clients initialized.")
[docs] def start_instance(self, key_name, public_key_path, private_key_path,
security_group, flavor, image_id, image_userdata,
cluster_name,
username='root',
node_name=None,
boot_disk_size=30,
storage_account_type='Standard_LRS',
**extra):
"""
Start a new VM using the given properties.
:param str key_name:
**unused in Azure**, only present for interface compatibility
:param str public_key_path:
path to ssh public key to authorize on the VM (for user `username`, see below)
:param str private_key_path:
**unused in Azure**, only present for interface compatibility
:param str security_group:
network security group to attach VM to, **currently unused**
:param str flavor:
machine type to use for the instance
:param str image_id:
disk image to use for the instance;
has the form *publisher/offer/sku/version*
(e.g., ``canonical/ubuntuserver/16.04.0-LTS/latest``)
:param str image_userdata:
command to execute after startup, **currently unused**
:param int boot_disk_size:
size of boot disk to use; values are specified in gigabytes.
:param str username:
username for the given ssh key
(default is ``root`` as it's always guaranteed to exist,
but you probably don't want to use that)
:param str storage_account_type:
Type of disks to attach to the VM. For a list of valid values,
see: https://docs.microsoft.com/en-us/rest/api/compute/disks/createorupdate#diskstorageaccounttypes
:return: tuple[str, str] -- resource group and node name of the started VM
"""
self._init_az_api()
# Warn of unsupported parameters, if set. We do not warn
# about `user_key` or `private_key_path` since they come from
# a `[login/*]` section and those can be shared across
# different cloud providers.
if security_group and security_group != 'default':
warn("Setting `security_group` is currently not supported"
" in the Azure cloud; VMs will all be attached to"
" a network security group named after the cluster name.")
if image_userdata:
warn("Parameter `image_userdata` is currently not supported"
" in the Azure cloud and will be ignored.")
# Use the cluster name to identify the Azure resource group;
# however, `Node.cluster_name` is not passed down here so
# extract it from the node name, which always contains it as
# the substring before the leftmost dash (see `cluster.py`,
# line 1182)
cluster_name, _ = node_name.rsplit('-', 1)
if not self.__compiled_pattern_for_names.match(cluster_name):
raise ConfigurationError("The cluster name `{0}` does not match the Azure requirement for names. "
"Only numbers, lowercase letters and dashes are allowed, "
"the value must begin with a lowercase letter and cannot end with a slash, "
"and must also be less than 63 characters long."
.format(cluster_name))
if not self.__compiled_pattern_for_names.match(node_name):
raise ConfigurationError("The node name `{0}` does not match the Azure requirement for names. "
"Only numbers, lowercase letters and dashes are allowed, "
"the value must begin with a lowercase letter and cannot end with a slash, "
"and must also be less than 63 characters long."
.format(node_name))
with self.__lock:
if cluster_name not in self._resource_groups_created:
self._resource_client.resource_groups.create_or_update(
cluster_name, {'location': self.location})
self._resource_groups_created.add(cluster_name)
# read public SSH key
with open(public_key_path, 'r') as public_key_file:
public_key = public_key_file.read()
image_publisher, image_offer, \
image_sku, image_version = self._split_image_id(image_id)
if not security_group:
security_group = (cluster_name + '-secgroup')
net_parameters = {
'networkSecurityGroupName': {
'value': security_group,
},
'subnetName': { 'value': cluster_name },
}
net_name = net_parameters['subnetName']['value']
with self.__lock:
if net_name not in self._networks_created:
log.debug(
"Creating network `%s` in Azure ...", net_name)
oper = self._resource_client.deployments.create_or_update(
cluster_name, net_name, {
'mode': DeploymentMode.incremental,
'template': self.net_deployment_template,
'parameters': net_parameters,
})
oper.wait()
self._networks_created.add(net_name)
boot_disk_size_gb = int(boot_disk_size)
vm_parameters = {
'adminUserName': { 'value': username },
'imagePublisher': { 'value': image_publisher }, # e.g., 'canonical'
'imageOffer': { 'value': image_offer }, # e.g., ubuntuserver
'imageSku': { 'value': image_sku }, # e.g., '16.04.0-LTS'
'imageVersion': { 'value': image_version }, # e.g., 'latest'
'networkSecurityGroupName': {
'value': security_group,
},
'sshKeyData': { 'value': public_key },
'storageAccountName': {
'value': self._make_storage_account_name(
cluster_name, node_name)
},
'storageAccountType': { 'value': storage_account_type },
'subnetName': { 'value': cluster_name },
'vmName': { 'value': node_name },
'vmSize': { 'value': flavor },
'bootDiskSize': { 'value': boot_disk_size_gb}
}
log.debug(
"Deploying `%s` VM template to Azure ...",
vm_parameters['vmName']['value'])
oper = self._resource_client.deployments.create_or_update(
cluster_name, node_name, {
'mode': DeploymentMode.incremental,
'template': self.vm_deployment_template,
'parameters': vm_parameters,
})
oper.wait()
# the `instance_id` is a composite type since we need both the
# resource group name and the vm name to uniquely identify a VM
return { 'instance_id': [cluster_name, node_name] }
@staticmethod
def _split_image_id(image_id):
try:
publisher, offer, sku, version = image_id.split('/', 3)
return (publisher, offer, sku, version)
except (ValueError, TypeError):
raise ConfigurationError(
"The 'image_id' parameter in Azure"
" has the form 'publisher/offer/sku/version'"
" (e.g., 'canonical/ubuntuserver/16.04.0-LTS/latest');"
" got '{0}' {1} instead!"
.format(image_id, type(image_id)))
@staticmethod
def _make_storage_account_name(cluster_name, node_name):
algo = hashlib.md5()
algo.update(cluster_name.encode('utf-8'))
algo.update(node_name.encode('utf-8'))
# the `storageAccountName` parameter must be lowercase
# alphanumeric and between 3 and 24 characters long... We
# cannot use base64 encoding, and the full MD5 hash is 32
# characters -- truncate it and hope for the best.
return algo.hexdigest()[:24]
def _init_inventory(self, cluster_name):
with self.__lock:
if not self._inventory:
for obj in self._resource_client.resources.list_by_resource_group(cluster_name):
self._inventory[obj.name] = obj.id
def stop_instance(self, node):
    """
    Destroy a VM, together with its NIC, public IP, disk and
    storage account.

    When only two resources are left in the inventory afterwards,
    the whole resource group is deleted as well.

    :param Node node: A `Node`:class: instance
    """
    self._init_az_api()

    cluster_name, node_name = node.instance_id
    self._init_inventory(cluster_name)

    storage_account = self._make_storage_account_name(
        cluster_name, node_name)
    # Resources must be deleted in this specific order: e.g., a
    # public IP address cannot be deleted if it's still in use by a
    # NIC...  Each resource is paired with the API version to use
    # for deleting it.
    doomed = (
        (node_name, '2018-06-01'),
        (node_name + '-nic', '2018-10-01'),
        (node_name + '-public-ip', '2018-10-01'),
        (node_name + '-disk', '2018-09-30'),
        (storage_account, '2018-07-01'),
    )
    for name, api_version in doomed:
        rsc_id = self._inventory[name]
        log.debug("Deleting resource %s (`%s`) ...", name, rsc_id)
        self._resource_client.resources.delete_by_id(
            rsc_id, api_version).wait()
        del self._inventory[name]
    self._vm_details.pop(node_name, None)

    # if this was the last VM to be deleted, clean up leftover resource group
    with self.__lock:
        # NOTE(review): the two leftover resources are presumably the
        # ones created by the network template -- confirm
        if len(self._inventory) == 2:
            log.debug("Cleaning up leftover resource group ...")
            self._resource_client.resource_groups.delete(
                cluster_name).wait()
            self._inventory = {}
[docs] def resume_instance(self, instance_state):
raise NotImplementedError("This provider does not (yet) support pause / resume logic.")
[docs] def pause_instance(self, instance_id):
raise NotImplementedError("This provider does not (yet) support pause / resume logic.")
[docs] def get_ips(self, instance_id):
"""
Retrieves all IP addresses associated to a given instance.
:return: tuple (IPs)
"""
self._init_az_api()
cluster_name, node_name = instance_id
# XXX: keep in sync with contents of `vm_deployment_template`
ip_name = ('{node_name}-public-ip'.format(node_name=node_name))
ip = self._network_client.public_ip_addresses.get(cluster_name, ip_name)
if (ip.provisioning_state == 'Succeeded' and ip.ip_address):
return [ip.ip_address]
else:
return []
[docs] def is_instance_running(self, instance_id):
"""
Check if the instance is up and running.
:param str instance_id: instance identifier
:return: bool - True if running, False otherwise
"""
self._init_az_api()
# Here, it's always better if we update the instance.
vm = self._get_vm(instance_id, force_reload=True)
# FIXME: should we rather check `vm.instance_view.statuses`
# and search for `.code == "PowerState/running"`? or
# `vm.instance_view.vm_agent.statuses` and search for `.code
# == 'ProvisioningState/suceeded'`?
return vm.provisioning_state == u'Succeeded'
def _get_vm(self, instance_id, force_reload=True):
"""
Return details on the VM with the given name.
:param str node_name: instance identifier
:param bool force_reload:
if ``True``, skip searching caches and reload instance from server
and immediately reload instance data from cloud provider
:return: py:class:`novaclient.v1_1.servers.Server` - instance
:raises: `InstanceError` is returned if the instance can't
be found in the local cache or in the cloud.
"""
self._init_az_api()
if force_reload:
# Remove from cache and get from server again
self._inventory = {}
cluster_name, node_name = instance_id
self._init_inventory(cluster_name)
# if instance is known, return it
if node_name not in self._vm_details:
vm_info = self._compute_client.virtual_machines.get(
cluster_name, node_name, 'instanceView')
self._vm_details[node_name] = vm_info
try:
return self._vm_details[node_name]
except KeyError:
raise InstanceNotFoundError(
"Instance `{instance_id}` not found"
.format(instance_id=instance_id))
# Fix pickler
def __getstate__(self):
return {
'subscription_id': self.subscription_id,
'tenant_id': self.tenant_id,
'client_id': self.client_id,
'secret': self.secret,
'location': self.location,
'_inventory': self._inventory,
'_resource_groups_created': self._resource_groups_created,
}
def __setstate__(self, state):
# these will be initialized later by `_init_az_api()`
self._compute_client = None
self._network_client = None
self._resource_client = None
# local state
self.subscription_id = state['subscription_id']
self.tenant_id = state['tenant_id']
self.client_id = state['client_id']
self.secret = state['secret']
self.location = state['location']
self._inventory = state['_inventory']
self._resource_groups_created = state['_resource_groups_created']
self._vm_details = {}