Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 18.219.190.131
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /proc/2/task/2/cwd/usr/lib/python3/dist-packages/ansible_test/_internal/cli/argparsing/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /proc/2/task/2/cwd/usr/lib/python3/dist-packages/ansible_test/_internal/cli/argparsing/parsers.py
"""General purpose composite argument parsing and completion."""
from __future__ import annotations

import abc
import collections.abc as c
import contextlib
import dataclasses
import enum
import os
import re
import typing as t

# NOTE: When choosing delimiters, take into account Bash and argcomplete behavior.
#
# Recommended characters for assignment and/or continuation: `/` `:` `=`
#
# The recommended assignment_character list is due to how argcomplete handles continuation characters.
# see: https://github.com/kislyuk/argcomplete/blob/5a20d6165fbb4d4d58559378919b05964870cc16/argcomplete/__init__.py#L557-L558

PAIR_DELIMITER = ','
ASSIGNMENT_DELIMITER = '='
PATH_DELIMITER = '/'


# This class was originally frozen. However, that causes issues when running under Python 3.11.
# See: https://github.com/python/cpython/issues/99856
@dataclasses.dataclass
class Completion(Exception):
    """Base class for argument completion results."""


@dataclasses.dataclass
class CompletionUnavailable(Completion):
    """Argument completion unavailable."""

    message: str = 'No completions available.'


@dataclasses.dataclass
class CompletionError(Completion):
    """Argument completion error."""

    message: t.Optional[str] = None


@dataclasses.dataclass
class CompletionSuccess(Completion):
    """Successful argument completion result."""

    list_mode: bool
    consumed: str
    continuation: str
    matches: list[str] = dataclasses.field(default_factory=list)

    @property
    def preserve(self) -> bool:
        """
        True if argcomplete should not mangle completion values, otherwise False.
        Only used when more than one completion exists to avoid overwriting the word undergoing completion.
        """
        return len(self.matches) > 1 and self.list_mode

    @property
    def completions(self) -> list[str]:
        """List of completion values to return to argcomplete."""
        completions = self.matches
        continuation = '' if self.list_mode else self.continuation

        if not self.preserve:
            # include the existing prefix to avoid rewriting the word undergoing completion
            completions = [f'{self.consumed}{completion}{continuation}' for completion in completions]

        return completions


class ParserMode(enum.Enum):
    """Mode the parser is operating in."""

    PARSE = enum.auto()
    COMPLETE = enum.auto()
    LIST = enum.auto()


class ParserError(Exception):
    """Base class for all parsing exceptions."""


@dataclasses.dataclass
class ParserBoundary:
    """Boundary details for parsing composite input."""

    delimiters: str
    required: bool
    match: t.Optional[str] = None
    ready: bool = True


@dataclasses.dataclass
class ParserState:
    """State of the composite argument parser."""

    mode: ParserMode
    remainder: str = ''
    consumed: str = ''
    boundaries: list[ParserBoundary] = dataclasses.field(default_factory=list)
    namespaces: list[t.Any] = dataclasses.field(default_factory=list)
    parts: list[str] = dataclasses.field(default_factory=list)

    @property
    def incomplete(self) -> bool:
        """True if parsing is incomplete (unparsed input remains), otherwise False."""
        return self.remainder is not None

    def match(self, value: str, choices: list[str]) -> bool:
        """Return True if the given value matches the provided choices, taking into account parsing boundaries, otherwise return False."""
        if self.current_boundary:
            delimiters, delimiter = self.current_boundary.delimiters, self.current_boundary.match
        else:
            delimiters, delimiter = '', None

        for choice in choices:
            if choice.rstrip(delimiters) == choice:
                # choice is not delimited
                if value == choice:
                    return True  # value matched
            else:
                # choice is delimited
                if f'{value}{delimiter}' == choice:
                    return True  # value and delimiter matched

        return False

    def read(self) -> str:
        """Read and return the next input segment, taking into account parsing boundaries."""
        delimiters = "".join(boundary.delimiters for boundary in self.boundaries)

        if delimiters:
            pattern = '([' + re.escape(delimiters) + '])'
            regex = re.compile(pattern)
            parts = regex.split(self.remainder, 1)
        else:
            parts = [self.remainder]

        if len(parts) > 1:
            value, delimiter, remainder = parts
        else:
            value, delimiter, remainder = parts[0], None, None

        for boundary in reversed(self.boundaries):
            if delimiter and delimiter in boundary.delimiters:
                boundary.match = delimiter
                self.consumed += value + delimiter
                break

            boundary.match = None
            boundary.ready = False

            if boundary.required:
                break

        self.remainder = remainder

        return value

    @property
    def root_namespace(self) -> t.Any:
        """THe root namespace."""
        return self.namespaces[0]

    @property
    def current_namespace(self) -> t.Any:
        """The current namespace."""
        return self.namespaces[-1]

    @property
    def current_boundary(self) -> t.Optional[ParserBoundary]:
        """The current parser boundary, if any, otherwise None."""
        return self.boundaries[-1] if self.boundaries else None

    def set_namespace(self, namespace: t.Any) -> None:
        """Set the current namespace."""
        self.namespaces.append(namespace)

    @contextlib.contextmanager
    def delimit(self, delimiters: str, required: bool = True) -> c.Iterator[ParserBoundary]:
        """Context manager for delimiting parsing of input."""
        boundary = ParserBoundary(delimiters=delimiters, required=required)

        self.boundaries.append(boundary)

        try:
            yield boundary
        finally:
            self.boundaries.pop()

        if boundary.required and not boundary.match:
            raise ParserError('required delimiter not found, hit up-level delimiter or end of input instead')


