Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 18.221.217.100
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /proc/3/task/3/cwd/srv/modoboa/env/lib/python3.5/site-packages/modoboa_webmail/lib/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /proc/3/task/3/cwd/srv/modoboa/env/lib/python3.5/site-packages/modoboa_webmail/lib/imapemail.py
"""
Set of classes to manipulate/display emails inside the webmail.
"""
import os
import re
import email

import chardet
import six

from django.conf import settings
from django.core.files.base import ContentFile
from django.core.files.storage import default_storage
from django.urls import reverse
from django.utils.encoding import smart_text
from django.utils.html import conditional_escape
from django.utils.translation import ugettext as _

from modoboa.core.extensions import exts_pool
from modoboa.lib import u2u_decode
from modoboa.lib.email_utils import Email, EmailAddress

from . import imapheader
from .imaputils import (
    get_imapconnector, BodyStructure
)
from .utils import decode_payload


class ImapEmail(Email):

    """
    A class to represent an email fetched from an IMAP server.

    The message is identified by a ``"<mailbox>:<uid>"`` string which is
    split into ``self.mbox`` and ``self.mailid`` at construction time.
    Headers, body structure, body and inline images are retrieved from
    the server through the connector returned by ``get_imapconnector``.
    """

    # (header name, displayed) pairs. Headers flagged ``True`` are
    # appended to ``self.headers``; every non-empty header is also
    # exposed as an attribute (dashes replaced by underscores).
    headernames = [
        ('From', True),
        ('To', True),
        ('Cc', True),
        ('Date', True),
        ('Subject', True),
    ]

    def __init__(self, request, *args, **kwargs):
        """Bind the email to a request and an IMAP connection.

        :param request: current HTTP request (used to get the connector
            and the user's parameters)

        Remaining arguments are forwarded to the parent constructor,
        which is expected to set ``self.mailid`` to ``"<mailbox>:<uid>"``.
        """
        super(ImapEmail, self).__init__(*args, **kwargs)
        self.request = request
        self.imapc = get_imapconnector(request)
        # Split on the LAST colon only: a mailbox name may itself
        # contain ":" whereas the trailing uid never does.
        self.mbox, self.mailid = self.mailid.rsplit(":", 1)

    def _insert_contact_links(self, addresses):
        """Insert 'add to address book' links.

        :param addresses: iterable of already rendered addresses
        :return: a new list where each address is followed by the link
        """
        link = (
            u" <a class='addcontact' href='{}' title='{}'>"
            u"<span class='fa fa-vcard'></span></a>"
        ).format(reverse("api:contact-list"), _("Add to contacts"))
        # The link is identical for every address, so build it once.
        return [address + link for address in addresses]

    def fetch_headers(self, raw_addresses=False):
        """Fetch message headers from server.

        Each non-empty header listed in ``headernames`` is stored as an
        attribute of this object; displayed ones are also appended to
        ``self.headers``. The body structure is loaded from the same
        server response.

        :param raw_addresses: forwarded to the header parsers as ``raw``;
            when true, no 'add to contacts' link is inserted
        """
        msg = self.imapc.fetchmail(
            self.mbox, self.mailid, readonly=False,
            what=self.headers_as_text
        )
        headers = msg["BODY[HEADER.FIELDS ({})]".format(self.headers_as_text)]
        self.fetch_body_structure(msg)
        parsed = email.message_from_string(headers)
        contacts_plugin_installed = exts_pool.get_extension("modoboa_contacts")
        headers_with_address = ("From", "To", "Cc", "Reply-To")
        for label, displayed in self.headernames:
            hdrvalue = self.get_header(parsed, label, raw=raw_addresses)
            if not hdrvalue:
                # Missing/empty header: no attribute is created for it.
                continue
            if displayed:
                safe = False
                if label in headers_with_address:
                    if contacts_plugin_installed and not raw_addresses:
                        hdrvalue = self._insert_contact_links(hdrvalue)
                    hdrvalue = ", ".join(hdrvalue)
                    safe = True
                self.headers += [
                    {"name": label, "value": hdrvalue, "safe": safe}]
            setattr(self, label.replace("-", "_"), hdrvalue)

    def get_header(self, msg, hdrname, **kwargs):
        """Look for a particular header.

        We also try to decode the default value using the matching
        ``parse_<name>`` function of the ``imapheader`` module, if any.

        :param msg: parsed message to read the header from
        :param hdrname: header name (dashes allowed)
        :return: the parsed value, the raw value when no parser exists,
            or an empty string when the header is missing
        """
        hdrvalue = super(ImapEmail, self).get_header(msg, hdrname)
        if not hdrvalue:
            return ""
        key = hdrname.replace("-", "_").lower()
        try:
            hdrvalue = getattr(imapheader, "parse_%s" % key)(
                hdrvalue, **kwargs)
        except AttributeError:
            # No dedicated parser for this header (an AttributeError
            # raised by a parser is deliberately tolerated as well):
            # keep the undecoded value.
            pass
        return hdrvalue

    def fetch_body_structure(self, msg=None):
        """Fetch BODYSTRUCTURE for email.

        :param msg: an already fetched server response containing a
            ``BODYSTRUCTURE`` entry; fetched from the server when omitted

        Sets ``self.bs``, fills ``self.attachments`` and selects the
        format (``self.mformat``) that will be used to display the body.
        """
        if msg is None:
            msg = self.imapc.fetchmail(
                self.mbox, self.mailid, readonly=False
            )
        self.bs = BodyStructure(msg["BODYSTRUCTURE"])
        self._find_attachments()
        if self.dformat not in ["plain", "html"]:
            # dformat holds the name of a user parameter, resolve it.
            self.dformat = self.request.user.parameters.get_value(
                self.dformat)
        fallback_fmt = "html" if self.dformat == "plain" else "plain"
        # Use the preferred format when the message provides it.
        self.mformat = (
            self.dformat if self.dformat in self.bs.contents else fallback_fmt)

    @property
    def headers_as_list(self):
        """Return the upper-cased names of the headers to fetch."""
        return [hdr[0].upper() for hdr in self.headernames]

    @property
    def headers_as_text(self):
        """Return the headers to fetch as a space separated string."""
        return " ".join(self.headers_as_list)

    def _decode_content(self, content, part):
        """Return the payload of ``part`` as text.

        The declared charset is tried first, then the encoding guessed
        by chardet, and finally UTF-8 with replacement characters so
        that a missing or wrong charset never prevents display.
        """
        if isinstance(content, six.text_type):
            return content
        candidates = [self._find_content_charset(part)]
        for position in range(2):
            charset = candidates[position]
            if charset:
                try:
                    return content.decode(charset)
                except (UnicodeDecodeError, LookupError):
                    pass
            if position == 0:
                # Only ask chardet when the declared charset failed
                # or is absent (detection may return None).
                candidates.append(chardet.detect(content)["encoding"])
        return content.decode("utf-8", "replace")

    @property
    def body(self):
        """Load email's body.

        This operation has to be made "on demand" because it requires
        a communication with the IMAP server. The result is cached.
        """
        if self._body is not None:
            return self._body
        self.fetch_body_structure()
        chunks = []
        for part in self.bs.contents.get(self.mformat, []):
            pnum = part["pnum"]
            uid = int(self.mailid)
            data = self.imapc._cmd(
                "FETCH", self.mailid, "(BODY.PEEK[%s])" % pnum
            )
            if not data or uid not in data:
                continue
            content = decode_payload(
                part["encoding"], data[uid]["BODY[%s]" % pnum]
            )
            chunks.append(self._decode_content(content, part))
        bodyc = u"".join(chunks)
        self._fetch_inlines()
        bodyc = getattr(self, "_post_process_%s" % self.mformat)(bodyc)
        self._body = getattr(self, "viewmail_%s" % self.mformat)(
            bodyc, links=self.links
        )
        return self._body

    @body.setter
    def body(self, value):
        self._body = value

    @property
    def source(self):
        """Retrieve email source."""
        return self.imapc.fetchmail(
            self.mbox, self.mailid, what="source")['BODY[]']

    def _find_content_charset(self, part):
        """Return the charset declared in ``part`` parameters, or None.

        Parameters are scanned as a flat sequence where a value follows
        its key. The key is matched case-insensitively.
        """
        params = part["params"] if "params" in part else ()
        for pos, elem in enumerate(params):
            if not isinstance(elem, six.string_types):
                continue
            # A trailing "charset" key without value is ignored.
            if elem.lower() == "charset" and pos + 1 < len(params):
                return params[pos + 1]
        return None

    @staticmethod
    def _extract_filename(params, prefix):
        """Return the decoded value following the first key starting
        with ``prefix`` in ``params``, or None when there is none."""
        for pos, value in enumerate(params):
            if not value.startswith(prefix):
                continue
            if pos + 1 >= len(params):
                return None
            return u2u_decode.u2u_decode(params[pos + 1]).strip("\r\t\n")
        return None

    def _find_attachments(self):
        """Retrieve attachments from the parsed body structure.

        We try to find and decode a file name for each attachment,
        first in the content type parameters (``name``), then in the
        content disposition parameters (``filename``). If we failed, a
        generic name will be used (ie. part_1, part_2, ...).
        """
        for att in self.bs.attachments:
            attname = None
            if "params" in att and att["params"] != "NIL":
                attname = self._extract_filename(att["params"], "name")
            # Fall back to the disposition whenever the content type
            # did not provide a usable name.
            if (
                not attname and
                "disposition" in att and len(att["disposition"]) > 1
            ):
                attname = self._extract_filename(
                    att["disposition"][1], "filename")
            if not attname:
                attname = "part_{}".format(att["pnum"])
            self.attachments[att["pnum"]] = smart_text(attname)

    def _fetch_inlines(self):
        """Store inline images on filesystem to display them.

        Each inline part gets a ``fname`` entry holding the URL of the
        stored file. Parts already stored are not fetched again.
        """
        for cid, params in list(self.bs.inlines.items()):
            if ".." in cid:
                # Refuse content ids that could escape the media folder.
                continue
            fname = "modoboa_webmail/%s_%s" % (self.mailid, cid)
            path = os.path.join(settings.MEDIA_ROOT, fname)
            params["fname"] = os.path.join(settings.MEDIA_URL, fname)
            if default_storage.exists(path):
                continue
            _, content = self.imapc.fetchpart(
                self.mailid, self.mbox, params["pnum"]
            )
            default_storage.save(
                path, ContentFile(decode_payload(params["encoding"], content)))

    def _map_cid(self, url):
        """Replace a ``cid:`` URL by the URL of the stored inline file.

        The given URL is returned unchanged when it does not reference
        a known inline part.
        """
        m = re.match(".*cid:(.+)", url)
        if m and m.group(1) in self.bs.inlines:
            return self.bs.inlines[m.group(1)]["fname"]
        return url

    def fetch_attachment(self, pnum):
        """Fetch an attachment from the IMAP server."""
        return self.imapc.fetchpart(self.mailid, self.mbox, pnum)


