| Server IP : 85.214.239.14 / Your IP : 216.73.216.99 Web Server : Apache/2.4.65 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Mon Sep 30 15:36:27 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 8.2.29 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /proc/3/task/3/cwd/proc/3/task/3/cwd/proc/2/task/2/cwd/usr/sbin/ |
Upload File : |
#!/usr/bin/perl -T
# SPDX-License-Identifier: BSD-2-Clause
#------------------------------------------------------------------------------
# This is amavis-services, a set of supervisor processes for amavis.
#
# Author: Mark Martinec <Mark.Martinec@ijs.si>
#
# Copyright (c) 2012-2018, Mark Martinec
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS
# BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and documentation are
# those of the authors and should not be interpreted as representing official
# policies, either expressed or implied, of the Jozef Stefan Institute.
# (the above license is the 2-clause BSD license, also known as
# a "Simplified BSD License", and pertains to this program only)
#
# Patches and problem reports are welcome.
# The latest version of this program is available at:
# http://www.ijs.si/software/amavisd/
#------------------------------------------------------------------------------
use strict;
use re 'taint';
use warnings;
use warnings FATAL => qw(utf8 void);
no warnings 'uninitialized';
use vars qw($VERSION); $VERSION = 2.011002;
use Errno qw(ESRCH ENOENT);
use POSIX qw(strftime);
use Time::HiRes ();
use Unix::Syslog qw(:macros :subs);
use vars qw($myproduct_name $myversion_id $myversion_date $myversion);
use vars qw($MYHOME $idle_ttl $active_ttl $log_level);
use vars qw($syslog_ident $syslog_facility);
use vars qw($inner_sock_specs $outer_sock_specs $snmp_sock_specs);
BEGIN {
$myproduct_name = 'amavis-services';
$myversion_id = '2.11.2'; $myversion_date = '20170602';
$myversion = "$myproduct_name-$myversion_id ($myversion_date)";
}
### USER CONFIGURABLE:
$log_level = 0; # 0..5
$syslog_facility = LOG_MAIL;
$syslog_ident = $myproduct_name;
$MYHOME = '/var/lib/amavis';
# A socket to which amavisd child processes report their data.
# should match one of the sockets in @zmq_sockets in amavisd.conf
$inner_sock_specs = "ipc://$MYHOME/amavisd-zmq.sock";
# A socket to which we forward summarized amavisd data.
# should match a socket of the same name in amavis-status
$outer_sock_specs = "tcp://127.0.0.1:23232";
# A socket on which we accept SNMP queries and respond to.
# should match a socket of the same name in amavisd-snmp-subagent-zmq
$snmp_sock_specs = "tcp://127.0.0.1:23233"; # tcp://*:23233
$idle_ttl = 4*60*60; # idle children are sent a SIGTERM
# after this many seconds
$active_ttl = 15*60; # stuck active children are sent a SIGTERM
# after this many seconds
### END OF USER CONFIGURABLE
use vars qw(@age_slots);
BEGIN {
@age_slots = (
0.1, 0.2, 0.5,
1, 2, 4, 8, 15, 30, # seconds
1*60, 2*60, 4*60, 8*60, 15*60, 30*60, # minutes
1*3600, 2*3600, 4*3600, 8*3600, 15*3600, 30*3600); # hours
}
use vars qw($zmq_mod_name $zmq_mod_version $zmq_lib_version);
BEGIN {
my($zmq_major, $zmq_minor, $zmq_patch);
if (eval { require ZMQ::LibZMQ3 && require ZMQ::Constants }) {
$zmq_mod_name = 'ZMQ::LibZMQ3'; # new interface module to zmq v3 or libxs
import ZMQ::LibZMQ3; import ZMQ::Constants qw(:all);
($zmq_major, $zmq_minor, $zmq_patch) = ZMQ::LibZMQ3::zmq_version();
# *zmq_sendmsg [native] # (socket,msgobj,flags)
# *zmq_recvmsg [native] # (socket,flags) -> msgobj
*zmq_sendstr = sub { # (socket,string,flags)
my $rv = zmq_send($_[0], $_[1], length $_[1], $_[2]||0);
$rv == -1 ? undef : $rv;
};
} elsif (eval { require ZMQ::LibZMQ2 && require ZMQ::Constants }) {
$zmq_mod_name = 'ZMQ::LibZMQ2'; # new interface module to zmq v2
import ZMQ::LibZMQ2; import ZMQ::Constants qw(:all);
($zmq_major, $zmq_minor, $zmq_patch) = ZMQ::LibZMQ2::zmq_version();
# zmq v2/v3 incompatibile renaming
*zmq_sendmsg = \&ZMQ::LibZMQ2::zmq_send; # (socket,msgobj,flags)
*zmq_recvmsg = \&ZMQ::LibZMQ2::zmq_recv; # (socket,flags) -> msgobj
*zmq_sendstr = sub { # (socket,string,flags)
my $rv = zmq_send(@_); $rv == -1 ? undef : $rv;
};
} elsif (eval { require ZeroMQ::Constants && require ZeroMQ::Raw }) {
$zmq_mod_name = 'ZeroMQ'; # old interface module to zmq v2
import ZeroMQ::Raw; import ZeroMQ::Constants qw(:all);
($zmq_major, $zmq_minor, $zmq_patch) = ZeroMQ::version();
# zmq v2/v3 incompatibile renaming
*zmq_sendmsg = \&ZeroMQ::Raw::zmq_send; # (socket,msgobj,flags)
*zmq_recvmsg = \&ZeroMQ::Raw::zmq_recv; # (socket,flags) -> msgobj
*zmq_sendstr = sub { # (socket,string,flags)
my $rv = zmq_send(@_); $rv == -1 ? undef : $rv;
};
} else {
die "Perl modules ZMQ::LibZMQ3 or ZMQ::LibZMQ2 or ZeroMQ not available\n";
}
$zmq_mod_version = $zmq_mod_name->VERSION;
$zmq_lib_version = join('.', $zmq_major, $zmq_minor, $zmq_patch);
1;
}
sub zmq_version {
sprintf("%s %s, lib %s",
$zmq_mod_name, $zmq_mod_version, $zmq_lib_version);
};
sub zmq_recvstr { # (socket,buffer,offset) -> (len,more)
my $sock = $_[0];
my $offset = $_[2] || 0;
my $zm = zmq_recvmsg($sock); # a copy of a received msg obj
if (!$zm) { substr($_[1],$offset) = ''; return }
($offset ? substr($_[1],$offset) : $_[1]) = zmq_msg_data($zm);
my $len = length($_[1]) - $offset;
zmq_msg_close($zm);
return $len if !wantarray;
my $more = zmq_getsockopt($sock, ZMQ_RCVMORE);
if ($more == -1) { substr($_[1],$offset) = ''; return }
($len, $more);
};
my($interrupted, $syslog_open, $foreground);
my($zmq_ctx, $inner_sock, $outer_sock, $snmp_sock);
my $zmq_poll_units = 1000; # milliseconds since zmq v3
$zmq_poll_units *= 1000 if $zmq_lib_version =~ /^[012]\./; # microseconds
# Return untainted copy of a string (argument can be a string or a string ref)
#
sub untaint($) {
return undef if !defined $_[0]; # must return undef even in a list context!
no re 'taint';
local $1; # avoids Perl taint bug: tainted global $1 propagates taintedness
(ref($_[0]) ? ${$_[0]} : $_[0]) =~ /^(.*)\z/s;
$1;
}
sub ll($) {
my($level) = @_;
$level <= $log_level;
}
sub do_log($$;@) {
# my($level,$errmsg,@args) = @_;
my $level = shift;
if ($level <= $log_level) {
my $errmsg = shift;
# treat $errmsg as sprintf format string if additional arguments provided
$errmsg = sprintf($errmsg,@_) if @_;
if (!$syslog_open) {
$errmsg .= "\n";
print STDERR $errmsg; # print ignoring I/O status, except SIGPIPE
} else {
my $prio = $level >= 3 ? LOG_DEBUG # most frequent first
: $level >= 1 ? LOG_INFO
: $level >= 0 ? LOG_NOTICE
: $level >= -1 ? LOG_WARNING
: $level >= -2 ? LOG_ERR
: LOG_CRIT;
syslog($prio, "%s", $errmsg);
}
}
}
sub process_message($$) {
my($process_states_ref,$msgstr_ref) = @_;
if (!$msgstr_ref || !defined $$msgstr_ref) {
# should not happen (except on a failure of zmq_recvmsg)
} elsif ($$msgstr_ref =~ /^am\.st \d+\s+/s) {
my($subscription_chan, $pid, $time, $state, $task_id) =
split(' ', $$msgstr_ref);
if ($state eq 'FLUSH') {
do_log(0, "childproc_minder: FLUSH process states");
%$process_states_ref = (); # flush all kept state (e.g. on a restart)
} elsif ($state eq 'exiting' || $state eq 'purged') {
delete $process_states_ref->{$pid}; # may or may not exist
} else {
$state = ' ' if $state eq '-';
my $p = $process_states_ref->{$pid};
if ($p) {
$p->{state} = $state;
$p->{task_id} = $task_id;
} else { # new process appeared
$process_states_ref->{$pid} = $p = {
state => $state,
task_id => $task_id,
timestamp => undef,
base_timestamp => undef,
};
}
my $now = Time::HiRes::time;
if ($time > 1e9) { # Unix time in seconds with fraction (> Y2000)
$p->{base_timestamp} = $p->{timestamp} = $time;
} elsif (!$p->{base_timestamp}) { # delta time but no base
$p->{timestamp} = $now;
$p->{base_timestamp} = $p->{timestamp} - $time/1000; # estimate
} else { # delta time since base_timestamp in ms
$p->{timestamp} = $p->{base_timestamp} + $time/1000;
}
$p->{tick} = $now;
}
}
1;
}
sub check_proc($) {
my($process_states_ref) = @_;
do_log(2, "CHECK");
my $cnt_gone = 0;
my $cnt_terminated = 0;
while (my($pid,$p) = each %$process_states_ref) {
my $now = Time::HiRes::time;
my $age = $now - $p->{base_timestamp};
my $idling = $p->{task_id} eq '' && $p->{state} =~ /^[. ]\z/s;
my $overdue = $age > ($idling ? $idle_ttl : $active_ttl);
my $n; # number of checked processes (0 or 1)
if (!$overdue && $now - $p->{tick} < 10) {
$n = 1; # recently heard from it, assume it is still there
do_log(2, "PID %d skipped, recently heared from", $pid);
} elsif (!$overdue && $idling &&
$p->{last_checked_timestamp} &&
$now - $p->{last_checked_timestamp} < 20) {
$n = 1; # recently checked, idle, assume it is still there
do_log(2, "PID %d skipped, recently checked", $pid);
} elsif (!$overdue && $p->{last_checked_timestamp} &&
$now - $p->{last_checked_timestamp} < 10) {
$n = 1; # recently checked, busy, assume it is still there
do_log(2, "PID %d skipped, recently checked", $pid);
} else {
do_log(2, "PID %d checking", $pid);
$p->{last_checked_timestamp} = $now;
$n = kill(0,$pid); # test if the process is still there
if ($n == 0) {
# ESRCH means there is no such process
if ($! != ESRCH) {
do_log(-1, "Can't check the process %s: %s", $pid, $!);
} elsif (defined $p->{sig_sent}) {
$cnt_terminated++;
do_log(2, "PID %d sucessfully terminated by SIG%s, %s",
$pid, $p->{sig_sent}, $p->{task_id} || $p->{state});
} else {
$cnt_gone++;
do_log(-2, "PID %d went away, %s",
$pid, $p->{task_id} || $p->{state} );
}
delete $process_states_ref->{$pid};
defined zmq_sendstr($inner_sock,
sprintf('am.st %s %014.3f purged', $pid,$now))
or die "zmq_sendstr failed: $!"
}
}
if ($n == 0) {
# already dealt with
} elsif (!$overdue) { # life is good
do_log(2, "PID %d: %s", $pid, $p->{task_id} || $p->{state} );
} elsif (!$p->{sig_sent} ||
$p->{sig_sent_timestamp} + $p->{sig_sent_retry_in} >= $now) {
# overdue, terminate or kill, or retry the killing
if (!$p->{sig_sent}) {
$p->{sig_sent} = 'TERM';
$p->{sig_sent_retry_in} = 20;
} else {
$p->{sig_sent} = 'KILL';
$p->{sig_sent_retry_in} *= 1.5; # increase the wait time for a retry
}
$p->{sig_sent_timestamp} = $now;
if (kill($p->{sig_sent},$pid)) {
do_log(2, "PID %d SIG%s, %s",
$pid, $p->{sig_sent}, $p->{task_id} || $p->{state});
} elsif ($! == ESRCH) {
# already gone by now, no fuss
} else {
warn "Can't $p->{sig_sent} the [$pid]: $!";
}
if ($p->{sig_sent_retry_in} > 600) {
do_log(2, "Giving up on PID %d, %s",
$pid, $p->{task_id} || $p->{state});
delete $process_states_ref->{$pid};
defined zmq_sendstr($inner_sock,
sprintf('am.st %s %014.3f purged', $pid,$now))
or die "zmq_sendstr failed: $!"
}
}
}
($cnt_gone, $cnt_terminated);
}
# The childproc_minder process receives information about amavisd child
# processes from an outer socket, forwarded there by a forwarding process.
# Based on updates received, and based on its own processes sanity checks,
# it maintains a state in memory of each amavisd child process.
#
# It must run on the same host as amavisd child processes in order to be
# able to check for lost or crashed processes.
#
# It kills amavisd child processes which are active longer than $active_ttl
# seconds, or are idling for more than $idle_ttl seconds. For killed or lost
# processes it sends 'purged' messages to the inner socket, and periodically
# sends a list of amavisd child process PIDs to the inner socket for the
# benefit of more ephemeral clients.
#
sub childproc_minder() {
do_log(5, "childproc-minder: zmq_init");
$zmq_ctx = zmq_init(1);
$zmq_ctx or die "Can't create ZMQ context: $!";
do_log(5, "childproc-minder: creating outer ZMQ_SUB socket");
$outer_sock = zmq_socket($zmq_ctx, ZMQ_SUB);
$outer_sock or die "Can't create outer ZMQ socket: $!";
do_log(5, "childproc-minder: zmq_setsockopt on outer socket");
zmq_setsockopt($outer_sock, ZMQ_LINGER, 2000) != -1 # milliseconds
or die "zmq_setsockopt LINGER failed: $!";
my $outer_sock_ipv4only = 1; # a ZMQ default
if (defined &ZMQ_IPV4ONLY && $outer_sock_specs =~ /:[0-9a-f]*:/i) {
zmq_setsockopt($outer_sock, ZMQ_IPV4ONLY(), 0) != -1
or die "zmq_setsockopt failed: $!";
$outer_sock_ipv4only = 0;
}
zmq_setsockopt($outer_sock, ZMQ_SUBSCRIBE, 'am.st ') != -1
or die "zmq_setsockopt SUBSCRIBE failed: $!";
do_log(5, "childproc-minder: connecting to outer zmq socket %s%s",
$outer_sock_specs, $outer_sock_ipv4only ? '' : ', IPv6 enabled');
zmq_connect($outer_sock, $outer_sock_specs) != -1
or die "zmq_connect to $outer_sock_specs failed: $!";
do_log(5, "childproc-minder: creating inner ZMQ_PUB socket");
$inner_sock = zmq_socket($zmq_ctx, ZMQ_PUB);
$inner_sock or die "Can't create inner ZMQ socket: $!";
do_log(5, "childproc-minder: zmq_setsockopt on inner socket");
zmq_setsockopt($inner_sock, ZMQ_LINGER, 2000) != -1 # milliseconds
or die "zmq_setsockopt LINGER failed: $!";
my $inner_sock_ipv4only = 1; # a ZMQ default
if (defined &ZMQ_IPV4ONLY && $inner_sock_specs =~ /:[0-9a-f]*:/i) {
zmq_setsockopt($inner_sock, ZMQ_IPV4ONLY(), 0) != -1
or die "zmq_setsockopt IPV4ONLY failed: $!";
$inner_sock_ipv4only = 0;
}
my $hwm = defined &ZMQ_SNDHWM ? ZMQ_SNDHWM()
: defined &ZMQ_HWM ? ZMQ_HWM() : undef;
if (defined $hwm) {
zmq_setsockopt($inner_sock, $hwm, 100) != -1
or die "zmq_setsockopt HWM failed: $!";
}
do_log(5, "childproc-minder: connecting to inner zmq socket %s%s",
$inner_sock_specs, $inner_sock_ipv4only ? '' : ', IPv6 enabled');
zmq_connect($inner_sock, $inner_sock_specs) != -1
or die "zmq_connect to $inner_sock_specs failed: $!";
sleep 1; # a crude way to avoid a "slow joiner" syndrome #***
my $last_checked = 0;
my $last_proclist_sent = 0;
my %process_states; # associative array on pid
do_log(5, "childproc-minder: sending FLUSH to inner zmq socket");
my $now = Time::HiRes::time;
defined zmq_sendstr($inner_sock,
sprintf('am.st %s %014.3f FLUSH', $$, $now))
or die "zmq_sendstr failed: $!";
do_log(5, "childproc-minder: entering event loop");
for (;;) {
zmq_poll(
[
{ socket => $outer_sock,
events => ZMQ_POLLIN,
callback =>
sub {
my($msgstr, $msgstr_l, $more);
for (;;) {
($msgstr_l,$more) = zmq_recvstr($outer_sock,$msgstr);
defined $msgstr_l or die "zmq_recvstr failed: $!";
do_log(5, "childproc-minder: got %s", $msgstr);
process_message(\%process_states,\$msgstr);
last if !$more;
}
},
},
],
1 * $zmq_poll_units
) != -1 or die "zmq_poll failed: $!";
$now = Time::HiRes::time;
if ($last_checked + 1 < $now && $last_proclist_sent + 2 < $now) {
my($cnt_gone, $cnt_terminated) = check_proc(\%process_states);
$last_checked = $now;
my(@proc_idle_list, @proc_busy_list);
my(@num_proc_busy_by_age, %num_proc_busy_by_activity);
my $now = Time::HiRes::time;
for my $pid (sort { $a <=> $b } keys %process_states) {
my $p = $process_states{$pid};
my $s = $p->{state};
if ($p->{task_id} eq '' && $s =~ /^[. ]\z/s) {
push(@proc_idle_list, $pid);
} else {
push(@proc_busy_list, $pid);
$num_proc_busy_by_age[0]++;
if (defined $p->{base_timestamp}) {
my $age = $now - $p->{base_timestamp};
my $j = 1;
for my $t (@age_slots) {
if ($age >= $t) { $num_proc_busy_by_age[$j]++ }
$j++;
}
}
if ($s eq 'm' || $s eq 'd' || $s eq 'F') { $s = 'm' }
elsif ($s eq 'D' || $s eq 'V' || $s eq 'S') { }
else { $s = ' ' }
$num_proc_busy_by_activity{$s}++;
}
}
for my $j (0..@age_slots) { # age_slots start at 1, zero is extra
send_gauge("ProcBusy$j", $num_proc_busy_by_age[$j] || 0, 1);
}
send_gauge('ProcBusyTransfer', $num_proc_busy_by_activity{'m'}||0, 1);
send_gauge('ProcBusyDecode', $num_proc_busy_by_activity{'D'}||0, 1);
send_gauge('ProcBusyVirus', $num_proc_busy_by_activity{'V'}||0, 1);
send_gauge('ProcBusySpam', $num_proc_busy_by_activity{'S'}||0, 1);
send_gauge('ProcBusyOther', $num_proc_busy_by_activity{' '}||0, 1);
send_count('ProcGone', $cnt_gone, 1) if $cnt_gone;
send_gauge('ProcAll', @proc_busy_list + @proc_idle_list, 1);
send_gauge('ProcBusy', scalar @proc_busy_list, 1);
send_gauge('ProcIdle', scalar @proc_idle_list, 0); # last chunk
# must not mix different subscription prefixes
# within the same multi-part message
my $msg = 'am.proc.busy ' . join(' ',@proc_busy_list);
# do_log(5, "childproc-minder: sending %s", $msg);
defined zmq_sendstr($inner_sock, $msg)
or die "zmq_sendstr failed: $!";
$msg = 'am.proc.idle ' . join(' ',@proc_idle_list);
# do_log(5, "childproc-minder: sending %s", $msg);
defined zmq_sendstr($inner_sock, $msg)
or die "zmq_sendstr failed: $!";
$last_proclist_sent = $now;
}
} # forever
# not reached
}
sub send_gauge($$;$) {
my($name,$value,$more) = @_;
defined zmq_sendstr($inner_sock,
sprintf('am.nanny %s G32 %d', $name,$value),
$more ? ZMQ_SNDMORE : 0)
or die "zmq_sendstr (send_gauge $name) failed: $!";
}
sub send_count($$;$) {
my($name,$value,$more) = @_;
defined zmq_sendstr($inner_sock,
sprintf('am.nanny %s C32 %d', $name,$value),
$more ? ZMQ_SNDMORE : 0)
or die "zmq_sendstr (send_count $name) failed: $!";
}
# snmp_responder listens to am.snmp and am.nanny messages reporting the
# SNMP variable updates (as broadcast to the outer socket by a forwarding
# process and child_minder), keeps evidence of a current value of each
# SNMP variable, and also listens on a $snmp_sock_specs socket for queries
# from amavisd-snmp-subagent-zmq, responding to each query by current values
# of SNMP variables.
#
sub snmp_responder() {
do_log(5, "snmp-responder: zmq_init");
$zmq_ctx = zmq_init(1);
$zmq_ctx or die "Can't create ZMQ context: $!";
do_log(5, "snmp-responder: creating outer ZMQ_SUB socket");
$outer_sock = zmq_socket($zmq_ctx, ZMQ_SUB);
$outer_sock or die "Can't create outer ZMQ socket: $!";
do_log(5, "snmp-responder: zmq_setsockopt on outer socket");
zmq_setsockopt($outer_sock, ZMQ_LINGER, 2000) != -1 # milliseconds
or die "zmq_setsockopt LINGER failed: $!";
my $outer_sock_ipv4only = 1; # a ZMQ default
if (defined &ZMQ_IPV4ONLY && $outer_sock_specs =~ /:[0-9a-f]*:/i) {
zmq_setsockopt($outer_sock, ZMQ_IPV4ONLY(), 0) != -1
or die "zmq_setsockopt IPV4ONLY failed: $!";
$outer_sock_ipv4only = 0;
}
zmq_setsockopt($outer_sock, ZMQ_SUBSCRIBE, 'am.snmp ') != -1
or die "zmq_setsockopt SUBSCRIBE failed: $!";
zmq_setsockopt($outer_sock, ZMQ_SUBSCRIBE, 'am.nanny ') != -1
or die "zmq_setsockopt SUBSCRIBE failed: $!";
do_log(5, "snmp-responder: connecting to outer zmq socket %s%s",
$outer_sock_specs, $outer_sock_ipv4only ? '' : ', IPv6 enabled');
zmq_connect($outer_sock, $outer_sock_specs) != -1
or die "zmq_connect to $outer_sock_specs failed: $!";
do_log(5, "snmp-responder: creating snmp ZMQ_REP socket");
$snmp_sock = zmq_socket($zmq_ctx, ZMQ_REP);
$snmp_sock or die "Can't create ZMQ socket: $!";
do_log(5, "snmp-responder: zmq_setsockopt LINGER on snmp socket");
zmq_setsockopt($snmp_sock, ZMQ_LINGER, 2000) != -1 # milliseconds
or die "zmq_setsockopt LINGER failed: $!";
my $snmp_sock_ipv4only = 1; # a ZMQ default
if (defined &ZMQ_IPV4ONLY && $snmp_sock_specs =~ /:[0-9a-f]*:/i) {
do_log(5, "snmp-responder: zmq_setsockopt IPV4ONLY on snmp socket");
zmq_setsockopt($snmp_sock, ZMQ_IPV4ONLY(), 0) != -1
or die "zmq_setsockopt IPV4ONLY failed: $!";
$snmp_sock_ipv4only = 0;
}
# my $hwm = defined &ZMQ_SNDHWM ? ZMQ_SNDHWM()
# : defined &ZMQ_HWM ? ZMQ_HWM() : undef;
# if (defined $hwm) {
# do_log(5, "snmp-responder: zmq_setsockopt SNDHWM on snmp socket");
# zmq_setsockopt($inner_sock, $hwm, 2000) != -1
# or die "zmq_setsockopt HWM failed: $!";
# }
do_log(5, "snmp-responder: binding to snmp zmq socket %s%s",
$snmp_sock_specs, $snmp_sock_ipv4only ? '' : ', IPv6 enabled');
zmq_bind($snmp_sock, $snmp_sock_specs) != -1
or die "zmq_bind to $snmp_sock_specs failed: $!";
my(%snmp_var, %snmp_type);
$snmp_var{'sysUpTime'} = int(time); # to be converted to TIM
$snmp_type{'sysUpTime'} = 'INT';
$snmp_var{'sysObjectID'} = '1.3.6.1.4.1.15312.2';
$snmp_type{'sysObjectID'} = 'OID';
$snmp_var{'sysServices'} = 64;
$snmp_type{'sysServices'} = 'INT';
do_log(5, "snmp-responder: entering event loop");
for (;;) {
zmq_poll(
[
{ socket => $snmp_sock,
events => ZMQ_POLLIN,
callback => sub { # listen to queries
# fetch a query of a form: "am.snmp?" or "am.nanny?"
for (;;) {
my($msgstr, $msgstr_l, $more);
($msgstr_l,$more) = zmq_recvstr($snmp_sock,$msgstr);
defined $msgstr_l or die "zmq_recvstr failed: $!";
do_log(5, 'snmp-responder: %sgot "%s"', $more?'M':' ', $msgstr);
if ($msgstr ne 'am.snmp?' && $msgstr ne 'am.nanny?') {
do_log(2, 'snmp-responder: ignored "%s"', $msgstr);
} else {
my $chan = $msgstr; $chan =~ s/\?\z//;
my $query_nanny = $chan eq 'am.nanny';
my $response;
while (my($key,$val) = each(%snmp_var)) {
next if $query_nanny ? $key !~ /^Proc/ : $key =~ /^Proc/;
my $type = $snmp_type{$key};
if (!defined $val) { $type = $val = '?' }
if (defined $response) { # previous
do_log(2, 'snmp-responder: sending "%s"', $response);
defined zmq_sendstr($snmp_sock,$response, ZMQ_SNDMORE)
or die "zmq_sendstr failed: $!";
}
$response = join(' ', $chan, $key, $type, $val);
}
if (defined $response) {
defined zmq_sendstr($snmp_sock,$response)
or die "zmq_sendstr failed: $!";
}
}
last if !$more;
}
},
},
{ socket => $outer_sock,
events => ZMQ_POLLIN,
callback => sub { # listen to information updates
for (;;) {
my($msgstr, $msgstr_l, $more, $chan, $key, $type, $rest);
($msgstr_l,$more) = zmq_recvstr($outer_sock,$msgstr);
defined $msgstr_l or die "zmq_recvstr failed: $!";
($chan,$key,$type,$rest) = split(' ',$msgstr,4);
if ($chan ne 'am.snmp' && $chan ne 'am.nanny') {
do_log(5, "snmp_responder: ignored: %s", $msgstr);
} elsif (!defined $key) {
do_log(5, "snmp_responder: ignored, no key: %s", $msgstr);
} elsif ($key eq 'FLUSH') {
# amavisd cold start, flush SNMP variables
do_log(0, "snmp_responder: FLUSH snmp data");
%snmp_var = (); %snmp_type = ();
$snmp_var{'sysUpTime'} = int(time); # to be converted to TIM
$snmp_type{'sysUpTime'} = 'INT';
$snmp_var{'sysObjectID'} = '1.3.6.1.4.1.15312.2';
$snmp_type{'sysObjectID'} = 'OID';
$snmp_var{'sysServices'} = 64;
$snmp_type{'sysServices'} = 'INT';
} elsif (!$snmp_var{$key}) {
$snmp_var{$key} = $rest;
$snmp_type{$key} = $type;
} elsif ($type =~ /^(C32|C64|TIM)\z/) { # a counter
$snmp_var{$key} += $rest;
} else {
$snmp_var{$key} = $rest; # string, gauge, absolute value
}
last if !$more;
}
},
},
],
-1, # blocking (no timeout)
) != -1 or die "zmq_poll failed: $!";
}
# not reached
}
# msg_forwarder forwards messages from inner ZMQ socket to outer ZMQ socket.
# Binding (not connecting) to both sockets provides a single stable point
# in the system.
#
# Amavisd child processes are dynamic and connect to the inner socket,
# supplying information. Similarly the childproc_minder process occasionally
# feeds its supplementary information updates to this inner socket.
#
# Dynamic clients like amavisd-nanny, amavisd-snmp-subagent, amavisd-agent,
# and a childproc_minder process connect to the outer socket to receive
# information from there.
#
sub msg_forwarder() {
do_log(5, "msg-forwarder: zmq_init");
$zmq_ctx = zmq_init(1);
$zmq_ctx or die "Can't create a ZMQ context";
# receive from amavisd child processes
do_log(5, "msg-forwarder: creating inner ZMQ_SUB socket");
$inner_sock = zmq_socket($zmq_ctx, ZMQ_SUB);
$inner_sock or die "Error creating inner ZMQ_SUB socket: $!";
do_log(5, "msg-forwarder: zmq_setsockopt on inner socket");
zmq_setsockopt($inner_sock, ZMQ_LINGER, 2000) != -1 # milliseconds
or die "zmq_setsockopt LINGER failed: $!";
my $inner_sock_ipv4only = 1; # a ZMQ default
if (defined &ZMQ_IPV4ONLY && $inner_sock_specs =~ /:[0-9a-f]*:/i) {
zmq_setsockopt($inner_sock, ZMQ_IPV4ONLY(), 0) != -1
or die "zmq_setsockopt IPV4ONLY failed: $!";
$inner_sock_ipv4only = 0;
}
zmq_setsockopt($inner_sock, ZMQ_SUBSCRIBE, '') != -1
or die "zmq_setsockopt SUBSCRIBE failed: $!";
# zmq_setsockopt($inner_sock, ZMQ_SUBSCRIBE, 'am.log.') != -1
# or die "zmq_setsockopt SUBSCRIBE failed: $!";
# zmq_setsockopt($inner_sock, ZMQ_SUBSCRIBE, 'am.proc.') != -1
# or die "zmq_setsockopt SUBSCRIBE failed: $!";
# zmq_setsockopt($inner_sock, ZMQ_SUBSCRIBE, 'am.st ') != -1
# or die "zmq_setsockopt SUBSCRIBE failed: $!";
# zmq_setsockopt($inner_sock, ZMQ_SUBSCRIBE, 'am.snmp ') != -1
# or die "zmq_setsockopt SUBSCRIBE failed: $!";
# zmq_setsockopt($inner_sock, ZMQ_SUBSCRIBE, 'am.nanny ') != -1
# or die "zmq_setsockopt SUBSCRIBE failed: $!";
do_log(5, "msg-forwarder: binding to inner zmq socket %s%s",
$inner_sock_specs, $inner_sock_ipv4only ? '' : ', IPv6 enabled');
zmq_bind($inner_sock, $inner_sock_specs) != -1
or die "zmq_bind to $inner_sock_specs failed: $!";
# forward to a public outer socket
# to clients like amavisd-nanny, amavisd-agent, amavisd-snmp-subagent
do_log(5, "msg-forwarder: creating outer ZMQ_PUB socket");
$outer_sock = zmq_socket($zmq_ctx, ZMQ_PUB);
$outer_sock or die "Error creating outer ZMQ_PUB socket: $!";
do_log(5, "msg-forwarder: zmq_setsockopt on outer socket");
zmq_setsockopt($outer_sock, ZMQ_LINGER, 2000) != -1 # milliseconds
or die "zmq_setsockopt LINGER failed: $!";
my $outer_sock_ipv4only = 1; # a ZMQ default
if (defined &ZMQ_IPV4ONLY && $outer_sock_specs =~ /:[0-9a-f]*:/i) {
zmq_setsockopt($outer_sock, ZMQ_IPV4ONLY(), 0) != -1
or die "zmq_setsockopt IPV4ONLY failed: $!";
$outer_sock_ipv4only = 0;
}
do_log(5, "msg-forwarder: binding to outer zmq socket %s%s",
$outer_sock_specs, $outer_sock_ipv4only ? '' : ', IPv6 enabled');
zmq_bind($outer_sock, $outer_sock_specs) != -1
or die "zmq_bind to $outer_sock_specs failed: $!";
# start forwarding
# if (0) {
if ($zmq_mod_name eq 'ZeroMQ' || $zmq_mod_name eq 'ZMQ::LibZMQ2') {
do_log(5, "msg_forwarder: using a built-in zmq_device for forwarding");
zmq_device(ZMQ_FORWARDER, $inner_sock, $outer_sock);
} else { # ZMQ_FORWARDER device is no longer available in 3.1
# 0MQ 3.2.1: zmq_device() deprecated and replaced by zmq_proxy()
do_log(5, "msg_forwarder: start forwarding");
my $debug = $foreground && ll(5);
if ($debug) { $| = 1; print "starting\n" }
my $cnt = 0;
for (;;) { # pass messages
$cnt++;
for (;;) { # pass one multi-part message
my $zmsg = zmq_recvmsg($inner_sock); # a copy of a received msg obj
$zmsg or die "zmq_recvmsg failed: $!";
my $more = zmq_getsockopt($inner_sock, ZMQ_RCVMORE);
$more != -1 or die "zmq_getsockopt RCVMORE failed: $!";
# if ($debug && $zmsg) {
# my $str = zmq_msg_data($zmsg); # copy and return as a perl scalar
# printf("%s %s\n", $more?'M':' ', $str) if 1 || $str =~ /^am\.st /;
# do_log(5, "msg-forwarder: %s %s", $more?'M':' ', $str);
# }
# the zmq_sendmsg nullifies a message in a $zmsg object
zmq_sendmsg($outer_sock, $zmsg, $more ? ZMQ_SNDMORE : 0) != -1
or die "zmq_sendmsg failed: $!";
zmq_msg_close($zmsg); # declares a msg obj as no longer required
last if !$more;
}
if ($debug) {
print '.';
printf(" %d\n", $cnt) if $cnt % 100 == 0;
}
}
}
# not reached
}
sub usage() {
my $me = $0; local $1; $me =~ s{([^/]*)\z}{$1}s;
"Usage: $me [-f] [-d log_level] (msg-forwarder|childproc-minder|snmp-responder)";
}
# main program starts here
my $normal_termination = 0;
$SIG{'__DIE__' } =
sub { if (!$^S) { my($m) = @_; chomp($m); do_log(-2, "_DIE: %s", $m) } };
$SIG{'__WARN__'} =
sub { my($m) = @_; chomp($m); do_log(0, "_WARN: %s", $m) };
my $task_name;
$foreground = 0;
my(@argv) = @ARGV; # preserve @ARGV, may modify @argv
while (@argv >= 2 && $argv[0] =~ /^-[d]\z/ ||
@argv >= 1 && $argv[0] =~ /^-/) {
my($opt,$val);
$opt = shift @argv;
$val = shift @argv if $opt !~ /^-[hVf-]\z/; # these take no arguments
if ($opt eq '--') {
last;
} elsif ($opt eq '-h') { # -h (help)
printf STDERR ("%s\n\n%s\n", $myversion, usage());
} elsif ($opt eq '-V') { # -V (version)
printf STDERR ("%s\n", $myversion);
} elsif ($opt eq '-f') {
$foreground = 1;
} elsif ($opt eq '-d') { # -d log_level or -d SAdbg1,SAdbg2,..,SAdbg3
$val =~ /^\d+\z/ or die "Bad value for option -d: $val\n";
$log_level = untaint($val) if $val =~ /^\d+\z/;
} else {
printf STDERR ("Error in command line options: %s\n\n%s\n", $opt, usage());
exit 1;
}
}
if (@argv == 1 &&
$argv[0] =~ /^(?:msg-forwarder|childproc-minder|snmp-responder)\z/) {
$task_name = $argv[0];
} else {
printf STDERR ("Error parsing a command line %s\n\n%s\n",
join(' ',@ARGV), usage());
exit 1;
}
if (!$foreground) {
openlog($syslog_ident, LOG_PID | LOG_NDELAY, $syslog_facility);
$syslog_open = 1;
}
do_log(0, "%s task '%s' [%d] started. %s, perl %s",
$myversion, $task_name, $$, zmq_version(), $]);
eval { # catch TERM and INT signals for a controlled shutdown
my $h = sub { $interrupted = $_[0]; die "\n" };
local $SIG{INT} = $h;
local $SIG{TERM} = $h;
if ($task_name eq 'msg-forwarder') {
msg_forwarder();
} elsif ($task_name eq 'childproc-minder') {
childproc_minder();
} elsif ($task_name eq 'snmp-responder') {
snmp_responder();
} else {
die "Unrecognized task name: $task_name";
}
}; # until interrupted
do_log(0, "Task '%s' [%d] shutting down", $task_name, $$);
if ($inner_sock) {
do_log(0, "%s closing inner socket", $task_name);
zmq_setsockopt($inner_sock, ZMQ_LINGER, 0); # ignoring status
zmq_close($inner_sock); # ignoring status
}
if ($outer_sock) {
do_log(0, "%s closing outer socket", $task_name);
zmq_setsockopt($outer_sock, ZMQ_LINGER, 0); # ignoring status
zmq_close($outer_sock); # ignoring status
}
if ($snmp_sock) {
do_log(0, "%s closing SNMP socket", $task_name);
zmq_setsockopt($snmp_sock, ZMQ_LINGER, 0); # ignoring status
zmq_close($snmp_sock); # ignoring status
}
if ($zmq_ctx) {
do_log(0, "%s closing context", $task_name);
zmq_term($zmq_ctx); # ignoring status
}
END {
do_log(0, "Task '%s' [%d] exiting: %s",
$task_name, $$, $interrupted) if !$normal_termination;
if ($syslog_open) { closelog(); $syslog_open = 0 }
}