Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 18.118.7.18
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /srv/modoboa/env/lib/python3.5/site-packages/gevent/tests/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /srv/modoboa/env/lib/python3.5/site-packages/gevent/tests//test__threadpool.py
from __future__ import print_function

from time import time, sleep
import contextlib
import random
import weakref
import gc

import gevent.testing as greentest
import gevent.threadpool
from gevent.threadpool import ThreadPool
import gevent

from gevent.testing import ExpectedException
from gevent.testing import PYPY



# pylint:disable=too-many-ancestors


@contextlib.contextmanager
def disabled_gc():
    """Context manager that turns the cyclic garbage collector off for its body.

    On exit (normal or via an exception) the collector is switched back on,
    but only if it was enabled when the block was entered.
    """
    # Remember how to undo the change; None means "it was already off".
    restore = gc.enable if gc.isenabled() else None
    gc.disable()
    try:
        yield
    finally:
        if restore is not None:
            restore()


class TestCase(greentest.TestCase):
    """Shared base for the pool tests in this module.

    ``_makeOne`` builds an instance of ``ClassUnderTest`` and remembers it on
    ``self.pool``; ``cleanup`` stops that pool again (``cleanup`` is presumably
    invoked by the greentest base class during teardown).
    """
    # These generally need more time
    __timeout__ = greentest.LARGE_TIMEOUT
    pool = None

    ClassUnderTest = ThreadPool

    def _FUT(self):
        # "Factory under test": the class whose instances the tests exercise.
        return self.ClassUnderTest

    def _makeOne(self, size, increase=greentest.RUN_LEAKCHECKS):
        # Build a pool with the given size argument and store it on self.pool.
        factory = self._FUT()
        created = factory(size)
        self.pool = created
        if increase:
            # Max size to help eliminate false positives
            created.size = size
        return created

    def cleanup(self):
        pool = self.pool
        if pool is None:
            return

        # Use kill() when the pool offers it, otherwise fall back to shutdown().
        stop = getattr(pool, 'kill', None) or getattr(pool, 'shutdown')
        stop()
        del stop
        del self.pool

        if not greentest.RUN_LEAKCHECKS:
            return

        # Each worker thread created a greenlet object and switched to it.
        # It's a custom subclass, but even if it's not, it appears that
        # the root greenlet for the new thread sticks around until there's a
        # gc. Simply calling 'getcurrent()' is enough to "leak" a greenlet.greenlet
        # and a weakref.
        for _ in range(3):
            gc.collect()


class PoolBasicTests(TestCase):
    """Basic checks of spawn/apply/apply_async and of constructor validation."""

    def test_execute_async(self):
        pool = self._makeOne(2)
        seen = []
        first = pool.spawn(seen.append, 1)
        first.get()
        self.assertEqual(seen, [1])
        gevent.sleep(0)

        # The test expects each queued append to still be pending right
        # after apply_async returns.
        for value in (2, 3, 4):
            pool.apply_async(seen.append, (value, ))
            self.assertEqual(seen, [1])
        gevent.sleep(0.01)
        self.assertEqualFlakyRaceCondition(sorted(seen), [1, 2, 3, 4])

    def test_apply(self):
        pool = self._makeOne(1)

        def tag(a):
            return ('foo', a)

        self.assertEqual(pool.apply(tag, (1, )), ('foo', 1))

    def test_apply_raises(self):
        pool = self._makeOne(1)

        def raiser():
            raise ExpectedException()

        # The exception raised by the task must come out of apply().
        self.assertRaises(ExpectedException, pool.apply, raiser)
    # Don't let the metaclass automatically force any error
    # that reaches the hub from a spawned greenlet to become
    # fatal; that defeats the point of the test.
    test_apply_raises.error_fatal = False

    def test_init_valueerror(self):
        self.switch_expected = False
        # A negative size is rejected by the constructor.
        self.assertRaises(ValueError, self._makeOne, -1)

#
# tests from standard library test/test_multiprocessing.py


