Server IP : 85.214.239.14 / Your IP : 18.224.60.132 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /proc/2/task/2/root/usr/share/perl5/Amavis/ |
Upload File : |
# SPDX-License-Identifier: GPL-2.0-or-later

# Amavis::TinyRedis - a minimal blocking Redis client speaking the Redis
# unified request protocol over a Unix-domain or TCP stream socket.
# Supports single calls (call) and pipelined batches (b_call / b_results).

package Amavis::TinyRedis;
use strict;
use re 'taint';
use warnings;
# use warnings 'extra'; no warnings 'experimental::re_strict'; use re 'strict';

use Errno qw(EINTR EAGAIN EPIPE ENOTCONN ECONNRESET ECONNABORTED);
use IO::Socket::UNIX;
use Time::HiRes ();

use vars qw($VERSION);
BEGIN {
  $VERSION = '1.000';
}

use Amavis::Conf qw(:platform);  # $io_socket_module_name

# Constructor. Recognized arguments (all optional):
#   server     => '/path/to/socket', 'host:port' or '[ipv6]:port'
#   sock       => alternative name for 'server', used if 'server' is false
#   on_connect => code ref, invoked as $cb->($self) after each (re)connect
# Defaults to '127.0.0.1:6379'. Connects immediately; returns the object,
# or nothing (undef / empty list) if the connection could not be made.
# Dies if the server specification is syntactically invalid (see connect).
#
sub new {
  my($class, %args) = @_;
  my $self = bless { args => {%args} }, $class;
  my $outbuf = '';
  $self->{outbuf} = \$outbuf;   # accumulates commands queued by b_call
  $self->{batch_size} = 0;      # number of commands currently in outbuf
  $self->{server} = $args{server} || $args{sock} || '127.0.0.1:6379';
  $self->{on_connect} = $args{on_connect};
  return if !$self->connect;
  $self;
}

# Drop the socket on destruction, protecting the caller's $@, $! and $_
# from being clobbered by an implicit close.
#
sub DESTROY {
  my $self = $_[0];
  local($@, $!, $_);
  undef $self->{sock};
}

# Drop the current socket (if any); $@ and $! are preserved for the caller,
# which lets error paths inspect $! after disconnecting.
#
sub disconnect {
  my $self = $_[0];
  local($@, $!);
  undef $self->{sock};
}

# (Re)establish a connection to $self->{server}. A specification starting
# with '/' is taken as a Unix socket path, otherwise it must be 'host:port'
# or '[host]:port' (dies on anything else). On success remembers the socket,
# its file descriptor and a select() bit mask, runs the on_connect callback,
# and returns the socket; returns a false value if connecting failed.
#
sub connect {
  my $self = $_[0];
  $self->disconnect;
  my $sock;
  my $server = $self->{server};
  if ($server =~ m{^/}) {
    $sock = IO::Socket::UNIX->new(
              Peer => $server, Type => SOCK_STREAM);
  } elsif ($server =~ /^(?: \[ ([^\]]+) \] | ([^:]+) ) : ([^:]+) \z/xs) {
    $server = defined $1 ? $1 : $2;
    my $port = $3;
    # $io_socket_module_name comes from Amavis::Conf; presumably it names
    # an IO::Socket::* class suitable for TCP - verify against Amavis::Conf
    $sock = $io_socket_module_name->new(
              PeerAddr => $server, PeerPort => $port, Proto => 'tcp');
  } else {
    die "Invalid 'server:port' specification: $server";
  }
  if ($sock) {
    $self->{sock} = $sock;
    $self->{sock_fd} = $sock->fileno;
    $self->{fd_mask} = '';
    vec($self->{fd_mask}, $self->{sock_fd}, 1) = 1;
    # an on_connect() callback must not use batched calls!
    $self->{on_connect}->($self)  if $self->{on_connect};
  }
  $sock;
}

