Server IP : 85.214.239.14 / Your IP : 3.141.197.1 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /lib/python3/dist-packages/ansible_collections/community/aws/plugins/modules/ |
Upload File : |
#!/usr/bin/python
# Copyright: Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

from __future__ import absolute_import, division, print_function
__metaclass__ = type


DOCUMENTATION = r'''
module: ec2_vpc_nacl
short_description: create and delete Network ACLs
version_added: 1.0.0
description:
  - Read the AWS documentation for Network ACLS
    U(https://docs.aws.amazon.com/AmazonVPC/latest/UserGuide/VPC_ACLs.html)
options:
  name:
    description:
      - Tagged name identifying a network ACL.
      - One and only one of the I(name) or I(nacl_id) is required.
    required: false
    type: str
  nacl_id:
    description:
      - NACL id identifying a network ACL.
      - One and only one of the I(name) or I(nacl_id) is required.
    required: false
    type: str
  vpc_id:
    description:
      - VPC id of the requesting VPC.
      - Required when state present.
    required: false
    type: str
  subnets:
    description:
      - The list of subnets that should be associated with the network ACL.
      - Must be specified as a list
      - Each subnet can be specified as subnet ID, or its tagged name.
    required: false
    type: list
    elements: str
    default: []
  egress:
    description:
      - A list of rules for outgoing traffic. Each rule must be specified as a list.
        Each rule may contain the rule number (integer 1-32766), protocol (one of ['tcp', 'udp', 'icmp', 'ipv6-icmp', '-1', 'all']),
        the rule action ('allow' or 'deny') the CIDR of the IPv4 or IPv6 network range to allow or deny,
        the ICMP type (-1 means all types), the ICMP code (-1 means all codes), the first port in the range for
        TCP or UDP protocols, and the last port in the range for TCP or UDP protocols.
        See examples.
    default: []
    required: false
    type: list
    elements: list
  ingress:
    description:
      - List of rules for incoming traffic. Each rule must be specified as a list.
        Each rule may contain the rule number (integer 1-32766), protocol (one of ['tcp', 'udp', 'icmp', 'ipv6-icmp', '-1', 'all']),
        the rule action ('allow' or 'deny') the CIDR of the IPv4 or IPv6 network range to allow or deny,
        the ICMP type (-1 means all types), the ICMP code (-1 means all codes), the first port in the range for
        TCP or UDP protocols, and the last port in the range for TCP or UDP protocols.
        See examples.
    default: []
    required: false
    type: list
    elements: list
  state:
    description:
      - Creates or modifies an existing NACL
      - Deletes a NACL and reassociates subnets to the default NACL
    required: false
    type: str
    choices: ['present', 'absent']
    default: present
author: Mike Mochan (@mmochan)
extends_documentation_fragment:
  - amazon.aws.aws
  - amazon.aws.ec2
  - amazon.aws.boto3
  - amazon.aws.tags
notes:
  - Support for I(purge_tags) was added in release 4.0.0.
'''

EXAMPLES = r'''

# Complete example to create and delete a network ACL
# that allows SSH, HTTP and ICMP in, and all traffic out.
- name: "Create and associate production DMZ network ACL with DMZ subnets"
  community.aws.ec2_vpc_nacl:
    vpc_id: vpc-12345678
    name: prod-dmz-nacl
    region: ap-southeast-2
    subnets: ['prod-dmz-1', 'prod-dmz-2']
    tags:
      CostCode: CC1234
      Project: phoenix
      Description: production DMZ
    ingress:
        # rule no, protocol, allow/deny, cidr, icmp_type, icmp_code,
        #                                             port from, port to
        - [100, 'tcp', 'allow', '0.0.0.0/0', null, null, 22, 22]
        - [200, 'tcp', 'allow', '0.0.0.0/0', null, null, 80, 80]
        - [205, 'tcp', 'allow', '::/0', null, null, 80, 80]
        - [300, 'icmp', 'allow', '0.0.0.0/0', 0, 8]
        - [305, 'ipv6-icmp', 'allow', '::/0', 0, 8]
    egress:
        - [100, 'all', 'allow', '0.0.0.0/0', null, null, null, null]
        - [105, 'all', 'allow', '::/0', null, null, null, null]
    state: 'present'

- name: "Remove the ingress and egress rules - defaults to deny all"
  community.aws.ec2_vpc_nacl:
    vpc_id: vpc-12345678
    name: prod-dmz-nacl
    region: ap-southeast-2
    subnets:
      - prod-dmz-1
      - prod-dmz-2
    tags:
      CostCode: CC1234
      Project: phoenix
      Description: production DMZ
    state: present

- name: "Remove the NACL subnet associations and tags"
  community.aws.ec2_vpc_nacl:
    vpc_id: 'vpc-12345678'
    name: prod-dmz-nacl
    region: ap-southeast-2
    state: present

- name: "Delete nacl and subnet associations"
  community.aws.ec2_vpc_nacl:
    vpc_id: vpc-12345678
    name: prod-dmz-nacl
    state: absent

- name: "Delete nacl by its id"
  community.aws.ec2_vpc_nacl:
    nacl_id: acl-33b4ee5b
    state: absent
'''