class TimingWrapper(object):
    """Callable wrapper that records how long the last call took.

    ``elapsed`` is ``None`` until the wrapper has been called once; afterwards
    it holds the wall-clock duration, in seconds, of the most recent call,
    whether that call returned or raised.
    """

    def __init__(self, the_func):
        self.elapsed = None
        self.func = the_func

    def __call__(self, *args, **kwds):
        started = time()
        try:
            outcome = self.func(*args, **kwds)
        finally:
            # Recorded even when the wrapped function raises.
            self.elapsed = time() - started
        return outcome


def sqr(x, wait=0.0):
    """Block for *wait* seconds, then return *x* multiplied by itself."""
    sleep(wait)
    squared = x * x
    return squared


def sqr_random_sleep(x):
    """Block for a random time below 0.1 seconds, then return *x* times *x*."""
    delay = random.random() * 0.1
    sleep(delay)
    squared = x * x
    return squared


# Durations, in seconds, for the timing-sensitive tests: TIMEOUT1 is the task
# sleep in test_async/test_async_callback and TIMEOUT2 is the get() timeout in
# test_async_timeout. TIMEOUT3 is not referenced in the visible part of this file.
TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.082, 0.035, 0.14

class _AbstractPoolTest(TestCase):
    """Base for tests that want a ready-made pool on ``self.pool``.

    ``setUp`` creates the pool using the class attribute ``size``. Set
    ``MAP_IS_GEN`` to true when the pool's ``map`` returns an iterator that
    has to be turned into a list before comparing.
    """

    size = 1

    MAP_IS_GEN = False

    def setUp(self):
        greentest.TestCase.setUp(self)
        self._makeOne(self.size)

    @greentest.ignores_leakcheck
    def test_map(self):
        if self.MAP_IS_GEN:
            pmap = lambda f, i: list(self.pool.map(f, i))
        else:
            pmap = self.pool.map

        # The pool's map must agree with the builtin map for short and long inputs.
        for count in (10, 100):
            self.assertEqual(pmap(sqr, range(count)), list(map(sqr, range(count))))

        self.pool.kill()
        del self.pool
        del pmap

# Input lengths (passed to range()) for the imap tests in TestPool.
SMALL_RANGE = 10
LARGE_RANGE = 1000

# Shrink the large input when running on the slow configurations described below.
if (greentest.PYPY and (greentest.WIN or greentest.RUN_COVERAGE)) or greentest.RUN_LEAKCHECKS:
    # PyPy 5.10 is *really* slow at spawning or switching between
    # threads (especially on Windows or when coverage is enabled) Tests that happen
    # instantaneously on other platforms time out due to the overhead.

    # Leakchecks also take much longer due to all the calls into the GC,
    # most especially on Python 3
    LARGE_RANGE = 50

