Server IP : 85.214.239.14 / Your IP : 3.139.235.100 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /proc/self/root/srv/modoboa/env/lib64/python3.5/site-packages/gevent/tests/ |
Upload File : |
"""Tests for closing a gevent socket while another greenlet is blocked in recv."""
import gevent
from gevent import socket
from gevent import server
import gevent.testing as greentest

# XXX also test: send, sendall, recvfrom, recvfrom_into, sendto


def readall(sock, _):
    """Drain *sock* until ``recv`` returns a falsy value, then close it.

    Used as the connection handler of the ``StreamServer`` started in
    ``Test.setUp``. The second positional argument is ignored.
    """
    while sock.recv(1024):
        pass  # pragma: no cover we never actually send the data
    sock.close()


class Test(greentest.TestCase):
    """Exercise a client socket against a local ``StreamServer`` that only reads."""

    # NOTE(review): presumably tells the gevent test harness not to treat
    # errors raised inside greenlets as fatal -- confirm against gevent.testing.
    error_fatal = False

    def setUp(self):
        """Start a server on an ephemeral 127.0.0.1 port with ``readall`` as handler."""
        self.server = server.StreamServer(('127.0.0.1', 0), readall)
        self.server.start()

    def tearDown(self):
        """Stop the server started in ``setUp``."""
        self.server.stop()

    def test_recv_closed(self):
        """Closing a socket makes a greenlet blocked in ``recv`` fail with EBADF."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect(('localhost', self.server.server_port))
        # The server never sends anything, so this greenlet blocks in recv().
        receiver = gevent.spawn(sock.recv, 25)
        try:
            # Yield briefly so the receiver greenlet gets to run and block.
            gevent.sleep(0.001)
            sock.close()
            receiver.join(timeout=0.1)
            self.assertTrue(receiver.ready(), receiver)
            self.assertIsNone(receiver.value)
            self.assertIsInstance(receiver.exception, socket.error)
            self.assertEqual(receiver.exception.errno, socket.EBADF)
        finally:
            receiver.kill()
            # A second close on the happy path; it releases the descriptor
            # when an earlier step failed before the explicit close above.
            sock.close()

    # XXX: This is possibly due to the bad behaviour of small sleeps?
    # The timeout is the global test timeout, 10s
    @greentest.skipOnLibuvOnCI("Sometimes randomly times out")
    def test_recv_twice(self):
        """A second ``recv`` while one is already pending raises AssertionError."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect(('localhost', self.server.server_port))
        # The server never sends anything, so this greenlet blocks in recv().
        receiver = gevent.spawn(sock.recv, 25)
        try:
            # Yield briefly so the receiver greenlet gets to run and block.
            gevent.sleep(0.001)
            # Checked twice: the failure must be repeatable, not one-shot.
            self.assertRaises(AssertionError, sock.recv, 25)
            self.assertRaises(AssertionError, sock.recv, 25)
        finally:
            receiver.kill()
            sock.close()


if __name__ == '__main__':
    greentest.main()