Server IP : 85.214.239.14 / Your IP : 3.145.92.96 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /proc/2/task/2/root/proc/3/root/proc/2/task/2/root/proc/2/root/usr/share/perl5/Amavis/ |
Upload File : |
# SPDX-License-Identifier: GPL-2.0-or-later

package Amavis::Redis;
use strict;
use re 'taint';
use warnings;
use warnings FATAL => qw(utf8 void);
no warnings 'uninitialized';
# use warnings 'extra'; no warnings 'experimental::re_strict'; use re 'strict';

BEGIN {
  require Exporter;
  use vars qw(@ISA @EXPORT @EXPORT_OK %EXPORT_TAGS $VERSION);
  $VERSION = '2.412';
  @ISA = qw(Exporter);
}

use Amavis::Conf qw(:platform :confvars c cr ca);
use Amavis::JSON;
use Amavis::Lookup::IP qw(lookup_ip_acl normalize_ip_addr);
use Amavis::rfc2821_2822_Tools;
use Amavis::Timing qw(section_time);
use Amavis::Util qw(ll do_log do_log_safe min max minmax untaint
                    safe_encode safe_encode_utf8 idn_to_ascii
                    format_time_interval unique_list snmp_count);
use Amavis::TinyRedis;

# Constructor. Arguments after the class name are a list of Redis DSNs
# (connect() treats each as a hash ref of options); they are stored by
# reference and tried in order by connect(). No connection is made here.
#
sub new {
  my($class, @redis_dsn) = @_;
  bless { redis_dsn => \@redis_dsn }, $class;
}

# Marks the object as disconnected and drops the reference to the
# underlying Amavis::TinyRedis handle.
#
sub disconnect {
  my $self = $_[0];
# do_log(5, "redis: disconnect");
  $self->{connected} = 0; undef $self->{redis};
}

# Callback handed to Amavis::TinyRedis->new by connect(): selects the
# configured database (authenticating first if the server demands it) and
# tries to label the connection with a client name.
# Dies if SELECT fails for a reason other than missing authentication,
# or if authentication is required but no password is configured.
#
sub on_connect {
  my($self, $r) = @_;
  my $db_id = $self->{db_id} || 0;
  do_log(5, "redis: on_connect, db_id %d", $db_id);
  eval {
    $r->call('SELECT', $db_id) eq 'OK' ? 1 : 0;
  } or do {
    if ($@ =~ /^NOAUTH\b/ || $@ =~ /^ERR operation not permitted/) {
      defined $self->{password}
        or die "Redis server requires authentication, no password provided";
      $r->call('AUTH', $self->{password});
      $r->call('SELECT', $db_id);
    } else {
      chomp $@; die "Command 'SELECT $db_id' failed: $@";
    }
  };
  eval {
    $r->call('CLIENT', 'SETNAME', 'amavis['.$$.']') eq 'OK' ? 1 : 0;
  } or do {  # no big deal, just log
    do_log(5, "redis: command 'CLIENT SETNAME' failed: %s", $@);
  };
  1;
}

# Creates a Redis handle, trying each configured DSN in turn. A DSN that
# fails is rotated to the end of the list so that the next call starts with
# a different server. On success the Lua programs are (re)loaded and the
# handle is returned; if no DSN works, dies with the last error.
#
sub connect {
  my $self = $_[0];
# do_log(5, "redis: connect");
  $self->disconnect  if $self->{connected};
  $self->{redis} = $self->{db_id} = $self->{ttl} = undef;
  my($r, $err, $dsn, %options);
  my $dsn_list_ref = $self->{redis_dsn};
  for my $j (1 .. @$dsn_list_ref) {
    # always take the head of the list: failed entries are rotated to the end
    $dsn = $dsn_list_ref->[0];
    %options = ref $dsn eq 'HASH' ? %$dsn : ();
    # expiration time (time-to-live) is 16 days by default
    $self->{ttl} = exists $options{ttl} ? $options{ttl} : $storage_redis_ttl;
    $self->{db_id} = $options{db_id};
    # the password belongs to the DSN being tried; assign unconditionally
    # so that a password of a previously tried server is never offered
    # to a different server which has none configured
    $self->{password} = $options{password};
    $options{password} = '(hidden)'  if defined $options{password};  # for logging purposes
    undef $err;
    eval {
      my %opt = %options;
      # these are our own settings, not options of Amavis::TinyRedis
      delete @opt{qw(ttl db_id password)};
      $opt{server} = idn_to_ascii($opt{server})  if defined $opt{server};
      $r = Amavis::TinyRedis->new(on_connect => sub { $self->on_connect(@_) },
                                  %opt);
      $r or die "Error: $!";
    } or do {
      undef $r; $err = $@; chomp $err;
    };
    $self->{redis} = $r;
    last if $r;  # success, done
    if ($j < @$dsn_list_ref) {  # not all tried yet
      do_log(0, "Can't connect to a redis server, %s: %s; trying next",
                join(' ',%options), $err);
      push(@$dsn_list_ref, shift @$dsn_list_ref);  # rotate left
    }
  }
  if (!$r) {
    $self->{redis} = $self->{db_id} = $self->{ttl} = undef;
    die sprintf("Can't connect to a redis server %s: %s\n",
                join(' ',%options), $err);
  }
  $self->{connected} = 1;
  ll(5) && do_log(5, "redis: connected to: %s, ttl %s s",
                  !defined $options{server} ? 'default server'
                                            : join(' ',%options),
                  $self->{ttl}||'x');
  section_time("redis-connect");
  $self->load_lua_programs;
  $r;
}

