Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 18.221.161.43
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /proc/3/task/3/root/lib/python3/dist-packages/ansible/utils/collection_loader/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /proc/3/task/3/root/lib/python3/dist-packages/ansible/utils/collection_loader/_collection_finder.py
# (c) 2019 Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

# CAUTION: This implementation of the collection loader is used by ansible-test.
#          Because of this, it must be compatible with all Python versions supported on the controller or remote.

from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

import os
import os.path
import pkgutil
import re
import sys
from keyword import iskeyword
from tokenize import Name as _VALID_IDENTIFIER_REGEX


# DO NOT add new non-stdlib import deps here, this loader is used by external tools (eg ansible-test import sanity)
# that only allow stdlib and module_utils
from ansible.module_utils.common.text.converters import to_native, to_text, to_bytes
from ansible.module_utils.six import string_types, PY3
from ._collection_config import AnsibleCollectionConfig

from contextlib import contextmanager
from types import ModuleType

try:
    from importlib import import_module
except ImportError:
    def import_module(name):  # type: ignore[misc]
        __import__(name)
        return sys.modules[name]

try:
    from importlib import reload as reload_module
except ImportError:
    # 2.7 has a global reload function instead...
    reload_module = reload  # type: ignore[name-defined]  # pylint:disable=undefined-variable

try:
    from importlib.util import spec_from_loader
except ImportError:
    pass

try:
    from importlib.machinery import FileFinder
except ImportError:
    HAS_FILE_FINDER = False
else:
    HAS_FILE_FINDER = True

# NB: this supports import sanity test providing a different impl
try:
    from ._collection_meta import _meta_yml_to_dict
except ImportError:
    _meta_yml_to_dict = None


if not hasattr(__builtins__, 'ModuleNotFoundError'):
    # this was introduced in Python 3.6
    ModuleNotFoundError = ImportError


_VALID_IDENTIFIER_STRING_REGEX = re.compile(
    ''.join((_VALID_IDENTIFIER_REGEX, r'\Z')),
)


try:  # NOTE: py3/py2 compat
    # py2 mypy can't deal with try/excepts
    is_python_identifier = str.isidentifier  # type: ignore[attr-defined]
except AttributeError:  # Python 2
    def is_python_identifier(self):  # type: (str) -> bool
        """Determine whether the given string is a Python identifier."""
        # Ref: https://stackoverflow.com/a/55802320/595220
        return bool(re.match(_VALID_IDENTIFIER_STRING_REGEX, self))


PB_EXTENSIONS = ('.yml', '.yaml')


