Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 18.118.186.100
Web Server : Apache/2.4.61 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /sbin/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /sbin/amavis-services
#!/usr/bin/perl -T
# SPDX-License-Identifier: BSD-2-Clause

#------------------------------------------------------------------------------
# This is amavis-services, a set of supervisor processes for amavis.
#
# Author: Mark Martinec <Mark.Martinec@ijs.si>
#
# Copyright (c) 2012-2018, Mark Martinec
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright notice,
#    this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice,
#    this list of conditions and the following disclaimer in the documentation
#    and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS
# BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and documentation are
# those of the authors and should not be interpreted as representing official
# policies, either expressed or implied, of the Jozef Stefan Institute.

# (the above license is the 2-clause BSD license, also known as
#  a "Simplified BSD License", and pertains to this program only)
#
# Patches and problem reports are welcome.
# The latest version of this program is available at:
#   http://www.ijs.si/software/amavisd/
#------------------------------------------------------------------------------

use strict;
use re 'taint';
use warnings;
use warnings FATAL => qw(utf8 void);
no warnings 'uninitialized';

use vars qw($VERSION);  $VERSION = 2.011002;

use Errno qw(ESRCH ENOENT);
use POSIX qw(strftime);
use Time::HiRes ();
use Unix::Syslog qw(:macros :subs);

use vars qw($myproduct_name $myversion_id $myversion_date $myversion);
use vars qw($MYHOME $idle_ttl $active_ttl $log_level);
use vars qw($syslog_ident $syslog_facility);
use vars qw($inner_sock_specs $outer_sock_specs $snmp_sock_specs);
BEGIN {
  $myproduct_name = 'amavis-services';
  $myversion_id = '2.11.2'; $myversion_date = '20170602';
  $myversion = "$myproduct_name-$myversion_id ($myversion_date)";
}


### USER CONFIGURABLE:

$log_level = 0;  # 0..5
$syslog_facility = LOG_MAIL;
$syslog_ident = $myproduct_name;

$MYHOME = '/var/lib/amavis';

# A socket to which amavisd child processes report their data.
# should match one of the sockets in @zmq_sockets in amavisd.conf
$inner_sock_specs = "ipc://$MYHOME/amavisd-zmq.sock";

# A socket to which we forward summarized amavisd data.
# should match a socket of the same name in amavis-status
$outer_sock_specs = "tcp://127.0.0.1:23232";

# A socket on which we accept SNMP queries and respond to.
# should match a socket of the same name in amavisd-snmp-subagent-zmq
$snmp_sock_specs = "tcp://127.0.0.1:23233";  # tcp://*:23233

$idle_ttl = 4*60*60;  # idle children are sent a SIGTERM
                      #   after this many seconds
$active_ttl = 15*60;  # stuck active children are sent a SIGTERM
                      #   after this many seconds

### END OF USER CONFIGURABLE


use vars qw(@age_slots);
BEGIN {
  @age_slots = (
    0.1,    0.2,    0.5,
    1,      2,      4,      8,      15,      30,        # seconds
    1*60,   2*60,   4*60,   8*60,   15*60,   30*60,     # minutes
    1*3600, 2*3600, 4*3600, 8*3600, 15*3600, 30*3600);  # hours
}

use vars qw($zmq_mod_name $zmq_mod_version $zmq_lib_version);
BEGIN {
  my($zmq_major, $zmq_minor, $zmq_patch);
  if (eval { require ZMQ::LibZMQ3 && require ZMQ::Constants }) {
    $zmq_mod_name = 'ZMQ::LibZMQ3';  # new interface module to zmq v3 or libxs
    import ZMQ::LibZMQ3;  import ZMQ::Constants qw(:all);
    ($zmq_major, $zmq_minor, $zmq_patch) = ZMQ::LibZMQ3::zmq_version();
  # *zmq_sendmsg   [native]                   # (socket,msgobj,flags)
  # *zmq_recvmsg   [native]                   # (socket,flags) -> msgobj
    *zmq_sendstr = sub {                      # (socket,string,flags)
      my $rv = zmq_send($_[0], $_[1], length $_[1], $_[2]||0);
      $rv == -1 ? undef : $rv;
    };
  } elsif (eval { require ZMQ::LibZMQ2 && require ZMQ::Constants }) {
    $zmq_mod_name = 'ZMQ::LibZMQ2';  # new interface module to zmq v2
    import ZMQ::LibZMQ2;  import ZMQ::Constants qw(:all);
    ($zmq_major, $zmq_minor, $zmq_patch) = ZMQ::LibZMQ2::zmq_version();
    # zmq v2/v3 incompatibile renaming
    *zmq_sendmsg = \&ZMQ::LibZMQ2::zmq_send;  # (socket,msgobj,flags)
    *zmq_recvmsg = \&ZMQ::LibZMQ2::zmq_recv;  # (socket,flags) -> msgobj
    *zmq_sendstr = sub {                      # (socket,string,flags)
      my $rv = zmq_send(@_);  $rv == -1 ? undef : $rv;
    };
  } elsif (eval { require ZeroMQ::Constants && require ZeroMQ::Raw }) {
    $zmq_mod_name = 'ZeroMQ';  # old interface module to zmq v2
    import ZeroMQ::Raw;  import ZeroMQ::Constants qw(:all);
    ($zmq_major, $zmq_minor, $zmq_patch) = ZeroMQ::version();
    # zmq v2/v3 incompatibile renaming
    *zmq_sendmsg = \&ZeroMQ::Raw::zmq_send;   # (socket,msgobj,flags)
    *zmq_recvmsg = \&ZeroMQ::Raw::zmq_recv;   # (socket,flags) -> msgobj
    *zmq_sendstr = sub {                      # (socket,string,flags)
      my $rv = zmq_send(@_);  $rv == -1 ? undef : $rv;
    };
  } else {
    die "Perl modules ZMQ::LibZMQ3 or ZMQ::LibZMQ2 or ZeroMQ not available\n";
  }
  $zmq_mod_version = $zmq_mod_name->VERSION;
  $zmq_lib_version = join('.', $zmq_major, $zmq_minor, $zmq_patch);
  1;
}

