Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.146.221.249
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /usr/lib/python3/dist-packages/ansible_collections/amazon/aws/plugins/module_utils/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /usr/lib/python3/dist-packages/ansible_collections/amazon/aws/plugins/module_utils/urls.py
# Copyright: (c) 2018, Aaron Haaf <aabonh@gmail.com>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

import datetime
import hashlib
import hmac
import operator

try:
    from boto3 import session
except ImportError:
    pass

from ansible.module_utils.six.moves.urllib.parse import urlencode
from ansible.module_utils.urls import open_url

from .ec2 import HAS_BOTO3
from .ec2 import get_aws_connection_info

import ansible.module_utils.common.warnings as ansible_warnings


def hexdigest(s):
    """
    Encode ``s`` as UTF-8 and return the hexadecimal SHA-256 digest of the result.

    Emits a deprecation warning on every call.
    """

    ansible_warnings.deprecate(
        'amazon.aws.module_utils.urls.hexdigest is unused and has been deprecated.',
        version='7.0.0',
        collection_name='amazon.aws',
    )

    encoded = s.encode("utf-8")
    hasher = hashlib.sha256(encoded)
    return hasher.hexdigest()


def format_querystring(params=None):
    """
    Returns a url-encoded query string built from the provided params dict.

    The parameters are sorted by name and spaces are encoded as ``%20``,
    which is the form needed for canonical requests.

    :kwarg params: (optional) dict of query parameters; ``None`` or an empty
        dict yields an empty string.

    :returns: the encoded query string (without a leading ``?``)
    """

    ansible_warnings.deprecate(
        'amazon.aws.module_utils.urls.format_querystring is unused and has been deprecated.',
        version='7.0.0', collection_name='amazon.aws')

    if not params:
        return ""

    # The parameters must be sorted by name. The sort key has to be passed by
    # keyword: positionally it is rejected on Python 3 and treated as a ``cmp``
    # function on Python 2.
    ordered_params = sorted(params.items(), key=operator.itemgetter(0))

    # Query string values must be URL-encoded with space=%20, but urlencode
    # emits "+" for a space. A literal "+" in the input is already encoded as
    # "%2B", so every remaining "+" stands for a space and can be rewritten.
    return urlencode(ordered_params).replace("+", "%20")


# Key derivation functions. See:
# http://docs.aws.amazon.com/general/latest/gr/signature-v4-examples.html#signature-v4-examples-python
def sign(key, msg):
    '''
    Compute the raw HMAC-SHA256 digest of msg (encoded as UTF-8) under key.

    Emits a deprecation warning on every call.
    '''

    ansible_warnings.deprecate(
        'amazon.aws.module_utils.urls.sign is unused and has been deprecated.',
        version='7.0.0',
        collection_name='amazon.aws',
    )

    payload = msg.encode("utf-8")
    mac = hmac.new(key, payload, hashlib.sha256)
    return mac.digest()


def get_signature_key(key, dateStamp, regionName, serviceName):
    '''
    Derive the signing key by chaining sign() over the scope components.

    The chain starts from "AWS4" + key (encoded as UTF-8) and is applied, in
    order, to dateStamp, regionName, serviceName and the literal "aws4_request".
    '''

    ansible_warnings.deprecate(
        'amazon.aws.module_utils.urls.get_signature_key is unused and has been deprecated.',
        version='7.0.0',
        collection_name='amazon.aws',
    )

    derived = ("AWS4" + key).encode("utf-8")
    # Each step uses the previous digest as the key for the next component.
    for component in (dateStamp, regionName, serviceName, "aws4_request"):
        derived = sign(derived, component)
    return derived


def get_aws_credentials_object(module):
    '''
    Build a boto3 session from the module's connection info and return the
    result of its get_credentials() call.

    Fails the module when boto3 is not available.
    '''

    ansible_warnings.deprecate(
        'amazon.aws.module_utils.urls.get_aws_credentials_object is unused and has been deprecated.',
        version='7.0.0',
        collection_name='amazon.aws',
    )

    if not HAS_BOTO3:
        module.fail_json("get_aws_credentials_object requires boto3")

    # Only the third element (the session keyword arguments) is needed here.
    _first, _second, session_kwargs = get_aws_connection_info(module, boto3=True)
    boto_session = session.Session(**session_kwargs)
    return boto_session.get_credentials()


