Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 18.191.200.36
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Mon Sep 30 15:36:27 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /proc/2/task/2/cwd/usr/lib/node_modules/pm2/node_modules/@pm2/agent/src/push/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /proc/2/task/2/cwd/usr/lib/node_modules/pm2/node_modules/@pm2/agent/src/push/PushInteractor.js
'use strict'

const debug = require('debug')('interactor:push')
const fs = require('fs')
const path = require('path')
const cst = require('../../constants.js')
const DataRetriever = require('./DataRetriever.js')
const Utility = require('../Utility.js')
const Aggregator = require('./TransactionAggregator.js')

/**
 * PushInteractor is the class that handle pushing data to KM
 * @param {Object} opts interactor options
 * @param {PM2Client} ipm2 pm2 daemon client used to listen on bus
 * @param {WebsocketTransport} transport websocket transport used to send data to KM
 */
module.exports = class PushInteractor {
  constructor (opts, ipm2, transport) {
    this._ipm2 = ipm2
    this.transport = transport
    this.opts = opts
    this.log_buffer = {}
    this.processes = new Map() // Key is process name, value is pm2 env
    this.broadcast_logs = new Map() // key is process name, value is true or false
    this.ip_interval_counter = 60
    debug('Push interactor constructed')
    this._cacheFS = new Utility.Cache({
      miss: function (key) {
        try {
          const content = fs.readFileSync(path.resolve(key))
          return content.toString().split(/\r?\n/)
        } catch (err) {
          return debug('Error while trying to get file from FS : %s', err.message || err)
        }
      },
      ttl: 60 * 30
    })
    this._stackParser = new Utility.StackTraceParser({ cache: this._cacheFS, context: cst.CONTEXT_ON_ERROR })
    // // start transaction aggregator
    this.aggregator = new Aggregator(this)
  }

  /**
   * Start the interactor by starting all workers and listeners
   */
  start () {
    // stop old running task
    if (this._worker_executor !== undefined) {
      this.stop()
    }
    debug('Push interactor started')
    this._worker_executor = setInterval(this._worker.bind(this), cst.STATUS_INTERVAL)
    this._ipm2.bus.on('*', this._onPM2Event.bind(this))
  }

  /**
   * Stop the interactor by removing all running worker and listeners
   */
  stop () {
    debug('Push interactor stopped')
    if (this._worker_executor !== undefined) {
      clearInterval(this._worker_executor)
      this._worker_executor = null
    }
    if (this._cacheFS._worker !== undefined) {
      clearInterval(this._cacheFS._worker)
      this._cacheFS._worker = null
    }
  }

  /**
   * Listener of pm2 bus
   * @param {String} event channel
   * @param {Object} packet data
   */
  _onPM2Event (event, packet) {
    debug('New PM2 event %s', event)
    if (event === 'axm:action') return false
    if (!packet.process) return debug('No process field [%s]', event)

    // Drop transitional state processes (_old_*)
    if (packet && packet.process && packet.process.pm_id && typeof packet.process.pm_id === 'string' &&
        packet.process.pm_id.indexOf('_old') > -1) return false
    if (this.processes.get(packet.process.name) && this.processes.get(packet.process.name)._km_monitored === false) return false

    // bufferize logs
    if (event.match(/^log:/)) {
      if (!this.log_buffer[packet.process.name]) {
        this.log_buffer[packet.process.name] = []
      }
      // delete the last one if too long
      if (this.log_buffer[packet.process.name].length >= cst.LOGS_BUFFER) {
        this.log_buffer[packet.process.name].shift()
      }
      // push the log data
      this.log_buffer[packet.process.name].push(packet.data)

      // don't send logs if not enabled
      if (!global._logs && !this.broadcast_logs.get(packet.process.pm_id)) return false
      // disabled logs anyway
      if (!this.processes.has(packet.process.name) || this.processes.get(packet.process.name).send_logs === false) return false
    }

    // attach additional info for exception
    if (event === 'process:exception' && cst.ENABLE_CONTEXT_ON_ERROR === true) {
      packet.data.last_logs = this.log_buffer[packet.process.name]
      packet.data = this._stackParser.attachContext(packet.data)
    }

    if (event === 'axm:reply' && packet.data && packet.data.return && (packet.data.return.heapdump || packet.data.return.cpuprofile || packet.data.return.heapprofile)) {
      return this._sendFile(packet)
    }

    if (event === 'human:event') {
      packet.name = packet.data.__name
      delete packet.data.__name
    }

    // Normalize data
    packet.process = {
      pm_id: packet.process.pm_id,
      name: packet.process.name,
      rev: packet.process.rev || ((packet.process.versioning && packet.process.versioning.revision) ? packet.process.versioning.revision : null),
      server: this.opts.MACHINE_NAME
    }

    // agregate transaction data before sending them
    if (event.indexOf('axm:trace') > -1) return this.aggregator.aggregate(packet)

    if (event.match(/^log:/)) {
      packet.log_type = event.split(':')[1]
      event = 'logs'
    }

    return this.transport.send(event, packet)
  }

  /**
   * Worker function that will retrieve process metadata and send them to KM
   */
  _worker () {
    if (!this._ipm2.rpc || !this._ipm2.rpc.getMonitorData)
      return debug('Cant access to getMonitorData RPC PM2 method')

    if (this.ip_interval_counter-- <= 0) {
      this.opts.internal_ip = Utility.network.getIP('v4')
      this.ip_interval_counter = 60
    }

    this._ipm2.rpc.getMonitorData({}, (err, processes) => {
      if (err) {
        return console.error(err || 'Cant access to getMonitorData RPC PM2 method')
      }

      // set broadcast logs
      processes.forEach((process) => {
        this.processes.set(process.name, process.pm2_env)
        this.broadcast_logs.set(process.pm_id, process.pm2_env.broadcast_logs == 1 || process.pm2_env.broadcast_logs == 'true') // eslint-disable-line
      })

      processes = processes.filter(process => process.pm2_env._km_monitored !== false)

      // send data
      this.transport.send('status', {
        data: DataRetriever.status(processes, this.opts),
        server_name: this.opts.MACHINE_NAME,
        internal_ip: this.opts.internal_ip
      })
    })
  }

  /**
   * Handle packet containing file metadata to send to KM
   */
  _sendFile (packet) {
    const filePath = JSON.parse(JSON.stringify(packet.data.return.dump_file))
    let type = null
    if (packet.data.return.heapdump) {
      type = 'heapdump'
    } else if (packet.data.return.heapprofile) {
      type = 'heapprofile'
    } else if (packet.data.return.cpuprofile) {
      type = 'cpuprofile'
    } else {
      return debug(`Invalid profiling packet: ${JSON.stringify(packet)}`)
    }
    debug('Send file for %s', type)

    packet = {
      pm_id: packet.process.pm_id,
      name: packet.process.name,
      server_name: this.opts.MACHINE_NAME,
      public_key: this.opts.PUBLIC_KEY,
      type: type
    }
    packet[type] = true

    fs.readFile(filePath, (err, data) => {
      if (err) return debug(err)
      fs.unlink(filePath, debug)
      packet.data = data.toString('utf-8')
      return this.transport.send('profiling', packet)
    })
  }
}

Anon7 - 2022
AnonSec Team