Server IP : 85.214.239.14 / Your IP : 3.145.170.164 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /proc/3/task/3/cwd/srv/modoboa/env/lib64/python3.5/site-packages/gevent/tests/ |
Upload File : |
import gevent.testing as greentest import gevent from gevent.hub import get_hub import sys SHOULD_EXPIRE = 0.01 if not greentest.RUNNING_ON_CI: SHOULD_NOT_EXPIRE = SHOULD_EXPIRE * 2.0 else: SHOULD_NOT_EXPIRE = SHOULD_EXPIRE * 20.0 class TestDirectRaise(greentest.TestCase): switch_expected = False def test_direct_raise_class(self): try: raise gevent.Timeout except gevent.Timeout as t: assert not t.pending, repr(t) def test_direct_raise_instance(self): timeout = gevent.Timeout() try: raise timeout except gevent.Timeout as t: assert timeout is t, (timeout, t) assert not t.pending, repr(t) class Test(greentest.TestCase): def _test(self, timeout, close): try: get_hub().switch() self.fail('Must raise Timeout') except gevent.Timeout as ex: if ex is not timeout: raise if close: ex.close() return ex def _check_expires(self, timeout): timeout.start() self._test(timeout, False) # Restart timeout.start() return self._test(timeout, True) def test_expires(self): timeout = gevent.Timeout(SHOULD_EXPIRE) self._check_expires(timeout) def test_expires_false(self): # A False exception value only matters to a # context manager timeout = gevent.Timeout(SHOULD_EXPIRE, False) self._check_expires(timeout) def test_expires_str(self): # str values are accepted but not documented; they change # the message timeout = gevent.Timeout(SHOULD_EXPIRE, 'XXX') ex = self._check_expires(timeout) self.assertTrue(str(ex).endswith('XXX')) def assert_type_err(self, ex): # PyPy3 uses 'exceptions must derive', everyone else uses "exceptions must be" self.assertTrue("exceptions must be" in str(ex) or "exceptions must derive" in str(ex), str(ex)) def test_expires_non_exception(self): timeout = gevent.Timeout(SHOULD_EXPIRE, object()) timeout.start() try: get_hub().switch() self.fail("Most raise TypeError") except TypeError as ex: self.assert_type_err(ex) timeout.close() class OldStyle: pass timeout = gevent.Timeout(SHOULD_EXPIRE, OldStyle) # Type timeout.start() try: get_hub().switch() self.fail("Must raise OldStyle") except TypeError as ex: self.assertTrue(greentest.PY3, "Py3 raises a TypeError for non-BaseExceptions") self.assert_type_err(ex) except: # pylint:disable=bare-except self.assertTrue(greentest.PY2, "Old style classes can only be raised on Py2") t = sys.exc_info()[0] self.assertEqual(t, OldStyle) timeout.close() timeout = gevent.Timeout(SHOULD_EXPIRE, OldStyle()) # instance timeout.start() try: get_hub().switch() self.fail("Must raise OldStyle") except TypeError as ex: self.assertTrue(greentest.PY3, "Py3 raises a TypeError for non-BaseExceptions") self.assert_type_err(ex) except: # pylint:disable=bare-except self.assertTrue(greentest.PY2, "Old style classes can only be raised on Py2") t = sys.exc_info()[0] self.assertEqual(t, OldStyle) timeout.close() def _check_context_manager_expires(self, timeout, raises=True): try: with timeout: get_hub().switch() except gevent.Timeout as ex: if ex is not timeout: raise return ex if raises: self.fail("Must raise Timeout") def test_context_manager(self): timeout = gevent.Timeout(SHOULD_EXPIRE) self._check_context_manager_expires(timeout) def test_context_manager_false(self): # Suppress the exception timeout = gevent.Timeout(SHOULD_EXPIRE, False) self._check_context_manager_expires(timeout, raises=False) self.assertTrue(str(timeout).endswith('(silent)'), str(timeout)) def test_context_manager_str(self): timeout = gevent.Timeout(SHOULD_EXPIRE, 'XXX') ex = self._check_context_manager_expires(timeout) self.assertTrue(str(ex).endswith('XXX'), str(ex)) def test_cancel(self): timeout = gevent.Timeout(SHOULD_EXPIRE) timeout.start() timeout.cancel() gevent.sleep(SHOULD_NOT_EXPIRE) self.assertFalse(timeout.pending, timeout) timeout.close() @greentest.ignores_leakcheck def test_with_timeout(self): with self.assertRaises(gevent.Timeout): gevent.with_timeout(SHOULD_EXPIRE, gevent.sleep, SHOULD_NOT_EXPIRE) X = object() r = gevent.with_timeout(SHOULD_EXPIRE, gevent.sleep, SHOULD_NOT_EXPIRE, timeout_value=X) self.assertIs(r, X) r = gevent.with_timeout(SHOULD_NOT_EXPIRE, gevent.sleep, SHOULD_EXPIRE, timeout_value=X) self.assertIsNone(r) if __name__ == '__main__': greentest.main()