class TestPool(_AbstractPoolTest):
    """Exercises apply, apply_async, imap, imap_unordered and kill on ``self.pool``.

    The pool comes from the inherited ``setUp``, which builds it with the
    class attribute ``size``; subclasses rerun these tests with other sizes.
    """

    def test_greenlet_class(self):
        # The greenlet that runs a task is a _WorkerGreenlet, and its repr
        # carries identifying details.
        from greenlet import getcurrent
        from gevent.threadpool import _WorkerGreenlet
        worker_greenlet = self.pool.apply(getcurrent)

        self.assertIsInstance(worker_greenlet, _WorkerGreenlet)
        r = repr(worker_greenlet)
        self.assertIn('ThreadPoolWorker', r)
        self.assertIn('thread_ident', r)
        self.assertIn('hub=', r)

        from gevent.util import format_run_info

        # The worker also shows up in the formatted run info.
        info = '\n'.join(format_run_info())
        self.assertIn("<ThreadPoolWorker", info)

    def test_apply(self):
        # apply() accepts positional and keyword arguments.
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x': 3}), sqr(x=3))

    def test_async(self):
        # get() returns the task's result after about as long as the task slept.
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimeoutAlmostEqual(get.elapsed, TIMEOUT1, 1)

    def test_async_callback(self):
        # Same as test_async, plus the callback receives the result.
        result = []
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,), callback=result.append)
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimeoutAlmostEqual(get.elapsed, TIMEOUT1, 1)
        gevent.sleep(0)  # lets the callback run
        # A unittest assertion (not a bare assert) so the check survives ``-O``.
        self.assertEqual(result, [49])

    def test_async_timeout(self):
        # The task sleeps longer than the get() timeout, so get() must time out.
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(gevent.Timeout, get, timeout=TIMEOUT2)
        self.assertTimeoutAlmostEqual(get.elapsed, TIMEOUT2, 1)
        self.pool.join()

    def test_imap_list_small(self):
        it = self.pool.imap(sqr, range(SMALL_RANGE))
        self.assertEqual(list(it), list(map(sqr, range(SMALL_RANGE))))

    def test_imap_it_small(self):
        # Results come back in input order and the iterator then ends.
        it = self.pool.imap(sqr, range(SMALL_RANGE))
        for i in range(SMALL_RANGE):
            self.assertEqual(next(it), i * i)
        self.assertRaises(StopIteration, next, it)

    def test_imap_it_large(self):
        it = self.pool.imap(sqr, range(LARGE_RANGE))
        for i in range(LARGE_RANGE):
            self.assertEqual(next(it), i * i)
        self.assertRaises(StopIteration, next, it)

    def test_imap_gc(self):
        # Forcing a collection between items must not disturb the iteration.
        it = self.pool.imap(sqr, range(SMALL_RANGE))
        for i in range(SMALL_RANGE):
            self.assertEqual(next(it), i * i)
            gc.collect()
        self.assertRaises(StopIteration, next, it)

    def test_imap_unordered_gc(self):
        it = self.pool.imap_unordered(sqr, range(SMALL_RANGE))
        result = []
        for _ in range(SMALL_RANGE):
            result.append(next(it))
            gc.collect()
        with self.assertRaises(StopIteration):
            next(it)
        # Order is unspecified, so compare the sorted results.
        self.assertEqual(sorted(result), [x * x for x in range(SMALL_RANGE)])

    def test_imap_random(self):
        # Even with random task durations imap yields in input order.
        it = self.pool.imap(sqr_random_sleep, range(SMALL_RANGE))
        self.assertEqual(list(it), list(map(sqr, range(SMALL_RANGE))))

    def test_imap_unordered(self):
        it = self.pool.imap_unordered(sqr, range(LARGE_RANGE))
        self.assertEqual(sorted(it), list(map(sqr, range(LARGE_RANGE))))

        # Run it a second time on the same pool.
        it = self.pool.imap_unordered(sqr, range(LARGE_RANGE))
        self.assertEqual(sorted(it), list(map(sqr, range(LARGE_RANGE))))

    def test_imap_unordered_random(self):
        it = self.pool.imap_unordered(sqr_random_sleep, range(SMALL_RANGE))
        self.assertEqual(sorted(it), list(map(sqr, range(SMALL_RANGE))))

    def test_terminate(self):
        # kill() must return promptly even though sleeping tasks are queued.
        size = self.size or 10
        result = self.pool.map_async(sleep, [0.1] * (size * 2))
        gevent.sleep(0.1)
        kill = TimingWrapper(self.pool.kill)
        kill()
        # A unittest assertion (not a bare assert) so the check survives ``-O``.
        self.assertLess(kill.elapsed, 0.1 * self.size + 0.5)
        result.join()

    def sleep(self, x):
        # Helper task: block for x/10 seconds and return x as a string.
        sleep(float(x) / 10.0)
        return str(x)

    def test_imap_unordered_sleep(self):
        # testing that imap_unordered returns items in completion order
        result = list(self.pool.imap_unordered(self.sleep, [10, 1, 2]))
        if self.pool.size == 1:
            expected = ['10', '1', '2']
        else:
            expected = ['1', '2', '10']
        self.assertEqual(result, expected)


class TestPool2(TestPool):
    """Reruns the TestPool tests with ``size = 2`` and adds a nested-apply test."""
    size = 2

    @greentest.ignores_leakcheck # Asking for the hub in the new thread shows up as a "leak"
    def test_recursive_apply(self):
        pool = self.pool

        def inner():
            # make sure we can do both types of callbacks
            # (loop iteration and end-of-loop) in the recursive
            # call
            gevent.sleep()
            gevent.sleep(0.001)
            return "B"

        def outer():
            # Runs in the pool and itself hands work to the same pool.
            return pool.apply(inner)

        self.assertEqual(pool.apply(outer), "B")