class _AnsibleCollectionFinder:
    def __init__(self, paths=None, scan_sys_paths=True):
        # TODO: accept metadata loader override
        self._ansible_pkg_path = to_native(os.path.dirname(to_bytes(sys.modules['ansible'].__file__)))

        if isinstance(paths, string_types):
            paths = [paths]
        elif paths is None:
            paths = []

        # expand any placeholders in configured paths
        paths = [os.path.expanduser(to_native(p, errors='surrogate_or_strict')) for p in paths]

        # add syspaths if needed
        if scan_sys_paths:
            paths.extend(sys.path)

        good_paths = []
        # expand any placeholders in configured paths
        for p in paths:

            # ensure we always have ansible_collections
            if os.path.basename(p) == 'ansible_collections':
                p = os.path.dirname(p)

            if p not in good_paths and os.path.isdir(to_bytes(os.path.join(p, 'ansible_collections'), errors='surrogate_or_strict')):
                good_paths.append(p)

        self._n_configured_paths = good_paths
        self._n_cached_collection_paths = None
        self._n_cached_collection_qualified_paths = None

        self._n_playbook_paths = []

    @classmethod
    def _remove(cls):
        for mps in sys.meta_path:
            if isinstance(mps, _AnsibleCollectionFinder):
                sys.meta_path.remove(mps)

        # remove any path hooks that look like ours
        for ph in sys.path_hooks:
            if hasattr(ph, '__self__') and isinstance(ph.__self__, _AnsibleCollectionFinder):
                sys.path_hooks.remove(ph)

        # zap any cached path importer cache entries that might refer to us
        sys.path_importer_cache.clear()

        AnsibleCollectionConfig._collection_finder = None

        # validate via the public property that we really killed it
        if AnsibleCollectionConfig.collection_finder is not None:
            raise AssertionError('_AnsibleCollectionFinder remove did not reset AnsibleCollectionConfig.collection_finder')

    def _install(self):
        self._remove()
        sys.meta_path.insert(0, self)

        sys.path_hooks.insert(0, self._ansible_collection_path_hook)

        AnsibleCollectionConfig.collection_finder = self

    def _ansible_collection_path_hook(self, path):
        path = to_native(path)
        interesting_paths = self._n_cached_collection_qualified_paths
        if not interesting_paths:
            interesting_paths = []
            for p in self._n_collection_paths:
                if os.path.basename(p) != 'ansible_collections':
                    p = os.path.join(p, 'ansible_collections')

                if p not in interesting_paths:
                    interesting_paths.append(p)

            interesting_paths.insert(0, self._ansible_pkg_path)
            self._n_cached_collection_qualified_paths = interesting_paths

        if any(path.startswith(p) for p in interesting_paths):
            return _AnsiblePathHookFinder(self, path)

        raise ImportError('not interested')

    @property
    def _n_collection_paths(self):
        paths = self._n_cached_collection_paths
        if not paths:
            self._n_cached_collection_paths = paths = self._n_playbook_paths + self._n_configured_paths
        return paths

    def set_playbook_paths(self, playbook_paths):
        if isinstance(playbook_paths, string_types):
            playbook_paths = [playbook_paths]

        # track visited paths; we have to preserve the dir order as-passed in case there are duplicate collections (first one wins)
        added_paths = set()

        # de-dupe
        self._n_playbook_paths = [os.path.join(to_native(p), 'collections') for p in playbook_paths if not (p in added_paths or added_paths.add(p))]
        self._n_cached_collection_paths = None
        # HACK: playbook CLI sets this relatively late, so we've already loaded some packages whose paths might depend on this. Fix those up.
        # NB: this should NOT be used for late additions; ideally we'd fix the playbook dir setup earlier in Ansible init
        # to prevent this from occurring
        for pkg in ['ansible_collections', 'ansible_collections.ansible']:
            self._reload_hack(pkg)

    def _reload_hack(self, fullname):
        m = sys.modules.get(fullname)
        if not m:
            return
        reload_module(m)

    def _get_loader(self, fullname, path=None):
        split_name = fullname.split('.')
        toplevel_pkg = split_name[0]
        module_to_find = split_name[-1]
        part_count = len(split_name)

        if toplevel_pkg not in ['ansible', 'ansible_collections']:
            # not interested in anything other than ansible_collections (and limited cases under ansible)
            return None

        # sanity check what we're getting from import, canonicalize path values
        if part_count == 1:
            if path:
                raise ValueError('path should not be specified for top-level packages (trying to find {0})'.format(fullname))
            else:
                # seed the path to the configured collection roots
                path = self._n_collection_paths

        if part_count > 1 and path is None:
            raise ValueError('path must be specified for subpackages (trying to find {0})'.format(fullname))

        if toplevel_pkg == 'ansible':
            # something under the ansible package, delegate to our internal loader in case of redirections
            initialize_loader = _AnsibleInternalRedirectLoader
        elif part_count == 1:
            initialize_loader = _AnsibleCollectionRootPkgLoader
        elif part_count == 2:  # ns pkg eg, ansible_collections, ansible_collections.somens
            initialize_loader = _AnsibleCollectionNSPkgLoader
        elif part_count == 3:  # collection pkg eg, ansible_collections.somens.somecoll
            initialize_loader = _AnsibleCollectionPkgLoader
        else:
            # anything below the collection
            initialize_loader = _AnsibleCollectionLoader

        # NB: actual "find"ing is delegated to the constructors on the various loaders; they'll ImportError if not found
        try:
            return initialize_loader(fullname=fullname, path_list=path)
        except ImportError:
            # TODO: log attempt to load context
            return None

    def find_module(self, fullname, path=None):
        # Figure out what's being asked for, and delegate to a special-purpose loader
        return self._get_loader(fullname, path)

    def find_spec(self, fullname, path, target=None):
        loader = self._get_loader(fullname, path)

        if loader is None:
            return None

        spec = spec_from_loader(fullname, loader)
        if spec is not None and hasattr(loader, '_subpackage_search_paths'):
            spec.submodule_search_locations = loader._subpackage_search_paths
        return spec


# Implements a path_hook finder for iter_modules (since it's only path based). This finder does not need to actually
# function as a finder in most cases, since our meta_path finder is consulted first for *almost* everything, except
# pkgutil.iter_modules, and under py2, pkgutil.get_data if the parent package passed has not been loaded yet.
class _AnsiblePathHookFinder:
    def __init__(self, collection_finder, pathctx):
        # when called from a path_hook, find_module doesn't usually get the path arg, so this provides our context
        self._pathctx = to_native(pathctx)
        self._collection_finder = collection_finder
        if PY3:
            # cache the native FileFinder (take advantage of its filesystem cache for future find/load requests)
            self._file_finder = None

    # class init is fun- this method has a self arg that won't get used
    def _get_filefinder_path_hook(self=None):
        _file_finder_hook = None
        if PY3:
            # try to find the FileFinder hook to call for fallback path-based imports in Py3
            _file_finder_hook = [ph for ph in sys.path_hooks if 'FileFinder' in repr(ph)]
            if len(_file_finder_hook) != 1:
                raise Exception('need exactly one FileFinder import hook (found {0})'.format(len(_file_finder_hook)))
            _file_finder_hook = _file_finder_hook[0]

        return _file_finder_hook

    _filefinder_path_hook = _get_filefinder_path_hook()

    def _get_finder(self, fullname):
        split_name = fullname.split('.')
        toplevel_pkg = split_name[0]

        if toplevel_pkg == 'ansible_collections':
            # collections content? delegate to the collection finder
            return self._collection_finder
        else:
            # Something else; we'd normally restrict this to `ansible` descendent modules so that any weird loader
            # behavior that arbitrary Python modules have can be serviced by those loaders. In some dev/test
            # scenarios (eg a venv under a collection) our path_hook signs us up to load non-Ansible things, and
            # it's too late by the time we've reached this point, but also too expensive for the path_hook to figure
            # out what we *shouldn't* be loading with the limited info it has. So we'll just delegate to the
            # normal path-based loader as best we can to service it. This also allows us to take advantage of Python's
            # built-in FS caching and byte-compilation for most things.
            if PY3:
                # create or consult our cached file finder for this path
                if not self._file_finder:
                    try:
                        self._file_finder = _AnsiblePathHookFinder._filefinder_path_hook(self._pathctx)
                    except ImportError:
                        # FUTURE: log at a high logging level? This is normal for things like python36.zip on the path, but
                        # might not be in some other situation...
                        return None

                return self._file_finder

            # call py2's internal loader
            return pkgutil.ImpImporter(self._pathctx)

    def find_module(self, fullname, path=None):
        # we ignore the passed in path here- use what we got from the path hook init
        finder = self._get_finder(fullname)

        if finder is None:
            return None
        elif HAS_FILE_FINDER and isinstance(finder, FileFinder):
            # this codepath is erroneously used under some cases in py3,
            # and the find_module method on FileFinder does not accept the path arg
            # see https://github.com/pypa/setuptools/pull/2918
            return finder.find_module(fullname)
        else:
            return finder.find_module(fullname, path=[self._pathctx])

    def find_spec(self, fullname, target=None):
        split_name = fullname.split('.')
        toplevel_pkg = split_name[0]

        finder = self._get_finder(fullname)

        if finder is None:
            return None
        elif toplevel_pkg == 'ansible_collections':
            return finder.find_spec(fullname, path=[self._pathctx])
        else:
            return finder.find_spec(fullname)

    def iter_modules(self, prefix):
        # NB: this currently represents only what's on disk, and does not handle package redirection
        return _iter_modules_impl([self._pathctx], prefix)

    def __repr__(self):
        return "{0}(path='{1}')".format(self.__class__.__name__, self._pathctx)


