Server IP : 85.214.239.14 / Your IP : 3.137.219.213 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /usr/share/perl5/Amavis/ |
Upload File : |
# SPDX-License-Identifier: GPL-2.0-or-later

# Amavis::ZMQ - publishes process state, log lines and SNMP-style counters
# as 'am.st', 'am.log.*' and 'am.snmp' messages over a ZMQ_PUB socket.
# Works with whichever of ZMQ::LibZMQ3, ZMQ::LibZMQ2 or ZeroMQ is installed.

package Amavis::ZMQ;
use strict;
use re 'taint';
use warnings;
use warnings FATAL => qw(utf8 void);
no warnings 'uninitialized';
# use warnings 'extra'; no warnings 'experimental::re_strict'; use re 'strict';

BEGIN {
  require Exporter;
  use vars qw(@ISA @EXPORT @EXPORT_OK %EXPORT_TAGS $VERSION);
  $VERSION = '2.412';
  @ISA = qw(Exporter);
}

# Loaded explicitly so that Time::HiRes::time() below does not depend on
# some other module having pulled Time::HiRes in first.
use Time::HiRes ();

use Amavis::Conf qw(:platform $myversion $nanny_details_level);
use Amavis::Util qw(ll do_log do_log_safe
                    snmp_initial_oids snmp_counters_get);

use vars qw($zmq_mod_name $zmq_mod_version $zmq_lib_version);

# Pick the first available ZMQ binding and paper over the v2/v3 API
# differences, so that the rest of this module can uniformly call:
#   zmq_sendmsg(socket,msgobj,flags)
#   zmq_recvmsg(socket,flags) -> msgobj
#   zmq_sendstr(socket,string,flags) -> undef on failure
BEGIN {
  my($zmq_major, $zmq_minor, $zmq_patch);
  if (eval { require ZMQ::LibZMQ3 && require ZMQ::Constants }) {
    $zmq_mod_name = 'ZMQ::LibZMQ3';  # new interface module to zmq v3 or libxs
    ZMQ::LibZMQ3->import;
    ZMQ::Constants->import(qw(:all));
    ($zmq_major, $zmq_minor, $zmq_patch) = ZMQ::LibZMQ3::zmq_version();
  # *zmq_sendmsg   [native]                   # (socket,msgobj,flags)
  # *zmq_recvmsg   [native]                   # (socket,flags) -> msgobj
    *zmq_sendstr = sub {                      # (socket,string,flags)
      my $rv = zmq_send($_[0], $_[1], length $_[1], $_[2]||0);
      $rv == -1 ? undef : $rv;
    };
  } elsif (eval { require ZMQ::LibZMQ2 && require ZMQ::Constants }) {
    $zmq_mod_name = 'ZMQ::LibZMQ2';  # new interface module to zmq v2
    ZMQ::LibZMQ2->import;
    ZMQ::Constants->import(qw(:all));
    ($zmq_major, $zmq_minor, $zmq_patch) = ZMQ::LibZMQ2::zmq_version();
    # zmq v2/v3 incompatible renaming
    *zmq_sendmsg = \&ZMQ::LibZMQ2::zmq_send;  # (socket,msgobj,flags)
    *zmq_recvmsg = \&ZMQ::LibZMQ2::zmq_recv;  # (socket,flags) -> msgobj
    *zmq_sendstr = sub {                      # (socket,string,flags)
      my $rv = zmq_send(@_);
      $rv == -1 ? undef : $rv;
    };
  } elsif (eval { require ZeroMQ::Constants && require ZeroMQ::Raw }) {
    $zmq_mod_name = 'ZeroMQ';  # old interface module to zmq v2
    ZeroMQ::Raw->import;
    ZeroMQ::Constants->import(qw(:all));
    ($zmq_major, $zmq_minor, $zmq_patch) = ZeroMQ::version();
    # zmq v2/v3 incompatible renaming
    *zmq_sendmsg = \&ZeroMQ::Raw::zmq_send;   # (socket,msgobj,flags)
    *zmq_recvmsg = \&ZeroMQ::Raw::zmq_recv;   # (socket,flags) -> msgobj
    *zmq_sendstr = sub {                      # (socket,string,flags)
      my $rv = zmq_send(@_);
      $rv == -1 ? undef : $rv;
    };
  } else {
    die "Perl modules ZMQ::LibZMQ3 or ZMQ::LibZMQ2 or ZeroMQ not available\n";
  }
  $zmq_mod_version = $zmq_mod_name->VERSION;
  $zmq_lib_version = join('.', $zmq_major, $zmq_minor, $zmq_patch);
  1;
} # BEGIN

# Returns a string naming the binding module in use, its version, and the
# version of the underlying library.
#
sub zmq_version {
  sprintf("%s %s, lib %s",
          $zmq_mod_name, $zmq_mod_version, $zmq_lib_version);
}

# Constructor: remembers the given socket specifications and immediately
# tries to establish the context and socket; dies if establish() dies.
#
sub new {
  my($class,@socknames) = @_;
  my $self = { ctx => undef, sock => undef,
               inactivated => 0, socknames => [ @socknames ],
               base_timestamp => undef };
  bless $self, $class;
  $self->establish;
  $self;
}