# Reruns every TestPool test with a size of 3.
@greentest.ignores_leakcheck
class TestPool3(TestPool):
    size = 3

# Reruns every TestPool test with a size of 10.
@greentest.ignores_leakcheck
class TestPool10(TestPool):
    size = 10



# class TestJoinSleep(greentest.GenericGetTestCase):
#
#     def wait(self, timeout):
#         pool = ThreadPool(1)
#         pool.spawn(gevent.sleep, 10)
#         pool.join(timeout=timeout)
#
#
# class TestJoinSleep_raise_error(greentest.GenericWaitTestCase):
#
#     def wait(self, timeout):
#         pool = ThreadPool(1)
#         g = pool.spawn(gevent.sleep, 10)
#         pool.join(timeout=timeout, raise_error=True)


class TestJoinEmpty(TestCase):
    """Joining a pool that was never given any work must simply return."""
    switch_expected = False

    @greentest.skipIf(greentest.PYPY and greentest.LIBUV and greentest.RUNNING_ON_TRAVIS,
                      "This sometimes appears to crash in PyPy2 5.9.0, "
                      "but never crashes on macOS or local Ubunto with same PyPy version")
    # Running this test standalone doesn't crash PyPy, only when it's run
    # as part of this whole file. Removing it does solve the crash though.
    def test(self):
        idle_pool = self._makeOne(1)
        idle_pool.join()


class TestSpawn(TestCase):
    """Checks ``len(pool)`` while two sleeping tasks pass through a size-1 pool."""
    switch_expected = True

    @greentest.ignores_leakcheck
    def test(self):
        pool = self._makeOne(1)
        self.assertEqual(len(pool), 0)
        log = []

        def sleep_n_log(item, seconds):
            # Block first, then record that this item finished.
            return [sleep(seconds), log.append(item)]

        pool.spawn(sleep_n_log, 'a', 0.1)
        self.assertEqual(len(pool), 1)
        pool.spawn(sleep_n_log, 'b', 0.1)
        # even though the pool is of size 1, it can contain 2 items
        # since we allow +1 for better throughput
        self.assertEqual(len(pool), 2)

        # After each wait one more task has finished and left the pool.
        for expected_log, expected_len in ((['a'], 1), (['a', 'b'], 0)):
            gevent.sleep(0.15)
            self.assertEqual(log, expected_log)
            self.assertEqual(len(pool), expected_len)


def error_iter():
    """Generator that yields 1 and 2 and then raises ``ExpectedException``."""
    for value in (1, 2):
        yield value
    raise greentest.ExpectedException


class TestErrorInIterator(TestCase):
    """An exception raised by the input iterable must reach the caller."""

    error_fatal = False

    def test(self):
        pool = self.pool = self._makeOne(3)
        with self.assertRaises(greentest.ExpectedException):
            pool.map(lambda x: None, error_iter())
        gevent.sleep(0.001)

    def test_unordered(self):
        pool = self.pool = self._makeOne(3)
        with self.assertRaises(greentest.ExpectedException):
            # Draining the iterator is what surfaces the error.
            list(pool.imap_unordered(lambda x: None, error_iter()))
        gevent.sleep(0.001)


class TestMaxsize(TestCase):
    """Checks how the pool reacts when ``maxsize`` is changed while in use."""

    def test_inc(self):
        pool = self.pool = self._makeOne(0)
        done = []
        # Try to be careful not to tick over the libuv timer.
        # See libuv/loop.py:_start_callback_timer
        gevent.spawn(pool.spawn, done.append, 1)
        gevent.spawn_later(0.01, pool.spawn, done.append, 2)
        gevent.sleep(0.02)
        # With a maxsize of zero nothing has run yet.
        self.assertEqual(done, [])
        pool.maxsize = 1
        gevent.sleep(0.02)

        self.assertEqualFlakyRaceCondition(done, [1, 2])

    @greentest.ignores_leakcheck
    def test_setzero(self):
        pool = self.pool = self._makeOne(3)
        for seconds in (0.1, 0.2, 0.3):
            pool.spawn(sleep, seconds)
        gevent.sleep(0.2)
        self.assertEqual(pool.size, 3)
        pool.maxsize = 0
        gevent.sleep(0.2)
        # The test expects the size to have dropped to zero by now.
        self.assertEqualFlakyRaceCondition(pool.size, 0)