class _AnsibleCollectionPkgLoaderBase:
    _allows_package_code = False

    def __init__(self, fullname, path_list=None):
        self._fullname = fullname
        self._redirect_module = None
        self._split_name = fullname.split('.')
        self._rpart_name = fullname.rpartition('.')
        self._parent_package_name = self._rpart_name[0]  # eg ansible_collections for ansible_collections.somens, '' for toplevel
        self._package_to_load = self._rpart_name[2]  # eg somens for ansible_collections.somens

        self._source_code_path = None
        self._decoded_source = None
        self._compiled_code = None

        self._validate_args()

        self._candidate_paths = self._get_candidate_paths([to_native(p) for p in path_list])
        self._subpackage_search_paths = self._get_subpackage_search_paths(self._candidate_paths)

        self._validate_final()

    # allow subclasses to validate args and sniff split values before we start digging around
    def _validate_args(self):
        if self._split_name[0] != 'ansible_collections':
            raise ImportError('this loader can only load packages from the ansible_collections package, not {0}'.format(self._fullname))

    # allow subclasses to customize candidate path filtering
    def _get_candidate_paths(self, path_list):
        return [os.path.join(p, self._package_to_load) for p in path_list]

    # allow subclasses to customize finding paths
    def _get_subpackage_search_paths(self, candidate_paths):
        # filter candidate paths for existence (NB: silently ignoring package init code and same-named modules)
        return [p for p in candidate_paths if os.path.isdir(to_bytes(p))]

    # allow subclasses to customize state validation/manipulation before we return the loader instance
    def _validate_final(self):
        return

    @staticmethod
    @contextmanager
    def _new_or_existing_module(name, **kwargs):
        # handle all-or-nothing sys.modules creation/use-existing/delete-on-exception-if-created behavior
        created_module = False
        module = sys.modules.get(name)
        try:
            if not module:
                module = ModuleType(name)
                created_module = True
                sys.modules[name] = module
            # always override the values passed, except name (allow reference aliasing)
            for attr, value in kwargs.items():
                setattr(module, attr, value)
            yield module
        except Exception:
            if created_module:
                if sys.modules.get(name):
                    sys.modules.pop(name)
            raise

    # basic module/package location support
    # NB: this does not support distributed packages!
    @staticmethod
    def _module_file_from_path(leaf_name, path):
        has_code = True
        package_path = os.path.join(to_native(path), to_native(leaf_name))
        module_path = None

        # if the submodule is a package, assemble valid submodule paths, but stop looking for a module
        if os.path.isdir(to_bytes(package_path)):
            # is there a package init?
            module_path = os.path.join(package_path, '__init__.py')
            if not os.path.isfile(to_bytes(module_path)):
                module_path = os.path.join(package_path, '__synthetic__')
                has_code = False
        else:
            module_path = package_path + '.py'
            package_path = None
            if not os.path.isfile(to_bytes(module_path)):
                raise ImportError('{0} not found at {1}'.format(leaf_name, path))

        return module_path, has_code, package_path

    def exec_module(self, module):
        # short-circuit redirect; avoid reinitializing existing modules
        if self._redirect_module:
            return

        # execute the module's code in its namespace
        code_obj = self.get_code(self._fullname)
        if code_obj is not None:  # things like NS packages that can't have code on disk will return None
            exec(code_obj, module.__dict__)

    def create_module(self, spec):
        # short-circuit redirect; we've already imported the redirected module, so just alias it and return it
        if self._redirect_module:
            return self._redirect_module
        else:
            return None

    def load_module(self, fullname):
        # short-circuit redirect; we've already imported the redirected module, so just alias it and return it
        if self._redirect_module:
            sys.modules[self._fullname] = self._redirect_module
            return self._redirect_module

        # we're actually loading a module/package
        module_attrs = dict(
            __loader__=self,
            __file__=self.get_filename(fullname),
            __package__=self._parent_package_name  # sane default for non-packages
        )

        # eg, I am a package
        if self._subpackage_search_paths is not None:  # empty is legal
            module_attrs['__path__'] = self._subpackage_search_paths
            module_attrs['__package__'] = fullname  # per PEP366

        with self._new_or_existing_module(fullname, **module_attrs) as module:
            # execute the module's code in its namespace
            code_obj = self.get_code(fullname)
            if code_obj is not None:  # things like NS packages that can't have code on disk will return None
                exec(code_obj, module.__dict__)

            return module

    def is_package(self, fullname):
        if fullname != self._fullname:
            raise ValueError('this loader cannot answer is_package for {0}, only {1}'.format(fullname, self._fullname))
        return self._subpackage_search_paths is not None

    def get_source(self, fullname):
        if self._decoded_source:
            return self._decoded_source
        if fullname != self._fullname:
            raise ValueError('this loader cannot load source for {0}, only {1}'.format(fullname, self._fullname))
        if not self._source_code_path:
            return None
        # FIXME: what do we want encoding/newline requirements to be?
        self._decoded_source = self.get_data(self._source_code_path)
        return self._decoded_source

    def get_data(self, path):
        if not path:
            raise ValueError('a path must be specified')

        # TODO: ensure we're being asked for a path below something we own
        # TODO: try to handle redirects internally?

        if not path[0] == '/':
            # relative to current package, search package paths if possible (this may not be necessary)
            # candidate_paths = [os.path.join(ssp, path) for ssp in self._subpackage_search_paths]
            raise ValueError('relative resource paths not supported')
        else:
            candidate_paths = [path]

        for p in candidate_paths:
            b_path = to_bytes(p)
            if os.path.isfile(b_path):
                with open(b_path, 'rb') as fd:
                    return fd.read()
            # HACK: if caller asks for __init__.py and the parent dir exists, return empty string (this keep consistency
            # with "collection subpackages don't require __init__.py" working everywhere with get_data
            elif b_path.endswith(b'__init__.py') and os.path.isdir(os.path.dirname(b_path)):
                return ''

        return None

    def _synthetic_filename(self, fullname):
        return '<ansible_synthetic_collection_package>'

    def get_filename(self, fullname):
        if fullname != self._fullname:
            raise ValueError('this loader cannot find files for {0}, only {1}'.format(fullname, self._fullname))

        filename = self._source_code_path

        if not filename and self.is_package(fullname):
            if len(self._subpackage_search_paths) == 1:
                filename = os.path.join(self._subpackage_search_paths[0], '__synthetic__')
            else:
                filename = self._synthetic_filename(fullname)

        return filename

    def get_code(self, fullname):
        if self._compiled_code:
            return self._compiled_code

        # this may or may not be an actual filename, but it's the value we'll use for __file__
        filename = self.get_filename(fullname)
        if not filename:
            filename = '<string>'

        source_code = self.get_source(fullname)

        # for things like synthetic modules that really have no source on disk, don't return a code object at all
        # vs things like an empty package init (which has an empty string source on disk)
        if source_code is None:
            return None

        self._compiled_code = compile(source=source_code, filename=filename, mode='exec', flags=0, dont_inherit=True)

        return self._compiled_code

    def iter_modules(self, prefix):
        return _iter_modules_impl(self._subpackage_search_paths, prefix)

    def __repr__(self):
        return '{0}(path={1})'.format(self.__class__.__name__, self._subpackage_search_paths or self._source_code_path)