sub zmq_version {
  sprintf("%s %s, lib %s",
          $zmq_mod_name, $zmq_mod_version, $zmq_lib_version);
};

sub zmq_recvstr {               # (socket,buffer,offset) -> (len,more)
  my $sock = $_[0];
  my $offset = $_[2] || 0;
  my $zm = zmq_recvmsg($sock);  # a copy of a received msg obj
  if (!$zm) { substr($_[1],$offset) = ''; return }
  ($offset ? substr($_[1],$offset) : $_[1]) = zmq_msg_data($zm);
  my $len = length($_[1]) - $offset;
  zmq_msg_close($zm);
  return $len  if !wantarray;
  my $more = zmq_getsockopt($sock, ZMQ_RCVMORE);
  if ($more == -1) { substr($_[1],$offset) = ''; return }
  ($len, $more);
};

my($interrupted, $syslog_open, $foreground);
my($zmq_ctx, $inner_sock, $outer_sock, $snmp_sock);

my $zmq_poll_units = 1000;  # milliseconds since zmq v3
$zmq_poll_units *= 1000  if $zmq_lib_version =~ /^[012]\./;  # microseconds


# Return untainted copy of a string (argument can be a string or a string ref)
#
sub untaint($) {
  return undef  if !defined $_[0];  # must return undef even in a list context!
  no re 'taint';
  local $1;  # avoids Perl taint bug: tainted global $1 propagates taintedness
  (ref($_[0]) ? ${$_[0]} : $_[0]) =~ /^(.*)\z/s;
  $1;
}

sub ll($) {
  my($level) = @_;
  $level <= $log_level;
}

sub do_log($$;@) {
# my($level,$errmsg,@args) = @_;
  my $level = shift;
  if ($level <= $log_level) {
    my $errmsg = shift;
    # treat $errmsg as sprintf format string if additional arguments provided
    $errmsg = sprintf($errmsg,@_)  if @_;
    if (!$syslog_open) {
      $errmsg .= "\n";
      print STDERR $errmsg;  # print ignoring I/O status, except SIGPIPE
    } else {
      my $prio = $level >=  3 ? LOG_DEBUG  # most frequent first
               : $level >=  1 ? LOG_INFO
               : $level >=  0 ? LOG_NOTICE
               : $level >= -1 ? LOG_WARNING
               : $level >= -2 ? LOG_ERR
               :                LOG_CRIT;
      syslog($prio, "%s", $errmsg);
    }
  }
}

sub process_message($$) {
  my($process_states_ref,$msgstr_ref) = @_;
  if (!$msgstr_ref || !defined $$msgstr_ref) {
    # should not happen (except on a failure of zmq_recvmsg)
  } elsif ($$msgstr_ref =~ /^am\.st \d+\s+/s) {
    my($subscription_chan, $pid, $time, $state, $task_id) =
      split(' ', $$msgstr_ref);
    if ($state eq 'FLUSH') {
      do_log(0, "childproc_minder: FLUSH process states");
      %$process_states_ref = ();  # flush all kept state (e.g. on a restart)
    } elsif ($state eq 'exiting' || $state eq 'purged') {
      delete $process_states_ref->{$pid};  # may or may not exist
    } else {
      $state = ' ' if $state eq '-';
      my $p = $process_states_ref->{$pid};
      if ($p) {
        $p->{state} = $state;
        $p->{task_id} = $task_id;
      } else {  # new process appeared
        $process_states_ref->{$pid} = $p = {
          state     => $state,
          task_id   => $task_id,
          timestamp => undef,
          base_timestamp => undef,
        };
      }
      my $now = Time::HiRes::time;
      if ($time > 1e9) {  # Unix time in seconds with fraction (> Y2000)
        $p->{base_timestamp} = $p->{timestamp} = $time;
      } elsif (!$p->{base_timestamp}) {  # delta time but no base
        $p->{timestamp} = $now;
        $p->{base_timestamp} = $p->{timestamp} - $time/1000;  # estimate
      } else {  # delta time since base_timestamp in ms
        $p->{timestamp} = $p->{base_timestamp} + $time/1000;
      }
      $p->{tick} = $now;
    }
  }
  1;
}

