Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.15.198.120
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /usr/lib/python3/dist-packages/ansible/galaxy/dependency_resolution/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /usr/lib/python3/dist-packages/ansible/galaxy/dependency_resolution/dataclasses.py
# -*- coding: utf-8 -*-
# Copyright: (c) 2020-2021, Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
"""Dependency structs."""
# FIXME: add caching all over the place

from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

import os
import typing as t

from collections import namedtuple
from collections.abc import MutableSequence, MutableMapping
from glob import iglob
from urllib.parse import urlparse
from yaml import safe_load

if t.TYPE_CHECKING:
    from ansible.galaxy.collection.concrete_artifact_manager import (
        ConcreteArtifactsManager,
    )
    Collection = t.TypeVar(
        'Collection',
        'Candidate', 'Requirement',
        '_ComputedReqKindsMixin',
    )


from ansible.errors import AnsibleError, AnsibleAssertionError
from ansible.galaxy.api import GalaxyAPI
from ansible.module_utils._text import to_bytes, to_native, to_text
from ansible.module_utils.common.arg_spec import ArgumentSpecValidator
from ansible.utils.collection_loader import AnsibleCollectionRef
from ansible.utils.display import Display


_ALLOW_CONCRETE_POINTER_IN_SOURCE = False  # NOTE: This is a feature flag
_GALAXY_YAML = b'galaxy.yml'
_MANIFEST_JSON = b'MANIFEST.json'
_SOURCE_METADATA_FILE = b'GALAXY.yml'

display = Display()


def get_validated_source_info(b_source_info_path, namespace, name, version):
    source_info_path = to_text(b_source_info_path, errors='surrogate_or_strict')

    if not os.path.isfile(b_source_info_path):
        return None

    try:
        with open(b_source_info_path, mode='rb') as fd:
            metadata = safe_load(fd)
    except OSError as e:
        display.warning(
            f"Error getting collection source information at '{source_info_path}': {to_text(e, errors='surrogate_or_strict')}"
        )
        return None

    if not isinstance(metadata, MutableMapping):
        display.warning(f"Error getting collection source information at '{source_info_path}': expected a YAML dictionary")
        return None

    schema_errors = _validate_v1_source_info_schema(namespace, name, version, metadata)
    if schema_errors:
        display.warning(f"Ignoring source metadata file at {source_info_path} due to the following errors:")
        display.warning("\n".join(schema_errors))
        display.warning("Correct the source metadata file by reinstalling the collection.")
        return None

    return metadata


def _validate_v1_source_info_schema(namespace, name, version, provided_arguments):
    argument_spec_data = dict(
        format_version=dict(choices=["1.0.0"]),
        download_url=dict(),
        version_url=dict(),
        server=dict(),
        signatures=dict(
            type=list,
            suboptions=dict(
                signature=dict(),
                pubkey_fingerprint=dict(),
                signing_service=dict(),
                pulp_created=dict(),
            )
        ),
        name=dict(choices=[name]),
        namespace=dict(choices=[namespace]),
        version=dict(choices=[version]),
    )

    if not isinstance(provided_arguments, dict):
        raise AnsibleError(
            f'Invalid offline source info for {namespace}.{name}:{version}, expected a dict and got {type(provided_arguments)}'
        )
    validator = ArgumentSpecValidator(argument_spec_data)
    validation_result = validator.validate(provided_arguments)

    return validation_result.error_messages


def _is_collection_src_dir(dir_path):
    b_dir_path = to_bytes(dir_path, errors='surrogate_or_strict')
    return os.path.isfile(os.path.join(b_dir_path, _GALAXY_YAML))


def _is_installed_collection_dir(dir_path):
    b_dir_path = to_bytes(dir_path, errors='surrogate_or_strict')
    return os.path.isfile(os.path.join(b_dir_path, _MANIFEST_JSON))


def _is_collection_dir(dir_path):
    return (
        _is_installed_collection_dir(dir_path) or
        _is_collection_src_dir(dir_path)
    )


