Server IP : 85.214.239.14 / Your IP : 3.145.202.60 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /usr/share/perl5/Amavis/IO/ |
Upload File : |
package Amavis::IO::SQL;

# an IO wrapper around SQL for inserting/retrieving mail text
# to/from a database
#
# The object mimics a small subset of the IO::Handle interface (open, close,
# seek, read, getline, print, printf, flush) on top of a connection object
# which must provide the methods driver_name, execute, fetchrow_arrayref and
# finish (presumably an Amavis SQL connection wrapper - verify at the caller).
# Mail text is stored as a sequence of chunks, each at most 'maxbuf' long.

use strict;
use re 'taint';
use warnings;
use warnings FATAL => qw(utf8 void);
no warnings 'uninitialized';
# use warnings 'extra'; no warnings 'experimental::re_strict'; use re 'strict';

BEGIN {
  require Exporter;
  use vars qw(@ISA @EXPORT @EXPORT_OK %EXPORT_TAGS $VERSION);
  $VERSION = '2.412';
  @ISA = qw(Exporter);
}

use Errno qw(ENOENT EACCES EIO);
use DBI qw(:sql_types);
# use DBD::Pg;

use Amavis::Util qw(ll do_log untaint min max minmax);

# Constructor. With no arguments it returns an unopened object; with
# arguments they are passed on to open(), and an empty list / undef is
# returned if open() reports failure (open() may also die).
#
sub new {
  my $class = shift;
  my $self = bless {}, $class;
  if (@_) { $self->open(@_) or return }
  $self;
}

# open(conn_h, clause, dbkey, mode, partition_tag, maxbuf, rx_time)
#
# (Re)initializes the object. Mode 'w' prepares for writing (nothing is sent
# to the database yet); any other mode executes the select 'clause' with
# (partition_tag, dbkey) and pre-fetches the first chunk.
# Returns the object on success. When reading and no record is found, sets
# $! to ENOENT and returns an empty list / undef. Dies on SQL errors and
# re-throws 'timed out' exceptions unchanged.
#
sub open {
  my $self = shift;
  if (exists $self->{conn_h}) {
    eval { $self->close } or 1;  # ignore failure, make perlcritic happy
  }
  @$self{qw(conn_h clause dbkey mode partition_tag maxbuf rx_time)} = @_;
  my $conn_h = $self->{conn_h};
  $self->{buf} = '';
  $self->{chunk_ind} = $self->{pos} = $self->{bufpos} = $self->{eof} = 0;
  my $driver;  my $eval_stat;
  eval { $driver = $conn_h->driver_name;  1 }
    or do { $eval_stat = $@ ne '' ? $@ : "errno=$!";  chomp $eval_stat };
  die $eval_stat  if $eval_stat =~ /^timed out\b/;  # resignal timeout
  if ($self->{mode} eq 'w') {  # open for write access
    ll(4) && do_log(4,"Amavis::IO::SQL::open %s drv=%s (%s); key=%s, p_tag=%s",
                      $self->{mode}, $driver, $self->{clause},
                      $self->{dbkey}, $self->{partition_tag});
  } else {  # open for read access
    $eval_stat = undef;
    eval {
      $conn_h->execute($self->{clause}, $self->{partition_tag},$self->{dbkey});
      1;
    } or do { $eval_stat = $@ ne '' ? $@ : "errno=$!";  chomp $eval_stat };
    my $ll = $eval_stat ne '' ? -1 : 4;  # failures are logged at level -1
    do_log($ll,"Amavis::IO::SQL::open %s drv=%s (%s); key=%s, p_tag=%s, s: %s",
               $self->{mode}, $driver, $self->{clause},
               $self->{dbkey}, $self->{partition_tag}, $eval_stat)  if ll($ll);
    if ($eval_stat ne '') {
      die $eval_stat  if $eval_stat =~ /^timed out\b/;  # resignal timeout
      die "Amavis::IO::SQL::open $driver SELECT error: $eval_stat";
    }
    eval {  # fetch the first chunk; if missing treat it as a file-not-found
      my $a_ref = $conn_h->fetchrow_arrayref($self->{clause});
      if (!defined($a_ref)) { $self->{eof} = 1 }
      else { $self->{buf} = $a_ref->[0]; $self->{chunk_ind}++ }
      1;
    } or do {
      $eval_stat = $@ ne '' ? $@ : "errno=$!";  chomp $eval_stat;
      die $eval_stat  if $eval_stat =~ /^timed out\b/;  # resignal timeout
      die "Amavis::IO::SQL::open $driver read error: $eval_stat";
    };
    if ($self->{eof}) {  # no records, make it look like a missing file
      do_log(0,"Amavis::IO::SQL::open key=%s, p_tag=%s: no such record",
               $self->{dbkey}, $self->{partition_tag});
      $! = ENOENT;  # No such file or directory
      return;
    }
  }
  $self;
}