sub check_proc($) {
  my($process_states_ref) = @_;
  do_log(2, "CHECK");
  my $cnt_gone = 0;
  my $cnt_terminated = 0;
  while (my($pid,$p) = each %$process_states_ref) {
    my $now = Time::HiRes::time;
    my $age = $now - $p->{base_timestamp};
    my $idling = $p->{task_id} eq '' && $p->{state} =~ /^[. ]\z/s;
    my $overdue = $age > ($idling ? $idle_ttl : $active_ttl);
    my $n;  # number of checked processes (0 or 1)
    if (!$overdue && $now - $p->{tick} < 10) {
      $n = 1;  # recently heard from it, assume it is still there
      do_log(2, "PID %d skipped, recently heared from", $pid);
    } elsif (!$overdue && $idling &&
             $p->{last_checked_timestamp} &&
             $now - $p->{last_checked_timestamp} < 20) {
      $n = 1;  # recently checked, idle, assume it is still there
      do_log(2, "PID %d skipped, recently checked", $pid);
    } elsif (!$overdue && $p->{last_checked_timestamp} &&
             $now - $p->{last_checked_timestamp} < 10) {
      $n = 1;  # recently checked, busy, assume it is still there
      do_log(2, "PID %d skipped, recently checked", $pid);
    } else {
      do_log(2, "PID %d checking", $pid);
      $p->{last_checked_timestamp} = $now;
      $n = kill(0,$pid);  # test if the process is still there
      if ($n == 0) {
        # ESRCH means there is no such process
        if ($! != ESRCH) {
          do_log(-1, "Can't check the process %s: %s", $pid, $!);
        } elsif (defined $p->{sig_sent}) {
          $cnt_terminated++;
          do_log(2, "PID %d sucessfully terminated by SIG%s, %s",
                    $pid, $p->{sig_sent}, $p->{task_id} || $p->{state});
        } else {
          $cnt_gone++;
          do_log(-2, "PID %d went away, %s",
                     $pid, $p->{task_id} || $p->{state} );
        }
        delete $process_states_ref->{$pid};
        defined zmq_sendstr($inner_sock,
                            sprintf('am.st %s %014.3f purged', $pid,$now))
          or die "zmq_sendstr failed: $!"
      }
    }
    if ($n == 0) {
      # already dealt with
    } elsif (!$overdue) {  # life is good
      do_log(2, "PID %d: %s", $pid, $p->{task_id} || $p->{state} );
    } elsif (!$p->{sig_sent} ||
             $p->{sig_sent_timestamp} + $p->{sig_sent_retry_in} >= $now) {
      # overdue, terminate or kill, or retry the killing
      if (!$p->{sig_sent}) {
        $p->{sig_sent} = 'TERM';
        $p->{sig_sent_retry_in} = 20;
      } else {
        $p->{sig_sent} = 'KILL';
        $p->{sig_sent_retry_in} *= 1.5;  # increase the wait time for a retry
      }
      $p->{sig_sent_timestamp} = $now;
      if (kill($p->{sig_sent},$pid)) {
        do_log(2, "PID %d SIG%s, %s",
                  $pid, $p->{sig_sent}, $p->{task_id} || $p->{state});
      } elsif ($! == ESRCH) {
        # already gone by now, no fuss
      } else {
        warn "Can't $p->{sig_sent} the [$pid]: $!";
      }
      if ($p->{sig_sent_retry_in} > 600) {
        do_log(2, "Giving up on PID %d, %s",
                  $pid, $p->{task_id} || $p->{state});
        delete $process_states_ref->{$pid};
        defined zmq_sendstr($inner_sock,
                            sprintf('am.st %s %014.3f purged', $pid,$now))
          or die "zmq_sendstr failed: $!"
      }
    }
  }
  ($cnt_gone, $cnt_terminated);
}