# Destructor: drops the Redis handle, shielding the caller's $@, $! and $_
# from anything that happens during cleanup.
#
sub DESTROY {
  my $self = $_[0]; local($@,$!,$_);
  do_log_safe(5,"Amavis::Redis DESTROY called");
  # ignore potential errors during DESTROY of a Redis object
  eval { $self->{connected} = 0; undef $self->{redis} };
}

# find a penpals record which proves that a local user (sender) really sent a
# mail to a given recipient some time ago. For each matching recipient the
# time in seconds since the last such mail was sent by our local user is
# stored through recip_penpals_age (and the related mail_id through
# recip_penpals_related); the sub itself returns true if the query went
# through cleanly, false otherwise.  If @$message_id_list is a
# nonempty list of Message-IDs as found in References header field, the query
# also finds previous outgoing messages with a matching Message-ID but
# possibly to recipients different from what the mail was originally sent to.
#
sub penpals_find {
  my($self, $msginfo, $message_id_list) = @_;

  my $sender = $msginfo->sender;
  $message_id_list = []  if !$message_id_list;
  # a null sender with no referenced Message-IDs gives nothing to look up
  return if !@$message_id_list && $sender eq '';

  # inbound or internal_to_internal, except self_to_self
  my(@per_recip_data) =
    grep(!$_->recip_done && $_->recip_is_local &&
         lc($sender) ne lc($_->recip_addr), @{$msginfo->per_recip_data});
  return if !@per_recip_data;

# do_log(5, "redis: penpals_find");
  snmp_count('PenPalsAttempts');

  my $sender_smtp = $msginfo->sender_smtp;
  local($1); $sender_smtp =~ s/^<(.*)>\z/$1/s;
  my(@recip_addresses) =
    map { my $addr = $_->recip_addr_smtp; $addr =~ s/^<(.*)>\z/$1/s; lc $addr }
        @per_recip_data;

  # NOTE: swap recipient and sender in a query here, as we are
  # now checking for a potential reply mail - whether the current
  # recipient has recently sent any mail to the sender of the
  # current mail:

  # no need for cryptographical strength, just checking for protocol errors
  my $nonce = $msginfo->mail_id;

  my $result;
  # no KEYS (leading 0); ARGV layout must match the LUA_QUERY_PENPALS script
  my @args = (
    0, sprintf("%.0f",$msginfo->rx_time), $nonce, lc $sender_smtp,
    scalar @recip_addresses, @recip_addresses,
    scalar @$message_id_list, @$message_id_list,
  );
  eval {
    $self->connect  if !$self->{connected};
    $result = $self->{redis}->call('EVALSHA', $self->{lua_query_penpals},
                                   @args);
    1;
  } or do {  # Lua function probably not cached, define again and re-try
    if ($@ !~ /^NOSCRIPT/) {
      $self->disconnect; undef $result; chomp $@;
      do_log(-1, 'penpals_find, Redis Lua error: %s', $@);
    } else {
      $self->load_lua_programs;
      $result = $self->{redis}->call('EVALSHA', $self->{lua_query_penpals},
                                     @args);
    }
  };

  my $ok = 1;
  if (!$result || !@$result) {
    $ok = 0; $self->disconnect;
    do_log(0, "redis: penpals_find - no results");
  } else {
    # the script appends the nonce as the last element of its reply
    my $r_nonce = pop(@$result);
    if (!defined($r_nonce) || $r_nonce ne $nonce) {
      # redis protocol falling out of step?
      $ok = 0; $self->disconnect;
      do_log(-1,"redis: penpals_find - nonce mismatch, expected %s, got %s",
                $nonce, defined $r_nonce ? $r_nonce : 'UNDEF');
    }
  }
  if ($ok && (@$result != @per_recip_data)) {
    $ok = 0; $self->disconnect;
    do_log(-1,"redis: penpals_find - number of results expected %d, got %d",
              scalar @per_recip_data, scalar @$result);
  }
  if ($ok) {
    # results come back in the same order as the addresses were passed in
    for my $recip (@per_recip_data) {
      my $result_entry = shift @$result;
      next if !$result_entry;
      my($sid, $rid, $send_time, $best_ref_mail_id, $report) = @$result_entry;
      if (!$send_time) {  # undef or empty (or zero)
        snmp_count('PenPalsMisses');
        ll(4) && do_log(4, "penpals: (redis) not found (%s,%s)%s%s",
                  $sid ? $sid : $recip->recip_addr_smtp,
                  $rid ? $rid : $msginfo->sender_smtp,
                  !$report ? '' : ', refs: '.$report,
                  !@$message_id_list ? '' : '; '.join(', ',@$message_id_list) );
      } else {  # found a previous related correspondence
        snmp_count('PenPalsHits');
        my $age = max(0, $msginfo->rx_time - $send_time);
        $recip->recip_penpals_age($age);
        $recip->recip_penpals_related($best_ref_mail_id);
        ll(3) && do_log(3, "penpals: (redis) found (%s,%s) age %s%s",
                  $sid ? $sid : $recip->recip_addr_smtp,
                  $rid ? $rid : $msginfo->sender_smtp,
                  format_time_interval($age),
                  !$report ? '' : ', refs: '.$report );
        # $age and $best_ref_mail_id are not logged explicitly,
        # as they can be seen in the first entry of a lua query report
        # (i.e. the last string)
      }
    }
  }
  $ok;
}

