Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 18.219.220.133
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /usr/lib/python3/dist-packages/supervisor/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /usr/lib/python3/dist-packages/supervisor/rpcinterface.py
import os
import time
import datetime
import errno
import types

from supervisor.compat import as_string
from supervisor.compat import as_bytes
from supervisor.compat import unicode

from supervisor.datatypes import (
    Automatic,
    signal_number,
    )

from supervisor.options import readFile
from supervisor.options import tailFile
from supervisor.options import BadCommand
from supervisor.options import NotExecutable
from supervisor.options import NotFound
from supervisor.options import NoPermission
from supervisor.options import make_namespec
from supervisor.options import split_namespec
from supervisor.options import VERSION

from supervisor.events import notify
from supervisor.events import RemoteCommunicationEvent

from supervisor.http import NOT_DONE_YET
from supervisor.xmlrpc import (
    capped_int,
    Faults,
    RPCError,
    )

from supervisor.states import SupervisorStates
from supervisor.states import getSupervisorStateDescription
from supervisor.states import ProcessStates
from supervisor.states import getProcessStateDescription
from supervisor.states import (
    RUNNING_STATES,
    STOPPED_STATES,
    SIGNALLABLE_STATES
    )

# Version string handed back by getAPIVersion() (and its getVersion alias).
API_VERSION  = '3.0'