# The childproc_minder process receives information about amavisd child
# processes from an outer socket, forwarded there by a forwarding process.
# Based on updates received, and based on its own processes sanity checks,
# it maintains a state in memory of each amavisd child process.
#
# It must run on the same host as amavisd child processes in order to be
# able to check for lost or crashed processes.
#
# It kills amavisd child processes which are active longer than $active_ttl
# seconds, or are idling for more than $idle_ttl seconds. For killed or lost
# processes it sends 'purged' messages to the inner socket, and periodically
# sends a list of amavisd child process PIDs to the inner socket for the
# benefit of more ephemeral clients.
#
sub childproc_minder() {
  do_log(5, "childproc-minder: zmq_init");
  $zmq_ctx = zmq_init(1);
  $zmq_ctx or die "Can't create ZMQ context: $!";

  do_log(5, "childproc-minder: creating outer ZMQ_SUB socket");
  $outer_sock = zmq_socket($zmq_ctx, ZMQ_SUB);
  $outer_sock or die "Can't create outer ZMQ socket: $!";

  do_log(5, "childproc-minder: zmq_setsockopt on outer socket");
  zmq_setsockopt($outer_sock, ZMQ_LINGER, 2000) != -1  # milliseconds
    or die "zmq_setsockopt LINGER failed: $!";
  my $outer_sock_ipv4only = 1;  # a ZMQ default
  if (defined &ZMQ_IPV4ONLY && $outer_sock_specs =~ /:[0-9a-f]*:/i) {
    zmq_setsockopt($outer_sock, ZMQ_IPV4ONLY(), 0) != -1
      or die "zmq_setsockopt failed: $!";
    $outer_sock_ipv4only = 0;
  }
  zmq_setsockopt($outer_sock, ZMQ_SUBSCRIBE, 'am.st ') != -1
    or die "zmq_setsockopt SUBSCRIBE failed: $!";

  do_log(5, "childproc-minder: connecting to outer zmq socket %s%s",
            $outer_sock_specs, $outer_sock_ipv4only ? '' : ', IPv6 enabled');
  zmq_connect($outer_sock, $outer_sock_specs) != -1
    or die "zmq_connect to $outer_sock_specs failed: $!";

  do_log(5, "childproc-minder: creating inner ZMQ_PUB socket");
  $inner_sock = zmq_socket($zmq_ctx, ZMQ_PUB);
  $inner_sock or die "Can't create inner ZMQ socket: $!";

  do_log(5, "childproc-minder: zmq_setsockopt on inner socket");
  zmq_setsockopt($inner_sock, ZMQ_LINGER, 2000) != -1  # milliseconds
    or die "zmq_setsockopt LINGER failed: $!";

  my $inner_sock_ipv4only = 1;  # a ZMQ default
  if (defined &ZMQ_IPV4ONLY && $inner_sock_specs =~ /:[0-9a-f]*:/i) {
    zmq_setsockopt($inner_sock, ZMQ_IPV4ONLY(), 0) != -1
      or die "zmq_setsockopt IPV4ONLY failed: $!";
    $inner_sock_ipv4only = 0;
  }

  my $hwm = defined &ZMQ_SNDHWM ? ZMQ_SNDHWM()
          : defined &ZMQ_HWM    ? ZMQ_HWM() : undef;
  if (defined $hwm) {
    zmq_setsockopt($inner_sock, $hwm, 100) != -1
      or die "zmq_setsockopt HWM failed: $!";
  }

  do_log(5, "childproc-minder: connecting to inner zmq socket %s%s",
            $inner_sock_specs, $inner_sock_ipv4only ? '' : ', IPv6 enabled');
  zmq_connect($inner_sock, $inner_sock_specs) != -1
    or die "zmq_connect to $inner_sock_specs failed: $!";
  sleep 1;  # a crude way to avoid a "slow joiner" syndrome  #***

  my $last_checked = 0;
  my $last_proclist_sent = 0;
  my %process_states;  # associative array on pid

  do_log(5, "childproc-minder: sending FLUSH to inner zmq socket");
  my $now = Time::HiRes::time;
  defined zmq_sendstr($inner_sock,
                      sprintf('am.st %s %014.3f FLUSH', $$, $now))
    or die "zmq_sendstr failed: $!";

  do_log(5, "childproc-minder: entering event loop");
  for (;;) {
    zmq_poll(
      [
        { socket => $outer_sock,
          events => ZMQ_POLLIN,
          callback =>
            sub {
              my($msgstr, $msgstr_l, $more);
              for (;;) {
                ($msgstr_l,$more) = zmq_recvstr($outer_sock,$msgstr);
                defined $msgstr_l  or die "zmq_recvstr failed: $!";
                do_log(5, "childproc-minder: got %s", $msgstr);
                process_message(\%process_states,\$msgstr);
                last if !$more;
              }
            },
        },
      ],
      1 * $zmq_poll_units
    ) != -1  or die "zmq_poll failed: $!";

    $now = Time::HiRes::time;
    if ($last_checked + 1 < $now && $last_proclist_sent + 2 < $now) {
      my($cnt_gone, $cnt_terminated) = check_proc(\%process_states);
      $last_checked = $now;

      my(@proc_idle_list, @proc_busy_list);
      my(@num_proc_busy_by_age, %num_proc_busy_by_activity);
      my $now = Time::HiRes::time;
      for my $pid (sort { $a <=> $b } keys %process_states) {
        my $p = $process_states{$pid};
        my $s = $p->{state};
        if ($p->{task_id} eq '' && $s =~ /^[. ]\z/s) {
          push(@proc_idle_list, $pid);
        } else {
          push(@proc_busy_list, $pid);
          $num_proc_busy_by_age[0]++;
          if (defined $p->{base_timestamp}) {
            my $age = $now - $p->{base_timestamp};
            my $j = 1;
            for my $t (@age_slots) {
              if ($age >= $t) { $num_proc_busy_by_age[$j]++ }
              $j++;
            }
          }
          if ($s eq 'm' || $s eq 'd' || $s eq 'F') { $s = 'm' }
          elsif ($s eq 'D' || $s eq 'V' || $s eq 'S') { }
          else { $s = ' ' }
          $num_proc_busy_by_activity{$s}++;
        }
      }
      for my $j (0..@age_slots) {  # age_slots start at 1, zero is extra
        send_gauge("ProcBusy$j", $num_proc_busy_by_age[$j] || 0, 1);
      }
      send_gauge('ProcBusyTransfer', $num_proc_busy_by_activity{'m'}||0, 1);
      send_gauge('ProcBusyDecode',   $num_proc_busy_by_activity{'D'}||0, 1);
      send_gauge('ProcBusyVirus',    $num_proc_busy_by_activity{'V'}||0, 1);
      send_gauge('ProcBusySpam',     $num_proc_busy_by_activity{'S'}||0, 1);
      send_gauge('ProcBusyOther',    $num_proc_busy_by_activity{' '}||0, 1);
      send_count('ProcGone', $cnt_gone, 1)  if $cnt_gone;
      send_gauge('ProcAll',  @proc_busy_list + @proc_idle_list, 1);
      send_gauge('ProcBusy', scalar @proc_busy_list, 1);
      send_gauge('ProcIdle', scalar @proc_idle_list, 0);  # last chunk

      # must not mix different subscription prefixes
      # within the same multi-part message

      my $msg = 'am.proc.busy ' . join(' ',@proc_busy_list);
    # do_log(5, "childproc-minder: sending %s", $msg);
      defined zmq_sendstr($inner_sock, $msg)
        or die "zmq_sendstr failed: $!";
      $msg = 'am.proc.idle ' . join(' ',@proc_idle_list);
    # do_log(5, "childproc-minder: sending %s", $msg);
      defined zmq_sendstr($inner_sock, $msg)
        or die "zmq_sendstr failed: $!";

      $last_proclist_sent = $now;
    }
  } # forever
  # not reached
}