class TestSize(TestCase):
    """Assigning ``pool.size`` takes effect for valid values and is rejected otherwise."""

    @greentest.reraises_flaky_race_condition()
    def test(self):
        pool = self.pool = self._makeOne(2, increase=False)
        self.assertEqual(pool.size, 0)

        # Grow and shrink within the allowed range.
        for wanted in (1, 2, 1):
            pool.size = wanted
            self.assertEqual(pool.size, wanted)

        # Negative values and values above the size given at construction are refused.
        for invalid in (-1, 3):
            with self.assertRaises(ValueError):
                pool.size = invalid

        for wanted in (0, 2):
            pool.size = wanted
            self.assertEqual(pool.size, wanted)


class TestRef(TestCase):
    """Verifies the pool keeps no references to a task's function, arguments or result."""

    def test(self):
        pool = self.pool = self._makeOne(2)

        # ``func`` is a bound method of a SomeClass instance whose ``refs``
        # attribute is this very list, so calling it records weak references
        # to its two arguments and its result here.
        refs = []
        obj = SomeClass()
        obj.refs = refs
        func = obj.func
        del obj

        with disabled_gc():
            # we do this:
            #     result = func(Object(), kwarg1=Object())
            # but in a thread pool and see that arguments', result's and func's references are not leaked
            result = pool.apply(func, (Object(), ), {'kwarg1': Object()})
            # A unittest assertion (not a bare assert) so the check survives ``-O``.
            self.assertIsInstance(result, Object)
            gevent.sleep(0.1)  # XXX should not be needed

            refs.append(weakref.ref(func))
            del func, result
            if PYPY:
                gc.collect()
                gc.collect()
            # Every tracked object must be gone by now.
            for r in refs:
                self.assertIsNone(r())

            # Two arguments, one result and the function itself.
            self.assertEqual(4, len(refs))


class Object(object):
    """Empty weak-referenceable class used for the arguments and result in TestRef."""
    pass


class SomeClass(object):
    """Helper whose ``func`` records weak references to what passes through it."""

    # List that ``func`` appends weak references to; must be assigned before use.
    refs = None

    def func(self, arg1, kwarg1=None):
        # Create the result, remember weak references to both arguments and
        # the result (in that order), then hand the result back.
        result = Object()
        new_refs = list(map(weakref.ref, (arg1, kwarg1, result)))
        self.refs.extend(new_refs)
        return result


def noop():
    """Do nothing and return ``None``."""
    return None


class TestRefCount(TestCase):
    """Spawns a trivial task, yields once, then kills the pool explicitly."""

    def test(self):
        worker_pool = self._makeOne(1)
        worker_pool.spawn(noop)
        gevent.sleep(0)
        worker_pool.kill()