# Creates a record keyed by the message's mail_id, holding its reception
# time, with the configured time-to-live. Dies if mail_id is not yet known.
# Returns 1 if the record was added, a false value otherwise (e.g. when
# a record with this mail_id already exists, or on a Redis error).
#
sub save_info_preliminary {
  my($self, $msginfo) = @_;
  my $mail_id = $msginfo->mail_id;
  defined $mail_id  or die "save_info_preliminary: mail_id still undefined";
  $self->connect  if !$self->{connected};
  ll(5) && do_log(5, 'redis: save_info_preliminary: %s, %s, ttl %s s',
                  $mail_id, int $msginfo->rx_time, $self->{ttl}||'x');
  # use Lua to do HSETNX *and* EXPIRE atomically, otherwise we risk inserting
  # a key with no expiration time if redis server goes down inbetween
  my $added;
  my $r = $self->{redis};
  my(@args) = (1, $mail_id, int $msginfo->rx_time,
               $self->{ttl} ? int $self->{ttl} : 0);
  eval {
    $added = $r->call('EVALSHA', $self->{lua_save_info_preliminary}, @args);
    1;
  } or do {  # Lua function probably not cached, define again and re-try
    if ($@ !~ /^NOSCRIPT/) {
      $self->disconnect; chomp $@;
      do_log(-1, 'save_info_preliminary, Redis Lua error: %s', $@);
    } else {
      $self->load_lua_programs;
      $added = $r->call('EVALSHA', $self->{lua_save_info_preliminary}, @args);
    }
  };
  $self->disconnect  if !$database_sessions_persistent;
  $added;  # 1 if added successfully, false otherwise
}

