Server IP : 85.214.239.14 / Your IP : 3.135.64.92 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Mon Sep 30 15:36:27 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /proc/2/cwd/proc/2/cwd/lib/node_modules/pm2/node_modules/@pm2/agent/src/ |
Upload File : |
'use strict' const EventEmitter2 = require('eventemitter2').EventEmitter2 const async = require('async') const log = require('debug')('interactor:interface') const path = require('path') const config = require(path.join(__dirname, '../config')).transporters module.exports = class TransporterInterface extends EventEmitter2 { /** * Construct new transporter interface with default options and daemon * @param {Object} opts [optionnal] Default options * @param {InteractorDaemon} Daemon needed by transports */ constructor (opts, daemon) { log('New transporter interface') super({ delimiter: ':', wildcard: true }) this.opts = opts || {} this.daemon = daemon this.transporters = new Map() this.transportersEndpoints = new Map() this.endpoints = new Map() this.config = config return this } /** * Add transporter * @param {String} name of the transporter (in ./transporters/) * @param {Object} opts [optionnal] custom options */ bind (name, opts) { if (!opts) opts = {} if (!this.config[name] || !this.config[name].enabled) return this log('Bind [%s] transport to transporter interface', name) let Transport = this._loadTransporter(name) this.transporters.set(name, new Transport(Object.assign(opts, this.opts), this.daemon)) this.transportersEndpoints.set(name, this.config[name].endpoints || {}) this._bindEvents(name) return this } /** * Disconnect each transporters */ disconnect () { log('Disconnect all transporters') this.transporters.forEach(transporter => { transporter.disconnect() }) } /** * Connect each transporters with new endpoints * @param {Object} endpoints * @param {Function} callback */ connect (endpoints, cb) { async.each(this.transporters, (data, next) => { let name = data[0] let transport = data[1] // Isn't connected, connect it if (!transport.isConnected()) { log(`Connecting to: ${JSON.stringify(endpoints)}`) transport.connect(this._buildConnectParamsFromEndpoints(name, endpoints), next) // Endpoints have changed, reconnect } else if (JSON.stringify(endpoints) !== JSON.stringify(this.endpoints)) { log(`Received new endpoints to connect transporters: ${JSON.stringify(endpoints)}`) transport.reconnect(this._buildConnectParamsFromEndpoints(name, endpoints), next) // No changes } else { return next(null) } }, (err) => { // Save endpoints this.endpoints = endpoints cb(err) }) } /** * Send to each transporters */ send (channel, data) { if (process.env.VERBOSE) console.log(`channel=${channel}: data=${JSON.stringify(data, '', 2)}`) this.transporters.forEach(transporter => { transporter.send(channel, data) }) } /** * Require transporter * @param {String} name of the transporter (in ./transporters/) * @private */ _loadTransporter (name) { return require('./transporters/' + this._getTransportName(name)) } /** * Resolve transporter name * @param {String} name of the transporter (in ./transporters/) * @private */ _getTransportName (name) { name = name.toLowerCase() name = name.charAt(0).toUpperCase() + name.slice(1) return name + 'Transport' } /** * Emit event on transporter event * @param {String} name of the transporter * @private */ _bindEvents (name) { const self = this this.transporters.get(name).on('**', function (data) { log('Received event from %s transporter', name) self.emit(this.event, data) }) } /** * Return an object used to connect() transport * based on transporter endpoints options * @param {String} transporter's name * @param {Object} endpoints * @private */ _buildConnectParamsFromEndpoints (name, endpoints) { if (!endpoints) endpoints = {} const opts = this.transportersEndpoints.get(name) if (typeof opts === 'string') { return endpoints[opts] || opts } let params = {} for (let key in opts) { params[key] = endpoints[opts[key]] || opts[key] } return params } /** * Is at least one transporter connected * @return {Boolean} */ isConnected () { for (let transporter of this.transporters.values()) { if (transporter.isConnected()) return true } return false } /** * Get active transporters that are pushing data * @return {String[]} */ getActiveTransporters () { let connected = [] for (let entry of this.transporters.values()) { if (entry.isConnected()) { connected.push(entry.constructor.name) } } return connected } }