Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 18.116.80.252
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /lib/python3/dist-packages/libcloud/storage/drivers/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /lib/python3/dist-packages/libcloud/storage/drivers/azure_blobs.py
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import with_statement

import base64
import hashlib
import hmac
import os
import binascii
from datetime import datetime, timedelta

from libcloud.utils.py3 import ET
from libcloud.utils.py3 import httplib
from libcloud.utils.py3 import urlencode
from libcloud.utils.py3 import urlquote
from libcloud.utils.py3 import tostring
from libcloud.utils.py3 import b

from libcloud.utils.xml import fixxpath
from libcloud.utils.files import read_in_chunks
from libcloud.common.types import LibcloudError
from libcloud.common.azure import AzureConnection

from libcloud.storage.base import Object, Container, StorageDriver
from libcloud.storage.types import ContainerIsNotEmptyError
from libcloud.storage.types import ContainerAlreadyExistsError
from libcloud.storage.types import InvalidContainerNameError
from libcloud.storage.types import ContainerDoesNotExistError
from libcloud.storage.types import ObjectDoesNotExistError
from libcloud.storage.types import ObjectHashMismatchError

# Desired number of items in each response inside a paginated request
RESPONSES_PER_REQUEST = 100

# According to the Azure Docs:
# > The block must be less than or equal to 100 MB in size for version
# > 2016-05-31 and later (4 MB for older versions).
# However for performance reasons, using a lower upload chunk size
# usually leads to fewer dropped requests and retries.
AZURE_UPLOAD_CHUNK_SIZE = int(
    os.getenv('LIBCLOUD_AZURE_UPLOAD_CHUNK_SIZE_MB', '4')
) * 1024 * 1024

AZURE_DOWNLOAD_CHUNK_SIZE = int(
    os.getenv('LIBCLOUD_AZURE_DOWNLOAD_CHUNK_SIZE_MB', '4')
) * 1024 * 1024

# The time period (in seconds) for which a lease must be obtained.
# If set as -1, we get an infinite lease, but that is a bad idea. If
# after getting an infinite lease, there was an issue in releasing the
# lease, the object will remain 'locked' forever, unless the lease is
# released using the lease_id (which is not exposed to the user)
AZURE_LEASE_PERIOD = int(
    os.getenv('LIBCLOUD_AZURE_LEASE_PERIOD_SECONDS', '60')
)

AZURE_STORAGE_HOST_SUFFIX = 'blob.core.windows.net'
AZURE_STORAGE_HOST_SUFFIX_CHINA = 'blob.core.chinacloudapi.cn'
AZURE_STORAGE_HOST_SUFFIX_GOVERNMENT = 'blob.core.usgovcloudapi.net'
AZURE_STORAGE_HOST_SUFFIX_PRIVATELINK = 'privatelink.blob.core.windows.net'

AZURE_STORAGE_CDN_URL_DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ'

AZURE_STORAGE_CDN_URL_START_MINUTES = float(
    os.getenv('LIBCLOUD_AZURE_STORAGE_CDN_URL_START_MINUTES', '5')
)

AZURE_STORAGE_CDN_URL_EXPIRY_HOURS = float(
    os.getenv('LIBCLOUD_AZURE_STORAGE_CDN_URL_EXPIRY_HOURS', '24')
)


class AzureBlobLease(object):
    """
    A class to help in leasing an azure blob and renewing the lease
    """
    def __init__(self, driver, object_path, use_lease):
        """
        :param driver: The Azure storage driver that is being used
        :type driver: :class:`AzureStorageDriver`

        :param object_path: The path of the object we need to lease
        :type object_path: ``str``

        :param use_lease: Indicates if we must take a lease or not
        :type use_lease: ``bool``
        """
        self.object_path = object_path
        self.driver = driver
        self.use_lease = use_lease
        self.lease_id = None
        self.params = {'comp': 'lease'}

    def renew(self):
        """
        Renew the lease if it is older than a predefined time period
        """
        if self.lease_id is None:
            return

        headers = {'x-ms-lease-action': 'renew',
                   'x-ms-lease-id': self.lease_id,
                   'x-ms-lease-duration': '60'}

        response = self.driver.connection.request(self.object_path,
                                                  headers=headers,
                                                  params=self.params,
                                                  method='PUT')

        if response.status != httplib.OK:
            raise LibcloudError('Unable to obtain lease', driver=self)

    def update_headers(self, headers):
        """
        Update the lease id in the headers
        """
        if self.lease_id:
            headers['x-ms-lease-id'] = self.lease_id

    def __enter__(self):
        if not self.use_lease:
            return self

        headers = {'x-ms-lease-action': 'acquire',
                   'x-ms-lease-duration': '60'}

        response = self.driver.connection.request(self.object_path,
                                                  headers=headers,
                                                  params=self.params,
                                                  method='PUT')

        if response.status == httplib.NOT_FOUND:
            return self
        elif response.status != httplib.CREATED:
            raise LibcloudError('Unable to obtain lease', driver=self)

        self.lease_id = response.headers['x-ms-lease-id']
        return self

    def __exit__(self, type, value, traceback):
        if self.lease_id is None:
            return

        headers = {'x-ms-lease-action': 'release',
                   'x-ms-lease-id': self.lease_id}
        response = self.driver.connection.request(self.object_path,
                                                  headers=headers,
                                                  params=self.params,
                                                  method='PUT')

        if response.status != httplib.OK:
            raise LibcloudError('Unable to release lease', driver=self)