sub send_gauge($$;$) {
  my($name,$value,$more) = @_;
  defined zmq_sendstr($inner_sock,
                      sprintf('am.nanny %s G32 %d', $name,$value),
                      $more ? ZMQ_SNDMORE : 0)
    or die "zmq_sendstr (send_gauge $name) failed: $!";
}

sub send_count($$;$) {
  my($name,$value,$more) = @_;
  defined zmq_sendstr($inner_sock,
                      sprintf('am.nanny %s C32 %d', $name,$value),
                      $more ? ZMQ_SNDMORE : 0)
    or die "zmq_sendstr (send_count $name) failed: $!";
}

# snmp_responder listens to am.snmp and am.nanny messages reporting the
# SNMP variable updates (as broadcast to the outer socket by a forwarding
# process and child_minder), keeps evidence of a current value of each
# SNMP variable, and also listens on a $snmp_sock_specs socket for queries
# from amavisd-snmp-subagent-zmq, responding to each query by current values
# of SNMP variables.
#
sub snmp_responder() {
  do_log(5, "snmp-responder: zmq_init");
  $zmq_ctx = zmq_init(1);
  $zmq_ctx or die "Can't create ZMQ context: $!";

  do_log(5, "snmp-responder: creating outer ZMQ_SUB socket");
  $outer_sock = zmq_socket($zmq_ctx, ZMQ_SUB);
  $outer_sock or die "Can't create outer ZMQ socket: $!";

  do_log(5, "snmp-responder: zmq_setsockopt on outer socket");
  zmq_setsockopt($outer_sock, ZMQ_LINGER, 2000) != -1  # milliseconds
    or die "zmq_setsockopt LINGER failed: $!";
  my $outer_sock_ipv4only = 1;  # a ZMQ default
  if (defined &ZMQ_IPV4ONLY && $outer_sock_specs =~ /:[0-9a-f]*:/i) {
    zmq_setsockopt($outer_sock, ZMQ_IPV4ONLY(), 0) != -1
      or die "zmq_setsockopt IPV4ONLY failed: $!";
    $outer_sock_ipv4only = 0;
  }
  zmq_setsockopt($outer_sock, ZMQ_SUBSCRIBE, 'am.snmp ') != -1
    or die "zmq_setsockopt SUBSCRIBE failed: $!";
  zmq_setsockopt($outer_sock, ZMQ_SUBSCRIBE, 'am.nanny ') != -1
    or die "zmq_setsockopt SUBSCRIBE failed: $!";

  do_log(5, "snmp-responder: connecting to outer zmq socket %s%s",
            $outer_sock_specs, $outer_sock_ipv4only ? '' : ', IPv6 enabled');
  zmq_connect($outer_sock, $outer_sock_specs) != -1
    or die "zmq_connect to $outer_sock_specs failed: $!";

  do_log(5, "snmp-responder: creating snmp ZMQ_REP socket");
  $snmp_sock = zmq_socket($zmq_ctx, ZMQ_REP);
  $snmp_sock or die "Can't create ZMQ socket: $!";

  do_log(5, "snmp-responder: zmq_setsockopt LINGER on snmp socket");
  zmq_setsockopt($snmp_sock, ZMQ_LINGER, 2000) != -1  # milliseconds
    or die "zmq_setsockopt LINGER failed: $!";

  my $snmp_sock_ipv4only = 1;  # a ZMQ default
  if (defined &ZMQ_IPV4ONLY && $snmp_sock_specs =~ /:[0-9a-f]*:/i) {
    do_log(5, "snmp-responder: zmq_setsockopt IPV4ONLY on snmp socket");
    zmq_setsockopt($snmp_sock, ZMQ_IPV4ONLY(), 0) != -1
      or die "zmq_setsockopt IPV4ONLY failed: $!";
    $snmp_sock_ipv4only = 0;
  }

# my $hwm = defined &ZMQ_SNDHWM ? ZMQ_SNDHWM()
#         : defined &ZMQ_HWM    ? ZMQ_HWM() : undef;
# if (defined $hwm) {
#   do_log(5, "snmp-responder: zmq_setsockopt SNDHWM on snmp socket");
#   zmq_setsockopt($inner_sock, $hwm, 2000) != -1
#     or die "zmq_setsockopt HWM failed: $!";
# }

  do_log(5, "snmp-responder: binding to snmp zmq socket %s%s",
            $snmp_sock_specs, $snmp_sock_ipv4only ? '' : ', IPv6 enabled');
  zmq_bind($snmp_sock, $snmp_sock_specs) != -1
    or die "zmq_bind to $snmp_sock_specs failed: $!";

  my(%snmp_var, %snmp_type);
  $snmp_var{'sysUpTime'}    = int(time);  # to be converted to TIM
  $snmp_type{'sysUpTime'}   = 'INT';
  $snmp_var{'sysObjectID'}  = '1.3.6.1.4.1.15312.2';
  $snmp_type{'sysObjectID'} = 'OID';
  $snmp_var{'sysServices'}  = 64;
  $snmp_type{'sysServices'} = 'INT';

  do_log(5, "snmp-responder: entering event loop");
  for (;;) {
    zmq_poll(
      [
        { socket => $snmp_sock,
          events => ZMQ_POLLIN,
          callback => sub {  # listen to queries
            # fetch a query of a form: "am.snmp?" or "am.nanny?"
            for (;;) {
              my($msgstr, $msgstr_l, $more);
              ($msgstr_l,$more) = zmq_recvstr($snmp_sock,$msgstr);
              defined $msgstr_l  or die "zmq_recvstr failed: $!";
              do_log(5, 'snmp-responder: %sgot "%s"', $more?'M':' ', $msgstr);
              if ($msgstr ne 'am.snmp?' && $msgstr ne 'am.nanny?') {
                do_log(2, 'snmp-responder: ignored "%s"', $msgstr);
              } else {
                my $chan = $msgstr; $chan =~ s/\?\z//;
                my $query_nanny = $chan eq 'am.nanny';
                my $response;
                while (my($key,$val) = each(%snmp_var)) {
                  next if $query_nanny ? $key !~ /^Proc/ : $key =~ /^Proc/;
                  my $type = $snmp_type{$key};
                  if (!defined $val) { $type = $val = '?' }
                  if (defined $response) {  # previous
                    do_log(2, 'snmp-responder: sending "%s"', $response);
                    defined zmq_sendstr($snmp_sock,$response, ZMQ_SNDMORE)
                      or die "zmq_sendstr failed: $!";
                  }
                  $response = join(' ', $chan, $key, $type, $val);
                }
                if (defined $response) {
                  defined zmq_sendstr($snmp_sock,$response)
                    or die "zmq_sendstr failed: $!";
                }
              }
              last if !$more;
            }
          },
        },
        { socket => $outer_sock,
          events => ZMQ_POLLIN,
          callback => sub {  # listen to information updates
            for (;;) {
              my($msgstr, $msgstr_l, $more, $chan, $key, $type, $rest);
              ($msgstr_l,$more) = zmq_recvstr($outer_sock,$msgstr);
              defined $msgstr_l  or die "zmq_recvstr failed: $!";
              ($chan,$key,$type,$rest) = split(' ',$msgstr,4);
              if ($chan ne 'am.snmp' && $chan ne 'am.nanny') {
                do_log(5, "snmp_responder: ignored: %s", $msgstr);
              } elsif (!defined $key) {
                do_log(5, "snmp_responder: ignored, no key: %s", $msgstr);
              } elsif ($key eq 'FLUSH') {
                # amavisd cold start, flush SNMP variables
                do_log(0, "snmp_responder: FLUSH snmp data");
                %snmp_var = (); %snmp_type = ();
                $snmp_var{'sysUpTime'} = int(time);  # to be converted to TIM
                $snmp_type{'sysUpTime'}   = 'INT';
                $snmp_var{'sysObjectID'}  = '1.3.6.1.4.1.15312.2';
                $snmp_type{'sysObjectID'} = 'OID';
                $snmp_var{'sysServices'}  = 64;
                $snmp_type{'sysServices'} = 'INT';
              } elsif (!$snmp_var{$key}) {
                $snmp_var{$key} = $rest;
                $snmp_type{$key} = $type;
              } elsif ($type =~ /^(C32|C64|TIM)\z/) {  # a counter
                $snmp_var{$key} += $rest;
              } else {
                $snmp_var{$key} = $rest;  # string, gauge, absolute value
              }
              last if !$more;
            }
          },
        },
      ],
      -1,  # blocking (no timeout)
    ) != -1  or die "zmq_poll failed: $!";
  }
  # not reached
}

