Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.15.225.110
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /usr/lib/python3/dist-packages/ansible/config/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /usr/lib/python3/dist-packages/ansible/config/manager.py
# Copyright: (c) 2017, Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

import atexit
import configparser
import os
import os.path
import sys
import stat
import tempfile
import traceback

from collections import namedtuple
from collections.abc import Mapping, Sequence
from jinja2.nativetypes import NativeEnvironment

from ansible.errors import AnsibleOptionsError, AnsibleError
from ansible.module_utils._text import to_text, to_bytes, to_native
from ansible.module_utils.common.yaml import yaml_load
from ansible.module_utils.six import string_types
from ansible.module_utils.parsing.convert_bool import boolean
from ansible.parsing.quoting import unquote
from ansible.parsing.yaml.objects import AnsibleVaultEncryptedUnicode
from ansible.utils import py3compat
from ansible.utils.path import cleanup_tmp_file, makedirs_safe, unfrackpath


# Lightweight records: Plugin carries (name, type); Setting carries
# (name, value, origin, type).
Plugin = namedtuple('Plugin', ['name', 'type'])
Setting = namedtuple('Setting', ['name', 'value', 'origin', 'type'])

# Per plugin type, option names that are exempt from the 'required' check
# performed in ConfigManager.get_config_value_and_origin.
INTERNAL_DEFS = dict(lookup=('_terms',))


def _get_entry(plugin_type, plugin_name, config):
    ''' construct entry for requested config '''
    entry = ''
    if plugin_type:
        entry += 'plugin_type: %s ' % plugin_type
        if plugin_name:
            entry += 'plugin: %s ' % plugin_name
    entry += 'setting: %s ' % config
    return entry


