Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 18.190.176.94
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /lib/python3/dist-packages/ansible_collections/community/aws/plugins/modules/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /lib/python3/dist-packages/ansible_collections/community/aws/plugins/modules/wafv2_ip_set.py
#!/usr/bin/python
# Copyright: Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type


DOCUMENTATION = '''
---
module: wafv2_ip_set
version_added: 1.5.0
author:
  - "Markus Bergholz (@markuman)"
short_description: wafv2_ip_set
description:
  - Create, modify and delete IP sets for WAFv2.
options:
    state:
      description:
        - Whether the rule is present or absent.
      choices: ["present", "absent"]
      required: true
      type: str
    name:
      description:
        - The name of the IP set.
      required: true
      type: str
    description:
      description:
        - Description of the IP set.
      required: false
      type: str
    scope:
      description:
        - Specifies whether this is for an AWS CloudFront distribution or for a regional application,
          such as API Gateway or Application LoadBalancer.
      choices: ["CLOUDFRONT","REGIONAL"]
      required: true
      type: str
    ip_address_version:
      description:
        - Specifies whether this is an IPv4 or an IPv6 IP set.
        - Required when I(state=present).
      choices: ["IPV4","IPV6"]
      type: str
    addresses:
      description:
        - Contains an array of strings that specify one or more IP addresses or blocks of IP addresses in
          Classless Inter-Domain Routing (CIDR) notation.
        - Required when I(state=present).
        - When I(state=absent) and I(addresses) is defined, only the given IP addresses will be removed
          from the IP set. The entire IP set itself will stay present.
      type: list
      elements: str
    purge_addresses:
      description:
        - When set to C(no), keep the existing addresses in place. Will modify and add, but will not delete.
      default: true
      type: bool

notes:
  - Support for I(purge_tags) was added in release 4.0.0.

extends_documentation_fragment:
  - amazon.aws.aws
  - amazon.aws.ec2
  - amazon.aws.boto3
  - amazon.aws.tags

'''

EXAMPLES = '''
- name: test ip set
  wafv2_ip_set:
    name: test02
    state: present
    description: hallo eins
    scope: REGIONAL
    ip_address_version: IPV4
    addresses:
      - 8.8.8.8/32
      - 8.8.4.4/32
    tags:
      A: B
      C: D
'''

RETURN = """
addresses:
  description: Current addresses of the ip set
  sample:
    - 8.8.8.8/32
    - 8.8.4.4/32
  returned: Always, as long as the ip set exists
  type: list
arn:
  description: IP set arn
  sample: "arn:aws:wafv2:eu-central-1:11111111:regional/ipset/test02/4b007330-2934-4dc5-af24-82dcb3aeb127"
  type: str
  returned: Always, as long as the ip set exists
description:
  description: Description of the ip set
  sample: Some IP set description
  returned: Always, as long as the ip set exists
  type: str
ip_address_version:
  description: IP version of the ip set
  sample: IPV4
  type: str
  returned: Always, as long as the ip set exists
name:
  description: IP set name
  sample: test02
  returned: Always, as long as the ip set exists
  type: str
"""

try:
    from botocore.exceptions import ClientError, BotoCoreError
except ImportError:
    pass  # caught by AnsibleAWSModule

from ansible_collections.amazon.aws.plugins.module_utils.core import AnsibleAWSModule
from ansible_collections.amazon.aws.plugins.module_utils.ec2 import ansible_dict_to_boto3_tag_list
from ansible_collections.amazon.aws.plugins.module_utils.ec2 import camel_dict_to_snake_dict
from ansible_collections.community.aws.plugins.module_utils.wafv2 import describe_wafv2_tags
from ansible_collections.community.aws.plugins.module_utils.wafv2 import ensure_wafv2_tags