# msg_forwarder forwards messages from inner ZMQ socket to outer ZMQ socket.
# Binding (not connecting) to both sockets provides a single stable point
# in the system.
#
# Amavisd child processes are dynamic and connect to the inner socket,
# supplying information. Similarly the childproc_minder process occasionally
# feeds its supplementary information updates to this inner socket.
#
# Dynamic clients like amavisd-nanny, amavisd-snmp-subagent, amavisd-agent,
# and a childproc_minder process connect to the outer socket to receive
# information from there.
#
sub msg_forwarder() {
  do_log(5, "msg-forwarder: zmq_init");
  $zmq_ctx = zmq_init(1);
  $zmq_ctx or die "Can't create a ZMQ context";

  # receive from amavisd child processes
  do_log(5, "msg-forwarder: creating inner ZMQ_SUB socket");
  $inner_sock = zmq_socket($zmq_ctx, ZMQ_SUB);
  $inner_sock or die "Error creating inner ZMQ_SUB socket: $!";

  do_log(5, "msg-forwarder: zmq_setsockopt on inner socket");
  zmq_setsockopt($inner_sock, ZMQ_LINGER, 2000) != -1  # milliseconds
    or die "zmq_setsockopt LINGER failed: $!";

  my $inner_sock_ipv4only = 1;  # a ZMQ default
  if (defined &ZMQ_IPV4ONLY && $inner_sock_specs =~ /:[0-9a-f]*:/i) {
    zmq_setsockopt($inner_sock, ZMQ_IPV4ONLY(), 0) != -1
      or die "zmq_setsockopt IPV4ONLY failed: $!";
    $inner_sock_ipv4only = 0;
  }

  zmq_setsockopt($inner_sock, ZMQ_SUBSCRIBE, '') != -1
    or die "zmq_setsockopt SUBSCRIBE failed: $!";
# zmq_setsockopt($inner_sock, ZMQ_SUBSCRIBE, 'am.log.') != -1
#   or die "zmq_setsockopt SUBSCRIBE failed: $!";
# zmq_setsockopt($inner_sock, ZMQ_SUBSCRIBE, 'am.proc.') != -1
#   or die "zmq_setsockopt SUBSCRIBE failed: $!";
# zmq_setsockopt($inner_sock, ZMQ_SUBSCRIBE, 'am.st ') != -1
#   or die "zmq_setsockopt SUBSCRIBE failed: $!";
# zmq_setsockopt($inner_sock, ZMQ_SUBSCRIBE, 'am.snmp ') != -1
#   or die "zmq_setsockopt SUBSCRIBE failed: $!";
# zmq_setsockopt($inner_sock, ZMQ_SUBSCRIBE, 'am.nanny ') != -1
#   or die "zmq_setsockopt SUBSCRIBE failed: $!";

  do_log(5, "msg-forwarder: binding to inner zmq socket %s%s",
            $inner_sock_specs, $inner_sock_ipv4only ? '' : ', IPv6 enabled');
  zmq_bind($inner_sock, $inner_sock_specs) != -1
    or die "zmq_bind to $inner_sock_specs failed: $!";

  # forward to a public outer socket
  # to clients like amavisd-nanny, amavisd-agent, amavisd-snmp-subagent
  do_log(5, "msg-forwarder: creating outer ZMQ_PUB socket");
  $outer_sock = zmq_socket($zmq_ctx, ZMQ_PUB);
  $outer_sock or die "Error creating outer ZMQ_PUB socket: $!";

  do_log(5, "msg-forwarder: zmq_setsockopt on outer socket");
  zmq_setsockopt($outer_sock, ZMQ_LINGER, 2000) != -1  # milliseconds
    or die "zmq_setsockopt LINGER failed: $!";

  my $outer_sock_ipv4only = 1;  # a ZMQ default
  if (defined &ZMQ_IPV4ONLY && $outer_sock_specs =~ /:[0-9a-f]*:/i) {
    zmq_setsockopt($outer_sock, ZMQ_IPV4ONLY(), 0) != -1
      or die "zmq_setsockopt IPV4ONLY failed: $!";
    $outer_sock_ipv4only = 0;
  }

  do_log(5, "msg-forwarder: binding to outer zmq socket %s%s",
            $outer_sock_specs, $outer_sock_ipv4only ? '' : ', IPv6 enabled');
  zmq_bind($outer_sock, $outer_sock_specs) != -1
    or die "zmq_bind to $outer_sock_specs failed: $!";

  # start forwarding
# if (0) {
  if ($zmq_mod_name eq 'ZeroMQ' || $zmq_mod_name eq 'ZMQ::LibZMQ2') {
    do_log(5, "msg_forwarder: using a built-in zmq_device for forwarding");
    zmq_device(ZMQ_FORWARDER, $inner_sock, $outer_sock);

  } else {  # ZMQ_FORWARDER device is no longer available in 3.1
    # 0MQ 3.2.1: zmq_device() deprecated and replaced by zmq_proxy()
    do_log(5, "msg_forwarder: start forwarding");
    my $debug = $foreground && ll(5);
    if ($debug) { $| = 1; print "starting\n" }
    my $cnt = 0;
    for (;;) {  # pass messages
      $cnt++;
      for (;;) {  # pass one multi-part message
        my $zmsg = zmq_recvmsg($inner_sock);  # a copy of a received msg obj
        $zmsg or die "zmq_recvmsg failed: $!";
        my $more = zmq_getsockopt($inner_sock, ZMQ_RCVMORE);
        $more != -1  or die "zmq_getsockopt RCVMORE failed: $!";
      # if ($debug && $zmsg) {
      #   my $str = zmq_msg_data($zmsg);  # copy and return as a perl scalar
      #   printf("%s %s\n", $more?'M':' ', $str)  if 1 || $str =~ /^am\.st /;
      #   do_log(5, "msg-forwarder: %s %s", $more?'M':' ', $str);
      # }
        # the zmq_sendmsg nullifies a message in a $zmsg object
        zmq_sendmsg($outer_sock, $zmsg, $more ? ZMQ_SNDMORE : 0) != -1
          or die "zmq_sendmsg failed: $!";
        zmq_msg_close($zmsg);  # declares a msg obj as no longer required
        last if !$more;
      }
      if ($debug) {
        print '.';
        printf(" %d\n", $cnt)  if $cnt % 100 == 0;
      }
    }
  }
  # not reached
}