# Looks up (and refreshes the last-seen time and expiration of) the per-IP
# statistics for every public IP address in the message's trace, derives
# a reputation score for each from its spam/ham/banned counts, and returns
# a pair (highest score, IP address it belongs to). Returns an empty list
# if the message carries no public IP addresses; the pair is (undef, undef)
# if no address has any counted samples yet.
#
sub query_and_update_ip_reputation {
  my($self, $msginfo) = @_;

  my $ip_trace_ref = $msginfo->ip_addr_trace_public;
  return if !$ip_trace_ref;
  my @ip_trace = unique_list($ip_trace_ref);
  return if !@ip_trace;

  # Irwin-Hall distribution - approximates normal distribution
  # n = 4, mean = n/2, variance = n/12, sigma = sqrt(n/12) =~ 0.577
  my $normal_random = (rand() + rand() + rand() + rand() - 2) / 0.577;

  my(@args) = (scalar @ip_trace, map("ip:$_",@ip_trace),
               sprintf("%.3f", $msginfo->rx_time),
               sprintf("%.6f", $normal_random) );
  my($r, $ip_stats);
  eval {
    $self->connect  if !$self->{connected};
    $r = $self->{redis};
    $ip_stats = $r->call('EVALSHA', $self->{lua_query_and_update_ip}, @args);
    1;
  } or do {  # Lua function probably not cached, define again and re-try
    if ($@ !~ /^NOSCRIPT/) {
      $self->disconnect; chomp $@;
      do_log(-1, "query_and_update_ip_reputation, Redis Lua error: %s", $@);
    } else {
      $self->load_lua_programs;
      $ip_stats = $r->call('EVALSHA', $self->{lua_query_and_update_ip}, @args);
    }
  };

  my($highest_score, $worst_ip);
  for my $entry (!$ip_stats ? () : @$ip_stats) {
    # counts: all, spam, ham, banned+virus; then creation/last-seen times, ttl
    my($ip, $n_all, $n_spam, $n_ham, $n_bad, $tfirst, $tlast, $ttl) = @$entry;
    $ip =~ s/^ip://s;  # strip key prefix
    # the current event is not yet counted nor classified
    if ($n_all <= 0) {
      do_log(5, "redis: IP %s ttl: %.1f h", $ip, $ttl/3600);
    } else {
      my $n_other = $n_all - ($n_spam + $n_ham + $n_bad);
      if ($n_other < 0) {  # just in case
        $n_all = $n_spam + $n_ham + $n_bad; $n_other = 0;
      }
      my $bad_content_ratio = ($n_spam+$n_bad) / $n_all;
      # gains strength by the number of samples, watered down by share of ham
      my $score = !($n_spam+$n_bad) ? 0
                : 0.9 * ($n_all**0.36) * exp(-6 * $n_ham/$n_all);
      my $ip_ignore;
      if ($score >= 0.05) {
        # it is cheaper to do a redis/lookup unconditionally,
        # then ditch an ignored IP address later if necessary
        my($key, $err);
        ($ip_ignore, $key, $err) =
          lookup_ip_acl($ip, @{ca('ip_repu_ignore_maps')});
        undef $ip_ignore  if $err;
      }
      my $log_level = ($score <= 0 || $ip_ignore) ? 3 : 2;
      if (ll($log_level)) {
        my $rxtime = $msginfo->rx_time;
        do_log($log_level, "redis: IP %s age: %s%s, ttl: %.1f h, %s, %s%s",
          $ip, format_time_interval($rxtime-$tfirst),
          defined $tlast ? ', last: '.format_time_interval($rxtime-$tlast) :'',
          $ttl/3600,
          $n_other ? ($n_bad ? "s/h/bv/?: $n_spam/$n_ham/$n_bad/$n_other"
                             : "s/h/?: $n_spam/$n_ham/$n_other")
                   : ($n_bad ? "s/h/bv: $n_spam/$n_ham/$n_bad"
                             : "s/h: $n_spam/$n_ham"),
          $score <= 0 ? 'clean'
            : sprintf("%.0f%%, score: %.1f", 100*$bad_content_ratio, $score),
          $ip_ignore ? ' =>0 ip_repu_ignore' : '');
      }
      $score = 0  if $ip_ignore || $score < 0.05;
      if (!defined $highest_score || $score > $highest_score) {
        $highest_score = $score; $worst_ip = $ip;
      }
    }
  }
  $self->disconnect  if !$database_sessions_persistent;
  ($highest_score, $worst_ip);
}

# Appends a JSON-encoded report to the Redis list named $log_key, optionally
# trimming the list to its most recent $queue_size_limit entries.
# Returns nothing if no report was given, 1 otherwise.
#
sub save_structured_report {
  my($self, $report_ref, $log_key, $queue_size_limit) = @_;
  return if !$report_ref;
  $self->connect  if !$self->{connected};
  my $r = $self->{redis};
  my $report_json = Amavis::JSON::encode($report_ref);  # as string of chars
  # use safe_encode() instead of safe_encode_utf8() here, this way we ensure
  # the resulting string of octets is always a valid UTF-8, even in case
  # of a non-ASCII input string with utf8 flag off
  $report_json = safe_encode('UTF-8', $report_json);  # convert to octets
  do_log(5, "redis: structured_report: %s %s", $log_key, $report_json);
  $r->b_call("RPUSH", $log_key, $report_json);
  # keep most recent - queue size limit in case no one is pulling events
  $r->b_call("LTRIM", $log_key, -$queue_size_limit, -1)  if $queue_size_limit;
  my $res = $r->b_results;  # errors will be signalled
  do_log(5, "redis: save_structured_report, %d bytes, q_lim=%s, q_size=%s",
            length $report_json, $queue_size_limit || 0,
            $res ? join(', ',@$res) : '?')  if ll(5);
  1;
}

