Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 18.119.213.78
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /lib/python3/dist-packages/ansible_collections/community/aws/plugins/module_utils/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /lib/python3/dist-packages/ansible_collections/community/aws/plugins/module_utils//ec2.py
# Copyright: Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

from __future__ import absolute_import, division, print_function
__metaclass__ = type

from copy import deepcopy

from ansible_collections.amazon.aws.plugins.module_utils.core import is_boto3_error_code
from ansible_collections.amazon.aws.plugins.module_utils.ec2 import AWSRetry
from ansible_collections.amazon.aws.plugins.module_utils.ec2 import ansible_dict_to_boto3_filter_list
from ansible_collections.amazon.aws.plugins.module_utils.tagging import boto3_tag_list_to_ansible_dict
from ansible_collections.amazon.aws.plugins.module_utils.tagging import ansible_dict_to_boto3_tag_list
from ansible_collections.amazon.aws.plugins.module_utils.tagging import compare_aws_tags
from ansible_collections.amazon.aws.plugins.module_utils.tagging import boto3_tag_specifications

from ansible_collections.community.aws.plugins.module_utils.base import BaseResourceManager
from ansible_collections.community.aws.plugins.module_utils.base import BaseWaiterFactory
from ansible_collections.community.aws.plugins.module_utils.base import Boto3Mixin


class Ec2WaiterFactory(BaseWaiterFactory):
    def __init__(self, module):
        # the AWSRetry wrapper doesn't support the wait functions (there's no
        # public call we can cleanly wrap)
        client = module.client('ec2')
        super(Ec2WaiterFactory, self).__init__(module, client)

    @property
    def _waiter_model_data(self):
        data = super(Ec2WaiterFactory, self)._waiter_model_data
        return data


class Ec2Boto3Mixin(Boto3Mixin):

    @AWSRetry.jittered_backoff()
    def _paginated_describe_subnets(self, **params):
        paginator = self.client.get_paginator('describe_subnets')
        return paginator.paginate(**params).build_full_result()

    @Boto3Mixin.aws_error_handler('describe subnets')
    def _describe_subnets(self, **params):
        try:
            result = self._paginated_describe_subnets(**params)
        except is_boto3_error_code('SubnetID.NotFound'):
            return None
        return result.get('Subnets', None)


class BaseEc2Manager(Ec2Boto3Mixin, BaseResourceManager):

    resource_id = None
    TAG_RESOURCE_TYPE = None
    # This can be overridden by a subclass *if* 'Tags' isn't returned as a part of
    # the standard Resource description
    TAGS_ON_RESOURCE = True
    # If the resource supports using "TagSpecifications" on creation we can
    TAGS_ON_CREATE = 'TagSpecifications'

    def __init__(self, module, id=None):
        r"""
        Parameters:
            module (AnsibleAWSModule): An Ansible module.
        """
        super(BaseEc2Manager, self).__init__(module)
        self.client = self._create_client()
        self._tagging_updates = dict()
        self.resource_id = id

        # Name parameter is unique (by region) and can not be modified.
        if self.resource_id:
            resource = deepcopy(self.get_resource())
            self.original_resource = resource

    def _flush_update(self):
        changed = False
        changed |= self._do_tagging()
        changed |= super(BaseEc2Manager, self)._flush_update()
        return changed

    @Boto3Mixin.aws_error_handler('connect to AWS')
    def _create_client(self, client_name='ec2'):
        client = self.module.client(client_name, retry_decorator=AWSRetry.jittered_backoff())
        return client

    @Boto3Mixin.aws_error_handler('set tags on resource')
    def _add_tags(self, **params):
        self.client.create_tags(aws_retry=True, **params)
        return True

    @Boto3Mixin.aws_error_handler('unset tags on resource')
    def _remove_tags(self, **params):
        self.client.delete_tags(aws_retry=True, **params)
        return True

    @AWSRetry.jittered_backoff()
    def _paginated_describe_tags(self, **params):
        paginator = self.client.get_paginator('describe_tags')
        return paginator.paginate(**params).build_full_result()

    @Boto3Mixin.aws_error_handler('list tags on resource')
    def _describe_tags(self, id=None):
        if not id:
            id = self.resource_id
        filters = ansible_dict_to_boto3_filter_list({"resource-id": id})
        tags = self._paginated_describe_tags(Filters=filters)
        return tags

    def _get_tags(self, id=None):
        if id is None:
            id = self.resource_id
        # If the Tags are available from the resource, then use them
        if self.TAGS_ON_RESOURCE:
            tags = self._preupdate_resource.get('Tags', [])
        # Otherwise we'll have to look them up
        else:
            tags = self._describe_tags(id=id)
        return boto3_tag_list_to_ansible_dict(tags)

    def _do_tagging(self):
        changed = False
        tags_to_add = self._tagging_updates.get('add')
        tags_to_remove = self._tagging_updates.get('remove')

        if tags_to_add:
            changed = True
            tags = ansible_dict_to_boto3_tag_list(tags_to_add)
            if not self.module.check_mode:
                self._add_tags(Resources=[self.resource_id], Tags=tags)
        if tags_to_remove:
            changed = True
            if not self.module.check_mode:
                tag_list = [dict(Key=tagkey) for tagkey in tags_to_remove]
                self._remove_tags(Resources=[self.resource_id], Tags=tag_list)

        return changed

    def _merge_resource_changes(self, filter_immutable=True, creation=False):

        resource = super(BaseEc2Manager, self)._merge_resource_changes(
            filter_immutable=filter_immutable,
            creation=creation
        )

        if creation:
            if not self.TAGS_ON_CREATE:
                resource.pop('Tags', None)
            elif self.TAGS_ON_CREATE == 'TagSpecifications':
                tags = boto3_tag_list_to_ansible_dict(resource.pop('Tags', []))
                tag_specs = boto3_tag_specifications(tags, types=[self.TAG_RESOURCE_TYPE])
                if tag_specs:
                    resource['TagSpecifications'] = tag_specs

        return resource

    def set_tags(self, tags, purge_tags):

        if tags is None:
            return False
        changed = False

        # Tags are returned as a part of the resource, but have to be updated
        # via dedicated tagging methods
        current_tags = self._get_tags()

        # So that diff works in check mode we need to know the full target state
        if purge_tags:
            desired_tags = deepcopy(tags)
        else:
            desired_tags = deepcopy(current_tags)
            desired_tags.update(tags)

        tags_to_add, tags_to_remove = compare_aws_tags(current_tags, tags, purge_tags)

        if tags_to_add:
            self._tagging_updates['add'] = tags_to_add
            changed = True
        if tags_to_remove:
            self._tagging_updates['remove'] = tags_to_remove
            changed = True

        if changed:
            # Tags are a stored as a list, but treated like a list, the
            # simplisic '==' in _set_resource_value doesn't do the comparison
            # properly
            return self._set_resource_value('Tags', ansible_dict_to_boto3_tag_list(desired_tags))

        return False

Anon7 - 2022
AnonSec Team