Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.15.198.69
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /usr/lib/python3/dist-packages/ansible/galaxy/collection/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /usr/lib/python3/dist-packages/ansible/galaxy/collection/gpg.py
# -*- coding: utf-8 -*-
# Copyright: (c) 2022, Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
"""Signature verification helpers."""

from ansible.errors import AnsibleError
from ansible.galaxy.user_agent import user_agent
from ansible.module_utils.urls import open_url

import contextlib
import os
import subprocess
import sys
import typing as t

from dataclasses import dataclass, fields as dc_fields
from functools import partial
from urllib.error import HTTPError, URLError

if t.TYPE_CHECKING:
    from ansible.utils.display import Display

IS_PY310_PLUS = sys.version_info[:2] >= (3, 10)

frozen_dataclass = partial(dataclass, frozen=True, **({'slots': True} if IS_PY310_PLUS else {}))


def get_signature_from_source(source, display=None):  # type: (str, t.Optional[Display]) -> str
    if display is not None:
        display.vvvv(f"Using signature at {source}")
    try:
        with open_url(
            source,
            http_agent=user_agent(),
            validate_certs=True,
            follow_redirects='safe'
        ) as resp:
            signature = resp.read()
    except (HTTPError, URLError) as e:
        raise AnsibleError(
            f"Failed to get signature for collection verification from '{source}': {e}"
        ) from e

    return signature


def run_gpg_verify(
    manifest_file,  # type: str
    signature,  # type: str
    keyring,  # type: str
    display,  # type: Display
):  # type: (...) -> tuple[str, int]
    status_fd_read, status_fd_write = os.pipe()

    # running the gpg command will create the keyring if it does not exist
    remove_keybox = not os.path.exists(keyring)

    cmd = [
        'gpg',
        f'--status-fd={status_fd_write}',
        '--verify',
        '--batch',
        '--no-tty',
        '--no-default-keyring',
        f'--keyring={keyring}',
        '-',
        manifest_file,
    ]
    cmd_str = ' '.join(cmd)
    display.vvvv(f"Running command '{cmd}'")

    try:
        p = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            pass_fds=(status_fd_write,),
            encoding='utf8',
        )
    except (FileNotFoundError, subprocess.SubprocessError) as err:
        raise AnsibleError(
            f"Failed during GnuPG verification with command '{cmd_str}': {err}"
        ) from err
    else:
        stdout, stderr = p.communicate(input=signature)
    finally:
        os.close(status_fd_write)

    if remove_keybox:
        with contextlib.suppress(OSError):
            os.remove(keyring)

    with os.fdopen(status_fd_read) as f:
        stdout = f.read()
        display.vvvv(
            f"stdout: \n{stdout}\nstderr: \n{stderr}\n(exit code {p.returncode})"
        )
        return stdout, p.returncode


def parse_gpg_errors(status_out):  # type: (str) -> t.Iterator[GpgBaseError]
    for line in status_out.splitlines():
        if not line:
            continue
        try:
            _dummy, status, remainder = line.split(maxsplit=2)
        except ValueError:
            _dummy, status = line.split(maxsplit=1)
            remainder = None

        try:
            cls = GPG_ERROR_MAP[status]
        except KeyError:
            continue

        fields = [status]
        if remainder:
            fields.extend(
                remainder.split(
                    None,
                    len(dc_fields(cls)) - 2
                )
            )

        yield cls(*fields)


@frozen_dataclass
class GpgBaseError(Exception):
    status: str

    @classmethod
    def get_gpg_error_description(cls) -> str:
        """Return the current class description."""
        return ' '.join(cls.__doc__.split())

    def __post_init__(self):
        for field in dc_fields(self):
            super(GpgBaseError, self).__setattr__(field.name, field.type(getattr(self, field.name)))


@frozen_dataclass
class GpgExpSig(GpgBaseError):
    """The signature with the keyid is good, but the signature is expired."""
    keyid: str
    username: str


@frozen_dataclass
class GpgExpKeySig(GpgBaseError):
    """The signature with the keyid is good, but the signature was made by an expired key."""
    keyid: str
    username: str


@frozen_dataclass
class GpgRevKeySig(GpgBaseError):
    """The signature with the keyid is good, but the signature was made by a revoked key."""
    keyid: str
    username: str


@frozen_dataclass
class GpgBadSig(GpgBaseError):
    """The signature with the keyid has not been verified okay."""
    keyid: str
    username: str


@frozen_dataclass
class GpgErrSig(GpgBaseError):
    """"It was not possible to check the signature.  This may be caused by
    a missing public key or an unsupported algorithm.  A RC of 4
    indicates unknown algorithm, a 9 indicates a missing public
    key.
    """
    keyid: str
    pkalgo: int
    hashalgo: int
    sig_class: str
    time: int
    rc: int
    fpr: str


@frozen_dataclass
class GpgNoPubkey(GpgBaseError):
    """The public key is not available."""
    keyid: str


@frozen_dataclass
class GpgMissingPassPhrase(GpgBaseError):
    """No passphrase was supplied."""


@frozen_dataclass
class GpgBadPassphrase(GpgBaseError):
    """The supplied passphrase was wrong or not given."""
    keyid: str


@frozen_dataclass
class GpgNoData(GpgBaseError):
    """No data has been found.  Codes for WHAT are:
    - 1 :: No armored data.
    - 2 :: Expected a packet but did not find one.
    - 3 :: Invalid packet found, this may indicate a non OpenPGP
           message.
    - 4 :: Signature expected but not found.
    """
    what: str


@frozen_dataclass
class GpgUnexpected(GpgBaseError):
    """No data has been found.  Codes for WHAT are:
    - 1 :: No armored data.
    - 2 :: Expected a packet but did not find one.
    - 3 :: Invalid packet found, this may indicate a non OpenPGP
           message.
    - 4 :: Signature expected but not found.
    """
    what: str


@frozen_dataclass
class GpgError(GpgBaseError):
    """This is a generic error status message, it might be followed by error location specific data."""
    location: str
    code: int
    more: str = ""


@frozen_dataclass
class GpgFailure(GpgBaseError):
    """This is the counterpart to SUCCESS and used to indicate a program failure."""
    location: str
    code: int


@frozen_dataclass
class GpgBadArmor(GpgBaseError):
    """The ASCII armor is corrupted."""


@frozen_dataclass
class GpgKeyExpired(GpgBaseError):
    """The key has expired."""
    timestamp: int


@frozen_dataclass
class GpgKeyRevoked(GpgBaseError):
    """The used key has been revoked by its owner."""


@frozen_dataclass
class GpgNoSecKey(GpgBaseError):
    """The secret key is not available."""
    keyid: str


GPG_ERROR_MAP = {
    'EXPSIG': GpgExpSig,
    'EXPKEYSIG': GpgExpKeySig,
    'REVKEYSIG': GpgRevKeySig,
    'BADSIG': GpgBadSig,
    'ERRSIG': GpgErrSig,
    'NO_PUBKEY': GpgNoPubkey,
    'MISSING_PASSPHRASE': GpgMissingPassPhrase,
    'BAD_PASSPHRASE': GpgBadPassphrase,
    'NODATA': GpgNoData,
    'UNEXPECTED': GpgUnexpected,
    'ERROR': GpgError,
    'FAILURE': GpgFailure,
    'BADARMOR': GpgBadArmor,
    'KEYEXPIRED': GpgKeyExpired,
    'KEYREVOKED': GpgKeyRevoked,
    'NO_SECKEY': GpgNoSecKey,
}

Anon7 - 2022
AnonSec Team