class _AnsibleCollectionRootPkgLoader(_AnsibleCollectionPkgLoaderBase):
    def _validate_args(self):
        super(_AnsibleCollectionRootPkgLoader, self)._validate_args()
        if len(self._split_name) != 1:
            raise ImportError('this loader can only load the ansible_collections toplevel package, not {0}'.format(self._fullname))


# Implements Ansible's custom namespace package support.
# The ansible_collections package and one level down (collections namespaces) are Python namespace packages
# that search across all configured collection roots. The collection package (two levels down) is the first one found
# on the configured collection root path, and Python namespace package aggregation is not allowed at or below
# the collection. Implements implicit package (package dir) support for both Py2/3. Package init code is ignored
# by this loader.
class _AnsibleCollectionNSPkgLoader(_AnsibleCollectionPkgLoaderBase):
    def _validate_args(self):
        super(_AnsibleCollectionNSPkgLoader, self)._validate_args()
        if len(self._split_name) != 2:
            raise ImportError('this loader can only load collections namespace packages, not {0}'.format(self._fullname))

    def _validate_final(self):
        # special-case the `ansible` namespace, since `ansible.builtin` is magical
        if not self._subpackage_search_paths and self._package_to_load != 'ansible':
            raise ImportError('no {0} found in {1}'.format(self._package_to_load, self._candidate_paths))