# FIXME: see if we can unify in module_utils with similar function used by argspec
def ensure_type(value, value_type, origin=None):
    ''' return a configuration value cast to the requested type

    :arg value: The value to ensure correct typing of.  A value of None skips
        all casting below.
    :arg value_type: Name of the target type, compared case-insensitively.
        This can be any of the following strings:
        :boolean: converts the value with ``boolean(value, strict=False)``
        :bool: Same as 'boolean'
        :integer: converts the value with ``int()``; conversion errors propagate
        :int: Same as 'integer'
        :float: converts the value with ``float()``; conversion errors propagate
        :list: A string is split on commas, each item is stripped and
            unquoted.  Any other value must already be a Sequence.
        :none: The string "None" becomes None; any other value is an error.
        :path: A string is resolved through resolve_path(); non strings are
            an error.
        :tmppath: A string is resolved through resolve_path(), the directory
            is created if missing and a unique temporary directory is created
            inside of it.  Its path is returned and its cleanup is registered
            with atexit.
        :temppath: Same as 'tmppath'
        :tmp: Same as 'tmppath'
        :pathlist: A string is split on commas and each item is stripped.
            Each item of the resulting (or given) Sequence is resolved through
            resolve_path().
        :pathspec: A string is split on ``os.pathsep``.  Each item of the
            resulting (or given) Sequence is resolved through resolve_path().
        :dict: The value must be a Mapping, it is not converted.
        :dictionary: Same as 'dict'
        :str: String, inline vault, bool, int, float and complex values are
            converted to text and unquoted; anything else is an error.
        :string: Same as 'str'
        With any other (or no) type only string and inline vault values are
        converted to unquoted text.
    :kwarg origin: Where the value came from.  When it is an absolute path
        that exists it is handed to resolve_path() as ``basedir``.
    :raises ValueError: when the value does not fit the requested type
    :returns: the converted value after a final
        ``to_text(..., nonstring='passthru')`` call
    '''

    errmsg = ''
    basedir = None
    # only an existing absolute path is used as base for the path types below
    if origin and os.path.isabs(origin) and os.path.exists(to_bytes(origin)):
        basedir = origin

    if value_type:
        value_type = value_type.lower()

    if value is not None:
        if value_type in ('boolean', 'bool'):
            value = boolean(value, strict=False)

        elif value_type in ('integer', 'int'):
            value = int(value)

        elif value_type == 'float':
            value = float(value)

        elif value_type == 'list':
            if isinstance(value, string_types):
                value = [unquote(x.strip()) for x in value.split(',')]
            elif not isinstance(value, Sequence):
                errmsg = 'list'

        elif value_type == 'none':
            if value == "None":
                value = None

            # anything that is still set at this point cannot be a 'none'
            if value is not None:
                errmsg = 'None'

        elif value_type == 'path':
            if isinstance(value, string_types):
                value = resolve_path(value, basedir=basedir)
            else:
                errmsg = 'path'

        elif value_type in ('tmp', 'temppath', 'tmppath'):
            if isinstance(value, string_types):
                value = resolve_path(value, basedir=basedir)
                # 0o700 is handed to makedirs_safe, presumably as the directory mode
                if not os.path.exists(value):
                    makedirs_safe(value, 0o700)
                # the per process directory is what gets returned, not its parent
                prefix = 'ansible-local-%s' % os.getpid()
                value = tempfile.mkdtemp(prefix=prefix, dir=value)
                # cleanup_tmp_file(value, warn=True) runs at interpreter exit
                atexit.register(cleanup_tmp_file, value, warn=True)
            else:
                errmsg = 'temppath'

        elif value_type == 'pathspec':
            if isinstance(value, string_types):
                value = value.split(os.pathsep)

            if isinstance(value, Sequence):
                value = [resolve_path(x, basedir=basedir) for x in value]
            else:
                errmsg = 'pathspec'

        elif value_type == 'pathlist':
            # note: comma separated, unlike 'pathspec' which uses os.pathsep
            if isinstance(value, string_types):
                value = [x.strip() for x in value.split(',')]

            if isinstance(value, Sequence):
                value = [resolve_path(x, basedir=basedir) for x in value]
            else:
                errmsg = 'pathlist'

        elif value_type in ('dict', 'dictionary'):
            if not isinstance(value, Mapping):
                errmsg = 'dictionary'

        elif value_type in ('str', 'string'):
            if isinstance(value, (string_types, AnsibleVaultEncryptedUnicode, bool, int, float, complex)):
                value = unquote(to_text(value, errors='surrogate_or_strict'))
            else:
                errmsg = 'string'

        # defaults to string type
        elif isinstance(value, (string_types, AnsibleVaultEncryptedUnicode)):
            value = unquote(to_text(value, errors='surrogate_or_strict'))

        # errmsg holds the name of the expected type whenever a check above failed
        if errmsg:
            raise ValueError('Invalid type provided for "%s": %s' % (errmsg, to_native(value)))

    return to_text(value, errors='surrogate_or_strict', nonstring='passthru')


# FIXME: see if this can live in utils/path
def resolve_path(path, basedir=None):
    ''' expand the 'magic' {{CWD}} marker and normalize the path

    :arg path: path string, may contain {{CWD}} to force the current working directory
    :kwarg basedir: handed through to unfrackpath()
    :returns: the result of unfrackpath() (called with follow=False)
    '''
    cwd_marker = '{{CWD}}'
    # only query the working directory when the marker is actually used
    if cwd_marker in path:
        path = path.replace(cwd_marker, os.getcwd())

    resolved = unfrackpath(path, follow=False, basedir=basedir)
    return resolved


# FIXME: generic file type?
def get_config_type(cfile):
    ''' map a config file name to its format, based on the file extension

    :arg cfile: path of the config file, or None
    :returns: 'ini' for .ini/.cfg, 'yaml' for .yaml/.yml, None when cfile is None
    :raises AnsibleOptionsError: for any other extension
    '''
    if cfile is None:
        return None

    ext = os.path.splitext(cfile)[-1]
    known_types = (
        ('ini', ('.ini', '.cfg')),
        ('yaml', ('.yaml', '.yml')),
    )
    for ftype, extensions in known_types:
        if ext in extensions:
            return ftype

    raise AnsibleOptionsError("Unsupported configuration file extension for %s: %s" % (cfile, to_native(ext)))


