Server IP : 85.214.239.14 / Your IP : 18.221.59.242 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /proc/3/task/3/root/lib/python3/dist-packages/httpx/ |
Upload File : |
""" Handlers for Content-Encoding. See: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding """ import codecs import io import typing import zlib from ._compat import brotli from ._exceptions import DecodingError class ContentDecoder: def decode(self, data: bytes) -> bytes: raise NotImplementedError() # pragma: no cover def flush(self) -> bytes: raise NotImplementedError() # pragma: no cover class IdentityDecoder(ContentDecoder): """ Handle unencoded data. """ def decode(self, data: bytes) -> bytes: return data def flush(self) -> bytes: return b"" class DeflateDecoder(ContentDecoder): """ Handle 'deflate' decoding. See: https://stackoverflow.com/questions/1838699 """ def __init__(self) -> None: self.first_attempt = True self.decompressor = zlib.decompressobj() def decode(self, data: bytes) -> bytes: was_first_attempt = self.first_attempt self.first_attempt = False try: return self.decompressor.decompress(data) except zlib.error as exc: if was_first_attempt: self.decompressor = zlib.decompressobj(-zlib.MAX_WBITS) return self.decode(data) raise DecodingError(str(exc)) from exc def flush(self) -> bytes: try: return self.decompressor.flush() except zlib.error as exc: # pragma: no cover raise DecodingError(str(exc)) from exc class GZipDecoder(ContentDecoder): """ Handle 'gzip' decoding. See: https://stackoverflow.com/questions/1838699 """ def __init__(self) -> None: self.decompressor = zlib.decompressobj(zlib.MAX_WBITS | 16) def decode(self, data: bytes) -> bytes: try: return self.decompressor.decompress(data) except zlib.error as exc: raise DecodingError(str(exc)) from exc def flush(self) -> bytes: try: return self.decompressor.flush() except zlib.error as exc: # pragma: no cover raise DecodingError(str(exc)) from exc class BrotliDecoder(ContentDecoder): """ Handle 'brotli' decoding. Requires `pip install brotlipy`. See: https://brotlipy.readthedocs.io/ or `pip install brotli`. See https://github.com/google/brotli Supports both 'brotlipy' and 'Brotli' packages since they share an import name. The top branches are for 'brotlipy' and bottom branches for 'Brotli' """ def __init__(self) -> None: if brotli is None: # pragma: no cover raise ImportError( "Using 'BrotliDecoder', but neither of the 'brotlicffi' or 'brotli' " "packages have been installed. " "Make sure to install httpx using `pip install httpx[brotli]`." ) from None self.decompressor = brotli.Decompressor() self.seen_data = False self._decompress: typing.Callable[[bytes], bytes] if hasattr(self.decompressor, "decompress"): # The 'brotlicffi' package. self._decompress = self.decompressor.decompress # pragma: no cover else: # The 'brotli' package. self._decompress = self.decompressor.process # pragma: no cover def decode(self, data: bytes) -> bytes: if not data: return b"" self.seen_data = True try: return self._decompress(data) except brotli.error as exc: raise DecodingError(str(exc)) from exc def flush(self) -> bytes: if not self.seen_data: return b"" try: if hasattr(self.decompressor, "finish"): # Only available in the 'brotlicffi' package. # As the decompressor decompresses eagerly, this # will never actually emit any data. However, it will potentially throw # errors if a truncated or damaged data stream has been used. self.decompressor.finish() # pragma: no cover return b"" except brotli.error as exc: # pragma: no cover raise DecodingError(str(exc)) from exc class MultiDecoder(ContentDecoder): """ Handle the case where multiple encodings have been applied. """ def __init__(self, children: typing.Sequence[ContentDecoder]) -> None: """ 'children' should be a sequence of decoders in the order in which each was applied. """ # Note that we reverse the order for decoding. self.children = list(reversed(children)) def decode(self, data: bytes) -> bytes: for child in self.children: data = child.decode(data) return data def flush(self) -> bytes: data = b"" for child in self.children: data = child.decode(data) + child.flush() return data class ByteChunker: """ Handles returning byte content in fixed-size chunks. """ def __init__(self, chunk_size: typing.Optional[int] = None) -> None: self._buffer = io.BytesIO() self._chunk_size = chunk_size def decode(self, content: bytes) -> typing.List[bytes]: if self._chunk_size is None: return [content] if content else [] self._buffer.write(content) if self._buffer.tell() >= self._chunk_size: value = self._buffer.getvalue() chunks = [ value[i : i + self._chunk_size] for i in range(0, len(value), self._chunk_size) ] if len(chunks[-1]) == self._chunk_size: self._buffer.seek(0) self._buffer.truncate() return chunks else: self._buffer.seek(0) self._buffer.write(chunks[-1]) self._buffer.truncate() return chunks[:-1] else: return [] def flush(self) -> typing.List[bytes]: value = self._buffer.getvalue() self._buffer.seek(0) self._buffer.truncate() return [value] if value else [] class TextChunker: """ Handles returning text content in fixed-size chunks. """ def __init__(self, chunk_size: typing.Optional[int] = None) -> None: self._buffer = io.StringIO() self._chunk_size = chunk_size def decode(self, content: str) -> typing.List[str]: if self._chunk_size is None: return [content] self._buffer.write(content) if self._buffer.tell() >= self._chunk_size: value = self._buffer.getvalue() chunks = [ value[i : i + self._chunk_size] for i in range(0, len(value), self._chunk_size) ] if len(chunks[-1]) == self._chunk_size: self._buffer.seek(0) self._buffer.truncate() return chunks else: self._buffer.seek(0) self._buffer.write(chunks[-1]) self._buffer.truncate() return chunks[:-1] else: return [] def flush(self) -> typing.List[str]: value = self._buffer.getvalue() self._buffer.seek(0) self._buffer.truncate() return [value] if value else [] class TextDecoder: """ Handles incrementally decoding bytes into text """ def __init__(self, encoding: str = "utf-8"): self.decoder = codecs.getincrementaldecoder(encoding)(errors="replace") def decode(self, data: bytes) -> str: return self.decoder.decode(data) def flush(self) -> str: return self.decoder.decode(b"", True) class LineDecoder: """ Handles incrementally reading lines from text. Uses universal line decoding, supporting any of `\n`, `\r`, or `\r\n` as line endings, normalizing to `\n`. """ def __init__(self) -> None: self.buffer = "" def decode(self, text: str) -> typing.List[str]: lines = [] if text and self.buffer and self.buffer[-1] == "\r": if text.startswith("\n"): # Handle the case where we have an "\r\n" split across # our previous input, and our new chunk. lines.append(self.buffer[:-1] + "\n") self.buffer = "" text = text[1:] else: # Handle the case where we have "\r" at the end of our # previous input. lines.append(self.buffer[:-1] + "\n") self.buffer = "" while text: num_chars = len(text) for idx in range(num_chars): char = text[idx] next_char = None if idx + 1 == num_chars else text[idx + 1] if char == "\n": lines.append(self.buffer + text[: idx + 1]) self.buffer = "" text = text[idx + 1 :] break elif char == "\r" and next_char == "\n": lines.append(self.buffer + text[:idx] + "\n") self.buffer = "" text = text[idx + 2 :] break elif char == "\r" and next_char is not None: lines.append(self.buffer + text[:idx] + "\n") self.buffer = "" text = text[idx + 1 :] break elif next_char is None: self.buffer += text text = "" break return lines def flush(self) -> typing.List[str]: if self.buffer.endswith("\r"): # Handle the case where we had a trailing '\r', which could have # been a '\r\n' pair. lines = [self.buffer[:-1] + "\n"] elif self.buffer: lines = [self.buffer] else: lines = [] self.buffer = "" return lines SUPPORTED_DECODERS = { "identity": IdentityDecoder, "gzip": GZipDecoder, "deflate": DeflateDecoder, "br": BrotliDecoder, } if brotli is None: SUPPORTED_DECODERS.pop("br") # pragma: no cover