Server IP : 85.214.239.14 / Your IP : 3.149.214.28 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /proc/2/root/proc/3/root/proc/2/task/2/root/usr/share/perl5/Mail/DMARC/Report/Store/ |
Upload File : |
package Mail::DMARC::Report::Store::SQL; our $VERSION = '1.20211209'; use strict; use warnings; use Carp; use Data::Dumper; use DBIx::Simple; use File::ShareDir; use Mail::DMARC::Report::Store::SQL::Grammars::MySQL; use Mail::DMARC::Report::Store::SQL::Grammars::SQLite; use Mail::DMARC::Report::Store::SQL::Grammars::PostgreSQL; use parent 'Mail::DMARC::Base'; use Mail::DMARC::Report::Aggregate; sub save_aggregate { my ( $self, $agg ) = @_; $self->db_connect(); croak "policy_published must be a Mail::DMARC::Policy object" if 'Mail::DMARC::Policy' ne ref $agg->policy_published; #warn Dumper($meta); ## no critic (Carp) foreach my $f ( qw/ org_name email begin end / ) { croak "meta field $f required" if ! $agg->metadata->$f; } my $rid = $self->get_report_id( $agg ) or croak "failed to create report!"; # on 6/8/2013, Microsoft spat out a bunch of reports with zero records. if ( ! $agg->record ) { warn "\ta report with ZERO records! Illegal.\n"; ## no critic (Carp) return $rid; }; foreach my $rec ( @{ $agg->record } ) { $self->insert_agg_record($rid, $rec); }; return $rid; } sub retrieve { my ( $self, %args ) = @_; my $query = $self->grammar->select_report_query; my @params; if ( $args{rid} ) { $query .= $self->grammar->and_arg('r.id'); push @params, $args{rid}; }; if ( $args{begin} ) { $query .= $self->grammar->and_arg('r.begin', '>='); push @params, $args{begin}; }; if ( $args{end} ) { $query .= $self->grammar->and_arg('r.end', '<='); push @params, $args{end}; }; if ( $args{author} ) { $query .= $self->grammar->and_arg('a.org_name'); push @params, $args{author}; }; if ( $args{from_domain} ) { $query .= $self->grammar->and_arg('fd.domain'); push @params, $args{from_domain}; }; my $reports = $self->query( $query, \@params ); foreach (@$reports ) { $_->{begin} = join(" ", split(/T/, $self->epoch_to_iso( $_->{begin} ))); $_->{end} = join(" ", split(/T/, $self->epoch_to_iso( $_->{end} ))); }; return $reports; } sub next_todo { my ( $self ) = @_; if ( ! exists $self->{ _todo_list } ) { $self->{_todo_list} = $self->query( $self->grammar->select_todo_query, [ $self->time ] ); return if ! $self->{_todo_list}; } my $next_todo = shift @{ $self->{_todo_list} }; if ( ! $next_todo ) { delete $self->{_todo_list}; return; } my $agg = Mail::DMARC::Report::Aggregate->new(); $self->populate_agg_metadata( \$agg, \$next_todo ); my $pp = $self->get_report_policy_published( $next_todo->{rid} ); $pp->{domain} = $next_todo->{from_domain}; $agg->policy_published( Mail::DMARC::Policy->new( %$pp ) ); $self->populate_agg_records( \$agg, $next_todo->{rid} ); return $agg; } sub retrieve_todo { my ( $self, @args ) = @_; # this method extracts the data from the SQL tables and populates a # list of Aggregate report objects with them. my $reports = $self->query( $self->grammar->select_todo_query, [ $self->time ] ); my @reports_todo; return \@reports_todo if ! scalar @$reports; foreach my $report ( @{ $reports } ) { my $agg = Mail::DMARC::Report::Aggregate->new(); $self->populate_agg_metadata( \$agg, \$report ); my $pp = $self->get_report_policy_published( $report->{rid} ); $pp->{domain} = $report->{from_domain}; $agg->policy_published( Mail::DMARC::Policy->new( %$pp ) ); $self->populate_agg_records( \$agg, $report->{rid} ); push @reports_todo, $agg; } return \@reports_todo; } sub delete_report { my $self = shift; my $report_id = shift or croak "missing report ID"; print "deleting report $report_id\n" if $self->verbose; # deletes with FK don't cascade in SQLite? Clean each table manually my $rows = $self->query( $self->grammar->report_record_id, [$report_id] ); my @row_ids = map { $_->{id} } @$rows; if (scalar @row_ids) { foreach my $table (qw/ report_record_spf report_record_dkim report_record_reason /) { print "deleting $table rows " . join(',', @row_ids) . "\n" if $self->verbose; eval { $self->query( $self->grammar->delete_from_where_record_in($table), \@row_ids); }; # warn $@ if $@; } } foreach my $table (qw/ report_policy_published report_record report_error /) { print "deleting $table rows for report $report_id\n" if $self->verbose; eval { $self->query( $self->grammar->delete_from_where_report($table), [$report_id] ); }; # warn $@ if $@; } # In MySQL, where FK constraints DO cascade, this is the only query needed $self->query( $self->grammar->delete_report, [$report_id] ); return 1; } sub get_domain_id { my ( $self, $domain ) = @_; croak "missing domain calling " . ( caller(0) )[3] if !$domain; my $r = $self->query( $self->grammar->select_domain_id, [$domain] ); if ( $r && scalar @$r ) { return $r->[0]{id}; } return $self->query( $self->grammar->insert_domain, [$domain]); } sub get_author_id { my ( $self, $meta ) = @_; croak "missing author name" if !$meta->org_name; my $r = $self->query( $self->grammar->select_author_id, [ $meta->org_name ] ); if ( $r && scalar @$r ) { return $r->[0]{id}; } carp "missing email" if !$meta->email; return $self->query( $self->grammar->insert_author, [ $meta->org_name, $meta->email, $meta->extra_contact_info ] ); } sub get_report_id { my ( $self, $aggr ) = @_; my $meta = $aggr->metadata; my $pol = $aggr->policy_published; # check if report exists my $author_id = $self->get_author_id( $meta ) or croak; my $from_dom_id = $self->get_domain_id( $pol->domain ) or croak; my $ids; if ( $meta->report_id ) { # reports arriving via the wire will have an author ID & report ID $ids = $self->query( $self->grammar->select_report_id, [ $meta->report_id, $author_id ] ); } else { # Reports submitted by our local MTA will not have a report ID # They aggregate on the From domain, where the DMARC policy was discovered $ids = $self->query( $self->grammar->select_id_with_end, [ $from_dom_id, $self->time, $author_id ] ); }; if ( scalar @$ids ) { # report already exists return $self->{report_id} = $ids->[0]{id}; } my $rid = $self->{report_id} = $self->query( $self->grammar->insert_report, [ $from_dom_id, $meta->begin, $meta->end, $author_id, $meta->uuid ] ) or return; $self->insert_policy_published( $rid, $pol ); return $rid; } sub get_report { my ($self,@args) = @_; croak "invalid parameters" if @args % 2; my %args = @args; my $query = $self->grammar->select_report_query; my @params; my @known = qw/ r.id a.org_name fd.domain r.begin r.end /; my %known = map { $_ => 1 } @known; # TODO: allow custom search ops? 'searchOper' => 'eq', if ( $args{searchField} && $known{ $args{searchField} } ) { $query .= $self->grammar->and_arg($args{searchField}); push @params, $args{searchString}; }; foreach my $known ( @known ) { next if ! defined $args{$known}; $query .= $self->grammar->and_arg($known); push @params, $args{$known}; }; if ( $args{sidx} && $known{$args{sidx}} ) { if ( $args{sord} ) { $query .= $self->grammar->order_by($args{sidx}, $args{sord} eq 'desc' ? ' DESC' : ' ASC'); }; }; my $total_recs = $self->dbix->query($self->grammar->count_reports)->list; my $total_pages = 0; if ( $args{rows} ) { if ( $args{page} ) { $total_pages = POSIX::ceil($total_recs / $args{rows}); my $start = ($args{rows} * $args{page}) - $args{rows}; $start = 0 if $start < 0; $query .= $self->grammar->limit_args(2); push @params, $start, $args{rows}; } else { $query .= $self->grammar->limit_args; push @params, $args{rows}; }; }; # warn "query: $query\n" . join(", ", @params) . "\n"; my $reports = $self->query($query, \@params); foreach (@$reports ) { $_->{begin} = join('<br>', split(/T/, $self->epoch_to_iso( $_->{begin} ))); $_->{end} = join('<br>', split(/T/, $self->epoch_to_iso( $_->{end} ))); }; # return in the format expected by jqGrid return { cur_page => $args{page}, total_pages => $total_pages, total_rows => $total_recs, rows => $reports, }; } sub get_report_policy_published { my ($self, $rid) = @_; my $pp = $self->query($self->grammar->select_report_policy_published, [ $rid ] )->[0]; $pp->{p} ||= 'none'; $pp = Mail::DMARC::Policy->new( v=>'DMARC1', %$pp ); return $pp; } sub get_rr { my ($self,@args) = @_; croak "invalid parameters" if @args % 2; my %args = @args; # warn Dumper(\%args); croak "missing report ID (rid)!" if ! defined $args{rid}; my $rows = $self->query( $self->grammar->select_rr_query, [ $args{rid} ] ); foreach ( @$rows ) { $_->{source_ip} = $self->any_inet_ntop( $_->{source_ip} ) if $self->grammar->language ne 'postgresql'; $_->{reasons} = $self->query($self->grammar->select_report_reason, [ $_->{id} ] ); }; return { cur_page => 1, total_pages => 1, total_rows => scalar @$rows, rows => $rows, }; } sub populate_agg_metadata { my ($self, $agg_ref, $report_ref) = @_; $$agg_ref->metadata->report_id( $$report_ref->{rid} ); foreach my $f ( qw/ org_name email extra_contact_info / ) { $$agg_ref->metadata->$f( $self->config->{organization}{$f} ); }; foreach my $f ( qw/ begin end / ) { $$agg_ref->metadata->$f( $$report_ref->{$f} ); }; my $errors = $self->query($self->grammar->select_report_error, [ $$report_ref->{rid} ] ); foreach ( @$errors ) { $$agg_ref->metadata->error( $_->{error} ); }; return 1; } sub populate_agg_records { my ($self, $agg_ref, $rid) = @_; my $recs = $self->query( $self->grammar->select_rr_query, [ $rid ] ); # aggregate the connections per IP-Disposition-DKIM-SPF uniqueness my (%ips, %uniq, %pe, %auth, %ident, %reasons, %other); foreach my $rec ( @$recs ) { my $ip = $rec->{source_ip}; $ip = $self->any_inet_ntop($rec->{source_ip}) if $self->grammar->language ne 'postgresql'; my $key = join('-', $ip, @$rec{ qw/ disposition dkim spf / }); # hash slice $uniq{ $key }++; $ips{$key} = $rec->{source_ip}; $ident{$key}{header_from} ||= $rec->{header_from}; $ident{$key}{envelope_from} ||= $rec->{envelope_from}; $ident{$key}{envelope_to} ||= $rec->{envelope_to}; $pe{$key}{disposition} ||= $rec->{disposition}; $pe{$key}{dkim} ||= $rec->{dkim}; $pe{$key}{spf} ||= $rec->{spf}; $auth{$key}{spf} ||= $self->get_row_spf($rec->{id}); $auth{$key}{dkim} ||= $self->get_row_dkim($rec->{id}); my $reasons = $self->get_row_reason( $rec->{id} ); foreach my $reason ( @$reasons ) { my $type = $reason->{type} or next; $reasons{$key}{$type} = $reason->{comment}; # flatten reasons } } foreach my $u ( keys %uniq ) { my $record = Mail::DMARC::Report::Aggregate::Record->new( identifiers => $ident{$u}, auth_results => $auth{$u}, row => { source_ip => $self->grammar->language eq 'postgresql' ? $ips{$u} : $self->any_inet_ntop( $ips{$u} ), count => $uniq{ $u }, policy_evaluated => { %{ $pe{$u} }, $reasons{$u} ? ( reason => [ map { { type => $_, comment => $reasons{$u}{$_} } } sort keys %{ $reasons{$u} } ] ) : (), }, } ); $$agg_ref->record( $record ); } return $$agg_ref->record; } sub row_exists { my ($self, $rid, $rec ) = @_; if ( ! defined $rec->{row}{count} ) { print "new record\n" if $self->verbose; return; }; my $rows = $self->query( $self->grammar->select_report_record, [ $rid, $rec->{row}{source_ip}, $rec->{row}{count}, ] ); return 1 if scalar @$rows; return; } sub insert_agg_record { my ($self, $row_id, $rec) = @_; return 1 if $self->row_exists( $row_id, $rec); $row_id = $self->insert_rr( $row_id, $rec ) or croak "failed to insert report row"; my $reasons = $rec->row->policy_evaluated->reason; if ( $reasons ) { foreach my $reason ( @$reasons ) { next if !$reason || !$reason->{type}; $self->insert_rr_reason( $row_id, $reason->{type}, $reason->{comment} ); }; } my $spf_ref = $rec->auth_results->spf; if ( $spf_ref ) { foreach my $spf (@$spf_ref) { $self->insert_rr_spf( $row_id, $spf ); } } my $dkim = $rec->auth_results->dkim; if ($dkim) { foreach my $sig (@$dkim) { next if ! $sig || ! $sig->{domain}; $self->insert_rr_dkim( $row_id, $sig ); } } return 1; } sub insert_error { my ( $self, $rid, $error ) = @_; # wait >5m before trying to deliver this report again $self->query($self->grammar->insert_error(0), [$self->time + (5*60), $rid]); return $self->query( $self->grammar->insert_error(1), [ $rid, $error ] ); } sub insert_rr_reason { my ( $self, $row_id, $type, $comment ) = @_; return $self->query( $self->grammar->insert_rr_reason, [ $row_id, $type, ($comment || '') ] ); } sub insert_rr_dkim { my ( $self, $row_id, $dkim ) = @_; my (@fields, @values); foreach ( qw/ domain selector result human_result / ) { next if ! defined $dkim->{$_}; if ( 'domain' eq $_ ) { push @fields, 'domain_id'; push @values, $self->get_domain_id( $dkim->{domain} ); next; }; push @fields, $_; push @values, $dkim->{$_}; }; my $query = $self->grammar->insert_rr_dkim(\@fields); $self->query( $query, [ $row_id, @values ] ); return 1; } sub insert_rr_spf { my ( $self, $row_id, $spf ) = @_; my (@fields, @values); for ( qw/ domain scope result / ) { next if ! defined $spf->{$_}; if ( 'domain' eq $_ ) { push @fields, 'domain_id'; push @values, $self->get_domain_id( $spf->{domain} ); next; }; push @fields, $_; push @values, $spf->{$_}; }; my $query = $self->grammar->insert_rr_spf(\@fields); $self->query( $query, [ $row_id, @values ]); return 1; } sub insert_rr { my ( $self, $report_id, $rec ) = @_; $report_id or croak "report ID required?!"; my $query = $self->grammar->insert_rr; my $ip = $rec->row->source_ip; $ip = $self->any_inet_pton( $ip ) if $self->grammar->language ne 'postgresql'; my @args = ( $report_id, $ip, $rec->{row}{count}, ); foreach my $f ( qw/ header_from envelope_to envelope_from / ) { push @args, $rec->identifiers->$f ? $self->get_domain_id( $rec->identifiers->$f ) : undef; }; push @args, map { $rec->row->policy_evaluated->$_ } qw/ disposition dkim spf /; my $rr_id = $self->query( $query, \@args ) or croak; return $self->{report_row_id} = $rr_id; } sub insert_policy_published { my ( $self, $id, $pub ) = @_; my $query = $self->grammar->insert_policy_published; $self->query( $query, [ $id, @$pub{ qw/ adkim aspf p sp pct rua /} ] ); return 1; } sub db_connect { my $self = shift; my $dsn = $self->config->{report_store}{dsn} or croak; my $user = $self->config->{report_store}{user}; my $pass = $self->config->{report_store}{pass}; # cacheing if ($self->{grammar} && $self->{dbix}) { my $cached_grammar_type = $self->{grammar}->dsn; if ( $dsn =~ /$cached_grammar_type/ ) { return $self->{dbix}; # caching } } my $needs_tables; $self->{grammar} = undef; if ($dsn =~ /sqlite/i) { my ($db) = ( split /=/, $dsn )[-1]; if ( !$db || $db eq ':memory:' || !-e $db ) { my $schema = 'mail_dmarc_schema.sqlite'; $needs_tables = $self->get_db_schema($schema) or croak "can't locate DB $db AND can't find $schema! Create $db manually.\n"; } $self->{grammar} = Mail::DMARC::Report::Store::SQL::Grammars::SQLite->new(); } elsif ($dsn =~ /mysql/i) { $self->{grammar} = Mail::DMARC::Report::Store::SQL::Grammars::MySQL->new(); } elsif ($dsn =~ /pg/i) { $self->{grammar} = Mail::DMARC::Report::Store::SQL::Grammars::PostgreSQL->new(); } else { croak "can't determine database type, so unable to load grammar.\n"; } $self->{dbix} = DBIx::Simple->connect( $dsn, $user, $pass ) or return $self->error( DBIx::Simple->error ); if ($needs_tables) { $self->apply_db_schema($needs_tables); } return $self->{dbix}; } sub db_check_err { my ( $self, $err ) = @_; ## no critic (PackageVars) return if !defined $DBI::errstr; return if !$DBI::errstr; return if $DBI::errstr eq 'DBI error: '; croak $err . $DBI::errstr; } sub dbix { return $_[0]->{dbix} if $_[0]->{dbix}; return $_[0]->db_connect(); } sub apply_db_schema { my ( $self, $file ) = @_; my $setup = $self->slurp($file); foreach ( split /;/, $setup ) { # warn "$_\n"; $self->dbix->query($_); } return; } sub get_db_schema { my ( $self, $file ) = @_; return "share/$file" if -f "share/$file"; # when testing return File::ShareDir::dist_file( 'Mail-DMARC', $file ); # when installed } sub query { my ( $self, $query, $params, @extra ) = @_; my @c = caller; my $err = sprintf( "query called by %s, %s\n", $c[0], $c[2] ) . "\t$query\n\t"; my @params; if ( defined $params ) { @params = ref $params eq 'ARRAY' ? @$params : $params; no warnings; ## no critic (NoWarnings) $err .= join( ', ', @params ); } croak "too many arguments to exec_query!" if @extra; my $dbix = $self->db_connect() or croak DBIx::Simple->error; return $self->query_insert( $query, $err, @params ) if $query =~ /^INSERT/ix; return $self->query_replace( $query, $err, @params ) if $query =~ /^(?:REPLACE|UPDATE)/ix; return $self->query_delete( $query, $err, @params ) if $query =~ /^(?:DELETE|TRUNCATE)/ix; return $self->query_any( $query, $err, @params ); } sub query_any { my ( $self, $query, $err, @params ) = @_; # warn "query: $query\n" . join(", ", @params) . "\n"; my $r; eval { $r = $self->dbix->query( $query, @params )->hashes; } or print ''; $self->db_check_err($err); die "something went wrong with: $err\n" if ! $r; ## no critic (Carp) return $r; } sub query_insert { my ( $self, $query, $err, @params ) = @_; eval { $self->dbix->query( $query, @params ) } or do { warn DBIx::Simple->error . "\n"; croak $err; }; $self->db_check_err($err); # If the table has no autoincrement field, last_insert_id is zero my ( undef, undef, $table ) = split /\s+/, $query; ($table) = split( /\(/, $table ) if $table =~ /\(/; $table =~ s/^"|"$//g; croak "unable to determine table in query: $query" if !$table; return $self->dbix->last_insert_id( undef, undef, $table, undef ); } sub query_replace { my ( $self, $query, $err, @params ) = @_; $self->dbix->query( $query, @params ) or croak $err; $self->db_check_err($err); return 1; # sorry, no indication of success } sub query_delete { my ( $self, $query, $err, @params ) = @_; my $affected = $self->dbix->query( $query, @params )->rows or croak $err; $self->db_check_err($err); return $affected; } sub get_row_spf { my ($self, $rowid) = @_; return $self->query( $self->grammar->select_row_spf, [ $rowid ] ); } sub get_row_dkim { my ($self, $rowid) = @_; return $self->query( $self->grammar->select_row_dkim, [ $rowid ] ); } sub get_row_reason { my ($self, $rowid) = @_; return $self->query( $self->grammar->select_row_reason, [ $rowid ] ); } sub grammar { my $self = shift; $self->db_connect(); return $self->{grammar}; } 1; __END__ =pod =head1 NAME Mail::DMARC::Report::Store::SQL - store and retrieve reports from a SQL RDBMS =head1 VERSION version 1.20211209 =head1 DESCRIPTION Uses ANSI SQL syntax, keeping the SQL as portable as possible. DB engine specific features are to be avoided. =head1 SYPNOSIS Store and retrieve DMARC reports from SQL data store. Tested with SQLite, MySQL and PostgreSQL. =head1 AUTHORS =over 4 =item * Matt Simerson <msimerson@cpan.org> =item * Davide Migliavacca <shari@cpan.org> =item * Marc Bradshaw <marc@marcbradshaw.net> =back =head1 COPYRIGHT AND LICENSE This software is copyright (c) 2021 by Matt Simerson. This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself. =cut