RETURN = r'''
task:
  description: The result of the create, or delete action.
  returned: success
  type: dict
nacl_id:
  description: The id of the NACL (when creating or updating an ACL)
  returned: success
  type: str
  sample: acl-123456789abcdef01
'''

try:
    import botocore
except ImportError:
    pass  # Handled by AnsibleAWSModule

from ansible_collections.amazon.aws.plugins.module_utils.core import AnsibleAWSModule
from ansible_collections.amazon.aws.plugins.module_utils.ec2 import AWSRetry
from ansible_collections.amazon.aws.plugins.module_utils.ec2 import ensure_ec2_tags
from ansible_collections.amazon.aws.plugins.module_utils.tagging import boto3_tag_specifications

# VPC-supported IANA protocol numbers
# http://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml
PROTOCOL_NUMBERS = {'all': -1, 'icmp': 1, 'tcp': 6, 'udp': 17, 'ipv6-icmp': 58}


# Utility methods
def icmp_present(entry):
    """Tell whether a rule entry should be sent with an ICMP type/code pair.

    :param entry: one rule list as supplied in ``ingress``/``egress``;
        ``entry[1]`` is the protocol.
    :returns: ``True`` when the entry has exactly six elements and names the
        protocol ``'icmp'`` or ``'ipv6-icmp'``, or when the protocol is given
        as the number ``1`` or ``58`` (any entry length); ``False`` otherwise.
    """
    protocol = entry[1]
    # Parenthesised to make the original and/or precedence explicit, and the
    # result is always a real bool rather than an implicit None.
    return (len(entry) == 6 and protocol in ('icmp', 'ipv6-icmp')) or protocol in (1, 58)


def subnets_removed(nacl_id, subnets, client, module):
    """List subnets currently associated with the NACL but no longer wanted.

    :param nacl_id: ID of the NACL whose associations are looked up.
    :param subnets: subnet IDs that should remain associated.
    :param client: EC2 client passed through to ``find_acl_by_id``.
    :param module: Ansible module passed through to ``find_acl_by_id``.
    :returns: list of associated subnet IDs that are not in ``subnets``.
    """
    results = find_acl_by_id(nacl_id, client, module)
    associations = results['NetworkAcls'][0]['Associations']
    wanted = set(subnets)
    return [assoc['SubnetId'] for assoc in associations if assoc['SubnetId'] not in wanted]


def subnets_added(nacl_id, subnets, client, module):
    """List wanted subnets that are not yet associated with the NACL.

    :param nacl_id: ID of the NACL whose associations are looked up.
    :param subnets: subnet IDs that should be associated.
    :param client: EC2 client passed through to ``find_acl_by_id``.
    :param module: Ansible module passed through to ``find_acl_by_id``.
    :returns: list of IDs from ``subnets`` missing from the current associations.
    """
    results = find_acl_by_id(nacl_id, client, module)
    associations = results['NetworkAcls'][0]['Associations']
    associated = set(assoc['SubnetId'] for assoc in associations)
    return [subnet for subnet in subnets if subnet not in associated]
def _require_default_nacl_id(vpc_id, client, module):
    """Return the first default NACL ID of the VPC, failing the module if none is found."""
    default_ids = find_default_vpc_nacl(vpc_id, client, module)
    if not default_ids:
        module.fail_json(msg="Default NACL ID not found for VPC %s - Check the VPC ID" % vpc_id)
    return default_ids[0]