def _find_collections_in_subdirs(dir_path):
    b_dir_path = to_bytes(dir_path, errors='surrogate_or_strict')

    subdir_glob_pattern = os.path.join(
        b_dir_path,
        # b'*',  # namespace is supposed to be top-level per spec
        b'*',  # collection name
    )

    for subdir in iglob(subdir_glob_pattern):
        if os.path.isfile(os.path.join(subdir, _MANIFEST_JSON)):
            yield subdir
        elif os.path.isfile(os.path.join(subdir, _GALAXY_YAML)):
            yield subdir


def _is_collection_namespace_dir(tested_str):
    return any(_find_collections_in_subdirs(tested_str))


def _is_file_path(tested_str):
    return os.path.isfile(to_bytes(tested_str, errors='surrogate_or_strict'))


def _is_http_url(tested_str):
    return urlparse(tested_str).scheme.lower() in {'http', 'https'}


def _is_git_url(tested_str):
    return tested_str.startswith(('git+', 'git@'))


def _is_concrete_artifact_pointer(tested_str):
    return any(
        predicate(tested_str)
        for predicate in (
            # NOTE: Maintain the checks to be sorted from light to heavy:
            _is_git_url,
            _is_http_url,
            _is_file_path,
            _is_collection_dir,
            _is_collection_namespace_dir,
        )
    )


