Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.142.133.41
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /usr/lib/python3/dist-packages/ansible_test/_internal/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /usr/lib/python3/dist-packages/ansible_test/_internal/timeout.py
"""Timeout management for tests."""
from __future__ import annotations

import dataclasses
import datetime
import functools
import os
import signal
import time
import typing as t

from .io import (
    read_json_file,
)

from .config import (
    CommonConfig,
    TestConfig,
)

from .util import (
    display,
    TimeoutExpiredError,
)

from .thread import (
    WrappedThread,
)

from .constants import (
    TIMEOUT_PATH,
)

from .test import (
    TestTimeout,
)


@dataclasses.dataclass(frozen=True)
class TimeoutDetail:
    """Details required to enforce a timeout on test execution."""

    _DEADLINE_FORMAT = '%Y-%m-%dT%H:%M:%SZ'  # format used to maintain backwards compatibility with previous versions of ansible-test

    deadline: datetime.datetime
    duration: int | float  # minutes

    @property
    def remaining(self) -> datetime.timedelta:
        """The amount of time remaining before the timeout occurs. If the timeout has passed, this will be a negative duration."""
        return self.deadline - datetime.datetime.now(tz=datetime.timezone.utc).replace(microsecond=0)

    def to_dict(self) -> dict[str, t.Any]:
        """Return timeout details as a dictionary suitable for JSON serialization."""
        return dict(
            deadline=self.deadline.strftime(self._DEADLINE_FORMAT),
            duration=self.duration,
        )

    @staticmethod
    def from_dict(value: dict[str, t.Any]) -> TimeoutDetail:
        """Return a TimeoutDetail instance using the value previously returned by to_dict."""
        return TimeoutDetail(
            deadline=datetime.datetime.strptime(value['deadline'], TimeoutDetail._DEADLINE_FORMAT).replace(tzinfo=datetime.timezone.utc),
            duration=value['duration'],
        )

    @staticmethod
    def create(duration: int | float) -> TimeoutDetail | None:
        """Return a new TimeoutDetail instance for the specified duration (in minutes), or None if the duration is zero."""
        if not duration:
            return None

        if duration == int(duration):
            duration = int(duration)

        return TimeoutDetail(
            deadline=datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0) + datetime.timedelta(seconds=int(duration * 60)),
            duration=duration,
        )


def get_timeout() -> TimeoutDetail | None:
    """Return details about the currently set timeout, if any, otherwise return None."""
    try:
        return TimeoutDetail.from_dict(read_json_file(TIMEOUT_PATH))
    except FileNotFoundError:
        return None


def configure_timeout(args: CommonConfig) -> None:
    """Configure the timeout."""
    if isinstance(args, TestConfig):
        configure_test_timeout(args)  # only tests are subject to the timeout


def configure_test_timeout(args: TestConfig) -> None:
    """Configure the test timeout."""
    timeout = get_timeout()

    if not timeout:
        return

    timeout_remaining = timeout.remaining

    test_timeout = TestTimeout(timeout.duration)

    if timeout_remaining <= datetime.timedelta():
        test_timeout.write(args)

        raise TimeoutExpiredError(f'The {timeout.duration} minute test timeout expired {timeout_remaining * -1} ago at {timeout.deadline}.')

    display.info(f'The {timeout.duration} minute test timeout expires in {timeout_remaining} at {timeout.deadline}.', verbosity=1)

    def timeout_handler(_dummy1: t.Any, _dummy2: t.Any) -> None:
        """Runs when SIGUSR1 is received."""
        test_timeout.write(args)

        raise TimeoutExpiredError(f'Tests aborted after exceeding the {timeout.duration} minute time limit.')

    def timeout_waiter(timeout_seconds: int) -> None:
        """Background thread which will kill the current process if the timeout elapses."""
        time.sleep(timeout_seconds)
        os.kill(os.getpid(), signal.SIGUSR1)

    signal.signal(signal.SIGUSR1, timeout_handler)

    instance = WrappedThread(functools.partial(timeout_waiter, timeout_remaining.total_seconds()))
    instance.daemon = True
    instance.start()

Anon7 - 2022
AnonSec Team