class AzureBlobsConnection(AzureConnection):
    """
    Represents a single connection to Azure Blobs.

    The main Azure Blob Storage service uses a prefix in the hostname to
    distinguish between accounts, e.g. ``theaccount.blob.core.windows.net``.
    However, some custom deployments of the service, such as the Azurite
    emulator, instead use a URL prefix such as ``/theaccount``. To support
    these deployments, the parameter ``account_prefix`` must be set on the
    connection. This is done by instantiating the driver with arguments such
    as ``host='somewhere.tld'`` and ``key='theaccount'``. To specify a custom
    host without an account prefix, e.g. to connect to Azure Government or
    Azure China, the driver can be instantiated with the appropriate storage
    endpoint suffix, e.g. ``host='blob.core.usgovcloudapi.net'`` and
    ``key='theaccount'``.

    :param account_prefix: Optional prefix identifying the storage account.
                           Used when connecting to a custom deployment of the
                           storage service like Azurite or IoT Edge Storage.
    :type account_prefix: ``str``
    """
    def __init__(self, *args, **kwargs):
        self.account_prefix = kwargs.pop('account_prefix', None)
        super(AzureBlobsConnection, self).__init__(*args, **kwargs)

    def morph_action_hook(self, action):
        action = super(AzureBlobsConnection, self).morph_action_hook(action)

        if self.account_prefix is not None:
            action = '/%s%s' % (self.account_prefix, action)

        return action

    API_VERSION = '2018-11-09'