def subnets_changed(nacl, client, module):
    """Bring the NACL's subnet associations in line with the ``subnets`` parameter.

    When no subnets are requested, every subnet currently attached to the NACL
    is handed back to the VPC's default NACL.  Otherwise missing subnets are
    attached to this NACL and surplus ones are moved to the default NACL.

    :param nacl: ``describe_network_acls``-style response for the existing NACL.
    :param client: EC2 client.
    :param module: Ansible module (reads ``vpc_id`` and ``subnets``).
    :returns: ``True`` if any association was replaced, else ``False``.
    """
    vpc_id = module.params.get('vpc_id')
    nacl_id = nacl['NetworkAcls'][0]['NetworkAclId']
    subnets = subnets_to_associate(nacl, client, module)

    if not subnets:
        attached = find_subnet_ids_by_nacl_id(nacl_id, client, module)
        if not attached:
            return False
        # Only look up the default NACL once we know something must move.
        default_nacl_id = _require_default_nacl_id(vpc_id, client, module)
        replace_network_acl_association(default_nacl_id, attached, client, module)
        return True

    changed = False
    subs_added = subnets_added(nacl_id, subnets, client, module)
    if subs_added:
        replace_network_acl_association(nacl_id, subs_added, client, module)
        changed = True

    subs_removed = subnets_removed(nacl_id, subnets, client, module)
    if subs_removed:
        default_nacl_id = _require_default_nacl_id(vpc_id, client, module)
        replace_network_acl_association(default_nacl_id, subs_removed, client, module)
        changed = True
    return changed


def nacls_changed(nacl, client, module):
    """Reconcile the NACL's ingress and egress entries with the module parameters.

    :param nacl: ``describe_network_acls``-style response; only the NACL ID is
        taken from it, the entries are re-read via ``describe_network_acl``.
    :param client: EC2 client.
    :param module: Ansible module (reads ``egress`` and ``ingress``).
    :returns: ``True`` if any entry was created or deleted, else ``False``.
    """
    nacl_id = nacl['NetworkAcls'][0]['NetworkAclId']
    current = describe_network_acl(client, module)
    entries = current['NetworkAcls'][0]['Entries']
    # Rule numbers >= 32767 are skipped (presumably AWS's built-in default
    # rule, which is outside the documented 1-32766 user range).
    egress = [rule for rule in entries if rule['Egress'] is True and rule['RuleNumber'] < 32767]
    ingress = [rule for rule in entries if rule['Egress'] is False and rule['RuleNumber'] < 32767]

    # Both directions must always be processed, so evaluate before combining.
    egress_changed = rules_changed(egress, module.params.get('egress'), True, nacl_id, client, module)
    ingress_changed = rules_changed(ingress, module.params.get('ingress'), False, nacl_id, client, module)
    return bool(egress_changed or ingress_changed)


def tags_changed(nacl_id, client, module):
    """Apply the ``name``/``tags`` parameters to the NACL via ``ensure_ec2_tags``.

    :param nacl_id: ID of the NACL to tag.
    :param client: EC2 client.
    :param module: Ansible module (reads ``tags``, ``name`` and ``purge_tags``).
    :returns: ``False`` when neither ``name`` nor ``tags`` was given, otherwise
        whatever ``ensure_ec2_tags`` returns.
    """
    tags = module.params.get('tags')
    name = module.params.get('name')
    purge_tags = module.params.get('purge_tags')

    if name is None and tags is None:
        return False

    if tags is None:
        # Only purge tags if tags is explicitly set to {} and purge_tags is True
        purge_tags = False

    new_tags = dict()
    if name is not None:
        new_tags['Name'] = name
    new_tags.update(tags or {})

    return ensure_ec2_tags(client, module, nacl_id, tags=new_tags,
                           purge_tags=purge_tags, retry_codes=['InvalidNetworkAclID.NotFound'])


