Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 52.14.213.73
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /proc/2/cwd/proc/2/cwd/srv/modoboa/env/lib/python3.5/site-packages/modoboa_webmail/lib/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /proc/2/cwd/proc/2/cwd/srv/modoboa/env/lib/python3.5/site-packages/modoboa_webmail/lib/imaputils.py
# coding: utf-8
"""
:mod:`imaputils` --- Extra IMAPv4 utilities
-------------------------------------------
"""

import email
from functools import wraps
import imaplib
import re
import socket
import ssl
import time

import six

from django.utils.encoding import smart_bytes
from django.utils.translation import ugettext as _

from modoboa.lib import imap_utf7  # noqa
from modoboa.lib.connections import ConnectionsManager
from modoboa.lib.exceptions import InternalError
from modoboa.parameters import tools as param_tools

from ..exceptions import ImapError, WebmailInternalError
from .fetch_parser import FetchResponseParser

# imaplib.Debug = 4

# imaplib rejects server lines longer than its private ``_MAXLINE`` limit
# ("got more than 10000 bytes" exception). Raise that limit to 1M, the value
# used by the latest python versions, without ever lowering a larger one.
MAXLINE = 1000000
if getattr(imaplib, "_MAXLINE", MAXLINE) < MAXLINE:
    imaplib._MAXLINE = MAXLINE


class capability(object):

    """
    Decorator selecting an implementation from the server capabilities.

    The decorated method only runs when ``name`` is found in the
    ``capabilities`` attribute of the object it is called on. Otherwise
    the attribute called ``fallback_method`` is looked up on that object
    and called instead.

    :param name: the capability name (upper case)
    :param fallback_method: a method's name
    """

    def __init__(self, name, fallback_method):
        self.name = name
        self.fallback_method = fallback_method

    def __call__(self, method):
        @wraps(method)
        def wrapped_func(cls, *args, **kwargs):
            if self.name not in cls.capabilities:
                # NOTE(review): the fallback is looked up on the object and
                # then receives that same object as an explicit argument;
                # positional arguments are not forwarded, only keywords.
                fallback = getattr(cls, self.fallback_method)
                return fallback(cls, **kwargs)
            return method(cls, *args, **kwargs)

        return wrapped_func


class BodyStructure(object):

    """
    BODYSTRUCTURE response parser.

    Just a simple class that tries to distinguish content parts from
    attachments. After loading a definition:

    * ``contents`` maps ``"plain"`` / ``"html"`` to a list of text parts
    * ``inlines`` maps a content id to a part embedded in a
      ``multipart/related`` container
    * ``attachments`` lists every other part
    """

    def __init__(self, definition=None):
        self.is_multipart = False
        self.contents = {}
        self.attachments = []
        self.inlines = {}

        if definition is not None:
            self.load_from_definition(definition)

    def __store_part(self, definition, pnum, multisubtype):
        """Store the given message part in the appropriate category.

        This method sort parts in three categories:

        * contents (what is going to be displayed)
        * inlines (embedded parts referenced through their content id)
        * attachments

        As there is no official definition about what is a content and
        what is an attachment, the following rules are applied:

        * If the MIME type is text/plain or text/html, the part is a
          content (every such part is kept, in order of appearance)

        * Else, if the multipart subtype is related and the part has a
          content id, we consider this part as inline content because
          it is certainly an embedded image

        * Any other MIME type is considered as an attachment (for now)

        :param definition: a part definition (list)
        :param pnum: the part's number
        :param multisubtype: the multipart subtype
        """
        pnum = "1" if pnum is None else pnum
        params = {
            "pnum": pnum,
            "params": definition[2],
            "cid": definition[3],
            "description": definition[4],
            "encoding": definition[5],
            "size": definition[6]
        }
        mtype = definition[0].lower()
        subtype = definition[1].lower()
        ftype = "%s/%s" % (mtype, subtype)
        if ftype in ("text/plain", "text/html"):
            self.contents.setdefault(subtype, []).append(params)
            return

        # Media subtypes are case-insensitive, so compare the multipart
        # subtype the same way as the part's own type and subtype.
        is_related = (
            hasattr(multisubtype, "lower") and
            multisubtype.lower() == "related"
        )
        cid = params["cid"]
        if is_related and cid:
            self.inlines[cid.strip("<>")] = params
            return
        # A related part without a content id cannot be referenced from the
        # body, so it is exposed as a regular attachment instead of failing.

        params["Content-Type"] = ftype
        if len(definition) > 7:
            extensions = ["md5", "disposition", "language", "location"]
            if mtype == "text":
                extensions = ["textlines"] + extensions
            elif ftype == "message/rfc822":
                extensions = [
                    "envelopestruct",
                    "bodystruct",
                    "textlines"] + extensions
            # zip() ignores extension fields we have no name for instead of
            # raising an IndexError on them.
            params.update(zip(extensions, definition[7:]))

        self.attachments.append(params)

    def load_from_definition(self, definition, multisubtype=None):
        """Walk a parsed BODYSTRUCTURE and store every part found.

        :param definition: a list made of nested lists and/or dictionaries
            holding a ``struct`` and a ``partnum`` key
        :param multisubtype: subtype of the enclosing multipart, if any
        """
        for mp in definition:
            if isinstance(mp, list):
                if mp and isinstance(mp[0], list):
                    self.load_from_definition(mp, mp[1])
                else:
                    self.load_from_definition(mp)
            elif isinstance(mp, dict):
                struct = mp["struct"]
                if isinstance(struct[0], list):
                    # Nested multipart: struct is (parts, subtype, ...)
                    self.load_from_definition(struct[0], struct[1])
                    continue
                self.__store_part(struct, mp["partnum"], multisubtype)

    def has_attachments(self):
        """Return the number of attachments (0 when there is none)."""
        return len(self.attachments)

    def find_attachment(self, pnum):
        """Return the attachment whose part number is ``pnum``, or None."""
        for att in self.attachments:
            if pnum == att["pnum"]:
                return att
        return None