@dataclasses.dataclass
class DocumentationState:
    """State of the composite argument parser's generated documentation."""

    sections: dict[str, str] = dataclasses.field(default_factory=dict)


class Parser(metaclass=abc.ABCMeta):
    """Base class for all composite argument parsers."""

    @abc.abstractmethod
    def parse(self, state: ParserState) -> t.Any:
        """Parse the input from the given state and return the result."""

    def document(self, state: DocumentationState) -> t.Optional[str]:
        """Generate and return documentation for this parser."""
        raise Exception(f'Undocumented parser: {type(self)}')


class MatchConditions(enum.Flag):
    """Acceptable condition(s) for matching user input to available choices."""

    CHOICE = enum.auto()
    """Match any choice."""
    ANY = enum.auto()
    """Match any non-empty string."""
    NOTHING = enum.auto()
    """Match an empty string which is not followed by a boundary match."""


class DynamicChoicesParser(Parser, metaclass=abc.ABCMeta):
    """Base class for composite argument parsers which use a list of choices that can be generated during completion."""

    def __init__(self, conditions: MatchConditions = MatchConditions.CHOICE) -> None:
        self.conditions = conditions

    @abc.abstractmethod
    def get_choices(self, value: str) -> list[str]:
        """Return a list of valid choices based on the given input value."""

    def no_completion_match(self, value: str) -> CompletionUnavailable:  # pylint: disable=unused-argument
        """Return an instance of CompletionUnavailable when no match was found for the given value."""
        return CompletionUnavailable()

    def no_choices_available(self, value: str) -> ParserError:  # pylint: disable=unused-argument
        """Return an instance of ParserError when parsing fails and no choices are available."""
        return ParserError('No choices available.')

    def parse(self, state: ParserState) -> t.Any:
        """Parse the input from the given state and return the result."""
        value = state.read()
        choices = self.get_choices(value)

        if state.mode == ParserMode.PARSE or state.incomplete:
            if self.conditions & MatchConditions.CHOICE and state.match(value, choices):
                return value

            if self.conditions & MatchConditions.ANY and value:
                return value

            if self.conditions & MatchConditions.NOTHING and not value and state.current_boundary and not state.current_boundary.match:
                return value

            if state.mode == ParserMode.PARSE:
                if choices:
                    raise ParserError(f'"{value}" not in: {", ".join(choices)}')

                raise self.no_choices_available(value)

            raise CompletionUnavailable()

        matches = [choice for choice in choices if choice.startswith(value)]

        if not matches:
            raise self.no_completion_match(value)

        continuation = state.current_boundary.delimiters if state.current_boundary and state.current_boundary.required else ''

        raise CompletionSuccess(
            list_mode=state.mode == ParserMode.LIST,
            consumed=state.consumed,
            continuation=continuation,
            matches=matches,
        )


class ChoicesParser(DynamicChoicesParser):
    """Composite argument parser which relies on a static list of choices."""

    def __init__(self, choices: list[str], conditions: MatchConditions = MatchConditions.CHOICE) -> None:
        self.choices = choices

        super().__init__(conditions=conditions)

    def get_choices(self, value: str) -> list[str]:
        """Return a list of valid choices based on the given input value."""
        return self.choices

    def document(self, state: DocumentationState) -> t.Optional[str]:
        """Generate and return documentation for this parser."""
        return '|'.join(self.choices)


class EnumValueChoicesParser(ChoicesParser):
    """Composite argument parser which relies on a static list of choices derived from the values of an enum."""

    def __init__(self, enum_type: t.Type[enum.Enum], conditions: MatchConditions = MatchConditions.CHOICE) -> None:
        self.enum_type = enum_type

        super().__init__(choices=[str(item.value) for item in enum_type], conditions=conditions)

    def parse(self, state: ParserState) -> t.Any:
        """Parse the input from the given state and return the result."""
        value = super().parse(state)
        return self.enum_type(value)