class Modifier(ImapEmail):
    """Message modifier.

    Base class for objects that load an existing message in order to
    pre-fill a compose form. The display format is taken from the
    user's "editor" parameter, headers are fetched with raw addresses,
    and the ``_modify_<format>`` hook matching the display format is
    then called.
    """

    def __init__(self, form, request, *args, **kwargs):
        """Load the message and adapt its body for the editor.

        :param form: compose form, kept as ``self.form``
        :param request: current HTTP request
        """
        editor = request.user.parameters.get_value("editor")
        kwargs["dformat"] = editor
        super(Modifier, self).__init__(request, *args, **kwargs)
        self.form = form
        self.fetch_headers(raw_addresses=True)
        hook = getattr(self, "_modify_%s" % self.dformat)
        hook()

    def _modify_plain(self):
        """Strip ``<pre>`` tags from the body."""
        stripped = re.sub("</?pre>", "", self.body)
        self.body = stripped

    def _modify_html(self):
        """Turn a plain text body into HTML.

        Nothing is done unless HTML is wanted and the message had to be
        loaded in another format.
        """
        if self.dformat != "html" or self.mformat == self.dformat:
            return
        without_pre = re.sub("</?pre>", "", self.body)
        self.body = re.sub("\n", "<br>", without_pre)

    @property
    def subject(self):
        """Just a shortcut to return a subject in any case."""
        return getattr(self, "Subject", "")