@six.add_metaclass(ConnectionsManager)
class IMAPconnector(object):

    """The IMAPv4 connector.

    Wraps an ``imaplib`` connection and exposes mailbox and message
    operations on top of it. The regular expressions below are compiled
    once at class level and shared by the response parsing methods.
    """

    # One group of a NAMESPACE response: a ``((...))`` list or ``NIL``.
    namespaces_pattern = re.compile(r'(\(\(.+?\)\)|NIL)')
    # One ``("prefix" "delimiter")`` pair inside a namespace group.
    namespace_pattern = re.compile(
        r'\("(?P<prefix>.*?)" "(?P<delimiter>.+?)"\)')
    # Common part of a LIST response: flags, quoted delimiter and a name
    # that may or may not be quoted.
    list_base_pattern = (
        r'\((?P<flags>.*?)\) "(?P<delimiter>.*)" "?(?P<name>[^"]*)"?'
    )
    # LIST response whose name is announced as a ``{length}`` literal.
    list_response_pattern_literal = re.compile(
        r'\((?P<flags>.*?)\) "(?P<delimiter>.*)" \{(?P<namelen>\d+)\}')
    list_response_pattern = re.compile(list_base_pattern)
    # Same as above, with whatever follows the name captured as childinfo.
    listextended_response_pattern = \
        re.compile(list_base_pattern + r'\s*(?P<childinfo>.*)')
    # STATUS response carrying an UNSEEN counter.
    unseen_pattern = re.compile(r'[^\(]+\(UNSEEN (\d+)\)')

    def __init__(self, user=None, password=None):
        """Load the webmail settings, then log in and read namespaces.

        :param user: username given to :meth:`login`
        :param password: password given to :meth:`login`
        """
        self.__hdelimiter = None
        self.__ns_prefixes = {}
        self.quota_usage = -1
        self.criterions = []
        settings = dict(param_tools.get_global_parameters("modoboa_webmail"))
        self.conf = settings
        self.address = settings["imap_server"]
        self.port = settings["imap_port"]
        self.login(user, password)
        self.load_namespaces()

    def _cmd(self, name, *args, **kwargs):
        """IMAP command wrapper

        To simplify errors handling, this wrapper sends the command
        (through ``uid`` for FETCH, SORT, STORE, COPY and SEARCH,
        through ``_simple_command`` otherwise) and then checks the
        return code. If an error has occurred or if the server answers
        ``NO``, an ``ImapError`` exception is raised.

        The result of a FETCH command is parsed by
        ``FetchResponseParser`` before being returned. For commands
        not sent through ``uid``, the untagged responses are returned:
        the one named after the command, or the ones listed in the
        ``responses`` keyword argument (``None`` if one is missing).

        :param name: the command's name
        :return: the command's result
        """
        by_uid = name in ('FETCH', 'SORT', 'STORE', 'COPY', 'SEARCH')
        try:
            if by_uid:
                typ, data = self.m.uid(name, *args)
            else:
                typ, data = self.m._simple_command(name, *args)
        except imaplib.IMAP4.error as e:
            raise ImapError(e)
        if typ == "NO":
            raise ImapError(data)

        if by_uid:
            if name == 'FETCH':
                return FetchResponseParser().parse(data)
            return data

        untagged = self.m.untagged_responses
        if 'responses' not in kwargs:
            return untagged.pop(name, None)
        collected = []
        for key in kwargs['responses']:
            if key not in untagged:
                return None
            collected.append(untagged.pop(key))
        return collected

    @property
    def hdelimiter(self):
        """Return the default hierachy delimiter.

        :return: a string
        """
        if self.__hdelimiter is None:
            raise InternalError(
                _("Failed to retrieve hierarchy delimiter"))
        return self.__hdelimiter

    def refresh(self, user, password):
        """Check if current connection needs a refresh

        A NOOP command is sent over the existing connection. If it
        fails (or if there is no connection), the cached mailbox
        selection is dropped and a new login is performed.

        Is it really secure?
        """
        if self.m is None:
            self.login(user, password)
            return
        try:
            self._cmd("NOOP")
        except ImapError:
            if hasattr(self, "current_mailbox"):
                del self.current_mailbox
            self.login(user, password)

    def login(self, user, passwd):
        """Custom login method

        We connect to the server and issue a LOGIN command. If
        successful, we record the CAPABILITY untagged response sent
        along with it, if any. Otherwise, we issue the CAPABILITY
        command ourselves.

        :param user: username
        :param passwd: password
        :raises ImapError: if the connection or the LOGIN command fails
        """
        try:
            if self.conf["imap_secured"]:
                self.m = imaplib.IMAP4_SSL(self.address, self.port)
            else:
                self.m = imaplib.IMAP4(self.address, self.port)
        except (socket.error, imaplib.IMAP4.error, ssl.SSLError) as error:
            # Interpolate AFTER the lookup: the message id must stay constant
            # for the translation catalog to match it.
            raise ImapError(
                _("Connection to IMAP server failed: %s") % error)

        passwd = self.m._quote(passwd)
        self._cmd("LOGIN", smart_bytes(user), smart_bytes(passwd))
        self.m.state = "AUTH"
        untagged = self.m.untagged_responses
        if "CAPABILITY" in untagged:
            raw_capabilities = untagged.pop('CAPABILITY')[0]
        else:
            raw_capabilities = self._cmd("CAPABILITY")[0]
        self.capabilities = raw_capabilities.decode().split()

    def logout(self):
        """Logout from server.

        The connection is always shut down and the cached state reset,
        even when the LOGOUT command fails (the error is still
        propagated). Calling this method without a connection is a
        no-op.
        """
        if getattr(self, "m", None) is None:
            return
        try:
            self._cmd("CHECK")
        except ImapError:
            # Best effort only: CHECK is refused when no mailbox is selected.
            pass
        try:
            self._cmd("LOGOUT")
        finally:
            connection, self.m = self.m, None
            try:
                # _cmd() bypasses imaplib's own logout(), so the socket has
                # to be released explicitly.
                connection.shutdown()
            except (socket.error, OSError):
                pass
            if hasattr(self, "current_mailbox"):
                del self.current_mailbox

    def load_namespaces(self):
        """Load available namespaces."""
        data = self._cmd("NAMESPACE")
        nslist = self.namespaces_pattern.findall(data[0].decode())
        for pos, item in enumerate(["personal", "others", "public"]):
            if nslist[pos] == "NIL":
                continue
            ns = nslist[pos][1:-1]
            for m in self.namespace_pattern.finditer(ns):
                if self.__hdelimiter is None:
                    self.__hdelimiter = m.group("delimiter")
                if item not in self.__ns_prefixes:
                    self.__ns_prefixes[item] = []
                self.__ns_prefixes[item].append(m.group("prefix"))

    def parse_search_parameters(self, criterion, pattern):
        """Parse search information and apply them.

        The resulting search program is stored in ``self.criterions``
        (an empty list when no known criterion was given).

        :param criterion: comma separated list of fields to search
            (``from_addr``, ``subject``), or ``both``
        :param pattern: the text to look for
        """
        search_keys = {"from_addr": "FROM", "subject": "SUBJECT"}
        if criterion == u"both":
            criterion = u"from_addr, subject"

        # The pattern ends up inside an IMAP quoted string: backslashes and
        # double quotes must be escaped, and CR/LF are not allowed at all
        # (they would otherwise terminate the command).
        quoted = (
            pattern.replace("\\", "\\\\").replace('"', '\\"')
            .replace("\r", "").replace("\n", "")
        )

        criterions = ""
        for c in criterion.split(','):
            # Names may be surrounded by spaces ("from_addr, subject").
            key = search_keys.get(c.strip())
            if key is None:
                continue
            term = '(%s "%s")' % (key, quoted)
            if criterions == "":
                criterions = term
            else:
                criterions = "OR (%s) (%s)" % (criterions, term)

        if not criterions:
            # Nothing to search for: do not send a blank argument.
            self.criterions = []
            return
        if six.PY3:
            criterions = bytearray(criterions, "utf-8")
        elif isinstance(criterions, six.text_type):
            criterions = criterions.encode("utf-8")
        self.criterions = [criterions]

    def messages_count(self, **kwargs):
        """An enhanced version of messages_count

        With IMAP, to know how many messages a mailbox contains, we
        have to make a request to the server. To avoid multiplying
        requests, messages are sorted at the same time: the sorted
        identifiers are kept in ``self.messages`` for other methods.
        Quota information is refreshed too.

        :param order: sorting order (first character is the sign)
        :param folder: mailbox to scan
        :return: the number of messages found
        """
        order = kwargs.get("order")
        if order:
            sign, criterion = order[:1], order[1:].upper()
            if sign == '-':
                criterion = "REVERSE %s" % criterion
        else:
            criterion = "REVERSE DATE"
        folder = kwargs.get("folder")

        # FIXME: why am I forced to issue a SELECT here? An EXAMINE
        # fails, but I think it is due to a wrong reading of the
        # responses on my side...
        self.select_mailbox(folder, readonly=False)
        cmdname = "SORT" if six.PY3 else b"SORT"
        sort_program = bytearray("(%s)" % criterion, "utf-8")
        data = self._cmd(
            cmdname, sort_program, b"UTF-8", b"(NOT DELETED)",
            *self.criterions)
        self.messages = data[0].decode().split()
        self.getquota(folder)
        return len(self.messages)

    def select_mailbox(self, name, readonly=True, force=False):
        """Issue a SELECT/EXAMINE command to the server

        The given name is first 'imap-utf7' encoded.

        :param name: mailbox's name
        :param readonly:
        """
        if hasattr(self, "current_mailbox"):
            if self.current_mailbox == name and not force:
                return
        self.current_mailbox = name
        name = self._encode_mbox_name(name)
        if readonly:
            self._cmd("EXAMINE", name)
        else:
            self._cmd("SELECT", name)
        self.m.state = "SELECTED"

    def unseen_messages(self, mailbox):
        """Return the number of unseen messages

        :param mailbox: the mailbox's name
        :return: an integer
        """
        data = self._cmd(
            "STATUS", self._encode_mbox_name(mailbox), "(UNSEEN)")
        m = self.unseen_pattern.match(data[-1].decode())
        if m is None:
            return 0
        return int(m.group(1))

    def _encode_mbox_name(self, folder):
        """Encode folder name (str) to imap4-utf-7 and quote it."""
        if not folder:
            return "INBOX"
        return b'"' + folder.encode("imap4-utf-7") + b'"'

    def _parse_mailbox_name(self, descr, prefix, delimiter, parts):
        if not len(parts):
            return False
        path = "%s%s%s" % (prefix, delimiter, parts[0])
        sdescr = None
        for d in descr:
            if d["path"] == path:
                sdescr = d
                break
        if sdescr is None:
            sdescr = {"name": parts[0], "path": path, "sub": []}
            descr += [sdescr]
        if self._parse_mailbox_name(sdescr["sub"], path, delimiter, parts[1:]):
            sdescr["class"] = "subfolders"
        return True

    def _listmboxes_simple(self, topmailbox='INBOX', mailboxes=None, **kwargs):
        # data = self._cmd("LIST", "", "*")
        if not mailboxes:
            mailboxes = []
        (status, data) = self.m.list()
        newmboxes = []
        for mb in data:
            flags, delimiter, name = self.list_response_pattern.match(
                mb.decode()).groups()
            name = bytearray(name.strip('"'), "utf-8").decode("imap4-utf-7")
            mdm_found = False
            for idx, mdm in enumerate(mailboxes):
                if mdm["name"] == name:
                    mdm_found = True
                    descr = mailboxes[idx]
                    break
            if not mdm_found:
                descr = {"name": name}
                newmboxes += [descr]

            if re.search(r"\%s" % delimiter, name):
                parts = name.split(delimiter)
                if "path" not in descr:
                    descr["path"] = parts[0]
                    descr["sub"] = []
                if self._parse_mailbox_name(descr["sub"], parts[0], delimiter,
                                            parts[1:]):
                    descr["class"] = "subfolders"
                continue

        from operator import itemgetter
        mailboxes += sorted(newmboxes, key=itemgetter("name"))

    @capability('LIST-EXTENDED', '_listmboxes_simple')
    def _listmboxes(self, topmailbox, mailboxes, until_mailbox=None):
        """Retrieve mailboxes list."""
        pattern = (
            '"{0}{1}%"'.format(
                topmailbox.encode("imap4-utf-7").decode(), self.hdelimiter)
            if topmailbox else "%"
        )
        resp = self._cmd(
            "LIST", '""', pattern, "RETURN", "(CHILDREN STATUS (MESSAGES))")
        newmboxes = []
        for mb in resp:
            if not mb:
                continue
            if type(mb) in [list, tuple]:
                flags, delimiter, namelen = (
                    self.list_response_pattern_literal.match(
                        mb[0].decode()).groups()
                )
                name = mb[1][0:int(namelen)]
            else:
                flags, delimiter, name, childinfo = (
                    self.listextended_response_pattern.match(
                        mb.decode()).groups())
            flags = flags.split(" ")
            name = bytearray(name, "utf-8")
            name = name.decode("imap4-utf-7")
            mdm_found = False
            for idx, mdm in enumerate(mailboxes):
                if mdm["name"] == name:
                    mdm_found = True
                    descr = mailboxes[idx]
                    break
            if not mdm_found:
                descr = {"name": name}
                newmboxes += [descr]

            if '\\Marked' in flags or '\\UnMarked' not in flags:
                descr["send_status"] = True
            if r'\NonExistent' in flags:
                descr["removed"] = True
            if '\\HasChildren' in flags:
                descr["path"] = name
                descr["sub"] = []
                if until_mailbox and until_mailbox.startswith(name):
                    self._listmboxes(name, descr["sub"], until_mailbox)

        from operator import itemgetter
        mailboxes += sorted(newmboxes, key=itemgetter("name"))

    def getmboxes(
            self, user, topmailbox='', until_mailbox=None,
            unseen_messages=True):
        """Returns a list of mailboxes for a particular user

        By default, only the first level of mailboxes under
        ``topmailbox`` is returned. If ``until_mailbox`` is specified,
        all levels needed to access this mailbox will be returned.

        When no ``topmailbox`` is given, the list starts with the
        inbox and the user's drafts, junk, sent and trash folders.

        :param user: a ``User`` instance
        :param topmailbox: the mailbox where to start in the tree
        :param until_mailbox: the deepest needed mailbox
        :param unseen_messages: include unseen messages counters or not
        :return: a list
        """
        md_mailboxes = []
        if not topmailbox:
            md_mailboxes.append({
                "name": "INBOX", "class": "fa fa-inbox",
                "label": _("Inbox")
            })
            special_folders = (
                ("drafts_folder", "fa fa-file", _("Drafts")),
                ("junk_folder", "fa fa-fire", _("Junk")),
                ("sent_folder", "fa fa-envelope", _("Sent")),
                ("trash_folder", "fa fa-trash", _("Trash")),
            )
            for setting, css_class, label in special_folders:
                md_mailboxes.append({
                    "name": user.parameters.get_value(setting),
                    "class": css_class, "label": label
                })

        if until_mailbox:
            parent = separate_mailbox(until_mailbox, self.hdelimiter)[1]
            if parent:
                until_mailbox = parent
        self._listmboxes(topmailbox, md_mailboxes, until_mailbox)

        if not unseen_messages:
            return md_mailboxes
        for mb in md_mailboxes:
            if "send_status" not in mb:
                continue
            # The marker is internal: never expose it to the caller.
            del mb["send_status"]
            if mb.get("removed", False):
                continue
            target = mb["path"] if "path" in mb else mb["name"]
            count = self.unseen_messages(target)
            if count != 0:
                mb["unseen"] = count
        return md_mailboxes

    def _add_flag(self, mbox, msgset, flag):
        """Set a flag on a set of messages.

        The mailbox is selected with write access first.

        :param mbox: the mailbox containing the messages
        :param msgset: messages set (uid)
        :param flag: the flag to add
        """
        self.select_mailbox(mbox, readonly=False)
        operation = "+FLAGS"
        self._cmd("STORE", msgset, operation, flag)

    def _remove_flag(self, mbox, msgset, flag):
        """Clear a flag on a set of messages.

        The mailbox is selected with write access first.

        :param mbox: the mailbox containing the messages
        :param msgset: messages set (uid)
        :param flag: the flag to remove
        """
        self.select_mailbox(mbox, readonly=False)
        operation = "-FLAGS"
        self._cmd("STORE", msgset, operation, flag)

    def mark_messages_unread(self, mbox, msgset):
        """Mark a set of messages as unread (clear the Seen flag).

        :param mbox: the mailbox containing the messages
        :param msgset: messages set (uid)
        """
        seen_flag = r'(\Seen)'
        self._remove_flag(mbox, msgset, seen_flag)

    def mark_messages_read(self, mbox, msgset):
        """Mark a set of messages as read (set the Seen flag).

        :param mbox: the mailbox containing the messages
        :param msgset: messages set (uid)
        """
        seen_flag = r'(\Seen)'
        self._add_flag(mbox, msgset, seen_flag)

    def mark_messages_flagged(self, mbox, msgset):
        """Set the Flagged flag on a set of messages.

        :param mbox: the mailbox containing the messages
        :param msgset: messages set (uid)
        """
        flagged = r'(\Flagged)'
        self._add_flag(mbox, msgset, flagged)

    def mark_messages_unflagged(self, mbox, msgset):
        """Clear the Flagged flag on a set of messages.

        :param mbox: the mailbox containing the messages
        :param msgset: messages set (uid)
        """
        flagged = r'(\Flagged)'
        self._remove_flag(mbox, msgset, flagged)

    def msg_forwarded(self, mailbox, mailid):
        """Add the $Forwarded flag to this email.

        :param mailbox: the mailbox containing the message
        :param mailid: the message's uid
        """
        forwarded = '($Forwarded)'
        self._add_flag(mailbox, mailid, forwarded)

    def msg_answered(self, mailbox, mailid):
        r"""Add the \Answered flag to this email.

        :param mailbox: the mailbox containing the message
        :param mailid: the message's uid
        """
        answered = r'(\Answered)'
        self._add_flag(mailbox, mailid, answered)

    def move(self, msgset, oldmailbox, newmailbox):
        """Move messages between mailboxes.

        Messages are copied to ``newmailbox``, then flagged as deleted
        and seen in ``oldmailbox`` (no EXPUNGE is issued here).

        :param msgset: messages set (uid)
        :param oldmailbox: the mailbox currently holding the messages
        :param newmailbox: the destination mailbox
        """
        self.select_mailbox(oldmailbox, readonly=False)
        destination = self._encode_mbox_name(newmailbox)
        self._cmd("COPY", msgset, destination)
        self._cmd("STORE", msgset, "+FLAGS", r'(\Deleted \Seen)')

    def push_mail(self, folder, msg):
        """Append a message, flagged as seen, to the given folder.

        :param folder: the destination mailbox
        :param msg: the message to store
        :return: the result of imaplib's ``append``
        """
        now = imaplib.Time2Internaldate(time.time())
        if six.PY3:
            payload = bytes(msg)
        else:
            payload = str(msg)
        target = self._encode_mbox_name(folder)
        return self.m.append(target, r'(\Seen)', now, payload)

    def empty(self, mbox):
        """Delete every message of the given mailbox.

        All messages found are flagged as deleted, then the mailbox is
        expunged. Nothing is sent when the mailbox is already empty.

        :param mbox: the mailbox's name
        """
        self.select_mailbox(mbox, readonly=False)
        uids = self._cmd("SEARCH", "ALL")[0].split()
        if not uids:
            return
        self._cmd("STORE", b",".join(uids), "+FLAGS", r'(\Deleted)')
        self._cmd("EXPUNGE")

    def compact(self, mbox):
        """Compact a specific mailbox

        The mailbox is selected with write access, then an EXPUNGE
        command is issued.

        :param mbox: the mailbox's name
        """
        self.select_mailbox(mbox, readonly=False)
        self._cmd("EXPUNGE")

    def create_folder(self, name, parent=None):
        """Create a new mailbox.

        :param name: the mailbox's name
        :param parent: name of the parent mailbox, if any
        :return: True
        :raises WebmailInternalError: if the server answers NO
        """
        full_name = name
        if parent is not None:
            full_name = "%s%s%s" % (parent, self.hdelimiter, name)
        status, payload = self.m.create(self._encode_mbox_name(full_name))
        if status == "NO":
            raise WebmailInternalError(payload[0])
        return True

    def rename_folder(self, oldname, newname):
        """Rename a mailbox.

        :param oldname: current name of the mailbox
        :param newname: new name of the mailbox
        :return: True
        :raises WebmailInternalError: if the server answers NO
        """
        source = self._encode_mbox_name(oldname)
        target = self._encode_mbox_name(newname)
        status, payload = self.m.rename(source, target)
        if status == "NO":
            raise WebmailInternalError(payload[0], ajax=True)
        return True

    def delete_folder(self, name):
        """Delete a mailbox.

        :param name: the mailbox's name
        :return: True
        :raises WebmailInternalError: if the server answers NO
        """
        status, payload = self.m.delete(self._encode_mbox_name(name))
        if status == "NO":
            raise WebmailInternalError(payload[0])
        return True

    def getquota(self, mailbox):
        """Retrieve quota information from the server.

        We also compute the current usage.
        """
        if "QUOTA" not in self.capabilities:
            self.quota_limit = self.quota_current = None
            return
        try:
            data = self._cmd("GETQUOTAROOT", self._encode_mbox_name(mailbox),
                             responses=["QUOTAROOT", "QUOTA"])
        except ImapError:
            data = None
        finally:
            if data is None:
                self.quota_limit = self.quota_current = None
                return

        quotadef = data[1][0].decode()
        m = re.search(r"\(STORAGE (\d+) (\d+)\)", quotadef)
        if not m:
            print("Problem while parsing quota def")
            return
        self.quota_limit = int(m.group(2))
        self.quota_current = int(m.group(1))
        try:
            self.quota_usage = (
                int(float(self.quota_current) / float(self.quota_limit) * 100)
            )
        except TypeError:
            self.quota_usage = -1

    def fetchpart(self, uid, mbox, partnum):
        """Retrieve a specific message part

        Useful to fetch attachments from the server. The description
        of the part (as found in the BODYSTRUCTURE) and its payload
        are returned separately.

        :param uid: a message UID
        :param mbox: the mailbox containing the message
        :param partnum: the part number
        :return: a 2uple (attachment description or None, payload)
        """
        self.select_mailbox(mbox, readonly=False)
        body_key = "BODY[%s]" % partnum
        data = self._cmd("FETCH", uid, "(BODYSTRUCTURE %s)" % body_key)
        message = data[int(uid)]
        structure = BodyStructure(message["BODYSTRUCTURE"])
        return structure.find_attachment(partnum), message[body_key]

    def fetch(self, start, stop=None, mbox=None):
        """Retrieve information about messages from the server

        Issue a FETCH command to retrieve information about one or
        more messages (such as headers) from the server. When both
        ``start`` and ``stop`` are given, they designate a slice of
        ``self.messages``; otherwise ``start`` is used as the single
        message identifier.

        :param start: index of the first message
        :param stop: index of the last message (optional)
        :param mbox: the mailbox that contains the messages
        :return: a list of message objects carrying extra fields
        """
        self.select_mailbox(mbox, readonly=False)
        if start and stop:
            submessages = self.messages[start - 1:stop]
            mrange = ",".join(submessages)
        else:
            submessages = [start]
            mrange = start
        headers = "DATE FROM TO CC SUBJECT"
        headers_key = "BODY[HEADER.FIELDS ({})]".format(headers)
        query = (
            "(FLAGS BODYSTRUCTURE RFC822.SIZE BODY.PEEK[HEADER.FIELDS ({})])"
            .format(headers)
        )
        data = self._cmd("FETCH", mrange, query)

        # IMAP flag -> field set to True on the message when present
        flag_fields = (
            (r"\Answered", "answered"),
            (r"$Forwarded", "forwarded"),
            (r"\Flagged", "flagged"),
        )
        result = []
        for uid in submessages:
            msg_data = data[int(uid)]
            msg = email.message_from_string(msg_data[headers_key])
            msg["imapid"] = uid
            msg["size"] = msg_data["RFC822.SIZE"]
            flags = msg_data["FLAGS"]
            if r"\Seen" not in flags:
                msg["style"] = "unseen"
            for flag, field in flag_fields:
                if flag in flags:
                    msg[field] = True
            if BodyStructure(msg_data["BODYSTRUCTURE"]).has_attachments():
                msg["attachments"] = True
            result.append(msg)
        return result

    def fetchmail(self, mbox, mailid, readonly=True, what="bodystructure"):
        """Retrieve information about a specific message

        Issue a FETCH command for a single message. What is retrieved
        depends on ``what``:

        * ``bodystructure``: the BODYSTRUCTURE only
        * ``source``: the whole message (``BODY[]``)
        * anything else: the BODYSTRUCTURE and the header fields named
          in ``what`` (fetched with ``BODY.PEEK`` when ``readonly``)

        :param mbox: the mailbox containing the message
        :param mailid: the message's unique id
        :param readonly: open the mailbox read-only
        :param what: the kind of content to fetch (see above)
        :return: the parsed data of the message
        """
        self.select_mailbox(mbox, readonly)
        if what == "source":
            to_fetch = "(BODY[])"
        elif what == "bodystructure":
            to_fetch = "(BODYSTRUCTURE)"
        else:
            if readonly:
                bcmd = "BODY.PEEK"
            else:
                bcmd = "BODY"
            to_fetch = "(BODYSTRUCTURE {}[HEADER.FIELDS ({})])".format(
                bcmd, what)
        data = self._cmd("FETCH", mailid, to_fetch)
        return data[int(mailid)]


def separate_mailbox(fullname, sep="."):
    """Split a mailbox name

    :param fullname: the complete name of a mailbox
    :param sep: the hierarchy separator
    :return: a 2uple (name, parent). ``name`` is the last component of
        ``fullname`` and ``parent`` everything before it, or None when
        ``fullname`` does not contain the separator.
    """
    if sep not in fullname:
        return fullname, None
    parts = fullname.split(sep)
    name = parts.pop()
    return name, sep.join(parts)


def get_imapconnector(request):
    """Simple shortcut to create a connector

    The username comes from ``request.user`` and the password from
    the ``password`` entry of the session.

    :param request: a ``Request`` object
    :return: an ``IMAPconnector`` instance
    """
    return IMAPconnector(
        user=request.user.username,
        password=request.session["password"]
    )


# Run the module's doctests when it is executed as a script.
if __name__ == "__main__":
    from doctest import testmod
    testmod()

Anon7 - 2022
AnonSec Team