Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.138.197.104
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Mon Sep 30 15:36:27 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /proc/3/cwd/lib/node_modules/pm2/node_modules/@pm2/agent/src/transporters/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /proc/3/cwd/lib/node_modules/pm2/node_modules/@pm2/agent/src/transporters/WebsocketTransport.js
'use strict'

const WebSocket = require('ws')
const ProxyAgent = require('proxy-agent')
const log = require('debug')('interactor:websocket')
const cst = require('../../constants.js')
const pkg = require('../../package.json')
const Transporter = require('./Transporter')
const jsonPatch = require('fast-json-patch')
const fs = require('fs')
/**
 * Websocket Transport used to communicate with KM
 * @param {Object} opts options
 * @param {Daemon} daemon Interactor instance
 */
module.exports = class WebsocketTransport extends Transporter {
  constructor (opts, daemon) {
    super()
    log('WebsocketTransporter constructed')
    this.opts = opts
    this._daemon = daemon
    this._ws = null
    this.queue = []
    this._last_status = null
    this._resend_status = false
    this._worker = setInterval(this._emptyQueue.bind(this), process.env.NODE_ENV === 'test' ? 2 : 10000)
    this._heartbeater = setInterval(this._heartbeat.bind(this), 5000)
  }

  /**
   * Send heartbeat to websocket server (every 5 sec)
   */
  _heartbeat () {
    if (!this.isConnected()) return false
    return this._ws.ping()
  }

  /**
   * Connect the websocket client to a url
   * @param {String} url where the client will connect
   * @param {Function} cb invoked with <err>
   */
  connect (url, cb) {
    if (typeof url === 'function') {
      cb = url
      url = this.endpoint
    }
    this.endpoint = url

    if (!url) return cb(new Error('Websocket URL is not defined!'))

    log('Connecting websocket transporter to %s...', url)
    this._ws = new WebSocket(url, {
      perMessageDeflate: process.env.WS_GZIP || false,
      headers: {
        'X-KM-PUBLIC': this.opts.PUBLIC_KEY,
        'X-KM-SECRET': this.opts.SECRET_KEY,
        'X-KM-SERVER': this.opts.MACHINE_NAME,
        'X-PM2-VERSION': this.opts.PM2_VERSION || '0.0.0',
        'X-PROTOCOL-VERSION': cst.PROTOCOL_VERSION,
        'User-Agent': `PM2 Agent v${pkg.version}`
      },
      agent: cst.PROXY ? new ProxyAgent(cst.PROXY) : undefined
    })

    let onError = (err) => {
      log('Error on websocket connect', err)
      return cb(err)
    }
    this._ws.once('error', onError)
    this._ws.once('open', () => {
      this._resend_status = true
      log(`Connected to ${url}`)
      if (!this._ws) return false // an error occurred
      this._ws.removeListener('error', onError)
      this._ws.on('close', this._onClose.bind(this))
      this._ws.on('error', this._onError.bind(this))
      return cb()
    })

    this._ws.on('message', this._onMessage.bind(this))
    this._ws.on('ping', (data) => {
      this._ws.pong()
    })
    this._ws.on('pong', (data) => {})
  }

  /**
   * Disconnect clients
   */
  disconnect () {
    log('Disconnect websocket transporter')
    if (this.isConnected()) {
      this._ws.close(1000, 'Disconnecting')
    }
    this._ws = null
  }

  /**
   * Are push and reverse connections ready
   * @return {Boolean}
   */
  isConnected () {
    return this._ws && this._ws.readyState === 1
  }

  /**
   * Send data to endpoints
   * @param {String} channel
   * @param {Object} data
   */
  send (channel, data) {
    if (!channel || !data) {
      return log('Trying to send message without all necessary fields')
    }
    if (!this.isConnected()) {
      if (!this._reconnecting) this._reconnect()

      // do not buffer status/monitoring packet
      if (channel === 'status' || channel === 'monitoring') return

      log('Trying to send data while not connected, buffering ...')

      // remove last element if the queue is full
      if (this.queue.length >= cst.PACKET_QUEUE_SIZE) {
        this.queue.shift()
      }
      return this.queue.push({ channel: channel, data: data })
    }

    if (channel === 'status' && process.env.WS_JSON_PATCH) {
      if (this._last_status == null || this._resend_status == true) {
        if (this._resend_status)
          log('Sending fresh new status')
        this._resend_status = false
        this._last_status = data
      }
      else {
        let patch = jsonPatch.compare(this._last_status, data)

        let packet = {
          payload: patch,
          channel: 'status:patch'
        }

        this._last_status = data


        if (process.env.WS_JSON_PATCH_BENCH) {
          fs.writeFileSync('status', JSON.stringify(data))
          fs.writeFileSync('patch', JSON.stringify(packet))
        }

        return this._ws.send(JSON.stringify(packet), (err) => {
          packet = null
          if (err) {
            this.emit('error', err)
          }
        })
      }
    }

    log('Sending packet over for channel %s', channel)
    let packet = {
      payload: data,
      channel: channel
    }
    this._ws.send(JSON.stringify(packet), {
      compress: cst.COMPRESS_PROTOCOL || false
    }, (err) => {
      packet = null
      if (err) {
        this.emit('error', err)
        // buffer the packet to send it when the connection will be up again
        this.queue.push({ channel: channel, data: data })
      }
    })
  }

  /**
   * Message received from keymetrics
   * @private
   * @param {String} json packet
   */
  _onMessage (data) {
    try {
      data = JSON.parse(data)
    } catch (err) {
      return log('Bad packet received from remote : %s', err.message || err)
    }

    if (process.env.WS_JSON_PATCH && data.channel == 'status:resend') {
      log(`Wrong patch sent to backend, resending fresh status`)
      this._resend_status = true
    }

    // ensure that all required field are present
    if (!data || !data.payload || !data.channel) {
      return log('Received message without all necessary fields')
    }
    log('Recevied data on channel %s', data.channel)
    this.emit(data.channel, data.payload)
  }
}

Anon7 - 2022
AnonSec Team