# Called when processing of a message is complete. If IP reputation is
# enabled, increments the per-IP counters according to the message's final
# classification. For originating messages it then records the sender,
# the Message-ID and the recipients that were actually passed, so that
# later replies can be recognized by penpals_find.
# Returns nothing for non-originating messages; otherwise true if the
# Redis script echoed the mail_id back as expected, false if not.
# Dies if mail_id is not yet known.
#
sub save_info_final {
  my($self, $msginfo, $report_ref) = @_;

  $self->connect  if !$self->{connected};
  my $r = $self->{redis};

  if (c('enable_ip_repu')) {
    my $rigm = ca('ip_repu_ignore_maps');
    my $ip_trace_ref = $msginfo->ip_addr_trace_public;
    my @ip_trace;
    # drop addresses matching the ignore maps (kept on a lookup error)
    @ip_trace = grep { my($ignore, $key, $err) = lookup_ip_acl($_, @$rigm);
                       !$ignore || $err;
                     } unique_list($ip_trace_ref)  if $ip_trace_ref;
    if (@ip_trace) {
      my $content =
        $msginfo->is_in_contents_category(CC_VIRUS)  ? 'b'
      : $msginfo->is_in_contents_category(CC_BANNED) ? 'b' : undef;
      if (!defined $content) {  # test for ham or spam
        my($min, $max);
        for my $recip (@{$msginfo->per_recip_data}) {
          my $spam_level = $recip->spam_level;
          next if !defined $spam_level;
          $max = $spam_level  if !defined $max || $spam_level > $max;
          $min = $spam_level  if !defined $min || $spam_level < $min;
        }
        if (defined $min) {
          my $ip_repu_score = $msginfo->ip_repu_score || 0;  # positive or 0
          # avoid self-reinforcing feedback in the IP reputation auto-learning,
          # use the score without the past IP reputation contribution
          if    ($max - $ip_repu_score <  0.5) { $content = 'h' }
          elsif ($min - $ip_repu_score >= 5)   { $content = 's' }
        }
      }
      if (!defined $content) {
        # just increment the total counter
        $r->b_call("HINCRBY", "ip:$_", 'n', 1)  for @ip_trace;
        $r->b_results;
        if (ll(5)) { do_log(5,"redis: IP INCR %s", $_)  for @ip_trace }
      } else {  # content type is known
        for (@ip_trace) {
          $r->b_call("HINCRBY", "ip:$_", 'n', 1);
          $r->b_call("HINCRBY", "ip:$_", $content, 1);
        }
        my $counts = $r->b_results;
        if (ll(5) && $counts) {
          # two replies per address, in the order the commands were issued
          do_log(5,"redis: IP INCR %s n=%d, %s=%d",
                   $_, shift @$counts, $content, shift @$counts)  for @ip_trace;
        }
      }
    }
  }

  if (!$msginfo->originating) {
    # don't bother saving info on incoming messages, saves Redis storage
    # while still offering necessary data for a pen pals function
    $self->disconnect  if !$database_sessions_persistent;
    return;
  }

  my $mail_id = $msginfo->mail_id;
  defined $mail_id  or die "save_info_final: mail_id still undefined";

  my $sender_smtp = $msginfo->sender_smtp;
  local($1); $sender_smtp =~ s/^<(.*)>\z/$1/s;

  my(@recips);  # only recipients which did receive a message
  for my $recip (@{$msginfo->per_recip_data}) {
    my($dest, $resp) = ($recip->recip_destiny, $recip->recip_smtp_response);
    next if $dest != D_PASS || ($recip->recip_done && $resp !~ /^2/);
    my $addr_smtp = $recip->recip_addr_smtp;
    next if !defined $addr_smtp;
    local($1); $addr_smtp =~ s/^<(.*)>\z/$1/s;
    # don't remember messages sent to self
    next if lc($sender_smtp) eq lc($addr_smtp);
    # don't remember problematic outgoing messages, even if delivered
    next if $recip->is_in_contents_category(CC_VIRUS)  ||
            $recip->is_in_contents_category(CC_BANNED) ||
            $recip->is_in_contents_category(CC_SPAM)   ||  # kill_level
            $recip->is_in_contents_category(CC_SPAMMY);    # tag2_level
    push(@recips, lc $addr_smtp);
  }

  my $m_id = $msginfo->get_header_field_body('message-id');
  $m_id = join(' ',parse_message_id($m_id))
    if defined $m_id && $m_id ne '';  # strip CFWS
  my(@args) = map(defined $_ ? $_ : '',  # avoid nil in a Lua table
                  ($self->{ttl}, $msginfo->log_id, $m_id,
                   $msginfo->client_addr, lc $sender_smtp, @recips) );
  if (!@recips) {
    do_log(5,"redis: save_info_final: %s deleted", $mail_id);
  } elsif (ll(5)) {
    do_log(5,"redis: save_info_final: %s, passed %d of %d recips, %s",
             $mail_id, scalar @recips, scalar @{$msginfo->per_recip_data},
             join(', ',@args));
  }

  my $result;
  eval {
    $result = $r->call('EVALSHA', $self->{lua_save_final},
                       1, $mail_id, @args);
    1;
  } or do {  # Lua function probably not cached, define again and re-try
    if ($@ !~ /^NOSCRIPT/) {
      $self->disconnect; undef $result; chomp $@;
      do_log(-1, "save_info_final, Redis Lua error: %s", $@);
    } else {
      $self->load_lua_programs;
      $result = $r->call('EVALSHA', $self->{lua_save_final},
                         1, $mail_id, @args);
    }
  };

  my $ok = 1;
  # the script returns the mail_id it was given, which serves as a nonce
  my $r_nonce = $result;
  if (!defined($r_nonce) || $r_nonce ne $mail_id) {
    # redis protocol falling out of step?
    $ok = 0; $self->disconnect;
    do_log(-1,"redis: save_info_final - nonce mismatch, expected %s, got %s",
              $mail_id, defined $r_nonce ? $r_nonce : 'UNDEF');
  }

  # $r->call("EVAL", 'collectgarbage()', 0);
  $self->disconnect  if !$database_sessions_persistent;
  $ok;
}