sub usage() {
  my $me = $0; local $1; $me =~ s{([^/]*)\z}{$1}s;
  "Usage: $me [-f] [-d log_level] (msg-forwarder|childproc-minder|snmp-responder)";
}

# main program starts here

my $normal_termination = 0;

$SIG{'__DIE__' } =
  sub { if (!$^S) { my($m) = @_; chomp($m); do_log(-2, "_DIE: %s", $m) } };
$SIG{'__WARN__'} =
  sub { my($m) = @_; chomp($m); do_log(0, "_WARN: %s", $m) };

my $task_name;

$foreground = 0;
my(@argv) = @ARGV;  # preserve @ARGV, may modify @argv
while (@argv >= 2 && $argv[0] =~ /^-[d]\z/ ||
       @argv >= 1 && $argv[0] =~ /^-/) {
  my($opt,$val);
  $opt = shift @argv;
  $val = shift @argv  if $opt !~ /^-[hVf-]\z/;  # these take no arguments
  if ($opt eq '--') {
    last;
  } elsif ($opt eq '-h') {  # -h  (help)
    printf STDERR ("%s\n\n%s\n", $myversion, usage());
  } elsif ($opt eq '-V') {  # -V  (version)
    printf STDERR ("%s\n", $myversion);
  } elsif ($opt eq '-f') {
    $foreground = 1;
  } elsif ($opt eq '-d') {  # -d log_level or -d SAdbg1,SAdbg2,..,SAdbg3
    $val =~ /^\d+\z/  or die "Bad value for option -d: $val\n";
    $log_level = untaint($val)  if $val =~ /^\d+\z/;
  } else {
    printf STDERR ("Error in command line options: %s\n\n%s\n", $opt, usage());
    exit 1;
  }
}
if (@argv == 1 &&
    $argv[0] =~ /^(?:msg-forwarder|childproc-minder|snmp-responder)\z/) {
  $task_name = $argv[0];
} else {
  printf STDERR ("Error parsing a command line %s\n\n%s\n",
                 join(' ',@ARGV), usage());
  exit 1;
}