class ReplyModifier(Modifier):
    """Modify a message to reply to it.

    Pre-fills the ``to``, ``cc``, ``subject`` and ``origmsgid`` fields of
    the compose form and builds the "<sender> wrote:" text header.
    Header attributes only exist when the corresponding header is
    present in the original message, so every one of them is read
    defensively.
    """

    headernames = ImapEmail.headernames + [
        ("Reply-To", True),
        ("Message-ID", False)
    ]

    def __init__(self, *args, **kwargs):
        """Load the original message and initialise the reply form."""
        super(ReplyModifier, self).__init__(*args, **kwargs)

        # A message may lack a From header: fall back to an empty sender
        # instead of failing with AttributeError.
        sender = getattr(self, "From", "")
        # NOTE(review): in HTML mode the sender is inserted unescaped,
        # unlike what ForwardModifier does - confirm this is intended.
        self.textheader = u"%s %s" % (sender, _("wrote:"))
        if self.dformat == "html":
            self.textheader = u"<p>{}</p>".format(self.textheader)

        fields = self.form.fields
        if hasattr(self, "Message_ID"):
            fields["origmsgid"].initial = self.Message_ID
        # Reply-To takes precedence over From when present.
        fields["to"].initial = getattr(self, "Reply_To", sender)
        if self.request.GET.get("all", "0") == "1":  # reply-all
            fields["cc"].initial = self._reply_all_recipients()
        if re.match(r"re\s*:\s*.+", self.subject.lower()):
            # Subject is already prefixed, do not stack prefixes.
            fields["subject"].initial = self.subject
        else:
            fields["subject"].initial = "Re: %s" % self.subject

    def _reply_all_recipients(self):
        """Return the Cc value of a reply-all as a comma separated string.

        Recipients come from the To and Cc headers of the original
        message; the current user and blank entries are left out.
        """
        candidates = getattr(self, "To", "").split(",")
        candidates += getattr(self, "Cc", "").split(",")
        username = self.request.user.username
        recipients = []
        for raw in candidates:
            if not raw.strip():
                continue
            parsed = EmailAddress(raw)
            if parsed.address and parsed.address == username:
                continue
            if parsed.fulladdress:
                recipients.append(parsed.fulladdress)
        return ", ".join(recipients)

    def _modify_plain(self):
        """Quote the original body by prefixing each line with ``>``."""
        super(ReplyModifier, self)._modify_plain()
        self.body = "\n".join(
            ">%s" % line for line in self.body.split('\n'))


