Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.137.175.83
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /proc/3/task/3/cwd/srv/modoboa/env/lib64/python3.5/site-packages/gevent/tests/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /proc/3/task/3/cwd/srv/modoboa/env/lib64/python3.5/site-packages/gevent/tests//test__core_fork.py
from __future__ import print_function
import gevent.monkey
gevent.monkey.patch_all()
import gevent
import os

import multiprocessing

# Hub of the process that imported this module; used below to create the
# fork watcher and, in test(), to reach the threadpool.
hub = gevent.get_hub()
# PID of the process that imported this module, captured at import time.
pid = os.getpid()
# Overwritten by on_fork() with the PID of the process in which the fork
# watcher fired; remains None if the watcher never runs.
newpid = None


def on_fork():
    """Fork-watcher callback: store the calling process's PID in ``newpid``."""
    global newpid
    current_pid = os.getpid()
    newpid = current_pid

# Create a fork watcher on the hub's loop and start it with on_fork as its
# callback, so on_fork runs when the loop detects a fork.
# NOTE(review): ref=False presumably keeps this watcher from holding the
# loop open on its own -- confirm against gevent's watcher documentation.
fork_watcher = hub.loop.fork(ref=False)
fork_watcher.start(on_fork)


def run(q):
    """Child-process body: let the loop cycle, then report ``newpid`` on *q*.

    :param q: queue on which the current value of ``newpid`` is put.
    """
    # libev invokes fork callbacks only at the start of a loop iteration.
    # Because callbacks are used extensively, a single timer-based sleep is
    # not enough to wrap back around to the start of the loop; it takes two.
    for _ in range(2):
        gevent.sleep(0.01)
    q.put(newpid)


def test():
    """Check that the fork watcher fires in a child of a multi-threaded process.

    Starts ``run`` in a ``multiprocessing.Process``, waits for it to finish,
    and asserts that the PID it reported is set and differs from this
    process's ``pid``.
    """
    # Push a no-op through the threadpool so this process is multi-threaded
    # before the child is created.
    hub.threadpool.apply(lambda: None)
    # The queue has to be handed to the child as an argument: with a global
    # Queue, the get() below hangs on Windows.
    result_queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=run, args=(result_queue,))
    child.start()
    child.join()
    reported_pid = result_queue.get()

    assert reported_pid is not None, "The fork watcher didn't run"
    assert reported_pid != pid

if __name__ == '__main__':
    # Must be called for Windows to "fork" properly; the fork cannot happen
    # at the top level of the module.
    multiprocessing.freeze_support()
    # Fork watchers were not firing in multi-threaded processes; this test
    # is designed to prove that they now do.
    #
    # However, it fails on Windows: the fork watcher never runs. That makes
    # perfect sense: on Windows, our patches to os.fork() that call
    # gevent.hub.reinit() are never used, because os.fork does not exist
    # there. multiprocessing.Process instead uses the Windows-specific
    # _subprocess.CreateProcess() to create a whole new process that has no
    # relation to the current one; that process then calls
    # multiprocessing.forking.main() to do its work. Since no state is
    # shared, a fork watcher cannot exist in that process.
    test()

Anon7 - 2022
AnonSec Team