# Destructor: closes a still-open object. A failure to close cannot be
# propagated from a destructor, so it is reported with warn() instead.
#
sub DESTROY {
  my $self = $_[0];
  local($@,$!,$_);  # do not clobber the caller's error state
  my $myactualpid = $$;
  if ($self && $self->{conn_h}) {
    eval {
      $self->close or die "Error closing: $!";  1;
    } or do {
      my $eval_stat = $@ ne '' ? $@ : "errno=$!";  chomp $eval_stat;
      warn "[$myactualpid] Amavis::IO::SQL::close error: $eval_stat";
    };
    delete $self->{conn_h};
  }
}

# Closes the stream: flushes buffered data when writing, or finishes the
# select statement when reading was abandoned before eof. All per-stream
# state is removed from the object even if that fails.
# Returns 1 on success, dies on failure ('timed out' is re-thrown unchanged).
#
sub close {
  my $self = $_[0];
  my $eval_stat;
  eval {
    if ($self->{mode} eq 'w') {
      $self->flush or die "Can't flush: $!";
    } elsif ($self->{conn_h} && $self->{clause} && !$self->{eof}) {
      # reading, closing before eof was reached
      $self->{conn_h}->finish($self->{clause}) or die "Can't finish: $!";
    };
    1;
  } or do {
    $eval_stat = $@ ne '' ? $@ : "errno=$!";
  };
  # wipe everything that open() has set, so no stale state is left behind
  delete @$self{
    qw(conn_h clause dbkey mode partition_tag maxbuf rx_time
       buf chunk_ind pos bufpos eof) };
  if (defined $eval_stat) {
    chomp $eval_stat;
    die $eval_stat  if $eval_stat =~ /^timed out\b/;  # resignal timeout
    die "Error closing, $eval_stat";
  }
  1;
}

# seek(pos, whence): positions a stream opened for reading at an absolute
# offset 'pos' (whence must be 0). Seeking backwards either rewinds within
# the retained first chunk or re-runs the query; seeking forwards reads and
# discards data. Returns 1 on success, dies otherwise.
#
sub seek {
  my($self,$pos,$whence) = @_;
  $whence == 0  or die "Only absolute seek is supported on sql i/o";
  $pos >= 0     or die "Can't seek to a negative absolute position on sql i/o";
  ll(5) && do_log(5, "Amavis::IO::SQL::seek mode=%s, pos=%s",
                     $self->{mode}, $pos);
  $self->{mode} ne 'w'
    or die "Seek to $whence,$pos on sql i/o only supported for read mode";
  if ($pos < $self->{pos}) {
    if (!$self->{eof} && $self->{chunk_ind} <= 1) {
      # still in the first chunk, just reset pos
      $self->{pos} = $self->{bufpos} = 0;  # reset
    } else {  # beyond the first chunk, restart the query from the beginning
      # take a copy of the open() arguments, close() wipes them
      my($con,$clause,$key,$mode,$partition_tag,$maxb,$rx_time) =
        @$self{qw(conn_h clause dbkey mode partition_tag maxbuf rx_time)};
      $self->close or die "seek: error closing, $!";
      $self->open($con,$clause,$key,$mode,$partition_tag,$maxb,$rx_time)
        or die "seek: reopen failed: $!";
    }
  }
  my $skip = $pos - $self->{pos};
  if ($skip > 0) {
    my $s;
    my $nbytes = $self->read($s,$skip);  # acceptable for small skips
    defined $nbytes or die "seek: error skipping $skip bytes on sql i/o: $!";
  }
  1;  # seek is supposed to return 1 upon success, 0 otherwise
}

