Server IP : 85.214.239.14 / Your IP : 18.191.234.202 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /proc/3/root/proc/2/task/2/root/proc/self/root/lib/python3/dist-packages/libcloud/common/ |
Upload File : |
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import base64
import hashlib
import hmac
import sys
import time
import uuid

from libcloud.utils.py3 import ET
from libcloud.common.base import ConnectionUserAndKey, XmlResponse
from libcloud.common.types import MalformedResponseError
from libcloud.utils.py3 import b, u, urlquote, PY3
from libcloud.utils.xml import findtext

__all__ = [
    'AliyunXmlResponse',
    'AliyunRequestSigner',
    'AliyunRequestSignerAlgorithmV1_0',
    'SignedAliyunConnection',
    'AliyunConnection',

    'SIGNATURE_VERSION_1_0',
    'DEFAULT_SIGNATURE_VERSION'
]

SIGNATURE_VERSION_1_0 = '1.0'
DEFAULT_SIGNATURE_VERSION = SIGNATURE_VERSION_1_0


class AliyunXmlResponse(XmlResponse):
    """
    XML response that additionally extracts the ``RequestId`` and ``HostId``
    elements found in Aliyun response bodies.
    """

    # XML namespace handed to ``findtext`` for every lookup; ``None`` here.
    namespace = None

    def success(self):
        """
        Report whether the HTTP status is in the 2xx range.

        :rtype: ``bool``
        """
        return 200 <= self.status < 300

    def parse_body(self):
        """
        Each response from Aliyun contains a request id and a host id.
        The response body is in utf-8 encoding.

        Sets ``self.request_id`` and ``self.host_id`` as a side effect.

        :return: Parsed XML root element, or the raw (empty) body when the
                 body has zero length and ``parse_zero_length_body`` is
                 falsy.
        :raises MalformedResponseError: If the body cannot be parsed as XML.
        """
        if len(self.body) == 0 and not self.parse_zero_length_body:
            return self.body

        try:
            if PY3:
                # The parser is fed bytes. A body that already is bytes is
                # passed through untouched instead of failing on .encode().
                if isinstance(self.body, bytes):
                    raw_body = self.body
                else:
                    raw_body = self.body.encode('utf-8')
                parser = ET.XMLParser(encoding='utf-8')
                body = ET.XML(raw_body, parser=parser)
            else:
                try:
                    body = ET.XML(self.body)
                except ValueError:
                    # Retry with an explicitly utf-8 encoded body.
                    body = ET.XML(self.body.encode('utf-8'))
        except Exception:
            # Any failure on the parse path is reported to the caller as a
            # malformed response carrying the original body.
            raise MalformedResponseError('Failed to parse XML',
                                         body=self.body,
                                         driver=self.connection.driver)

        self.request_id = findtext(element=body, xpath='RequestId',
                                   namespace=self.namespace)
        self.host_id = findtext(element=body, xpath='HostId',
                                namespace=self.namespace)
        return body

    def parse_error(self):
        """
        Parse error responses from Aliyun.

        :return: ``u()`` applied to a ``dict`` with the keys ``code``,
                 ``message``, ``request_id`` and ``host_id``.
        """
        body = super(AliyunXmlResponse, self).parse_error()
        code, message = self._parse_error_details(element=body)
        request_id = findtext(element=body, xpath='RequestId',
                              namespace=self.namespace)
        host_id = findtext(element=body, xpath='HostId',
                           namespace=self.namespace)
        error = {'code': code,
                 'message': message,
                 'request_id': request_id,
                 'host_id': host_id}
        return u(error)

    def _parse_error_details(self, element):
        """
        Parse error code and message from the provided error element.

        :param element: Parsed XML element holding ``Code`` and ``Message``.

        :return: ``tuple`` with two elements: (code, message)
        :rtype: ``tuple``
        """
        code = findtext(element=element, xpath='Code',
                        namespace=self.namespace)
        message = findtext(element=element, xpath='Message',
                           namespace=self.namespace)

        return (code, message)


class AliyunRequestSigner(object):
    """
    Class handles signing the outgoing Aliyun requests.

    This base implementation performs no signing: both hooks return their
    arguments unchanged.
    """

    def __init__(self, access_key, access_secret, version):
        """
        :param access_key: Access key.
        :type access_key: ``str``

        :param access_secret: Access secret.
        :type access_secret: ``str``

        :param version: API version.
        :type version: ``str``
        """
        self.access_key = access_key
        self.access_secret = access_secret
        self.version = version

    def get_request_params(self, params, method='GET', path='/'):
        """
        Return the query parameters to send; unchanged in the base class.

        :param params: Request query parameters.
        :type params: ``dict``

        :param method: HTTP method of the request.
        :type method: ``str``

        :param path: Request path.
        :type path: ``str``

        :rtype: ``dict``
        """
        return params

    def get_request_headers(self, params, headers, method='GET', path='/'):
        """
        Return ``(params, headers)`` to send; unchanged in the base class.

        :rtype: ``tuple``
        """
        return params, headers


