Server IP : 85.214.239.14 / Your IP : 18.221.248.140 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /proc/3/cwd/proc/3/task/3/cwd/proc/2/task/2/cwd/lib/python3/dist-packages/libcloud/common/ |
Upload File : |
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from datetime import datetime, timezone
import hashlib
import hmac

from libcloud.utils.py3 import urlquote

__all__ = [
    'OSCRequestSignerAlgorithmV4',
]


class OSCRequestSigner(object):
    """
    Base class holding the credentials and context used to sign outgoing
    Outscale (OSC) API requests.
    """

    def __init__(self, access_key: str, access_secret: str, version: str,
                 connection):
        """
        :param access_key: Access key.
        :type access_key: ``str``

        :param access_secret: Access secret.
        :type access_secret: ``str``

        :param version: API version.
        :type version: ``str``

        :param connection: Connection instance.
        :type connection: :class:`Connection`
        """
        self.access_key = access_key
        self.access_secret = access_secret
        self.version = version
        self.connection = connection


class OSCRequestSignerAlgorithmV4(OSCRequestSigner):
    """
    Request signer producing ``OSC4-HMAC-SHA256`` ``Authorization`` headers.
    """

    @staticmethod
    def sign(key, msg):
        """
        Return the raw HMAC-SHA256 digest of ``msg`` keyed with ``key``.

        :param key: Signing key.
        :type key: ``bytes``

        :param msg: Message to sign; encoded as UTF-8 before hashing.
        :type msg: ``str``

        :rtype: ``bytes``
        """
        return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()

    @staticmethod
    def _get_signed_headers(headers: dict):
        """
        Return the lowercased header names joined with ``;``.

        Names are ordered by their lowercased form so that the order does
        not depend on the capitalisation used by the caller.

        :rtype: ``str``
        """
        return ';'.join(sorted(name.lower() for name in headers))

    @staticmethod
    def _get_canonical_headers(headers: dict):
        """
        Return one ``name:value`` line per header, each terminated by a
        newline, with lowercased names and whitespace-stripped values.

        Lines are ordered by lowercased header name, matching the order
        produced by :meth:`_get_signed_headers`.

        :rtype: ``str``
        """
        ordered = sorted(headers.items(), key=lambda item: item[0].lower())
        lines = [':'.join([name.lower(), str(value).strip()])
                 for name, value in ordered]
        return '\n'.join(lines) + '\n'

    @staticmethod
    def _get_request_params(params: dict):
        """
        Return ``params`` as a URL-encoded ``key=value`` query string,
        sorted by key and joined with ``&``.

        :rtype: ``str``
        """
        return '&'.join(["%s=%s" % (urlquote(k, safe=''),
                                    urlquote(str(v), safe='~'))
                         for k, v in sorted(params.items())])

    def get_request_headers(self, service_name: str, region: str,
                            action: str, data: str):
        """
        Build the headers, including ``Authorization``, for a signed POST
        request.

        :param service_name: Service name used to build the ``Host`` header.
        :type service_name: ``str``

        :param region: Region used to build the ``Host`` header.
        :type region: ``str``

        :param action: API action; last component of the signed path.
        :type action: ``str``

        :param data: Request body whose SHA-256 hash is signed.
        :type data: ``str``

        :return: Headers to send with the request.
        :rtype: ``dict``
        """
        # Timezone-aware replacement for the deprecated datetime.utcnow();
        # the strftime() output used below is identical.
        date = datetime.now(timezone.utc)
        host = "{}.{}.outscale.com".format(service_name, region)
        headers = {
            'Content-Type': "application/json; charset=utf-8",
            'X-Osc-Date': date.strftime('%Y%m%dT%H%M%SZ'),
            'Host': host,
        }
        # NOTE(review): the path and the credential scope are derived from
        # the connection object while the host uses the arguments above;
        # presumably callers pass matching values -- confirm.
        path = "/{}/{}/{}".format(
            self.connection.service_name,
            self.version,
            action
        )
        sig = self._get_authorization_v4_header(
            headers=headers,
            dt=date,
            method='POST',
            path=path,
            data=data
        )
        headers.update({'Authorization': sig})
        return headers

    def _get_authorization_v4_header(self, headers: dict, data: str,
                                     dt: datetime, method: str = 'GET',
                                     path: str = '/'):
        """
        Return the value of the ``Authorization`` header for the request.

        :param headers: Headers to sign.
        :param data: Request body.
        :param dt: Request timestamp.
        :param method: HTTP method.
        :param path: Request path.

        :rtype: ``str``
        """
        credentials_scope = self._get_credential_scope(dt=dt)
        signed_headers = self._get_signed_headers(headers=headers)
        signature = self._get_signature(headers=headers, dt=dt,
                                        method=method, path=path,
                                        data=data)

        return 'OSC4-HMAC-SHA256 Credential=%(u)s/%(c)s, ' \
               'SignedHeaders=%(sh)s, Signature=%(s)s' % {
                   'u': self.access_key,
                   'c': credentials_scope,
                   'sh': signed_headers,
                   's': signature
               }

    def _get_signature(self, headers: dict, dt: datetime,
                       method: str, path: str, data: str):
        """
        Return the hex HMAC-SHA256 signature of the string to sign, keyed
        with the derived signing key.

        :rtype: ``str``
        """
        string_to_sign = self._get_string_to_sign(headers=headers, dt=dt,
                                                  method=method, path=path,
                                                  data=data)
        signing_key = self._get_key_to_sign_with(
            self.access_secret,
            dt.strftime('%Y%m%d')
        )
        return hmac.new(signing_key, string_to_sign.encode('utf-8'),
                        hashlib.sha256).hexdigest()

    def _get_key_to_sign_with(self, key: str, dt: str):
        """
        Derive the signing key by chaining HMACs over the date, the
        connection's region name, its service name and ``osc4_request``.

        :param key: Access secret.
        :param dt: Date formatted as ``YYYYMMDD``.

        :rtype: ``bytes``
        """
        k_date = self.sign(('OSC4' + key).encode('utf-8'), dt)
        k_region = self.sign(k_date, self.connection.region_name)
        k_service = self.sign(k_region, self.connection.service_name)
        return self.sign(k_service, 'osc4_request')

    def _get_string_to_sign(self, headers: dict, dt: datetime, method: str,
                            path: str, data: str):
        """
        Return the string to sign: algorithm name, timestamp, credential
        scope and the SHA-256 hex digest of the canonical request, one per
        line.

        :rtype: ``str``
        """
        canonical_request = self._get_canonical_request(headers=headers,
                                                        method=method,
                                                        path=path,
                                                        data=data)
        return 'OSC4-HMAC-SHA256' + '\n' \
               + dt.strftime('%Y%m%dT%H%M%SZ') + '\n' \
               + self._get_credential_scope(dt) + '\n' \
               + hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()

    def _get_credential_scope(self, dt):
        """
        Return ``<YYYYMMDD>/<region_name>/<service_name>/osc4_request``
        using the connection's region and service names.

        :rtype: ``str``
        """
        return '/'.join([dt.strftime('%Y%m%d'),
                         self.connection.region_name,
                         self.connection.service_name,
                         'osc4_request'])

    def _get_canonical_request(self, headers, method, path, data):
        """
        Return the canonical request: method, path, query string, canonical
        headers, signed header names and the SHA-256 hex digest of the
        body, joined with newlines.

        The query string component is always built from an empty mapping.

        :rtype: ``str``
        """
        return '\n'.join([
            method,
            path,
            self._get_request_params({}),
            self._get_canonical_headers(headers),
            self._get_signed_headers(headers),
            hashlib.sha256(data.encode('utf-8')).hexdigest()
        ])