if hasattr(gevent.threadpool, 'ThreadPoolExecutor'):

    from concurrent.futures import TimeoutError as FutureTimeoutError
    from concurrent.futures import wait as cf_wait
    from concurrent.futures import as_completed as cf_as_completed

    from gevent import monkey

    class TestTPE(_AbstractPoolTest):
        """Runs the pool tests against ``gevent.threadpool.ThreadPoolExecutor``.

        Adds tests for the futures returned by ``submit`` and for waiting on
        them with ``concurrent.futures`` and gevent functions.
        """
        size = 1

        # The executor's map() result is wrapped in list() by the inherited test_map.
        MAP_IS_GEN = True

        ClassUnderTest = gevent.threadpool.ThreadPoolExecutor

        # Expected value of monkey.is_module_patched('threading') in these tests.
        MONKEY_PATCHED = False

        @greentest.ignores_leakcheck
        def test_future(self):
            self.assertEqual(monkey.is_module_patched('threading'),
                             self.MONKEY_PATCHED)
            pool = self.pool

            calledback = []

            def fn():
                gevent.sleep(0.5)
                return 42

            def callback(future):
                # Count the invocation, then raise; the test expects the
                # error not to affect the future's result.
                future.calledback += 1
                raise greentest.ExpectedException("Expected, ignored")

            future = pool.submit(fn) # pylint:disable=no-member
            future.calledback = 0
            future.add_done_callback(callback)
            # The task sleeps for 0.5s, so a 1ms wait must time out.
            self.assertRaises(FutureTimeoutError, future.result, timeout=0.001)

            def spawned():
                return 2016

            spawned_greenlet = gevent.spawn(spawned)

            # Whether or not we are monkey patched, the background
            # greenlet we spawned got to run while we waited.

            self.assertEqual(future.result(), 42)
            self.assertTrue(future.done())
            self.assertFalse(future.cancelled())
            # Make sure the notifier has a chance to run so the call back
            # gets called
            gevent.sleep()
            self.assertEqual(future.calledback, 1)

            self.assertTrue(spawned_greenlet.ready())
            self.assertEqual(spawned_greenlet.value, 2016)

            # Adding the callback again runs immediately
            future.add_done_callback(lambda f: calledback.append(True))
            self.assertEqual(calledback, [True])

            # We can wait on the finished future
            done, _not_done = cf_wait((future,))
            self.assertEqual(list(done), [future])

            self.assertEqual(list(cf_as_completed((future,))), [future])
            # Doing so does not call the callback again
            self.assertEqual(future.calledback, 1)
            # even after a trip around the event loop
            gevent.sleep()
            self.assertEqual(future.calledback, 1)

            # Stop the pool and drop every reference to it and to the future
            # inside the test itself.
            pool.kill()
            del future
            del pool
            del self.pool

        @greentest.ignores_leakcheck
        def test_future_wait_module_function(self):
            # Instead of waiting on the result, we can wait
            # on the future using the module functions
            self.assertEqual(monkey.is_module_patched('threading'),
                             self.MONKEY_PATCHED)
            pool = self.pool

            def fn():
                gevent.sleep(0.5)
                return 42

            future = pool.submit(fn) # pylint:disable=no-member
            if self.MONKEY_PATCHED:
                # Things work as expected when monkey-patched
                _done, not_done = cf_wait((future,), timeout=0.001)
                self.assertEqual(list(not_done), [future])

                def spawned():
                    return 2016

                spawned_greenlet = gevent.spawn(spawned)

                # Waiting without a timeout returns the finished future, and
                # the spawned greenlet has run in the meantime.
                done, _not_done = cf_wait((future,))
                self.assertEqual(list(done), [future])
                self.assertTrue(spawned_greenlet.ready())
                self.assertEqual(spawned_greenlet.value, 2016)
            else:
                # When not monkey-patched, raises an AttributeError
                self.assertRaises(AttributeError, cf_wait, (future,))

            # Same teardown as in test_future.
            pool.kill()
            del future
            del pool
            del self.pool

        @greentest.ignores_leakcheck
        def test_future_wait_gevent_function(self):
            # The future object can be waited on with gevent functions.
            self.assertEqual(monkey.is_module_patched('threading'),
                             self.MONKEY_PATCHED)
            pool = self.pool

            def fn():
                gevent.sleep(0.5)
                return 42

            future = pool.submit(fn) # pylint:disable=no-member

            def spawned():
                return 2016

            spawned_greenlet = gevent.spawn(spawned)

            # gevent.wait() returns the finished future, and the spawned
            # greenlet has run by then.
            done = gevent.wait((future,))
            self.assertEqual(list(done), [future])
            self.assertTrue(spawned_greenlet.ready())
            self.assertEqual(spawned_greenlet.value, 2016)

            # Same teardown as in test_future.
            pool.kill()
            del future
            del pool
            del self.pool


# Hand over to greentest.main() when this file is executed as a script.
if __name__ == '__main__':
    greentest.main()

Anon7 - 2022
AnonSec Team