# Registers the four Lua programs with the Redis server (SCRIPT LOAD) and
# remembers their SHA1 fingerprints in the object for use with EVALSHA.
# On failure of any of them disconnects and dies. Returns 1.
#
sub load_lua_programs($) {
  my $self = $_[0];
  do_log(5, "redis: load_lua_programs");
  my $r = $self->{redis};

  eval {
    $self->{lua_save_info_preliminary} = $r->call('SCRIPT', 'LOAD', <<'END');
--LUA_SAVE_INFO_PRELIMINARY
    local rcall, tonumber = redis.call, tonumber
    local mail_id, rx_time, ttl = KEYS[1], ARGV[1], ARGV[2]

    -- ensure the mail_id is unique, report false otherwise
    local added = rcall("HSETNX", mail_id, "time", rx_time)
    if added == 1 and ttl and tonumber(ttl) > 0 then
      if rcall("EXPIRE", mail_id, ttl) ~= 1 then
        return { err = "Failed to set EXPIRE on key " .. mail_id }
      end
    end
    return added  -- 1:yes, 0:no,failed
END
  } or do {
    $self->disconnect;
    die "Redis LUA error - lua_save_info_preliminary: $@\n"
  };

  eval {
    $self->{lua_save_final} = $r->call('SCRIPT', 'LOAD', <<'END');
--LUA_SAVE_FINAL
    local mail_id = KEYS[1]
    local rcall = redis.call
    local ARGV = ARGV

    -- not delivered to any recipient, just delete the useless record
    if #ARGV < 6 then
      rcall("DEL", mail_id)

    else
      local ttl, log_id, msgid, client_addr, sender = unpack(ARGV,1,5)
      local tonumber, unpack = tonumber, unpack
      if not tonumber(ttl) or tonumber(ttl) <= 0 then ttl = nil end

      local addresses = { [sender] = true }
      -- remaining arguments 6 to #ARGV are recipient addresses
      for r = 6, #ARGV do addresses[ARGV[r]] = true end

      -- create mail address -> id mapping
      for addr in pairs(addresses) do
        local addr_key = "a:" .. addr
        local addr_id
        if not ttl then
          addr_id = rcall("GET", addr_key)
        else
          -- to avoid potential race between GET and EXPIRE, set EXPIRE first
          local refreshed = rcall("EXPIRE", addr_key, ttl)
          if refreshed == 1 then addr_id = rcall("GET", addr_key) end
        end
        if not addr_id then
          -- not found, assign a new id and store the email address
          addr_id = rcall("INCR", "last.id.addr")  -- get next id, starts at 1
          addr_id = tostring(addr_id)
          local ok
          if ttl then
            ok = rcall("SET", addr_key, addr_id, "EX", ttl, "NX")
          else
            ok = rcall("SET", addr_key, addr_id, "NX")
          end
          if not ok then
            -- shouldn't happen, Lua program runs atomically, but anyway...
            addr_id = rcall("GET", addr_key)  -- collision, retry
          end
        end
        addresses[addr] = addr_id
      end

      -- create a Message-ID -> id mapping
      local msgid_key = "m:" .. msgid
      local msgid_id = rcall("GET", msgid_key)
      if msgid_id then  -- unlikely duplicate Message-ID, but anyway...
        if ttl then rcall("EXPIRE", msgid_key, ttl) end  -- extend its lifetime
      else
        msgid_id = rcall("INCR", "last.id.msgid")  -- get next id, starts at 1
        msgid_id = tostring(msgid_id)
        local ok
        if ttl then
          ok = rcall("SET", msgid_key, msgid_id, "EX", ttl, "NX")
        else
          ok = rcall("SET", msgid_key, msgid_id, "NX")
        end
        if not ok then
          -- shouldn't happen, Lua program runs atomically, but anyway...
          msgid_id = rcall("GET", msgid_key)  -- collision, retry
        end
      end

      -- store additional information to an existing mail_id record
      local sender_id = addresses[sender]
      rcall("HSET", mail_id, "log", log_id)
   -- rcall("HMSET", mail_id, "log", log_id,
   --       "msgid", msgid_id, "ip", client_addr, "sender", sender_id)

      -- store relations: sender/msgid and sender/recipient pairs
      local mapkeys = { "sm:" .. sender_id .. "::" .. msgid_id }
      for r = 6, #ARGV do
        local recip_id = addresses[ARGV[r]]
        -- only the most recent sr:* record is kept, older are overwritten
        mapkeys[#mapkeys+1] = "sr:" .. sender_id .. ":" .. recip_id
     -- mapkeys[#mapkeys+1] = "srm:" .. sender_id .. ":" .. recip_id ..
     --                       ":" .. msgid_id
      end
      if not ttl then
        for _,k in ipairs(mapkeys) do rcall("SET", k, mail_id) end
      else
        for _,k in ipairs(mapkeys) do rcall("SET", k, mail_id, "EX", ttl) end
      end
    end

    return mail_id
END
  } or do {
    $self->disconnect;
    die "Redis LUA error - lua_save_final: $@\n"
  };

  eval {
    $self->{lua_query_and_update_ip} = $r->call('SCRIPT', 'LOAD', <<'END');
--LUA_QUERY_AND_UPDATE_IP
    local rcall, tonumber, unpack, floor, sprintf =
      redis.call, tonumber, unpack, math.floor, string.format
    local KEYS, ARGV = KEYS, ARGV
    local rx_time, normal_random = ARGV[1], tonumber(ARGV[2])
    local results = {}
    for j = 1, #KEYS do
      local ipkey = KEYS[j]  -- an IP address, prefixed by "ip:"
      local tfirst, tlast    -- Unix times of creation and last access
      local n, s, h, b       -- counts: all, spam, ham, banned+virus
      local age, ttl         -- time since creation, time to live in seconds
      local ip_addr_data =
        rcall("HMGET", ipkey, 'tl', 'tf', 'n', 's', 'h', 'b')
      if ip_addr_data then
        tlast, tfirst, n, s, h, b = unpack(ip_addr_data,1,6)
      end
      if not tlast then  -- does not exist, a new entry is needed
        n = 0; tfirst = rx_time; ttl = 3*3600  -- 3 hours for new entries
        rcall("HMSET", ipkey, 'tf', rx_time, 'tl', rx_time, 'n', '0')
      else  -- a record for this IP address exists, collect its counts and age
        n = tonumber(n) or 0
        local rx_time_n, tfirst_n, tlast_n =
          tonumber(rx_time), tonumber(tfirst), tonumber(tlast)
        if rx_time_n and tfirst_n and tlast_n then  -- valid numbers
          age = rx_time_n - tfirst_n      -- time since entry creation
          local dt = rx_time_n - tlast_n  -- time since last occurrence
          ttl = 3600 * (n >= 8 and 80 or (3 + n^2.2))  -- 4 to 80 hours
          if ttl < 1.5 * dt then ttl = 1.5 * dt end
        else  -- just in case - ditch a record with invalid fields
          n = 0; tfirst = rx_time; ttl = 3*3600
          rcall("DEL", ipkey); rcall("HMSET", ipkey, 'tf', rx_time, 'n', '0')
        end
        rcall("HMSET", ipkey, 'tl', rx_time)  -- update its last-seen time
      end
      -- the 's', 'h', 'b' and 'n' counts will be updated later
      if normal_random then
        -- introduce some randomness, don't let spammers depend on a fixed ttl
        ttl = ttl * (1 + normal_random * 0.2)
        if ttl < 4000 then ttl = 4000 end  -- no less than 1h 7min
      end
      -- set time-to-live in seconds, capped at 3 days, integer
      if age and (age + ttl > 3*24*3600) then ttl = 3*24*3600 - age end
      if ttl < 1 then
        rcall("DEL", ipkey); ttl = 0
      else
        rcall("EXPIRE", ipkey, floor(ttl))
      end
      results[#results+1] = { ipkey, n or 0, s or 0, h or 0, b or 0,
                              tfirst or "", tlast or "", ttl }
    end
    return results
END
  } or do {
    $self->disconnect;
    die "Redis LUA error - lua_query_and_update_ip: $@\n"
  };

  eval {
    $self->{lua_query_penpals} = $r->call('SCRIPT', 'LOAD', <<'END');
--LUA_QUERY_PENPALS
    local tonumber, unpack, sprintf = tonumber, unpack, string.format
    local rcall = redis.call
    local ARGV = ARGV
    local now, nonce, recipient = ARGV[1], ARGV[2], ARGV[3]
    local senders_count = tonumber(ARGV[4])
    local senders_argv_ofs = 5
    local messageid_argv_ofs = senders_argv_ofs + senders_count + 1
    local messageid_count = tonumber(ARGV[messageid_argv_ofs - 1])

    local q_keys1 = {}
    -- current sender as a potential previous recipient
    if recipient == '' then recipient = nil end  -- nothing ever sent to "<>"
    if recipient then
      q_keys1[#q_keys1+1] = "a:" .. recipient
    end
    for j = 1, senders_count do
      q_keys1[#q_keys1+1] = "a:" .. ARGV[senders_argv_ofs + j - 1]
    end
    for j = 1, messageid_count do
      q_keys1[#q_keys1+1] = "m:" .. ARGV[messageid_argv_ofs + j - 1]
    end

    -- map e-mail addresses and referenced Message-IDs to internal id numbers
    local q_result = rcall("MGET", unpack(q_keys1))
    q_keys1 = nil

    local rid  -- internal id of a recipient's e-mail addresses
    local mids = {}  -- internal ids corresponding to referenced "Message-ID"s
    local senders = {}
    if q_result then
      local k = 0;
      if recipient then  -- nonempty e-mail address, i.e. not "<>"
        k = k+1; rid = q_result[k]
      end
      for j = 1, senders_count do
        k = k+1;
        if not q_result[k] then
          senders[j] = false  -- non-nil
        else
          senders[j] = { sid = q_result[k] }
        end
      end
      for j = 1, messageid_count do
        k = k+1; if q_result[k] then mids[q_result[k]] = true end
      end
    end
    q_result = nil

    -- prepare query keys to find a closest-matching previous e-mail message
    -- for each sender
    local q_keys2, belongs_to_sender, on_hit_txt = {}, {}, {}
    for _, s in ipairs(senders) do
      if s then
        -- try sender/Message-ID pairs without a recipient
        for m in pairs(mids) do
          local nxt = #q_keys2 + 1
          q_keys2[nxt] = "sm:" .. s.sid .. "::" .. m
          on_hit_txt[nxt] = "mid=" .. m
          belongs_to_sender[nxt] = s
        end
        -- try a sender/recipient pair without a Message-ID ref
        if rid then
          local nxt = #q_keys2 + 1
          q_keys2[nxt] = "sr:" .. s.sid .. ":" .. rid
          on_hit_txt[nxt] = "rid=" .. rid
          belongs_to_sender[nxt] = s
        end
      end
    end

    -- get an internal id (or nil) of a matching mail_id for each query key
    local q_result2
    if #q_keys2 >= 1 then q_result2 = rcall("MGET", unpack(q_keys2)) end

    local msginfo = {}  -- data about a message mail_id (e.g. its rx_time)
    if q_result2 then
      for j = 1, #q_keys2 do
        local rx_time_n
        local mail_id = q_result2[j]
        if not mail_id then
          -- no matching mail_id
        elseif msginfo[mail_id] then  -- already looked-up
          rx_time_n = msginfo[mail_id].rx_time_n
        else  -- not yet looked-up
          msginfo[mail_id] = {}
          -- see if a record for this mail_id exists, find its timestamp
          rx_time_n = tonumber(rcall("HGET", mail_id, "time"))
          msginfo[mail_id].rx_time_n = rx_time_n
        end
        if rx_time_n then  -- exists and is a valid number
          local s = belongs_to_sender[j]
          if not s.hits then s.hits = {} end
          if not s.hits[mail_id] then
            s.hits[mail_id] = on_hit_txt[j]
          else
            s.hits[mail_id] = s.hits[mail_id] .. " " .. on_hit_txt[j]
          end
          -- for each sender manage a sorted list of mail_ids found
          if not s.mail_id_list then
            s.mail_id_list = { mail_id }
          else
            -- keep sender's mail_id_list sorted by rx_time, highest first
            local mail_id_list = s.mail_id_list
            local first_smaller_ind
            for j = 1, #mail_id_list do
              if msginfo[mail_id_list[j]].rx_time_n <= rx_time_n then
                first_smaller_ind = j; break
              end
            end
            table.insert(mail_id_list,
                         first_smaller_ind or #mail_id_list+1, mail_id)
          end
        end
      end
    end

    local results = {}  -- one entry for each sender, followed by a nonce
    for _, s in ipairs(senders) do
      if not s or not s.mail_id_list then  -- no matching mail_id
        results[#results+1] = { s and s.sid or "", rid }
      else  -- some matches for this sender, compile a report
        local report = {}; local mail_id_list = s.mail_id_list
        for _, mail_id in ipairs(mail_id_list) do  -- first is best
          report[#report+1] = sprintf("%s (%.0f s) %s", mail_id,
                                tonumber(now) - msginfo[mail_id].rx_time_n,
                                s.hits and s.hits[mail_id] or "")
        end
        results[#results+1] =
          { s.sid or "", rid or "", msginfo[mail_id_list[1]].rx_time_n,
            mail_id_list[1], table.concat(report,", ") }
      end
    end
    results[#results+1] = nonce
    return results
END
    1;
  } or do {
    $self->disconnect;
    die "Redis LUA error - lua_query_penpals: $@\n"
  };

  # the penpals query fingerprint is stored under 'lua_query_penpals'
  ll(5) && do_log(5, "redis: SHA fingerprints: final %s, query %s",
                  map(substr($_,0,10),
                      @$self{qw(lua_save_final lua_query_penpals)}));
  section_time("redis-load");
  1;
}

1;