Server IP : 85.214.239.14 / Your IP : 3.147.126.199 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /srv/modoboa/env/lib64/python3.5/site-packages/modoboa/admin/ |
Upload File : |
"""Internal library for admin.""" import ipaddress import logging import random import string from functools import wraps from itertools import chain import dns.resolver from dns.name import IDNA_2008_UTS_46 from django.contrib.contenttypes.models import ContentType from django.db.models import Q from django.utils.encoding import smart_text from django.utils.translation import ugettext as _ from modoboa.core import signals as core_signals from modoboa.core.models import User from modoboa.lib.exceptions import PermDeniedException from modoboa.parameters import tools as param_tools from . import signals from .models import Alias, Domain, DomainAlias def needs_mailbox(): """Check if the current user owns at least one mailbox Some applications (the webmail for example) need a mailbox to work. """ def decorator(f): @wraps(f) def wrapped_f(request, *args, **kwargs): if hasattr(request.user, "mailbox"): return f(request, *args, **kwargs) raise PermDeniedException(_("A mailbox is required")) return wrapped_f return decorator def get_identities(user, searchquery=None, idtfilter=None, grpfilter=None): """Return all the identities owned by a user. :param user: the desired user :param str searchquery: search pattern :param list idtfilter: identity type filters :param list grpfilter: group names filters :return: a queryset """ accounts = [] if idtfilter is None or not idtfilter or idtfilter == "account": ids = user.objectaccess_set \ .filter(content_type=ContentType.objects.get_for_model(user)) \ .values_list("object_id", flat=True) q = Q(pk__in=ids) if searchquery is not None: q &= Q(username__icontains=searchquery) \ | Q(email__icontains=searchquery) if grpfilter is not None and grpfilter: if grpfilter == "SuperAdmins": q &= Q(is_superuser=True) else: q &= Q(groups__name=grpfilter) accounts = User.objects.filter(q).prefetch_related("groups") aliases = [] if idtfilter is None or not idtfilter \ or (idtfilter in ["alias", "forward", "dlist"]): alct = ContentType.objects.get_for_model(Alias) ids = user.objectaccess_set.filter(content_type=alct) \ .values_list("object_id", flat=True) q = Q(pk__in=ids, internal=False) if searchquery is not None: q &= ( Q(address__icontains=searchquery) | Q(domain__name__icontains=searchquery) ) aliases = Alias.objects.select_related("domain").filter(q) if idtfilter is not None and idtfilter: aliases = [al for al in aliases if al.type == idtfilter] return chain(accounts, aliases) def get_domains(user, domfilter=None, searchquery=None, **extrafilters): """Return all the domains the user can access. :param ``User`` user: user object :param str searchquery: filter :rtype: list :return: a list of domains and/or relay domains """ domains = ( Domain.objects.get_for_admin(user).prefetch_related("domainalias_set")) if domfilter: domains = domains.filter(type=domfilter) if searchquery is not None: q = Q(name__contains=searchquery) q |= Q(domainalias__name__contains=searchquery) domains = domains.filter(q).distinct() results = signals.extra_domain_qset_filters.send( sender="get_domains", domfilter=domfilter, extrafilters=extrafilters) if results: qset_filters = {} for result in results: qset_filters.update(result[1]) domains = domains.filter(**qset_filters) return domains def check_if_domain_exists(name, dtypes): """Check if a domain already exists. We not only look for domains, we also look for every object that could conflict with a domain (domain alias, etc.) """ for dtype, label in dtypes: if dtype.objects.filter(name=name).exists(): return label return None def import_domain(user, row, formopts): """Specific code for domains import""" if not user.has_perm("admin.add_domain"): raise PermDeniedException(_("You are not allowed to import domains")) core_signals.can_create_object.send( sender="import", context=user, klass=Domain) dom = Domain() dom.from_csv(user, row) def import_domainalias(user, row, formopts): """Specific code for domain aliases import""" if not user.has_perm("admin.add_domainalias"): raise PermDeniedException( _("You are not allowed to import domain aliases.")) core_signals.can_create_object.send( sender="import", context=user, klass=DomainAlias) domalias = DomainAlias() domalias.from_csv(user, row) def import_account(user, row, formopts): """Specific code for accounts import""" account = User() account.from_csv(user, row, formopts["crypt_password"]) def _import_alias(user, row, **kwargs): """Specific code for aliases import""" alias = Alias() alias.from_csv(user, row, **kwargs) def import_alias(user, row, formopts): _import_alias(user, row, expected_elements=4) def import_forward(user, row, formopts): _import_alias(user, row, expected_elements=4) def import_dlist(user, row, formopts): _import_alias(user, row) def get_dns_resolver(): """Return a DNS resolver object.""" dns_server = param_tools.get_global_parameter("custom_dns_server") if dns_server: resolver = dns.resolver.Resolver() resolver.nameservers = [dns_server] else: resolver = dns.resolver return resolver def get_dns_records(name, typ, resolver=None): """Retrieve DNS records for given name and type.""" logger = logging.getLogger("modoboa.admin") if not resolver: resolver = get_dns_resolver() try: dns_answers = resolver.query(name, typ) except dns.resolver.NXDOMAIN as e: logger.error(_("No DNS record found for %s") % name, exc_info=e) except dns.resolver.NoAnswer as e: logger.error( _("No %(type)s record for %(name)s") % {"type": typ, "name": name}, exc_info=e ) except dns.resolver.NoNameservers as e: logger.error(_("No working name servers found"), exc_info=e) except dns.resolver.Timeout as e: logger.warning( _("DNS resolution timeout, unable to query %s at the moment") % name, exc_info=e) else: return dns_answers return None def get_domain_mx_list(domain): """Return a list of MX IP address for domain.""" result = [] logger = logging.getLogger("modoboa.admin") resolver = get_dns_resolver() dns_answers = get_dns_records(domain, "MX", resolver) if dns_answers is None: return result for dns_answer in dns_answers: mx_domain = dns_answer.exchange.to_unicode( omit_final_dot=True, idna_codec=IDNA_2008_UTS_46) for rtype in ["A", "AAAA"]: ip_answers = get_dns_records(mx_domain, rtype, resolver) if not ip_answers: continue for ip_answer in ip_answers: try: address_smart = smart_text(ip_answer.address) mx_ip = ipaddress.ip_address(address_smart) except ValueError as e: logger.warning( _("Invalid IP address format for " "{domain}; {addr}").format( domain=mx_domain, addr=smart_text(ip_answer.address) ), exc_info=e) else: result.append((mx_domain, mx_ip)) return result def domain_has_authorized_mx(name): """Check if domain has authorized mx record at least.""" valid_mxs = param_tools.get_global_parameter("valid_mxs") valid_mxs = [ipaddress.ip_network(smart_text(v.strip())) for v in valid_mxs.split() if v.strip()] domain_mxs = get_domain_mx_list(name) for _mx_addr, mx_ip_addr in domain_mxs: for subnet in valid_mxs: if mx_ip_addr in subnet: return True return False def make_password(): """Create a random password.""" length = int( param_tools.get_global_parameter("random_password_length", app="core") ) return "".join( random.SystemRandom().choice( string.ascii_letters + string.digits) for _ in range(length))