class IntegerParser(DynamicChoicesParser):
    """Composite argument parser for integers."""

    PATTERN = re.compile('^[1-9][0-9]*$')

    def __init__(self, maximum: t.Optional[int] = None) -> None:
        self.maximum = maximum

        super().__init__()

    def get_choices(self, value: str) -> list[str]:
        """Return a list of valid choices based on the given input value."""
        if not value:
            numbers = list(range(1, 10))
        elif self.PATTERN.search(value):
            int_prefix = int(value)
            base = int_prefix * 10
            numbers = [int_prefix] + [base + i for i in range(0, 10)]
        else:
            numbers = []

        # NOTE: the minimum is currently fixed at 1

        if self.maximum is not None:
            numbers = [n for n in numbers if n <= self.maximum]

        return [str(n) for n in numbers]

    def parse(self, state: ParserState) -> t.Any:
        """Parse the input from the given state and return the result."""
        value = super().parse(state)
        return int(value)

    def document(self, state: DocumentationState) -> t.Optional[str]:
        """Generate and return documentation for this parser."""
        return '{integer}'


class BooleanParser(ChoicesParser):
    """Composite argument parser for boolean (yes/no) values."""

    def __init__(self) -> None:
        super().__init__(['yes', 'no'])

    def parse(self, state: ParserState) -> bool:
        """Parse the input from the given state and return the result."""
        value = super().parse(state)
        return value == 'yes'


class AnyParser(ChoicesParser):
    """Composite argument parser which accepts any input value."""

    def __init__(self, nothing: bool = False, no_match_message: t.Optional[str] = None) -> None:
        self.no_match_message = no_match_message

        conditions = MatchConditions.ANY

        if nothing:
            conditions |= MatchConditions.NOTHING

        super().__init__([], conditions=conditions)

    def no_completion_match(self, value: str) -> CompletionUnavailable:
        """Return an instance of CompletionUnavailable when no match was found for the given value."""
        if self.no_match_message:
            return CompletionUnavailable(message=self.no_match_message)

        return super().no_completion_match(value)

    def no_choices_available(self, value: str) -> ParserError:
        """Return an instance of ParserError when parsing fails and no choices are available."""
        if self.no_match_message:
            return ParserError(self.no_match_message)

        return super().no_choices_available(value)


class RelativePathNameParser(DynamicChoicesParser):
    """Composite argument parser for relative path names."""

    RELATIVE_NAMES = ['.', '..']

    def __init__(self, choices: list[str]) -> None:
        self.choices = choices

        super().__init__()

    def get_choices(self, value: str) -> list[str]:
        """Return a list of valid choices based on the given input value."""
        choices = list(self.choices)

        if value in self.RELATIVE_NAMES:
            # complete relative names, but avoid suggesting them unless the current name is relative
            # unfortunately this will be sorted in reverse of what bash presents ("../ ./" instead of "./ ../")
            choices.extend(f'{item}{PATH_DELIMITER}' for item in self.RELATIVE_NAMES)

        return choices


class FileParser(Parser):
    """Composite argument parser for absolute or relative file paths."""

    def parse(self, state: ParserState) -> str:
        """Parse the input from the given state and return the result."""
        if state.mode == ParserMode.PARSE:
            path = AnyParser().parse(state)

            if not os.path.isfile(path):
                raise ParserError(f'Not a file: {path}')
        else:
            path = ''

            with state.delimit(PATH_DELIMITER, required=False) as boundary:  # type: ParserBoundary
                while boundary.ready:
                    directory = path or '.'

                    try:
                        with os.scandir(directory) as scan:  # type: c.Iterator[os.DirEntry]
                            choices = [f'{item.name}{PATH_DELIMITER}' if item.is_dir() else item.name for item in scan]
                    except OSError:
                        choices = []

                    if not path:
                        choices.append(PATH_DELIMITER)  # allow absolute paths
                        choices.append('../')  # suggest relative paths

                    part = RelativePathNameParser(choices).parse(state)
                    path += f'{part}{boundary.match or ""}'

        return path


class AbsolutePathParser(Parser):
    """Composite argument parser for absolute file paths. Paths are only verified for proper syntax, not for existence."""

    def parse(self, state: ParserState) -> t.Any:
        """Parse the input from the given state and return the result."""
        path = ''

        with state.delimit(PATH_DELIMITER, required=False) as boundary:  # type: ParserBoundary
            while boundary.ready:
                if path:
                    path += AnyParser(nothing=True).parse(state)
                else:
                    path += ChoicesParser([PATH_DELIMITER]).parse(state)

                path += boundary.match or ''

        return path