class SupervisorNamespaceRPCInterface:
    def __init__(self, supervisord):
        # Keep a handle on the supervisord object; the RPC methods below
        # reach its options and process groups through this attribute.
        self.supervisord = supervisord

    def _update(self, text):
        self.update_text = text # for unit tests, mainly
        if ( isinstance(self.supervisord.options.mood, int) and
             self.supervisord.options.mood < SupervisorStates.RUNNING ):
            raise RPCError(Faults.SHUTDOWN_STATE)

    # RPC API methods

    def getAPIVersion(self):
        """ Return the version of the RPC API used by supervisord

        @return string version id
        """
        # state gate first; _update raises when supervisord is shutting down
        self._update('getAPIVersion')
        api_version = API_VERSION
        return api_version

    getVersion = getAPIVersion # b/w compatibility with releases before 3.0

    def getSupervisorVersion(self):
        """ Return the version of the supervisor package in use by supervisord

        @return string version id
        """
        # state gate first; _update raises when supervisord is shutting down
        self._update('getSupervisorVersion')
        package_version = VERSION
        return package_version

    def getIdentification(self):
        """ Return identifying string of supervisord

        @return string identifier identifying string
        """
        self._update('getIdentification')
        return self.supervisord.options.identifier

    def getState(self):
        """ Return current state of supervisord as a struct

        @return struct A struct with keys int statecode, string statename
        """
        self._update('getState')

        # the "mood" option is the state code; its description is looked up
        mood = self.supervisord.options.mood
        return {
            'statecode': mood,
            'statename': getSupervisorStateDescription(mood),
        }

    def getPID(self):
        """ Return the PID of supervisord

        @return int PID
        """
        self._update('getPID')
        return self.supervisord.options.get_pid()

    def readLog(self, offset, length):
        """ Read length bytes from the main log starting at offset

        @param int offset         offset to start reading from.
        @param int length         number of bytes to read from the log.
        @return string result     Bytes of log
        """
        self._update('readLog')

        logfile = self.supervisord.options.logfile

        if logfile is None or not os.path.exists(logfile):
            raise RPCError(Faults.NO_FILE, logfile)

        # Coerce the arguments on their own.  A ValueError raised by int()
        # carries a free-form message; if it reached the handler below it
        # would be used as a Faults attribute name and surface as an
        # AttributeError instead of an RPC fault.
        try:
            offset = int(offset)
            length = int(length)
        except (TypeError, ValueError):
            raise RPCError(Faults.INCORRECT_PARAMETERS)

        try:
            return as_string(readFile(logfile, offset, length))
        except ValueError as inst:
            # the first argument of the ValueError names the Faults
            # attribute to report
            why = inst.args[0]
            raise RPCError(getattr(Faults, why))

    readMainLog = readLog # b/w compatibility with releases before 2.1

    def clearLog(self):
        """ Clear the main log.

        @return boolean result always returns True unless error
        """
        self._update('clearLog')

        logfile = self.supervisord.options.logfile
        if logfile is None or not self.supervisord.options.exists(logfile):
            raise RPCError(Faults.NO_FILE)

        # there is a race condition here, but ignore it.
        try:
            self.supervisord.options.remove(logfile)
        except (OSError, IOError):
            raise RPCError(Faults.FAILED)

        for handler in self.supervisord.options.logger.handlers:
            if hasattr(handler, 'reopen'):
                self.supervisord.options.logger.info('reopening log file')
                handler.reopen()
        return True

    def shutdown(self):
        """ Shut down the supervisor process

        @return boolean result always returns True unless error
        """
        self._update('shutdown')
        # only the mood is flipped here; nothing else is done by this method
        options = self.supervisord.options
        options.mood = SupervisorStates.SHUTDOWN
        return True

    def restart(self):
        """ Restart the supervisor process

        @return boolean result  always return True unless error
        """
        self._update('restart')

        # only the mood is flipped here; nothing else is done by this method
        options = self.supervisord.options
        options.mood = SupervisorStates.RESTARTING
        return True

    def reloadConfig(self):
        """
        Reload the configuration.

        The result contains three arrays containing names of process
        groups:

        * `added` gives the process groups that have been added
        * `changed` gives the process groups whose contents have
          changed
        * `removed` gives the process groups that are no longer
          in the configuration

        @return array result  [[added, changed, removed]]

        """
        self._update('reloadConfig')
        try:
            self.supervisord.options.process_config(do_usage=False)
        except ValueError as msg:
            raise RPCError(Faults.CANT_REREAD, msg)

        added, changed, removed = self.supervisord.diff_to_active()

        added = [group.name for group in added]
        changed = [group.name for group in changed]
        removed = [group.name for group in removed]
        return [[added, changed, removed]] # cannot return len > 1, apparently

    def addProcessGroup(self, name):
        """ Update the config for a running process from config file.

        @param string name         name of process group to add
        @return boolean result     true if successful
        """
        self._update('addProcessGroup')

        for config in self.supervisord.options.process_group_configs:
            if config.name == name:
                result = self.supervisord.add_process_group(config)
                if not result:
                    raise RPCError(Faults.ALREADY_ADDED, name)
                return True
        raise RPCError(Faults.BAD_NAME, name)

    def removeProcessGroup(self, name):
        """ Remove a stopped process from the active configuration.

        @param string name         name of process group to remove
        @return boolean result     Indicates whether the removal was successful
        """
        self._update('removeProcessGroup')
        if name not in self.supervisord.process_groups:
            raise RPCError(Faults.BAD_NAME, name)

        result = self.supervisord.remove_process_group(name)
        if not result:
            raise RPCError(Faults.STILL_RUNNING, name)
        return True

    def _getAllProcesses(self, lexical=False):
        # if lexical is true, return processes sorted in lexical order,
        # otherwise, sort in priority order
        all_processes = []

        if lexical:
            group_names = list(self.supervisord.process_groups.keys())
            group_names.sort()
            for group_name in group_names:
                group = self.supervisord.process_groups[group_name]
                process_names = list(group.processes.keys())
                process_names.sort()
                for process_name in process_names:
                    process = group.processes[process_name]
                    all_processes.append((group, process))
        else:
            groups = list(self.supervisord.process_groups.values())
            groups.sort() # asc by priority

            for group in groups:
                processes = list(group.processes.values())
                processes.sort() # asc by priority
                for process in processes:
                    all_processes.append((group, process))

        return all_processes

    def _getGroupAndProcess(self, name):
        """Resolve a namespec to a (group, process) pair.  The process is
        None when split_namespec yields no process name; an unknown group
        or process raises RPCError BAD_NAME."""
        group_name, process_name = split_namespec(name)

        group = self.supervisord.process_groups.get(group_name)
        if group is not None:
            if process_name is None:
                # the namespec addresses the group as a whole
                return group, None

            process = group.processes.get(process_name)
            if process is not None:
                return group, process

        # either the group or the process within it could not be found
        raise RPCError(Faults.BAD_NAME, name)

    def startProcess(self, name, wait=True):
        """ Start a process

        @param string name Process name (or ``group:name``, or ``group:*``)
        @param boolean wait Wait for process to be fully started
        @return boolean result     Always true unless error

        """
        self._update('startProcess')
        group, process = self._getGroupAndProcess(name)
        if process is None:
            # the namespec named a whole group: hand over to the group
            # variant, whose return value is passed through unchanged
            group_name, process_name = split_namespec(name)
            return self.startProcessGroup(group_name, wait)

        # test filespec, don't bother trying to spawn if we know it will
        # eventually fail
        try:
            filename, argv = process.get_execv_args()
        except NotFound as why:
            raise RPCError(Faults.NO_FILE, why.args[0])
        except (BadCommand, NotExecutable, NoPermission) as why:
            raise RPCError(Faults.NOT_EXECUTABLE, why.args[0])

        # refuse to start something that is already in a running state
        if process.get_state() in RUNNING_STATES:
            raise RPCError(Faults.ALREADY_STARTED, name)

        if process.get_state() == ProcessStates.UNKNOWN:
            raise RPCError(Faults.FAILED,
                           "%s is in an unknown process state" % name)

        process.spawn()

        # We call reap() in order to more quickly obtain the side effects of
        # process.finish(), which reap() eventually ends up calling.  This
        # might be the case if the spawn() was successful but then the process
        # died before its startsecs elapsed or it exited with an unexpected
        # exit code. In particular, finish() may set spawnerr, which we can
        # check and immediately raise an RPCError, avoiding the need to
        # defer by returning a callback.

        self.supervisord.reap()

        if process.spawnerr:
            raise RPCError(Faults.SPAWN_ERROR, name)

        # We call process.transition() in order to more quickly obtain its
        # side effects.  In particular, it might set the process' state from
        # STARTING->RUNNING if the process has a startsecs==0.
        process.transition()

        if wait and process.get_state() != ProcessStates.RUNNING:
            # by default, this branch will almost always be hit for processes
            # with default startsecs configurations, because the default number
            # of startsecs for a process is "1", and the process will not have
            # entered the RUNNING state yet even though we've called
            # transition() on it.  This is because a process is not considered
            # RUNNING until it has stayed up > startsecs.

            def onwait():
                # Each call re-examines the process: a spawn error or any
                # state other than STARTING/RUNNING is raised as a fault,
                # RUNNING finishes with True, STARTING asks to be called
                # again by returning NOT_DONE_YET.
                if process.spawnerr:
                    raise RPCError(Faults.SPAWN_ERROR, name)

                state = process.get_state()

                if state not in (ProcessStates.STARTING, ProcessStates.RUNNING):
                    raise RPCError(Faults.ABNORMAL_TERMINATION, name)

                if state == ProcessStates.RUNNING:
                    return True

                return NOT_DONE_YET

            # attributes attached for whoever drives the deferred; presumably
            # delay is the pause between calls -- confirm against the caller
            onwait.delay = 0.05
            onwait.rpcinterface = self
            return onwait # deferred

        return True

    def startProcessGroup(self, name, wait=True):
        """ Start all processes in the group named 'name'

        @param string name     The group name
        @param boolean wait    Wait for each process to be fully started
        @return array result   An array of process status info structs
        """
        self._update('startProcessGroup')

        group = self.supervisord.process_groups.get(name)
        if group is None:
            raise RPCError(Faults.BAD_NAME, name)

        # pair every process (in sorted order) with its group, the shape
        # make_allfunc iterates over
        members = [(group, process)
                   for process in sorted(group.processes.values())]

        startall = make_allfunc(members, isNotRunning, self.startProcess,
                                wait=wait)
        startall.rpcinterface = self
        startall.delay = 0.05
        return startall # deferred

    def startAllProcesses(self, wait=True):
        """ Start all processes listed in the configuration file

        @param boolean wait    Wait for each process to be fully started
        @return array result   An array of process status info structs
        """
        self._update('startAllProcesses')

        # every (group, process) pair; only those passing isNotRunning are
        # handed to startProcess by the returned closure
        startall = make_allfunc(self._getAllProcesses(), isNotRunning,
                                self.startProcess, wait=wait)
        startall.rpcinterface = self
        startall.delay = 0.05
        return startall # deferred

    def stopProcess(self, name, wait=True):
        """ Stop a process named by name

        @param string name  The name of the process to stop (or 'group:name')
        @param boolean wait        Wait for the process to be fully stopped
        @return boolean result     Always return True unless error
        """
        self._update('stopProcess')

        group, process = self._getGroupAndProcess(name)

        if process is None:
            # the namespec named a whole group: hand over to the group
            # variant, whose return value is passed through unchanged
            group_name, process_name = split_namespec(name)
            return self.stopProcessGroup(group_name, wait)

        if process.get_state() not in RUNNING_STATES:
            raise RPCError(Faults.NOT_RUNNING, name)

        # a non-None return from stop() is treated as an error message
        msg = process.stop()
        if msg is not None:
            raise RPCError(Faults.FAILED, msg)

        # We'll try to reap any killed child. FWIW, reap calls waitpid, and
        # then, if waitpid returns a pid, calls finish() on the process with
        # that pid, which drains any I/O from the process' dispatchers and
        # changes the process' state.  I chose to call reap without once=True
        # because we don't really care if we reap more than one child.  Even if
        # we only reap one child. we may not even be reaping the child that we
        # just stopped (this is all async, and process.stop() may not work, and
        # we'll need to wait for SIGKILL during process.transition() as the
        # result of normal select looping).

        self.supervisord.reap()

        if wait and process.get_state() not in STOPPED_STATES:

            def onwait():
                # process will eventually enter a stopped state by
                # virtue of the supervisord.reap() method being called
                # during normal operations
                process.stop_report()
                if process.get_state() not in STOPPED_STATES:
                    return NOT_DONE_YET
                return True

            # attributes attached for whoever drives the deferred; presumably
            # delay is the pause between calls -- confirm against the caller
            onwait.delay = 0
            onwait.rpcinterface = self
            return onwait # deferred

        return True

    def stopProcessGroup(self, name, wait=True):
        """ Stop all processes in the process group named 'name'

        @param string name     The group name
        @param boolean wait    Wait for each process to be fully stopped
        @return array result   An array of process status info structs
        """
        self._update('stopProcessGroup')

        group = self.supervisord.process_groups.get(name)
        if group is None:
            raise RPCError(Faults.BAD_NAME, name)

        # pair every process (in sorted order) with its group, the shape
        # make_allfunc iterates over
        members = [(group, process)
                   for process in sorted(group.processes.values())]

        killall = make_allfunc(members, isRunning, self.stopProcess,
                               wait=wait)
        killall.rpcinterface = self
        killall.delay = 0.05
        return killall # deferred

    def stopAllProcesses(self, wait=True):
        """ Stop all processes in the process list

        @param  boolean wait   Wait for each process to be fully stopped
        @return array result   An array of process status info structs
        """
        self._update('stopAllProcesses')

        # every (group, process) pair; only those passing isRunning are
        # handed to stopProcess by the returned closure
        killall = make_allfunc(self._getAllProcesses(), isRunning,
                               self.stopProcess, wait=wait)
        killall.rpcinterface = self
        killall.delay = 0.05
        return killall # deferred

    def signalProcess(self, name, signal):
        """ Send an arbitrary UNIX signal to the process named by name

        @param string name    Name of the process to signal (or 'group:name')
        @param string signal  Signal to send, as name ('HUP') or number ('1')
        @return boolean
        """

        self._update('signalProcess')

        group, process = self._getGroupAndProcess(name)
        if process is None:
            # the namespec named a whole group: signal every member instead
            group_name, _process_name = split_namespec(name)
            return self.signalProcessGroup(group_name, signal=signal)

        # translate the caller's signal spec; a ValueError means it is not
        # a usable signal
        try:
            signum = signal_number(signal)
        except ValueError:
            raise RPCError(Faults.BAD_SIGNAL, signal)

        if process.get_state() not in SIGNALLABLE_STATES:
            raise RPCError(Faults.NOT_RUNNING, name)

        # a non-None return from signal() is treated as an error message
        error = process.signal(signum)
        if error is not None:
            raise RPCError(Faults.FAILED, error)

        return True

    def signalProcessGroup(self, name, signal):
        """ Send a signal to all processes in the group named 'name'

        @param string name    The group name
        @param string signal  Signal to send, as name ('HUP') or number ('1')
        @return array
        """

        # the group is looked up before the state gate runs
        group = self.supervisord.process_groups.get(name)
        self._update('signalProcessGroup')

        if group is None:
            raise RPCError(Faults.BAD_NAME, name)

        members = [(group, process)
                   for process in sorted(group.processes.values())]

        # unlike the start/stop variants the closure is run right here
        # instead of being returned as a deferred
        sendall = make_allfunc(members, isSignallable, self.signalProcess,
                               signal=signal)
        outcome = sendall()

        # every signalProcess call above went through _update with its own
        # name; this puts the recorded call name back (and re-runs the
        # state gate)
        self._update('signalProcessGroup')

        return outcome

    def signalAllProcesses(self, signal):
        """ Send a signal to all processes in the process list

        @param string signal  Signal to send, as name ('HUP') or number ('1')
        @return array         An array of process status info structs
        """
        candidates = self._getAllProcesses()
        # the closure is executed immediately rather than returned deferred
        signalall = make_allfunc(candidates, isSignallable,
                                 self.signalProcess, signal=signal)
        outcome = signalall()
        # note: the state gate / call-name bookkeeping only runs after the
        # fan-out above has completed
        self._update('signalAllProcesses')
        return outcome

    def getAllConfigInfo(self):
        """ Get info about all available process configurations. Each struct
        represents a single process (i.e. groups get flattened).

        @return array result  An array of process config info structs
        """
        self._update('getAllConfigInfo')

        configinfo = []
        for gconfig in self.supervisord.options.process_group_configs:
            inuse = gconfig.name in self.supervisord.process_groups
            for pconfig in gconfig.process_configs:
                d = {'autostart': pconfig.autostart,
                     'directory': pconfig.directory,
                     'uid': pconfig.uid,
                     'command': pconfig.command,
                     'exitcodes': pconfig.exitcodes,
                     'group': gconfig.name,
                     'group_prio': gconfig.priority,
                     'inuse': inuse,
                     'killasgroup': pconfig.killasgroup,
                     'name': pconfig.name,
                     'process_prio': pconfig.priority,
                     'redirect_stderr': pconfig.redirect_stderr,
                     'startretries': pconfig.startretries,
                     'startsecs': pconfig.startsecs,
                     'stdout_capture_maxbytes': pconfig.stdout_capture_maxbytes,
                     'stdout_events_enabled': pconfig.stdout_events_enabled,
                     'stdout_logfile': pconfig.stdout_logfile,
                     'stdout_logfile_backups': pconfig.stdout_logfile_backups,
                     'stdout_logfile_maxbytes': pconfig.stdout_logfile_maxbytes,
                     'stdout_syslog': pconfig.stdout_syslog,
                     'stopsignal': int(pconfig.stopsignal), # enum on py3
                     'stopwaitsecs': pconfig.stopwaitsecs,
                     'stderr_capture_maxbytes': pconfig.stderr_capture_maxbytes,
                     'stderr_events_enabled': pconfig.stderr_events_enabled,
                     'stderr_logfile': pconfig.stderr_logfile,
                     'stderr_logfile_backups': pconfig.stderr_logfile_backups,
                     'stderr_logfile_maxbytes': pconfig.stderr_logfile_maxbytes,
                     'stderr_syslog': pconfig.stderr_syslog,
                     'serverurl': pconfig.serverurl,
                    }
                # no support for these types in xml-rpc
                d.update((k, 'auto') for k, v in d.items() if v is Automatic)
                d.update((k, 'none') for k, v in d.items() if v is None)
                configinfo.append(d)

        configinfo.sort(key=lambda r: r['name'])
        return configinfo

    def _interpretProcessInfo(self, info):
        """Build the human-readable 'description' string for a process
        info dict.

        Reads info['state'] and, depending on it, 'start', 'now' and 'pid'
        (RUNNING), 'spawnerr' and 'name' (FATAL/BACKOFF) or 'start' and
        'stop' (STOPPED/EXITED).  Any other state yields ''.
        """
        state = info['state']

        if state == ProcessStates.RUNNING:
            start = info['start']
            now = info['now']
            start_dt = datetime.datetime(*time.gmtime(start)[:6])
            now_dt = datetime.datetime(*time.gmtime(now)[:6])
            uptime = now_dt - start_dt
            if _total_seconds(uptime) < 0: # system time set back
                uptime = datetime.timedelta(0)
            desc = 'pid %s, uptime %s' % (info['pid'], uptime)

        elif state in (ProcessStates.FATAL, ProcessStates.BACKOFF):
            desc = info['spawnerr']
            if not desc:
                desc = 'unknown error (try "tail %s")' % info['name']

        elif state in (ProcessStates.STOPPED, ProcessStates.EXITED):
            if info['start']:
                stop = info['stop']
                # Only the first six struct_time fields (year..second) map
                # onto datetime's positional arguments; the seventh field is
                # tm_wday and must not be passed as the microsecond.
                stop_dt = datetime.datetime(*time.localtime(stop)[:6])
                desc = stop_dt.strftime('%b %d %I:%M %p')
            else:
                desc = 'Not started'

        else:
            desc = ''

        return desc

    def getProcessInfo(self, name):
        """ Get info about a process named name

        @param string name The name of the process (or 'group:name')
        @return struct result     A structure containing data about the process
        """
        self._update('getProcessInfo')

        group, process = self._getGroupAndProcess(name)
        if process is None:
            raise RPCError(Faults.BAD_NAME, name)

        # TODO timestamps are returned as xml-rpc integers for b/c but will
        # saturate the xml-rpc integer type in jan 2038 ("year 2038 problem").
        # future api versions should return timestamps as a different type.
        started_at = capped_int(process.laststart)
        stopped_at = capped_int(process.laststop)
        current = capped_int(self._now())

        state = process.get_state()
        config = process.config
        # falsy values are normalised to '' (or 0 for the exit status)
        stdout_logfile = config.stdout_logfile or ''

        info = {
            'name': config.name,
            'group': group.config.name,
            'start': started_at,
            'stop': stopped_at,
            'now': current,
            'state': state,
            'statename': getProcessStateDescription(state),
            'spawnerr': process.spawnerr or '',
            'exitstatus': process.exitstatus or 0,
            'logfile': stdout_logfile, # b/c alias
            'stdout_logfile': stdout_logfile,
            'stderr_logfile': config.stderr_logfile or '',
            'pid': process.pid,
        }

        # the description is derived from the fields gathered above
        info['description'] = self._interpretProcessInfo(info)
        return info

    def _now(self): # pragma: no cover
        # this is here to service stubbing in unit tests
        return time.time()

    def getAllProcessInfo(self):
        """ Get info about all processes

        @return array result  An array of process status results
        """
        self._update('getAllProcessInfo')

        all_processes = self._getAllProcesses(lexical=True)

        output = []
        for group, process in all_processes:
            name = make_namespec(group.config.name, process.config.name)
            output.append(self.getProcessInfo(name))
        return output

    def _readProcessLog(self, name, offset, length, channel):
        """Read up to length bytes at offset from one of a process' logs.

        channel selects the '<channel>_logfile' attribute of the process
        config.  Raises RPCError BAD_NAME when name does not resolve to a
        single process, NO_FILE when the log is unset or missing,
        INCORRECT_PARAMETERS when offset/length are not integer-like, and
        otherwise the fault named by the ValueError the read raises.
        """
        group, process = self._getGroupAndProcess(name)

        if process is None:
            raise RPCError(Faults.BAD_NAME, name)

        logfile = getattr(process.config, '%s_logfile' % channel)

        if logfile is None or not os.path.exists(logfile):
            raise RPCError(Faults.NO_FILE, logfile)

        # Coerce the arguments on their own.  A ValueError raised by int()
        # carries a free-form message; if it reached the handler below it
        # would be used as a Faults attribute name and surface as an
        # AttributeError instead of an RPC fault.
        try:
            offset = int(offset)
            length = int(length)
        except (TypeError, ValueError):
            raise RPCError(Faults.INCORRECT_PARAMETERS)

        try:
            return as_string(readFile(logfile, offset, length))
        except ValueError as inst:
            # the first argument of the ValueError names the Faults
            # attribute to report
            why = inst.args[0]
            raise RPCError(getattr(Faults, why))

    def readProcessStdoutLog(self, name, offset, length):
        """ Read length bytes from name's stdout log starting at offset

        @param string name        the name of the process (or 'group:name')
        @param int offset         offset to start reading from.
        @param int length         number of bytes to read from the log.
        @return string result     Bytes of log
        """
        self._update('readProcessStdoutLog')
        return self._readProcessLog(name, offset, length, 'stdout')

    readProcessLog = readProcessStdoutLog # b/c alias

    def readProcessStderrLog(self, name, offset, length):
        """ Read length bytes from name's stderr log starting at offset

        @param string name        the name of the process (or 'group:name')
        @param int offset         offset to start reading from.
        @param int length         number of bytes to read from the log.
        @return string result     Bytes of log
        """
        self._update('readProcessStderrLog')
        return self._readProcessLog(name, offset, length, 'stderr')

    def _tailProcessLog(self, name, offset, length, channel):
        group, process = self._getGroupAndProcess(name)

        if process is None:
            raise RPCError(Faults.BAD_NAME, name)

        logfile = getattr(process.config, '%s_logfile' % channel)

        if logfile is None or not os.path.exists(logfile):
            return ['', 0, False]

        return tailFile(logfile, int(offset), int(length))

    def tailProcessStdoutLog(self, name, offset, length):
        """
        Provides a more efficient way to tail the (stdout) log than
        readProcessStdoutLog().  Use readProcessStdoutLog() to read
        chunks and tailProcessStdoutLog() to tail.

        Requests (length) bytes from the (name)'s log, starting at
        (offset).  If the total log size is greater than (offset +
        length), the overflow flag is set and the (offset) is
        automatically increased to position the buffer at the end of
        the log.  If less than (length) bytes are available, the
        maximum number of available bytes will be returned.  (offset)
        returned is always the last offset in the log +1.

        @param string name         the name of the process (or 'group:name')
        @param int offset          offset to start reading from
        @param int length          maximum number of bytes to return
        @return array result       [string bytes, int offset, bool overflow]
        """
        self._update('tailProcessStdoutLog')
        return self._tailProcessLog(name, offset, length, 'stdout')

    tailProcessLog = tailProcessStdoutLog # b/c alias

    def tailProcessStderrLog(self, name, offset, length):
        """
        Provides a more efficient way to tail the (stderr) log than
        readProcessStderrLog().  Use readProcessStderrLog() to read
        chunks and tailProcessStderrLog() to tail.

        Requests (length) bytes from the (name)'s log, starting at
        (offset).  If the total log size is greater than (offset +
        length), the overflow flag is set and the (offset) is
        automatically increased to position the buffer at the end of
        the log.  If less than (length) bytes are available, the
        maximum number of available bytes will be returned.  (offset)
        returned is always the last offset in the log +1.

        @param string name         the name of the process (or 'group:name')
        @param int offset          offset to start reading from
        @param int length          maximum number of bytes to return
        @return array result       [string bytes, int offset, bool overflow]
        """
        self._update('tailProcessStderrLog')
        return self._tailProcessLog(name, offset, length, 'stderr')

    def clearProcessLogs(self, name):
        """ Clear the stdout and stderr logs for the named process and
        reopen them.

        @param string name   The name of the process (or 'group:name')
        @return boolean result      Always True unless error
        """
        self._update('clearProcessLogs')

        group, process = self._getGroupAndProcess(name)

        if process is None:
            raise RPCError(Faults.BAD_NAME, name)

        try:
            # implies a reopen
            process.removelogs()
        except (IOError, OSError):
            raise RPCError(Faults.FAILED, name)

        return True

    clearProcessLog = clearProcessLogs # b/c alias

    def clearAllProcessLogs(self):
        """ Clear all process log files

        @return array result   An array of process status info structs
        """
        self._update('clearAllProcessLogs')
        results  = []
        callbacks = []

        all_processes = self._getAllProcesses()

        for group, process in all_processes:
            callbacks.append((group, process, self.clearProcessLog))

        def clearall():
            if not callbacks:
                return results

            group, process, callback = callbacks.pop(0)
            name = make_namespec(group.config.name, process.config.name)
            try:
                callback(name)
            except RPCError as e:
                results.append(
                    {'name':process.config.name,
                     'group':group.config.name,
                     'status':e.code,
                     'description':e.text})
            else:
                results.append(
                    {'name':process.config.name,
                     'group':group.config.name,
                     'status':Faults.SUCCESS,
                     'description':'OK'}
                    )

            if callbacks:
                return NOT_DONE_YET

            return results

        clearall.delay = 0.05
        clearall.rpcinterface = self
        return clearall # deferred

    def sendProcessStdin(self, name, chars):
        """ Send a string of chars to the stdin of the process name.
        If non-7-bit data is sent (unicode), it is encoded to utf-8
        before being sent to the process' stdin.  If chars is not a
        string or is not unicode, raise INCORRECT_PARAMETERS.  If the
        process is not running, raise NOT_RUNNING.  If the process'
        stdin cannot accept input (e.g. it was closed by the child
        process), raise NO_FILE.

        @param string name        The process name to send to (or 'group:name')
        @param string chars       The character data to send to the process
        @return boolean result    Always return True unless error
        """
        self._update('sendProcessStdin')

        if not isinstance(chars, (str, bytes, unicode)):
            raise RPCError(Faults.INCORRECT_PARAMETERS, chars)

        # the payload is converted before the target process is resolved
        payload = as_bytes(chars)

        _group, process = self._getGroupAndProcess(name)
        if process is None:
            raise RPCError(Faults.BAD_NAME, name)

        # no pid, or a kill already in progress, both count as not running
        if process.killing or not process.pid:
            raise RPCError(Faults.NOT_RUNNING, name)

        try:
            process.write(payload)
        except OSError as why:
            # only a broken pipe is translated; anything else propagates
            if why.args[0] != errno.EPIPE:
                raise
            raise RPCError(Faults.NO_FILE, name)

        return True

    def sendRemoteCommEvent(self, type, data):
        """ Send an event that will be received by event listener
        subprocesses subscribing to the RemoteCommunicationEvent.

        @param  string  type  String for the "type" key in the event header
        @param  string  data  Data for the event body
        @return boolean       Always return True unless error
        """
        # unicode values are utf-8 encoded; anything else is passed as given
        event_type = type.encode('utf-8') if isinstance(type, unicode) else type
        event_data = data.encode('utf-8') if isinstance(data, unicode) else data

        event = RemoteCommunicationEvent(event_type, event_data)
        notify(event)

        return True