class AliyunRequestSignerAlgorithmV1_0(AliyunRequestSigner):
    """Aliyun request signer using signature version 1.0."""

    def get_request_params(self, params, method='GET', path='/'):
        """
        Add the common request parameters and the signature to ``params``.

        ``params`` is modified in place and also returned.

        :param params: Request query parameters.
        :type params: ``dict``

        :param method: HTTP method used in the string to sign.
        :type method: ``str``

        :param path: Request path used in the string to sign.
        :type path: ``str``

        :return: The same ``dict``, now including ``Signature``.
        :rtype: ``dict``
        """
        params['Format'] = 'XML'
        params['Version'] = self.version
        params['AccessKeyId'] = self.access_key
        params['SignatureMethod'] = 'HMAC-SHA1'
        params['SignatureVersion'] = SIGNATURE_VERSION_1_0
        params['SignatureNonce'] = _get_signature_nonce()
        # TODO: Support 'ResourceOwnerAccount'
        params['Timestamp'] = time.strftime('%Y-%m-%dT%H:%M:%SZ',
                                            time.gmtime())
        params['Signature'] = self._sign_request(params, method, path)
        return params

    def _sign_request(self, params, method, path):
        """
        Sign Aliyun requests parameters and get the signature.

        StringToSign = HTTPMethod + '&' +
                       percentEncode('/') + '&' +
                       percentEncode(CanonicalizedQueryString)

        :param params: Request query parameters. A ``Signature`` entry, if
                       present, is left out of the canonicalized query
                       string.
        :type params: ``dict``

        :param method: HTTP method.
        :type method: ``str``

        :param path: Request path.
        :type path: ``str``

        :return: Base64 encoded HMAC-SHA1 digest of the string to sign.
        :rtype: ``str``
        """
        # The signature must never sign itself. ``get_request_params``
        # stores 'Signature' in the very dict it signs, so a dict that is
        # signed a second time still carries the previous value.
        pairs = [
            '%s=%s' % (_percent_encode(key), _percent_encode(params[key]))
            for key in sorted(params)
            if key != 'Signature'
        ]
        qs = urlquote('&'.join(pairs), safe='-_.~')
        string_to_sign = '&'.join((method, urlquote(path, safe=''), qs))
        b64_hmac = base64.b64encode(
            hmac.new(b(self._get_access_secret()), b(string_to_sign),
                     digestmod=hashlib.sha1).digest()
        )

        return b64_hmac.decode('utf8')

    def _get_access_secret(self):
        """
        Return the HMAC key: the access secret with ``'&'`` appended.

        :rtype: ``str``
        """
        return '%s&' % self.access_secret


class AliyunConnection(ConnectionUserAndKey):
    """Aliyun connection that adds nothing to ``ConnectionUserAndKey``."""
    pass


class SignedAliyunConnection(AliyunConnection):
    """
    Aliyun connection that signs the query parameters of each request
    through a request signer chosen by ``signature_version``.
    """

    # Subclasses may set a default; ``__init__`` raises if it is still None
    # and no ``api_version`` argument is supplied.
    api_version = None

    def __init__(self, user_id, key, secure=True, host=None, port=None,
                 url=None, timeout=None, proxy_url=None, retry_delay=None,
                 backoff=None, api_version=None,
                 signature_version=DEFAULT_SIGNATURE_VERSION):
        """
        :param user_id: Passed to the signer as the access key.
        :param key: Passed to the signer as the access secret.

        :param api_version: API version; overrides the class attribute
                            when not ``None``.

        :param signature_version: Signature version; only
                                  ``SIGNATURE_VERSION_1_0`` is supported.

        The remaining arguments are forwarded unchanged to the parent
        constructor.

        :raises ValueError: If ``signature_version`` is unsupported, or if
                            no API version is available.
        """
        super(SignedAliyunConnection, self).__init__(user_id=user_id,
                                                     key=key,
                                                     secure=secure,
                                                     host=host, port=port,
                                                     url=url,
                                                     timeout=timeout,
                                                     proxy_url=proxy_url,
                                                     retry_delay=retry_delay,
                                                     backoff=backoff)

        self.signature_version = str(signature_version)

        if self.signature_version == SIGNATURE_VERSION_1_0:
            signer_cls = AliyunRequestSignerAlgorithmV1_0
        else:
            raise ValueError('Unsupported signature_version: %s' %
                             signature_version)

        if api_version is not None:
            self.api_version = str(api_version)
        elif self.api_version is None:
            raise ValueError('Unsupported null api_version')

        self.signer = signer_cls(access_key=self.user_id,
                                 access_secret=self.key,
                                 version=self.api_version)

    def add_default_params(self, params):
        """
        Run ``params`` through the signer for the current request.

        Reads ``self.method`` and ``self.action``, which are not assigned in
        this module (presumably set by the base connection before this
        hook runs -- confirm against ``libcloud.common.base``).

        :param params: Request query parameters.
        :type params: ``dict``

        :rtype: ``dict``
        """
        params = self.signer.get_request_params(params=params,
                                                method=self.method,
                                                path=self.action)
        return params


def _percent_encode(encode_str):
    """
    Encode string to utf8, quote for url and replace '+' with %20,
    '*' with %2A and keep '~' not converted.

    :param encode_str: value to encode; ``bytes`` (and, on Python 2, the
                       ``str`` form of the value) are decoded using the
                       encoding of sys.stdin, default to encoding cp936.
    :return: ``str`` represents the encoded result
    :rtype: ``str``
    """
    # sys.stdin can be None (e.g. pythonw or a process started without
    # standard streams), so do not assume it has an ``encoding`` attribute.
    encoding = getattr(sys.stdin, 'encoding', None) or 'cp936'

    if PY3:
        if isinstance(encode_str, bytes):
            decoded = encode_str.decode(encoding)
        else:
            decoded = str(encode_str)
    else:
        decoded = str(encode_str).decode(encoding)

    res = urlquote(
        decoded.encode('utf8'), '')
    res = res.replace('+', '%20')
    res = res.replace('*', '%2A')
    res = res.replace('%7E', '~')
    return res


def _get_signature_nonce():
    """
    Return a fresh random UUID (version 4) as a string, used as the
    ``SignatureNonce`` request parameter.

    :rtype: ``str``
    """
    return str(uuid.uuid4())