class NamespaceParser(Parser, metaclass=abc.ABCMeta):
    """Base class for composite argument parsers that store their results in a namespace."""

    def parse(self, state: ParserState) -> t.Any:
        """Parse the input from the given state and return the result."""
        namespace = state.current_namespace
        current = getattr(namespace, self.dest)

        if current and self.limit_one:
            if state.mode == ParserMode.PARSE:
                raise ParserError('Option cannot be specified more than once.')

            raise CompletionError('Option cannot be specified more than once.')

        value = self.get_value(state)

        if self.use_list:
            if not current:
                current = []
                setattr(namespace, self.dest, current)

            current.append(value)
        else:
            setattr(namespace, self.dest, value)

        return value

    def get_value(self, state: ParserState) -> t.Any:
        """Parse the input from the given state and return the result, without storing the result in the namespace."""
        return super().parse(state)

    @property
    def use_list(self) -> bool:
        """True if the destination is a list, otherwise False."""
        return False

    @property
    def limit_one(self) -> bool:
        """True if only one target is allowed, otherwise False."""
        return not self.use_list

    @property
    @abc.abstractmethod
    def dest(self) -> str:
        """The name of the attribute where the value should be stored."""


class NamespaceWrappedParser(NamespaceParser):
    """Composite argument parser that wraps a non-namespace parser and stores the result in a namespace."""

    def __init__(self, dest: str, parser: Parser) -> None:
        self._dest = dest
        self.parser = parser

    def get_value(self, state: ParserState) -> t.Any:
        """Parse the input from the given state and return the result, without storing the result in the namespace."""
        return self.parser.parse(state)

    @property
    def dest(self) -> str:
        """The name of the attribute where the value should be stored."""
        return self._dest


class KeyValueParser(Parser, metaclass=abc.ABCMeta):
    """Base class for key/value composite argument parsers."""

    @abc.abstractmethod
    def get_parsers(self, state: ParserState) -> dict[str, Parser]:
        """Return a dictionary of key names and value parsers."""

    def parse(self, state: ParserState) -> t.Any:
        """Parse the input from the given state and return the result."""
        namespace = state.current_namespace
        parsers = self.get_parsers(state)
        keys = list(parsers)

        with state.delimit(PAIR_DELIMITER, required=False) as pair:  # type: ParserBoundary
            while pair.ready:
                with state.delimit(ASSIGNMENT_DELIMITER):
                    key = ChoicesParser(keys).parse(state)

                value = parsers[key].parse(state)

                setattr(namespace, key, value)

                keys.remove(key)

        return namespace


class PairParser(Parser, metaclass=abc.ABCMeta):
    """Base class for composite argument parsers consisting of a left and right argument parser, with input separated by a delimiter."""

    def parse(self, state: ParserState) -> t.Any:
        """Parse the input from the given state and return the result."""
        namespace = self.create_namespace()

        state.set_namespace(namespace)

        with state.delimit(self.delimiter, self.required) as boundary:  # type: ParserBoundary
            choice = self.get_left_parser(state).parse(state)

        if boundary.match:
            self.get_right_parser(choice).parse(state)

        return namespace

    @property
    def required(self) -> bool:
        """True if the delimiter (and thus right parser) is required, otherwise False."""
        return False

    @property
    def delimiter(self) -> str:
        """The delimiter to use between the left and right parser."""
        return PAIR_DELIMITER

    @abc.abstractmethod
    def create_namespace(self) -> t.Any:
        """Create and return a namespace."""

    @abc.abstractmethod
    def get_left_parser(self, state: ParserState) -> Parser:
        """Return the parser for the left side."""

    @abc.abstractmethod
    def get_right_parser(self, choice: t.Any) -> Parser:
        """Return the parser for the right side."""


class TypeParser(Parser, metaclass=abc.ABCMeta):
    """Base class for composite argument parsers which parse a type name, a colon and then parse results based on the type given by the type name."""

    def get_parsers(self, state: ParserState) -> dict[str, Parser]:  # pylint: disable=unused-argument
        """Return a dictionary of type names and type parsers."""
        return self.get_stateless_parsers()

    @abc.abstractmethod
    def get_stateless_parsers(self) -> dict[str, Parser]:
        """Return a dictionary of type names and type parsers."""

    def parse(self, state: ParserState) -> t.Any:
        """Parse the input from the given state and return the result."""
        parsers = self.get_parsers(state)

        with state.delimit(':'):
            key = ChoicesParser(list(parsers)).parse(state)

        value = parsers[key].parse(state)

        return value

Anon7 - 2022
AnonSec Team