Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.142.251.204
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /srv/modoboa/env/lib64/python3.5/site-packages/gevent/tests/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /srv/modoboa/env/lib64/python3.5/site-packages/gevent/tests/test__core_stat.py
from __future__ import print_function

import os
import tempfile
import time

import gevent
import gevent.core

import gevent.testing as greentest
import gevent.testing.flaky

#pylint: disable=protected-access


# Seconds to wait before the scheduled file change is performed. The tests
# also subtract it from the measured wait to detect a watcher firing early.
DELAY = 0.5

# Flag taken from gevent.testing; selects the Windows-specific expectations
# in ``TestCoreStat._check_attr``.
WIN = greentest.WIN

# Flag taken from gevent.testing; combined with ``WIN`` in
# ``TestCoreStat._check_attr`` (presumably true when the libuv loop is used).
LIBUV = greentest.LIBUV

class TestCoreStat(greentest.TestCase):
    """Exercise the hub loop's ``stat`` watcher against a temporary file.

    Each behavioural test schedules a change to the file (a write or an
    unlink) ``DELAY`` seconds in the future and then blocks on the watcher
    until it fires.
    """

    __timeout__ = greentest.LARGE_TIMEOUT

    def setUp(self):
        """Create an empty temporary file and a stat watcher observing it."""
        super(TestCoreStat, self).setUp()
        fd, path = tempfile.mkstemp(suffix='.gevent_test_core_stat')
        # Only the path is needed; the tests reopen the file themselves.
        os.close(fd)
        self.temp_path = path
        self.hub = gevent.get_hub()
        # If we don't specify an interval, we default to zero.
        # libev interprets that as meaning to use its default interval,
        # which is about 5 seconds. If we go below its minimum check
        # threshold, it bumps it up to the minimum.
        self.watcher = self.hub.loop.stat(self.temp_path, interval=-1)

    def tearDown(self):
        """Close the watcher and remove the temporary file if it still exists."""
        self.watcher.close()
        # test_unlink removes the file itself, so it may already be gone.
        if os.path.exists(self.temp_path):
            os.unlink(self.temp_path)
        super(TestCoreStat, self).tearDown()

    def _write(self):
        """Write a single byte to the temporary file, unbuffered."""
        with open(self.temp_path, 'wb', buffering=0) as f:
            f.write(b'x')

    def _check_attr(self, name, none):
        """Assert whether watcher attribute *name* is ``None``.

        :param name: Name of the watcher attribute to read.
        :param none: If true the attribute must be ``None``; otherwise it
            must not be ``None``.
        :raises ImportError: Re-raised from the attribute access when not
            running on Windows.
        """
        # Deals with the complex behaviour of the 'attr' and 'prev'
        # attributes on Windows. This codifies it, rather than simply letting
        # the test fail, so we know exactly when and what changes it.
        try:
            x = getattr(self.watcher, name)
        except ImportError:
            if WIN:
                # the 'posix' module is not available
                pass
            else:
                raise
        else:
            if WIN and not LIBUV:
                # The ImportError is only raised for the first time;
                # after that, the attribute starts returning None
                # NOTE(review): when this branch applies and ``none`` is
                # false, the assertIsNotNone below contradicts this
                # assertion -- confirm that combination is intended.
                self.assertIsNone(x, "Only None is supported on Windows")
            if none:
                self.assertIsNone(x, name)
            else:
                self.assertIsNotNone(x, name)

    def _wait_on_greenlet(self, func, *greenlet_args):
        """Schedule *func* after ``DELAY`` seconds and wait for the watcher.

        :param func: Callable that modifies the watched file.
        :param greenlet_args: Positional arguments passed to *func*.
        :raises gevent.Timeout: If the watcher does not fire within
            ``5 + DELAY + 0.5`` seconds.
        :raises gevent.testing.flaky.FlakyTestRaceCondition: If the measured
            wait is not longer than ``DELAY``, which makes the timing
            check meaningless.
        """
        start = time.time()

        self.hub.loop.update_now()
        greenlet = gevent.spawn_later(DELAY, func, *greenlet_args)
        try:
            with gevent.Timeout(5 + DELAY + 0.5):
                self.hub.wait(self.watcher)
            now = time.time()

            self.assertGreaterEqual(now, start, "Time must move forward")

            wait_duration = now - start
            reaction = wait_duration - DELAY

            if reaction <= 0.0:
                # Sigh. This is especially true on PyPy on Windows
                raise gevent.testing.flaky.FlakyTestRaceCondition(
                    "Bad timer resolution (on Windows?), test is useless. Start %s, now %s" % (start, now))

            self.assertGreaterEqual(
                reaction, 0.0,
                'Watcher %s reacted too early: %.3fs' % (self.watcher, reaction))

            greenlet.join()
        finally:
            # On any failure above the delayed greenlet may still be pending;
            # without this it could run *func* after tearDown has removed the
            # file and disturb a later test. Nothing to do once it finished.
            if not greenlet.ready():
                greenlet.kill()

    def test_watcher_basics(self):
        """The watcher exposes the path and interval it was created with."""
        watcher = self.watcher
        filename = self.temp_path
        self.assertEqual(watcher.path, filename)
        # ``_paths`` is compared against the bytes form of the path.
        filenames = filename if isinstance(filename, bytes) else filename.encode('ascii')
        self.assertEqual(watcher._paths, filenames)
        self.assertEqual(watcher.interval, -1)

    def test_write(self):
        """Writing to the file fires the watcher; ``attr`` and ``prev`` are set."""
        self._wait_on_greenlet(self._write)

        self._check_attr('attr', False)
        self._check_attr('prev', False)
        # The watcher interval changed after it started; -1 is illegal
        self.assertNotEqual(self.watcher.interval, -1)

    def test_unlink(self):
        """Unlinking the file fires the watcher; ``attr`` becomes ``None``."""
        self._wait_on_greenlet(os.unlink, self.temp_path)

        self._check_attr('attr', True)
        self._check_attr('prev', False)

# Delegate to ``greentest.main()`` when the file is executed as a script.
if __name__ == '__main__':
    greentest.main()

Anon7 - 2022
AnonSec Team