Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.129.72.26
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /usr/lib/python3/dist-packages/ansible_test/_internal/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /usr/lib/python3/dist-packages/ansible_test/_internal/completion.py
"""Loading, parsing and storing of completion configurations."""
from __future__ import annotations

import abc
import dataclasses
import enum
import os
import typing as t

from .constants import (
    CONTROLLER_PYTHON_VERSIONS,
    SUPPORTED_PYTHON_VERSIONS,
)

from .util import (
    ANSIBLE_TEST_DATA_ROOT,
    cache,
    read_lines_without_comments,
)

from .data import (
    data_context,
)

from .become import (
    SUPPORTED_BECOME_METHODS,
)


class CGroupVersion(enum.Enum):
    """The control group (cgroup) version requirement declared for a container."""

    NONE = 'none'
    V1_ONLY = 'v1-only'
    V2_ONLY = 'v2-only'
    V1_V2 = 'v1-v2'

    def __repr__(self) -> str:
        """Return a compact ``ClassName.MEMBER`` representation which omits the member value."""
        owner = type(self).__name__
        return '.'.join((owner, self.name))


class AuditMode(enum.Enum):
    """The audit requirement declared for a container."""

    NONE = 'none'
    REQUIRED = 'required'

    def __repr__(self) -> str:
        """Return a compact ``ClassName.MEMBER`` representation which omits the member value."""
        owner = type(self).__name__
        return '.'.join((owner, self.name))


@dataclasses.dataclass(frozen=True)
class CompletionConfig(metaclass=abc.ABCMeta):
    """
    Base class for completion configuration.

    This is a frozen dataclass, so fields cannot be reassigned after construction.
    Concrete subclasses must implement the abstract `is_default` property.
    """

    # The name of the completion entry.
    name: str

    @property
    @abc.abstractmethod
    def is_default(self) -> bool:
        """True if the completion entry is only used for defaults, otherwise False."""


@dataclasses.dataclass(frozen=True)
class PosixCompletionConfig(CompletionConfig, metaclass=abc.ABCMeta):
    """Abstract completion configuration shared by POSIX environments."""

    @property
    @abc.abstractmethod
    def supported_pythons(self) -> list[str]:
        """The Python versions supported by this entry, as a list of version strings."""

    @abc.abstractmethod
    def get_python_path(self, version: str) -> str:
        """Return the interpreter path for the requested Python version."""

    def get_default_python(self, controller: bool) -> str:
        """
        Return the default Python version for the controller (if `controller` is True) or for a target.

        The default is the first entry of `supported_pythons` which is also permitted in the requested context.
        An IndexError is raised if no supported version is permitted in that context.
        """
        if controller:
            allowed_versions = CONTROLLER_PYTHON_VERSIONS
        else:
            allowed_versions = SUPPORTED_PYTHON_VERSIONS

        candidates = [candidate for candidate in self.supported_pythons if candidate in allowed_versions]

        return candidates[0]

    @property
    def controller_supported(self) -> bool:
        """True if any supported Python version can be used for the controller, otherwise False."""
        for candidate in self.supported_pythons:
            if candidate in CONTROLLER_PYTHON_VERSIONS:
                return True

        return False


@dataclasses.dataclass(frozen=True)
class PythonCompletionConfig(PosixCompletionConfig, metaclass=abc.ABCMeta):
    """Abstract completion configuration for environments which provide Python interpreters."""

    # Comma separated list of Python versions; an empty string means none were specified.
    python: str = ''
    # Directory in which the `python{version}` interpreters are looked up.
    python_dir: str = '/usr/bin'

    @property
    def supported_pythons(self) -> list[str]:
        """The configured Python versions, limited to those listed in SUPPORTED_PYTHON_VERSIONS."""
        if not self.python:
            return []

        return [candidate for candidate in self.python.split(',') if candidate in SUPPORTED_PYTHON_VERSIONS]

    def get_python_path(self, version: str) -> str:
        """Return the interpreter path for the requested Python version, located under `python_dir`."""
        interpreter = f'python{version}'
        return os.path.join(self.python_dir, interpreter)


