Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 18.221.136.116
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /lib/python3/dist-packages/ansible_collections/community/aws/plugins/modules/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /lib/python3/dist-packages/ansible_collections/community/aws/plugins/modules/ecs_ecr.py
#!/usr/bin/python
# -*- coding: utf-8 -*

# Copyright: Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

from __future__ import absolute_import, division, print_function

__metaclass__ = type


DOCUMENTATION = '''
---
module: ecs_ecr
version_added: 1.0.0
short_description: Manage Elastic Container Registry repositories
description:
    - Manage Elastic Container Registry repositories.
options:
    name:
        description:
            - The name of the repository.
        required: true
        type: str
    registry_id:
        description:
            - AWS account id associated with the registry.
            - If not specified, the default registry is assumed.
        required: false
        type: str
    policy:
        description:
            - JSON or dict that represents the new policy.
        required: false
        type: json
    force_absent:
        description:
            - If I(force_absent=true), the repository will be removed, even if images are present.
        required: false
        default: false
        type: bool
        version_added: 4.1.0
    force_set_policy:
        description:
            - If I(force_set_policy=false), it prevents setting a policy that would prevent you from
              setting another policy in the future.
        required: false
        default: false
        type: bool
    purge_policy:
        description:
            - If C(true), remove the policy from the repository.
            - Defaults to C(false).
        required: false
        type: bool
    image_tag_mutability:
        description:
            - Configure whether repository should be mutable (ie. an already existing tag can be overwritten) or not.
        required: false
        choices: [mutable, immutable]
        default: 'mutable'
        type: str
    lifecycle_policy:
        description:
            - JSON or dict that represents the new lifecycle policy.
        required: false
        type: json
    purge_lifecycle_policy:
        description:
            - if C(true), remove the lifecycle policy from the repository.
            - Defaults to C(false).
        required: false
        type: bool
    state:
        description:
            - Create or destroy the repository.
        required: false
        choices: [present, absent]
        default: 'present'
        type: str
    scan_on_push:
        description:
            - if C(true), images are scanned for known vulnerabilities after being pushed to the repository.
        required: false
        default: false
        type: bool
        version_added: 1.3.0
    encryption_configuration:
        description:
            - The encryption configuration for the repository.
        required: false
        suboptions:
            encryption_type:
                description:
                    - The encryption type to use.
                choices: [AES256, KMS]
                default: 'AES256'
                type: str
            kms_key:
                description:
                    - If I(encryption_type=KMS), specify the KMS key to use for encryption.
                    - The alias, key ID, or full ARN of the KMS key can be specified.
                type: str
        type: dict
        version_added: 5.2.0
author:
 - David M. Lee (@leedm777)
extends_documentation_fragment:
- amazon.aws.aws
- amazon.aws.ec2
- amazon.aws.boto3

'''

EXAMPLES = '''
# If the repository does not exist, it is created. If it does exist, would not
# affect any policies already on it.
- name: ecr-repo
  community.aws.ecs_ecr:
    name: super/cool

- name: destroy-ecr-repo
  community.aws.ecs_ecr:
    name: old/busted
    state: absent

- name: Cross account ecr-repo
  community.aws.ecs_ecr:
    registry_id: 123456789012
    name: cross/account

- name: set-policy as object
  community.aws.ecs_ecr:
    name: needs-policy-object
    policy:
      Version: '2008-10-17'
      Statement:
        - Sid: read-only
          Effect: Allow
          Principal:
            AWS: '{{ read_only_arn }}'
          Action:
            - ecr:GetDownloadUrlForLayer
            - ecr:BatchGetImage
            - ecr:BatchCheckLayerAvailability

- name: set-policy as string
  community.aws.ecs_ecr:
    name: needs-policy-string
    policy: "{{ lookup('template', 'policy.json.j2') }}"

- name: delete-policy
  community.aws.ecs_ecr:
    name: needs-no-policy
    purge_policy: true

- name: create immutable ecr-repo
  community.aws.ecs_ecr:
    name: super/cool
    image_tag_mutability: immutable

- name: set-lifecycle-policy
  community.aws.ecs_ecr:
    name: needs-lifecycle-policy
    scan_on_push: true
    lifecycle_policy:
      rules:
        - rulePriority: 1
          description: new policy
          selection:
            tagStatus: untagged
            countType: sinceImagePushed
            countUnit: days
            countNumber: 365
          action:
            type: expire

- name: purge-lifecycle-policy
  community.aws.ecs_ecr:
    name: needs-no-lifecycle-policy
    purge_lifecycle_policy: true

- name: set-encryption-configuration
  community.aws.ecs_ecr:
    name: uses-custom-kms-key
    encryption_configuration:
      encryption_type: KMS
      kms_key: custom-kms-key-alias
'''