# Receive, parse and return $cnt consecutive redis replies as a list.
# Returns an array ref with one element per reply: a string for status and
# bulk replies, a number for integer replies, undef for null replies, and
# a nested array ref for multi-bulk replies. Dies on an error reply, on an
# unknown reply type, and on read failure / EOF (after disconnecting).
# Callers are expected to have set $/ to CRLF (call and b_results do so).
#
# NOTE(review): an error reply dies immediately, leaving any remaining
# replies of the same batch unread on the socket - confirm that callers
# do not reuse the connection for further replies after such a failure.
#
sub _response {
  my($self, $cnt) = @_;
  my $sock = $self->{sock};
  if (!$sock) {
    $self->connect  or die "Connect failed: $!";
    $sock = $self->{sock};
  };
  my @list;
  for (1 .. $cnt) {
    my $result = <$sock>;
    if (!defined $result) {
      $self->disconnect;
      die "Error reading from Redis server: $!";
    }
    chomp $result;
    # the first character identifies the reply type, the rest is payload
    my $resp_type = substr($result, 0, 1, '');
    if ($resp_type eq '$') {  # bulk reply
      if ($result < 0) {
        push(@list, undef);  # null bulk reply
      } else {
        my $data = ''; my $ofs = 0;
        my $len = $result + 2;  # payload plus its trailing CRLF
        while ($len > 0) {
          my $nbytes = read($sock, $data, $len, $ofs);
          if (!$nbytes) {
            $self->disconnect;
            defined $nbytes  or die "Error reading from Redis server: $!";
            die "Redis server closed connection";
          }
          $ofs += $nbytes; $len -= $nbytes;
        }
        chomp $data;
        push(@list, $data);
      }
    } elsif ($resp_type eq ':') {  # integer reply
      push(@list, 0+$result);
    } elsif ($resp_type eq '+') {  # status reply
      push(@list, $result);
    } elsif ($resp_type eq '*') {  # multi-bulk reply
      push(@list, $result < 0 ? undef : $self->_response(0+$result) );
    } elsif ($resp_type eq '-') {  # error reply
      die "$result\n";
    } else {
      die "Unknown Redis reply: $resp_type ($result)";
    }
  }
  \@list;
}

# Write the entire buffer referenced by $bufref to the server, connecting
# first if there is no socket. Returns 1 on success; dies on a failed
# select, a failed (re)connect, or an unrecoverable write error.
#
# Whenever the connection has to be re-established the transmission is
# restarted from the beginning of the buffer: the new connection has seen
# none of the request, so continuing from the old offset would hand the
# server a truncated, unparsable command. Note that commands which the old
# connection had already received in full may thereby be sent twice.
#
sub _write_buff {
  my($self, $bufref) = @_;
  if (!$self->{sock}) { $self->connect or die "Connect failed: $!" };
  my $ofs = 0;
  while ($ofs < length($$bufref)) {
    # to reliably detect a disconnect we need to check for an input event
    # using a select; checking status of syswrite is not sufficient
    my($rout, $wout, $inbuff); my $fd_mask = $self->{fd_mask};
    my $nfound = select($rout=$fd_mask, $wout=$fd_mask, undef, undef);
    if (!defined $nfound || $nfound < 0) {
      next  if $! == EINTR;  # interrupted by a signal, just try again
      die "Select failed: $!";
    }
    if (vec($rout, $self->{sock_fd}, 1) &&
        !sysread($self->{sock}, $inbuff, 1024)) {
      # eof, try reconnecting
      $self->connect  or die "Connect failed: $!";
      $ofs = 0;  # fresh connection, resend the request from its start
    }
    local $SIG{PIPE} = 'IGNORE';  # don't signal on a write to a widowed pipe
    my $nwrite = syswrite($self->{sock}, $$bufref,
                          length($$bufref)-$ofs, $ofs);
    if (defined $nwrite) {
      $ofs += $nwrite;
      next;
    }
    if ($! == EINTR || $! == EAGAIN) {  # no big deal, try again
      Time::HiRes::sleep(0.1);  # slow down, just in case
    } else {
      $self->disconnect;  # preserves $!, which is still examined below
      if ($! == ENOTCONN   || $! == EPIPE ||
          $! == ECONNRESET || $! == ECONNABORTED) {
        $self->connect  or die "Connect failed: $!";
        $ofs = 0;  # fresh connection, resend the request from its start
      } else {
        die "Error writing to redis socket: $!";
      }
    }
  }
  1;
}

# Send a redis command with arguments, returning a redis reply.
# The command name and its arguments are passed as a plain list; the result
# is the single parsed reply (see _response for its possible shapes).
#
sub call {
  my $self = shift;
  my $buff = '*' . scalar(@_) . "\015\012";
  $buff .= '$' . length($_) . "\015\012" . $_ . "\015\012"  for @_;
  $self->_write_buff(\$buff);
  local($/) = "\015\012";  # replies are CRLF-terminated
  my $arr_ref = $self->_response(1);
  $arr_ref && $arr_ref->[0];
}

# Append a redis command with arguments to a batch.
# Nothing is sent until b_results is called; returns the new batch size.
#
sub b_call {
  my $self = shift;
  my $bufref = $self->{outbuf};
  $$bufref .= '*' . scalar(@_) . "\015\012";
  $$bufref .= '$' . length($_) . "\015\012" . $_ . "\015\012"  for @_;
  ++ $self->{batch_size};
}

# Send a batch of commands, returning an arrayref of redis replies,
# each array element corresponding to one command in a batch.
# Returns nothing if the batch is empty. The batch buffer and counter are
# cleared once the batch has been written, before replies are collected.
#
sub b_results {
  my $self = $_[0];
  my $batch_size = $self->{batch_size};
  return if !$batch_size;
  my $bufref = $self->{outbuf};
  $self->_write_buff($bufref);
  $$bufref = ''; $self->{batch_size} = 0;
  local($/) = "\015\012";  # replies are CRLF-terminated
  $self->_response($batch_size);
}

1;