Server IP : 85.214.239.14 / Your IP : 52.15.173.197 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /proc/3/cwd/proc/3/cwd/usr/lib/python3/dist-packages/libcloud/storage/drivers/ |
Upload File : |
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os.path
import random
import hashlib

from libcloud.utils.py3 import PY3
from libcloud.utils.py3 import b

if PY3:
    from io import FileIO as file

from libcloud.common.types import LibcloudError
from libcloud.storage.base import Object, Container, StorageDriver
from libcloud.storage.types import ContainerAlreadyExistsError
from libcloud.storage.types import ContainerDoesNotExistError
from libcloud.storage.types import ContainerIsNotEmptyError
from libcloud.storage.types import ObjectDoesNotExistError


class DummyFileObject(file):
    """
    Fake file object which fabricates its content instead of reading a file.

    The parent initializer is intentionally not called, so no real file is
    opened; the instance only stores the two sizing parameters below.
    """

    def __init__(self, yield_count=5, chunk_len=10):
        """
        :param yield_count: Number of chunks produced by :meth:`read`.
        :type yield_count: ``int``

        :param chunk_len: Length of every produced chunk.
        :type chunk_len: ``int``
        """
        self._yield_count = yield_count
        self._chunk_len = chunk_len

    def read(self, size=None):
        """
        Return a generator yielding ``yield_count`` random chunks.

        :param size: Accepted for file-API compatibility; it is ignored.

        :rtype: ``generator`` of ``str``
        """
        for _ in range(self._yield_count):
            yield self._get_chunk(self._chunk_len)

    def _get_chunk(self, chunk_len):
        """
        Build one random chunk.

        :param chunk_len: Number of characters in the chunk.
        :type chunk_len: ``int``

        :return: String of ``chunk_len`` characters with code points in the
                 range 97-120 (``'a'`` to ``'x'``).
        :rtype: ``str``
        """
        # random.randint() returns a single int, so one call is needed per
        # character; iterating over its result raises TypeError.
        return ''.join(chr(random.randint(97, 120))
                       for _ in range(chunk_len))

    def __len__(self):
        """
        Total size of the fabricated content (chunk count * chunk length).
        """
        return self._yield_count * self._chunk_len


class DummyIterator(object):
    """
    Iterator over a fixed list of values which keeps a running MD5 digest
    of every value it has returned so far.
    """

    def __init__(self, data=None):
        """
        :param data: Values to iterate over (defaults to an empty list).
        :type data: ``list``
        """
        self.hash = hashlib.md5()
        self._data = data or []
        self._current_item = 0

    def get_md5_hash(self):
        """
        Return the hex digest of all the values returned so far.

        :rtype: ``str``
        """
        return self.hash.hexdigest()

    def next(self):
        """
        Return the next value and fold it into the running digest.

        :raises StopIteration: When all the values have been returned.
        """
        if self._current_item == len(self._data):
            raise StopIteration

        value = self._data[self._current_item]
        self.hash.update(b(value))
        self._current_item += 1
        return value

    def __next__(self):
        return self.next()

    def __iter__(self):
        # Required by the iterator protocol so instances work in ``for``
        # loops and with functions which consume iterables.
        return self

    def __enter__(self):
        # Return the iterator so ``with DummyIterator(...) as it`` binds a
        # usable object instead of None.
        return self

    def __exit__(self, type, value, traceback):
        # Nothing to release; returning None lets exceptions propagate.
        pass