# handles locating the actual collection package and associated metadata
class _AnsibleCollectionPkgLoader(_AnsibleCollectionPkgLoaderBase):
    def _validate_args(self):
        super(_AnsibleCollectionPkgLoader, self)._validate_args()
        if len(self._split_name) != 3:
            raise ImportError('this loader can only load collection packages, not {0}'.format(self._fullname))

    def _validate_final(self):
        if self._split_name[1:3] == ['ansible', 'builtin']:
            # we don't want to allow this one to have on-disk search capability
            self._subpackage_search_paths = []
        elif not self._subpackage_search_paths:
            raise ImportError('no {0} found in {1}'.format(self._package_to_load, self._candidate_paths))
        else:
            # only search within the first collection we found
            self._subpackage_search_paths = [self._subpackage_search_paths[0]]

    def _load_module(self, module):
        if not _meta_yml_to_dict:
            raise ValueError('ansible.utils.collection_loader._meta_yml_to_dict is not set')

        module._collection_meta = {}
        # TODO: load collection metadata, cache in __loader__ state

        collection_name = '.'.join(self._split_name[1:3])

        if collection_name == 'ansible.builtin':
            # ansible.builtin is a synthetic collection, get its routing config from the Ansible distro
            ansible_pkg_path = os.path.dirname(import_module('ansible').__file__)
            metadata_path = os.path.join(ansible_pkg_path, 'config/ansible_builtin_runtime.yml')
            with open(to_bytes(metadata_path), 'rb') as fd:
                raw_routing = fd.read()
        else:
            b_routing_meta_path = to_bytes(os.path.join(module.__path__[0], 'meta/runtime.yml'))
            if os.path.isfile(b_routing_meta_path):
                with open(b_routing_meta_path, 'rb') as fd:
                    raw_routing = fd.read()
            else:
                raw_routing = ''
        try:
            if raw_routing:
                routing_dict = _meta_yml_to_dict(raw_routing, (collection_name, 'runtime.yml'))
                module._collection_meta = self._canonicalize_meta(routing_dict)
        except Exception as ex:
            raise ValueError('error parsing collection metadata: {0}'.format(to_native(ex)))

        AnsibleCollectionConfig.on_collection_load.fire(collection_name=collection_name, collection_path=os.path.dirname(module.__file__))

        return module

    def exec_module(self, module):
        super(_AnsibleCollectionPkgLoader, self).exec_module(module)
        self._load_module(module)

    def create_module(self, spec):
        return None

    def load_module(self, fullname):
        module = super(_AnsibleCollectionPkgLoader, self).load_module(fullname)
        return self._load_module(module)

    def _canonicalize_meta(self, meta_dict):
        # TODO: rewrite import keys and all redirect targets that start with .. (current namespace) and . (current collection)
        # OR we could do it all on the fly?
        # if not meta_dict:
        #     return {}
        #
        # ns_name = '.'.join(self._split_name[0:2])
        # collection_name = '.'.join(self._split_name[0:3])
        #
        # #
        # for routing_type, routing_type_dict in iteritems(meta_dict.get('plugin_routing', {})):
        #     for plugin_key, plugin_dict in iteritems(routing_type_dict):
        #         redirect = plugin_dict.get('redirect', '')
        #         if redirect.startswith('..'):
        #             redirect =  redirect[2:]

        return meta_dict


# loads everything under a collection, including handling redirections defined by the collection
class _AnsibleCollectionLoader(_AnsibleCollectionPkgLoaderBase):
    # HACK: stash this in a better place
    _redirected_package_map = {}  # type: dict[str, str]
    _allows_package_code = True

    def _validate_args(self):
        super(_AnsibleCollectionLoader, self)._validate_args()
        if len(self._split_name) < 4:
            raise ValueError('this loader is only for sub-collection modules/packages, not {0}'.format(self._fullname))

    def _get_candidate_paths(self, path_list):
        if len(path_list) != 1 and self._split_name[1:3] != ['ansible', 'builtin']:
            raise ValueError('this loader requires exactly one path to search')

        return path_list

    def _get_subpackage_search_paths(self, candidate_paths):
        collection_name = '.'.join(self._split_name[1:3])
        collection_meta = _get_collection_metadata(collection_name)

        # check for explicit redirection, as well as ancestor package-level redirection (only load the actual code once!)
        redirect = None
        explicit_redirect = False

        routing_entry = _nested_dict_get(collection_meta, ['import_redirection', self._fullname])
        if routing_entry:
            redirect = routing_entry.get('redirect')

        if redirect:
            explicit_redirect = True
        else:
            redirect = _get_ancestor_redirect(self._redirected_package_map, self._fullname)

        # NB: package level redirection requires hooking all future imports beneath the redirected source package
        # in order to ensure sanity on future relative imports. We always import everything under its "real" name,
        # then add a sys.modules entry with the redirected name using the same module instance. If we naively imported
        # the source for each redirection, most submodules would import OK, but we'd have N runtime copies of the module
        # (one for each name), and relative imports that ascend above the redirected package would break (since they'd
        # see the redirected ancestor package contents instead of the package where they actually live).
        if redirect:
            # FIXME: wrap this so we can be explicit about a failed redirection
            self._redirect_module = import_module(redirect)
            if explicit_redirect and hasattr(self._redirect_module, '__path__') and self._redirect_module.__path__:
                # if the import target looks like a package, store its name so we can rewrite future descendent loads
                self._redirected_package_map[self._fullname] = redirect

            # if we redirected, don't do any further custom package logic
            return None

        # we're not doing a redirect- try to find what we need to actually load a module/package

        # this will raise ImportError if we can't find the requested module/package at all
        if not candidate_paths:
            # noplace to look, just ImportError
            raise ImportError('package has no paths')

        found_path, has_code, package_path = self._module_file_from_path(self._package_to_load, candidate_paths[0])

        # still here? we found something to load...
        if has_code:
            self._source_code_path = found_path

        if package_path:
            return [package_path]  # always needs to be a list

        return None