# Marks the object as inactivated: establish() becomes a no-op and DESTROY
# will not close the socket or terminate the context.
#
sub inactivate {
  my $self = $_[0];
  $self->{inactivated} = 1;
}

use vars qw($zmq_in_establish);  # prevents loop if logging to zmq

# Creates a ZMQ context and a PUB socket if not yet present, sets socket
# options, and connects the socket to every configured socket name.
# Returns the socket, or nothing when the object is inactivated or when
# called re-entrantly. On any failure the socket and context are released,
# the object is inactivated, and the exception is re-thrown.
#
sub establish {
  my $self = $_[0];
  return if $self->{inactivated} || $zmq_in_establish;
  my($ctx,$sock);
  eval {
    $zmq_in_establish = 1;
    $ctx = $self->{ctx};
    if (!$ctx) {
      $self->{sock} = undef;  # just in case
    # do_log(5,'zmq: zmq_init');
      $self->{ctx} = $ctx = zmq_init(1);
      $ctx or die "Error creating ZMQ context: $!";
    }
    $sock = $self->{sock};
    if (!$sock && $ctx) {  # connect to a socket
    # do_log(5,'zmq: zmq_socket');
      $self->{sock} = $sock = zmq_socket($ctx, ZMQ_PUB);
      if (!$sock) {
        die "Error creating ZMQ socket: $!";
      } else {
      # do_log(5,'zmq: zmq_setsockopt');
        zmq_setsockopt($sock, ZMQ_LINGER, 2000) != -1  # milliseconds
          or die "Error setting ZMQ_LINGER on a ZMQ socket: $!";
        # the high-water-mark option is named differently across versions
        my $hwm = defined &ZMQ_SNDHWM ? ZMQ_SNDHWM()
                : defined &ZMQ_HWM    ? ZMQ_HWM() : undef;
        if (defined $hwm) {
          zmq_setsockopt($sock, $hwm, 1000) != -1
            or die "Error setting highwater mark on a ZMQ socket: $!";
        }
        for my $sockspec (@{$self->{socknames}}) {
          my $sock_ipv4only = 1;  # a ZMQ default
          # two colons with only hex digits between them: looks like IPv6
          if (defined &ZMQ_IPV4ONLY && $sockspec =~ /:[0-9a-f]*:/i) {
            zmq_setsockopt($sock, ZMQ_IPV4ONLY(), 0) != -1
              or die "Error turning off ZMQ_IPV4ONLY on a ZMQ socket: $!";
            $sock_ipv4only = 0;
          }
        # do_log(5,'zmq: zmq_connect %s%s', $sockspec,
        #           $sock_ipv4only ? '' : ', IPv6 enabled');
          zmq_connect($sock, $sockspec) == 0
            or die "Error connecting ZMQ socket to $sockspec: $!";
        }
      }
    }
    1;
  } or do {  # clean up, disable, and resignal a failure
    zmq_close($sock)  if $sock;  # ignoring status
    zmq_term($ctx)    if $ctx;   # ignoring status
    undef $self->{sock};
    undef $self->{ctx};
    $self->{inactivated} = 1;
    $zmq_in_establish = 0;
    chomp $@;
    die "zmq establish failed: $@\n";  # propagate the exception
  };
  $zmq_in_establish = 0;
  $sock;
}

# Destructor: unless inactivated, drops the linger time to zero, closes the
# socket and terminates the context; then empties the object.
#
sub DESTROY {
  my $self = $_[0];
  local($@,$!,$_);
  # can occur soon after fork, must not use context (like calling a logger)
  if (!$self->{inactivated}) {
    my $sock = $self->{sock};
    if ($sock) {
      zmq_setsockopt($sock, ZMQ_LINGER, 0);  # ignoring status
      zmq_close($sock);  # ignoring status
    }
    my $ctx = $self->{ctx};
    zmq_term($ctx)  if $ctx;  # ignoring status
  }
  undef $self->{sock};
  undef $self->{ctx};
  %{$self} = ();  # then ditch the rest
}

# Publishes an 'am.st' process state message:
#   am.st <pid> <time> <state> [<task_id>]
# When $reset_timestamp is true, or no base timestamp has been stored yet,
# <time> is the current time (%014.3f) and becomes the new base timestamp;
# otherwise <time> is the number of milliseconds since the base timestamp.
# An undefined $state is reported as 'exiting'; an empty or single-space
# state is reported as '-'. Does nothing if there is no socket; dies if
# sending fails. $details_level is accepted but currently not consulted.
#
sub register_proc {
  my($self, $details_level, $reset_timestamp, $state, $task_id) = @_;
  my $sock = $self->{sock};  # = $self->establish;
  return if !$sock;
# if (!defined $state || $details_level <= $nanny_details_level) {
  my $pid = $$;
  my $msg;
  my $now = Time::HiRes::time();
  if ($reset_timestamp || !$self->{base_timestamp}) {
    $self->{base_timestamp} = $now;
    $msg = sprintf('am.st %d %014.3f ', $pid, $now);
  } else {
    my $dt = $now - $self->{base_timestamp};
    $msg = sprintf('am.st %d %d ', $pid, $dt <= 0 ? 0 : int($dt*1000 + 0.5));
  }
  if (!defined $state) {
    $msg .= 'exiting';
  } else {
    $state = '-'  if $state eq ' ' || $state eq '';  # simplifies parsing
    $msg .= $state;
    $msg .= ' ' . $task_id  if defined $task_id;
  }
# do_log(5,'zmq: register_proc: %s', $msg);
  defined zmq_sendstr($sock, $msg)
    or die "Error sending a ZMQ message: $!";
# }
}