def rules_changed(aws_rules, param_rules, Egress, nacl_id, client, module):
    """Delete and create NACL entries so one direction matches the requested rules.

    :param aws_rules: entries currently on the NACL for this direction.
    :param param_rules: rule lists from the module parameters.
    :param Egress: ``True`` for egress rules, ``False`` for ingress rules.
    :param nacl_id: ID of the NACL being modified.
    :param client: EC2 client.
    :param module: Ansible module.
    :returns: ``True`` if at least one entry was deleted or created.
    """
    rules = [process_rule_entry(entry, Egress) for entry in param_rules]
    if rules == aws_rules:
        return False

    changed = False
    # Stale entries are removed first so rule numbers are free for re-use.
    for rule in [x for x in aws_rules if x not in rules]:
        params = dict(NetworkAclId=nacl_id, RuleNumber=rule['RuleNumber'], Egress=Egress)
        delete_network_acl_entry(params, client, module)
        changed = True

    for rule in [x for x in rules if x not in aws_rules]:
        rule['NetworkAclId'] = nacl_id
        create_network_acl_entry(rule, client, module)
        changed = True
    return changed


def is_ipv6(cidr):
    """Return ``True`` when ``cidr`` contains a colon, i.e. is treated as IPv6."""
    return ':' in cidr


def process_rule_entry(entry, Egress):
    """Translate one rule list into keyword arguments for the NACL entry APIs.

    Expected layout of ``entry``: rule number, protocol, rule action, CIDR,
    ICMP type, ICMP code, port from, port to.  Trailing elements may be
    omitted; they are treated as ``None``.

    :param entry: rule list from the ``ingress``/``egress`` parameter.
    :param Egress: ``True`` for egress rules, ``False`` for ingress rules.
    :returns: dict with ``RuleNumber``, ``Protocol``, ``RuleAction``,
        ``Egress``, one of ``CidrBlock``/``Ipv6CidrBlock`` and, when
        applicable, ``IcmpTypeCode`` or ``PortRange``.
    """
    # Work on a padded private copy so short rules (e.g. only four elements)
    # do not raise IndexError and the caller's list is never modified.
    fields = list(entry) + [None] * (8 - len(entry))

    params = dict()
    params['RuleNumber'] = fields[0]
    # Protocols outside the name table (the documented '-1', or a protocol
    # number) are passed through unchanged instead of raising KeyError.
    params['Protocol'] = str(PROTOCOL_NUMBERS.get(fields[1], fields[1]))
    params['RuleAction'] = fields[2]
    params['Egress'] = Egress
    if is_ipv6(fields[3]):
        params['Ipv6CidrBlock'] = fields[3]
    else:
        params['CidrBlock'] = fields[3]

    # icmp_present() inspects the length, so it must see the original entry.
    if icmp_present(entry):
        params['IcmpTypeCode'] = {"Type": int(fields[4]), "Code": int(fields[5])}
    elif fields[6] or fields[7]:
        params['PortRange'] = {"From": fields[6], 'To': fields[7]}

    return params


def restore_default_associations(assoc_ids, default_nacl_id, client, module):
    """Re-point the given associations at the default NACL.

    :param assoc_ids: NACL association IDs to move.
    :param default_nacl_id: list whose first element is the default NACL ID.
    :param client: EC2 client.
    :param module: Ansible module.
    :returns: ``True`` when there were associations to move, else ``False``.
    """
    if not assoc_ids:
        return False
    for assoc_id in assoc_ids:
        params = dict(NetworkAclId=default_nacl_id[0], AssociationId=assoc_id)
        restore_default_acl_association(params, client, module)
    return True


def construct_acl_entries(nacl, client, module):
    """Create every requested ingress and egress entry on a newly created NACL.

    :param nacl: ``create_network_acl``-style response (``nacl['NetworkAcl']``).
    :param client: EC2 client.
    :param module: Ansible module (reads ``ingress`` and ``egress``).
    """
    nacl_id = nacl['NetworkAcl']['NetworkAclId']
    for entry in module.params.get('ingress'):
        params = process_rule_entry(entry, Egress=False)
        params['NetworkAclId'] = nacl_id
        create_network_acl_entry(params, client, module)
    for rule in module.params.get('egress'):
        params = process_rule_entry(rule, Egress=True)
        params['NetworkAclId'] = nacl_id
        create_network_acl_entry(params, client, module)