class DummyStorageDriver(StorageDriver):
    """
    Dummy Storage driver.

    >>> from libcloud.storage.drivers.dummy import DummyStorageDriver
    >>> driver = DummyStorageDriver('key', 'secret')
    >>> container = driver.create_container(container_name='test container')
    >>> container
    <Container: name=test container, provider=Dummy Storage Provider>
    >>> container.name
    'test container'
    >>> container.extra['object_count']
    0
    """

    name = 'Dummy Storage Provider'
    website = 'http://example.com'

    def __init__(self, api_key, api_secret):
        """
        :param    api_key:    API key or username to used (required)
        :type     api_key:    ``str``

        :param    api_secret: Secret password to be used (required)
        :type     api_secret: ``str``

        :rtype: ``None``
        """
        # Maps a container name to a dictionary with the keys 'container',
        # 'objects' (object name -> Object) and 'cdn_url'.
        self._containers = {}

    def get_meta_data(self):
        """
        Return the container count, object count and total size of all the
        stored objects.

        >>> driver = DummyStorageDriver('key', 'secret')
        >>> driver.get_meta_data()['object_count']
        0
        >>> driver.get_meta_data()['container_count']
        0
        >>> driver.get_meta_data()['bytes_used']
        0
        >>> container_name = 'test container 1'
        >>> container = driver.create_container(container_name=container_name)
        >>> container_name = 'test container 2'
        >>> container = driver.create_container(container_name=container_name)
        >>> obj = container.upload_object_via_stream(
        ...  object_name='test object', iterator=DummyFileObject(5, 10),
        ...  extra={})
        >>> driver.get_meta_data()['object_count']
        1
        >>> driver.get_meta_data()['container_count']
        2
        >>> driver.get_meta_data()['bytes_used']
        50

        :rtype: ``dict``
        """
        container_count = len(self._containers)
        object_count = 0
        bytes_used = 0

        for entry in self._containers.values():
            objects = entry['objects']
            object_count += len(objects)
            bytes_used += sum(obj.size for obj in objects.values())

        return {'container_count': int(container_count),
                'object_count': int(object_count),
                'bytes_used': int(bytes_used)}

    def iterate_containers(self):
        """
        >>> driver = DummyStorageDriver('key', 'secret')
        >>> list(driver.iterate_containers())
        []
        >>> container_name = 'test container 1'
        >>> container = driver.create_container(container_name=container_name)
        >>> container
        <Container: name=test container 1, provider=Dummy Storage Provider>
        >>> container.name
        'test container 1'
        >>> container_name = 'test container 2'
        >>> container = driver.create_container(container_name=container_name)
        >>> container
        <Container: name=test container 2, provider=Dummy Storage Provider>
        >>> container = driver.create_container(
        ...  container_name='test container 2')
        ... #doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ContainerAlreadyExistsError:
        >>> container_list=list(driver.iterate_containers())
        >>> sorted([c.name for c in container_list])
        ['test container 1', 'test container 2']

        @inherits: :class:`StorageDriver.iterate_containers`
        """
        # Iterate over a snapshot so that containers can be created or
        # deleted while the generator is being consumed.
        for container in list(self._containers.values()):
            yield container['container']

    def iterate_container_objects(self, container, prefix=None,
                                  ex_prefix=None):
        """
        Return the objects stored in the container, filtered by prefix.

        :raises ContainerDoesNotExistError: If the container is unknown.
        """
        prefix = self._normalize_prefix_argument(prefix, ex_prefix)

        container = self.get_container(container.name)
        objects = self._containers[container.name]['objects'].values()
        return self._filter_listed_container_objects(objects, prefix)

    def get_container(self, container_name):
        """
        >>> driver = DummyStorageDriver('key', 'secret')
        >>> driver.get_container('unknown') #doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ContainerDoesNotExistError:
        >>> container_name = 'test container 1'
        >>> container = driver.create_container(container_name=container_name)
        >>> container
        <Container: name=test container 1, provider=Dummy Storage Provider>
        >>> container.name
        'test container 1'
        >>> driver.get_container('test container 1')
        <Container: name=test container 1, provider=Dummy Storage Provider>

        @inherits: :class:`StorageDriver.get_container`
        """
        if container_name not in self._containers:
            raise ContainerDoesNotExistError(driver=self, value=None,
                                             container_name=container_name)

        return self._containers[container_name]['container']

    def get_container_cdn_url(self, container):
        """
        >>> driver = DummyStorageDriver('key', 'secret')
        >>> driver.get_container('unknown') #doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ContainerDoesNotExistError:
        >>> container_name = 'test container 1'
        >>> container = driver.create_container(container_name=container_name)
        >>> container
        <Container: name=test container 1, provider=Dummy Storage Provider>
        >>> container.name
        'test container 1'
        >>> container.get_cdn_url()
        'http://www.test.com/container/test_container_1'

        @inherits: :class:`StorageDriver.get_container_cdn_url`
        """
        if container.name not in self._containers:
            raise ContainerDoesNotExistError(driver=self, value=None,
                                             container_name=container.name)

        return self._containers[container.name]['cdn_url']

    def get_object(self, container_name, object_name):
        """
        >>> driver = DummyStorageDriver('key', 'secret')
        >>> driver.get_object('unknown', 'unknown')
        ... #doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ContainerDoesNotExistError:
        >>> container_name = 'test container 1'
        >>> container = driver.create_container(container_name=container_name)
        >>> container
        <Container: name=test container 1, provider=Dummy Storage Provider>
        >>> driver.get_object(
        ...  'test container 1', 'unknown') #doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ObjectDoesNotExistError:
        >>> obj = container.upload_object_via_stream(object_name='test object',
        ...      iterator=DummyFileObject(5, 10), extra={})
        >>> obj.name
        'test object'
        >>> obj.size
        50

        @inherits: :class:`StorageDriver.get_object`
        """
        # Called only for its side effect of raising
        # ContainerDoesNotExistError when the container is unknown.
        self.get_container(container_name)
        container_objects = self._containers[container_name]['objects']
        if object_name not in container_objects:
            raise ObjectDoesNotExistError(object_name=object_name, value=None,
                                          driver=self)

        return container_objects[object_name]

    def get_object_cdn_url(self, obj):
        """
        >>> driver = DummyStorageDriver('key', 'secret')
        >>> container_name = 'test container 1'
        >>> container = driver.create_container(container_name=container_name)
        >>> container
        <Container: name=test container 1, provider=Dummy Storage Provider>
        >>> obj = container.upload_object_via_stream(
        ...      object_name='test object 5',
        ...      iterator=DummyFileObject(5, 10), extra={})
        >>> obj.name
        'test object 5'
        >>> obj.get_cdn_url()
        'http://www.test.com/object/test_object_5'

        @inherits: :class:`StorageDriver.get_object_cdn_url`
        """
        container_name = obj.container.name
        container_objects = self._containers[container_name]['objects']
        if obj.name not in container_objects:
            raise ObjectDoesNotExistError(object_name=obj.name, value=None,
                                          driver=self)

        return container_objects[obj.name].meta_data['cdn_url']

    def create_container(self, container_name):
        """
        >>> driver = DummyStorageDriver('key', 'secret')
        >>> container_name = 'test container 1'
        >>> container = driver.create_container(container_name=container_name)
        >>> container
        <Container: name=test container 1, provider=Dummy Storage Provider>
        >>> container = driver.create_container(
        ...    container_name='test container 1')
        ... #doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ContainerAlreadyExistsError:

        @inherits: :class:`StorageDriver.create_container`
        """
        if container_name in self._containers:
            raise ContainerAlreadyExistsError(container_name=container_name,
                                              value=None, driver=self)

        extra = {'object_count': 0}
        container = Container(name=container_name, extra=extra, driver=self)

        self._containers[container_name] = {
            'container': container,
            'objects': {},
            'cdn_url': 'http://www.test.com/container/%s' %
                       (container_name.replace(' ', '_'))
        }
        return container

    def delete_container(self, container):
        """
        >>> driver = DummyStorageDriver('key', 'secret')
        >>> container = Container(name = 'test container',
        ...    extra={'object_count': 0}, driver=driver)
        >>> driver.delete_container(container=container)
        ... #doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ContainerDoesNotExistError:
        >>> container = driver.create_container(
        ...      container_name='test container 1')
        ... #doctest: +IGNORE_EXCEPTION_DETAIL
        >>> len(driver._containers)
        1
        >>> driver.delete_container(container=container)
        True
        >>> len(driver._containers)
        0
        >>> container = driver.create_container(
        ...    container_name='test container 1')
        ... #doctest: +IGNORE_EXCEPTION_DETAIL
        >>> obj = container.upload_object_via_stream(
        ...   object_name='test object', iterator=DummyFileObject(5, 10),
        ...   extra={})
        >>> driver.delete_container(container=container)
        ... #doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ContainerIsNotEmptyError:

        @inherits: :class:`StorageDriver.delete_container`
        """
        container_name = container.name
        if container_name not in self._containers:
            raise ContainerDoesNotExistError(container_name=container_name,
                                             value=None, driver=self)

        if self._containers[container_name]['objects']:
            raise ContainerIsNotEmptyError(container_name=container_name,
                                           value=None, driver=self)

        del self._containers[container_name]
        return True

    def download_object(self, obj, destination_path, overwrite_existing=False,
                        delete_on_failure=True):
        """
        Save the object to ``destination_path`` by handing a fresh
        :class:`DummyFileObject` to ``self._save_object`` as the response.
        """
        kwargs_dict = {'obj': obj,
                       'response': DummyFileObject(),
                       'destination_path': destination_path,
                       'overwrite_existing': overwrite_existing,
                       'delete_on_failure': delete_on_failure}

        return self._save_object(**kwargs_dict)

    def download_object_as_stream(self, obj, chunk_size=None):
        """
        >>> driver = DummyStorageDriver('key', 'secret')
        >>> container = driver.create_container(
        ...   container_name='test container 1')
        ... #doctest: +IGNORE_EXCEPTION_DETAIL
        >>> obj = container.upload_object_via_stream(object_name='test object',
        ...    iterator=DummyFileObject(5, 10), extra={})
        >>> stream = container.download_object_as_stream(obj)
        >>> stream #doctest: +ELLIPSIS
        <...closed...>

        @inherits: :class:`StorageDriver.download_object_as_stream`
        """
        return DummyFileObject()

    def upload_object(self, file_path, container, object_name, extra=None,
                      verify_hash=True, headers=None):
        """
        >>> driver = DummyStorageDriver('key', 'secret')
        >>> container_name = 'test container 1'
        >>> container = driver.create_container(container_name=container_name)
        >>> container.upload_object(file_path='/tmp/inexistent.file',
        ...     object_name='test') #doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        LibcloudError:
        >>> file_path = path = os.path.abspath(__file__)
        >>> file_size = os.path.getsize(file_path)
        >>> obj = container.upload_object(file_path=file_path,
        ...                               object_name='test')
        >>> obj #doctest: +ELLIPSIS
        <Object: name=test, size=...>
        >>> obj.size == file_size
        True

        @inherits: :class:`StorageDriver.upload_object`
        """
        if not os.path.exists(file_path):
            raise LibcloudError(value='File %s does not exist' % (file_path),
                                driver=self)

        size = os.path.getsize(file_path)
        return self._add_object(container=container, object_name=object_name,
                                size=size, extra=extra)

    def upload_object_via_stream(self, iterator, container,
                                 object_name, extra=None, headers=None):
        """
        >>> driver = DummyStorageDriver('key', 'secret')
        >>> container = driver.create_container(
        ...    container_name='test container 1')
        ... #doctest: +IGNORE_EXCEPTION_DETAIL
        >>> obj = container.upload_object_via_stream(
        ...   object_name='test object', iterator=DummyFileObject(5, 10),
        ...   extra={})
        >>> obj #doctest: +ELLIPSIS
        <Object: name=test object, size=50, ...>

        @inherits: :class:`StorageDriver.upload_object_via_stream`
        """
        try:
            size = len(iterator)
        except TypeError:
            # Plain iterators and generators have no length, so measure the
            # stream by consuming it and adding up the chunk lengths.
            size = sum(len(chunk) for chunk in iterator)

        return self._add_object(container=container, object_name=object_name,
                                size=size, extra=extra)

    def delete_object(self, obj):
        """
        >>> driver = DummyStorageDriver('key', 'secret')
        >>> container = driver.create_container(
        ...   container_name='test container 1')
        ... #doctest: +IGNORE_EXCEPTION_DETAIL
        >>> obj = container.upload_object_via_stream(object_name='test object',
        ...   iterator=DummyFileObject(5, 10), extra={})
        >>> obj #doctest: +ELLIPSIS
        <Object: name=test object, size=50, ...>
        >>> container.delete_object(obj=obj)
        True
        >>> obj = Object(name='test object 2',
        ...    size=1000, hash=None, extra=None,
        ...    meta_data=None, container=container,driver=None)
        >>> container.delete_object(obj=obj) #doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ObjectDoesNotExistError:

        @inherits: :class:`StorageDriver.delete_object`
        """
        container_name = obj.container.name
        object_name = obj.name

        # Called only to validate that both the container and the object
        # exist; it raises the matching *DoesNotExistError otherwise.
        self.get_object(container_name=container_name,
                        object_name=object_name)

        del self._containers[container_name]['objects'][object_name]
        return True

    def _add_object(self, container, object_name, size, extra=None):
        """
        Create an :class:`Object` and register it under the container.

        :param container: Container which receives the object.
        :type container: :class:`Container`

        :param object_name: Name of the new object.
        :type object_name: ``str``

        :param size: Size recorded on the object.
        :type size: ``int``

        :param extra: Optional extra attributes; its ``meta_data`` entry, if
                      any, seeds the meta data of the object.
        :type extra: ``dict``

        :raises ContainerDoesNotExistError: If the container is unknown.

        :rtype: :class:`Object`
        """
        container = self.get_container(container.name)

        extra = extra or {}
        # Work on a copy so that adding 'cdn_url' does not modify the
        # dictionary owned by the caller.
        meta_data = dict(extra.get('meta_data') or {})
        meta_data.update({'cdn_url': 'http://www.test.com/object/%s' %
                          (object_name.replace(' ', '_'))})
        obj = Object(name=object_name, size=size, extra=extra, hash=None,
                     meta_data=meta_data, container=container, driver=self)

        self._containers[container.name]['objects'][object_name] = obj
        return obj


if __name__ == "__main__":
    import doctest
    doctest.testmod()