# Publishes a log line:  am.log.<stars> <pid> <time> <message>
# The number of stars is 6 minus the log level, clamped to the range 1..7.
# Arguments: ($self, $level, $errmsg). Does nothing if there is no socket.
#
sub write_log {
# my($self, $level, $errmsg) = @_;
  my $self = $_[0];
  my $sock = $self->{sock};  # = $self->establish;
  return if !$sock;
  my $level = $_[1];
  my $nstars = 6 - $level;
  $nstars = 7  if $nstars > 7;
  $nstars = 1  if $nstars < 1;
  # ignoring status to prevent a logging loop
  zmq_sendstr($sock, sprintf('am.log.%s %s %014.3f %s',
                             '*' x $nstars, $$, Time::HiRes::time(), $_[2]));
}

# insert startup time SNMP entry, called from the master process at startup
#
# If $flush is true an 'am.snmp FLUSH' message is sent first. Each entry
# returned by snmp_initial_oids() is then sent as 'am.snmp <key> <type> <val>',
# all but the last one flagged with ZMQ_SNDMORE. Does nothing if there is
# no socket; dies if sending fails.
#
sub put_initial_snmp_data {
  my($self,$flush) = @_;
  my $sock = $self->{sock};  # = $self->establish;
  return if !$sock;
# do_log(5,'zmq: publishing initial snmp data');
  if ($flush) {
  # do_log(5,'zmq: sending am.snmp FLUSH');
    defined zmq_sendstr($sock, 'am.snmp FLUSH')
      or die "Error sending a ZMQ flush message: $!";
  }
  my $list_ref = snmp_initial_oids();
  my $list_ind_last = $#{$list_ref};
  for my $obj_ind (0 .. $list_ind_last) {
    my($key,$type,$val) = @{$list_ref->[$obj_ind]};
    my $more = $obj_ind < $list_ind_last;
    my $msg = sprintf('am.snmp %s %s %s', $key, $type, $val);
  # do_log(5,'zmq: sending %s %s', $more?'M':' ', $msg);
    defined zmq_sendstr($sock, $msg, $more ? ZMQ_SNDMORE : 0)
      or die "Error sending a ZMQ message: $!";
  }
}

# Publishes the entries returned by snmp_counters_get() as a sequence of
# 'am.snmp <name> <type> <val>' messages. An entry is either a plain name
# or an array ref [name, value, type]. A missing or empty type is treated
# as 'C32'; counters default to an increment of 1; negative counter and
# 'TIM' values are skipped, and so is the 'entropy' variable.
# Does nothing if there is no socket; dies if sending fails.
#
sub update_snmp_variables {
  my $self = $_[0];
  my $sock = $self->{sock};  # = $self->establish;
  return if !$sock;
  my $msg;
  my $snmp_var_names_ref = snmp_counters_get();
  if (defined $snmp_var_names_ref && @$snmp_var_names_ref) {
    do_log(4,'zmq: updating snmp variables');
    for my $key (@$snmp_var_names_ref) {
      my($snmp_var_name, $val, $type) = ref $key ? @$key : ($key);
      if ($snmp_var_name eq 'entropy') {
        next;  # don't broadcast entropy
      } elsif (!defined $type || $type eq '') {  # a counter, same as C32
        $type = 'C32';
        $val = 1  if !defined $val;  # by default a counter increments by 1
        next  if $val < 0;  # a counter is supposed to be unsigned
      } elsif ($type eq 'C32' || $type eq 'C64') {  # a counter
        $val = 1  if !defined $val;  # by default a counter increments by 1
        next  if $val < 0;  # a counter is supposed to be unsigned
      } elsif ($type eq 'INT') {  # an integer
        # no limitations here, sprintf will convert it to a string
      } elsif ($type eq 'TIM') {  # TimeTicks
        next  if $val < 0;  # non-decrementing
      }
      # Sending lags one entry behind, so that only the very last message
      # goes out without the ZMQ_SNDMORE flag.
      if (defined $msg) {  # send assembled message from previous iteration
      # do_log(5,'zmq: sending M %s', $msg);
        defined zmq_sendstr($sock, $msg, ZMQ_SNDMORE)
          or die "Error sending a ZMQ message: $!";
      }
      $msg = sprintf('am.snmp %s %s %s', $snmp_var_name, $type, $val);
    }
    if (defined $msg) {  # last chunk of a multi-part message
    # do_log(5,'zmq: sending %s', $msg);
      defined zmq_sendstr($sock, $msg, 0)
        or die "Error sending a ZMQ message: $!";
    }
  }
}

1;