Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 18.219.7.43
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /lib/python3/dist-packages/ansible_collections/community/sops/plugins/modules/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /lib/python3/dist-packages/ansible_collections/community/sops/plugins/modules/sops_encrypt.py
#!/usr/bin/python
# -*- coding: utf-8 -*-

# Copyright (c) 2020, Felix Fontein <felix@fontein.de>
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later

from __future__ import absolute_import, division, print_function
__metaclass__ = type


DOCUMENTATION = r'''
---
author: Felix Fontein (@felixfontein)
module: sops_encrypt
short_description: Encrypt data with sops
version_added: '0.1.0'
description:
  - Allows to encrypt binary data (Base64 encoded), text data, JSON or YAML data with sops.
options:
  path:
    description:
      - The sops encrypt file.
    type: path
    required: true
  force:
    description:
      - Force rewriting the encrypted file.
    type: bool
    default: false
  content_text:
    description:
      - The data to encrypt. Must be a Unicode text.
      - Please note that the module might not be idempotent if the text can be parsed as JSON or YAML.
      - Exactly one of I(content_text), I(content_binary), I(content_json) and I(content_yaml) must be specified.
    type: str
  content_binary:
    description:
      - The data to encrypt. Must be L(Base64 encoded,https://en.wikipedia.org/wiki/Base64) binary data.
      - Please note that the module might not be idempotent if the data can be parsed as JSON or YAML.
      - Exactly one of I(content_text), I(content_binary), I(content_json) and I(content_yaml) must be specified.
    type: str
  content_json:
    description:
      - The data to encrypt. Must be a JSON dictionary.
      - Exactly one of I(content_text), I(content_binary), I(content_json) and I(content_yaml) must be specified.
    type: dict
  content_yaml:
    description:
      - The data to encrypt. Must be a YAML dictionary.
      - Please note that Ansible only allows to pass data that can be represented as a JSON dictionary.
      - Exactly one of I(content_text), I(content_binary), I(content_json) and I(content_yaml) must be specified.
    type: dict
extends_documentation_fragment:
  - ansible.builtin.files
  - community.sops.sops
  - community.sops.sops.encrypt_specific
  - community.sops.attributes
  - community.sops.attributes.files
attributes:
  check_mode:
    support: full
  diff_mode:
    support: none
  safe_file_operations:
    support: full
seealso:
  - ref: community.sops.sops lookup <ansible_collections.community.sops.sops_lookup>
    description: The sops lookup can be used to decrypt sops-encrypted files.
  # - plugin: community.sops.sops
  #   plugin_type: lookup
'''

EXAMPLES = r'''
- name: Encrypt a secret text
  community.sops.sops_encrypt:
    path: text-data.sops
    content_text: This is a secret text.

- name: Encrypt the contents of a file
  community.sops.sops_encrypt:
    path: binary-data.sops
    content_binary: "{{ lookup('ansible.builtin.file', '/path/to/file', rstrip=false) | b64encode }}"

- name: Encrypt some datastructure as YAML
  community.sops.sops_encrypt:
    path: stuff.sops.yaml
    content_yaml: "{{ result }}"
'''

RETURN = r''' # '''


import base64
import json
import os
import traceback

from ansible.module_utils.basic import AnsibleModule, missing_required_lib
from ansible.module_utils.common.text.converters import to_text

from ansible_collections.community.sops.plugins.module_utils.io import write_file
from ansible_collections.community.sops.plugins.module_utils.sops import Sops, SopsError, get_sops_argument_spec

try:
    import yaml
    YAML_IMP_ERR = None
    HAS_YAML = True
except ImportError:
    YAML_IMP_ERR = traceback.format_exc()
    HAS_YAML = False
    yaml = None


def get_data_type(module):
    """Return the data type name matching the content option that is set.

    The content options are probed in a fixed order and the first one whose
    value is not ``None`` decides the result. Text and binary content both
    map to ``'binary'``. If no content option is set, ``module.fail_json``
    is called with an internal error message.
    """
    # Order matters: it mirrors the precedence of the content options.
    type_by_option = (
        ('content_text', 'binary'),
        ('content_binary', 'binary'),
        ('content_json', 'json'),
        ('content_yaml', 'yaml'),
    )
    for option_name, data_type in type_by_option:
        if module.params[option_name] is not None:
            return data_type
    module.fail_json(msg='Internal error: unknown content type')


