Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.138.178.162
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /usr/lib/python3/dist-packages/libcloud/common/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /usr/lib/python3/dist-packages/libcloud/common//osc.py
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from datetime import datetime, timezone
import hashlib
import hmac

from libcloud.utils.py3 import urlquote

# Names exported by a star-import of this module; only the V4 signer is
# listed here.
__all__ = [
    'OSCRequestSignerAlgorithmV4',
]


class OSCRequestSigner(object):
    """
    Base class holding the credentials and context needed to sign outgoing
    Outscale (OSC) API requests.
    """

    def __init__(self, access_key: str,
                 access_secret: str,
                 version: str,
                 connection):
        """
        :param access_key: Access key.
        :type access_key: ``str``

        :param access_secret: Access secret.
        :type access_secret: ``str``

        :param version: API version.
        :type version: ``str``

        :param connection: Connection instance.
        :type connection: :class:`Connection`
        """
        self.access_key = access_key
        self.access_secret = access_secret
        self.version = version
        self.connection = connection


class OSCRequestSignerAlgorithmV4(OSCRequestSigner):
    """
    Signer which builds ``OSC4-HMAC-SHA256`` ``Authorization`` headers.

    The connection given to the constructor must expose ``region_name`` and
    ``service_name``; both are used for the credential scope and for the
    derived signing key.
    """

    @staticmethod
    def sign(key, msg):
        """
        Return the raw HMAC-SHA256 digest of ``msg`` keyed with ``key``.

        :param key: HMAC key.
        :type key: ``bytes``

        :param msg: Message; it is UTF-8 encoded before being hashed.
        :type msg: ``str``

        :rtype: ``bytes``
        """
        return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()

    @staticmethod
    def _get_signed_headers(headers: dict):
        """
        Return the lower-cased header names joined with ``;``.

        :param headers: Request headers.
        :type headers: ``dict``

        :rtype: ``str``
        """
        # Sort the lower-cased names, not the original ones: ordering the
        # original spelling would put 'X-Osc-Date' before 'host'.
        return ';'.join(sorted(k.lower() for k in headers))

    @staticmethod
    def _get_canonical_headers(headers: dict):
        """
        Return one ``name:value`` line per header, newline terminated.

        Names are lower-cased, values are converted to ``str`` and stripped
        of surrounding whitespace, and the lines are ordered by lower-cased
        name so they line up with :meth:`_get_signed_headers`.

        :param headers: Request headers.
        :type headers: ``dict``

        :rtype: ``str``
        """
        canonical = sorted((k.lower(), str(v).strip())
                           for k, v in headers.items())
        return '\n'.join(':'.join(pair) for pair in canonical) + '\n'

    @staticmethod
    def _get_request_params(params: dict):
        """
        Return ``params`` as a URL-quoted query string sorted by key.

        :param params: Query parameters.
        :type params: ``dict``

        :rtype: ``str``
        """
        return '&'.join(["%s=%s" %
                         (urlquote(k, safe=''), urlquote(str(v), safe='~'))
                         for k, v in sorted(params.items())])

    def get_request_headers(self, service_name: str, region: str, action: str,
                            data: str):
        """
        Build the full set of headers, including ``Authorization``, for a
        ``POST`` request to ``action``.

        ``service_name`` and ``region`` are only used to build the ``Host``
        header; the request path and the credential scope are taken from
        ``self.connection``.

        :param service_name: Service name used in the host name.
        :type service_name: ``str``

        :param region: Region used in the host name.
        :type region: ``str``

        :param action: API action; last segment of the request path.
        :type action: ``str``

        :param data: Request body whose SHA-256 hash gets signed.
        :type data: ``str``

        :return: Headers to send with the request.
        :rtype: ``dict``
        """
        # Timezone-aware replacement for the deprecated datetime.utcnow();
        # the formatted timestamps are identical.
        date = datetime.now(timezone.utc)
        host = "{}.{}.outscale.com".format(service_name, region)
        headers = {
            'Content-Type': "application/json; charset=utf-8",
            'X-Osc-Date': date.strftime('%Y%m%dT%H%M%SZ'),
            'Host': host,
        }
        path = "/{}/{}/{}".format(
            self.connection.service_name,
            self.version,
            action
        )
        # The signature covers exactly the three headers above, so it is
        # computed before 'Authorization' is added to the dict.
        sig = self._get_authorization_v4_header(
            headers=headers,
            dt=date,
            method='POST',
            path=path,
            data=data
        )
        headers.update({'Authorization': sig})
        return headers

    def _get_authorization_v4_header(self, headers: dict,
                                     data: str,
                                     dt: datetime,
                                     method: str = 'GET',
                                     path: str = '/'):
        """
        Return the value of the ``Authorization`` header.

        :param headers: Headers to sign.
        :type headers: ``dict``

        :param data: Request body.
        :type data: ``str``

        :param dt: Request timestamp.
        :type dt: :class:`datetime.datetime`

        :param method: HTTP method.
        :type method: ``str``

        :param path: Request path.
        :type path: ``str``

        :rtype: ``str``
        """
        credentials_scope = self._get_credential_scope(dt=dt)
        signed_headers = self._get_signed_headers(headers=headers)
        signature = self._get_signature(headers=headers, dt=dt,
                                        method=method, path=path,
                                        data=data)
        return 'OSC4-HMAC-SHA256 Credential=%(u)s/%(c)s, ' \
               'SignedHeaders=%(sh)s, Signature=%(s)s' % {
                   'u': self.access_key,
                   'c': credentials_scope,
                   'sh': signed_headers,
                   's': signature
               }

    def _get_signature(self, headers: dict, dt: datetime,
                       method: str, path: str, data: str):
        """
        Return the hex encoded HMAC-SHA256 signature of the string to sign.

        :rtype: ``str``
        """
        string_to_sign = self._get_string_to_sign(headers=headers, dt=dt,
                                                  method=method, path=path,
                                                  data=data)
        signing_key = self._get_key_to_sign_with(
            self.access_secret,
            dt.strftime('%Y%m%d')
        )
        return self.sign(signing_key, string_to_sign).hex()

    def _get_key_to_sign_with(self, key: str, dt: str):
        """
        Derive the signing key by chaining HMACs over the date, the
        connection's region name, its service name and ``osc4_request``.

        :param key: Access secret.
        :type key: ``str``

        :param dt: Date formatted as ``YYYYMMDD``.
        :type dt: ``str``

        :rtype: ``bytes``
        """
        k_date = self.sign(('OSC4' + key).encode('utf-8'), dt)
        k_region = self.sign(k_date, self.connection.region_name)
        k_service = self.sign(k_region, self.connection.service_name)
        return self.sign(k_service, 'osc4_request')

    def _get_string_to_sign(self, headers: dict, dt: datetime, method: str,
                            path: str, data: str):
        """
        Return the string to sign: algorithm name, timestamp, credential
        scope and the SHA-256 hex digest of the canonical request, one per
        line.

        :rtype: ``str``
        """
        canonical_request = self._get_canonical_request(headers=headers,
                                                        method=method,
                                                        path=path,
                                                        data=data)
        return '\n'.join([
            'OSC4-HMAC-SHA256',
            dt.strftime('%Y%m%dT%H%M%SZ'),
            self._get_credential_scope(dt),
            hashlib.sha256(canonical_request.encode('utf-8')).hexdigest(),
        ])

    def _get_credential_scope(self, dt):
        """
        Return ``<YYYYMMDD>/<region>/<service>/osc4_request``.

        :param dt: Request timestamp.
        :type dt: :class:`datetime.datetime`

        :rtype: ``str``
        """
        return '/'.join([dt.strftime('%Y%m%d'),
                         self.connection.region_name,
                         self.connection.service_name,
                         'osc4_request'])

    def _get_canonical_request(self, headers, method, path, data):
        """
        Return the canonical request: method, path, query string, canonical
        headers, signed header names and the SHA-256 hex digest of the
        body, joined with newlines.

        The query string is always built from an empty dict, so that line
        is always empty.

        :rtype: ``str``
        """
        return '\n'.join([
            method,
            path,
            self._get_request_params({}),
            self._get_canonical_headers(headers),
            self._get_signed_headers(headers),
            hashlib.sha256(data.encode('utf-8')).hexdigest()
        ])

Anon7 - 2022
AnonSec Team