# Module invocations
def setup_network_acl(client, module):
    """Handle ``state=present``: create the NACL or update the existing one.

    :param client: EC2 client.
    :param module: Ansible module.
    :returns: tuple ``(changed, nacl_id)``.
    """
    nacl = describe_network_acl(client, module)
    if not nacl['NetworkAcls']:
        tags = {}
        if module.params.get('name'):
            tags['Name'] = module.params.get('name')
        tags.update(module.params.get('tags') or {})
        nacl = create_network_acl(module.params.get('vpc_id'), client, module, tags)
        nacl_id = nacl['NetworkAcl']['NetworkAclId']
        subnets = subnets_to_associate(nacl, client, module)
        replace_network_acl_association(nacl_id, subnets, client, module)
        construct_acl_entries(nacl, client, module)
        return True, nacl_id

    changed = False
    nacl_id = nacl['NetworkAcls'][0]['NetworkAclId']
    changed |= subnets_changed(nacl, client, module)
    changed |= nacls_changed(nacl, client, module)
    changed |= tags_changed(nacl_id, client, module)
    return changed, nacl_id


def remove_network_acl(client, module):
    """Handle ``state=absent``: move associations to the default NACL and delete.

    :param client: EC2 client.
    :param module: Ansible module.
    :returns: tuple ``(changed, result)`` where ``result`` maps the NACL ID to
        ``"Successfully deleted"``, maps the VPC ID to an error note when no
        default NACL is found, or is empty when the NACL does not exist.
    """
    changed = False
    result = dict()
    nacl = describe_network_acl(client, module)
    if not nacl['NetworkAcls']:
        return changed, result

    found = nacl['NetworkAcls'][0]
    nacl_id = found['NetworkAclId']
    vpc_id = found['VpcId']
    assoc_ids = [a['NetworkAclAssociationId'] for a in found['Associations']]

    default_nacl_id = find_default_vpc_nacl(vpc_id, client, module)
    if not default_nacl_id:
        result = {vpc_id: "Default NACL ID not found - Check the VPC ID"}
        return changed, result

    # Associations (if any) must leave the NACL before it can be deleted;
    # with or without associations the NACL itself is then removed.
    restore_default_associations(assoc_ids, default_nacl_id, client, module)
    delete_network_acl(nacl_id, client, module)
    result[nacl_id] = "Successfully deleted"
    return True, result


# Boto3 client methods
@AWSRetry.jittered_backoff()
def _create_network_acl(client, *args, **kwargs):
    """Retrying wrapper around ``client.create_network_acl``."""
    return client.create_network_acl(*args, **kwargs)


def create_network_acl(vpc_id, client, module, tags):
    """Create a NACL in the VPC, tagging it at creation time when tags are given.

    In check mode nothing is created and a placeholder response with the ID
    ``nacl-00000000`` is returned.

    :returns: response dict containing ``NetworkAcl.NetworkAclId``.
    """
    params = dict(VpcId=vpc_id)
    if tags:
        params['TagSpecifications'] = boto3_tag_specifications(tags, ['network-acl'])
    try:
        if module.check_mode:
            nacl = dict(NetworkAcl=dict(NetworkAclId="nacl-00000000"))
        else:
            nacl = _create_network_acl(client, **params)
    except botocore.exceptions.ClientError as e:
        module.fail_json_aws(e)
    return nacl


@AWSRetry.jittered_backoff(catch_extra_error_codes=['InvalidNetworkAclID.NotFound'])
def _create_network_acl_entry(client, *args, **kwargs):
    """Retrying wrapper around ``client.create_network_acl_entry``."""
    return client.create_network_acl_entry(*args, **kwargs)


def create_network_acl_entry(params, client, module):
    """Create one NACL entry from ``params``; a no-op in check mode."""
    try:
        if not module.check_mode:
            _create_network_acl_entry(client, **params)
    except botocore.exceptions.ClientError as e:
        module.fail_json_aws(e)


@AWSRetry.jittered_backoff()
def _delete_network_acl(client, *args, **kwargs):
    """Retrying wrapper around ``client.delete_network_acl``."""
    return client.delete_network_acl(*args, **kwargs)


def delete_network_acl(nacl_id, client, module):
    """Delete the NACL with the given ID; a no-op in check mode."""
    try:
        if not module.check_mode:
            _delete_network_acl(client, NetworkAclId=nacl_id)
    except botocore.exceptions.ClientError as e:
        module.fail_json_aws(e)