@dataclasses.dataclass(frozen=True)
class RemoteCompletionConfig(CompletionConfig):
    """
    Base class for completion configuration of remote environments provisioned through Ansible Core CI.

    The entry name has the form `platform/version`. An entry without a version part is a platform default.
    """

    provider: t.Optional[str] = None
    arch: t.Optional[str] = None

    @property
    def platform(self) -> str:
        """The platform name, which is the part of the entry name before the first slash."""
        platform_name, _separator, _version = self.name.partition('/')
        return platform_name

    @property
    def version(self) -> str:
        """The platform version, which is the part of the entry name after the first slash (empty if there is none)."""
        _platform_name, _separator, platform_version = self.name.partition('/')
        return platform_version

    @property
    def is_default(self) -> bool:
        """True if the completion entry is only used for defaults (it has no version), otherwise False."""
        return self.version == ''

    def __post_init__(self) -> None:
        """Verify the required `provider` and `arch` settings were given, raising an exception otherwise."""
        provider_missing = not self.provider
        if provider_missing:
            raise Exception(f'Remote completion entry "{self.name}" must provide a "provider" setting.')

        arch_missing = not self.arch
        if arch_missing:
            raise Exception(f'Remote completion entry "{self.name}" must provide a "arch" setting.')


@dataclasses.dataclass(frozen=True)
class InventoryCompletionConfig(CompletionConfig):
    """
    Configuration for inventory files.

    This entry takes no settings; its name is always 'inventory'.
    """

    def __init__(self) -> None:
        """Create the entry using the fixed name 'inventory'."""
        super().__init__(name='inventory')

    @property
    def is_default(self) -> bool:
        """True if the completion entry is only used for defaults, otherwise False."""
        # An inventory entry is never a defaults-only entry.
        return False


@dataclasses.dataclass(frozen=True)
class PosixSshCompletionConfig(PythonCompletionConfig):
    """Completion configuration for a POSIX host which is reached over SSH."""

    def __init__(self, user: str, host: str) -> None:
        """Create an entry named `user@host` which lists every version in SUPPORTED_PYTHON_VERSIONS."""
        entry_name = f'{user}@{host}'
        all_pythons = ','.join(SUPPORTED_PYTHON_VERSIONS)

        super().__init__(name=entry_name, python=all_pythons)

    @property
    def is_default(self) -> bool:
        """Always False, since an SSH host entry is never a defaults-only entry."""
        return False


@dataclasses.dataclass(frozen=True)
class DockerCompletionConfig(PythonCompletionConfig):
    """
    Configuration for Docker containers.

    The `audit` and `cgroup` settings are stored as plain strings and converted to enums on demand through the
    `audit_enum` and `cgroup_enum` properties. Both conversions are checked when the instance is created.
    """

    image: str = ''
    seccomp: str = 'default'
    cgroup: str = CGroupVersion.V1_V2.value
    audit: str = AuditMode.REQUIRED.value  # most containers need this, so the default is required, leaving it to be opt-out for containers which don't need it
    placeholder: bool = False

    @property
    def is_default(self) -> bool:
        """True if the completion entry is only used for defaults, otherwise False."""
        return False

    @property
    def audit_enum(self) -> AuditMode:
        """The audit requirements for the container. Raises an exception if the value is invalid."""
        try:
            return AuditMode(self.audit)
        except ValueError:
            # Replace the generic enum error with one which names the offending completion entry and setting.
            raise ValueError(f'Docker completion entry "{self.name}" has an invalid value "{self.audit}" for the "audit" setting.') from None

    @property
    def cgroup_enum(self) -> CGroupVersion:
        """The control group version(s) required by the container. Raises an exception if the value is invalid."""
        try:
            return CGroupVersion(self.cgroup)
        except ValueError:
            # Replace the generic enum error with one which names the offending completion entry and setting.
            raise ValueError(f'Docker completion entry "{self.name}" has an invalid value "{self.cgroup}" for the "cgroup" setting.') from None

    def __post_init__(self) -> None:
        """
        Validate the settings of a newly created entry.

        Raises an Exception if the `image` setting is missing, or if no supported Python version is configured
        and the entry is not a placeholder.
        Raises a ValueError if the `audit` or `cgroup` setting cannot be converted to its enum.
        """
        if not self.image:
            raise Exception(f'Docker completion entry "{self.name}" must provide an "image" setting.')

        if not self.supported_pythons and not self.placeholder:
            raise Exception(f'Docker completion entry "{self.name}" must provide a "python" setting.')

        # Verify the properties can be correctly parsed to enums.
        # Accessing each property performs the conversion and raises ValueError on an invalid value.
        # Plain assignments are used instead of `assert`, since assert statements are removed when
        # Python runs with -O, which would silently skip this validation.
        _ = self.audit_enum
        _ = self.cgroup_enum


