Server IP : 85.214.239.14 / Your IP : 3.145.12.195 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /lib/python3/dist-packages/supervisor/tests/ |
Upload File : |
# -*- coding: utf-8 -*- import unittest import sys import operator import os import time import errno from supervisor.tests.base import DummyOptions from supervisor.tests.base import DummySupervisor from supervisor.tests.base import DummyProcess from supervisor.tests.base import DummyPConfig from supervisor.tests.base import DummyPGroupConfig from supervisor.tests.base import DummyProcessGroup from supervisor.tests.base import PopulatedDummySupervisor from supervisor.tests.base import _NOW from supervisor.tests.base import _TIMEFORMAT from supervisor.compat import as_string, PY2 from supervisor.datatypes import Automatic class TestBase(unittest.TestCase): def setUp(self): pass def tearDown(self): pass def _assertRPCError(self, code, callable, *args, **kw): from supervisor import xmlrpc try: callable(*args, **kw) except xmlrpc.RPCError as inst: self.assertEqual(inst.code, code) else: raise AssertionError("Didn't raise") class MainXMLRPCInterfaceTests(TestBase): def _getTargetClass(self): from supervisor import xmlrpc return xmlrpc.RootRPCInterface def _makeOne(self, *args, **kw): return self._getTargetClass()(*args, **kw) def test_ctor(self): interface = self._makeOne([('supervisor', None)]) self.assertEqual(interface.supervisor, None) def test_traverse(self): dummy = DummyRPCInterface() interface = self._makeOne([('dummy', dummy)]) from supervisor import xmlrpc self._assertRPCError(xmlrpc.Faults.UNKNOWN_METHOD, xmlrpc.traverse, interface, 'notthere.hello', []) self._assertRPCError(xmlrpc.Faults.UNKNOWN_METHOD, xmlrpc.traverse, interface, 'supervisor._readFile', []) self._assertRPCError(xmlrpc.Faults.INCORRECT_PARAMETERS, xmlrpc.traverse, interface, 'dummy.hello', [1]) self.assertEqual(xmlrpc.traverse( interface, 'dummy.hello', []), 'Hello!') class SupervisorNamespaceXMLRPCInterfaceTests(TestBase): def _getTargetClass(self): from supervisor import rpcinterface return rpcinterface.SupervisorNamespaceRPCInterface def _makeOne(self, *args, **kw): return self._getTargetClass()(*args, **kw) def test_update(self): from supervisor import xmlrpc from supervisor.supervisord import SupervisorStates supervisord = DummySupervisor() interface = self._makeOne(supervisord) interface._update('foo') self.assertEqual(interface.update_text, 'foo') supervisord.options.mood = SupervisorStates.SHUTDOWN self._assertRPCError(xmlrpc.Faults.SHUTDOWN_STATE, interface._update, 'foo') def test_getAPIVersion(self): from supervisor import rpcinterface supervisord = DummySupervisor() interface = self._makeOne(supervisord) version = interface.getAPIVersion() self.assertEqual(version, rpcinterface.API_VERSION) self.assertEqual(interface.update_text, 'getAPIVersion') def test_getAPIVersion_aliased_to_deprecated_getVersion(self): supervisord = DummySupervisor() interface = self._makeOne(supervisord) self.assertEqual(interface.getAPIVersion, interface.getVersion) def test_getSupervisorVersion(self): supervisord = DummySupervisor() interface = self._makeOne(supervisord) version = interface.getSupervisorVersion() from supervisor import options self.assertEqual(version, options.VERSION) self.assertEqual(interface.update_text, 'getSupervisorVersion') def test_getIdentification(self): supervisord = DummySupervisor() interface = self._makeOne(supervisord) identifier = interface.getIdentification() self.assertEqual(identifier, supervisord.options.identifier) self.assertEqual(interface.update_text, 'getIdentification') def test_getState(self): from supervisor.states import getSupervisorStateDescription supervisord = DummySupervisor() interface = self._makeOne(supervisord) stateinfo = interface.getState() statecode = supervisord.options.mood statename = getSupervisorStateDescription(statecode) self.assertEqual(stateinfo['statecode'], statecode) self.assertEqual(stateinfo['statename'], statename) self.assertEqual(interface.update_text, 'getState') def test_getPID(self): options = DummyOptions() supervisord = DummySupervisor(options) interface = self._makeOne(supervisord) self.assertEqual(interface.getPID(), options.get_pid()) self.assertEqual(interface.update_text, 'getPID') def test_readLog_aliased_to_deprecated_readMainLog(self): supervisord = DummySupervisor() interface = self._makeOne(supervisord) self.assertEqual(interface.readMainLog, interface.readLog) def test_readLog_unreadable(self): from supervisor import xmlrpc supervisord = DummySupervisor() interface = self._makeOne(supervisord) self._assertRPCError(xmlrpc.Faults.NO_FILE, interface.readLog, offset=0, length=1) def test_readLog_badargs(self): from supervisor import xmlrpc supervisord = DummySupervisor() interface = self._makeOne(supervisord) try: logfile = supervisord.options.logfile with open(logfile, 'w+') as f: f.write('x' * 2048) self._assertRPCError(xmlrpc.Faults.BAD_ARGUMENTS, interface.readLog, offset=-1, length=1) self._assertRPCError(xmlrpc.Faults.BAD_ARGUMENTS, interface.readLog, offset=-1, length=-1) finally: os.remove(logfile) def test_readLog(self): supervisord = DummySupervisor() interface = self._makeOne(supervisord) logfile = supervisord.options.logfile try: with open(logfile, 'w+') as f: f.write('x' * 2048) f.write('y' * 2048) data = interface.readLog(offset=0, length=0) self.assertEqual(interface.update_text, 'readLog') self.assertEqual(data, ('x' * 2048) + ('y' * 2048)) data = interface.readLog(offset=2048, length=0) self.assertEqual(data, 'y' * 2048) data = interface.readLog(offset=0, length=2048) self.assertEqual(data, 'x' * 2048) data = interface.readLog(offset=-4, length=0) self.assertEqual(data, 'y' * 4) finally: os.remove(logfile) def test_clearLog_unreadable(self): from supervisor import xmlrpc supervisord = DummySupervisor() interface = self._makeOne(supervisord) self._assertRPCError(xmlrpc.Faults.NO_FILE, interface.clearLog) def test_clearLog_unremoveable(self): from supervisor import xmlrpc options = DummyOptions() options.existing = [options.logfile] options.remove_exception = OSError(errno.EPERM, os.strerror(errno.EPERM)) supervisord = DummySupervisor(options) interface = self._makeOne(supervisord) self.assertRaises(xmlrpc.RPCError, interface.clearLog) self.assertEqual(interface.update_text, 'clearLog') def test_clearLog(self): options = DummyOptions() options.existing = [options.logfile] supervisord = DummySupervisor(options) interface = self._makeOne(supervisord) result = interface.clearLog() self.assertEqual(interface.update_text, 'clearLog') self.assertEqual(result, True) self.assertEqual(options.removed[0], options.logfile) for handler in supervisord.options.logger.handlers: self.assertEqual(handler.reopened, True) def test_shutdown(self): supervisord = DummySupervisor() interface = self._makeOne(supervisord) value = interface.shutdown() self.assertEqual(value, True) self.assertEqual(supervisord.options.mood, -1) def test_restart(self): supervisord = DummySupervisor() interface = self._makeOne(supervisord) value = interface.restart() self.assertEqual(value, True) self.assertEqual(supervisord.options.mood, 0) def test_reloadConfig(self): options = DummyOptions() supervisord = DummySupervisor(options) interface = self._makeOne(supervisord) changes = [ [DummyPGroupConfig(options, 'added')], [DummyPGroupConfig(options, 'changed')], [DummyPGroupConfig(options, 'dropped')] ] supervisord.diff_to_active = lambda : changes value = interface.reloadConfig() self.assertEqual(value, [[['added'], ['changed'], ['dropped']]]) def test_reloadConfig_process_config_raises_ValueError(self): from supervisor import xmlrpc options = DummyOptions() def raise_exc(*arg, **kw): raise ValueError('foo') options.process_config = raise_exc supervisord = DummySupervisor(options) interface = self._makeOne(supervisord) self._assertRPCError(xmlrpc.Faults.CANT_REREAD, interface.reloadConfig) def test_addProcessGroup(self): from supervisor.supervisord import Supervisor from supervisor import xmlrpc options = DummyOptions() supervisord = Supervisor(options) pconfig = DummyPConfig(options, 'foo', __file__, autostart=False) gconfig = DummyPGroupConfig(options, 'group1', pconfigs=[pconfig]) supervisord.options.process_group_configs = [gconfig] interface = self._makeOne(supervisord) result = interface.addProcessGroup('group1') self.assertTrue(result) self.assertEqual(list(supervisord.process_groups.keys()), ['group1']) self._assertRPCError(xmlrpc.Faults.ALREADY_ADDED, interface.addProcessGroup, 'group1') self.assertEqual(list(supervisord.process_groups.keys()), ['group1']) self._assertRPCError(xmlrpc.Faults.BAD_NAME, interface.addProcessGroup, 'asdf') self.assertEqual(list(supervisord.process_groups.keys()), ['group1']) def test_removeProcessGroup(self): from supervisor.supervisord import Supervisor options = DummyOptions() supervisord = Supervisor(options) pconfig = DummyPConfig(options, 'foo', __file__, autostart=False) gconfig = DummyPGroupConfig(options, 'group1', pconfigs=[pconfig]) supervisord.options.process_group_configs = [gconfig] interface = self._makeOne(supervisord) interface.addProcessGroup('group1') result = interface.removeProcessGroup('group1') self.assertTrue(result) self.assertEqual(list(supervisord.process_groups.keys()), []) def test_removeProcessGroup_bad_name(self): from supervisor.supervisord import Supervisor from supervisor import xmlrpc options = DummyOptions() supervisord = Supervisor(options) pconfig = DummyPConfig(options, 'foo', __file__, autostart=False) gconfig = DummyPGroupConfig(options, 'group1', pconfigs=[pconfig]) supervisord.options.process_group_configs = [gconfig] interface = self._makeOne(supervisord) self._assertRPCError(xmlrpc.Faults.BAD_NAME, interface.removeProcessGroup, 'asdf') def test_removeProcessGroup_still_running(self): from supervisor.supervisord import Supervisor from supervisor import xmlrpc options = DummyOptions() supervisord = Supervisor(options) pconfig = DummyPConfig(options, 'foo', __file__, autostart=False) gconfig = DummyPGroupConfig(options, 'group1', pconfigs=[pconfig]) supervisord.options.process_group_configs = [gconfig] process = DummyProcessGroup(gconfig) process.unstopped_processes = [123] supervisord.process_groups = {'group1':process} interface = self._makeOne(supervisord) self._assertRPCError(xmlrpc.Faults.STILL_RUNNING, interface.removeProcessGroup, 'group1') def test_startProcess_already_started(self): from supervisor import xmlrpc options = DummyOptions() pconfig = DummyPConfig(options, 'foo', __file__, autostart=False) supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) supervisord.set_procattr('foo', 'pid', 10) interface = self._makeOne(supervisord) self._assertRPCError( xmlrpc.Faults.ALREADY_STARTED, interface.startProcess, 'foo' ) def test_startProcess_unknown_state(self): from supervisor import xmlrpc from supervisor.states import ProcessStates options = DummyOptions() pconfig = DummyPConfig(options, 'foo', __file__, autostart=False) supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) supervisord.set_procattr('foo', 'pid', 10) supervisord.set_procattr('foo', 'state', ProcessStates.UNKNOWN) interface = self._makeOne(supervisord) self._assertRPCError( xmlrpc.Faults.FAILED, interface.startProcess, 'foo' ) process = supervisord.process_groups['foo'].processes['foo'] self.assertEqual(process.spawned, False) def test_startProcess_bad_group_name(self): options = DummyOptions() pconfig = DummyPConfig(options, 'foo', __file__, autostart=False) supervisord = PopulatedDummySupervisor(options, 'group1', pconfig) interface = self._makeOne(supervisord) from supervisor import xmlrpc self._assertRPCError(xmlrpc.Faults.BAD_NAME, interface.startProcess, 'group2:foo') def test_startProcess_bad_process_name(self): options = DummyOptions() pconfig = DummyPConfig(options, 'foo', __file__, autostart=False) supervisord = PopulatedDummySupervisor(options, 'group1', pconfig) interface = self._makeOne(supervisord) from supervisor import xmlrpc self._assertRPCError(xmlrpc.Faults.BAD_NAME, interface.startProcess, 'group1:bar') def test_startProcess_file_not_found(self): options = DummyOptions() pconfig = DummyPConfig(options, 'foo', '/foo/bar', autostart=False) from supervisor.options import NotFound supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) process = supervisord.process_groups['foo'].processes['foo'] process.execv_arg_exception = NotFound interface = self._makeOne(supervisord) from supervisor import xmlrpc self._assertRPCError(xmlrpc.Faults.NO_FILE, interface.startProcess, 'foo') def test_startProcess_bad_command(self): options = DummyOptions() pconfig = DummyPConfig(options, 'foo', '/foo/bar', autostart=False) from supervisor.options import BadCommand supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) process = supervisord.process_groups['foo'].processes['foo'] process.execv_arg_exception = BadCommand interface = self._makeOne(supervisord) from supervisor import xmlrpc self._assertRPCError(xmlrpc.Faults.NOT_EXECUTABLE, interface.startProcess, 'foo') def test_startProcess_file_not_executable(self): options = DummyOptions() pconfig = DummyPConfig(options, 'foo', '/foo/bar', autostart=False) from supervisor.options import NotExecutable supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) process = supervisord.process_groups['foo'].processes['foo'] process.execv_arg_exception = NotExecutable interface = self._makeOne(supervisord) from supervisor import xmlrpc self._assertRPCError(xmlrpc.Faults.NOT_EXECUTABLE, interface.startProcess, 'foo') def test_startProcess_spawnerr(self): from supervisor import xmlrpc options = DummyOptions() pconfig = DummyPConfig(options, 'foo', __file__, autostart=False) from supervisor.process import ProcessStates supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) supervisord.set_procattr('foo', 'state', ProcessStates.STOPPED) process = supervisord.process_groups['foo'].processes['foo'] process.spawnerr = 'abc' interface = self._makeOne(supervisord) self._assertRPCError( xmlrpc.Faults.SPAWN_ERROR, interface.startProcess, 'foo' ) def test_startProcess(self): options = DummyOptions() pconfig = DummyPConfig(options, 'foo', __file__, autostart=False, startsecs=.01) from supervisor.process import ProcessStates supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) supervisord.set_procattr('foo', 'state', ProcessStates.STOPPED) interface = self._makeOne(supervisord) result = interface.startProcess('foo') process = supervisord.process_groups['foo'].processes['foo'] self.assertEqual(process.spawned, True) self.assertEqual(interface.update_text, 'startProcess') process.state = ProcessStates.RUNNING self.assertEqual(result, True) def test_startProcess_spawnerr_in_onwait(self): from supervisor import xmlrpc options = DummyOptions() pconfig = DummyPConfig(options, 'foo', __file__, autostart=False) from supervisor.process import ProcessStates supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) supervisord.set_procattr('foo', 'state', ProcessStates.STOPPED) process = supervisord.process_groups['foo'].processes['foo'] def spawn(): process.spawned = True process.state = ProcessStates.STARTING def transition(): process.spawnerr = 'abc' process.spawn = spawn process.transition = transition interface = self._makeOne(supervisord) callback = interface.startProcess('foo') self._assertRPCError(xmlrpc.Faults.SPAWN_ERROR, callback) def test_startProcess_success_in_onwait(self): options = DummyOptions() pconfig = DummyPConfig(options, 'foo', __file__, autostart=False) from supervisor.process import ProcessStates supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) supervisord.set_procattr('foo', 'state', ProcessStates.STOPPED) process = supervisord.process_groups['foo'].processes['foo'] def spawn(): process.spawned = True process.state = ProcessStates.STARTING process.spawn = spawn interface = self._makeOne(supervisord) callback = interface.startProcess('foo') process.state = ProcessStates.RUNNING self.assertEqual(callback(), True) def test_startProcess_nowait(self): options = DummyOptions() pconfig = DummyPConfig(options, 'foo', __file__, autostart=False) from supervisor.process import ProcessStates supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) supervisord.set_procattr('foo', 'state', ProcessStates.STOPPED) interface = self._makeOne(supervisord) result = interface.startProcess('foo', wait=False) self.assertEqual(result, True) process = supervisord.process_groups['foo'].processes['foo'] self.assertEqual(process.spawned, True) self.assertEqual(interface.update_text, 'startProcess') def test_startProcess_abnormal_term_process_not_running(self): options = DummyOptions() pconfig = DummyPConfig(options, 'foo', __file__, autostart=False) from supervisor.process import ProcessStates from supervisor import http supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) supervisord.set_procattr('foo', 'state', ProcessStates.STOPPED) interface = self._makeOne(supervisord) process = supervisord.process_groups['foo'].processes['foo'] def spawn(): process.spawned = True process.state = ProcessStates.STARTING process.spawn = spawn callback = interface.startProcess('foo', 100) # milliseconds result = callback() self.assertEqual(result, http.NOT_DONE_YET) self.assertEqual(process.spawned, True) self.assertEqual(interface.update_text, 'startProcess') process.state = ProcessStates.BACKOFF from supervisor import xmlrpc self._assertRPCError(xmlrpc.Faults.ABNORMAL_TERMINATION, callback) def test_startProcess_splat_calls_startProcessGroup(self): options = DummyOptions() pconfig1 = DummyPConfig(options, 'process1', __file__, autostart=False, startsecs=.01) pconfig2 = DummyPConfig(options, 'process2', __file__, priority=2, startsecs=.01) supervisord = PopulatedDummySupervisor(options, 'foo', pconfig1, pconfig2) from supervisor.process import ProcessStates supervisord.set_procattr('process1', 'state', ProcessStates.STOPPED) supervisord.set_procattr('process2', 'state', ProcessStates.STOPPED) interface = self._makeOne(supervisord) interface.startProcess('foo:*') self.assertEqual(interface.update_text, 'startProcessGroup') def test_startProcessGroup(self): options = DummyOptions() pconfig1 = DummyPConfig(options, 'process1', __file__, priority=1, startsecs=.01) pconfig2 = DummyPConfig(options, 'process2', __file__, priority=2, startsecs=.01) supervisord = PopulatedDummySupervisor(options, 'foo', pconfig1, pconfig2) from supervisor.process import ProcessStates from supervisor.xmlrpc import Faults supervisord.set_procattr('process1', 'state', ProcessStates.STOPPED) supervisord.set_procattr('process2', 'state', ProcessStates.STOPPED) interface = self._makeOne(supervisord) callback = interface.startProcessGroup('foo') self.assertEqual( callback(), [{'group': 'foo', 'status': Faults.SUCCESS, 'description': 'OK', 'name': 'process1'}, {'group': 'foo', 'status': Faults.SUCCESS, 'description': 'OK', 'name': 'process2'} ] ) self.assertEqual(interface.update_text, 'startProcess') process1 = supervisord.process_groups['foo'].processes['process1'] self.assertEqual(process1.spawned, True) process2 = supervisord.process_groups['foo'].processes['process2'] self.assertEqual(process2.spawned, True) def test_startProcessGroup_nowait(self): options = DummyOptions() pconfig1 = DummyPConfig(options, 'process1', __file__, priority=1, startsecs=.01) pconfig2 = DummyPConfig(options, 'process2', __file__, priority=2, startsecs=.01) supervisord = PopulatedDummySupervisor(options, 'foo', pconfig1, pconfig2) from supervisor.process import ProcessStates supervisord.set_procattr('process1', 'state', ProcessStates.STOPPED) supervisord.set_procattr('process2', 'state', ProcessStates.STOPPED) interface = self._makeOne(supervisord) callback = interface.startProcessGroup('foo', wait=False) from supervisor.xmlrpc import Faults self.assertEqual( callback(), [{'group': 'foo', 'status': Faults.SUCCESS, 'description': 'OK', 'name': 'process1'}, {'group': 'foo', 'status': Faults.SUCCESS, 'description': 'OK', 'name': 'process2'} ] ) def test_startProcessGroup_badname(self): from supervisor import xmlrpc supervisord = DummySupervisor() interface = self._makeOne(supervisord) self._assertRPCError(xmlrpc.Faults.BAD_NAME, interface.startProcessGroup, 'foo') def test_startAllProcesses(self): options = DummyOptions() pconfig1 = DummyPConfig(options, 'process1', __file__, priority=1, startsecs=.01) pconfig2 = DummyPConfig(options, 'process2', __file__, priority=2, startsecs=.01) supervisord = PopulatedDummySupervisor(options, 'foo', pconfig1, pconfig2) from supervisor.process import ProcessStates supervisord.set_procattr('process1', 'state', ProcessStates.STOPPED) supervisord.set_procattr('process2', 'state', ProcessStates.STOPPED) interface = self._makeOne(supervisord) callback = interface.startAllProcesses() from supervisor.xmlrpc import Faults self.assertEqual( callback(), [{'group': 'foo', 'status': Faults.SUCCESS, 'description': 'OK', 'name': 'process1'}, {'group': 'foo', 'status': Faults.SUCCESS, 'description': 'OK', 'name': 'process2'} ] ) self.assertEqual(interface.update_text, 'startProcess') process1 = supervisord.process_groups['foo'].processes['process1'] self.assertEqual(process1.spawned, True) process2 = supervisord.process_groups['foo'].processes['process2'] self.assertEqual(process2.spawned, True) def test_startAllProcesses_nowait(self): options = DummyOptions() pconfig1 = DummyPConfig(options, 'process1', __file__, priority=1, startsecs=.01) pconfig2 = DummyPConfig(options, 'process2', __file__, priority=2, startsecs=.01) supervisord = PopulatedDummySupervisor(options, 'foo', pconfig1, pconfig2) from supervisor.process import ProcessStates supervisord.set_procattr('process1', 'state', ProcessStates.STOPPED) supervisord.set_procattr('process2', 'state', ProcessStates.STOPPED) interface = self._makeOne(supervisord) callback = interface.startAllProcesses(wait=False) from supervisor.xmlrpc import Faults self.assertEqual( callback(), [{'group': 'foo', 'status': Faults.SUCCESS, 'description': 'OK', 'name': 'process1'}, {'group': 'foo', 'status': Faults.SUCCESS, 'description': 'OK', 'name': 'process2'} ] ) def test_stopProcess(self): options = DummyOptions() pconfig = DummyPConfig(options, 'foo', '/bin/foo') from supervisor.process import ProcessStates supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) supervisord.set_procattr('foo', 'state', ProcessStates.RUNNING) interface = self._makeOne(supervisord) result = interface.stopProcess('foo') self.assertTrue(result) self.assertEqual(interface.update_text, 'stopProcess') process = supervisord.process_groups['foo'].processes['foo'] self.assertEqual(process.backoff, 0) self.assertEqual(process.delay, 0) self.assertFalse(process.killing) self.assertEqual(process.state, ProcessStates.STOPPED) self.assertTrue(process.stop_report_called) self.assertEqual(len(supervisord.process_groups['foo'].processes), 1) self.assertEqual(interface.update_text, 'stopProcess') def test_stopProcess_nowait(self): options = DummyOptions() pconfig = DummyPConfig(options, 'foo', __file__) from supervisor.process import ProcessStates supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) supervisord.set_procattr('foo', 'state', ProcessStates.RUNNING) interface = self._makeOne(supervisord) result = interface.stopProcess('foo', wait=False) self.assertEqual(result, True) process = supervisord.process_groups['foo'].processes['foo'] self.assertEqual(process.stop_called, True) self.assertTrue(process.stop_report_called) self.assertEqual(interface.update_text, 'stopProcess') def test_stopProcess_success_in_onwait(self): options = DummyOptions() pconfig = DummyPConfig(options, 'foo', '/bin/foo') from supervisor.process import ProcessStates supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) process = supervisord.process_groups['foo'].processes['foo'] L = [ ProcessStates.RUNNING, ProcessStates.STOPPING, ProcessStates.STOPPED ] def get_state(): return L.pop(0) process.get_state = get_state interface = self._makeOne(supervisord) callback = interface.stopProcess('foo') self.assertEqual(interface.update_text, 'stopProcess') self.assertTrue(callback()) def test_stopProcess_NDY_in_onwait(self): from supervisor.http import NOT_DONE_YET options = DummyOptions() pconfig = DummyPConfig(options, 'foo', '/bin/foo') from supervisor.process import ProcessStates supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) process = supervisord.process_groups['foo'].processes['foo'] L = [ ProcessStates.RUNNING, ProcessStates.STOPPING, ProcessStates.STOPPING ] def get_state(): return L.pop(0) process.get_state = get_state interface = self._makeOne(supervisord) callback = interface.stopProcess('foo') self.assertEqual(callback(), NOT_DONE_YET) self.assertEqual(interface.update_text, 'stopProcess') def test_stopProcess_bad_name(self): from supervisor.xmlrpc import Faults supervisord = DummySupervisor() interface = self._makeOne(supervisord) self._assertRPCError(Faults.BAD_NAME, interface.stopProcess, 'foo') def test_stopProcess_not_running(self): from supervisor.states import ProcessStates from supervisor.xmlrpc import Faults options = DummyOptions() pconfig = DummyPConfig(options, 'foo', '/bin/foo') supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) supervisord.set_procattr('foo', 'state', ProcessStates.EXITED) interface = self._makeOne(supervisord) self._assertRPCError(Faults.NOT_RUNNING, interface.stopProcess, 'foo') def test_stopProcess_failed(self): from supervisor.xmlrpc import Faults options = DummyOptions() pconfig = DummyPConfig(options, 'foo', '/bin/foo') supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) supervisord.set_procattr('foo', 'stop', lambda: 'unstoppable') interface = self._makeOne(supervisord) self._assertRPCError(Faults.FAILED, interface.stopProcess, 'foo') def test_stopProcessGroup(self): options = DummyOptions() pconfig1 = DummyPConfig(options, 'process1', '/bin/foo', priority=1) pconfig2 = DummyPConfig(options, 'process2', '/bin/foo2', priority=2) from supervisor.process import ProcessStates supervisord = PopulatedDummySupervisor(options, 'foo', pconfig1, pconfig2) supervisord.set_procattr('process1', 'state', ProcessStates.RUNNING) supervisord.set_procattr('process2', 'state', ProcessStates.RUNNING) interface = self._makeOne(supervisord) callback = interface.stopProcessGroup('foo') self.assertEqual(interface.update_text, 'stopProcessGroup') from supervisor import http value = http.NOT_DONE_YET while 1: value = callback() if value is not http.NOT_DONE_YET: break from supervisor.xmlrpc import Faults self.assertEqual(value, [ {'status': Faults.SUCCESS, 'group':'foo', 'name': 'process1', 'description': 'OK'}, {'status': Faults.SUCCESS, 'group':'foo', 'name': 'process2', 'description': 'OK'}, ] ) process1 = supervisord.process_groups['foo'].processes['process1'] self.assertEqual(process1.stop_called, True) process2 = supervisord.process_groups['foo'].processes['process2'] self.assertEqual(process2.stop_called, True) def test_stopProcessGroup_nowait(self): options = DummyOptions() pconfig1 = DummyPConfig(options, 'process1', __file__, priority=1) pconfig2 = DummyPConfig(options, 'process2', __file__, priority=2) supervisord = PopulatedDummySupervisor(options, 'foo', pconfig1, pconfig2) from supervisor.process import ProcessStates supervisord.set_procattr('process1', 'state', ProcessStates.RUNNING) supervisord.set_procattr('process2', 'state', ProcessStates.RUNNING) interface = self._makeOne(supervisord) callback = interface.stopProcessGroup('foo', wait=False) from supervisor.xmlrpc import Faults self.assertEqual( callback(), [ {'name': 'process1', 'description': 'OK', 'group': 'foo', 'status': Faults.SUCCESS}, {'name': 'process2', 'description': 'OK', 'group': 'foo', 'status': Faults.SUCCESS} ] ) def test_stopProcessGroup_badname(self): from supervisor import xmlrpc supervisord = DummySupervisor() interface = self._makeOne(supervisord) self._assertRPCError(xmlrpc.Faults.BAD_NAME, interface.stopProcessGroup, 'foo') def test_stopProcess_splat_calls_stopProcessGroup(self): options = DummyOptions() pconfig1 = DummyPConfig(options, 'process1', __file__, autostart=False, startsecs=.01) pconfig2 = DummyPConfig(options, 'process2', __file__, priority=2, startsecs=.01) supervisord = PopulatedDummySupervisor(options, 'foo', pconfig1, pconfig2) from supervisor.process import ProcessStates supervisord.set_procattr('process1', 'state', ProcessStates.STOPPED) supervisord.set_procattr('process2', 'state', ProcessStates.STOPPED) interface = self._makeOne(supervisord) interface.stopProcess('foo:*') self.assertEqual(interface.update_text, 'stopProcessGroup') def test_stopAllProcesses(self): options = DummyOptions() pconfig1 = DummyPConfig(options, 'process1', '/bin/foo', priority=1) pconfig2 = DummyPConfig(options, 'process2', '/bin/foo2', priority=2) from supervisor.process import ProcessStates supervisord = PopulatedDummySupervisor(options, 'foo', pconfig1, pconfig2) supervisord.set_procattr('process1', 'state', ProcessStates.RUNNING) supervisord.set_procattr('process2', 'state', ProcessStates.RUNNING) interface = self._makeOne(supervisord) callback = interface.stopAllProcesses() self.assertEqual(interface.update_text, 'stopAllProcesses') from supervisor import http value = http.NOT_DONE_YET while 1: value = callback() if value is not http.NOT_DONE_YET: break from supervisor.xmlrpc import Faults self.assertEqual(value, [ {'status': Faults.SUCCESS, 'group':'foo', 'name': 'process1', 'description': 'OK'}, {'status': Faults.SUCCESS, 'group':'foo', 'name': 'process2', 'description': 'OK'}, ] ) process1 = supervisord.process_groups['foo'].processes['process1'] self.assertEqual(process1.stop_called, True) process2 = supervisord.process_groups['foo'].processes['process2'] self.assertEqual(process2.stop_called, True) def test_stopAllProcesses_nowait(self): options = DummyOptions() pconfig1 = DummyPConfig(options, 'process1', __file__, priority=1) pconfig2 = DummyPConfig(options, 'process2', __file__, priority=2) supervisord = PopulatedDummySupervisor(options, 'foo', pconfig1, pconfig2) from supervisor.process import ProcessStates supervisord.set_procattr('process1', 'state', ProcessStates.RUNNING) supervisord.set_procattr('process2', 'state', ProcessStates.RUNNING) interface = self._makeOne(supervisord) callback = interface.stopAllProcesses(wait=False) from supervisor.xmlrpc import Faults self.assertEqual( callback(), [{'group': 'foo', 'status': Faults.SUCCESS, 'description': 'OK', 'name': 'process1'}, {'group': 'foo', 'status': Faults.SUCCESS, 'description': 'OK', 'name': 'process2'} ] ) def test_signalProcess_with_signal_number(self): options = DummyOptions() pconfig = DummyPConfig(options, 'foo', '/bin/foo') from supervisor.process import ProcessStates supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) supervisord.set_procattr('foo', 'state', ProcessStates.RUNNING) interface = self._makeOne(supervisord) result = interface.signalProcess('foo', 10) self.assertEqual(interface.update_text, 'signalProcess') self.assertEqual(result, True) p = supervisord.process_groups[supervisord.group_name].processes['foo'] self.assertEqual(p.sent_signal, 10) def test_signalProcess_with_signal_name(self): options = DummyOptions() pconfig = DummyPConfig(options, 'foo', '/bin/foo') from supervisor.process import ProcessStates supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) supervisord.set_procattr('foo', 'state', ProcessStates.RUNNING) from supervisor.datatypes import signal_number signame = 'quit' signum = signal_number(signame) interface = self._makeOne(supervisord) result = interface.signalProcess('foo', signame) self.assertEqual(interface.update_text, 'signalProcess') self.assertEqual(result, True) p = supervisord.process_groups[supervisord.group_name].processes['foo'] self.assertEqual(p.sent_signal, signum) def test_signalProcess_stopping(self): options = DummyOptions() pconfig = DummyPConfig(options, 'foo', '/bin/foo') from supervisor.process import ProcessStates supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) supervisord.set_procattr('foo', 'state', ProcessStates.STOPPING) interface = self._makeOne(supervisord) result = interface.signalProcess('foo', 10) self.assertEqual(interface.update_text, 'signalProcess') self.assertEqual(result, True) p = supervisord.process_groups[supervisord.group_name].processes['foo'] self.assertEqual(p.sent_signal, 10) def test_signalProcess_badsignal(self): options = DummyOptions() pconfig = DummyPConfig(options, 'foo', '/bin/foo') from supervisor.process import ProcessStates from supervisor import xmlrpc supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) supervisord.set_procattr('foo', 'state', ProcessStates.RUNNING) interface = self._makeOne(supervisord) self._assertRPCError( xmlrpc.Faults.BAD_SIGNAL, interface.signalProcess, 'foo', 155 ) def test_signalProcess_notrunning(self): options = DummyOptions() pconfig = DummyPConfig(options, 'foo', '/bin/foo') from supervisor.process import ProcessStates from supervisor import xmlrpc supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) supervisord.set_procattr('foo', 'state', ProcessStates.STOPPED) interface = self._makeOne(supervisord) self._assertRPCError( xmlrpc.Faults.NOT_RUNNING, interface.signalProcess, 'foo', 10 ) def test_signalProcess_signal_returns_message(self): options = DummyOptions() pconfig = DummyPConfig(options, 'foo', '/bin/foo') from supervisor.process import ProcessStates from supervisor import xmlrpc supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) supervisord.set_procattr('foo', 'state', ProcessStates.RUNNING) def signalreturn(sig): return 'msg' pgroup = supervisord.process_groups[supervisord.group_name] proc = pgroup.processes['foo'] proc.signal = signalreturn interface = self._makeOne(supervisord) self._assertRPCError( xmlrpc.Faults.FAILED, interface.signalProcess, 'foo', 10 ) def test_signalProcess_withgroup(self): """ Test that sending foo:* works """ options = DummyOptions() pconfig1 = DummyPConfig(options, 'process1', '/bin/foo') pconfig2 = DummyPConfig(options, 'process2', '/bin/foo2') from supervisor.process import ProcessStates supervisord = PopulatedDummySupervisor(options, 'foo', pconfig1, pconfig2) supervisord.set_procattr('process1', 'state', ProcessStates.RUNNING) supervisord.set_procattr('process2', 'state', ProcessStates.STOPPING) interface = self._makeOne(supervisord) result = interface.signalProcess('foo:*', 10) self.assertEqual(interface.update_text, 'signalProcessGroup') # Sort so we get deterministic results despite hash randomization result = sorted(result, key=operator.itemgetter('name')) from supervisor.xmlrpc import Faults self.assertEqual(result, [ {'status': Faults.SUCCESS, 'group': 'foo', 'name': 'process1', 'description': 'OK'}, {'status': Faults.SUCCESS, 'group':'foo', 'name': 'process2', 'description': 'OK'}, ] ) process1 = supervisord.process_groups['foo'].processes['process1'] self.assertEqual(process1.sent_signal, 10) process2 = supervisord.process_groups['foo'].processes['process2'] self.assertEqual(process2.sent_signal, 10) def test_signalProcessGroup_with_signal_number(self): options = DummyOptions() pconfig1 = DummyPConfig(options, 'process1', '/bin/foo') pconfig2 = DummyPConfig(options, 'process2', '/bin/foo2') from supervisor.process import ProcessStates supervisord = PopulatedDummySupervisor(options, 'foo', pconfig1, pconfig2) supervisord.set_procattr('process1', 'state', ProcessStates.RUNNING) supervisord.set_procattr('process2', 'state', ProcessStates.RUNNING) interface = self._makeOne(supervisord) result = interface.signalProcessGroup('foo', 10) self.assertEqual(interface.update_text, 'signalProcessGroup') # Sort so we get deterministic results despite hash randomization result = sorted(result, key=operator.itemgetter('name')) from supervisor.xmlrpc import Faults self.assertEqual(result, [ {'status': Faults.SUCCESS, 'group': 'foo', 'name': 'process1', 'description': 'OK'}, {'status': Faults.SUCCESS, 'group': 'foo', 'name': 'process2', 'description': 'OK'}, ] ) process1 = supervisord.process_groups['foo'].processes['process1'] self.assertEqual(process1.sent_signal, 10) process2 = supervisord.process_groups['foo'].processes['process2'] self.assertEqual(process2.sent_signal, 10) def test_signalProcessGroup_with_signal_name(self): options = DummyOptions() pconfig1 = DummyPConfig(options, 'process1', '/bin/foo') from supervisor.process import ProcessStates supervisord = PopulatedDummySupervisor(options, 'foo', pconfig1) supervisord.set_procattr('process1', 'state', ProcessStates.RUNNING) from supervisor.datatypes import signal_number signame = 'term' signum = signal_number(signame) interface = self._makeOne(supervisord) result = interface.signalProcessGroup('foo', signame) self.assertEqual(interface.update_text, 'signalProcessGroup') from supervisor.xmlrpc import Faults self.assertEqual(result, [ {'status': Faults.SUCCESS, 'group': 'foo', 'name': 'process1', 'description': 'OK'}, ] ) process1 = supervisord.process_groups['foo'].processes['process1'] self.assertEqual(process1.sent_signal, signum) def test_signalProcessGroup_nosuchgroup(self): from supervisor import xmlrpc options = DummyOptions() pconfig1 = DummyPConfig(options, 'process1', '/bin/foo') pconfig2 = DummyPConfig(options, 'process2', '/bin/foo2') from supervisor.process import ProcessStates supervisord = PopulatedDummySupervisor(options, 'foo', pconfig1, pconfig2) supervisord.set_procattr('process1', 'state', ProcessStates.RUNNING) supervisord.set_procattr('process2', 'state', ProcessStates.RUNNING) interface = self._makeOne(supervisord) self._assertRPCError(xmlrpc.Faults.BAD_NAME, interface.signalProcessGroup, 'bar', 10 ) def test_signalAllProcesses_with_signal_number(self): options = DummyOptions() pconfig1 = DummyPConfig(options, 'process1', '/bin/foo') pconfig2 = DummyPConfig(options, 'process2', '/bin/foo2') from supervisor.process import ProcessStates supervisord = PopulatedDummySupervisor(options, 'foo', pconfig1, pconfig2) supervisord.set_procattr('process1', 'state', ProcessStates.RUNNING) supervisord.set_procattr('process2', 'state', ProcessStates.RUNNING) interface = self._makeOne(supervisord) result = interface.signalAllProcesses(10) self.assertEqual(interface.update_text, 'signalAllProcesses') # Sort so we get deterministic results despite hash randomization result = sorted(result, key=operator.itemgetter('name')) from supervisor.xmlrpc import Faults self.assertEqual(result, [ {'status': Faults.SUCCESS, 'group': 'foo', 'name': 'process1', 'description': 'OK'}, {'status': Faults.SUCCESS, 'group': 'foo', 'name': 'process2', 'description': 'OK'}, ] ) process1 = supervisord.process_groups['foo'].processes['process1'] self.assertEqual(process1.sent_signal, 10) process2 = supervisord.process_groups['foo'].processes['process2'] self.assertEqual(process2.sent_signal, 10) def test_signalAllProcesses_with_signal_name(self): options = DummyOptions() pconfig1 = DummyPConfig(options, 'process1', '/bin/foo') from supervisor.process import ProcessStates supervisord = PopulatedDummySupervisor(options, 'foo', pconfig1) supervisord.set_procattr('process1', 'state', ProcessStates.RUNNING) from supervisor.datatypes import signal_number signame = 'hup' signum = signal_number(signame) interface = self._makeOne(supervisord) result = interface.signalAllProcesses(signame) self.assertEqual(interface.update_text, 'signalAllProcesses') from supervisor.xmlrpc import Faults self.assertEqual(result, [ {'status': Faults.SUCCESS, 'group': 'foo', 'name': 'process1', 'description': 'OK'}, ] ) process1 = supervisord.process_groups['foo'].processes['process1'] self.assertEqual(process1.sent_signal, signum) def test_getAllConfigInfo(self): options = DummyOptions() supervisord = DummySupervisor(options, 'foo') pconfig1 = DummyPConfig(options, 'process1', __file__, stdout_logfile=Automatic, stderr_logfile=Automatic, ) pconfig2 = DummyPConfig(options, 'process2', __file__, stdout_logfile=None, stderr_logfile=None, ) gconfig = DummyPGroupConfig(options, 'group1', pconfigs=[pconfig1, pconfig2]) supervisord.process_groups = {'group1': DummyProcessGroup(gconfig)} supervisord.options.process_group_configs = [gconfig] interface = self._makeOne(supervisord) configs = interface.getAllConfigInfo() self.assertEqual(configs[0]['autostart'], True) self.assertEqual(configs[0]['stopwaitsecs'], 10) self.assertEqual(configs[0]['stdout_events_enabled'], False) self.assertEqual(configs[0]['stderr_events_enabled'], False) self.assertEqual(configs[0]['group'], 'group1') self.assertEqual(configs[0]['stdout_capture_maxbytes'], 0) self.assertEqual(configs[0]['name'], 'process1') self.assertEqual(configs[0]['stopsignal'], 15) self.assertEqual(configs[0]['stderr_syslog'], False) self.assertEqual(configs[0]['stdout_logfile_maxbytes'], 0) self.assertEqual(configs[0]['group_prio'], 999) self.assertEqual(configs[0]['killasgroup'], False) self.assertEqual(configs[0]['process_prio'], 999) self.assertEqual(configs[0]['stdout_syslog'], False) self.assertEqual(configs[0]['stderr_logfile_maxbytes'], 0) self.assertEqual(configs[0]['startsecs'], 10) self.assertEqual(configs[0]['redirect_stderr'], False) self.assertEqual(configs[0]['stdout_logfile'], 'auto') self.assertEqual(configs[0]['exitcodes'], (0,)) self.assertEqual(configs[0]['stderr_capture_maxbytes'], 0) self.assertEqual(configs[0]['startretries'], 999) self.assertEqual(configs[0]['stderr_logfile_maxbytes'], 0) self.assertEqual(configs[0]['inuse'], True) self.assertEqual(configs[0]['stderr_logfile'], 'auto') self.assertEqual(configs[0]['stdout_logfile_backups'], 0) assert 'test_rpcinterfaces.py' in configs[0]['command'] self.assertEqual(configs[1]['autostart'], True) self.assertEqual(configs[1]['stopwaitsecs'], 10) self.assertEqual(configs[1]['stdout_events_enabled'], False) self.assertEqual(configs[1]['stderr_events_enabled'], False) self.assertEqual(configs[1]['group'], 'group1') self.assertEqual(configs[1]['stdout_capture_maxbytes'], 0) self.assertEqual(configs[1]['name'], 'process2') self.assertEqual(configs[1]['stopsignal'], 15) self.assertEqual(configs[1]['stderr_syslog'], False) self.assertEqual(configs[1]['stdout_logfile_maxbytes'], 0) self.assertEqual(configs[1]['group_prio'], 999) self.assertEqual(configs[1]['killasgroup'], False) self.assertEqual(configs[1]['process_prio'], 999) self.assertEqual(configs[1]['stdout_syslog'], False) self.assertEqual(configs[1]['stderr_logfile_maxbytes'], 0) self.assertEqual(configs[1]['startsecs'], 10) self.assertEqual(configs[1]['redirect_stderr'], False) self.assertEqual(configs[1]['stdout_logfile'], 'none') self.assertEqual(configs[1]['exitcodes'], (0,)) self.assertEqual(configs[1]['stderr_capture_maxbytes'], 0) self.assertEqual(configs[1]['startretries'], 999) self.assertEqual(configs[1]['stderr_logfile_maxbytes'], 0) self.assertEqual(configs[1]['inuse'], True) self.assertEqual(configs[1]['stderr_logfile'], 'none') self.assertEqual(configs[1]['stdout_logfile_backups'], 0) assert 'test_rpcinterfaces.py' in configs[0]['command'] def test_getAllConfigInfo_filters_types_not_compatible_with_xmlrpc(self): options = DummyOptions() supervisord = DummySupervisor(options, 'foo') pconfig1 = DummyPConfig(options, 'process1', __file__) pconfig2 = DummyPConfig(options, 'process2', __file__) gconfig = DummyPGroupConfig(options, 'group1', pconfigs=[pconfig1, pconfig2]) supervisord.process_groups = {'group1': DummyProcessGroup(gconfig)} supervisord.options.process_group_configs = [gconfig] interface = self._makeOne(supervisord) unmarshallables = [type(None)] try: from enum import Enum unmarshallables.append(Enum) except ImportError: # python 2 pass for typ in unmarshallables: for config in interface.getAllConfigInfo(): for k, v in config.items(): self.assertFalse(isinstance(v, typ), k) def test__interpretProcessInfo(self): supervisord = DummySupervisor() interface = self._makeOne(supervisord) start = _NOW -100 stop = _NOW -1 from supervisor.process import ProcessStates running = {'name':'running', 'pid':1, 'state':ProcessStates.RUNNING, 'start':start, 'stop':stop, 'now':_NOW} description = interface._interpretProcessInfo(running) self.assertEqual(description, 'pid 1, uptime 0:01:40') fatal = {'name':'fatal', 'pid':2, 'state':ProcessStates.FATAL, 'start':start, 'stop':stop, 'now':_NOW, 'spawnerr':'Hosed'} description = interface._interpretProcessInfo(fatal) self.assertEqual(description, 'Hosed') fatal2 = {'name':'fatal', 'pid':2, 'state':ProcessStates.FATAL, 'start':start, 'stop':stop, 'now':_NOW, 'spawnerr':'',} description = interface._interpretProcessInfo(fatal2) self.assertEqual(description, 'unknown error (try "tail fatal")') stopped = {'name':'stopped', 'pid':3, 'state':ProcessStates.STOPPED, 'start':start, 'stop':stop, 'now':_NOW, 'spawnerr':'',} description = interface._interpretProcessInfo(stopped) from datetime import datetime stoptime = datetime(*time.localtime(stop)[:7]) self.assertEqual(description, stoptime.strftime(_TIMEFORMAT)) stopped2 = {'name':'stopped', 'pid':3, 'state':ProcessStates.STOPPED, 'start':0, 'stop':stop, 'now':_NOW, 'spawnerr':'',} description = interface._interpretProcessInfo(stopped2) self.assertEqual(description, 'Not started') def test__interpretProcessInfo_doesnt_report_negative_uptime(self): supervisord = DummySupervisor() interface = self._makeOne(supervisord) from supervisor.process import ProcessStates running = {'name': 'running', 'pid': 42, 'state': ProcessStates.RUNNING, 'start': _NOW + 10, # started in the future 'stop': None, 'now': _NOW} description = interface._interpretProcessInfo(running) self.assertEqual(description, 'pid 42, uptime 0:00:00') def test_getProcessInfo(self): from supervisor.process import ProcessStates options = DummyOptions() config = DummyPConfig(options, 'foo', '/bin/foo', stdout_logfile='/tmp/fleeb.bar') process = DummyProcess(config) process.pid = 111 process.laststart = 10 process.laststop = 11 pgroup_config = DummyPGroupConfig(options, name='foo') pgroup = DummyProcessGroup(pgroup_config) pgroup.processes = {'foo':process} supervisord = DummySupervisor(process_groups={'foo':pgroup}) interface = self._makeOne(supervisord) data = interface.getProcessInfo('foo') self.assertEqual(interface.update_text, 'getProcessInfo') self.assertEqual(data['logfile'], '/tmp/fleeb.bar') self.assertEqual(data['stdout_logfile'], '/tmp/fleeb.bar') self.assertEqual(data['stderr_logfile'], '') self.assertEqual(data['name'], 'foo') self.assertEqual(data['pid'], 111) self.assertEqual(data['start'], 10) self.assertEqual(data['stop'], 11) self.assertEqual(data['state'], ProcessStates.RUNNING) self.assertEqual(data['statename'], 'RUNNING') self.assertEqual(data['exitstatus'], 0) self.assertEqual(data['spawnerr'], '') self.assertTrue(data['description'].startswith('pid 111')) def test_getProcessInfo_logfile_NONE(self): options = DummyOptions() config = DummyPConfig(options, 'foo', '/bin/foo', stdout_logfile=None) process = DummyProcess(config) process.pid = 111 process.laststart = 10 process.laststop = 11 pgroup_config = DummyPGroupConfig(options, name='foo') pgroup = DummyProcessGroup(pgroup_config) pgroup.processes = {'foo':process} supervisord = DummySupervisor(process_groups={'foo':pgroup}) interface = self._makeOne(supervisord) data = interface.getProcessInfo('foo') self.assertEqual(data['logfile'], '') self.assertEqual(data['stdout_logfile'], '') def test_getProcessInfo_unknown_state(self): from supervisor.states import ProcessStates options = DummyOptions() pconfig = DummyPConfig(options, 'foo', '/bin/foo') supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) supervisord.set_procattr('foo', 'state', ProcessStates.UNKNOWN) interface = self._makeOne(supervisord) data = interface.getProcessInfo('foo') self.assertEqual(data['statename'], 'UNKNOWN') self.assertEqual(data['description'], '') def test_getProcessInfo_bad_name_when_bad_process(self): from supervisor import xmlrpc supervisord = DummySupervisor() interface = self._makeOne(supervisord) self._assertRPCError(xmlrpc.Faults.BAD_NAME, interface.getProcessInfo, 'nonexistent') def test_getProcessInfo_bad_name_when_no_process(self): from supervisor import xmlrpc options = DummyOptions() pconfig = DummyPConfig(options, 'foo', '/bin/foo') supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) interface = self._makeOne(supervisord) self._assertRPCError(xmlrpc.Faults.BAD_NAME, interface.getProcessInfo, 'foo:') def test_getProcessInfo_caps_timestamps_exceeding_xmlrpc_maxint(self): from supervisor.compat import xmlrpclib options = DummyOptions() config = DummyPConfig(options, 'foo', '/bin/foo', stdout_logfile=None) process = DummyProcess(config) process.laststart = float(xmlrpclib.MAXINT + 1) process.laststop = float(xmlrpclib.MAXINT + 1) pgroup_config = DummyPGroupConfig(options, name='foo') pgroup = DummyProcessGroup(pgroup_config) pgroup.processes = {'foo':process} supervisord = DummySupervisor(process_groups={'foo':pgroup}) interface = self._makeOne(supervisord) interface._now = lambda: float(xmlrpclib.MAXINT + 1) data = interface.getProcessInfo('foo') self.assertEqual(data['start'], xmlrpclib.MAXINT) self.assertEqual(data['stop'], xmlrpclib.MAXINT) self.assertEqual(data['now'], xmlrpclib.MAXINT) def test_getAllProcessInfo(self): from supervisor.process import ProcessStates options = DummyOptions() p1config = DummyPConfig(options, 'process1', '/bin/process1', priority=1, stdout_logfile='/tmp/process1.log') p2config = DummyPConfig(options, 'process2', '/bin/process2', priority=2, stdout_logfile='/tmp/process2.log') supervisord = PopulatedDummySupervisor(options, 'gname', p1config, p2config) supervisord.set_procattr('process1', 'pid', 111) supervisord.set_procattr('process1', 'laststart', 10) supervisord.set_procattr('process1', 'laststop', 11) supervisord.set_procattr('process1', 'state', ProcessStates.RUNNING) supervisord.set_procattr('process2', 'pid', 0) supervisord.set_procattr('process2', 'laststart', 20) supervisord.set_procattr('process2', 'laststop', 11) supervisord.set_procattr('process2', 'state', ProcessStates.STOPPED) interface = self._makeOne(supervisord) info = interface.getAllProcessInfo() self.assertEqual(interface.update_text, 'getProcessInfo') self.assertEqual(len(info), 2) p1info = info[0] self.assertEqual(p1info['logfile'], '/tmp/process1.log') self.assertEqual(p1info['stdout_logfile'], '/tmp/process1.log') self.assertEqual(p1info['stderr_logfile'], '') self.assertEqual(p1info['name'], 'process1') self.assertEqual(p1info['pid'], 111) self.assertEqual(p1info['start'], 10) self.assertEqual(p1info['stop'], 11) self.assertEqual(p1info['state'], ProcessStates.RUNNING) self.assertEqual(p1info['statename'], 'RUNNING') self.assertEqual(p1info['exitstatus'], 0) self.assertEqual(p1info['spawnerr'], '') self.assertEqual(p1info['group'], 'gname') self.assertTrue(p1info['description'].startswith('pid 111')) p2info = info[1] process2 = supervisord.process_groups['gname'].processes['process2'] self.assertEqual(p2info['logfile'], '/tmp/process2.log') self.assertEqual(p2info['stdout_logfile'], '/tmp/process2.log') self.assertEqual(p1info['stderr_logfile'], '') self.assertEqual(p2info['name'], 'process2') self.assertEqual(p2info['pid'], 0) self.assertEqual(p2info['start'], process2.laststart) self.assertEqual(p2info['stop'], 11) self.assertEqual(p2info['state'], ProcessStates.STOPPED) self.assertEqual(p2info['statename'], 'STOPPED') self.assertEqual(p2info['exitstatus'], 0) self.assertEqual(p2info['spawnerr'], '') self.assertEqual(p1info['group'], 'gname') from datetime import datetime starttime = datetime(*time.localtime(process2.laststart)[:7]) self.assertEqual(p2info['description'], starttime.strftime(_TIMEFORMAT)) def test_readProcessStdoutLog_unreadable(self): from supervisor import xmlrpc options = DummyOptions() pconfig = DummyPConfig(options, 'process1', '/bin/process1', priority=1, stdout_logfile='/tmp/process1.log') supervisord = PopulatedDummySupervisor(options, 'process1', pconfig) interface = self._makeOne(supervisord) self._assertRPCError(xmlrpc.Faults.NO_FILE, interface.readProcessStdoutLog, 'process1', offset=0, length=1) def test_readProcessStdoutLog_badargs(self): from supervisor import xmlrpc options = DummyOptions() pconfig = DummyPConfig(options, 'process1', '/bin/process1', priority=1, stdout_logfile='/tmp/process1.log') supervisord = PopulatedDummySupervisor(options, 'process1', pconfig) interface = self._makeOne(supervisord) process = supervisord.process_groups['process1'].processes['process1'] logfile = process.config.stdout_logfile try: with open(logfile, 'w+') as f: f.write('x' * 2048) self._assertRPCError(xmlrpc.Faults.BAD_ARGUMENTS, interface.readProcessStdoutLog, 'process1', offset=-1, length=1) self._assertRPCError(xmlrpc.Faults.BAD_ARGUMENTS, interface.readProcessStdoutLog, 'process1', offset=-1, length=-1) finally: os.remove(logfile) def test_readProcessStdoutLog_bad_name_no_process(self): from supervisor import xmlrpc options = DummyOptions() pconfig = DummyPConfig(options, 'process1', '/bin/process1', priority=1, stdout_logfile='/tmp/process1.log') supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) interface = self._makeOne(supervisord) self._assertRPCError(xmlrpc.Faults.BAD_NAME, interface.readProcessStdoutLog, 'foo:*', offset=0, length=1) def test_readProcessStdoutLog(self): options = DummyOptions() pconfig = DummyPConfig(options, 'foo', '/bin/foo', stdout_logfile='/tmp/fooooooo') supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) interface = self._makeOne(supervisord) process = supervisord.process_groups['foo'].processes['foo'] logfile = process.config.stdout_logfile try: with open(logfile, 'w+') as f: f.write('x' * 2048) f.write('y' * 2048) data = interface.readProcessStdoutLog('foo', offset=0, length=0) self.assertEqual(interface.update_text, 'readProcessStdoutLog') self.assertEqual(data, ('x' * 2048) + ('y' * 2048)) data = interface.readProcessStdoutLog('foo', offset=2048, length=0) self.assertEqual(data, 'y' * 2048) data = interface.readProcessStdoutLog('foo', offset=0, length=2048) self.assertEqual(data, 'x' * 2048) data = interface.readProcessStdoutLog('foo', offset=-4, length=0) self.assertEqual(data, 'y' * 4) finally: os.remove(logfile) def test_readProcessLogAliasedTo_readProcessStdoutLog(self): options = DummyOptions() pconfig = DummyPConfig(options, 'foo', '/bin/foo') supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) interface = self._makeOne(supervisord) self.assertEqual(interface.readProcessLog, interface.readProcessStdoutLog) def test_readProcessStderrLog_unreadable(self): from supervisor import xmlrpc options = DummyOptions() pconfig = DummyPConfig(options, 'process1', '/bin/process1', priority=1, stderr_logfile='/tmp/process1.log') supervisord = PopulatedDummySupervisor(options, 'process1', pconfig) interface = self._makeOne(supervisord) self._assertRPCError(xmlrpc.Faults.NO_FILE, interface.readProcessStderrLog, 'process1', offset=0, length=1) def test_readProcessStderrLog_badargs(self): from supervisor import xmlrpc options = DummyOptions() pconfig = DummyPConfig(options, 'process1', '/bin/process1', priority=1, stderr_logfile='/tmp/process1.log') supervisord = PopulatedDummySupervisor(options, 'process1', pconfig) interface = self._makeOne(supervisord) process = supervisord.process_groups['process1'].processes['process1'] logfile = process.config.stderr_logfile try: with open(logfile, 'w+') as f: f.write('x' * 2048) self._assertRPCError(xmlrpc.Faults.BAD_ARGUMENTS, interface.readProcessStderrLog, 'process1', offset=-1, length=1) self._assertRPCError(xmlrpc.Faults.BAD_ARGUMENTS, interface.readProcessStderrLog, 'process1', offset=-1, length=-1) finally: os.remove(logfile) def test_readProcessStderrLog_bad_name_no_process(self): from supervisor import xmlrpc options = DummyOptions() pconfig = DummyPConfig(options, 'process1', '/bin/process1', priority=1, stdout_logfile='/tmp/process1.log') supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) interface = self._makeOne(supervisord) self._assertRPCError(xmlrpc.Faults.BAD_NAME, interface.readProcessStderrLog, 'foo:*', offset=0, length=1) def test_readProcessStderrLog(self): options = DummyOptions() pconfig = DummyPConfig(options, 'foo', '/bin/foo', stderr_logfile='/tmp/fooooooo') supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) interface = self._makeOne(supervisord) process = supervisord.process_groups['foo'].processes['foo'] logfile = process.config.stderr_logfile try: with open(logfile, 'w+') as f: f.write('x' * 2048) f.write('y' * 2048) data = interface.readProcessStderrLog('foo', offset=0, length=0) self.assertEqual(interface.update_text, 'readProcessStderrLog') self.assertEqual(data, ('x' * 2048) + ('y' * 2048)) data = interface.readProcessStderrLog('foo', offset=2048, length=0) self.assertEqual(data, 'y' * 2048) data = interface.readProcessStderrLog('foo', offset=0, length=2048) self.assertEqual(data, 'x' * 2048) data = interface.readProcessStderrLog('foo', offset=-4, length=0) self.assertEqual(data, 'y' * 4) finally: os.remove(logfile) def test_tailProcessStdoutLog_bad_name(self): from supervisor import xmlrpc supervisord = DummySupervisor() interface = self._makeOne(supervisord) self._assertRPCError(xmlrpc.Faults.BAD_NAME, interface.tailProcessStdoutLog, 'BAD_NAME', 0, 10) def test_tailProcessStdoutLog_bad_name_no_process(self): from supervisor import xmlrpc options = DummyOptions() pconfig = DummyPConfig(options, 'process1', '/bin/process1', priority=1, stdout_logfile='/tmp/process1.log') supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) interface = self._makeOne(supervisord) self._assertRPCError(xmlrpc.Faults.BAD_NAME, interface.tailProcessStdoutLog, 'foo:*', 0, 10) def test_tailProcessStdoutLog_all(self): # test entire log is returned when offset==0 and logsize < length from supervisor.compat import letters options = DummyOptions() pconfig = DummyPConfig(options, 'foo', '/bin/foo', stdout_logfile='/tmp/fooooooo') supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) interface = self._makeOne(supervisord) process = supervisord.process_groups['foo'].processes['foo'] logfile = process.config.stdout_logfile try: with open(logfile, 'w+') as f: f.write(letters) data, offset, overflow = interface.tailProcessStdoutLog('foo', offset=0, length=len(letters)) self.assertEqual(interface.update_text, 'tailProcessStdoutLog') self.assertEqual(overflow, False) self.assertEqual(offset, len(letters)) self.assertEqual(data, letters) finally: os.remove(logfile) def test_tailProcessStdoutLog_none(self): # test nothing is returned when offset <= logsize from supervisor.compat import letters options = DummyOptions() pconfig = DummyPConfig(options, 'foo', '/bin/foo', stdout_logfile='/tmp/fooooooo') supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) interface = self._makeOne(supervisord) process = supervisord.process_groups['foo'].processes['foo'] logfile = process.config.stdout_logfile try: with open(logfile, 'w+') as f: f.write(letters) # offset==logsize data, offset, overflow = interface.tailProcessStdoutLog('foo', offset=len(letters), length=100) self.assertEqual(interface.update_text, 'tailProcessStdoutLog') self.assertEqual(overflow, False) self.assertEqual(offset, len(letters)) self.assertEqual(data, '') # offset > logsize data, offset, overflow = interface.tailProcessStdoutLog('foo', offset=len(letters)+5, length=100) self.assertEqual(interface.update_text, 'tailProcessStdoutLog') self.assertEqual(overflow, False) self.assertEqual(offset, len(letters)) self.assertEqual(data, '') finally: os.remove(logfile) def test_tailProcessStdoutLog_overflow(self): # test buffer overflow occurs when logsize > offset+length from supervisor.compat import letters options = DummyOptions() pconfig = DummyPConfig(options, 'foo', '/bin/foo', stdout_logfile='/tmp/fooooooo') supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) interface = self._makeOne(supervisord) process = supervisord.process_groups['foo'].processes['foo'] logfile = process.config.stdout_logfile try: with open(logfile, 'w+') as f: f.write(letters) data, offset, overflow = interface.tailProcessStdoutLog('foo', offset=0, length=5) self.assertEqual(interface.update_text, 'tailProcessStdoutLog') self.assertEqual(overflow, True) self.assertEqual(offset, len(letters)) self.assertEqual(data, letters[-5:]) finally: os.remove(logfile) def test_tailProcessStdoutLog_unreadable(self): # test nothing is returned if the log doesn't exist yet options = DummyOptions() pconfig = DummyPConfig(options, 'foo', '/bin/foo', stdout_logfile='/tmp/fooooooo') supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) interface = self._makeOne(supervisord) data, offset, overflow = interface.tailProcessStdoutLog('foo', offset=0, length=100) self.assertEqual(interface.update_text, 'tailProcessStdoutLog') self.assertEqual(overflow, False) self.assertEqual(offset, 0) self.assertEqual(data, '') def test_tailProcessLogAliasedTo_tailProcessStdoutLog(self): options = DummyOptions() pconfig = DummyPConfig(options, 'foo', '/bin/foo') supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) interface = self._makeOne(supervisord) self.assertEqual(interface.tailProcessLog, interface.tailProcessStdoutLog) def test_tailProcessStderrLog_bad_name(self): from supervisor import xmlrpc supervisord = DummySupervisor() interface = self._makeOne(supervisord) self._assertRPCError(xmlrpc.Faults.BAD_NAME, interface.tailProcessStderrLog, 'BAD_NAME', 0, 10) def test_tailProcessStderrLog_bad_name_no_process(self): from supervisor import xmlrpc options = DummyOptions() pconfig = DummyPConfig(options, 'process1', '/bin/process1', priority=1, stdout_logfile='/tmp/process1.log') supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) interface = self._makeOne(supervisord) self._assertRPCError(xmlrpc.Faults.BAD_NAME, interface.tailProcessStderrLog, 'foo:*', 0, 10) def test_tailProcessStderrLog_all(self): # test entire log is returned when offset==0 and logsize < length from supervisor.compat import letters options = DummyOptions() pconfig = DummyPConfig(options, 'foo', '/bin/foo', stderr_logfile='/tmp/fooooooo') supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) interface = self._makeOne(supervisord) process = supervisord.process_groups['foo'].processes['foo'] logfile = process.config.stderr_logfile try: with open(logfile, 'w+') as f: f.write(letters) data, offset, overflow = interface.tailProcessStderrLog('foo', offset=0, length=len(letters)) self.assertEqual(interface.update_text, 'tailProcessStderrLog') self.assertEqual(overflow, False) self.assertEqual(offset, len(letters)) self.assertEqual(data, letters) finally: os.remove(logfile) def test_tailProcessStderrLog_none(self): # test nothing is returned when offset <= logsize from supervisor.compat import letters options = DummyOptions() pconfig = DummyPConfig(options, 'foo', '/bin/foo', stderr_logfile='/tmp/fooooooo') supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) interface = self._makeOne(supervisord) process = supervisord.process_groups['foo'].processes['foo'] logfile = process.config.stderr_logfile try: with open(logfile, 'w+') as f: f.write(letters) # offset==logsize data, offset, overflow = interface.tailProcessStderrLog('foo', offset=len(letters), length=100) self.assertEqual(interface.update_text, 'tailProcessStderrLog') self.assertEqual(overflow, False) self.assertEqual(offset, len(letters)) self.assertEqual(data, '') # offset > logsize data, offset, overflow = interface.tailProcessStderrLog('foo', offset=len(letters)+5, length=100) self.assertEqual(interface.update_text, 'tailProcessStderrLog') self.assertEqual(overflow, False) self.assertEqual(offset, len(letters)) self.assertEqual(data, '') finally: os.remove(logfile) def test_tailProcessStderrLog_overflow(self): # test buffer overflow occurs when logsize > offset+length from supervisor.compat import letters options = DummyOptions() pconfig = DummyPConfig(options, 'foo', '/bin/foo', stderr_logfile='/tmp/fooooooo') supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) interface = self._makeOne(supervisord) process = supervisord.process_groups['foo'].processes['foo'] logfile = process.config.stderr_logfile try: with open(logfile, 'w+') as f: f.write(letters) data, offset, overflow = interface.tailProcessStderrLog('foo', offset=0, length=5) self.assertEqual(interface.update_text, 'tailProcessStderrLog') self.assertEqual(overflow, True) self.assertEqual(offset, len(letters)) self.assertEqual(data, letters[-5:]) finally: os.remove(logfile) def test_tailProcessStderrLog_unreadable(self): # test nothing is returned if the log doesn't exist yet options = DummyOptions() pconfig = DummyPConfig(options, 'foo', '/bin/foo', stderr_logfile='/tmp/fooooooo') supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) interface = self._makeOne(supervisord) data, offset, overflow = interface.tailProcessStderrLog('foo', offset=0, length=100) self.assertEqual(interface.update_text, 'tailProcessStderrLog') self.assertEqual(overflow, False) self.assertEqual(offset, 0) self.assertEqual(data, '') def test_clearProcessLogs_bad_name_no_group(self): from supervisor import xmlrpc options = DummyOptions() pconfig = DummyPConfig(options, 'foo', 'foo') process = DummyProcess(pconfig) pgroup = DummyProcessGroup(None) pgroup.processes = {'foo': process} supervisord = DummySupervisor(process_groups={'foo':pgroup}) interface = self._makeOne(supervisord) self._assertRPCError(xmlrpc.Faults.BAD_NAME, interface.clearProcessLogs, 'badgroup') def test_clearProcessLogs_bad_name_no_process(self): from supervisor import xmlrpc options = DummyOptions() pconfig = DummyPConfig(options, 'foo', 'foo') process = DummyProcess(pconfig) pgroup = DummyProcessGroup(None) pgroup.processes = {'foo': process} supervisord = DummySupervisor(process_groups={'foo':pgroup}) interface = self._makeOne(supervisord) self._assertRPCError(xmlrpc.Faults.BAD_NAME, interface.clearProcessLogs, 'foo:*') def test_clearProcessLogs(self): options = DummyOptions() pconfig = DummyPConfig(options, 'foo', 'foo') process = DummyProcess(pconfig) pgroup = DummyProcessGroup(None) pgroup.processes = {'foo': process} supervisord = DummySupervisor(process_groups={'foo':pgroup}) interface = self._makeOne(supervisord) interface.clearProcessLogs('foo') self.assertEqual(process.logsremoved, True) def test_clearProcessLogs_failed(self): from supervisor import xmlrpc options = DummyOptions() pconfig = DummyPConfig(options, 'foo', 'foo') process = DummyProcess(pconfig) pgroup = DummyProcessGroup(None) pgroup.processes = {'foo': process} process.error_at_clear = True supervisord = DummySupervisor(process_groups={'foo':pgroup}) interface = self._makeOne(supervisord) self.assertRaises(xmlrpc.RPCError, interface.clearProcessLogs, 'foo') def test_clearProcessLogAliasedTo_clearProcessLogs(self): options = DummyOptions() pconfig = DummyPConfig(options, 'foo', '/bin/foo') supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) interface = self._makeOne(supervisord) self.assertEqual(interface.clearProcessLog, interface.clearProcessLogs) def test_clearAllProcessLogs(self): options = DummyOptions() pconfig1 = DummyPConfig(options, 'process1', 'foo', priority=1) pconfig2 = DummyPConfig(options, 'process2', 'bar', priority=2) supervisord = PopulatedDummySupervisor(options, 'foo', pconfig1, pconfig2) interface = self._makeOne(supervisord) callback = interface.clearAllProcessLogs() callback() results = callback() from supervisor import xmlrpc self.assertEqual(results[0], {'name':'process1', 'group':'foo', 'status':xmlrpc.Faults.SUCCESS, 'description':'OK'}) self.assertEqual(results[1], {'name':'process2', 'group':'foo', 'status':xmlrpc.Faults.SUCCESS, 'description':'OK'}) process1 = supervisord.process_groups['foo'].processes['process1'] self.assertEqual(process1.logsremoved, True) process2 = supervisord.process_groups['foo'].processes['process2'] self.assertEqual(process2.logsremoved, True) def test_clearAllProcessLogs_onefails(self): options = DummyOptions() pconfig1 = DummyPConfig(options, 'process1', 'foo', priority=1) pconfig2 = DummyPConfig(options, 'process2', 'bar', priority=2) supervisord = PopulatedDummySupervisor(options, 'foo', pconfig1, pconfig2) supervisord.set_procattr('process1', 'error_at_clear', True) interface = self._makeOne(supervisord) callback = interface.clearAllProcessLogs() callback() results = callback() process1 = supervisord.process_groups['foo'].processes['process1'] self.assertEqual(process1.logsremoved, False) process2 = supervisord.process_groups['foo'].processes['process2'] self.assertEqual(process2.logsremoved, True) self.assertEqual(len(results), 2) from supervisor import xmlrpc self.assertEqual(results[0], {'name':'process1', 'group':'foo', 'status':xmlrpc.Faults.FAILED, 'description':'FAILED: foo:process1'}) self.assertEqual(results[1], {'name':'process2', 'group':'foo', 'status':xmlrpc.Faults.SUCCESS, 'description':'OK'}) def test_clearAllProcessLogs_no_processes(self): supervisord = DummySupervisor() self.assertEqual(supervisord.process_groups, {}) interface = self._makeOne(supervisord) callback = interface.clearAllProcessLogs() results = callback() self.assertEqual(results, []) def test_sendProcessStdin_raises_incorrect_params_when_not_chars(self): options = DummyOptions() pconfig1 = DummyPConfig(options, 'process1', 'foo') supervisord = PopulatedDummySupervisor(options, 'foo', pconfig1) interface = self._makeOne(supervisord) thing_not_chars = 42 from supervisor import xmlrpc self._assertRPCError(xmlrpc.Faults.INCORRECT_PARAMETERS, interface.sendProcessStdin, 'process1', thing_not_chars) def test_sendProcessStdin_raises_bad_name_when_bad_process(self): supervisord = DummySupervisor() interface = self._makeOne(supervisord) from supervisor import xmlrpc self._assertRPCError(xmlrpc.Faults.BAD_NAME, interface.sendProcessStdin, 'nonexistent', 'chars for stdin') def test_sendProcessStdin_raises_bad_name_when_no_process(self): options = DummyOptions() supervisord = PopulatedDummySupervisor(options, 'foo') interface = self._makeOne(supervisord) from supervisor import xmlrpc self._assertRPCError(xmlrpc.Faults.BAD_NAME, interface.sendProcessStdin, 'foo:*', 'chars for stdin') def test_sendProcessStdin_raises_not_running_when_not_process_pid(self): options = DummyOptions() pconfig1 = DummyPConfig(options, 'process1', 'foo') supervisord = PopulatedDummySupervisor(options, 'process1', pconfig1) supervisord.set_procattr('process1', 'pid', 0) interface = self._makeOne(supervisord) from supervisor import xmlrpc self._assertRPCError(xmlrpc.Faults.NOT_RUNNING, interface.sendProcessStdin, 'process1', 'chars for stdin') def test_sendProcessStdin_raises_not_running_when_killing(self): options = DummyOptions() pconfig1 = DummyPConfig(options, 'process1', 'foo') supervisord = PopulatedDummySupervisor(options, 'process1', pconfig1) supervisord.set_procattr('process1', 'pid', 42) supervisord.set_procattr('process1', 'killing', True) interface = self._makeOne(supervisord) from supervisor import xmlrpc self._assertRPCError(xmlrpc.Faults.NOT_RUNNING, interface.sendProcessStdin, 'process1', 'chars for stdin') def test_sendProcessStdin_raises_no_file_when_write_raises_epipe(self): options = DummyOptions() pconfig1 = DummyPConfig(options, 'process1', 'foo') supervisord = PopulatedDummySupervisor(options, 'process1', pconfig1) supervisord.set_procattr('process1', 'pid', 42) supervisord.set_procattr('process1', 'killing', False) supervisord.set_procattr('process1', 'write_exception', OSError(errno.EPIPE, os.strerror(errno.EPIPE))) interface = self._makeOne(supervisord) from supervisor import xmlrpc self._assertRPCError(xmlrpc.Faults.NO_FILE, interface.sendProcessStdin, 'process1', 'chars for stdin') def test_sendProcessStdin_reraises_other_oserrors(self): options = DummyOptions() pconfig1 = DummyPConfig(options, 'process1', 'foo') supervisord = PopulatedDummySupervisor(options, 'process1', pconfig1) supervisord.set_procattr('process1', 'pid', 42) supervisord.set_procattr('process1', 'killing', False) supervisord.set_procattr('process1', 'write_exception', OSError(errno.EINTR, os.strerror(errno.EINTR))) interface = self._makeOne(supervisord) self.assertRaises(OSError, interface.sendProcessStdin, 'process1', 'chars for stdin') def test_sendProcessStdin_writes_chars_and_returns_true(self): options = DummyOptions() pconfig1 = DummyPConfig(options, 'process1', 'foo') supervisord = PopulatedDummySupervisor(options, 'process1', pconfig1) supervisord.set_procattr('process1', 'pid', 42) interface = self._makeOne(supervisord) chars = b'chars for stdin' self.assertTrue(interface.sendProcessStdin('process1', chars)) self.assertEqual('sendProcessStdin', interface.update_text) process1 = supervisord.process_groups['process1'].processes['process1'] self.assertEqual(process1.stdin_buffer, chars) def test_sendProcessStdin_unicode_encoded_to_utf8(self): options = DummyOptions() pconfig1 = DummyPConfig(options, 'process1', 'foo') supervisord = PopulatedDummySupervisor(options, 'process1', pconfig1) supervisord.set_procattr('process1', 'pid', 42) interface = self._makeOne(supervisord) interface.sendProcessStdin('process1', as_string(b'fi\xc3\xad')) process1 = supervisord.process_groups['process1'].processes['process1'] self.assertEqual(process1.stdin_buffer, b'fi\xc3\xad') def test_sendRemoteCommEvent_notifies_subscribers(self): options = DummyOptions() supervisord = DummySupervisor(options) interface = self._makeOne(supervisord) from supervisor import events L = [] def callback(event): L.append(event) try: events.callbacks[:] = [(events.RemoteCommunicationEvent, callback)] result = interface.sendRemoteCommEvent('foo', 'bar') finally: events.callbacks[:] = [] events.clear() self.assertTrue(result) self.assertEqual(len(L), 1) event = L[0] self.assertEqual(event.type, 'foo') self.assertEqual(event.data, 'bar') def test_sendRemoteCommEvent_unicode_encoded_to_utf8(self): options = DummyOptions() supervisord = DummySupervisor(options) interface = self._makeOne(supervisord) from supervisor import events L = [] def callback(event): L.append(event) try: events.callbacks[:] = [(events.RemoteCommunicationEvent, callback)] result = interface.sendRemoteCommEvent( as_string('fií once'), as_string('fií twice'), ) finally: events.callbacks[:] = [] events.clear() self.assertTrue(result) self.assertEqual(len(L), 1) event = L[0] if PY2: self.assertEqual(event.type, 'fi\xc3\xad once') self.assertEqual(event.data, 'fi\xc3\xad twice') else: self.assertEqual(event.type, 'fií once') self.assertEqual(event.data, 'fií twice') class SystemNamespaceXMLRPCInterfaceTests(TestBase): def _getTargetClass(self): from supervisor import xmlrpc return xmlrpc.SystemNamespaceRPCInterface def _makeOne(self): from supervisor import rpcinterface supervisord = DummySupervisor() supervisor = rpcinterface.SupervisorNamespaceRPCInterface(supervisord) return self._getTargetClass()( [('supervisor', supervisor), ] ) def test_ctor(self): interface = self._makeOne() self.assertTrue(interface.namespaces['supervisor']) self.assertTrue(interface.namespaces['system']) def test_listMethods(self): interface = self._makeOne() methods = interface.listMethods() methods.sort() keys = list(interface._listMethods().keys()) keys.sort() self.assertEqual(methods, keys) def test_methodSignature(self): from supervisor import xmlrpc interface = self._makeOne() self._assertRPCError(xmlrpc.Faults.SIGNATURE_UNSUPPORTED, interface.methodSignature, ['foo.bar']) result = interface.methodSignature('system.methodSignature') self.assertEqual(result, ['array', 'string']) def test_allMethodDocs(self): from supervisor import xmlrpc # belt-and-suspenders test for docstring-as-typing parsing correctness # and documentation validity vs. implementation _RPCTYPES = ['int', 'double', 'string', 'boolean', 'dateTime.iso8601', 'base64', 'binary', 'array', 'struct'] interface = self._makeOne() methods = interface._listMethods() for k in methods.keys(): # if a method doesn't have a @return value, an RPCError is raised. # Detect that here. try: interface.methodSignature(k) except xmlrpc.RPCError: raise AssertionError('methodSignature for %s raises ' 'RPCError (missing @return doc?)' % k) # we want to test that the number of arguments implemented in # the function is the same as the number of arguments implied by # the doc @params, and that they show up in the same order. ns_name, method_name = k.split('.', 1) namespace = interface.namespaces[ns_name] meth = getattr(namespace, method_name) try: code = meth.func_code except Exception: code = meth.__code__ argnames = code.co_varnames[1:code.co_argcount] parsed = xmlrpc.gettags(str(meth.__doc__)) plines = [] ptypes = [] pnames = [] ptexts = [] rlines = [] rtypes = [] rnames = [] rtexts = [] for thing in parsed: if thing[1] == 'param': # tag name plines.append(thing[0]) # doc line number ptypes.append(thing[2]) # data type pnames.append(thing[3]) # function name ptexts.append(thing[4]) # description elif thing[1] == 'return': # tag name rlines.append(thing[0]) # doc line number rtypes.append(thing[2]) # data type rnames.append(thing[3]) # function name rtexts.append(thing[4]) # description elif thing[1] is not None: raise AssertionError( 'unknown tag type %s for %s, parsed %s' % (thing[1], k, parsed)) # param tokens if len(argnames) != len(pnames): raise AssertionError('Incorrect documentation ' '(%s args, %s doc params) in %s' % (len(argnames), len(pnames), k)) for docline in plines: self.assertTrue(type(docline) == int, (docline, type(docline), k, parsed)) for doctype in ptypes: self.assertTrue(doctype in _RPCTYPES, doctype) for x in range(len(pnames)): if pnames[x] != argnames[x]: msg = 'Name wrong: (%s vs. %s in %s)\n%s' % (pnames[x], argnames[x], k, parsed) raise AssertionError(msg) for doctext in ptexts: self.assertTrue(type(doctext) == type(''), doctext) # result tokens if len(rlines) > 1: raise AssertionError( 'Duplicate @return values in docs for %s' % k) for docline in rlines: self.assertTrue(type(docline) == int, (docline, type(docline), k)) for doctype in rtypes: self.assertTrue(doctype in _RPCTYPES, doctype) for docname in rnames: self.assertTrue(type(docname) == type(''), (docname, type(docname), k)) for doctext in rtexts: self.assertTrue(type(doctext) == type(''), (doctext, type(doctext), k)) def test_multicall_simplevals(self): interface = self._makeOne() results = interface.multicall([ {'methodName':'system.methodHelp', 'params':['system.methodHelp']}, {'methodName':'system.listMethods', 'params':[]}, ]) self.assertEqual(results[0], interface.methodHelp('system.methodHelp')) self.assertEqual(results[1], interface.listMethods()) def test_multicall_recursion_guard(self): from supervisor import xmlrpc interface = self._makeOne() results = interface.multicall([ {'methodName': 'system.multicall', 'params': []}, ]) e = xmlrpc.RPCError(xmlrpc.Faults.INCORRECT_PARAMETERS, 'Recursive system.multicall forbidden') recursion_fault = {'faultCode': e.code, 'faultString': e.text} self.assertEqual(results, [recursion_fault]) def test_multicall_nested_callback(self): from supervisor import http interface = self._makeOne() callback = interface.multicall([ {'methodName':'supervisor.stopAllProcesses'}]) results = http.NOT_DONE_YET while results is http.NOT_DONE_YET: results = callback() self.assertEqual(results[0], []) def test_methodHelp(self): from supervisor import xmlrpc interface = self._makeOne() self._assertRPCError(xmlrpc.Faults.SIGNATURE_UNSUPPORTED, interface.methodHelp, ['foo.bar']) help = interface.methodHelp('system.methodHelp') self.assertEqual(help, interface.methodHelp.__doc__) class Test_make_allfunc(unittest.TestCase): def _callFUT(self, processes, predicate, func, **extra_kwargs): from supervisor.rpcinterface import make_allfunc return make_allfunc(processes, predicate, func, **extra_kwargs) def test_rpcerror_nocallbacks(self): from supervisor import xmlrpc def cb(name, **kw): raise xmlrpc.RPCError(xmlrpc.Faults.FAILED) options = DummyOptions() pconfig1 = DummyPConfig(options, 'process1', 'foo') proc = DummyProcess(pconfig1) group = DummyProcessGroup(pconfig1) def pred(proc): return True af = self._callFUT([(group, proc)], pred, cb) result = af() self.assertEqual(result, [{'description': 'FAILED', 'group': 'process1', 'name': 'process1', 'status': xmlrpc.Faults.FAILED}]) def test_func_callback_normal_return_val(self): def cb(name, **kw): return lambda: 1 options = DummyOptions() pconfig1 = DummyPConfig(options, 'process1', 'foo') proc = DummyProcess(pconfig1) group = DummyProcessGroup(pconfig1) def pred(proc): return True af = self._callFUT([(group, proc)], pred, cb) result = af() self.assertEqual( result, [{'group': 'process1', 'description': 'OK', 'status': 80, 'name': 'process1'}] ) def test_func_callback_raises_RPCError(self): from supervisor import xmlrpc def cb(name, **kw): def inner(): raise xmlrpc.RPCError(xmlrpc.Faults.FAILED) return inner options = DummyOptions() pconfig1 = DummyPConfig(options, 'process1', 'foo') proc = DummyProcess(pconfig1) group = DummyProcessGroup(pconfig1) def pred(proc): return True af = self._callFUT([(group, proc)], pred, cb) result = af() self.assertEqual( result, [{'description': 'FAILED', 'group': 'process1', 'status': 30, 'name': 'process1'}] ) def test_func_callback_returns_NOT_DONE_YET(self): from supervisor.http import NOT_DONE_YET def cb(name, **kw): def inner(): return NOT_DONE_YET return inner options = DummyOptions() pconfig1 = DummyPConfig(options, 'process1', 'foo') proc = DummyProcess(pconfig1) group = DummyProcessGroup(pconfig1) def pred(proc): return True af = self._callFUT([(group, proc)], pred, cb) result = af() self.assertEqual( result, NOT_DONE_YET, ) class Test_make_main_rpcinterface(unittest.TestCase): def _callFUT(self, supervisord): from supervisor.rpcinterface import make_main_rpcinterface return make_main_rpcinterface(supervisord) def test_it(self): inst = self._callFUT(None) self.assertEqual( inst.__class__.__name__, 'SupervisorNamespaceRPCInterface' ) class DummyRPCInterface: def hello(self): return 'Hello!' def test_suite(): return unittest.findTestCases(sys.modules[__name__]) if __name__ == '__main__': unittest.main(defaultTest='test_suite')