# read(SCALAR,LENGTH,OFFSET): reads up to LENGTH characters into SCALAR,
# fetching further chunks as needed. Without OFFSET (or with OFFSET 0) the
# scalar is overwritten; otherwise the data is placed at OFFSET, with the
# scalar first padded by "\0" if it is shorter than OFFSET (as the builtin
# read does). Returns the number of characters read, 0 at eof; dies on SQL
# errors ('timed out' is re-thrown unchanged).
#
sub read {  # SCALAR,LENGTH,OFFSET
  my $self = shift;  my $req_len = $_[1];  my $offset = $_[2];
  my $conn_h = $self->{conn_h};
  ll(5) && do_log(5, "Amavis::IO::SQL::read, %d, %d",
                     $self->{chunk_ind}, $self->{bufpos});
  eval {
    while (!$self->{eof} &&
           length($self->{buf})-$self->{bufpos} < $req_len) {
      my $a_ref = $conn_h->fetchrow_arrayref($self->{clause});
      if (!defined($a_ref)) { $self->{eof} = 1 }
      else { $self->{buf} .= $a_ref->[0]; $self->{chunk_ind}++ }
    }
    1;
  } or do {
    my $eval_stat = $@ ne '' ? $@ : "errno=$!";  chomp $eval_stat;
    # we can't stash an arbitrary error message string into $!,
    # which forces us to use 'die' to properly report an error
    die $eval_stat  if $eval_stat =~ /^timed out\b/;  # resignal timeout
    die "read: sql select failed, $eval_stat";
  };
  my $chunk = substr($self->{buf}, $self->{bufpos}, $req_len);
  my $nbytes = length($chunk);
  if (!defined($offset) || $offset == 0) {
    $_[0] = $chunk;
  } else {
    # an lvalue substr beyond the end of a string is fatal, so extend the
    # target up to OFFSET first
    $_[0] = ''  if !defined $_[0];
    my $have = length($_[0]);
    $_[0] .= "\0" x ($offset - $have)  if $offset > $have;
    substr($_[0],$offset) = $chunk;
  }
  $self->{bufpos} += $nbytes;  $self->{pos} += $nbytes;
  if ($self->{bufpos} > 0 && $self->{chunk_ind} > 1) {
    # discard used-up part of the buf unless at ch.1, which may still be useful
    ll(5) && do_log(5,"read: moving on by %d chars", $self->{bufpos});
    $self->{buf} = substr($self->{buf},$self->{bufpos});  $self->{bufpos} = 0;
  }
  $nbytes;  # eof: 0
}

# Returns the next line including its terminating NL, fetching further
# chunks as needed. A missing NL on the very last line is implied. Returns
# undef with $! set to zero at eof; dies on SQL errors ('timed out' is
# re-thrown unchanged).
#
sub getline {
  my $self = $_[0];  my $conn_h = $self->{conn_h};
  ll(5) && do_log(5, "Amavis::IO::SQL::getline, chunk %d, pos %d",
                     $self->{chunk_ind}, $self->{bufpos});
  my $line;
  # look at the unread part of the buffer first, even if eof is already set
  my $ind = index($self->{buf}, "\n", $self->{bufpos});
  eval {
    while ($ind < 0 && !$self->{eof}) {
      my $a_ref = $conn_h->fetchrow_arrayref($self->{clause});
      if (!defined($a_ref)) { $self->{eof} = 1 }
      else {
        $self->{buf} .= $a_ref->[0];  $self->{chunk_ind}++;
        $ind = index($self->{buf}, "\n", $self->{bufpos});
      }
    }
    1;
  } or do {
    my $eval_stat = $@ ne '' ? $@ : "errno=$!";  chomp $eval_stat;
    die $eval_stat  if $eval_stat =~ /^timed out\b/;  # resignal timeout
    die "getline: reading sql select results failed, $eval_stat";
  };
  if ($ind < 0 && length($self->{buf}) > $self->{bufpos}) {
    # eof reached with an unterminated last line: imply a NL before eof;
    # nothing is appended when no unread data is left, so eof is reported
    $self->{buf} .= "\n";  $ind = length($self->{buf}) - 1;
  }
  if ($ind >= 0) {  # a complete line is available
    $line = substr($self->{buf}, $self->{bufpos}, $ind+1-$self->{bufpos});
    my $nbytes = length($line);
    $self->{bufpos} += $nbytes;  $self->{pos} += $nbytes;
    if ($self->{bufpos} > 0 && $self->{chunk_ind} > 1) {
      # discard used part of the buf unless at ch.1, which may still be useful
      ll(5) && do_log(5,"getline: moving on by %d chars", $self->{bufpos});
      $self->{buf} = substr($self->{buf},$self->{bufpos});
      $self->{bufpos} = 0;
    }
  }
  # eof: undef, $! zero
  $! = 0;  $line eq '' ? undef : $line;
}