@dataclasses.dataclass(frozen=True)
class NetworkRemoteCompletionConfig(RemoteCompletionConfig):
    """Completion configuration for remote network platforms."""

    collection: str = ''
    connection: str = ''
    placeholder: bool = False

    def __post_init__(self) -> None:
        """Run the base class validation, unless this entry is a placeholder."""
        if self.placeholder:
            # Placeholder entries are exempt from the base class checks.
            return

        super().__post_init__()


@dataclasses.dataclass(frozen=True)
class PosixRemoteCompletionConfig(RemoteCompletionConfig, PythonCompletionConfig):
    """Completion configuration for remote POSIX platforms."""

    become: t.Optional[str] = None
    placeholder: bool = False

    def __post_init__(self) -> None:
        """
        Validate the settings of a newly created entry, raising an exception on the first problem found.

        Placeholder entries skip the base class validation.
        The `become` setting must be omitted or be one of SUPPORTED_BECOME_METHODS.
        Entries with a version must provide a `python` setting (unless they are placeholders), while
        platform defaults (entries without a version) must not provide one.
        """
        if not self.placeholder:
            super().__post_init__()

        become_is_acceptable = not self.become or self.become in SUPPORTED_BECOME_METHODS

        if not become_is_acceptable:
            raise Exception(f'POSIX remote completion entry "{self.name}" setting "become" must be omitted or one of: {", ".join(SUPPORTED_BECOME_METHODS)}')

        has_pythons = bool(self.supported_pythons)
        has_version = bool(self.version)

        if has_version and not has_pythons and not self.placeholder:
            raise Exception(f'POSIX remote completion entry "{self.name}" must provide a "python" setting.')

        if has_pythons and not has_version:
            raise Exception(f'POSIX remote completion entry "{self.name}" is a platform default and cannot provide a "python" setting.')


@dataclasses.dataclass(frozen=True)
class WindowsRemoteCompletionConfig(RemoteCompletionConfig):
    """
    Configuration for remote Windows platforms.

    No fields or validation are added beyond those inherited from RemoteCompletionConfig.
    """


# Type variable bound to CompletionConfig, used by the functions below so that the dictionaries
# they return are typed with the same completion class they were given.
TCompletionConfig = t.TypeVar('TCompletionConfig', bound=CompletionConfig)