# FIXME: can move to module_utils for use for ini plugins also?
def get_ini_config_value(p, entry):
    ''' look up one ini entry in a parser, best effort

    :arg p: config parser to query, or None
    :arg entry: mapping with optional 'section' (default 'defaults') and 'key' (default '')
    :returns: the raw (not interpolated) value, or None when there is no
        parser or the lookup fails for any reason
    '''
    if p is None:
        return None

    try:
        section = entry.get('section', 'defaults')
        key = entry.get('key', '')
        return p.get(section, key, raw=True)
    except Exception:  # FIXME: actually report issues here
        return None


def find_ini_config_file(warnings=None):
    ''' Load INI Config File order(first found is used): ENV, CWD, HOME, /etc/ansible

    :kwarg warnings: optional set that receives a message when an ansible.cfg
        in a world writable working directory was skipped.  When omitted the
        message is collected in a throwaway set.
    :returns: the first candidate path that exists and is readable, or None
    '''
    # FIXME: eventually deprecate ini configs

    if warnings is None:
        # Note: In this case, warnings does nothing
        warnings = set()

    # A value that can never be a valid path so that we can tell if ANSIBLE_CONFIG was set later
    # We can't use None because we could set path to None.
    # This must be a fresh instance: it is only ever compared by identity.
    SENTINEL = object()

    potential_paths = []

    # Environment setting
    path_from_env = os.getenv("ANSIBLE_CONFIG", SENTINEL)
    if path_from_env is not SENTINEL:
        path_from_env = unfrackpath(path_from_env, follow=False)
        # a directory means 'use the ansible.cfg inside of it'
        if os.path.isdir(to_bytes(path_from_env)):
            path_from_env = os.path.join(path_from_env, "ansible.cfg")
        potential_paths.append(path_from_env)

    # Current working directory
    warn_cmd_public = False
    try:
        cwd = os.getcwd()
        perms = os.stat(cwd)
        cwd_cfg = os.path.join(cwd, "ansible.cfg")
        if perms.st_mode & stat.S_IWOTH:
            # Working directory is world writable so we'll skip it.
            # Still have to look for a file here, though, so that we know if we have to warn
            if os.path.exists(cwd_cfg):
                warn_cmd_public = True
        else:
            potential_paths.append(to_text(cwd_cfg, errors='surrogate_or_strict'))
    except OSError:
        # If we can't access cwd, we'll simply skip it as a possible config source
        pass

    # Per user location
    potential_paths.append(unfrackpath("~/.ansible.cfg", follow=False))

    # System location
    potential_paths.append("/etc/ansible/ansible.cfg")

    # first candidate that exists and is readable wins, None when there is no such file
    for path in potential_paths:
        b_path = to_bytes(path)
        if os.path.exists(b_path) and os.access(b_path, os.R_OK):
            break
    else:
        path = None

    # Emit a warning if all the following are true:
    # * We did not use a config from ANSIBLE_CONFIG
    # * There's an ansible.cfg in the current working directory that we skipped
    if path_from_env != path and warn_cmd_public:
        warnings.add(u"Ansible is being run in a world writable directory (%s),"
                     u" ignoring it as an ansible.cfg source."
                     u" For more information see"
                     u" https://docs.ansible.com/ansible/devel/reference_appendices/config.html#cfg-in-world-writable-dir"
                     % to_text(cwd))

    return path


def _add_base_defs_deprecations(base_defs):
    '''Add deprecation source 'ansible.builtin' to deprecations in base.yml'''
    def process(entry):
        if 'deprecated' in entry:
            entry['deprecated']['collection_name'] = 'ansible.builtin'

    for dummy, data in base_defs.items():
        process(data)
        for section in ('ini', 'env', 'vars'):
            if section in data:
                for entry in data[section]:
                    process(entry)