# Reference: https://docs.aws.amazon.com/general/latest/gr/sigv4-signed-request-examples.html
def signed_request(
        module=None,
        method="GET", service=None, host=None, uri=None,
        query=None, body="", headers=None,
        session_in_header=True, session_in_query=False
):
    """Generate a SigV4 request to an AWS resource for a module

    This is used if you wish to authenticate with AWS credentials to a secure endpoint like an Elasticsearch domain.

    The ``headers`` and ``query`` dicts passed in are copied and never modified.

    Returns :class:`HTTPResponse` object.

    Example:
        result = signed_request(
            module=this,
            service="es",
            host="search-recipes1-xxxxxxxxx.us-west-2.es.amazonaws.com",
        )

    :kwarg host: endpoint to talk to
    :kwarg service: AWS id of service (like `ec2` or `es`)
    :kwarg module: An AnsibleAWSModule to gather connection info from (required in practice;
        it is used for deprecation, failure reporting and credential lookup)

    :kwarg body: (optional) Payload to send; ignored (replaced by "") for GET requests
    :kwarg method: (optional) HTTP verb to use
    :kwarg query: (optional) dict of query params to handle
    :kwarg uri: (optional) Resource path without query parameters, defaults to "/"
    :kwarg headers: (optional) dict of extra headers to sign and send

    :kwarg session_in_header: (optional) Add the session token to the headers
    :kwarg session_in_query: (optional) Add the session token to the query parameters

    :returns: HTTPResponse
    """

    module.deprecate(
        'amazon.aws.module_utils.urls.signed_request is unused and has been deprecated.',
        version='7.0.0', collection_name='amazon.aws')

    if not HAS_BOTO3:
        module.fail_json("A sigv4 signed_request requires boto3")

    # "Constants"

    t = datetime.datetime.utcnow()
    amz_date = t.strftime("%Y%m%dT%H%M%SZ")
    datestamp = t.strftime("%Y%m%d")  # Date w/o time, used in credential scope
    algorithm = "AWS4-HMAC-SHA256"

    # AWS stuff

    region, dummy, dummy = get_aws_connection_info(module, boto3=True)
    credentials = get_aws_credentials_object(module)
    access_key = credentials.access_key
    secret_key = credentials.secret_key
    session_token = credentials.token

    if not access_key:
        module.fail_json(msg="aws_access_key_id is missing")
    if not secret_key:
        module.fail_json(msg="aws_secret_access_key is missing")

    credential_scope = "/".join([datestamp, region, service, "aws4_request"])

    # Argument Defaults

    uri = uri or "/"

    # Work on copies so the dicts supplied by the caller are left untouched.
    headers = dict(headers or {})
    query = dict(query or {})

    headers.update({
        "host": host,
        "x-amz-date": amz_date,
    })

    # Handle adding of session_token if present
    if session_token:
        if session_in_header:
            headers["X-Amz-Security-Token"] = session_token
        if session_in_query:
            query["X-Amz-Security-Token"] = session_token

    # Build the query string only now, so that a session token added to the
    # query above is part of both the signature and the requested URL.
    query_string = format_querystring(query) if query else ""

    if method == "GET":
        body = ""

    # Derived data

    body_hash = hexdigest(body)

    # The canonical form uses lower-case header names sorted by name, and the
    # SignedHeaders list must name exactly those headers in the same order.
    normalized_headers = dict(
        (key.lower().strip(), value.strip()) for key, value in headers.items()
    )
    signed_header_names = sorted(normalized_headers)
    signed_headers = ";".join(signed_header_names)

    # Setup Canonical request to generate auth token

    # Every header line is newline-terminated; together with the join below
    # this leaves the blank line expected after the header section.
    canonical_headers = "".join(
        name + ":" + normalized_headers[name] + "\n" for name in signed_header_names
    )

    canonical_request = "\n".join([
        method,
        uri,
        query_string,
        canonical_headers,
        signed_headers,
        body_hash,
    ])

    string_to_sign = "\n".join([algorithm, amz_date, credential_scope, hexdigest(canonical_request)])

    # Sign the Canonical request

    signing_key = get_signature_key(secret_key, datestamp, region, service)
    signature = hmac.new(signing_key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()

    # Make auth header with that info

    authorization_header = "{0} Credential={1}/{2}, SignedHeaders={3}, Signature={4}".format(
        algorithm, access_key, credential_scope, signed_headers, signature
    )

    # PERFORM THE REQUEST!

    url = "https://" + host + uri

    if query_string != "":
        url = url + "?" + query_string

    final_headers = {
        "x-amz-date": amz_date,
        "Authorization": authorization_header,
    }

    final_headers.update(headers)

    return open_url(url, method=method, data=body, headers=final_headers)

Anon7 - 2022
AnonSec Team