RETURN = '''
state:
    type: str
    description: The asserted state of the repository (present, absent)
    returned: always
created:
    type: bool
    description: If true, the repository was created
    returned: always
name:
    type: str
    description: The name of the repository
    returned: I(state=absent)
policy:
    type: dict
    description: The existing, created or updated repository policy.
    returned: I(state=present)
    version_added: 4.0.0
repository:
    type: dict
    description: The created or updated repository
    returned: I(state=present)
    sample:
        createdAt: '2017-01-17T08:41:32-06:00'
        registryId: '123456789012'
        repositoryArn: arn:aws:ecr:us-east-1:123456789012:repository/ecr-test-1484664090
        repositoryName: ecr-test-1484664090
        repositoryUri: 123456789012.dkr.ecr.us-east-1.amazonaws.com/ecr-test-1484664090
'''

import json
import traceback

try:
    import botocore
except ImportError:
    pass  # Handled by AnsibleAWSModule

from ansible.module_utils.common.dict_transformations import snake_dict_to_camel_dict
from ansible.module_utils.six import string_types

from ansible_collections.amazon.aws.plugins.module_utils.core import AnsibleAWSModule
from ansible_collections.amazon.aws.plugins.module_utils.core import is_boto3_error_code
from ansible_collections.amazon.aws.plugins.module_utils.ec2 import boto_exception
from ansible_collections.amazon.aws.plugins.module_utils.ec2 import compare_policies
from ansible_collections.amazon.aws.plugins.module_utils.ec2 import sort_json_policy_dict


def build_kwargs(registry_id):
    """
    Builds a kwargs dict which may contain the optional registryId.

    :param registry_id: Optional string containing the registryId.
    :return: kwargs dict with registryId, if given
    """
    if not registry_id:
        return dict()
    else:
        return dict(registryId=registry_id)


class EcsEcr:
    def __init__(self, module):
        self.ecr = module.client('ecr')
        self.sts = module.client('sts')
        self.check_mode = module.check_mode
        self.changed = False
        self.skipped = False

    def get_repository(self, registry_id, name):
        try:
            res = self.ecr.describe_repositories(
                repositoryNames=[name], **build_kwargs(registry_id))
            repos = res.get('repositories')
            return repos and repos[0]
        except is_boto3_error_code('RepositoryNotFoundException'):
            return None

    def get_repository_policy(self, registry_id, name):
        try:
            res = self.ecr.get_repository_policy(
                repositoryName=name, **build_kwargs(registry_id))
            text = res.get('policyText')
            return text and json.loads(text)
        except is_boto3_error_code(['RepositoryNotFoundException', 'RepositoryPolicyNotFoundException']):
            return None

    def create_repository(self, registry_id, name, image_tag_mutability, encryption_configuration):
        if registry_id:
            default_registry_id = self.sts.get_caller_identity().get('Account')
            if registry_id != default_registry_id:
                raise Exception('Cannot create repository in registry {0}.'
                                'Would be created in {1} instead.'.format(registry_id, default_registry_id))

        if encryption_configuration is None:
            encryption_configuration = dict(encryptionType='AES256')

        if not self.check_mode:
            repo = self.ecr.create_repository(
                repositoryName=name,
                imageTagMutability=image_tag_mutability,
                encryptionConfiguration=encryption_configuration).get('repository')
            self.changed = True
            return repo
        else:
            self.skipped = True
            return dict(repositoryName=name)

    def set_repository_policy(self, registry_id, name, policy_text, force):
        if not self.check_mode:
            policy = self.ecr.set_repository_policy(
                repositoryName=name,
                policyText=policy_text,
                force=force,
                **build_kwargs(registry_id))
            self.changed = True
            return policy
        else:
            self.skipped = True
            if self.get_repository(registry_id, name) is None:
                printable = name
                if registry_id:
                    printable = '{0}:{1}'.format(registry_id, name)
                raise Exception(
                    'could not find repository {0}'.format(printable))
            return

    def delete_repository(self, registry_id, name, force):
        if not self.check_mode:
            repo = self.ecr.delete_repository(
                repositoryName=name, force=force, **build_kwargs(registry_id))
            self.changed = True
            return repo
        else:
            repo = self.get_repository(registry_id, name)
            if repo:
                self.skipped = True
                return repo
            return None

    def delete_repository_policy(self, registry_id, name):
        if not self.check_mode:
            policy = self.ecr.delete_repository_policy(
                repositoryName=name, **build_kwargs(registry_id))
            self.changed = True
            return policy
        else:
            policy = self.get_repository_policy(registry_id, name)
            if policy:
                self.skipped = True
                return policy
            return None

    def put_image_tag_mutability(self, registry_id, name, new_mutability_configuration):
        repo = self.get_repository(registry_id, name)
        current_mutability_configuration = repo.get('imageTagMutability')

        if current_mutability_configuration != new_mutability_configuration:
            if not self.check_mode:
                self.ecr.put_image_tag_mutability(
                    repositoryName=name,
                    imageTagMutability=new_mutability_configuration,
                    **build_kwargs(registry_id))
            else:
                self.skipped = True
            self.changed = True

        repo['imageTagMutability'] = new_mutability_configuration
        return repo

    def get_lifecycle_policy(self, registry_id, name):
        try:
            res = self.ecr.get_lifecycle_policy(
                repositoryName=name, **build_kwargs(registry_id))
            text = res.get('lifecyclePolicyText')
            return text and json.loads(text)
        except is_boto3_error_code(['LifecyclePolicyNotFoundException', 'RepositoryNotFoundException']):
            return None

    def put_lifecycle_policy(self, registry_id, name, policy_text):
        if not self.check_mode:
            policy = self.ecr.put_lifecycle_policy(
                repositoryName=name,
                lifecyclePolicyText=policy_text,
                **build_kwargs(registry_id))
            self.changed = True
            return policy
        else:
            self.skipped = True
            if self.get_repository(registry_id, name) is None:
                printable = name
                if registry_id:
                    printable = '{0}:{1}'.format(registry_id, name)
                raise Exception(
                    'could not find repository {0}'.format(printable))
            return

    def purge_lifecycle_policy(self, registry_id, name):
        if not self.check_mode:
            policy = self.ecr.delete_lifecycle_policy(
                repositoryName=name, **build_kwargs(registry_id))
            self.changed = True
            return policy
        else:
            policy = self.get_lifecycle_policy(registry_id, name)
            if policy:
                self.skipped = True
                return policy
            return None

    def put_image_scanning_configuration(self, registry_id, name, scan_on_push):
        if not self.check_mode:
            if registry_id:
                scan = self.ecr.put_image_scanning_configuration(
                    registryId=registry_id,
                    repositoryName=name,
                    imageScanningConfiguration={'scanOnPush': scan_on_push}
                )
            else:
                scan = self.ecr.put_image_scanning_configuration(
                    repositoryName=name,
                    imageScanningConfiguration={'scanOnPush': scan_on_push}
                )
            self.changed = True
            return scan
        else:
            self.skipped = True
            return None


