Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 13.58.82.214
Web Server : Apache/2.4.61 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /srv/modoboa/env/lib64/python3.5/site-packages/gevent/tests/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /srv/modoboa/env/lib64/python3.5/site-packages/gevent/tests/test__core_watcher.py
from __future__ import absolute_import, print_function

import gevent.testing as greentest
from gevent import config
from gevent.testing.sysinfo import CFFI_BACKEND

from gevent.core import READ # pylint:disable=no-name-in-module
from gevent.core import WRITE # pylint:disable=no-name-in-module


class Test(greentest.TestCase):

    __timeout__ = None

    def setUp(self):
        super(Test, self).setUp()
        self.loop = config.loop(default=False)
        self.timer = self.loop.timer(0.01)

    def tearDown(self):
        if self.timer is not None:
            self.timer.close()
        if self.loop is not None:
            self.loop.destroy()
        self.loop = self.timer = None
        super(Test, self).tearDown()

    def test_non_callable_to_start(self):
        # test that cannot pass non-callable thing to start()
        self.assertRaises(TypeError, self.timer.start, None)
        self.assertRaises(TypeError, self.timer.start, 5)

    def test_non_callable_after_start(self):
        # test that cannot set 'callback' to non-callable thing later either
        lst = []
        timer = self.timer
        timer.start(lst.append)


        with self.assertRaises(TypeError):
            timer.callback = False

        with self.assertRaises(TypeError):
            timer.callback = 5

    def test_args_can_be_changed_after_start(self):
        lst = []
        timer = self.timer
        self.timer.start(lst.append)
        self.assertEqual(timer.args, ())
        timer.args = (1, 2, 3)
        self.assertEqual(timer.args, (1, 2, 3))

        # Only tuple can be args
        with self.assertRaises(TypeError):
            timer.args = 5
        with self.assertRaises(TypeError):
            timer.args = [4, 5]

        self.assertEqual(timer.args, (1, 2, 3))

        # None also works, means empty tuple
        # XXX why?
        timer.args = None
        self.assertEqual(timer.args, None)


    def test_run(self):
        loop = self.loop
        lst = []

        self.timer.start(lambda *args: lst.append(args))

        loop.run()
        loop.update_now()

        self.assertEqual(lst, [()])

        # Even if we lose all references to it, the ref in the callback
        # keeps it alive
        self.timer.start(reset, self.timer, lst)
        self.timer = None
        loop.run()
        self.assertEqual(lst, [(), 25])

    def test_invalid_fd(self):
        loop = self.loop

        # Negative case caught everywhere. ValueError
        # on POSIX, OSError on Windows Py3, IOError on Windows Py2
        with self.assertRaises((ValueError, OSError, IOError)):
            loop.io(-1, READ)


    @greentest.skipOnWindows("Stdout can't be watched on Win32")
    def test_reuse_io(self):
        loop = self.loop

        # Watchers aren't reused once all outstanding
        # refs go away BUT THEY MUST BE CLOSED
        tty_watcher = loop.io(1, WRITE)
        watcher_handle = tty_watcher._watcher if CFFI_BACKEND else tty_watcher
        tty_watcher.close()
        del tty_watcher
        # XXX: Note there is a cycle in the CFFI code
        # from watcher_handle._handle -> watcher_handle.
        # So it doesn't go away until a GC runs.
        import gc
        gc.collect()

        tty_watcher = loop.io(1, WRITE)
        self.assertIsNot(tty_watcher._watcher if CFFI_BACKEND else tty_watcher, watcher_handle)
        tty_watcher.close()


def reset(watcher, lst):
    watcher.args = None
    watcher.callback = lambda: None
    lst.append(25)
    watcher.close()


if __name__ == '__main__':
    greentest.main()

Anon7 - 2022
AnonSec Team