Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.144.254.149
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /usr/lib/python3/dist-packages/ansible_collections/amazon/aws/plugins/module_utils/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /usr/lib/python3/dist-packages/ansible_collections/amazon/aws/plugins/module_utils//acm.py
# -*- coding: utf-8 -*-
#
# Copyright (c) 2019 Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
#
# This module is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this software.  If not, see <http://www.gnu.org/licenses/>.
#
# Author:
#   - Matthew Davis <Matthew.Davis.2@team.telstra.com>
#     on behalf of Telstra Corporation Limited
#
# Common functionality to be used by the modules:
#   - acm_certificate
#   - acm_certificate_info

from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

"""
Common Amazon Certificate Manager facts shared between modules
"""

try:
    from botocore.exceptions import BotoCoreError, ClientError
except ImportError:
    pass

from ansible.module_utils._text import to_bytes
from ansible.module_utils.common.dict_transformations import camel_dict_to_snake_dict

from .core import is_boto3_error_code
from .ec2 import AWSRetry
from .ec2 import ansible_dict_to_boto3_tag_list
from .ec2 import boto3_tag_list_to_ansible_dict


class ACMServiceManager(object):
    """Handles ACM Facts Services"""

    def __init__(self, module):
        self.module = module
        self.client = module.client('acm')

    @AWSRetry.jittered_backoff(delay=5, catch_extra_error_codes=['RequestInProgressException'])
    def delete_certificate_with_backoff(self, client, arn):
        client.delete_certificate(CertificateArn=arn)

    def delete_certificate(self, client, module, arn):
        module.debug("Attempting to delete certificate %s" % arn)
        try:
            self.delete_certificate_with_backoff(client, arn)
        except (BotoCoreError, ClientError) as e:
            module.fail_json_aws(e, msg="Couldn't delete certificate %s" % arn)
        module.debug("Successfully deleted certificate %s" % arn)

    @AWSRetry.jittered_backoff(delay=5, catch_extra_error_codes=['RequestInProgressException'])
    def list_certificates_with_backoff(self, client, statuses=None):
        paginator = client.get_paginator('list_certificates')
        kwargs = dict()
        if statuses:
            kwargs['CertificateStatuses'] = statuses
        return paginator.paginate(**kwargs).build_full_result()['CertificateSummaryList']

    @AWSRetry.jittered_backoff(delay=5, catch_extra_error_codes=['RequestInProgressException', 'ResourceNotFoundException'])
    def get_certificate_with_backoff(self, client, certificate_arn):
        response = client.get_certificate(CertificateArn=certificate_arn)
        # strip out response metadata
        return {'Certificate': response['Certificate'],
                'CertificateChain': response['CertificateChain']}

    @AWSRetry.jittered_backoff(delay=5, catch_extra_error_codes=['RequestInProgressException', 'ResourceNotFoundException'])
    def describe_certificate_with_backoff(self, client, certificate_arn):
        return client.describe_certificate(CertificateArn=certificate_arn)['Certificate']

    @AWSRetry.jittered_backoff(delay=5, catch_extra_error_codes=['RequestInProgressException', 'ResourceNotFoundException'])
    def list_certificate_tags_with_backoff(self, client, certificate_arn):
        return client.list_tags_for_certificate(CertificateArn=certificate_arn)['Tags']

    # Returns a list of certificates
    # if domain_name is specified, returns only certificates with that domain
    # if an ARN is specified, returns only that certificate
    # only_tags is a dict, e.g. {'key':'value'}. If specified this function will return
    # only certificates which contain all those tags (key exists, value matches).
    def get_certificates(self, client, module, domain_name=None, statuses=None, arn=None, only_tags=None):
        try:
            all_certificates = self.list_certificates_with_backoff(client=client, statuses=statuses)
        except (BotoCoreError, ClientError) as e:
            module.fail_json_aws(e, msg="Couldn't obtain certificates")
        if domain_name:
            certificates = [cert for cert in all_certificates
                            if cert['DomainName'] == domain_name]
        else:
            certificates = all_certificates

        if arn:
            # still return a list, not just one item
            certificates = [c for c in certificates if c['CertificateArn'] == arn]

        results = []
        for certificate in certificates:
            try:
                cert_data = self.describe_certificate_with_backoff(client, certificate['CertificateArn'])
            except is_boto3_error_code('ResourceNotFoundException'):
                # The certificate was deleted after the call to list_certificates_with_backoff.
                continue
            except (BotoCoreError, ClientError) as e:  # pylint: disable=duplicate-except
                module.fail_json_aws(e, msg="Couldn't obtain certificate metadata for domain %s" % certificate['DomainName'])

            # in some states, ACM resources do not have a corresponding cert
            if cert_data['Status'] not in ['PENDING_VALIDATION', 'VALIDATION_TIMED_OUT', 'FAILED']:
                try:
                    cert_data.update(self.get_certificate_with_backoff(client, certificate['CertificateArn']))
                except is_boto3_error_code('ResourceNotFoundException'):
                    # The certificate was deleted after the call to list_certificates_with_backoff.
                    continue
                except (BotoCoreError, ClientError, KeyError) as e:  # pylint: disable=duplicate-except
                    module.fail_json_aws(e, msg="Couldn't obtain certificate data for domain %s" % certificate['DomainName'])
            cert_data = camel_dict_to_snake_dict(cert_data)
            try:
                tags = self.list_certificate_tags_with_backoff(client, certificate['CertificateArn'])
            except is_boto3_error_code('ResourceNotFoundException'):
                # The certificate was deleted after the call to list_certificates_with_backoff.
                continue
            except (BotoCoreError, ClientError) as e:  # pylint: disable=duplicate-except
                module.fail_json_aws(e, msg="Couldn't obtain tags for domain %s" % certificate['DomainName'])

            cert_data['tags'] = boto3_tag_list_to_ansible_dict(tags)
            results.append(cert_data)

        if only_tags:
            for tag_key in only_tags:
                try:
                    results = [c for c in results if ('tags' in c) and (tag_key in c['tags']) and (c['tags'][tag_key] == only_tags[tag_key])]
                except (TypeError, AttributeError) as e:
                    for c in results:
                        if 'tags' not in c:
                            module.debug("cert is %s" % str(c))
                    module.fail_json(msg="ACM tag filtering err", exception=e)

        return results

    # returns the domain name of a certificate (encoded in the public cert)
    # for a given ARN
    # A cert with that ARN must already exist
    def get_domain_of_cert(self, client, module, arn):
        if arn is None:
            module.fail(msg="Internal error with ACM domain fetching, no certificate ARN specified")
        try:
            cert_data = self.describe_certificate_with_backoff(client=client, certificate_arn=arn)
        except (BotoCoreError, ClientError) as e:
            module.fail_json_aws(e, msg="Couldn't obtain certificate data for arn %s" % arn)
        return cert_data['DomainName']

    @AWSRetry.jittered_backoff(delay=5, catch_extra_error_codes=['RequestInProgressException'])
    def import_certificate_with_backoff(self, client, certificate, private_key, certificate_chain, arn):
        if certificate_chain:
            if arn:
                ret = client.import_certificate(Certificate=to_bytes(certificate),
                                                PrivateKey=to_bytes(private_key),
                                                CertificateChain=to_bytes(certificate_chain),
                                                CertificateArn=arn)
            else:
                ret = client.import_certificate(Certificate=to_bytes(certificate),
                                                PrivateKey=to_bytes(private_key),
                                                CertificateChain=to_bytes(certificate_chain))
        else:
            if arn:
                ret = client.import_certificate(Certificate=to_bytes(certificate),
                                                PrivateKey=to_bytes(private_key),
                                                CertificateArn=arn)
            else:
                ret = client.import_certificate(Certificate=to_bytes(certificate),
                                                PrivateKey=to_bytes(private_key))
        return ret['CertificateArn']

    # Tags are a normal Ansible style dict
    # {'Key':'Value'}
    @AWSRetry.jittered_backoff(delay=5, catch_extra_error_codes=['RequestInProgressException', 'ResourceNotFoundException'])
    def tag_certificate_with_backoff(self, client, arn, tags):
        aws_tags = ansible_dict_to_boto3_tag_list(tags)
        client.add_tags_to_certificate(CertificateArn=arn, Tags=aws_tags)

    def import_certificate(self, client, module, certificate, private_key, arn=None, certificate_chain=None, tags=None):

        original_arn = arn

        # upload cert
        try:
            arn = self.import_certificate_with_backoff(client, certificate, private_key, certificate_chain, arn)
        except (BotoCoreError, ClientError) as e:
            module.fail_json_aws(e, msg="Couldn't upload new certificate")

        if original_arn and (arn != original_arn):
            # I'm not sure whether the API guarentees that the ARN will not change
            # I'm failing just in case.
            # If I'm wrong, I'll catch it in the integration tests.
            module.fail_json(msg="ARN changed with ACM update, from %s to %s" % (original_arn, arn))

        # tag that cert
        try:
            self.tag_certificate_with_backoff(client, arn, tags)
        except (BotoCoreError, ClientError) as e:
            module.debug("Attempting to delete the cert we just created, arn=%s" % arn)
            try:
                self.delete_certificate_with_backoff(client, arn)
            except (BotoCoreError, ClientError):
                module.warn("Certificate %s exists, and is not tagged. So Ansible will not see it on the next run.")
                module.fail_json_aws(e, msg="Couldn't tag certificate %s, couldn't delete it either" % arn)
            module.fail_json_aws(e, msg="Couldn't tag certificate %s" % arn)

        return arn

Anon7 - 2022
AnonSec Team