class ForwardModifier(Modifier):

    """Modify a message so it can be forwarded.

    Builds ``self.textheader``, a summary of the original message
    headers rendered in the display format, and pre-fills the subject of
    the compose form with a ``Fwd:`` prefix.
    """

    def __init__(self, *args, **kwargs):
        """Load the original message and initialise the forward form."""
        super(ForwardModifier, self).__init__(*args, **kwargs)
        self._header()
        self.form.fields["subject"].initial = u"Fwd: %s" % self.subject

    def __getfunc(self, name):
        """Return the ``<name>_<display format>`` method of this object."""
        return getattr(self, "%s_%s" % (name, self.dformat))

    def _header(self):
        """Build the "original message" header.

        The subject is always included; Date, From, To and Reply-To are
        only included when the original message provides them (the
        corresponding attribute does not exist otherwise).
        """
        render_line = self.__getfunc("_header_line")
        chunks = [
            u"{}\n".format(self.__getfunc("_header_begin")()),
            render_line(_("Subject"), self.subject),
        ]
        date = getattr(self, "Date", None)
        if date is not None:
            chunks.append(render_line(_("Date"), date))
        for hdr in ["From", "To", "Reply-To"]:
            value = getattr(self, hdr.replace("-", "_"), None)
            if value is None:
                continue
            chunks.append(render_line(_(hdr), value))
        chunks.append(self.__getfunc("_header_end")())
        self.textheader = u"".join(chunks)

    def _header_begin_plain(self):
        """Return the first line of the header (plain text)."""
        return u"----- %s -----" % _("Original message")

    def _header_begin_html(self):
        """Return the first line of the header (HTML)."""
        return u"----- %s -----" % _("Original message")

    def _header_line_plain(self, key, value):
        """Render one header as a plain text line."""
        return u"%s: %s\n" % (key, value)

    def _header_line_html(self, key, value):
        """Render one header as an HTML paragraph (value is escaped)."""
        return u"<p>%s: %s</p>" % (key, conditional_escape(value))

    def _header_end_plain(self):
        """Return the text closing the header (plain text)."""
        return u"\n"

    def _header_end_html(self):
        """Return the text closing the header (HTML)."""
        return u""

Anon7 - 2022
AnonSec Team