Server IP : 85.214.239.14 / Your IP : 3.15.3.17 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /lib/python3/dist-packages/requests_toolbelt/multipart/ |
Upload File : |
# -*- coding: utf-8 -*- """ requests_toolbelt.multipart.decoder =================================== This holds all the implementation details of the MultipartDecoder """ import sys import email.parser from .encoder import encode_with from requests.structures import CaseInsensitiveDict def _split_on_find(content, bound): point = content.find(bound) return content[:point], content[point + len(bound):] class ImproperBodyPartContentException(Exception): pass class NonMultipartContentTypeException(Exception): pass def _header_parser(string, encoding): major = sys.version_info[0] if major == 3: string = string.decode(encoding) headers = email.parser.HeaderParser().parsestr(string).items() return ( (encode_with(k, encoding), encode_with(v, encoding)) for k, v in headers ) class BodyPart(object): """ The ``BodyPart`` object is a ``Response``-like interface to an individual subpart of a multipart response. It is expected that these will generally be created by objects of the ``MultipartDecoder`` class. Like ``Response``, there is a ``CaseInsensitiveDict`` object named headers, ``content`` to access bytes, ``text`` to access unicode, and ``encoding`` to access the unicode codec. """ def __init__(self, content, encoding): self.encoding = encoding headers = {} # Split into header section (if any) and the content if b'\r\n\r\n' in content: first, self.content = _split_on_find(content, b'\r\n\r\n') if first != b'': headers = _header_parser(first.lstrip(), encoding) else: raise ImproperBodyPartContentException( 'content does not contain CR-LF-CR-LF' ) self.headers = CaseInsensitiveDict(headers) @property def text(self): """Content of the ``BodyPart`` in unicode.""" return self.content.decode(self.encoding) class MultipartDecoder(object): """ The ``MultipartDecoder`` object parses the multipart payload of a bytestring into a tuple of ``Response``-like ``BodyPart`` objects. The basic usage is:: import requests from requests_toolbelt import MultipartDecoder response = requests.get(url) decoder = MultipartDecoder.from_response(response) for part in decoder.parts: print(part.headers['content-type']) If the multipart content is not from a response, basic usage is:: from requests_toolbelt import MultipartDecoder decoder = MultipartDecoder(content, content_type) for part in decoder.parts: print(part.headers['content-type']) For both these usages, there is an optional ``encoding`` parameter. This is a string, which is the name of the unicode codec to use (default is ``'utf-8'``). """ def __init__(self, content, content_type, encoding='utf-8'): #: Original Content-Type header self.content_type = content_type #: Response body encoding self.encoding = encoding #: Parsed parts of the multipart response body self.parts = tuple() self._find_boundary() self._parse_body(content) def _find_boundary(self): ct_info = tuple(x.strip() for x in self.content_type.split(';')) mimetype = ct_info[0] if mimetype.split('/')[0].lower() != 'multipart': raise NonMultipartContentTypeException( "Unexpected mimetype in content-type: '{}'".format(mimetype) ) for item in ct_info[1:]: attr, value = _split_on_find( item, '=' ) if attr.lower() == 'boundary': self.boundary = encode_with(value.strip('"'), self.encoding) @staticmethod def _fix_first_part(part, boundary_marker): bm_len = len(boundary_marker) if boundary_marker == part[:bm_len]: return part[bm_len:] else: return part def _parse_body(self, content): boundary = b''.join((b'--', self.boundary)) def body_part(part): fixed = MultipartDecoder._fix_first_part(part, boundary) return BodyPart(fixed, self.encoding) def test_part(part): return (part != b'' and part != b'\r\n' and part[:4] != b'--\r\n' and part != b'--') parts = content.split(b''.join((b'\r\n', boundary))) self.parts = tuple(body_part(x) for x in parts if test_part(x)) @classmethod def from_response(cls, response, encoding='utf-8'): content = response.content content_type = response.headers.get('content-type', None) return cls(content, content_type, encoding)