def _total_seconds(timedelta):
    return ((timedelta.days * 86400 + timedelta.seconds) * 10**6 +
                timedelta.microseconds) / 10**6

def make_allfunc(processes, predicate, func, **extra_kwargs):
    """ Build and return a closure that applies ``func`` to every
    process accepted by ``predicate`` and collects per-process results.

    ``processes`` is iterated as ``(group, process)`` pairs.  For each
    accepted process, ``func`` is called with the process namespec and
    ``extra_kwargs``.  If that call returns a plain function it is kept
    as a pending callback to be polled on later invocations; any other
    return value is recorded as a success, and an RPCError is recorded
    as a failure using its code and text.

    The returned closure yields NOT_DONE_YET while any callback is
    still pending, and the list of result dicts once none remain. """

    callbacks = []
    results = []

    def outcome(group, process, status, description):
        # One entry of the result list handed back to the caller.
        return {'name':process.config.name,
                'group':group.config.name,
                'status':status,
                'description':description}

    def allfunc(
        processes=processes,
        predicate=predicate,
        func=func,
        extra_kwargs=extra_kwargs,
        callbacks=callbacks, # used only to fool scoping, never passed by caller
        results=results, # used only to fool scoping, never passed by caller
        ):

        # Nothing pending: run func over every process the predicate accepts.
        if not callbacks:
            for group, process in processes:
                name = make_namespec(group.config.name, process.config.name)
                if not predicate(process):
                    continue
                try:
                    callback = func(name, **extra_kwargs)
                except RPCError as e:
                    results.append(outcome(group, process, e.code, e.text))
                else:
                    if isinstance(callback, types.FunctionType):
                        # deferred work; polled below
                        callbacks.append((group, process, callback))
                    else:
                        results.append(
                            outcome(group, process, Faults.SUCCESS, 'OK'))

        if callbacks:
            # Poll a snapshot so finished entries can be removed from the
            # live list while iterating.
            for pending in list(callbacks):
                group, process, cb = pending

                try:
                    value = cb()
                except RPCError as e:
                    results.append(outcome(group, process, e.code, e.text))
                else:
                    if value is NOT_DONE_YET:
                        # still running; keep it for the next invocation
                        continue
                    results.append(
                        outcome(group, process, Faults.SUCCESS, 'OK'))

                callbacks.remove(pending)

            if callbacks:
                return NOT_DONE_YET

        return results

    # XXX the above implementation has a weakness inasmuch as the
    # first call into each individual process callback will always
    # return NOT_DONE_YET, so they need to be called twice.  The
    # symptom of this is that calling this method causes the
    # client to block for much longer than it actually requires to
    # kill all of the running processes.  After the first call to
    # the killit callback, the process is actually dead, but the
    # above killall method processes the callbacks one at a time
    # during the select loop, which, because there is no output
    # from child processes after e.g. stopAllProcesses is called,
    # is not busy, so hits the timeout for each callback.  I
    # attempted to make this better, but the only way to make it
    # better assumes totally synchronous reaping of child
    # processes, which requires infrastructure changes to
    # supervisord that are scary at the moment as it could take a
    # while to pin down all of the platform differences and might
    # require a C extension to the Python signal module to allow
    # the setting of ignore flags to signals.
    return allfunc

def isRunning(process):
    """ Return whether the process's current state is one of
    RUNNING_STATES. """
    current_state = process.get_state()
    return current_state in RUNNING_STATES

def isNotRunning(process):
    """ Return the logical negation of isRunning() for the process. """
    running = isRunning(process)
    return not running

def isSignallable(process):
    """ Return True if the process's current state is one of
    SIGNALLABLE_STATES, otherwise False.

    The result is always a bool, so this predicate never falls through
    to an implicit None when the state is not signallable. """
    return process.get_state() in SIGNALLABLE_STATES

# this is not used in code but referenced via an entry point in the conf file
def make_main_rpcinterface(supervisord):
    """ Factory returning a SupervisorNamespaceRPCInterface built around
    the given supervisord object. """
    rpcinterface = SupervisorNamespaceRPCInterface(supervisord)
    return rpcinterface


Anon7 - 2022
AnonSec Team