# This loader only answers for intercepted Ansible Python modules. Normal imports will fail here and be picked up later
# by our path_hook importer (which proxies the built-in import mechanisms, allowing normal caching etc to occur)
class _AnsibleInternalRedirectLoader:
    def __init__(self, fullname, path_list):
        self._redirect = None

        split_name = fullname.split('.')
        toplevel_pkg = split_name[0]
        module_to_load = split_name[-1]

        if toplevel_pkg != 'ansible':
            raise ImportError('not interested')

        builtin_meta = _get_collection_metadata('ansible.builtin')

        routing_entry = _nested_dict_get(builtin_meta, ['import_redirection', fullname])
        if routing_entry:
            self._redirect = routing_entry.get('redirect')

        if not self._redirect:
            raise ImportError('not redirected, go ask path_hook')

    def exec_module(self, module):
        # should never see this
        if not self._redirect:
            raise ValueError('no redirect found for {0}'.format(module.__spec__.name))

        # Replace the module with the redirect
        sys.modules[module.__spec__.name] = import_module(self._redirect)

    def create_module(self, spec):
        return None

    def load_module(self, fullname):
        # since we're delegating to other loaders, this should only be called for internal redirects where we answered
        # find_module with this loader, in which case we'll just directly import the redirection target, insert it into
        # sys.modules under the name it was requested by, and return the original module.

        # should never see this
        if not self._redirect:
            raise ValueError('no redirect found for {0}'.format(fullname))

        # FIXME: smuggle redirection context, provide warning/error that we tried and failed to redirect
        mod = import_module(self._redirect)
        sys.modules[fullname] = mod
        return mod


class AnsibleCollectionRef:
    # FUTURE: introspect plugin loaders to get these dynamically?
    VALID_REF_TYPES = frozenset(to_text(r) for r in ['action', 'become', 'cache', 'callback', 'cliconf', 'connection',
                                                     'doc_fragments', 'filter', 'httpapi', 'inventory', 'lookup',
                                                     'module_utils', 'modules', 'netconf', 'role', 'shell', 'strategy',
                                                     'terminal', 'test', 'vars', 'playbook'])

    # FIXME: tighten this up to match Python identifier reqs, etc
    VALID_SUBDIRS_RE = re.compile(to_text(r'^\w+(\.\w+)*$'))
    VALID_FQCR_RE = re.compile(to_text(r'^\w+(\.\w+){2,}$'))  # can have 0-N included subdirs as well

    def __init__(self, collection_name, subdirs, resource, ref_type):
        """
        Create an AnsibleCollectionRef from components
        :param collection_name: a collection name of the form 'namespace.collectionname'
        :param subdirs: optional subdir segments to be appended below the plugin type (eg, 'subdir1.subdir2')
        :param resource: the name of the resource being references (eg, 'mymodule', 'someaction', 'a_role')
        :param ref_type: the type of the reference, eg 'module', 'role', 'doc_fragment'
        """
        collection_name = to_text(collection_name, errors='strict')
        if subdirs is not None:
            subdirs = to_text(subdirs, errors='strict')
        resource = to_text(resource, errors='strict')
        ref_type = to_text(ref_type, errors='strict')

        if not self.is_valid_collection_name(collection_name):
            raise ValueError('invalid collection name (must be of the form namespace.collection): {0}'.format(to_native(collection_name)))

        if ref_type not in self.VALID_REF_TYPES:
            raise ValueError('invalid collection ref_type: {0}'.format(ref_type))

        self.collection = collection_name
        if subdirs:
            if not re.match(self.VALID_SUBDIRS_RE, subdirs):
                raise ValueError('invalid subdirs entry: {0} (must be empty/None or of the form subdir1.subdir2)'.format(to_native(subdirs)))
            self.subdirs = subdirs
        else:
            self.subdirs = u''

        self.resource = resource
        self.ref_type = ref_type

        package_components = [u'ansible_collections', self.collection]
        fqcr_components = [self.collection]

        self.n_python_collection_package_name = to_native('.'.join(package_components))

        if self.ref_type == u'role':
            package_components.append(u'roles')
        elif self.ref_type == u'playbook':
            package_components.append(u'playbooks')
        else:
            # we assume it's a plugin
            package_components += [u'plugins', self.ref_type]

        if self.subdirs:
            package_components.append(self.subdirs)
            fqcr_components.append(self.subdirs)

        if self.ref_type in (u'role', u'playbook'):
            # playbooks and roles are their own resource
            package_components.append(self.resource)

        fqcr_components.append(self.resource)

        self.n_python_package_name = to_native('.'.join(package_components))
        self._fqcr = u'.'.join(fqcr_components)

    def __repr__(self):
        return 'AnsibleCollectionRef(collection={0!r}, subdirs={1!r}, resource={2!r})'.format(self.collection, self.subdirs, self.resource)

    @property
    def fqcr(self):
        return self._fqcr

    @staticmethod
    def from_fqcr(ref, ref_type):
        """
        Parse a string as a fully-qualified collection reference, raises ValueError if invalid
        :param ref: collection reference to parse (a valid ref is of the form 'ns.coll.resource' or 'ns.coll.subdir1.subdir2.resource')
        :param ref_type: the type of the reference, eg 'module', 'role', 'doc_fragment'
        :return: a populated AnsibleCollectionRef object
        """
        # assuming the fq_name is of the form (ns).(coll).(optional_subdir_N).(resource_name),
        # we split the resource name off the right, split ns and coll off the left, and we're left with any optional
        # subdirs that need to be added back below the plugin-specific subdir we'll add. So:
        # ns.coll.resource -> ansible_collections.ns.coll.plugins.(plugintype).resource
        # ns.coll.subdir1.resource -> ansible_collections.ns.coll.plugins.subdir1.(plugintype).resource
        # ns.coll.rolename -> ansible_collections.ns.coll.roles.rolename
        if not AnsibleCollectionRef.is_valid_fqcr(ref):
            raise ValueError('{0} is not a valid collection reference'.format(to_native(ref)))

        ref = to_text(ref, errors='strict')
        ref_type = to_text(ref_type, errors='strict')
        ext = ''

        if ref_type == u'playbook' and ref.endswith(PB_EXTENSIONS):
            resource_splitname = ref.rsplit(u'.', 2)
            package_remnant = resource_splitname[0]
            resource = resource_splitname[1]
            ext = '.' + resource_splitname[2]
        else:
            resource_splitname = ref.rsplit(u'.', 1)
            package_remnant = resource_splitname[0]
            resource = resource_splitname[1]

        # split the left two components of the collection package name off, anything remaining is plugin-type
        # specific subdirs to be added back on below the plugin type
        package_splitname = package_remnant.split(u'.', 2)
        if len(package_splitname) == 3:
            subdirs = package_splitname[2]
        else:
            subdirs = u''

        collection_name = u'.'.join(package_splitname[0:2])

        return AnsibleCollectionRef(collection_name, subdirs, resource + ext, ref_type)

    @staticmethod
    def try_parse_fqcr(ref, ref_type):
        """
        Attempt to parse a string as a fully-qualified collection reference, returning None on failure (instead of raising an error)
        :param ref: collection reference to parse (a valid ref is of the form 'ns.coll.resource' or 'ns.coll.subdir1.subdir2.resource')
        :param ref_type: the type of the reference, eg 'module', 'role', 'doc_fragment'
        :return: a populated AnsibleCollectionRef object on successful parsing, else None
        """
        try:
            return AnsibleCollectionRef.from_fqcr(ref, ref_type)
        except ValueError:
            pass

    @staticmethod
    def legacy_plugin_dir_to_plugin_type(legacy_plugin_dir_name):
        """
        Utility method to convert from a PluginLoader dir name to a plugin ref_type
        :param legacy_plugin_dir_name: PluginLoader dir name (eg, 'action_plugins', 'library')
        :return: the corresponding plugin ref_type (eg, 'action', 'role')
        """
        legacy_plugin_dir_name = to_text(legacy_plugin_dir_name)

        plugin_type = legacy_plugin_dir_name.removesuffix(u'_plugins')

        if plugin_type == u'library':
            plugin_type = u'modules'

        if plugin_type not in AnsibleCollectionRef.VALID_REF_TYPES:
            raise ValueError('{0} cannot be mapped to a valid collection ref type'.format(to_native(legacy_plugin_dir_name)))

        return plugin_type

    @staticmethod
    def is_valid_fqcr(ref, ref_type=None):
        """
        Validates if is string is a well-formed fully-qualified collection reference (does not look up the collection itself)
        :param ref: candidate collection reference to validate (a valid ref is of the form 'ns.coll.resource' or 'ns.coll.subdir1.subdir2.resource')
        :param ref_type: optional reference type to enable deeper validation, eg 'module', 'role', 'doc_fragment'
        :return: True if the collection ref passed is well-formed, False otherwise
        """

        ref = to_text(ref)

        if not ref_type:
            return bool(re.match(AnsibleCollectionRef.VALID_FQCR_RE, ref))

        return bool(AnsibleCollectionRef.try_parse_fqcr(ref, ref_type))

    @staticmethod
    def is_valid_collection_name(collection_name):
        """
        Validates if the given string is a well-formed collection name (does not look up the collection itself)
        :param collection_name: candidate collection name to validate (a valid name is of the form 'ns.collname')
        :return: True if the collection name passed is well-formed, False otherwise
        """

        collection_name = to_text(collection_name)

        if collection_name.count(u'.') != 1:
            return False

        return all(
            # NOTE: keywords and identifiers are different in different Pythons
            not iskeyword(ns_or_name) and is_python_identifier(ns_or_name)
            for ns_or_name in collection_name.split(u'.')
        )


