Server IP : 85.214.239.14 / Your IP : 3.133.155.48 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /srv/modoboa/env/lib/python3.5/site-packages/modoboa/admin/forms/ |
Upload File : |
"""Forms related to domains management.""" from functools import reduce from django import forms from django.http import QueryDict from django.urls import reverse from django.utils.encoding import force_text from django.utils.translation import ugettext as _, ugettext_lazy from modoboa.core import signals as core_signals from modoboa.core.models import User from modoboa.lib.exceptions import Conflict from modoboa.lib.fields import DomainNameField from modoboa.lib.form_utils import ( DynamicForm, TabForms, WizardForm, WizardStep, YesNoField ) from modoboa.lib.web_utils import render_to_json_response, size2integer from modoboa.parameters import tools as param_tools from .. import constants, lib, signals from ..models import Alias, Domain, DomainAlias, Mailbox class DomainFormGeneral(forms.ModelForm, DynamicForm): """A form to create/edit a domain.""" type = forms.ChoiceField( # NOQA:A003 label=ugettext_lazy("Type"), ) aliases = DomainNameField( label=ugettext_lazy("Alias(es)"), required=False, help_text=ugettext_lazy( "Alias(es) of this domain. Indicate only one name per input, " "press ENTER to add a new input." ) ) dkim_key_selector = forms.CharField( label=ugettext_lazy("Key selector"), initial="modoboa", required=False) quota = forms.CharField( label=ugettext_lazy("Quota"), initial=0, help_text=ugettext_lazy( "Quota shared between mailboxes. Can be expressed in KB, " "MB (default) or GB. A value of 0 means no quota." ) ) default_mailbox_quota = forms.CharField( label=ugettext_lazy("Default mailbox quota"), initial=0, help_text=ugettext_lazy( "Default quota applied to mailboxes. Can be expressed in KB, MB " "(default) or GB. A value of 0 means no quota." ) ) class Meta: model = Domain fields = ( "name", "type", "quota", "default_mailbox_quota", "aliases", "enabled", "enable_dns_checks", "enable_dkim", "dkim_key_selector", "dkim_key_length" ) def __init__(self, user, *args, **kwargs): self.oldname = None if "instance" in kwargs: self.old_dkim_key_length = kwargs["instance"].dkim_key_length self.oldname = kwargs["instance"].name super(DomainFormGeneral, self).__init__(*args, **kwargs) params = dict(param_tools.get_global_parameters("admin")) self.fields["quota"].initial = params["default_domain_quota"] self.fields["default_mailbox_quota"].initial = ( params["default_mailbox_quota"]) self.fields["type"].choices = constants.DOMAIN_TYPES self.field_widths = { "quota": 3, "default_mailbox_quota": 3 } self.user = user if len(args) and isinstance(args[0], QueryDict): self._load_from_qdict(args[0], "aliases", DomainNameField) elif "instance" in kwargs: d = kwargs["instance"] for pos, dalias in enumerate(d.domainalias_set.all()): name = "aliases_%d" % (pos + 1) self._create_field(forms.CharField, name, dalias.name, 3) def clean_name(self): """Check unicity and more.""" name = self.cleaned_data["name"].lower() label = lib.check_if_domain_exists( name, [(DomainAlias, _("domain alias"))]) if label is not None: raise forms.ValidationError( _("A %s with this name already exists") % force_text(label)) domains_must_have_authorized_mx = ( param_tools.get_global_parameter("domains_must_have_authorized_mx") ) if domains_must_have_authorized_mx and not self.user.is_superuser: if not lib.domain_has_authorized_mx(name): raise forms.ValidationError( _("No authorized MX record found for this domain")) return name def clean_enable_dkim(self): """Check prerequisites.""" enabled = self.cleaned_data.get("enable_dkim") if not enabled: return enabled storage_dir = param_tools.get_global_parameter("dkim_keys_storage_dir") if not storage_dir: raise forms.ValidationError( _("DKIM keys storage directory not configured")) return enabled def clean_quota(self): """Return proper quota value.""" return size2integer(self.cleaned_data["quota"], output_unit="MB") def clean_default_mailbox_quota(self): """Return proper quota value.""" return size2integer( self.cleaned_data["default_mailbox_quota"], output_unit="MB") def clean(self): """Custom fields validation. We want to prevent duplicate names between domains and domain aliases. The validation way is not very smart... """ cleaned_data = super(DomainFormGeneral, self).clean() if self._errors: return cleaned_data condition = ( self.cleaned_data["quota"] != 0 and self.cleaned_data["default_mailbox_quota"] > self.cleaned_data["quota"]) if condition: self.add_error( "default_mailbox_quota", _("Cannot be greater than domain quota")) elif self.user.role == "Resellers": limit = self.user.userobjectlimit_set.get(name="quota") if limit.max_value != 0: quota = self.cleaned_data["quota"] msg = _("You can't define an unlimited quota.") if quota == 0: self.add_error("quota", msg) default_mailbox_quota = self.cleaned_data[ "default_mailbox_quota"] if default_mailbox_quota == 0: self.add_error("default_mailbox_quota", msg) if self.cleaned_data["enable_dkim"]: if not self.cleaned_data.get("dkim_key_selector"): self.add_error( "dkim_key_selector", _("This field is required.")) self.aliases = [] copied_data = cleaned_data.copy() for k in copied_data.keys(): if not k.startswith("aliases"): continue if cleaned_data[k] == "": del cleaned_data[k] continue if cleaned_data[k] == self.cleaned_data["name"]: self._errors[k] = self.error_class( [_("A %s with this name already exists") % _("domain")] ) del cleaned_data[k] continue label = lib.check_if_domain_exists( cleaned_data[k], [(Domain, _("domain"))]) if label is not None: self.add_error( k, _("A %s with this name already exists") % force_text(label) ) else: self.aliases.append(cleaned_data[k]) return cleaned_data def save(self, user, commit=True, domalias_post_create=False): """Custom save method. Updating a domain may have consequences on other objects (domain alias, mailbox, quota). The most tricky part concerns quotas update. """ d = super(DomainFormGeneral, self).save(commit=False) core_signals.can_create_object.send( sender=self.__class__, context=user, klass=Domain, instance=d) if not commit: return d d.save() for dalias in d.domainalias_set.all(): if dalias.name not in self.aliases: dalias.delete() else: self.aliases.remove(dalias.name) if self.aliases: core_signals.can_create_object.send( self.__class__, context=user, klass=DomainAlias, count=len(self.aliases)) core_signals.can_create_object.send( self.__class__, context=d, object_type="domain_aliases", count=len(self.aliases)) for alias in self.aliases: if d.domainalias_set.filter(name=alias).exists(): continue options = {"creator": user} if domalias_post_create else {} DomainAlias(name=alias, target=d, enabled=d.enabled).save( **options) return d class DomainFormOptions(forms.Form): """A form containing options for domain creation.""" create_dom_admin = YesNoField( label=ugettext_lazy("Create a domain administrator"), initial=False, help_text=ugettext_lazy( "Automatically create an administrator for this domain" ) ) dom_admin_username = forms.CharField( label=ugettext_lazy("Name"), initial="admin", help_text=ugettext_lazy( "The administrator's name. Don't include the domain's name here, " "it will be automatically appended." ), required=False ) random_password = YesNoField( label=ugettext_lazy("Random password"), initial=False, help_text=ugettext_lazy( "Generate a random password for the administrator." ), required=False ) with_mailbox = YesNoField( label=ugettext_lazy("With a mailbox"), initial=True, help_text=ugettext_lazy( "Create a mailbox for the administrator." ), required=False ) create_aliases = YesNoField( label=ugettext_lazy("Create aliases"), initial=True, help_text=ugettext_lazy( "Automatically create standard aliases for this domain" ), required=False ) def __init__(self, user, *args, **kwargs): super(DomainFormOptions, self).__init__(*args, **kwargs) results = core_signals.user_can_set_role.send( sender=self.__class__, user=user, role="DomainAdmins") if False in [result[1] for result in results]: self.fields = {} return def clean_dom_admin_username(self): """Ensure admin username is an email address.""" if "@" in self.cleaned_data["dom_admin_username"]: raise forms.ValidationError(_("Invalid format")) return self.cleaned_data["dom_admin_username"] def clean(self): """Check required values.""" cleaned_data = super(DomainFormOptions, self).clean() if cleaned_data.get("create_dom_admin"): if not cleaned_data.get("dom_admin_username"): self.add_error( "dom_admin_username", _("This field is required.")) if "create_aliases" not in cleaned_data: self.add_error( "create_aliases", _("This field is required.")) return cleaned_data def save(self, *args, **kwargs): if not self.fields: return if not self.cleaned_data["create_dom_admin"]: return user = kwargs.pop("user") domain = kwargs.pop("domain") username = "%s@%s" % ( self.cleaned_data["dom_admin_username"], domain.name) try: da = User.objects.get(username=username) except User.DoesNotExist: pass else: raise Conflict(_("User '%s' already exists") % username) core_signals.can_create_object.send( self.__class__, context=user, klass=Mailbox) da = User(username=username, email=username, is_active=True) if self.cleaned_data["random_password"]: password = lib.make_password() else: password = param_tools.get_global_parameter( "default_password", app="core") da.set_password(password) da.save() da.role = "DomainAdmins" da.post_create(user) if self.cleaned_data["with_mailbox"]: dom_admin_username = self.cleaned_data["dom_admin_username"] mb = Mailbox( address=dom_admin_username, domain=domain, user=da, use_domain_quota=True ) mb.set_quota( override_rules=user.has_perm("admin.change_domain")) mb.save(creator=user) condition = ( domain.type == "domain" and self.cleaned_data["create_aliases"] and dom_admin_username != "postmaster" ) if condition: core_signals.can_create_object.send( self.__class__, context=user, klass=Alias) address = u"postmaster@{}".format(domain.name) alias = Alias.objects.create( address=address, domain=domain, enabled=True) alias.set_recipients([mb.full_address]) alias.post_create(user) domain.add_admin(da) class DomainForm(TabForms): """Domain edition form.""" def __init__(self, request, *args, **kwargs): self.user = request.user self.forms = [] self.domain = None if self.user.has_perm("admin.change_domain"): self.forms.append({ "id": "general", "title": _("General"), "formtpl": "admin/domain_general_form.html", "cls": DomainFormGeneral, "mandatory": True, "new_args": [request.user] }) cbargs = {"user": self.user} if "instances" in kwargs: self.domain = kwargs["instances"]["general"] cbargs["domain"] = self.domain results = signals.extra_domain_forms.send( sender=self.__class__, **cbargs) self.forms += reduce( lambda a, b: a + b, [result[1] for result in results]) if not self.forms: self.active_id = "admins" super(DomainForm, self).__init__(request, *args, **kwargs) def extra_context(self, context): """Add information to template context.""" context.update({ "title": self.domain.name, "action": reverse( "admin:domain_change", args=[self.domain.pk]), "formid": "domform", "domain": self.domain }) def is_valid(self): """Custom validation. We just save the current name before it is potentially modified. """ if "general" in self.instances: instance = self.instances["general"] instance.oldname = instance.name return super(DomainForm, self).is_valid() def save(self): """Custom save method. As forms interact with each other, it is easier to make custom code to save them. """ if not self.forms: return first_form = self.forms[0]["instance"] options = {} if isinstance(first_form, DomainFormGeneral): domain = first_form.save( self.request.user, domalias_post_create=True) options.update({"domain": domain}) else: first_form.save(self.request.user) for f in self.forms[1:]: f["instance"].save(self.request.user, **options) def done(self): return render_to_json_response(_("Domain modified")) class DomainWizard(WizardForm): """Domain creation wizard.""" def __init__(self, request): super(DomainWizard, self).__init__(request) self.add_step( WizardStep( "general", DomainFormGeneral, _("General"), "admin/domain_general_form.html", [request.user] ) ) results = signals.extra_domain_wizard_steps.send(sender=self.__class__) for result in results: for step in result[1]: self.add_step(step) self.add_step( WizardStep( "options", DomainFormOptions, _("Options"), "admin/domain_options_form.html", [self.request.user] ) ) def extra_context(self, context): context.update({ "title": _("New domain"), "action": reverse("admin:domain_add"), "formid": "domform" }) def done(self): genform = self.first_step.form domain = genform.save(self.request.user) domain.post_create(self.request.user) for step in self.steps[1:]: if not step.check_access(self): continue step.form.save(user=self.request.user, domain=domain) return render_to_json_response(_("Domain created"))