def sort_lists_of_strings(policy):
    for statement_index in range(0, len(policy.get('Statement', []))):
        for key in policy['Statement'][statement_index]:
            value = policy['Statement'][statement_index][key]
            if isinstance(value, list) and all(isinstance(item, string_types) for item in value):
                policy['Statement'][statement_index][key] = sorted(value)
    return policy


def run(ecr, params):
    # type: (EcsEcr, dict, int) -> Tuple[bool, dict]
    result = {}
    try:
        name = params['name']
        state = params['state']
        policy_text = params['policy']
        purge_policy = params['purge_policy']
        force_absent = params['force_absent']
        registry_id = params['registry_id']
        force_set_policy = params['force_set_policy']
        image_tag_mutability = params['image_tag_mutability'].upper()
        lifecycle_policy_text = params['lifecycle_policy']
        purge_lifecycle_policy = params['purge_lifecycle_policy']
        scan_on_push = params['scan_on_push']
        encryption_configuration = snake_dict_to_camel_dict(params['encryption_configuration'])

        # Parse policies, if they are given
        try:
            policy = policy_text and json.loads(policy_text)
        except ValueError:
            result['policy'] = policy_text
            result['msg'] = 'Could not parse policy'
            return False, result

        try:
            lifecycle_policy = \
                lifecycle_policy_text and json.loads(lifecycle_policy_text)
        except ValueError:
            result['lifecycle_policy'] = lifecycle_policy_text
            result['msg'] = 'Could not parse lifecycle_policy'
            return False, result

        result['state'] = state
        result['created'] = False

        repo = ecr.get_repository(registry_id, name)

        if state == 'present':
            result['created'] = False

            if not repo:
                repo = ecr.create_repository(
                    registry_id, name, image_tag_mutability, encryption_configuration)
                result['changed'] = True
                result['created'] = True
            else:
                if encryption_configuration is not None:
                    if repo.get('encryptionConfiguration') != encryption_configuration:
                        result['msg'] = 'Cannot modify repository encryption type'
                        return False, result

                repo = ecr.put_image_tag_mutability(registry_id, name, image_tag_mutability)
            result['repository'] = repo

            if purge_lifecycle_policy:
                original_lifecycle_policy = \
                    ecr.get_lifecycle_policy(registry_id, name)

                result['lifecycle_policy'] = None

                if original_lifecycle_policy:
                    ecr.purge_lifecycle_policy(registry_id, name)
                    result['changed'] = True

            elif lifecycle_policy_text is not None:
                try:
                    lifecycle_policy = sort_json_policy_dict(lifecycle_policy)
                    result['lifecycle_policy'] = lifecycle_policy

                    original_lifecycle_policy = ecr.get_lifecycle_policy(
                        registry_id, name)

                    if original_lifecycle_policy:
                        original_lifecycle_policy = sort_json_policy_dict(
                            original_lifecycle_policy)

                    if original_lifecycle_policy != lifecycle_policy:
                        ecr.put_lifecycle_policy(registry_id, name,
                                                 lifecycle_policy_text)
                        result['changed'] = True
                except Exception:
                    # Some failure w/ the policy. It's helpful to know what the
                    # policy is.
                    result['lifecycle_policy'] = lifecycle_policy_text
                    raise

            if purge_policy:
                original_policy = ecr.get_repository_policy(registry_id, name)

                result['policy'] = None

                if original_policy:
                    ecr.delete_repository_policy(registry_id, name)
                    result['changed'] = True

            elif policy_text is not None:
                try:
                    # Sort any lists containing only string types
                    policy = sort_lists_of_strings(policy)

                    result['policy'] = policy

                    original_policy = ecr.get_repository_policy(
                        registry_id, name)
                    if original_policy:
                        original_policy = sort_lists_of_strings(original_policy)

                    if compare_policies(original_policy, policy):
                        ecr.set_repository_policy(
                            registry_id, name, policy_text, force_set_policy)
                        result['changed'] = True
                except Exception:
                    # Some failure w/ the policy. It's helpful to know what the
                    # policy is.
                    result['policy'] = policy_text
                    raise

            else:
                original_policy = ecr.get_repository_policy(registry_id, name)
                if original_policy:
                    result['policy'] = original_policy

            original_scan_on_push = ecr.get_repository(registry_id, name)
            if original_scan_on_push is not None:
                if scan_on_push != original_scan_on_push['imageScanningConfiguration']['scanOnPush']:
                    result['changed'] = True
                    result['repository']['imageScanningConfiguration']['scanOnPush'] = scan_on_push
                    response = ecr.put_image_scanning_configuration(registry_id, name, scan_on_push)

        elif state == 'absent':
            result['name'] = name
            if repo:
                ecr.delete_repository(registry_id, name, force_absent)
                result['changed'] = True

    except Exception as err:
        msg = str(err)
        if isinstance(err, botocore.exceptions.ClientError):
            msg = boto_exception(err)
        result['msg'] = msg
        result['exception'] = traceback.format_exc()
        return False, result

    if ecr.skipped:
        result['skipped'] = True

    if ecr.changed:
        result['changed'] = True

    return True, result


