Server IP : 85.214.239.14 / Your IP : 18.221.191.25 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /usr/lib/python3/dist-packages/ansible_collections/amazon/aws/plugins/modules/ |
Upload File : |
#!/usr/bin/python # This file is part of Ansible # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) from __future__ import (absolute_import, division, print_function) __metaclass__ = type DOCUMENTATION = ''' --- module: iam_policy version_added: 5.0.0 short_description: Manage inline IAM policies for users, groups, and roles description: - Allows uploading or removing inline IAM policies for IAM users, groups or roles. - To administer managed policies please see M(community.aws.iam_user), M(community.aws.iam_role), M(community.aws.iam_group) and M(community.aws.iam_managed_policy) - This module was originally added to C(community.aws) in release 1.0.0. options: iam_type: description: - Type of IAM resource. required: true choices: [ "user", "group", "role"] type: str iam_name: description: - Name of IAM resource you wish to target for policy actions. In other words, the user name, group name or role name. required: true type: str policy_name: description: - The name label for the policy to create or remove. required: true type: str policy_json: description: - A properly json formatted policy as string. type: json state: description: - Whether to create or delete the IAM policy. choices: [ "present", "absent"] default: present type: str skip_duplicates: description: - When I(skip_duplicates=true) the module looks for any policies that match the document you pass in. If there is a match it will not make a new policy object with the same rules. default: false type: bool author: - "Jonathan I. Davila (@defionscode)" - "Dennis Podkovyrin (@sbj-ss)" extends_documentation_fragment: - amazon.aws.aws - amazon.aws.ec2 - amazon.aws.boto3 ''' EXAMPLES = ''' # Advanced example, create two new groups and add a READ-ONLY policy to both # groups. - name: Create Two Groups, Mario and Luigi community.aws.iam_group: name: "{{ item }}" state: present loop: - Mario - Luigi register: new_groups - name: Apply READ-ONLY policy to new groups that have been recently created amazon.aws.iam_policy: iam_type: group iam_name: "{{ item.iam_group.group.group_name }}" policy_name: "READ-ONLY" policy_json: "{{ lookup('template', 'readonly.json.j2') }}" state: present loop: "{{ new_groups.results }}" # Create a new S3 policy with prefix per user - name: Create S3 policy from template amazon.aws.iam_policy: iam_type: user iam_name: "{{ item.user }}" policy_name: "s3_limited_access_{{ item.prefix }}" state: present policy_json: "{{ lookup('template', 's3_policy.json.j2') }}" loop: - user: s3_user prefix: s3_user_prefix ''' RETURN = ''' policy_names: description: A list of names of the inline policies embedded in the specified IAM resource (user, group, or role). returned: always type: list elements: str ''' import json try: from botocore.exceptions import BotoCoreError, ClientError except ImportError: pass from ansible.module_utils.six import string_types from ansible_collections.amazon.aws.plugins.module_utils.core import AnsibleAWSModule from ansible_collections.amazon.aws.plugins.module_utils.ec2 import AWSRetry from ansible_collections.amazon.aws.plugins.module_utils.ec2 import compare_policies from ansible_collections.amazon.aws.plugins.module_utils.botocore import is_boto3_error_code class PolicyError(Exception): pass class Policy: def __init__(self, client, name, policy_name, policy_json, skip_duplicates, state, check_mode): self.client = client self.name = name self.policy_name = policy_name self.policy_json = policy_json self.skip_duplicates = skip_duplicates self.state = state self.check_mode = check_mode self.changed = False self.original_policies = self.get_all_policies().copy() self.updated_policies = {} @staticmethod def _iam_type(): return '' def _list(self, name): return {} def list(self): try: return self._list(self.name).get('PolicyNames', []) except is_boto3_error_code('AccessDenied'): return [] def _get(self, name, policy_name): return '{}' def get(self, policy_name): try: return self._get(self.name, policy_name)['PolicyDocument'] except is_boto3_error_code('AccessDenied'): return {} def _put(self, name, policy_name, policy_doc): pass def put(self, policy_doc): self.changed = True if self.check_mode: return self._put(self.name, self.policy_name, json.dumps(policy_doc, sort_keys=True)) def _delete(self, name, policy_name): pass def delete(self): self.updated_policies = self.original_policies.copy() if self.policy_name not in self.list(): self.changed = False return self.changed = True self.updated_policies.pop(self.policy_name, None) if self.check_mode: return self._delete(self.name, self.policy_name) def get_policy_text(self): try: if self.policy_json is not None: return self.get_policy_from_json() except json.JSONDecodeError as e: raise PolicyError('Failed to decode the policy as valid JSON: %s' % str(e)) return None def get_policy_from_json(self): if isinstance(self.policy_json, string_types): pdoc = json.loads(self.policy_json) else: pdoc = self.policy_json return pdoc def get_all_policies(self): policies = {} for pol in self.list(): policies[pol] = self.get(pol) return policies def create(self): matching_policies = [] policy_doc = self.get_policy_text() policy_match = False for pol in self.list(): if not compare_policies(self.original_policies[pol], policy_doc): matching_policies.append(pol) policy_match = True self.updated_policies = self.original_policies.copy() if self.policy_name in matching_policies: return if self.skip_duplicates and policy_match: return self.put(policy_doc) self.updated_policies[self.policy_name] = policy_doc def run(self): if self.state == 'present': self.create() elif self.state == 'absent': self.delete() return { 'changed': self.changed, self._iam_type() + '_name': self.name, 'policies': self.list(), 'policy_names': self.list(), 'diff': dict( before=self.original_policies, after=self.updated_policies, ), } class UserPolicy(Policy): @staticmethod def _iam_type(): return 'user' def _list(self, name): return self.client.list_user_policies(aws_retry=True, UserName=name) def _get(self, name, policy_name): return self.client.get_user_policy(aws_retry=True, UserName=name, PolicyName=policy_name) def _put(self, name, policy_name, policy_doc): return self.client.put_user_policy(aws_retry=True, UserName=name, PolicyName=policy_name, PolicyDocument=policy_doc) def _delete(self, name, policy_name): return self.client.delete_user_policy(aws_retry=True, UserName=name, PolicyName=policy_name) class RolePolicy(Policy): @staticmethod def _iam_type(): return 'role' def _list(self, name): return self.client.list_role_policies(aws_retry=True, RoleName=name) def _get(self, name, policy_name): return self.client.get_role_policy(aws_retry=True, RoleName=name, PolicyName=policy_name) def _put(self, name, policy_name, policy_doc): return self.client.put_role_policy(aws_retry=True, RoleName=name, PolicyName=policy_name, PolicyDocument=policy_doc) def _delete(self, name, policy_name): return self.client.delete_role_policy(aws_retry=True, RoleName=name, PolicyName=policy_name) class GroupPolicy(Policy): @staticmethod def _iam_type(): return 'group' def _list(self, name): return self.client.list_group_policies(aws_retry=True, GroupName=name) def _get(self, name, policy_name): return self.client.get_group_policy(aws_retry=True, GroupName=name, PolicyName=policy_name) def _put(self, name, policy_name, policy_doc): return self.client.put_group_policy(aws_retry=True, GroupName=name, PolicyName=policy_name, PolicyDocument=policy_doc) def _delete(self, name, policy_name): return self.client.delete_group_policy(aws_retry=True, GroupName=name, PolicyName=policy_name) def main(): argument_spec = dict( iam_type=dict(required=True, choices=['user', 'group', 'role']), state=dict(default='present', choices=['present', 'absent']), iam_name=dict(required=True), policy_name=dict(required=True), policy_json=dict(type='json', default=None, required=False), skip_duplicates=dict(type='bool', default=False, required=False) ) required_if = [ ('state', 'present', ('policy_json',), True), ] module = AnsibleAWSModule( argument_spec=argument_spec, required_if=required_if, supports_check_mode=True ) args = dict( client=module.client('iam', retry_decorator=AWSRetry.jittered_backoff()), name=module.params.get('iam_name'), policy_name=module.params.get('policy_name'), policy_json=module.params.get('policy_json'), skip_duplicates=module.params.get('skip_duplicates'), state=module.params.get('state'), check_mode=module.check_mode, ) iam_type = module.params.get('iam_type') try: if iam_type == 'user': policy = UserPolicy(**args) elif iam_type == 'role': policy = RolePolicy(**args) elif iam_type == 'group': policy = GroupPolicy(**args) module.deprecate("The 'policies' return key is deprecated and will be replaced by 'policy_names'. Both values are returned for now.", date='2024-08-01', collection_name='amazon.aws') module.exit_json(**(policy.run())) except (BotoCoreError, ClientError) as e: module.fail_json_aws(e) except PolicyError as e: module.fail_json(msg=str(e)) if __name__ == '__main__': main()