Server IP : 85.214.239.14 / Your IP : 18.119.166.141 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /srv/modoboa/env/lib64/python3.5/site-packages/modoboa_webmail/lib/ |
Upload File : |
# coding: utf-8 """Simple parser for FETCH responses. The ``imaplib`` module doesn't parse IMAP responses, it returns raw values. Since Modoboa relies on BODYSTRUCTURE attributes to display messages (we don't want to overload the server), a parser is required. Python 2/3 compatibility note: this parser excepts bytes objects when run with Python3 and str (not unicode) ones with Python2. """ from __future__ import print_function import re import chardet import six class ParseError(Exception): """Generic parsing error""" pass class Lexer(object): """The lexical analysis part. This class provides a simple way to define tokens (with patterns) to be detected. Patterns are provided using a list of 2-uple. Each 2-uple consists of a token name and an associated pattern. Example: [("left_bracket", r'\['),] """ def __init__(self, definitions): self.definitions = definitions parts = [] for name, part in definitions: parts.append(r"(?P<%s>%s)" % (name, part)) self.regexpString = "|".join(parts) self.regexp = re.compile(self.regexpString, re.MULTILINE) self.wsregexp = re.compile(r"\s+", re.M) def curlineno(self): """Return the current line number""" return self.text[:self.pos].count("\n") + 1 def scan(self, text): """Analyze some data. Analyse the passed content. Each time a token is recognized, a 2-uple containing its name and the parsed value is raised (using yield). :param text: a string containing the data to parse :raises: ParseError """ self.pos = 0 self.text = text while self.pos < len(text): m = self.wsregexp.match(text, self.pos) if m is not None: self.pos = m.end() continue m = self.regexp.match(text, self.pos) if m is None: raise ParseError("unknown token {}".format(text[self.pos:])) self.pos = m.end() yield (m.lastgroup, m.group(m.lastgroup)) class FetchResponseParser(object): """A token generator. By *token*, I mean: *literal*, *quoted* or anything else until the next ' ' or ')' character (number, NIL and others should fall into this last category). """ rules = [ ("left_parenthesis", r'\('), ("right_parenthesis", r'\)'), ("string", r'"([^"\\]|\\.)*"'), ("nil", r'NIL'), ("data_item", r"(?P<name>[A-Z][A-Z\.0-9]+)" r"(?P<section>\[.*\])?(?P<origin_octet>\<\d+\>)?"), ("number", r'[0-9]+'), ("literal_marker", r'{\d+}'), ("flag", r'(\\|\$)?[a-zA-Z0-9\-_]+'), ] def __init__(self): """Constructor.""" self.lexer = Lexer(self.rules) self.__reset_parser() def __reset_parser(self): """Reset parser states.""" self.result = {} self.__current_message = {} self.__next_literal_len = 0 self.__cur_data_item = None self.__args_parsing_func = None self.__expected = None self.__depth = 0 self.__bs_stack = [] def set_expected(self, *args): """Indicate next expected token types.""" self.__expected = args def __default_args_parser(self, ttype, tvalue): """Default arguments parser.""" self.__current_message[self.__cur_data_item] = tvalue self.__args_parsing_func = None def __flags_args_parser(self, ttype, tvalue): """FLAGS arguments parser.""" if ttype == "left_parenthesis": self.__current_message[self.__cur_data_item] = [] self.__depth += 1 elif ttype == "flag": self.__current_message[self.__cur_data_item].append(tvalue) self.set_expected("flag", "right_parenthesis") elif ttype == "right_parenthesis": self.__args_parsing_func = None self.__depth -= 1 else: raise ParseError( "Unexpected token found: {}".format(ttype)) def __set_part_numbers(self, bs, prefix=""): """Set part numbers.""" cpt = 1 for mp in bs: if isinstance(mp, list): self.__set_part_numbers(mp, prefix) elif isinstance(mp, dict): if isinstance(mp["struct"][0], list): nprefix = "{}{}.".format(prefix, cpt) self.__set_part_numbers(mp["struct"][0], nprefix) mp["partnum"] = "{}{}".format(prefix, cpt) cpt += 1 def __bstruct_args_parser(self, ttype, tvalue): """BODYSTRUCTURE arguments parser.""" if ttype == "left_parenthesis": self.__bs_stack = [[]] + self.__bs_stack return if ttype == "right_parenthesis": if len(self.__bs_stack) > 1: part = self.__bs_stack.pop(0) # Check if we are parsing a list of mime part or a # list or arguments. condition = ( len(self.__bs_stack[0]) > 0 and not isinstance(self.__bs_stack[0][0], dict)) if condition: self.__bs_stack[0].append(part) else: self.__bs_stack[0].append({"struct": part}) else: # End of BODYSTRUCTURE if not isinstance(self.__bs_stack[0][0], list): # Special case for non multipart structures self.__bs_stack[0] = {"struct": self.__bs_stack[0]} self.__set_part_numbers(self.__bs_stack) self.__current_message[self.__cur_data_item] = self.__bs_stack self.__bs_stack = [] self.__args_parsing_func = None return if ttype == "string": tvalue = tvalue.strip('"') # Check if previous element was a mime part. If so, we are # dealing with a 'multipart' mime part... condition = ( len(self.__bs_stack[0]) and isinstance(self.__bs_stack[0][-1], dict)) if condition: self.__bs_stack[0] = [self.__bs_stack[0]] + [tvalue] return elif ttype == "number": tvalue = int(tvalue) self.__bs_stack[0].append(tvalue) def __parse_data_item(self, ttype, tvalue): """Find next data item.""" if ttype == "data_item": self.__cur_data_item = tvalue if tvalue == "BODYSTRUCTURE": self.set_expected("left_parenthesis") self.__args_parsing_func = self.__bstruct_args_parser elif tvalue == "FLAGS": self.set_expected("left_parenthesis") self.__args_parsing_func = self.__flags_args_parser else: self.__args_parsing_func = self.__default_args_parser return elif ttype == "right_parenthesis": self.__depth -= 1 assert self.__depth == 0 # FIXME: sometimes, FLAGS are returned outside the UID # scope (see sample 1 in tests). For now, we just ignore # them but we need a better solution! if "UID" in self.__current_message: self.result[int(self.__current_message.pop("UID"))] = ( self.__current_message) self.__current_message = {} return raise ParseError( "unexpected {} found while looking for data_item near {}" .format(ttype, tvalue)) def __convert_to_str(self, chunk): """Convert chunk to str and guess encoding.""" condition = ( six.PY2 and isinstance(chunk, six.text_type) or six.PY3 and isinstance(chunk, six.binary_type) ) if not condition: return chunk try: chunk = chunk.decode("utf-8") except UnicodeDecodeError: pass else: return chunk try: result = chardet.detect(chunk) except UnicodeDecodeError: raise RuntimeError("Can't find string encoding") return chunk.decode(result["encoding"]) def parse_chunk(self, chunk): """Parse chunk.""" if not chunk: return chunk = self.__convert_to_str(chunk) if self.__next_literal_len: literal = chunk[:self.__next_literal_len] chunk = chunk[self.__next_literal_len:] self.__next_literal_len = 0 if self.__cur_data_item != "BODYSTRUCTURE": self.__current_message[self.__cur_data_item] = literal self.__args_parsing_func = None else: self.__args_parsing_func("literal", literal) for ttype, tvalue in self.lexer.scan(chunk): if self.__expected is not None: if ttype not in self.__expected: raise ParseError( "unexpected {} found while looking for {}" .format(ttype, "|".join(self.__expected))) self.__expected = None if ttype == "literal_marker": self.__next_literal_len = int(tvalue[1:-1]) continue elif self.__depth == 0: if ttype == "number": # We should start here with a message ID self.set_expected("left_parenthesis") if ttype == "left_parenthesis": self.__depth += 1 continue elif self.__args_parsing_func is None: self.__parse_data_item(ttype, tvalue) continue self.__args_parsing_func(ttype, tvalue) def parse(self, data): """Parse received data.""" self.__reset_parser() for chunk in data: if isinstance(chunk, tuple): for schunk in chunk: self.parse_chunk(schunk) else: self.parse_chunk(chunk) return self.result