Server IP : 85.214.239.14 / Your IP : 3.149.232.219 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /srv/modoboa/env/lib64/python3.5/site-packages/modoboa_amavis/ |
Upload File : |
# -*- coding: utf-8 -*- import os import re import socket import struct from email.utils import parseaddr from functools import wraps import idna from django.conf import settings from django.contrib.auth.views import redirect_to_login from django.urls import reverse from django.utils import six from django.utils.translation import ugettext as _ from modoboa.admin import models as admin_models from modoboa.lib.email_utils import ( split_address, split_local_part, split_mailbox ) from modoboa.lib.exceptions import InternalError from modoboa.lib.sysutils import exec_cmd from modoboa.lib.web_utils import NavigationParameters from modoboa.parameters import tools as param_tools from .models import Policy, Users from .utils import smart_bytes, smart_text def selfservice(ssfunc=None): """Decorator used to expose views to the 'self-service' feature The 'self-service' feature allows users to act on quarantined messages without beeing authenticated. This decorator only acts as a 'router'. :param ssfunc: the function to call if the 'self-service' pre-requisites are satisfied """ def decorator(f): @wraps(f) def wrapped_f(request, *args, **kwargs): secret_id = request.GET.get("secret_id") if not secret_id and request.user.is_authenticated: return f(request, *args, **kwargs) if not param_tools.get_global_parameter("self_service"): return redirect_to_login( reverse("modoboa_amavis:index") ) return ssfunc(request, *args, **kwargs) return wrapped_f return decorator class AMrelease(object): def __init__(self): conf = dict(param_tools.get_global_parameters("modoboa_amavis")) try: if conf["am_pdp_mode"] == "inet": self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.connect((conf["am_pdp_host"], conf["am_pdp_port"])) else: self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) self.sock.connect(conf["am_pdp_socket"]) except socket.error as err: raise InternalError( _("Connection to amavis failed: %s" % str(err)) ) def decode(self, answer): def repl(match): return struct.pack("B", int(match.group(0)[1:], 16)) return re.sub(br"%([0-9a-fA-F]{2})", repl, answer) def __del__(self): self.sock.close() def sendreq(self, mailid, secretid, recipient, *others): self.sock.send( smart_bytes("""request=release mail_id=%s secret_id=%s quar_type=Q recipient=%s """ % (smart_text(mailid), smart_text(secretid), smart_text(recipient)))) answer = self.sock.recv(1024) answer = self.decode(answer) if re.search(br"250 [\d\.]+ Ok", answer): return True return False class SpamassassinClient(object): """A stupid spamassassin client.""" def __init__(self, user, recipient_db): """Constructor.""" conf = dict(param_tools.get_global_parameters("modoboa_amavis")) self._sa_is_local = conf["sa_is_local"] self._default_username = conf["default_user"] self._recipient_db = recipient_db self._setup_cache = {} self._username_cache = [] if user.role == "SimpleUsers": if conf["user_level_learning"]: self._username = user.email else: self._username = None self.error = None if self._sa_is_local: self._learn_cmd = self._find_binary("sa-learn") self._learn_cmd += " --{0} --no-sync -u {1}" self._learn_cmd_kwargs = {} self._expected_exit_codes = [0] self._sync_cmd = self._find_binary("sa-learn") self._sync_cmd += " -u {0} --sync" else: self._learn_cmd = self._find_binary("spamc") self._learn_cmd += " -d {0} -p {1}".format( conf["spamd_address"], conf["spamd_port"] ) self._learn_cmd += " -L {0} -u {1}" self._learn_cmd_kwargs = {} self._expected_exit_codes = [5, 6] def _find_binary(self, name): """Find path to binary.""" code, output = exec_cmd("which {}".format(name)) if not code: return smart_text(output).strip() known_paths = getattr(settings, "SA_LOOKUP_PATH", ("/usr/bin", )) for path in known_paths: bpath = os.path.join(path, name) if os.path.isfile(bpath) and os.access(bpath, os.X_OK): return bpath raise InternalError(_("Failed to find {} binary").format(name)) def _get_mailbox_from_rcpt(self, rcpt): """Retrieve a mailbox from a recipient address.""" local_part, domname, extension = ( split_mailbox(rcpt, return_extension=True)) try: mailbox = admin_models.Mailbox.objects.select_related( "domain").get(address=local_part, domain__name=domname) except admin_models.Mailbox.DoesNotExist: alias = admin_models.Alias.objects.filter( address="{}@{}".format(local_part, domname), aliasrecipient__r_mailbox__isnull=False).first() if not alias: raise InternalError(_("No recipient found")) if alias.type != "alias": return None mailbox = alias.aliasrecipient_set.filter( r_mailbox__isnull=False).first() return mailbox def _get_domain_from_rcpt(self, rcpt): """Retrieve a domain from a recipient address.""" local_part, domname = split_mailbox(rcpt) domain = admin_models.Domain.objects.filter(name=domname).first() if not domain: raise InternalError(_("Local domain not found")) return domain def _learn(self, rcpt, msg, mtype): """Internal method to call the learning command.""" if self._username is None: if self._recipient_db == "global": username = self._default_username elif self._recipient_db == "domain": domain = self._get_domain_from_rcpt(rcpt) username = domain.name condition = ( username not in self._setup_cache and setup_manual_learning_for_domain(domain)) if condition: self._setup_cache[username] = True else: mbox = self._get_mailbox_from_rcpt(rcpt) if mbox is None: username = self._default_username else: if isinstance(mbox, admin_models.Mailbox): username = mbox.full_address elif isinstance(mbox, admin_models.AliasRecipient): username = mbox.address else: username = None condition = ( username is not None and username not in self._setup_cache and setup_manual_learning_for_mbox(mbox)) if condition: self._setup_cache[username] = True else: username = self._username if username not in self._setup_cache: mbox = self._get_mailbox_from_rcpt(username) if mbox and setup_manual_learning_for_mbox(mbox): self._setup_cache[username] = True if username not in self._username_cache: self._username_cache.append(username) cmd = self._learn_cmd.format(mtype, username) code, output = exec_cmd( cmd, pinput=smart_bytes(msg), **self._learn_cmd_kwargs) if code in self._expected_exit_codes: return True self.error = smart_text(output) return False def learn_spam(self, rcpt, msg): """Learn new spam.""" return self._learn(rcpt, msg, "spam") def learn_ham(self, rcpt, msg): """Learn new ham.""" return self._learn(rcpt, msg, "ham") def done(self): """Call this method at the end of the processing.""" if self._sa_is_local: for username in self._username_cache: cmd = self._sync_cmd.format(username) exec_cmd(cmd, **self._learn_cmd_kwargs) class QuarantineNavigationParameters(NavigationParameters): """ Specific NavigationParameters subclass for the quarantine. """ def __init__(self, request): super(QuarantineNavigationParameters, self).__init__( request, "quarantine_navparams" ) self.parameters += [ ("pattern", "", False), ("criteria", "from_addr", False), ("msgtype", None, False), ("viewrequests", None, False) ] def _store_page(self): """Specific method to store the current page.""" if self.request.GET.get("reset_page", None) or "page" not in self: self["page"] = 1 else: page = self.request.GET.get("page", None) if page is not None: self["page"] = int(page) def back_to_listing(self): """Return the current listing URL. Looks into the user's session and the current request to build the URL. :return: a string """ url = "listing" params = [] navparams = self.request.session[self.sessionkey] if "page" in navparams: params += ["page=%s" % navparams["page"]] if "order" in navparams: params += ["sort_order=%s" % navparams["order"]] params += ["%s=%s" % (p[0], navparams[p[0]]) for p in self.parameters if p[0] in navparams] if params: url += "?%s" % ("&".join(params)) return url def create_user_and_policy(name, priority=7): """Create records. Create two records (a user and a policy) using :keyword:`name` as an identifier. :param str name: name :return: the new ``Policy`` object """ if Users.objects.filter(email=name).exists(): return Policy.objects.get(policy_name=name[:32]) policy = Policy.objects.create(policy_name=name[:32]) Users.objects.create( email=name, fullname=name, priority=priority, policy=policy ) return policy def create_user_and_use_policy(name, policy, priority=7): """Create a *users* record and use an existing policy. :param str name: user record name :param str policy: string or Policy instance """ if isinstance(policy, six.string_types): policy = Policy.objects.get(policy_name=policy[:32]) Users.objects.get_or_create( email=name, fullname=name, priority=priority, policy=policy ) def update_user_and_policy(oldname, newname): """Update records. :param str oldname: old name :param str newname: new name """ if oldname == newname: return u = Users.objects.get(email=oldname) u.email = newname u.fullname = newname u.policy.policy_name = newname[:32] u.policy.save(update_fields=["policy_name"]) u.save() def delete_user_and_policy(name): """Delete records. :param str name: identifier """ try: u = Users.objects.get(email=name) except Users.DoesNotExist: return u.policy.delete() u.delete() def delete_user(name): """Delete a *users* record. :param str name: user record name """ try: Users.objects.get(email=name).delete() except Users.DoesNotExist: pass def manual_learning_enabled(user): """Check if manual learning is enabled or not. Also check for :kw:`user` if necessary. :return: True if learning is enabled, False otherwise. """ conf = dict(param_tools.get_global_parameters("modoboa_amavis")) if not conf["manual_learning"]: return False if user.role != "SuperAdmins": if user.has_perm("admin.view_domains"): manual_learning = ( conf["domain_level_learning"] or conf["user_level_learning"]) else: manual_learning = conf["user_level_learning"] return manual_learning return True def setup_manual_learning_for_domain(domain): """Setup manual learning if necessary. :return: True if learning has been setup, False otherwise """ if Policy.objects.filter(sa_username=domain.name).exists(): return False policy = Policy.objects.get(policy_name="@{}".format(domain.name[:32])) policy.sa_username = domain.name policy.save() return True def setup_manual_learning_for_mbox(mbox): """Setup manual learning if necessary. :return: True if learning has been setup, False otherwise """ result = False if (isinstance(mbox, admin_models.AliasRecipient) and mbox.r_mailbox is not None): mbox = mbox.r_mailbox if isinstance(mbox, admin_models.Mailbox): pname = mbox.full_address[:32] if not Policy.objects.filter(policy_name=pname).exists(): policy = create_user_and_policy(pname) policy.sa_username = mbox.full_address policy.save() for alias in mbox.alias_addresses: create_user_and_use_policy(alias, policy) result = True return result def make_query_args(address, exact_extension=True, wildcard=None, domain_search=False): assert isinstance(address, six.text_type),\ "address should be of type %s" % six.text_type.__name__ conf = dict(param_tools.get_global_parameters("modoboa_amavis")) local_part, domain = split_address(address) if not conf["localpart_is_case_sensitive"]: local_part = local_part.lower() if domain: domain = domain.lstrip("@").rstrip(".") domain = domain.lower() orig_domain = domain domain = idna.encode(domain, uts46=True).decode("ascii") delimiter = conf["recipient_delimiter"] local_part, extension = split_local_part(local_part, delimiter=delimiter) query_args = [] if ( conf["localpart_is_case_sensitive"] or (domain and domain != orig_domain) ): query_args.append(address) if extension: query_args.append("%s%s%s@%s" % ( local_part, delimiter, extension, domain)) if delimiter and not exact_extension and wildcard: query_args.append("%s%s%s@%s" % ( local_part, delimiter, wildcard, domain)) query_args.append("%s@%s" % (local_part, domain)) if domain_search: query_args.append("@%s" % domain) query_args.append("@.") return query_args def cleanup_email_address(address): address = parseaddr(address) if address[0]: return "%s <%s>" % address return address[1]