Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.129.24.240
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /srv/modoboa/env/lib64/python3.5/site-packages/modoboa/admin/management/commands/subcommands/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /srv/modoboa/env/lib64/python3.5/site-packages/modoboa/admin/management/commands/subcommands/_mx.py
"""Management command to check defined domains."""

import ipaddress

import gevent
from gevent import socket

from django.conf import settings
from django.core import mail
from django.core.mail import EmailMessage
from django.core.management.base import BaseCommand
from django.template.loader import render_to_string
from django.utils.encoding import smart_text
from django.utils.functional import cached_property
from django.utils.translation import ugettext as _

from modoboa.admin import constants, models
from modoboa.dnstools import models as dns_models
from modoboa.parameters import tools as param_tools


class CheckMXRecords(BaseCommand):
    """Command class."""

    help = "Check defined domains."  # NOQA:A003

    @cached_property
    def providers(self):
        """Return a list of DNSBL providers."""
        if not hasattr(settings, "DNSBL_PROVIDERS"):
            return constants.DNSBL_PROVIDERS
        return settings.DNSBL_PROVIDERS

    @cached_property
    def sender(self):
        """Return sender address for notifications."""
        return param_tools.get_global_parameter("sender_address", app="core")

    @cached_property
    def valid_mxs(self):
        """Return valid MXs set in admin."""
        valid_mxs = param_tools.get_global_parameter("valid_mxs")
        return [ipaddress.ip_network(smart_text(v.strip()))
                for v in valid_mxs.split() if v.strip()]

    def add_arguments(self, parser):
        """Add extra arguments to command."""
        parser.add_argument(
            "--no-dnsbl", action="store_true", default=False,
            help="Skip DNSBL queries.")
        parser.add_argument(
            "--email", type=str, action="append", default=[],
            help="One or more email to notify")
        parser.add_argument(
            "--skip-admin-emails", action="store_true",
            default=False,
            help="Skip domain's admins email notification.")
        parser.add_argument(
            "--domain", type=str, action="append", default=[],
            help="Domain name or id to update.")
        parser.add_argument(
            "--timeout", type=int, default=3,
            help="Timeout used for queries.")
        parser.add_argument(
            "--ttl", type=int, default=7200,
            help="TTL for dns query.")

    def query_dnsbl(self, mx_list, provider):
        """Check given IP against given DNSBL provider."""
        results = {}
        for ip, mxs in mx_list.items():
            try:
                ip = ipaddress.ip_address(smart_text(ip))
            except ValueError as e:
                continue
            else:
                delim = "." if ip.version == 4 else ":"
                reverse = delim.join(ip.exploded.split(delim)[::-1])
            pattern = "{}.{}.".format(reverse, provider)
            try:
                result = socket.gethostbyname(pattern)
            except socket.gaierror:
                result = False
            for mx in mxs:
                results[mx] = result
        return provider, results

    def store_dnsbl_result(self, domain, provider, results, **options):
        """Store DNSBL provider results for domain."""
        alerts = {}
        to_create = []
        for mx, result in list(results.items()):
            if not result:
                result = ""
            dnsbl_result = models.DNSBLResult.objects.filter(
                domain=domain, provider=provider, mx=mx).first()
            trigger = False
            if dnsbl_result is None:
                to_create.append(
                    models.DNSBLResult(
                        domain=domain, provider=provider, mx=mx,
                        status=result)
                )
                if result:
                    trigger = True
            else:
                dnsbl_result.status = result
                dnsbl_result.save()
                if not dnsbl_result.status and result:
                    trigger = True
            if trigger:
                if domain not in alerts:
                    alerts[domain] = []
                alerts[domain].append((provider, mx))
        models.DNSBLResult.objects.bulk_create(to_create)
        if not alerts:
            return
        emails = list(options["email"])
        if not options["skip_admin_emails"]:
            emails.extend(
                domain.admins.exclude(mailbox__isnull=True)
                .values_list("email", flat=True)
            )
        if not len(emails):
            return
        with mail.get_connection() as connection:
            for domain, providers in list(alerts.items()):
                content = render_to_string(
                    "admin/notifications/domain_in_dnsbl.html", {
                        "domain": domain, "alerts": providers
                    })
                subject = _("[modoboa] DNSBL issue(s) for domain {}").format(
                    domain.name)
                msg = EmailMessage(
                    subject, content.strip(), self.sender, emails,
                    connection=connection
                )
                msg.send()

    def check_valid_mx(self, domain, mx_list, **options):
        """Check that domain's MX record exist.

        If `valid_mx` is provided, retrieved MX records must be
        contained in it.
        """
        alerts = []
        check = False
        mxs = [(mx, ipaddress.ip_address("%s" % mx.address))
               for mx in mx_list]
        valid_mxs = self.valid_mxs
        if not mxs:
            alerts.append(_("Domain {} has no MX record").format(domain))
        elif valid_mxs:
            for subnet in valid_mxs:
                for mx, addr in mxs:
                    if addr in subnet:
                        mx.managed = check = True
                        mx.save()
            if check is False:
                mx_names = [
                    "{0.name} ({0.address})".format(mx) for mx in mx_list]
                alerts.append(
                    _("MX record for domain {0} is invalid: {1}").format(
                        domain, ", ".join(mx_names))
                )
        if not alerts:
            return
        emails = list(options["email"])
        if not options["skip_admin_emails"]:
            emails.extend(
                domain.admins.exclude(mailbox__isnull=True)
                .values_list("email", flat=True)
            )
        if not len(emails):
            return
        content = render_to_string(
            "admin/notifications/domain_invalid_mx.html", {
                "domain": domain, "alerts": alerts
            })
        subject = _("[modoboa] MX issue(s) for domain {}").format(
            domain.name)
        msg = EmailMessage(subject, content.strip(), self.sender, emails)
        msg.send()

    def check_domain(self, domain, timeout=3, ttl=7200, **options):
        """Check specified domain."""
        mx_list = list(
            models.MXRecord.objects.get_or_create_for_domain(domain, ttl))

        if param_tools.get_global_parameter("enable_mx_checks"):
            self.check_valid_mx(domain, mx_list, **options)

        if param_tools.get_global_parameter("enable_spf_checks"):
            dns_models.DNSRecord.objects.get_or_create_for_domain(
                domain, "spf", ttl)
        condition = (
            param_tools.get_global_parameter("enable_dkim_checks") and
            domain.dkim_public_key
        )
        if condition:
            dns_models.DNSRecord.objects.get_or_create_for_domain(
                domain, "dkim", ttl)
        if param_tools.get_global_parameter("enable_dmarc_checks"):
            dns_models.DNSRecord.objects.get_or_create_for_domain(
                domain, "dmarc", ttl)
        if param_tools.get_global_parameter("enable_autoconfig_checks"):
            dns_models.DNSRecord.objects.get_or_create_for_domain(
                domain, "autoconfig", ttl)
            dns_models.DNSRecord.objects.get_or_create_for_domain(
                domain, "autodiscover", ttl)

        condition = (
            not param_tools.get_global_parameter("enable_dnsbl_checks") or
            options["no_dnsbl"] is True)
        if condition or not mx_list:
            return

        mx_by_ip = {}
        for mx in mx_list:
            if mx.address not in mx_by_ip:
                mx_by_ip[mx.address] = [mx]
            elif mx not in mx_by_ip[mx.address]:
                mx_by_ip[mx.address].append(mx)

        jobs = [
            gevent.spawn(self.query_dnsbl, mx_by_ip, provider)
            for provider in self.providers]
        gevent.joinall(jobs, timeout)
        for job in jobs:
            if not job.successful():
                continue
            provider, results = job.value
            self.store_dnsbl_result(domain, provider, results, **options)

    def handle(self, *args, **options):
        """Command entry point."""
        # Remove deprecated records first
        models.DNSBLResult.objects.exclude(
            provider__in=self.providers).delete()

        if options["domain"]:
            domains = []
            for domain in options["domain"]:
                try:
                    if domain.isdigit():
                        domains.append(models.Domain.objects.get(pk=domain))
                    else:
                        domains.append(models.Domain.objects.get(name=domain))
                except models.Domain.DoesNotExist:
                    pass
        else:
            domains = models.Domain.objects.filter(
                enabled=True, enable_dns_checks=True)

        options.pop("domain")

        for domain in domains:
            if domain.uses_a_reserved_tld:
                continue
            self.check_domain(domain, **options)

Anon7 - 2022
AnonSec Team