def _get_collection_path(collection_name):
    collection_name = to_native(collection_name)
    if not collection_name or not isinstance(collection_name, string_types) or len(collection_name.split('.')) != 2:
        raise ValueError('collection_name must be a non-empty string of the form namespace.collection')
    try:
        collection_pkg = import_module('ansible_collections.' + collection_name)
    except ImportError:
        raise ValueError('unable to locate collection {0}'.format(collection_name))

    return to_native(os.path.dirname(to_bytes(collection_pkg.__file__)))


def _get_collection_playbook_path(playbook):

    acr = AnsibleCollectionRef.try_parse_fqcr(playbook, u'playbook')
    if acr:
        try:
            # get_collection_path
            pkg = import_module(acr.n_python_collection_package_name)
        except (IOError, ModuleNotFoundError) as e:
            # leaving e as debug target, even though not used in normal code
            pkg = None

        if pkg:
            cpath = os.path.join(sys.modules[acr.n_python_collection_package_name].__file__.replace('__synthetic__', 'playbooks'))

            if acr.subdirs:
                paths = [to_native(x) for x in acr.subdirs.split(u'.')]
                paths.insert(0, cpath)
                cpath = os.path.join(*paths)

            path = os.path.join(cpath, to_native(acr.resource))
            if os.path.exists(to_bytes(path)):
                return acr.resource, path, acr.collection
            elif not acr.resource.endswith(PB_EXTENSIONS):
                for ext in PB_EXTENSIONS:
                    path = os.path.join(cpath, to_native(acr.resource + ext))
                    if os.path.exists(to_bytes(path)):
                        return acr.resource, path, acr.collection
    return None


def _get_collection_role_path(role_name, collection_list=None):
    return _get_collection_resource_path(role_name, u'role', collection_list)