if (!$foreground) {
  openlog($syslog_ident, LOG_PID | LOG_NDELAY, $syslog_facility);
  $syslog_open = 1;
}

do_log(0, "%s task '%s' [%d] started. %s, perl %s",
          $myversion, $task_name, $$, zmq_version(), $]);

eval {  # catch TERM and INT signals for a controlled shutdown
  my $h = sub { $interrupted = $_[0]; die "\n" };
  local $SIG{INT}  = $h;
  local $SIG{TERM} = $h;
  if ($task_name eq 'msg-forwarder') {
    msg_forwarder();
  } elsif ($task_name eq 'childproc-minder') {
    childproc_minder();
  } elsif ($task_name eq 'snmp-responder') {
    snmp_responder();
  } else {
    die "Unrecognized task name: $task_name";
  }
};  # until interrupted
do_log(0, "Task '%s' [%d] shutting down", $task_name, $$);

if ($inner_sock) {
  do_log(0, "%s closing inner socket", $task_name);
  zmq_setsockopt($inner_sock, ZMQ_LINGER, 0);  # ignoring status
  zmq_close($inner_sock);  # ignoring status
}
if ($outer_sock) {
  do_log(0, "%s closing outer socket", $task_name);
  zmq_setsockopt($outer_sock, ZMQ_LINGER, 0);  # ignoring status
  zmq_close($outer_sock);  # ignoring status
}
if ($snmp_sock) {
  do_log(0, "%s closing SNMP socket", $task_name);
  zmq_setsockopt($snmp_sock, ZMQ_LINGER, 0);  # ignoring status
  zmq_close($snmp_sock);  # ignoring status
}
if ($zmq_ctx) {
  do_log(0, "%s closing context", $task_name);
  zmq_term($zmq_ctx);  # ignoring status
}

END {
  do_log(0, "Task '%s' [%d] exiting: %s",
            $task_name, $$, $interrupted) if !$normal_termination;
  if ($syslog_open) { closelog(); $syslog_open = 0 }
}

Anon7 - 2022
AnonSec Team