Server IP : 85.214.239.14 / Your IP : 3.15.29.209 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /srv/modoboa/env/lib64/python3.5/site-packages/gevent/tests/ |
Upload File : |
from __future__ import print_function, absolute_import from gevent import monkey; monkey.patch_all(subprocess=True) import signal import sys import socket from time import sleep import gevent from gevent.server import StreamServer import gevent.testing as greentest from gevent.testing import util @greentest.skipOnLibuvOnCIOnPyPy("Timing issues sometimes lead to connection refused") class Test(util.TestServer): server = 'portforwarder.py' args = ['127.0.0.1:10011', '127.0.0.1:10012'] if sys.platform.startswith('win'): from subprocess import CREATE_NEW_PROCESS_GROUP # Must be in a new process group to use CTRL_C_EVENT, otherwise # we get killed too start_kwargs = {'creationflags': CREATE_NEW_PROCESS_GROUP} def after(self): if sys.platform == 'win32': self.assertIsNotNone(self.popen.poll()) else: self.assertEqual(self.popen.poll(), 0) def _run_all_tests(self): log = [] def handle(sock, _address): while True: data = sock.recv(1024) print('got %r' % data) if not data: break log.append(data) server = StreamServer(self.args[1], handle) server.start() try: conn = socket.create_connection(('127.0.0.1', 10011)) conn.sendall(b'msg1') sleep(0.1) # On Windows, SIGTERM actually abruptly terminates the process; # it can't be caught. However, CTRL_C_EVENT results in a KeyboardInterrupt # being raised, so we can shut down properly. self.popen.send_signal(getattr(signal, 'CTRL_C_EVENT', signal.SIGTERM)) sleep(0.1) conn.sendall(b'msg2') conn.close() with gevent.Timeout(2.1): self.popen.wait() finally: server.close() self.assertEqual([b'msg1', b'msg2'], log) if __name__ == '__main__': greentest.main()