def _get_collection_resource_path(name, ref_type, collection_list=None):

    if ref_type == u'playbook':
        # they are handled a bit diff due to 'extension variance' and no collection_list
        return _get_collection_playbook_path(name)

    acr = AnsibleCollectionRef.try_parse_fqcr(name, ref_type)
    if acr:
        # looks like a valid qualified collection ref; skip the collection_list
        collection_list = [acr.collection]
        subdirs = acr.subdirs
        resource = acr.resource
    elif not collection_list:
        return None  # not a FQ and no collection search list spec'd, nothing to do
    else:
        resource = name  # treat as unqualified, loop through the collection search list to try and resolve
        subdirs = ''

    for collection_name in collection_list:
        try:
            acr = AnsibleCollectionRef(collection_name=collection_name, subdirs=subdirs, resource=resource, ref_type=ref_type)
            # FIXME: error handling/logging; need to catch any import failures and move along
            pkg = import_module(acr.n_python_package_name)

            if pkg is not None:
                # the package is now loaded, get the collection's package and ask where it lives
                path = os.path.dirname(to_bytes(sys.modules[acr.n_python_package_name].__file__, errors='surrogate_or_strict'))
                return resource, to_text(path, errors='surrogate_or_strict'), collection_name

        except (IOError, ModuleNotFoundError) as e:
            continue
        except Exception as ex:
            # FIXME: pick out typical import errors first, then error logging
            continue

    return None


def _get_collection_name_from_path(path):
    """
    Return the containing collection name for a given path, or None if the path is not below a configured collection, or
    the collection cannot be loaded (eg, the collection is masked by another of the same name higher in the configured
    collection roots).
    :param path: path to evaluate for collection containment
    :return: collection name or None
    """

    # ensure we compare full paths since pkg path will be abspath
    path = to_native(os.path.abspath(to_bytes(path)))

    path_parts = path.split('/')
    if path_parts.count('ansible_collections') != 1:
        return None

    ac_pos = path_parts.index('ansible_collections')

    # make sure it's followed by at least a namespace and collection name
    if len(path_parts) < ac_pos + 3:
        return None

    candidate_collection_name = '.'.join(path_parts[ac_pos + 1:ac_pos + 3])

    try:
        # we've got a name for it, now see if the path prefix matches what the loader sees
        imported_pkg_path = to_native(os.path.dirname(to_bytes(import_module('ansible_collections.' + candidate_collection_name).__file__)))
    except ImportError:
        return None

    # reassemble the original path prefix up the collection name, and it should match what we just imported. If not
    # this is probably a collection root that's not configured.

    original_path_prefix = os.path.join('/', *path_parts[0:ac_pos + 3])

    imported_pkg_path = to_native(os.path.abspath(to_bytes(imported_pkg_path)))
    if original_path_prefix != imported_pkg_path:
        return None

    return candidate_collection_name


def _get_import_redirect(collection_meta_dict, fullname):
    if not collection_meta_dict:
        return None

    return _nested_dict_get(collection_meta_dict, ['import_redirection', fullname, 'redirect'])


def _get_ancestor_redirect(redirected_package_map, fullname):
    # walk the requested module's ancestor packages to see if any have been previously redirected
    cur_pkg = fullname
    while cur_pkg:
        cur_pkg = cur_pkg.rpartition('.')[0]
        ancestor_redirect = redirected_package_map.get(cur_pkg)
        if ancestor_redirect:
            # rewrite the prefix on fullname so we import the target first, then alias it
            redirect = ancestor_redirect + fullname[len(cur_pkg):]
            return redirect
    return None


def _nested_dict_get(root_dict, key_list):
    cur_value = root_dict
    for key in key_list:
        cur_value = cur_value.get(key)
        if not cur_value:
            return None

    return cur_value


def _iter_modules_impl(paths, prefix=''):
    # NB: this currently only iterates what's on disk- redirected modules are not considered
    if not prefix:
        prefix = ''
    else:
        prefix = to_native(prefix)
    # yield (module_loader, name, ispkg) for each module/pkg under path
    # TODO: implement ignore/silent catch for unreadable?
    for b_path in map(to_bytes, paths):
        if not os.path.isdir(b_path):
            continue
        for b_basename in sorted(os.listdir(b_path)):
            b_candidate_module_path = os.path.join(b_path, b_basename)
            if os.path.isdir(b_candidate_module_path):
                # exclude things that obviously aren't Python package dirs
                # FIXME: this dir is adjustable in py3.8+, check for it
                if b'.' in b_basename or b_basename == b'__pycache__':
                    continue

                # TODO: proper string handling?
                yield prefix + to_native(b_basename), True
            else:
                # FIXME: match builtin ordering for package/dir/file, support compiled?
                if b_basename.endswith(b'.py') and b_basename != b'__init__.py':
                    yield prefix + to_native(os.path.splitext(b_basename)[0]), False


def _get_collection_metadata(collection_name):
    collection_name = to_native(collection_name)
    if not collection_name or not isinstance(collection_name, string_types) or len(collection_name.split('.')) != 2:
        raise ValueError('collection_name must be a non-empty string of the form namespace.collection')

    try:
        collection_pkg = import_module('ansible_collections.' + collection_name)
    except ImportError:
        raise ValueError('unable to locate collection {0}'.format(collection_name))

    _collection_meta = getattr(collection_pkg, '_collection_meta', None)

    if _collection_meta is None:
        raise ValueError('collection metadata was not loaded for collection {0}'.format(collection_name))

    return _collection_meta

Anon7 - 2022
AnonSec Team