@AWSRetry.jittered_backoff(catch_extra_error_codes=['InvalidNetworkAclID.NotFound'])
def _delete_network_acl_entry(client, *args, **kwargs):
    """Retrying wrapper around ``client.delete_network_acl_entry``."""
    return client.delete_network_acl_entry(*args, **kwargs)


def delete_network_acl_entry(params, client, module):
    """Delete one NACL entry described by ``params``; a no-op in check mode."""
    try:
        if not module.check_mode:
            _delete_network_acl_entry(client, **params)
    except botocore.exceptions.ClientError as e:
        module.fail_json_aws(e)


@AWSRetry.jittered_backoff()
def _describe_network_acls(client, **kwargs):
    """Retrying wrapper around ``client.describe_network_acls``."""
    return client.describe_network_acls(**kwargs)


@AWSRetry.jittered_backoff(catch_extra_error_codes=['InvalidNetworkAclID.NotFound'])
def _describe_network_acls_retry_missing(client, **kwargs):
    """Like ``_describe_network_acls`` but also retries on a not-yet-visible NACL ID."""
    return client.describe_network_acls(**kwargs)


def describe_acl_associations(subnets, client, module):
    """Return the NACL association IDs that currently bind the given subnets.

    :param subnets: list of subnet IDs.
    :returns: list of ``NetworkAclAssociationId`` values; empty when
        ``subnets`` is empty or nothing matches.
    """
    if not subnets:
        return []
    try:
        results = _describe_network_acls_retry_missing(client, Filters=[
            {'Name': 'association.subnet-id', 'Values': subnets}
        ])
    except botocore.exceptions.ClientError as e:
        module.fail_json_aws(e)
    # The subnets may be spread over several NACLs, so every returned NACL is
    # scanned; an empty result simply yields an empty list.
    return [
        assoc['NetworkAclAssociationId']
        for acl in results['NetworkAcls']
        for assoc in acl['Associations']
        if assoc['SubnetId'] in subnets
    ]


def describe_network_acl(client, module):
    """Look up the NACL by ``nacl_id`` when given, otherwise by its ``Name`` tag.

    :returns: the ``describe_network_acls`` response.
    """
    try:
        if module.params.get('nacl_id'):
            nacl = _describe_network_acls(client, Filters=[
                {'Name': 'network-acl-id', 'Values': [module.params.get('nacl_id')]}
            ])
        else:
            nacl = _describe_network_acls(client, Filters=[
                {'Name': 'tag:Name', 'Values': [module.params.get('name')]}
            ])
    except botocore.exceptions.ClientError as e:
        module.fail_json_aws(e)
    return nacl


def find_acl_by_id(nacl_id, client, module):
    """Describe a single NACL by ID, retrying while the ID is not yet found."""
    try:
        return _describe_network_acls_retry_missing(client, NetworkAclIds=[nacl_id])
    except botocore.exceptions.ClientError as e:
        module.fail_json_aws(e)


def find_default_vpc_nacl(vpc_id, client, module):
    """Return the IDs of the NACLs in the VPC that are flagged ``IsDefault``."""
    try:
        response = _describe_network_acls_retry_missing(client, Filters=[
            {'Name': 'vpc-id', 'Values': [vpc_id]}])
    except botocore.exceptions.ClientError as e:
        module.fail_json_aws(e)
    nacls = response['NetworkAcls']
    return [n['NetworkAclId'] for n in nacls if n['IsDefault'] is True]


def find_subnet_ids_by_nacl_id(nacl_id, client, module):
    """Return the subnet IDs associated with the given NACL (empty list if none)."""
    try:
        results = _describe_network_acls_retry_missing(client, Filters=[
            {'Name': 'association.network-acl-id', 'Values': [nacl_id]}
        ])
    except botocore.exceptions.ClientError as e:
        module.fail_json_aws(e)
    if not results['NetworkAcls']:
        return []
    associations = results['NetworkAcls'][0]['Associations']
    return [s['SubnetId'] for s in associations if s['SubnetId']]