def main():
    argument_spec = dict(
        name=dict(required=True),
        registry_id=dict(required=False),
        state=dict(required=False, choices=['present', 'absent'],
                   default='present'),
        force_absent=dict(required=False, type='bool', default=False),
        force_set_policy=dict(required=False, type='bool', default=False),
        policy=dict(required=False, type='json'),
        image_tag_mutability=dict(required=False, choices=['mutable', 'immutable'],
                                  default='mutable'),
        purge_policy=dict(required=False, type='bool'),
        lifecycle_policy=dict(required=False, type='json'),
        purge_lifecycle_policy=dict(required=False, type='bool'),
        scan_on_push=(dict(required=False, type='bool', default=False)),
        encryption_configuration=dict(
            required=False,
            type='dict',
            options=dict(
                encryption_type=dict(required=False, type='str', default='AES256', choices=['AES256', 'KMS']),
                kms_key=dict(required=False, type='str', no_log=False),
            ),
            required_if=[
                ['encryption_type', 'KMS', ['kms_key']],
            ],
        ),
    )
    mutually_exclusive = [
        ['policy', 'purge_policy'],
        ['lifecycle_policy', 'purge_lifecycle_policy']]

    module = AnsibleAWSModule(argument_spec=argument_spec, supports_check_mode=True, mutually_exclusive=mutually_exclusive)

    ecr = EcsEcr(module)
    passed, result = run(ecr, module.params)

    if passed:
        module.exit_json(**result)
    else:
        module.fail_json(**result)


if __name__ == '__main__':
    main()

Anon7 - 2022
AnonSec Team