def compare_encoded_content(module, binary_data, content):
    """Tell whether ``content`` equals the data requested by the module options.

    :param module: Object exposing ``params`` and ``fail_json``.
    :param binary_data: Value compared against ``content`` when
        ``content_binary`` is set.
    :param content: The existing content to compare with.
    :returns: ``True`` if the content matches, ``False`` otherwise. Any error
        while parsing or comparing JSON / YAML content counts as "not equal".

    If no content option is set, ``module.fail_json`` is called with an
    internal error message.
    """
    params = module.params

    text = params['content_text']
    if text is not None:
        return content == text.encode('utf-8')

    if params['content_binary'] is not None:
        return content == binary_data

    # The loaders are wrapped in lambdas so that ``json`` / ``yaml`` are only
    # looked up inside the ``try`` below, where any failure means "not equal".
    structured_loaders = (
        ('content_json', lambda raw: json.loads(raw)),
        ('content_yaml', lambda raw: yaml.safe_load(raw)),
    )
    for option_name, load in structured_loaders:
        expected = params[option_name]
        if expected is None:
            continue
        try:
            return load(content) == expected
        except Exception:
            # Treat parsing errors as content not equal
            return False

    module.fail_json(msg='Internal error: unknown content type')


def get_encoded_type_content(module, binary_data):
    """Return a ``(type name, encoded bytes)`` pair for the content option set.

    :param module: Object exposing ``params`` and ``fail_json``.
    :param binary_data: Value returned unchanged when ``content_binary`` is set.
    :returns: A tuple of the type name (``'binary'``, ``'json'`` or ``'yaml'``)
        and the data. Text, JSON and YAML content are encoded as UTF-8.

    If no content option is set, ``module.fail_json`` is called with an
    internal error message.
    """
    params = module.params

    text = params['content_text']
    if text is not None:
        return 'binary', text.encode('utf-8')

    if params['content_binary'] is not None:
        return 'binary', binary_data

    json_value = params['content_json']
    if json_value is not None:
        serialized = json.dumps(json_value)
        return 'json', serialized.encode('utf-8')

    yaml_value = params['content_yaml']
    if yaml_value is not None:
        serialized = yaml.safe_dump(yaml_value)
        return 'yaml', serialized.encode('utf-8')

    module.fail_json(msg='Internal error: unknown content type')


def main():
    """Entry point of the sops_encrypt module.

    Builds the argument spec, decides whether the file at ``path`` has to be
    (re-)encrypted, writes the encrypted data unless running in check mode,
    applies the common file attributes and reports ``changed``.
    """
    content_options = ('content_text', 'content_binary', 'content_json', 'content_yaml')

    argument_spec = {
        'path': {'type': 'path', 'required': True},
        'force': {'type': 'bool', 'default': False},
        'content_text': {'type': 'str', 'no_log': True},
        'content_binary': {'type': 'str', 'no_log': True},
        'content_json': {'type': 'dict', 'no_log': True},
        'content_yaml': {'type': 'dict', 'no_log': True},
    }
    argument_spec.update(get_sops_argument_spec(add_encrypt_specific=True))

    # Exactly one of the content options has to be provided.
    module = AnsibleModule(
        argument_spec=argument_spec,
        mutually_exclusive=[content_options],
        required_one_of=[content_options],
        supports_check_mode=True,
        add_file_common_args=True,
    )
    params = module.params

    # YAML content can only be handled when PyYAML could be imported.
    if params['content_yaml'] is not None and not HAS_YAML:
        module.fail_json(msg=missing_required_lib('pyyaml'), exception=YAML_IMP_ERR)

    # Binary content arrives Base64 encoded; decode it once up front.
    binary_data = None
    encoded_binary = params['content_binary']
    if encoded_binary is not None:
        try:
            binary_data = base64.b64decode(encoded_binary)
        except Exception as exc:
            module.fail_json(msg='Cannot decode Base64 encoded data: {0}'.format(exc))

    path = params['path']
    directory = os.path.dirname(path) or None

    def lookup_option(argument_name):
        # Callback handed to the Sops helpers for reading module options.
        return module.params.get(argument_name)

    changed = False
    try:
        if params['force'] or not os.path.exists(path):
            # Nothing to compare against (or comparison not wanted): always encrypt.
            changed = True
        else:
            # Change detection: decrypt the existing file and compare it with the new data.
            existing_content = Sops.decrypt(
                path,
                decode_output=False,
                output_type=get_data_type(module),
                rstrip=False,
                get_option_value=lookup_option,
                module=module,
            )
            changed = not compare_encoded_content(module, binary_data, existing_content)

        if changed and not module.check_mode:
            input_type, input_data = get_encoded_type_content(module, binary_data)

            # Derive the output format from the file extension, if it is a known one.
            output_type = None
            for candidate in ('json', 'yaml'):
                if path.endswith('.' + candidate):
                    output_type = candidate
                    break

            encrypted = Sops.encrypt(
                data=input_data,
                cwd=directory,
                input_type=input_type,
                output_type=output_type,
                get_option_value=lookup_option,
                module=module,
            )
            write_file(module, encrypted)
    except SopsError as exc:
        module.fail_json(msg=to_text(exc))

    # Apply the common file arguments; this may turn ``changed`` to True.
    file_args = module.load_file_common_arguments(module.params)
    changed = module.set_fs_attributes_if_different(file_args, changed)

    module.exit_json(changed=changed)


# Run the module only when this file is executed as a script, not when it is imported.
if __name__ == '__main__':
    main()

Anon7 - 2022
AnonSec Team