@AWSRetry.jittered_backoff(catch_extra_error_codes=['InvalidNetworkAclID.NotFound'])
def _replace_network_acl_association(client, *args, **kwargs):
    """Retrying wrapper around ``client.replace_network_acl_association``."""
    return client.replace_network_acl_association(*args, **kwargs)


def replace_network_acl_association(nacl_id, subnets, client, module):
    """Associate each of the given subnets with ``nacl_id``; a no-op in check mode."""
    params = dict()
    params['NetworkAclId'] = nacl_id
    for association in describe_acl_associations(subnets, client, module):
        params['AssociationId'] = association
        try:
            if not module.check_mode:
                _replace_network_acl_association(client, **params)
        except botocore.exceptions.ClientError as e:
            module.fail_json_aws(e)


@AWSRetry.jittered_backoff(catch_extra_error_codes=['InvalidNetworkAclID.NotFound'])
def _replace_network_acl_entry(client, *args, **kwargs):
    """Retrying wrapper around ``client.replace_network_acl_entry``."""
    return client.replace_network_acl_entry(*args, **kwargs)


def replace_network_acl_entry(entries, Egress, nacl_id, client, module):
    """Replace each entry on the NACL; a no-op in check mode.

    ``Egress`` is accepted for signature compatibility but not used here; the
    direction is taken from each entry's own keys.
    """
    for entry in entries:
        # Copy so the caller's rule dicts are not modified.
        params = dict(entry)
        params['NetworkAclId'] = nacl_id
        try:
            if not module.check_mode:
                _replace_network_acl_entry(client, **params)
        except botocore.exceptions.ClientError as e:
            module.fail_json_aws(e)


def restore_default_acl_association(params, client, module):
    """Replace one association using ``params``; a no-op in check mode."""
    try:
        if not module.check_mode:
            _replace_network_acl_association(client, **params)
    except botocore.exceptions.ClientError as e:
        module.fail_json_aws(e)


@AWSRetry.jittered_backoff()
def _describe_subnets(client, *args, **kwargs):
    """Retrying wrapper around ``client.describe_subnets``."""
    return client.describe_subnets(*args, **kwargs)


def subnets_to_associate(nacl, client, module):
    """Resolve the ``subnets`` parameter (IDs or Name tags) to unique subnet IDs.

    :param nacl: unused; kept for signature compatibility.
    :returns: list of distinct subnet IDs, empty when no subnets were requested.
    """
    params = list(module.params.get('subnets'))
    if not params:
        return []
    all_found = []
    if any(x.startswith("subnet-") for x in params):
        try:
            subnets = _describe_subnets(client, Filters=[
                {'Name': 'subnet-id', 'Values': params}])
            all_found.extend(subnets.get('Subnets', []))
        except botocore.exceptions.ClientError as e:
            module.fail_json_aws(e)
    # Anything not resolved as an ID is looked up by its Name tag.
    if len(params) != len(all_found):
        try:
            subnets = _describe_subnets(client, Filters=[
                {'Name': 'tag:Name', 'Values': params}])
            all_found.extend(subnets.get('Subnets', []))
        except botocore.exceptions.ClientError as e:
            module.fail_json_aws(e)
    return list(set(s['SubnetId'] for s in all_found if s.get('SubnetId')))


def main():
    """Module entry point: parse arguments and dispatch on ``state``."""
    argument_spec = dict(
        vpc_id=dict(),
        name=dict(),
        nacl_id=dict(),
        subnets=dict(required=False, type='list', default=list(), elements='str'),
        tags=dict(required=False, type='dict', aliases=['resource_tags']),
        purge_tags=dict(required=False, type='bool', default=True),
        ingress=dict(required=False, type='list', default=list(), elements='list'),
        egress=dict(required=False, type='list', default=list(), elements='list'),
        state=dict(default='present', choices=['present', 'absent']),
    )
    module = AnsibleAWSModule(argument_spec=argument_spec,
                              supports_check_mode=True,
                              required_one_of=[['name', 'nacl_id']],
                              required_if=[['state', 'present', ['vpc_id']]])

    state = module.params.get('state').lower()

    client = module.client('ec2')

    invocations = {
        "present": setup_network_acl,
        "absent": remove_network_acl
    }
    (changed, results) = invocations[state](client, module)
    module.exit_json(changed=changed, nacl_id=results)


if __name__ == '__main__':
    main()