class IpSet:
    def __init__(self, wafv2, name, scope, fail_json_aws):
        self.wafv2 = wafv2
        self.name = name
        self.scope = scope
        self.fail_json_aws = fail_json_aws
        self.existing_set, self.id, self.locktoken, self.arn = self.get_set()

    def description(self):
        return self.existing_set.get('Description')

    def _format_set(self, ip_set):
        if ip_set is None:
            return None
        return camel_dict_to_snake_dict(self.existing_set, ignore_list=['tags'])

    def get(self):
        return self._format_set(self.existing_set)

    def remove(self):
        try:
            response = self.wafv2.delete_ip_set(
                Name=self.name,
                Scope=self.scope,
                Id=self.id,
                LockToken=self.locktoken
            )
        except (BotoCoreError, ClientError) as e:
            self.fail_json_aws(e, msg="Failed to remove wafv2 ip set.")
        return {}

    def create(self, description, ip_address_version, addresses, tags):
        req_obj = {
            'Name': self.name,
            'Scope': self.scope,
            'IPAddressVersion': ip_address_version,
            'Addresses': addresses,
        }

        if description:
            req_obj['Description'] = description

        if tags:
            req_obj['Tags'] = ansible_dict_to_boto3_tag_list(tags)

        try:
            response = self.wafv2.create_ip_set(**req_obj)
        except (BotoCoreError, ClientError) as e:
            self.fail_json_aws(e, msg="Failed to create wafv2 ip set.")

        self.existing_set, self.id, self.locktoken, self.arn = self.get_set()
        return self._format_set(self.existing_set)

    def update(self, description, addresses):
        req_obj = {
            'Name': self.name,
            'Scope': self.scope,
            'Id': self.id,
            'Addresses': addresses,
            'LockToken': self.locktoken
        }

        if description:
            req_obj['Description'] = description

        try:
            response = self.wafv2.update_ip_set(**req_obj)
        except (BotoCoreError, ClientError) as e:
            self.fail_json_aws(e, msg="Failed to update wafv2 ip set.")

        self.existing_set, self.id, self.locktoken, self.arn = self.get_set()
        return self._format_set(self.existing_set)

    def get_set(self):
        response = self.list()
        existing_set = None
        id = None
        arn = None
        locktoken = None
        for item in response.get('IPSets'):
            if item.get('Name') == self.name:
                id = item.get('Id')
                locktoken = item.get('LockToken')
                arn = item.get('ARN')
        if id:
            try:
                existing_set = self.wafv2.get_ip_set(
                    Name=self.name,
                    Scope=self.scope,
                    Id=id
                ).get('IPSet')
            except (BotoCoreError, ClientError) as e:
                self.fail_json_aws(e, msg="Failed to get wafv2 ip set.")
            tags = describe_wafv2_tags(self.wafv2, arn, self.fail_json_aws)
            existing_set['tags'] = tags

        return existing_set, id, locktoken, arn

    def list(self, Nextmarker=None):
        # there is currently no paginator for wafv2
        req_obj = {
            'Scope': self.scope,
            'Limit': 100
        }
        if Nextmarker:
            req_obj['NextMarker'] = Nextmarker

        try:
            response = self.wafv2.list_ip_sets(**req_obj)
            if response.get('NextMarker'):
                response['IPSets'] += self.list(Nextmarker=response.get('NextMarker')).get('IPSets')
        except (BotoCoreError, ClientError) as e:
            self.fail_json_aws(e, msg="Failed to list wafv2 ip set.")

        return response


def compare(existing_set, addresses, purge_addresses, state):
    diff = False
    new_rules = []
    existing_rules = existing_set.get('addresses')
    if state == 'present':
        if purge_addresses:
            new_rules = addresses
            if sorted(addresses) != sorted(existing_set.get('addresses')):
                diff = True

        else:
            for requested_rule in addresses:
                if requested_rule not in existing_rules:
                    diff = True
                    new_rules.append(requested_rule)

            new_rules += existing_rules
    else:
        if purge_addresses and addresses:
            for requested_rule in addresses:
                if requested_rule in existing_rules:
                    diff = True
                    existing_rules.pop(existing_rules.index(requested_rule))
            new_rules = existing_rules

    return diff, new_rules


def main():

    arg_spec = dict(
        state=dict(type='str', required=True, choices=['present', 'absent']),
        name=dict(type='str', required=True),
        scope=dict(type='str', required=True, choices=['CLOUDFRONT', 'REGIONAL']),
        description=dict(type='str'),
        ip_address_version=dict(type='str', choices=['IPV4', 'IPV6']),
        addresses=dict(type='list', elements='str'),
        tags=dict(type='dict', aliases=['resource_tags']),
        purge_tags=dict(type='bool', default=True),
        purge_addresses=dict(type='bool', default=True),
    )

    module = AnsibleAWSModule(
        argument_spec=arg_spec,
        supports_check_mode=True,
        required_if=[['state', 'present', ['ip_address_version', 'addresses']]]
    )

    state = module.params.get("state")
    name = module.params.get("name")
    scope = module.params.get("scope")
    description = module.params.get("description")
    ip_address_version = module.params.get("ip_address_version")
    addresses = module.params.get("addresses")
    tags = module.params.get("tags")
    purge_tags = module.params.get("purge_tags")
    purge_addresses = module.params.get("purge_addresses")
    check_mode = module.check_mode

    wafv2 = module.client('wafv2')

    change = False
    retval = {}

    ip_set = IpSet(wafv2, name, scope, module.fail_json_aws)

    if state == 'present':

        if ip_set.get():
            tags_updated = ensure_wafv2_tags(wafv2, ip_set.arn, tags, purge_tags, module.fail_json_aws, module.check_mode)
            ips_updated, addresses = compare(ip_set.get(), addresses, purge_addresses, state)
            description_updated = bool(description) and ip_set.description() != description
            change = ips_updated or description_updated or tags_updated
            retval = ip_set.get()
            if module.check_mode:
                pass
            elif ips_updated or description_updated:
                retval = ip_set.update(
                    description=description,
                    addresses=addresses
                )
            elif tags_updated:
                retval, id, locktoken, arn = ip_set.get_set()
        else:
            if not check_mode:
                retval = ip_set.create(
                    description=description,
                    ip_address_version=ip_address_version,
                    addresses=addresses,
                    tags=tags
                )
            change = True

    if state == 'absent':
        if ip_set.get():
            if addresses:
                if len(addresses) > 0:
                    change, addresses = compare(ip_set.get(), addresses, purge_addresses, state)
                    if change and not check_mode:
                        retval = ip_set.update(
                            description=description,
                            addresses=addresses
                        )
            else:
                if not check_mode:
                    retval = ip_set.remove()
                change = True

    module.exit_json(changed=change, **retval)


if __name__ == '__main__':
    main()

Anon7 - 2022
AnonSec Team