def load_completion(name: str, completion_type: t.Type[TCompletionConfig]) -> dict[str, TCompletionConfig]:
    """
    Load the completion file with the given name and return its entries as a dictionary keyed by entry name.

    Each entry is created by passing its settings to `completion_type` as keyword arguments.
    Entries with a `context` setting that differs from the current context ('collection' or 'ansible-core') are skipped.
    The `context` and `placeholder` settings are removed before an entry is created.
    """
    completion_path = os.path.join(ANSIBLE_TEST_DATA_ROOT, 'completion', '%s.txt' % name)
    lines = read_lines_without_comments(completion_path, remove_blank_lines=True)

    context = 'collection' if data_context().content.collection else 'ansible-core'

    parsed_entries = [parse_completion_entry(line) for line in lines]

    # Keep only the entries which apply to the current context; entries without a context apply everywhere.
    settings_by_entry: dict[str, dict[str, str]] = {}

    for entry_name, settings in parsed_entries:
        if settings.get('context', context) != context:
            continue

        settings_by_entry[entry_name] = settings

    # These settings are not passed on to the completion type.
    for settings in settings_by_entry.values():
        settings.pop('context', None)
        settings.pop('placeholder', None)

    completion: dict[str, TCompletionConfig] = {}

    for entry_name, settings in settings_by_entry.items():
        completion[entry_name] = completion_type(name=entry_name, **settings)

    return completion


def parse_completion_entry(value: str) -> tuple[str, dict[str, str]]:
    """
    Split a completion entry into its name and a dictionary of its settings.

    The entry is a whitespace separated list, where the first item is the name and each remaining item is
    a `key=value` setting. An item without an equals sign is stored with an empty string as its value.
    """
    tokens = value.split()
    name = tokens[0]

    data: dict[str, str] = {}

    for token in tokens[1:]:
        # Only the first equals sign separates the key from the value, so values may contain equals signs.
        key, _separator, setting = token.partition('=')
        data[key] = setting

    return name, data


def filter_completion(
    completion: dict[str, TCompletionConfig],
    controller_only: bool = False,
    include_defaults: bool = False,
) -> dict[str, TCompletionConfig]:
    """
    Return the entries of the given completion dictionary which pass the requested filters.

    When `controller_only` is set, only POSIX configs which support the controller are kept.
    Unless `include_defaults` is set, configs which are only used for defaults are removed.
    """
    if include_defaults and not controller_only:
        # No filtering was requested, so the given dictionary is returned as-is.
        return completion

    selected: dict[str, TCompletionConfig] = {}

    for name, config in completion.items():
        if controller_only:
            supports_controller = isinstance(config, PosixCompletionConfig) and config.controller_supported

            if not supports_controller:
                continue

        if not include_defaults and config.is_default:
            continue

        selected[name] = config

    return selected


@cache
def docker_completion() -> dict[str, DockerCompletionConfig]:
    """Return docker completion entries, loaded from the 'docker' completion file and keyed by entry name."""
    # NOTE(review): `cache` is imported from .util; presumably it memoizes the result so the file is only loaded once -- confirm.
    return load_completion('docker', DockerCompletionConfig)


@cache
def remote_completion() -> dict[str, PosixRemoteCompletionConfig]:
    """Return remote completion entries, loaded from the 'remote' completion file and keyed by entry name."""
    # NOTE(review): `cache` is imported from .util; presumably it memoizes the result so the file is only loaded once -- confirm.
    return load_completion('remote', PosixRemoteCompletionConfig)


@cache
def windows_completion() -> dict[str, WindowsRemoteCompletionConfig]:
    """Return windows completion entries, loaded from the 'windows' completion file and keyed by entry name."""
    # NOTE(review): `cache` is imported from .util; presumably it memoizes the result so the file is only loaded once -- confirm.
    return load_completion('windows', WindowsRemoteCompletionConfig)


@cache
def network_completion() -> dict[str, NetworkRemoteCompletionConfig]:
    """Return network completion entries, loaded from the 'network' completion file and keyed by entry name."""
    # NOTE(review): `cache` is imported from .util; presumably it memoizes the result so the file is only loaded once -- confirm.
    return load_completion('network', NetworkRemoteCompletionConfig)

Anon7 - 2022
AnonSec Team