Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 18.222.184.207
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /proc/self/root/proc/2/cwd/proc/2/task/2/cwd/lib/python3/dist-packages/libcloud/storage/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /proc/self/root/proc/2/cwd/proc/2/task/2/cwd/lib/python3/dist-packages/libcloud/storage/base.py
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Provides base classes for working with storage
"""

# Backward compatibility for Python 2.5
from __future__ import with_statement

from typing import Dict
from typing import Iterator
from typing import List
from typing import Optional
from typing import Type

import os.path                          # pylint: disable-msg=W0404
import hashlib
import warnings
import errno
from os.path import join as pjoin

from libcloud.utils.py3 import httplib
from libcloud.utils.py3 import b

import libcloud.utils.files
from libcloud.common.types import LibcloudError
from libcloud.common.base import Connection
from libcloud.common.base import ConnectionUserAndKey, BaseDriver
from libcloud.storage.types import ObjectDoesNotExistError

__all__ = [
    'Object',
    'Container',
    'StorageDriver',

    'CHUNK_SIZE',
    'DEFAULT_CONTENT_TYPE'
]

CHUNK_SIZE = 8096

# Default Content-Type which is sent when uploading an object if one is not
# supplied and can't be detected when using non-strict mode.
DEFAULT_CONTENT_TYPE = 'application/octet-stream'


class Object(object):
    """
    A single stored object (BLOB) which belongs to a container.

    All operations are forwarded to the driver the object was created with.
    """

    def __init__(self,
                 name,  # type: str
                 size,  # type: int
                 hash,  # type: str
                 extra,  # type: dict
                 meta_data,  # type: dict
                 container,  # type: Container
                 driver,  # type: StorageDriver
                 ):
        """
        :param name: Name of the object (unique inside its container).
        :type  name: ``str``

        :param size: Size of the object in bytes.
        :type  size: ``int``

        :param hash: Hash of the object.
        :type  hash: ``str``

        :param extra: Extra attributes. A falsy value is stored as ``{}``.
        :type  extra: ``dict``

        :param meta_data: Optional meta data. A falsy value is stored as
                          ``{}``.
        :type  meta_data: ``dict``

        :param container: Container the object is stored in.
        :type  container: :class:`libcloud.storage.base.Container`

        :param driver: StorageDriver instance used for every operation.
        :type  driver: :class:`libcloud.storage.base.StorageDriver`
        """
        self.driver = driver
        self.container = container

        self.name = name
        self.size = size
        self.hash = hash

        # Falsy values (for example ``None``) are replaced by a new dict.
        self.meta_data = meta_data if meta_data else {}
        self.extra = extra if extra else {}

    def get_cdn_url(self):
        # type: () -> str
        """
        Return the result of the driver's ``get_object_cdn_url`` for this
        object.
        """
        cdn_url = self.driver.get_object_cdn_url(obj=self)
        return cdn_url

    def enable_cdn(self):
        # type: () -> bool
        """
        Return the result of the driver's ``enable_object_cdn`` for this
        object.
        """
        outcome = self.driver.enable_object_cdn(obj=self)
        return outcome

    def download(self, destination_path, overwrite_existing=False,
                 delete_on_failure=True):
        # type: (str, bool, bool) -> bool
        """
        Forward to the driver's ``download_object`` with this object.
        """
        request = dict(obj=self,
                       destination_path=destination_path,
                       overwrite_existing=overwrite_existing,
                       delete_on_failure=delete_on_failure)
        return self.driver.download_object(**request)

    def as_stream(self, chunk_size=None):
        # type: (Optional[int]) -> Iterator[bytes]
        """
        Forward to the driver's ``download_object_as_stream`` with this
        object.
        """
        request = dict(obj=self, chunk_size=chunk_size)
        return self.driver.download_object_as_stream(**request)

    def download_range(self, destination_path, start_bytes, end_bytes=None,
                       overwrite_existing=False,
                       delete_on_failure=True):
        # type: (str, int, Optional[int], bool, bool) -> bool
        """
        Forward to the driver's ``download_object_range`` with this object.
        """
        request = dict(obj=self,
                       destination_path=destination_path,
                       start_bytes=start_bytes,
                       end_bytes=end_bytes,
                       overwrite_existing=overwrite_existing,
                       delete_on_failure=delete_on_failure)
        return self.driver.download_object_range(**request)

    def range_as_stream(self, start_bytes, end_bytes=None, chunk_size=None):
        # type: (int, Optional[int], Optional[int]) -> Iterator[bytes]
        """
        Forward to the driver's ``download_object_range_as_stream`` with this
        object.
        """
        request = dict(obj=self,
                       start_bytes=start_bytes,
                       end_bytes=end_bytes,
                       chunk_size=chunk_size)
        return self.driver.download_object_range_as_stream(**request)

    def delete(self):
        # type: () -> bool
        """
        Forward to the driver's ``delete_object`` with this object.
        """
        driver = self.driver
        return driver.delete_object(self)

    def __repr__(self):
        fields = (self.name, self.size, self.hash, self.driver.name)
        return '<Object: name=%s, size=%s, hash=%s, provider=%s ...>' % fields


class Container(object):
    """
    A container (bucket) which can hold multiple objects.

    All operations are forwarded to the driver the container was created
    with.
    """

    def __init__(self,
                 name,  # type: str
                 extra,  # type: dict
                 driver,  # type: StorageDriver
                 ):
        """
        :param name: Name of the container (must be unique).
        :type name: ``str``

        :param extra: Extra attributes. A falsy value is stored as ``{}``.
        :type extra: ``dict``

        :param driver: StorageDriver instance used for every operation.
        :type driver: :class:`libcloud.storage.base.StorageDriver`
        """
        self.driver = driver
        self.name = name
        # Falsy values (for example ``None``) are replaced by a new dict.
        self.extra = extra if extra else {}

    def iterate_objects(self, prefix=None, ex_prefix=None):
        # type: (Optional[str], Optional[str]) -> Iterator[Object]
        """
        Forward to the driver's ``iterate_container_objects`` for this
        container.
        """
        request = dict(container=self, prefix=prefix, ex_prefix=ex_prefix)
        return self.driver.iterate_container_objects(**request)

    def list_objects(self, prefix=None, ex_prefix=None):
        # type: (Optional[str], Optional[str]) -> List[Object]
        """
        Forward to the driver's ``list_container_objects`` for this
        container.
        """
        request = dict(container=self, prefix=prefix, ex_prefix=ex_prefix)
        return self.driver.list_container_objects(**request)

    def get_cdn_url(self):
        # type: () -> str
        """
        Forward to the driver's ``get_container_cdn_url`` for this container.
        """
        cdn_url = self.driver.get_container_cdn_url(container=self)
        return cdn_url

    def enable_cdn(self):
        # type: () -> bool
        """
        Forward to the driver's ``enable_container_cdn`` for this container.
        """
        outcome = self.driver.enable_container_cdn(container=self)
        return outcome

    def get_object(self, object_name):
        # type: (str) -> Object
        """
        Forward to the driver's ``get_object`` using this container's name.
        """
        request = dict(container_name=self.name, object_name=object_name)
        return self.driver.get_object(**request)

    def upload_object(self, file_path, object_name, extra=None,
                      verify_hash=True, headers=None):
        # type: (str, str, Optional[dict], bool, Optional[Dict[str, str]]) -> Object  # noqa: E501
        """
        Forward to the driver's ``upload_object`` with this container as the
        destination.
        """
        options = dict(extra=extra, verify_hash=verify_hash, headers=headers)
        return self.driver.upload_object(file_path, self, object_name,
                                         **options)

    def upload_object_via_stream(self, iterator, object_name, extra=None,
                                 headers=None):
        # type: (Iterator[bytes], str, Optional[dict], Optional[Dict[str, str]]) -> Object  # noqa: E501
        """
        Forward to the driver's ``upload_object_via_stream`` with this
        container as the destination.
        """
        options = dict(extra=extra, headers=headers)
        return self.driver.upload_object_via_stream(iterator, self,
                                                    object_name, **options)

    def download_object(self, obj, destination_path, overwrite_existing=False,
                        delete_on_failure=True):
        # type: (Object, str, bool, bool) -> bool
        """
        Forward to the driver's ``download_object``.
        """
        options = dict(overwrite_existing=overwrite_existing,
                       delete_on_failure=delete_on_failure)
        return self.driver.download_object(obj, destination_path, **options)

    def download_object_as_stream(self, obj, chunk_size=None):
        # type: (Object, Optional[int]) -> Iterator[bytes]
        """
        Forward to the driver's ``download_object_as_stream``.
        """
        driver = self.driver
        return driver.download_object_as_stream(obj, chunk_size)

    def download_object_range(self, obj, destination_path, start_bytes,
                              end_bytes=None, overwrite_existing=False,
                              delete_on_failure=True):
        # type: (Object, str, int, Optional[int], bool, bool) -> bool
        """
        Forward to the driver's ``download_object_range``.
        """
        request = dict(obj=obj,
                       destination_path=destination_path,
                       start_bytes=start_bytes,
                       end_bytes=end_bytes,
                       overwrite_existing=overwrite_existing,
                       delete_on_failure=delete_on_failure)
        return self.driver.download_object_range(**request)

    def download_object_range_as_stream(self, obj, start_bytes, end_bytes=None,
                                        chunk_size=None):
        # type: (Object, int, Optional[int], Optional[int]) -> Iterator[bytes]
        """
        Forward to the driver's ``download_object_range_as_stream``.
        """
        request = dict(obj=obj,
                       start_bytes=start_bytes,
                       end_bytes=end_bytes,
                       chunk_size=chunk_size)
        return self.driver.download_object_range_as_stream(**request)

    def delete_object(self, obj):
        # type: (Object) -> bool
        """
        Forward to the driver's ``delete_object``.
        """
        driver = self.driver
        return driver.delete_object(obj)

    def delete(self):
        # type: () -> bool
        """
        Forward to the driver's ``delete_container`` with this container.
        """
        driver = self.driver
        return driver.delete_container(self)

    def __repr__(self):
        fields = (self.name, self.driver.name)
        return '<Container: name=%s, provider=%s>' % fields


class StorageDriver(BaseDriver):
    """
    A base StorageDriver to derive from.
    """

    connectionCls = ConnectionUserAndKey  # type: Type[Connection]
    name = None  # type: str
    hash_type = 'md5'  # type: str
    supports_chunked_encoding = False  # type: bool

    # When strict mode is used, exception will be thrown if no content type is
    # provided and none can be detected when uploading an object
    strict_mode = False  # type: bool

    def iterate_containers(self):
        # type: () -> Iterator[Container]
        """
        Return a iterator of containers for the given account

        :return: A iterator of Container instances.
        :rtype: ``iterator`` of :class:`libcloud.storage.base.Container`
        """
        raise NotImplementedError(
            'iterate_containers not implemented for this driver')

    def list_containers(self):
        # type: () -> List[Container]
        """
        Return a list of containers.

        :return: A list of Container instances.
        :rtype: ``list`` of :class:`Container`
        """
        return list(self.iterate_containers())

    def iterate_container_objects(self, container, prefix=None,
                                  ex_prefix=None):
        # type: (Container, Optional[str], Optional[str]) -> Iterator[Object]
        """
        Return a iterator of objects for the given container.

        :param container: Container instance
        :type container: :class:`libcloud.storage.base.Container`

        :param prefix: Filter objects starting with a prefix.
        :type  prefix: ``str``

        :param ex_prefix: (Deprecated.) Filter objects starting with a prefix.
        :type  ex_prefix: ``str``

        :return: A iterator of Object instances.
        :rtype: ``iterator`` of :class:`libcloud.storage.base.Object`
        """
        raise NotImplementedError(
            'iterate_container_objects not implemented for this driver')

    def list_container_objects(self, container, prefix=None, ex_prefix=None):
        # type: (Container, Optional[str], Optional[str]) -> List[Object]
        """
        Return a list of objects for the given container.

        :param container: Container instance.
        :type container: :class:`libcloud.storage.base.Container`

        :param prefix: Filter objects starting with a prefix.
        :type  prefix: ``str``

        :param ex_prefix: (Deprecated.) Filter objects starting with a prefix.
        :type  ex_prefix: ``str``

        :return: A list of Object instances.
        :rtype: ``list`` of :class:`libcloud.storage.base.Object`
        """
        return list(self.iterate_container_objects(container,
                                                   prefix=prefix,
                                                   ex_prefix=ex_prefix))

    def _normalize_prefix_argument(self, prefix, ex_prefix):
        if ex_prefix:
            warnings.warn('The ``ex_prefix`` argument is deprecated - '
                          'please update code to use ``prefix``',
                          DeprecationWarning)
            return ex_prefix

        return prefix

    def _filter_listed_container_objects(self, objects, prefix):
        if prefix is not None:
            warnings.warn('Driver %s does not implement native object '
                          'filtering; falling back to filtering the full '
                          'object stream.' % self.__class__.__name__)

        for obj in objects:
            if prefix is None or obj.name.startswith(prefix):
                yield obj

    def get_container(self, container_name):
        # type: (str) -> Container
        """
        Return a container instance.

        :param container_name: Container name.
        :type container_name: ``str``

        :return: :class:`Container` instance.
        :rtype: :class:`libcloud.storage.base.Container`
        """
        raise NotImplementedError(
            'get_object not implemented for this driver')

    def get_container_cdn_url(self, container):
        # type: (Container) -> str
        """
        Return a container CDN URL.

        :param container: Container instance
        :type  container: :class:`libcloud.storage.base.Container`

        :return: A CDN URL for this container.
        :rtype: ``str``
        """
        raise NotImplementedError(
            'get_container_cdn_url not implemented for this driver')

    def get_object(self, container_name, object_name):
        # type: (str, str) -> Object
        """
        Return an object instance.

        :param container_name: Container name.
        :type  container_name: ``str``

        :param object_name: Object name.
        :type  object_name: ``str``

        :return: :class:`Object` instance.
        :rtype: :class:`libcloud.storage.base.Object`
        """
        raise NotImplementedError(
            'get_object not implemented for this driver')

    def get_object_cdn_url(self, obj):
        # type: (Object) -> str
        """
        Return an object CDN URL.

        :param obj: Object instance
        :type  obj: :class:`libcloud.storage.base.Object`

        :return: A CDN URL for this object.
        :rtype: ``str``
        """
        raise NotImplementedError(
            'get_object_cdn_url not implemented for this driver')

    def enable_container_cdn(self, container):
        # type: (Container) -> bool
        """
        Enable container CDN.

        :param container: Container instance
        :type  container: :class:`libcloud.storage.base.Container`

        :rtype: ``bool``
        """
        raise NotImplementedError(
            'enable_container_cdn not implemented for this driver')

    def enable_object_cdn(self, obj):
        # type: (Object) -> bool
        """
        Enable object CDN.

        :param obj: Object instance
        :type  obj: :class:`libcloud.storage.base.Object`

        :rtype: ``bool``
        """
        raise NotImplementedError(
            'enable_object_cdn not implemented for this driver')

    def download_object(self, obj, destination_path, overwrite_existing=False,
                        delete_on_failure=True):
        # type: (Object, str, bool, bool) -> bool
        """
        Download an object to the specified destination path.

        :param obj: Object instance.
        :type obj: :class:`libcloud.storage.base.Object`

        :param destination_path: Full path to a file or a directory where the
                                 incoming file will be saved.
        :type destination_path: ``str``

        :param overwrite_existing: True to overwrite an existing file,
                                   defaults to False.
        :type overwrite_existing: ``bool``

        :param delete_on_failure: True to delete a partially downloaded file if
                                   the download was not successful (hash
                                   mismatch / file size).
        :type delete_on_failure: ``bool``

        :return: True if an object has been successfully downloaded, False
                 otherwise.
        :rtype: ``bool``
        """
        raise NotImplementedError(
            'download_object not implemented for this driver')

    def download_object_as_stream(self, obj, chunk_size=None):
        # type: (Object, Optional[int]) -> Iterator[bytes]
        """
        Return a iterator which yields object data.

        :param obj: Object instance
        :type obj: :class:`libcloud.storage.base.Object`

        :param chunk_size: Optional chunk size (in bytes).
        :type chunk_size: ``int``

        :rtype: ``iterator`` of ``bytes``
        """
        raise NotImplementedError(
            'download_object_as_stream not implemented for this driver')

    def download_object_range(self, obj, destination_path, start_bytes,
                              end_bytes=None, overwrite_existing=False,
                              delete_on_failure=True):
        # type: (Object, str, int, Optional[int], bool, bool) -> bool
        """
        Download part of an object.

        :param obj: Object instance.
        :type obj: :class:`libcloud.storage.base.Object`

        :param destination_path: Full path to a file or a directory where the
                                 incoming file will be saved.
        :type destination_path: ``str``

        :param start_bytes: Start byte offset (inclusive) for the range
                            download. Offset is 0 index based so the first
                            byte in file file is "0".
        :type start_bytes: ``int``

        :param end_bytes: End byte offset (non-inclusive) for the range
                          download. If not provided, it will default to the
                          end of the file.
        :type end_bytes: ``int``

        :param overwrite_existing: True to overwrite an existing file,
                                   defaults to False.
        :type overwrite_existing: ``bool``

        :param delete_on_failure: True to delete a partially downloaded file if
                                   the download was not successful (hash
                                   mismatch / file size).
        :type delete_on_failure: ``bool``

        :return: True if an object has been successfully downloaded, False
                 otherwise.
        :rtype: ``bool``

        """
        raise NotImplementedError(
            'download_object_range not implemented for this driver')

    def download_object_range_as_stream(self, obj, start_bytes, end_bytes=None,
                                        chunk_size=None):
        # type: (Object, int, Optional[int], Optional[int]) -> Iterator[bytes]
        """
        Return a iterator which yields range / part of the object data.

        :param obj: Object instance
        :type obj: :class:`libcloud.storage.base.Object`

        :param start_bytes: Start byte offset (inclusive) for the range
                            download. Offset is 0 index based so the first
                            byte in file file is "0".
        :type start_bytes: ``int``

        :param end_bytes: End byte offset (non-inclusive) for the range
                          download. If not provided, it will default to the
                          end of the file.
        :type end_bytes: ``int``

        :param chunk_size: Optional chunk size (in bytes).
        :type chunk_size: ``int``

        :rtype: ``iterator`` of ``bytes``
        """
        raise NotImplementedError(
            'download_object_range_as_stream not implemented for this driver')

    def upload_object(self, file_path, container, object_name, extra=None,
                      verify_hash=True, headers=None):
        # type: (str, Container, str, Optional[dict], bool, Optional[Dict[str, str]]) -> Object  # noqa: E501
        """
        Upload an object currently located on a disk.

        :param file_path: Path to the object on disk.
        :type file_path: ``str``

        :param container: Destination container.
        :type container: :class:`libcloud.storage.base.Container`

        :param object_name: Object name.
        :type object_name: ``str``

        :param verify_hash: Verify hash
        :type verify_hash: ``bool``

        :param extra: Extra attributes (driver specific). (optional)
        :type extra: ``dict``

        :param headers: (optional) Additional request headers,
            such as CORS headers. For example:
            headers = {'Access-Control-Allow-Origin': 'http://mozilla.com'}
        :type headers: ``dict``

        :rtype: :class:`libcloud.storage.base.Object`
        """
        raise NotImplementedError(
            'upload_object not implemented for this driver')

    def upload_object_via_stream(self, iterator, container,
                                 object_name,
                                 extra=None,
                                 headers=None):
        # type: (Iterator[bytes], Container, str, Optional[dict], Optional[Dict[str, str]]) -> Object  # noqa: E501
        """
        Upload an object using an iterator.

        If a provider supports it, chunked transfer encoding is used and you
        don't need to know in advance the amount of data to be uploaded.

        Otherwise if a provider doesn't support it, iterator will be exhausted
        so a total size for data to be uploaded can be determined.

        Note: Exhausting the iterator means that the whole data must be
        buffered in memory which might result in memory exhausting when
        uploading a very large object.

        If a file is located on a disk you are advised to use upload_object
        function which uses fs.stat function to determine the file size and it
        doesn't need to buffer whole object in the memory.

        :param iterator: An object which implements the iterator interface.
        :type iterator: :class:`object`

        :param container: Destination container.
        :type container: :class:`libcloud.storage.base.Container`

        :param object_name: Object name.
        :type object_name: ``str``

        :param extra: (optional) Extra attributes (driver specific). Note:
            This dictionary must contain a 'content_type' key which represents
            a content type of the stored object.
        :type extra: ``dict``

        :param headers: (optional) Additional request headers,
            such as CORS headers. For example:
            headers = {'Access-Control-Allow-Origin': 'http://mozilla.com'}
        :type headers: ``dict``

        :rtype: ``libcloud.storage.base.Object``
        """
        raise NotImplementedError(
            'upload_object_via_stream not implemented for this driver')

    def delete_object(self, obj):
        # type: (Object) -> bool
        """
        Delete an object.

        :param obj: Object instance.
        :type obj: :class:`libcloud.storage.base.Object`

        :return: ``bool`` True on success.
        :rtype: ``bool``
        """
        raise NotImplementedError(
            'delete_object not implemented for this driver')

    def create_container(self, container_name):
        # type: (str) -> Container
        """
        Create a new container.

        :param container_name: Container name.
        :type container_name: ``str``

        :return: Container instance on success.
        :rtype: :class:`libcloud.storage.base.Container`
        """
        raise NotImplementedError(
            'create_container not implemented for this driver')

    def delete_container(self, container):
        # type: (Container) -> bool
        """
        Delete a container.

        :param container: Container instance
        :type container: :class:`libcloud.storage.base.Container`

        :return: ``True`` on success, ``False`` otherwise.
        :rtype: ``bool``
        """
        raise NotImplementedError(
            'delete_container not implemented for this driver')

    def _get_object(self, obj, callback, callback_kwargs, response,
                    success_status_code=None):
        """
        Call passed callback and start transfer of the object'

        :param obj: Object instance.
        :type obj: :class:`Object`

        :param callback: Function which is called with the passed
            callback_kwargs
        :type callback: :class:`function`

        :param callback_kwargs: Keyword arguments which are passed to the
             callback.
        :type callback_kwargs: ``dict``

        :param response: Response instance.
        :type response: :class:`Response`

        :param success_status_code: Status code which represents a successful
                                    transfer (defaults to httplib.OK)
        :type success_status_code: ``int``

        :return: ``True`` on success, ``False`` otherwise.
        :rtype: ``bool``
        """
        success_status_code = success_status_code or httplib.OK

        if not isinstance(success_status_code, (list, tuple)):
            success_status_codes = [success_status_code]
        else:
            success_status_codes = success_status_code

        if response.status in success_status_codes:
            return callback(**callback_kwargs)
        elif response.status == httplib.NOT_FOUND:
            raise ObjectDoesNotExistError(object_name=obj.name,
                                          value='', driver=self)

        raise LibcloudError(value='Unexpected status code: %s' %
                                  (response.status),
                            driver=self)

    def _save_object(self, response, obj, destination_path,
                     overwrite_existing=False, delete_on_failure=True,
                     chunk_size=None, partial_download=False):
        """
        Save object to the provided path.

        :param response: RawResponse instance.
        :type response: :class:`RawResponse`

        :param obj: Object instance.
        :type obj: :class:`Object`

        :param destination_path: Destination directory.
        :type destination_path: ``str``

        :param delete_on_failure: True to delete partially downloaded object if
                                  the download fails.
        :type delete_on_failure: ``bool``

        :param overwrite_existing: True to overwrite a local path if it already
                                   exists.
        :type overwrite_existing: ``bool``

        :param chunk_size: Optional chunk size
            (defaults to ``libcloud.storage.base.CHUNK_SIZE``, 8kb)
        :type chunk_size: ``int``

        :param partial_download: True if this is a range (partial) save,
                                 False otherwise.
        :type partial_download: ``bool``

        :return: ``True`` on success, ``False`` otherwise.
        :rtype: ``bool``
        """

        chunk_size = chunk_size or CHUNK_SIZE

        base_name = os.path.basename(destination_path)

        if not base_name and not os.path.exists(destination_path):
            raise LibcloudError(
                value='Path %s does not exist' % (destination_path),
                driver=self)

        if not base_name:
            file_path = pjoin(destination_path, obj.name)
        else:
            file_path = destination_path

        if os.path.exists(file_path) and not overwrite_existing:
            raise LibcloudError(
                value='File %s already exists, but ' % (file_path) +
                'overwrite_existing=False',
                driver=self)

        bytes_transferred = 0

        with open(file_path, 'wb') as file_handle:
            for chunk in response._response.iter_content(chunk_size):
                file_handle.write(b(chunk))
                bytes_transferred += len(chunk)

        if not partial_download and int(obj.size) != int(bytes_transferred):
            # Transfer failed, support retry?
            # NOTE: We only perform this check if this is a regular and not a
            # partial / range download
            if delete_on_failure:
                try:
                    os.unlink(file_path)
                except Exception:
                    pass

            return False

        return True

    def _upload_object(self, object_name, content_type, request_path,
                       request_method='PUT',
                       headers=None, file_path=None, stream=None,
                       chunked=False, multipart=False):
        """
        Helper function for setting common request headers and calling the
        passed in callback which uploads an object.
        """
        headers = headers or {}

        if file_path and not os.path.exists(file_path):
            raise OSError('File %s does not exist' % (file_path))

        if stream is not None and not hasattr(stream, 'next') and not \
                hasattr(stream, '__next__'):
            raise AttributeError('iterator object must implement next() ' +
                                 'method.')

        headers['Content-Type'] = self._determine_content_type(
            content_type, object_name, file_path=file_path)

        if stream:
            response = self.connection.request(
                request_path,
                method=request_method, data=stream,
                headers=headers, raw=True)
            stream_hash, stream_length = self._hash_buffered_stream(
                stream,
                self._get_hash_function())
        else:
            with open(file_path, 'rb') as file_stream:
                response = self.connection.request(
                    request_path,
                    method=request_method, data=file_stream,
                    headers=headers, raw=True)
            with open(file_path, 'rb') as file_stream:
                stream_hash, stream_length = self._hash_buffered_stream(
                    file_stream,
                    self._get_hash_function())

        if not response.success():
            response.parse_error()

        return {'response': response,
                'bytes_transferred': stream_length,
                'data_hash': stream_hash}

    def _determine_content_type(self, content_type, object_name,
                                file_path=None):
        if content_type:
            return content_type

        name = file_path or object_name
        content_type, _ = libcloud.utils.files.guess_file_mime_type(name)

        if self.strict_mode and not content_type:
            raise AttributeError('File content-type could not be guessed for '
                                 '"%s" and no content_type value is provided'
                                 % name)

        return content_type or DEFAULT_CONTENT_TYPE

    def _hash_buffered_stream(self, stream, hasher, blocksize=65536):
        total_len = 0

        if hasattr(stream, '__next__') or hasattr(stream, 'next'):
            # Ensure we start from the begining of a stream in case stream is
            # not at the beginning
            if hasattr(stream, 'seek'):
                try:
                    stream.seek(0)
                except OSError as e:
                    if e.errno != errno.ESPIPE:
                        # This represents "OSError: [Errno 29] Illegal seek"
                        # error. This could either mean that the underlying
                        # handle doesn't support seek operation (e.g. pipe) or
                        # that the invalid seek position is provided. Sadly
                        # there is no good robust way to distinghuish that so
                        # we simply ignore all the "Illeal seek" errors so
                        # this function works correctly with pipes.
                        # See https://github.com/apache/libcloud/pull/1427 for
                        # details
                        raise e

            for chunk in libcloud.utils.files.read_in_chunks(iterator=stream):
                hasher.update(b(chunk))
                total_len += len(chunk)

            return (hasher.hexdigest(), total_len)

        if not hasattr(stream, '__exit__'):
            for s in stream:
                hasher.update(s)
                total_len = total_len + len(s)
            return (hasher.hexdigest(), total_len)

        with stream:
            buf = stream.read(blocksize)
            while len(buf) > 0:
                total_len = total_len + len(buf)
                hasher.update(buf)
                buf = stream.read(blocksize)

        return (hasher.hexdigest(), total_len)

    def _get_hash_function(self):
        """
        Return instantiated hash function for the hash type supported by
        the provider.
        """
        try:
            func = getattr(hashlib, self.hash_type)()
        except AttributeError:
            raise RuntimeError('Invalid or unsupported hash type: %s' %
                               (self.hash_type))

        return func

    def _validate_start_and_end_bytes(self, start_bytes, end_bytes=None):
        # type: (int, Optional[int]) -> bool
        """
        Method which validates that start_bytes and end_bytes arguments contain
        valid values.
        """
        if start_bytes < 0:
            raise ValueError('start_bytes must be greater than 0')

        if end_bytes is not None:
            if start_bytes > end_bytes:
                raise ValueError('start_bytes must be smaller than end_bytes')
            elif start_bytes == end_bytes:
                raise ValueError('start_bytes and end_bytes can\'t be the '
                                 'same. end_bytes is non-inclusive')

        return True

    def _get_standard_range_str(self, start_bytes, end_bytes=None,
                                end_bytes_inclusive=False):
        # type: (int, Optional[int], bool) -> str
        """
        Return range string which is used as a Range header value for range
        requests for drivers which follow standard Range header notation

        This returns range string in the following format:
        bytes=<start_bytes>-<end bytes>.

        For example:

        bytes=1-10
        bytes=0-2
        bytes=5-
        bytes=100-5000

        :param end_bytes_inclusive: True if "end_bytes" offset should be
        inclusive (aka opposite from the Python indexing behavior where the end
        index is not inclusive).
        """
        range_str = 'bytes=%s-' % (start_bytes)

        if end_bytes is not None:
            if end_bytes_inclusive:
                range_str += str(end_bytes)
            else:
                range_str += str(end_bytes - 1)

        return range_str

Anon7 - 2022
AnonSec Team