class ConfigManager(object):
    ''' Holds configuration definitions and resolves setting values from their sources.

    Base definitions are read from a YAML file when the manager is created,
    plugin definitions are registered through
    initialize_plugin_configuration_definitions().  Values are resolved by
    get_config_value_and_origin().
    '''

    # Both are class attributes, so they are shared by every instance.
    # DEPRECATED collects (name, deprecation data) for deprecated settings/sources that were used.
    DEPRECATED = []  # type: list[tuple[str, dict[str, str]]]
    # WARNINGS collects warning messages produced while loading/resolving configuration.
    WARNINGS = set()  # type: set[str]

    def __init__(self, conf_file=None, defs_file=None):
        ''' load the base definitions and, if there is one, parse the config file

        :kwarg conf_file: config file to use; when None find_ini_config_file() selects one
        :kwarg defs_file: YAML file with the base definitions; defaults to
            base.yml in the directory of this module
        '''

        self._base_defs = {}
        self._plugins = {}
        self._parsers = {}

        self._config_file = conf_file

        self._base_defs = self._read_config_yaml_file(defs_file or ('%s/base.yml' % os.path.dirname(__file__)))
        _add_base_defs_deprecations(self._base_defs)

        if self._config_file is None:
            # set config using ini
            self._config_file = find_ini_config_file(self.WARNINGS)

        # consume configuration
        if self._config_file:
            # initialize parser and read config
            self._parse_config_file()

        # ensure we always have config def entry
        self._base_defs['CONFIG_FILE'] = {'default': None, 'type': 'path'}

    def _read_config_yaml_file(self, yml_file):
        ''' load a YAML definitions file

        :arg yml_file: path of the file to load
        :returns: the loaded data, or an empty dict when the file holds nothing
        :raises AnsibleError: when the file does not exist
        '''
        # TODO: handle relative paths as relative to the directory containing the current playbook instead of CWD
        # Currently this is only used with absolute paths to the `ansible/config` directory
        yml_file = to_bytes(yml_file)
        if os.path.exists(yml_file):
            with open(yml_file, 'rb') as config_def:
                return yaml_load(config_def) or {}
        raise AnsibleError(
            "Missing base YAML definition file (bad install?): %s" % to_native(yml_file))

    def _parse_config_file(self, cfile=None):
        ''' parse a config file and keep its parser in self._parsers, keyed by file name

        :kwarg cfile: file to parse, defaults to the manager's own config file.
            Nothing is done when both are None.
        :raises AnsibleOptionsError: when the file is not an ini file, is not
            utf8 encoded or cannot be parsed
        '''
        # TODO: take list of files with merge/nomerge

        if cfile is None:
            cfile = self._config_file

        ftype = get_config_type(cfile)
        if cfile is not None:
            if ftype == 'ini':
                self._parsers[cfile] = configparser.ConfigParser(inline_comment_prefixes=(';',))
                with open(to_bytes(cfile), 'rb') as f:
                    try:
                        cfg_text = to_text(f.read(), errors='surrogate_or_strict')
                    except UnicodeError as e:
                        raise AnsibleOptionsError("Error reading config file(%s) because the config file was not utf8 encoded: %s" % (cfile, to_native(e)))
                try:
                    self._parsers[cfile].read_string(cfg_text)
                except configparser.Error as e:
                    raise AnsibleOptionsError("Error reading config file (%s): %s" % (cfile, to_native(e)))
            # FIXME: this should eventually handle yaml config files
            # elif ftype == 'yaml':
            #     with open(cfile, 'rb') as config_stream:
            #         self._parsers[cfile] = yaml_load(config_stream)
            else:
                raise AnsibleOptionsError("Unsupported configuration file type: %s" % to_native(ftype))

    def _find_yaml_config_files(self):
        ''' Load YAML Config Files in order, check merge flags, keep origin of settings (not implemented, does nothing) '''
        pass

    def get_plugin_options(self, plugin_type, name, keys=None, variables=None, direct=None):
        ''' resolve every option defined for a plugin

        :returns: dict of option name to the value returned by get_config_value()
        '''

        defs = self.get_configuration_definitions(plugin_type, name)
        return dict(
            (option, self.get_config_value(option, plugin_type=plugin_type, plugin_name=name, keys=keys, variables=variables, direct=direct))
            for option in defs
        )

    def get_plugin_vars(self, plugin_type, name):
        ''' list the names of all variables that can set an option of the plugin '''

        pvars = []
        for pdef in self.get_configuration_definitions(plugin_type, name).values():
            pvars.extend(var_entry['name'] for var_entry in pdef.get('vars') or ())
        return pvars

    def get_plugin_options_from_var(self, plugin_type, name, variable):
        ''' list the options of the plugin that can be set through the given variable name

        An option is listed once per matching 'vars' entry.
        '''

        options = []
        for option_name, pdef in self.get_configuration_definitions(plugin_type, name).items():
            for var_entry in pdef.get('vars') or ():
                if variable == var_entry['name']:
                    options.append(option_name)
        return options

    def get_configuration_definition(self, name, plugin_type=None, plugin_name=None):
        ''' return a single definition, or None when it is unknown

        Without plugin_type the base definitions are searched.  With only
        plugin_type, name is looked up directly below that plugin type.  With
        both, name is the option of that specific plugin.
        '''

        if plugin_type is None:
            return self._base_defs.get(name, None)
        if plugin_name is None:
            return self._plugins.get(plugin_type, {}).get(name, None)
        return self._plugins.get(plugin_type, {}).get(plugin_name, {}).get(name, None)

    def has_configuration_definition(self, plugin_type, name):
        ''' tell if definitions were registered for the named plugin of this type '''

        return name in self._plugins.get(plugin_type, {})

    def get_configuration_definitions(self, plugin_type=None, name=None, ignore_private=False):
        ''' just list the possible settings, either base or for specific plugins or plugin

        :kwarg plugin_type: when None the base definitions are returned
        :kwarg name: plugin name; when None everything registered for plugin_type is returned
        :kwarg ignore_private: leave out entries whose key starts with '_'
        :returns: the stored mapping itself, or a filtered shallow copy when
            ignore_private is set.  Unknown plugins yield an empty dict.
        '''

        if plugin_type is None:
            ret = self._base_defs
        elif name is None:
            ret = self._plugins.get(plugin_type, {})
        else:
            ret = self._plugins.get(plugin_type, {}).get(name, {})

        if ignore_private:
            # Filter into a new dict: deleting from 'ret' would permanently
            # drop the private definitions from the manager itself.
            ret = dict((cdef, data) for cdef, data in ret.items() if not cdef.startswith('_'))

        return ret

    def _loop_entries(self, container, entry_list):
        ''' repeat code for value entry assignment

        :arg container: mapping to look the entries up in
        :arg entry_list: definitions with a 'name' key, in low to high precedence
        :returns: (value, name) of the last entry that is set in container,
            or (None, None) when none is
        '''

        value = None
        origin = None
        for entry in entry_list:
            name = entry.get('name')
            try:
                temp_value = container.get(name, None)
            except UnicodeEncodeError:
                self.WARNINGS.add(u'value for config entry {0} contains invalid characters, ignoring...'.format(to_text(name)))
                continue
            if temp_value is not None:  # only set if entry is defined in container
                # inline vault variables should be converted to a text string
                if isinstance(temp_value, AnsibleVaultEncryptedUnicode):
                    temp_value = to_text(temp_value, errors='surrogate_or_strict')

                value = temp_value
                origin = name

                # deal with deprecation of setting source, if used
                if 'deprecated' in entry:
                    self.DEPRECATED.append((entry['name'], entry['deprecated']))

        return value, origin

    def get_config_value(self, config, cfile=None, plugin_type=None, plugin_name=None, keys=None, variables=None, direct=None):
        ''' wrapper around get_config_value_and_origin() that returns only the value

        :raises AnsibleError: AnsibleError is passed through unchanged, any
            other exception is wrapped in one
        '''

        try:
            value, _drop = self.get_config_value_and_origin(config, cfile=cfile, plugin_type=plugin_type, plugin_name=plugin_name,
                                                            keys=keys, variables=variables, direct=direct)
        except AnsibleError:
            raise
        except Exception as e:
            raise AnsibleError("Unhandled exception when retrieving %s:\n%s" % (config, to_native(e)), orig_exc=e)
        return value

    def get_config_value_and_origin(self, config, cfile=None, plugin_type=None, plugin_name=None, keys=None, variables=None, direct=None):
        ''' Given a config key figure out the actual value and report on the origin of the settings

        Sources are consulted in this order and the first one that yields a
        value wins: direct, variables, explicit keywords, automapped keywords,
        cli arguments, environment, config file, default.

        :arg config: name of the setting
        :kwarg cfile: config file to read from, defaults to the manager's own
        :kwarg plugin_type: type of the plugin the setting belongs to, None for base settings
        :kwarg plugin_name: name of the plugin the setting belongs to
        :kwarg keys: mapping of playbook keywords
        :kwarg variables: mapping of variables
        :kwarg direct: mapping of options passed directly to the plugin
        :returns: tuple of (value, origin); for 'CONFIG_FILE' this is (cfile, '')
        :raises AnsibleError: when the setting is not defined, or is required and has no value
        :raises AnsibleOptionsError: when the value has the wrong type or is not one of the allowed choices
        '''
        if cfile is None:
            # use default config
            cfile = self._config_file

        if config == 'CONFIG_FILE':
            return cfile, ''

        # Note: sources that are lists listed in low to high precedence (last one wins)
        value = None
        origin = None

        defs = self.get_configuration_definitions(plugin_type, plugin_name)
        if config in defs:

            aliases = defs[config].get('aliases', [])

            # direct setting via plugin arguments, can set to None so we bypass rest of processing/defaults
            if direct:
                if config in direct:
                    value = direct[config]
                    origin = 'Direct'
                else:
                    direct_aliases = [direct[alias] for alias in aliases if alias in direct]
                    if direct_aliases:
                        value = direct_aliases[0]
                        origin = 'Direct'

            if value is None and variables and defs[config].get('vars'):
                # Use 'variable overrides' if present, highest precedence, but only present when querying running play
                value, origin = self._loop_entries(variables, defs[config]['vars'])
                origin = 'var: %s' % origin

            # use playbook keywords if you have em
            if value is None and defs[config].get('keyword') and keys:
                value, origin = self._loop_entries(keys, defs[config]['keyword'])
                origin = 'keyword: %s' % origin

            # automap to keywords
            # TODO: deprecate these in favor of explicit keyword above
            if value is None and keys:
                if config in keys:
                    value = keys[config]
                    keyword = config

                elif aliases:
                    for alias in aliases:
                        if alias in keys:
                            value = keys[alias]
                            keyword = alias
                            break

                if value is not None:
                    origin = 'keyword: %s' % keyword

            if value is None and 'cli' in defs[config]:
                # avoid circular import .. until valid
                from ansible import context
                value, origin = self._loop_entries(context.CLIARGS, defs[config]['cli'])
                origin = 'cli: %s' % origin

            # env vars are next precedence
            if value is None and defs[config].get('env'):
                value, origin = self._loop_entries(py3compat.environ, defs[config]['env'])
                origin = 'env: %s' % origin

            # try config file entries next, if we have one
            if self._parsers.get(cfile, None) is None:
                self._parse_config_file(cfile)

            if value is None and cfile is not None:
                ftype = get_config_type(cfile)
                if ftype and defs[config].get(ftype):
                    if ftype == 'ini':
                        # load from ini config
                        try:  # FIXME: generalize _loop_entries to allow for files also, most of this code is dupe
                            for ini_entry in defs[config]['ini']:
                                temp_value = get_ini_config_value(self._parsers[cfile], ini_entry)
                                if temp_value is not None:
                                    value = temp_value
                                    # the file path is the origin, ensure_type() may use it as base for relative paths
                                    origin = cfile
                                    if 'deprecated' in ini_entry:
                                        self.DEPRECATED.append(('[%s]%s' % (ini_entry['section'], ini_entry['key']), ini_entry['deprecated']))
                        except Exception as e:
                            sys.stderr.write("Error while loading ini config %s: %s" % (cfile, to_native(e)))
                    elif ftype == 'yaml':
                        # FIXME: implement, also , break down key from defs (. notation???)
                        origin = cfile

            # set default if we got here w/o a value
            if value is None:
                if defs[config].get('required', False):
                    # options listed in INTERNAL_DEFS for the plugin type are allowed to stay unset
                    if not plugin_type or config not in INTERNAL_DEFS.get(plugin_type, {}):
                        raise AnsibleError("No setting was provided for required configuration %s" %
                                           to_native(_get_entry(plugin_type, plugin_name, config)))
                else:
                    origin = 'default'
                    value = defs[config].get('default')
                    if isinstance(value, string_types) and (value.startswith('{{') and value.endswith('}}')) and variables is not None:
                        # template default values if possible
                        # NOTE: cannot use is_template due to circular dep
                        try:
                            t = NativeEnvironment().from_string(value)
                            value = t.render(variables)
                        except Exception:
                            pass  # not templatable

            # ensure correct type, can raise exceptions on mismatched types
            try:
                value = ensure_type(value, defs[config].get('type'), origin=origin)
            except ValueError as e:
                if origin.startswith('env:') and value == '':
                    # this is empty env var for non string so we can set to default
                    origin = 'default'
                    value = ensure_type(defs[config].get('default'), defs[config].get('type'), origin=origin)
                else:
                    raise AnsibleOptionsError('Invalid type for configuration option %s (from %s): %s' %
                                              (to_native(_get_entry(plugin_type, plugin_name, config)).strip(), origin, to_native(e)))

            # deal with restricted values
            if value is not None and 'choices' in defs[config] and defs[config]['choices'] is not None:
                invalid_choices = True  # assume the worst!
                if defs[config].get('type') == 'list':
                    # for a list type, compare all values in type are allowed
                    invalid_choices = not all(choice in defs[config]['choices'] for choice in value)
                else:
                    # these should be only the simple data types (string, int, bool, float, etc) .. ignore dicts for now
                    invalid_choices = value not in defs[config]['choices']

                if invalid_choices:

                    # render the allowed values for the error message
                    if isinstance(defs[config]['choices'], Mapping):
                        valid = ', '.join([to_text(k) for k in defs[config]['choices'].keys()])
                    elif isinstance(defs[config]['choices'], string_types):
                        valid = defs[config]['choices']
                    elif isinstance(defs[config]['choices'], Sequence):
                        valid = ', '.join([to_text(c) for c in defs[config]['choices']])
                    else:
                        valid = defs[config]['choices']

                    raise AnsibleOptionsError('Invalid value "%s" for configuration option "%s", valid values are: %s' %
                                              (value, to_native(_get_entry(plugin_type, plugin_name, config)), valid))

            # deal with deprecation of the setting
            if 'deprecated' in defs[config] and origin != 'default':
                self.DEPRECATED.append((config, defs[config].get('deprecated')))
        else:
            raise AnsibleError('Requested entry (%s) was not defined in configuration.' % to_native(_get_entry(plugin_type, plugin_name, config)))

        return value, origin

    def initialize_plugin_configuration_definitions(self, plugin_type, name, defs):
        ''' register (or replace) the option definitions of a plugin '''

        if plugin_type not in self._plugins:
            self._plugins[plugin_type] = {}

        self._plugins[plugin_type][name] = defs

Anon7 - 2022
AnonSec Team