class _ComputedReqKindsMixin:
    UNIQUE_ATTRS = ('fqcn', 'ver', 'src', 'type')

    def __init__(self, *args, **kwargs):
        if not self.may_have_offline_galaxy_info:
            self._source_info = None
        else:
            info_path = self.construct_galaxy_info_path(to_bytes(self.src, errors='surrogate_or_strict'))

            self._source_info = get_validated_source_info(
                info_path,
                self.namespace,
                self.name,
                self.ver
            )

    def __hash__(self):
        return hash(tuple(getattr(self, attr) for attr in _ComputedReqKindsMixin.UNIQUE_ATTRS))

    def __eq__(self, candidate):
        return hash(self) == hash(candidate)

    @classmethod
    def from_dir_path_as_unknown(  # type: ignore[misc]
            cls,  # type: t.Type[Collection]
            dir_path,  # type: bytes
            art_mgr,  # type: ConcreteArtifactsManager
    ):  # type: (...)  -> Collection
        """Make collection from an unspecified dir type.

        This alternative constructor attempts to grab metadata from the
        given path if it's a directory. If there's no metadata, it
        falls back to guessing the FQCN based on the directory path and
        sets the version to "*".

        It raises a ValueError immediately if the input is not an
        existing directory path.
        """
        if not os.path.isdir(dir_path):
            raise ValueError(
                "The collection directory '{path!s}' doesn't exist".
                format(path=to_native(dir_path)),
            )

        try:
            return cls.from_dir_path(dir_path, art_mgr)
        except ValueError:
            return cls.from_dir_path_implicit(dir_path)

    @classmethod
    def from_dir_path(cls, dir_path, art_mgr):
        """Make collection from an directory with metadata."""
        b_dir_path = to_bytes(dir_path, errors='surrogate_or_strict')
        if not _is_collection_dir(b_dir_path):
            display.warning(
                u"Collection at '{path!s}' does not have a {manifest_json!s} "
                u'file, nor has it {galaxy_yml!s}: cannot detect version.'.
                format(
                    galaxy_yml=to_text(_GALAXY_YAML),
                    manifest_json=to_text(_MANIFEST_JSON),
                    path=to_text(dir_path, errors='surrogate_or_strict'),
                ),
            )
            raise ValueError(
                '`dir_path` argument must be an installed or a source'
                ' collection directory.',
            )

        tmp_inst_req = cls(None, None, dir_path, 'dir', None)
        req_version = art_mgr.get_direct_collection_version(tmp_inst_req)
        try:
            req_name = art_mgr.get_direct_collection_fqcn(tmp_inst_req)
        except TypeError as err:
            # Looks like installed/source dir but isn't: doesn't have valid metadata.
            display.warning(
                u"Collection at '{path!s}' has a {manifest_json!s} "
                u"or {galaxy_yml!s} file but it contains invalid metadata.".
                format(
                    galaxy_yml=to_text(_GALAXY_YAML),
                    manifest_json=to_text(_MANIFEST_JSON),
                    path=to_text(dir_path, errors='surrogate_or_strict'),
                ),
            )
            raise ValueError(
                "Collection at '{path!s}' has invalid metadata".
                format(path=to_text(dir_path, errors='surrogate_or_strict'))
            ) from err

        return cls(req_name, req_version, dir_path, 'dir', None)

    @classmethod
    def from_dir_path_implicit(  # type: ignore[misc]
            cls,  # type: t.Type[Collection]
            dir_path,  # type: bytes
    ):  # type: (...)  -> Collection
        """Construct a collection instance based on an arbitrary dir.

        This alternative constructor infers the FQCN based on the parent
        and current directory names. It also sets the version to "*"
        regardless of whether any of known metadata files are present.
        """
        # There is no metadata, but it isn't required for a functional collection. Determine the namespace.name from the path.
        u_dir_path = to_text(dir_path, errors='surrogate_or_strict')
        path_list = u_dir_path.split(os.path.sep)
        req_name = '.'.join(path_list[-2:])
        return cls(req_name, '*', dir_path, 'dir', None)  # type: ignore[call-arg]

    @classmethod
    def from_string(cls, collection_input, artifacts_manager, supplemental_signatures):
        req = {}
        if _is_concrete_artifact_pointer(collection_input):
            # Arg is a file path or URL to a collection
            req['name'] = collection_input
        else:
            req['name'], _sep, req['version'] = collection_input.partition(':')
            if not req['version']:
                del req['version']
        req['signatures'] = supplemental_signatures

        return cls.from_requirement_dict(req, artifacts_manager)

    @classmethod
    def from_requirement_dict(cls, collection_req, art_mgr, validate_signature_options=True):
        req_name = collection_req.get('name', None)
        req_version = collection_req.get('version', '*')
        req_type = collection_req.get('type')
        # TODO: decide how to deprecate the old src API behavior
        req_source = collection_req.get('source', None)
        req_signature_sources = collection_req.get('signatures', None)
        if req_signature_sources is not None:
            if validate_signature_options and art_mgr.keyring is None:
                raise AnsibleError(
                    f"Signatures were provided to verify {req_name} but no keyring was configured."
                )

            if not isinstance(req_signature_sources, MutableSequence):
                req_signature_sources = [req_signature_sources]
            req_signature_sources = frozenset(req_signature_sources)

        if req_type is None:
            if (  # FIXME: decide on the future behavior:
                    _ALLOW_CONCRETE_POINTER_IN_SOURCE
                    and req_source is not None
                    and _is_concrete_artifact_pointer(req_source)
            ):
                src_path = req_source
            elif (
                    req_name is not None
                    and AnsibleCollectionRef.is_valid_collection_name(req_name)
            ):
                req_type = 'galaxy'
            elif (
                    req_name is not None
                    and _is_concrete_artifact_pointer(req_name)
            ):
                src_path, req_name = req_name, None
            else:
                dir_tip_tmpl = (  # NOTE: leading LFs are for concat
                    '\n\nTip: Make sure you are pointing to the right '
                    'subdirectory — `{src!s}` looks like a directory '
                    'but it is neither a collection, nor a namespace '
                    'dir.'
                )

                if req_source is not None and os.path.isdir(req_source):
                    tip = dir_tip_tmpl.format(src=req_source)
                elif req_name is not None and os.path.isdir(req_name):
                    tip = dir_tip_tmpl.format(src=req_name)
                elif req_name:
                    tip = '\n\nCould not find {0}.'.format(req_name)
                else:
                    tip = ''

                raise AnsibleError(  # NOTE: I'd prefer a ValueError instead
                    'Neither the collection requirement entry key '
                    "'name', nor 'source' point to a concrete "
                    "resolvable collection artifact. Also 'name' is "
                    'not an FQCN. A valid collection name must be in '
                    'the format <namespace>.<collection>. Please make '
                    'sure that the namespace and the collection name '
                    'contain characters from [a-zA-Z0-9_] only.'
                    '{extra_tip!s}'.format(extra_tip=tip),
                )

        if req_type is None:
            if _is_git_url(src_path):
                req_type = 'git'
                req_source = src_path
            elif _is_http_url(src_path):
                req_type = 'url'
                req_source = src_path
            elif _is_file_path(src_path):
                req_type = 'file'
                req_source = src_path
            elif _is_collection_dir(src_path):
                if _is_installed_collection_dir(src_path) and _is_collection_src_dir(src_path):
                    # Note that ``download`` requires a dir with a ``galaxy.yml`` and fails if it
                    # doesn't exist, but if a ``MANIFEST.json`` also exists, it would be used
                    # instead of the ``galaxy.yml``.
                    raise AnsibleError(
                        u"Collection requirement at '{path!s}' has both a {manifest_json!s} "
                        u"file and a {galaxy_yml!s}.\nThe requirement must either be an installed "
                        u"collection directory or a source collection directory, not both.".
                        format(
                            path=to_text(src_path, errors='surrogate_or_strict'),
                            manifest_json=to_text(_MANIFEST_JSON),
                            galaxy_yml=to_text(_GALAXY_YAML),
                        )
                    )
                req_type = 'dir'
                req_source = src_path
            elif _is_collection_namespace_dir(src_path):
                req_name = None  # No name for a virtual req or "namespace."?
                req_type = 'subdirs'
                req_source = src_path
            else:
                raise AnsibleError(  # NOTE: this is never supposed to be hit
                    'Failed to automatically detect the collection '
                    'requirement type.',
                )

        if req_type not in {'file', 'galaxy', 'git', 'url', 'dir', 'subdirs'}:
            raise AnsibleError(
                "The collection requirement entry key 'type' must be "
                'one of file, galaxy, git, dir, subdirs, or url.'
            )

        if req_name is None and req_type == 'galaxy':
            raise AnsibleError(
                'Collections requirement entry should contain '
                "the key 'name' if it's requested from a Galaxy-like "
                'index server.',
            )

        if req_type != 'galaxy' and req_source is None:
            req_source, req_name = req_name, None

        if (
                req_type == 'galaxy' and
                isinstance(req_source, GalaxyAPI) and
                not _is_http_url(req_source.api_server)
        ):
            raise AnsibleError(
                "Collections requirement 'source' entry should contain "
                'a valid Galaxy API URL but it does not: {not_url!s} '
                'is not an HTTP URL.'.
                format(not_url=req_source.api_server),
            )

        tmp_inst_req = cls(req_name, req_version, req_source, req_type, req_signature_sources)

        if req_type not in {'galaxy', 'subdirs'} and req_name is None:
            req_name = art_mgr.get_direct_collection_fqcn(tmp_inst_req)  # TODO: fix the cache key in artifacts manager?

        if req_type not in {'galaxy', 'subdirs'} and req_version == '*':
            req_version = art_mgr.get_direct_collection_version(tmp_inst_req)

        return cls(
            req_name, req_version,
            req_source, req_type,
            req_signature_sources,
        )

    def __repr__(self):
        return (
            '<{self!s} of type {coll_type!r} from {src!s}>'.
            format(self=self, coll_type=self.type, src=self.src or 'Galaxy')
        )

    def __str__(self):
        return to_native(self.__unicode__())

    def __unicode__(self):
        if self.fqcn is None:
            return (
                f'{self.type} collection from a Git repo' if self.is_scm
                else f'{self.type} collection from a namespace'
            )

        return (
            u'{fqcn!s}:{ver!s}'.
            format(fqcn=to_text(self.fqcn), ver=to_text(self.ver))
        )

    @property
    def may_have_offline_galaxy_info(self):
        if self.fqcn is None:
            # Virtual collection
            return False
        elif not self.is_dir or self.src is None or not _is_collection_dir(self.src):
            # Not a dir or isn't on-disk
            return False
        return True

    def construct_galaxy_info_path(self, b_collection_path):
        if not self.may_have_offline_galaxy_info and not self.type == 'galaxy':
            raise TypeError('Only installed collections from a Galaxy server have offline Galaxy info')

        # Store Galaxy metadata adjacent to the namespace of the collection
        # Chop off the last two parts of the path (/ns/coll) to get the dir containing the ns
        b_src = to_bytes(b_collection_path, errors='surrogate_or_strict')
        b_path_parts = b_src.split(to_bytes(os.path.sep))[0:-2]
        b_metadata_dir = to_bytes(os.path.sep).join(b_path_parts)

        # ns.coll-1.0.0.info
        b_dir_name = to_bytes(f"{self.namespace}.{self.name}-{self.ver}.info", errors="surrogate_or_strict")

        # collections/ansible_collections/ns.coll-1.0.0.info/GALAXY.yml
        return os.path.join(b_metadata_dir, b_dir_name, _SOURCE_METADATA_FILE)

    def _get_separate_ns_n_name(self):  # FIXME: use LRU cache
        return self.fqcn.split('.')

    @property
    def namespace(self):
        if self.is_virtual:
            raise TypeError(f'{self.type} collections do not have a namespace')

        return self._get_separate_ns_n_name()[0]

    @property
    def name(self):
        if self.is_virtual:
            raise TypeError(f'{self.type} collections do not have a name')

        return self._get_separate_ns_n_name()[-1]

    @property
    def canonical_package_id(self):
        if not self.is_virtual:
            return to_native(self.fqcn)

        return (
            '<virtual namespace from {src!s} of type {src_type!s}>'.
            format(src=to_native(self.src), src_type=to_native(self.type))
        )

    @property
    def is_virtual(self):
        return self.is_scm or self.is_subdirs

    @property
    def is_file(self):
        return self.type == 'file'

    @property
    def is_dir(self):
        return self.type == 'dir'

    @property
    def namespace_collection_paths(self):
        return [
            to_native(path)
            for path in _find_collections_in_subdirs(self.src)
        ]

    @property
    def is_subdirs(self):
        return self.type == 'subdirs'

    @property
    def is_url(self):
        return self.type == 'url'

    @property
    def is_scm(self):
        return self.type == 'git'

    @property
    def is_concrete_artifact(self):
        return self.type in {'git', 'url', 'file', 'dir', 'subdirs'}

    @property
    def is_online_index_pointer(self):
        return not self.is_concrete_artifact

    @property
    def source_info(self):
        return self._source_info