class AzureBlobsStorageDriver(StorageDriver):
    name = 'Microsoft Azure (blobs)'
    website = 'http://windows.azure.com/'
    connectionCls = AzureBlobsConnection
    hash_type = 'md5'
    supports_chunked_encoding = False

    def __init__(self, key, secret=None, secure=True, host=None, port=None,
                 **kwargs):
        self._host = host

        # B64decode() this key and keep it, so that we don't have to do
        # so for every request. Minor performance improvement
        secret = base64.b64decode(b(secret))

        super(AzureBlobsStorageDriver, self).__init__(key=key, secret=secret,
                                                      secure=secure, host=host,
                                                      port=port, **kwargs)

    def _ex_connection_class_kwargs(self):
        # if the user didn't provide a custom host value, assume we're
        # targeting the default Azure Storage endpoints
        if self._host is None:
            return {'host': '%s.%s' % (self.key, AZURE_STORAGE_HOST_SUFFIX)}

        # connecting to a special storage region like Azure Government or
        # Azure China requires setting a custom storage endpoint but we
        # still use the same scheme to identify a specific account as for
        # the standard storage endpoint
        try:
            host_suffix = next(
                host_suffix
                for host_suffix in (
                    AZURE_STORAGE_HOST_SUFFIX_CHINA,
                    AZURE_STORAGE_HOST_SUFFIX_GOVERNMENT,
                    AZURE_STORAGE_HOST_SUFFIX_PRIVATELINK,
                )
                if self._host.endswith(host_suffix)
            )
        except StopIteration:
            pass
        else:
            return {'host': '%s.%s' % (self.key, host_suffix)}

        # if the host isn't targeting one of the special storage regions, it
        # must be pointing to Azurite or IoT Edge Storage so switch to prefix
        # identification
        return {'account_prefix': self.key}

    def _xml_to_container(self, node):
        """
        Converts a container XML node to a container instance

        :param node: XML info of the container
        :type node: :class:`xml.etree.ElementTree.Element`

        :return: A container instance
        :rtype: :class:`Container`
        """

        name = node.findtext(fixxpath(xpath='Name'))
        props = node.find(fixxpath(xpath='Properties'))
        metadata = node.find(fixxpath(xpath='Metadata'))

        extra = {
            'url': node.findtext(fixxpath(xpath='Url')),
            'last_modified': node.findtext(fixxpath(xpath='Last-Modified')),
            'etag': props.findtext(fixxpath(xpath='Etag')),
            'lease': {
                'status': props.findtext(fixxpath(xpath='LeaseStatus')),
                'state': props.findtext(fixxpath(xpath='LeaseState')),
                'duration': props.findtext(fixxpath(xpath='LeaseDuration')),
            },
            'meta_data': {}
        }

        if metadata is not None:
            for meta in list(metadata):
                extra['meta_data'][meta.tag] = meta.text

        return Container(name=name, extra=extra, driver=self)

    def _response_to_container(self, container_name, response):
        """
        Converts a HTTP response to a container instance

        :param container_name: Name of the container
        :type container_name: ``str``

        :param response: HTTP Response
        :type node: L{}

        :return: A container instance
        :rtype: :class:`Container`
        """

        headers = response.headers
        extra = {
            'url': 'http://%s%s' % (response.connection.host,
                                    response.connection.action),
            'etag': headers['etag'],
            'last_modified': headers['last-modified'],
            'lease': {
                'status': headers.get('x-ms-lease-status', None),
                'state': headers.get('x-ms-lease-state', None),
                'duration': headers.get('x-ms-lease-duration', None),
            },
            'meta_data': {}
        }

        for key, value in response.headers.items():
            if key.startswith('x-ms-meta-'):
                key = key.split('x-ms-meta-')[1]
                extra['meta_data'][key] = value

        return Container(name=container_name, extra=extra, driver=self)

    def _xml_to_object(self, container, blob):
        """
        Converts a BLOB XML node to an object instance

        :param container: Instance of the container holding the blob
        :type: :class:`Container`

        :param blob: XML info of the blob
        :type blob: L{}

        :return: An object instance
        :rtype: :class:`Object`
        """

        name = blob.findtext(fixxpath(xpath='Name'))
        props = blob.find(fixxpath(xpath='Properties'))
        metadata = blob.find(fixxpath(xpath='Metadata'))
        etag = props.findtext(fixxpath(xpath='Etag'))
        size = int(props.findtext(fixxpath(xpath='Content-Length')))

        extra = {
            'content_type': props.findtext(fixxpath(xpath='Content-Type')),
            'etag': etag,
            'md5_hash': props.findtext(fixxpath(xpath='Content-MD5')),
            'last_modified': props.findtext(fixxpath(xpath='Last-Modified')),
            'url': blob.findtext(fixxpath(xpath='Url')),
            'hash': props.findtext(fixxpath(xpath='Etag')),
            'lease': {
                'status': props.findtext(fixxpath(xpath='LeaseStatus')),
                'state': props.findtext(fixxpath(xpath='LeaseState')),
                'duration': props.findtext(fixxpath(xpath='LeaseDuration')),
            },
            'content_encoding': props.findtext(fixxpath(
                                               xpath='Content-Encoding')),
            'content_language': props.findtext(fixxpath(
                                               xpath='Content-Language')),
            'blob_type': props.findtext(fixxpath(xpath='BlobType'))
        }

        if extra['md5_hash']:
            value = binascii.hexlify(base64.b64decode(b(extra['md5_hash'])))
            value = value.decode('ascii')
            extra['md5_hash'] = value

        meta_data = {}
        if metadata is not None:
            for meta in list(metadata):
                meta_data[meta.tag] = meta.text

        return Object(name=name, size=size, hash=etag, meta_data=meta_data,
                      extra=extra, container=container, driver=self)

    def _response_to_object(self, object_name, container, response):
        """
        Converts a HTTP response to an object (from headers)

        :param object_name: Name of the object
        :type object_name: ``str``

        :param container: Instance of the container holding the blob
        :type: :class:`Container`

        :param response: HTTP Response
        :type node: L{}

        :return: An object instance
        :rtype: :class:`Object`
        """

        headers = response.headers
        size = int(headers['content-length'])
        etag = headers['etag']

        extra = {
            'url': 'http://%s%s' % (response.connection.host,
                                    response.connection.action),
            'etag': etag,
            'md5_hash': headers.get('content-md5', None),
            'content_type': headers.get('content-type', None),
            'content_language': headers.get('content-language', None),
            'content_encoding': headers.get('content-encoding', None),
            'last_modified': headers['last-modified'],
            'lease': {
                'status': headers.get('x-ms-lease-status', None),
                'state': headers.get('x-ms-lease-state', None),
                'duration': headers.get('x-ms-lease-duration', None),
            },
            'blob_type': headers['x-ms-blob-type']
        }

        if extra['md5_hash']:
            value = binascii.hexlify(base64.b64decode(b(extra['md5_hash'])))
            value = value.decode('ascii')
            extra['md5_hash'] = value

        meta_data = {}
        for key, value in response.headers.items():
            if key.startswith('x-ms-meta-'):
                key = key.split('x-ms-meta-')[1]
                meta_data[key] = value

        return Object(name=object_name, size=size, hash=etag, extra=extra,
                      meta_data=meta_data, container=container, driver=self)

    def iterate_containers(self):
        """
        @inherits: :class:`StorageDriver.iterate_containers`
        """
        params = {'comp': 'list',
                  'maxresults': RESPONSES_PER_REQUEST,
                  'include': 'metadata'}

        while True:
            response = self.connection.request('/', params)
            if response.status != httplib.OK:
                raise LibcloudError('Unexpected status code: %s' %
                                    (response.status), driver=self)

            body = response.parse_body()
            containers = body.find(fixxpath(xpath='Containers'))
            containers = containers.findall(fixxpath(xpath='Container'))

            for container in containers:
                yield self._xml_to_container(container)

            params['marker'] = body.findtext('NextMarker')
            if not params['marker']:
                break

    def iterate_container_objects(self, container, prefix=None,
                                  ex_prefix=None):
        """
        @inherits: :class:`StorageDriver.iterate_container_objects`
        """
        prefix = self._normalize_prefix_argument(prefix, ex_prefix)

        params = {'restype': 'container',
                  'comp': 'list',
                  'maxresults': RESPONSES_PER_REQUEST,
                  'include': 'metadata'}

        if prefix:
            params['prefix'] = prefix

        container_path = self._get_container_path(container)

        while True:
            response = self.connection.request(container_path,
                                               params=params)

            if response.status == httplib.NOT_FOUND:
                raise ContainerDoesNotExistError(value=None,
                                                 driver=self,
                                                 container_name=container.name)

            elif response.status != httplib.OK:
                raise LibcloudError('Unexpected status code: %s' %
                                    (response.status), driver=self)

            body = response.parse_body()
            blobs = body.find(fixxpath(xpath='Blobs'))
            blobs = blobs.findall(fixxpath(xpath='Blob'))

            for blob in blobs:
                yield self._xml_to_object(container, blob)

            params['marker'] = body.findtext('NextMarker')
            if not params['marker']:
                break

    def get_container(self, container_name):
        """
        @inherits: :class:`StorageDriver.get_container`
        """
        params = {'restype': 'container'}

        container_path = '/%s' % (container_name)

        response = self.connection.request(container_path, params=params,
                                           method='HEAD')

        if response.status == httplib.NOT_FOUND:
            raise ContainerDoesNotExistError('Container %s does not exist' %
                                             (container_name), driver=self,
                                             container_name=container_name)
        elif response.status != httplib.OK:
            raise LibcloudError('Unexpected status code: %s' %
                                (response.status), driver=self)

        return self._response_to_container(container_name, response)

    def get_object(self, container_name, object_name):
        """
        @inherits: :class:`StorageDriver.get_object`
        """

        container = self.get_container(container_name=container_name)
        object_path = self._get_object_path(container, object_name)

        response = self.connection.request(object_path, method='HEAD')

        if response.status == httplib.OK:
            obj = self._response_to_object(object_name, container, response)
            return obj

        raise ObjectDoesNotExistError(value=None, driver=self,
                                      object_name=object_name)

    def get_object_cdn_url(self, obj,
                           ex_expiry=AZURE_STORAGE_CDN_URL_EXPIRY_HOURS):
        """
        Return a SAS URL that enables reading the given object.

        :param obj: Object instance.
        :type  obj: :class:`Object`

        :param ex_expiry: The number of hours after which the URL expires.
                          Defaults to 24 hours.
        :type  ex_expiry: ``float``

        :return: A SAS URL for the object.
        :rtype: ``str``
        """
        object_path = self._get_object_path(obj.container, obj.name)

        now = datetime.utcnow()
        start = now - timedelta(minutes=AZURE_STORAGE_CDN_URL_START_MINUTES)
        expiry = now + timedelta(hours=ex_expiry)

        params = {
            'st': start.strftime(AZURE_STORAGE_CDN_URL_DATE_FORMAT),
            'se': expiry.strftime(AZURE_STORAGE_CDN_URL_DATE_FORMAT),
            'sp': 'r',
            'spr': 'https' if self.secure else 'http,https',
            'sv': self.connectionCls.API_VERSION,
            'sr': 'b',
        }

        string_to_sign = '\n'.join((
            params['sp'],
            params['st'],
            params['se'],
            '/blob/{}{}'.format(self.key, object_path),
            '',  # signedIdentifier
            '',  # signedIP
            params['spr'],
            params['sv'],
            params['sr'],
            '',  # snapshot
            '',  # rscc
            '',  # rscd
            '',  # rsce
            '',  # rscl
            '',  # rsct
        ))

        params['sig'] = base64.b64encode(
            hmac.new(
                self.secret,
                string_to_sign.encode('utf-8'),
                hashlib.sha256
            ).digest()
        ).decode('utf-8')

        return '{scheme}://{host}:{port}{action}?{sas_token}'.format(
            scheme='https' if self.secure else 'http',
            host=self.connection.host,
            port=self.connection.port,
            action=self.connection.morph_action_hook(object_path),
            sas_token=urlencode(params),
        )

    def _get_container_path(self, container):
        """
        Return a container path

        :param container: Container instance
        :type  container: :class:`Container`

        :return: A path for this container.
        :rtype: ``str``
        """
        return '/%s' % (container.name)

    def _get_object_path(self, container, object_name):
        """
        Return an object's CDN path.

        :param container: Container instance
        :type  container: :class:`Container`

        :param object_name: Object name
        :type  object_name: :class:`str`

        :return: A  path for this object.
        :rtype: ``str``
        """
        container_url = self._get_container_path(container)
        object_name_cleaned = urlquote(object_name)
        object_path = '%s/%s' % (container_url, object_name_cleaned)
        return object_path

    def create_container(self, container_name):
        """
        @inherits: :class:`StorageDriver.create_container`
        """
        params = {'restype': 'container'}

        container_path = '/%s' % (container_name)
        response = self.connection.request(container_path, params=params,
                                           method='PUT')

        if response.status == httplib.CREATED:
            return self._response_to_container(container_name, response)
        elif response.status == httplib.CONFLICT:
            raise ContainerAlreadyExistsError(
                value='Container with this name already exists. The name must '
                      'be unique among all the containers in the system',
                container_name=container_name, driver=self)
        elif response.status == httplib.BAD_REQUEST:
            raise InvalidContainerNameError(value='Container name contains ' +
                                            'invalid characters.',
                                            container_name=container_name,
                                            driver=self)

        raise LibcloudError('Unexpected status code: %s' % (response.status),
                            driver=self)

    def delete_container(self, container):
        """
        @inherits: :class:`StorageDriver.delete_container`
        """
        # Azure does not check if the container is empty. So, we will do
        # a check to ensure that the behaviour is similar to other drivers
        for obj in container.iterate_objects():
            raise ContainerIsNotEmptyError(
                value='Container must be empty before it can be deleted.',
                container_name=container.name, driver=self)

        params = {'restype': 'container'}
        container_path = self._get_container_path(container)

        # Note: All the objects in the container must be deleted first
        response = self.connection.request(container_path, params=params,
                                           method='DELETE')

        if response.status == httplib.ACCEPTED:
            return True
        elif response.status == httplib.NOT_FOUND:
            raise ContainerDoesNotExistError(value=None,
                                             driver=self,
                                             container_name=container.name)

        return False

    def download_object(self, obj, destination_path, overwrite_existing=False,
                        delete_on_failure=True):
        """
        @inherits: :class:`StorageDriver.download_object`
        """
        obj_path = self._get_object_path(obj.container, obj.name)
        response = self.connection.request(obj_path, raw=True, data=None)

        return self._get_object(obj=obj, callback=self._save_object,
                                response=response,
                                callback_kwargs={
                                    'obj': obj,
                                    'response': response.response,
                                    'destination_path': destination_path,
                                    'overwrite_existing': overwrite_existing,
                                    'delete_on_failure': delete_on_failure},
                                success_status_code=httplib.OK)

    def download_object_as_stream(self, obj, chunk_size=None):
        """
        @inherits: :class:`StorageDriver.download_object_as_stream`
        """
        obj_path = self._get_object_path(obj.container, obj.name)
        response = self.connection.request(obj_path, method='GET',
                                           stream=True, raw=True)
        iterator = response.iter_content(AZURE_DOWNLOAD_CHUNK_SIZE)

        return self._get_object(obj=obj,
                                callback=read_in_chunks,
                                response=response,
                                callback_kwargs={'iterator': iterator,
                                                 'chunk_size': chunk_size},
                                success_status_code=httplib.OK)

    def download_object_range(self, obj, destination_path, start_bytes,
                              end_bytes=None, overwrite_existing=False,
                              delete_on_failure=True):
        self._validate_start_and_end_bytes(start_bytes=start_bytes,
                                           end_bytes=end_bytes)

        obj_path = self._get_object_path(obj.container, obj.name)
        headers = {'x-ms-range': self._get_standard_range_str(start_bytes,
                                                              end_bytes)}
        response = self.connection.request(obj_path, headers=headers,
                                           raw=True, data=None)

        # NOTE: Some Azure Blobs implementation return 200 instead of 206
        # status code, see
        # https://github.com/c-w/libcloud-tests/pull/2#issuecomment-592765323
        # for details.
        success_status_codes = [httplib.OK, httplib.PARTIAL_CONTENT]

        return self._get_object(obj=obj, callback=self._save_object,
                                response=response,
                                callback_kwargs={
                                    'obj': obj,
                                    'response': response.response,
                                    'destination_path': destination_path,
                                    'overwrite_existing': overwrite_existing,
                                    'delete_on_failure': delete_on_failure,
                                    'partial_download': True},
                                success_status_code=success_status_codes)

    def download_object_range_as_stream(self, obj, start_bytes, end_bytes=None,
                                        chunk_size=None):
        self._validate_start_and_end_bytes(start_bytes=start_bytes,
                                           end_bytes=end_bytes)

        obj_path = self._get_object_path(obj.container, obj.name)

        headers = {'x-ms-range': self._get_standard_range_str(start_bytes,
                                                              end_bytes)}
        response = self.connection.request(obj_path, method='GET',
                                           headers=headers,
                                           stream=True, raw=True)
        iterator = response.iter_content(AZURE_DOWNLOAD_CHUNK_SIZE)
        success_status_codes = [httplib.OK, httplib.PARTIAL_CONTENT]

        return self._get_object(
            obj=obj, callback=read_in_chunks,
            response=response,
            callback_kwargs={'iterator': iterator,
                             'chunk_size': chunk_size},
            success_status_code=success_status_codes)

    def _upload_in_chunks(self, stream, object_path, lease, meta_data,
                          content_type, object_name, file_path, verify_hash,
                          headers):
        """
        Uploads data from an interator in fixed sized chunks to Azure Storage
        """

        data_hash = None
        if verify_hash:
            data_hash = self._get_hash_function()

        bytes_transferred = 0
        count = 1
        chunks = []
        headers = headers or {}

        lease.update_headers(headers)

        params = {'comp': 'block'}

        # Read the input data in chunk sizes suitable for Azure
        for data in read_in_chunks(stream, AZURE_UPLOAD_CHUNK_SIZE,
                                   fill_size=True):
            data = b(data)
            content_length = len(data)
            bytes_transferred += content_length

            if verify_hash:
                data_hash.update(data)

            chunk_hash = self._get_hash_function()
            chunk_hash.update(data)
            chunk_hash = base64.b64encode(b(chunk_hash.digest()))

            headers['Content-MD5'] = chunk_hash.decode('utf-8')
            headers['Content-Length'] = str(content_length)

            # Block id can be any unique string that is base64 encoded
            # A 10 digit number can hold the max value of 50000 blocks
            # that are allowed for azure
            block_id = base64.b64encode(b('%10d' % (count)))
            block_id = block_id.decode('utf-8')
            params['blockid'] = block_id

            # Keep this data for a later commit
            chunks.append(block_id)

            # Renew lease before updating
            lease.renew()

            resp = self.connection.request(object_path, method='PUT',
                                           data=data, headers=headers,
                                           params=params)

            if resp.status != httplib.CREATED:
                resp.parse_error()
                raise LibcloudError('Error uploading chunk %d. Code: %d' %
                                    (count, resp.status), driver=self)

            count += 1

        if verify_hash:
            data_hash = base64.b64encode(b(data_hash.digest()))
            data_hash = data_hash.decode('utf-8')

        response = self._commit_blocks(object_path=object_path,
                                       chunks=chunks,
                                       lease=lease,
                                       headers=headers,
                                       meta_data=meta_data,
                                       content_type=content_type,
                                       data_hash=data_hash,
                                       object_name=object_name,
                                       file_path=file_path)

        # According to the Azure docs:
        # > This header refers to the content of the request, meaning, in this
        # > case, the list of blocks, and not the content of the blob itself.
        # However, the validation code assumes that the content-md5 in the
        # server response refers to the object so we must discard the value
        response.headers['content-md5'] = None

        return {
            'response': response,
            'data_hash': data_hash,
            'bytes_transferred': bytes_transferred,
        }

    def _commit_blocks(self, object_path, chunks, lease, headers,
                       meta_data, content_type, data_hash,
                       object_name, file_path):
        """
        Makes a final commit of the data.
        """

        root = ET.Element('BlockList')

        for block_id in chunks:
            part = ET.SubElement(root, 'Uncommitted')
            part.text = str(block_id)

        data = tostring(root)
        params = {'comp': 'blocklist'}
        headers = headers or {}

        lease.update_headers(headers)
        lease.renew()

        headers['x-ms-blob-content-type'] = self._determine_content_type(
            content_type, object_name, file_path)

        if data_hash is not None:
            headers['x-ms-blob-content-md5'] = data_hash

        self._update_metadata(headers, meta_data)

        data_hash = self._get_hash_function()
        data_hash.update(data.encode('utf-8'))
        data_hash = base64.b64encode(b(data_hash.digest()))
        headers['Content-MD5'] = data_hash.decode('utf-8')

        headers['Content-Length'] = len(data)

        headers = self._fix_headers(headers)

        response = self.connection.request(object_path, data=data,
                                           params=params, headers=headers,
                                           method='PUT')

        if response.status != httplib.CREATED:
            raise LibcloudError('Error in blocklist commit', driver=self)

        return response

    def upload_object(self, file_path, container, object_name,
                      verify_hash=True, extra=None, headers=None,
                      ex_use_lease=False,
                      **deprecated_kwargs):
        """
        Upload an object currently located on a disk.

        @inherits: :class:`StorageDriver.upload_object`

        :param ex_use_lease: Indicates if we must take a lease before upload
        :type ex_use_lease: ``bool``
        """
        if deprecated_kwargs:
            raise ValueError('Support for arguments was removed: %s'
                             % ', '.join(deprecated_kwargs.keys()))

        blob_size = os.stat(file_path).st_size

        with open(file_path, 'rb') as fobj:
            return self._put_object(container=container,
                                    object_name=object_name,
                                    extra=extra, verify_hash=verify_hash,
                                    use_lease=ex_use_lease, headers=headers,
                                    blob_size=blob_size, file_path=file_path,
                                    stream=fobj)

    def upload_object_via_stream(self, iterator, container, object_name,
                                 verify_hash=True, extra=None, headers=None,
                                 ex_use_lease=False,
                                 **deprecated_kwargs):
        """
        @inherits: :class:`StorageDriver.upload_object_via_stream`

        :param ex_use_lease: Indicates if we must take a lease before upload
        :type ex_use_lease: ``bool``
        """
        if deprecated_kwargs:
            raise ValueError('Support for arguments was removed: %s'
                             % ', '.join(deprecated_kwargs.keys()))

        return self._put_object(container=container,
                                object_name=object_name,
                                extra=extra, verify_hash=verify_hash,
                                use_lease=ex_use_lease,
                                headers=headers,
                                blob_size=None,
                                stream=iterator)

    def delete_object(self, obj):
        """
        @inherits: :class:`StorageDriver.delete_object`
        """
        object_path = self._get_object_path(obj.container, obj.name)
        response = self.connection.request(object_path, method='DELETE')

        if response.status == httplib.ACCEPTED:
            return True
        elif response.status == httplib.NOT_FOUND:
            raise ObjectDoesNotExistError(value=None, driver=self,
                                          object_name=obj.name)

        return False

    def _fix_headers(self, headers):
        """
        Update common HTTP headers to their equivalent in Azure Storage

        :param headers: The headers dictionary to be updated
        :type headers: ``dict``
        """
        to_fix = (
            'cache-control',
            'content-encoding',
            'content-language',
        )

        fixed = {}

        for key, value in headers.items():
            key_lower = key.lower()

            if key_lower in to_fix:
                fixed['x-ms-blob-%s' % key_lower] = value
            else:
                fixed[key] = value

        return fixed

    def _update_metadata(self, headers, meta_data):
        """
        Update the given metadata in the headers

        :param headers: The headers dictionary to be updated
        :type headers: ``dict``

        :param meta_data: Metadata key value pairs
        :type meta_data: ``dict``
        """
        for key, value in list(meta_data.items()):
            key = 'x-ms-meta-%s' % (key)
            headers[key] = value

    def _put_object(self, container, object_name, stream,
                    extra=None, verify_hash=True, headers=None,
                    blob_size=None, file_path=None,
                    use_lease=False):
        """
        Control function that does the real job of uploading data to a blob
        """
        extra = extra or {}
        content_type = extra.get('content_type', None)
        meta_data = extra.get('meta_data', {})

        object_path = self._get_object_path(container, object_name)

        # Get a lease if required and do the operations
        with AzureBlobLease(self, object_path, use_lease) as lease:
            if blob_size is not None and blob_size <= AZURE_UPLOAD_CHUNK_SIZE:
                result_dict = self._upload_directly(stream=stream,
                                                    object_path=object_path,
                                                    lease=lease,
                                                    blob_size=blob_size,
                                                    meta_data=meta_data,
                                                    headers=headers,
                                                    content_type=content_type,
                                                    object_name=object_name,
                                                    file_path=file_path)
            else:
                result_dict = self._upload_in_chunks(stream=stream,
                                                     object_path=object_path,
                                                     lease=lease,
                                                     meta_data=meta_data,
                                                     headers=headers,
                                                     content_type=content_type,
                                                     object_name=object_name,
                                                     file_path=file_path,
                                                     verify_hash=verify_hash)

            response = result_dict['response']
            bytes_transferred = result_dict['bytes_transferred']
            data_hash = result_dict['data_hash']
            headers = response.headers

        if response.status != httplib.CREATED:
            raise LibcloudError(
                'Unexpected status code, status_code=%s' % (response.status),
                driver=self)

        server_hash = headers.get('content-md5')

        if server_hash:
            server_hash = binascii.hexlify(base64.b64decode(b(server_hash)))
            server_hash = server_hash.decode('utf-8')
        else:
            # TODO: HACK - We could poll the object for a while and get
            # the hash
            pass

        if (verify_hash and server_hash and data_hash != server_hash):
            raise ObjectHashMismatchError(
                value='MD5 hash checksum does not match',
                object_name=object_name, driver=self)

        return Object(name=object_name, size=bytes_transferred,
                      hash=headers['etag'], extra=None,
                      meta_data=meta_data, container=container,
                      driver=self)

    def _upload_directly(self, stream, object_path, lease, blob_size,
                         meta_data, content_type, object_name, file_path,
                         headers):

        headers = headers or {}
        lease.update_headers(headers)

        self._update_metadata(headers, meta_data)

        headers['Content-Length'] = str(blob_size)
        headers['x-ms-blob-type'] = 'BlockBlob'

        return self._upload_object(object_name=object_name,
                                   file_path=file_path,
                                   content_type=content_type,
                                   request_path=object_path,
                                   stream=stream,
                                   headers=headers)

    def ex_set_object_metadata(self, obj, meta_data):
        """
        Set metadata for an object

        :param obj: The blob object
        :type obj: :class:`Object`

        :param meta_data: Metadata key value pairs
        :type meta_data: ``dict``
        """
        object_path = self._get_object_path(obj.container, obj.name)
        params = {'comp': 'metadata'}
        headers = {}

        self._update_metadata(headers, meta_data)

        response = self.connection.request(object_path, method='PUT',
                                           params=params,
                                           headers=headers)

        if response.status != httplib.OK:
            # pylint: disable=too-many-function-args
            response.parse_error('Setting metadata')

Anon7 - 2022
AnonSec Team