# Private helper: inserts the leading (at most 'maxbuf' long) part of the
# write buffer as chunk number $ind. Does not modify the object; exceptions
# raised by the connection object propagate to the caller.
#
sub _insert_chunk {
  my($self,$ind) = @_;
  my $conn_h = $self->{conn_h};
  my $driver = $conn_h->driver_name;
  $conn_h->execute($self->{clause},
                   $self->{partition_tag}, $self->{dbkey}, $ind,
                 # int($self->{rx_time}),
                   [ untaint(substr($self->{buf},0,$self->{maxbuf})),
                     $driver eq 'Pg' ? { pg_type => DBD::Pg::PG_BYTEA() }
                                     : SQL_BLOB ] );
}

# Writes out whatever is left in the write buffer, in chunks of at most
# 'maxbuf' characters. A no-op returning an empty list / undef unless the
# stream is open for writing. Returns 1 on success; dies on SQL errors
# ('timed out' is re-thrown unchanged).
#
sub flush {
  my $self = $_[0];
  return if $self->{mode} ne 'w';
  my $msg;
  while ($self->{buf} ne '') {
    my $ind = $self->{chunk_ind} + 1;
    ll(4) && do_log(4, "sql flush: key: (%s, %d), p_tag=%s, rx_t=%d, size=%d",
                       $self->{dbkey}, $ind,
                       $self->{partition_tag}, $self->{rx_time},
                       min(length($self->{buf}),$self->{maxbuf}));
    eval {
      $self->_insert_chunk($ind);  1;
    } or do {
      my $eval_stat = $@ ne '' ? $@ : "errno=$!";  chomp $eval_stat;
      $msg = $eval_stat;
    };
    last  if defined $msg;
    substr($self->{buf},0,$self->{maxbuf}) = '';  # chunk is stored, drop it
    $self->{chunk_ind} = $ind;
  }
  if (defined $msg) {
    chomp $msg;
    # we can't stash an arbitrary error message string into $!,
    # which forces us to use 'die' to properly report an error
    die $msg  if $msg =~ /^timed out\b/;  # resignal timeout
    die "flush: sql inserting text failed, $msg";
  }
  1;
}

# Appends the given strings to the write buffer and stores every complete
# chunk of 'maxbuf' characters; an incomplete remainder stays buffered until
# more data arrives or flush() is called. Returns the number of characters
# accepted ("0 but true" for empty input). Dies if the stream is not open
# for writing or on SQL errors ('timed out' is re-thrown unchanged).
#
sub print {
  my $self = shift;
  $self->{mode} eq 'w' or die "Can't print, not opened for writing";
  # avoid copying the data when a single argument is given
  my $buff_ref = @_ == 1 ? \$_[0] : \join('',@_);
  my $len = length($$buff_ref);
  my $nbytes;
  if ($len <= 0) { $nbytes = "0 but true" }
  else {
    $self->{buf} .= $$buff_ref;  $self->{pos} += $len;  $nbytes = $len;
    while (length($self->{buf}) >= $self->{maxbuf}) {
      my $ind = $self->{chunk_ind} + 1;
      ll(4) && do_log(4, "sql print: key: (%s, %d), p_tag=%s, size=%d",
                         $self->{dbkey}, $ind,
                         $self->{partition_tag}, $self->{maxbuf});
      eval {
        $self->_insert_chunk($ind);  1;
      } or do {
        my $eval_stat = $@ ne '' ? $@ : "errno=$!";  chomp $eval_stat;
        # we can't stash an arbitrary error message string into $!,
        # which forces us to use 'die' to properly report an error
        die $eval_stat  if $eval_stat =~ /^timed out\b/;  # resignal timeout
        die "print: sql inserting mail text failed, $eval_stat";
      };
      substr($self->{buf},0,$self->{maxbuf}) = '';  # chunk is stored, drop it
      $self->{chunk_ind} = $ind;
    }
  }
  $nbytes;
}

# printf(FORMAT, LIST): formats with sprintf and hands the result to print().
#
sub printf { shift->print(sprintf(shift,@_)) }

1;