RequirementNamedTuple = namedtuple('Requirement', ('fqcn', 'ver', 'src', 'type', 'signature_sources'))  # type: ignore[name-match]


CandidateNamedTuple = namedtuple('Candidate', ('fqcn', 'ver', 'src', 'type', 'signatures'))  # type: ignore[name-match]


class Requirement(
        _ComputedReqKindsMixin,
        RequirementNamedTuple,
):
    """An abstract requirement request."""

    def __new__(cls, *args, **kwargs):
        self = RequirementNamedTuple.__new__(cls, *args, **kwargs)
        return self

    def __init__(self, *args, **kwargs):
        super(Requirement, self).__init__()


class Candidate(
        _ComputedReqKindsMixin,
        CandidateNamedTuple,
):
    """A concrete collection candidate with its version resolved."""

    def __new__(cls, *args, **kwargs):
        self = CandidateNamedTuple.__new__(cls, *args, **kwargs)
        return self

    def __init__(self, *args, **kwargs):
        super(Candidate, self).__init__()

    def with_signatures_repopulated(self):  # type: (Candidate) -> Candidate
        """Populate a new Candidate instance with Galaxy signatures.
        :raises AnsibleAssertionError: If the supplied candidate is not sourced from a Galaxy-like index.
        """
        if self.type != 'galaxy':
            raise AnsibleAssertionError(f"Invalid collection type for {self!r}: unable to get signatures from a galaxy server.")

        signatures = self.src.get_collection_signatures(self.namespace, self.name, self.ver)
        return self.__class__(self.fqcn, self.ver, self.src, self.type, frozenset([*self.signatures, *signatures]))

Anon7 - 2022
AnonSec Team